1// excerpts from http://code.google.com/p/muduo/
2//
3// Use of this source code is governed by a BSD-style license
4// that can be found in the License file.
5//
6// Author: Shuo Chen (chenshuo at chenshuo dot com)
7
8#include "EventLoop.h"
9
10#include "Channel.h"
11#include "EPoller.h"
12#include "TimerQueue.h"
13
14#include "logging/Logging.h"
15
16#include <boost/bind.hpp>
17
18#include <assert.h>
19#include <signal.h>
20#include <sys/eventfd.h>
21
22using namespace muduo;
23
24__thread EventLoop* t_loopInThisThread = 0;
25const int kPollTimeMs = 10000;
26
27static int createEventfd()
28{
29  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
30  if (evtfd < 0)
31  {
32    LOG_SYSERR << "Failed in eventfd";
33    abort();
34  }
35  return evtfd;
36}
37
// Both the helper class and its single instance are private to this file, so
// they live in an unnamed namespace: this gives them internal linkage and
// avoids ODR/link clashes with a same-named class or object defined in any
// other translation unit.
namespace
{

// Sets the process-wide disposition of SIGPIPE to SIG_IGN when constructed.
// NOTE(review): presumably this is so that writing to a connection closed by
// the peer reports an error instead of terminating the process -- confirm
// against the socket-writing code.
class IgnoreSigPipe
{
 public:
  IgnoreSigPipe()
  {
    ::signal(SIGPIPE, SIG_IGN);
  }
};

// The only instance. Its constructor runs as part of this translation unit's
// static initialization, which is what actually installs SIG_IGN.
IgnoreSigPipe initObj;

}  // namespace
48
49EventLoop::EventLoop()
50  : looping_(false),
51    quit_(false),
52    callingPendingFunctors_(false),
53    threadId_(CurrentThread::tid()),
54    poller_(new EPoller(this)),
55    timerQueue_(new TimerQueue(this)),
56    wakeupFd_(createEventfd()),
57    wakeupChannel_(new Channel(this, wakeupFd_))
58{
59  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
60  if (t_loopInThisThread)
61  {
62    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
63              << " exists in this thread " << threadId_;
64  }
65  else
66  {
67    t_loopInThisThread = this;
68  }
69  wakeupChannel_->setReadCallback(
70      boost::bind(&EventLoop::handleRead, this));
71  // we are always reading the wakeupfd
72  wakeupChannel_->enableReading();
73}
74
75EventLoop::~EventLoop()
76{
77  assert(!looping_);
78  ::close(wakeupFd_);
79  t_loopInThisThread = NULL;
80}
81
82void EventLoop::loop()
83{
84  assert(!looping_);
85  assertInLoopThread();
86  looping_ = true;
87  quit_ = false;
88
89  while (!quit_)
90  {
91    activeChannels_.clear();
92    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
93    for (ChannelList::iterator it = activeChannels_.begin();
94        it != activeChannels_.end(); ++it)
95    {
96      (*it)->handleEvent(pollReturnTime_);
97    }
98    doPendingFunctors();
99  }
100
101  LOG_TRACE << "EventLoop " << this << " stop looping";
102  looping_ = false;
103}
104
105void EventLoop::quit()
106{
107  quit_ = true;
108  if (!isInLoopThread())
109  {
110    wakeup();
111  }
112}
113
114void EventLoop::runInLoop(const Functor& cb)
115{
116  if (isInLoopThread())
117  {
118    cb();
119  }
120  else
121  {
122    queueInLoop(cb);
123  }
124}
125
126void EventLoop::queueInLoop(const Functor& cb)
127{
128  {
129  MutexLockGuard lock(mutex_);
130  pendingFunctors_.push_back(cb);
131  }
132
133  if (!isInLoopThread() || callingPendingFunctors_)
134  {
135    wakeup();
136  }
137}
138
139TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
140{
141  return timerQueue_->addTimer(cb, time, 0.0);
142}
143
144TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
145{
146  Timestamp time(addTime(Timestamp::now(), delay));
147  return runAt(time, cb);
148}
149
150TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
151{
152  Timestamp time(addTime(Timestamp::now(), interval));
153  return timerQueue_->addTimer(cb, time, interval);
154}
155
156void EventLoop::cancel(TimerId timerId)
157{
158  return timerQueue_->cancel(timerId);
159}
160
161void EventLoop::updateChannel(Channel* channel)
162{
163  assert(channel->ownerLoop() == this);
164  assertInLoopThread();
165  poller_->updateChannel(channel);
166}
167
168void EventLoop::removeChannel(Channel* channel)
169{
170  assert(channel->ownerLoop() == this);
171  assertInLoopThread();
172  poller_->removeChannel(channel);
173}
174
175void EventLoop::abortNotInLoopThread()
176{
177  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
178            << " was created in threadId_ = " << threadId_
179            << ", current thread id = " <<  CurrentThread::tid();
180}
181
182void EventLoop::wakeup()
183{
184  uint64_t one = 1;
185  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
186  if (n != sizeof one)
187  {
188    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
189  }
190}
191
192void EventLoop::handleRead()
193{
194  uint64_t one = 1;
195  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
196  if (n != sizeof one)
197  {
198    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
199  }
200}
201
202void EventLoop::doPendingFunctors()
203{
204  std::vector<Functor> functors;
205  callingPendingFunctors_ = true;
206
207  {
208  MutexLockGuard lock(mutex_);
209  functors.swap(pendingFunctors_);
210  }
211
212  for (size_t i = 0; i < functors.size(); ++i)
213  {
214    functors[i]();
215  }
216  callingPendingFunctors_ = false;
217}
218
219