1e254a845SShuo Chen// excerpts from http://code.google.com/p/muduo/
2e254a845SShuo Chen//
3e254a845SShuo Chen// Use of this source code is governed by a BSD-style license
4e254a845SShuo Chen// that can be found in the License file.
5e254a845SShuo Chen//
6e254a845SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
7e254a845SShuo Chen
8e254a845SShuo Chen#include "TcpConnection.h"
9e254a845SShuo Chen
10e254a845SShuo Chen#include "logging/Logging.h"
11e254a845SShuo Chen#include "Channel.h"
12e254a845SShuo Chen#include "EventLoop.h"
13e254a845SShuo Chen#include "Socket.h"
14e254a845SShuo Chen#include "SocketsOps.h"
15e254a845SShuo Chen
16e254a845SShuo Chen#include <boost/bind.hpp>
17e254a845SShuo Chen
18e254a845SShuo Chen#include <errno.h>
19e254a845SShuo Chen#include <stdio.h>
20e254a845SShuo Chen
21e254a845SShuo Chenusing namespace muduo;
22e254a845SShuo Chen
23e254a845SShuo ChenTcpConnection::TcpConnection(EventLoop* loop,
24e254a845SShuo Chen                             const std::string& nameArg,
25e254a845SShuo Chen                             int sockfd,
26e254a845SShuo Chen                             const InetAddress& localAddr,
27e254a845SShuo Chen                             const InetAddress& peerAddr)
28e254a845SShuo Chen  : loop_(CHECK_NOTNULL(loop)),
29e254a845SShuo Chen    name_(nameArg),
30e254a845SShuo Chen    state_(kConnecting),
31e254a845SShuo Chen    socket_(new Socket(sockfd)),
32e254a845SShuo Chen    channel_(new Channel(loop, sockfd)),
33e254a845SShuo Chen    localAddr_(localAddr),
34e254a845SShuo Chen    peerAddr_(peerAddr)
35e254a845SShuo Chen{
36e254a845SShuo Chen  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
37e254a845SShuo Chen            << " fd=" << sockfd;
38e254a845SShuo Chen  channel_->setReadCallback(
39e254a845SShuo Chen      boost::bind(&TcpConnection::handleRead, this, _1));
40e254a845SShuo Chen  channel_->setWriteCallback(
41e254a845SShuo Chen      boost::bind(&TcpConnection::handleWrite, this));
42e254a845SShuo Chen  channel_->setCloseCallback(
43e254a845SShuo Chen      boost::bind(&TcpConnection::handleClose, this));
44e254a845SShuo Chen  channel_->setErrorCallback(
45e254a845SShuo Chen      boost::bind(&TcpConnection::handleError, this));
46e254a845SShuo Chen}
47e254a845SShuo Chen
48e254a845SShuo ChenTcpConnection::~TcpConnection()
49e254a845SShuo Chen{
50e254a845SShuo Chen  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
51e254a845SShuo Chen            << " fd=" << channel_->fd();
52e254a845SShuo Chen}
53e254a845SShuo Chen
54e254a845SShuo Chenvoid TcpConnection::send(const std::string& message)
55e254a845SShuo Chen{
56e254a845SShuo Chen  if (state_ == kConnected) {
57e254a845SShuo Chen    if (loop_->isInLoopThread()) {
58e254a845SShuo Chen      sendInLoop(message);
59e254a845SShuo Chen    } else {
60e254a845SShuo Chen      loop_->runInLoop(
61e254a845SShuo Chen          boost::bind(&TcpConnection::sendInLoop, this, message));
62e254a845SShuo Chen    }
63e254a845SShuo Chen  }
64e254a845SShuo Chen}
65e254a845SShuo Chen
66e254a845SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message)
67e254a845SShuo Chen{
68e254a845SShuo Chen  loop_->assertInLoopThread();
69e254a845SShuo Chen  ssize_t nwrote = 0;
70e254a845SShuo Chen  // if no thing in output queue, try writing directly
71e254a845SShuo Chen  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
72e254a845SShuo Chen    nwrote = ::write(channel_->fd(), message.data(), message.size());
73e254a845SShuo Chen    if (nwrote >= 0) {
74e254a845SShuo Chen      if (implicit_cast<size_t>(nwrote) < message.size()) {
75e254a845SShuo Chen        LOG_TRACE << "I am going to write more data";
76e254a845SShuo Chen      } else if (writeCompleteCallback_) {
77e254a845SShuo Chen        loop_->queueInLoop(
78e254a845SShuo Chen            boost::bind(writeCompleteCallback_, shared_from_this()));
79e254a845SShuo Chen      }
80e254a845SShuo Chen    } else {
81e254a845SShuo Chen      nwrote = 0;
82e254a845SShuo Chen      if (errno != EWOULDBLOCK) {
83e254a845SShuo Chen        LOG_SYSERR << "TcpConnection::sendInLoop";
84e254a845SShuo Chen      }
85e254a845SShuo Chen    }
86e254a845SShuo Chen  }
87e254a845SShuo Chen
88e254a845SShuo Chen  assert(nwrote >= 0);
89e254a845SShuo Chen  if (implicit_cast<size_t>(nwrote) < message.size()) {
90e254a845SShuo Chen    outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
91e254a845SShuo Chen    if (!channel_->isWriting()) {
92e254a845SShuo Chen      channel_->enableWriting();
93e254a845SShuo Chen    }
94e254a845SShuo Chen  }
95e254a845SShuo Chen}
96e254a845SShuo Chen
97e254a845SShuo Chenvoid TcpConnection::shutdown()
98e254a845SShuo Chen{
99e254a845SShuo Chen  // FIXME: use compare and swap
100e254a845SShuo Chen  if (state_ == kConnected)
101e254a845SShuo Chen  {
102e254a845SShuo Chen    setState(kDisconnecting);
103e254a845SShuo Chen    // FIXME: shared_from_this()?
104e254a845SShuo Chen    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
105e254a845SShuo Chen  }
106e254a845SShuo Chen}
107e254a845SShuo Chen
108e254a845SShuo Chenvoid TcpConnection::shutdownInLoop()
109e254a845SShuo Chen{
110e254a845SShuo Chen  loop_->assertInLoopThread();
111e254a845SShuo Chen  if (!channel_->isWriting())
112e254a845SShuo Chen  {
113e254a845SShuo Chen    // we are not writing
114e254a845SShuo Chen    socket_->shutdownWrite();
115e254a845SShuo Chen  }
116e254a845SShuo Chen}
117e254a845SShuo Chen
118e254a845SShuo Chenvoid TcpConnection::setTcpNoDelay(bool on)
119e254a845SShuo Chen{
120e254a845SShuo Chen  socket_->setTcpNoDelay(on);
121e254a845SShuo Chen}
122e254a845SShuo Chen
123e254a845SShuo Chenvoid TcpConnection::connectEstablished()
124e254a845SShuo Chen{
125e254a845SShuo Chen  loop_->assertInLoopThread();
126e254a845SShuo Chen  assert(state_ == kConnecting);
127e254a845SShuo Chen  setState(kConnected);
128e254a845SShuo Chen  channel_->enableReading();
129e254a845SShuo Chen  connectionCallback_(shared_from_this());
130e254a845SShuo Chen}
131e254a845SShuo Chen
132e254a845SShuo Chenvoid TcpConnection::connectDestroyed()
133e254a845SShuo Chen{
134e254a845SShuo Chen  loop_->assertInLoopThread();
135e254a845SShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
136e254a845SShuo Chen  setState(kDisconnected);
137e254a845SShuo Chen  channel_->disableAll();
138e254a845SShuo Chen  connectionCallback_(shared_from_this());
139e254a845SShuo Chen
140e254a845SShuo Chen  loop_->removeChannel(get_pointer(channel_));
141e254a845SShuo Chen}
142e254a845SShuo Chen
143e254a845SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime)
144e254a845SShuo Chen{
145e254a845SShuo Chen  int savedErrno = 0;
146e254a845SShuo Chen  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
147e254a845SShuo Chen  if (n > 0) {
148e254a845SShuo Chen    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
149e254a845SShuo Chen  } else if (n == 0) {
150e254a845SShuo Chen    handleClose();
151e254a845SShuo Chen  } else {
1520dd528a5SShuo Chen    errno = savedErrno;
1530dd528a5SShuo Chen    LOG_SYSERR << "TcpConnection::handleRead";
154e254a845SShuo Chen    handleError();
155e254a845SShuo Chen  }
156e254a845SShuo Chen}
157e254a845SShuo Chen
158e254a845SShuo Chenvoid TcpConnection::handleWrite()
159e254a845SShuo Chen{
160e254a845SShuo Chen  loop_->assertInLoopThread();
161e254a845SShuo Chen  if (channel_->isWriting()) {
162e254a845SShuo Chen    ssize_t n = ::write(channel_->fd(),
163e254a845SShuo Chen                        outputBuffer_.peek(),
164e254a845SShuo Chen                        outputBuffer_.readableBytes());
165e254a845SShuo Chen    if (n > 0) {
166e254a845SShuo Chen      outputBuffer_.retrieve(n);
167e254a845SShuo Chen      if (outputBuffer_.readableBytes() == 0) {
168e254a845SShuo Chen        channel_->disableWriting();
169e254a845SShuo Chen        if (writeCompleteCallback_) {
170e254a845SShuo Chen          loop_->queueInLoop(
171e254a845SShuo Chen              boost::bind(writeCompleteCallback_, shared_from_this()));
172e254a845SShuo Chen        }
173e254a845SShuo Chen        if (state_ == kDisconnecting) {
174e254a845SShuo Chen          shutdownInLoop();
175e254a845SShuo Chen        }
176e254a845SShuo Chen      } else {
177e254a845SShuo Chen        LOG_TRACE << "I am going to write more data";
178e254a845SShuo Chen      }
179e254a845SShuo Chen    } else {
180e254a845SShuo Chen      LOG_SYSERR << "TcpConnection::handleWrite";
181e254a845SShuo Chen    }
182e254a845SShuo Chen  } else {
183e254a845SShuo Chen    LOG_TRACE << "Connection is down, no more writing";
184e254a845SShuo Chen  }
185e254a845SShuo Chen}
186e254a845SShuo Chen
187e254a845SShuo Chenvoid TcpConnection::handleClose()
188e254a845SShuo Chen{
189e254a845SShuo Chen  loop_->assertInLoopThread();
190e254a845SShuo Chen  LOG_TRACE << "TcpConnection::handleClose state = " << state_;
191e254a845SShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
192e254a845SShuo Chen  // we don't close fd, leave it to dtor, so we can find leaks easily.
193e254a845SShuo Chen  channel_->disableAll();
194e254a845SShuo Chen  // must be the last line
195e254a845SShuo Chen  closeCallback_(shared_from_this());
196e254a845SShuo Chen}
197e254a845SShuo Chen
198e254a845SShuo Chenvoid TcpConnection::handleError()
199e254a845SShuo Chen{
200e254a845SShuo Chen  int err = sockets::getSocketError(channel_->fd());
201e254a845SShuo Chen  LOG_ERROR << "TcpConnection::handleError [" << name_
202e254a845SShuo Chen            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
203e254a845SShuo Chen}
204