perf_client.c revision 8ecb980d
1/* Copyright (c) 2017 - 2021 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * perf_client.c -- Implements the "perf" client, see
4 *      https://tools.ietf.org/html/draft-banks-quic-performance-00
5 */
6
7#include <assert.h>
8#include <errno.h>
9#include <inttypes.h>
10#include <stdbool.h>
11#include <stdio.h>
12#include <stdlib.h>
13#include <inttypes.h>
14#include <string.h>
15#include <sys/queue.h>
16#include <sys/types.h>
17#include <sys/stat.h>
18
19#ifndef WIN32
20#include <unistd.h>
21#include <fcntl.h>
22#else
23#include "vc_compat.h"
24#include "getopt.h"
25#endif
26
27#include <event2/event.h>
28
29#include "lsquic.h"
30#include "test_common.h"
31#include "prog.h"
32
33#include "../src/liblsquic/lsquic_logger.h"
34#include "../src/liblsquic/lsquic_int_types.h"
35#include "../src/liblsquic/lsquic_byteswap.h"
36
/* One unit of work for the perf client: how many bytes to ask the server
 * for and how many payload bytes to upload.
 */
struct scenario
{
    /* Linkage for the list of scenarios */
    STAILQ_ENTRY(scenario)  next;
    /* Placed, in big-endian form, into the 8-byte request header */
    uint64_t                bytes_to_request;
    /* Payload size; the 8-byte header is not included in this count */
    uint64_t                bytes_to_send;
};
43
44/* Assume all connections use the same list of scenarios, so store it in
45 * a global variable.
46 */
47static STAILQ_HEAD(, scenario) s_scenarios
48                                    = STAILQ_HEAD_INITIALIZER(s_scenarios);
49static unsigned s_n_scenarios;
50static unsigned s_n_conns;
51
52struct prog s_prog;
53
/* Per-connection state.
 *
 * Once a connection runs out of scenarios, no new streams are created
 * and the connection is closed when all streams are closed.
 */
struct lsquic_conn_ctx
{
    /* Scenario that the next new stream will execute */
    const struct scenario  *next_scenario;
    /* How many more streams may still be requested */
    unsigned                n_scenarios_left;
    /* Not referenced by the code visible in this file */
    unsigned                n_streams;
};
63
64
65static bool
66perf_create_streams (struct lsquic_conn *conn, struct lsquic_conn_ctx *conn_ctx)
67{
68    if (conn_ctx->n_scenarios_left)
69    {
70        --conn_ctx->n_scenarios_left;
71        lsquic_conn_make_stream(conn);
72        return true;
73    }
74    else
75        return false;
76}
77
78
79static lsquic_conn_ctx_t *
80perf_client_on_new_conn (void *stream_if_ctx, struct lsquic_conn *conn)
81{
82    struct lsquic_conn_ctx *conn_ctx;
83
84    if (s_n_scenarios)
85    {
86        conn_ctx = calloc(1, sizeof(*conn_ctx));
87        conn_ctx->next_scenario = STAILQ_FIRST(&s_scenarios);
88        conn_ctx->n_scenarios_left = s_n_scenarios;
89        perf_create_streams(conn, conn_ctx);
90        ++s_n_conns;
91        return conn_ctx;
92    }
93    else
94    {
95        lsquic_conn_close(conn);
96        return NULL;
97    }
98}
99
100
101static void
102perf_client_on_conn_closed (struct lsquic_conn *conn)
103{
104    struct lsquic_conn_ctx *conn_ctx;
105
106    LSQ_NOTICE("Connection closed");
107    conn_ctx = lsquic_conn_get_ctx(conn);
108    free(conn_ctx);
109    --s_n_conns;
110    if (0 == s_n_conns)
111        prog_stop(&s_prog);
112}
113
114
/* Per-stream state: the scenario the stream executes and how far the
 * stream has progressed in each direction.
 */
struct lsquic_stream_ctx
{
    const struct scenario  *scenario;

    /* Progress of the upload: 8-byte header first, then the payload */
    struct {
        uint64_t        header;     /* Number of bytes to request, big-endian */
        unsigned        n_h;        /* Header bytes written so far */
        uint64_t        n_written;  /* Payload (non-header) bytes written */
    }                       write_state;

