1// excerpts from http://code.google.com/p/muduo/ 2// 3// Use of this source code is governed by a BSD-style license 4// that can be found in the License file. 5// 6// Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8#include "EventLoop.h" 9 10#include "Channel.h" 11#include "Poller.h" 12#include "TimerQueue.h" 13 14#include "logging/Logging.h" 15 16#include <boost/bind.hpp> 17 18#include <assert.h> 19#include <sys/eventfd.h> 20 21using namespace muduo; 22 23__thread EventLoop* t_loopInThisThread = 0; 24const int kPollTimeMs = 10000; 25 26static int createEventfd() 27{ 28 int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 29 if (evtfd < 0) 30 { 31 LOG_SYSERR << "Failed in eventfd"; 32 abort(); 33 } 34 return evtfd; 35} 36 37EventLoop::EventLoop() 38 : looping_(false), 39 quit_(false), 40 callingPendingFunctors_(false), 41 threadId_(CurrentThread::tid()), 42 poller_(new Poller(this)), 43 timerQueue_(new TimerQueue(this)), 44 wakeupFd_(createEventfd()), 45 wakeupChannel_(new Channel(this, wakeupFd_)) 46{ 47 LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 48 if (t_loopInThisThread) 49 { 50 LOG_FATAL << "Another EventLoop " << t_loopInThisThread 51 << " exists in this thread " << threadId_; 52 } 53 else 54 { 55 t_loopInThisThread = this; 56 } 57 wakeupChannel_->setReadCallback( 58 boost::bind(&EventLoop::handleRead, this)); 59 // we are always reading the wakeupfd 60 wakeupChannel_->enableReading(); 61} 62 63EventLoop::~EventLoop() 64{ 65 assert(!looping_); 66 ::close(wakeupFd_); 67 t_loopInThisThread = NULL; 68} 69 70void EventLoop::loop() 71{ 72 assert(!looping_); 73 assertInLoopThread(); 74 looping_ = true; 75 quit_ = false; 76 77 while (!quit_) 78 { 79 activeChannels_.clear(); 80 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 81 for (ChannelList::iterator it = activeChannels_.begin(); 82 it != activeChannels_.end(); ++it) 83 { 84 (*it)->handleEvent(pollReturnTime_); 85 } 86 doPendingFunctors(); 87 } 88 89 LOG_TRACE << "EventLoop " << this << " stop looping"; 90 looping_ = false; 91} 92 93void EventLoop::quit() 94{ 95 quit_ = true; 96 if (!isInLoopThread()) 97 { 98 wakeup(); 99 } 100} 101 102void EventLoop::runInLoop(const Functor& cb) 103{ 104 if (isInLoopThread()) 105 { 106 cb(); 107 } 108 else 109 { 110 queueInLoop(cb); 111 } 112} 113 114void EventLoop::queueInLoop(const Functor& cb) 115{ 116 { 117 MutexLockGuard lock(mutex_); 118 pendingFunctors_.push_back(cb); 119 } 120 121 if (!isInLoopThread() || callingPendingFunctors_) 122 { 123 wakeup(); 124 } 125} 126 127TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 128{ 129 return timerQueue_->addTimer(cb, time, 0.0); 130} 131 132TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 133{ 134 Timestamp time(addTime(Timestamp::now(), delay)); 135 return runAt(time, cb); 136} 137 138TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 139{ 140 Timestamp time(addTime(Timestamp::now(), interval)); 141 return timerQueue_->addTimer(cb, time, interval); 142} 143 144void EventLoop::updateChannel(Channel* channel) 145{ 146 assert(channel->ownerLoop() == this); 147 assertInLoopThread(); 148 poller_->updateChannel(channel); 149} 150 151void EventLoop::removeChannel(Channel* channel) 152{ 153 assert(channel->ownerLoop() == this); 154 assertInLoopThread(); 155 poller_->removeChannel(channel); 156} 157 158void EventLoop::abortNotInLoopThread() 159{ 160 LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 161 << " was created in threadId_ = " << threadId_ 162 << ", current thread id = " << CurrentThread::tid(); 163} 164 165void EventLoop::wakeup() 166{ 167 uint64_t one = 1; 168 ssize_t n = ::write(wakeupFd_, &one, sizeof one); 169 if (n != sizeof one) 170 { 171 LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 172 } 173} 174 175void EventLoop::handleRead() 176{ 177 uint64_t one = 1; 178 ssize_t n = ::read(wakeupFd_, &one, sizeof one); 179 if (n != sizeof one) 180 { 181 LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 182 } 183} 184 185void EventLoop::doPendingFunctors() 186{ 187 std::vector<Functor> functors; 188 callingPendingFunctors_ = true; 189 190 { 191 MutexLockGuard lock(mutex_); 192 functors.swap(pendingFunctors_); 193 } 194 195 for (size_t i = 0; i < functors.size(); ++i) 196 { 197 functors[i](); 198 } 199 callingPendingFunctors_ = false; 200} 201 202