md5_client.c revision 4429f8ea
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * md5_client.c -- This client sends one or more files to MD5 QUIC server 4 * for MD5 sum calculation. 5 */ 6 7#include <assert.h> 8#include <errno.h> 9#include <inttypes.h> 10#include <stdio.h> 11#include <stdlib.h> 12#include <string.h> 13#include <sys/queue.h> 14#include <sys/types.h> 15#include <sys/stat.h> 16 17#ifndef WIN32 18#include <unistd.h> 19#include <fcntl.h> 20#else 21#include "vc_compat.h" 22#include "getopt.h" 23#endif 24 25#include <event2/event.h> 26#include <openssl/md5.h> 27 28#include "lsquic.h" 29#include "test_common.h" 30#include "prog.h" 31 32#include "../src/liblsquic/lsquic_logger.h" 33#include "../src/liblsquic/lsquic_int_types.h" 34#include "../src/liblsquic/lsquic_varint.h" 35#include "../src/liblsquic/lsquic_hq.h" 36#include "../src/liblsquic/lsquic_sfcw.h" 37#include "../src/liblsquic/lsquic_hash.h" 38#include "../src/liblsquic/lsquic_stream.h" 39 40/* Set to non-zero value to test out what happens when reset is sent */ 41#define RESET_AFTER_N_WRITES 0 42 43static int g_write_file = 1; 44 45#define LOCAL_BUF_SIZE 0x100 46 47static struct { 48 unsigned stream_id; /* If set, reset this stream ID */ 49 off_t offset; /* Reset it after writing this many bytes */ 50} g_reset_stream; 51 52struct file { 53 LIST_ENTRY(file) next_file; 54 const char *filename; 55 struct lsquic_reader reader; 56 int fd; 57 unsigned priority; 58 enum { 59 FILE_RESET = (1 << 0), 60 } file_flags; 61 size_t md5_off; 62 char md5str[MD5_DIGEST_LENGTH * 2]; 63}; 64 65struct lsquic_conn_ctx; 66 67struct client_ctx { 68 struct lsquic_conn_ctx *conn_h; 69 LIST_HEAD(, file) files; 70 unsigned n_files; 71 struct file *cur_file; 72 lsquic_engine_t *engine; 73 struct service_port *sport; 74 struct prog *prog; 75}; 76 77struct lsquic_conn_ctx { 78 lsquic_conn_t *conn; 79 struct client_ctx *client_ctx; 80}; 81 82 83static lsquic_conn_ctx_t * 84client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn) 85{ 86 struct client_ctx *client_ctx = stream_if_ctx; 87 lsquic_conn_ctx_t *conn_h = malloc(sizeof(*conn_h)); 88 conn_h->conn = conn; 89 conn_h->client_ctx = client_ctx; 90 client_ctx->conn_h = conn_h; 91 assert(client_ctx->n_files > 0); 92 unsigned n = client_ctx->n_files; 93 while (n--) 94 lsquic_conn_make_stream(conn); 95 print_conn_info(conn); 96 return conn_h; 97} 98 99 100static void 101client_on_goaway_received (lsquic_conn_t *conn) 102{ 103 LSQ_NOTICE("GOAWAY received"); 104} 105 106 107static void 108client_on_conn_closed (lsquic_conn_t *conn) 109{ 110 lsquic_conn_ctx_t *conn_h = lsquic_conn_get_ctx(conn); 111 LSQ_NOTICE("Connection closed"); 112 prog_stop(conn_h->client_ctx->prog); 113 free(conn_h); 114} 115 116 117struct lsquic_stream_ctx { 118 lsquic_stream_t *stream; 119 struct client_ctx *client_ctx; 120 struct file *file; 121 struct event *read_stdin_ev; 122 struct { 123 int initialized; 124 size_t size, 125 off; 126 } small; 127}; 128 129 130static lsquic_stream_ctx_t * 131client_on_new_stream (void *stream_if_ctx, lsquic_stream_t *stream) 132{ 133 struct client_ctx *const client_ctx = stream_if_ctx; 134 if (!stream) 135 { 136 assert(client_ctx->n_files > 0); 137 LSQ_NOTICE("%s: got null stream: no more streams possible; # files: %u", 138 __func__, client_ctx->n_files); 139 --client_ctx->n_files; 140 if (0 == client_ctx->n_files) 141 { 142 LSQ_DEBUG("closing connection"); 143 lsquic_conn_close(client_ctx->conn_h->conn); 144 } 145 return NULL; 146 } 147 lsquic_stream_ctx_t *st_h = calloc(1, sizeof(*st_h)); 148 st_h->stream = stream; 149 st_h->client_ctx = stream_if_ctx; 150 if (LIST_EMPTY(&st_h->client_ctx->files)) 151 { 152 /* XXX: perhaps we should not be able to write immediately: there may 153 * be internal memory constraints... 154 */ 155 lsquic_stream_write(stream, "client request", 14); 156 (void) lsquic_stream_flush(stream); 157 lsquic_stream_wantwrite(stream, 0); 158 lsquic_stream_wantread(stream, 1); 159 } 160 else 161 { 162 st_h->file = LIST_FIRST(&st_h->client_ctx->files); 163 if (g_write_file) 164 { 165 st_h->file->fd = -1; 166 st_h->file->reader.lsqr_read = test_reader_read; 167 st_h->file->reader.lsqr_size = test_reader_size; 168 st_h->file->reader.lsqr_ctx = create_lsquic_reader_ctx(st_h->file->filename); 169 if (!st_h->file->reader.lsqr_ctx) 170 exit(1); 171 } 172 else 173 { 174 st_h->file->fd = open(st_h->file->filename, O_RDONLY); 175 if (st_h->file->fd < 0) 176 { 177 LSQ_ERROR("could not open %s for reading: %s", 178 st_h->file->filename, strerror(errno)); 179 exit(1); 180 } 181 } 182 LIST_REMOVE(st_h->file, next_file); 183 lsquic_stream_set_priority(stream, st_h->file->priority); 184 lsquic_stream_wantwrite(stream, 1); 185 } 186 return st_h; 187} 188 189 190static size_t 191buf_reader_size (void *reader_ctx) 192{ 193 lsquic_stream_ctx_t *const st_h = reader_ctx; 194 struct stat st; 195 off_t off; 196 197 if (st_h->small.initialized) 198 goto initialized; 199 200 if (0 != fstat(st_h->file->fd, &st)) 201 { 202 LSQ_ERROR("fstat failed: %s", strerror(errno)); 203 goto err; 204 } 205 206 off = lseek(st_h->file->fd, 0, SEEK_CUR); 207 if (off == (off_t) -1) 208 { 209 LSQ_ERROR("lseek failed: %s", strerror(errno)); 210 goto err; 211 } 212 213 if (st.st_size < off) 214 { 215 LSQ_ERROR("size mismatch"); 216 goto err; 217 } 218 219 st_h->small.initialized = 1; 220 st_h->small.off = off; 221 st_h->small.size = st.st_size; 222 223 initialized: 224 if (st_h->small.size - st_h->small.off > LOCAL_BUF_SIZE) 225 return LOCAL_BUF_SIZE; 226 else 227 return st_h->small.size - st_h->small.off; 228 229 err: 230 close(st_h->file->fd); 231 st_h->file->fd = 0; 232 return 0; 233} 234 235 236static size_t 237buf_reader_read (void *reader_ctx, void *buf, size_t count) 238{ 239 lsquic_stream_ctx_t *const st_h = reader_ctx; 240 ssize_t nr; 241 unsigned char local_buf[LOCAL_BUF_SIZE]; 242 243 assert(st_h->small.initialized); 244 245 if (count > sizeof(local_buf)) 246 count = sizeof(local_buf); 247 248 nr = read(st_h->file->fd, local_buf, count); 249 if (nr < 0) 250 { 251 LSQ_ERROR("read: %s", strerror(errno)); 252 close(st_h->file->fd); 253 st_h->file->fd = 0; 254 return 0; 255 } 256 257 memcpy(buf, local_buf, nr); 258 st_h->small.off += nr; 259 return nr; 260} 261 262 263static void 264client_file_on_write_buf (lsquic_stream_ctx_t *st_h) 265{ 266 ssize_t nw; 267 struct lsquic_reader reader = { 268 .lsqr_read = buf_reader_read, 269 .lsqr_size = buf_reader_size, 270 .lsqr_ctx = st_h, 271 }; 272 273 if (g_reset_stream.stream_id == lsquic_stream_id(st_h->stream) && 274 lseek(st_h->file->fd, 0, SEEK_CUR) >= g_reset_stream.offset) 275 { 276 lsquic_stream_reset(st_h->stream, 0x01 /* QUIC_INTERNAL_ERROR */); 277 g_reset_stream.stream_id = 0; /* Reset only once */ 278 } 279 280 nw = lsquic_stream_writef(st_h->stream, &reader); 281 if (-1 == nw) 282 { 283 if (ECONNRESET == errno) 284 st_h->file->file_flags |= FILE_RESET; 285 LSQ_WARN("lsquic_stream_read: %s", strerror(errno)); 286 lsquic_stream_close(st_h->stream); 287 return; 288 } 289 290#if RESET_AFTER_N_WRITES 291 static int write_count = 0; 292 if (write_count++ > RESET_AFTER_N_WRITES) 293 lsquic_stream_reset(st_h->stream, 0); 294#endif 295 296 if (0 == nw) 297 { 298 (void) close(st_h->file->fd); 299 if (0 == lsquic_stream_shutdown(st_h->stream, 1)) 300 lsquic_stream_wantread(st_h->stream, 1); 301 else 302 { 303 if (ECONNRESET == errno) 304 st_h->file->file_flags |= FILE_RESET; 305 LSQ_WARN("lsquic_stream_shutdown: %s", strerror(errno)); 306 lsquic_stream_close(st_h->stream); 307 } 308 } 309} 310 311 312static void 313client_file_on_write_efficient (lsquic_stream_t *stream, 314 lsquic_stream_ctx_t *st_h) 315{ 316 ssize_t nw; 317 318 nw = lsquic_stream_writef(stream, &st_h->file->reader); 319 if (nw < 0) 320 { 321 LSQ_ERROR("write error: %s", strerror(errno)); 322 exit(1); 323 } 324 if (nw == 0) 325 { 326 destroy_lsquic_reader_ctx(st_h->file->reader.lsqr_ctx); 327 st_h->file->reader.lsqr_ctx = NULL; 328 if (0 == lsquic_stream_shutdown(st_h->stream, 1)) 329 lsquic_stream_wantread(st_h->stream, 1); 330 else 331 { 332 if (ECONNRESET == errno) 333 st_h->file->file_flags |= FILE_RESET; 334 LSQ_WARN("lsquic_stream_shutdown: %s", strerror(errno)); 335 lsquic_stream_close(st_h->stream); 336 } 337 } 338} 339 340 341static void 342client_file_on_write (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) 343{ 344 if (g_write_file) 345 client_file_on_write_efficient(stream, st_h); 346 else 347 client_file_on_write_buf(st_h); 348} 349 350 351static void 352client_file_on_read (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) 353{ 354 char buf; 355 /* We expect to read in 32-character MD5 string */ 356 size_t ntoread = sizeof(st_h->file->md5str) - st_h->file->md5_off; 357 if (0 == ntoread) 358 { 359 lsquic_stream_wantread(stream, 0); 360 /* XXX What about an error (due to RST_STREAM) here: how are we to 361 * handle it? 362 */ 363 /* Expect a FIN */ 364 if (0 == lsquic_stream_read(stream, &buf, sizeof(buf))) 365 { 366 LSQ_NOTICE("%.*s %s", (int) sizeof(st_h->file->md5str), 367 st_h->file->md5str, 368 st_h->file->filename); 369 fflush(stdout); 370 LSQ_DEBUG("# of files: %d", st_h->client_ctx->n_files); 371 lsquic_stream_shutdown(stream, 0); 372 } 373 else 374 LSQ_ERROR("expected FIN from stream!"); 375 } 376 else 377 { 378 ssize_t nr = lsquic_stream_read(stream, 379 st_h->file->md5str + st_h->file->md5_off, ntoread); 380 if (-1 == nr) 381 { 382 if (ECONNRESET == errno) 383 st_h->file->file_flags |= FILE_RESET; 384 LSQ_WARN("lsquic_stream_read: %s", strerror(errno)); 385 lsquic_stream_close(stream); 386 return; 387 } 388 else 389 st_h->file->md5_off += nr; 390 } 391} 392 393 394static void 395client_file_on_close (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) 396{ 397 --st_h->client_ctx->n_files; 398 LSQ_NOTICE("%s called for stream %"PRIu64", # files: %u", __func__, 399 lsquic_stream_id(stream), st_h->client_ctx->n_files); 400 if (0 == st_h->client_ctx->n_files) 401 lsquic_conn_close(st_h->client_ctx->conn_h->conn); 402 if (!(st_h->file->file_flags & FILE_RESET) && 0 == RESET_AFTER_N_WRITES) 403 assert(st_h->file->md5_off == sizeof(st_h->file->md5str)); 404 if (st_h->file->reader.lsqr_ctx) 405 { 406 destroy_lsquic_reader_ctx(st_h->file->reader.lsqr_ctx); 407 st_h->file->reader.lsqr_ctx = NULL; 408 } 409 if (st_h->file->fd >= 0) 410 (void) close(st_h->file->fd); 411 free(st_h->file); 412 free(st_h); 413} 414 415 416const struct lsquic_stream_if client_file_stream_if = { 417 .on_new_conn = client_on_new_conn, 418 .on_goaway_received = client_on_goaway_received, 419 .on_conn_closed = client_on_conn_closed, 420 .on_new_stream = client_on_new_stream, 421 .on_read = client_file_on_read, 422 .on_write = client_file_on_write, 423 .on_close = client_file_on_close, 424}; 425 426 427static void 428usage (const char *prog) 429{ 430 const char *const slash = strrchr(prog, '/'); 431 if (slash) 432 prog = slash + 1; 433 printf( 434"Usage: %s [opts]\n" 435"\n" 436"Options:\n" 437" -f FILE File to send to the server -- must be specified at least\n" 438" once.\n" 439" -b Use buffering API for sending files over rather than\n" 440" the efficient version.\n" 441" -p PRIORITY Applicatble to previous file specified with -f\n" 442" -r STREAM_ID:OFFSET\n" 443" Reset stream STREAM_ID after sending more that OFFSET bytes.\n" 444 , prog); 445} 446 447 448int 449main (int argc, char **argv) 450{ 451 int opt, s; 452 struct sport_head sports; 453 struct prog prog; 454 struct client_ctx client_ctx; 455 struct file *file; 456 457 file = NULL; 458 memset(&client_ctx, 0, sizeof(client_ctx)); 459 client_ctx.prog = &prog; 460 461 TAILQ_INIT(&sports); 462 prog_init(&prog, 0, &sports, &client_file_stream_if, &client_ctx); 463 prog.prog_api.ea_alpn = "md5"; 464 465 while (-1 != (opt = getopt(argc, argv, PROG_OPTS "bhr:f:p:"))) 466 { 467 switch (opt) { 468 case 'p': 469 if (file) 470 file->priority = atoi(optarg); 471 else 472 { 473 fprintf(stderr, "No file to apply priority to\n"); 474 exit(1); 475 } 476 break; 477 case 'b': 478 g_write_file = 0; 479 break; 480 case 'f': 481 file = calloc(1, sizeof(*file)); 482 LIST_INSERT_HEAD(&client_ctx.files, file, next_file); 483 ++client_ctx.n_files; 484 file->filename = optarg; 485 break; 486 case 'r': 487 g_reset_stream.stream_id = atoi(optarg); 488 g_reset_stream.offset = atoi(strchr(optarg, ':') + 1); 489 break; 490 case 'h': 491 usage(argv[0]); 492 prog_print_common_options(&prog, stdout); 493 exit(0); 494 default: 495 if (0 != prog_set_opt(&prog, opt, optarg)) 496 exit(1); 497 } 498 } 499 500 if (LIST_EMPTY(&client_ctx.files)) 501 { 502 fprintf(stderr, "please specify one of more files using -f\n"); 503 exit(1); 504 } 505 506 if (0 != prog_prep(&prog)) 507 { 508 LSQ_ERROR("could not prep"); 509 exit(EXIT_FAILURE); 510 } 511 client_ctx.sport = TAILQ_FIRST(&sports); 512 513 if (0 != prog_connect(&prog, NULL, 0)) 514 { 515 LSQ_ERROR("could not connect"); 516 exit(EXIT_FAILURE); 517 } 518 519 LSQ_DEBUG("entering event loop"); 520 521 s = prog_run(&prog); 522 prog_cleanup(&prog); 523 524 exit(0 == s ? EXIT_SUCCESS : EXIT_FAILURE); 525} 526