EventLoop.cc revision 17c057c3
1bfe73648SShuo Chen// excerpts from http://code.google.com/p/muduo/ 2bfe73648SShuo Chen// 3bfe73648SShuo Chen// Use of this source code is governed by a BSD-style license 4bfe73648SShuo Chen// that can be found in the License file. 5bfe73648SShuo Chen// 6bfe73648SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7bfe73648SShuo Chen 8bfe73648SShuo Chen#include "EventLoop.h" 9bfe73648SShuo Chen 10bfe73648SShuo Chen#include "Channel.h" 11bfe73648SShuo Chen#include "Poller.h" 12bfe73648SShuo Chen#include "TimerQueue.h" 13bfe73648SShuo Chen 14bfe73648SShuo Chen#include "logging/Logging.h" 15bfe73648SShuo Chen 16bfe73648SShuo Chen#include <boost/bind.hpp> 17bfe73648SShuo Chen 18bfe73648SShuo Chen#include <assert.h> 19bfe73648SShuo Chen#include <sys/eventfd.h> 20bfe73648SShuo Chen 21bfe73648SShuo Chenusing namespace muduo; 22bfe73648SShuo Chen 23bfe73648SShuo Chen__thread EventLoop* t_loopInThisThread = 0; 24bfe73648SShuo Chenconst int kPollTimeMs = 10000; 25bfe73648SShuo Chen 26bfe73648SShuo Chenstatic int createEventfd() 27bfe73648SShuo Chen{ 28bfe73648SShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 29bfe73648SShuo Chen if (evtfd < 0) 30bfe73648SShuo Chen { 31bfe73648SShuo Chen LOG_SYSERR << "Failed in eventfd"; 32bfe73648SShuo Chen abort(); 33bfe73648SShuo Chen } 34bfe73648SShuo Chen return evtfd; 35bfe73648SShuo Chen} 36bfe73648SShuo Chen 37bfe73648SShuo ChenEventLoop::EventLoop() 38bfe73648SShuo Chen : looping_(false), 39bfe73648SShuo Chen quit_(false), 40bfe73648SShuo Chen callingPendingFunctors_(false), 41bfe73648SShuo Chen threadId_(CurrentThread::tid()), 42bfe73648SShuo Chen poller_(new Poller(this)), 43bfe73648SShuo Chen timerQueue_(new TimerQueue(this)), 44bfe73648SShuo Chen wakeupFd_(createEventfd()), 45bfe73648SShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 46bfe73648SShuo Chen{ 47bfe73648SShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 48bfe73648SShuo Chen if (t_loopInThisThread) 49bfe73648SShuo Chen { 50bfe73648SShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 51bfe73648SShuo Chen << " exists in this thread " << threadId_; 52bfe73648SShuo Chen } 53bfe73648SShuo Chen else 54bfe73648SShuo Chen { 55bfe73648SShuo Chen t_loopInThisThread = this; 56bfe73648SShuo Chen } 57bfe73648SShuo Chen wakeupChannel_->setReadCallback( 58bfe73648SShuo Chen boost::bind(&EventLoop::handleRead, this)); 59bfe73648SShuo Chen // we are always reading the wakeupfd 60bfe73648SShuo Chen wakeupChannel_->enableReading(); 61bfe73648SShuo Chen} 62bfe73648SShuo Chen 63bfe73648SShuo ChenEventLoop::~EventLoop() 64bfe73648SShuo Chen{ 65bfe73648SShuo Chen assert(!looping_); 66bfe73648SShuo Chen ::close(wakeupFd_); 67bfe73648SShuo Chen t_loopInThisThread = NULL; 68bfe73648SShuo Chen} 69bfe73648SShuo Chen 70bfe73648SShuo Chenvoid EventLoop::loop() 71bfe73648SShuo Chen{ 72bfe73648SShuo Chen assert(!looping_); 73bfe73648SShuo Chen assertInLoopThread(); 74bfe73648SShuo Chen looping_ = true; 75bfe73648SShuo Chen quit_ = false; 76bfe73648SShuo Chen 77bfe73648SShuo Chen while (!quit_) 78bfe73648SShuo Chen { 79bfe73648SShuo Chen activeChannels_.clear(); 80bfe73648SShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 81bfe73648SShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 82bfe73648SShuo Chen it != activeChannels_.end(); ++it) 83bfe73648SShuo Chen { 84bfe73648SShuo Chen (*it)->handleEvent(); 85bfe73648SShuo Chen } 86bfe73648SShuo Chen doPendingFunctors(); 87bfe73648SShuo Chen } 88bfe73648SShuo Chen 89bfe73648SShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 90bfe73648SShuo Chen looping_ = false; 91bfe73648SShuo Chen} 92bfe73648SShuo Chen 93bfe73648SShuo Chenvoid EventLoop::quit() 94bfe73648SShuo Chen{ 95bfe73648SShuo Chen quit_ = true; 96bfe73648SShuo Chen if (!isInLoopThread()) 97bfe73648SShuo Chen { 98bfe73648SShuo Chen wakeup(); 99bfe73648SShuo Chen } 100bfe73648SShuo Chen} 101bfe73648SShuo Chen 102bfe73648SShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 103bfe73648SShuo Chen{ 104bfe73648SShuo Chen if (isInLoopThread()) 105bfe73648SShuo Chen { 106bfe73648SShuo Chen cb(); 107bfe73648SShuo Chen } 108bfe73648SShuo Chen else 109bfe73648SShuo Chen { 110bfe73648SShuo Chen queueInLoop(cb); 111bfe73648SShuo Chen wakeup(); 112bfe73648SShuo Chen } 113bfe73648SShuo Chen} 114bfe73648SShuo Chen 115bfe73648SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 116bfe73648SShuo Chen{ 117bfe73648SShuo Chen { 118bfe73648SShuo Chen MutexLockGuard lock(mutex_); 119bfe73648SShuo Chen pendingFunctors_.push_back(cb); 120bfe73648SShuo Chen } 121bfe73648SShuo Chen 122bfe73648SShuo Chen if (isInLoopThread() && callingPendingFunctors_) 123bfe73648SShuo Chen { 124bfe73648SShuo Chen wakeup(); 125bfe73648SShuo Chen } 126bfe73648SShuo Chen} 127bfe73648SShuo Chen 128bfe73648SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 129bfe73648SShuo Chen{ 130bfe73648SShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 131bfe73648SShuo Chen} 132bfe73648SShuo Chen 133bfe73648SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 134bfe73648SShuo Chen{ 135bfe73648SShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 136bfe73648SShuo Chen return runAt(time, cb); 137bfe73648SShuo Chen} 138bfe73648SShuo Chen 139bfe73648SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 140bfe73648SShuo Chen{ 141bfe73648SShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 142bfe73648SShuo Chen return timerQueue_->addTimer(cb, time, interval); 143bfe73648SShuo Chen} 144bfe73648SShuo Chen 145bfe73648SShuo Chenvoid EventLoop::updateChannel(Channel* channel) 146bfe73648SShuo Chen{ 147bfe73648SShuo Chen assert(channel->ownerLoop() == this); 148bfe73648SShuo Chen assertInLoopThread(); 149bfe73648SShuo Chen poller_->updateChannel(channel); 150bfe73648SShuo Chen} 151bfe73648SShuo Chen 15217c057c3SShuo Chenvoid EventLoop::removeChannel(Channel* channel) 15317c057c3SShuo Chen{ 15417c057c3SShuo Chen assert(channel->ownerLoop() == this); 15517c057c3SShuo Chen assertInLoopThread(); 15617c057c3SShuo Chen poller_->removeChannel(channel); 15717c057c3SShuo Chen} 15817c057c3SShuo Chen 159bfe73648SShuo Chenvoid EventLoop::abortNotInLoopThread() 160bfe73648SShuo Chen{ 161bfe73648SShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 162bfe73648SShuo Chen << " was created in threadId_ = " << threadId_ 163bfe73648SShuo Chen << ", current thread id = " << CurrentThread::tid(); 164bfe73648SShuo Chen} 165bfe73648SShuo Chen 166bfe73648SShuo Chenvoid EventLoop::wakeup() 167bfe73648SShuo Chen{ 168bfe73648SShuo Chen uint64_t one = 1; 169bfe73648SShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 170bfe73648SShuo Chen if (n != sizeof one) 171bfe73648SShuo Chen { 172bfe73648SShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 173bfe73648SShuo Chen } 174bfe73648SShuo Chen} 175bfe73648SShuo Chen 176bfe73648SShuo Chenvoid EventLoop::handleRead() 177bfe73648SShuo Chen{ 178bfe73648SShuo Chen uint64_t one = 1; 179bfe73648SShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 180bfe73648SShuo Chen if (n != sizeof one) 181bfe73648SShuo Chen { 182bfe73648SShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 183bfe73648SShuo Chen } 184bfe73648SShuo Chen} 185bfe73648SShuo Chen 186bfe73648SShuo Chenvoid EventLoop::doPendingFunctors() 187bfe73648SShuo Chen{ 188bfe73648SShuo Chen std::vector<Functor> functors; 189bfe73648SShuo Chen callingPendingFunctors_ = true; 190bfe73648SShuo Chen 191bfe73648SShuo Chen { 192bfe73648SShuo Chen MutexLockGuard lock(mutex_); 193bfe73648SShuo Chen functors.swap(pendingFunctors_); 194bfe73648SShuo Chen } 195bfe73648SShuo Chen 196bfe73648SShuo Chen for (size_t i = 0; i < functors.size(); ++i) 197bfe73648SShuo Chen { 198bfe73648SShuo Chen functors[i](); 199bfe73648SShuo Chen } 200bfe73648SShuo Chen callingPendingFunctors_ = false; 201bfe73648SShuo Chen} 202bfe73648SShuo Chen 203