s11-s10-TimerQueue.cc.diff revision f4e8e3d3
1f4e8e3d3SShuo Chen // excerpts from http://code.google.com/p/muduo/
2f4e8e3d3SShuo Chen //
3f4e8e3d3SShuo Chen // Use of this source code is governed by a BSD-style license
4f4e8e3d3SShuo Chen // that can be found in the License file.
5f4e8e3d3SShuo Chen //
6f4e8e3d3SShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com)
7f4e8e3d3SShuo Chen 
8f4e8e3d3SShuo Chen #define __STDC_LIMIT_MACROS
9f4e8e3d3SShuo Chen #include "TimerQueue.h"
10f4e8e3d3SShuo Chen 
11f4e8e3d3SShuo Chen #include "logging/Logging.h"
12f4e8e3d3SShuo Chen #include "EventLoop.h"
13f4e8e3d3SShuo Chen #include "Timer.h"
14f4e8e3d3SShuo Chen #include "TimerId.h"
15f4e8e3d3SShuo Chen 
16f4e8e3d3SShuo Chen #include <boost/bind.hpp>
17f4e8e3d3SShuo Chen+#include <boost/foreach.hpp>
18f4e8e3d3SShuo Chen 
19f4e8e3d3SShuo Chen #include <sys/timerfd.h>
20f4e8e3d3SShuo Chen 
21f4e8e3d3SShuo Chen namespace muduo
22f4e8e3d3SShuo Chen {
23f4e8e3d3SShuo Chen namespace detail
24f4e8e3d3SShuo Chen {
25f4e8e3d3SShuo Chen 
26f4e8e3d3SShuo Chen int createTimerfd()
27f4e8e3d3SShuo Chen {
28f4e8e3d3SShuo Chen   int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
29f4e8e3d3SShuo Chen                                  TFD_NONBLOCK | TFD_CLOEXEC);
30f4e8e3d3SShuo Chen   if (timerfd < 0)
31f4e8e3d3SShuo Chen   {
32f4e8e3d3SShuo Chen     LOG_SYSFATAL << "Failed in timerfd_create";
33f4e8e3d3SShuo Chen   }
34f4e8e3d3SShuo Chen   return timerfd;
35f4e8e3d3SShuo Chen }
36f4e8e3d3SShuo Chen 
37f4e8e3d3SShuo Chen struct timespec howMuchTimeFromNow(Timestamp when)
38f4e8e3d3SShuo Chen {
39f4e8e3d3SShuo Chen   int64_t microseconds = when.microSecondsSinceEpoch()
40f4e8e3d3SShuo Chen                          - Timestamp::now().microSecondsSinceEpoch();
41f4e8e3d3SShuo Chen   if (microseconds < 100)
42f4e8e3d3SShuo Chen   {
43f4e8e3d3SShuo Chen     microseconds = 100;
44f4e8e3d3SShuo Chen   }
45f4e8e3d3SShuo Chen   struct timespec ts;
46f4e8e3d3SShuo Chen   ts.tv_sec = static_cast<time_t>(
47f4e8e3d3SShuo Chen       microseconds / Timestamp::kMicroSecondsPerSecond);
48f4e8e3d3SShuo Chen   ts.tv_nsec = static_cast<long>(
49f4e8e3d3SShuo Chen       (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
50f4e8e3d3SShuo Chen   return ts;
51f4e8e3d3SShuo Chen }
52f4e8e3d3SShuo Chen 
53f4e8e3d3SShuo Chen void readTimerfd(int timerfd, Timestamp now)
54f4e8e3d3SShuo Chen {
55f4e8e3d3SShuo Chen   uint64_t howmany;
56f4e8e3d3SShuo Chen   ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
57f4e8e3d3SShuo Chen   LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
58f4e8e3d3SShuo Chen   if (n != sizeof howmany)
59f4e8e3d3SShuo Chen   {
60f4e8e3d3SShuo Chen     LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
61f4e8e3d3SShuo Chen   }
62f4e8e3d3SShuo Chen }
63f4e8e3d3SShuo Chen 
64f4e8e3d3SShuo Chen void resetTimerfd(int timerfd, Timestamp expiration)
65f4e8e3d3SShuo Chen {
66f4e8e3d3SShuo Chen   // wake up loop by timerfd_settime()
67f4e8e3d3SShuo Chen   struct itimerspec newValue;
68f4e8e3d3SShuo Chen   struct itimerspec oldValue;
69f4e8e3d3SShuo Chen   bzero(&newValue, sizeof newValue);
70f4e8e3d3SShuo Chen   bzero(&oldValue, sizeof oldValue);
71f4e8e3d3SShuo Chen   newValue.it_value = howMuchTimeFromNow(expiration);
72f4e8e3d3SShuo Chen   int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
73f4e8e3d3SShuo Chen   if (ret)
74f4e8e3d3SShuo Chen   {
75f4e8e3d3SShuo Chen     LOG_SYSERR << "timerfd_settime()";
76f4e8e3d3SShuo Chen   }
77f4e8e3d3SShuo Chen }
78f4e8e3d3SShuo Chen 
79f4e8e3d3SShuo Chen }
80f4e8e3d3SShuo Chen }
81f4e8e3d3SShuo Chen 
82f4e8e3d3SShuo Chen using namespace muduo;
83f4e8e3d3SShuo Chen using namespace muduo::detail;
84f4e8e3d3SShuo Chen 
85f4e8e3d3SShuo Chen TimerQueue::TimerQueue(EventLoop* loop)
86f4e8e3d3SShuo Chen   : loop_(loop),
87f4e8e3d3SShuo Chen     timerfd_(createTimerfd()),
88f4e8e3d3SShuo Chen     timerfdChannel_(loop, timerfd_),
89f4e8e3d3SShuo Chen-    timers_()
90f4e8e3d3SShuo Chen+    timers_(),
91f4e8e3d3SShuo Chen+    callingExpiredTimers_(false)
92f4e8e3d3SShuo Chen {
93f4e8e3d3SShuo Chen   timerfdChannel_.setReadCallback(
94f4e8e3d3SShuo Chen       boost::bind(&TimerQueue::handleRead, this));
95f4e8e3d3SShuo Chen   // we are always reading the timerfd, we disarm it with timerfd_settime.
96f4e8e3d3SShuo Chen   timerfdChannel_.enableReading();
97f4e8e3d3SShuo Chen }
98f4e8e3d3SShuo Chen 
99f4e8e3d3SShuo Chen TimerQueue::~TimerQueue()
100f4e8e3d3SShuo Chen {
101f4e8e3d3SShuo Chen   ::close(timerfd_);
102f4e8e3d3SShuo Chen   // do not remove channel, since we're in EventLoop::dtor();
103f4e8e3d3SShuo Chen   for (TimerList::iterator it = timers_.begin();
104f4e8e3d3SShuo Chen       it != timers_.end(); ++it)
105f4e8e3d3SShuo Chen   {
106f4e8e3d3SShuo Chen     delete it->second;
107f4e8e3d3SShuo Chen   }
108f4e8e3d3SShuo Chen }
109f4e8e3d3SShuo Chen 
110f4e8e3d3SShuo Chen TimerId TimerQueue::addTimer(const TimerCallback& cb,
111f4e8e3d3SShuo Chen                              Timestamp when,
112f4e8e3d3SShuo Chen                              double interval)
113f4e8e3d3SShuo Chen {
114f4e8e3d3SShuo Chen   Timer* timer = new Timer(cb, when, interval);
115f4e8e3d3SShuo Chen   loop_->runInLoop(
116f4e8e3d3SShuo Chen       boost::bind(&TimerQueue::addTimerInLoop, this, timer));
117f4e8e3d3SShuo Chen!  return TimerId(timer, timer->sequence());
118f4e8e3d3SShuo Chen }
119f4e8e3d3SShuo Chen 
120f4e8e3d3SShuo Chen+void TimerQueue::cancel(TimerId timerId)
121f4e8e3d3SShuo Chen+{
122f4e8e3d3SShuo Chen+  loop_->runInLoop(
123f4e8e3d3SShuo Chen+      boost::bind(&TimerQueue::cancelInLoop, this, timerId));
124f4e8e3d3SShuo Chen+}
125f4e8e3d3SShuo Chen+
126f4e8e3d3SShuo Chen void TimerQueue::addTimerInLoop(Timer* timer)
127f4e8e3d3SShuo Chen {
128f4e8e3d3SShuo Chen   loop_->assertInLoopThread();
129f4e8e3d3SShuo Chen   bool earliestChanged = insert(timer);
130f4e8e3d3SShuo Chen 
131f4e8e3d3SShuo Chen   if (earliestChanged)
132f4e8e3d3SShuo Chen   {
133f4e8e3d3SShuo Chen     resetTimerfd(timerfd_, timer->expiration());
134f4e8e3d3SShuo Chen   }
135f4e8e3d3SShuo Chen }
136f4e8e3d3SShuo Chen 
137f4e8e3d3SShuo Chen+void TimerQueue::cancelInLoop(TimerId timerId)
138f4e8e3d3SShuo Chen+{
139f4e8e3d3SShuo Chen+  loop_->assertInLoopThread();
140f4e8e3d3SShuo Chen+  assert(timers_.size() == activeTimers_.size());
141f4e8e3d3SShuo Chen+  ActiveTimer timer(timerId.timer_, timerId.seq_);
142f4e8e3d3SShuo Chen+  ActiveTimerSet::iterator it = activeTimers_.find(timer);
143f4e8e3d3SShuo Chen+  if (it != activeTimers_.end())
144f4e8e3d3SShuo Chen+  {
145f4e8e3d3SShuo Chen+    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
146f4e8e3d3SShuo Chen+    assert(n == 1); (void)n;
147f4e8e3d3SShuo Chen+    delete it->first; // FIXME: no delete please
148f4e8e3d3SShuo Chen+    activeTimers_.erase(it);
149f4e8e3d3SShuo Chen+  }
150f4e8e3d3SShuo Chen+  else if (callingExpiredTimers_)
151f4e8e3d3SShuo Chen+  {
152f4e8e3d3SShuo Chen+    cancelingTimers_.insert(timer);
153f4e8e3d3SShuo Chen+  }
154f4e8e3d3SShuo Chen+  assert(timers_.size() == activeTimers_.size());
155f4e8e3d3SShuo Chen+}
156f4e8e3d3SShuo Chen+
157f4e8e3d3SShuo Chen void TimerQueue::handleRead()
158f4e8e3d3SShuo Chen {
159f4e8e3d3SShuo Chen   loop_->assertInLoopThread();
160f4e8e3d3SShuo Chen   Timestamp now(Timestamp::now());
161f4e8e3d3SShuo Chen   readTimerfd(timerfd_, now);
162f4e8e3d3SShuo Chen 
163f4e8e3d3SShuo Chen   std::vector<Entry> expired = getExpired(now);
164f4e8e3d3SShuo Chen 
165f4e8e3d3SShuo Chen+  callingExpiredTimers_ = true;
166f4e8e3d3SShuo Chen+  cancelingTimers_.clear();
167f4e8e3d3SShuo Chen   // safe to callback outside critical section
168f4e8e3d3SShuo Chen   for (std::vector<Entry>::iterator it = expired.begin();
169f4e8e3d3SShuo Chen       it != expired.end(); ++it)
170f4e8e3d3SShuo Chen   {
171f4e8e3d3SShuo Chen     it->second->run();
172f4e8e3d3SShuo Chen   }
173f4e8e3d3SShuo Chen+  callingExpiredTimers_ = false;
174f4e8e3d3SShuo Chen 
175f4e8e3d3SShuo Chen   reset(expired, now);
176f4e8e3d3SShuo Chen }
177f4e8e3d3SShuo Chen 
178f4e8e3d3SShuo Chen std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
179f4e8e3d3SShuo Chen {
180f4e8e3d3SShuo Chen+  assert(timers_.size() == activeTimers_.size());
181f4e8e3d3SShuo Chen   std::vector<Entry> expired;
182f4e8e3d3SShuo Chen   Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
183f4e8e3d3SShuo Chen   TimerList::iterator it = timers_.lower_bound(sentry);
184f4e8e3d3SShuo Chen   assert(it == timers_.end() || now < it->first);
185f4e8e3d3SShuo Chen   std::copy(timers_.begin(), it, back_inserter(expired));
186f4e8e3d3SShuo Chen   timers_.erase(timers_.begin(), it);
187f4e8e3d3SShuo Chen 
188f4e8e3d3SShuo Chen+  BOOST_FOREACH(Entry entry, expired)
189f4e8e3d3SShuo Chen+  {
190f4e8e3d3SShuo Chen+    ActiveTimer timer(entry.second, entry.second->sequence());
191f4e8e3d3SShuo Chen+    size_t n = activeTimers_.erase(timer);
192f4e8e3d3SShuo Chen+    assert(n == 1); (void)n;
193f4e8e3d3SShuo Chen+  }
194f4e8e3d3SShuo Chen+
195f4e8e3d3SShuo Chen+  assert(timers_.size() == activeTimers_.size());
196f4e8e3d3SShuo Chen   return expired;
197f4e8e3d3SShuo Chen }
198f4e8e3d3SShuo Chen 
199f4e8e3d3SShuo Chen void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
200f4e8e3d3SShuo Chen {
201f4e8e3d3SShuo Chen   Timestamp nextExpire;
202f4e8e3d3SShuo Chen 
203f4e8e3d3SShuo Chen   for (std::vector<Entry>::const_iterator it = expired.begin();
204f4e8e3d3SShuo Chen       it != expired.end(); ++it)
205f4e8e3d3SShuo Chen   {
206f4e8e3d3SShuo Chen+    ActiveTimer timer(it->second, it->second->sequence());
207f4e8e3d3SShuo Chen!    if (it->second->repeat()
208f4e8e3d3SShuo Chen+        && cancelingTimers_.find(timer) == cancelingTimers_.end())
209f4e8e3d3SShuo Chen     {
210f4e8e3d3SShuo Chen       it->second->restart(now);
211f4e8e3d3SShuo Chen       insert(it->second);
212f4e8e3d3SShuo Chen     }
213f4e8e3d3SShuo Chen     else
214f4e8e3d3SShuo Chen     {
215f4e8e3d3SShuo Chen       // FIXME move to a free list
216f4e8e3d3SShuo Chen       delete it->second;
217f4e8e3d3SShuo Chen     }
218f4e8e3d3SShuo Chen   }
219f4e8e3d3SShuo Chen 
220f4e8e3d3SShuo Chen   if (!timers_.empty())
221f4e8e3d3SShuo Chen   {
222f4e8e3d3SShuo Chen     nextExpire = timers_.begin()->second->expiration();
223f4e8e3d3SShuo Chen   }
224f4e8e3d3SShuo Chen 
225f4e8e3d3SShuo Chen   if (nextExpire.valid())
226f4e8e3d3SShuo Chen   {
227f4e8e3d3SShuo Chen     resetTimerfd(timerfd_, nextExpire);
228f4e8e3d3SShuo Chen   }
229f4e8e3d3SShuo Chen }
230f4e8e3d3SShuo Chen 
231f4e8e3d3SShuo Chen bool TimerQueue::insert(Timer* timer)
232f4e8e3d3SShuo Chen {
233f4e8e3d3SShuo Chen+  loop_->assertInLoopThread();
234f4e8e3d3SShuo Chen+  assert(timers_.size() == activeTimers_.size());
235f4e8e3d3SShuo Chen   bool earliestChanged = false;
236f4e8e3d3SShuo Chen   Timestamp when = timer->expiration();
237f4e8e3d3SShuo Chen   TimerList::iterator it = timers_.begin();
238f4e8e3d3SShuo Chen   if (it == timers_.end() || when < it->first)
239f4e8e3d3SShuo Chen   {
240f4e8e3d3SShuo Chen     earliestChanged = true;
241f4e8e3d3SShuo Chen   }
242f4e8e3d3SShuo Chen+
243f4e8e3d3SShuo Chen+  {
244f4e8e3d3SShuo Chen     std::pair<TimerList::iterator, bool> result
245f4e8e3d3SShuo Chen       = timers_.insert(Entry(when, timer));
246f4e8e3d3SShuo Chen     assert(result.second); (void)result;
247f4e8e3d3SShuo Chen+  }
248f4e8e3d3SShuo Chen+  {
249f4e8e3d3SShuo Chen+    std::pair<ActiveTimerSet::iterator, bool> result
250f4e8e3d3SShuo Chen+      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
251f4e8e3d3SShuo Chen+    assert(result.second); (void)result;
252f4e8e3d3SShuo Chen+  }
253f4e8e3d3SShuo Chen+
254f4e8e3d3SShuo Chen+  assert(timers_.size() == activeTimers_.size());
255f4e8e3d3SShuo Chen   return earliestChanged;
256f4e8e3d3SShuo Chen }
257f4e8e3d3SShuo Chen 
258