170523619SShuo Chen // excerpts from http://code.google.com/p/muduo/ 270523619SShuo Chen // 370523619SShuo Chen // Use of this source code is governed by a BSD-style license 470523619SShuo Chen // that can be found in the License file. 570523619SShuo Chen // 670523619SShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com) 770523619SShuo Chen 870523619SShuo Chen #include "EventLoop.h" 970523619SShuo Chen 1070523619SShuo Chen #include "Channel.h" 1170523619SShuo Chen-#include "Poller.h" 1270523619SShuo Chen+#include "EPoller.h" 1370523619SShuo Chen #include "TimerQueue.h" 1470523619SShuo Chen 1570523619SShuo Chen #include "logging/Logging.h" 1670523619SShuo Chen 1770523619SShuo Chen #include <boost/bind.hpp> 1870523619SShuo Chen 1970523619SShuo Chen #include <assert.h> 2070523619SShuo Chen #include <signal.h> 2170523619SShuo Chen #include <sys/eventfd.h> 2270523619SShuo Chen 2370523619SShuo Chen using namespace muduo; 2470523619SShuo Chen 2570523619SShuo Chen __thread EventLoop* t_loopInThisThread = 0; 2670523619SShuo Chen const int kPollTimeMs = 10000; 2770523619SShuo Chen 2870523619SShuo Chen static int createEventfd() 2970523619SShuo Chen { 3070523619SShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 3170523619SShuo Chen if (evtfd < 0) 3270523619SShuo Chen { 3370523619SShuo Chen LOG_SYSERR << "Failed in eventfd"; 3470523619SShuo Chen abort(); 3570523619SShuo Chen } 3670523619SShuo Chen return evtfd; 3770523619SShuo Chen } 3870523619SShuo Chen 3970523619SShuo Chen class IgnoreSigPipe 4070523619SShuo Chen { 4170523619SShuo Chen public: 4270523619SShuo Chen IgnoreSigPipe() 4370523619SShuo Chen { 4470523619SShuo Chen ::signal(SIGPIPE, SIG_IGN); 4570523619SShuo Chen } 4670523619SShuo Chen }; 4770523619SShuo Chen 4870523619SShuo Chen IgnoreSigPipe initObj; 4970523619SShuo Chen 5070523619SShuo Chen EventLoop::EventLoop() 5170523619SShuo Chen : looping_(false), 5270523619SShuo Chen quit_(false), 5370523619SShuo Chen callingPendingFunctors_(false), 5470523619SShuo Chen threadId_(CurrentThread::tid()), 5570523619SShuo Chen- poller_(new Poller(this)), 5670523619SShuo Chen+ poller_(new EPoller(this)), 5770523619SShuo Chen timerQueue_(new TimerQueue(this)), 5870523619SShuo Chen wakeupFd_(createEventfd()), 5970523619SShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 6070523619SShuo Chen { 6170523619SShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 6270523619SShuo Chen if (t_loopInThisThread) 6370523619SShuo Chen { 6470523619SShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 6570523619SShuo Chen << " exists in this thread " << threadId_; 6670523619SShuo Chen } 6770523619SShuo Chen else 6870523619SShuo Chen { 6970523619SShuo Chen t_loopInThisThread = this; 7070523619SShuo Chen } 7170523619SShuo Chen wakeupChannel_->setReadCallback( 7270523619SShuo Chen boost::bind(&EventLoop::handleRead, this)); 7370523619SShuo Chen // we are always reading the wakeupfd 7470523619SShuo Chen wakeupChannel_->enableReading(); 7570523619SShuo Chen } 7670523619SShuo Chen 7770523619SShuo Chen EventLoop::~EventLoop() 7870523619SShuo Chen { 7970523619SShuo Chen assert(!looping_); 8070523619SShuo Chen ::close(wakeupFd_); 8170523619SShuo Chen t_loopInThisThread = NULL; 8270523619SShuo Chen } 8370523619SShuo Chen 8470523619SShuo Chen void EventLoop::loop() 8570523619SShuo Chen { 8670523619SShuo Chen assert(!looping_); 8770523619SShuo Chen assertInLoopThread(); 8870523619SShuo Chen looping_ = true; 8970523619SShuo Chen quit_ = false; 9070523619SShuo Chen 9170523619SShuo Chen while (!quit_) 9270523619SShuo Chen { 9370523619SShuo Chen activeChannels_.clear(); 9470523619SShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 9570523619SShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 9670523619SShuo Chen it != activeChannels_.end(); ++it) 9770523619SShuo Chen { 9870523619SShuo Chen (*it)->handleEvent(pollReturnTime_); 9970523619SShuo Chen } 10070523619SShuo Chen doPendingFunctors(); 10170523619SShuo Chen } 10270523619SShuo Chen 10370523619SShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 10470523619SShuo Chen looping_ = false; 10570523619SShuo Chen } 10670523619SShuo Chen 10770523619SShuo Chen void EventLoop::quit() 10870523619SShuo Chen { 10970523619SShuo Chen quit_ = true; 11070523619SShuo Chen if (!isInLoopThread()) 11170523619SShuo Chen { 11270523619SShuo Chen wakeup(); 11370523619SShuo Chen } 11470523619SShuo Chen } 11570523619SShuo Chen 11670523619SShuo Chen void EventLoop::runInLoop(const Functor& cb) 11770523619SShuo Chen { 11870523619SShuo Chen if (isInLoopThread()) 11970523619SShuo Chen { 12070523619SShuo Chen cb(); 12170523619SShuo Chen } 12270523619SShuo Chen else 12370523619SShuo Chen { 12470523619SShuo Chen queueInLoop(cb); 12570523619SShuo Chen } 12670523619SShuo Chen } 12770523619SShuo Chen 12870523619SShuo Chen void EventLoop::queueInLoop(const Functor& cb) 12970523619SShuo Chen { 13070523619SShuo Chen { 13170523619SShuo Chen MutexLockGuard lock(mutex_); 13270523619SShuo Chen pendingFunctors_.push_back(cb); 13370523619SShuo Chen } 13470523619SShuo Chen 13570523619SShuo Chen if (!isInLoopThread() || callingPendingFunctors_) 13670523619SShuo Chen { 13770523619SShuo Chen wakeup(); 13870523619SShuo Chen } 13970523619SShuo Chen } 14070523619SShuo Chen 14170523619SShuo Chen TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 14270523619SShuo Chen { 14370523619SShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 14470523619SShuo Chen } 14570523619SShuo Chen 14670523619SShuo Chen TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 14770523619SShuo Chen { 14870523619SShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 14970523619SShuo Chen return runAt(time, cb); 15070523619SShuo Chen } 15170523619SShuo Chen 15270523619SShuo Chen TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 15370523619SShuo Chen { 15470523619SShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 15570523619SShuo Chen return timerQueue_->addTimer(cb, time, interval); 15670523619SShuo Chen } 15770523619SShuo Chen 15870523619SShuo Chen void EventLoop::cancel(TimerId timerId) 15970523619SShuo Chen { 16070523619SShuo Chen return timerQueue_->cancel(timerId); 16170523619SShuo Chen } 16270523619SShuo Chen 16370523619SShuo Chen void EventLoop::updateChannel(Channel* channel) 16470523619SShuo Chen { 16570523619SShuo Chen assert(channel->ownerLoop() == this); 16670523619SShuo Chen assertInLoopThread(); 16770523619SShuo Chen poller_->updateChannel(channel); 16870523619SShuo Chen } 16970523619SShuo Chen 17070523619SShuo Chen void EventLoop::removeChannel(Channel* channel) 17170523619SShuo Chen { 17270523619SShuo Chen assert(channel->ownerLoop() == this); 17370523619SShuo Chen assertInLoopThread(); 17470523619SShuo Chen poller_->removeChannel(channel); 17570523619SShuo Chen } 17670523619SShuo Chen 17770523619SShuo Chen void EventLoop::abortNotInLoopThread() 17870523619SShuo Chen { 17970523619SShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 18070523619SShuo Chen << " was created in threadId_ = " << threadId_ 18170523619SShuo Chen << ", current thread id = " << CurrentThread::tid(); 18270523619SShuo Chen } 18370523619SShuo Chen 18470523619SShuo Chen void EventLoop::wakeup() 18570523619SShuo Chen { 18670523619SShuo Chen uint64_t one = 1; 18770523619SShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 18870523619SShuo Chen if (n != sizeof one) 18970523619SShuo Chen { 19070523619SShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 19170523619SShuo Chen } 19270523619SShuo Chen } 19370523619SShuo Chen 19470523619SShuo Chen void EventLoop::handleRead() 19570523619SShuo Chen { 19670523619SShuo Chen uint64_t one = 1; 19770523619SShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 19870523619SShuo Chen if (n != sizeof one) 19970523619SShuo Chen { 20070523619SShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 20170523619SShuo Chen } 20270523619SShuo Chen } 20370523619SShuo Chen 20470523619SShuo Chen void EventLoop::doPendingFunctors() 20570523619SShuo Chen { 20670523619SShuo Chen std::vector<Functor> functors; 20770523619SShuo Chen callingPendingFunctors_ = true; 20870523619SShuo Chen 20970523619SShuo Chen { 21070523619SShuo Chen MutexLockGuard lock(mutex_); 21170523619SShuo Chen functors.swap(pendingFunctors_); 21270523619SShuo Chen } 21370523619SShuo Chen 21470523619SShuo Chen for (size_t i = 0; i < functors.size(); ++i) 21570523619SShuo Chen { 21670523619SShuo Chen functors[i](); 21770523619SShuo Chen } 21870523619SShuo Chen callingPendingFunctors_ = false; 21970523619SShuo Chen } 22070523619SShuo Chen 221