perf_server.c revision 8ecb980d
1/* Copyright (c) 2017 - 2021 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * perf_server.c -- Implements the "perf" server, see
4 *      https://tools.ietf.org/html/draft-banks-quic-performance-00
5 */
6
7#include <assert.h>
8#include <errno.h>
9#include <stdio.h>
10#include <inttypes.h>
11#include <stdlib.h>
12#include <string.h>
13#include <sys/queue.h>
14#include <time.h>
15
16#ifndef WIN32
17#include <unistd.h>
18#include <fcntl.h>
19#else
20#include "vc_compat.h"
21#include "getopt.h"
22#endif
23
24#include <event2/event.h>
25
26#include "lsquic.h"
27#include "test_common.h"
28#include "../src/liblsquic/lsquic_hash.h"
29#include "test_cert.h"
30#include "prog.h"
31
32#include "../src/liblsquic/lsquic_byteswap.h"
33#include "../src/liblsquic/lsquic_logger.h"
34
35
36static lsquic_conn_ctx_t *
37perf_server_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn)
38{
39    LSQ_INFO("New connection!");
40    return NULL;
41}
42
43
44static void
45perf_server_on_conn_closed (lsquic_conn_t *conn)
46{
47    LSQ_INFO("Connection closed");
48}
49
50
51struct lsquic_stream_ctx
52{
53    union {
54        uint64_t        left;   /* Number of bytes left to write */
55        unsigned char   buf[sizeof(uint64_t)];  /* Read client header in */
56    }                   u;
57    unsigned            n_h_read;   /* Number of header bytes read in */
58};
59
60
61static struct lsquic_stream_ctx *
62perf_server_on_new_stream (void *unused, struct lsquic_stream *stream)
63{
64    struct lsquic_stream_ctx *stream_ctx;
65
66    stream_ctx = calloc(1, sizeof(*stream_ctx));
67    if (stream_ctx)
68    {
69        lsquic_stream_wantread(stream, 1);
70        return stream_ctx;
71    }
72    else
73    {
74        perror("calloc");
75        exit(EXIT_FAILURE);
76    }
77}
78
79
80static size_t
81perf_read_and_discard (void *user_data, const unsigned char *buf,
82                                                        size_t count, int fin)
83{
84    return count;
85}
86
87
88static void
89perf_server_on_read (struct lsquic_stream *stream,
90                                        struct lsquic_stream_ctx *stream_ctx)
91{
92    ssize_t nr;
93    size_t toread;
94
95    if (stream_ctx->n_h_read < sizeof(stream_ctx->u.buf))
96    {
97        /* Read the header */
98        toread = sizeof(stream_ctx->u.buf) - stream_ctx->n_h_read;
99        nr = lsquic_stream_read(stream, stream_ctx->u.buf
100                            + sizeof(stream_ctx->u.buf) - toread, toread);
101        if (nr > 0)
102        {
103            stream_ctx->n_h_read += nr;
104            if (stream_ctx->n_h_read == sizeof(stream_ctx->u.left))
105            {
106#if __BYTE_ORDER == __LITTLE_ENDIAN
107                stream_ctx->u.left = bswap_64(stream_ctx->u.left);
108#endif
109                LSQ_INFO("client requests %"PRIu64" bytes on stream %"PRIu64,
110                    stream_ctx->u.left, lsquic_stream_id(stream));
111            }
112        }
113        else if (nr < 0)
114        {
115            LSQ_WARN("error reading from stream: %s", strerror(errno));
116            lsquic_stream_close(stream);
117        }
118        else
119        {
120            LSQ_WARN("incomplete header on stream %"PRIu64", abort connection",
121                lsquic_stream_id(stream));
122            lsquic_stream_wantread(stream, 0);
123            lsquic_conn_abort(lsquic_stream_conn(stream));
124        }
125    }
126    else
127    {
128        /* Read up until FIN, discarding whatever the client is sending */
129        nr = lsquic_stream_readf(stream, perf_read_and_discard, NULL);
130        if (nr == 0)
131        {
132            lsquic_stream_wantread(stream, 0);
133            lsquic_stream_wantwrite(stream, 1);
134        }
135        else if (nr < 0)
136        {
137            LSQ_WARN("error reading from stream: %s", strerror(errno));
138            lsquic_stream_close(stream);
139        }
140    }
141}
142
143
144static size_t
145buffer_size (void *lsqr_ctx)
146{
147    struct lsquic_stream_ctx *const stream_ctx = lsqr_ctx;
148    return stream_ctx->u.left;
149}
150
151
152static size_t
153buffer_read (void *lsqr_ctx, void *buf, size_t count)
154{
155    struct lsquic_stream_ctx *const stream_ctx = lsqr_ctx;
156    size_t left;
157
158    left = buffer_size(stream_ctx);
159    if (count > left)
160        count = left;
161    memset(buf, 0, count);
162    stream_ctx->u.left -= count;
163    return count;
164}
165
166
167static void
168perf_server_on_write (struct lsquic_stream *stream,
169                                        struct lsquic_stream_ctx *stream_ctx)
170{
171    struct lsquic_reader reader;
172    ssize_t nw;
173
174    reader = (struct lsquic_reader) { buffer_read, buffer_size, stream_ctx, };
175    nw = lsquic_stream_writef(stream, &reader);
176    if (nw >= 0)
177        LSQ_DEBUG("%s: wrote %zd bytes", __func__, nw);
178    else
179        LSQ_WARN("%s: cannot write to stream: %s", __func__, strerror(errno));
180
181    if (stream_ctx->u.left == 0)
182        lsquic_stream_shutdown(stream, 1);
183}
184
185
186static void
187perf_server_on_close (lsquic_stream_t *stream, lsquic_stream_ctx_t *stream_ctx)
188{
189    LSQ_DEBUG("stream closed");
190    free(stream_ctx);
191}
192
193
194const struct lsquic_stream_if perf_server_stream_if = {
195    .on_new_conn            = perf_server_on_new_conn,
196    .on_conn_closed         = perf_server_on_conn_closed,
197    .on_new_stream          = perf_server_on_new_stream,
198    .on_read                = perf_server_on_read,
199    .on_write               = perf_server_on_write,
200    .on_close               = perf_server_on_close,
201};
202
203
204static void
205usage (const char *prog)
206{
207    const char *const slash = strrchr(prog, '/');
208    if (slash)
209        prog = slash + 1;
210    printf(
211"Usage: %s [opts]\n"
212"\n"
213                , prog);
214}
215
216
217int
218main (int argc, char **argv)
219{
220    int opt, s;
221    struct prog prog;
222    struct sport_head sports;
223
224    TAILQ_INIT(&sports);
225    prog_init(&prog, LSENG_SERVER, &sports, &perf_server_stream_if, NULL);
226
227    while (-1 != (opt = getopt(argc, argv, PROG_OPTS "h")))
228    {
229        switch (opt) {
230        case 'h':
231            usage(argv[0]);
232            prog_print_common_options(&prog, stdout);
233            exit(0);
234        default:
235            if (0 != prog_set_opt(&prog, opt, optarg))
236                exit(1);
237        }
238    }
239
240    add_alpn("perf");
241    if (0 != prog_prep(&prog))
242    {
243        LSQ_ERROR("could not prep");
244        exit(EXIT_FAILURE);
245    }
246
247    LSQ_DEBUG("entering event loop");
248
249    s = prog_run(&prog);
250    prog_cleanup(&prog);
251
252    exit(0 == s ? EXIT_SUCCESS : EXIT_FAILURE);
253}
254