// EventLoop.cc revision 2a18e699
12a18e699SShuo Chen// excerpts from http://code.google.com/p/muduo/ 22a18e699SShuo Chen// 32a18e699SShuo Chen// Use of this source code is governed by a BSD-style license 42a18e699SShuo Chen// that can be found in the License file. 52a18e699SShuo Chen// 62a18e699SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 72a18e699SShuo Chen 82a18e699SShuo Chen#include "EventLoop.h" 92a18e699SShuo Chen 102a18e699SShuo Chen#include "Channel.h" 112a18e699SShuo Chen#include "Poller.h" 122a18e699SShuo Chen#include "TimerQueue.h" 132a18e699SShuo Chen 142a18e699SShuo Chen#include "logging/Logging.h" 152a18e699SShuo Chen 162a18e699SShuo Chen#include <boost/bind.hpp> 172a18e699SShuo Chen 182a18e699SShuo Chen#include <assert.h> 192a18e699SShuo Chen#include <sys/eventfd.h> 202a18e699SShuo Chen 212a18e699SShuo Chenusing namespace muduo; 222a18e699SShuo Chen 232a18e699SShuo Chen__thread EventLoop* t_loopInThisThread = 0; 242a18e699SShuo Chenconst int kPollTimeMs = 10000; 252a18e699SShuo Chen 262a18e699SShuo Chenstatic int createEventfd() 272a18e699SShuo Chen{ 282a18e699SShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 292a18e699SShuo Chen if (evtfd < 0) 302a18e699SShuo Chen { 312a18e699SShuo Chen LOG_SYSERR << "Failed in eventfd"; 322a18e699SShuo Chen abort(); 332a18e699SShuo Chen } 342a18e699SShuo Chen return evtfd; 352a18e699SShuo Chen} 362a18e699SShuo Chen 372a18e699SShuo ChenEventLoop::EventLoop() 382a18e699SShuo Chen : looping_(false), 392a18e699SShuo Chen quit_(false), 402a18e699SShuo Chen callingPendingFunctors_(false), 412a18e699SShuo Chen threadId_(CurrentThread::tid()), 422a18e699SShuo Chen poller_(new Poller(this)), 432a18e699SShuo Chen timerQueue_(new TimerQueue(this)), 442a18e699SShuo Chen wakeupFd_(createEventfd()), 452a18e699SShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 462a18e699SShuo Chen{ 472a18e699SShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 482a18e699SShuo Chen if (t_loopInThisThread) 
492a18e699SShuo Chen { 502a18e699SShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 512a18e699SShuo Chen << " exists in this thread " << threadId_; 522a18e699SShuo Chen } 532a18e699SShuo Chen else 542a18e699SShuo Chen { 552a18e699SShuo Chen t_loopInThisThread = this; 562a18e699SShuo Chen } 572a18e699SShuo Chen wakeupChannel_->setReadCallback( 582a18e699SShuo Chen boost::bind(&EventLoop::handleRead, this)); 592a18e699SShuo Chen // we are always reading the wakeupfd 602a18e699SShuo Chen wakeupChannel_->enableReading(); 612a18e699SShuo Chen} 622a18e699SShuo Chen 632a18e699SShuo ChenEventLoop::~EventLoop() 642a18e699SShuo Chen{ 652a18e699SShuo Chen assert(!looping_); 662a18e699SShuo Chen ::close(wakeupFd_); 672a18e699SShuo Chen t_loopInThisThread = NULL; 682a18e699SShuo Chen} 692a18e699SShuo Chen 702a18e699SShuo Chenvoid EventLoop::loop() 712a18e699SShuo Chen{ 722a18e699SShuo Chen assert(!looping_); 732a18e699SShuo Chen assertInLoopThread(); 742a18e699SShuo Chen looping_ = true; 752a18e699SShuo Chen quit_ = false; 762a18e699SShuo Chen 772a18e699SShuo Chen while (!quit_) 782a18e699SShuo Chen { 792a18e699SShuo Chen activeChannels_.clear(); 802a18e699SShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 812a18e699SShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 822a18e699SShuo Chen it != activeChannels_.end(); ++it) 832a18e699SShuo Chen { 842a18e699SShuo Chen (*it)->handleEvent(pollReturnTime_); 852a18e699SShuo Chen } 862a18e699SShuo Chen doPendingFunctors(); 872a18e699SShuo Chen } 882a18e699SShuo Chen 892a18e699SShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 902a18e699SShuo Chen looping_ = false; 912a18e699SShuo Chen} 922a18e699SShuo Chen 932a18e699SShuo Chenvoid EventLoop::quit() 942a18e699SShuo Chen{ 952a18e699SShuo Chen quit_ = true; 962a18e699SShuo Chen if (!isInLoopThread()) 972a18e699SShuo Chen { 982a18e699SShuo Chen wakeup(); 992a18e699SShuo Chen } 1002a18e699SShuo Chen} 1012a18e699SShuo Chen 
1022a18e699SShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 1032a18e699SShuo Chen{ 1042a18e699SShuo Chen if (isInLoopThread()) 1052a18e699SShuo Chen { 1062a18e699SShuo Chen cb(); 1072a18e699SShuo Chen } 1082a18e699SShuo Chen else 1092a18e699SShuo Chen { 1102a18e699SShuo Chen queueInLoop(cb); 1112a18e699SShuo Chen wakeup(); 1122a18e699SShuo Chen } 1132a18e699SShuo Chen} 1142a18e699SShuo Chen 1152a18e699SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 1162a18e699SShuo Chen{ 1172a18e699SShuo Chen { 1182a18e699SShuo Chen MutexLockGuard lock(mutex_); 1192a18e699SShuo Chen pendingFunctors_.push_back(cb); 1202a18e699SShuo Chen } 1212a18e699SShuo Chen 1222a18e699SShuo Chen if (isInLoopThread() && callingPendingFunctors_) 1232a18e699SShuo Chen { 1242a18e699SShuo Chen wakeup(); 1252a18e699SShuo Chen } 1262a18e699SShuo Chen} 1272a18e699SShuo Chen 1282a18e699SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 1292a18e699SShuo Chen{ 1302a18e699SShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 1312a18e699SShuo Chen} 1322a18e699SShuo Chen 1332a18e699SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 1342a18e699SShuo Chen{ 1352a18e699SShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 1362a18e699SShuo Chen return runAt(time, cb); 1372a18e699SShuo Chen} 1382a18e699SShuo Chen 1392a18e699SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 1402a18e699SShuo Chen{ 1412a18e699SShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 1422a18e699SShuo Chen return timerQueue_->addTimer(cb, time, interval); 1432a18e699SShuo Chen} 1442a18e699SShuo Chen 1452a18e699SShuo Chenvoid EventLoop::updateChannel(Channel* channel) 1462a18e699SShuo Chen{ 1472a18e699SShuo Chen assert(channel->ownerLoop() == this); 1482a18e699SShuo Chen assertInLoopThread(); 1492a18e699SShuo Chen poller_->updateChannel(channel); 1502a18e699SShuo Chen} 1512a18e699SShuo Chen 1522a18e699SShuo Chenvoid 
EventLoop::removeChannel(Channel* channel) 1532a18e699SShuo Chen{ 1542a18e699SShuo Chen assert(channel->ownerLoop() == this); 1552a18e699SShuo Chen assertInLoopThread(); 1562a18e699SShuo Chen poller_->removeChannel(channel); 1572a18e699SShuo Chen} 1582a18e699SShuo Chen 1592a18e699SShuo Chenvoid EventLoop::abortNotInLoopThread() 1602a18e699SShuo Chen{ 1612a18e699SShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 1622a18e699SShuo Chen << " was created in threadId_ = " << threadId_ 1632a18e699SShuo Chen << ", current thread id = " << CurrentThread::tid(); 1642a18e699SShuo Chen} 1652a18e699SShuo Chen 1662a18e699SShuo Chenvoid EventLoop::wakeup() 1672a18e699SShuo Chen{ 1682a18e699SShuo Chen uint64_t one = 1; 1692a18e699SShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 1702a18e699SShuo Chen if (n != sizeof one) 1712a18e699SShuo Chen { 1722a18e699SShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 1732a18e699SShuo Chen } 1742a18e699SShuo Chen} 1752a18e699SShuo Chen 1762a18e699SShuo Chenvoid EventLoop::handleRead() 1772a18e699SShuo Chen{ 1782a18e699SShuo Chen uint64_t one = 1; 1792a18e699SShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 1802a18e699SShuo Chen if (n != sizeof one) 1812a18e699SShuo Chen { 1822a18e699SShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 1832a18e699SShuo Chen } 1842a18e699SShuo Chen} 1852a18e699SShuo Chen 1862a18e699SShuo Chenvoid EventLoop::doPendingFunctors() 1872a18e699SShuo Chen{ 1882a18e699SShuo Chen std::vector<Functor> functors; 1892a18e699SShuo Chen callingPendingFunctors_ = true; 1902a18e699SShuo Chen 1912a18e699SShuo Chen { 1922a18e699SShuo Chen MutexLockGuard lock(mutex_); 1932a18e699SShuo Chen functors.swap(pendingFunctors_); 1942a18e699SShuo Chen } 1952a18e699SShuo Chen 1962a18e699SShuo Chen for (size_t i = 0; i < functors.size(); ++i) 1972a18e699SShuo Chen { 1982a18e699SShuo Chen functors[i](); 1992a18e699SShuo Chen 
} 2002a18e699SShuo Chen callingPendingFunctors_ = false; 2012a18e699SShuo Chen} 2022a18e699SShuo Chen 203