1// excerpts from http://code.google.com/p/muduo/
2//
3// Use of this source code is governed by a BSD-style license
4// that can be found in the License file.
5//
6// Author: Shuo Chen (chenshuo at chenshuo dot com)
7
8#include "TcpConnection.h"
9
10#include "logging/Logging.h"
11#include "Channel.h"
12#include "EventLoop.h"
13#include "Socket.h"
14#include "SocketsOps.h"
15
16#include <boost/bind.hpp>
17
18#include <errno.h>
19#include <stdio.h>
20
21using namespace muduo;
22
23TcpConnection::TcpConnection(EventLoop* loop,
24                             const std::string& nameArg,
25                             int sockfd,
26                             const InetAddress& localAddr,
27                             const InetAddress& peerAddr)
28  : loop_(CHECK_NOTNULL(loop)),
29    name_(nameArg),
30    state_(kConnecting),
31    socket_(new Socket(sockfd)),
32    channel_(new Channel(loop, sockfd)),
33    localAddr_(localAddr),
34    peerAddr_(peerAddr)
35{
36  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
37            << " fd=" << sockfd;
38  channel_->setReadCallback(
39      boost::bind(&TcpConnection::handleRead, this, _1));
40  channel_->setWriteCallback(
41      boost::bind(&TcpConnection::handleWrite, this));
42  channel_->setCloseCallback(
43      boost::bind(&TcpConnection::handleClose, this));
44  channel_->setErrorCallback(
45      boost::bind(&TcpConnection::handleError, this));
46}
47
48TcpConnection::~TcpConnection()
49{
50  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
51            << " fd=" << channel_->fd();
52}
53
54void TcpConnection::send(const std::string& message)
55{
56  if (state_ == kConnected) {
57    if (loop_->isInLoopThread()) {
58      sendInLoop(message);
59    } else {
60      loop_->runInLoop(
61          boost::bind(&TcpConnection::sendInLoop, this, message));
62    }
63  }
64}
65
66void TcpConnection::sendInLoop(const std::string& message)
67{
68  loop_->assertInLoopThread();
69  ssize_t nwrote = 0;
70  // if no thing in output queue, try writing directly
71  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
72    nwrote = ::write(channel_->fd(), message.data(), message.size());
73    if (nwrote >= 0) {
74      if (implicit_cast<size_t>(nwrote) < message.size()) {
75        LOG_TRACE << "I am going to write more data";
76      } else if (writeCompleteCallback_) {
77        loop_->queueInLoop(
78            boost::bind(writeCompleteCallback_, shared_from_this()));
79      }
80    } else {
81      nwrote = 0;
82      if (errno != EWOULDBLOCK) {
83        LOG_SYSERR << "TcpConnection::sendInLoop";
84      }
85    }
86  }
87
88  assert(nwrote >= 0);
89  if (implicit_cast<size_t>(nwrote) < message.size()) {
90    outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
91    if (!channel_->isWriting()) {
92      channel_->enableWriting();
93    }
94  }
95}
96
97void TcpConnection::shutdown()
98{
99  // FIXME: use compare and swap
100  if (state_ == kConnected)
101  {
102    setState(kDisconnecting);
103    // FIXME: shared_from_this()?
104    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
105  }
106}
107
108void TcpConnection::shutdownInLoop()
109{
110  loop_->assertInLoopThread();
111  if (!channel_->isWriting())
112  {
113    // we are not writing
114    socket_->shutdownWrite();
115  }
116}
117
118void TcpConnection::setTcpNoDelay(bool on)
119{
120  socket_->setTcpNoDelay(on);
121}
122
123void TcpConnection::connectEstablished()
124{
125  loop_->assertInLoopThread();
126  assert(state_ == kConnecting);
127  setState(kConnected);
128  channel_->enableReading();
129  connectionCallback_(shared_from_this());
130}
131
132void TcpConnection::connectDestroyed()
133{
134  loop_->assertInLoopThread();
135  assert(state_ == kConnected || state_ == kDisconnecting);
136  setState(kDisconnected);
137  channel_->disableAll();
138  connectionCallback_(shared_from_this());
139
140  loop_->removeChannel(get_pointer(channel_));
141}
142
143void TcpConnection::handleRead(Timestamp receiveTime)
144{
145  int savedErrno = 0;
146  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
147  if (n > 0) {
148    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
149  } else if (n == 0) {
150    handleClose();
151  } else {
152    errno = savedErrno;
153    LOG_SYSERR << "TcpConnection::handleRead";
154    handleError();
155  }
156}
157
158void TcpConnection::handleWrite()
159{
160  loop_->assertInLoopThread();
161  if (channel_->isWriting()) {
162    ssize_t n = ::write(channel_->fd(),
163                        outputBuffer_.peek(),
164                        outputBuffer_.readableBytes());
165    if (n > 0) {
166      outputBuffer_.retrieve(n);
167      if (outputBuffer_.readableBytes() == 0) {
168        channel_->disableWriting();
169        if (writeCompleteCallback_) {
170          loop_->queueInLoop(
171              boost::bind(writeCompleteCallback_, shared_from_this()));
172        }
173        if (state_ == kDisconnecting) {
174          shutdownInLoop();
175        }
176      } else {
177        LOG_TRACE << "I am going to write more data";
178      }
179    } else {
180      LOG_SYSERR << "TcpConnection::handleWrite";
181    }
182  } else {
183    LOG_TRACE << "Connection is down, no more writing";
184  }
185}
186
187void TcpConnection::handleClose()
188{
189  loop_->assertInLoopThread();
190  LOG_TRACE << "TcpConnection::handleClose state = " << state_;
191  assert(state_ == kConnected || state_ == kDisconnecting);
192  // we don't close fd, leave it to dtor, so we can find leaks easily.
193  channel_->disableAll();
194  // must be the last line
195  closeCallback_(shared_from_this());
196}
197
198void TcpConnection::handleError()
199{
200  int err = sockets::getSocketError(channel_->fd());
201  LOG_ERROR << "TcpConnection::handleError [" << name_
202            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
203}
204