1 // excerpts from http://code.google.com/p/muduo/
2 //
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the License file.
5 //
6 // Author: Shuo Chen (chenshuo at chenshuo dot com)
7 
8 #include "TcpServer.h"
9 
10 #include "logging/Logging.h"
11 #include "Acceptor.h"
12 #include "EventLoop.h"
13+#include "EventLoopThreadPool.h"
14 #include "SocketsOps.h"
15 
16 #include <boost/bind.hpp>
17 
18 #include <stdio.h>  // snprintf
19 
20 using namespace muduo;
21 
22 TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr)
23   : loop_(CHECK_NOTNULL(loop)),
24     name_(listenAddr.toHostPort()),
25     acceptor_(new Acceptor(loop, listenAddr)),
26+    threadPool_(new EventLoopThreadPool(loop)),
27     started_(false),
28     nextConnId_(1)
29 {
30   acceptor_->setNewConnectionCallback(
31       boost::bind(&TcpServer::newConnection, this, _1, _2));
32 }
33 
34 TcpServer::~TcpServer()
35 {
36 }
37 
38+void TcpServer::setThreadNum(int numThreads)
39+{
40+  assert(0 <= numThreads);
41+  threadPool_->setThreadNum(numThreads);
42+}
43+
44 void TcpServer::start()
45 {
46   if (!started_)
47   {
48     started_ = true;
49+    threadPool_->start();
50   }
51 
52   if (!acceptor_->listenning())
53   {
54     loop_->runInLoop(
55         boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
56   }
57 }
58 
59 void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
60 {
61   loop_->assertInLoopThread();
62   char buf[32];
63   snprintf(buf, sizeof buf, "#%d", nextConnId_);
64   ++nextConnId_;
65   std::string connName = name_ + buf;
66 
67   LOG_INFO << "TcpServer::newConnection [" << name_
68            << "] - new connection [" << connName
69            << "] from " << peerAddr.toHostPort();
70   InetAddress localAddr(sockets::getLocalAddr(sockfd));
71   // FIXME poll with zero timeout to double confirm the new connection
72+  EventLoop* ioLoop = threadPool_->getNextLoop();
73   TcpConnectionPtr conn(
74!      new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
75   connections_[connName] = conn;
76   conn->setConnectionCallback(connectionCallback_);
77   conn->setMessageCallback(messageCallback_);
78   conn->setWriteCompleteCallback(writeCompleteCallback_);
79   conn->setCloseCallback(
80       boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
81!  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
82 }
83 
84 void TcpServer::removeConnection(const TcpConnectionPtr& conn)
85 {
86+  // FIXME: unsafe
87+  loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
88+}
89
90+void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
91+{
92   loop_->assertInLoopThread();
93!  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
94            << "] - connection " << conn->name();
95   size_t n = connections_.erase(conn->name());
96   assert(n == 1); (void)n;
97+  EventLoop* ioLoop = conn->getLoop();
98!  ioLoop->queueInLoop(
99       boost::bind(&TcpConnection::connectDestroyed, conn));
100 }
101 
102