博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Muduo网络库源码分析之定时器的实现
阅读量:4184 次
发布时间:2019-05-26

本文共 13857 字,大约阅读时间需要 46 分钟。

muduo 的定时器功能由三个 class 实现,TimerId、Timer 和 TimerQueue。

TimerId 类

它唯一标识一个 Timer 定时器。TimerId Class 同时保存Timer* 和 sequence_,这个 sequence_ 是每个 Timer 对象有一个全局递增的序列号 int64_t sequence_,用原子计数器(AtomicInt64)生成。

它主要用于注销定时器,这样就可以区分地址相同的先后两个 Timer 对象。

namespace muduo{namespace net{class Timer;////// An opaque identifier, for canceling Timer.////* 带有唯一标识的Timer,主要用于取消Timer */class TimerId : public muduo::copyable{ public:  TimerId()    : timer_(NULL),      sequence_(0)  {  }  TimerId(Timer* timer, int64_t seq)    : timer_(timer),    //timer 定时器的指针      sequence_(seq)    //seq 该定时任务的序列号  {  }  // default copy-ctor, dtor and assignment are okay  friend class TimerQueue; private:  Timer* timer_;  int64_t sequence_;};}}

Timer 类

封装了定时器的一些参数,包括超时时间(expiration_)、超时回调函数(callback_)、时间间隔(interval_)、是否重复定时(repeat_)、定时器的序列号等成员变量,成员函数大都是返回这些变量的值,run() 用来调用回调函数,restart() 用来重启定时器。

Timer.h

namespace muduo{namespace net{////// Internal class for timer event.////* 定时器 */class Timer : boost::noncopyable{ public:  Timer(const TimerCallback& cb, Timestamp when, double interval)    : callback_(cb),      expiration_(when),      interval_(interval),      repeat_(interval > 0.0),      sequence_(s_numCreated_.incrementAndGet())  { }#ifdef __GXX_EXPERIMENTAL_CXX0X__  Timer(TimerCallback&& cb, Timestamp when, double interval)    : callback_(std::move(cb)),      expiration_(when),      interval_(interval),      repeat_(interval > 0.0),      sequence_(s_numCreated_.incrementAndGet())  { }#endif  void run() const  {    callback_();    //执行定时器回调函数  }  /* 返回定时器的超时时间戳 */  Timestamp expiration() const  { return expiration_; }  /* 是否周期性定时 */  bool repeat() const { return repeat_; }  /* 返回本定时器的序列号 */  int64_t sequence() const { return sequence_; }  /* 重启定时器 */  void restart(Timestamp now);  static int64_t numCreated() { return s_numCreated_.get(); } private:  const TimerCallback callback_;    //超时回调函数  Timestamp expiration_;            //超时时间戳  const double interval_;           //时间间隔,如果是一次性定时器,该值为0  const bool repeat_;               //是否重复执行  const int64_t sequence_;          //本定时任务的序号  static AtomicInt64 s_numCreated_; //定时器计数,当前已经创建的定时器数量};}}

Timer.cc

#include 
using namespace muduo;using namespace muduo::net;AtomicInt64 Timer::s_numCreated_;void Timer::restart(Timestamp now){ if (repeat_) { //如果需要重复,那就将时间设为下次超时的时间 expiration_ = addTime(now, interval_); } else { //如果不需要重复,那就将超时时间设为一个不可用的 value expiration_ = Timestamp::invalid(); }}

TimerQueue 类

定时器队列,用于管理所有的定时器,此类的接口只有两个:添加和注销定时器,分别为 addTimer()cancel()

TimerQueue 数据结构的选择。需要高效地组织目前尚未到期的 Timer,能快速地根据当前时间找到已经到期的 Timer,也要能高效地添加和删除 Timer。最终选择了 set < pair<TimeStamp,Timer*> >,采用 pair 为 key 的原因是可能在一个时刻有多个相同的 Timestamp 时间戳超时,而查找只返回一个,这样即使两个 Timer 的超时时间相同,它们的地址也必须不同。

通过给 timerfd 一个超时时间实现超时计时,它内部有 Channel,通过 Channel 管理 timerfd,然后向EventLoop和 Poller 注册 timerfd 的可读事件,当 timerfd 的可读事件就绪时表明一个超时时间点到了,然后调用 timerfdChannel_ 的可读事件回调 handleRead(),通过 getExpired() 找出所有的超时事件,然后执行相应的超时回调函数 Timer::run()。为了复用定时器,每次处理完之后,会检查这些超时定时器是否需要重复定时,如果需要重复,就再次添加到定时器集合中。

timerfd 如何实现多个定时器超时计时的呢?每次向保存定时器的 set 容器插入一个定时器 Timer 的时候就比较 set 的头元素的超时时间,若新插入的超时时间小,则更新 timerfd 的时间,从而保证 timerfd 始终是 set 中最近的一个超时时间。当 timerfd 可读时,需要遍历容器 set,因为可能此时有多个 Timer 超时了(尽管 tiemrfd 是当前最小的超时时间)。这里的关键是采用 timerfd 实现统一事件源。

TimerQueue.h

namespace muduo{namespace net{class EventLoop;class Timer;class TimerId;////// A best efforts timer queue./// No guarantee that the callback will be on time.////* 定时器队列 */class TimerQueue : boost::noncopyable{ public:  explicit TimerQueue(EventLoop* loop);  ~TimerQueue();  ///  /// Schedules the callback to be run at given time,  /// repeats if @c interval > 0.0.  ///  /// Must be thread safe. Usually be called from other threads.  /* 添加一个定时器 */  TimerId addTimer(const TimerCallback& cb,                   Timestamp when,                   double interval);#ifdef __GXX_EXPERIMENTAL_CXX0X__  TimerId addTimer(TimerCallback&& cb,                   Timestamp when,                   double interval);#endif  /* 注销一个定时器 */  void cancel(TimerId timerId); private:  // FIXME: use unique_ptr
instead of raw pointers. // This requires heterogeneous comparison lookup (N3465) from C++14 // so that we can find an T* in a set
>. typedef std::pair
Entry; //对应一个定时任务 typedef std::set
TimerList; //定时任务集合,采用set,有key无value,且有序 typedef std::pair
ActiveTimer; //下面有解释 typedef std::set
ActiveTimerSet; void addTimerInLoop(Timer* timer); //添加一个定时任务 void cancelInLoop(TimerId timerId); //注销一个定时器 // called when timerfd alarms void handleRead(); //timerfd 可读 的回调 // move out all expired timers std::vector
getExpired(Timestamp now); //获取所有超时的定时器 /* 重置超时的定时器 */ void reset(const std::vector
& expired, Timestamp now); bool insert(Timer* timer); //把定时器插到TimerList中 EventLoop* loop_; //TimerQueue 所属的 EventLoop const int timerfd_; // 内部的 timerfd Channel timerfdChannel_; //timerfd 对应的Channel,借此来观察timerfd_ 上的readable事件 // Timer list sorted by expiration TimerList timers_; //所有的定时任务 // for cancel() // timers_ 与 activeTimers_ 都保存了相同的Timer 地址 // timers_ 是按超时时间排序,activeTimers_ 是按定时器地址排序 ActiveTimerSet activeTimers_; bool callingExpiredTimers_; /* atomic *///是否处于 处理定时器超时回调中 ActiveTimerSet cancelingTimers_; //保存被注销的定时器};}}

TimerQueue.cc

TimerQueue::TimerQueue(EventLoop* loop)  : loop_(loop),    timerfd_(createTimerfd()),    timerfdChannel_(loop, timerfd_),    timers_(),    callingExpiredTimers_(false){  timerfdChannel_.setReadCallback(      boost::bind(&TimerQueue::handleRead, this));//设置timerfd可读事件回调函数为handleRead  // we are always reading the timerfd, we disarm it with timerfd_settime.  timerfdChannel_.enableReading();  //timerfd 注册可读事件}TimerQueue::~TimerQueue(){  timerfdChannel_.disableAll();  timerfdChannel_.remove();  ::close(timerfd_);  // do not remove channel, since we're in EventLoop::dtor();  for (TimerList::iterator it = timers_.begin();      it != timers_.end(); ++it)  {    delete it->second; //手动释放Timer*  }}/* 添加定时任务,返回此定时器对应的唯一标识 */TimerId TimerQueue::addTimer(const TimerCallback& cb,                             Timestamp when,                             double interval){    /* new 一个定时器对象 interval 大于0 ,就是需要重复的定时器 */  Timer* timer = new Timer(cb, when, interval);  /*    * runInLoop 的意思是 如果本IO线程想要添加定时器则直接由 addTimerInLoop 添加   * 如果是其他线程向IO线程添加定时器则需要间接通过 queueInLoop添加   */  loop_->runInLoop(      boost::bind(&TimerQueue::addTimerInLoop, this, timer));  return TimerId(timer, timer->sequence());}#ifdef __GXX_EXPERIMENTAL_CXX0X__TimerId TimerQueue::addTimer(TimerCallback&& cb,                             Timestamp when,                             double interval){    // 右值语义  Timer* timer = new Timer(std::move(cb), when, interval);  loop_->runInLoop(      boost::bind(&TimerQueue::addTimerInLoop, this, timer));  return TimerId(timer, timer->sequence());}#endif/* 注销一个定时器,被EventLoop::cancel(TimerId timerId)调用 */void TimerQueue::cancel(TimerId timerId){  loop_->runInLoop(      boost::bind(&TimerQueue::cancelInLoop, this, timerId));}/* IO线程向自己添加定时器 */void TimerQueue::addTimerInLoop(Timer* timer){  loop_->assertInLoopThread();  bool earliestChanged = insert(timer); //如果当前插入的定时器 比队列中的定时器都早 则返回真  if (earliestChanged)      //最早的超时时间改变了,就需要重置timerfd_的超时时间  {    resetTimerfd(timerfd_, timer->expiration()); //timerfd_ 重新设置超时时间  }}void TimerQueue::cancelInLoop(TimerId timerId){  loop_->assertInLoopThread();  assert(timers_.size() == activeTimers_.size());  ActiveTimer timer(timerId.timer_, timerId.sequence_);  ActiveTimerSet::iterator it = activeTimers_.find(timer);  //查找该定时器  if (it != activeTimers_.end()) // 找到了  {      /* 从 timers_ 和 activeTimers_ 中删掉*/    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));    assert(n == 1); (void)n;    delete it->first; // FIXME: no delete please    //手动 delete     activeTimers_.erase(it);  }  else if (callingExpiredTimers_) //可能正在处理  {      /* 那就先 插入要被注销的定时器 */    cancelingTimers_.insert(timer);  }  assert(timers_.size() == activeTimers_.size());}/* timerfd 可读事件的回调函数 */void TimerQueue::handleRead(){  loop_->assertInLoopThread();  Timestamp now(Timestamp::now());  readTimerfd(timerfd_, now);  /* 找出所有超时的事件 */  std::vector
expired = getExpired(now); callingExpiredTimers_ = true; cancelingTimers_.clear(); // safe to callback outside critical section for (std::vector
::iterator it = expired.begin(); it != expired.end(); ++it) { it->second->run(); //执行超时定时器的回调 } callingExpiredTimers_ = false; reset(expired, now); //重置定时器,如果不需要再次定时,就删掉,否则再次定时}/* 获取队列中超时的定时器 */std::vector
TimerQueue::getExpired(Timestamp now){ assert(timers_.size() == activeTimers_.size()); std::vector
expired; //保存超时定时器的容器 Entry sentry(now, reinterpret_cast
(UINTPTR_MAX)); //哨兵值 TimerList::iterator end = timers_.lower_bound(sentry); //返回第一个未超时的Timer的迭代器 assert(end == timers_.end() || now < end->first); //均未超时或者找到了 std::copy(timers_.begin(), end, back_inserter(expired)); //把超时的定时器拷贝到 expired 容器中 timers_.erase(timers_.begin(), end); //将超时的定时器从timers_删掉 for (std::vector
::iterator it = expired.begin(); it != expired.end(); ++it) { ActiveTimer timer(it->second, it->second->sequence()); size_t n = activeTimers_.erase(timer); // 将超时的定时器 从 activeTimers_ 删掉 assert(n == 1); (void)n; } assert(timers_.size() == activeTimers_.size()); // 都删掉之后 size 应该相同 return expired; //返回超时的那部分定时器}/* 已经执行完超时回调的定时任务后,检查这些定时器是否需要重复 */void TimerQueue::reset(const std::vector
& expired, Timestamp now){ Timestamp nextExpire; for (std::vector
::const_iterator it = expired.begin(); it != expired.end(); ++it) { ActiveTimer timer(it->second, it->second->sequence()); if (it->second->repeat() // 需要重复 而且 没有要被注销 && cancelingTimers_.find(timer) == cancelingTimers_.end()) { /* 将该定时器的超时时间改为下次超时的时间 */ it->second->restart(now); insert(it->second); //重新插入到定时器容器中 } else { // FIXME move to a free list // 不需要重复就删除 delete it->second; // FIXME: no delete please } } if (!timers_.empty()) { /* 获取当前定时器集合中的最早定时器的时间戳,作为下次超时时间*/ nextExpire = timers_.begin()->second->expiration(); } if (nextExpire.valid()) { resetTimerfd(timerfd_, nextExpire); //重置 timerfd_ 的超时时间 }}/* 向 set 中插入新的定时器 */bool TimerQueue::insert(Timer* timer){ loop_->assertInLoopThread(); assert(timers_.size() == activeTimers_.size()); bool earliestChanged = false; // 最早的超时时间 是否被更改 Timestamp when = timer->expiration(); //新插入 timer 的超时时间 TimerList::iterator it = timers_.begin(); // 当前最早的定时任务 if (it == timers_.end() || when < it->first) { earliestChanged = true; //如果timers_为空,或者when 小于目前最早的定时任务,那么最早的超时时间,肯定需要被改变 } { /* 向 timers_ 中插入定时任务 */ std::pair
result = timers_.insert(Entry(when, timer)); assert(result.second); (void)result; } { /* 向 activeTimers_ 中插入定时任务 */ std::pair
result = activeTimers_.insert(ActiveTimer(timer, timer->sequence())); assert(result.second); (void)result; } /* 插入完成后,两个容器元素数目应该相同 */ assert(timers_.size() == activeTimers_.size()); return earliestChanged; //返回修改标志,表示最近的超时时间已经改变}

定时器的使用接口

EventLoop 中提供了四个接口来使用定时器,三个是添加定时器,都转而调用 TimerQueue::addTimer();还有一个注销定时器。如下:

/* 在时间戳为 time 的时刻执行,0.0 表示不重复 */TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb){  return timerQueue_->addTimer(cb, time, 0.0);}/* 延迟 delay 时间执行 */TimerId EventLoop::runAfter(double delay, const TimerCallback& cb){  Timestamp time(addTime(Timestamp::now(), delay));  return runAt(time, cb);}/* 重复定时器,时间间隔为 interval */TimerId EventLoop::runEvery(double interval, const TimerCallback& cb){  Timestamp time(addTime(Timestamp::now(), interval));  return timerQueue_->addTimer(cb, time, interval);}/* 注销定时器,直接调用 TimerQueue::cancel() */void EventLoop::cancel(TimerId timerId){  return timerQueue_->cancel(timerId);}

timerfd 的相关操作

timerfd 是 Linux 为用户程序提供的一个定时器接口,将定时器抽象为文件描述符,通过文件描述符的可读事件进行超时通知,该文件在超时的那一刻变得可读,这样就能完美的融入到 select/poll 框架中,用统一的方式处理 I/O 和定时事件。同时它的时间精度比用 select/poll 的 timeout 更高,timeout 定时精度只有毫秒。

提供了三个 timerfd C API:

#include 
int timerfd_create(int clockid, int flags); int timerfd_settime(int fd, int flags, const struct itimerspec *new_value,struct itimerspec *old_value); int timerfd_gettime(int fd, struct itimerspec *curr_value);

用法和测试例子请参考:

下面看一下 TimerQueue 中 timerfd 的相关操作。

/* 创建 timerfd */int createTimerfd(){  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,                                 TFD_NONBLOCK | TFD_CLOEXEC);  if (timerfd < 0)  {    LOG_SYSFATAL << "Failed in timerfd_create";  }  return timerfd;}/* 计算超时时间与当前时间的时间差,并将参数转换为 api 接受的类型  */struct timespec howMuchTimeFromNow(Timestamp when){    /* 微秒数 = 超时时刻微秒数 - 当前时刻微秒数 */  int64_t microseconds = when.microSecondsSinceEpoch()                         - Timestamp::now().microSecondsSinceEpoch();  if (microseconds < 100)  {    microseconds = 100;  }  struct timespec ts;   //转换成 struct timespec 结构返回  ts.tv_sec = static_cast
( microseconds / Timestamp::kMicroSecondsPerSecond); ts.tv_nsec = static_cast
( (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000); return ts;}/* 读timerfd,避免定时器事件一直触发 */void readTimerfd(int timerfd, Timestamp now){ uint64_t howmany; ssize_t n = ::read(timerfd, &howmany, sizeof howmany); LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString(); if (n != sizeof howmany) { LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8"; }}/* 重置定时器超时时间 */void resetTimerfd(int timerfd, Timestamp expiration){ // wake up loop by timerfd_settime() struct itimerspec newValue; struct itimerspec oldValue; bzero(&newValue, sizeof newValue); bzero(&oldValue, sizeof oldValue); newValue.it_value = howMuchTimeFromNow(expiration); int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); //到这个时间后,会产生一个定时事件 if (ret) { LOG_SYSERR << "timerfd_settime()"; }}

以上就是 muduo 定时器的实现,我自己在写 http server 时自己用最小堆实现的定时器,有兴趣的可以看下:

转载地址:http://uquoi.baihongyu.com/

你可能感兴趣的文章
在CentOS 7.7 x86_64上安装InfluxDB 1.8.0实录
查看>>
CentOS 7.5 如何升级Git实录
查看>>
Python中的urllib.quote和Go中的url.QueryEscape关系探讨
查看>>
在Mac上使用pip3安装python的数据统计模块实录
查看>>
在Mac上使用pip3安装交互式环境IPython实录
查看>>
在Mac上使用pip3安装Jupyter Notebook并简单使用
查看>>
在Mac上利用pip3安装pyecharts模块
查看>>
go连接Kafka报错kafka: client has run out of available brokers to talk to
查看>>
在CentOS 7.5上升级SQLite3过程实录
查看>>
Mastering Algorithms with C中文版附带源码说明
查看>>
[综合面试] 计算机面试书籍与求职网站推荐
查看>>
删除链表中的重复项
查看>>
交互两个数(不引入第三个变量)
查看>>
C/C++面试题分享
查看>>
一道题弄明白二维数组的指针
查看>>
链表类型题目需要用到的头文件list.h
查看>>
tree类型题目需要用到的头文件tree.h
查看>>
有一个1亿结点的树,已知两个结点, 求它们的最低公共祖先!
查看>>
BST(binary search tree)类型题目需要用到的头文件binary_tree.h
查看>>
将BST转换为有序的双向链表!
查看>>