1/* Copyright (c) 2017 - 2022 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_qenc_hdl.c -- QPACK encoder streams handler
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdlib.h>
10#include <string.h>
11#include <sys/queue.h>
12
13#ifdef WIN32
14#include <malloc.h>
15#endif
16
17#include "lsquic.h"
18#include "lsquic_types.h"
19#include "lsquic_int_types.h"
20#include "lsquic_sfcw.h"
21#include "lsquic_varint.h"
22#include "lsquic_hq.h"
23#include "lsquic_hash.h"
24#include "lsquic_stream.h"
25#include "lsquic_frab_list.h"
26#include "lsqpack.h"
27#include "lsxpack_header.h"
28#include "lsquic_conn.h"
29#include "lsquic_qpack_exp.h"
30#include "lsquic_util.h"
31#include "lsquic_qenc_hdl.h"
32
33#define LSQUIC_LOGGER_MODULE LSQLM_QENC_HDL
34#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qeh->qeh_conn)
35#include "lsquic_logger.h"
36
37#define QENC_MIN_DYN_TABLE_SIZE 32u
38
39static int
40qeh_write_type (struct qpack_enc_hdl *qeh)
41{
42    int s;
43
44#ifndef NDEBUG
45    const char *env = getenv("LSQUIC_RND_VARINT_LEN");
46    if (env && atoi(env))
47    {
48        s = rand() & 3;
49        LSQ_DEBUG("writing %d-byte stream type", 1 << s);
50    }
51    else
52#endif
53        s = 0;
54
55    switch (s)
56    {
57    case 0:
58        return lsquic_frab_list_write(&qeh->qeh_fral,
59                                (unsigned char []) { HQUST_QPACK_ENC }, 1);
60    case 1:
61        return lsquic_frab_list_write(&qeh->qeh_fral,
62                            (unsigned char []) { 0x40, HQUST_QPACK_ENC }, 2);
63    case 2:
64        return lsquic_frab_list_write(&qeh->qeh_fral,
65                (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_ENC }, 4);
66    default:
67        return lsquic_frab_list_write(&qeh->qeh_fral,
68                (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
69                                                        HQUST_QPACK_ENC }, 8);
70    }
71}
72
73
74static void
75qeh_begin_out (struct qpack_enc_hdl *qeh)
76{
77    if (0 == qeh_write_type(qeh)
78        && (qeh->qeh_tsu_sz == 0
79            || 0 == lsquic_frab_list_write(&qeh->qeh_fral, qeh->qeh_tsu_buf,
80                                                            qeh->qeh_tsu_sz)))
81    {
82        LSQ_DEBUG("wrote %zu bytes to frab list", 1 + qeh->qeh_tsu_sz);
83        lsquic_stream_wantwrite(qeh->qeh_enc_sm_out, 1);
84    }
85    else
86    {
87        LSQ_WARN("could not write to frab list");
88        qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
89                                            "cannot write to frab list");
90    }
91}
92
93
94void
95lsquic_qeh_init (struct qpack_enc_hdl *qeh, struct lsquic_conn *conn)
96{
97    assert(!(qeh->qeh_flags & QEH_INITIALIZED));
98    qeh->qeh_conn = conn;
99    lsquic_frab_list_init(&qeh->qeh_fral, 0x400, NULL, NULL, NULL);
100    lsqpack_enc_preinit(&qeh->qeh_encoder, (void *) conn);
101    qeh->qeh_flags |= QEH_INITIALIZED;
102    qeh->qeh_max_prefix_size =
103                        lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder);
104    if (qeh->qeh_dec_sm_in)
105        lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1);
106    LSQ_DEBUG("initialized");
107}
108
109
110int
111lsquic_qeh_settings (struct qpack_enc_hdl *qeh, unsigned max_table_size,
112             unsigned dyn_table_size, unsigned max_risked_streams, int server)
113{
114    enum lsqpack_enc_opts enc_opts;
115
116    assert(qeh->qeh_flags & QEH_INITIALIZED);
117
118    if (qeh->qeh_flags & QEH_HAVE_SETTINGS)
119    {
120        LSQ_WARN("settings already set");
121        return -1;
122    }
123
124    enc_opts = LSQPACK_ENC_OPT_STAGE_2
125             | (server ? LSQPACK_ENC_OPT_SERVER : 0);
126    qeh->qeh_tsu_sz = sizeof(qeh->qeh_tsu_buf);
127    if (QENC_MIN_DYN_TABLE_SIZE > dyn_table_size)
128        dyn_table_size = 0;
129    if (0 != lsqpack_enc_init(&qeh->qeh_encoder, (void *) qeh->qeh_conn,
130                max_table_size, dyn_table_size, max_risked_streams, enc_opts,
131                qeh->qeh_tsu_buf, &qeh->qeh_tsu_sz))
132    {
133        LSQ_INFO("could not initialize QPACK encoder");
134        return -1;
135    }
136    LSQ_DEBUG("%zu-byte post-init TSU", qeh->qeh_tsu_sz);
137    qeh->qeh_flags |= QEH_HAVE_SETTINGS;
138    qeh->qeh_max_prefix_size =
139                        lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder);
140    LSQ_DEBUG("have settings: max table size=%u; dyn table size=%u; max risked "
141        "streams=%u", max_table_size, dyn_table_size, max_risked_streams);
142    if (qeh->qeh_enc_sm_out)
143        qeh_begin_out(qeh);
144    return 0;
145}
146
147
148static void
149qeh_log_and_clean_exp_rec (struct qpack_enc_hdl *qeh)
150{
151    char buf[0x400];
152
153    qeh->qeh_exp_rec->qer_comp_ratio = lsqpack_enc_ratio(&qeh->qeh_encoder);
154    (void) lsquic_qpack_exp_to_xml(qeh->qeh_exp_rec, buf, sizeof(buf));
155    LSQ_NOTICE("%s", buf);
156    lsquic_qpack_exp_destroy(qeh->qeh_exp_rec);
157    qeh->qeh_exp_rec = NULL;
158}
159
160
161void
162lsquic_qeh_cleanup (struct qpack_enc_hdl *qeh)
163{
164    if (qeh->qeh_flags & QEH_INITIALIZED)
165    {
166        LSQ_DEBUG("cleanup");
167        if (qeh->qeh_exp_rec)
168            qeh_log_and_clean_exp_rec(qeh);
169        lsqpack_enc_cleanup(&qeh->qeh_encoder);
170        lsquic_frab_list_cleanup(&qeh->qeh_fral);
171        memset(qeh, 0, sizeof(*qeh));
172    }
173}
174
175static lsquic_stream_ctx_t *
176qeh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
177{
178    struct qpack_enc_hdl *const qeh = stream_if_ctx;
179    qeh->qeh_enc_sm_out = stream;
180    if ((qeh->qeh_flags & (QEH_INITIALIZED|QEH_HAVE_SETTINGS))
181                                    == (QEH_INITIALIZED|QEH_HAVE_SETTINGS))
182        qeh_begin_out(qeh);
183    else
184        qeh->qeh_conn = lsquic_stream_conn(stream);   /* Or NULL deref in log */
185    LSQ_DEBUG("initialized outgoing encoder stream");
186    return (void *) qeh;
187}
188
189
190static void
191qeh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
192{
193    struct qpack_enc_hdl *const qeh = (void *) ctx;
194    struct lsquic_reader reader = {
195        .lsqr_read  = lsquic_frab_list_read,
196        .lsqr_size  = lsquic_frab_list_size,
197        .lsqr_ctx   = &qeh->qeh_fral,
198    };
199    ssize_t nw;
200
201    nw = lsquic_stream_writef(stream, &reader);
202    if (nw >= 0)
203    {
204        LSQ_DEBUG("wrote %zd bytes to stream", nw);
205        (void) lsquic_stream_flush(stream);
206        if (lsquic_frab_list_empty(&qeh->qeh_fral))
207            lsquic_stream_wantwrite(stream, 0);
208    }
209    else
210    {
211        qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
212                                            "cannot write to stream");
213        LSQ_WARN("cannot write to stream: %s", strerror(errno));
214        lsquic_stream_wantwrite(stream, 0);
215    }
216}
217
218
219static void
220qeh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
221{
222    struct qpack_enc_hdl *const qeh = (void *) ctx;
223    qeh->qeh_enc_sm_out = NULL;
224    LSQ_DEBUG("closed outgoing encoder stream");
225}
226
227
228static void
229qeh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
230{
231    assert(0);
232}
233
234
235static const struct lsquic_stream_if qeh_enc_sm_out_if =
236{
237    .on_new_stream  = qeh_out_on_new,
238    .on_read        = qeh_out_on_read,
239    .on_write       = qeh_out_on_write,
240    .on_close       = qeh_out_on_close,
241};
242const struct lsquic_stream_if *const lsquic_qeh_enc_sm_out_if =
243                                                    &qeh_enc_sm_out_if;
244
245
246static lsquic_stream_ctx_t *
247qeh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
248{
249    struct qpack_enc_hdl *const qeh = stream_if_ctx;
250    qeh->qeh_dec_sm_in = stream;
251    if (qeh->qeh_flags & QEH_INITIALIZED)
252        lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1);
253    else
254        qeh->qeh_conn = lsquic_stream_conn(stream);   /* Or NULL deref in log */
255    LSQ_DEBUG("initialized incoming decoder stream");
256    return (void *) qeh;
257}
258
259
260static size_t
261qeh_read_decoder_stream (void *ctx, const unsigned char *buf, size_t sz,
262                                                                    int fin)
263{
264    struct qpack_enc_hdl *const qeh = (void *) ctx;
265    uint64_t offset;
266    int s;
267
268    if (fin)
269    {
270        LSQ_INFO("decoder stream is closed");
271        qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
272            HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK decoder stream");
273        goto end;
274    }
275
276    offset = lsquic_stream_read_offset(qeh->qeh_dec_sm_in);
277    s = lsqpack_enc_decoder_in(&qeh->qeh_encoder, buf, sz);
278    if (s != 0)
279    {
280        LSQ_INFO("error reading decoder stream");
281        qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
282            HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK decoder "
283            "stream at offset %"PRIu64, offset);
284        goto end;
285    }
286    LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz);
287
288  end:
289    return sz;
290}
291
292
293static void
294qeh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
295{
296    struct qpack_enc_hdl *const qeh = (void *) ctx;
297    ssize_t nread;
298
299    nread = lsquic_stream_readf(stream, qeh_read_decoder_stream, qeh);
300    if (nread <= 0)
301    {
302        if (nread < 0)
303        {
304            LSQ_WARN("cannot read from encoder stream: %s", strerror(errno));
305            qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
306                                        "cannot read from encoder stream");
307        }
308        else
309        {
310            LSQ_INFO("encoder stream closed by peer: abort connection");
311            qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
312                HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed");
313        }
314        lsquic_stream_wantread(stream, 0);
315    }
316}
317
318
319static void
320qeh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
321{
322    struct qpack_enc_hdl *const qeh = (void *) ctx;
323    LSQ_DEBUG("closed incoming decoder stream");
324    qeh->qeh_dec_sm_in = NULL;
325}
326
327
328static void
329qeh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
330{
331    assert(0);
332}
333
334
335static const struct lsquic_stream_if qeh_dec_sm_in_if =
336{
337    .on_new_stream  = qeh_in_on_new,
338    .on_read        = qeh_in_on_read,
339    .on_write       = qeh_in_on_write,
340    .on_close       = qeh_in_on_close,
341};
342const struct lsquic_stream_if *const lsquic_qeh_dec_sm_in_if =
343                                                    &qeh_dec_sm_in_if;
344
345
346static void
347qeh_maybe_set_user_agent (struct qpack_enc_hdl *qeh,
348                                    const struct lsquic_http_headers *headers)
349{
350    const char *const name = qeh->qeh_exp_rec->qer_flags & QER_SERVER ?
351                                    "server" : "user-agent";
352    const size_t len = qeh->qeh_exp_rec->qer_flags & QER_SERVER ? 6 : 10;
353    int i;
354
355    for (i = 0; i < headers->count; ++i)
356        if (len == headers->headers[i].name_len
357                && 0 == memcmp(name,
358                        lsxpack_header_get_name(&headers->headers[i]), len))
359        {
360            qeh->qeh_exp_rec->qer_user_agent = strndup(
361                            lsxpack_header_get_value(&headers->headers[i]),
362                            headers->headers[i].val_len);
363            break;
364        }
365}
366
367
368static enum qwh_status
369qeh_write_headers (struct qpack_enc_hdl *qeh, lsquic_stream_id_t stream_id,
370    unsigned seqno, const struct lsquic_http_headers *headers,
371    unsigned char *buf, size_t *prefix_sz, size_t *headers_sz,
372    uint64_t *completion_offset, enum lsqpack_enc_header_flags *hflags)
373{
374    unsigned char *p = buf;
375    unsigned char *const end = buf + *headers_sz;
376    const unsigned char *enc_p;
377    size_t enc_sz, hea_sz, total_enc_sz;
378    ssize_t nw;
379    enum lsqpack_enc_status st;
380    int i, s, write_to_stream;
381    enum lsqpack_enc_flags enc_flags;
382    enum qwh_status retval;
383#ifndef WIN32
384    unsigned char enc_buf[ qeh->qeh_encoder.qpe_cur_max_capacity * 2 ];
385#else
386    unsigned char *enc_buf;
387    enc_buf = _malloca(qeh->qeh_encoder.qpe_cur_max_capacity * 2);
388    if (!enc_buf)
389        return QWH_ERR;
390#endif
391
392    if (qeh->qeh_exp_rec)
393    {
394        const lsquic_time_t now = lsquic_time_now();
395        if (qeh->qeh_exp_rec->qer_hblock_count == 0)
396            qeh->qeh_exp_rec->qer_first_req = now;
397        qeh->qeh_exp_rec->qer_last_req = now;
398        ++qeh->qeh_exp_rec->qer_hblock_count;
399        if (!qeh->qeh_exp_rec->qer_user_agent)
400            qeh_maybe_set_user_agent(qeh, headers);
401    }
402
403    s = lsqpack_enc_start_header(&qeh->qeh_encoder, stream_id, 0);
404    if (s != 0)
405    {
406        LSQ_WARN("cannot start header");
407        retval = QWH_ERR;
408        goto end;
409    }
410    LSQ_DEBUG("begin encoding headers for stream %"PRIu64, stream_id);
411
412    if (qeh->qeh_enc_sm_out)
413        enc_flags = 0;
414    else
415    {
416        enc_flags = LQEF_NO_INDEX;
417        LSQ_DEBUG("encoder stream is unavailable, won't index headers");
418    }
419    write_to_stream = qeh->qeh_enc_sm_out
420                                && lsquic_frab_list_empty(&qeh->qeh_fral);
421    total_enc_sz = 0;
422    for (i = 0; i < headers->count; ++i)
423    {
424        if (headers->headers[i].buf == NULL)
425            continue;
426        enc_sz = sizeof(enc_buf);
427        hea_sz = end - p;
428        st = lsqpack_enc_encode(&qeh->qeh_encoder, enc_buf, &enc_sz, p,
429                                &hea_sz, &headers->headers[i], enc_flags);
430        switch (st)
431        {
432        case LQES_OK:
433            LSQ_DEBUG("encoded `%.*s': `%.*s' -- %zd bytes to header block, "
434                "%zd bytes to encoder stream",
435                (int) headers->headers[i].name_len,
436                    lsxpack_header_get_name(&headers->headers[i]),
437                (int) headers->headers[i].val_len,
438                    lsxpack_header_get_value(&headers->headers[i]),
439                hea_sz, enc_sz);
440            total_enc_sz += enc_sz;
441            p += hea_sz;
442            if (enc_sz)
443            {
444                if (write_to_stream)
445                {
446                    nw = lsquic_stream_write(qeh->qeh_enc_sm_out, enc_buf, enc_sz);
447                    if ((size_t) nw == enc_sz)
448                        break;
449                    if (nw < 0)
450                    {
451                        LSQ_INFO("could not write to encoder stream: %s",
452                                                                strerror(errno));
453                        retval = QWH_ERR;
454                        goto end;
455                    }
456                    write_to_stream = 0;
457                    enc_p = enc_buf + (size_t) nw;
458                    enc_sz -= (size_t) nw;
459                }
460                else
461                    enc_p = enc_buf;
462                if (0 != lsquic_frab_list_write(&qeh->qeh_fral, enc_p, enc_sz))
463                {
464                    LSQ_INFO("could not write to frab list");
465                    retval = QWH_ERR;
466                    goto end;
467                }
468            }
469            break;
470        case LQES_NOBUF_HEAD:
471            retval = QWH_ENOBUF;
472            goto end;
473        default:
474            assert(0);
475            retval = QWH_ERR;
476            goto end;
477        case LQES_NOBUF_ENC:
478            LSQ_DEBUG("not enough room to write encoder stream data");
479            retval = QWH_ERR;
480            goto end;
481        }
482    }
483
484    nw = lsqpack_enc_end_header(&qeh->qeh_encoder, buf - *prefix_sz,
485                                                        *prefix_sz, hflags);
486    if (nw <= 0)
487    {
488        LSQ_WARN("could not end header: %zd", nw);
489        retval = QWH_ERR;
490        goto end;
491    }
492
493    if ((size_t) nw < *prefix_sz)
494    {
495        memmove(buf - nw, buf - *prefix_sz, (size_t) nw);
496        *prefix_sz = (size_t) nw;
497    }
498    *headers_sz = p - buf;
499    if (qeh->qeh_exp_rec)
500        qeh->qeh_exp_rec->qer_hblock_size += p - buf;
501    if (lsquic_frab_list_empty(&qeh->qeh_fral))
502    {
503        LSQ_DEBUG("all %zd bytes of encoder stream written out; header block "
504            "is %zd bytes; estimated compression ratio %.3f", total_enc_sz,
505            *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder));
506        retval = QWH_FULL;
507        goto end;
508    }
509    else
510    {
511        *completion_offset = lsquic_qeh_enc_off(qeh)
512                                    + lsquic_frab_list_size(&qeh->qeh_fral);
513        LSQ_DEBUG("not all %zd bytes of encoder stream written out; %zd bytes "
514            "buffered; header block is %zd bytes; estimated compression ratio "
515            "%.3f", total_enc_sz, lsquic_frab_list_size(&qeh->qeh_fral),
516            *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder));
517        retval = QWH_PARTIAL;
518        goto end;
519    }
520
521  end:
522#ifdef WIN32
523    _freea(enc_buf);
524#endif
525    return retval;
526}
527
528
529#if !defined(NDEBUG) && __GNUC__
530__attribute__((weak))
531#endif
532enum qwh_status
533lsquic_qeh_write_headers (struct qpack_enc_hdl *qeh,
534    lsquic_stream_id_t stream_id, unsigned seqno,
535    const struct lsquic_http_headers *headers, unsigned char *buf,
536    size_t *prefix_sz, size_t *headers_sz, uint64_t *completion_offset,
537    enum lsqpack_enc_header_flags *hflags)
538{
539    if (qeh->qeh_flags & QEH_INITIALIZED)
540        return qeh_write_headers(qeh, stream_id, seqno, headers, buf,
541                        prefix_sz, headers_sz, completion_offset, hflags);
542    else
543        return QWH_ERR;
544}
545
546
547#if !defined(NDEBUG) && __GNUC__
548__attribute__((weak))
549#endif
550uint64_t
551lsquic_qeh_enc_off (struct qpack_enc_hdl *qeh)
552{
553    if (qeh->qeh_enc_sm_out)
554        return qeh->qeh_enc_sm_out->tosend_off;
555    else
556        return 0;
557}
558
559
560size_t
561lsquic_qeh_write_avail (struct qpack_enc_hdl *qeh)
562{
563    if ((qeh->qeh_flags & QEH_INITIALIZED) && qeh->qeh_enc_sm_out)
564        return lsquic_stream_write_avail(qeh->qeh_enc_sm_out);
565    else if (qeh->qeh_flags & QEH_INITIALIZED)
566        return ~((size_t) 0);   /* Unlimited write */
567    else
568        return 0;
569}
570
571
572size_t
573lsquic_qeh_max_prefix_size (const struct qpack_enc_hdl *qeh)
574{
575    if (qeh->qeh_flags & QEH_HAVE_SETTINGS)
576        return qeh->qeh_max_prefix_size;
577    else
578        return LSQPACK_UINT64_ENC_SZ * 2;
579}
580