/* lsquic_stream.c revision bc520ef7 */
/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc.  See LICENSE. */
/*
 * lsquic_stream.c -- stream processing
 *
 * To clear up terminology, here are some of our stream states (in order).
 * They are not codified, but they are referred to in both code and comments.
 *
 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
 *                point, on_close() gets called.
 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
 *                finished (freed): it gets put onto the `service_streams'
 *                list for connection to clean it up.
 *  DESTROYED   All remaining memory associated with the stream is released.
 *                If on_close() has not been called yet, it is called now.
 *                The stream pointer is now invalid.
 *
 * When connection is aborted, a stream may go directly to DESTROYED state.
 */

#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <sys/queue.h>
#include <stddef.h>

#include "fiu-local.h"

#include "lsquic.h"

#include "lsquic_int_types.h"
#include "lsquic_packet_common.h"
#include "lsquic_packet_in.h"
#include "lsquic_malo.h"
#include "lsquic_conn_flow.h"
#include "lsquic_rtt.h"
#include "lsquic_sfcw.h"
#include "lsquic_varint.h"
#include "lsquic_hq.h"
#include "lsquic_hash.h"
#include "lsquic_stream.h"
#include "lsquic_conn_public.h"
#include "lsquic_util.h"
#include "lsquic_mm.h"
#include "lsquic_headers_stream.h"
#include "lsquic_conn.h"
#include "lsquic_data_in_if.h"
#include "lsquic_parse.h"
#include "lsquic_packet_out.h"
#include "lsquic_engine_public.h"
#include "lsquic_senhist.h"
#include "lsquic_pacer.h"
#include "lsquic_cubic.h"
#include "lsquic_bw_sampler.h"
#include "lsquic_minmax.h"
#include "lsquic_bbr.h"
#include "lsquic_send_ctl.h"
#include "lsquic_headers.h"
#include "lsquic_ev_log.h"
#include "lsquic_enc_sess.h"
#include "lsqpack.h"
64#include "lsquic_frab_list.h" 65#include "lsquic_http1x_if.h" 66#include "lsquic_qdec_hdl.h" 67#include "lsquic_qenc_hdl.h" 68#include "lsquic_byteswap.h" 69#include "lsquic_ietf.h" 70#include "lsquic_push_promise.h" 71 72#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 73#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(stream->conn_pub->lconn) 74#define LSQUIC_LOG_STREAM_ID stream->id 75#include "lsquic_logger.h" 76 77#define MIN(a, b) ((a) < (b) ? (a) : (b)) 78 79static void 80drop_frames_in (lsquic_stream_t *stream); 81 82static void 83maybe_schedule_call_on_close (lsquic_stream_t *stream); 84 85static int 86stream_wantread (lsquic_stream_t *stream, int is_want); 87 88static int 89stream_wantwrite (lsquic_stream_t *stream, int is_want); 90 91static ssize_t 92stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 93 94static ssize_t 95save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 96 97static int 98stream_flush (lsquic_stream_t *stream); 99 100static int 101stream_flush_nocheck (lsquic_stream_t *stream); 102 103static void 104maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag); 105 106enum swtp_status { SWTP_OK, SWTP_STOP, SWTP_ERROR }; 107 108static enum swtp_status 109stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size); 110 111static enum swtp_status 112stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size); 113 114static enum swtp_status 115stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size); 116 117static size_t 118stream_write_avail_no_frames (struct lsquic_stream *); 119 120static size_t 121stream_write_avail_with_frames (struct lsquic_stream *); 122 123static size_t 124stream_write_avail_with_headers (struct lsquic_stream *); 125 126static int 127hq_filter_readable (struct lsquic_stream *stream); 128 129static void 130hq_decr_left (struct lsquic_stream *stream, size_t); 131 132static size_t 133hq_filter_df (struct 
lsquic_stream *stream, struct data_frame *data_frame); 134 135static int 136stream_readable_non_http (struct lsquic_stream *stream); 137 138static int 139stream_readable_http_gquic (struct lsquic_stream *stream); 140 141static int 142stream_readable_http_ietf (struct lsquic_stream *stream); 143 144static ssize_t 145stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz); 146 147static size_t 148active_hq_frame_sizes (const struct lsquic_stream *); 149 150static void 151on_write_pp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *); 152 153static void 154stream_hq_frame_put (struct lsquic_stream *, struct stream_hq_frame *); 155 156static size_t 157stream_hq_frame_size (const struct stream_hq_frame *); 158 159const struct stream_filter_if hq_stream_filter_if = 160{ 161 .sfi_readable = hq_filter_readable, 162 .sfi_filter_df = hq_filter_df, 163 .sfi_decr_left = hq_decr_left, 164}; 165 166 167#if LSQUIC_KEEP_STREAM_HISTORY 168/* These values are printable ASCII characters for ease of printing the 169 * whole history in a single line of a log message. 170 * 171 * The list of events is not exhaustive: only most interesting events 172 * are recorded. 173 */ 174enum stream_history_event 175{ 176 SHE_EMPTY = '\0', /* Special entry. 
No init besides memset required */ 177 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 178 SHE_REACH_FIN = 'a', 179 SHE_BLOCKED_OUT = 'b', 180 SHE_CREATED = 'C', 181 SHE_FRAME_IN = 'd', 182 SHE_FRAME_OUT = 'D', 183 SHE_RESET = 'e', 184 SHE_WINDOW_UPDATE = 'E', 185 SHE_FIN_IN = 'f', 186 SHE_FINISHED = 'F', 187 SHE_GOAWAY_IN = 'g', 188 SHE_USER_WRITE_HEADER = 'h', 189 SHE_HEADERS_IN = 'H', 190 SHE_IF_SWITCH = 'i', 191 SHE_ONCLOSE_SCHED = 'l', 192 SHE_ONCLOSE_CALL = 'L', 193 SHE_ONNEW = 'N', 194 SHE_SET_PRIO = 'p', 195 SHE_USER_READ = 'r', 196 SHE_SHUTDOWN_READ = 'R', 197 SHE_RST_IN = 's', 198 SHE_SS_IN = 'S', 199 SHE_RST_OUT = 't', 200 SHE_RST_ACKED = 'T', 201 SHE_FLUSH = 'u', 202 SHE_USER_WRITE_DATA = 'w', 203 SHE_SHUTDOWN_WRITE = 'W', 204 SHE_CLOSE = 'X', 205 SHE_DELAY_SW = 'y', 206 SHE_FORCE_FINISH = 'Z', 207 SHE_WANTREAD_NO = '0', /* "YES" must be one more than "NO" */ 208 SHE_WANTREAD_YES = '1', 209 SHE_WANTWRITE_NO = '2', 210 SHE_WANTWRITE_YES = '3', 211}; 212 213static void 214sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 215{ 216 enum stream_history_event prev_event; 217 sm_hist_idx_t idx; 218 int plus; 219 220 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 221 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 222 idx = (idx - plus) & SM_HIST_IDX_MASK; 223 prev_event = stream->sm_hist_buf[idx]; 224 225 if (prev_event == sh_event && plus) 226 return; 227 228 if (prev_event == sh_event) 229 sh_event = SHE_PLUS; 230 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 231 232 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 233 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 234 stream->sm_hist_buf); 235} 236 237 238# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 239# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 240 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 241 LSQ_DEBUG("history: [%.*s]", \ 242 (int) ((stream)->sm_hist_idx & 
SM_HIST_IDX_MASK), \ 243 (stream)->sm_hist_buf); \ 244 } while (0) 245#else 246# define SM_HISTORY_APPEND(stream, event) 247# define SM_HISTORY_DUMP_REMAINING(stream) 248#endif 249 250 251static int 252stream_inside_callback (const lsquic_stream_t *stream) 253{ 254 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 255} 256 257 258static void 259maybe_conn_to_tickable (lsquic_stream_t *stream) 260{ 261 if (!stream_inside_callback(stream)) 262 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 263 stream->conn_pub->lconn); 264} 265 266 267/* Here, "readable" means that the user is able to read from the stream. */ 268static void 269maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 270{ 271 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 272 { 273 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 274 stream->conn_pub->lconn); 275 } 276} 277 278 279/* Here, "writeable" means that data can be put into packets to be 280 * scheduled to be sent out. 281 * 282 * If `check_can_send' is false, it means that we do not need to check 283 * whether packets can be sent. This check was already performed when 284 * we packetized stream data. 285 */ 286static void 287maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream, 288 int check_can_send) 289{ 290 if (!stream_inside_callback(stream) && 291 (!check_can_send 292 || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) && 293 ! 
lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 294 { 295 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 296 stream->conn_pub->lconn); 297 } 298} 299 300 301static int 302stream_stalled (const lsquic_stream_t *stream) 303{ 304 return 0 == (stream->sm_qflags & (SMQF_WANT_WRITE|SMQF_WANT_READ)) && 305 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 306 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 307} 308 309 310static size_t 311stream_stream_frame_header_sz (const struct lsquic_stream *stream, 312 unsigned data_sz) 313{ 314 return stream->conn_pub->lconn->cn_pf->pf_calc_stream_frame_header_sz( 315 stream->id, stream->tosend_off, data_sz); 316} 317 318 319static size_t 320stream_crypto_frame_header_sz (const struct lsquic_stream *stream, 321 unsigned data_sz) 322{ 323 return stream->conn_pub->lconn->cn_pf 324 ->pf_calc_crypto_frame_header_sz(stream->tosend_off, data_sz); 325} 326 327 328/* GQUIC-only function */ 329static int 330stream_is_hsk (const struct lsquic_stream *stream) 331{ 332 if (stream->sm_bflags & SMBF_IETF) 333 return 0; 334 else 335 return lsquic_stream_is_crypto(stream); 336} 337 338 339static struct lsquic_stream * 340stream_new_common (lsquic_stream_id_t id, struct lsquic_conn_public *conn_pub, 341 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 342 enum stream_ctor_flags ctor_flags) 343{ 344 struct lsquic_stream *stream; 345 346 stream = calloc(1, sizeof(*stream)); 347 if (!stream) 348 return NULL; 349 350 if (ctor_flags & SCF_USE_DI_HASH) 351 stream->data_in = data_in_hash_new(conn_pub, id, 0); 352 else 353 stream->data_in = data_in_nocopy_new(conn_pub, id); 354 if (!stream->data_in) 355 { 356 free(stream); 357 return NULL; 358 } 359 360 stream->id = id; 361 stream->stream_if = stream_if; 362 stream->conn_pub = conn_pub; 363 stream->sm_onnew_arg = stream_if_ctx; 364 stream->sm_write_avail = stream_write_avail_no_frames; 365 366 STAILQ_INIT(&stream->sm_hq_frames); 367 368 
stream->sm_bflags |= ctor_flags & ((1 << N_SMBF_FLAGS) - 1); 369 if (conn_pub->lconn->cn_flags & LSCONN_SERVER) 370 stream->sm_bflags |= SMBF_SERVER; 371 372 return stream; 373} 374 375 376lsquic_stream_t * 377lsquic_stream_new (lsquic_stream_id_t id, 378 struct lsquic_conn_public *conn_pub, 379 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 380 unsigned initial_window, uint64_t initial_send_off, 381 enum stream_ctor_flags ctor_flags) 382{ 383 lsquic_cfcw_t *cfcw; 384 lsquic_stream_t *stream; 385 386 stream = stream_new_common(id, conn_pub, stream_if, stream_if_ctx, 387 ctor_flags); 388 if (!stream) 389 return NULL; 390 391 if (!initial_window) 392 initial_window = 16 * 1024; 393 394 if (ctor_flags & SCF_IETF) 395 { 396 cfcw = &conn_pub->cfcw; 397 stream->sm_bflags |= SMBF_CONN_LIMITED; 398 if (ctor_flags & SCF_HTTP) 399 { 400 stream->sm_write_avail = stream_write_avail_with_headers; 401 stream->sm_readable = stream_readable_http_ietf; 402 stream->sm_sfi = &hq_stream_filter_if; 403 } 404 else 405 stream->sm_readable = stream_readable_non_http; 406 lsquic_stream_set_priority_internal(stream, 407 LSQUIC_STREAM_DEFAULT_PRIO); 408 stream->sm_write_to_packet = stream_write_to_packet_std; 409 stream->sm_frame_header_sz = stream_stream_frame_header_sz; 410 } 411 else 412 { 413 if (ctor_flags & SCF_CRITICAL) 414 cfcw = NULL; 415 else 416 { 417 cfcw = &conn_pub->cfcw; 418 stream->sm_bflags |= SMBF_CONN_LIMITED; 419 lsquic_stream_set_priority_internal(stream, 420 LSQUIC_STREAM_DEFAULT_PRIO); 421 } 422 if (stream->sm_bflags & SMBF_USE_HEADERS) 423 stream->sm_readable = stream_readable_http_gquic; 424 else 425 stream->sm_readable = stream_readable_non_http; 426 if (ctor_flags & SCF_CRYPTO_FRAMES) 427 { 428 stream->sm_frame_header_sz = stream_crypto_frame_header_sz; 429 stream->sm_write_to_packet = stream_write_to_packet_crypto; 430 } 431 else 432 { 433 if (stream_is_hsk(stream)) 434 stream->sm_write_to_packet = stream_write_to_packet_hsk; 435 else 436 
stream->sm_write_to_packet = stream_write_to_packet_std; 437 stream->sm_frame_header_sz = stream_stream_frame_header_sz; 438 } 439 } 440 441 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 442 stream->max_send_off = initial_send_off; 443 LSQ_DEBUG("created stream"); 444 SM_HISTORY_APPEND(stream, SHE_CREATED); 445 if (ctor_flags & SCF_CALL_ON_NEW) 446 lsquic_stream_call_on_new(stream); 447 return stream; 448} 449 450 451struct lsquic_stream * 452lsquic_stream_new_crypto (enum enc_level enc_level, 453 struct lsquic_conn_public *conn_pub, 454 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 455 enum stream_ctor_flags ctor_flags) 456{ 457 struct lsquic_stream *stream; 458 lsquic_stream_id_t stream_id; 459 460 assert(ctor_flags & SCF_CRITICAL); 461 462 fiu_return_on("stream/new_crypto", NULL); 463 464 stream_id = ~0ULL - enc_level; 465 stream = stream_new_common(stream_id, conn_pub, stream_if, 466 stream_if_ctx, ctor_flags); 467 if (!stream) 468 return NULL; 469 470 stream->sm_bflags |= SMBF_CRYPTO|SMBF_IETF; 471 stream->sm_enc_level = enc_level; 472 /* TODO: why have limit in crypto stream? Set it to UINT64_MAX? 
*/ 473 lsquic_sfcw_init(&stream->fc, 16 * 1024, NULL, conn_pub, stream_id); 474 stream->max_send_off = 16 * 1024; 475 LSQ_DEBUG("created crypto stream"); 476 SM_HISTORY_APPEND(stream, SHE_CREATED); 477 stream->sm_frame_header_sz = stream_crypto_frame_header_sz; 478 stream->sm_write_to_packet = stream_write_to_packet_crypto; 479 stream->sm_readable = stream_readable_non_http; 480 if (ctor_flags & SCF_CALL_ON_NEW) 481 lsquic_stream_call_on_new(stream); 482 return stream; 483} 484 485 486void 487lsquic_stream_call_on_new (lsquic_stream_t *stream) 488{ 489 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 490 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 491 { 492 LSQ_DEBUG("calling on_new_stream"); 493 SM_HISTORY_APPEND(stream, SHE_ONNEW); 494 stream->stream_flags |= STREAM_ONNEW_DONE; 495 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 496 stream); 497 } 498} 499 500 501static void 502decr_conn_cap (struct lsquic_stream *stream, size_t incr) 503{ 504 if (stream->sm_bflags & SMBF_CONN_LIMITED) 505 { 506 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 507 stream->conn_pub->conn_cap.cc_sent -= incr; 508 } 509} 510 511 512static void 513maybe_resize_stream_buffer (struct lsquic_stream *stream) 514{ 515 assert(0 == stream->sm_n_buffered); 516 517 if (stream->sm_n_allocated < stream->conn_pub->path->np_pack_size) 518 { 519 free(stream->sm_buf); 520 stream->sm_buf = NULL; 521 stream->sm_n_allocated = 0; 522 } 523 else if (stream->sm_n_allocated > stream->conn_pub->path->np_pack_size) 524 stream->sm_n_allocated = stream->conn_pub->path->np_pack_size; 525} 526 527 528static void 529drop_buffered_data (struct lsquic_stream *stream) 530{ 531 decr_conn_cap(stream, stream->sm_n_buffered); 532 stream->sm_n_buffered = 0; 533 maybe_resize_stream_buffer(stream); 534 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 535 maybe_remove_from_write_q(stream, SMQF_WRITE_Q_FLAGS); 536} 537 538 539static void 540destroy_uh (struct lsquic_stream *stream) 541{ 542 if 
(stream->uh) 543 { 544 if (stream->uh->uh_hset) 545 stream->conn_pub->enpub->enp_hsi_if 546 ->hsi_discard_header_set(stream->uh->uh_hset); 547 free(stream->uh); 548 stream->uh = NULL; 549 } 550} 551 552 553void 554lsquic_stream_destroy (lsquic_stream_t *stream) 555{ 556 struct push_promise *promise; 557 struct stream_hq_frame *shf; 558 559 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 560 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 561 STREAM_ONNEW_DONE) 562 { 563 stream->stream_flags |= STREAM_ONCLOSE_DONE; 564 stream->stream_if->on_close(stream, stream->st_ctx); 565 } 566 if (stream->sm_qflags & SMQF_SENDING_FLAGS) 567 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 568 if (stream->sm_qflags & SMQF_WANT_READ) 569 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 570 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 571 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 572 if (stream->sm_qflags & SMQF_SERVICE_FLAGS) 573 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 574 if (stream->sm_qflags & SMQF_QPACK_DEC) 575 lsquic_qdh_unref_stream(stream->conn_pub->u.ietf.qdh, stream); 576 drop_buffered_data(stream); 577 lsquic_sfcw_consume_rem(&stream->fc); 578 drop_frames_in(stream); 579 if (stream->push_req) 580 { 581 if (stream->push_req->uh_hset) 582 stream->conn_pub->enpub->enp_hsi_if 583 ->hsi_discard_header_set(stream->push_req->uh_hset); 584 free(stream->push_req); 585 } 586 while ((promise = SLIST_FIRST(&stream->sm_promises))) 587 { 588 SLIST_REMOVE_HEAD(&stream->sm_promises, pp_next); 589 lsquic_pp_put(promise, stream->conn_pub->u.ietf.promises); 590 } 591 if (stream->sm_promise) 592 { 593 assert(stream->sm_promise->pp_pushed_stream == stream); 594 stream->sm_promise->pp_pushed_stream = NULL; 595 lsquic_pp_put(stream->sm_promise, stream->conn_pub->u.ietf.promises); 596 } 597 while ((shf = 
STAILQ_FIRST(&stream->sm_hq_frames))) 598 stream_hq_frame_put(stream, shf); 599 destroy_uh(stream); 600 free(stream->sm_buf); 601 free(stream->sm_header_block); 602 LSQ_DEBUG("destroyed stream"); 603 SM_HISTORY_DUMP_REMAINING(stream); 604 free(stream); 605} 606 607 608static int 609stream_is_finished (const lsquic_stream_t *stream) 610{ 611 return lsquic_stream_is_closed(stream) 612 /* n_unacked checks that no outgoing packets that reference this 613 * stream are outstanding: 614 */ 615 && 0 == stream->n_unacked 616 /* This checks that no packets that reference this stream will 617 * become outstanding: 618 */ 619 && 0 == (stream->sm_qflags & SMQF_SEND_RST) 620 && ((stream->stream_flags & STREAM_FORCE_FINISH) 621 || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))); 622} 623 624 625/* This is an internal function */ 626void 627lsquic_stream_force_finish (struct lsquic_stream *stream) 628{ 629 LSQ_DEBUG("stream is now finished"); 630 SM_HISTORY_APPEND(stream, SHE_FINISHED); 631 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 632 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 633 next_service_stream); 634 stream->sm_qflags |= SMQF_FREE_STREAM; 635 stream->stream_flags |= STREAM_FINISHED; 636} 637 638 639static void 640maybe_finish_stream (lsquic_stream_t *stream) 641{ 642 if (0 == (stream->stream_flags & STREAM_FINISHED) && 643 stream_is_finished(stream)) 644 lsquic_stream_force_finish(stream); 645} 646 647 648static void 649maybe_schedule_call_on_close (lsquic_stream_t *stream) 650{ 651 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 652 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) 653 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE) 654 && !(stream->sm_qflags & SMQF_CALL_ONCLOSE)) 655 { 656 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 657 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 658 next_service_stream); 659 stream->sm_qflags |= SMQF_CALL_ONCLOSE; 660 LSQ_DEBUG("scheduled calling 
on_close"); 661 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 662 } 663} 664 665 666void 667lsquic_stream_call_on_close (lsquic_stream_t *stream) 668{ 669 assert(stream->stream_flags & STREAM_ONNEW_DONE); 670 stream->sm_qflags &= ~SMQF_CALL_ONCLOSE; 671 if (!(stream->sm_qflags & SMQF_SERVICE_FLAGS)) 672 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 673 next_service_stream); 674 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 675 { 676 LSQ_DEBUG("calling on_close"); 677 stream->stream_flags |= STREAM_ONCLOSE_DONE; 678 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 679 stream->stream_if->on_close(stream, stream->st_ctx); 680 } 681 else 682 assert(0); 683} 684 685 686static int 687stream_has_frame_at_read_offset (struct lsquic_stream *stream) 688{ 689 if (!((stream->stream_flags & STREAM_CACHED_FRAME) 690 && stream->read_offset == stream->sm_last_frame_off)) 691 { 692 stream->sm_has_frame = stream->data_in->di_if->di_get_frame( 693 stream->data_in, stream->read_offset) != NULL; 694 stream->sm_last_frame_off = stream->read_offset; 695 stream->stream_flags |= STREAM_CACHED_FRAME; 696 } 697 return stream->sm_has_frame; 698} 699 700 701static int 702stream_readable_non_http (struct lsquic_stream *stream) 703{ 704 return stream_has_frame_at_read_offset(stream); 705} 706 707 708static int 709stream_readable_http_gquic (struct lsquic_stream *stream) 710{ 711 return (stream->stream_flags & STREAM_HAVE_UH) 712 && (stream->uh 713 || stream_has_frame_at_read_offset(stream)); 714} 715 716 717static int 718stream_readable_http_ietf (struct lsquic_stream *stream) 719{ 720 return 721 /* If we have read the header set and the header set has not yet 722 * been read, the stream is readable. 723 */ 724 ((stream->stream_flags & STREAM_HAVE_UH) && stream->uh) 725 || 726 /* Alternatively, run the filter and check for payload availability. 
*/ 727 (stream->sm_sfi->sfi_readable(stream) 728 && (/* Running the filter may result in hitting FIN: */ 729 (stream->stream_flags & STREAM_FIN_REACHED) 730 || stream_has_frame_at_read_offset(stream))); 731} 732 733 734int 735lsquic_stream_readable (struct lsquic_stream *stream) 736{ 737 /* A stream is readable if one of the following is true: */ 738 return 739 /* - It is already finished: in that case, lsquic_stream_read() will 740 * return 0. 741 */ 742 (stream->stream_flags & STREAM_FIN_REACHED) 743 /* - The stream is reset, by either side. In this case, 744 * lsquic_stream_read() will return -1 (we want the user to be 745 * able to collect the error). 746 */ 747 || lsquic_stream_is_reset(stream) 748 /* Type-dependent readability check: */ 749 || stream->sm_readable(stream); 750 ; 751} 752 753 754static size_t 755stream_write_avail_no_frames (struct lsquic_stream *stream) 756{ 757 uint64_t stream_avail, conn_avail; 758 759 stream_avail = stream->max_send_off - stream->tosend_off 760 - stream->sm_n_buffered; 761 762 if (stream->sm_bflags & SMBF_CONN_LIMITED) 763 { 764 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 765 if (conn_avail < stream_avail) 766 stream_avail = conn_avail; 767 } 768 769 return stream_avail; 770} 771 772 773static size_t 774stream_write_avail_with_frames (struct lsquic_stream *stream) 775{ 776 uint64_t stream_avail, conn_avail; 777 const struct stream_hq_frame *shf; 778 size_t size; 779 780 stream_avail = stream->max_send_off - stream->tosend_off 781 - stream->sm_n_buffered; 782 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 783 if (!(shf->shf_flags & SHF_WRITTEN)) 784 { 785 size = stream_hq_frame_size(shf); 786 assert(size <= stream_avail); 787 stream_avail -= size; 788 } 789 790 if (stream->sm_bflags & SMBF_CONN_LIMITED) 791 { 792 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 793 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 794 if (!(shf->shf_flags & SHF_CC_PAID)) 795 { 796 size = 
stream_hq_frame_size(shf); 797 if (size < conn_avail) 798 conn_avail -= size; 799 else 800 return 0; 801 } 802 if (conn_avail < stream_avail) 803 stream_avail = conn_avail; 804 } 805 806 if (stream_avail >= 3 /* Smallest new frame */) 807 return stream_avail; 808 else 809 return 0; 810} 811 812 813static int 814stream_is_pushing_promise (const struct lsquic_stream *stream) 815{ 816 return (stream->stream_flags & STREAM_PUSHING) 817 && SLIST_FIRST(&stream->sm_promises) 818 && (SLIST_FIRST(&stream->sm_promises))->pp_write_state != PPWS_DONE 819 ; 820} 821 822 823/* To prevent deadlocks, ensure that when headers are sent, the bytes 824 * sent on the encoder stream are written first. 825 * 826 * XXX If the encoder is set up in non-risking mode, it is perfectly 827 * fine to send the header block first. TODO: update the logic to 828 * reflect this. There should be two sending behaviors: risk and non-risk. 829 * For now, we assume risk for everything to be on the conservative side. 830 */ 831static size_t 832stream_write_avail_with_headers (struct lsquic_stream *stream) 833{ 834 if (stream->stream_flags & STREAM_PUSHING) 835 return stream_write_avail_with_frames(stream); 836 837 switch (stream->sm_send_headers_state) 838 { 839 case SSHS_BEGIN: 840 return lsquic_qeh_write_avail(stream->conn_pub->u.ietf.qeh); 841 case SSHS_ENC_SENDING: 842 if (stream->sm_hb_compl > 843 lsquic_qeh_enc_off(stream->conn_pub->u.ietf.qeh)) 844 return 0; 845 LSQ_DEBUG("encoder stream bytes have all been sent"); 846 stream->sm_send_headers_state = SSHS_HBLOCK_SENDING; 847 /* fall-through */ 848 default: 849 assert(SSHS_HBLOCK_SENDING == stream->sm_send_headers_state); 850 return stream_write_avail_with_frames(stream); 851 } 852} 853 854 855size_t 856lsquic_stream_write_avail (struct lsquic_stream *stream) 857{ 858 return stream->sm_write_avail(stream); 859} 860 861 862int 863lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 864{ 865 struct lsquic_conn *lconn; 866 867 if 
(max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 868 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 869 { 870 if (stream->sm_bflags & SMBF_IETF) 871 { 872 lconn = stream->conn_pub->lconn; 873 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FLOW_CONTROL_ERROR, 874 "flow control violation on stream %"PRIu64, stream->id); 875 } 876 return -1; 877 } 878 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 879 { 880 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 881 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 882 next_send_stream); 883 stream->sm_qflags |= SMQF_SEND_WUF; 884 } 885 return 0; 886} 887 888 889int 890lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 891{ 892 uint64_t max_off; 893 int got_next_offset, rv, free_frame; 894 enum ins_frame ins_frame; 895 struct lsquic_conn *lconn; 896 897 assert(frame->packet_in); 898 899 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 900 LSQ_DEBUG("received stream frame, offset 0x%"PRIX64", len %u; " 901 "fin: %d", frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 902 903 if ((stream->sm_bflags & SMBF_USE_HEADERS) 904 && (stream->stream_flags & STREAM_HEAD_IN_FIN)) 905 { 906 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 907 lsquic_malo_put(frame); 908 return -1; 909 } 910 911 if (frame->data_frame.df_fin && (stream->sm_bflags & SMBF_IETF) 912 && (stream->stream_flags & STREAM_FIN_RECVD) 913 && stream->sm_fin_off != DF_END(frame)) 914 { 915 lconn = stream->conn_pub->lconn; 916 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR, 917 "new final size %"PRIu64" from STREAM frame (id: %"PRIu64") does " 918 "not match previous final size %"PRIu64, DF_END(frame), 919 stream->id, stream->sm_fin_off); 920 return -1; 921 } 922 923 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 924 insert_frame: 925 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 926 if (INS_FRAME_OK == 
ins_frame) 927 { 928 /* Update maximum offset in the flow controller and check for flow 929 * control violation: 930 */ 931 rv = -1; 932 free_frame = !stream->data_in->di_if->di_own_on_ok; 933 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 934 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 935 goto end_ok; 936 if (frame->data_frame.df_fin) 937 { 938 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 939 stream->stream_flags |= STREAM_FIN_RECVD; 940 stream->sm_fin_off = DF_END(frame); 941 maybe_finish_stream(stream); 942 } 943 if ((stream->sm_bflags & SMBF_AUTOSWITCH) && 944 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 945 { 946 stream->data_in = stream->data_in->di_if->di_switch_impl( 947 stream->data_in, stream->read_offset); 948 if (!stream->data_in) 949 { 950 stream->data_in = data_in_error_new(); 951 goto end_ok; 952 } 953 } 954 if (got_next_offset) 955 /* Checking the offset saves di_get_frame() call */ 956 maybe_conn_to_tickable_if_readable(stream); 957 rv = 0; 958 end_ok: 959 if (free_frame) 960 lsquic_malo_put(frame); 961 stream->stream_flags &= ~STREAM_CACHED_FRAME; 962 return rv; 963 } 964 else if (INS_FRAME_DUP == ins_frame) 965 { 966 return 0; 967 } 968 else if (INS_FRAME_OVERLAP == ins_frame) 969 { 970 LSQ_DEBUG("overlap: switching DATA IN implementation"); 971 stream->data_in = stream->data_in->di_if->di_switch_impl( 972 stream->data_in, stream->read_offset); 973 if (stream->data_in) 974 goto insert_frame; 975 stream->data_in = data_in_error_new(); 976 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 977 lsquic_malo_put(frame); 978 return -1; 979 } 980 else 981 { 982 assert(INS_FRAME_ERR == ins_frame); 983 return -1; 984 } 985} 986 987 988static void 989drop_frames_in (lsquic_stream_t *stream) 990{ 991 if (stream->data_in) 992 { 993 stream->data_in->di_if->di_destroy(stream->data_in); 994 /* To avoid checking whether `data_in` is set, just set to the error 995 * data-in stream. 
It does the right thing after incoming data is 996 * dropped. 997 */ 998 stream->data_in = data_in_error_new(); 999 stream->stream_flags &= ~STREAM_CACHED_FRAME; 1000 } 1001} 1002 1003 1004static void 1005maybe_elide_stream_frames (struct lsquic_stream *stream) 1006{ 1007 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 1008 { 1009 if (stream->n_unacked) 1010 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 1011 stream->id); 1012 stream->stream_flags |= STREAM_FRAMES_ELIDED; 1013 } 1014} 1015 1016 1017int 1018lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 1019 uint64_t error_code) 1020{ 1021 struct lsquic_conn *lconn; 1022 1023 if ((stream->sm_bflags & SMBF_IETF) 1024 && (stream->stream_flags & STREAM_FIN_RECVD) 1025 && stream->sm_fin_off != offset) 1026 { 1027 lconn = stream->conn_pub->lconn; 1028 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR, 1029 "final size %"PRIu64" from RESET_STREAM frame (id: %"PRIu64") " 1030 "does not match previous final size %"PRIu64, offset, 1031 stream->id, stream->sm_fin_off); 1032 return -1; 1033 } 1034 1035 if (stream->stream_flags & STREAM_RST_RECVD) 1036 { 1037 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 1038 return 0; 1039 } 1040 1041 SM_HISTORY_APPEND(stream, SHE_RST_IN); 1042 /* This flag must always be set, even if we are "ignoring" it: it is 1043 * used by elision code. 
1044 */ 1045 stream->stream_flags |= STREAM_RST_RECVD; 1046 1047 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 1048 { 1049 LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64" is " 1050 "smaller than that of byte following the last byte we have seen: " 1051 "0x%"PRIX64, offset, 1052 lsquic_sfcw_get_max_recv_off(&stream->fc)); 1053 return -1; 1054 } 1055 1056 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 1057 { 1058 LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64 1059 " violates flow control", offset); 1060 return -1; 1061 } 1062 1063 /* Let user collect error: */ 1064 maybe_conn_to_tickable_if_readable(stream); 1065 1066 lsquic_sfcw_consume_rem(&stream->fc); 1067 drop_frames_in(stream); 1068 drop_buffered_data(stream); 1069 maybe_elide_stream_frames(stream); 1070 1071 if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT)) 1072 && !(stream->sm_qflags & SMQF_SEND_RST)) 1073 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 1074 1075 stream->stream_flags |= STREAM_RST_RECVD; 1076 1077 maybe_finish_stream(stream); 1078 maybe_schedule_call_on_close(stream); 1079 1080 return 0; 1081} 1082 1083 1084void 1085lsquic_stream_stop_sending_in (struct lsquic_stream *stream, 1086 uint64_t error_code) 1087{ 1088 if (stream->stream_flags & STREAM_SS_RECVD) 1089 { 1090 LSQ_DEBUG("ignore duplicate STOP_SENDING frame"); 1091 return; 1092 } 1093 1094 SM_HISTORY_APPEND(stream, SHE_SS_IN); 1095 stream->stream_flags |= STREAM_SS_RECVD; 1096 1097 /* Let user collect error: */ 1098 maybe_conn_to_tickable_if_readable(stream); 1099 1100 lsquic_sfcw_consume_rem(&stream->fc); 1101 drop_frames_in(stream); 1102 drop_buffered_data(stream); 1103 maybe_elide_stream_frames(stream); 1104 1105 if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT)) 1106 && !(stream->sm_qflags & SMQF_SEND_RST)) 1107 lsquic_stream_reset_ext(stream, error_code, 0); 1108 1109 maybe_finish_stream(stream); 1110 maybe_schedule_call_on_close(stream); 1111} 1112 1113 
1114uint64_t 1115lsquic_stream_fc_recv_off_const (const struct lsquic_stream *stream) 1116{ 1117 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 1118} 1119 1120 1121void 1122lsquic_stream_max_stream_data_sent (struct lsquic_stream *stream) 1123{ 1124 assert(stream->sm_qflags & SMQF_SEND_MAX_STREAM_DATA); 1125 stream->sm_qflags &= ~SMQF_SEND_MAX_STREAM_DATA; 1126 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1127 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1128 stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc); 1129} 1130 1131 1132uint64_t 1133lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 1134{ 1135 assert(stream->sm_qflags & SMQF_SEND_WUF); 1136 stream->sm_qflags &= ~SMQF_SEND_WUF; 1137 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1138 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1139 return stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc); 1140} 1141 1142 1143void 1144lsquic_stream_peer_blocked (struct lsquic_stream *stream, uint64_t peer_off) 1145{ 1146 uint64_t last_off; 1147 1148 if (stream->sm_last_recv_off) 1149 last_off = stream->sm_last_recv_off; 1150 else 1151 /* This gets advertized in transport parameters */ 1152 last_off = lsquic_sfcw_get_max_recv_off(&stream->fc); 1153 1154 LSQ_DEBUG("Peer blocked at %"PRIu64", while the last MAX_STREAM_DATA " 1155 "frame we sent advertized the limit of %"PRIu64, peer_off, last_off); 1156 1157 if (peer_off > last_off && !(stream->sm_qflags & SMQF_SEND_WUF)) 1158 { 1159 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1160 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1161 next_send_stream); 1162 stream->sm_qflags |= SMQF_SEND_WUF; 1163 LSQ_DEBUG("marked to send MAX_STREAM_DATA frame"); 1164 } 1165 else if (stream->sm_qflags & SMQF_SEND_WUF) 1166 LSQ_DEBUG("MAX_STREAM_DATA frame is already scheduled"); 1167 else if (stream->sm_last_recv_off) 1168 LSQ_DEBUG("MAX_STREAM_DATA(%"PRIu64") has already 
been either " 1169 "packetized or sent", stream->sm_last_recv_off); 1170 else 1171 LSQ_INFO("Peer should have receive transport param limit " 1172 "of %"PRIu64"; odd.", last_off); 1173} 1174 1175 1176/* GQUIC's BLOCKED frame does not have an offset */ 1177void 1178lsquic_stream_peer_blocked_gquic (struct lsquic_stream *stream) 1179{ 1180 LSQ_DEBUG("Peer blocked: schedule another WINDOW_UPDATE frame"); 1181 if (!(stream->sm_qflags & SMQF_SEND_WUF)) 1182 { 1183 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1184 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1185 next_send_stream); 1186 stream->sm_qflags |= SMQF_SEND_WUF; 1187 LSQ_DEBUG("marked to send MAX_STREAM_DATA frame"); 1188 } 1189 else 1190 LSQ_DEBUG("WINDOW_UPDATE frame is already scheduled"); 1191} 1192 1193 1194void 1195lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 1196{ 1197 assert(stream->sm_qflags & SMQF_SEND_BLOCKED); 1198 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 1199 stream->sm_qflags &= ~SMQF_SEND_BLOCKED; 1200 stream->stream_flags |= STREAM_BLOCKED_SENT; 1201 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1202 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1203} 1204 1205 1206void 1207lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 1208{ 1209 assert(stream->sm_qflags & SMQF_SEND_RST); 1210 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 1211 stream->sm_qflags &= ~SMQF_SEND_RST; 1212 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1213 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1214 stream->stream_flags |= STREAM_RST_SENT; 1215 maybe_finish_stream(stream); 1216} 1217 1218 1219static size_t 1220read_uh (struct lsquic_stream *stream, 1221 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1222{ 1223 struct http1x_headers *const h1h = stream->uh->uh_hset; 1224 size_t nread; 1225 1226 nread = readf(ctx, (unsigned char *) h1h->h1h_buf + h1h->h1h_off, 1227 h1h->h1h_size - h1h->h1h_off, 1228 
(stream->stream_flags & STREAM_HEAD_IN_FIN) > 0); 1229 h1h->h1h_off += nread; 1230 if (h1h->h1h_off == h1h->h1h_size) 1231 { 1232 LSQ_DEBUG("read all uncompressed headers"); 1233 destroy_uh(stream); 1234 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 1235 { 1236 stream->stream_flags |= STREAM_FIN_REACHED; 1237 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 1238 } 1239 } 1240 return nread; 1241} 1242 1243 1244static void 1245verify_cl_on_fin (struct lsquic_stream *stream) 1246{ 1247 struct lsquic_conn *lconn; 1248 1249 /* The rules in RFC7230, Section 3.3.2 are a bit too intricate. We take 1250 * a simple approach and verify content-length only when there was any 1251 * payload at all. 1252 */ 1253 if (stream->sm_data_in != 0 && stream->sm_cont_len != stream->sm_data_in) 1254 { 1255 lconn = stream->conn_pub->lconn; 1256 lconn->cn_if->ci_abort_error(lconn, 1, HEC_GENERAL_PROTOCOL_ERROR, 1257 "number of bytes in DATA frames of stream %"PRIu64" is %llu, " 1258 "while content-length specified of %llu", stream->id, 1259 stream->sm_data_in, stream->sm_cont_len); 1260 } 1261} 1262 1263 1264static void 1265stream_consumed_bytes (struct lsquic_stream *stream) 1266{ 1267 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 1268 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 1269 { 1270 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1271 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1272 next_send_stream); 1273 stream->sm_qflags |= SMQF_SEND_WUF; 1274 maybe_conn_to_tickable_if_writeable(stream, 1); 1275 } 1276} 1277 1278 1279struct read_frames_status 1280{ 1281 int error; 1282 int processed_frames; 1283 size_t total_nread; 1284}; 1285 1286 1287static struct read_frames_status 1288read_data_frames (struct lsquic_stream *stream, int do_filtering, 1289 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1290{ 1291 struct data_frame *data_frame; 1292 size_t nread, toread, total_nread; 1293 int short_read, processed_frames; 1294 1295 
processed_frames = 0; 1296 total_nread = 0; 1297 1298 while ((data_frame = stream->data_in->di_if->di_get_frame( 1299 stream->data_in, stream->read_offset))) 1300 { 1301 1302 ++processed_frames; 1303 1304 do 1305 { 1306 if (do_filtering && stream->sm_sfi) 1307 toread = stream->sm_sfi->sfi_filter_df(stream, data_frame); 1308 else 1309 toread = data_frame->df_size - data_frame->df_read_off; 1310 1311 if (toread || data_frame->df_fin) 1312 { 1313 nread = readf(ctx, data_frame->df_data + data_frame->df_read_off, 1314 toread, data_frame->df_fin); 1315 if (do_filtering && stream->sm_sfi) 1316 stream->sm_sfi->sfi_decr_left(stream, nread); 1317 data_frame->df_read_off += nread; 1318 stream->read_offset += nread; 1319 total_nread += nread; 1320 short_read = nread < toread; 1321 } 1322 else 1323 short_read = 0; 1324 1325 if (data_frame->df_read_off == data_frame->df_size) 1326 { 1327 const int fin = data_frame->df_fin; 1328 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 1329 data_frame = NULL; 1330 if ((stream->sm_bflags & SMBF_AUTOSWITCH) && 1331 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 1332 { 1333 stream->data_in = stream->data_in->di_if->di_switch_impl( 1334 stream->data_in, stream->read_offset); 1335 if (!stream->data_in) 1336 { 1337 stream->data_in = data_in_error_new(); 1338 return (struct read_frames_status) { .error = 1, }; 1339 } 1340 } 1341 if (fin) 1342 { 1343 stream->stream_flags |= STREAM_FIN_REACHED; 1344 if (stream->sm_bflags & SMBF_VERIFY_CL) 1345 verify_cl_on_fin(stream); 1346 goto end_while; 1347 } 1348 } 1349 else if (short_read) 1350 goto end_while; 1351 } 1352 while (data_frame); 1353 } 1354 end_while: 1355 1356 if (processed_frames) 1357 stream_consumed_bytes(stream); 1358 1359 return (struct read_frames_status) { 1360 .error = 0, 1361 .processed_frames = processed_frames, 1362 .total_nread = total_nread, 1363 }; 1364} 1365 1366 1367static ssize_t 1368stream_readf (struct lsquic_stream *stream, 1369 size_t (*readf)(void *, 
const unsigned char *, size_t, int), void *ctx) 1370{ 1371 size_t total_nread, nread; 1372 int read_unc_headers; 1373 1374 total_nread = 0; 1375 1376 if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF)) 1377 == (SMBF_USE_HEADERS|SMBF_IETF) 1378 && !(stream->stream_flags & STREAM_HAVE_UH) 1379 && !stream->uh) 1380 { 1381 if (stream->sm_readable(stream)) 1382 { 1383 if (stream->sm_hq_filter.hqfi_flags & HQFI_FLAG_ERROR) 1384 { 1385 LSQ_INFO("HQ filter hit an error: cannot read from stream"); 1386 errno = EBADMSG; 1387 return -1; 1388 } 1389 assert(stream->uh); 1390 } 1391 else 1392 { 1393 errno = EWOULDBLOCK; 1394 return -1; 1395 } 1396 } 1397 1398 if (stream->uh) 1399 { 1400 if (stream->uh->uh_flags & UH_H1H) 1401 { 1402 nread = read_uh(stream, readf, ctx); 1403 read_unc_headers = nread > 0; 1404 total_nread += nread; 1405 if (stream->uh) 1406 return total_nread; 1407 } 1408 else 1409 { 1410 LSQ_INFO("header set not claimed: cannot read from stream"); 1411 return -1; 1412 } 1413 } 1414 else if ((stream->sm_bflags & SMBF_USE_HEADERS) 1415 && !(stream->stream_flags & STREAM_HAVE_UH)) 1416 { 1417 LSQ_DEBUG("cannot read: headers not available"); 1418 errno = EWOULDBLOCK; 1419 return -1; 1420 } 1421 else 1422 read_unc_headers = 0; 1423 1424 const struct read_frames_status rfs 1425 = read_data_frames(stream, 1, readf, ctx); 1426 if (rfs.error) 1427 return -1; 1428 total_nread += rfs.total_nread; 1429 1430 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 1431 total_nread, stream->read_offset); 1432 1433 if (rfs.processed_frames || read_unc_headers) 1434 { 1435 return total_nread; 1436 } 1437 else 1438 { 1439 assert(0 == total_nread); 1440 errno = EWOULDBLOCK; 1441 return -1; 1442 } 1443} 1444 1445 1446/* This function returns 0 when EOF is reached. 
1447 */ 1448ssize_t 1449lsquic_stream_readf (struct lsquic_stream *stream, 1450 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1451{ 1452 SM_HISTORY_APPEND(stream, SHE_USER_READ); 1453 1454 if (lsquic_stream_is_reset(stream)) 1455 { 1456 if (stream->stream_flags & STREAM_RST_RECVD) 1457 stream->stream_flags |= STREAM_RST_READ; 1458 errno = ECONNRESET; 1459 return -1; 1460 } 1461 if (stream->stream_flags & STREAM_U_READ_DONE) 1462 { 1463 errno = EBADF; 1464 return -1; 1465 } 1466 if (stream->stream_flags & STREAM_FIN_REACHED) 1467 { 1468 if (stream->sm_bflags & SMBF_USE_HEADERS) 1469 { 1470 if ((stream->stream_flags & STREAM_HAVE_UH) && !stream->uh) 1471 return 0; 1472 } 1473 else 1474 return 0; 1475 } 1476 1477 return stream_readf(stream, readf, ctx); 1478} 1479 1480 1481struct readv_ctx 1482{ 1483 const struct iovec *iov; 1484 const struct iovec *const end; 1485 unsigned char *p; 1486}; 1487 1488 1489static size_t 1490readv_f (void *ctx_p, const unsigned char *buf, size_t len, int fin) 1491{ 1492 struct readv_ctx *const ctx = ctx_p; 1493 const unsigned char *const end = buf + len; 1494 size_t ntocopy; 1495 1496 while (ctx->iov < ctx->end && buf < end) 1497 { 1498 ntocopy = (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len 1499 - ctx->p; 1500 if (ntocopy > (size_t) (end - buf)) 1501 ntocopy = end - buf; 1502 memcpy(ctx->p, buf, ntocopy); 1503 ctx->p += ntocopy; 1504 buf += ntocopy; 1505 if (ctx->p == (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len) 1506 { 1507 do 1508 ++ctx->iov; 1509 while (ctx->iov < ctx->end && ctx->iov->iov_len == 0); 1510 if (ctx->iov < ctx->end) 1511 ctx->p = ctx->iov->iov_base; 1512 else 1513 break; 1514 } 1515 } 1516 1517 return len - (end - buf); 1518} 1519 1520 1521ssize_t 1522lsquic_stream_readv (struct lsquic_stream *stream, const struct iovec *iov, 1523 int iovcnt) 1524{ 1525 struct readv_ctx ctx = { iov, iov + iovcnt, iov->iov_base, }; 1526 return lsquic_stream_readf(stream, readv_f, &ctx); 
1527} 1528 1529 1530ssize_t 1531lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 1532{ 1533 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 1534 return lsquic_stream_readv(stream, &iov, 1); 1535} 1536 1537 1538static void 1539stream_shutdown_read (lsquic_stream_t *stream) 1540{ 1541 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1542 { 1543 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 1544 stream->stream_flags |= STREAM_U_READ_DONE; 1545 stream_wantread(stream, 0); 1546 maybe_finish_stream(stream); 1547 } 1548} 1549 1550 1551static int 1552stream_is_incoming_unidir (const struct lsquic_stream *stream) 1553{ 1554 enum stream_id_type sit; 1555 1556 if (stream->sm_bflags & SMBF_IETF) 1557 { 1558 sit = stream->id & SIT_MASK; 1559 if (stream->sm_bflags & SMBF_SERVER) 1560 return sit == SIT_UNI_CLIENT; 1561 else 1562 return sit == SIT_UNI_SERVER; 1563 } 1564 else 1565 return 0; 1566} 1567 1568 1569static void 1570stream_shutdown_write (lsquic_stream_t *stream) 1571{ 1572 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1573 return; 1574 1575 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 1576 stream->stream_flags |= STREAM_U_WRITE_DONE; 1577 stream_wantwrite(stream, 0); 1578 1579 /* Don't bother to check whether there is anything else to write if 1580 * the flags indicate that nothing else should be written. 
1581 */ 1582 if (!(stream->sm_bflags & SMBF_CRYPTO) 1583 && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT)) 1584 && !stream_is_incoming_unidir(stream) 1585 && !(stream->sm_qflags & SMQF_SEND_RST)) 1586 { 1587 if ((stream->sm_bflags & SMBF_USE_HEADERS) 1588 && !(stream->stream_flags & STREAM_HEADERS_SENT)) 1589 { 1590 LSQ_DEBUG("headers not sent, send a reset"); 1591 lsquic_stream_reset(stream, 0); 1592 } 1593 else if (stream->sm_n_buffered == 0) 1594 { 1595 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 1596 stream)) 1597 { 1598 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 1599 stream->stream_flags |= STREAM_FIN_SENT; 1600 } 1601 else 1602 { 1603 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 1604 "flag in it"); 1605 (void) stream_flush_nocheck(stream); 1606 } 1607 } 1608 else 1609 (void) stream_flush_nocheck(stream); 1610 } 1611} 1612 1613 1614static void 1615maybe_stream_shutdown_write (struct lsquic_stream *stream) 1616{ 1617 if (stream->sm_send_headers_state == SSHS_BEGIN) 1618 stream_shutdown_write(stream); 1619 else if (0 == (stream->stream_flags & STREAM_DELAYED_SW)) 1620 { 1621 LSQ_DEBUG("shutdown delayed"); 1622 SM_HISTORY_APPEND(stream, SHE_DELAY_SW); 1623 stream->stream_flags |= STREAM_DELAYED_SW; 1624 } 1625} 1626 1627 1628int 1629lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 1630{ 1631 LSQ_DEBUG("shutdown; how: %d", how); 1632 if (lsquic_stream_is_closed(stream)) 1633 { 1634 LSQ_INFO("Attempt to shut down a closed stream"); 1635 errno = EBADF; 1636 return -1; 1637 } 1638 /* 0: read, 1: write: 2: read and write 1639 */ 1640 if (how < 0 || how > 2) 1641 { 1642 errno = EINVAL; 1643 return -1; 1644 } 1645 1646 if (how) 1647 maybe_stream_shutdown_write(stream); 1648 if (how != 1) 1649 stream_shutdown_read(stream); 1650 1651 maybe_finish_stream(stream); 1652 maybe_schedule_call_on_close(stream); 1653 if (how && !(stream->stream_flags & STREAM_DELAYED_SW)) 1654 
maybe_conn_to_tickable_if_writeable(stream, 1); 1655 1656 return 0; 1657} 1658 1659 1660void 1661lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 1662{ 1663 LSQ_DEBUG("internal shutdown"); 1664 if (lsquic_stream_is_critical(stream)) 1665 { 1666 LSQ_DEBUG("add flag to force-finish special stream"); 1667 stream->stream_flags |= STREAM_FORCE_FINISH; 1668 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 1669 } 1670 maybe_finish_stream(stream); 1671 maybe_schedule_call_on_close(stream); 1672} 1673 1674 1675static void 1676fake_reset_unused_stream (lsquic_stream_t *stream) 1677{ 1678 stream->stream_flags |= 1679 STREAM_RST_RECVD /* User will pick this up on read or write */ 1680 | STREAM_RST_SENT /* Don't send anything else on this stream */ 1681 ; 1682 1683 /* Cancel all writes to the network scheduled for this stream: */ 1684 if (stream->sm_qflags & SMQF_SENDING_FLAGS) 1685 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 1686 next_send_stream); 1687 stream->sm_qflags &= ~SMQF_SENDING_FLAGS; 1688 drop_buffered_data(stream); 1689 LSQ_DEBUG("fake-reset stream%s", 1690 stream_stalled(stream) ? " (stalled)" : ""); 1691 maybe_finish_stream(stream); 1692 maybe_schedule_call_on_close(stream); 1693} 1694 1695 1696/* This function should only be called for locally-initiated streams whose ID 1697 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 1698 * frame sent by peer but we have not yet received it and created a stream. 1699 * In this situation, we mark the stream as reset, so that user's on_read or 1700 * on_write event callback picks up the error. That, in turn, should result 1701 * in stream being closed. 1702 * 1703 * If we have received any data frames on this stream, this probably indicates 1704 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 1705 * lower than this. However, we still try to handle it gracefully and peform 1706 * a shutdown, as if the stream was not reset. 
1707 */ 1708void 1709lsquic_stream_received_goaway (lsquic_stream_t *stream) 1710{ 1711 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 1712 if (0 == stream->read_offset && 1713 stream->data_in->di_if->di_empty(stream->data_in)) 1714 fake_reset_unused_stream(stream); /* Normal condition */ 1715 else 1716 { /* This is odd, let's handle it the best we can: */ 1717 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 1718 lsquic_stream_shutdown_internal(stream); 1719 } 1720} 1721 1722 1723uint64_t 1724lsquic_stream_read_offset (const lsquic_stream_t *stream) 1725{ 1726 return stream->read_offset; 1727} 1728 1729 1730static int 1731stream_wantread (lsquic_stream_t *stream, int is_want) 1732{ 1733 const int old_val = !!(stream->sm_qflags & SMQF_WANT_READ); 1734 const int new_val = !!is_want; 1735 if (old_val != new_val) 1736 { 1737 if (new_val) 1738 { 1739 if (!old_val) 1740 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 1741 next_read_stream); 1742 stream->sm_qflags |= SMQF_WANT_READ; 1743 } 1744 else 1745 { 1746 stream->sm_qflags &= ~SMQF_WANT_READ; 1747 if (old_val) 1748 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 1749 next_read_stream); 1750 } 1751 } 1752 return old_val; 1753} 1754 1755 1756static void 1757maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_q_flags flag) 1758{ 1759 assert(SMQF_WRITE_Q_FLAGS & flag); 1760 if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS)) 1761 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1762 next_write_stream); 1763 stream->sm_qflags |= flag; 1764} 1765 1766 1767static void 1768maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag) 1769{ 1770 assert(SMQF_WRITE_Q_FLAGS & flag); 1771 if (stream->sm_qflags & flag) 1772 { 1773 stream->sm_qflags &= ~flag; 1774 if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS)) 1775 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1776 next_write_stream); 1777 } 1778} 1779 1780 1781static int 1782stream_wantwrite (struct 
lsquic_stream *stream, int new_val) 1783{ 1784 const int old_val = !!(stream->sm_qflags & SMQF_WANT_WRITE); 1785 1786 assert(0 == (new_val & ~1)); /* new_val is either 0 or 1 */ 1787 1788 if (old_val != new_val) 1789 { 1790 if (new_val) 1791 maybe_put_onto_write_q(stream, SMQF_WANT_WRITE); 1792 else 1793 maybe_remove_from_write_q(stream, SMQF_WANT_WRITE); 1794 } 1795 return old_val; 1796} 1797 1798 1799int 1800lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1801{ 1802 SM_HISTORY_APPEND(stream, SHE_WANTREAD_NO + !!is_want); 1803 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1804 { 1805 if (is_want) 1806 maybe_conn_to_tickable_if_readable(stream); 1807 return stream_wantread(stream, is_want); 1808 } 1809 else 1810 { 1811 errno = EBADF; 1812 return -1; 1813 } 1814} 1815 1816 1817int 1818lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1819{ 1820 int old_val; 1821 1822 is_want = !!is_want; 1823 1824 SM_HISTORY_APPEND(stream, SHE_WANTWRITE_NO + is_want); 1825 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE) 1826 && SSHS_BEGIN == stream->sm_send_headers_state) 1827 { 1828 stream->sm_saved_want_write = is_want; 1829 if (is_want) 1830 maybe_conn_to_tickable_if_writeable(stream, 1); 1831 return stream_wantwrite(stream, is_want); 1832 } 1833 else if (SSHS_BEGIN != stream->sm_send_headers_state) 1834 { 1835 old_val = stream->sm_saved_want_write; 1836 stream->sm_saved_want_write = is_want; 1837 return old_val; 1838 } 1839 else 1840 { 1841 errno = EBADF; 1842 return -1; 1843 } 1844} 1845 1846 1847struct progress 1848{ 1849 enum stream_flags s_flags; 1850 enum stream_q_flags q_flags; 1851}; 1852 1853 1854static struct progress 1855stream_progress (const struct lsquic_stream *stream) 1856{ 1857 return (struct progress) { 1858 .s_flags = stream->stream_flags 1859 & (STREAM_U_WRITE_DONE|STREAM_U_READ_DONE), 1860 .q_flags = stream->sm_qflags 1861 & (SMQF_WANT_READ|SMQF_WANT_WRITE|SMQF_WANT_FLUSH|SMQF_SEND_RST), 1862 }; 1863} 1864 1865 1866static 
int 1867progress_eq (struct progress a, struct progress b) 1868{ 1869 return a.s_flags == b.s_flags && a.q_flags == b.q_flags; 1870} 1871 1872 1873static void 1874stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1875{ 1876 unsigned no_progress_count, no_progress_limit; 1877 struct progress progress; 1878 uint64_t size; 1879 1880 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1881 1882 no_progress_count = 0; 1883 while ((stream->sm_qflags & SMQF_WANT_READ) 1884 && lsquic_stream_readable(stream)) 1885 { 1886 progress = stream_progress(stream); 1887 size = stream->read_offset; 1888 1889 stream->stream_if->on_read(stream, stream->st_ctx); 1890 1891 if (no_progress_limit && size == stream->read_offset && 1892 progress_eq(progress, stream_progress(stream))) 1893 { 1894 ++no_progress_count; 1895 if (no_progress_count >= no_progress_limit) 1896 { 1897 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1898 "progress) in user code reading from stream", 1899 no_progress_count, 1900 no_progress_count == 1 ? 
"" : "s"); 1901 break; 1902 } 1903 } 1904 else 1905 no_progress_count = 0; 1906 } 1907} 1908 1909 1910static void 1911stream_hblock_sent (struct lsquic_stream *stream) 1912{ 1913 int want_write; 1914 1915 LSQ_DEBUG("header block has been sent: restore default behavior"); 1916 stream->sm_send_headers_state = SSHS_BEGIN; 1917 stream->sm_write_avail = stream_write_avail_with_frames; 1918 1919 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 1920 if (want_write != stream->sm_saved_want_write) 1921 (void) lsquic_stream_wantwrite(stream, stream->sm_saved_want_write); 1922 1923 if (stream->stream_flags & STREAM_DELAYED_SW) 1924 { 1925 LSQ_DEBUG("performing delayed shutdown write"); 1926 stream->stream_flags &= ~STREAM_DELAYED_SW; 1927 stream_shutdown_write(stream); 1928 maybe_schedule_call_on_close(stream); 1929 maybe_finish_stream(stream); 1930 maybe_conn_to_tickable_if_writeable(stream, 1); 1931 } 1932} 1933 1934 1935static void 1936on_write_header_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 1937{ 1938 ssize_t nw; 1939 1940 nw = stream_write_buf(stream, 1941 stream->sm_header_block + stream->sm_hblock_off, 1942 stream->sm_hblock_sz - stream->sm_hblock_off); 1943 if (nw > 0) 1944 { 1945 stream->sm_hblock_off += nw; 1946 if (stream->sm_hblock_off == stream->sm_hblock_sz) 1947 { 1948 stream->stream_flags |= STREAM_HEADERS_SENT; 1949 free(stream->sm_header_block); 1950 stream->sm_header_block = NULL; 1951 stream->sm_hblock_sz = 0; 1952 stream_hblock_sent(stream); 1953 LSQ_DEBUG("header block written out successfully"); 1954 /* TODO: if there was eos, do something else */ 1955 if (stream->sm_qflags & SMQF_WANT_WRITE) 1956 stream->stream_if->on_write(stream, h); 1957 } 1958 else 1959 { 1960 LSQ_DEBUG("wrote %zd bytes more of header block; not done yet", 1961 nw); 1962 } 1963 } 1964 else if (nw < 0) 1965 { 1966 /* XXX What should happen if we hit an error? 
TODO */ 1967 } 1968} 1969 1970 1971static void 1972(*select_on_write (struct lsquic_stream *stream))(struct lsquic_stream *, 1973 lsquic_stream_ctx_t *) 1974{ 1975 if (0 == (stream->stream_flags & STREAM_PUSHING) 1976 && SSHS_HBLOCK_SENDING != stream->sm_send_headers_state) 1977 /* Common case */ 1978 return stream->stream_if->on_write; 1979 else if (SSHS_HBLOCK_SENDING == stream->sm_send_headers_state) 1980 return on_write_header_wrapper; 1981 else 1982 { 1983 assert(stream->stream_flags & STREAM_PUSHING); 1984 if (stream_is_pushing_promise(stream)) 1985 return on_write_pp_wrapper; 1986 else 1987 return stream->stream_if->on_write; 1988 } 1989} 1990 1991 1992static void 1993stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1994{ 1995 unsigned no_progress_count, no_progress_limit; 1996 void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *); 1997 struct progress progress; 1998 1999 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 2000 2001 no_progress_count = 0; 2002 stream->stream_flags |= STREAM_LAST_WRITE_OK; 2003 while ((stream->sm_qflags & SMQF_WANT_WRITE) 2004 && (stream->stream_flags & STREAM_LAST_WRITE_OK) 2005 && lsquic_stream_write_avail(stream)) 2006 { 2007 progress = stream_progress(stream); 2008 2009 on_write = select_on_write(stream); 2010 on_write(stream, stream->st_ctx); 2011 2012 if (no_progress_limit && progress_eq(progress, stream_progress(stream))) 2013 { 2014 ++no_progress_count; 2015 if (no_progress_count >= no_progress_limit) 2016 { 2017 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 2018 "progress) in user code writing to stream", 2019 no_progress_count, 2020 no_progress_count == 1 ? 
"" : "s"); 2021 break; 2022 } 2023 } 2024 else 2025 no_progress_count = 0; 2026 } 2027} 2028 2029 2030static void 2031stream_dispatch_read_events_once (lsquic_stream_t *stream) 2032{ 2033 if ((stream->sm_qflags & SMQF_WANT_READ) && lsquic_stream_readable(stream)) 2034 { 2035 stream->stream_if->on_read(stream, stream->st_ctx); 2036 } 2037} 2038 2039 2040uint64_t 2041lsquic_stream_combined_send_off (const struct lsquic_stream *stream) 2042{ 2043 size_t frames_sizes; 2044 2045 frames_sizes = active_hq_frame_sizes(stream); 2046 return stream->tosend_off + stream->sm_n_buffered + frames_sizes; 2047} 2048 2049 2050static void 2051maybe_mark_as_blocked (lsquic_stream_t *stream) 2052{ 2053 struct lsquic_conn_cap *cc; 2054 uint64_t used; 2055 2056 used = lsquic_stream_combined_send_off(stream); 2057 if (stream->max_send_off == used) 2058 { 2059 if (stream->blocked_off < stream->max_send_off) 2060 { 2061 stream->blocked_off = used; 2062 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 2063 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 2064 next_send_stream); 2065 stream->sm_qflags |= SMQF_SEND_BLOCKED; 2066 LSQ_DEBUG("marked stream-blocked at stream offset " 2067 "%"PRIu64, stream->blocked_off); 2068 } 2069 else 2070 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 2071 " has been, or is about to be, sent", stream->blocked_off); 2072 } 2073 2074 if ((stream->sm_bflags & SMBF_CONN_LIMITED) 2075 && (cc = &stream->conn_pub->conn_cap, 2076 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 2077 { 2078 if (cc->cc_blocked < cc->cc_max) 2079 { 2080 cc->cc_blocked = cc->cc_max; 2081 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 2082 LSQ_DEBUG("marked connection-blocked at connection offset " 2083 "%"PRIu64, cc->cc_max); 2084 } 2085 else 2086 LSQ_DEBUG("stream has already been marked connection-blocked " 2087 "at offset %"PRIu64, cc->cc_blocked); 2088 } 2089} 2090 2091 2092void 2093lsquic_stream_dispatch_read_events (lsquic_stream_t 
*stream) 2094{ 2095 assert(stream->sm_qflags & SMQF_WANT_READ); 2096 2097 if (stream->sm_bflags & SMBF_RW_ONCE) 2098 stream_dispatch_read_events_once(stream); 2099 else 2100 stream_dispatch_read_events_loop(stream); 2101} 2102 2103 2104void 2105lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 2106{ 2107 void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *); 2108 int progress; 2109 uint64_t tosend_off; 2110 unsigned short n_buffered; 2111 enum stream_q_flags q_flags; 2112 2113 assert(stream->sm_qflags & SMQF_WRITE_Q_FLAGS); 2114 q_flags = stream->sm_qflags & SMQF_WRITE_Q_FLAGS; 2115 tosend_off = stream->tosend_off; 2116 n_buffered = stream->sm_n_buffered; 2117 2118 if (stream->sm_qflags & SMQF_WANT_FLUSH) 2119 (void) stream_flush(stream); 2120 2121 if (stream->sm_bflags & SMBF_RW_ONCE) 2122 { 2123 if ((stream->sm_qflags & SMQF_WANT_WRITE) 2124 && lsquic_stream_write_avail(stream)) 2125 { 2126 on_write = select_on_write(stream); 2127 on_write(stream, stream->st_ctx); 2128 } 2129 } 2130 else 2131 stream_dispatch_write_events_loop(stream); 2132 2133 /* Progress means either flags or offsets changed: */ 2134 progress = !((stream->sm_qflags & SMQF_WRITE_Q_FLAGS) == q_flags && 2135 stream->tosend_off == tosend_off && 2136 stream->sm_n_buffered == n_buffered); 2137 2138 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 2139 { 2140 if (progress) 2141 { /* Move the stream to the end of the list to ensure fairness. 
*/ 2142 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 2143 next_write_stream); 2144 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 2145 next_write_stream); 2146 } 2147 } 2148} 2149 2150 2151static size_t 2152inner_reader_empty_size (void *ctx) 2153{ 2154 return 0; 2155} 2156 2157 2158static size_t 2159inner_reader_empty_read (void *ctx, void *buf, size_t count) 2160{ 2161 return 0; 2162} 2163 2164 2165static int 2166stream_flush (lsquic_stream_t *stream) 2167{ 2168 struct lsquic_reader empty_reader; 2169 ssize_t nw; 2170 2171 assert(stream->sm_qflags & SMQF_WANT_FLUSH); 2172 assert(stream->sm_n_buffered > 0 || 2173 /* Flushing is also used to packetize standalone FIN: */ 2174 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 2175 == STREAM_U_WRITE_DONE)); 2176 2177 empty_reader.lsqr_size = inner_reader_empty_size; 2178 empty_reader.lsqr_read = inner_reader_empty_read; 2179 empty_reader.lsqr_ctx = NULL; /* pro forma */ 2180 nw = stream_write_to_packets(stream, &empty_reader, 0); 2181 2182 if (nw >= 0) 2183 { 2184 assert(nw == 0); /* Empty reader: must have read zero bytes */ 2185 return 0; 2186 } 2187 else 2188 return -1; 2189} 2190 2191 2192static int 2193stream_flush_nocheck (lsquic_stream_t *stream) 2194{ 2195 size_t frames; 2196 2197 frames = active_hq_frame_sizes(stream); 2198 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered + frames; 2199 stream->sm_flush_to_payload = stream->sm_payload + stream->sm_n_buffered; 2200 maybe_put_onto_write_q(stream, SMQF_WANT_FLUSH); 2201 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 2202 2203 return stream_flush(stream); 2204} 2205 2206 2207int 2208lsquic_stream_flush (lsquic_stream_t *stream) 2209{ 2210 if (stream->stream_flags & STREAM_U_WRITE_DONE) 2211 { 2212 LSQ_DEBUG("cannot flush closed stream"); 2213 errno = EBADF; 2214 return -1; 2215 } 2216 2217 if (0 == stream->sm_n_buffered) 2218 { 2219 LSQ_DEBUG("flushing 0 bytes: noop"); 2220 return 0; 2221 } 
2222 2223 return stream_flush_nocheck(stream); 2224} 2225 2226 2227static size_t 2228stream_get_n_allowed (const struct lsquic_stream *stream) 2229{ 2230 if (stream->sm_n_allocated) 2231 return stream->sm_n_allocated; 2232 else 2233 return stream->conn_pub->path->np_pack_size; 2234} 2235 2236 2237/* The flush threshold is the maximum size of stream data that can be sent 2238 * in a full packet. 2239 */ 2240#ifdef NDEBUG 2241static 2242#endif 2243 size_t 2244lsquic_stream_flush_threshold (const struct lsquic_stream *stream, 2245 unsigned data_sz) 2246{ 2247 enum packet_out_flags flags; 2248 enum packno_bits bits; 2249 size_t packet_header_sz, stream_header_sz, tag_len; 2250 size_t threshold; 2251 2252 bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl); 2253 flags = bits << POBIT_SHIFT; 2254 if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)) 2255 flags |= PO_CONN_ID; 2256 if (stream_is_hsk(stream)) 2257 flags |= PO_LONGHEAD; 2258 2259 packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags, 2260 stream->conn_pub->path->np_dcid.len); 2261 stream_header_sz = stream->sm_frame_header_sz(stream, data_sz); 2262 tag_len = stream->conn_pub->lconn->cn_esf_c->esf_tag_len; 2263 2264 threshold = stream_get_n_allowed(stream) - tag_len 2265 - packet_header_sz - stream_header_sz; 2266 return threshold; 2267} 2268 2269 2270#define COMMON_WRITE_CHECKS() do { \ 2271 if ((stream->sm_bflags & SMBF_USE_HEADERS) \ 2272 && !(stream->stream_flags & STREAM_HEADERS_SENT)) \ 2273 { \ 2274 if (SSHS_BEGIN != stream->sm_send_headers_state) \ 2275 { \ 2276 LSQ_DEBUG("still sending headers: no writing allowed"); \ 2277 return 0; \ 2278 } \ 2279 else \ 2280 { \ 2281 LSQ_INFO("Attempt to write to stream before sending HTTP " \ 2282 "headers"); \ 2283 errno = EILSEQ; \ 2284 return -1; \ 2285 } \ 2286 } \ 2287 if (lsquic_stream_is_reset(stream)) \ 2288 { \ 2289 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 2290 errno = ECONNRESET; \ 2291 return 
-1; \ 2292 } \ 2293 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 2294 { \ 2295 LSQ_INFO("Attempt to write to stream after it was closed for " \ 2296 "writing"); \ 2297 errno = EBADF; \ 2298 return -1; \ 2299 } \ 2300} while (0) 2301 2302 2303struct frame_gen_ctx 2304{ 2305 lsquic_stream_t *fgc_stream; 2306 struct lsquic_reader *fgc_reader; 2307 /* We keep our own count of how many bytes were read from reader because 2308 * some readers are external. The external caller does not have to rely 2309 * on our count, but it can. 2310 */ 2311 size_t fgc_nread_from_reader; 2312 size_t (*fgc_size) (void *ctx); 2313 int (*fgc_fin) (void *ctx); 2314 gsf_read_f fgc_read; 2315}; 2316 2317 2318static size_t 2319frame_std_gen_size (void *ctx) 2320{ 2321 struct frame_gen_ctx *fg_ctx = ctx; 2322 size_t available, remaining; 2323 2324 /* Make sure we are not writing past available size: */ 2325 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2326 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 2327 if (available < remaining) 2328 remaining = available; 2329 2330 return remaining + fg_ctx->fgc_stream->sm_n_buffered; 2331} 2332 2333 2334static size_t 2335stream_hq_frame_size (const struct stream_hq_frame *shf) 2336{ 2337 if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))) 2338 return 1 + 1 + ((shf->shf_flags & SHF_TWO_BYTES) > 0); 2339 else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) == SHF_FIXED_SIZE) 2340 return 1 + (1 << vint_val2bits(shf->shf_frame_size)); 2341 else 2342 { 2343 assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2344 == (SHF_FIXED_SIZE|SHF_PHANTOM)); 2345 return 0; 2346 } 2347} 2348 2349 2350static size_t 2351active_hq_frame_sizes (const struct lsquic_stream *stream) 2352{ 2353 const struct stream_hq_frame *shf; 2354 size_t size; 2355 2356 size = 0; 2357 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 2358 == (SMBF_IETF|SMBF_USE_HEADERS)) 2359 STAILQ_FOREACH(shf, 
&stream->sm_hq_frames, shf_next) 2360 if (!(shf->shf_flags & SHF_WRITTEN)) 2361 size += stream_hq_frame_size(shf); 2362 2363 return size; 2364} 2365 2366 2367static uint64_t 2368stream_hq_frame_end (const struct stream_hq_frame *shf) 2369{ 2370 if (shf->shf_flags & SHF_FIXED_SIZE) 2371 return shf->shf_off + shf->shf_frame_size; 2372 else if (shf->shf_flags & SHF_TWO_BYTES) 2373 return shf->shf_off + ((1 << 14) - 1); 2374 else 2375 return shf->shf_off + ((1 << 6) - 1); 2376} 2377 2378 2379static int 2380frame_in_stream (const struct lsquic_stream *stream, 2381 const struct stream_hq_frame *shf) 2382{ 2383 return shf >= stream->sm_hq_frame_arr 2384 && shf < stream->sm_hq_frame_arr + sizeof(stream->sm_hq_frame_arr) 2385 / sizeof(stream->sm_hq_frame_arr[0]) 2386 ; 2387} 2388 2389 2390static void 2391stream_hq_frame_put (struct lsquic_stream *stream, 2392 struct stream_hq_frame *shf) 2393{ 2394 assert(STAILQ_FIRST(&stream->sm_hq_frames) == shf); 2395 STAILQ_REMOVE_HEAD(&stream->sm_hq_frames, shf_next); 2396 if (frame_in_stream(stream, shf)) 2397 memset(shf, 0, sizeof(*shf)); 2398 else 2399 lsquic_malo_put(shf); 2400} 2401 2402 2403static void 2404stream_hq_frame_close (struct lsquic_stream *stream, 2405 struct stream_hq_frame *shf) 2406{ 2407 unsigned bits; 2408 2409 LSQ_DEBUG("close HQ frame of type 0x%X at payload offset %"PRIu64 2410 " (actual offset %"PRIu64")", shf->shf_frame_type, 2411 stream->sm_payload, stream->tosend_off); 2412 assert(shf->shf_flags & SHF_ACTIVE); 2413 if (!(shf->shf_flags & SHF_FIXED_SIZE)) 2414 { 2415 shf->shf_frame_ptr[0] = shf->shf_frame_type; 2416 bits = (shf->shf_flags & SHF_TWO_BYTES) > 0; 2417 vint_write(shf->shf_frame_ptr + 1, stream->sm_payload - shf->shf_off, 2418 bits, 1 << bits); 2419 } 2420 stream_hq_frame_put(stream, shf); 2421} 2422 2423 2424static size_t 2425frame_hq_gen_size (void *ctx) 2426{ 2427 struct frame_gen_ctx *fg_ctx = ctx; 2428 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2429 size_t available, remaining, 
frames; 2430 const struct stream_hq_frame *shf; 2431 2432 frames = 0; 2433 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2434 if (shf->shf_off >= stream->sm_payload) 2435 frames += stream_hq_frame_size(shf); 2436 2437 /* Make sure we are not writing past available size: */ 2438 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2439 available = lsquic_stream_write_avail(stream); 2440 if (available < remaining) 2441 remaining = available; 2442 2443 return remaining + stream->sm_n_buffered + frames; 2444} 2445 2446 2447static int 2448frame_std_gen_fin (void *ctx) 2449{ 2450 struct frame_gen_ctx *fg_ctx = ctx; 2451 return !(fg_ctx->fgc_stream->sm_bflags & SMBF_CRYPTO) 2452 && (fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE) 2453 && 0 == fg_ctx->fgc_stream->sm_n_buffered 2454 /* Do not use frame_std_gen_size() as it may chop the real size: */ 2455 && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2456} 2457 2458 2459static void 2460incr_conn_cap (struct lsquic_stream *stream, size_t incr) 2461{ 2462 if (stream->sm_bflags & SMBF_CONN_LIMITED) 2463 { 2464 stream->conn_pub->conn_cap.cc_sent += incr; 2465 assert(stream->conn_pub->conn_cap.cc_sent 2466 <= stream->conn_pub->conn_cap.cc_max); 2467 } 2468} 2469 2470 2471void 2472incr_sm_payload (struct lsquic_stream *stream, size_t incr) 2473{ 2474 stream->sm_payload += incr; 2475 stream->tosend_off += incr; 2476 assert(stream->tosend_off <= stream->max_send_off); 2477} 2478 2479 2480static size_t 2481frame_std_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 2482{ 2483 struct frame_gen_ctx *fg_ctx = ctx; 2484 unsigned char *p = begin_buf; 2485 unsigned char *const end = p + len; 2486 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 2487 size_t n_written, available, n_to_write; 2488 2489 if (stream->sm_n_buffered > 0) 2490 { 2491 if (len <= stream->sm_n_buffered) 2492 { 2493 memcpy(p, stream->sm_buf, len); 2494 memmove(stream->sm_buf, stream->sm_buf + len, 
2495 stream->sm_n_buffered - len); 2496 stream->sm_n_buffered -= len; 2497 if (0 == stream->sm_n_buffered) 2498 maybe_resize_stream_buffer(stream); 2499 assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered); 2500 incr_sm_payload(stream, len); 2501 *fin = fg_ctx->fgc_fin(fg_ctx); 2502 return len; 2503 } 2504 memcpy(p, stream->sm_buf, stream->sm_n_buffered); 2505 p += stream->sm_n_buffered; 2506 stream->sm_n_buffered = 0; 2507 maybe_resize_stream_buffer(stream); 2508 } 2509 2510 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 2511 n_to_write = end - p; 2512 if (n_to_write > available) 2513 n_to_write = available; 2514 n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p, 2515 n_to_write); 2516 p += n_written; 2517 fg_ctx->fgc_nread_from_reader += n_written; 2518 *fin = fg_ctx->fgc_fin(fg_ctx); 2519 incr_sm_payload(stream, p - (const unsigned char *) begin_buf); 2520 incr_conn_cap(stream, n_written); 2521 return p - (const unsigned char *) begin_buf; 2522} 2523 2524 2525static struct stream_hq_frame * 2526find_hq_frame (const struct lsquic_stream *stream, uint64_t off) 2527{ 2528 struct stream_hq_frame *shf; 2529 2530 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2531 if (shf->shf_off <= off && stream_hq_frame_end(shf) > off) 2532 return shf; 2533 2534 return NULL; 2535} 2536 2537 2538static struct stream_hq_frame * 2539find_cur_hq_frame (const struct lsquic_stream *stream) 2540{ 2541 return find_hq_frame(stream, stream->sm_payload); 2542} 2543 2544 2545static struct stream_hq_frame * 2546open_hq_frame (struct lsquic_stream *stream) 2547{ 2548 struct stream_hq_frame *shf; 2549 2550 for (shf = stream->sm_hq_frame_arr; shf < stream->sm_hq_frame_arr 2551 + sizeof(stream->sm_hq_frame_arr) 2552 / sizeof(stream->sm_hq_frame_arr[0]); ++shf) 2553 if (!(shf->shf_flags & SHF_ACTIVE)) 2554 goto found; 2555 2556 shf = lsquic_malo_get(stream->conn_pub->mm->malo.stream_hq_frame); 2557 if (!shf) 2558 { 2559 LSQ_WARN("cannot 
allocate HQ frame"); 2560 return NULL; 2561 } 2562 memset(shf, 0, sizeof(*shf)); 2563 2564 found: 2565 STAILQ_INSERT_TAIL(&stream->sm_hq_frames, shf, shf_next); 2566 shf->shf_flags = SHF_ACTIVE; 2567 return shf; 2568} 2569 2570 2571static struct stream_hq_frame * 2572stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off, 2573 enum hq_frame_type frame_type, enum shf_flags flags, size_t size) 2574{ 2575 struct stream_hq_frame *shf; 2576 2577 shf = open_hq_frame(stream); 2578 if (!shf) 2579 { 2580 LSQ_WARN("could not open HQ frame"); 2581 return NULL; 2582 } 2583 2584 shf->shf_off = off; 2585 shf->shf_flags |= flags; 2586 shf->shf_frame_type = frame_type; 2587 if (shf->shf_flags & SHF_FIXED_SIZE) 2588 { 2589 shf->shf_frame_size = size; 2590 LSQ_DEBUG("activated fixed-size HQ frame of type 0x%X at offset " 2591 "%"PRIu64", size %zu", shf->shf_frame_type, shf->shf_off, size); 2592 } 2593 else 2594 { 2595 shf->shf_frame_ptr = NULL; 2596 if (size >= (1 << 6)) 2597 shf->shf_flags |= SHF_TWO_BYTES; 2598 LSQ_DEBUG("activated variable-size HQ frame of type 0x%X at offset " 2599 "%"PRIu64, shf->shf_frame_type, shf->shf_off); 2600 } 2601 2602 return shf; 2603} 2604 2605 2606static size_t 2607frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 2608{ 2609 struct frame_gen_ctx *fg_ctx = ctx; 2610 unsigned char *p = begin_buf; 2611 unsigned char *const end = p + len; 2612 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2613 struct stream_hq_frame *shf; 2614 size_t nw, frame_sz, avail, rem; 2615 unsigned bits; 2616 2617 while (p < end) 2618 { 2619 shf = find_cur_hq_frame(stream); 2620 if (shf) 2621 LSQ_DEBUG("found current HQ frame of type 0x%X at offset %"PRIu64, 2622 shf->shf_frame_type, shf->shf_off); 2623 else 2624 { 2625 rem = frame_std_gen_size(ctx); 2626 if (rem) 2627 { 2628 if (rem > ((1 << 14) - 1)) 2629 rem = (1 << 14) - 1; 2630 shf = stream_activate_hq_frame(stream, 2631 stream->sm_payload, HQFT_DATA, 0, rem); 2632 if (shf) 2633 
goto insert; 2634 else 2635 { 2636 /* TODO: abort connection? Handle failure somehow */ 2637 break; 2638 } 2639 } 2640 else 2641 break; 2642 } 2643 if (shf->shf_off == stream->sm_payload 2644 && !(shf->shf_flags & SHF_WRITTEN)) 2645 { 2646 insert: 2647 frame_sz = stream_hq_frame_size(shf); 2648 if (frame_sz > (uintptr_t) (end - p)) 2649 { 2650 stream_hq_frame_put(stream, shf); 2651 break; 2652 } 2653 LSQ_DEBUG("insert %zu-byte HQ frame of type 0x%X at payload " 2654 "offset %"PRIu64" (actual offset %"PRIu64")", frame_sz, 2655 shf->shf_frame_type, stream->sm_payload, stream->tosend_off); 2656 if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))) 2657 { 2658 shf->shf_frame_ptr = p; 2659 memset(p, 0, frame_sz); 2660 p += frame_sz; 2661 } 2662 else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2663 == SHF_FIXED_SIZE) 2664 { 2665 *p++ = shf->shf_frame_type; 2666 bits = vint_val2bits(shf->shf_frame_size); 2667 vint_write(p, shf->shf_frame_size, bits, 1 << bits); 2668 p += 1 << bits; 2669 } 2670 else 2671 assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2672 == (SHF_FIXED_SIZE|SHF_PHANTOM)); 2673 if (!(shf->shf_flags & SHF_CC_PAID)) 2674 { 2675 incr_conn_cap(stream, frame_sz); 2676 shf->shf_flags |= SHF_CC_PAID; 2677 } 2678 shf->shf_flags |= SHF_WRITTEN; 2679 stream->tosend_off += frame_sz; 2680 assert(stream->tosend_off <= stream->max_send_off); 2681 } 2682 else 2683 { 2684 avail = stream->sm_n_buffered + stream->sm_write_avail(stream); 2685 len = stream_hq_frame_end(shf) - stream->sm_payload; 2686 assert(len); 2687 if (len > (unsigned) (end - p)) 2688 len = end - p; 2689 if (len > avail) 2690 len = avail; 2691 if (!len) 2692 break; 2693 nw = frame_std_gen_read(ctx, p, len, fin); 2694 p += nw; 2695 if (nw < len) 2696 break; 2697 if (stream_hq_frame_end(shf) == stream->sm_payload) 2698 stream_hq_frame_close(stream, shf); 2699 } 2700 } 2701 2702 return p - (unsigned char *) begin_buf; 2703} 2704 2705 2706static size_t 2707crypto_frame_gen_read (void *ctx, 
void *buf, size_t len) 2708{ 2709 int fin_ignored; 2710 2711 return frame_std_gen_read(ctx, buf, len, &fin_ignored); 2712} 2713 2714 2715static void 2716check_flush_threshold (lsquic_stream_t *stream) 2717{ 2718 if ((stream->sm_qflags & SMQF_WANT_FLUSH) && 2719 stream->tosend_off >= stream->sm_flush_to) 2720 { 2721 LSQ_DEBUG("flushed to or past required offset %"PRIu64, 2722 stream->sm_flush_to); 2723 maybe_remove_from_write_q(stream, SMQF_WANT_FLUSH); 2724 } 2725} 2726 2727 2728#if LSQUIC_EXTRA_CHECKS 2729static void 2730verify_conn_cap (const struct lsquic_conn_public *conn_pub) 2731{ 2732 const struct lsquic_stream *stream; 2733 struct lsquic_hash_elem *el; 2734 unsigned n_buffered; 2735 2736 if (!conn_pub->all_streams) 2737 /* TODO: enable this check for unit tests as well */ 2738 return; 2739 2740 n_buffered = 0; 2741 for (el = lsquic_hash_first(conn_pub->all_streams); el; 2742 el = lsquic_hash_next(conn_pub->all_streams)) 2743 { 2744 stream = lsquic_hashelem_getdata(el); 2745 if (stream->sm_bflags & SMBF_CONN_LIMITED) 2746 n_buffered += stream->sm_n_buffered; 2747 } 2748 2749 assert(n_buffered + conn_pub->stream_frame_bytes 2750 == conn_pub->conn_cap.cc_sent); 2751} 2752 2753 2754#endif 2755 2756 2757static int 2758write_stream_frame (struct frame_gen_ctx *fg_ctx, const size_t size, 2759 struct lsquic_packet_out *packet_out) 2760{ 2761 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 2762 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 2763 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2764 unsigned off; 2765 int len, s; 2766 2767#if LSQUIC_CONN_STATS || LSQUIC_EXTRA_CHECKS 2768 const uint64_t begin_off = stream->tosend_off; 2769#endif 2770 off = packet_out->po_data_sz; 2771 len = pf->pf_gen_stream_frame( 2772 packet_out->po_data + packet_out->po_data_sz, 2773 lsquic_packet_out_avail(packet_out), stream->id, 2774 stream->tosend_off, 2775 fg_ctx->fgc_fin(fg_ctx), size, fg_ctx->fgc_read, fg_ctx); 2776 if (len < 0) 
2777 return len; 2778 2779#if LSQUIC_CONN_STATS 2780 stream->conn_pub->conn_stats->out.stream_frames += 1; 2781 stream->conn_pub->conn_stats->out.stream_data_sz 2782 += stream->tosend_off - begin_off; 2783#endif 2784 EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf, 2785 packet_out->po_data + packet_out->po_data_sz, len); 2786 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 2787 packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM; 2788 if (0 == lsquic_packet_out_avail(packet_out)) 2789 packet_out->po_flags |= PO_STREAM_END; 2790 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 2791 stream, QUIC_FRAME_STREAM, off, len); 2792 if (s != 0) 2793 { 2794 LSQ_ERROR("adding stream to packet failed: %s", strerror(errno)); 2795 return -1; 2796 } 2797#if LSQUIC_EXTRA_CHECKS 2798 if (stream->sm_bflags & SMBF_CONN_LIMITED) 2799 { 2800 stream->conn_pub->stream_frame_bytes += stream->tosend_off - begin_off; 2801 verify_conn_cap(stream->conn_pub); 2802 } 2803#endif 2804 2805 check_flush_threshold(stream); 2806 return len; 2807} 2808 2809 2810static enum swtp_status 2811stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size) 2812{ 2813 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2814 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2815 struct lsquic_packet_out *packet_out; 2816 int len; 2817 2818 packet_out = lsquic_send_ctl_new_packet_out(send_ctl, 0, PNS_APP, 2819 stream->conn_pub->path); 2820 if (!packet_out) 2821 return SWTP_STOP; 2822 packet_out->po_header_type = stream->tosend_off == 0 2823 ? 
HETY_INITIAL : HETY_HANDSHAKE; 2824 2825 len = write_stream_frame(fg_ctx, size, packet_out); 2826 2827 if (len > 0) 2828 { 2829 packet_out->po_flags |= PO_HELLO; 2830 lsquic_packet_out_zero_pad(packet_out); 2831 lsquic_send_ctl_scheduled_one(send_ctl, packet_out); 2832 return SWTP_OK; 2833 } 2834 else 2835 return SWTP_ERROR; 2836} 2837 2838 2839static enum swtp_status 2840stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size) 2841{ 2842 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2843 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2844 unsigned stream_header_sz, need_at_least; 2845 struct lsquic_packet_out *packet_out; 2846 struct lsquic_stream *headers_stream; 2847 int len; 2848 2849 if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED)) 2850 == STREAM_HEADERS_SENT) 2851 { 2852 if (stream->sm_bflags & SMBF_IETF) 2853 { 2854 if (stream->stream_flags & STREAM_ENCODER_DEP) 2855 headers_stream = stream->conn_pub->u.ietf.qeh->qeh_enc_sm_out; 2856 else 2857 headers_stream = NULL; 2858 } 2859 else 2860 headers_stream = 2861 lsquic_headers_stream_get_stream(stream->conn_pub->u.gquic.hs); 2862 if (headers_stream && lsquic_stream_has_data_to_flush(headers_stream)) 2863 { 2864 LSQ_DEBUG("flushing headers stream before packetizing stream data"); 2865 (void) lsquic_stream_flush(headers_stream); 2866 } 2867 /* If there is nothing to flush, some other stream must have flushed it: 2868 * this means our headers are flushed. Either way, only do this once. 
2869 */ 2870 stream->stream_flags |= STREAM_HDRS_FLUSHED; 2871 } 2872 2873 stream_header_sz = stream->sm_frame_header_sz(stream, size); 2874 need_at_least = stream_header_sz; 2875 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 2876 == (SMBF_IETF|SMBF_USE_HEADERS)) 2877 { 2878 if (size > 0) 2879 need_at_least += 3; /* Enough room for HTTP/3 frame */ 2880 } 2881 else 2882 need_at_least += size > 0; 2883 get_packet: 2884 packet_out = lsquic_send_ctl_get_packet_for_stream(send_ctl, 2885 need_at_least, stream->conn_pub->path, stream); 2886 if (packet_out) 2887 { 2888 len = write_stream_frame(fg_ctx, size, packet_out); 2889 if (len > 0) 2890 return SWTP_OK; 2891 assert(len < 0); 2892 if (-len > (int) need_at_least) 2893 { 2894 LSQ_DEBUG("need more room (%d bytes) than initially calculated " 2895 "%u bytes, will try again", -len, need_at_least); 2896 need_at_least = -len; 2897 goto get_packet; 2898 } 2899 return SWTP_ERROR; 2900 } 2901 else 2902 return SWTP_STOP; 2903} 2904 2905 2906/* Use for IETF crypto streams and gQUIC crypto stream for versions >= Q050. 
*/ 2907static enum swtp_status 2908stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size) 2909{ 2910 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2911 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2912 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 2913 unsigned crypto_header_sz, need_at_least; 2914 struct lsquic_packet_out *packet_out; 2915 unsigned short off; 2916 enum packnum_space pns; 2917 int len, s; 2918 2919 if (stream->sm_bflags & SMBF_IETF) 2920 pns = lsquic_enclev2pns[ crypto_level(stream) ]; 2921 else 2922 pns = PNS_APP; 2923 2924 assert(size > 0); 2925 crypto_header_sz = stream->sm_frame_header_sz(stream, size); 2926 need_at_least = crypto_header_sz + 1; 2927 2928 packet_out = lsquic_send_ctl_get_packet_for_crypto(send_ctl, 2929 need_at_least, pns, stream->conn_pub->path); 2930 if (!packet_out) 2931 return SWTP_STOP; 2932 2933 off = packet_out->po_data_sz; 2934 len = pf->pf_gen_crypto_frame(packet_out->po_data + packet_out->po_data_sz, 2935 lsquic_packet_out_avail(packet_out), stream->tosend_off, 2936 size, crypto_frame_gen_read, fg_ctx); 2937 if (len < 0) 2938 return len; 2939 2940 EV_LOG_GENERATED_CRYPTO_FRAME(LSQUIC_LOG_CONN_ID, pf, 2941 packet_out->po_data + packet_out->po_data_sz, len); 2942 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 2943 packet_out->po_frame_types |= 1 << QUIC_FRAME_CRYPTO; 2944 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 2945 stream, QUIC_FRAME_CRYPTO, off, len); 2946 if (s != 0) 2947 { 2948 LSQ_WARN("adding crypto stream to packet failed: %s", strerror(errno)); 2949 return -1; 2950 } 2951 2952 packet_out->po_flags |= PO_HELLO; 2953 2954 if (!(stream->sm_bflags & SMBF_IETF)) 2955 { 2956 const unsigned short before = packet_out->po_data_sz; 2957 lsquic_packet_out_zero_pad(packet_out); 2958 /* XXX: too hacky */ 2959 if (before < packet_out->po_data_sz) 2960 send_ctl->sc_bytes_scheduled += packet_out->po_data_sz - 
before; 2961 } 2962 2963 check_flush_threshold(stream); 2964 return SWTP_OK; 2965} 2966 2967 2968static void 2969abort_connection (struct lsquic_stream *stream) 2970{ 2971 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 2972 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 2973 next_service_stream); 2974 stream->sm_qflags |= SMQF_ABORT_CONN; 2975 LSQ_WARN("connection will be aborted"); 2976 maybe_conn_to_tickable(stream); 2977} 2978 2979 2980static void 2981maybe_close_varsize_hq_frame (struct lsquic_stream *stream) 2982{ 2983 struct stream_hq_frame *shf; 2984 uint64_t size; 2985 unsigned bits; 2986 2987 shf = find_cur_hq_frame(stream); 2988 if (!shf) 2989 return; 2990 2991 if (shf->shf_flags & SHF_FIXED_SIZE) 2992 { 2993 if (shf->shf_off + shf->shf_frame_size <= stream->sm_payload) 2994 stream_hq_frame_put(stream, shf); 2995 return; 2996 } 2997 2998 bits = (shf->shf_flags & SHF_TWO_BYTES) > 0; 2999 size = stream->sm_payload + stream->sm_n_buffered - shf->shf_off; 3000 if (size <= VINT_MAX_B(bits) && shf->shf_frame_ptr) 3001 { 3002 if (0 == stream->sm_n_buffered) 3003 LSQ_DEBUG("close HQ frame type 0x%X of size %"PRIu64, 3004 shf->shf_frame_type, size); 3005 else 3006 LSQ_DEBUG("convert HQ frame type 0x%X of to fixed %"PRIu64, 3007 shf->shf_frame_type, size); 3008 shf->shf_frame_ptr[0] = shf->shf_frame_type; 3009 vint_write(shf->shf_frame_ptr + 1, size, bits, 1 << bits); 3010 if (0 == stream->sm_n_buffered) 3011 stream_hq_frame_put(stream, shf); 3012 else 3013 { 3014 shf->shf_frame_size = size; 3015 shf->shf_flags |= SHF_FIXED_SIZE; 3016 } 3017 } 3018 else if (!shf->shf_frame_ptr) 3019 LSQ_DEBUG("HQ frame of type 0x%X has not yet been written, not " 3020 "closing", shf->shf_frame_type); 3021 else 3022 { 3023 assert(stream->sm_n_buffered); 3024 LSQ_ERROR("cannot close frame of size %"PRIu64" on stream %"PRIu64 3025 " -- too large", size, stream->id); 3026 stream->conn_pub->lconn->cn_if->ci_internal_error( 3027 stream->conn_pub->lconn, "HTTP/3 frame 
too large"); 3028 stream_hq_frame_put(stream, shf); 3029 } 3030} 3031 3032 3033static ssize_t 3034stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, 3035 size_t thresh) 3036{ 3037 size_t size; 3038 ssize_t nw; 3039 unsigned seen_ok; 3040 int use_framing; 3041 struct frame_gen_ctx fg_ctx = { 3042 .fgc_stream = stream, 3043 .fgc_reader = reader, 3044 .fgc_nread_from_reader = 0, 3045 }; 3046 3047 use_framing = (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 3048 == (SMBF_IETF|SMBF_USE_HEADERS); 3049 if (use_framing) 3050 { 3051 fg_ctx.fgc_size = frame_hq_gen_size; 3052 fg_ctx.fgc_read = frame_hq_gen_read; 3053 fg_ctx.fgc_fin = frame_std_gen_fin; /* This seems to work for either? XXX */ 3054 } 3055 else 3056 { 3057 fg_ctx.fgc_size = frame_std_gen_size; 3058 fg_ctx.fgc_read = frame_std_gen_read; 3059 fg_ctx.fgc_fin = frame_std_gen_fin; 3060 } 3061 3062 seen_ok = 0; 3063 while ((size = fg_ctx.fgc_size(&fg_ctx), thresh ? size >= thresh : size > 0) 3064 || fg_ctx.fgc_fin(&fg_ctx)) 3065 { 3066 switch (stream->sm_write_to_packet(&fg_ctx, size)) 3067 { 3068 case SWTP_OK: 3069 if (!seen_ok++) 3070 maybe_conn_to_tickable_if_writeable(stream, 0); 3071 if (fg_ctx.fgc_fin(&fg_ctx)) 3072 { 3073 if (use_framing && seen_ok) 3074 maybe_close_varsize_hq_frame(stream); 3075 stream->stream_flags |= STREAM_FIN_SENT; 3076 goto end; 3077 } 3078 else 3079 break; 3080 case SWTP_STOP: 3081 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 3082 if (use_framing && seen_ok) 3083 maybe_close_varsize_hq_frame(stream); 3084 goto end; 3085 default: 3086 abort_connection(stream); 3087 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 3088 return -1; 3089 } 3090 } 3091 3092 if (use_framing && seen_ok) 3093 maybe_close_varsize_hq_frame(stream); 3094 3095 if (thresh) 3096 { 3097 assert(size < thresh); 3098 assert(size >= stream->sm_n_buffered); 3099 size -= stream->sm_n_buffered; 3100 if (size > 0) 3101 { 3102 nw = save_to_buffer(stream, reader, size); 3103 if (nw < 0) 3104 
return -1; 3105 fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */ 3106 } 3107 } 3108 else 3109 { 3110 /* We count flushed data towards both stream and connection limits, 3111 * so we should have been able to packetize all of it: 3112 */ 3113 assert(0 == stream->sm_n_buffered); 3114 assert(size == 0); 3115 } 3116 3117 maybe_mark_as_blocked(stream); 3118 3119 end: 3120 return fg_ctx.fgc_nread_from_reader; 3121} 3122 3123 3124/* Perform an implicit flush when we hit connection limit while buffering 3125 * data. This is to prevent a (theoretical) stall: 3126 * 3127 * Imagine a number of streams, all of which buffered some data. The buffered 3128 * data is up to connection cap, which means no further writes are possible. 3129 * None of them flushes, which means that data is not sent and connection 3130 * WINDOW_UPDATE frame never arrives from peer. Stall. 3131 */ 3132static int 3133maybe_flush_stream (struct lsquic_stream *stream) 3134{ 3135 if (stream->sm_n_buffered > 0 3136 && (stream->sm_bflags & SMBF_CONN_LIMITED) 3137 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 3138 return stream_flush_nocheck(stream); 3139 else 3140 return 0; 3141} 3142 3143 3144static int 3145stream_hq_frame_extendable (const struct stream_hq_frame *shf, uint64_t cur_off, 3146 unsigned len) 3147{ 3148 return (shf->shf_flags & (SHF_TWO_BYTES|SHF_FIXED_SIZE)) == 0 3149 && cur_off - shf->shf_off < (1 << 6) 3150 && cur_off - shf->shf_off + len >= (1 << 6) 3151 ; 3152} 3153 3154 3155/* Update currently buffered HQ frame or create a new one, if possible. 3156 * Return update length to be buffered. If a HQ frame cannot be 3157 * buffered due to size, 0 is returned, thereby preventing both HQ frame 3158 * creation and buffering. 
3159 */ 3160static size_t 3161update_buffered_hq_frames (struct lsquic_stream *stream, size_t len, 3162 size_t avail) 3163{ 3164 struct stream_hq_frame *shf; 3165 uint64_t cur_off, end; 3166 size_t frame_sz; 3167 unsigned extendable; 3168 3169 cur_off = stream->sm_payload + stream->sm_n_buffered; 3170 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 3171 if (shf->shf_off <= cur_off) 3172 { 3173 end = stream_hq_frame_end(shf); 3174 extendable = stream_hq_frame_extendable(shf, cur_off, len); 3175 if (cur_off < end + extendable) 3176 break; 3177 } 3178 3179 if (shf) 3180 { 3181 if (len > end + extendable - cur_off) 3182 len = end + extendable - cur_off; 3183 frame_sz = stream_hq_frame_size(shf); 3184 } 3185 else 3186 { 3187 assert(avail >= 3); 3188 shf = stream_activate_hq_frame(stream, cur_off, HQFT_DATA, 0, len); 3189 if (!shf) 3190 return 0; 3191 if (len > stream_hq_frame_end(shf) - cur_off) 3192 len = stream_hq_frame_end(shf) - cur_off; 3193 extendable = 0; 3194 frame_sz = stream_hq_frame_size(shf); 3195 if (avail < frame_sz) 3196 return 0; 3197 avail -= frame_sz; 3198 } 3199 3200 if (!(shf->shf_flags & SHF_CC_PAID)) 3201 { 3202 incr_conn_cap(stream, frame_sz); 3203 shf->shf_flags |= SHF_CC_PAID; 3204 } 3205 if (extendable) 3206 { 3207 shf->shf_flags |= SHF_TWO_BYTES; 3208 incr_conn_cap(stream, 1); 3209 avail -= 1; 3210 if ((stream->sm_qflags & SMQF_WANT_FLUSH) 3211 && shf->shf_off <= stream->sm_payload 3212 && stream_hq_frame_end(shf) >= stream->sm_flush_to_payload) 3213 stream->sm_flush_to += 1; 3214 } 3215 3216 if (len <= avail) 3217 return len; 3218 else 3219 return avail; 3220} 3221 3222 3223static ssize_t 3224save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 3225 size_t len) 3226{ 3227 size_t avail, n_written, n_allowed; 3228 3229 avail = lsquic_stream_write_avail(stream); 3230 if (avail < len) 3231 len = avail; 3232 if (len == 0) 3233 { 3234 LSQ_DEBUG("zero-byte write (avail: %zu)", avail); 3235 return 0; 3236 } 3237 3238 n_allowed 
= stream_get_n_allowed(stream); 3239 assert(stream->sm_n_buffered + len <= n_allowed); 3240 3241 if (!stream->sm_buf) 3242 { 3243 stream->sm_buf = malloc(n_allowed); 3244 if (!stream->sm_buf) 3245 return -1; 3246 stream->sm_n_allocated = n_allowed; 3247 } 3248 3249 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 3250 == (SMBF_IETF|SMBF_USE_HEADERS)) 3251 len = update_buffered_hq_frames(stream, len, avail); 3252 3253 n_written = reader->lsqr_read(reader->lsqr_ctx, 3254 stream->sm_buf + stream->sm_n_buffered, len); 3255 stream->sm_n_buffered += n_written; 3256 assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered); 3257 incr_conn_cap(stream, n_written); 3258 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 3259 n_written, stream->sm_n_buffered); 3260 if (0 != maybe_flush_stream(stream)) 3261 return -1; 3262 return n_written; 3263} 3264 3265 3266static ssize_t 3267stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 3268{ 3269 const struct stream_hq_frame *shf; 3270 size_t thresh, len, frames, total_len, n_allowed, nwritten; 3271 ssize_t nw; 3272 3273 len = reader->lsqr_size(reader->lsqr_ctx); 3274 if (len == 0) 3275 return 0; 3276 3277 frames = 0; 3278 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 3279 == (SMBF_IETF|SMBF_USE_HEADERS)) 3280 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 3281 if (shf->shf_off >= stream->sm_payload) 3282 frames += stream_hq_frame_size(shf); 3283 total_len = len + frames + stream->sm_n_buffered; 3284 thresh = lsquic_stream_flush_threshold(stream, total_len); 3285 n_allowed = stream_get_n_allowed(stream); 3286 if (total_len <= n_allowed && total_len < thresh) 3287 { 3288 nwritten = 0; 3289 do 3290 { 3291 nw = save_to_buffer(stream, reader, len - nwritten); 3292 if (nw > 0) 3293 nwritten += (size_t) nw; 3294 else if (nw == 0) 3295 break; 3296 else 3297 return nw; 3298 } 3299 while (nwritten < len 3300 && stream->sm_n_buffered < stream->sm_n_allocated); 3301 return 
nwritten; 3302 } 3303 else 3304 return stream_write_to_packets(stream, reader, thresh); 3305} 3306 3307 3308ssize_t 3309lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 3310{ 3311 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 3312 return lsquic_stream_writev(stream, &iov, 1); 3313} 3314 3315 3316struct inner_reader_iovec { 3317 const struct iovec *iov; 3318 const struct iovec *end; 3319 unsigned cur_iovec_off; 3320}; 3321 3322 3323static size_t 3324inner_reader_iovec_read (void *ctx, void *buf, size_t count) 3325{ 3326 struct inner_reader_iovec *const iro = ctx; 3327 unsigned char *p = buf; 3328 unsigned char *const end = p + count; 3329 unsigned n_tocopy; 3330 3331 while (iro->iov < iro->end && p < end) 3332 { 3333 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 3334 if (n_tocopy > (unsigned) (end - p)) 3335 n_tocopy = end - p; 3336 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 3337 n_tocopy); 3338 p += n_tocopy; 3339 iro->cur_iovec_off += n_tocopy; 3340 if (iro->iov->iov_len == iro->cur_iovec_off) 3341 { 3342 ++iro->iov; 3343 iro->cur_iovec_off = 0; 3344 } 3345 } 3346 3347 return p + count - end; 3348} 3349 3350 3351static size_t 3352inner_reader_iovec_size (void *ctx) 3353{ 3354 struct inner_reader_iovec *const iro = ctx; 3355 const struct iovec *iov; 3356 size_t size; 3357 3358 size = 0; 3359 for (iov = iro->iov; iov < iro->end; ++iov) 3360 size += iov->iov_len; 3361 3362 return size - iro->cur_iovec_off; 3363} 3364 3365 3366ssize_t 3367lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 3368 int iovcnt) 3369{ 3370 COMMON_WRITE_CHECKS(); 3371 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 3372 3373 struct inner_reader_iovec iro = { 3374 .iov = iov, 3375 .end = iov + iovcnt, 3376 .cur_iovec_off = 0, 3377 }; 3378 struct lsquic_reader reader = { 3379 .lsqr_read = inner_reader_iovec_read, 3380 .lsqr_size = inner_reader_iovec_size, 3381 .lsqr_ctx = &iro, 3382 }; 3383 3384 
return stream_write(stream, &reader); 3385} 3386 3387 3388ssize_t 3389lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) 3390{ 3391 COMMON_WRITE_CHECKS(); 3392 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 3393 return stream_write(stream, reader); 3394} 3395 3396 3397/* This bypasses COMMON_WRITE_CHECKS */ 3398static ssize_t 3399stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz) 3400{ 3401 const struct iovec iov[1] = {{ (void *) buf, sz, }}; 3402 struct inner_reader_iovec iro = { 3403 .iov = iov, 3404 .end = iov + 1, 3405 .cur_iovec_off = 0, 3406 }; 3407 struct lsquic_reader reader = { 3408 .lsqr_read = inner_reader_iovec_read, 3409 .lsqr_size = inner_reader_iovec_size, 3410 .lsqr_ctx = &iro, 3411 }; 3412 return stream_write(stream, &reader); 3413} 3414 3415 3416/* This limits the cumulative size of the compressed header fields */ 3417#define MAX_HEADERS_SIZE (64 * 1024) 3418 3419static int 3420send_headers_ietf (struct lsquic_stream *stream, 3421 const struct lsquic_http_headers *headers, int eos) 3422{ 3423 enum qwh_status qwh; 3424 const size_t max_prefix_size = 3425 lsquic_qeh_max_prefix_size(stream->conn_pub->u.ietf.qeh); 3426 const size_t max_push_size = 1 /* Stream type */ + 8 /* Push ID */; 3427 size_t prefix_sz, headers_sz, hblock_sz, push_sz; 3428 unsigned bits; 3429 ssize_t nw; 3430 unsigned char *header_block; 3431 enum lsqpack_enc_header_flags hflags; 3432 unsigned char buf[max_push_size + max_prefix_size + MAX_HEADERS_SIZE]; 3433 3434 stream->stream_flags &= ~STREAM_PUSHING; 3435 stream->stream_flags |= STREAM_NOPUSH; 3436 3437 /* TODO: Optimize for the common case: write directly to sm_buf and fall 3438 * back to a larger buffer if that fails. 
3439 */ 3440 prefix_sz = max_prefix_size; 3441 headers_sz = sizeof(buf) - max_prefix_size - max_push_size; 3442 qwh = lsquic_qeh_write_headers(stream->conn_pub->u.ietf.qeh, stream->id, 0, 3443 headers, buf + max_push_size + max_prefix_size, &prefix_sz, 3444 &headers_sz, &stream->sm_hb_compl, &hflags); 3445 3446 if (!(qwh == QWH_FULL || qwh == QWH_PARTIAL)) 3447 { 3448 if (qwh == QWH_ENOBUF) 3449 LSQ_INFO("not enough room for header block"); 3450 else 3451 LSQ_WARN("internal error encoding and sending HTTP headers"); 3452 return -1; 3453 } 3454 3455 if (hflags & LSQECH_REF_NEW_ENTRIES) 3456 stream->stream_flags |= STREAM_ENCODER_DEP; 3457 3458 if (stream->sm_promise) 3459 { 3460 assert(lsquic_stream_is_pushed(stream)); 3461 bits = vint_val2bits(stream->sm_promise->pp_id); 3462 push_sz = 1 + (1 << bits); 3463 if (!stream_activate_hq_frame(stream, 3464 stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PREAMBLE, 3465 SHF_FIXED_SIZE|SHF_PHANTOM, push_sz)) 3466 return -1; 3467 buf[max_push_size + max_prefix_size - prefix_sz - push_sz] = HQUST_PUSH; 3468 vint_write(buf + max_push_size + max_prefix_size - prefix_sz 3469 - push_sz + 1,stream->sm_promise->pp_id, bits, 1 << bits); 3470 } 3471 else 3472 push_sz = 0; 3473 3474 /* Construct contiguous header block buffer including HQ framing */ 3475 header_block = buf + max_push_size + max_prefix_size - prefix_sz - push_sz; 3476 hblock_sz = push_sz + prefix_sz + headers_sz; 3477 if (!stream_activate_hq_frame(stream, 3478 stream->sm_payload + stream->sm_n_buffered + push_sz, 3479 HQFT_HEADERS, SHF_FIXED_SIZE, hblock_sz - push_sz)) 3480 return -1; 3481 3482 if (qwh == QWH_FULL) 3483 { 3484 stream->sm_send_headers_state = SSHS_HBLOCK_SENDING; 3485 if (lsquic_stream_write_avail(stream)) 3486 { 3487 nw = stream_write_buf(stream, header_block, hblock_sz); 3488 if (nw < 0) 3489 { 3490 LSQ_WARN("cannot write to stream: %s", strerror(errno)); 3491 return -1; 3492 } 3493 if ((size_t) nw == hblock_sz) 3494 { 3495 stream->stream_flags 
|= STREAM_HEADERS_SENT; 3496 stream_hblock_sent(stream); 3497 LSQ_DEBUG("wrote all %zu bytes of header block", hblock_sz); 3498 return 0; 3499 } 3500 LSQ_DEBUG("wrote only %zd bytes of header block, stash", nw); 3501 } 3502 else 3503 { 3504 LSQ_DEBUG("cannot write to stream, stash all %zu bytes of " 3505 "header block", hblock_sz); 3506 nw = 0; 3507 } 3508 } 3509 else 3510 { 3511 stream->sm_send_headers_state = SSHS_ENC_SENDING; 3512 nw = 0; 3513 } 3514 3515 stream->sm_saved_want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 3516 stream_wantwrite(stream, 1); 3517 3518 stream->sm_header_block = malloc(hblock_sz - (size_t) nw); 3519 if (!stream->sm_header_block) 3520 { 3521 LSQ_WARN("cannot allocate %zd bytes to stash %s header block", 3522 hblock_sz - (size_t) nw, qwh == QWH_FULL ? "full" : "partial"); 3523 return -1; 3524 } 3525 memcpy(stream->sm_header_block, header_block + (size_t) nw, 3526 hblock_sz - (size_t) nw); 3527 stream->sm_hblock_sz = hblock_sz - (size_t) nw; 3528 stream->sm_hblock_off = 0; 3529 LSQ_DEBUG("stashed %u bytes of header block", stream->sm_hblock_sz); 3530 return 0; 3531} 3532 3533 3534static int 3535send_headers_gquic (struct lsquic_stream *stream, 3536 const struct lsquic_http_headers *headers, int eos) 3537{ 3538 int s = lsquic_headers_stream_send_headers(stream->conn_pub->u.gquic.hs, 3539 stream->id, headers, eos, lsquic_stream_priority(stream)); 3540 if (0 == s) 3541 { 3542 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 3543 stream->stream_flags |= STREAM_HEADERS_SENT; 3544 if (eos) 3545 stream->stream_flags |= STREAM_FIN_SENT; 3546 LSQ_INFO("sent headers"); 3547 } 3548 else 3549 LSQ_WARN("could not send headers: %s", strerror(errno)); 3550 return s; 3551} 3552 3553 3554int 3555lsquic_stream_send_headers (lsquic_stream_t *stream, 3556 const lsquic_http_headers_t *headers, int eos) 3557{ 3558 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3559 && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_U_WRITE_DONE))) 3560 { 3561 if 
(stream->sm_bflags & SMBF_IETF) 3562 return send_headers_ietf(stream, headers, eos); 3563 else 3564 return send_headers_gquic(stream, headers, eos); 3565 } 3566 else 3567 { 3568 LSQ_INFO("cannot send headers in this state"); 3569 errno = EBADMSG; 3570 return -1; 3571 } 3572} 3573 3574 3575void 3576lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 3577{ 3578 if (offset > stream->max_send_off) 3579 { 3580 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 3581 LSQ_DEBUG("update max send offset from 0x%"PRIX64" to " 3582 "0x%"PRIX64, stream->max_send_off, offset); 3583 stream->max_send_off = offset; 3584 } 3585 else 3586 LSQ_DEBUG("new offset 0x%"PRIX64" is not larger than old " 3587 "max send offset 0x%"PRIX64", ignoring", offset, 3588 stream->max_send_off); 3589} 3590 3591 3592/* This function is used to update offsets after handshake completes and we 3593 * learn of peer's limits from the handshake values. 3594 */ 3595int 3596lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset) 3597{ 3598 LSQ_DEBUG("setting max_send_off to %"PRIu64, offset); 3599 if (offset > stream->max_send_off) 3600 { 3601 lsquic_stream_window_update(stream, offset); 3602 return 0; 3603 } 3604 else if (offset < stream->tosend_off) 3605 { 3606 LSQ_INFO("new offset (%"PRIu64" bytes) is smaller than the amount of " 3607 "data already sent on this stream (%"PRIu64" bytes)", offset, 3608 stream->tosend_off); 3609 return -1; 3610 } 3611 else 3612 { 3613 stream->max_send_off = offset; 3614 return 0; 3615 } 3616} 3617 3618 3619void 3620lsquic_stream_reset (lsquic_stream_t *stream, uint64_t error_code) 3621{ 3622 lsquic_stream_reset_ext(stream, error_code, 1); 3623} 3624 3625 3626void 3627lsquic_stream_reset_ext (lsquic_stream_t *stream, uint64_t error_code, 3628 int do_close) 3629{ 3630 if ((stream->stream_flags & STREAM_RST_SENT) 3631 || (stream->sm_qflags & SMQF_SEND_RST)) 3632 { 3633 LSQ_INFO("reset already sent"); 3634 return; 3635 } 3636 3637 
SM_HISTORY_APPEND(stream, SHE_RESET); 3638 3639 LSQ_INFO("reset, error code %"PRIu64, error_code); 3640 stream->error_code = error_code; 3641 3642 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 3643 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 3644 next_send_stream); 3645 stream->sm_qflags &= ~SMQF_SENDING_FLAGS; 3646 stream->sm_qflags |= SMQF_SEND_RST; 3647 3648 if (stream->sm_qflags & SMQF_QPACK_DEC) 3649 { 3650 lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream); 3651 stream->sm_qflags &= ~SMQF_QPACK_DEC; 3652 } 3653 3654 drop_buffered_data(stream); 3655 maybe_elide_stream_frames(stream); 3656 maybe_schedule_call_on_close(stream); 3657 3658 if (do_close) 3659 lsquic_stream_close(stream); 3660 else 3661 maybe_conn_to_tickable_if_writeable(stream, 1); 3662} 3663 3664 3665lsquic_stream_id_t 3666lsquic_stream_id (const lsquic_stream_t *stream) 3667{ 3668 return stream->id; 3669} 3670 3671 3672#if !defined(NDEBUG) && __GNUC__ 3673__attribute__((weak)) 3674#endif 3675struct lsquic_conn * 3676lsquic_stream_conn (const lsquic_stream_t *stream) 3677{ 3678 return stream->conn_pub->lconn; 3679} 3680 3681 3682int 3683lsquic_stream_close (lsquic_stream_t *stream) 3684{ 3685 LSQ_DEBUG("lsquic_stream_close() called"); 3686 SM_HISTORY_APPEND(stream, SHE_CLOSE); 3687 if (lsquic_stream_is_closed(stream)) 3688 { 3689 LSQ_INFO("Attempt to close an already-closed stream"); 3690 errno = EBADF; 3691 return -1; 3692 } 3693 maybe_stream_shutdown_write(stream); 3694 stream_shutdown_read(stream); 3695 maybe_schedule_call_on_close(stream); 3696 maybe_finish_stream(stream); 3697 if (!(stream->stream_flags & STREAM_DELAYED_SW)) 3698 maybe_conn_to_tickable_if_writeable(stream, 1); 3699 return 0; 3700} 3701 3702 3703#ifndef NDEBUG 3704#if __GNUC__ 3705__attribute__((weak)) 3706#endif 3707#endif 3708void 3709lsquic_stream_acked (struct lsquic_stream *stream, 3710 enum quic_frame_type frame_type) 3711{ 3712 assert(stream->n_unacked); 3713 --stream->n_unacked; 3714 
LSQ_DEBUG("ACKed; n_unacked: %u", stream->n_unacked); 3715 if (frame_type == QUIC_FRAME_RST_STREAM) 3716 { 3717 SM_HISTORY_APPEND(stream, SHE_RST_ACKED); 3718 LSQ_DEBUG("RESET that we sent has been acked by peer"); 3719 stream->stream_flags |= STREAM_RST_ACKED; 3720 } 3721 if (0 == stream->n_unacked) 3722 maybe_finish_stream(stream); 3723} 3724 3725 3726void 3727lsquic_stream_push_req (lsquic_stream_t *stream, 3728 struct uncompressed_headers *push_req) 3729{ 3730 assert(!stream->push_req); 3731 stream->push_req = push_req; 3732 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 3733} 3734 3735 3736int 3737lsquic_stream_is_pushed (const lsquic_stream_t *stream) 3738{ 3739 enum stream_id_type sit; 3740 3741 if (stream->sm_bflags & SMBF_IETF) 3742 { 3743 sit = stream->id & SIT_MASK; 3744 return sit == SIT_UNI_SERVER; 3745 } 3746 else 3747 return 1 & ~stream->id; 3748} 3749 3750 3751int 3752lsquic_stream_push_info (const lsquic_stream_t *stream, 3753 lsquic_stream_id_t *ref_stream_id, void **hset) 3754{ 3755 if (lsquic_stream_is_pushed(stream)) 3756 { 3757 assert(stream->push_req); 3758 *ref_stream_id = stream->push_req->uh_stream_id; 3759 *hset = stream->push_req->uh_hset; 3760 return 0; 3761 } 3762 else 3763 return -1; 3764} 3765 3766 3767static int 3768stream_uh_in_gquic (struct lsquic_stream *stream, 3769 struct uncompressed_headers *uh) 3770{ 3771 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3772 && !(stream->stream_flags & STREAM_HAVE_UH)) 3773 { 3774 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 3775 LSQ_DEBUG("received uncompressed headers"); 3776 stream->stream_flags |= STREAM_HAVE_UH; 3777 if (uh->uh_flags & UH_FIN) 3778 { 3779 /* IETF QUIC only sets UH_FIN for a pushed stream on the server to 3780 * mark request as done: 3781 */ 3782 if (stream->sm_bflags & SMBF_IETF) 3783 assert((stream->sm_bflags & SMBF_SERVER) 3784 && lsquic_stream_is_pushed(stream)); 3785 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 3786 } 3787 stream->uh 
= uh; 3788 if (uh->uh_oth_stream_id == 0) 3789 { 3790 if (uh->uh_weight) 3791 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 3792 } 3793 else 3794 LSQ_NOTICE("don't know how to depend on stream %"PRIu64, 3795 uh->uh_oth_stream_id); 3796 return 0; 3797 } 3798 else 3799 { 3800 LSQ_ERROR("received unexpected uncompressed headers"); 3801 return -1; 3802 } 3803} 3804 3805 3806static int 3807stream_uh_in_ietf (struct lsquic_stream *stream, 3808 struct uncompressed_headers *uh) 3809{ 3810 int push_promise; 3811 3812 push_promise = lsquic_stream_header_is_pp(stream); 3813 if (!(stream->stream_flags & STREAM_HAVE_UH) && !push_promise) 3814 { 3815 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 3816 LSQ_DEBUG("received uncompressed headers"); 3817 stream->stream_flags |= STREAM_HAVE_UH; 3818 if (uh->uh_flags & UH_FIN) 3819 { 3820 /* IETF QUIC only sets UH_FIN for a pushed stream on the server to 3821 * mark request as done: 3822 */ 3823 if (stream->sm_bflags & SMBF_IETF) 3824 assert((stream->sm_bflags & SMBF_SERVER) 3825 && lsquic_stream_is_pushed(stream)); 3826 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 3827 } 3828 stream->uh = uh; 3829 if (uh->uh_oth_stream_id == 0) 3830 { 3831 if (uh->uh_weight) 3832 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 3833 } 3834 else 3835 LSQ_NOTICE("don't know how to depend on stream %"PRIu64, 3836 uh->uh_oth_stream_id); 3837 } 3838 else 3839 { 3840 /* Trailer should never make here, as we discard it in qdh */ 3841 LSQ_DEBUG("discard %s header set", 3842 push_promise ? 
"push promise" : "trailer"); 3843 if (uh->uh_hset) 3844 stream->conn_pub->enpub->enp_hsi_if 3845 ->hsi_discard_header_set(uh->uh_hset); 3846 free(uh); 3847 } 3848 3849 return 0; 3850} 3851 3852 3853int 3854lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 3855{ 3856 if (stream->sm_bflags & SMBF_USE_HEADERS) 3857 { 3858 if (stream->sm_bflags & SMBF_IETF) 3859 return stream_uh_in_ietf(stream, uh); 3860 else 3861 return stream_uh_in_gquic(stream, uh); 3862 } 3863 else 3864 return -1; 3865} 3866 3867 3868unsigned 3869lsquic_stream_priority (const lsquic_stream_t *stream) 3870{ 3871 return 256 - stream->sm_priority; 3872} 3873 3874 3875int 3876lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 3877{ 3878 /* The user should never get a reference to the special streams, 3879 * but let's check just in case: 3880 */ 3881 if (lsquic_stream_is_critical(stream)) 3882 return -1; 3883 if (priority < 1 || priority > 256) 3884 return -1; 3885 stream->sm_priority = 256 - priority; 3886 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 3887 LSQ_DEBUG("set priority to %u", priority); 3888 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 3889 return 0; 3890} 3891 3892 3893static int 3894maybe_send_priority_gquic (struct lsquic_stream *stream, unsigned priority) 3895{ 3896 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3897 && (stream->stream_flags & STREAM_HEADERS_SENT)) 3898 { 3899 /* We need to send headers only if we are a) using HEADERS stream 3900 * and b) we already sent initial headers. If initial headers 3901 * have not been sent yet, stream priority will be sent in the 3902 * HEADERS frame. 
3903 */ 3904 return lsquic_headers_stream_send_priority(stream->conn_pub->u.gquic.hs, 3905 stream->id, 0, 0, priority); 3906 } 3907 else 3908 return 0; 3909} 3910 3911 3912static int 3913send_priority_ietf (struct lsquic_stream *stream, unsigned priority) 3914{ 3915 LSQ_WARN("%s: TODO", __func__); /* TODO */ 3916 return -1; 3917} 3918 3919 3920int 3921lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 3922{ 3923 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 3924 { 3925 if (stream->sm_bflags & SMBF_IETF) 3926 return send_priority_ietf(stream, priority); 3927 else 3928 return maybe_send_priority_gquic(stream, priority); 3929 } 3930 else 3931 return -1; 3932} 3933 3934 3935lsquic_stream_ctx_t * 3936lsquic_stream_get_ctx (const lsquic_stream_t *stream) 3937{ 3938 fiu_return_on("stream/get_ctx", NULL); 3939 return stream->st_ctx; 3940} 3941 3942 3943int 3944lsquic_stream_refuse_push (lsquic_stream_t *stream) 3945{ 3946 if (lsquic_stream_is_pushed(stream) 3947 && !(stream->sm_qflags & SMQF_SEND_RST) 3948 && !(stream->stream_flags & STREAM_RST_SENT)) 3949 { 3950 LSQ_DEBUG("refusing pushed stream: send reset"); 3951 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 3952 return 0; 3953 } 3954 else 3955 return -1; 3956} 3957 3958 3959size_t 3960lsquic_stream_mem_used (const struct lsquic_stream *stream) 3961{ 3962 size_t size; 3963 3964 size = sizeof(stream); 3965 if (stream->sm_buf) 3966 size += stream->sm_n_allocated; 3967 if (stream->data_in) 3968 size += stream->data_in->di_if->di_mem_used(stream->data_in); 3969 3970 return size; 3971} 3972 3973 3974const lsquic_cid_t * 3975lsquic_stream_cid (const struct lsquic_stream *stream) 3976{ 3977 return LSQUIC_LOG_CONN_ID; 3978} 3979 3980 3981void 3982lsquic_stream_dump_state (const struct lsquic_stream *stream) 3983{ 3984 LSQ_DEBUG("flags: %X; read off: %"PRIu64, stream->stream_flags, 3985 stream->read_offset); 3986 stream->data_in->di_if->di_dump_state(stream->data_in); 
/* Record the type of the HTTP/3 frame described by `filter' in the filter's
 * frame-type history and check that the resulting sequence of frames is
 * one of the permitted ones.
 *
 * The history is a sequence of 3-bit codes packed into hqfi_hist_buf, the
 * most recent code occupying the low three bits.  Only HEADERS and DATA
 * frames are recorded.  A repeated frame type is recorded once as "plus"
 * and further repetitions do not change the history.
 *
 * Returns 0 if the frame is acceptable at this point, -1 otherwise.
 */
static int
update_type_hist_and_check (const struct lsquic_stream *stream,
                                                    struct hq_filter *filter)
{
    /* 3-bit codes: */
    enum {
        CODE_UNSET,
        CODE_HEADER,    /* H    Header  */
        CODE_DATA,      /* D    Data    */
        CODE_PLUS,      /* +    Plus: meaning previous frame repeats */
    };
    /* Octal literals: each octal digit is one 3-bit code, oldest first */
    static const unsigned valid_seqs[] = {
        /* Ordered by expected frequency */
        0123,   /* HD+  */
        012,    /* HD   */
        01,     /* H    */
        01231,  /* HD+H */
        0121,   /* HDH  */
    };
    unsigned code, i;

    switch (filter->hqfi_type)
    {
    case HQFT_HEADERS:
        code = CODE_HEADER;
        break;
    case HQFT_DATA:
        code = CODE_DATA;
        break;
    case HQFT_PUSH_PROMISE:
        /* [draft-ietf-quic-http-24], Section 7 */
        /* Accepted only on a client-initiated bidirectional stream when we
         * are not the server.  The history is left unchanged.
         */
        if ((stream->id & SIT_MASK) == SIT_BIDI_CLIENT
                                    && !(stream->sm_bflags & SMBF_SERVER))
            return 0;
        else
            return -1;
    case HQFT_CANCEL_PUSH:
    case HQFT_SETTINGS:
    case HQFT_GOAWAY:
    case HQFT_MAX_PUSH_ID:
        /* [draft-ietf-quic-http-24], Section 7 */
        return -1;
    default:
        /* Ignore unknown frames */
        return 0;
    }

    /* No room left to record another code */
    if (filter->hqfi_hist_idx >= MAX_HQFI_ENTRIES)
        return -1;

    if (filter->hqfi_hist_idx && (filter->hqfi_hist_buf & 7) == code)
    {
        /* Same type as the previous frame: record a "plus".  Note that
         * this also applies to a repeated HEADERS frame, producing "H+",
         * which is not in the table above and is therefore rejected below.
         */
        filter->hqfi_hist_buf <<= 3;
        filter->hqfi_hist_buf |= CODE_PLUS;
        filter->hqfi_hist_idx++;
    }
    else if (filter->hqfi_hist_idx > 1
            && ((filter->hqfi_hist_buf >> 3) & 7) == code
            && (filter->hqfi_hist_buf & 7) == CODE_PLUS)
        /* Keep it at plus, do nothing */;
    else
    {
        /* A different frame type: append its code */
        filter->hqfi_hist_buf <<= 3;
        filter->hqfi_hist_buf |= code;
        filter->hqfi_hist_idx++;
    }

    /* The whole history must match one of the valid sequences exactly */
    for (i = 0; i < sizeof(valid_seqs) / sizeof(valid_seqs[0]); ++i)
        if (filter->hqfi_hist_buf == valid_seqs[i])
            return 0;

    return -1;
}
/* Run `sz' bytes of incoming stream data through the HTTP/3 frame filter
 * whose state is kept in stream->sm_hq_filter.
 *
 * The function consumes frame headers, the Push ID of PUSH_PROMISE frames,
 * header blocks (which are handed to the QPACK decoder handler) and the
 * payload of frames it does not interpret.  It stops without consuming
 * anything further when it reaches the payload of a DATA frame, when the
 * QPACK decoder reports that it is blocked, or on error.
 *
 * `ctx' is the stream.  `fin' is non-zero if `buf' holds the last bytes of
 * the stream.  Returns the number of bytes of `buf' that were consumed.
 * Errors are reported by setting HQFI_FLAG_ERROR in the filter flags and
 * aborting the connection, not via the return value.
 */
static size_t
hq_read (void *ctx, const unsigned char *buf, size_t sz, int fin)
{
    struct lsquic_stream *const stream = ctx;
    struct hq_filter *const filter = &stream->sm_hq_filter;
    const unsigned char *p = buf, *prev;
    const unsigned char *const end = buf + sz;
    struct lsquic_conn *lconn;
    enum lsqpack_read_header_status rhs;
    int s;

    while (p < end)
    {
        switch (filter->hqfi_state)
        {
        case HQFI_STATE_FRAME_HEADER_BEGIN:
            /* Reset the two-varint reader before parsing a new header */
            filter->hqfi_vint2_state.vr2s_state = 0;
            filter->hqfi_state = HQFI_STATE_FRAME_HEADER_CONTINUE;
            /* fall-through */
        case HQFI_STATE_FRAME_HEADER_CONTINUE:
            /* NOTE(review): a negative return presumably means that more
             * input is needed to finish the frame header -- confirm against
             * lsquic_varint_read_two().
             */
            s = lsquic_varint_read_two(&p, end, &filter->hqfi_vint2_state);
            if (s < 0)
                break;
            /* HQFI_FLAG_BEGIN marks that no payload byte of this frame has
             * been processed yet.
             */
            filter->hqfi_flags |= HQFI_FLAG_BEGIN;
            filter->hqfi_state = HQFI_STATE_READING_PAYLOAD;
            LSQ_DEBUG("HQ frame type 0x%"PRIX64" at offset %"PRIu64", size %"PRIu64,
                filter->hqfi_type, stream->read_offset + (unsigned) (p - buf),
                filter->hqfi_left);
            if (0 != update_type_hist_and_check(stream, filter))
            {
                lconn = stream->conn_pub->lconn;
                filter->hqfi_flags |= HQFI_FLAG_ERROR;
                LSQ_INFO("unexpected HTTP/3 frame sequence: %o",
                    filter->hqfi_hist_buf);
                lconn->cn_if->ci_abort_error(lconn, 1, HEC_FRAME_UNEXPECTED,
                    "unexpected HTTP/3 frame sequence on stream %"PRIu64,
                    stream->id);
                goto end;
            }
            if (filter->hqfi_left > 0)
            {
                if (filter->hqfi_type == HQFT_DATA)
                {
                    if (stream->sm_bflags & SMBF_VERIFY_CL)
                        verify_cl_on_new_data_frame(stream, filter);
                    /* DATA payload is not consumed by this function */
                    goto end;
                }
                else if (filter->hqfi_type == HQFT_PUSH_PROMISE)
                {
                    if (stream->sm_bflags & SMBF_SERVER)
                    {
                        lconn = stream->conn_pub->lconn;
                        lconn->cn_if->ci_abort_error(lconn, 1,
                            HEC_FRAME_UNEXPECTED, "Received PUSH_PROMISE frame "
                            "on stream %"PRIu64" (clients are not supposed to "
                            "send those)", stream->id);
                        goto end;
                    }
                    else
                        /* The Push ID precedes the header block */
                        filter->hqfi_state = HQFI_STATE_PUSH_ID_BEGIN;
                }
            }
            else
            {
                /* Zero-length frame */
                switch (filter->hqfi_type)
                {
                case HQFT_CANCEL_PUSH:
                case HQFT_GOAWAY:
                case HQFT_HEADERS:
                case HQFT_MAX_PUSH_ID:
                case HQFT_PUSH_PROMISE:
                case HQFT_SETTINGS:
                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
                    LSQ_INFO("HQ frame of type %"PRIu64" cannot be size 0",
                                                            filter->hqfi_type);
                    abort_connection(stream);   /* XXX Overkill? */
                    goto end;
                default:
                    /* Nothing to read: go straight to the next frame */
                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
                    break;
                }
            }
            break;
        case HQFI_STATE_PUSH_ID_BEGIN:
            filter->hqfi_vint1_state.pos = 0;
            filter->hqfi_state = HQFI_STATE_PUSH_ID_CONTINUE;
            /* Fall-through */
        case HQFI_STATE_PUSH_ID_CONTINUE:
            prev = p;
            s = lsquic_varint_read_nb(&p, end, &filter->hqfi_vint1_state);
            /* The Push ID bytes count against the frame's payload size */
            filter->hqfi_left -= p - prev;
            if (s == 0)
                filter->hqfi_state = HQFI_STATE_READING_PAYLOAD;
                /* A bit of a white lie here */
            break;
        case HQFI_STATE_READING_PAYLOAD:
            if (filter->hqfi_type == HQFT_DATA)
                goto end;
            /* From here on, `sz' is reused to hold the number of payload
             * bytes of the current frame that are available in this buffer.
             */
            sz = filter->hqfi_left;
            if (sz > (uintptr_t) (end - p))
                sz = (uintptr_t) (end - p);
            switch (filter->hqfi_type)
            {
            case HQFT_HEADERS:
            case HQFT_PUSH_PROMISE:
                prev = p;
                if (filter->hqfi_flags & HQFI_FLAG_BEGIN)
                {
                    /* First chunk of the header block */
                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
                    rhs = lsquic_qdh_header_in_begin(
                                stream->conn_pub->u.ietf.qdh,
                                stream, filter->hqfi_left, &p, sz);
                }
                else
                    rhs = lsquic_qdh_header_in_continue(
                                stream->conn_pub->u.ietf.qdh, stream, &p, sz);
                assert(p > prev || LQRHS_ERROR == rhs);
                filter->hqfi_left -= p - prev;
                if (filter->hqfi_left == 0)
                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
                /* SMQF_QPACK_DEC is set while the decoder still holds state
                 * for this stream (see lsquic_stream_reset_ext(), which
                 * cancels the stream in the decoder when the flag is set).
                 */
                switch (rhs)
                {
                case LQRHS_DONE:
                    assert(filter->hqfi_left == 0);
                    stream->sm_qflags &= ~SMQF_QPACK_DEC;
                    break;
                case LQRHS_NEED:
                    stream->sm_qflags |= SMQF_QPACK_DEC;
                    break;
                case LQRHS_BLOCKED:
                    stream->sm_qflags |= SMQF_QPACK_DEC;
                    filter->hqfi_flags |= HQFI_FLAG_BLOCKED;
                    goto end;
                default:
                    assert(LQRHS_ERROR == rhs);
                    stream->sm_qflags &= ~SMQF_QPACK_DEC;
                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
                    LSQ_INFO("error processing header block");
                    abort_connection(stream);   /* XXX Overkill? */
                    goto end;
                }
                break;
            default:
                /* Simply skip unknown frame type payload for now */
                filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
                p += sz;
                filter->hqfi_left -= sz;
                if (filter->hqfi_left == 0)
                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
                break;
            }
            break;
        default:
            assert(0);
            goto end;
        }
    }

  end:
    /* All input consumed, the stream ends here, and we are in the middle
     * of a frame:
     */
    if (fin && p == end && filter->hqfi_state != HQFI_STATE_FRAME_HEADER_BEGIN)
    {
        LSQ_INFO("FIN at unexpected place in filter; state: %u",
                                                        filter->hqfi_state);
        filter->hqfi_flags |= HQFI_FLAG_ERROR;
/* From [draft-ietf-quic-http-16] Section 3.1:
 *               When a stream terminates cleanly, if the last frame on
 * the stream was truncated, this MUST be treated as a connection error
 * (see HTTP_MALFORMED_FRAME in Section 8.1).
 */
        abort_connection(stream);
    }

    return p - buf;
}
data_frame->df_read_off); 4403 else 4404 { 4405 assert(data_frame->df_read_off == data_frame->df_size); 4406 return 0; 4407 } 4408 } 4409 else 4410 { 4411 data_frame->df_read_off = data_frame->df_size; 4412 return 0; 4413 } 4414} 4415 4416 4417static void 4418hq_decr_left (struct lsquic_stream *stream, size_t read) 4419{ 4420 struct hq_filter *const filter = &stream->sm_hq_filter; 4421 4422 if (read) 4423 { 4424 assert(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4425 && filter->hqfi_type == HQFT_DATA); 4426 assert(read <= filter->hqfi_left); 4427 } 4428 4429 filter->hqfi_left -= read; 4430 if (0 == filter->hqfi_left) 4431 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4432} 4433 4434 4435struct qpack_dec_hdl * 4436lsquic_stream_get_qdh (const struct lsquic_stream *stream) 4437{ 4438 return stream->conn_pub->u.ietf.qdh; 4439} 4440 4441 4442/* These are IETF QUIC states */ 4443enum stream_state_sending 4444lsquic_stream_sending_state (const struct lsquic_stream *stream) 4445{ 4446 if (0 == (stream->stream_flags & STREAM_RST_SENT)) 4447 { 4448 if (stream->stream_flags & STREAM_FIN_SENT) 4449 { 4450 if (stream->n_unacked) 4451 return SSS_DATA_SENT; 4452 else 4453 return SSS_DATA_RECVD; 4454 } 4455 else 4456 { 4457 if (stream->tosend_off 4458 || (stream->stream_flags & STREAM_BLOCKED_SENT)) 4459 return SSS_SEND; 4460 else 4461 return SSS_READY; 4462 } 4463 } 4464 else if (stream->stream_flags & STREAM_RST_ACKED) 4465 return SSS_RESET_RECVD; 4466 else 4467 return SSS_RESET_SENT; 4468} 4469 4470 4471const char *const lsquic_sss2str[] = 4472{ 4473 [SSS_READY] = "Ready", 4474 [SSS_SEND] = "Send", 4475 [SSS_DATA_SENT] = "Data Sent", 4476 [SSS_RESET_SENT] = "Reset Sent", 4477 [SSS_DATA_RECVD] = "Data Recvd", 4478 [SSS_RESET_RECVD] = "Reset Recvd", 4479}; 4480 4481 4482const char *const lsquic_ssr2str[] = 4483{ 4484 [SSR_RECV] = "Recv", 4485 [SSR_SIZE_KNOWN] = "Size Known", 4486 [SSR_DATA_RECVD] = "Data Recvd", 4487 [SSR_RESET_RECVD] = "Reset Recvd", 4488 
[SSR_DATA_READ] = "Data Read", 4489 [SSR_RESET_READ] = "Reset Read", 4490}; 4491 4492 4493/* These are IETF QUIC states */ 4494enum stream_state_receiving 4495lsquic_stream_receiving_state (struct lsquic_stream *stream) 4496{ 4497 uint64_t n_bytes; 4498 4499 if (0 == (stream->stream_flags & STREAM_RST_RECVD)) 4500 { 4501 if (0 == (stream->stream_flags & STREAM_FIN_RECVD)) 4502 return SSR_RECV; 4503 if (stream->stream_flags & STREAM_FIN_REACHED) 4504 return SSR_DATA_READ; 4505 if (0 == (stream->stream_flags & STREAM_DATA_RECVD)) 4506 { 4507 n_bytes = stream->data_in->di_if->di_readable_bytes( 4508 stream->data_in, stream->read_offset); 4509 if (stream->read_offset + n_bytes == stream->sm_fin_off) 4510 { 4511 stream->stream_flags |= STREAM_DATA_RECVD; 4512 return SSR_DATA_RECVD; 4513 } 4514 else 4515 return SSR_SIZE_KNOWN; 4516 } 4517 else 4518 return SSR_DATA_RECVD; 4519 } 4520 else if (stream->stream_flags & STREAM_RST_READ) 4521 return SSR_RESET_READ; 4522 else 4523 return SSR_RESET_RECVD; 4524} 4525 4526 4527void 4528lsquic_stream_qdec_unblocked (struct lsquic_stream *stream) 4529{ 4530 struct hq_filter *const filter = &stream->sm_hq_filter; 4531 4532 assert(stream->sm_qflags & SMQF_QPACK_DEC); 4533 assert(filter->hqfi_flags & HQFI_FLAG_BLOCKED); 4534 4535 filter->hqfi_flags &= ~HQFI_FLAG_BLOCKED; 4536 stream->conn_pub->cp_flags |= CP_STREAM_UNBLOCKED; 4537 LSQ_DEBUG("QPACK decoder unblocked"); 4538} 4539 4540 4541int 4542lsquic_stream_is_rejected (const struct lsquic_stream *stream) 4543{ 4544 return stream->stream_flags & STREAM_SS_RECVD; 4545} 4546 4547 4548int 4549lsquic_stream_can_push (const struct lsquic_stream *stream) 4550{ 4551 if (lsquic_stream_is_pushed(stream)) 4552 return 0; 4553 else if (stream->sm_bflags & SMBF_IETF) 4554 return (stream->sm_bflags & SMBF_USE_HEADERS) 4555 && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_NOPUSH)) 4556 && stream->sm_send_headers_state == SSHS_BEGIN 4557 ; 4558 else 4559 return 1; 4560} 4561 4562 4563static 
size_t 4564pp_reader_read (void *lsqr_ctx, void *buf, size_t count) 4565{ 4566 struct push_promise *const promise = lsqr_ctx; 4567 unsigned char *dst = buf; 4568 unsigned char *const end = buf + count; 4569 size_t len; 4570 4571 while (dst < end) 4572 { 4573 switch (promise->pp_write_state) 4574 { 4575 case PPWS_ID0: 4576 case PPWS_ID1: 4577 case PPWS_ID2: 4578 case PPWS_ID3: 4579 case PPWS_ID4: 4580 case PPWS_ID5: 4581 case PPWS_ID6: 4582 case PPWS_ID7: 4583 *dst++ = promise->pp_encoded_push_id[promise->pp_write_state]; 4584 ++promise->pp_write_state; 4585 break; 4586 case PPWS_PFX0: 4587 *dst++ = 0; 4588 ++promise->pp_write_state; 4589 break; 4590 case PPWS_PFX1: 4591 *dst++ = 0; 4592 ++promise->pp_write_state; 4593 break; 4594 case PPWS_HBLOCK: 4595 len = MIN(promise->pp_content_len - promise->pp_write_off, 4596 (size_t) (end - dst)); 4597 memcpy(dst, promise->pp_content_buf + promise->pp_write_off, 4598 len); 4599 promise->pp_write_off += len; 4600 dst += len; 4601 if (promise->pp_content_len == promise->pp_write_off) 4602 { 4603 LSQ_LOG1(LSQ_LOG_DEBUG, "finish writing push promise %"PRIu64 4604 ": reset push state", promise->pp_id); 4605 promise->pp_write_state = PPWS_DONE; 4606 } 4607 goto end; 4608 default: 4609 goto end; 4610 } 4611 } 4612 4613 end: 4614 return dst - (unsigned char *) buf; 4615} 4616 4617 4618static size_t 4619pp_reader_size (void *lsqr_ctx) 4620{ 4621 struct push_promise *const promise = lsqr_ctx; 4622 size_t size; 4623 4624 size = 0; 4625 switch (promise->pp_write_state) 4626 { 4627 case PPWS_ID0: 4628 case PPWS_ID1: 4629 case PPWS_ID2: 4630 case PPWS_ID3: 4631 case PPWS_ID4: 4632 case PPWS_ID5: 4633 case PPWS_ID6: 4634 case PPWS_ID7: 4635 size += 8 - promise->pp_write_state; 4636 /* fall-through */ 4637 case PPWS_PFX0: 4638 ++size; 4639 /* fall-through */ 4640 case PPWS_PFX1: 4641 ++size; 4642 /* fall-through */ 4643 case PPWS_HBLOCK: 4644 size += promise->pp_content_len - promise->pp_write_off; 4645 break; 4646 default: 4647 break; 4648 
} 4649 4650 return size; 4651} 4652 4653 4654static void 4655init_pp_reader (struct push_promise *promise, struct lsquic_reader *reader) 4656{ 4657 reader->lsqr_read = pp_reader_read; 4658 reader->lsqr_size = pp_reader_size; 4659 reader->lsqr_ctx = promise; 4660} 4661 4662 4663static void 4664on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 4665{ 4666 struct lsquic_reader pp_reader; 4667 struct push_promise *promise; 4668 ssize_t nw; 4669 int want_write; 4670 4671 assert(stream_is_pushing_promise(stream)); 4672 4673 promise = SLIST_FIRST(&stream->sm_promises); 4674 init_pp_reader(promise, &pp_reader); 4675 nw = stream_write(stream, &pp_reader); 4676 if (nw > 0) 4677 { 4678 LSQ_DEBUG("wrote %zd bytes more of push promise (%s)", 4679 nw, promise->pp_write_state == PPWS_DONE ? "done" : "not done"); 4680 if (promise->pp_write_state == PPWS_DONE) 4681 { 4682 /* Restore want_write flag */ 4683 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 4684 if (want_write != stream->sm_saved_want_write) 4685 (void) lsquic_stream_wantwrite(stream, 4686 stream->sm_saved_want_write); 4687 } 4688 } 4689 else if (nw < 0) 4690 { 4691 LSQ_WARN("could not write push promise (wrapper)"); 4692 /* XXX What should happen if we hit an error? TODO */ 4693 } 4694} 4695 4696 4697/* Success means that the push promise has been placed on sm_promises list and 4698 * the stream now owns it. Failure means that the push promise should be 4699 * destroyed by the caller. 4700 * 4701 * A push promise is written immediately. If it cannot be written to packets 4702 * or buffered whole, the stream is marked as unable to push further promises. 
4703 */ 4704int 4705lsquic_stream_push_promise (struct lsquic_stream *stream, 4706 struct push_promise *promise) 4707{ 4708 struct lsquic_reader pp_reader; 4709 unsigned bits, len; 4710 ssize_t nw; 4711 4712 assert(stream->sm_bflags & SMBF_IETF); 4713 assert(lsquic_stream_can_push(stream)); 4714 4715 bits = vint_val2bits(promise->pp_id); 4716 len = 1 << bits; 4717 promise->pp_write_state = 8 - len; 4718 vint_write(promise->pp_encoded_push_id + 8 - len, promise->pp_id, 4719 bits, 1 << bits); 4720 4721 if (!stream_activate_hq_frame(stream, 4722 stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PROMISE, 4723 SHF_FIXED_SIZE, pp_reader_size(promise))) 4724 return -1; 4725 4726 stream->stream_flags |= STREAM_PUSHING; 4727 4728 init_pp_reader(promise, &pp_reader); 4729 nw = stream_write(stream, &pp_reader); 4730 if (nw > 0) 4731 { 4732 SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next); 4733 ++promise->pp_refcnt; 4734 if (promise->pp_write_state == PPWS_DONE) 4735 LSQ_DEBUG("fully wrote promise %"PRIu64, promise->pp_id); 4736 else 4737 { 4738 LSQ_DEBUG("partially wrote promise %"PRIu64" (state: %d, off: %u)" 4739 ", disable further pushing", promise->pp_id, 4740 promise->pp_write_state, promise->pp_write_off); 4741 stream->stream_flags |= STREAM_NOPUSH; 4742 stream->sm_saved_want_write = 4743 !!(stream->sm_qflags & SMQF_WANT_WRITE); 4744 stream_wantwrite(stream, 1); 4745 } 4746 return 0; 4747 } 4748 else 4749 { 4750 if (nw < 0) 4751 LSQ_WARN("failure writing push promise"); 4752 stream->stream_flags |= STREAM_NOPUSH; 4753 stream->stream_flags &= ~STREAM_PUSHING; 4754 return -1; 4755 } 4756} 4757 4758 4759int 4760lsquic_stream_verify_len (struct lsquic_stream *stream, 4761 unsigned long long cont_len) 4762{ 4763 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 4764 == (SMBF_IETF|SMBF_USE_HEADERS)) 4765 { 4766 stream->sm_cont_len = cont_len; 4767 stream->sm_bflags |= SMBF_VERIFY_CL; 4768 LSQ_DEBUG("will verify that incoming DATA frames have %llu bytes", 
4769 cont_len); 4770 return 0; 4771 } 4772 else 4773 return -1; 4774} 4775