1 // excerpts from http://code.google.com/p/muduo/ 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the License file. 5 // 6 // Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8 #include "EventLoop.h" 9 10 #include "Channel.h" 11-#include "Poller.h" 12+#include "EPoller.h" 13 #include "TimerQueue.h" 14 15 #include "logging/Logging.h" 16 17 #include <boost/bind.hpp> 18 19 #include <assert.h> 20 #include <signal.h> 21 #include <sys/eventfd.h> 22 23 using namespace muduo; 24 25 __thread EventLoop* t_loopInThisThread = 0; 26 const int kPollTimeMs = 10000; 27 28 static int createEventfd() 29 { 30 int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 31 if (evtfd < 0) 32 { 33 LOG_SYSERR << "Failed in eventfd"; 34 abort(); 35 } 36 return evtfd; 37 } 38 39 class IgnoreSigPipe 40 { 41 public: 42 IgnoreSigPipe() 43 { 44 ::signal(SIGPIPE, SIG_IGN); 45 } 46 }; 47 48 IgnoreSigPipe initObj; 49 50 EventLoop::EventLoop() 51 : looping_(false), 52 quit_(false), 53 callingPendingFunctors_(false), 54 threadId_(CurrentThread::tid()), 55- poller_(new Poller(this)), 56+ poller_(new EPoller(this)), 57 timerQueue_(new TimerQueue(this)), 58 wakeupFd_(createEventfd()), 59 wakeupChannel_(new Channel(this, wakeupFd_)) 60 { 61 LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 62 if (t_loopInThisThread) 63 { 64 LOG_FATAL << "Another EventLoop " << t_loopInThisThread 65 << " exists in this thread " << threadId_; 66 } 67 else 68 { 69 t_loopInThisThread = this; 70 } 71 wakeupChannel_->setReadCallback( 72 boost::bind(&EventLoop::handleRead, this)); 73 // we are always reading the wakeupfd 74 wakeupChannel_->enableReading(); 75 } 76 77 EventLoop::~EventLoop() 78 { 79 assert(!looping_); 80 ::close(wakeupFd_); 81 t_loopInThisThread = NULL; 82 } 83 84 void EventLoop::loop() 85 { 86 assert(!looping_); 87 assertInLoopThread(); 88 looping_ = true; 89 quit_ = false; 90 91 while (!quit_) 92 { 93 activeChannels_.clear(); 94 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 95 for (ChannelList::iterator it = activeChannels_.begin(); 96 it != activeChannels_.end(); ++it) 97 { 98 (*it)->handleEvent(pollReturnTime_); 99 } 100 doPendingFunctors(); 101 } 102 103 LOG_TRACE << "EventLoop " << this << " stop looping"; 104 looping_ = false; 105 } 106 107 void EventLoop::quit() 108 { 109 quit_ = true; 110 if (!isInLoopThread()) 111 { 112 wakeup(); 113 } 114 } 115 116 void EventLoop::runInLoop(const Functor& cb) 117 { 118 if (isInLoopThread()) 119 { 120 cb(); 121 } 122 else 123 { 124 queueInLoop(cb); 125 } 126 } 127 128 void EventLoop::queueInLoop(const Functor& cb) 129 { 130 { 131 MutexLockGuard lock(mutex_); 132 pendingFunctors_.push_back(cb); 133 } 134 135 if (!isInLoopThread() || callingPendingFunctors_) 136 { 137 wakeup(); 138 } 139 } 140 141 TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 142 { 143 return timerQueue_->addTimer(cb, time, 0.0); 144 } 145 146 TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 147 { 148 Timestamp time(addTime(Timestamp::now(), delay)); 149 return runAt(time, cb); 150 } 151 152 TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 153 { 154 Timestamp time(addTime(Timestamp::now(), interval)); 155 return timerQueue_->addTimer(cb, time, interval); 156 } 157 158 void EventLoop::cancel(TimerId timerId) 159 { 160 return timerQueue_->cancel(timerId); 161 } 162 163 void EventLoop::updateChannel(Channel* channel) 164 { 165 assert(channel->ownerLoop() == this); 166 assertInLoopThread(); 167 poller_->updateChannel(channel); 168 } 169 170 void EventLoop::removeChannel(Channel* channel) 171 { 172 assert(channel->ownerLoop() == this); 173 assertInLoopThread(); 174 poller_->removeChannel(channel); 175 } 176 177 void EventLoop::abortNotInLoopThread() 178 { 179 LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 180 << " was created in threadId_ = " << threadId_ 181 << ", current thread id = " << CurrentThread::tid(); 182 } 183 184 void EventLoop::wakeup() 185 { 186 uint64_t one = 1; 187 ssize_t n = ::write(wakeupFd_, &one, sizeof one); 188 if (n != sizeof one) 189 { 190 LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 191 } 192 } 193 194 void EventLoop::handleRead() 195 { 196 uint64_t one = 1; 197 ssize_t n = ::read(wakeupFd_, &one, sizeof one); 198 if (n != sizeof one) 199 { 200 LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 201 } 202 } 203 204 void EventLoop::doPendingFunctors() 205 { 206 std::vector<Functor> functors; 207 callingPendingFunctors_ = true; 208 209 { 210 MutexLockGuard lock(mutex_); 211 functors.swap(pendingFunctors_); 212 } 213 214 for (size_t i = 0; i < functors.size(); ++i) 215 { 216 functors[i](); 217 } 218 callingPendingFunctors_ = false; 219 } 220 221