    /* Progress of the download */
    struct {
        uint64_t        n_read;     /* Bytes read from the stream so far */
    }                       read_state;
};
127
128
129static struct lsquic_stream_ctx *
130perf_client_on_new_stream (void *stream_if_ctx, lsquic_stream_t *stream)
131{
132    struct lsquic_conn_ctx *conn_ctx;
133    struct lsquic_conn *conn;
134
135    conn = lsquic_stream_conn(stream);
136    conn_ctx = lsquic_conn_get_ctx(conn);
137
138    if (!stream)
139    {
140        LSQ_NOTICE("%s: got null stream: no more streams possible", __func__);
141        lsquic_conn_close(conn);
142        return NULL;
143    }
144
145    assert(conn_ctx->next_scenario);
146
147    struct lsquic_stream_ctx *stream_ctx = calloc(1, sizeof(*stream_ctx));
148    stream_ctx->scenario = conn_ctx->next_scenario;
149    conn_ctx->next_scenario = STAILQ_NEXT(conn_ctx->next_scenario, next);
150#if __BYTE_ORDER == __LITTLE_ENDIAN
151    stream_ctx->write_state.header
152                        = bswap_64(stream_ctx->scenario->bytes_to_request);
153#else
154    stream_ctx->write_state.header = stream_ctx->scenario->bytes_to_request;
155#endif
156    lsquic_stream_wantwrite(stream, 1);
157    return stream_ctx;
158}
159
160
161static size_t
162buffer_size (void *lsqr_ctx)
163{
164    struct lsquic_stream_ctx *const stream_ctx = lsqr_ctx;
165    return stream_ctx->scenario->bytes_to_send
166                                        - stream_ctx->write_state.n_written;
167}
168
169
170static size_t
171buffer_read (void *lsqr_ctx, void *buf, size_t count)
172{
173    struct lsquic_stream_ctx *const stream_ctx = lsqr_ctx;
174    size_t left;
175
176    left = buffer_size(stream_ctx);
177    if (count > left)
178        count = left;
179    memset(buf, 0, count);
180    stream_ctx->write_state.n_written += count;
181    return count;
182}
183
184
185static size_t
186header_size (void *lsqr_ctx)
187{
188    struct lsquic_stream_ctx *const stream_ctx = lsqr_ctx;
189    return sizeof(uint64_t) - stream_ctx->write_state.n_h;
190}
191
192
193static size_t
194header_read (void *lsqr_ctx, void *buf, size_t count)
195{
196    struct lsquic_stream_ctx *const stream_ctx = lsqr_ctx;
197    const unsigned char *src;
198    size_t left;
199
200    left = header_size(stream_ctx);
201    if (count < left)
202        count = left;
203    src = (unsigned char *) &stream_ctx->write_state.header
204                + sizeof(uint64_t) - left;
205    memcpy(buf, src, count);
206    stream_ctx->write_state.n_h += count;
207    return count;
208}
209
210
211static void
212perf_client_on_write (struct lsquic_stream *stream,
213                                        struct lsquic_stream_ctx *stream_ctx)
214{
215    struct lsquic_reader reader;
216    ssize_t nw;
217
218    if (stream_ctx->write_state.n_h >= sizeof(uint64_t))
219        reader = (struct lsquic_reader) {
220            buffer_read,
221            buffer_size,
222            stream_ctx,
223        };
224    else
225        reader = (struct lsquic_reader) {
226            header_read,
227            header_size,
228            stream_ctx,
229        };
230
231    nw = lsquic_stream_writef(stream, &reader);
232    if (nw >= 0)
233        LSQ_DEBUG("%s: wrote %zd bytes", __func__, nw);
234    else
235        LSQ_WARN("%s: cannot write to stream: %s", __func__, strerror(errno));
236
237    if (reader.lsqr_size(stream_ctx) == 0
238        && (reader.lsqr_size == buffer_size || buffer_size(stream_ctx) == 0))
239    {
240        lsquic_stream_shutdown(stream, 1);
241        lsquic_stream_wantread(stream, 1);
242    }
243}
244
245
/* Callback given to lsquic_stream_readf(): the data is not examined, and
 * all `count' bytes are reported as consumed.
 */
static size_t
perf_read_and_discard (void *user_data, const unsigned char *buf,
                                                        size_t count, int fin)
{
    const size_t n_consumed = count;

    (void) user_data;
    (void) buf;
    (void) fin;
    return n_consumed;
}
252
253
254static void
255perf_client_on_read (struct lsquic_stream *stream,
256                                        struct lsquic_stream_ctx *stream_ctx)
257{
258    ssize_t nr;
259
260    nr = lsquic_stream_readf(stream, perf_read_and_discard, NULL);
261    if (nr >= 0)
262    {
263        stream_ctx->read_state.n_read += nr;
264        if (nr == 0)
265        {
266            LSQ_DEBUG("reached fin after reading %"PRIu64" bytes from server",
267                stream_ctx->read_state.n_read);
268            lsquic_stream_shutdown(stream, 0);
269        }
270    }
271    else
272    {
273        LSQ_WARN("error reading from stream: %s, abort connection",
274                                                        strerror(errno));
275        lsquic_stream_close(stream);
276        lsquic_conn_abort(lsquic_stream_conn(stream));
277    }
278}
279
280
/* Stream callback: a stream is finished, so request a stream for the next
 * scenario; if none is left, close the connection.  The stream context is
 * freed in either case.
 */
