1354280cfSShuo Chen// excerpts from http://code.google.com/p/muduo/ 2354280cfSShuo Chen// 3354280cfSShuo Chen// Use of this source code is governed by a BSD-style license 4354280cfSShuo Chen// that can be found in the License file. 5354280cfSShuo Chen// 6354280cfSShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7354280cfSShuo Chen 8354280cfSShuo Chen#include "EventLoop.h" 9354280cfSShuo Chen 10354280cfSShuo Chen#include "Channel.h" 1170523619SShuo Chen#include "EPoller.h" 12354280cfSShuo Chen#include "TimerQueue.h" 13354280cfSShuo Chen 14354280cfSShuo Chen#include "logging/Logging.h" 15354280cfSShuo Chen 16354280cfSShuo Chen#include <boost/bind.hpp> 17354280cfSShuo Chen 18354280cfSShuo Chen#include <assert.h> 19354280cfSShuo Chen#include <signal.h> 20354280cfSShuo Chen#include <sys/eventfd.h> 21354280cfSShuo Chen 22354280cfSShuo Chenusing namespace muduo; 23354280cfSShuo Chen 24354280cfSShuo Chen__thread EventLoop* t_loopInThisThread = 0; 25354280cfSShuo Chenconst int kPollTimeMs = 10000; 26354280cfSShuo Chen 27354280cfSShuo Chenstatic int createEventfd() 28354280cfSShuo Chen{ 29354280cfSShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 30354280cfSShuo Chen if (evtfd < 0) 31354280cfSShuo Chen { 32354280cfSShuo Chen LOG_SYSERR << "Failed in eventfd"; 33354280cfSShuo Chen abort(); 34354280cfSShuo Chen } 35354280cfSShuo Chen return evtfd; 36354280cfSShuo Chen} 37354280cfSShuo Chen 38354280cfSShuo Chenclass IgnoreSigPipe 39354280cfSShuo Chen{ 40354280cfSShuo Chen public: 41354280cfSShuo Chen IgnoreSigPipe() 42354280cfSShuo Chen { 43354280cfSShuo Chen ::signal(SIGPIPE, SIG_IGN); 44354280cfSShuo Chen } 45354280cfSShuo Chen}; 46354280cfSShuo Chen 47354280cfSShuo ChenIgnoreSigPipe initObj; 48354280cfSShuo Chen 49354280cfSShuo ChenEventLoop::EventLoop() 50354280cfSShuo Chen : looping_(false), 51354280cfSShuo Chen quit_(false), 52354280cfSShuo Chen callingPendingFunctors_(false), 53354280cfSShuo Chen threadId_(CurrentThread::tid()), 5470523619SShuo Chen poller_(new EPoller(this)), 55354280cfSShuo Chen timerQueue_(new TimerQueue(this)), 56354280cfSShuo Chen wakeupFd_(createEventfd()), 57354280cfSShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 58354280cfSShuo Chen{ 59354280cfSShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 60354280cfSShuo Chen if (t_loopInThisThread) 61354280cfSShuo Chen { 62354280cfSShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 63354280cfSShuo Chen << " exists in this thread " << threadId_; 64354280cfSShuo Chen } 65354280cfSShuo Chen else 66354280cfSShuo Chen { 67354280cfSShuo Chen t_loopInThisThread = this; 68354280cfSShuo Chen } 69354280cfSShuo Chen wakeupChannel_->setReadCallback( 70354280cfSShuo Chen boost::bind(&EventLoop::handleRead, this)); 71354280cfSShuo Chen // we are always reading the wakeupfd 72354280cfSShuo Chen wakeupChannel_->enableReading(); 73354280cfSShuo Chen} 74354280cfSShuo Chen 75354280cfSShuo ChenEventLoop::~EventLoop() 76354280cfSShuo Chen{ 77354280cfSShuo Chen assert(!looping_); 78354280cfSShuo Chen ::close(wakeupFd_); 79354280cfSShuo Chen t_loopInThisThread = NULL; 80354280cfSShuo Chen} 81354280cfSShuo Chen 82354280cfSShuo Chenvoid EventLoop::loop() 83354280cfSShuo Chen{ 84354280cfSShuo Chen assert(!looping_); 85354280cfSShuo Chen assertInLoopThread(); 86354280cfSShuo Chen looping_ = true; 87354280cfSShuo Chen quit_ = false; 88354280cfSShuo Chen 89354280cfSShuo Chen while (!quit_) 90354280cfSShuo Chen { 91354280cfSShuo Chen activeChannels_.clear(); 92354280cfSShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 93354280cfSShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 94354280cfSShuo Chen it != activeChannels_.end(); ++it) 95354280cfSShuo Chen { 96354280cfSShuo Chen (*it)->handleEvent(pollReturnTime_); 97354280cfSShuo Chen } 98354280cfSShuo Chen doPendingFunctors(); 99354280cfSShuo Chen } 100354280cfSShuo Chen 101354280cfSShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 102354280cfSShuo Chen looping_ = false; 103354280cfSShuo Chen} 104354280cfSShuo Chen 105354280cfSShuo Chenvoid EventLoop::quit() 106354280cfSShuo Chen{ 107354280cfSShuo Chen quit_ = true; 108354280cfSShuo Chen if (!isInLoopThread()) 109354280cfSShuo Chen { 110354280cfSShuo Chen wakeup(); 111354280cfSShuo Chen } 112354280cfSShuo Chen} 113354280cfSShuo Chen 114354280cfSShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 115354280cfSShuo Chen{ 116354280cfSShuo Chen if (isInLoopThread()) 117354280cfSShuo Chen { 118354280cfSShuo Chen cb(); 119354280cfSShuo Chen } 120354280cfSShuo Chen else 121354280cfSShuo Chen { 122354280cfSShuo Chen queueInLoop(cb); 123354280cfSShuo Chen } 124354280cfSShuo Chen} 125354280cfSShuo Chen 126354280cfSShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 127354280cfSShuo Chen{ 128354280cfSShuo Chen { 129354280cfSShuo Chen MutexLockGuard lock(mutex_); 130354280cfSShuo Chen pendingFunctors_.push_back(cb); 131354280cfSShuo Chen } 132354280cfSShuo Chen 133354280cfSShuo Chen if (!isInLoopThread() || callingPendingFunctors_) 134354280cfSShuo Chen { 135354280cfSShuo Chen wakeup(); 136354280cfSShuo Chen } 137354280cfSShuo Chen} 138354280cfSShuo Chen 139354280cfSShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 140354280cfSShuo Chen{ 141354280cfSShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 142354280cfSShuo Chen} 143354280cfSShuo Chen 144354280cfSShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 145354280cfSShuo Chen{ 146354280cfSShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 147354280cfSShuo Chen return runAt(time, cb); 148354280cfSShuo Chen} 149354280cfSShuo Chen 150354280cfSShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 151354280cfSShuo Chen{ 152354280cfSShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 153354280cfSShuo Chen return timerQueue_->addTimer(cb, time, interval); 154354280cfSShuo Chen} 155354280cfSShuo Chen 156354280cfSShuo Chenvoid EventLoop::cancel(TimerId timerId) 157354280cfSShuo Chen{ 158354280cfSShuo Chen return timerQueue_->cancel(timerId); 159354280cfSShuo Chen} 160354280cfSShuo Chen 161354280cfSShuo Chenvoid EventLoop::updateChannel(Channel* channel) 162354280cfSShuo Chen{ 163354280cfSShuo Chen assert(channel->ownerLoop() == this); 164354280cfSShuo Chen assertInLoopThread(); 165354280cfSShuo Chen poller_->updateChannel(channel); 166354280cfSShuo Chen} 167354280cfSShuo Chen 168354280cfSShuo Chenvoid EventLoop::removeChannel(Channel* channel) 169354280cfSShuo Chen{ 170354280cfSShuo Chen assert(channel->ownerLoop() == this); 171354280cfSShuo Chen assertInLoopThread(); 172354280cfSShuo Chen poller_->removeChannel(channel); 173354280cfSShuo Chen} 174354280cfSShuo Chen 175354280cfSShuo Chenvoid EventLoop::abortNotInLoopThread() 176354280cfSShuo Chen{ 177354280cfSShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 178354280cfSShuo Chen << " was created in threadId_ = " << threadId_ 179354280cfSShuo Chen << ", current thread id = " << CurrentThread::tid(); 180354280cfSShuo Chen} 181354280cfSShuo Chen 182354280cfSShuo Chenvoid EventLoop::wakeup() 183354280cfSShuo Chen{ 184354280cfSShuo Chen uint64_t one = 1; 185354280cfSShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 186354280cfSShuo Chen if (n != sizeof one) 187354280cfSShuo Chen { 188354280cfSShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 189354280cfSShuo Chen } 190354280cfSShuo Chen} 191354280cfSShuo Chen 192354280cfSShuo Chenvoid EventLoop::handleRead() 193354280cfSShuo Chen{ 194354280cfSShuo Chen uint64_t one = 1; 195354280cfSShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 196354280cfSShuo Chen if (n != sizeof one) 197354280cfSShuo Chen { 198354280cfSShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 199354280cfSShuo Chen } 200354280cfSShuo Chen} 201354280cfSShuo Chen 202354280cfSShuo Chenvoid EventLoop::doPendingFunctors() 203354280cfSShuo Chen{ 204354280cfSShuo Chen std::vector<Functor> functors; 205354280cfSShuo Chen callingPendingFunctors_ = true; 206354280cfSShuo Chen 207354280cfSShuo Chen { 208354280cfSShuo Chen MutexLockGuard lock(mutex_); 209354280cfSShuo Chen functors.swap(pendingFunctors_); 210354280cfSShuo Chen } 211354280cfSShuo Chen 212354280cfSShuo Chen for (size_t i = 0; i < functors.size(); ++i) 213354280cfSShuo Chen { 214354280cfSShuo Chen functors[i](); 215354280cfSShuo Chen } 216354280cfSShuo Chen callingPendingFunctors_ = false; 217354280cfSShuo Chen} 218354280cfSShuo Chen 219