1#include "thread/Atomic.h"
2#include "datetime/Timestamp.h"
3#include "Acceptor.h"
4#include "InetAddress.h"
5#include "TcpStream.h"
6
7#include <string.h>
8
9#include <thread>
10
11muduo::AtomicInt64 g_bytes;
12
13std::string getMessage()
14{
15  std::string line;
16  for (int i = 33; i < 127; ++i)
17  {
18    line.push_back(char(i));
19  }
20  line += line;
21
22  std::string message;
23  for (size_t i = 0; i < 127-33; ++i)
24  {
25    message += line.substr(i, 72) + '\n';
26  }
27  return message;
28}
29
30void measure()
31{
32  muduo::Timestamp start = muduo::Timestamp::now();
33  while (true)
34  {
35    struct timespec ts = { 1, 0 };
36    ::nanosleep(&ts, NULL);
37    // unfortunately, those two assignments are not atomic
38    int64_t bytes = g_bytes.getAndSet(0);
39    muduo::Timestamp end = muduo::Timestamp::now();
40    double elapsed = timeDifference(end, start);
41    start = end;
42    if (bytes)
43    {
44      printf("%.3f MiB/s\n", bytes / (1024.0 * 1024) / elapsed);
45    }
46  }
47}
48
49void chargen(TcpStreamPtr stream)
50{
51  std::string message = getMessage();
52  while (true)
53  {
54    int nw = stream->sendAll(message.data(), message.size());
55    g_bytes.add(nw);
56    if (nw < static_cast<int>(message.size()))
57    {
58      break;
59    }
60  }
61}
62
63// a thread-per-connection current chargen server and client
64int main(int argc, char* argv[])
65{
66  if (argc < 3)
67  {
68    printf("Usage:\n  %s hostname port\n  %s -l port\n", argv[0], argv[0]);
69    std::string message = getMessage();
70    printf("message size = %zd\n", message.size());
71    return 0;
72  }
73
74  std::thread(measure).detach();
75
76  int port = atoi(argv[2]);
77  if (strcmp(argv[1], "-l") == 0)
78  {
79    InetAddress listenAddr(port);
80    Acceptor acceptor(listenAddr);
81    printf("Accepting... Ctrl-C to exit\n");
82    int count = 0;
83    while (true)
84    {
85      TcpStreamPtr tcpStream = acceptor.accept();
86      printf("accepted no. %d client\n", ++count);
87
88      std::thread thr(chargen, std::move(tcpStream));
89      thr.detach();
90    }
91  }
92  else
93  {
94    InetAddress addr;
95    const char* hostname = argv[1];
96    if (InetAddress::resolve(hostname, port, &addr))
97    {
98      TcpStreamPtr stream(TcpStream::connect(addr));
99      if (stream)
100      {
101        chargen(std::move(stream));
102      }
103      else
104      {
105        printf("Unable to connect %s\n", addr.toIpPort().c_str());
106        perror("");
107      }
108    }
109    else
110    {
111      printf("Unable to resolve %s\n", hostname);
112    }
113  }
114}
115