#include "Acceptor.h"
#include "InetAddress.h"
#include "TcpStream.h"

#include <cstdint>
#include <cstdio>
#include <iostream>

#include <boost/program_options.hpp>

#include <sys/time.h>
10
// Settings collected from the command line by parseCommandLine().
// A default-constructed Options has every numeric field at zero, every flag
// cleared and an empty host.
struct Options
{
  uint16_t port = 0;      // TCP port (-p)
  int length = 0;         // buffer length (-l)
  int number = 0;         // number of buffers (-n)
  bool transmit = false;  // set when -t was given
  bool receive = false;   // set when -r was given
  bool nodelay = false;   // set when -D (TCP_NODELAY) was given
  std::string host;       // argument of -t; resolved by transmit()
};
24
// Session header exchanged once, before any payload: the transmitter fills
// both fields with htonl() and the receiver decodes them with ntohl().
// Packed so the in-memory image is exactly the two 32-bit fields that are
// sent and received as raw bytes.
struct __attribute__ ((__packed__)) SessionMessage
{
  int32_t number;  // how many payload messages follow
  int32_t length;  // data length of each payload message
};
30
// One length-prefixed data message.  Instances are never declared directly:
// transmit() and receive() malloc sizeof(int32_t) + length bytes and treat
// the memory as a PayloadMessage, so the data bytes live immediately after
// the length field.
struct PayloadMessage
{
  int32_t length;  // number of bytes in data; converted with htonl()/ntohl() for the wire
  char data[0];    // zero-length trailing array addressing the bytes allocated past the header
};
36
// Returns the current time reported by gettimeofday(2) as a single double:
// whole seconds plus the microsecond part expressed as a fraction.
double now()
{
  struct timeval current = { 0, 0 };
  gettimeofday(&current, nullptr);
  const double wholeSeconds = static_cast<double>(current.tv_sec);
  const double fraction = static_cast<double>(current.tv_usec) / 1000000.0;
  return wholeSeconds + fraction;
}
43
44// FIXME: rewrite with getopt(3).
45bool parseCommandLine(int argc, char* argv[], Options* opt)
46{
47  namespace po = boost::program_options;
48
49  po::options_description desc("Allowed options");
50  desc.add_options()
51      ("help,h", "Help")
52      ("port,p", po::value<uint16_t>(&opt->port)->default_value(5001), "TCP port")
53      ("length,l", po::value<int>(&opt->length)->default_value(65536), "Buffer length")
54      ("number,n", po::value<int>(&opt->number)->default_value(8192), "Number of buffers")
55      ("trans,t",  po::value<std::string>(&opt->host), "Transmit")
56      ("recv,r", "Receive")
57      ("nodelay,D", "set TCP_NODELAY")
58      ;
59
60  po::variables_map vm;
61  po::store(po::parse_command_line(argc, argv, desc), vm);
62  po::notify(vm);
63
64  opt->transmit = vm.count("trans");
65  opt->receive = vm.count("recv");
66  opt->nodelay = vm.count("nodelay");
67  if (vm.count("help"))
68  {
69    std::cout << desc << std::endl;
70    return false;
71  }
72
73  if (opt->transmit == opt->receive)
74  {
75    printf("either -t or -r must be specified.\n");
76    return false;
77  }
78
79  printf("port = %d\n", opt->port);
80  if (opt->transmit)
81  {
82    printf("buffer length = %d\n", opt->length);
83    printf("number of buffers = %d\n", opt->number);
84  }
85  else
86  {
87    printf("accepting...\n");
88  }
89  return true;
90}
91
92void transmit(const Options& opt)
93{
94  InetAddress addr;
95  if (!InetAddress::resolve(opt.host.c_str(), opt.port, &addr))
96  {
97    printf("Unable to resolve %s\n", opt.host.c_str());
98    return;
99  }
100
101  printf("connecting to %s\n", addr.toIpPort().c_str());
102  TcpStreamPtr stream(TcpStream::connect(addr));
103  if (!stream)
104  {
105    printf("Unable to connect %s\n", addr.toIpPort().c_str());
106    perror("");
107    return;
108  }
109
110  if (opt.nodelay)
111  {
112    stream->setTcpNoDelay(true);
113  }
114  printf("connected\n");
115  double start = now();
116  struct SessionMessage sessionMessage = { 0, 0 };
117  sessionMessage.number = htonl(opt.number);
118  sessionMessage.length = htonl(opt.length);
119  if (stream->sendAll(&sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage))
120  {
121    perror("write SessionMessage");
122    return;
123  }
124
125  const int total_len = sizeof(int32_t) + opt.length;
126  PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len));
127  std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free);
128  assert(payload);
129  payload->length = htonl(opt.length);
130  for (int i = 0; i < opt.length; ++i)
131  {
132    payload->data[i] = "0123456789ABCDEF"[i % 16];
133  }
134
135  double total_mb = 1.0 * opt.length * opt.number / 1024 / 1024;
136  printf("%.3f MiB in total\n", total_mb);
137
138  for (int i = 0; i < opt.number; ++i)
139  {
140    int nw = stream->sendAll(payload, total_len);
141    assert(nw == total_len);
142
143    int ack = 0;
144    int nr = stream->receiveAll(&ack, sizeof(ack));
145    assert(nr == sizeof(ack));
146    ack = ntohl(ack);
147    assert(ack == opt.length);
148  }
149
150  double elapsed = now() - start;
151  printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed);
152}
153
154void receive(const Options& opt)
155{
156  Acceptor acceptor(InetAddress(opt.port));
157  TcpStreamPtr stream(acceptor.accept());
158  if (!stream)
159  {
160    return;
161  }
162  struct SessionMessage sessionMessage = { 0, 0 };
163  if (stream->receiveAll(&sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage))
164  {
165    perror("read SessionMessage");
166    return;
167  }
168
169  sessionMessage.number = ntohl(sessionMessage.number);
170  sessionMessage.length = ntohl(sessionMessage.length);
171  printf("receive buffer length = %d\nreceive number of buffers = %d\n",
172         sessionMessage.length, sessionMessage.number);
173  double total_mb = 1.0 * sessionMessage.number * sessionMessage.length / 1024 / 1024;
174  printf("%.3f MiB in total\n", total_mb);
175
176  const int total_len = sizeof(int32_t) + sessionMessage.length;
177  PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len));
178  std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free);
179  assert(payload);
180
181  double start = now();
182  for (int i = 0; i < sessionMessage.number; ++i)
183  {
184    payload->length = 0;
185    if (stream->receiveAll(&payload->length, sizeof(payload->length)) != sizeof(payload->length))
186    {
187      perror("read length");
188      return;
189    }
190    payload->length = ntohl(payload->length);
191    assert(payload->length == sessionMessage.length);
192    if (stream->receiveAll(payload->data, payload->length) != payload->length)
193    {
194      perror("read payload data");
195      return;
196    }
197    int32_t ack = htonl(payload->length);
198    if (stream->sendAll(&ack, sizeof(ack)) != sizeof(ack))
199    {
200      perror("write ack");
201      return;
202    }
203  }
204  double elapsed = now() - start;
205  printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed);
206}
207
208int main(int argc, char* argv[])
209{
210  Options options;
211  if (parseCommandLine(argc, argv, &options))
212  {
213    if (options.transmit)
214    {
215      transmit(options);
216    }
217    else if (options.receive)
218    {
219      receive(options);
220    }
221    else
222    {
223      assert(0);
224    }
225  }
226}
227