1 // excerpts from http://code.google.com/p/muduo/
2 //
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the License file.
5 //
6 // Author: Shuo Chen (chenshuo at chenshuo dot com)
7 
8 #include "TcpConnection.h"
9 
10 #include "logging/Logging.h"
11 #include "Channel.h"
12 #include "EventLoop.h"
13 #include "Socket.h"
14 #include "SocketsOps.h"
15 
16 #include <boost/bind.hpp>
17 
18 #include <errno.h>
19 #include <stdio.h>
20 
21 using namespace muduo;
22 
23 TcpConnection::TcpConnection(EventLoop* loop,
24                              const std::string& nameArg,
25                              int sockfd,
26                              const InetAddress& localAddr,
27                              const InetAddress& peerAddr)
28   : loop_(CHECK_NOTNULL(loop)),
29     name_(nameArg),
30     state_(kConnecting),
31     socket_(new Socket(sockfd)),
32     channel_(new Channel(loop, sockfd)),
33     localAddr_(localAddr),
34     peerAddr_(peerAddr)
35 {
36   LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
37             << " fd=" << sockfd;
38   channel_->setReadCallback(
39       boost::bind(&TcpConnection::handleRead, this, _1));
40   channel_->setWriteCallback(
41       boost::bind(&TcpConnection::handleWrite, this));
42   channel_->setCloseCallback(
43       boost::bind(&TcpConnection::handleClose, this));
44   channel_->setErrorCallback(
45       boost::bind(&TcpConnection::handleError, this));
46 }
47 
48 TcpConnection::~TcpConnection()
49 {
50   LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
51             << " fd=" << channel_->fd();
52 }
53 
54+void TcpConnection::send(const std::string& message)
55+{
56+  if (state_ == kConnected) {
57+    if (loop_->isInLoopThread()) {
58+      sendInLoop(message);
59+    } else {
60+      loop_->runInLoop(
61+          boost::bind(&TcpConnection::sendInLoop, this, message));
62+    }
63+  }
64+}
65+
66+void TcpConnection::sendInLoop(const std::string& message)
67+{
68+  loop_->assertInLoopThread();
69+  ssize_t nwrote = 0;
70+  // if no thing in output queue, try writing directly
71+  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
72+    nwrote = ::write(channel_->fd(), message.data(), message.size());
73+    if (nwrote >= 0) {
74+      if (implicit_cast<size_t>(nwrote) < message.size()) {
75+        LOG_TRACE << "I am going to write more data";
76+      }
77+    } else {
78+      nwrote = 0;
79+      if (errno != EWOULDBLOCK) {
80+        LOG_SYSERR << "TcpConnection::sendInLoop";
81+      }
82+    }
83+  }
84+
85+  assert(nwrote >= 0);
86+  if (implicit_cast<size_t>(nwrote) < message.size()) {
87+    outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);
88+    if (!channel_->isWriting()) {
89+      channel_->enableWriting();
90+    }
91+  }
92+}
93+
94+void TcpConnection::shutdown()
95+{
96+  // FIXME: use compare and swap
97+  if (state_ == kConnected)
98+  {
99+    setState(kDisconnecting);
100+    // FIXME: shared_from_this()?
101+    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
102+  }
103+}
104+
105+void TcpConnection::shutdownInLoop()
106+{
107+  loop_->assertInLoopThread();
108+  if (!channel_->isWriting())
109+  {
110+    // we are not writing
111+    socket_->shutdownWrite();
112+  }
113+}
114+
115 void TcpConnection::connectEstablished()
116 {
117   loop_->assertInLoopThread();
118   assert(state_ == kConnecting);
119   setState(kConnected);
120   channel_->enableReading();
121   connectionCallback_(shared_from_this());
122 }
123 
124 void TcpConnection::connectDestroyed()
125 {
126   loop_->assertInLoopThread();
127!  assert(state_ == kConnected || state_ == kDisconnecting);
128   setState(kDisconnected);
129   channel_->disableAll();
130   connectionCallback_(shared_from_this());
131 
132   loop_->removeChannel(get_pointer(channel_));
133 }
134 
135 void TcpConnection::handleRead(Timestamp receiveTime)
136 {
137   int savedErrno = 0;
138   ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
139   if (n > 0) {
140     messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
141   } else if (n == 0) {
142     handleClose();
143   } else {
144     // FIXME: check savedErrno
145     handleError();
146   }
147 }
148 
149 void TcpConnection::handleWrite()
150 {
151+  loop_->assertInLoopThread();
152+  if (channel_->isWriting()) {
153+    ssize_t n = ::write(channel_->fd(),
154+                        outputBuffer_.peek(),
155+                        outputBuffer_.readableBytes());
156+    if (n > 0) {
157+      outputBuffer_.retrieve(n);
158+      if (outputBuffer_.readableBytes() == 0) {
159+        channel_->disableWriting();
160+        if (state_ == kDisconnecting) {
161+          shutdownInLoop();
162+        }
163+      } else {
164+        LOG_TRACE << "I am going to write more data";
165+      }
166+    } else {
167+      LOG_SYSERR << "TcpConnection::handleWrite";
168+      abort();  // FIXME
169+    }
170+  } else {
171+    LOG_TRACE << "Connection is down, no more writing";
172+  }
173 }
174 
175 void TcpConnection::handleClose()
176 {
177   loop_->assertInLoopThread();
178   LOG_TRACE << "TcpConnection::handleClose state = " << state_;
179!  assert(state_ == kConnected || state_ == kDisconnecting);
180   // we don't close fd, leave it to dtor, so we can find leaks easily.
181   channel_->disableAll();
182   // must be the last line
183   closeCallback_(shared_from_this());
184 }
185 
186 void TcpConnection::handleError()
187 {
188   int err = sockets::getSocketError(channel_->fd());
189   LOG_ERROR << "TcpConnection::handleError [" << name_
190             << "] - SO_ERROR = " << err << " " << strerror_tl(err);
191 }
192