13b467340SShuo Chen // excerpts from http://code.google.com/p/muduo/ 23b467340SShuo Chen // 33b467340SShuo Chen // Use of this source code is governed by a BSD-style license 43b467340SShuo Chen // that can be found in the License file. 53b467340SShuo Chen // 63b467340SShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com) 73b467340SShuo Chen 83b467340SShuo Chen #include "EventLoop.h" 93b467340SShuo Chen 103b467340SShuo Chen #include "Channel.h" 113b467340SShuo Chen #include "Poller.h" 123b467340SShuo Chen #include "TimerQueue.h" 133b467340SShuo Chen 143b467340SShuo Chen #include "logging/Logging.h" 153b467340SShuo Chen 163b467340SShuo Chen #include <boost/bind.hpp> 173b467340SShuo Chen 183b467340SShuo Chen #include <assert.h> 193b467340SShuo Chen #include <sys/eventfd.h> 203b467340SShuo Chen 213b467340SShuo Chen using namespace muduo; 223b467340SShuo Chen 233b467340SShuo Chen __thread EventLoop* t_loopInThisThread = 0; 243b467340SShuo Chen const int kPollTimeMs = 10000; 253b467340SShuo Chen 263b467340SShuo Chen static int createEventfd() 273b467340SShuo Chen { 283b467340SShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 293b467340SShuo Chen if (evtfd < 0) 303b467340SShuo Chen { 313b467340SShuo Chen LOG_SYSERR << "Failed in eventfd"; 323b467340SShuo Chen abort(); 333b467340SShuo Chen } 343b467340SShuo Chen return evtfd; 353b467340SShuo Chen } 363b467340SShuo Chen 373b467340SShuo Chen EventLoop::EventLoop() 383b467340SShuo Chen : looping_(false), 393b467340SShuo Chen quit_(false), 403b467340SShuo Chen callingPendingFunctors_(false), 413b467340SShuo Chen threadId_(CurrentThread::tid()), 423b467340SShuo Chen poller_(new Poller(this)), 433b467340SShuo Chen timerQueue_(new TimerQueue(this)), 443b467340SShuo Chen wakeupFd_(createEventfd()), 453b467340SShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 463b467340SShuo Chen { 473b467340SShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 483b467340SShuo Chen if (t_loopInThisThread) 493b467340SShuo Chen { 503b467340SShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 513b467340SShuo Chen << " exists in this thread " << threadId_; 523b467340SShuo Chen } 533b467340SShuo Chen else 543b467340SShuo Chen { 553b467340SShuo Chen t_loopInThisThread = this; 563b467340SShuo Chen } 573b467340SShuo Chen wakeupChannel_->setReadCallback( 583b467340SShuo Chen boost::bind(&EventLoop::handleRead, this)); 593b467340SShuo Chen // we are always reading the wakeupfd 603b467340SShuo Chen wakeupChannel_->enableReading(); 613b467340SShuo Chen } 623b467340SShuo Chen 633b467340SShuo Chen EventLoop::~EventLoop() 643b467340SShuo Chen { 653b467340SShuo Chen assert(!looping_); 663b467340SShuo Chen ::close(wakeupFd_); 673b467340SShuo Chen t_loopInThisThread = NULL; 683b467340SShuo Chen } 693b467340SShuo Chen 703b467340SShuo Chen void EventLoop::loop() 713b467340SShuo Chen { 723b467340SShuo Chen assert(!looping_); 733b467340SShuo Chen assertInLoopThread(); 743b467340SShuo Chen looping_ = true; 753b467340SShuo Chen quit_ = false; 763b467340SShuo Chen 773b467340SShuo Chen while (!quit_) 783b467340SShuo Chen { 793b467340SShuo Chen activeChannels_.clear(); 803b467340SShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 813b467340SShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 823b467340SShuo Chen it != activeChannels_.end(); ++it) 833b467340SShuo Chen { 843b467340SShuo Chen! (*it)->handleEvent(pollReturnTime_); 853b467340SShuo Chen } 863b467340SShuo Chen doPendingFunctors(); 873b467340SShuo Chen } 883b467340SShuo Chen 893b467340SShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 903b467340SShuo Chen looping_ = false; 913b467340SShuo Chen } 923b467340SShuo Chen 933b467340SShuo Chen void EventLoop::quit() 943b467340SShuo Chen { 953b467340SShuo Chen quit_ = true; 963b467340SShuo Chen if (!isInLoopThread()) 973b467340SShuo Chen { 983b467340SShuo Chen wakeup(); 993b467340SShuo Chen } 1003b467340SShuo Chen } 1013b467340SShuo Chen 1023b467340SShuo Chen void EventLoop::runInLoop(const Functor& cb) 1033b467340SShuo Chen { 1043b467340SShuo Chen if (isInLoopThread()) 1053b467340SShuo Chen { 1063b467340SShuo Chen cb(); 1073b467340SShuo Chen } 1083b467340SShuo Chen else 1093b467340SShuo Chen { 1103b467340SShuo Chen queueInLoop(cb); 1113b467340SShuo Chen } 1123b467340SShuo Chen } 1133b467340SShuo Chen 1143b467340SShuo Chen void EventLoop::queueInLoop(const Functor& cb) 1153b467340SShuo Chen { 1163b467340SShuo Chen { 1173b467340SShuo Chen MutexLockGuard lock(mutex_); 1183b467340SShuo Chen pendingFunctors_.push_back(cb); 1193b467340SShuo Chen } 1203b467340SShuo Chen 1210f776063SShuo Chen if (!isInLoopThread() || callingPendingFunctors_) 1223b467340SShuo Chen { 1233b467340SShuo Chen wakeup(); 1243b467340SShuo Chen } 1253b467340SShuo Chen } 1263b467340SShuo Chen 1273b467340SShuo Chen TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 1283b467340SShuo Chen { 1293b467340SShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 1303b467340SShuo Chen } 1313b467340SShuo Chen 1323b467340SShuo Chen TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 1333b467340SShuo Chen { 1343b467340SShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 1353b467340SShuo Chen return runAt(time, cb); 1363b467340SShuo Chen } 1373b467340SShuo Chen 1383b467340SShuo Chen TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 1393b467340SShuo Chen { 1403b467340SShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 1413b467340SShuo Chen return timerQueue_->addTimer(cb, time, interval); 1423b467340SShuo Chen } 1433b467340SShuo Chen 1443b467340SShuo Chen void EventLoop::updateChannel(Channel* channel) 1453b467340SShuo Chen { 1463b467340SShuo Chen assert(channel->ownerLoop() == this); 1473b467340SShuo Chen assertInLoopThread(); 1483b467340SShuo Chen poller_->updateChannel(channel); 1493b467340SShuo Chen } 1503b467340SShuo Chen 1513b467340SShuo Chen void EventLoop::removeChannel(Channel* channel) 1523b467340SShuo Chen { 1533b467340SShuo Chen assert(channel->ownerLoop() == this); 1543b467340SShuo Chen assertInLoopThread(); 1553b467340SShuo Chen poller_->removeChannel(channel); 1563b467340SShuo Chen } 1573b467340SShuo Chen 1583b467340SShuo Chen void EventLoop::abortNotInLoopThread() 1593b467340SShuo Chen { 1603b467340SShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 1613b467340SShuo Chen << " was created in threadId_ = " << threadId_ 1623b467340SShuo Chen << ", current thread id = " << CurrentThread::tid(); 1633b467340SShuo Chen } 1643b467340SShuo Chen 1653b467340SShuo Chen void EventLoop::wakeup() 1663b467340SShuo Chen { 1673b467340SShuo Chen uint64_t one = 1; 1683b467340SShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 1693b467340SShuo Chen if (n != sizeof one) 1703b467340SShuo Chen { 1713b467340SShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 1723b467340SShuo Chen } 1733b467340SShuo Chen } 1743b467340SShuo Chen 1753b467340SShuo Chen void EventLoop::handleRead() 1763b467340SShuo Chen { 1773b467340SShuo Chen uint64_t one = 1; 1783b467340SShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 1793b467340SShuo Chen if (n != sizeof one) 1803b467340SShuo Chen { 1813b467340SShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 1823b467340SShuo Chen } 1833b467340SShuo Chen } 1843b467340SShuo Chen 1853b467340SShuo Chen void EventLoop::doPendingFunctors() 1863b467340SShuo Chen { 1873b467340SShuo Chen std::vector<Functor> functors; 1883b467340SShuo Chen callingPendingFunctors_ = true; 1893b467340SShuo Chen 1903b467340SShuo Chen { 1913b467340SShuo Chen MutexLockGuard lock(mutex_); 1923b467340SShuo Chen functors.swap(pendingFunctors_); 1933b467340SShuo Chen } 1943b467340SShuo Chen 1953b467340SShuo Chen for (size_t i = 0; i < functors.size(); ++i) 1963b467340SShuo Chen { 1973b467340SShuo Chen functors[i](); 1983b467340SShuo Chen } 1993b467340SShuo Chen callingPendingFunctors_ = false; 2003b467340SShuo Chen } 2013b467340SShuo Chen 202