EventLoop.cc revision e254a845
1e254a845SShuo Chen// excerpts from http://code.google.com/p/muduo/
2e254a845SShuo Chen//
3e254a845SShuo Chen// Use of this source code is governed by a BSD-style license
4e254a845SShuo Chen// that can be found in the License file.
5e254a845SShuo Chen//
6e254a845SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
7e254a845SShuo Chen
8e254a845SShuo Chen#include "EventLoop.h"
9e254a845SShuo Chen
10e254a845SShuo Chen#include "Channel.h"
11e254a845SShuo Chen#include "Poller.h"
12e254a845SShuo Chen#include "TimerQueue.h"
13e254a845SShuo Chen
14e254a845SShuo Chen#include "logging/Logging.h"
15e254a845SShuo Chen
16e254a845SShuo Chen#include <boost/bind.hpp>
17e254a845SShuo Chen
18e254a845SShuo Chen#include <assert.h>
19e254a845SShuo Chen#include <signal.h>
20e254a845SShuo Chen#include <sys/eventfd.h>
21e254a845SShuo Chen
22e254a845SShuo Chenusing namespace muduo;
23e254a845SShuo Chen
24e254a845SShuo Chen__thread EventLoop* t_loopInThisThread = 0;
25e254a845SShuo Chenconst int kPollTimeMs = 10000;
26e254a845SShuo Chen
27e254a845SShuo Chenstatic int createEventfd()
28e254a845SShuo Chen{
29e254a845SShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
30e254a845SShuo Chen  if (evtfd < 0)
31e254a845SShuo Chen  {
32e254a845SShuo Chen    LOG_SYSERR << "Failed in eventfd";
33e254a845SShuo Chen    abort();
34e254a845SShuo Chen  }
35e254a845SShuo Chen  return evtfd;
36e254a845SShuo Chen}
37e254a845SShuo Chen
38e254a845SShuo Chenclass IgnoreSigPipe
39e254a845SShuo Chen{
40e254a845SShuo Chen public:
41e254a845SShuo Chen  IgnoreSigPipe()
42e254a845SShuo Chen  {
43e254a845SShuo Chen    ::signal(SIGPIPE, SIG_IGN);
44e254a845SShuo Chen  }
45e254a845SShuo Chen};
46e254a845SShuo Chen
47e254a845SShuo ChenIgnoreSigPipe initObj;
48e254a845SShuo Chen
49e254a845SShuo ChenEventLoop::EventLoop()
50e254a845SShuo Chen  : looping_(false),
51e254a845SShuo Chen    quit_(false),
52e254a845SShuo Chen    callingPendingFunctors_(false),
53e254a845SShuo Chen    threadId_(CurrentThread::tid()),
54e254a845SShuo Chen    poller_(new Poller(this)),
55e254a845SShuo Chen    timerQueue_(new TimerQueue(this)),
56e254a845SShuo Chen    wakeupFd_(createEventfd()),
57e254a845SShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
58e254a845SShuo Chen{
59e254a845SShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
60e254a845SShuo Chen  if (t_loopInThisThread)
61e254a845SShuo Chen  {
62e254a845SShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
63e254a845SShuo Chen              << " exists in this thread " << threadId_;
64e254a845SShuo Chen  }
65e254a845SShuo Chen  else
66e254a845SShuo Chen  {
67e254a845SShuo Chen    t_loopInThisThread = this;
68e254a845SShuo Chen  }
69e254a845SShuo Chen  wakeupChannel_->setReadCallback(
70e254a845SShuo Chen      boost::bind(&EventLoop::handleRead, this));
71e254a845SShuo Chen  // we are always reading the wakeupfd
72e254a845SShuo Chen  wakeupChannel_->enableReading();
73e254a845SShuo Chen}
74e254a845SShuo Chen
75e254a845SShuo ChenEventLoop::~EventLoop()
76e254a845SShuo Chen{
77e254a845SShuo Chen  assert(!looping_);
78e254a845SShuo Chen  ::close(wakeupFd_);
79e254a845SShuo Chen  t_loopInThisThread = NULL;
80e254a845SShuo Chen}
81e254a845SShuo Chen
82e254a845SShuo Chenvoid EventLoop::loop()
83e254a845SShuo Chen{
84e254a845SShuo Chen  assert(!looping_);
85e254a845SShuo Chen  assertInLoopThread();
86e254a845SShuo Chen  looping_ = true;
87e254a845SShuo Chen  quit_ = false;
88e254a845SShuo Chen
89e254a845SShuo Chen  while (!quit_)
90e254a845SShuo Chen  {
91e254a845SShuo Chen    activeChannels_.clear();
92e254a845SShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
93e254a845SShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
94e254a845SShuo Chen        it != activeChannels_.end(); ++it)
95e254a845SShuo Chen    {
96e254a845SShuo Chen      (*it)->handleEvent(pollReturnTime_);
97e254a845SShuo Chen    }
98e254a845SShuo Chen    doPendingFunctors();
99e254a845SShuo Chen  }
100e254a845SShuo Chen
101e254a845SShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
102e254a845SShuo Chen  looping_ = false;
103e254a845SShuo Chen}
104e254a845SShuo Chen
105e254a845SShuo Chenvoid EventLoop::quit()
106e254a845SShuo Chen{
107e254a845SShuo Chen  quit_ = true;
108e254a845SShuo Chen  if (!isInLoopThread())
109e254a845SShuo Chen  {
110e254a845SShuo Chen    wakeup();
111e254a845SShuo Chen  }
112e254a845SShuo Chen}
113e254a845SShuo Chen
114e254a845SShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
115e254a845SShuo Chen{
116e254a845SShuo Chen  if (isInLoopThread())
117e254a845SShuo Chen  {
118e254a845SShuo Chen    cb();
119e254a845SShuo Chen  }
120e254a845SShuo Chen  else
121e254a845SShuo Chen  {
122e254a845SShuo Chen    queueInLoop(cb);
123e254a845SShuo Chen    wakeup();
124e254a845SShuo Chen  }
125e254a845SShuo Chen}
126e254a845SShuo Chen
127e254a845SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
128e254a845SShuo Chen{
129e254a845SShuo Chen  {
130e254a845SShuo Chen  MutexLockGuard lock(mutex_);
131e254a845SShuo Chen  pendingFunctors_.push_back(cb);
132e254a845SShuo Chen  }
133e254a845SShuo Chen
134e254a845SShuo Chen  if (isInLoopThread() && callingPendingFunctors_)
135e254a845SShuo Chen  {
136e254a845SShuo Chen    wakeup();
137e254a845SShuo Chen  }
138e254a845SShuo Chen}
139e254a845SShuo Chen
140e254a845SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
141e254a845SShuo Chen{
142e254a845SShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
143e254a845SShuo Chen}
144e254a845SShuo Chen
145e254a845SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
146e254a845SShuo Chen{
147e254a845SShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
148e254a845SShuo Chen  return runAt(time, cb);
149e254a845SShuo Chen}
150e254a845SShuo Chen
151e254a845SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
152e254a845SShuo Chen{
153e254a845SShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
154e254a845SShuo Chen  return timerQueue_->addTimer(cb, time, interval);
155e254a845SShuo Chen}
156e254a845SShuo Chen
157e254a845SShuo Chenvoid EventLoop::updateChannel(Channel* channel)
158e254a845SShuo Chen{
159e254a845SShuo Chen  assert(channel->ownerLoop() == this);
160e254a845SShuo Chen  assertInLoopThread();
161e254a845SShuo Chen  poller_->updateChannel(channel);
162e254a845SShuo Chen}
163e254a845SShuo Chen
164e254a845SShuo Chenvoid EventLoop::removeChannel(Channel* channel)
165e254a845SShuo Chen{
166e254a845SShuo Chen  assert(channel->ownerLoop() == this);
167e254a845SShuo Chen  assertInLoopThread();
168e254a845SShuo Chen  poller_->removeChannel(channel);
169e254a845SShuo Chen}
170e254a845SShuo Chen
171e254a845SShuo Chenvoid EventLoop::abortNotInLoopThread()
172e254a845SShuo Chen{
173e254a845SShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
174e254a845SShuo Chen            << " was created in threadId_ = " << threadId_
175e254a845SShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
176e254a845SShuo Chen}
177e254a845SShuo Chen
178e254a845SShuo Chenvoid EventLoop::wakeup()
179e254a845SShuo Chen{
180e254a845SShuo Chen  uint64_t one = 1;
181e254a845SShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
182e254a845SShuo Chen  if (n != sizeof one)
183e254a845SShuo Chen  {
184e254a845SShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
185e254a845SShuo Chen  }
186e254a845SShuo Chen}
187e254a845SShuo Chen
188e254a845SShuo Chenvoid EventLoop::handleRead()
189e254a845SShuo Chen{
190e254a845SShuo Chen  uint64_t one = 1;
191e254a845SShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
192e254a845SShuo Chen  if (n != sizeof one)
193e254a845SShuo Chen  {
194e254a845SShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
195e254a845SShuo Chen  }
196e254a845SShuo Chen}
197e254a845SShuo Chen
198e254a845SShuo Chenvoid EventLoop::doPendingFunctors()
199e254a845SShuo Chen{
200e254a845SShuo Chen  std::vector<Functor> functors;
201e254a845SShuo Chen  callingPendingFunctors_ = true;
202e254a845SShuo Chen
203e254a845SShuo Chen  {
204e254a845SShuo Chen  MutexLockGuard lock(mutex_);
205e254a845SShuo Chen  functors.swap(pendingFunctors_);
206e254a845SShuo Chen  }
207e254a845SShuo Chen
208e254a845SShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
209e254a845SShuo Chen  {
210e254a845SShuo Chen    functors[i]();
211e254a845SShuo Chen  }
212e254a845SShuo Chen  callingPendingFunctors_ = false;
213e254a845SShuo Chen}
214e254a845SShuo Chen
215