15af4b7fbSShuo Chen // excerpts from http://code.google.com/p/muduo/
25af4b7fbSShuo Chen //
35af4b7fbSShuo Chen // Use of this source code is governed by a BSD-style license
45af4b7fbSShuo Chen // that can be found in the License file.
55af4b7fbSShuo Chen //
65af4b7fbSShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com)
75af4b7fbSShuo Chen 
85af4b7fbSShuo Chen #include "EventLoop.h"
95af4b7fbSShuo Chen 
105af4b7fbSShuo Chen #include "Channel.h"
115af4b7fbSShuo Chen #include "Poller.h"
125af4b7fbSShuo Chen #include "TimerQueue.h"
135af4b7fbSShuo Chen 
145af4b7fbSShuo Chen #include "logging/Logging.h"
155af4b7fbSShuo Chen 
165af4b7fbSShuo Chen #include <boost/bind.hpp>
175af4b7fbSShuo Chen 
185af4b7fbSShuo Chen #include <assert.h>
195af4b7fbSShuo Chen+#include <signal.h>
205af4b7fbSShuo Chen #include <sys/eventfd.h>
215af4b7fbSShuo Chen 
225af4b7fbSShuo Chen using namespace muduo;
235af4b7fbSShuo Chen 
245af4b7fbSShuo Chen __thread EventLoop* t_loopInThisThread = 0;
255af4b7fbSShuo Chen const int kPollTimeMs = 10000;
265af4b7fbSShuo Chen 
275af4b7fbSShuo Chen static int createEventfd()
285af4b7fbSShuo Chen {
295af4b7fbSShuo Chen   int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
305af4b7fbSShuo Chen   if (evtfd < 0)
315af4b7fbSShuo Chen   {
325af4b7fbSShuo Chen     LOG_SYSERR << "Failed in eventfd";
335af4b7fbSShuo Chen     abort();
345af4b7fbSShuo Chen   }
355af4b7fbSShuo Chen   return evtfd;
365af4b7fbSShuo Chen }
375af4b7fbSShuo Chen 
385af4b7fbSShuo Chen+class IgnoreSigPipe
395af4b7fbSShuo Chen+{
405af4b7fbSShuo Chen+ public:
415af4b7fbSShuo Chen+  IgnoreSigPipe()
425af4b7fbSShuo Chen+  {
435af4b7fbSShuo Chen+    ::signal(SIGPIPE, SIG_IGN);
445af4b7fbSShuo Chen+  }
455af4b7fbSShuo Chen+};
465af4b7fbSShuo Chen+
475af4b7fbSShuo Chen+IgnoreSigPipe initObj;
485af4b7fbSShuo Chen+
495af4b7fbSShuo Chen EventLoop::EventLoop()
505af4b7fbSShuo Chen   : looping_(false),
515af4b7fbSShuo Chen     quit_(false),
525af4b7fbSShuo Chen     callingPendingFunctors_(false),
535af4b7fbSShuo Chen     threadId_(CurrentThread::tid()),
545af4b7fbSShuo Chen     poller_(new Poller(this)),
555af4b7fbSShuo Chen     timerQueue_(new TimerQueue(this)),
565af4b7fbSShuo Chen     wakeupFd_(createEventfd()),
575af4b7fbSShuo Chen     wakeupChannel_(new Channel(this, wakeupFd_))
585af4b7fbSShuo Chen {
595af4b7fbSShuo Chen   LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
605af4b7fbSShuo Chen   if (t_loopInThisThread)
615af4b7fbSShuo Chen   {
625af4b7fbSShuo Chen     LOG_FATAL << "Another EventLoop " << t_loopInThisThread
635af4b7fbSShuo Chen               << " exists in this thread " << threadId_;
645af4b7fbSShuo Chen   }
655af4b7fbSShuo Chen   else
665af4b7fbSShuo Chen   {
675af4b7fbSShuo Chen     t_loopInThisThread = this;
685af4b7fbSShuo Chen   }
695af4b7fbSShuo Chen   wakeupChannel_->setReadCallback(
705af4b7fbSShuo Chen       boost::bind(&EventLoop::handleRead, this));
715af4b7fbSShuo Chen   // we are always reading the wakeupfd
725af4b7fbSShuo Chen   wakeupChannel_->enableReading();
735af4b7fbSShuo Chen }
745af4b7fbSShuo Chen 
755af4b7fbSShuo Chen EventLoop::~EventLoop()
765af4b7fbSShuo Chen {
775af4b7fbSShuo Chen   assert(!looping_);
785af4b7fbSShuo Chen   ::close(wakeupFd_);
795af4b7fbSShuo Chen   t_loopInThisThread = NULL;
805af4b7fbSShuo Chen }
815af4b7fbSShuo Chen 
825af4b7fbSShuo Chen void EventLoop::loop()
835af4b7fbSShuo Chen {
845af4b7fbSShuo Chen   assert(!looping_);
855af4b7fbSShuo Chen   assertInLoopThread();
865af4b7fbSShuo Chen   looping_ = true;
875af4b7fbSShuo Chen   quit_ = false;
885af4b7fbSShuo Chen 
895af4b7fbSShuo Chen   while (!quit_)
905af4b7fbSShuo Chen   {
915af4b7fbSShuo Chen     activeChannels_.clear();
925af4b7fbSShuo Chen     pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
935af4b7fbSShuo Chen     for (ChannelList::iterator it = activeChannels_.begin();
945af4b7fbSShuo Chen         it != activeChannels_.end(); ++it)
955af4b7fbSShuo Chen     {
965af4b7fbSShuo Chen       (*it)->handleEvent(pollReturnTime_);
975af4b7fbSShuo Chen     }
985af4b7fbSShuo Chen     doPendingFunctors();
995af4b7fbSShuo Chen   }
1005af4b7fbSShuo Chen 
1015af4b7fbSShuo Chen   LOG_TRACE << "EventLoop " << this << " stop looping";
1025af4b7fbSShuo Chen   looping_ = false;
1035af4b7fbSShuo Chen }
1045af4b7fbSShuo Chen 
1055af4b7fbSShuo Chen void EventLoop::quit()
1065af4b7fbSShuo Chen {
1075af4b7fbSShuo Chen   quit_ = true;
1085af4b7fbSShuo Chen   if (!isInLoopThread())
1095af4b7fbSShuo Chen   {
1105af4b7fbSShuo Chen     wakeup();
1115af4b7fbSShuo Chen   }
1125af4b7fbSShuo Chen }
1135af4b7fbSShuo Chen 
1145af4b7fbSShuo Chen void EventLoop::runInLoop(const Functor& cb)
1155af4b7fbSShuo Chen {
1165af4b7fbSShuo Chen   if (isInLoopThread())
1175af4b7fbSShuo Chen   {
1185af4b7fbSShuo Chen     cb();
1195af4b7fbSShuo Chen   }
1205af4b7fbSShuo Chen   else
1215af4b7fbSShuo Chen   {
1225af4b7fbSShuo Chen     queueInLoop(cb);
1235af4b7fbSShuo Chen   }
1245af4b7fbSShuo Chen }
1255af4b7fbSShuo Chen 
1265af4b7fbSShuo Chen void EventLoop::queueInLoop(const Functor& cb)
1275af4b7fbSShuo Chen {
1285af4b7fbSShuo Chen   {
1295af4b7fbSShuo Chen   MutexLockGuard lock(mutex_);
1305af4b7fbSShuo Chen   pendingFunctors_.push_back(cb);
1315af4b7fbSShuo Chen   }
1325af4b7fbSShuo Chen 
1330f776063SShuo Chen   if (!isInLoopThread() || callingPendingFunctors_)
1345af4b7fbSShuo Chen   {
1355af4b7fbSShuo Chen     wakeup();
1365af4b7fbSShuo Chen   }
1375af4b7fbSShuo Chen }
1385af4b7fbSShuo Chen 
1395af4b7fbSShuo Chen TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
1405af4b7fbSShuo Chen {
1415af4b7fbSShuo Chen   return timerQueue_->addTimer(cb, time, 0.0);
1425af4b7fbSShuo Chen }
1435af4b7fbSShuo Chen 
1445af4b7fbSShuo Chen TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
1455af4b7fbSShuo Chen {
1465af4b7fbSShuo Chen   Timestamp time(addTime(Timestamp::now(), delay));
1475af4b7fbSShuo Chen   return runAt(time, cb);
1485af4b7fbSShuo Chen }
1495af4b7fbSShuo Chen 
1505af4b7fbSShuo Chen TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
1515af4b7fbSShuo Chen {
1525af4b7fbSShuo Chen   Timestamp time(addTime(Timestamp::now(), interval));
1535af4b7fbSShuo Chen   return timerQueue_->addTimer(cb, time, interval);
1545af4b7fbSShuo Chen }
1555af4b7fbSShuo Chen 
1565af4b7fbSShuo Chen void EventLoop::updateChannel(Channel* channel)
1575af4b7fbSShuo Chen {
1585af4b7fbSShuo Chen   assert(channel->ownerLoop() == this);
1595af4b7fbSShuo Chen   assertInLoopThread();
1605af4b7fbSShuo Chen   poller_->updateChannel(channel);
1615af4b7fbSShuo Chen }
1625af4b7fbSShuo Chen 
1635af4b7fbSShuo Chen void EventLoop::removeChannel(Channel* channel)
1645af4b7fbSShuo Chen {
1655af4b7fbSShuo Chen   assert(channel->ownerLoop() == this);
1665af4b7fbSShuo Chen   assertInLoopThread();
1675af4b7fbSShuo Chen   poller_->removeChannel(channel);
1685af4b7fbSShuo Chen }
1695af4b7fbSShuo Chen 
1705af4b7fbSShuo Chen void EventLoop::abortNotInLoopThread()
1715af4b7fbSShuo Chen {
1725af4b7fbSShuo Chen   LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
1735af4b7fbSShuo Chen             << " was created in threadId_ = " << threadId_
1745af4b7fbSShuo Chen             << ", current thread id = " <<  CurrentThread::tid();
1755af4b7fbSShuo Chen }
1765af4b7fbSShuo Chen 
1775af4b7fbSShuo Chen void EventLoop::wakeup()
1785af4b7fbSShuo Chen {
1795af4b7fbSShuo Chen   uint64_t one = 1;
1805af4b7fbSShuo Chen   ssize_t n = ::write(wakeupFd_, &one, sizeof one);
1815af4b7fbSShuo Chen   if (n != sizeof one)
1825af4b7fbSShuo Chen   {
1835af4b7fbSShuo Chen     LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
1845af4b7fbSShuo Chen   }
1855af4b7fbSShuo Chen }
1865af4b7fbSShuo Chen 
1875af4b7fbSShuo Chen void EventLoop::handleRead()
1885af4b7fbSShuo Chen {
1895af4b7fbSShuo Chen   uint64_t one = 1;
1905af4b7fbSShuo Chen   ssize_t n = ::read(wakeupFd_, &one, sizeof one);
1915af4b7fbSShuo Chen   if (n != sizeof one)
1925af4b7fbSShuo Chen   {
1935af4b7fbSShuo Chen     LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
1945af4b7fbSShuo Chen   }
1955af4b7fbSShuo Chen }
1965af4b7fbSShuo Chen 
1975af4b7fbSShuo Chen void EventLoop::doPendingFunctors()
1985af4b7fbSShuo Chen {
1995af4b7fbSShuo Chen   std::vector<Functor> functors;
2005af4b7fbSShuo Chen   callingPendingFunctors_ = true;
2015af4b7fbSShuo Chen 
2025af4b7fbSShuo Chen   {
2035af4b7fbSShuo Chen   MutexLockGuard lock(mutex_);
2045af4b7fbSShuo Chen   functors.swap(pendingFunctors_);
2055af4b7fbSShuo Chen   }
2065af4b7fbSShuo Chen 
2075af4b7fbSShuo Chen   for (size_t i = 0; i < functors.size(); ++i)
2085af4b7fbSShuo Chen   {
2095af4b7fbSShuo Chen     functors[i]();
2105af4b7fbSShuo Chen   }
2115af4b7fbSShuo Chen   callingPendingFunctors_ = false;
2125af4b7fbSShuo Chen }
2135af4b7fbSShuo Chen 
214