EventLoop.cc revision 40161064
140161064SShuo Chen// excerpts from http://code.google.com/p/muduo/
240161064SShuo Chen//
340161064SShuo Chen// Use of this source code is governed by a BSD-style license
440161064SShuo Chen// that can be found in the License file.
540161064SShuo Chen//
640161064SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
740161064SShuo Chen
840161064SShuo Chen#include "EventLoop.h"
940161064SShuo Chen
1040161064SShuo Chen#include "Channel.h"
1140161064SShuo Chen#include "Poller.h"
1240161064SShuo Chen#include "TimerQueue.h"
1340161064SShuo Chen
1440161064SShuo Chen#include "logging/Logging.h"
1540161064SShuo Chen
1640161064SShuo Chen#include <boost/bind.hpp>
1740161064SShuo Chen
1840161064SShuo Chen#include <assert.h>
1940161064SShuo Chen#include <signal.h>
2040161064SShuo Chen#include <sys/eventfd.h>
2140161064SShuo Chen
2240161064SShuo Chenusing namespace muduo;
2340161064SShuo Chen
2440161064SShuo Chen__thread EventLoop* t_loopInThisThread = 0;
2540161064SShuo Chenconst int kPollTimeMs = 10000;
2640161064SShuo Chen
2740161064SShuo Chenstatic int createEventfd()
2840161064SShuo Chen{
2940161064SShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
3040161064SShuo Chen  if (evtfd < 0)
3140161064SShuo Chen  {
3240161064SShuo Chen    LOG_SYSERR << "Failed in eventfd";
3340161064SShuo Chen    abort();
3440161064SShuo Chen  }
3540161064SShuo Chen  return evtfd;
3640161064SShuo Chen}
3740161064SShuo Chen
3840161064SShuo Chenclass IgnoreSigPipe
3940161064SShuo Chen{
4040161064SShuo Chen public:
4140161064SShuo Chen  IgnoreSigPipe()
4240161064SShuo Chen  {
4340161064SShuo Chen    ::signal(SIGPIPE, SIG_IGN);
4440161064SShuo Chen  }
4540161064SShuo Chen};
4640161064SShuo Chen
4740161064SShuo ChenIgnoreSigPipe initObj;
4840161064SShuo Chen
4940161064SShuo ChenEventLoop::EventLoop()
5040161064SShuo Chen  : looping_(false),
5140161064SShuo Chen    quit_(false),
5240161064SShuo Chen    callingPendingFunctors_(false),
5340161064SShuo Chen    threadId_(CurrentThread::tid()),
5440161064SShuo Chen    poller_(new Poller(this)),
5540161064SShuo Chen    timerQueue_(new TimerQueue(this)),
5640161064SShuo Chen    wakeupFd_(createEventfd()),
5740161064SShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
5840161064SShuo Chen{
5940161064SShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
6040161064SShuo Chen  if (t_loopInThisThread)
6140161064SShuo Chen  {
6240161064SShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
6340161064SShuo Chen              << " exists in this thread " << threadId_;
6440161064SShuo Chen  }
6540161064SShuo Chen  else
6640161064SShuo Chen  {
6740161064SShuo Chen    t_loopInThisThread = this;
6840161064SShuo Chen  }
6940161064SShuo Chen  wakeupChannel_->setReadCallback(
7040161064SShuo Chen      boost::bind(&EventLoop::handleRead, this));
7140161064SShuo Chen  // we are always reading the wakeupfd
7240161064SShuo Chen  wakeupChannel_->enableReading();
7340161064SShuo Chen}
7440161064SShuo Chen
7540161064SShuo ChenEventLoop::~EventLoop()
7640161064SShuo Chen{
7740161064SShuo Chen  assert(!looping_);
7840161064SShuo Chen  ::close(wakeupFd_);
7940161064SShuo Chen  t_loopInThisThread = NULL;
8040161064SShuo Chen}
8140161064SShuo Chen
8240161064SShuo Chenvoid EventLoop::loop()
8340161064SShuo Chen{
8440161064SShuo Chen  assert(!looping_);
8540161064SShuo Chen  assertInLoopThread();
8640161064SShuo Chen  looping_ = true;
8740161064SShuo Chen  quit_ = false;
8840161064SShuo Chen
8940161064SShuo Chen  while (!quit_)
9040161064SShuo Chen  {
9140161064SShuo Chen    activeChannels_.clear();
9240161064SShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
9340161064SShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
9440161064SShuo Chen        it != activeChannels_.end(); ++it)
9540161064SShuo Chen    {
9640161064SShuo Chen      (*it)->handleEvent(pollReturnTime_);
9740161064SShuo Chen    }
9840161064SShuo Chen    doPendingFunctors();
9940161064SShuo Chen  }
10040161064SShuo Chen
10140161064SShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
10240161064SShuo Chen  looping_ = false;
10340161064SShuo Chen}
10440161064SShuo Chen
10540161064SShuo Chenvoid EventLoop::quit()
10640161064SShuo Chen{
10740161064SShuo Chen  quit_ = true;
10840161064SShuo Chen  if (!isInLoopThread())
10940161064SShuo Chen  {
11040161064SShuo Chen    wakeup();
11140161064SShuo Chen  }
11240161064SShuo Chen}
11340161064SShuo Chen
11440161064SShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
11540161064SShuo Chen{
11640161064SShuo Chen  if (isInLoopThread())
11740161064SShuo Chen  {
11840161064SShuo Chen    cb();
11940161064SShuo Chen  }
12040161064SShuo Chen  else
12140161064SShuo Chen  {
12240161064SShuo Chen    queueInLoop(cb);
12340161064SShuo Chen  }
12440161064SShuo Chen}
12540161064SShuo Chen
12640161064SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
12740161064SShuo Chen{
12840161064SShuo Chen  {
12940161064SShuo Chen  MutexLockGuard lock(mutex_);
13040161064SShuo Chen  pendingFunctors_.push_back(cb);
13140161064SShuo Chen  }
13240161064SShuo Chen
13340161064SShuo Chen  if (!isInLoopThread() || callingPendingFunctors_)
13440161064SShuo Chen  {
13540161064SShuo Chen    wakeup();
13640161064SShuo Chen  }
13740161064SShuo Chen}
13840161064SShuo Chen
13940161064SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
14040161064SShuo Chen{
14140161064SShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
14240161064SShuo Chen}
14340161064SShuo Chen
14440161064SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
14540161064SShuo Chen{
14640161064SShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
14740161064SShuo Chen  return runAt(time, cb);
14840161064SShuo Chen}
14940161064SShuo Chen
15040161064SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
15140161064SShuo Chen{
15240161064SShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
15340161064SShuo Chen  return timerQueue_->addTimer(cb, time, interval);
15440161064SShuo Chen}
15540161064SShuo Chen
15640161064SShuo Chenvoid EventLoop::updateChannel(Channel* channel)
15740161064SShuo Chen{
15840161064SShuo Chen  assert(channel->ownerLoop() == this);
15940161064SShuo Chen  assertInLoopThread();
16040161064SShuo Chen  poller_->updateChannel(channel);
16140161064SShuo Chen}
16240161064SShuo Chen
16340161064SShuo Chenvoid EventLoop::removeChannel(Channel* channel)
16440161064SShuo Chen{
16540161064SShuo Chen  assert(channel->ownerLoop() == this);
16640161064SShuo Chen  assertInLoopThread();
16740161064SShuo Chen  poller_->removeChannel(channel);
16840161064SShuo Chen}
16940161064SShuo Chen
17040161064SShuo Chenvoid EventLoop::abortNotInLoopThread()
17140161064SShuo Chen{
17240161064SShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
17340161064SShuo Chen            << " was created in threadId_ = " << threadId_
17440161064SShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
17540161064SShuo Chen}
17640161064SShuo Chen
17740161064SShuo Chenvoid EventLoop::wakeup()
17840161064SShuo Chen{
17940161064SShuo Chen  uint64_t one = 1;
18040161064SShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
18140161064SShuo Chen  if (n != sizeof one)
18240161064SShuo Chen  {
18340161064SShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
18440161064SShuo Chen  }
18540161064SShuo Chen}
18640161064SShuo Chen
18740161064SShuo Chenvoid EventLoop::handleRead()
18840161064SShuo Chen{
18940161064SShuo Chen  uint64_t one = 1;
19040161064SShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
19140161064SShuo Chen  if (n != sizeof one)
19240161064SShuo Chen  {
19340161064SShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
19440161064SShuo Chen  }
19540161064SShuo Chen}
19640161064SShuo Chen
19740161064SShuo Chenvoid EventLoop::doPendingFunctors()
19840161064SShuo Chen{
19940161064SShuo Chen  std::vector<Functor> functors;
20040161064SShuo Chen  callingPendingFunctors_ = true;
20140161064SShuo Chen
20240161064SShuo Chen  {
20340161064SShuo Chen  MutexLockGuard lock(mutex_);
20440161064SShuo Chen  functors.swap(pendingFunctors_);
20540161064SShuo Chen  }
20640161064SShuo Chen
20740161064SShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
20840161064SShuo Chen  {
20940161064SShuo Chen    functors[i]();
21040161064SShuo Chen  }
21140161064SShuo Chen  callingPendingFunctors_ = false;
21240161064SShuo Chen}
21340161064SShuo Chen
214