TcpConnection.cc revision b37003a7
1b37003a7SShuo Chen// excerpts from http://code.google.com/p/muduo/
2b37003a7SShuo Chen//
3b37003a7SShuo Chen// Use of this source code is governed by a BSD-style license
4b37003a7SShuo Chen// that can be found in the License file.
5b37003a7SShuo Chen//
6b37003a7SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
7b37003a7SShuo Chen
8b37003a7SShuo Chen#include "TcpConnection.h"
9b37003a7SShuo Chen
10b37003a7SShuo Chen#include "logging/Logging.h"
11b37003a7SShuo Chen#include "Channel.h"
12b37003a7SShuo Chen#include "EventLoop.h"
13b37003a7SShuo Chen#include "Socket.h"
14b37003a7SShuo Chen#include "SocketsOps.h"
15b37003a7SShuo Chen
16b37003a7SShuo Chen#include <boost/bind.hpp>
17b37003a7SShuo Chen
18b37003a7SShuo Chen#include <errno.h>
19b37003a7SShuo Chen#include <stdio.h>
20b37003a7SShuo Chen
21b37003a7SShuo Chenusing namespace muduo;
22b37003a7SShuo Chen
23b37003a7SShuo ChenTcpConnection::TcpConnection(EventLoop* loop,
24b37003a7SShuo Chen                             const std::string& nameArg,
25b37003a7SShuo Chen                             int sockfd,
26b37003a7SShuo Chen                             const InetAddress& localAddr,
27b37003a7SShuo Chen                             const InetAddress& peerAddr)
28b37003a7SShuo Chen  : loop_(CHECK_NOTNULL(loop)),
29b37003a7SShuo Chen    name_(nameArg),
30b37003a7SShuo Chen    state_(kConnecting),
31b37003a7SShuo Chen    socket_(new Socket(sockfd)),
32b37003a7SShuo Chen    channel_(new Channel(loop, sockfd)),
33b37003a7SShuo Chen    localAddr_(localAddr),
34b37003a7SShuo Chen    peerAddr_(peerAddr)
35b37003a7SShuo Chen{
36b37003a7SShuo Chen  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
37b37003a7SShuo Chen            << " fd=" << sockfd;
38b37003a7SShuo Chen  channel_->setReadCallback(
39b37003a7SShuo Chen      boost::bind(&TcpConnection::handleRead, this, _1));
40b37003a7SShuo Chen  channel_->setWriteCallback(
41b37003a7SShuo Chen      boost::bind(&TcpConnection::handleWrite, this));
42b37003a7SShuo Chen  channel_->setCloseCallback(
43b37003a7SShuo Chen      boost::bind(&TcpConnection::handleClose, this));
44b37003a7SShuo Chen  channel_->setErrorCallback(
45b37003a7SShuo Chen      boost::bind(&TcpConnection::handleError, this));
46b37003a7SShuo Chen}
47b37003a7SShuo Chen
48b37003a7SShuo ChenTcpConnection::~TcpConnection()
49b37003a7SShuo Chen{
50b37003a7SShuo Chen  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
51b37003a7SShuo Chen            << " fd=" << channel_->fd();
52b37003a7SShuo Chen}
53b37003a7SShuo Chen
54b37003a7SShuo Chenvoid TcpConnection::send(const std::string& message)
55b37003a7SShuo Chen{
56b37003a7SShuo Chen  if (state_ == kConnected) {
57b37003a7SShuo Chen    if (loop_->isInLoopThread()) {
58b37003a7SShuo Chen      sendInLoop(message);
59b37003a7SShuo Chen    } else {
60b37003a7SShuo Chen      loop_->runInLoop(
61b37003a7SShuo Chen          boost::bind(&TcpConnection::sendInLoop, this, message));
62b37003a7SShuo Chen    }
63b37003a7SShuo Chen  }
64b37003a7SShuo Chen}
65b37003a7SShuo Chen
66b37003a7SShuo Chenvoid TcpConnection::sendInLoop(const std::string& message)
67b37003a7SShuo Chen{
68b37003a7SShuo Chen  loop_->assertInLoopThread();
69b37003a7SShuo Chen  ssize_t nwrote = 0;
70b37003a7SShuo Chen  // if no thing in output queue, try writing directly
71b37003a7SShuo Chen  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
72b37003a7SShuo Chen    nwrote = ::write(channel_->fd(), message.data(), message.size());
73b37003a7SShuo Chen    if (nwrote >= 0) {
74b37003a7SShuo Chen      if (implicit_cast<size_t>(nwrote) < message.size()) {
75b37003a7SShuo Chen        LOG_TRACE << "I am going to write more data";
76b37003a7SShuo Chen      }
77b37003a7SShuo Chen    } else {
78b37003a7SShuo Chen      nwrote = 0;
79b37003a7SShuo Chen      if (errno != EWOULDBLOCK) {
80b37003a7SShuo Chen        LOG_SYSERR << "TcpConnection::sendInLoop";
81b37003a7SShuo Chen      }
82b37003a7SShuo Chen    }
83b37003a7SShuo Chen  }
84b37003a7SShuo Chen
85b37003a7SShuo Chen  assert(nwrote >= 0);
86b37003a7SShuo Chen  if (implicit_cast<size_t>(nwrote) < message.size()) {
87b37003a7SShuo Chen    outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
88b37003a7SShuo Chen    if (!channel_->isWriting()) {
89b37003a7SShuo Chen      channel_->enableWriting();
90b37003a7SShuo Chen    }
91b37003a7SShuo Chen  }
92b37003a7SShuo Chen}
93b37003a7SShuo Chen
94b37003a7SShuo Chenvoid TcpConnection::shutdown()
95b37003a7SShuo Chen{
96b37003a7SShuo Chen  // FIXME: use compare and swap
97b37003a7SShuo Chen  if (state_ == kConnected)
98b37003a7SShuo Chen  {
99b37003a7SShuo Chen    setState(kDisconnecting);
100b37003a7SShuo Chen    // FIXME: shared_from_this()?
101b37003a7SShuo Chen    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
102b37003a7SShuo Chen  }
103b37003a7SShuo Chen}
104b37003a7SShuo Chen
105b37003a7SShuo Chenvoid TcpConnection::shutdownInLoop()
106b37003a7SShuo Chen{
107b37003a7SShuo Chen  loop_->assertInLoopThread();
108b37003a7SShuo Chen  if (!channel_->isWriting())
109b37003a7SShuo Chen  {
110b37003a7SShuo Chen    // we are not writing
111b37003a7SShuo Chen    socket_->shutdownWrite();
112b37003a7SShuo Chen  }
113b37003a7SShuo Chen}
114b37003a7SShuo Chen
115b37003a7SShuo Chenvoid TcpConnection::connectEstablished()
116b37003a7SShuo Chen{
117b37003a7SShuo Chen  loop_->assertInLoopThread();
118b37003a7SShuo Chen  assert(state_ == kConnecting);
119b37003a7SShuo Chen  setState(kConnected);
120b37003a7SShuo Chen  channel_->enableReading();
121b37003a7SShuo Chen  connectionCallback_(shared_from_this());
122b37003a7SShuo Chen}
123b37003a7SShuo Chen
124b37003a7SShuo Chenvoid TcpConnection::connectDestroyed()
125b37003a7SShuo Chen{
126b37003a7SShuo Chen  loop_->assertInLoopThread();
127b37003a7SShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
128b37003a7SShuo Chen  setState(kDisconnected);
129b37003a7SShuo Chen  channel_->disableAll();
130b37003a7SShuo Chen  connectionCallback_(shared_from_this());
131b37003a7SShuo Chen
132b37003a7SShuo Chen  loop_->removeChannel(get_pointer(channel_));
133b37003a7SShuo Chen}
134b37003a7SShuo Chen
135b37003a7SShuo Chenvoid TcpConnection::handleRead(Timestamp receiveTime)
136b37003a7SShuo Chen{
137b37003a7SShuo Chen  int savedErrno = 0;
138b37003a7SShuo Chen  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
139b37003a7SShuo Chen  if (n > 0) {
140b37003a7SShuo Chen    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
141b37003a7SShuo Chen  } else if (n == 0) {
142b37003a7SShuo Chen    handleClose();
143b37003a7SShuo Chen  } else {
144b37003a7SShuo Chen    // FIXME: check savedErrno
145b37003a7SShuo Chen    handleError();
146b37003a7SShuo Chen  }
147b37003a7SShuo Chen}
148b37003a7SShuo Chen
149b37003a7SShuo Chenvoid TcpConnection::handleWrite()
150b37003a7SShuo Chen{
151b37003a7SShuo Chen  loop_->assertInLoopThread();
152b37003a7SShuo Chen  if (channel_->isWriting()) {
153b37003a7SShuo Chen    ssize_t n = ::write(channel_->fd(),
154b37003a7SShuo Chen                        outputBuffer_.peek(),
155b37003a7SShuo Chen                        outputBuffer_.readableBytes());
156b37003a7SShuo Chen    if (n > 0) {
157b37003a7SShuo Chen      outputBuffer_.retrieve(n);
158b37003a7SShuo Chen      if (outputBuffer_.readableBytes() == 0) {
159b37003a7SShuo Chen        channel_->disableWriting();
160b37003a7SShuo Chen        if (state_ == kDisconnecting) {
161b37003a7SShuo Chen          shutdownInLoop();
162b37003a7SShuo Chen        }
163b37003a7SShuo Chen      } else {
164b37003a7SShuo Chen        LOG_TRACE << "I am going to write more data";
165b37003a7SShuo Chen      }
166b37003a7SShuo Chen    } else {
167b37003a7SShuo Chen      LOG_SYSERR << "TcpConnection::handleWrite";
168b37003a7SShuo Chen      abort();  // FIXME
169b37003a7SShuo Chen    }
170b37003a7SShuo Chen  } else {
171b37003a7SShuo Chen    LOG_TRACE << "Connection is down, no more writing";
172b37003a7SShuo Chen  }
173b37003a7SShuo Chen}
174b37003a7SShuo Chen
175b37003a7SShuo Chenvoid TcpConnection::handleClose()
176b37003a7SShuo Chen{
177b37003a7SShuo Chen  loop_->assertInLoopThread();
178b37003a7SShuo Chen  LOG_TRACE << "TcpConnection::handleClose state = " << state_;
179b37003a7SShuo Chen  assert(state_ == kConnected || state_ == kDisconnecting);
180b37003a7SShuo Chen  // we don't close fd, leave it to dtor, so we can find leaks easily.
181b37003a7SShuo Chen  channel_->disableAll();
182b37003a7SShuo Chen  // must be the last line
183b37003a7SShuo Chen  closeCallback_(shared_from_this());
184b37003a7SShuo Chen}
185b37003a7SShuo Chen
186b37003a7SShuo Chenvoid TcpConnection::handleError()
187b37003a7SShuo Chen{
188b37003a7SShuo Chen  int err = sockets::getSocketError(channel_->fd());
189b37003a7SShuo Chen  LOG_ERROR << "TcpConnection::handleError [" << name_
190b37003a7SShuo Chen            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
191b37003a7SShuo Chen}
192