1b37003a7SShuo Chen// excerpts from http://code.google.com/p/muduo/
2b37003a7SShuo Chen//
3b37003a7SShuo Chen// Use of this source code is governed by a BSD-style license
4b37003a7SShuo Chen// that can be found in the License file.
5b37003a7SShuo Chen//
6b37003a7SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
7b37003a7SShuo Chen
8b37003a7SShuo Chen#include "TcpConnection.h"
9b37003a7SShuo Chen
10b37003a7SShuo Chen#include "logging/Logging.h"
11b37003a7SShuo Chen#include "Channel.h"
12b37003a7SShuo Chen#include "EventLoop.h"
13b37003a7SShuo Chen#include "Socket.h"
14b37003a7SShuo Chen#include "SocketsOps.h"
15b37003a7SShuo Chen
16b37003a7SShuo Chen#include <boost/bind.hpp>
17b37003a7SShuo Chen
18b37003a7SShuo Chen#include <errno.h>
19b37003a7SShuo Chen#include <stdio.h>
20b37003a7SShuo Chen
21b37003a7SShuo Chenusing namespace muduo;
22b37003a7SShuo Chen
23b37003a7SShuo ChenTcpConnection::TcpConnection(EventLoop* loop,
24b37003a7SShuo Chen                             const std::string& nameArg,
25b37003a7SShuo Chen                             int sockfd,
26b37003a7SShuo Chen                             const InetAddress& localAddr,
27b37003a7SShuo Chen                             const InetAddress& peerAddr)
28b37003a7SShuo Chen  : loop_(CHECK_NOTNULL(loop)),
29b37003a7SShuo Chen    name_(nameArg),
30b37003a7SShuo Chen    state_(kConnecting),
31b37003a7SShuo Chen    socket_(new Socket(sockfd)),
32b37003a7SShuo Chen    channel_(new Channel(loop, sockfd)),
33b37003a7SShuo Chen    localAddr_(localAddr),
34b37003a7SShuo Chen    peerAddr_(peerAddr)
35b37003a7SShuo Chen{
36b37003a7SShuo Chen  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
37b37003a7SShuo Chen            << " fd=" << sockfd;
38b37003a7SShuo Chen  channel_->setReadCallback(
39b37003a7SShuo Chen      boost::bind(&TcpConnection::handleRead, this, _1));
40b37003a7SShuo Chen  channel_->setWriteCallback(
41b37003a7SShuo Chen      boost::bind(&TcpConnection::handleWrite, this));
42b37003a7SShuo Chen  channel_->setCloseCallback(
43b37003a7SShuo Chen      boost::bind(&TcpConnection::handleClose, this));
44b37003a7SShuo Chen  channel_->setErrorCallback(
45b37003a7SShuo Chen      boost::bind(&TcpConnection::handleError, this));
46b37003a7SShuo Chen}
47b37003a7SShuo Chen
48b37003a7SShuo ChenTcpConnection::~TcpConnection()
49b37003a7SShuo Chen{
50b37003a7SShuo Chen  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
51b37003a7SShuo Chen            << " fd=" << channel_->fd();
52b37003a7SShuo Chen}
53b37003a7SShuo Chen
54b37003a7SShuo Chenvoid TcpConnection::send(const std::string& message)
55b37003a7SShuo Chen{
56b37003a7SShuo Chen  if (state_ == kConnected) {
57b37003a7SShuo Chen    if (loop_->isInLoopThread()) {
58b37003a7SShuo Chen      sendInLoop(message);
59b37003a7SShuo Chen    } else {
60b37003a7SShuo Chen      loop_->runInLoop(
61b37003a7SShuo Chen          boost::bind(&TcpConnection::sendInLoop, this, message));
62b37003a7SShuo Chen    }
63b37003a7SShuo Chen  }
64b37003a7SShuo Chen}
65b37003a7SShuo Chen
66b37003a7SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message)
67b37003a7SShuo Chen{
68b37003a7SShuo Chen  loop_->assertInLoopThread();
69b37003a7SShuo Chen  ssize_t nwrote = 0;
70b37003a7SShuo Chen  // if no thing in output queue, try writing directly
71b37003a7SShuo Chen  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
72b37003a7SShuo Chen    nwrote = ::write(channel_->fd(), message.data(), message.size());
73b37003a7SShuo Chen    if (nwrote >= 0) {
74b37003a7SShuo Chen      if (implicit_cast<size_t>(nwrote) < message.size()) {
75b37003a7SShuo Chen        LOG_TRACE << "I am going to write more data";
7613e937ffSShuo Chen      } else if (writeCompleteCallback_) {
7713e937ffSShuo Chen        loop_->queueInLoop(
7813e937ffSShuo Chen            boost::bind(writeCompleteCallback_, shared_from_this()));
79b37003a7SShuo Chen      }
80b37003a7SShuo Chen    } else {
81b37003a7SShuo Chen      nwrote = 0;
82b37003a7SShuo Chen      if (errno != EWOULDBLOCK) {
83b37003a7SShuo Chen        LOG_SYSERR << "TcpConnection::sendInLoop";
84b37003a7SShuo Chen      }
85b37003a7SShuo Chen    }
86b37003a7SShuo Chen  }
87b37003a7SShuo Chen
88b37003a7SShuo Chen  assert(nwrote >= 0);
89b37003a7SShuo Chen  if (implicit_cast<size_t>(nwrote) < message.size()) {
90b37003a7SShuo Chen    outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
91b37003a7SShuo Chen    if (!channel_->isWriting()) {
92b37003a7SShuo Chen      channel_->enableWriting();
93b37003a7SShuo Chen    }
94b37003a7SShuo Chen  }
95b37003a7SShuo Chen}
96b37003a7SShuo Chen
97b37003a7SShuo Chenvoid TcpConnection::shutdown()
98b37003a7SShuo Chen{
99b37003a7SShuo Chen  // FIXME: use compare and swap
100b37003a7SShuo Chen  if (state_ == kConnected)
101b37003a7SShuo Chen  {
102b37003a7SShuo Chen    setState(kDisconnecting);
103b37003a7SShuo Chen    // FIXME: shared_from_this()?
104b37003a7SShuo Chen    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
105b37003a7SShuo Chen  }
106b37003a7SShuo Chen}
107b37003a7SShuo Chen
108b37003a7SShuo Chenvoid TcpConnection::shutdownInLoop()
109b37003a7SShuo Chen{
110b37003a7SShuo Chen  loop_->assertInLoopThread();
111b37003a7SShuo Chen  if (!channel_->isWriting())
112b37003a7SShuo Chen  {
113b37003a7SShuo Chen    // we are not writing
114b37003a7SShuo Chen    socket_->shutdownWrite();
115b37003a7SShuo Chen  }
116b37003a7SShuo Chen}
117b37003a7SShuo Chen
11813e937ffSShuo Chenvoid TcpConnection::setTcpNoDelay(bool on)
11913e937ffSShuo Chen{
12013e937ffSShuo Chen  socket_->setTcpNoDelay(on);
12113e937ffSShuo Chen}
12213e937ffSShuo Chen
123b37003a7SShuo Chenvoid TcpConnection::connectEstablished()
124b37003a7SShuo Chen{
125b37003a7SShuo Chen  loop_->assertInLoopThread();
126b37003a7SShuo Chen  assert(state_ == kConnecting);
127b37003a7SShuo Chen  setState(kConnected);
128b37003a7SShuo Chen  channel_->enableReading();
129b37003a7SShuo Chen  connectionCallback_(shared_from_this());
130b37003a7SShuo Chen}
131b37003a7SShuo Chen
132b37003a7SShuo Chenvoid TcpConnection::connectDestroyed()
133b37003a7SShuo Chen{
134b37003a7SShuo Chen  loop_->assertInLoopThread();
135b37003a7SShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
136b37003a7SShuo Chen  setState(kDisconnected);
137b37003a7SShuo Chen  channel_->disableAll();
138b37003a7SShuo Chen  connectionCallback_(shared_from_this());
139b37003a7SShuo Chen
140b37003a7SShuo Chen  loop_->removeChannel(get_pointer(channel_));
141b37003a7SShuo Chen}
142b37003a7SShuo Chen
143b37003a7SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime)
144b37003a7SShuo Chen{
145b37003a7SShuo Chen  int savedErrno = 0;
146b37003a7SShuo Chen  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
147b37003a7SShuo Chen  if (n > 0) {
148b37003a7SShuo Chen    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
149b37003a7SShuo Chen  } else if (n == 0) {
150b37003a7SShuo Chen    handleClose();
151b37003a7SShuo Chen  } else {
1520dd528a5SShuo Chen    errno = savedErrno;
1530dd528a5SShuo Chen    LOG_SYSERR << "TcpConnection::handleRead";
154b37003a7SShuo Chen    handleError();
155b37003a7SShuo Chen  }
156b37003a7SShuo Chen}
157b37003a7SShuo Chen
158b37003a7SShuo Chenvoid TcpConnection::handleWrite()
159b37003a7SShuo Chen{
160b37003a7SShuo Chen  loop_->assertInLoopThread();
161b37003a7SShuo Chen  if (channel_->isWriting()) {
162b37003a7SShuo Chen    ssize_t n = ::write(channel_->fd(),
163b37003a7SShuo Chen                        outputBuffer_.peek(),
164b37003a7SShuo Chen                        outputBuffer_.readableBytes());
165b37003a7SShuo Chen    if (n > 0) {
166b37003a7SShuo Chen      outputBuffer_.retrieve(n);
167b37003a7SShuo Chen      if (outputBuffer_.readableBytes() == 0) {
168b37003a7SShuo Chen        channel_->disableWriting();
16913e937ffSShuo Chen        if (writeCompleteCallback_) {
17013e937ffSShuo Chen          loop_->queueInLoop(
17113e937ffSShuo Chen              boost::bind(writeCompleteCallback_, shared_from_this()));
17213e937ffSShuo Chen        }
173b37003a7SShuo Chen        if (state_ == kDisconnecting) {
174b37003a7SShuo Chen          shutdownInLoop();
175b37003a7SShuo Chen        }
176b37003a7SShuo Chen      } else {
177b37003a7SShuo Chen        LOG_TRACE << "I am going to write more data";
178b37003a7SShuo Chen      }
179b37003a7SShuo Chen    } else {
180b37003a7SShuo Chen      LOG_SYSERR << "TcpConnection::handleWrite";
181b37003a7SShuo Chen    }
182b37003a7SShuo Chen  } else {
183b37003a7SShuo Chen    LOG_TRACE << "Connection is down, no more writing";
184b37003a7SShuo Chen  }
185b37003a7SShuo Chen}
186b37003a7SShuo Chen
187b37003a7SShuo Chenvoid TcpConnection::handleClose()
188b37003a7SShuo Chen{
189b37003a7SShuo Chen  loop_->assertInLoopThread();
190b37003a7SShuo Chen  LOG_TRACE << "TcpConnection::handleClose state = " << state_;
191b37003a7SShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
192b37003a7SShuo Chen  // we don't close fd, leave it to dtor, so we can find leaks easily.
193b37003a7SShuo Chen  channel_->disableAll();
194b37003a7SShuo Chen  // must be the last line
195b37003a7SShuo Chen  closeCallback_(shared_from_this());
196b37003a7SShuo Chen}
197b37003a7SShuo Chen
198b37003a7SShuo Chenvoid TcpConnection::handleError()
199b37003a7SShuo Chen{
200b37003a7SShuo Chen  int err = sockets::getSocketError(channel_->fd());
201b37003a7SShuo Chen  LOG_ERROR << "TcpConnection::handleError [" << name_
202b37003a7SShuo Chen            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
203b37003a7SShuo Chen}
204