// EventLoop.cc revision 354280cf
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
7354280cfSShuo Chen
8354280cfSShuo Chen#include "EventLoop.h"
9354280cfSShuo Chen
10354280cfSShuo Chen#include "Channel.h"
11354280cfSShuo Chen#include "Poller.h"
12354280cfSShuo Chen#include "TimerQueue.h"
13354280cfSShuo Chen
14354280cfSShuo Chen#include "logging/Logging.h"
15354280cfSShuo Chen
16354280cfSShuo Chen#include <boost/bind.hpp>
17354280cfSShuo Chen
18354280cfSShuo Chen#include <assert.h>
19354280cfSShuo Chen#include <signal.h>
20354280cfSShuo Chen#include <sys/eventfd.h>
21354280cfSShuo Chen
22354280cfSShuo Chenusing namespace muduo;
23354280cfSShuo Chen
24354280cfSShuo Chen__thread EventLoop* t_loopInThisThread = 0;
25354280cfSShuo Chenconst int kPollTimeMs = 10000;
26354280cfSShuo Chen
27354280cfSShuo Chenstatic int createEventfd()
28354280cfSShuo Chen{
29354280cfSShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
30354280cfSShuo Chen  if (evtfd < 0)
31354280cfSShuo Chen  {
32354280cfSShuo Chen    LOG_SYSERR << "Failed in eventfd";
33354280cfSShuo Chen    abort();
34354280cfSShuo Chen  }
35354280cfSShuo Chen  return evtfd;
36354280cfSShuo Chen}
37354280cfSShuo Chen
// Helper whose constructor sets the process-wide disposition of SIGPIPE
// to SIG_IGN.
class IgnoreSigPipe
{
 public:
  IgnoreSigPipe() { ::signal(SIGPIPE, SIG_IGN); }
};