static void
perf_client_on_close (struct lsquic_stream *stream,
                                        struct lsquic_stream_ctx *stream_ctx)
{
    struct lsquic_conn *const conn = lsquic_stream_conn(stream);
    struct lsquic_conn_ctx *const conn_ctx = lsquic_conn_get_ctx(conn);
    const bool requested = perf_create_streams(conn, conn_ctx);

    if (!requested)
    {
        LSQ_DEBUG("out of scenarios, will close connection");
        lsquic_conn_close(conn);
    }

    free(stream_ctx);
}
297
298
299const struct lsquic_stream_if perf_stream_if = {
300    .on_new_conn            = perf_client_on_new_conn,
301    .on_conn_closed         = perf_client_on_conn_closed,
302    .on_new_stream          = perf_client_on_new_stream,
303    .on_read                = perf_client_on_read,
304    .on_write               = perf_client_on_write,
305    .on_close               = perf_client_on_close,
306};
307
308
/* Print the options specific to this program to stdout.  `prog' is the
 * name the program was invoked with; only the part after the last '/' is
 * printed.
 */
static void
usage (const char *prog)
{
    const char *name = strrchr(prog, '/');

    name = name ? name + 1 : prog;
    printf(
"Usage: %s [opts]\n"
"\n"
"Options:\n"
"   -p NREQ:NSEND   Request NREQ bytes from server and, in addition, send\n"
"                     NSEND bytes to server.  May be specified many times\n"
"                     and must be specified at least once.\n"
"   -T FILE     Print stats to FILE.  If FILE is -, print stats to stdout.\n"
            , name);
}
325
326
327int
328main (int argc, char **argv)
329{
330    char *p;
331    int opt, s;
332    struct sport_head sports;
333    struct scenario *scenario;
334
335    TAILQ_INIT(&sports);
336    prog_init(&s_prog, 0, &sports, &perf_stream_if, NULL);
337    s_prog.prog_api.ea_alpn = "perf";
338
339    while (-1 != (opt = getopt(argc, argv, PROG_OPTS "hp:T:")))
340    {
341        switch (opt) {
342        case 'p':
343            scenario = calloc(1, sizeof(*scenario));
344            if (!scenario)
345            {
346                perror("calloc");
347                exit(EXIT_FAILURE);
348            }
349            ++s_n_scenarios;
350            STAILQ_INSERT_TAIL(&s_scenarios, scenario, next);
351            scenario->bytes_to_request = strtoull(optarg, &p, 10);
352            if (*p != ':')
353            {
354                fprintf(stderr, "invalid scenario `%s'\n", optarg);
355                exit(EXIT_FAILURE);
356            }
357            scenario->bytes_to_send = strtoull(p + 1, NULL, 10);
358            break;
359        case 'T':
360            if (0 == strcmp(optarg, "-"))
361                s_prog.prog_api.ea_stats_fh = stdout;
362            else
363            {
364                s_prog.prog_api.ea_stats_fh = fopen(optarg, "w");
365                if (!s_prog.prog_api.ea_stats_fh)
366                {
367                    perror("fopen");
368                    exit(1);
369                }
370            }
371            break;
372        case 'h':
373            usage(argv[0]);
374            prog_print_common_options(&s_prog, stdout);
375            exit(0);
376        default:
377            if (0 != prog_set_opt(&s_prog, opt, optarg))
378                exit(1);
379        }
380    }
381
382    if (STAILQ_EMPTY(&s_scenarios))
383    {
384        fprintf(stderr, "please specify one of more requests using -p\n");
385        exit(1);
386    }
387
388    if (0 != prog_prep(&s_prog))
389    {
390        LSQ_ERROR("could not prep");
391        exit(EXIT_FAILURE);
392    }
393
394    if (0 != prog_connect(&s_prog, NULL, 0))
395    {
396        LSQ_ERROR("could not connect");
397        exit(EXIT_FAILURE);
398    }
399
400    LSQ_DEBUG("entering event loop");
401
402    s = prog_run(&s_prog);
403    prog_cleanup(&s_prog);
404
405    exit(0 == s ? EXIT_SUCCESS : EXIT_FAILURE);
406}
407