/* lsquic_qenc_hdl.c revision a0e1aeee */
/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
/*
 * lsquic_qenc_hdl.c -- QPACK encoder streams handler
 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdlib.h>
10#include <string.h>
11#include <sys/queue.h>
12
13#include "lsquic.h"
14#include "lsquic_types.h"
15#include "lsquic_int_types.h"
16#include "lsquic_sfcw.h"
17#include "lsquic_varint.h"
18#include "lsquic_hq.h"
19#include "lsquic_hash.h"
20#include "lsquic_stream.h"
21#include "lsquic_frab_list.h"
22#include "lsqpack.h"
23#include "lsquic_conn.h"
24#include "lsquic_qenc_hdl.h"
25
26#define LSQUIC_LOGGER_MODULE LSQLM_QENC_HDL
27#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qeh->qeh_conn)
28#include "lsquic_logger.h"
29
30
31static int
32qeh_write_type (struct qpack_enc_hdl *qeh)
33{
34    int s;
35
36#ifndef NDEBUG
37    const char *env = getenv("LSQUIC_RND_VARINT_LEN");
38    if (env && atoi(env))
39    {
40        s = rand() & 3;
41        LSQ_DEBUG("writing %d-byte stream type", 1 << s);
42    }
43    else
44#endif
45        s = 0;
46
47    switch (s)
48    {
49    case 0:
50        return lsquic_frab_list_write(&qeh->qeh_fral,
51                                (unsigned char []) { HQUST_QPACK_ENC }, 1);
52    case 1:
53        return lsquic_frab_list_write(&qeh->qeh_fral,
54                            (unsigned char []) { 0x40, HQUST_QPACK_ENC }, 2);
55    case 2:
56        return lsquic_frab_list_write(&qeh->qeh_fral,
57                (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_ENC }, 4);
58    default:
59        return lsquic_frab_list_write(&qeh->qeh_fral,
60                (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
61                                                        HQUST_QPACK_ENC }, 8);
62    }
63}
64
65
66static void
67qeh_begin_out (struct qpack_enc_hdl *qeh)
68{
69    if (0 == qeh_write_type(qeh)
70        && (qeh->qeh_tsu_sz == 0
71            || 0 == lsquic_frab_list_write(&qeh->qeh_fral, qeh->qeh_tsu_buf,
72                                                            qeh->qeh_tsu_sz)))
73    {
74        LSQ_DEBUG("wrote %zu bytes to frab list", 1 + qeh->qeh_tsu_sz);
75        lsquic_stream_wantwrite(qeh->qeh_enc_sm_out, 1);
76    }
77    else
78    {
79        LSQ_WARN("could not write to frab list");
80        qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
81                                            "cannot write to frab list");
82    }
83}
84
85
86void
87lsquic_qeh_init (struct qpack_enc_hdl *qeh, struct lsquic_conn *conn)
88{
89    assert(!(qeh->qeh_flags & QEH_INITIALIZED));
90    qeh->qeh_conn = conn;
91    lsquic_frab_list_init(&qeh->qeh_fral, 0x400, NULL, NULL, NULL);
92    lsqpack_enc_preinit(&qeh->qeh_encoder, (void *) conn);
93    qeh->qeh_flags |= QEH_INITIALIZED;
94    qeh->qeh_max_prefix_size =
95                        lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder);
96    if (qeh->qeh_dec_sm_in)
97        lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1);
98    LSQ_DEBUG("initialized");
99}
100
101
102int
103lsquic_qeh_settings (struct qpack_enc_hdl *qeh, unsigned max_table_size,
104             unsigned dyn_table_size, unsigned max_risked_streams, int server)
105{
106    enum lsqpack_enc_opts enc_opts;
107
108    assert(qeh->qeh_flags & QEH_INITIALIZED);
109
110    if (qeh->qeh_flags & QEH_HAVE_SETTINGS)
111    {
112        LSQ_WARN("settings already set");
113        return -1;
114    }
115
116    enc_opts = LSQPACK_ENC_OPT_STAGE_2
117             | (server ? LSQPACK_ENC_OPT_SERVER : 0);
118    qeh->qeh_tsu_sz = sizeof(qeh->qeh_tsu_buf);
119    if (0 != lsqpack_enc_init(&qeh->qeh_encoder, (void *) qeh->qeh_conn,
120                max_table_size, dyn_table_size, max_risked_streams, enc_opts,
121                qeh->qeh_tsu_buf, &qeh->qeh_tsu_sz))
122    {
123        LSQ_INFO("could not initialize QPACK encoder");
124        return -1;
125    }
126    LSQ_DEBUG("%zu-byte post-init TSU", qeh->qeh_tsu_sz);
127    qeh->qeh_flags |= QEH_HAVE_SETTINGS;
128    qeh->qeh_max_prefix_size =
129                        lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder);
130    LSQ_DEBUG("have settings: max table size=%u; dyn table size=%u; max risked "
131        "streams=%u", max_table_size, dyn_table_size, max_risked_streams);
132    if (qeh->qeh_enc_sm_out)
133        qeh_begin_out(qeh);
134    return 0;
135}
136
137
138void
139lsquic_qeh_cleanup (struct qpack_enc_hdl *qeh)
140{
141    if (qeh->qeh_flags & QEH_INITIALIZED)
142    {
143        LSQ_DEBUG("cleanup");
144        lsqpack_enc_cleanup(&qeh->qeh_encoder);
145        lsquic_frab_list_cleanup(&qeh->qeh_fral);
146        memset(qeh, 0, sizeof(*qeh));
147    }
148}
149
150static lsquic_stream_ctx_t *
151qeh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
152{
153    struct qpack_enc_hdl *const qeh = stream_if_ctx;
154    qeh->qeh_enc_sm_out = stream;
155    if ((qeh->qeh_flags & (QEH_INITIALIZED|QEH_HAVE_SETTINGS))
156                                    == (QEH_INITIALIZED|QEH_HAVE_SETTINGS))
157        qeh_begin_out(qeh);
158    else
159        qeh->qeh_conn = lsquic_stream_conn(stream);   /* Or NULL deref in log */
160    LSQ_DEBUG("initialized outgoing encoder stream");
161    return (void *) qeh;
162}
163
164
165static void
166qeh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
167{
168    struct qpack_enc_hdl *const qeh = (void *) ctx;
169    struct lsquic_reader reader = {
170        .lsqr_read  = lsquic_frab_list_read,
171        .lsqr_size  = lsquic_frab_list_size,
172        .lsqr_ctx   = &qeh->qeh_fral,
173    };
174    ssize_t nw;
175
176    nw = lsquic_stream_writef(stream, &reader);
177    if (nw >= 0)
178    {
179        LSQ_DEBUG("wrote %zd bytes to stream", nw);
180        (void) lsquic_stream_flush(stream);
181        if (lsquic_frab_list_empty(&qeh->qeh_fral))
182            lsquic_stream_wantwrite(stream, 0);
183    }
184    else
185    {
186        qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
187                                            "cannot write to stream");
188        LSQ_WARN("cannot write to stream: %s", strerror(errno));
189        lsquic_stream_wantwrite(stream, 0);
190    }
191}
192
193
194static void
195qeh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
196{
197    struct qpack_enc_hdl *const qeh = (void *) ctx;
198    qeh->qeh_enc_sm_out = NULL;
199    LSQ_DEBUG("closed outgoing encoder stream");
200}
201
202
203static void
204qeh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
205{
206    assert(0);
207}
208
209
210static const struct lsquic_stream_if qeh_enc_sm_out_if =
211{
212    .on_new_stream  = qeh_out_on_new,
213    .on_read        = qeh_out_on_read,
214    .on_write       = qeh_out_on_write,
215    .on_close       = qeh_out_on_close,
216};
217const struct lsquic_stream_if *const lsquic_qeh_enc_sm_out_if =
218                                                    &qeh_enc_sm_out_if;
219
220
221static lsquic_stream_ctx_t *
222qeh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
223{
224    struct qpack_enc_hdl *const qeh = stream_if_ctx;
225    qeh->qeh_dec_sm_in = stream;
226    if (qeh->qeh_flags & QEH_INITIALIZED)
227        lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1);
228    else
229        qeh->qeh_conn = lsquic_stream_conn(stream);   /* Or NULL deref in log */
230    LSQ_DEBUG("initialized incoming decoder stream");
231    return (void *) qeh;
232}
233
234
235static size_t
236qeh_read_decoder_stream (void *ctx, const unsigned char *buf, size_t sz,
237                                                                    int fin)
238{
239    struct qpack_enc_hdl *const qeh = (void *) ctx;
240    uint64_t offset;
241    int s;
242
243    if (fin)
244    {
245        LSQ_INFO("decoder stream is closed");
246        qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
247            HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK decoder stream");
248        goto end;
249    }
250
251    offset = lsquic_stream_read_offset(qeh->qeh_dec_sm_in);
252    s = lsqpack_enc_decoder_in(&qeh->qeh_encoder, buf, sz);
253    if (s != 0)
254    {
255        LSQ_INFO("error reading decoder stream");
256        qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
257            HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK decoder "
258            "stream at offset %"PRIu64, offset);
259        goto end;
260    }
261    LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz);
262
263  end:
264    return sz;
265}
266
267
268static void
269qeh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
270{
271    struct qpack_enc_hdl *const qeh = (void *) ctx;
272    ssize_t nread;
273
274    nread = lsquic_stream_readf(stream, qeh_read_decoder_stream, qeh);
275    if (nread <= 0)
276    {
277        if (nread < 0)
278        {
279            LSQ_WARN("cannot read from encoder stream: %s", strerror(errno));
280            qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
281                                        "cannot read from encoder stream");
282        }
283        else
284        {
285            LSQ_INFO("encoder stream closed by peer: abort connection");
286            qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
287                HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed");
288        }
289        lsquic_stream_wantread(stream, 0);
290    }
291}
292
293
294static void
295qeh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
296{
297    struct qpack_enc_hdl *const qeh = (void *) ctx;
298    LSQ_DEBUG("closed incoming decoder stream");
299    qeh->qeh_dec_sm_in = NULL;
300}
301
302
303static void
304qeh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
305{
306    assert(0);
307}
308
309
310static const struct lsquic_stream_if qeh_dec_sm_in_if =
311{
312    .on_new_stream  = qeh_in_on_new,
313    .on_read        = qeh_in_on_read,
314    .on_write       = qeh_in_on_write,
315    .on_close       = qeh_in_on_close,
316};
317const struct lsquic_stream_if *const lsquic_qeh_dec_sm_in_if =
318                                                    &qeh_dec_sm_in_if;
319
320
321static enum qwh_status
322qeh_write_headers (struct qpack_enc_hdl *qeh, lsquic_stream_id_t stream_id,
323    unsigned seqno, const struct lsquic_http_headers *headers,
324    unsigned char *buf, size_t *prefix_sz, size_t *headers_sz,
325    uint64_t *completion_offset, enum lsqpack_enc_header_flags *hflags)
326{
327    unsigned char *p = buf;
328    unsigned char *const end = buf + *headers_sz;
329    const unsigned char *enc_p;
330    size_t enc_sz, hea_sz, total_enc_sz;
331    ssize_t nw;
332    enum lsqpack_enc_status st;
333    int i, s, write_to_stream;
334    enum lsqpack_enc_flags enc_flags;
335    unsigned char enc_buf[ qeh->qeh_encoder.qpe_cur_max_capacity * 2 ];
336
337    s = lsqpack_enc_start_header(&qeh->qeh_encoder, stream_id, 0);
338    if (s != 0)
339    {
340        LSQ_WARN("cannot start header");
341        return QWH_ERR;
342    }
343    LSQ_DEBUG("begin encoding headers for stream %"PRIu64, stream_id);
344
345    if (qeh->qeh_enc_sm_out)
346        enc_flags = 0;
347    else
348    {
349        enc_flags = LQEF_NO_INDEX;
350        LSQ_DEBUG("encoder stream is unavailable, won't index headers");
351    }
352    write_to_stream = qeh->qeh_enc_sm_out
353                                && lsquic_frab_list_empty(&qeh->qeh_fral);
354    total_enc_sz = 0;
355    for (i = 0; i < headers->count; ++i)
356    {
357        enc_sz = sizeof(enc_buf);
358        hea_sz = end - p;
359        st = lsqpack_enc_encode(&qeh->qeh_encoder, enc_buf, &enc_sz, p,
360                    &hea_sz, headers->headers[i].name.iov_base,
361                    headers->headers[i].name.iov_len,
362                    headers->headers[i].value.iov_base,
363                    headers->headers[i].value.iov_len, enc_flags);
364        switch (st)
365        {
366        case LQES_OK:
367            LSQ_DEBUG("encoded `%.*s': `%.*s' -- %zd bytes to header block, "
368                "%zd bytes to encoder stream",
369                (int) headers->headers[i].name.iov_len,
370                                    (char *) headers->headers[i].name.iov_base,
371                (int) headers->headers[i].value.iov_len,
372                                    (char *) headers->headers[i].value.iov_base,
373                hea_sz, enc_sz);
374            total_enc_sz += enc_sz;
375            p += hea_sz;
376            if (enc_sz)
377            {
378                if (write_to_stream)
379                {
380                    nw = lsquic_stream_write(qeh->qeh_enc_sm_out, enc_buf, enc_sz);
381                    if ((size_t) nw == enc_sz)
382                        break;
383                    if (nw < 0)
384                    {
385                        LSQ_INFO("could not write to encoder stream: %s",
386                                                                strerror(errno));
387                        return QWH_ERR;
388                    }
389                    write_to_stream = 0;
390                    enc_p = enc_buf + (size_t) nw;
391                    enc_sz -= (size_t) nw;
392                }
393                else
394                    enc_p = enc_buf;
395                if (0 != lsquic_frab_list_write(&qeh->qeh_fral, enc_p, enc_sz))
396                {
397                    LSQ_INFO("could not write to frab list");
398                    return QWH_ERR;
399                }
400            }
401            break;
402        case LQES_NOBUF_HEAD:
403            return QWH_ENOBUF;
404        default:
405            assert(0);
406            return QWH_ERR;
407        case LQES_NOBUF_ENC:
408            LSQ_DEBUG("not enough room to write encoder stream data");
409            return QWH_ERR;
410        }
411    }
412
413    nw = lsqpack_enc_end_header(&qeh->qeh_encoder, buf - *prefix_sz,
414                                                        *prefix_sz, hflags);
415    if (nw <= 0)
416    {
417        LSQ_WARN("could not end header: %zd", nw);
418        return QWH_ERR;
419    }
420
421    if ((size_t) nw < *prefix_sz)
422    {
423        memmove(buf - nw, buf - *prefix_sz, (size_t) nw);
424        *prefix_sz = (size_t) nw;
425    }
426    *headers_sz = p - buf;
427    if (lsquic_frab_list_empty(&qeh->qeh_fral))
428    {
429        LSQ_DEBUG("all %zd bytes of encoder stream written out; header block "
430            "is %zd bytes; estimated compression ratio %.3f", total_enc_sz,
431            *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder));
432        return QWH_FULL;
433    }
434    else
435    {
436        *completion_offset = lsquic_qeh_enc_off(qeh)
437                                    + lsquic_frab_list_size(&qeh->qeh_fral);
438        LSQ_DEBUG("not all %zd bytes of encoder stream written out; %zd bytes "
439            "buffered; header block is %zd bytes; estimated compression ratio "
440            "%.3f", total_enc_sz, lsquic_frab_list_size(&qeh->qeh_fral),
441            *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder));
442        return QWH_PARTIAL;
443    }
444}
445
446
447#if !defined(NDEBUG) && __GNUC__
448__attribute__((weak))
449#endif
450enum qwh_status
451lsquic_qeh_write_headers (struct qpack_enc_hdl *qeh,
452    lsquic_stream_id_t stream_id, unsigned seqno,
453    const struct lsquic_http_headers *headers, unsigned char *buf,
454    size_t *prefix_sz, size_t *headers_sz, uint64_t *completion_offset,
455    enum lsqpack_enc_header_flags *hflags)
456{
457    if (qeh->qeh_flags & QEH_INITIALIZED)
458        return qeh_write_headers(qeh, stream_id, seqno, headers, buf,
459                        prefix_sz, headers_sz, completion_offset, hflags);
460    else
461        return QWH_ERR;
462}
463
464
465#if !defined(NDEBUG) && __GNUC__
466__attribute__((weak))
467#endif
468uint64_t
469lsquic_qeh_enc_off (struct qpack_enc_hdl *qeh)
470{
471    if (qeh->qeh_enc_sm_out)
472        return qeh->qeh_enc_sm_out->tosend_off;
473    else
474        return 0;
475}
476
477
478size_t
479lsquic_qeh_write_avail (struct qpack_enc_hdl *qeh)
480{
481    if ((qeh->qeh_flags & QEH_INITIALIZED) && qeh->qeh_enc_sm_out)
482        return lsquic_stream_write_avail(qeh->qeh_enc_sm_out);
483    else if (qeh->qeh_flags & QEH_INITIALIZED)
484        return ~((size_t) 0);   /* Unlimited write */
485    else
486        return 0;
487}
488
489
490size_t
491lsquic_qeh_max_prefix_size (const struct qpack_enc_hdl *qeh)
492{
493    if (qeh->qeh_flags & QEH_HAVE_SETTINGS)
494        return qeh->qeh_max_prefix_size;
495    else
496        return LSQPACK_UINT64_ENC_SZ * 2;
497}
498