1b37003a7SShuo Chen// excerpts from http://code.google.com/p/muduo/ 2b37003a7SShuo Chen// 3b37003a7SShuo Chen// Use of this source code is governed by a BSD-style license 4b37003a7SShuo Chen// that can be found in the License file. 5b37003a7SShuo Chen// 6b37003a7SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7b37003a7SShuo Chen 8b37003a7SShuo Chen#include "TcpConnection.h" 9b37003a7SShuo Chen 10b37003a7SShuo Chen#include "logging/Logging.h" 11b37003a7SShuo Chen#include "Channel.h" 12b37003a7SShuo Chen#include "EventLoop.h" 13b37003a7SShuo Chen#include "Socket.h" 14b37003a7SShuo Chen#include "SocketsOps.h" 15b37003a7SShuo Chen 16b37003a7SShuo Chen#include <boost/bind.hpp> 17b37003a7SShuo Chen 18b37003a7SShuo Chen#include <errno.h> 19b37003a7SShuo Chen#include <stdio.h> 20b37003a7SShuo Chen 21b37003a7SShuo Chenusing namespace muduo; 22b37003a7SShuo Chen 23b37003a7SShuo ChenTcpConnection::TcpConnection(EventLoop* loop, 24b37003a7SShuo Chen const std::string& nameArg, 25b37003a7SShuo Chen int sockfd, 26b37003a7SShuo Chen const InetAddress& localAddr, 27b37003a7SShuo Chen const InetAddress& peerAddr) 28b37003a7SShuo Chen : loop_(CHECK_NOTNULL(loop)), 29b37003a7SShuo Chen name_(nameArg), 30b37003a7SShuo Chen state_(kConnecting), 31b37003a7SShuo Chen socket_(new Socket(sockfd)), 32b37003a7SShuo Chen channel_(new Channel(loop, sockfd)), 33b37003a7SShuo Chen localAddr_(localAddr), 34b37003a7SShuo Chen peerAddr_(peerAddr) 35b37003a7SShuo Chen{ 36b37003a7SShuo Chen LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this 37b37003a7SShuo Chen << " fd=" << sockfd; 38b37003a7SShuo Chen channel_->setReadCallback( 39b37003a7SShuo Chen boost::bind(&TcpConnection::handleRead, this, _1)); 40b37003a7SShuo Chen channel_->setWriteCallback( 41b37003a7SShuo Chen boost::bind(&TcpConnection::handleWrite, this)); 42b37003a7SShuo Chen channel_->setCloseCallback( 43b37003a7SShuo Chen boost::bind(&TcpConnection::handleClose, this)); 44b37003a7SShuo Chen channel_->setErrorCallback( 45b37003a7SShuo Chen boost::bind(&TcpConnection::handleError, this)); 46b37003a7SShuo Chen} 47b37003a7SShuo Chen 48b37003a7SShuo ChenTcpConnection::~TcpConnection() 49b37003a7SShuo Chen{ 50b37003a7SShuo Chen LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this 51b37003a7SShuo Chen << " fd=" << channel_->fd(); 52b37003a7SShuo Chen} 53b37003a7SShuo Chen 54b37003a7SShuo Chenvoid TcpConnection::send(const std::string& message) 55b37003a7SShuo Chen{ 56b37003a7SShuo Chen if (state_ == kConnected) { 57b37003a7SShuo Chen if (loop_->isInLoopThread()) { 58b37003a7SShuo Chen sendInLoop(message); 59b37003a7SShuo Chen } else { 60b37003a7SShuo Chen loop_->runInLoop( 61b37003a7SShuo Chen boost::bind(&TcpConnection::sendInLoop, this, message)); 62b37003a7SShuo Chen } 63b37003a7SShuo Chen } 64b37003a7SShuo Chen} 65b37003a7SShuo Chen 66b37003a7SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message) 67b37003a7SShuo Chen{ 68b37003a7SShuo Chen loop_->assertInLoopThread(); 69b37003a7SShuo Chen ssize_t nwrote = 0; 70b37003a7SShuo Chen // if no thing in output queue, try writing directly 71b37003a7SShuo Chen if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { 72b37003a7SShuo Chen nwrote = ::write(channel_->fd(), message.data(), message.size()); 73b37003a7SShuo Chen if (nwrote >= 0) { 74b37003a7SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 75b37003a7SShuo Chen LOG_TRACE << "I am going to write more data"; 7613e937ffSShuo Chen } else if (writeCompleteCallback_) { 7713e937ffSShuo Chen loop_->queueInLoop( 7813e937ffSShuo Chen boost::bind(writeCompleteCallback_, shared_from_this())); 79b37003a7SShuo Chen } 80b37003a7SShuo Chen } else { 81b37003a7SShuo Chen nwrote = 0; 82b37003a7SShuo Chen if (errno != EWOULDBLOCK) { 83b37003a7SShuo Chen LOG_SYSERR << "TcpConnection::sendInLoop"; 84b37003a7SShuo Chen } 85b37003a7SShuo Chen } 86b37003a7SShuo Chen } 87b37003a7SShuo Chen 88b37003a7SShuo Chen assert(nwrote >= 0); 89b37003a7SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 90b37003a7SShuo Chen outputBuffer_.append(message.data()+nwrote, message.size()-nwrote); 91b37003a7SShuo Chen if (!channel_->isWriting()) { 92b37003a7SShuo Chen channel_->enableWriting(); 93b37003a7SShuo Chen } 94b37003a7SShuo Chen } 95b37003a7SShuo Chen} 96b37003a7SShuo Chen 97b37003a7SShuo Chenvoid TcpConnection::shutdown() 98b37003a7SShuo Chen{ 99b37003a7SShuo Chen // FIXME: use compare and swap 100b37003a7SShuo Chen if (state_ == kConnected) 101b37003a7SShuo Chen { 102b37003a7SShuo Chen setState(kDisconnecting); 103b37003a7SShuo Chen // FIXME: shared_from_this()? 104b37003a7SShuo Chen loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); 105b37003a7SShuo Chen } 106b37003a7SShuo Chen} 107b37003a7SShuo Chen 108b37003a7SShuo Chenvoid TcpConnection::shutdownInLoop() 109b37003a7SShuo Chen{ 110b37003a7SShuo Chen loop_->assertInLoopThread(); 111b37003a7SShuo Chen if (!channel_->isWriting()) 112b37003a7SShuo Chen { 113b37003a7SShuo Chen // we are not writing 114b37003a7SShuo Chen socket_->shutdownWrite(); 115b37003a7SShuo Chen } 116b37003a7SShuo Chen} 117b37003a7SShuo Chen 11813e937ffSShuo Chenvoid TcpConnection::setTcpNoDelay(bool on) 11913e937ffSShuo Chen{ 12013e937ffSShuo Chen socket_->setTcpNoDelay(on); 12113e937ffSShuo Chen} 12213e937ffSShuo Chen 123b37003a7SShuo Chenvoid TcpConnection::connectEstablished() 124b37003a7SShuo Chen{ 125b37003a7SShuo Chen loop_->assertInLoopThread(); 126b37003a7SShuo Chen assert(state_ == kConnecting); 127b37003a7SShuo Chen setState(kConnected); 128b37003a7SShuo Chen channel_->enableReading(); 129b37003a7SShuo Chen connectionCallback_(shared_from_this()); 130b37003a7SShuo Chen} 131b37003a7SShuo Chen 132b37003a7SShuo Chenvoid TcpConnection::connectDestroyed() 133b37003a7SShuo Chen{ 134b37003a7SShuo Chen loop_->assertInLoopThread(); 135b37003a7SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 136b37003a7SShuo Chen setState(kDisconnected); 137b37003a7SShuo Chen channel_->disableAll(); 138b37003a7SShuo Chen connectionCallback_(shared_from_this()); 139b37003a7SShuo Chen 140b37003a7SShuo Chen loop_->removeChannel(get_pointer(channel_)); 141b37003a7SShuo Chen} 142b37003a7SShuo Chen 143b37003a7SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime) 144b37003a7SShuo Chen{ 145b37003a7SShuo Chen int savedErrno = 0; 146b37003a7SShuo Chen ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); 147b37003a7SShuo Chen if (n > 0) { 148b37003a7SShuo Chen messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); 149b37003a7SShuo Chen } else if (n == 0) { 150b37003a7SShuo Chen handleClose(); 151b37003a7SShuo Chen } else { 1520dd528a5SShuo Chen errno = savedErrno; 1530dd528a5SShuo Chen LOG_SYSERR << "TcpConnection::handleRead"; 154b37003a7SShuo Chen handleError(); 155b37003a7SShuo Chen } 156b37003a7SShuo Chen} 157b37003a7SShuo Chen 158b37003a7SShuo Chenvoid TcpConnection::handleWrite() 159b37003a7SShuo Chen{ 160b37003a7SShuo Chen loop_->assertInLoopThread(); 161b37003a7SShuo Chen if (channel_->isWriting()) { 162b37003a7SShuo Chen ssize_t n = ::write(channel_->fd(), 163b37003a7SShuo Chen outputBuffer_.peek(), 164b37003a7SShuo Chen outputBuffer_.readableBytes()); 165b37003a7SShuo Chen if (n > 0) { 166b37003a7SShuo Chen outputBuffer_.retrieve(n); 167b37003a7SShuo Chen if (outputBuffer_.readableBytes() == 0) { 168b37003a7SShuo Chen channel_->disableWriting(); 16913e937ffSShuo Chen if (writeCompleteCallback_) { 17013e937ffSShuo Chen loop_->queueInLoop( 17113e937ffSShuo Chen boost::bind(writeCompleteCallback_, shared_from_this())); 17213e937ffSShuo Chen } 173b37003a7SShuo Chen if (state_ == kDisconnecting) { 174b37003a7SShuo Chen shutdownInLoop(); 175b37003a7SShuo Chen } 176b37003a7SShuo Chen } else { 177b37003a7SShuo Chen LOG_TRACE << "I am going to write more data"; 178b37003a7SShuo Chen } 179b37003a7SShuo Chen } else { 180b37003a7SShuo Chen LOG_SYSERR << "TcpConnection::handleWrite"; 181b37003a7SShuo Chen } 182b37003a7SShuo Chen } else { 183b37003a7SShuo Chen LOG_TRACE << "Connection is down, no more writing"; 184b37003a7SShuo Chen } 185b37003a7SShuo Chen} 186b37003a7SShuo Chen 187b37003a7SShuo Chenvoid TcpConnection::handleClose() 188b37003a7SShuo Chen{ 189b37003a7SShuo Chen loop_->assertInLoopThread(); 190b37003a7SShuo Chen LOG_TRACE << "TcpConnection::handleClose state = " << state_; 191b37003a7SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 192b37003a7SShuo Chen // we don't close fd, leave it to dtor, so we can find leaks easily. 193b37003a7SShuo Chen channel_->disableAll(); 194b37003a7SShuo Chen // must be the last line 195b37003a7SShuo Chen closeCallback_(shared_from_this()); 196b37003a7SShuo Chen} 197b37003a7SShuo Chen 198b37003a7SShuo Chenvoid TcpConnection::handleError() 199b37003a7SShuo Chen{ 200b37003a7SShuo Chen int err = sockets::getSocketError(channel_->fd()); 201b37003a7SShuo Chen LOG_ERROR << "TcpConnection::handleError [" << name_ 202b37003a7SShuo Chen << "] - SO_ERROR = " << err << " " << strerror_tl(err); 203b37003a7SShuo Chen} 204