1#include "thread/Atomic.h" 2#include "datetime/Timestamp.h" 3#include "Acceptor.h" 4#include "InetAddress.h" 5#include "TcpStream.h" 6 7#include <thread> 8#include <assert.h> 9#include <unistd.h> 10 11muduo::AtomicInt32 g_req; 12muduo::AtomicInt32 g_resp; 13muduo::AtomicInt32 g_resp_bad; 14 15void measure(int total, bool norecv) 16{ 17 muduo::Timestamp start = muduo::Timestamp::now(); 18 int last_req = 0; 19 int last_resp = 0; 20 while (true) 21 { 22 struct timespec ts = { 1, 0 }; 23 ::nanosleep(&ts, NULL); 24 // unfortunately, those two assignments are not atomic 25 int req = g_req.get(); 26 int resp = g_resp.get(); 27 28 muduo::Timestamp end = muduo::Timestamp::now(); 29 double elapsed = timeDifference(end, start); 30 start = end; 31 int req_delta = req - last_req; 32 int resp_delta = resp - last_resp; 33 last_req = req; 34 last_resp = resp; 35 printf("%8.0f req/s %8.0f resp/s\n", req_delta / elapsed, resp_delta / elapsed); 36 if (resp >= total || (norecv && req >= total)) 37 break; 38 } 39 printf("measure thread finished.\n"); 40} 41 42void sender(TcpStream* stream, int total) 43{ 44 long id = 0; 45 for (int i = 0; i < total; ++i) 46 { 47 char buf[256]; 48 int n = snprintf(buf, sizeof buf, 49 "%016lx:" 50 "000000010" 51 "400000000" 52 "020000000" 53 "000050407" 54 "008000300" 55 "001090000" 56 "300400200" 57 "050100000" 58 "000806000\r\n", 59 id); 60 assert(n == 100); 61 int nw = stream->sendAll(buf, n); 62 if (nw != n) 63 break; 64 ++id; 65 g_req.increment(); 66 } 67 printf("sender thread finished: %d requests\n", g_req.get()); 68} 69 70bool readResponse(TcpStream* stream, bool* bad) 71{ 72 static std::string input; 73 while (input.find("\r\n") == std::string::npos) 74 { 75 char buf[256]; 76 int nr = stream->receiveSome(buf, 256); 77 if (nr <= 0) 78 return false; 79 input.append(buf, nr); 80 } 81 82 size_t crlf = input.find("\r\n"); 83 assert(crlf != std::string::npos); 84 *bad = (crlf + 2 != 100); 85 input.erase(0, crlf + 2); 86 return true; 87} 88 89int main(int argc, char* argv[]) 90{ 91 if (argc < 2) 92 { 93 printf("Usage:\n %s ip [requests] [-r]\n", argv[0]); 94 return 0; 95 } 96 97 const uint16_t port = 9981; 98 InetAddress addr; 99 if (!InetAddress::resolve(argv[1], port, &addr)) 100 { 101 printf("Unable to resolve %s\n", argv[1]); 102 return 0; 103 } 104 105 printf("connecting to %s\n", addr.toIpPort().c_str()); 106 TcpStreamPtr stream(TcpStream::connect(addr)); 107 if (!stream) 108 { 109 perror(""); 110 printf("Unable to connect %s\n", addr.toIpPort().c_str()); 111 return 0; 112 } 113 114 int total = 1e9; 115 if (argc > 2) 116 { 117 total = atoi(argv[2]); 118 } 119 120 bool norecv = false; 121 int recvDelay = 0; 122 if (argc > 3) 123 { 124 if (std::string(argv[3]) == "-r") 125 { 126 printf("do not receive responses.\n"); 127 norecv = true; 128 } 129 else 130 { 131 recvDelay = atoi(argv[3]); 132 printf("delay receiving by %d seconds.\n", recvDelay); 133 } 134 } 135 136 std::thread measureThr(measure, total, norecv); 137 138 printf("connected, sending %d requests\n", total); 139 std::thread sendThr(sender, stream.get(), total); 140 141 if (!norecv) 142 { 143 if (recvDelay > 0) 144 sleep(recvDelay); 145 bool bad = false; 146 while (readResponse(stream.get(), &bad)) 147 { 148 if (g_resp.incrementAndGet() >= total) 149 break; 150 if (bad) 151 { 152 g_resp_bad.increment(); 153 } 154 } 155 printf("all responses received: total=%d bad=%d\n", g_resp.get(), g_resp_bad.get()); 156 } 157 158 sendThr.join(); 159 measureThr.join(); 160 printf("total requests %d\ntotal responses %d\nbad responses %d\n", 161 g_req.get(), g_resp.get(), g_resp_bad.get()); 162 getchar(); 163} 164