1efbfdb0cSShuo Chen#include "TcpStream.h"
2f41285cfSShuo Chen#include "InetAddress.h"
3efbfdb0cSShuo Chen
4355c2bf7SShuo Chen#include <errno.h>
5355c2bf7SShuo Chen#include <signal.h>
6355c2bf7SShuo Chen#include <unistd.h>
7355c2bf7SShuo Chen#include <sys/socket.h>
8355c2bf7SShuo Chen
9355c2bf7SShuo Chennamespace
10355c2bf7SShuo Chen{
11355c2bf7SShuo Chen
// Installs SIG_IGN for SIGPIPE from its constructor. The single file-local
// instance below runs that constructor during static initialization, i.e.
// before main() starts.
struct IgnoreSigPipe
{
  IgnoreSigPipe()
  {
    ::signal(SIGPIPE, SIG_IGN);
  }
};

// Exists only for the constructor's side effect.
IgnoreSigPipe initObj;
20355c2bf7SShuo Chen
21f41285cfSShuo Chenbool isSelfConnection(const Socket& sock)
22f41285cfSShuo Chen{
23f41285cfSShuo Chen  return sock.getLocalAddr() == sock.getPeerAddr();
24f41285cfSShuo Chen}
25f41285cfSShuo Chen
26355c2bf7SShuo Chen}
27355c2bf7SShuo Chen
28efbfdb0cSShuo ChenTcpStream::TcpStream(Socket&& sock)
29efbfdb0cSShuo Chen  : sock_(std::move(sock))
30efbfdb0cSShuo Chen{
31efbfdb0cSShuo Chen}
32355c2bf7SShuo Chen
33aafef3ccSShuo Chenint TcpStream::receiveAll(void* buf, int len)
34355c2bf7SShuo Chen{
35e146b157SShuo Chen#ifdef TEMP_FAILURE_RETRY
368bdf4aafSShuo Chen  return TEMP_FAILURE_RETRY(::recv(sock_.fd(), buf, len, MSG_WAITALL));
37e146b157SShuo Chen#else
38e146b157SShuo Chen  return ::recv(sock_.fd(), buf, len, MSG_WAITALL);
39e146b157SShuo Chen#endif
40355c2bf7SShuo Chen}
41355c2bf7SShuo Chen
42aafef3ccSShuo Chenint TcpStream::receiveSome(void* buf, int len)
43355c2bf7SShuo Chen{
448bdf4aafSShuo Chen  return sock_.recv(buf, len);
45355c2bf7SShuo Chen}
46355c2bf7SShuo Chen
47355c2bf7SShuo Chenint TcpStream::sendAll(const void* buf, int len)
48355c2bf7SShuo Chen{
49355c2bf7SShuo Chen  int written = 0;
50355c2bf7SShuo Chen  while (written < len)
51355c2bf7SShuo Chen  {
528bdf4aafSShuo Chen    int nw = sock_.send(static_cast<const char*>(buf) + written, len - written);
537573bb11SShuo Chen    if (nw > 0)
54355c2bf7SShuo Chen    {
557573bb11SShuo Chen      written += nw;
56355c2bf7SShuo Chen    }
578bdf4aafSShuo Chen    else
58355c2bf7SShuo Chen    {
59355c2bf7SShuo Chen      break;
60355c2bf7SShuo Chen    }
61355c2bf7SShuo Chen  }
62355c2bf7SShuo Chen  return written;
63355c2bf7SShuo Chen}
64355c2bf7SShuo Chen
65355c2bf7SShuo Chenint TcpStream::sendSome(const void* buf, int len)
66355c2bf7SShuo Chen{
678bdf4aafSShuo Chen  return sock_.send(buf, len);
68355c2bf7SShuo Chen}
698f04b50cSShuo Chen
70569528b0SShuo Chenvoid TcpStream::setTcpNoDelay(bool on)
71a52ee0fdSShuo Chen{
72a52ee0fdSShuo Chen  sock_.setTcpNoDelay(on);
73a52ee0fdSShuo Chen}
74a52ee0fdSShuo Chen
75a52ee0fdSShuo Chenvoid TcpStream::shutdownWrite()
76a52ee0fdSShuo Chen{
77a52ee0fdSShuo Chen  sock_.shutdownWrite();
78a52ee0fdSShuo Chen}
79a52ee0fdSShuo Chen
808f04b50cSShuo ChenTcpStreamPtr TcpStream::connect(const InetAddress& serverAddr)
818f04b50cSShuo Chen{
82f41285cfSShuo Chen  return connectInternal(serverAddr, nullptr);
838f04b50cSShuo Chen}
848f04b50cSShuo Chen
85aafef3ccSShuo ChenTcpStreamPtr TcpStream::connect(const InetAddress& serverAddr, const InetAddress& localAddr)
868f04b50cSShuo Chen{
87f41285cfSShuo Chen  return connectInternal(serverAddr, &localAddr);
88f41285cfSShuo Chen}
89f41285cfSShuo Chen
90f41285cfSShuo ChenTcpStreamPtr TcpStream::connectInternal(const InetAddress& serverAddr, const InetAddress* localAddr)
91f41285cfSShuo Chen{
928f04b50cSShuo Chen  TcpStreamPtr stream;
9324ca08a8SShuo Chen  Socket sock(Socket::createTCP(serverAddr.family()));
94f41285cfSShuo Chen  if (localAddr)
95f41285cfSShuo Chen  {
96f41285cfSShuo Chen    sock.bindOrDie(*localAddr);
97f41285cfSShuo Chen  }
98f41285cfSShuo Chen  if (sock.connect(serverAddr) == 0 && !isSelfConnection(sock))
998f04b50cSShuo Chen  {
1008f04b50cSShuo Chen    // FIXME: do poll(POLLOUT) to check errors
1018f04b50cSShuo Chen    stream.reset(new TcpStream(std::move(sock)));
1028f04b50cSShuo Chen  }
1038f04b50cSShuo Chen  return stream;
1048f04b50cSShuo Chen}
105