15af4b7fbSShuo Chen // excerpts from http://code.google.com/p/muduo/
25af4b7fbSShuo Chen //
35af4b7fbSShuo Chen // Use of this source code is governed by a BSD-style license
45af4b7fbSShuo Chen // that can be found in the License file.
55af4b7fbSShuo Chen //
65af4b7fbSShuo Chen // Author: Shuo Chen (chenshuo at chenshuo dot com)
75af4b7fbSShuo Chen 
85af4b7fbSShuo Chen #include "TcpConnection.h"
95af4b7fbSShuo Chen 
105af4b7fbSShuo Chen #include "logging/Logging.h"
115af4b7fbSShuo Chen #include "Channel.h"
125af4b7fbSShuo Chen #include "EventLoop.h"
135af4b7fbSShuo Chen #include "Socket.h"
145af4b7fbSShuo Chen #include "SocketsOps.h"
155af4b7fbSShuo Chen 
165af4b7fbSShuo Chen #include <boost/bind.hpp>
175af4b7fbSShuo Chen 
185af4b7fbSShuo Chen #include <errno.h>
195af4b7fbSShuo Chen #include <stdio.h>
205af4b7fbSShuo Chen 
215af4b7fbSShuo Chen using namespace muduo;
225af4b7fbSShuo Chen 
235af4b7fbSShuo Chen TcpConnection::TcpConnection(EventLoop* loop,
245af4b7fbSShuo Chen                              const std::string& nameArg,
255af4b7fbSShuo Chen                              int sockfd,
265af4b7fbSShuo Chen                              const InetAddress& localAddr,
275af4b7fbSShuo Chen                              const InetAddress& peerAddr)
285af4b7fbSShuo Chen   : loop_(CHECK_NOTNULL(loop)),
295af4b7fbSShuo Chen     name_(nameArg),
305af4b7fbSShuo Chen     state_(kConnecting),
315af4b7fbSShuo Chen     socket_(new Socket(sockfd)),
325af4b7fbSShuo Chen     channel_(new Channel(loop, sockfd)),
335af4b7fbSShuo Chen     localAddr_(localAddr),
345af4b7fbSShuo Chen     peerAddr_(peerAddr)
355af4b7fbSShuo Chen {
365af4b7fbSShuo Chen   LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
375af4b7fbSShuo Chen             << " fd=" << sockfd;
385af4b7fbSShuo Chen   channel_->setReadCallback(
395af4b7fbSShuo Chen       boost::bind(&TcpConnection::handleRead, this, _1));
405af4b7fbSShuo Chen   channel_->setWriteCallback(
415af4b7fbSShuo Chen       boost::bind(&TcpConnection::handleWrite, this));
425af4b7fbSShuo Chen   channel_->setCloseCallback(
435af4b7fbSShuo Chen       boost::bind(&TcpConnection::handleClose, this));
445af4b7fbSShuo Chen   channel_->setErrorCallback(
455af4b7fbSShuo Chen       boost::bind(&TcpConnection::handleError, this));
465af4b7fbSShuo Chen }
475af4b7fbSShuo Chen 
485af4b7fbSShuo Chen TcpConnection::~TcpConnection()
495af4b7fbSShuo Chen {
505af4b7fbSShuo Chen   LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
515af4b7fbSShuo Chen             << " fd=" << channel_->fd();
525af4b7fbSShuo Chen }
535af4b7fbSShuo Chen 
545af4b7fbSShuo Chen void TcpConnection::send(const std::string& message)
555af4b7fbSShuo Chen {
565af4b7fbSShuo Chen   if (state_ == kConnected) {
575af4b7fbSShuo Chen     if (loop_->isInLoopThread()) {
585af4b7fbSShuo Chen       sendInLoop(message);
595af4b7fbSShuo Chen     } else {
605af4b7fbSShuo Chen       loop_->runInLoop(
615af4b7fbSShuo Chen           boost::bind(&TcpConnection::sendInLoop, this, message));
625af4b7fbSShuo Chen     }
635af4b7fbSShuo Chen   }
645af4b7fbSShuo Chen }
655af4b7fbSShuo Chen 
665af4b7fbSShuo Chen void TcpConnection::sendInLoop(const std::string& message)
675af4b7fbSShuo Chen {
685af4b7fbSShuo Chen   loop_->assertInLoopThread();
695af4b7fbSShuo Chen   ssize_t nwrote = 0;
705af4b7fbSShuo Chen   // if no thing in output queue, try writing directly
715af4b7fbSShuo Chen   if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
725af4b7fbSShuo Chen     nwrote = ::write(channel_->fd(), message.data(), message.size());
735af4b7fbSShuo Chen     if (nwrote >= 0) {
745af4b7fbSShuo Chen       if (implicit_cast<size_t>(nwrote) < message.size()) {
755af4b7fbSShuo Chen         LOG_TRACE << "I am going to write more data";
765af4b7fbSShuo Chen+      } else if (writeCompleteCallback_) {
775af4b7fbSShuo Chen+        loop_->queueInLoop(
785af4b7fbSShuo Chen+            boost::bind(writeCompleteCallback_, shared_from_this()));
795af4b7fbSShuo Chen       }
805af4b7fbSShuo Chen     } else {
815af4b7fbSShuo Chen       nwrote = 0;
825af4b7fbSShuo Chen       if (errno != EWOULDBLOCK) {
835af4b7fbSShuo Chen         LOG_SYSERR << "TcpConnection::sendInLoop";
845af4b7fbSShuo Chen       }
855af4b7fbSShuo Chen     }
865af4b7fbSShuo Chen   }
875af4b7fbSShuo Chen 
885af4b7fbSShuo Chen   assert(nwrote >= 0);
895af4b7fbSShuo Chen   if (implicit_cast<size_t>(nwrote) < message.size()) {
905af4b7fbSShuo Chen     outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
915af4b7fbSShuo Chen     if (!channel_->isWriting()) {
925af4b7fbSShuo Chen       channel_->enableWriting();
935af4b7fbSShuo Chen     }
945af4b7fbSShuo Chen   }
955af4b7fbSShuo Chen }
965af4b7fbSShuo Chen 
975af4b7fbSShuo Chen void TcpConnection::shutdown()
985af4b7fbSShuo Chen {
995af4b7fbSShuo Chen   // FIXME: use compare and swap
1005af4b7fbSShuo Chen   if (state_ == kConnected)
1015af4b7fbSShuo Chen   {
1025af4b7fbSShuo Chen     setState(kDisconnecting);
1035af4b7fbSShuo Chen     // FIXME: shared_from_this()?
1045af4b7fbSShuo Chen     loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
1055af4b7fbSShuo Chen   }
1065af4b7fbSShuo Chen }
1075af4b7fbSShuo Chen 
1085af4b7fbSShuo Chen void TcpConnection::shutdownInLoop()
1095af4b7fbSShuo Chen {
1105af4b7fbSShuo Chen   loop_->assertInLoopThread();
1115af4b7fbSShuo Chen   if (!channel_->isWriting())
1125af4b7fbSShuo Chen   {
1135af4b7fbSShuo Chen     // we are not writing
1145af4b7fbSShuo Chen     socket_->shutdownWrite();
1155af4b7fbSShuo Chen   }
1165af4b7fbSShuo Chen }
1175af4b7fbSShuo Chen 
1185af4b7fbSShuo Chen+void TcpConnection::setTcpNoDelay(bool on)
1195af4b7fbSShuo Chen+{
1205af4b7fbSShuo Chen+  socket_->setTcpNoDelay(on);
1215af4b7fbSShuo Chen+}
1225af4b7fbSShuo Chen+
1235af4b7fbSShuo Chen void TcpConnection::connectEstablished()
1245af4b7fbSShuo Chen {
1255af4b7fbSShuo Chen   loop_->assertInLoopThread();
1265af4b7fbSShuo Chen   assert(state_ == kConnecting);
1275af4b7fbSShuo Chen   setState(kConnected);
1285af4b7fbSShuo Chen   channel_->enableReading();
1295af4b7fbSShuo Chen   connectionCallback_(shared_from_this());
1305af4b7fbSShuo Chen }
1315af4b7fbSShuo Chen 
1325af4b7fbSShuo Chen void TcpConnection::connectDestroyed()
1335af4b7fbSShuo Chen {
1345af4b7fbSShuo Chen   loop_->assertInLoopThread();
1355af4b7fbSShuo Chen   assert(state_ == kConnected || state_ == kDisconnecting);
1365af4b7fbSShuo Chen   setState(kDisconnected);
1375af4b7fbSShuo Chen   channel_->disableAll();
1385af4b7fbSShuo Chen   connectionCallback_(shared_from_this());
1395af4b7fbSShuo Chen 
1405af4b7fbSShuo Chen   loop_->removeChannel(get_pointer(channel_));
1415af4b7fbSShuo Chen }
1425af4b7fbSShuo Chen 
1435af4b7fbSShuo Chen void TcpConnection::handleRead(Timestamp receiveTime)
1445af4b7fbSShuo Chen {
1455af4b7fbSShuo Chen   int savedErrno = 0;
1465af4b7fbSShuo Chen   ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
1475af4b7fbSShuo Chen   if (n > 0) {
1485af4b7fbSShuo Chen     messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
1495af4b7fbSShuo Chen   } else if (n == 0) {
1505af4b7fbSShuo Chen     handleClose();
1515af4b7fbSShuo Chen   } else {
1525af4b7fbSShuo Chen     // FIXME: check savedErrno
1535af4b7fbSShuo Chen     handleError();
1545af4b7fbSShuo Chen   }
1555af4b7fbSShuo Chen }
1565af4b7fbSShuo Chen 
1575af4b7fbSShuo Chen void TcpConnection::handleWrite()
1585af4b7fbSShuo Chen {
1595af4b7fbSShuo Chen   loop_->assertInLoopThread();
1605af4b7fbSShuo Chen   if (channel_->isWriting()) {
1615af4b7fbSShuo Chen     ssize_t n = ::write(channel_->fd(),
1625af4b7fbSShuo Chen                         outputBuffer_.peek(),
1635af4b7fbSShuo Chen                         outputBuffer_.readableBytes());
1645af4b7fbSShuo Chen     if (n > 0) {
1655af4b7fbSShuo Chen       outputBuffer_.retrieve(n);
1665af4b7fbSShuo Chen       if (outputBuffer_.readableBytes() == 0) {
1675af4b7fbSShuo Chen         channel_->disableWriting();
1685af4b7fbSShuo Chen+        if (writeCompleteCallback_) {
1695af4b7fbSShuo Chen+          loop_->queueInLoop(
1705af4b7fbSShuo Chen+              boost::bind(writeCompleteCallback_, shared_from_this()));
1715af4b7fbSShuo Chen+        }
1725af4b7fbSShuo Chen         if (state_ == kDisconnecting) {
1735af4b7fbSShuo Chen           shutdownInLoop();
1745af4b7fbSShuo Chen         }
1755af4b7fbSShuo Chen       } else {
1765af4b7fbSShuo Chen         LOG_TRACE << "I am going to write more data";
1775af4b7fbSShuo Chen       }
1785af4b7fbSShuo Chen     } else {
1795af4b7fbSShuo Chen       LOG_SYSERR << "TcpConnection::handleWrite";
1805af4b7fbSShuo Chen       abort();  // FIXME
1815af4b7fbSShuo Chen     }
1825af4b7fbSShuo Chen   } else {
1835af4b7fbSShuo Chen     LOG_TRACE << "Connection is down, no more writing";
1845af4b7fbSShuo Chen   }
1855af4b7fbSShuo Chen }
1865af4b7fbSShuo Chen 
1875af4b7fbSShuo Chen void TcpConnection::handleClose()
1885af4b7fbSShuo Chen {
1895af4b7fbSShuo Chen   loop_->assertInLoopThread();
1905af4b7fbSShuo Chen   LOG_TRACE << "TcpConnection::handleClose state = " << state_;
1915af4b7fbSShuo Chen   assert(state_ == kConnected || state_ == kDisconnecting);
1925af4b7fbSShuo Chen   // we don't close fd, leave it to dtor, so we can find leaks easily.
1935af4b7fbSShuo Chen   channel_->disableAll();
1945af4b7fbSShuo Chen   // must be the last line
1955af4b7fbSShuo Chen   closeCallback_(shared_from_this());
1965af4b7fbSShuo Chen }
1975af4b7fbSShuo Chen 
1985af4b7fbSShuo Chen void TcpConnection::handleError()
1995af4b7fbSShuo Chen {
2005af4b7fbSShuo Chen   int err = sockets::getSocketError(channel_->fd());
2015af4b7fbSShuo Chen   LOG_ERROR << "TcpConnection::handleError [" << name_
2025af4b7fbSShuo Chen             << "] - SO_ERROR = " << err << " " << strerror_tl(err);
2035af4b7fbSShuo Chen }
204