// TcpConnection.cc revision 129fe122
12a18e699SShuo Chen// excerpts from http://code.google.com/p/muduo/ 22a18e699SShuo Chen// 32a18e699SShuo Chen// Use of this source code is governed by a BSD-style license 42a18e699SShuo Chen// that can be found in the License file. 52a18e699SShuo Chen// 62a18e699SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 72a18e699SShuo Chen 82a18e699SShuo Chen#include "TcpConnection.h" 92a18e699SShuo Chen 102a18e699SShuo Chen#include "logging/Logging.h" 112a18e699SShuo Chen#include "Channel.h" 122a18e699SShuo Chen#include "EventLoop.h" 132a18e699SShuo Chen#include "Socket.h" 142a18e699SShuo Chen#include "SocketsOps.h" 152a18e699SShuo Chen 162a18e699SShuo Chen#include <boost/bind.hpp> 172a18e699SShuo Chen 182a18e699SShuo Chen#include <errno.h> 192a18e699SShuo Chen#include <stdio.h> 202a18e699SShuo Chen 212a18e699SShuo Chenusing namespace muduo; 222a18e699SShuo Chen 232a18e699SShuo ChenTcpConnection::TcpConnection(EventLoop* loop, 242a18e699SShuo Chen const std::string& nameArg, 252a18e699SShuo Chen int sockfd, 262a18e699SShuo Chen const InetAddress& localAddr, 272a18e699SShuo Chen const InetAddress& peerAddr) 282a18e699SShuo Chen : loop_(CHECK_NOTNULL(loop)), 292a18e699SShuo Chen name_(nameArg), 302a18e699SShuo Chen state_(kConnecting), 312a18e699SShuo Chen socket_(new Socket(sockfd)), 322a18e699SShuo Chen channel_(new Channel(loop, sockfd)), 332a18e699SShuo Chen localAddr_(localAddr), 342a18e699SShuo Chen peerAddr_(peerAddr) 352a18e699SShuo Chen{ 362a18e699SShuo Chen LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this 372a18e699SShuo Chen << " fd=" << sockfd; 382a18e699SShuo Chen channel_->setReadCallback( 392a18e699SShuo Chen boost::bind(&TcpConnection::handleRead, this, _1)); 402a18e699SShuo Chen channel_->setWriteCallback( 412a18e699SShuo Chen boost::bind(&TcpConnection::handleWrite, this)); 422a18e699SShuo Chen channel_->setCloseCallback( 432a18e699SShuo Chen boost::bind(&TcpConnection::handleClose, this)); 442a18e699SShuo Chen 
channel_->setErrorCallback( 452a18e699SShuo Chen boost::bind(&TcpConnection::handleError, this)); 462a18e699SShuo Chen} 472a18e699SShuo Chen 482a18e699SShuo ChenTcpConnection::~TcpConnection() 492a18e699SShuo Chen{ 502a18e699SShuo Chen LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this 512a18e699SShuo Chen << " fd=" << channel_->fd(); 522a18e699SShuo Chen} 532a18e699SShuo Chen 54129fe122SShuo Chenvoid TcpConnection::send(const std::string& message) 55129fe122SShuo Chen{ 56129fe122SShuo Chen if (state_ == kConnected) { 57129fe122SShuo Chen if (loop_->isInLoopThread()) { 58129fe122SShuo Chen sendInLoop(message); 59129fe122SShuo Chen } else { 60129fe122SShuo Chen loop_->runInLoop( 61129fe122SShuo Chen boost::bind(&TcpConnection::sendInLoop, this, message)); 62129fe122SShuo Chen } 63129fe122SShuo Chen } 64129fe122SShuo Chen} 65129fe122SShuo Chen 66129fe122SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message) 67129fe122SShuo Chen{ 68129fe122SShuo Chen loop_->assertInLoopThread(); 69129fe122SShuo Chen ssize_t nwrote = 0; 70129fe122SShuo Chen // if no thing in output queue, try writing directly 71129fe122SShuo Chen if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { 72129fe122SShuo Chen nwrote = ::write(channel_->fd(), message.data(), message.size()); 73129fe122SShuo Chen if (nwrote >= 0) { 74129fe122SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 75129fe122SShuo Chen LOG_TRACE << "I am going to write more data"; 76129fe122SShuo Chen } 77129fe122SShuo Chen } else { 78129fe122SShuo Chen nwrote = 0; 79129fe122SShuo Chen if (errno != EWOULDBLOCK) { 80129fe122SShuo Chen LOG_SYSERR << "TcpConnection::sendInLoop"; 81129fe122SShuo Chen } 82129fe122SShuo Chen } 83129fe122SShuo Chen } 84129fe122SShuo Chen 85129fe122SShuo Chen assert(nwrote >= 0); 86129fe122SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 87129fe122SShuo Chen outputBuffer_.append(message.data()+nwrote, message.size()-nwrote); 
88129fe122SShuo Chen if (!channel_->isWriting()) { 89129fe122SShuo Chen channel_->enableWriting(); 90129fe122SShuo Chen } 91129fe122SShuo Chen } 92129fe122SShuo Chen} 93129fe122SShuo Chen 94129fe122SShuo Chen 95129fe122SShuo Chenvoid TcpConnection::shutdown() 96129fe122SShuo Chen{ 97129fe122SShuo Chen // FIXME: use compare and swap 98129fe122SShuo Chen if (state_ == kConnected) 99129fe122SShuo Chen { 100129fe122SShuo Chen setState(kDisconnecting); 101129fe122SShuo Chen // FIXME: shared_from_this()? 102129fe122SShuo Chen loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); 103129fe122SShuo Chen } 104129fe122SShuo Chen} 105129fe122SShuo Chen 106129fe122SShuo Chenvoid TcpConnection::shutdownInLoop() 107129fe122SShuo Chen{ 108129fe122SShuo Chen loop_->assertInLoopThread(); 109129fe122SShuo Chen if (!channel_->isWriting()) 110129fe122SShuo Chen { 111129fe122SShuo Chen // we are not writing 112129fe122SShuo Chen socket_->shutdownWrite(); 113129fe122SShuo Chen } 114129fe122SShuo Chen} 115129fe122SShuo Chen 1162a18e699SShuo Chenvoid TcpConnection::connectEstablished() 1172a18e699SShuo Chen{ 1182a18e699SShuo Chen loop_->assertInLoopThread(); 1192a18e699SShuo Chen assert(state_ == kConnecting); 1202a18e699SShuo Chen setState(kConnected); 1212a18e699SShuo Chen channel_->enableReading(); 1222a18e699SShuo Chen connectionCallback_(shared_from_this()); 1232a18e699SShuo Chen} 1242a18e699SShuo Chen 1252a18e699SShuo Chenvoid TcpConnection::connectDestroyed() 1262a18e699SShuo Chen{ 1272a18e699SShuo Chen loop_->assertInLoopThread(); 128129fe122SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 1292a18e699SShuo Chen setState(kDisconnected); 1302a18e699SShuo Chen channel_->disableAll(); 1312a18e699SShuo Chen connectionCallback_(shared_from_this()); 1322a18e699SShuo Chen 1332a18e699SShuo Chen loop_->removeChannel(get_pointer(channel_)); 1342a18e699SShuo Chen} 1352a18e699SShuo Chen 1362a18e699SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime) 
1372a18e699SShuo Chen{ 1382a18e699SShuo Chen int savedErrno = 0; 1392a18e699SShuo Chen ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); 1402a18e699SShuo Chen if (n > 0) { 1412a18e699SShuo Chen messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); 1422a18e699SShuo Chen } else if (n == 0) { 1432a18e699SShuo Chen handleClose(); 1442a18e699SShuo Chen } else { 1452a18e699SShuo Chen // FIXME: check savedErrno 1462a18e699SShuo Chen handleError(); 1472a18e699SShuo Chen } 1482a18e699SShuo Chen} 1492a18e699SShuo Chen 1502a18e699SShuo Chenvoid TcpConnection::handleWrite() 1512a18e699SShuo Chen{ 152129fe122SShuo Chen loop_->assertInLoopThread(); 153129fe122SShuo Chen if (channel_->isWriting()) { 154129fe122SShuo Chen ssize_t n = ::write(channel_->fd(), 155129fe122SShuo Chen outputBuffer_.peek(), 156129fe122SShuo Chen outputBuffer_.readableBytes()); 157129fe122SShuo Chen if (n > 0) { 158129fe122SShuo Chen outputBuffer_.retrieve(n); 159129fe122SShuo Chen if (outputBuffer_.readableBytes() == 0) { 160129fe122SShuo Chen channel_->disableWriting(); 161129fe122SShuo Chen if (state_ == kDisconnecting) { 162129fe122SShuo Chen shutdownInLoop(); 163129fe122SShuo Chen } 164129fe122SShuo Chen } else { 165129fe122SShuo Chen LOG_TRACE << "I am going to write more data"; 166129fe122SShuo Chen } 167129fe122SShuo Chen } else { 168129fe122SShuo Chen LOG_SYSERR << "TcpConnection::handleWrite"; 169129fe122SShuo Chen abort(); // FIXME 170129fe122SShuo Chen } 171129fe122SShuo Chen } else { 172129fe122SShuo Chen LOG_TRACE << "Connection is down, no more writing"; 173129fe122SShuo Chen } 1742a18e699SShuo Chen} 1752a18e699SShuo Chen 1762a18e699SShuo Chenvoid TcpConnection::handleClose() 1772a18e699SShuo Chen{ 1782a18e699SShuo Chen loop_->assertInLoopThread(); 1792a18e699SShuo Chen LOG_TRACE << "TcpConnection::handleClose state = " << state_; 180129fe122SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 1812a18e699SShuo Chen // we don't close fd, leave it to 
dtor, so we can find leaks easily. 1822a18e699SShuo Chen channel_->disableAll(); 1832a18e699SShuo Chen // must be the last line 1842a18e699SShuo Chen closeCallback_(shared_from_this()); 1852a18e699SShuo Chen} 1862a18e699SShuo Chen 1872a18e699SShuo Chenvoid TcpConnection::handleError() 1882a18e699SShuo Chen{ 1892a18e699SShuo Chen int err = sockets::getSocketError(channel_->fd()); 1902a18e699SShuo Chen LOG_ERROR << "TcpConnection::handleError [" << name_ 1912a18e699SShuo Chen << "] - SO_ERROR = " << err << " " << strerror_tl(err); 1922a18e699SShuo Chen} 193