// Namespace-scope instance: constructing it during static initialization is
// what actually installs the SIG_IGN disposition for SIGPIPE.
IgnoreSigPipe initObj;
48354280cfSShuo Chen
49354280cfSShuo ChenEventLoop::EventLoop()
50354280cfSShuo Chen  : looping_(false),
51354280cfSShuo Chen    quit_(false),
52354280cfSShuo Chen    callingPendingFunctors_(false),
53354280cfSShuo Chen    threadId_(CurrentThread::tid()),
54354280cfSShuo Chen    poller_(new Poller(this)),
55354280cfSShuo Chen    timerQueue_(new TimerQueue(this)),
56354280cfSShuo Chen    wakeupFd_(createEventfd()),
57354280cfSShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
58354280cfSShuo Chen{
59354280cfSShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
60354280cfSShuo Chen  if (t_loopInThisThread)
61354280cfSShuo Chen  {
62354280cfSShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
63354280cfSShuo Chen              << " exists in this thread " << threadId_;
64354280cfSShuo Chen  }
65354280cfSShuo Chen  else
66354280cfSShuo Chen  {
67354280cfSShuo Chen    t_loopInThisThread = this;
68354280cfSShuo Chen  }
69354280cfSShuo Chen  wakeupChannel_->setReadCallback(
70354280cfSShuo Chen      boost::bind(&EventLoop::handleRead, this));
71354280cfSShuo Chen  // we are always reading the wakeupfd
72354280cfSShuo Chen  wakeupChannel_->enableReading();
73354280cfSShuo Chen}
74354280cfSShuo Chen
75354280cfSShuo ChenEventLoop::~EventLoop()
76354280cfSShuo Chen{
77354280cfSShuo Chen  assert(!looping_);
78354280cfSShuo Chen  ::close(wakeupFd_);
79354280cfSShuo Chen  t_loopInThisThread = NULL;
80354280cfSShuo Chen}
81354280cfSShuo Chen
82354280cfSShuo Chenvoid EventLoop::loop()
83354280cfSShuo Chen{
84354280cfSShuo Chen  assert(!looping_);
85354280cfSShuo Chen  assertInLoopThread();
86354280cfSShuo Chen  looping_ = true;
87354280cfSShuo Chen  quit_ = false;
88354280cfSShuo Chen
89354280cfSShuo Chen  while (!quit_)
90354280cfSShuo Chen  {
91354280cfSShuo Chen    activeChannels_.clear();
92354280cfSShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
93354280cfSShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
94354280cfSShuo Chen        it != activeChannels_.end(); ++it)
95354280cfSShuo Chen    {
96354280cfSShuo Chen      (*it)->handleEvent(pollReturnTime_);
97354280cfSShuo Chen    }
98354280cfSShuo Chen    doPendingFunctors();
99354280cfSShuo Chen  }
100354280cfSShuo Chen
101354280cfSShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
102354280cfSShuo Chen  looping_ = false;
103354280cfSShuo Chen}
104354280cfSShuo Chen
105354280cfSShuo Chenvoid EventLoop::quit()
106354280cfSShuo Chen{
107354280cfSShuo Chen  quit_ = true;
108354280cfSShuo Chen  if (!isInLoopThread())
109354280cfSShuo Chen  {
110354280cfSShuo Chen    wakeup();
111354280cfSShuo Chen  }
112354280cfSShuo Chen}
113354280cfSShuo Chen
114354280cfSShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
115354280cfSShuo Chen{
116354280cfSShuo Chen  if (isInLoopThread())
117354280cfSShuo Chen  {
118354280cfSShuo Chen    cb();
119354280cfSShuo Chen  }
120354280cfSShuo Chen  else
121354280cfSShuo Chen  {
122354280cfSShuo Chen    queueInLoop(cb);
123354280cfSShuo Chen  }
124354280cfSShuo Chen}
125354280cfSShuo Chen
126354280cfSShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
127354280cfSShuo Chen{
128354280cfSShuo Chen  {
129354280cfSShuo Chen  MutexLockGuard lock(mutex_);
130354280cfSShuo Chen  pendingFunctors_.push_back(cb);
131354280cfSShuo Chen  }
132354280cfSShuo Chen
133354280cfSShuo Chen  if (!isInLoopThread() || callingPendingFunctors_)
134354280cfSShuo Chen  {
135354280cfSShuo Chen    wakeup();
136354280cfSShuo Chen  }
137354280cfSShuo Chen}
138354280cfSShuo Chen
139354280cfSShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
140354280cfSShuo Chen{
141354280cfSShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
142354280cfSShuo Chen}
143354280cfSShuo Chen
144354280cfSShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
145354280cfSShuo Chen{
146354280cfSShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
147354280cfSShuo Chen  return runAt(time, cb);
148354280cfSShuo Chen}
149354280cfSShuo Chen
150354280cfSShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
151354280cfSShuo Chen{
152354280cfSShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
153354280cfSShuo Chen  return timerQueue_->addTimer(cb, time, interval);
154354280cfSShuo Chen}
155354280cfSShuo Chen
156354280cfSShuo Chenvoid EventLoop::cancel(TimerId timerId)
157354280cfSShuo Chen{
158354280cfSShuo Chen  return timerQueue_->cancel(timerId);
159354280cfSShuo Chen}
160354280cfSShuo Chen
161354280cfSShuo Chenvoid EventLoop::updateChannel(Channel* channel)
162354280cfSShuo Chen{
163354280cfSShuo Chen  assert(channel->ownerLoop() == this);
164354280cfSShuo Chen  assertInLoopThread();
165354280cfSShuo Chen  poller_->updateChannel(channel);
166354280cfSShuo Chen}
167354280cfSShuo Chen
168354280cfSShuo Chenvoid EventLoop::removeChannel(Channel* channel)
169354280cfSShuo Chen{
170354280cfSShuo Chen  assert(channel->ownerLoop() == this);
171354280cfSShuo Chen  assertInLoopThread();
172354280cfSShuo Chen  poller_->removeChannel(channel);
173354280cfSShuo Chen}
174354280cfSShuo Chen
175354280cfSShuo Chenvoid EventLoop::abortNotInLoopThread()
176354280cfSShuo Chen{
177354280cfSShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
178354280cfSShuo Chen            << " was created in threadId_ = " << threadId_
179354280cfSShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
180354280cfSShuo Chen}
181354280cfSShuo Chen
182354280cfSShuo Chenvoid EventLoop::wakeup()
183354280cfSShuo Chen{
184354280cfSShuo Chen  uint64_t one = 1;
185354280cfSShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
186354280cfSShuo Chen  if (n != sizeof one)
187354280cfSShuo Chen  {
188354280cfSShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
189354280cfSShuo Chen  }
190354280cfSShuo Chen}
191354280cfSShuo Chen
192354280cfSShuo Chenvoid EventLoop::handleRead()
193354280cfSShuo Chen{
194354280cfSShuo Chen  uint64_t one = 1;
195354280cfSShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
196354280cfSShuo Chen  if (n != sizeof one)
197354280cfSShuo Chen  {
198354280cfSShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
199354280cfSShuo Chen  }
200354280cfSShuo Chen}
201354280cfSShuo Chen
202354280cfSShuo Chenvoid EventLoop::doPendingFunctors()
203354280cfSShuo Chen{
204354280cfSShuo Chen  std::vector<Functor> functors;
205354280cfSShuo Chen  callingPendingFunctors_ = true;
206354280cfSShuo Chen
207354280cfSShuo Chen  {
208354280cfSShuo Chen  MutexLockGuard lock(mutex_);
209354280cfSShuo Chen  functors.swap(pendingFunctors_);
210354280cfSShuo Chen  }
211354280cfSShuo Chen
212354280cfSShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
213354280cfSShuo Chen  {
214354280cfSShuo Chen    functors[i]();
215354280cfSShuo Chen  }
216354280cfSShuo Chen  callingPendingFunctors_ = false;
217354280cfSShuo Chen}
218354280cfSShuo Chen
219