lsquic_stream.c revision 292abba1
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdarg.h> 10#include <stdlib.h> 11#include <string.h> 12#include <sys/queue.h> 13#include <stddef.h> 14 15#ifdef WIN32 16#include <malloc.h> 17#endif 18 19#include "fiu-local.h" 20 21#include "lsquic.h" 22 23#include "lsquic_int_types.h" 24#include "lsquic_packet_common.h" 25#include "lsquic_packet_in.h" 26#include "lsquic_malo.h" 27#include "lsquic_conn_flow.h" 28#include "lsquic_rtt.h" 29#include "lsquic_sfcw.h" 30#include "lsquic_varint.h" 31#include "lsquic_hq.h" 32#include "lsquic_hash.h" 33#include "lsquic_stream.h" 34#include "lsquic_conn_public.h" 35#include "lsquic_util.h" 36#include "lsquic_mm.h" 37#include "lsquic_headers_stream.h" 38#include "lsquic_conn.h" 39#include "lsquic_data_in_if.h" 40#include "lsquic_parse.h" 41#include "lsquic_packet_in.h" 42#include "lsquic_packet_out.h" 43#include "lsquic_engine_public.h" 44#include "lsquic_senhist.h" 45#include "lsquic_pacer.h" 46#include "lsquic_cubic.h" 47#include "lsquic_bw_sampler.h" 48#include "lsquic_minmax.h" 49#include "lsquic_bbr.h" 50#include "lsquic_adaptive_cc.h" 51#include "lsquic_send_ctl.h" 52#include "lsquic_headers.h" 53#include "lsquic_ev_log.h" 54#include "lsquic_enc_sess.h" 55#include "lsqpack.h" 56#include "lsquic_frab_list.h" 57#include "lsquic_http1x_if.h" 58#include "lsquic_qdec_hdl.h" 59#include "lsquic_qenc_hdl.h" 60#include "lsquic_byteswap.h" 61#include "lsquic_ietf.h" 62#include "lsquic_push_promise.h" 63#include "lsquic_hcso_writer.h" 64 65#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 66#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(stream->conn_pub->lconn) 67#define LSQUIC_LOG_STREAM_ID stream->id 68#include "lsquic_logger.h" 69 70#define MIN(a, b) ((a) < (b) ? 
(a) : (b)) 71 72static void 73drop_frames_in (lsquic_stream_t *stream); 74 75static void 76maybe_schedule_call_on_close (lsquic_stream_t *stream); 77 78static int 79stream_wantread (lsquic_stream_t *stream, int is_want); 80 81static int 82stream_wantwrite (lsquic_stream_t *stream, int is_want); 83 84enum stream_write_options 85{ 86 SWO_BUFFER = 1 << 0, /* Allow buffering in sm_buf */ 87}; 88 89 90static ssize_t 91stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t, 92 enum stream_write_options); 93 94static ssize_t 95save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 96 97static int 98stream_flush (lsquic_stream_t *stream); 99 100static int 101stream_flush_nocheck (lsquic_stream_t *stream); 102 103static void 104maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag); 105 106enum swtp_status { SWTP_OK, SWTP_STOP, SWTP_ERROR }; 107 108static enum swtp_status 109stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size); 110 111static enum swtp_status 112stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size); 113 114static enum swtp_status 115stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size); 116 117static size_t 118stream_write_avail_no_frames (struct lsquic_stream *); 119 120static size_t 121stream_write_avail_with_frames (struct lsquic_stream *); 122 123static size_t 124stream_write_avail_with_headers (struct lsquic_stream *); 125 126static int 127hq_filter_readable (struct lsquic_stream *stream); 128 129static void 130hq_decr_left (struct lsquic_stream *stream, size_t); 131 132static size_t 133hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame); 134 135static int 136stream_readable_non_http (struct lsquic_stream *stream); 137 138static int 139stream_readable_http_gquic (struct lsquic_stream *stream); 140 141static int 142stream_readable_http_ietf (struct lsquic_stream *stream); 143 144static ssize_t 
145stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz); 146 147static size_t 148active_hq_frame_sizes (const struct lsquic_stream *); 149 150static void 151on_write_pp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *); 152 153static void 154stream_hq_frame_put (struct lsquic_stream *, struct stream_hq_frame *); 155 156static size_t 157stream_hq_frame_size (const struct stream_hq_frame *); 158 159const struct stream_filter_if hq_stream_filter_if = 160{ 161 .sfi_readable = hq_filter_readable, 162 .sfi_filter_df = hq_filter_df, 163 .sfi_decr_left = hq_decr_left, 164}; 165 166 167#if LSQUIC_KEEP_STREAM_HISTORY 168/* These values are printable ASCII characters for ease of printing the 169 * whole history in a single line of a log message. 170 * 171 * The list of events is not exhaustive: only most interesting events 172 * are recorded. 173 */ 174enum stream_history_event 175{ 176 SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ 177 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 178 SHE_REACH_FIN = 'a', 179 SHE_EARLY_READ_STOP = 'A', 180 SHE_BLOCKED_OUT = 'b', 181 SHE_CREATED = 'C', 182 SHE_FRAME_IN = 'd', 183 SHE_FRAME_OUT = 'D', 184 SHE_RESET = 'e', 185 SHE_WINDOW_UPDATE = 'E', 186 SHE_FIN_IN = 'f', 187 SHE_FINISHED = 'F', 188 SHE_GOAWAY_IN = 'g', 189 SHE_USER_WRITE_HEADER = 'h', 190 SHE_HEADERS_IN = 'H', 191 SHE_IF_SWITCH = 'i', 192 SHE_ONCLOSE_SCHED = 'l', 193 SHE_ONCLOSE_CALL = 'L', 194 SHE_ONNEW = 'N', 195 SHE_SET_PRIO = 'p', 196 SHE_SHORT_WRITE = 'q', 197 SHE_USER_READ = 'r', 198 SHE_SHUTDOWN_READ = 'R', 199 SHE_RST_IN = 's', 200 SHE_STOP_SENDIG_IN = 'S', 201 SHE_RST_OUT = 't', 202 SHE_RST_ACKED = 'T', 203 SHE_FLUSH = 'u', 204 SHE_STOP_SENDIG_OUT = 'U', 205 SHE_USER_WRITE_DATA = 'w', 206 SHE_SHUTDOWN_WRITE = 'W', 207 SHE_CLOSE = 'X', 208 SHE_DELAY_SW = 'y', 209 SHE_FORCE_FINISH = 'Z', 210 SHE_WANTREAD_NO = '0', /* "YES" must be one more than "NO" */ 211 SHE_WANTREAD_YES = '1', 212 
SHE_WANTWRITE_NO = '2', 213 SHE_WANTWRITE_YES = '3', 214}; 215 216static void 217sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 218{ 219 enum stream_history_event prev_event; 220 sm_hist_idx_t idx; 221 int plus; 222 223 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 224 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 225 idx = (idx - plus) & SM_HIST_IDX_MASK; 226 prev_event = stream->sm_hist_buf[idx]; 227 228 if (prev_event == sh_event && plus) 229 return; 230 231 if (prev_event == sh_event) 232 sh_event = SHE_PLUS; 233 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 234 235 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 236 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 237 stream->sm_hist_buf); 238} 239 240 241# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 242# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 243 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 244 LSQ_DEBUG("history: [%.*s]", \ 245 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 246 (stream)->sm_hist_buf); \ 247 } while (0) 248#else 249# define SM_HISTORY_APPEND(stream, event) 250# define SM_HISTORY_DUMP_REMAINING(stream) 251#endif 252 253 254static int 255stream_inside_callback (const lsquic_stream_t *stream) 256{ 257 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 258} 259 260 261/* This is an approximation. If data is written or read outside of the 262 * event loop, last_prog will be somewhat out of date, but it's close 263 * enough for our purposes. 
264 */ 265static void 266maybe_update_last_progress (struct lsquic_stream *stream) 267{ 268 if (stream->conn_pub && !lsquic_stream_is_critical(stream)) 269 { 270 if (stream->conn_pub->last_prog != stream->conn_pub->last_tick) 271 LSQ_DEBUG("update last progress to %"PRIu64, 272 stream->conn_pub->last_tick); 273 stream->conn_pub->last_prog = stream->conn_pub->last_tick; 274#ifndef NDEBUG 275 stream->sm_last_prog = stream->conn_pub->last_tick; 276#endif 277 } 278} 279 280 281static void 282maybe_conn_to_tickable (lsquic_stream_t *stream) 283{ 284 if (!stream_inside_callback(stream)) 285 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 286 stream->conn_pub->lconn); 287} 288 289 290/* Here, "readable" means that the user is able to read from the stream. */ 291static void 292maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 293{ 294 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 295 { 296 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 297 stream->conn_pub->lconn); 298 } 299} 300 301 302/* Here, "writeable" means that data can be put into packets to be 303 * scheduled to be sent out. 304 * 305 * If `check_can_send' is false, it means that we do not need to check 306 * whether packets can be sent. This check was already performed when 307 * we packetized stream data. 308 */ 309static void 310maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream, 311 int check_can_send) 312{ 313 if (!stream_inside_callback(stream) && 314 (!check_can_send 315 || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) && 316 ! 
lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 317 { 318 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 319 stream->conn_pub->lconn); 320 } 321} 322 323 324static int 325stream_stalled (const lsquic_stream_t *stream) 326{ 327 return 0 == (stream->sm_qflags & (SMQF_WANT_WRITE|SMQF_WANT_READ)) && 328 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 329 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 330} 331 332 333static size_t 334stream_stream_frame_header_sz (const struct lsquic_stream *stream, 335 unsigned data_sz) 336{ 337 return stream->conn_pub->lconn->cn_pf->pf_calc_stream_frame_header_sz( 338 stream->id, stream->tosend_off, data_sz); 339} 340 341 342static size_t 343stream_crypto_frame_header_sz (const struct lsquic_stream *stream, 344 unsigned data_sz) 345{ 346 return stream->conn_pub->lconn->cn_pf 347 ->pf_calc_crypto_frame_header_sz(stream->tosend_off, data_sz); 348} 349 350 351/* GQUIC-only function */ 352static int 353stream_is_hsk (const struct lsquic_stream *stream) 354{ 355 if (stream->sm_bflags & SMBF_IETF) 356 return 0; 357 else 358 return lsquic_stream_is_crypto(stream); 359} 360 361 362/* This function's only job is to change the allocated packet's header 363 * type to HETY_0RTT when stream frames are written before handshake 364 * is complete. 365 */ 366static struct lsquic_packet_out * 367stream_get_packet_for_stream_0rtt (struct lsquic_send_ctl *ctl, 368 unsigned need_at_least, const struct network_path *path, 369 const struct lsquic_stream *stream) 370{ 371 struct lsquic_packet_out *packet_out; 372 373 if (stream->conn_pub->lconn->cn_flags & LSCONN_HANDSHAKE_DONE) 374 { 375 LSQ_DEBUG("switch to regular \"get packet for stream\" function"); 376 /* Here we drop the "const" because this is a static function. 377 * Otherwise, we would not condone such sorcery. 
378 */ 379 ((struct lsquic_stream *) stream)->sm_get_packet_for_stream 380 = lsquic_send_ctl_get_packet_for_stream; 381 return lsquic_send_ctl_get_packet_for_stream(ctl, need_at_least, 382 path, stream); 383 } 384 else 385 { 386 packet_out = lsquic_send_ctl_get_packet_for_stream(ctl, need_at_least, 387 path, stream); 388 if (packet_out) 389 packet_out->po_header_type = HETY_0RTT; 390 return packet_out; 391 } 392} 393 394 395static struct lsquic_stream * 396stream_new_common (lsquic_stream_id_t id, struct lsquic_conn_public *conn_pub, 397 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 398 enum stream_ctor_flags ctor_flags) 399{ 400 struct lsquic_stream *stream; 401 402 stream = calloc(1, sizeof(*stream)); 403 if (!stream) 404 return NULL; 405 406 if (ctor_flags & SCF_USE_DI_HASH) 407 stream->data_in = lsquic_data_in_hash_new(conn_pub, id, 0); 408 else 409 stream->data_in = lsquic_data_in_nocopy_new(conn_pub, id); 410 if (!stream->data_in) 411 { 412 free(stream); 413 return NULL; 414 } 415 416 stream->id = id; 417 stream->stream_if = stream_if; 418 stream->conn_pub = conn_pub; 419 stream->sm_onnew_arg = stream_if_ctx; 420 stream->sm_write_avail = stream_write_avail_no_frames; 421 422 STAILQ_INIT(&stream->sm_hq_frames); 423 424 stream->sm_bflags |= ctor_flags & ((1 << N_SMBF_FLAGS) - 1); 425 if (conn_pub->lconn->cn_flags & LSCONN_SERVER) 426 stream->sm_bflags |= SMBF_SERVER; 427 stream->sm_get_packet_for_stream = lsquic_send_ctl_get_packet_for_stream; 428 429 return stream; 430} 431 432 433lsquic_stream_t * 434lsquic_stream_new (lsquic_stream_id_t id, 435 struct lsquic_conn_public *conn_pub, 436 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 437 unsigned initial_window, uint64_t initial_send_off, 438 enum stream_ctor_flags ctor_flags) 439{ 440 lsquic_cfcw_t *cfcw; 441 lsquic_stream_t *stream; 442 443 stream = stream_new_common(id, conn_pub, stream_if, stream_if_ctx, 444 ctor_flags); 445 if (!stream) 446 return NULL; 447 448 if 
(!initial_window) 449 initial_window = 16 * 1024; 450 451 if (ctor_flags & SCF_IETF) 452 { 453 cfcw = &conn_pub->cfcw; 454 stream->sm_bflags |= SMBF_CONN_LIMITED; 455 if (ctor_flags & SCF_HTTP) 456 { 457 stream->sm_write_avail = stream_write_avail_with_headers; 458 stream->sm_readable = stream_readable_http_ietf; 459 stream->sm_sfi = &hq_stream_filter_if; 460 } 461 else 462 stream->sm_readable = stream_readable_non_http; 463 if ((ctor_flags & (SCF_HTTP|SCF_HTTP_PRIO)) 464 == (SCF_HTTP|SCF_HTTP_PRIO)) 465 lsquic_stream_set_priority_internal(stream, LSQUIC_DEF_HTTP_URGENCY); 466 else 467 lsquic_stream_set_priority_internal(stream, 468 LSQUIC_STREAM_DEFAULT_PRIO); 469 stream->sm_write_to_packet = stream_write_to_packet_std; 470 stream->sm_frame_header_sz = stream_stream_frame_header_sz; 471 } 472 else 473 { 474 if (ctor_flags & SCF_CRITICAL) 475 cfcw = NULL; 476 else 477 { 478 cfcw = &conn_pub->cfcw; 479 stream->sm_bflags |= SMBF_CONN_LIMITED; 480 lsquic_stream_set_priority_internal(stream, 481 LSQUIC_STREAM_DEFAULT_PRIO); 482 } 483 if (stream->sm_bflags & SMBF_USE_HEADERS) 484 stream->sm_readable = stream_readable_http_gquic; 485 else 486 stream->sm_readable = stream_readable_non_http; 487 if (ctor_flags & SCF_CRYPTO_FRAMES) 488 { 489 stream->sm_frame_header_sz = stream_crypto_frame_header_sz; 490 stream->sm_write_to_packet = stream_write_to_packet_crypto; 491 } 492 else 493 { 494 if (stream_is_hsk(stream)) 495 stream->sm_write_to_packet = stream_write_to_packet_hsk; 496 else 497 stream->sm_write_to_packet = stream_write_to_packet_std; 498 stream->sm_frame_header_sz = stream_stream_frame_header_sz; 499 } 500 } 501 502 if ((stream->sm_bflags & (SMBF_SERVER|SMBF_IETF)) == SMBF_IETF 503 && !(conn_pub->lconn->cn_flags & LSCONN_HANDSHAKE_DONE)) 504 { 505 LSQ_DEBUG("use wrapper \"get packet for stream\" function"); 506 stream->sm_get_packet_for_stream = stream_get_packet_for_stream_0rtt; 507 } 508 509 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 510 
stream->max_send_off = initial_send_off; 511 LSQ_DEBUG("created stream"); 512 SM_HISTORY_APPEND(stream, SHE_CREATED); 513 if (ctor_flags & SCF_CALL_ON_NEW) 514 lsquic_stream_call_on_new(stream); 515 return stream; 516} 517 518 519struct lsquic_stream * 520lsquic_stream_new_crypto (enum enc_level enc_level, 521 struct lsquic_conn_public *conn_pub, 522 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 523 enum stream_ctor_flags ctor_flags) 524{ 525 struct lsquic_stream *stream; 526 lsquic_stream_id_t stream_id; 527 528 assert(ctor_flags & SCF_CRITICAL); 529 530 fiu_return_on("stream/new_crypto", NULL); 531 532 stream_id = ~0ULL - enc_level; 533 stream = stream_new_common(stream_id, conn_pub, stream_if, 534 stream_if_ctx, ctor_flags); 535 if (!stream) 536 return NULL; 537 538 stream->sm_bflags |= SMBF_CRYPTO|SMBF_IETF; 539 stream->sm_enc_level = enc_level; 540 /* We allow buffering of up to 16 KB of CRYPTO data (I guess we could 541 * make this configurable?). The window is opened (without sending 542 * MAX_STREAM_DATA) as CRYPTO data is consumed. If too much comes in 543 * at a time, we abort with TEC_CRYPTO_BUFFER_EXCEEDED. 544 */ 545 lsquic_sfcw_init(&stream->fc, 16 * 1024, NULL, conn_pub, stream_id); 546 /* Don't limit ourselves from sending CRYPTO data. We assume that 547 * the underlying crypto library behaves in a sane manner. 
548 */ 549 stream->max_send_off = UINT64_MAX; 550 LSQ_DEBUG("created crypto stream"); 551 SM_HISTORY_APPEND(stream, SHE_CREATED); 552 stream->sm_frame_header_sz = stream_crypto_frame_header_sz; 553 stream->sm_write_to_packet = stream_write_to_packet_crypto; 554 stream->sm_readable = stream_readable_non_http; 555 if (ctor_flags & SCF_CALL_ON_NEW) 556 lsquic_stream_call_on_new(stream); 557 return stream; 558} 559 560 561void 562lsquic_stream_call_on_new (lsquic_stream_t *stream) 563{ 564 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 565 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 566 { 567 LSQ_DEBUG("calling on_new_stream"); 568 SM_HISTORY_APPEND(stream, SHE_ONNEW); 569 stream->stream_flags |= STREAM_ONNEW_DONE; 570 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 571 stream); 572 } 573} 574 575 576static void 577decr_conn_cap (struct lsquic_stream *stream, size_t incr) 578{ 579 if (stream->sm_bflags & SMBF_CONN_LIMITED) 580 { 581 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 582 stream->conn_pub->conn_cap.cc_sent -= incr; 583 } 584} 585 586 587static void 588maybe_resize_stream_buffer (struct lsquic_stream *stream) 589{ 590 assert(0 == stream->sm_n_buffered); 591 592 if (stream->sm_n_allocated < stream->conn_pub->path->np_pack_size) 593 { 594 free(stream->sm_buf); 595 stream->sm_buf = NULL; 596 stream->sm_n_allocated = 0; 597 } 598 else if (stream->sm_n_allocated > stream->conn_pub->path->np_pack_size) 599 stream->sm_n_allocated = stream->conn_pub->path->np_pack_size; 600} 601 602 603static void 604drop_buffered_data (struct lsquic_stream *stream) 605{ 606 decr_conn_cap(stream, stream->sm_n_buffered); 607 stream->sm_n_buffered = 0; 608 maybe_resize_stream_buffer(stream); 609 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 610 maybe_remove_from_write_q(stream, SMQF_WRITE_Q_FLAGS); 611} 612 613 614void 615lsquic_stream_drop_hset_ref (struct lsquic_stream *stream) 616{ 617 if (stream->uh) 618 stream->uh->uh_hset = NULL; 619} 620 621 
622static void 623destroy_uh (struct lsquic_stream *stream) 624{ 625 if (stream->uh) 626 { 627 if (stream->uh->uh_hset) 628 stream->conn_pub->enpub->enp_hsi_if 629 ->hsi_discard_header_set(stream->uh->uh_hset); 630 free(stream->uh); 631 stream->uh = NULL; 632 } 633} 634 635 636void 637lsquic_stream_destroy (lsquic_stream_t *stream) 638{ 639 struct push_promise *promise; 640 struct stream_hq_frame *shf; 641 642 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 643 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 644 STREAM_ONNEW_DONE) 645 { 646 stream->stream_flags |= STREAM_ONCLOSE_DONE; 647 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 648 stream->stream_if->on_close(stream, stream->st_ctx); 649 } 650 if (stream->sm_qflags & SMQF_SENDING_FLAGS) 651 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 652 if (stream->sm_qflags & SMQF_WANT_READ) 653 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 654 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 655 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 656 if (stream->sm_qflags & SMQF_SERVICE_FLAGS) 657 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 658 if (stream->sm_qflags & SMQF_QPACK_DEC) 659 lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream); 660 else if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 661 == (SMBF_IETF|SMBF_USE_HEADERS) 662 && !(stream->stream_flags & STREAM_FIN_REACHED)) 663 lsquic_qdh_cancel_stream_id(stream->conn_pub->u.ietf.qdh, stream->id); 664 drop_buffered_data(stream); 665 lsquic_sfcw_consume_rem(&stream->fc); 666 drop_frames_in(stream); 667 if (stream->push_req) 668 { 669 if (stream->push_req->uh_hset) 670 stream->conn_pub->enpub->enp_hsi_if 671 ->hsi_discard_header_set(stream->push_req->uh_hset); 672 free(stream->push_req); 673 } 674 while ((promise = SLIST_FIRST(&stream->sm_promises))) 675 { 676 SLIST_REMOVE_HEAD(&stream->sm_promises, 
pp_next); 677 lsquic_pp_put(promise, stream->conn_pub->u.ietf.promises); 678 } 679 if (stream->sm_promise) 680 { 681 assert(stream->sm_promise->pp_pushed_stream == stream); 682 stream->sm_promise->pp_pushed_stream = NULL; 683 lsquic_pp_put(stream->sm_promise, stream->conn_pub->u.ietf.promises); 684 } 685 while ((shf = STAILQ_FIRST(&stream->sm_hq_frames))) 686 stream_hq_frame_put(stream, shf); 687 destroy_uh(stream); 688 free(stream->sm_buf); 689 free(stream->sm_header_block); 690 LSQ_DEBUG("destroyed stream"); 691 SM_HISTORY_DUMP_REMAINING(stream); 692 free(stream); 693} 694 695 696static int 697stream_is_finished (struct lsquic_stream *stream) 698{ 699 return lsquic_stream_is_closed(stream) 700 && (stream->sm_bflags & SMBF_DELAY_ONCLOSE ? 701 /* Need a stricter check when on_close() is delayed: */ 702 !lsquic_stream_has_unacked_data(stream) : 703 /* n_unacked checks that no outgoing packets that reference this 704 * stream are outstanding: 705 */ 706 0 == stream->n_unacked) 707 && 0 == (stream->sm_qflags & ( 708 /* This checks that no packets that reference this stream will 709 * become outstanding: 710 */ 711 SMQF_SEND_RST 712 /* Can't finish stream until all "self" flags are unset: */ 713 | SMQF_SELF_FLAGS)) 714 && ((stream->stream_flags & STREAM_FORCE_FINISH) 715 || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))); 716} 717 718 719/* This is an internal function */ 720void 721lsquic_stream_force_finish (struct lsquic_stream *stream) 722{ 723 LSQ_DEBUG("stream is now finished"); 724 SM_HISTORY_APPEND(stream, SHE_FINISHED); 725 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 726 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 727 next_service_stream); 728 stream->sm_qflags |= SMQF_FREE_STREAM; 729 stream->stream_flags |= STREAM_FINISHED; 730} 731 732 733static void 734maybe_finish_stream (lsquic_stream_t *stream) 735{ 736 if (0 == (stream->stream_flags & STREAM_FINISHED) && 737 stream_is_finished(stream)) 738 
lsquic_stream_force_finish(stream); 739} 740 741 742static void 743maybe_schedule_call_on_close (lsquic_stream_t *stream) 744{ 745 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 746 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) 747 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE) 748 && (!(stream->sm_bflags & SMBF_DELAY_ONCLOSE) 749 || !lsquic_stream_has_unacked_data(stream)) 750 && !(stream->sm_qflags & SMQF_CALL_ONCLOSE)) 751 { 752 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 753 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 754 next_service_stream); 755 stream->sm_qflags |= SMQF_CALL_ONCLOSE; 756 LSQ_DEBUG("scheduled calling on_close"); 757 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 758 } 759} 760 761 762void 763lsquic_stream_call_on_close (lsquic_stream_t *stream) 764{ 765 assert(stream->stream_flags & STREAM_ONNEW_DONE); 766 stream->sm_qflags &= ~SMQF_CALL_ONCLOSE; 767 if (!(stream->sm_qflags & SMQF_SERVICE_FLAGS)) 768 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 769 next_service_stream); 770 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 771 { 772 LSQ_DEBUG("calling on_close"); 773 stream->stream_flags |= STREAM_ONCLOSE_DONE; 774 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 775 stream->stream_if->on_close(stream, stream->st_ctx); 776 } 777 else 778 assert(0); 779} 780 781 782static int 783stream_has_frame_at_read_offset (struct lsquic_stream *stream) 784{ 785 if (!((stream->stream_flags & STREAM_CACHED_FRAME) 786 && stream->read_offset == stream->sm_last_frame_off)) 787 { 788 stream->sm_has_frame = stream->data_in->di_if->di_get_frame( 789 stream->data_in, stream->read_offset) != NULL; 790 stream->sm_last_frame_off = stream->read_offset; 791 stream->stream_flags |= STREAM_CACHED_FRAME; 792 } 793 return stream->sm_has_frame; 794} 795 796 797static int 798stream_readable_non_http (struct lsquic_stream *stream) 799{ 800 return stream_has_frame_at_read_offset(stream); 801} 802 803 804static 
int 805stream_readable_http_gquic (struct lsquic_stream *stream) 806{ 807 return (stream->stream_flags & STREAM_HAVE_UH) 808 && (stream->uh 809 || stream_has_frame_at_read_offset(stream)); 810} 811 812 813static int 814stream_readable_http_ietf (struct lsquic_stream *stream) 815{ 816 return 817 /* If we have read the header set and the header set has not yet 818 * been read, the stream is readable. 819 */ 820 ((stream->stream_flags & STREAM_HAVE_UH) && stream->uh) 821 || 822 /* Alternatively, run the filter and check for payload availability. */ 823 (stream->sm_sfi->sfi_readable(stream) 824 && (/* Running the filter may result in hitting FIN: */ 825 (stream->stream_flags & STREAM_FIN_REACHED) 826 || stream_has_frame_at_read_offset(stream))); 827} 828 829 830static int 831maybe_switch_data_in (struct lsquic_stream *stream) 832{ 833 if ((stream->sm_bflags & SMBF_AUTOSWITCH) && 834 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 835 { 836 stream->data_in = stream->data_in->di_if->di_switch_impl( 837 stream->data_in, stream->read_offset); 838 if (!stream->data_in) 839 { 840 stream->data_in = lsquic_data_in_error_new(); 841 return -1; 842 } 843 } 844 845 return 0; 846} 847 848 849/* Drain and discard any incoming data */ 850static int 851stream_readable_discard (struct lsquic_stream *stream) 852{ 853 struct data_frame *data_frame; 854 uint64_t toread; 855 int fin; 856 857 while ((data_frame = stream->data_in->di_if->di_get_frame( 858 stream->data_in, stream->read_offset))) 859 { 860 fin = data_frame->df_fin; 861 toread = data_frame->df_size - data_frame->df_read_off; 862 stream->read_offset += toread; 863 data_frame->df_read_off = data_frame->df_size; 864 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 865 if (fin) 866 break; 867 } 868 869 (void) maybe_switch_data_in(stream); 870 871 return 0; /* Never readable */ 872} 873 874 875static int 876stream_is_read_reset (const struct lsquic_stream *stream) 877{ 878 if (stream->sm_bflags & SMBF_IETF) 879 
return stream->stream_flags & STREAM_RST_RECVD; 880 else 881 return (stream->stream_flags & (STREAM_RST_RECVD|STREAM_RST_SENT)) 882 || (stream->sm_qflags & SMQF_SEND_RST); 883} 884 885 886int 887lsquic_stream_readable (struct lsquic_stream *stream) 888{ 889 /* A stream is readable if one of the following is true: */ 890 return 891 /* - It is already finished: in that case, lsquic_stream_read() will 892 * return 0. 893 */ 894 (stream->stream_flags & STREAM_FIN_REACHED) 895 /* - The stream is reset, by either side. In this case, 896 * lsquic_stream_read() will return -1 (we want the user to be 897 * able to collect the error). 898 */ 899 || stream_is_read_reset(stream) 900 /* Type-dependent readability check: */ 901 || stream->sm_readable(stream); 902 ; 903} 904 905 906/* Return true if write end of the stream has been reset. 907 * Note that the logic for gQUIC is the same for write and read resets. 908 */ 909int 910lsquic_stream_is_write_reset (const struct lsquic_stream *stream) 911{ 912 if (stream->sm_bflags & SMBF_IETF) 913 return stream->stream_flags & STREAM_SS_RECVD; 914 else 915 return (stream->stream_flags & (STREAM_RST_RECVD|STREAM_RST_SENT)) 916 || (stream->sm_qflags & SMQF_SEND_RST); 917} 918 919 920static int 921stream_writeable (struct lsquic_stream *stream) 922{ 923 /* A stream is writeable if one of the following is true: */ 924 return 925 /* - The stream is reset, by either side. In this case, 926 * lsquic_stream_write() will return -1 (we want the user to be 927 * able to collect the error). 
928 */ 929 lsquic_stream_is_write_reset(stream) 930 /* - Data can be written to stream: */ 931 || lsquic_stream_write_avail(stream) 932 ; 933} 934 935 936static size_t 937stream_write_avail_no_frames (struct lsquic_stream *stream) 938{ 939 uint64_t stream_avail, conn_avail; 940 941 stream_avail = stream->max_send_off - stream->tosend_off 942 - stream->sm_n_buffered; 943 944 if (stream->sm_bflags & SMBF_CONN_LIMITED) 945 { 946 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 947 if (conn_avail < stream_avail) 948 stream_avail = conn_avail; 949 } 950 951 return stream_avail; 952} 953 954 955static size_t 956stream_write_avail_with_frames (struct lsquic_stream *stream) 957{ 958 uint64_t stream_avail, conn_avail; 959 const struct stream_hq_frame *shf; 960 size_t size; 961 962 stream_avail = stream->max_send_off - stream->tosend_off 963 - stream->sm_n_buffered; 964 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 965 if (!(shf->shf_flags & SHF_WRITTEN)) 966 { 967 size = stream_hq_frame_size(shf); 968 assert(size <= stream_avail); 969 stream_avail -= size; 970 } 971 972 if (stream->sm_bflags & SMBF_CONN_LIMITED) 973 { 974 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 975 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 976 if (!(shf->shf_flags & SHF_CC_PAID)) 977 { 978 size = stream_hq_frame_size(shf); 979 if (size < conn_avail) 980 conn_avail -= size; 981 else 982 return 0; 983 } 984 if (conn_avail < stream_avail) 985 stream_avail = conn_avail; 986 } 987 988 if (stream_avail >= 3 /* Smallest new frame */) 989 return stream_avail; 990 else 991 return 0; 992} 993 994 995static int 996stream_is_pushing_promise (const struct lsquic_stream *stream) 997{ 998 return (stream->stream_flags & STREAM_PUSHING) 999 && SLIST_FIRST(&stream->sm_promises) 1000 && (SLIST_FIRST(&stream->sm_promises))->pp_write_state != PPWS_DONE 1001 ; 1002} 1003 1004 1005/* To prevent deadlocks, ensure that when headers are sent, the bytes 1006 * sent on the 
encoder stream are written first. 1007 * 1008 * XXX If the encoder is set up in non-risking mode, it is perfectly 1009 * fine to send the header block first. TODO: update the logic to 1010 * reflect this. There should be two sending behaviors: risk and non-risk. 1011 * For now, we assume risk for everything to be on the conservative side. 1012 */ 1013static size_t 1014stream_write_avail_with_headers (struct lsquic_stream *stream) 1015{ 1016 if (stream->stream_flags & STREAM_PUSHING) 1017 return stream_write_avail_with_frames(stream); 1018 1019 switch (stream->sm_send_headers_state) 1020 { 1021 case SSHS_BEGIN: 1022 return lsquic_qeh_write_avail(stream->conn_pub->u.ietf.qeh); 1023 case SSHS_ENC_SENDING: 1024 if (stream->sm_hb_compl > 1025 lsquic_qeh_enc_off(stream->conn_pub->u.ietf.qeh)) 1026 return 0; 1027 LSQ_DEBUG("encoder stream bytes have all been sent"); 1028 stream->sm_send_headers_state = SSHS_HBLOCK_SENDING; 1029 /* fall-through */ 1030 default: 1031 assert(SSHS_HBLOCK_SENDING == stream->sm_send_headers_state); 1032 return stream_write_avail_with_frames(stream); 1033 } 1034} 1035 1036 1037size_t 1038lsquic_stream_write_avail (struct lsquic_stream *stream) 1039{ 1040 return stream->sm_write_avail(stream); 1041} 1042 1043 1044int 1045lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 1046{ 1047 struct lsquic_conn *lconn; 1048 1049 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 1050 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 1051 { 1052 if (stream->sm_bflags & SMBF_IETF) 1053 { 1054 lconn = stream->conn_pub->lconn; 1055 if (lsquic_stream_is_crypto(stream)) 1056 lconn->cn_if->ci_abort_error(lconn, 0, 1057 TEC_CRYPTO_BUFFER_EXCEEDED, 1058 "crypto buffer exceeded on in crypto level %"PRIu64, 1059 crypto_level(stream)); 1060 else 1061 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FLOW_CONTROL_ERROR, 1062 "flow control violation on stream %"PRIu64, stream->id); 1063 } 1064 return -1; 1065 } 1066 if 
(lsquic_sfcw_fc_offsets_changed(&stream->fc)) 1067 { 1068 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1069 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1070 next_send_stream); 1071 stream->sm_qflags |= SMQF_SEND_WUF; 1072 } 1073 return 0; 1074} 1075 1076 1077int 1078lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 1079{ 1080 uint64_t max_off; 1081 int got_next_offset, rv, free_frame; 1082 enum ins_frame ins_frame; 1083 struct lsquic_conn *lconn; 1084 1085 assert(frame->packet_in); 1086 1087 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 1088 LSQ_DEBUG("received stream frame, offset %"PRIu64", len %u; " 1089 "fin: %d", frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 1090 1091 if ((stream->sm_bflags & SMBF_USE_HEADERS) 1092 && (stream->stream_flags & STREAM_HEAD_IN_FIN)) 1093 { 1094 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 1095 lsquic_malo_put(frame); 1096 return -1; 1097 } 1098 1099 if (frame->data_frame.df_fin && (stream->sm_bflags & SMBF_IETF) 1100 && (stream->stream_flags & STREAM_FIN_RECVD) 1101 && stream->sm_fin_off != DF_END(frame)) 1102 { 1103 lconn = stream->conn_pub->lconn; 1104 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR, 1105 "new final size %"PRIu64" from STREAM frame (id: %"PRIu64") does " 1106 "not match previous final size %"PRIu64, DF_END(frame), 1107 stream->id, stream->sm_fin_off); 1108 return -1; 1109 } 1110 1111 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 1112 insert_frame: 1113 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 1114 if (INS_FRAME_OK == ins_frame) 1115 { 1116 /* Update maximum offset in the flow controller and check for flow 1117 * control violation: 1118 */ 1119 rv = -1; 1120 free_frame = !stream->data_in->di_if->di_own_on_ok; 1121 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 1122 if (0 != lsquic_stream_update_sfcw(stream, 
max_off)) 1123 goto end_ok; 1124 if (frame->data_frame.df_fin) 1125 { 1126 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 1127 stream->stream_flags |= STREAM_FIN_RECVD; 1128 stream->sm_qflags &= ~SMQF_WAIT_FIN_OFF; 1129 stream->sm_fin_off = DF_END(frame); 1130 maybe_finish_stream(stream); 1131 } 1132 if (0 != maybe_switch_data_in(stream)) 1133 goto end_ok; 1134 if (got_next_offset) 1135 /* Checking the offset saves di_get_frame() call */ 1136 maybe_conn_to_tickable_if_readable(stream); 1137 rv = 0; 1138 end_ok: 1139 if (free_frame) 1140 lsquic_malo_put(frame); 1141 stream->stream_flags &= ~STREAM_CACHED_FRAME; 1142 return rv; 1143 } 1144 else if (INS_FRAME_DUP == ins_frame) 1145 { 1146 return 0; 1147 } 1148 else if (INS_FRAME_OVERLAP == ins_frame) 1149 { 1150 LSQ_DEBUG("overlap: switching DATA IN implementation"); 1151 stream->data_in = stream->data_in->di_if->di_switch_impl( 1152 stream->data_in, stream->read_offset); 1153 if (stream->data_in) 1154 goto insert_frame; 1155 stream->data_in = lsquic_data_in_error_new(); 1156 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 1157 lsquic_malo_put(frame); 1158 return -1; 1159 } 1160 else 1161 { 1162 assert(INS_FRAME_ERR == ins_frame); 1163 return -1; 1164 } 1165} 1166 1167 1168static void 1169drop_frames_in (lsquic_stream_t *stream) 1170{ 1171 if (stream->data_in) 1172 { 1173 stream->data_in->di_if->di_destroy(stream->data_in); 1174 /* To avoid checking whether `data_in` is set, just set to the error 1175 * data-in stream. It does the right thing after incoming data is 1176 * dropped. 
1177 */ 1178 stream->data_in = lsquic_data_in_error_new(); 1179 stream->stream_flags &= ~STREAM_CACHED_FRAME; 1180 } 1181} 1182 1183 1184static void 1185maybe_elide_stream_frames (struct lsquic_stream *stream) 1186{ 1187 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 1188 { 1189 if (stream->n_unacked) 1190 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 1191 stream->id); 1192 stream->stream_flags |= STREAM_FRAMES_ELIDED; 1193 } 1194} 1195 1196 1197int 1198lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 1199 uint64_t error_code) 1200{ 1201 struct lsquic_conn *lconn; 1202 1203 if ((stream->sm_bflags & SMBF_IETF) 1204 && (stream->stream_flags & STREAM_FIN_RECVD) 1205 && stream->sm_fin_off != offset) 1206 { 1207 lconn = stream->conn_pub->lconn; 1208 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR, 1209 "final size %"PRIu64" from RESET_STREAM frame (id: %"PRIu64") " 1210 "does not match previous final size %"PRIu64, offset, 1211 stream->id, stream->sm_fin_off); 1212 return -1; 1213 } 1214 1215 if (stream->stream_flags & STREAM_RST_RECVD) 1216 { 1217 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 1218 return 0; 1219 } 1220 1221 SM_HISTORY_APPEND(stream, SHE_RST_IN); 1222 /* This flag must always be set, even if we are "ignoring" it: it is 1223 * used by elision code. 
1224 */ 1225 stream->stream_flags |= STREAM_RST_RECVD; 1226 1227 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 1228 { 1229 LSQ_INFO("RST_STREAM invalid: its offset %"PRIu64" is " 1230 "smaller than that of byte following the last byte we have seen: " 1231 "%"PRIu64, offset, 1232 lsquic_sfcw_get_max_recv_off(&stream->fc)); 1233 return -1; 1234 } 1235 1236 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 1237 { 1238 LSQ_INFO("RST_STREAM invalid: its offset %"PRIu64 1239 " violates flow control", offset); 1240 return -1; 1241 } 1242 1243 if (stream->stream_if->on_reset 1244 && !(stream->stream_flags & STREAM_ONCLOSE_DONE)) 1245 { 1246 if (stream->sm_bflags & SMBF_IETF) 1247 { 1248 if (!(stream->sm_dflags & SMDF_ONRESET0)) 1249 { 1250 stream->stream_if->on_reset(stream, stream->st_ctx, 0); 1251 stream->sm_dflags |= SMDF_ONRESET0; 1252 } 1253 } 1254 else 1255 { 1256 if ((stream->sm_dflags & (SMDF_ONRESET0|SMDF_ONRESET1)) 1257 != (SMDF_ONRESET0|SMDF_ONRESET1)) 1258 { 1259 stream->stream_if->on_reset(stream, stream->st_ctx, 2); 1260 stream->sm_dflags |= SMDF_ONRESET0|SMDF_ONRESET1; 1261 } 1262 } 1263 } 1264 1265 /* Let user collect error: */ 1266 maybe_conn_to_tickable_if_readable(stream); 1267 1268 lsquic_sfcw_consume_rem(&stream->fc); 1269 drop_frames_in(stream); 1270 1271 if (!(stream->sm_bflags & SMBF_IETF)) 1272 { 1273 drop_buffered_data(stream); 1274 maybe_elide_stream_frames(stream); 1275 } 1276 1277 if (stream->sm_qflags & SMQF_WAIT_FIN_OFF) 1278 { 1279 stream->sm_qflags &= ~SMQF_WAIT_FIN_OFF; 1280 LSQ_DEBUG("final offset is now known: %"PRIu64, offset); 1281 } 1282 1283 if (!(stream->stream_flags & 1284 (STREAM_RST_SENT|STREAM_SS_SENT|STREAM_FIN_SENT)) 1285 && !(stream->sm_bflags & SMBF_IETF) 1286 && !(stream->sm_qflags & SMQF_SEND_RST)) 1287 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 1288 1289 stream->stream_flags |= STREAM_RST_RECVD; 1290 1291 maybe_finish_stream(stream); 1292 maybe_schedule_call_on_close(stream); 
1293 1294 return 0; 1295} 1296 1297 1298void 1299lsquic_stream_stop_sending_in (struct lsquic_stream *stream, 1300 uint64_t error_code) 1301{ 1302 if (stream->stream_flags & STREAM_SS_RECVD) 1303 { 1304 LSQ_DEBUG("ignore duplicate STOP_SENDING frame"); 1305 return; 1306 } 1307 1308 SM_HISTORY_APPEND(stream, SHE_STOP_SENDIG_IN); 1309 stream->stream_flags |= STREAM_SS_RECVD; 1310 1311 if (stream->stream_if->on_reset && !(stream->sm_dflags & SMDF_ONRESET1) 1312 && !(stream->stream_flags & STREAM_ONCLOSE_DONE)) 1313 { 1314 stream->stream_if->on_reset(stream, stream->st_ctx, 1); 1315 stream->sm_dflags |= SMDF_ONRESET1; 1316 } 1317 1318 /* Let user collect error: */ 1319 maybe_conn_to_tickable_if_writeable(stream, 0); 1320 1321 lsquic_sfcw_consume_rem(&stream->fc); 1322 drop_buffered_data(stream); 1323 maybe_elide_stream_frames(stream); 1324 1325 if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT)) 1326 && !(stream->sm_qflags & SMQF_SEND_RST)) 1327 lsquic_stream_reset_ext(stream, 0, 0); 1328 1329 maybe_finish_stream(stream); 1330 maybe_schedule_call_on_close(stream); 1331} 1332 1333 1334uint64_t 1335lsquic_stream_fc_recv_off_const (const struct lsquic_stream *stream) 1336{ 1337 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 1338} 1339 1340 1341void 1342lsquic_stream_max_stream_data_sent (struct lsquic_stream *stream) 1343{ 1344 assert(stream->sm_qflags & SMQF_SEND_MAX_STREAM_DATA); 1345 stream->sm_qflags &= ~SMQF_SEND_MAX_STREAM_DATA; 1346 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1347 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1348 stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc); 1349} 1350 1351 1352uint64_t 1353lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 1354{ 1355 assert(stream->sm_qflags & SMQF_SEND_WUF); 1356 stream->sm_qflags &= ~SMQF_SEND_WUF; 1357 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1358 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1359 return 
stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc); 1360} 1361 1362 1363void 1364lsquic_stream_peer_blocked (struct lsquic_stream *stream, uint64_t peer_off) 1365{ 1366 uint64_t last_off; 1367 1368 if (stream->sm_last_recv_off) 1369 last_off = stream->sm_last_recv_off; 1370 else 1371 /* This gets advertized in transport parameters */ 1372 last_off = lsquic_sfcw_get_max_recv_off(&stream->fc); 1373 1374 LSQ_DEBUG("Peer blocked at %"PRIu64", while the last MAX_STREAM_DATA " 1375 "frame we sent advertized the limit of %"PRIu64, peer_off, last_off); 1376 1377 if (peer_off > last_off && !(stream->sm_qflags & SMQF_SEND_WUF)) 1378 { 1379 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1380 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1381 next_send_stream); 1382 stream->sm_qflags |= SMQF_SEND_WUF; 1383 LSQ_DEBUG("marked to send MAX_STREAM_DATA frame"); 1384 } 1385 else if (stream->sm_qflags & SMQF_SEND_WUF) 1386 LSQ_DEBUG("MAX_STREAM_DATA frame is already scheduled"); 1387 else if (stream->sm_last_recv_off) 1388 LSQ_DEBUG("MAX_STREAM_DATA(%"PRIu64") has already been either " 1389 "packetized or sent", stream->sm_last_recv_off); 1390 else 1391 LSQ_INFO("Peer should have receive transport param limit " 1392 "of %"PRIu64"; odd.", last_off); 1393} 1394 1395 1396/* GQUIC's BLOCKED frame does not have an offset */ 1397void 1398lsquic_stream_peer_blocked_gquic (struct lsquic_stream *stream) 1399{ 1400 LSQ_DEBUG("Peer blocked: schedule another WINDOW_UPDATE frame"); 1401 if (!(stream->sm_qflags & SMQF_SEND_WUF)) 1402 { 1403 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1404 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1405 next_send_stream); 1406 stream->sm_qflags |= SMQF_SEND_WUF; 1407 LSQ_DEBUG("marked to send MAX_STREAM_DATA frame"); 1408 } 1409 else 1410 LSQ_DEBUG("WINDOW_UPDATE frame is already scheduled"); 1411} 1412 1413 1414void 1415lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 1416{ 1417 
assert(stream->sm_qflags & SMQF_SEND_BLOCKED); 1418 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 1419 stream->sm_qflags &= ~SMQF_SEND_BLOCKED; 1420 stream->stream_flags |= STREAM_BLOCKED_SENT; 1421 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1422 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1423} 1424 1425 1426void 1427lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 1428{ 1429 assert(stream->sm_qflags & SMQF_SEND_RST); 1430 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 1431 stream->sm_qflags &= ~SMQF_SEND_RST; 1432 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1433 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1434 stream->stream_flags |= STREAM_RST_SENT; 1435 maybe_finish_stream(stream); 1436} 1437 1438 1439static size_t 1440read_uh (struct lsquic_stream *stream, 1441 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1442{ 1443 struct http1x_headers *const h1h = stream->uh->uh_hset; 1444 size_t nread; 1445 1446 nread = readf(ctx, (unsigned char *) h1h->h1h_buf + h1h->h1h_off, 1447 h1h->h1h_size - h1h->h1h_off, 1448 (stream->stream_flags & STREAM_HEAD_IN_FIN) > 0); 1449 h1h->h1h_off += nread; 1450 if (h1h->h1h_off == h1h->h1h_size) 1451 { 1452 LSQ_DEBUG("read all uncompressed headers"); 1453 destroy_uh(stream); 1454 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 1455 { 1456 stream->stream_flags |= STREAM_FIN_REACHED; 1457 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 1458 } 1459 } 1460 return nread; 1461} 1462 1463 1464static void 1465verify_cl_on_fin (struct lsquic_stream *stream) 1466{ 1467 struct lsquic_conn *lconn; 1468 1469 /* The rules in RFC7230, Section 3.3.2 are a bit too intricate. We take 1470 * a simple approach and verify content-length only when there was any 1471 * payload at all. 
1472 */ 1473 if (stream->sm_data_in != 0 && stream->sm_cont_len != stream->sm_data_in) 1474 { 1475 lconn = stream->conn_pub->lconn; 1476 lconn->cn_if->ci_abort_error(lconn, 1, HEC_GENERAL_PROTOCOL_ERROR, 1477 "number of bytes in DATA frames of stream %"PRIu64" is %llu, " 1478 "while content-length specified of %llu", stream->id, 1479 stream->sm_data_in, stream->sm_cont_len); 1480 } 1481} 1482 1483 1484static void 1485stream_consumed_bytes (struct lsquic_stream *stream) 1486{ 1487 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 1488 if (lsquic_sfcw_fc_offsets_changed(&stream->fc) 1489 /* We advance crypto streams' offsets (to control amount of 1490 * buffering we allow), but do not send MAX_STREAM_DATA frames. 1491 */ 1492 && !((stream->sm_bflags & (SMBF_IETF|SMBF_CRYPTO)) 1493 == (SMBF_IETF|SMBF_CRYPTO))) 1494 { 1495 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1496 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1497 next_send_stream); 1498 stream->sm_qflags |= SMQF_SEND_WUF; 1499 maybe_conn_to_tickable_if_writeable(stream, 1); 1500 } 1501} 1502 1503 1504static ssize_t 1505read_data_frames (struct lsquic_stream *stream, int do_filtering, 1506 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1507{ 1508 struct data_frame *data_frame; 1509 size_t nread, toread, total_nread; 1510 int short_read, processed_frames; 1511 1512 processed_frames = 0; 1513 total_nread = 0; 1514 1515 while ((data_frame = stream->data_in->di_if->di_get_frame( 1516 stream->data_in, stream->read_offset))) 1517 { 1518 1519 ++processed_frames; 1520 1521 do 1522 { 1523 if (do_filtering && stream->sm_sfi) 1524 toread = stream->sm_sfi->sfi_filter_df(stream, data_frame); 1525 else 1526 toread = data_frame->df_size - data_frame->df_read_off; 1527 1528 if (toread || data_frame->df_fin) 1529 { 1530 nread = readf(ctx, data_frame->df_data + data_frame->df_read_off, 1531 toread, data_frame->df_fin); 1532 if (do_filtering && stream->sm_sfi) 1533 
stream->sm_sfi->sfi_decr_left(stream, nread); 1534 data_frame->df_read_off += nread; 1535 stream->read_offset += nread; 1536 total_nread += nread; 1537 short_read = nread < toread; 1538 } 1539 else 1540 short_read = 0; 1541 1542 if (data_frame->df_read_off == data_frame->df_size) 1543 { 1544 const int fin = data_frame->df_fin; 1545 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 1546 data_frame = NULL; 1547 if (0 != maybe_switch_data_in(stream)) 1548 return -1; 1549 if (fin) 1550 { 1551 stream->stream_flags |= STREAM_FIN_REACHED; 1552 if (stream->sm_bflags & SMBF_VERIFY_CL) 1553 verify_cl_on_fin(stream); 1554 goto end_while; 1555 } 1556 } 1557 else if (short_read) 1558 goto end_while; 1559 } 1560 while (data_frame); 1561 } 1562 end_while: 1563 1564 if (processed_frames) 1565 stream_consumed_bytes(stream); 1566 1567 return total_nread; 1568} 1569 1570 1571static ssize_t 1572stream_readf (struct lsquic_stream *stream, 1573 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1574{ 1575 size_t total_nread; 1576 ssize_t nread; 1577 1578 total_nread = 0; 1579 1580 if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF)) 1581 == (SMBF_USE_HEADERS|SMBF_IETF) 1582 && !(stream->stream_flags & STREAM_HAVE_UH) 1583 && !stream->uh) 1584 { 1585 if (stream->sm_readable(stream)) 1586 { 1587 if (stream->sm_hq_filter.hqfi_flags & HQFI_FLAG_ERROR) 1588 { 1589 LSQ_INFO("HQ filter hit an error: cannot read from stream"); 1590 errno = EBADMSG; 1591 return -1; 1592 } 1593 assert(stream->uh); 1594 } 1595 else 1596 { 1597 errno = EWOULDBLOCK; 1598 return -1; 1599 } 1600 } 1601 1602 if (stream->uh) 1603 { 1604 if (stream->uh->uh_flags & UH_H1H) 1605 { 1606 total_nread += read_uh(stream, readf, ctx); 1607 if (stream->uh) 1608 return total_nread; 1609 } 1610 else 1611 { 1612 LSQ_INFO("header set not claimed: cannot read from stream"); 1613 return -1; 1614 } 1615 } 1616 else if ((stream->sm_bflags & SMBF_USE_HEADERS) 1617 && !(stream->stream_flags & 
STREAM_HAVE_UH)) 1618 { 1619 LSQ_DEBUG("cannot read: headers not available"); 1620 errno = EWOULDBLOCK; 1621 return -1; 1622 } 1623 1624 nread = read_data_frames(stream, 1, readf, ctx); 1625 if (nread < 0) 1626 return nread; 1627 total_nread += (size_t) nread; 1628 1629 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64", reached fin: %d", 1630 __func__, total_nread, stream->read_offset, 1631 !!(stream->stream_flags & STREAM_FIN_REACHED)); 1632 1633 if (total_nread) 1634 return total_nread; 1635 else if (stream->stream_flags & STREAM_FIN_REACHED) 1636 return 0; 1637 else 1638 { 1639 errno = EWOULDBLOCK; 1640 return -1; 1641 } 1642} 1643 1644 1645/* This function returns 0 when EOF is reached. 1646 */ 1647ssize_t 1648lsquic_stream_readf (struct lsquic_stream *stream, 1649 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1650{ 1651 ssize_t nread; 1652 1653 SM_HISTORY_APPEND(stream, SHE_USER_READ); 1654 1655 if (stream_is_read_reset(stream)) 1656 { 1657 if (stream->stream_flags & STREAM_RST_RECVD) 1658 stream->stream_flags |= STREAM_RST_READ; 1659 errno = ECONNRESET; 1660 return -1; 1661 } 1662 if (stream->stream_flags & STREAM_U_READ_DONE) 1663 { 1664 errno = EBADF; 1665 return -1; 1666 } 1667 if (stream->stream_flags & STREAM_FIN_REACHED) 1668 { 1669 if (stream->sm_bflags & SMBF_USE_HEADERS) 1670 { 1671 if ((stream->stream_flags & STREAM_HAVE_UH) && !stream->uh) 1672 return 0; 1673 } 1674 else 1675 return 0; 1676 } 1677 1678 nread = stream_readf(stream, readf, ctx); 1679 if (nread >= 0) 1680 maybe_update_last_progress(stream); 1681 1682 return nread; 1683} 1684 1685 1686struct readv_ctx 1687{ 1688 const struct iovec *iov; 1689 const struct iovec *const end; 1690 unsigned char *p; 1691}; 1692 1693 1694static size_t 1695readv_f (void *ctx_p, const unsigned char *buf, size_t len, int fin) 1696{ 1697 struct readv_ctx *const ctx = ctx_p; 1698 const unsigned char *const end = buf + len; 1699 size_t ntocopy; 1700 1701 while (ctx->iov < ctx->end && 
buf < end) 1702 { 1703 ntocopy = (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len 1704 - ctx->p; 1705 if (ntocopy > (size_t) (end - buf)) 1706 ntocopy = end - buf; 1707 memcpy(ctx->p, buf, ntocopy); 1708 ctx->p += ntocopy; 1709 buf += ntocopy; 1710 if (ctx->p == (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len) 1711 { 1712 do 1713 ++ctx->iov; 1714 while (ctx->iov < ctx->end && ctx->iov->iov_len == 0); 1715 if (ctx->iov < ctx->end) 1716 ctx->p = ctx->iov->iov_base; 1717 else 1718 break; 1719 } 1720 } 1721 1722 return len - (end - buf); 1723} 1724 1725 1726ssize_t 1727lsquic_stream_readv (struct lsquic_stream *stream, const struct iovec *iov, 1728 int iovcnt) 1729{ 1730 struct readv_ctx ctx = { iov, iov + iovcnt, iov->iov_base, }; 1731 return lsquic_stream_readf(stream, readv_f, &ctx); 1732} 1733 1734 1735ssize_t 1736lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 1737{ 1738 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 1739 return lsquic_stream_readv(stream, &iov, 1); 1740} 1741 1742 1743void 1744lsquic_stream_ss_frame_sent (struct lsquic_stream *stream) 1745{ 1746 assert(stream->sm_qflags & SMQF_SEND_STOP_SENDING); 1747 SM_HISTORY_APPEND(stream, SHE_STOP_SENDIG_OUT); 1748 stream->sm_qflags &= ~SMQF_SEND_STOP_SENDING; 1749 stream->stream_flags |= STREAM_SS_SENT; 1750 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1751 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1752} 1753 1754 1755static void 1756handle_early_read_shutdown_ietf (struct lsquic_stream *stream) 1757{ 1758 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1759 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1760 next_send_stream); 1761 stream->sm_qflags |= SMQF_SEND_STOP_SENDING|SMQF_WAIT_FIN_OFF; 1762} 1763 1764 1765static void 1766handle_early_read_shutdown_gquic (struct lsquic_stream *stream) 1767{ 1768 if (!(stream->stream_flags & STREAM_RST_SENT)) 1769 { 1770 lsquic_stream_reset_ext(stream, 7 /* 
QUIC_STREAM_CANCELLED */, 0); 1771 stream->sm_qflags |= SMQF_WAIT_FIN_OFF; 1772 } 1773} 1774 1775 1776static void 1777handle_early_read_shutdown (struct lsquic_stream *stream) 1778{ 1779 if (stream->sm_bflags & SMBF_IETF) 1780 handle_early_read_shutdown_ietf(stream); 1781 else 1782 handle_early_read_shutdown_gquic(stream); 1783} 1784 1785 1786static void 1787stream_shutdown_read (lsquic_stream_t *stream) 1788{ 1789 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1790 { 1791 if (!(stream->stream_flags & STREAM_FIN_REACHED)) 1792 { 1793 LSQ_DEBUG("read shut down before reading FIN. (FIN received: %d)", 1794 !!(stream->stream_flags & STREAM_FIN_RECVD)); 1795 SM_HISTORY_APPEND(stream, SHE_EARLY_READ_STOP); 1796 if (!(stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD))) 1797 handle_early_read_shutdown(stream); 1798 } 1799 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 1800 stream->stream_flags |= STREAM_U_READ_DONE; 1801 stream->sm_readable = stream_readable_discard; 1802 stream_wantread(stream, 0); 1803 maybe_finish_stream(stream); 1804 } 1805} 1806 1807 1808static int 1809stream_is_incoming_unidir (const struct lsquic_stream *stream) 1810{ 1811 enum stream_id_type sit; 1812 1813 if (stream->sm_bflags & SMBF_IETF) 1814 { 1815 sit = stream->id & SIT_MASK; 1816 if (stream->sm_bflags & SMBF_SERVER) 1817 return sit == SIT_UNI_CLIENT; 1818 else 1819 return sit == SIT_UNI_SERVER; 1820 } 1821 else 1822 return 0; 1823} 1824 1825 1826static void 1827stream_shutdown_write (lsquic_stream_t *stream) 1828{ 1829 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1830 return; 1831 1832 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 1833 stream->stream_flags |= STREAM_U_WRITE_DONE; 1834 stream_wantwrite(stream, 0); 1835 1836 /* Don't bother to check whether there is anything else to write if 1837 * the flags indicate that nothing else should be written. 
1838 */ 1839 if (!(stream->sm_bflags & SMBF_CRYPTO) 1840 && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT)) 1841 && !stream_is_incoming_unidir(stream) 1842 /* In gQUIC, receiving a RESET means "stop sending" */ 1843 && !(!(stream->sm_bflags & SMBF_IETF) 1844 && (stream->stream_flags & STREAM_RST_RECVD))) 1845 { 1846 if ((stream->sm_bflags & SMBF_USE_HEADERS) 1847 && !(stream->stream_flags & STREAM_HEADERS_SENT)) 1848 { 1849 LSQ_DEBUG("headers not sent, send a reset"); 1850 lsquic_stream_reset(stream, 0); 1851 } 1852 else if (stream->sm_n_buffered == 0) 1853 { 1854 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 1855 stream)) 1856 { 1857 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 1858 stream->stream_flags |= STREAM_FIN_SENT; 1859 } 1860 else 1861 { 1862 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 1863 "flag in it"); 1864 (void) stream_flush_nocheck(stream); 1865 } 1866 } 1867 else 1868 (void) stream_flush_nocheck(stream); 1869 } 1870} 1871 1872 1873static void 1874maybe_stream_shutdown_write (struct lsquic_stream *stream) 1875{ 1876 if (stream->sm_send_headers_state == SSHS_BEGIN) 1877 stream_shutdown_write(stream); 1878 else if (0 == (stream->stream_flags & STREAM_DELAYED_SW)) 1879 { 1880 LSQ_DEBUG("shutdown delayed"); 1881 SM_HISTORY_APPEND(stream, SHE_DELAY_SW); 1882 stream->stream_flags |= STREAM_DELAYED_SW; 1883 } 1884} 1885 1886 1887int 1888lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 1889{ 1890 LSQ_DEBUG("shutdown; how: %d", how); 1891 if (lsquic_stream_is_closed(stream)) 1892 { 1893 LSQ_INFO("Attempt to shut down a closed stream"); 1894 errno = EBADF; 1895 return -1; 1896 } 1897 /* 0: read, 1: write: 2: read and write 1898 */ 1899 if (how < 0 || how > 2) 1900 { 1901 errno = EINVAL; 1902 return -1; 1903 } 1904 1905 if (how) 1906 maybe_stream_shutdown_write(stream); 1907 if (how != 1) 1908 stream_shutdown_read(stream); 1909 1910 maybe_finish_stream(stream); 1911 
maybe_schedule_call_on_close(stream); 1912 if (how && !(stream->stream_flags & STREAM_DELAYED_SW)) 1913 maybe_conn_to_tickable_if_writeable(stream, 1); 1914 1915 return 0; 1916} 1917 1918 1919void 1920lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 1921{ 1922 LSQ_DEBUG("internal shutdown"); 1923 stream->stream_flags |= STREAM_U_READ_DONE|STREAM_U_WRITE_DONE; 1924 stream_wantwrite(stream, 0); 1925 stream_wantread(stream, 0); 1926 if (lsquic_stream_is_critical(stream)) 1927 { 1928 LSQ_DEBUG("add flag to force-finish special stream"); 1929 stream->stream_flags |= STREAM_FORCE_FINISH; 1930 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 1931 } 1932 maybe_finish_stream(stream); 1933 maybe_schedule_call_on_close(stream); 1934} 1935 1936 1937static void 1938fake_reset_unused_stream (lsquic_stream_t *stream) 1939{ 1940 stream->stream_flags |= 1941 STREAM_RST_RECVD /* User will pick this up on read or write */ 1942 | STREAM_RST_SENT /* Don't send anything else on this stream */ 1943 ; 1944 1945 /* Cancel all writes to the network scheduled for this stream: */ 1946 if (stream->sm_qflags & SMQF_SENDING_FLAGS) 1947 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 1948 next_send_stream); 1949 stream->sm_qflags &= ~SMQF_SENDING_FLAGS; 1950 drop_buffered_data(stream); 1951 LSQ_DEBUG("fake-reset stream%s", 1952 stream_stalled(stream) ? " (stalled)" : ""); 1953 maybe_finish_stream(stream); 1954 maybe_schedule_call_on_close(stream); 1955} 1956 1957 1958/* This function should only be called for locally-initiated streams whose ID 1959 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 1960 * frame sent by peer but we have not yet received it and created a stream. 1961 * In this situation, we mark the stream as reset, so that user's on_read or 1962 * on_write event callback picks up the error. That, in turn, should result 1963 * in stream being closed. 
1964 * 1965 * If we have received any data frames on this stream, this probably indicates 1966 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 1967 * lower than this. However, we still try to handle it gracefully and peform 1968 * a shutdown, as if the stream was not reset. 1969 */ 1970void 1971lsquic_stream_received_goaway (lsquic_stream_t *stream) 1972{ 1973 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 1974 1975 if (stream->stream_flags & STREAM_GOAWAY_IN) 1976 { 1977 LSQ_DEBUG("ignore duplicate GOAWAY"); 1978 return; 1979 } 1980 stream->stream_flags |= STREAM_GOAWAY_IN; 1981 1982 if (0 == stream->read_offset && 1983 stream->data_in->di_if->di_empty(stream->data_in)) 1984 fake_reset_unused_stream(stream); /* Normal condition */ 1985 else 1986 { /* This is odd, let's handle it the best we can: */ 1987 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 1988 lsquic_stream_shutdown_internal(stream); 1989 } 1990} 1991 1992 1993uint64_t 1994lsquic_stream_read_offset (const lsquic_stream_t *stream) 1995{ 1996 return stream->read_offset; 1997} 1998 1999 2000static int 2001stream_wantread (lsquic_stream_t *stream, int is_want) 2002{ 2003 const int old_val = !!(stream->sm_qflags & SMQF_WANT_READ); 2004 const int new_val = !!is_want; 2005 if (old_val != new_val) 2006 { 2007 if (new_val) 2008 { 2009 if (!old_val) 2010 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 2011 next_read_stream); 2012 stream->sm_qflags |= SMQF_WANT_READ; 2013 } 2014 else 2015 { 2016 stream->sm_qflags &= ~SMQF_WANT_READ; 2017 if (old_val) 2018 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 2019 next_read_stream); 2020 } 2021 } 2022 return old_val; 2023} 2024 2025 2026static void 2027maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_q_flags flag) 2028{ 2029 assert(SMQF_WRITE_Q_FLAGS & flag); 2030 if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS)) 2031 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 2032 next_write_stream); 
2033 stream->sm_qflags |= flag; 2034} 2035 2036 2037static void 2038maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag) 2039{ 2040 assert(SMQF_WRITE_Q_FLAGS & flag); 2041 if (stream->sm_qflags & flag) 2042 { 2043 stream->sm_qflags &= ~flag; 2044 if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS)) 2045 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 2046 next_write_stream); 2047 } 2048} 2049 2050 2051static int 2052stream_wantwrite (struct lsquic_stream *stream, int new_val) 2053{ 2054 const int old_val = !!(stream->sm_qflags & SMQF_WANT_WRITE); 2055 2056 assert(0 == (new_val & ~1)); /* new_val is either 0 or 1 */ 2057 2058 if (old_val != new_val) 2059 { 2060 if (new_val) 2061 maybe_put_onto_write_q(stream, SMQF_WANT_WRITE); 2062 else 2063 maybe_remove_from_write_q(stream, SMQF_WANT_WRITE); 2064 } 2065 return old_val; 2066} 2067 2068 2069int 2070lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 2071{ 2072 SM_HISTORY_APPEND(stream, SHE_WANTREAD_NO + !!is_want); 2073 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 2074 { 2075 if (is_want) 2076 maybe_conn_to_tickable_if_readable(stream); 2077 return stream_wantread(stream, is_want); 2078 } 2079 else 2080 { 2081 errno = EBADF; 2082 return -1; 2083 } 2084} 2085 2086 2087int 2088lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 2089{ 2090 int old_val; 2091 2092 is_want = !!is_want; 2093 2094 SM_HISTORY_APPEND(stream, SHE_WANTWRITE_NO + is_want); 2095 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE) 2096 && SSHS_BEGIN == stream->sm_send_headers_state) 2097 { 2098 stream->sm_saved_want_write = is_want; 2099 if (is_want) 2100 maybe_conn_to_tickable_if_writeable(stream, 1); 2101 return stream_wantwrite(stream, is_want); 2102 } 2103 else if (SSHS_BEGIN != stream->sm_send_headers_state) 2104 { 2105 old_val = stream->sm_saved_want_write; 2106 stream->sm_saved_want_write = is_want; 2107 return old_val; 2108 } 2109 else 2110 { 2111 errno = EBADF; 2112 return -1; 2113 } 
2114} 2115 2116 2117struct progress 2118{ 2119 enum stream_flags s_flags; 2120 enum stream_q_flags q_flags; 2121}; 2122 2123 2124static struct progress 2125stream_progress (const struct lsquic_stream *stream) 2126{ 2127 return (struct progress) { 2128 .s_flags = stream->stream_flags 2129 & (STREAM_U_WRITE_DONE|STREAM_U_READ_DONE), 2130 .q_flags = stream->sm_qflags 2131 & (SMQF_WANT_READ|SMQF_WANT_WRITE|SMQF_WANT_FLUSH|SMQF_SEND_RST), 2132 }; 2133} 2134 2135 2136static int 2137progress_eq (struct progress a, struct progress b) 2138{ 2139 return a.s_flags == b.s_flags && a.q_flags == b.q_flags; 2140} 2141 2142 2143static void 2144stream_dispatch_read_events_loop (lsquic_stream_t *stream) 2145{ 2146 unsigned no_progress_count, no_progress_limit; 2147 struct progress progress; 2148 uint64_t size; 2149 2150 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 2151 2152 no_progress_count = 0; 2153 while ((stream->sm_qflags & SMQF_WANT_READ) 2154 && lsquic_stream_readable(stream)) 2155 { 2156 progress = stream_progress(stream); 2157 size = stream->read_offset; 2158 2159 stream->stream_if->on_read(stream, stream->st_ctx); 2160 2161 if (no_progress_limit && size == stream->read_offset && 2162 progress_eq(progress, stream_progress(stream))) 2163 { 2164 ++no_progress_count; 2165 if (no_progress_count >= no_progress_limit) 2166 { 2167 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 2168 "progress) in user code reading from stream", 2169 no_progress_count, 2170 no_progress_count == 1 ? 
"" : "s"); 2171 break; 2172 } 2173 } 2174 else 2175 no_progress_count = 0; 2176 } 2177} 2178 2179 2180static void 2181stream_hblock_sent (struct lsquic_stream *stream) 2182{ 2183 int want_write; 2184 2185 LSQ_DEBUG("header block has been sent: restore default behavior"); 2186 stream->sm_send_headers_state = SSHS_BEGIN; 2187 stream->sm_write_avail = stream_write_avail_with_frames; 2188 2189 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 2190 if (want_write != stream->sm_saved_want_write) 2191 (void) lsquic_stream_wantwrite(stream, stream->sm_saved_want_write); 2192 2193 if (stream->stream_flags & STREAM_DELAYED_SW) 2194 { 2195 LSQ_DEBUG("performing delayed shutdown write"); 2196 stream->stream_flags &= ~STREAM_DELAYED_SW; 2197 stream_shutdown_write(stream); 2198 maybe_schedule_call_on_close(stream); 2199 maybe_finish_stream(stream); 2200 maybe_conn_to_tickable_if_writeable(stream, 1); 2201 } 2202} 2203 2204 2205static void 2206on_write_header_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 2207{ 2208 ssize_t nw; 2209 2210 nw = stream_write_buf(stream, 2211 stream->sm_header_block + stream->sm_hblock_off, 2212 stream->sm_hblock_sz - stream->sm_hblock_off); 2213 if (nw > 0) 2214 { 2215 stream->sm_hblock_off += nw; 2216 if (stream->sm_hblock_off == stream->sm_hblock_sz) 2217 { 2218 stream->stream_flags |= STREAM_HEADERS_SENT; 2219 free(stream->sm_header_block); 2220 stream->sm_header_block = NULL; 2221 stream->sm_hblock_sz = 0; 2222 stream_hblock_sent(stream); 2223 LSQ_DEBUG("header block written out successfully"); 2224 /* TODO: if there was eos, do something else */ 2225 if (stream->sm_qflags & SMQF_WANT_WRITE) 2226 stream->stream_if->on_write(stream, h); 2227 } 2228 else 2229 { 2230 LSQ_DEBUG("wrote %zd bytes more of header block; not done yet", 2231 nw); 2232 } 2233 } 2234 else if (nw < 0) 2235 { 2236 /* XXX What should happen if we hit an error? 
TODO */
    }
}


