lsquic_stream.h revision 7a8b2ece
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
2#ifndef LSQUIC_STREAM_H
3#define LSQUIC_STREAM_H
4
5#define LSQUIC_STREAM_DEFAULT_PRIO 16   /* RFC 7540, Section 5.3.5 */
6
7
8struct lsquic_stream_if;
9struct lsquic_stream_ctx;
10struct lsquic_conn_public;
11struct stream_frame;
12struct uncompressed_headers;
13enum enc_level;
14enum swtp_status;
15struct frame_gen_ctx;
16struct data_frame;
17enum quic_frame_type;
18struct push_promise;
19
20TAILQ_HEAD(lsquic_streams_tailq, lsquic_stream);
21
22
23#ifndef LSQUIC_KEEP_STREAM_HISTORY
24#   ifdef NDEBUG
25#       define LSQUIC_KEEP_STREAM_HISTORY 0
26#   else
27#       define LSQUIC_KEEP_STREAM_HISTORY 1
28#   endif
29#endif
30
31
32#if LSQUIC_KEEP_STREAM_HISTORY
33#define SM_HIST_BITS 6
34#define SM_HIST_IDX_MASK ((1 << SM_HIST_BITS) - 1)
35typedef unsigned char sm_hist_idx_t;
36#endif
37
38
39/*
40 *  +----------+----------------------------------+
41 *  | Low Bits | Stream Type                      |
42 *  +----------+----------------------------------+
43 *  | 0x0      | Client-Initiated, Bidirectional  |
44 *  |          |                                  |
45 *  | 0x1      | Server-Initiated, Bidirectional  |
46 *  |          |                                  |
47 *  | 0x2      | Client-Initiated, Unidirectional |
48 *  |          |                                  |
49 *  | 0x3      | Server-Initiated, Unidirectional |
50 *  +----------+----------------------------------+
51 */
52
53enum stream_id_type
54{
55    SIT_BIDI_CLIENT,
56    SIT_BIDI_SERVER,
57    SIT_UNI_CLIENT,
58    SIT_UNI_SERVER,
59    N_SITS
60};
61
62#define SIT_MASK (N_SITS - 1)
63
64#define SIT_SHIFT 2
65#define SD_SHIFT 1
66
67enum stream_dir { SD_BIDI, SD_UNI, N_SDS };
68
69
70struct stream_hq_frame
71{
72    STAILQ_ENTRY(stream_hq_frame)
73                        shf_next;
74    /* At which point in the stream (sm_payload) to insert the HQ frame. */
75    uint64_t            shf_off;
76    union {
77        /* Points to the frame if SHF_FIXED_SIZE is not set */
78        unsigned char  *frame_ptr;
79        /* If SHF_FIXED_SIZE is set, the size of the frame to follow.
80         * Non-fixed frame size gets calculated using sm_payload when they
81         * are closed.
82         */
83        size_t          frame_size;
84    }                   shf_u;
85#define shf_frame_ptr shf_u.frame_ptr
86#define shf_frame_size shf_u.frame_size
87    enum hq_frame_type  shf_frame_type:8;
88    enum shf_flags {
89        SHF_TWO_BYTES   = 1 << 0,   /* Use two byte to encode frame length */
90        SHF_FIXED_SIZE  = 1 << 1,   /* Payload size guaranteed */
91        SHF_ACTIVE      = 1 << 2,   /* On sm_hq_frames list */
92        SHF_WRITTEN     = 1 << 3,   /* Framing bytes have been packetized */
93        SHF_CC_PAID     = 1 << 4,   /* Paid connection cap */
94        SHF_PHANTOM     = 1 << 5,   /* Phantom frame headers are not written */
95    }                   shf_flags:8;
96};
97
98
99struct hq_filter
100{
101    struct varint_read2_state   hqfi_vint2_state;
102    /* No need to copy the values: use it directly */
103#define hqfi_left hqfi_vint2_state.vr2s_two
104#define hqfi_type hqfi_vint2_state.vr2s_one
105    struct varint_read_state    hqfi_vint1_state;
106#define hqfi_push_id hqfi_vint1_state.value
107    enum {
108        HQFI_FLAG_UNUSED_0      = 1 << 0,
109        HQFI_FLAG_ERROR         = 1 << 1,
110        HQFI_FLAG_BEGIN         = 1 << 2,
111        HQFI_FLAG_BLOCKED       = 1 << 3,
112    }                           hqfi_flags:8;
113    enum {
114        HQFI_STATE_FRAME_HEADER_BEGIN,
115        HQFI_STATE_FRAME_HEADER_CONTINUE,
116        HQFI_STATE_READING_PAYLOAD,
117        HQFI_STATE_PUSH_ID_BEGIN,
118        HQFI_STATE_PUSH_ID_CONTINUE,
119    }                           hqfi_state:8;
120    unsigned char               hqfi_hist_idx;
121#define MAX_HQFI_ENTRIES (sizeof(unsigned) * 8 / 3)
122    unsigned                    hqfi_hist_buf;
123};
124
125
126struct stream_filter_if
127{
128    int         (*sfi_readable)(struct lsquic_stream *);
129    size_t      (*sfi_filter_df)(struct lsquic_stream *, struct data_frame *);
130    void        (*sfi_decr_left)(struct lsquic_stream *, size_t);
131};
132
133
134/* These flags indicate which queues -- or other entities -- currently
135 * reference the stream.
136 */
137enum stream_q_flags
138{
139    /* read_streams: */
140    SMQF_WANT_READ    = 1 << 0,
141
142    /* write_streams: */
143#define SMQF_WRITE_Q_FLAGS (SMQF_WANT_FLUSH|SMQF_WANT_WRITE)
144    SMQF_WANT_WRITE   = 1 << 1,
145    SMQF_WANT_FLUSH   = 1 << 2,     /* Flush until sm_flush_to is hit */
146
147    /* There are more than one reason that a stream may be put onto
148     * connections's sending_streams queue.  Note that writing STREAM
149     * frames is done separately.
150     */
151#define SMQF_SENDING_FLAGS (SMQF_SEND_WUF|SMQF_SEND_RST|SMQF_SEND_BLOCKED)
152    /* sending_streams: */
153    SMQF_SEND_WUF     = 1 << 3,     /* WUF: Window Update Frame */
154    SMQF_SEND_BLOCKED = 1 << 4,
155    SMQF_SEND_RST     = 1 << 5,     /* Error: want to send RST_STREAM */
156
157    /* The equivalent of WINDOW_UPDATE frame for streams in IETF QUIC is
158     * the MAX_STREAM_DATA frame.  Define an alias for use in the IETF
159     * QUIC code:
160     */
161#define SMQF_SEND_MAX_STREAM_DATA SMQF_SEND_WUF
162
163#define SMQF_SERVICE_FLAGS (SMQF_CALL_ONCLOSE|SMQF_FREE_STREAM|SMQF_ABORT_CONN)
164    SMQF_CALL_ONCLOSE = 1 << 6,
165    SMQF_FREE_STREAM  = 1 << 7,
166    SMQF_ABORT_CONN   = 1 << 8,     /* Unrecoverable error occurred */
167
168    SMQF_QPACK_DEC    = 1 << 9,     /* QPACK decoder is holding a reference to this stream */
169};
170
171
172/* Stream behavior flags */
173enum stream_b_flags
174{
175    SMBF_SERVER       = 1 << 0,
176    SMBF_IETF         = 1 << 1,
177    SMBF_USE_HEADERS  = 1 << 2,
178    SMBF_CRYPTO       = 1 << 3,  /* Crypto stream: applies to both gQUIC and IETF QUIC */
179    SMBF_CRITICAL     = 1 << 4,  /* This is a critical stream */
180    SMBF_AUTOSWITCH   = 1 << 5,
181    SMBF_RW_ONCE      = 1 << 6,  /* When set, read/write events are dispatched once per call */
182    SMBF_CONN_LIMITED = 1 << 7,
183    SMBF_HEADERS      = 1 << 8,  /* Headers stream */
184#define N_SMBF_FLAGS 9
185};
186
187
188enum stream_flags {
189    STREAM_FIN_RECVD    = 1 << 0,   /* Received STREAM frame with FIN bit set */
190    STREAM_RST_RECVD    = 1 << 1,   /* Received RST frame */
191    STREAM_LAST_WRITE_OK= 1 << 2,   /* Used to break out of write event dispatch loop */
192    STREAM_U_READ_DONE  = 1 << 3,   /* User is done reading (shutdown was called) */
193    STREAM_U_WRITE_DONE = 1 << 4,   /* User is done writing (shutdown was called) */
194    STREAM_FIN_SENT     = 1 << 5,   /* FIN was written to network */
195    STREAM_RST_SENT     = 1 << 6,   /* RST_STREAM was written to network */
196    STREAM_FIN_REACHED  = 1 << 7,   /* User read data up to FIN */
197    STREAM_FINISHED     = 1 << 8,   /* Stream is finished */
198    STREAM_ONCLOSE_DONE = 1 << 9,   /* on_close has been called */
199    STREAM_CACHED_FRAME = 1 << 10,  /* If set, sm_has_frame can be used */
200    STREAM_HEADERS_SENT = 1 << 11,
201    STREAM_HAVE_UH      = 1 << 12,  /* Have uncompressed headers */
202    STREAM_ENCODER_DEP  = 1 << 13,  /* Encoder dependency: flush (IETF only) */
203    STREAM_HEAD_IN_FIN  = 1 << 14,  /* Incoming headers has FIN bit set */
204    STREAM_FRAMES_ELIDED= 1 << 15,
205    STREAM_FORCE_FINISH = 1 << 16,  /* Replaces FIN sent and received */
206    STREAM_ONNEW_DONE   = 1 << 17,  /* on_new_stream has been called */
207    STREAM_PUSHING      = 1 << 18,
208    STREAM_NOPUSH       = 1 << 19,  /* Disallow further push promises */
209    STREAM_UNUSED20     = 1 << 20,  /* Unused */
210    STREAM_UNUSED21     = 1 << 21,  /* Unused */
211    STREAM_RST_ACKED    = 1 << 22,  /* Packet containing RST has been acked */
212    STREAM_BLOCKED_SENT = 1 << 23,  /* Stays set once a STREAM_BLOCKED frame is sent */
213    STREAM_RST_READ     = 1 << 24,  /* User code collected the error */
214    STREAM_DATA_RECVD   = 1 << 25,  /* Cache stream state calculation */
215    STREAM_UNUSED26     = 1 << 26,  /* Unused */
216    STREAM_HDRS_FLUSHED = 1 << 27,  /* Only used in buffered packets mode */
217    STREAM_SS_RECVD     = 1 << 28,  /* Received STOP_SENDING frame */
218    STREAM_DELAYED_SW   = 1 << 29,  /* Delayed shutdown_write call */
219};
220
221
222/* By keeping this number low, we make sure that the code to allocate HQ
223 * frames dynamically gets exercised whenever push promises are sent.
224 */
225#define NUM_ALLOCED_HQ_FRAMES 2
226
227
228struct lsquic_stream
229{
230    struct lsquic_hash_elem         sm_hash_el;
231    lsquic_stream_id_t              id;
232    enum stream_flags               stream_flags;
233    enum stream_b_flags             sm_bflags;
234    enum stream_q_flags             sm_qflags;
235    unsigned                        n_unacked;
236
237    const struct lsquic_stream_if  *stream_if;
238    struct lsquic_stream_ctx       *st_ctx;
239    struct lsquic_conn_public      *conn_pub;
240    TAILQ_ENTRY(lsquic_stream)      next_send_stream, next_read_stream,
241                                        next_write_stream, next_service_stream,
242                                        next_prio_stream;
243
244    uint64_t                        tosend_off;
245    uint64_t                        sm_payload;     /* Not counting HQ frames */
246    uint64_t                        max_send_off;
247    uint64_t                        sm_last_recv_off;
248    uint64_t                        error_code;
249
250    /* From the network, we get frames, which we keep on a list ordered
251     * by offset.
252     */
253    struct data_in                 *data_in;
254    uint64_t                        read_offset;
255    lsquic_sfcw_t                   fc;
256
257    /* List of active HQ frames */
258    STAILQ_HEAD(, stream_hq_frame)  sm_hq_frames;
259
260    /* For efficiency, several frames are allocated as part of the stream
261     * itself.  If more frames are needed, they are allocated.
262     */
263    struct stream_hq_frame          sm_hq_frame_arr[NUM_ALLOCED_HQ_FRAMES];
264
265    struct hq_filter                sm_hq_filter;
266
267    /* We can safely use sm_hq_filter */
268#define sm_uni_type_state sm_hq_filter.hqfi_vint2_state.vr2s_varint_state
269
270    /** If @ref SMQF_WANT_FLUSH is set, flush until this offset. */
271    uint64_t                        sm_flush_to;
272
273    /**
274     * If @ref SMQF_WANT_FLUSH is set, this indicates payload offset
275     * to flush to.  Used to adjust @ref sm_flush_to when H3 frame
276     * size grows.
277     */
278    uint64_t                        sm_flush_to_payload;
279
280    /* Last offset sent in BLOCKED frame */
281    uint64_t                        blocked_off;
282
283    struct uncompressed_headers    *uh,
284                                   *push_req;
285
286    unsigned char                  *sm_buf;
287    void                           *sm_onnew_arg;
288
289    unsigned char                  *sm_header_block;
290    uint64_t                        sm_hb_compl;
291
292    /* Valid if STREAM_FIN_RECVD is set: */
293    uint64_t                        sm_fin_off;
294
295    /* A stream may be generating STREAM or CRYPTO frames */
296    size_t                        (*sm_frame_header_sz)(
297                                        const struct lsquic_stream *, unsigned);
298    enum swtp_status              (*sm_write_to_packet)(struct frame_gen_ctx *,
299                                                const size_t);
300    size_t                        (*sm_write_avail)(struct lsquic_stream *);
301    int                           (*sm_readable)(struct lsquic_stream *);
302
303    /* This element is optional */
304    const struct stream_filter_if  *sm_sfi;
305
306    /* sm_promise and sm_promises are never used at the same time and can
307     * be combined into a union should space in this struct become tight.
308     */
309    /* Push promise that engendered this push stream */
310    struct push_promise            *sm_promise;
311
312    /* Push promises sent on this stream */
313    SLIST_HEAD(, push_promise)      sm_promises;
314
315    uint64_t                        sm_last_frame_off;
316
317    /* How much data there is in sm_header_block and how much of it has been
318     * sent:
319     */
320    unsigned                        sm_hblock_sz,
321                                    sm_hblock_off;
322
323    unsigned short                  sm_n_buffered;  /* Amount of data in sm_buf */
324    unsigned short                  sm_n_allocated;  /* Size of sm_buf */
325
326    unsigned char                   sm_priority;  /* 0: high; 255: low */
327    unsigned char                   sm_enc_level;
328    enum {
329        SSHS_BEGIN,         /* Nothing has happened yet */
330        SSHS_ENC_SENDING,   /* Sending encoder stream data */
331        SSHS_HBLOCK_SENDING,/* Sending header block data */
332    }                               sm_send_headers_state:8;
333    signed char                     sm_saved_want_write;
334    signed char                     sm_has_frame;
335
336    unsigned char                   sm_dup_push_off;
337    unsigned char                   sm_dup_push_len;
338    unsigned char                   sm_dup_push_buf[8];
339
340#if LSQUIC_KEEP_STREAM_HISTORY
341    sm_hist_idx_t                   sm_hist_idx;
342#endif
343
344#if LSQUIC_KEEP_STREAM_HISTORY
345    /* Stream history: see enum stream_history_event */
346    unsigned char                   sm_hist_buf[ 1 << SM_HIST_BITS ];
347#endif
348};
349
350
351enum stream_ctor_flags
352{
353    SCF_CALL_ON_NEW   = (1 << (N_SMBF_FLAGS + 0)), /* Call on_new_stream() immediately */
354    SCF_USE_DI_HASH   = (1 << (N_SMBF_FLAGS + 1)), /* Use hash-based data input.  If not set,
355                                   * the nocopy data input is used.
356                                   */
357    SCF_CRYPTO_FRAMES = (1 << (N_SMBF_FLAGS + 2)), /* Write CRYPTO frames */
358    SCF_DI_AUTOSWITCH = SMBF_AUTOSWITCH, /* Automatically switch between nocopy
359                                   * and hash-based to data input for optimal
360                                   * performance.
361                                   */
362    SCF_DISP_RW_ONCE  = SMBF_RW_ONCE,
363    SCF_CRITICAL      = SMBF_CRITICAL, /* This is a critical stream */
364    SCF_IETF          = SMBF_IETF,
365    SCF_HTTP          = SMBF_USE_HEADERS,
366    SCF_CRYPTO        = SMBF_CRYPTO,
367    SCF_HEADERS       = SMBF_HEADERS,
368};
369
370
371lsquic_stream_t *
372lsquic_stream_new (lsquic_stream_id_t id, struct lsquic_conn_public *,
373                   const struct lsquic_stream_if *, void *stream_if_ctx,
374                   unsigned initial_sfrw, uint64_t initial_send_off,
375                   enum stream_ctor_flags);
376
377struct lsquic_stream *
378lsquic_stream_new_crypto (enum enc_level,
379        struct lsquic_conn_public *conn_pub,
380        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
381        enum stream_ctor_flags ctor_flags);
382
383void
384lsquic_stream_call_on_new (lsquic_stream_t *);
385
386void
387lsquic_stream_destroy (lsquic_stream_t *);
388
389/* Any of these flags will cause user-facing read and write and
390 * shutdown calls to return an error.  They also make the stream
391 * both readable and writeable, as we want the user to collect
392 * the error.
393 */
394#define lsquic_stream_is_reset(stream) \
395    (((stream)->stream_flags & \
396                    (STREAM_RST_RECVD|STREAM_RST_SENT|STREAM_SS_RECVD)) \
397        || ((stream)->sm_qflags & SMQF_SEND_RST))
398
399/* Data that from the network gets inserted into the stream using
400 * lsquic_stream_frame_in() function.  Returns 0 on success, -1 on
401 * failure.  The latter may be caused by flow control violation or
402 * invalid stream frame data, e.g. overlapping segments.
403 *
404 * Note that the caller does gives up control of `frame' no matter
405 * what this function returns.
406 *
407 * This data is read by the user using lsquic_stream_read() function.
408 */
409int
410lsquic_stream_frame_in (lsquic_stream_t *, struct stream_frame *frame);
411
412/* Only one (at least for now) uncompressed header structure is allowed to be
413 * passed in, and only in HTTP mode.
414 */
415int
416lsquic_stream_uh_in (lsquic_stream_t *, struct uncompressed_headers *);
417
418void
419lsquic_stream_push_req (lsquic_stream_t *,
420                        struct uncompressed_headers *push_req);
421
422int
423lsquic_stream_rst_in (lsquic_stream_t *, uint64_t offset, uint64_t error_code);
424
425void
426lsquic_stream_stop_sending_in (struct lsquic_stream *, uint64_t error_code);
427
428ssize_t
429lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len);
430
431uint64_t
432lsquic_stream_read_offset (const lsquic_stream_t *stream);
433
434/* Return true if we sent all available data to the network and write
435 * end of the stream was closed.
436 */
437int
438lsquic_stream_tosend_fin (const lsquic_stream_t *stream);
439
440/* Data to be sent out to the network is written using lsquic_stream_write().
441 */
442ssize_t
443lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len);
444
445void
446lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset);
447
448int
449lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset);
450
451/* The caller should only call this function if SMQF_SEND_WUF is set and
452 * it must generate a window update frame using this value.
453 */
454uint64_t
455lsquic_stream_fc_recv_off (lsquic_stream_t *stream);
456
457void
458lsquic_stream_peer_blocked (struct lsquic_stream *, uint64_t);
459
460void
461lsquic_stream_peer_blocked_gquic (struct lsquic_stream *);
462
463void
464lsquic_stream_dispatch_read_events (lsquic_stream_t *);
465
466void
467lsquic_stream_dispatch_write_events (lsquic_stream_t *);
468
469void
470lsquic_stream_blocked_frame_sent (lsquic_stream_t *);
471
472void
473lsquic_stream_rst_frame_sent (lsquic_stream_t *);
474
475void
476lsquic_stream_stream_frame_sent (lsquic_stream_t *);
477
478void
479lsquic_stream_reset (lsquic_stream_t *, uint64_t error_code);
480
481void
482lsquic_stream_reset_ext (lsquic_stream_t *, uint64_t error_code, int close);
483
484void
485lsquic_stream_call_on_close (lsquic_stream_t *);
486
487void
488lsquic_stream_shutdown_internal (lsquic_stream_t *);
489
490void
491lsquic_stream_received_goaway (lsquic_stream_t *);
492
493void
494lsquic_stream_acked (struct lsquic_stream *, enum quic_frame_type);
495
496#define lsquic_stream_is_closed(s)                                          \
497    (((s)->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE))         \
498                            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE))
499int
500lsquic_stream_update_sfcw (lsquic_stream_t *, uint64_t max_off);
501
502int
503lsquic_stream_set_priority_internal (lsquic_stream_t *, unsigned priority);
504
505#define lsquic_stream_is_critical(s) ((s)->sm_bflags & SMBF_CRITICAL)
506
507#define lsquic_stream_is_crypto(s) ((s)->sm_bflags & SMBF_CRYPTO)
508
509size_t
510lsquic_stream_mem_used (const struct lsquic_stream *);
511
512const lsquic_cid_t *
513lsquic_stream_cid (const struct lsquic_stream *);
514
515#define lsquic_stream_has_data_to_flush(stream) ((stream)->sm_n_buffered > 0)
516
517int
518lsquic_stream_readable (struct lsquic_stream *);
519
520size_t
521lsquic_stream_write_avail (struct lsquic_stream *);
522
523void
524lsquic_stream_dump_state (const struct lsquic_stream *);
525
526#ifndef NDEBUG
527size_t
528lsquic_stream_flush_threshold (const struct lsquic_stream *, unsigned);
529#endif
530
531#define crypto_level(stream) (~0ULL - (stream)->id)
532
533void
534lsquic_stream_set_stream_if (struct lsquic_stream *,
535                   const struct lsquic_stream_if *, void *stream_if_ctx);
536
537struct qpack_dec_hdl *
538lsquic_stream_get_qdh (const struct lsquic_stream *);
539
540uint64_t
541lsquic_stream_combined_send_off (const struct lsquic_stream *);
542
543/* [draft-ietf-quic-transport-16] Section 3.1 */
544enum stream_state_sending
545{
546    SSS_READY,
547    SSS_SEND,
548    SSS_DATA_SENT,
549    SSS_RESET_SENT,
550    SSS_DATA_RECVD,
551    SSS_RESET_RECVD,
552};
553
554extern const char *const lsquic_sss2str[];
555
556enum stream_state_sending
557lsquic_stream_sending_state (const struct lsquic_stream *);
558
559/* [draft-ietf-quic-transport-16] Section 3.2 */
560enum stream_state_receiving
561{
562    SSR_RECV,
563    SSR_SIZE_KNOWN,
564    SSR_DATA_RECVD,
565    SSR_RESET_RECVD,
566    SSR_DATA_READ,
567    SSR_RESET_READ,
568};
569
570extern const char *const lsquic_ssr2str[];
571
572enum stream_state_receiving
573lsquic_stream_receiving_state (struct lsquic_stream *);
574
575uint64_t
576lsquic_stream_fc_recv_off_const (const struct lsquic_stream *);
577
578void
579lsquic_stream_max_stream_data_sent (struct lsquic_stream *);
580
581void
582lsquic_stream_qdec_unblocked (struct lsquic_stream *);
583
584int
585lsquic_stream_can_push (const struct lsquic_stream *);
586
587int
588lsquic_stream_duplicate_push (struct lsquic_stream *, uint64_t push_id);
589
590int
591lsquic_stream_push_promise (struct lsquic_stream *, struct push_promise *);
592
593void
594lsquic_stream_force_finish (struct lsquic_stream *);
595
596int
597lsquic_stream_header_is_pp (const struct lsquic_stream *);
598
599int
600lsquic_stream_header_is_trailer (const struct lsquic_stream *);
601
602#endif
603