TcpServer.cc revision a1bde736
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
7a1bde736SShuo Chen
8a1bde736SShuo Chen#include "TcpServer.h"
9a1bde736SShuo Chen
10a1bde736SShuo Chen#include "logging/Logging.h"
11a1bde736SShuo Chen#include "Acceptor.h"
12a1bde736SShuo Chen#include "EventLoop.h"
13a1bde736SShuo Chen#include "EventLoopThreadPool.h"
14a1bde736SShuo Chen#include "SocketsOps.h"
15a1bde736SShuo Chen
16a1bde736SShuo Chen#include <boost/bind.hpp>
17a1bde736SShuo Chen
18a1bde736SShuo Chen#include <stdio.h>  // snprintf
19a1bde736SShuo Chen
20a1bde736SShuo Chenusing namespace muduo;
21a1bde736SShuo Chen
22a1bde736SShuo ChenTcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr)
23a1bde736SShuo Chen  : loop_(CHECK_NOTNULL(loop)),
24a1bde736SShuo Chen    name_(listenAddr.toHostPort()),
25a1bde736SShuo Chen    acceptor_(new Acceptor(loop, listenAddr)),
26a1bde736SShuo Chen    threadPool_(new EventLoopThreadPool(loop)),
27a1bde736SShuo Chen    started_(false),
28a1bde736SShuo Chen    nextConnId_(1)
29a1bde736SShuo Chen{
30a1bde736SShuo Chen  acceptor_->setNewConnectionCallback(
31a1bde736SShuo Chen      boost::bind(&TcpServer::newConnection, this, _1, _2));
32a1bde736SShuo Chen}
33a1bde736SShuo Chen
34a1bde736SShuo ChenTcpServer::~TcpServer()
35a1bde736SShuo Chen{
36a1bde736SShuo Chen}
37a1bde736SShuo Chen
38a1bde736SShuo Chenvoid TcpServer::setThreadNum(int numThreads)
39a1bde736SShuo Chen{
40a1bde736SShuo Chen  assert(0 <= numThreads);
41a1bde736SShuo Chen  threadPool_->setThreadNum(numThreads);
42a1bde736SShuo Chen}
43a1bde736SShuo Chen
44a1bde736SShuo Chenvoid TcpServer::start()
45a1bde736SShuo Chen{
46a1bde736SShuo Chen  if (!started_)
47a1bde736SShuo Chen  {
48a1bde736SShuo Chen    started_ = true;
49a1bde736SShuo Chen    threadPool_->start();
50a1bde736SShuo Chen  }
51a1bde736SShuo Chen
52a1bde736SShuo Chen  if (!acceptor_->listenning())
53a1bde736SShuo Chen  {
54a1bde736SShuo Chen    loop_->runInLoop(
55a1bde736SShuo Chen        boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
56a1bde736SShuo Chen  }
57a1bde736SShuo Chen}
58a1bde736SShuo Chen
59a1bde736SShuo Chenvoid TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
60a1bde736SShuo Chen{
61a1bde736SShuo Chen  loop_->assertInLoopThread();
62a1bde736SShuo Chen  char buf[32];
63a1bde736SShuo Chen  snprintf(buf, sizeof buf, "#%d", nextConnId_);
64a1bde736SShuo Chen  ++nextConnId_;
65a1bde736SShuo Chen  std::string connName = name_ + buf;
66a1bde736SShuo Chen
67a1bde736SShuo Chen  LOG_INFO << "TcpServer::newConnection [" << name_
68a1bde736SShuo Chen           << "] - new connection [" << connName
69a1bde736SShuo Chen           << "] from " << peerAddr.toHostPort();
70a1bde736SShuo Chen  InetAddress localAddr(sockets::getLocalAddr(sockfd));
71a1bde736SShuo Chen  // FIXME poll with zero timeout to double confirm the new connection
72a1bde736SShuo Chen  EventLoop* ioLoop = threadPool_->getNextLoop();
73a1bde736SShuo Chen  TcpConnectionPtr conn(
74a1bde736SShuo Chen      new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
75a1bde736SShuo Chen  connections_[connName] = conn;
76a1bde736SShuo Chen  conn->setConnectionCallback(connectionCallback_);
77a1bde736SShuo Chen  conn->setMessageCallback(messageCallback_);
78a1bde736SShuo Chen  conn->setWriteCompleteCallback(writeCompleteCallback_);
79a1bde736SShuo Chen  conn->setCloseCallback(
80a1bde736SShuo Chen      boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
81a1bde736SShuo Chen  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
82a1bde736SShuo Chen}
83a1bde736SShuo Chen
84a1bde736SShuo Chenvoid TcpServer::removeConnection(const TcpConnectionPtr& conn)
85a1bde736SShuo Chen{
86a1bde736SShuo Chen  // FIXME: unsafe
87a1bde736SShuo Chen  loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
88a1bde736SShuo Chen}
89a1bde736SShuo Chen
90a1bde736SShuo Chenvoid TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
91a1bde736SShuo Chen{
92a1bde736SShuo Chen  loop_->assertInLoopThread();
93a1bde736SShuo Chen  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
94a1bde736SShuo Chen           << "] - connection " << conn->name();
95a1bde736SShuo Chen  size_t n = connections_.erase(conn->name());
96a1bde736SShuo Chen  assert(n == 1); (void)n;
97a1bde736SShuo Chen  EventLoop* ioLoop = conn->getLoop();
98a1bde736SShuo Chen  ioLoop->queueInLoop(
99a1bde736SShuo Chen      boost::bind(&TcpConnection::connectDestroyed, conn));
100a1bde736SShuo Chen}
101a1bde736SShuo Chen
102