1 // excerpts from http://code.google.com/p/muduo/
2 //
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the License file.
5 //
6 // Author: Shuo Chen (chenshuo at chenshuo dot com)
7 
8 #include "EventLoop.h"
9 
10 #include "Channel.h"
11 #include "Poller.h"
12 #include "TimerQueue.h"
13 
14 #include "logging/Logging.h"
15 
16 #include <boost/bind.hpp>
17 
18 #include <assert.h>
19 #include <signal.h>
20 #include <sys/eventfd.h>
21 
22 using namespace muduo;
23 
24 __thread EventLoop* t_loopInThisThread = 0;
25 const int kPollTimeMs = 10000;
26 
27 static int createEventfd()
28 {
29   int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
30   if (evtfd < 0)
31   {
32     LOG_SYSERR << "Failed in eventfd";
33     abort();
34   }
35   return evtfd;
36 }
37 
38 class IgnoreSigPipe
39 {
40  public:
41   IgnoreSigPipe()
42   {
43     ::signal(SIGPIPE, SIG_IGN);
44   }
45 };
46 
47 IgnoreSigPipe initObj;
48 
49 EventLoop::EventLoop()
50   : looping_(false),
51     quit_(false),
52     callingPendingFunctors_(false),
53     threadId_(CurrentThread::tid()),
54     poller_(new Poller(this)),
55     timerQueue_(new TimerQueue(this)),
56     wakeupFd_(createEventfd()),
57     wakeupChannel_(new Channel(this, wakeupFd_))
58 {
59   LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
60   if (t_loopInThisThread)
61   {
62     LOG_FATAL << "Another EventLoop " << t_loopInThisThread
63               << " exists in this thread " << threadId_;
64   }
65   else
66   {
67     t_loopInThisThread = this;
68   }
69   wakeupChannel_->setReadCallback(
70       boost::bind(&EventLoop::handleRead, this));
71   // we are always reading the wakeupfd
72   wakeupChannel_->enableReading();
73 }
74 
75 EventLoop::~EventLoop()
76 {
77   assert(!looping_);
78   ::close(wakeupFd_);
79   t_loopInThisThread = NULL;
80 }
81 
82 void EventLoop::loop()
83 {
84   assert(!looping_);
85   assertInLoopThread();
86   looping_ = true;
87   quit_ = false;
88 
89   while (!quit_)
90   {
91     activeChannels_.clear();
92     pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
93     for (ChannelList::iterator it = activeChannels_.begin();
94         it != activeChannels_.end(); ++it)
95     {
96       (*it)->handleEvent(pollReturnTime_);
97     }
98     doPendingFunctors();
99   }
100 
101   LOG_TRACE << "EventLoop " << this << " stop looping";
102   looping_ = false;
103 }
104 
105 void EventLoop::quit()
106 {
107   quit_ = true;
108   if (!isInLoopThread())
109   {
110     wakeup();
111   }
112 }
113 
114 void EventLoop::runInLoop(const Functor& cb)
115 {
116   if (isInLoopThread())
117   {
118     cb();
119   }
120   else
121   {
122     queueInLoop(cb);
123   }
124 }
125 
126 void EventLoop::queueInLoop(const Functor& cb)
127 {
128   {
129   MutexLockGuard lock(mutex_);
130   pendingFunctors_.push_back(cb);
131   }
132 
133   if (!isInLoopThread() || callingPendingFunctors_)
134   {
135     wakeup();
136   }
137 }
138 
139 TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
140 {
141   return timerQueue_->addTimer(cb, time, 0.0);
142 }
143 
144 TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
145 {
146   Timestamp time(addTime(Timestamp::now(), delay));
147   return runAt(time, cb);
148 }
149 
150 TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
151 {
152   Timestamp time(addTime(Timestamp::now(), interval));
153   return timerQueue_->addTimer(cb, time, interval);
154 }
155 
156+void EventLoop::cancel(TimerId timerId)
157+{
158+  return timerQueue_->cancel(timerId);
159+}
160+
161 void EventLoop::updateChannel(Channel* channel)
162 {
163   assert(channel->ownerLoop() == this);
164   assertInLoopThread();
165   poller_->updateChannel(channel);
166 }
167 
168 void EventLoop::removeChannel(Channel* channel)
169 {
170   assert(channel->ownerLoop() == this);
171   assertInLoopThread();
172   poller_->removeChannel(channel);
173 }
174 
175 void EventLoop::abortNotInLoopThread()
176 {
177   LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
178             << " was created in threadId_ = " << threadId_
179             << ", current thread id = " <<  CurrentThread::tid();
180 }
181 
182 void EventLoop::wakeup()
183 {
184   uint64_t one = 1;
185   ssize_t n = ::write(wakeupFd_, &one, sizeof one);
186   if (n != sizeof one)
187   {
188     LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
189   }
190 }
191 
192 void EventLoop::handleRead()
193 {
194   uint64_t one = 1;
195   ssize_t n = ::read(wakeupFd_, &one, sizeof one);
196   if (n != sizeof one)
197   {
198     LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
199   }
200 }
201 
202 void EventLoop::doPendingFunctors()
203 {
204   std::vector<Functor> functors;
205   callingPendingFunctors_ = true;
206 
207   {
208   MutexLockGuard lock(mutex_);
209   functors.swap(pendingFunctors_);
210   }
211 
212   for (size_t i = 0; i < functors.size(); ++i)
213   {
214     functors[i]();
215   }
216   callingPendingFunctors_ = false;
217 }
218 
219