1 // excerpts from http://code.google.com/p/muduo/ 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the License file. 5 // 6 // Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8 #include "TcpConnection.h" 9 10 #include "logging/Logging.h" 11 #include "Channel.h" 12 #include "EventLoop.h" 13 #include "Socket.h" 14 #include "SocketsOps.h" 15 16 #include <boost/bind.hpp> 17 18 #include <errno.h> 19 #include <stdio.h> 20 21 using namespace muduo; 22 23 TcpConnection::TcpConnection(EventLoop* loop, 24 const std::string& nameArg, 25 int sockfd, 26 const InetAddress& localAddr, 27 const InetAddress& peerAddr) 28 : loop_(CHECK_NOTNULL(loop)), 29 name_(nameArg), 30 state_(kConnecting), 31 socket_(new Socket(sockfd)), 32 channel_(new Channel(loop, sockfd)), 33 localAddr_(localAddr), 34 peerAddr_(peerAddr) 35 { 36 LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this 37 << " fd=" << sockfd; 38 channel_->setReadCallback( 39 boost::bind(&TcpConnection::handleRead, this, _1)); 40 channel_->setWriteCallback( 41 boost::bind(&TcpConnection::handleWrite, this)); 42 channel_->setCloseCallback( 43 boost::bind(&TcpConnection::handleClose, this)); 44 channel_->setErrorCallback( 45 boost::bind(&TcpConnection::handleError, this)); 46 } 47 48 TcpConnection::~TcpConnection() 49 { 50 LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this 51 << " fd=" << channel_->fd(); 52 } 53 54+void TcpConnection::send(const std::string& message) 55+{ 56+ if (state_ == kConnected) { 57+ if (loop_->isInLoopThread()) { 58+ sendInLoop(message); 59+ } else { 60+ loop_->runInLoop( 61+ boost::bind(&TcpConnection::sendInLoop, this, message)); 62+ } 63+ } 64+} 65+ 66+void TcpConnection::sendInLoop(const std::string& message) 67+{ 68+ loop_->assertInLoopThread(); 69+ ssize_t nwrote = 0; 70+ // if no thing in output queue, try writing directly 71+ if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { 72+ nwrote = ::write(channel_->fd(), message.data(), message.size()); 73+ if (nwrote >= 0) { 74+ if (implicit_cast<size_t>(nwrote) < message.size()) { 75+ LOG_TRACE << "I am going to write more data"; 76+ } 77+ } else { 78+ nwrote = 0; 79+ if (errno != EWOULDBLOCK) { 80+ LOG_SYSERR << "TcpConnection::sendInLoop"; 81+ } 82+ } 83+ } 84+ 85+ assert(nwrote >= 0); 86+ if (implicit_cast<size_t>(nwrote) < message.size()) { 87+ outputBuffer_.append(message.data()+nwrote, message.size()-nwrote); 88+ if (!channel_->isWriting()) { 89+ channel_->enableWriting(); 90+ } 91+ } 92+} 93+ 94+void TcpConnection::shutdown() 95+{ 96+ // FIXME: use compare and swap 97+ if (state_ == kConnected) 98+ { 99+ setState(kDisconnecting); 100+ // FIXME: shared_from_this()? 101+ loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); 102+ } 103+} 104+ 105+void TcpConnection::shutdownInLoop() 106+{ 107+ loop_->assertInLoopThread(); 108+ if (!channel_->isWriting()) 109+ { 110+ // we are not writing 111+ socket_->shutdownWrite(); 112+ } 113+} 114+ 115 void TcpConnection::connectEstablished() 116 { 117 loop_->assertInLoopThread(); 118 assert(state_ == kConnecting); 119 setState(kConnected); 120 channel_->enableReading(); 121 connectionCallback_(shared_from_this()); 122 } 123 124 void TcpConnection::connectDestroyed() 125 { 126 loop_->assertInLoopThread(); 127! assert(state_ == kConnected || state_ == kDisconnecting); 128 setState(kDisconnected); 129 channel_->disableAll(); 130 connectionCallback_(shared_from_this()); 131 132 loop_->removeChannel(get_pointer(channel_)); 133 } 134 135 void TcpConnection::handleRead(Timestamp receiveTime) 136 { 137 int savedErrno = 0; 138 ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); 139 if (n > 0) { 140 messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); 141 } else if (n == 0) { 142 handleClose(); 143 } else { 144 // FIXME: check savedErrno 145 handleError(); 146 } 147 } 148 149 void TcpConnection::handleWrite() 150 { 151+ loop_->assertInLoopThread(); 152+ if (channel_->isWriting()) { 153+ ssize_t n = ::write(channel_->fd(), 154+ outputBuffer_.peek(), 155+ outputBuffer_.readableBytes()); 156+ if (n > 0) { 157+ outputBuffer_.retrieve(n); 158+ if (outputBuffer_.readableBytes() == 0) { 159+ channel_->disableWriting(); 160+ if (state_ == kDisconnecting) { 161+ shutdownInLoop(); 162+ } 163+ } else { 164+ LOG_TRACE << "I am going to write more data"; 165+ } 166+ } else { 167+ LOG_SYSERR << "TcpConnection::handleWrite"; 168+ abort(); // FIXME 169+ } 170+ } else { 171+ LOG_TRACE << "Connection is down, no more writing"; 172+ } 173 } 174 175 void TcpConnection::handleClose() 176 { 177 loop_->assertInLoopThread(); 178 LOG_TRACE << "TcpConnection::handleClose state = " << state_; 179! assert(state_ == kConnected || state_ == kDisconnecting); 180 // we don't close fd, leave it to dtor, so we can find leaks easily. 181 channel_->disableAll(); 182 // must be the last line 183 closeCallback_(shared_from_this()); 184 } 185 186 void TcpConnection::handleError() 187 { 188 int err = sockets::getSocketError(channel_->fd()); 189 LOG_ERROR << "TcpConnection::handleError [" << name_ 190 << "] - SO_ERROR = " << err << " " << strerror_tl(err); 191 } 192