12a18e699SShuo Chen// excerpts from http://code.google.com/p/muduo/ 22a18e699SShuo Chen// 32a18e699SShuo Chen// Use of this source code is governed by a BSD-style license 42a18e699SShuo Chen// that can be found in the License file. 52a18e699SShuo Chen// 62a18e699SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 72a18e699SShuo Chen 82a18e699SShuo Chen#include "EventLoop.h" 92a18e699SShuo Chen 102a18e699SShuo Chen#include "Channel.h" 112a18e699SShuo Chen#include "Poller.h" 122a18e699SShuo Chen#include "TimerQueue.h" 132a18e699SShuo Chen 142a18e699SShuo Chen#include "logging/Logging.h" 152a18e699SShuo Chen 162a18e699SShuo Chen#include <boost/bind.hpp> 172a18e699SShuo Chen 182a18e699SShuo Chen#include <assert.h> 192a18e699SShuo Chen#include <sys/eventfd.h> 202a18e699SShuo Chen 212a18e699SShuo Chenusing namespace muduo; 222a18e699SShuo Chen 232a18e699SShuo Chen__thread EventLoop* t_loopInThisThread = 0; 242a18e699SShuo Chenconst int kPollTimeMs = 10000; 252a18e699SShuo Chen 262a18e699SShuo Chenstatic int createEventfd() 272a18e699SShuo Chen{ 282a18e699SShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 292a18e699SShuo Chen if (evtfd < 0) 302a18e699SShuo Chen { 312a18e699SShuo Chen LOG_SYSERR << "Failed in eventfd"; 322a18e699SShuo Chen abort(); 332a18e699SShuo Chen } 342a18e699SShuo Chen return evtfd; 352a18e699SShuo Chen} 362a18e699SShuo Chen 372a18e699SShuo ChenEventLoop::EventLoop() 382a18e699SShuo Chen : looping_(false), 392a18e699SShuo Chen quit_(false), 402a18e699SShuo Chen callingPendingFunctors_(false), 412a18e699SShuo Chen threadId_(CurrentThread::tid()), 422a18e699SShuo Chen poller_(new Poller(this)), 432a18e699SShuo Chen timerQueue_(new TimerQueue(this)), 442a18e699SShuo Chen wakeupFd_(createEventfd()), 452a18e699SShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 462a18e699SShuo Chen{ 472a18e699SShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 482a18e699SShuo Chen if (t_loopInThisThread) 492a18e699SShuo Chen { 502a18e699SShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 512a18e699SShuo Chen << " exists in this thread " << threadId_; 522a18e699SShuo Chen } 532a18e699SShuo Chen else 542a18e699SShuo Chen { 552a18e699SShuo Chen t_loopInThisThread = this; 562a18e699SShuo Chen } 572a18e699SShuo Chen wakeupChannel_->setReadCallback( 582a18e699SShuo Chen boost::bind(&EventLoop::handleRead, this)); 592a18e699SShuo Chen // we are always reading the wakeupfd 602a18e699SShuo Chen wakeupChannel_->enableReading(); 612a18e699SShuo Chen} 622a18e699SShuo Chen 632a18e699SShuo ChenEventLoop::~EventLoop() 642a18e699SShuo Chen{ 652a18e699SShuo Chen assert(!looping_); 662a18e699SShuo Chen ::close(wakeupFd_); 672a18e699SShuo Chen t_loopInThisThread = NULL; 682a18e699SShuo Chen} 692a18e699SShuo Chen 702a18e699SShuo Chenvoid EventLoop::loop() 712a18e699SShuo Chen{ 722a18e699SShuo Chen assert(!looping_); 732a18e699SShuo Chen assertInLoopThread(); 742a18e699SShuo Chen looping_ = true; 752a18e699SShuo Chen quit_ = false; 762a18e699SShuo Chen 772a18e699SShuo Chen while (!quit_) 782a18e699SShuo Chen { 792a18e699SShuo Chen activeChannels_.clear(); 802a18e699SShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 812a18e699SShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 822a18e699SShuo Chen it != activeChannels_.end(); ++it) 832a18e699SShuo Chen { 842a18e699SShuo Chen (*it)->handleEvent(pollReturnTime_); 852a18e699SShuo Chen } 862a18e699SShuo Chen doPendingFunctors(); 872a18e699SShuo Chen } 882a18e699SShuo Chen 892a18e699SShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 902a18e699SShuo Chen looping_ = false; 912a18e699SShuo Chen} 922a18e699SShuo Chen 932a18e699SShuo Chenvoid EventLoop::quit() 942a18e699SShuo Chen{ 952a18e699SShuo Chen quit_ = true; 962a18e699SShuo Chen if (!isInLoopThread()) 972a18e699SShuo Chen { 982a18e699SShuo Chen wakeup(); 992a18e699SShuo Chen } 1002a18e699SShuo Chen} 1012a18e699SShuo Chen 1022a18e699SShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 1032a18e699SShuo Chen{ 1042a18e699SShuo Chen if (isInLoopThread()) 1052a18e699SShuo Chen { 1062a18e699SShuo Chen cb(); 1072a18e699SShuo Chen } 1082a18e699SShuo Chen else 1092a18e699SShuo Chen { 1102a18e699SShuo Chen queueInLoop(cb); 1112a18e699SShuo Chen } 1122a18e699SShuo Chen} 1132a18e699SShuo Chen 1142a18e699SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 1152a18e699SShuo Chen{ 1162a18e699SShuo Chen { 1172a18e699SShuo Chen MutexLockGuard lock(mutex_); 1182a18e699SShuo Chen pendingFunctors_.push_back(cb); 1192a18e699SShuo Chen } 1202a18e699SShuo Chen 1210f776063SShuo Chen if (!isInLoopThread() || callingPendingFunctors_) 1222a18e699SShuo Chen { 1232a18e699SShuo Chen wakeup(); 1242a18e699SShuo Chen } 1252a18e699SShuo Chen} 1262a18e699SShuo Chen 1272a18e699SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 1282a18e699SShuo Chen{ 1292a18e699SShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 1302a18e699SShuo Chen} 1312a18e699SShuo Chen 1322a18e699SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 1332a18e699SShuo Chen{ 1342a18e699SShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 1352a18e699SShuo Chen return runAt(time, cb); 1362a18e699SShuo Chen} 1372a18e699SShuo Chen 1382a18e699SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 1392a18e699SShuo Chen{ 1402a18e699SShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 1412a18e699SShuo Chen return timerQueue_->addTimer(cb, time, interval); 1422a18e699SShuo Chen} 1432a18e699SShuo Chen 1442a18e699SShuo Chenvoid EventLoop::updateChannel(Channel* channel) 1452a18e699SShuo Chen{ 1462a18e699SShuo Chen assert(channel->ownerLoop() == this); 1472a18e699SShuo Chen assertInLoopThread(); 1482a18e699SShuo Chen poller_->updateChannel(channel); 1492a18e699SShuo Chen} 1502a18e699SShuo Chen 1512a18e699SShuo Chenvoid EventLoop::removeChannel(Channel* channel) 1522a18e699SShuo Chen{ 1532a18e699SShuo Chen assert(channel->ownerLoop() == this); 1542a18e699SShuo Chen assertInLoopThread(); 1552a18e699SShuo Chen poller_->removeChannel(channel); 1562a18e699SShuo Chen} 1572a18e699SShuo Chen 1582a18e699SShuo Chenvoid EventLoop::abortNotInLoopThread() 1592a18e699SShuo Chen{ 1602a18e699SShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 1612a18e699SShuo Chen << " was created in threadId_ = " << threadId_ 1622a18e699SShuo Chen << ", current thread id = " << CurrentThread::tid(); 1632a18e699SShuo Chen} 1642a18e699SShuo Chen 1652a18e699SShuo Chenvoid EventLoop::wakeup() 1662a18e699SShuo Chen{ 1672a18e699SShuo Chen uint64_t one = 1; 1682a18e699SShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 1692a18e699SShuo Chen if (n != sizeof one) 1702a18e699SShuo Chen { 1712a18e699SShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 1722a18e699SShuo Chen } 1732a18e699SShuo Chen} 1742a18e699SShuo Chen 1752a18e699SShuo Chenvoid EventLoop::handleRead() 1762a18e699SShuo Chen{ 1772a18e699SShuo Chen uint64_t one = 1; 1782a18e699SShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 1792a18e699SShuo Chen if (n != sizeof one) 1802a18e699SShuo Chen { 1812a18e699SShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 1822a18e699SShuo Chen } 1832a18e699SShuo Chen} 1842a18e699SShuo Chen 1852a18e699SShuo Chenvoid EventLoop::doPendingFunctors() 1862a18e699SShuo Chen{ 1872a18e699SShuo Chen std::vector<Functor> functors; 1882a18e699SShuo Chen callingPendingFunctors_ = true; 1892a18e699SShuo Chen 1902a18e699SShuo Chen { 1912a18e699SShuo Chen MutexLockGuard lock(mutex_); 1922a18e699SShuo Chen functors.swap(pendingFunctors_); 1932a18e699SShuo Chen } 1942a18e699SShuo Chen 1952a18e699SShuo Chen for (size_t i = 0; i < functors.size(); ++i) 1962a18e699SShuo Chen { 1972a18e699SShuo Chen functors[i](); 1982a18e699SShuo Chen } 1992a18e699SShuo Chen callingPendingFunctors_ = false; 2002a18e699SShuo Chen} 2012a18e699SShuo Chen 202