EventLoop.cc revision a1bde736
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
7a1bde736SShuo Chen
8a1bde736SShuo Chen#include "EventLoop.h"
9a1bde736SShuo Chen
10a1bde736SShuo Chen#include "Channel.h"
11a1bde736SShuo Chen#include "Poller.h"
12a1bde736SShuo Chen#include "TimerQueue.h"
13a1bde736SShuo Chen
14a1bde736SShuo Chen#include "logging/Logging.h"
15a1bde736SShuo Chen
16a1bde736SShuo Chen#include <boost/bind.hpp>
17a1bde736SShuo Chen
18a1bde736SShuo Chen#include <assert.h>
19a1bde736SShuo Chen#include <signal.h>
20a1bde736SShuo Chen#include <sys/eventfd.h>
21a1bde736SShuo Chen
22a1bde736SShuo Chenusing namespace muduo;
23a1bde736SShuo Chen
24a1bde736SShuo Chen__thread EventLoop* t_loopInThisThread = 0;
25a1bde736SShuo Chenconst int kPollTimeMs = 10000;
26a1bde736SShuo Chen
27a1bde736SShuo Chenstatic int createEventfd()
28a1bde736SShuo Chen{
29a1bde736SShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
30a1bde736SShuo Chen  if (evtfd < 0)
31a1bde736SShuo Chen  {
32a1bde736SShuo Chen    LOG_SYSERR << "Failed in eventfd";
33a1bde736SShuo Chen    abort();
34a1bde736SShuo Chen  }
35a1bde736SShuo Chen  return evtfd;
36a1bde736SShuo Chen}
37a1bde736SShuo Chen
// Helper whose constructor sets the process-wide disposition of SIGPIPE to
// "ignore".
class IgnoreSigPipe
{
 public:
  IgnoreSigPipe() { ::signal(SIGPIPE, SIG_IGN); }
};

