EventLoop.cc revision b37003a7
1b37003a7SShuo Chen// excerpts from http://code.google.com/p/muduo/ 2b37003a7SShuo Chen// 3b37003a7SShuo Chen// Use of this source code is governed by a BSD-style license 4b37003a7SShuo Chen// that can be found in the License file. 5b37003a7SShuo Chen// 6b37003a7SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7b37003a7SShuo Chen 8b37003a7SShuo Chen#include "EventLoop.h" 9b37003a7SShuo Chen 10b37003a7SShuo Chen#include "Channel.h" 11b37003a7SShuo Chen#include "Poller.h" 12b37003a7SShuo Chen#include "TimerQueue.h" 13b37003a7SShuo Chen 14b37003a7SShuo Chen#include "logging/Logging.h" 15b37003a7SShuo Chen 16b37003a7SShuo Chen#include <boost/bind.hpp> 17b37003a7SShuo Chen 18b37003a7SShuo Chen#include <assert.h> 19b37003a7SShuo Chen#include <sys/eventfd.h> 20b37003a7SShuo Chen 21b37003a7SShuo Chenusing namespace muduo; 22b37003a7SShuo Chen 23b37003a7SShuo Chen__thread EventLoop* t_loopInThisThread = 0; 24b37003a7SShuo Chenconst int kPollTimeMs = 10000; 25b37003a7SShuo Chen 26b37003a7SShuo Chenstatic int createEventfd() 27b37003a7SShuo Chen{ 28b37003a7SShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 29b37003a7SShuo Chen if (evtfd < 0) 30b37003a7SShuo Chen { 31b37003a7SShuo Chen LOG_SYSERR << "Failed in eventfd"; 32b37003a7SShuo Chen abort(); 33b37003a7SShuo Chen } 34b37003a7SShuo Chen return evtfd; 35b37003a7SShuo Chen} 36b37003a7SShuo Chen 37b37003a7SShuo ChenEventLoop::EventLoop() 38b37003a7SShuo Chen : looping_(false), 39b37003a7SShuo Chen quit_(false), 40b37003a7SShuo Chen callingPendingFunctors_(false), 41b37003a7SShuo Chen threadId_(CurrentThread::tid()), 42b37003a7SShuo Chen poller_(new Poller(this)), 43b37003a7SShuo Chen timerQueue_(new TimerQueue(this)), 44b37003a7SShuo Chen wakeupFd_(createEventfd()), 45b37003a7SShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 46b37003a7SShuo Chen{ 47b37003a7SShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 48b37003a7SShuo Chen if (t_loopInThisThread) 49b37003a7SShuo Chen { 50b37003a7SShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 51b37003a7SShuo Chen << " exists in this thread " << threadId_; 52b37003a7SShuo Chen } 53b37003a7SShuo Chen else 54b37003a7SShuo Chen { 55b37003a7SShuo Chen t_loopInThisThread = this; 56b37003a7SShuo Chen } 57b37003a7SShuo Chen wakeupChannel_->setReadCallback( 58b37003a7SShuo Chen boost::bind(&EventLoop::handleRead, this)); 59b37003a7SShuo Chen // we are always reading the wakeupfd 60b37003a7SShuo Chen wakeupChannel_->enableReading(); 61b37003a7SShuo Chen} 62b37003a7SShuo Chen 63b37003a7SShuo ChenEventLoop::~EventLoop() 64b37003a7SShuo Chen{ 65b37003a7SShuo Chen assert(!looping_); 66b37003a7SShuo Chen ::close(wakeupFd_); 67b37003a7SShuo Chen t_loopInThisThread = NULL; 68b37003a7SShuo Chen} 69b37003a7SShuo Chen 70b37003a7SShuo Chenvoid EventLoop::loop() 71b37003a7SShuo Chen{ 72b37003a7SShuo Chen assert(!looping_); 73b37003a7SShuo Chen assertInLoopThread(); 74b37003a7SShuo Chen looping_ = true; 75b37003a7SShuo Chen quit_ = false; 76b37003a7SShuo Chen 77b37003a7SShuo Chen while (!quit_) 78b37003a7SShuo Chen { 79b37003a7SShuo Chen activeChannels_.clear(); 80b37003a7SShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 81b37003a7SShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 82b37003a7SShuo Chen it != activeChannels_.end(); ++it) 83b37003a7SShuo Chen { 84b37003a7SShuo Chen (*it)->handleEvent(pollReturnTime_); 85b37003a7SShuo Chen } 86b37003a7SShuo Chen doPendingFunctors(); 87b37003a7SShuo Chen } 88b37003a7SShuo Chen 89b37003a7SShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 90b37003a7SShuo Chen looping_ = false; 91b37003a7SShuo Chen} 92b37003a7SShuo Chen 93b37003a7SShuo Chenvoid EventLoop::quit() 94b37003a7SShuo Chen{ 95b37003a7SShuo Chen quit_ = true; 96b37003a7SShuo Chen if (!isInLoopThread()) 97b37003a7SShuo Chen { 98b37003a7SShuo Chen wakeup(); 99b37003a7SShuo Chen } 100b37003a7SShuo Chen} 101b37003a7SShuo Chen 102b37003a7SShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 103b37003a7SShuo Chen{ 104b37003a7SShuo Chen if (isInLoopThread()) 105b37003a7SShuo Chen { 106b37003a7SShuo Chen cb(); 107b37003a7SShuo Chen } 108b37003a7SShuo Chen else 109b37003a7SShuo Chen { 110b37003a7SShuo Chen queueInLoop(cb); 111b37003a7SShuo Chen wakeup(); 112b37003a7SShuo Chen } 113b37003a7SShuo Chen} 114b37003a7SShuo Chen 115b37003a7SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 116b37003a7SShuo Chen{ 117b37003a7SShuo Chen { 118b37003a7SShuo Chen MutexLockGuard lock(mutex_); 119b37003a7SShuo Chen pendingFunctors_.push_back(cb); 120b37003a7SShuo Chen } 121b37003a7SShuo Chen 122b37003a7SShuo Chen if (isInLoopThread() && callingPendingFunctors_) 123b37003a7SShuo Chen { 124b37003a7SShuo Chen wakeup(); 125b37003a7SShuo Chen } 126b37003a7SShuo Chen} 127b37003a7SShuo Chen 128b37003a7SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 129b37003a7SShuo Chen{ 130b37003a7SShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 131b37003a7SShuo Chen} 132b37003a7SShuo Chen 133b37003a7SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 134b37003a7SShuo Chen{ 135b37003a7SShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 136b37003a7SShuo Chen return runAt(time, cb); 137b37003a7SShuo Chen} 138b37003a7SShuo Chen 139b37003a7SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 140b37003a7SShuo Chen{ 141b37003a7SShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 142b37003a7SShuo Chen return timerQueue_->addTimer(cb, time, interval); 143b37003a7SShuo Chen} 144b37003a7SShuo Chen 145b37003a7SShuo Chenvoid EventLoop::updateChannel(Channel* channel) 146b37003a7SShuo Chen{ 147b37003a7SShuo Chen assert(channel->ownerLoop() == this); 148b37003a7SShuo Chen assertInLoopThread(); 149b37003a7SShuo Chen poller_->updateChannel(channel); 150b37003a7SShuo Chen} 151b37003a7SShuo Chen 152b37003a7SShuo Chenvoid EventLoop::removeChannel(Channel* channel) 153b37003a7SShuo Chen{ 154b37003a7SShuo Chen assert(channel->ownerLoop() == this); 155b37003a7SShuo Chen assertInLoopThread(); 156b37003a7SShuo Chen poller_->removeChannel(channel); 157b37003a7SShuo Chen} 158b37003a7SShuo Chen 159b37003a7SShuo Chenvoid EventLoop::abortNotInLoopThread() 160b37003a7SShuo Chen{ 161b37003a7SShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 162b37003a7SShuo Chen << " was created in threadId_ = " << threadId_ 163b37003a7SShuo Chen << ", current thread id = " << CurrentThread::tid(); 164b37003a7SShuo Chen} 165b37003a7SShuo Chen 166b37003a7SShuo Chenvoid EventLoop::wakeup() 167b37003a7SShuo Chen{ 168b37003a7SShuo Chen uint64_t one = 1; 169b37003a7SShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 170b37003a7SShuo Chen if (n != sizeof one) 171b37003a7SShuo Chen { 172b37003a7SShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 173b37003a7SShuo Chen } 174b37003a7SShuo Chen} 175b37003a7SShuo Chen 176b37003a7SShuo Chenvoid EventLoop::handleRead() 177b37003a7SShuo Chen{ 178b37003a7SShuo Chen uint64_t one = 1; 179b37003a7SShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 180b37003a7SShuo Chen if (n != sizeof one) 181b37003a7SShuo Chen { 182b37003a7SShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 183b37003a7SShuo Chen } 184b37003a7SShuo Chen} 185b37003a7SShuo Chen 186b37003a7SShuo Chenvoid EventLoop::doPendingFunctors() 187b37003a7SShuo Chen{ 188b37003a7SShuo Chen std::vector<Functor> functors; 189b37003a7SShuo Chen callingPendingFunctors_ = true; 190b37003a7SShuo Chen 191b37003a7SShuo Chen { 192b37003a7SShuo Chen MutexLockGuard lock(mutex_); 193b37003a7SShuo Chen functors.swap(pendingFunctors_); 194b37003a7SShuo Chen } 195b37003a7SShuo Chen 196b37003a7SShuo Chen for (size_t i = 0; i < functors.size(); ++i) 197b37003a7SShuo Chen { 198b37003a7SShuo Chen functors[i](); 199b37003a7SShuo Chen } 200b37003a7SShuo Chen callingPendingFunctors_ = false; 201b37003a7SShuo Chen} 202b37003a7SShuo Chen 203