// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
7e254a845SShuo Chen
8e254a845SShuo Chen#include "EventLoop.h"
9e254a845SShuo Chen
10e254a845SShuo Chen#include "Channel.h"
11e254a845SShuo Chen#include "Poller.h"
12e254a845SShuo Chen#include "TimerQueue.h"
13e254a845SShuo Chen
14e254a845SShuo Chen#include "logging/Logging.h"
15e254a845SShuo Chen
16e254a845SShuo Chen#include <boost/bind.hpp>
17e254a845SShuo Chen
18e254a845SShuo Chen#include <assert.h>
19e254a845SShuo Chen#include <signal.h>
20e254a845SShuo Chen#include <sys/eventfd.h>
21e254a845SShuo Chen
22e254a845SShuo Chenusing namespace muduo;
23e254a845SShuo Chen
24e254a845SShuo Chen__thread EventLoop* t_loopInThisThread = 0;
25e254a845SShuo Chenconst int kPollTimeMs = 10000;
26e254a845SShuo Chen
27e254a845SShuo Chenstatic int createEventfd()
28e254a845SShuo Chen{
29e254a845SShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
30e254a845SShuo Chen  if (evtfd < 0)
31e254a845SShuo Chen  {
32e254a845SShuo Chen    LOG_SYSERR << "Failed in eventfd";
33e254a845SShuo Chen    abort();
34e254a845SShuo Chen  }
35e254a845SShuo Chen  return evtfd;
36e254a845SShuo Chen}
37e254a845SShuo Chen
// Both the helper class and its single instance are implementation details of
// this translation unit, so they live in an unnamed namespace: this gives them
// internal linkage and keeps the generic name `initObj` from colliding with
// symbols in other translation units.
namespace
{

// Sets the process-wide disposition of SIGPIPE to "ignore" when constructed,
// so that writing to a peer that has closed its end reports an error (EPIPE)
// to the caller instead of terminating the process.
class IgnoreSigPipe
{
 public:
  IgnoreSigPipe()
  {
    ::signal(SIGPIPE, SIG_IGN);
  }
};

// Constructed during static initialization, i.e. before main() runs, which is
// what actually installs the SIG_IGN disposition.
IgnoreSigPipe initObj;

}  // namespace
48e254a845SShuo Chen
49e254a845SShuo ChenEventLoop::EventLoop()
50e254a845SShuo Chen  : looping_(false),
51e254a845SShuo Chen    quit_(false),
52e254a845SShuo Chen    callingPendingFunctors_(false),
53e254a845SShuo Chen    threadId_(CurrentThread::tid()),
54e254a845SShuo Chen    poller_(new Poller(this)),
55e254a845SShuo Chen    timerQueue_(new TimerQueue(this)),
56e254a845SShuo Chen    wakeupFd_(createEventfd()),
57e254a845SShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
58e254a845SShuo Chen{
59e254a845SShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
60e254a845SShuo Chen  if (t_loopInThisThread)
61e254a845SShuo Chen  {
62e254a845SShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
63e254a845SShuo Chen              << " exists in this thread " << threadId_;
64e254a845SShuo Chen  }
65e254a845SShuo Chen  else
66e254a845SShuo Chen  {
67e254a845SShuo Chen    t_loopInThisThread = this;
68e254a845SShuo Chen  }
69e254a845SShuo Chen  wakeupChannel_->setReadCallback(
70e254a845SShuo Chen      boost::bind(&EventLoop::handleRead, this));
71e254a845SShuo Chen  // we are always reading the wakeupfd
72e254a845SShuo Chen  wakeupChannel_->enableReading();
73e254a845SShuo Chen}
74e254a845SShuo Chen
75e254a845SShuo ChenEventLoop::~EventLoop()
76e254a845SShuo Chen{
77e254a845SShuo Chen  assert(!looping_);
78e254a845SShuo Chen  ::close(wakeupFd_);
79e254a845SShuo Chen  t_loopInThisThread = NULL;
80e254a845SShuo Chen}
81e254a845SShuo Chen
82e254a845SShuo Chenvoid EventLoop::loop()
83e254a845SShuo Chen{
84e254a845SShuo Chen  assert(!looping_);
85e254a845SShuo Chen  assertInLoopThread();
86e254a845SShuo Chen  looping_ = true;
87e254a845SShuo Chen  quit_ = false;
88e254a845SShuo Chen
89e254a845SShuo Chen  while (!quit_)
90e254a845SShuo Chen  {
91e254a845SShuo Chen    activeChannels_.clear();
92e254a845SShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
93e254a845SShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
94e254a845SShuo Chen        it != activeChannels_.end(); ++it)
95e254a845SShuo Chen    {
96e254a845SShuo Chen      (*it)->handleEvent(pollReturnTime_);
97e254a845SShuo Chen    }
98e254a845SShuo Chen    doPendingFunctors();
99e254a845SShuo Chen  }
100e254a845SShuo Chen
101e254a845SShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
102e254a845SShuo Chen  looping_ = false;
103e254a845SShuo Chen}
104e254a845SShuo Chen
105e254a845SShuo Chenvoid EventLoop::quit()
106e254a845SShuo Chen{
107e254a845SShuo Chen  quit_ = true;
108e254a845SShuo Chen  if (!isInLoopThread())
109e254a845SShuo Chen  {
110e254a845SShuo Chen    wakeup();
111e254a845SShuo Chen  }
112e254a845SShuo Chen}
113e254a845SShuo Chen
114e254a845SShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
115e254a845SShuo Chen{
116e254a845SShuo Chen  if (isInLoopThread())
117e254a845SShuo Chen  {
118e254a845SShuo Chen    cb();
119e254a845SShuo Chen  }
120e254a845SShuo Chen  else
121e254a845SShuo Chen  {
122e254a845SShuo Chen    queueInLoop(cb);
123e254a845SShuo Chen  }
124e254a845SShuo Chen}
125e254a845SShuo Chen
126e254a845SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
127e254a845SShuo Chen{
128e254a845SShuo Chen  {
129e254a845SShuo Chen  MutexLockGuard lock(mutex_);
130e254a845SShuo Chen  pendingFunctors_.push_back(cb);
131e254a845SShuo Chen  }
132e254a845SShuo Chen
1330f776063SShuo Chen  if (!isInLoopThread() || callingPendingFunctors_)
134e254a845SShuo Chen  {
135e254a845SShuo Chen    wakeup();
136e254a845SShuo Chen  }
137e254a845SShuo Chen}
138e254a845SShuo Chen
139e254a845SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
140e254a845SShuo Chen{
141e254a845SShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
142e254a845SShuo Chen}
143e254a845SShuo Chen
144e254a845SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
145e254a845SShuo Chen{
146e254a845SShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
147e254a845SShuo Chen  return runAt(time, cb);
148e254a845SShuo Chen}
149e254a845SShuo Chen
150e254a845SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
151e254a845SShuo Chen{
152e254a845SShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
153e254a845SShuo Chen  return timerQueue_->addTimer(cb, time, interval);
154e254a845SShuo Chen}
155e254a845SShuo Chen
156e254a845SShuo Chenvoid EventLoop::updateChannel(Channel* channel)
157e254a845SShuo Chen{
158e254a845SShuo Chen  assert(channel->ownerLoop() == this);
159e254a845SShuo Chen  assertInLoopThread();
160e254a845SShuo Chen  poller_->updateChannel(channel);
161e254a845SShuo Chen}
162e254a845SShuo Chen
163e254a845SShuo Chenvoid EventLoop::removeChannel(Channel* channel)
164e254a845SShuo Chen{
165e254a845SShuo Chen  assert(channel->ownerLoop() == this);
166e254a845SShuo Chen  assertInLoopThread();
167e254a845SShuo Chen  poller_->removeChannel(channel);
168e254a845SShuo Chen}
169e254a845SShuo Chen
170e254a845SShuo Chenvoid EventLoop::abortNotInLoopThread()
171e254a845SShuo Chen{
172e254a845SShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
173e254a845SShuo Chen            << " was created in threadId_ = " << threadId_
174e254a845SShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
175e254a845SShuo Chen}
176e254a845SShuo Chen
177e254a845SShuo Chenvoid EventLoop::wakeup()
178e254a845SShuo Chen{
179e254a845SShuo Chen  uint64_t one = 1;
180e254a845SShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
181e254a845SShuo Chen  if (n != sizeof one)
182e254a845SShuo Chen  {
183e254a845SShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
184e254a845SShuo Chen  }
185e254a845SShuo Chen}
186e254a845SShuo Chen
187e254a845SShuo Chenvoid EventLoop::handleRead()
188e254a845SShuo Chen{
189e254a845SShuo Chen  uint64_t one = 1;
190e254a845SShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
191e254a845SShuo Chen  if (n != sizeof one)
192e254a845SShuo Chen  {
193e254a845SShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
194e254a845SShuo Chen  }
195e254a845SShuo Chen}
196e254a845SShuo Chen
197e254a845SShuo Chenvoid EventLoop::doPendingFunctors()
198e254a845SShuo Chen{
199e254a845SShuo Chen  std::vector<Functor> functors;
200e254a845SShuo Chen  callingPendingFunctors_ = true;
201e254a845SShuo Chen
202e254a845SShuo Chen  {
203e254a845SShuo Chen  MutexLockGuard lock(mutex_);
204e254a845SShuo Chen  functors.swap(pendingFunctors_);
205e254a845SShuo Chen  }
206e254a845SShuo Chen
207e254a845SShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
208e254a845SShuo Chen  {
209e254a845SShuo Chen    functors[i]();
210e254a845SShuo Chen  }
211e254a845SShuo Chen  callingPendingFunctors_ = false;
212e254a845SShuo Chen}
213e254a845SShuo Chen
214