EventLoop.cc revision b37003a7
1b37003a7SShuo Chen// excerpts from http://code.google.com/p/muduo/
2b37003a7SShuo Chen//
3b37003a7SShuo Chen// Use of this source code is governed by a BSD-style license
4b37003a7SShuo Chen// that can be found in the License file.
5b37003a7SShuo Chen//
6b37003a7SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
7b37003a7SShuo Chen
8b37003a7SShuo Chen#include "EventLoop.h"
9b37003a7SShuo Chen
10b37003a7SShuo Chen#include "Channel.h"
11b37003a7SShuo Chen#include "Poller.h"
12b37003a7SShuo Chen#include "TimerQueue.h"
13b37003a7SShuo Chen
14b37003a7SShuo Chen#include "logging/Logging.h"
15b37003a7SShuo Chen
16b37003a7SShuo Chen#include <boost/bind.hpp>
17b37003a7SShuo Chen
18b37003a7SShuo Chen#include <assert.h>
19b37003a7SShuo Chen#include <sys/eventfd.h>
20b37003a7SShuo Chen
21b37003a7SShuo Chenusing namespace muduo;
22b37003a7SShuo Chen
23b37003a7SShuo Chen__thread EventLoop* t_loopInThisThread = 0;
24b37003a7SShuo Chenconst int kPollTimeMs = 10000;
25b37003a7SShuo Chen
26b37003a7SShuo Chenstatic int createEventfd()
27b37003a7SShuo Chen{
28b37003a7SShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
29b37003a7SShuo Chen  if (evtfd < 0)
30b37003a7SShuo Chen  {
31b37003a7SShuo Chen    LOG_SYSERR << "Failed in eventfd";
32b37003a7SShuo Chen    abort();
33b37003a7SShuo Chen  }
34b37003a7SShuo Chen  return evtfd;
35b37003a7SShuo Chen}
36b37003a7SShuo Chen
37b37003a7SShuo ChenEventLoop::EventLoop()
38b37003a7SShuo Chen  : looping_(false),
39b37003a7SShuo Chen    quit_(false),
40b37003a7SShuo Chen    callingPendingFunctors_(false),
41b37003a7SShuo Chen    threadId_(CurrentThread::tid()),
42b37003a7SShuo Chen    poller_(new Poller(this)),
43b37003a7SShuo Chen    timerQueue_(new TimerQueue(this)),
44b37003a7SShuo Chen    wakeupFd_(createEventfd()),
45b37003a7SShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
46b37003a7SShuo Chen{
47b37003a7SShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
48b37003a7SShuo Chen  if (t_loopInThisThread)
49b37003a7SShuo Chen  {
50b37003a7SShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
51b37003a7SShuo Chen              << " exists in this thread " << threadId_;
52b37003a7SShuo Chen  }
53b37003a7SShuo Chen  else
54b37003a7SShuo Chen  {
55b37003a7SShuo Chen    t_loopInThisThread = this;
56b37003a7SShuo Chen  }
57b37003a7SShuo Chen  wakeupChannel_->setReadCallback(
58b37003a7SShuo Chen      boost::bind(&EventLoop::handleRead, this));
59b37003a7SShuo Chen  // we are always reading the wakeupfd
60b37003a7SShuo Chen  wakeupChannel_->enableReading();
61b37003a7SShuo Chen}
62b37003a7SShuo Chen
63b37003a7SShuo ChenEventLoop::~EventLoop()
64b37003a7SShuo Chen{
65b37003a7SShuo Chen  assert(!looping_);
66b37003a7SShuo Chen  ::close(wakeupFd_);
67b37003a7SShuo Chen  t_loopInThisThread = NULL;
68b37003a7SShuo Chen}
69b37003a7SShuo Chen
70b37003a7SShuo Chenvoid EventLoop::loop()
71b37003a7SShuo Chen{
72b37003a7SShuo Chen  assert(!looping_);
73b37003a7SShuo Chen  assertInLoopThread();
74b37003a7SShuo Chen  looping_ = true;
75b37003a7SShuo Chen  quit_ = false;
76b37003a7SShuo Chen
77b37003a7SShuo Chen  while (!quit_)
78b37003a7SShuo Chen  {
79b37003a7SShuo Chen    activeChannels_.clear();
80b37003a7SShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
81b37003a7SShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
82b37003a7SShuo Chen        it != activeChannels_.end(); ++it)
83b37003a7SShuo Chen    {
84b37003a7SShuo Chen      (*it)->handleEvent(pollReturnTime_);
85b37003a7SShuo Chen    }
86b37003a7SShuo Chen    doPendingFunctors();
87b37003a7SShuo Chen  }
88b37003a7SShuo Chen
89b37003a7SShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
90b37003a7SShuo Chen  looping_ = false;
91b37003a7SShuo Chen}
92b37003a7SShuo Chen
93b37003a7SShuo Chenvoid EventLoop::quit()
94b37003a7SShuo Chen{
95b37003a7SShuo Chen  quit_ = true;
96b37003a7SShuo Chen  if (!isInLoopThread())
97b37003a7SShuo Chen  {
98b37003a7SShuo Chen    wakeup();
99b37003a7SShuo Chen  }
100b37003a7SShuo Chen}
101b37003a7SShuo Chen
102b37003a7SShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
103b37003a7SShuo Chen{
104b37003a7SShuo Chen  if (isInLoopThread())
105b37003a7SShuo Chen  {
106b37003a7SShuo Chen    cb();
107b37003a7SShuo Chen  }
108b37003a7SShuo Chen  else
109b37003a7SShuo Chen  {
110b37003a7SShuo Chen    queueInLoop(cb);
111b37003a7SShuo Chen    wakeup();
112b37003a7SShuo Chen  }
113b37003a7SShuo Chen}
114b37003a7SShuo Chen
115b37003a7SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
116b37003a7SShuo Chen{
117b37003a7SShuo Chen  {
118b37003a7SShuo Chen  MutexLockGuard lock(mutex_);
119b37003a7SShuo Chen  pendingFunctors_.push_back(cb);
120b37003a7SShuo Chen  }
121b37003a7SShuo Chen
122b37003a7SShuo Chen  if (isInLoopThread() && callingPendingFunctors_)
123b37003a7SShuo Chen  {
124b37003a7SShuo Chen    wakeup();
125b37003a7SShuo Chen  }
126b37003a7SShuo Chen}
127b37003a7SShuo Chen
128b37003a7SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
129b37003a7SShuo Chen{
130b37003a7SShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
131b37003a7SShuo Chen}
132b37003a7SShuo Chen
133b37003a7SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
134b37003a7SShuo Chen{
135b37003a7SShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
136b37003a7SShuo Chen  return runAt(time, cb);
137b37003a7SShuo Chen}
138b37003a7SShuo Chen
139b37003a7SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
140b37003a7SShuo Chen{
141b37003a7SShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
142b37003a7SShuo Chen  return timerQueue_->addTimer(cb, time, interval);
143b37003a7SShuo Chen}
144b37003a7SShuo Chen
145b37003a7SShuo Chenvoid EventLoop::updateChannel(Channel* channel)
146b37003a7SShuo Chen{
147b37003a7SShuo Chen  assert(channel->ownerLoop() == this);
148b37003a7SShuo Chen  assertInLoopThread();
149b37003a7SShuo Chen  poller_->updateChannel(channel);
150b37003a7SShuo Chen}
151b37003a7SShuo Chen
152b37003a7SShuo Chenvoid EventLoop::removeChannel(Channel* channel)
153b37003a7SShuo Chen{
154b37003a7SShuo Chen  assert(channel->ownerLoop() == this);
155b37003a7SShuo Chen  assertInLoopThread();
156b37003a7SShuo Chen  poller_->removeChannel(channel);
157b37003a7SShuo Chen}
158b37003a7SShuo Chen
159b37003a7SShuo Chenvoid EventLoop::abortNotInLoopThread()
160b37003a7SShuo Chen{
161b37003a7SShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
162b37003a7SShuo Chen            << " was created in threadId_ = " << threadId_
163b37003a7SShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
164b37003a7SShuo Chen}
165b37003a7SShuo Chen
166b37003a7SShuo Chenvoid EventLoop::wakeup()
167b37003a7SShuo Chen{
168b37003a7SShuo Chen  uint64_t one = 1;
169b37003a7SShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
170b37003a7SShuo Chen  if (n != sizeof one)
171b37003a7SShuo Chen  {
172b37003a7SShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
173b37003a7SShuo Chen  }
174b37003a7SShuo Chen}
175b37003a7SShuo Chen
176b37003a7SShuo Chenvoid EventLoop::handleRead()
177b37003a7SShuo Chen{
178b37003a7SShuo Chen  uint64_t one = 1;
179b37003a7SShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
180b37003a7SShuo Chen  if (n != sizeof one)
181b37003a7SShuo Chen  {
182b37003a7SShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
183b37003a7SShuo Chen  }
184b37003a7SShuo Chen}
185b37003a7SShuo Chen
186b37003a7SShuo Chenvoid EventLoop::doPendingFunctors()
187b37003a7SShuo Chen{
188b37003a7SShuo Chen  std::vector<Functor> functors;
189b37003a7SShuo Chen  callingPendingFunctors_ = true;
190b37003a7SShuo Chen
191b37003a7SShuo Chen  {
192b37003a7SShuo Chen  MutexLockGuard lock(mutex_);
193b37003a7SShuo Chen  functors.swap(pendingFunctors_);
194b37003a7SShuo Chen  }
195b37003a7SShuo Chen
196b37003a7SShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
197b37003a7SShuo Chen  {
198b37003a7SShuo Chen    functors[i]();
199b37003a7SShuo Chen  }
200b37003a7SShuo Chen  callingPendingFunctors_ = false;
201b37003a7SShuo Chen}
202b37003a7SShuo Chen
203