/* Select the write callback to run for this stream:
 *  - the user's on_write when the stream is neither pushing nor sending
 *    the header block (the common case);
 *  - on_write_header_wrapper while the header block is being sent;
 *  - on_write_pp_wrapper while stream_is_pushing_promise() is true;
 *  - the user's on_write otherwise.
 */
static void
(*select_on_write (struct lsquic_stream *stream))(struct lsquic_stream *,
                                                    lsquic_stream_ctx_t *)
{
    if (0 == (stream->stream_flags & STREAM_PUSHING)
                    && SSHS_HBLOCK_SENDING != stream->sm_send_headers_state)
        /* Common case */
        return stream->stream_if->on_write;
    else if (SSHS_HBLOCK_SENDING == stream->sm_send_headers_state)
        return on_write_header_wrapper;
    else
    {
        assert(stream->stream_flags & STREAM_PUSHING);
        if (stream_is_pushing_promise(stream))
            return on_write_pp_wrapper;
        else
            return stream->stream_if->on_write;
    }
}


/* Keep calling the selected write callback for as long as the stream wants
 * to write, the last write was OK (STREAM_LAST_WRITE_OK) and the stream is
 * writeable.  When the es_progress_check setting is non-zero, the loop is
 * abandoned after that many consecutive callbacks that left
 * stream_progress() unchanged.
 */
