1354280cfSShuo Chen// excerpts from http://code.google.com/p/muduo/ 2354280cfSShuo Chen// 3354280cfSShuo Chen// Use of this source code is governed by a BSD-style license 4354280cfSShuo Chen// that can be found in the License file. 5354280cfSShuo Chen// 6354280cfSShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7354280cfSShuo Chen 8354280cfSShuo Chen#include "TcpConnection.h" 9354280cfSShuo Chen 10354280cfSShuo Chen#include "logging/Logging.h" 11354280cfSShuo Chen#include "Channel.h" 12354280cfSShuo Chen#include "EventLoop.h" 13354280cfSShuo Chen#include "Socket.h" 14354280cfSShuo Chen#include "SocketsOps.h" 15354280cfSShuo Chen 16354280cfSShuo Chen#include <boost/bind.hpp> 17354280cfSShuo Chen 18354280cfSShuo Chen#include <errno.h> 19354280cfSShuo Chen#include <stdio.h> 20354280cfSShuo Chen 21354280cfSShuo Chenusing namespace muduo; 22354280cfSShuo Chen 23354280cfSShuo ChenTcpConnection::TcpConnection(EventLoop* loop, 24354280cfSShuo Chen const std::string& nameArg, 25354280cfSShuo Chen int sockfd, 26354280cfSShuo Chen const InetAddress& localAddr, 27354280cfSShuo Chen const InetAddress& peerAddr) 28354280cfSShuo Chen : loop_(CHECK_NOTNULL(loop)), 29354280cfSShuo Chen name_(nameArg), 30354280cfSShuo Chen state_(kConnecting), 31354280cfSShuo Chen socket_(new Socket(sockfd)), 32354280cfSShuo Chen channel_(new Channel(loop, sockfd)), 33354280cfSShuo Chen localAddr_(localAddr), 34354280cfSShuo Chen peerAddr_(peerAddr) 35354280cfSShuo Chen{ 36354280cfSShuo Chen LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this 37354280cfSShuo Chen << " fd=" << sockfd; 38354280cfSShuo Chen channel_->setReadCallback( 39354280cfSShuo Chen boost::bind(&TcpConnection::handleRead, this, _1)); 40354280cfSShuo Chen channel_->setWriteCallback( 41354280cfSShuo Chen boost::bind(&TcpConnection::handleWrite, this)); 42354280cfSShuo Chen channel_->setCloseCallback( 43354280cfSShuo Chen boost::bind(&TcpConnection::handleClose, this)); 44354280cfSShuo Chen channel_->setErrorCallback( 45354280cfSShuo Chen boost::bind(&TcpConnection::handleError, this)); 46354280cfSShuo Chen} 47354280cfSShuo Chen 48354280cfSShuo ChenTcpConnection::~TcpConnection() 49354280cfSShuo Chen{ 50354280cfSShuo Chen LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this 51354280cfSShuo Chen << " fd=" << channel_->fd(); 52354280cfSShuo Chen} 53354280cfSShuo Chen 54354280cfSShuo Chenvoid TcpConnection::send(const std::string& message) 55354280cfSShuo Chen{ 56354280cfSShuo Chen if (state_ == kConnected) { 57354280cfSShuo Chen if (loop_->isInLoopThread()) { 58354280cfSShuo Chen sendInLoop(message); 59354280cfSShuo Chen } else { 60354280cfSShuo Chen loop_->runInLoop( 61354280cfSShuo Chen boost::bind(&TcpConnection::sendInLoop, this, message)); 62354280cfSShuo Chen } 63354280cfSShuo Chen } 64354280cfSShuo Chen} 65354280cfSShuo Chen 66354280cfSShuo Chenvoid TcpConnection::sendInLoop(const std::string& message) 67354280cfSShuo Chen{ 68354280cfSShuo Chen loop_->assertInLoopThread(); 69354280cfSShuo Chen ssize_t nwrote = 0; 70354280cfSShuo Chen // if no thing in output queue, try writing directly 71354280cfSShuo Chen if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { 72354280cfSShuo Chen nwrote = ::write(channel_->fd(), message.data(), message.size()); 73354280cfSShuo Chen if (nwrote >= 0) { 74354280cfSShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 75354280cfSShuo Chen LOG_TRACE << "I am going to write more data"; 76354280cfSShuo Chen } else if (writeCompleteCallback_) { 77354280cfSShuo Chen loop_->queueInLoop( 78354280cfSShuo Chen boost::bind(writeCompleteCallback_, shared_from_this())); 79354280cfSShuo Chen } 80354280cfSShuo Chen } else { 81354280cfSShuo Chen nwrote = 0; 82354280cfSShuo Chen if (errno != EWOULDBLOCK) { 83354280cfSShuo Chen LOG_SYSERR << "TcpConnection::sendInLoop"; 84354280cfSShuo Chen } 85354280cfSShuo Chen } 86354280cfSShuo Chen } 87354280cfSShuo Chen 88354280cfSShuo Chen assert(nwrote >= 0); 89354280cfSShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 90354280cfSShuo Chen outputBuffer_.append(message.data()+nwrote, message.size()-nwrote); 91354280cfSShuo Chen if (!channel_->isWriting()) { 92354280cfSShuo Chen channel_->enableWriting(); 93354280cfSShuo Chen } 94354280cfSShuo Chen } 95354280cfSShuo Chen} 96354280cfSShuo Chen 97354280cfSShuo Chenvoid TcpConnection::shutdown() 98354280cfSShuo Chen{ 99354280cfSShuo Chen // FIXME: use compare and swap 100354280cfSShuo Chen if (state_ == kConnected) 101354280cfSShuo Chen { 102354280cfSShuo Chen setState(kDisconnecting); 103354280cfSShuo Chen // FIXME: shared_from_this()? 104354280cfSShuo Chen loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); 105354280cfSShuo Chen } 106354280cfSShuo Chen} 107354280cfSShuo Chen 108354280cfSShuo Chenvoid TcpConnection::shutdownInLoop() 109354280cfSShuo Chen{ 110354280cfSShuo Chen loop_->assertInLoopThread(); 111354280cfSShuo Chen if (!channel_->isWriting()) 112354280cfSShuo Chen { 113354280cfSShuo Chen // we are not writing 114354280cfSShuo Chen socket_->shutdownWrite(); 115354280cfSShuo Chen } 116354280cfSShuo Chen} 117354280cfSShuo Chen 118354280cfSShuo Chenvoid TcpConnection::setTcpNoDelay(bool on) 119354280cfSShuo Chen{ 120354280cfSShuo Chen socket_->setTcpNoDelay(on); 121354280cfSShuo Chen} 122354280cfSShuo Chen 123354280cfSShuo Chenvoid TcpConnection::connectEstablished() 124354280cfSShuo Chen{ 125354280cfSShuo Chen loop_->assertInLoopThread(); 126354280cfSShuo Chen assert(state_ == kConnecting); 127354280cfSShuo Chen setState(kConnected); 128354280cfSShuo Chen channel_->enableReading(); 129354280cfSShuo Chen connectionCallback_(shared_from_this()); 130354280cfSShuo Chen} 131354280cfSShuo Chen 132354280cfSShuo Chenvoid TcpConnection::connectDestroyed() 133354280cfSShuo Chen{ 134354280cfSShuo Chen loop_->assertInLoopThread(); 135354280cfSShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 136354280cfSShuo Chen setState(kDisconnected); 137354280cfSShuo Chen channel_->disableAll(); 138354280cfSShuo Chen connectionCallback_(shared_from_this()); 139354280cfSShuo Chen 140354280cfSShuo Chen loop_->removeChannel(get_pointer(channel_)); 141354280cfSShuo Chen} 142354280cfSShuo Chen 143354280cfSShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime) 144354280cfSShuo Chen{ 145354280cfSShuo Chen int savedErrno = 0; 146354280cfSShuo Chen ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); 147354280cfSShuo Chen if (n > 0) { 148354280cfSShuo Chen messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); 149354280cfSShuo Chen } else if (n == 0) { 150354280cfSShuo Chen handleClose(); 151354280cfSShuo Chen } else { 152354280cfSShuo Chen errno = savedErrno; 153354280cfSShuo Chen LOG_SYSERR << "TcpConnection::handleRead"; 154354280cfSShuo Chen handleError(); 155354280cfSShuo Chen } 156354280cfSShuo Chen} 157354280cfSShuo Chen 158354280cfSShuo Chenvoid TcpConnection::handleWrite() 159354280cfSShuo Chen{ 160354280cfSShuo Chen loop_->assertInLoopThread(); 161354280cfSShuo Chen if (channel_->isWriting()) { 162354280cfSShuo Chen ssize_t n = ::write(channel_->fd(), 163354280cfSShuo Chen outputBuffer_.peek(), 164354280cfSShuo Chen outputBuffer_.readableBytes()); 165354280cfSShuo Chen if (n > 0) { 166354280cfSShuo Chen outputBuffer_.retrieve(n); 167354280cfSShuo Chen if (outputBuffer_.readableBytes() == 0) { 168354280cfSShuo Chen channel_->disableWriting(); 169354280cfSShuo Chen if (writeCompleteCallback_) { 170354280cfSShuo Chen loop_->queueInLoop( 171354280cfSShuo Chen boost::bind(writeCompleteCallback_, shared_from_this())); 172354280cfSShuo Chen } 173354280cfSShuo Chen if (state_ == kDisconnecting) { 174354280cfSShuo Chen shutdownInLoop(); 175354280cfSShuo Chen } 176354280cfSShuo Chen } else { 177354280cfSShuo Chen LOG_TRACE << "I am going to write more data"; 178354280cfSShuo Chen } 179354280cfSShuo Chen } else { 180354280cfSShuo Chen LOG_SYSERR << "TcpConnection::handleWrite"; 181354280cfSShuo Chen } 182354280cfSShuo Chen } else { 183354280cfSShuo Chen LOG_TRACE << "Connection is down, no more writing"; 184354280cfSShuo Chen } 185354280cfSShuo Chen} 186354280cfSShuo Chen 187354280cfSShuo Chenvoid TcpConnection::handleClose() 188354280cfSShuo Chen{ 189354280cfSShuo Chen loop_->assertInLoopThread(); 190354280cfSShuo Chen LOG_TRACE << "TcpConnection::handleClose state = " << state_; 191354280cfSShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 192354280cfSShuo Chen // we don't close fd, leave it to dtor, so we can find leaks easily. 193354280cfSShuo Chen channel_->disableAll(); 194354280cfSShuo Chen // must be the last line 195354280cfSShuo Chen closeCallback_(shared_from_this()); 196354280cfSShuo Chen} 197354280cfSShuo Chen 198354280cfSShuo Chenvoid TcpConnection::handleError() 199354280cfSShuo Chen{ 200354280cfSShuo Chen int err = sockets::getSocketError(channel_->fd()); 201354280cfSShuo Chen LOG_ERROR << "TcpConnection::handleError [" << name_ 202354280cfSShuo Chen << "] - SO_ERROR = " << err << " " << strerror_tl(err); 203354280cfSShuo Chen} 204