EventLoop.cc revision 0f776063
165c497a3SShuo Chen// excerpts from http://code.google.com/p/muduo/ 265c497a3SShuo Chen// 365c497a3SShuo Chen// Use of this source code is governed by a BSD-style license 465c497a3SShuo Chen// that can be found in the License file. 565c497a3SShuo Chen// 665c497a3SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 765c497a3SShuo Chen 865c497a3SShuo Chen#include "EventLoop.h" 965c497a3SShuo Chen 1065c497a3SShuo Chen#include "Channel.h" 1165c497a3SShuo Chen#include "Poller.h" 1265c497a3SShuo Chen#include "TimerQueue.h" 1365c497a3SShuo Chen 1465c497a3SShuo Chen#include "logging/Logging.h" 1565c497a3SShuo Chen 1665c497a3SShuo Chen#include <boost/bind.hpp> 1765c497a3SShuo Chen 1865c497a3SShuo Chen#include <assert.h> 1965c497a3SShuo Chen#include <sys/eventfd.h> 2065c497a3SShuo Chen 2165c497a3SShuo Chenusing namespace muduo; 2265c497a3SShuo Chen 2365c497a3SShuo Chen__thread EventLoop* t_loopInThisThread = 0; 2465c497a3SShuo Chenconst int kPollTimeMs = 10000; 2565c497a3SShuo Chen 2665c497a3SShuo Chenstatic int createEventfd() 2765c497a3SShuo Chen{ 2865c497a3SShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 2965c497a3SShuo Chen if (evtfd < 0) 3065c497a3SShuo Chen { 3165c497a3SShuo Chen LOG_SYSERR << "Failed in eventfd"; 3265c497a3SShuo Chen abort(); 3365c497a3SShuo Chen } 3465c497a3SShuo Chen return evtfd; 3565c497a3SShuo Chen} 3665c497a3SShuo Chen 3765c497a3SShuo ChenEventLoop::EventLoop() 3865c497a3SShuo Chen : looping_(false), 3965c497a3SShuo Chen quit_(false), 4065c497a3SShuo Chen callingPendingFunctors_(false), 4165c497a3SShuo Chen threadId_(CurrentThread::tid()), 4265c497a3SShuo Chen poller_(new Poller(this)), 4365c497a3SShuo Chen timerQueue_(new TimerQueue(this)), 4465c497a3SShuo Chen wakeupFd_(createEventfd()), 4565c497a3SShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 4665c497a3SShuo Chen{ 4765c497a3SShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 4865c497a3SShuo Chen if (t_loopInThisThread) 4965c497a3SShuo Chen { 5065c497a3SShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 5165c497a3SShuo Chen << " exists in this thread " << threadId_; 5265c497a3SShuo Chen } 5365c497a3SShuo Chen else 5465c497a3SShuo Chen { 5565c497a3SShuo Chen t_loopInThisThread = this; 5665c497a3SShuo Chen } 5765c497a3SShuo Chen wakeupChannel_->setReadCallback( 5865c497a3SShuo Chen boost::bind(&EventLoop::handleRead, this)); 5965c497a3SShuo Chen // we are always reading the wakeupfd 6065c497a3SShuo Chen wakeupChannel_->enableReading(); 6165c497a3SShuo Chen} 6265c497a3SShuo Chen 6365c497a3SShuo ChenEventLoop::~EventLoop() 6465c497a3SShuo Chen{ 6565c497a3SShuo Chen assert(!looping_); 6665c497a3SShuo Chen ::close(wakeupFd_); 6765c497a3SShuo Chen t_loopInThisThread = NULL; 6865c497a3SShuo Chen} 6965c497a3SShuo Chen 7065c497a3SShuo Chenvoid EventLoop::loop() 7165c497a3SShuo Chen{ 7265c497a3SShuo Chen assert(!looping_); 7365c497a3SShuo Chen assertInLoopThread(); 7465c497a3SShuo Chen looping_ = true; 7565c497a3SShuo Chen quit_ = false; 7665c497a3SShuo Chen 7765c497a3SShuo Chen while (!quit_) 7865c497a3SShuo Chen { 7965c497a3SShuo Chen activeChannels_.clear(); 8065c497a3SShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 8165c497a3SShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 8265c497a3SShuo Chen it != activeChannels_.end(); ++it) 8365c497a3SShuo Chen { 84714cd85fSShuo Chen (*it)->handleEvent(pollReturnTime_); 8565c497a3SShuo Chen } 8665c497a3SShuo Chen doPendingFunctors(); 8765c497a3SShuo Chen } 8865c497a3SShuo Chen 8965c497a3SShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 9065c497a3SShuo Chen looping_ = false; 9165c497a3SShuo Chen} 9265c497a3SShuo Chen 9365c497a3SShuo Chenvoid EventLoop::quit() 9465c497a3SShuo Chen{ 9565c497a3SShuo Chen quit_ = true; 9665c497a3SShuo Chen if (!isInLoopThread()) 9765c497a3SShuo Chen { 9865c497a3SShuo Chen wakeup(); 9965c497a3SShuo Chen } 10065c497a3SShuo Chen} 10165c497a3SShuo Chen 10265c497a3SShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 10365c497a3SShuo Chen{ 10465c497a3SShuo Chen if (isInLoopThread()) 10565c497a3SShuo Chen { 10665c497a3SShuo Chen cb(); 10765c497a3SShuo Chen } 10865c497a3SShuo Chen else 10965c497a3SShuo Chen { 11065c497a3SShuo Chen queueInLoop(cb); 11165c497a3SShuo Chen } 11265c497a3SShuo Chen} 11365c497a3SShuo Chen 11465c497a3SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 11565c497a3SShuo Chen{ 11665c497a3SShuo Chen { 11765c497a3SShuo Chen MutexLockGuard lock(mutex_); 11865c497a3SShuo Chen pendingFunctors_.push_back(cb); 11965c497a3SShuo Chen } 12065c497a3SShuo Chen 1210f776063SShuo Chen if (!isInLoopThread() || callingPendingFunctors_) 12265c497a3SShuo Chen { 12365c497a3SShuo Chen wakeup(); 12465c497a3SShuo Chen } 12565c497a3SShuo Chen} 12665c497a3SShuo Chen 12765c497a3SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 12865c497a3SShuo Chen{ 12965c497a3SShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 13065c497a3SShuo Chen} 13165c497a3SShuo Chen 13265c497a3SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 13365c497a3SShuo Chen{ 13465c497a3SShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 13565c497a3SShuo Chen return runAt(time, cb); 13665c497a3SShuo Chen} 13765c497a3SShuo Chen 13865c497a3SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 13965c497a3SShuo Chen{ 14065c497a3SShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 14165c497a3SShuo Chen return timerQueue_->addTimer(cb, time, interval); 14265c497a3SShuo Chen} 14365c497a3SShuo Chen 14465c497a3SShuo Chenvoid EventLoop::updateChannel(Channel* channel) 14565c497a3SShuo Chen{ 14665c497a3SShuo Chen assert(channel->ownerLoop() == this); 14765c497a3SShuo Chen assertInLoopThread(); 14865c497a3SShuo Chen poller_->updateChannel(channel); 14965c497a3SShuo Chen} 15065c497a3SShuo Chen 15165c497a3SShuo Chenvoid EventLoop::removeChannel(Channel* channel) 15265c497a3SShuo Chen{ 15365c497a3SShuo Chen assert(channel->ownerLoop() == this); 15465c497a3SShuo Chen assertInLoopThread(); 15565c497a3SShuo Chen poller_->removeChannel(channel); 15665c497a3SShuo Chen} 15765c497a3SShuo Chen 15865c497a3SShuo Chenvoid EventLoop::abortNotInLoopThread() 15965c497a3SShuo Chen{ 16065c497a3SShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 16165c497a3SShuo Chen << " was created in threadId_ = " << threadId_ 16265c497a3SShuo Chen << ", current thread id = " << CurrentThread::tid(); 16365c497a3SShuo Chen} 16465c497a3SShuo Chen 16565c497a3SShuo Chenvoid EventLoop::wakeup() 16665c497a3SShuo Chen{ 16765c497a3SShuo Chen uint64_t one = 1; 16865c497a3SShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 16965c497a3SShuo Chen if (n != sizeof one) 17065c497a3SShuo Chen { 17165c497a3SShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 17265c497a3SShuo Chen } 17365c497a3SShuo Chen} 17465c497a3SShuo Chen 17565c497a3SShuo Chenvoid EventLoop::handleRead() 17665c497a3SShuo Chen{ 17765c497a3SShuo Chen uint64_t one = 1; 17865c497a3SShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 17965c497a3SShuo Chen if (n != sizeof one) 18065c497a3SShuo Chen { 18165c497a3SShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 18265c497a3SShuo Chen } 18365c497a3SShuo Chen} 18465c497a3SShuo Chen 18565c497a3SShuo Chenvoid EventLoop::doPendingFunctors() 18665c497a3SShuo Chen{ 18765c497a3SShuo Chen std::vector<Functor> functors; 18865c497a3SShuo Chen callingPendingFunctors_ = true; 18965c497a3SShuo Chen 19065c497a3SShuo Chen { 19165c497a3SShuo Chen MutexLockGuard lock(mutex_); 19265c497a3SShuo Chen functors.swap(pendingFunctors_); 19365c497a3SShuo Chen } 19465c497a3SShuo Chen 19565c497a3SShuo Chen for (size_t i = 0; i < functors.size(); ++i) 19665c497a3SShuo Chen { 19765c497a3SShuo Chen functors[i](); 19865c497a3SShuo Chen } 19965c497a3SShuo Chen callingPendingFunctors_ = false; 20065c497a3SShuo Chen} 20165c497a3SShuo Chen 202