// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

840161064SShuo Chen#include "TcpConnection.h"
940161064SShuo Chen
1040161064SShuo Chen#include "logging/Logging.h"
1140161064SShuo Chen#include "Channel.h"
1240161064SShuo Chen#include "EventLoop.h"
1340161064SShuo Chen#include "Socket.h"
1440161064SShuo Chen#include "SocketsOps.h"
1540161064SShuo Chen
1640161064SShuo Chen#include <boost/bind.hpp>
1740161064SShuo Chen
1840161064SShuo Chen#include <errno.h>
1940161064SShuo Chen#include <stdio.h>
2040161064SShuo Chen
2140161064SShuo Chenusing namespace muduo;
2240161064SShuo Chen
2340161064SShuo ChenTcpConnection::TcpConnection(EventLoop* loop,
2440161064SShuo Chen                             const std::string& nameArg,
2540161064SShuo Chen                             int sockfd,
2640161064SShuo Chen                             const InetAddress& localAddr,
2740161064SShuo Chen                             const InetAddress& peerAddr)
2840161064SShuo Chen  : loop_(CHECK_NOTNULL(loop)),
2940161064SShuo Chen    name_(nameArg),
3040161064SShuo Chen    state_(kConnecting),
3140161064SShuo Chen    socket_(new Socket(sockfd)),
3240161064SShuo Chen    channel_(new Channel(loop, sockfd)),
3340161064SShuo Chen    localAddr_(localAddr),
3440161064SShuo Chen    peerAddr_(peerAddr)
3540161064SShuo Chen{
3640161064SShuo Chen  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
3740161064SShuo Chen            << " fd=" << sockfd;
3840161064SShuo Chen  channel_->setReadCallback(
3940161064SShuo Chen      boost::bind(&TcpConnection::handleRead, this, _1));
4040161064SShuo Chen  channel_->setWriteCallback(
4140161064SShuo Chen      boost::bind(&TcpConnection::handleWrite, this));
4240161064SShuo Chen  channel_->setCloseCallback(
4340161064SShuo Chen      boost::bind(&TcpConnection::handleClose, this));
4440161064SShuo Chen  channel_->setErrorCallback(
4540161064SShuo Chen      boost::bind(&TcpConnection::handleError, this));
4640161064SShuo Chen}
4740161064SShuo Chen
4840161064SShuo ChenTcpConnection::~TcpConnection()
4940161064SShuo Chen{
5040161064SShuo Chen  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
5140161064SShuo Chen            << " fd=" << channel_->fd();
5240161064SShuo Chen}
5340161064SShuo Chen
5440161064SShuo Chenvoid TcpConnection::send(const std::string& message)
5540161064SShuo Chen{
5640161064SShuo Chen  if (state_ == kConnected) {
5740161064SShuo Chen    if (loop_->isInLoopThread()) {
5840161064SShuo Chen      sendInLoop(message);
5940161064SShuo Chen    } else {
6040161064SShuo Chen      loop_->runInLoop(
6140161064SShuo Chen          boost::bind(&TcpConnection::sendInLoop, this, message));
6240161064SShuo Chen    }
6340161064SShuo Chen  }
6440161064SShuo Chen}
6540161064SShuo Chen
6640161064SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message)
6740161064SShuo Chen{
6840161064SShuo Chen  loop_->assertInLoopThread();
6940161064SShuo Chen  ssize_t nwrote = 0;
7040161064SShuo Chen  // if no thing in output queue, try writing directly
7140161064SShuo Chen  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
7240161064SShuo Chen    nwrote = ::write(channel_->fd(), message.data(), message.size());
7340161064SShuo Chen    if (nwrote >= 0) {
7440161064SShuo Chen      if (implicit_cast<size_t>(nwrote) < message.size()) {
7540161064SShuo Chen        LOG_TRACE << "I am going to write more data";
7640161064SShuo Chen      } else if (writeCompleteCallback_) {
7740161064SShuo Chen        loop_->queueInLoop(
7840161064SShuo Chen            boost::bind(writeCompleteCallback_, shared_from_this()));
7940161064SShuo Chen      }
8040161064SShuo Chen    } else {
8140161064SShuo Chen      nwrote = 0;
8240161064SShuo Chen      if (errno != EWOULDBLOCK) {
8340161064SShuo Chen        LOG_SYSERR << "TcpConnection::sendInLoop";
8440161064SShuo Chen      }
8540161064SShuo Chen    }
8640161064SShuo Chen  }
8740161064SShuo Chen
8840161064SShuo Chen  assert(nwrote >= 0);
8940161064SShuo Chen  if (implicit_cast<size_t>(nwrote) < message.size()) {
9040161064SShuo Chen    outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
9140161064SShuo Chen    if (!channel_->isWriting()) {
9240161064SShuo Chen      channel_->enableWriting();
9340161064SShuo Chen    }
9440161064SShuo Chen  }
9540161064SShuo Chen}
9640161064SShuo Chen
9740161064SShuo Chenvoid TcpConnection::shutdown()
9840161064SShuo Chen{
9940161064SShuo Chen  // FIXME: use compare and swap
10040161064SShuo Chen  if (state_ == kConnected)
10140161064SShuo Chen  {
10240161064SShuo Chen    setState(kDisconnecting);
10340161064SShuo Chen    // FIXME: shared_from_this()?
10440161064SShuo Chen    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
10540161064SShuo Chen  }
10640161064SShuo Chen}
10740161064SShuo Chen
10840161064SShuo Chenvoid TcpConnection::shutdownInLoop()
10940161064SShuo Chen{
11040161064SShuo Chen  loop_->assertInLoopThread();
11140161064SShuo Chen  if (!channel_->isWriting())
11240161064SShuo Chen  {
11340161064SShuo Chen    // we are not writing
11440161064SShuo Chen    socket_->shutdownWrite();
11540161064SShuo Chen  }
11640161064SShuo Chen}
11740161064SShuo Chen
11840161064SShuo Chenvoid TcpConnection::setTcpNoDelay(bool on)
11940161064SShuo Chen{
12040161064SShuo Chen  socket_->setTcpNoDelay(on);
12140161064SShuo Chen}
12240161064SShuo Chen
12340161064SShuo Chenvoid TcpConnection::connectEstablished()
12440161064SShuo Chen{
12540161064SShuo Chen  loop_->assertInLoopThread();
12640161064SShuo Chen  assert(state_ == kConnecting);
12740161064SShuo Chen  setState(kConnected);
12840161064SShuo Chen  channel_->enableReading();
12940161064SShuo Chen  connectionCallback_(shared_from_this());
13040161064SShuo Chen}
13140161064SShuo Chen
13240161064SShuo Chenvoid TcpConnection::connectDestroyed()
13340161064SShuo Chen{
13440161064SShuo Chen  loop_->assertInLoopThread();
13540161064SShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
13640161064SShuo Chen  setState(kDisconnected);
13740161064SShuo Chen  channel_->disableAll();
13840161064SShuo Chen  connectionCallback_(shared_from_this());
13940161064SShuo Chen
14040161064SShuo Chen  loop_->removeChannel(get_pointer(channel_));
14140161064SShuo Chen}
14240161064SShuo Chen
14340161064SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime)
14440161064SShuo Chen{
14540161064SShuo Chen  int savedErrno = 0;
14640161064SShuo Chen  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
14740161064SShuo Chen  if (n > 0) {
14840161064SShuo Chen    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
14940161064SShuo Chen  } else if (n == 0) {
15040161064SShuo Chen    handleClose();
15140161064SShuo Chen  } else {
15240161064SShuo Chen    errno = savedErrno;
15340161064SShuo Chen    LOG_SYSERR << "TcpConnection::handleRead";
15440161064SShuo Chen    handleError();
15540161064SShuo Chen  }
15640161064SShuo Chen}
15740161064SShuo Chen
15840161064SShuo Chenvoid TcpConnection::handleWrite()
15940161064SShuo Chen{
16040161064SShuo Chen  loop_->assertInLoopThread();
16140161064SShuo Chen  if (channel_->isWriting()) {
16240161064SShuo Chen    ssize_t n = ::write(channel_->fd(),
16340161064SShuo Chen                        outputBuffer_.peek(),
16440161064SShuo Chen                        outputBuffer_.readableBytes());
16540161064SShuo Chen    if (n > 0) {
16640161064SShuo Chen      outputBuffer_.retrieve(n);
16740161064SShuo Chen      if (outputBuffer_.readableBytes() == 0) {
16840161064SShuo Chen        channel_->disableWriting();
16940161064SShuo Chen        if (writeCompleteCallback_) {
17040161064SShuo Chen          loop_->queueInLoop(
17140161064SShuo Chen              boost::bind(writeCompleteCallback_, shared_from_this()));
17240161064SShuo Chen        }
17340161064SShuo Chen        if (state_ == kDisconnecting) {
17440161064SShuo Chen          shutdownInLoop();
17540161064SShuo Chen        }
17640161064SShuo Chen      } else {
17740161064SShuo Chen        LOG_TRACE << "I am going to write more data";
17840161064SShuo Chen      }
17940161064SShuo Chen    } else {
18040161064SShuo Chen      LOG_SYSERR << "TcpConnection::handleWrite";
18140161064SShuo Chen    }
18240161064SShuo Chen  } else {
18340161064SShuo Chen    LOG_TRACE << "Connection is down, no more writing";
18440161064SShuo Chen  }
18540161064SShuo Chen}
18640161064SShuo Chen
18740161064SShuo Chenvoid TcpConnection::handleClose()
18840161064SShuo Chen{
18940161064SShuo Chen  loop_->assertInLoopThread();
19040161064SShuo Chen  LOG_TRACE << "TcpConnection::handleClose state = " << state_;
19140161064SShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
19240161064SShuo Chen  // we don't close fd, leave it to dtor, so we can find leaks easily.
19340161064SShuo Chen  channel_->disableAll();
19440161064SShuo Chen  // must be the last line
19540161064SShuo Chen  closeCallback_(shared_from_this());
19640161064SShuo Chen}
19740161064SShuo Chen
19840161064SShuo Chenvoid TcpConnection::handleError()
19940161064SShuo Chen{
20040161064SShuo Chen  int err = sockets::getSocketError(channel_->fd());
20140161064SShuo Chen  LOG_ERROR << "TcpConnection::handleError [" << name_
20240161064SShuo Chen            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
20340161064SShuo Chen}
204