1// excerpts from http://code.google.com/p/muduo/
2//
3// Use of this source code is governed by a BSD-style license
4// that can be found in the License file.
5//
6// Author: Shuo Chen (chenshuo at chenshuo dot com)
7
8#include "EventLoop.h"
9
10#include "Channel.h"
11#include "Poller.h"
12#include "TimerQueue.h"
13
14#include "logging/Logging.h"
15
16#include <boost/bind.hpp>
17
18#include <assert.h>
19#include <signal.h>
20#include <sys/eventfd.h>
21
22using namespace muduo;
23
24__thread EventLoop* t_loopInThisThread = 0;
25const int kPollTimeMs = 10000;
26
27static int createEventfd()
28{
29  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
30  if (evtfd < 0)
31  {
32    LOG_SYSERR << "Failed in eventfd";
33    abort();
34  }
35  return evtfd;
36}
37
38class IgnoreSigPipe
39{
40 public:
41  IgnoreSigPipe()
42  {
43    ::signal(SIGPIPE, SIG_IGN);
44  }
45};
46
47IgnoreSigPipe initObj;
48
49EventLoop::EventLoop()
50  : looping_(false),
51    quit_(false),
52    callingPendingFunctors_(false),
53    threadId_(CurrentThread::tid()),
54    poller_(new Poller(this)),
55    timerQueue_(new TimerQueue(this)),
56    wakeupFd_(createEventfd()),
57    wakeupChannel_(new Channel(this, wakeupFd_))
58{
59  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
60  if (t_loopInThisThread)
61  {
62    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
63              << " exists in this thread " << threadId_;
64  }
65  else
66  {
67    t_loopInThisThread = this;
68  }
69  wakeupChannel_->setReadCallback(
70      boost::bind(&EventLoop::handleRead, this));
71  // we are always reading the wakeupfd
72  wakeupChannel_->enableReading();
73}
74
75EventLoop::~EventLoop()
76{
77  assert(!looping_);
78  ::close(wakeupFd_);
79  t_loopInThisThread = NULL;
80}
81
82void EventLoop::loop()
83{
84  assert(!looping_);
85  assertInLoopThread();
86  looping_ = true;
87  quit_ = false;
88
89  while (!quit_)
90  {
91    activeChannels_.clear();
92    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
93    for (ChannelList::iterator it = activeChannels_.begin();
94        it != activeChannels_.end(); ++it)
95    {
96      (*it)->handleEvent(pollReturnTime_);
97    }
98    doPendingFunctors();
99  }
100
101  LOG_TRACE << "EventLoop " << this << " stop looping";
102  looping_ = false;
103}
104
105void EventLoop::quit()
106{
107  quit_ = true;
108  if (!isInLoopThread())
109  {
110    wakeup();
111  }
112}
113
114void EventLoop::runInLoop(const Functor& cb)
115{
116  if (isInLoopThread())
117  {
118    cb();
119  }
120  else
121  {
122    queueInLoop(cb);
123  }
124}
125
126void EventLoop::queueInLoop(const Functor& cb)
127{
128  {
129  MutexLockGuard lock(mutex_);
130  pendingFunctors_.push_back(cb);
131  }
132
133  if (!isInLoopThread() || callingPendingFunctors_)
134  {
135    wakeup();
136  }
137}
138
139TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
140{
141  return timerQueue_->addTimer(cb, time, 0.0);
142}
143
144TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
145{
146  Timestamp time(addTime(Timestamp::now(), delay));
147  return runAt(time, cb);
148}
149
150TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
151{
152  Timestamp time(addTime(Timestamp::now(), interval));
153  return timerQueue_->addTimer(cb, time, interval);
154}
155
156void EventLoop::updateChannel(Channel* channel)
157{
158  assert(channel->ownerLoop() == this);
159  assertInLoopThread();
160  poller_->updateChannel(channel);
161}
162
163void EventLoop::removeChannel(Channel* channel)
164{
165  assert(channel->ownerLoop() == this);
166  assertInLoopThread();
167  poller_->removeChannel(channel);
168}
169
170void EventLoop::abortNotInLoopThread()
171{
172  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
173            << " was created in threadId_ = " << threadId_
174            << ", current thread id = " <<  CurrentThread::tid();
175}
176
177void EventLoop::wakeup()
178{
179  uint64_t one = 1;
180  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
181  if (n != sizeof one)
182  {
183    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
184  }
185}
186
187void EventLoop::handleRead()
188{
189  uint64_t one = 1;
190  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
191  if (n != sizeof one)
192  {
193    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
194  }
195}
196
197void EventLoop::doPendingFunctors()
198{
199  std::vector<Functor> functors;
200  callingPendingFunctors_ = true;
201
202  {
203  MutexLockGuard lock(mutex_);
204  functors.swap(pendingFunctors_);
205  }
206
207  for (size_t i = 0; i < functors.size(); ++i)
208  {
209    functors[i]();
210  }
211  callingPendingFunctors_ = false;
212}
213
214