lsquic_stream.h revision 5392f7a3
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
2#ifndef LSQUIC_STREAM_H
3#define LSQUIC_STREAM_H
4
5#define LSQUIC_GQUIC_STREAM_HANDSHAKE 1
6#define LSQUIC_GQUIC_STREAM_HEADERS   3
7
8#define LSQUIC_STREAM_DEFAULT_PRIO 16   /* RFC 7540, Section 5.3.5 */
9
10
11struct lsquic_stream_if;
12struct lsquic_stream_ctx;
13struct lsquic_conn_public;
14struct stream_frame;
15struct uncompressed_headers;
16enum enc_level;
17enum swtp_status;
18struct frame_gen_ctx;
19struct data_frame;
20enum quic_frame_type;
21struct push_promise;
22
23TAILQ_HEAD(lsquic_streams_tailq, lsquic_stream);
24
25
26#ifndef LSQUIC_KEEP_STREAM_HISTORY
27#   ifdef NDEBUG
28#       define LSQUIC_KEEP_STREAM_HISTORY 0
29#   else
30#       define LSQUIC_KEEP_STREAM_HISTORY 1
31#   endif
32#endif
33
34
35#if LSQUIC_KEEP_STREAM_HISTORY
36#define SM_HIST_BITS 6
37#define SM_HIST_IDX_MASK ((1 << SM_HIST_BITS) - 1)
38typedef unsigned char sm_hist_idx_t;
39#endif
40
41
42/*
43 *  +----------+----------------------------------+
44 *  | Low Bits | Stream Type                      |
45 *  +----------+----------------------------------+
46 *  | 0x0      | Client-Initiated, Bidirectional  |
47 *  |          |                                  |
48 *  | 0x1      | Server-Initiated, Bidirectional  |
49 *  |          |                                  |
50 *  | 0x2      | Client-Initiated, Unidirectional |
51 *  |          |                                  |
52 *  | 0x3      | Server-Initiated, Unidirectional |
53 *  +----------+----------------------------------+
54 */
55
56enum stream_id_type
57{
58    SIT_BIDI_CLIENT,
59    SIT_BIDI_SERVER,
60    SIT_UNI_CLIENT,
61    SIT_UNI_SERVER,
62    N_SITS
63};
64
65#define SIT_MASK (N_SITS - 1)
66
67#define SIT_SHIFT 2
68#define SD_SHIFT 1
69
70enum stream_dir { SD_BIDI, SD_UNI, N_SDS };
71
72
73struct stream_hq_frame
74{
75    STAILQ_ENTRY(stream_hq_frame)
76                        shf_next;
77    /* At which point in the stream (sm_payload) to insert the HQ frame. */
78    uint64_t            shf_off;
79    union {
80        /* Points to the frame if SHF_FIXED_SIZE is not set */
81        unsigned char  *frame_ptr;
82        /* If SHF_FIXED_SIZE is set, the size of the frame to follow.
83         * Non-fixed frame size gets calculated using sm_payload when they
84         * are closed.
85         */
86        size_t          frame_size;
87    }                   shf_u;
88#define shf_frame_ptr shf_u.frame_ptr
89#define shf_frame_size shf_u.frame_size
90    enum hq_frame_type  shf_frame_type:8;
91    enum shf_flags {
92        SHF_TWO_BYTES   = 1 << 0,   /* Use two byte to encode frame length */
93        SHF_FIXED_SIZE  = 1 << 1,   /* Payload size guaranteed */
94        SHF_ACTIVE      = 1 << 2,   /* On sm_hq_frames list */
95        SHF_WRITTEN     = 1 << 3,   /* Framing bytes have been packetized */
96        SHF_CC_PAID     = 1 << 4,   /* Paid connection cap */
97        SHF_PHANTOM     = 1 << 5,   /* Phantom frame headers are not written */
98    }                   shf_flags:8;
99};
100
101
102struct hq_filter
103{
104    struct varint_read2_state   hqfi_vint_state;
105    /* No need to copy the values: use it directly */
106#define hqfi_left hqfi_vint_state.vr2s_two
107#define hqfi_type hqfi_vint_state.vr2s_one
108    enum {
109        HQFI_FLAG_GOT_HEADERS   = 1 << 0,
110        HQFI_FLAG_ERROR         = 1 << 1,
111        HQFI_FLAG_BEGIN         = 1 << 2,
112        HQFI_FLAG_BLOCKED       = 1 << 3,
113    }                           hqfi_flags:8;
114    enum {
115        HQFI_STATE_FRAME_HEADER_BEGIN,
116        HQFI_STATE_FRAME_HEADER_CONTINUE,
117        HQFI_STATE_READING_PAYLOAD,
118    }                           hqfi_state:8;
119    unsigned char               hqfi_hist_idx;
120#define MAX_HQFI_ENTRIES (sizeof(unsigned) * 8 / 3)
121    unsigned                    hqfi_hist_buf;
122};
123
124
125struct stream_filter_if
126{
127    int         (*sfi_readable)(struct lsquic_stream *);
128    size_t      (*sfi_filter_df)(struct lsquic_stream *, struct data_frame *);
129    void        (*sfi_decr_left)(struct lsquic_stream *, size_t);
130};
131
132
133/* These flags indicate which queues -- or other entities -- currently
134 * reference the stream.
135 */
136enum stream_q_flags
137{
138    /* read_streams: */
139    SMQF_WANT_READ    = 1 << 0,
140
141    /* write_streams: */
142#define SMQF_WRITE_Q_FLAGS (SMQF_WANT_FLUSH|SMQF_WANT_WRITE)
143    SMQF_WANT_WRITE   = 1 << 1,
144    SMQF_WANT_FLUSH   = 1 << 2,     /* Flush until sm_flush_to is hit */
145
146    /* There are more than one reason that a stream may be put onto
147     * connections's sending_streams queue.  Note that writing STREAM
148     * frames is done separately.
149     */
150#define SMQF_SENDING_FLAGS (SMQF_SEND_WUF|SMQF_SEND_RST|SMQF_SEND_BLOCKED)
151    /* sending_streams: */
152    SMQF_SEND_WUF     = 1 << 3,     /* WUF: Window Update Frame */
153    SMQF_SEND_BLOCKED = 1 << 4,
154    SMQF_SEND_RST     = 1 << 5,     /* Error: want to send RST_STREAM */
155
156    /* The equivalent of WINDOW_UPDATE frame for streams in IETF QUIC is
157     * the MAX_STREAM_DATA frame.  Define an alias for use in the IETF
158     * QUIC code:
159     */
160#define SMQF_SEND_MAX_STREAM_DATA SMQF_SEND_WUF
161
162#define SMQF_SERVICE_FLAGS (SMQF_CALL_ONCLOSE|SMQF_FREE_STREAM|SMQF_ABORT_CONN)
163    SMQF_CALL_ONCLOSE = 1 << 6,
164    SMQF_FREE_STREAM  = 1 << 7,
165    SMQF_ABORT_CONN   = 1 << 8,     /* Unrecoverable error occurred */
166
167    SMQF_QPACK_DEC    = 1 << 9,     /* QPACK decoder is holding a reference to this stream */
168
169    SMQF_H3_PRIO      = 1 <<10,     /* Referenced from HTTP/3 priority tree */
170};
171
172
173/* Stream behavior flags */
174enum stream_b_flags
175{
176    SMBF_SERVER       = 1 << 0,
177    SMBF_IETF         = 1 << 1,
178    SMBF_USE_HEADERS  = 1 << 2,
179    SMBF_CRYPTO       = 1 << 3,
180    SMBF_CRITICAL     = 1 << 4,  /* This is a critical stream */
181    SMBF_AUTOSWITCH   = 1 << 5,
182    SMBF_RW_ONCE      = 1 << 6,  /* When set, read/write events are dispatched once per call */
183    SMBF_CONN_LIMITED = 1 << 7,
184#define N_SMBF_FLAGS 8
185};
186
187
188enum stream_flags {
189    STREAM_FIN_RECVD    = 1 << 0,   /* Received STREAM frame with FIN bit set */
190    STREAM_RST_RECVD    = 1 << 1,   /* Received RST frame */
191    STREAM_LAST_WRITE_OK= 1 << 2,   /* Used to break out of write event dispatch loop */
192    STREAM_U_READ_DONE  = 1 << 3,   /* User is done reading (shutdown was called) */
193    STREAM_U_WRITE_DONE = 1 << 4,   /* User is done writing (shutdown was called) */
194    STREAM_FIN_SENT     = 1 << 5,   /* FIN was written to network */
195    STREAM_RST_SENT     = 1 << 6,   /* RST_STREAM was written to network */
196    STREAM_FIN_REACHED  = 1 << 7,   /* User read data up to FIN */
197    STREAM_FINISHED     = 1 << 8,   /* Stream is finished */
198    STREAM_ONCLOSE_DONE = 1 << 9,   /* on_close has been called */
199    STREAM_UNUSED10     = 1 << 10,  /* Unused */
200    STREAM_HEADERS_SENT = 1 << 11,
201    STREAM_HAVE_UH      = 1 << 12,  /* Have uncompressed headers */
202    STREAM_UNUSED13     = 1 << 13,  /* Unused */
203    STREAM_HEAD_IN_FIN  = 1 << 14,  /* Incoming headers has FIN bit set */
204    STREAM_FRAMES_ELIDED= 1 << 15,
205    STREAM_FORCE_FINISH = 1 << 16,  /* Replaces FIN sent and received */
206    STREAM_ONNEW_DONE   = 1 << 17,  /* on_new_stream has been called */
207    STREAM_PUSHING      = 1 << 18,
208    STREAM_NOPUSH       = 1 << 19,  /* Disallow further push promises */
209    STREAM_UNUSED20     = 1 << 20,  /* Unused */
210    STREAM_UNUSED21     = 1 << 21,  /* Unused */
211    STREAM_RST_ACKED    = 1 << 22,  /* Packet containing RST has been acked */
212    STREAM_BLOCKED_SENT = 1 << 23,  /* Stays set once a STREAM_BLOCKED frame is sent */
213    STREAM_RST_READ     = 1 << 24,  /* User code collected the error */
214    STREAM_DATA_RECVD   = 1 << 25,  /* Cache stream state calculation */
215    STREAM_UNUSED26     = 1 << 26,  /* Unused */
216    STREAM_HDRS_FLUSHED = 1 << 27,  /* Only used in buffered packets mode */
217    STREAM_SS_RECVD     = 1 << 28,  /* Received STOP_SENDING frame */
218    STREAM_DELAYED_SW   = 1 << 29,  /* Delayed shutdown_write call */
219};
220
221
222/* By keeping this number low, we make sure that the code to allocate HQ
223 * frames dynamically gets exercised whenever push promises are sent.
224 */
225#define NUM_ALLOCED_HQ_FRAMES 2
226
227
228struct lsquic_stream
229{
230    struct lsquic_hash_elem         sm_hash_el;
231    lsquic_stream_id_t              id;
232    enum stream_flags               stream_flags;
233    enum stream_b_flags             sm_bflags;
234    enum stream_q_flags             sm_qflags;
235    unsigned                        n_unacked;
236    unsigned                        sm_h3_prio_idx;
237
238    const struct lsquic_stream_if  *stream_if;
239    struct lsquic_stream_ctx       *st_ctx;
240    struct lsquic_conn_public      *conn_pub;
241    TAILQ_ENTRY(lsquic_stream)      next_send_stream, next_read_stream,
242                                        next_write_stream, next_service_stream,
243                                        next_prio_stream;
244
245    uint64_t                        tosend_off;
246    uint64_t                        sm_payload;     /* Not counting HQ frames */
247    uint64_t                        max_send_off;
248    uint64_t                        sm_last_recv_off;
249    uint64_t                        error_code;
250
251    /* From the network, we get frames, which we keep on a list ordered
252     * by offset.
253     */
254    struct data_in                 *data_in;
255    uint64_t                        read_offset;
256    lsquic_sfcw_t                   fc;
257
258    /* List of active HQ frames */
259    STAILQ_HEAD(, stream_hq_frame)  sm_hq_frames;
260
261    /* For efficiency, several frames are allocated as part of the stream
262     * itself.  If more frames are needed, they are allocated.
263     */
264    struct stream_hq_frame          sm_hq_frame_arr[NUM_ALLOCED_HQ_FRAMES];
265
266    struct hq_filter                sm_hq_filter;
267
268    /* We can safely use sm_hq_filter */
269#define sm_uni_type_state sm_hq_filter.hqfi_vint_state.vr2s_varint_state
270
271    /** If @ref SMQF_WANT_FLUSH is set, flush until this offset. */
272    uint64_t                        sm_flush_to;
273
274    /**
275     * If @ref SMQF_WANT_FLUSH is set, this indicates payload offset
276     * to flush to.  Used to adjust @ref sm_flush_to when H3 frame
277     * size grows.
278     */
279    uint64_t                        sm_flush_to_payload;
280
281    /* Last offset sent in BLOCKED frame */
282    uint64_t                        blocked_off;
283
284    struct uncompressed_headers    *uh,
285                                   *push_req;
286
287    unsigned char                  *sm_buf;
288    void                           *sm_onnew_arg;
289
290    unsigned char                  *sm_header_block;
291    uint64_t                        sm_hb_compl;
292
293    /* Valid if STREAM_FIN_RECVD is set: */
294    uint64_t                        sm_fin_off;
295
296    /* A stream may be generating STREAM or CRYPTO frames */
297    size_t                        (*sm_frame_header_sz)(
298                                        const struct lsquic_stream *, unsigned);
299    enum swtp_status              (*sm_write_to_packet)(struct frame_gen_ctx *,
300                                                const size_t);
301    size_t                        (*sm_write_avail)(struct lsquic_stream *);
302    int                           (*sm_readable)(struct lsquic_stream *);
303
304    /* This element is optional */
305    const struct stream_filter_if  *sm_sfi;
306
307    /* sm_promise and sm_promises are never used at the same time and can
308     * be combined into a union should space in this struct become tight.
309     */
310    /* Push promise that engendered this push stream */
311    struct push_promise            *sm_promise;
312
313    /* Push promises sent on this stream */
314    SLIST_HEAD(, push_promise)      sm_promises;
315
316    /* How much data there is in sm_header_block and how much of it has been
317     * sent:
318     */
319    unsigned                        sm_hblock_sz,
320                                    sm_hblock_off;
321
322    unsigned short                  sm_n_buffered;  /* Amount of data in sm_buf */
323    unsigned short                  sm_n_allocated;  /* Size of sm_buf */
324
325    unsigned char                   sm_priority;  /* 0: high; 255: low */
326    unsigned char                   sm_enc_level;
327    enum {
328        SSHS_BEGIN,         /* Nothing has happened yet */
329        SSHS_ENC_SENDING,   /* Sending encoder stream data */
330        SSHS_HBLOCK_SENDING,/* Sending header block data */
331    }                               sm_send_headers_state:8;
332    signed char                     sm_saved_want_write;
333
334    unsigned char                   sm_dup_push_off;
335    unsigned char                   sm_dup_push_len;
336    unsigned char                   sm_dup_push_buf[8];
337
338#if LSQUIC_KEEP_STREAM_HISTORY
339    sm_hist_idx_t                   sm_hist_idx;
340#endif
341
342#if LSQUIC_KEEP_STREAM_HISTORY
343    /* Stream history: see enum stream_history_event */
344    unsigned char                   sm_hist_buf[ 1 << SM_HIST_BITS ];
345#endif
346};
347
348
349enum stream_ctor_flags
350{
351    SCF_CALL_ON_NEW   = (1 << (N_SMBF_FLAGS + 0)), /* Call on_new_stream() immediately */
352    SCF_USE_DI_HASH   = (1 << (N_SMBF_FLAGS + 1)), /* Use hash-based data input.  If not set,
353                                   * the nocopy data input is used.
354                                   */
355    SCF_DI_AUTOSWITCH = SMBF_AUTOSWITCH, /* Automatically switch between nocopy
356                                   * and hash-based to data input for optimal
357                                   * performance.
358                                   */
359    SCF_DISP_RW_ONCE  = SMBF_RW_ONCE,
360    SCF_CRITICAL      = SMBF_CRITICAL, /* This is a critical stream */
361    SCF_IETF          = SMBF_IETF,
362    SCF_HTTP          = SMBF_USE_HEADERS,
363};
364
365
366lsquic_stream_t *
367lsquic_stream_new (lsquic_stream_id_t id, struct lsquic_conn_public *,
368                   const struct lsquic_stream_if *, void *stream_if_ctx,
369                   unsigned initial_sfrw, uint64_t initial_send_off,
370                   enum stream_ctor_flags);
371
372struct lsquic_stream *
373lsquic_stream_new_crypto (enum enc_level,
374        struct lsquic_conn_public *conn_pub,
375        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
376        enum stream_ctor_flags ctor_flags);
377
378void
379lsquic_stream_call_on_new (lsquic_stream_t *);
380
381void
382lsquic_stream_destroy (lsquic_stream_t *);
383
384/* Any of these flags will cause user-facing read and write and
385 * shutdown calls to return an error.  They also make the stream
386 * both readable and writeable, as we want the user to collect
387 * the error.
388 */
389#define lsquic_stream_is_reset(stream) \
390    (((stream)->stream_flags & \
391                    (STREAM_RST_RECVD|STREAM_RST_SENT|STREAM_SS_RECVD)) \
392        || ((stream)->sm_qflags & SMQF_SEND_RST))
393
394/* Data that from the network gets inserted into the stream using
395 * lsquic_stream_frame_in() function.  Returns 0 on success, -1 on
396 * failure.  The latter may be caused by flow control violation or
397 * invalid stream frame data, e.g. overlapping segments.
398 *
399 * Note that the caller does gives up control of `frame' no matter
400 * what this function returns.
401 *
402 * This data is read by the user using lsquic_stream_read() function.
403 */
404int
405lsquic_stream_frame_in (lsquic_stream_t *, struct stream_frame *frame);
406
407/* Only one (at least for now) uncompressed header structure is allowed to be
408 * passed in, and only in HTTP mode.
409 */
410int
411lsquic_stream_uh_in (lsquic_stream_t *, struct uncompressed_headers *);
412
413void
414lsquic_stream_push_req (lsquic_stream_t *,
415                        struct uncompressed_headers *push_req);
416
417int
418lsquic_stream_rst_in (lsquic_stream_t *, uint64_t offset, uint64_t error_code);
419
420void
421lsquic_stream_stop_sending_in (struct lsquic_stream *, uint64_t error_code);
422
423ssize_t
424lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len);
425
426uint64_t
427lsquic_stream_read_offset (const lsquic_stream_t *stream);
428
429/* Return true if we sent all available data to the network and write
430 * end of the stream was closed.
431 */
432int
433lsquic_stream_tosend_fin (const lsquic_stream_t *stream);
434
435/* Data to be sent out to the network is written using lsquic_stream_write().
436 */
437ssize_t
438lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len);
439
440void
441lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset);
442
443int
444lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset);
445
446/* The caller should only call this function if SMQF_SEND_WUF is set and
447 * it must generate a window update frame using this value.
448 */
449uint64_t
450lsquic_stream_fc_recv_off (lsquic_stream_t *stream);
451
452void
453lsquic_stream_peer_blocked (struct lsquic_stream *, uint64_t);
454
455void
456lsquic_stream_peer_blocked_gquic (struct lsquic_stream *);
457
458void
459lsquic_stream_dispatch_read_events (lsquic_stream_t *);
460
461void
462lsquic_stream_dispatch_write_events (lsquic_stream_t *);
463
464void
465lsquic_stream_blocked_frame_sent (lsquic_stream_t *);
466
467void
468lsquic_stream_rst_frame_sent (lsquic_stream_t *);
469
470void
471lsquic_stream_stream_frame_sent (lsquic_stream_t *);
472
473void
474lsquic_stream_reset (lsquic_stream_t *, uint64_t error_code);
475
476void
477lsquic_stream_reset_ext (lsquic_stream_t *, uint64_t error_code, int close);
478
479void
480lsquic_stream_call_on_close (lsquic_stream_t *);
481
482void
483lsquic_stream_shutdown_internal (lsquic_stream_t *);
484
485void
486lsquic_stream_received_goaway (lsquic_stream_t *);
487
488void
489lsquic_stream_acked (struct lsquic_stream *, enum quic_frame_type);
490
491#define lsquic_stream_is_closed(s)                                          \
492    (((s)->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE))         \
493                            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE))
494int
495lsquic_stream_update_sfcw (lsquic_stream_t *, uint64_t max_off);
496
497int
498lsquic_stream_set_priority_internal (lsquic_stream_t *, unsigned priority);
499
500int
501lsquic_stream_id_is_critical (int use_http, lsquic_stream_id_t);
502
503int
504lsquic_stream_is_critical (const struct lsquic_stream *);
505
506size_t
507lsquic_stream_mem_used (const struct lsquic_stream *);
508
509const lsquic_cid_t *
510lsquic_stream_cid (const struct lsquic_stream *);
511
512#define lsquic_stream_has_data_to_flush(stream) ((stream)->sm_n_buffered > 0)
513
514int
515lsquic_stream_readable (struct lsquic_stream *);
516
517size_t
518lsquic_stream_write_avail (struct lsquic_stream *);
519
520void
521lsquic_stream_dump_state (const struct lsquic_stream *);
522
523#ifndef NDEBUG
524size_t
525lsquic_stream_flush_threshold (const struct lsquic_stream *, unsigned);
526#endif
527
528#define crypto_level(stream) (~0ULL - (stream)->id)
529
530void
531lsquic_stream_set_stream_if (struct lsquic_stream *,
532                   const struct lsquic_stream_if *, void *stream_if_ctx);
533
534struct qpack_dec_hdl *
535lsquic_stream_get_qdh (const struct lsquic_stream *);
536
537uint64_t
538lsquic_stream_combined_send_off (const struct lsquic_stream *);
539
540/* [draft-ietf-quic-transport-16] Section 3.1 */
541enum stream_state_sending
542{
543    SSS_READY,
544    SSS_SEND,
545    SSS_DATA_SENT,
546    SSS_RESET_SENT,
547    SSS_DATA_RECVD,
548    SSS_RESET_RECVD,
549};
550
551extern const char *const lsquic_sss2str[];
552
553enum stream_state_sending
554lsquic_stream_sending_state (const struct lsquic_stream *);
555
556/* [draft-ietf-quic-transport-16] Section 3.2 */
557enum stream_state_receiving
558{
559    SSR_RECV,
560    SSR_SIZE_KNOWN,
561    SSR_DATA_RECVD,
562    SSR_RESET_RECVD,
563    SSR_DATA_READ,
564    SSR_RESET_READ,
565};
566
567extern const char *const lsquic_ssr2str[];
568
569enum stream_state_receiving
570lsquic_stream_receiving_state (struct lsquic_stream *);
571
572uint64_t
573lsquic_stream_fc_recv_off_const (const struct lsquic_stream *);
574
575void
576lsquic_stream_max_stream_data_sent (struct lsquic_stream *);
577
578void
579lsquic_stream_qdec_unblocked (struct lsquic_stream *);
580
581int
582lsquic_stream_can_push (const struct lsquic_stream *);
583
584int
585lsquic_stream_duplicate_push (struct lsquic_stream *, uint64_t push_id);
586
587int
588lsquic_stream_push_promise (struct lsquic_stream *, struct push_promise *);
589
590#endif
591