md5_client.c revision 99a1ad0f
/* Copyright (c) 2017 - 2021 LiteSpeed Technologies Inc.  See LICENSE. */
/*
 * md5_client.c -- This client sends one or more files to an MD5 QUIC server
 *                 for MD5 sum calculation.
 */
6
7#include <assert.h>
8#include <errno.h>
9#include <inttypes.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <sys/queue.h>
14#include <sys/types.h>
15#include <sys/stat.h>
16
17#ifndef WIN32
18#include <unistd.h>
19#include <fcntl.h>
20#else
21#include "vc_compat.h"
22#include "getopt.h"
23#endif
24
25#include <event2/event.h>
26#include <openssl/md5.h>
27
28#include "lsquic.h"
29#include "test_common.h"
30#include "prog.h"
31
32#include "../src/liblsquic/lsquic_logger.h"
33#include "../src/liblsquic/lsquic_int_types.h"
34#include "../src/liblsquic/lsquic_varint.h"
35#include "../src/liblsquic/lsquic_hq.h"
36#include "../src/liblsquic/lsquic_sfcw.h"
37#include "../src/liblsquic/lsquic_hash.h"
38#include "../src/liblsquic/lsquic_stream.h"
39
/* Compile-time test knob: when non-zero, the buffered write path resets the
 * stream once its write counter exceeds this value.  Zero disables the test.
 */
#define RESET_AFTER_N_WRITES 0

/* Non-zero selects the reader-based ("efficient") sending path; the -b
 * command-line option clears it to select the buffered path instead.
 */
static int g_write_file = 1;

/* Maximum number of bytes the buffered reader hands over per call */
#define LOCAL_BUF_SIZE 0x100
46
/* Settings for the optional stream reset test; filled in by the -r option.
 * Both members start out as zero.
 */
