lsquic_stream.c revision ad08470c
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <stddef.h> 28 29#include "lsquic.h" 30 31#include "lsquic_int_types.h" 32#include "lsquic_packet_common.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn_flow.h" 36#include "lsquic_rtt.h" 37#include "lsquic_sfcw.h" 38#include "lsquic_varint.h" 39#include "lsquic_hq.h" 40#include "lsquic_hash.h" 41#include "lsquic_stream.h" 42#include "lsquic_conn_public.h" 43#include "lsquic_util.h" 44#include "lsquic_mm.h" 45#include "lsquic_headers_stream.h" 46#include "lsquic_conn.h" 47#include "lsquic_data_in_if.h" 48#include "lsquic_parse.h" 49#include "lsquic_packet_out.h" 50#include "lsquic_engine_public.h" 51#include "lsquic_senhist.h" 52#include "lsquic_pacer.h" 53#include "lsquic_cubic.h" 54#include "lsquic_bw_sampler.h" 55#include "lsquic_minmax.h" 56#include "lsquic_bbr.h" 57#include "lsquic_send_ctl.h" 58#include "lsquic_headers.h" 59#include "lsquic_ev_log.h" 60#include "lsquic_enc_sess.h" 61#include "lsqpack.h" 62#include "lsquic_frab_list.h" 
63#include "lsquic_http1x_if.h" 64#include "lsquic_qdec_hdl.h" 65#include "lsquic_qenc_hdl.h" 66#include "lsquic_byteswap.h" 67#include "lsquic_ietf.h" 68#include "lsquic_push_promise.h" 69 70#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 71#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(stream->conn_pub->lconn) 72#define LSQUIC_LOG_STREAM_ID stream->id 73#include "lsquic_logger.h" 74 75#define MIN(a, b) ((a) < (b) ? (a) : (b)) 76 77static void 78drop_frames_in (lsquic_stream_t *stream); 79 80static void 81maybe_schedule_call_on_close (lsquic_stream_t *stream); 82 83static int 84stream_wantread (lsquic_stream_t *stream, int is_want); 85 86static int 87stream_wantwrite (lsquic_stream_t *stream, int is_want); 88 89static ssize_t 90stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 91 92static ssize_t 93save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 94 95static int 96stream_flush (lsquic_stream_t *stream); 97 98static int 99stream_flush_nocheck (lsquic_stream_t *stream); 100 101static void 102maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag); 103 104enum swtp_status { SWTP_OK, SWTP_STOP, SWTP_ERROR }; 105 106static enum swtp_status 107stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size); 108 109static enum swtp_status 110stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size); 111 112static enum swtp_status 113stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size); 114 115static size_t 116stream_write_avail_no_frames (struct lsquic_stream *); 117 118static size_t 119stream_write_avail_with_frames (struct lsquic_stream *); 120 121static size_t 122stream_write_avail_with_headers (struct lsquic_stream *); 123 124static int 125hq_filter_readable (struct lsquic_stream *stream); 126 127static void 128hq_decr_left (struct lsquic_stream *stream, size_t); 129 130static size_t 131hq_filter_df (struct lsquic_stream *stream, struct 
data_frame *data_frame); 132 133static int 134stream_readable_non_http (struct lsquic_stream *stream); 135 136static int 137stream_readable_http_gquic (struct lsquic_stream *stream); 138 139static int 140stream_readable_http_ietf (struct lsquic_stream *stream); 141 142static ssize_t 143stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz); 144 145static size_t 146active_hq_frame_sizes (const struct lsquic_stream *); 147 148static void 149on_write_dp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *); 150 151static void 152on_write_pp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *); 153 154static void 155stream_hq_frame_put (struct lsquic_stream *, struct stream_hq_frame *); 156 157static size_t 158stream_hq_frame_size (const struct stream_hq_frame *); 159 160const struct stream_filter_if hq_stream_filter_if = 161{ 162 .sfi_readable = hq_filter_readable, 163 .sfi_filter_df = hq_filter_df, 164 .sfi_decr_left = hq_decr_left, 165}; 166 167 168#if LSQUIC_KEEP_STREAM_HISTORY 169/* These values are printable ASCII characters for ease of printing the 170 * whole history in a single line of a log message. 171 * 172 * The list of events is not exhaustive: only most interesting events 173 * are recorded. 174 */ 175enum stream_history_event 176{ 177 SHE_EMPTY = '\0', /* Special entry. 
No init besides memset required */ 178 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 179 SHE_REACH_FIN = 'a', 180 SHE_BLOCKED_OUT = 'b', 181 SHE_CREATED = 'C', 182 SHE_FRAME_IN = 'd', 183 SHE_FRAME_OUT = 'D', 184 SHE_RESET = 'e', 185 SHE_WINDOW_UPDATE = 'E', 186 SHE_FIN_IN = 'f', 187 SHE_FINISHED = 'F', 188 SHE_GOAWAY_IN = 'g', 189 SHE_USER_WRITE_HEADER = 'h', 190 SHE_HEADERS_IN = 'H', 191 SHE_IF_SWITCH = 'i', 192 SHE_ONCLOSE_SCHED = 'l', 193 SHE_ONCLOSE_CALL = 'L', 194 SHE_ONNEW = 'N', 195 SHE_SET_PRIO = 'p', 196 SHE_USER_READ = 'r', 197 SHE_SHUTDOWN_READ = 'R', 198 SHE_RST_IN = 's', 199 SHE_SS_IN = 'S', 200 SHE_RST_OUT = 't', 201 SHE_RST_ACKED = 'T', 202 SHE_FLUSH = 'u', 203 SHE_USER_WRITE_DATA = 'w', 204 SHE_SHUTDOWN_WRITE = 'W', 205 SHE_CLOSE = 'X', 206 SHE_DELAY_SW = 'y', 207 SHE_FORCE_FINISH = 'Z', 208 SHE_WANTREAD_NO = '0', /* "YES" must be one more than "NO" */ 209 SHE_WANTREAD_YES = '1', 210 SHE_WANTWRITE_NO = '2', 211 SHE_WANTWRITE_YES = '3', 212}; 213 214static void 215sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 216{ 217 enum stream_history_event prev_event; 218 sm_hist_idx_t idx; 219 int plus; 220 221 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 222 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 223 idx = (idx - plus) & SM_HIST_IDX_MASK; 224 prev_event = stream->sm_hist_buf[idx]; 225 226 if (prev_event == sh_event && plus) 227 return; 228 229 if (prev_event == sh_event) 230 sh_event = SHE_PLUS; 231 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 232 233 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 234 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 235 stream->sm_hist_buf); 236} 237 238 239# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 240# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 241 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 242 LSQ_DEBUG("history: [%.*s]", \ 243 (int) ((stream)->sm_hist_idx & 
SM_HIST_IDX_MASK), \ 244 (stream)->sm_hist_buf); \ 245 } while (0) 246#else 247# define SM_HISTORY_APPEND(stream, event) 248# define SM_HISTORY_DUMP_REMAINING(stream) 249#endif 250 251 252static int 253stream_inside_callback (const lsquic_stream_t *stream) 254{ 255 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 256} 257 258 259static void 260maybe_conn_to_tickable (lsquic_stream_t *stream) 261{ 262 if (!stream_inside_callback(stream)) 263 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 264 stream->conn_pub->lconn); 265} 266 267 268/* Here, "readable" means that the user is able to read from the stream. */ 269static void 270maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 271{ 272 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 273 { 274 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 275 stream->conn_pub->lconn); 276 } 277} 278 279 280/* Here, "writeable" means that data can be put into packets to be 281 * scheduled to be sent out. 282 * 283 * If `check_can_send' is false, it means that we do not need to check 284 * whether packets can be sent. This check was already performed when 285 * we packetized stream data. 286 */ 287static void 288maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream, 289 int check_can_send) 290{ 291 if (!stream_inside_callback(stream) && 292 (!check_can_send 293 || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) && 294 ! 
lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 295 { 296 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 297 stream->conn_pub->lconn); 298 } 299} 300 301 302static int 303stream_stalled (const lsquic_stream_t *stream) 304{ 305 return 0 == (stream->sm_qflags & (SMQF_WANT_WRITE|SMQF_WANT_READ)) && 306 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 307 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 308} 309 310 311static size_t 312stream_stream_frame_header_sz (const struct lsquic_stream *stream, 313 unsigned data_sz) 314{ 315 return stream->conn_pub->lconn->cn_pf->pf_calc_stream_frame_header_sz( 316 stream->id, stream->tosend_off, data_sz); 317} 318 319 320static size_t 321stream_crypto_frame_header_sz (const struct lsquic_stream *stream, 322 unsigned data_sz_IGNORED) 323{ 324 return stream->conn_pub->lconn->cn_pf 325 ->pf_calc_crypto_frame_header_sz(stream->tosend_off); 326} 327 328 329/* GQUIC-only function */ 330static int 331stream_is_hsk (const struct lsquic_stream *stream) 332{ 333 if (stream->sm_bflags & SMBF_IETF) 334 return 0; 335 else 336 return stream->id == LSQUIC_GQUIC_STREAM_HANDSHAKE; 337} 338 339 340static struct lsquic_stream * 341stream_new_common (lsquic_stream_id_t id, struct lsquic_conn_public *conn_pub, 342 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 343 enum stream_ctor_flags ctor_flags) 344{ 345 struct lsquic_stream *stream; 346 347 stream = calloc(1, sizeof(*stream)); 348 if (!stream) 349 return NULL; 350 351 if (ctor_flags & SCF_USE_DI_HASH) 352 stream->data_in = data_in_hash_new(conn_pub, id, 0); 353 else 354 stream->data_in = data_in_nocopy_new(conn_pub, id); 355 if (!stream->data_in) 356 { 357 free(stream); 358 return NULL; 359 } 360 361 stream->id = id; 362 stream->stream_if = stream_if; 363 stream->conn_pub = conn_pub; 364 stream->sm_onnew_arg = stream_if_ctx; 365 stream->sm_write_avail = stream_write_avail_no_frames; 366 367 STAILQ_INIT(&stream->sm_hq_frames); 368 369 
stream->sm_bflags |= ctor_flags & ((1 << (N_SMBF_FLAGS - 1)) - 1); 370 if (conn_pub->lconn->cn_flags & LSCONN_SERVER) 371 stream->sm_bflags |= SMBF_SERVER; 372 373 return stream; 374} 375 376 377/* TODO: The logic to figure out whether the stream is connection limited 378 * should be taken out of the constructor. The caller should specify this 379 * via one of enum stream_ctor_flags. 380 */ 381lsquic_stream_t * 382lsquic_stream_new (lsquic_stream_id_t id, 383 struct lsquic_conn_public *conn_pub, 384 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 385 unsigned initial_window, uint64_t initial_send_off, 386 enum stream_ctor_flags ctor_flags) 387{ 388 lsquic_cfcw_t *cfcw; 389 lsquic_stream_t *stream; 390 391 stream = stream_new_common(id, conn_pub, stream_if, stream_if_ctx, 392 ctor_flags); 393 if (!stream) 394 return NULL; 395 396 if (!initial_window) 397 initial_window = 16 * 1024; 398 399 if (ctor_flags & SCF_IETF) 400 { 401 cfcw = &conn_pub->cfcw; 402 stream->sm_bflags |= SMBF_CONN_LIMITED; 403 if (ctor_flags & SCF_HTTP) 404 { 405 stream->sm_write_avail = stream_write_avail_with_headers; 406 stream->sm_readable = stream_readable_http_ietf; 407 stream->sm_sfi = &hq_stream_filter_if; 408 } 409 else 410 stream->sm_readable = stream_readable_non_http; 411 lsquic_stream_set_priority_internal(stream, 412 LSQUIC_STREAM_DEFAULT_PRIO); 413 stream->sm_write_to_packet = stream_write_to_packet_std; 414 } 415 else 416 { 417 if (lsquic_stream_id_is_critical(ctor_flags & SCF_HTTP, id)) 418 cfcw = NULL; 419 else 420 { 421 cfcw = &conn_pub->cfcw; 422 stream->sm_bflags |= SMBF_CONN_LIMITED; 423 lsquic_stream_set_priority_internal(stream, 424 LSQUIC_STREAM_DEFAULT_PRIO); 425 } 426 if (stream->sm_bflags & SMBF_USE_HEADERS) 427 stream->sm_readable = stream_readable_http_gquic; 428 else 429 stream->sm_readable = stream_readable_non_http; 430 if (stream_is_hsk(stream)) 431 stream->sm_write_to_packet = stream_write_to_packet_hsk; 432 else 433 stream->sm_write_to_packet = 
stream_write_to_packet_std; 434 } 435 436 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 437 stream->max_send_off = initial_send_off; 438 LSQ_DEBUG("created stream"); 439 SM_HISTORY_APPEND(stream, SHE_CREATED); 440 stream->sm_frame_header_sz = stream_stream_frame_header_sz; 441 if (ctor_flags & SCF_CALL_ON_NEW) 442 lsquic_stream_call_on_new(stream); 443 return stream; 444} 445 446 447struct lsquic_stream * 448lsquic_stream_new_crypto (enum enc_level enc_level, 449 struct lsquic_conn_public *conn_pub, 450 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 451 enum stream_ctor_flags ctor_flags) 452{ 453 struct lsquic_stream *stream; 454 lsquic_stream_id_t stream_id; 455 456 assert(ctor_flags & SCF_CRITICAL); 457 458 stream_id = ~0ULL - enc_level; 459 stream = stream_new_common(stream_id, conn_pub, stream_if, 460 stream_if_ctx, ctor_flags); 461 if (!stream) 462 return NULL; 463 464 stream->sm_bflags |= SMBF_CRYPTO|SMBF_IETF; 465 stream->sm_enc_level = enc_level; 466 /* TODO: why have limit in crypto stream? Set it to UINT64_MAX? 
*/ 467 lsquic_sfcw_init(&stream->fc, 16 * 1024, NULL, conn_pub, stream_id); 468 stream->max_send_off = 16 * 1024; 469 LSQ_DEBUG("created crypto stream"); 470 SM_HISTORY_APPEND(stream, SHE_CREATED); 471 stream->sm_frame_header_sz = stream_crypto_frame_header_sz; 472 stream->sm_write_to_packet = stream_write_to_packet_crypto; 473 stream->sm_readable = stream_readable_non_http; 474 if (ctor_flags & SCF_CALL_ON_NEW) 475 lsquic_stream_call_on_new(stream); 476 return stream; 477} 478 479 480void 481lsquic_stream_call_on_new (lsquic_stream_t *stream) 482{ 483 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 484 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 485 { 486 LSQ_DEBUG("calling on_new_stream"); 487 SM_HISTORY_APPEND(stream, SHE_ONNEW); 488 stream->stream_flags |= STREAM_ONNEW_DONE; 489 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 490 stream); 491 } 492} 493 494 495static void 496decr_conn_cap (struct lsquic_stream *stream, size_t incr) 497{ 498 if (stream->sm_bflags & SMBF_CONN_LIMITED) 499 { 500 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 501 stream->conn_pub->conn_cap.cc_sent -= incr; 502 } 503} 504 505 506static void 507maybe_resize_stream_buffer (struct lsquic_stream *stream) 508{ 509 assert(0 == stream->sm_n_buffered); 510 511 if (stream->sm_n_allocated < stream->conn_pub->path->np_pack_size) 512 { 513 free(stream->sm_buf); 514 stream->sm_buf = NULL; 515 stream->sm_n_allocated = 0; 516 } 517 else if (stream->sm_n_allocated > stream->conn_pub->path->np_pack_size) 518 stream->sm_n_allocated = stream->conn_pub->path->np_pack_size; 519} 520 521 522static void 523drop_buffered_data (struct lsquic_stream *stream) 524{ 525 decr_conn_cap(stream, stream->sm_n_buffered); 526 stream->sm_n_buffered = 0; 527 maybe_resize_stream_buffer(stream); 528 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 529 maybe_remove_from_write_q(stream, SMQF_WRITE_Q_FLAGS); 530} 531 532 533static void 534destroy_uh (struct lsquic_stream *stream) 535{ 536 if 
(stream->uh) 537 { 538 if (stream->uh->uh_hset) 539 stream->conn_pub->enpub->enp_hsi_if 540 ->hsi_discard_header_set(stream->uh->uh_hset); 541 free(stream->uh); 542 stream->uh = NULL; 543 } 544} 545 546 547void 548lsquic_stream_destroy (lsquic_stream_t *stream) 549{ 550 struct push_promise *promise; 551 struct stream_hq_frame *shf; 552 553 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 554 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 555 STREAM_ONNEW_DONE) 556 { 557 stream->stream_flags |= STREAM_ONCLOSE_DONE; 558 stream->stream_if->on_close(stream, stream->st_ctx); 559 } 560 if (stream->sm_qflags & SMQF_SENDING_FLAGS) 561 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 562 if (stream->sm_qflags & SMQF_WANT_READ) 563 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 564 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 565 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 566 if (stream->sm_qflags & SMQF_SERVICE_FLAGS) 567 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 568 if (stream->sm_qflags & SMQF_QPACK_DEC) 569 lsquic_qdh_unref_stream(stream->conn_pub->u.ietf.qdh, stream); 570 drop_buffered_data(stream); 571 lsquic_sfcw_consume_rem(&stream->fc); 572 drop_frames_in(stream); 573 if (stream->push_req) 574 { 575 if (stream->push_req->uh_hset) 576 stream->conn_pub->enpub->enp_hsi_if 577 ->hsi_discard_header_set(stream->push_req->uh_hset); 578 free(stream->push_req); 579 } 580 while ((promise = SLIST_FIRST(&stream->sm_promises))) 581 { 582 SLIST_REMOVE_HEAD(&stream->sm_promises, pp_next); 583 lsquic_pp_put(promise, stream->conn_pub->u.ietf.promises); 584 } 585 if (stream->sm_promise) 586 { 587 assert(stream->sm_promise->pp_pushed_stream == stream); 588 stream->sm_promise->pp_pushed_stream = NULL; 589 lsquic_pp_put(stream->sm_promise, stream->conn_pub->u.ietf.promises); 590 } 591 while ((shf = 
STAILQ_FIRST(&stream->sm_hq_frames))) 592 stream_hq_frame_put(stream, shf); 593 destroy_uh(stream); 594 free(stream->sm_buf); 595 free(stream->sm_header_block); 596 LSQ_DEBUG("destroyed stream"); 597 SM_HISTORY_DUMP_REMAINING(stream); 598 free(stream); 599} 600 601 602static int 603stream_is_finished (const lsquic_stream_t *stream) 604{ 605 return lsquic_stream_is_closed(stream) 606 /* n_unacked checks that no outgoing packets that reference this 607 * stream are outstanding: 608 */ 609 && 0 == stream->n_unacked 610 /* This checks that no packets that reference this stream will 611 * become outstanding: 612 */ 613 && 0 == (stream->sm_qflags & SMQF_SEND_RST) 614 && ((stream->stream_flags & STREAM_FORCE_FINISH) 615 || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))); 616} 617 618 619/* This is an internal function */ 620void 621lsquic_stream_force_finish (struct lsquic_stream *stream) 622{ 623 LSQ_DEBUG("stream is now finished"); 624 SM_HISTORY_APPEND(stream, SHE_FINISHED); 625 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 626 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 627 next_service_stream); 628 stream->sm_qflags |= SMQF_FREE_STREAM; 629 stream->stream_flags |= STREAM_FINISHED; 630} 631 632 633static void 634maybe_finish_stream (lsquic_stream_t *stream) 635{ 636 if (0 == (stream->stream_flags & STREAM_FINISHED) && 637 stream_is_finished(stream)) 638 lsquic_stream_force_finish(stream); 639} 640 641 642static void 643maybe_schedule_call_on_close (lsquic_stream_t *stream) 644{ 645 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 646 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) 647 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE) 648 && !(stream->sm_qflags & SMQF_CALL_ONCLOSE)) 649 { 650 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 651 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 652 next_service_stream); 653 stream->sm_qflags |= SMQF_CALL_ONCLOSE; 654 LSQ_DEBUG("scheduled calling 
on_close"); 655 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 656 } 657} 658 659 660void 661lsquic_stream_call_on_close (lsquic_stream_t *stream) 662{ 663 assert(stream->stream_flags & STREAM_ONNEW_DONE); 664 stream->sm_qflags &= ~SMQF_CALL_ONCLOSE; 665 if (!(stream->sm_qflags & SMQF_SERVICE_FLAGS)) 666 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 667 next_service_stream); 668 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 669 { 670 LSQ_DEBUG("calling on_close"); 671 stream->stream_flags |= STREAM_ONCLOSE_DONE; 672 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 673 stream->stream_if->on_close(stream, stream->st_ctx); 674 } 675 else 676 assert(0); 677} 678 679 680static int 681stream_readable_non_http (struct lsquic_stream *stream) 682{ 683 return stream->data_in->di_if->di_get_frame(stream->data_in, 684 stream->read_offset) != NULL; 685} 686 687 688static int 689stream_readable_http_gquic (struct lsquic_stream *stream) 690{ 691 return (stream->stream_flags & STREAM_HAVE_UH) 692 && (stream->uh 693 || stream->data_in->di_if->di_get_frame(stream->data_in, 694 stream->read_offset)); 695} 696 697 698static int 699stream_readable_http_ietf (struct lsquic_stream *stream) 700{ 701 return 702 /* If we have read the header set and the header set has not yet 703 * been read, the stream is readable. 704 */ 705 ((stream->stream_flags & STREAM_HAVE_UH) && stream->uh) 706 || 707 /* Alternatively, run the filter and check for payload availability. */ 708 (stream->sm_sfi->sfi_readable(stream) 709 && (/* Running the filter may result in hitting FIN: */ 710 (stream->stream_flags & STREAM_FIN_REACHED) 711 || stream->data_in->di_if->di_get_frame(stream->data_in, 712 stream->read_offset))); 713} 714 715 716int 717lsquic_stream_readable (struct lsquic_stream *stream) 718{ 719 /* A stream is readable if one of the following is true: */ 720 return 721 /* - It is already finished: in that case, lsquic_stream_read() will 722 * return 0. 
723 */ 724 (stream->stream_flags & STREAM_FIN_REACHED) 725 /* - The stream is reset, by either side. In this case, 726 * lsquic_stream_read() will return -1 (we want the user to be 727 * able to collect the error). 728 */ 729 || lsquic_stream_is_reset(stream) 730 /* Type-dependent readability check: */ 731 || stream->sm_readable(stream); 732 ; 733} 734 735 736static size_t 737stream_write_avail_no_frames (struct lsquic_stream *stream) 738{ 739 uint64_t stream_avail, conn_avail; 740 741 stream_avail = stream->max_send_off - stream->tosend_off 742 - stream->sm_n_buffered; 743 744 if (stream->sm_bflags & SMBF_CONN_LIMITED) 745 { 746 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 747 if (conn_avail < stream_avail) 748 stream_avail = conn_avail; 749 } 750 751 return stream_avail; 752} 753 754 755static size_t 756stream_write_avail_with_frames (struct lsquic_stream *stream) 757{ 758 uint64_t stream_avail, conn_avail; 759 const struct stream_hq_frame *shf; 760 size_t size; 761 762 stream_avail = stream->max_send_off - stream->tosend_off 763 - stream->sm_n_buffered; 764 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 765 if (!(shf->shf_flags & SHF_WRITTEN)) 766 { 767 size = stream_hq_frame_size(shf); 768 assert(size <= stream_avail); 769 stream_avail -= size; 770 } 771 772 if (stream->sm_bflags & SMBF_CONN_LIMITED) 773 { 774 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 775 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 776 if (!(shf->shf_flags & SHF_CC_PAID)) 777 { 778 size = stream_hq_frame_size(shf); 779 if (size < conn_avail) 780 conn_avail -= size; 781 else 782 return 0; 783 } 784 if (conn_avail < stream_avail) 785 stream_avail = conn_avail; 786 } 787 788 if (stream_avail >= 3 /* Smallest new frame */) 789 return stream_avail; 790 else 791 return 0; 792} 793 794 795static int 796stream_is_pushing_promise (const struct lsquic_stream *stream) 797{ 798 return (stream->stream_flags & STREAM_PUSHING) 799 && 
SLIST_FIRST(&stream->sm_promises) 800 && (SLIST_FIRST(&stream->sm_promises))->pp_write_state != PPWS_DONE 801 ; 802} 803 804 805/* To prevent deadlocks, ensure that when headers are sent, the bytes 806 * sent on the encoder stream are written first. 807 * 808 * XXX If the encoder is set up in non-risking mode, it is perfectly 809 * fine to send the header block first. TODO: update the logic to 810 * reflect this. There should be two sending behaviors: risk and non-risk. 811 * For now, we assume risk for everything to be on the conservative side. 812 */ 813static size_t 814stream_write_avail_with_headers (struct lsquic_stream *stream) 815{ 816 if (stream->stream_flags & STREAM_PUSHING) 817 return stream_write_avail_with_frames(stream); 818 819 switch (stream->sm_send_headers_state) 820 { 821 case SSHS_BEGIN: 822 return lsquic_qeh_write_avail(stream->conn_pub->u.ietf.qeh); 823 case SSHS_ENC_SENDING: 824 if (stream->sm_hb_compl > 825 lsquic_qeh_enc_off(stream->conn_pub->u.ietf.qeh)) 826 return 0; 827 LSQ_DEBUG("encoder stream bytes have all been sent"); 828 stream->sm_send_headers_state = SSHS_HBLOCK_SENDING; 829 /* fall-through */ 830 default: 831 assert(SSHS_HBLOCK_SENDING == stream->sm_send_headers_state); 832 return stream_write_avail_with_frames(stream); 833 } 834} 835 836 837size_t 838lsquic_stream_write_avail (struct lsquic_stream *stream) 839{ 840 return stream->sm_write_avail(stream); 841} 842 843 844int 845lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 846{ 847 struct lsquic_conn *lconn; 848 849 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 850 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 851 { 852 if (stream->sm_bflags & SMBF_IETF) 853 { 854 lconn = stream->conn_pub->lconn; 855 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FLOW_CONTROL_ERROR, 856 "flow control violation on stream %"PRIu64, stream->id); 857 } 858 return -1; 859 } 860 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 861 { 862 if (!(stream->sm_qflags & 
SMQF_SENDING_FLAGS)) 863 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 864 next_send_stream); 865 stream->sm_qflags |= SMQF_SEND_WUF; 866 } 867 return 0; 868} 869 870 871int 872lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 873{ 874 uint64_t max_off; 875 int got_next_offset, rv, free_frame; 876 enum ins_frame ins_frame; 877 struct lsquic_conn *lconn; 878 879 assert(frame->packet_in); 880 881 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 882 LSQ_DEBUG("received stream frame, offset 0x%"PRIX64", len %u; " 883 "fin: %d", frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 884 885 if ((stream->sm_bflags & SMBF_USE_HEADERS) 886 && (stream->stream_flags & STREAM_HEAD_IN_FIN)) 887 { 888 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 889 lsquic_malo_put(frame); 890 return -1; 891 } 892 893 if (frame->data_frame.df_fin && (stream->sm_bflags & SMBF_IETF) 894 && (stream->stream_flags & STREAM_FIN_RECVD) 895 && stream->sm_fin_off != DF_END(frame)) 896 { 897 lconn = stream->conn_pub->lconn; 898 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR, 899 "new final size %"PRIu64" from STREAM frame (id: %"PRIu64") does " 900 "not match previous final size %"PRIu64, DF_END(frame), 901 stream->id, stream->sm_fin_off); 902 return -1; 903 } 904 905 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 906 insert_frame: 907 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 908 if (INS_FRAME_OK == ins_frame) 909 { 910 /* Update maximum offset in the flow controller and check for flow 911 * control violation: 912 */ 913 rv = -1; 914 free_frame = !stream->data_in->di_if->di_own_on_ok; 915 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 916 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 917 goto end_ok; 918 if (frame->data_frame.df_fin) 919 { 920 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 921 stream->stream_flags |= 
STREAM_FIN_RECVD; 922 stream->sm_fin_off = DF_END(frame); 923 maybe_finish_stream(stream); 924 } 925 if ((stream->sm_bflags & SMBF_AUTOSWITCH) && 926 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 927 { 928 stream->data_in = stream->data_in->di_if->di_switch_impl( 929 stream->data_in, stream->read_offset); 930 if (!stream->data_in) 931 { 932 stream->data_in = data_in_error_new(); 933 goto end_ok; 934 } 935 } 936 if (got_next_offset) 937 /* Checking the offset saves di_get_frame() call */ 938 maybe_conn_to_tickable_if_readable(stream); 939 rv = 0; 940 end_ok: 941 if (free_frame) 942 lsquic_malo_put(frame); 943 return rv; 944 } 945 else if (INS_FRAME_DUP == ins_frame) 946 { 947 return 0; 948 } 949 else if (INS_FRAME_OVERLAP == ins_frame) 950 { 951 LSQ_DEBUG("overlap: switching DATA IN implementation"); 952 stream->data_in = stream->data_in->di_if->di_switch_impl( 953 stream->data_in, stream->read_offset); 954 if (stream->data_in) 955 goto insert_frame; 956 stream->data_in = data_in_error_new(); 957 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 958 lsquic_malo_put(frame); 959 return -1; 960 } 961 else 962 { 963 assert(INS_FRAME_ERR == ins_frame); 964 return -1; 965 } 966} 967 968 969static void 970drop_frames_in (lsquic_stream_t *stream) 971{ 972 if (stream->data_in) 973 { 974 stream->data_in->di_if->di_destroy(stream->data_in); 975 /* To avoid checking whether `data_in` is set, just set to the error 976 * data-in stream. It does the right thing after incoming data is 977 * dropped. 
978 */ 979 stream->data_in = data_in_error_new(); 980 } 981} 982 983 984static void 985maybe_elide_stream_frames (struct lsquic_stream *stream) 986{ 987 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 988 { 989 if (stream->n_unacked) 990 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 991 stream->id); 992 stream->stream_flags |= STREAM_FRAMES_ELIDED; 993 } 994} 995 996 997int 998lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 999 uint64_t error_code) 1000{ 1001 struct lsquic_conn *lconn; 1002 1003 if ((stream->sm_bflags & SMBF_IETF) 1004 && (stream->stream_flags & STREAM_FIN_RECVD) 1005 && stream->sm_fin_off != offset) 1006 { 1007 lconn = stream->conn_pub->lconn; 1008 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR, 1009 "final size %"PRIu64" from RESET_STREAM frame (id: %"PRIu64") " 1010 "does not match previous final size %"PRIu64, offset, 1011 stream->id, stream->sm_fin_off); 1012 return -1; 1013 } 1014 1015 if (stream->stream_flags & STREAM_RST_RECVD) 1016 { 1017 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 1018 return 0; 1019 } 1020 1021 SM_HISTORY_APPEND(stream, SHE_RST_IN); 1022 /* This flag must always be set, even if we are "ignoring" it: it is 1023 * used by elision code. 
1024 */ 1025 stream->stream_flags |= STREAM_RST_RECVD; 1026 1027 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 1028 { 1029 LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64" is " 1030 "smaller than that of byte following the last byte we have seen: " 1031 "0x%"PRIX64, offset, 1032 lsquic_sfcw_get_max_recv_off(&stream->fc)); 1033 return -1; 1034 } 1035 1036 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 1037 { 1038 LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64 1039 " violates flow control", offset); 1040 return -1; 1041 } 1042 1043 /* Let user collect error: */ 1044 maybe_conn_to_tickable_if_readable(stream); 1045 1046 lsquic_sfcw_consume_rem(&stream->fc); 1047 drop_frames_in(stream); 1048 drop_buffered_data(stream); 1049 maybe_elide_stream_frames(stream); 1050 1051 if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT)) 1052 && !(stream->sm_qflags & SMQF_SEND_RST)) 1053 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 1054 1055 stream->stream_flags |= STREAM_RST_RECVD; 1056 1057 maybe_finish_stream(stream); 1058 maybe_schedule_call_on_close(stream); 1059 1060 return 0; 1061} 1062 1063 1064void 1065lsquic_stream_stop_sending_in (struct lsquic_stream *stream, 1066 uint64_t error_code) 1067{ 1068 if (stream->stream_flags & STREAM_SS_RECVD) 1069 { 1070 LSQ_DEBUG("ignore duplicate STOP_SENDING frame"); 1071 return; 1072 } 1073 1074 SM_HISTORY_APPEND(stream, SHE_SS_IN); 1075 stream->stream_flags |= STREAM_SS_RECVD; 1076 1077 /* Let user collect error: */ 1078 maybe_conn_to_tickable_if_readable(stream); 1079 1080 lsquic_sfcw_consume_rem(&stream->fc); 1081 drop_frames_in(stream); 1082 drop_buffered_data(stream); 1083 maybe_elide_stream_frames(stream); 1084 1085 if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT)) 1086 && !(stream->sm_qflags & SMQF_SEND_RST)) 1087 lsquic_stream_reset_ext(stream, error_code, 0); 1088 1089 maybe_finish_stream(stream); 1090 maybe_schedule_call_on_close(stream); 1091} 1092 1093 
1094uint64_t 1095lsquic_stream_fc_recv_off_const (const struct lsquic_stream *stream) 1096{ 1097 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 1098} 1099 1100 1101void 1102lsquic_stream_max_stream_data_sent (struct lsquic_stream *stream) 1103{ 1104 assert(stream->sm_qflags & SMQF_SEND_MAX_STREAM_DATA); 1105 stream->sm_qflags &= ~SMQF_SEND_MAX_STREAM_DATA; 1106 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1107 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1108 stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc); 1109} 1110 1111 1112uint64_t 1113lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 1114{ 1115 assert(stream->sm_qflags & SMQF_SEND_WUF); 1116 stream->sm_qflags &= ~SMQF_SEND_WUF; 1117 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1118 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1119 return stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc); 1120} 1121 1122 1123void 1124lsquic_stream_peer_blocked (struct lsquic_stream *stream, uint64_t peer_off) 1125{ 1126 uint64_t last_off; 1127 1128 if (stream->sm_last_recv_off) 1129 last_off = stream->sm_last_recv_off; 1130 else 1131 /* This gets advertized in transport parameters */ 1132 last_off = lsquic_sfcw_get_max_recv_off(&stream->fc); 1133 1134 LSQ_DEBUG("Peer blocked at %"PRIu64", while the last MAX_STREAM_DATA " 1135 "frame we sent advertized the limit of %"PRIu64, peer_off, last_off); 1136 1137 if (peer_off > last_off && !(stream->sm_qflags & SMQF_SEND_WUF)) 1138 { 1139 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1140 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1141 next_send_stream); 1142 stream->sm_qflags |= SMQF_SEND_WUF; 1143 LSQ_DEBUG("marked to send MAX_STREAM_DATA frame"); 1144 } 1145 else if (stream->sm_qflags & SMQF_SEND_WUF) 1146 LSQ_DEBUG("MAX_STREAM_DATA frame is already scheduled"); 1147 else if (stream->sm_last_recv_off) 1148 LSQ_DEBUG("MAX_STREAM_DATA(%"PRIu64") has already 
been either " 1149 "packetized or sent", stream->sm_last_recv_off); 1150 else 1151 LSQ_INFO("Peer should have receive transport param limit " 1152 "of %"PRIu64"; odd.", last_off); 1153} 1154 1155 1156/* GQUIC's BLOCKED frame does not have an offset */ 1157void 1158lsquic_stream_peer_blocked_gquic (struct lsquic_stream *stream) 1159{ 1160 LSQ_DEBUG("Peer blocked: schedule another WINDOW_UPDATE frame"); 1161 if (!(stream->sm_qflags & SMQF_SEND_WUF)) 1162 { 1163 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1164 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1165 next_send_stream); 1166 stream->sm_qflags |= SMQF_SEND_WUF; 1167 LSQ_DEBUG("marked to send MAX_STREAM_DATA frame"); 1168 } 1169 else 1170 LSQ_DEBUG("WINDOW_UPDATE frame is already scheduled"); 1171} 1172 1173 1174void 1175lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 1176{ 1177 assert(stream->sm_qflags & SMQF_SEND_BLOCKED); 1178 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 1179 stream->sm_qflags &= ~SMQF_SEND_BLOCKED; 1180 stream->stream_flags |= STREAM_BLOCKED_SENT; 1181 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1182 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1183} 1184 1185 1186void 1187lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 1188{ 1189 assert(stream->sm_qflags & SMQF_SEND_RST); 1190 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 1191 stream->sm_qflags &= ~SMQF_SEND_RST; 1192 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1193 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1194 stream->stream_flags |= STREAM_RST_SENT; 1195 maybe_finish_stream(stream); 1196} 1197 1198 1199static size_t 1200read_uh (struct lsquic_stream *stream, 1201 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1202{ 1203 struct http1x_headers *const h1h = stream->uh->uh_hset; 1204 size_t nread; 1205 1206 nread = readf(ctx, (unsigned char *) h1h->h1h_buf + h1h->h1h_off, 1207 h1h->h1h_size - h1h->h1h_off, 1208 
(stream->stream_flags & STREAM_HEAD_IN_FIN) > 0); 1209 h1h->h1h_off += nread; 1210 if (h1h->h1h_off == h1h->h1h_size) 1211 { 1212 LSQ_DEBUG("read all uncompressed headers"); 1213 destroy_uh(stream); 1214 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 1215 { 1216 stream->stream_flags |= STREAM_FIN_REACHED; 1217 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 1218 } 1219 } 1220 return nread; 1221} 1222 1223 1224static void 1225stream_consumed_bytes (struct lsquic_stream *stream) 1226{ 1227 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 1228 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 1229 { 1230 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1231 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1232 next_send_stream); 1233 stream->sm_qflags |= SMQF_SEND_WUF; 1234 maybe_conn_to_tickable_if_writeable(stream, 1); 1235 } 1236} 1237 1238 1239struct read_frames_status 1240{ 1241 int error; 1242 int processed_frames; 1243 size_t total_nread; 1244}; 1245 1246 1247static struct read_frames_status 1248read_data_frames (struct lsquic_stream *stream, int do_filtering, 1249 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1250{ 1251 struct data_frame *data_frame; 1252 size_t nread, toread, total_nread; 1253 int short_read, processed_frames; 1254 1255 processed_frames = 0; 1256 total_nread = 0; 1257 1258 while ((data_frame = stream->data_in->di_if->di_get_frame( 1259 stream->data_in, stream->read_offset))) 1260 { 1261 1262 ++processed_frames; 1263 1264 do 1265 { 1266 if (do_filtering && stream->sm_sfi) 1267 toread = stream->sm_sfi->sfi_filter_df(stream, data_frame); 1268 else 1269 toread = data_frame->df_size - data_frame->df_read_off; 1270 1271 if (toread || data_frame->df_fin) 1272 { 1273 nread = readf(ctx, data_frame->df_data + data_frame->df_read_off, 1274 toread, data_frame->df_fin); 1275 if (do_filtering && stream->sm_sfi) 1276 stream->sm_sfi->sfi_decr_left(stream, nread); 1277 data_frame->df_read_off += nread; 1278 
stream->read_offset += nread; 1279 total_nread += nread; 1280 short_read = nread < toread; 1281 } 1282 else 1283 short_read = 0; 1284 1285 if (data_frame->df_read_off == data_frame->df_size) 1286 { 1287 const int fin = data_frame->df_fin; 1288 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 1289 data_frame = NULL; 1290 if ((stream->sm_bflags & SMBF_AUTOSWITCH) && 1291 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 1292 { 1293 stream->data_in = stream->data_in->di_if->di_switch_impl( 1294 stream->data_in, stream->read_offset); 1295 if (!stream->data_in) 1296 { 1297 stream->data_in = data_in_error_new(); 1298 return (struct read_frames_status) { .error = 1, }; 1299 } 1300 } 1301 if (fin) 1302 { 1303 stream->stream_flags |= STREAM_FIN_REACHED; 1304 goto end_while; 1305 } 1306 } 1307 else if (short_read) 1308 goto end_while; 1309 } 1310 while (data_frame); 1311 } 1312 end_while: 1313 1314 if (processed_frames) 1315 stream_consumed_bytes(stream); 1316 1317 return (struct read_frames_status) { 1318 .error = 0, 1319 .processed_frames = processed_frames, 1320 .total_nread = total_nread, 1321 }; 1322} 1323 1324 1325static ssize_t 1326stream_readf (struct lsquic_stream *stream, 1327 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1328{ 1329 size_t total_nread, nread; 1330 int read_unc_headers; 1331 1332 total_nread = 0; 1333 1334 if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF)) 1335 == (SMBF_USE_HEADERS|SMBF_IETF) 1336 && !(stream->stream_flags & STREAM_HAVE_UH) 1337 && !stream->uh) 1338 { 1339 if (stream->sm_readable(stream)) 1340 { 1341 if (stream->sm_hq_filter.hqfi_flags & HQFI_FLAG_ERROR) 1342 { 1343 LSQ_INFO("HQ filter hit an error: cannot read from stream"); 1344 errno = EBADMSG; 1345 return -1; 1346 } 1347 assert(stream->uh); 1348 } 1349 else 1350 { 1351 errno = EWOULDBLOCK; 1352 return -1; 1353 } 1354 } 1355 1356 if (stream->uh) 1357 { 1358 if (stream->uh->uh_flags & UH_H1H) 1359 { 1360 nread = read_uh(stream, readf, 
ctx); 1361 read_unc_headers = nread > 0; 1362 total_nread += nread; 1363 if (stream->uh) 1364 return total_nread; 1365 } 1366 else 1367 { 1368 LSQ_INFO("header set not claimed: cannot read from stream"); 1369 return -1; 1370 } 1371 } 1372 else if ((stream->sm_bflags & SMBF_USE_HEADERS) 1373 && !(stream->stream_flags & STREAM_HAVE_UH)) 1374 { 1375 LSQ_DEBUG("cannot read: headers not available"); 1376 errno = EWOULDBLOCK; 1377 return -1; 1378 } 1379 else 1380 read_unc_headers = 0; 1381 1382 const struct read_frames_status rfs 1383 = read_data_frames(stream, 1, readf, ctx); 1384 if (rfs.error) 1385 return -1; 1386 total_nread += rfs.total_nread; 1387 1388 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 1389 total_nread, stream->read_offset); 1390 1391 if (rfs.processed_frames || read_unc_headers) 1392 { 1393 return total_nread; 1394 } 1395 else 1396 { 1397 assert(0 == total_nread); 1398 errno = EWOULDBLOCK; 1399 return -1; 1400 } 1401} 1402 1403 1404/* This function returns 0 when EOF is reached. 
1405 */ 1406ssize_t 1407lsquic_stream_readf (struct lsquic_stream *stream, 1408 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1409{ 1410 SM_HISTORY_APPEND(stream, SHE_USER_READ); 1411 1412 if (lsquic_stream_is_reset(stream)) 1413 { 1414 if (stream->stream_flags & STREAM_RST_RECVD) 1415 stream->stream_flags |= STREAM_RST_READ; 1416 errno = ECONNRESET; 1417 return -1; 1418 } 1419 if (stream->stream_flags & STREAM_U_READ_DONE) 1420 { 1421 errno = EBADF; 1422 return -1; 1423 } 1424 if ((stream->stream_flags & STREAM_FIN_REACHED) 1425 && 0 == (!!(stream->stream_flags & STREAM_HAVE_UH) 1426 ^ !!(stream->sm_bflags & SMBF_USE_HEADERS))) 1427 return 0; 1428 1429 return stream_readf(stream, readf, ctx); 1430} 1431 1432 1433struct readv_ctx 1434{ 1435 const struct iovec *iov; 1436 const struct iovec *const end; 1437 unsigned char *p; 1438}; 1439 1440 1441static size_t 1442readv_f (void *ctx_p, const unsigned char *buf, size_t len, int fin) 1443{ 1444 struct readv_ctx *const ctx = ctx_p; 1445 const unsigned char *const end = buf + len; 1446 size_t ntocopy; 1447 1448 while (ctx->iov < ctx->end && buf < end) 1449 { 1450 ntocopy = (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len 1451 - ctx->p; 1452 if (ntocopy > (size_t) (end - buf)) 1453 ntocopy = end - buf; 1454 memcpy(ctx->p, buf, ntocopy); 1455 ctx->p += ntocopy; 1456 buf += ntocopy; 1457 if (ctx->p == (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len) 1458 { 1459 do 1460 ++ctx->iov; 1461 while (ctx->iov < ctx->end && ctx->iov->iov_len == 0); 1462 if (ctx->iov < ctx->end) 1463 ctx->p = ctx->iov->iov_base; 1464 else 1465 ctx->p = NULL; 1466 } 1467 } 1468 1469 return len - (end - buf); 1470} 1471 1472 1473ssize_t 1474lsquic_stream_readv (struct lsquic_stream *stream, const struct iovec *iov, 1475 int iovcnt) 1476{ 1477 struct readv_ctx ctx = { iov, iov + iovcnt, iov->iov_base, }; 1478 return lsquic_stream_readf(stream, readv_f, &ctx); 1479} 1480 1481 1482ssize_t 1483lsquic_stream_read 
(lsquic_stream_t *stream, void *buf, size_t len) 1484{ 1485 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 1486 return lsquic_stream_readv(stream, &iov, 1); 1487} 1488 1489 1490static void 1491stream_shutdown_read (lsquic_stream_t *stream) 1492{ 1493 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1494 { 1495 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 1496 stream->stream_flags |= STREAM_U_READ_DONE; 1497 stream_wantread(stream, 0); 1498 maybe_finish_stream(stream); 1499 } 1500} 1501 1502 1503static int 1504stream_is_incoming_unidir (const struct lsquic_stream *stream) 1505{ 1506 enum stream_id_type sit; 1507 1508 if (stream->sm_bflags & SMBF_IETF) 1509 { 1510 sit = stream->id & SIT_MASK; 1511 if (stream->sm_bflags & SMBF_SERVER) 1512 return sit == SIT_UNI_CLIENT; 1513 else 1514 return sit == SIT_UNI_SERVER; 1515 } 1516 else 1517 return 0; 1518} 1519 1520 1521static void 1522stream_shutdown_write (lsquic_stream_t *stream) 1523{ 1524 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1525 return; 1526 1527 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 1528 stream->stream_flags |= STREAM_U_WRITE_DONE; 1529 stream_wantwrite(stream, 0); 1530 1531 /* Don't bother to check whether there is anything else to write if 1532 * the flags indicate that nothing else should be written. 
1533 */ 1534 if (!(stream->sm_bflags & SMBF_CRYPTO) 1535 && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT)) 1536 && !stream_is_incoming_unidir(stream) 1537 && !(stream->sm_qflags & SMQF_SEND_RST)) 1538 { 1539 if (stream->sm_n_buffered == 0) 1540 { 1541 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 1542 stream)) 1543 { 1544 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 1545 stream->stream_flags |= STREAM_FIN_SENT; 1546 } 1547 else 1548 { 1549 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 1550 "flag in it"); 1551 (void) stream_flush_nocheck(stream); 1552 } 1553 } 1554 else 1555 (void) stream_flush_nocheck(stream); 1556 } 1557} 1558 1559 1560static void 1561maybe_stream_shutdown_write (struct lsquic_stream *stream) 1562{ 1563 if (stream->sm_send_headers_state == SSHS_BEGIN) 1564 stream_shutdown_write(stream); 1565 else if (0 == (stream->stream_flags & STREAM_DELAYED_SW)) 1566 { 1567 LSQ_DEBUG("shutdown delayed"); 1568 SM_HISTORY_APPEND(stream, SHE_DELAY_SW); 1569 stream->stream_flags |= STREAM_DELAYED_SW; 1570 } 1571} 1572 1573 1574int 1575lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 1576{ 1577 LSQ_DEBUG("shutdown; how: %d", how); 1578 if (lsquic_stream_is_closed(stream)) 1579 { 1580 LSQ_INFO("Attempt to shut down a closed stream"); 1581 errno = EBADF; 1582 return -1; 1583 } 1584 /* 0: read, 1: write: 2: read and write 1585 */ 1586 if (how < 0 || how > 2) 1587 { 1588 errno = EINVAL; 1589 return -1; 1590 } 1591 1592 if (how) 1593 maybe_stream_shutdown_write(stream); 1594 if (how != 1) 1595 stream_shutdown_read(stream); 1596 1597 maybe_finish_stream(stream); 1598 maybe_schedule_call_on_close(stream); 1599 if (how && !(stream->stream_flags & STREAM_DELAYED_SW)) 1600 maybe_conn_to_tickable_if_writeable(stream, 1); 1601 1602 return 0; 1603} 1604 1605 1606void 1607lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 1608{ 1609 LSQ_DEBUG("internal shutdown"); 1610 if 
(lsquic_stream_is_critical(stream)) 1611 { 1612 LSQ_DEBUG("add flag to force-finish special stream"); 1613 stream->stream_flags |= STREAM_FORCE_FINISH; 1614 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 1615 } 1616 maybe_finish_stream(stream); 1617 maybe_schedule_call_on_close(stream); 1618} 1619 1620 1621static void 1622fake_reset_unused_stream (lsquic_stream_t *stream) 1623{ 1624 stream->stream_flags |= 1625 STREAM_RST_RECVD /* User will pick this up on read or write */ 1626 | STREAM_RST_SENT /* Don't send anything else on this stream */ 1627 ; 1628 1629 /* Cancel all writes to the network scheduled for this stream: */ 1630 if (stream->sm_qflags & SMQF_SENDING_FLAGS) 1631 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 1632 next_send_stream); 1633 stream->sm_qflags &= ~SMQF_SENDING_FLAGS; 1634 drop_buffered_data(stream); 1635 LSQ_DEBUG("fake-reset stream%s", 1636 stream_stalled(stream) ? " (stalled)" : ""); 1637 maybe_finish_stream(stream); 1638 maybe_schedule_call_on_close(stream); 1639} 1640 1641 1642/* This function should only be called for locally-initiated streams whose ID 1643 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 1644 * frame sent by peer but we have not yet received it and created a stream. 1645 * In this situation, we mark the stream as reset, so that user's on_read or 1646 * on_write event callback picks up the error. That, in turn, should result 1647 * in stream being closed. 1648 * 1649 * If we have received any data frames on this stream, this probably indicates 1650 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 1651 * lower than this. However, we still try to handle it gracefully and peform 1652 * a shutdown, as if the stream was not reset. 
1653 */ 1654void 1655lsquic_stream_received_goaway (lsquic_stream_t *stream) 1656{ 1657 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 1658 if (0 == stream->read_offset && 1659 stream->data_in->di_if->di_empty(stream->data_in)) 1660 fake_reset_unused_stream(stream); /* Normal condition */ 1661 else 1662 { /* This is odd, let's handle it the best we can: */ 1663 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 1664 lsquic_stream_shutdown_internal(stream); 1665 } 1666} 1667 1668 1669uint64_t 1670lsquic_stream_read_offset (const lsquic_stream_t *stream) 1671{ 1672 return stream->read_offset; 1673} 1674 1675 1676static int 1677stream_wantread (lsquic_stream_t *stream, int is_want) 1678{ 1679 const int old_val = !!(stream->sm_qflags & SMQF_WANT_READ); 1680 const int new_val = !!is_want; 1681 if (old_val != new_val) 1682 { 1683 if (new_val) 1684 { 1685 if (!old_val) 1686 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 1687 next_read_stream); 1688 stream->sm_qflags |= SMQF_WANT_READ; 1689 } 1690 else 1691 { 1692 stream->sm_qflags &= ~SMQF_WANT_READ; 1693 if (old_val) 1694 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 1695 next_read_stream); 1696 } 1697 } 1698 return old_val; 1699} 1700 1701 1702static void 1703maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_q_flags flag) 1704{ 1705 assert(SMQF_WRITE_Q_FLAGS & flag); 1706 if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS)) 1707 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1708 next_write_stream); 1709 stream->sm_qflags |= flag; 1710} 1711 1712 1713static void 1714maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag) 1715{ 1716 assert(SMQF_WRITE_Q_FLAGS & flag); 1717 if (stream->sm_qflags & flag) 1718 { 1719 stream->sm_qflags &= ~flag; 1720 if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS)) 1721 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1722 next_write_stream); 1723 } 1724} 1725 1726 1727static int 1728stream_wantwrite (struct 
lsquic_stream *stream, int new_val) 1729{ 1730 const int old_val = !!(stream->sm_qflags & SMQF_WANT_WRITE); 1731 1732 assert(0 == (new_val & ~1)); /* new_val is either 0 or 1 */ 1733 1734 if (old_val != new_val) 1735 { 1736 if (new_val) 1737 maybe_put_onto_write_q(stream, SMQF_WANT_WRITE); 1738 else 1739 maybe_remove_from_write_q(stream, SMQF_WANT_WRITE); 1740 } 1741 return old_val; 1742} 1743 1744 1745int 1746lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1747{ 1748 SM_HISTORY_APPEND(stream, SHE_WANTREAD_NO + !!is_want); 1749 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1750 { 1751 if (is_want) 1752 maybe_conn_to_tickable_if_readable(stream); 1753 return stream_wantread(stream, is_want); 1754 } 1755 else 1756 { 1757 errno = EBADF; 1758 return -1; 1759 } 1760} 1761 1762 1763int 1764lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1765{ 1766 int old_val; 1767 1768 is_want = !!is_want; 1769 1770 SM_HISTORY_APPEND(stream, SHE_WANTWRITE_NO + is_want); 1771 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE) 1772 && SSHS_BEGIN == stream->sm_send_headers_state) 1773 { 1774 stream->sm_saved_want_write = is_want; 1775 if (is_want) 1776 maybe_conn_to_tickable_if_writeable(stream, 1); 1777 return stream_wantwrite(stream, is_want); 1778 } 1779 else if (SSHS_BEGIN != stream->sm_send_headers_state) 1780 { 1781 old_val = stream->sm_saved_want_write; 1782 stream->sm_saved_want_write = is_want; 1783 return old_val; 1784 } 1785 else 1786 { 1787 errno = EBADF; 1788 return -1; 1789 } 1790} 1791 1792 1793struct progress 1794{ 1795 enum stream_flags s_flags; 1796 enum stream_q_flags q_flags; 1797}; 1798 1799 1800static struct progress 1801stream_progress (const struct lsquic_stream *stream) 1802{ 1803 return (struct progress) { 1804 .s_flags = stream->stream_flags 1805 & (STREAM_U_WRITE_DONE|STREAM_U_READ_DONE), 1806 .q_flags = stream->sm_qflags 1807 & (SMQF_WANT_READ|SMQF_WANT_WRITE|SMQF_WANT_FLUSH|SMQF_SEND_RST), 1808 }; 1809} 1810 1811 1812static 
int 1813progress_eq (struct progress a, struct progress b) 1814{ 1815 return a.s_flags == b.s_flags && a.q_flags == b.q_flags; 1816} 1817 1818 1819static void 1820stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1821{ 1822 unsigned no_progress_count, no_progress_limit; 1823 struct progress progress; 1824 uint64_t size; 1825 1826 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1827 1828 no_progress_count = 0; 1829 while ((stream->sm_qflags & SMQF_WANT_READ) 1830 && lsquic_stream_readable(stream)) 1831 { 1832 progress = stream_progress(stream); 1833 size = stream->read_offset; 1834 1835 stream->stream_if->on_read(stream, stream->st_ctx); 1836 1837 if (no_progress_limit && size == stream->read_offset && 1838 progress_eq(progress, stream_progress(stream))) 1839 { 1840 ++no_progress_count; 1841 if (no_progress_count >= no_progress_limit) 1842 { 1843 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1844 "progress) in user code reading from stream", 1845 no_progress_count, 1846 no_progress_count == 1 ? 
"" : "s"); 1847 break; 1848 } 1849 } 1850 else 1851 no_progress_count = 0; 1852 } 1853} 1854 1855 1856static void 1857stream_hblock_sent (struct lsquic_stream *stream) 1858{ 1859 int want_write; 1860 1861 LSQ_DEBUG("header block has been sent: restore default behavior"); 1862 stream->sm_send_headers_state = SSHS_BEGIN; 1863 stream->sm_write_avail = stream_write_avail_with_frames; 1864 1865 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 1866 if (want_write != stream->sm_saved_want_write) 1867 (void) lsquic_stream_wantwrite(stream, stream->sm_saved_want_write); 1868 1869 if (stream->stream_flags & STREAM_DELAYED_SW) 1870 { 1871 LSQ_DEBUG("performing delayed shutdown write"); 1872 stream->stream_flags &= ~STREAM_DELAYED_SW; 1873 stream_shutdown_write(stream); 1874 maybe_schedule_call_on_close(stream); 1875 maybe_finish_stream(stream); 1876 maybe_conn_to_tickable_if_writeable(stream, 1); 1877 } 1878} 1879 1880 1881static void 1882on_write_header_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 1883{ 1884 ssize_t nw; 1885 1886 nw = stream_write_buf(stream, 1887 stream->sm_header_block + stream->sm_hblock_off, 1888 stream->sm_hblock_sz - stream->sm_hblock_off); 1889 if (nw > 0) 1890 { 1891 stream->sm_hblock_off += nw; 1892 if (stream->sm_hblock_off == stream->sm_hblock_sz) 1893 { 1894 stream->stream_flags |= STREAM_HEADERS_SENT; 1895 free(stream->sm_header_block); 1896 stream->sm_header_block = NULL; 1897 stream->sm_hblock_sz = 0; 1898 stream_hblock_sent(stream); 1899 LSQ_DEBUG("header block written out successfully"); 1900 /* TODO: if there was eos, do something else */ 1901 if (stream->sm_qflags & SMQF_WANT_WRITE) 1902 stream->stream_if->on_write(stream, h); 1903 } 1904 else 1905 { 1906 LSQ_DEBUG("wrote %zd bytes more of header block; not done yet", 1907 nw); 1908 } 1909 } 1910 else if (nw < 0) 1911 { 1912 /* XXX What should happen if we hit an error? 
TODO */ 1913 } 1914} 1915 1916 1917static void 1918(*select_on_write (struct lsquic_stream *stream))(struct lsquic_stream *, 1919 lsquic_stream_ctx_t *) 1920{ 1921 if (0 == (stream->stream_flags & STREAM_PUSHING) 1922 && SSHS_HBLOCK_SENDING != stream->sm_send_headers_state) 1923 /* Common case */ 1924 return stream->stream_if->on_write; 1925 else if (SSHS_HBLOCK_SENDING == stream->sm_send_headers_state) 1926 return on_write_header_wrapper; 1927 else 1928 { 1929 assert(stream->stream_flags & STREAM_PUSHING); 1930 if (stream_is_pushing_promise(stream)) 1931 return on_write_pp_wrapper; 1932 else if (stream->sm_dup_push_off < stream->sm_dup_push_len) 1933 return on_write_dp_wrapper; 1934 else 1935 return stream->stream_if->on_write; 1936 } 1937} 1938 1939 1940static void 1941stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1942{ 1943 unsigned no_progress_count, no_progress_limit; 1944 void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *); 1945 struct progress progress; 1946 1947 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1948 1949 no_progress_count = 0; 1950 stream->stream_flags |= STREAM_LAST_WRITE_OK; 1951 while ((stream->sm_qflags & SMQF_WANT_WRITE) 1952 && (stream->stream_flags & STREAM_LAST_WRITE_OK) 1953 && lsquic_stream_write_avail(stream)) 1954 { 1955 progress = stream_progress(stream); 1956 1957 on_write = select_on_write(stream); 1958 on_write(stream, stream->st_ctx); 1959 1960 if (no_progress_limit && progress_eq(progress, stream_progress(stream))) 1961 { 1962 ++no_progress_count; 1963 if (no_progress_count >= no_progress_limit) 1964 { 1965 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1966 "progress) in user code writing to stream", 1967 no_progress_count, 1968 no_progress_count == 1 ? 
"" : "s"); 1969 break; 1970 } 1971 } 1972 else 1973 no_progress_count = 0; 1974 } 1975} 1976 1977 1978static void 1979stream_dispatch_read_events_once (lsquic_stream_t *stream) 1980{ 1981 if ((stream->sm_qflags & SMQF_WANT_READ) && lsquic_stream_readable(stream)) 1982 { 1983 stream->stream_if->on_read(stream, stream->st_ctx); 1984 } 1985} 1986 1987 1988uint64_t 1989lsquic_stream_combined_send_off (const struct lsquic_stream *stream) 1990{ 1991 size_t frames_sizes; 1992 1993 frames_sizes = active_hq_frame_sizes(stream); 1994 return stream->tosend_off + stream->sm_n_buffered + frames_sizes; 1995} 1996 1997 1998static void 1999maybe_mark_as_blocked (lsquic_stream_t *stream) 2000{ 2001 struct lsquic_conn_cap *cc; 2002 uint64_t used; 2003 2004 used = lsquic_stream_combined_send_off(stream); 2005 if (stream->max_send_off == used) 2006 { 2007 if (stream->blocked_off < stream->max_send_off) 2008 { 2009 stream->blocked_off = used; 2010 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 2011 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 2012 next_send_stream); 2013 stream->sm_qflags |= SMQF_SEND_BLOCKED; 2014 LSQ_DEBUG("marked stream-blocked at stream offset " 2015 "%"PRIu64, stream->blocked_off); 2016 } 2017 else 2018 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 2019 " has been, or is about to be, sent", stream->blocked_off); 2020 } 2021 2022 if ((stream->sm_bflags & SMBF_CONN_LIMITED) 2023 && (cc = &stream->conn_pub->conn_cap, 2024 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 2025 { 2026 if (cc->cc_blocked < cc->cc_max) 2027 { 2028 cc->cc_blocked = cc->cc_max; 2029 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 2030 LSQ_DEBUG("marked connection-blocked at connection offset " 2031 "%"PRIu64, cc->cc_max); 2032 } 2033 else 2034 LSQ_DEBUG("stream has already been marked connection-blocked " 2035 "at offset %"PRIu64, cc->cc_blocked); 2036 } 2037} 2038 2039 2040void 2041lsquic_stream_dispatch_read_events (lsquic_stream_t 
*stream) 2042{ 2043 assert(stream->sm_qflags & SMQF_WANT_READ); 2044 2045 if (stream->sm_bflags & SMBF_RW_ONCE) 2046 stream_dispatch_read_events_once(stream); 2047 else 2048 stream_dispatch_read_events_loop(stream); 2049} 2050 2051 2052void 2053lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 2054{ 2055 void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *); 2056 int progress; 2057 uint64_t tosend_off; 2058 unsigned short n_buffered; 2059 enum stream_q_flags q_flags; 2060 2061 assert(stream->sm_qflags & SMQF_WRITE_Q_FLAGS); 2062 q_flags = stream->sm_qflags & SMQF_WRITE_Q_FLAGS; 2063 tosend_off = stream->tosend_off; 2064 n_buffered = stream->sm_n_buffered; 2065 2066 if (stream->sm_qflags & SMQF_WANT_FLUSH) 2067 (void) stream_flush(stream); 2068 2069 if (stream->sm_bflags & SMBF_RW_ONCE) 2070 { 2071 if ((stream->sm_qflags & SMQF_WANT_WRITE) 2072 && lsquic_stream_write_avail(stream)) 2073 { 2074 on_write = select_on_write(stream); 2075 on_write(stream, stream->st_ctx); 2076 } 2077 } 2078 else 2079 stream_dispatch_write_events_loop(stream); 2080 2081 /* Progress means either flags or offsets changed: */ 2082 progress = !((stream->sm_qflags & SMQF_WRITE_Q_FLAGS) == q_flags && 2083 stream->tosend_off == tosend_off && 2084 stream->sm_n_buffered == n_buffered); 2085 2086 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 2087 { 2088 if (progress) 2089 { /* Move the stream to the end of the list to ensure fairness. 
*/ 2090 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 2091 next_write_stream); 2092 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 2093 next_write_stream); 2094 } 2095 } 2096} 2097 2098 2099static size_t 2100inner_reader_empty_size (void *ctx) 2101{ 2102 return 0; 2103} 2104 2105 2106static size_t 2107inner_reader_empty_read (void *ctx, void *buf, size_t count) 2108{ 2109 return 0; 2110} 2111 2112 2113static int 2114stream_flush (lsquic_stream_t *stream) 2115{ 2116 struct lsquic_reader empty_reader; 2117 ssize_t nw; 2118 2119 assert(stream->sm_qflags & SMQF_WANT_FLUSH); 2120 assert(stream->sm_n_buffered > 0 || 2121 /* Flushing is also used to packetize standalone FIN: */ 2122 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 2123 == STREAM_U_WRITE_DONE)); 2124 2125 empty_reader.lsqr_size = inner_reader_empty_size; 2126 empty_reader.lsqr_read = inner_reader_empty_read; 2127 empty_reader.lsqr_ctx = NULL; /* pro forma */ 2128 nw = stream_write_to_packets(stream, &empty_reader, 0); 2129 2130 if (nw >= 0) 2131 { 2132 assert(nw == 0); /* Empty reader: must have read zero bytes */ 2133 return 0; 2134 } 2135 else 2136 return -1; 2137} 2138 2139 2140static int 2141stream_flush_nocheck (lsquic_stream_t *stream) 2142{ 2143 size_t frames; 2144 2145 frames = active_hq_frame_sizes(stream); 2146 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered + frames; 2147 stream->sm_flush_to_payload = stream->sm_payload + stream->sm_n_buffered; 2148 maybe_put_onto_write_q(stream, SMQF_WANT_FLUSH); 2149 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 2150 2151 return stream_flush(stream); 2152} 2153 2154 2155int 2156lsquic_stream_flush (lsquic_stream_t *stream) 2157{ 2158 if (stream->stream_flags & STREAM_U_WRITE_DONE) 2159 { 2160 LSQ_DEBUG("cannot flush closed stream"); 2161 errno = EBADF; 2162 return -1; 2163 } 2164 2165 if (0 == stream->sm_n_buffered) 2166 { 2167 LSQ_DEBUG("flushing 0 bytes: noop"); 2168 return 0; 2169 } 
2170 2171 return stream_flush_nocheck(stream); 2172} 2173 2174 2175static size_t 2176stream_get_n_allowed (const struct lsquic_stream *stream) 2177{ 2178 if (stream->sm_n_allocated) 2179 return stream->sm_n_allocated; 2180 else 2181 return stream->conn_pub->path->np_pack_size; 2182} 2183 2184 2185/* The flush threshold is the maximum size of stream data that can be sent 2186 * in a full packet. 2187 */ 2188#ifdef NDEBUG 2189static 2190#endif 2191 size_t 2192lsquic_stream_flush_threshold (const struct lsquic_stream *stream, 2193 unsigned data_sz) 2194{ 2195 enum packet_out_flags flags; 2196 enum packno_bits bits; 2197 size_t packet_header_sz, stream_header_sz, tag_len; 2198 size_t threshold; 2199 2200 bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl); 2201 flags = bits << POBIT_SHIFT; 2202 if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)) 2203 flags |= PO_CONN_ID; 2204 if (stream_is_hsk(stream)) 2205 flags |= PO_LONGHEAD; 2206 2207 packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags, 2208 stream->conn_pub->path->np_dcid.len); 2209 stream_header_sz = stream->sm_frame_header_sz(stream, data_sz); 2210 tag_len = stream->conn_pub->lconn->cn_esf_c->esf_tag_len; 2211 2212 threshold = stream_get_n_allowed(stream) - tag_len 2213 - packet_header_sz - stream_header_sz; 2214 return threshold; 2215} 2216 2217 2218#define COMMON_WRITE_CHECKS() do { \ 2219 if ((stream->sm_bflags & SMBF_USE_HEADERS) \ 2220 && !(stream->stream_flags & STREAM_HEADERS_SENT)) \ 2221 { \ 2222 if (SSHS_BEGIN != stream->sm_send_headers_state) \ 2223 { \ 2224 LSQ_DEBUG("still sending headers: no writing allowed"); \ 2225 return 0; \ 2226 } \ 2227 else \ 2228 { \ 2229 LSQ_INFO("Attempt to write to stream before sending HTTP " \ 2230 "headers"); \ 2231 errno = EILSEQ; \ 2232 return -1; \ 2233 } \ 2234 } \ 2235 if (lsquic_stream_is_reset(stream)) \ 2236 { \ 2237 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 2238 errno = ECONNRESET; \ 2239 return 
-1; \ 2240 } \ 2241 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 2242 { \ 2243 LSQ_INFO("Attempt to write to stream after it was closed for " \ 2244 "writing"); \ 2245 errno = EBADF; \ 2246 return -1; \ 2247 } \ 2248} while (0) 2249 2250 2251struct frame_gen_ctx 2252{ 2253 lsquic_stream_t *fgc_stream; 2254 struct lsquic_reader *fgc_reader; 2255 /* We keep our own count of how many bytes were read from reader because 2256 * some readers are external. The external caller does not have to rely 2257 * on our count, but it can. 2258 */ 2259 size_t fgc_nread_from_reader; 2260 size_t (*fgc_size) (void *ctx); 2261 int (*fgc_fin) (void *ctx); 2262 gsf_read_f fgc_read; 2263}; 2264 2265 2266static size_t 2267frame_std_gen_size (void *ctx) 2268{ 2269 struct frame_gen_ctx *fg_ctx = ctx; 2270 size_t available, remaining; 2271 2272 /* Make sure we are not writing past available size: */ 2273 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2274 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 2275 if (available < remaining) 2276 remaining = available; 2277 2278 return remaining + fg_ctx->fgc_stream->sm_n_buffered; 2279} 2280 2281 2282static size_t 2283stream_hq_frame_size (const struct stream_hq_frame *shf) 2284{ 2285 if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))) 2286 return 1 + 1 + ((shf->shf_flags & SHF_TWO_BYTES) > 0); 2287 else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) == SHF_FIXED_SIZE) 2288 return 1 + (1 << vint_val2bits(shf->shf_frame_size)); 2289 else 2290 { 2291 assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2292 == (SHF_FIXED_SIZE|SHF_PHANTOM)); 2293 return 0; 2294 } 2295} 2296 2297 2298static size_t 2299active_hq_frame_sizes (const struct lsquic_stream *stream) 2300{ 2301 const struct stream_hq_frame *shf; 2302 size_t size; 2303 2304 size = 0; 2305 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 2306 == (SMBF_IETF|SMBF_USE_HEADERS)) 2307 STAILQ_FOREACH(shf, 
&stream->sm_hq_frames, shf_next) 2308 if (!(shf->shf_flags & SHF_WRITTEN)) 2309 size += stream_hq_frame_size(shf); 2310 2311 return size; 2312} 2313 2314 2315static uint64_t 2316stream_hq_frame_end (const struct stream_hq_frame *shf) 2317{ 2318 if (shf->shf_flags & SHF_FIXED_SIZE) 2319 return shf->shf_off + shf->shf_frame_size; 2320 else if (shf->shf_flags & SHF_TWO_BYTES) 2321 return shf->shf_off + ((1 << 14) - 1); 2322 else 2323 return shf->shf_off + ((1 << 6) - 1); 2324} 2325 2326 2327static int 2328frame_in_stream (const struct lsquic_stream *stream, 2329 const struct stream_hq_frame *shf) 2330{ 2331 return shf >= stream->sm_hq_frame_arr 2332 && shf < stream->sm_hq_frame_arr + sizeof(stream->sm_hq_frame_arr) 2333 / sizeof(stream->sm_hq_frame_arr[0]) 2334 ; 2335} 2336 2337 2338static void 2339stream_hq_frame_put (struct lsquic_stream *stream, 2340 struct stream_hq_frame *shf) 2341{ 2342 assert(STAILQ_FIRST(&stream->sm_hq_frames) == shf); 2343 STAILQ_REMOVE_HEAD(&stream->sm_hq_frames, shf_next); 2344 if (frame_in_stream(stream, shf)) 2345 memset(shf, 0, sizeof(*shf)); 2346 else 2347 lsquic_malo_put(shf); 2348} 2349 2350 2351static void 2352stream_hq_frame_close (struct lsquic_stream *stream, 2353 struct stream_hq_frame *shf) 2354{ 2355 unsigned bits; 2356 2357 LSQ_DEBUG("close HQ frame of type 0x%X at payload offset %"PRIu64 2358 " (actual offset %"PRIu64")", shf->shf_frame_type, 2359 stream->sm_payload, stream->tosend_off); 2360 assert(shf->shf_flags & SHF_ACTIVE); 2361 if (!(shf->shf_flags & SHF_FIXED_SIZE)) 2362 { 2363 shf->shf_frame_ptr[0] = shf->shf_frame_type; 2364 bits = (shf->shf_flags & SHF_TWO_BYTES) > 0; 2365 vint_write(shf->shf_frame_ptr + 1, stream->sm_payload - shf->shf_off, 2366 bits, 1 << bits); 2367 } 2368 stream_hq_frame_put(stream, shf); 2369} 2370 2371 2372static size_t 2373frame_hq_gen_size (void *ctx) 2374{ 2375 struct frame_gen_ctx *fg_ctx = ctx; 2376 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2377 size_t available, remaining, 
/* Frame generator read callback: fill `begin_buf' with up to `len' bytes of
 * stream payload.  Bytes come first from the stream's own buffer (sm_buf) and
 * then, if there is still room, from the user's reader.
 *
 *  ctx         struct frame_gen_ctx for this write
 *  begin_buf   destination
 *  len         capacity of destination
 *  fin         out: result of the context's fgc_fin() callback
 *
 * Returns number of bytes placed into `begin_buf'.  sm_payload and tosend_off
 * are advanced by that amount via incr_sm_payload().
 */
static size_t
frame_std_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    unsigned char *p = begin_buf;
    unsigned char *const end = p + len;
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    size_t n_written, available, n_to_write;

    if (stream->sm_n_buffered > 0)
    {
        if (len <= stream->sm_n_buffered)
        {
            /* The buffer alone satisfies the request: copy out the head and
             * shift the remainder to the front.  The reader is not touched.
             */
            memcpy(p, stream->sm_buf, len);
            memmove(stream->sm_buf, stream->sm_buf + len,
                                        stream->sm_n_buffered - len);
            stream->sm_n_buffered -= len;
            if (0 == stream->sm_n_buffered)
                maybe_resize_stream_buffer(stream);
            assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
            incr_sm_payload(stream, len);
            *fin = fg_ctx->fgc_fin(fg_ctx);
            return len;
        }
        /* Drain the whole buffer, then continue with the reader below */
        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
        p += stream->sm_n_buffered;
        stream->sm_n_buffered = 0;
        maybe_resize_stream_buffer(stream);
    }

    /* Do not read more from the reader than the stream is allowed to write */
    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
    n_to_write = end - p;
    if (n_to_write > available)
        n_to_write = available;
    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
                                                                n_to_write);
    p += n_written;
    fg_ctx->fgc_nread_from_reader += n_written;
    *fin = fg_ctx->fgc_fin(fg_ctx);
    /* Offsets advance by everything copied (buffered + freshly read)... */
    incr_sm_payload(stream, p - (const unsigned char *) begin_buf);
    /* ...but only freshly read bytes are charged to the connection cap here:
     * save_to_buffer() calls incr_conn_cap() for bytes at the time it
     * buffers them.
     */
    incr_conn_cap(stream, n_written);
    return p - (const unsigned char *) begin_buf;
}
/* Frame generator read callback for streams that use HTTP/3 framing.  Fills
 * `begin_buf' with an interleaving of HQ frame headers and payload: on each
 * iteration either the header of the frame that starts at the current payload
 * offset is emitted, or payload belonging to the current frame is copied via
 * frame_std_gen_read().
 *
 *  ctx         struct frame_gen_ctx for this write
 *  begin_buf   destination
 *  len         capacity of destination (reused below as a scratch length)
 *  fin         out: passed through to frame_std_gen_read()
 *
 * Returns number of bytes placed into `begin_buf' (headers and payload).
 *
 * NOTE(review): `*fin' is assigned only inside frame_std_gen_read(); if the
 * loop exits before that call is made, it is left untouched -- confirm that
 * callers initialize it.
 */
