1a1bde736SShuo Chen// excerpts from http://code.google.com/p/muduo/ 2a1bde736SShuo Chen// 3a1bde736SShuo Chen// Use of this source code is governed by a BSD-style license 4a1bde736SShuo Chen// that can be found in the License file. 5a1bde736SShuo Chen// 6a1bde736SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7a1bde736SShuo Chen 8a1bde736SShuo Chen#include "EventLoop.h" 9a1bde736SShuo Chen 10a1bde736SShuo Chen#include "Channel.h" 11a1bde736SShuo Chen#include "Poller.h" 12a1bde736SShuo Chen#include "TimerQueue.h" 13a1bde736SShuo Chen 14a1bde736SShuo Chen#include "logging/Logging.h" 15a1bde736SShuo Chen 16a1bde736SShuo Chen#include <boost/bind.hpp> 17a1bde736SShuo Chen 18a1bde736SShuo Chen#include <assert.h> 19a1bde736SShuo Chen#include <signal.h> 20a1bde736SShuo Chen#include <sys/eventfd.h> 21a1bde736SShuo Chen 22a1bde736SShuo Chenusing namespace muduo; 23a1bde736SShuo Chen 24a1bde736SShuo Chen__thread EventLoop* t_loopInThisThread = 0; 25a1bde736SShuo Chenconst int kPollTimeMs = 10000; 26a1bde736SShuo Chen 27a1bde736SShuo Chenstatic int createEventfd() 28a1bde736SShuo Chen{ 29a1bde736SShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 30a1bde736SShuo Chen if (evtfd < 0) 31a1bde736SShuo Chen { 32a1bde736SShuo Chen LOG_SYSERR << "Failed in eventfd"; 33a1bde736SShuo Chen abort(); 34a1bde736SShuo Chen } 35a1bde736SShuo Chen return evtfd; 36a1bde736SShuo Chen} 37a1bde736SShuo Chen 38a1bde736SShuo Chenclass IgnoreSigPipe 39a1bde736SShuo Chen{ 40a1bde736SShuo Chen public: 41a1bde736SShuo Chen IgnoreSigPipe() 42a1bde736SShuo Chen { 43a1bde736SShuo Chen ::signal(SIGPIPE, SIG_IGN); 44a1bde736SShuo Chen } 45a1bde736SShuo Chen}; 46a1bde736SShuo Chen 47a1bde736SShuo ChenIgnoreSigPipe initObj; 48a1bde736SShuo Chen 49a1bde736SShuo ChenEventLoop::EventLoop() 50a1bde736SShuo Chen : looping_(false), 51a1bde736SShuo Chen quit_(false), 52a1bde736SShuo Chen callingPendingFunctors_(false), 53a1bde736SShuo Chen threadId_(CurrentThread::tid()), 54a1bde736SShuo Chen poller_(new Poller(this)), 55a1bde736SShuo Chen timerQueue_(new TimerQueue(this)), 56a1bde736SShuo Chen wakeupFd_(createEventfd()), 57a1bde736SShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 58a1bde736SShuo Chen{ 59a1bde736SShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 60a1bde736SShuo Chen if (t_loopInThisThread) 61a1bde736SShuo Chen { 62a1bde736SShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 63a1bde736SShuo Chen << " exists in this thread " << threadId_; 64a1bde736SShuo Chen } 65a1bde736SShuo Chen else 66a1bde736SShuo Chen { 67a1bde736SShuo Chen t_loopInThisThread = this; 68a1bde736SShuo Chen } 69a1bde736SShuo Chen wakeupChannel_->setReadCallback( 70a1bde736SShuo Chen boost::bind(&EventLoop::handleRead, this)); 71a1bde736SShuo Chen // we are always reading the wakeupfd 72a1bde736SShuo Chen wakeupChannel_->enableReading(); 73a1bde736SShuo Chen} 74a1bde736SShuo Chen 75a1bde736SShuo ChenEventLoop::~EventLoop() 76a1bde736SShuo Chen{ 77a1bde736SShuo Chen assert(!looping_); 78a1bde736SShuo Chen ::close(wakeupFd_); 79a1bde736SShuo Chen t_loopInThisThread = NULL; 80a1bde736SShuo Chen} 81a1bde736SShuo Chen 82a1bde736SShuo Chenvoid EventLoop::loop() 83a1bde736SShuo Chen{ 84a1bde736SShuo Chen assert(!looping_); 85a1bde736SShuo Chen assertInLoopThread(); 86a1bde736SShuo Chen looping_ = true; 87a1bde736SShuo Chen quit_ = false; 88a1bde736SShuo Chen 89a1bde736SShuo Chen while (!quit_) 90a1bde736SShuo Chen { 91a1bde736SShuo Chen activeChannels_.clear(); 92a1bde736SShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 93a1bde736SShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 94a1bde736SShuo Chen it != activeChannels_.end(); ++it) 95a1bde736SShuo Chen { 96a1bde736SShuo Chen (*it)->handleEvent(pollReturnTime_); 97a1bde736SShuo Chen } 98a1bde736SShuo Chen doPendingFunctors(); 99a1bde736SShuo Chen } 100a1bde736SShuo Chen 101a1bde736SShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 102a1bde736SShuo Chen looping_ = false; 103a1bde736SShuo Chen} 104a1bde736SShuo Chen 105a1bde736SShuo Chenvoid EventLoop::quit() 106a1bde736SShuo Chen{ 107a1bde736SShuo Chen quit_ = true; 108a1bde736SShuo Chen if (!isInLoopThread()) 109a1bde736SShuo Chen { 110a1bde736SShuo Chen wakeup(); 111a1bde736SShuo Chen } 112a1bde736SShuo Chen} 113a1bde736SShuo Chen 114a1bde736SShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 115a1bde736SShuo Chen{ 116a1bde736SShuo Chen if (isInLoopThread()) 117a1bde736SShuo Chen { 118a1bde736SShuo Chen cb(); 119a1bde736SShuo Chen } 120a1bde736SShuo Chen else 121a1bde736SShuo Chen { 122a1bde736SShuo Chen queueInLoop(cb); 123a1bde736SShuo Chen } 124a1bde736SShuo Chen} 125a1bde736SShuo Chen 126a1bde736SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 127a1bde736SShuo Chen{ 128a1bde736SShuo Chen { 129a1bde736SShuo Chen MutexLockGuard lock(mutex_); 130a1bde736SShuo Chen pendingFunctors_.push_back(cb); 131a1bde736SShuo Chen } 132a1bde736SShuo Chen 133a1bde736SShuo Chen if (!isInLoopThread() || callingPendingFunctors_) 134a1bde736SShuo Chen { 135a1bde736SShuo Chen wakeup(); 136a1bde736SShuo Chen } 137a1bde736SShuo Chen} 138a1bde736SShuo Chen 139a1bde736SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 140a1bde736SShuo Chen{ 141a1bde736SShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 142a1bde736SShuo Chen} 143a1bde736SShuo Chen 144a1bde736SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 145a1bde736SShuo Chen{ 146a1bde736SShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 147a1bde736SShuo Chen return runAt(time, cb); 148a1bde736SShuo Chen} 149a1bde736SShuo Chen 150a1bde736SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 151a1bde736SShuo Chen{ 152a1bde736SShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 153a1bde736SShuo Chen return timerQueue_->addTimer(cb, time, interval); 154a1bde736SShuo Chen} 155a1bde736SShuo Chen 156a1bde736SShuo Chenvoid EventLoop::cancel(TimerId timerId) 157a1bde736SShuo Chen{ 158a1bde736SShuo Chen return timerQueue_->cancel(timerId); 159a1bde736SShuo Chen} 160a1bde736SShuo Chen 161a1bde736SShuo Chenvoid EventLoop::updateChannel(Channel* channel) 162a1bde736SShuo Chen{ 163a1bde736SShuo Chen assert(channel->ownerLoop() == this); 164a1bde736SShuo Chen assertInLoopThread(); 165a1bde736SShuo Chen poller_->updateChannel(channel); 166a1bde736SShuo Chen} 167a1bde736SShuo Chen 168a1bde736SShuo Chenvoid EventLoop::removeChannel(Channel* channel) 169a1bde736SShuo Chen{ 170a1bde736SShuo Chen assert(channel->ownerLoop() == this); 171a1bde736SShuo Chen assertInLoopThread(); 172a1bde736SShuo Chen poller_->removeChannel(channel); 173a1bde736SShuo Chen} 174a1bde736SShuo Chen 175a1bde736SShuo Chenvoid EventLoop::abortNotInLoopThread() 176a1bde736SShuo Chen{ 177a1bde736SShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 178a1bde736SShuo Chen << " was created in threadId_ = " << threadId_ 179a1bde736SShuo Chen << ", current thread id = " << CurrentThread::tid(); 180a1bde736SShuo Chen} 181a1bde736SShuo Chen 182a1bde736SShuo Chenvoid EventLoop::wakeup() 183a1bde736SShuo Chen{ 184a1bde736SShuo Chen uint64_t one = 1; 185a1bde736SShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 186a1bde736SShuo Chen if (n != sizeof one) 187a1bde736SShuo Chen { 188a1bde736SShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 189a1bde736SShuo Chen } 190a1bde736SShuo Chen} 191a1bde736SShuo Chen 192a1bde736SShuo Chenvoid EventLoop::handleRead() 193a1bde736SShuo Chen{ 194a1bde736SShuo Chen uint64_t one = 1; 195a1bde736SShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 196a1bde736SShuo Chen if (n != sizeof one) 197a1bde736SShuo Chen { 198a1bde736SShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 199a1bde736SShuo Chen } 200a1bde736SShuo Chen} 201a1bde736SShuo Chen 202a1bde736SShuo Chenvoid EventLoop::doPendingFunctors() 203a1bde736SShuo Chen{ 204a1bde736SShuo Chen std::vector<Functor> functors; 205a1bde736SShuo Chen callingPendingFunctors_ = true; 206a1bde736SShuo Chen 207a1bde736SShuo Chen { 208a1bde736SShuo Chen MutexLockGuard lock(mutex_); 209a1bde736SShuo Chen functors.swap(pendingFunctors_); 210a1bde736SShuo Chen } 211a1bde736SShuo Chen 212a1bde736SShuo Chen for (size_t i = 0; i < functors.size(); ++i) 213a1bde736SShuo Chen { 214a1bde736SShuo Chen functors[i](); 215a1bde736SShuo Chen } 216a1bde736SShuo Chen callingPendingFunctors_ = false; 217a1bde736SShuo Chen} 218a1bde736SShuo Chen 219