TimerQueue.cc revision 354280cf
1354280cfSShuo Chen// excerpts from http://code.google.com/p/muduo/ 2354280cfSShuo Chen// 3354280cfSShuo Chen// Use of this source code is governed by a BSD-style license 4354280cfSShuo Chen// that can be found in the License file. 5354280cfSShuo Chen// 6354280cfSShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7354280cfSShuo Chen 8354280cfSShuo Chen#define __STDC_LIMIT_MACROS 9354280cfSShuo Chen#include "TimerQueue.h" 10354280cfSShuo Chen 11354280cfSShuo Chen#include "logging/Logging.h" 12354280cfSShuo Chen#include "EventLoop.h" 13354280cfSShuo Chen#include "Timer.h" 14354280cfSShuo Chen#include "TimerId.h" 15354280cfSShuo Chen 16354280cfSShuo Chen#include <boost/bind.hpp> 17354280cfSShuo Chen#include <boost/foreach.hpp> 18354280cfSShuo Chen 19354280cfSShuo Chen#include <sys/timerfd.h> 20354280cfSShuo Chen 21354280cfSShuo Chennamespace muduo 22354280cfSShuo Chen{ 23354280cfSShuo Chennamespace detail 24354280cfSShuo Chen{ 25354280cfSShuo Chen 26354280cfSShuo Chenint createTimerfd() 27354280cfSShuo Chen{ 28354280cfSShuo Chen int timerfd = ::timerfd_create(CLOCK_MONOTONIC, 29354280cfSShuo Chen TFD_NONBLOCK | TFD_CLOEXEC); 30354280cfSShuo Chen if (timerfd < 0) 31354280cfSShuo Chen { 32354280cfSShuo Chen LOG_SYSFATAL << "Failed in timerfd_create"; 33354280cfSShuo Chen } 34354280cfSShuo Chen return timerfd; 35354280cfSShuo Chen} 36354280cfSShuo Chen 37354280cfSShuo Chenstruct timespec howMuchTimeFromNow(Timestamp when) 38354280cfSShuo Chen{ 39354280cfSShuo Chen int64_t microseconds = when.microSecondsSinceEpoch() 40354280cfSShuo Chen - Timestamp::now().microSecondsSinceEpoch(); 41354280cfSShuo Chen if (microseconds < 100) 42354280cfSShuo Chen { 43354280cfSShuo Chen microseconds = 100; 44354280cfSShuo Chen } 45354280cfSShuo Chen struct timespec ts; 46354280cfSShuo Chen ts.tv_sec = static_cast<time_t>( 47354280cfSShuo Chen microseconds / Timestamp::kMicroSecondsPerSecond); 48354280cfSShuo Chen ts.tv_nsec = static_cast<long>( 49354280cfSShuo Chen (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000); 50354280cfSShuo Chen return ts; 51354280cfSShuo Chen} 52354280cfSShuo Chen 53354280cfSShuo Chenvoid readTimerfd(int timerfd, Timestamp now) 54354280cfSShuo Chen{ 55354280cfSShuo Chen uint64_t howmany; 56354280cfSShuo Chen ssize_t n = ::read(timerfd, &howmany, sizeof howmany); 57354280cfSShuo Chen LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString(); 58354280cfSShuo Chen if (n != sizeof howmany) 59354280cfSShuo Chen { 60354280cfSShuo Chen LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8"; 61354280cfSShuo Chen } 62354280cfSShuo Chen} 63354280cfSShuo Chen 64354280cfSShuo Chenvoid resetTimerfd(int timerfd, Timestamp expiration) 65354280cfSShuo Chen{ 66354280cfSShuo Chen // wake up loop by timerfd_settime() 67354280cfSShuo Chen struct itimerspec newValue; 68354280cfSShuo Chen struct itimerspec oldValue; 69354280cfSShuo Chen bzero(&newValue, sizeof newValue); 70354280cfSShuo Chen bzero(&oldValue, sizeof oldValue); 71354280cfSShuo Chen newValue.it_value = howMuchTimeFromNow(expiration); 72354280cfSShuo Chen int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); 73354280cfSShuo Chen if (ret) 74354280cfSShuo Chen { 75354280cfSShuo Chen LOG_SYSERR << "timerfd_settime()"; 76354280cfSShuo Chen } 77354280cfSShuo Chen} 78354280cfSShuo Chen 79354280cfSShuo Chen} 80354280cfSShuo Chen} 81354280cfSShuo Chen 82354280cfSShuo Chenusing namespace muduo; 83354280cfSShuo Chenusing namespace muduo::detail; 84354280cfSShuo Chen 85354280cfSShuo ChenTimerQueue::TimerQueue(EventLoop* loop) 86354280cfSShuo Chen : loop_(loop), 87354280cfSShuo Chen timerfd_(createTimerfd()), 88354280cfSShuo Chen timerfdChannel_(loop, timerfd_), 89354280cfSShuo Chen timers_(), 90354280cfSShuo Chen callingExpiredTimers_(false) 91354280cfSShuo Chen{ 92354280cfSShuo Chen timerfdChannel_.setReadCallback( 93354280cfSShuo Chen boost::bind(&TimerQueue::handleRead, this)); 94354280cfSShuo Chen // we are always reading the timerfd, we disarm it with timerfd_settime. 95354280cfSShuo Chen timerfdChannel_.enableReading(); 96354280cfSShuo Chen} 97354280cfSShuo Chen 98354280cfSShuo ChenTimerQueue::~TimerQueue() 99354280cfSShuo Chen{ 100354280cfSShuo Chen ::close(timerfd_); 101354280cfSShuo Chen // do not remove channel, since we're in EventLoop::dtor(); 102354280cfSShuo Chen for (TimerList::iterator it = timers_.begin(); 103354280cfSShuo Chen it != timers_.end(); ++it) 104354280cfSShuo Chen { 105354280cfSShuo Chen delete it->second; 106354280cfSShuo Chen } 107354280cfSShuo Chen} 108354280cfSShuo Chen 109354280cfSShuo ChenTimerId TimerQueue::addTimer(const TimerCallback& cb, 110354280cfSShuo Chen Timestamp when, 111354280cfSShuo Chen double interval) 112354280cfSShuo Chen{ 113354280cfSShuo Chen Timer* timer = new Timer(cb, when, interval); 114354280cfSShuo Chen loop_->runInLoop( 115354280cfSShuo Chen boost::bind(&TimerQueue::addTimerInLoop, this, timer)); 116354280cfSShuo Chen return TimerId(timer, timer->sequence()); 117354280cfSShuo Chen} 118354280cfSShuo Chen 119354280cfSShuo Chenvoid TimerQueue::cancel(TimerId timerId) 120354280cfSShuo Chen{ 121354280cfSShuo Chen loop_->runInLoop( 122354280cfSShuo Chen boost::bind(&TimerQueue::cancelInLoop, this, timerId)); 123354280cfSShuo Chen} 124354280cfSShuo Chen 125354280cfSShuo Chenvoid TimerQueue::addTimerInLoop(Timer* timer) 126354280cfSShuo Chen{ 127354280cfSShuo Chen loop_->assertInLoopThread(); 128354280cfSShuo Chen bool earliestChanged = insert(timer); 129354280cfSShuo Chen 130354280cfSShuo Chen if (earliestChanged) 131354280cfSShuo Chen { 132354280cfSShuo Chen resetTimerfd(timerfd_, timer->expiration()); 133354280cfSShuo Chen } 134354280cfSShuo Chen} 135354280cfSShuo Chen 136354280cfSShuo Chenvoid TimerQueue::cancelInLoop(TimerId timerId) 137354280cfSShuo Chen{ 138354280cfSShuo Chen loop_->assertInLoopThread(); 139354280cfSShuo Chen assert(timers_.size() == activeTimers_.size()); 140354280cfSShuo Chen ActiveTimer timer(timerId.timer_, timerId.sequence_); 141354280cfSShuo Chen ActiveTimerSet::iterator it = activeTimers_.find(timer); 142354280cfSShuo Chen if (it != activeTimers_.end()) 143354280cfSShuo Chen { 144354280cfSShuo Chen size_t n = timers_.erase(Entry(it->first->expiration(), it->first)); 145354280cfSShuo Chen assert(n == 1); (void)n; 146354280cfSShuo Chen delete it->first; // FIXME: no delete please 147354280cfSShuo Chen activeTimers_.erase(it); 148354280cfSShuo Chen } 149354280cfSShuo Chen else if (callingExpiredTimers_) 150354280cfSShuo Chen { 151354280cfSShuo Chen cancelingTimers_.insert(timer); 152354280cfSShuo Chen } 153354280cfSShuo Chen assert(timers_.size() == activeTimers_.size()); 154354280cfSShuo Chen} 155354280cfSShuo Chen 156354280cfSShuo Chenvoid TimerQueue::handleRead() 157354280cfSShuo Chen{ 158354280cfSShuo Chen loop_->assertInLoopThread(); 159354280cfSShuo Chen Timestamp now(Timestamp::now()); 160354280cfSShuo Chen readTimerfd(timerfd_, now); 161354280cfSShuo Chen 162354280cfSShuo Chen std::vector<Entry> expired = getExpired(now); 163354280cfSShuo Chen 164354280cfSShuo Chen callingExpiredTimers_ = true; 165354280cfSShuo Chen cancelingTimers_.clear(); 166354280cfSShuo Chen // safe to callback outside critical section 167354280cfSShuo Chen for (std::vector<Entry>::iterator it = expired.begin(); 168354280cfSShuo Chen it != expired.end(); ++it) 169354280cfSShuo Chen { 170354280cfSShuo Chen it->second->run(); 171354280cfSShuo Chen } 172354280cfSShuo Chen callingExpiredTimers_ = false; 173354280cfSShuo Chen 174354280cfSShuo Chen reset(expired, now); 175354280cfSShuo Chen} 176354280cfSShuo Chen 177354280cfSShuo Chenstd::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now) 178354280cfSShuo Chen{ 179354280cfSShuo Chen assert(timers_.size() == activeTimers_.size()); 180354280cfSShuo Chen std::vector<Entry> expired; 181354280cfSShuo Chen Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); 182354280cfSShuo Chen TimerList::iterator it = timers_.lower_bound(sentry); 183354280cfSShuo Chen assert(it == timers_.end() || now < it->first); 184354280cfSShuo Chen std::copy(timers_.begin(), it, back_inserter(expired)); 185354280cfSShuo Chen timers_.erase(timers_.begin(), it); 186354280cfSShuo Chen 187354280cfSShuo Chen BOOST_FOREACH(Entry entry, expired) 188354280cfSShuo Chen { 189354280cfSShuo Chen ActiveTimer timer(entry.second, entry.second->sequence()); 190354280cfSShuo Chen size_t n = activeTimers_.erase(timer); 191354280cfSShuo Chen assert(n == 1); (void)n; 192354280cfSShuo Chen } 193354280cfSShuo Chen 194354280cfSShuo Chen assert(timers_.size() == activeTimers_.size()); 195354280cfSShuo Chen return expired; 196354280cfSShuo Chen} 197354280cfSShuo Chen 198354280cfSShuo Chenvoid TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now) 199354280cfSShuo Chen{ 200354280cfSShuo Chen Timestamp nextExpire; 201354280cfSShuo Chen 202354280cfSShuo Chen for (std::vector<Entry>::const_iterator it = expired.begin(); 203354280cfSShuo Chen it != expired.end(); ++it) 204354280cfSShuo Chen { 205354280cfSShuo Chen ActiveTimer timer(it->second, it->second->sequence()); 206354280cfSShuo Chen if (it->second->repeat() 207354280cfSShuo Chen && cancelingTimers_.find(timer) == cancelingTimers_.end()) 208354280cfSShuo Chen { 209354280cfSShuo Chen it->second->restart(now); 210354280cfSShuo Chen insert(it->second); 211354280cfSShuo Chen } 212354280cfSShuo Chen else 213354280cfSShuo Chen { 214354280cfSShuo Chen // FIXME move to a free list 215354280cfSShuo Chen delete it->second; 216354280cfSShuo Chen } 217354280cfSShuo Chen } 218354280cfSShuo Chen 219354280cfSShuo Chen if (!timers_.empty()) 220354280cfSShuo Chen { 221354280cfSShuo Chen nextExpire = timers_.begin()->second->expiration(); 222354280cfSShuo Chen } 223354280cfSShuo Chen 224354280cfSShuo Chen if (nextExpire.valid()) 225354280cfSShuo Chen { 226354280cfSShuo Chen resetTimerfd(timerfd_, nextExpire); 227354280cfSShuo Chen } 228354280cfSShuo Chen} 229354280cfSShuo Chen 230354280cfSShuo Chenbool TimerQueue::insert(Timer* timer) 231354280cfSShuo Chen{ 232354280cfSShuo Chen loop_->assertInLoopThread(); 233354280cfSShuo Chen assert(timers_.size() == activeTimers_.size()); 234354280cfSShuo Chen bool earliestChanged = false; 235354280cfSShuo Chen Timestamp when = timer->expiration(); 236354280cfSShuo Chen TimerList::iterator it = timers_.begin(); 237354280cfSShuo Chen if (it == timers_.end() || when < it->first) 238354280cfSShuo Chen { 239354280cfSShuo Chen earliestChanged = true; 240354280cfSShuo Chen } 241354280cfSShuo Chen 242354280cfSShuo Chen { 243354280cfSShuo Chen std::pair<TimerList::iterator, bool> result 244354280cfSShuo Chen = timers_.insert(Entry(when, timer)); 245354280cfSShuo Chen assert(result.second); (void)result; 246354280cfSShuo Chen } 247354280cfSShuo Chen { 248354280cfSShuo Chen std::pair<ActiveTimerSet::iterator, bool> result 249354280cfSShuo Chen = activeTimers_.insert(ActiveTimer(timer, timer->sequence())); 250354280cfSShuo Chen assert(result.second); (void)result; 251354280cfSShuo Chen } 252354280cfSShuo Chen 253354280cfSShuo Chen assert(timers_.size() == activeTimers_.size()); 254354280cfSShuo Chen return earliestChanged; 255354280cfSShuo Chen} 256354280cfSShuo Chen 257