// excerpts from http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // // Author: Shuo Chen (chenshuo at chenshuo dot com) #include "TcpServer.h" #include "logging/Logging.h" #include "Acceptor.h" #include "EventLoop.h" +#include "EventLoopThreadPool.h" #include "SocketsOps.h" #include #include // snprintf using namespace muduo; TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr) : loop_(CHECK_NOTNULL(loop)), name_(listenAddr.toHostPort()), acceptor_(new Acceptor(loop, listenAddr)), + threadPool_(new EventLoopThreadPool(loop)), started_(false), nextConnId_(1) { acceptor_->setNewConnectionCallback( boost::bind(&TcpServer::newConnection, this, _1, _2)); } TcpServer::~TcpServer() { } +void TcpServer::setThreadNum(int numThreads) +{ + assert(0 <= numThreads); + threadPool_->setThreadNum(numThreads); +} + void TcpServer::start() { if (!started_) { started_ = true; + threadPool_->start(); } if (!acceptor_->listenning()) { loop_->runInLoop( boost::bind(&Acceptor::listen, get_pointer(acceptor_))); } } void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) { loop_->assertInLoopThread(); char buf[32]; snprintf(buf, sizeof buf, "#%d", nextConnId_); ++nextConnId_; std::string connName = name_ + buf; LOG_INFO << "TcpServer::newConnection [" << name_ << "] - new connection [" << connName << "] from " << peerAddr.toHostPort(); InetAddress localAddr(sockets::getLocalAddr(sockfd)); // FIXME poll with zero timeout to double confirm the new connection + EventLoop* ioLoop = threadPool_->getNextLoop(); TcpConnectionPtr conn( ! new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr)); connections_[connName] = conn; conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); conn->setCloseCallback( boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe ! ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn)); } void TcpServer::removeConnection(const TcpConnectionPtr& conn) { + // FIXME: unsafe + loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn)); +} +void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn) +{ loop_->assertInLoopThread(); ! LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_ << "] - connection " << conn->name(); size_t n = connections_.erase(conn->name()); assert(n == 1); (void)n; + EventLoop* ioLoop = conn->getLoop(); ! ioLoop->queueInLoop( boost::bind(&TcpConnection::connectDestroyed, conn)); }