1a1bde736SShuo Chen// excerpts from http://code.google.com/p/muduo/
2a1bde736SShuo Chen//
3a1bde736SShuo Chen// Use of this source code is governed by a BSD-style license
4a1bde736SShuo Chen// that can be found in the License file.
5a1bde736SShuo Chen//
6a1bde736SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
7a1bde736SShuo Chen
8a1bde736SShuo Chen#include "TcpConnection.h"
9a1bde736SShuo Chen
10a1bde736SShuo Chen#include "logging/Logging.h"
11a1bde736SShuo Chen#include "Channel.h"
12a1bde736SShuo Chen#include "EventLoop.h"
13a1bde736SShuo Chen#include "Socket.h"
14a1bde736SShuo Chen#include "SocketsOps.h"
15a1bde736SShuo Chen
16a1bde736SShuo Chen#include <boost/bind.hpp>
17a1bde736SShuo Chen
18a1bde736SShuo Chen#include <errno.h>
19a1bde736SShuo Chen#include <stdio.h>
20a1bde736SShuo Chen
21a1bde736SShuo Chenusing namespace muduo;
22a1bde736SShuo Chen
23a1bde736SShuo ChenTcpConnection::TcpConnection(EventLoop* loop,
24a1bde736SShuo Chen                             const std::string& nameArg,
25a1bde736SShuo Chen                             int sockfd,
26a1bde736SShuo Chen                             const InetAddress& localAddr,
27a1bde736SShuo Chen                             const InetAddress& peerAddr)
28a1bde736SShuo Chen  : loop_(CHECK_NOTNULL(loop)),
29a1bde736SShuo Chen    name_(nameArg),
30a1bde736SShuo Chen    state_(kConnecting),
31a1bde736SShuo Chen    socket_(new Socket(sockfd)),
32a1bde736SShuo Chen    channel_(new Channel(loop, sockfd)),
33a1bde736SShuo Chen    localAddr_(localAddr),
34a1bde736SShuo Chen    peerAddr_(peerAddr)
35a1bde736SShuo Chen{
36a1bde736SShuo Chen  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
37a1bde736SShuo Chen            << " fd=" << sockfd;
38a1bde736SShuo Chen  channel_->setReadCallback(
39a1bde736SShuo Chen      boost::bind(&TcpConnection::handleRead, this, _1));
40a1bde736SShuo Chen  channel_->setWriteCallback(
41a1bde736SShuo Chen      boost::bind(&TcpConnection::handleWrite, this));
42a1bde736SShuo Chen  channel_->setCloseCallback(
43a1bde736SShuo Chen      boost::bind(&TcpConnection::handleClose, this));
44a1bde736SShuo Chen  channel_->setErrorCallback(
45a1bde736SShuo Chen      boost::bind(&TcpConnection::handleError, this));
46a1bde736SShuo Chen}
47a1bde736SShuo Chen
48a1bde736SShuo ChenTcpConnection::~TcpConnection()
49a1bde736SShuo Chen{
50a1bde736SShuo Chen  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
51a1bde736SShuo Chen            << " fd=" << channel_->fd();
52a1bde736SShuo Chen}
53a1bde736SShuo Chen
54a1bde736SShuo Chenvoid TcpConnection::send(const std::string& message)
55a1bde736SShuo Chen{
56a1bde736SShuo Chen  if (state_ == kConnected) {
57a1bde736SShuo Chen    if (loop_->isInLoopThread()) {
58a1bde736SShuo Chen      sendInLoop(message);
59a1bde736SShuo Chen    } else {
60a1bde736SShuo Chen      loop_->runInLoop(
61a1bde736SShuo Chen          boost::bind(&TcpConnection::sendInLoop, this, message));
62a1bde736SShuo Chen    }
63a1bde736SShuo Chen  }
64a1bde736SShuo Chen}
65a1bde736SShuo Chen
66a1bde736SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message)
67a1bde736SShuo Chen{
68a1bde736SShuo Chen  loop_->assertInLoopThread();
69a1bde736SShuo Chen  ssize_t nwrote = 0;
70a1bde736SShuo Chen  // if no thing in output queue, try writing directly
71a1bde736SShuo Chen  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
72a1bde736SShuo Chen    nwrote = ::write(channel_->fd(), message.data(), message.size());
73a1bde736SShuo Chen    if (nwrote >= 0) {
74a1bde736SShuo Chen      if (implicit_cast<size_t>(nwrote) < message.size()) {
75a1bde736SShuo Chen        LOG_TRACE << "I am going to write more data";
76a1bde736SShuo Chen      } else if (writeCompleteCallback_) {
77a1bde736SShuo Chen        loop_->queueInLoop(
78a1bde736SShuo Chen            boost::bind(writeCompleteCallback_, shared_from_this()));
79a1bde736SShuo Chen      }
80a1bde736SShuo Chen    } else {
81a1bde736SShuo Chen      nwrote = 0;
82a1bde736SShuo Chen      if (errno != EWOULDBLOCK) {
83a1bde736SShuo Chen        LOG_SYSERR << "TcpConnection::sendInLoop";
84a1bde736SShuo Chen      }
85a1bde736SShuo Chen    }
86a1bde736SShuo Chen  }
87a1bde736SShuo Chen
88a1bde736SShuo Chen  assert(nwrote >= 0);
89a1bde736SShuo Chen  if (implicit_cast<size_t>(nwrote) < message.size()) {
90a1bde736SShuo Chen    outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
91a1bde736SShuo Chen    if (!channel_->isWriting()) {
92a1bde736SShuo Chen      channel_->enableWriting();
93a1bde736SShuo Chen    }
94a1bde736SShuo Chen  }
95a1bde736SShuo Chen}
96a1bde736SShuo Chen
97a1bde736SShuo Chenvoid TcpConnection::shutdown()
98a1bde736SShuo Chen{
99a1bde736SShuo Chen  // FIXME: use compare and swap
100a1bde736SShuo Chen  if (state_ == kConnected)
101a1bde736SShuo Chen  {
102a1bde736SShuo Chen    setState(kDisconnecting);
103a1bde736SShuo Chen    // FIXME: shared_from_this()?
104a1bde736SShuo Chen    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
105a1bde736SShuo Chen  }
106a1bde736SShuo Chen}
107a1bde736SShuo Chen
108a1bde736SShuo Chenvoid TcpConnection::shutdownInLoop()
109a1bde736SShuo Chen{
110a1bde736SShuo Chen  loop_->assertInLoopThread();
111a1bde736SShuo Chen  if (!channel_->isWriting())
112a1bde736SShuo Chen  {
113a1bde736SShuo Chen    // we are not writing
114a1bde736SShuo Chen    socket_->shutdownWrite();
115a1bde736SShuo Chen  }
116a1bde736SShuo Chen}
117a1bde736SShuo Chen
118a1bde736SShuo Chenvoid TcpConnection::setTcpNoDelay(bool on)
119a1bde736SShuo Chen{
120a1bde736SShuo Chen  socket_->setTcpNoDelay(on);
121a1bde736SShuo Chen}
122a1bde736SShuo Chen
123a1bde736SShuo Chenvoid TcpConnection::connectEstablished()
124a1bde736SShuo Chen{
125a1bde736SShuo Chen  loop_->assertInLoopThread();
126a1bde736SShuo Chen  assert(state_ == kConnecting);
127a1bde736SShuo Chen  setState(kConnected);
128a1bde736SShuo Chen  channel_->enableReading();
129a1bde736SShuo Chen  connectionCallback_(shared_from_this());
130a1bde736SShuo Chen}
131a1bde736SShuo Chen
132a1bde736SShuo Chenvoid TcpConnection::connectDestroyed()
133a1bde736SShuo Chen{
134a1bde736SShuo Chen  loop_->assertInLoopThread();
135a1bde736SShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
136a1bde736SShuo Chen  setState(kDisconnected);
137a1bde736SShuo Chen  channel_->disableAll();
138a1bde736SShuo Chen  connectionCallback_(shared_from_this());
139a1bde736SShuo Chen
140a1bde736SShuo Chen  loop_->removeChannel(get_pointer(channel_));
141a1bde736SShuo Chen}
142a1bde736SShuo Chen
143a1bde736SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime)
144a1bde736SShuo Chen{
145a1bde736SShuo Chen  int savedErrno = 0;
146a1bde736SShuo Chen  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
147a1bde736SShuo Chen  if (n > 0) {
148a1bde736SShuo Chen    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
149a1bde736SShuo Chen  } else if (n == 0) {
150a1bde736SShuo Chen    handleClose();
151a1bde736SShuo Chen  } else {
152a1bde736SShuo Chen    errno = savedErrno;
153a1bde736SShuo Chen    LOG_SYSERR << "TcpConnection::handleRead";
154a1bde736SShuo Chen    handleError();
155a1bde736SShuo Chen  }
156a1bde736SShuo Chen}
157a1bde736SShuo Chen
158a1bde736SShuo Chenvoid TcpConnection::handleWrite()
159a1bde736SShuo Chen{
160a1bde736SShuo Chen  loop_->assertInLoopThread();
161a1bde736SShuo Chen  if (channel_->isWriting()) {
162a1bde736SShuo Chen    ssize_t n = ::write(channel_->fd(),
163a1bde736SShuo Chen                        outputBuffer_.peek(),
164a1bde736SShuo Chen                        outputBuffer_.readableBytes());
165a1bde736SShuo Chen    if (n > 0) {
166a1bde736SShuo Chen      outputBuffer_.retrieve(n);
167a1bde736SShuo Chen      if (outputBuffer_.readableBytes() == 0) {
168a1bde736SShuo Chen        channel_->disableWriting();
169a1bde736SShuo Chen        if (writeCompleteCallback_) {
170a1bde736SShuo Chen          loop_->queueInLoop(
171a1bde736SShuo Chen              boost::bind(writeCompleteCallback_, shared_from_this()));
172a1bde736SShuo Chen        }
173a1bde736SShuo Chen        if (state_ == kDisconnecting) {
174a1bde736SShuo Chen          shutdownInLoop();
175a1bde736SShuo Chen        }
176a1bde736SShuo Chen      } else {
177a1bde736SShuo Chen        LOG_TRACE << "I am going to write more data";
178a1bde736SShuo Chen      }
179a1bde736SShuo Chen    } else {
180a1bde736SShuo Chen      LOG_SYSERR << "TcpConnection::handleWrite";
181a1bde736SShuo Chen    }
182a1bde736SShuo Chen  } else {
183a1bde736SShuo Chen    LOG_TRACE << "Connection is down, no more writing";
184a1bde736SShuo Chen  }
185a1bde736SShuo Chen}
186a1bde736SShuo Chen
187a1bde736SShuo Chenvoid TcpConnection::handleClose()
188a1bde736SShuo Chen{
189a1bde736SShuo Chen  loop_->assertInLoopThread();
190a1bde736SShuo Chen  LOG_TRACE << "TcpConnection::handleClose state = " << state_;
191a1bde736SShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
192a1bde736SShuo Chen  // we don't close fd, leave it to dtor, so we can find leaks easily.
193a1bde736SShuo Chen  channel_->disableAll();
194a1bde736SShuo Chen  // must be the last line
195a1bde736SShuo Chen  closeCallback_(shared_from_this());
196a1bde736SShuo Chen}
197a1bde736SShuo Chen
198a1bde736SShuo Chenvoid TcpConnection::handleError()
199a1bde736SShuo Chen{
200a1bde736SShuo Chen  int err = sockets::getSocketError(channel_->fd());
201a1bde736SShuo Chen  LOG_ERROR << "TcpConnection::handleError [" << name_
202a1bde736SShuo Chen            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
203a1bde736SShuo Chen}
204