1// excerpts from http://code.google.com/p/muduo/ 2// 3// Use of this source code is governed by a BSD-style license 4// that can be found in the License file. 5// 6// Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8#define __STDC_LIMIT_MACROS 9#include "TimerQueue.h" 10 11#include "logging/Logging.h" 12#include "EventLoop.h" 13#include "Timer.h" 14#include "TimerId.h" 15 16#include <boost/bind.hpp> 17#include <boost/foreach.hpp> 18 19#include <sys/timerfd.h> 20 21namespace muduo 22{ 23namespace detail 24{ 25 26int createTimerfd() 27{ 28 int timerfd = ::timerfd_create(CLOCK_MONOTONIC, 29 TFD_NONBLOCK | TFD_CLOEXEC); 30 if (timerfd < 0) 31 { 32 LOG_SYSFATAL << "Failed in timerfd_create"; 33 } 34 return timerfd; 35} 36 37struct timespec howMuchTimeFromNow(Timestamp when) 38{ 39 int64_t microseconds = when.microSecondsSinceEpoch() 40 - Timestamp::now().microSecondsSinceEpoch(); 41 if (microseconds < 100) 42 { 43 microseconds = 100; 44 } 45 struct timespec ts; 46 ts.tv_sec = static_cast<time_t>( 47 microseconds / Timestamp::kMicroSecondsPerSecond); 48 ts.tv_nsec = static_cast<long>( 49 (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000); 50 return ts; 51} 52 53void readTimerfd(int timerfd, Timestamp now) 54{ 55 uint64_t howmany; 56 ssize_t n = ::read(timerfd, &howmany, sizeof howmany); 57 LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString(); 58 if (n != sizeof howmany) 59 { 60 LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8"; 61 } 62} 63 64void resetTimerfd(int timerfd, Timestamp expiration) 65{ 66 // wake up loop by timerfd_settime() 67 struct itimerspec newValue; 68 struct itimerspec oldValue; 69 bzero(&newValue, sizeof newValue); 70 bzero(&oldValue, sizeof oldValue); 71 newValue.it_value = howMuchTimeFromNow(expiration); 72 int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); 73 if (ret) 74 { 75 LOG_SYSERR << "timerfd_settime()"; 76 } 77} 78 79} 80} 81 82using namespace muduo; 83using namespace muduo::detail; 84 85TimerQueue::TimerQueue(EventLoop* loop) 86 : loop_(loop), 87 timerfd_(createTimerfd()), 88 timerfdChannel_(loop, timerfd_), 89 timers_(), 90 callingExpiredTimers_(false) 91{ 92 timerfdChannel_.setReadCallback( 93 boost::bind(&TimerQueue::handleRead, this)); 94 // we are always reading the timerfd, we disarm it with timerfd_settime. 95 timerfdChannel_.enableReading(); 96} 97 98TimerQueue::~TimerQueue() 99{ 100 ::close(timerfd_); 101 // do not remove channel, since we're in EventLoop::dtor(); 102 for (TimerList::iterator it = timers_.begin(); 103 it != timers_.end(); ++it) 104 { 105 delete it->second; 106 } 107} 108 109TimerId TimerQueue::addTimer(const TimerCallback& cb, 110 Timestamp when, 111 double interval) 112{ 113 Timer* timer = new Timer(cb, when, interval); 114 loop_->runInLoop( 115 boost::bind(&TimerQueue::addTimerInLoop, this, timer)); 116 return TimerId(timer, timer->sequence()); 117} 118 119void TimerQueue::cancel(TimerId timerId) 120{ 121 loop_->runInLoop( 122 boost::bind(&TimerQueue::cancelInLoop, this, timerId)); 123} 124 125void TimerQueue::addTimerInLoop(Timer* timer) 126{ 127 loop_->assertInLoopThread(); 128 bool earliestChanged = insert(timer); 129 130 if (earliestChanged) 131 { 132 resetTimerfd(timerfd_, timer->expiration()); 133 } 134} 135 136void TimerQueue::cancelInLoop(TimerId timerId) 137{ 138 loop_->assertInLoopThread(); 139 assert(timers_.size() == activeTimers_.size()); 140 ActiveTimer timer(timerId.timer_, timerId.sequence_); 141 ActiveTimerSet::iterator it = activeTimers_.find(timer); 142 if (it != activeTimers_.end()) 143 { 144 size_t n = timers_.erase(Entry(it->first->expiration(), it->first)); 145 assert(n == 1); (void)n; 146 delete it->first; // FIXME: no delete please 147 activeTimers_.erase(it); 148 } 149 else if (callingExpiredTimers_) 150 { 151 cancelingTimers_.insert(timer); 152 } 153 assert(timers_.size() == activeTimers_.size()); 154} 155 156void TimerQueue::handleRead() 157{ 158 loop_->assertInLoopThread(); 159 Timestamp now(Timestamp::now()); 160 readTimerfd(timerfd_, now); 161 162 std::vector<Entry> expired = getExpired(now); 163 164 callingExpiredTimers_ = true; 165 cancelingTimers_.clear(); 166 // safe to callback outside critical section 167 for (std::vector<Entry>::iterator it = expired.begin(); 168 it != expired.end(); ++it) 169 { 170 it->second->run(); 171 } 172 callingExpiredTimers_ = false; 173 174 reset(expired, now); 175} 176 177std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now) 178{ 179 assert(timers_.size() == activeTimers_.size()); 180 std::vector<Entry> expired; 181 Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); 182 TimerList::iterator it = timers_.lower_bound(sentry); 183 assert(it == timers_.end() || now < it->first); 184 std::copy(timers_.begin(), it, back_inserter(expired)); 185 timers_.erase(timers_.begin(), it); 186 187 BOOST_FOREACH(Entry entry, expired) 188 { 189 ActiveTimer timer(entry.second, entry.second->sequence()); 190 size_t n = activeTimers_.erase(timer); 191 assert(n == 1); (void)n; 192 } 193 194 assert(timers_.size() == activeTimers_.size()); 195 return expired; 196} 197 198void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now) 199{ 200 Timestamp nextExpire; 201 202 for (std::vector<Entry>::const_iterator it = expired.begin(); 203 it != expired.end(); ++it) 204 { 205 ActiveTimer timer(it->second, it->second->sequence()); 206 if (it->second->repeat() 207 && cancelingTimers_.find(timer) == cancelingTimers_.end()) 208 { 209 it->second->restart(now); 210 insert(it->second); 211 } 212 else 213 { 214 // FIXME move to a free list 215 delete it->second; 216 } 217 } 218 219 if (!timers_.empty()) 220 { 221 nextExpire = timers_.begin()->second->expiration(); 222 } 223 224 if (nextExpire.valid()) 225 { 226 resetTimerfd(timerfd_, nextExpire); 227 } 228} 229 230bool TimerQueue::insert(Timer* timer) 231{ 232 loop_->assertInLoopThread(); 233 assert(timers_.size() == activeTimers_.size()); 234 bool earliestChanged = false; 235 Timestamp when = timer->expiration(); 236 TimerList::iterator it = timers_.begin(); 237 if (it == timers_.end() || when < it->first) 238 { 239 earliestChanged = true; 240 } 241 242 { 243 std::pair<TimerList::iterator, bool> result 244 = timers_.insert(Entry(when, timer)); 245 assert(result.second); (void)result; 246 } 247 { 248 std::pair<ActiveTimerSet::iterator, bool> result 249 = activeTimers_.insert(ActiveTimer(timer, timer->sequence())); 250 assert(result.second); (void)result; 251 } 252 253 assert(timers_.size() == activeTimers_.size()); 254 return earliestChanged; 255} 256 257