// File-scope instance: its constructor runs during static initialization,
// so SIGPIPE is already ignored before main() starts.
IgnoreSigPipe initObj;
48a1bde736SShuo Chen
49a1bde736SShuo ChenEventLoop::EventLoop()
50a1bde736SShuo Chen  : looping_(false),
51a1bde736SShuo Chen    quit_(false),
52a1bde736SShuo Chen    callingPendingFunctors_(false),
53a1bde736SShuo Chen    threadId_(CurrentThread::tid()),
54a1bde736SShuo Chen    poller_(new Poller(this)),
55a1bde736SShuo Chen    timerQueue_(new TimerQueue(this)),
56a1bde736SShuo Chen    wakeupFd_(createEventfd()),
57a1bde736SShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
58a1bde736SShuo Chen{
59a1bde736SShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
60a1bde736SShuo Chen  if (t_loopInThisThread)
61a1bde736SShuo Chen  {
62a1bde736SShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
63a1bde736SShuo Chen              << " exists in this thread " << threadId_;
64a1bde736SShuo Chen  }
65a1bde736SShuo Chen  else
66a1bde736SShuo Chen  {
67a1bde736SShuo Chen    t_loopInThisThread = this;
68a1bde736SShuo Chen  }
69a1bde736SShuo Chen  wakeupChannel_->setReadCallback(
70a1bde736SShuo Chen      boost::bind(&EventLoop::handleRead, this));
71a1bde736SShuo Chen  // we are always reading the wakeupfd
72a1bde736SShuo Chen  wakeupChannel_->enableReading();
73a1bde736SShuo Chen}
74a1bde736SShuo Chen
75a1bde736SShuo ChenEventLoop::~EventLoop()
76a1bde736SShuo Chen{
77a1bde736SShuo Chen  assert(!looping_);
78a1bde736SShuo Chen  ::close(wakeupFd_);
79a1bde736SShuo Chen  t_loopInThisThread = NULL;
80a1bde736SShuo Chen}
81a1bde736SShuo Chen
82a1bde736SShuo Chenvoid EventLoop::loop()
83a1bde736SShuo Chen{
84a1bde736SShuo Chen  assert(!looping_);
85a1bde736SShuo Chen  assertInLoopThread();
86a1bde736SShuo Chen  looping_ = true;
87a1bde736SShuo Chen  quit_ = false;
88a1bde736SShuo Chen
89a1bde736SShuo Chen  while (!quit_)
90a1bde736SShuo Chen  {
91a1bde736SShuo Chen    activeChannels_.clear();
92a1bde736SShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
93a1bde736SShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
94a1bde736SShuo Chen        it != activeChannels_.end(); ++it)
95a1bde736SShuo Chen    {
96a1bde736SShuo Chen      (*it)->handleEvent(pollReturnTime_);
97a1bde736SShuo Chen    }
98a1bde736SShuo Chen    doPendingFunctors();
99a1bde736SShuo Chen  }
100a1bde736SShuo Chen
101a1bde736SShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
102a1bde736SShuo Chen  looping_ = false;
103a1bde736SShuo Chen}
104a1bde736SShuo Chen
105a1bde736SShuo Chenvoid EventLoop::quit()
106a1bde736SShuo Chen{
107a1bde736SShuo Chen  quit_ = true;
108a1bde736SShuo Chen  if (!isInLoopThread())
109a1bde736SShuo Chen  {
110a1bde736SShuo Chen    wakeup();
111a1bde736SShuo Chen  }
112a1bde736SShuo Chen}
113a1bde736SShuo Chen
114a1bde736SShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
115a1bde736SShuo Chen{
116a1bde736SShuo Chen  if (isInLoopThread())
117a1bde736SShuo Chen  {
118a1bde736SShuo Chen    cb();
119a1bde736SShuo Chen  }
120a1bde736SShuo Chen  else
121a1bde736SShuo Chen  {
122a1bde736SShuo Chen    queueInLoop(cb);
123a1bde736SShuo Chen  }
124a1bde736SShuo Chen}
125a1bde736SShuo Chen
126a1bde736SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
127a1bde736SShuo Chen{
128a1bde736SShuo Chen  {
129a1bde736SShuo Chen  MutexLockGuard lock(mutex_);
130a1bde736SShuo Chen  pendingFunctors_.push_back(cb);
131a1bde736SShuo Chen  }
132a1bde736SShuo Chen
133a1bde736SShuo Chen  if (!isInLoopThread() || callingPendingFunctors_)
134a1bde736SShuo Chen  {
135a1bde736SShuo Chen    wakeup();
136a1bde736SShuo Chen  }
137a1bde736SShuo Chen}
138a1bde736SShuo Chen
139a1bde736SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
140a1bde736SShuo Chen{
141a1bde736SShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
142a1bde736SShuo Chen}
143a1bde736SShuo Chen
144a1bde736SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
145a1bde736SShuo Chen{
146a1bde736SShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
147a1bde736SShuo Chen  return runAt(time, cb);
148a1bde736SShuo Chen}
149a1bde736SShuo Chen
150a1bde736SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
151a1bde736SShuo Chen{
152a1bde736SShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
153a1bde736SShuo Chen  return timerQueue_->addTimer(cb, time, interval);
154a1bde736SShuo Chen}
155a1bde736SShuo Chen
156a1bde736SShuo Chenvoid EventLoop::cancel(TimerId timerId)
157a1bde736SShuo Chen{
158a1bde736SShuo Chen  return timerQueue_->cancel(timerId);
159a1bde736SShuo Chen}
160a1bde736SShuo Chen
161a1bde736SShuo Chenvoid EventLoop::updateChannel(Channel* channel)
162a1bde736SShuo Chen{
163a1bde736SShuo Chen  assert(channel->ownerLoop() == this);
164a1bde736SShuo Chen  assertInLoopThread();
165a1bde736SShuo Chen  poller_->updateChannel(channel);
166a1bde736SShuo Chen}
167a1bde736SShuo Chen
168a1bde736SShuo Chenvoid EventLoop::removeChannel(Channel* channel)
169a1bde736SShuo Chen{
170a1bde736SShuo Chen  assert(channel->ownerLoop() == this);
171a1bde736SShuo Chen  assertInLoopThread();
172a1bde736SShuo Chen  poller_->removeChannel(channel);
173a1bde736SShuo Chen}
174a1bde736SShuo Chen
175a1bde736SShuo Chenvoid EventLoop::abortNotInLoopThread()
176a1bde736SShuo Chen{
177a1bde736SShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
178a1bde736SShuo Chen            << " was created in threadId_ = " << threadId_
179a1bde736SShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
180a1bde736SShuo Chen}
181a1bde736SShuo Chen
182a1bde736SShuo Chenvoid EventLoop::wakeup()
183a1bde736SShuo Chen{
184a1bde736SShuo Chen  uint64_t one = 1;
185a1bde736SShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
186a1bde736SShuo Chen  if (n != sizeof one)
187a1bde736SShuo Chen  {
188a1bde736SShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
189a1bde736SShuo Chen  }
190a1bde736SShuo Chen}
191a1bde736SShuo Chen
192a1bde736SShuo Chenvoid EventLoop::handleRead()
193a1bde736SShuo Chen{
194a1bde736SShuo Chen  uint64_t one = 1;
195a1bde736SShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
196a1bde736SShuo Chen  if (n != sizeof one)
197a1bde736SShuo Chen  {
198a1bde736SShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
199a1bde736SShuo Chen  }
200a1bde736SShuo Chen}
201a1bde736SShuo Chen
202a1bde736SShuo Chenvoid EventLoop::doPendingFunctors()
203a1bde736SShuo Chen{
204a1bde736SShuo Chen  std::vector<Functor> functors;
205a1bde736SShuo Chen  callingPendingFunctors_ = true;
206a1bde736SShuo Chen
207a1bde736SShuo Chen  {
208a1bde736SShuo Chen  MutexLockGuard lock(mutex_);
209a1bde736SShuo Chen  functors.swap(pendingFunctors_);
210a1bde736SShuo Chen  }
211a1bde736SShuo Chen
212a1bde736SShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
213a1bde736SShuo Chen  {
214a1bde736SShuo Chen    functors[i]();
215a1bde736SShuo Chen  }
216a1bde736SShuo Chen  callingPendingFunctors_ = false;
217a1bde736SShuo Chen}
218a1bde736SShuo Chen
219