15af4b7fbSShuo Chen // excerpts from http://code.google.com/p/muduo/ 25af4b7fbSShuo Chen // 35af4b7fbSShuo Chen // Use of this source code is governed by a BSD-style license 45af4b7fbSShuo Chen // that can be found in the License file. 55af4b7fbSShuo Chen // 65af4b7fbSShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com) 75af4b7fbSShuo Chen 85af4b7fbSShuo Chen #include "TcpConnection.h" 95af4b7fbSShuo Chen 105af4b7fbSShuo Chen #include "logging/Logging.h" 115af4b7fbSShuo Chen #include "Channel.h" 125af4b7fbSShuo Chen #include "EventLoop.h" 135af4b7fbSShuo Chen #include "Socket.h" 145af4b7fbSShuo Chen #include "SocketsOps.h" 155af4b7fbSShuo Chen 165af4b7fbSShuo Chen #include <boost/bind.hpp> 175af4b7fbSShuo Chen 185af4b7fbSShuo Chen #include <errno.h> 195af4b7fbSShuo Chen #include <stdio.h> 205af4b7fbSShuo Chen 215af4b7fbSShuo Chen using namespace muduo; 225af4b7fbSShuo Chen 235af4b7fbSShuo Chen TcpConnection::TcpConnection(EventLoop* loop, 245af4b7fbSShuo Chen const std::string& nameArg, 255af4b7fbSShuo Chen int sockfd, 265af4b7fbSShuo Chen const InetAddress& localAddr, 275af4b7fbSShuo Chen const InetAddress& peerAddr) 285af4b7fbSShuo Chen : loop_(CHECK_NOTNULL(loop)), 295af4b7fbSShuo Chen name_(nameArg), 305af4b7fbSShuo Chen state_(kConnecting), 315af4b7fbSShuo Chen socket_(new Socket(sockfd)), 325af4b7fbSShuo Chen channel_(new Channel(loop, sockfd)), 335af4b7fbSShuo Chen localAddr_(localAddr), 345af4b7fbSShuo Chen peerAddr_(peerAddr) 355af4b7fbSShuo Chen { 365af4b7fbSShuo Chen LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this 375af4b7fbSShuo Chen << " fd=" << sockfd; 385af4b7fbSShuo Chen channel_->setReadCallback( 395af4b7fbSShuo Chen boost::bind(&TcpConnection::handleRead, this, _1)); 405af4b7fbSShuo Chen channel_->setWriteCallback( 415af4b7fbSShuo Chen boost::bind(&TcpConnection::handleWrite, this)); 425af4b7fbSShuo Chen channel_->setCloseCallback( 435af4b7fbSShuo Chen boost::bind(&TcpConnection::handleClose, this)); 445af4b7fbSShuo Chen channel_->setErrorCallback( 455af4b7fbSShuo Chen boost::bind(&TcpConnection::handleError, this)); 465af4b7fbSShuo Chen } 475af4b7fbSShuo Chen 485af4b7fbSShuo Chen TcpConnection::~TcpConnection() 495af4b7fbSShuo Chen { 505af4b7fbSShuo Chen LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this 515af4b7fbSShuo Chen << " fd=" << channel_->fd(); 525af4b7fbSShuo Chen } 535af4b7fbSShuo Chen 545af4b7fbSShuo Chen void TcpConnection::send(const std::string& message) 555af4b7fbSShuo Chen { 565af4b7fbSShuo Chen if (state_ == kConnected) { 575af4b7fbSShuo Chen if (loop_->isInLoopThread()) { 585af4b7fbSShuo Chen sendInLoop(message); 595af4b7fbSShuo Chen } else { 605af4b7fbSShuo Chen loop_->runInLoop( 615af4b7fbSShuo Chen boost::bind(&TcpConnection::sendInLoop, this, message)); 625af4b7fbSShuo Chen } 635af4b7fbSShuo Chen } 645af4b7fbSShuo Chen } 655af4b7fbSShuo Chen 665af4b7fbSShuo Chen void TcpConnection::sendInLoop(const std::string& message) 675af4b7fbSShuo Chen { 685af4b7fbSShuo Chen loop_->assertInLoopThread(); 695af4b7fbSShuo Chen ssize_t nwrote = 0; 705af4b7fbSShuo Chen // if no thing in output queue, try writing directly 715af4b7fbSShuo Chen if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { 725af4b7fbSShuo Chen nwrote = ::write(channel_->fd(), message.data(), message.size()); 735af4b7fbSShuo Chen if (nwrote >= 0) { 745af4b7fbSShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 755af4b7fbSShuo Chen LOG_TRACE << "I am going to write more data"; 765af4b7fbSShuo Chen+ } else if (writeCompleteCallback_) { 775af4b7fbSShuo Chen+ loop_->queueInLoop( 785af4b7fbSShuo Chen+ boost::bind(writeCompleteCallback_, shared_from_this())); 795af4b7fbSShuo Chen } 805af4b7fbSShuo Chen } else { 815af4b7fbSShuo Chen nwrote = 0; 825af4b7fbSShuo Chen if (errno != EWOULDBLOCK) { 835af4b7fbSShuo Chen LOG_SYSERR << "TcpConnection::sendInLoop"; 845af4b7fbSShuo Chen } 855af4b7fbSShuo Chen } 865af4b7fbSShuo Chen } 875af4b7fbSShuo Chen 885af4b7fbSShuo Chen assert(nwrote >= 0); 895af4b7fbSShuo Chen if (implicit_cast<size_t>(nwrote) < message.size()) { 905af4b7fbSShuo Chen outputBuffer_.append(message.data()+nwrote, message.size()-nwrote); 915af4b7fbSShuo Chen if (!channel_->isWriting()) { 925af4b7fbSShuo Chen channel_->enableWriting(); 935af4b7fbSShuo Chen } 945af4b7fbSShuo Chen } 955af4b7fbSShuo Chen } 965af4b7fbSShuo Chen 975af4b7fbSShuo Chen void TcpConnection::shutdown() 985af4b7fbSShuo Chen { 995af4b7fbSShuo Chen // FIXME: use compare and swap 1005af4b7fbSShuo Chen if (state_ == kConnected) 1015af4b7fbSShuo Chen { 1025af4b7fbSShuo Chen setState(kDisconnecting); 1035af4b7fbSShuo Chen // FIXME: shared_from_this()? 1045af4b7fbSShuo Chen loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); 1055af4b7fbSShuo Chen } 1065af4b7fbSShuo Chen } 1075af4b7fbSShuo Chen 1085af4b7fbSShuo Chen void TcpConnection::shutdownInLoop() 1095af4b7fbSShuo Chen { 1105af4b7fbSShuo Chen loop_->assertInLoopThread(); 1115af4b7fbSShuo Chen if (!channel_->isWriting()) 1125af4b7fbSShuo Chen { 1135af4b7fbSShuo Chen // we are not writing 1145af4b7fbSShuo Chen socket_->shutdownWrite(); 1155af4b7fbSShuo Chen } 1165af4b7fbSShuo Chen } 1175af4b7fbSShuo Chen 1185af4b7fbSShuo Chen+void TcpConnection::setTcpNoDelay(bool on) 1195af4b7fbSShuo Chen+{ 1205af4b7fbSShuo Chen+ socket_->setTcpNoDelay(on); 1215af4b7fbSShuo Chen+} 1225af4b7fbSShuo Chen+ 1235af4b7fbSShuo Chen void TcpConnection::connectEstablished() 1245af4b7fbSShuo Chen { 1255af4b7fbSShuo Chen loop_->assertInLoopThread(); 1265af4b7fbSShuo Chen assert(state_ == kConnecting); 1275af4b7fbSShuo Chen setState(kConnected); 1285af4b7fbSShuo Chen channel_->enableReading(); 1295af4b7fbSShuo Chen connectionCallback_(shared_from_this()); 1305af4b7fbSShuo Chen } 1315af4b7fbSShuo Chen 1325af4b7fbSShuo Chen void TcpConnection::connectDestroyed() 1335af4b7fbSShuo Chen { 1345af4b7fbSShuo Chen loop_->assertInLoopThread(); 1355af4b7fbSShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 1365af4b7fbSShuo Chen setState(kDisconnected); 1375af4b7fbSShuo Chen channel_->disableAll(); 1385af4b7fbSShuo Chen connectionCallback_(shared_from_this()); 1395af4b7fbSShuo Chen 1405af4b7fbSShuo Chen loop_->removeChannel(get_pointer(channel_)); 1415af4b7fbSShuo Chen } 1425af4b7fbSShuo Chen 1435af4b7fbSShuo Chen void TcpConnection::handleRead(Timestamp receiveTime) 1445af4b7fbSShuo Chen { 1455af4b7fbSShuo Chen int savedErrno = 0; 1465af4b7fbSShuo Chen ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); 1475af4b7fbSShuo Chen if (n > 0) { 1485af4b7fbSShuo Chen messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); 1495af4b7fbSShuo Chen } else if (n == 0) { 1505af4b7fbSShuo Chen handleClose(); 1515af4b7fbSShuo Chen } else { 1525af4b7fbSShuo Chen // FIXME: check savedErrno 1535af4b7fbSShuo Chen handleError(); 1545af4b7fbSShuo Chen } 1555af4b7fbSShuo Chen } 1565af4b7fbSShuo Chen 1575af4b7fbSShuo Chen void TcpConnection::handleWrite() 1585af4b7fbSShuo Chen { 1595af4b7fbSShuo Chen loop_->assertInLoopThread(); 1605af4b7fbSShuo Chen if (channel_->isWriting()) { 1615af4b7fbSShuo Chen ssize_t n = ::write(channel_->fd(), 1625af4b7fbSShuo Chen outputBuffer_.peek(), 1635af4b7fbSShuo Chen outputBuffer_.readableBytes()); 1645af4b7fbSShuo Chen if (n > 0) { 1655af4b7fbSShuo Chen outputBuffer_.retrieve(n); 1665af4b7fbSShuo Chen if (outputBuffer_.readableBytes() == 0) { 1675af4b7fbSShuo Chen channel_->disableWriting(); 1685af4b7fbSShuo Chen+ if (writeCompleteCallback_) { 1695af4b7fbSShuo Chen+ loop_->queueInLoop( 1705af4b7fbSShuo Chen+ boost::bind(writeCompleteCallback_, shared_from_this())); 1715af4b7fbSShuo Chen+ } 1725af4b7fbSShuo Chen if (state_ == kDisconnecting) { 1735af4b7fbSShuo Chen shutdownInLoop(); 1745af4b7fbSShuo Chen } 1755af4b7fbSShuo Chen } else { 1765af4b7fbSShuo Chen LOG_TRACE << "I am going to write more data"; 1775af4b7fbSShuo Chen } 1785af4b7fbSShuo Chen } else { 1795af4b7fbSShuo Chen LOG_SYSERR << "TcpConnection::handleWrite"; 1805af4b7fbSShuo Chen abort(); // FIXME 1815af4b7fbSShuo Chen } 1825af4b7fbSShuo Chen } else { 1835af4b7fbSShuo Chen LOG_TRACE << "Connection is down, no more writing"; 1845af4b7fbSShuo Chen } 1855af4b7fbSShuo Chen } 1865af4b7fbSShuo Chen 1875af4b7fbSShuo Chen void TcpConnection::handleClose() 1885af4b7fbSShuo Chen { 1895af4b7fbSShuo Chen loop_->assertInLoopThread(); 1905af4b7fbSShuo Chen LOG_TRACE << "TcpConnection::handleClose state = " << state_; 1915af4b7fbSShuo Chen assert(state_ == kConnected || state_ == kDisconnecting); 1925af4b7fbSShuo Chen // we don't close fd, leave it to dtor, so we can find leaks easily. 1935af4b7fbSShuo Chen channel_->disableAll(); 1945af4b7fbSShuo Chen // must be the last line 1955af4b7fbSShuo Chen closeCallback_(shared_from_this()); 1965af4b7fbSShuo Chen } 1975af4b7fbSShuo Chen 1985af4b7fbSShuo Chen void TcpConnection::handleError() 1995af4b7fbSShuo Chen { 2005af4b7fbSShuo Chen int err = sockets::getSocketError(channel_->fd()); 2015af4b7fbSShuo Chen LOG_ERROR << "TcpConnection::handleError [" << name_ 2025af4b7fbSShuo Chen << "] - SO_ERROR = " << err << " " << strerror_tl(err); 2035af4b7fbSShuo Chen } 204