1 // excerpts from http://code.google.com/p/muduo/ 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the License file. 5 // 6 // Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8 #include "EventLoop.h" 9 10 #include "Channel.h" 11 #include "Poller.h" 12 #include "TimerQueue.h" 13 14 #include "logging/Logging.h" 15 16 #include <boost/bind.hpp> 17 18 #include <assert.h> 19+#include <signal.h> 20 #include <sys/eventfd.h> 21 22 using namespace muduo; 23 24 __thread EventLoop* t_loopInThisThread = 0; 25 const int kPollTimeMs = 10000; 26 27 static int createEventfd() 28 { 29 int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 30 if (evtfd < 0) 31 { 32 LOG_SYSERR << "Failed in eventfd"; 33 abort(); 34 } 35 return evtfd; 36 } 37 38+class IgnoreSigPipe 39+{ 40+ public: 41+ IgnoreSigPipe() 42+ { 43+ ::signal(SIGPIPE, SIG_IGN); 44+ } 45+}; 46+ 47+IgnoreSigPipe initObj; 48+ 49 EventLoop::EventLoop() 50 : looping_(false), 51 quit_(false), 52 callingPendingFunctors_(false), 53 threadId_(CurrentThread::tid()), 54 poller_(new Poller(this)), 55 timerQueue_(new TimerQueue(this)), 56 wakeupFd_(createEventfd()), 57 wakeupChannel_(new Channel(this, wakeupFd_)) 58 { 59 LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 60 if (t_loopInThisThread) 61 { 62 LOG_FATAL << "Another EventLoop " << t_loopInThisThread 63 << " exists in this thread " << threadId_; 64 } 65 else 66 { 67 t_loopInThisThread = this; 68 } 69 wakeupChannel_->setReadCallback( 70 boost::bind(&EventLoop::handleRead, this)); 71 // we are always reading the wakeupfd 72 wakeupChannel_->enableReading(); 73 } 74 75 EventLoop::~EventLoop() 76 { 77 assert(!looping_); 78 ::close(wakeupFd_); 79 t_loopInThisThread = NULL; 80 } 81 82 void EventLoop::loop() 83 { 84 assert(!looping_); 85 assertInLoopThread(); 86 looping_ = true; 87 quit_ = false; 88 89 while (!quit_) 90 { 91 activeChannels_.clear(); 92 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 93 for (ChannelList::iterator it = activeChannels_.begin(); 94 it != activeChannels_.end(); ++it) 95 { 96 (*it)->handleEvent(pollReturnTime_); 97 } 98 doPendingFunctors(); 99 } 100 101 LOG_TRACE << "EventLoop " << this << " stop looping"; 102 looping_ = false; 103 } 104 105 void EventLoop::quit() 106 { 107 quit_ = true; 108 if (!isInLoopThread()) 109 { 110 wakeup(); 111 } 112 } 113 114 void EventLoop::runInLoop(const Functor& cb) 115 { 116 if (isInLoopThread()) 117 { 118 cb(); 119 } 120 else 121 { 122 queueInLoop(cb); 123 } 124 } 125 126 void EventLoop::queueInLoop(const Functor& cb) 127 { 128 { 129 MutexLockGuard lock(mutex_); 130 pendingFunctors_.push_back(cb); 131 } 132 133 if (!isInLoopThread() || callingPendingFunctors_) 134 { 135 wakeup(); 136 } 137 } 138 139 TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 140 { 141 return timerQueue_->addTimer(cb, time, 0.0); 142 } 143 144 TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 145 { 146 Timestamp time(addTime(Timestamp::now(), delay)); 147 return runAt(time, cb); 148 } 149 150 TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 151 { 152 Timestamp time(addTime(Timestamp::now(), interval)); 153 return timerQueue_->addTimer(cb, time, interval); 154 } 155 156 void EventLoop::updateChannel(Channel* channel) 157 { 158 assert(channel->ownerLoop() == this); 159 assertInLoopThread(); 160 poller_->updateChannel(channel); 161 } 162 163 void EventLoop::removeChannel(Channel* channel) 164 { 165 assert(channel->ownerLoop() == this); 166 assertInLoopThread(); 167 poller_->removeChannel(channel); 168 } 169 170 void EventLoop::abortNotInLoopThread() 171 { 172 LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 173 << " was created in threadId_ = " << threadId_ 174 << ", current thread id = " << CurrentThread::tid(); 175 } 176 177 void EventLoop::wakeup() 178 { 179 uint64_t one = 1; 180 ssize_t n = ::write(wakeupFd_, &one, sizeof one); 181 if (n != sizeof one) 182 { 183 LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 184 } 185 } 186 187 void EventLoop::handleRead() 188 { 189 uint64_t one = 1; 190 ssize_t n = ::read(wakeupFd_, &one, sizeof one); 191 if (n != sizeof one) 192 { 193 LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 194 } 195 } 196 197 void EventLoop::doPendingFunctors() 198 { 199 std::vector<Functor> functors; 200 callingPendingFunctors_ = true; 201 202 { 203 MutexLockGuard lock(mutex_); 204 functors.swap(pendingFunctors_); 205 } 206 207 for (size_t i = 0; i < functors.size(); ++i) 208 { 209 functors[i](); 210 } 211 callingPendingFunctors_ = false; 212 } 213 214