1// excerpts from http://code.google.com/p/muduo/ 2// 3// Use of this source code is governed by a BSD-style license 4// that can be found in the License file. 5// 6// Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8#include "EventLoop.h" 9 10#include "Channel.h" 11#include "Poller.h" 12#include "TimerQueue.h" 13 14#include "logging/Logging.h" 15 16#include <boost/bind.hpp> 17 18#include <assert.h> 19#include <sys/eventfd.h> 20 21using namespace muduo; 22 23__thread EventLoop* t_loopInThisThread = 0; 24const int kPollTimeMs = 10000; 25 26static int createEventfd() 27{ 28 int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 29 if (evtfd < 0) 30 { 31 LOG_SYSERR << "Failed in eventfd"; 32 abort(); 33 } 34 return evtfd; 35} 36 37EventLoop::EventLoop() 38 : looping_(false), 39 quit_(false), 40 callingPendingFunctors_(false), 41 threadId_(CurrentThread::tid()), 42 poller_(new Poller(this)), 43 timerQueue_(new TimerQueue(this)), 44 wakeupFd_(createEventfd()), 45 wakeupChannel_(new Channel(this, wakeupFd_)) 46{ 47 LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 48 if (t_loopInThisThread) 49 { 50 LOG_FATAL << "Another EventLoop " << t_loopInThisThread 51 << " exists in this thread " << threadId_; 52 } 53 else 54 { 55 t_loopInThisThread = this; 56 } 57 wakeupChannel_->setReadCallback( 58 boost::bind(&EventLoop::handleRead, this)); 59 // we are always reading the wakeupfd 60 wakeupChannel_->enableReading(); 61} 62 63EventLoop::~EventLoop() 64{ 65 assert(!looping_); 66 ::close(wakeupFd_); 67 t_loopInThisThread = NULL; 68} 69 70void EventLoop::loop() 71{ 72 assert(!looping_); 73 assertInLoopThread(); 74 looping_ = true; 75 quit_ = false; 76 77 while (!quit_) 78 { 79 activeChannels_.clear(); 80 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 81 for (ChannelList::iterator it = activeChannels_.begin(); 82 it != activeChannels_.end(); ++it) 83 { 84 (*it)->handleEvent(); 85 } 86 doPendingFunctors(); 87 } 88 89 LOG_TRACE << "EventLoop " << this << " stop looping"; 90 looping_ = false; 91} 92 93void EventLoop::quit() 94{ 95 quit_ = true; 96 if (!isInLoopThread()) 97 { 98 wakeup(); 99 } 100} 101 102void EventLoop::runInLoop(const Functor& cb) 103{ 104 if (isInLoopThread()) 105 { 106 cb(); 107 } 108 else 109 { 110 queueInLoop(cb); 111 } 112} 113 114void EventLoop::queueInLoop(const Functor& cb) 115{ 116 { 117 MutexLockGuard lock(mutex_); 118 pendingFunctors_.push_back(cb); 119 } 120 121 if (!isInLoopThread() || callingPendingFunctors_) 122 { 123 wakeup(); 124 } 125} 126 127TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 128{ 129 return timerQueue_->addTimer(cb, time, 0.0); 130} 131 132TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 133{ 134 Timestamp time(addTime(Timestamp::now(), delay)); 135 return runAt(time, cb); 136} 137 138TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 139{ 140 Timestamp time(addTime(Timestamp::now(), interval)); 141 return timerQueue_->addTimer(cb, time, interval); 142} 143 144void EventLoop::updateChannel(Channel* channel) 145{ 146 assert(channel->ownerLoop() == this); 147 assertInLoopThread(); 148 poller_->updateChannel(channel); 149} 150 151void EventLoop::abortNotInLoopThread() 152{ 153 LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 154 << " was created in threadId_ = " << threadId_ 155 << ", current thread id = " << CurrentThread::tid(); 156} 157 158void EventLoop::wakeup() 159{ 160 uint64_t one = 1; 161 ssize_t n = ::write(wakeupFd_, &one, sizeof one); 162 if (n != sizeof one) 163 { 164 LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 165 } 166} 167 168void EventLoop::handleRead() 169{ 170 uint64_t one = 1; 171 ssize_t n = ::read(wakeupFd_, &one, sizeof one); 172 if (n != sizeof one) 173 { 174 LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 175 } 176} 177 178void EventLoop::doPendingFunctors() 179{ 180 std::vector<Functor> functors; 181 callingPendingFunctors_ = true; 182 183 { 184 MutexLockGuard lock(mutex_); 185 functors.swap(pendingFunctors_); 186 } 187 188 for (size_t i = 0; i < functors.size(); ++i) 189 { 190 functors[i](); 191 } 192 callingPendingFunctors_ = false; 193} 194 195