1// excerpts from http://code.google.com/p/muduo/
2//
3// Use of this source code is governed by a BSD-style license
4// that can be found in the License file.
5//
6// Author: Shuo Chen (chenshuo at chenshuo dot com)
7
8#include "EventLoop.h"
9
10#include "Channel.h"
11#include "Poller.h"
12#include "TimerQueue.h"
13
14#include "logging/Logging.h"
15
16#include <boost/bind.hpp>
17
18#include <assert.h>
19#include <sys/eventfd.h>
20
21using namespace muduo;
22
23__thread EventLoop* t_loopInThisThread = 0;
24const int kPollTimeMs = 10000;
25
26static int createEventfd()
27{
28  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
29  if (evtfd < 0)
30  {
31    LOG_SYSERR << "Failed in eventfd";
32    abort();
33  }
34  return evtfd;
35}
36
37EventLoop::EventLoop()
38  : looping_(false),
39    quit_(false),
40    callingPendingFunctors_(false),
41    threadId_(CurrentThread::tid()),
42    poller_(new Poller(this)),
43    timerQueue_(new TimerQueue(this)),
44    wakeupFd_(createEventfd()),
45    wakeupChannel_(new Channel(this, wakeupFd_))
46{
47  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
48  if (t_loopInThisThread)
49  {
50    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
51              << " exists in this thread " << threadId_;
52  }
53  else
54  {
55    t_loopInThisThread = this;
56  }
57  wakeupChannel_->setReadCallback(
58      boost::bind(&EventLoop::handleRead, this));
59  // we are always reading the wakeupfd
60  wakeupChannel_->enableReading();
61}
62
63EventLoop::~EventLoop()
64{
65  assert(!looping_);
66  ::close(wakeupFd_);
67  t_loopInThisThread = NULL;
68}
69
70void EventLoop::loop()
71{
72  assert(!looping_);
73  assertInLoopThread();
74  looping_ = true;
75  quit_ = false;
76
77  while (!quit_)
78  {
79    activeChannels_.clear();
80    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
81    for (ChannelList::iterator it = activeChannels_.begin();
82        it != activeChannels_.end(); ++it)
83    {
84      (*it)->handleEvent();
85    }
86    doPendingFunctors();
87  }
88
89  LOG_TRACE << "EventLoop " << this << " stop looping";
90  looping_ = false;
91}
92
93void EventLoop::quit()
94{
95  quit_ = true;
96  if (!isInLoopThread())
97  {
98    wakeup();
99  }
100}
101
102void EventLoop::runInLoop(const Functor& cb)
103{
104  if (isInLoopThread())
105  {
106    cb();
107  }
108  else
109  {
110    queueInLoop(cb);
111  }
112}
113
114void EventLoop::queueInLoop(const Functor& cb)
115{
116  {
117  MutexLockGuard lock(mutex_);
118  pendingFunctors_.push_back(cb);
119  }
120
121  if (!isInLoopThread() || callingPendingFunctors_)
122  {
123    wakeup();
124  }
125}
126
127TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
128{
129  return timerQueue_->addTimer(cb, time, 0.0);
130}
131
132TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
133{
134  Timestamp time(addTime(Timestamp::now(), delay));
135  return runAt(time, cb);
136}
137
138TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
139{
140  Timestamp time(addTime(Timestamp::now(), interval));
141  return timerQueue_->addTimer(cb, time, interval);
142}
143
144void EventLoop::updateChannel(Channel* channel)
145{
146  assert(channel->ownerLoop() == this);
147  assertInLoopThread();
148  poller_->updateChannel(channel);
149}
150
151void EventLoop::removeChannel(Channel* channel)
152{
153  assert(channel->ownerLoop() == this);
154  assertInLoopThread();
155  poller_->removeChannel(channel);
156}
157
158void EventLoop::abortNotInLoopThread()
159{
160  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
161            << " was created in threadId_ = " << threadId_
162            << ", current thread id = " <<  CurrentThread::tid();
163}
164
165void EventLoop::wakeup()
166{
167  uint64_t one = 1;
168  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
169  if (n != sizeof one)
170  {
171    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
172  }
173}
174
175void EventLoop::handleRead()
176{
177  uint64_t one = 1;
178  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
179  if (n != sizeof one)
180  {
181    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
182  }
183}
184
185void EventLoop::doPendingFunctors()
186{
187  std::vector<Functor> functors;
188  callingPendingFunctors_ = true;
189
190  {
191  MutexLockGuard lock(mutex_);
192  functors.swap(pendingFunctors_);
193  }
194
195  for (size_t i = 0; i < functors.size(); ++i)
196  {
197    functors[i]();
198  }
199  callingPendingFunctors_ = false;
200}
201
202