1 // excerpts from http://code.google.com/p/muduo/ 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the License file. 5 // 6 // Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8 #include "EventLoop.h" 9 10 #include "Channel.h" 11 #include "Poller.h" 12 #include "TimerQueue.h" 13 14 #include "logging/Logging.h" 15 16 #include <boost/bind.hpp> 17 18 #include <assert.h> 19 #include <signal.h> 20 #include <sys/eventfd.h> 21 22 using namespace muduo; 23 24 __thread EventLoop* t_loopInThisThread = 0; 25 const int kPollTimeMs = 10000; 26 27 static int createEventfd() 28 { 29 int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 30 if (evtfd < 0) 31 { 32 LOG_SYSERR << "Failed in eventfd"; 33 abort(); 34 } 35 return evtfd; 36 } 37 38 class IgnoreSigPipe 39 { 40 public: 41 IgnoreSigPipe() 42 { 43 ::signal(SIGPIPE, SIG_IGN); 44 } 45 }; 46 47 IgnoreSigPipe initObj; 48 49 EventLoop::EventLoop() 50 : looping_(false), 51 quit_(false), 52 callingPendingFunctors_(false), 53 threadId_(CurrentThread::tid()), 54 poller_(new Poller(this)), 55 timerQueue_(new TimerQueue(this)), 56 wakeupFd_(createEventfd()), 57 wakeupChannel_(new Channel(this, wakeupFd_)) 58 { 59 LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 60 if (t_loopInThisThread) 61 { 62 LOG_FATAL << "Another EventLoop " << t_loopInThisThread 63 << " exists in this thread " << threadId_; 64 } 65 else 66 { 67 t_loopInThisThread = this; 68 } 69 wakeupChannel_->setReadCallback( 70 boost::bind(&EventLoop::handleRead, this)); 71 // we are always reading the wakeupfd 72 wakeupChannel_->enableReading(); 73 } 74 75 EventLoop::~EventLoop() 76 { 77 assert(!looping_); 78 ::close(wakeupFd_); 79 t_loopInThisThread = NULL; 80 } 81 82 void EventLoop::loop() 83 { 84 assert(!looping_); 85 assertInLoopThread(); 86 looping_ = true; 87 quit_ = false; 88 89 while (!quit_) 90 { 91 activeChannels_.clear(); 92 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 93 for (ChannelList::iterator it = activeChannels_.begin(); 94 it != activeChannels_.end(); ++it) 95 { 96 (*it)->handleEvent(pollReturnTime_); 97 } 98 doPendingFunctors(); 99 } 100 101 LOG_TRACE << "EventLoop " << this << " stop looping"; 102 looping_ = false; 103 } 104 105 void EventLoop::quit() 106 { 107 quit_ = true; 108 if (!isInLoopThread()) 109 { 110 wakeup(); 111 } 112 } 113 114 void EventLoop::runInLoop(const Functor& cb) 115 { 116 if (isInLoopThread()) 117 { 118 cb(); 119 } 120 else 121 { 122 queueInLoop(cb); 123 } 124 } 125 126 void EventLoop::queueInLoop(const Functor& cb) 127 { 128 { 129 MutexLockGuard lock(mutex_); 130 pendingFunctors_.push_back(cb); 131 } 132 133 if (!isInLoopThread() || callingPendingFunctors_) 134 { 135 wakeup(); 136 } 137 } 138 139 TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 140 { 141 return timerQueue_->addTimer(cb, time, 0.0); 142 } 143 144 TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 145 { 146 Timestamp time(addTime(Timestamp::now(), delay)); 147 return runAt(time, cb); 148 } 149 150 TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 151 { 152 Timestamp time(addTime(Timestamp::now(), interval)); 153 return timerQueue_->addTimer(cb, time, interval); 154 } 155 156+void EventLoop::cancel(TimerId timerId) 157+{ 158+ return timerQueue_->cancel(timerId); 159+} 160+ 161 void EventLoop::updateChannel(Channel* channel) 162 { 163 assert(channel->ownerLoop() == this); 164 assertInLoopThread(); 165 poller_->updateChannel(channel); 166 } 167 168 void EventLoop::removeChannel(Channel* channel) 169 { 170 assert(channel->ownerLoop() == this); 171 assertInLoopThread(); 172 poller_->removeChannel(channel); 173 } 174 175 void EventLoop::abortNotInLoopThread() 176 { 177 LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 178 << " was created in threadId_ = " << threadId_ 179 << ", current thread id = " << CurrentThread::tid(); 180 } 181 182 void EventLoop::wakeup() 183 { 184 uint64_t one = 1; 185 ssize_t n = ::write(wakeupFd_, &one, sizeof one); 186 if (n != sizeof one) 187 { 188 LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 189 } 190 } 191 192 void EventLoop::handleRead() 193 { 194 uint64_t one = 1; 195 ssize_t n = ::read(wakeupFd_, &one, sizeof one); 196 if (n != sizeof one) 197 { 198 LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 199 } 200 } 201 202 void EventLoop::doPendingFunctors() 203 { 204 std::vector<Functor> functors; 205 callingPendingFunctors_ = true; 206 207 { 208 MutexLockGuard lock(mutex_); 209 functors.swap(pendingFunctors_); 210 } 211 212 for (size_t i = 0; i < functors.size(); ++i) 213 { 214 functors[i](); 215 } 216 callingPendingFunctors_ = false; 217 } 218 219