lsquic_stream.h revision 4051ae3a
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc. See LICENSE. */ 2#ifndef LSQUIC_STREAM_H 3#define LSQUIC_STREAM_H 4 5#define LSQUIC_STREAM_DEFAULT_PRIO 16 /* RFC 7540, Section 5.3.5 */ 6 7 8struct lsquic_stream_if; 9struct lsquic_stream_ctx; 10struct lsquic_conn_public; 11struct stream_frame; 12struct uncompressed_headers; 13enum enc_level; 14enum swtp_status; 15struct frame_gen_ctx; 16struct data_frame; 17enum quic_frame_type; 18struct push_promise; 19union hblock_ctx; 20 21TAILQ_HEAD(lsquic_streams_tailq, lsquic_stream); 22 23 24#ifndef LSQUIC_KEEP_STREAM_HISTORY 25# ifdef NDEBUG 26# define LSQUIC_KEEP_STREAM_HISTORY 0 27# else 28# define LSQUIC_KEEP_STREAM_HISTORY 1 29# endif 30#endif 31 32 33#if LSQUIC_KEEP_STREAM_HISTORY 34#define SM_HIST_BITS 6 35#define SM_HIST_IDX_MASK ((1 << SM_HIST_BITS) - 1) 36typedef unsigned char sm_hist_idx_t; 37#endif 38 39 40/* 41 * +----------+----------------------------------+ 42 * | Low Bits | Stream Type | 43 * +----------+----------------------------------+ 44 * | 0x0 | Client-Initiated, Bidirectional | 45 * | | | 46 * | 0x1 | Server-Initiated, Bidirectional | 47 * | | | 48 * | 0x2 | Client-Initiated, Unidirectional | 49 * | | | 50 * | 0x3 | Server-Initiated, Unidirectional | 51 * +----------+----------------------------------+ 52 */ 53 54enum stream_id_type 55{ 56 SIT_BIDI_CLIENT, 57 SIT_BIDI_SERVER, 58 SIT_UNI_CLIENT, 59 SIT_UNI_SERVER, 60 N_SITS 61}; 62 63#define SIT_MASK (N_SITS - 1) 64 65#define SIT_SHIFT 2 66#define SD_SHIFT 1 67 68enum stream_dir { SD_BIDI, SD_UNI, N_SDS }; 69 70 71struct stream_hq_frame 72{ 73 STAILQ_ENTRY(stream_hq_frame) 74 shf_next; 75 /* At which point in the stream (sm_payload) to insert the HQ frame. */ 76 uint64_t shf_off; 77 union { 78 /* Points to the frame if SHF_FIXED_SIZE is not set */ 79 unsigned char *frame_ptr; 80 /* If SHF_FIXED_SIZE is set, the size of the frame to follow. 81 * Non-fixed frame size gets calculated using sm_payload when they 82 * are closed. 83 */ 84 size_t frame_size; 85 } shf_u; 86#define shf_frame_ptr shf_u.frame_ptr 87#define shf_frame_size shf_u.frame_size 88 enum hq_frame_type shf_frame_type:8; 89 enum shf_flags { 90 SHF_TWO_BYTES = 1 << 0, /* Use two byte to encode frame length */ 91 SHF_FIXED_SIZE = 1 << 1, /* Payload size guaranteed */ 92 SHF_ACTIVE = 1 << 2, /* On sm_hq_frames list */ 93 SHF_WRITTEN = 1 << 3, /* Framing bytes have been packetized */ 94 SHF_CC_PAID = 1 << 4, /* Paid connection cap */ 95 SHF_PHANTOM = 1 << 5, /* Phantom frame headers are not written */ 96 } shf_flags:8; 97}; 98 99 100struct hq_filter 101{ 102 struct varint_read2_state hqfi_vint2_state; 103 /* No need to copy the values: use it directly */ 104#define hqfi_left hqfi_vint2_state.vr2s_two 105#define hqfi_type hqfi_vint2_state.vr2s_one 106 struct varint_read_state hqfi_vint1_state; 107#define hqfi_push_id hqfi_vint1_state.value 108 enum { 109 HQFI_FLAG_UNUSED_0 = 1 << 0, 110 HQFI_FLAG_ERROR = 1 << 1, 111 HQFI_FLAG_BEGIN = 1 << 2, 112 HQFI_FLAG_BLOCKED = 1 << 3, 113 } hqfi_flags:8; 114 enum { 115 HQFI_STATE_FRAME_HEADER_BEGIN, 116 HQFI_STATE_FRAME_HEADER_CONTINUE, 117 HQFI_STATE_READING_PAYLOAD, 118 HQFI_STATE_PUSH_ID_BEGIN, 119 HQFI_STATE_PUSH_ID_CONTINUE, 120 } hqfi_state:8; 121 unsigned char hqfi_hist_idx; 122#define MAX_HQFI_ENTRIES (sizeof(unsigned) * 8 / 3) 123 unsigned hqfi_hist_buf; 124}; 125 126 127struct stream_filter_if 128{ 129 int (*sfi_readable)(struct lsquic_stream *); 130 size_t (*sfi_filter_df)(struct lsquic_stream *, struct data_frame *); 131 void (*sfi_decr_left)(struct lsquic_stream *, size_t); 132}; 133 134 135/* These flags indicate which queues -- or other entities -- currently 136 * reference the stream. 137 */ 138enum stream_q_flags 139{ 140 /* read_streams: */ 141 SMQF_WANT_READ = 1 << 0, 142 143 /* write_streams: */ 144#define SMQF_WRITE_Q_FLAGS (SMQF_WANT_FLUSH|SMQF_WANT_WRITE) 145 SMQF_WANT_WRITE = 1 << 1, 146 SMQF_WANT_FLUSH = 1 << 2, /* Flush until sm_flush_to is hit */ 147 148 /* There are more than one reason that a stream may be put onto 149 * connections's sending_streams queue. Note that writing STREAM 150 * frames is done separately. 151 */ 152#define SMQF_SENDING_FLAGS (SMQF_SEND_WUF|SMQF_SEND_RST|SMQF_SEND_BLOCKED) 153 /* sending_streams: */ 154 SMQF_SEND_WUF = 1 << 3, /* WUF: Window Update Frame */ 155 SMQF_SEND_BLOCKED = 1 << 4, 156 SMQF_SEND_RST = 1 << 5, /* Error: want to send RST_STREAM */ 157 158 /* The equivalent of WINDOW_UPDATE frame for streams in IETF QUIC is 159 * the MAX_STREAM_DATA frame. Define an alias for use in the IETF 160 * QUIC code: 161 */ 162#define SMQF_SEND_MAX_STREAM_DATA SMQF_SEND_WUF 163 164#define SMQF_SERVICE_FLAGS (SMQF_CALL_ONCLOSE|SMQF_FREE_STREAM|SMQF_ABORT_CONN) 165 SMQF_CALL_ONCLOSE = 1 << 6, 166 SMQF_FREE_STREAM = 1 << 7, 167 SMQF_ABORT_CONN = 1 << 8, /* Unrecoverable error occurred */ 168 169 SMQF_QPACK_DEC = 1 << 9, /* QPACK decoder handler is holding a reference to this stream */ 170}; 171 172 173/* Stream behavior flags */ 174enum stream_b_flags 175{ 176 SMBF_SERVER = 1 << 0, 177 SMBF_IETF = 1 << 1, 178 SMBF_USE_HEADERS = 1 << 2, 179 SMBF_CRYPTO = 1 << 3, /* Crypto stream: applies to both gQUIC and IETF QUIC */ 180 SMBF_CRITICAL = 1 << 4, /* This is a critical stream */ 181 SMBF_AUTOSWITCH = 1 << 5, 182 SMBF_RW_ONCE = 1 << 6, /* When set, read/write events are dispatched once per call */ 183 SMBF_CONN_LIMITED = 1 << 7, 184 SMBF_HEADERS = 1 << 8, /* Headers stream */ 185 SMBF_VERIFY_CL = 1 << 9, /* Verify content-length (stored in sm_cont_len) */ 186#define N_SMBF_FLAGS 10 187}; 188 189 190enum stream_flags { 191 STREAM_FIN_RECVD = 1 << 0, /* Received STREAM frame with FIN bit set */ 192 STREAM_RST_RECVD = 1 << 1, /* Received RST frame */ 193 STREAM_LAST_WRITE_OK= 1 << 2, /* Used to break out of write event dispatch loop */ 194 STREAM_U_READ_DONE = 1 << 3, /* User is done reading (shutdown was called) */ 195 STREAM_U_WRITE_DONE = 1 << 4, /* User is done writing (shutdown was called) */ 196 STREAM_FIN_SENT = 1 << 5, /* FIN was written to network */ 197 STREAM_RST_SENT = 1 << 6, /* RST_STREAM was written to network */ 198 STREAM_FIN_REACHED = 1 << 7, /* User read data up to FIN */ 199 STREAM_FINISHED = 1 << 8, /* Stream is finished */ 200 STREAM_ONCLOSE_DONE = 1 << 9, /* on_close has been called */ 201 STREAM_CACHED_FRAME = 1 << 10, /* If set, sm_has_frame can be used */ 202 STREAM_HEADERS_SENT = 1 << 11, 203 STREAM_HAVE_UH = 1 << 12, /* Have uncompressed headers */ 204 STREAM_ENCODER_DEP = 1 << 13, /* Encoder dependency: flush (IETF only) */ 205 STREAM_HEAD_IN_FIN = 1 << 14, /* Incoming headers has FIN bit set */ 206 STREAM_FRAMES_ELIDED= 1 << 15, 207 STREAM_FORCE_FINISH = 1 << 16, /* Replaces FIN sent and received */ 208 STREAM_ONNEW_DONE = 1 << 17, /* on_new_stream has been called */ 209 STREAM_PUSHING = 1 << 18, 210 STREAM_NOPUSH = 1 << 19, /* Disallow further push promises */ 211 STREAM_GOAWAY_IN = 1 << 20, /* Incoming GOAWAY has been processed */ 212 STREAM_UNUSED21 = 1 << 21, /* Unused */ 213 STREAM_RST_ACKED = 1 << 22, /* Packet containing RST has been acked */ 214 STREAM_BLOCKED_SENT = 1 << 23, /* Stays set once a STREAM_BLOCKED frame is sent */ 215 STREAM_RST_READ = 1 << 24, /* User code collected the error */ 216 STREAM_DATA_RECVD = 1 << 25, /* Cache stream state calculation */ 217 STREAM_UNUSED26 = 1 << 26, /* Unused */ 218 STREAM_HDRS_FLUSHED = 1 << 27, /* Only used in buffered packets mode */ 219 STREAM_SS_RECVD = 1 << 28, /* Received STOP_SENDING frame */ 220 STREAM_DELAYED_SW = 1 << 29, /* Delayed shutdown_write call */ 221}; 222 223 224/* By keeping this number low, we make sure that the code to allocate HQ 225 * frames dynamically gets exercised whenever push promises are sent. 226 */ 227#define NUM_ALLOCED_HQ_FRAMES 2 228 229 230struct lsquic_stream 231{ 232 struct lsquic_hash_elem sm_hash_el; 233 lsquic_stream_id_t id; 234 enum stream_flags stream_flags; 235 enum stream_b_flags sm_bflags; 236 enum stream_q_flags sm_qflags; 237 unsigned n_unacked; 238 239 const struct lsquic_stream_if *stream_if; 240 struct lsquic_stream_ctx *st_ctx; 241 struct lsquic_conn_public *conn_pub; 242 TAILQ_ENTRY(lsquic_stream) next_send_stream, next_read_stream, 243 next_write_stream, next_service_stream, 244 next_prio_stream; 245 246 uint64_t tosend_off; 247 uint64_t sm_payload; /* Not counting HQ frames */ 248 uint64_t max_send_off; 249 uint64_t sm_last_recv_off; 250 uint64_t error_code; 251 252 /* From the network, we get frames, which we keep on a list ordered 253 * by offset. 254 */ 255 struct data_in *data_in; 256 uint64_t read_offset; 257 lsquic_sfcw_t fc; 258 259 /* List of active HQ frames */ 260 STAILQ_HEAD(, stream_hq_frame) sm_hq_frames; 261 262 /* For efficiency, several frames are allocated as part of the stream 263 * itself. If more frames are needed, they are allocated. 264 */ 265 struct stream_hq_frame sm_hq_frame_arr[NUM_ALLOCED_HQ_FRAMES]; 266 267 struct hq_filter sm_hq_filter; 268 269 /* We can safely use sm_hq_filter */ 270#define sm_uni_type_state sm_hq_filter.hqfi_vint2_state.vr2s_varint_state 271 272 /** If @ref SMQF_WANT_FLUSH is set, flush until this offset. */ 273 uint64_t sm_flush_to; 274 275 /** 276 * If @ref SMQF_WANT_FLUSH is set, this indicates payload offset 277 * to flush to. Used to adjust @ref sm_flush_to when H3 frame 278 * size grows. 279 */ 280 uint64_t sm_flush_to_payload; 281 282 /* Last offset sent in BLOCKED frame */ 283 uint64_t blocked_off; 284 285 struct uncompressed_headers *uh, 286 *push_req; 287 union hblock_ctx *sm_hblock_ctx; 288 289 unsigned char *sm_buf; 290 void *sm_onnew_arg; 291 292 unsigned char *sm_header_block; 293 uint64_t sm_hb_compl; 294 295 /* Valid if STREAM_FIN_RECVD is set: */ 296 uint64_t sm_fin_off; 297 298 /* A stream may be generating STREAM or CRYPTO frames */ 299 size_t (*sm_frame_header_sz)( 300 const struct lsquic_stream *, unsigned); 301 enum swtp_status (*sm_write_to_packet)(struct frame_gen_ctx *, 302 const size_t); 303 size_t (*sm_write_avail)(struct lsquic_stream *); 304 int (*sm_readable)(struct lsquic_stream *); 305 306 /* This element is optional */ 307 const struct stream_filter_if *sm_sfi; 308 309 /* sm_promise and sm_promises are never used at the same time and can 310 * be combined into a union should space in this struct become tight. 311 */ 312 /* Push promise that engendered this push stream */ 313 struct push_promise *sm_promise; 314 315 /* Push promises sent on this stream */ 316 SLIST_HEAD(, push_promise) sm_promises; 317 318 uint64_t sm_last_frame_off; 319 320#ifndef NDEBUG 321 /* Last time stream made progress */ 322 lsquic_time_t sm_last_prog; 323#endif 324 325 /* Content length specified in incoming `content-length' header field. 326 * Used to verify size of DATA frames. 327 */ 328 unsigned long long sm_cont_len; 329 /* Sum of bytes in all incoming DATA frames. Used for verification. */ 330 unsigned long long sm_data_in; 331 332 /* How much data there is in sm_header_block and how much of it has been 333 * sent: 334 */ 335 unsigned sm_hblock_sz, 336 sm_hblock_off; 337 338 unsigned short sm_n_buffered; /* Amount of data in sm_buf */ 339 unsigned short sm_n_allocated; /* Size of sm_buf */ 340 341 unsigned char sm_priority; /* 0: high; 255: low */ 342 unsigned char sm_enc_level; 343 enum { 344 SSHS_BEGIN, /* Nothing has happened yet */ 345 SSHS_ENC_SENDING, /* Sending encoder stream data */ 346 SSHS_HBLOCK_SENDING,/* Sending header block data */ 347 } sm_send_headers_state:8; 348 signed char sm_saved_want_write; 349 signed char sm_has_frame; 350 351#if LSQUIC_KEEP_STREAM_HISTORY 352 sm_hist_idx_t sm_hist_idx; 353#endif 354 355#if LSQUIC_KEEP_STREAM_HISTORY 356 /* Stream history: see enum stream_history_event */ 357 unsigned char sm_hist_buf[ 1 << SM_HIST_BITS ]; 358#endif 359}; 360 361 362enum stream_ctor_flags 363{ 364 SCF_CALL_ON_NEW = (1 << (N_SMBF_FLAGS + 0)), /* Call on_new_stream() immediately */ 365 SCF_USE_DI_HASH = (1 << (N_SMBF_FLAGS + 1)), /* Use hash-based data input. If not set, 366 * the nocopy data input is used. 367 */ 368 SCF_CRYPTO_FRAMES = (1 << (N_SMBF_FLAGS + 2)), /* Write CRYPTO frames */ 369 SCF_DI_AUTOSWITCH = SMBF_AUTOSWITCH, /* Automatically switch between nocopy 370 * and hash-based to data input for optimal 371 * performance. 372 */ 373 SCF_DISP_RW_ONCE = SMBF_RW_ONCE, 374 SCF_CRITICAL = SMBF_CRITICAL, /* This is a critical stream */ 375 SCF_IETF = SMBF_IETF, 376 SCF_HTTP = SMBF_USE_HEADERS, 377 SCF_CRYPTO = SMBF_CRYPTO, 378 SCF_HEADERS = SMBF_HEADERS, 379}; 380 381 382lsquic_stream_t * 383lsquic_stream_new (lsquic_stream_id_t id, struct lsquic_conn_public *, 384 const struct lsquic_stream_if *, void *stream_if_ctx, 385 unsigned initial_sfrw, uint64_t initial_send_off, 386 enum stream_ctor_flags); 387 388struct lsquic_stream * 389lsquic_stream_new_crypto (enum enc_level, 390 struct lsquic_conn_public *conn_pub, 391 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 392 enum stream_ctor_flags ctor_flags); 393 394void 395lsquic_stream_call_on_new (lsquic_stream_t *); 396 397void 398lsquic_stream_destroy (lsquic_stream_t *); 399 400/* Any of these flags will cause user-facing read and write and 401 * shutdown calls to return an error. They also make the stream 402 * both readable and writeable, as we want the user to collect 403 * the error. 404 */ 405#define lsquic_stream_is_reset(stream) \ 406 (((stream)->stream_flags & \ 407 (STREAM_RST_RECVD|STREAM_RST_SENT|STREAM_SS_RECVD)) \ 408 || ((stream)->sm_qflags & SMQF_SEND_RST)) 409 410/* Data that from the network gets inserted into the stream using 411 * lsquic_stream_frame_in() function. Returns 0 on success, -1 on 412 * failure. The latter may be caused by flow control violation or 413 * invalid stream frame data, e.g. overlapping segments. 414 * 415 * Note that the caller does gives up control of `frame' no matter 416 * what this function returns. 417 * 418 * This data is read by the user using lsquic_stream_read() function. 419 */ 420int 421lsquic_stream_frame_in (lsquic_stream_t *, struct stream_frame *frame); 422 423/* Only one (at least for now) uncompressed header structure is allowed to be 424 * passed in, and only in HTTP mode. 425 */ 426int 427lsquic_stream_uh_in (lsquic_stream_t *, struct uncompressed_headers *); 428 429void 430lsquic_stream_push_req (lsquic_stream_t *, 431 struct uncompressed_headers *push_req); 432 433int 434lsquic_stream_rst_in (lsquic_stream_t *, uint64_t offset, uint64_t error_code); 435 436void 437lsquic_stream_stop_sending_in (struct lsquic_stream *, uint64_t error_code); 438 439ssize_t 440lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len); 441 442uint64_t 443lsquic_stream_read_offset (const lsquic_stream_t *stream); 444 445/* Return true if we sent all available data to the network and write 446 * end of the stream was closed. 447 */ 448int 449lsquic_stream_tosend_fin (const lsquic_stream_t *stream); 450 451/* Data to be sent out to the network is written using lsquic_stream_write(). 452 */ 453ssize_t 454lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len); 455 456void 457lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset); 458 459int 460lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset); 461 462/* The caller should only call this function if SMQF_SEND_WUF is set and 463 * it must generate a window update frame using this value. 464 */ 465uint64_t 466lsquic_stream_fc_recv_off (lsquic_stream_t *stream); 467 468void 469lsquic_stream_peer_blocked (struct lsquic_stream *, uint64_t); 470 471void 472lsquic_stream_peer_blocked_gquic (struct lsquic_stream *); 473 474void 475lsquic_stream_dispatch_read_events (lsquic_stream_t *); 476 477void 478lsquic_stream_dispatch_write_events (lsquic_stream_t *); 479 480void 481lsquic_stream_blocked_frame_sent (lsquic_stream_t *); 482 483void 484lsquic_stream_rst_frame_sent (lsquic_stream_t *); 485 486void 487lsquic_stream_stream_frame_sent (lsquic_stream_t *); 488 489void 490lsquic_stream_reset (lsquic_stream_t *, uint64_t error_code); 491 492void 493lsquic_stream_reset_ext (lsquic_stream_t *, uint64_t error_code, int close); 494 495void 496lsquic_stream_call_on_close (lsquic_stream_t *); 497 498void 499lsquic_stream_shutdown_internal (lsquic_stream_t *); 500 501void 502lsquic_stream_received_goaway (lsquic_stream_t *); 503 504void 505lsquic_stream_acked (struct lsquic_stream *, enum quic_frame_type); 506 507#define lsquic_stream_is_closed(s) \ 508 (((s)->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE)) \ 509 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE)) 510int 511lsquic_stream_update_sfcw (lsquic_stream_t *, uint64_t max_off); 512 513int 514lsquic_stream_set_priority_internal (lsquic_stream_t *, unsigned priority); 515 516#define lsquic_stream_is_critical(s) ((s)->sm_bflags & SMBF_CRITICAL) 517 518#define lsquic_stream_is_crypto(s) ((s)->sm_bflags & SMBF_CRYPTO) 519 520size_t 521lsquic_stream_mem_used (const struct lsquic_stream *); 522 523const lsquic_cid_t * 524lsquic_stream_cid (const struct lsquic_stream *); 525 526#define lsquic_stream_has_data_to_flush(stream) ((stream)->sm_n_buffered > 0) 527 528int 529lsquic_stream_readable (struct lsquic_stream *); 530 531size_t 532lsquic_stream_write_avail (struct lsquic_stream *); 533 534void 535lsquic_stream_dump_state (const struct lsquic_stream *); 536 537#ifndef NDEBUG 538size_t 539lsquic_stream_flush_threshold (const struct lsquic_stream *, unsigned); 540#endif 541 542#define crypto_level(stream) (~0ULL - (stream)->id) 543 544void 545lsquic_stream_set_stream_if (struct lsquic_stream *, 546 const struct lsquic_stream_if *, void *stream_if_ctx); 547 548uint64_t 549lsquic_stream_combined_send_off (const struct lsquic_stream *); 550 551/* [draft-ietf-quic-transport-16] Section 3.1 */ 552enum stream_state_sending 553{ 554 SSS_READY, 555 SSS_SEND, 556 SSS_DATA_SENT, 557 SSS_RESET_SENT, 558 SSS_DATA_RECVD, 559 SSS_RESET_RECVD, 560}; 561 562extern const char *const lsquic_sss2str[]; 563 564enum stream_state_sending 565lsquic_stream_sending_state (const struct lsquic_stream *); 566 567/* [draft-ietf-quic-transport-16] Section 3.2 */ 568enum stream_state_receiving 569{ 570 SSR_RECV, 571 SSR_SIZE_KNOWN, 572 SSR_DATA_RECVD, 573 SSR_RESET_RECVD, 574 SSR_DATA_READ, 575 SSR_RESET_READ, 576}; 577 578extern const char *const lsquic_ssr2str[]; 579 580enum stream_state_receiving 581lsquic_stream_receiving_state (struct lsquic_stream *); 582 583uint64_t 584lsquic_stream_fc_recv_off_const (const struct lsquic_stream *); 585 586void 587lsquic_stream_max_stream_data_sent (struct lsquic_stream *); 588 589void 590lsquic_stream_qdec_unblocked (struct lsquic_stream *); 591 592int 593lsquic_stream_can_push (const struct lsquic_stream *); 594 595int 596lsquic_stream_push_promise (struct lsquic_stream *, struct push_promise *); 597 598void 599lsquic_stream_force_finish (struct lsquic_stream *); 600 601int 602lsquic_stream_header_is_pp (const struct lsquic_stream *); 603 604int 605lsquic_stream_header_is_trailer (const struct lsquic_stream *); 606 607int 608lsquic_stream_verify_len (struct lsquic_stream *, unsigned long long); 609 610#define lsquic_stream_is_blocked(stream_) ((stream_)->blocked_off && \ 611 (stream_)->blocked_off == (stream_)->max_send_off) 612 613#endif 614