lsquic_qdec_hdl.c revision 8dc2321b
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_qdec_hdl.c -- QPACK decoder streams handler
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdlib.h>
10#include <string.h>
11#include <sys/queue.h>
12
13#include "lsquic.h"
14#include "lsquic_types.h"
15#include "lsxpack_header.h"
16#include "lsquic_int_types.h"
17#include "lsquic_sfcw.h"
18#include "lsquic_varint.h"
19#include "lsquic_hq.h"
20#include "lsquic_hash.h"
21#include "lsquic_stream.h"
22#include "lsquic_frab_list.h"
23#include "lsqpack.h"
24#include "lsquic_http1x_if.h"
25#include "lsquic_qdec_hdl.h"
26#include "lsquic_mm.h"
27#include "lsquic_engine_public.h"
28#include "lsquic_headers.h"
29#include "lsquic_conn.h"
30
31#define LSQUIC_LOGGER_MODULE LSQLM_QDEC_HDL
32#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qdh->qdh_conn)
33#include "lsquic_logger.h"
34
35static const struct lsqpack_dec_hset_if dhi_if;
36
37
38struct header_ctx
39{
40    void                    *hset;
41    struct qpack_dec_hdl    *qdh;
42};
43
44
45/* We need to allocate struct uncompressed_headers anyway when header set
46 * is complete and we give it to the stream using lsquic_stream_uh_in().
47 * To save a malloc, we reuse context after we're done with it.
48 */
49union hblock_ctx
50{
51    struct header_ctx           ctx;
52    struct uncompressed_headers uh;
53};
54
55
56static int
57qdh_write_decoder (struct qpack_dec_hdl *qdh, const unsigned char *buf,
58                                                                size_t sz)
59{
60    ssize_t nw;
61
62    if (!(qdh->qdh_dec_sm_out && lsquic_frab_list_empty(&qdh->qdh_fral)))
63    {
64  write_to_frab:
65        if (0 == lsquic_frab_list_write(&qdh->qdh_fral,
66                                                (unsigned char *) buf, sz))
67        {
68            LSQ_DEBUG("wrote %zu bytes to frab list", sz);
69            lsquic_stream_wantwrite(qdh->qdh_dec_sm_out, 1);
70            return 0;
71        }
72        else
73        {
74            LSQ_INFO("error writing to frab list");
75            return -1;
76        }
77    }
78
79    nw = lsquic_stream_write(qdh->qdh_dec_sm_out, buf, sz);
80    if (nw < 0)
81    {
82        LSQ_INFO("error writing to outgoing QPACK decoder stream: %s",
83                                                        strerror(errno));
84        return -1;
85    }
86    LSQ_DEBUG("wrote %zd bytes to outgoing QPACK decoder stream", nw);
87
88    if ((size_t) nw == sz)
89        return 0;
90
91    buf = buf + nw;
92    sz -= (size_t) nw;
93    goto write_to_frab;
94}
95
96
97static int
98qdh_write_type (struct qpack_dec_hdl *qdh)
99{
100    int s;
101
102#ifndef NDEBUG
103    const char *env = getenv("LSQUIC_RND_VARINT_LEN");
104    if (env && atoi(env))
105    {
106        s = rand() & 3;
107        LSQ_DEBUG("writing %d-byte stream type", 1 << s);
108    }
109    else
110#endif
111        s = 0;
112
113    switch (s)
114    {
115    case 0:
116        return qdh_write_decoder(qdh,
117                                (unsigned char []) { HQUST_QPACK_DEC }, 1);
118    case 1:
119        return qdh_write_decoder(qdh,
120                            (unsigned char []) { 0x40, HQUST_QPACK_DEC }, 2);
121    case 2:
122        return qdh_write_decoder(qdh,
123                (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_DEC }, 4);
124    default:
125        return qdh_write_decoder(qdh,
126                (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
127                                                        HQUST_QPACK_DEC }, 8);
128    }
129}
130
131
132static void
133qdh_begin_out (struct qpack_dec_hdl *qdh)
134{
135    if (0 != qdh_write_type(qdh))
136    {
137        LSQ_WARN("%s: could not write to decoder", __func__);
138        qdh->qdh_conn->cn_if->ci_internal_error(qdh->qdh_conn,
139                                        "cannot write to decoder stream");
140    }
141}
142
143
144int
145lsquic_qdh_init (struct qpack_dec_hdl *qdh, struct lsquic_conn *conn,
146                    int is_server, const struct lsquic_engine_public *enpub,
147                    unsigned dyn_table_size, unsigned max_risked_streams)
148{
149    enum lsqpack_dec_opts dec_opts;
150
151    dec_opts = 0;
152    if (enpub->enp_hsi_if->hsi_flags & LSQUIC_HSI_HTTP1X)
153        dec_opts |= LSQPACK_DEC_OPT_HTTP1X;
154    if (enpub->enp_hsi_if->hsi_flags & LSQUIC_HSI_HASH_NAME)
155        dec_opts |= LSQPACK_DEC_OPT_HASH_NAME;
156    if (enpub->enp_hsi_if->hsi_flags & LSQUIC_HSI_HASH_NAMEVAL)
157        dec_opts |= LSQPACK_DEC_OPT_HASH_NAMEVAL;
158
159    qdh->qdh_conn = conn;
160    lsquic_frab_list_init(&qdh->qdh_fral, 0x400, NULL, NULL, NULL);
161    lsqpack_dec_init(&qdh->qdh_decoder, (void *) conn, dyn_table_size,
162                        max_risked_streams, &dhi_if, dec_opts);
163    qdh->qdh_flags |= QDH_INITIALIZED;
164    qdh->qdh_enpub = enpub;
165    if (qdh->qdh_enpub->enp_hsi_if == lsquic_http1x_if)
166    {
167        qdh->qdh_h1x_ctor_ctx = (struct http1x_ctor_ctx) {
168            .conn           = conn,
169            .max_headers_sz = MAX_HTTP1X_HEADERS_SIZE,
170            .is_server      = is_server,
171        };
172        qdh->qdh_hsi_ctx = &qdh->qdh_h1x_ctor_ctx;
173    }
174    else
175        qdh->qdh_hsi_ctx = qdh->qdh_enpub->enp_hsi_ctx;
176    if (qdh->qdh_dec_sm_out)
177        qdh_begin_out(qdh);
178    if (qdh->qdh_enc_sm_in)
179        lsquic_stream_wantread(qdh->qdh_enc_sm_in, 1);
180    LSQ_DEBUG("initialized");
181    return 0;
182}
183
184
185void
186lsquic_qdh_cleanup (struct qpack_dec_hdl *qdh)
187{
188    if (qdh->qdh_flags & QDH_INITIALIZED)
189    {
190        LSQ_DEBUG("cleanup");
191        lsqpack_dec_cleanup(&qdh->qdh_decoder);
192        lsquic_frab_list_cleanup(&qdh->qdh_fral);
193        qdh->qdh_flags &= ~QDH_INITIALIZED;
194    }
195}
196
197static lsquic_stream_ctx_t *
198qdh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
199{
200    struct qpack_dec_hdl *const qdh = stream_if_ctx;
201    qdh->qdh_dec_sm_out = stream;
202    if (qdh->qdh_flags & QDH_INITIALIZED)
203        qdh_begin_out(qdh);
204    LSQ_DEBUG("initialized outgoing decoder stream");
205    return (void *) qdh;
206}
207
208
209static void
210qdh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
211{
212    struct qpack_dec_hdl *const qdh = (void *) ctx;
213    struct lsquic_reader reader;
214    ssize_t nw;
215    unsigned char buf[LSQPACK_LONGEST_ICI];
216
217    if (lsqpack_dec_ici_pending(&qdh->qdh_decoder))
218    {
219        nw = lsqpack_dec_write_ici(&qdh->qdh_decoder, buf, sizeof(buf));
220        if (nw > 0)
221        {
222            if (0 == qdh_write_decoder(qdh, buf, nw))
223                LSQ_DEBUG("wrote %zd-byte TSS instruction", nw);
224            else
225                goto err;
226        }
227        else if (nw < 0)
228        {
229            LSQ_WARN("could not generate TSS instruction");
230            goto err;
231        }
232    }
233
234    if (lsquic_frab_list_empty(&qdh->qdh_fral))
235    {
236        LSQ_DEBUG("%s: nothing to write", __func__);
237        lsquic_stream_wantwrite(stream, 0);
238        return;
239    }
240
241    reader = (struct lsquic_reader) {
242        .lsqr_read  = lsquic_frab_list_read,
243        .lsqr_size  = lsquic_frab_list_size,
244        .lsqr_ctx   = &qdh->qdh_fral,
245    };
246
247    nw = lsquic_stream_writef(stream, &reader);
248    if (nw >= 0)
249    {
250        LSQ_DEBUG("wrote %zd bytes to stream", nw);
251        (void) lsquic_stream_flush(stream);
252        if (lsquic_frab_list_empty(&qdh->qdh_fral))
253        {
254            lsquic_stream_wantwrite(stream, 0);
255            if (qdh->qdh_on_dec_sent_func)
256            {
257                LSQ_DEBUG("buffered data written: call callback");
258                qdh->qdh_on_dec_sent_func(qdh->qdh_on_dec_sent_ctx);
259                qdh->qdh_on_dec_sent_func = NULL;
260                qdh->qdh_on_dec_sent_ctx = NULL;
261            }
262        }
263    }
264    else
265    {
266        LSQ_WARN("cannot write to stream: %s", strerror(errno));
267  err:
268        lsquic_stream_wantwrite(stream, 0);
269        qdh->qdh_conn->cn_if->ci_internal_error(qdh->qdh_conn,
270                                        "cannot write to stream");
271    }
272}
273
274
275static void
276qdh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
277{
278    struct qpack_dec_hdl *const qdh = (void *) ctx;
279    qdh->qdh_dec_sm_out = NULL;
280    LSQ_DEBUG("closed outgoing decoder stream");
281}
282
283
284static void
285qdh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
286{
287    assert(0);
288}
289
290
291static const struct lsquic_stream_if qdh_dec_sm_out_if =
292{
293    .on_new_stream  = qdh_out_on_new,
294    .on_read        = qdh_out_on_read,
295    .on_write       = qdh_out_on_write,
296    .on_close       = qdh_out_on_close,
297};
298const struct lsquic_stream_if *const lsquic_qdh_dec_sm_out_if =
299                                                    &qdh_dec_sm_out_if;
300
301
302static lsquic_stream_ctx_t *
303qdh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
304{
305    struct qpack_dec_hdl *const qdh = stream_if_ctx;
306    qdh->qdh_enc_sm_in = stream;
307    if (qdh->qdh_flags & QDH_INITIALIZED)
308        lsquic_stream_wantread(qdh->qdh_enc_sm_in, 1);
309    LSQ_DEBUG("initialized incoming encoder stream");
310    return (void *) qdh;
311}
312
313
314static size_t
315qdh_read_encoder_stream (void *ctx, const unsigned char *buf, size_t sz,
316                                                                    int fin)
317{
318    struct qpack_dec_hdl *const qdh = (void *) ctx;
319    const struct lsqpack_dec_err *qerr;
320    int s;
321
322    if (fin)
323    {
324        LSQ_INFO("encoder stream is closed");
325        qdh->qdh_conn->cn_if->ci_abort_error(qdh->qdh_conn, 1,
326            HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK encoder stream");
327        goto end;
328    }
329
330    s = lsqpack_dec_enc_in(&qdh->qdh_decoder, buf, sz);
331    if (s != 0)
332    {
333        LSQ_INFO("error reading encoder stream");
334        qerr = lsqpack_dec_get_err_info(&qdh->qdh_decoder);
335        qdh->qdh_conn->cn_if->ci_abort_error(qdh->qdh_conn, 1,
336            HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK encoder "
337            "stream; offset %"PRIu64", line %d", qerr->off, qerr->line);
338        goto end;
339    }
340    if (qdh->qdh_dec_sm_out
341                    && lsqpack_dec_ici_pending(&qdh->qdh_decoder))
342        lsquic_stream_wantwrite(qdh->qdh_dec_sm_out, 1);
343
344    LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz);
345
346  end:
347    return sz;
348}
349
350
351static void
352qdh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
353{
354    struct qpack_dec_hdl *const qdh = (void *) ctx;
355    ssize_t nread;
356
357    nread = lsquic_stream_readf(stream, qdh_read_encoder_stream, qdh);
358    if (nread <= 0)
359    {
360        if (nread < 0)
361        {
362            LSQ_WARN("cannot read from encoder stream: %s", strerror(errno));
363            qdh->qdh_conn->cn_if->ci_internal_error(qdh->qdh_conn,
364                                        "cannot read from encoder stream");
365        }
366        else
367        {
368            LSQ_INFO("encoder stream closed by peer: abort connection");
369            qdh->qdh_conn->cn_if->ci_abort_error(qdh->qdh_conn, 1,
370                HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed");
371        }
372        lsquic_stream_wantread(stream, 0);
373    }
374}
375
376
377static void
378qdh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
379{
380    struct qpack_dec_hdl *const qdh = (void *) ctx;
381    LSQ_DEBUG("closed incoming encoder stream");
382    qdh->qdh_enc_sm_in = NULL;
383}
384
385
386static void
387qdh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
388{
389    assert(0);
390}
391
392
393static const struct lsquic_stream_if qdh_enc_sm_in_if =
394{
395    .on_new_stream  = qdh_in_on_new,
396    .on_read        = qdh_in_on_read,
397    .on_write       = qdh_in_on_write,
398    .on_close       = qdh_in_on_close,
399};
400const struct lsquic_stream_if *const lsquic_qdh_enc_sm_in_if =
401                                                    &qdh_enc_sm_in_if;
402
403
404static void
405qdh_hblock_unblocked (void *stream_p)
406{
407    struct lsquic_stream *const stream = stream_p;
408    union hblock_ctx *const u = stream->sm_hblock_ctx;
409    struct qpack_dec_hdl *qdh = u->ctx.qdh;
410
411    LSQ_DEBUG("header block for stream %"PRIu64" unblocked", stream->id);
412    lsquic_stream_qdec_unblocked(stream);
413}
414
415
416struct cont_len
417{
418    unsigned long long      value;
419    int                     has;    /* 1: set, 0: not set, -1: invalid */
420};
421
422
423static void
424process_content_length (const struct qpack_dec_hdl *qdh /* for logging */,
425            struct cont_len *cl, const char *val /* not NUL-terminated */,
426                                                                unsigned len)
427{
428    char *endcl, cont_len_buf[30];
429
430    if (0 == cl->has)
431    {
432        if (len >= sizeof(cont_len_buf))
433        {
434            LSQ_DEBUG("content-length has invalid value `%.*s'",
435                                                            (int) len, val);
436            cl->has = -1;
437            return;
438        }
439        memcpy(cont_len_buf, val, len);
440        cont_len_buf[len] = '\0';
441        cl->value = strtoull(cont_len_buf, &endcl, 10);
442        if (*endcl == '\0' && !(ULLONG_MAX == cl->value && ERANGE == errno))
443        {
444            cl->has = 1;
445            LSQ_DEBUG("content length is %llu", cl->value);
446        }
447        else
448        {
449            cl->has = -1;
450            LSQ_DEBUG("content-length has invalid value `%.*s'",
451                (int) len, val);
452        }
453    }
454    else if (cl->has > 0)
455    {
456        LSQ_DEBUG("header set has two content-length: ambiguous, "
457            "turn off checking");
458        cl->has = -1;
459    }
460}
461
462
463static int
464is_content_length (const struct lsxpack_header *xhdr)
465{
466    return ((xhdr->flags & LSXPACK_QPACK_IDX)
467                        && xhdr->qpack_index == LSQPACK_TNV_CONTENT_LENGTH_0)
468        || (xhdr->name_len == 14 && 0 == memcmp(lsxpack_header_get_name(xhdr),
469                                                        "content-length", 13))
470        ;
471}
472
473
474static struct lsxpack_header *
475qdh_prepare_decode (void *stream_p, struct lsxpack_header *xhdr, size_t space)
476{
477    struct lsquic_stream *const stream = stream_p;
478    union hblock_ctx *const u = stream->sm_hblock_ctx;
479    struct qpack_dec_hdl *const qdh = u->ctx.qdh;
480
481    return qdh->qdh_enpub->enp_hsi_if->hsi_prepare_decode(
482                                                u->ctx.hset, xhdr, space);
483}
484
485
486static int
487qdh_process_header (void *stream_p, struct lsxpack_header *xhdr)
488{
489    struct lsquic_stream *const stream = stream_p;
490    union hblock_ctx *const u = stream->sm_hblock_ctx;
491    struct qpack_dec_hdl *const qdh = u->ctx.qdh;
492    struct cont_len cl;
493
494    if (is_content_length(xhdr))
495    {
496        cl.has = 0;
497        process_content_length(qdh, &cl, lsxpack_header_get_value(xhdr),
498                                                            xhdr->val_len);
499        if (cl.has > 0)
500            (void) lsquic_stream_verify_len(stream, cl.value);
501    }
502
503    return qdh->qdh_enpub->enp_hsi_if->hsi_process_header(u->ctx.hset, xhdr);
504}
505
506
507static const struct lsqpack_dec_hset_if dhi_if =
508{
509    .dhi_unblocked      = qdh_hblock_unblocked,
510    .dhi_prepare_decode = qdh_prepare_decode,
511    .dhi_process_header = qdh_process_header,
512};
513
514
515static enum lsqpack_read_header_status
516qdh_header_read_results (struct qpack_dec_hdl *qdh,
517        struct lsquic_stream *stream, enum lsqpack_read_header_status rhs,
518        const unsigned char *dec_buf, size_t dec_buf_sz)
519{
520    const struct lsqpack_dec_err *qerr;
521    struct uncompressed_headers *uh;
522    void *hset;
523
524    if (rhs == LQRHS_DONE)
525    {
526        if (!lsquic_stream_header_is_trailer(stream))
527        {
528            hset = stream->sm_hblock_ctx->ctx.hset;
529            uh = &stream->sm_hblock_ctx->uh;
530            stream->sm_hblock_ctx = NULL;
531            memset(uh, 0, sizeof(*uh));
532            uh->uh_stream_id = stream->id;
533            uh->uh_oth_stream_id = 0;
534            uh->uh_weight = 0;
535            uh->uh_exclusive = -1;
536            if (qdh->qdh_enpub->enp_hsi_if == lsquic_http1x_if)
537                uh->uh_flags    |= UH_H1H;
538            if (0 != qdh->qdh_enpub->enp_hsi_if
539                                        ->hsi_process_header(hset, NULL))
540            {
541                LSQ_DEBUG("finishing HTTP/1.x hset failed");
542                free(uh);
543                return LQRHS_ERROR;
544            }
545            uh->uh_hset = hset;
546            if (0 == lsquic_stream_uh_in(stream, uh))
547                LSQ_DEBUG("gave hset to stream %"PRIu64, stream->id);
548            else
549            {
550                LSQ_DEBUG("could not give hset to stream %"PRIu64, stream->id);
551                free(uh);
552                return LQRHS_ERROR;
553            }
554        }
555        else
556        {
557            LSQ_DEBUG("discard trailer header set");
558            free(stream->sm_hblock_ctx);
559            stream->sm_hblock_ctx = NULL;
560        }
561        if (qdh->qdh_dec_sm_out)
562        {
563            if (dec_buf_sz
564                && 0 != qdh_write_decoder(qdh, dec_buf, dec_buf_sz))
565            {
566                return LQRHS_ERROR;
567            }
568            if (dec_buf_sz || lsqpack_dec_ici_pending(&qdh->qdh_decoder))
569                lsquic_stream_wantwrite(qdh->qdh_dec_sm_out, 1);
570        }
571    }
572    else if (rhs == LQRHS_ERROR)
573    {
574        qerr = lsqpack_dec_get_err_info(&qdh->qdh_decoder);
575        qdh->qdh_conn->cn_if->ci_abort_error(qdh->qdh_conn, 1,
576            HEC_QPACK_DECOMPRESSION_FAILED, "QPACK decompression error; "
577            "stream %"PRIu64", offset %"PRIu64", line %d", qerr->stream_id,
578            qerr->off, qerr->line);
579    }
580
581    return rhs;
582}
583
584
585enum lsqpack_read_header_status
586lsquic_qdh_header_in_begin (struct qpack_dec_hdl *qdh,
587                        struct lsquic_stream *stream, uint64_t header_size,
588                        const unsigned char **buf, size_t bufsz)
589{
590    enum lsqpack_read_header_status rhs;
591    void *hset;
592    int is_pp;
593    size_t dec_buf_sz;
594    union hblock_ctx *u;
595    unsigned char dec_buf[LSQPACK_LONGEST_HEADER_ACK];
596
597    assert(!(stream->stream_flags & STREAM_U_READ_DONE));
598
599    if (!(qdh->qdh_flags & QDH_INITIALIZED))
600    {
601        LSQ_WARN("not initialized: cannot process header block");
602        return LQRHS_ERROR;
603    }
604
605    u = malloc(sizeof(*u));
606    if (!u)
607    {
608        LSQ_INFO("cannot allocate hblock_ctx");
609        return LQRHS_ERROR;
610    }
611
612    is_pp = lsquic_stream_header_is_pp(stream);
613    hset = qdh->qdh_enpub->enp_hsi_if->hsi_create_header_set(
614                                          qdh->qdh_hsi_ctx, stream, is_pp);
615    if (!hset)
616    {
617        free(u);
618        LSQ_DEBUG("hsi_create_header_set failure");
619        return LQRHS_ERROR;
620    }
621
622    u->ctx.hset   = hset;
623    u->ctx.qdh    = qdh;
624    stream->sm_hblock_ctx = u;
625
626    dec_buf_sz = sizeof(dec_buf);
627    rhs = lsqpack_dec_header_in(&qdh->qdh_decoder, stream, stream->id,
628                    header_size, buf, bufsz, dec_buf, &dec_buf_sz);
629    return qdh_header_read_results(qdh, stream, rhs, dec_buf, dec_buf_sz);
630}
631
632
633enum lsqpack_read_header_status
634lsquic_qdh_header_in_continue (struct qpack_dec_hdl *qdh,
635        struct lsquic_stream *stream, const unsigned char **buf, size_t bufsz)
636{
637    enum lsqpack_read_header_status rhs;
638    size_t dec_buf_sz;
639    unsigned char dec_buf[LSQPACK_LONGEST_HEADER_ACK];
640
641    assert(!(stream->stream_flags & STREAM_U_READ_DONE));
642
643    if (qdh->qdh_flags & QDH_INITIALIZED)
644    {
645        dec_buf_sz = sizeof(dec_buf);
646        rhs = lsqpack_dec_header_read(&qdh->qdh_decoder, stream,
647                                    buf, bufsz, dec_buf, &dec_buf_sz);
648        return qdh_header_read_results(qdh, stream, rhs, dec_buf, dec_buf_sz);
649    }
650    else
651    {
652        LSQ_WARN("not initialized: cannot process header block");
653        return LQRHS_ERROR;
654    }
655}
656
657
658static void
659lsquic_qdh_unref_stream (struct qpack_dec_hdl *qdh,
660                                                struct lsquic_stream *stream)
661{
662    if (0 == lsqpack_dec_unref_stream(&qdh->qdh_decoder, stream))
663        LSQ_DEBUG("unreffed stream %"PRIu64, stream->id);
664    else
665        LSQ_WARN("cannot unref stream %"PRIu64, stream->id);
666}
667
668
669void
670lsquic_qdh_cancel_stream (struct qpack_dec_hdl *qdh,
671                                                struct lsquic_stream *stream)
672{
673    ssize_t nw;
674    unsigned char buf[LSQPACK_LONGEST_CANCEL];
675
676    if (!qdh->qdh_dec_sm_out)
677        return;
678
679    nw = lsqpack_dec_cancel_stream(&qdh->qdh_decoder, stream, buf, sizeof(buf));
680    if (nw > 0)
681    {
682        if (0 == qdh_write_decoder(qdh, buf, nw))
683            LSQ_DEBUG("cancelled stream %"PRIu64" and wrote %zd-byte Cancel "
684                "Stream instruction to the decoder stream", stream->id, nw);
685    }
686    else if (nw == 0)
687        LSQ_WARN("cannot cancel stream %"PRIu64" -- not found", stream->id);
688    else
689    {
690        LSQ_WARN("cannot cancel stream %"PRIu64" -- not enough buffer space "
691            "to encode Cancel Stream instructin", stream->id);
692        lsquic_qdh_unref_stream(qdh, stream);
693    }
694}
695
696
697void
698lsquic_qdh_cancel_stream_id (struct qpack_dec_hdl *qdh,
699                                                lsquic_stream_id_t stream_id)
700{
701    ssize_t nw;
702    unsigned char buf[LSQPACK_LONGEST_CANCEL];
703
704    if (!qdh->qdh_dec_sm_out)
705        return;
706
707    nw = lsqpack_dec_cancel_stream_id(&qdh->qdh_decoder, stream_id, buf,
708                                                                sizeof(buf));
709    if (nw > 0)
710    {
711        if (0 == qdh_write_decoder(qdh, buf, nw))
712            LSQ_DEBUG("wrote %zd-byte Cancel Stream instruction for "
713                "stream %"PRIu64" to the decoder stream", nw, stream_id);
714    }
715    else if (nw == 0)
716        LSQ_DEBUG("not generating Cancel Stream instruction for "
717            "stream %"PRIu64, stream_id);
718    else
719        LSQ_WARN("cannot generate Cancel Stream instruction for "
720            "stream %"PRIu64" -- not enough buffer space", stream_id);
721}
722
723
724int
725lsquic_qdh_arm_if_unsent (struct qpack_dec_hdl *qdh, void (*func)(void *),
726                                                                    void *ctx)
727{
728    size_t bytes;
729
730    /* Use size of a single frab list buffer as an arbitrary threshold */
731    bytes = lsquic_frab_list_size(&qdh->qdh_fral);
732    if (bytes <= qdh->qdh_fral.fl_buf_size)
733        return 0;
734    else
735    {
736        LSQ_DEBUG("have %zu bytes of unsent QPACK decoder stream data: set "
737            "up callback", bytes);
738        qdh->qdh_on_dec_sent_func = func;
739        qdh->qdh_on_dec_sent_ctx  = ctx;
740        return 1;
741    }
742}
743