12a18e699SShuo Chen// excerpts from http://code.google.com/p/muduo/ 22a18e699SShuo Chen// 32a18e699SShuo Chen// Use of this source code is governed by a BSD-style license 42a18e699SShuo Chen// that can be found in the License file. 52a18e699SShuo Chen// 62a18e699SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 72a18e699SShuo Chen 82a18e699SShuo Chen#include "TcpConnection.h" 92a18e699SShuo Chen 102a18e699SShuo Chen#include "logging/Logging.h" 112a18e699SShuo Chen#include "Channel.h" 122a18e699SShuo Chen#include "EventLoop.h" 132a18e699SShuo Chen#include "Socket.h" 142a18e699SShuo Chen#include "SocketsOps.h" 152a18e699SShuo Chen 162a18e699SShuo Chen#include <boost/bind.hpp> 172a18e699SShuo Chen 182a18e699SShuo Chen#include <errno.h> 192a18e699SShuo Chen#include <stdio.h> 202a18e699SShuo Chen 212a18e699SShuo Chenusing namespace muduo; 222a18e699SShuo Chen 232a18e699SShuo ChenTcpConnection::TcpConnection(EventLoop* loop, 242a18e699SShuo Chen const std::string& nameArg, 252a18e699SShuo Chen int sockfd, 262a18e699SShuo Chen const InetAddress& localAddr, 272a18e699SShuo Chen const InetAddress& peerAddr) 282a18e699SShuo Chen : loop_(CHECK_NOTNULL(loop)), 292a18e699SShuo Chen name_(nameArg), 302a18e699SShuo Chen state_(kConnecting), 312a18e699SShuo Chen socket_(new Socket(sockfd)), 322a18e699SShuo Chen channel_(new Channel(loop, sockfd)), 332a18e699SShuo Chen localAddr_(localAddr), 342a18e699SShuo Chen peerAddr_(peerAddr) 352a18e699SShuo Chen{ 362a18e699SShuo Chen LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this 372a18e699SShuo Chen << " fd=" << sockfd; 382a18e699SShuo Chen channel_->setReadCallback( 392a18e699SShuo Chen boost::bind(&TcpConnection::handleRead, this, _1)); 402a18e699SShuo Chen channel_->setWriteCallback( 412a18e699SShuo Chen boost::bind(&TcpConnection::handleWrite, this)); 422a18e699SShuo Chen channel_->setCloseCallback( 432a18e699SShuo Chen boost::bind(&TcpConnection::handleClose, this)); 442a18e699SShuo Chen channel_->setErrorCallback( 452a18e699SShuo Chen boost::bind(&TcpConnection::handleError, this)); 462a18e699SShuo Chen} 472a18e699SShuo Chen 482a18e699SShuo ChenTcpConnection::~TcpConnection() 492a18e699SShuo Chen{ 502a18e699SShuo Chen LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this 512a18e699SShuo Chen << " fd=" << channel_->fd(); 522a18e699SShuo Chen} 532a18e699SShuo Chen 54129fe122SShuo Chenvoid TcpConnection::send(const std::string& message) 55129fe122SShuo Chen{ 56129fe122SShuo Chen if (state_ == kConnected) { 57129fe122SShuo Chen if (loop_->isInLoopThread()) { 58129fe122SShuo Chen sendInLoop(message); 59129fe122SShuo Chen } else { 60129fe122SShuo Chen loop_->runInLoop( 61129fe122SShuo Chen boost::bind(&TcpConnection::sendInLoop, this, message)); 62129fe122SShuo Chen } 63129fe122SShuo Chen } 64129fe122SShuo Chen} 65129fe122SShuo Chen 66129fe122SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message) 67129fe122SShuo Chen{ 68129fe122SShuo Chen loop_->assertInLoopThread(); 69129fe122SShuo Chen ssize_t nwrote = 0; 70129fe122SShuo Chen // if no thing in output queue, try writing directly 71129fe122SShuo Chen if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { 72129fe122SShuo Chen nwrote = ::write(channel_->fd(), message.data(), message.size()); 73129fe122SShuo Chen if (nwrote >= 0) { 74129fe122SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 75129fe122SShuo Chen LOG_TRACE << "I am going to write more data"; 76129fe122SShuo Chen } 77129fe122SShuo Chen } else { 78129fe122SShuo Chen nwrote = 0; 79129fe122SShuo Chen if (errno != EWOULDBLOCK) { 80129fe122SShuo Chen LOG_SYSERR << "TcpConnection::sendInLoop"; 81129fe122SShuo Chen } 82129fe122SShuo Chen } 83129fe122SShuo Chen } 84129fe122SShuo Chen 85129fe122SShuo Chen assert(nwrote >= 0); 86129fe122SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 87129fe122SShuo Chen outputBuffer_.append(message.data()+nwrote, message.size()-nwrote); 88129fe122SShuo Chen if (!channel_->isWriting()) { 89129fe122SShuo Chen channel_->enableWriting(); 90129fe122SShuo Chen } 91129fe122SShuo Chen } 92129fe122SShuo Chen} 93129fe122SShuo Chen 94129fe122SShuo Chenvoid TcpConnection::shutdown() 95129fe122SShuo Chen{ 96129fe122SShuo Chen // FIXME: use compare and swap 97129fe122SShuo Chen if (state_ == kConnected) 98129fe122SShuo Chen { 99129fe122SShuo Chen setState(kDisconnecting); 100129fe122SShuo Chen // FIXME: shared_from_this()? 101129fe122SShuo Chen loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); 102129fe122SShuo Chen } 103129fe122SShuo Chen} 104129fe122SShuo Chen 105129fe122SShuo Chenvoid TcpConnection::shutdownInLoop() 106129fe122SShuo Chen{ 107129fe122SShuo Chen loop_->assertInLoopThread(); 108129fe122SShuo Chen if (!channel_->isWriting()) 109129fe122SShuo Chen { 110129fe122SShuo Chen // we are not writing 111129fe122SShuo Chen socket_->shutdownWrite(); 112129fe122SShuo Chen } 113129fe122SShuo Chen} 114129fe122SShuo Chen 1152a18e699SShuo Chenvoid TcpConnection::connectEstablished() 1162a18e699SShuo Chen{ 1172a18e699SShuo Chen loop_->assertInLoopThread(); 1182a18e699SShuo Chen assert(state_ == kConnecting); 1192a18e699SShuo Chen setState(kConnected); 1202a18e699SShuo Chen channel_->enableReading(); 1212a18e699SShuo Chen connectionCallback_(shared_from_this()); 1222a18e699SShuo Chen} 1232a18e699SShuo Chen 1242a18e699SShuo Chenvoid TcpConnection::connectDestroyed() 1252a18e699SShuo Chen{ 1262a18e699SShuo Chen loop_->assertInLoopThread(); 127129fe122SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 1282a18e699SShuo Chen setState(kDisconnected); 1292a18e699SShuo Chen channel_->disableAll(); 1302a18e699SShuo Chen connectionCallback_(shared_from_this()); 1312a18e699SShuo Chen 1322a18e699SShuo Chen loop_->removeChannel(get_pointer(channel_)); 1332a18e699SShuo Chen} 1342a18e699SShuo Chen 1352a18e699SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime) 1362a18e699SShuo Chen{ 1372a18e699SShuo Chen int savedErrno = 0; 1382a18e699SShuo Chen ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); 1392a18e699SShuo Chen if (n > 0) { 1402a18e699SShuo Chen messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); 1412a18e699SShuo Chen } else if (n == 0) { 1422a18e699SShuo Chen handleClose(); 1432a18e699SShuo Chen } else { 1440dd528a5SShuo Chen errno = savedErrno; 1450dd528a5SShuo Chen LOG_SYSERR << "TcpConnection::handleRead"; 1462a18e699SShuo Chen handleError(); 1472a18e699SShuo Chen } 1482a18e699SShuo Chen} 1492a18e699SShuo Chen 1502a18e699SShuo Chenvoid TcpConnection::handleWrite() 1512a18e699SShuo Chen{ 152129fe122SShuo Chen loop_->assertInLoopThread(); 153129fe122SShuo Chen if (channel_->isWriting()) { 154129fe122SShuo Chen ssize_t n = ::write(channel_->fd(), 155129fe122SShuo Chen outputBuffer_.peek(), 156129fe122SShuo Chen outputBuffer_.readableBytes()); 157129fe122SShuo Chen if (n > 0) { 158129fe122SShuo Chen outputBuffer_.retrieve(n); 159129fe122SShuo Chen if (outputBuffer_.readableBytes() == 0) { 160129fe122SShuo Chen channel_->disableWriting(); 161129fe122SShuo Chen if (state_ == kDisconnecting) { 162129fe122SShuo Chen shutdownInLoop(); 163129fe122SShuo Chen } 164129fe122SShuo Chen } else { 165129fe122SShuo Chen LOG_TRACE << "I am going to write more data"; 166129fe122SShuo Chen } 167129fe122SShuo Chen } else { 168129fe122SShuo Chen LOG_SYSERR << "TcpConnection::handleWrite"; 169129fe122SShuo Chen } 170129fe122SShuo Chen } else { 171129fe122SShuo Chen LOG_TRACE << "Connection is down, no more writing"; 172129fe122SShuo Chen } 1732a18e699SShuo Chen} 1742a18e699SShuo Chen 1752a18e699SShuo Chenvoid TcpConnection::handleClose() 1762a18e699SShuo Chen{ 1772a18e699SShuo Chen loop_->assertInLoopThread(); 1782a18e699SShuo Chen LOG_TRACE << "TcpConnection::handleClose state = " << state_; 179129fe122SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 1802a18e699SShuo Chen // we don't close fd, leave it to dtor, so we can find leaks easily. 1812a18e699SShuo Chen channel_->disableAll(); 1822a18e699SShuo Chen // must be the last line 1832a18e699SShuo Chen closeCallback_(shared_from_this()); 1842a18e699SShuo Chen} 1852a18e699SShuo Chen 1862a18e699SShuo Chenvoid TcpConnection::handleError() 1872a18e699SShuo Chen{ 1882a18e699SShuo Chen int err = sockets::getSocketError(channel_->fd()); 1892a18e699SShuo Chen LOG_ERROR << "TcpConnection::handleError [" << name_ 1902a18e699SShuo Chen << "] - SO_ERROR = " << err << " " << strerror_tl(err); 1912a18e699SShuo Chen} 192