1e254a845SShuo Chen// excerpts from http://code.google.com/p/muduo/ 2e254a845SShuo Chen// 3e254a845SShuo Chen// Use of this source code is governed by a BSD-style license 4e254a845SShuo Chen// that can be found in the License file. 5e254a845SShuo Chen// 6e254a845SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7e254a845SShuo Chen 8e254a845SShuo Chen#include "TcpConnection.h" 9e254a845SShuo Chen 10e254a845SShuo Chen#include "logging/Logging.h" 11e254a845SShuo Chen#include "Channel.h" 12e254a845SShuo Chen#include "EventLoop.h" 13e254a845SShuo Chen#include "Socket.h" 14e254a845SShuo Chen#include "SocketsOps.h" 15e254a845SShuo Chen 16e254a845SShuo Chen#include <boost/bind.hpp> 17e254a845SShuo Chen 18e254a845SShuo Chen#include <errno.h> 19e254a845SShuo Chen#include <stdio.h> 20e254a845SShuo Chen 21e254a845SShuo Chenusing namespace muduo; 22e254a845SShuo Chen 23e254a845SShuo ChenTcpConnection::TcpConnection(EventLoop* loop, 24e254a845SShuo Chen const std::string& nameArg, 25e254a845SShuo Chen int sockfd, 26e254a845SShuo Chen const InetAddress& localAddr, 27e254a845SShuo Chen const InetAddress& peerAddr) 28e254a845SShuo Chen : loop_(CHECK_NOTNULL(loop)), 29e254a845SShuo Chen name_(nameArg), 30e254a845SShuo Chen state_(kConnecting), 31e254a845SShuo Chen socket_(new Socket(sockfd)), 32e254a845SShuo Chen channel_(new Channel(loop, sockfd)), 33e254a845SShuo Chen localAddr_(localAddr), 34e254a845SShuo Chen peerAddr_(peerAddr) 35e254a845SShuo Chen{ 36e254a845SShuo Chen LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this 37e254a845SShuo Chen << " fd=" << sockfd; 38e254a845SShuo Chen channel_->setReadCallback( 39e254a845SShuo Chen boost::bind(&TcpConnection::handleRead, this, _1)); 40e254a845SShuo Chen channel_->setWriteCallback( 41e254a845SShuo Chen boost::bind(&TcpConnection::handleWrite, this)); 42e254a845SShuo Chen channel_->setCloseCallback( 43e254a845SShuo Chen boost::bind(&TcpConnection::handleClose, this)); 44e254a845SShuo Chen channel_->setErrorCallback( 45e254a845SShuo Chen boost::bind(&TcpConnection::handleError, this)); 46e254a845SShuo Chen} 47e254a845SShuo Chen 48e254a845SShuo ChenTcpConnection::~TcpConnection() 49e254a845SShuo Chen{ 50e254a845SShuo Chen LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this 51e254a845SShuo Chen << " fd=" << channel_->fd(); 52e254a845SShuo Chen} 53e254a845SShuo Chen 54e254a845SShuo Chenvoid TcpConnection::send(const std::string& message) 55e254a845SShuo Chen{ 56e254a845SShuo Chen if (state_ == kConnected) { 57e254a845SShuo Chen if (loop_->isInLoopThread()) { 58e254a845SShuo Chen sendInLoop(message); 59e254a845SShuo Chen } else { 60e254a845SShuo Chen loop_->runInLoop( 61e254a845SShuo Chen boost::bind(&TcpConnection::sendInLoop, this, message)); 62e254a845SShuo Chen } 63e254a845SShuo Chen } 64e254a845SShuo Chen} 65e254a845SShuo Chen 66e254a845SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message) 67e254a845SShuo Chen{ 68e254a845SShuo Chen loop_->assertInLoopThread(); 69e254a845SShuo Chen ssize_t nwrote = 0; 70e254a845SShuo Chen // if no thing in output queue, try writing directly 71e254a845SShuo Chen if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { 72e254a845SShuo Chen nwrote = ::write(channel_->fd(), message.data(), message.size()); 73e254a845SShuo Chen if (nwrote >= 0) { 74e254a845SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 75e254a845SShuo Chen LOG_TRACE << "I am going to write more data"; 76e254a845SShuo Chen } else if (writeCompleteCallback_) { 77e254a845SShuo Chen loop_->queueInLoop( 78e254a845SShuo Chen boost::bind(writeCompleteCallback_, shared_from_this())); 79e254a845SShuo Chen } 80e254a845SShuo Chen } else { 81e254a845SShuo Chen nwrote = 0; 82e254a845SShuo Chen if (errno != EWOULDBLOCK) { 83e254a845SShuo Chen LOG_SYSERR << "TcpConnection::sendInLoop"; 84e254a845SShuo Chen } 85e254a845SShuo Chen } 86e254a845SShuo Chen } 87e254a845SShuo Chen 88e254a845SShuo Chen assert(nwrote >= 0); 89e254a845SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 90e254a845SShuo Chen outputBuffer_.append(message.data()+nwrote, message.size()-nwrote); 91e254a845SShuo Chen if (!channel_->isWriting()) { 92e254a845SShuo Chen channel_->enableWriting(); 93e254a845SShuo Chen } 94e254a845SShuo Chen } 95e254a845SShuo Chen} 96e254a845SShuo Chen 97e254a845SShuo Chenvoid TcpConnection::shutdown() 98e254a845SShuo Chen{ 99e254a845SShuo Chen // FIXME: use compare and swap 100e254a845SShuo Chen if (state_ == kConnected) 101e254a845SShuo Chen { 102e254a845SShuo Chen setState(kDisconnecting); 103e254a845SShuo Chen // FIXME: shared_from_this()? 104e254a845SShuo Chen loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); 105e254a845SShuo Chen } 106e254a845SShuo Chen} 107e254a845SShuo Chen 108e254a845SShuo Chenvoid TcpConnection::shutdownInLoop() 109e254a845SShuo Chen{ 110e254a845SShuo Chen loop_->assertInLoopThread(); 111e254a845SShuo Chen if (!channel_->isWriting()) 112e254a845SShuo Chen { 113e254a845SShuo Chen // we are not writing 114e254a845SShuo Chen socket_->shutdownWrite(); 115e254a845SShuo Chen } 116e254a845SShuo Chen} 117e254a845SShuo Chen 118e254a845SShuo Chenvoid TcpConnection::setTcpNoDelay(bool on) 119e254a845SShuo Chen{ 120e254a845SShuo Chen socket_->setTcpNoDelay(on); 121e254a845SShuo Chen} 122e254a845SShuo Chen 123e254a845SShuo Chenvoid TcpConnection::connectEstablished() 124e254a845SShuo Chen{ 125e254a845SShuo Chen loop_->assertInLoopThread(); 126e254a845SShuo Chen assert(state_ == kConnecting); 127e254a845SShuo Chen setState(kConnected); 128e254a845SShuo Chen channel_->enableReading(); 129e254a845SShuo Chen connectionCallback_(shared_from_this()); 130e254a845SShuo Chen} 131e254a845SShuo Chen 132e254a845SShuo Chenvoid TcpConnection::connectDestroyed() 133e254a845SShuo Chen{ 134e254a845SShuo Chen loop_->assertInLoopThread(); 135e254a845SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 136e254a845SShuo Chen setState(kDisconnected); 137e254a845SShuo Chen channel_->disableAll(); 138e254a845SShuo Chen connectionCallback_(shared_from_this()); 139e254a845SShuo Chen 140e254a845SShuo Chen loop_->removeChannel(get_pointer(channel_)); 141e254a845SShuo Chen} 142e254a845SShuo Chen 143e254a845SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime) 144e254a845SShuo Chen{ 145e254a845SShuo Chen int savedErrno = 0; 146e254a845SShuo Chen ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); 147e254a845SShuo Chen if (n > 0) { 148e254a845SShuo Chen messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); 149e254a845SShuo Chen } else if (n == 0) { 150e254a845SShuo Chen handleClose(); 151e254a845SShuo Chen } else { 1520dd528a5SShuo Chen errno = savedErrno; 1530dd528a5SShuo Chen LOG_SYSERR << "TcpConnection::handleRead"; 154e254a845SShuo Chen handleError(); 155e254a845SShuo Chen } 156e254a845SShuo Chen} 157e254a845SShuo Chen 158e254a845SShuo Chenvoid TcpConnection::handleWrite() 159e254a845SShuo Chen{ 160e254a845SShuo Chen loop_->assertInLoopThread(); 161e254a845SShuo Chen if (channel_->isWriting()) { 162e254a845SShuo Chen ssize_t n = ::write(channel_->fd(), 163e254a845SShuo Chen outputBuffer_.peek(), 164e254a845SShuo Chen outputBuffer_.readableBytes()); 165e254a845SShuo Chen if (n > 0) { 166e254a845SShuo Chen outputBuffer_.retrieve(n); 167e254a845SShuo Chen if (outputBuffer_.readableBytes() == 0) { 168e254a845SShuo Chen channel_->disableWriting(); 169e254a845SShuo Chen if (writeCompleteCallback_) { 170e254a845SShuo Chen loop_->queueInLoop( 171e254a845SShuo Chen boost::bind(writeCompleteCallback_, shared_from_this())); 172e254a845SShuo Chen } 173e254a845SShuo Chen if (state_ == kDisconnecting) { 174e254a845SShuo Chen shutdownInLoop(); 175e254a845SShuo Chen } 176e254a845SShuo Chen } else { 177e254a845SShuo Chen LOG_TRACE << "I am going to write more data"; 178e254a845SShuo Chen } 179e254a845SShuo Chen } else { 180e254a845SShuo Chen LOG_SYSERR << "TcpConnection::handleWrite"; 181e254a845SShuo Chen } 182e254a845SShuo Chen } else { 183e254a845SShuo Chen LOG_TRACE << "Connection is down, no more writing"; 184e254a845SShuo Chen } 185e254a845SShuo Chen} 186e254a845SShuo Chen 187e254a845SShuo Chenvoid TcpConnection::handleClose() 188e254a845SShuo Chen{ 189e254a845SShuo Chen loop_->assertInLoopThread(); 190e254a845SShuo Chen LOG_TRACE << "TcpConnection::handleClose state = " << state_; 191e254a845SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 192e254a845SShuo Chen // we don't close fd, leave it to dtor, so we can find leaks easily. 193e254a845SShuo Chen channel_->disableAll(); 194e254a845SShuo Chen // must be the last line 195e254a845SShuo Chen closeCallback_(shared_from_this()); 196e254a845SShuo Chen} 197e254a845SShuo Chen 198e254a845SShuo Chenvoid TcpConnection::handleError() 199e254a845SShuo Chen{ 200e254a845SShuo Chen int err = sockets::getSocketError(channel_->fd()); 201e254a845SShuo Chen LOG_ERROR << "TcpConnection::handleError [" << name_ 202e254a845SShuo Chen << "] - SO_ERROR = " << err << " " << strerror_tl(err); 203e254a845SShuo Chen} 204