// TcpClient.cc revision a1bde736
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
7a1bde736SShuo Chen
8a1bde736SShuo Chen#include "TcpClient.h"
9a1bde736SShuo Chen
10a1bde736SShuo Chen#include "Connector.h"
11a1bde736SShuo Chen#include "EventLoop.h"
12a1bde736SShuo Chen#include "SocketsOps.h"
13a1bde736SShuo Chen
14a1bde736SShuo Chen#include "logging/Logging.h"
15a1bde736SShuo Chen
16a1bde736SShuo Chen#include <boost/bind.hpp>
17a1bde736SShuo Chen
18a1bde736SShuo Chen#include <stdio.h>  // snprintf
19a1bde736SShuo Chen
20a1bde736SShuo Chenusing namespace muduo;
21a1bde736SShuo Chen
// TcpClient::TcpClient(EventLoop* loop)
//   : loop_(loop)
// {
// }

// TcpClient::TcpClient(EventLoop* loop, const string& host, uint16_t port)
//   : loop_(CHECK_NOTNULL(loop)),
//     serverAddr_(host, port)
// {
// }
32a1bde736SShuo Chen
33a1bde736SShuo Chennamespace muduo
34a1bde736SShuo Chen{
35a1bde736SShuo Chennamespace detail
36a1bde736SShuo Chen{
37a1bde736SShuo Chen
38a1bde736SShuo Chenvoid removeConnection(EventLoop* loop, const TcpConnectionPtr& conn)
39a1bde736SShuo Chen{
40a1bde736SShuo Chen  loop->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));
41a1bde736SShuo Chen}
42a1bde736SShuo Chen
43a1bde736SShuo Chenvoid removeConnector(const ConnectorPtr& connector)
44a1bde736SShuo Chen{
45a1bde736SShuo Chen  //connector->
46a1bde736SShuo Chen}
47a1bde736SShuo Chen
48a1bde736SShuo Chen}
49a1bde736SShuo Chen}
50a1bde736SShuo Chen
51a1bde736SShuo ChenTcpClient::TcpClient(EventLoop* loop,
52a1bde736SShuo Chen                     const InetAddress& serverAddr)
53a1bde736SShuo Chen  : loop_(CHECK_NOTNULL(loop)),
54a1bde736SShuo Chen    connector_(new Connector(loop, serverAddr)),
55a1bde736SShuo Chen    retry_(false),
56a1bde736SShuo Chen    connect_(true),
57a1bde736SShuo Chen    nextConnId_(1)
58a1bde736SShuo Chen{
59a1bde736SShuo Chen  connector_->setNewConnectionCallback(
60a1bde736SShuo Chen      boost::bind(&TcpClient::newConnection, this, _1));
61a1bde736SShuo Chen  // FIXME setConnectFailedCallback
62a1bde736SShuo Chen  LOG_INFO << "TcpClient::TcpClient[" << this
63a1bde736SShuo Chen           << "] - connector " << get_pointer(connector_);
64a1bde736SShuo Chen}
65a1bde736SShuo Chen
66a1bde736SShuo ChenTcpClient::~TcpClient()
67a1bde736SShuo Chen{
68a1bde736SShuo Chen  LOG_INFO << "TcpClient::~TcpClient[" << this
69a1bde736SShuo Chen           << "] - connector " << get_pointer(connector_);
70a1bde736SShuo Chen  TcpConnectionPtr conn;
71a1bde736SShuo Chen  {
72a1bde736SShuo Chen    MutexLockGuard lock(mutex_);
73a1bde736SShuo Chen    conn = connection_;
74a1bde736SShuo Chen  }
75a1bde736SShuo Chen  if (conn)
76a1bde736SShuo Chen  {
77a1bde736SShuo Chen    // FIXME: not 100% safe, if we are in different thread
78a1bde736SShuo Chen    CloseCallback cb = boost::bind(&detail::removeConnection, loop_, _1);
79a1bde736SShuo Chen    loop_->runInLoop(
80a1bde736SShuo Chen        boost::bind(&TcpConnection::setCloseCallback, conn, cb));
81a1bde736SShuo Chen  }
82a1bde736SShuo Chen  else
83a1bde736SShuo Chen  {
84a1bde736SShuo Chen    connector_->stop();
85a1bde736SShuo Chen    // FIXME: HACK
86a1bde736SShuo Chen    loop_->runAfter(1, boost::bind(&detail::removeConnector, connector_));
87a1bde736SShuo Chen  }
88a1bde736SShuo Chen}
89a1bde736SShuo Chen
90a1bde736SShuo Chenvoid TcpClient::connect()
91a1bde736SShuo Chen{
92a1bde736SShuo Chen  // FIXME: check state
93a1bde736SShuo Chen  LOG_INFO << "TcpClient::connect[" << this << "] - connecting to "
94a1bde736SShuo Chen           << connector_->serverAddress().toHostPort();
95a1bde736SShuo Chen  connect_ = true;
96a1bde736SShuo Chen  connector_->start();
97a1bde736SShuo Chen}
98a1bde736SShuo Chen
99a1bde736SShuo Chenvoid TcpClient::disconnect()
100a1bde736SShuo Chen{
101a1bde736SShuo Chen  connect_ = false;
102a1bde736SShuo Chen
103a1bde736SShuo Chen  {
104a1bde736SShuo Chen    MutexLockGuard lock(mutex_);
105a1bde736SShuo Chen    if (connection_)
106a1bde736SShuo Chen    {
107a1bde736SShuo Chen      connection_->shutdown();
108a1bde736SShuo Chen    }
109a1bde736SShuo Chen  }
110a1bde736SShuo Chen}
111a1bde736SShuo Chen
112a1bde736SShuo Chenvoid TcpClient::stop()
113a1bde736SShuo Chen{
114a1bde736SShuo Chen  connect_ = false;
115a1bde736SShuo Chen  connector_->stop();
116a1bde736SShuo Chen}
117a1bde736SShuo Chen
118a1bde736SShuo Chenvoid TcpClient::newConnection(int sockfd)
119a1bde736SShuo Chen{
120a1bde736SShuo Chen  loop_->assertInLoopThread();
121a1bde736SShuo Chen  InetAddress peerAddr(sockets::getPeerAddr(sockfd));
122a1bde736SShuo Chen  char buf[32];
123a1bde736SShuo Chen  snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toHostPort().c_str(), nextConnId_);
124a1bde736SShuo Chen  ++nextConnId_;
125a1bde736SShuo Chen  string connName = buf;
126a1bde736SShuo Chen
127a1bde736SShuo Chen  InetAddress localAddr(sockets::getLocalAddr(sockfd));
128a1bde736SShuo Chen  // FIXME poll with zero timeout to double confirm the new connection
129a1bde736SShuo Chen  // FIXME use make_shared if necessary
130a1bde736SShuo Chen  TcpConnectionPtr conn(new TcpConnection(loop_,
131a1bde736SShuo Chen                                          connName,
132a1bde736SShuo Chen                                          sockfd,
133a1bde736SShuo Chen                                          localAddr,
134a1bde736SShuo Chen                                          peerAddr));
135a1bde736SShuo Chen
136a1bde736SShuo Chen  conn->setConnectionCallback(connectionCallback_);
137a1bde736SShuo Chen  conn->setMessageCallback(messageCallback_);
138a1bde736SShuo Chen  conn->setWriteCompleteCallback(writeCompleteCallback_);
139a1bde736SShuo Chen  conn->setCloseCallback(
140a1bde736SShuo Chen      boost::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
141a1bde736SShuo Chen  {
142a1bde736SShuo Chen    MutexLockGuard lock(mutex_);
143a1bde736SShuo Chen    connection_ = conn;
144a1bde736SShuo Chen  }
145a1bde736SShuo Chen  conn->connectEstablished();
146a1bde736SShuo Chen}
147a1bde736SShuo Chen
148a1bde736SShuo Chenvoid TcpClient::removeConnection(const TcpConnectionPtr& conn)
149a1bde736SShuo Chen{
150a1bde736SShuo Chen  loop_->assertInLoopThread();
151a1bde736SShuo Chen  assert(loop_ == conn->getLoop());
152a1bde736SShuo Chen
153a1bde736SShuo Chen  {
154a1bde736SShuo Chen    MutexLockGuard lock(mutex_);
155a1bde736SShuo Chen    assert(connection_ == conn);
156a1bde736SShuo Chen    connection_.reset();
157a1bde736SShuo Chen  }
158a1bde736SShuo Chen
159a1bde736SShuo Chen  loop_->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));
160a1bde736SShuo Chen  // FIXME wake up ?
161a1bde736SShuo Chen  if (retry_ && connect_)
162a1bde736SShuo Chen  {
163a1bde736SShuo Chen    LOG_INFO << "TcpClient::connect[" << this << "] - Reconnecting to "
164a1bde736SShuo Chen             << connector_->serverAddress().toHostPort();
165a1bde736SShuo Chen    connector_->restart();
166a1bde736SShuo Chen  }
167a1bde736SShuo Chen}
168a1bde736SShuo Chen
169