18f04b50cSShuo Chen#include "Acceptor.h" 28f04b50cSShuo Chen#include "InetAddress.h" 38f04b50cSShuo Chen#include "TcpStream.h" 48f04b50cSShuo Chen 5fca8cc3fSShuo Chen#include <iostream> 6fca8cc3fSShuo Chen 78f04b50cSShuo Chen#include <boost/program_options.hpp> 88f04b50cSShuo Chen 98f04b50cSShuo Chen#include <sys/time.h> 108f04b50cSShuo Chen 118f04b50cSShuo Chenstruct Options 128f04b50cSShuo Chen{ 138f04b50cSShuo Chen uint16_t port; 148f04b50cSShuo Chen int length; 158f04b50cSShuo Chen int number; 168f04b50cSShuo Chen bool transmit, receive, nodelay; 178f04b50cSShuo Chen std::string host; 188f04b50cSShuo Chen Options() 198f04b50cSShuo Chen : port(0), length(0), number(0), 208f04b50cSShuo Chen transmit(false), receive(false), nodelay(false) 218f04b50cSShuo Chen { 228f04b50cSShuo Chen } 238f04b50cSShuo Chen}; 248f04b50cSShuo Chen 258f04b50cSShuo Chenstruct SessionMessage 268f04b50cSShuo Chen{ 278f04b50cSShuo Chen int32_t number; 288f04b50cSShuo Chen int32_t length; 298f04b50cSShuo Chen} __attribute__ ((__packed__)); 308f04b50cSShuo Chen 318f04b50cSShuo Chenstruct PayloadMessage 328f04b50cSShuo Chen{ 338f04b50cSShuo Chen int32_t length; 348f04b50cSShuo Chen char data[0]; 358f04b50cSShuo Chen}; 368f04b50cSShuo Chen 378f04b50cSShuo Chendouble now() 388f04b50cSShuo Chen{ 398f04b50cSShuo Chen struct timeval tv = { 0, 0 }; 408f04b50cSShuo Chen gettimeofday(&tv, NULL); 418f04b50cSShuo Chen return tv.tv_sec + tv.tv_usec / 1000000.0; 428f04b50cSShuo Chen} 438f04b50cSShuo Chen 448f04b50cSShuo Chen// FIXME: rewrite with getopt(3). 458f04b50cSShuo Chenbool parseCommandLine(int argc, char* argv[], Options* opt) 468f04b50cSShuo Chen{ 478f04b50cSShuo Chen namespace po = boost::program_options; 488f04b50cSShuo Chen 498f04b50cSShuo Chen po::options_description desc("Allowed options"); 508f04b50cSShuo Chen desc.add_options() 518f04b50cSShuo Chen ("help,h", "Help") 528f04b50cSShuo Chen ("port,p", po::value<uint16_t>(&opt->port)->default_value(5001), "TCP port") 538f04b50cSShuo Chen ("length,l", po::value<int>(&opt->length)->default_value(65536), "Buffer length") 548f04b50cSShuo Chen ("number,n", po::value<int>(&opt->number)->default_value(8192), "Number of buffers") 558f04b50cSShuo Chen ("trans,t", po::value<std::string>(&opt->host), "Transmit") 568f04b50cSShuo Chen ("recv,r", "Receive") 578f04b50cSShuo Chen ("nodelay,D", "set TCP_NODELAY") 588f04b50cSShuo Chen ; 598f04b50cSShuo Chen 608f04b50cSShuo Chen po::variables_map vm; 618f04b50cSShuo Chen po::store(po::parse_command_line(argc, argv, desc), vm); 628f04b50cSShuo Chen po::notify(vm); 638f04b50cSShuo Chen 648f04b50cSShuo Chen opt->transmit = vm.count("trans"); 658f04b50cSShuo Chen opt->receive = vm.count("recv"); 668f04b50cSShuo Chen opt->nodelay = vm.count("nodelay"); 678f04b50cSShuo Chen if (vm.count("help")) 688f04b50cSShuo Chen { 698f04b50cSShuo Chen std::cout << desc << std::endl; 708f04b50cSShuo Chen return false; 718f04b50cSShuo Chen } 728f04b50cSShuo Chen 738f04b50cSShuo Chen if (opt->transmit == opt->receive) 748f04b50cSShuo Chen { 758f04b50cSShuo Chen printf("either -t or -r must be specified.\n"); 768f04b50cSShuo Chen return false; 778f04b50cSShuo Chen } 788f04b50cSShuo Chen 798f04b50cSShuo Chen printf("port = %d\n", opt->port); 808f04b50cSShuo Chen if (opt->transmit) 818f04b50cSShuo Chen { 828f04b50cSShuo Chen printf("buffer length = %d\n", opt->length); 838f04b50cSShuo Chen printf("number of buffers = %d\n", opt->number); 848f04b50cSShuo Chen } 858f04b50cSShuo Chen else 868f04b50cSShuo Chen { 878f04b50cSShuo Chen printf("accepting...\n"); 888f04b50cSShuo Chen } 898f04b50cSShuo Chen return true; 908f04b50cSShuo Chen} 918f04b50cSShuo Chen 928f04b50cSShuo Chenvoid transmit(const Options& opt) 938f04b50cSShuo Chen{ 9424ca08a8SShuo Chen InetAddress addr; 9524ca08a8SShuo Chen if (!InetAddress::resolve(opt.host.c_str(), opt.port, &addr)) 968f04b50cSShuo Chen { 978f04b50cSShuo Chen printf("Unable to resolve %s\n", opt.host.c_str()); 988f04b50cSShuo Chen return; 998f04b50cSShuo Chen } 1008f04b50cSShuo Chen 1018f04b50cSShuo Chen printf("connecting to %s\n", addr.toIpPort().c_str()); 1028f04b50cSShuo Chen TcpStreamPtr stream(TcpStream::connect(addr)); 1038f04b50cSShuo Chen if (!stream) 1048f04b50cSShuo Chen { 1058f04b50cSShuo Chen printf("Unable to connect %s\n", addr.toIpPort().c_str()); 1068f04b50cSShuo Chen perror(""); 1078f04b50cSShuo Chen return; 1088f04b50cSShuo Chen } 1098f04b50cSShuo Chen 110a52ee0fdSShuo Chen if (opt.nodelay) 111a52ee0fdSShuo Chen { 112569528b0SShuo Chen stream->setTcpNoDelay(true); 113a52ee0fdSShuo Chen } 1148f04b50cSShuo Chen printf("connected\n"); 1158f04b50cSShuo Chen double start = now(); 1168f04b50cSShuo Chen struct SessionMessage sessionMessage = { 0, 0 }; 1178f04b50cSShuo Chen sessionMessage.number = htonl(opt.number); 1188f04b50cSShuo Chen sessionMessage.length = htonl(opt.length); 1198f04b50cSShuo Chen if (stream->sendAll(&sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage)) 1208f04b50cSShuo Chen { 1218f04b50cSShuo Chen perror("write SessionMessage"); 1228f04b50cSShuo Chen return; 1238f04b50cSShuo Chen } 1248f04b50cSShuo Chen 1258f04b50cSShuo Chen const int total_len = sizeof(int32_t) + opt.length; 1268f04b50cSShuo Chen PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len)); 1278f04b50cSShuo Chen std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free); 1288f04b50cSShuo Chen assert(payload); 1298f04b50cSShuo Chen payload->length = htonl(opt.length); 1308f04b50cSShuo Chen for (int i = 0; i < opt.length; ++i) 1318f04b50cSShuo Chen { 1328f04b50cSShuo Chen payload->data[i] = "0123456789ABCDEF"[i % 16]; 1338f04b50cSShuo Chen } 1348f04b50cSShuo Chen 135a52ee0fdSShuo Chen double total_mb = 1.0 * opt.length * opt.number / 1024 / 1024; 136a52ee0fdSShuo Chen printf("%.3f MiB in total\n", total_mb); 137a52ee0fdSShuo Chen 1388f04b50cSShuo Chen for (int i = 0; i < opt.number; ++i) 1398f04b50cSShuo Chen { 1408f04b50cSShuo Chen int nw = stream->sendAll(payload, total_len); 1418f04b50cSShuo Chen assert(nw == total_len); 1428f04b50cSShuo Chen 1438f04b50cSShuo Chen int ack = 0; 144aafef3ccSShuo Chen int nr = stream->receiveAll(&ack, sizeof(ack)); 1458f04b50cSShuo Chen assert(nr == sizeof(ack)); 1468f04b50cSShuo Chen ack = ntohl(ack); 1478f04b50cSShuo Chen assert(ack == opt.length); 1488f04b50cSShuo Chen } 1498f04b50cSShuo Chen 1508f04b50cSShuo Chen double elapsed = now() - start; 151a52ee0fdSShuo Chen printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed); 1528f04b50cSShuo Chen} 1538f04b50cSShuo Chen 1548f04b50cSShuo Chenvoid receive(const Options& opt) 1558f04b50cSShuo Chen{ 1568f04b50cSShuo Chen Acceptor acceptor(InetAddress(opt.port)); 1578f04b50cSShuo Chen TcpStreamPtr stream(acceptor.accept()); 1588f04b50cSShuo Chen if (!stream) 1598f04b50cSShuo Chen { 1608f04b50cSShuo Chen return; 1618f04b50cSShuo Chen } 1628f04b50cSShuo Chen struct SessionMessage sessionMessage = { 0, 0 }; 163aafef3ccSShuo Chen if (stream->receiveAll(&sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage)) 1648f04b50cSShuo Chen { 1658f04b50cSShuo Chen perror("read SessionMessage"); 1668f04b50cSShuo Chen return; 1678f04b50cSShuo Chen } 1688f04b50cSShuo Chen 1698f04b50cSShuo Chen sessionMessage.number = ntohl(sessionMessage.number); 1708f04b50cSShuo Chen sessionMessage.length = ntohl(sessionMessage.length); 171a52ee0fdSShuo Chen printf("receive buffer length = %d\nreceive number of buffers = %d\n", 172a52ee0fdSShuo Chen sessionMessage.length, sessionMessage.number); 173a52ee0fdSShuo Chen double total_mb = 1.0 * sessionMessage.number * sessionMessage.length / 1024 / 1024; 174a52ee0fdSShuo Chen printf("%.3f MiB in total\n", total_mb); 175a52ee0fdSShuo Chen 1768f04b50cSShuo Chen const int total_len = sizeof(int32_t) + sessionMessage.length; 1778f04b50cSShuo Chen PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len)); 1788f04b50cSShuo Chen std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free); 1798f04b50cSShuo Chen assert(payload); 1808f04b50cSShuo Chen 181a52ee0fdSShuo Chen double start = now(); 1828f04b50cSShuo Chen for (int i = 0; i < sessionMessage.number; ++i) 1838f04b50cSShuo Chen { 1848f04b50cSShuo Chen payload->length = 0; 185aafef3ccSShuo Chen if (stream->receiveAll(&payload->length, sizeof(payload->length)) != sizeof(payload->length)) 1868f04b50cSShuo Chen { 1878f04b50cSShuo Chen perror("read length"); 1888f04b50cSShuo Chen return; 1898f04b50cSShuo Chen } 1908f04b50cSShuo Chen payload->length = ntohl(payload->length); 1918f04b50cSShuo Chen assert(payload->length == sessionMessage.length); 192aafef3ccSShuo Chen if (stream->receiveAll(payload->data, payload->length) != payload->length) 1938f04b50cSShuo Chen { 1948f04b50cSShuo Chen perror("read payload data"); 1958f04b50cSShuo Chen return; 1968f04b50cSShuo Chen } 1978f04b50cSShuo Chen int32_t ack = htonl(payload->length); 1988f04b50cSShuo Chen if (stream->sendAll(&ack, sizeof(ack)) != sizeof(ack)) 1998f04b50cSShuo Chen { 2008f04b50cSShuo Chen perror("write ack"); 2018f04b50cSShuo Chen return; 2028f04b50cSShuo Chen } 2038f04b50cSShuo Chen } 204a52ee0fdSShuo Chen double elapsed = now() - start; 205a52ee0fdSShuo Chen printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed); 2068f04b50cSShuo Chen} 2078f04b50cSShuo Chen 2088f04b50cSShuo Chenint main(int argc, char* argv[]) 2098f04b50cSShuo Chen{ 2108f04b50cSShuo Chen Options options; 2118f04b50cSShuo Chen if (parseCommandLine(argc, argv, &options)) 2128f04b50cSShuo Chen { 2138f04b50cSShuo Chen if (options.transmit) 2148f04b50cSShuo Chen { 2158f04b50cSShuo Chen transmit(options); 2168f04b50cSShuo Chen } 2178f04b50cSShuo Chen else if (options.receive) 2188f04b50cSShuo Chen { 2198f04b50cSShuo Chen receive(options); 2208f04b50cSShuo Chen } 2218f04b50cSShuo Chen else 2228f04b50cSShuo Chen { 2238f04b50cSShuo Chen assert(0); 2248f04b50cSShuo Chen } 2258f04b50cSShuo Chen } 2268f04b50cSShuo Chen} 227