s09-s08-EventLoop.cc.diff revision 5af4b7fb
15af4b7fbSShuo Chen // excerpts from http://code.google.com/p/muduo/
25af4b7fbSShuo Chen //
35af4b7fbSShuo Chen // Use of this source code is governed by a BSD-style license
45af4b7fbSShuo Chen // that can be found in the License file.
55af4b7fbSShuo Chen //
65af4b7fbSShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com)
75af4b7fbSShuo Chen 
85af4b7fbSShuo Chen #include "EventLoop.h"
95af4b7fbSShuo Chen 
105af4b7fbSShuo Chen #include "Channel.h"
115af4b7fbSShuo Chen #include "Poller.h"
125af4b7fbSShuo Chen #include "TimerQueue.h"
135af4b7fbSShuo Chen 
145af4b7fbSShuo Chen #include "logging/Logging.h"
155af4b7fbSShuo Chen 
165af4b7fbSShuo Chen #include <boost/bind.hpp>
175af4b7fbSShuo Chen 
185af4b7fbSShuo Chen #include <assert.h>
195af4b7fbSShuo Chen+#include <signal.h>
205af4b7fbSShuo Chen #include <sys/eventfd.h>
215af4b7fbSShuo Chen 
225af4b7fbSShuo Chen using namespace muduo;
235af4b7fbSShuo Chen 
245af4b7fbSShuo Chen __thread EventLoop* t_loopInThisThread = 0;
255af4b7fbSShuo Chen const int kPollTimeMs = 10000;
265af4b7fbSShuo Chen 
275af4b7fbSShuo Chen static int createEventfd()
285af4b7fbSShuo Chen {
295af4b7fbSShuo Chen   int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
305af4b7fbSShuo Chen   if (evtfd < 0)
315af4b7fbSShuo Chen   {
325af4b7fbSShuo Chen     LOG_SYSERR << "Failed in eventfd";
335af4b7fbSShuo Chen     abort();
345af4b7fbSShuo Chen   }
355af4b7fbSShuo Chen   return evtfd;
365af4b7fbSShuo Chen }
375af4b7fbSShuo Chen 
385af4b7fbSShuo Chen+class IgnoreSigPipe
395af4b7fbSShuo Chen+{
405af4b7fbSShuo Chen+ public:
415af4b7fbSShuo Chen+  IgnoreSigPipe()
425af4b7fbSShuo Chen+  {
435af4b7fbSShuo Chen+    ::signal(SIGPIPE, SIG_IGN);
445af4b7fbSShuo Chen+  }
455af4b7fbSShuo Chen+};
465af4b7fbSShuo Chen+
475af4b7fbSShuo Chen+IgnoreSigPipe initObj;
485af4b7fbSShuo Chen+
495af4b7fbSShuo Chen EventLoop::EventLoop()
505af4b7fbSShuo Chen   : looping_(false),
515af4b7fbSShuo Chen     quit_(false),
525af4b7fbSShuo Chen     callingPendingFunctors_(false),
535af4b7fbSShuo Chen     threadId_(CurrentThread::tid()),
545af4b7fbSShuo Chen     poller_(new Poller(this)),
555af4b7fbSShuo Chen     timerQueue_(new TimerQueue(this)),
565af4b7fbSShuo Chen     wakeupFd_(createEventfd()),
575af4b7fbSShuo Chen     wakeupChannel_(new Channel(this, wakeupFd_))
585af4b7fbSShuo Chen {
595af4b7fbSShuo Chen   LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
605af4b7fbSShuo Chen   if (t_loopInThisThread)
615af4b7fbSShuo Chen   {
625af4b7fbSShuo Chen     LOG_FATAL << "Another EventLoop " << t_loopInThisThread
635af4b7fbSShuo Chen               << " exists in this thread " << threadId_;
645af4b7fbSShuo Chen   }
655af4b7fbSShuo Chen   else
665af4b7fbSShuo Chen   {
675af4b7fbSShuo Chen     t_loopInThisThread = this;
685af4b7fbSShuo Chen   }
695af4b7fbSShuo Chen   wakeupChannel_->setReadCallback(
705af4b7fbSShuo Chen       boost::bind(&EventLoop::handleRead, this));
715af4b7fbSShuo Chen   // we are always reading the wakeupfd
725af4b7fbSShuo Chen   wakeupChannel_->enableReading();
735af4b7fbSShuo Chen }
745af4b7fbSShuo Chen 
755af4b7fbSShuo Chen EventLoop::~EventLoop()
765af4b7fbSShuo Chen {
775af4b7fbSShuo Chen   assert(!looping_);
785af4b7fbSShuo Chen   ::close(wakeupFd_);
795af4b7fbSShuo Chen   t_loopInThisThread = NULL;
805af4b7fbSShuo Chen }
815af4b7fbSShuo Chen 
825af4b7fbSShuo Chen void EventLoop::loop()
835af4b7fbSShuo Chen {
845af4b7fbSShuo Chen   assert(!looping_);
855af4b7fbSShuo Chen   assertInLoopThread();
865af4b7fbSShuo Chen   looping_ = true;
875af4b7fbSShuo Chen   quit_ = false;
885af4b7fbSShuo Chen 
895af4b7fbSShuo Chen   while (!quit_)
905af4b7fbSShuo Chen   {
915af4b7fbSShuo Chen     activeChannels_.clear();
925af4b7fbSShuo Chen     pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
935af4b7fbSShuo Chen     for (ChannelList::iterator it = activeChannels_.begin();
945af4b7fbSShuo Chen         it != activeChannels_.end(); ++it)
955af4b7fbSShuo Chen     {
965af4b7fbSShuo Chen       (*it)->handleEvent(pollReturnTime_);
975af4b7fbSShuo Chen     }
985af4b7fbSShuo Chen     doPendingFunctors();
995af4b7fbSShuo Chen   }
1005af4b7fbSShuo Chen 
1015af4b7fbSShuo Chen   LOG_TRACE << "EventLoop " << this << " stop looping";
1025af4b7fbSShuo Chen   looping_ = false;
1035af4b7fbSShuo Chen }
1045af4b7fbSShuo Chen 
1055af4b7fbSShuo Chen void EventLoop::quit()
1065af4b7fbSShuo Chen {
1075af4b7fbSShuo Chen   quit_ = true;
1085af4b7fbSShuo Chen   if (!isInLoopThread())
1095af4b7fbSShuo Chen   {
1105af4b7fbSShuo Chen     wakeup();
1115af4b7fbSShuo Chen   }
1125af4b7fbSShuo Chen }
1135af4b7fbSShuo Chen 
1145af4b7fbSShuo Chen void EventLoop::runInLoop(const Functor& cb)
1155af4b7fbSShuo Chen {
1165af4b7fbSShuo Chen   if (isInLoopThread())
1175af4b7fbSShuo Chen   {
1185af4b7fbSShuo Chen     cb();
1195af4b7fbSShuo Chen   }
1205af4b7fbSShuo Chen   else
1215af4b7fbSShuo Chen   {
1225af4b7fbSShuo Chen     queueInLoop(cb);
1235af4b7fbSShuo Chen     wakeup();
1245af4b7fbSShuo Chen   }
1255af4b7fbSShuo Chen }
1265af4b7fbSShuo Chen 
1275af4b7fbSShuo Chen void EventLoop::queueInLoop(const Functor& cb)
1285af4b7fbSShuo Chen {
1295af4b7fbSShuo Chen   {
1305af4b7fbSShuo Chen   MutexLockGuard lock(mutex_);
1315af4b7fbSShuo Chen   pendingFunctors_.push_back(cb);
1325af4b7fbSShuo Chen   }
1335af4b7fbSShuo Chen 
1345af4b7fbSShuo Chen   if (isInLoopThread() && callingPendingFunctors_)
1355af4b7fbSShuo Chen   {
1365af4b7fbSShuo Chen     wakeup();
1375af4b7fbSShuo Chen   }
1385af4b7fbSShuo Chen }
1395af4b7fbSShuo Chen 
1405af4b7fbSShuo Chen TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
1415af4b7fbSShuo Chen {
1425af4b7fbSShuo Chen   return timerQueue_->addTimer(cb, time, 0.0);
1435af4b7fbSShuo Chen }
1445af4b7fbSShuo Chen 
1455af4b7fbSShuo Chen TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
1465af4b7fbSShuo Chen {
1475af4b7fbSShuo Chen   Timestamp time(addTime(Timestamp::now(), delay));
1485af4b7fbSShuo Chen   return runAt(time, cb);
1495af4b7fbSShuo Chen }
1505af4b7fbSShuo Chen 
1515af4b7fbSShuo Chen TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
1525af4b7fbSShuo Chen {
1535af4b7fbSShuo Chen   Timestamp time(addTime(Timestamp::now(), interval));
1545af4b7fbSShuo Chen   return timerQueue_->addTimer(cb, time, interval);
1555af4b7fbSShuo Chen }
1565af4b7fbSShuo Chen 
1575af4b7fbSShuo Chen void EventLoop::updateChannel(Channel* channel)
1585af4b7fbSShuo Chen {
1595af4b7fbSShuo Chen   assert(channel->ownerLoop() == this);
1605af4b7fbSShuo Chen   assertInLoopThread();
1615af4b7fbSShuo Chen   poller_->updateChannel(channel);
1625af4b7fbSShuo Chen }
1635af4b7fbSShuo Chen 
1645af4b7fbSShuo Chen void EventLoop::removeChannel(Channel* channel)
1655af4b7fbSShuo Chen {
1665af4b7fbSShuo Chen   assert(channel->ownerLoop() == this);
1675af4b7fbSShuo Chen   assertInLoopThread();
1685af4b7fbSShuo Chen   poller_->removeChannel(channel);
1695af4b7fbSShuo Chen }
1705af4b7fbSShuo Chen 
1715af4b7fbSShuo Chen void EventLoop::abortNotInLoopThread()
1725af4b7fbSShuo Chen {
1735af4b7fbSShuo Chen   LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
1745af4b7fbSShuo Chen             << " was created in threadId_ = " << threadId_
1755af4b7fbSShuo Chen             << ", current thread id = " <<  CurrentThread::tid();
1765af4b7fbSShuo Chen }
1775af4b7fbSShuo Chen 
1785af4b7fbSShuo Chen void EventLoop::wakeup()
1795af4b7fbSShuo Chen {
1805af4b7fbSShuo Chen   uint64_t one = 1;
1815af4b7fbSShuo Chen   ssize_t n = ::write(wakeupFd_, &one, sizeof one);
1825af4b7fbSShuo Chen   if (n != sizeof one)
1835af4b7fbSShuo Chen   {
1845af4b7fbSShuo Chen     LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
1855af4b7fbSShuo Chen   }
1865af4b7fbSShuo Chen }
1875af4b7fbSShuo Chen 
1885af4b7fbSShuo Chen void EventLoop::handleRead()
1895af4b7fbSShuo Chen {
1905af4b7fbSShuo Chen   uint64_t one = 1;
1915af4b7fbSShuo Chen   ssize_t n = ::read(wakeupFd_, &one, sizeof one);
1925af4b7fbSShuo Chen   if (n != sizeof one)
1935af4b7fbSShuo Chen   {
1945af4b7fbSShuo Chen     LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
1955af4b7fbSShuo Chen   }
1965af4b7fbSShuo Chen }
1975af4b7fbSShuo Chen 
1985af4b7fbSShuo Chen void EventLoop::doPendingFunctors()
1995af4b7fbSShuo Chen {
2005af4b7fbSShuo Chen   std::vector<Functor> functors;
2015af4b7fbSShuo Chen   callingPendingFunctors_ = true;
2025af4b7fbSShuo Chen 
2035af4b7fbSShuo Chen   {
2045af4b7fbSShuo Chen   MutexLockGuard lock(mutex_);
2055af4b7fbSShuo Chen   functors.swap(pendingFunctors_);
2065af4b7fbSShuo Chen   }
2075af4b7fbSShuo Chen 
2085af4b7fbSShuo Chen   for (size_t i = 0; i < functors.size(); ++i)
2095af4b7fbSShuo Chen   {
2105af4b7fbSShuo Chen     functors[i]();
2115af4b7fbSShuo Chen   }
2125af4b7fbSShuo Chen   callingPendingFunctors_ = false;
2135af4b7fbSShuo Chen }
2145af4b7fbSShuo Chen 
215