1 // excerpts from http://code.google.com/p/muduo/
2 //
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the License file.
5 //
6 // Author: Shuo Chen (chenshuo at chenshuo dot com)
7 
8 #define __STDC_LIMIT_MACROS
9 #include "TimerQueue.h"
10 
11 #include "logging/Logging.h"
12 #include "EventLoop.h"
13 #include "Timer.h"
14 #include "TimerId.h"
15 
16 #include <boost/bind.hpp>
17 
18 #include <sys/timerfd.h>
19 
20 namespace muduo
21 {
22 namespace detail
23 {
24 
25 int createTimerfd()
26 {
27   int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
28                                  TFD_NONBLOCK | TFD_CLOEXEC);
29   if (timerfd < 0)
30   {
31     LOG_SYSFATAL << "Failed in timerfd_create";
32   }
33   return timerfd;
34 }
35 
36 struct timespec howMuchTimeFromNow(Timestamp when)
37 {
38   int64_t microseconds = when.microSecondsSinceEpoch()
39                          - Timestamp::now().microSecondsSinceEpoch();
40   if (microseconds < 100)
41   {
42     microseconds = 100;
43   }
44   struct timespec ts;
45   ts.tv_sec = static_cast<time_t>(
46       microseconds / Timestamp::kMicroSecondsPerSecond);
47   ts.tv_nsec = static_cast<long>(
48       (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
49   return ts;
50 }
51 
52 void readTimerfd(int timerfd, Timestamp now)
53 {
54   uint64_t howmany;
55   ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
56   LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
57   if (n != sizeof howmany)
58   {
59     LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
60   }
61 }
62 
63 void resetTimerfd(int timerfd, Timestamp expiration)
64 {
65   // wake up loop by timerfd_settime()
66   struct itimerspec newValue;
67   struct itimerspec oldValue;
68   bzero(&newValue, sizeof newValue);
69   bzero(&oldValue, sizeof oldValue);
70   newValue.it_value = howMuchTimeFromNow(expiration);
71   int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
72   if (ret)
73   {
74     LOG_SYSERR << "timerfd_settime()";
75   }
76 }
77 
78 }
79 }
80 
81 using namespace muduo;
82 using namespace muduo::detail;
83 
84 TimerQueue::TimerQueue(EventLoop* loop)
85   : loop_(loop),
86     timerfd_(createTimerfd()),
87     timerfdChannel_(loop, timerfd_),
88     timers_()
89 {
90   timerfdChannel_.setReadCallback(
91       boost::bind(&TimerQueue::handleRead, this));
92   // we are always reading the timerfd, we disarm it with timerfd_settime.
93   timerfdChannel_.enableReading();
94 }
95 
96 TimerQueue::~TimerQueue()
97 {
98   ::close(timerfd_);
99   // do not remove channel, since we're in EventLoop::dtor();
100   for (TimerList::iterator it = timers_.begin();
101       it != timers_.end(); ++it)
102   {
103     delete it->second;
104   }
105 }
106 
107 TimerId TimerQueue::addTimer(const TimerCallback& cb,
108                              Timestamp when,
109                              double interval)
110 {
111   Timer* timer = new Timer(cb, when, interval);
112+  loop_->runInLoop(
113+      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
114+  return TimerId(timer);
115+}
116 
117+void TimerQueue::addTimerInLoop(Timer* timer)
118+{
119   loop_->assertInLoopThread();
120   bool earliestChanged = insert(timer);
121 
122   if (earliestChanged)
123   {
124     resetTimerfd(timerfd_, timer->expiration());
125   }
126-  return TimerId(timer);
127 }
128 
129 void TimerQueue::handleRead()
130 {
131   loop_->assertInLoopThread();
132   Timestamp now(Timestamp::now());
133   readTimerfd(timerfd_, now);
134 
135   std::vector<Entry> expired = getExpired(now);
136 
137   // safe to callback outside critical section
138   for (std::vector<Entry>::iterator it = expired.begin();
139       it != expired.end(); ++it)
140   {
141     it->second->run();
142   }
143 
144   reset(expired, now);
145 }
146 
147 std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
148 {
149   std::vector<Entry> expired;
150   Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
151   TimerList::iterator it = timers_.lower_bound(sentry);
152   assert(it == timers_.end() || now < it->first);
153   std::copy(timers_.begin(), it, back_inserter(expired));
154   timers_.erase(timers_.begin(), it);
155 
156   return expired;
157 }
158 
159 void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
160 {
161   Timestamp nextExpire;
162 
163   for (std::vector<Entry>::const_iterator it = expired.begin();
164       it != expired.end(); ++it)
165   {
166     if (it->second->repeat())
167     {
168       it->second->restart(now);
169       insert(it->second);
170     }
171     else
172     {
173       // FIXME move to a free list
174       delete it->second;
175     }
176   }
177 
178   if (!timers_.empty())
179   {
180     nextExpire = timers_.begin()->second->expiration();
181   }
182 
183   if (nextExpire.valid())
184   {
185     resetTimerfd(timerfd_, nextExpire);
186   }
187 }
188 
189 bool TimerQueue::insert(Timer* timer)
190 {
191   bool earliestChanged = false;
192   Timestamp when = timer->expiration();
193   TimerList::iterator it = timers_.begin();
194   if (it == timers_.end() || when < it->first)
195   {
196     earliestChanged = true;
197   }
198   std::pair<TimerList::iterator, bool> result =
199           timers_.insert(std::make_pair(when, timer));
200   assert(result.second);
201   return earliestChanged;
202 }
203 
204