[经验分享] 【SUBJECT技术】定时器实现原理及应用举例 原创

诚迈_雨哥 显示全部楼层 发表于 2024-6-1 15:39:45

目录

[TOC]

一、定时器实现原理

1. timerfd 应用时序图

时钟管理器的应用包括典型的几个类,DeviceStatusService是用来监听timerFd的,timerFd是一个文件句柄,因此它包含了epoll的几步典型操作, 如EpollCreate, AddEpoll,Wait EPOLLIN和EpollClose。当有时钟被触发时将在 wait EPOLL 中等到,然后执行始终回调函数callback(); DragManager 是一个典型的应用方,它需要等间隔时间去做一些周期性的事情或者延迟一定时间之后去做一些事情,于是它在开始时添加一个时钟,里面有一个到时要做的事情的回调函数,成功后会产生一个时钟timerId,结束时钟应用的时候需要移除该timerId对应的时钟; TimerManager 是多个时钟管理器类,一个子系统当中有多个时钟定时器,就需要根据时钟的触发时间进行排队、添加、移除管理,这个里面维系了一个重要的数据结构 struct TimerItem,详细说明如后所述, 该类调用 Timerfd提供的基础功能。 Timerfd 是OpenHarmony系统底层提供的一个时钟管理器,它提供了典型的两个函数(timerfd_gettime 没用到)和一个数据结构,如后所述。

timerfd.png

2. 定时器序列管理

在传统的IO多路复用系统中,定时操作通常是直接去设置poll()等函数的超时时间,系统超时之后去执行对应的定时回调。OpenHarmony系统已经将时间也抽象为了一种事件,并提供了对应的文件描述符fd机制,其能够被poll()等多路复用函数进行监视,因此可以很简洁地融入到我们之前实现的 Reactor机制中。这也是更科学的做法。

定时设置,也就是用户指定一个时刻和一个回调函数,系统需要在该时刻去执行该回调函数。这个定义包含了两层含义:

用户的定时设置是任意的。有可能先设置一个10秒的定时,接下来再设置一个5秒的定时。用户不一定会以expiration time的先后顺序去设置定时; 系统顺序执行回调函数。系统需要将所有注册过的定时事件,按照expiration time的先后顺序,在用户设置的每一个expiration time去执行对应的回调函数; 基于上述的特点,我们需要的定时器模块的流程示意图所示。

Timer_archi.PNG

首先解释一下图示的流程

class Timer: 显然每一个expiration time都对应着一个callback function,因此将其封装为一个类class Timer,方便执行各自的回调函数。在图中即对应着五个不同颜色的小圆圈; 接口:TimerQueue向用户提供Timer Insert接口,供用户插入各个Timer。在图中,我们先后插入了expiration time为10s, 18s, 20s, 15s, 10.01s的Timer; class TimerQueue内部过程:①将得到的所有Timer进行排序;②每当有新的Timer expired之时,抽取出过期的Timer,在图中对应着10s和10.01s两个Timer;③依次执行这些expired Timer的回调函数;

接下来则是根据流程所示的特点去分析如何构建class TimerQueue

首先,是何时执行timerfd_create()。这个很简单,自然是在构造函数中执行,将得到的timer fd作为class TimerQueue的成员变量;

其次,是何时执行timerfd_settime()。这个是需要重点思考的问题。最直接的解决方案自然是每插入一个Timer就执行一次timerfd_settime(),对应图1的例子,系统就会将10s, 10.01s, 15s, 18s, 20s这几个时刻都设置为过期时刻,随后在每一个过期时刻将对应的Timer取出,并执行回调函数。但这个方案有一个潜在的问题。timerfd_settime()是系统调用,频繁的调用会导致系统开销增大。 改进的思路就是只对最早过期的Timer设置timerfd_settime()。具体而言分两种情况: 新插入一个Timer:在插入Timer之前比较该Timer和TimerQueue中排名最靠前的Timer的expiration time,如果在其之前,则增设一个timerfd_settime(); 取出过期的Timer:在取出了过期的Timer之后,如果TimerQueue中还有剩余的Timer,则再对残余的Timer中最靠前的那一个Timer设置timerfd_settime()

