lsquic_qenc_hdl.c revision fb3e20e0
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_qenc_hdl.c -- QPACK encoder streams handler
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdlib.h>
10#include <string.h>
11#include <sys/queue.h>
12
13#ifdef WIN32
14#include <malloc.h>
15#endif
16
17#include "lsquic.h"
18#include "lsquic_types.h"
19#include "lsquic_int_types.h"
20#include "lsquic_sfcw.h"
21#include "lsquic_varint.h"
22#include "lsquic_hq.h"
23#include "lsquic_hash.h"
24#include "lsquic_stream.h"
25#include "lsquic_frab_list.h"
26#include "lsqpack.h"
27#include "lsxpack_header.h"
28#include "lsquic_conn.h"
29#include "lsquic_qenc_hdl.h"
30
31#define LSQUIC_LOGGER_MODULE LSQLM_QENC_HDL
32#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qeh->qeh_conn)
33#include "lsquic_logger.h"
34
35
36static int
37qeh_write_type (struct qpack_enc_hdl *qeh)
38{
39    int s;
40
41#ifndef NDEBUG
42    const char *env = getenv("LSQUIC_RND_VARINT_LEN");
43    if (env && atoi(env))
44    {
45        s = rand() & 3;
46        LSQ_DEBUG("writing %d-byte stream type", 1 << s);
47    }
48    else
49#endif
50        s = 0;
51
52    switch (s)
53    {
54    case 0:
55        return lsquic_frab_list_write(&qeh->qeh_fral,
56                                (unsigned char []) { HQUST_QPACK_ENC }, 1);
57    case 1:
58        return lsquic_frab_list_write(&qeh->qeh_fral,
59                            (unsigned char []) { 0x40, HQUST_QPACK_ENC }, 2);
60    case 2:
61        return lsquic_frab_list_write(&qeh->qeh_fral,
62                (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_ENC }, 4);
63    default:
64        return lsquic_frab_list_write(&qeh->qeh_fral,
65                (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
66                                                        HQUST_QPACK_ENC }, 8);
67    }
68}
69
70
71static void
72qeh_begin_out (struct qpack_enc_hdl *qeh)
73{
74    if (0 == qeh_write_type(qeh)
75        && (qeh->qeh_tsu_sz == 0
76            || 0 == lsquic_frab_list_write(&qeh->qeh_fral, qeh->qeh_tsu_buf,
77                                                            qeh->qeh_tsu_sz)))
78    {
79        LSQ_DEBUG("wrote %zu bytes to frab list", 1 + qeh->qeh_tsu_sz);
80        lsquic_stream_wantwrite(qeh->qeh_enc_sm_out, 1);
81    }
82    else
83    {
84        LSQ_WARN("could not write to frab list");
85        qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
86                                            "cannot write to frab list");
87    }
88}
89
90
91void
92lsquic_qeh_init (struct qpack_enc_hdl *qeh, struct lsquic_conn *conn)
93{
94    assert(!(qeh->qeh_flags & QEH_INITIALIZED));
95    qeh->qeh_conn = conn;
96    lsquic_frab_list_init(&qeh->qeh_fral, 0x400, NULL, NULL, NULL);
97    lsqpack_enc_preinit(&qeh->qeh_encoder, (void *) conn);
98    qeh->qeh_flags |= QEH_INITIALIZED;
99    qeh->qeh_max_prefix_size =
100                        lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder);
101    if (qeh->qeh_dec_sm_in)
102        lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1);
103    LSQ_DEBUG("initialized");
104}
105
106
107int
108lsquic_qeh_settings (struct qpack_enc_hdl *qeh, unsigned max_table_size,
109             unsigned dyn_table_size, unsigned max_risked_streams, int server)
110{
111    enum lsqpack_enc_opts enc_opts;
112
113    assert(qeh->qeh_flags & QEH_INITIALIZED);
114
115    if (qeh->qeh_flags & QEH_HAVE_SETTINGS)
116    {
117        LSQ_WARN("settings already set");
118        return -1;
119    }
120
121    enc_opts = LSQPACK_ENC_OPT_STAGE_2
122             | (server ? LSQPACK_ENC_OPT_SERVER : 0);
123    qeh->qeh_tsu_sz = sizeof(qeh->qeh_tsu_buf);
124    if (0 != lsqpack_enc_init(&qeh->qeh_encoder, (void *) qeh->qeh_conn,
125                max_table_size, dyn_table_size, max_risked_streams, enc_opts,
126                qeh->qeh_tsu_buf, &qeh->qeh_tsu_sz))
127    {
128        LSQ_INFO("could not initialize QPACK encoder");
129        return -1;
130    }
131    LSQ_DEBUG("%zu-byte post-init TSU", qeh->qeh_tsu_sz);
132    qeh->qeh_flags |= QEH_HAVE_SETTINGS;
133    qeh->qeh_max_prefix_size =
134                        lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder);
135    LSQ_DEBUG("have settings: max table size=%u; dyn table size=%u; max risked "
136        "streams=%u", max_table_size, dyn_table_size, max_risked_streams);
137    if (qeh->qeh_enc_sm_out)
138        qeh_begin_out(qeh);
139    return 0;
140}
141
142
143void
144lsquic_qeh_cleanup (struct qpack_enc_hdl *qeh)
145{
146    if (qeh->qeh_flags & QEH_INITIALIZED)
147    {
148        LSQ_DEBUG("cleanup");
149        lsqpack_enc_cleanup(&qeh->qeh_encoder);
150        lsquic_frab_list_cleanup(&qeh->qeh_fral);
151        memset(qeh, 0, sizeof(*qeh));
152    }
153}
154
155static lsquic_stream_ctx_t *
156qeh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
157{
158    struct qpack_enc_hdl *const qeh = stream_if_ctx;
159    qeh->qeh_enc_sm_out = stream;
160    if ((qeh->qeh_flags & (QEH_INITIALIZED|QEH_HAVE_SETTINGS))
161                                    == (QEH_INITIALIZED|QEH_HAVE_SETTINGS))
162        qeh_begin_out(qeh);
163    else
164        qeh->qeh_conn = lsquic_stream_conn(stream);   /* Or NULL deref in log */
165    LSQ_DEBUG("initialized outgoing encoder stream");
166    return (void *) qeh;
167}
168
169
170static void
171qeh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
172{
173    struct qpack_enc_hdl *const qeh = (void *) ctx;
174    struct lsquic_reader reader = {
175        .lsqr_read  = lsquic_frab_list_read,
176        .lsqr_size  = lsquic_frab_list_size,
177        .lsqr_ctx   = &qeh->qeh_fral,
178    };
179    ssize_t nw;
180
181    nw = lsquic_stream_writef(stream, &reader);
182    if (nw >= 0)
183    {
184        LSQ_DEBUG("wrote %zd bytes to stream", nw);
185        (void) lsquic_stream_flush(stream);
186        if (lsquic_frab_list_empty(&qeh->qeh_fral))
187            lsquic_stream_wantwrite(stream, 0);
188    }
189    else
190    {
191        qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
192                                            "cannot write to stream");
193        LSQ_WARN("cannot write to stream: %s", strerror(errno));
194        lsquic_stream_wantwrite(stream, 0);
195    }
196}
197
198
199static void
200qeh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
201{
202    struct qpack_enc_hdl *const qeh = (void *) ctx;
203    qeh->qeh_enc_sm_out = NULL;
204    LSQ_DEBUG("closed outgoing encoder stream");
205}
206
207
208static void
209qeh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
210{
211    assert(0);
212}
213
214
215static const struct lsquic_stream_if qeh_enc_sm_out_if =
216{
217    .on_new_stream  = qeh_out_on_new,
218    .on_read        = qeh_out_on_read,
219    .on_write       = qeh_out_on_write,
220    .on_close       = qeh_out_on_close,
221};
222const struct lsquic_stream_if *const lsquic_qeh_enc_sm_out_if =
223                                                    &qeh_enc_sm_out_if;
224
225
226static lsquic_stream_ctx_t *
227qeh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
228{
229    struct qpack_enc_hdl *const qeh = stream_if_ctx;
230    qeh->qeh_dec_sm_in = stream;
231    if (qeh->qeh_flags & QEH_INITIALIZED)
232        lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1);
233    else
234        qeh->qeh_conn = lsquic_stream_conn(stream);   /* Or NULL deref in log */
235    LSQ_DEBUG("initialized incoming decoder stream");
236    return (void *) qeh;
237}
238
239
240static size_t
241qeh_read_decoder_stream (void *ctx, const unsigned char *buf, size_t sz,
242                                                                    int fin)
243{
244    struct qpack_enc_hdl *const qeh = (void *) ctx;
245    uint64_t offset;
246    int s;
247
248    if (fin)
249    {
250        LSQ_INFO("decoder stream is closed");
251        qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
252            HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK decoder stream");
253        goto end;
254    }
255
256    offset = lsquic_stream_read_offset(qeh->qeh_dec_sm_in);
257    s = lsqpack_enc_decoder_in(&qeh->qeh_encoder, buf, sz);
258    if (s != 0)
259    {
260        LSQ_INFO("error reading decoder stream");
261        qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
262            HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK decoder "
263            "stream at offset %"PRIu64, offset);
264        goto end;
265    }
266    LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz);
267
268  end:
269    return sz;
270}
271
272
273static void
274qeh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
275{
276    struct qpack_enc_hdl *const qeh = (void *) ctx;
277    ssize_t nread;
278
279    nread = lsquic_stream_readf(stream, qeh_read_decoder_stream, qeh);
280    if (nread <= 0)
281    {
282        if (nread < 0)
283        {
284            LSQ_WARN("cannot read from encoder stream: %s", strerror(errno));
285            qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
286                                        "cannot read from encoder stream");
287        }
288        else
289        {
290            LSQ_INFO("encoder stream closed by peer: abort connection");
291            qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
292                HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed");
293        }
294        lsquic_stream_wantread(stream, 0);
295    }
296}
297
298
299static void
300qeh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
301{
302    struct qpack_enc_hdl *const qeh = (void *) ctx;
303    LSQ_DEBUG("closed incoming decoder stream");
304    qeh->qeh_dec_sm_in = NULL;
305}
306
307
308static void
309qeh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
310{
311    assert(0);
312}
313
314
315static const struct lsquic_stream_if qeh_dec_sm_in_if =
316{
317    .on_new_stream  = qeh_in_on_new,
318    .on_read        = qeh_in_on_read,
319    .on_write       = qeh_in_on_write,
320    .on_close       = qeh_in_on_close,
321};
322const struct lsquic_stream_if *const lsquic_qeh_dec_sm_in_if =
323                                                    &qeh_dec_sm_in_if;
324
325
326static enum qwh_status
327qeh_write_headers (struct qpack_enc_hdl *qeh, lsquic_stream_id_t stream_id,
328    unsigned seqno, const struct lsquic_http_headers *headers,
329    unsigned char *buf, size_t *prefix_sz, size_t *headers_sz,
330    uint64_t *completion_offset, enum lsqpack_enc_header_flags *hflags)
331{
332    unsigned char *p = buf;
333    unsigned char *const end = buf + *headers_sz;
334    const unsigned char *enc_p;
335    size_t enc_sz, hea_sz, total_enc_sz;
336    ssize_t nw;
337    enum lsqpack_enc_status st;
338    int i, s, write_to_stream;
339    enum lsqpack_enc_flags enc_flags;
340    enum qwh_status retval;
341#ifndef WIN32
342    unsigned char enc_buf[ qeh->qeh_encoder.qpe_cur_max_capacity * 2 ];
343#else
344    unsigned char *enc_buf;
345    enc_buf = _malloca(qeh->qeh_encoder.qpe_cur_max_capacity * 2);
346    if (!enc_buf)
347        return QWH_ERR;
348#endif
349
350    s = lsqpack_enc_start_header(&qeh->qeh_encoder, stream_id, 0);
351    if (s != 0)
352    {
353        LSQ_WARN("cannot start header");
354        retval = QWH_ERR;
355        goto end;
356    }
357    LSQ_DEBUG("begin encoding headers for stream %"PRIu64, stream_id);
358
359    if (qeh->qeh_enc_sm_out)
360        enc_flags = 0;
361    else
362    {
363        enc_flags = LQEF_NO_INDEX;
364        LSQ_DEBUG("encoder stream is unavailable, won't index headers");
365    }
366    write_to_stream = qeh->qeh_enc_sm_out
367                                && lsquic_frab_list_empty(&qeh->qeh_fral);
368    total_enc_sz = 0;
369    for (i = 0; i < headers->count; ++i)
370    {
371        if (headers->headers[i].buf == NULL)
372            continue;
373        enc_sz = sizeof(enc_buf);
374        hea_sz = end - p;
375        st = lsqpack_enc_encode(&qeh->qeh_encoder, enc_buf, &enc_sz, p,
376                                &hea_sz, &headers->headers[i], enc_flags);
377        switch (st)
378        {
379        case LQES_OK:
380            LSQ_DEBUG("encoded `%.*s': `%.*s' -- %zd bytes to header block, "
381                "%zd bytes to encoder stream",
382                (int) headers->headers[i].name_len,
383                    lsxpack_header_get_name(&headers->headers[i]),
384                (int) headers->headers[i].val_len,
385                    lsxpack_header_get_value(&headers->headers[i]),
386                hea_sz, enc_sz);
387            total_enc_sz += enc_sz;
388            p += hea_sz;
389            if (enc_sz)
390            {
391                if (write_to_stream)
392                {
393                    nw = lsquic_stream_write(qeh->qeh_enc_sm_out, enc_buf, enc_sz);
394                    if ((size_t) nw == enc_sz)
395                        break;
396                    if (nw < 0)
397                    {
398                        LSQ_INFO("could not write to encoder stream: %s",
399                                                                strerror(errno));
400                        retval = QWH_ERR;
401                        goto end;
402                    }
403                    write_to_stream = 0;
404                    enc_p = enc_buf + (size_t) nw;
405                    enc_sz -= (size_t) nw;
406                }
407                else
408                    enc_p = enc_buf;
409                if (0 != lsquic_frab_list_write(&qeh->qeh_fral, enc_p, enc_sz))
410                {
411                    LSQ_INFO("could not write to frab list");
412                    retval = QWH_ERR;
413                    goto end;
414                }
415            }
416            break;
417        case LQES_NOBUF_HEAD:
418            retval = QWH_ENOBUF;
419            goto end;
420        default:
421            assert(0);
422            retval = QWH_ERR;
423            goto end;
424        case LQES_NOBUF_ENC:
425            LSQ_DEBUG("not enough room to write encoder stream data");
426            retval = QWH_ERR;
427            goto end;
428        }
429    }
430
431    nw = lsqpack_enc_end_header(&qeh->qeh_encoder, buf - *prefix_sz,
432                                                        *prefix_sz, hflags);
433    if (nw <= 0)
434    {
435        LSQ_WARN("could not end header: %zd", nw);
436        retval = QWH_ERR;
437        goto end;
438    }
439
440    if ((size_t) nw < *prefix_sz)
441    {
442        memmove(buf - nw, buf - *prefix_sz, (size_t) nw);
443        *prefix_sz = (size_t) nw;
444    }
445    *headers_sz = p - buf;
446    if (lsquic_frab_list_empty(&qeh->qeh_fral))
447    {
448        LSQ_DEBUG("all %zd bytes of encoder stream written out; header block "
449            "is %zd bytes; estimated compression ratio %.3f", total_enc_sz,
450            *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder));
451        retval = QWH_FULL;
452        goto end;
453    }
454    else
455    {
456        *completion_offset = lsquic_qeh_enc_off(qeh)
457                                    + lsquic_frab_list_size(&qeh->qeh_fral);
458        LSQ_DEBUG("not all %zd bytes of encoder stream written out; %zd bytes "
459            "buffered; header block is %zd bytes; estimated compression ratio "
460            "%.3f", total_enc_sz, lsquic_frab_list_size(&qeh->qeh_fral),
461            *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder));
462        retval = QWH_PARTIAL;
463        goto end;
464    }
465
466  end:
467#ifdef WIN32
468    _freea(enc_buf);
469#endif
470    return retval;
471}
472
473
474#if !defined(NDEBUG) && __GNUC__
475__attribute__((weak))
476#endif
477enum qwh_status
478lsquic_qeh_write_headers (struct qpack_enc_hdl *qeh,
479    lsquic_stream_id_t stream_id, unsigned seqno,
480    const struct lsquic_http_headers *headers, unsigned char *buf,
481    size_t *prefix_sz, size_t *headers_sz, uint64_t *completion_offset,
482    enum lsqpack_enc_header_flags *hflags)
483{
484    if (qeh->qeh_flags & QEH_INITIALIZED)
485        return qeh_write_headers(qeh, stream_id, seqno, headers, buf,
486                        prefix_sz, headers_sz, completion_offset, hflags);
487    else
488        return QWH_ERR;
489}
490
491
492#if !defined(NDEBUG) && __GNUC__
493__attribute__((weak))
494#endif
495uint64_t
496lsquic_qeh_enc_off (struct qpack_enc_hdl *qeh)
497{
498    if (qeh->qeh_enc_sm_out)
499        return qeh->qeh_enc_sm_out->tosend_off;
500    else
501        return 0;
502}
503
504
505size_t
506lsquic_qeh_write_avail (struct qpack_enc_hdl *qeh)
507{
508    if ((qeh->qeh_flags & QEH_INITIALIZED) && qeh->qeh_enc_sm_out)
509        return lsquic_stream_write_avail(qeh->qeh_enc_sm_out);
510    else if (qeh->qeh_flags & QEH_INITIALIZED)
511        return ~((size_t) 0);   /* Unlimited write */
512    else
513        return 0;
514}
515
516
517size_t
518lsquic_qeh_max_prefix_size (const struct qpack_enc_hdl *qeh)
519{
520    if (qeh->qeh_flags & QEH_HAVE_SETTINGS)
521        return qeh->qeh_max_prefix_size;
522    else
523        return LSQPACK_UINT64_ENC_SZ * 2;
524}
525