static struct {
    unsigned    stream_id;  /* ID of the stream to reset */
    off_t       offset;     /* Reset once the file offset reaches this value */
} g_reset_stream;
51
52struct file {
53    LIST_ENTRY(file)        next_file;
54    const char             *filename;
55    struct lsquic_reader    reader;
56    int                     fd;
57    unsigned                priority;
58    enum {
59        FILE_RESET  = (1 << 0),
60    }                       file_flags;
61    size_t                  md5_off;
62    char                    md5str[MD5_DIGEST_LENGTH * 2];
63};
64
65struct lsquic_conn_ctx;
66
67struct client_ctx {
68    struct lsquic_conn_ctx  *conn_h;
69    LIST_HEAD(, file)            files;
70    unsigned                     n_files;
71    struct file                 *cur_file;
72    lsquic_engine_t             *engine;
73    struct service_port         *sport;
74    struct prog                 *prog;
75};
76
77struct lsquic_conn_ctx {
78    lsquic_conn_t       *conn;
79    struct client_ctx   *client_ctx;
80};
81
82
83static lsquic_conn_ctx_t *
84client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn)
85{
86    struct client_ctx *client_ctx = stream_if_ctx;
87    lsquic_conn_ctx_t *conn_h = malloc(sizeof(*conn_h));
88    conn_h->conn = conn;
89    conn_h->client_ctx = client_ctx;
90    client_ctx->conn_h = conn_h;
91    assert(client_ctx->n_files > 0);
92    unsigned n = client_ctx->n_files;
93    while (n--)
94        lsquic_conn_make_stream(conn);
95    print_conn_info(conn);
96    return conn_h;
97}
98
99
100static void
101client_on_goaway_received (lsquic_conn_t *conn)
102{
103    LSQ_NOTICE("GOAWAY received");
104}
105
106
107static void
108client_on_conn_closed (lsquic_conn_t *conn)
109{
110    lsquic_conn_ctx_t *conn_h = lsquic_conn_get_ctx(conn);
111    LSQ_NOTICE("Connection closed");
112    prog_stop(conn_h->client_ctx->prog);
113    free(conn_h);
114}
115
116
117struct lsquic_stream_ctx {
118    lsquic_stream_t     *stream;
119    struct client_ctx   *client_ctx;
120    struct file         *file;
121    struct event        *read_stdin_ev;
122    struct {
123        int         initialized;
124        size_t      size,
125                    off;
126    }                    small;
127};
128
129
130static lsquic_stream_ctx_t *
131client_on_new_stream (void *stream_if_ctx, lsquic_stream_t *stream)
132{
133    struct client_ctx *const client_ctx = stream_if_ctx;
134    if (!stream)
135    {
136        assert(client_ctx->n_files > 0);
137        LSQ_NOTICE("%s: got null stream: no more streams possible; # files: %u",
138                                                 __func__, client_ctx->n_files);
139        --client_ctx->n_files;
140        if (0 == client_ctx->n_files)
141        {
142            LSQ_DEBUG("closing connection");
143            lsquic_conn_close(client_ctx->conn_h->conn);
144        }
145        return NULL;
146    }
147    lsquic_stream_ctx_t *st_h = calloc(1, sizeof(*st_h));
148    st_h->stream = stream;
149    st_h->client_ctx = stream_if_ctx;
150    if (LIST_EMPTY(&st_h->client_ctx->files))
151    {
152        /* XXX: perhaps we should not be able to write immediately: there may
153         * be internal memory constraints...
154         */
155        lsquic_stream_write(stream, "client request", 14);
156        (void) lsquic_stream_flush(stream);
157        lsquic_stream_wantwrite(stream, 0);
158        lsquic_stream_wantread(stream, 1);
159    }
160    else
161    {
162        st_h->file = LIST_FIRST(&st_h->client_ctx->files);
163        if (g_write_file)
164        {
165            st_h->file->fd = -1;
166            st_h->file->reader.lsqr_read = test_reader_read;
167            st_h->file->reader.lsqr_size = test_reader_size;
168            st_h->file->reader.lsqr_ctx = create_lsquic_reader_ctx(st_h->file->filename);
169            if (!st_h->file->reader.lsqr_ctx)
170                exit(1);
171        }
172        else
173        {
174            st_h->file->fd = open(st_h->file->filename, O_RDONLY);
175            if (st_h->file->fd < 0)
176            {
177                LSQ_ERROR("could not open %s for reading: %s",
178                          st_h->file->filename, strerror(errno));
179                exit(1);
180            }
181        }
182        LIST_REMOVE(st_h->file, next_file);
183        lsquic_stream_set_priority(stream, st_h->file->priority);
184        lsquic_stream_wantwrite(stream, 1);
185    }
186    return st_h;
187}
188
189
190static size_t
191buf_reader_size (void *reader_ctx)
192{
193    lsquic_stream_ctx_t *const st_h = reader_ctx;
194    struct stat st;
195    off_t off;
196
197    if (st_h->small.initialized)
198        goto initialized;
199
200    if (0 != fstat(st_h->file->fd, &st))
201    {
202        LSQ_ERROR("fstat failed: %s", strerror(errno));
203        goto err;
204    }
205
206    off = lseek(st_h->file->fd, 0, SEEK_CUR);
207    if (off == (off_t) -1)
208    {
209        LSQ_ERROR("lseek failed: %s", strerror(errno));
210        goto err;
211    }
212
213    if (st.st_size < off)
214    {
215        LSQ_ERROR("size mismatch");
216        goto err;
217    }
218
219    st_h->small.initialized = 1;
220    st_h->small.off = off;
221    st_h->small.size = st.st_size;
222
223  initialized:
224    if (st_h->small.size - st_h->small.off > LOCAL_BUF_SIZE)
225        return LOCAL_BUF_SIZE;
226    else
227        return st_h->small.size - st_h->small.off;
228
229  err:
230    close(st_h->file->fd);
231    st_h->file->fd = 0;
232    return 0;
233}
234
235
236static size_t
237buf_reader_read (void *reader_ctx, void *buf, size_t count)
238{
239    lsquic_stream_ctx_t *const st_h = reader_ctx;
240    ssize_t nr;
241    unsigned char local_buf[LOCAL_BUF_SIZE];
242
243    assert(st_h->small.initialized);
244
245    if (count > sizeof(local_buf))
246        count = sizeof(local_buf);
247
248    nr = read(st_h->file->fd, local_buf, count);
249    if (nr < 0)
250    {
251        LSQ_ERROR("read: %s", strerror(errno));
252        close(st_h->file->fd);
253        st_h->file->fd = 0;
254        return 0;
255    }
256
257    memcpy(buf, local_buf, nr);
258    st_h->small.off += nr;
259    return nr;
260}
261
262
263static void
264client_file_on_write_buf (lsquic_stream_ctx_t *st_h)
265{
266    ssize_t nw;
267    struct lsquic_reader reader = {
268        .lsqr_read = buf_reader_read,
269        .lsqr_size = buf_reader_size,
270        .lsqr_ctx  = st_h,
271    };
272
273    if (g_reset_stream.stream_id == lsquic_stream_id(st_h->stream) &&
274        lseek(st_h->file->fd, 0, SEEK_CUR) >= g_reset_stream.offset)
275    {
276        /* Note: this is an internal function */
277        lsquic_stream_maybe_reset(st_h->stream,
278                0x01 /* QUIC_INTERNAL_ERROR */, 1);
279        g_reset_stream.stream_id = 0;   /* Reset only once */
280    }
281
282    nw = lsquic_stream_writef(st_h->stream, &reader);
283    if (-1 == nw)
284    {
285        if (ECONNRESET == errno)
286            st_h->file->file_flags |= FILE_RESET;
287        LSQ_WARN("lsquic_stream_read: %s", strerror(errno));
288        lsquic_stream_close(st_h->stream);
289        return;
290    }
291
292#if RESET_AFTER_N_WRITES
293    static int write_count = 0;
294    if (write_count++ > RESET_AFTER_N_WRITES)
295        lsquic_stream_reset(st_h->stream, 0);
296#endif
297
298    if (0 == nw)
299    {
300        (void) close(st_h->file->fd);
301        if (0 == lsquic_stream_shutdown(st_h->stream, 1))
302            lsquic_stream_wantread(st_h->stream, 1);
303        else
304        {
305            if (ECONNRESET == errno)
306                st_h->file->file_flags |= FILE_RESET;
307            LSQ_WARN("lsquic_stream_shutdown: %s", strerror(errno));
308            lsquic_stream_close(st_h->stream);
309        }
310    }
311}
312
313
314static void
315client_file_on_write_efficient (lsquic_stream_t *stream,
316                                                lsquic_stream_ctx_t *st_h)
317{
318    ssize_t nw;
319
320    nw = lsquic_stream_writef(stream, &st_h->file->reader);
321    if (nw < 0)
322    {
323        LSQ_ERROR("write error: %s", strerror(errno));
324        exit(1);
325    }
326    if (nw == 0)
327    {
328        destroy_lsquic_reader_ctx(st_h->file->reader.lsqr_ctx);
329        st_h->file->reader.lsqr_ctx = NULL;
330        if (0 == lsquic_stream_shutdown(st_h->stream, 1))
331            lsquic_stream_wantread(st_h->stream, 1);
332        else
333        {
334            if (ECONNRESET == errno)
335                st_h->file->file_flags |= FILE_RESET;
336            LSQ_WARN("lsquic_stream_shutdown: %s", strerror(errno));
337            lsquic_stream_close(st_h->stream);
338        }
339    }
340}
341
342
343static void
344client_file_on_write (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h)
345{
346    if (g_write_file)
347        client_file_on_write_efficient(stream, st_h);
348    else
349        client_file_on_write_buf(st_h);
350}
351
352
353static void
354client_file_on_read (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h)
355{
356    char buf;
357    /* We expect to read in 32-character MD5 string */
358    size_t ntoread = sizeof(st_h->file->md5str) - st_h->file->md5_off;
359    if (0 == ntoread)
360    {
361        lsquic_stream_wantread(stream, 0);
362        /* XXX What about an error (due to RST_STREAM) here: how are we to
363         *     handle it?
364         */
365        /* Expect a FIN */
366        if (0 == lsquic_stream_read(stream, &buf, sizeof(buf)))
367        {
368            LSQ_NOTICE("%.*s  %s", (int) sizeof(st_h->file->md5str),
369                                                    st_h->file->md5str,
370                                                    st_h->file->filename);
371            fflush(stdout);
372            LSQ_DEBUG("# of files: %d", st_h->client_ctx->n_files);
373            lsquic_stream_shutdown(stream, 0);
374        }
375        else
376            LSQ_ERROR("expected FIN from stream!");
377    }
378    else
379    {
380        ssize_t nr = lsquic_stream_read(stream,
381            st_h->file->md5str + st_h->file->md5_off, ntoread);
382        if (-1 == nr)
383        {
384            if (ECONNRESET == errno)
385                st_h->file->file_flags |= FILE_RESET;
386            LSQ_WARN("lsquic_stream_read: %s", strerror(errno));
387            lsquic_stream_close(stream);
388            return;
389        }
390        else
391            st_h->file->md5_off += nr;
392    }
393}
394
395
396static void
397client_file_on_close (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h)
398{
399    --st_h->client_ctx->n_files;
400    LSQ_NOTICE("%s called for stream %"PRIu64", # files: %u", __func__,
401                        lsquic_stream_id(stream), st_h->client_ctx->n_files);
402    if (0 == st_h->client_ctx->n_files)
403        lsquic_conn_close(st_h->client_ctx->conn_h->conn);
404    if (!(st_h->file->file_flags & FILE_RESET) && 0 == RESET_AFTER_N_WRITES)
405        assert(st_h->file->md5_off == sizeof(st_h->file->md5str));
406    if (st_h->file->reader.lsqr_ctx)
407    {
408        destroy_lsquic_reader_ctx(st_h->file->reader.lsqr_ctx);
409        st_h->file->reader.lsqr_ctx = NULL;
410    }
411    if (st_h->file->fd >= 0)
412        (void) close(st_h->file->fd);
413    free(st_h->file);
414    free(st_h);
415}
416
417
418const struct lsquic_stream_if client_file_stream_if = {
419    .on_new_conn            = client_on_new_conn,
420    .on_goaway_received     = client_on_goaway_received,
421    .on_conn_closed         = client_on_conn_closed,
422    .on_new_stream          = client_on_new_stream,
423    .on_read                = client_file_on_read,
424    .on_write               = client_file_on_write,
425    .on_close               = client_file_on_close,
426};
427
428
/* Prints the client-specific part of the help text to stdout.  Only the
 * part of prog that follows the last '/' is shown as the program name.
 */
