1// excerpts from http://code.google.com/p/muduo/ 2// 3// Use of this source code is governed by a BSD-style license 4// that can be found in the License file. 5// 6// Author: Shuo Chen (chenshuo at chenshuo dot com) 7 8#include "Connector.h" 9 10#include "Channel.h" 11#include "EventLoop.h" 12#include "SocketsOps.h" 13 14#include "logging/Logging.h" 15 16#include <boost/bind.hpp> 17 18#include <errno.h> 19 20using namespace muduo; 21 22const int Connector::kMaxRetryDelayMs; 23 24Connector::Connector(EventLoop* loop, const InetAddress& serverAddr) 25 : loop_(loop), 26 serverAddr_(serverAddr), 27 connect_(false), 28 state_(kDisconnected), 29 retryDelayMs_(kInitRetryDelayMs) 30{ 31 LOG_DEBUG << "ctor[" << this << "]"; 32} 33 34Connector::~Connector() 35{ 36 LOG_DEBUG << "dtor[" << this << "]"; 37 loop_->cancel(timerId_); 38 assert(!channel_); 39} 40 41void Connector::start() 42{ 43 connect_ = true; 44 loop_->runInLoop(boost::bind(&Connector::startInLoop, this)); // FIXME: unsafe 45} 46 47void Connector::startInLoop() 48{ 49 loop_->assertInLoopThread(); 50 assert(state_ == kDisconnected); 51 if (connect_) 52 { 53 connect(); 54 } 55 else 56 { 57 LOG_DEBUG << "do not connect"; 58 } 59} 60 61void Connector::connect() 62{ 63 int sockfd = sockets::createNonblockingOrDie(); 64 int ret = sockets::connect(sockfd, serverAddr_.getSockAddrInet()); 65 int savedErrno = (ret == 0) ? 0 : errno; 66 switch (savedErrno) 67 { 68 case 0: 69 case EINPROGRESS: 70 case EINTR: 71 case EISCONN: 72 connecting(sockfd); 73 break; 74 75 case EAGAIN: 76 case EADDRINUSE: 77 case EADDRNOTAVAIL: 78 case ECONNREFUSED: 79 case ENETUNREACH: 80 retry(sockfd); 81 break; 82 83 case EACCES: 84 case EPERM: 85 case EAFNOSUPPORT: 86 case EALREADY: 87 case EBADF: 88 case EFAULT: 89 case ENOTSOCK: 90 LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno; 91 sockets::close(sockfd); 92 break; 93 94 default: 95 LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno; 96 sockets::close(sockfd); 97 // connectErrorCallback_(); 98 break; 99 } 100} 101 102void Connector::restart() 103{ 104 loop_->assertInLoopThread(); 105 setState(kDisconnected); 106 retryDelayMs_ = kInitRetryDelayMs; 107 connect_ = true; 108 startInLoop(); 109} 110 111void Connector::stop() 112{ 113 connect_ = false; 114 loop_->cancel(timerId_); 115} 116 117void Connector::connecting(int sockfd) 118{ 119 setState(kConnecting); 120 assert(!channel_); 121 channel_.reset(new Channel(loop_, sockfd)); 122 channel_->setWriteCallback( 123 boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe 124 channel_->setErrorCallback( 125 boost::bind(&Connector::handleError, this)); // FIXME: unsafe 126 127 // channel_->tie(shared_from_this()); is not working, 128 // as channel_ is not managed by shared_ptr 129 channel_->enableWriting(); 130} 131 132int Connector::removeAndResetChannel() 133{ 134 channel_->disableAll(); 135 loop_->removeChannel(get_pointer(channel_)); 136 int sockfd = channel_->fd(); 137 // Can't reset channel_ here, because we are inside Channel::handleEvent 138 loop_->queueInLoop(boost::bind(&Connector::resetChannel, this)); // FIXME: unsafe 139 return sockfd; 140} 141 142void Connector::resetChannel() 143{ 144 channel_.reset(); 145} 146 147void Connector::handleWrite() 148{ 149 LOG_TRACE << "Connector::handleWrite " << state_; 150 151 if (state_ == kConnecting) 152 { 153 int sockfd = removeAndResetChannel(); 154 int err = sockets::getSocketError(sockfd); 155 if (err) 156 { 157 LOG_WARN << "Connector::handleWrite - SO_ERROR = " 158 << err << " " << strerror_tl(err); 159 retry(sockfd); 160 } 161 else if (sockets::isSelfConnect(sockfd)) 162 { 163 LOG_WARN << "Connector::handleWrite - Self connect"; 164 retry(sockfd); 165 } 166 else 167 { 168 setState(kConnected); 169 if (connect_) 170 { 171 newConnectionCallback_(sockfd); 172 } 173 else 174 { 175 sockets::close(sockfd); 176 } 177 } 178 } 179 else 180 { 181 // what happened? 182 assert(state_ == kDisconnected); 183 } 184} 185 186void Connector::handleError() 187{ 188 LOG_ERROR << "Connector::handleError"; 189 assert(state_ == kConnecting); 190 191 int sockfd = removeAndResetChannel(); 192 int err = sockets::getSocketError(sockfd); 193 LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err); 194 retry(sockfd); 195} 196 197void Connector::retry(int sockfd) 198{ 199 sockets::close(sockfd); 200 setState(kDisconnected); 201 if (connect_) 202 { 203 LOG_INFO << "Connector::retry - Retry connecting to " 204 << serverAddr_.toHostPort() << " in " 205 << retryDelayMs_ << " milliseconds. "; 206 timerId_ = loop_->runAfter(retryDelayMs_/1000.0, // FIXME: unsafe 207 boost::bind(&Connector::startInLoop, this)); 208 retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs); 209 } 210 else 211 { 212 LOG_DEBUG << "do not connect"; 213 } 214} 215 216