// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
7354280cfSShuo Chen
8354280cfSShuo Chen#include "TcpConnection.h"
9354280cfSShuo Chen
10354280cfSShuo Chen#include "logging/Logging.h"
11354280cfSShuo Chen#include "Channel.h"
12354280cfSShuo Chen#include "EventLoop.h"
13354280cfSShuo Chen#include "Socket.h"
14354280cfSShuo Chen#include "SocketsOps.h"
15354280cfSShuo Chen
16354280cfSShuo Chen#include <boost/bind.hpp>
17354280cfSShuo Chen
18354280cfSShuo Chen#include <errno.h>
19354280cfSShuo Chen#include <stdio.h>
20354280cfSShuo Chen
21354280cfSShuo Chenusing namespace muduo;
22354280cfSShuo Chen
23354280cfSShuo ChenTcpConnection::TcpConnection(EventLoop* loop,
24354280cfSShuo Chen                             const std::string& nameArg,
25354280cfSShuo Chen                             int sockfd,
26354280cfSShuo Chen                             const InetAddress& localAddr,
27354280cfSShuo Chen                             const InetAddress& peerAddr)
28354280cfSShuo Chen  : loop_(CHECK_NOTNULL(loop)),
29354280cfSShuo Chen    name_(nameArg),
30354280cfSShuo Chen    state_(kConnecting),
31354280cfSShuo Chen    socket_(new Socket(sockfd)),
32354280cfSShuo Chen    channel_(new Channel(loop, sockfd)),
33354280cfSShuo Chen    localAddr_(localAddr),
34354280cfSShuo Chen    peerAddr_(peerAddr)
35354280cfSShuo Chen{
36354280cfSShuo Chen  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
37354280cfSShuo Chen            << " fd=" << sockfd;
38354280cfSShuo Chen  channel_->setReadCallback(
39354280cfSShuo Chen      boost::bind(&TcpConnection::handleRead, this, _1));
40354280cfSShuo Chen  channel_->setWriteCallback(
41354280cfSShuo Chen      boost::bind(&TcpConnection::handleWrite, this));
42354280cfSShuo Chen  channel_->setCloseCallback(
43354280cfSShuo Chen      boost::bind(&TcpConnection::handleClose, this));
44354280cfSShuo Chen  channel_->setErrorCallback(
45354280cfSShuo Chen      boost::bind(&TcpConnection::handleError, this));
46354280cfSShuo Chen}
47354280cfSShuo Chen
48354280cfSShuo ChenTcpConnection::~TcpConnection()
49354280cfSShuo Chen{
50354280cfSShuo Chen  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
51354280cfSShuo Chen            << " fd=" << channel_->fd();
52354280cfSShuo Chen}
53354280cfSShuo Chen
54354280cfSShuo Chenvoid TcpConnection::send(const std::string& message)
55354280cfSShuo Chen{
56354280cfSShuo Chen  if (state_ == kConnected) {
57354280cfSShuo Chen    if (loop_->isInLoopThread()) {
58354280cfSShuo Chen      sendInLoop(message);
59354280cfSShuo Chen    } else {
60354280cfSShuo Chen      loop_->runInLoop(
61354280cfSShuo Chen          boost::bind(&TcpConnection::sendInLoop, this, message));
62354280cfSShuo Chen    }
63354280cfSShuo Chen  }
64354280cfSShuo Chen}
65354280cfSShuo Chen
66354280cfSShuo Chenvoid TcpConnection::sendInLoop(const std::string& message)
67354280cfSShuo Chen{
68354280cfSShuo Chen  loop_->assertInLoopThread();
69354280cfSShuo Chen  ssize_t nwrote = 0;
70354280cfSShuo Chen  // if no thing in output queue, try writing directly
71354280cfSShuo Chen  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
72354280cfSShuo Chen    nwrote = ::write(channel_->fd(), message.data(), message.size());
73354280cfSShuo Chen    if (nwrote >= 0) {
74354280cfSShuo Chen      if (implicit_cast<size_t>(nwrote) < message.size()) {
75354280cfSShuo Chen        LOG_TRACE << "I am going to write more data";
76354280cfSShuo Chen      } else if (writeCompleteCallback_) {
77354280cfSShuo Chen        loop_->queueInLoop(
78354280cfSShuo Chen            boost::bind(writeCompleteCallback_, shared_from_this()));
79354280cfSShuo Chen      }
80354280cfSShuo Chen    } else {
81354280cfSShuo Chen      nwrote = 0;
82354280cfSShuo Chen      if (errno != EWOULDBLOCK) {
83354280cfSShuo Chen        LOG_SYSERR << "TcpConnection::sendInLoop";
84354280cfSShuo Chen      }
85354280cfSShuo Chen    }
86354280cfSShuo Chen  }
87354280cfSShuo Chen
88354280cfSShuo Chen  assert(nwrote >= 0);
89354280cfSShuo Chen  if (implicit_cast<size_t>(nwrote) < message.size()) {
90354280cfSShuo Chen    outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
91354280cfSShuo Chen    if (!channel_->isWriting()) {
92354280cfSShuo Chen      channel_->enableWriting();
93354280cfSShuo Chen    }
94354280cfSShuo Chen  }
95354280cfSShuo Chen}
96354280cfSShuo Chen
97354280cfSShuo Chenvoid TcpConnection::shutdown()
98354280cfSShuo Chen{
99354280cfSShuo Chen  // FIXME: use compare and swap
100354280cfSShuo Chen  if (state_ == kConnected)
101354280cfSShuo Chen  {
102354280cfSShuo Chen    setState(kDisconnecting);
103354280cfSShuo Chen    // FIXME: shared_from_this()?
104354280cfSShuo Chen    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
105354280cfSShuo Chen  }
106354280cfSShuo Chen}
107354280cfSShuo Chen
108354280cfSShuo Chenvoid TcpConnection::shutdownInLoop()
109354280cfSShuo Chen{
110354280cfSShuo Chen  loop_->assertInLoopThread();
111354280cfSShuo Chen  if (!channel_->isWriting())
112354280cfSShuo Chen  {
113354280cfSShuo Chen    // we are not writing
114354280cfSShuo Chen    socket_->shutdownWrite();
115354280cfSShuo Chen  }
116354280cfSShuo Chen}
117354280cfSShuo Chen
118354280cfSShuo Chenvoid TcpConnection::setTcpNoDelay(bool on)
119354280cfSShuo Chen{
120354280cfSShuo Chen  socket_->setTcpNoDelay(on);
121354280cfSShuo Chen}
122354280cfSShuo Chen
123354280cfSShuo Chenvoid TcpConnection::connectEstablished()
124354280cfSShuo Chen{
125354280cfSShuo Chen  loop_->assertInLoopThread();
126354280cfSShuo Chen  assert(state_ == kConnecting);
127354280cfSShuo Chen  setState(kConnected);
128354280cfSShuo Chen  channel_->enableReading();
129354280cfSShuo Chen  connectionCallback_(shared_from_this());
130354280cfSShuo Chen}
131354280cfSShuo Chen
132354280cfSShuo Chenvoid TcpConnection::connectDestroyed()
133354280cfSShuo Chen{
134354280cfSShuo Chen  loop_->assertInLoopThread();
135354280cfSShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
136354280cfSShuo Chen  setState(kDisconnected);
137354280cfSShuo Chen  channel_->disableAll();
138354280cfSShuo Chen  connectionCallback_(shared_from_this());
139354280cfSShuo Chen
140354280cfSShuo Chen  loop_->removeChannel(get_pointer(channel_));
141354280cfSShuo Chen}
142354280cfSShuo Chen
143354280cfSShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime)
144354280cfSShuo Chen{
145354280cfSShuo Chen  int savedErrno = 0;
146354280cfSShuo Chen  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
147354280cfSShuo Chen  if (n > 0) {
148354280cfSShuo Chen    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
149354280cfSShuo Chen  } else if (n == 0) {
150354280cfSShuo Chen    handleClose();
151354280cfSShuo Chen  } else {
152354280cfSShuo Chen    errno = savedErrno;
153354280cfSShuo Chen    LOG_SYSERR << "TcpConnection::handleRead";
154354280cfSShuo Chen    handleError();
155354280cfSShuo Chen  }
156354280cfSShuo Chen}
157354280cfSShuo Chen
158354280cfSShuo Chenvoid TcpConnection::handleWrite()
159354280cfSShuo Chen{
160354280cfSShuo Chen  loop_->assertInLoopThread();
161354280cfSShuo Chen  if (channel_->isWriting()) {
162354280cfSShuo Chen    ssize_t n = ::write(channel_->fd(),
163354280cfSShuo Chen                        outputBuffer_.peek(),
164354280cfSShuo Chen                        outputBuffer_.readableBytes());
165354280cfSShuo Chen    if (n > 0) {
166354280cfSShuo Chen      outputBuffer_.retrieve(n);
167354280cfSShuo Chen      if (outputBuffer_.readableBytes() == 0) {
168354280cfSShuo Chen        channel_->disableWriting();
169354280cfSShuo Chen        if (writeCompleteCallback_) {
170354280cfSShuo Chen          loop_->queueInLoop(
171354280cfSShuo Chen              boost::bind(writeCompleteCallback_, shared_from_this()));
172354280cfSShuo Chen        }
173354280cfSShuo Chen        if (state_ == kDisconnecting) {
174354280cfSShuo Chen          shutdownInLoop();
175354280cfSShuo Chen        }
176354280cfSShuo Chen      } else {
177354280cfSShuo Chen        LOG_TRACE << "I am going to write more data";
178354280cfSShuo Chen      }
179354280cfSShuo Chen    } else {
180354280cfSShuo Chen      LOG_SYSERR << "TcpConnection::handleWrite";
181354280cfSShuo Chen    }
182354280cfSShuo Chen  } else {
183354280cfSShuo Chen    LOG_TRACE << "Connection is down, no more writing";
184354280cfSShuo Chen  }
185354280cfSShuo Chen}
186354280cfSShuo Chen
187354280cfSShuo Chenvoid TcpConnection::handleClose()
188354280cfSShuo Chen{
189354280cfSShuo Chen  loop_->assertInLoopThread();
190354280cfSShuo Chen  LOG_TRACE << "TcpConnection::handleClose state = " << state_;
191354280cfSShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
192354280cfSShuo Chen  // we don't close fd, leave it to dtor, so we can find leaks easily.
193354280cfSShuo Chen  channel_->disableAll();
194354280cfSShuo Chen  // must be the last line
195354280cfSShuo Chen  closeCallback_(shared_from_this());
196354280cfSShuo Chen}
197354280cfSShuo Chen
198354280cfSShuo Chenvoid TcpConnection::handleError()
199354280cfSShuo Chen{
200354280cfSShuo Chen  int err = sockets::getSocketError(channel_->fd());
201354280cfSShuo Chen  LOG_ERROR << "TcpConnection::handleError [" << name_
202354280cfSShuo Chen            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
203354280cfSShuo Chen}
204