// TimerQueue.cc revision a1bde736
1a1bde736SShuo Chen// excerpts from http://code.google.com/p/muduo/ 2a1bde736SShuo Chen// 3a1bde736SShuo Chen// Use of this source code is governed by a BSD-style license 4a1bde736SShuo Chen// that can be found in the License file. 5a1bde736SShuo Chen// 6a1bde736SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7a1bde736SShuo Chen 8a1bde736SShuo Chen#define __STDC_LIMIT_MACROS 9a1bde736SShuo Chen#include "TimerQueue.h" 10a1bde736SShuo Chen 11a1bde736SShuo Chen#include "logging/Logging.h" 12a1bde736SShuo Chen#include "EventLoop.h" 13a1bde736SShuo Chen#include "Timer.h" 14a1bde736SShuo Chen#include "TimerId.h" 15a1bde736SShuo Chen 16a1bde736SShuo Chen#include <boost/bind.hpp> 17a1bde736SShuo Chen#include <boost/foreach.hpp> 18a1bde736SShuo Chen 19a1bde736SShuo Chen#include <sys/timerfd.h> 20a1bde736SShuo Chen 21a1bde736SShuo Chennamespace muduo 22a1bde736SShuo Chen{ 23a1bde736SShuo Chennamespace detail 24a1bde736SShuo Chen{ 25a1bde736SShuo Chen 26a1bde736SShuo Chenint createTimerfd() 27a1bde736SShuo Chen{ 28a1bde736SShuo Chen int timerfd = ::timerfd_create(CLOCK_MONOTONIC, 29a1bde736SShuo Chen TFD_NONBLOCK | TFD_CLOEXEC); 30a1bde736SShuo Chen if (timerfd < 0) 31a1bde736SShuo Chen { 32a1bde736SShuo Chen LOG_SYSFATAL << "Failed in timerfd_create"; 33a1bde736SShuo Chen } 34a1bde736SShuo Chen return timerfd; 35a1bde736SShuo Chen} 36a1bde736SShuo Chen 37a1bde736SShuo Chenstruct timespec howMuchTimeFromNow(Timestamp when) 38a1bde736SShuo Chen{ 39a1bde736SShuo Chen int64_t microseconds = when.microSecondsSinceEpoch() 40a1bde736SShuo Chen - Timestamp::now().microSecondsSinceEpoch(); 41a1bde736SShuo Chen if (microseconds < 100) 42a1bde736SShuo Chen { 43a1bde736SShuo Chen microseconds = 100; 44a1bde736SShuo Chen } 45a1bde736SShuo Chen struct timespec ts; 46a1bde736SShuo Chen ts.tv_sec = static_cast<time_t>( 47a1bde736SShuo Chen microseconds / Timestamp::kMicroSecondsPerSecond); 48a1bde736SShuo Chen ts.tv_nsec = static_cast<long>( 49a1bde736SShuo Chen (microseconds 
% Timestamp::kMicroSecondsPerSecond) * 1000); 50a1bde736SShuo Chen return ts; 51a1bde736SShuo Chen} 52a1bde736SShuo Chen 53a1bde736SShuo Chenvoid readTimerfd(int timerfd, Timestamp now) 54a1bde736SShuo Chen{ 55a1bde736SShuo Chen uint64_t howmany; 56a1bde736SShuo Chen ssize_t n = ::read(timerfd, &howmany, sizeof howmany); 57a1bde736SShuo Chen LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString(); 58a1bde736SShuo Chen if (n != sizeof howmany) 59a1bde736SShuo Chen { 60a1bde736SShuo Chen LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8"; 61a1bde736SShuo Chen } 62a1bde736SShuo Chen} 63a1bde736SShuo Chen 64a1bde736SShuo Chenvoid resetTimerfd(int timerfd, Timestamp expiration) 65a1bde736SShuo Chen{ 66a1bde736SShuo Chen // wake up loop by timerfd_settime() 67a1bde736SShuo Chen struct itimerspec newValue; 68a1bde736SShuo Chen struct itimerspec oldValue; 69a1bde736SShuo Chen bzero(&newValue, sizeof newValue); 70a1bde736SShuo Chen bzero(&oldValue, sizeof oldValue); 71a1bde736SShuo Chen newValue.it_value = howMuchTimeFromNow(expiration); 72a1bde736SShuo Chen int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); 73a1bde736SShuo Chen if (ret) 74a1bde736SShuo Chen { 75a1bde736SShuo Chen LOG_SYSERR << "timerfd_settime()"; 76a1bde736SShuo Chen } 77a1bde736SShuo Chen} 78a1bde736SShuo Chen 79a1bde736SShuo Chen} 80a1bde736SShuo Chen} 81a1bde736SShuo Chen 82a1bde736SShuo Chenusing namespace muduo; 83a1bde736SShuo Chenusing namespace muduo::detail; 84a1bde736SShuo Chen 85a1bde736SShuo ChenTimerQueue::TimerQueue(EventLoop* loop) 86a1bde736SShuo Chen : loop_(loop), 87a1bde736SShuo Chen timerfd_(createTimerfd()), 88a1bde736SShuo Chen timerfdChannel_(loop, timerfd_), 89a1bde736SShuo Chen timers_(), 90a1bde736SShuo Chen callingExpiredTimers_(false) 91a1bde736SShuo Chen{ 92a1bde736SShuo Chen timerfdChannel_.setReadCallback( 93a1bde736SShuo Chen boost::bind(&TimerQueue::handleRead, this)); 94a1bde736SShuo Chen // we are always 
reading the timerfd, we disarm it with timerfd_settime. 95a1bde736SShuo Chen timerfdChannel_.enableReading(); 96a1bde736SShuo Chen} 97a1bde736SShuo Chen 98a1bde736SShuo ChenTimerQueue::~TimerQueue() 99a1bde736SShuo Chen{ 100a1bde736SShuo Chen ::close(timerfd_); 101a1bde736SShuo Chen // do not remove channel, since we're in EventLoop::dtor(); 102a1bde736SShuo Chen for (TimerList::iterator it = timers_.begin(); 103a1bde736SShuo Chen it != timers_.end(); ++it) 104a1bde736SShuo Chen { 105a1bde736SShuo Chen delete it->second; 106a1bde736SShuo Chen } 107a1bde736SShuo Chen} 108a1bde736SShuo Chen 109a1bde736SShuo ChenTimerId TimerQueue::addTimer(const TimerCallback& cb, 110a1bde736SShuo Chen Timestamp when, 111a1bde736SShuo Chen double interval) 112a1bde736SShuo Chen{ 113a1bde736SShuo Chen Timer* timer = new Timer(cb, when, interval); 114a1bde736SShuo Chen loop_->runInLoop( 115a1bde736SShuo Chen boost::bind(&TimerQueue::addTimerInLoop, this, timer)); 116a1bde736SShuo Chen return TimerId(timer, timer->sequence()); 117a1bde736SShuo Chen} 118a1bde736SShuo Chen 119a1bde736SShuo Chenvoid TimerQueue::cancel(TimerId timerId) 120a1bde736SShuo Chen{ 121a1bde736SShuo Chen loop_->runInLoop( 122a1bde736SShuo Chen boost::bind(&TimerQueue::cancelInLoop, this, timerId)); 123a1bde736SShuo Chen} 124a1bde736SShuo Chen 125a1bde736SShuo Chenvoid TimerQueue::addTimerInLoop(Timer* timer) 126a1bde736SShuo Chen{ 127a1bde736SShuo Chen loop_->assertInLoopThread(); 128a1bde736SShuo Chen bool earliestChanged = insert(timer); 129a1bde736SShuo Chen 130a1bde736SShuo Chen if (earliestChanged) 131a1bde736SShuo Chen { 132a1bde736SShuo Chen resetTimerfd(timerfd_, timer->expiration()); 133a1bde736SShuo Chen } 134a1bde736SShuo Chen} 135a1bde736SShuo Chen 136a1bde736SShuo Chenvoid TimerQueue::cancelInLoop(TimerId timerId) 137a1bde736SShuo Chen{ 138a1bde736SShuo Chen loop_->assertInLoopThread(); 139a1bde736SShuo Chen assert(timers_.size() == activeTimers_.size()); 140a1bde736SShuo Chen ActiveTimer 
timer(timerId.timer_, timerId.seq_); 141a1bde736SShuo Chen ActiveTimerSet::iterator it = activeTimers_.find(timer); 142a1bde736SShuo Chen if (it != activeTimers_.end()) 143a1bde736SShuo Chen { 144a1bde736SShuo Chen size_t n = timers_.erase(Entry(it->first->expiration(), it->first)); 145a1bde736SShuo Chen assert(n == 1); (void)n; 146a1bde736SShuo Chen delete it->first; // FIXME: no delete please 147a1bde736SShuo Chen activeTimers_.erase(it); 148a1bde736SShuo Chen } 149a1bde736SShuo Chen else if (callingExpiredTimers_) 150a1bde736SShuo Chen { 151a1bde736SShuo Chen cancelingTimers_.insert(timer); 152a1bde736SShuo Chen } 153a1bde736SShuo Chen assert(timers_.size() == activeTimers_.size()); 154a1bde736SShuo Chen} 155a1bde736SShuo Chen 156a1bde736SShuo Chenvoid TimerQueue::handleRead() 157a1bde736SShuo Chen{ 158a1bde736SShuo Chen loop_->assertInLoopThread(); 159a1bde736SShuo Chen Timestamp now(Timestamp::now()); 160a1bde736SShuo Chen readTimerfd(timerfd_, now); 161a1bde736SShuo Chen 162a1bde736SShuo Chen std::vector<Entry> expired = getExpired(now); 163a1bde736SShuo Chen 164a1bde736SShuo Chen callingExpiredTimers_ = true; 165a1bde736SShuo Chen cancelingTimers_.clear(); 166a1bde736SShuo Chen // safe to callback outside critical section 167a1bde736SShuo Chen for (std::vector<Entry>::iterator it = expired.begin(); 168a1bde736SShuo Chen it != expired.end(); ++it) 169a1bde736SShuo Chen { 170a1bde736SShuo Chen it->second->run(); 171a1bde736SShuo Chen } 172a1bde736SShuo Chen callingExpiredTimers_ = false; 173a1bde736SShuo Chen 174a1bde736SShuo Chen reset(expired, now); 175a1bde736SShuo Chen} 176a1bde736SShuo Chen 177a1bde736SShuo Chenstd::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now) 178a1bde736SShuo Chen{ 179a1bde736SShuo Chen assert(timers_.size() == activeTimers_.size()); 180a1bde736SShuo Chen std::vector<Entry> expired; 181a1bde736SShuo Chen Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); 182a1bde736SShuo Chen TimerList::iterator 
it = timers_.lower_bound(sentry); 183a1bde736SShuo Chen assert(it == timers_.end() || now < it->first); 184a1bde736SShuo Chen std::copy(timers_.begin(), it, back_inserter(expired)); 185a1bde736SShuo Chen timers_.erase(timers_.begin(), it); 186a1bde736SShuo Chen 187a1bde736SShuo Chen BOOST_FOREACH(Entry entry, expired) 188a1bde736SShuo Chen { 189a1bde736SShuo Chen ActiveTimer timer(entry.second, entry.second->sequence()); 190a1bde736SShuo Chen size_t n = activeTimers_.erase(timer); 191a1bde736SShuo Chen assert(n == 1); (void)n; 192a1bde736SShuo Chen } 193a1bde736SShuo Chen 194a1bde736SShuo Chen assert(timers_.size() == activeTimers_.size()); 195a1bde736SShuo Chen return expired; 196a1bde736SShuo Chen} 197a1bde736SShuo Chen 198a1bde736SShuo Chenvoid TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now) 199a1bde736SShuo Chen{ 200a1bde736SShuo Chen Timestamp nextExpire; 201a1bde736SShuo Chen 202a1bde736SShuo Chen for (std::vector<Entry>::const_iterator it = expired.begin(); 203a1bde736SShuo Chen it != expired.end(); ++it) 204a1bde736SShuo Chen { 205a1bde736SShuo Chen ActiveTimer timer(it->second, it->second->sequence()); 206a1bde736SShuo Chen if (it->second->repeat() 207a1bde736SShuo Chen && cancelingTimers_.find(timer) == cancelingTimers_.end()) 208a1bde736SShuo Chen { 209a1bde736SShuo Chen it->second->restart(now); 210a1bde736SShuo Chen insert(it->second); 211a1bde736SShuo Chen } 212a1bde736SShuo Chen else 213a1bde736SShuo Chen { 214a1bde736SShuo Chen // FIXME move to a free list 215a1bde736SShuo Chen delete it->second; 216a1bde736SShuo Chen } 217a1bde736SShuo Chen } 218a1bde736SShuo Chen 219a1bde736SShuo Chen if (!timers_.empty()) 220a1bde736SShuo Chen { 221a1bde736SShuo Chen nextExpire = timers_.begin()->second->expiration(); 222a1bde736SShuo Chen } 223a1bde736SShuo Chen 224a1bde736SShuo Chen if (nextExpire.valid()) 225a1bde736SShuo Chen { 226a1bde736SShuo Chen resetTimerfd(timerfd_, nextExpire); 227a1bde736SShuo Chen } 228a1bde736SShuo Chen} 
229a1bde736SShuo Chen 230a1bde736SShuo Chenbool TimerQueue::insert(Timer* timer) 231a1bde736SShuo Chen{ 232a1bde736SShuo Chen loop_->assertInLoopThread(); 233a1bde736SShuo Chen assert(timers_.size() == activeTimers_.size()); 234a1bde736SShuo Chen bool earliestChanged = false; 235a1bde736SShuo Chen Timestamp when = timer->expiration(); 236a1bde736SShuo Chen TimerList::iterator it = timers_.begin(); 237a1bde736SShuo Chen if (it == timers_.end() || when < it->first) 238a1bde736SShuo Chen { 239a1bde736SShuo Chen earliestChanged = true; 240a1bde736SShuo Chen } 241a1bde736SShuo Chen 242a1bde736SShuo Chen { 243a1bde736SShuo Chen std::pair<TimerList::iterator, bool> result 244a1bde736SShuo Chen = timers_.insert(Entry(when, timer)); 245a1bde736SShuo Chen assert(result.second); (void)result; 246a1bde736SShuo Chen } 247a1bde736SShuo Chen { 248a1bde736SShuo Chen std::pair<ActiveTimerSet::iterator, bool> result 249a1bde736SShuo Chen = activeTimers_.insert(ActiveTimer(timer, timer->sequence())); 250a1bde736SShuo Chen assert(result.second); (void)result; 251a1bde736SShuo Chen } 252a1bde736SShuo Chen 253a1bde736SShuo Chen assert(timers_.size() == activeTimers_.size()); 254a1bde736SShuo Chen return earliestChanged; 255a1bde736SShuo Chen} 256a1bde736SShuo Chen 257