s11-s10-TimerQueue.cc.diff revision f4e8e3d3
1f4e8e3d3SShuo Chen // excerpts from http://code.google.com/p/muduo/ 2f4e8e3d3SShuo Chen // 3f4e8e3d3SShuo Chen // Use of this source code is governed by a BSD-style license 4f4e8e3d3SShuo Chen // that can be found in the License file. 5f4e8e3d3SShuo Chen // 6f4e8e3d3SShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com) 7f4e8e3d3SShuo Chen 8f4e8e3d3SShuo Chen #define __STDC_LIMIT_MACROS 9f4e8e3d3SShuo Chen #include "TimerQueue.h" 10f4e8e3d3SShuo Chen 11f4e8e3d3SShuo Chen #include "logging/Logging.h" 12f4e8e3d3SShuo Chen #include "EventLoop.h" 13f4e8e3d3SShuo Chen #include "Timer.h" 14f4e8e3d3SShuo Chen #include "TimerId.h" 15f4e8e3d3SShuo Chen 16f4e8e3d3SShuo Chen #include <boost/bind.hpp> 17f4e8e3d3SShuo Chen+#include <boost/foreach.hpp> 18f4e8e3d3SShuo Chen 19f4e8e3d3SShuo Chen #include <sys/timerfd.h> 20f4e8e3d3SShuo Chen 21f4e8e3d3SShuo Chen namespace muduo 22f4e8e3d3SShuo Chen { 23f4e8e3d3SShuo Chen namespace detail 24f4e8e3d3SShuo Chen { 25f4e8e3d3SShuo Chen 26f4e8e3d3SShuo Chen int createTimerfd() 27f4e8e3d3SShuo Chen { 28f4e8e3d3SShuo Chen int timerfd = ::timerfd_create(CLOCK_MONOTONIC, 29f4e8e3d3SShuo Chen TFD_NONBLOCK | TFD_CLOEXEC); 30f4e8e3d3SShuo Chen if (timerfd < 0) 31f4e8e3d3SShuo Chen { 32f4e8e3d3SShuo Chen LOG_SYSFATAL << "Failed in timerfd_create"; 33f4e8e3d3SShuo Chen } 34f4e8e3d3SShuo Chen return timerfd; 35f4e8e3d3SShuo Chen } 36f4e8e3d3SShuo Chen 37f4e8e3d3SShuo Chen struct timespec howMuchTimeFromNow(Timestamp when) 38f4e8e3d3SShuo Chen { 39f4e8e3d3SShuo Chen int64_t microseconds = when.microSecondsSinceEpoch() 40f4e8e3d3SShuo Chen - Timestamp::now().microSecondsSinceEpoch(); 41f4e8e3d3SShuo Chen if (microseconds < 100) 42f4e8e3d3SShuo Chen { 43f4e8e3d3SShuo Chen microseconds = 100; 44f4e8e3d3SShuo Chen } 45f4e8e3d3SShuo Chen struct timespec ts; 46f4e8e3d3SShuo Chen ts.tv_sec = static_cast<time_t>( 47f4e8e3d3SShuo Chen microseconds / Timestamp::kMicroSecondsPerSecond); 48f4e8e3d3SShuo Chen ts.tv_nsec = static_cast<long>( 49f4e8e3d3SShuo Chen (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000); 50f4e8e3d3SShuo Chen return ts; 51f4e8e3d3SShuo Chen } 52f4e8e3d3SShuo Chen 53f4e8e3d3SShuo Chen void readTimerfd(int timerfd, Timestamp now) 54f4e8e3d3SShuo Chen { 55f4e8e3d3SShuo Chen uint64_t howmany; 56f4e8e3d3SShuo Chen ssize_t n = ::read(timerfd, &howmany, sizeof howmany); 57f4e8e3d3SShuo Chen LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString(); 58f4e8e3d3SShuo Chen if (n != sizeof howmany) 59f4e8e3d3SShuo Chen { 60f4e8e3d3SShuo Chen LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8"; 61f4e8e3d3SShuo Chen } 62f4e8e3d3SShuo Chen } 63f4e8e3d3SShuo Chen 64f4e8e3d3SShuo Chen void resetTimerfd(int timerfd, Timestamp expiration) 65f4e8e3d3SShuo Chen { 66f4e8e3d3SShuo Chen // wake up loop by timerfd_settime() 67f4e8e3d3SShuo Chen struct itimerspec newValue; 68f4e8e3d3SShuo Chen struct itimerspec oldValue; 69f4e8e3d3SShuo Chen bzero(&newValue, sizeof newValue); 70f4e8e3d3SShuo Chen bzero(&oldValue, sizeof oldValue); 71f4e8e3d3SShuo Chen newValue.it_value = howMuchTimeFromNow(expiration); 72f4e8e3d3SShuo Chen int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); 73f4e8e3d3SShuo Chen if (ret) 74f4e8e3d3SShuo Chen { 75f4e8e3d3SShuo Chen LOG_SYSERR << "timerfd_settime()"; 76f4e8e3d3SShuo Chen } 77f4e8e3d3SShuo Chen } 78f4e8e3d3SShuo Chen 79f4e8e3d3SShuo Chen } 80f4e8e3d3SShuo Chen } 81f4e8e3d3SShuo Chen 82f4e8e3d3SShuo Chen using namespace muduo; 83f4e8e3d3SShuo Chen using namespace muduo::detail; 84f4e8e3d3SShuo Chen 85f4e8e3d3SShuo Chen TimerQueue::TimerQueue(EventLoop* loop) 86f4e8e3d3SShuo Chen : loop_(loop), 87f4e8e3d3SShuo Chen timerfd_(createTimerfd()), 88f4e8e3d3SShuo Chen timerfdChannel_(loop, timerfd_), 89f4e8e3d3SShuo Chen- timers_() 90f4e8e3d3SShuo Chen+ timers_(), 91f4e8e3d3SShuo Chen+ callingExpiredTimers_(false) 92f4e8e3d3SShuo Chen { 93f4e8e3d3SShuo Chen timerfdChannel_.setReadCallback( 94f4e8e3d3SShuo Chen boost::bind(&TimerQueue::handleRead, this)); 95f4e8e3d3SShuo Chen // we are always reading the timerfd, we disarm it with timerfd_settime. 96f4e8e3d3SShuo Chen timerfdChannel_.enableReading(); 97f4e8e3d3SShuo Chen } 98f4e8e3d3SShuo Chen 99f4e8e3d3SShuo Chen TimerQueue::~TimerQueue() 100f4e8e3d3SShuo Chen { 101f4e8e3d3SShuo Chen ::close(timerfd_); 102f4e8e3d3SShuo Chen // do not remove channel, since we're in EventLoop::dtor(); 103f4e8e3d3SShuo Chen for (TimerList::iterator it = timers_.begin(); 104f4e8e3d3SShuo Chen it != timers_.end(); ++it) 105f4e8e3d3SShuo Chen { 106f4e8e3d3SShuo Chen delete it->second; 107f4e8e3d3SShuo Chen } 108f4e8e3d3SShuo Chen } 109f4e8e3d3SShuo Chen 110f4e8e3d3SShuo Chen TimerId TimerQueue::addTimer(const TimerCallback& cb, 111f4e8e3d3SShuo Chen Timestamp when, 112f4e8e3d3SShuo Chen double interval) 113f4e8e3d3SShuo Chen { 114f4e8e3d3SShuo Chen Timer* timer = new Timer(cb, when, interval); 115f4e8e3d3SShuo Chen loop_->runInLoop( 116f4e8e3d3SShuo Chen boost::bind(&TimerQueue::addTimerInLoop, this, timer)); 117f4e8e3d3SShuo Chen! return TimerId(timer, timer->sequence()); 118f4e8e3d3SShuo Chen } 119f4e8e3d3SShuo Chen 120f4e8e3d3SShuo Chen+void TimerQueue::cancel(TimerId timerId) 121f4e8e3d3SShuo Chen+{ 122f4e8e3d3SShuo Chen+ loop_->runInLoop( 123f4e8e3d3SShuo Chen+ boost::bind(&TimerQueue::cancelInLoop, this, timerId)); 124f4e8e3d3SShuo Chen+} 125f4e8e3d3SShuo Chen+ 126f4e8e3d3SShuo Chen void TimerQueue::addTimerInLoop(Timer* timer) 127f4e8e3d3SShuo Chen { 128f4e8e3d3SShuo Chen loop_->assertInLoopThread(); 129f4e8e3d3SShuo Chen bool earliestChanged = insert(timer); 130f4e8e3d3SShuo Chen 131f4e8e3d3SShuo Chen if (earliestChanged) 132f4e8e3d3SShuo Chen { 133f4e8e3d3SShuo Chen resetTimerfd(timerfd_, timer->expiration()); 134f4e8e3d3SShuo Chen } 135f4e8e3d3SShuo Chen } 136f4e8e3d3SShuo Chen 137f4e8e3d3SShuo Chen+void TimerQueue::cancelInLoop(TimerId timerId) 138f4e8e3d3SShuo Chen+{ 139f4e8e3d3SShuo Chen+ loop_->assertInLoopThread(); 140f4e8e3d3SShuo Chen+ assert(timers_.size() == activeTimers_.size()); 141f4e8e3d3SShuo Chen+ ActiveTimer timer(timerId.timer_, timerId.seq_); 142f4e8e3d3SShuo Chen+ ActiveTimerSet::iterator it = activeTimers_.find(timer); 143f4e8e3d3SShuo Chen+ if (it != activeTimers_.end()) 144f4e8e3d3SShuo Chen+ { 145f4e8e3d3SShuo Chen+ size_t n = timers_.erase(Entry(it->first->expiration(), it->first)); 146f4e8e3d3SShuo Chen+ assert(n == 1); (void)n; 147f4e8e3d3SShuo Chen+ delete it->first; // FIXME: no delete please 148f4e8e3d3SShuo Chen+ activeTimers_.erase(it); 149f4e8e3d3SShuo Chen+ } 150f4e8e3d3SShuo Chen+ else if (callingExpiredTimers_) 151f4e8e3d3SShuo Chen+ { 152f4e8e3d3SShuo Chen+ cancelingTimers_.insert(timer); 153f4e8e3d3SShuo Chen+ } 154f4e8e3d3SShuo Chen+ assert(timers_.size() == activeTimers_.size()); 155f4e8e3d3SShuo Chen+} 156f4e8e3d3SShuo Chen+ 157f4e8e3d3SShuo Chen void TimerQueue::handleRead() 158f4e8e3d3SShuo Chen { 159f4e8e3d3SShuo Chen loop_->assertInLoopThread(); 160f4e8e3d3SShuo Chen Timestamp now(Timestamp::now()); 161f4e8e3d3SShuo Chen readTimerfd(timerfd_, now); 162f4e8e3d3SShuo Chen 163f4e8e3d3SShuo Chen std::vector<Entry> expired = getExpired(now); 164f4e8e3d3SShuo Chen 165f4e8e3d3SShuo Chen+ callingExpiredTimers_ = true; 166f4e8e3d3SShuo Chen+ cancelingTimers_.clear(); 167f4e8e3d3SShuo Chen // safe to callback outside critical section 168f4e8e3d3SShuo Chen for (std::vector<Entry>::iterator it = expired.begin(); 169f4e8e3d3SShuo Chen it != expired.end(); ++it) 170f4e8e3d3SShuo Chen { 171f4e8e3d3SShuo Chen it->second->run(); 172f4e8e3d3SShuo Chen } 173f4e8e3d3SShuo Chen+ callingExpiredTimers_ = false; 174f4e8e3d3SShuo Chen 175f4e8e3d3SShuo Chen reset(expired, now); 176f4e8e3d3SShuo Chen } 177f4e8e3d3SShuo Chen 178f4e8e3d3SShuo Chen std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now) 179f4e8e3d3SShuo Chen { 180f4e8e3d3SShuo Chen+ assert(timers_.size() == activeTimers_.size()); 181f4e8e3d3SShuo Chen std::vector<Entry> expired; 182f4e8e3d3SShuo Chen Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); 183f4e8e3d3SShuo Chen TimerList::iterator it = timers_.lower_bound(sentry); 184f4e8e3d3SShuo Chen assert(it == timers_.end() || now < it->first); 185f4e8e3d3SShuo Chen std::copy(timers_.begin(), it, back_inserter(expired)); 186f4e8e3d3SShuo Chen timers_.erase(timers_.begin(), it); 187f4e8e3d3SShuo Chen 188f4e8e3d3SShuo Chen+ BOOST_FOREACH(Entry entry, expired) 189f4e8e3d3SShuo Chen+ { 190f4e8e3d3SShuo Chen+ ActiveTimer timer(entry.second, entry.second->sequence()); 191f4e8e3d3SShuo Chen+ size_t n = activeTimers_.erase(timer); 192f4e8e3d3SShuo Chen+ assert(n == 1); (void)n; 193f4e8e3d3SShuo Chen+ } 194f4e8e3d3SShuo Chen+ 195f4e8e3d3SShuo Chen+ assert(timers_.size() == activeTimers_.size()); 196f4e8e3d3SShuo Chen return expired; 197f4e8e3d3SShuo Chen } 198f4e8e3d3SShuo Chen 199f4e8e3d3SShuo Chen void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now) 200f4e8e3d3SShuo Chen { 201f4e8e3d3SShuo Chen Timestamp nextExpire; 202f4e8e3d3SShuo Chen 203f4e8e3d3SShuo Chen for (std::vector<Entry>::const_iterator it = expired.begin(); 204f4e8e3d3SShuo Chen it != expired.end(); ++it) 205f4e8e3d3SShuo Chen { 206f4e8e3d3SShuo Chen+ ActiveTimer timer(it->second, it->second->sequence()); 207f4e8e3d3SShuo Chen! if (it->second->repeat() 208f4e8e3d3SShuo Chen+ && cancelingTimers_.find(timer) == cancelingTimers_.end()) 209f4e8e3d3SShuo Chen { 210f4e8e3d3SShuo Chen it->second->restart(now); 211f4e8e3d3SShuo Chen insert(it->second); 212f4e8e3d3SShuo Chen } 213f4e8e3d3SShuo Chen else 214f4e8e3d3SShuo Chen { 215f4e8e3d3SShuo Chen // FIXME move to a free list 216f4e8e3d3SShuo Chen delete it->second; 217f4e8e3d3SShuo Chen } 218f4e8e3d3SShuo Chen } 219f4e8e3d3SShuo Chen 220f4e8e3d3SShuo Chen if (!timers_.empty()) 221f4e8e3d3SShuo Chen { 222f4e8e3d3SShuo Chen nextExpire = timers_.begin()->second->expiration(); 223f4e8e3d3SShuo Chen } 224f4e8e3d3SShuo Chen 225f4e8e3d3SShuo Chen if (nextExpire.valid()) 226f4e8e3d3SShuo Chen { 227f4e8e3d3SShuo Chen resetTimerfd(timerfd_, nextExpire); 228f4e8e3d3SShuo Chen } 229f4e8e3d3SShuo Chen } 230f4e8e3d3SShuo Chen 231f4e8e3d3SShuo Chen bool TimerQueue::insert(Timer* timer) 232f4e8e3d3SShuo Chen { 233f4e8e3d3SShuo Chen+ loop_->assertInLoopThread(); 234f4e8e3d3SShuo Chen+ assert(timers_.size() == activeTimers_.size()); 235f4e8e3d3SShuo Chen bool earliestChanged = false; 236f4e8e3d3SShuo Chen Timestamp when = timer->expiration(); 237f4e8e3d3SShuo Chen TimerList::iterator it = timers_.begin(); 238f4e8e3d3SShuo Chen if (it == timers_.end() || when < it->first) 239f4e8e3d3SShuo Chen { 240f4e8e3d3SShuo Chen earliestChanged = true; 241f4e8e3d3SShuo Chen } 242f4e8e3d3SShuo Chen+ 243f4e8e3d3SShuo Chen+ { 244f4e8e3d3SShuo Chen std::pair<TimerList::iterator, bool> result 245f4e8e3d3SShuo Chen = timers_.insert(Entry(when, timer)); 246f4e8e3d3SShuo Chen assert(result.second); (void)result; 247f4e8e3d3SShuo Chen+ } 248f4e8e3d3SShuo Chen+ { 249f4e8e3d3SShuo Chen+ std::pair<ActiveTimerSet::iterator, bool> result 250f4e8e3d3SShuo Chen+ = activeTimers_.insert(ActiveTimer(timer, timer->sequence())); 251f4e8e3d3SShuo Chen+ assert(result.second); (void)result; 252f4e8e3d3SShuo Chen+ } 253f4e8e3d3SShuo Chen+ 254f4e8e3d3SShuo Chen+ assert(timers_.size() == activeTimers_.size()); 255f4e8e3d3SShuo Chen return earliestChanged; 256f4e8e3d3SShuo Chen } 257f4e8e3d3SShuo Chen 258