static void
stream_dispatch_write_events_loop (lsquic_stream_t *stream)
{
    unsigned no_progress_count, no_progress_limit;
    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
    struct progress progress;

    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;

    no_progress_count = 0;
    stream->stream_flags |= STREAM_LAST_WRITE_OK;
    while ((stream->sm_qflags & SMQF_WANT_WRITE)
                && (stream->stream_flags & STREAM_LAST_WRITE_OK)
                       && stream_writeable(stream))
    {
        /* Snapshot taken before the callback to detect lack of progress */
        progress = stream_progress(stream);

        /* Re-select each iteration: the callback may change stream state */
        on_write = select_on_write(stream);
        on_write(stream, stream->st_ctx);

        if (no_progress_limit && progress_eq(progress, stream_progress(stream)))
        {
            ++no_progress_count;
            if (no_progress_count >= no_progress_limit)
            {
                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
                    "progress) in user code writing to stream",
                    no_progress_count,
                    no_progress_count == 1 ? "" : "s");
                break;
            }
        }
        else
            no_progress_count = 0;
    }
}


/* Call the user's on_read callback at most once: only if the stream wants
 * to read and is readable.
 */
static void
stream_dispatch_read_events_once (lsquic_stream_t *stream)
{
    if ((stream->sm_qflags & SMQF_WANT_READ) && lsquic_stream_readable(stream))
    {
        stream->stream_if->on_read(stream, stream->st_ctx);
    }
}


