// TcpStream.cc revision e146b157
1efbfdb0cSShuo Chen#include "TcpStream.h" 2f41285cfSShuo Chen#include "InetAddress.h" 3efbfdb0cSShuo Chen 4355c2bf7SShuo Chen#include <errno.h> 5355c2bf7SShuo Chen#include <signal.h> 6355c2bf7SShuo Chen#include <unistd.h> 7355c2bf7SShuo Chen#include <sys/socket.h> 8355c2bf7SShuo Chen 9355c2bf7SShuo Chennamespace 10355c2bf7SShuo Chen{ 11355c2bf7SShuo Chen 12355c2bf7SShuo Chenclass IgnoreSigPipe 13355c2bf7SShuo Chen{ 14355c2bf7SShuo Chen public: 15355c2bf7SShuo Chen IgnoreSigPipe() 16355c2bf7SShuo Chen { 17355c2bf7SShuo Chen ::signal(SIGPIPE, SIG_IGN); 18355c2bf7SShuo Chen } 19355c2bf7SShuo Chen} initObj; 20355c2bf7SShuo Chen 21f41285cfSShuo Chenbool isSelfConnection(const Socket& sock) 22f41285cfSShuo Chen{ 23f41285cfSShuo Chen return sock.getLocalAddr() == sock.getPeerAddr(); 24f41285cfSShuo Chen} 25f41285cfSShuo Chen 26355c2bf7SShuo Chen} 27355c2bf7SShuo Chen 28efbfdb0cSShuo ChenTcpStream::TcpStream(Socket&& sock) 29efbfdb0cSShuo Chen : sock_(std::move(sock)) 30efbfdb0cSShuo Chen{ 31efbfdb0cSShuo Chen} 32355c2bf7SShuo Chen 33aafef3ccSShuo Chenint TcpStream::receiveAll(void* buf, int len) 34355c2bf7SShuo Chen{ 35e146b157SShuo Chen#ifdef TEMP_FAILURE_RETRY 368bdf4aafSShuo Chen return TEMP_FAILURE_RETRY(::recv(sock_.fd(), buf, len, MSG_WAITALL)); 37e146b157SShuo Chen#else 38e146b157SShuo Chen return ::recv(sock_.fd(), buf, len, MSG_WAITALL); 39e146b157SShuo Chen#endif 40355c2bf7SShuo Chen} 41355c2bf7SShuo Chen 42aafef3ccSShuo Chenint TcpStream::receiveSome(void* buf, int len) 43355c2bf7SShuo Chen{ 448bdf4aafSShuo Chen return sock_.recv(buf, len); 45355c2bf7SShuo Chen} 46355c2bf7SShuo Chen 47355c2bf7SShuo Chenint TcpStream::sendAll(const void* buf, int len) 48355c2bf7SShuo Chen{ 49355c2bf7SShuo Chen int written = 0; 50355c2bf7SShuo Chen while (written < len) 51355c2bf7SShuo Chen { 528bdf4aafSShuo Chen int nw = sock_.send(static_cast<const char*>(buf) + written, len - written); 537573bb11SShuo Chen if (nw > 0) 54355c2bf7SShuo Chen { 557573bb11SShuo Chen written += nw; 
56355c2bf7SShuo Chen } 578bdf4aafSShuo Chen else 58355c2bf7SShuo Chen { 59355c2bf7SShuo Chen break; 60355c2bf7SShuo Chen } 61355c2bf7SShuo Chen } 62355c2bf7SShuo Chen return written; 63355c2bf7SShuo Chen} 64355c2bf7SShuo Chen 65355c2bf7SShuo Chenint TcpStream::sendSome(const void* buf, int len) 66355c2bf7SShuo Chen{ 678bdf4aafSShuo Chen return sock_.send(buf, len); 68355c2bf7SShuo Chen} 698f04b50cSShuo Chen 70569528b0SShuo Chenvoid TcpStream::setTcpNoDelay(bool on) 71a52ee0fdSShuo Chen{ 72a52ee0fdSShuo Chen sock_.setTcpNoDelay(on); 73a52ee0fdSShuo Chen} 74a52ee0fdSShuo Chen 75a52ee0fdSShuo Chenvoid TcpStream::shutdownWrite() 76a52ee0fdSShuo Chen{ 77a52ee0fdSShuo Chen sock_.shutdownWrite(); 78a52ee0fdSShuo Chen} 79a52ee0fdSShuo Chen 808f04b50cSShuo ChenTcpStreamPtr TcpStream::connect(const InetAddress& serverAddr) 818f04b50cSShuo Chen{ 82f41285cfSShuo Chen return connectInternal(serverAddr, nullptr); 838f04b50cSShuo Chen} 848f04b50cSShuo Chen 85aafef3ccSShuo ChenTcpStreamPtr TcpStream::connect(const InetAddress& serverAddr, const InetAddress& localAddr) 868f04b50cSShuo Chen{ 87f41285cfSShuo Chen return connectInternal(serverAddr, &localAddr); 88f41285cfSShuo Chen} 89f41285cfSShuo Chen 90f41285cfSShuo ChenTcpStreamPtr TcpStream::connectInternal(const InetAddress& serverAddr, const InetAddress* localAddr) 91f41285cfSShuo Chen{ 928f04b50cSShuo Chen TcpStreamPtr stream; 9324ca08a8SShuo Chen Socket sock(Socket::createTCP(serverAddr.family())); 94f41285cfSShuo Chen if (localAddr) 95f41285cfSShuo Chen { 96f41285cfSShuo Chen sock.bindOrDie(*localAddr); 97f41285cfSShuo Chen } 98f41285cfSShuo Chen if (sock.connect(serverAddr) == 0 && !isSelfConnection(sock)) 998f04b50cSShuo Chen { 1008f04b50cSShuo Chen // FIXME: do poll(POLLOUT) to check errors 1018f04b50cSShuo Chen stream.reset(new TcpStream(std::move(sock))); 1028f04b50cSShuo Chen } 1038f04b50cSShuo Chen return stream; 1048f04b50cSShuo Chen} 105