1726c52daSShuo Chen#include "thread/Atomic.h" 2726c52daSShuo Chen#include "datetime/Timestamp.h" 3533a0924SShuo Chen#include "Acceptor.h" 4533a0924SShuo Chen#include "InetAddress.h" 5533a0924SShuo Chen#include "TcpStream.h" 6533a0924SShuo Chen 7533a0924SShuo Chen#include <thread> 8533a0924SShuo Chen#include <assert.h> 9267a4893SShuo Chen#include <unistd.h> 10533a0924SShuo Chen 117a9ed4ceSShuo Chenmuduo::AtomicInt32 g_req; 127a9ed4ceSShuo Chenmuduo::AtomicInt32 g_resp; 13267a4893SShuo Chenmuduo::AtomicInt32 g_resp_bad; 14533a0924SShuo Chen 157a9ed4ceSShuo Chenvoid measure(int total, bool norecv) 16533a0924SShuo Chen{ 17533a0924SShuo Chen muduo::Timestamp start = muduo::Timestamp::now(); 187a9ed4ceSShuo Chen int last_req = 0; 197a9ed4ceSShuo Chen int last_resp = 0; 20533a0924SShuo Chen while (true) 21533a0924SShuo Chen { 22533a0924SShuo Chen struct timespec ts = { 1, 0 }; 23533a0924SShuo Chen ::nanosleep(&ts, NULL); 24533a0924SShuo Chen // unfortunately, those two assignments are not atomic 257a9ed4ceSShuo Chen int req = g_req.get(); 267a9ed4ceSShuo Chen int resp = g_resp.get(); 277a9ed4ceSShuo Chen 28533a0924SShuo Chen muduo::Timestamp end = muduo::Timestamp::now(); 29533a0924SShuo Chen double elapsed = timeDifference(end, start); 30533a0924SShuo Chen start = end; 317a9ed4ceSShuo Chen int req_delta = req - last_req; 327a9ed4ceSShuo Chen int resp_delta = resp - last_resp; 337a9ed4ceSShuo Chen last_req = req; 347a9ed4ceSShuo Chen last_resp = resp; 357a9ed4ceSShuo Chen printf("%8.0f req/s %8.0f resp/s\n", req_delta / elapsed, resp_delta / elapsed); 367a9ed4ceSShuo Chen if (resp >= total || (norecv && req >= total)) 377a9ed4ceSShuo Chen break; 38533a0924SShuo Chen } 397a9ed4ceSShuo Chen printf("measure thread finished.\n"); 40533a0924SShuo Chen} 41533a0924SShuo Chen 427a9ed4ceSShuo Chenvoid sender(TcpStream* stream, int total) 43533a0924SShuo Chen{ 44533a0924SShuo Chen long id = 0; 457a9ed4ceSShuo Chen for (int i = 0; i < total; ++i) 46533a0924SShuo Chen { 47533a0924SShuo Chen char buf[256]; 48533a0924SShuo Chen int n = snprintf(buf, sizeof buf, 49533a0924SShuo Chen "%016lx:" 50533a0924SShuo Chen "000000010" 51533a0924SShuo Chen "400000000" 52533a0924SShuo Chen "020000000" 53533a0924SShuo Chen "000050407" 54533a0924SShuo Chen "008000300" 55533a0924SShuo Chen "001090000" 56533a0924SShuo Chen "300400200" 57533a0924SShuo Chen "050100000" 58533a0924SShuo Chen "000806000\r\n", 59533a0924SShuo Chen id); 60533a0924SShuo Chen assert(n == 100); 61533a0924SShuo Chen int nw = stream->sendAll(buf, n); 62533a0924SShuo Chen if (nw != n) 63533a0924SShuo Chen break; 64533a0924SShuo Chen ++id; 65533a0924SShuo Chen g_req.increment(); 66533a0924SShuo Chen } 67267a4893SShuo Chen printf("sender thread finished: %d requests\n", g_req.get()); 68267a4893SShuo Chen} 69267a4893SShuo Chen 70267a4893SShuo Chenbool readResponse(TcpStream* stream, bool* bad) 71267a4893SShuo Chen{ 72267a4893SShuo Chen static std::string input; 73267a4893SShuo Chen while (input.find("\r\n") == std::string::npos) 74267a4893SShuo Chen { 75267a4893SShuo Chen char buf[256]; 76267a4893SShuo Chen int nr = stream->receiveSome(buf, 256); 77267a4893SShuo Chen if (nr <= 0) 78267a4893SShuo Chen return false; 79267a4893SShuo Chen input.append(buf, nr); 80267a4893SShuo Chen } 81267a4893SShuo Chen 82267a4893SShuo Chen size_t crlf = input.find("\r\n"); 83267a4893SShuo Chen assert(crlf != std::string::npos); 84267a4893SShuo Chen *bad = (crlf + 2 != 100); 85267a4893SShuo Chen input.erase(0, crlf + 2); 86267a4893SShuo Chen return true; 87533a0924SShuo Chen} 88533a0924SShuo Chen 89533a0924SShuo Chenint main(int argc, char* argv[]) 90533a0924SShuo Chen{ 917a9ed4ceSShuo Chen if (argc < 2) 92533a0924SShuo Chen { 937a9ed4ceSShuo Chen printf("Usage:\n %s ip [requests] [-r]\n", argv[0]); 94533a0924SShuo Chen return 0; 95533a0924SShuo Chen } 96533a0924SShuo Chen 9724ca08a8SShuo Chen const uint16_t port = 9981; 9824ca08a8SShuo Chen InetAddress addr; 9924ca08a8SShuo Chen if (!InetAddress::resolve(argv[1], port, &addr)) 100533a0924SShuo Chen { 101533a0924SShuo Chen printf("Unable to resolve %s\n", argv[1]); 102533a0924SShuo Chen return 0; 103533a0924SShuo Chen } 104533a0924SShuo Chen 105533a0924SShuo Chen printf("connecting to %s\n", addr.toIpPort().c_str()); 106533a0924SShuo Chen TcpStreamPtr stream(TcpStream::connect(addr)); 107533a0924SShuo Chen if (!stream) 108533a0924SShuo Chen { 109533a0924SShuo Chen perror(""); 110533a0924SShuo Chen printf("Unable to connect %s\n", addr.toIpPort().c_str()); 111533a0924SShuo Chen return 0; 112533a0924SShuo Chen } 113533a0924SShuo Chen 1147a9ed4ceSShuo Chen int total = 1e9; 1157a9ed4ceSShuo Chen if (argc > 2) 1167a9ed4ceSShuo Chen { 1177a9ed4ceSShuo Chen total = atoi(argv[2]); 1187a9ed4ceSShuo Chen } 119533a0924SShuo Chen 1207a9ed4ceSShuo Chen bool norecv = false; 121267a4893SShuo Chen int recvDelay = 0; 122267a4893SShuo Chen if (argc > 3) 123533a0924SShuo Chen { 124267a4893SShuo Chen if (std::string(argv[3]) == "-r") 125267a4893SShuo Chen { 126267a4893SShuo Chen printf("do not receive responses.\n"); 127267a4893SShuo Chen norecv = true; 128267a4893SShuo Chen } 129267a4893SShuo Chen else 130267a4893SShuo Chen { 131267a4893SShuo Chen recvDelay = atoi(argv[3]); 132267a4893SShuo Chen printf("delay receiving by %d seconds.\n", recvDelay); 133267a4893SShuo Chen } 134533a0924SShuo Chen } 1357a9ed4ceSShuo Chen 1367a9ed4ceSShuo Chen std::thread measureThr(measure, total, norecv); 1377a9ed4ceSShuo Chen 1387a9ed4ceSShuo Chen printf("connected, sending %d requests\n", total); 1397a9ed4ceSShuo Chen std::thread sendThr(sender, stream.get(), total); 1407a9ed4ceSShuo Chen 1417a9ed4ceSShuo Chen if (!norecv) 142533a0924SShuo Chen { 143267a4893SShuo Chen if (recvDelay > 0) 144267a4893SShuo Chen sleep(recvDelay); 145267a4893SShuo Chen bool bad = false; 146267a4893SShuo Chen while (readResponse(stream.get(), &bad)) 147533a0924SShuo Chen { 148267a4893SShuo Chen if (g_resp.incrementAndGet() >= total) 149533a0924SShuo Chen break; 150267a4893SShuo Chen if (bad) 151267a4893SShuo Chen { 152267a4893SShuo Chen g_resp_bad.increment(); 153267a4893SShuo Chen } 154533a0924SShuo Chen } 155267a4893SShuo Chen printf("all responses received: total=%d bad=%d\n", g_resp.get(), g_resp_bad.get()); 156533a0924SShuo Chen } 157533a0924SShuo Chen 158533a0924SShuo Chen sendThr.join(); 1597a9ed4ceSShuo Chen measureThr.join(); 160267a4893SShuo Chen printf("total requests %d\ntotal responses %d\nbad responses %d\n", 161267a4893SShuo Chen g_req.get(), g_resp.get(), g_resp_bad.get()); 1627a9ed4ceSShuo Chen getchar(); 163533a0924SShuo Chen} 164