1// excerpts from http://code.google.com/p/muduo/
2//
3// Use of this source code is governed by a BSD-style license
4// that can be found in the License file.
5//
6// Author: Shuo Chen (chenshuo at chenshuo dot com)
7
8#include "TcpConnection.h"
9
10#include "logging/Logging.h"
11#include "Channel.h"
12#include "EventLoop.h"
13#include "Socket.h"
14#include "SocketsOps.h"
15
16#include <boost/bind.hpp>
17
18#include <errno.h>
19#include <stdio.h>
20
21using namespace muduo;
22
23TcpConnection::TcpConnection(EventLoop* loop,
24                             const std::string& nameArg,
25                             int sockfd,
26                             const InetAddress& localAddr,
27                             const InetAddress& peerAddr)
28  : loop_(CHECK_NOTNULL(loop)),
29    name_(nameArg),
30    state_(kConnecting),
31    socket_(new Socket(sockfd)),
32    channel_(new Channel(loop, sockfd)),
33    localAddr_(localAddr),
34    peerAddr_(peerAddr)
35{
36  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
37            << " fd=" << sockfd;
38  channel_->setReadCallback(
39      boost::bind(&TcpConnection::handleRead, this, _1));
40  channel_->setWriteCallback(
41      boost::bind(&TcpConnection::handleWrite, this));
42  channel_->setCloseCallback(
43      boost::bind(&TcpConnection::handleClose, this));
44  channel_->setErrorCallback(
45      boost::bind(&TcpConnection::handleError, this));
46}
47
48TcpConnection::~TcpConnection()
49{
50  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
51            << " fd=" << channel_->fd();
52}
53
54void TcpConnection::send(const std::string& message)
55{
56  if (state_ == kConnected) {
57    if (loop_->isInLoopThread()) {
58      sendInLoop(message);
59    } else {
60      loop_->runInLoop(
61          boost::bind(&TcpConnection::sendInLoop, this, message));
62    }
63  }
64}
65
66void TcpConnection::sendInLoop(const std::string& message)
67{
68  loop_->assertInLoopThread();
69  ssize_t nwrote = 0;
70  // if no thing in output queue, try writing directly
71  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
72    nwrote = ::write(channel_->fd(), message.data(), message.size());
73    if (nwrote >= 0) {
74      if (implicit_cast<size_t>(nwrote) < message.size()) {
75        LOG_TRACE << "I am going to write more data";
76      }
77    } else {
78      nwrote = 0;
79      if (errno != EWOULDBLOCK) {
80        LOG_SYSERR << "TcpConnection::sendInLoop";
81      }
82    }
83  }
84
85  assert(nwrote >= 0);
86  if (implicit_cast<size_t>(nwrote) < message.size()) {
87    outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
88    if (!channel_->isWriting()) {
89      channel_->enableWriting();
90    }
91  }
92}
93
94void TcpConnection::shutdown()
95{
96  // FIXME: use compare and swap
97  if (state_ == kConnected)
98  {
99    setState(kDisconnecting);
100    // FIXME: shared_from_this()?
101    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
102  }
103}
104
105void TcpConnection::shutdownInLoop()
106{
107  loop_->assertInLoopThread();
108  if (!channel_->isWriting())
109  {
110    // we are not writing
111    socket_->shutdownWrite();
112  }
113}
114
115void TcpConnection::connectEstablished()
116{
117  loop_->assertInLoopThread();
118  assert(state_ == kConnecting);
119  setState(kConnected);
120  channel_->enableReading();
121  connectionCallback_(shared_from_this());
122}
123
124void TcpConnection::connectDestroyed()
125{
126  loop_->assertInLoopThread();
127  assert(state_ == kConnected || state_ == kDisconnecting);
128  setState(kDisconnected);
129  channel_->disableAll();
130  connectionCallback_(shared_from_this());
131
132  loop_->removeChannel(get_pointer(channel_));
133}
134
135void TcpConnection::handleRead(Timestamp receiveTime)
136{
137  int savedErrno = 0;
138  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
139  if (n > 0) {
140    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
141  } else if (n == 0) {
142    handleClose();
143  } else {
144    errno = savedErrno;
145    LOG_SYSERR << "TcpConnection::handleRead";
146    handleError();
147  }
148}
149
150void TcpConnection::handleWrite()
151{
152  loop_->assertInLoopThread();
153  if (channel_->isWriting()) {
154    ssize_t n = ::write(channel_->fd(),
155                        outputBuffer_.peek(),
156                        outputBuffer_.readableBytes());
157    if (n > 0) {
158      outputBuffer_.retrieve(n);
159      if (outputBuffer_.readableBytes() == 0) {
160        channel_->disableWriting();
161        if (state_ == kDisconnecting) {
162          shutdownInLoop();
163        }
164      } else {
165        LOG_TRACE << "I am going to write more data";
166      }
167    } else {
168      LOG_SYSERR << "TcpConnection::handleWrite";
169    }
170  } else {
171    LOG_TRACE << "Connection is down, no more writing";
172  }
173}
174
175void TcpConnection::handleClose()
176{
177  loop_->assertInLoopThread();
178  LOG_TRACE << "TcpConnection::handleClose state = " << state_;
179  assert(state_ == kConnected || state_ == kDisconnecting);
180  // we don't close fd, leave it to dtor, so we can find leaks easily.
181  channel_->disableAll();
182  // must be the last line
183  closeCallback_(shared_from_this());
184}
185
186void TcpConnection::handleError()
187{
188  int err = sockets::getSocketError(channel_->fd());
189  LOG_ERROR << "TcpConnection::handleError [" << name_
190            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
191}
192