s11-s10-TimerQueue.cc.diff revision db9c5511
1f4e8e3d3SShuo Chen // excerpts from http://code.google.com/p/muduo/
2f4e8e3d3SShuo Chen //
3f4e8e3d3SShuo Chen // Use of this source code is governed by a BSD-style license
4f4e8e3d3SShuo Chen // that can be found in the License file.
5f4e8e3d3SShuo Chen //
6f4e8e3d3SShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com)
7f4e8e3d3SShuo Chen 
8f4e8e3d3SShuo Chen #define __STDC_LIMIT_MACROS
9f4e8e3d3SShuo Chen #include "TimerQueue.h"
10f4e8e3d3SShuo Chen 
11f4e8e3d3SShuo Chen #include "logging/Logging.h"
12f4e8e3d3SShuo Chen #include "EventLoop.h"
13f4e8e3d3SShuo Chen #include "Timer.h"
14f4e8e3d3SShuo Chen #include "TimerId.h"
15f4e8e3d3SShuo Chen 
16f4e8e3d3SShuo Chen #include <boost/bind.hpp>
17f4e8e3d3SShuo Chen+#include <boost/foreach.hpp>
18f4e8e3d3SShuo Chen 
19f4e8e3d3SShuo Chen #include <sys/timerfd.h>
20f4e8e3d3SShuo Chen 
21f4e8e3d3SShuo Chen namespace muduo
22f4e8e3d3SShuo Chen {
23f4e8e3d3SShuo Chen namespace detail
24f4e8e3d3SShuo Chen {
25f4e8e3d3SShuo Chen 
26f4e8e3d3SShuo Chen int createTimerfd()
27f4e8e3d3SShuo Chen {
28f4e8e3d3SShuo Chen   int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
29f4e8e3d3SShuo Chen                                  TFD_NONBLOCK | TFD_CLOEXEC);
30f4e8e3d3SShuo Chen   if (timerfd < 0)
31f4e8e3d3SShuo Chen   {
32f4e8e3d3SShuo Chen     LOG_SYSFATAL << "Failed in timerfd_create";
33f4e8e3d3SShuo Chen   }
34f4e8e3d3SShuo Chen   return timerfd;
35f4e8e3d3SShuo Chen }
36f4e8e3d3SShuo Chen 
37f4e8e3d3SShuo Chen struct timespec howMuchTimeFromNow(Timestamp when)
38f4e8e3d3SShuo Chen {
39f4e8e3d3SShuo Chen   int64_t microseconds = when.microSecondsSinceEpoch()
40f4e8e3d3SShuo Chen                          - Timestamp::now().microSecondsSinceEpoch();
41f4e8e3d3SShuo Chen   if (microseconds < 100)
42f4e8e3d3SShuo Chen   {
43f4e8e3d3SShuo Chen     microseconds = 100;
44f4e8e3d3SShuo Chen   }
45f4e8e3d3SShuo Chen   struct timespec ts;
46f4e8e3d3SShuo Chen   ts.tv_sec = static_cast<time_t>(
47f4e8e3d3SShuo Chen       microseconds / Timestamp::kMicroSecondsPerSecond);
48f4e8e3d3SShuo Chen   ts.tv_nsec = static_cast<long>(
49f4e8e3d3SShuo Chen       (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
50f4e8e3d3SShuo Chen   return ts;
51f4e8e3d3SShuo Chen }
52f4e8e3d3SShuo Chen 
53f4e8e3d3SShuo Chen void readTimerfd(int timerfd, Timestamp now)
54f4e8e3d3SShuo Chen {
55f4e8e3d3SShuo Chen   uint64_t howmany;
56f4e8e3d3SShuo Chen   ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
57f4e8e3d3SShuo Chen   LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
58f4e8e3d3SShuo Chen   if (n != sizeof howmany)
59f4e8e3d3SShuo Chen   {
60f4e8e3d3SShuo Chen     LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
61f4e8e3d3SShuo Chen   }
62f4e8e3d3SShuo Chen }
63f4e8e3d3SShuo Chen 
64f4e8e3d3SShuo Chen void resetTimerfd(int timerfd, Timestamp expiration)
65f4e8e3d3SShuo Chen {
66f4e8e3d3SShuo Chen   // wake up loop by timerfd_settime()
67f4e8e3d3SShuo Chen   struct itimerspec newValue;
68f4e8e3d3SShuo Chen   struct itimerspec oldValue;
69f4e8e3d3SShuo Chen   bzero(&newValue, sizeof newValue);
70f4e8e3d3SShuo Chen   bzero(&oldValue, sizeof oldValue);
71f4e8e3d3SShuo Chen   newValue.it_value = howMuchTimeFromNow(expiration);
72f4e8e3d3SShuo Chen   int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
73f4e8e3d3SShuo Chen   if (ret)
74f4e8e3d3SShuo Chen   {
75f4e8e3d3SShuo Chen     LOG_SYSERR << "timerfd_settime()";
76f4e8e3d3SShuo Chen   }
77f4e8e3d3SShuo Chen }
78f4e8e3d3SShuo Chen 
79f4e8e3d3SShuo Chen }
80f4e8e3d3SShuo Chen }
81f4e8e3d3SShuo Chen 
82f4e8e3d3SShuo Chen using namespace muduo;
83f4e8e3d3SShuo Chen using namespace muduo::detail;
84f4e8e3d3SShuo Chen 
85f4e8e3d3SShuo Chen TimerQueue::TimerQueue(EventLoop* loop)
86f4e8e3d3SShuo Chen   : loop_(loop),
87f4e8e3d3SShuo Chen     timerfd_(createTimerfd()),
88f4e8e3d3SShuo Chen     timerfdChannel_(loop, timerfd_),
89db9c5511SShuo Chen     timers_(),
90f4e8e3d3SShuo Chen+    callingExpiredTimers_(false)
91f4e8e3d3SShuo Chen {
92f4e8e3d3SShuo Chen   timerfdChannel_.setReadCallback(
93f4e8e3d3SShuo Chen       boost::bind(&TimerQueue::handleRead, this));
94f4e8e3d3SShuo Chen   // we are always reading the timerfd, we disarm it with timerfd_settime.
95f4e8e3d3SShuo Chen   timerfdChannel_.enableReading();
96f4e8e3d3SShuo Chen }
97f4e8e3d3SShuo Chen 
98f4e8e3d3SShuo Chen TimerQueue::~TimerQueue()
99f4e8e3d3SShuo Chen {
100f4e8e3d3SShuo Chen   ::close(timerfd_);
101f4e8e3d3SShuo Chen   // do not remove channel, since we're in EventLoop::dtor();
102f4e8e3d3SShuo Chen   for (TimerList::iterator it = timers_.begin();
103f4e8e3d3SShuo Chen       it != timers_.end(); ++it)
104f4e8e3d3SShuo Chen   {
105f4e8e3d3SShuo Chen     delete it->second;
106f4e8e3d3SShuo Chen   }
107f4e8e3d3SShuo Chen }
108f4e8e3d3SShuo Chen 
109f4e8e3d3SShuo Chen TimerId TimerQueue::addTimer(const TimerCallback& cb,
110f4e8e3d3SShuo Chen                              Timestamp when,
111f4e8e3d3SShuo Chen                              double interval)
112f4e8e3d3SShuo Chen {
113f4e8e3d3SShuo Chen   Timer* timer = new Timer(cb, when, interval);
114f4e8e3d3SShuo Chen   loop_->runInLoop(
115f4e8e3d3SShuo Chen       boost::bind(&TimerQueue::addTimerInLoop, this, timer));
116f4e8e3d3SShuo Chen!  return TimerId(timer, timer->sequence());
117f4e8e3d3SShuo Chen }
118f4e8e3d3SShuo Chen 
119f4e8e3d3SShuo Chen+void TimerQueue::cancel(TimerId timerId)
120f4e8e3d3SShuo Chen+{
121f4e8e3d3SShuo Chen+  loop_->runInLoop(
122f4e8e3d3SShuo Chen+      boost::bind(&TimerQueue::cancelInLoop, this, timerId));
123f4e8e3d3SShuo Chen+}
124f4e8e3d3SShuo Chen+
125f4e8e3d3SShuo Chen void TimerQueue::addTimerInLoop(Timer* timer)
126f4e8e3d3SShuo Chen {
127f4e8e3d3SShuo Chen   loop_->assertInLoopThread();
128f4e8e3d3SShuo Chen   bool earliestChanged = insert(timer);
129f4e8e3d3SShuo Chen 
130f4e8e3d3SShuo Chen   if (earliestChanged)
131f4e8e3d3SShuo Chen   {
132f4e8e3d3SShuo Chen     resetTimerfd(timerfd_, timer->expiration());
133f4e8e3d3SShuo Chen   }
134f4e8e3d3SShuo Chen }
135f4e8e3d3SShuo Chen 
136f4e8e3d3SShuo Chen+void TimerQueue::cancelInLoop(TimerId timerId)
137f4e8e3d3SShuo Chen+{
138f4e8e3d3SShuo Chen+  loop_->assertInLoopThread();
139f4e8e3d3SShuo Chen+  assert(timers_.size() == activeTimers_.size());
140f4e8e3d3SShuo Chen+  ActiveTimer timer(timerId.timer_, timerId.seq_);
141f4e8e3d3SShuo Chen+  ActiveTimerSet::iterator it = activeTimers_.find(timer);
142f4e8e3d3SShuo Chen+  if (it != activeTimers_.end())
143f4e8e3d3SShuo Chen+  {
144f4e8e3d3SShuo Chen+    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
145f4e8e3d3SShuo Chen+    assert(n == 1); (void)n;
146f4e8e3d3SShuo Chen+    delete it->first; // FIXME: no delete please
147f4e8e3d3SShuo Chen+    activeTimers_.erase(it);
148f4e8e3d3SShuo Chen+  }
149f4e8e3d3SShuo Chen+  else if (callingExpiredTimers_)
150f4e8e3d3SShuo Chen+  {
151f4e8e3d3SShuo Chen+    cancelingTimers_.insert(timer);
152f4e8e3d3SShuo Chen+  }
153f4e8e3d3SShuo Chen+  assert(timers_.size() == activeTimers_.size());
154f4e8e3d3SShuo Chen+}
155f4e8e3d3SShuo Chen+
156f4e8e3d3SShuo Chen void TimerQueue::handleRead()
157f4e8e3d3SShuo Chen {
158f4e8e3d3SShuo Chen   loop_->assertInLoopThread();
159f4e8e3d3SShuo Chen   Timestamp now(Timestamp::now());
160f4e8e3d3SShuo Chen   readTimerfd(timerfd_, now);
161f4e8e3d3SShuo Chen 
162f4e8e3d3SShuo Chen   std::vector<Entry> expired = getExpired(now);
163f4e8e3d3SShuo Chen 
164f4e8e3d3SShuo Chen+  callingExpiredTimers_ = true;
165f4e8e3d3SShuo Chen+  cancelingTimers_.clear();
166f4e8e3d3SShuo Chen   // safe to callback outside critical section
167f4e8e3d3SShuo Chen   for (std::vector<Entry>::iterator it = expired.begin();
168f4e8e3d3SShuo Chen       it != expired.end(); ++it)
169f4e8e3d3SShuo Chen   {
170f4e8e3d3SShuo Chen     it->second->run();
171f4e8e3d3SShuo Chen   }
172f4e8e3d3SShuo Chen+  callingExpiredTimers_ = false;
173f4e8e3d3SShuo Chen 
174f4e8e3d3SShuo Chen   reset(expired, now);
175f4e8e3d3SShuo Chen }
176f4e8e3d3SShuo Chen 
177f4e8e3d3SShuo Chen std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
178f4e8e3d3SShuo Chen {
179f4e8e3d3SShuo Chen+  assert(timers_.size() == activeTimers_.size());
180f4e8e3d3SShuo Chen   std::vector<Entry> expired;
181f4e8e3d3SShuo Chen   Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
182f4e8e3d3SShuo Chen   TimerList::iterator it = timers_.lower_bound(sentry);
183f4e8e3d3SShuo Chen   assert(it == timers_.end() || now < it->first);
184f4e8e3d3SShuo Chen   std::copy(timers_.begin(), it, back_inserter(expired));
185f4e8e3d3SShuo Chen   timers_.erase(timers_.begin(), it);
186f4e8e3d3SShuo Chen 
187f4e8e3d3SShuo Chen+  BOOST_FOREACH(Entry entry, expired)
188f4e8e3d3SShuo Chen+  {
189f4e8e3d3SShuo Chen+    ActiveTimer timer(entry.second, entry.second->sequence());
190f4e8e3d3SShuo Chen+    size_t n = activeTimers_.erase(timer);
191f4e8e3d3SShuo Chen+    assert(n == 1); (void)n;
192f4e8e3d3SShuo Chen+  }
193f4e8e3d3SShuo Chen+
194f4e8e3d3SShuo Chen+  assert(timers_.size() == activeTimers_.size());
195f4e8e3d3SShuo Chen   return expired;
196f4e8e3d3SShuo Chen }
197f4e8e3d3SShuo Chen 
198f4e8e3d3SShuo Chen void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
199f4e8e3d3SShuo Chen {
200f4e8e3d3SShuo Chen   Timestamp nextExpire;
201f4e8e3d3SShuo Chen 
202f4e8e3d3SShuo Chen   for (std::vector<Entry>::const_iterator it = expired.begin();
203f4e8e3d3SShuo Chen       it != expired.end(); ++it)
204f4e8e3d3SShuo Chen   {
205f4e8e3d3SShuo Chen+    ActiveTimer timer(it->second, it->second->sequence());
206f4e8e3d3SShuo Chen!    if (it->second->repeat()
207f4e8e3d3SShuo Chen+        && cancelingTimers_.find(timer) == cancelingTimers_.end())
208f4e8e3d3SShuo Chen     {
209f4e8e3d3SShuo Chen       it->second->restart(now);
210f4e8e3d3SShuo Chen       insert(it->second);
211f4e8e3d3SShuo Chen     }
212f4e8e3d3SShuo Chen     else
213f4e8e3d3SShuo Chen     {
214f4e8e3d3SShuo Chen       // FIXME move to a free list
215f4e8e3d3SShuo Chen       delete it->second;
216f4e8e3d3SShuo Chen     }
217f4e8e3d3SShuo Chen   }
218f4e8e3d3SShuo Chen 
219f4e8e3d3SShuo Chen   if (!timers_.empty())
220f4e8e3d3SShuo Chen   {
221f4e8e3d3SShuo Chen     nextExpire = timers_.begin()->second->expiration();
222f4e8e3d3SShuo Chen   }
223f4e8e3d3SShuo Chen 
224f4e8e3d3SShuo Chen   if (nextExpire.valid())
225f4e8e3d3SShuo Chen   {
226f4e8e3d3SShuo Chen     resetTimerfd(timerfd_, nextExpire);
227f4e8e3d3SShuo Chen   }
228f4e8e3d3SShuo Chen }
229f4e8e3d3SShuo Chen 
230f4e8e3d3SShuo Chen bool TimerQueue::insert(Timer* timer)
231f4e8e3d3SShuo Chen {
232f4e8e3d3SShuo Chen+  loop_->assertInLoopThread();
233f4e8e3d3SShuo Chen+  assert(timers_.size() == activeTimers_.size());
234f4e8e3d3SShuo Chen   bool earliestChanged = false;
235f4e8e3d3SShuo Chen   Timestamp when = timer->expiration();
236f4e8e3d3SShuo Chen   TimerList::iterator it = timers_.begin();
237f4e8e3d3SShuo Chen   if (it == timers_.end() || when < it->first)
238f4e8e3d3SShuo Chen   {
239f4e8e3d3SShuo Chen     earliestChanged = true;
240f4e8e3d3SShuo Chen   }
241f4e8e3d3SShuo Chen+
242f4e8e3d3SShuo Chen+  {
243f4e8e3d3SShuo Chen     std::pair<TimerList::iterator, bool> result
244f4e8e3d3SShuo Chen       = timers_.insert(Entry(when, timer));
245f4e8e3d3SShuo Chen     assert(result.second); (void)result;
246f4e8e3d3SShuo Chen+  }
247f4e8e3d3SShuo Chen+  {
248f4e8e3d3SShuo Chen+    std::pair<ActiveTimerSet::iterator, bool> result
249f4e8e3d3SShuo Chen+      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
250f4e8e3d3SShuo Chen+    assert(result.second); (void)result;
251f4e8e3d3SShuo Chen+  }
252f4e8e3d3SShuo Chen+
253f4e8e3d3SShuo Chen+  assert(timers_.size() == activeTimers_.size());
254f4e8e3d3SShuo Chen   return earliestChanged;
255f4e8e3d3SShuo Chen }
256f4e8e3d3SShuo Chen 
257