TcpConnection.cc revision a1bde736
1a1bde736SShuo Chen// excerpts from http://code.google.com/p/muduo/ 2a1bde736SShuo Chen// 3a1bde736SShuo Chen// Use of this source code is governed by a BSD-style license 4a1bde736SShuo Chen// that can be found in the License file. 5a1bde736SShuo Chen// 6a1bde736SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7a1bde736SShuo Chen 8a1bde736SShuo Chen#include "TcpConnection.h" 9a1bde736SShuo Chen 10a1bde736SShuo Chen#include "logging/Logging.h" 11a1bde736SShuo Chen#include "Channel.h" 12a1bde736SShuo Chen#include "EventLoop.h" 13a1bde736SShuo Chen#include "Socket.h" 14a1bde736SShuo Chen#include "SocketsOps.h" 15a1bde736SShuo Chen 16a1bde736SShuo Chen#include <boost/bind.hpp> 17a1bde736SShuo Chen 18a1bde736SShuo Chen#include <errno.h> 19a1bde736SShuo Chen#include <stdio.h> 20a1bde736SShuo Chen 21a1bde736SShuo Chenusing namespace muduo; 22a1bde736SShuo Chen 23a1bde736SShuo ChenTcpConnection::TcpConnection(EventLoop* loop, 24a1bde736SShuo Chen const std::string& nameArg, 25a1bde736SShuo Chen int sockfd, 26a1bde736SShuo Chen const InetAddress& localAddr, 27a1bde736SShuo Chen const InetAddress& peerAddr) 28a1bde736SShuo Chen : loop_(CHECK_NOTNULL(loop)), 29a1bde736SShuo Chen name_(nameArg), 30a1bde736SShuo Chen state_(kConnecting), 31a1bde736SShuo Chen socket_(new Socket(sockfd)), 32a1bde736SShuo Chen channel_(new Channel(loop, sockfd)), 33a1bde736SShuo Chen localAddr_(localAddr), 34a1bde736SShuo Chen peerAddr_(peerAddr) 35a1bde736SShuo Chen{ 36a1bde736SShuo Chen LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this 37a1bde736SShuo Chen << " fd=" << sockfd; 38a1bde736SShuo Chen channel_->setReadCallback( 39a1bde736SShuo Chen boost::bind(&TcpConnection::handleRead, this, _1)); 40a1bde736SShuo Chen channel_->setWriteCallback( 41a1bde736SShuo Chen boost::bind(&TcpConnection::handleWrite, this)); 42a1bde736SShuo Chen channel_->setCloseCallback( 43a1bde736SShuo Chen boost::bind(&TcpConnection::handleClose, this)); 44a1bde736SShuo Chen channel_->setErrorCallback( 45a1bde736SShuo Chen boost::bind(&TcpConnection::handleError, this)); 46a1bde736SShuo Chen} 47a1bde736SShuo Chen 48a1bde736SShuo ChenTcpConnection::~TcpConnection() 49a1bde736SShuo Chen{ 50a1bde736SShuo Chen LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this 51a1bde736SShuo Chen << " fd=" << channel_->fd(); 52a1bde736SShuo Chen} 53a1bde736SShuo Chen 54a1bde736SShuo Chenvoid TcpConnection::send(const std::string& message) 55a1bde736SShuo Chen{ 56a1bde736SShuo Chen if (state_ == kConnected) { 57a1bde736SShuo Chen if (loop_->isInLoopThread()) { 58a1bde736SShuo Chen sendInLoop(message); 59a1bde736SShuo Chen } else { 60a1bde736SShuo Chen loop_->runInLoop( 61a1bde736SShuo Chen boost::bind(&TcpConnection::sendInLoop, this, message)); 62a1bde736SShuo Chen } 63a1bde736SShuo Chen } 64a1bde736SShuo Chen} 65a1bde736SShuo Chen 66a1bde736SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message) 67a1bde736SShuo Chen{ 68a1bde736SShuo Chen loop_->assertInLoopThread(); 69a1bde736SShuo Chen ssize_t nwrote = 0; 70a1bde736SShuo Chen // if no thing in output queue, try writing directly 71a1bde736SShuo Chen if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { 72a1bde736SShuo Chen nwrote = ::write(channel_->fd(), message.data(), message.size()); 73a1bde736SShuo Chen if (nwrote >= 0) { 74a1bde736SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 75a1bde736SShuo Chen LOG_TRACE << "I am going to write more data"; 76a1bde736SShuo Chen } else if (writeCompleteCallback_) { 77a1bde736SShuo Chen loop_->queueInLoop( 78a1bde736SShuo Chen boost::bind(writeCompleteCallback_, shared_from_this())); 79a1bde736SShuo Chen } 80a1bde736SShuo Chen } else { 81a1bde736SShuo Chen nwrote = 0; 82a1bde736SShuo Chen if (errno != EWOULDBLOCK) { 83a1bde736SShuo Chen LOG_SYSERR << "TcpConnection::sendInLoop"; 84a1bde736SShuo Chen } 85a1bde736SShuo Chen } 86a1bde736SShuo Chen } 87a1bde736SShuo Chen 88a1bde736SShuo Chen assert(nwrote >= 0); 89a1bde736SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 90a1bde736SShuo Chen outputBuffer_.append(message.data()+nwrote, message.size()-nwrote); 91a1bde736SShuo Chen if (!channel_->isWriting()) { 92a1bde736SShuo Chen channel_->enableWriting(); 93a1bde736SShuo Chen } 94a1bde736SShuo Chen } 95a1bde736SShuo Chen} 96a1bde736SShuo Chen 97a1bde736SShuo Chenvoid TcpConnection::shutdown() 98a1bde736SShuo Chen{ 99a1bde736SShuo Chen // FIXME: use compare and swap 100a1bde736SShuo Chen if (state_ == kConnected) 101a1bde736SShuo Chen { 102a1bde736SShuo Chen setState(kDisconnecting); 103a1bde736SShuo Chen // FIXME: shared_from_this()? 104a1bde736SShuo Chen loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); 105a1bde736SShuo Chen } 106a1bde736SShuo Chen} 107a1bde736SShuo Chen 108a1bde736SShuo Chenvoid TcpConnection::shutdownInLoop() 109a1bde736SShuo Chen{ 110a1bde736SShuo Chen loop_->assertInLoopThread(); 111a1bde736SShuo Chen if (!channel_->isWriting()) 112a1bde736SShuo Chen { 113a1bde736SShuo Chen // we are not writing 114a1bde736SShuo Chen socket_->shutdownWrite(); 115a1bde736SShuo Chen } 116a1bde736SShuo Chen} 117a1bde736SShuo Chen 118a1bde736SShuo Chenvoid TcpConnection::setTcpNoDelay(bool on) 119a1bde736SShuo Chen{ 120a1bde736SShuo Chen socket_->setTcpNoDelay(on); 121a1bde736SShuo Chen} 122a1bde736SShuo Chen 123a1bde736SShuo Chenvoid TcpConnection::connectEstablished() 124a1bde736SShuo Chen{ 125a1bde736SShuo Chen loop_->assertInLoopThread(); 126a1bde736SShuo Chen assert(state_ == kConnecting); 127a1bde736SShuo Chen setState(kConnected); 128a1bde736SShuo Chen channel_->enableReading(); 129a1bde736SShuo Chen connectionCallback_(shared_from_this()); 130a1bde736SShuo Chen} 131a1bde736SShuo Chen 132a1bde736SShuo Chenvoid TcpConnection::connectDestroyed() 133a1bde736SShuo Chen{ 134a1bde736SShuo Chen loop_->assertInLoopThread(); 135a1bde736SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 136a1bde736SShuo Chen setState(kDisconnected); 137a1bde736SShuo Chen channel_->disableAll(); 138a1bde736SShuo Chen connectionCallback_(shared_from_this()); 139a1bde736SShuo Chen 140a1bde736SShuo Chen loop_->removeChannel(get_pointer(channel_)); 141a1bde736SShuo Chen} 142a1bde736SShuo Chen 143a1bde736SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime) 144a1bde736SShuo Chen{ 145a1bde736SShuo Chen int savedErrno = 0; 146a1bde736SShuo Chen ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); 147a1bde736SShuo Chen if (n > 0) { 148a1bde736SShuo Chen messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); 149a1bde736SShuo Chen } else if (n == 0) { 150a1bde736SShuo Chen handleClose(); 151a1bde736SShuo Chen } else { 152a1bde736SShuo Chen errno = savedErrno; 153a1bde736SShuo Chen LOG_SYSERR << "TcpConnection::handleRead"; 154a1bde736SShuo Chen handleError(); 155a1bde736SShuo Chen } 156a1bde736SShuo Chen} 157a1bde736SShuo Chen 158a1bde736SShuo Chenvoid TcpConnection::handleWrite() 159a1bde736SShuo Chen{ 160a1bde736SShuo Chen loop_->assertInLoopThread(); 161a1bde736SShuo Chen if (channel_->isWriting()) { 162a1bde736SShuo Chen ssize_t n = ::write(channel_->fd(), 163a1bde736SShuo Chen outputBuffer_.peek(), 164a1bde736SShuo Chen outputBuffer_.readableBytes()); 165a1bde736SShuo Chen if (n > 0) { 166a1bde736SShuo Chen outputBuffer_.retrieve(n); 167a1bde736SShuo Chen if (outputBuffer_.readableBytes() == 0) { 168a1bde736SShuo Chen channel_->disableWriting(); 169a1bde736SShuo Chen if (writeCompleteCallback_) { 170a1bde736SShuo Chen loop_->queueInLoop( 171a1bde736SShuo Chen boost::bind(writeCompleteCallback_, shared_from_this())); 172a1bde736SShuo Chen } 173a1bde736SShuo Chen if (state_ == kDisconnecting) { 174a1bde736SShuo Chen shutdownInLoop(); 175a1bde736SShuo Chen } 176a1bde736SShuo Chen } else { 177a1bde736SShuo Chen LOG_TRACE << "I am going to write more data"; 178a1bde736SShuo Chen } 179a1bde736SShuo Chen } else { 180a1bde736SShuo Chen LOG_SYSERR << "TcpConnection::handleWrite"; 181a1bde736SShuo Chen } 182a1bde736SShuo Chen } else { 183a1bde736SShuo Chen LOG_TRACE << "Connection is down, no more writing"; 184a1bde736SShuo Chen } 185a1bde736SShuo Chen} 186a1bde736SShuo Chen 187a1bde736SShuo Chenvoid TcpConnection::handleClose() 188a1bde736SShuo Chen{ 189a1bde736SShuo Chen loop_->assertInLoopThread(); 190a1bde736SShuo Chen LOG_TRACE << "TcpConnection::handleClose state = " << state_; 191a1bde736SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 192a1bde736SShuo Chen // we don't close fd, leave it to dtor, so we can find leaks easily. 193a1bde736SShuo Chen channel_->disableAll(); 194a1bde736SShuo Chen // must be the last line 195a1bde736SShuo Chen closeCallback_(shared_from_this()); 196a1bde736SShuo Chen} 197a1bde736SShuo Chen 198a1bde736SShuo Chenvoid TcpConnection::handleError() 199a1bde736SShuo Chen{ 200a1bde736SShuo Chen int err = sockets::getSocketError(channel_->fd()); 201a1bde736SShuo Chen LOG_ERROR << "TcpConnection::handleError [" << name_ 202a1bde736SShuo Chen << "] - SO_ERROR = " << err << " " << strerror_tl(err); 203a1bde736SShuo Chen} 204