Connector.cc revision fdb5c17c
1a1bde736SShuo Chen// excerpts from http://code.google.com/p/muduo/ 2a1bde736SShuo Chen// 3a1bde736SShuo Chen// Use of this source code is governed by a BSD-style license 4a1bde736SShuo Chen// that can be found in the License file. 5a1bde736SShuo Chen// 6a1bde736SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7a1bde736SShuo Chen 8a1bde736SShuo Chen#include "Connector.h" 9a1bde736SShuo Chen 10a1bde736SShuo Chen#include "Channel.h" 11a1bde736SShuo Chen#include "EventLoop.h" 12a1bde736SShuo Chen#include "SocketsOps.h" 13a1bde736SShuo Chen 14a1bde736SShuo Chen#include "logging/Logging.h" 15a1bde736SShuo Chen 16a1bde736SShuo Chen#include <boost/bind.hpp> 17a1bde736SShuo Chen 18a1bde736SShuo Chen#include <errno.h> 19a1bde736SShuo Chen 20a1bde736SShuo Chenusing namespace muduo; 21a1bde736SShuo Chen 22a1bde736SShuo Chenconst int Connector::kMaxRetryDelayMs; 23a1bde736SShuo Chen 24a1bde736SShuo ChenConnector::Connector(EventLoop* loop, const InetAddress& serverAddr) 25a1bde736SShuo Chen : loop_(loop), 26a1bde736SShuo Chen serverAddr_(serverAddr), 27a1bde736SShuo Chen connect_(false), 28a1bde736SShuo Chen state_(kDisconnected), 29a1bde736SShuo Chen retryDelayMs_(kInitRetryDelayMs) 30a1bde736SShuo Chen{ 31a1bde736SShuo Chen LOG_DEBUG << "ctor[" << this << "]"; 32a1bde736SShuo Chen} 33a1bde736SShuo Chen 34a1bde736SShuo ChenConnector::~Connector() 35a1bde736SShuo Chen{ 36a1bde736SShuo Chen LOG_DEBUG << "dtor[" << this << "]"; 37a1bde736SShuo Chen loop_->cancel(timerId_); 38a1bde736SShuo Chen assert(!channel_); 39a1bde736SShuo Chen} 40a1bde736SShuo Chen 41a1bde736SShuo Chenvoid Connector::start() 42a1bde736SShuo Chen{ 43a1bde736SShuo Chen connect_ = true; 44a1bde736SShuo Chen loop_->runInLoop(boost::bind(&Connector::startInLoop, this)); // FIXME: unsafe 45a1bde736SShuo Chen} 46a1bde736SShuo Chen 47a1bde736SShuo Chenvoid Connector::startInLoop() 48a1bde736SShuo Chen{ 49a1bde736SShuo Chen loop_->assertInLoopThread(); 50a1bde736SShuo Chen assert(state_ == kDisconnected); 51a1bde736SShuo Chen if (connect_) 52a1bde736SShuo Chen { 53a1bde736SShuo Chen connect(); 54a1bde736SShuo Chen } 55a1bde736SShuo Chen else 56a1bde736SShuo Chen { 57a1bde736SShuo Chen LOG_DEBUG << "do not connect"; 58a1bde736SShuo Chen } 59a1bde736SShuo Chen} 60a1bde736SShuo Chen 61a1bde736SShuo Chenvoid Connector::connect() 62a1bde736SShuo Chen{ 63a1bde736SShuo Chen int sockfd = sockets::createNonblockingOrDie(); 64a1bde736SShuo Chen int ret = sockets::connect(sockfd, serverAddr_.getSockAddrInet()); 65a1bde736SShuo Chen int savedErrno = (ret == 0) ? 0 : errno; 66a1bde736SShuo Chen switch (savedErrno) 67a1bde736SShuo Chen { 68a1bde736SShuo Chen case 0: 69a1bde736SShuo Chen case EINPROGRESS: 70a1bde736SShuo Chen case EINTR: 71a1bde736SShuo Chen case EISCONN: 72a1bde736SShuo Chen connecting(sockfd); 73a1bde736SShuo Chen break; 74a1bde736SShuo Chen 75a1bde736SShuo Chen case EAGAIN: 76a1bde736SShuo Chen case EADDRINUSE: 77a1bde736SShuo Chen case EADDRNOTAVAIL: 78a1bde736SShuo Chen case ECONNREFUSED: 79a1bde736SShuo Chen case ENETUNREACH: 80a1bde736SShuo Chen retry(sockfd); 81a1bde736SShuo Chen break; 82a1bde736SShuo Chen 83a1bde736SShuo Chen case EACCES: 84a1bde736SShuo Chen case EPERM: 85a1bde736SShuo Chen case EAFNOSUPPORT: 86a1bde736SShuo Chen case EALREADY: 87a1bde736SShuo Chen case EBADF: 88a1bde736SShuo Chen case EFAULT: 89a1bde736SShuo Chen case ENOTSOCK: 90a1bde736SShuo Chen LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno; 91a1bde736SShuo Chen sockets::close(sockfd); 92a1bde736SShuo Chen break; 93a1bde736SShuo Chen 94a1bde736SShuo Chen default: 95a1bde736SShuo Chen LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno; 96a1bde736SShuo Chen sockets::close(sockfd); 97a1bde736SShuo Chen // connectErrorCallback_(); 98a1bde736SShuo Chen break; 99a1bde736SShuo Chen } 100a1bde736SShuo Chen} 101a1bde736SShuo Chen 102a1bde736SShuo Chenvoid Connector::restart() 103a1bde736SShuo Chen{ 104a1bde736SShuo Chen loop_->assertInLoopThread(); 105a1bde736SShuo Chen setState(kDisconnected); 106a1bde736SShuo Chen retryDelayMs_ = kInitRetryDelayMs; 107a1bde736SShuo Chen connect_ = true; 108a1bde736SShuo Chen startInLoop(); 109a1bde736SShuo Chen} 110a1bde736SShuo Chen 111a1bde736SShuo Chenvoid Connector::stop() 112a1bde736SShuo Chen{ 113a1bde736SShuo Chen connect_ = false; 114fdb5c17cSShuo Chen loop_->cancel(timerId_); 115a1bde736SShuo Chen} 116a1bde736SShuo Chen 117a1bde736SShuo Chenvoid Connector::connecting(int sockfd) 118a1bde736SShuo Chen{ 119a1bde736SShuo Chen setState(kConnecting); 120a1bde736SShuo Chen assert(!channel_); 121a1bde736SShuo Chen channel_.reset(new Channel(loop_, sockfd)); 122a1bde736SShuo Chen channel_->setWriteCallback( 123a1bde736SShuo Chen boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe 124a1bde736SShuo Chen channel_->setErrorCallback( 125a1bde736SShuo Chen boost::bind(&Connector::handleError, this)); // FIXME: unsafe 126a1bde736SShuo Chen 127a1bde736SShuo Chen // channel_->tie(shared_from_this()); is not working, 128a1bde736SShuo Chen // as channel_ is not managed by shared_ptr 129a1bde736SShuo Chen channel_->enableWriting(); 130a1bde736SShuo Chen} 131a1bde736SShuo Chen 132a1bde736SShuo Chenint Connector::removeAndResetChannel() 133a1bde736SShuo Chen{ 134a1bde736SShuo Chen channel_->disableAll(); 135a1bde736SShuo Chen loop_->removeChannel(get_pointer(channel_)); 136a1bde736SShuo Chen int sockfd = channel_->fd(); 137a1bde736SShuo Chen // Can't reset channel_ here, because we are inside Channel::handleEvent 138a1bde736SShuo Chen loop_->queueInLoop(boost::bind(&Connector::resetChannel, this)); // FIXME: unsafe 139a1bde736SShuo Chen return sockfd; 140a1bde736SShuo Chen} 141a1bde736SShuo Chen 142a1bde736SShuo Chenvoid Connector::resetChannel() 143a1bde736SShuo Chen{ 144a1bde736SShuo Chen channel_.reset(); 145a1bde736SShuo Chen} 146a1bde736SShuo Chen 147a1bde736SShuo Chenvoid Connector::handleWrite() 148a1bde736SShuo Chen{ 149a1bde736SShuo Chen LOG_TRACE << "Connector::handleWrite " << state_; 150a1bde736SShuo Chen 151a1bde736SShuo Chen if (state_ == kConnecting) 152a1bde736SShuo Chen { 153a1bde736SShuo Chen int sockfd = removeAndResetChannel(); 154a1bde736SShuo Chen int err = sockets::getSocketError(sockfd); 155a1bde736SShuo Chen if (err) 156a1bde736SShuo Chen { 157a1bde736SShuo Chen LOG_WARN << "Connector::handleWrite - SO_ERROR = " 158a1bde736SShuo Chen << err << " " << strerror_tl(err); 159a1bde736SShuo Chen retry(sockfd); 160a1bde736SShuo Chen } 161a1bde736SShuo Chen else if (sockets::isSelfConnect(sockfd)) 162a1bde736SShuo Chen { 163a1bde736SShuo Chen LOG_WARN << "Connector::handleWrite - Self connect"; 164a1bde736SShuo Chen retry(sockfd); 165a1bde736SShuo Chen } 166a1bde736SShuo Chen else 167a1bde736SShuo Chen { 168a1bde736SShuo Chen setState(kConnected); 169a1bde736SShuo Chen if (connect_) 170a1bde736SShuo Chen { 171a1bde736SShuo Chen newConnectionCallback_(sockfd); 172a1bde736SShuo Chen } 173a1bde736SShuo Chen else 174a1bde736SShuo Chen { 175a1bde736SShuo Chen sockets::close(sockfd); 176a1bde736SShuo Chen } 177a1bde736SShuo Chen } 178a1bde736SShuo Chen } 179a1bde736SShuo Chen else 180a1bde736SShuo Chen { 181a1bde736SShuo Chen // what happened? 182a1bde736SShuo Chen assert(state_ == kDisconnected); 183a1bde736SShuo Chen } 184a1bde736SShuo Chen} 185a1bde736SShuo Chen 186a1bde736SShuo Chenvoid Connector::handleError() 187a1bde736SShuo Chen{ 188a1bde736SShuo Chen LOG_ERROR << "Connector::handleError"; 189a1bde736SShuo Chen assert(state_ == kConnecting); 190a1bde736SShuo Chen 191a1bde736SShuo Chen int sockfd = removeAndResetChannel(); 192a1bde736SShuo Chen int err = sockets::getSocketError(sockfd); 193a1bde736SShuo Chen LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err); 194a1bde736SShuo Chen retry(sockfd); 195a1bde736SShuo Chen} 196a1bde736SShuo Chen 197a1bde736SShuo Chenvoid Connector::retry(int sockfd) 198a1bde736SShuo Chen{ 199a1bde736SShuo Chen sockets::close(sockfd); 200a1bde736SShuo Chen setState(kDisconnected); 201a1bde736SShuo Chen if (connect_) 202a1bde736SShuo Chen { 203a1bde736SShuo Chen LOG_INFO << "Connector::retry - Retry connecting to " 204a1bde736SShuo Chen << serverAddr_.toHostPort() << " in " 205a1bde736SShuo Chen << retryDelayMs_ << " milliseconds. "; 206a1bde736SShuo Chen timerId_ = loop_->runAfter(retryDelayMs_/1000.0, // FIXME: unsafe 207a1bde736SShuo Chen boost::bind(&Connector::startInLoop, this)); 208a1bde736SShuo Chen retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs); 209a1bde736SShuo Chen } 210a1bde736SShuo Chen else 211a1bde736SShuo Chen { 212a1bde736SShuo Chen LOG_DEBUG << "do not connect"; 213a1bde736SShuo Chen } 214a1bde736SShuo Chen} 215a1bde736SShuo Chen 216