1 // excerpts from http://code.google.com/p/muduo/
2 //
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the License file.
5 //
6 // Author: Shuo Chen (chenshuo at chenshuo dot com)
7 
8 #include "EventLoop.h"
9 
10 #include "Channel.h"
11 #include "Poller.h"
12 #include "TimerQueue.h"
13 
14 #include "logging/Logging.h"
15 
16 #include <boost/bind.hpp>
17 
18 #include <assert.h>
19+#include <signal.h>
20 #include <sys/eventfd.h>
21 
22 using namespace muduo;
23 
24 __thread EventLoop* t_loopInThisThread = 0;
25 const int kPollTimeMs = 10000;
26 
27 static int createEventfd()
28 {
29   int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
30   if (evtfd < 0)
31   {
32     LOG_SYSERR << "Failed in eventfd";
33     abort();
34   }
35   return evtfd;
36 }
37 
38+class IgnoreSigPipe
39+{
40+ public:
41+  IgnoreSigPipe()
42+  {
43+    ::signal(SIGPIPE, SIG_IGN);
44+  }
45+};
46+
47+IgnoreSigPipe initObj;
48+
49 EventLoop::EventLoop()
50   : looping_(false),
51     quit_(false),
52     callingPendingFunctors_(false),
53     threadId_(CurrentThread::tid()),
54     poller_(new Poller(this)),
55     timerQueue_(new TimerQueue(this)),
56     wakeupFd_(createEventfd()),
57     wakeupChannel_(new Channel(this, wakeupFd_))
58 {
59   LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
60   if (t_loopInThisThread)
61   {
62     LOG_FATAL << "Another EventLoop " << t_loopInThisThread
63               << " exists in this thread " << threadId_;
64   }
65   else
66   {
67     t_loopInThisThread = this;
68   }
69   wakeupChannel_->setReadCallback(
70       boost::bind(&EventLoop::handleRead, this));
71   // we are always reading the wakeupfd
72   wakeupChannel_->enableReading();
73 }
74 
75 EventLoop::~EventLoop()
76 {
77   assert(!looping_);
78   ::close(wakeupFd_);
79   t_loopInThisThread = NULL;
80 }
81 
82 void EventLoop::loop()
83 {
84   assert(!looping_);
85   assertInLoopThread();
86   looping_ = true;
87   quit_ = false;
88 
89   while (!quit_)
90   {
91     activeChannels_.clear();
92     pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
93     for (ChannelList::iterator it = activeChannels_.begin();
94         it != activeChannels_.end(); ++it)
95     {
96       (*it)->handleEvent(pollReturnTime_);
97     }
98     doPendingFunctors();
99   }
100 
101   LOG_TRACE << "EventLoop " << this << " stop looping";
102   looping_ = false;
103 }
104 
105 void EventLoop::quit()
106 {
107   quit_ = true;
108   if (!isInLoopThread())
109   {
110     wakeup();
111   }
112 }
113 
114 void EventLoop::runInLoop(const Functor& cb)
115 {
116   if (isInLoopThread())
117   {
118     cb();
119   }
120   else
121   {
122     queueInLoop(cb);
123   }
124 }
125 
126 void EventLoop::queueInLoop(const Functor& cb)
127 {
128   {
129   MutexLockGuard lock(mutex_);
130   pendingFunctors_.push_back(cb);
131   }
132 
133   if (!isInLoopThread() || callingPendingFunctors_)
134   {
135     wakeup();
136   }
137 }
138 
139 TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
140 {
141   return timerQueue_->addTimer(cb, time, 0.0);
142 }
143 
144 TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
145 {
146   Timestamp time(addTime(Timestamp::now(), delay));
147   return runAt(time, cb);
148 }
149 
150 TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
151 {
152   Timestamp time(addTime(Timestamp::now(), interval));
153   return timerQueue_->addTimer(cb, time, interval);
154 }
155 
156 void EventLoop::updateChannel(Channel* channel)
157 {
158   assert(channel->ownerLoop() == this);
159   assertInLoopThread();
160   poller_->updateChannel(channel);
161 }
162 
163 void EventLoop::removeChannel(Channel* channel)
164 {
165   assert(channel->ownerLoop() == this);
166   assertInLoopThread();
167   poller_->removeChannel(channel);
168 }
169 
170 void EventLoop::abortNotInLoopThread()
171 {
172   LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
173             << " was created in threadId_ = " << threadId_
174             << ", current thread id = " <<  CurrentThread::tid();
175 }
176 
177 void EventLoop::wakeup()
178 {
179   uint64_t one = 1;
180   ssize_t n = ::write(wakeupFd_, &one, sizeof one);
181   if (n != sizeof one)
182   {
183     LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
184   }
185 }
186 
187 void EventLoop::handleRead()
188 {
189   uint64_t one = 1;
190   ssize_t n = ::read(wakeupFd_, &one, sizeof one);
191   if (n != sizeof one)
192   {
193     LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
194   }
195 }
196 
197 void EventLoop::doPendingFunctors()
198 {
199   std::vector<Functor> functors;
200   callingPendingFunctors_ = true;
201 
202   {
203   MutexLockGuard lock(mutex_);
204   functors.swap(pendingFunctors_);
205   }
206 
207   for (size_t i = 0; i < functors.size(); ++i)
208   {
209     functors[i]();
210   }
211   callingPendingFunctors_ = false;
212 }
213 
214