EventLoop.cc revision a4e9058b
1b37003a7SShuo Chen// excerpts from http://code.google.com/p/muduo/ 2b37003a7SShuo Chen// 3b37003a7SShuo Chen// Use of this source code is governed by a BSD-style license 4b37003a7SShuo Chen// that can be found in the License file. 5b37003a7SShuo Chen// 6b37003a7SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7b37003a7SShuo Chen 8b37003a7SShuo Chen#include "EventLoop.h" 9b37003a7SShuo Chen 10b37003a7SShuo Chen#include "Channel.h" 11b37003a7SShuo Chen#include "Poller.h" 12b37003a7SShuo Chen#include "TimerQueue.h" 13b37003a7SShuo Chen 14b37003a7SShuo Chen#include "logging/Logging.h" 15b37003a7SShuo Chen 16b37003a7SShuo Chen#include <boost/bind.hpp> 17b37003a7SShuo Chen 18b37003a7SShuo Chen#include <assert.h> 19a4e9058bSShuo Chen#include <signal.h> 20b37003a7SShuo Chen#include <sys/eventfd.h> 21b37003a7SShuo Chen 22b37003a7SShuo Chenusing namespace muduo; 23b37003a7SShuo Chen 24b37003a7SShuo Chen__thread EventLoop* t_loopInThisThread = 0; 25b37003a7SShuo Chenconst int kPollTimeMs = 10000; 26b37003a7SShuo Chen 27b37003a7SShuo Chenstatic int createEventfd() 28b37003a7SShuo Chen{ 29b37003a7SShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 30b37003a7SShuo Chen if (evtfd < 0) 31b37003a7SShuo Chen { 32b37003a7SShuo Chen LOG_SYSERR << "Failed in eventfd"; 33b37003a7SShuo Chen abort(); 34b37003a7SShuo Chen } 35b37003a7SShuo Chen return evtfd; 36b37003a7SShuo Chen} 37b37003a7SShuo Chen 38a4e9058bSShuo Chenclass IgnoreSigPipe 39a4e9058bSShuo Chen{ 40a4e9058bSShuo Chen public: 41a4e9058bSShuo Chen IgnoreSigPipe() 42a4e9058bSShuo Chen { 43a4e9058bSShuo Chen ::signal(SIGPIPE, SIG_IGN); 44a4e9058bSShuo Chen } 45a4e9058bSShuo Chen}; 46a4e9058bSShuo Chen 47a4e9058bSShuo ChenIgnoreSigPipe initObj; 48a4e9058bSShuo Chen 49b37003a7SShuo ChenEventLoop::EventLoop() 50b37003a7SShuo Chen : looping_(false), 51b37003a7SShuo Chen quit_(false), 52b37003a7SShuo Chen callingPendingFunctors_(false), 53b37003a7SShuo Chen threadId_(CurrentThread::tid()), 54b37003a7SShuo Chen poller_(new Poller(this)), 55b37003a7SShuo Chen timerQueue_(new TimerQueue(this)), 56b37003a7SShuo Chen wakeupFd_(createEventfd()), 57b37003a7SShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 58b37003a7SShuo Chen{ 59b37003a7SShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 60b37003a7SShuo Chen if (t_loopInThisThread) 61b37003a7SShuo Chen { 62b37003a7SShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 63b37003a7SShuo Chen << " exists in this thread " << threadId_; 64b37003a7SShuo Chen } 65b37003a7SShuo Chen else 66b37003a7SShuo Chen { 67b37003a7SShuo Chen t_loopInThisThread = this; 68b37003a7SShuo Chen } 69b37003a7SShuo Chen wakeupChannel_->setReadCallback( 70b37003a7SShuo Chen boost::bind(&EventLoop::handleRead, this)); 71b37003a7SShuo Chen // we are always reading the wakeupfd 72b37003a7SShuo Chen wakeupChannel_->enableReading(); 73b37003a7SShuo Chen} 74b37003a7SShuo Chen 75b37003a7SShuo ChenEventLoop::~EventLoop() 76b37003a7SShuo Chen{ 77b37003a7SShuo Chen assert(!looping_); 78b37003a7SShuo Chen ::close(wakeupFd_); 79b37003a7SShuo Chen t_loopInThisThread = NULL; 80b37003a7SShuo Chen} 81b37003a7SShuo Chen 82b37003a7SShuo Chenvoid EventLoop::loop() 83b37003a7SShuo Chen{ 84b37003a7SShuo Chen assert(!looping_); 85b37003a7SShuo Chen assertInLoopThread(); 86b37003a7SShuo Chen looping_ = true; 87b37003a7SShuo Chen quit_ = false; 88b37003a7SShuo Chen 89b37003a7SShuo Chen while (!quit_) 90b37003a7SShuo Chen { 91b37003a7SShuo Chen activeChannels_.clear(); 92b37003a7SShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 93b37003a7SShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 94b37003a7SShuo Chen it != activeChannels_.end(); ++it) 95b37003a7SShuo Chen { 96b37003a7SShuo Chen (*it)->handleEvent(pollReturnTime_); 97b37003a7SShuo Chen } 98b37003a7SShuo Chen doPendingFunctors(); 99b37003a7SShuo Chen } 100b37003a7SShuo Chen 101b37003a7SShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 102b37003a7SShuo Chen looping_ = false; 103b37003a7SShuo Chen} 104b37003a7SShuo Chen 105b37003a7SShuo Chenvoid EventLoop::quit() 106b37003a7SShuo Chen{ 107b37003a7SShuo Chen quit_ = true; 108b37003a7SShuo Chen if (!isInLoopThread()) 109b37003a7SShuo Chen { 110b37003a7SShuo Chen wakeup(); 111b37003a7SShuo Chen } 112b37003a7SShuo Chen} 113b37003a7SShuo Chen 114b37003a7SShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 115b37003a7SShuo Chen{ 116b37003a7SShuo Chen if (isInLoopThread()) 117b37003a7SShuo Chen { 118b37003a7SShuo Chen cb(); 119b37003a7SShuo Chen } 120b37003a7SShuo Chen else 121b37003a7SShuo Chen { 122b37003a7SShuo Chen queueInLoop(cb); 123b37003a7SShuo Chen wakeup(); 124b37003a7SShuo Chen } 125b37003a7SShuo Chen} 126b37003a7SShuo Chen 127b37003a7SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 128b37003a7SShuo Chen{ 129b37003a7SShuo Chen { 130b37003a7SShuo Chen MutexLockGuard lock(mutex_); 131b37003a7SShuo Chen pendingFunctors_.push_back(cb); 132b37003a7SShuo Chen } 133b37003a7SShuo Chen 134b37003a7SShuo Chen if (isInLoopThread() && callingPendingFunctors_) 135b37003a7SShuo Chen { 136b37003a7SShuo Chen wakeup(); 137b37003a7SShuo Chen } 138b37003a7SShuo Chen} 139b37003a7SShuo Chen 140b37003a7SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 141b37003a7SShuo Chen{ 142b37003a7SShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 143b37003a7SShuo Chen} 144b37003a7SShuo Chen 145b37003a7SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 146b37003a7SShuo Chen{ 147b37003a7SShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 148b37003a7SShuo Chen return runAt(time, cb); 149b37003a7SShuo Chen} 150b37003a7SShuo Chen 151b37003a7SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 152b37003a7SShuo Chen{ 153b37003a7SShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 154b37003a7SShuo Chen return timerQueue_->addTimer(cb, time, interval); 155b37003a7SShuo Chen} 156b37003a7SShuo Chen 157b37003a7SShuo Chenvoid EventLoop::updateChannel(Channel* channel) 158b37003a7SShuo Chen{ 159b37003a7SShuo Chen assert(channel->ownerLoop() == this); 160b37003a7SShuo Chen assertInLoopThread(); 161b37003a7SShuo Chen poller_->updateChannel(channel); 162b37003a7SShuo Chen} 163b37003a7SShuo Chen 164b37003a7SShuo Chenvoid EventLoop::removeChannel(Channel* channel) 165b37003a7SShuo Chen{ 166b37003a7SShuo Chen assert(channel->ownerLoop() == this); 167b37003a7SShuo Chen assertInLoopThread(); 168b37003a7SShuo Chen poller_->removeChannel(channel); 169b37003a7SShuo Chen} 170b37003a7SShuo Chen 171b37003a7SShuo Chenvoid EventLoop::abortNotInLoopThread() 172b37003a7SShuo Chen{ 173b37003a7SShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 174b37003a7SShuo Chen << " was created in threadId_ = " << threadId_ 175b37003a7SShuo Chen << ", current thread id = " << CurrentThread::tid(); 176b37003a7SShuo Chen} 177b37003a7SShuo Chen 178b37003a7SShuo Chenvoid EventLoop::wakeup() 179b37003a7SShuo Chen{ 180b37003a7SShuo Chen uint64_t one = 1; 181b37003a7SShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 182b37003a7SShuo Chen if (n != sizeof one) 183b37003a7SShuo Chen { 184b37003a7SShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 185b37003a7SShuo Chen } 186b37003a7SShuo Chen} 187b37003a7SShuo Chen 188b37003a7SShuo Chenvoid EventLoop::handleRead() 189b37003a7SShuo Chen{ 190b37003a7SShuo Chen uint64_t one = 1; 191b37003a7SShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 192b37003a7SShuo Chen if (n != sizeof one) 193b37003a7SShuo Chen { 194b37003a7SShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 195b37003a7SShuo Chen } 196b37003a7SShuo Chen} 197b37003a7SShuo Chen 198b37003a7SShuo Chenvoid EventLoop::doPendingFunctors() 199b37003a7SShuo Chen{ 200b37003a7SShuo Chen std::vector<Functor> functors; 201b37003a7SShuo Chen callingPendingFunctors_ = true; 202b37003a7SShuo Chen 203b37003a7SShuo Chen { 204b37003a7SShuo Chen MutexLockGuard lock(mutex_); 205b37003a7SShuo Chen functors.swap(pendingFunctors_); 206b37003a7SShuo Chen } 207b37003a7SShuo Chen 208b37003a7SShuo Chen for (size_t i = 0; i < functors.size(); ++i) 209b37003a7SShuo Chen { 210b37003a7SShuo Chen functors[i](); 211b37003a7SShuo Chen } 212b37003a7SShuo Chen callingPendingFunctors_ = false; 213b37003a7SShuo Chen} 214b37003a7SShuo Chen 215