EventLoop.cc revision 9a1e991d
19a1e991dSShuo Chen// excerpts from http://code.google.com/p/muduo/
29a1e991dSShuo Chen//
39a1e991dSShuo Chen// Use of this source code is governed by a BSD-style license
49a1e991dSShuo Chen// that can be found in the License file.
59a1e991dSShuo Chen//
69a1e991dSShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
79a1e991dSShuo Chen
89a1e991dSShuo Chen#include "EventLoop.h"
99a1e991dSShuo Chen
109a1e991dSShuo Chen#include "Channel.h"
119a1e991dSShuo Chen#include "Poller.h"
129a1e991dSShuo Chen#include "TimerQueue.h"
139a1e991dSShuo Chen
149a1e991dSShuo Chen#include "logging/Logging.h"
159a1e991dSShuo Chen
169a1e991dSShuo Chen#include <boost/bind.hpp>
179a1e991dSShuo Chen
189a1e991dSShuo Chen#include <assert.h>
199a1e991dSShuo Chen#include <sys/eventfd.h>
209a1e991dSShuo Chen
219a1e991dSShuo Chenusing namespace muduo;
229a1e991dSShuo Chen
239a1e991dSShuo Chen__thread EventLoop* t_loopInThisThread = 0;
249a1e991dSShuo Chenconst int kPollTimeMs = 10000;
259a1e991dSShuo Chen
269a1e991dSShuo Chenstatic int createEventfd()
279a1e991dSShuo Chen{
289a1e991dSShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
299a1e991dSShuo Chen  if (evtfd < 0)
309a1e991dSShuo Chen  {
319a1e991dSShuo Chen    LOG_SYSERR << "Failed in eventfd";
329a1e991dSShuo Chen    abort();
339a1e991dSShuo Chen  }
349a1e991dSShuo Chen  return evtfd;
359a1e991dSShuo Chen}
369a1e991dSShuo Chen
379a1e991dSShuo ChenEventLoop::EventLoop()
389a1e991dSShuo Chen  : looping_(false),
399a1e991dSShuo Chen    quit_(false),
409a1e991dSShuo Chen    callingPendingFunctors_(false),
419a1e991dSShuo Chen    threadId_(CurrentThread::tid()),
429a1e991dSShuo Chen    poller_(new Poller(this)),
439a1e991dSShuo Chen    timerQueue_(new TimerQueue(this)),
449a1e991dSShuo Chen    wakeupFd_(createEventfd()),
459a1e991dSShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
469a1e991dSShuo Chen{
479a1e991dSShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
489a1e991dSShuo Chen  if (t_loopInThisThread)
499a1e991dSShuo Chen  {
509a1e991dSShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
519a1e991dSShuo Chen              << " exists in this thread " << threadId_;
529a1e991dSShuo Chen  }
539a1e991dSShuo Chen  else
549a1e991dSShuo Chen  {
559a1e991dSShuo Chen    t_loopInThisThread = this;
569a1e991dSShuo Chen  }
579a1e991dSShuo Chen  wakeupChannel_->setReadCallback(
589a1e991dSShuo Chen      boost::bind(&EventLoop::handleRead, this));
599a1e991dSShuo Chen  // we are always reading the wakeupfd
609a1e991dSShuo Chen  wakeupChannel_->enableReading();
619a1e991dSShuo Chen}
629a1e991dSShuo Chen
639a1e991dSShuo ChenEventLoop::~EventLoop()
649a1e991dSShuo Chen{
659a1e991dSShuo Chen  assert(!looping_);
669a1e991dSShuo Chen  ::close(wakeupFd_);
679a1e991dSShuo Chen  t_loopInThisThread = NULL;
689a1e991dSShuo Chen}
699a1e991dSShuo Chen
709a1e991dSShuo Chenvoid EventLoop::loop()
719a1e991dSShuo Chen{
729a1e991dSShuo Chen  assert(!looping_);
739a1e991dSShuo Chen  assertInLoopThread();
749a1e991dSShuo Chen  looping_ = true;
759a1e991dSShuo Chen  quit_ = false;
769a1e991dSShuo Chen
779a1e991dSShuo Chen  while (!quit_)
789a1e991dSShuo Chen  {
799a1e991dSShuo Chen    activeChannels_.clear();
809a1e991dSShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
819a1e991dSShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
829a1e991dSShuo Chen        it != activeChannels_.end(); ++it)
839a1e991dSShuo Chen    {
849a1e991dSShuo Chen      (*it)->handleEvent();
859a1e991dSShuo Chen    }
869a1e991dSShuo Chen    doPendingFunctors();
879a1e991dSShuo Chen  }
889a1e991dSShuo Chen
899a1e991dSShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
909a1e991dSShuo Chen  looping_ = false;
919a1e991dSShuo Chen}
929a1e991dSShuo Chen
939a1e991dSShuo Chenvoid EventLoop::quit()
949a1e991dSShuo Chen{
959a1e991dSShuo Chen  quit_ = true;
969a1e991dSShuo Chen  if (!isInLoopThread())
979a1e991dSShuo Chen  {
989a1e991dSShuo Chen    wakeup();
999a1e991dSShuo Chen  }
1009a1e991dSShuo Chen}
1019a1e991dSShuo Chen
1029a1e991dSShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
1039a1e991dSShuo Chen{
1049a1e991dSShuo Chen  if (isInLoopThread())
1059a1e991dSShuo Chen  {
1069a1e991dSShuo Chen    cb();
1079a1e991dSShuo Chen  }
1089a1e991dSShuo Chen  else
1099a1e991dSShuo Chen  {
1109a1e991dSShuo Chen    queueInLoop(cb);
1119a1e991dSShuo Chen    wakeup();
1129a1e991dSShuo Chen  }
1139a1e991dSShuo Chen}
1149a1e991dSShuo Chen
1159a1e991dSShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
1169a1e991dSShuo Chen{
1179a1e991dSShuo Chen  {
1189a1e991dSShuo Chen  MutexLockGuard lock(mutex_);
1199a1e991dSShuo Chen  pendingFunctors_.push_back(cb);
1209a1e991dSShuo Chen  }
1219a1e991dSShuo Chen
1229a1e991dSShuo Chen  if (isInLoopThread() && callingPendingFunctors_)
1239a1e991dSShuo Chen  {
1249a1e991dSShuo Chen    wakeup();
1259a1e991dSShuo Chen  }
1269a1e991dSShuo Chen}
1279a1e991dSShuo Chen
1289a1e991dSShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
1299a1e991dSShuo Chen{
1309a1e991dSShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
1319a1e991dSShuo Chen}
1329a1e991dSShuo Chen
1339a1e991dSShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
1349a1e991dSShuo Chen{
1359a1e991dSShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
1369a1e991dSShuo Chen  return runAt(time, cb);
1379a1e991dSShuo Chen}
1389a1e991dSShuo Chen
1399a1e991dSShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
1409a1e991dSShuo Chen{
1419a1e991dSShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
1429a1e991dSShuo Chen  return timerQueue_->addTimer(cb, time, interval);
1439a1e991dSShuo Chen}
1449a1e991dSShuo Chen
1459a1e991dSShuo Chenvoid EventLoop::updateChannel(Channel* channel)
1469a1e991dSShuo Chen{
1479a1e991dSShuo Chen  assert(channel->ownerLoop() == this);
1489a1e991dSShuo Chen  assertInLoopThread();
1499a1e991dSShuo Chen  poller_->updateChannel(channel);
1509a1e991dSShuo Chen}
1519a1e991dSShuo Chen
1529a1e991dSShuo Chenvoid EventLoop::abortNotInLoopThread()
1539a1e991dSShuo Chen{
1549a1e991dSShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
1559a1e991dSShuo Chen            << " was created in threadId_ = " << threadId_
1569a1e991dSShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
1579a1e991dSShuo Chen}
1589a1e991dSShuo Chen
1599a1e991dSShuo Chenvoid EventLoop::wakeup()
1609a1e991dSShuo Chen{
1619a1e991dSShuo Chen  uint64_t one = 1;
1629a1e991dSShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
1639a1e991dSShuo Chen  if (n != sizeof one)
1649a1e991dSShuo Chen  {
1659a1e991dSShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
1669a1e991dSShuo Chen  }
1679a1e991dSShuo Chen}
1689a1e991dSShuo Chen
1699a1e991dSShuo Chenvoid EventLoop::handleRead()
1709a1e991dSShuo Chen{
1719a1e991dSShuo Chen  uint64_t one = 1;
1729a1e991dSShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
1739a1e991dSShuo Chen  if (n != sizeof one)
1749a1e991dSShuo Chen  {
1759a1e991dSShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
1769a1e991dSShuo Chen  }
1779a1e991dSShuo Chen}
1789a1e991dSShuo Chen
1799a1e991dSShuo Chenvoid EventLoop::doPendingFunctors()
1809a1e991dSShuo Chen{
1819a1e991dSShuo Chen  std::vector<Functor> functors;
1829a1e991dSShuo Chen  callingPendingFunctors_ = true;
1839a1e991dSShuo Chen
1849a1e991dSShuo Chen  {
1859a1e991dSShuo Chen  MutexLockGuard lock(mutex_);
1869a1e991dSShuo Chen  functors.swap(pendingFunctors_);
1879a1e991dSShuo Chen  }
1889a1e991dSShuo Chen
1899a1e991dSShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
1909a1e991dSShuo Chen  {
1919a1e991dSShuo Chen    functors[i]();
1929a1e991dSShuo Chen  }
1939a1e991dSShuo Chen  callingPendingFunctors_ = false;
1949a1e991dSShuo Chen}
1959a1e991dSShuo Chen
196