1e254a845SShuo Chen// excerpts from http://code.google.com/p/muduo/ 2e254a845SShuo Chen// 3e254a845SShuo Chen// Use of this source code is governed by a BSD-style license 4e254a845SShuo Chen// that can be found in the License file. 5e254a845SShuo Chen// 6e254a845SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7e254a845SShuo Chen 8e254a845SShuo Chen#include "EventLoop.h" 9e254a845SShuo Chen 10e254a845SShuo Chen#include "Channel.h" 11e254a845SShuo Chen#include "Poller.h" 12e254a845SShuo Chen#include "TimerQueue.h" 13e254a845SShuo Chen 14e254a845SShuo Chen#include "logging/Logging.h" 15e254a845SShuo Chen 16e254a845SShuo Chen#include <boost/bind.hpp> 17e254a845SShuo Chen 18e254a845SShuo Chen#include <assert.h> 19e254a845SShuo Chen#include <signal.h> 20e254a845SShuo Chen#include <sys/eventfd.h> 21e254a845SShuo Chen 22e254a845SShuo Chenusing namespace muduo; 23e254a845SShuo Chen 24e254a845SShuo Chen__thread EventLoop* t_loopInThisThread = 0; 25e254a845SShuo Chenconst int kPollTimeMs = 10000; 26e254a845SShuo Chen 27e254a845SShuo Chenstatic int createEventfd() 28e254a845SShuo Chen{ 29e254a845SShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 30e254a845SShuo Chen if (evtfd < 0) 31e254a845SShuo Chen { 32e254a845SShuo Chen LOG_SYSERR << "Failed in eventfd"; 33e254a845SShuo Chen abort(); 34e254a845SShuo Chen } 35e254a845SShuo Chen return evtfd; 36e254a845SShuo Chen} 37e254a845SShuo Chen 38e254a845SShuo Chenclass IgnoreSigPipe 39e254a845SShuo Chen{ 40e254a845SShuo Chen public: 41e254a845SShuo Chen IgnoreSigPipe() 42e254a845SShuo Chen { 43e254a845SShuo Chen ::signal(SIGPIPE, SIG_IGN); 44e254a845SShuo Chen } 45e254a845SShuo Chen}; 46e254a845SShuo Chen 47e254a845SShuo ChenIgnoreSigPipe initObj; 48e254a845SShuo Chen 49e254a845SShuo ChenEventLoop::EventLoop() 50e254a845SShuo Chen : looping_(false), 51e254a845SShuo Chen quit_(false), 52e254a845SShuo Chen callingPendingFunctors_(false), 53e254a845SShuo Chen threadId_(CurrentThread::tid()), 54e254a845SShuo Chen poller_(new Poller(this)), 55e254a845SShuo Chen timerQueue_(new TimerQueue(this)), 56e254a845SShuo Chen wakeupFd_(createEventfd()), 57e254a845SShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 58e254a845SShuo Chen{ 59e254a845SShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 60e254a845SShuo Chen if (t_loopInThisThread) 61e254a845SShuo Chen { 62e254a845SShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 63e254a845SShuo Chen << " exists in this thread " << threadId_; 64e254a845SShuo Chen } 65e254a845SShuo Chen else 66e254a845SShuo Chen { 67e254a845SShuo Chen t_loopInThisThread = this; 68e254a845SShuo Chen } 69e254a845SShuo Chen wakeupChannel_->setReadCallback( 70e254a845SShuo Chen boost::bind(&EventLoop::handleRead, this)); 71e254a845SShuo Chen // we are always reading the wakeupfd 72e254a845SShuo Chen wakeupChannel_->enableReading(); 73e254a845SShuo Chen} 74e254a845SShuo Chen 75e254a845SShuo ChenEventLoop::~EventLoop() 76e254a845SShuo Chen{ 77e254a845SShuo Chen assert(!looping_); 78e254a845SShuo Chen ::close(wakeupFd_); 79e254a845SShuo Chen t_loopInThisThread = NULL; 80e254a845SShuo Chen} 81e254a845SShuo Chen 82e254a845SShuo Chenvoid EventLoop::loop() 83e254a845SShuo Chen{ 84e254a845SShuo Chen assert(!looping_); 85e254a845SShuo Chen assertInLoopThread(); 86e254a845SShuo Chen looping_ = true; 87e254a845SShuo Chen quit_ = false; 88e254a845SShuo Chen 89e254a845SShuo Chen while (!quit_) 90e254a845SShuo Chen { 91e254a845SShuo Chen activeChannels_.clear(); 92e254a845SShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 93e254a845SShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 94e254a845SShuo Chen it != activeChannels_.end(); ++it) 95e254a845SShuo Chen { 96e254a845SShuo Chen (*it)->handleEvent(pollReturnTime_); 97e254a845SShuo Chen } 98e254a845SShuo Chen doPendingFunctors(); 99e254a845SShuo Chen } 100e254a845SShuo Chen 101e254a845SShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 102e254a845SShuo Chen looping_ = false; 103e254a845SShuo Chen} 104e254a845SShuo Chen 105e254a845SShuo Chenvoid EventLoop::quit() 106e254a845SShuo Chen{ 107e254a845SShuo Chen quit_ = true; 108e254a845SShuo Chen if (!isInLoopThread()) 109e254a845SShuo Chen { 110e254a845SShuo Chen wakeup(); 111e254a845SShuo Chen } 112e254a845SShuo Chen} 113e254a845SShuo Chen 114e254a845SShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 115e254a845SShuo Chen{ 116e254a845SShuo Chen if (isInLoopThread()) 117e254a845SShuo Chen { 118e254a845SShuo Chen cb(); 119e254a845SShuo Chen } 120e254a845SShuo Chen else 121e254a845SShuo Chen { 122e254a845SShuo Chen queueInLoop(cb); 123e254a845SShuo Chen } 124e254a845SShuo Chen} 125e254a845SShuo Chen 126e254a845SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 127e254a845SShuo Chen{ 128e254a845SShuo Chen { 129e254a845SShuo Chen MutexLockGuard lock(mutex_); 130e254a845SShuo Chen pendingFunctors_.push_back(cb); 131e254a845SShuo Chen } 132e254a845SShuo Chen 1330f776063SShuo Chen if (!isInLoopThread() || callingPendingFunctors_) 134e254a845SShuo Chen { 135e254a845SShuo Chen wakeup(); 136e254a845SShuo Chen } 137e254a845SShuo Chen} 138e254a845SShuo Chen 139e254a845SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 140e254a845SShuo Chen{ 141e254a845SShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 142e254a845SShuo Chen} 143e254a845SShuo Chen 144e254a845SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 145e254a845SShuo Chen{ 146e254a845SShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 147e254a845SShuo Chen return runAt(time, cb); 148e254a845SShuo Chen} 149e254a845SShuo Chen 150e254a845SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 151e254a845SShuo Chen{ 152e254a845SShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 153e254a845SShuo Chen return timerQueue_->addTimer(cb, time, interval); 154e254a845SShuo Chen} 155e254a845SShuo Chen 156e254a845SShuo Chenvoid EventLoop::updateChannel(Channel* channel) 157e254a845SShuo Chen{ 158e254a845SShuo Chen assert(channel->ownerLoop() == this); 159e254a845SShuo Chen assertInLoopThread(); 160e254a845SShuo Chen poller_->updateChannel(channel); 161e254a845SShuo Chen} 162e254a845SShuo Chen 163e254a845SShuo Chenvoid EventLoop::removeChannel(Channel* channel) 164e254a845SShuo Chen{ 165e254a845SShuo Chen assert(channel->ownerLoop() == this); 166e254a845SShuo Chen assertInLoopThread(); 167e254a845SShuo Chen poller_->removeChannel(channel); 168e254a845SShuo Chen} 169e254a845SShuo Chen 170e254a845SShuo Chenvoid EventLoop::abortNotInLoopThread() 171e254a845SShuo Chen{ 172e254a845SShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 173e254a845SShuo Chen << " was created in threadId_ = " << threadId_ 174e254a845SShuo Chen << ", current thread id = " << CurrentThread::tid(); 175e254a845SShuo Chen} 176e254a845SShuo Chen 177e254a845SShuo Chenvoid EventLoop::wakeup() 178e254a845SShuo Chen{ 179e254a845SShuo Chen uint64_t one = 1; 180e254a845SShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 181e254a845SShuo Chen if (n != sizeof one) 182e254a845SShuo Chen { 183e254a845SShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 184e254a845SShuo Chen } 185e254a845SShuo Chen} 186e254a845SShuo Chen 187e254a845SShuo Chenvoid EventLoop::handleRead() 188e254a845SShuo Chen{ 189e254a845SShuo Chen uint64_t one = 1; 190e254a845SShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 191e254a845SShuo Chen if (n != sizeof one) 192e254a845SShuo Chen { 193e254a845SShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 194e254a845SShuo Chen } 195e254a845SShuo Chen} 196e254a845SShuo Chen 197e254a845SShuo Chenvoid EventLoop::doPendingFunctors() 198e254a845SShuo Chen{ 199e254a845SShuo Chen std::vector<Functor> functors; 200e254a845SShuo Chen callingPendingFunctors_ = true; 201e254a845SShuo Chen 202e254a845SShuo Chen { 203e254a845SShuo Chen MutexLockGuard lock(mutex_); 204e254a845SShuo Chen functors.swap(pendingFunctors_); 205e254a845SShuo Chen } 206e254a845SShuo Chen 207e254a845SShuo Chen for (size_t i = 0; i < functors.size(); ++i) 208e254a845SShuo Chen { 209e254a845SShuo Chen functors[i](); 210e254a845SShuo Chen } 211e254a845SShuo Chen callingPendingFunctors_ = false; 212e254a845SShuo Chen} 213e254a845SShuo Chen 214