// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
7354280cfSShuo Chen
8354280cfSShuo Chen#define __STDC_LIMIT_MACROS
9354280cfSShuo Chen#include "TimerQueue.h"
10354280cfSShuo Chen
11354280cfSShuo Chen#include "logging/Logging.h"
12354280cfSShuo Chen#include "EventLoop.h"
13354280cfSShuo Chen#include "Timer.h"
14354280cfSShuo Chen#include "TimerId.h"
15354280cfSShuo Chen
16354280cfSShuo Chen#include <boost/bind.hpp>
17354280cfSShuo Chen#include <boost/foreach.hpp>
18354280cfSShuo Chen
19354280cfSShuo Chen#include <sys/timerfd.h>
20354280cfSShuo Chen
21354280cfSShuo Chennamespace muduo
22354280cfSShuo Chen{
23354280cfSShuo Chennamespace detail
24354280cfSShuo Chen{
25354280cfSShuo Chen
26354280cfSShuo Chenint createTimerfd()
27354280cfSShuo Chen{
28354280cfSShuo Chen  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
29354280cfSShuo Chen                                 TFD_NONBLOCK | TFD_CLOEXEC);
30354280cfSShuo Chen  if (timerfd < 0)
31354280cfSShuo Chen  {
32354280cfSShuo Chen    LOG_SYSFATAL << "Failed in timerfd_create";
33354280cfSShuo Chen  }
34354280cfSShuo Chen  return timerfd;
35354280cfSShuo Chen}
36354280cfSShuo Chen
37354280cfSShuo Chenstruct timespec howMuchTimeFromNow(Timestamp when)
38354280cfSShuo Chen{
39354280cfSShuo Chen  int64_t microseconds = when.microSecondsSinceEpoch()
40354280cfSShuo Chen                         - Timestamp::now().microSecondsSinceEpoch();
41354280cfSShuo Chen  if (microseconds < 100)
42354280cfSShuo Chen  {
43354280cfSShuo Chen    microseconds = 100;
44354280cfSShuo Chen  }
45354280cfSShuo Chen  struct timespec ts;
46354280cfSShuo Chen  ts.tv_sec = static_cast<time_t>(
47354280cfSShuo Chen      microseconds / Timestamp::kMicroSecondsPerSecond);
48354280cfSShuo Chen  ts.tv_nsec = static_cast<long>(
49354280cfSShuo Chen      (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
50354280cfSShuo Chen  return ts;
51354280cfSShuo Chen}
52354280cfSShuo Chen
53354280cfSShuo Chenvoid readTimerfd(int timerfd, Timestamp now)
54354280cfSShuo Chen{
55354280cfSShuo Chen  uint64_t howmany;
56354280cfSShuo Chen  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
57354280cfSShuo Chen  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
58354280cfSShuo Chen  if (n != sizeof howmany)
59354280cfSShuo Chen  {
60354280cfSShuo Chen    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
61354280cfSShuo Chen  }
62354280cfSShuo Chen}
63354280cfSShuo Chen
64354280cfSShuo Chenvoid resetTimerfd(int timerfd, Timestamp expiration)
65354280cfSShuo Chen{
66354280cfSShuo Chen  // wake up loop by timerfd_settime()
67354280cfSShuo Chen  struct itimerspec newValue;
68354280cfSShuo Chen  struct itimerspec oldValue;
69354280cfSShuo Chen  bzero(&newValue, sizeof newValue);
70354280cfSShuo Chen  bzero(&oldValue, sizeof oldValue);
71354280cfSShuo Chen  newValue.it_value = howMuchTimeFromNow(expiration);
72354280cfSShuo Chen  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
73354280cfSShuo Chen  if (ret)
74354280cfSShuo Chen  {
75354280cfSShuo Chen    LOG_SYSERR << "timerfd_settime()";
76354280cfSShuo Chen  }
77354280cfSShuo Chen}
78354280cfSShuo Chen
79354280cfSShuo Chen}
80354280cfSShuo Chen}
81354280cfSShuo Chen
82354280cfSShuo Chenusing namespace muduo;
83354280cfSShuo Chenusing namespace muduo::detail;
84354280cfSShuo Chen
85354280cfSShuo ChenTimerQueue::TimerQueue(EventLoop* loop)
86354280cfSShuo Chen  : loop_(loop),
87354280cfSShuo Chen    timerfd_(createTimerfd()),
88354280cfSShuo Chen    timerfdChannel_(loop, timerfd_),
89354280cfSShuo Chen    timers_(),
90354280cfSShuo Chen    callingExpiredTimers_(false)
91354280cfSShuo Chen{
92354280cfSShuo Chen  timerfdChannel_.setReadCallback(
93354280cfSShuo Chen      boost::bind(&TimerQueue::handleRead, this));
94354280cfSShuo Chen  // we are always reading the timerfd, we disarm it with timerfd_settime.
95354280cfSShuo Chen  timerfdChannel_.enableReading();
96354280cfSShuo Chen}
97354280cfSShuo Chen
98354280cfSShuo ChenTimerQueue::~TimerQueue()
99354280cfSShuo Chen{
100354280cfSShuo Chen  ::close(timerfd_);
101354280cfSShuo Chen  // do not remove channel, since we're in EventLoop::dtor();
102354280cfSShuo Chen  for (TimerList::iterator it = timers_.begin();
103354280cfSShuo Chen      it != timers_.end(); ++it)
104354280cfSShuo Chen  {
105354280cfSShuo Chen    delete it->second;
106354280cfSShuo Chen  }
107354280cfSShuo Chen}
108354280cfSShuo Chen
109354280cfSShuo ChenTimerId TimerQueue::addTimer(const TimerCallback& cb,
110354280cfSShuo Chen                             Timestamp when,
111354280cfSShuo Chen                             double interval)
112354280cfSShuo Chen{
113354280cfSShuo Chen  Timer* timer = new Timer(cb, when, interval);
114354280cfSShuo Chen  loop_->runInLoop(
115354280cfSShuo Chen      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
116354280cfSShuo Chen  return TimerId(timer, timer->sequence());
117354280cfSShuo Chen}
118354280cfSShuo Chen
119354280cfSShuo Chenvoid TimerQueue::cancel(TimerId timerId)
120354280cfSShuo Chen{
121354280cfSShuo Chen  loop_->runInLoop(
122354280cfSShuo Chen      boost::bind(&TimerQueue::cancelInLoop, this, timerId));
123354280cfSShuo Chen}
124354280cfSShuo Chen
125354280cfSShuo Chenvoid TimerQueue::addTimerInLoop(Timer* timer)
126354280cfSShuo Chen{
127354280cfSShuo Chen  loop_->assertInLoopThread();
128354280cfSShuo Chen  bool earliestChanged = insert(timer);
129354280cfSShuo Chen
130354280cfSShuo Chen  if (earliestChanged)
131354280cfSShuo Chen  {
132354280cfSShuo Chen    resetTimerfd(timerfd_, timer->expiration());
133354280cfSShuo Chen  }
134354280cfSShuo Chen}
135354280cfSShuo Chen
136354280cfSShuo Chenvoid TimerQueue::cancelInLoop(TimerId timerId)
137354280cfSShuo Chen{
138354280cfSShuo Chen  loop_->assertInLoopThread();
139354280cfSShuo Chen  assert(timers_.size() == activeTimers_.size());
140354280cfSShuo Chen  ActiveTimer timer(timerId.timer_, timerId.sequence_);
141354280cfSShuo Chen  ActiveTimerSet::iterator it = activeTimers_.find(timer);
142354280cfSShuo Chen  if (it != activeTimers_.end())
143354280cfSShuo Chen  {
144354280cfSShuo Chen    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
145354280cfSShuo Chen    assert(n == 1); (void)n;
146354280cfSShuo Chen    delete it->first; // FIXME: no delete please
147354280cfSShuo Chen    activeTimers_.erase(it);
148354280cfSShuo Chen  }
149354280cfSShuo Chen  else if (callingExpiredTimers_)
150354280cfSShuo Chen  {
151354280cfSShuo Chen    cancelingTimers_.insert(timer);
152354280cfSShuo Chen  }
153354280cfSShuo Chen  assert(timers_.size() == activeTimers_.size());
154354280cfSShuo Chen}
155354280cfSShuo Chen
156354280cfSShuo Chenvoid TimerQueue::handleRead()
157354280cfSShuo Chen{
158354280cfSShuo Chen  loop_->assertInLoopThread();
159354280cfSShuo Chen  Timestamp now(Timestamp::now());
160354280cfSShuo Chen  readTimerfd(timerfd_, now);
161354280cfSShuo Chen
162354280cfSShuo Chen  std::vector<Entry> expired = getExpired(now);
163354280cfSShuo Chen
164354280cfSShuo Chen  callingExpiredTimers_ = true;
165354280cfSShuo Chen  cancelingTimers_.clear();
166354280cfSShuo Chen  // safe to callback outside critical section
167354280cfSShuo Chen  for (std::vector<Entry>::iterator it = expired.begin();
168354280cfSShuo Chen      it != expired.end(); ++it)
169354280cfSShuo Chen  {
170354280cfSShuo Chen    it->second->run();
171354280cfSShuo Chen  }
172354280cfSShuo Chen  callingExpiredTimers_ = false;
173354280cfSShuo Chen
174354280cfSShuo Chen  reset(expired, now);
175354280cfSShuo Chen}
176354280cfSShuo Chen
177354280cfSShuo Chenstd::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
178354280cfSShuo Chen{
179354280cfSShuo Chen  assert(timers_.size() == activeTimers_.size());
180354280cfSShuo Chen  std::vector<Entry> expired;
181354280cfSShuo Chen  Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
182354280cfSShuo Chen  TimerList::iterator it = timers_.lower_bound(sentry);
183354280cfSShuo Chen  assert(it == timers_.end() || now < it->first);
184354280cfSShuo Chen  std::copy(timers_.begin(), it, back_inserter(expired));
185354280cfSShuo Chen  timers_.erase(timers_.begin(), it);
186354280cfSShuo Chen
187354280cfSShuo Chen  BOOST_FOREACH(Entry entry, expired)
188354280cfSShuo Chen  {
189354280cfSShuo Chen    ActiveTimer timer(entry.second, entry.second->sequence());
190354280cfSShuo Chen    size_t n = activeTimers_.erase(timer);
191354280cfSShuo Chen    assert(n == 1); (void)n;
192354280cfSShuo Chen  }
193354280cfSShuo Chen
194354280cfSShuo Chen  assert(timers_.size() == activeTimers_.size());
195354280cfSShuo Chen  return expired;
196354280cfSShuo Chen}
197354280cfSShuo Chen
198354280cfSShuo Chenvoid TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
199354280cfSShuo Chen{
200354280cfSShuo Chen  Timestamp nextExpire;
201354280cfSShuo Chen
202354280cfSShuo Chen  for (std::vector<Entry>::const_iterator it = expired.begin();
203354280cfSShuo Chen      it != expired.end(); ++it)
204354280cfSShuo Chen  {
205354280cfSShuo Chen    ActiveTimer timer(it->second, it->second->sequence());
206354280cfSShuo Chen    if (it->second->repeat()
207354280cfSShuo Chen        && cancelingTimers_.find(timer) == cancelingTimers_.end())
208354280cfSShuo Chen    {
209354280cfSShuo Chen      it->second->restart(now);
210354280cfSShuo Chen      insert(it->second);
211354280cfSShuo Chen    }
212354280cfSShuo Chen    else
213354280cfSShuo Chen    {
214354280cfSShuo Chen      // FIXME move to a free list
215354280cfSShuo Chen      delete it->second;
216354280cfSShuo Chen    }
217354280cfSShuo Chen  }
218354280cfSShuo Chen
219354280cfSShuo Chen  if (!timers_.empty())
220354280cfSShuo Chen  {
221354280cfSShuo Chen    nextExpire = timers_.begin()->second->expiration();
222354280cfSShuo Chen  }
223354280cfSShuo Chen
224354280cfSShuo Chen  if (nextExpire.valid())
225354280cfSShuo Chen  {
226354280cfSShuo Chen    resetTimerfd(timerfd_, nextExpire);
227354280cfSShuo Chen  }
228354280cfSShuo Chen}
229354280cfSShuo Chen
230354280cfSShuo Chenbool TimerQueue::insert(Timer* timer)
231354280cfSShuo Chen{
232354280cfSShuo Chen  loop_->assertInLoopThread();
233354280cfSShuo Chen  assert(timers_.size() == activeTimers_.size());
234354280cfSShuo Chen  bool earliestChanged = false;
235354280cfSShuo Chen  Timestamp when = timer->expiration();
236354280cfSShuo Chen  TimerList::iterator it = timers_.begin();
237354280cfSShuo Chen  if (it == timers_.end() || when < it->first)
238354280cfSShuo Chen  {
239354280cfSShuo Chen    earliestChanged = true;
240354280cfSShuo Chen  }
241354280cfSShuo Chen
242354280cfSShuo Chen  {
243354280cfSShuo Chen    std::pair<TimerList::iterator, bool> result
244354280cfSShuo Chen      = timers_.insert(Entry(when, timer));
245354280cfSShuo Chen    assert(result.second); (void)result;
246354280cfSShuo Chen  }
247354280cfSShuo Chen  {
248354280cfSShuo Chen    std::pair<ActiveTimerSet::iterator, bool> result
249354280cfSShuo Chen      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
250354280cfSShuo Chen    assert(result.second); (void)result;
251354280cfSShuo Chen  }
252354280cfSShuo Chen
253354280cfSShuo Chen  assert(timers_.size() == activeTimers_.size());
254354280cfSShuo Chen  return earliestChanged;
255354280cfSShuo Chen}
256354280cfSShuo Chen
257