TcpConnection.cc revision 0dd528a5
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
72a18e699SShuo Chen
82a18e699SShuo Chen#include "TcpConnection.h"
92a18e699SShuo Chen
102a18e699SShuo Chen#include "logging/Logging.h"
112a18e699SShuo Chen#include "Channel.h"
122a18e699SShuo Chen#include "EventLoop.h"
132a18e699SShuo Chen#include "Socket.h"
142a18e699SShuo Chen#include "SocketsOps.h"
152a18e699SShuo Chen
162a18e699SShuo Chen#include <boost/bind.hpp>
172a18e699SShuo Chen
182a18e699SShuo Chen#include <errno.h>
192a18e699SShuo Chen#include <stdio.h>
202a18e699SShuo Chen
212a18e699SShuo Chenusing namespace muduo;
222a18e699SShuo Chen
232a18e699SShuo ChenTcpConnection::TcpConnection(EventLoop* loop,
242a18e699SShuo Chen                             const std::string& nameArg,
252a18e699SShuo Chen                             int sockfd,
262a18e699SShuo Chen                             const InetAddress& localAddr,
272a18e699SShuo Chen                             const InetAddress& peerAddr)
282a18e699SShuo Chen  : loop_(CHECK_NOTNULL(loop)),
292a18e699SShuo Chen    name_(nameArg),
302a18e699SShuo Chen    state_(kConnecting),
312a18e699SShuo Chen    socket_(new Socket(sockfd)),
322a18e699SShuo Chen    channel_(new Channel(loop, sockfd)),
332a18e699SShuo Chen    localAddr_(localAddr),
342a18e699SShuo Chen    peerAddr_(peerAddr)
352a18e699SShuo Chen{
362a18e699SShuo Chen  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
372a18e699SShuo Chen            << " fd=" << sockfd;
382a18e699SShuo Chen  channel_->setReadCallback(
392a18e699SShuo Chen      boost::bind(&TcpConnection::handleRead, this, _1));
402a18e699SShuo Chen  channel_->setWriteCallback(
412a18e699SShuo Chen      boost::bind(&TcpConnection::handleWrite, this));
422a18e699SShuo Chen  channel_->setCloseCallback(
432a18e699SShuo Chen      boost::bind(&TcpConnection::handleClose, this));
442a18e699SShuo Chen  channel_->setErrorCallback(
452a18e699SShuo Chen      boost::bind(&TcpConnection::handleError, this));
462a18e699SShuo Chen}
472a18e699SShuo Chen
482a18e699SShuo ChenTcpConnection::~TcpConnection()
492a18e699SShuo Chen{
502a18e699SShuo Chen  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
512a18e699SShuo Chen            << " fd=" << channel_->fd();
522a18e699SShuo Chen}
532a18e699SShuo Chen
54129fe122SShuo Chenvoid TcpConnection::send(const std::string& message)
55129fe122SShuo Chen{
56129fe122SShuo Chen  if (state_ == kConnected) {
57129fe122SShuo Chen    if (loop_->isInLoopThread()) {
58129fe122SShuo Chen      sendInLoop(message);
59129fe122SShuo Chen    } else {
60129fe122SShuo Chen      loop_->runInLoop(
61129fe122SShuo Chen          boost::bind(&TcpConnection::sendInLoop, this, message));
62129fe122SShuo Chen    }
63129fe122SShuo Chen  }
64129fe122SShuo Chen}
65129fe122SShuo Chen
66129fe122SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message)
67129fe122SShuo Chen{
68129fe122SShuo Chen  loop_->assertInLoopThread();
69129fe122SShuo Chen  ssize_t nwrote = 0;
70129fe122SShuo Chen  // if no thing in output queue, try writing directly
71129fe122SShuo Chen  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
72129fe122SShuo Chen    nwrote = ::write(channel_->fd(), message.data(), message.size());
73129fe122SShuo Chen    if (nwrote >= 0) {
74129fe122SShuo Chen      if (implicit_cast<size_t>(nwrote) < message.size()) {
75129fe122SShuo Chen        LOG_TRACE << "I am going to write more data";
76129fe122SShuo Chen      }
77129fe122SShuo Chen    } else {
78129fe122SShuo Chen      nwrote = 0;
79129fe122SShuo Chen      if (errno != EWOULDBLOCK) {
80129fe122SShuo Chen        LOG_SYSERR << "TcpConnection::sendInLoop";
81129fe122SShuo Chen      }
82129fe122SShuo Chen    }
83129fe122SShuo Chen  }
84129fe122SShuo Chen
85129fe122SShuo Chen  assert(nwrote >= 0);
86129fe122SShuo Chen  if (implicit_cast<size_t>(nwrote) < message.size()) {
87129fe122SShuo Chen    outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
88129fe122SShuo Chen    if (!channel_->isWriting()) {
89129fe122SShuo Chen      channel_->enableWriting();
90129fe122SShuo Chen    }
91129fe122SShuo Chen  }
92129fe122SShuo Chen}
93129fe122SShuo Chen
94129fe122SShuo Chenvoid TcpConnection::shutdown()
95129fe122SShuo Chen{
96129fe122SShuo Chen  // FIXME: use compare and swap
97129fe122SShuo Chen  if (state_ == kConnected)
98129fe122SShuo Chen  {
99129fe122SShuo Chen    setState(kDisconnecting);
100129fe122SShuo Chen    // FIXME: shared_from_this()?
101129fe122SShuo Chen    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
102129fe122SShuo Chen  }
103129fe122SShuo Chen}
104129fe122SShuo Chen
105129fe122SShuo Chenvoid TcpConnection::shutdownInLoop()
106129fe122SShuo Chen{
107129fe122SShuo Chen  loop_->assertInLoopThread();
108129fe122SShuo Chen  if (!channel_->isWriting())
109129fe122SShuo Chen  {
110129fe122SShuo Chen    // we are not writing
111129fe122SShuo Chen    socket_->shutdownWrite();
112129fe122SShuo Chen  }
113129fe122SShuo Chen}
114129fe122SShuo Chen
1152a18e699SShuo Chenvoid TcpConnection::connectEstablished()
1162a18e699SShuo Chen{
1172a18e699SShuo Chen  loop_->assertInLoopThread();
1182a18e699SShuo Chen  assert(state_ == kConnecting);
1192a18e699SShuo Chen  setState(kConnected);
1202a18e699SShuo Chen  channel_->enableReading();
1212a18e699SShuo Chen  connectionCallback_(shared_from_this());
1222a18e699SShuo Chen}
1232a18e699SShuo Chen
1242a18e699SShuo Chenvoid TcpConnection::connectDestroyed()
1252a18e699SShuo Chen{
1262a18e699SShuo Chen  loop_->assertInLoopThread();
127129fe122SShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
1282a18e699SShuo Chen  setState(kDisconnected);
1292a18e699SShuo Chen  channel_->disableAll();
1302a18e699SShuo Chen  connectionCallback_(shared_from_this());
1312a18e699SShuo Chen
1322a18e699SShuo Chen  loop_->removeChannel(get_pointer(channel_));
1332a18e699SShuo Chen}
1342a18e699SShuo Chen
1352a18e699SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime)
1362a18e699SShuo Chen{
1372a18e699SShuo Chen  int savedErrno = 0;
1382a18e699SShuo Chen  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
1392a18e699SShuo Chen  if (n > 0) {
1402a18e699SShuo Chen    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
1412a18e699SShuo Chen  } else if (n == 0) {
1422a18e699SShuo Chen    handleClose();
1432a18e699SShuo Chen  } else {
1440dd528a5SShuo Chen    errno = savedErrno;
1450dd528a5SShuo Chen    LOG_SYSERR << "TcpConnection::handleRead";
1462a18e699SShuo Chen    handleError();
1472a18e699SShuo Chen  }
1482a18e699SShuo Chen}
1492a18e699SShuo Chen
1502a18e699SShuo Chenvoid TcpConnection::handleWrite()
1512a18e699SShuo Chen{
152129fe122SShuo Chen  loop_->assertInLoopThread();
153129fe122SShuo Chen  if (channel_->isWriting()) {
154129fe122SShuo Chen    ssize_t n = ::write(channel_->fd(),
155129fe122SShuo Chen                        outputBuffer_.peek(),
156129fe122SShuo Chen                        outputBuffer_.readableBytes());
157129fe122SShuo Chen    if (n > 0) {
158129fe122SShuo Chen      outputBuffer_.retrieve(n);
159129fe122SShuo Chen      if (outputBuffer_.readableBytes() == 0) {
160129fe122SShuo Chen        channel_->disableWriting();
161129fe122SShuo Chen        if (state_ == kDisconnecting) {
162129fe122SShuo Chen          shutdownInLoop();
163129fe122SShuo Chen        }
164129fe122SShuo Chen      } else {
165129fe122SShuo Chen        LOG_TRACE << "I am going to write more data";
166129fe122SShuo Chen      }
167129fe122SShuo Chen    } else {
168129fe122SShuo Chen      LOG_SYSERR << "TcpConnection::handleWrite";
169129fe122SShuo Chen      abort();  // FIXME
170129fe122SShuo Chen    }
171129fe122SShuo Chen  } else {
172129fe122SShuo Chen    LOG_TRACE << "Connection is down, no more writing";
173129fe122SShuo Chen  }
1742a18e699SShuo Chen}
1752a18e699SShuo Chen
1762a18e699SShuo Chenvoid TcpConnection::handleClose()
1772a18e699SShuo Chen{
1782a18e699SShuo Chen  loop_->assertInLoopThread();
1792a18e699SShuo Chen  LOG_TRACE << "TcpConnection::handleClose state = " << state_;
180129fe122SShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
1812a18e699SShuo Chen  // we don't close fd, leave it to dtor, so we can find leaks easily.
1822a18e699SShuo Chen  channel_->disableAll();
1832a18e699SShuo Chen  // must be the last line
1842a18e699SShuo Chen  closeCallback_(shared_from_this());
1852a18e699SShuo Chen}
1862a18e699SShuo Chen
1872a18e699SShuo Chenvoid TcpConnection::handleError()
1882a18e699SShuo Chen{
1892a18e699SShuo Chen  int err = sockets::getSocketError(channel_->fd());
1902a18e699SShuo Chen  LOG_ERROR << "TcpConnection::handleError [" << name_
1912a18e699SShuo Chen            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
1922a18e699SShuo Chen}
193