1#include "Acceptor.h" 2#include "InetAddress.h" 3#include "TcpStream.h" 4 5#include <iostream> 6 7#include <boost/program_options.hpp> 8 9#include <sys/time.h> 10 11struct Options 12{ 13 uint16_t port; 14 int length; 15 int number; 16 bool transmit, receive, nodelay; 17 std::string host; 18 Options() 19 : port(0), length(0), number(0), 20 transmit(false), receive(false), nodelay(false) 21 { 22 } 23}; 24 25struct SessionMessage 26{ 27 int32_t number; 28 int32_t length; 29} __attribute__ ((__packed__)); 30 31struct PayloadMessage 32{ 33 int32_t length; 34 char data[0]; 35}; 36 37double now() 38{ 39 struct timeval tv = { 0, 0 }; 40 gettimeofday(&tv, NULL); 41 return tv.tv_sec + tv.tv_usec / 1000000.0; 42} 43 44// FIXME: rewrite with getopt(3). 45bool parseCommandLine(int argc, char* argv[], Options* opt) 46{ 47 namespace po = boost::program_options; 48 49 po::options_description desc("Allowed options"); 50 desc.add_options() 51 ("help,h", "Help") 52 ("port,p", po::value<uint16_t>(&opt->port)->default_value(5001), "TCP port") 53 ("length,l", po::value<int>(&opt->length)->default_value(65536), "Buffer length") 54 ("number,n", po::value<int>(&opt->number)->default_value(8192), "Number of buffers") 55 ("trans,t", po::value<std::string>(&opt->host), "Transmit") 56 ("recv,r", "Receive") 57 ("nodelay,D", "set TCP_NODELAY") 58 ; 59 60 po::variables_map vm; 61 po::store(po::parse_command_line(argc, argv, desc), vm); 62 po::notify(vm); 63 64 opt->transmit = vm.count("trans"); 65 opt->receive = vm.count("recv"); 66 opt->nodelay = vm.count("nodelay"); 67 if (vm.count("help")) 68 { 69 std::cout << desc << std::endl; 70 return false; 71 } 72 73 if (opt->transmit == opt->receive) 74 { 75 printf("either -t or -r must be specified.\n"); 76 return false; 77 } 78 79 printf("port = %d\n", opt->port); 80 if (opt->transmit) 81 { 82 printf("buffer length = %d\n", opt->length); 83 printf("number of buffers = %d\n", opt->number); 84 } 85 else 86 { 87 printf("accepting...\n"); 88 } 89 return true; 90} 91 92void transmit(const Options& opt) 93{ 94 InetAddress addr; 95 if (!InetAddress::resolve(opt.host.c_str(), opt.port, &addr)) 96 { 97 printf("Unable to resolve %s\n", opt.host.c_str()); 98 return; 99 } 100 101 printf("connecting to %s\n", addr.toIpPort().c_str()); 102 TcpStreamPtr stream(TcpStream::connect(addr)); 103 if (!stream) 104 { 105 printf("Unable to connect %s\n", addr.toIpPort().c_str()); 106 perror(""); 107 return; 108 } 109 110 if (opt.nodelay) 111 { 112 stream->setTcpNoDelay(true); 113 } 114 printf("connected\n"); 115 double start = now(); 116 struct SessionMessage sessionMessage = { 0, 0 }; 117 sessionMessage.number = htonl(opt.number); 118 sessionMessage.length = htonl(opt.length); 119 if (stream->sendAll(&sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage)) 120 { 121 perror("write SessionMessage"); 122 return; 123 } 124 125 const int total_len = sizeof(int32_t) + opt.length; 126 PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len)); 127 std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free); 128 assert(payload); 129 payload->length = htonl(opt.length); 130 for (int i = 0; i < opt.length; ++i) 131 { 132 payload->data[i] = "0123456789ABCDEF"[i % 16]; 133 } 134 135 double total_mb = 1.0 * opt.length * opt.number / 1024 / 1024; 136 printf("%.3f MiB in total\n", total_mb); 137 138 for (int i = 0; i < opt.number; ++i) 139 { 140 int nw = stream->sendAll(payload, total_len); 141 assert(nw == total_len); 142 143 int ack = 0; 144 int nr = stream->receiveAll(&ack, sizeof(ack)); 145 assert(nr == sizeof(ack)); 146 ack = ntohl(ack); 147 assert(ack == opt.length); 148 } 149 150 double elapsed = now() - start; 151 printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed); 152} 153 154void receive(const Options& opt) 155{ 156 Acceptor acceptor(InetAddress(opt.port)); 157 TcpStreamPtr stream(acceptor.accept()); 158 if (!stream) 159 { 160 return; 161 } 162 struct SessionMessage sessionMessage = { 0, 0 }; 163 if (stream->receiveAll(&sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage)) 164 { 165 perror("read SessionMessage"); 166 return; 167 } 168 169 sessionMessage.number = ntohl(sessionMessage.number); 170 sessionMessage.length = ntohl(sessionMessage.length); 171 printf("receive buffer length = %d\nreceive number of buffers = %d\n", 172 sessionMessage.length, sessionMessage.number); 173 double total_mb = 1.0 * sessionMessage.number * sessionMessage.length / 1024 / 1024; 174 printf("%.3f MiB in total\n", total_mb); 175 176 const int total_len = sizeof(int32_t) + sessionMessage.length; 177 PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len)); 178 std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free); 179 assert(payload); 180 181 double start = now(); 182 for (int i = 0; i < sessionMessage.number; ++i) 183 { 184 payload->length = 0; 185 if (stream->receiveAll(&payload->length, sizeof(payload->length)) != sizeof(payload->length)) 186 { 187 perror("read length"); 188 return; 189 } 190 payload->length = ntohl(payload->length); 191 assert(payload->length == sessionMessage.length); 192 if (stream->receiveAll(payload->data, payload->length) != payload->length) 193 { 194 perror("read payload data"); 195 return; 196 } 197 int32_t ack = htonl(payload->length); 198 if (stream->sendAll(&ack, sizeof(ack)) != sizeof(ack)) 199 { 200 perror("write ack"); 201 return; 202 } 203 } 204 double elapsed = now() - start; 205 printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed); 206} 207 208int main(int argc, char* argv[]) 209{ 210 Options options; 211 if (parseCommandLine(argc, argv, &options)) 212 { 213 if (options.transmit) 214 { 215 transmit(options); 216 } 217 else if (options.receive) 218 { 219 receive(options); 220 } 221 else 222 { 223 assert(0); 224 } 225 } 226} 227