1354280cfSShuo Chen// excerpts from http://code.google.com/p/muduo/
2354280cfSShuo Chen//
3354280cfSShuo Chen// Use of this source code is governed by a BSD-style license
4354280cfSShuo Chen// that can be found in the License file.
5354280cfSShuo Chen//
6354280cfSShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
7354280cfSShuo Chen
8354280cfSShuo Chen#include "Connector.h"
9354280cfSShuo Chen
10354280cfSShuo Chen#include "Channel.h"
11354280cfSShuo Chen#include "EventLoop.h"
12354280cfSShuo Chen#include "SocketsOps.h"
13354280cfSShuo Chen
14354280cfSShuo Chen#include "logging/Logging.h"
15354280cfSShuo Chen
16354280cfSShuo Chen#include <boost/bind.hpp>
17354280cfSShuo Chen
18354280cfSShuo Chen#include <errno.h>
19354280cfSShuo Chen
20354280cfSShuo Chenusing namespace muduo;
21354280cfSShuo Chen
// Namespace-scope definition of the static constant (its value is presumably
// given at the in-class declaration in Connector.h -- not visible here).
// retry() passes it to std::min, which takes its arguments by const reference,
// so the constant needs a definition.
22354280cfSShuo Chenconst int Connector::kMaxRetryDelayMs;
23354280cfSShuo Chen
24354280cfSShuo ChenConnector::Connector(EventLoop* loop, const InetAddress& serverAddr)
25354280cfSShuo Chen  : loop_(loop),
26354280cfSShuo Chen    serverAddr_(serverAddr),
27354280cfSShuo Chen    connect_(false),
28354280cfSShuo Chen    state_(kDisconnected),
29354280cfSShuo Chen    retryDelayMs_(kInitRetryDelayMs)
30354280cfSShuo Chen{
31354280cfSShuo Chen  LOG_DEBUG << "ctor[" << this << "]";
32354280cfSShuo Chen}
33354280cfSShuo Chen
34354280cfSShuo ChenConnector::~Connector()
35354280cfSShuo Chen{
36354280cfSShuo Chen  LOG_DEBUG << "dtor[" << this << "]";
37354280cfSShuo Chen  loop_->cancel(timerId_);
38354280cfSShuo Chen  assert(!channel_);
39354280cfSShuo Chen}
40354280cfSShuo Chen
41354280cfSShuo Chenvoid Connector::start()
42354280cfSShuo Chen{
43354280cfSShuo Chen  connect_ = true;
44354280cfSShuo Chen  loop_->runInLoop(boost::bind(&Connector::startInLoop, this)); // FIXME: unsafe
45354280cfSShuo Chen}
46354280cfSShuo Chen
47354280cfSShuo Chenvoid Connector::startInLoop()
48354280cfSShuo Chen{
49354280cfSShuo Chen  loop_->assertInLoopThread();
50354280cfSShuo Chen  assert(state_ == kDisconnected);
51354280cfSShuo Chen  if (connect_)
52354280cfSShuo Chen  {
53354280cfSShuo Chen    connect();
54354280cfSShuo Chen  }
55354280cfSShuo Chen  else
56354280cfSShuo Chen  {
57354280cfSShuo Chen    LOG_DEBUG << "do not connect";
58354280cfSShuo Chen  }
59354280cfSShuo Chen}
60354280cfSShuo Chen
61354280cfSShuo Chenvoid Connector::connect()
62354280cfSShuo Chen{
63354280cfSShuo Chen  int sockfd = sockets::createNonblockingOrDie();
64354280cfSShuo Chen  int ret = sockets::connect(sockfd, serverAddr_.getSockAddrInet());
65354280cfSShuo Chen  int savedErrno = (ret == 0) ? 0 : errno;
66354280cfSShuo Chen  switch (savedErrno)
67354280cfSShuo Chen  {
68354280cfSShuo Chen    case 0:
69354280cfSShuo Chen    case EINPROGRESS:
70354280cfSShuo Chen    case EINTR:
71354280cfSShuo Chen    case EISCONN:
72354280cfSShuo Chen      connecting(sockfd);
73354280cfSShuo Chen      break;
74354280cfSShuo Chen
75354280cfSShuo Chen    case EAGAIN:
76354280cfSShuo Chen    case EADDRINUSE:
77354280cfSShuo Chen    case EADDRNOTAVAIL:
78354280cfSShuo Chen    case ECONNREFUSED:
79354280cfSShuo Chen    case ENETUNREACH:
80354280cfSShuo Chen      retry(sockfd);
81354280cfSShuo Chen      break;
82354280cfSShuo Chen
83354280cfSShuo Chen    case EACCES:
84354280cfSShuo Chen    case EPERM:
85354280cfSShuo Chen    case EAFNOSUPPORT:
86354280cfSShuo Chen    case EALREADY:
87354280cfSShuo Chen    case EBADF:
88354280cfSShuo Chen    case EFAULT:
89354280cfSShuo Chen    case ENOTSOCK:
90354280cfSShuo Chen      LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
91354280cfSShuo Chen      sockets::close(sockfd);
92354280cfSShuo Chen      break;
93354280cfSShuo Chen
94354280cfSShuo Chen    default:
95354280cfSShuo Chen      LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
96354280cfSShuo Chen      sockets::close(sockfd);
97354280cfSShuo Chen      // connectErrorCallback_();
98354280cfSShuo Chen      break;
99354280cfSShuo Chen  }
100354280cfSShuo Chen}
101354280cfSShuo Chen
102354280cfSShuo Chenvoid Connector::restart()
103354280cfSShuo Chen{
104354280cfSShuo Chen  loop_->assertInLoopThread();
105354280cfSShuo Chen  setState(kDisconnected);
106354280cfSShuo Chen  retryDelayMs_ = kInitRetryDelayMs;
107354280cfSShuo Chen  connect_ = true;
108354280cfSShuo Chen  startInLoop();
109354280cfSShuo Chen}
110354280cfSShuo Chen
111354280cfSShuo Chenvoid Connector::stop()
112354280cfSShuo Chen{
113354280cfSShuo Chen  connect_ = false;
114354280cfSShuo Chen  loop_->cancel(timerId_);
115354280cfSShuo Chen}
116354280cfSShuo Chen
117354280cfSShuo Chenvoid Connector::connecting(int sockfd)
118354280cfSShuo Chen{
119354280cfSShuo Chen  setState(kConnecting);
120354280cfSShuo Chen  assert(!channel_);
121354280cfSShuo Chen  channel_.reset(new Channel(loop_, sockfd));
122354280cfSShuo Chen  channel_->setWriteCallback(
123354280cfSShuo Chen      boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe
124354280cfSShuo Chen  channel_->setErrorCallback(
125354280cfSShuo Chen      boost::bind(&Connector::handleError, this)); // FIXME: unsafe
126354280cfSShuo Chen
127354280cfSShuo Chen  // channel_->tie(shared_from_this()); is not working,
128354280cfSShuo Chen  // as channel_ is not managed by shared_ptr
129354280cfSShuo Chen  channel_->enableWriting();
130354280cfSShuo Chen}
131354280cfSShuo Chen
132354280cfSShuo Chenint Connector::removeAndResetChannel()
133354280cfSShuo Chen{
134354280cfSShuo Chen  channel_->disableAll();
135354280cfSShuo Chen  loop_->removeChannel(get_pointer(channel_));
136354280cfSShuo Chen  int sockfd = channel_->fd();
137354280cfSShuo Chen  // Can't reset channel_ here, because we are inside Channel::handleEvent
138354280cfSShuo Chen  loop_->queueInLoop(boost::bind(&Connector::resetChannel, this)); // FIXME: unsafe
139354280cfSShuo Chen  return sockfd;
140354280cfSShuo Chen}
141354280cfSShuo Chen
142354280cfSShuo Chenvoid Connector::resetChannel()
143354280cfSShuo Chen{
144354280cfSShuo Chen  channel_.reset();
145354280cfSShuo Chen}
146354280cfSShuo Chen
147354280cfSShuo Chenvoid Connector::handleWrite()
148354280cfSShuo Chen{
149354280cfSShuo Chen  LOG_TRACE << "Connector::handleWrite " << state_;
150354280cfSShuo Chen
151354280cfSShuo Chen  if (state_ == kConnecting)
152354280cfSShuo Chen  {
153354280cfSShuo Chen    int sockfd = removeAndResetChannel();
154354280cfSShuo Chen    int err = sockets::getSocketError(sockfd);
155354280cfSShuo Chen    if (err)
156354280cfSShuo Chen    {
157354280cfSShuo Chen      LOG_WARN << "Connector::handleWrite - SO_ERROR = "
158354280cfSShuo Chen               << err << " " << strerror_tl(err);
159354280cfSShuo Chen      retry(sockfd);
160354280cfSShuo Chen    }
161354280cfSShuo Chen    else if (sockets::isSelfConnect(sockfd))
162354280cfSShuo Chen    {
163354280cfSShuo Chen      LOG_WARN << "Connector::handleWrite - Self connect";
164354280cfSShuo Chen      retry(sockfd);
165354280cfSShuo Chen    }
166354280cfSShuo Chen    else
167354280cfSShuo Chen    {
168354280cfSShuo Chen      setState(kConnected);
169354280cfSShuo Chen      if (connect_)
170354280cfSShuo Chen      {
171354280cfSShuo Chen        newConnectionCallback_(sockfd);
172354280cfSShuo Chen      }
173354280cfSShuo Chen      else
174354280cfSShuo Chen      {
175354280cfSShuo Chen        sockets::close(sockfd);
176354280cfSShuo Chen      }
177354280cfSShuo Chen    }
178354280cfSShuo Chen  }
179354280cfSShuo Chen  else
180354280cfSShuo Chen  {
181354280cfSShuo Chen    // what happened?
182354280cfSShuo Chen    assert(state_ == kDisconnected);
183354280cfSShuo Chen  }
184354280cfSShuo Chen}
185354280cfSShuo Chen
186354280cfSShuo Chenvoid Connector::handleError()
187354280cfSShuo Chen{
188354280cfSShuo Chen  LOG_ERROR << "Connector::handleError";
189354280cfSShuo Chen  assert(state_ == kConnecting);
190354280cfSShuo Chen
191354280cfSShuo Chen  int sockfd = removeAndResetChannel();
192354280cfSShuo Chen  int err = sockets::getSocketError(sockfd);
193354280cfSShuo Chen  LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err);
194354280cfSShuo Chen  retry(sockfd);
195354280cfSShuo Chen}
196354280cfSShuo Chen
197354280cfSShuo Chenvoid Connector::retry(int sockfd)
198354280cfSShuo Chen{
199354280cfSShuo Chen  sockets::close(sockfd);
200354280cfSShuo Chen  setState(kDisconnected);
201354280cfSShuo Chen  if (connect_)
202354280cfSShuo Chen  {
203354280cfSShuo Chen    LOG_INFO << "Connector::retry - Retry connecting to "
204354280cfSShuo Chen             << serverAddr_.toHostPort() << " in "
205354280cfSShuo Chen             << retryDelayMs_ << " milliseconds. ";
206354280cfSShuo Chen    timerId_ = loop_->runAfter(retryDelayMs_/1000.0,  // FIXME: unsafe
207354280cfSShuo Chen                               boost::bind(&Connector::startInLoop, this));
208354280cfSShuo Chen    retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
209354280cfSShuo Chen  }
210354280cfSShuo Chen  else
211354280cfSShuo Chen  {
212354280cfSShuo Chen    LOG_DEBUG << "do not connect";
213354280cfSShuo Chen  }
214354280cfSShuo Chen}
215354280cfSShuo Chen
216