EventLoop.cc revision 0f776063
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
72a18e699SShuo Chen
82a18e699SShuo Chen#include "EventLoop.h"
92a18e699SShuo Chen
102a18e699SShuo Chen#include "Channel.h"
112a18e699SShuo Chen#include "Poller.h"
122a18e699SShuo Chen#include "TimerQueue.h"
132a18e699SShuo Chen
142a18e699SShuo Chen#include "logging/Logging.h"
152a18e699SShuo Chen
162a18e699SShuo Chen#include <boost/bind.hpp>
172a18e699SShuo Chen
182a18e699SShuo Chen#include <assert.h>
192a18e699SShuo Chen#include <sys/eventfd.h>
202a18e699SShuo Chen
212a18e699SShuo Chenusing namespace muduo;
222a18e699SShuo Chen
232a18e699SShuo Chen__thread EventLoop* t_loopInThisThread = 0;
242a18e699SShuo Chenconst int kPollTimeMs = 10000;
252a18e699SShuo Chen
262a18e699SShuo Chenstatic int createEventfd()
272a18e699SShuo Chen{
282a18e699SShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
292a18e699SShuo Chen  if (evtfd < 0)
302a18e699SShuo Chen  {
312a18e699SShuo Chen    LOG_SYSERR << "Failed in eventfd";
322a18e699SShuo Chen    abort();
332a18e699SShuo Chen  }
342a18e699SShuo Chen  return evtfd;
352a18e699SShuo Chen}
362a18e699SShuo Chen
372a18e699SShuo ChenEventLoop::EventLoop()
382a18e699SShuo Chen  : looping_(false),
392a18e699SShuo Chen    quit_(false),
402a18e699SShuo Chen    callingPendingFunctors_(false),
412a18e699SShuo Chen    threadId_(CurrentThread::tid()),
422a18e699SShuo Chen    poller_(new Poller(this)),
432a18e699SShuo Chen    timerQueue_(new TimerQueue(this)),
442a18e699SShuo Chen    wakeupFd_(createEventfd()),
452a18e699SShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
462a18e699SShuo Chen{
472a18e699SShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
482a18e699SShuo Chen  if (t_loopInThisThread)
492a18e699SShuo Chen  {
502a18e699SShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
512a18e699SShuo Chen              << " exists in this thread " << threadId_;
522a18e699SShuo Chen  }
532a18e699SShuo Chen  else
542a18e699SShuo Chen  {
552a18e699SShuo Chen    t_loopInThisThread = this;
562a18e699SShuo Chen  }
572a18e699SShuo Chen  wakeupChannel_->setReadCallback(
582a18e699SShuo Chen      boost::bind(&EventLoop::handleRead, this));
592a18e699SShuo Chen  // we are always reading the wakeupfd
602a18e699SShuo Chen  wakeupChannel_->enableReading();
612a18e699SShuo Chen}
622a18e699SShuo Chen
632a18e699SShuo ChenEventLoop::~EventLoop()
642a18e699SShuo Chen{
652a18e699SShuo Chen  assert(!looping_);
662a18e699SShuo Chen  ::close(wakeupFd_);
672a18e699SShuo Chen  t_loopInThisThread = NULL;
682a18e699SShuo Chen}
692a18e699SShuo Chen
702a18e699SShuo Chenvoid EventLoop::loop()
712a18e699SShuo Chen{
722a18e699SShuo Chen  assert(!looping_);
732a18e699SShuo Chen  assertInLoopThread();
742a18e699SShuo Chen  looping_ = true;
752a18e699SShuo Chen  quit_ = false;
762a18e699SShuo Chen
772a18e699SShuo Chen  while (!quit_)
782a18e699SShuo Chen  {
792a18e699SShuo Chen    activeChannels_.clear();
802a18e699SShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
812a18e699SShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
822a18e699SShuo Chen        it != activeChannels_.end(); ++it)
832a18e699SShuo Chen    {
842a18e699SShuo Chen      (*it)->handleEvent(pollReturnTime_);
852a18e699SShuo Chen    }
862a18e699SShuo Chen    doPendingFunctors();
872a18e699SShuo Chen  }
882a18e699SShuo Chen
892a18e699SShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
902a18e699SShuo Chen  looping_ = false;
912a18e699SShuo Chen}
922a18e699SShuo Chen
932a18e699SShuo Chenvoid EventLoop::quit()
942a18e699SShuo Chen{
952a18e699SShuo Chen  quit_ = true;
962a18e699SShuo Chen  if (!isInLoopThread())
972a18e699SShuo Chen  {
982a18e699SShuo Chen    wakeup();
992a18e699SShuo Chen  }
1002a18e699SShuo Chen}
1012a18e699SShuo Chen
1022a18e699SShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
1032a18e699SShuo Chen{
1042a18e699SShuo Chen  if (isInLoopThread())
1052a18e699SShuo Chen  {
1062a18e699SShuo Chen    cb();
1072a18e699SShuo Chen  }
1082a18e699SShuo Chen  else
1092a18e699SShuo Chen  {
1102a18e699SShuo Chen    queueInLoop(cb);
1112a18e699SShuo Chen  }
1122a18e699SShuo Chen}
1132a18e699SShuo Chen
1142a18e699SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
1152a18e699SShuo Chen{
1162a18e699SShuo Chen  {
1172a18e699SShuo Chen  MutexLockGuard lock(mutex_);
1182a18e699SShuo Chen  pendingFunctors_.push_back(cb);
1192a18e699SShuo Chen  }
1202a18e699SShuo Chen
1210f776063SShuo Chen  if (!isInLoopThread() || callingPendingFunctors_)
1222a18e699SShuo Chen  {
1232a18e699SShuo Chen    wakeup();
1242a18e699SShuo Chen  }
1252a18e699SShuo Chen}
1262a18e699SShuo Chen
1272a18e699SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
1282a18e699SShuo Chen{
1292a18e699SShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
1302a18e699SShuo Chen}
1312a18e699SShuo Chen
1322a18e699SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
1332a18e699SShuo Chen{
1342a18e699SShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
1352a18e699SShuo Chen  return runAt(time, cb);
1362a18e699SShuo Chen}
1372a18e699SShuo Chen
1382a18e699SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
1392a18e699SShuo Chen{
1402a18e699SShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
1412a18e699SShuo Chen  return timerQueue_->addTimer(cb, time, interval);
1422a18e699SShuo Chen}
1432a18e699SShuo Chen
1442a18e699SShuo Chenvoid EventLoop::updateChannel(Channel* channel)
1452a18e699SShuo Chen{
1462a18e699SShuo Chen  assert(channel->ownerLoop() == this);
1472a18e699SShuo Chen  assertInLoopThread();
1482a18e699SShuo Chen  poller_->updateChannel(channel);
1492a18e699SShuo Chen}
1502a18e699SShuo Chen
1512a18e699SShuo Chenvoid EventLoop::removeChannel(Channel* channel)
1522a18e699SShuo Chen{
1532a18e699SShuo Chen  assert(channel->ownerLoop() == this);
1542a18e699SShuo Chen  assertInLoopThread();
1552a18e699SShuo Chen  poller_->removeChannel(channel);
1562a18e699SShuo Chen}
1572a18e699SShuo Chen
1582a18e699SShuo Chenvoid EventLoop::abortNotInLoopThread()
1592a18e699SShuo Chen{
1602a18e699SShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
1612a18e699SShuo Chen            << " was created in threadId_ = " << threadId_
1622a18e699SShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
1632a18e699SShuo Chen}
1642a18e699SShuo Chen
1652a18e699SShuo Chenvoid EventLoop::wakeup()
1662a18e699SShuo Chen{
1672a18e699SShuo Chen  uint64_t one = 1;
1682a18e699SShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
1692a18e699SShuo Chen  if (n != sizeof one)
1702a18e699SShuo Chen  {
1712a18e699SShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
1722a18e699SShuo Chen  }
1732a18e699SShuo Chen}
1742a18e699SShuo Chen
1752a18e699SShuo Chenvoid EventLoop::handleRead()
1762a18e699SShuo Chen{
1772a18e699SShuo Chen  uint64_t one = 1;
1782a18e699SShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
1792a18e699SShuo Chen  if (n != sizeof one)
1802a18e699SShuo Chen  {
1812a18e699SShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
1822a18e699SShuo Chen  }
1832a18e699SShuo Chen}
1842a18e699SShuo Chen
1852a18e699SShuo Chenvoid EventLoop::doPendingFunctors()
1862a18e699SShuo Chen{
1872a18e699SShuo Chen  std::vector<Functor> functors;
1882a18e699SShuo Chen  callingPendingFunctors_ = true;
1892a18e699SShuo Chen
1902a18e699SShuo Chen  {
1912a18e699SShuo Chen  MutexLockGuard lock(mutex_);
1922a18e699SShuo Chen  functors.swap(pendingFunctors_);
1932a18e699SShuo Chen  }
1942a18e699SShuo Chen
1952a18e699SShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
1962a18e699SShuo Chen  {
1972a18e699SShuo Chen    functors[i]();
1982a18e699SShuo Chen  }
1992a18e699SShuo Chen  callingPendingFunctors_ = false;
2002a18e699SShuo Chen}
2012a18e699SShuo Chen
202