140161064SShuo Chen// excerpts from http://code.google.com/p/muduo/ 240161064SShuo Chen// 340161064SShuo Chen// Use of this source code is governed by a BSD-style license 440161064SShuo Chen// that can be found in the License file. 540161064SShuo Chen// 640161064SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 740161064SShuo Chen 840161064SShuo Chen#include "TcpConnection.h" 940161064SShuo Chen 1040161064SShuo Chen#include "logging/Logging.h" 1140161064SShuo Chen#include "Channel.h" 1240161064SShuo Chen#include "EventLoop.h" 1340161064SShuo Chen#include "Socket.h" 1440161064SShuo Chen#include "SocketsOps.h" 1540161064SShuo Chen 1640161064SShuo Chen#include <boost/bind.hpp> 1740161064SShuo Chen 1840161064SShuo Chen#include <errno.h> 1940161064SShuo Chen#include <stdio.h> 2040161064SShuo Chen 2140161064SShuo Chenusing namespace muduo; 2240161064SShuo Chen 2340161064SShuo ChenTcpConnection::TcpConnection(EventLoop* loop, 2440161064SShuo Chen const std::string& nameArg, 2540161064SShuo Chen int sockfd, 2640161064SShuo Chen const InetAddress& localAddr, 2740161064SShuo Chen const InetAddress& peerAddr) 2840161064SShuo Chen : loop_(CHECK_NOTNULL(loop)), 2940161064SShuo Chen name_(nameArg), 3040161064SShuo Chen state_(kConnecting), 3140161064SShuo Chen socket_(new Socket(sockfd)), 3240161064SShuo Chen channel_(new Channel(loop, sockfd)), 3340161064SShuo Chen localAddr_(localAddr), 3440161064SShuo Chen peerAddr_(peerAddr) 3540161064SShuo Chen{ 3640161064SShuo Chen LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this 3740161064SShuo Chen << " fd=" << sockfd; 3840161064SShuo Chen channel_->setReadCallback( 3940161064SShuo Chen boost::bind(&TcpConnection::handleRead, this, _1)); 4040161064SShuo Chen channel_->setWriteCallback( 4140161064SShuo Chen boost::bind(&TcpConnection::handleWrite, this)); 4240161064SShuo Chen channel_->setCloseCallback( 4340161064SShuo Chen boost::bind(&TcpConnection::handleClose, this)); 4440161064SShuo Chen channel_->setErrorCallback( 4540161064SShuo Chen boost::bind(&TcpConnection::handleError, this)); 4640161064SShuo Chen} 4740161064SShuo Chen 4840161064SShuo ChenTcpConnection::~TcpConnection() 4940161064SShuo Chen{ 5040161064SShuo Chen LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this 5140161064SShuo Chen << " fd=" << channel_->fd(); 5240161064SShuo Chen} 5340161064SShuo Chen 5440161064SShuo Chenvoid TcpConnection::send(const std::string& message) 5540161064SShuo Chen{ 5640161064SShuo Chen if (state_ == kConnected) { 5740161064SShuo Chen if (loop_->isInLoopThread()) { 5840161064SShuo Chen sendInLoop(message); 5940161064SShuo Chen } else { 6040161064SShuo Chen loop_->runInLoop( 6140161064SShuo Chen boost::bind(&TcpConnection::sendInLoop, this, message)); 6240161064SShuo Chen } 6340161064SShuo Chen } 6440161064SShuo Chen} 6540161064SShuo Chen 6640161064SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message) 6740161064SShuo Chen{ 6840161064SShuo Chen loop_->assertInLoopThread(); 6940161064SShuo Chen ssize_t nwrote = 0; 7040161064SShuo Chen // if no thing in output queue, try writing directly 7140161064SShuo Chen if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { 7240161064SShuo Chen nwrote = ::write(channel_->fd(), message.data(), message.size()); 7340161064SShuo Chen if (nwrote >= 0) { 7440161064SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 7540161064SShuo Chen LOG_TRACE << "I am going to write more data"; 7640161064SShuo Chen } else if (writeCompleteCallback_) { 7740161064SShuo Chen loop_->queueInLoop( 7840161064SShuo Chen boost::bind(writeCompleteCallback_, shared_from_this())); 7940161064SShuo Chen } 8040161064SShuo Chen } else { 8140161064SShuo Chen nwrote = 0; 8240161064SShuo Chen if (errno != EWOULDBLOCK) { 8340161064SShuo Chen LOG_SYSERR << "TcpConnection::sendInLoop"; 8440161064SShuo Chen } 8540161064SShuo Chen } 8640161064SShuo Chen } 8740161064SShuo Chen 8840161064SShuo Chen assert(nwrote >= 0); 8940161064SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 9040161064SShuo Chen outputBuffer_.append(message.data()+nwrote, message.size()-nwrote); 9140161064SShuo Chen if (!channel_->isWriting()) { 9240161064SShuo Chen channel_->enableWriting(); 9340161064SShuo Chen } 9440161064SShuo Chen } 9540161064SShuo Chen} 9640161064SShuo Chen 9740161064SShuo Chenvoid TcpConnection::shutdown() 9840161064SShuo Chen{ 9940161064SShuo Chen // FIXME: use compare and swap 10040161064SShuo Chen if (state_ == kConnected) 10140161064SShuo Chen { 10240161064SShuo Chen setState(kDisconnecting); 10340161064SShuo Chen // FIXME: shared_from_this()? 10440161064SShuo Chen loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); 10540161064SShuo Chen } 10640161064SShuo Chen} 10740161064SShuo Chen 10840161064SShuo Chenvoid TcpConnection::shutdownInLoop() 10940161064SShuo Chen{ 11040161064SShuo Chen loop_->assertInLoopThread(); 11140161064SShuo Chen if (!channel_->isWriting()) 11240161064SShuo Chen { 11340161064SShuo Chen // we are not writing 11440161064SShuo Chen socket_->shutdownWrite(); 11540161064SShuo Chen } 11640161064SShuo Chen} 11740161064SShuo Chen 11840161064SShuo Chenvoid TcpConnection::setTcpNoDelay(bool on) 11940161064SShuo Chen{ 12040161064SShuo Chen socket_->setTcpNoDelay(on); 12140161064SShuo Chen} 12240161064SShuo Chen 12340161064SShuo Chenvoid TcpConnection::connectEstablished() 12440161064SShuo Chen{ 12540161064SShuo Chen loop_->assertInLoopThread(); 12640161064SShuo Chen assert(state_ == kConnecting); 12740161064SShuo Chen setState(kConnected); 12840161064SShuo Chen channel_->enableReading(); 12940161064SShuo Chen connectionCallback_(shared_from_this()); 13040161064SShuo Chen} 13140161064SShuo Chen 13240161064SShuo Chenvoid TcpConnection::connectDestroyed() 13340161064SShuo Chen{ 13440161064SShuo Chen loop_->assertInLoopThread(); 13540161064SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 13640161064SShuo Chen setState(kDisconnected); 13740161064SShuo Chen channel_->disableAll(); 13840161064SShuo Chen connectionCallback_(shared_from_this()); 13940161064SShuo Chen 14040161064SShuo Chen loop_->removeChannel(get_pointer(channel_)); 14140161064SShuo Chen} 14240161064SShuo Chen 14340161064SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime) 14440161064SShuo Chen{ 14540161064SShuo Chen int savedErrno = 0; 14640161064SShuo Chen ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); 14740161064SShuo Chen if (n > 0) { 14840161064SShuo Chen messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); 14940161064SShuo Chen } else if (n == 0) { 15040161064SShuo Chen handleClose(); 15140161064SShuo Chen } else { 15240161064SShuo Chen errno = savedErrno; 15340161064SShuo Chen LOG_SYSERR << "TcpConnection::handleRead"; 15440161064SShuo Chen handleError(); 15540161064SShuo Chen } 15640161064SShuo Chen} 15740161064SShuo Chen 15840161064SShuo Chenvoid TcpConnection::handleWrite() 15940161064SShuo Chen{ 16040161064SShuo Chen loop_->assertInLoopThread(); 16140161064SShuo Chen if (channel_->isWriting()) { 16240161064SShuo Chen ssize_t n = ::write(channel_->fd(), 16340161064SShuo Chen outputBuffer_.peek(), 16440161064SShuo Chen outputBuffer_.readableBytes()); 16540161064SShuo Chen if (n > 0) { 16640161064SShuo Chen outputBuffer_.retrieve(n); 16740161064SShuo Chen if (outputBuffer_.readableBytes() == 0) { 16840161064SShuo Chen channel_->disableWriting(); 16940161064SShuo Chen if (writeCompleteCallback_) { 17040161064SShuo Chen loop_->queueInLoop( 17140161064SShuo Chen boost::bind(writeCompleteCallback_, shared_from_this())); 17240161064SShuo Chen } 17340161064SShuo Chen if (state_ == kDisconnecting) { 17440161064SShuo Chen shutdownInLoop(); 17540161064SShuo Chen } 17640161064SShuo Chen } else { 17740161064SShuo Chen LOG_TRACE << "I am going to write more data"; 17840161064SShuo Chen } 17940161064SShuo Chen } else { 18040161064SShuo Chen LOG_SYSERR << "TcpConnection::handleWrite"; 18140161064SShuo Chen } 18240161064SShuo Chen } else { 18340161064SShuo Chen LOG_TRACE << "Connection is down, no more writing"; 18440161064SShuo Chen } 18540161064SShuo Chen} 18640161064SShuo Chen 18740161064SShuo Chenvoid TcpConnection::handleClose() 18840161064SShuo Chen{ 18940161064SShuo Chen loop_->assertInLoopThread(); 19040161064SShuo Chen LOG_TRACE << "TcpConnection::handleClose state = " << state_; 19140161064SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 19240161064SShuo Chen // we don't close fd, leave it to dtor, so we can find leaks easily. 19340161064SShuo Chen channel_->disableAll(); 19440161064SShuo Chen // must be the last line 19540161064SShuo Chen closeCallback_(shared_from_this()); 19640161064SShuo Chen} 19740161064SShuo Chen 19840161064SShuo Chenvoid TcpConnection::handleError() 19940161064SShuo Chen{ 20040161064SShuo Chen int err = sockets::getSocketError(channel_->fd()); 20140161064SShuo Chen LOG_ERROR << "TcpConnection::handleError [" << name_ 20240161064SShuo Chen << "] - SO_ERROR = " << err << " " << strerror_tl(err); 20340161064SShuo Chen} 204