static size_t
frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    unsigned char *p = begin_buf;
    unsigned char *const end = p + len;
    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
    struct stream_hq_frame *shf;
    size_t nw, frame_sz, avail, rem;
    unsigned bits;

    while (p < end)
    {
        /* Frame that covers the current payload offset, if any */
        shf = find_cur_hq_frame(stream);
        if (shf)
            LSQ_DEBUG("found current HQ frame of type 0x%X at offset %"PRIu64,
                                            shf->shf_frame_type, shf->shf_off);
        else
        {
            /* No frame covers this offset: if there is anything to send,
             * open a variable-size DATA frame here.  The size hint is capped
             * at the largest value a two-byte length can hold.
             */
            rem = frame_std_gen_size(ctx);
            if (rem)
            {
                if (rem > ((1 << 14) - 1))
                    rem = (1 << 14) - 1;
                shf = stream_activate_hq_frame(stream,
                                    stream->sm_payload, HQFT_DATA, 0, rem);
                if (!shf)
                {
                    /* TODO: abort connection?  Handle failure somehow */
                    break;
                }
            }
            else
                break;
        }
        avail = stream->sm_n_buffered + stream->sm_write_avail(stream);
        if (shf->shf_off == stream->sm_payload
                                        && !(shf->shf_flags & SHF_WRITTEN))
        {
            /* We are at the very beginning of a frame whose header has not
             * been emitted yet.
             */
            frame_sz = stream_hq_frame_size(shf);
            if (frame_sz > (uintptr_t) (end - p))
            {
                /* Header does not fit: drop the frame and stop */
                stream_hq_frame_put(stream, shf);
                break;
            }
            LSQ_DEBUG("insert %zu-byte HQ frame of type 0x%X at payload "
                "offset %"PRIu64" (actual offset %"PRIu64")", frame_sz,
                shf->shf_frame_type, stream->sm_payload, stream->tosend_off);
            if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
            {
                /* Variable-size frame: reserve zeroed header bytes and
                 * remember where they are.  The type and length are written
                 * through shf_frame_ptr when the frame is closed.
                 */
                shf->shf_frame_ptr = p;
                memset(p, 0, frame_sz);
                p += frame_sz;
            }
            else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
                                                            == SHF_FIXED_SIZE)
            {
                /* Fixed-size frame: size is known, write complete header */
                *p++ = shf->shf_frame_type;
                bits = vint_val2bits(shf->shf_frame_size);
                vint_write(p, shf->shf_frame_size, bits, 1 << bits);
                p += 1 << bits;
            }
            else
                /* Phantom frame: stream_hq_frame_size() is 0 for it, so
                 * nothing is written.
                 */
                assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
            /* Charge header bytes to the connection cap at most once */
            if (!(shf->shf_flags & SHF_CC_PAID))
            {
                incr_conn_cap(stream, frame_sz);
                shf->shf_flags |= SHF_CC_PAID;
            }
            shf->shf_flags |= SHF_WRITTEN;
            /* Header bytes advance the stream offset but not sm_payload */
            stream->tosend_off += frame_sz;
            assert(stream->tosend_off <= stream->max_send_off);
        }
        else
        {
            /* Inside a frame: copy payload, limited by the bytes left in the
             * frame, the room left in the output buffer, and `avail'.
             */
            len = stream_hq_frame_end(shf) - stream->sm_payload;
            assert(len);
            if (len > (unsigned) (end - p))
                len = end - p;
            if (len > avail)
                len = avail;
            if (!len)
                break;
            nw = frame_std_gen_read(ctx, p, len, fin);
            p += nw;
            if (nw < len)
                break;      /* Short read: nothing more to copy right now */
            if (stream_hq_frame_end(shf) == stream->sm_payload)
                stream_hq_frame_close(stream, shf);
        }
    }

    return p - (unsigned char *) begin_buf;
}
/* Generate one STREAM frame at the current end of `packet_out' and record
 * the stream in the packet.
 *
 *  fg_ctx      frame generator context; its fgc_fin and fgc_read callbacks
 *                are handed to the frame generator
 *  size        size value passed to the frame generator
 *  packet_out  packet to append the frame to
 *
 * Returns the length of the generated frame.  A negative value returned by
 * pf_gen_stream_frame() is passed through unchanged; -1 is returned if the
 * stream cannot be added to the packet.
 *
 * NOTE(review): stream_write_to_packet_std() treats the negated return value
 * as the number of bytes needed, so the -1 from the add-stream failure is
 * indistinguishable from a generator result of -1 -- confirm this is benign.
 */
