1726c52daSShuo Chen#include "thread/Atomic.h"
2726c52daSShuo Chen#include "datetime/Timestamp.h"
3533a0924SShuo Chen#include "Acceptor.h"
4533a0924SShuo Chen#include "InetAddress.h"
5533a0924SShuo Chen#include "TcpStream.h"
6533a0924SShuo Chen
7533a0924SShuo Chen#include <thread>
8533a0924SShuo Chen#include <assert.h>
9267a4893SShuo Chen#include <unistd.h>
10533a0924SShuo Chen
117a9ed4ceSShuo Chenmuduo::AtomicInt32 g_req;
127a9ed4ceSShuo Chenmuduo::AtomicInt32 g_resp;
13267a4893SShuo Chenmuduo::AtomicInt32 g_resp_bad;
14533a0924SShuo Chen
157a9ed4ceSShuo Chenvoid measure(int total, bool norecv)
16533a0924SShuo Chen{
17533a0924SShuo Chen  muduo::Timestamp start = muduo::Timestamp::now();
187a9ed4ceSShuo Chen  int last_req = 0;
197a9ed4ceSShuo Chen  int last_resp = 0;
20533a0924SShuo Chen  while (true)
21533a0924SShuo Chen  {
22533a0924SShuo Chen    struct timespec ts = { 1, 0 };
23533a0924SShuo Chen    ::nanosleep(&ts, NULL);
24533a0924SShuo Chen    // unfortunately, those two assignments are not atomic
257a9ed4ceSShuo Chen    int req = g_req.get();
267a9ed4ceSShuo Chen    int resp = g_resp.get();
277a9ed4ceSShuo Chen
28533a0924SShuo Chen    muduo::Timestamp end = muduo::Timestamp::now();
29533a0924SShuo Chen    double elapsed = timeDifference(end, start);
30533a0924SShuo Chen    start = end;
317a9ed4ceSShuo Chen    int req_delta = req - last_req;
327a9ed4ceSShuo Chen    int resp_delta = resp - last_resp;
337a9ed4ceSShuo Chen    last_req = req;
347a9ed4ceSShuo Chen    last_resp = resp;
357a9ed4ceSShuo Chen    printf("%8.0f req/s %8.0f resp/s\n", req_delta / elapsed, resp_delta / elapsed);
367a9ed4ceSShuo Chen    if (resp >= total || (norecv && req >= total))
377a9ed4ceSShuo Chen      break;
38533a0924SShuo Chen  }
397a9ed4ceSShuo Chen  printf("measure thread finished.\n");
40533a0924SShuo Chen}
41533a0924SShuo Chen
427a9ed4ceSShuo Chenvoid sender(TcpStream* stream, int total)
43533a0924SShuo Chen{
44533a0924SShuo Chen  long id = 0;
457a9ed4ceSShuo Chen  for (int i = 0; i < total; ++i)
46533a0924SShuo Chen  {
47533a0924SShuo Chen    char buf[256];
48533a0924SShuo Chen    int n = snprintf(buf, sizeof buf,
49533a0924SShuo Chen                     "%016lx:"
50533a0924SShuo Chen                     "000000010"
51533a0924SShuo Chen                     "400000000"
52533a0924SShuo Chen                     "020000000"
53533a0924SShuo Chen                     "000050407"
54533a0924SShuo Chen                     "008000300"
55533a0924SShuo Chen                     "001090000"
56533a0924SShuo Chen                     "300400200"
57533a0924SShuo Chen                     "050100000"
58533a0924SShuo Chen                     "000806000\r\n",
59533a0924SShuo Chen                     id);
60533a0924SShuo Chen    assert(n == 100);
61533a0924SShuo Chen    int nw = stream->sendAll(buf, n);
62533a0924SShuo Chen    if (nw != n)
63533a0924SShuo Chen      break;
64533a0924SShuo Chen    ++id;
65533a0924SShuo Chen    g_req.increment();
66533a0924SShuo Chen  }
67267a4893SShuo Chen  printf("sender thread finished: %d requests\n", g_req.get());
68267a4893SShuo Chen}
69267a4893SShuo Chen
70267a4893SShuo Chenbool readResponse(TcpStream* stream, bool* bad)
71267a4893SShuo Chen{
72267a4893SShuo Chen  static std::string input;
73267a4893SShuo Chen  while (input.find("\r\n") == std::string::npos)
74267a4893SShuo Chen  {
75267a4893SShuo Chen    char buf[256];
76267a4893SShuo Chen    int nr = stream->receiveSome(buf, 256);
77267a4893SShuo Chen    if (nr <= 0)
78267a4893SShuo Chen      return false;
79267a4893SShuo Chen    input.append(buf, nr);
80267a4893SShuo Chen  }
81267a4893SShuo Chen
82267a4893SShuo Chen  size_t crlf = input.find("\r\n");
83267a4893SShuo Chen  assert(crlf != std::string::npos);
84267a4893SShuo Chen  *bad = (crlf + 2 != 100);
85267a4893SShuo Chen  input.erase(0, crlf + 2);
86267a4893SShuo Chen  return true;
87533a0924SShuo Chen}
88533a0924SShuo Chen
89533a0924SShuo Chenint main(int argc, char* argv[])
90533a0924SShuo Chen{
917a9ed4ceSShuo Chen  if (argc < 2)
92533a0924SShuo Chen  {
937a9ed4ceSShuo Chen    printf("Usage:\n  %s ip [requests] [-r]\n", argv[0]);
94533a0924SShuo Chen    return 0;
95533a0924SShuo Chen  }
96533a0924SShuo Chen
9724ca08a8SShuo Chen  const uint16_t port = 9981;
9824ca08a8SShuo Chen  InetAddress addr;
9924ca08a8SShuo Chen  if (!InetAddress::resolve(argv[1], port, &addr))
100533a0924SShuo Chen  {
101533a0924SShuo Chen    printf("Unable to resolve %s\n", argv[1]);
102533a0924SShuo Chen    return 0;
103533a0924SShuo Chen  }
104533a0924SShuo Chen
105533a0924SShuo Chen  printf("connecting to %s\n", addr.toIpPort().c_str());
106533a0924SShuo Chen  TcpStreamPtr stream(TcpStream::connect(addr));
107533a0924SShuo Chen  if (!stream)
108533a0924SShuo Chen  {
109533a0924SShuo Chen    perror("");
110533a0924SShuo Chen    printf("Unable to connect %s\n", addr.toIpPort().c_str());
111533a0924SShuo Chen    return 0;
112533a0924SShuo Chen  }
113533a0924SShuo Chen
1147a9ed4ceSShuo Chen  int total = 1e9;
1157a9ed4ceSShuo Chen  if (argc > 2)
1167a9ed4ceSShuo Chen  {
1177a9ed4ceSShuo Chen    total = atoi(argv[2]);
1187a9ed4ceSShuo Chen  }
119533a0924SShuo Chen
1207a9ed4ceSShuo Chen  bool norecv = false;
121267a4893SShuo Chen  int recvDelay = 0;
122267a4893SShuo Chen  if (argc > 3)
123533a0924SShuo Chen  {
124267a4893SShuo Chen    if (std::string(argv[3]) == "-r")
125267a4893SShuo Chen    {
126267a4893SShuo Chen      printf("do not receive responses.\n");
127267a4893SShuo Chen      norecv = true;
128267a4893SShuo Chen    }
129267a4893SShuo Chen    else
130267a4893SShuo Chen    {
131267a4893SShuo Chen      recvDelay = atoi(argv[3]);
132267a4893SShuo Chen      printf("delay receiving by %d seconds.\n", recvDelay);
133267a4893SShuo Chen    }
134533a0924SShuo Chen  }
1357a9ed4ceSShuo Chen
1367a9ed4ceSShuo Chen  std::thread measureThr(measure, total, norecv);
1377a9ed4ceSShuo Chen
1387a9ed4ceSShuo Chen  printf("connected, sending %d requests\n", total);
1397a9ed4ceSShuo Chen  std::thread sendThr(sender, stream.get(), total);
1407a9ed4ceSShuo Chen
1417a9ed4ceSShuo Chen  if (!norecv)
142533a0924SShuo Chen  {
143267a4893SShuo Chen    if (recvDelay > 0)
144267a4893SShuo Chen      sleep(recvDelay);
145267a4893SShuo Chen    bool bad = false;
146267a4893SShuo Chen    while (readResponse(stream.get(), &bad))
147533a0924SShuo Chen    {
148267a4893SShuo Chen      if (g_resp.incrementAndGet() >= total)
149533a0924SShuo Chen        break;
150267a4893SShuo Chen      if (bad)
151267a4893SShuo Chen      {
152267a4893SShuo Chen        g_resp_bad.increment();
153267a4893SShuo Chen      }
154533a0924SShuo Chen    }
155267a4893SShuo Chen    printf("all responses received: total=%d bad=%d\n", g_resp.get(), g_resp_bad.get());
156533a0924SShuo Chen  }
157533a0924SShuo Chen
158533a0924SShuo Chen  sendThr.join();
1597a9ed4ceSShuo Chen  measureThr.join();
160267a4893SShuo Chen  printf("total requests  %d\ntotal responses %d\nbad responses %d\n",
161267a4893SShuo Chen         g_req.get(), g_resp.get(), g_resp_bad.get());
1627a9ed4ceSShuo Chen  getchar();
163533a0924SShuo Chen}
164