1 // excerpts from http://code.google.com/p/muduo/ 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the License file. 5 // 6 // Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8 #include "TcpServer.h" 9 10 #include "logging/Logging.h" 11 #include "Acceptor.h" 12 #include "EventLoop.h" 13+#include "EventLoopThreadPool.h" 14 #include "SocketsOps.h" 15 16 #include <boost/bind.hpp> 17 18 #include <stdio.h> // snprintf 19 20 using namespace muduo; 21 22 TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr) 23 : loop_(CHECK_NOTNULL(loop)), 24 name_(listenAddr.toHostPort()), 25 acceptor_(new Acceptor(loop, listenAddr)), 26+ threadPool_(new EventLoopThreadPool(loop)), 27 started_(false), 28 nextConnId_(1) 29 { 30 acceptor_->setNewConnectionCallback( 31 boost::bind(&TcpServer::newConnection, this, _1, _2)); 32 } 33 34 TcpServer::~TcpServer() 35 { 36 } 37 38+void TcpServer::setThreadNum(int numThreads) 39+{ 40+ assert(0 <= numThreads); 41+ threadPool_->setThreadNum(numThreads); 42+} 43+ 44 void TcpServer::start() 45 { 46 if (!started_) 47 { 48 started_ = true; 49+ threadPool_->start(); 50 } 51 52 if (!acceptor_->listenning()) 53 { 54 loop_->runInLoop( 55 boost::bind(&Acceptor::listen, get_pointer(acceptor_))); 56 } 57 } 58 59 void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) 60 { 61 loop_->assertInLoopThread(); 62 char buf[32]; 63 snprintf(buf, sizeof buf, "#%d", nextConnId_); 64 ++nextConnId_; 65 std::string connName = name_ + buf; 66 67 LOG_INFO << "TcpServer::newConnection [" << name_ 68 << "] - new connection [" << connName 69 << "] from " << peerAddr.toHostPort(); 70 InetAddress localAddr(sockets::getLocalAddr(sockfd)); 71 // FIXME poll with zero timeout to double confirm the new connection 72+ EventLoop* ioLoop = threadPool_->getNextLoop(); 73 TcpConnectionPtr conn( 74! new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr)); 75 connections_[connName] = conn; 76 conn->setConnectionCallback(connectionCallback_); 77 conn->setMessageCallback(messageCallback_); 78 conn->setWriteCompleteCallback(writeCompleteCallback_); 79 conn->setCloseCallback( 80 boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe 81! ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn)); 82 } 83 84 void TcpServer::removeConnection(const TcpConnectionPtr& conn) 85 { 86+ // FIXME: unsafe 87+ loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn)); 88+} 89 90+void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn) 91+{ 92 loop_->assertInLoopThread(); 93! LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_ 94 << "] - connection " << conn->name(); 95 size_t n = connections_.erase(conn->name()); 96 assert(n == 1); (void)n; 97+ EventLoop* ioLoop = conn->getLoop(); 98! ioLoop->queueInLoop( 99 boost::bind(&TcpConnection::connectDestroyed, conn)); 100 } 101 102