lsquic_di_nocopy.c revision 0a4f8953
1/* Copyright (c) 2017 - 2022 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_di_nocopy.c -- The "no-copy" data in stream.
4 *
5 * Data from packets is not copied: the packets are referenced by stream
6 * frames.  When all data from stream frame is read, the frame is released
7 * and packet reference count is decremented, which possibly results in
8 * packet being released as well.
9 *
10 * This approach works well in regular circumstances; there are two scenarios
11 * when it does not:
12 *
13 *  A.  If we have many out-of-order frames, insertion into the list becomes
14 *      expensive.  In the degenerate case, we'd have to traverse the whole
15 *      list to find appropriate position.
16 *
17 *  B.  Having many frames ties up resources, as each frame keeps a reference
18 *      to the packet that contains it.  This is a possible attack vector:
19 *      send many one-byte packets; a single hole at the beginning will stop
20 *      the server from being able to read the stream, thus tying up resources.
21 *
22 * If we detect that either (A) or (B) is true, we request that the stream
23 * switch to a more robust incoming stream frame handler by setting
24 * DI_SWITCH_IMPL flag.
25 *
26 * For a small number of elements, (A) and (B) do not matter and the checks
27 * are not performed.  This number is defined by EFF_CHECK_THRESH_LOW.  On
28 * the other side of the spectrum, if the number of frames grows very high,
29 * we want to switch to a more memory-efficient implementation even if (A)
30 * and (B) are not true.  EFF_CHECK_THRESH_HIGH defines this threshold.
31 *
32 * Between the low and high thresholds, we detect efficiency problems as
33 * follows.
34 *
35 * To detect (A), we count how many elements we have to traverse during
36 * insertion.  If we have to traverse at least half the list
37 * EFF_FAR_TRAVERSE_COUNT in a row, DI_SWITCH_IMPL is issued.
38 *
39 * If average stream frame size is smaller than EFF_TINY_FRAME_SZ bytes,
40 * (B) condition is true.  In addition, if there are more than EFF_MAX_HOLES
41 * in the stream, this is also indicative of (B).
42 */
43
44
45#include <assert.h>
46#include <inttypes.h>
47#include <stddef.h>
48#include <stdint.h>
49#include <stdlib.h>
50#include <sys/queue.h>
51
52#include "lsquic.h"
53#include "lsquic_types.h"
54#include "lsquic_int_types.h"
55#include "lsquic_conn_flow.h"
56#include "lsquic_packet_common.h"
57#include "lsquic_packet_in.h"
58#include "lsquic_rtt.h"
59#include "lsquic_sfcw.h"
60#include "lsquic_varint.h"
61#include "lsquic_hq.h"
62#include "lsquic_hash.h"
63#include "lsquic_stream.h"
64#include "lsquic_mm.h"
65#include "lsquic_malo.h"
66#include "lsquic_conn.h"
67#include "lsquic_conn_public.h"
68#include "lsquic_data_in_if.h"
69
70
71#define LSQUIC_LOGGER_MODULE LSQLM_DI
72#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(ncdi->ncdi_conn_pub->lconn)
73#define LSQUIC_LOG_STREAM_ID ncdi->ncdi_stream_id
74#include "lsquic_logger.h"
75
76
77/* If number of elements is at or below this number, we do not bother to check
78 * efficiency conditions.
79 */
80#define EFF_CHECK_THRESH_LOW    10
81
82/* If number of elements is higher than this number, efficiency alert
83 * is issued unconditionally.
84 */
85#define EFF_CHECK_THRESH_HIGH   1000
86
87/* Maximum number of consecutive far traversals */
88#define EFF_FAR_TRAVERSE_COUNT  4
89
90/* Maximum number of holes that is not deemed suspicious */
91#define EFF_MAX_HOLES           5
92
93/* What is deemed a tiny frame, in bytes.  If it is a power of two, calculation
94 * is cheaper.
95 */
96#define EFF_TINY_FRAME_SZ       64
97
98
99TAILQ_HEAD(stream_frames_tailq, stream_frame);
100
101
102struct nocopy_data_in
103{
104    struct stream_frames_tailq  ncdi_frames_in;
105    struct data_in              ncdi_data_in;
106    struct lsquic_conn_public  *ncdi_conn_pub;
107    uint64_t                    ncdi_byteage;
108    uint64_t                    ncdi_fin_off;
109    lsquic_stream_id_t          ncdi_stream_id;
110    unsigned                    ncdi_n_frames;
111    unsigned                    ncdi_n_holes;
112    unsigned                    ncdi_cons_far;
113    enum {
114        NCDI_FIN_SET        = 1 << 0,
115        NCDI_FIN_REACHED    = 1 << 1,
116    }                           ncdi_flags;
117};
118
119
120#define NCDI_PTR(data_in) (struct nocopy_data_in *) \
121    ((unsigned char *) (data_in) - offsetof(struct nocopy_data_in, ncdi_data_in))
122
123#define STREAM_FRAME_PTR(data_frame) (struct stream_frame *) \
124    ((unsigned char *) (data_frame) - offsetof(struct stream_frame, data_frame))
125
126
127static const struct data_in_iface *di_if_nocopy_ptr;
128
129
130struct data_in *
131lsquic_data_in_nocopy_new (struct lsquic_conn_public *conn_pub,
132                                                lsquic_stream_id_t stream_id)
133{
134    struct nocopy_data_in *ncdi;
135
136    ncdi = malloc(sizeof(*ncdi));
137    if (!ncdi)
138        return NULL;
139
140    TAILQ_INIT(&ncdi->ncdi_frames_in);
141    ncdi->ncdi_data_in.di_if    = di_if_nocopy_ptr;
142    ncdi->ncdi_data_in.di_flags = 0;
143    ncdi->ncdi_conn_pub         = conn_pub;
144    ncdi->ncdi_stream_id        = stream_id;
145    ncdi->ncdi_byteage          = 0;
146    ncdi->ncdi_n_frames         = 0;
147    ncdi->ncdi_n_holes          = 0;
148    ncdi->ncdi_cons_far         = 0;
149    ncdi->ncdi_fin_off          = 0;
150    ncdi->ncdi_flags            = 0;
151    LSQ_DEBUG("initialized");
152    return &ncdi->ncdi_data_in;
153}
154
155
156static void
157nocopy_di_destroy (struct data_in *data_in)
158{
159    struct nocopy_data_in *const ncdi = NCDI_PTR(data_in);
160    stream_frame_t *frame;
161    while ((frame = TAILQ_FIRST(&ncdi->ncdi_frames_in)))
162    {
163        TAILQ_REMOVE(&ncdi->ncdi_frames_in, frame, next_frame);
164        lsquic_packet_in_put(ncdi->ncdi_conn_pub->mm, frame->packet_in);
165        lsquic_malo_put(frame);
166    }
167    free(ncdi);
168}
169
170
171#if LSQUIC_EXTRA_CHECKS
172static int
173frame_list_is_sane (const struct nocopy_data_in *ncdi)
174{
175    const stream_frame_t *frame;
176    uint64_t prev_off = 0, prev_end = 0;
177    int ordered = 1, overlaps = 0;
178    TAILQ_FOREACH(frame, &ncdi->ncdi_frames_in, next_frame)
179    {
180        ordered &= prev_off <= DF_OFF(frame);
181        overlaps |= prev_end > DF_OFF(frame);
182        prev_off = DF_OFF(frame);
183        prev_end = DF_END(frame);
184    }
185    return ordered && !overlaps;
186}
187#define CHECK_ORDER(ncdi) assert(frame_list_is_sane(ncdi))
188#else
189#define CHECK_ORDER(ncdi)
190#endif
191
192
193#define CASE(letter) ((int) (letter) << 8)
194
195/* Not all errors are picked up by this function, as it is expensive (and
196 * potentially error-prone) to check for all possible error conditions.
197 * It an error be misclassified as an overlap or dup, in the worst case
198 * we end up with an application error instead of protocol violation.
199 */
200static int
201insert_frame (struct nocopy_data_in *ncdi, struct stream_frame *new_frame,
202                                    uint64_t read_offset, unsigned *p_n_frames)
203{
204    stream_frame_t *prev_frame, *next_frame;
205    unsigned count;
206
207    if (read_offset > DF_END(new_frame))
208    {
209        if (DF_FIN(new_frame))
210            return INS_FRAME_ERR                                | CASE('A');
211        else
212            return INS_FRAME_DUP                                | CASE('B');
213    }
214
215    if (ncdi->ncdi_flags & NCDI_FIN_SET)
216    {
217        if (DF_FIN(new_frame) && DF_END(new_frame) != ncdi->ncdi_fin_off)
218            return INS_FRAME_ERR                                | CASE('C');
219        if (DF_END(new_frame) > ncdi->ncdi_fin_off)
220            return INS_FRAME_ERR                                | CASE('D');
221        if (read_offset == DF_END(new_frame))
222            return INS_FRAME_DUP                                | CASE('M');
223    }
224    else
225    {
226        if (read_offset == DF_END(new_frame) && !DF_FIN(new_frame))
227            return INS_FRAME_DUP                                | CASE('L');
228    }
229
230    /* Find position in the list, going backwards.  We go backwards because
231     * that is the most likely scenario.
232     */
233    next_frame = TAILQ_LAST(&ncdi->ncdi_frames_in, stream_frames_tailq);
234    if (next_frame && DF_OFF(new_frame) < DF_OFF(next_frame))
235    {
236        count = 1;
237        prev_frame = TAILQ_PREV(next_frame, stream_frames_tailq, next_frame);
238        for ( ; prev_frame && DF_OFF(new_frame) < DF_OFF(next_frame);
239                next_frame = prev_frame,
240                    prev_frame = TAILQ_PREV(prev_frame, stream_frames_tailq, next_frame))
241        {
242            if (DF_OFF(new_frame) >= DF_OFF(prev_frame))
243                break;
244            ++count;
245        }
246    }
247    else
248    {
249        count = 0;
250        prev_frame = NULL;
251    }
252
253    if (!prev_frame && next_frame && DF_OFF(new_frame) >= DF_OFF(next_frame))
254    {
255        prev_frame = next_frame;
256        next_frame = TAILQ_NEXT(next_frame, next_frame);
257    }
258
259    const int select = !!prev_frame << 1 | !!next_frame;
260    switch (select)
261    {
262    default:    /* No neighbors */
263        if (read_offset == DF_END(new_frame))
264        {
265            if (DF_SIZE(new_frame))
266            {
267                if (DF_FIN(new_frame)
268                    && !((ncdi->ncdi_flags & NCDI_FIN_REACHED)
269                            && read_offset == ncdi->ncdi_fin_off))
270                    return INS_FRAME_OVERLAP                    | CASE('E');
271                else
272                    return INS_FRAME_DUP                        | CASE('F');
273            }
274            else if (!DF_FIN(new_frame)
275                     || ((ncdi->ncdi_flags & NCDI_FIN_REACHED)
276                         && read_offset == ncdi->ncdi_fin_off))
277                return INS_FRAME_DUP                            | CASE('G');
278        }
279        else if (read_offset > DF_OFF(new_frame))
280            return INS_FRAME_OVERLAP                            | CASE('N');
281        goto list_was_empty;
282    case 3:     /* Both left and right neighbors */
283    case 2:     /* Only left neighbor (prev_frame) */
284        if (DF_OFF(prev_frame) == DF_OFF(new_frame)
285            && DF_SIZE(prev_frame) == DF_SIZE(new_frame))
286        {
287            if (!DF_FIN(prev_frame) && DF_FIN(new_frame))
288                return INS_FRAME_OVERLAP                        | CASE('H');
289            else
290                return INS_FRAME_DUP                            | CASE('I');
291        }
292        if (DF_END(prev_frame) > DF_OFF(new_frame))
293            return INS_FRAME_OVERLAP                            | CASE('J');
294        if (select == 2)
295            goto have_prev;
296        /* Fall-through */
297    case 1:     /* Only right neighbor (next_frame) */
298        if (DF_END(new_frame) > DF_OFF(next_frame))
299            return INS_FRAME_OVERLAP                            | CASE('K');
300        else if (read_offset > DF_OFF(new_frame))
301            return INS_FRAME_OVERLAP                            | CASE('O');
302        break;
303    }
304
305    if (prev_frame)
306    {
307  have_prev:
308        TAILQ_INSERT_AFTER(&ncdi->ncdi_frames_in, prev_frame, new_frame, next_frame);
309        ncdi->ncdi_n_holes += DF_END(prev_frame) != DF_OFF(new_frame);
310        if (next_frame)
311        {
312            ncdi->ncdi_n_holes += DF_END(new_frame) != DF_OFF(next_frame);
313            --ncdi->ncdi_n_holes;
314        }
315    }
316    else
317    {
318        ncdi->ncdi_n_holes += next_frame
319                           && DF_END(new_frame) != DF_OFF(next_frame);
320  list_was_empty:
321        TAILQ_INSERT_HEAD(&ncdi->ncdi_frames_in, new_frame, next_frame);
322    }
323    CHECK_ORDER(ncdi);
324
325    if (DF_FIN(new_frame))
326    {
327        ncdi->ncdi_flags |= NCDI_FIN_SET;
328        ncdi->ncdi_fin_off = DF_END(new_frame);
329        LSQ_DEBUG("FIN set at %"PRIu64, DF_END(new_frame));
330    }
331
332    ++ncdi->ncdi_n_frames;
333    ncdi->ncdi_byteage += DF_SIZE(new_frame);
334    *p_n_frames = count;
335
336    return INS_FRAME_OK                                         | CASE('Z');
337}
338
339
340static int
341check_efficiency (struct nocopy_data_in *ncdi, unsigned count)
342{
343    if (ncdi->ncdi_n_frames <= EFF_CHECK_THRESH_LOW)
344    {
345        ncdi->ncdi_cons_far = 0;
346        return 0;
347    }
348    if (ncdi->ncdi_n_frames > EFF_CHECK_THRESH_HIGH)
349        return 1;
350    if (count >= ncdi->ncdi_n_frames / 2)
351    {
352        ++ncdi->ncdi_cons_far;
353        if (ncdi->ncdi_cons_far > EFF_FAR_TRAVERSE_COUNT)
354            return 1;
355    }
356    else
357        ncdi->ncdi_cons_far = 0;
358    if (ncdi->ncdi_n_holes > EFF_MAX_HOLES)
359        return 1;
360    if (ncdi->ncdi_byteage / EFF_TINY_FRAME_SZ < ncdi->ncdi_n_frames)
361        return 1;
362    return 0;
363}
364
365
366static void
367set_eff_alert (struct nocopy_data_in *ncdi)
368{
369    LSQ_DEBUG("low efficiency: n_frames: %u; n_holes: %u; cons_far: %u; "
370              "byteage: %"PRIu64, ncdi->ncdi_n_frames, ncdi->ncdi_n_holes,
371              ncdi->ncdi_cons_far, ncdi->ncdi_byteage);
372    ncdi->ncdi_data_in.di_flags |= DI_SWITCH_IMPL;
373}
374
375
376static enum ins_frame
377nocopy_di_insert_frame (struct data_in *data_in,
378                        struct stream_frame *new_frame, uint64_t read_offset)
379{
380    struct nocopy_data_in *const ncdi = NCDI_PTR(data_in);
381    unsigned count;
382    enum ins_frame ins;
383    int ins_case;
384
385    assert(0 == (new_frame->data_frame.df_fin & ~1));
386    ins_case = insert_frame(ncdi, new_frame, read_offset, &count);
387    ins = ins_case & 0xFF;
388    ins_case >>= 8;
389    LSQ_DEBUG("%s: ins: %d (case '%c')", __func__, ins, (char) ins_case);
390    switch (ins)
391    {
392    case INS_FRAME_OK:
393        if (check_efficiency(ncdi, count))
394            set_eff_alert(ncdi);
395        break;
396    case INS_FRAME_DUP:
397    case INS_FRAME_ERR:
398        break;
399    default:
400        break;
401    }
402
403    return ins;
404}
405
406
407static struct data_frame *
408nocopy_di_get_frame (struct data_in *data_in, uint64_t read_offset)
409{
410    struct nocopy_data_in *const ncdi = NCDI_PTR(data_in);
411    struct stream_frame *frame = TAILQ_FIRST(&ncdi->ncdi_frames_in);
412    if (frame && frame->data_frame.df_offset +
413                                frame->data_frame.df_read_off == read_offset)
414    {
415        LSQ_DEBUG("get_frame: frame (off: %"PRIu64", size: %u, fin: %d), at "
416            "read offset %"PRIu64, DF_OFF(frame), DF_SIZE(frame), DF_FIN(frame),
417            read_offset);
418        return &frame->data_frame;
419    }
420    else
421    {
422        LSQ_DEBUG("get_frame: no frame at read offset %"PRIu64, read_offset);
423        return NULL;
424    }
425}
426
427
428static void
429nocopy_di_frame_done (struct data_in *data_in, struct data_frame *data_frame)
430{
431    struct nocopy_data_in *const ncdi = NCDI_PTR(data_in);
432    struct stream_frame *const frame = STREAM_FRAME_PTR(data_frame), *first;
433    assert(data_frame->df_read_off == data_frame->df_size);
434    TAILQ_REMOVE(&ncdi->ncdi_frames_in, frame, next_frame);
435    first = TAILQ_FIRST(&ncdi->ncdi_frames_in);
436    ncdi->ncdi_n_holes -= first && frame->data_frame.df_offset +
437                    frame->data_frame.df_size != first->data_frame.df_offset;
438    --ncdi->ncdi_n_frames;
439    ncdi->ncdi_byteage -= frame->data_frame.df_size;
440    if (DF_FIN(frame))
441    {
442        ncdi->ncdi_flags |= NCDI_FIN_REACHED;
443        LSQ_DEBUG("FIN has been reached at offset %"PRIu64, DF_END(frame));
444    }
445    LSQ_DEBUG("frame (off: %"PRIu64", size: %u, fin: %d) done",
446                                DF_OFF(frame), DF_SIZE(frame), DF_FIN(frame));
447    lsquic_packet_in_put(ncdi->ncdi_conn_pub->mm, frame->packet_in);
448    lsquic_malo_put(frame);
449}
450
451
452static int
453nocopy_di_empty (struct data_in *data_in)
454{
455    struct nocopy_data_in *const ncdi = NCDI_PTR(data_in);
456    return TAILQ_EMPTY(&ncdi->ncdi_frames_in);
457}
458
459
460static struct data_in *
461nocopy_di_switch_impl (struct data_in *data_in, uint64_t read_offset)
462{
463    struct nocopy_data_in *const ncdi = NCDI_PTR(data_in);
464    struct data_in *new_data_in;
465    stream_frame_t *frame;
466    enum ins_frame ins;
467
468    new_data_in = lsquic_data_in_hash_new(ncdi->ncdi_conn_pub,
469                                ncdi->ncdi_stream_id, ncdi->ncdi_byteage);
470    if (!new_data_in)
471        goto end;
472
473    while ((frame = TAILQ_FIRST(&ncdi->ncdi_frames_in)))
474    {
475        TAILQ_REMOVE(&ncdi->ncdi_frames_in, frame, next_frame);
476        ins = lsquic_data_in_hash_insert_data_frame(new_data_in,
477                                            &frame->data_frame, read_offset);
478        lsquic_packet_in_put(ncdi->ncdi_conn_pub->mm, frame->packet_in);
479        lsquic_malo_put(frame);
480        if (INS_FRAME_ERR == ins)
481        {
482            new_data_in->di_if->di_destroy(new_data_in);
483            new_data_in = NULL;
484            goto end;
485        }
486    }
487
488  end:
489    data_in->di_if->di_destroy(data_in);
490    return new_data_in;
491}
492
493
494/* This function overestimates amount of memory because some packets are
495 * referenced by more than one stream.  In the usual case, however, I
496 * expect the error not to be large.
497 */
498static size_t
499nocopy_di_mem_used (struct data_in *data_in)
500{
501    struct nocopy_data_in *const ncdi = NCDI_PTR(data_in);
502    const stream_frame_t *frame;
503    size_t size;
504
505    size = sizeof(*data_in);
506    TAILQ_FOREACH(frame, &ncdi->ncdi_frames_in, next_frame)
507        size += lsquic_packet_in_mem_used(frame->packet_in);
508
509    return size;
510}
511
512
513static void
514nocopy_di_dump_state (struct data_in *data_in)
515{
516    struct nocopy_data_in *const ncdi = NCDI_PTR(data_in);
517    const struct stream_frame *frame;
518
519    LSQ_DEBUG("nocopy state: frames: %u; holes: %u; cons_far: %u",
520        ncdi->ncdi_n_frames, ncdi->ncdi_n_holes, ncdi->ncdi_cons_far);
521    TAILQ_FOREACH(frame, &ncdi->ncdi_frames_in, next_frame)
522        LSQ_DEBUG("frame: off: %"PRIu64"; read_off: %"PRIu16"; size: %"PRIu16
523            "; fin: %d", DF_OFF(frame), frame->data_frame.df_read_off,
524            DF_SIZE(frame), DF_FIN(frame));
525}
526
527
528static uint64_t
529nocopy_di_readable_bytes (struct data_in *data_in, uint64_t read_offset)
530{
531    const struct nocopy_data_in *const ncdi = NCDI_PTR(data_in);
532    const struct stream_frame *frame;
533    uint64_t starting_offset;
534
535    starting_offset = read_offset;
536    TAILQ_FOREACH(frame, &ncdi->ncdi_frames_in, next_frame)
537        if (DF_ROFF(frame) == read_offset)
538            read_offset += DF_END(frame) - DF_ROFF(frame);
539        else if (read_offset > starting_offset)
540            break;
541
542    return read_offset - starting_offset;
543}
544
545
546static const struct data_in_iface di_if_nocopy = {
547    .di_destroy      = nocopy_di_destroy,
548    .di_dump_state   = nocopy_di_dump_state,
549    .di_empty        = nocopy_di_empty,
550    .di_frame_done   = nocopy_di_frame_done,
551    .di_get_frame    = nocopy_di_get_frame,
552    .di_insert_frame = nocopy_di_insert_frame,
553    .di_mem_used     = nocopy_di_mem_used,
554    .di_own_on_ok    = 1,
555    .di_readable_bytes
556                     = nocopy_di_readable_bytes,
557    .di_switch_impl  = nocopy_di_switch_impl,
558};
559
560static const struct data_in_iface *di_if_nocopy_ptr = &di_if_nocopy;
561