1 // excerpts from http://code.google.com/p/muduo/
2 //
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the License file.
5 //
6 // Author: Shuo Chen (chenshuo at chenshuo dot com)
7 
8 #define __STDC_LIMIT_MACROS
9 #include "TimerQueue.h"
10 
11 #include "logging/Logging.h"
12 #include "EventLoop.h"
13 #include "Timer.h"
14 #include "TimerId.h"
15 
16 #include <boost/bind.hpp>
17+#include <boost/foreach.hpp>
18 
19 #include <sys/timerfd.h>
20 
21 namespace muduo
22 {
23 namespace detail
24 {
25 
26 int createTimerfd()
27 {
28   int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
29                                  TFD_NONBLOCK | TFD_CLOEXEC);
30   if (timerfd < 0)
31   {
32     LOG_SYSFATAL << "Failed in timerfd_create";
33   }
34   return timerfd;
35 }
36 
37 struct timespec howMuchTimeFromNow(Timestamp when)
38 {
39   int64_t microseconds = when.microSecondsSinceEpoch()
40                          - Timestamp::now().microSecondsSinceEpoch();
41   if (microseconds < 100)
42   {
43     microseconds = 100;
44   }
45   struct timespec ts;
46   ts.tv_sec = static_cast<time_t>(
47       microseconds / Timestamp::kMicroSecondsPerSecond);
48   ts.tv_nsec = static_cast<long>(
49       (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
50   return ts;
51 }
52 
53 void readTimerfd(int timerfd, Timestamp now)
54 {
55   uint64_t howmany;
56   ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
57   LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
58   if (n != sizeof howmany)
59   {
60     LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
61   }
62 }
63 
64 void resetTimerfd(int timerfd, Timestamp expiration)
65 {
66   // wake up loop by timerfd_settime()
67   struct itimerspec newValue;
68   struct itimerspec oldValue;
69   bzero(&newValue, sizeof newValue);
70   bzero(&oldValue, sizeof oldValue);
71   newValue.it_value = howMuchTimeFromNow(expiration);
72   int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
73   if (ret)
74   {
75     LOG_SYSERR << "timerfd_settime()";
76   }
77 }
78 
79 }
80 }
81 
82 using namespace muduo;
83 using namespace muduo::detail;
84 
85 TimerQueue::TimerQueue(EventLoop* loop)
86   : loop_(loop),
87     timerfd_(createTimerfd()),
88     timerfdChannel_(loop, timerfd_),
89     timers_(),
90+    callingExpiredTimers_(false)
91 {
92   timerfdChannel_.setReadCallback(
93       boost::bind(&TimerQueue::handleRead, this));
94   // we are always reading the timerfd, we disarm it with timerfd_settime.
95   timerfdChannel_.enableReading();
96 }
97 
98 TimerQueue::~TimerQueue()
99 {
100   ::close(timerfd_);
101   // do not remove channel, since we're in EventLoop::dtor();
102   for (TimerList::iterator it = timers_.begin();
103       it != timers_.end(); ++it)
104   {
105     delete it->second;
106   }
107 }
108 
109 TimerId TimerQueue::addTimer(const TimerCallback& cb,
110                              Timestamp when,
111                              double interval)
112 {
113   Timer* timer = new Timer(cb, when, interval);
114   loop_->runInLoop(
115       boost::bind(&TimerQueue::addTimerInLoop, this, timer));
116!  return TimerId(timer, timer->sequence());
117 }
118 
119+void TimerQueue::cancel(TimerId timerId)
120+{
121+  loop_->runInLoop(
122+      boost::bind(&TimerQueue::cancelInLoop, this, timerId));
123+}
124+
125 void TimerQueue::addTimerInLoop(Timer* timer)
126 {
127   loop_->assertInLoopThread();
128   bool earliestChanged = insert(timer);
129 
130   if (earliestChanged)
131   {
132     resetTimerfd(timerfd_, timer->expiration());
133   }
134 }
135 
136+void TimerQueue::cancelInLoop(TimerId timerId)
137+{
138+  loop_->assertInLoopThread();
139+  assert(timers_.size() == activeTimers_.size());
140+  ActiveTimer timer(timerId.timer_, timerId.seq_);
141+  ActiveTimerSet::iterator it = activeTimers_.find(timer);
142+  if (it != activeTimers_.end())
143+  {
144+    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
145+    assert(n == 1); (void)n;
146+    delete it->first; // FIXME: no delete please
147+    activeTimers_.erase(it);
148+  }
149+  else if (callingExpiredTimers_)
150+  {
151+    cancelingTimers_.insert(timer);
152+  }
153+  assert(timers_.size() == activeTimers_.size());
154+}
155+
156 void TimerQueue::handleRead()
157 {
158   loop_->assertInLoopThread();
159   Timestamp now(Timestamp::now());
160   readTimerfd(timerfd_, now);
161 
162   std::vector<Entry> expired = getExpired(now);
163 
164+  callingExpiredTimers_ = true;
165+  cancelingTimers_.clear();
166   // safe to callback outside critical section
167   for (std::vector<Entry>::iterator it = expired.begin();
168       it != expired.end(); ++it)
169   {
170     it->second->run();
171   }
172+  callingExpiredTimers_ = false;
173 
174   reset(expired, now);
175 }
176 
177 std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
178 {
179+  assert(timers_.size() == activeTimers_.size());
180   std::vector<Entry> expired;
181   Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
182   TimerList::iterator it = timers_.lower_bound(sentry);
183   assert(it == timers_.end() || now < it->first);
184   std::copy(timers_.begin(), it, back_inserter(expired));
185   timers_.erase(timers_.begin(), it);
186 
187+  BOOST_FOREACH(Entry entry, expired)
188+  {
189+    ActiveTimer timer(entry.second, entry.second->sequence());
190+    size_t n = activeTimers_.erase(timer);
191+    assert(n == 1); (void)n;
192+  }
193+
194+  assert(timers_.size() == activeTimers_.size());
195   return expired;
196 }
197 
198 void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
199 {
200   Timestamp nextExpire;
201 
202   for (std::vector<Entry>::const_iterator it = expired.begin();
203       it != expired.end(); ++it)
204   {
205+    ActiveTimer timer(it->second, it->second->sequence());
206!    if (it->second->repeat()
207+        && cancelingTimers_.find(timer) == cancelingTimers_.end())
208     {
209       it->second->restart(now);
210       insert(it->second);
211     }
212     else
213     {
214       // FIXME move to a free list
215       delete it->second;
216     }
217   }
218 
219   if (!timers_.empty())
220   {
221     nextExpire = timers_.begin()->second->expiration();
222   }
223 
224   if (nextExpire.valid())
225   {
226     resetTimerfd(timerfd_, nextExpire);
227   }
228 }
229 
230 bool TimerQueue::insert(Timer* timer)
231 {
232+  loop_->assertInLoopThread();
233+  assert(timers_.size() == activeTimers_.size());
234   bool earliestChanged = false;
235   Timestamp when = timer->expiration();
236   TimerList::iterator it = timers_.begin();
237   if (it == timers_.end() || when < it->first)
238   {
239     earliestChanged = true;
240   }
241+
242+  {
243     std::pair<TimerList::iterator, bool> result
244       = timers_.insert(Entry(when, timer));
245     assert(result.second); (void)result;
246+  }
247+  {
248+    std::pair<ActiveTimerSet::iterator, bool> result
249+      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
250+    assert(result.second); (void)result;
251+  }
252+
253+  assert(timers_.size() == activeTimers_.size());
254   return earliestChanged;
255 }
256 
257