1566406ccSShuo Chen // excerpts from http://code.google.com/p/muduo/ 2566406ccSShuo Chen // 3566406ccSShuo Chen // Use of this source code is governed by a BSD-style license 4566406ccSShuo Chen // that can be found in the License file. 5566406ccSShuo Chen // 6566406ccSShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com) 7566406ccSShuo Chen 8566406ccSShuo Chen #include "EventLoop.h" 9566406ccSShuo Chen 10566406ccSShuo Chen #include "Channel.h" 11566406ccSShuo Chen #include "Poller.h" 12566406ccSShuo Chen #include "TimerQueue.h" 13566406ccSShuo Chen 14566406ccSShuo Chen #include "logging/Logging.h" 15566406ccSShuo Chen 16566406ccSShuo Chen+#include <boost/bind.hpp> 17566406ccSShuo Chen+ 18566406ccSShuo Chen #include <assert.h> 19566406ccSShuo Chen+#include <sys/eventfd.h> 20566406ccSShuo Chen 21566406ccSShuo Chen using namespace muduo; 22566406ccSShuo Chen 23566406ccSShuo Chen __thread EventLoop* t_loopInThisThread = 0; 24566406ccSShuo Chen const int kPollTimeMs = 10000; 25566406ccSShuo Chen 26566406ccSShuo Chen+static int createEventfd() 27566406ccSShuo Chen+{ 28566406ccSShuo Chen+ int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 29566406ccSShuo Chen+ if (evtfd < 0) 30566406ccSShuo Chen+ { 31566406ccSShuo Chen+ LOG_SYSERR << "Failed in eventfd"; 32566406ccSShuo Chen+ abort(); 33566406ccSShuo Chen+ } 34566406ccSShuo Chen+ return evtfd; 35566406ccSShuo Chen+} 36566406ccSShuo Chen+ 37566406ccSShuo Chen EventLoop::EventLoop() 38566406ccSShuo Chen : looping_(false), 39566406ccSShuo Chen quit_(false), 40566406ccSShuo Chen+ callingPendingFunctors_(false), 41566406ccSShuo Chen threadId_(CurrentThread::tid()), 42566406ccSShuo Chen poller_(new Poller(this)), 43566406ccSShuo Chen timerQueue_(new TimerQueue(this)), 44566406ccSShuo Chen+ wakeupFd_(createEventfd()), 45566406ccSShuo Chen+ wakeupChannel_(new Channel(this, wakeupFd_)) 46566406ccSShuo Chen { 47566406ccSShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 48566406ccSShuo Chen if (t_loopInThisThread) 49566406ccSShuo Chen { 50566406ccSShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 51566406ccSShuo Chen << " exists in this thread " << threadId_; 52566406ccSShuo Chen } 53566406ccSShuo Chen else 54566406ccSShuo Chen { 55566406ccSShuo Chen t_loopInThisThread = this; 56566406ccSShuo Chen } 57566406ccSShuo Chen+ wakeupChannel_->setReadCallback( 58566406ccSShuo Chen+ boost::bind(&EventLoop::handleRead, this)); 59566406ccSShuo Chen+ // we are always reading the wakeupfd 60566406ccSShuo Chen+ wakeupChannel_->enableReading(); 61566406ccSShuo Chen } 62566406ccSShuo Chen 63566406ccSShuo Chen EventLoop::~EventLoop() 64566406ccSShuo Chen { 65566406ccSShuo Chen assert(!looping_); 66566406ccSShuo Chen+ ::close(wakeupFd_); 67566406ccSShuo Chen t_loopInThisThread = NULL; 68566406ccSShuo Chen } 69566406ccSShuo Chen 70566406ccSShuo Chen void EventLoop::loop() 71566406ccSShuo Chen { 72566406ccSShuo Chen assert(!looping_); 73566406ccSShuo Chen assertInLoopThread(); 74566406ccSShuo Chen looping_ = true; 75566406ccSShuo Chen quit_ = false; 76566406ccSShuo Chen 77566406ccSShuo Chen while (!quit_) 78566406ccSShuo Chen { 79566406ccSShuo Chen activeChannels_.clear(); 80566406ccSShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 81566406ccSShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 82566406ccSShuo Chen it != activeChannels_.end(); ++it) 83566406ccSShuo Chen { 84566406ccSShuo Chen (*it)->handleEvent(); 85566406ccSShuo Chen } 86566406ccSShuo Chen+ doPendingFunctors(); 87566406ccSShuo Chen } 88566406ccSShuo Chen 89566406ccSShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 90566406ccSShuo Chen looping_ = false; 91566406ccSShuo Chen } 92566406ccSShuo Chen 93566406ccSShuo Chen void EventLoop::quit() 94566406ccSShuo Chen { 95566406ccSShuo Chen quit_ = true; 96566406ccSShuo Chen+ if (!isInLoopThread()) 97566406ccSShuo Chen+ { 98566406ccSShuo Chen+ wakeup(); 99566406ccSShuo Chen+ } 100b4a5ce52SShuo Chen } 101b4a5ce52SShuo Chen 102566406ccSShuo Chen+void EventLoop::runInLoop(const Functor& cb) 103566406ccSShuo Chen+{ 104566406ccSShuo Chen+ if (isInLoopThread()) 105566406ccSShuo Chen+ { 106566406ccSShuo Chen+ cb(); 107566406ccSShuo Chen+ } 108566406ccSShuo Chen+ else 109566406ccSShuo Chen+ { 110566406ccSShuo Chen+ queueInLoop(cb); 111566406ccSShuo Chen+ } 112566406ccSShuo Chen+} 113566406ccSShuo Chen+ 114566406ccSShuo Chen+void EventLoop::queueInLoop(const Functor& cb) 115566406ccSShuo Chen+{ 116566406ccSShuo Chen+ { 117566406ccSShuo Chen+ MutexLockGuard lock(mutex_); 118566406ccSShuo Chen+ pendingFunctors_.push_back(cb); 119566406ccSShuo Chen+ } 120566406ccSShuo Chen+ 1210f776063SShuo Chen+ if (!isInLoopThread() || callingPendingFunctors_) 122566406ccSShuo Chen+ { 123566406ccSShuo Chen+ wakeup(); 124566406ccSShuo Chen+ } 125b4a5ce52SShuo Chen+} 126566406ccSShuo Chen 127566406ccSShuo Chen TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 128566406ccSShuo Chen { 129566406ccSShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 130566406ccSShuo Chen } 131566406ccSShuo Chen 132566406ccSShuo Chen TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 133566406ccSShuo Chen { 134566406ccSShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 135566406ccSShuo Chen return runAt(time, cb); 136566406ccSShuo Chen } 137566406ccSShuo Chen 138566406ccSShuo Chen TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 139566406ccSShuo Chen { 140566406ccSShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 141566406ccSShuo Chen return timerQueue_->addTimer(cb, time, interval); 142566406ccSShuo Chen } 143566406ccSShuo Chen 144566406ccSShuo Chen void EventLoop::updateChannel(Channel* channel) 145566406ccSShuo Chen { 146566406ccSShuo Chen assert(channel->ownerLoop() == this); 147566406ccSShuo Chen assertInLoopThread(); 148566406ccSShuo Chen poller_->updateChannel(channel); 149566406ccSShuo Chen } 150566406ccSShuo Chen 151566406ccSShuo Chen void EventLoop::abortNotInLoopThread() 152566406ccSShuo Chen { 153566406ccSShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 154566406ccSShuo Chen << " was created in threadId_ = " << threadId_ 155566406ccSShuo Chen << ", current thread id = " << CurrentThread::tid(); 156566406ccSShuo Chen } 157566406ccSShuo Chen 158566406ccSShuo Chen+void EventLoop::wakeup() 159566406ccSShuo Chen+{ 160566406ccSShuo Chen+ uint64_t one = 1; 161566406ccSShuo Chen+ ssize_t n = ::write(wakeupFd_, &one, sizeof one); 162566406ccSShuo Chen+ if (n != sizeof one) 163566406ccSShuo Chen+ { 164566406ccSShuo Chen+ LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 165566406ccSShuo Chen+ } 166566406ccSShuo Chen+} 167566406ccSShuo Chen+ 168566406ccSShuo Chen+void EventLoop::handleRead() 169566406ccSShuo Chen+{ 170566406ccSShuo Chen+ uint64_t one = 1; 171566406ccSShuo Chen+ ssize_t n = ::read(wakeupFd_, &one, sizeof one); 172566406ccSShuo Chen+ if (n != sizeof one) 173566406ccSShuo Chen+ { 174566406ccSShuo Chen+ LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 175566406ccSShuo Chen+ } 176566406ccSShuo Chen+} 177566406ccSShuo Chen+ 178566406ccSShuo Chen+void EventLoop::doPendingFunctors() 179566406ccSShuo Chen+{ 180566406ccSShuo Chen+ std::vector<Functor> functors; 181566406ccSShuo Chen+ callingPendingFunctors_ = true; 182566406ccSShuo Chen+ 183566406ccSShuo Chen+ { 184566406ccSShuo Chen+ MutexLockGuard lock(mutex_); 185566406ccSShuo Chen+ functors.swap(pendingFunctors_); 186566406ccSShuo Chen+ } 187566406ccSShuo Chen+ 188566406ccSShuo Chen+ for (size_t i = 0; i < functors.size(); ++i) 189566406ccSShuo Chen+ { 190566406ccSShuo Chen+ functors[i](); 191566406ccSShuo Chen+ } 192566406ccSShuo Chen+ callingPendingFunctors_ = false; 193566406ccSShuo Chen+} 194566406ccSShuo Chen+ 195