// excerpts from http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // // Author: Shuo Chen (chenshuo at chenshuo dot com) #include "TcpConnection.h" #include "logging/Logging.h" #include "Channel.h" #include "EventLoop.h" #include "Socket.h" #include "SocketsOps.h" #include #include #include using namespace muduo; TcpConnection::TcpConnection(EventLoop* loop, const std::string& nameArg, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr) : loop_(CHECK_NOTNULL(loop)), name_(nameArg), state_(kConnecting), socket_(new Socket(sockfd)), channel_(new Channel(loop, sockfd)), localAddr_(localAddr), peerAddr_(peerAddr) { LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this << " fd=" << sockfd; channel_->setReadCallback( boost::bind(&TcpConnection::handleRead, this, _1)); channel_->setWriteCallback( boost::bind(&TcpConnection::handleWrite, this)); channel_->setCloseCallback( boost::bind(&TcpConnection::handleClose, this)); channel_->setErrorCallback( boost::bind(&TcpConnection::handleError, this)); } TcpConnection::~TcpConnection() { LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this << " fd=" << channel_->fd(); } void TcpConnection::send(const std::string& message) { if (state_ == kConnected) { if (loop_->isInLoopThread()) { sendInLoop(message); } else { loop_->runInLoop( boost::bind(&TcpConnection::sendInLoop, this, message)); } } } void TcpConnection::sendInLoop(const std::string& message) { loop_->assertInLoopThread(); ssize_t nwrote = 0; // if no thing in output queue, try writing directly if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { nwrote = ::write(channel_->fd(), message.data(), message.size()); if (nwrote >= 0) { if (implicit_cast(nwrote) < message.size()) { LOG_TRACE << "I am going to write more data"; } else if (writeCompleteCallback_) { loop_->queueInLoop( boost::bind(writeCompleteCallback_, shared_from_this())); } } else { nwrote = 0; if (errno != EWOULDBLOCK) { LOG_SYSERR << "TcpConnection::sendInLoop"; } } } assert(nwrote >= 0); if (implicit_cast(nwrote) < message.size()) { outputBuffer_.append(message.data()+nwrote, message.size()-nwrote); if (!channel_->isWriting()) { channel_->enableWriting(); } } } void TcpConnection::shutdown() { // FIXME: use compare and swap if (state_ == kConnected) { setState(kDisconnecting); // FIXME: shared_from_this()? loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); } } void TcpConnection::shutdownInLoop() { loop_->assertInLoopThread(); if (!channel_->isWriting()) { // we are not writing socket_->shutdownWrite(); } } void TcpConnection::setTcpNoDelay(bool on) { socket_->setTcpNoDelay(on); } void TcpConnection::connectEstablished() { loop_->assertInLoopThread(); assert(state_ == kConnecting); setState(kConnected); channel_->enableReading(); connectionCallback_(shared_from_this()); } void TcpConnection::connectDestroyed() { loop_->assertInLoopThread(); assert(state_ == kConnected || state_ == kDisconnecting); setState(kDisconnected); channel_->disableAll(); connectionCallback_(shared_from_this()); loop_->removeChannel(get_pointer(channel_)); } void TcpConnection::handleRead(Timestamp receiveTime) { int savedErrno = 0; ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); if (n > 0) { messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) { handleClose(); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead"; handleError(); } } void TcpConnection::handleWrite() { loop_->assertInLoopThread(); if (channel_->isWriting()) { ssize_t n = ::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes()); if (n > 0) { outputBuffer_.retrieve(n); if (outputBuffer_.readableBytes() == 0) { channel_->disableWriting(); if (writeCompleteCallback_) { loop_->queueInLoop( boost::bind(writeCompleteCallback_, shared_from_this())); } if (state_ == kDisconnecting) { shutdownInLoop(); } } else { LOG_TRACE << "I am going to write more data"; } } else { LOG_SYSERR << "TcpConnection::handleWrite"; abort(); // FIXME } } else { LOG_TRACE << "Connection is down, no more writing"; } } void TcpConnection::handleClose() { loop_->assertInLoopThread(); LOG_TRACE << "TcpConnection::handleClose state = " << state_; assert(state_ == kConnected || state_ == kDisconnecting); // we don't close fd, leave it to dtor, so we can find leaks easily. channel_->disableAll(); // must be the last line closeCallback_(shared_from_this()); } void TcpConnection::handleError() { int err = sockets::getSocketError(channel_->fd()); LOG_ERROR << "TcpConnection::handleError [" << name_ << "] - SO_ERROR = " << err << " " << strerror_tl(err); }