/* Return the send offset that accounts for everything already committed to
 * the stream: bytes sent (tosend_off), bytes buffered (sm_n_buffered) and
 * the sizes of HQ frame headers that have not been written yet.
 */
uint64_t
lsquic_stream_combined_send_off (const struct lsquic_stream *stream)
{
    size_t frames_sizes;

    frames_sizes = active_hq_frame_sizes(stream);
    return stream->tosend_off + stream->sm_n_buffered + frames_sizes;
}


/* Record stream-level and connection-level "blocked" state.
 *
 * Stream level: when the combined send offset has reached max_send_off and
 * no blocked offset has been recorded for it yet, remember the offset, set
 * SMQF_SEND_BLOCKED and make sure the stream is on the sending_streams list.
 *
 * Connection level (SMBF_CONN_LIMITED streams only): when the number of
 * buffered bytes equals what is left of the connection cap and cc_blocked
 * is behind cc_max, set cc_blocked to cc_max and flag the connection with
 * LSCONN_SEND_BLOCKED.
 */
static void
maybe_mark_as_blocked (lsquic_stream_t *stream)
{
    struct lsquic_conn_cap *cc;
    uint64_t used;

    used = lsquic_stream_combined_send_off(stream);
    if (stream->max_send_off == used)
    {
        if (stream->blocked_off < stream->max_send_off)
        {
            stream->blocked_off = used;
            /* Only insert if the stream is not on the list already */
            if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
                                                            next_send_stream);
            stream->sm_qflags |= SMQF_SEND_BLOCKED;
            LSQ_DEBUG("marked stream-blocked at stream offset "
                                            "%"PRIu64, stream->blocked_off);
        }
        else
            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
                " has been, or is about to be, sent", stream->blocked_off);
    }

    if ((stream->sm_bflags & SMBF_CONN_LIMITED)
        && (cc = &stream->conn_pub->conn_cap,
                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
    {
        if (cc->cc_blocked < cc->cc_max)
        {
            cc->cc_blocked = cc->cc_max;
            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
            LSQ_DEBUG("marked connection-blocked at connection offset "
                                                    "%"PRIu64, cc->cc_max);
        }
        else
            LSQ_DEBUG("stream has already been marked connection-blocked "
                "at offset %"PRIu64, cc->cc_blocked);
    }
}


/* Dispatch read events for a stream that wants to read (asserted): a single
 * on_read call when SMBF_RW_ONCE is set, the read-events loop otherwise.
 */
void
lsquic_stream_dispatch_read_events (lsquic_stream_t
*stream)
{
    assert(stream->sm_qflags & SMQF_WANT_READ);

    if (stream->sm_bflags & SMBF_RW_ONCE)
        stream_dispatch_read_events_once(stream);
    else
        stream_dispatch_read_events_loop(stream);
}


/* Dispatch write events for a stream that has at least one of the
 * SMQF_WRITE_Q_FLAGS set (asserted).  A pending flush is attempted first;
 * then the write callback is run either once (SMBF_RW_ONCE) or in a loop.
 * If the stream made progress and still has write-queue flags set, it is
 * moved to the tail of the connection's write_streams list.
 */
void
lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
{
    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
    int progress;
    uint64_t tosend_off;
    unsigned short n_buffered;
    enum stream_q_flags q_flags;

    assert(stream->sm_qflags & SMQF_WRITE_Q_FLAGS);
    /* Snapshot the state that is compared afterwards to detect progress */
    q_flags = stream->sm_qflags & SMQF_WRITE_Q_FLAGS;
    tosend_off = stream->tosend_off;
    n_buffered = stream->sm_n_buffered;

    if (stream->sm_qflags & SMQF_WANT_FLUSH)
        (void) stream_flush(stream);

    if (stream->sm_bflags & SMBF_RW_ONCE)
    {
        if ((stream->sm_qflags & SMQF_WANT_WRITE)
            && stream_writeable(stream))
        {
            on_write = select_on_write(stream);
            on_write(stream, stream->st_ctx);
        }
    }
    else
        stream_dispatch_write_events_loop(stream);

    /* Progress means either flags or offsets changed: */
    progress = !((stream->sm_qflags & SMQF_WRITE_Q_FLAGS) == q_flags &&
                        stream->tosend_off == tosend_off &&
                            stream->sm_n_buffered == n_buffered);

    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
    {
        if (progress)
        {   /* Move the stream to the end of the list to ensure fairness.
*/
            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
                                                        next_write_stream);
            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
                                                        next_write_stream);
        }
    }
}


/* Size callback of the empty reader used by stream_flush(): always zero. */
static size_t
inner_reader_empty_size (void *ctx)
{
    return 0;
}


/* Read callback of the empty reader used by stream_flush(): reads nothing. */
static size_t
inner_reader_empty_read (void *ctx, void *buf, size_t count)
{
    return 0;
}


/* Packetize what is already buffered (or a standalone FIN) by running
 * stream_write_to_packets() with a reader that supplies no data and a
 * threshold of zero.  SMQF_WANT_FLUSH must be set (asserted).
 *
 * Returns 0 on success and -1 if stream_write_to_packets() failed.
 */
static int
stream_flush (lsquic_stream_t *stream)
{
    struct lsquic_reader empty_reader;
    ssize_t nw;

    assert(stream->sm_qflags & SMQF_WANT_FLUSH);
    assert(stream->sm_n_buffered > 0 ||
        /* Flushing is also used to packetize standalone FIN: */
        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
                                                    == STREAM_U_WRITE_DONE));

    empty_reader.lsqr_size = inner_reader_empty_size;
    empty_reader.lsqr_read = inner_reader_empty_read;
    empty_reader.lsqr_ctx = NULL;   /* pro forma */
    nw = stream_write_to_packets(stream, &empty_reader, 0, SWO_BUFFER);

    if (nw >= 0)
    {
        assert(nw == 0);    /* Empty reader: must have read zero bytes */
        return 0;
    }
    else
        return -1;
}


/* Schedule and perform a flush without checking the stream state first.
 * Records the offsets up to which data must be flushed (sm_flush_to includes
 * unwritten HQ frame headers, sm_flush_to_payload does not), puts the
 * stream on the write queue with SMQF_WANT_FLUSH and calls stream_flush().
 */
static int
stream_flush_nocheck (lsquic_stream_t *stream)
{
    size_t frames;

    frames = active_hq_frame_sizes(stream);
    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered + frames;
    stream->sm_flush_to_payload = stream->sm_payload + stream->sm_n_buffered;
    maybe_put_onto_write_q(stream, SMQF_WANT_FLUSH);
    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);

    return stream_flush(stream);
}


/* Flush buffered stream data.
 *
 * Returns -1 with errno set to EBADF if STREAM_U_WRITE_DONE is set.
 * Returns 0 without doing anything if nothing is buffered.  Otherwise
 * returns the result of stream_flush_nocheck().
 */
int
lsquic_stream_flush (lsquic_stream_t *stream)
{
    if (stream->stream_flags & STREAM_U_WRITE_DONE)
    {
        LSQ_DEBUG("cannot flush closed stream");
        errno = EBADF;
        return -1;
    }

    if (0 == stream->sm_n_buffered)
    {
        LSQ_DEBUG("flushing 0 bytes: noop");
return 0;
    }

    return stream_flush_nocheck(stream);
}


/* Return the byte budget used when computing the flush threshold: the
 * stream's sm_n_allocated when it is non-zero, the path's packet size
 * (np_pack_size) otherwise.
 */
static size_t
stream_get_n_allowed (const struct lsquic_stream *stream)
{
    if (stream->sm_n_allocated)
        return stream->sm_n_allocated;
    else
        return stream->conn_pub->path->np_pack_size;
}


/* The flush threshold is the maximum size of stream data that can be sent
 * in a full packet.
 *
 * It is computed as the allowed size minus the encryption tag length, the
 * packet header size and the stream frame header size for `data_sz' bytes.
 * The function has external linkage unless NDEBUG is defined.
 */
#ifdef NDEBUG
static
#endif
       size_t
lsquic_stream_flush_threshold (const struct lsquic_stream *stream,
                                                            unsigned data_sz)
{
    enum packet_out_flags flags;
    enum packno_bits bits;
    size_t packet_header_sz, stream_header_sz, tag_len;
    size_t threshold;

    /* Build the packet flags that lsquic_po_header_length() needs */
    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl, PNS_APP);
    flags = bits << POBIT_SHIFT;
    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
        flags |= PO_CONN_ID;
    if (stream_is_hsk(stream))
        flags |= PO_LONGHEAD;

    packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags,
                        stream->conn_pub->path->np_dcid.len, HETY_NOT_SET);
    stream_header_sz = stream->sm_frame_header_sz(stream, data_sz);
    tag_len = stream->conn_pub->lconn->cn_esf_c->esf_tag_len;

    threshold = stream_get_n_allowed(stream) - tag_len
              - packet_header_sz - stream_header_sz;
    return threshold;
}


/* Checks performed before writing to a stream.  The macro expects a
 * variable named `stream' to be in scope and RETURNS from the enclosing
 * function when a check fails:
 *  - 0 if HTTP headers are still being sent;
 *  - -1/EILSEQ if HTTP headers are required but sending has not begun;
 *  - -1/ECONNRESET if the write side has been reset;
 *  - -1/EBADF if the stream was closed for writing or FIN has been sent.
 */
#define COMMON_WRITE_CHECKS() do {                                          \
    if ((stream->sm_bflags & SMBF_USE_HEADERS)                              \
            && !(stream->stream_flags & STREAM_HEADERS_SENT))               \
    {                                                                       \
        if (SSHS_BEGIN != stream->sm_send_headers_state)                    \
        {                                                                   \
            LSQ_DEBUG("still sending headers: no writing allowed");         \
            return 0;                                                       \
        }                                                                   \
        else                                                                \
        {                                                                   \
            LSQ_INFO("Attempt to write to stream before sending HTTP "      \
                                                                "headers"); \
            errno = EILSEQ;                                                 \
            return -1;                                                      \
        }                                                                   \
    }                                                                       \
    if (lsquic_stream_is_write_reset(stream))                               \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it had been reset");     \
        errno = ECONNRESET;                                                 \
        return -1;                                                          \
    }                                                                       \
    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it was closed for "      \
                                                                "writing"); \
        errno = EBADF;                                                      \
        return -1;                                                          \
    }                                                                       \
} while (0)


/* Context handed to the frame-generating callbacks (fgc_size, fgc_fin and
 * fgc_read) while stream data is being packetized.
 */
struct frame_gen_ctx
{
    lsquic_stream_t      *fgc_stream;   /* Stream being written */
    struct lsquic_reader *fgc_reader;   /* Source of new data */
    /* We keep our own count of how many bytes were read from reader because
     * some readers are external.  The external caller does not have to rely
     * on our count, but it can.
     */
    size_t                fgc_nread_from_reader;
    size_t              (*fgc_size) (void *ctx);    /* Bytes available to send */
    int                 (*fgc_fin) (void *ctx);     /* Whether to set FIN */
    gsf_read_f            fgc_read;     /* Copies data into the frame */
    size_t                fgc_thresh;   /* Flush threshold; zero if unused */
};


/* Return how many bytes can be placed into a frame right now: what the
 * reader has, capped by the stream's write allowance, plus the bytes that
 * are already buffered.
 */
static size_t
frame_std_gen_size (void *ctx)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    size_t available, remaining;

    /* Make sure we are not writing past available size: */
    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
    if (available < remaining)
        remaining = available;

    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
}


/* Return the number of bytes the header of HQ frame `shf' occupies:
 *  - variable-size frame: one type byte plus a length field of one byte,
 *    or two bytes if SHF_TWO_BYTES is set;
 *  - fixed-size frame: one type byte plus 1 << vint_val2bits(frame size)
 *    bytes for the length;
 *  - fixed-size phantom frame: zero.
 */
static size_t
stream_hq_frame_size (const struct stream_hq_frame *shf)
{
    if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
        return 1 + 1 + ((shf->shf_flags & SHF_TWO_BYTES) > 0);
    else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) == SHF_FIXED_SIZE)
        return 1 + (1 << vint_val2bits(shf->shf_frame_size));
    else
    {
        assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
        return 0;
    }
}


/* Return the combined header size of the stream's HQ frames that have not
 * been written yet (SHF_WRITTEN not set).  The result is zero unless both
 * SMBF_IETF and SMBF_USE_HEADERS are set on the stream.
 */
static size_t
active_hq_frame_sizes (const struct lsquic_stream *stream)
{
    const struct stream_hq_frame *shf;
    size_t size;

    size = 0;
    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
                                        ==
(SMBF_IETF|SMBF_USE_HEADERS))
        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
            if (!(shf->shf_flags & SHF_WRITTEN))
                size += stream_hq_frame_size(shf);

    return size;
}


/* Return the payload offset at which HQ frame `shf' ends.  For a fixed-size
 * frame this is its offset plus its size.  For a variable-size frame it is
 * the offset plus the largest length its length field can hold: 2^14 - 1
 * with SHF_TWO_BYTES, 2^6 - 1 otherwise.
 */
static uint64_t
stream_hq_frame_end (const struct stream_hq_frame *shf)
{
    if (shf->shf_flags & SHF_FIXED_SIZE)
        return shf->shf_off + shf->shf_frame_size;
    else if (shf->shf_flags & SHF_TWO_BYTES)
        return shf->shf_off + ((1 << 14) - 1);
    else
        return shf->shf_off + ((1 << 6) - 1);
}


/* Return true if `shf' points into the stream's embedded sm_hq_frame_arr
 * array (as opposed to having been obtained from the malo allocator).
 */
static int
frame_in_stream (const struct lsquic_stream *stream,
                                            const struct stream_hq_frame *shf)
{
    return shf >= stream->sm_hq_frame_arr
        && shf < stream->sm_hq_frame_arr + sizeof(stream->sm_hq_frame_arr)
                                        / sizeof(stream->sm_hq_frame_arr[0])
        ;
}


/* Remove `shf', which must be the first element of sm_hq_frames (asserted),
 * from the list and release it: an embedded element is zeroed out, while
 * any other element is returned to the malo allocator.
 */
static void
stream_hq_frame_put (struct lsquic_stream *stream,
                                                struct stream_hq_frame *shf)
{
    assert(STAILQ_FIRST(&stream->sm_hq_frames) == shf);
    STAILQ_REMOVE_HEAD(&stream->sm_hq_frames, shf_next);
    if (frame_in_stream(stream, shf))
        memset(shf, 0, sizeof(*shf));
    else
        lsquic_malo_put(shf);
}


/* Close active HQ frame `shf'.  For a variable-size frame, the frame type
 * and the final length (current payload offset minus the frame's offset)
 * are written into the header bytes pointed to by shf_frame_ptr.  The frame
 * is then released with stream_hq_frame_put().
 */
static void
stream_hq_frame_close (struct lsquic_stream *stream,
                                                struct stream_hq_frame *shf)
{
    unsigned bits;

    LSQ_DEBUG("close HQ frame of type 0x%X at payload offset %"PRIu64
        " (actual offset %"PRIu64")", shf->shf_frame_type,
        stream->sm_payload, stream->tosend_off);
    assert(shf->shf_flags & SHF_ACTIVE);
    if (!(shf->shf_flags & SHF_FIXED_SIZE))
    {
        shf->shf_frame_ptr[0] = shf->shf_frame_type;
        /* bits is 0 for a one-byte length field and 1 for a two-byte one */
        bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
        vint_write(shf->shf_frame_ptr + 1, stream->sm_payload - shf->shf_off,
                                                            bits, 1 << bits);
    }
    stream_hq_frame_put(stream, shf);
}


/* Size callback used when HQ framing is in effect: same as
 * frame_std_gen_size() plus the header sizes of the HQ frames whose offset
 * is at or past the current payload offset.
 */
static size_t
frame_hq_gen_size (void *ctx)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    struct lsquic_stream *const stream
= fg_ctx->fgc_stream; 2700 size_t available, remaining, frames; 2701 const struct stream_hq_frame *shf; 2702 2703 frames = 0; 2704 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2705 if (shf->shf_off >= stream->sm_payload) 2706 frames += stream_hq_frame_size(shf); 2707 2708 /* Make sure we are not writing past available size: */ 2709 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2710 available = lsquic_stream_write_avail(stream); 2711 if (available < remaining) 2712 remaining = available; 2713 2714 return remaining + stream->sm_n_buffered + frames; 2715} 2716 2717 2718static int 2719frame_std_gen_fin (void *ctx) 2720{ 2721 struct frame_gen_ctx *fg_ctx = ctx; 2722 return !(fg_ctx->fgc_stream->sm_bflags & SMBF_CRYPTO) 2723 && (fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE) 2724 && 0 == fg_ctx->fgc_stream->sm_n_buffered 2725 /* Do not use frame_std_gen_size() as it may chop the real size: */ 2726 && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2727} 2728 2729 2730static void 2731incr_conn_cap (struct lsquic_stream *stream, size_t incr) 2732{ 2733 if (stream->sm_bflags & SMBF_CONN_LIMITED) 2734 { 2735 stream->conn_pub->conn_cap.cc_sent += incr; 2736 assert(stream->conn_pub->conn_cap.cc_sent 2737 <= stream->conn_pub->conn_cap.cc_max); 2738 } 2739} 2740 2741 2742static void 2743incr_sm_payload (struct lsquic_stream *stream, size_t incr) 2744{ 2745 stream->sm_payload += incr; 2746 stream->tosend_off += incr; 2747 assert(stream->tosend_off <= stream->max_send_off); 2748} 2749 2750 2751static void 2752maybe_resize_threshold (struct frame_gen_ctx *fg_ctx) 2753{ 2754 struct lsquic_stream *stream = fg_ctx->fgc_stream; 2755 size_t old; 2756 2757 if (fg_ctx->fgc_thresh) 2758 { 2759 old = fg_ctx->fgc_thresh; 2760 fg_ctx->fgc_thresh 2761 = lsquic_stream_flush_threshold(stream, fg_ctx->fgc_size(fg_ctx)); 2762 LSQ_DEBUG("changed threshold from %zd to %zd", old, fg_ctx->fgc_thresh); 2763 } 2764} 2765 2766 2767static 
/* Read callback that fills a frame with stream data.  Up to `len' bytes are
 * copied to `begin_buf': buffered bytes (sm_buf) go first, then bytes from
 * the reader, limited by the stream's write allowance.  Payload and send
 * offsets are advanced by the number of bytes copied, bytes taken from the
 * reader are charged to the connection cap and `*fin' is set from the FIN
 * callback.  Returns the number of bytes placed into `begin_buf'.
 */
size_t
frame_std_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    unsigned char *p = begin_buf;
    unsigned char *const end = p + len;
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    size_t n_written, available, n_to_write;

    if (stream->sm_n_buffered > 0)
    {
        if (len <= stream->sm_n_buffered)
        {
            /* The whole request is satisfied from the buffer: copy the
             * bytes out and shift the remainder to the front.
             */
            memcpy(p, stream->sm_buf, len);
            memmove(stream->sm_buf, stream->sm_buf + len,
                                                stream->sm_n_buffered - len);
            stream->sm_n_buffered -= len;
            if (0 == stream->sm_n_buffered)
            {
                maybe_resize_stream_buffer(stream);
                maybe_resize_threshold(fg_ctx);
            }
            assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
            incr_sm_payload(stream, len);
            *fin = fg_ctx->fgc_fin(fg_ctx);
            return len;
        }
        /* Drain the buffer completely, then continue with the reader */
        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
        p += stream->sm_n_buffered;
        stream->sm_n_buffered = 0;
        maybe_resize_stream_buffer(stream);
        maybe_resize_threshold(fg_ctx);
    }

    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
    n_to_write = end - p;
    if (n_to_write > available)
        n_to_write = available;
    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
                                                                n_to_write);
    p += n_written;
    fg_ctx->fgc_nread_from_reader += n_written;
    *fin = fg_ctx->fgc_fin(fg_ctx);
    incr_sm_payload(stream, p - (const unsigned char *) begin_buf);
    /* Only bytes taken from the reader are charged here */
    incr_conn_cap(stream, n_written);
    return p - (const unsigned char *) begin_buf;
}


