12745a763SShuo Chen// excerpts from http://code.google.com/p/muduo/ 22745a763SShuo Chen// 32745a763SShuo Chen// Use of this source code is governed by a BSD-style license 42745a763SShuo Chen// that can be found in the License file. 52745a763SShuo Chen// 62745a763SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 72745a763SShuo Chen 82745a763SShuo Chen#include "EventLoop.h" 92745a763SShuo Chen 102745a763SShuo Chen#include "Channel.h" 112745a763SShuo Chen#include "Poller.h" 122745a763SShuo Chen#include "TimerQueue.h" 132745a763SShuo Chen 142745a763SShuo Chen#include "logging/Logging.h" 152745a763SShuo Chen 162745a763SShuo Chen#include <boost/bind.hpp> 172745a763SShuo Chen 182745a763SShuo Chen#include <assert.h> 192745a763SShuo Chen#include <sys/eventfd.h> 202745a763SShuo Chen 212745a763SShuo Chenusing namespace muduo; 222745a763SShuo Chen 232745a763SShuo Chen__thread EventLoop* t_loopInThisThread = 0; 242745a763SShuo Chenconst int kPollTimeMs = 10000; 252745a763SShuo Chen 262745a763SShuo Chenstatic int createEventfd() 272745a763SShuo Chen{ 282745a763SShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 292745a763SShuo Chen if (evtfd < 0) 302745a763SShuo Chen { 312745a763SShuo Chen LOG_SYSERR << "Failed in eventfd"; 322745a763SShuo Chen abort(); 332745a763SShuo Chen } 342745a763SShuo Chen return evtfd; 352745a763SShuo Chen} 362745a763SShuo Chen 372745a763SShuo ChenEventLoop::EventLoop() 382745a763SShuo Chen : looping_(false), 392745a763SShuo Chen quit_(false), 402745a763SShuo Chen callingPendingFunctors_(false), 412745a763SShuo Chen threadId_(CurrentThread::tid()), 422745a763SShuo Chen poller_(new Poller(this)), 432745a763SShuo Chen timerQueue_(new TimerQueue(this)), 442745a763SShuo Chen wakeupFd_(createEventfd()), 452745a763SShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 462745a763SShuo Chen{ 472745a763SShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 482745a763SShuo Chen if (t_loopInThisThread) 492745a763SShuo Chen { 502745a763SShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 512745a763SShuo Chen << " exists in this thread " << threadId_; 522745a763SShuo Chen } 532745a763SShuo Chen else 542745a763SShuo Chen { 552745a763SShuo Chen t_loopInThisThread = this; 562745a763SShuo Chen } 572745a763SShuo Chen wakeupChannel_->setReadCallback( 582745a763SShuo Chen boost::bind(&EventLoop::handleRead, this)); 592745a763SShuo Chen // we are always reading the wakeupfd 602745a763SShuo Chen wakeupChannel_->enableReading(); 612745a763SShuo Chen} 622745a763SShuo Chen 632745a763SShuo ChenEventLoop::~EventLoop() 642745a763SShuo Chen{ 652745a763SShuo Chen assert(!looping_); 662745a763SShuo Chen ::close(wakeupFd_); 672745a763SShuo Chen t_loopInThisThread = NULL; 682745a763SShuo Chen} 692745a763SShuo Chen 702745a763SShuo Chenvoid EventLoop::loop() 712745a763SShuo Chen{ 722745a763SShuo Chen assert(!looping_); 732745a763SShuo Chen assertInLoopThread(); 742745a763SShuo Chen looping_ = true; 752745a763SShuo Chen quit_ = false; 762745a763SShuo Chen 772745a763SShuo Chen while (!quit_) 782745a763SShuo Chen { 792745a763SShuo Chen activeChannels_.clear(); 802745a763SShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 812745a763SShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 822745a763SShuo Chen it != activeChannels_.end(); ++it) 832745a763SShuo Chen { 842745a763SShuo Chen (*it)->handleEvent(); 852745a763SShuo Chen } 862745a763SShuo Chen doPendingFunctors(); 872745a763SShuo Chen } 882745a763SShuo Chen 892745a763SShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 902745a763SShuo Chen looping_ = false; 912745a763SShuo Chen} 922745a763SShuo Chen 932745a763SShuo Chenvoid EventLoop::quit() 942745a763SShuo Chen{ 952745a763SShuo Chen quit_ = true; 962745a763SShuo Chen if (!isInLoopThread()) 972745a763SShuo Chen { 982745a763SShuo Chen wakeup(); 992745a763SShuo Chen } 1002745a763SShuo Chen} 1012745a763SShuo Chen 1022745a763SShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 1032745a763SShuo Chen{ 1042745a763SShuo Chen if (isInLoopThread()) 1052745a763SShuo Chen { 1062745a763SShuo Chen cb(); 1072745a763SShuo Chen } 1082745a763SShuo Chen else 1092745a763SShuo Chen { 1102745a763SShuo Chen queueInLoop(cb); 1112745a763SShuo Chen } 1122745a763SShuo Chen} 1132745a763SShuo Chen 1142745a763SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 1152745a763SShuo Chen{ 1162745a763SShuo Chen { 1172745a763SShuo Chen MutexLockGuard lock(mutex_); 1182745a763SShuo Chen pendingFunctors_.push_back(cb); 1192745a763SShuo Chen } 1202745a763SShuo Chen 1210f776063SShuo Chen if (!isInLoopThread() || callingPendingFunctors_) 1222745a763SShuo Chen { 1232745a763SShuo Chen wakeup(); 1242745a763SShuo Chen } 1252745a763SShuo Chen} 1262745a763SShuo Chen 1272745a763SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 1282745a763SShuo Chen{ 1292745a763SShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 1302745a763SShuo Chen} 1312745a763SShuo Chen 1322745a763SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 1332745a763SShuo Chen{ 1342745a763SShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 1352745a763SShuo Chen return runAt(time, cb); 1362745a763SShuo Chen} 1372745a763SShuo Chen 1382745a763SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 1392745a763SShuo Chen{ 1402745a763SShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 1412745a763SShuo Chen return timerQueue_->addTimer(cb, time, interval); 1422745a763SShuo Chen} 1432745a763SShuo Chen 1442745a763SShuo Chenvoid EventLoop::updateChannel(Channel* channel) 1452745a763SShuo Chen{ 1462745a763SShuo Chen assert(channel->ownerLoop() == this); 1472745a763SShuo Chen assertInLoopThread(); 1482745a763SShuo Chen poller_->updateChannel(channel); 1492745a763SShuo Chen} 1502745a763SShuo Chen 1512745a763SShuo Chenvoid EventLoop::abortNotInLoopThread() 1522745a763SShuo Chen{ 1532745a763SShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 1542745a763SShuo Chen << " was created in threadId_ = " << threadId_ 1552745a763SShuo Chen << ", current thread id = " << CurrentThread::tid(); 1562745a763SShuo Chen} 1572745a763SShuo Chen 1582745a763SShuo Chenvoid EventLoop::wakeup() 1592745a763SShuo Chen{ 1602745a763SShuo Chen uint64_t one = 1; 1612745a763SShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 1622745a763SShuo Chen if (n != sizeof one) 1632745a763SShuo Chen { 1642745a763SShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 1652745a763SShuo Chen } 1662745a763SShuo Chen} 1672745a763SShuo Chen 1682745a763SShuo Chenvoid EventLoop::handleRead() 1692745a763SShuo Chen{ 1702745a763SShuo Chen uint64_t one = 1; 1712745a763SShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 1722745a763SShuo Chen if (n != sizeof one) 1732745a763SShuo Chen { 1742745a763SShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 1752745a763SShuo Chen } 1762745a763SShuo Chen} 1772745a763SShuo Chen 1782745a763SShuo Chenvoid EventLoop::doPendingFunctors() 1792745a763SShuo Chen{ 1802745a763SShuo Chen std::vector<Functor> functors; 1812745a763SShuo Chen callingPendingFunctors_ = true; 1822745a763SShuo Chen 1832745a763SShuo Chen { 1842745a763SShuo Chen MutexLockGuard lock(mutex_); 1852745a763SShuo Chen functors.swap(pendingFunctors_); 1862745a763SShuo Chen } 1872745a763SShuo Chen 1882745a763SShuo Chen for (size_t i = 0; i < functors.size(); ++i) 1892745a763SShuo Chen { 1902745a763SShuo Chen functors[i](); 1912745a763SShuo Chen } 1922745a763SShuo Chen callingPendingFunctors_ = false; 1932745a763SShuo Chen} 1942745a763SShuo Chen 195