Connector.cc revision fdb5c17c
1a1bde736SShuo Chen// excerpts from http://code.google.com/p/muduo/
2a1bde736SShuo Chen//
3a1bde736SShuo Chen// Use of this source code is governed by a BSD-style license
4a1bde736SShuo Chen// that can be found in the License file.
5a1bde736SShuo Chen//
6a1bde736SShuo Chen// Author: Shuo Chen (chenshuo at chenshuo dot com)
7a1bde736SShuo Chen
8a1bde736SShuo Chen#include "Connector.h"
9a1bde736SShuo Chen
10a1bde736SShuo Chen#include "Channel.h"
11a1bde736SShuo Chen#include "EventLoop.h"
12a1bde736SShuo Chen#include "SocketsOps.h"
13a1bde736SShuo Chen
14a1bde736SShuo Chen#include "logging/Logging.h"
15a1bde736SShuo Chen
16a1bde736SShuo Chen#include <boost/bind.hpp>
17a1bde736SShuo Chen
18a1bde736SShuo Chen#include <errno.h>
19a1bde736SShuo Chen
20a1bde736SShuo Chenusing namespace muduo;
21a1bde736SShuo Chen
22a1bde736SShuo Chenconst int Connector::kMaxRetryDelayMs;
23a1bde736SShuo Chen
24a1bde736SShuo ChenConnector::Connector(EventLoop* loop, const InetAddress& serverAddr)
25a1bde736SShuo Chen  : loop_(loop),
26a1bde736SShuo Chen    serverAddr_(serverAddr),
27a1bde736SShuo Chen    connect_(false),
28a1bde736SShuo Chen    state_(kDisconnected),
29a1bde736SShuo Chen    retryDelayMs_(kInitRetryDelayMs)
30a1bde736SShuo Chen{
31a1bde736SShuo Chen  LOG_DEBUG << "ctor[" << this << "]";
32a1bde736SShuo Chen}
33a1bde736SShuo Chen
34a1bde736SShuo ChenConnector::~Connector()
35a1bde736SShuo Chen{
36a1bde736SShuo Chen  LOG_DEBUG << "dtor[" << this << "]";
37a1bde736SShuo Chen  loop_->cancel(timerId_);
38a1bde736SShuo Chen  assert(!channel_);
39a1bde736SShuo Chen}
40a1bde736SShuo Chen
41a1bde736SShuo Chenvoid Connector::start()
42a1bde736SShuo Chen{
43a1bde736SShuo Chen  connect_ = true;
44a1bde736SShuo Chen  loop_->runInLoop(boost::bind(&Connector::startInLoop, this)); // FIXME: unsafe
45a1bde736SShuo Chen}
46a1bde736SShuo Chen
47a1bde736SShuo Chenvoid Connector::startInLoop()
48a1bde736SShuo Chen{
49a1bde736SShuo Chen  loop_->assertInLoopThread();
50a1bde736SShuo Chen  assert(state_ == kDisconnected);
51a1bde736SShuo Chen  if (connect_)
52a1bde736SShuo Chen  {
53a1bde736SShuo Chen    connect();
54a1bde736SShuo Chen  }
55a1bde736SShuo Chen  else
56a1bde736SShuo Chen  {
57a1bde736SShuo Chen    LOG_DEBUG << "do not connect";
58a1bde736SShuo Chen  }
59a1bde736SShuo Chen}
60a1bde736SShuo Chen
61a1bde736SShuo Chenvoid Connector::connect()
62a1bde736SShuo Chen{
63a1bde736SShuo Chen  int sockfd = sockets::createNonblockingOrDie();
64a1bde736SShuo Chen  int ret = sockets::connect(sockfd, serverAddr_.getSockAddrInet());
65a1bde736SShuo Chen  int savedErrno = (ret == 0) ? 0 : errno;
66a1bde736SShuo Chen  switch (savedErrno)
67a1bde736SShuo Chen  {
68a1bde736SShuo Chen    case 0:
69a1bde736SShuo Chen    case EINPROGRESS:
70a1bde736SShuo Chen    case EINTR:
71a1bde736SShuo Chen    case EISCONN:
72a1bde736SShuo Chen      connecting(sockfd);
73a1bde736SShuo Chen      break;
74a1bde736SShuo Chen
75a1bde736SShuo Chen    case EAGAIN:
76a1bde736SShuo Chen    case EADDRINUSE:
77a1bde736SShuo Chen    case EADDRNOTAVAIL:
78a1bde736SShuo Chen    case ECONNREFUSED:
79a1bde736SShuo Chen    case ENETUNREACH:
80a1bde736SShuo Chen      retry(sockfd);
81a1bde736SShuo Chen      break;
82a1bde736SShuo Chen
83a1bde736SShuo Chen    case EACCES:
84a1bde736SShuo Chen    case EPERM:
85a1bde736SShuo Chen    case EAFNOSUPPORT:
86a1bde736SShuo Chen    case EALREADY:
87a1bde736SShuo Chen    case EBADF:
88a1bde736SShuo Chen    case EFAULT:
89a1bde736SShuo Chen    case ENOTSOCK:
90a1bde736SShuo Chen      LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
91a1bde736SShuo Chen      sockets::close(sockfd);
92a1bde736SShuo Chen      break;
93a1bde736SShuo Chen
94a1bde736SShuo Chen    default:
95a1bde736SShuo Chen      LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
96a1bde736SShuo Chen      sockets::close(sockfd);
97a1bde736SShuo Chen      // connectErrorCallback_();
98a1bde736SShuo Chen      break;
99a1bde736SShuo Chen  }
100a1bde736SShuo Chen}
101a1bde736SShuo Chen
102a1bde736SShuo Chenvoid Connector::restart()
103a1bde736SShuo Chen{
104a1bde736SShuo Chen  loop_->assertInLoopThread();
105a1bde736SShuo Chen  setState(kDisconnected);
106a1bde736SShuo Chen  retryDelayMs_ = kInitRetryDelayMs;
107a1bde736SShuo Chen  connect_ = true;
108a1bde736SShuo Chen  startInLoop();
109a1bde736SShuo Chen}
110a1bde736SShuo Chen
111a1bde736SShuo Chenvoid Connector::stop()
112a1bde736SShuo Chen{
113a1bde736SShuo Chen  connect_ = false;
114fdb5c17cSShuo Chen  loop_->cancel(timerId_);
115a1bde736SShuo Chen}
116a1bde736SShuo Chen
117a1bde736SShuo Chenvoid Connector::connecting(int sockfd)
118a1bde736SShuo Chen{
119a1bde736SShuo Chen  setState(kConnecting);
120a1bde736SShuo Chen  assert(!channel_);
121a1bde736SShuo Chen  channel_.reset(new Channel(loop_, sockfd));
122a1bde736SShuo Chen  channel_->setWriteCallback(
123a1bde736SShuo Chen      boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe
124a1bde736SShuo Chen  channel_->setErrorCallback(
125a1bde736SShuo Chen      boost::bind(&Connector::handleError, this)); // FIXME: unsafe
126a1bde736SShuo Chen
127a1bde736SShuo Chen  // channel_->tie(shared_from_this()); is not working,
128a1bde736SShuo Chen  // as channel_ is not managed by shared_ptr
129a1bde736SShuo Chen  channel_->enableWriting();
130a1bde736SShuo Chen}
131a1bde736SShuo Chen
132a1bde736SShuo Chenint Connector::removeAndResetChannel()
133a1bde736SShuo Chen{
134a1bde736SShuo Chen  channel_->disableAll();
135a1bde736SShuo Chen  loop_->removeChannel(get_pointer(channel_));
136a1bde736SShuo Chen  int sockfd = channel_->fd();
137a1bde736SShuo Chen  // Can't reset channel_ here, because we are inside Channel::handleEvent
138a1bde736SShuo Chen  loop_->queueInLoop(boost::bind(&Connector::resetChannel, this)); // FIXME: unsafe
139a1bde736SShuo Chen  return sockfd;
140a1bde736SShuo Chen}
141a1bde736SShuo Chen
142a1bde736SShuo Chenvoid Connector::resetChannel()
143a1bde736SShuo Chen{
144a1bde736SShuo Chen  channel_.reset();
145a1bde736SShuo Chen}
146a1bde736SShuo Chen
147a1bde736SShuo Chenvoid Connector::handleWrite()
148a1bde736SShuo Chen{
149a1bde736SShuo Chen  LOG_TRACE << "Connector::handleWrite " << state_;
150a1bde736SShuo Chen
151a1bde736SShuo Chen  if (state_ == kConnecting)
152a1bde736SShuo Chen  {
153a1bde736SShuo Chen    int sockfd = removeAndResetChannel();
154a1bde736SShuo Chen    int err = sockets::getSocketError(sockfd);
155a1bde736SShuo Chen    if (err)
156a1bde736SShuo Chen    {
157a1bde736SShuo Chen      LOG_WARN << "Connector::handleWrite - SO_ERROR = "
158a1bde736SShuo Chen               << err << " " << strerror_tl(err);
159a1bde736SShuo Chen      retry(sockfd);
160a1bde736SShuo Chen    }
161a1bde736SShuo Chen    else if (sockets::isSelfConnect(sockfd))
162a1bde736SShuo Chen    {
163a1bde736SShuo Chen      LOG_WARN << "Connector::handleWrite - Self connect";
164a1bde736SShuo Chen      retry(sockfd);
165a1bde736SShuo Chen    }
166a1bde736SShuo Chen    else
167a1bde736SShuo Chen    {
168a1bde736SShuo Chen      setState(kConnected);
169a1bde736SShuo Chen      if (connect_)
170a1bde736SShuo Chen      {
171a1bde736SShuo Chen        newConnectionCallback_(sockfd);
172a1bde736SShuo Chen      }
173a1bde736SShuo Chen      else
174a1bde736SShuo Chen      {
175a1bde736SShuo Chen        sockets::close(sockfd);
176a1bde736SShuo Chen      }
177a1bde736SShuo Chen    }
178a1bde736SShuo Chen  }
179a1bde736SShuo Chen  else
180a1bde736SShuo Chen  {
181a1bde736SShuo Chen    // what happened?
182a1bde736SShuo Chen    assert(state_ == kDisconnected);
183a1bde736SShuo Chen  }
184a1bde736SShuo Chen}
185a1bde736SShuo Chen
186a1bde736SShuo Chenvoid Connector::handleError()
187a1bde736SShuo Chen{
188a1bde736SShuo Chen  LOG_ERROR << "Connector::handleError";
189a1bde736SShuo Chen  assert(state_ == kConnecting);
190a1bde736SShuo Chen
191a1bde736SShuo Chen  int sockfd = removeAndResetChannel();
192a1bde736SShuo Chen  int err = sockets::getSocketError(sockfd);
193a1bde736SShuo Chen  LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err);
194a1bde736SShuo Chen  retry(sockfd);
195a1bde736SShuo Chen}
196a1bde736SShuo Chen
197a1bde736SShuo Chenvoid Connector::retry(int sockfd)
198a1bde736SShuo Chen{
199a1bde736SShuo Chen  sockets::close(sockfd);
200a1bde736SShuo Chen  setState(kDisconnected);
201a1bde736SShuo Chen  if (connect_)
202a1bde736SShuo Chen  {
203a1bde736SShuo Chen    LOG_INFO << "Connector::retry - Retry connecting to "
204a1bde736SShuo Chen             << serverAddr_.toHostPort() << " in "
205a1bde736SShuo Chen             << retryDelayMs_ << " milliseconds. ";
206a1bde736SShuo Chen    timerId_ = loop_->runAfter(retryDelayMs_/1000.0,  // FIXME: unsafe
207a1bde736SShuo Chen                               boost::bind(&Connector::startInLoop, this));
208a1bde736SShuo Chen    retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
209a1bde736SShuo Chen  }
210a1bde736SShuo Chen  else
211a1bde736SShuo Chen  {
212a1bde736SShuo Chen    LOG_DEBUG << "do not connect";
213a1bde736SShuo Chen  }
214a1bde736SShuo Chen}
215a1bde736SShuo Chen
216