/* Return the HQ frame that covers payload offset `off', that is, a frame
 * whose offset is at or before `off' and whose end is past it.  Returns
 * NULL if there is no such frame.
 */
static struct stream_hq_frame *
find_hq_frame (const struct lsquic_stream *stream, uint64_t off)
{
    struct stream_hq_frame *shf;

    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
        if (shf->shf_off <= off && stream_hq_frame_end(shf) > off)
            return shf;

    return NULL;
}


/* Return the HQ frame that covers the current payload offset, or NULL. */
static struct stream_hq_frame *
find_cur_hq_frame
(const struct lsquic_stream *stream)
{
    return find_hq_frame(stream, stream->sm_payload);
}


/* Get an unused HQ frame object, append it to sm_hq_frames and mark it
 * SHF_ACTIVE.  An inactive element of the embedded sm_hq_frame_arr is used
 * when one exists; otherwise the object is taken from the malo allocator
 * and zeroed.  Returns NULL if the allocation fails.
 */
static struct stream_hq_frame *
open_hq_frame (struct lsquic_stream *stream)
{
    struct stream_hq_frame *shf;

    for (shf = stream->sm_hq_frame_arr; shf < stream->sm_hq_frame_arr
            + sizeof(stream->sm_hq_frame_arr)
                / sizeof(stream->sm_hq_frame_arr[0]); ++shf)
        if (!(shf->shf_flags & SHF_ACTIVE))
            goto found;

    shf = lsquic_malo_get(stream->conn_pub->mm->malo.stream_hq_frame);
    if (!shf)
    {
        LSQ_WARN("cannot allocate HQ frame");
        return NULL;
    }
    memset(shf, 0, sizeof(*shf));

  found:
    STAILQ_INSERT_TAIL(&stream->sm_hq_frames, shf, shf_next);
    shf->shf_flags = SHF_ACTIVE;
    return shf;
}


/* Open a new HQ frame of type `frame_type' at payload offset `off'.
 *
 * If `flags' contains SHF_FIXED_SIZE, `size' is recorded as the frame size.
 * Otherwise the frame is variable-size: its header pointer is cleared and
 * `size' only selects the width of the length field (SHF_TWO_BYTES is set
 * when size is 64 or larger).
 *
 * Returns the new frame or NULL if one could not be opened.
 */
static struct stream_hq_frame *
stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off,
            enum hq_frame_type frame_type, enum shf_flags flags, size_t size)
{
    struct stream_hq_frame *shf;

    shf = open_hq_frame(stream);
    if (!shf)
    {
        LSQ_WARN("could not open HQ frame");
        return NULL;
    }

    shf->shf_off        = off;
    shf->shf_flags     |= flags;
    shf->shf_frame_type = frame_type;
    if (shf->shf_flags & SHF_FIXED_SIZE)
    {
        shf->shf_frame_size = size;
        LSQ_DEBUG("activated fixed-size HQ frame of type 0x%X at offset "
            "%"PRIu64", size %zu", shf->shf_frame_type, shf->shf_off, size);
    }
    else
    {
        shf->shf_frame_ptr  = NULL;
        if (size >= (1 << 6))
            shf->shf_flags |= SHF_TWO_BYTES;
        LSQ_DEBUG("activated variable-size HQ frame of type 0x%X at offset "
            "%"PRIu64, shf->shf_frame_type, shf->shf_off);
    }

    return shf;
}


/* Fixed-capacity array of pointers filled in by save_hq_ptr(). */
struct hq_arr
{
    unsigned char     **p;      /* Storage for saved pointers */
    unsigned            count;  /* Number of pointers saved so far */
    unsigned            max;    /* Capacity of `p' */
};


/* Append `p' to `hq_arr'.  Returns 0 on success, -1 if the array is full. */
static int
save_hq_ptr (struct hq_arr *hq_arr, void *p)
{
    if (hq_arr->count < hq_arr->max)
    {
hq_arr->p[hq_arr->count++] = p;
        return 0;
    }
    else
        return -1;
}


/* Read callback used when HQ framing is in effect.  It fills `begin_buf'
 * with up to `len' bytes, interleaving HQ frame headers with payload:
 *
 *  - if no HQ frame covers the current payload offset, a new variable-size
 *    DATA frame is activated (its size hint is capped at 2^14 - 1);
 *  - if the current frame starts at the current payload offset and its
 *    header has not been written, the header is emitted: a variable-size
 *    header is zero-filled and its location remembered so that it can be
 *    filled in when the frame is closed; a fixed-size header is written out
 *    in full; a phantom frame emits nothing;
 *  - otherwise payload is copied via frame_std_gen_read(), and the frame is
 *    closed once the payload offset reaches its end.
 *
 * Returns the number of bytes placed into `begin_buf'.
 */
static size_t
frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    unsigned char *p = begin_buf;
    unsigned char *const end = p + len;
    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
    struct stream_hq_frame *shf;
    size_t nw, frame_sz, avail, rem;
    unsigned bits;
    int new;

    while (p < end)
    {
        shf = find_cur_hq_frame(stream);
        if (shf)
        {
            new = 0;
            LSQ_DEBUG("found current HQ frame of type 0x%X at offset %"PRIu64,
                                            shf->shf_frame_type, shf->shf_off);
        }
        else
        {
            rem = frame_std_gen_size(ctx);
            if (rem)
            {
                if (rem > ((1 << 14) - 1))
                    rem = (1 << 14) - 1;
                shf = stream_activate_hq_frame(stream,
                                    stream->sm_payload, HQFT_DATA, 0, rem);
                if (shf)
                {
                    new = 1;
                    goto insert;
                }
                else
                {
                    /* TODO: abort connection?  Handle failure somehow */
                    break;
                }
            }
            else
                break;
        }
        if (shf->shf_off == stream->sm_payload
                                        && !(shf->shf_flags & SHF_WRITTEN))
        {
  insert:
            frame_sz = stream_hq_frame_size(shf);
            if (frame_sz > (uintptr_t) (end - p))
            {
                /* No room for the header: undo a frame opened just now */
                if (new)
                    stream_hq_frame_put(stream, shf);
                break;
            }
            LSQ_DEBUG("insert %zu-byte HQ frame of type 0x%X at payload "
                "offset %"PRIu64" (actual offset %"PRIu64")", frame_sz,
                shf->shf_frame_type, stream->sm_payload, stream->tosend_off);
            if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
            {
                /* Variable-size frame: reserve the header bytes now; they
                 * are filled in when the frame is closed.
                 */
                shf->shf_frame_ptr = p;
                if (stream->sm_hq_arr && 0 != save_hq_ptr(stream->sm_hq_arr, p))
                {
                    stream_hq_frame_put(stream, shf);
                    break;
                }
                memset(p, 0, frame_sz);
                p += frame_sz;
            }
            else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
                                                            == SHF_FIXED_SIZE)
            {
                /* Fixed-size frame: the complete header is known */
                *p++ = shf->shf_frame_type;
                bits = vint_val2bits(shf->shf_frame_size);
                vint_write(p, shf->shf_frame_size, bits, 1 << bits);
                p += 1 << bits;
            }
            else
                assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
            /* Charge the header to the connection cap only once */
            if (!(shf->shf_flags & SHF_CC_PAID))
            {
                incr_conn_cap(stream, frame_sz);
                shf->shf_flags |= SHF_CC_PAID;
            }
            shf->shf_flags |= SHF_WRITTEN;
            /* Header bytes advance the send offset but not sm_payload */
            stream->tosend_off += frame_sz;
            assert(stream->tosend_off <= stream->max_send_off);
        }
        else
        {
            /* Copy payload, bounded by the end of the current frame, the
             * room left in the output buffer and what may be sent.
             */
            avail = stream->sm_n_buffered + stream->sm_write_avail(stream);
            len = stream_hq_frame_end(shf) - stream->sm_payload;
            assert(len);
            if (len > (unsigned) (end - p))
                len = end - p;
            if (len > avail)
                len = avail;
            if (!len)
                break;
            nw = frame_std_gen_read(ctx, p, len, fin);
            p += nw;
            if (nw < len)
                break;
            if (stream_hq_frame_end(shf) == stream->sm_payload)
                stream_hq_frame_close(stream, shf);
        }
    }

    return p - (unsigned char
*) begin_buf;
}


/* If a flush is pending and the send offset has reached or passed the
 * required offset (sm_flush_to), clear the SMQF_WANT_FLUSH request.
 */
static void
check_flush_threshold (lsquic_stream_t *stream)
{
    if ((stream->sm_qflags & SMQF_WANT_FLUSH) &&
                            stream->tosend_off >= stream->sm_flush_to)
    {
        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
                                                    stream->sm_flush_to);
        maybe_remove_from_write_q(stream, SMQF_WANT_FLUSH);
    }
}


#if LSQUIC_EXTRA_CHECKS
/* Consistency check: the bytes buffered by all connection-limited streams
 * plus the connection's stream_frame_bytes must equal cc_sent.  The check
 * is skipped when wtp_level is greater than one or when the connection has
 * no all_streams hash.
 */
static void
verify_conn_cap (const struct lsquic_conn_public *conn_pub)
{
    const struct lsquic_stream *stream;
    struct lsquic_hash_elem *el;
    unsigned n_buffered;

    if (conn_pub->wtp_level > 1)
        return;

    if (!conn_pub->all_streams)
        /* TODO: enable this check for unit tests as well */
        return;

    n_buffered = 0;
    for (el = lsquic_hash_first(conn_pub->all_streams); el;
                                 el = lsquic_hash_next(conn_pub->all_streams))
    {
        stream = lsquic_hashelem_getdata(el);
        if (stream->sm_bflags & SMBF_CONN_LIMITED)
            n_buffered += stream->sm_n_buffered;
    }

    assert(n_buffered + conn_pub->stream_frame_bytes
                                            == conn_pub->conn_cap.cc_sent);
    LSQ_DEBUG("%s: cc_sent: %"PRIu64, __func__, conn_pub->conn_cap.cc_sent);
}


#endif


/* Generate one STREAM frame of up to `size' bytes into `packet_out' and
 * record the stream in the packet.
 *
 * Returns the length of the generated frame on success.  A negative value
 * is returned if frame generation fails (the generator's return value is
 * passed through) or if the stream cannot be added to the packet (-1).
 */
static int
write_stream_frame (struct frame_gen_ctx *fg_ctx, const size_t size,
                                        struct lsquic_packet_out *packet_out)
{
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
    unsigned off;
    int len, s;

#if LSQUIC_CONN_STATS || LSQUIC_EXTRA_CHECKS
    const uint64_t begin_off = stream->tosend_off;
#endif
    /* Remember where in the packet the frame begins */
    off = packet_out->po_data_sz;
    len = pf->pf_gen_stream_frame(
                packet_out->po_data + packet_out->po_data_sz,
                lsquic_packet_out_avail(packet_out), stream->id,
                stream->tosend_off,
                fg_ctx->fgc_fin(fg_ctx), size, fg_ctx->fgc_read,
                fg_ctx);
    if (len < 0)
        return len;

#if LSQUIC_CONN_STATS
    stream->conn_pub->conn_stats->out.stream_frames += 1;
    stream->conn_pub->conn_stats->out.stream_data_sz
                                            += stream->tosend_off - begin_off;
#endif
    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
                            packet_out->po_data + packet_out->po_data_sz, len);
    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
    /* The packet is full: flag that a STREAM frame ends it */
    if (0 == lsquic_packet_out_avail(packet_out))
        packet_out->po_flags |= PO_STREAM_END;
    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
                                     stream, QUIC_FRAME_STREAM, off, len);
    if (s != 0)
    {
        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
        return -1;
    }
#if LSQUIC_EXTRA_CHECKS
    if (stream->sm_bflags & SMBF_CONN_LIMITED)
    {
        stream->conn_pub->stream_frame_bytes += stream->tosend_off - begin_off;
        verify_conn_cap(stream->conn_pub);
    }
#endif

    check_flush_threshold(stream);
    return len;
}


/* Write-to-packet function that puts each frame into a newly allocated
 * packet.  The packet's header type is HETY_INITIAL when the send offset is
 * zero and HETY_HANDSHAKE otherwise.
 *
 * Returns SWTP_STOP if a packet cannot be allocated, SWTP_OK if a frame was
 * written and the packet scheduled, SWTP_ERROR otherwise.
 */
static enum swtp_status
stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size)
{
    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
    struct lsquic_packet_out *packet_out;
    int len;

    packet_out = lsquic_send_ctl_new_packet_out(send_ctl, 0, PNS_APP,
                                                    stream->conn_pub->path);
    if (!packet_out)
        return SWTP_STOP;
    packet_out->po_header_type = stream->tosend_off == 0
                                        ?
HETY_INITIAL : HETY_HANDSHAKE;

    len = write_stream_frame(fg_ctx, size, packet_out);

    if (len > 0)
    {
        packet_out->po_flags |= PO_HELLO;
        lsquic_packet_out_zero_pad(packet_out);
        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
        return SWTP_OK;
    }
    else
        return SWTP_ERROR;
}


/* Write-to-packet function for regular streams.
 *
 * The first time it runs after the stream's headers have been sent, it
 * flushes the stream that carries them (the QPACK encoder stream for IETF
 * streams with STREAM_ENCODER_DEP set, the headers stream for gQUIC) if
 * that stream has data to flush.
 *
 * It then obtains a packet with enough room for the frame header plus a
 * minimum payload and writes a STREAM frame into it.  If frame generation
 * reports that more room is needed than was requested, a packet is
 * requested again with the larger size.
 *
 * Returns SWTP_OK if a frame was written, SWTP_STOP if no packet could be
 * obtained and SWTP_ERROR otherwise.
 */
static enum swtp_status
stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size)
{
    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
    unsigned stream_header_sz, need_at_least;
    struct lsquic_packet_out *packet_out;
    struct lsquic_stream *headers_stream;
    int len;

    if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED))
                                                        == STREAM_HEADERS_SENT)
    {
        if (stream->sm_bflags & SMBF_IETF)
        {
            if (stream->stream_flags & STREAM_ENCODER_DEP)
                headers_stream = stream->conn_pub->u.ietf.qeh->qeh_enc_sm_out;
            else
                headers_stream = NULL;
        }
        else
            headers_stream =
                lsquic_headers_stream_get_stream(stream->conn_pub->u.gquic.hs);
        if (headers_stream && lsquic_stream_has_data_to_flush(headers_stream))
        {
            LSQ_DEBUG("flushing headers stream before packetizing stream data");
            (void) lsquic_stream_flush(headers_stream);
        }
        /* If there is nothing to flush, some other stream must have flushed it:
         * this means our headers are flushed.  Either way, only do this once.
         */
        stream->stream_flags |= STREAM_HDRS_FLUSHED;
    }

    stream_header_sz = stream->sm_frame_header_sz(stream, size);
    need_at_least = stream_header_sz;
    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
                                       == (SMBF_IETF|SMBF_USE_HEADERS))
    {
        if (size > 0)
            need_at_least += 3;     /* Enough room for HTTP/3 frame */
    }
    else
        need_at_least += size > 0;  /* At least one byte of payload */
  get_packet:
    packet_out = stream->sm_get_packet_for_stream(send_ctl,
                                need_at_least, stream->conn_pub->path, stream);
    if (packet_out)
    {
        len = write_stream_frame(fg_ctx, size, packet_out);
        if (len > 0)
            return SWTP_OK;
        assert(len < 0);
        /* The retry below treats the magnitude of a negative return as the
         * number of bytes needed.
         */
        if (-len > (int) need_at_least)
        {
            LSQ_DEBUG("need more room (%d bytes) than initially calculated "
                "%u bytes, will try again", -len, need_at_least);
            need_at_least = -len;
            goto get_packet;
        }
        return SWTP_ERROR;
    }
    else
        return SWTP_STOP;
}


/* Use for IETF crypto streams and gQUIC crypto stream for versions >= Q050.
*/
/* Generate one CRYPTO frame of up to `size' bytes (size must be positive)
 * into a packet obtained for the stream's packet number space: the space
 * that corresponds to the stream's encryption level for IETF streams,
 * PNS_APP otherwise.  The packet is flagged PO_HELLO; for non-IETF streams
 * it is also zero-padded.
 *
 * Returns SWTP_OK if a frame was written, SWTP_STOP if no packet could be
 * obtained and SWTP_ERROR if the frame could not be generated or the stream
 * could not be added to the packet.  All failures are reported as
 * SWTP_ERROR so that only enum swtp_status values are ever returned.
 */
static enum swtp_status
stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size)
{
    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
    unsigned crypto_header_sz, need_at_least;
    struct lsquic_packet_out *packet_out;
    unsigned short off;
    enum packnum_space pns;
    int len, s;

    if (stream->sm_bflags & SMBF_IETF)
        pns = lsquic_enclev2pns[ crypto_level(stream) ];
    else
        pns = PNS_APP;

    assert(size > 0);
    crypto_header_sz = stream->sm_frame_header_sz(stream, size);
    /* Frame header plus at least one byte of payload */
    need_at_least = crypto_header_sz + 1;

    packet_out = lsquic_send_ctl_get_packet_for_crypto(send_ctl,
                                    need_at_least, pns, stream->conn_pub->path);
    if (!packet_out)
        return SWTP_STOP;

    /* Remember where in the packet the frame begins */
    off = packet_out->po_data_sz;
    len = pf->pf_gen_crypto_frame(packet_out->po_data + packet_out->po_data_sz,
                lsquic_packet_out_avail(packet_out), 0, stream->tosend_off, 0,
                size, frame_std_gen_read, fg_ctx);
    if (len < 0)
        return SWTP_ERROR;

    EV_LOG_GENERATED_CRYPTO_FRAME(LSQUIC_LOG_CONN_ID, pf,
                            packet_out->po_data + packet_out->po_data_sz, len);
    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
    packet_out->po_frame_types |= 1 << QUIC_FRAME_CRYPTO;
    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
                                     stream, QUIC_FRAME_CRYPTO, off, len);
    if (s != 0)
    {
        LSQ_WARN("adding crypto stream to packet failed: %s", strerror(errno));
        return SWTP_ERROR;
    }

    packet_out->po_flags |= PO_HELLO;

    if (!(stream->sm_bflags & SMBF_IETF))
    {
        const unsigned short before = packet_out->po_data_sz;
        lsquic_packet_out_zero_pad(packet_out);
        /* XXX: too hacky */
        if (before < packet_out->po_data_sz)
            send_ctl->sc_bytes_scheduled += packet_out->po_data_sz
- before;
    }

    check_flush_threshold(stream);
    return SWTP_OK;
}


/* Request that the connection be aborted: set SMQF_ABORT_CONN on the
 * stream, making sure the stream is on the connection's service_streams
 * list, and make the connection tickable.
 */
static void
abort_connection (struct lsquic_stream *stream)
{
    /* Only insert if the stream is not on the service list already */
    if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
                                                next_service_stream);
    stream->sm_qflags |= SMQF_ABORT_CONN;
    LSQ_WARN("connection will be aborted");
    maybe_conn_to_tickable(stream);
}


/* Finalize the HQ frame that covers the current payload offset, if any.
 *
 * A fixed-size frame is released once the payload offset has reached its
 * end.  For a variable-size frame whose header has been placed into a
 * packet, the header is filled in with the frame type and the size, where
 * size is the payload written so far plus the buffered bytes.  If nothing
 * is buffered the frame is released; otherwise it is converted to a
 * fixed-size frame of that size.  A frame whose header has not been written
 * is left alone.  If the size does not fit into the reserved length field,
 * an internal error is reported on the connection and the frame is released.
 */
static void
maybe_close_varsize_hq_frame (struct lsquic_stream *stream)
{
    struct stream_hq_frame *shf;
    uint64_t size;
    unsigned bits;

    shf = find_cur_hq_frame(stream);
    if (!shf)
        return;

    if (shf->shf_flags & SHF_FIXED_SIZE)
    {
        if (shf->shf_off + shf->shf_frame_size <= stream->sm_payload)
            stream_hq_frame_put(stream, shf);
        return;
    }

    /* bits is 0 for a one-byte length field and 1 for a two-byte one */
    bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
    size = stream->sm_payload + stream->sm_n_buffered - shf->shf_off;
    if (size <= VINT_MAX_B(bits) && shf->shf_frame_ptr)
    {
        if (0 == stream->sm_n_buffered)
            LSQ_DEBUG("close HQ frame type 0x%X of size %"PRIu64,
                                                shf->shf_frame_type, size);
        else
            LSQ_DEBUG("convert HQ frame type 0x%X of to fixed %"PRIu64,
                                                shf->shf_frame_type, size);
        shf->shf_frame_ptr[0] = shf->shf_frame_type;
        vint_write(shf->shf_frame_ptr + 1, size, bits, 1 << bits);
        if (0 == stream->sm_n_buffered)
            stream_hq_frame_put(stream, shf);
        else
        {
            /* Buffered bytes are still to be sent as part of this frame */
            shf->shf_frame_size = size;
            shf->shf_flags |= SHF_FIXED_SIZE;
        }
    }
    else if (!shf->shf_frame_ptr)
        LSQ_DEBUG("HQ frame of type 0x%X has not yet been written, not "
            "closing", shf->shf_frame_type);
    else
    {
        assert(stream->sm_n_buffered);
        LSQ_ERROR("cannot close frame of size %"PRIu64" on stream %"PRIu64
            " -- too large", size, stream->id);
        stream->conn_pub->lconn->cn_if->ci_internal_error(
            stream->conn_pub->lconn, "HTTP/3 frame too large");
        stream_hq_frame_put(stream, shf);
    }
}


/* Packetize stream data: buffered bytes first, then bytes from `reader'.
 *
 * Frames are generated for as long as the amount of available data is at
 * least `thresh' (or, when `thresh' is zero, for as long as there is any
 * data) or a FIN needs to be sent.  When `thresh' is non-zero and `swo'
 * includes SWO_BUFFER, data left over below the threshold is saved to the
 * stream buffer.
 *
 * Returns the number of bytes consumed from `reader', or -1 on error.  In
 * the error case, connection abort has been requested if the
 * write-to-packet function reported an error.
 */
