md5_client.c revision 06b2a236
1/* Copyright (c) 2017 - 2021 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * md5_client.c -- This client sends one or more files to MD5 QUIC server
4 *                 for MD5 sum calculation.
5 */
6
7#include <assert.h>
8#include <errno.h>
9#include <inttypes.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <sys/queue.h>
14#include <sys/types.h>
15#include <sys/stat.h>
16
17#ifndef WIN32
18#include <unistd.h>
19#include <fcntl.h>
20#else
21#include "vc_compat.h"
22#include "getopt.h"
23#endif
24
25#include <event2/event.h>
26#include <openssl/md5.h>
27
28#include "lsquic.h"
29#include "test_common.h"
30#include "prog.h"
31
32#include "../src/liblsquic/lsquic_logger.h"
33#include "../src/liblsquic/lsquic_int_types.h"
34#include "../src/liblsquic/lsquic_varint.h"
35#include "../src/liblsquic/lsquic_hq.h"
36#include "../src/liblsquic/lsquic_sfcw.h"
37#include "../src/liblsquic/lsquic_hash.h"
38#include "../src/liblsquic/lsquic_stream.h"
39
40/* Set to non-zero value to test out what happens when reset is sent */
41#define RESET_AFTER_N_WRITES 0
42
43static int g_write_file = 1;
44
45#define LOCAL_BUF_SIZE 0x100
46
47static struct {
48    unsigned    stream_id;  /* If set, reset this stream ID */
49    off_t       offset;     /* Reset it after writing this many bytes */
50} g_reset_stream;
51
52struct file {
53    LIST_ENTRY(file)        next_file;
54    const char             *filename;
55    struct lsquic_reader    reader;
56    int                     fd;
57    unsigned                priority;
58    enum {
59        FILE_RESET  = (1 << 0),
60    }                       file_flags;
61    size_t                  md5_off;
62    char                    md5str[MD5_DIGEST_LENGTH * 2];
63};
64
65struct lsquic_conn_ctx;
66
67struct client_ctx {
68    struct lsquic_conn_ctx  *conn_h;
69    LIST_HEAD(, file)            files;
70    unsigned                     n_files;
71    struct file                 *cur_file;
72    lsquic_engine_t             *engine;
73    struct service_port         *sport;
74    struct prog                 *prog;
75};
76
77struct lsquic_conn_ctx {
78    lsquic_conn_t       *conn;
79    struct client_ctx   *client_ctx;
80};
81
82
83static lsquic_conn_ctx_t *
84client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn)
85{
86    struct client_ctx *client_ctx = stream_if_ctx;
87    lsquic_conn_ctx_t *conn_h = malloc(sizeof(*conn_h));
88    conn_h->conn = conn;
89    conn_h->client_ctx = client_ctx;
90    client_ctx->conn_h = conn_h;
91    assert(client_ctx->n_files > 0);
92    unsigned n = client_ctx->n_files;
93    while (n--)
94        lsquic_conn_make_stream(conn);
95    print_conn_info(conn);
96    return conn_h;
97}
98
99
100static void
101client_on_goaway_received (lsquic_conn_t *conn)
102{
103    LSQ_NOTICE("GOAWAY received");
104}
105
106
107static void
108client_on_conn_closed (lsquic_conn_t *conn)
109{
110    lsquic_conn_ctx_t *conn_h = lsquic_conn_get_ctx(conn);
111    LSQ_NOTICE("Connection closed");
112    prog_stop(conn_h->client_ctx->prog);
113    free(conn_h);
114}
115
116
117struct lsquic_stream_ctx {
118    lsquic_stream_t     *stream;
119    struct client_ctx   *client_ctx;
120    struct file         *file;
121    struct event        *read_stdin_ev;
122    struct {
123        int         initialized;
124        size_t      size,
125                    off;
126    }                    small;
127};
128
129
130static lsquic_stream_ctx_t *
131client_on_new_stream (void *stream_if_ctx, lsquic_stream_t *stream)
132{
133    struct client_ctx *const client_ctx = stream_if_ctx;
134    if (!stream)
135    {
136        assert(client_ctx->n_files > 0);
137        LSQ_NOTICE("%s: got null stream: no more streams possible; # files: %u",
138                                                 __func__, client_ctx->n_files);
139        --client_ctx->n_files;
140        if (0 == client_ctx->n_files)
141        {
142            LSQ_DEBUG("closing connection");
143            lsquic_conn_close(client_ctx->conn_h->conn);
144        }
145        return NULL;
146    }
147    lsquic_stream_ctx_t *st_h = calloc(1, sizeof(*st_h));
148    st_h->stream = stream;
149    st_h->client_ctx = stream_if_ctx;
150    if (LIST_EMPTY(&st_h->client_ctx->files))
151    {
152        /* XXX: perhaps we should not be able to write immediately: there may
153         * be internal memory constraints...
154         */
155        lsquic_stream_write(stream, "client request", 14);
156        (void) lsquic_stream_flush(stream);
157        lsquic_stream_wantwrite(stream, 0);
158        lsquic_stream_wantread(stream, 1);
159    }
160    else
161    {
162        st_h->file = LIST_FIRST(&st_h->client_ctx->files);
163        if (g_write_file)
164        {
165            st_h->file->fd = -1;
166            st_h->file->reader.lsqr_read = test_reader_read;
167            st_h->file->reader.lsqr_size = test_reader_size;
168            st_h->file->reader.lsqr_ctx = create_lsquic_reader_ctx(st_h->file->filename);
169            if (!st_h->file->reader.lsqr_ctx)
170                exit(1);
171        }
172        else
173        {
174            st_h->file->fd = open(st_h->file->filename, O_RDONLY);
175            if (st_h->file->fd < 0)
176            {
177                LSQ_ERROR("could not open %s for reading: %s",
178                          st_h->file->filename, strerror(errno));
179                exit(1);
180            }
181        }
182        LIST_REMOVE(st_h->file, next_file);
183        lsquic_stream_set_priority(stream, st_h->file->priority);
184        lsquic_stream_wantwrite(stream, 1);
185    }
186    return st_h;
187}
188
189
190static size_t
191buf_reader_size (void *reader_ctx)
192{
193    lsquic_stream_ctx_t *const st_h = reader_ctx;
194    struct stat st;
195    off_t off;
196
197    if (st_h->small.initialized)
198        goto initialized;
199
200    if (0 != fstat(st_h->file->fd, &st))
201    {
202        LSQ_ERROR("fstat failed: %s", strerror(errno));
203        goto err;
204    }
205
206    off = lseek(st_h->file->fd, 0, SEEK_CUR);
207    if (off == (off_t) -1)
208    {
209        LSQ_ERROR("lseek failed: %s", strerror(errno));
210        goto err;
211    }
212
213    if (st.st_size < off)
214    {
215        LSQ_ERROR("size mismatch");
216        goto err;
217    }
218
219    st_h->small.initialized = 1;
220    st_h->small.off = off;
221    st_h->small.size = st.st_size;
222
223  initialized:
224    if (st_h->small.size - st_h->small.off > LOCAL_BUF_SIZE)
225        return LOCAL_BUF_SIZE;
226    else
227        return st_h->small.size - st_h->small.off;
228
229  err:
230    close(st_h->file->fd);
231    st_h->file->fd = 0;
232    return 0;
233}
234
235
236static size_t
237buf_reader_read (void *reader_ctx, void *buf, size_t count)
238{
239    lsquic_stream_ctx_t *const st_h = reader_ctx;
240    ssize_t nr;
241    unsigned char local_buf[LOCAL_BUF_SIZE];
242
243    assert(st_h->small.initialized);
244
245    if (count > sizeof(local_buf))
246        count = sizeof(local_buf);
247
248    nr = read(st_h->file->fd, local_buf, count);
249    if (nr < 0)
250    {
251        LSQ_ERROR("read: %s", strerror(errno));
252        close(st_h->file->fd);
253        st_h->file->fd = 0;
254        return 0;
255    }
256
257    memcpy(buf, local_buf, nr);
258    st_h->small.off += nr;
259    return nr;
260}
261
262
263static void
264client_file_on_write_buf (lsquic_stream_ctx_t *st_h)
265{
266    ssize_t nw;
267    struct lsquic_reader reader = {
268        .lsqr_read = buf_reader_read,
269        .lsqr_size = buf_reader_size,
270        .lsqr_ctx  = st_h,
271    };
272
273    if (g_reset_stream.stream_id == lsquic_stream_id(st_h->stream) &&
274        lseek(st_h->file->fd, 0, SEEK_CUR) >= g_reset_stream.offset)
275    {
276        lsquic_stream_reset(st_h->stream, 0x01    /* QUIC_INTERNAL_ERROR */);
277        g_reset_stream.stream_id = 0;   /* Reset only once */
278    }
279
280    nw = lsquic_stream_writef(st_h->stream, &reader);
281    if (-1 == nw)
282    {
283        if (ECONNRESET == errno)
284            st_h->file->file_flags |= FILE_RESET;
285        LSQ_WARN("lsquic_stream_read: %s", strerror(errno));
286        lsquic_stream_close(st_h->stream);
287        return;
288    }
289
290#if RESET_AFTER_N_WRITES
291    static int write_count = 0;
292    if (write_count++ > RESET_AFTER_N_WRITES)
293        lsquic_stream_reset(st_h->stream, 0);
294#endif
295
296    if (0 == nw)
297    {
298        (void) close(st_h->file->fd);
299        if (0 == lsquic_stream_shutdown(st_h->stream, 1))
300            lsquic_stream_wantread(st_h->stream, 1);
301        else
302        {
303            if (ECONNRESET == errno)
304                st_h->file->file_flags |= FILE_RESET;
305            LSQ_WARN("lsquic_stream_shutdown: %s", strerror(errno));
306            lsquic_stream_close(st_h->stream);
307        }
308    }
309}
310
311
312static void
313client_file_on_write_efficient (lsquic_stream_t *stream,
314                                                lsquic_stream_ctx_t *st_h)
315{
316    ssize_t nw;
317
318    nw = lsquic_stream_writef(stream, &st_h->file->reader);
319    if (nw < 0)
320    {
321        LSQ_ERROR("write error: %s", strerror(errno));
322        exit(1);
323    }
324    if (nw == 0)
325    {
326        destroy_lsquic_reader_ctx(st_h->file->reader.lsqr_ctx);
327        st_h->file->reader.lsqr_ctx = NULL;
328        if (0 == lsquic_stream_shutdown(st_h->stream, 1))
329            lsquic_stream_wantread(st_h->stream, 1);
330        else
331        {
332            if (ECONNRESET == errno)
333                st_h->file->file_flags |= FILE_RESET;
334            LSQ_WARN("lsquic_stream_shutdown: %s", strerror(errno));
335            lsquic_stream_close(st_h->stream);
336        }
337    }
338}
339
340
341static void
342client_file_on_write (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h)
343{
344    if (g_write_file)
345        client_file_on_write_efficient(stream, st_h);
346    else
347        client_file_on_write_buf(st_h);
348}
349
350
351static void
352client_file_on_read (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h)
353{
354    char buf;
355    /* We expect to read in 32-character MD5 string */
356    size_t ntoread = sizeof(st_h->file->md5str) - st_h->file->md5_off;
357    if (0 == ntoread)
358    {
359        lsquic_stream_wantread(stream, 0);
360        /* XXX What about an error (due to RST_STREAM) here: how are we to
361         *     handle it?
362         */
363        /* Expect a FIN */
364        if (0 == lsquic_stream_read(stream, &buf, sizeof(buf)))
365        {
366            LSQ_NOTICE("%.*s  %s", (int) sizeof(st_h->file->md5str),
367                                                    st_h->file->md5str,
368                                                    st_h->file->filename);
369            fflush(stdout);
370            LSQ_DEBUG("# of files: %d", st_h->client_ctx->n_files);
371            lsquic_stream_shutdown(stream, 0);
372        }
373        else
374            LSQ_ERROR("expected FIN from stream!");
375    }
376    else
377    {
378        ssize_t nr = lsquic_stream_read(stream,
379            st_h->file->md5str + st_h->file->md5_off, ntoread);
380        if (-1 == nr)
381        {
382            if (ECONNRESET == errno)
383                st_h->file->file_flags |= FILE_RESET;
384            LSQ_WARN("lsquic_stream_read: %s", strerror(errno));
385            lsquic_stream_close(stream);
386            return;
387        }
388        else
389            st_h->file->md5_off += nr;
390    }
391}
392
393
394static void
395client_file_on_close (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h)
396{
397    --st_h->client_ctx->n_files;
398    LSQ_NOTICE("%s called for stream %"PRIu64", # files: %u", __func__,
399                        lsquic_stream_id(stream), st_h->client_ctx->n_files);
400    if (0 == st_h->client_ctx->n_files)
401        lsquic_conn_close(st_h->client_ctx->conn_h->conn);
402    if (!(st_h->file->file_flags & FILE_RESET) && 0 == RESET_AFTER_N_WRITES)
403        assert(st_h->file->md5_off == sizeof(st_h->file->md5str));
404    if (st_h->file->reader.lsqr_ctx)
405    {
406        destroy_lsquic_reader_ctx(st_h->file->reader.lsqr_ctx);
407        st_h->file->reader.lsqr_ctx = NULL;
408    }
409    if (st_h->file->fd >= 0)
410        (void) close(st_h->file->fd);
411    free(st_h->file);
412    free(st_h);
413}
414
415
416const struct lsquic_stream_if client_file_stream_if = {
417    .on_new_conn            = client_on_new_conn,
418    .on_goaway_received     = client_on_goaway_received,
419    .on_conn_closed         = client_on_conn_closed,
420    .on_new_stream          = client_on_new_stream,
421    .on_read                = client_file_on_read,
422    .on_write               = client_file_on_write,
423    .on_close               = client_file_on_close,
424};
425
426
427static void
428usage (const char *prog)
429{
430    const char *const slash = strrchr(prog, '/');
431    if (slash)
432        prog = slash + 1;
433    printf(
434"Usage: %s [opts]\n"
435"\n"
436"Options:\n"
437"   -f FILE     File to send to the server -- must be specified at least\n"
438"                 once.\n"
439"   -b          Use buffering API for sending files over rather than\n"
440"                 the efficient version.\n"
441"   -p PRIORITY Applicatble to previous file specified with -f\n"
442"   -r STREAM_ID:OFFSET\n"
443"               Reset stream STREAM_ID after sending more that OFFSET bytes.\n"
444            , prog);
445}
446
447
448int
449main (int argc, char **argv)
450{
451    int opt, s;
452    struct sport_head sports;
453    struct prog prog;
454    struct client_ctx client_ctx;
455    struct file *file;
456
457    file = NULL;
458    memset(&client_ctx, 0, sizeof(client_ctx));
459    client_ctx.prog = &prog;
460
461    TAILQ_INIT(&sports);
462    prog_init(&prog, 0, &sports, &client_file_stream_if, &client_ctx);
463    prog.prog_api.ea_alpn = "md5";
464
465    while (-1 != (opt = getopt(argc, argv, PROG_OPTS "bhr:f:p:")))
466    {
467        switch (opt) {
468        case 'p':
469            if (file)
470                file->priority = atoi(optarg);
471            else
472            {
473                fprintf(stderr, "No file to apply priority to\n");
474                exit(1);
475            }
476            break;
477        case 'b':
478            g_write_file = 0;
479            break;
480        case 'f':
481            file = calloc(1, sizeof(*file));
482            LIST_INSERT_HEAD(&client_ctx.files, file, next_file);
483            ++client_ctx.n_files;
484            file->filename = optarg;
485            break;
486        case 'r':
487            g_reset_stream.stream_id = atoi(optarg);
488            g_reset_stream.offset = atoi(strchr(optarg, ':') + 1);
489            break;
490        case 'h':
491            usage(argv[0]);
492            prog_print_common_options(&prog, stdout);
493            exit(0);
494        default:
495            if (0 != prog_set_opt(&prog, opt, optarg))
496                exit(1);
497        }
498    }
499
500    if (LIST_EMPTY(&client_ctx.files))
501    {
502        fprintf(stderr, "please specify one of more files using -f\n");
503        exit(1);
504    }
505
506    if (0 != prog_prep(&prog))
507    {
508        LSQ_ERROR("could not prep");
509        exit(EXIT_FAILURE);
510    }
511    client_ctx.sport = TAILQ_FIRST(&sports);
512
513    if (0 != prog_connect(&prog, NULL, 0))
514    {
515        LSQ_ERROR("could not connect");
516        exit(EXIT_FAILURE);
517    }
518
519    LSQ_DEBUG("entering event loop");
520
521    s = prog_run(&prog);
522    prog_cleanup(&prog);
523
524    exit(0 == s ? EXIT_SUCCESS : EXIT_FAILURE);
525}
526