1// excerpts from http://code.google.com/p/muduo/ 2// 3// Use of this source code is governed by a BSD-style license 4// that can be found in the License file. 5// 6// Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8#include "EventLoop.h" 9 10#include "Channel.h" 11#include "Poller.h" 12#include "TimerQueue.h" 13 14#include "logging/Logging.h" 15 16#include <boost/bind.hpp> 17 18#include <assert.h> 19#include <signal.h> 20#include <sys/eventfd.h> 21 22using namespace muduo; 23 24__thread EventLoop* t_loopInThisThread = 0; 25const int kPollTimeMs = 10000; 26 27static int createEventfd() 28{ 29 int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 30 if (evtfd < 0) 31 { 32 LOG_SYSERR << "Failed in eventfd"; 33 abort(); 34 } 35 return evtfd; 36} 37 38class IgnoreSigPipe 39{ 40 public: 41 IgnoreSigPipe() 42 { 43 ::signal(SIGPIPE, SIG_IGN); 44 } 45}; 46 47IgnoreSigPipe initObj; 48 49EventLoop::EventLoop() 50 : looping_(false), 51 quit_(false), 52 callingPendingFunctors_(false), 53 threadId_(CurrentThread::tid()), 54 poller_(new Poller(this)), 55 timerQueue_(new TimerQueue(this)), 56 wakeupFd_(createEventfd()), 57 wakeupChannel_(new Channel(this, wakeupFd_)) 58{ 59 LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 60 if (t_loopInThisThread) 61 { 62 LOG_FATAL << "Another EventLoop " << t_loopInThisThread 63 << " exists in this thread " << threadId_; 64 } 65 else 66 { 67 t_loopInThisThread = this; 68 } 69 wakeupChannel_->setReadCallback( 70 boost::bind(&EventLoop::handleRead, this)); 71 // we are always reading the wakeupfd 72 wakeupChannel_->enableReading(); 73} 74 75EventLoop::~EventLoop() 76{ 77 assert(!looping_); 78 ::close(wakeupFd_); 79 t_loopInThisThread = NULL; 80} 81 82void EventLoop::loop() 83{ 84 assert(!looping_); 85 assertInLoopThread(); 86 looping_ = true; 87 quit_ = false; 88 89 while (!quit_) 90 { 91 activeChannels_.clear(); 92 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 93 for (ChannelList::iterator it = activeChannels_.begin(); 94 it != activeChannels_.end(); ++it) 95 { 96 (*it)->handleEvent(pollReturnTime_); 97 } 98 doPendingFunctors(); 99 } 100 101 LOG_TRACE << "EventLoop " << this << " stop looping"; 102 looping_ = false; 103} 104 105void EventLoop::quit() 106{ 107 quit_ = true; 108 if (!isInLoopThread()) 109 { 110 wakeup(); 111 } 112} 113 114void EventLoop::runInLoop(const Functor& cb) 115{ 116 if (isInLoopThread()) 117 { 118 cb(); 119 } 120 else 121 { 122 queueInLoop(cb); 123 } 124} 125 126void EventLoop::queueInLoop(const Functor& cb) 127{ 128 { 129 MutexLockGuard lock(mutex_); 130 pendingFunctors_.push_back(cb); 131 } 132 133 if (!isInLoopThread() || callingPendingFunctors_) 134 { 135 wakeup(); 136 } 137} 138 139TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 140{ 141 return timerQueue_->addTimer(cb, time, 0.0); 142} 143 144TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 145{ 146 Timestamp time(addTime(Timestamp::now(), delay)); 147 return runAt(time, cb); 148} 149 150TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 151{ 152 Timestamp time(addTime(Timestamp::now(), interval)); 153 return timerQueue_->addTimer(cb, time, interval); 154} 155 156void EventLoop::updateChannel(Channel* channel) 157{ 158 assert(channel->ownerLoop() == this); 159 assertInLoopThread(); 160 poller_->updateChannel(channel); 161} 162 163void EventLoop::removeChannel(Channel* channel) 164{ 165 assert(channel->ownerLoop() == this); 166 assertInLoopThread(); 167 poller_->removeChannel(channel); 168} 169 170void EventLoop::abortNotInLoopThread() 171{ 172 LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 173 << " was created in threadId_ = " << threadId_ 174 << ", current thread id = " << CurrentThread::tid(); 175} 176 177void EventLoop::wakeup() 178{ 179 uint64_t one = 1; 180 ssize_t n = ::write(wakeupFd_, &one, sizeof one); 181 if (n != sizeof one) 182 { 183 LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 184 } 185} 186 187void EventLoop::handleRead() 188{ 189 uint64_t one = 1; 190 ssize_t n = ::read(wakeupFd_, &one, sizeof one); 191 if (n != sizeof one) 192 { 193 LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 194 } 195} 196 197void EventLoop::doPendingFunctors() 198{ 199 std::vector<Functor> functors; 200 callingPendingFunctors_ = true; 201 202 { 203 MutexLockGuard lock(mutex_); 204 functors.swap(pendingFunctors_); 205 } 206 207 for (size_t i = 0; i < functors.size(); ++i) 208 { 209 functors[i](); 210 } 211 callingPendingFunctors_ = false; 212} 213 214