1 // excerpts from http://code.google.com/p/muduo/ 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the License file. 5 // 6 // Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8 #define __STDC_LIMIT_MACROS 9 #include "TimerQueue.h" 10 11 #include "logging/Logging.h" 12 #include "EventLoop.h" 13 #include "Timer.h" 14 #include "TimerId.h" 15 16 #include <boost/bind.hpp> 17+#include <boost/foreach.hpp> 18 19 #include <sys/timerfd.h> 20 21 namespace muduo 22 { 23 namespace detail 24 { 25 26 int createTimerfd() 27 { 28 int timerfd = ::timerfd_create(CLOCK_MONOTONIC, 29 TFD_NONBLOCK | TFD_CLOEXEC); 30 if (timerfd < 0) 31 { 32 LOG_SYSFATAL << "Failed in timerfd_create"; 33 } 34 return timerfd; 35 } 36 37 struct timespec howMuchTimeFromNow(Timestamp when) 38 { 39 int64_t microseconds = when.microSecondsSinceEpoch() 40 - Timestamp::now().microSecondsSinceEpoch(); 41 if (microseconds < 100) 42 { 43 microseconds = 100; 44 } 45 struct timespec ts; 46 ts.tv_sec = static_cast<time_t>( 47 microseconds / Timestamp::kMicroSecondsPerSecond); 48 ts.tv_nsec = static_cast<long>( 49 (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000); 50 return ts; 51 } 52 53 void readTimerfd(int timerfd, Timestamp now) 54 { 55 uint64_t howmany; 56 ssize_t n = ::read(timerfd, &howmany, sizeof howmany); 57 LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString(); 58 if (n != sizeof howmany) 59 { 60 LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8"; 61 } 62 } 63 64 void resetTimerfd(int timerfd, Timestamp expiration) 65 { 66 // wake up loop by timerfd_settime() 67 struct itimerspec newValue; 68 struct itimerspec oldValue; 69 bzero(&newValue, sizeof newValue); 70 bzero(&oldValue, sizeof oldValue); 71 newValue.it_value = howMuchTimeFromNow(expiration); 72 int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); 73 if (ret) 74 { 75 LOG_SYSERR << "timerfd_settime()"; 76 } 77 } 78 79 } 80 } 81 82 using namespace muduo; 83 using namespace muduo::detail; 84 85 TimerQueue::TimerQueue(EventLoop* loop) 86 : loop_(loop), 87 timerfd_(createTimerfd()), 88 timerfdChannel_(loop, timerfd_), 89 timers_(), 90+ callingExpiredTimers_(false) 91 { 92 timerfdChannel_.setReadCallback( 93 boost::bind(&TimerQueue::handleRead, this)); 94 // we are always reading the timerfd, we disarm it with timerfd_settime. 95 timerfdChannel_.enableReading(); 96 } 97 98 TimerQueue::~TimerQueue() 99 { 100 ::close(timerfd_); 101 // do not remove channel, since we're in EventLoop::dtor(); 102 for (TimerList::iterator it = timers_.begin(); 103 it != timers_.end(); ++it) 104 { 105 delete it->second; 106 } 107 } 108 109 TimerId TimerQueue::addTimer(const TimerCallback& cb, 110 Timestamp when, 111 double interval) 112 { 113 Timer* timer = new Timer(cb, when, interval); 114 loop_->runInLoop( 115 boost::bind(&TimerQueue::addTimerInLoop, this, timer)); 116! return TimerId(timer, timer->sequence()); 117 } 118 119+void TimerQueue::cancel(TimerId timerId) 120+{ 121+ loop_->runInLoop( 122+ boost::bind(&TimerQueue::cancelInLoop, this, timerId)); 123+} 124+ 125 void TimerQueue::addTimerInLoop(Timer* timer) 126 { 127 loop_->assertInLoopThread(); 128 bool earliestChanged = insert(timer); 129 130 if (earliestChanged) 131 { 132 resetTimerfd(timerfd_, timer->expiration()); 133 } 134 } 135 136+void TimerQueue::cancelInLoop(TimerId timerId) 137+{ 138+ loop_->assertInLoopThread(); 139+ assert(timers_.size() == activeTimers_.size()); 140+ ActiveTimer timer(timerId.timer_, timerId.seq_); 141+ ActiveTimerSet::iterator it = activeTimers_.find(timer); 142+ if (it != activeTimers_.end()) 143+ { 144+ size_t n = timers_.erase(Entry(it->first->expiration(), it->first)); 145+ assert(n == 1); (void)n; 146+ delete it->first; // FIXME: no delete please 147+ activeTimers_.erase(it); 148+ } 149+ else if (callingExpiredTimers_) 150+ { 151+ cancelingTimers_.insert(timer); 152+ } 153+ assert(timers_.size() == activeTimers_.size()); 154+} 155+ 156 void TimerQueue::handleRead() 157 { 158 loop_->assertInLoopThread(); 159 Timestamp now(Timestamp::now()); 160 readTimerfd(timerfd_, now); 161 162 std::vector<Entry> expired = getExpired(now); 163 164+ callingExpiredTimers_ = true; 165+ cancelingTimers_.clear(); 166 // safe to callback outside critical section 167 for (std::vector<Entry>::iterator it = expired.begin(); 168 it != expired.end(); ++it) 169 { 170 it->second->run(); 171 } 172+ callingExpiredTimers_ = false; 173 174 reset(expired, now); 175 } 176 177 std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now) 178 { 179+ assert(timers_.size() == activeTimers_.size()); 180 std::vector<Entry> expired; 181 Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); 182 TimerList::iterator it = timers_.lower_bound(sentry); 183 assert(it == timers_.end() || now < it->first); 184 std::copy(timers_.begin(), it, back_inserter(expired)); 185 timers_.erase(timers_.begin(), it); 186 187+ BOOST_FOREACH(Entry entry, expired) 188+ { 189+ ActiveTimer timer(entry.second, entry.second->sequence()); 190+ size_t n = activeTimers_.erase(timer); 191+ assert(n == 1); (void)n; 192+ } 193+ 194+ assert(timers_.size() == activeTimers_.size()); 195 return expired; 196 } 197 198 void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now) 199 { 200 Timestamp nextExpire; 201 202 for (std::vector<Entry>::const_iterator it = expired.begin(); 203 it != expired.end(); ++it) 204 { 205+ ActiveTimer timer(it->second, it->second->sequence()); 206! if (it->second->repeat() 207+ && cancelingTimers_.find(timer) == cancelingTimers_.end()) 208 { 209 it->second->restart(now); 210 insert(it->second); 211 } 212 else 213 { 214 // FIXME move to a free list 215 delete it->second; 216 } 217 } 218 219 if (!timers_.empty()) 220 { 221 nextExpire = timers_.begin()->second->expiration(); 222 } 223 224 if (nextExpire.valid()) 225 { 226 resetTimerfd(timerfd_, nextExpire); 227 } 228 } 229 230 bool TimerQueue::insert(Timer* timer) 231 { 232+ loop_->assertInLoopThread(); 233+ assert(timers_.size() == activeTimers_.size()); 234 bool earliestChanged = false; 235 Timestamp when = timer->expiration(); 236 TimerList::iterator it = timers_.begin(); 237 if (it == timers_.end() || when < it->first) 238 { 239 earliestChanged = true; 240 } 241+ 242+ { 243 std::pair<TimerList::iterator, bool> result 244 = timers_.insert(Entry(when, timer)); 245 assert(result.second); (void)result; 246+ } 247+ { 248+ std::pair<ActiveTimerSet::iterator, bool> result 249+ = activeTimers_.insert(ActiveTimer(timer, timer->sequence())); 250+ assert(result.second); (void)result; 251+ } 252+ 253+ assert(timers_.size() == activeTimers_.size()); 254 return earliestChanged; 255 } 256 257