随后,是如何组织TimerQueue中的Timer。我们必须保证有序,而插入新Timer的位置又是随机的。综合考虑,使用std::set是最方便的。问题是如果存在expiration time相同的Timer怎么办。muduo源码给出的解决方法很巧妙,其将std::set的元素设置为std::pair<TimeStamp, Timer *>,这样一来就完美的解决了问题,即使两个Timer的过期时刻一致,在std::set中也还能用Timer的地址去排列各个Timer的先后顺序。

最后,就是将class TimerQueue集成到EventLoop中,也就是作为class EventLoop的成员变量,这个很简单,不再赘述。

二、定时器代码解读

1. 时钟参数

TimerManager 基于这个数据结构用list进行管理。

    struct TimerItem {
        int32_t id { 0 };             // 时钟id
        int32_t intervalMs { 0 };     // 时钟间隔,单位毫秒
        int32_t callbackCount { 0 };  // 时钟触发的回调次数
        int32_t repeatCount { 0 };    // 设置等间隔触发次数
        int64_t nextCallTime { 0 };   // 预计下次触发的时间
        std::function<void()> callback { nullptr }; // 到时触发的回调函数
    };

2. timerfd.h 文件解读

timerfd是OpenHarmony为用户程序提供的一个定时器接口。这个接口基于文件描述符,通过文件描述符的可读事件进行超时通知,因此可以配合select/poll/epoll等使用。 下面对timerfd系列函数先做一个简单的介绍:

timerfd_create()函数

#include <sys/timerfd.h>

int timerfd_create(int clockid, int flags);
/** 
timerfd_create(): 函数创建一个定时器对象,同时返回一个与之关联的文件描述符。
clockid:clockid标识指定的时钟计数器,可选值(CLOCK_REALTIME、CLOCK_MONOTONIC。。。)
CLOCK_REALTIME:系统实时时间,随系统实时时间改变而改变,即从UTC1970-1-1 0:0:0开始计时,中间时刻如果系统时间被用户改成其他,则对应的时间相应改变
CLOCK_MONOTONIC:从系统启动这一刻起开始计时,不受系统时间被用户改变的影响
flags:参数flags(TFD_NONBLOCK(非阻塞模式)/TFD_CLOEXEC(表示当程序执行exec函数时本fd将被系统自动关闭,表示不传递)
*/

timerfd_settime()函数

#include <sys/timerfd.h>

struct timespec {
    time_t tv_sec;                /* Seconds */
    long   tv_nsec;               /* Nanoseconds */
};

struct itimerspec {
    struct timespec it_interval;  /* Interval for periodic timer (定时间隔周期)*/
    struct timespec it_value;     /* Initial expiration (第一次超时时间)*/
};
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
/*
    timerfd_settime(): 此函数用于设置新的超时时间,并开始计时,能够启动和停止定时器;
    fd: 参数fd是timerfd_create函数返回的文件句柄
    flags:参数flags为1代表设置的是绝对时间(TFD_TIMER_ABSTIME 表示绝对定时器);为0代表相对时间。
    new_value: 参数new_value指定定时器的超时时间以及超时间隔时间
    old_value: 如果old_value不为NULL, old_vlaue返回之前定时器设置的超时时间,具体参考timerfd_gettime()函数

    ** it_interval不为0则表示是周期性定时器。
       it_value和it_interval都为0表示停止定时器
*/

timerfd_gettime()函数

int timerfd_gettime(int fd, struct itimerspec *curr_value);
/*
    timerfd_gettime(): 函数获取距离下次超时剩余的时间
    curr_value.it_value: 字段表示距离下次超时的时间,如果该值为0,表示计时器已经解除
    改字段表示的值永远是一个相对值,无论TFD_TIMER_ABSTIME是否被设置
    curr_value.it_interval: 定时器间隔时间
*/

read函数

用read函数读取计时器的超时次数,该值是一个8字节无符号的长整型

uint64_t exp = 0;
read(fd, &exp, sizeof(uint64_t)); 

三、定时器第应用举例

通过如下函数添加定时器 int32_t TimerManager::AddTimer(int32_t intervalMs, int32_t repeatCount, std::function<void()> callback)

例1:2秒之后初始化分布式硬件管理器

初始化穿越时,先初始化设备管理器,2秒之后初始化分布式硬件管理器,并且注册设备状态回调。

