1cc7f415cSShuo Chen// excerpts from http://code.google.com/p/muduo/ 2cc7f415cSShuo Chen// 3cc7f415cSShuo Chen// Use of this source code is governed by a BSD-style license 4cc7f415cSShuo Chen// that can be found in the License file. 5cc7f415cSShuo Chen// 6cc7f415cSShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7cc7f415cSShuo Chen 8cc7f415cSShuo Chen#include "EventLoop.h" 9cc7f415cSShuo Chen 10cc7f415cSShuo Chen#include "Channel.h" 11cc7f415cSShuo Chen#include "Poller.h" 12cc7f415cSShuo Chen#include "TimerQueue.h" 13cc7f415cSShuo Chen 14cc7f415cSShuo Chen#include "logging/Logging.h" 15cc7f415cSShuo Chen 16c461658bSShuo Chen#include <boost/bind.hpp> 17c461658bSShuo Chen 18cc7f415cSShuo Chen#include <assert.h> 19c461658bSShuo Chen#include <sys/eventfd.h> 20cc7f415cSShuo Chen 21cc7f415cSShuo Chenusing namespace muduo; 22cc7f415cSShuo Chen 23cc7f415cSShuo Chen__thread EventLoop* t_loopInThisThread = 0; 24cc7f415cSShuo Chenconst int kPollTimeMs = 10000; 25cc7f415cSShuo Chen 26c461658bSShuo Chenstatic int createEventfd() 27c461658bSShuo Chen{ 28c461658bSShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 29c461658bSShuo Chen if (evtfd < 0) 30c461658bSShuo Chen { 31c461658bSShuo Chen LOG_SYSERR << "Failed in eventfd"; 32c461658bSShuo Chen abort(); 33c461658bSShuo Chen } 34c461658bSShuo Chen return evtfd; 35c461658bSShuo Chen} 36c461658bSShuo Chen 37cc7f415cSShuo ChenEventLoop::EventLoop() 38cc7f415cSShuo Chen : looping_(false), 39cc7f415cSShuo Chen quit_(false), 40c461658bSShuo Chen callingPendingFunctors_(false), 41cc7f415cSShuo Chen threadId_(CurrentThread::tid()), 42cc7f415cSShuo Chen poller_(new Poller(this)), 43c461658bSShuo Chen timerQueue_(new TimerQueue(this)), 44c461658bSShuo Chen wakeupFd_(createEventfd()), 45c461658bSShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 46cc7f415cSShuo Chen{ 47cc7f415cSShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 48cc7f415cSShuo Chen if (t_loopInThisThread) 49cc7f415cSShuo Chen { 50cc7f415cSShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 51cc7f415cSShuo Chen << " exists in this thread " << threadId_; 52cc7f415cSShuo Chen } 53cc7f415cSShuo Chen else 54cc7f415cSShuo Chen { 55cc7f415cSShuo Chen t_loopInThisThread = this; 56cc7f415cSShuo Chen } 57c461658bSShuo Chen wakeupChannel_->setReadCallback( 58c461658bSShuo Chen boost::bind(&EventLoop::handleRead, this)); 59c461658bSShuo Chen // we are always reading the wakeupfd 60c461658bSShuo Chen wakeupChannel_->enableReading(); 61cc7f415cSShuo Chen} 62cc7f415cSShuo Chen 63cc7f415cSShuo ChenEventLoop::~EventLoop() 64cc7f415cSShuo Chen{ 65cc7f415cSShuo Chen assert(!looping_); 66c461658bSShuo Chen ::close(wakeupFd_); 67cc7f415cSShuo Chen t_loopInThisThread = NULL; 68cc7f415cSShuo Chen} 69cc7f415cSShuo Chen 70cc7f415cSShuo Chenvoid EventLoop::loop() 71cc7f415cSShuo Chen{ 72cc7f415cSShuo Chen assert(!looping_); 73cc7f415cSShuo Chen assertInLoopThread(); 74cc7f415cSShuo Chen looping_ = true; 75e44bbe65SShuo Chen quit_ = false; 76e44bbe65SShuo Chen 77cc7f415cSShuo Chen while (!quit_) 78cc7f415cSShuo Chen { 79cc7f415cSShuo Chen activeChannels_.clear(); 80cc7f415cSShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 81cc7f415cSShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 82cc7f415cSShuo Chen it != activeChannels_.end(); ++it) 83cc7f415cSShuo Chen { 84cc7f415cSShuo Chen (*it)->handleEvent(); 85cc7f415cSShuo Chen } 86c461658bSShuo Chen doPendingFunctors(); 87cc7f415cSShuo Chen } 88cc7f415cSShuo Chen 89cc7f415cSShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 90cc7f415cSShuo Chen looping_ = false; 91cc7f415cSShuo Chen} 92cc7f415cSShuo Chen 93cc7f415cSShuo Chenvoid EventLoop::quit() 94cc7f415cSShuo Chen{ 95cc7f415cSShuo Chen quit_ = true; 96c461658bSShuo Chen if (!isInLoopThread()) 97c461658bSShuo Chen { 98c461658bSShuo Chen wakeup(); 99c461658bSShuo Chen } 100c461658bSShuo Chen} 101c461658bSShuo Chen 102c461658bSShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 103c461658bSShuo Chen{ 104c461658bSShuo Chen if (isInLoopThread()) 105c461658bSShuo Chen { 106c461658bSShuo Chen cb(); 107c461658bSShuo Chen } 108c461658bSShuo Chen else 109c461658bSShuo Chen { 110c461658bSShuo Chen queueInLoop(cb); 111c461658bSShuo Chen } 112c461658bSShuo Chen} 113c461658bSShuo Chen 114c461658bSShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 115c461658bSShuo Chen{ 116c461658bSShuo Chen { 117c461658bSShuo Chen MutexLockGuard lock(mutex_); 118c461658bSShuo Chen pendingFunctors_.push_back(cb); 119c461658bSShuo Chen } 120c461658bSShuo Chen 1210f776063SShuo Chen if (!isInLoopThread() || callingPendingFunctors_) 122c461658bSShuo Chen { 123c461658bSShuo Chen wakeup(); 124c461658bSShuo Chen } 125cc7f415cSShuo Chen} 126cc7f415cSShuo Chen 127cc7f415cSShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 128cc7f415cSShuo Chen{ 129cc7f415cSShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 130cc7f415cSShuo Chen} 131cc7f415cSShuo Chen 132cc7f415cSShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 133cc7f415cSShuo Chen{ 134cc7f415cSShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 135cc7f415cSShuo Chen return runAt(time, cb); 136cc7f415cSShuo Chen} 137cc7f415cSShuo Chen 138cc7f415cSShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 139cc7f415cSShuo Chen{ 140cc7f415cSShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 141cc7f415cSShuo Chen return timerQueue_->addTimer(cb, time, interval); 142cc7f415cSShuo Chen} 143cc7f415cSShuo Chen 144cc7f415cSShuo Chenvoid EventLoop::updateChannel(Channel* channel) 145cc7f415cSShuo Chen{ 146cc7f415cSShuo Chen assert(channel->ownerLoop() == this); 147cc7f415cSShuo Chen assertInLoopThread(); 148cc7f415cSShuo Chen poller_->updateChannel(channel); 149cc7f415cSShuo Chen} 150cc7f415cSShuo Chen 151cc7f415cSShuo Chenvoid EventLoop::abortNotInLoopThread() 152cc7f415cSShuo Chen{ 153cc7f415cSShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 154cc7f415cSShuo Chen << " was created in threadId_ = " << threadId_ 155cc7f415cSShuo Chen << ", current thread id = " << CurrentThread::tid(); 156cc7f415cSShuo Chen} 157cc7f415cSShuo Chen 158c461658bSShuo Chenvoid EventLoop::wakeup() 159c461658bSShuo Chen{ 160c461658bSShuo Chen uint64_t one = 1; 161c461658bSShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 162c461658bSShuo Chen if (n != sizeof one) 163c461658bSShuo Chen { 164c461658bSShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 165c461658bSShuo Chen } 166c461658bSShuo Chen} 167c461658bSShuo Chen 168c461658bSShuo Chenvoid EventLoop::handleRead() 169c461658bSShuo Chen{ 170c461658bSShuo Chen uint64_t one = 1; 171c461658bSShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 172c461658bSShuo Chen if (n != sizeof one) 173c461658bSShuo Chen { 174c461658bSShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 175c461658bSShuo Chen } 176c461658bSShuo Chen} 177c461658bSShuo Chen 178c461658bSShuo Chenvoid EventLoop::doPendingFunctors() 179c461658bSShuo Chen{ 180c461658bSShuo Chen std::vector<Functor> functors; 181c461658bSShuo Chen callingPendingFunctors_ = true; 182c461658bSShuo Chen 183c461658bSShuo Chen { 184c461658bSShuo Chen MutexLockGuard lock(mutex_); 185c461658bSShuo Chen functors.swap(pendingFunctors_); 186c461658bSShuo Chen } 187c461658bSShuo Chen 188c461658bSShuo Chen for (size_t i = 0; i < functors.size(); ++i) 189c461658bSShuo Chen { 190c461658bSShuo Chen functors[i](); 191c461658bSShuo Chen } 192c461658bSShuo Chen callingPendingFunctors_ = false; 193c461658bSShuo Chen} 194c461658bSShuo Chen 195