140161064SShuo Chen// excerpts from http://code.google.com/p/muduo/ 240161064SShuo Chen// 340161064SShuo Chen// Use of this source code is governed by a BSD-style license 440161064SShuo Chen// that can be found in the License file. 540161064SShuo Chen// 640161064SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 740161064SShuo Chen 840161064SShuo Chen#include "EventLoop.h" 940161064SShuo Chen 1040161064SShuo Chen#include "Channel.h" 1140161064SShuo Chen#include "Poller.h" 1240161064SShuo Chen#include "TimerQueue.h" 1340161064SShuo Chen 1440161064SShuo Chen#include "logging/Logging.h" 1540161064SShuo Chen 1640161064SShuo Chen#include <boost/bind.hpp> 1740161064SShuo Chen 1840161064SShuo Chen#include <assert.h> 1940161064SShuo Chen#include <signal.h> 2040161064SShuo Chen#include <sys/eventfd.h> 2140161064SShuo Chen 2240161064SShuo Chenusing namespace muduo; 2340161064SShuo Chen 2440161064SShuo Chen__thread EventLoop* t_loopInThisThread = 0; 2540161064SShuo Chenconst int kPollTimeMs = 10000; 2640161064SShuo Chen 2740161064SShuo Chenstatic int createEventfd() 2840161064SShuo Chen{ 2940161064SShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 3040161064SShuo Chen if (evtfd < 0) 3140161064SShuo Chen { 3240161064SShuo Chen LOG_SYSERR << "Failed in eventfd"; 3340161064SShuo Chen abort(); 3440161064SShuo Chen } 3540161064SShuo Chen return evtfd; 3640161064SShuo Chen} 3740161064SShuo Chen 3840161064SShuo Chenclass IgnoreSigPipe 3940161064SShuo Chen{ 4040161064SShuo Chen public: 4140161064SShuo Chen IgnoreSigPipe() 4240161064SShuo Chen { 4340161064SShuo Chen ::signal(SIGPIPE, SIG_IGN); 4440161064SShuo Chen } 4540161064SShuo Chen}; 4640161064SShuo Chen 4740161064SShuo ChenIgnoreSigPipe initObj; 4840161064SShuo Chen 4940161064SShuo ChenEventLoop::EventLoop() 5040161064SShuo Chen : looping_(false), 5140161064SShuo Chen quit_(false), 5240161064SShuo Chen callingPendingFunctors_(false), 5340161064SShuo Chen threadId_(CurrentThread::tid()), 5440161064SShuo Chen poller_(new Poller(this)), 5540161064SShuo Chen timerQueue_(new TimerQueue(this)), 5640161064SShuo Chen wakeupFd_(createEventfd()), 5740161064SShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 5840161064SShuo Chen{ 5940161064SShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 6040161064SShuo Chen if (t_loopInThisThread) 6140161064SShuo Chen { 6240161064SShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 6340161064SShuo Chen << " exists in this thread " << threadId_; 6440161064SShuo Chen } 6540161064SShuo Chen else 6640161064SShuo Chen { 6740161064SShuo Chen t_loopInThisThread = this; 6840161064SShuo Chen } 6940161064SShuo Chen wakeupChannel_->setReadCallback( 7040161064SShuo Chen boost::bind(&EventLoop::handleRead, this)); 7140161064SShuo Chen // we are always reading the wakeupfd 7240161064SShuo Chen wakeupChannel_->enableReading(); 7340161064SShuo Chen} 7440161064SShuo Chen 7540161064SShuo ChenEventLoop::~EventLoop() 7640161064SShuo Chen{ 7740161064SShuo Chen assert(!looping_); 7840161064SShuo Chen ::close(wakeupFd_); 7940161064SShuo Chen t_loopInThisThread = NULL; 8040161064SShuo Chen} 8140161064SShuo Chen 8240161064SShuo Chenvoid EventLoop::loop() 8340161064SShuo Chen{ 8440161064SShuo Chen assert(!looping_); 8540161064SShuo Chen assertInLoopThread(); 8640161064SShuo Chen looping_ = true; 8740161064SShuo Chen quit_ = false; 8840161064SShuo Chen 8940161064SShuo Chen while (!quit_) 9040161064SShuo Chen { 9140161064SShuo Chen activeChannels_.clear(); 9240161064SShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 9340161064SShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 9440161064SShuo Chen it != activeChannels_.end(); ++it) 9540161064SShuo Chen { 9640161064SShuo Chen (*it)->handleEvent(pollReturnTime_); 9740161064SShuo Chen } 9840161064SShuo Chen doPendingFunctors(); 9940161064SShuo Chen } 10040161064SShuo Chen 10140161064SShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 10240161064SShuo Chen looping_ = false; 10340161064SShuo Chen} 10440161064SShuo Chen 10540161064SShuo Chenvoid EventLoop::quit() 10640161064SShuo Chen{ 10740161064SShuo Chen quit_ = true; 10840161064SShuo Chen if (!isInLoopThread()) 10940161064SShuo Chen { 11040161064SShuo Chen wakeup(); 11140161064SShuo Chen } 11240161064SShuo Chen} 11340161064SShuo Chen 11440161064SShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 11540161064SShuo Chen{ 11640161064SShuo Chen if (isInLoopThread()) 11740161064SShuo Chen { 11840161064SShuo Chen cb(); 11940161064SShuo Chen } 12040161064SShuo Chen else 12140161064SShuo Chen { 12240161064SShuo Chen queueInLoop(cb); 12340161064SShuo Chen } 12440161064SShuo Chen} 12540161064SShuo Chen 12640161064SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 12740161064SShuo Chen{ 12840161064SShuo Chen { 12940161064SShuo Chen MutexLockGuard lock(mutex_); 13040161064SShuo Chen pendingFunctors_.push_back(cb); 13140161064SShuo Chen } 13240161064SShuo Chen 13340161064SShuo Chen if (!isInLoopThread() || callingPendingFunctors_) 13440161064SShuo Chen { 13540161064SShuo Chen wakeup(); 13640161064SShuo Chen } 13740161064SShuo Chen} 13840161064SShuo Chen 13940161064SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 14040161064SShuo Chen{ 14140161064SShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 14240161064SShuo Chen} 14340161064SShuo Chen 14440161064SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 14540161064SShuo Chen{ 14640161064SShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 14740161064SShuo Chen return runAt(time, cb); 14840161064SShuo Chen} 14940161064SShuo Chen 15040161064SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 15140161064SShuo Chen{ 15240161064SShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 15340161064SShuo Chen return timerQueue_->addTimer(cb, time, interval); 15440161064SShuo Chen} 15540161064SShuo Chen 156f4e8e3d3SShuo Chenvoid EventLoop::cancel(TimerId timerId) 157f4e8e3d3SShuo Chen{ 158f4e8e3d3SShuo Chen return timerQueue_->cancel(timerId); 159f4e8e3d3SShuo Chen} 160f4e8e3d3SShuo Chen 16140161064SShuo Chenvoid EventLoop::updateChannel(Channel* channel) 16240161064SShuo Chen{ 16340161064SShuo Chen assert(channel->ownerLoop() == this); 16440161064SShuo Chen assertInLoopThread(); 16540161064SShuo Chen poller_->updateChannel(channel); 16640161064SShuo Chen} 16740161064SShuo Chen 16840161064SShuo Chenvoid EventLoop::removeChannel(Channel* channel) 16940161064SShuo Chen{ 17040161064SShuo Chen assert(channel->ownerLoop() == this); 17140161064SShuo Chen assertInLoopThread(); 17240161064SShuo Chen poller_->removeChannel(channel); 17340161064SShuo Chen} 17440161064SShuo Chen 17540161064SShuo Chenvoid EventLoop::abortNotInLoopThread() 17640161064SShuo Chen{ 17740161064SShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 17840161064SShuo Chen << " was created in threadId_ = " << threadId_ 17940161064SShuo Chen << ", current thread id = " << CurrentThread::tid(); 18040161064SShuo Chen} 18140161064SShuo Chen 18240161064SShuo Chenvoid EventLoop::wakeup() 18340161064SShuo Chen{ 18440161064SShuo Chen uint64_t one = 1; 18540161064SShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 18640161064SShuo Chen if (n != sizeof one) 18740161064SShuo Chen { 18840161064SShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 18940161064SShuo Chen } 19040161064SShuo Chen} 19140161064SShuo Chen 19240161064SShuo Chenvoid EventLoop::handleRead() 19340161064SShuo Chen{ 19440161064SShuo Chen uint64_t one = 1; 19540161064SShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 19640161064SShuo Chen if (n != sizeof one) 19740161064SShuo Chen { 19840161064SShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 19940161064SShuo Chen } 20040161064SShuo Chen} 20140161064SShuo Chen 20240161064SShuo Chenvoid EventLoop::doPendingFunctors() 20340161064SShuo Chen{ 20440161064SShuo Chen std::vector<Functor> functors; 20540161064SShuo Chen callingPendingFunctors_ = true; 20640161064SShuo Chen 20740161064SShuo Chen { 20840161064SShuo Chen MutexLockGuard lock(mutex_); 20940161064SShuo Chen functors.swap(pendingFunctors_); 21040161064SShuo Chen } 21140161064SShuo Chen 21240161064SShuo Chen for (size_t i = 0; i < functors.size(); ++i) 21340161064SShuo Chen { 21440161064SShuo Chen functors[i](); 21540161064SShuo Chen } 21640161064SShuo Chen callingPendingFunctors_ = false; 21740161064SShuo Chen} 21840161064SShuo Chen 219