// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
740161064SShuo Chen
840161064SShuo Chen#define __STDC_LIMIT_MACROS
940161064SShuo Chen#include "TimerQueue.h"
1040161064SShuo Chen
1140161064SShuo Chen#include "logging/Logging.h"
1240161064SShuo Chen#include "EventLoop.h"
1340161064SShuo Chen#include "Timer.h"
1440161064SShuo Chen#include "TimerId.h"
1540161064SShuo Chen
1640161064SShuo Chen#include <boost/bind.hpp>
17f4e8e3d3SShuo Chen#include <boost/foreach.hpp>
1840161064SShuo Chen
1940161064SShuo Chen#include <sys/timerfd.h>
2040161064SShuo Chen
2140161064SShuo Chennamespace muduo
2240161064SShuo Chen{
2340161064SShuo Chennamespace detail
2440161064SShuo Chen{
2540161064SShuo Chen
2640161064SShuo Chenint createTimerfd()
2740161064SShuo Chen{
2840161064SShuo Chen  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
2940161064SShuo Chen                                 TFD_NONBLOCK | TFD_CLOEXEC);
3040161064SShuo Chen  if (timerfd < 0)
3140161064SShuo Chen  {
3240161064SShuo Chen    LOG_SYSFATAL << "Failed in timerfd_create";
3340161064SShuo Chen  }
3440161064SShuo Chen  return timerfd;
3540161064SShuo Chen}
3640161064SShuo Chen
3740161064SShuo Chenstruct timespec howMuchTimeFromNow(Timestamp when)
3840161064SShuo Chen{
3940161064SShuo Chen  int64_t microseconds = when.microSecondsSinceEpoch()
4040161064SShuo Chen                         - Timestamp::now().microSecondsSinceEpoch();
4140161064SShuo Chen  if (microseconds < 100)
4240161064SShuo Chen  {
4340161064SShuo Chen    microseconds = 100;
4440161064SShuo Chen  }
4540161064SShuo Chen  struct timespec ts;
4640161064SShuo Chen  ts.tv_sec = static_cast<time_t>(
4740161064SShuo Chen      microseconds / Timestamp::kMicroSecondsPerSecond);
4840161064SShuo Chen  ts.tv_nsec = static_cast<long>(
4940161064SShuo Chen      (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
5040161064SShuo Chen  return ts;
5140161064SShuo Chen}
5240161064SShuo Chen
5340161064SShuo Chenvoid readTimerfd(int timerfd, Timestamp now)
5440161064SShuo Chen{
5540161064SShuo Chen  uint64_t howmany;
5640161064SShuo Chen  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
5740161064SShuo Chen  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
5840161064SShuo Chen  if (n != sizeof howmany)
5940161064SShuo Chen  {
6040161064SShuo Chen    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
6140161064SShuo Chen  }
6240161064SShuo Chen}
6340161064SShuo Chen
6440161064SShuo Chenvoid resetTimerfd(int timerfd, Timestamp expiration)
6540161064SShuo Chen{
6640161064SShuo Chen  // wake up loop by timerfd_settime()
6740161064SShuo Chen  struct itimerspec newValue;
6840161064SShuo Chen  struct itimerspec oldValue;
6940161064SShuo Chen  bzero(&newValue, sizeof newValue);
7040161064SShuo Chen  bzero(&oldValue, sizeof oldValue);
7140161064SShuo Chen  newValue.it_value = howMuchTimeFromNow(expiration);
7240161064SShuo Chen  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
7340161064SShuo Chen  if (ret)
7440161064SShuo Chen  {
7540161064SShuo Chen    LOG_SYSERR << "timerfd_settime()";
7640161064SShuo Chen  }
7740161064SShuo Chen}
7840161064SShuo Chen
7940161064SShuo Chen}
8040161064SShuo Chen}
8140161064SShuo Chen
8240161064SShuo Chenusing namespace muduo;
8340161064SShuo Chenusing namespace muduo::detail;
8440161064SShuo Chen
8540161064SShuo ChenTimerQueue::TimerQueue(EventLoop* loop)
8640161064SShuo Chen  : loop_(loop),
8740161064SShuo Chen    timerfd_(createTimerfd()),
8840161064SShuo Chen    timerfdChannel_(loop, timerfd_),
89f4e8e3d3SShuo Chen    timers_(),
90f4e8e3d3SShuo Chen    callingExpiredTimers_(false)
9140161064SShuo Chen{
9240161064SShuo Chen  timerfdChannel_.setReadCallback(
9340161064SShuo Chen      boost::bind(&TimerQueue::handleRead, this));
9440161064SShuo Chen  // we are always reading the timerfd, we disarm it with timerfd_settime.
9540161064SShuo Chen  timerfdChannel_.enableReading();
9640161064SShuo Chen}
9740161064SShuo Chen
9840161064SShuo ChenTimerQueue::~TimerQueue()
9940161064SShuo Chen{
10040161064SShuo Chen  ::close(timerfd_);
10140161064SShuo Chen  // do not remove channel, since we're in EventLoop::dtor();
10240161064SShuo Chen  for (TimerList::iterator it = timers_.begin();
10340161064SShuo Chen      it != timers_.end(); ++it)
10440161064SShuo Chen  {
10540161064SShuo Chen    delete it->second;
10640161064SShuo Chen  }
10740161064SShuo Chen}
10840161064SShuo Chen
10940161064SShuo ChenTimerId TimerQueue::addTimer(const TimerCallback& cb,
11040161064SShuo Chen                             Timestamp when,
11140161064SShuo Chen                             double interval)
11240161064SShuo Chen{
11340161064SShuo Chen  Timer* timer = new Timer(cb, when, interval);
11440161064SShuo Chen  loop_->runInLoop(
115f4e8e3d3SShuo Chen      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
116f4e8e3d3SShuo Chen  return TimerId(timer, timer->sequence());
11740161064SShuo Chen}
11840161064SShuo Chen
119f4e8e3d3SShuo Chenvoid TimerQueue::cancel(TimerId timerId)
120f4e8e3d3SShuo Chen{
121f4e8e3d3SShuo Chen  loop_->runInLoop(
122f4e8e3d3SShuo Chen      boost::bind(&TimerQueue::cancelInLoop, this, timerId));
123f4e8e3d3SShuo Chen}
124f4e8e3d3SShuo Chen
125f4e8e3d3SShuo Chenvoid TimerQueue::addTimerInLoop(Timer* timer)
12640161064SShuo Chen{
12740161064SShuo Chen  loop_->assertInLoopThread();
12840161064SShuo Chen  bool earliestChanged = insert(timer);
12940161064SShuo Chen
13040161064SShuo Chen  if (earliestChanged)
13140161064SShuo Chen  {
13240161064SShuo Chen    resetTimerfd(timerfd_, timer->expiration());
13340161064SShuo Chen  }
13440161064SShuo Chen}
13540161064SShuo Chen
136f4e8e3d3SShuo Chenvoid TimerQueue::cancelInLoop(TimerId timerId)
137f4e8e3d3SShuo Chen{
138f4e8e3d3SShuo Chen  loop_->assertInLoopThread();
139f4e8e3d3SShuo Chen  assert(timers_.size() == activeTimers_.size());
140fdb5c17cSShuo Chen  ActiveTimer timer(timerId.timer_, timerId.sequence_);
141f4e8e3d3SShuo Chen  ActiveTimerSet::iterator it = activeTimers_.find(timer);
142f4e8e3d3SShuo Chen  if (it != activeTimers_.end())
143f4e8e3d3SShuo Chen  {
144f4e8e3d3SShuo Chen    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
145f4e8e3d3SShuo Chen    assert(n == 1); (void)n;
146f4e8e3d3SShuo Chen    delete it->first; // FIXME: no delete please
147f4e8e3d3SShuo Chen    activeTimers_.erase(it);
148f4e8e3d3SShuo Chen  }
149f4e8e3d3SShuo Chen  else if (callingExpiredTimers_)
150f4e8e3d3SShuo Chen  {
151f4e8e3d3SShuo Chen    cancelingTimers_.insert(timer);
152f4e8e3d3SShuo Chen  }
153f4e8e3d3SShuo Chen  assert(timers_.size() == activeTimers_.size());
154f4e8e3d3SShuo Chen}
155f4e8e3d3SShuo Chen
15640161064SShuo Chenvoid TimerQueue::handleRead()
15740161064SShuo Chen{
15840161064SShuo Chen  loop_->assertInLoopThread();
15940161064SShuo Chen  Timestamp now(Timestamp::now());
16040161064SShuo Chen  readTimerfd(timerfd_, now);
16140161064SShuo Chen
16240161064SShuo Chen  std::vector<Entry> expired = getExpired(now);
16340161064SShuo Chen
164f4e8e3d3SShuo Chen  callingExpiredTimers_ = true;
165f4e8e3d3SShuo Chen  cancelingTimers_.clear();
16640161064SShuo Chen  // safe to callback outside critical section
16740161064SShuo Chen  for (std::vector<Entry>::iterator it = expired.begin();
16840161064SShuo Chen      it != expired.end(); ++it)
16940161064SShuo Chen  {
17040161064SShuo Chen    it->second->run();
17140161064SShuo Chen  }
172f4e8e3d3SShuo Chen  callingExpiredTimers_ = false;
17340161064SShuo Chen
17440161064SShuo Chen  reset(expired, now);
17540161064SShuo Chen}
17640161064SShuo Chen
17740161064SShuo Chenstd::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
17840161064SShuo Chen{
179f4e8e3d3SShuo Chen  assert(timers_.size() == activeTimers_.size());
18040161064SShuo Chen  std::vector<Entry> expired;
18140161064SShuo Chen  Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
18240161064SShuo Chen  TimerList::iterator it = timers_.lower_bound(sentry);
18340161064SShuo Chen  assert(it == timers_.end() || now < it->first);
18440161064SShuo Chen  std::copy(timers_.begin(), it, back_inserter(expired));
18540161064SShuo Chen  timers_.erase(timers_.begin(), it);
18640161064SShuo Chen
187f4e8e3d3SShuo Chen  BOOST_FOREACH(Entry entry, expired)
188f4e8e3d3SShuo Chen  {
189f4e8e3d3SShuo Chen    ActiveTimer timer(entry.second, entry.second->sequence());
190f4e8e3d3SShuo Chen    size_t n = activeTimers_.erase(timer);
191f4e8e3d3SShuo Chen    assert(n == 1); (void)n;
192f4e8e3d3SShuo Chen  }
193f4e8e3d3SShuo Chen
194f4e8e3d3SShuo Chen  assert(timers_.size() == activeTimers_.size());
19540161064SShuo Chen  return expired;
19640161064SShuo Chen}
19740161064SShuo Chen
19840161064SShuo Chenvoid TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
19940161064SShuo Chen{
20040161064SShuo Chen  Timestamp nextExpire;
20140161064SShuo Chen
20240161064SShuo Chen  for (std::vector<Entry>::const_iterator it = expired.begin();
20340161064SShuo Chen      it != expired.end(); ++it)
20440161064SShuo Chen  {
205f4e8e3d3SShuo Chen    ActiveTimer timer(it->second, it->second->sequence());
206f4e8e3d3SShuo Chen    if (it->second->repeat()
207f4e8e3d3SShuo Chen        && cancelingTimers_.find(timer) == cancelingTimers_.end())
20840161064SShuo Chen    {
20940161064SShuo Chen      it->second->restart(now);
21040161064SShuo Chen      insert(it->second);
21140161064SShuo Chen    }
21240161064SShuo Chen    else
21340161064SShuo Chen    {
21440161064SShuo Chen      // FIXME move to a free list
21540161064SShuo Chen      delete it->second;
21640161064SShuo Chen    }
21740161064SShuo Chen  }
21840161064SShuo Chen
21940161064SShuo Chen  if (!timers_.empty())
22040161064SShuo Chen  {
22140161064SShuo Chen    nextExpire = timers_.begin()->second->expiration();
22240161064SShuo Chen  }
22340161064SShuo Chen
22440161064SShuo Chen  if (nextExpire.valid())
22540161064SShuo Chen  {
22640161064SShuo Chen    resetTimerfd(timerfd_, nextExpire);
22740161064SShuo Chen  }
22840161064SShuo Chen}
22940161064SShuo Chen
23040161064SShuo Chenbool TimerQueue::insert(Timer* timer)
23140161064SShuo Chen{
232f4e8e3d3SShuo Chen  loop_->assertInLoopThread();
233f4e8e3d3SShuo Chen  assert(timers_.size() == activeTimers_.size());
23440161064SShuo Chen  bool earliestChanged = false;
23540161064SShuo Chen  Timestamp when = timer->expiration();
23640161064SShuo Chen  TimerList::iterator it = timers_.begin();
23740161064SShuo Chen  if (it == timers_.end() || when < it->first)
23840161064SShuo Chen  {
23940161064SShuo Chen    earliestChanged = true;
24040161064SShuo Chen  }
241f4e8e3d3SShuo Chen
242f4e8e3d3SShuo Chen  {
243f4e8e3d3SShuo Chen    std::pair<TimerList::iterator, bool> result
244f4e8e3d3SShuo Chen      = timers_.insert(Entry(when, timer));
245f4e8e3d3SShuo Chen    assert(result.second); (void)result;
246f4e8e3d3SShuo Chen  }
247f4e8e3d3SShuo Chen  {
248f4e8e3d3SShuo Chen    std::pair<ActiveTimerSet::iterator, bool> result
249f4e8e3d3SShuo Chen      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
250f4e8e3d3SShuo Chen    assert(result.second); (void)result;
251f4e8e3d3SShuo Chen  }
252f4e8e3d3SShuo Chen
253f4e8e3d3SShuo Chen  assert(timers_.size() == activeTimers_.size());
25440161064SShuo Chen  return earliestChanged;
25540161064SShuo Chen}
25640161064SShuo Chen
257