TcpConnection.cc revision b37003a7
1b37003a7SShuo Chen// excerpts from http://code.google.com/p/muduo/ 2b37003a7SShuo Chen// 3b37003a7SShuo Chen// Use of this source code is governed by a BSD-style license 4b37003a7SShuo Chen// that can be found in the License file. 5b37003a7SShuo Chen// 6b37003a7SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7b37003a7SShuo Chen 8b37003a7SShuo Chen#include "TcpConnection.h" 9b37003a7SShuo Chen 10b37003a7SShuo Chen#include "logging/Logging.h" 11b37003a7SShuo Chen#include "Channel.h" 12b37003a7SShuo Chen#include "EventLoop.h" 13b37003a7SShuo Chen#include "Socket.h" 14b37003a7SShuo Chen#include "SocketsOps.h" 15b37003a7SShuo Chen 16b37003a7SShuo Chen#include <boost/bind.hpp> 17b37003a7SShuo Chen 18b37003a7SShuo Chen#include <errno.h> 19b37003a7SShuo Chen#include <stdio.h> 20b37003a7SShuo Chen 21b37003a7SShuo Chenusing namespace muduo; 22b37003a7SShuo Chen 23b37003a7SShuo ChenTcpConnection::TcpConnection(EventLoop* loop, 24b37003a7SShuo Chen const std::string& nameArg, 25b37003a7SShuo Chen int sockfd, 26b37003a7SShuo Chen const InetAddress& localAddr, 27b37003a7SShuo Chen const InetAddress& peerAddr) 28b37003a7SShuo Chen : loop_(CHECK_NOTNULL(loop)), 29b37003a7SShuo Chen name_(nameArg), 30b37003a7SShuo Chen state_(kConnecting), 31b37003a7SShuo Chen socket_(new Socket(sockfd)), 32b37003a7SShuo Chen channel_(new Channel(loop, sockfd)), 33b37003a7SShuo Chen localAddr_(localAddr), 34b37003a7SShuo Chen peerAddr_(peerAddr) 35b37003a7SShuo Chen{ 36b37003a7SShuo Chen LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this 37b37003a7SShuo Chen << " fd=" << sockfd; 38b37003a7SShuo Chen channel_->setReadCallback( 39b37003a7SShuo Chen boost::bind(&TcpConnection::handleRead, this, _1)); 40b37003a7SShuo Chen channel_->setWriteCallback( 41b37003a7SShuo Chen boost::bind(&TcpConnection::handleWrite, this)); 42b37003a7SShuo Chen channel_->setCloseCallback( 43b37003a7SShuo Chen boost::bind(&TcpConnection::handleClose, this)); 44b37003a7SShuo Chen channel_->setErrorCallback( 45b37003a7SShuo Chen boost::bind(&TcpConnection::handleError, this)); 46b37003a7SShuo Chen} 47b37003a7SShuo Chen 48b37003a7SShuo ChenTcpConnection::~TcpConnection() 49b37003a7SShuo Chen{ 50b37003a7SShuo Chen LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this 51b37003a7SShuo Chen << " fd=" << channel_->fd(); 52b37003a7SShuo Chen} 53b37003a7SShuo Chen 54b37003a7SShuo Chenvoid TcpConnection::send(const std::string& message) 55b37003a7SShuo Chen{ 56b37003a7SShuo Chen if (state_ == kConnected) { 57b37003a7SShuo Chen if (loop_->isInLoopThread()) { 58b37003a7SShuo Chen sendInLoop(message); 59b37003a7SShuo Chen } else { 60b37003a7SShuo Chen loop_->runInLoop( 61b37003a7SShuo Chen boost::bind(&TcpConnection::sendInLoop, this, message)); 62b37003a7SShuo Chen } 63b37003a7SShuo Chen } 64b37003a7SShuo Chen} 65b37003a7SShuo Chen 66b37003a7SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message) 67b37003a7SShuo Chen{ 68b37003a7SShuo Chen loop_->assertInLoopThread(); 69b37003a7SShuo Chen ssize_t nwrote = 0; 70b37003a7SShuo Chen // if no thing in output queue, try writing directly 71b37003a7SShuo Chen if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { 72b37003a7SShuo Chen nwrote = ::write(channel_->fd(), message.data(), message.size()); 73b37003a7SShuo Chen if (nwrote >= 0) { 74b37003a7SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 75b37003a7SShuo Chen LOG_TRACE << "I am going to write more data"; 76b37003a7SShuo Chen } 77b37003a7SShuo Chen } else { 78b37003a7SShuo Chen nwrote = 0; 79b37003a7SShuo Chen if (errno != EWOULDBLOCK) { 80b37003a7SShuo Chen LOG_SYSERR << "TcpConnection::sendInLoop"; 81b37003a7SShuo Chen } 82b37003a7SShuo Chen } 83b37003a7SShuo Chen } 84b37003a7SShuo Chen 85b37003a7SShuo Chen assert(nwrote >= 0); 86b37003a7SShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 87b37003a7SShuo Chen outputBuffer_.append(message.data()+nwrote, message.size()-nwrote); 88b37003a7SShuo Chen if (!channel_->isWriting()) { 89b37003a7SShuo Chen channel_->enableWriting(); 90b37003a7SShuo Chen } 91b37003a7SShuo Chen } 92b37003a7SShuo Chen} 93b37003a7SShuo Chen 94b37003a7SShuo Chenvoid TcpConnection::shutdown() 95b37003a7SShuo Chen{ 96b37003a7SShuo Chen // FIXME: use compare and swap 97b37003a7SShuo Chen if (state_ == kConnected) 98b37003a7SShuo Chen { 99b37003a7SShuo Chen setState(kDisconnecting); 100b37003a7SShuo Chen // FIXME: shared_from_this()? 101b37003a7SShuo Chen loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); 102b37003a7SShuo Chen } 103b37003a7SShuo Chen} 104b37003a7SShuo Chen 105b37003a7SShuo Chenvoid TcpConnection::shutdownInLoop() 106b37003a7SShuo Chen{ 107b37003a7SShuo Chen loop_->assertInLoopThread(); 108b37003a7SShuo Chen if (!channel_->isWriting()) 109b37003a7SShuo Chen { 110b37003a7SShuo Chen // we are not writing 111b37003a7SShuo Chen socket_->shutdownWrite(); 112b37003a7SShuo Chen } 113b37003a7SShuo Chen} 114b37003a7SShuo Chen 115b37003a7SShuo Chenvoid TcpConnection::connectEstablished() 116b37003a7SShuo Chen{ 117b37003a7SShuo Chen loop_->assertInLoopThread(); 118b37003a7SShuo Chen assert(state_ == kConnecting); 119b37003a7SShuo Chen setState(kConnected); 120b37003a7SShuo Chen channel_->enableReading(); 121b37003a7SShuo Chen connectionCallback_(shared_from_this()); 122b37003a7SShuo Chen} 123b37003a7SShuo Chen 124b37003a7SShuo Chenvoid TcpConnection::connectDestroyed() 125b37003a7SShuo Chen{ 126b37003a7SShuo Chen loop_->assertInLoopThread(); 127b37003a7SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 128b37003a7SShuo Chen setState(kDisconnected); 129b37003a7SShuo Chen channel_->disableAll(); 130b37003a7SShuo Chen connectionCallback_(shared_from_this()); 131b37003a7SShuo Chen 132b37003a7SShuo Chen loop_->removeChannel(get_pointer(channel_)); 133b37003a7SShuo Chen} 134b37003a7SShuo Chen 135b37003a7SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime) 136b37003a7SShuo Chen{ 137b37003a7SShuo Chen int savedErrno = 0; 138b37003a7SShuo Chen ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); 139b37003a7SShuo Chen if (n > 0) { 140b37003a7SShuo Chen messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); 141b37003a7SShuo Chen } else if (n == 0) { 142b37003a7SShuo Chen handleClose(); 143b37003a7SShuo Chen } else { 144b37003a7SShuo Chen // FIXME: check savedErrno 145b37003a7SShuo Chen handleError(); 146b37003a7SShuo Chen } 147b37003a7SShuo Chen} 148b37003a7SShuo Chen 149b37003a7SShuo Chenvoid TcpConnection::handleWrite() 150b37003a7SShuo Chen{ 151b37003a7SShuo Chen loop_->assertInLoopThread(); 152b37003a7SShuo Chen if (channel_->isWriting()) { 153b37003a7SShuo Chen ssize_t n = ::write(channel_->fd(), 154b37003a7SShuo Chen outputBuffer_.peek(), 155b37003a7SShuo Chen outputBuffer_.readableBytes()); 156b37003a7SShuo Chen if (n > 0) { 157b37003a7SShuo Chen outputBuffer_.retrieve(n); 158b37003a7SShuo Chen if (outputBuffer_.readableBytes() == 0) { 159b37003a7SShuo Chen channel_->disableWriting(); 160b37003a7SShuo Chen if (state_ == kDisconnecting) { 161b37003a7SShuo Chen shutdownInLoop(); 162b37003a7SShuo Chen } 163b37003a7SShuo Chen } else { 164b37003a7SShuo Chen LOG_TRACE << "I am going to write more data"; 165b37003a7SShuo Chen } 166b37003a7SShuo Chen } else { 167b37003a7SShuo Chen LOG_SYSERR << "TcpConnection::handleWrite"; 168b37003a7SShuo Chen abort(); // FIXME 169b37003a7SShuo Chen } 170b37003a7SShuo Chen } else { 171b37003a7SShuo Chen LOG_TRACE << "Connection is down, no more writing"; 172b37003a7SShuo Chen } 173b37003a7SShuo Chen} 174b37003a7SShuo Chen 175b37003a7SShuo Chenvoid TcpConnection::handleClose() 176b37003a7SShuo Chen{ 177b37003a7SShuo Chen loop_->assertInLoopThread(); 178b37003a7SShuo Chen LOG_TRACE << "TcpConnection::handleClose state = " << state_; 179b37003a7SShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 180b37003a7SShuo Chen // we don't close fd, leave it to dtor, so we can find leaks easily. 181b37003a7SShuo Chen channel_->disableAll(); 182b37003a7SShuo Chen // must be the last line 183b37003a7SShuo Chen closeCallback_(shared_from_this()); 184b37003a7SShuo Chen} 185b37003a7SShuo Chen 186b37003a7SShuo Chenvoid TcpConnection::handleError() 187b37003a7SShuo Chen{ 188b37003a7SShuo Chen int err = sockets::getSocketError(channel_->fd()); 189b37003a7SShuo Chen LOG_ERROR << "TcpConnection::handleError [" << name_ 190b37003a7SShuo Chen << "] - SO_ERROR = " << err << " " << strerror_tl(err); 191b37003a7SShuo Chen} 192