lsquic_qdec_hdl.c revision 758aff32
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_qdec_hdl.c -- QPACK decoder streams handler
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdlib.h>
10#include <string.h>
11#include <sys/queue.h>
12
13#include "lsquic.h"
14#include "lsquic_types.h"
15#include "lsxpack_header.h"
16#include "lsquic_int_types.h"
17#include "lsquic_sfcw.h"
18#include "lsquic_varint.h"
19#include "lsquic_hq.h"
20#include "lsquic_hash.h"
21#include "lsquic_stream.h"
22#include "lsquic_frab_list.h"
23#include "lsqpack.h"
24#include "lsquic_http1x_if.h"
25#include "lsquic_qdec_hdl.h"
26#include "lsquic_mm.h"
27#include "lsquic_engine_public.h"
28#include "lsquic_headers.h"
29#include "lsquic_conn.h"
30#include "lsquic_conn_flow.h"
31#include "lsquic_rtt.h"
32#include "lsquic_conn_public.h"
33#include "lsquic_hq.h"
34#include "lsquic_parse.h"
35#include "lsquic_qpack_exp.h"
36#include "lsquic_util.h"
37
38#define LSQUIC_LOGGER_MODULE LSQLM_QDEC_HDL
39#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qdh->qdh_conn)
40#include "lsquic_logger.h"
41
42static const struct lsqpack_dec_hset_if dhi_if;
43
44
45struct header_ctx
46{
47    void                    *hset;
48    struct qpack_dec_hdl    *qdh;
49    enum ppc_flags           ppc_flags;
50    struct lsquic_ext_http_prio ehp;
51};
52
53
54/* We need to allocate struct uncompressed_headers anyway when header set
55 * is complete and we give it to the stream using lsquic_stream_uh_in().
56 * To save a malloc, we reuse context after we're done with it.
57 */
58union hblock_ctx
59{
60    struct header_ctx           ctx;
61    struct uncompressed_headers uh;
62};
63
64
65static int
66qdh_write_decoder (struct qpack_dec_hdl *qdh, const unsigned char *buf,
67                                                                size_t sz)
68{
69    ssize_t nw;
70
71    if (!(qdh->qdh_dec_sm_out && lsquic_frab_list_empty(&qdh->qdh_fral)))
72    {
73  write_to_frab:
74        if (0 == lsquic_frab_list_write(&qdh->qdh_fral,
75                                                (unsigned char *) buf, sz))
76        {
77            LSQ_DEBUG("wrote %zu bytes to frab list", sz);
78            lsquic_stream_wantwrite(qdh->qdh_dec_sm_out, 1);
79            return 0;
80        }
81        else
82        {
83            LSQ_INFO("error writing to frab list");
84            return -1;
85        }
86    }
87
88    nw = lsquic_stream_write(qdh->qdh_dec_sm_out, buf, sz);
89    if (nw < 0)
90    {
91        LSQ_INFO("error writing to outgoing QPACK decoder stream: %s",
92                                                        strerror(errno));
93        return -1;
94    }
95    LSQ_DEBUG("wrote %zd bytes to outgoing QPACK decoder stream", nw);
96
97    if ((size_t) nw == sz)
98        return 0;
99
100    buf = buf + nw;
101    sz -= (size_t) nw;
102    goto write_to_frab;
103}
104
105
106static int
107qdh_write_type (struct qpack_dec_hdl *qdh)
108{
109    int s;
110
111#ifndef NDEBUG
112    const char *env = getenv("LSQUIC_RND_VARINT_LEN");
113    if (env && atoi(env))
114    {
115        s = rand() & 3;
116        LSQ_DEBUG("writing %d-byte stream type", 1 << s);
117    }
118    else
119#endif
120        s = 0;
121
122    switch (s)
123    {
124    case 0:
125        return qdh_write_decoder(qdh,
126                                (unsigned char []) { HQUST_QPACK_DEC }, 1);
127    case 1:
128        return qdh_write_decoder(qdh,
129                            (unsigned char []) { 0x40, HQUST_QPACK_DEC }, 2);
130    case 2:
131        return qdh_write_decoder(qdh,
132                (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_DEC }, 4);
133    default:
134        return qdh_write_decoder(qdh,
135                (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
136                                                        HQUST_QPACK_DEC }, 8);
137    }
138}
139
140
141static void
142qdh_begin_out (struct qpack_dec_hdl *qdh)
143{
144    if (0 != qdh_write_type(qdh))
145    {
146        LSQ_WARN("%s: could not write to decoder", __func__);
147        qdh->qdh_conn->cn_if->ci_internal_error(qdh->qdh_conn,
148                                        "cannot write to decoder stream");
149    }
150}
151
152
153int
154lsquic_qdh_init (struct qpack_dec_hdl *qdh, struct lsquic_conn *conn,
155                    int is_server, const struct lsquic_engine_public *enpub,
156                    unsigned dyn_table_size, unsigned max_risked_streams)
157{
158    enum lsqpack_dec_opts dec_opts;
159
160    dec_opts = 0;
161    if (enpub->enp_hsi_if->hsi_flags & LSQUIC_HSI_HTTP1X)
162        dec_opts |= LSQPACK_DEC_OPT_HTTP1X;
163    if (enpub->enp_hsi_if->hsi_flags & LSQUIC_HSI_HASH_NAME)
164        dec_opts |= LSQPACK_DEC_OPT_HASH_NAME;
165    if (enpub->enp_hsi_if->hsi_flags & LSQUIC_HSI_HASH_NAMEVAL)
166        dec_opts |= LSQPACK_DEC_OPT_HASH_NAMEVAL;
167
168    if (enpub->enp_settings.es_qpack_experiment)
169    {
170        qdh->qdh_exp_rec = lsquic_qpack_exp_new();
171        if (qdh->qdh_exp_rec)
172        {
173            if (conn->cn_flags & LSCONN_SERVER)
174                qdh->qdh_exp_rec->qer_flags |= QER_SERVER;
175            qdh->qdh_exp_rec->qer_used_max_size = dyn_table_size;
176            qdh->qdh_exp_rec->qer_used_max_blocked = max_risked_streams;
177        }
178    }
179
180    qdh->qdh_conn = conn;
181    lsquic_frab_list_init(&qdh->qdh_fral, 0x400, NULL, NULL, NULL);
182    lsqpack_dec_init(&qdh->qdh_decoder, (void *) conn, dyn_table_size,
183                        max_risked_streams, &dhi_if, dec_opts);
184    qdh->qdh_flags |= QDH_INITIALIZED;
185    qdh->qdh_enpub = enpub;
186    if (qdh->qdh_enpub->enp_hsi_if == lsquic_http1x_if)
187    {
188        qdh->qdh_h1x_ctor_ctx = (struct http1x_ctor_ctx) {
189            .conn           = conn,
190            .max_headers_sz = MAX_HTTP1X_HEADERS_SIZE,
191            .is_server      = is_server,
192        };
193        qdh->qdh_hsi_ctx = &qdh->qdh_h1x_ctor_ctx;
194    }
195    else
196        qdh->qdh_hsi_ctx = qdh->qdh_enpub->enp_hsi_ctx;
197    if (qdh->qdh_dec_sm_out)
198        qdh_begin_out(qdh);
199    if (qdh->qdh_enc_sm_in)
200        lsquic_stream_wantread(qdh->qdh_enc_sm_in, 1);
201    LSQ_DEBUG("initialized");
202    return 0;
203}
204
205
206static void
207qdh_log_and_clean_exp_rec (struct qpack_dec_hdl *qdh)
208{
209    char buf[0x400];
210
211    qdh->qdh_exp_rec->qer_comp_ratio = lsqpack_dec_ratio(&qdh->qdh_decoder);
212    /* Naughty: poking inside the decoder, it's not exposed.  (Should it be?) */
213    qdh->qdh_exp_rec->qer_peer_max_size = qdh->qdh_decoder.qpd_cur_max_capacity;
214    (void) lsquic_qpack_exp_to_xml(qdh->qdh_exp_rec, buf, sizeof(buf));
215    LSQ_NOTICE("%s", buf);
216    lsquic_qpack_exp_destroy(qdh->qdh_exp_rec);
217    qdh->qdh_exp_rec = NULL;
218}
219
220
221void
222lsquic_qdh_cleanup (struct qpack_dec_hdl *qdh)
223{
224    if (qdh->qdh_flags & QDH_INITIALIZED)
225    {
226        LSQ_DEBUG("cleanup");
227        if (qdh->qdh_exp_rec)
228            qdh_log_and_clean_exp_rec(qdh);
229        lsqpack_dec_cleanup(&qdh->qdh_decoder);
230        lsquic_frab_list_cleanup(&qdh->qdh_fral);
231        qdh->qdh_flags &= ~QDH_INITIALIZED;
232    }
233}
234
235static lsquic_stream_ctx_t *
236qdh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
237{
238    struct qpack_dec_hdl *const qdh = stream_if_ctx;
239    qdh->qdh_dec_sm_out = stream;
240    if (qdh->qdh_flags & QDH_INITIALIZED)
241        qdh_begin_out(qdh);
242    LSQ_DEBUG("initialized outgoing decoder stream");
243    return (void *) qdh;
244}
245
246
247static void
248qdh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
249{
250    struct qpack_dec_hdl *const qdh = (void *) ctx;
251    struct lsquic_reader reader;
252    ssize_t nw;
253    unsigned char buf[LSQPACK_LONGEST_ICI];
254
255    if (lsqpack_dec_ici_pending(&qdh->qdh_decoder))
256    {
257        nw = lsqpack_dec_write_ici(&qdh->qdh_decoder, buf, sizeof(buf));
258        if (nw > 0)
259        {
260            if (0 == qdh_write_decoder(qdh, buf, nw))
261                LSQ_DEBUG("wrote %zd-byte TSS instruction", nw);
262            else
263                goto err;
264        }
265        else if (nw < 0)
266        {
267            LSQ_WARN("could not generate TSS instruction");
268            goto err;
269        }
270    }
271
272    if (lsquic_frab_list_empty(&qdh->qdh_fral))
273    {
274        LSQ_DEBUG("%s: nothing to write", __func__);
275        lsquic_stream_wantwrite(stream, 0);
276        return;
277    }
278
279    reader = (struct lsquic_reader) {
280        .lsqr_read  = lsquic_frab_list_read,
281        .lsqr_size  = lsquic_frab_list_size,
282        .lsqr_ctx   = &qdh->qdh_fral,
283    };
284
285    nw = lsquic_stream_writef(stream, &reader);
286    if (nw >= 0)
287    {
288        LSQ_DEBUG("wrote %zd bytes to stream", nw);
289        (void) lsquic_stream_flush(stream);
290        if (lsquic_frab_list_empty(&qdh->qdh_fral))
291        {
292            lsquic_stream_wantwrite(stream, 0);
293            if (qdh->qdh_on_dec_sent_func)
294            {
295                LSQ_DEBUG("buffered data written: call callback");
296                qdh->qdh_on_dec_sent_func(qdh->qdh_on_dec_sent_ctx);
297                qdh->qdh_on_dec_sent_func = NULL;
298                qdh->qdh_on_dec_sent_ctx = NULL;
299            }
300        }
301    }
302    else
303    {
304        LSQ_WARN("cannot write to stream: %s", strerror(errno));
305  err:
306        lsquic_stream_wantwrite(stream, 0);
307        qdh->qdh_conn->cn_if->ci_internal_error(qdh->qdh_conn,
308                                        "cannot write to stream");
309    }
310}
311
312
313static void
314qdh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
315{
316    struct qpack_dec_hdl *const qdh = (void *) ctx;
317    qdh->qdh_dec_sm_out = NULL;
318    LSQ_DEBUG("closed outgoing decoder stream");
319}
320
321
322static void
323qdh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
324{
325    assert(0);
326}
327
328
329static const struct lsquic_stream_if qdh_dec_sm_out_if =
330{
331    .on_new_stream  = qdh_out_on_new,
332    .on_read        = qdh_out_on_read,
333    .on_write       = qdh_out_on_write,
334    .on_close       = qdh_out_on_close,
335};
336const struct lsquic_stream_if *const lsquic_qdh_dec_sm_out_if =
337                                                    &qdh_dec_sm_out_if;
338
339
340static lsquic_stream_ctx_t *
341qdh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
342{
343    struct qpack_dec_hdl *const qdh = stream_if_ctx;
344    qdh->qdh_enc_sm_in = stream;
345    if (qdh->qdh_flags & QDH_INITIALIZED)
346        lsquic_stream_wantread(qdh->qdh_enc_sm_in, 1);
347    LSQ_DEBUG("initialized incoming encoder stream");
348    return (void *) qdh;
349}
350
351
352static size_t
353qdh_read_encoder_stream (void *ctx, const unsigned char *buf, size_t sz,
354                                                                    int fin)
355{
356    struct qpack_dec_hdl *const qdh = (void *) ctx;
357    const struct lsqpack_dec_err *qerr;
358    int s;
359
360    if (fin)
361    {
362        LSQ_INFO("encoder stream is closed");
363        qdh->qdh_conn->cn_if->ci_abort_error(qdh->qdh_conn, 1,
364            HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK encoder stream");
365        goto end;
366    }
367
368    s = lsqpack_dec_enc_in(&qdh->qdh_decoder, buf, sz);
369    if (s != 0)
370    {
371        LSQ_INFO("error reading encoder stream");
372        qerr = lsqpack_dec_get_err_info(&qdh->qdh_decoder);
373        qdh->qdh_conn->cn_if->ci_abort_error(qdh->qdh_conn, 1,
374            HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK encoder "
375            "stream; offset %"PRIu64", line %d", qerr->off, qerr->line);
376        goto end;
377    }
378    if (qdh->qdh_dec_sm_out
379                    && lsqpack_dec_ici_pending(&qdh->qdh_decoder))
380        lsquic_stream_wantwrite(qdh->qdh_dec_sm_out, 1);
381
382    LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz);
383
384  end:
385    return sz;
386}
387
388
389static void
390qdh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
391{
392    struct qpack_dec_hdl *const qdh = (void *) ctx;
393    ssize_t nread;
394
395    nread = lsquic_stream_readf(stream, qdh_read_encoder_stream, qdh);
396    if (nread <= 0)
397    {
398        if (nread < 0)
399        {
400            LSQ_WARN("cannot read from encoder stream: %s", strerror(errno));
401            qdh->qdh_conn->cn_if->ci_internal_error(qdh->qdh_conn,
402                                        "cannot read from encoder stream");
403        }
404        else
405        {
406            LSQ_INFO("encoder stream closed by peer: abort connection");
407            qdh->qdh_conn->cn_if->ci_abort_error(qdh->qdh_conn, 1,
408                HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed");
409        }
410        lsquic_stream_wantread(stream, 0);
411    }
412}
413
414
415static void
416qdh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
417{
418    struct qpack_dec_hdl *const qdh = (void *) ctx;
419    LSQ_DEBUG("closed incoming encoder stream");
420    qdh->qdh_enc_sm_in = NULL;
421}
422
423
424static void
425qdh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
426{
427    assert(0);
428}
429
430
431static const struct lsquic_stream_if qdh_enc_sm_in_if =
432{
433    .on_new_stream  = qdh_in_on_new,
434    .on_read        = qdh_in_on_read,
435    .on_write       = qdh_in_on_write,
436    .on_close       = qdh_in_on_close,
437};
438const struct lsquic_stream_if *const lsquic_qdh_enc_sm_in_if =
439                                                    &qdh_enc_sm_in_if;
440
441
442static void
443qdh_hblock_unblocked (void *stream_p)
444{
445    struct lsquic_stream *const stream = stream_p;
446    union hblock_ctx *const u = stream->sm_hblock_ctx;
447    struct qpack_dec_hdl *qdh = u->ctx.qdh;
448
449    LSQ_DEBUG("header block for stream %"PRIu64" unblocked", stream->id);
450    lsquic_stream_qdec_unblocked(stream);
451}
452
453
454struct cont_len
455{
456    unsigned long long      value;
457    int                     has;    /* 1: set, 0: not set, -1: invalid */
458};
459
460
461static void
462process_content_length (const struct qpack_dec_hdl *qdh /* for logging */,
463            struct cont_len *cl, const char *val /* not NUL-terminated */,
464                                                                unsigned len)
465{
466    char *endcl, cont_len_buf[30];
467
468    if (0 == cl->has)
469    {
470        if (len >= sizeof(cont_len_buf))
471        {
472            LSQ_DEBUG("content-length has invalid value `%.*s'",
473                                                            (int) len, val);
474            cl->has = -1;
475            return;
476        }
477        memcpy(cont_len_buf, val, len);
478        cont_len_buf[len] = '\0';
479        cl->value = strtoull(cont_len_buf, &endcl, 10);
480        if (*endcl == '\0' && !(ULLONG_MAX == cl->value && ERANGE == errno))
481        {
482            cl->has = 1;
483            LSQ_DEBUG("content length is %llu", cl->value);
484        }
485        else
486        {
487            cl->has = -1;
488            LSQ_DEBUG("content-length has invalid value `%.*s'",
489                (int) len, val);
490        }
491    }
492    else if (cl->has > 0)
493    {
494        LSQ_DEBUG("header set has two content-length: ambiguous, "
495            "turn off checking");
496        cl->has = -1;
497    }
498}
499
500
501static int
502is_content_length (const struct lsxpack_header *xhdr)
503{
504    return ((xhdr->flags & LSXPACK_QPACK_IDX)
505                        && xhdr->qpack_index == LSQPACK_TNV_CONTENT_LENGTH_0)
506        || (xhdr->name_len == 14 && 0 == memcmp(lsxpack_header_get_name(xhdr),
507                                                        "content-length", 13))
508        ;
509}
510
511
512static int
513is_priority (const struct lsxpack_header *xhdr)
514{
515    return xhdr->name_len == 8
516        && 0 == memcmp(lsxpack_header_get_name(xhdr), "priority", 8);
517}
518
519
520static struct lsxpack_header *
521qdh_prepare_decode (void *stream_p, struct lsxpack_header *xhdr, size_t space)
522{
523    struct lsquic_stream *const stream = stream_p;
524    union hblock_ctx *const u = stream->sm_hblock_ctx;
525    struct qpack_dec_hdl *const qdh = u->ctx.qdh;
526
527    return qdh->qdh_enpub->enp_hsi_if->hsi_prepare_decode(
528                                                u->ctx.hset, xhdr, space);
529}
530
531
532static void
533qdh_maybe_set_user_agent (struct qpack_dec_hdl *qdh,
534                                        const struct lsxpack_header *xhdr)
535{
536    /* Flipped: we are the *decoder* */
537    const char *const name = qdh->qdh_exp_rec->qer_flags & QER_SERVER ?
538                                    "user-agent" : "server";
539    const size_t len = qdh->qdh_exp_rec->qer_flags & QER_SERVER ? 10 : 6;
540
541    if (len == xhdr->name_len
542                && 0 == memcmp(name, lsxpack_header_get_name(xhdr), len))
543        qdh->qdh_exp_rec->qer_user_agent
544                = strndup(lsxpack_header_get_value(xhdr), xhdr->val_len);
545}
546
547
548static int
549qdh_process_header (void *stream_p, struct lsxpack_header *xhdr)
550{
551    struct lsquic_stream *const stream = stream_p;
552    union hblock_ctx *const u = stream->sm_hblock_ctx;
553    struct qpack_dec_hdl *const qdh = u->ctx.qdh;
554    struct cont_len cl;
555
556    if (is_content_length(xhdr))
557    {
558        cl.has = 0;
559        process_content_length(qdh, &cl, lsxpack_header_get_value(xhdr),
560                                                            xhdr->val_len);
561        if (cl.has > 0)
562            (void) lsquic_stream_verify_len(stream, cl.value);
563    }
564    else if ((stream->sm_bflags & (SMBF_HTTP_PRIO|SMBF_HPRIO_SET))
565                                                            == SMBF_HTTP_PRIO
566            && is_priority(xhdr))
567    {
568        u->ctx.ppc_flags &= ~(PPC_INC_NAME|PPC_URG_NAME);
569        (void) lsquic_http_parse_pfv(lsxpack_header_get_value(xhdr),
570                        xhdr->val_len, &u->ctx.ppc_flags, &u->ctx.ehp,
571                        (char *) stream->conn_pub->mm->acki,
572                        sizeof(*stream->conn_pub->mm->acki));
573    }
574    else if (qdh->qdh_exp_rec && !qdh->qdh_exp_rec->qer_user_agent)
575        qdh_maybe_set_user_agent(qdh, xhdr);
576
577    return qdh->qdh_enpub->enp_hsi_if->hsi_process_header(u->ctx.hset, xhdr);
578}
579
580
581static const struct lsqpack_dec_hset_if dhi_if =
582{
583    .dhi_unblocked      = qdh_hblock_unblocked,
584    .dhi_prepare_decode = qdh_prepare_decode,
585    .dhi_process_header = qdh_process_header,
586};
587
588
589static enum lsqpack_read_header_status
590qdh_header_read_results (struct qpack_dec_hdl *qdh,
591        struct lsquic_stream *stream, enum lsqpack_read_header_status rhs,
592        const unsigned char *dec_buf, size_t dec_buf_sz)
593{
594    const struct lsqpack_dec_err *qerr;
595    struct uncompressed_headers *uh;
596    void *hset;
597
598    if (rhs == LQRHS_DONE)
599    {
600        if (!lsquic_stream_header_is_trailer(stream))
601        {
602            if (stream->sm_hblock_ctx->ctx.ppc_flags
603                                                & (PPC_INC_SET|PPC_URG_SET))
604            {
605                assert(stream->sm_bflags & SMBF_HTTP_PRIO);
606                LSQ_DEBUG("Apply Priority from headers to stream %"PRIu64,
607                                                                stream->id);
608                (void) lsquic_stream_set_http_prio(stream,
609                                            &stream->sm_hblock_ctx->ctx.ehp);
610            }
611            hset = stream->sm_hblock_ctx->ctx.hset;
612            uh = &stream->sm_hblock_ctx->uh;
613            stream->sm_hblock_ctx = NULL;
614            memset(uh, 0, sizeof(*uh));
615            uh->uh_stream_id = stream->id;
616            uh->uh_oth_stream_id = 0;
617            uh->uh_weight = 0;
618            uh->uh_exclusive = -1;
619            if (qdh->qdh_enpub->enp_hsi_if == lsquic_http1x_if)
620                uh->uh_flags    |= UH_H1H;
621            if (0 != qdh->qdh_enpub->enp_hsi_if
622                                        ->hsi_process_header(hset, NULL))
623            {
624                LSQ_DEBUG("finishing HTTP/1.x hset failed");
625                free(uh);
626                return LQRHS_ERROR;
627            }
628            uh->uh_hset = hset;
629            if (0 == lsquic_stream_uh_in(stream, uh))
630                LSQ_DEBUG("gave hset to stream %"PRIu64, stream->id);
631            else
632            {
633                LSQ_DEBUG("could not give hset to stream %"PRIu64, stream->id);
634                free(uh);
635                return LQRHS_ERROR;
636            }
637        }
638        else
639        {
640            LSQ_DEBUG("discard trailer header set");
641            free(stream->sm_hblock_ctx);
642            stream->sm_hblock_ctx = NULL;
643        }
644        if (qdh->qdh_dec_sm_out)
645        {
646            if (dec_buf_sz
647                && 0 != qdh_write_decoder(qdh, dec_buf, dec_buf_sz))
648            {
649                return LQRHS_ERROR;
650            }
651            if (dec_buf_sz || lsqpack_dec_ici_pending(&qdh->qdh_decoder))
652                lsquic_stream_wantwrite(qdh->qdh_dec_sm_out, 1);
653        }
654    }
655    else if (rhs == LQRHS_ERROR)
656    {
657        qerr = lsqpack_dec_get_err_info(&qdh->qdh_decoder);
658        qdh->qdh_conn->cn_if->ci_abort_error(qdh->qdh_conn, 1,
659            HEC_QPACK_DECOMPRESSION_FAILED, "QPACK decompression error; "
660            "stream %"PRIu64", offset %"PRIu64", line %d", qerr->stream_id,
661            qerr->off, qerr->line);
662    }
663
664    return rhs;
665}
666
667
668enum lsqpack_read_header_status
669lsquic_qdh_header_in_begin (struct qpack_dec_hdl *qdh,
670                        struct lsquic_stream *stream, uint64_t header_size,
671                        const unsigned char **buf, size_t bufsz)
672{
673    enum lsqpack_read_header_status rhs;
674    void *hset;
675    int is_pp;
676    size_t dec_buf_sz;
677    union hblock_ctx *u;
678    unsigned char dec_buf[LSQPACK_LONGEST_HEADER_ACK];
679
680    assert(!(stream->stream_flags & STREAM_U_READ_DONE));
681
682    if (!(qdh->qdh_flags & QDH_INITIALIZED))
683    {
684        LSQ_WARN("not initialized: cannot process header block");
685        return LQRHS_ERROR;
686    }
687
688    u = malloc(sizeof(*u));
689    if (!u)
690    {
691        LSQ_INFO("cannot allocate hblock_ctx");
692        return LQRHS_ERROR;
693    }
694
695    is_pp = lsquic_stream_header_is_pp(stream);
696    hset = qdh->qdh_enpub->enp_hsi_if->hsi_create_header_set(
697                                          qdh->qdh_hsi_ctx, stream, is_pp);
698    if (!hset)
699    {
700        free(u);
701        LSQ_DEBUG("hsi_create_header_set failure");
702        return LQRHS_ERROR;
703    }
704
705    u->ctx.hset   = hset;
706    u->ctx.qdh    = qdh;
707    u->ctx.ppc_flags = 0;
708    u->ctx.ehp       = (struct lsquic_ext_http_prio) {
709                            .urgency     = LSQUIC_DEF_HTTP_URGENCY,
710                            .incremental = LSQUIC_DEF_HTTP_INCREMENTAL,
711    };
712    stream->sm_hblock_ctx = u;
713
714    if (qdh->qdh_exp_rec)
715    {
716        const lsquic_time_t now = lsquic_time_now();
717        if (0 == qdh->qdh_exp_rec->qer_hblock_count)
718            qdh->qdh_exp_rec->qer_first_req = now;
719        qdh->qdh_exp_rec->qer_last_req = now;
720        ++qdh->qdh_exp_rec->qer_hblock_count;
721        qdh->qdh_exp_rec->qer_hblock_size += bufsz;
722    }
723
724    dec_buf_sz = sizeof(dec_buf);
725    rhs = lsqpack_dec_header_in(&qdh->qdh_decoder, stream, stream->id,
726                    header_size, buf, bufsz, dec_buf, &dec_buf_sz);
727    if (qdh->qdh_exp_rec)
728        qdh->qdh_exp_rec->qer_peer_max_blocked += rhs == LQRHS_BLOCKED;
729    return qdh_header_read_results(qdh, stream, rhs, dec_buf, dec_buf_sz);
730}
731
732
733enum lsqpack_read_header_status
734lsquic_qdh_header_in_continue (struct qpack_dec_hdl *qdh,
735        struct lsquic_stream *stream, const unsigned char **buf, size_t bufsz)
736{
737    enum lsqpack_read_header_status rhs;
738    size_t dec_buf_sz;
739    unsigned char dec_buf[LSQPACK_LONGEST_HEADER_ACK];
740
741    assert(!(stream->stream_flags & STREAM_U_READ_DONE));
742
743    if (qdh->qdh_flags & QDH_INITIALIZED)
744    {
745        if (qdh->qdh_exp_rec)
746            qdh->qdh_exp_rec->qer_hblock_size += bufsz;
747        dec_buf_sz = sizeof(dec_buf);
748        rhs = lsqpack_dec_header_read(&qdh->qdh_decoder, stream,
749                                    buf, bufsz, dec_buf, &dec_buf_sz);
750        if (qdh->qdh_exp_rec)
751            qdh->qdh_exp_rec->qer_peer_max_blocked += rhs == LQRHS_BLOCKED;
752        return qdh_header_read_results(qdh, stream, rhs, dec_buf, dec_buf_sz);
753    }
754    else
755    {
756        LSQ_WARN("not initialized: cannot process header block");
757        return LQRHS_ERROR;
758    }
759}
760
761
762static void
763lsquic_qdh_unref_stream (struct qpack_dec_hdl *qdh,
764                                                struct lsquic_stream *stream)
765{
766    if (0 == lsqpack_dec_unref_stream(&qdh->qdh_decoder, stream))
767        LSQ_DEBUG("unreffed stream %"PRIu64, stream->id);
768    else
769        LSQ_WARN("cannot unref stream %"PRIu64, stream->id);
770}
771
772
773void
774lsquic_qdh_cancel_stream (struct qpack_dec_hdl *qdh,
775                                                struct lsquic_stream *stream)
776{
777    ssize_t nw;
778    unsigned char buf[LSQPACK_LONGEST_CANCEL];
779
780    if (!qdh->qdh_dec_sm_out)
781        return;
782
783    nw = lsqpack_dec_cancel_stream(&qdh->qdh_decoder, stream, buf, sizeof(buf));
784    if (nw > 0)
785    {
786        if (0 == qdh_write_decoder(qdh, buf, nw))
787            LSQ_DEBUG("cancelled stream %"PRIu64" and wrote %zd-byte Cancel "
788                "Stream instruction to the decoder stream", stream->id, nw);
789    }
790    else if (nw == 0)
791        LSQ_WARN("cannot cancel stream %"PRIu64" -- not found", stream->id);
792    else
793    {
794        LSQ_WARN("cannot cancel stream %"PRIu64" -- not enough buffer space "
795            "to encode Cancel Stream instructin", stream->id);
796        lsquic_qdh_unref_stream(qdh, stream);
797    }
798}
799
800
801void
802lsquic_qdh_cancel_stream_id (struct qpack_dec_hdl *qdh,
803                                                lsquic_stream_id_t stream_id)
804{
805    ssize_t nw;
806    unsigned char buf[LSQPACK_LONGEST_CANCEL];
807
808    if (!qdh->qdh_dec_sm_out)
809        return;
810
811    nw = lsqpack_dec_cancel_stream_id(&qdh->qdh_decoder, stream_id, buf,
812                                                                sizeof(buf));
813    if (nw > 0)
814    {
815        if (0 == qdh_write_decoder(qdh, buf, nw))
816            LSQ_DEBUG("wrote %zd-byte Cancel Stream instruction for "
817                "stream %"PRIu64" to the decoder stream", nw, stream_id);
818    }
819    else if (nw == 0)
820        LSQ_DEBUG("not generating Cancel Stream instruction for "
821            "stream %"PRIu64, stream_id);
822    else
823        LSQ_WARN("cannot generate Cancel Stream instruction for "
824            "stream %"PRIu64" -- not enough buffer space", stream_id);
825}
826
827
828int
829lsquic_qdh_arm_if_unsent (struct qpack_dec_hdl *qdh, void (*func)(void *),
830                                                                    void *ctx)
831{
832    size_t bytes;
833
834    /* Use size of a single frab list buffer as an arbitrary threshold */
835    bytes = lsquic_frab_list_size(&qdh->qdh_fral);
836    if (bytes <= qdh->qdh_fral.fl_buf_size)
837        return 0;
838    else
839    {
840        LSQ_DEBUG("have %zu bytes of unsent QPACK decoder stream data: set "
841            "up callback", bytes);
842        qdh->qdh_on_dec_sent_func = func;
843        qdh->qdh_on_dec_sent_ctx  = ctx;
844        return 1;
845    }
846}
847