EventLoop.cc revision a4e9058b
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
7b37003a7SShuo Chen
8b37003a7SShuo Chen#include "EventLoop.h"
9b37003a7SShuo Chen
10b37003a7SShuo Chen#include "Channel.h"
11b37003a7SShuo Chen#include "Poller.h"
12b37003a7SShuo Chen#include "TimerQueue.h"
13b37003a7SShuo Chen
14b37003a7SShuo Chen#include "logging/Logging.h"
15b37003a7SShuo Chen
16b37003a7SShuo Chen#include <boost/bind.hpp>
17b37003a7SShuo Chen
18b37003a7SShuo Chen#include <assert.h>
19a4e9058bSShuo Chen#include <signal.h>
20b37003a7SShuo Chen#include <sys/eventfd.h>
21b37003a7SShuo Chen
22b37003a7SShuo Chenusing namespace muduo;
23b37003a7SShuo Chen
24b37003a7SShuo Chen__thread EventLoop* t_loopInThisThread = 0;
25b37003a7SShuo Chenconst int kPollTimeMs = 10000;
26b37003a7SShuo Chen
27b37003a7SShuo Chenstatic int createEventfd()
28b37003a7SShuo Chen{
29b37003a7SShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
30b37003a7SShuo Chen  if (evtfd < 0)
31b37003a7SShuo Chen  {
32b37003a7SShuo Chen    LOG_SYSERR << "Failed in eventfd";
33b37003a7SShuo Chen    abort();
34b37003a7SShuo Chen  }
35b37003a7SShuo Chen  return evtfd;
36b37003a7SShuo Chen}
37b37003a7SShuo Chen
// Both the helper class and its single instance are private to this
// translation unit. Keeping them in an anonymous namespace gives them
// internal linkage, so the generic names cannot collide with (or violate the
// one-definition rule against) identically named entities in other files.
namespace
{

// Sets the process-wide disposition of SIGPIPE to "ignore" when constructed.
class IgnoreSigPipe
{
 public:
  IgnoreSigPipe()
  {
    ::signal(SIGPIPE, SIG_IGN);
  }
};

// Constructed during static initialization of this file, so SIGPIPE is
// already ignored by the time main() runs.
IgnoreSigPipe initObj;

}  // namespace
48a4e9058bSShuo Chen
49b37003a7SShuo ChenEventLoop::EventLoop()
50b37003a7SShuo Chen  : looping_(false),
51b37003a7SShuo Chen    quit_(false),
52b37003a7SShuo Chen    callingPendingFunctors_(false),
53b37003a7SShuo Chen    threadId_(CurrentThread::tid()),
54b37003a7SShuo Chen    poller_(new Poller(this)),
55b37003a7SShuo Chen    timerQueue_(new TimerQueue(this)),
56b37003a7SShuo Chen    wakeupFd_(createEventfd()),
57b37003a7SShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
58b37003a7SShuo Chen{
59b37003a7SShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
60b37003a7SShuo Chen  if (t_loopInThisThread)
61b37003a7SShuo Chen  {
62b37003a7SShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
63b37003a7SShuo Chen              << " exists in this thread " << threadId_;
64b37003a7SShuo Chen  }
65b37003a7SShuo Chen  else
66b37003a7SShuo Chen  {
67b37003a7SShuo Chen    t_loopInThisThread = this;
68b37003a7SShuo Chen  }
69b37003a7SShuo Chen  wakeupChannel_->setReadCallback(
70b37003a7SShuo Chen      boost::bind(&EventLoop::handleRead, this));
71b37003a7SShuo Chen  // we are always reading the wakeupfd
72b37003a7SShuo Chen  wakeupChannel_->enableReading();
73b37003a7SShuo Chen}
74b37003a7SShuo Chen
75b37003a7SShuo ChenEventLoop::~EventLoop()
76b37003a7SShuo Chen{
77b37003a7SShuo Chen  assert(!looping_);
78b37003a7SShuo Chen  ::close(wakeupFd_);
79b37003a7SShuo Chen  t_loopInThisThread = NULL;
80b37003a7SShuo Chen}
81b37003a7SShuo Chen
82b37003a7SShuo Chenvoid EventLoop::loop()
83b37003a7SShuo Chen{
84b37003a7SShuo Chen  assert(!looping_);
85b37003a7SShuo Chen  assertInLoopThread();
86b37003a7SShuo Chen  looping_ = true;
87b37003a7SShuo Chen  quit_ = false;
88b37003a7SShuo Chen
89b37003a7SShuo Chen  while (!quit_)
90b37003a7SShuo Chen  {
91b37003a7SShuo Chen    activeChannels_.clear();
92b37003a7SShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
93b37003a7SShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
94b37003a7SShuo Chen        it != activeChannels_.end(); ++it)
95b37003a7SShuo Chen    {
96b37003a7SShuo Chen      (*it)->handleEvent(pollReturnTime_);
97b37003a7SShuo Chen    }
98b37003a7SShuo Chen    doPendingFunctors();
99b37003a7SShuo Chen  }
100b37003a7SShuo Chen
101b37003a7SShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
102b37003a7SShuo Chen  looping_ = false;
103b37003a7SShuo Chen}
104b37003a7SShuo Chen
105b37003a7SShuo Chenvoid EventLoop::quit()
106b37003a7SShuo Chen{
107b37003a7SShuo Chen  quit_ = true;
108b37003a7SShuo Chen  if (!isInLoopThread())
109b37003a7SShuo Chen  {
110b37003a7SShuo Chen    wakeup();
111b37003a7SShuo Chen  }
112b37003a7SShuo Chen}
113b37003a7SShuo Chen
114b37003a7SShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
115b37003a7SShuo Chen{
116b37003a7SShuo Chen  if (isInLoopThread())
117b37003a7SShuo Chen  {
118b37003a7SShuo Chen    cb();
119b37003a7SShuo Chen  }
120b37003a7SShuo Chen  else
121b37003a7SShuo Chen  {
122b37003a7SShuo Chen    queueInLoop(cb);
123b37003a7SShuo Chen    wakeup();
124b37003a7SShuo Chen  }
125b37003a7SShuo Chen}
126b37003a7SShuo Chen
127b37003a7SShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
128b37003a7SShuo Chen{
129b37003a7SShuo Chen  {
130b37003a7SShuo Chen  MutexLockGuard lock(mutex_);
131b37003a7SShuo Chen  pendingFunctors_.push_back(cb);
132b37003a7SShuo Chen  }
133b37003a7SShuo Chen
134b37003a7SShuo Chen  if (isInLoopThread() && callingPendingFunctors_)
135b37003a7SShuo Chen  {
136b37003a7SShuo Chen    wakeup();
137b37003a7SShuo Chen  }
138b37003a7SShuo Chen}
139b37003a7SShuo Chen
140b37003a7SShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
141b37003a7SShuo Chen{
142b37003a7SShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
143b37003a7SShuo Chen}
144b37003a7SShuo Chen
145b37003a7SShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
146b37003a7SShuo Chen{
147b37003a7SShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
148b37003a7SShuo Chen  return runAt(time, cb);
149b37003a7SShuo Chen}
150b37003a7SShuo Chen
151b37003a7SShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
152b37003a7SShuo Chen{
153b37003a7SShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
154b37003a7SShuo Chen  return timerQueue_->addTimer(cb, time, interval);
155b37003a7SShuo Chen}
156b37003a7SShuo Chen
157b37003a7SShuo Chenvoid EventLoop::updateChannel(Channel* channel)
158b37003a7SShuo Chen{
159b37003a7SShuo Chen  assert(channel->ownerLoop() == this);
160b37003a7SShuo Chen  assertInLoopThread();
161b37003a7SShuo Chen  poller_->updateChannel(channel);
162b37003a7SShuo Chen}
163b37003a7SShuo Chen
164b37003a7SShuo Chenvoid EventLoop::removeChannel(Channel* channel)
165b37003a7SShuo Chen{
166b37003a7SShuo Chen  assert(channel->ownerLoop() == this);
167b37003a7SShuo Chen  assertInLoopThread();
168b37003a7SShuo Chen  poller_->removeChannel(channel);
169b37003a7SShuo Chen}
170b37003a7SShuo Chen
171b37003a7SShuo Chenvoid EventLoop::abortNotInLoopThread()
172b37003a7SShuo Chen{
173b37003a7SShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
174b37003a7SShuo Chen            << " was created in threadId_ = " << threadId_
175b37003a7SShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
176b37003a7SShuo Chen}
177b37003a7SShuo Chen
178b37003a7SShuo Chenvoid EventLoop::wakeup()
179b37003a7SShuo Chen{
180b37003a7SShuo Chen  uint64_t one = 1;
181b37003a7SShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
182b37003a7SShuo Chen  if (n != sizeof one)
183b37003a7SShuo Chen  {
184b37003a7SShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
185b37003a7SShuo Chen  }
186b37003a7SShuo Chen}
187b37003a7SShuo Chen
188b37003a7SShuo Chenvoid EventLoop::handleRead()
189b37003a7SShuo Chen{
190b37003a7SShuo Chen  uint64_t one = 1;
191b37003a7SShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
192b37003a7SShuo Chen  if (n != sizeof one)
193b37003a7SShuo Chen  {
194b37003a7SShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
195b37003a7SShuo Chen  }
196b37003a7SShuo Chen}
197b37003a7SShuo Chen
198b37003a7SShuo Chenvoid EventLoop::doPendingFunctors()
199b37003a7SShuo Chen{
200b37003a7SShuo Chen  std::vector<Functor> functors;
201b37003a7SShuo Chen  callingPendingFunctors_ = true;
202b37003a7SShuo Chen
203b37003a7SShuo Chen  {
204b37003a7SShuo Chen  MutexLockGuard lock(mutex_);
205b37003a7SShuo Chen  functors.swap(pendingFunctors_);
206b37003a7SShuo Chen  }
207b37003a7SShuo Chen
208b37003a7SShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
209b37003a7SShuo Chen  {
210b37003a7SShuo Chen    functors[i]();
211b37003a7SShuo Chen  }
212b37003a7SShuo Chen  callingPendingFunctors_ = false;
213b37003a7SShuo Chen}
214b37003a7SShuo Chen
215