TimerQueue.cc revision fdb5c17c
1a1bde736SShuo Chen// excerpts from http://code.google.com/p/muduo/
2a1bde736SShuo Chen//
3a1bde736SShuo Chen// Use of this source code is governed by a BSD-style license
4a1bde736SShuo Chen// that can be found in the License file.
5a1bde736SShuo Chen//
6a1bde736SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
7a1bde736SShuo Chen
8a1bde736SShuo Chen#define __STDC_LIMIT_MACROS
9a1bde736SShuo Chen#include "TimerQueue.h"
10a1bde736SShuo Chen
11a1bde736SShuo Chen#include "logging/Logging.h"
12a1bde736SShuo Chen#include "EventLoop.h"
13a1bde736SShuo Chen#include "Timer.h"
14a1bde736SShuo Chen#include "TimerId.h"
15a1bde736SShuo Chen
16a1bde736SShuo Chen#include <boost/bind.hpp>
17a1bde736SShuo Chen#include <boost/foreach.hpp>
18a1bde736SShuo Chen
19a1bde736SShuo Chen#include <sys/timerfd.h>
20a1bde736SShuo Chen
21a1bde736SShuo Chennamespace muduo
22a1bde736SShuo Chen{
23a1bde736SShuo Chennamespace detail
24a1bde736SShuo Chen{
25a1bde736SShuo Chen
26a1bde736SShuo Chenint createTimerfd()
27a1bde736SShuo Chen{
28a1bde736SShuo Chen  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
29a1bde736SShuo Chen                                 TFD_NONBLOCK | TFD_CLOEXEC);
30a1bde736SShuo Chen  if (timerfd < 0)
31a1bde736SShuo Chen  {
32a1bde736SShuo Chen    LOG_SYSFATAL << "Failed in timerfd_create";
33a1bde736SShuo Chen  }
34a1bde736SShuo Chen  return timerfd;
35a1bde736SShuo Chen}
36a1bde736SShuo Chen
37a1bde736SShuo Chenstruct timespec howMuchTimeFromNow(Timestamp when)
38a1bde736SShuo Chen{
39a1bde736SShuo Chen  int64_t microseconds = when.microSecondsSinceEpoch()
40a1bde736SShuo Chen                         - Timestamp::now().microSecondsSinceEpoch();
41a1bde736SShuo Chen  if (microseconds < 100)
42a1bde736SShuo Chen  {
43a1bde736SShuo Chen    microseconds = 100;
44a1bde736SShuo Chen  }
45a1bde736SShuo Chen  struct timespec ts;
46a1bde736SShuo Chen  ts.tv_sec = static_cast<time_t>(
47a1bde736SShuo Chen      microseconds / Timestamp::kMicroSecondsPerSecond);
48a1bde736SShuo Chen  ts.tv_nsec = static_cast<long>(
49a1bde736SShuo Chen      (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
50a1bde736SShuo Chen  return ts;
51a1bde736SShuo Chen}
52a1bde736SShuo Chen
53a1bde736SShuo Chenvoid readTimerfd(int timerfd, Timestamp now)
54a1bde736SShuo Chen{
55a1bde736SShuo Chen  uint64_t howmany;
56a1bde736SShuo Chen  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
57a1bde736SShuo Chen  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
58a1bde736SShuo Chen  if (n != sizeof howmany)
59a1bde736SShuo Chen  {
60a1bde736SShuo Chen    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
61a1bde736SShuo Chen  }
62a1bde736SShuo Chen}
63a1bde736SShuo Chen
64a1bde736SShuo Chenvoid resetTimerfd(int timerfd, Timestamp expiration)
65a1bde736SShuo Chen{
66a1bde736SShuo Chen  // wake up loop by timerfd_settime()
67a1bde736SShuo Chen  struct itimerspec newValue;
68a1bde736SShuo Chen  struct itimerspec oldValue;
69a1bde736SShuo Chen  bzero(&newValue, sizeof newValue);
70a1bde736SShuo Chen  bzero(&oldValue, sizeof oldValue);
71a1bde736SShuo Chen  newValue.it_value = howMuchTimeFromNow(expiration);
72a1bde736SShuo Chen  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
73a1bde736SShuo Chen  if (ret)
74a1bde736SShuo Chen  {
75a1bde736SShuo Chen    LOG_SYSERR << "timerfd_settime()";
76a1bde736SShuo Chen  }
77a1bde736SShuo Chen}
78a1bde736SShuo Chen
79a1bde736SShuo Chen}
80a1bde736SShuo Chen}
81a1bde736SShuo Chen
82a1bde736SShuo Chenusing namespace muduo;
83a1bde736SShuo Chenusing namespace muduo::detail;
84a1bde736SShuo Chen
85a1bde736SShuo ChenTimerQueue::TimerQueue(EventLoop* loop)
86a1bde736SShuo Chen  : loop_(loop),
87a1bde736SShuo Chen    timerfd_(createTimerfd()),
88a1bde736SShuo Chen    timerfdChannel_(loop, timerfd_),
89a1bde736SShuo Chen    timers_(),
90a1bde736SShuo Chen    callingExpiredTimers_(false)
91a1bde736SShuo Chen{
92a1bde736SShuo Chen  timerfdChannel_.setReadCallback(
93a1bde736SShuo Chen      boost::bind(&TimerQueue::handleRead, this));
94a1bde736SShuo Chen  // we are always reading the timerfd, we disarm it with timerfd_settime.
95a1bde736SShuo Chen  timerfdChannel_.enableReading();
96a1bde736SShuo Chen}
97a1bde736SShuo Chen
98a1bde736SShuo ChenTimerQueue::~TimerQueue()
99a1bde736SShuo Chen{
100a1bde736SShuo Chen  ::close(timerfd_);
101a1bde736SShuo Chen  // do not remove channel, since we're in EventLoop::dtor();
102a1bde736SShuo Chen  for (TimerList::iterator it = timers_.begin();
103a1bde736SShuo Chen      it != timers_.end(); ++it)
104a1bde736SShuo Chen  {
105a1bde736SShuo Chen    delete it->second;
106a1bde736SShuo Chen  }
107a1bde736SShuo Chen}
108a1bde736SShuo Chen
109a1bde736SShuo ChenTimerId TimerQueue::addTimer(const TimerCallback& cb,
110a1bde736SShuo Chen                             Timestamp when,
111a1bde736SShuo Chen                             double interval)
112a1bde736SShuo Chen{
113a1bde736SShuo Chen  Timer* timer = new Timer(cb, when, interval);
114a1bde736SShuo Chen  loop_->runInLoop(
115a1bde736SShuo Chen      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
116a1bde736SShuo Chen  return TimerId(timer, timer->sequence());
117a1bde736SShuo Chen}
118a1bde736SShuo Chen
119a1bde736SShuo Chenvoid TimerQueue::cancel(TimerId timerId)
120a1bde736SShuo Chen{
121a1bde736SShuo Chen  loop_->runInLoop(
122a1bde736SShuo Chen      boost::bind(&TimerQueue::cancelInLoop, this, timerId));
123a1bde736SShuo Chen}
124a1bde736SShuo Chen
125a1bde736SShuo Chenvoid TimerQueue::addTimerInLoop(Timer* timer)
126a1bde736SShuo Chen{
127a1bde736SShuo Chen  loop_->assertInLoopThread();
128a1bde736SShuo Chen  bool earliestChanged = insert(timer);
129a1bde736SShuo Chen
130a1bde736SShuo Chen  if (earliestChanged)
131a1bde736SShuo Chen  {
132a1bde736SShuo Chen    resetTimerfd(timerfd_, timer->expiration());
133a1bde736SShuo Chen  }
134a1bde736SShuo Chen}
135a1bde736SShuo Chen
136a1bde736SShuo Chenvoid TimerQueue::cancelInLoop(TimerId timerId)
137a1bde736SShuo Chen{
138a1bde736SShuo Chen  loop_->assertInLoopThread();
139a1bde736SShuo Chen  assert(timers_.size() == activeTimers_.size());
140fdb5c17cSShuo Chen  ActiveTimer timer(timerId.timer_, timerId.sequence_);
141a1bde736SShuo Chen  ActiveTimerSet::iterator it = activeTimers_.find(timer);
142a1bde736SShuo Chen  if (it != activeTimers_.end())
143a1bde736SShuo Chen  {
144a1bde736SShuo Chen    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
145a1bde736SShuo Chen    assert(n == 1); (void)n;
146a1bde736SShuo Chen    delete it->first; // FIXME: no delete please
147a1bde736SShuo Chen    activeTimers_.erase(it);
148a1bde736SShuo Chen  }
149a1bde736SShuo Chen  else if (callingExpiredTimers_)
150a1bde736SShuo Chen  {
151a1bde736SShuo Chen    cancelingTimers_.insert(timer);
152a1bde736SShuo Chen  }
153a1bde736SShuo Chen  assert(timers_.size() == activeTimers_.size());
154a1bde736SShuo Chen}
155a1bde736SShuo Chen
156a1bde736SShuo Chenvoid TimerQueue::handleRead()
157a1bde736SShuo Chen{
158a1bde736SShuo Chen  loop_->assertInLoopThread();
159a1bde736SShuo Chen  Timestamp now(Timestamp::now());
160a1bde736SShuo Chen  readTimerfd(timerfd_, now);
161a1bde736SShuo Chen
162a1bde736SShuo Chen  std::vector<Entry> expired = getExpired(now);
163a1bde736SShuo Chen
164a1bde736SShuo Chen  callingExpiredTimers_ = true;
165a1bde736SShuo Chen  cancelingTimers_.clear();
166a1bde736SShuo Chen  // safe to callback outside critical section
167a1bde736SShuo Chen  for (std::vector<Entry>::iterator it = expired.begin();
168a1bde736SShuo Chen      it != expired.end(); ++it)
169a1bde736SShuo Chen  {
170a1bde736SShuo Chen    it->second->run();
171a1bde736SShuo Chen  }
172a1bde736SShuo Chen  callingExpiredTimers_ = false;
173a1bde736SShuo Chen
174a1bde736SShuo Chen  reset(expired, now);
175a1bde736SShuo Chen}
176a1bde736SShuo Chen
177a1bde736SShuo Chenstd::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
178a1bde736SShuo Chen{
179a1bde736SShuo Chen  assert(timers_.size() == activeTimers_.size());
180a1bde736SShuo Chen  std::vector<Entry> expired;
181a1bde736SShuo Chen  Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
182a1bde736SShuo Chen  TimerList::iterator it = timers_.lower_bound(sentry);
183a1bde736SShuo Chen  assert(it == timers_.end() || now < it->first);
184a1bde736SShuo Chen  std::copy(timers_.begin(), it, back_inserter(expired));
185a1bde736SShuo Chen  timers_.erase(timers_.begin(), it);
186a1bde736SShuo Chen
187a1bde736SShuo Chen  BOOST_FOREACH(Entry entry, expired)
188a1bde736SShuo Chen  {
189a1bde736SShuo Chen    ActiveTimer timer(entry.second, entry.second->sequence());
190a1bde736SShuo Chen    size_t n = activeTimers_.erase(timer);
191a1bde736SShuo Chen    assert(n == 1); (void)n;
192a1bde736SShuo Chen  }
193a1bde736SShuo Chen
194a1bde736SShuo Chen  assert(timers_.size() == activeTimers_.size());
195a1bde736SShuo Chen  return expired;
196a1bde736SShuo Chen}
197a1bde736SShuo Chen
198a1bde736SShuo Chenvoid TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
199a1bde736SShuo Chen{
200a1bde736SShuo Chen  Timestamp nextExpire;
201a1bde736SShuo Chen
202a1bde736SShuo Chen  for (std::vector<Entry>::const_iterator it = expired.begin();
203a1bde736SShuo Chen      it != expired.end(); ++it)
204a1bde736SShuo Chen  {
205a1bde736SShuo Chen    ActiveTimer timer(it->second, it->second->sequence());
206a1bde736SShuo Chen    if (it->second->repeat()
207a1bde736SShuo Chen        && cancelingTimers_.find(timer) == cancelingTimers_.end())
208a1bde736SShuo Chen    {
209a1bde736SShuo Chen      it->second->restart(now);
210a1bde736SShuo Chen      insert(it->second);
211a1bde736SShuo Chen    }
212a1bde736SShuo Chen    else
213a1bde736SShuo Chen    {
214a1bde736SShuo Chen      // FIXME move to a free list
215a1bde736SShuo Chen      delete it->second;
216a1bde736SShuo Chen    }
217a1bde736SShuo Chen  }
218a1bde736SShuo Chen
219a1bde736SShuo Chen  if (!timers_.empty())
220a1bde736SShuo Chen  {
221a1bde736SShuo Chen    nextExpire = timers_.begin()->second->expiration();
222a1bde736SShuo Chen  }
223a1bde736SShuo Chen
224a1bde736SShuo Chen  if (nextExpire.valid())
225a1bde736SShuo Chen  {
226a1bde736SShuo Chen    resetTimerfd(timerfd_, nextExpire);
227a1bde736SShuo Chen  }
228a1bde736SShuo Chen}
229a1bde736SShuo Chen
230a1bde736SShuo Chenbool TimerQueue::insert(Timer* timer)
231a1bde736SShuo Chen{
232a1bde736SShuo Chen  loop_->assertInLoopThread();
233a1bde736SShuo Chen  assert(timers_.size() == activeTimers_.size());
234a1bde736SShuo Chen  bool earliestChanged = false;
235a1bde736SShuo Chen  Timestamp when = timer->expiration();
236a1bde736SShuo Chen  TimerList::iterator it = timers_.begin();
237a1bde736SShuo Chen  if (it == timers_.end() || when < it->first)
238a1bde736SShuo Chen  {
239a1bde736SShuo Chen    earliestChanged = true;
240a1bde736SShuo Chen  }
241a1bde736SShuo Chen
242a1bde736SShuo Chen  {
243a1bde736SShuo Chen    std::pair<TimerList::iterator, bool> result
244a1bde736SShuo Chen      = timers_.insert(Entry(when, timer));
245a1bde736SShuo Chen    assert(result.second); (void)result;
246a1bde736SShuo Chen  }
247a1bde736SShuo Chen  {
248a1bde736SShuo Chen    std::pair<ActiveTimerSet::iterator, bool> result
249a1bde736SShuo Chen      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
250a1bde736SShuo Chen    assert(result.second); (void)result;
251a1bde736SShuo Chen  }
252a1bde736SShuo Chen
253a1bde736SShuo Chen  assert(timers_.size() == activeTimers_.size());
254a1bde736SShuo Chen  return earliestChanged;
255a1bde736SShuo Chen}
256a1bde736SShuo Chen
257