170523619SShuo Chen // excerpts from http://code.google.com/p/muduo/
270523619SShuo Chen //
370523619SShuo Chen // Use of this source code is governed by a BSD-style license
470523619SShuo Chen // that can be found in the License file.
570523619SShuo Chen //
670523619SShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com)
770523619SShuo Chen 
870523619SShuo Chen #include "EventLoop.h"
970523619SShuo Chen 
1070523619SShuo Chen #include "Channel.h"
1170523619SShuo Chen-#include "Poller.h"
1270523619SShuo Chen+#include "EPoller.h"
1370523619SShuo Chen #include "TimerQueue.h"
1470523619SShuo Chen 
1570523619SShuo Chen #include "logging/Logging.h"
1670523619SShuo Chen 
1770523619SShuo Chen #include <boost/bind.hpp>
1870523619SShuo Chen 
1970523619SShuo Chen #include <assert.h>
2070523619SShuo Chen #include <signal.h>
2170523619SShuo Chen #include <sys/eventfd.h>
2270523619SShuo Chen 
2370523619SShuo Chen using namespace muduo;
2470523619SShuo Chen 
2570523619SShuo Chen __thread EventLoop* t_loopInThisThread = 0;
2670523619SShuo Chen const int kPollTimeMs = 10000;
2770523619SShuo Chen 
2870523619SShuo Chen static int createEventfd()
2970523619SShuo Chen {
3070523619SShuo Chen   int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
3170523619SShuo Chen   if (evtfd < 0)
3270523619SShuo Chen   {
3370523619SShuo Chen     LOG_SYSERR << "Failed in eventfd";
3470523619SShuo Chen     abort();
3570523619SShuo Chen   }
3670523619SShuo Chen   return evtfd;
3770523619SShuo Chen }
3870523619SShuo Chen 
3970523619SShuo Chen class IgnoreSigPipe
4070523619SShuo Chen {
4170523619SShuo Chen  public:
4270523619SShuo Chen   IgnoreSigPipe()
4370523619SShuo Chen   {
4470523619SShuo Chen     ::signal(SIGPIPE, SIG_IGN);
4570523619SShuo Chen   }
4670523619SShuo Chen };
4770523619SShuo Chen 
4870523619SShuo Chen IgnoreSigPipe initObj;
4970523619SShuo Chen 
5070523619SShuo Chen EventLoop::EventLoop()
5170523619SShuo Chen   : looping_(false),
5270523619SShuo Chen     quit_(false),
5370523619SShuo Chen     callingPendingFunctors_(false),
5470523619SShuo Chen     threadId_(CurrentThread::tid()),
5570523619SShuo Chen-    poller_(new Poller(this)),
5670523619SShuo Chen+    poller_(new EPoller(this)),
5770523619SShuo Chen     timerQueue_(new TimerQueue(this)),
5870523619SShuo Chen     wakeupFd_(createEventfd()),
5970523619SShuo Chen     wakeupChannel_(new Channel(this, wakeupFd_))
6070523619SShuo Chen {
6170523619SShuo Chen   LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
6270523619SShuo Chen   if (t_loopInThisThread)
6370523619SShuo Chen   {
6470523619SShuo Chen     LOG_FATAL << "Another EventLoop " << t_loopInThisThread
6570523619SShuo Chen               << " exists in this thread " << threadId_;
6670523619SShuo Chen   }
6770523619SShuo Chen   else
6870523619SShuo Chen   {
6970523619SShuo Chen     t_loopInThisThread = this;
7070523619SShuo Chen   }
7170523619SShuo Chen   wakeupChannel_->setReadCallback(
7270523619SShuo Chen       boost::bind(&EventLoop::handleRead, this));
7370523619SShuo Chen   // we are always reading the wakeupfd
7470523619SShuo Chen   wakeupChannel_->enableReading();
7570523619SShuo Chen }
7670523619SShuo Chen 
7770523619SShuo Chen EventLoop::~EventLoop()
7870523619SShuo Chen {
7970523619SShuo Chen   assert(!looping_);
8070523619SShuo Chen   ::close(wakeupFd_);
8170523619SShuo Chen   t_loopInThisThread = NULL;
8270523619SShuo Chen }
8370523619SShuo Chen 
8470523619SShuo Chen void EventLoop::loop()
8570523619SShuo Chen {
8670523619SShuo Chen   assert(!looping_);
8770523619SShuo Chen   assertInLoopThread();
8870523619SShuo Chen   looping_ = true;
8970523619SShuo Chen   quit_ = false;
9070523619SShuo Chen 
9170523619SShuo Chen   while (!quit_)
9270523619SShuo Chen   {
9370523619SShuo Chen     activeChannels_.clear();
9470523619SShuo Chen     pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
9570523619SShuo Chen     for (ChannelList::iterator it = activeChannels_.begin();
9670523619SShuo Chen         it != activeChannels_.end(); ++it)
9770523619SShuo Chen     {
9870523619SShuo Chen       (*it)->handleEvent(pollReturnTime_);
9970523619SShuo Chen     }
10070523619SShuo Chen     doPendingFunctors();
10170523619SShuo Chen   }
10270523619SShuo Chen 
10370523619SShuo Chen   LOG_TRACE << "EventLoop " << this << " stop looping";
10470523619SShuo Chen   looping_ = false;
10570523619SShuo Chen }
10670523619SShuo Chen 
10770523619SShuo Chen void EventLoop::quit()
10870523619SShuo Chen {
10970523619SShuo Chen   quit_ = true;
11070523619SShuo Chen   if (!isInLoopThread())
11170523619SShuo Chen   {
11270523619SShuo Chen     wakeup();
11370523619SShuo Chen   }
11470523619SShuo Chen }
11570523619SShuo Chen 
11670523619SShuo Chen void EventLoop::runInLoop(const Functor& cb)
11770523619SShuo Chen {
11870523619SShuo Chen   if (isInLoopThread())
11970523619SShuo Chen   {
12070523619SShuo Chen     cb();
12170523619SShuo Chen   }
12270523619SShuo Chen   else
12370523619SShuo Chen   {
12470523619SShuo Chen     queueInLoop(cb);
12570523619SShuo Chen   }
12670523619SShuo Chen }
12770523619SShuo Chen 
12870523619SShuo Chen void EventLoop::queueInLoop(const Functor& cb)
12970523619SShuo Chen {
13070523619SShuo Chen   {
13170523619SShuo Chen   MutexLockGuard lock(mutex_);
13270523619SShuo Chen   pendingFunctors_.push_back(cb);
13370523619SShuo Chen   }
13470523619SShuo Chen 
13570523619SShuo Chen   if (!isInLoopThread() || callingPendingFunctors_)
13670523619SShuo Chen   {
13770523619SShuo Chen     wakeup();
13870523619SShuo Chen   }
13970523619SShuo Chen }
14070523619SShuo Chen 
14170523619SShuo Chen TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
14270523619SShuo Chen {
14370523619SShuo Chen   return timerQueue_->addTimer(cb, time, 0.0);
14470523619SShuo Chen }
14570523619SShuo Chen 
14670523619SShuo Chen TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
14770523619SShuo Chen {
14870523619SShuo Chen   Timestamp time(addTime(Timestamp::now(), delay));
14970523619SShuo Chen   return runAt(time, cb);
15070523619SShuo Chen }
15170523619SShuo Chen 
15270523619SShuo Chen TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
15370523619SShuo Chen {
15470523619SShuo Chen   Timestamp time(addTime(Timestamp::now(), interval));
15570523619SShuo Chen   return timerQueue_->addTimer(cb, time, interval);
15670523619SShuo Chen }
15770523619SShuo Chen 
15870523619SShuo Chen void EventLoop::cancel(TimerId timerId)
15970523619SShuo Chen {
16070523619SShuo Chen   return timerQueue_->cancel(timerId);
16170523619SShuo Chen }
16270523619SShuo Chen 
16370523619SShuo Chen void EventLoop::updateChannel(Channel* channel)
16470523619SShuo Chen {
16570523619SShuo Chen   assert(channel->ownerLoop() == this);
16670523619SShuo Chen   assertInLoopThread();
16770523619SShuo Chen   poller_->updateChannel(channel);
16870523619SShuo Chen }
16970523619SShuo Chen 
17070523619SShuo Chen void EventLoop::removeChannel(Channel* channel)
17170523619SShuo Chen {
17270523619SShuo Chen   assert(channel->ownerLoop() == this);
17370523619SShuo Chen   assertInLoopThread();
17470523619SShuo Chen   poller_->removeChannel(channel);
17570523619SShuo Chen }
17670523619SShuo Chen 
17770523619SShuo Chen void EventLoop::abortNotInLoopThread()
17870523619SShuo Chen {
17970523619SShuo Chen   LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
18070523619SShuo Chen             << " was created in threadId_ = " << threadId_
18170523619SShuo Chen             << ", current thread id = " <<  CurrentThread::tid();
18270523619SShuo Chen }
18370523619SShuo Chen 
18470523619SShuo Chen void EventLoop::wakeup()
18570523619SShuo Chen {
18670523619SShuo Chen   uint64_t one = 1;
18770523619SShuo Chen   ssize_t n = ::write(wakeupFd_, &one, sizeof one);
18870523619SShuo Chen   if (n != sizeof one)
18970523619SShuo Chen   {
19070523619SShuo Chen     LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
19170523619SShuo Chen   }
19270523619SShuo Chen }
19370523619SShuo Chen 
19470523619SShuo Chen void EventLoop::handleRead()
19570523619SShuo Chen {
19670523619SShuo Chen   uint64_t one = 1;
19770523619SShuo Chen   ssize_t n = ::read(wakeupFd_, &one, sizeof one);
19870523619SShuo Chen   if (n != sizeof one)
19970523619SShuo Chen   {
20070523619SShuo Chen     LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
20170523619SShuo Chen   }
20270523619SShuo Chen }
20370523619SShuo Chen 
20470523619SShuo Chen void EventLoop::doPendingFunctors()
20570523619SShuo Chen {
20670523619SShuo Chen   std::vector<Functor> functors;
20770523619SShuo Chen   callingPendingFunctors_ = true;
20870523619SShuo Chen 
20970523619SShuo Chen   {
21070523619SShuo Chen   MutexLockGuard lock(mutex_);
21170523619SShuo Chen   functors.swap(pendingFunctors_);
21270523619SShuo Chen   }
21370523619SShuo Chen 
21470523619SShuo Chen   for (size_t i = 0; i < functors.size(); ++i)
21570523619SShuo Chen   {
21670523619SShuo Chen     functors[i]();
21770523619SShuo Chen   }
21870523619SShuo Chen   callingPendingFunctors_ = false;
21970523619SShuo Chen }
22070523619SShuo Chen 
221