lsquic_qenc_hdl.c revision 55613f44
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_qenc_hdl.c -- QPACK encoder streams handler
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdlib.h>
10#include <string.h>
11#include <sys/queue.h>
12
13#include "lsquic.h"
14#include "lsquic_types.h"
15#include "lsquic_int_types.h"
16#include "lsquic_sfcw.h"
17#include "lsquic_varint.h"
18#include "lsquic_hq.h"
19#include "lsquic_hash.h"
20#include "lsquic_stream.h"
21#include "lsquic_frab_list.h"
22#include "lsqpack.h"
23#include "lsxpack_header.h"
24#include "lsquic_conn.h"
25#include "lsquic_qenc_hdl.h"
26
27#define LSQUIC_LOGGER_MODULE LSQLM_QENC_HDL
28#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qeh->qeh_conn)
29#include "lsquic_logger.h"
30
31
32static int
33qeh_write_type (struct qpack_enc_hdl *qeh)
34{
35    int s;
36
37#ifndef NDEBUG
38    const char *env = getenv("LSQUIC_RND_VARINT_LEN");
39    if (env && atoi(env))
40    {
41        s = rand() & 3;
42        LSQ_DEBUG("writing %d-byte stream type", 1 << s);
43    }
44    else
45#endif
46        s = 0;
47
48    switch (s)
49    {
50    case 0:
51        return lsquic_frab_list_write(&qeh->qeh_fral,
52                                (unsigned char []) { HQUST_QPACK_ENC }, 1);
53    case 1:
54        return lsquic_frab_list_write(&qeh->qeh_fral,
55                            (unsigned char []) { 0x40, HQUST_QPACK_ENC }, 2);
56    case 2:
57        return lsquic_frab_list_write(&qeh->qeh_fral,
58                (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_ENC }, 4);
59    default:
60        return lsquic_frab_list_write(&qeh->qeh_fral,
61                (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
62                                                        HQUST_QPACK_ENC }, 8);
63    }
64}
65
66
67static void
68qeh_begin_out (struct qpack_enc_hdl *qeh)
69{
70    if (0 == qeh_write_type(qeh)
71        && (qeh->qeh_tsu_sz == 0
72            || 0 == lsquic_frab_list_write(&qeh->qeh_fral, qeh->qeh_tsu_buf,
73                                                            qeh->qeh_tsu_sz)))
74    {
75        LSQ_DEBUG("wrote %zu bytes to frab list", 1 + qeh->qeh_tsu_sz);
76        lsquic_stream_wantwrite(qeh->qeh_enc_sm_out, 1);
77    }
78    else
79    {
80        LSQ_WARN("could not write to frab list");
81        qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
82                                            "cannot write to frab list");
83    }
84}
85
86
87void
88lsquic_qeh_init (struct qpack_enc_hdl *qeh, struct lsquic_conn *conn)
89{
90    assert(!(qeh->qeh_flags & QEH_INITIALIZED));
91    qeh->qeh_conn = conn;
92    lsquic_frab_list_init(&qeh->qeh_fral, 0x400, NULL, NULL, NULL);
93    lsqpack_enc_preinit(&qeh->qeh_encoder, (void *) conn);
94    qeh->qeh_flags |= QEH_INITIALIZED;
95    qeh->qeh_max_prefix_size =
96                        lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder);
97    if (qeh->qeh_dec_sm_in)
98        lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1);
99    LSQ_DEBUG("initialized");
100}
101
102
103int
104lsquic_qeh_settings (struct qpack_enc_hdl *qeh, unsigned max_table_size,
105             unsigned dyn_table_size, unsigned max_risked_streams, int server)
106{
107    enum lsqpack_enc_opts enc_opts;
108
109    assert(qeh->qeh_flags & QEH_INITIALIZED);
110
111    if (qeh->qeh_flags & QEH_HAVE_SETTINGS)
112    {
113        LSQ_WARN("settings already set");
114        return -1;
115    }
116
117    enc_opts = LSQPACK_ENC_OPT_STAGE_2
118             | (server ? LSQPACK_ENC_OPT_SERVER : 0);
119    qeh->qeh_tsu_sz = sizeof(qeh->qeh_tsu_buf);
120    if (0 != lsqpack_enc_init(&qeh->qeh_encoder, (void *) qeh->qeh_conn,
121                max_table_size, dyn_table_size, max_risked_streams, enc_opts,
122                qeh->qeh_tsu_buf, &qeh->qeh_tsu_sz))
123    {
124        LSQ_INFO("could not initialize QPACK encoder");
125        return -1;
126    }
127    LSQ_DEBUG("%zu-byte post-init TSU", qeh->qeh_tsu_sz);
128    qeh->qeh_flags |= QEH_HAVE_SETTINGS;
129    qeh->qeh_max_prefix_size =
130                        lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder);
131    LSQ_DEBUG("have settings: max table size=%u; dyn table size=%u; max risked "
132        "streams=%u", max_table_size, dyn_table_size, max_risked_streams);
133    if (qeh->qeh_enc_sm_out)
134        qeh_begin_out(qeh);
135    return 0;
136}
137
138
139void
140lsquic_qeh_cleanup (struct qpack_enc_hdl *qeh)
141{
142    if (qeh->qeh_flags & QEH_INITIALIZED)
143    {
144        LSQ_DEBUG("cleanup");
145        lsqpack_enc_cleanup(&qeh->qeh_encoder);
146        lsquic_frab_list_cleanup(&qeh->qeh_fral);
147        memset(qeh, 0, sizeof(*qeh));
148    }
149}
150
151static lsquic_stream_ctx_t *
152qeh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
153{
154    struct qpack_enc_hdl *const qeh = stream_if_ctx;
155    qeh->qeh_enc_sm_out = stream;
156    if ((qeh->qeh_flags & (QEH_INITIALIZED|QEH_HAVE_SETTINGS))
157                                    == (QEH_INITIALIZED|QEH_HAVE_SETTINGS))
158        qeh_begin_out(qeh);
159    else
160        qeh->qeh_conn = lsquic_stream_conn(stream);   /* Or NULL deref in log */
161    LSQ_DEBUG("initialized outgoing encoder stream");
162    return (void *) qeh;
163}
164
165
166static void
167qeh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
168{
169    struct qpack_enc_hdl *const qeh = (void *) ctx;
170    struct lsquic_reader reader = {
171        .lsqr_read  = lsquic_frab_list_read,
172        .lsqr_size  = lsquic_frab_list_size,
173        .lsqr_ctx   = &qeh->qeh_fral,
174    };
175    ssize_t nw;
176
177    nw = lsquic_stream_writef(stream, &reader);
178    if (nw >= 0)
179    {
180        LSQ_DEBUG("wrote %zd bytes to stream", nw);
181        (void) lsquic_stream_flush(stream);
182        if (lsquic_frab_list_empty(&qeh->qeh_fral))
183            lsquic_stream_wantwrite(stream, 0);
184    }
185    else
186    {
187        qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
188                                            "cannot write to stream");
189        LSQ_WARN("cannot write to stream: %s", strerror(errno));
190        lsquic_stream_wantwrite(stream, 0);
191    }
192}
193
194
195static void
196qeh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
197{
198    struct qpack_enc_hdl *const qeh = (void *) ctx;
199    qeh->qeh_enc_sm_out = NULL;
200    LSQ_DEBUG("closed outgoing encoder stream");
201}
202
203
204static void
205qeh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
206{
207    assert(0);
208}
209
210
211static const struct lsquic_stream_if qeh_enc_sm_out_if =
212{
213    .on_new_stream  = qeh_out_on_new,
214    .on_read        = qeh_out_on_read,
215    .on_write       = qeh_out_on_write,
216    .on_close       = qeh_out_on_close,
217};
218const struct lsquic_stream_if *const lsquic_qeh_enc_sm_out_if =
219                                                    &qeh_enc_sm_out_if;
220
221
222static lsquic_stream_ctx_t *
223qeh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream)
224{
225    struct qpack_enc_hdl *const qeh = stream_if_ctx;
226    qeh->qeh_dec_sm_in = stream;
227    if (qeh->qeh_flags & QEH_INITIALIZED)
228        lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1);
229    else
230        qeh->qeh_conn = lsquic_stream_conn(stream);   /* Or NULL deref in log */
231    LSQ_DEBUG("initialized incoming decoder stream");
232    return (void *) qeh;
233}
234
235
236static size_t
237qeh_read_decoder_stream (void *ctx, const unsigned char *buf, size_t sz,
238                                                                    int fin)
239{
240    struct qpack_enc_hdl *const qeh = (void *) ctx;
241    uint64_t offset;
242    int s;
243
244    if (fin)
245    {
246        LSQ_INFO("decoder stream is closed");
247        qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
248            HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK decoder stream");
249        goto end;
250    }
251
252    offset = lsquic_stream_read_offset(qeh->qeh_dec_sm_in);
253    s = lsqpack_enc_decoder_in(&qeh->qeh_encoder, buf, sz);
254    if (s != 0)
255    {
256        LSQ_INFO("error reading decoder stream");
257        qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
258            HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK decoder "
259            "stream at offset %"PRIu64, offset);
260        goto end;
261    }
262    LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz);
263
264  end:
265    return sz;
266}
267
268
269static void
270qeh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
271{
272    struct qpack_enc_hdl *const qeh = (void *) ctx;
273    ssize_t nread;
274
275    nread = lsquic_stream_readf(stream, qeh_read_decoder_stream, qeh);
276    if (nread <= 0)
277    {
278        if (nread < 0)
279        {
280            LSQ_WARN("cannot read from encoder stream: %s", strerror(errno));
281            qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn,
282                                        "cannot read from encoder stream");
283        }
284        else
285        {
286            LSQ_INFO("encoder stream closed by peer: abort connection");
287            qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1,
288                HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed");
289        }
290        lsquic_stream_wantread(stream, 0);
291    }
292}
293
294
295static void
296qeh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
297{
298    struct qpack_enc_hdl *const qeh = (void *) ctx;
299    LSQ_DEBUG("closed incoming decoder stream");
300    qeh->qeh_dec_sm_in = NULL;
301}
302
303
304static void
305qeh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx)
306{
307    assert(0);
308}
309
310
311static const struct lsquic_stream_if qeh_dec_sm_in_if =
312{
313    .on_new_stream  = qeh_in_on_new,
314    .on_read        = qeh_in_on_read,
315    .on_write       = qeh_in_on_write,
316    .on_close       = qeh_in_on_close,
317};
318const struct lsquic_stream_if *const lsquic_qeh_dec_sm_in_if =
319                                                    &qeh_dec_sm_in_if;
320
321
322static enum qwh_status
323qeh_write_headers (struct qpack_enc_hdl *qeh, lsquic_stream_id_t stream_id,
324    unsigned seqno, const struct lsquic_http_headers *headers,
325    unsigned char *buf, size_t *prefix_sz, size_t *headers_sz,
326    uint64_t *completion_offset, enum lsqpack_enc_header_flags *hflags)
327{
328    unsigned char *p = buf;
329    unsigned char *const end = buf + *headers_sz;
330    const unsigned char *enc_p;
331    size_t enc_sz, hea_sz, total_enc_sz;
332    ssize_t nw;
333    enum lsqpack_enc_status st;
334    int i, s, write_to_stream;
335    enum lsqpack_enc_flags enc_flags;
336    unsigned char enc_buf[ qeh->qeh_encoder.qpe_cur_max_capacity * 2 ];
337
338    s = lsqpack_enc_start_header(&qeh->qeh_encoder, stream_id, 0);
339    if (s != 0)
340    {
341        LSQ_WARN("cannot start header");
342        return QWH_ERR;
343    }
344    LSQ_DEBUG("begin encoding headers for stream %"PRIu64, stream_id);
345
346    if (qeh->qeh_enc_sm_out)
347        enc_flags = 0;
348    else
349    {
350        enc_flags = LQEF_NO_INDEX;
351        LSQ_DEBUG("encoder stream is unavailable, won't index headers");
352    }
353    write_to_stream = qeh->qeh_enc_sm_out
354                                && lsquic_frab_list_empty(&qeh->qeh_fral);
355    total_enc_sz = 0;
356    for (i = 0; i < headers->count; ++i)
357    {
358        if (headers->headers[i].buf == NULL)
359            continue;
360        enc_sz = sizeof(enc_buf);
361        hea_sz = end - p;
362        st = lsqpack_enc_encode(&qeh->qeh_encoder, enc_buf, &enc_sz, p,
363                                &hea_sz, &headers->headers[i], enc_flags);
364        switch (st)
365        {
366        case LQES_OK:
367            LSQ_DEBUG("encoded `%.*s': `%.*s' -- %zd bytes to header block, "
368                "%zd bytes to encoder stream",
369                (int) headers->headers[i].name_len,
370                    lsxpack_header_get_name(&headers->headers[i]),
371                (int) headers->headers[i].val_len,
372                    lsxpack_header_get_value(&headers->headers[i]),
373                hea_sz, enc_sz);
374            total_enc_sz += enc_sz;
375            p += hea_sz;
376            if (enc_sz)
377            {
378                if (write_to_stream)
379                {
380                    nw = lsquic_stream_write(qeh->qeh_enc_sm_out, enc_buf, enc_sz);
381                    if ((size_t) nw == enc_sz)
382                        break;
383                    if (nw < 0)
384                    {
385                        LSQ_INFO("could not write to encoder stream: %s",
386                                                                strerror(errno));
387                        return QWH_ERR;
388                    }
389                    write_to_stream = 0;
390                    enc_p = enc_buf + (size_t) nw;
391                    enc_sz -= (size_t) nw;
392                }
393                else
394                    enc_p = enc_buf;
395                if (0 != lsquic_frab_list_write(&qeh->qeh_fral, enc_p, enc_sz))
396                {
397                    LSQ_INFO("could not write to frab list");
398                    return QWH_ERR;
399                }
400            }
401            break;
402        case LQES_NOBUF_HEAD:
403            return QWH_ENOBUF;
404        default:
405            assert(0);
406            return QWH_ERR;
407        case LQES_NOBUF_ENC:
408            LSQ_DEBUG("not enough room to write encoder stream data");
409            return QWH_ERR;
410        }
411    }
412
413    nw = lsqpack_enc_end_header(&qeh->qeh_encoder, buf - *prefix_sz,
414                                                        *prefix_sz, hflags);
415    if (nw <= 0)
416    {
417        LSQ_WARN("could not end header: %zd", nw);
418        return QWH_ERR;
419    }
420
421    if ((size_t) nw < *prefix_sz)
422    {
423        memmove(buf - nw, buf - *prefix_sz, (size_t) nw);
424        *prefix_sz = (size_t) nw;
425    }
426    *headers_sz = p - buf;
427    if (lsquic_frab_list_empty(&qeh->qeh_fral))
428    {
429        LSQ_DEBUG("all %zd bytes of encoder stream written out; header block "
430            "is %zd bytes; estimated compression ratio %.3f", total_enc_sz,
431            *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder));
432        return QWH_FULL;
433    }
434    else
435    {
436        *completion_offset = lsquic_qeh_enc_off(qeh)
437                                    + lsquic_frab_list_size(&qeh->qeh_fral);
438        LSQ_DEBUG("not all %zd bytes of encoder stream written out; %zd bytes "
439            "buffered; header block is %zd bytes; estimated compression ratio "
440            "%.3f", total_enc_sz, lsquic_frab_list_size(&qeh->qeh_fral),
441            *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder));
442        return QWH_PARTIAL;
443    }
444}
445
446
447#if !defined(NDEBUG) && __GNUC__
448__attribute__((weak))
449#endif
450enum qwh_status
451lsquic_qeh_write_headers (struct qpack_enc_hdl *qeh,
452    lsquic_stream_id_t stream_id, unsigned seqno,
453    const struct lsquic_http_headers *headers, unsigned char *buf,
454    size_t *prefix_sz, size_t *headers_sz, uint64_t *completion_offset,
455    enum lsqpack_enc_header_flags *hflags)
456{
457    if (qeh->qeh_flags & QEH_INITIALIZED)
458        return qeh_write_headers(qeh, stream_id, seqno, headers, buf,
459                        prefix_sz, headers_sz, completion_offset, hflags);
460    else
461        return QWH_ERR;
462}
463
464
465#if !defined(NDEBUG) && __GNUC__
466__attribute__((weak))
467#endif
468uint64_t
469lsquic_qeh_enc_off (struct qpack_enc_hdl *qeh)
470{
471    if (qeh->qeh_enc_sm_out)
472        return qeh->qeh_enc_sm_out->tosend_off;
473    else
474        return 0;
475}
476
477
478size_t
479lsquic_qeh_write_avail (struct qpack_enc_hdl *qeh)
480{
481    if ((qeh->qeh_flags & QEH_INITIALIZED) && qeh->qeh_enc_sm_out)
482        return lsquic_stream_write_avail(qeh->qeh_enc_sm_out);
483    else if (qeh->qeh_flags & QEH_INITIALIZED)
484        return ~((size_t) 0);   /* Unlimited write */
485    else
486        return 0;
487}
488
489
490size_t
491lsquic_qeh_max_prefix_size (const struct qpack_enc_hdl *qeh)
492{
493    if (qeh->qeh_flags & QEH_HAVE_SETTINGS)
494        return qeh->qeh_max_prefix_size;
495    else
496        return LSQPACK_UINT64_ENC_SZ * 2;
497}
498