1/* Copyright (c) 2017 - 2022 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * md5_client.c -- This client sends one or more files to MD5 QUIC server 4 * for MD5 sum calculation. 5 */ 6 7#include <assert.h> 8#include <errno.h> 9#include <inttypes.h> 10#include <stdio.h> 11#include <stdlib.h> 12#include <string.h> 13#include <sys/queue.h> 14#include <sys/types.h> 15#include <sys/stat.h> 16 17#ifndef WIN32 18#include <unistd.h> 19#include <fcntl.h> 20#else 21#include "vc_compat.h" 22#include "getopt.h" 23#endif 24 25#include <event2/event.h> 26#include <openssl/md5.h> 27 28#include "lsquic.h" 29#include "test_common.h" 30#include "prog.h" 31 32#include "../src/liblsquic/lsquic_logger.h" 33#include "../src/liblsquic/lsquic_int_types.h" 34#include "../src/liblsquic/lsquic_varint.h" 35#include "../src/liblsquic/lsquic_hq.h" 36#include "../src/liblsquic/lsquic_sfcw.h" 37#include "../src/liblsquic/lsquic_hash.h" 38#include "../src/liblsquic/lsquic_stream.h" 39 40/* Set to non-zero value to test out what happens when reset is sent */ 41#define RESET_AFTER_N_WRITES 0 42 43static int g_write_file = 1; 44 45#define LOCAL_BUF_SIZE 0x100 46 47static struct { 48 unsigned stream_id; /* If set, reset this stream ID */ 49 off_t offset; /* Reset it after writing this many bytes */ 50} g_reset_stream; 51 52struct file { 53 LIST_ENTRY(file) next_file; 54 const char *filename; 55 struct lsquic_reader reader; 56 int fd; 57 unsigned priority; 58 enum { 59 FILE_RESET = (1 << 0), 60 } file_flags; 61 size_t md5_off; 62 char md5str[MD5_DIGEST_LENGTH * 2]; 63}; 64 65struct lsquic_conn_ctx; 66 67struct client_ctx { 68 struct lsquic_conn_ctx *conn_h; 69 LIST_HEAD(, file) files; 70 unsigned n_files; 71 struct file *cur_file; 72 lsquic_engine_t *engine; 73 struct service_port *sport; 74 struct prog *prog; 75}; 76 77struct lsquic_conn_ctx { 78 lsquic_conn_t *conn; 79 struct client_ctx *client_ctx; 80}; 81 82 83static lsquic_conn_ctx_t * 84client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn) 85{ 86 struct client_ctx *client_ctx = stream_if_ctx; 87 lsquic_conn_ctx_t *conn_h = malloc(sizeof(*conn_h)); 88 conn_h->conn = conn; 89 conn_h->client_ctx = client_ctx; 90 client_ctx->conn_h = conn_h; 91 assert(client_ctx->n_files > 0); 92 unsigned n = client_ctx->n_files; 93 while (n--) 94 lsquic_conn_make_stream(conn); 95 print_conn_info(conn); 96 return conn_h; 97} 98 99 100static void 101client_on_goaway_received (lsquic_conn_t *conn) 102{ 103 LSQ_NOTICE("GOAWAY received"); 104} 105 106 107static void 108client_on_conn_closed (lsquic_conn_t *conn) 109{ 110 lsquic_conn_ctx_t *conn_h = lsquic_conn_get_ctx(conn); 111 LSQ_NOTICE("Connection closed"); 112 prog_stop(conn_h->client_ctx->prog); 113 free(conn_h); 114} 115 116 117struct lsquic_stream_ctx { 118 lsquic_stream_t *stream; 119 struct client_ctx *client_ctx; 120 struct file *file; 121 struct event *read_stdin_ev; 122 struct { 123 int initialized; 124 size_t size, 125 off; 126 } small; 127}; 128 129 130static lsquic_stream_ctx_t * 131client_on_new_stream (void *stream_if_ctx, lsquic_stream_t *stream) 132{ 133 struct client_ctx *const client_ctx = stream_if_ctx; 134 if (!stream) 135 { 136 assert(client_ctx->n_files > 0); 137 LSQ_NOTICE("%s: got null stream: no more streams possible; # files: %u", 138 __func__, client_ctx->n_files); 139 --client_ctx->n_files; 140 if (0 == client_ctx->n_files) 141 { 142 LSQ_DEBUG("closing connection"); 143 lsquic_conn_close(client_ctx->conn_h->conn); 144 } 145 return NULL; 146 } 147 lsquic_stream_ctx_t *st_h = calloc(1, sizeof(*st_h)); 148 st_h->stream = stream; 149 st_h->client_ctx = stream_if_ctx; 150 if (LIST_EMPTY(&st_h->client_ctx->files)) 151 { 152 /* XXX: perhaps we should not be able to write immediately: there may 153 * be internal memory constraints... 154 */ 155 lsquic_stream_write(stream, "client request", 14); 156 (void) lsquic_stream_flush(stream); 157 lsquic_stream_wantwrite(stream, 0); 158 lsquic_stream_wantread(stream, 1); 159 } 160 else 161 { 162 st_h->file = LIST_FIRST(&st_h->client_ctx->files); 163 if (g_write_file) 164 { 165 st_h->file->fd = -1; 166 st_h->file->reader.lsqr_read = test_reader_read; 167 st_h->file->reader.lsqr_size = test_reader_size; 168 st_h->file->reader.lsqr_ctx = create_lsquic_reader_ctx(st_h->file->filename); 169 if (!st_h->file->reader.lsqr_ctx) 170 exit(1); 171 } 172 else 173 { 174 st_h->file->fd = open(st_h->file->filename, O_RDONLY); 175 if (st_h->file->fd < 0) 176 { 177 LSQ_ERROR("could not open %s for reading: %s", 178 st_h->file->filename, strerror(errno)); 179 exit(1); 180 } 181 } 182 LIST_REMOVE(st_h->file, next_file); 183 lsquic_stream_set_priority(stream, st_h->file->priority); 184 lsquic_stream_wantwrite(stream, 1); 185 } 186 return st_h; 187} 188 189 190static size_t 191buf_reader_size (void *reader_ctx) 192{ 193 lsquic_stream_ctx_t *const st_h = reader_ctx; 194 struct stat st; 195 off_t off; 196 197 if (st_h->small.initialized) 198 goto initialized; 199 200 if (0 != fstat(st_h->file->fd, &st)) 201 { 202 LSQ_ERROR("fstat failed: %s", strerror(errno)); 203 goto err; 204 } 205 206 off = lseek(st_h->file->fd, 0, SEEK_CUR); 207 if (off == (off_t) -1) 208 { 209 LSQ_ERROR("lseek failed: %s", strerror(errno)); 210 goto err; 211 } 212 213 if (st.st_size < off) 214 { 215 LSQ_ERROR("size mismatch"); 216 goto err; 217 } 218 219 st_h->small.initialized = 1; 220 st_h->small.off = off; 221 st_h->small.size = st.st_size; 222 223 initialized: 224 if (st_h->small.size - st_h->small.off > LOCAL_BUF_SIZE) 225 return LOCAL_BUF_SIZE; 226 else 227 return st_h->small.size - st_h->small.off; 228 229 err: 230 close(st_h->file->fd); 231 st_h->file->fd = 0; 232 return 0; 233} 234 235 236static size_t 237buf_reader_read (void *reader_ctx, void *buf, size_t count) 238{ 239 lsquic_stream_ctx_t *const st_h = reader_ctx; 240 ssize_t nr; 241 unsigned char local_buf[LOCAL_BUF_SIZE]; 242 243 assert(st_h->small.initialized); 244 245 if (count > sizeof(local_buf)) 246 count = sizeof(local_buf); 247 248 nr = read(st_h->file->fd, local_buf, count); 249 if (nr < 0) 250 { 251 LSQ_ERROR("read: %s", strerror(errno)); 252 close(st_h->file->fd); 253 st_h->file->fd = 0; 254 return 0; 255 } 256 257 memcpy(buf, local_buf, nr); 258 st_h->small.off += nr; 259 return nr; 260} 261 262 263static void 264client_file_on_write_buf (lsquic_stream_ctx_t *st_h) 265{ 266 ssize_t nw; 267 struct lsquic_reader reader = { 268 .lsqr_read = buf_reader_read, 269 .lsqr_size = buf_reader_size, 270 .lsqr_ctx = st_h, 271 }; 272 273 if (g_reset_stream.stream_id == lsquic_stream_id(st_h->stream) && 274 lseek(st_h->file->fd, 0, SEEK_CUR) >= g_reset_stream.offset) 275 { 276 /* Note: this is an internal function */ 277 lsquic_stream_maybe_reset(st_h->stream, 278 0x01 /* QUIC_INTERNAL_ERROR */, 1); 279 g_reset_stream.stream_id = 0; /* Reset only once */ 280 } 281 282 nw = lsquic_stream_writef(st_h->stream, &reader); 283 if (-1 == nw) 284 { 285 if (ECONNRESET == errno) 286 st_h->file->file_flags |= FILE_RESET; 287 LSQ_WARN("lsquic_stream_read: %s", strerror(errno)); 288 lsquic_stream_close(st_h->stream); 289 return; 290 } 291 292#if RESET_AFTER_N_WRITES 293 static int write_count = 0; 294 if (write_count++ > RESET_AFTER_N_WRITES) 295 lsquic_stream_reset(st_h->stream, 0); 296#endif 297 298 if (0 == nw) 299 { 300 (void) close(st_h->file->fd); 301 if (0 == lsquic_stream_shutdown(st_h->stream, 1)) 302 lsquic_stream_wantread(st_h->stream, 1); 303 else 304 { 305 if (ECONNRESET == errno) 306 st_h->file->file_flags |= FILE_RESET; 307 LSQ_WARN("lsquic_stream_shutdown: %s", strerror(errno)); 308 lsquic_stream_close(st_h->stream); 309 } 310 } 311} 312 313 314static void 315client_file_on_write_efficient (lsquic_stream_t *stream, 316 lsquic_stream_ctx_t *st_h) 317{ 318 ssize_t nw; 319 320 nw = lsquic_stream_writef(stream, &st_h->file->reader); 321 if (nw < 0) 322 { 323 LSQ_ERROR("write error: %s", strerror(errno)); 324 exit(1); 325 } 326 if (nw == 0) 327 { 328 destroy_lsquic_reader_ctx(st_h->file->reader.lsqr_ctx); 329 st_h->file->reader.lsqr_ctx = NULL; 330 if (0 == lsquic_stream_shutdown(st_h->stream, 1)) 331 lsquic_stream_wantread(st_h->stream, 1); 332 else 333 { 334 if (ECONNRESET == errno) 335 st_h->file->file_flags |= FILE_RESET; 336 LSQ_WARN("lsquic_stream_shutdown: %s", strerror(errno)); 337 lsquic_stream_close(st_h->stream); 338 } 339 } 340} 341 342 343static void 344client_file_on_write (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) 345{ 346 if (g_write_file) 347 client_file_on_write_efficient(stream, st_h); 348 else 349 client_file_on_write_buf(st_h); 350} 351 352 353static void 354client_file_on_read (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) 355{ 356 char buf; 357 /* We expect to read in 32-character MD5 string */ 358 size_t ntoread = sizeof(st_h->file->md5str) - st_h->file->md5_off; 359 if (0 == ntoread) 360 { 361 lsquic_stream_wantread(stream, 0); 362 /* XXX What about an error (due to RST_STREAM) here: how are we to 363 * handle it? 364 */ 365 /* Expect a FIN */ 366 if (0 == lsquic_stream_read(stream, &buf, sizeof(buf))) 367 { 368 LSQ_NOTICE("%.*s %s", (int) sizeof(st_h->file->md5str), 369 st_h->file->md5str, 370 st_h->file->filename); 371 fflush(stdout); 372 LSQ_DEBUG("# of files: %d", st_h->client_ctx->n_files); 373 lsquic_stream_shutdown(stream, 0); 374 } 375 else 376 LSQ_ERROR("expected FIN from stream!"); 377 } 378 else 379 { 380 ssize_t nr = lsquic_stream_read(stream, 381 st_h->file->md5str + st_h->file->md5_off, ntoread); 382 if (-1 == nr) 383 { 384 if (ECONNRESET == errno) 385 st_h->file->file_flags |= FILE_RESET; 386 LSQ_WARN("lsquic_stream_read: %s", strerror(errno)); 387 lsquic_stream_close(stream); 388 return; 389 } 390 else 391 st_h->file->md5_off += nr; 392 } 393} 394 395 396static void 397client_file_on_close (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) 398{ 399 --st_h->client_ctx->n_files; 400 LSQ_NOTICE("%s called for stream %"PRIu64", # files: %u", __func__, 401 lsquic_stream_id(stream), st_h->client_ctx->n_files); 402 if (0 == st_h->client_ctx->n_files) 403 lsquic_conn_close(st_h->client_ctx->conn_h->conn); 404 if (!(st_h->file->file_flags & FILE_RESET) && 0 == RESET_AFTER_N_WRITES) 405 assert(st_h->file->md5_off == sizeof(st_h->file->md5str)); 406 if (st_h->file->reader.lsqr_ctx) 407 { 408 destroy_lsquic_reader_ctx(st_h->file->reader.lsqr_ctx); 409 st_h->file->reader.lsqr_ctx = NULL; 410 } 411 if (st_h->file->fd >= 0) 412 (void) close(st_h->file->fd); 413 free(st_h->file); 414 free(st_h); 415} 416 417 418const struct lsquic_stream_if client_file_stream_if = { 419 .on_new_conn = client_on_new_conn, 420 .on_goaway_received = client_on_goaway_received, 421 .on_conn_closed = client_on_conn_closed, 422 .on_new_stream = client_on_new_stream, 423 .on_read = client_file_on_read, 424 .on_write = client_file_on_write, 425 .on_close = client_file_on_close, 426}; 427 428 429static void 430usage (const char *prog) 431{ 432 const char *const slash = strrchr(prog, '/'); 433 if (slash) 434 prog = slash + 1; 435 printf( 436"Usage: %s [opts]\n" 437"\n" 438"Options:\n" 439" -f FILE File to send to the server -- must be specified at least\n" 440" once.\n" 441" -b Use buffering API for sending files over rather than\n" 442" the efficient version.\n" 443" -p PRIORITY Applicatble to previous file specified with -f\n" 444" -r STREAM_ID:OFFSET\n" 445" Reset stream STREAM_ID after sending more that OFFSET bytes.\n" 446 , prog); 447} 448 449 450int 451main (int argc, char **argv) 452{ 453 int opt, s; 454 struct sport_head sports; 455 struct prog prog; 456 struct client_ctx client_ctx; 457 struct file *file; 458 459 file = NULL; 460 memset(&client_ctx, 0, sizeof(client_ctx)); 461 client_ctx.prog = &prog; 462 463 TAILQ_INIT(&sports); 464 prog_init(&prog, 0, &sports, &client_file_stream_if, &client_ctx); 465 prog.prog_api.ea_alpn = "md5"; 466 467 while (-1 != (opt = getopt(argc, argv, PROG_OPTS "bhr:f:p:"))) 468 { 469 switch (opt) { 470 case 'p': 471 if (file) 472 file->priority = atoi(optarg); 473 else 474 { 475 fprintf(stderr, "No file to apply priority to\n"); 476 exit(1); 477 } 478 break; 479 case 'b': 480 g_write_file = 0; 481 break; 482 case 'f': 483 file = calloc(1, sizeof(*file)); 484 LIST_INSERT_HEAD(&client_ctx.files, file, next_file); 485 ++client_ctx.n_files; 486 file->filename = optarg; 487 break; 488 case 'r': 489 g_reset_stream.stream_id = atoi(optarg); 490 g_reset_stream.offset = atoi(strchr(optarg, ':') + 1); 491 break; 492 case 'h': 493 usage(argv[0]); 494 prog_print_common_options(&prog, stdout); 495 exit(0); 496 default: 497 if (0 != prog_set_opt(&prog, opt, optarg)) 498 exit(1); 499 } 500 } 501 502 if (LIST_EMPTY(&client_ctx.files)) 503 { 504 fprintf(stderr, "please specify one of more files using -f\n"); 505 exit(1); 506 } 507 508 if (0 != prog_prep(&prog)) 509 { 510 LSQ_ERROR("could not prep"); 511 exit(EXIT_FAILURE); 512 } 513 client_ctx.sport = TAILQ_FIRST(&sports); 514 515 if (0 != prog_connect(&prog, NULL, 0)) 516 { 517 LSQ_ERROR("could not connect"); 518 exit(EXIT_FAILURE); 519 } 520 521 LSQ_DEBUG("entering event loop"); 522 523 s = prog_run(&prog); 524 prog_cleanup(&prog); 525 526 exit(0 == s ? EXIT_SUCCESS : EXIT_FAILURE); 527} 528