EventLoop.cc revision 65c497a3
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
765c497a3SShuo Chen
865c497a3SShuo Chen#include "EventLoop.h"
965c497a3SShuo Chen
1065c497a3SShuo Chen#include "Channel.h"
1165c497a3SShuo Chen#include "Poller.h"
1265c497a3SShuo Chen#include "TimerQueue.h"
1365c497a3SShuo Chen
1465c497a3SShuo Chen#include "logging/Logging.h"
1565c497a3SShuo Chen
1665c497a3SShuo Chen#include <boost/bind.hpp>
1765c497a3SShuo Chen
1865c497a3SShuo Chen#include <assert.h>
1965c497a3SShuo Chen#include <sys/eventfd.h>
2065c497a3SShuo Chen
2165c497a3SShuo Chenusing namespace muduo;
2265c497a3SShuo Chen
2365c497a3SShuo Chen__thread EventLoop* t_loopInThisThread = 0;
2465c497a3SShuo Chenconst int kPollTimeMs = 10000;
2565c497a3SShuo Chen
2665c497a3SShuo Chenstatic int createEventfd()
2765c497a3SShuo Chen{
2865c497a3SShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
2965c497a3SShuo Chen  if (evtfd < 0)
3065c497a3SShuo Chen  {
3165c497a3SShuo Chen    LOG_SYSERR << "Failed in eventfd";
3265c497a3SShuo Chen    abort();
3365c497a3SShuo Chen  }
3465c497a3SShuo Chen  return evtfd;
3565c497a3SShuo Chen}
3665c497a3SShuo Chen
3765c497a3SShuo ChenEventLoop::EventLoop()
3865c497a3SShuo Chen  : looping_(false),
3965c497a3SShuo Chen    quit_(false),
4065c497a3SShuo Chen    callingPendingFunctors_(false),
4165c497a3SShuo Chen    threadId_(CurrentThread::tid()),
4265c497a3SShuo Chen    poller_(new Poller(this)),
4365c497a3SShuo Chen    timerQueue_(new TimerQueue(this)),
4465c497a3SShuo Chen    wakeupFd_(createEventfd()),
4565c497a3SShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
4665c497a3SShuo Chen{
4765c497a3SShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
4865c497a3SShuo Chen  if (t_loopInThisThread)
4965c497a3SShuo Chen  {
5065c497a3SShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
5165c497a3SShuo Chen              << " exists in this thread " << threadId_;
5265c497a3SShuo Chen  }
5365c497a3SShuo Chen  else
5465c497a3SShuo Chen  {
5565c497a3SShuo Chen    t_loopInThisThread = this;
5665c497a3SShuo Chen  }
5765c497a3SShuo Chen  wakeupChannel_->setReadCallback(
5865c497a3SShuo Chen      boost::bind(&EventLoop::handleRead, this));
5965c497a3SShuo Chen  // we are always reading the wakeupfd
6065c497a3SShuo Chen  wakeupChannel_->enableReading();
6165c497a3SShuo Chen}
6265c497a3SShuo Chen
6365c497a3SShuo ChenEventLoop::~EventLoop()
6465c497a3SShuo Chen{
6565c497a3SShuo Chen  assert(!looping_);
6665c497a3SShuo Chen  ::close(wakeupFd_);
6765c497a3SShuo Chen  t_loopInThisThread = NULL;
6865c497a3SShuo Chen}
6965c497a3SShuo Chen
7065c497a3SShuo Chenvoid EventLoop::loop()
7165c497a3SShuo Chen{
7265c497a3SShuo Chen  assert(!looping_);
7365c497a3SShuo Chen  assertInLoopThread();
7465c497a3SShuo Chen  looping_ = true;
7565c497a3SShuo Chen  quit_ = false;
7665c497a3SShuo Chen
7765c497a3SShuo Chen  while (!quit_)
7865c497a3SShuo Chen  {
7965c497a3SShuo Chen    activeChannels_.clear();
8065c497a3SShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
8165c497a3SShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
8265c497a3SShuo Chen        it != activeChannels_.end(); ++it)
8365c497a3SShuo Chen    {
8465c497a3SShuo Chen      (*it)->handleEvent();
8565c497a3SShuo Chen    }
8665c497a3SShuo Chen    doPendingFunctors();
8765c497a3SShuo Chen  }
8865c497a3SShuo Chen
8965c497a3SShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
9065c497a3SShuo Chen  looping_ = false;
9165c497a3SShuo Chen}
9265c497a3SShuo Chen
9365c497a3SShuo Chenvoid EventLoop::quit()
9465c497a3SShuo Chen{
9565c497a3SShuo Chen  quit_ = true;
9665c497a3SShuo Chen  if (!isInLoopThread())
9765c497a3SShuo Chen  {
9865c497a3SShuo Chen    wakeup();
9965c497a3SShuo Chen  }
10065c497a3SShuo Chen}
10165c497a3SShuo Chen
10265c497a3SShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
10365c497a3SShuo Chen{
10465c497a3SShuo Chen  if (isInLoopThread())
10565c497a3SShuo Chen  {
10665c497a3SShuo Chen    cb();
10765c497a3SShuo Chen  }
10865c497a3SShuo Chen  else
10965c497a3SShuo Chen  {
11065c497a3SShuo Chen    queueInLoop(cb);
11165c497a3SShuo Chen    wakeup();
11265c497a3SShuo Chen  }
11365c497a3SShuo Chen}
11465c497a3SShuo Chen
11565c497a3SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
11665c497a3SShuo Chen{
11765c497a3SShuo Chen  {
11865c497a3SShuo Chen  MutexLockGuard lock(mutex_);
11965c497a3SShuo Chen  pendingFunctors_.push_back(cb);
12065c497a3SShuo Chen  }
12165c497a3SShuo Chen
12265c497a3SShuo Chen  if (isInLoopThread() && callingPendingFunctors_)
12365c497a3SShuo Chen  {
12465c497a3SShuo Chen    wakeup();
12565c497a3SShuo Chen  }
12665c497a3SShuo Chen}
12765c497a3SShuo Chen
12865c497a3SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
12965c497a3SShuo Chen{
13065c497a3SShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
13165c497a3SShuo Chen}
13265c497a3SShuo Chen
13365c497a3SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
13465c497a3SShuo Chen{
13565c497a3SShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
13665c497a3SShuo Chen  return runAt(time, cb);
13765c497a3SShuo Chen}
13865c497a3SShuo Chen
13965c497a3SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
14065c497a3SShuo Chen{
14165c497a3SShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
14265c497a3SShuo Chen  return timerQueue_->addTimer(cb, time, interval);
14365c497a3SShuo Chen}
14465c497a3SShuo Chen
14565c497a3SShuo Chenvoid EventLoop::updateChannel(Channel* channel)
14665c497a3SShuo Chen{
14765c497a3SShuo Chen  assert(channel->ownerLoop() == this);
14865c497a3SShuo Chen  assertInLoopThread();
14965c497a3SShuo Chen  poller_->updateChannel(channel);
15065c497a3SShuo Chen}
15165c497a3SShuo Chen
15265c497a3SShuo Chenvoid EventLoop::removeChannel(Channel* channel)
15365c497a3SShuo Chen{
15465c497a3SShuo Chen  assert(channel->ownerLoop() == this);
15565c497a3SShuo Chen  assertInLoopThread();
15665c497a3SShuo Chen  poller_->removeChannel(channel);
15765c497a3SShuo Chen}
15865c497a3SShuo Chen
15965c497a3SShuo Chenvoid EventLoop::abortNotInLoopThread()
16065c497a3SShuo Chen{
16165c497a3SShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
16265c497a3SShuo Chen            << " was created in threadId_ = " << threadId_
16365c497a3SShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
16465c497a3SShuo Chen}
16565c497a3SShuo Chen
16665c497a3SShuo Chenvoid EventLoop::wakeup()
16765c497a3SShuo Chen{
16865c497a3SShuo Chen  uint64_t one = 1;
16965c497a3SShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
17065c497a3SShuo Chen  if (n != sizeof one)
17165c497a3SShuo Chen  {
17265c497a3SShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
17365c497a3SShuo Chen  }
17465c497a3SShuo Chen}
17565c497a3SShuo Chen
17665c497a3SShuo Chenvoid EventLoop::handleRead()
17765c497a3SShuo Chen{
17865c497a3SShuo Chen  uint64_t one = 1;
17965c497a3SShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
18065c497a3SShuo Chen  if (n != sizeof one)
18165c497a3SShuo Chen  {
18265c497a3SShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
18365c497a3SShuo Chen  }
18465c497a3SShuo Chen}
18565c497a3SShuo Chen
18665c497a3SShuo Chenvoid EventLoop::doPendingFunctors()
18765c497a3SShuo Chen{
18865c497a3SShuo Chen  std::vector<Functor> functors;
18965c497a3SShuo Chen  callingPendingFunctors_ = true;
19065c497a3SShuo Chen
19165c497a3SShuo Chen  {
19265c497a3SShuo Chen  MutexLockGuard lock(mutex_);
19365c497a3SShuo Chen  functors.swap(pendingFunctors_);
19465c497a3SShuo Chen  }
19565c497a3SShuo Chen
19665c497a3SShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
19765c497a3SShuo Chen  {
19865c497a3SShuo Chen    functors[i]();
19965c497a3SShuo Chen  }
20065c497a3SShuo Chen  callingPendingFunctors_ = false;
20165c497a3SShuo Chen}
20265c497a3SShuo Chen
203