s07-s06-EventLoop.cc.diff revision 3b467340
13b467340SShuo Chen // excerpts from http://code.google.com/p/muduo/
23b467340SShuo Chen //
33b467340SShuo Chen // Use of this source code is governed by a BSD-style license
43b467340SShuo Chen // that can be found in the License file.
53b467340SShuo Chen //
63b467340SShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com)
73b467340SShuo Chen 
83b467340SShuo Chen #include "EventLoop.h"
93b467340SShuo Chen 
103b467340SShuo Chen #include "Channel.h"
113b467340SShuo Chen #include "Poller.h"
123b467340SShuo Chen #include "TimerQueue.h"
133b467340SShuo Chen 
143b467340SShuo Chen #include "logging/Logging.h"
153b467340SShuo Chen 
163b467340SShuo Chen #include <boost/bind.hpp>
173b467340SShuo Chen 
183b467340SShuo Chen #include <assert.h>
193b467340SShuo Chen #include <sys/eventfd.h>
203b467340SShuo Chen 
213b467340SShuo Chen using namespace muduo;
223b467340SShuo Chen 
233b467340SShuo Chen __thread EventLoop* t_loopInThisThread = 0;
243b467340SShuo Chen const int kPollTimeMs = 10000;
253b467340SShuo Chen 
263b467340SShuo Chen static int createEventfd()
273b467340SShuo Chen {
283b467340SShuo Chen   int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
293b467340SShuo Chen   if (evtfd < 0)
303b467340SShuo Chen   {
313b467340SShuo Chen     LOG_SYSERR << "Failed in eventfd";
323b467340SShuo Chen     abort();
333b467340SShuo Chen   }
343b467340SShuo Chen   return evtfd;
353b467340SShuo Chen }
363b467340SShuo Chen 
373b467340SShuo Chen EventLoop::EventLoop()
383b467340SShuo Chen   : looping_(false),
393b467340SShuo Chen     quit_(false),
403b467340SShuo Chen     callingPendingFunctors_(false),
413b467340SShuo Chen     threadId_(CurrentThread::tid()),
423b467340SShuo Chen     poller_(new Poller(this)),
433b467340SShuo Chen     timerQueue_(new TimerQueue(this)),
443b467340SShuo Chen     wakeupFd_(createEventfd()),
453b467340SShuo Chen     wakeupChannel_(new Channel(this, wakeupFd_))
463b467340SShuo Chen {
473b467340SShuo Chen   LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
483b467340SShuo Chen   if (t_loopInThisThread)
493b467340SShuo Chen   {
503b467340SShuo Chen     LOG_FATAL << "Another EventLoop " << t_loopInThisThread
513b467340SShuo Chen               << " exists in this thread " << threadId_;
523b467340SShuo Chen   }
533b467340SShuo Chen   else
543b467340SShuo Chen   {
553b467340SShuo Chen     t_loopInThisThread = this;
563b467340SShuo Chen   }
573b467340SShuo Chen   wakeupChannel_->setReadCallback(
583b467340SShuo Chen       boost::bind(&EventLoop::handleRead, this));
593b467340SShuo Chen   // we are always reading the wakeupfd
603b467340SShuo Chen   wakeupChannel_->enableReading();
613b467340SShuo Chen }
623b467340SShuo Chen 
633b467340SShuo Chen EventLoop::~EventLoop()
643b467340SShuo Chen {
653b467340SShuo Chen   assert(!looping_);
663b467340SShuo Chen   ::close(wakeupFd_);
673b467340SShuo Chen   t_loopInThisThread = NULL;
683b467340SShuo Chen }
693b467340SShuo Chen 
703b467340SShuo Chen void EventLoop::loop()
713b467340SShuo Chen {
723b467340SShuo Chen   assert(!looping_);
733b467340SShuo Chen   assertInLoopThread();
743b467340SShuo Chen   looping_ = true;
753b467340SShuo Chen   quit_ = false;
763b467340SShuo Chen 
773b467340SShuo Chen   while (!quit_)
783b467340SShuo Chen   {
793b467340SShuo Chen     activeChannels_.clear();
803b467340SShuo Chen     pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
813b467340SShuo Chen     for (ChannelList::iterator it = activeChannels_.begin();
823b467340SShuo Chen         it != activeChannels_.end(); ++it)
833b467340SShuo Chen     {
843b467340SShuo Chen!      (*it)->handleEvent(pollReturnTime_);
853b467340SShuo Chen     }
863b467340SShuo Chen     doPendingFunctors();
873b467340SShuo Chen   }
883b467340SShuo Chen 
893b467340SShuo Chen   LOG_TRACE << "EventLoop " << this << " stop looping";
903b467340SShuo Chen   looping_ = false;
913b467340SShuo Chen }
923b467340SShuo Chen 
933b467340SShuo Chen void EventLoop::quit()
943b467340SShuo Chen {
953b467340SShuo Chen   quit_ = true;
963b467340SShuo Chen   if (!isInLoopThread())
973b467340SShuo Chen   {
983b467340SShuo Chen     wakeup();
993b467340SShuo Chen   }
1003b467340SShuo Chen }
1013b467340SShuo Chen 
1023b467340SShuo Chen void EventLoop::runInLoop(const Functor& cb)
1033b467340SShuo Chen {
1043b467340SShuo Chen   if (isInLoopThread())
1053b467340SShuo Chen   {
1063b467340SShuo Chen     cb();
1073b467340SShuo Chen   }
1083b467340SShuo Chen   else
1093b467340SShuo Chen   {
1103b467340SShuo Chen     queueInLoop(cb);
1113b467340SShuo Chen     wakeup();
1123b467340SShuo Chen   }
1133b467340SShuo Chen }
1143b467340SShuo Chen 
1153b467340SShuo Chen void EventLoop::queueInLoop(const Functor& cb)
1163b467340SShuo Chen {
1173b467340SShuo Chen   {
1183b467340SShuo Chen   MutexLockGuard lock(mutex_);
1193b467340SShuo Chen   pendingFunctors_.push_back(cb);
1203b467340SShuo Chen   }
1213b467340SShuo Chen 
1223b467340SShuo Chen   if (isInLoopThread() && callingPendingFunctors_)
1233b467340SShuo Chen   {
1243b467340SShuo Chen     wakeup();
1253b467340SShuo Chen   }
1263b467340SShuo Chen }
1273b467340SShuo Chen 
1283b467340SShuo Chen TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
1293b467340SShuo Chen {
1303b467340SShuo Chen   return timerQueue_->addTimer(cb, time, 0.0);
1313b467340SShuo Chen }
1323b467340SShuo Chen 
1333b467340SShuo Chen TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
1343b467340SShuo Chen {
1353b467340SShuo Chen   Timestamp time(addTime(Timestamp::now(), delay));
1363b467340SShuo Chen   return runAt(time, cb);
1373b467340SShuo Chen }
1383b467340SShuo Chen 
1393b467340SShuo Chen TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
1403b467340SShuo Chen {
1413b467340SShuo Chen   Timestamp time(addTime(Timestamp::now(), interval));
1423b467340SShuo Chen   return timerQueue_->addTimer(cb, time, interval);
1433b467340SShuo Chen }
1443b467340SShuo Chen 
1453b467340SShuo Chen void EventLoop::updateChannel(Channel* channel)
1463b467340SShuo Chen {
1473b467340SShuo Chen   assert(channel->ownerLoop() == this);
1483b467340SShuo Chen   assertInLoopThread();
1493b467340SShuo Chen   poller_->updateChannel(channel);
1503b467340SShuo Chen }
1513b467340SShuo Chen 
1523b467340SShuo Chen void EventLoop::removeChannel(Channel* channel)
1533b467340SShuo Chen {
1543b467340SShuo Chen   assert(channel->ownerLoop() == this);
1553b467340SShuo Chen   assertInLoopThread();
1563b467340SShuo Chen   poller_->removeChannel(channel);
1573b467340SShuo Chen }
1583b467340SShuo Chen 
1593b467340SShuo Chen void EventLoop::abortNotInLoopThread()
1603b467340SShuo Chen {
1613b467340SShuo Chen   LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
1623b467340SShuo Chen             << " was created in threadId_ = " << threadId_
1633b467340SShuo Chen             << ", current thread id = " <<  CurrentThread::tid();
1643b467340SShuo Chen }
1653b467340SShuo Chen 
1663b467340SShuo Chen void EventLoop::wakeup()
1673b467340SShuo Chen {
1683b467340SShuo Chen   uint64_t one = 1;
1693b467340SShuo Chen   ssize_t n = ::write(wakeupFd_, &one, sizeof one);
1703b467340SShuo Chen   if (n != sizeof one)
1713b467340SShuo Chen   {
1723b467340SShuo Chen     LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
1733b467340SShuo Chen   }
1743b467340SShuo Chen }
1753b467340SShuo Chen 
1763b467340SShuo Chen void EventLoop::handleRead()
1773b467340SShuo Chen {
1783b467340SShuo Chen   uint64_t one = 1;
1793b467340SShuo Chen   ssize_t n = ::read(wakeupFd_, &one, sizeof one);
1803b467340SShuo Chen   if (n != sizeof one)
1813b467340SShuo Chen   {
1823b467340SShuo Chen     LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
1833b467340SShuo Chen   }
1843b467340SShuo Chen }
1853b467340SShuo Chen 
1863b467340SShuo Chen void EventLoop::doPendingFunctors()
1873b467340SShuo Chen {
1883b467340SShuo Chen   std::vector<Functor> functors;
1893b467340SShuo Chen   callingPendingFunctors_ = true;
1903b467340SShuo Chen 
1913b467340SShuo Chen   {
1923b467340SShuo Chen   MutexLockGuard lock(mutex_);
1933b467340SShuo Chen   functors.swap(pendingFunctors_);
1943b467340SShuo Chen   }
1953b467340SShuo Chen 
1963b467340SShuo Chen   for (size_t i = 0; i < functors.size(); ++i)
1973b467340SShuo Chen   {
1983b467340SShuo Chen     functors[i]();
1993b467340SShuo Chen   }
2003b467340SShuo Chen   callingPendingFunctors_ = false;
2013b467340SShuo Chen }
2023b467340SShuo Chen 
203