114ca1249SShuo Chen#include "datetime/Timestamp.h"
214ca1249SShuo Chen#include "Acceptor.h"
314ca1249SShuo Chen#include "InetAddress.h"
414ca1249SShuo Chen#include "TcpStream.h"
514ca1249SShuo Chen
614ca1249SShuo Chen#ifdef __linux
714ca1249SShuo Chen#include <linux/tcp.h>
814ca1249SShuo Chen#else
914ca1249SShuo Chen#include <netinet/tcp.h>
1014ca1249SShuo Chen#endif
1114ca1249SShuo Chen#include <stdio.h>
1214ca1249SShuo Chen#include <stdlib.h>
1314ca1249SShuo Chen#include <unistd.h>
1414ca1249SShuo Chen
1514ca1249SShuo Chenusing muduo::Timestamp;
1614ca1249SShuo Chen
1714ca1249SShuo Chenclass BandwidthReporter
1814ca1249SShuo Chen{
1914ca1249SShuo Chen public:
2014ca1249SShuo Chen  BandwidthReporter(int fd, bool sender)
2114ca1249SShuo Chen      : fd_(fd), sender_(sender)
2214ca1249SShuo Chen  {
2314ca1249SShuo Chen  }
2414ca1249SShuo Chen
2514ca1249SShuo Chen  void reportDelta(double now, int64_t total_bytes)
2614ca1249SShuo Chen  {
277431d6ceSShuo Chen    report(now, total_bytes - last_bytes_, now - last_time_);
2814ca1249SShuo Chen    last_time_ = now;
2914ca1249SShuo Chen    last_bytes_ = total_bytes;
3014ca1249SShuo Chen  }
3114ca1249SShuo Chen
327431d6ceSShuo Chen  void reportAll(double now, int64_t total_bytes, int64_t syscalls)
3314ca1249SShuo Chen  {
347431d6ceSShuo Chen    printf("Transferred %.3fMB %.3fMiB in %.3fs, %lld syscalls, %.1f Bytes/syscall\n",
357431d6ceSShuo Chen           total_bytes / 1e6, total_bytes / (1024.0 * 1024), now, (long long)syscalls,
367431d6ceSShuo Chen           total_bytes * 1.0 / syscalls);
377431d6ceSShuo Chen    report(now, total_bytes, now);
3814ca1249SShuo Chen  }
3914ca1249SShuo Chen
4014ca1249SShuo Chen private:
417431d6ceSShuo Chen  void report(double now, int64_t bytes, double elapsed)
4214ca1249SShuo Chen  {
437431d6ceSShuo Chen    double mbps = elapsed > 0 ? bytes / 1e6 / elapsed : 0.0;
447431d6ceSShuo Chen    printf("%6.3f  %6.2fMB/s  %6.1fMbits/s ", now, mbps, mbps*8);
4514ca1249SShuo Chen    if (sender_)
4614ca1249SShuo Chen      printSender();
4714ca1249SShuo Chen    else
4814ca1249SShuo Chen      printReceiver();
4914ca1249SShuo Chen  }
5014ca1249SShuo Chen
51c7fe36baSShuo Chen  void printSender()
5214ca1249SShuo Chen  {
5314ca1249SShuo Chen    int sndbuf = 0;
5414ca1249SShuo Chen    socklen_t optlen = sizeof sndbuf;
5514ca1249SShuo Chen    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &sndbuf, &optlen) < 0)
5614ca1249SShuo Chen      perror("getsockopt(SNDBUF)");
5714ca1249SShuo Chen
5814ca1249SShuo Chen    struct tcp_info tcpi = {0};
5914ca1249SShuo Chen    socklen_t len = sizeof(tcpi);
6014ca1249SShuo Chen    if (getsockopt(fd_, IPPROTO_TCP, TCP_INFO, &tcpi, &len) < 0)
6114ca1249SShuo Chen      perror("getsockopt(TCP_INFO)");
6214ca1249SShuo Chen
6314ca1249SShuo Chen    // bytes_in_flight = tcpi.tcpi_bytes_sent - tcpi.tcpi_bytes_acked;
6414ca1249SShuo Chen    // tcpi.tcpi_notsent_bytes;
65c7fe36baSShuo Chen    int snd_cwnd = tcpi.tcpi_snd_cwnd;
66c7fe36baSShuo Chen    int ssthresh = tcpi.tcpi_snd_ssthresh;
67c7fe36baSShuo Chen#ifdef __linux
68c7fe36baSShuo Chen    snd_cwnd *= tcpi.tcpi_snd_mss;  // Linux's cwnd is # of mss.
69c7fe36baSShuo Chen    if (ssthresh < INT32_MAX)
70c7fe36baSShuo Chen      ssthresh *= tcpi.tcpi_snd_mss;
71c7fe36baSShuo Chen#endif
7214ca1249SShuo Chen
73c7fe36baSShuo Chen#ifdef __linux
74c7fe36baSShuo Chen    int retrans = tcpi.tcpi_total_retrans;
75c7fe36baSShuo Chen#elif __FreeBSD__
76c7fe36baSShuo Chen    int retrans = tcpi.tcpi_snd_rexmitpack;
77c7fe36baSShuo Chen#endif
78c7fe36baSShuo Chen
79c7fe36baSShuo Chen    printf(" sndbuf=%.1fK snd_cwnd=%.1fK ssthresh=%.1fK snd_wnd=%.1fK rtt=%d/%d",
80c7fe36baSShuo Chen           sndbuf / 1024.0, snd_cwnd / 1024.0, ssthresh / 1024.0,
81c7fe36baSShuo Chen           tcpi.tcpi_snd_wnd / 1024.0, tcpi.tcpi_rtt, tcpi.tcpi_rttvar);
82c7fe36baSShuo Chen    if (retrans - last_retrans_ > 0) {
83c7fe36baSShuo Chen      printf(" retrans=%d", retrans - last_retrans_);
84c7fe36baSShuo Chen    }
85c7fe36baSShuo Chen    printf("\n");
86c7fe36baSShuo Chen    last_retrans_ = retrans;
8714ca1249SShuo Chen  }
8814ca1249SShuo Chen
8914ca1249SShuo Chen  void printReceiver() const
9014ca1249SShuo Chen  {
9114ca1249SShuo Chen    int rcvbuf = 0;
9214ca1249SShuo Chen    socklen_t optlen = sizeof rcvbuf;
9314ca1249SShuo Chen    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &optlen) < 0)
9414ca1249SShuo Chen      perror("getsockopt(RCVBUF)");
9514ca1249SShuo Chen
9614ca1249SShuo Chen    printf(" rcvbuf=%.1fK\n", rcvbuf / 1024.0);
9714ca1249SShuo Chen  }
9814ca1249SShuo Chen
9914ca1249SShuo Chen  const int fd_ = 0;
10014ca1249SShuo Chen  const bool sender_ = false;
10114ca1249SShuo Chen  double last_time_ = 0;
10214ca1249SShuo Chen  int64_t last_bytes_ = 0;
103c7fe36baSShuo Chen  int last_retrans_ = 0;
10414ca1249SShuo Chen};
10514ca1249SShuo Chen
10614ca1249SShuo Chenvoid runClient(const InetAddress& serverAddr, int64_t bytes_limit, double duration)
10714ca1249SShuo Chen{
10814ca1249SShuo Chen  TcpStreamPtr stream(TcpStream::connect(serverAddr));
10914ca1249SShuo Chen  if (!stream) {
11014ca1249SShuo Chen    printf("Unable to connect %s\n", serverAddr.toIpPort().c_str());
11114ca1249SShuo Chen    perror("");
11214ca1249SShuo Chen    return;
11314ca1249SShuo Chen  }
1147431d6ceSShuo Chen  char cong[64] = "";
1157431d6ceSShuo Chen  socklen_t optlen = sizeof cong;
1167431d6ceSShuo Chen  if (::getsockopt(stream->fd(), IPPROTO_TCP, TCP_CONGESTION, cong, &optlen) < 0)
1177431d6ceSShuo Chen      perror("getsockopt(TCP_CONGESTION)");
1187431d6ceSShuo Chen  printf("Connected %s -> %s, congestion control: %s\n",
1197431d6ceSShuo Chen         stream->getLocalAddr().toIpPort().c_str(),
1207431d6ceSShuo Chen         stream->getPeerAddr().toIpPort().c_str(), cong);
12114ca1249SShuo Chen
12214ca1249SShuo Chen  const Timestamp start = Timestamp::now();
12314ca1249SShuo Chen  const int block_size = 64 * 1024;
12414ca1249SShuo Chen  std::string message(block_size, 'S');
12514ca1249SShuo Chen  int seconds = 1;
12614ca1249SShuo Chen  int64_t total_bytes = 0;
1277431d6ceSShuo Chen  int64_t syscalls = 0;
12814ca1249SShuo Chen  double elapsed = 0;
12914ca1249SShuo Chen  BandwidthReporter rpt(stream->fd(), true);
1307431d6ceSShuo Chen  rpt.reportDelta(0, 0);
13114ca1249SShuo Chen
13214ca1249SShuo Chen  while (total_bytes < bytes_limit) {
1337431d6ceSShuo Chen    int bytes = std::min<int64_t>(message.size(), bytes_limit - total_bytes);
1347431d6ceSShuo Chen    int nw = stream->sendSome(message.data(), bytes);
13514ca1249SShuo Chen    if (nw <= 0)
13614ca1249SShuo Chen      break;
13714ca1249SShuo Chen    total_bytes += nw;
1387431d6ceSShuo Chen    syscalls++;
13914ca1249SShuo Chen    elapsed = timeDifference(Timestamp::now(), start);
14014ca1249SShuo Chen
14114ca1249SShuo Chen    if (elapsed >= duration)
14214ca1249SShuo Chen      break;
14314ca1249SShuo Chen
14414ca1249SShuo Chen    if (elapsed >= seconds) {
14514ca1249SShuo Chen      rpt.reportDelta(elapsed, total_bytes);
14614ca1249SShuo Chen      while (elapsed >= seconds)
14714ca1249SShuo Chen        ++seconds;
14814ca1249SShuo Chen    }
14914ca1249SShuo Chen  }
15014ca1249SShuo Chen
15114ca1249SShuo Chen  stream->shutdownWrite();
15214ca1249SShuo Chen  Timestamp shutdown = Timestamp::now();
15314ca1249SShuo Chen  elapsed = timeDifference(shutdown, start);
15414ca1249SShuo Chen  rpt.reportDelta(elapsed, total_bytes);
15514ca1249SShuo Chen
15614ca1249SShuo Chen  char buf[1024];
15714ca1249SShuo Chen  int nr = stream->receiveSome(buf, sizeof buf);
15814ca1249SShuo Chen  if (nr != 0)
15914ca1249SShuo Chen    printf("nr = %d\n", nr);
16014ca1249SShuo Chen  Timestamp end = Timestamp::now();
16114ca1249SShuo Chen  elapsed = timeDifference(end, start);
1627431d6ceSShuo Chen  rpt.reportAll(elapsed, total_bytes, syscalls);
16314ca1249SShuo Chen}
16414ca1249SShuo Chen
16514ca1249SShuo Chenvoid runServer(int port)
16614ca1249SShuo Chen{
16714ca1249SShuo Chen  InetAddress listenAddr(port);
16814ca1249SShuo Chen  Acceptor acceptor(listenAddr);
16914ca1249SShuo Chen  int count = 0;
17014ca1249SShuo Chen  while (true) {
17114ca1249SShuo Chen    printf("Accepting on port %d ... Ctrl-C to exit\n", port);
17214ca1249SShuo Chen    TcpStreamPtr stream = acceptor.accept();
173c7fe36baSShuo Chen    ++count;
1747431d6ceSShuo Chen    printf("accepted no. %d client %s <- %s\n", count,
1757431d6ceSShuo Chen           stream->getLocalAddr().toIpPort().c_str(),
176c7fe36baSShuo Chen           stream->getPeerAddr().toIpPort().c_str());
17714ca1249SShuo Chen
17814ca1249SShuo Chen    const Timestamp start = Timestamp::now();
17914ca1249SShuo Chen    int seconds = 1;
18014ca1249SShuo Chen    int64_t bytes = 0;
1817431d6ceSShuo Chen    int64_t syscalls = 0;
18214ca1249SShuo Chen    double elapsed = 0;
18314ca1249SShuo Chen    BandwidthReporter rpt(stream->fd(), false);
1847431d6ceSShuo Chen    rpt.reportDelta(elapsed, bytes);
18514ca1249SShuo Chen
18614ca1249SShuo Chen    char buf[65536];
18714ca1249SShuo Chen    while (true) {
18814ca1249SShuo Chen      int nr = stream->receiveSome(buf, sizeof buf);
18914ca1249SShuo Chen      if (nr <= 0)
19014ca1249SShuo Chen        break;
19114ca1249SShuo Chen      bytes += nr;
1927431d6ceSShuo Chen      syscalls++;
19314ca1249SShuo Chen
19414ca1249SShuo Chen      elapsed = timeDifference(Timestamp::now(), start);
19514ca1249SShuo Chen      if (elapsed >= seconds) {
19614ca1249SShuo Chen        rpt.reportDelta(elapsed, bytes);
19714ca1249SShuo Chen        while (elapsed >= seconds)
19814ca1249SShuo Chen          ++seconds;
19914ca1249SShuo Chen      }
20014ca1249SShuo Chen    }
20114ca1249SShuo Chen    elapsed = timeDifference(Timestamp::now(), start);
2027431d6ceSShuo Chen    rpt.reportAll(elapsed, bytes, syscalls);
20314ca1249SShuo Chen    printf("Client no. %d done\n", count);
20414ca1249SShuo Chen  }
20514ca1249SShuo Chen}
20614ca1249SShuo Chen
2077431d6ceSShuo Chenint64_t parseBytes(const char* arg)
2087431d6ceSShuo Chen{
2097431d6ceSShuo Chen  char* end = NULL;
2107431d6ceSShuo Chen  int64_t bytes = strtoll(arg, &end, 10);
2117431d6ceSShuo Chen  switch (*end) {
2127431d6ceSShuo Chen    case '\0':
2137431d6ceSShuo Chen      return bytes;
2147431d6ceSShuo Chen    case 'k':
2157431d6ceSShuo Chen      return bytes * 1000;
2167431d6ceSShuo Chen    case 'K':
2177431d6ceSShuo Chen      return bytes * 1024;
2187431d6ceSShuo Chen    case 'm':
2197431d6ceSShuo Chen      return bytes * 1000 * 1000;
2207431d6ceSShuo Chen    case 'M':
2217431d6ceSShuo Chen      return bytes * 1024 * 1024;
2227431d6ceSShuo Chen    case 'g':
2237431d6ceSShuo Chen      return bytes * 1000 * 1000 * 1000;
2247431d6ceSShuo Chen    case 'G':
2257431d6ceSShuo Chen      return bytes * 1024 * 1024 * 1024;
2267431d6ceSShuo Chen    default:
2277431d6ceSShuo Chen      return 0;
2287431d6ceSShuo Chen  }
2297431d6ceSShuo Chen}
2307431d6ceSShuo Chen
23114ca1249SShuo Chenint main(int argc, char* argv[])
23214ca1249SShuo Chen{
23314ca1249SShuo Chen  int opt;
23414ca1249SShuo Chen  bool client = false, server = false;
2357431d6ceSShuo Chen  std::string serverAddr;
2367431d6ceSShuo Chen  int port = 2009;
23714ca1249SShuo Chen  const int64_t kGigaBytes = 1024 * 1024 * 1024;
23814ca1249SShuo Chen  int64_t bytes_limit = 10 * kGigaBytes;
23914ca1249SShuo Chen  double duration = 10;
24014ca1249SShuo Chen
2417431d6ceSShuo Chen  while ((opt = getopt(argc, argv, "sc:t:b:p:")) != -1) {
24214ca1249SShuo Chen    switch (opt) {
24314ca1249SShuo Chen      case 's':
24414ca1249SShuo Chen        server = true;
24514ca1249SShuo Chen        break;
24614ca1249SShuo Chen      case 'c':
24714ca1249SShuo Chen        client = true;
2487431d6ceSShuo Chen        serverAddr = optarg;
24914ca1249SShuo Chen        break;
250c7fe36baSShuo Chen      case 't':
251c7fe36baSShuo Chen        duration = strtod(optarg, NULL);
252c7fe36baSShuo Chen        break;
2537431d6ceSShuo Chen      case 'b':
2547431d6ceSShuo Chen        bytes_limit = parseBytes(optarg);
2557431d6ceSShuo Chen        break;
2567431d6ceSShuo Chen      case 'p':
2577431d6ceSShuo Chen        port = strtol(optarg, NULL, 10);
2587431d6ceSShuo Chen        break;
25914ca1249SShuo Chen      default:
26014ca1249SShuo Chen        fprintf(stderr, "Usage: %s FIXME\n", argv[0]);
26114ca1249SShuo Chen        break;
26214ca1249SShuo Chen    }
26314ca1249SShuo Chen  }
26414ca1249SShuo Chen
26514ca1249SShuo Chen  if (client)
2667431d6ceSShuo Chen    runClient(InetAddress(serverAddr, port), bytes_limit, duration);
26714ca1249SShuo Chen  else if (server)
26814ca1249SShuo Chen    runServer(port);
26914ca1249SShuo Chen}
270