TcpStream.cc revision 8bdf4aaf
1efbfdb0cSShuo Chen#include "TcpStream.h"
2f41285cfSShuo Chen#include "InetAddress.h"
3efbfdb0cSShuo Chen
4355c2bf7SShuo Chen#include <errno.h>
5355c2bf7SShuo Chen#include <signal.h>
6355c2bf7SShuo Chen#include <unistd.h>
7355c2bf7SShuo Chen#include <sys/socket.h>
8355c2bf7SShuo Chen
9355c2bf7SShuo Chennamespace
10355c2bf7SShuo Chen{
11355c2bf7SShuo Chen
12355c2bf7SShuo Chenclass IgnoreSigPipe
13355c2bf7SShuo Chen{
14355c2bf7SShuo Chen public:
15355c2bf7SShuo Chen  IgnoreSigPipe()
16355c2bf7SShuo Chen  {
17355c2bf7SShuo Chen    ::signal(SIGPIPE, SIG_IGN);
18355c2bf7SShuo Chen  }
19355c2bf7SShuo Chen} initObj;
20355c2bf7SShuo Chen
21f41285cfSShuo Chenbool isSelfConnection(const Socket& sock)
22f41285cfSShuo Chen{
23f41285cfSShuo Chen  return sock.getLocalAddr() == sock.getPeerAddr();
24f41285cfSShuo Chen}
25f41285cfSShuo Chen
26355c2bf7SShuo Chen}
27355c2bf7SShuo Chen
28efbfdb0cSShuo ChenTcpStream::TcpStream(Socket&& sock)
29efbfdb0cSShuo Chen  : sock_(std::move(sock))
30efbfdb0cSShuo Chen{
31efbfdb0cSShuo Chen}
32355c2bf7SShuo Chen
33aafef3ccSShuo Chenint TcpStream::receiveAll(void* buf, int len)
34355c2bf7SShuo Chen{
358bdf4aafSShuo Chen  return TEMP_FAILURE_RETRY(::recv(sock_.fd(), buf, len, MSG_WAITALL));
36355c2bf7SShuo Chen}
37355c2bf7SShuo Chen
38aafef3ccSShuo Chenint TcpStream::receiveSome(void* buf, int len)
39355c2bf7SShuo Chen{
408bdf4aafSShuo Chen  return sock_.recv(buf, len);
41355c2bf7SShuo Chen}
42355c2bf7SShuo Chen
43355c2bf7SShuo Chenint TcpStream::sendAll(const void* buf, int len)
44355c2bf7SShuo Chen{
45355c2bf7SShuo Chen  int written = 0;
46355c2bf7SShuo Chen  while (written < len)
47355c2bf7SShuo Chen  {
488bdf4aafSShuo Chen    int nw = sock_.send(static_cast<const char*>(buf) + written, len - written);
497573bb11SShuo Chen    if (nw > 0)
50355c2bf7SShuo Chen    {
517573bb11SShuo Chen      written += nw;
52355c2bf7SShuo Chen    }
538bdf4aafSShuo Chen    else
54355c2bf7SShuo Chen    {
55355c2bf7SShuo Chen      break;
56355c2bf7SShuo Chen    }
57355c2bf7SShuo Chen  }
58355c2bf7SShuo Chen  return written;
59355c2bf7SShuo Chen}
60355c2bf7SShuo Chen
61355c2bf7SShuo Chenint TcpStream::sendSome(const void* buf, int len)
62355c2bf7SShuo Chen{
638bdf4aafSShuo Chen  return sock_.send(buf, len);
64355c2bf7SShuo Chen}
658f04b50cSShuo Chen
66569528b0SShuo Chenvoid TcpStream::setTcpNoDelay(bool on)
67a52ee0fdSShuo Chen{
68a52ee0fdSShuo Chen  sock_.setTcpNoDelay(on);
69a52ee0fdSShuo Chen}
70a52ee0fdSShuo Chen
71a52ee0fdSShuo Chenvoid TcpStream::shutdownWrite()
72a52ee0fdSShuo Chen{
73a52ee0fdSShuo Chen  sock_.shutdownWrite();
74a52ee0fdSShuo Chen}
75a52ee0fdSShuo Chen
768f04b50cSShuo ChenTcpStreamPtr TcpStream::connect(const InetAddress& serverAddr)
778f04b50cSShuo Chen{
78f41285cfSShuo Chen  return connectInternal(serverAddr, nullptr);
798f04b50cSShuo Chen}
808f04b50cSShuo Chen
81aafef3ccSShuo ChenTcpStreamPtr TcpStream::connect(const InetAddress& serverAddr, const InetAddress& localAddr)
828f04b50cSShuo Chen{
83f41285cfSShuo Chen  return connectInternal(serverAddr, &localAddr);
84f41285cfSShuo Chen}
85f41285cfSShuo Chen
86f41285cfSShuo ChenTcpStreamPtr TcpStream::connectInternal(const InetAddress& serverAddr, const InetAddress* localAddr)
87f41285cfSShuo Chen{
888f04b50cSShuo Chen  TcpStreamPtr stream;
8924ca08a8SShuo Chen  Socket sock(Socket::createTCP(serverAddr.family()));
90f41285cfSShuo Chen  if (localAddr)
91f41285cfSShuo Chen  {
92f41285cfSShuo Chen    sock.bindOrDie(*localAddr);
93f41285cfSShuo Chen  }
94f41285cfSShuo Chen  if (sock.connect(serverAddr) == 0 && !isSelfConnection(sock))
958f04b50cSShuo Chen  {
968f04b50cSShuo Chen    // FIXME: do poll(POLLOUT) to check errors
978f04b50cSShuo Chen    stream.reset(new TcpStream(std::move(sock)));
988f04b50cSShuo Chen  }
998f04b50cSShuo Chen  return stream;
1008f04b50cSShuo Chen}
101