TcpClient.cc revision a1bde736
1a1bde736SShuo Chen// excerpts from http://code.google.com/p/muduo/ 2a1bde736SShuo Chen// 3a1bde736SShuo Chen// Use of this source code is governed by a BSD-style license 4a1bde736SShuo Chen// that can be found in the License file. 5a1bde736SShuo Chen// 6a1bde736SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7a1bde736SShuo Chen 8a1bde736SShuo Chen#include "TcpClient.h" 9a1bde736SShuo Chen 10a1bde736SShuo Chen#include "Connector.h" 11a1bde736SShuo Chen#include "EventLoop.h" 12a1bde736SShuo Chen#include "SocketsOps.h" 13a1bde736SShuo Chen 14a1bde736SShuo Chen#include "logging/Logging.h" 15a1bde736SShuo Chen 16a1bde736SShuo Chen#include <boost/bind.hpp> 17a1bde736SShuo Chen 18a1bde736SShuo Chen#include <stdio.h> // snprintf 19a1bde736SShuo Chen 20a1bde736SShuo Chenusing namespace muduo; 21a1bde736SShuo Chen 22a1bde736SShuo Chen// TcpClient::TcpClient(EventLoop* loop) 23a1bde736SShuo Chen// : loop_(loop) 24a1bde736SShuo Chen// { 25a1bde736SShuo Chen// } 26a1bde736SShuo Chen 27a1bde736SShuo Chen// TcpClient::TcpClient(EventLoop* loop, const string& host, uint16_t port) 28a1bde736SShuo Chen// : loop_(CHECK_NOTNULL(loop)), 29a1bde736SShuo Chen// serverAddr_(host, port) 30a1bde736SShuo Chen// { 31a1bde736SShuo Chen// } 32a1bde736SShuo Chen 33a1bde736SShuo Chennamespace muduo 34a1bde736SShuo Chen{ 35a1bde736SShuo Chennamespace detail 36a1bde736SShuo Chen{ 37a1bde736SShuo Chen 38a1bde736SShuo Chenvoid removeConnection(EventLoop* loop, const TcpConnectionPtr& conn) 39a1bde736SShuo Chen{ 40a1bde736SShuo Chen loop->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn)); 41a1bde736SShuo Chen} 42a1bde736SShuo Chen 43a1bde736SShuo Chenvoid removeConnector(const ConnectorPtr& connector) 44a1bde736SShuo Chen{ 45a1bde736SShuo Chen //connector-> 46a1bde736SShuo Chen} 47a1bde736SShuo Chen 48a1bde736SShuo Chen} 49a1bde736SShuo Chen} 50a1bde736SShuo Chen 51a1bde736SShuo ChenTcpClient::TcpClient(EventLoop* loop, 52a1bde736SShuo Chen const InetAddress& serverAddr) 53a1bde736SShuo Chen : loop_(CHECK_NOTNULL(loop)), 54a1bde736SShuo Chen connector_(new Connector(loop, serverAddr)), 55a1bde736SShuo Chen retry_(false), 56a1bde736SShuo Chen connect_(true), 57a1bde736SShuo Chen nextConnId_(1) 58a1bde736SShuo Chen{ 59a1bde736SShuo Chen connector_->setNewConnectionCallback( 60a1bde736SShuo Chen boost::bind(&TcpClient::newConnection, this, _1)); 61a1bde736SShuo Chen // FIXME setConnectFailedCallback 62a1bde736SShuo Chen LOG_INFO << "TcpClient::TcpClient[" << this 63a1bde736SShuo Chen << "] - connector " << get_pointer(connector_); 64a1bde736SShuo Chen} 65a1bde736SShuo Chen 66a1bde736SShuo ChenTcpClient::~TcpClient() 67a1bde736SShuo Chen{ 68a1bde736SShuo Chen LOG_INFO << "TcpClient::~TcpClient[" << this 69a1bde736SShuo Chen << "] - connector " << get_pointer(connector_); 70a1bde736SShuo Chen TcpConnectionPtr conn; 71a1bde736SShuo Chen { 72a1bde736SShuo Chen MutexLockGuard lock(mutex_); 73a1bde736SShuo Chen conn = connection_; 74a1bde736SShuo Chen } 75a1bde736SShuo Chen if (conn) 76a1bde736SShuo Chen { 77a1bde736SShuo Chen // FIXME: not 100% safe, if we are in different thread 78a1bde736SShuo Chen CloseCallback cb = boost::bind(&detail::removeConnection, loop_, _1); 79a1bde736SShuo Chen loop_->runInLoop( 80a1bde736SShuo Chen boost::bind(&TcpConnection::setCloseCallback, conn, cb)); 81a1bde736SShuo Chen } 82a1bde736SShuo Chen else 83a1bde736SShuo Chen { 84a1bde736SShuo Chen connector_->stop(); 85a1bde736SShuo Chen // FIXME: HACK 86a1bde736SShuo Chen loop_->runAfter(1, boost::bind(&detail::removeConnector, connector_)); 87a1bde736SShuo Chen } 88a1bde736SShuo Chen} 89a1bde736SShuo Chen 90a1bde736SShuo Chenvoid TcpClient::connect() 91a1bde736SShuo Chen{ 92a1bde736SShuo Chen // FIXME: check state 93a1bde736SShuo Chen LOG_INFO << "TcpClient::connect[" << this << "] - connecting to " 94a1bde736SShuo Chen << connector_->serverAddress().toHostPort(); 95a1bde736SShuo Chen connect_ = true; 96a1bde736SShuo Chen connector_->start(); 97a1bde736SShuo Chen} 98a1bde736SShuo Chen 99a1bde736SShuo Chenvoid TcpClient::disconnect() 100a1bde736SShuo Chen{ 101a1bde736SShuo Chen connect_ = false; 102a1bde736SShuo Chen 103a1bde736SShuo Chen { 104a1bde736SShuo Chen MutexLockGuard lock(mutex_); 105a1bde736SShuo Chen if (connection_) 106a1bde736SShuo Chen { 107a1bde736SShuo Chen connection_->shutdown(); 108a1bde736SShuo Chen } 109a1bde736SShuo Chen } 110a1bde736SShuo Chen} 111a1bde736SShuo Chen 112a1bde736SShuo Chenvoid TcpClient::stop() 113a1bde736SShuo Chen{ 114a1bde736SShuo Chen connect_ = false; 115a1bde736SShuo Chen connector_->stop(); 116a1bde736SShuo Chen} 117a1bde736SShuo Chen 118a1bde736SShuo Chenvoid TcpClient::newConnection(int sockfd) 119a1bde736SShuo Chen{ 120a1bde736SShuo Chen loop_->assertInLoopThread(); 121a1bde736SShuo Chen InetAddress peerAddr(sockets::getPeerAddr(sockfd)); 122a1bde736SShuo Chen char buf[32]; 123a1bde736SShuo Chen snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toHostPort().c_str(), nextConnId_); 124a1bde736SShuo Chen ++nextConnId_; 125a1bde736SShuo Chen string connName = buf; 126a1bde736SShuo Chen 127a1bde736SShuo Chen InetAddress localAddr(sockets::getLocalAddr(sockfd)); 128a1bde736SShuo Chen // FIXME poll with zero timeout to double confirm the new connection 129a1bde736SShuo Chen // FIXME use make_shared if necessary 130a1bde736SShuo Chen TcpConnectionPtr conn(new TcpConnection(loop_, 131a1bde736SShuo Chen connName, 132a1bde736SShuo Chen sockfd, 133a1bde736SShuo Chen localAddr, 134a1bde736SShuo Chen peerAddr)); 135a1bde736SShuo Chen 136a1bde736SShuo Chen conn->setConnectionCallback(connectionCallback_); 137a1bde736SShuo Chen conn->setMessageCallback(messageCallback_); 138a1bde736SShuo Chen conn->setWriteCompleteCallback(writeCompleteCallback_); 139a1bde736SShuo Chen conn->setCloseCallback( 140a1bde736SShuo Chen boost::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe 141a1bde736SShuo Chen { 142a1bde736SShuo Chen MutexLockGuard lock(mutex_); 143a1bde736SShuo Chen connection_ = conn; 144a1bde736SShuo Chen } 145a1bde736SShuo Chen conn->connectEstablished(); 146a1bde736SShuo Chen} 147a1bde736SShuo Chen 148a1bde736SShuo Chenvoid TcpClient::removeConnection(const TcpConnectionPtr& conn) 149a1bde736SShuo Chen{ 150a1bde736SShuo Chen loop_->assertInLoopThread(); 151a1bde736SShuo Chen assert(loop_ == conn->getLoop()); 152a1bde736SShuo Chen 153a1bde736SShuo Chen { 154a1bde736SShuo Chen MutexLockGuard lock(mutex_); 155a1bde736SShuo Chen assert(connection_ == conn); 156a1bde736SShuo Chen connection_.reset(); 157a1bde736SShuo Chen } 158a1bde736SShuo Chen 159a1bde736SShuo Chen loop_->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn)); 160a1bde736SShuo Chen // FIXME wake up ? 161a1bde736SShuo Chen if (retry_ && connect_) 162a1bde736SShuo Chen { 163a1bde736SShuo Chen LOG_INFO << "TcpClient::connect[" << this << "] - Reconnecting to " 164a1bde736SShuo Chen << connector_->serverAddress().toHostPort(); 165a1bde736SShuo Chen connector_->restart(); 166a1bde736SShuo Chen } 167a1bde736SShuo Chen} 168a1bde736SShuo Chen 169