1#include "thread/Atomic.h"
2#include "datetime/Timestamp.h"
3#include "Acceptor.h"
4#include "InetAddress.h"
5#include "TcpStream.h"
6
7#include <thread>
8#include <assert.h>
9#include <unistd.h>
10
11muduo::AtomicInt32 g_req;
12muduo::AtomicInt32 g_resp;
13muduo::AtomicInt32 g_resp_bad;
14
15void measure(int total, bool norecv)
16{
17  muduo::Timestamp start = muduo::Timestamp::now();
18  int last_req = 0;
19  int last_resp = 0;
20  while (true)
21  {
22    struct timespec ts = { 1, 0 };
23    ::nanosleep(&ts, NULL);
24    // unfortunately, those two assignments are not atomic
25    int req = g_req.get();
26    int resp = g_resp.get();
27
28    muduo::Timestamp end = muduo::Timestamp::now();
29    double elapsed = timeDifference(end, start);
30    start = end;
31    int req_delta = req - last_req;
32    int resp_delta = resp - last_resp;
33    last_req = req;
34    last_resp = resp;
35    printf("%8.0f req/s %8.0f resp/s\n", req_delta / elapsed, resp_delta / elapsed);
36    if (resp >= total || (norecv && req >= total))
37      break;
38  }
39  printf("measure thread finished.\n");
40}
41
42void sender(TcpStream* stream, int total)
43{
44  long id = 0;
45  for (int i = 0; i < total; ++i)
46  {
47    char buf[256];
48    int n = snprintf(buf, sizeof buf,
49                     "%016lx:"
50                     "000000010"
51                     "400000000"
52                     "020000000"
53                     "000050407"
54                     "008000300"
55                     "001090000"
56                     "300400200"
57                     "050100000"
58                     "000806000\r\n",
59                     id);
60    assert(n == 100);
61    int nw = stream->sendAll(buf, n);
62    if (nw != n)
63      break;
64    ++id;
65    g_req.increment();
66  }
67  printf("sender thread finished: %d requests\n", g_req.get());
68}
69
70bool readResponse(TcpStream* stream, bool* bad)
71{
72  static std::string input;
73  while (input.find("\r\n") == std::string::npos)
74  {
75    char buf[256];
76    int nr = stream->receiveSome(buf, 256);
77    if (nr <= 0)
78      return false;
79    input.append(buf, nr);
80  }
81
82  size_t crlf = input.find("\r\n");
83  assert(crlf != std::string::npos);
84  *bad = (crlf + 2 != 100);
85  input.erase(0, crlf + 2);
86  return true;
87}
88
89int main(int argc, char* argv[])
90{
91  if (argc < 2)
92  {
93    printf("Usage:\n  %s ip [requests] [-r]\n", argv[0]);
94    return 0;
95  }
96
97  const uint16_t port = 9981;
98  InetAddress addr;
99  if (!InetAddress::resolve(argv[1], port, &addr))
100  {
101    printf("Unable to resolve %s\n", argv[1]);
102    return 0;
103  }
104
105  printf("connecting to %s\n", addr.toIpPort().c_str());
106  TcpStreamPtr stream(TcpStream::connect(addr));
107  if (!stream)
108  {
109    perror("");
110    printf("Unable to connect %s\n", addr.toIpPort().c_str());
111    return 0;
112  }
113
114  int total = 1e9;
115  if (argc > 2)
116  {
117    total = atoi(argv[2]);
118  }
119
120  bool norecv = false;
121  int recvDelay = 0;
122  if (argc > 3)
123  {
124    if (std::string(argv[3]) == "-r")
125    {
126      printf("do not receive responses.\n");
127      norecv = true;
128    }
129    else
130    {
131      recvDelay = atoi(argv[3]);
132      printf("delay receiving by %d seconds.\n", recvDelay);
133    }
134  }
135
136  std::thread measureThr(measure, total, norecv);
137
138  printf("connected, sending %d requests\n", total);
139  std::thread sendThr(sender, stream.get(), total);
140
141  if (!norecv)
142  {
143    if (recvDelay > 0)
144      sleep(recvDelay);
145    bool bad = false;
146    while (readResponse(stream.get(), &bad))
147    {
148      if (g_resp.incrementAndGet() >= total)
149        break;
150      if (bad)
151      {
152        g_resp_bad.increment();
153      }
154    }
155    printf("all responses received: total=%d bad=%d\n", g_resp.get(), g_resp_bad.get());
156  }
157
158  sendThr.join();
159  measureThr.join();
160  printf("total requests  %d\ntotal responses %d\nbad responses %d\n",
161         g_req.get(), g_resp.get(), g_resp_bad.get());
162  getchar();
163}
164