1// excerpts from http://code.google.com/p/muduo/
2//
3// Use of this source code is governed by a BSD-style license
4// that can be found in the License file.
5//
6// Author: Shuo Chen (chenshuo at chenshuo dot com)
7
8#define __STDC_LIMIT_MACROS
9#include "TimerQueue.h"
10
11#include "logging/Logging.h"
12#include "EventLoop.h"
13#include "Timer.h"
14#include "TimerId.h"
15
16#include <boost/bind.hpp>
17#include <boost/foreach.hpp>
18
19#include <sys/timerfd.h>
20
21namespace muduo
22{
23namespace detail
24{
25
26int createTimerfd()
27{
28  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
29                                 TFD_NONBLOCK | TFD_CLOEXEC);
30  if (timerfd < 0)
31  {
32    LOG_SYSFATAL << "Failed in timerfd_create";
33  }
34  return timerfd;
35}
36
37struct timespec howMuchTimeFromNow(Timestamp when)
38{
39  int64_t microseconds = when.microSecondsSinceEpoch()
40                         - Timestamp::now().microSecondsSinceEpoch();
41  if (microseconds < 100)
42  {
43    microseconds = 100;
44  }
45  struct timespec ts;
46  ts.tv_sec = static_cast<time_t>(
47      microseconds / Timestamp::kMicroSecondsPerSecond);
48  ts.tv_nsec = static_cast<long>(
49      (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
50  return ts;
51}
52
53void readTimerfd(int timerfd, Timestamp now)
54{
55  uint64_t howmany;
56  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
57  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
58  if (n != sizeof howmany)
59  {
60    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
61  }
62}
63
64void resetTimerfd(int timerfd, Timestamp expiration)
65{
66  // wake up loop by timerfd_settime()
67  struct itimerspec newValue;
68  struct itimerspec oldValue;
69  bzero(&newValue, sizeof newValue);
70  bzero(&oldValue, sizeof oldValue);
71  newValue.it_value = howMuchTimeFromNow(expiration);
72  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
73  if (ret)
74  {
75    LOG_SYSERR << "timerfd_settime()";
76  }
77}
78
79}
80}
81
82using namespace muduo;
83using namespace muduo::detail;
84
85TimerQueue::TimerQueue(EventLoop* loop)
86  : loop_(loop),
87    timerfd_(createTimerfd()),
88    timerfdChannel_(loop, timerfd_),
89    timers_(),
90    callingExpiredTimers_(false)
91{
92  timerfdChannel_.setReadCallback(
93      boost::bind(&TimerQueue::handleRead, this));
94  // we are always reading the timerfd, we disarm it with timerfd_settime.
95  timerfdChannel_.enableReading();
96}
97
98TimerQueue::~TimerQueue()
99{
100  ::close(timerfd_);
101  // do not remove channel, since we're in EventLoop::dtor();
102  for (TimerList::iterator it = timers_.begin();
103      it != timers_.end(); ++it)
104  {
105    delete it->second;
106  }
107}
108
109TimerId TimerQueue::addTimer(const TimerCallback& cb,
110                             Timestamp when,
111                             double interval)
112{
113  Timer* timer = new Timer(cb, when, interval);
114  loop_->runInLoop(
115      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
116  return TimerId(timer, timer->sequence());
117}
118
119void TimerQueue::cancel(TimerId timerId)
120{
121  loop_->runInLoop(
122      boost::bind(&TimerQueue::cancelInLoop, this, timerId));
123}
124
125void TimerQueue::addTimerInLoop(Timer* timer)
126{
127  loop_->assertInLoopThread();
128  bool earliestChanged = insert(timer);
129
130  if (earliestChanged)
131  {
132    resetTimerfd(timerfd_, timer->expiration());
133  }
134}
135
136void TimerQueue::cancelInLoop(TimerId timerId)
137{
138  loop_->assertInLoopThread();
139  assert(timers_.size() == activeTimers_.size());
140  ActiveTimer timer(timerId.timer_, timerId.sequence_);
141  ActiveTimerSet::iterator it = activeTimers_.find(timer);
142  if (it != activeTimers_.end())
143  {
144    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
145    assert(n == 1); (void)n;
146    delete it->first; // FIXME: no delete please
147    activeTimers_.erase(it);
148  }
149  else if (callingExpiredTimers_)
150  {
151    cancelingTimers_.insert(timer);
152  }
153  assert(timers_.size() == activeTimers_.size());
154}
155
156void TimerQueue::handleRead()
157{
158  loop_->assertInLoopThread();
159  Timestamp now(Timestamp::now());
160  readTimerfd(timerfd_, now);
161
162  std::vector<Entry> expired = getExpired(now);
163
164  callingExpiredTimers_ = true;
165  cancelingTimers_.clear();
166  // safe to callback outside critical section
167  for (std::vector<Entry>::iterator it = expired.begin();
168      it != expired.end(); ++it)
169  {
170    it->second->run();
171  }
172  callingExpiredTimers_ = false;
173
174  reset(expired, now);
175}
176
177std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
178{
179  assert(timers_.size() == activeTimers_.size());
180  std::vector<Entry> expired;
181  Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
182  TimerList::iterator it = timers_.lower_bound(sentry);
183  assert(it == timers_.end() || now < it->first);
184  std::copy(timers_.begin(), it, back_inserter(expired));
185  timers_.erase(timers_.begin(), it);
186
187  BOOST_FOREACH(Entry entry, expired)
188  {
189    ActiveTimer timer(entry.second, entry.second->sequence());
190    size_t n = activeTimers_.erase(timer);
191    assert(n == 1); (void)n;
192  }
193
194  assert(timers_.size() == activeTimers_.size());
195  return expired;
196}
197
198void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
199{
200  Timestamp nextExpire;
201
202  for (std::vector<Entry>::const_iterator it = expired.begin();
203      it != expired.end(); ++it)
204  {
205    ActiveTimer timer(it->second, it->second->sequence());
206    if (it->second->repeat()
207        && cancelingTimers_.find(timer) == cancelingTimers_.end())
208    {
209      it->second->restart(now);
210      insert(it->second);
211    }
212    else
213    {
214      // FIXME move to a free list
215      delete it->second;
216    }
217  }
218
219  if (!timers_.empty())
220  {
221    nextExpire = timers_.begin()->second->expiration();
222  }
223
224  if (nextExpire.valid())
225  {
226    resetTimerfd(timerfd_, nextExpire);
227  }
228}
229
230bool TimerQueue::insert(Timer* timer)
231{
232  loop_->assertInLoopThread();
233  assert(timers_.size() == activeTimers_.size());
234  bool earliestChanged = false;
235  Timestamp when = timer->expiration();
236  TimerList::iterator it = timers_.begin();
237  if (it == timers_.end() || when < it->first)
238  {
239    earliestChanged = true;
240  }
241
242  {
243    std::pair<TimerList::iterator, bool> result
244      = timers_.insert(Entry(when, timer));
245    assert(result.second); (void)result;
246  }
247  {
248    std::pair<ActiveTimerSet::iterator, bool> result
249      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
250    assert(result.second); (void)result;
251  }
252
253  assert(timers_.size() == activeTimers_.size());
254  return earliestChanged;
255}
256
257