lsquic_qenc_hdl.c revision 758aff32
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_qenc_hdl.c -- QPACK encoder streams handler
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdlib.h>
10#include <string.h>
11#include <sys/queue.h>
12
13#ifdef WIN32
14#include <malloc.h>
15#endif
16
17#include "lsquic.h"
18#include "lsquic_types.h"
19#include "lsquic_int_types.h"
20#include "lsquic_sfcw.h"
21#include "lsquic_varint.h"
22#include "lsquic_hq.h"
23#include "lsquic_hash.h"
24#include "lsquic_stream.h"
25#include "lsquic_frab_list.h"
26#include "lsqpack.h"
27#include "lsxpack_header.h"
28#include "lsquic_conn.h"
29#include "lsquic_qpack_exp.h"
30#include "lsquic_util.h"
31#include "lsquic_qenc_hdl.h"
32
33#define LSQUIC_LOGGER_MODULE LSQLM_QENC_HDL
34#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qeh->qeh_conn)
35#include "lsquic_logger.h"
36
37
38static int
39qeh_write_type (struct qpack_enc_hdl *qeh)
40{
41    int s;
42
43#ifndef NDEBUG
44    const char *env = getenv("LSQUIC_RND_VARINT_LEN");
45    if (env && atoi(env))
46    {
47        s = rand() & 3;
48        LSQ_DEBUG("writing %d-byte stream type", 1 << s);
49    }
50    else
51#endif
52        s = 0;
53
54    switch (s)
55    {
56    case 0:
57        return lsquic_frab_list_write(&qeh->qeh_fral,
58                                (unsigned char []) { HQUST_QPACK_ENC }, 1);
59    case 1:
60        return lsquic_frab_list_write(&qeh->qeh_fral,
61                            (unsigned char []) { 0x40, HQUST_QPACK_ENC }, 2);
62    case 2:
63        return lsquic_frab_list_write(&qeh->qeh_fral,
64                (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_ENC }, 4);
65    default:
66        return lsquic_frab_list_write(&qeh->qeh_fral,
67                (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
68                                                        HQUST_QPACK_ENC }, 8);
69    }
70}
71
72
73static void
74qeh_begin_out (struct qpack_enc_hdl *qeh)
75{
76    if (0 == qeh_write_type(qeh)
77        && (qeh->qeh_tsu_sz == 0
78            || 0 == lsquic_frab_list_write(&qeh->qeh_fral, qeh->qeh_tsu_buf,
79                                                            qeh->qeh_tsu_sz)))
80    {
81        LSQ_DEBUG("wrote %zu bytes to frab list", 1 + qeh->qeh_tsu_sz);
82        lsquic_stream_wantwrite(qeh->qeh_enc_sm_out, 1);
83    }
84    else
85    {
86        LSQ_WARN("could not write to frab list");
87        qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
88                                            "cannot write to frab list");
89    }
90}
91
92
93void
94lsquic_qeh_init (struct qpack_enc_hdl *qeh, struct lsquic_conn *conn)
95{
96    assert(!(qeh->qeh_flags & QEH_INITIALIZED));
97    qeh->qeh_conn = conn;
98    lsquic_frab_list_init(&qeh->qeh_fral, 0x400, NULL, NULL, NULL);
99    lsqpack_enc_preinit(&qeh->qeh_encoder, (void *) conn);
100    qeh->qeh_flags |= QEH_INITIALIZED;
101    qeh->qeh_max_prefix_size =
102                        lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder);
103    if (qeh->qeh_dec_sm_in)
104        lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1);
105    LSQ_DEBUG("initialized");
106}
107
108
109int
110lsquic_qeh_settings (struct qpack_enc_hdl *qeh, unsigned max_table_size,
111             unsigned dyn_table_size, unsigned max_risked_streams, int server)
112{
113    enum lsqpack_enc_opts enc_opts;
114
115    assert(qeh->qeh_flags & QEH_INITIALIZED);
116
117    if (qeh->qeh_flags & QEH_HAVE_SETTINGS)
118    {
119        LSQ_WARN("settings already set");
120        return -1;
121    }
122
123    enc_opts = LSQPACK_ENC_OPT_STAGE_2
124             | (server ? LSQPACK_ENC_OPT_SERVER : 0);
125    qeh->qeh_tsu_sz = sizeof(qeh->qeh_tsu_buf);
126    if (0 != lsqpack_enc_init(&qeh->qeh_encoder, (void *) qeh->qeh_conn,
127                max_table_size, dyn_table_size, max_risked_streams, enc_opts,
128                qeh->qeh_tsu_buf, &qeh->qeh_tsu_sz))
129    {
130        LSQ_INFO("could not initialize QPACK encoder");
131        return -1;
132    }
133    LSQ_DEBUG("%zu-byte post-init TSU", qeh->qeh_tsu_sz);
134    qeh->qeh_flags |= QEH_HAVE_SETTINGS;
135    qeh->qeh_max_prefix_size =
136                        lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder);
137    LSQ_DEBUG("have settings: max table size=%u; dyn table size=%u; max risked "
138        "streams=%u", max_table_size, dyn_table_size, max_risked_streams);
139    if (qeh->qeh_enc_sm_out)
140        qeh_begin_out(qeh);
141    return 0;
142}
143
144
145static void
146qeh_log_and_clean_exp_rec (struct qpack_enc_hdl *qeh)
147{
148    char buf[0x400];
149
150    qeh->qeh_exp_rec->qer_comp_ratio = lsqpack_enc_ratio(&qeh->qeh_encoder);
151    (void) lsquic_qpack_exp_to_xml(qeh->qeh_exp_rec, buf, sizeof(buf));
152    LSQ_NOTICE("%s", buf);
153    lsquic_qpack_exp_destroy(qeh->qeh_exp_rec);
154    qeh->qeh_exp_rec = NULL;
155}
156
157
158void
159lsquic_qeh_cleanup (struct qpack_enc_hdl *qeh)
160{
161    if (qeh->qeh_flags & QEH_INITIALIZED)
162    {
163        LSQ_DEBUG("cleanup");
164        if (qeh->qeh_exp_rec)
165            qeh_log_and_clean_exp_rec(qeh);
166        lsqpack_enc_cleanup(&qeh->qeh_encoder);
167        lsquic_frab_list_cleanup(&qeh->qeh_fral);
168        memset(qeh, 0, sizeof(*qeh));
169    }
170}
171
172static lsquic_stream_ctx_t *
173qeh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
174{
175    struct qpack_enc_hdl *const qeh = stream_if_ctx;
176    qeh->qeh_enc_sm_out = stream;
177    if ((qeh->qeh_flags & (QEH_INITIALIZED|QEH_HAVE_SETTINGS))
178                                    == (QEH_INITIALIZED|QEH_HAVE_SETTINGS))
179        qeh_begin_out(qeh);
180    else
181        qeh->qeh_conn = lsquic_stream_conn(stream);   /* Or NULL deref in log */
182    LSQ_DEBUG("initialized outgoing encoder stream");
183    return (void *) qeh;
184}
185
186
187static void
188qeh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
189{
190    struct qpack_enc_hdl *const qeh = (void *) ctx;
191    struct lsquic_reader reader = {
192        .lsqr_read  = lsquic_frab_list_read,
193        .lsqr_size  = lsquic_frab_list_size,
194        .lsqr_ctx   = &qeh->qeh_fral,
195    };
196    ssize_t nw;
197
198    nw = lsquic_stream_writef(stream, &reader);
199    if (nw >= 0)
200    {
201        LSQ_DEBUG("wrote %zd bytes to stream", nw);
202        (void) lsquic_stream_flush(stream);
203        if (lsquic_frab_list_empty(&qeh->qeh_fral))
204            lsquic_stream_wantwrite(stream, 0);
205    }
206    else
207    {
208        qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
209                                            "cannot write to stream");
210        LSQ_WARN("cannot write to stream: %s", strerror(errno));
211        lsquic_stream_wantwrite(stream, 0);
212    }
213}
214
215
216static void
217qeh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
218{
219    struct qpack_enc_hdl *const qeh = (void *) ctx;
220    qeh->qeh_enc_sm_out = NULL;
221    LSQ_DEBUG("closed outgoing encoder stream");
222}
223
224
225static void
226qeh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
227{
228    assert(0);
229}
230
231
232static const struct lsquic_stream_if qeh_enc_sm_out_if =
233{
234    .on_new_stream  = qeh_out_on_new,
235    .on_read        = qeh_out_on_read,
236    .on_write       = qeh_out_on_write,
237    .on_close       = qeh_out_on_close,
238};
239const struct lsquic_stream_if *const lsquic_qeh_enc_sm_out_if =
240                                                    &qeh_enc_sm_out_if;
241
242
243static lsquic_stream_ctx_t *
244qeh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
245{
246    struct qpack_enc_hdl *const qeh = stream_if_ctx;
247    qeh->qeh_dec_sm_in = stream;
248    if (qeh->qeh_flags & QEH_INITIALIZED)
249        lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1);
250    else
251        qeh->qeh_conn = lsquic_stream_conn(stream);   /* Or NULL deref in log */
252    LSQ_DEBUG("initialized incoming decoder stream");
253    return (void *) qeh;
254}
255
256
257static size_t
258qeh_read_decoder_stream (void *ctx, const unsigned char *buf, size_t sz,
259                                                                    int fin)
260{
261    struct qpack_enc_hdl *const qeh = (void *) ctx;
262    uint64_t offset;
263    int s;
264
265    if (fin)
266    {
267        LSQ_INFO("decoder stream is closed");
268        qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
269            HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK decoder stream");
270        goto end;
271    }
272
273    offset = lsquic_stream_read_offset(qeh->qeh_dec_sm_in);
274    s = lsqpack_enc_decoder_in(&qeh->qeh_encoder, buf, sz);
275    if (s != 0)
276    {
277        LSQ_INFO("error reading decoder stream");
278        qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
279            HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK decoder "
280            "stream at offset %"PRIu64, offset);
281        goto end;
282    }
283    LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz);
284
285  end:
286    return sz;
287}
288
289
290static void
291qeh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
292{
293    struct qpack_enc_hdl *const qeh = (void *) ctx;
294    ssize_t nread;
295
296    nread = lsquic_stream_readf(stream, qeh_read_decoder_stream, qeh);
297    if (nread <= 0)
298    {
299        if (nread < 0)
300        {
301            LSQ_WARN("cannot read from encoder stream: %s", strerror(errno));
302            qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
303                                        "cannot read from encoder stream");
304        }
305        else
306        {
307            LSQ_INFO("encoder stream closed by peer: abort connection");
308            qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
309                HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed");
310        }
311        lsquic_stream_wantread(stream, 0);
312    }
313}
314
315
316static void
317qeh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
318{
319    struct qpack_enc_hdl *const qeh = (void *) ctx;
320    LSQ_DEBUG("closed incoming decoder stream");
321    qeh->qeh_dec_sm_in = NULL;
322}
323
324
325static void
326qeh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
327{
328    assert(0);
329}
330
331
332static const struct lsquic_stream_if qeh_dec_sm_in_if =
333{
334    .on_new_stream  = qeh_in_on_new,
335    .on_read        = qeh_in_on_read,
336    .on_write       = qeh_in_on_write,
337    .on_close       = qeh_in_on_close,
338};
339const struct lsquic_stream_if *const lsquic_qeh_dec_sm_in_if =
340                                                    &qeh_dec_sm_in_if;
341
342
343static void
344qeh_maybe_set_user_agent (struct qpack_enc_hdl *qeh,
345                                    const struct lsquic_http_headers *headers)
346{
347    const char *const name = qeh->qeh_exp_rec->qer_flags & QER_SERVER ?
348                                    "server" : "user-agent";
349    const size_t len = qeh->qeh_exp_rec->qer_flags & QER_SERVER ? 6 : 10;
350    int i;
351
352    for (i = 0; i < headers->count; ++i)
353        if (len == headers->headers[i].name_len
354                && 0 == memcmp(name,
355                        lsxpack_header_get_name(&headers->headers[i]), len))
356        {
357            qeh->qeh_exp_rec->qer_user_agent = strndup(
358                            lsxpack_header_get_value(&headers->headers[i]),
359                            headers->headers[i].val_len);
360            break;
361        }
362}
363
364
365static enum qwh_status
366qeh_write_headers (struct qpack_enc_hdl *qeh, lsquic_stream_id_t stream_id,
367    unsigned seqno, const struct lsquic_http_headers *headers,
368    unsigned char *buf, size_t *prefix_sz, size_t *headers_sz,
369    uint64_t *completion_offset, enum lsqpack_enc_header_flags *hflags)
370{
371    unsigned char *p = buf;
372    unsigned char *const end = buf + *headers_sz;
373    const unsigned char *enc_p;
374    size_t enc_sz, hea_sz, total_enc_sz;
375    ssize_t nw;
376    enum lsqpack_enc_status st;
377    int i, s, write_to_stream;
378    enum lsqpack_enc_flags enc_flags;
379    enum qwh_status retval;
380#ifndef WIN32
381    unsigned char enc_buf[ qeh->qeh_encoder.qpe_cur_max_capacity * 2 ];
382#else
383    unsigned char *enc_buf;
384    enc_buf = _malloca(qeh->qeh_encoder.qpe_cur_max_capacity * 2);
385    if (!enc_buf)
386        return QWH_ERR;
387#endif
388
389    if (qeh->qeh_exp_rec)
390    {
391        const lsquic_time_t now = lsquic_time_now();
392        if (qeh->qeh_exp_rec->qer_hblock_count == 0)
393            qeh->qeh_exp_rec->qer_first_req = now;
394        qeh->qeh_exp_rec->qer_last_req = now;
395        ++qeh->qeh_exp_rec->qer_hblock_count;
396        if (!qeh->qeh_exp_rec->qer_user_agent)
397            qeh_maybe_set_user_agent(qeh, headers);
398    }
399
400    s = lsqpack_enc_start_header(&qeh->qeh_encoder, stream_id, 0);
401    if (s != 0)
402    {
403        LSQ_WARN("cannot start header");
404        retval = QWH_ERR;
405        goto end;
406    }
407    LSQ_DEBUG("begin encoding headers for stream %"PRIu64, stream_id);
408
409    if (qeh->qeh_enc_sm_out)
410        enc_flags = 0;
411    else
412    {
413        enc_flags = LQEF_NO_INDEX;
414        LSQ_DEBUG("encoder stream is unavailable, won't index headers");
415    }
416    write_to_stream = qeh->qeh_enc_sm_out
417                                && lsquic_frab_list_empty(&qeh->qeh_fral);
418    total_enc_sz = 0;
419    for (i = 0; i < headers->count; ++i)
420    {
421        if (headers->headers[i].buf == NULL)
422            continue;
423        enc_sz = sizeof(enc_buf);
424        hea_sz = end - p;
425        st = lsqpack_enc_encode(&qeh->qeh_encoder, enc_buf, &enc_sz, p,
426                                &hea_sz, &headers->headers[i], enc_flags);
427        switch (st)
428        {
429        case LQES_OK:
430            LSQ_DEBUG("encoded `%.*s': `%.*s' -- %zd bytes to header block, "
431                "%zd bytes to encoder stream",
432                (int) headers->headers[i].name_len,
433                    lsxpack_header_get_name(&headers->headers[i]),
434                (int) headers->headers[i].val_len,
435                    lsxpack_header_get_value(&headers->headers[i]),
436                hea_sz, enc_sz);
437            total_enc_sz += enc_sz;
438            p += hea_sz;
439            if (enc_sz)
440            {
441                if (write_to_stream)
442                {
443                    nw = lsquic_stream_write(qeh->qeh_enc_sm_out, enc_buf, enc_sz);
444                    if ((size_t) nw == enc_sz)
445                        break;
446                    if (nw < 0)
447                    {
448                        LSQ_INFO("could not write to encoder stream: %s",
449                                                                strerror(errno));
450                        retval = QWH_ERR;
451                        goto end;
452                    }
453                    write_to_stream = 0;
454                    enc_p = enc_buf + (size_t) nw;
455                    enc_sz -= (size_t) nw;
456                }
457                else
458                    enc_p = enc_buf;
459                if (0 != lsquic_frab_list_write(&qeh->qeh_fral, enc_p, enc_sz))
460                {
461                    LSQ_INFO("could not write to frab list");
462                    retval = QWH_ERR;
463                    goto end;
464                }
465            }
466            break;
467        case LQES_NOBUF_HEAD:
468            retval = QWH_ENOBUF;
469            goto end;
470        default:
471            assert(0);
472            retval = QWH_ERR;
473            goto end;
474        case LQES_NOBUF_ENC:
475            LSQ_DEBUG("not enough room to write encoder stream data");
476            retval = QWH_ERR;
477            goto end;
478        }
479    }
480
481    nw = lsqpack_enc_end_header(&qeh->qeh_encoder, buf - *prefix_sz,
482                                                        *prefix_sz, hflags);
483    if (nw <= 0)
484    {
485        LSQ_WARN("could not end header: %zd", nw);
486        retval = QWH_ERR;
487        goto end;
488    }
489
490    if ((size_t) nw < *prefix_sz)
491    {
492        memmove(buf - nw, buf - *prefix_sz, (size_t) nw);
493        *prefix_sz = (size_t) nw;
494    }
495    *headers_sz = p - buf;
496    if (qeh->qeh_exp_rec)
497        qeh->qeh_exp_rec->qer_hblock_size += p - buf;
498    if (lsquic_frab_list_empty(&qeh->qeh_fral))
499    {
500        LSQ_DEBUG("all %zd bytes of encoder stream written out; header block "
501            "is %zd bytes; estimated compression ratio %.3f", total_enc_sz,
502            *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder));
503        retval = QWH_FULL;
504        goto end;
505    }
506    else
507    {
508        *completion_offset = lsquic_qeh_enc_off(qeh)
509                                    + lsquic_frab_list_size(&qeh->qeh_fral);
510        LSQ_DEBUG("not all %zd bytes of encoder stream written out; %zd bytes "
511            "buffered; header block is %zd bytes; estimated compression ratio "
512            "%.3f", total_enc_sz, lsquic_frab_list_size(&qeh->qeh_fral),
513            *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder));
514        retval = QWH_PARTIAL;
515        goto end;
516    }
517
518  end:
519#ifdef WIN32
520    _freea(enc_buf);
521#endif
522    return retval;
523}
524
525
526#if !defined(NDEBUG) && __GNUC__
527__attribute__((weak))
528#endif
529enum qwh_status
530lsquic_qeh_write_headers (struct qpack_enc_hdl *qeh,
531    lsquic_stream_id_t stream_id, unsigned seqno,
532    const struct lsquic_http_headers *headers, unsigned char *buf,
533    size_t *prefix_sz, size_t *headers_sz, uint64_t *completion_offset,
534    enum lsqpack_enc_header_flags *hflags)
535{
536    if (qeh->qeh_flags & QEH_INITIALIZED)
537        return qeh_write_headers(qeh, stream_id, seqno, headers, buf,
538                        prefix_sz, headers_sz, completion_offset, hflags);
539    else
540        return QWH_ERR;
541}
542
543
544#if !defined(NDEBUG) && __GNUC__
545__attribute__((weak))
546#endif
547uint64_t
548lsquic_qeh_enc_off (struct qpack_enc_hdl *qeh)
549{
550    if (qeh->qeh_enc_sm_out)
551        return qeh->qeh_enc_sm_out->tosend_off;
552    else
553        return 0;
554}
555
556
557size_t
558lsquic_qeh_write_avail (struct qpack_enc_hdl *qeh)
559{
560    if ((qeh->qeh_flags & QEH_INITIALIZED) && qeh->qeh_enc_sm_out)
561        return lsquic_stream_write_avail(qeh->qeh_enc_sm_out);
562    else if (qeh->qeh_flags & QEH_INITIALIZED)
563        return ~((size_t) 0);   /* Unlimited write */
564    else
565        return 0;
566}
567
568
569size_t
570lsquic_qeh_max_prefix_size (const struct qpack_enc_hdl *qeh)
571{
572    if (qeh->qeh_flags & QEH_HAVE_SETTINGS)
573        return qeh->qeh_max_prefix_size;
574    else
575        return LSQPACK_UINT64_ENC_SZ * 2;
576}
577