1// excerpts from http://code.google.com/p/muduo/ 2// 3// Use of this source code is governed by a BSD-style license 4// that can be found in the License file. 5// 6// Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8#include "EventLoop.h" 9 10#include "Channel.h" 11#include "EPoller.h" 12#include "TimerQueue.h" 13 14#include "logging/Logging.h" 15 16#include <boost/bind.hpp> 17 18#include <assert.h> 19#include <signal.h> 20#include <sys/eventfd.h> 21 22using namespace muduo; 23 24__thread EventLoop* t_loopInThisThread = 0; 25const int kPollTimeMs = 10000; 26 27static int createEventfd() 28{ 29 int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 30 if (evtfd < 0) 31 { 32 LOG_SYSERR << "Failed in eventfd"; 33 abort(); 34 } 35 return evtfd; 36} 37 38class IgnoreSigPipe 39{ 40 public: 41 IgnoreSigPipe() 42 { 43 ::signal(SIGPIPE, SIG_IGN); 44 } 45}; 46 47IgnoreSigPipe initObj; 48 49EventLoop::EventLoop() 50 : looping_(false), 51 quit_(false), 52 callingPendingFunctors_(false), 53 threadId_(CurrentThread::tid()), 54 poller_(new EPoller(this)), 55 timerQueue_(new TimerQueue(this)), 56 wakeupFd_(createEventfd()), 57 wakeupChannel_(new Channel(this, wakeupFd_)) 58{ 59 LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 60 if (t_loopInThisThread) 61 { 62 LOG_FATAL << "Another EventLoop " << t_loopInThisThread 63 << " exists in this thread " << threadId_; 64 } 65 else 66 { 67 t_loopInThisThread = this; 68 } 69 wakeupChannel_->setReadCallback( 70 boost::bind(&EventLoop::handleRead, this)); 71 // we are always reading the wakeupfd 72 wakeupChannel_->enableReading(); 73} 74 75EventLoop::~EventLoop() 76{ 77 assert(!looping_); 78 ::close(wakeupFd_); 79 t_loopInThisThread = NULL; 80} 81 82void EventLoop::loop() 83{ 84 assert(!looping_); 85 assertInLoopThread(); 86 looping_ = true; 87 quit_ = false; 88 89 while (!quit_) 90 { 91 activeChannels_.clear(); 92 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 93 for (ChannelList::iterator it = activeChannels_.begin(); 94 it != activeChannels_.end(); ++it) 95 { 96 (*it)->handleEvent(pollReturnTime_); 97 } 98 doPendingFunctors(); 99 } 100 101 LOG_TRACE << "EventLoop " << this << " stop looping"; 102 looping_ = false; 103} 104 105void EventLoop::quit() 106{ 107 quit_ = true; 108 if (!isInLoopThread()) 109 { 110 wakeup(); 111 } 112} 113 114void EventLoop::runInLoop(const Functor& cb) 115{ 116 if (isInLoopThread()) 117 { 118 cb(); 119 } 120 else 121 { 122 queueInLoop(cb); 123 } 124} 125 126void EventLoop::queueInLoop(const Functor& cb) 127{ 128 { 129 MutexLockGuard lock(mutex_); 130 pendingFunctors_.push_back(cb); 131 } 132 133 if (!isInLoopThread() || callingPendingFunctors_) 134 { 135 wakeup(); 136 } 137} 138 139TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 140{ 141 return timerQueue_->addTimer(cb, time, 0.0); 142} 143 144TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 145{ 146 Timestamp time(addTime(Timestamp::now(), delay)); 147 return runAt(time, cb); 148} 149 150TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 151{ 152 Timestamp time(addTime(Timestamp::now(), interval)); 153 return timerQueue_->addTimer(cb, time, interval); 154} 155 156void EventLoop::cancel(TimerId timerId) 157{ 158 return timerQueue_->cancel(timerId); 159} 160 161void EventLoop::updateChannel(Channel* channel) 162{ 163 assert(channel->ownerLoop() == this); 164 assertInLoopThread(); 165 poller_->updateChannel(channel); 166} 167 168void EventLoop::removeChannel(Channel* channel) 169{ 170 assert(channel->ownerLoop() == this); 171 assertInLoopThread(); 172 poller_->removeChannel(channel); 173} 174 175void EventLoop::abortNotInLoopThread() 176{ 177 LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 178 << " was created in threadId_ = " << threadId_ 179 << ", current thread id = " << CurrentThread::tid(); 180} 181 182void EventLoop::wakeup() 183{ 184 uint64_t one = 1; 185 ssize_t n = ::write(wakeupFd_, &one, sizeof one); 186 if (n != sizeof one) 187 { 188 LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 189 } 190} 191 192void EventLoop::handleRead() 193{ 194 uint64_t one = 1; 195 ssize_t n = ::read(wakeupFd_, &one, sizeof one); 196 if (n != sizeof one) 197 { 198 LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 199 } 200} 201 202void EventLoop::doPendingFunctors() 203{ 204 std::vector<Functor> functors; 205 callingPendingFunctors_ = true; 206 207 { 208 MutexLockGuard lock(mutex_); 209 functors.swap(pendingFunctors_); 210 } 211 212 for (size_t i = 0; i < functors.size(); ++i) 213 { 214 functors[i](); 215 } 216 callingPendingFunctors_ = false; 217} 218 219