static int
write_stream_frame (struct frame_gen_ctx *fg_ctx, const size_t size,
                                        struct lsquic_packet_out *packet_out)
{
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
    unsigned off;
    int len, s;

#if LSQUIC_CONN_STATS
    /* Remember starting offset to compute how much stream data was written */
    const uint64_t begin_off = stream->tosend_off;
#endif
    /* Offset of the new frame within the packet payload */
    off = packet_out->po_data_sz;
    len = pf->pf_gen_stream_frame(
                packet_out->po_data + packet_out->po_data_sz,
                lsquic_packet_out_avail(packet_out), stream->id,
                stream->tosend_off,
                fg_ctx->fgc_fin(fg_ctx), size, fg_ctx->fgc_read, fg_ctx);
    if (len < 0)
        return len;

#if LSQUIC_CONN_STATS
    stream->conn_pub->conn_stats->out.stream_frames += 1;
    stream->conn_pub->conn_stats->out.stream_data_sz
                                            += stream->tosend_off - begin_off;
#endif
    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
                            packet_out->po_data + packet_out->po_data_sz, len);
    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
    /* Flag the packet when the frame used up all remaining room */
    if (0 == lsquic_packet_out_avail(packet_out))
        packet_out->po_flags |= PO_STREAM_END;
    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
                                     stream, QUIC_FRAME_STREAM, off, len);
    if (s != 0)
    {
        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
        return -1;
    }

    /* tosend_off may now have reached the requested flush offset */
    check_flush_threshold(stream);
    return len;
}
/* If there is nothing to flush, some other stream must have flushed it: 2778 * this means our headers are flushed. Either way, only do this once. 2779 */ 2780 stream->stream_flags |= STREAM_HDRS_FLUSHED; 2781 } 2782 2783 stream_header_sz = stream->sm_frame_header_sz(stream, size); 2784 need_at_least = stream_header_sz; 2785 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 2786 == (SMBF_IETF|SMBF_USE_HEADERS)) 2787 { 2788 if (size > 0) 2789 need_at_least += 3; /* Enough room for HTTP/3 frame */ 2790 } 2791 else 2792 need_at_least += size > 0; 2793 get_packet: 2794 packet_out = lsquic_send_ctl_get_packet_for_stream(send_ctl, 2795 need_at_least, stream->conn_pub->path, stream); 2796 if (packet_out) 2797 { 2798 len = write_stream_frame(fg_ctx, size, packet_out); 2799 if (len > 0) 2800 return SWTP_OK; 2801 assert(len < 0); 2802 if (-len > (int) need_at_least) 2803 { 2804 LSQ_DEBUG("need more room (%d bytes) than initially calculated " 2805 "%u bytes, will try again", -len, need_at_least); 2806 need_at_least = -len; 2807 goto get_packet; 2808 } 2809 return SWTP_ERROR; 2810 } 2811 else 2812 return SWTP_STOP; 2813} 2814 2815 2816static enum swtp_status 2817stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size) 2818{ 2819 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2820 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2821 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 2822 unsigned crypto_header_sz, need_at_least; 2823 struct lsquic_packet_out *packet_out; 2824 unsigned short off; 2825 const enum packnum_space pns = lsquic_enclev2pns[ crypto_level(stream) ]; 2826 int len, s; 2827 2828 assert(size > 0); 2829 crypto_header_sz = stream->sm_frame_header_sz(stream, size); 2830 need_at_least = crypto_header_sz + 1; 2831 2832 packet_out = lsquic_send_ctl_get_packet_for_crypto(send_ctl, 2833 need_at_least, pns, stream->conn_pub->path); 2834 if (!packet_out) 2835 return SWTP_STOP; 2836 2837 off = 
packet_out->po_data_sz; 2838 len = pf->pf_gen_crypto_frame(packet_out->po_data + packet_out->po_data_sz, 2839 lsquic_packet_out_avail(packet_out), stream->tosend_off, 2840 size, crypto_frame_gen_read, fg_ctx); 2841 if (len < 0) 2842 return len; 2843 2844 EV_LOG_GENERATED_CRYPTO_FRAME(LSQUIC_LOG_CONN_ID, pf, 2845 packet_out->po_data + packet_out->po_data_sz, len); 2846 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 2847 packet_out->po_frame_types |= 1 << QUIC_FRAME_CRYPTO; 2848 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 2849 stream, QUIC_FRAME_CRYPTO, off, len); 2850 if (s != 0) 2851 { 2852 LSQ_WARN("adding crypto stream to packet failed: %s", strerror(errno)); 2853 return -1; 2854 } 2855 2856 packet_out->po_flags |= PO_HELLO; 2857 2858 check_flush_threshold(stream); 2859 return SWTP_OK; 2860} 2861 2862 2863static void 2864abort_connection (struct lsquic_stream *stream) 2865{ 2866 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 2867 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 2868 next_service_stream); 2869 stream->sm_qflags |= SMQF_ABORT_CONN; 2870 LSQ_WARN("connection will be aborted"); 2871 maybe_conn_to_tickable(stream); 2872} 2873 2874 2875static void 2876maybe_close_varsize_hq_frame (struct lsquic_stream *stream) 2877{ 2878 struct stream_hq_frame *shf; 2879 uint64_t size; 2880 unsigned bits; 2881 2882 shf = find_cur_hq_frame(stream); 2883 if (!shf) 2884 return; 2885 2886 if (shf->shf_flags & SHF_FIXED_SIZE) 2887 { 2888 if (shf->shf_off + shf->shf_frame_size <= stream->sm_payload) 2889 stream_hq_frame_put(stream, shf); 2890 return; 2891 } 2892 2893 bits = (shf->shf_flags & SHF_TWO_BYTES) > 0; 2894 size = stream->sm_payload + stream->sm_n_buffered - shf->shf_off; 2895 if (size && size <= VINT_MAX_B(bits) && shf->shf_frame_ptr) 2896 { 2897 if (0 == stream->sm_n_buffered) 2898 LSQ_DEBUG("close HQ frame type 0x%X of size %"PRIu64, 2899 shf->shf_frame_type, size); 2900 else 2901 LSQ_DEBUG("convert HQ frame type 
/* Packetize stream data: combine what is buffered in the stream with what
 * `reader' provides and write it out through the stream's sm_write_to_packet
 * callback, one call per loop iteration.
 *
 *  stream  stream to write
 *  reader  source of new user data
 *  thresh  if non-zero, packetize only while at least this many bytes are
 *            available and buffer the remainder; if zero, packetize
 *            everything
 *
 * Returns the number of bytes consumed from `reader' (both packetized and
 * buffered), or -1 on error.
 */
static ssize_t
stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
                         size_t thresh)
{
    size_t size;
    ssize_t nw;
    unsigned seen_ok;
    int use_framing;
    struct frame_gen_ctx fg_ctx = {
        .fgc_stream = stream,
        .fgc_reader = reader,
        .fgc_nread_from_reader = 0,
    };

    /* Pick size/read callbacks depending on whether HQ framing is in use */
    use_framing = (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
                                       == (SMBF_IETF|SMBF_USE_HEADERS);
    if (use_framing)
    {
        fg_ctx.fgc_size = frame_hq_gen_size;
        fg_ctx.fgc_read = frame_hq_gen_read;
        fg_ctx.fgc_fin = frame_std_gen_fin; /* This seems to work for either? XXX */
    }
    else
    {
        fg_ctx.fgc_size = frame_std_gen_size;
        fg_ctx.fgc_read = frame_std_gen_read;
        fg_ctx.fgc_fin = frame_std_gen_fin;
    }

    /* Counts SWTP_OK results; used to run one-time actions on the first one
     * and to decide whether an HQ frame may need closing.
     */
    seen_ok = 0;
    /* Keep going while there is enough data (at least `thresh' bytes, or any
     * data at all when thresh is zero) or while a FIN is due.  `size' is
     * refreshed on every evaluation of the condition.
     */
    while ((size = fg_ctx.fgc_size(&fg_ctx), thresh ? size >= thresh : size > 0)
           || fg_ctx.fgc_fin(&fg_ctx))
    {
        switch (stream->sm_write_to_packet(&fg_ctx, size))
        {
        case SWTP_OK:
            if (!seen_ok++)
                maybe_conn_to_tickable_if_writeable(stream, 0);
            if (fg_ctx.fgc_fin(&fg_ctx))
            {
                if (use_framing && seen_ok)
                    maybe_close_varsize_hq_frame(stream);
                stream->stream_flags |= STREAM_FIN_SENT;
                goto end;
            }
            else
                break;
        case SWTP_STOP:
            /* Could not write: skip buffering and blocked check below */
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            if (use_framing && seen_ok)
                maybe_close_varsize_hq_frame(stream);
            goto end;
        default:
            /* Any other status is treated as fatal to the connection */
            abort_connection(stream);
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            return -1;
        }
    }

    if (use_framing && seen_ok)
        maybe_close_varsize_hq_frame(stream);

    if (thresh)
    {
        /* Loop ended because fewer than `thresh' bytes remain: whatever is
         * not already in the stream buffer gets buffered now.
         */
        assert(size < thresh);
        assert(size >= stream->sm_n_buffered);
        size -= stream->sm_n_buffered;
        if (size > 0)
        {
            nw = save_to_buffer(stream, reader, size);
            if (nw < 0)
                return -1;
            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
        }
    }
    else
    {
        /* We count flushed data towards both stream and connection limits,
         * so we should have been able to packetize all of it:
         */
        assert(0 == stream->sm_n_buffered);
        assert(size == 0);
    }

    maybe_mark_as_blocked(stream);

  end:
    return fg_ctx.fgc_nread_from_reader;
}
3036 */ 3037static int 3038maybe_flush_stream (struct lsquic_stream *stream) 3039{ 3040 if (stream->sm_n_buffered > 0 3041 && (stream->sm_bflags & SMBF_CONN_LIMITED) 3042 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 3043 return stream_flush_nocheck(stream); 3044 else 3045 return 0; 3046} 3047 3048 3049static int 3050stream_hq_frame_extendable (const struct stream_hq_frame *shf, uint64_t cur_off, 3051 unsigned len) 3052{ 3053 return (shf->shf_flags & (SHF_TWO_BYTES|SHF_FIXED_SIZE)) == 0 3054 && cur_off - shf->shf_off < (1 << 6) 3055 && cur_off - shf->shf_off + len >= (1 << 6) 3056 ; 3057} 3058 3059 3060/* Update currently buffered HQ frame or create a new one, if possible. 3061 * Return update length to be buffered. If a HQ frame cannot be 3062 * buffered due to size, 0 is returned, thereby preventing both HQ frame 3063 * creation and buffering. 3064 */ 3065static size_t 3066update_buffered_hq_frames (struct lsquic_stream *stream, size_t len, 3067 size_t avail) 3068{ 3069 struct stream_hq_frame *shf; 3070 uint64_t cur_off, end; 3071 size_t frame_sz; 3072 unsigned extendable; 3073 3074 cur_off = stream->sm_payload + stream->sm_n_buffered; 3075 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 3076 if (shf->shf_off <= cur_off) 3077 { 3078 end = stream_hq_frame_end(shf); 3079 extendable = stream_hq_frame_extendable(shf, cur_off, len); 3080 if (cur_off < end + extendable) 3081 break; 3082 } 3083 3084 if (shf) 3085 { 3086 if (len > end + extendable - cur_off) 3087 len = end + extendable - cur_off; 3088 frame_sz = stream_hq_frame_size(shf); 3089 } 3090 else 3091 { 3092 assert(avail >= 3); 3093 shf = stream_activate_hq_frame(stream, cur_off, HQFT_DATA, 0, len); 3094 /* XXX malloc can fail */ 3095 if (len > stream_hq_frame_end(shf) - cur_off) 3096 len = stream_hq_frame_end(shf) - cur_off; 3097 extendable = 0; 3098 frame_sz = stream_hq_frame_size(shf); 3099 if (avail < frame_sz) 3100 return 0; 3101 avail -= frame_sz; 3102 } 3103 3104 if (!(shf->shf_flags & 
SHF_CC_PAID)) 3105 { 3106 incr_conn_cap(stream, frame_sz); 3107 shf->shf_flags |= SHF_CC_PAID; 3108 } 3109 if (extendable) 3110 { 3111 shf->shf_flags |= SHF_TWO_BYTES; 3112 incr_conn_cap(stream, 1); 3113 avail -= 1; 3114 if ((stream->sm_qflags & SMQF_WANT_FLUSH) 3115 && shf->shf_off <= stream->sm_payload 3116 && stream_hq_frame_end(shf) >= stream->sm_flush_to_payload) 3117 stream->sm_flush_to += 1; 3118 } 3119 3120 if (len <= avail) 3121 return len; 3122 else 3123 return avail; 3124} 3125 3126 3127static ssize_t 3128save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 3129 size_t len) 3130{ 3131 size_t avail, n_written, n_allowed; 3132 3133 avail = lsquic_stream_write_avail(stream); 3134 if (avail < len) 3135 len = avail; 3136 if (len == 0) 3137 { 3138 LSQ_DEBUG("zero-byte write (avail: %zu)", avail); 3139 return 0; 3140 } 3141 3142 n_allowed = stream_get_n_allowed(stream); 3143 assert(stream->sm_n_buffered + len <= n_allowed); 3144 3145 if (!stream->sm_buf) 3146 { 3147 stream->sm_buf = malloc(n_allowed); 3148 if (!stream->sm_buf) 3149 return -1; 3150 stream->sm_n_allocated = n_allowed; 3151 } 3152 3153 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 3154 == (SMBF_IETF|SMBF_USE_HEADERS)) 3155 len = update_buffered_hq_frames(stream, len, avail); 3156 3157 n_written = reader->lsqr_read(reader->lsqr_ctx, 3158 stream->sm_buf + stream->sm_n_buffered, len); 3159 stream->sm_n_buffered += n_written; 3160 assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered); 3161 incr_conn_cap(stream, n_written); 3162 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 3163 n_written, stream->sm_n_buffered); 3164 if (0 != maybe_flush_stream(stream)) 3165 return -1; 3166 return n_written; 3167} 3168 3169 3170static ssize_t 3171stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 3172{ 3173 const struct stream_hq_frame *shf; 3174 size_t thresh, len, frames, total_len, n_allowed, nwritten; 3175 ssize_t nw; 3176 3177 len = 
/* Reader state for consuming an array of iovecs in order.
 *
 *  iov             next iovec to read from
 *  end             one past the last iovec
 *  cur_iovec_off   number of bytes of *iov already consumed.  This is a
 *                    size_t because iov_len is a size_t: a narrower type
 *                    cannot track progress through a large iovec.
 */
struct inner_reader_iovec {
    const struct iovec       *iov;
    const struct iovec       *end;
    size_t                    cur_iovec_off;
};


/* lsqr_read callback: copy up to `count' bytes from the iovec array into
 * `buf', advancing the reader state.  Returns number of bytes copied, which
 * is smaller than `count' only when the iovecs are exhausted.
 */
static size_t
inner_reader_iovec_read (void *ctx, void *buf, size_t count)
{
    struct inner_reader_iovec *const iro = ctx;
    unsigned char *p = buf;
    unsigned char *const end = p + count;
    size_t n_tocopy;

    while (iro->iov < iro->end && p < end)
    {
        /* All arithmetic is done in size_t: truncating to a narrower type
         * could turn a large remainder into zero and stall this loop.
         */
        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
        if (n_tocopy > (size_t) (end - p))
            n_tocopy = (size_t) (end - p);
        if (n_tocopy > 0)
        {
            /* Skipped for empty iovecs, whose iov_base may be NULL */
            memcpy(p, (const unsigned char *) iro->iov->iov_base
                                        + iro->cur_iovec_off, n_tocopy);
            p += n_tocopy;
            iro->cur_iovec_off += n_tocopy;
        }
        if (iro->iov->iov_len == iro->cur_iovec_off)
        {
            ++iro->iov;
            iro->cur_iovec_off = 0;
        }
    }

    return (size_t) (p - (unsigned char *) buf);
}


/* lsqr_size callback: number of bytes left to read from the iovec array */
static size_t
inner_reader_iovec_size (void *ctx)
{
    struct inner_reader_iovec *const iro = ctx;
    const struct iovec *iov;
    size_t size;

    size = 0;
    for (iov = iro->iov; iov < iro->end; ++iov)
        size += iov->iov_len;

    /* cur_iovec_off is non-zero only while a partially consumed iovec is
     * current, in which case that iovec is included in the sum above.
     */
    return size - iro->cur_iovec_off;
}
*/ 3321#define MAX_HEADERS_SIZE (64 * 1024) 3322 3323static int 3324send_headers_ietf (struct lsquic_stream *stream, 3325 const struct lsquic_http_headers *headers, int eos) 3326{ 3327 enum qwh_status qwh; 3328 const size_t max_prefix_size = 3329 lsquic_qeh_max_prefix_size(stream->conn_pub->u.ietf.qeh); 3330 const size_t max_push_size = 1 /* Stream type */ + 8 /* Push ID */; 3331 size_t prefix_sz, headers_sz, hblock_sz, push_sz; 3332 unsigned bits; 3333 ssize_t nw; 3334 unsigned char *header_block; 3335 enum lsqpack_enc_header_flags hflags; 3336 unsigned char buf[max_push_size + max_prefix_size + MAX_HEADERS_SIZE]; 3337 3338 stream->stream_flags &= ~STREAM_PUSHING; 3339 stream->stream_flags |= STREAM_NOPUSH; 3340 3341 /* TODO: Optimize for the common case: write directly to sm_buf and fall 3342 * back to a larger buffer if that fails. 3343 */ 3344 prefix_sz = max_prefix_size; 3345 headers_sz = sizeof(buf) - max_prefix_size - max_push_size; 3346 qwh = lsquic_qeh_write_headers(stream->conn_pub->u.ietf.qeh, stream->id, 0, 3347 headers, buf + max_push_size + max_prefix_size, &prefix_sz, 3348 &headers_sz, &stream->sm_hb_compl, &hflags); 3349 3350 if (!(qwh == QWH_FULL || qwh == QWH_PARTIAL)) 3351 { 3352 if (qwh == QWH_ENOBUF) 3353 LSQ_INFO("not enough room for header block"); 3354 else 3355 LSQ_WARN("internal error encoding and sending HTTP headers"); 3356 return -1; 3357 } 3358 3359 if (hflags & LSQECH_REF_NEW_ENTRIES) 3360 stream->stream_flags |= STREAM_ENCODER_DEP; 3361 3362 if (stream->sm_promise) 3363 { 3364 assert(lsquic_stream_is_pushed(stream)); 3365 bits = vint_val2bits(stream->sm_promise->pp_id); 3366 push_sz = 1 + (1 << bits); 3367 if (!stream_activate_hq_frame(stream, 3368 stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PREAMBLE, 3369 SHF_FIXED_SIZE|SHF_PHANTOM, push_sz)) 3370 return -1; 3371 buf[max_push_size + max_prefix_size - prefix_sz - push_sz] = HQUST_PUSH; 3372 vint_write(buf + max_push_size + max_prefix_size - prefix_sz 3373 - push_sz + 
1,stream->sm_promise->pp_id, bits, 1 << bits); 3374 } 3375 else 3376 push_sz = 0; 3377 3378 /* Construct contiguous header block buffer including HQ framing */ 3379 header_block = buf + max_push_size + max_prefix_size - prefix_sz - push_sz; 3380 hblock_sz = push_sz + prefix_sz + headers_sz; 3381 if (!stream_activate_hq_frame(stream, 3382 stream->sm_payload + stream->sm_n_buffered + push_sz, 3383 HQFT_HEADERS, SHF_FIXED_SIZE, hblock_sz - push_sz)) 3384 return -1; 3385 3386 if (qwh == QWH_FULL) 3387 { 3388 stream->sm_send_headers_state = SSHS_HBLOCK_SENDING; 3389 if (lsquic_stream_write_avail(stream)) 3390 { 3391 nw = stream_write_buf(stream, header_block, hblock_sz); 3392 if (nw < 0) 3393 { 3394 LSQ_WARN("cannot write to stream: %s", strerror(errno)); 3395 return -1; 3396 } 3397 if ((size_t) nw == hblock_sz) 3398 { 3399 stream->stream_flags |= STREAM_HEADERS_SENT; 3400 stream_hblock_sent(stream); 3401 LSQ_DEBUG("wrote all %zu bytes of header block", hblock_sz); 3402 return 0; 3403 } 3404 LSQ_DEBUG("wrote only %zd bytes of header block, stash", nw); 3405 } 3406 else 3407 { 3408 LSQ_DEBUG("cannot write to stream, stash all %zu bytes of " 3409 "header block", hblock_sz); 3410 nw = 0; 3411 } 3412 } 3413 else 3414 { 3415 stream->sm_send_headers_state = SSHS_ENC_SENDING; 3416 nw = 0; 3417 } 3418 3419 stream->sm_saved_want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 3420 stream_wantwrite(stream, 1); 3421 3422 stream->sm_header_block = malloc(hblock_sz - (size_t) nw); 3423 if (!stream->sm_header_block) 3424 { 3425 LSQ_WARN("cannot allocate %zd bytes to stash %s header block", 3426 hblock_sz - (size_t) nw, qwh == QWH_FULL ? 
"full" : "partial"); 3427 return -1; 3428 } 3429 memcpy(stream->sm_header_block, header_block + (size_t) nw, 3430 hblock_sz - (size_t) nw); 3431 stream->sm_hblock_sz = hblock_sz - (size_t) nw; 3432 stream->sm_hblock_off = 0; 3433 LSQ_DEBUG("stashed %u bytes of header block", stream->sm_hblock_sz); 3434 return 0; 3435} 3436 3437 3438static int 3439send_headers_gquic (struct lsquic_stream *stream, 3440 const struct lsquic_http_headers *headers, int eos) 3441{ 3442 int s = lsquic_headers_stream_send_headers(stream->conn_pub->u.gquic.hs, 3443 stream->id, headers, eos, lsquic_stream_priority(stream)); 3444 if (0 == s) 3445 { 3446 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 3447 stream->stream_flags |= STREAM_HEADERS_SENT; 3448 if (eos) 3449 stream->stream_flags |= STREAM_FIN_SENT; 3450 LSQ_INFO("sent headers"); 3451 } 3452 else 3453 LSQ_WARN("could not send headers: %s", strerror(errno)); 3454 return s; 3455} 3456 3457 3458int 3459lsquic_stream_send_headers (lsquic_stream_t *stream, 3460 const lsquic_http_headers_t *headers, int eos) 3461{ 3462 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3463 && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_U_WRITE_DONE))) 3464 { 3465 if (stream->sm_bflags & SMBF_IETF) 3466 return send_headers_ietf(stream, headers, eos); 3467 else 3468 return send_headers_gquic(stream, headers, eos); 3469 } 3470 else 3471 { 3472 LSQ_INFO("cannot send headers in this state"); 3473 errno = EBADMSG; 3474 return -1; 3475 } 3476} 3477 3478 3479void 3480lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 3481{ 3482 if (offset > stream->max_send_off) 3483 { 3484 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 3485 LSQ_DEBUG("update max send offset from 0x%"PRIX64" to " 3486 "0x%"PRIX64, stream->max_send_off, offset); 3487 stream->max_send_off = offset; 3488 } 3489 else 3490 LSQ_DEBUG("new offset 0x%"PRIX64" is not larger than old " 3491 "max send offset 0x%"PRIX64", ignoring", offset, 3492 stream->max_send_off); 3493} 3494 3495 
3496/* This function is used to update offsets after handshake completes and we 3497 * learn of peer's limits from the handshake values. 3498 */ 3499int 3500lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset) 3501{ 3502 LSQ_DEBUG("setting max_send_off to %"PRIu64, offset); 3503 if (offset > stream->max_send_off) 3504 { 3505 lsquic_stream_window_update(stream, offset); 3506 return 0; 3507 } 3508 else if (offset < stream->tosend_off) 3509 { 3510 LSQ_INFO("new offset (%"PRIu64" bytes) is smaller than the amount of " 3511 "data already sent on this stream (%"PRIu64" bytes)", offset, 3512 stream->tosend_off); 3513 return -1; 3514 } 3515 else 3516 { 3517 stream->max_send_off = offset; 3518 return 0; 3519 } 3520} 3521 3522 3523void 3524lsquic_stream_reset (lsquic_stream_t *stream, uint64_t error_code) 3525{ 3526 lsquic_stream_reset_ext(stream, error_code, 1); 3527} 3528 3529 3530void 3531lsquic_stream_reset_ext (lsquic_stream_t *stream, uint64_t error_code, 3532 int do_close) 3533{ 3534 if ((stream->stream_flags & STREAM_RST_SENT) 3535 || (stream->sm_qflags & SMQF_SEND_RST)) 3536 { 3537 LSQ_INFO("reset already sent"); 3538 return; 3539 } 3540 3541 SM_HISTORY_APPEND(stream, SHE_RESET); 3542 3543 LSQ_INFO("reset, error code %"PRIu64, error_code); 3544 stream->error_code = error_code; 3545 3546 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 3547 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 3548 next_send_stream); 3549 stream->sm_qflags &= ~SMQF_SENDING_FLAGS; 3550 stream->sm_qflags |= SMQF_SEND_RST; 3551 3552 if (stream->sm_qflags & SMQF_QPACK_DEC) 3553 { 3554 lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream); 3555 stream->sm_qflags |= ~SMQF_QPACK_DEC; 3556 } 3557 3558 drop_buffered_data(stream); 3559 maybe_elide_stream_frames(stream); 3560 maybe_schedule_call_on_close(stream); 3561 3562 if (do_close) 3563 lsquic_stream_close(stream); 3564 else 3565 maybe_conn_to_tickable_if_writeable(stream, 1); 3566} 3567 3568 
3569lsquic_stream_id_t 3570lsquic_stream_id (const lsquic_stream_t *stream) 3571{ 3572 return stream->id; 3573} 3574 3575 3576#if !defined(NDEBUG) && __GNUC__ 3577__attribute__((weak)) 3578#endif 3579struct lsquic_conn * 3580lsquic_stream_conn (const lsquic_stream_t *stream) 3581{ 3582 return stream->conn_pub->lconn; 3583} 3584 3585 3586int 3587lsquic_stream_close (lsquic_stream_t *stream) 3588{ 3589 LSQ_DEBUG("lsquic_stream_close() called"); 3590 SM_HISTORY_APPEND(stream, SHE_CLOSE); 3591 if (lsquic_stream_is_closed(stream)) 3592 { 3593 LSQ_INFO("Attempt to close an already-closed stream"); 3594 errno = EBADF; 3595 return -1; 3596 } 3597 maybe_stream_shutdown_write(stream); 3598 stream_shutdown_read(stream); 3599 maybe_schedule_call_on_close(stream); 3600 maybe_finish_stream(stream); 3601 if (!(stream->stream_flags & STREAM_DELAYED_SW)) 3602 maybe_conn_to_tickable_if_writeable(stream, 1); 3603 return 0; 3604} 3605 3606 3607#ifndef NDEBUG 3608#if __GNUC__ 3609__attribute__((weak)) 3610#endif 3611#endif 3612void 3613lsquic_stream_acked (struct lsquic_stream *stream, 3614 enum quic_frame_type frame_type) 3615{ 3616 assert(stream->n_unacked); 3617 --stream->n_unacked; 3618 LSQ_DEBUG("ACKed; n_unacked: %u", stream->n_unacked); 3619 if (frame_type == QUIC_FRAME_RST_STREAM) 3620 { 3621 SM_HISTORY_APPEND(stream, SHE_RST_ACKED); 3622 LSQ_DEBUG("RESET that we sent has been acked by peer"); 3623 stream->stream_flags |= STREAM_RST_ACKED; 3624 } 3625 if (0 == stream->n_unacked) 3626 maybe_finish_stream(stream); 3627} 3628 3629 3630void 3631lsquic_stream_push_req (lsquic_stream_t *stream, 3632 struct uncompressed_headers *push_req) 3633{ 3634 assert(!stream->push_req); 3635 stream->push_req = push_req; 3636 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 3637} 3638 3639 3640int 3641lsquic_stream_is_pushed (const lsquic_stream_t *stream) 3642{ 3643 enum stream_id_type sit; 3644 3645 if (stream->sm_bflags & SMBF_IETF) 3646 { 3647 sit = stream->id & 
SIT_MASK; 3648 return sit == SIT_UNI_SERVER; 3649 } 3650 else 3651 return 1 & ~stream->id; 3652} 3653 3654 3655int 3656lsquic_stream_push_info (const lsquic_stream_t *stream, 3657 lsquic_stream_id_t *ref_stream_id, void **hset) 3658{ 3659 if (lsquic_stream_is_pushed(stream)) 3660 { 3661 assert(stream->push_req); 3662 *ref_stream_id = stream->push_req->uh_stream_id; 3663 *hset = stream->push_req->uh_hset; 3664 return 0; 3665 } 3666 else 3667 return -1; 3668} 3669 3670 3671int 3672lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 3673{ 3674 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3675 && !(stream->stream_flags & STREAM_HAVE_UH)) 3676 { 3677 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 3678 LSQ_DEBUG("received uncompressed headers"); 3679 stream->stream_flags |= STREAM_HAVE_UH; 3680 if (uh->uh_flags & UH_FIN) 3681 { 3682 /* IETF QUIC only sets UH_FIN for a pushed stream on the server to 3683 * mark request as done: 3684 */ 3685 if (stream->sm_bflags & SMBF_IETF) 3686 assert((stream->sm_bflags & SMBF_SERVER) 3687 && lsquic_stream_is_pushed(stream)); 3688 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 3689 } 3690 stream->uh = uh; 3691 if (uh->uh_oth_stream_id == 0) 3692 { 3693 if (uh->uh_weight) 3694 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 3695 } 3696 else 3697 LSQ_NOTICE("don't know how to depend on stream %"PRIu64, 3698 uh->uh_oth_stream_id); 3699 return 0; 3700 } 3701 else 3702 { 3703 LSQ_ERROR("received unexpected uncompressed headers"); 3704 return -1; 3705 } 3706} 3707 3708 3709unsigned 3710lsquic_stream_priority (const lsquic_stream_t *stream) 3711{ 3712 return 256 - stream->sm_priority; 3713} 3714 3715 3716int 3717lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 3718{ 3719 /* The user should never get a reference to the special streams, 3720 * but let's check just in case: 3721 */ 3722 if (lsquic_stream_is_critical(stream)) 3723 return -1; 3724 if (priority < 1 || 
priority > 256) 3725 return -1; 3726 stream->sm_priority = 256 - priority; 3727 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 3728 LSQ_DEBUG("set priority to %u", priority); 3729 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 3730 return 0; 3731} 3732 3733 3734static int 3735maybe_send_priority_gquic (struct lsquic_stream *stream, unsigned priority) 3736{ 3737 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3738 && (stream->stream_flags & STREAM_HEADERS_SENT)) 3739 { 3740 /* We need to send headers only if we are a) using HEADERS stream 3741 * and b) we already sent initial headers. If initial headers 3742 * have not been sent yet, stream priority will be sent in the 3743 * HEADERS frame. 3744 */ 3745 return lsquic_headers_stream_send_priority(stream->conn_pub->u.gquic.hs, 3746 stream->id, 0, 0, priority); 3747 } 3748 else 3749 return 0; 3750} 3751 3752 3753static int 3754send_priority_ietf (struct lsquic_stream *stream, unsigned priority) 3755{ 3756 LSQ_WARN("%s: TODO", __func__); /* TODO */ 3757 return -1; 3758} 3759 3760 3761int 3762lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 3763{ 3764 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 3765 { 3766 if (stream->sm_bflags & SMBF_IETF) 3767 return send_priority_ietf(stream, priority); 3768 else 3769 return maybe_send_priority_gquic(stream, priority); 3770 } 3771 else 3772 return -1; 3773} 3774 3775 3776lsquic_stream_ctx_t * 3777lsquic_stream_get_ctx (const lsquic_stream_t *stream) 3778{ 3779 return stream->st_ctx; 3780} 3781 3782 3783int 3784lsquic_stream_refuse_push (lsquic_stream_t *stream) 3785{ 3786 if (lsquic_stream_is_pushed(stream) 3787 && !(stream->sm_qflags & SMQF_SEND_RST) 3788 && !(stream->stream_flags & STREAM_RST_SENT)) 3789 { 3790 LSQ_DEBUG("refusing pushed stream: send reset"); 3791 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 3792 return 0; 3793 } 3794 else 3795 return -1; 3796} 3797 3798 3799size_t 3800lsquic_stream_mem_used 
(const struct lsquic_stream *stream) 3801{ 3802 size_t size; 3803 3804 size = sizeof(stream); 3805 if (stream->sm_buf) 3806 size += stream->sm_n_allocated; 3807 if (stream->data_in) 3808 size += stream->data_in->di_if->di_mem_used(stream->data_in); 3809 3810 return size; 3811} 3812 3813 3814const lsquic_cid_t * 3815lsquic_stream_cid (const struct lsquic_stream *stream) 3816{ 3817 return LSQUIC_LOG_CONN_ID; 3818} 3819 3820 3821void 3822lsquic_stream_dump_state (const struct lsquic_stream *stream) 3823{ 3824 LSQ_DEBUG("flags: %X; read off: %"PRIu64, stream->stream_flags, 3825 stream->read_offset); 3826 stream->data_in->di_if->di_dump_state(stream->data_in); 3827} 3828 3829 3830void * 3831lsquic_stream_get_hset (struct lsquic_stream *stream) 3832{ 3833 void *hset; 3834 3835 if (lsquic_stream_is_reset(stream)) 3836 { 3837 LSQ_INFO("%s: stream is reset, no headers returned", __func__); 3838 errno = ECONNRESET; 3839 return NULL; 3840 } 3841 3842 if (!((stream->sm_bflags & SMBF_USE_HEADERS) 3843 && (stream->stream_flags & STREAM_HAVE_UH))) 3844 { 3845 LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__, 3846 stream->stream_flags); 3847 return NULL; 3848 } 3849 3850 if (!stream->uh) 3851 { 3852 LSQ_INFO("%s: headers unavailable (already fetched?)", __func__); 3853 return NULL; 3854 } 3855 3856 if (stream->uh->uh_flags & UH_H1H) 3857 { 3858 LSQ_INFO("%s: uncompressed headers have internal format", __func__); 3859 return NULL; 3860 } 3861 3862 hset = stream->uh->uh_hset; 3863 stream->uh->uh_hset = NULL; 3864 destroy_uh(stream); 3865 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 3866 { 3867 stream->stream_flags |= STREAM_FIN_REACHED; 3868 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 3869 } 3870 LSQ_DEBUG("return header set"); 3871 return hset; 3872} 3873 3874 3875/* GQUIC-only function */ 3876int 3877lsquic_stream_id_is_critical (int use_http, lsquic_stream_id_t stream_id) 3878{ 3879 return stream_id == LSQUIC_GQUIC_STREAM_HANDSHAKE 3880 || (stream_id == 
LSQUIC_GQUIC_STREAM_HEADERS && use_http); 3881} 3882 3883 3884int 3885lsquic_stream_is_critical (const struct lsquic_stream *stream) 3886{ 3887 return stream->sm_bflags & SMBF_CRITICAL; 3888} 3889 3890 3891void 3892lsquic_stream_set_stream_if (struct lsquic_stream *stream, 3893 const struct lsquic_stream_if *stream_if, void *stream_if_ctx) 3894{ 3895 SM_HISTORY_APPEND(stream, SHE_IF_SWITCH); 3896 stream->stream_if = stream_if; 3897 stream->sm_onnew_arg = stream_if_ctx; 3898 LSQ_DEBUG("switched interface"); 3899 assert(stream->stream_flags & STREAM_ONNEW_DONE); 3900 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 3901 stream); 3902} 3903 3904 3905static int 3906update_type_hist_and_check (struct hq_filter *filter) 3907{ 3908 /* 3-bit codes: */ 3909 enum { 3910 CODE_UNSET, 3911 CODE_HEADER, /* H Header */ 3912 CODE_DATA, /* D Data */ 3913 CODE_PLUS, /* + Plus: meaning previous frame repeats */ 3914 }; 3915 static const unsigned valid_seqs[] = { 3916 /* Ordered by expected frequency */ 3917 0123, /* HD+ */ 3918 012, /* HD */ 3919 01, /* H */ 3920 01231, /* HD+H */ 3921 0121, /* HDH */ 3922 }; 3923 unsigned code, i; 3924 3925 switch (filter->hqfi_type) 3926 { 3927 case HQFT_HEADERS: 3928 code = CODE_HEADER; 3929 break; 3930 case HQFT_DATA: 3931 code = CODE_DATA; 3932 break; 3933 default: 3934 /* Ignore unknown frames */ 3935 return 0; 3936 } 3937 3938 if (filter->hqfi_hist_idx >= MAX_HQFI_ENTRIES) 3939 return -1; 3940 3941 if (filter->hqfi_hist_idx && (filter->hqfi_hist_buf & 7) == code) 3942 { 3943 filter->hqfi_hist_buf <<= 3; 3944 filter->hqfi_hist_buf |= CODE_PLUS; 3945 filter->hqfi_hist_idx++; 3946 } 3947 else if (filter->hqfi_hist_idx > 1 3948 && ((filter->hqfi_hist_buf >> 3) & 7) == code 3949 && (filter->hqfi_hist_buf & 7) == CODE_PLUS) 3950 /* Keep it at plus, do nothing */; 3951 else 3952 { 3953 filter->hqfi_hist_buf <<= 3; 3954 filter->hqfi_hist_buf |= code; 3955 filter->hqfi_hist_idx++; 3956 } 3957 3958 for (i = 0; i < 
sizeof(valid_seqs) / sizeof(valid_seqs[0]); ++i) 3959 if (filter->hqfi_hist_buf == valid_seqs[i]) 3960 return 0; 3961 3962 return -1; 3963} 3964 3965 3966static size_t 3967hq_read (void *ctx, const unsigned char *buf, size_t sz, int fin) 3968{ 3969 struct lsquic_stream *const stream = ctx; 3970 struct hq_filter *const filter = &stream->sm_hq_filter; 3971 const unsigned char *p = buf, *prev; 3972 const unsigned char *const end = buf + sz; 3973 struct lsquic_conn *lconn; 3974 enum lsqpack_read_header_status rhs; 3975 int s; 3976 3977 while (p < end) 3978 { 3979 switch (filter->hqfi_state) 3980 { 3981 case HQFI_STATE_FRAME_HEADER_BEGIN: 3982 filter->hqfi_vint_state.vr2s_state = 0; 3983 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_CONTINUE; 3984 /* fall-through */ 3985 case HQFI_STATE_FRAME_HEADER_CONTINUE: 3986 s = lsquic_varint_read_two(&p, end, &filter->hqfi_vint_state); 3987 if (s < 0) 3988 break; 3989 filter->hqfi_flags |= HQFI_FLAG_BEGIN; 3990 filter->hqfi_state = HQFI_STATE_READING_PAYLOAD; 3991 LSQ_DEBUG("HQ frame type 0x%"PRIX64" at offset %"PRIu64", size %"PRIu64, 3992 filter->hqfi_type, stream->read_offset + (unsigned) (p - buf), 3993 filter->hqfi_left); 3994 if (0 != update_type_hist_and_check(filter)) 3995 { 3996 lconn = stream->conn_pub->lconn; 3997 filter->hqfi_flags |= HQFI_FLAG_ERROR; 3998 LSQ_INFO("unexpected HTTP/3 frame sequence: %o", 3999 filter->hqfi_hist_buf); 4000 lconn->cn_if->ci_abort_error(lconn, 1, HEC_FRAME_UNEXPECTED, 4001 "unexpected HTTP/3 frame sequence on stream %"PRIu64, 4002 stream->id); 4003 goto end; 4004 } 4005 if (filter->hqfi_type == HQFT_HEADERS) 4006 { 4007 if (0 == (filter->hqfi_flags & HQFI_FLAG_GOT_HEADERS)) 4008 filter->hqfi_flags |= HQFI_FLAG_GOT_HEADERS; 4009 else 4010 { 4011 filter->hqfi_type = (1ull << 62) - 1; 4012 LSQ_DEBUG("Ignoring HEADERS frame"); 4013 } 4014 } 4015 if (filter->hqfi_left > 0) 4016 { 4017 if (filter->hqfi_type == HQFT_DATA) 4018 goto end; 4019 else if (filter->hqfi_type == HQFT_PUSH_PROMISE) 4020 { 
4021 lconn = stream->conn_pub->lconn; 4022 if (stream->sm_bflags & SMBF_SERVER) 4023 lconn->cn_if->ci_abort_error(lconn, 1, 4024 HEC_FRAME_UNEXPECTED, "Received PUSH_PROMISE frame " 4025 "on stream %"PRIu64" (clients are not supposed to " 4026 "send those)", stream->id); 4027 else 4028 /* Our client implementation does not support server 4029 * push. 4030 */ 4031 lconn->cn_if->ci_abort_error(lconn, 1, 4032 HEC_FRAME_UNEXPECTED, 4033 "Received PUSH_PROMISE frame (not supported)" 4034 "on stream %"PRIu64, stream->id); 4035 goto end; 4036 } 4037 } 4038 else 4039 { 4040 switch (filter->hqfi_type) 4041 { 4042 case HQFT_CANCEL_PUSH: 4043 case HQFT_GOAWAY: 4044 case HQFT_HEADERS: 4045 case HQFT_MAX_PUSH_ID: 4046 case HQFT_PUSH_PROMISE: 4047 case HQFT_SETTINGS: 4048 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4049 LSQ_INFO("HQ frame of type %"PRIu64" cannot be size 0", 4050 filter->hqfi_type); 4051 abort_connection(stream); /* XXX Overkill? */ 4052 goto end; 4053 default: 4054 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 4055 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4056 break; 4057 } 4058 } 4059 break; 4060 case HQFI_STATE_READING_PAYLOAD: 4061 if (filter->hqfi_type == HQFT_DATA) 4062 goto end; 4063 sz = filter->hqfi_left; 4064 if (sz > (uintptr_t) (end - p)) 4065 sz = (uintptr_t) (end - p); 4066 switch (filter->hqfi_type) 4067 { 4068 case HQFT_HEADERS: 4069 prev = p; 4070 if (filter->hqfi_flags & HQFI_FLAG_BEGIN) 4071 { 4072 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 4073 rhs = lsquic_qdh_header_in_begin( 4074 stream->conn_pub->u.ietf.qdh, 4075 stream, filter->hqfi_left, &p, sz); 4076 } 4077 else 4078 rhs = lsquic_qdh_header_in_continue( 4079 stream->conn_pub->u.ietf.qdh, stream, &p, sz); 4080 assert(p > prev || LQRHS_ERROR == rhs); 4081 filter->hqfi_left -= p - prev; 4082 if (filter->hqfi_left == 0) 4083 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4084 switch (rhs) 4085 { 4086 case LQRHS_DONE: 4087 assert(filter->hqfi_left == 0); 4088 stream->sm_qflags &= 
~SMQF_QPACK_DEC; 4089 break; 4090 case LQRHS_NEED: 4091 stream->sm_qflags |= SMQF_QPACK_DEC; 4092 break; 4093 case LQRHS_BLOCKED: 4094 stream->sm_qflags |= SMQF_QPACK_DEC; 4095 filter->hqfi_flags |= HQFI_FLAG_BLOCKED; 4096 goto end; 4097 default: 4098 assert(LQRHS_ERROR == rhs); 4099 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4100 LSQ_INFO("error processing header block"); 4101 abort_connection(stream); /* XXX Overkill? */ 4102 goto end; 4103 } 4104 break; 4105 default: 4106 /* Simply skip unknown frame type payload for now */ 4107 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 4108 p += sz; 4109 filter->hqfi_left -= sz; 4110 if (filter->hqfi_left == 0) 4111 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4112 break; 4113 } 4114 break; 4115 default: 4116 assert(0); 4117 goto end; 4118 } 4119 } 4120 4121 end: 4122 if (fin && p == end && filter->hqfi_state != HQFI_STATE_FRAME_HEADER_BEGIN) 4123 { 4124 LSQ_INFO("FIN at unexpected place in filter; state: %u", 4125 filter->hqfi_state); 4126 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4127/* From [draft-ietf-quic-http-16] Section 3.1: 4128 * When a stream terminates cleanly, if the last frame on 4129 * the stream was truncated, this MUST be treated as a connection error 4130 * (see HTTP_MALFORMED_FRAME in Section 8.1). 
4131 */ 4132 abort_connection(stream); 4133 } 4134 4135 return p - buf; 4136} 4137 4138 4139static int 4140hq_filter_readable_now (const struct lsquic_stream *stream) 4141{ 4142 const struct hq_filter *const filter = &stream->sm_hq_filter; 4143 4144 return (filter->hqfi_type == HQFT_DATA 4145 && filter->hqfi_state == HQFI_STATE_READING_PAYLOAD) 4146 || (filter->hqfi_flags & HQFI_FLAG_ERROR) 4147 || stream->uh 4148 || (stream->stream_flags & STREAM_FIN_REACHED) 4149 ; 4150} 4151 4152 4153static int 4154hq_filter_readable (struct lsquic_stream *stream) 4155{ 4156 struct hq_filter *const filter = &stream->sm_hq_filter; 4157 struct read_frames_status rfs; 4158 4159 if (filter->hqfi_flags & HQFI_FLAG_BLOCKED) 4160 return 0; 4161 4162 if (!hq_filter_readable_now(stream)) 4163 { 4164 rfs = read_data_frames(stream, 0, hq_read, stream); 4165 if (rfs.total_nread == 0) 4166 { 4167 if (rfs.error) 4168 { 4169 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4170 abort_connection(stream); /* XXX Overkill? */ 4171 return 1; /* Collect error */ 4172 } 4173 return 0; 4174 } 4175 } 4176 4177 return hq_filter_readable_now(stream); 4178} 4179 4180 4181static size_t 4182hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame) 4183{ 4184 struct hq_filter *const filter = &stream->sm_hq_filter; 4185 size_t nr; 4186 4187 if (!(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4188 && filter->hqfi_type == HQFT_DATA)) 4189 { 4190 nr = hq_read(stream, data_frame->df_data + data_frame->df_read_off, 4191 data_frame->df_size - data_frame->df_read_off, 4192 data_frame->df_fin); 4193 if (nr) 4194 { 4195 stream->read_offset += nr; 4196 stream_consumed_bytes(stream); 4197 } 4198 } 4199 else 4200 nr = 0; 4201 4202 if (0 == (filter->hqfi_flags & HQFI_FLAG_ERROR)) 4203 { 4204 data_frame->df_read_off += nr; 4205 if (filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4206 && filter->hqfi_type == HQFT_DATA) 4207 return MIN(filter->hqfi_left, 4208 (unsigned) data_frame->df_size - 
data_frame->df_read_off); 4209 else 4210 { 4211 assert(data_frame->df_read_off == data_frame->df_size); 4212 return 0; 4213 } 4214 } 4215 else 4216 { 4217 data_frame->df_read_off = data_frame->df_size; 4218 return 0; 4219 } 4220} 4221 4222 4223static void 4224hq_decr_left (struct lsquic_stream *stream, size_t read) 4225{ 4226 struct hq_filter *const filter = &stream->sm_hq_filter; 4227 4228 if (read) 4229 { 4230 assert(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4231 && filter->hqfi_type == HQFT_DATA); 4232 assert(read <= filter->hqfi_left); 4233 } 4234 4235 filter->hqfi_left -= read; 4236 if (0 == filter->hqfi_left) 4237 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4238} 4239 4240 4241struct qpack_dec_hdl * 4242lsquic_stream_get_qdh (const struct lsquic_stream *stream) 4243{ 4244 return stream->conn_pub->u.ietf.qdh; 4245} 4246 4247 4248/* These are IETF QUIC states */ 4249enum stream_state_sending 4250lsquic_stream_sending_state (const struct lsquic_stream *stream) 4251{ 4252 if (0 == (stream->stream_flags & STREAM_RST_SENT)) 4253 { 4254 if (stream->stream_flags & STREAM_FIN_SENT) 4255 { 4256 if (stream->n_unacked) 4257 return SSS_DATA_SENT; 4258 else 4259 return SSS_DATA_RECVD; 4260 } 4261 else 4262 { 4263 if (stream->tosend_off 4264 || (stream->stream_flags & STREAM_BLOCKED_SENT)) 4265 return SSS_SEND; 4266 else 4267 return SSS_READY; 4268 } 4269 } 4270 else if (stream->stream_flags & STREAM_RST_ACKED) 4271 return SSS_RESET_RECVD; 4272 else 4273 return SSS_RESET_SENT; 4274} 4275 4276 4277const char *const lsquic_sss2str[] = 4278{ 4279 [SSS_READY] = "Ready", 4280 [SSS_SEND] = "Send", 4281 [SSS_DATA_SENT] = "Data Sent", 4282 [SSS_RESET_SENT] = "Reset Sent", 4283 [SSS_DATA_RECVD] = "Data Recvd", 4284 [SSS_RESET_RECVD] = "Reset Recvd", 4285}; 4286 4287 4288const char *const lsquic_ssr2str[] = 4289{ 4290 [SSR_RECV] = "Recv", 4291 [SSR_SIZE_KNOWN] = "Size Known", 4292 [SSR_DATA_RECVD] = "Data Recvd", 4293 [SSR_RESET_RECVD] = "Reset Recvd", 4294 
[SSR_DATA_READ] = "Data Read", 4295 [SSR_RESET_READ] = "Reset Read", 4296}; 4297 4298 4299/* These are IETF QUIC states */ 4300enum stream_state_receiving 4301lsquic_stream_receiving_state (struct lsquic_stream *stream) 4302{ 4303 uint64_t n_bytes; 4304 4305 if (0 == (stream->stream_flags & STREAM_RST_RECVD)) 4306 { 4307 if (0 == (stream->stream_flags & STREAM_FIN_RECVD)) 4308 return SSR_RECV; 4309 if (stream->stream_flags & STREAM_FIN_REACHED) 4310 return SSR_DATA_READ; 4311 if (0 == (stream->stream_flags & STREAM_DATA_RECVD)) 4312 { 4313 n_bytes = stream->data_in->di_if->di_readable_bytes( 4314 stream->data_in, stream->read_offset); 4315 if (stream->read_offset + n_bytes == stream->sm_fin_off) 4316 { 4317 stream->stream_flags |= STREAM_DATA_RECVD; 4318 return SSR_DATA_RECVD; 4319 } 4320 else 4321 return SSR_SIZE_KNOWN; 4322 } 4323 else 4324 return SSR_DATA_RECVD; 4325 } 4326 else if (stream->stream_flags & STREAM_RST_READ) 4327 return SSR_RESET_READ; 4328 else 4329 return SSR_RESET_RECVD; 4330} 4331 4332 4333void 4334lsquic_stream_qdec_unblocked (struct lsquic_stream *stream) 4335{ 4336 struct hq_filter *const filter = &stream->sm_hq_filter; 4337 4338 assert(stream->sm_qflags & SMQF_QPACK_DEC); 4339 assert(filter->hqfi_flags & HQFI_FLAG_BLOCKED); 4340 4341 filter->hqfi_flags &= ~HQFI_FLAG_BLOCKED; 4342 stream->conn_pub->cp_flags |= CP_STREAM_UNBLOCKED; 4343 LSQ_DEBUG("QPACK decoder unblocked"); 4344} 4345 4346 4347int 4348lsquic_stream_is_rejected (const struct lsquic_stream *stream) 4349{ 4350 return stream->stream_flags & STREAM_SS_RECVD; 4351} 4352 4353 4354int 4355lsquic_stream_can_push (const struct lsquic_stream *stream) 4356{ 4357 if (lsquic_stream_is_pushed(stream)) 4358 return 0; 4359 else if (stream->sm_bflags & SMBF_IETF) 4360 return (stream->sm_bflags & SMBF_USE_HEADERS) 4361 && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_NOPUSH)) 4362 && stream->sm_send_headers_state == SSHS_BEGIN 4363 ; 4364 else 4365 return 1; 4366} 4367 4368 4369static 
size_t 4370dp_reader_read (void *lsqr_ctx, void *buf, size_t count) 4371{ 4372 struct lsquic_stream *const stream = lsqr_ctx; 4373 unsigned char *dst = buf; 4374 unsigned char *const end = buf + count; 4375 size_t len; 4376 4377 len = MIN((size_t) (stream->sm_dup_push_len - stream->sm_dup_push_off), 4378 (size_t) (end - dst)); 4379 memcpy(dst, stream->sm_dup_push_buf + stream->sm_dup_push_off, len); 4380 stream->sm_dup_push_off += len; 4381 4382 if (stream->sm_dup_push_len == stream->sm_dup_push_off) 4383 LSQ_DEBUG("finish writing duplicate push"); 4384 4385 return len; 4386} 4387 4388 4389static size_t 4390dp_reader_size (void *lsqr_ctx) 4391{ 4392 struct lsquic_stream *const stream = lsqr_ctx; 4393 4394 return stream->sm_dup_push_len - stream->sm_dup_push_off; 4395} 4396 4397 4398static void 4399init_dp_reader (struct lsquic_stream *stream, struct lsquic_reader *reader) 4400{ 4401 reader->lsqr_read = dp_reader_read; 4402 reader->lsqr_size = dp_reader_size; 4403 reader->lsqr_ctx = stream; 4404} 4405 4406 4407static void 4408on_write_dp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 4409{ 4410 struct lsquic_reader dp_reader; 4411 ssize_t nw; 4412 int want_write; 4413 4414 assert(stream->sm_dup_push_off < stream->sm_dup_push_len); 4415 4416 init_dp_reader(stream, &dp_reader); 4417 nw = stream_write(stream, &dp_reader); 4418 if (nw > 0) 4419 { 4420 LSQ_DEBUG("wrote %zd bytes more of duplicate push (%s)", 4421 nw, stream->sm_dup_push_off == stream->sm_dup_push_len ? 4422 "done" : "not done"); 4423 if (stream->sm_dup_push_off == stream->sm_dup_push_len) 4424 { 4425 /* Restore want_write flag */ 4426 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 4427 if (want_write != stream->sm_saved_want_write) 4428 (void) lsquic_stream_wantwrite(stream, 4429 stream->sm_saved_want_write); 4430 } 4431 } 4432 else if (nw < 0) 4433 { 4434 LSQ_WARN("could not write duplicate push (wrapper)"); 4435 /* XXX What should happen if we hit an error? 
TODO */ 4436 } 4437} 4438 4439 4440int 4441lsquic_stream_duplicate_push (struct lsquic_stream *stream, uint64_t push_id) 4442{ 4443 struct lsquic_reader dp_reader; 4444 unsigned bits, len; 4445 ssize_t nw; 4446 4447 assert(stream->sm_bflags & SMBF_IETF); 4448 assert(lsquic_stream_can_push(stream)); 4449 4450 bits = vint_val2bits(push_id); 4451 len = 1 << bits; 4452 4453 if (!stream_activate_hq_frame(stream, 4454 stream->sm_payload + stream->sm_n_buffered, HQFT_DUPLICATE_PUSH, 4455 SHF_FIXED_SIZE, len)) 4456 return -1; 4457 4458 stream->stream_flags |= STREAM_PUSHING; 4459 4460 stream->sm_dup_push_len = len; 4461 stream->sm_dup_push_off = 0; 4462 vint_write(stream->sm_dup_push_buf, push_id, bits, 1 << bits); 4463 4464 init_dp_reader(stream, &dp_reader); 4465 nw = stream_write(stream, &dp_reader); 4466 if (nw > 0) 4467 { 4468 if (stream->sm_dup_push_off == stream->sm_dup_push_len) 4469 LSQ_DEBUG("fully wrote DUPLICATE_PUSH %"PRIu64, push_id); 4470 else 4471 { 4472 LSQ_DEBUG("partially wrote DUPLICATE_PUSH %"PRIu64, push_id); 4473 stream->stream_flags |= STREAM_NOPUSH; 4474 stream->sm_saved_want_write = 4475 !!(stream->sm_qflags & SMQF_WANT_WRITE); 4476 stream_wantwrite(stream, 1); 4477 } 4478 return 0; 4479 } 4480 else 4481 { 4482 if (nw < 0) 4483 LSQ_WARN("failure writing DUPLICATE_PUSH"); 4484 stream->stream_flags |= STREAM_NOPUSH; 4485 stream->stream_flags &= ~STREAM_PUSHING; 4486 return -1; 4487 } 4488} 4489 4490 4491static size_t 4492pp_reader_read (void *lsqr_ctx, void *buf, size_t count) 4493{ 4494 struct push_promise *const promise = lsqr_ctx; 4495 unsigned char *dst = buf; 4496 unsigned char *const end = buf + count; 4497 size_t len; 4498 4499 while (dst < end) 4500 { 4501 switch (promise->pp_write_state) 4502 { 4503 case PPWS_ID0: 4504 case PPWS_ID1: 4505 case PPWS_ID2: 4506 case PPWS_ID3: 4507 case PPWS_ID4: 4508 case PPWS_ID5: 4509 case PPWS_ID6: 4510 case PPWS_ID7: 4511 *dst++ = promise->pp_encoded_push_id[promise->pp_write_state]; 4512 
++promise->pp_write_state; 4513 break; 4514 case PPWS_PFX0: 4515 *dst++ = 0; 4516 ++promise->pp_write_state; 4517 break; 4518 case PPWS_PFX1: 4519 *dst++ = 0; 4520 ++promise->pp_write_state; 4521 break; 4522 case PPWS_HBLOCK: 4523 len = MIN(promise->pp_content_len - promise->pp_write_off, 4524 (size_t) (end - dst)); 4525 memcpy(dst, promise->pp_content_buf + promise->pp_write_off, 4526 len); 4527 promise->pp_write_off += len; 4528 dst += len; 4529 if (promise->pp_content_len == promise->pp_write_off) 4530 { 4531 LSQ_LOG1(LSQ_LOG_DEBUG, "finish writing push promise %"PRIu64 4532 ": reset push state", promise->pp_id); 4533 promise->pp_write_state = PPWS_DONE; 4534 } 4535 goto end; 4536 default: 4537 goto end; 4538 } 4539 } 4540 4541 end: 4542 return dst - (unsigned char *) buf; 4543} 4544 4545 4546static size_t 4547pp_reader_size (void *lsqr_ctx) 4548{ 4549 struct push_promise *const promise = lsqr_ctx; 4550 size_t size; 4551 4552 size = 0; 4553 switch (promise->pp_write_state) 4554 { 4555 case PPWS_ID0: 4556 case PPWS_ID1: 4557 case PPWS_ID2: 4558 case PPWS_ID3: 4559 case PPWS_ID4: 4560 case PPWS_ID5: 4561 case PPWS_ID6: 4562 case PPWS_ID7: 4563 size += 8 - promise->pp_write_state; 4564 case PPWS_PFX0: 4565 ++size; 4566 /* fall-through */ 4567 case PPWS_PFX1: 4568 ++size; 4569 /* fall-through */ 4570 case PPWS_HBLOCK: 4571 size += promise->pp_content_len - promise->pp_write_off; 4572 break; 4573 default: 4574 break; 4575 } 4576 4577 return size; 4578} 4579 4580 4581static void 4582init_pp_reader (struct push_promise *promise, struct lsquic_reader *reader) 4583{ 4584 reader->lsqr_read = pp_reader_read; 4585 reader->lsqr_size = pp_reader_size; 4586 reader->lsqr_ctx = promise; 4587} 4588 4589 4590static void 4591on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 4592{ 4593 struct lsquic_reader pp_reader; 4594 struct push_promise *promise; 4595 ssize_t nw; 4596 int want_write; 4597 4598 assert(stream_is_pushing_promise(stream)); 4599 4600 promise 
= SLIST_FIRST(&stream->sm_promises); 4601 init_pp_reader(promise, &pp_reader); 4602 nw = stream_write(stream, &pp_reader); 4603 if (nw > 0) 4604 { 4605 LSQ_DEBUG("wrote %zd bytes more of push promise (%s)", 4606 nw, promise->pp_write_state == PPWS_DONE ? "done" : "not done"); 4607 if (promise->pp_write_state == PPWS_DONE) 4608 { 4609 /* Restore want_write flag */ 4610 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 4611 if (want_write != stream->sm_saved_want_write) 4612 (void) lsquic_stream_wantwrite(stream, 4613 stream->sm_saved_want_write); 4614 } 4615 } 4616 else if (nw < 0) 4617 { 4618 LSQ_WARN("could not write push promise (wrapper)"); 4619 /* XXX What should happen if we hit an error? TODO */ 4620 } 4621} 4622 4623 4624/* Success means that the push promise has been placed on sm_promises list and 4625 * the stream now owns it. Failure means that the push promise should be 4626 * destroyed by the caller. 4627 * 4628 * A push promise is written immediately. If it cannot be written to packets 4629 * or buffered whole, the stream is marked as unable to push further promises. 
4630 */ 4631int 4632lsquic_stream_push_promise (struct lsquic_stream *stream, 4633 struct push_promise *promise) 4634{ 4635 struct lsquic_reader pp_reader; 4636 unsigned bits, len; 4637 ssize_t nw; 4638 4639 assert(stream->sm_bflags & SMBF_IETF); 4640 assert(lsquic_stream_can_push(stream)); 4641 4642 bits = vint_val2bits(promise->pp_id); 4643 len = 1 << bits; 4644 promise->pp_write_state = 8 - len; 4645 vint_write(promise->pp_encoded_push_id + 8 - len, promise->pp_id, 4646 bits, 1 << bits); 4647 4648 if (!stream_activate_hq_frame(stream, 4649 stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PROMISE, 4650 SHF_FIXED_SIZE, pp_reader_size(promise))) 4651 return -1; 4652 4653 stream->stream_flags |= STREAM_PUSHING; 4654 4655 init_pp_reader(promise, &pp_reader); 4656 nw = stream_write(stream, &pp_reader); 4657 if (nw > 0) 4658 { 4659 SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next); 4660 ++promise->pp_refcnt; 4661 if (promise->pp_write_state == PPWS_DONE) 4662 LSQ_DEBUG("fully wrote promise %"PRIu64, promise->pp_id); 4663 else 4664 { 4665 LSQ_DEBUG("partially wrote promise %"PRIu64" (state: %d, off: %u)" 4666 ", disable further pushing", promise->pp_id, 4667 promise->pp_write_state, promise->pp_write_off); 4668 stream->stream_flags |= STREAM_NOPUSH; 4669 stream->sm_saved_want_write = 4670 !!(stream->sm_qflags & SMQF_WANT_WRITE); 4671 stream_wantwrite(stream, 1); 4672 } 4673 return 0; 4674 } 4675 else 4676 { 4677 if (nw < 0) 4678 LSQ_WARN("failure writing push promise"); 4679 stream->stream_flags |= STREAM_NOPUSH; 4680 stream->stream_flags &= ~STREAM_PUSHING; 4681 return -1; 4682 } 4683} 4684