1 // excerpts from http://code.google.com/p/muduo/
2 //
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the License file.
5 //
6 // Author: Shuo Chen (chenshuo at chenshuo dot com)
7 
8 #include "EventLoop.h"
9 
10 #include "Channel.h"
11 #include "Poller.h"
12 #include "TimerQueue.h"
13 
14 #include "logging/Logging.h"
15 
16 #include <boost/bind.hpp>
17 
18 #include <assert.h>
19 #include <sys/eventfd.h>
20 
21 using namespace muduo;
22 
23 __thread EventLoop* t_loopInThisThread = 0;
24 const int kPollTimeMs = 10000;
25 
26 static int createEventfd()
27 {
28   int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
29   if (evtfd < 0)
30   {
31     LOG_SYSERR << "Failed in eventfd";
32     abort();
33   }
34   return evtfd;
35 }
36 
37 EventLoop::EventLoop()
38   : looping_(false),
39     quit_(false),
40     callingPendingFunctors_(false),
41     threadId_(CurrentThread::tid()),
42     poller_(new Poller(this)),
43     timerQueue_(new TimerQueue(this)),
44     wakeupFd_(createEventfd()),
45     wakeupChannel_(new Channel(this, wakeupFd_))
46 {
47   LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
48   if (t_loopInThisThread)
49   {
50     LOG_FATAL << "Another EventLoop " << t_loopInThisThread
51               << " exists in this thread " << threadId_;
52   }
53   else
54   {
55     t_loopInThisThread = this;
56   }
57   wakeupChannel_->setReadCallback(
58       boost::bind(&EventLoop::handleRead, this));
59   // we are always reading the wakeupfd
60   wakeupChannel_->enableReading();
61 }
62 
63 EventLoop::~EventLoop()
64 {
65   assert(!looping_);
66   ::close(wakeupFd_);
67   t_loopInThisThread = NULL;
68 }
69 
70 void EventLoop::loop()
71 {
72   assert(!looping_);
73   assertInLoopThread();
74   looping_ = true;
75   quit_ = false;
76 
77   while (!quit_)
78   {
79     activeChannels_.clear();
80     pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
81     for (ChannelList::iterator it = activeChannels_.begin();
82         it != activeChannels_.end(); ++it)
83     {
84!      (*it)->handleEvent(pollReturnTime_);
85     }
86     doPendingFunctors();
87   }
88 
89   LOG_TRACE << "EventLoop " << this << " stop looping";
90   looping_ = false;
91 }
92 
93 void EventLoop::quit()
94 {
95   quit_ = true;
96   if (!isInLoopThread())
97   {
98     wakeup();
99   }
100 }
101 
102 void EventLoop::runInLoop(const Functor& cb)
103 {
104   if (isInLoopThread())
105   {
106     cb();
107   }
108   else
109   {
110     queueInLoop(cb);
111   }
112 }
113 
114 void EventLoop::queueInLoop(const Functor& cb)
115 {
116   {
117   MutexLockGuard lock(mutex_);
118   pendingFunctors_.push_back(cb);
119   }
120 
121   if (!isInLoopThread() || callingPendingFunctors_)
122   {
123     wakeup();
124   }
125 }
126 
127 TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
128 {
129   return timerQueue_->addTimer(cb, time, 0.0);
130 }
131 
132 TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
133 {
134   Timestamp time(addTime(Timestamp::now(), delay));
135   return runAt(time, cb);
136 }
137 
138 TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
139 {
140   Timestamp time(addTime(Timestamp::now(), interval));
141   return timerQueue_->addTimer(cb, time, interval);
142 }
143 
144 void EventLoop::updateChannel(Channel* channel)
145 {
146   assert(channel->ownerLoop() == this);
147   assertInLoopThread();
148   poller_->updateChannel(channel);
149 }
150 
151 void EventLoop::removeChannel(Channel* channel)
152 {
153   assert(channel->ownerLoop() == this);
154   assertInLoopThread();
155   poller_->removeChannel(channel);
156 }
157 
158 void EventLoop::abortNotInLoopThread()
159 {
160   LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
161             << " was created in threadId_ = " << threadId_
162             << ", current thread id = " <<  CurrentThread::tid();
163 }
164 
165 void EventLoop::wakeup()
166 {
167   uint64_t one = 1;
168   ssize_t n = ::write(wakeupFd_, &one, sizeof one);
169   if (n != sizeof one)
170   {
171     LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
172   }
173 }
174 
175 void EventLoop::handleRead()
176 {
177   uint64_t one = 1;
178   ssize_t n = ::read(wakeupFd_, &one, sizeof one);
179   if (n != sizeof one)
180   {
181     LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
182   }
183 }
184 
185 void EventLoop::doPendingFunctors()
186 {
187   std::vector<Functor> functors;
188   callingPendingFunctors_ = true;
189 
190   {
191   MutexLockGuard lock(mutex_);
192   functors.swap(pendingFunctors_);
193   }
194 
195   for (size_t i = 0; i < functors.size(); ++i)
196   {
197     functors[i]();
198   }
199   callingPendingFunctors_ = false;
200 }
201 
202