static void
usage (const char *prog)
{
    const char *name = strrchr(prog, '/');

    name = name ? name + 1 : prog;
    printf(
"Usage: %s [opts]\n"
"\n"
"Options:\n"
"   -f FILE     File to send to the server -- must be specified at least\n"
"                 once.\n"
"   -b          Use buffering API for sending files over rather than\n"
"                 the efficient version.\n"
"   -p PRIORITY Applicatble to previous file specified with -f\n"
"   -r STREAM_ID:OFFSET\n"
"               Reset stream STREAM_ID after sending more that OFFSET bytes.\n"
            , name);
}
448
449
450int
451main (int argc, char **argv)
452{
453    int opt, s;
454    struct sport_head sports;
455    struct prog prog;
456    struct client_ctx client_ctx;
457    struct file *file;
458
459    file = NULL;
460    memset(&client_ctx, 0, sizeof(client_ctx));
461    client_ctx.prog = &prog;
462
463    TAILQ_INIT(&sports);
464    prog_init(&prog, 0, &sports, &client_file_stream_if, &client_ctx);
465    prog.prog_api.ea_alpn = "md5";
466
467    while (-1 != (opt = getopt(argc, argv, PROG_OPTS "bhr:f:p:")))
468    {
469        switch (opt) {
470        case 'p':
471            if (file)
472                file->priority = atoi(optarg);
473            else
474            {
475                fprintf(stderr, "No file to apply priority to\n");
476                exit(1);
477            }
478            break;
479        case 'b':
480            g_write_file = 0;
481            break;
482        case 'f':
483            file = calloc(1, sizeof(*file));
484            LIST_INSERT_HEAD(&client_ctx.files, file, next_file);
485            ++client_ctx.n_files;
486            file->filename = optarg;
487            break;
488        case 'r':
489            g_reset_stream.stream_id = atoi(optarg);
490            g_reset_stream.offset = atoi(strchr(optarg, ':') + 1);
491            break;
492        case 'h':
493            usage(argv[0]);
494            prog_print_common_options(&prog, stdout);
495            exit(0);
496        default:
497            if (0 != prog_set_opt(&prog, opt, optarg))
498                exit(1);
499        }
500    }
501
502    if (LIST_EMPTY(&client_ctx.files))
503    {
504        fprintf(stderr, "please specify one of more files using -f\n");
505        exit(1);
506    }
507
508    if (0 != prog_prep(&prog))
509    {
510        LSQ_ERROR("could not prep");
511        exit(EXIT_FAILURE);
512    }
513    client_ctx.sport = TAILQ_FIRST(&sports);
514
515    if (0 != prog_connect(&prog, NULL, 0))
516    {
517        LSQ_ERROR("could not connect");
518        exit(EXIT_FAILURE);
519    }
520
521    LSQ_DEBUG("entering event loop");
522
523    s = prog_run(&prog);
524    prog_cleanup(&prog);
525
526    exit(0 == s ? EXIT_SUCCESS : EXIT_FAILURE);
527}
528