EventLoop.cc revision bfe73648
1bfe73648SShuo Chen// excerpts from http://code.google.com/p/muduo/
2bfe73648SShuo Chen//
3bfe73648SShuo Chen// Use of this source code is governed by a BSD-style license
4bfe73648SShuo Chen// that can be found in the License file.
5bfe73648SShuo Chen//
6bfe73648SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
7bfe73648SShuo Chen
8bfe73648SShuo Chen#include "EventLoop.h"
9bfe73648SShuo Chen
10bfe73648SShuo Chen#include "Channel.h"
11bfe73648SShuo Chen#include "Poller.h"
12bfe73648SShuo Chen#include "TimerQueue.h"
13bfe73648SShuo Chen
14bfe73648SShuo Chen#include "logging/Logging.h"
15bfe73648SShuo Chen
16bfe73648SShuo Chen#include <boost/bind.hpp>
17bfe73648SShuo Chen
18bfe73648SShuo Chen#include <assert.h>
19bfe73648SShuo Chen#include <sys/eventfd.h>
20bfe73648SShuo Chen
21bfe73648SShuo Chenusing namespace muduo;
22bfe73648SShuo Chen
23bfe73648SShuo Chen__thread EventLoop* t_loopInThisThread = 0;
24bfe73648SShuo Chenconst int kPollTimeMs = 10000;
25bfe73648SShuo Chen
26bfe73648SShuo Chenstatic int createEventfd()
27bfe73648SShuo Chen{
28bfe73648SShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
29bfe73648SShuo Chen  if (evtfd < 0)
30bfe73648SShuo Chen  {
31bfe73648SShuo Chen    LOG_SYSERR << "Failed in eventfd";
32bfe73648SShuo Chen    abort();
33bfe73648SShuo Chen  }
34bfe73648SShuo Chen  return evtfd;
35bfe73648SShuo Chen}
36bfe73648SShuo Chen
37bfe73648SShuo ChenEventLoop::EventLoop()
38bfe73648SShuo Chen  : looping_(false),
39bfe73648SShuo Chen    quit_(false),
40bfe73648SShuo Chen    callingPendingFunctors_(false),
41bfe73648SShuo Chen    threadId_(CurrentThread::tid()),
42bfe73648SShuo Chen    poller_(new Poller(this)),
43bfe73648SShuo Chen    timerQueue_(new TimerQueue(this)),
44bfe73648SShuo Chen    wakeupFd_(createEventfd()),
45bfe73648SShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
46bfe73648SShuo Chen{
47bfe73648SShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
48bfe73648SShuo Chen  if (t_loopInThisThread)
49bfe73648SShuo Chen  {
50bfe73648SShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
51bfe73648SShuo Chen              << " exists in this thread " << threadId_;
52bfe73648SShuo Chen  }
53bfe73648SShuo Chen  else
54bfe73648SShuo Chen  {
55bfe73648SShuo Chen    t_loopInThisThread = this;
56bfe73648SShuo Chen  }
57bfe73648SShuo Chen  wakeupChannel_->setReadCallback(
58bfe73648SShuo Chen      boost::bind(&EventLoop::handleRead, this));
59bfe73648SShuo Chen  // we are always reading the wakeupfd
60bfe73648SShuo Chen  wakeupChannel_->enableReading();
61bfe73648SShuo Chen}
62bfe73648SShuo Chen
63bfe73648SShuo ChenEventLoop::~EventLoop()
64bfe73648SShuo Chen{
65bfe73648SShuo Chen  assert(!looping_);
66bfe73648SShuo Chen  ::close(wakeupFd_);
67bfe73648SShuo Chen  t_loopInThisThread = NULL;
68bfe73648SShuo Chen}
69bfe73648SShuo Chen
70bfe73648SShuo Chenvoid EventLoop::loop()
71bfe73648SShuo Chen{
72bfe73648SShuo Chen  assert(!looping_);
73bfe73648SShuo Chen  assertInLoopThread();
74bfe73648SShuo Chen  looping_ = true;
75bfe73648SShuo Chen  quit_ = false;
76bfe73648SShuo Chen
77bfe73648SShuo Chen  while (!quit_)
78bfe73648SShuo Chen  {
79bfe73648SShuo Chen    activeChannels_.clear();
80bfe73648SShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
81bfe73648SShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
82bfe73648SShuo Chen        it != activeChannels_.end(); ++it)
83bfe73648SShuo Chen    {
84bfe73648SShuo Chen      (*it)->handleEvent();
85bfe73648SShuo Chen    }
86bfe73648SShuo Chen    doPendingFunctors();
87bfe73648SShuo Chen  }
88bfe73648SShuo Chen
89bfe73648SShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
90bfe73648SShuo Chen  looping_ = false;
91bfe73648SShuo Chen}
92bfe73648SShuo Chen
93bfe73648SShuo Chenvoid EventLoop::quit()
94bfe73648SShuo Chen{
95bfe73648SShuo Chen  quit_ = true;
96bfe73648SShuo Chen  if (!isInLoopThread())
97bfe73648SShuo Chen  {
98bfe73648SShuo Chen    wakeup();
99bfe73648SShuo Chen  }
100bfe73648SShuo Chen}
101bfe73648SShuo Chen
102bfe73648SShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
103bfe73648SShuo Chen{
104bfe73648SShuo Chen  if (isInLoopThread())
105bfe73648SShuo Chen  {
106bfe73648SShuo Chen    cb();
107bfe73648SShuo Chen  }
108bfe73648SShuo Chen  else
109bfe73648SShuo Chen  {
110bfe73648SShuo Chen    queueInLoop(cb);
111bfe73648SShuo Chen    wakeup();
112bfe73648SShuo Chen  }
113bfe73648SShuo Chen}
114bfe73648SShuo Chen
115bfe73648SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
116bfe73648SShuo Chen{
117bfe73648SShuo Chen  {
118bfe73648SShuo Chen  MutexLockGuard lock(mutex_);
119bfe73648SShuo Chen  pendingFunctors_.push_back(cb);
120bfe73648SShuo Chen  }
121bfe73648SShuo Chen
122bfe73648SShuo Chen  if (isInLoopThread() && callingPendingFunctors_)
123bfe73648SShuo Chen  {
124bfe73648SShuo Chen    wakeup();
125bfe73648SShuo Chen  }
126bfe73648SShuo Chen}
127bfe73648SShuo Chen
128bfe73648SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
129bfe73648SShuo Chen{
130bfe73648SShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
131bfe73648SShuo Chen}
132bfe73648SShuo Chen
133bfe73648SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
134bfe73648SShuo Chen{
135bfe73648SShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
136bfe73648SShuo Chen  return runAt(time, cb);
137bfe73648SShuo Chen}
138bfe73648SShuo Chen
139bfe73648SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
140bfe73648SShuo Chen{
141bfe73648SShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
142bfe73648SShuo Chen  return timerQueue_->addTimer(cb, time, interval);
143bfe73648SShuo Chen}
144bfe73648SShuo Chen
145bfe73648SShuo Chenvoid EventLoop::updateChannel(Channel* channel)
146bfe73648SShuo Chen{
147bfe73648SShuo Chen  assert(channel->ownerLoop() == this);
148bfe73648SShuo Chen  assertInLoopThread();
149bfe73648SShuo Chen  poller_->updateChannel(channel);
150bfe73648SShuo Chen}
151bfe73648SShuo Chen
152bfe73648SShuo Chenvoid EventLoop::abortNotInLoopThread()
153bfe73648SShuo Chen{
154bfe73648SShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
155bfe73648SShuo Chen            << " was created in threadId_ = " << threadId_
156bfe73648SShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
157bfe73648SShuo Chen}
158bfe73648SShuo Chen
159bfe73648SShuo Chenvoid EventLoop::wakeup()
160bfe73648SShuo Chen{
161bfe73648SShuo Chen  uint64_t one = 1;
162bfe73648SShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
163bfe73648SShuo Chen  if (n != sizeof one)
164bfe73648SShuo Chen  {
165bfe73648SShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
166bfe73648SShuo Chen  }
167bfe73648SShuo Chen}
168bfe73648SShuo Chen
169bfe73648SShuo Chenvoid EventLoop::handleRead()
170bfe73648SShuo Chen{
171bfe73648SShuo Chen  uint64_t one = 1;
172bfe73648SShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
173bfe73648SShuo Chen  if (n != sizeof one)
174bfe73648SShuo Chen  {
175bfe73648SShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
176bfe73648SShuo Chen  }
177bfe73648SShuo Chen}
178bfe73648SShuo Chen
179bfe73648SShuo Chenvoid EventLoop::doPendingFunctors()
180bfe73648SShuo Chen{
181bfe73648SShuo Chen  std::vector<Functor> functors;
182bfe73648SShuo Chen  callingPendingFunctors_ = true;
183bfe73648SShuo Chen
184bfe73648SShuo Chen  {
185bfe73648SShuo Chen  MutexLockGuard lock(mutex_);
186bfe73648SShuo Chen  functors.swap(pendingFunctors_);
187bfe73648SShuo Chen  }
188bfe73648SShuo Chen
189bfe73648SShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
190bfe73648SShuo Chen  {
191bfe73648SShuo Chen    functors[i]();
192bfe73648SShuo Chen  }
193bfe73648SShuo Chen  callingPendingFunctors_ = false;
194bfe73648SShuo Chen}
195bfe73648SShuo Chen
196