1 // excerpts from http://code.google.com/p/muduo/
2 //
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the License file.
5 //
6 // Author: Shuo Chen (chenshuo at chenshuo dot com)
7 
8 #include "EventLoop.h"
9 
10 #include "Channel.h"
11-#include "Poller.h"
12+#include "EPoller.h"
13 #include "TimerQueue.h"
14 
15 #include "logging/Logging.h"
16 
17 #include <boost/bind.hpp>
18 
19 #include <assert.h>
20 #include <signal.h>
21 #include <sys/eventfd.h>
22 
23 using namespace muduo;
24 
25 __thread EventLoop* t_loopInThisThread = 0;
26 const int kPollTimeMs = 10000;
27 
28 static int createEventfd()
29 {
30   int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
31   if (evtfd < 0)
32   {
33     LOG_SYSERR << "Failed in eventfd";
34     abort();
35   }
36   return evtfd;
37 }
38 
39 class IgnoreSigPipe
40 {
41  public:
42   IgnoreSigPipe()
43   {
44     ::signal(SIGPIPE, SIG_IGN);
45   }
46 };
47 
48 IgnoreSigPipe initObj;
49 
50 EventLoop::EventLoop()
51   : looping_(false),
52     quit_(false),
53     callingPendingFunctors_(false),
54     threadId_(CurrentThread::tid()),
55-    poller_(new Poller(this)),
56+    poller_(new EPoller(this)),
57     timerQueue_(new TimerQueue(this)),
58     wakeupFd_(createEventfd()),
59     wakeupChannel_(new Channel(this, wakeupFd_))
60 {
61   LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
62   if (t_loopInThisThread)
63   {
64     LOG_FATAL << "Another EventLoop " << t_loopInThisThread
65               << " exists in this thread " << threadId_;
66   }
67   else
68   {
69     t_loopInThisThread = this;
70   }
71   wakeupChannel_->setReadCallback(
72       boost::bind(&EventLoop::handleRead, this));
73   // we are always reading the wakeupfd
74   wakeupChannel_->enableReading();
75 }
76 
77 EventLoop::~EventLoop()
78 {
79   assert(!looping_);
80   ::close(wakeupFd_);
81   t_loopInThisThread = NULL;
82 }
83 
84 void EventLoop::loop()
85 {
86   assert(!looping_);
87   assertInLoopThread();
88   looping_ = true;
89   quit_ = false;
90 
91   while (!quit_)
92   {
93     activeChannels_.clear();
94     pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
95     for (ChannelList::iterator it = activeChannels_.begin();
96         it != activeChannels_.end(); ++it)
97     {
98       (*it)->handleEvent(pollReturnTime_);
99     }
100     doPendingFunctors();
101   }
102 
103   LOG_TRACE << "EventLoop " << this << " stop looping";
104   looping_ = false;
105 }
106 
107 void EventLoop::quit()
108 {
109   quit_ = true;
110   if (!isInLoopThread())
111   {
112     wakeup();
113   }
114 }
115 
116 void EventLoop::runInLoop(const Functor& cb)
117 {
118   if (isInLoopThread())
119   {
120     cb();
121   }
122   else
123   {
124     queueInLoop(cb);
125   }
126 }
127 
128 void EventLoop::queueInLoop(const Functor& cb)
129 {
130   {
131   MutexLockGuard lock(mutex_);
132   pendingFunctors_.push_back(cb);
133   }
134 
135   if (!isInLoopThread() || callingPendingFunctors_)
136   {
137     wakeup();
138   }
139 }
140 
141 TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
142 {
143   return timerQueue_->addTimer(cb, time, 0.0);
144 }
145 
146 TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
147 {
148   Timestamp time(addTime(Timestamp::now(), delay));
149   return runAt(time, cb);
150 }
151 
152 TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
153 {
154   Timestamp time(addTime(Timestamp::now(), interval));
155   return timerQueue_->addTimer(cb, time, interval);
156 }
157 
158 void EventLoop::cancel(TimerId timerId)
159 {
160   return timerQueue_->cancel(timerId);
161 }
162 
163 void EventLoop::updateChannel(Channel* channel)
164 {
165   assert(channel->ownerLoop() == this);
166   assertInLoopThread();
167   poller_->updateChannel(channel);
168 }
169 
170 void EventLoop::removeChannel(Channel* channel)
171 {
172   assert(channel->ownerLoop() == this);
173   assertInLoopThread();
174   poller_->removeChannel(channel);
175 }
176 
177 void EventLoop::abortNotInLoopThread()
178 {
179   LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
180             << " was created in threadId_ = " << threadId_
181             << ", current thread id = " <<  CurrentThread::tid();
182 }
183 
184 void EventLoop::wakeup()
185 {
186   uint64_t one = 1;
187   ssize_t n = ::write(wakeupFd_, &one, sizeof one);
188   if (n != sizeof one)
189   {
190     LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
191   }
192 }
193 
194 void EventLoop::handleRead()
195 {
196   uint64_t one = 1;
197   ssize_t n = ::read(wakeupFd_, &one, sizeof one);
198   if (n != sizeof one)
199   {
200     LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
201   }
202 }
203 
204 void EventLoop::doPendingFunctors()
205 {
206   std::vector<Functor> functors;
207   callingPendingFunctors_ = true;
208 
209   {
210   MutexLockGuard lock(mutex_);
211   functors.swap(pendingFunctors_);
212   }
213 
214   for (size_t i = 0; i < functors.size(); ++i)
215   {
216     functors[i]();
217   }
218   callingPendingFunctors_ = false;
219 }
220 
221