void CoordinationSM::Init()
{
    auto *context = COOR_EVENT_MGR->GetIContext();
    context->GetTimerManager().AddTimer(INTERVAL_MS, 1, [this]() {
        this->InitDeviceManager();
        COOR_SOFTBUS_ADAPTER->Init();
    });
    COOR_DEV_MGR->Init();
    runner_ = AppExecFwk::EventRunner::Create(THREAD_NAME);
    eventHandler_ = std::make_shared<CoordinationEventHandler>(runner_);
}

bool CoordinationSM::InitDeviceManager()
{
    initCallback_ = std::make_shared<DeviceInitCallBack>();
    int32_t ret = DIS_HARDWARE.InitDeviceManager(FI_PKG_NAME, initCallback_);

    stateCallback_ = std::make_shared<DmDeviceStateCallback>();
    ret = DIS_HARDWARE.RegisterDevStateCallback(FI_PKG_NAME, "", stateCallback_);
    return true;
}

例2: 设备管理器多次使能直到成功

服务框架中 OnStart() 开启线程,线程中首先使能epoll,一般不是一次就成功,最多需要尝试MAX_N_RETRIES次(100)。线程调用 EnableDevMgr(MAX_NRETRIES),EnableDevMgr()中使能 devMgr.Enable(),如果失败则每间隔1秒再使能一次,直到成功,最多100次失败就放弃。

