EventLoop.cc revision e44bbe65
1cc7f415cSShuo Chen// excerpts from http://code.google.com/p/muduo/
2cc7f415cSShuo Chen//
3cc7f415cSShuo Chen// Use of this source code is governed by a BSD-style license
4cc7f415cSShuo Chen// that can be found in the License file.
5cc7f415cSShuo Chen//
6cc7f415cSShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
7cc7f415cSShuo Chen
8cc7f415cSShuo Chen#include "EventLoop.h"
9cc7f415cSShuo Chen
10cc7f415cSShuo Chen#include "Channel.h"
11cc7f415cSShuo Chen#include "Poller.h"
12cc7f415cSShuo Chen#include "TimerQueue.h"
13cc7f415cSShuo Chen
14cc7f415cSShuo Chen#include "logging/Logging.h"
15cc7f415cSShuo Chen
16c461658bSShuo Chen#include <boost/bind.hpp>
17c461658bSShuo Chen
18cc7f415cSShuo Chen#include <assert.h>
19c461658bSShuo Chen#include <sys/eventfd.h>
20cc7f415cSShuo Chen
21cc7f415cSShuo Chenusing namespace muduo;
22cc7f415cSShuo Chen
23cc7f415cSShuo Chen__thread EventLoop* t_loopInThisThread = 0;
24cc7f415cSShuo Chenconst int kPollTimeMs = 10000;
25cc7f415cSShuo Chen
26c461658bSShuo Chenstatic int createEventfd()
27c461658bSShuo Chen{
28c461658bSShuo Chen  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
29c461658bSShuo Chen  if (evtfd < 0)
30c461658bSShuo Chen  {
31c461658bSShuo Chen    LOG_SYSERR << "Failed in eventfd";
32c461658bSShuo Chen    abort();
33c461658bSShuo Chen  }
34c461658bSShuo Chen  return evtfd;
35c461658bSShuo Chen}
36c461658bSShuo Chen
37cc7f415cSShuo ChenEventLoop::EventLoop()
38cc7f415cSShuo Chen  : looping_(false),
39cc7f415cSShuo Chen    quit_(false),
40c461658bSShuo Chen    callingPendingFunctors_(false),
41cc7f415cSShuo Chen    threadId_(CurrentThread::tid()),
42cc7f415cSShuo Chen    poller_(new Poller(this)),
43c461658bSShuo Chen    timerQueue_(new TimerQueue(this)),
44c461658bSShuo Chen    wakeupFd_(createEventfd()),
45c461658bSShuo Chen    wakeupChannel_(new Channel(this, wakeupFd_))
46cc7f415cSShuo Chen{
47cc7f415cSShuo Chen  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
48cc7f415cSShuo Chen  if (t_loopInThisThread)
49cc7f415cSShuo Chen  {
50cc7f415cSShuo Chen    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
51cc7f415cSShuo Chen              << " exists in this thread " << threadId_;
52cc7f415cSShuo Chen  }
53cc7f415cSShuo Chen  else
54cc7f415cSShuo Chen  {
55cc7f415cSShuo Chen    t_loopInThisThread = this;
56cc7f415cSShuo Chen  }
57c461658bSShuo Chen  wakeupChannel_->setReadCallback(
58c461658bSShuo Chen      boost::bind(&EventLoop::handleRead, this));
59c461658bSShuo Chen  // we are always reading the wakeupfd
60c461658bSShuo Chen  wakeupChannel_->enableReading();
61cc7f415cSShuo Chen}
62cc7f415cSShuo Chen
63cc7f415cSShuo ChenEventLoop::~EventLoop()
64cc7f415cSShuo Chen{
65cc7f415cSShuo Chen  assert(!looping_);
66c461658bSShuo Chen  ::close(wakeupFd_);
67cc7f415cSShuo Chen  t_loopInThisThread = NULL;
68cc7f415cSShuo Chen}
69cc7f415cSShuo Chen
70cc7f415cSShuo Chenvoid EventLoop::loop()
71cc7f415cSShuo Chen{
72cc7f415cSShuo Chen  assert(!looping_);
73cc7f415cSShuo Chen  assertInLoopThread();
74cc7f415cSShuo Chen  looping_ = true;
75e44bbe65SShuo Chen  quit_ = false;
76e44bbe65SShuo Chen
77cc7f415cSShuo Chen  while (!quit_)
78cc7f415cSShuo Chen  {
79cc7f415cSShuo Chen    activeChannels_.clear();
80cc7f415cSShuo Chen    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
81cc7f415cSShuo Chen    for (ChannelList::iterator it = activeChannels_.begin();
82cc7f415cSShuo Chen        it != activeChannels_.end(); ++it)
83cc7f415cSShuo Chen    {
84cc7f415cSShuo Chen      (*it)->handleEvent();
85cc7f415cSShuo Chen    }
86c461658bSShuo Chen    doPendingFunctors();
87cc7f415cSShuo Chen  }
88cc7f415cSShuo Chen
89cc7f415cSShuo Chen  LOG_TRACE << "EventLoop " << this << " stop looping";
90cc7f415cSShuo Chen  looping_ = false;
91cc7f415cSShuo Chen}
92cc7f415cSShuo Chen
93cc7f415cSShuo Chenvoid EventLoop::quit()
94cc7f415cSShuo Chen{
95cc7f415cSShuo Chen  quit_ = true;
96c461658bSShuo Chen  if (!isInLoopThread())
97c461658bSShuo Chen  {
98c461658bSShuo Chen    wakeup();
99c461658bSShuo Chen  }
100c461658bSShuo Chen}
101c461658bSShuo Chen
102c461658bSShuo Chenvoid EventLoop::runInLoop(const Functor& cb)
103c461658bSShuo Chen{
104c461658bSShuo Chen  if (isInLoopThread())
105c461658bSShuo Chen  {
106c461658bSShuo Chen    cb();
107c461658bSShuo Chen  }
108c461658bSShuo Chen  else
109c461658bSShuo Chen  {
110c461658bSShuo Chen    queueInLoop(cb);
111c461658bSShuo Chen    wakeup();
112c461658bSShuo Chen  }
113c461658bSShuo Chen}
114c461658bSShuo Chen
115c461658bSShuo Chenvoid EventLoop::queueInLoop(const Functor& cb)
116c461658bSShuo Chen{
117c461658bSShuo Chen  {
118c461658bSShuo Chen  MutexLockGuard lock(mutex_);
119c461658bSShuo Chen  pendingFunctors_.push_back(cb);
120c461658bSShuo Chen  }
121c461658bSShuo Chen
122c461658bSShuo Chen  if (isInLoopThread() && callingPendingFunctors_)
123c461658bSShuo Chen  {
124c461658bSShuo Chen    wakeup();
125c461658bSShuo Chen  }
126cc7f415cSShuo Chen}
127cc7f415cSShuo Chen
128cc7f415cSShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
129cc7f415cSShuo Chen{
130cc7f415cSShuo Chen  return timerQueue_->addTimer(cb, time, 0.0);
131cc7f415cSShuo Chen}
132cc7f415cSShuo Chen
133cc7f415cSShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
134cc7f415cSShuo Chen{
135cc7f415cSShuo Chen  Timestamp time(addTime(Timestamp::now(), delay));
136cc7f415cSShuo Chen  return runAt(time, cb);
137cc7f415cSShuo Chen}
138cc7f415cSShuo Chen
139cc7f415cSShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
140cc7f415cSShuo Chen{
141cc7f415cSShuo Chen  Timestamp time(addTime(Timestamp::now(), interval));
142cc7f415cSShuo Chen  return timerQueue_->addTimer(cb, time, interval);
143cc7f415cSShuo Chen}
144cc7f415cSShuo Chen
145cc7f415cSShuo Chenvoid EventLoop::updateChannel(Channel* channel)
146cc7f415cSShuo Chen{
147cc7f415cSShuo Chen  assert(channel->ownerLoop() == this);
148cc7f415cSShuo Chen  assertInLoopThread();
149cc7f415cSShuo Chen  poller_->updateChannel(channel);
150cc7f415cSShuo Chen}
151cc7f415cSShuo Chen
152cc7f415cSShuo Chenvoid EventLoop::abortNotInLoopThread()
153cc7f415cSShuo Chen{
154cc7f415cSShuo Chen  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
155cc7f415cSShuo Chen            << " was created in threadId_ = " << threadId_
156cc7f415cSShuo Chen            << ", current thread id = " <<  CurrentThread::tid();
157cc7f415cSShuo Chen}
158cc7f415cSShuo Chen
159c461658bSShuo Chenvoid EventLoop::wakeup()
160c461658bSShuo Chen{
161c461658bSShuo Chen  uint64_t one = 1;
162c461658bSShuo Chen  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
163c461658bSShuo Chen  if (n != sizeof one)
164c461658bSShuo Chen  {
165c461658bSShuo Chen    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
166c461658bSShuo Chen  }
167c461658bSShuo Chen}
168c461658bSShuo Chen
169c461658bSShuo Chenvoid EventLoop::handleRead()
170c461658bSShuo Chen{
171c461658bSShuo Chen  uint64_t one = 1;
172c461658bSShuo Chen  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
173c461658bSShuo Chen  if (n != sizeof one)
174c461658bSShuo Chen  {
175c461658bSShuo Chen    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
176c461658bSShuo Chen  }
177c461658bSShuo Chen}
178c461658bSShuo Chen
179c461658bSShuo Chenvoid EventLoop::doPendingFunctors()
180c461658bSShuo Chen{
181c461658bSShuo Chen  std::vector<Functor> functors;
182c461658bSShuo Chen  callingPendingFunctors_ = true;
183c461658bSShuo Chen
184c461658bSShuo Chen  {
185c461658bSShuo Chen  MutexLockGuard lock(mutex_);
186c461658bSShuo Chen  functors.swap(pendingFunctors_);
187c461658bSShuo Chen  }
188c461658bSShuo Chen
189c461658bSShuo Chen  for (size_t i = 0; i < functors.size(); ++i)
190c461658bSShuo Chen  {
191c461658bSShuo Chen    functors[i]();
192c461658bSShuo Chen  }
193c461658bSShuo Chen  callingPendingFunctors_ = false;
194c461658bSShuo Chen}
195c461658bSShuo Chen
196