1566406ccSShuo Chen // excerpts from http://code.google.com/p/muduo/
2566406ccSShuo Chen //
3566406ccSShuo Chen // Use of this source code is governed by a BSD-style license
4566406ccSShuo Chen // that can be found in the License file.
5566406ccSShuo Chen //
6566406ccSShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com)
7566406ccSShuo Chen 
8566406ccSShuo Chen #include "EventLoop.h"
9566406ccSShuo Chen 
10566406ccSShuo Chen #include "Channel.h"
11566406ccSShuo Chen #include "Poller.h"
12566406ccSShuo Chen #include "TimerQueue.h"
13566406ccSShuo Chen 
14566406ccSShuo Chen #include "logging/Logging.h"
15566406ccSShuo Chen 
16566406ccSShuo Chen+#include <boost/bind.hpp>
17566406ccSShuo Chen+
18566406ccSShuo Chen #include <assert.h>
19566406ccSShuo Chen+#include <sys/eventfd.h>
20566406ccSShuo Chen 
21566406ccSShuo Chen using namespace muduo;
22566406ccSShuo Chen 
23566406ccSShuo Chen __thread EventLoop* t_loopInThisThread = 0;
24566406ccSShuo Chen const int kPollTimeMs = 10000;
25566406ccSShuo Chen 
26566406ccSShuo Chen+static int createEventfd()
27566406ccSShuo Chen+{
28566406ccSShuo Chen+  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
29566406ccSShuo Chen+  if (evtfd < 0)
30566406ccSShuo Chen+  {
31566406ccSShuo Chen+    LOG_SYSERR << "Failed in eventfd";
32566406ccSShuo Chen+    abort();
33566406ccSShuo Chen+  }
34566406ccSShuo Chen+  return evtfd;
35566406ccSShuo Chen+}
36566406ccSShuo Chen+
37566406ccSShuo Chen EventLoop::EventLoop()
38566406ccSShuo Chen   : looping_(false),
39566406ccSShuo Chen     quit_(false),
40566406ccSShuo Chen+    callingPendingFunctors_(false),
41566406ccSShuo Chen     threadId_(CurrentThread::tid()),
42566406ccSShuo Chen     poller_(new Poller(this)),
43566406ccSShuo Chen     timerQueue_(new TimerQueue(this)),
44566406ccSShuo Chen+    wakeupFd_(createEventfd()),
45566406ccSShuo Chen+    wakeupChannel_(new Channel(this, wakeupFd_))
46566406ccSShuo Chen {
47566406ccSShuo Chen   LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
48566406ccSShuo Chen   if (t_loopInThisThread)
49566406ccSShuo Chen   {
50566406ccSShuo Chen     LOG_FATAL << "Another EventLoop " << t_loopInThisThread
51566406ccSShuo Chen               << " exists in this thread " << threadId_;
52566406ccSShuo Chen   }
53566406ccSShuo Chen   else
54566406ccSShuo Chen   {
55566406ccSShuo Chen     t_loopInThisThread = this;
56566406ccSShuo Chen   }
57566406ccSShuo Chen+  wakeupChannel_->setReadCallback(
58566406ccSShuo Chen+      boost::bind(&EventLoop::handleRead, this));
59566406ccSShuo Chen+  // we are always reading the wakeupfd
60566406ccSShuo Chen+  wakeupChannel_->enableReading();
61566406ccSShuo Chen }
62566406ccSShuo Chen 
63566406ccSShuo Chen EventLoop::~EventLoop()
64566406ccSShuo Chen {
65566406ccSShuo Chen   assert(!looping_);
66566406ccSShuo Chen+  ::close(wakeupFd_);
67566406ccSShuo Chen   t_loopInThisThread = NULL;
68566406ccSShuo Chen }
69566406ccSShuo Chen 
70566406ccSShuo Chen void EventLoop::loop()
71566406ccSShuo Chen {
72566406ccSShuo Chen   assert(!looping_);
73566406ccSShuo Chen   assertInLoopThread();
74566406ccSShuo Chen   looping_ = true;
75566406ccSShuo Chen   quit_ = false;
76566406ccSShuo Chen 
77566406ccSShuo Chen   while (!quit_)
78566406ccSShuo Chen   {
79566406ccSShuo Chen     activeChannels_.clear();
80566406ccSShuo Chen     pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
81566406ccSShuo Chen     for (ChannelList::iterator it = activeChannels_.begin();
82566406ccSShuo Chen         it != activeChannels_.end(); ++it)
83566406ccSShuo Chen     {
84566406ccSShuo Chen       (*it)->handleEvent();
85566406ccSShuo Chen     }
86566406ccSShuo Chen+    doPendingFunctors();
87566406ccSShuo Chen   }
88566406ccSShuo Chen 
89566406ccSShuo Chen   LOG_TRACE << "EventLoop " << this << " stop looping";
90566406ccSShuo Chen   looping_ = false;
91566406ccSShuo Chen }
92566406ccSShuo Chen 
93566406ccSShuo Chen void EventLoop::quit()
94566406ccSShuo Chen {
95566406ccSShuo Chen   quit_ = true;
96566406ccSShuo Chen+  if (!isInLoopThread())
97566406ccSShuo Chen+  {
98566406ccSShuo Chen+    wakeup();
99566406ccSShuo Chen+  }
100b4a5ce52SShuo Chen }
101b4a5ce52SShuo Chen 
102566406ccSShuo Chen+void EventLoop::runInLoop(const Functor& cb)
103566406ccSShuo Chen+{
104566406ccSShuo Chen+  if (isInLoopThread())
105566406ccSShuo Chen+  {
106566406ccSShuo Chen+    cb();
107566406ccSShuo Chen+  }
108566406ccSShuo Chen+  else
109566406ccSShuo Chen+  {
110566406ccSShuo Chen+    queueInLoop(cb);
111566406ccSShuo Chen+  }
112566406ccSShuo Chen+}
113566406ccSShuo Chen+
114566406ccSShuo Chen+void EventLoop::queueInLoop(const Functor& cb)
115566406ccSShuo Chen+{
116566406ccSShuo Chen+  {
117566406ccSShuo Chen+  MutexLockGuard lock(mutex_);
118566406ccSShuo Chen+  pendingFunctors_.push_back(cb);
119566406ccSShuo Chen+  }
120566406ccSShuo Chen+
1210f776063SShuo Chen+  if (!isInLoopThread() || callingPendingFunctors_)
122566406ccSShuo Chen+  {
123566406ccSShuo Chen+    wakeup();
124566406ccSShuo Chen+  }
125b4a5ce52SShuo Chen+}
126566406ccSShuo Chen 
127566406ccSShuo Chen TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
128566406ccSShuo Chen {
129566406ccSShuo Chen   return timerQueue_->addTimer(cb, time, 0.0);
130566406ccSShuo Chen }
131566406ccSShuo Chen 
132566406ccSShuo Chen TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
133566406ccSShuo Chen {
134566406ccSShuo Chen   Timestamp time(addTime(Timestamp::now(), delay));
135566406ccSShuo Chen   return runAt(time, cb);
136566406ccSShuo Chen }
137566406ccSShuo Chen 
138566406ccSShuo Chen TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
139566406ccSShuo Chen {
140566406ccSShuo Chen   Timestamp time(addTime(Timestamp::now(), interval));
141566406ccSShuo Chen   return timerQueue_->addTimer(cb, time, interval);
142566406ccSShuo Chen }
143566406ccSShuo Chen 
144566406ccSShuo Chen void EventLoop::updateChannel(Channel* channel)
145566406ccSShuo Chen {
146566406ccSShuo Chen   assert(channel->ownerLoop() == this);
147566406ccSShuo Chen   assertInLoopThread();
148566406ccSShuo Chen   poller_->updateChannel(channel);
149566406ccSShuo Chen }
150566406ccSShuo Chen 
151566406ccSShuo Chen void EventLoop::abortNotInLoopThread()
152566406ccSShuo Chen {
153566406ccSShuo Chen   LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
154566406ccSShuo Chen             << " was created in threadId_ = " << threadId_
155566406ccSShuo Chen             << ", current thread id = " <<  CurrentThread::tid();
156566406ccSShuo Chen }
157566406ccSShuo Chen 
158566406ccSShuo Chen+void EventLoop::wakeup()
159566406ccSShuo Chen+{
160566406ccSShuo Chen+  uint64_t one = 1;
161566406ccSShuo Chen+  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
162566406ccSShuo Chen+  if (n != sizeof one)
163566406ccSShuo Chen+  {
164566406ccSShuo Chen+    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
165566406ccSShuo Chen+  }
166566406ccSShuo Chen+}
167566406ccSShuo Chen+
168566406ccSShuo Chen+void EventLoop::handleRead()
169566406ccSShuo Chen+{
170566406ccSShuo Chen+  uint64_t one = 1;
171566406ccSShuo Chen+  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
172566406ccSShuo Chen+  if (n != sizeof one)
173566406ccSShuo Chen+  {
174566406ccSShuo Chen+    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
175566406ccSShuo Chen+  }
176566406ccSShuo Chen+}
177566406ccSShuo Chen+
178566406ccSShuo Chen+void EventLoop::doPendingFunctors()
179566406ccSShuo Chen+{
180566406ccSShuo Chen+  std::vector<Functor> functors;
181566406ccSShuo Chen+  callingPendingFunctors_ = true;
182566406ccSShuo Chen+
183566406ccSShuo Chen+  {
184566406ccSShuo Chen+  MutexLockGuard lock(mutex_);
185566406ccSShuo Chen+  functors.swap(pendingFunctors_);
186566406ccSShuo Chen+  }
187566406ccSShuo Chen+
188566406ccSShuo Chen+  for (size_t i = 0; i < functors.size(); ++i)
189566406ccSShuo Chen+  {
190566406ccSShuo Chen+    functors[i]();
191566406ccSShuo Chen+  }
192566406ccSShuo Chen+  callingPendingFunctors_ = false;
193566406ccSShuo Chen+}
194566406ccSShuo Chen+
195