perf_client.c revision 8ecb980d
1/* Copyright (c) 2017 - 2021 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * perf_client.c -- Implements the "perf" client, see 4 * https://tools.ietf.org/html/draft-banks-quic-performance-00 5 */ 6 7#include <assert.h> 8#include <errno.h> 9#include <inttypes.h> 10#include <stdbool.h> 11#include <stdio.h> 12#include <stdlib.h> 13#include <inttypes.h> 14#include <string.h> 15#include <sys/queue.h> 16#include <sys/types.h> 17#include <sys/stat.h> 18 19#ifndef WIN32 20#include <unistd.h> 21#include <fcntl.h> 22#else 23#include "vc_compat.h" 24#include "getopt.h" 25#endif 26 27#include <event2/event.h> 28 29#include "lsquic.h" 30#include "test_common.h" 31#include "prog.h" 32 33#include "../src/liblsquic/lsquic_logger.h" 34#include "../src/liblsquic/lsquic_int_types.h" 35#include "../src/liblsquic/lsquic_byteswap.h" 36 37struct scenario 38{ 39 STAILQ_ENTRY(scenario) next; 40 uint64_t bytes_to_request; 41 uint64_t bytes_to_send; /* After the 8-byte header */ 42}; 43 44/* Assume all connections use the same list of scenarios, so store it in 45 * a global variable. 46 */ 47static STAILQ_HEAD(, scenario) s_scenarios 48 = STAILQ_HEAD_INITIALIZER(s_scenarios); 49static unsigned s_n_scenarios; 50static unsigned s_n_conns; 51 52struct prog s_prog; 53 54struct lsquic_conn_ctx 55{ 56 /* Once a connection runs out of scenarios, no new streams are created 57 * and the connection is closed when all streams are closed. 58 */ 59 const struct scenario *next_scenario; 60 unsigned n_scenarios_left; 61 unsigned n_streams; 62}; 63 64 65static bool 66perf_create_streams (struct lsquic_conn *conn, struct lsquic_conn_ctx *conn_ctx) 67{ 68 if (conn_ctx->n_scenarios_left) 69 { 70 --conn_ctx->n_scenarios_left; 71 lsquic_conn_make_stream(conn); 72 return true; 73 } 74 else 75 return false; 76} 77 78 79static lsquic_conn_ctx_t * 80perf_client_on_new_conn (void *stream_if_ctx, struct lsquic_conn *conn) 81{ 82 struct lsquic_conn_ctx *conn_ctx; 83 84 if (s_n_scenarios) 85 { 86 conn_ctx = calloc(1, sizeof(*conn_ctx)); 87 conn_ctx->next_scenario = STAILQ_FIRST(&s_scenarios); 88 conn_ctx->n_scenarios_left = s_n_scenarios; 89 perf_create_streams(conn, conn_ctx); 90 ++s_n_conns; 91 return conn_ctx; 92 } 93 else 94 { 95 lsquic_conn_close(conn); 96 return NULL; 97 } 98} 99 100 101static void 102perf_client_on_conn_closed (struct lsquic_conn *conn) 103{ 104 struct lsquic_conn_ctx *conn_ctx; 105 106 LSQ_NOTICE("Connection closed"); 107 conn_ctx = lsquic_conn_get_ctx(conn); 108 free(conn_ctx); 109 --s_n_conns; 110 if (0 == s_n_conns) 111 prog_stop(&s_prog); 112} 113 114 115struct lsquic_stream_ctx 116{ 117 const struct scenario *scenario; 118 struct { 119 uint64_t header; /* Big-endian */ 120 unsigned n_h; /* Number of header bytes written */ 121 uint64_t n_written; /* Number of non-header bytes written */ 122 } write_state; 123 struct { 124 uint64_t n_read; 125 } read_state; 126}; 127 128 129static struct lsquic_stream_ctx * 130perf_client_on_new_stream (void *stream_if_ctx, lsquic_stream_t *stream) 131{ 132 struct lsquic_conn_ctx *conn_ctx; 133 struct lsquic_conn *conn; 134 135 conn = lsquic_stream_conn(stream); 136 conn_ctx = lsquic_conn_get_ctx(conn); 137 138 if (!stream) 139 { 140 LSQ_NOTICE("%s: got null stream: no more streams possible", __func__); 141 lsquic_conn_close(conn); 142 return NULL; 143 } 144 145 assert(conn_ctx->next_scenario); 146 147 struct lsquic_stream_ctx *stream_ctx = calloc(1, sizeof(*stream_ctx)); 148 stream_ctx->scenario = conn_ctx->next_scenario; 149 conn_ctx->next_scenario = STAILQ_NEXT(conn_ctx->next_scenario, next); 150#if __BYTE_ORDER == __LITTLE_ENDIAN 151 stream_ctx->write_state.header 152 = bswap_64(stream_ctx->scenario->bytes_to_request); 153#else 154 stream_ctx->write_state.header = stream_ctx->scenario->bytes_to_request; 155#endif 156 lsquic_stream_wantwrite(stream, 1); 157 return stream_ctx; 158} 159 160 161static size_t 162buffer_size (void *lsqr_ctx) 163{ 164 struct lsquic_stream_ctx *const stream_ctx = lsqr_ctx; 165 return stream_ctx->scenario->bytes_to_send 166 - stream_ctx->write_state.n_written; 167} 168 169 170static size_t 171buffer_read (void *lsqr_ctx, void *buf, size_t count) 172{ 173 struct lsquic_stream_ctx *const stream_ctx = lsqr_ctx; 174 size_t left; 175 176 left = buffer_size(stream_ctx); 177 if (count > left) 178 count = left; 179 memset(buf, 0, count); 180 stream_ctx->write_state.n_written += count; 181 return count; 182} 183 184 185static size_t 186header_size (void *lsqr_ctx) 187{ 188 struct lsquic_stream_ctx *const stream_ctx = lsqr_ctx; 189 return sizeof(uint64_t) - stream_ctx->write_state.n_h; 190} 191 192 193static size_t 194header_read (void *lsqr_ctx, void *buf, size_t count) 195{ 196 struct lsquic_stream_ctx *const stream_ctx = lsqr_ctx; 197 const unsigned char *src; 198 size_t left; 199 200 left = header_size(stream_ctx); 201 if (count < left) 202 count = left; 203 src = (unsigned char *) &stream_ctx->write_state.header 204 + sizeof(uint64_t) - left; 205 memcpy(buf, src, count); 206 stream_ctx->write_state.n_h += count; 207 return count; 208} 209 210 211static void 212perf_client_on_write (struct lsquic_stream *stream, 213 struct lsquic_stream_ctx *stream_ctx) 214{ 215 struct lsquic_reader reader; 216 ssize_t nw; 217 218 if (stream_ctx->write_state.n_h >= sizeof(uint64_t)) 219 reader = (struct lsquic_reader) { 220 buffer_read, 221 buffer_size, 222 stream_ctx, 223 }; 224 else 225 reader = (struct lsquic_reader) { 226 header_read, 227 header_size, 228 stream_ctx, 229 }; 230 231 nw = lsquic_stream_writef(stream, &reader); 232 if (nw >= 0) 233 LSQ_DEBUG("%s: wrote %zd bytes", __func__, nw); 234 else 235 LSQ_WARN("%s: cannot write to stream: %s", __func__, strerror(errno)); 236 237 if (reader.lsqr_size(stream_ctx) == 0 238 && (reader.lsqr_size == buffer_size || buffer_size(stream_ctx) == 0)) 239 { 240 lsquic_stream_shutdown(stream, 1); 241 lsquic_stream_wantread(stream, 1); 242 } 243} 244 245 246static size_t 247perf_read_and_discard (void *user_data, const unsigned char *buf, 248 size_t count, int fin) 249{ 250 return count; 251} 252 253 254static void 255perf_client_on_read (struct lsquic_stream *stream, 256 struct lsquic_stream_ctx *stream_ctx) 257{ 258 ssize_t nr; 259 260 nr = lsquic_stream_readf(stream, perf_read_and_discard, NULL); 261 if (nr >= 0) 262 { 263 stream_ctx->read_state.n_read += nr; 264 if (nr == 0) 265 { 266 LSQ_DEBUG("reached fin after reading %"PRIu64" bytes from server", 267 stream_ctx->read_state.n_read); 268 lsquic_stream_shutdown(stream, 0); 269 } 270 } 271 else 272 { 273 LSQ_WARN("error reading from stream: %s, abort connection", 274 strerror(errno)); 275 lsquic_stream_close(stream); 276 lsquic_conn_abort(lsquic_stream_conn(stream)); 277 } 278} 279 280 281static void 282perf_client_on_close (struct lsquic_stream *stream, 283 struct lsquic_stream_ctx *stream_ctx) 284{ 285 struct lsquic_conn_ctx *conn_ctx; 286 struct lsquic_conn *conn; 287 288 conn = lsquic_stream_conn(stream); 289 conn_ctx = lsquic_conn_get_ctx(conn); 290 if (!perf_create_streams(conn, conn_ctx)) 291 { 292 LSQ_DEBUG("out of scenarios, will close connection"); 293 lsquic_conn_close(conn); 294 } 295 free(stream_ctx); 296} 297 298 299const struct lsquic_stream_if perf_stream_if = { 300 .on_new_conn = perf_client_on_new_conn, 301 .on_conn_closed = perf_client_on_conn_closed, 302 .on_new_stream = perf_client_on_new_stream, 303 .on_read = perf_client_on_read, 304 .on_write = perf_client_on_write, 305 .on_close = perf_client_on_close, 306}; 307 308 309static void 310usage (const char *prog) 311{ 312 const char *const slash = strrchr(prog, '/'); 313 if (slash) 314 prog = slash + 1; 315 printf( 316"Usage: %s [opts]\n" 317"\n" 318"Options:\n" 319" -p NREQ:NSEND Request NREQ bytes from server and, in addition, send\n" 320" NSEND bytes to server. May be specified many times\n" 321" and must be specified at least once.\n" 322" -T FILE Print stats to FILE. If FILE is -, print stats to stdout.\n" 323 , prog); 324} 325 326 327int 328main (int argc, char **argv) 329{ 330 char *p; 331 int opt, s; 332 struct sport_head sports; 333 struct scenario *scenario; 334 335 TAILQ_INIT(&sports); 336 prog_init(&s_prog, 0, &sports, &perf_stream_if, NULL); 337 s_prog.prog_api.ea_alpn = "perf"; 338 339 while (-1 != (opt = getopt(argc, argv, PROG_OPTS "hp:T:"))) 340 { 341 switch (opt) { 342 case 'p': 343 scenario = calloc(1, sizeof(*scenario)); 344 if (!scenario) 345 { 346 perror("calloc"); 347 exit(EXIT_FAILURE); 348 } 349 ++s_n_scenarios; 350 STAILQ_INSERT_TAIL(&s_scenarios, scenario, next); 351 scenario->bytes_to_request = strtoull(optarg, &p, 10); 352 if (*p != ':') 353 { 354 fprintf(stderr, "invalid scenario `%s'\n", optarg); 355 exit(EXIT_FAILURE); 356 } 357 scenario->bytes_to_send = strtoull(p + 1, NULL, 10); 358 break; 359 case 'T': 360 if (0 == strcmp(optarg, "-")) 361 s_prog.prog_api.ea_stats_fh = stdout; 362 else 363 { 364 s_prog.prog_api.ea_stats_fh = fopen(optarg, "w"); 365 if (!s_prog.prog_api.ea_stats_fh) 366 { 367 perror("fopen"); 368 exit(1); 369 } 370 } 371 break; 372 case 'h': 373 usage(argv[0]); 374 prog_print_common_options(&s_prog, stdout); 375 exit(0); 376 default: 377 if (0 != prog_set_opt(&s_prog, opt, optarg)) 378 exit(1); 379 } 380 } 381 382 if (STAILQ_EMPTY(&s_scenarios)) 383 { 384 fprintf(stderr, "please specify one of more requests using -p\n"); 385 exit(1); 386 } 387 388 if (0 != prog_prep(&s_prog)) 389 { 390 LSQ_ERROR("could not prep"); 391 exit(EXIT_FAILURE); 392 } 393 394 if (0 != prog_connect(&s_prog, NULL, 0)) 395 { 396 LSQ_ERROR("could not connect"); 397 exit(EXIT_FAILURE); 398 } 399 400 LSQ_DEBUG("entering event loop"); 401 402 s = prog_run(&s_prog); 403 prog_cleanup(&s_prog); 404 405 exit(0 == s ? EXIT_SUCCESS : EXIT_FAILURE); 406} 407