EventLoop.cc revision 0f776063
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
72745a763SShuo Chen
82745a763SShuo Chen#include "EventLoop.h"
92745a763SShuo Chen
102745a763SShuo Chen#include "Channel.h"
112745a763SShuo Chen#include "Poller.h"
122745a763SShuo Chen#include "TimerQueue.h"
132745a763SShuo Chen
142745a763SShuo Chen#include "logging/Logging.h"
152745a763SShuo Chen
162745a763SShuo Chen#include <boost/bind.hpp>
172745a763SShuo Chen
182745a763SShuo Chen#include <assert.h>
192745a763SShuo Chen#include <sys/eventfd.h>
202745a763SShuo Chen
212745a763SShuo Chenusing namespace muduo;
222745a763SShuo Chen
232745a763SShuo Chen__thread EventLoop* t_loopInThisThread = 0;
242745a763SShuo Chenconst int kPollTimeMs = 10000;
252745a763SShuo Chen
262745a763SShuo Chenstatic int createEventfd()
272745a763SShuo Chen{
282745a763SShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
292745a763SShuo Chen  if (evtfd < 0)
302745a763SShuo Chen  {
312745a763SShuo Chen    LOG_SYSERR << "Failed in eventfd";
322745a763SShuo Chen    abort();
332745a763SShuo Chen  }
342745a763SShuo Chen  return evtfd;
352745a763SShuo Chen}
362745a763SShuo Chen
372745a763SShuo ChenEventLoop::EventLoop()
382745a763SShuo Chen  : looping_(false),
392745a763SShuo Chen    quit_(false),
402745a763SShuo Chen    callingPendingFunctors_(false),
412745a763SShuo Chen    threadId_(CurrentThread::tid()),
422745a763SShuo Chen    poller_(new Poller(this)),
432745a763SShuo Chen    timerQueue_(new TimerQueue(this)),
442745a763SShuo Chen    wakeupFd_(createEventfd()),
452745a763SShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
462745a763SShuo Chen{
472745a763SShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
482745a763SShuo Chen  if (t_loopInThisThread)
492745a763SShuo Chen  {
502745a763SShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
512745a763SShuo Chen              << " exists in this thread " << threadId_;
522745a763SShuo Chen  }
532745a763SShuo Chen  else
542745a763SShuo Chen  {
552745a763SShuo Chen    t_loopInThisThread = this;
562745a763SShuo Chen  }
572745a763SShuo Chen  wakeupChannel_->setReadCallback(
582745a763SShuo Chen      boost::bind(&EventLoop::handleRead, this));
592745a763SShuo Chen  // we are always reading the wakeupfd
602745a763SShuo Chen  wakeupChannel_->enableReading();
612745a763SShuo Chen}
622745a763SShuo Chen
632745a763SShuo ChenEventLoop::~EventLoop()
642745a763SShuo Chen{
652745a763SShuo Chen  assert(!looping_);
662745a763SShuo Chen  ::close(wakeupFd_);
672745a763SShuo Chen  t_loopInThisThread = NULL;
682745a763SShuo Chen}
692745a763SShuo Chen
702745a763SShuo Chenvoid EventLoop::loop()
712745a763SShuo Chen{
722745a763SShuo Chen  assert(!looping_);
732745a763SShuo Chen  assertInLoopThread();
742745a763SShuo Chen  looping_ = true;
752745a763SShuo Chen  quit_ = false;
762745a763SShuo Chen
772745a763SShuo Chen  while (!quit_)
782745a763SShuo Chen  {
792745a763SShuo Chen    activeChannels_.clear();
802745a763SShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
812745a763SShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
822745a763SShuo Chen        it != activeChannels_.end(); ++it)
832745a763SShuo Chen    {
842745a763SShuo Chen      (*it)->handleEvent();
852745a763SShuo Chen    }
862745a763SShuo Chen    doPendingFunctors();
872745a763SShuo Chen  }
882745a763SShuo Chen
892745a763SShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
902745a763SShuo Chen  looping_ = false;
912745a763SShuo Chen}
922745a763SShuo Chen
932745a763SShuo Chenvoid EventLoop::quit()
942745a763SShuo Chen{
952745a763SShuo Chen  quit_ = true;
962745a763SShuo Chen  if (!isInLoopThread())
972745a763SShuo Chen  {
982745a763SShuo Chen    wakeup();
992745a763SShuo Chen  }
1002745a763SShuo Chen}
1012745a763SShuo Chen
1022745a763SShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
1032745a763SShuo Chen{
1042745a763SShuo Chen  if (isInLoopThread())
1052745a763SShuo Chen  {
1062745a763SShuo Chen    cb();
1072745a763SShuo Chen  }
1082745a763SShuo Chen  else
1092745a763SShuo Chen  {
1102745a763SShuo Chen    queueInLoop(cb);
1112745a763SShuo Chen  }
1122745a763SShuo Chen}
1132745a763SShuo Chen
1142745a763SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
1152745a763SShuo Chen{
1162745a763SShuo Chen  {
1172745a763SShuo Chen  MutexLockGuard lock(mutex_);
1182745a763SShuo Chen  pendingFunctors_.push_back(cb);
1192745a763SShuo Chen  }
1202745a763SShuo Chen
1210f776063SShuo Chen  if (!isInLoopThread() || callingPendingFunctors_)
1222745a763SShuo Chen  {
1232745a763SShuo Chen    wakeup();
1242745a763SShuo Chen  }
1252745a763SShuo Chen}
1262745a763SShuo Chen
1272745a763SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
1282745a763SShuo Chen{
1292745a763SShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
1302745a763SShuo Chen}
1312745a763SShuo Chen
1322745a763SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
1332745a763SShuo Chen{
1342745a763SShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
1352745a763SShuo Chen  return runAt(time, cb);
1362745a763SShuo Chen}
1372745a763SShuo Chen
1382745a763SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
1392745a763SShuo Chen{
1402745a763SShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
1412745a763SShuo Chen  return timerQueue_->addTimer(cb, time, interval);
1422745a763SShuo Chen}
1432745a763SShuo Chen
1442745a763SShuo Chenvoid EventLoop::updateChannel(Channel* channel)
1452745a763SShuo Chen{
1462745a763SShuo Chen  assert(channel->ownerLoop() == this);
1472745a763SShuo Chen  assertInLoopThread();
1482745a763SShuo Chen  poller_->updateChannel(channel);
1492745a763SShuo Chen}
1502745a763SShuo Chen
1512745a763SShuo Chenvoid EventLoop::abortNotInLoopThread()
1522745a763SShuo Chen{
1532745a763SShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
1542745a763SShuo Chen            << " was created in threadId_ = " << threadId_
1552745a763SShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
1562745a763SShuo Chen}
1572745a763SShuo Chen
1582745a763SShuo Chenvoid EventLoop::wakeup()
1592745a763SShuo Chen{
1602745a763SShuo Chen  uint64_t one = 1;
1612745a763SShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
1622745a763SShuo Chen  if (n != sizeof one)
1632745a763SShuo Chen  {
1642745a763SShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
1652745a763SShuo Chen  }
1662745a763SShuo Chen}
1672745a763SShuo Chen
1682745a763SShuo Chenvoid EventLoop::handleRead()
1692745a763SShuo Chen{
1702745a763SShuo Chen  uint64_t one = 1;
1712745a763SShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
1722745a763SShuo Chen  if (n != sizeof one)
1732745a763SShuo Chen  {
1742745a763SShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
1752745a763SShuo Chen  }
1762745a763SShuo Chen}
1772745a763SShuo Chen
1782745a763SShuo Chenvoid EventLoop::doPendingFunctors()
1792745a763SShuo Chen{
1802745a763SShuo Chen  std::vector<Functor> functors;
1812745a763SShuo Chen  callingPendingFunctors_ = true;
1822745a763SShuo Chen
1832745a763SShuo Chen  {
1842745a763SShuo Chen  MutexLockGuard lock(mutex_);
1852745a763SShuo Chen  functors.swap(pendingFunctors_);
1862745a763SShuo Chen  }
1872745a763SShuo Chen
1882745a763SShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
1892745a763SShuo Chen  {
1902745a763SShuo Chen    functors[i]();
1912745a763SShuo Chen  }
1922745a763SShuo Chen  callingPendingFunctors_ = false;
1932745a763SShuo Chen}
1942745a763SShuo Chen
195