1354280cfSShuo Chen// excerpts from http://code.google.com/p/muduo/ 2354280cfSShuo Chen// 3354280cfSShuo Chen// Use of this source code is governed by a BSD-style license 4354280cfSShuo Chen// that can be found in the License file. 5354280cfSShuo Chen// 6354280cfSShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7354280cfSShuo Chen 8354280cfSShuo Chen#include "TcpClient.h" 9354280cfSShuo Chen 10354280cfSShuo Chen#include "Connector.h" 11354280cfSShuo Chen#include "EventLoop.h" 12354280cfSShuo Chen#include "SocketsOps.h" 13354280cfSShuo Chen 14354280cfSShuo Chen#include "logging/Logging.h" 15354280cfSShuo Chen 16354280cfSShuo Chen#include <boost/bind.hpp> 17354280cfSShuo Chen 18354280cfSShuo Chen#include <stdio.h> // snprintf 19354280cfSShuo Chen 20354280cfSShuo Chenusing namespace muduo; 21354280cfSShuo Chen 22354280cfSShuo Chen// TcpClient::TcpClient(EventLoop* loop) 23354280cfSShuo Chen// : loop_(loop) 24354280cfSShuo Chen// { 25354280cfSShuo Chen// } 26354280cfSShuo Chen 27354280cfSShuo Chen// TcpClient::TcpClient(EventLoop* loop, const string& host, uint16_t port) 28354280cfSShuo Chen// : loop_(CHECK_NOTNULL(loop)), 29354280cfSShuo Chen// serverAddr_(host, port) 30354280cfSShuo Chen// { 31354280cfSShuo Chen// } 32354280cfSShuo Chen 33354280cfSShuo Chennamespace muduo 34354280cfSShuo Chen{ 35354280cfSShuo Chennamespace detail 36354280cfSShuo Chen{ 37354280cfSShuo Chen 38354280cfSShuo Chenvoid removeConnection(EventLoop* loop, const TcpConnectionPtr& conn) 39354280cfSShuo Chen{ 40354280cfSShuo Chen loop->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn)); 41354280cfSShuo Chen} 42354280cfSShuo Chen 43354280cfSShuo Chenvoid removeConnector(const ConnectorPtr& connector) 44354280cfSShuo Chen{ 45354280cfSShuo Chen //connector-> 46354280cfSShuo Chen} 47354280cfSShuo Chen 48354280cfSShuo Chen} 49354280cfSShuo Chen} 50354280cfSShuo Chen 51354280cfSShuo ChenTcpClient::TcpClient(EventLoop* loop, 52354280cfSShuo Chen const InetAddress& serverAddr) 53354280cfSShuo Chen : loop_(CHECK_NOTNULL(loop)), 54354280cfSShuo Chen connector_(new Connector(loop, serverAddr)), 55354280cfSShuo Chen retry_(false), 56354280cfSShuo Chen connect_(true), 57354280cfSShuo Chen nextConnId_(1) 58354280cfSShuo Chen{ 59354280cfSShuo Chen connector_->setNewConnectionCallback( 60354280cfSShuo Chen boost::bind(&TcpClient::newConnection, this, _1)); 61354280cfSShuo Chen // FIXME setConnectFailedCallback 62354280cfSShuo Chen LOG_INFO << "TcpClient::TcpClient[" << this 63354280cfSShuo Chen << "] - connector " << get_pointer(connector_); 64354280cfSShuo Chen} 65354280cfSShuo Chen 66354280cfSShuo ChenTcpClient::~TcpClient() 67354280cfSShuo Chen{ 68354280cfSShuo Chen LOG_INFO << "TcpClient::~TcpClient[" << this 69354280cfSShuo Chen << "] - connector " << get_pointer(connector_); 70354280cfSShuo Chen TcpConnectionPtr conn; 71354280cfSShuo Chen { 72354280cfSShuo Chen MutexLockGuard lock(mutex_); 73354280cfSShuo Chen conn = connection_; 74354280cfSShuo Chen } 75354280cfSShuo Chen if (conn) 76354280cfSShuo Chen { 77354280cfSShuo Chen // FIXME: not 100% safe, if we are in different thread 78354280cfSShuo Chen CloseCallback cb = boost::bind(&detail::removeConnection, loop_, _1); 79354280cfSShuo Chen loop_->runInLoop( 80354280cfSShuo Chen boost::bind(&TcpConnection::setCloseCallback, conn, cb)); 81354280cfSShuo Chen } 82354280cfSShuo Chen else 83354280cfSShuo Chen { 84354280cfSShuo Chen connector_->stop(); 85354280cfSShuo Chen // FIXME: HACK 86354280cfSShuo Chen loop_->runAfter(1, boost::bind(&detail::removeConnector, connector_)); 87354280cfSShuo Chen } 88354280cfSShuo Chen} 89354280cfSShuo Chen 90354280cfSShuo Chenvoid TcpClient::connect() 91354280cfSShuo Chen{ 92354280cfSShuo Chen // FIXME: check state 93354280cfSShuo Chen LOG_INFO << "TcpClient::connect[" << this << "] - connecting to " 94354280cfSShuo Chen << connector_->serverAddress().toHostPort(); 95354280cfSShuo Chen connect_ = true; 96354280cfSShuo Chen connector_->start(); 97354280cfSShuo Chen} 98354280cfSShuo Chen 99354280cfSShuo Chenvoid TcpClient::disconnect() 100354280cfSShuo Chen{ 101354280cfSShuo Chen connect_ = false; 102354280cfSShuo Chen 103354280cfSShuo Chen { 104354280cfSShuo Chen MutexLockGuard lock(mutex_); 105354280cfSShuo Chen if (connection_) 106354280cfSShuo Chen { 107354280cfSShuo Chen connection_->shutdown(); 108354280cfSShuo Chen } 109354280cfSShuo Chen } 110354280cfSShuo Chen} 111354280cfSShuo Chen 112354280cfSShuo Chenvoid TcpClient::stop() 113354280cfSShuo Chen{ 114354280cfSShuo Chen connect_ = false; 115354280cfSShuo Chen connector_->stop(); 116354280cfSShuo Chen} 117354280cfSShuo Chen 118354280cfSShuo Chenvoid TcpClient::newConnection(int sockfd) 119354280cfSShuo Chen{ 120354280cfSShuo Chen loop_->assertInLoopThread(); 121354280cfSShuo Chen InetAddress peerAddr(sockets::getPeerAddr(sockfd)); 122354280cfSShuo Chen char buf[32]; 123354280cfSShuo Chen snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toHostPort().c_str(), nextConnId_); 124354280cfSShuo Chen ++nextConnId_; 125354280cfSShuo Chen string connName = buf; 126354280cfSShuo Chen 127354280cfSShuo Chen InetAddress localAddr(sockets::getLocalAddr(sockfd)); 128354280cfSShuo Chen // FIXME poll with zero timeout to double confirm the new connection 129354280cfSShuo Chen // FIXME use make_shared if necessary 130354280cfSShuo Chen TcpConnectionPtr conn(new TcpConnection(loop_, 131354280cfSShuo Chen connName, 132354280cfSShuo Chen sockfd, 133354280cfSShuo Chen localAddr, 134354280cfSShuo Chen peerAddr)); 135354280cfSShuo Chen 136354280cfSShuo Chen conn->setConnectionCallback(connectionCallback_); 137354280cfSShuo Chen conn->setMessageCallback(messageCallback_); 138354280cfSShuo Chen conn->setWriteCompleteCallback(writeCompleteCallback_); 139354280cfSShuo Chen conn->setCloseCallback( 140354280cfSShuo Chen boost::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe 141354280cfSShuo Chen { 142354280cfSShuo Chen MutexLockGuard lock(mutex_); 143354280cfSShuo Chen connection_ = conn; 144354280cfSShuo Chen } 145354280cfSShuo Chen conn->connectEstablished(); 146354280cfSShuo Chen} 147354280cfSShuo Chen 148354280cfSShuo Chenvoid TcpClient::removeConnection(const TcpConnectionPtr& conn) 149354280cfSShuo Chen{ 150354280cfSShuo Chen loop_->assertInLoopThread(); 151354280cfSShuo Chen assert(loop_ == conn->getLoop()); 152354280cfSShuo Chen 153354280cfSShuo Chen { 154354280cfSShuo Chen MutexLockGuard lock(mutex_); 155354280cfSShuo Chen assert(connection_ == conn); 156354280cfSShuo Chen connection_.reset(); 157354280cfSShuo Chen } 158354280cfSShuo Chen 159354280cfSShuo Chen loop_->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn)); 160354280cfSShuo Chen if (retry_ && connect_) 161354280cfSShuo Chen { 162354280cfSShuo Chen LOG_INFO << "TcpClient::connect[" << this << "] - Reconnecting to " 163354280cfSShuo Chen << connector_->serverAddress().toHostPort(); 164354280cfSShuo Chen connector_->restart(); 165354280cfSShuo Chen } 166354280cfSShuo Chen} 167354280cfSShuo Chen 168