1 // excerpts from http://code.google.com/p/muduo/ 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the License file. 5 // 6 // Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8 #include "EventLoop.h" 9 10 #include "Channel.h" 11 #include "Poller.h" 12 #include "TimerQueue.h" 13 14 #include "logging/Logging.h" 15 16+#include <boost/bind.hpp> 17+ 18 #include <assert.h> 19+#include <sys/eventfd.h> 20 21 using namespace muduo; 22 23 __thread EventLoop* t_loopInThisThread = 0; 24 const int kPollTimeMs = 10000; 25 26+static int createEventfd() 27+{ 28+ int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 29+ if (evtfd < 0) 30+ { 31+ LOG_SYSERR << "Failed in eventfd"; 32+ abort(); 33+ } 34+ return evtfd; 35+} 36+ 37 EventLoop::EventLoop() 38 : looping_(false), 39 quit_(false), 40+ callingPendingFunctors_(false), 41 threadId_(CurrentThread::tid()), 42 poller_(new Poller(this)), 43 timerQueue_(new TimerQueue(this)), 44+ wakeupFd_(createEventfd()), 45+ wakeupChannel_(new Channel(this, wakeupFd_)) 46 { 47 LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 48 if (t_loopInThisThread) 49 { 50 LOG_FATAL << "Another EventLoop " << t_loopInThisThread 51 << " exists in this thread " << threadId_; 52 } 53 else 54 { 55 t_loopInThisThread = this; 56 } 57+ wakeupChannel_->setReadCallback( 58+ boost::bind(&EventLoop::handleRead, this)); 59+ // we are always reading the wakeupfd 60+ wakeupChannel_->enableReading(); 61 } 62 63 EventLoop::~EventLoop() 64 { 65 assert(!looping_); 66+ ::close(wakeupFd_); 67 t_loopInThisThread = NULL; 68 } 69 70 void EventLoop::loop() 71 { 72 assert(!looping_); 73 assertInLoopThread(); 74 looping_ = true; 75 quit_ = false; 76 77 while (!quit_) 78 { 79 activeChannels_.clear(); 80 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 81 for (ChannelList::iterator it = activeChannels_.begin(); 82 it != activeChannels_.end(); ++it) 83 { 84 (*it)->handleEvent(); 85 } 86+ doPendingFunctors(); 87 } 88 89 LOG_TRACE << "EventLoop " << this << " stop looping"; 90 looping_ = false; 91 } 92 93 void EventLoop::quit() 94 { 95 quit_ = true; 96+ if (!isInLoopThread()) 97+ { 98+ wakeup(); 99+ } 100 } 101 102+void EventLoop::runInLoop(const Functor& cb) 103+{ 104+ if (isInLoopThread()) 105+ { 106+ cb(); 107+ } 108+ else 109+ { 110+ queueInLoop(cb); 111+ } 112+} 113+ 114+void EventLoop::queueInLoop(const Functor& cb) 115+{ 116+ { 117+ MutexLockGuard lock(mutex_); 118+ pendingFunctors_.push_back(cb); 119+ } 120+ 121+ if (!isInLoopThread() || callingPendingFunctors_) 122+ { 123+ wakeup(); 124+ } 125+} 126 127 TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 128 { 129 return timerQueue_->addTimer(cb, time, 0.0); 130 } 131 132 TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 133 { 134 Timestamp time(addTime(Timestamp::now(), delay)); 135 return runAt(time, cb); 136 } 137 138 TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 139 { 140 Timestamp time(addTime(Timestamp::now(), interval)); 141 return timerQueue_->addTimer(cb, time, interval); 142 } 143 144 void EventLoop::updateChannel(Channel* channel) 145 { 146 assert(channel->ownerLoop() == this); 147 assertInLoopThread(); 148 poller_->updateChannel(channel); 149 } 150 151 void EventLoop::abortNotInLoopThread() 152 { 153 LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 154 << " was created in threadId_ = " << threadId_ 155 << ", current thread id = " << CurrentThread::tid(); 156 } 157 158+void EventLoop::wakeup() 159+{ 160+ uint64_t one = 1; 161+ ssize_t n = ::write(wakeupFd_, &one, sizeof one); 162+ if (n != sizeof one) 163+ { 164+ LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 165+ } 166+} 167+ 168+void EventLoop::handleRead() 169+{ 170+ uint64_t one = 1; 171+ ssize_t n = ::read(wakeupFd_, &one, sizeof one); 172+ if (n != sizeof one) 173+ { 174+ LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 175+ } 176+} 177+ 178+void EventLoop::doPendingFunctors() 179+{ 180+ std::vector<Functor> functors; 181+ callingPendingFunctors_ = true; 182+ 183+ { 184+ MutexLockGuard lock(mutex_); 185+ functors.swap(pendingFunctors_); 186+ } 187+ 188+ for (size_t i = 0; i < functors.size(); ++i) 189+ { 190+ functors[i](); 191+ } 192+ callingPendingFunctors_ = false; 193+} 194+ 195