void DeviceStatusService::OnStart()
{
    // ......
    if (!Init()) {
        return;
    }
#ifdef OHOS_BUILD_ENABLE_INTENTION_FRAMEWORK
    intention_ = sptr<IntentionService>::MakeSptr(this);
    if (!Publish(intention_)) {
#else
    if (!Publish(this)) {
#endif // OHOS_BUILD_ENABLE_INTENTION_FRAMEWORK
        return;
    }
    state_ = ServiceRunningState::STATE_RUNNING;
    ready_ = true;
    worker_ = std::thread(std::bind(&DeviceStatusService::OnThread, this));
}

void DeviceStatusService::OnThread()
{
    SetThreadName(std::string("os_ds_service"));
    uint64_t tid = GetThisThreadId();
    delegateTasks_.SetWorkerThreadId(tid);
    EnableDevMgr(MAX_N_RETRIES);

    while (state_ == ServiceRunningState::STATE_RUNNING) {
        struct epoll_event ev[MAX_EVENT_SIZE] {};
        int32_t count = EpollWait(MAX_EVENT_SIZE, -1, ev[0]);
        for (int32_t i = 0; i < count && state_ == ServiceRunningState::STATE_RUNNING; i++) {
          // ......
        }
    }
}
int32_t DeviceStatusService::EnableDevMgr(int32_t nRetries)
{
    static int32_t timerId { -1 };
    int32_t ret = devMgr_.Enable();
    if (ret != RET_OK) {
        if (nRetries > 0) {
            timerId = timerMgr_.AddTimer(DEFAULT_WAIT_TIME_MS, WAIT_FOR_ONCE,
                std::bind(&DeviceStatusService::EnableDevMgr, this, nRetries - 1));
        }
        return ret;
    }
    AddEpoll(EPOLL_EVENT_DEVICE_MGR, devMgr_.GetFd());
    if (timerId >= 0) {
        timerMgr_.RemoveTimer(timerId);
        timerId = -1;
    }
    return RET_OK;
}

例3:超时等不到回复的处理

分布式硬件每个操作的结果回调,间隔4秒最多2次,如果还收不到回复,就清除该回调。正常则在4秒内收到回复,timeId被删除。

    enum class CallbackType {
        StartDInputCallback,
        StartDInputCallbackDHIds,
        StartDInputCallbackSink,
        StopDInputCallback,
        StopDInputCallbackDHIds,
        StopDInputCallbackSink,
        PrepareStartDInputCallback,
        UnPrepareStopDInputCallback,
        PrepareStartDInputCallbackSink,
        UnPrepareStopDInputCallbackSink
    };
void DistributedInputAdapter::AddTimer(const CallbackType &type)
{
    auto context = COOR_EVENT_MGR->GetIContext();
    int32_t timerId = context->GetTimerManager().AddTimer(DEFAULT_DELAY_TIME, RETRY_TIME, [this, type]() {
        if ((callbacks_.find(type) == callbacks_.end()) || (watchings_.find(type) == watchings_.end())) {
            return;
        }
        if (watchings_[type].times == 0) {
            watchings_[type].times++;
            return;
        }
        callbacks_[type](false);
        callbacks_.erase(type);
    });

    watchings_[type].timerId = timerId;
    watchings_[type].times = 0;
}

void DistributedInputAdapter::ProcessDInputCallback(CallbackType cbType, int32_t status)
{
    std::lock_guard<std::mutex> guard(adapterLock_);
    RemoveTimer(cbType);
    auto it = callbacks_.find(cbType);
    if (it == callbacks_.end()) {
        return;
    }
    it->second(status == RET_OK);
    callbacks_.erase(it);
}

例4: 延后获取系统能力集管理器的代理对象

拖拽管理器初始化时,延迟0.5秒之后获取系统能力集管理器的代理对象,并订阅公共息屏、亮屏等事件。

int32_t DragManager::Init(IContext* context)
{
    context_ = context;
    int32_t repeatCount = 1;
    context_->GetTimerManager().AddTimer(INTERVAL_MS, repeatCount, [this]() {
        auto samgrProxy = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();

        statusListener_ = new (std::nothrow) DragAbilityStatusChange(eventHub_);
        int32_t ret = samgrProxy->SubscribeSystemAbility(COMMON_EVENT_SERVICE_ID, statusListener_);
    });
    return RET_OK;
}

例5: 拖拽时,抬起后超时自启动拖拽停止操作

拖拽时,如果鼠标抬起超过2秒,则认为拖拽异常,内部启动StopDrag()

void DragManager::OnDragUp(std::shared_ptr<MMI::PointerEvent> pointerEvent)
{
    DragData dragData = DRAG_DATA_MGR.GetDragData();
    // ......
    int32_t repeatCount = 1;
    timerId_ = context_->GetTimerManager().AddTimer(TIMEOUT_MS, repeatCount, [this]() {
        DragDropResult dropResult { DragResult::DRAG_EXCEPTION, false, -1 };
        this->StopDrag(dropResult);
    });
    notifyPUllUpCallback_();
}

例6:超时未等到,打印失败日志

打开会话超过5秒报错 openSessionWaitCond_.wait_for(waitLock, std::chrono::seconds(SESSION_WAIT_TIMEOUTSECOND) 开启穿越超过1秒报错 openSessionWaitCond.wait_for(sessionLock, std::chrono::seconds(FILTER_WAIT_TIMEOUT_SECOND))

int32_t CoordinationSoftbusAdapter::WaitSessionOpend(const std::string &remoteNetworkId, int32_t sessionId)
{
    std::unique_lock<std::mutex> waitLock(operationMutex_);
    sessionDevs_[remoteNetworkId] = sessionId;
    auto status = openSessionWaitCond_.wait_for(waitLock, std::chrono::seconds(SESSION_WAIT_TIMEOUT_SECOND),
        [this, remoteNetworkId] () { return false; });
    if (!status) {
        CoordinationDFX::WriteOpenSoftbusResult(remoteNetworkId, 0, STATUS_SIGN);
        return RET_ERR;
    }
    return RET_OK;
}

int32_t CoordinationSoftbusAdapter::StartRemoteCoordination(const std::string &localNetworkId,
    const std::string &remoteNetworkId, bool checkButtonDown)
{
    std::unique_lock<std::mutex> sessionLock(operationMutex_);
    // ......
    if (isPointerButtonPressed) {
        auto status = openSessionWaitCond_.wait_for(sessionLock, std::chrono::seconds(FILTER_WAIT_TIMEOUT_SECOND));
        if (status == std::cv_status::timeout) {
            CoordinationDFX::WriteActivate(localNetworkId, remoteNetworkId, sessionDevs_,
                OHOS::HiviewDFX::HiSysEvent::EventType::FAULT);
            return RET_ERR;
        }
    }
    return RET_OK;
}

参考资料

©著作权归作者所有,转载或内容合作请联系作者

您尚未登录,无法参与评论,登录后可以:
参与开源共建问题交流
认同或收藏高质量问答
获取积分成为开源共建先驱

Copyright   ©2023  OpenHarmony开发者论坛  京ICP备2020036654号-3 |技术支持 Discuz!

返回顶部