1// excerpts from http://code.google.com/p/muduo/ 2// 3// Use of this source code is governed by a BSD-style license 4// that can be found in the License file. 5// 6// Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8#include "TcpConnection.h" 9 10#include "logging/Logging.h" 11#include "Channel.h" 12#include "EventLoop.h" 13#include "Socket.h" 14#include "SocketsOps.h" 15 16#include <boost/bind.hpp> 17 18#include <errno.h> 19#include <stdio.h> 20 21using namespace muduo; 22 23TcpConnection::TcpConnection(EventLoop* loop, 24 const std::string& nameArg, 25 int sockfd, 26 const InetAddress& localAddr, 27 const InetAddress& peerAddr) 28 : loop_(CHECK_NOTNULL(loop)), 29 name_(nameArg), 30 state_(kConnecting), 31 socket_(new Socket(sockfd)), 32 channel_(new Channel(loop, sockfd)), 33 localAddr_(localAddr), 34 peerAddr_(peerAddr) 35{ 36 LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this 37 << " fd=" << sockfd; 38 channel_->setReadCallback( 39 boost::bind(&TcpConnection::handleRead, this, _1)); 40 channel_->setWriteCallback( 41 boost::bind(&TcpConnection::handleWrite, this)); 42 channel_->setCloseCallback( 43 boost::bind(&TcpConnection::handleClose, this)); 44 channel_->setErrorCallback( 45 boost::bind(&TcpConnection::handleError, this)); 46} 47 48TcpConnection::~TcpConnection() 49{ 50 LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this 51 << " fd=" << channel_->fd(); 52} 53 54void TcpConnection::send(const std::string& message) 55{ 56 if (state_ == kConnected) { 57 if (loop_->isInLoopThread()) { 58 sendInLoop(message); 59 } else { 60 loop_->runInLoop( 61 boost::bind(&TcpConnection::sendInLoop, this, message)); 62 } 63 } 64} 65 66void TcpConnection::sendInLoop(const std::string& message) 67{ 68 loop_->assertInLoopThread(); 69 ssize_t nwrote = 0; 70 // if no thing in output queue, try writing directly 71 if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { 72 nwrote = ::write(channel_->fd(), message.data(), message.size()); 73 if (nwrote >= 0) { 74 if (implicit_cast<size_t>(nwrote) < message.size()) { 75 LOG_TRACE << "I am going to write more data"; 76 } 77 } else { 78 nwrote = 0; 79 if (errno != EWOULDBLOCK) { 80 LOG_SYSERR << "TcpConnection::sendInLoop"; 81 } 82 } 83 } 84 85 assert(nwrote >= 0); 86 if (implicit_cast<size_t>(nwrote) < message.size()) { 87 outputBuffer_.append(message.data()+nwrote, message.size()-nwrote); 88 if (!channel_->isWriting()) { 89 channel_->enableWriting(); 90 } 91 } 92} 93 94void TcpConnection::shutdown() 95{ 96 // FIXME: use compare and swap 97 if (state_ == kConnected) 98 { 99 setState(kDisconnecting); 100 // FIXME: shared_from_this()? 101 loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); 102 } 103} 104 105void TcpConnection::shutdownInLoop() 106{ 107 loop_->assertInLoopThread(); 108 if (!channel_->isWriting()) 109 { 110 // we are not writing 111 socket_->shutdownWrite(); 112 } 113} 114 115void TcpConnection::connectEstablished() 116{ 117 loop_->assertInLoopThread(); 118 assert(state_ == kConnecting); 119 setState(kConnected); 120 channel_->enableReading(); 121 connectionCallback_(shared_from_this()); 122} 123 124void TcpConnection::connectDestroyed() 125{ 126 loop_->assertInLoopThread(); 127 assert(state_ == kConnected || state_ == kDisconnecting); 128 setState(kDisconnected); 129 channel_->disableAll(); 130 connectionCallback_(shared_from_this()); 131 132 loop_->removeChannel(get_pointer(channel_)); 133} 134 135void TcpConnection::handleRead(Timestamp receiveTime) 136{ 137 int savedErrno = 0; 138 ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); 139 if (n > 0) { 140 messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); 141 } else if (n == 0) { 142 handleClose(); 143 } else { 144 errno = savedErrno; 145 LOG_SYSERR << "TcpConnection::handleRead"; 146 handleError(); 147 } 148} 149 150void TcpConnection::handleWrite() 151{ 152 loop_->assertInLoopThread(); 153 if (channel_->isWriting()) { 154 ssize_t n = ::write(channel_->fd(), 155 outputBuffer_.peek(), 156 outputBuffer_.readableBytes()); 157 if (n > 0) { 158 outputBuffer_.retrieve(n); 159 if (outputBuffer_.readableBytes() == 0) { 160 channel_->disableWriting(); 161 if (state_ == kDisconnecting) { 162 shutdownInLoop(); 163 } 164 } else { 165 LOG_TRACE << "I am going to write more data"; 166 } 167 } else { 168 LOG_SYSERR << "TcpConnection::handleWrite"; 169 } 170 } else { 171 LOG_TRACE << "Connection is down, no more writing"; 172 } 173} 174 175void TcpConnection::handleClose() 176{ 177 loop_->assertInLoopThread(); 178 LOG_TRACE << "TcpConnection::handleClose state = " << state_; 179 assert(state_ == kConnected || state_ == kDisconnecting); 180 // we don't close fd, leave it to dtor, so we can find leaks easily. 181 channel_->disableAll(); 182 // must be the last line 183 closeCallback_(shared_from_this()); 184} 185 186void TcpConnection::handleError() 187{ 188 int err = sockets::getSocketError(channel_->fd()); 189 LOG_ERROR << "TcpConnection::handleError [" << name_ 190 << "] - SO_ERROR = " << err << " " << strerror_tl(err); 191} 192