Connector.cc revision 354280cf
1354280cfSShuo Chen// excerpts from http://code.google.com/p/muduo/ 2354280cfSShuo Chen// 3354280cfSShuo Chen// Use of this source code is governed by a BSD-style license 4354280cfSShuo Chen// that can be found in the License file. 5354280cfSShuo Chen// 6354280cfSShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com) 7354280cfSShuo Chen 8354280cfSShuo Chen#include "Connector.h" 9354280cfSShuo Chen 10354280cfSShuo Chen#include "Channel.h" 11354280cfSShuo Chen#include "EventLoop.h" 12354280cfSShuo Chen#include "SocketsOps.h" 13354280cfSShuo Chen 14354280cfSShuo Chen#include "logging/Logging.h" 15354280cfSShuo Chen 16354280cfSShuo Chen#include <boost/bind.hpp> 17354280cfSShuo Chen 18354280cfSShuo Chen#include <errno.h> 19354280cfSShuo Chen 20354280cfSShuo Chenusing namespace muduo; 21354280cfSShuo Chen 22354280cfSShuo Chenconst int Connector::kMaxRetryDelayMs; 23354280cfSShuo Chen 24354280cfSShuo ChenConnector::Connector(EventLoop* loop, const InetAddress& serverAddr) 25354280cfSShuo Chen : loop_(loop), 26354280cfSShuo Chen serverAddr_(serverAddr), 27354280cfSShuo Chen connect_(false), 28354280cfSShuo Chen state_(kDisconnected), 29354280cfSShuo Chen retryDelayMs_(kInitRetryDelayMs) 30354280cfSShuo Chen{ 31354280cfSShuo Chen LOG_DEBUG << "ctor[" << this << "]"; 32354280cfSShuo Chen} 33354280cfSShuo Chen 34354280cfSShuo ChenConnector::~Connector() 35354280cfSShuo Chen{ 36354280cfSShuo Chen LOG_DEBUG << "dtor[" << this << "]"; 37354280cfSShuo Chen loop_->cancel(timerId_); 38354280cfSShuo Chen assert(!channel_); 39354280cfSShuo Chen} 40354280cfSShuo Chen 41354280cfSShuo Chenvoid Connector::start() 42354280cfSShuo Chen{ 43354280cfSShuo Chen connect_ = true; 44354280cfSShuo Chen loop_->runInLoop(boost::bind(&Connector::startInLoop, this)); // FIXME: unsafe 45354280cfSShuo Chen} 46354280cfSShuo Chen 47354280cfSShuo Chenvoid Connector::startInLoop() 48354280cfSShuo Chen{ 49354280cfSShuo Chen loop_->assertInLoopThread(); 50354280cfSShuo Chen assert(state_ == kDisconnected); 51354280cfSShuo Chen if (connect_) 52354280cfSShuo Chen { 53354280cfSShuo Chen connect(); 54354280cfSShuo Chen } 55354280cfSShuo Chen else 56354280cfSShuo Chen { 57354280cfSShuo Chen LOG_DEBUG << "do not connect"; 58354280cfSShuo Chen } 59354280cfSShuo Chen} 60354280cfSShuo Chen 61354280cfSShuo Chenvoid Connector::connect() 62354280cfSShuo Chen{ 63354280cfSShuo Chen int sockfd = sockets::createNonblockingOrDie(); 64354280cfSShuo Chen int ret = sockets::connect(sockfd, serverAddr_.getSockAddrInet()); 65354280cfSShuo Chen int savedErrno = (ret == 0) ? 0 : errno; 66354280cfSShuo Chen switch (savedErrno) 67354280cfSShuo Chen { 68354280cfSShuo Chen case 0: 69354280cfSShuo Chen case EINPROGRESS: 70354280cfSShuo Chen case EINTR: 71354280cfSShuo Chen case EISCONN: 72354280cfSShuo Chen connecting(sockfd); 73354280cfSShuo Chen break; 74354280cfSShuo Chen 75354280cfSShuo Chen case EAGAIN: 76354280cfSShuo Chen case EADDRINUSE: 77354280cfSShuo Chen case EADDRNOTAVAIL: 78354280cfSShuo Chen case ECONNREFUSED: 79354280cfSShuo Chen case ENETUNREACH: 80354280cfSShuo Chen retry(sockfd); 81354280cfSShuo Chen break; 82354280cfSShuo Chen 83354280cfSShuo Chen case EACCES: 84354280cfSShuo Chen case EPERM: 85354280cfSShuo Chen case EAFNOSUPPORT: 86354280cfSShuo Chen case EALREADY: 87354280cfSShuo Chen case EBADF: 88354280cfSShuo Chen case EFAULT: 89354280cfSShuo Chen case ENOTSOCK: 90354280cfSShuo Chen LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno; 91354280cfSShuo Chen sockets::close(sockfd); 92354280cfSShuo Chen break; 93354280cfSShuo Chen 94354280cfSShuo Chen default: 95354280cfSShuo Chen LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno; 96354280cfSShuo Chen sockets::close(sockfd); 97354280cfSShuo Chen // connectErrorCallback_(); 98354280cfSShuo Chen break; 99354280cfSShuo Chen } 100354280cfSShuo Chen} 101354280cfSShuo Chen 102354280cfSShuo Chenvoid Connector::restart() 103354280cfSShuo Chen{ 104354280cfSShuo Chen loop_->assertInLoopThread(); 105354280cfSShuo Chen setState(kDisconnected); 106354280cfSShuo Chen retryDelayMs_ = kInitRetryDelayMs; 107354280cfSShuo Chen connect_ = true; 108354280cfSShuo Chen startInLoop(); 109354280cfSShuo Chen} 110354280cfSShuo Chen 111354280cfSShuo Chenvoid Connector::stop() 112354280cfSShuo Chen{ 113354280cfSShuo Chen connect_ = false; 114354280cfSShuo Chen loop_->cancel(timerId_); 115354280cfSShuo Chen} 116354280cfSShuo Chen 117354280cfSShuo Chenvoid Connector::connecting(int sockfd) 118354280cfSShuo Chen{ 119354280cfSShuo Chen setState(kConnecting); 120354280cfSShuo Chen assert(!channel_); 121354280cfSShuo Chen channel_.reset(new Channel(loop_, sockfd)); 122354280cfSShuo Chen channel_->setWriteCallback( 123354280cfSShuo Chen boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe 124354280cfSShuo Chen channel_->setErrorCallback( 125354280cfSShuo Chen boost::bind(&Connector::handleError, this)); // FIXME: unsafe 126354280cfSShuo Chen 127354280cfSShuo Chen // channel_->tie(shared_from_this()); is not working, 128354280cfSShuo Chen // as channel_ is not managed by shared_ptr 129354280cfSShuo Chen channel_->enableWriting(); 130354280cfSShuo Chen} 131354280cfSShuo Chen 132354280cfSShuo Chenint Connector::removeAndResetChannel() 133354280cfSShuo Chen{ 134354280cfSShuo Chen channel_->disableAll(); 135354280cfSShuo Chen loop_->removeChannel(get_pointer(channel_)); 136354280cfSShuo Chen int sockfd = channel_->fd(); 137354280cfSShuo Chen // Can't reset channel_ here, because we are inside Channel::handleEvent 138354280cfSShuo Chen loop_->queueInLoop(boost::bind(&Connector::resetChannel, this)); // FIXME: unsafe 139354280cfSShuo Chen return sockfd; 140354280cfSShuo Chen} 141354280cfSShuo Chen 142354280cfSShuo Chenvoid Connector::resetChannel() 143354280cfSShuo Chen{ 144354280cfSShuo Chen channel_.reset(); 145354280cfSShuo Chen} 146354280cfSShuo Chen 147354280cfSShuo Chenvoid Connector::handleWrite() 148354280cfSShuo Chen{ 149354280cfSShuo Chen LOG_TRACE << "Connector::handleWrite " << state_; 150354280cfSShuo Chen 151354280cfSShuo Chen if (state_ == kConnecting) 152354280cfSShuo Chen { 153354280cfSShuo Chen int sockfd = removeAndResetChannel(); 154354280cfSShuo Chen int err = sockets::getSocketError(sockfd); 155354280cfSShuo Chen if (err) 156354280cfSShuo Chen { 157354280cfSShuo Chen LOG_WARN << "Connector::handleWrite - SO_ERROR = " 158354280cfSShuo Chen << err << " " << strerror_tl(err); 159354280cfSShuo Chen retry(sockfd); 160354280cfSShuo Chen } 161354280cfSShuo Chen else if (sockets::isSelfConnect(sockfd)) 162354280cfSShuo Chen { 163354280cfSShuo Chen LOG_WARN << "Connector::handleWrite - Self connect"; 164354280cfSShuo Chen retry(sockfd); 165354280cfSShuo Chen } 166354280cfSShuo Chen else 167354280cfSShuo Chen { 168354280cfSShuo Chen setState(kConnected); 169354280cfSShuo Chen if (connect_) 170354280cfSShuo Chen { 171354280cfSShuo Chen newConnectionCallback_(sockfd); 172354280cfSShuo Chen } 173354280cfSShuo Chen else 174354280cfSShuo Chen { 175354280cfSShuo Chen sockets::close(sockfd); 176354280cfSShuo Chen } 177354280cfSShuo Chen } 178354280cfSShuo Chen } 179354280cfSShuo Chen else 180354280cfSShuo Chen { 181354280cfSShuo Chen // what happened? 182354280cfSShuo Chen assert(state_ == kDisconnected); 183354280cfSShuo Chen } 184354280cfSShuo Chen} 185354280cfSShuo Chen 186354280cfSShuo Chenvoid Connector::handleError() 187354280cfSShuo Chen{ 188354280cfSShuo Chen LOG_ERROR << "Connector::handleError"; 189354280cfSShuo Chen assert(state_ == kConnecting); 190354280cfSShuo Chen 191354280cfSShuo Chen int sockfd = removeAndResetChannel(); 192354280cfSShuo Chen int err = sockets::getSocketError(sockfd); 193354280cfSShuo Chen LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err); 194354280cfSShuo Chen retry(sockfd); 195354280cfSShuo Chen} 196354280cfSShuo Chen 197354280cfSShuo Chenvoid Connector::retry(int sockfd) 198354280cfSShuo Chen{ 199354280cfSShuo Chen sockets::close(sockfd); 200354280cfSShuo Chen setState(kDisconnected); 201354280cfSShuo Chen if (connect_) 202354280cfSShuo Chen { 203354280cfSShuo Chen LOG_INFO << "Connector::retry - Retry connecting to " 204354280cfSShuo Chen << serverAddr_.toHostPort() << " in " 205354280cfSShuo Chen << retryDelayMs_ << " milliseconds. "; 206354280cfSShuo Chen timerId_ = loop_->runAfter(retryDelayMs_/1000.0, // FIXME: unsafe 207354280cfSShuo Chen boost::bind(&Connector::startInLoop, this)); 208354280cfSShuo Chen retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs); 209354280cfSShuo Chen } 210354280cfSShuo Chen else 211354280cfSShuo Chen { 212354280cfSShuo Chen LOG_DEBUG << "do not connect"; 213354280cfSShuo Chen } 214354280cfSShuo Chen} 215354280cfSShuo Chen 216