static ssize_t
stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
                         size_t thresh, enum stream_write_options swo)
{
    size_t size;
    ssize_t nw;
    unsigned seen_ok;
    int use_framing;
    struct frame_gen_ctx fg_ctx = {
        .fgc_stream = stream,
        .fgc_reader = reader,
        .fgc_nread_from_reader = 0,
        .fgc_thresh = thresh,
    };

#if LSQUIC_EXTRA_CHECKS
    if (stream->conn_pub)
        ++stream->conn_pub->wtp_level;
#endif
    /* HQ framing is used on IETF streams that carry HTTP headers */
    use_framing = (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
                                       == (SMBF_IETF|SMBF_USE_HEADERS);
    if (use_framing)
    {
        fg_ctx.fgc_size = frame_hq_gen_size;
        fg_ctx.fgc_read = frame_hq_gen_read;
        fg_ctx.fgc_fin  = frame_std_gen_fin; /* This seems to work for either? XXX */
    }
    else
    {
        fg_ctx.fgc_size = frame_std_gen_size;
        fg_ctx.fgc_read = frame_std_gen_read;
        fg_ctx.fgc_fin  = frame_std_gen_fin;
    }

    seen_ok = 0;
    while ((size = fg_ctx.fgc_size(&fg_ctx),
                        fg_ctx.fgc_thresh
                      ?
size >= fg_ctx.fgc_thresh : size > 0)
           || fg_ctx.fgc_fin(&fg_ctx))
    {
        switch (stream->sm_write_to_packet(&fg_ctx, size))
        {
        case SWTP_OK:
            /* Only on the first successfully written frame */
            if (!seen_ok++)
            {
                maybe_conn_to_tickable_if_writeable(stream, 0);
                maybe_update_last_progress(stream);
            }
            if (fg_ctx.fgc_fin(&fg_ctx))
            {
                if (use_framing && seen_ok)
                    maybe_close_varsize_hq_frame(stream);
                stream->stream_flags |= STREAM_FIN_SENT;
                goto end;
            }
            else
                break;
        case SWTP_STOP:
            /* Clearing this flag stops the write-events dispatch loop */
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            if (use_framing && seen_ok)
                maybe_close_varsize_hq_frame(stream);
            goto end;
        default:
            /* SWTP_ERROR and any other value */
            abort_connection(stream);
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            goto err;
        }
    }

    if (use_framing && seen_ok)
        maybe_close_varsize_hq_frame(stream);

    if (fg_ctx.fgc_thresh && (swo & SWO_BUFFER))
    {
        assert(size < fg_ctx.fgc_thresh);
        assert(size >= stream->sm_n_buffered);
        /* What remains after subtracting the already-buffered bytes is new
         * data from the reader that did not reach the threshold: buffer it.
         */
        size -= stream->sm_n_buffered;
        if (size > 0)
        {
            nw = save_to_buffer(stream, reader, size);
            if (nw < 0)
                goto err;
            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
        }
    }
#ifndef NDEBUG
    else if (swo & SWO_BUFFER)
    {
        /* We count flushed data towards both stream and connection limits,
         * so we should have been able to packetize all of it:
         */
        assert(0 == stream->sm_n_buffered);
        assert(size == 0);
    }
#endif

    maybe_mark_as_blocked(stream);

  end:
#if LSQUIC_EXTRA_CHECKS
    if (stream->conn_pub)
        --stream->conn_pub->wtp_level;
#endif
    return fg_ctx.fgc_nread_from_reader;

  err:
#if LSQUIC_EXTRA_CHECKS
    if (stream->conn_pub)
        --stream->conn_pub->wtp_level;
#endif
    return -1;
}


/* Perform an implicit flush when we hit connection limit while buffering
 * data.
This is to prevent a (theoretical) stall: 3469 * 3470 * Imagine a number of streams, all of which buffered some data. The buffered 3471 * data is up to connection cap, which means no further writes are possible. 3472 * None of them flushes, which means that data is not sent and connection 3473 * WINDOW_UPDATE frame never arrives from peer. Stall. 3474 */ 3475static int 3476maybe_flush_stream (struct lsquic_stream *stream) 3477{ 3478 if (stream->sm_n_buffered > 0 3479 && (stream->sm_bflags & SMBF_CONN_LIMITED) 3480 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 3481 return stream_flush_nocheck(stream); 3482 else 3483 return 0; 3484} 3485 3486 3487static int 3488stream_hq_frame_extendable (const struct stream_hq_frame *shf, uint64_t cur_off, 3489 unsigned len) 3490{ 3491 return (shf->shf_flags & (SHF_TWO_BYTES|SHF_FIXED_SIZE)) == 0 3492 && cur_off - shf->shf_off < (1 << 6) 3493 && cur_off - shf->shf_off + len >= (1 << 6) 3494 ; 3495} 3496 3497 3498/* Update currently buffered HQ frame or create a new one, if possible. 3499 * Return update length to be buffered. If a HQ frame cannot be 3500 * buffered due to size, 0 is returned, thereby preventing both HQ frame 3501 * creation and buffering. 
3502 */ 3503static size_t 3504update_buffered_hq_frames (struct lsquic_stream *stream, size_t len, 3505 size_t avail) 3506{ 3507 struct stream_hq_frame *shf; 3508 uint64_t cur_off, end; 3509 size_t frame_sz; 3510 unsigned extendable; 3511#if _MSC_VER 3512 end = 0; 3513 extendable = 0; 3514#endif 3515 3516 cur_off = stream->sm_payload + stream->sm_n_buffered; 3517 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 3518 if (shf->shf_off <= cur_off) 3519 { 3520 end = stream_hq_frame_end(shf); 3521 extendable = stream_hq_frame_extendable(shf, cur_off, len); 3522 if (cur_off < end + extendable) 3523 break; 3524 } 3525 3526 if (shf) 3527 { 3528 if (len > end + extendable - cur_off) 3529 len = end + extendable - cur_off; 3530 frame_sz = stream_hq_frame_size(shf); 3531 } 3532 else 3533 { 3534 assert(avail >= 3); 3535 shf = stream_activate_hq_frame(stream, cur_off, HQFT_DATA, 0, len); 3536 if (!shf) 3537 return 0; 3538 if (len > stream_hq_frame_end(shf) - cur_off) 3539 len = stream_hq_frame_end(shf) - cur_off; 3540 extendable = 0; 3541 frame_sz = stream_hq_frame_size(shf); 3542 if (avail < frame_sz) 3543 return 0; 3544 avail -= frame_sz; 3545 } 3546 3547 if (!(shf->shf_flags & SHF_CC_PAID)) 3548 { 3549 incr_conn_cap(stream, frame_sz); 3550 shf->shf_flags |= SHF_CC_PAID; 3551 } 3552 if (extendable) 3553 { 3554 shf->shf_flags |= SHF_TWO_BYTES; 3555 incr_conn_cap(stream, 1); 3556 avail -= 1; 3557 if ((stream->sm_qflags & SMQF_WANT_FLUSH) 3558 && shf->shf_off <= stream->sm_payload 3559 && stream_hq_frame_end(shf) >= stream->sm_flush_to_payload) 3560 stream->sm_flush_to += 1; 3561 } 3562 3563 if (len <= avail) 3564 return len; 3565 else 3566 return avail; 3567} 3568 3569 3570static ssize_t 3571save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 3572 size_t len) 3573{ 3574 size_t avail, n_written, n_allowed; 3575 3576 avail = lsquic_stream_write_avail(stream); 3577 if (avail < len) 3578 len = avail; 3579 if (len == 0) 3580 { 3581 LSQ_DEBUG("zero-byte write 
(avail: %zu)", avail); 3582 return 0; 3583 } 3584 3585 n_allowed = stream_get_n_allowed(stream); 3586 assert(stream->sm_n_buffered + len <= n_allowed); 3587 3588 if (!stream->sm_buf) 3589 { 3590 stream->sm_buf = malloc(n_allowed); 3591 if (!stream->sm_buf) 3592 return -1; 3593 stream->sm_n_allocated = n_allowed; 3594 } 3595 3596 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 3597 == (SMBF_IETF|SMBF_USE_HEADERS)) 3598 len = update_buffered_hq_frames(stream, len, avail); 3599 3600 n_written = reader->lsqr_read(reader->lsqr_ctx, 3601 stream->sm_buf + stream->sm_n_buffered, len); 3602 stream->sm_n_buffered += n_written; 3603 assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered); 3604 incr_conn_cap(stream, n_written); 3605 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 3606 n_written, stream->sm_n_buffered); 3607 if (0 != maybe_flush_stream(stream)) 3608 return -1; 3609 return n_written; 3610} 3611 3612 3613static ssize_t 3614stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader, 3615 enum stream_write_options swo) 3616{ 3617 const struct stream_hq_frame *shf; 3618 size_t thresh, len, frames, total_len, n_allowed, nwritten; 3619 ssize_t nw; 3620 3621 len = reader->lsqr_size(reader->lsqr_ctx); 3622 if (len == 0) 3623 return 0; 3624 3625 frames = 0; 3626 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 3627 == (SMBF_IETF|SMBF_USE_HEADERS)) 3628 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 3629 if (shf->shf_off >= stream->sm_payload) 3630 frames += stream_hq_frame_size(shf); 3631 total_len = len + frames + stream->sm_n_buffered; 3632 thresh = lsquic_stream_flush_threshold(stream, total_len); 3633 n_allowed = stream_get_n_allowed(stream); 3634 if (total_len <= n_allowed && total_len < thresh) 3635 { 3636 if (!(swo & SWO_BUFFER)) 3637 return 0; 3638 nwritten = 0; 3639 do 3640 { 3641 nw = save_to_buffer(stream, reader, len - nwritten); 3642 if (nw > 0) 3643 nwritten += (size_t) nw; 3644 else if (nw == 0) 
3645 break; 3646 else 3647 return nw; 3648 } 3649 while (nwritten < len 3650 && stream->sm_n_buffered < stream->sm_n_allocated); 3651 return nwritten; 3652 } 3653 else 3654 return stream_write_to_packets(stream, reader, thresh, swo); 3655} 3656 3657 3658ssize_t 3659lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 3660{ 3661 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 3662 return lsquic_stream_writev(stream, &iov, 1); 3663} 3664 3665 3666struct inner_reader_iovec { 3667 const struct iovec *iov; 3668 const struct iovec *end; 3669 unsigned cur_iovec_off; 3670}; 3671 3672 3673static size_t 3674inner_reader_iovec_read (void *ctx, void *buf, size_t count) 3675{ 3676 struct inner_reader_iovec *const iro = ctx; 3677 unsigned char *p = buf; 3678 unsigned char *const end = p + count; 3679 unsigned n_tocopy; 3680 3681 while (iro->iov < iro->end && p < end) 3682 { 3683 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 3684 if (n_tocopy > (unsigned) (end - p)) 3685 n_tocopy = end - p; 3686 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 3687 n_tocopy); 3688 p += n_tocopy; 3689 iro->cur_iovec_off += n_tocopy; 3690 if (iro->iov->iov_len == iro->cur_iovec_off) 3691 { 3692 ++iro->iov; 3693 iro->cur_iovec_off = 0; 3694 } 3695 } 3696 3697 return p + count - end; 3698} 3699 3700 3701static size_t 3702inner_reader_iovec_size (void *ctx) 3703{ 3704 struct inner_reader_iovec *const iro = ctx; 3705 const struct iovec *iov; 3706 size_t size; 3707 3708 size = 0; 3709 for (iov = iro->iov; iov < iro->end; ++iov) 3710 size += iov->iov_len; 3711 3712 return size - iro->cur_iovec_off; 3713} 3714 3715 3716ssize_t 3717lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 3718 int iovcnt) 3719{ 3720 COMMON_WRITE_CHECKS(); 3721 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 3722 3723 struct inner_reader_iovec iro = { 3724 .iov = iov, 3725 .end = iov + iovcnt, 3726 .cur_iovec_off = 0, 3727 }; 3728 struct 
lsquic_reader reader = { 3729 .lsqr_read = inner_reader_iovec_read, 3730 .lsqr_size = inner_reader_iovec_size, 3731 .lsqr_ctx = &iro, 3732 }; 3733 3734 return stream_write(stream, &reader, SWO_BUFFER); 3735} 3736 3737 3738ssize_t 3739lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) 3740{ 3741 COMMON_WRITE_CHECKS(); 3742 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 3743 return stream_write(stream, reader, SWO_BUFFER); 3744} 3745 3746 3747/* Configuration for lsquic_stream_pwritev: */ 3748#ifndef LSQUIC_PWRITEV_DEF_IOVECS 3749#define LSQUIC_PWRITEV_DEF_IOVECS 16 3750#endif 3751/* This is an overkill, this limit should only be reached during testing: */ 3752#ifndef LSQUIC_PWRITEV_DEF_FRAMES 3753#define LSQUIC_PWRITEV_DEF_FRAMES (LSQUIC_PWRITEV_DEF_IOVECS * 2) 3754#endif 3755 3756#ifdef NDEBUG 3757#define PWRITEV_IOVECS LSQUIC_PWRITEV_DEF_IOVECS 3758#define PWRITEV_FRAMES LSQUIC_PWRITEV_DEF_FRAMES 3759#else 3760#if _MSC_VER 3761#define MALLOC_PWRITEV 1 3762#else 3763#define MALLOC_PWRITEV 0 3764#endif 3765static unsigned 3766 PWRITEV_IOVECS = LSQUIC_PWRITEV_DEF_IOVECS, 3767 PWRITEV_FRAMES = LSQUIC_PWRITEV_DEF_FRAMES; 3768 3769void 3770lsquic_stream_set_pwritev_params (unsigned iovecs, unsigned frames) 3771{ 3772 PWRITEV_IOVECS = iovecs; 3773 PWRITEV_FRAMES = frames; 3774} 3775 3776 3777#endif 3778 3779struct pwritev_ctx 3780{ 3781 struct iovec *iov; 3782 const struct hq_arr *hq_arr; 3783 size_t total_bytes; 3784 size_t n_to_write; 3785 unsigned n_iovecs, max_iovecs; 3786}; 3787 3788 3789static size_t 3790pwritev_size (void *lsqr_ctx) 3791{ 3792 struct pwritev_ctx *const ctx = lsqr_ctx; 3793 3794 if (ctx->n_iovecs < ctx->max_iovecs 3795 && ctx->hq_arr->count < ctx->hq_arr->max) 3796 return ctx->n_to_write - ctx->total_bytes; 3797 else 3798 return 0; 3799} 3800 3801 3802static size_t 3803pwritev_read (void *lsqr_ctx, void *buf, size_t count) 3804{ 3805 struct pwritev_ctx *const ctx = lsqr_ctx; 3806 3807 assert(ctx->n_iovecs < 
ctx->max_iovecs); 3808 ctx->iov[ctx->n_iovecs].iov_base = buf; 3809 ctx->iov[ctx->n_iovecs].iov_len = count; 3810 ++ctx->n_iovecs; 3811 ctx->total_bytes += count; 3812 return count; 3813} 3814 3815 3816/* pwritev works as follows: allocate packets via lsquic_stream_writef() call 3817 * and record pointers and sizes into an iovec array. Then issue a single call 3818 * to user-supplied preadv() to populate all packets in one shot. 3819 * 3820 * Unwinding state changes due to a short write is by far the most complicated 3821 * part of the machinery that follows. We optimize the normal path: it should 3822 * be cheap to be prepared for the unwinding; unwinding itself can be more 3823 * expensive, as we do not expect it to happen often. 3824 */ 3825ssize_t 3826lsquic_stream_pwritev (struct lsquic_stream *stream, 3827 ssize_t (*preadv)(void *user_data, const struct iovec *iov, int iovcnt), 3828 void *user_data, size_t n_to_write) 3829{ 3830 struct lsquic_send_ctl *const ctl = stream->conn_pub->send_ctl; 3831#if MALLOC_PWRITEV 3832 struct iovec *iovecs; 3833 unsigned char **hq_frames; 3834#else 3835 struct iovec iovecs[PWRITEV_IOVECS]; 3836 unsigned char *hq_frames[PWRITEV_FRAMES]; 3837#endif 3838 struct iovec *last_iov; 3839 struct pwritev_ctx ctx; 3840 struct lsquic_reader reader; 3841 struct send_ctl_state ctl_state; 3842 struct hq_arr hq_arr; 3843 ssize_t nw; 3844 size_t n_allocated, sum; 3845#ifndef NDEBUG 3846 const unsigned short n_buffered = stream->sm_n_buffered; 3847#endif 3848 3849 COMMON_WRITE_CHECKS(); 3850 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 3851 3852#if MALLOC_PWRITEV 3853 iovecs = malloc(sizeof(iovecs[0]) * PWRITEV_IOVECS); 3854 hq_frames = malloc(sizeof(hq_frames[0]) * PWRITEV_FRAMES); 3855 if (!(iovecs && hq_frames)) 3856 { 3857 free(iovecs); 3858 free(hq_frames); 3859 return -1; 3860 } 3861#endif 3862 3863 lsquic_send_ctl_snapshot(ctl, &ctl_state); 3864 3865 ctx.total_bytes = 0; 3866 ctx.n_to_write = n_to_write; 3867 ctx.n_iovecs = 0; 3868 
ctx.max_iovecs = PWRITEV_IOVECS; 3869 ctx.iov = iovecs; 3870 ctx.hq_arr = &hq_arr; 3871 3872 hq_arr.p = hq_frames; 3873 hq_arr.count = 0; 3874 hq_arr.max = PWRITEV_FRAMES; 3875 stream->sm_hq_arr = &hq_arr; 3876 3877 reader.lsqr_ctx = &ctx; 3878 reader.lsqr_size = pwritev_size; 3879 reader.lsqr_read = pwritev_read; 3880 3881 nw = stream_write(stream, &reader, 0); 3882 LSQ_DEBUG("pwritev: stream_write returned %zd, n_iovecs: %d", nw, 3883 ctx.n_iovecs); 3884 if (nw > 0) 3885 { 3886 /* Amount of buffered data shouldn't have increased */ 3887 assert(n_buffered >= stream->sm_n_buffered); 3888 n_allocated = (size_t) nw; 3889 nw = preadv(user_data, ctx.iov, ctx.n_iovecs); 3890 LSQ_DEBUG("pwritev: preadv returned %zd", nw); 3891 if (nw >= 0 && (size_t) nw < n_allocated) 3892 goto unwind_short_write; 3893 } 3894 3895 cleanup: 3896 stream->sm_hq_arr = NULL; 3897#if MALLOC_PWRITEV 3898 free(iovecs); 3899 free(hq_frames); 3900#endif 3901 return nw; 3902 3903 unwind_short_write: 3904 /* What follows is not the most efficient process. The emphasis here is 3905 * on being simple instead. We expect short writes to be rare, so being 3906 * slower than possible is a good tradeoff for being correct. 3907 */ 3908 LSQ_DEBUG("short write occurred, unwind"); 3909 SM_HISTORY_APPEND(stream, SHE_SHORT_WRITE); 3910 3911 /* First, adjust connection cap and stream offsets, and HTTP/3 framing, 3912 * if necessary. 
3913 */ 3914 if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF)) 3915 == (SMBF_USE_HEADERS|SMBF_IETF)) 3916 { 3917 size_t shortfall, payload_sz, decr; 3918 unsigned char *p; 3919 unsigned bits; 3920 3921 assert(hq_arr.count > 0); 3922 shortfall = n_allocated - (size_t) nw; 3923 do 3924 { 3925 const unsigned count = hq_arr.count; 3926 (void) count; 3927 p = hq_frames[--hq_arr.count]; 3928 assert(p[0] == HQFT_DATA); 3929 assert(!(p[1] & 0x80)); /* Only one- and two-byte frame sizes */ 3930 if (p[1] & 0x40) 3931 { 3932 payload_sz = (p[1] & 0x3F) << 8; 3933 payload_sz |= p[2]; 3934 } 3935 else 3936 payload_sz = p[1]; 3937 if (payload_sz > shortfall) 3938 { 3939 bits = p[1] >> 6; 3940 vint_write(p + 1, payload_sz - shortfall, bits, 1 << bits); 3941 decr = shortfall; 3942 if (stream->sm_bflags & SMBF_CONN_LIMITED) 3943 stream->conn_pub->conn_cap.cc_sent -= decr; 3944 stream->sm_payload -= decr; 3945 stream->tosend_off -= decr; 3946 shortfall = 0; 3947 } 3948 else 3949 { 3950 decr = payload_sz + 2 + (p[1] >> 6); 3951 if (stream->sm_bflags & SMBF_CONN_LIMITED) 3952 stream->conn_pub->conn_cap.cc_sent -= decr; 3953 stream->sm_payload -= payload_sz; 3954 stream->tosend_off -= decr; 3955 shortfall -= payload_sz; 3956 } 3957 } 3958 while (hq_arr.count); 3959 assert(shortfall == 0); 3960 } 3961 else 3962 { 3963 const size_t shortfall = n_allocated - (size_t) nw; 3964 if (stream->sm_bflags & SMBF_CONN_LIMITED) 3965 stream->conn_pub->conn_cap.cc_sent -= shortfall; 3966 stream->sm_payload -= shortfall; 3967 stream->tosend_off -= shortfall; 3968 } 3969 3970 /* Find last iovec: */ 3971 sum = 0; 3972 for (last_iov = iovecs; last_iov < iovecs + PWRITEV_IOVECS; ++last_iov) 3973 { 3974 sum += last_iov->iov_len; 3975 if ((last_iov == iovecs || (size_t) nw > sum - last_iov->iov_len) 3976 && (size_t) nw <= sum) 3977 break; 3978 } 3979 assert(last_iov < iovecs + PWRITEV_IOVECS); 3980 lsquic_send_ctl_rollback(ctl, &ctl_state, last_iov, sum - nw); 3981 3982 goto cleanup; 3983} 3984 3985 
3986/* This bypasses COMMON_WRITE_CHECKS */ 3987static ssize_t 3988stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz) 3989{ 3990 const struct iovec iov[1] = {{ (void *) buf, sz, }}; 3991 struct inner_reader_iovec iro = { 3992 .iov = iov, 3993 .end = iov + 1, 3994 .cur_iovec_off = 0, 3995 }; 3996 struct lsquic_reader reader = { 3997 .lsqr_read = inner_reader_iovec_read, 3998 .lsqr_size = inner_reader_iovec_size, 3999 .lsqr_ctx = &iro, 4000 }; 4001 return stream_write(stream, &reader, SWO_BUFFER); 4002} 4003 4004 4005/* This limits the cumulative size of the compressed header fields */ 4006#define MAX_HEADERS_SIZE (64 * 1024) 4007 4008static int 4009send_headers_ietf (struct lsquic_stream *stream, 4010 const struct lsquic_http_headers *headers, int eos) 4011{ 4012 enum qwh_status qwh; 4013 const size_t max_prefix_size = 4014 lsquic_qeh_max_prefix_size(stream->conn_pub->u.ietf.qeh); 4015 const size_t max_push_size = 1 /* Stream type */ + 8 /* Push ID */; 4016 size_t prefix_sz, headers_sz, hblock_sz, push_sz; 4017 unsigned bits; 4018 ssize_t nw; 4019 unsigned char *header_block; 4020 enum lsqpack_enc_header_flags hflags; 4021 int rv; 4022 const size_t buf_sz = max_push_size + max_prefix_size + MAX_HEADERS_SIZE; 4023#ifndef WIN32 4024 unsigned char buf[buf_sz]; 4025#else 4026 unsigned char *buf = _malloca(buf_sz); 4027 if (!buf) 4028 return -1; 4029#endif 4030 4031 stream->stream_flags &= ~STREAM_PUSHING; 4032 stream->stream_flags |= STREAM_NOPUSH; 4033 4034 /* TODO: Optimize for the common case: write directly to sm_buf and fall 4035 * back to a larger buffer if that fails. 
4036 */ 4037 prefix_sz = max_prefix_size; 4038 headers_sz = buf_sz - max_prefix_size - max_push_size; 4039 qwh = lsquic_qeh_write_headers(stream->conn_pub->u.ietf.qeh, stream->id, 0, 4040 headers, buf + max_push_size + max_prefix_size, &prefix_sz, 4041 &headers_sz, &stream->sm_hb_compl, &hflags); 4042 4043 if (!(qwh == QWH_FULL || qwh == QWH_PARTIAL)) 4044 { 4045 if (qwh == QWH_ENOBUF) 4046 LSQ_INFO("not enough room for header block"); 4047 else 4048 LSQ_WARN("internal error encoding and sending HTTP headers"); 4049 goto err; 4050 } 4051 4052 if (hflags & LSQECH_REF_NEW_ENTRIES) 4053 stream->stream_flags |= STREAM_ENCODER_DEP; 4054 4055 if (stream->sm_promise) 4056 { 4057 assert(lsquic_stream_is_pushed(stream)); 4058 bits = vint_val2bits(stream->sm_promise->pp_id); 4059 push_sz = 1 + (1 << bits); 4060 if (!stream_activate_hq_frame(stream, 4061 stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PREAMBLE, 4062 SHF_FIXED_SIZE|SHF_PHANTOM, push_sz)) 4063 goto err; 4064 buf[max_push_size + max_prefix_size - prefix_sz - push_sz] = HQUST_PUSH; 4065 vint_write(buf + max_push_size + max_prefix_size - prefix_sz 4066 - push_sz + 1,stream->sm_promise->pp_id, bits, 1 << bits); 4067 } 4068 else 4069 push_sz = 0; 4070 4071 /* Construct contiguous header block buffer including HQ framing */ 4072 header_block = buf + max_push_size + max_prefix_size - prefix_sz - push_sz; 4073 hblock_sz = push_sz + prefix_sz + headers_sz; 4074 if (!stream_activate_hq_frame(stream, 4075 stream->sm_payload + stream->sm_n_buffered + push_sz, 4076 HQFT_HEADERS, SHF_FIXED_SIZE, hblock_sz - push_sz)) 4077 goto err; 4078 4079 if (qwh == QWH_FULL) 4080 { 4081 stream->sm_send_headers_state = SSHS_HBLOCK_SENDING; 4082 if (lsquic_stream_write_avail(stream)) 4083 { 4084 nw = stream_write_buf(stream, header_block, hblock_sz); 4085 if (nw < 0) 4086 { 4087 LSQ_WARN("cannot write to stream: %s", strerror(errno)); 4088 goto err; 4089 } 4090 if ((size_t) nw == hblock_sz) 4091 { 4092 stream->stream_flags |= 
STREAM_HEADERS_SENT; 4093 stream_hblock_sent(stream); 4094 LSQ_DEBUG("wrote all %zu bytes of header block", hblock_sz); 4095 goto end; 4096 } 4097 LSQ_DEBUG("wrote only %zd bytes of header block, stash", nw); 4098 } 4099 else 4100 { 4101 LSQ_DEBUG("cannot write to stream, stash all %zu bytes of " 4102 "header block", hblock_sz); 4103 nw = 0; 4104 } 4105 } 4106 else 4107 { 4108 stream->sm_send_headers_state = SSHS_ENC_SENDING; 4109 nw = 0; 4110 } 4111 4112 stream->sm_saved_want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 4113 stream_wantwrite(stream, 1); 4114 4115 stream->sm_header_block = malloc(hblock_sz - (size_t) nw); 4116 if (!stream->sm_header_block) 4117 { 4118 LSQ_WARN("cannot allocate %zd bytes to stash %s header block", 4119 hblock_sz - (size_t) nw, qwh == QWH_FULL ? "full" : "partial"); 4120 goto err; 4121 } 4122 memcpy(stream->sm_header_block, header_block + (size_t) nw, 4123 hblock_sz - (size_t) nw); 4124 stream->sm_hblock_sz = hblock_sz - (size_t) nw; 4125 stream->sm_hblock_off = 0; 4126 LSQ_DEBUG("stashed %u bytes of header block", stream->sm_hblock_sz); 4127 4128 end: 4129 rv = 0; 4130 clean: 4131#ifdef WIN32 4132 _freea(buf); 4133#endif 4134 return rv; 4135 4136 err: 4137 rv = -1; 4138 goto clean; 4139} 4140 4141 4142static int 4143send_headers_gquic (struct lsquic_stream *stream, 4144 const struct lsquic_http_headers *headers, int eos) 4145{ 4146 int s = lsquic_headers_stream_send_headers(stream->conn_pub->u.gquic.hs, 4147 stream->id, headers, eos, lsquic_stream_priority(stream)); 4148 if (0 == s) 4149 { 4150 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 4151 stream->stream_flags |= STREAM_HEADERS_SENT; 4152 if (eos) 4153 stream->stream_flags |= STREAM_FIN_SENT; 4154 LSQ_INFO("sent headers"); 4155 } 4156 else 4157 LSQ_WARN("could not send headers: %s", strerror(errno)); 4158 return s; 4159} 4160 4161 4162int 4163lsquic_stream_send_headers (lsquic_stream_t *stream, 4164 const lsquic_http_headers_t *headers, int eos) 4165{ 4166 if 
((stream->sm_bflags & SMBF_USE_HEADERS) 4167 && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_U_WRITE_DONE))) 4168 { 4169 if (stream->sm_bflags & SMBF_IETF) 4170 return send_headers_ietf(stream, headers, eos); 4171 else 4172 return send_headers_gquic(stream, headers, eos); 4173 } 4174 else 4175 { 4176 LSQ_INFO("cannot send headers in this state"); 4177 errno = EBADMSG; 4178 return -1; 4179 } 4180} 4181 4182 4183void 4184lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 4185{ 4186 if (offset > stream->max_send_off) 4187 { 4188 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 4189 LSQ_DEBUG("update max send offset from %"PRIu64" to " 4190 "%"PRIu64, stream->max_send_off, offset); 4191 stream->max_send_off = offset; 4192 } 4193 else 4194 LSQ_DEBUG("new offset %"PRIu64" is not larger than old " 4195 "max send offset %"PRIu64", ignoring", offset, 4196 stream->max_send_off); 4197} 4198 4199 4200/* This function is used to update offsets after handshake completes and we 4201 * learn of peer's limits from the handshake values. 
4202 */ 4203int 4204lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset) 4205{ 4206 LSQ_DEBUG("setting max_send_off to %"PRIu64, offset); 4207 if (offset > stream->max_send_off) 4208 { 4209 lsquic_stream_window_update(stream, offset); 4210 return 0; 4211 } 4212 else if (offset < stream->tosend_off) 4213 { 4214 LSQ_INFO("new offset (%"PRIu64" bytes) is smaller than the amount of " 4215 "data already sent on this stream (%"PRIu64" bytes)", offset, 4216 stream->tosend_off); 4217 return -1; 4218 } 4219 else 4220 { 4221 stream->max_send_off = offset; 4222 return 0; 4223 } 4224} 4225 4226 4227void 4228lsquic_stream_reset (lsquic_stream_t *stream, uint64_t error_code) 4229{ 4230 lsquic_stream_reset_ext(stream, error_code, 1); 4231} 4232 4233 4234void 4235lsquic_stream_reset_ext (lsquic_stream_t *stream, uint64_t error_code, 4236 int do_close) 4237{ 4238 if ((stream->stream_flags & STREAM_RST_SENT) 4239 || (stream->sm_qflags & SMQF_SEND_RST)) 4240 { 4241 LSQ_INFO("reset already sent"); 4242 return; 4243 } 4244 4245 SM_HISTORY_APPEND(stream, SHE_RESET); 4246 4247 LSQ_INFO("reset, error code %"PRIu64, error_code); 4248 stream->error_code = error_code; 4249 4250 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 4251 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 4252 next_send_stream); 4253 stream->sm_qflags &= ~SMQF_SENDING_FLAGS; 4254 stream->sm_qflags |= SMQF_SEND_RST; 4255 4256 if (stream->sm_qflags & SMQF_QPACK_DEC) 4257 { 4258 lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream); 4259 stream->sm_qflags &= ~SMQF_QPACK_DEC; 4260 } 4261 4262 drop_buffered_data(stream); 4263 maybe_elide_stream_frames(stream); 4264 maybe_schedule_call_on_close(stream); 4265 4266 if (do_close) 4267 lsquic_stream_close(stream); 4268 else 4269 maybe_conn_to_tickable_if_writeable(stream, 1); 4270} 4271 4272 4273lsquic_stream_id_t 4274lsquic_stream_id (const lsquic_stream_t *stream) 4275{ 4276 return stream->id; 4277} 4278 4279 4280#if !defined(NDEBUG) 
&& __GNUC__ 4281__attribute__((weak)) 4282#endif 4283struct lsquic_conn * 4284lsquic_stream_conn (const lsquic_stream_t *stream) 4285{ 4286 return stream->conn_pub->lconn; 4287} 4288 4289 4290int 4291lsquic_stream_close (lsquic_stream_t *stream) 4292{ 4293 LSQ_DEBUG("lsquic_stream_close() called"); 4294 SM_HISTORY_APPEND(stream, SHE_CLOSE); 4295 if (lsquic_stream_is_closed(stream)) 4296 { 4297 LSQ_INFO("Attempt to close an already-closed stream"); 4298 errno = EBADF; 4299 return -1; 4300 } 4301 maybe_stream_shutdown_write(stream); 4302 stream_shutdown_read(stream); 4303 maybe_schedule_call_on_close(stream); 4304 maybe_finish_stream(stream); 4305 if (!(stream->stream_flags & STREAM_DELAYED_SW)) 4306 maybe_conn_to_tickable_if_writeable(stream, 1); 4307 return 0; 4308} 4309 4310 4311#ifndef NDEBUG 4312#if __GNUC__ 4313__attribute__((weak)) 4314#endif 4315#endif 4316void 4317lsquic_stream_acked (struct lsquic_stream *stream, 4318 enum quic_frame_type frame_type) 4319{ 4320 assert(stream->n_unacked); 4321 --stream->n_unacked; 4322 LSQ_DEBUG("ACKed; n_unacked: %u", stream->n_unacked); 4323 if (frame_type == QUIC_FRAME_RST_STREAM) 4324 { 4325 SM_HISTORY_APPEND(stream, SHE_RST_ACKED); 4326 LSQ_DEBUG("RESET that we sent has been acked by peer"); 4327 stream->stream_flags |= STREAM_RST_ACKED; 4328 } 4329 if (0 == stream->n_unacked) 4330 { 4331 maybe_schedule_call_on_close(stream); 4332 maybe_finish_stream(stream); 4333 } 4334} 4335 4336 4337void 4338lsquic_stream_push_req (lsquic_stream_t *stream, 4339 struct uncompressed_headers *push_req) 4340{ 4341 assert(!stream->push_req); 4342 stream->push_req = push_req; 4343 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 4344} 4345 4346 4347int 4348lsquic_stream_is_pushed (const lsquic_stream_t *stream) 4349{ 4350 enum stream_id_type sit; 4351 4352 switch (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 4353 { 4354 case SMBF_IETF|SMBF_USE_HEADERS: 4355 sit = stream->id & SIT_MASK; 4356 return sit == 
SIT_UNI_SERVER; 4357 case SMBF_USE_HEADERS: 4358 return 1 & ~stream->id; 4359 default: 4360 return 0; 4361 } 4362} 4363 4364 4365int 4366lsquic_stream_push_info (const lsquic_stream_t *stream, 4367 lsquic_stream_id_t *ref_stream_id, void **hset) 4368{ 4369 if (lsquic_stream_is_pushed(stream)) 4370 { 4371 assert(stream->push_req); 4372 *ref_stream_id = stream->push_req->uh_stream_id; 4373 *hset = stream->push_req->uh_hset; 4374 return 0; 4375 } 4376 else 4377 return -1; 4378} 4379 4380 4381static int 4382stream_uh_in_gquic (struct lsquic_stream *stream, 4383 struct uncompressed_headers *uh) 4384{ 4385 if ((stream->sm_bflags & SMBF_USE_HEADERS) 4386 && !(stream->stream_flags & STREAM_HAVE_UH)) 4387 { 4388 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 4389 LSQ_DEBUG("received uncompressed headers"); 4390 stream->stream_flags |= STREAM_HAVE_UH; 4391 if (uh->uh_flags & UH_FIN) 4392 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 4393 stream->uh = uh; 4394 if (uh->uh_oth_stream_id == 0) 4395 { 4396 if (uh->uh_weight) 4397 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 4398 } 4399 else 4400 LSQ_NOTICE("don't know how to depend on stream %"PRIu64, 4401 uh->uh_oth_stream_id); 4402 return 0; 4403 } 4404 else 4405 { 4406 LSQ_ERROR("received unexpected uncompressed headers"); 4407 return -1; 4408 } 4409} 4410 4411 4412static int 4413stream_uh_in_ietf (struct lsquic_stream *stream, 4414 struct uncompressed_headers *uh) 4415{ 4416 int push_promise; 4417 4418 push_promise = lsquic_stream_header_is_pp(stream); 4419 if (!(stream->stream_flags & STREAM_HAVE_UH) && !push_promise) 4420 { 4421 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 4422 LSQ_DEBUG("received uncompressed headers"); 4423 stream->stream_flags |= STREAM_HAVE_UH; 4424 if (uh->uh_flags & UH_FIN) 4425 { 4426 /* IETF QUIC only sets UH_FIN for a pushed stream on the server to 4427 * mark request as done: 4428 */ 4429 if (stream->sm_bflags & SMBF_IETF) 4430 assert((stream->sm_bflags & SMBF_SERVER) 4431 
&& lsquic_stream_is_pushed(stream)); 4432 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 4433 } 4434 stream->uh = uh; 4435 if (uh->uh_oth_stream_id == 0) 4436 { 4437 if (uh->uh_weight) 4438 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 4439 } 4440 else 4441 LSQ_NOTICE("don't know how to depend on stream %"PRIu64, 4442 uh->uh_oth_stream_id); 4443 } 4444 else 4445 { 4446 /* Trailer should never make here, as we discard it in qdh */ 4447 LSQ_DEBUG("discard %s header set", 4448 push_promise ? "push promise" : "trailer"); 4449 if (uh->uh_hset) 4450 stream->conn_pub->enpub->enp_hsi_if 4451 ->hsi_discard_header_set(uh->uh_hset); 4452 free(uh); 4453 } 4454 4455 return 0; 4456} 4457 4458 4459int 4460lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 4461{ 4462 if (stream->sm_bflags & SMBF_USE_HEADERS) 4463 { 4464 if (stream->sm_bflags & SMBF_IETF) 4465 return stream_uh_in_ietf(stream, uh); 4466 else 4467 return stream_uh_in_gquic(stream, uh); 4468 } 4469 else 4470 return -1; 4471} 4472 4473 4474unsigned 4475lsquic_stream_priority (const lsquic_stream_t *stream) 4476{ 4477 if (stream->sm_bflags & SMBF_HTTP_PRIO) 4478 return stream->sm_priority; 4479 else 4480 return 256 - stream->sm_priority; 4481} 4482 4483 4484int 4485lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 4486{ 4487 /* The user should never get a reference to the special streams, 4488 * but let's check just in case: 4489 */ 4490 if (lsquic_stream_is_critical(stream)) 4491 return -1; 4492 4493 if (stream->sm_bflags & SMBF_HTTP_PRIO) 4494 { 4495 if (priority > LSQUIC_MAX_HTTP_URGENCY) 4496 return -1; 4497 stream->sm_priority = priority; 4498 } 4499 else 4500 { 4501 if (priority < 1 || priority > 256) 4502 return -1; 4503 stream->sm_priority = 256 - priority; 4504 } 4505 4506 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 4507 LSQ_DEBUG("set priority to %u", priority); 4508 SM_HISTORY_APPEND(stream, 
SHE_SET_PRIO); 4509 return 0; 4510} 4511 4512 4513static int 4514maybe_send_priority_gquic (struct lsquic_stream *stream, unsigned priority) 4515{ 4516 if ((stream->sm_bflags & SMBF_USE_HEADERS) 4517 && (stream->stream_flags & STREAM_HEADERS_SENT)) 4518 { 4519 /* We need to send headers only if we are a) using HEADERS stream 4520 * and b) we already sent initial headers. If initial headers 4521 * have not been sent yet, stream priority will be sent in the 4522 * HEADERS frame. 4523 */ 4524 return lsquic_headers_stream_send_priority(stream->conn_pub->u.gquic.hs, 4525 stream->id, 0, 0, priority); 4526 } 4527 else 4528 return 0; 4529} 4530 4531 4532static int 4533send_priority_ietf (struct lsquic_stream *stream) 4534{ 4535 struct lsquic_ext_http_prio ehp; 4536 4537 if (0 == lsquic_stream_get_http_prio(stream, &ehp) 4538 && 0 == lsquic_hcso_write_priority_update( 4539 stream->conn_pub->u.ietf.hcso, 4540 HQFT_PRIORITY_UPDATE_STREAM, stream->id, &ehp)) 4541 return 0; 4542 else 4543 return -1; 4544} 4545 4546 4547int 4548lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 4549{ 4550 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 4551 { 4552 if (stream->sm_bflags & SMBF_IETF) 4553 { 4554 if (stream->sm_bflags & SMBF_HTTP_PRIO) 4555 return send_priority_ietf(stream); 4556 else 4557 return 0; 4558 } 4559 else 4560 return maybe_send_priority_gquic(stream, priority); 4561 } 4562 else 4563 return -1; 4564} 4565 4566 4567lsquic_stream_ctx_t * 4568lsquic_stream_get_ctx (const lsquic_stream_t *stream) 4569{ 4570 fiu_return_on("stream/get_ctx", NULL); 4571 return stream->st_ctx; 4572} 4573 4574 4575int 4576lsquic_stream_refuse_push (lsquic_stream_t *stream) 4577{ 4578 if (lsquic_stream_is_pushed(stream) 4579 && !(stream->sm_qflags & SMQF_SEND_RST) 4580 && !(stream->stream_flags & STREAM_RST_SENT)) 4581 { 4582 LSQ_DEBUG("refusing pushed stream: send reset"); 4583 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 4584 return 0; 4585 
} 4586 else 4587 return -1; 4588} 4589 4590 4591size_t 4592lsquic_stream_mem_used (const struct lsquic_stream *stream) 4593{ 4594 size_t size; 4595 4596 size = sizeof(stream); 4597 if (stream->sm_buf) 4598 size += stream->sm_n_allocated; 4599 if (stream->data_in) 4600 size += stream->data_in->di_if->di_mem_used(stream->data_in); 4601 4602 return size; 4603} 4604 4605 4606const lsquic_cid_t * 4607lsquic_stream_cid (const struct lsquic_stream *stream) 4608{ 4609 return LSQUIC_LOG_CONN_ID; 4610} 4611 4612 4613void 4614lsquic_stream_dump_state (const struct lsquic_stream *stream) 4615{ 4616 LSQ_DEBUG("flags: %X; read off: %"PRIu64, stream->stream_flags, 4617 stream->read_offset); 4618 stream->data_in->di_if->di_dump_state(stream->data_in); 4619} 4620 4621 4622void * 4623lsquic_stream_get_hset (struct lsquic_stream *stream) 4624{ 4625 void *hset; 4626 4627 if (stream_is_read_reset(stream)) 4628 { 4629 LSQ_INFO("%s: stream is reset, no headers returned", __func__); 4630 errno = ECONNRESET; 4631 return NULL; 4632 } 4633 4634 if (!((stream->sm_bflags & SMBF_USE_HEADERS) 4635 && (stream->stream_flags & STREAM_HAVE_UH))) 4636 { 4637 LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__, 4638 stream->stream_flags); 4639 return NULL; 4640 } 4641 4642 if (!stream->uh) 4643 { 4644 LSQ_INFO("%s: headers unavailable (already fetched?)", __func__); 4645 return NULL; 4646 } 4647 4648 hset = stream->uh->uh_hset; 4649 stream->uh->uh_hset = NULL; 4650 destroy_uh(stream); 4651 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 4652 { 4653 stream->stream_flags |= STREAM_FIN_REACHED; 4654 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 4655 } 4656 maybe_update_last_progress(stream); 4657 LSQ_DEBUG("return header set"); 4658 return hset; 4659} 4660 4661 4662void 4663lsquic_stream_set_stream_if (struct lsquic_stream *stream, 4664 const struct lsquic_stream_if *stream_if, void *stream_if_ctx) 4665{ 4666 SM_HISTORY_APPEND(stream, SHE_IF_SWITCH); 4667 stream->stream_if = stream_if; 4668 
stream->sm_onnew_arg = stream_if_ctx; 4669 LSQ_DEBUG("switched interface"); 4670 assert(stream->stream_flags & STREAM_ONNEW_DONE); 4671 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 4672 stream); 4673} 4674 4675 4676static int 4677update_type_hist_and_check (const struct lsquic_stream *stream, 4678 struct hq_filter *filter) 4679{ 4680 /* 3-bit codes: */ 4681 enum { 4682 CODE_UNSET, 4683 CODE_HEADER, /* H Header */ 4684 CODE_DATA, /* D Data */ 4685 CODE_PLUS, /* + Plus: meaning previous frame repeats */ 4686 }; 4687 static const unsigned valid_seqs[] = { 4688 /* Ordered by expected frequency */ 4689 0123, /* HD+ */ 4690 012, /* HD */ 4691 01, /* H */ 4692 013, /* H+ */ /* Really HH, but we don't record it like this */ 4693 01231, /* HD+H */ 4694 0121, /* HDH */ 4695 }; 4696 unsigned code, i; 4697 4698 switch (filter->hqfi_type) 4699 { 4700 case HQFT_HEADERS: 4701 code = CODE_HEADER; 4702 break; 4703 case HQFT_DATA: 4704 code = CODE_DATA; 4705 break; 4706 case HQFT_PUSH_PROMISE: 4707 /* [draft-ietf-quic-http-24], Section 7 */ 4708 if ((stream->id & SIT_MASK) == SIT_BIDI_CLIENT 4709 && !(stream->sm_bflags & SMBF_SERVER)) 4710 return 0; 4711 else 4712 return -1; 4713 case HQFT_CANCEL_PUSH: 4714 case HQFT_SETTINGS: 4715 case HQFT_GOAWAY: 4716 case HQFT_MAX_PUSH_ID: 4717 /* [draft-ietf-quic-http-24], Section 7 */ 4718 return -1; 4719 case 2: /* HTTP/2 PRIORITY */ 4720 case 6: /* HTTP/2 PING */ 4721 case 8: /* HTTP/2 WINDOW_UPDATE */ 4722 case 9: /* HTTP/2 CONTINUATION */ 4723 /* [draft-ietf-quic-http-30], Section 7.2.8 */ 4724 return -1; 4725 case HQFT_PRIORITY_UPDATE_STREAM: 4726 case HQFT_PRIORITY_UPDATE_PUSH: 4727 if (stream->sm_bflags & SMBF_HTTP_PRIO) 4728 /* If we know about Extensible HTTP Priorities, we should check 4729 * that they do not arrive on any but the control stream: 4730 */ 4731 return -1; 4732 else 4733 /* On the other hand, if we do not support Priorities, treat it 4734 * as an unknown frame: 4735 */ 4736 return 0; 4737 
default: 4738 /* Ignore unknown frames */ 4739 return 0; 4740 } 4741 4742 if (filter->hqfi_hist_idx >= MAX_HQFI_ENTRIES) 4743 return -1; 4744 4745 if (filter->hqfi_hist_idx && (filter->hqfi_hist_buf & 7) == code) 4746 { 4747 filter->hqfi_hist_buf <<= 3; 4748 filter->hqfi_hist_buf |= CODE_PLUS; 4749 filter->hqfi_hist_idx++; 4750 } 4751 else if (filter->hqfi_hist_idx > 1 4752 && ((filter->hqfi_hist_buf >> 3) & 7) == code 4753 && (filter->hqfi_hist_buf & 7) == CODE_PLUS) 4754 /* Keep it at plus, do nothing */; 4755 else 4756 { 4757 filter->hqfi_hist_buf <<= 3; 4758 filter->hqfi_hist_buf |= code; 4759 filter->hqfi_hist_idx++; 4760 } 4761 4762 for (i = 0; i < sizeof(valid_seqs) / sizeof(valid_seqs[0]); ++i) 4763 if (filter->hqfi_hist_buf == valid_seqs[i]) 4764 return 0; 4765 4766 return -1; 4767} 4768 4769 4770int 4771lsquic_stream_header_is_pp (const struct lsquic_stream *stream) 4772{ 4773 return stream->sm_hq_filter.hqfi_type == HQFT_PUSH_PROMISE; 4774} 4775 4776 4777int 4778lsquic_stream_header_is_trailer (const struct lsquic_stream *stream) 4779{ 4780 return (stream->stream_flags & STREAM_HAVE_UH) 4781 && stream->sm_hq_filter.hqfi_type == HQFT_HEADERS; 4782} 4783 4784 4785static void 4786verify_cl_on_new_data_frame (struct lsquic_stream *stream, 4787 struct hq_filter *filter) 4788{ 4789 struct lsquic_conn *lconn; 4790 4791 stream->sm_data_in += filter->hqfi_left; 4792 if (stream->sm_data_in > stream->sm_cont_len) 4793 { 4794 lconn = stream->conn_pub->lconn; 4795 lconn->cn_if->ci_abort_error(lconn, 1, HEC_GENERAL_PROTOCOL_ERROR, 4796 "number of bytes in DATA frames of stream %"PRIu64" exceeds " 4797 "content-length limit of %llu", stream->id, stream->sm_cont_len); 4798 } 4799} 4800 4801 4802static size_t 4803hq_read (void *ctx, const unsigned char *buf, size_t sz, int fin) 4804{ 4805 struct lsquic_stream *const stream = ctx; 4806 struct hq_filter *const filter = &stream->sm_hq_filter; 4807 const unsigned char *p = buf, *prev; 4808 const unsigned char *const end = 
buf + sz; 4809 struct lsquic_conn *lconn; 4810 enum lsqpack_read_header_status rhs; 4811 int s; 4812 4813 while (p < end) 4814 { 4815 switch (filter->hqfi_state) 4816 { 4817 case HQFI_STATE_FRAME_HEADER_BEGIN: 4818 filter->hqfi_vint2_state.vr2s_state = 0; 4819 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_CONTINUE; 4820 /* fall-through */ 4821 case HQFI_STATE_FRAME_HEADER_CONTINUE: 4822 s = lsquic_varint_read_two(&p, end, &filter->hqfi_vint2_state); 4823 if (s < 0) 4824 break; 4825 filter->hqfi_flags |= HQFI_FLAG_BEGIN; 4826 filter->hqfi_state = HQFI_STATE_READING_PAYLOAD; 4827 LSQ_DEBUG("HQ frame type 0x%"PRIX64" at offset %"PRIu64", size %"PRIu64, 4828 filter->hqfi_type, stream->read_offset + (unsigned) (p - buf), 4829 filter->hqfi_left); 4830 if (0 != update_type_hist_and_check(stream, filter)) 4831 { 4832 lconn = stream->conn_pub->lconn; 4833 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4834 LSQ_INFO("unexpected HTTP/3 frame sequence: %o", 4835 filter->hqfi_hist_buf); 4836 lconn->cn_if->ci_abort_error(lconn, 1, HEC_FRAME_UNEXPECTED, 4837 "unexpected HTTP/3 frame sequence on stream %"PRIu64, 4838 stream->id); 4839 goto end; 4840 } 4841 if (filter->hqfi_left > 0) 4842 { 4843 if (filter->hqfi_type == HQFT_DATA) 4844 { 4845 if (stream->sm_bflags & SMBF_VERIFY_CL) 4846 verify_cl_on_new_data_frame(stream, filter); 4847 goto end; 4848 } 4849 else if (filter->hqfi_type == HQFT_PUSH_PROMISE) 4850 { 4851 if (stream->sm_bflags & SMBF_SERVER) 4852 { 4853 lconn = stream->conn_pub->lconn; 4854 lconn->cn_if->ci_abort_error(lconn, 1, 4855 HEC_FRAME_UNEXPECTED, "Received PUSH_PROMISE frame " 4856 "on stream %"PRIu64" (clients are not supposed to " 4857 "send those)", stream->id); 4858 goto end; 4859 } 4860 else 4861 filter->hqfi_state = HQFI_STATE_PUSH_ID_BEGIN; 4862 } 4863 } 4864 else 4865 { 4866 switch (filter->hqfi_type) 4867 { 4868 case HQFT_CANCEL_PUSH: 4869 case HQFT_GOAWAY: 4870 case HQFT_HEADERS: 4871 case HQFT_MAX_PUSH_ID: 4872 case HQFT_PUSH_PROMISE: 4873 case HQFT_SETTINGS: 
4874 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4875 LSQ_INFO("HQ frame of type %"PRIu64" cannot be size 0", 4876 filter->hqfi_type); 4877 abort_connection(stream); /* XXX Overkill? */ 4878 goto end; 4879 default: 4880 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 4881 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4882 break; 4883 } 4884 } 4885 break; 4886 case HQFI_STATE_PUSH_ID_BEGIN: 4887 filter->hqfi_vint1_state.pos = 0; 4888 filter->hqfi_state = HQFI_STATE_PUSH_ID_CONTINUE; 4889 /* Fall-through */ 4890 case HQFI_STATE_PUSH_ID_CONTINUE: 4891 prev = p; 4892 s = lsquic_varint_read_nb(&p, end, &filter->hqfi_vint1_state); 4893 filter->hqfi_left -= p - prev; 4894 if (s == 0) 4895 filter->hqfi_state = HQFI_STATE_READING_PAYLOAD; 4896 /* A bit of a white lie here */ 4897 break; 4898 case HQFI_STATE_READING_PAYLOAD: 4899 if (filter->hqfi_type == HQFT_DATA) 4900 goto end; 4901 sz = filter->hqfi_left; 4902 if (sz > (uintptr_t) (end - p)) 4903 sz = (uintptr_t) (end - p); 4904 switch (filter->hqfi_type) 4905 { 4906 case HQFT_HEADERS: 4907 case HQFT_PUSH_PROMISE: 4908 prev = p; 4909 if (filter->hqfi_flags & HQFI_FLAG_BEGIN) 4910 { 4911 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 4912 rhs = lsquic_qdh_header_in_begin( 4913 stream->conn_pub->u.ietf.qdh, 4914 stream, filter->hqfi_left, &p, sz); 4915 } 4916 else 4917 rhs = lsquic_qdh_header_in_continue( 4918 stream->conn_pub->u.ietf.qdh, stream, &p, sz); 4919 assert(p > prev || LQRHS_ERROR == rhs); 4920 filter->hqfi_left -= p - prev; 4921 if (filter->hqfi_left == 0) 4922 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4923 switch (rhs) 4924 { 4925 case LQRHS_DONE: 4926 assert(filter->hqfi_left == 0); 4927 stream->sm_qflags &= ~SMQF_QPACK_DEC; 4928 break; 4929 case LQRHS_NEED: 4930 stream->sm_qflags |= SMQF_QPACK_DEC; 4931 break; 4932 case LQRHS_BLOCKED: 4933 stream->sm_qflags |= SMQF_QPACK_DEC; 4934 filter->hqfi_flags |= HQFI_FLAG_BLOCKED; 4935 goto end; 4936 default: 4937 assert(LQRHS_ERROR == rhs); 4938 stream->sm_qflags &= 
~SMQF_QPACK_DEC; 4939 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4940 LSQ_INFO("error processing header block"); 4941 abort_connection(stream); /* XXX Overkill? */ 4942 goto end; 4943 } 4944 break; 4945 default: 4946 /* Simply skip unknown frame type payload for now */ 4947 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 4948 p += sz; 4949 filter->hqfi_left -= sz; 4950 if (filter->hqfi_left == 0) 4951 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4952 break; 4953 } 4954 break; 4955 default: 4956 assert(0); 4957 goto end; 4958 } 4959 } 4960 4961 end: 4962 if (fin && p == end && filter->hqfi_state != HQFI_STATE_FRAME_HEADER_BEGIN) 4963 { 4964 LSQ_INFO("FIN at unexpected place in filter; state: %u", 4965 filter->hqfi_state); 4966 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4967/* From [draft-ietf-quic-http-28] Section 7.1: 4968 " When a stream terminates cleanly, if the last frame on the stream was 4969 " truncated, this MUST be treated as a connection error (Section 8) of 4970 " type H3_FRAME_ERROR. Streams which terminate abruptly may be reset 4971 " at any point in a frame. 
4972 */ 4973 lconn = stream->conn_pub->lconn; 4974 lconn->cn_if->ci_abort_error(lconn, 1, HEC_FRAME_ERROR, 4975 "last HTTP/3 frame on stream %"PRIu64" was truncated", stream->id); 4976 } 4977 4978 return p - buf; 4979} 4980 4981 4982static int 4983hq_filter_readable_now (const struct lsquic_stream *stream) 4984{ 4985 const struct hq_filter *const filter = &stream->sm_hq_filter; 4986 4987 return (filter->hqfi_type == HQFT_DATA 4988 && filter->hqfi_state == HQFI_STATE_READING_PAYLOAD) 4989 || (filter->hqfi_flags & HQFI_FLAG_ERROR) 4990 || stream->uh 4991 || (stream->stream_flags & STREAM_FIN_REACHED) 4992 ; 4993} 4994 4995 4996static int 4997hq_filter_readable (struct lsquic_stream *stream) 4998{ 4999 struct hq_filter *const filter = &stream->sm_hq_filter; 5000 ssize_t nread; 5001 5002 if (filter->hqfi_flags & HQFI_FLAG_BLOCKED) 5003 return 0; 5004 5005 if (!hq_filter_readable_now(stream)) 5006 { 5007 nread = read_data_frames(stream, 0, hq_read, stream); 5008 if (nread <= 0) 5009 { 5010 if (nread < 0) 5011 { 5012 filter->hqfi_flags |= HQFI_FLAG_ERROR; 5013 abort_connection(stream); /* XXX Overkill? 
*/ 5014 return 1; /* Collect error */ 5015 } 5016 return 0; 5017 } 5018 } 5019 5020 return hq_filter_readable_now(stream); 5021} 5022 5023 5024static size_t 5025hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame) 5026{ 5027 struct hq_filter *const filter = &stream->sm_hq_filter; 5028 size_t nr; 5029 5030 if (!(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 5031 && filter->hqfi_type == HQFT_DATA)) 5032 { 5033 nr = hq_read(stream, data_frame->df_data + data_frame->df_read_off, 5034 data_frame->df_size - data_frame->df_read_off, 5035 data_frame->df_fin); 5036 if (nr) 5037 { 5038 stream->read_offset += nr; 5039 stream_consumed_bytes(stream); 5040 } 5041 } 5042 else 5043 nr = 0; 5044 5045 if (0 == (filter->hqfi_flags & HQFI_FLAG_ERROR)) 5046 { 5047 data_frame->df_read_off += nr; 5048 if (filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 5049 && filter->hqfi_type == HQFT_DATA) 5050 return MIN(filter->hqfi_left, 5051 (unsigned) data_frame->df_size - data_frame->df_read_off); 5052 else 5053 { 5054 assert(data_frame->df_read_off == data_frame->df_size); 5055 return 0; 5056 } 5057 } 5058 else 5059 { 5060 data_frame->df_read_off = data_frame->df_size; 5061 return 0; 5062 } 5063} 5064 5065 5066static void 5067hq_decr_left (struct lsquic_stream *stream, size_t read) 5068{ 5069 struct hq_filter *const filter = &stream->sm_hq_filter; 5070 5071 if (read) 5072 { 5073 assert(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 5074 && filter->hqfi_type == HQFT_DATA); 5075 assert(read <= filter->hqfi_left); 5076 } 5077 5078 filter->hqfi_left -= read; 5079 if (0 == filter->hqfi_left) 5080 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 5081} 5082 5083 5084/* These are IETF QUIC states */ 5085enum stream_state_sending 5086lsquic_stream_sending_state (const struct lsquic_stream *stream) 5087{ 5088 if (0 == (stream->stream_flags & STREAM_RST_SENT)) 5089 { 5090 if (stream->stream_flags & STREAM_FIN_SENT) 5091 { 5092 if (stream->n_unacked) 5093 return SSS_DATA_SENT; 
5094 else 5095 return SSS_DATA_RECVD; 5096 } 5097 else 5098 { 5099 if (stream->tosend_off 5100 || (stream->stream_flags & STREAM_BLOCKED_SENT)) 5101 return SSS_SEND; 5102 else 5103 return SSS_READY; 5104 } 5105 } 5106 else if (stream->stream_flags & STREAM_RST_ACKED) 5107 return SSS_RESET_RECVD; 5108 else 5109 return SSS_RESET_SENT; 5110} 5111 5112 5113const char *const lsquic_sss2str[] = 5114{ 5115 [SSS_READY] = "Ready", 5116 [SSS_SEND] = "Send", 5117 [SSS_DATA_SENT] = "Data Sent", 5118 [SSS_RESET_SENT] = "Reset Sent", 5119 [SSS_DATA_RECVD] = "Data Recvd", 5120 [SSS_RESET_RECVD] = "Reset Recvd", 5121}; 5122 5123 5124const char *const lsquic_ssr2str[] = 5125{ 5126 [SSR_RECV] = "Recv", 5127 [SSR_SIZE_KNOWN] = "Size Known", 5128 [SSR_DATA_RECVD] = "Data Recvd", 5129 [SSR_RESET_RECVD] = "Reset Recvd", 5130 [SSR_DATA_READ] = "Data Read", 5131 [SSR_RESET_READ] = "Reset Read", 5132}; 5133 5134 5135/* These are IETF QUIC states */ 5136enum stream_state_receiving 5137lsquic_stream_receiving_state (struct lsquic_stream *stream) 5138{ 5139 uint64_t n_bytes; 5140 5141 if (0 == (stream->stream_flags & STREAM_RST_RECVD)) 5142 { 5143 if (0 == (stream->stream_flags & STREAM_FIN_RECVD)) 5144 return SSR_RECV; 5145 if (stream->stream_flags & STREAM_FIN_REACHED) 5146 return SSR_DATA_READ; 5147 if (0 == (stream->stream_flags & STREAM_DATA_RECVD)) 5148 { 5149 n_bytes = stream->data_in->di_if->di_readable_bytes( 5150 stream->data_in, stream->read_offset); 5151 if (stream->read_offset + n_bytes == stream->sm_fin_off) 5152 { 5153 stream->stream_flags |= STREAM_DATA_RECVD; 5154 return SSR_DATA_RECVD; 5155 } 5156 else 5157 return SSR_SIZE_KNOWN; 5158 } 5159 else 5160 return SSR_DATA_RECVD; 5161 } 5162 else if (stream->stream_flags & STREAM_RST_READ) 5163 return SSR_RESET_READ; 5164 else 5165 return SSR_RESET_RECVD; 5166} 5167 5168 5169void 5170lsquic_stream_qdec_unblocked (struct lsquic_stream *stream) 5171{ 5172 struct hq_filter *const filter = &stream->sm_hq_filter; 5173 5174 
assert(stream->sm_qflags & SMQF_QPACK_DEC); 5175 assert(filter->hqfi_flags & HQFI_FLAG_BLOCKED); 5176 5177 filter->hqfi_flags &= ~HQFI_FLAG_BLOCKED; 5178 stream->conn_pub->cp_flags |= CP_STREAM_UNBLOCKED; 5179 LSQ_DEBUG("QPACK decoder unblocked"); 5180} 5181 5182 5183int 5184lsquic_stream_is_rejected (const struct lsquic_stream *stream) 5185{ 5186 return stream->stream_flags & STREAM_SS_RECVD; 5187} 5188 5189 5190int 5191lsquic_stream_can_push (const struct lsquic_stream *stream) 5192{ 5193 if (lsquic_stream_is_pushed(stream)) 5194 return 0; 5195 else if (stream->sm_bflags & SMBF_IETF) 5196 return (stream->sm_bflags & SMBF_USE_HEADERS) 5197 && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_NOPUSH)) 5198 && stream->sm_send_headers_state == SSHS_BEGIN 5199 ; 5200 else 5201 return 1; 5202} 5203 5204 5205static size_t 5206pp_reader_read (void *lsqr_ctx, void *buf, size_t count) 5207{ 5208 struct push_promise *const promise = lsqr_ctx; 5209 unsigned char *dst = buf; 5210 unsigned char *const end = dst + count; 5211 size_t len; 5212 5213 while (dst < end) 5214 { 5215 switch (promise->pp_write_state) 5216 { 5217 case PPWS_ID0: 5218 case PPWS_ID1: 5219 case PPWS_ID2: 5220 case PPWS_ID3: 5221 case PPWS_ID4: 5222 case PPWS_ID5: 5223 case PPWS_ID6: 5224 case PPWS_ID7: 5225 *dst++ = promise->pp_encoded_push_id[promise->pp_write_state]; 5226 ++promise->pp_write_state; 5227 break; 5228 case PPWS_PFX0: 5229 *dst++ = 0; 5230 ++promise->pp_write_state; 5231 break; 5232 case PPWS_PFX1: 5233 *dst++ = 0; 5234 ++promise->pp_write_state; 5235 break; 5236 case PPWS_HBLOCK: 5237 len = MIN(promise->pp_content_len - promise->pp_write_off, 5238 (size_t) (end - dst)); 5239 memcpy(dst, promise->pp_content_buf + promise->pp_write_off, 5240 len); 5241 promise->pp_write_off += len; 5242 dst += len; 5243 if (promise->pp_content_len == promise->pp_write_off) 5244 { 5245 LSQ_LOG1(LSQ_LOG_DEBUG, "finish writing push promise %"PRIu64 5246 ": reset push state", promise->pp_id); 5247 
promise->pp_write_state = PPWS_DONE; 5248 } 5249 goto end; 5250 default: 5251 goto end; 5252 } 5253 } 5254 5255 end: 5256 return dst - (unsigned char *) buf; 5257} 5258 5259 5260static size_t 5261pp_reader_size (void *lsqr_ctx) 5262{ 5263 struct push_promise *const promise = lsqr_ctx; 5264 size_t size; 5265 5266 size = 0; 5267 switch (promise->pp_write_state) 5268 { 5269 case PPWS_ID0: 5270 case PPWS_ID1: 5271 case PPWS_ID2: 5272 case PPWS_ID3: 5273 case PPWS_ID4: 5274 case PPWS_ID5: 5275 case PPWS_ID6: 5276 case PPWS_ID7: 5277 size += 8 - promise->pp_write_state; 5278 /* fall-through */ 5279 case PPWS_PFX0: 5280 ++size; 5281 /* fall-through */ 5282 case PPWS_PFX1: 5283 ++size; 5284 /* fall-through */ 5285 case PPWS_HBLOCK: 5286 size += promise->pp_content_len - promise->pp_write_off; 5287 break; 5288 default: 5289 break; 5290 } 5291 5292 return size; 5293} 5294 5295 5296static void 5297init_pp_reader (struct push_promise *promise, struct lsquic_reader *reader) 5298{ 5299 reader->lsqr_read = pp_reader_read; 5300 reader->lsqr_size = pp_reader_size; 5301 reader->lsqr_ctx = promise; 5302} 5303 5304 5305static void 5306on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 5307{ 5308 struct lsquic_reader pp_reader; 5309 struct push_promise *promise; 5310 ssize_t nw; 5311 int want_write; 5312 5313 assert(stream_is_pushing_promise(stream)); 5314 5315 promise = SLIST_FIRST(&stream->sm_promises); 5316 init_pp_reader(promise, &pp_reader); 5317 nw = stream_write(stream, &pp_reader, SWO_BUFFER); 5318 if (nw > 0) 5319 { 5320 LSQ_DEBUG("wrote %zd bytes more of push promise (%s)", 5321 nw, promise->pp_write_state == PPWS_DONE ? 
"done" : "not done"); 5322 if (promise->pp_write_state == PPWS_DONE) 5323 { 5324 /* Restore want_write flag */ 5325 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 5326 if (want_write != stream->sm_saved_want_write) 5327 (void) lsquic_stream_wantwrite(stream, 5328 stream->sm_saved_want_write); 5329 } 5330 } 5331 else if (nw < 0) 5332 { 5333 LSQ_WARN("could not write push promise (wrapper)"); 5334 /* XXX What should happen if we hit an error? TODO */ 5335 } 5336} 5337 5338 5339/* Success means that the push promise has been placed on sm_promises list and 5340 * the stream now owns it. Failure means that the push promise should be 5341 * destroyed by the caller. 5342 * 5343 * A push promise is written immediately. If it cannot be written to packets 5344 * or buffered whole, the stream is marked as unable to push further promises. 5345 */ 5346int 5347lsquic_stream_push_promise (struct lsquic_stream *stream, 5348 struct push_promise *promise) 5349{ 5350 struct lsquic_reader pp_reader; 5351 unsigned bits, len; 5352 ssize_t nw; 5353 5354 assert(stream->sm_bflags & SMBF_IETF); 5355 assert(lsquic_stream_can_push(stream)); 5356 5357 bits = vint_val2bits(promise->pp_id); 5358 len = 1 << bits; 5359 promise->pp_write_state = 8 - len; 5360 vint_write(promise->pp_encoded_push_id + 8 - len, promise->pp_id, 5361 bits, 1 << bits); 5362 5363 if (!stream_activate_hq_frame(stream, 5364 stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PROMISE, 5365 SHF_FIXED_SIZE, pp_reader_size(promise))) 5366 return -1; 5367 5368 stream->stream_flags |= STREAM_PUSHING; 5369 5370 init_pp_reader(promise, &pp_reader); 5371 nw = stream_write(stream, &pp_reader, SWO_BUFFER); 5372 if (nw > 0) 5373 { 5374 SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next); 5375 ++promise->pp_refcnt; 5376 if (promise->pp_write_state == PPWS_DONE) 5377 LSQ_DEBUG("fully wrote promise %"PRIu64, promise->pp_id); 5378 else 5379 { 5380 LSQ_DEBUG("partially wrote promise %"PRIu64" (state: %d, off: %u)" 5381 ", 
disable further pushing", promise->pp_id, 5382 promise->pp_write_state, promise->pp_write_off); 5383 stream->stream_flags |= STREAM_NOPUSH; 5384 stream->sm_saved_want_write = 5385 !!(stream->sm_qflags & SMQF_WANT_WRITE); 5386 stream_wantwrite(stream, 1); 5387 } 5388 return 0; 5389 } 5390 else 5391 { 5392 if (nw < 0) 5393 LSQ_WARN("failure writing push promise"); 5394 stream->stream_flags |= STREAM_NOPUSH; 5395 stream->stream_flags &= ~STREAM_PUSHING; 5396 return -1; 5397 } 5398} 5399 5400 5401int 5402lsquic_stream_verify_len (struct lsquic_stream *stream, 5403 unsigned long long cont_len) 5404{ 5405 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 5406 == (SMBF_IETF|SMBF_USE_HEADERS)) 5407 { 5408 stream->sm_cont_len = cont_len; 5409 stream->sm_bflags |= SMBF_VERIFY_CL; 5410 LSQ_DEBUG("will verify that incoming DATA frames have %llu bytes", 5411 cont_len); 5412 return 0; 5413 } 5414 else 5415 return -1; 5416} 5417 5418 5419int 5420lsquic_stream_get_http_prio (struct lsquic_stream *stream, 5421 struct lsquic_ext_http_prio *ehp) 5422{ 5423 if (stream->sm_bflags & SMBF_HTTP_PRIO) 5424 { 5425 ehp->urgency = MIN(stream->sm_priority, LSQUIC_MAX_HTTP_URGENCY); 5426 ehp->incremental = !!(stream->sm_bflags & SMBF_INCREMENTAL); 5427 return 0; 5428 } 5429 else 5430 return -1; 5431} 5432 5433 5434int 5435lsquic_stream_set_http_prio (struct lsquic_stream *stream, 5436 const struct lsquic_ext_http_prio *ehp) 5437{ 5438 if (stream->sm_bflags & SMBF_HTTP_PRIO) 5439 { 5440 if (ehp->urgency > LSQUIC_MAX_HTTP_URGENCY) 5441 { 5442 LSQ_INFO("%s: invalid urgency: %hhu", __func__, ehp->urgency); 5443 return -1; 5444 } 5445 stream->sm_priority = ehp->urgency; 5446 if (ehp->incremental) 5447 stream->sm_bflags |= SMBF_INCREMENTAL; 5448 else 5449 stream->sm_bflags &= ~SMBF_INCREMENTAL; 5450 stream->sm_bflags |= SMBF_HPRIO_SET; 5451 LSQ_DEBUG("set urgency to %hhu, incremental to %hhd", ehp->urgency, 5452 ehp->incremental); 5453 if (!(stream->sm_bflags & SMBF_SERVER)) 5454 return 
send_priority_ietf(stream); 5455 else 5456 return 0; 5457 } 5458 else 5459 return -1; 5460} 5461 5462 5463int 5464lsquic_stream_has_unacked_data (struct lsquic_stream *stream) 5465{ 5466 return stream->n_unacked > 0 || stream->sm_n_buffered > 0; 5467} 5468