19a1e991dSShuo Chen// excerpts from http://code.google.com/p/muduo/ 29a1e991dSShuo Chen// 39a1e991dSShuo Chen// Use of this source code is governed by a BSD-style license 49a1e991dSShuo Chen// that can be found in the License file. 59a1e991dSShuo Chen// 69a1e991dSShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 79a1e991dSShuo Chen 89a1e991dSShuo Chen#include "EventLoop.h" 99a1e991dSShuo Chen 109a1e991dSShuo Chen#include "Channel.h" 119a1e991dSShuo Chen#include "Poller.h" 129a1e991dSShuo Chen#include "TimerQueue.h" 139a1e991dSShuo Chen 149a1e991dSShuo Chen#include "logging/Logging.h" 159a1e991dSShuo Chen 169a1e991dSShuo Chen#include <boost/bind.hpp> 179a1e991dSShuo Chen 189a1e991dSShuo Chen#include <assert.h> 199a1e991dSShuo Chen#include <sys/eventfd.h> 209a1e991dSShuo Chen 219a1e991dSShuo Chenusing namespace muduo; 229a1e991dSShuo Chen 239a1e991dSShuo Chen__thread EventLoop* t_loopInThisThread = 0; 249a1e991dSShuo Chenconst int kPollTimeMs = 10000; 259a1e991dSShuo Chen 269a1e991dSShuo Chenstatic int createEventfd() 279a1e991dSShuo Chen{ 289a1e991dSShuo Chen int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 299a1e991dSShuo Chen if (evtfd < 0) 309a1e991dSShuo Chen { 319a1e991dSShuo Chen LOG_SYSERR << "Failed in eventfd"; 329a1e991dSShuo Chen abort(); 339a1e991dSShuo Chen } 349a1e991dSShuo Chen return evtfd; 359a1e991dSShuo Chen} 369a1e991dSShuo Chen 379a1e991dSShuo ChenEventLoop::EventLoop() 389a1e991dSShuo Chen : looping_(false), 399a1e991dSShuo Chen quit_(false), 409a1e991dSShuo Chen callingPendingFunctors_(false), 419a1e991dSShuo Chen threadId_(CurrentThread::tid()), 429a1e991dSShuo Chen poller_(new Poller(this)), 439a1e991dSShuo Chen timerQueue_(new TimerQueue(this)), 449a1e991dSShuo Chen wakeupFd_(createEventfd()), 459a1e991dSShuo Chen wakeupChannel_(new Channel(this, wakeupFd_)) 469a1e991dSShuo Chen{ 479a1e991dSShuo Chen LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; 489a1e991dSShuo Chen if (t_loopInThisThread) 499a1e991dSShuo Chen { 509a1e991dSShuo Chen LOG_FATAL << "Another EventLoop " << t_loopInThisThread 519a1e991dSShuo Chen << " exists in this thread " << threadId_; 529a1e991dSShuo Chen } 539a1e991dSShuo Chen else 549a1e991dSShuo Chen { 559a1e991dSShuo Chen t_loopInThisThread = this; 569a1e991dSShuo Chen } 579a1e991dSShuo Chen wakeupChannel_->setReadCallback( 589a1e991dSShuo Chen boost::bind(&EventLoop::handleRead, this)); 599a1e991dSShuo Chen // we are always reading the wakeupfd 609a1e991dSShuo Chen wakeupChannel_->enableReading(); 619a1e991dSShuo Chen} 629a1e991dSShuo Chen 639a1e991dSShuo ChenEventLoop::~EventLoop() 649a1e991dSShuo Chen{ 659a1e991dSShuo Chen assert(!looping_); 669a1e991dSShuo Chen ::close(wakeupFd_); 679a1e991dSShuo Chen t_loopInThisThread = NULL; 689a1e991dSShuo Chen} 699a1e991dSShuo Chen 709a1e991dSShuo Chenvoid EventLoop::loop() 719a1e991dSShuo Chen{ 729a1e991dSShuo Chen assert(!looping_); 739a1e991dSShuo Chen assertInLoopThread(); 749a1e991dSShuo Chen looping_ = true; 759a1e991dSShuo Chen quit_ = false; 769a1e991dSShuo Chen 779a1e991dSShuo Chen while (!quit_) 789a1e991dSShuo Chen { 799a1e991dSShuo Chen activeChannels_.clear(); 809a1e991dSShuo Chen pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); 819a1e991dSShuo Chen for (ChannelList::iterator it = activeChannels_.begin(); 829a1e991dSShuo Chen it != activeChannels_.end(); ++it) 839a1e991dSShuo Chen { 849a1e991dSShuo Chen (*it)->handleEvent(); 859a1e991dSShuo Chen } 869a1e991dSShuo Chen doPendingFunctors(); 879a1e991dSShuo Chen } 889a1e991dSShuo Chen 899a1e991dSShuo Chen LOG_TRACE << "EventLoop " << this << " stop looping"; 909a1e991dSShuo Chen looping_ = false; 919a1e991dSShuo Chen} 929a1e991dSShuo Chen 939a1e991dSShuo Chenvoid EventLoop::quit() 949a1e991dSShuo Chen{ 959a1e991dSShuo Chen quit_ = true; 969a1e991dSShuo Chen if (!isInLoopThread()) 979a1e991dSShuo Chen { 989a1e991dSShuo Chen wakeup(); 999a1e991dSShuo Chen } 1009a1e991dSShuo Chen} 1019a1e991dSShuo Chen 1029a1e991dSShuo Chenvoid EventLoop::runInLoop(const Functor& cb) 1039a1e991dSShuo Chen{ 1049a1e991dSShuo Chen if (isInLoopThread()) 1059a1e991dSShuo Chen { 1069a1e991dSShuo Chen cb(); 1079a1e991dSShuo Chen } 1089a1e991dSShuo Chen else 1099a1e991dSShuo Chen { 1109a1e991dSShuo Chen queueInLoop(cb); 1119a1e991dSShuo Chen } 1129a1e991dSShuo Chen} 1139a1e991dSShuo Chen 1149a1e991dSShuo Chenvoid EventLoop::queueInLoop(const Functor& cb) 1159a1e991dSShuo Chen{ 1169a1e991dSShuo Chen { 1179a1e991dSShuo Chen MutexLockGuard lock(mutex_); 1189a1e991dSShuo Chen pendingFunctors_.push_back(cb); 1199a1e991dSShuo Chen } 1209a1e991dSShuo Chen 1210f776063SShuo Chen if (!isInLoopThread() || callingPendingFunctors_) 1229a1e991dSShuo Chen { 1239a1e991dSShuo Chen wakeup(); 1249a1e991dSShuo Chen } 1259a1e991dSShuo Chen} 1269a1e991dSShuo Chen 1279a1e991dSShuo ChenTimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) 1289a1e991dSShuo Chen{ 1299a1e991dSShuo Chen return timerQueue_->addTimer(cb, time, 0.0); 1309a1e991dSShuo Chen} 1319a1e991dSShuo Chen 1329a1e991dSShuo ChenTimerId EventLoop::runAfter(double delay, const TimerCallback& cb) 1339a1e991dSShuo Chen{ 1349a1e991dSShuo Chen Timestamp time(addTime(Timestamp::now(), delay)); 1359a1e991dSShuo Chen return runAt(time, cb); 1369a1e991dSShuo Chen} 1379a1e991dSShuo Chen 1389a1e991dSShuo ChenTimerId EventLoop::runEvery(double interval, const TimerCallback& cb) 1399a1e991dSShuo Chen{ 1409a1e991dSShuo Chen Timestamp time(addTime(Timestamp::now(), interval)); 1419a1e991dSShuo Chen return timerQueue_->addTimer(cb, time, interval); 1429a1e991dSShuo Chen} 1439a1e991dSShuo Chen 1449a1e991dSShuo Chenvoid EventLoop::updateChannel(Channel* channel) 1459a1e991dSShuo Chen{ 1469a1e991dSShuo Chen assert(channel->ownerLoop() == this); 1479a1e991dSShuo Chen assertInLoopThread(); 1489a1e991dSShuo Chen poller_->updateChannel(channel); 1499a1e991dSShuo Chen} 1509a1e991dSShuo Chen 1519a1e991dSShuo Chenvoid EventLoop::abortNotInLoopThread() 1529a1e991dSShuo Chen{ 1539a1e991dSShuo Chen LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this 1549a1e991dSShuo Chen << " was created in threadId_ = " << threadId_ 1559a1e991dSShuo Chen << ", current thread id = " << CurrentThread::tid(); 1569a1e991dSShuo Chen} 1579a1e991dSShuo Chen 1589a1e991dSShuo Chenvoid EventLoop::wakeup() 1599a1e991dSShuo Chen{ 1609a1e991dSShuo Chen uint64_t one = 1; 1619a1e991dSShuo Chen ssize_t n = ::write(wakeupFd_, &one, sizeof one); 1629a1e991dSShuo Chen if (n != sizeof one) 1639a1e991dSShuo Chen { 1649a1e991dSShuo Chen LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; 1659a1e991dSShuo Chen } 1669a1e991dSShuo Chen} 1679a1e991dSShuo Chen 1689a1e991dSShuo Chenvoid EventLoop::handleRead() 1699a1e991dSShuo Chen{ 1709a1e991dSShuo Chen uint64_t one = 1; 1719a1e991dSShuo Chen ssize_t n = ::read(wakeupFd_, &one, sizeof one); 1729a1e991dSShuo Chen if (n != sizeof one) 1739a1e991dSShuo Chen { 1749a1e991dSShuo Chen LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; 1759a1e991dSShuo Chen } 1769a1e991dSShuo Chen} 1779a1e991dSShuo Chen 1789a1e991dSShuo Chenvoid EventLoop::doPendingFunctors() 1799a1e991dSShuo Chen{ 1809a1e991dSShuo Chen std::vector<Functor> functors; 1819a1e991dSShuo Chen callingPendingFunctors_ = true; 1829a1e991dSShuo Chen 1839a1e991dSShuo Chen { 1849a1e991dSShuo Chen MutexLockGuard lock(mutex_); 1859a1e991dSShuo Chen functors.swap(pendingFunctors_); 1869a1e991dSShuo Chen } 1879a1e991dSShuo Chen 1889a1e991dSShuo Chen for (size_t i = 0; i < functors.size(); ++i) 1899a1e991dSShuo Chen { 1909a1e991dSShuo Chen functors[i](); 1919a1e991dSShuo Chen } 1929a1e991dSShuo Chen callingPendingFunctors_ = false; 1939a1e991dSShuo Chen} 1949a1e991dSShuo Chen 195