15af4b7fbSShuo Chen // excerpts from http://code.google.com/p/muduo/ 25af4b7fbSShuo Chen // 35af4b7fbSShuo Chen // Use of this source code is governed by a BSD-style license 45af4b7fbSShuo Chen // that can be found in the License file. 55af4b7fbSShuo Chen // 65af4b7fbSShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com) 75af4b7fbSShuo Chen 85af4b7fbSShuo Chen #include "EventLoop.h" 95af4b7fbSShuo Chen 105af4b7fbSShuo Chen #include "Channel.h" 115af4b7fbSShuo Chen #include "Poller.h" 125af4b7fbSShuo Chen #include "TimerQueue.h" 135af4b7fbSShuo Chen 145af4b7fbSShuo Chen #include "logging/Logging.h" 155af4b7fbSShuo Chen 165af4b7fbSShuo Chen #include <boost/bind.hpp> 175af4b7fbSShuo Chen 185af4b7fbSShuo Chen #include <assert.h> 195af4b7fbSShuo Chen+#include <signal.h> 205af4b7fbSShuo Chen #include <sys/eventfd.h> 215af4b7fbSShuo Chen 225af4b7fbSShuo Chen using namespace muduo; 235af4b7fbSShuo Chen 245af4b7fbSShuo Chen __thread EventLoop* t_loopInThisThread = 0; 255af4b7fbSShuo Chen const int kPollTimeMs = 10000; 265af4b7fbSShuo Chen 275af4b7fbSShuo Chen static int createEventfd() 285af4b7fbSShuo Chen { 295af4b7fbSShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 305af4b7fbSShuo Chen if (evtfd < 0) 315af4b7fbSShuo Chen { 325af4b7fbSShuo Chen LOG_SYSERR << "Failed in eventfd"; 335af4b7fbSShuo Chen abort(); 345af4b7fbSShuo Chen } 355af4b7fbSShuo Chen return evtfd; 365af4b7fbSShuo Chen } 375af4b7fbSShuo Chen 385af4b7fbSShuo Chen+class IgnoreSigPipe 395af4b7fbSShuo Chen+{ 405af4b7fbSShuo Chen+ public: 415af4b7fbSShuo Chen+ IgnoreSigPipe() 425af4b7fbSShuo Chen+ { 435af4b7fbSShuo Chen+ ::signal(SIGPIPE, SIG_IGN); 445af4b7fbSShuo Chen+ } 455af4b7fbSShuo Chen+}; 465af4b7fbSShuo Chen+ 475af4b7fbSShuo Chen+IgnoreSigPipe initObj; 485af4b7fbSShuo Chen+ 495af4b7fbSShuo Chen EventLoop::EventLoop() 505af4b7fbSShuo Chen : looping_(false), 515af4b7fbSShuo Chen quit_(false), 525af4b7fbSShuo Chen callingPendingFunctors_(false), 535af4b7fbSShuo Chen threadId_(CurrentThread::tid()), 545af4b7fbSShuo Chen poller_(new Poller(this)), 555af4b7fbSShuo Chen timerQueue_(new TimerQueue(this)), 565af4b7fbSShuo Chen wakeupFd_(createEventfd()), 575af4b7fbSShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 585af4b7fbSShuo Chen { 595af4b7fbSShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 605af4b7fbSShuo Chen if (t_loopInThisThread) 615af4b7fbSShuo Chen { 625af4b7fbSShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 635af4b7fbSShuo Chen << " exists in this thread " << threadId_; 645af4b7fbSShuo Chen } 655af4b7fbSShuo Chen else 665af4b7fbSShuo Chen { 675af4b7fbSShuo Chen t_loopInThisThread = this; 685af4b7fbSShuo Chen } 695af4b7fbSShuo Chen wakeupChannel_->setReadCallback( 705af4b7fbSShuo Chen boost::bind(&EventLoop::handleRead, this)); 715af4b7fbSShuo Chen // we are always reading the wakeupfd 725af4b7fbSShuo Chen wakeupChannel_->enableReading(); 735af4b7fbSShuo Chen } 745af4b7fbSShuo Chen 755af4b7fbSShuo Chen EventLoop::~EventLoop() 765af4b7fbSShuo Chen { 775af4b7fbSShuo Chen assert(!looping_); 785af4b7fbSShuo Chen ::close(wakeupFd_); 795af4b7fbSShuo Chen t_loopInThisThread = NULL; 805af4b7fbSShuo Chen } 815af4b7fbSShuo Chen 825af4b7fbSShuo Chen void EventLoop::loop() 835af4b7fbSShuo Chen { 845af4b7fbSShuo Chen assert(!looping_); 855af4b7fbSShuo Chen assertInLoopThread(); 865af4b7fbSShuo Chen looping_ = true; 875af4b7fbSShuo Chen quit_ = false; 885af4b7fbSShuo Chen 895af4b7fbSShuo Chen while (!quit_) 905af4b7fbSShuo Chen { 915af4b7fbSShuo Chen activeChannels_.clear(); 925af4b7fbSShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 935af4b7fbSShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 945af4b7fbSShuo Chen it != activeChannels_.end(); ++it) 955af4b7fbSShuo Chen { 965af4b7fbSShuo Chen (*it)->handleEvent(pollReturnTime_); 975af4b7fbSShuo Chen } 985af4b7fbSShuo Chen doPendingFunctors(); 995af4b7fbSShuo Chen } 1005af4b7fbSShuo Chen 1015af4b7fbSShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 1025af4b7fbSShuo Chen looping_ = false; 1035af4b7fbSShuo Chen } 1045af4b7fbSShuo Chen 1055af4b7fbSShuo Chen void EventLoop::quit() 1065af4b7fbSShuo Chen { 1075af4b7fbSShuo Chen quit_ = true; 1085af4b7fbSShuo Chen if (!isInLoopThread()) 1095af4b7fbSShuo Chen { 1105af4b7fbSShuo Chen wakeup(); 1115af4b7fbSShuo Chen } 1125af4b7fbSShuo Chen } 1135af4b7fbSShuo Chen 1145af4b7fbSShuo Chen void EventLoop::runInLoop(const Functor& cb) 1155af4b7fbSShuo Chen { 1165af4b7fbSShuo Chen if (isInLoopThread()) 1175af4b7fbSShuo Chen { 1185af4b7fbSShuo Chen cb(); 1195af4b7fbSShuo Chen } 1205af4b7fbSShuo Chen else 1215af4b7fbSShuo Chen { 1225af4b7fbSShuo Chen queueInLoop(cb); 1235af4b7fbSShuo Chen } 1245af4b7fbSShuo Chen } 1255af4b7fbSShuo Chen 1265af4b7fbSShuo Chen void EventLoop::queueInLoop(const Functor& cb) 1275af4b7fbSShuo Chen { 1285af4b7fbSShuo Chen { 1295af4b7fbSShuo Chen MutexLockGuard lock(mutex_); 1305af4b7fbSShuo Chen pendingFunctors_.push_back(cb); 1315af4b7fbSShuo Chen } 1325af4b7fbSShuo Chen 1330f776063SShuo Chen if (!isInLoopThread() || callingPendingFunctors_) 1345af4b7fbSShuo Chen { 1355af4b7fbSShuo Chen wakeup(); 1365af4b7fbSShuo Chen } 1375af4b7fbSShuo Chen } 1385af4b7fbSShuo Chen 1395af4b7fbSShuo Chen TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 1405af4b7fbSShuo Chen { 1415af4b7fbSShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 1425af4b7fbSShuo Chen } 1435af4b7fbSShuo Chen 1445af4b7fbSShuo Chen TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 1455af4b7fbSShuo Chen { 1465af4b7fbSShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 1475af4b7fbSShuo Chen return runAt(time, cb); 1485af4b7fbSShuo Chen } 1495af4b7fbSShuo Chen 1505af4b7fbSShuo Chen TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 1515af4b7fbSShuo Chen { 1525af4b7fbSShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 1535af4b7fbSShuo Chen return timerQueue_->addTimer(cb, time, interval); 1545af4b7fbSShuo Chen } 1555af4b7fbSShuo Chen 1565af4b7fbSShuo Chen void EventLoop::updateChannel(Channel* channel) 1575af4b7fbSShuo Chen { 1585af4b7fbSShuo Chen assert(channel->ownerLoop() == this); 1595af4b7fbSShuo Chen assertInLoopThread(); 1605af4b7fbSShuo Chen poller_->updateChannel(channel); 1615af4b7fbSShuo Chen } 1625af4b7fbSShuo Chen 1635af4b7fbSShuo Chen void EventLoop::removeChannel(Channel* channel) 1645af4b7fbSShuo Chen { 1655af4b7fbSShuo Chen assert(channel->ownerLoop() == this); 1665af4b7fbSShuo Chen assertInLoopThread(); 1675af4b7fbSShuo Chen poller_->removeChannel(channel); 1685af4b7fbSShuo Chen } 1695af4b7fbSShuo Chen 1705af4b7fbSShuo Chen void EventLoop::abortNotInLoopThread() 1715af4b7fbSShuo Chen { 1725af4b7fbSShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 1735af4b7fbSShuo Chen << " was created in threadId_ = " << threadId_ 1745af4b7fbSShuo Chen << ", current thread id = " << CurrentThread::tid(); 1755af4b7fbSShuo Chen } 1765af4b7fbSShuo Chen 1775af4b7fbSShuo Chen void EventLoop::wakeup() 1785af4b7fbSShuo Chen { 1795af4b7fbSShuo Chen uint64_t one = 1; 1805af4b7fbSShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 1815af4b7fbSShuo Chen if (n != sizeof one) 1825af4b7fbSShuo Chen { 1835af4b7fbSShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 1845af4b7fbSShuo Chen } 1855af4b7fbSShuo Chen } 1865af4b7fbSShuo Chen 1875af4b7fbSShuo Chen void EventLoop::handleRead() 1885af4b7fbSShuo Chen { 1895af4b7fbSShuo Chen uint64_t one = 1; 1905af4b7fbSShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 1915af4b7fbSShuo Chen if (n != sizeof one) 1925af4b7fbSShuo Chen { 1935af4b7fbSShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 1945af4b7fbSShuo Chen } 1955af4b7fbSShuo Chen } 1965af4b7fbSShuo Chen 1975af4b7fbSShuo Chen void EventLoop::doPendingFunctors() 1985af4b7fbSShuo Chen { 1995af4b7fbSShuo Chen std::vector<Functor> functors; 2005af4b7fbSShuo Chen callingPendingFunctors_ = true; 2015af4b7fbSShuo Chen 2025af4b7fbSShuo Chen { 2035af4b7fbSShuo Chen MutexLockGuard lock(mutex_); 2045af4b7fbSShuo Chen functors.swap(pendingFunctors_); 2055af4b7fbSShuo Chen } 2065af4b7fbSShuo Chen 2075af4b7fbSShuo Chen for (size_t i = 0; i < functors.size(); ++i) 2085af4b7fbSShuo Chen { 2095af4b7fbSShuo Chen functors[i](); 2105af4b7fbSShuo Chen } 2115af4b7fbSShuo Chen callingPendingFunctors_ = false; 2125af4b7fbSShuo Chen } 2135af4b7fbSShuo Chen 214