lsquic_stream.c revision a0e1aeee
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <stddef.h> 28 29#include "lsquic.h" 30 31#include "lsquic_int_types.h" 32#include "lsquic_packet_common.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn_flow.h" 36#include "lsquic_rtt.h" 37#include "lsquic_sfcw.h" 38#include "lsquic_varint.h" 39#include "lsquic_hq.h" 40#include "lsquic_hash.h" 41#include "lsquic_stream.h" 42#include "lsquic_conn_public.h" 43#include "lsquic_util.h" 44#include "lsquic_mm.h" 45#include "lsquic_headers_stream.h" 46#include "lsquic_conn.h" 47#include "lsquic_data_in_if.h" 48#include "lsquic_parse.h" 49#include "lsquic_packet_out.h" 50#include "lsquic_engine_public.h" 51#include "lsquic_senhist.h" 52#include "lsquic_pacer.h" 53#include "lsquic_cubic.h" 54#include "lsquic_bw_sampler.h" 55#include "lsquic_minmax.h" 56#include "lsquic_bbr.h" 57#include "lsquic_send_ctl.h" 58#include "lsquic_headers.h" 59#include "lsquic_ev_log.h" 60#include "lsquic_enc_sess.h" 61#include "lsqpack.h" 62#include "lsquic_frab_list.h" 63#include "lsquic_http1x_if.h" 64#include "lsquic_qdec_hdl.h" 65#include "lsquic_qenc_hdl.h" 66#include "lsquic_byteswap.h" 67#include "lsquic_ietf.h" 68#include "lsquic_push_promise.h" 69 70#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 71#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(stream->conn_pub->lconn) 72#define LSQUIC_LOG_STREAM_ID stream->id 73#include "lsquic_logger.h" 74 75#define MIN(a, b) ((a) < (b) ? (a) : (b)) 76 77static void 78drop_frames_in (lsquic_stream_t *stream); 79 80static void 81maybe_schedule_call_on_close (lsquic_stream_t *stream); 82 83static int 84stream_wantread (lsquic_stream_t *stream, int is_want); 85 86static int 87stream_wantwrite (lsquic_stream_t *stream, int is_want); 88 89static ssize_t 90stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 91 92static ssize_t 93save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 94 95static int 96stream_flush (lsquic_stream_t *stream); 97 98static int 99stream_flush_nocheck (lsquic_stream_t *stream); 100 101static void 102maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag); 103 104enum swtp_status { SWTP_OK, SWTP_STOP, SWTP_ERROR }; 105 106static enum swtp_status 107stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size); 108 109static enum swtp_status 110stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size); 111 112static enum swtp_status 113stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size); 114 115static size_t 116stream_write_avail_no_frames (struct lsquic_stream *); 117 118static size_t 119stream_write_avail_with_frames (struct lsquic_stream *); 120 121static size_t 122stream_write_avail_with_headers (struct lsquic_stream *); 123 124static int 125hq_filter_readable (struct lsquic_stream *stream); 126 127static void 128hq_decr_left (struct lsquic_stream *stream, size_t); 129 130static size_t 131hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame); 132 133static int 134stream_readable_non_http (struct lsquic_stream *stream); 135 136static int 137stream_readable_http_gquic (struct lsquic_stream *stream); 138 139static int 140stream_readable_http_ietf (struct lsquic_stream *stream); 141 142static ssize_t 143stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz); 144 145static size_t 146active_hq_frame_sizes (const struct lsquic_stream *); 147 148static void 149on_write_dp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *); 150 151static void 152on_write_pp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *); 153 154static void 155stream_hq_frame_put (struct lsquic_stream *, struct stream_hq_frame *); 156 157static size_t 158stream_hq_frame_size (const struct stream_hq_frame *); 159 160const struct stream_filter_if hq_stream_filter_if = 161{ 162 .sfi_readable = hq_filter_readable, 163 .sfi_filter_df = hq_filter_df, 164 .sfi_decr_left = hq_decr_left, 165}; 166 167 168#if LSQUIC_KEEP_STREAM_HISTORY 169/* These values are printable ASCII characters for ease of printing the 170 * whole history in a single line of a log message. 171 * 172 * The list of events is not exhaustive: only most interesting events 173 * are recorded. 174 */ 175enum stream_history_event 176{ 177 SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ 178 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 179 SHE_REACH_FIN = 'a', 180 SHE_BLOCKED_OUT = 'b', 181 SHE_CREATED = 'C', 182 SHE_FRAME_IN = 'd', 183 SHE_FRAME_OUT = 'D', 184 SHE_RESET = 'e', 185 SHE_WINDOW_UPDATE = 'E', 186 SHE_FIN_IN = 'f', 187 SHE_FINISHED = 'F', 188 SHE_GOAWAY_IN = 'g', 189 SHE_USER_WRITE_HEADER = 'h', 190 SHE_HEADERS_IN = 'H', 191 SHE_IF_SWITCH = 'i', 192 SHE_ONCLOSE_SCHED = 'l', 193 SHE_ONCLOSE_CALL = 'L', 194 SHE_ONNEW = 'N', 195 SHE_SET_PRIO = 'p', 196 SHE_USER_READ = 'r', 197 SHE_SHUTDOWN_READ = 'R', 198 SHE_RST_IN = 's', 199 SHE_SS_IN = 'S', 200 SHE_RST_OUT = 't', 201 SHE_RST_ACKED = 'T', 202 SHE_FLUSH = 'u', 203 SHE_USER_WRITE_DATA = 'w', 204 SHE_SHUTDOWN_WRITE = 'W', 205 SHE_CLOSE = 'X', 206 SHE_DELAY_SW = 'y', 207 SHE_FORCE_FINISH = 'Z', 208 SHE_WANTREAD_NO = '0', /* "YES" must be one more than "NO" */ 209 SHE_WANTREAD_YES = '1', 210 SHE_WANTWRITE_NO = '2', 211 SHE_WANTWRITE_YES = '3', 212}; 213 214static void 215sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 216{ 217 enum stream_history_event prev_event; 218 sm_hist_idx_t idx; 219 int plus; 220 221 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 222 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 223 idx = (idx - plus) & SM_HIST_IDX_MASK; 224 prev_event = stream->sm_hist_buf[idx]; 225 226 if (prev_event == sh_event && plus) 227 return; 228 229 if (prev_event == sh_event) 230 sh_event = SHE_PLUS; 231 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 232 233 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 234 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 235 stream->sm_hist_buf); 236} 237 238 239# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 240# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 241 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 242 LSQ_DEBUG("history: [%.*s]", \ 243 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 244 (stream)->sm_hist_buf); \ 245 } while (0) 246#else 247# define SM_HISTORY_APPEND(stream, event) 248# define SM_HISTORY_DUMP_REMAINING(stream) 249#endif 250 251 252static int 253stream_inside_callback (const lsquic_stream_t *stream) 254{ 255 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 256} 257 258 259static void 260maybe_conn_to_tickable (lsquic_stream_t *stream) 261{ 262 if (!stream_inside_callback(stream)) 263 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 264 stream->conn_pub->lconn); 265} 266 267 268/* Here, "readable" means that the user is able to read from the stream. */ 269static void 270maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 271{ 272 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 273 { 274 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 275 stream->conn_pub->lconn); 276 } 277} 278 279 280/* Here, "writeable" means that data can be put into packets to be 281 * scheduled to be sent out. 282 * 283 * If `check_can_send' is false, it means that we do not need to check 284 * whether packets can be sent. This check was already performed when 285 * we packetized stream data. 286 */ 287static void 288maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream, 289 int check_can_send) 290{ 291 if (!stream_inside_callback(stream) && 292 (!check_can_send 293 || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) && 294 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 295 { 296 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 297 stream->conn_pub->lconn); 298 } 299} 300 301 302static int 303stream_stalled (const lsquic_stream_t *stream) 304{ 305 return 0 == (stream->sm_qflags & (SMQF_WANT_WRITE|SMQF_WANT_READ)) && 306 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 307 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 308} 309 310 311static size_t 312stream_stream_frame_header_sz (const struct lsquic_stream *stream, 313 unsigned data_sz) 314{ 315 return stream->conn_pub->lconn->cn_pf->pf_calc_stream_frame_header_sz( 316 stream->id, stream->tosend_off, data_sz); 317} 318 319 320static size_t 321stream_crypto_frame_header_sz (const struct lsquic_stream *stream, 322 unsigned data_sz_IGNORED) 323{ 324 return stream->conn_pub->lconn->cn_pf 325 ->pf_calc_crypto_frame_header_sz(stream->tosend_off); 326} 327 328 329/* GQUIC-only function */ 330static int 331stream_is_hsk (const struct lsquic_stream *stream) 332{ 333 if (stream->sm_bflags & SMBF_IETF) 334 return 0; 335 else 336 return stream->id == LSQUIC_GQUIC_STREAM_HANDSHAKE; 337} 338 339 340static struct lsquic_stream * 341stream_new_common (lsquic_stream_id_t id, struct lsquic_conn_public *conn_pub, 342 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 343 enum stream_ctor_flags ctor_flags) 344{ 345 struct lsquic_stream *stream; 346 347 stream = calloc(1, sizeof(*stream)); 348 if (!stream) 349 return NULL; 350 351 if (ctor_flags & SCF_USE_DI_HASH) 352 stream->data_in = data_in_hash_new(conn_pub, id, 0); 353 else 354 stream->data_in = data_in_nocopy_new(conn_pub, id); 355 if (!stream->data_in) 356 { 357 free(stream); 358 return NULL; 359 } 360 361 stream->id = id; 362 stream->stream_if = stream_if; 363 stream->conn_pub = conn_pub; 364 stream->sm_onnew_arg = stream_if_ctx; 365 stream->sm_write_avail = stream_write_avail_no_frames; 366 367 STAILQ_INIT(&stream->sm_hq_frames); 368 369 stream->sm_bflags |= ctor_flags & ((1 << (N_SMBF_FLAGS - 1)) - 1); 370 if (conn_pub->lconn->cn_flags & LSCONN_SERVER) 371 stream->sm_bflags |= SMBF_SERVER; 372 373 return stream; 374} 375 376 377/* TODO: The logic to figure out whether the stream is connection limited 378 * should be taken out of the constructor. The caller should specify this 379 * via one of enum stream_ctor_flags. 380 */ 381lsquic_stream_t * 382lsquic_stream_new (lsquic_stream_id_t id, 383 struct lsquic_conn_public *conn_pub, 384 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 385 unsigned initial_window, uint64_t initial_send_off, 386 enum stream_ctor_flags ctor_flags) 387{ 388 lsquic_cfcw_t *cfcw; 389 lsquic_stream_t *stream; 390 391 stream = stream_new_common(id, conn_pub, stream_if, stream_if_ctx, 392 ctor_flags); 393 if (!stream) 394 return NULL; 395 396 if (!initial_window) 397 initial_window = 16 * 1024; 398 399 if (ctor_flags & SCF_IETF) 400 { 401 cfcw = &conn_pub->cfcw; 402 stream->sm_bflags |= SMBF_CONN_LIMITED; 403 if (ctor_flags & SCF_HTTP) 404 { 405 stream->sm_write_avail = stream_write_avail_with_headers; 406 stream->sm_readable = stream_readable_http_ietf; 407 stream->sm_sfi = &hq_stream_filter_if; 408 } 409 else 410 stream->sm_readable = stream_readable_non_http; 411 lsquic_stream_set_priority_internal(stream, 412 LSQUIC_STREAM_DEFAULT_PRIO); 413 stream->sm_write_to_packet = stream_write_to_packet_std; 414 } 415 else 416 { 417 if (lsquic_stream_id_is_critical(ctor_flags & SCF_HTTP, id)) 418 cfcw = NULL; 419 else 420 { 421 cfcw = &conn_pub->cfcw; 422 stream->sm_bflags |= SMBF_CONN_LIMITED; 423 lsquic_stream_set_priority_internal(stream, 424 LSQUIC_STREAM_DEFAULT_PRIO); 425 } 426 if (stream->sm_bflags & SMBF_USE_HEADERS) 427 stream->sm_readable = stream_readable_http_gquic; 428 else 429 stream->sm_readable = stream_readable_non_http; 430 if (stream_is_hsk(stream)) 431 stream->sm_write_to_packet = stream_write_to_packet_hsk; 432 else 433 stream->sm_write_to_packet = stream_write_to_packet_std; 434 } 435 436 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 437 stream->max_send_off = initial_send_off; 438 LSQ_DEBUG("created stream"); 439 SM_HISTORY_APPEND(stream, SHE_CREATED); 440 stream->sm_frame_header_sz = stream_stream_frame_header_sz; 441 if (ctor_flags & SCF_CALL_ON_NEW) 442 lsquic_stream_call_on_new(stream); 443 return stream; 444} 445 446 447struct lsquic_stream * 448lsquic_stream_new_crypto (enum enc_level enc_level, 449 struct lsquic_conn_public *conn_pub, 450 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 451 enum stream_ctor_flags ctor_flags) 452{ 453 struct lsquic_stream *stream; 454 lsquic_stream_id_t stream_id; 455 456 assert(ctor_flags & SCF_CRITICAL); 457 458 stream_id = ~0ULL - enc_level; 459 stream = stream_new_common(stream_id, conn_pub, stream_if, 460 stream_if_ctx, ctor_flags); 461 if (!stream) 462 return NULL; 463 464 stream->sm_bflags |= SMBF_CRYPTO|SMBF_IETF; 465 stream->sm_enc_level = enc_level; 466 /* TODO: why have limit in crypto stream? Set it to UINT64_MAX? */ 467 lsquic_sfcw_init(&stream->fc, 16 * 1024, NULL, conn_pub, stream_id); 468 stream->max_send_off = 16 * 1024; 469 LSQ_DEBUG("created crypto stream"); 470 SM_HISTORY_APPEND(stream, SHE_CREATED); 471 stream->sm_frame_header_sz = stream_crypto_frame_header_sz; 472 stream->sm_write_to_packet = stream_write_to_packet_crypto; 473 stream->sm_readable = stream_readable_non_http; 474 if (ctor_flags & SCF_CALL_ON_NEW) 475 lsquic_stream_call_on_new(stream); 476 return stream; 477} 478 479 480void 481lsquic_stream_call_on_new (lsquic_stream_t *stream) 482{ 483 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 484 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 485 { 486 LSQ_DEBUG("calling on_new_stream"); 487 SM_HISTORY_APPEND(stream, SHE_ONNEW); 488 stream->stream_flags |= STREAM_ONNEW_DONE; 489 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 490 stream); 491 } 492} 493 494 495static void 496decr_conn_cap (struct lsquic_stream *stream, size_t incr) 497{ 498 if (stream->sm_bflags & SMBF_CONN_LIMITED) 499 { 500 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 501 stream->conn_pub->conn_cap.cc_sent -= incr; 502 } 503} 504 505 506static void 507maybe_resize_stream_buffer (struct lsquic_stream *stream) 508{ 509 assert(0 == stream->sm_n_buffered); 510 511 if (stream->sm_n_allocated < stream->conn_pub->path->np_pack_size) 512 { 513 free(stream->sm_buf); 514 stream->sm_buf = NULL; 515 stream->sm_n_allocated = 0; 516 } 517 else if (stream->sm_n_allocated > stream->conn_pub->path->np_pack_size) 518 stream->sm_n_allocated = stream->conn_pub->path->np_pack_size; 519} 520 521 522static void 523drop_buffered_data (struct lsquic_stream *stream) 524{ 525 decr_conn_cap(stream, stream->sm_n_buffered); 526 stream->sm_n_buffered = 0; 527 maybe_resize_stream_buffer(stream); 528 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 529 maybe_remove_from_write_q(stream, SMQF_WRITE_Q_FLAGS); 530} 531 532 533static void 534destroy_uh (struct lsquic_stream *stream) 535{ 536 if (stream->uh) 537 { 538 if (stream->uh->uh_hset) 539 stream->conn_pub->enpub->enp_hsi_if 540 ->hsi_discard_header_set(stream->uh->uh_hset); 541 free(stream->uh); 542 stream->uh = NULL; 543 } 544} 545 546 547void 548lsquic_stream_destroy (lsquic_stream_t *stream) 549{ 550 struct push_promise *promise; 551 struct stream_hq_frame *shf; 552 553 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 554 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 555 STREAM_ONNEW_DONE) 556 { 557 stream->stream_flags |= STREAM_ONCLOSE_DONE; 558 stream->stream_if->on_close(stream, stream->st_ctx); 559 } 560 if (stream->sm_qflags & SMQF_SENDING_FLAGS) 561 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 562 if (stream->sm_qflags & SMQF_WANT_READ) 563 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 564 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 565 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 566 if (stream->sm_qflags & SMQF_SERVICE_FLAGS) 567 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 568 if (stream->sm_qflags & SMQF_QPACK_DEC) 569 lsquic_qdh_unref_stream(stream->conn_pub->u.ietf.qdh, stream); 570 drop_buffered_data(stream); 571 lsquic_sfcw_consume_rem(&stream->fc); 572 drop_frames_in(stream); 573 if (stream->push_req) 574 { 575 if (stream->push_req->uh_hset) 576 stream->conn_pub->enpub->enp_hsi_if 577 ->hsi_discard_header_set(stream->push_req->uh_hset); 578 free(stream->push_req); 579 } 580 while ((promise = SLIST_FIRST(&stream->sm_promises))) 581 { 582 SLIST_REMOVE_HEAD(&stream->sm_promises, pp_next); 583 lsquic_pp_put(promise, stream->conn_pub->u.ietf.promises); 584 } 585 if (stream->sm_promise) 586 { 587 assert(stream->sm_promise->pp_pushed_stream == stream); 588 stream->sm_promise->pp_pushed_stream = NULL; 589 lsquic_pp_put(stream->sm_promise, stream->conn_pub->u.ietf.promises); 590 } 591 while ((shf = STAILQ_FIRST(&stream->sm_hq_frames))) 592 stream_hq_frame_put(stream, shf); 593 destroy_uh(stream); 594 free(stream->sm_buf); 595 free(stream->sm_header_block); 596 LSQ_DEBUG("destroyed stream"); 597 SM_HISTORY_DUMP_REMAINING(stream); 598 free(stream); 599} 600 601 602static int 603stream_is_finished (const lsquic_stream_t *stream) 604{ 605 return lsquic_stream_is_closed(stream) 606 /* n_unacked checks that no outgoing packets that reference this 607 * stream are outstanding: 608 */ 609 && 0 == stream->n_unacked 610 /* This checks that no packets that reference this stream will 611 * become outstanding: 612 */ 613 && 0 == (stream->sm_qflags & SMQF_SEND_RST) 614 && ((stream->stream_flags & STREAM_FORCE_FINISH) 615 || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))); 616} 617 618 619/* This is an internal function */ 620void 621lsquic_stream_force_finish (struct lsquic_stream *stream) 622{ 623 LSQ_DEBUG("stream is now finished"); 624 SM_HISTORY_APPEND(stream, SHE_FINISHED); 625 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 626 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 627 next_service_stream); 628 stream->sm_qflags |= SMQF_FREE_STREAM; 629 stream->stream_flags |= STREAM_FINISHED; 630} 631 632 633static void 634maybe_finish_stream (lsquic_stream_t *stream) 635{ 636 if (0 == (stream->stream_flags & STREAM_FINISHED) && 637 stream_is_finished(stream)) 638 lsquic_stream_force_finish(stream); 639} 640 641 642static void 643maybe_schedule_call_on_close (lsquic_stream_t *stream) 644{ 645 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 646 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) 647 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE) 648 && !(stream->sm_qflags & SMQF_CALL_ONCLOSE)) 649 { 650 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 651 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 652 next_service_stream); 653 stream->sm_qflags |= SMQF_CALL_ONCLOSE; 654 LSQ_DEBUG("scheduled calling on_close"); 655 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 656 } 657} 658 659 660void 661lsquic_stream_call_on_close (lsquic_stream_t *stream) 662{ 663 assert(stream->stream_flags & STREAM_ONNEW_DONE); 664 stream->sm_qflags &= ~SMQF_CALL_ONCLOSE; 665 if (!(stream->sm_qflags & SMQF_SERVICE_FLAGS)) 666 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 667 next_service_stream); 668 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 669 { 670 LSQ_DEBUG("calling on_close"); 671 stream->stream_flags |= STREAM_ONCLOSE_DONE; 672 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 673 stream->stream_if->on_close(stream, stream->st_ctx); 674 } 675 else 676 assert(0); 677} 678 679 680static int 681stream_has_frame_at_read_offset (struct lsquic_stream *stream) 682{ 683 if (!((stream->stream_flags & STREAM_CACHED_FRAME) 684 && stream->read_offset == stream->sm_last_frame_off)) 685 { 686 stream->sm_has_frame = stream->data_in->di_if->di_get_frame( 687 stream->data_in, stream->read_offset) != NULL; 688 stream->sm_last_frame_off = stream->read_offset; 689 stream->stream_flags |= STREAM_CACHED_FRAME; 690 } 691 return stream->sm_has_frame; 692} 693 694 695static int 696stream_readable_non_http (struct lsquic_stream *stream) 697{ 698 return stream_has_frame_at_read_offset(stream); 699} 700 701 702static int 703stream_readable_http_gquic (struct lsquic_stream *stream) 704{ 705 return (stream->stream_flags & STREAM_HAVE_UH) 706 && (stream->uh 707 || stream_has_frame_at_read_offset(stream)); 708} 709 710 711static int 712stream_readable_http_ietf (struct lsquic_stream *stream) 713{ 714 return 715 /* If we have read the header set and the header set has not yet 716 * been read, the stream is readable. 717 */ 718 ((stream->stream_flags & STREAM_HAVE_UH) && stream->uh) 719 || 720 /* Alternatively, run the filter and check for payload availability. */ 721 (stream->sm_sfi->sfi_readable(stream) 722 && (/* Running the filter may result in hitting FIN: */ 723 (stream->stream_flags & STREAM_FIN_REACHED) 724 || stream_has_frame_at_read_offset(stream))); 725} 726 727 728int 729lsquic_stream_readable (struct lsquic_stream *stream) 730{ 731 /* A stream is readable if one of the following is true: */ 732 return 733 /* - It is already finished: in that case, lsquic_stream_read() will 734 * return 0. 735 */ 736 (stream->stream_flags & STREAM_FIN_REACHED) 737 /* - The stream is reset, by either side. In this case, 738 * lsquic_stream_read() will return -1 (we want the user to be 739 * able to collect the error). 740 */ 741 || lsquic_stream_is_reset(stream) 742 /* Type-dependent readability check: */ 743 || stream->sm_readable(stream); 744 ; 745} 746 747 748static size_t 749stream_write_avail_no_frames (struct lsquic_stream *stream) 750{ 751 uint64_t stream_avail, conn_avail; 752 753 stream_avail = stream->max_send_off - stream->tosend_off 754 - stream->sm_n_buffered; 755 756 if (stream->sm_bflags & SMBF_CONN_LIMITED) 757 { 758 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 759 if (conn_avail < stream_avail) 760 stream_avail = conn_avail; 761 } 762 763 return stream_avail; 764} 765 766 767static size_t 768stream_write_avail_with_frames (struct lsquic_stream *stream) 769{ 770 uint64_t stream_avail, conn_avail; 771 const struct stream_hq_frame *shf; 772 size_t size; 773 774 stream_avail = stream->max_send_off - stream->tosend_off 775 - stream->sm_n_buffered; 776 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 777 if (!(shf->shf_flags & SHF_WRITTEN)) 778 { 779 size = stream_hq_frame_size(shf); 780 assert(size <= stream_avail); 781 stream_avail -= size; 782 } 783 784 if (stream->sm_bflags & SMBF_CONN_LIMITED) 785 { 786 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 787 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 788 if (!(shf->shf_flags & SHF_CC_PAID)) 789 { 790 size = stream_hq_frame_size(shf); 791 if (size < conn_avail) 792 conn_avail -= size; 793 else 794 return 0; 795 } 796 if (conn_avail < stream_avail) 797 stream_avail = conn_avail; 798 } 799 800 if (stream_avail >= 3 /* Smallest new frame */) 801 return stream_avail; 802 else 803 return 0; 804} 805 806 807static int 808stream_is_pushing_promise (const struct lsquic_stream *stream) 809{ 810 return (stream->stream_flags & STREAM_PUSHING) 811 && SLIST_FIRST(&stream->sm_promises) 812 && (SLIST_FIRST(&stream->sm_promises))->pp_write_state != PPWS_DONE 813 ; 814} 815 816 817/* To prevent deadlocks, ensure that when headers are sent, the bytes 818 * sent on the encoder stream are written first. 819 * 820 * XXX If the encoder is set up in non-risking mode, it is perfectly 821 * fine to send the header block first. TODO: update the logic to 822 * reflect this. There should be two sending behaviors: risk and non-risk. 823 * For now, we assume risk for everything to be on the conservative side. 824 */ 825static size_t 826stream_write_avail_with_headers (struct lsquic_stream *stream) 827{ 828 if (stream->stream_flags & STREAM_PUSHING) 829 return stream_write_avail_with_frames(stream); 830 831 switch (stream->sm_send_headers_state) 832 { 833 case SSHS_BEGIN: 834 return lsquic_qeh_write_avail(stream->conn_pub->u.ietf.qeh); 835 case SSHS_ENC_SENDING: 836 if (stream->sm_hb_compl > 837 lsquic_qeh_enc_off(stream->conn_pub->u.ietf.qeh)) 838 return 0; 839 LSQ_DEBUG("encoder stream bytes have all been sent"); 840 stream->sm_send_headers_state = SSHS_HBLOCK_SENDING; 841 /* fall-through */ 842 default: 843 assert(SSHS_HBLOCK_SENDING == stream->sm_send_headers_state); 844 return stream_write_avail_with_frames(stream); 845 } 846} 847 848 849size_t 850lsquic_stream_write_avail (struct lsquic_stream *stream) 851{ 852 return stream->sm_write_avail(stream); 853} 854 855 856int 857lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 858{ 859 struct lsquic_conn *lconn; 860 861 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 862 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 863 { 864 if (stream->sm_bflags & SMBF_IETF) 865 { 866 lconn = stream->conn_pub->lconn; 867 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FLOW_CONTROL_ERROR, 868 "flow control violation on stream %"PRIu64, stream->id); 869 } 870 return -1; 871 } 872 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 873 { 874 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 875 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 876 next_send_stream); 877 stream->sm_qflags |= SMQF_SEND_WUF; 878 } 879 return 0; 880} 881 882 883int 884lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 885{ 886 uint64_t max_off; 887 int got_next_offset, rv, free_frame; 888 enum ins_frame ins_frame; 889 struct lsquic_conn *lconn; 890 891 assert(frame->packet_in); 892 893 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 894 LSQ_DEBUG("received stream frame, offset 0x%"PRIX64", len %u; " 895 "fin: %d", frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 896 897 if ((stream->sm_bflags & SMBF_USE_HEADERS) 898 && (stream->stream_flags & STREAM_HEAD_IN_FIN)) 899 { 900 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 901 lsquic_malo_put(frame); 902 return -1; 903 } 904 905 if (frame->data_frame.df_fin && (stream->sm_bflags & SMBF_IETF) 906 && (stream->stream_flags & STREAM_FIN_RECVD) 907 && stream->sm_fin_off != DF_END(frame)) 908 { 909 lconn = stream->conn_pub->lconn; 910 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR, 911 "new final size %"PRIu64" from STREAM frame (id: %"PRIu64") does " 912 "not match previous final size %"PRIu64, DF_END(frame), 913 stream->id, stream->sm_fin_off); 914 return -1; 915 } 916 917 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 918 insert_frame: 919 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 920 if (INS_FRAME_OK == ins_frame) 921 { 922 /* Update maximum offset in the flow controller and check for flow 923 * control violation: 924 */ 925 rv = -1; 926 free_frame = !stream->data_in->di_if->di_own_on_ok; 927 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 928 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 929 goto end_ok; 930 if (frame->data_frame.df_fin) 931 { 932 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 933 stream->stream_flags |= STREAM_FIN_RECVD; 934 stream->sm_fin_off = DF_END(frame); 935 maybe_finish_stream(stream); 936 } 937 if ((stream->sm_bflags & SMBF_AUTOSWITCH) && 938 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 939 { 940 stream->data_in = stream->data_in->di_if->di_switch_impl( 941 stream->data_in, stream->read_offset); 942 if (!stream->data_in) 943 { 944 stream->data_in = data_in_error_new(); 945 goto end_ok; 946 } 947 } 948 if (got_next_offset) 949 /* Checking the offset saves di_get_frame() call */ 950 maybe_conn_to_tickable_if_readable(stream); 951 rv = 0; 952 end_ok: 953 if (free_frame) 954 lsquic_malo_put(frame); 955 stream->stream_flags &= ~STREAM_CACHED_FRAME; 956 return rv; 957 } 958 else if (INS_FRAME_DUP == ins_frame) 959 { 960 return 0; 961 } 962 else if (INS_FRAME_OVERLAP == ins_frame) 963 { 964 LSQ_DEBUG("overlap: switching DATA IN implementation"); 965 stream->data_in = stream->data_in->di_if->di_switch_impl( 966 stream->data_in, stream->read_offset); 967 if (stream->data_in) 968 goto insert_frame; 969 stream->data_in = data_in_error_new(); 970 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 971 lsquic_malo_put(frame); 972 return -1; 973 } 974 else 975 { 976 assert(INS_FRAME_ERR == ins_frame); 977 return -1; 978 } 979} 980 981 982static void 983drop_frames_in (lsquic_stream_t *stream) 984{ 985 if (stream->data_in) 986 { 987 stream->data_in->di_if->di_destroy(stream->data_in); 988 /* To avoid checking whether `data_in` is set, just set to the error 989 * data-in stream. It does the right thing after incoming data is 990 * dropped. 991 */ 992 stream->data_in = data_in_error_new(); 993 stream->stream_flags &= ~STREAM_CACHED_FRAME; 994 } 995} 996 997 998static void 999maybe_elide_stream_frames (struct lsquic_stream *stream) 1000{ 1001 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 1002 { 1003 if (stream->n_unacked) 1004 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 1005 stream->id); 1006 stream->stream_flags |= STREAM_FRAMES_ELIDED; 1007 } 1008} 1009 1010 1011int 1012lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 1013 uint64_t error_code) 1014{ 1015 struct lsquic_conn *lconn; 1016 1017 if ((stream->sm_bflags & SMBF_IETF) 1018 && (stream->stream_flags & STREAM_FIN_RECVD) 1019 && stream->sm_fin_off != offset) 1020 { 1021 lconn = stream->conn_pub->lconn; 1022 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR, 1023 "final size %"PRIu64" from RESET_STREAM frame (id: %"PRIu64") " 1024 "does not match previous final size %"PRIu64, offset, 1025 stream->id, stream->sm_fin_off); 1026 return -1; 1027 } 1028 1029 if (stream->stream_flags & STREAM_RST_RECVD) 1030 { 1031 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 1032 return 0; 1033 } 1034 1035 SM_HISTORY_APPEND(stream, SHE_RST_IN); 1036 /* This flag must always be set, even if we are "ignoring" it: it is 1037 * used by elision code. 1038 */ 1039 stream->stream_flags |= STREAM_RST_RECVD; 1040 1041 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 1042 { 1043 LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64" is " 1044 "smaller than that of byte following the last byte we have seen: " 1045 "0x%"PRIX64, offset, 1046 lsquic_sfcw_get_max_recv_off(&stream->fc)); 1047 return -1; 1048 } 1049 1050 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 1051 { 1052 LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64 1053 " violates flow control", offset); 1054 return -1; 1055 } 1056 1057 /* Let user collect error: */ 1058 maybe_conn_to_tickable_if_readable(stream); 1059 1060 lsquic_sfcw_consume_rem(&stream->fc); 1061 drop_frames_in(stream); 1062 drop_buffered_data(stream); 1063 maybe_elide_stream_frames(stream); 1064 1065 if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT)) 1066 && !(stream->sm_qflags & SMQF_SEND_RST)) 1067 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 1068 1069 stream->stream_flags |= STREAM_RST_RECVD; 1070 1071 maybe_finish_stream(stream); 1072 maybe_schedule_call_on_close(stream); 1073 1074 return 0; 1075} 1076 1077 1078void 1079lsquic_stream_stop_sending_in (struct lsquic_stream *stream, 1080 uint64_t error_code) 1081{ 1082 if (stream->stream_flags & STREAM_SS_RECVD) 1083 { 1084 LSQ_DEBUG("ignore duplicate STOP_SENDING frame"); 1085 return; 1086 } 1087 1088 SM_HISTORY_APPEND(stream, SHE_SS_IN); 1089 stream->stream_flags |= STREAM_SS_RECVD; 1090 1091 /* Let user collect error: */ 1092 maybe_conn_to_tickable_if_readable(stream); 1093 1094 lsquic_sfcw_consume_rem(&stream->fc); 1095 drop_frames_in(stream); 1096 drop_buffered_data(stream); 1097 maybe_elide_stream_frames(stream); 1098 1099 if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT)) 1100 && !(stream->sm_qflags & SMQF_SEND_RST)) 1101 lsquic_stream_reset_ext(stream, error_code, 0); 1102 1103 maybe_finish_stream(stream); 1104 maybe_schedule_call_on_close(stream); 1105} 1106 1107 1108uint64_t 1109lsquic_stream_fc_recv_off_const (const struct lsquic_stream *stream) 1110{ 1111 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 1112} 1113 1114 1115void 1116lsquic_stream_max_stream_data_sent (struct lsquic_stream *stream) 1117{ 1118 assert(stream->sm_qflags & SMQF_SEND_MAX_STREAM_DATA); 1119 stream->sm_qflags &= ~SMQF_SEND_MAX_STREAM_DATA; 1120 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1121 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1122 stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc); 1123} 1124 1125 1126uint64_t 1127lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 1128{ 1129 assert(stream->sm_qflags & SMQF_SEND_WUF); 1130 stream->sm_qflags &= ~SMQF_SEND_WUF; 1131 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1132 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1133 return stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc); 1134} 1135 1136 1137void 1138lsquic_stream_peer_blocked (struct lsquic_stream *stream, uint64_t peer_off) 1139{ 1140 uint64_t last_off; 1141 1142 if (stream->sm_last_recv_off) 1143 last_off = stream->sm_last_recv_off; 1144 else 1145 /* This gets advertized in transport parameters */ 1146 last_off = lsquic_sfcw_get_max_recv_off(&stream->fc); 1147 1148 LSQ_DEBUG("Peer blocked at %"PRIu64", while the last MAX_STREAM_DATA " 1149 "frame we sent advertized the limit of %"PRIu64, peer_off, last_off); 1150 1151 if (peer_off > last_off && !(stream->sm_qflags & SMQF_SEND_WUF)) 1152 { 1153 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1154 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1155 next_send_stream); 1156 stream->sm_qflags |= SMQF_SEND_WUF; 1157 LSQ_DEBUG("marked to send MAX_STREAM_DATA frame"); 1158 } 1159 else if (stream->sm_qflags & SMQF_SEND_WUF) 1160 LSQ_DEBUG("MAX_STREAM_DATA frame is already scheduled"); 1161 else if (stream->sm_last_recv_off) 1162 LSQ_DEBUG("MAX_STREAM_DATA(%"PRIu64") has already been either " 1163 "packetized or sent", stream->sm_last_recv_off); 1164 else 1165 LSQ_INFO("Peer should have receive transport param limit " 1166 "of %"PRIu64"; odd.", last_off); 1167} 1168 1169 1170/* GQUIC's BLOCKED frame does not have an offset */ 1171void 1172lsquic_stream_peer_blocked_gquic (struct lsquic_stream *stream) 1173{ 1174 LSQ_DEBUG("Peer blocked: schedule another WINDOW_UPDATE frame"); 1175 if (!(stream->sm_qflags & SMQF_SEND_WUF)) 1176 { 1177 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1178 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1179 next_send_stream); 1180 stream->sm_qflags |= SMQF_SEND_WUF; 1181 LSQ_DEBUG("marked to send MAX_STREAM_DATA frame"); 1182 } 1183 else 1184 LSQ_DEBUG("WINDOW_UPDATE frame is already scheduled"); 1185} 1186 1187 1188void 1189lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 1190{ 1191 assert(stream->sm_qflags & SMQF_SEND_BLOCKED); 1192 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 1193 stream->sm_qflags &= ~SMQF_SEND_BLOCKED; 1194 stream->stream_flags |= STREAM_BLOCKED_SENT; 1195 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1196 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1197} 1198 1199 1200void 1201lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 1202{ 1203 assert(stream->sm_qflags & SMQF_SEND_RST); 1204 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 1205 stream->sm_qflags &= ~SMQF_SEND_RST; 1206 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1207 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1208 stream->stream_flags |= STREAM_RST_SENT; 1209 maybe_finish_stream(stream); 1210} 1211 1212 1213static size_t 1214read_uh (struct lsquic_stream *stream, 1215 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1216{ 1217 struct http1x_headers *const h1h = stream->uh->uh_hset; 1218 size_t nread; 1219 1220 nread = readf(ctx, (unsigned char *) h1h->h1h_buf + h1h->h1h_off, 1221 h1h->h1h_size - h1h->h1h_off, 1222 (stream->stream_flags & STREAM_HEAD_IN_FIN) > 0); 1223 h1h->h1h_off += nread; 1224 if (h1h->h1h_off == h1h->h1h_size) 1225 { 1226 LSQ_DEBUG("read all uncompressed headers"); 1227 destroy_uh(stream); 1228 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 1229 { 1230 stream->stream_flags |= STREAM_FIN_REACHED; 1231 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 1232 } 1233 } 1234 return nread; 1235} 1236 1237 1238static void 1239stream_consumed_bytes (struct lsquic_stream *stream) 1240{ 1241 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 1242 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 1243 { 1244 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1245 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1246 next_send_stream); 1247 stream->sm_qflags |= SMQF_SEND_WUF; 1248 maybe_conn_to_tickable_if_writeable(stream, 1); 1249 } 1250} 1251 1252 1253struct read_frames_status 1254{ 1255 int error; 1256 int processed_frames; 1257 size_t total_nread; 1258}; 1259 1260 1261static struct read_frames_status 1262read_data_frames (struct lsquic_stream *stream, int do_filtering, 1263 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1264{ 1265 struct data_frame *data_frame; 1266 size_t nread, toread, total_nread; 1267 int short_read, processed_frames; 1268 1269 processed_frames = 0; 1270 total_nread = 0; 1271 1272 while ((data_frame = stream->data_in->di_if->di_get_frame( 1273 stream->data_in, stream->read_offset))) 1274 { 1275 1276 ++processed_frames; 1277 1278 do 1279 { 1280 if (do_filtering && stream->sm_sfi) 1281 toread = stream->sm_sfi->sfi_filter_df(stream, data_frame); 1282 else 1283 toread = data_frame->df_size - data_frame->df_read_off; 1284 1285 if (toread || data_frame->df_fin) 1286 { 1287 nread = readf(ctx, data_frame->df_data + data_frame->df_read_off, 1288 toread, data_frame->df_fin); 1289 if (do_filtering && stream->sm_sfi) 1290 stream->sm_sfi->sfi_decr_left(stream, nread); 1291 data_frame->df_read_off += nread; 1292 stream->read_offset += nread; 1293 total_nread += nread; 1294 short_read = nread < toread; 1295 } 1296 else 1297 short_read = 0; 1298 1299 if (data_frame->df_read_off == data_frame->df_size) 1300 { 1301 const int fin = data_frame->df_fin; 1302 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 1303 data_frame = NULL; 1304 if ((stream->sm_bflags & SMBF_AUTOSWITCH) && 1305 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 1306 { 1307 stream->data_in = stream->data_in->di_if->di_switch_impl( 1308 stream->data_in, stream->read_offset); 1309 if (!stream->data_in) 1310 { 1311 stream->data_in = data_in_error_new(); 1312 return (struct read_frames_status) { .error = 1, }; 1313 } 1314 } 1315 if (fin) 1316 { 1317 stream->stream_flags |= STREAM_FIN_REACHED; 1318 goto end_while; 1319 } 1320 } 1321 else if (short_read) 1322 goto end_while; 1323 } 1324 while (data_frame); 1325 } 1326 end_while: 1327 1328 if (processed_frames) 1329 stream_consumed_bytes(stream); 1330 1331 return (struct read_frames_status) { 1332 .error = 0, 1333 .processed_frames = processed_frames, 1334 .total_nread = total_nread, 1335 }; 1336} 1337 1338 1339static ssize_t 1340stream_readf (struct lsquic_stream *stream, 1341 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1342{ 1343 size_t total_nread, nread; 1344 int read_unc_headers; 1345 1346 total_nread = 0; 1347 1348 if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF)) 1349 == (SMBF_USE_HEADERS|SMBF_IETF) 1350 && !(stream->stream_flags & STREAM_HAVE_UH) 1351 && !stream->uh) 1352 { 1353 if (stream->sm_readable(stream)) 1354 { 1355 if (stream->sm_hq_filter.hqfi_flags & HQFI_FLAG_ERROR) 1356 { 1357 LSQ_INFO("HQ filter hit an error: cannot read from stream"); 1358 errno = EBADMSG; 1359 return -1; 1360 } 1361 assert(stream->uh); 1362 } 1363 else 1364 { 1365 errno = EWOULDBLOCK; 1366 return -1; 1367 } 1368 } 1369 1370 if (stream->uh) 1371 { 1372 if (stream->uh->uh_flags & UH_H1H) 1373 { 1374 nread = read_uh(stream, readf, ctx); 1375 read_unc_headers = nread > 0; 1376 total_nread += nread; 1377 if (stream->uh) 1378 return total_nread; 1379 } 1380 else 1381 { 1382 LSQ_INFO("header set not claimed: cannot read from stream"); 1383 return -1; 1384 } 1385 } 1386 else if ((stream->sm_bflags & SMBF_USE_HEADERS) 1387 && !(stream->stream_flags & STREAM_HAVE_UH)) 1388 { 1389 LSQ_DEBUG("cannot read: headers not available"); 1390 errno = EWOULDBLOCK; 1391 return -1; 1392 } 1393 else 1394 read_unc_headers = 0; 1395 1396 const struct read_frames_status rfs 1397 = read_data_frames(stream, 1, readf, ctx); 1398 if (rfs.error) 1399 return -1; 1400 total_nread += rfs.total_nread; 1401 1402 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 1403 total_nread, stream->read_offset); 1404 1405 if (rfs.processed_frames || read_unc_headers) 1406 { 1407 return total_nread; 1408 } 1409 else 1410 { 1411 assert(0 == total_nread); 1412 errno = EWOULDBLOCK; 1413 return -1; 1414 } 1415} 1416 1417 1418/* This function returns 0 when EOF is reached. 1419 */ 1420ssize_t 1421lsquic_stream_readf (struct lsquic_stream *stream, 1422 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1423{ 1424 SM_HISTORY_APPEND(stream, SHE_USER_READ); 1425 1426 if (lsquic_stream_is_reset(stream)) 1427 { 1428 if (stream->stream_flags & STREAM_RST_RECVD) 1429 stream->stream_flags |= STREAM_RST_READ; 1430 errno = ECONNRESET; 1431 return -1; 1432 } 1433 if (stream->stream_flags & STREAM_U_READ_DONE) 1434 { 1435 errno = EBADF; 1436 return -1; 1437 } 1438 if ((stream->stream_flags & STREAM_FIN_REACHED) 1439 && 0 == (!!(stream->stream_flags & STREAM_HAVE_UH) 1440 ^ !!(stream->sm_bflags & SMBF_USE_HEADERS))) 1441 return 0; 1442 1443 return stream_readf(stream, readf, ctx); 1444} 1445 1446 1447struct readv_ctx 1448{ 1449 const struct iovec *iov; 1450 const struct iovec *const end; 1451 unsigned char *p; 1452}; 1453 1454 1455static size_t 1456readv_f (void *ctx_p, const unsigned char *buf, size_t len, int fin) 1457{ 1458 struct readv_ctx *const ctx = ctx_p; 1459 const unsigned char *const end = buf + len; 1460 size_t ntocopy; 1461 1462 while (ctx->iov < ctx->end && buf < end) 1463 { 1464 ntocopy = (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len 1465 - ctx->p; 1466 if (ntocopy > (size_t) (end - buf)) 1467 ntocopy = end - buf; 1468 memcpy(ctx->p, buf, ntocopy); 1469 ctx->p += ntocopy; 1470 buf += ntocopy; 1471 if (ctx->p == (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len) 1472 { 1473 do 1474 ++ctx->iov; 1475 while (ctx->iov < ctx->end && ctx->iov->iov_len == 0); 1476 if (ctx->iov < ctx->end) 1477 ctx->p = ctx->iov->iov_base; 1478 else 1479 break; 1480 } 1481 } 1482 1483 return len - (end - buf); 1484} 1485 1486 1487ssize_t 1488lsquic_stream_readv (struct lsquic_stream *stream, const struct iovec *iov, 1489 int iovcnt) 1490{ 1491 struct readv_ctx ctx = { iov, iov + iovcnt, iov->iov_base, }; 1492 return lsquic_stream_readf(stream, readv_f, &ctx); 1493} 1494 1495 1496ssize_t 1497lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 1498{ 1499 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 1500 return lsquic_stream_readv(stream, &iov, 1); 1501} 1502 1503 1504static void 1505stream_shutdown_read (lsquic_stream_t *stream) 1506{ 1507 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1508 { 1509 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 1510 stream->stream_flags |= STREAM_U_READ_DONE; 1511 stream_wantread(stream, 0); 1512 maybe_finish_stream(stream); 1513 } 1514} 1515 1516 1517static int 1518stream_is_incoming_unidir (const struct lsquic_stream *stream) 1519{ 1520 enum stream_id_type sit; 1521 1522 if (stream->sm_bflags & SMBF_IETF) 1523 { 1524 sit = stream->id & SIT_MASK; 1525 if (stream->sm_bflags & SMBF_SERVER) 1526 return sit == SIT_UNI_CLIENT; 1527 else 1528 return sit == SIT_UNI_SERVER; 1529 } 1530 else 1531 return 0; 1532} 1533 1534 1535static void 1536stream_shutdown_write (lsquic_stream_t *stream) 1537{ 1538 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1539 return; 1540 1541 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 1542 stream->stream_flags |= STREAM_U_WRITE_DONE; 1543 stream_wantwrite(stream, 0); 1544 1545 /* Don't bother to check whether there is anything else to write if 1546 * the flags indicate that nothing else should be written. 1547 */ 1548 if (!(stream->sm_bflags & SMBF_CRYPTO) 1549 && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT)) 1550 && !stream_is_incoming_unidir(stream) 1551 && !(stream->sm_qflags & SMQF_SEND_RST)) 1552 { 1553 if ((stream->sm_bflags & SMBF_USE_HEADERS) 1554 && !(stream->stream_flags & STREAM_HEADERS_SENT)) 1555 { 1556 LSQ_DEBUG("headers not sent, send a reset"); 1557 lsquic_stream_reset(stream, 0); 1558 } 1559 else if (stream->sm_n_buffered == 0) 1560 { 1561 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 1562 stream)) 1563 { 1564 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 1565 stream->stream_flags |= STREAM_FIN_SENT; 1566 } 1567 else 1568 { 1569 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 1570 "flag in it"); 1571 (void) stream_flush_nocheck(stream); 1572 } 1573 } 1574 else 1575 (void) stream_flush_nocheck(stream); 1576 } 1577} 1578 1579 1580static void 1581maybe_stream_shutdown_write (struct lsquic_stream *stream) 1582{ 1583 if (stream->sm_send_headers_state == SSHS_BEGIN) 1584 stream_shutdown_write(stream); 1585 else if (0 == (stream->stream_flags & STREAM_DELAYED_SW)) 1586 { 1587 LSQ_DEBUG("shutdown delayed"); 1588 SM_HISTORY_APPEND(stream, SHE_DELAY_SW); 1589 stream->stream_flags |= STREAM_DELAYED_SW; 1590 } 1591} 1592 1593 1594int 1595lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 1596{ 1597 LSQ_DEBUG("shutdown; how: %d", how); 1598 if (lsquic_stream_is_closed(stream)) 1599 { 1600 LSQ_INFO("Attempt to shut down a closed stream"); 1601 errno = EBADF; 1602 return -1; 1603 } 1604 /* 0: read, 1: write: 2: read and write 1605 */ 1606 if (how < 0 || how > 2) 1607 { 1608 errno = EINVAL; 1609 return -1; 1610 } 1611 1612 if (how) 1613 maybe_stream_shutdown_write(stream); 1614 if (how != 1) 1615 stream_shutdown_read(stream); 1616 1617 maybe_finish_stream(stream); 1618 maybe_schedule_call_on_close(stream); 1619 if (how && !(stream->stream_flags & STREAM_DELAYED_SW)) 1620 maybe_conn_to_tickable_if_writeable(stream, 1); 1621 1622 return 0; 1623} 1624 1625 1626void 1627lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 1628{ 1629 LSQ_DEBUG("internal shutdown"); 1630 if (lsquic_stream_is_critical(stream)) 1631 { 1632 LSQ_DEBUG("add flag to force-finish special stream"); 1633 stream->stream_flags |= STREAM_FORCE_FINISH; 1634 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 1635 } 1636 maybe_finish_stream(stream); 1637 maybe_schedule_call_on_close(stream); 1638} 1639 1640 1641static void 1642fake_reset_unused_stream (lsquic_stream_t *stream) 1643{ 1644 stream->stream_flags |= 1645 STREAM_RST_RECVD /* User will pick this up on read or write */ 1646 | STREAM_RST_SENT /* Don't send anything else on this stream */ 1647 ; 1648 1649 /* Cancel all writes to the network scheduled for this stream: */ 1650 if (stream->sm_qflags & SMQF_SENDING_FLAGS) 1651 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 1652 next_send_stream); 1653 stream->sm_qflags &= ~SMQF_SENDING_FLAGS; 1654 drop_buffered_data(stream); 1655 LSQ_DEBUG("fake-reset stream%s", 1656 stream_stalled(stream) ? " (stalled)" : ""); 1657 maybe_finish_stream(stream); 1658 maybe_schedule_call_on_close(stream); 1659} 1660 1661 1662/* This function should only be called for locally-initiated streams whose ID 1663 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 1664 * frame sent by peer but we have not yet received it and created a stream. 1665 * In this situation, we mark the stream as reset, so that user's on_read or 1666 * on_write event callback picks up the error. That, in turn, should result 1667 * in stream being closed. 1668 * 1669 * If we have received any data frames on this stream, this probably indicates 1670 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 1671 * lower than this. However, we still try to handle it gracefully and peform 1672 * a shutdown, as if the stream was not reset. 1673 */ 1674void 1675lsquic_stream_received_goaway (lsquic_stream_t *stream) 1676{ 1677 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 1678 if (0 == stream->read_offset && 1679 stream->data_in->di_if->di_empty(stream->data_in)) 1680 fake_reset_unused_stream(stream); /* Normal condition */ 1681 else 1682 { /* This is odd, let's handle it the best we can: */ 1683 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 1684 lsquic_stream_shutdown_internal(stream); 1685 } 1686} 1687 1688 1689uint64_t 1690lsquic_stream_read_offset (const lsquic_stream_t *stream) 1691{ 1692 return stream->read_offset; 1693} 1694 1695 1696static int 1697stream_wantread (lsquic_stream_t *stream, int is_want) 1698{ 1699 const int old_val = !!(stream->sm_qflags & SMQF_WANT_READ); 1700 const int new_val = !!is_want; 1701 if (old_val != new_val) 1702 { 1703 if (new_val) 1704 { 1705 if (!old_val) 1706 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 1707 next_read_stream); 1708 stream->sm_qflags |= SMQF_WANT_READ; 1709 } 1710 else 1711 { 1712 stream->sm_qflags &= ~SMQF_WANT_READ; 1713 if (old_val) 1714 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 1715 next_read_stream); 1716 } 1717 } 1718 return old_val; 1719} 1720 1721 1722static void 1723maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_q_flags flag) 1724{ 1725 assert(SMQF_WRITE_Q_FLAGS & flag); 1726 if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS)) 1727 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1728 next_write_stream); 1729 stream->sm_qflags |= flag; 1730} 1731 1732 1733static void 1734maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag) 1735{ 1736 assert(SMQF_WRITE_Q_FLAGS & flag); 1737 if (stream->sm_qflags & flag) 1738 { 1739 stream->sm_qflags &= ~flag; 1740 if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS)) 1741 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1742 next_write_stream); 1743 } 1744} 1745 1746 1747static int 1748stream_wantwrite (struct lsquic_stream *stream, int new_val) 1749{ 1750 const int old_val = !!(stream->sm_qflags & SMQF_WANT_WRITE); 1751 1752 assert(0 == (new_val & ~1)); /* new_val is either 0 or 1 */ 1753 1754 if (old_val != new_val) 1755 { 1756 if (new_val) 1757 maybe_put_onto_write_q(stream, SMQF_WANT_WRITE); 1758 else 1759 maybe_remove_from_write_q(stream, SMQF_WANT_WRITE); 1760 } 1761 return old_val; 1762} 1763 1764 1765int 1766lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1767{ 1768 SM_HISTORY_APPEND(stream, SHE_WANTREAD_NO + !!is_want); 1769 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1770 { 1771 if (is_want) 1772 maybe_conn_to_tickable_if_readable(stream); 1773 return stream_wantread(stream, is_want); 1774 } 1775 else 1776 { 1777 errno = EBADF; 1778 return -1; 1779 } 1780} 1781 1782 1783int 1784lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1785{ 1786 int old_val; 1787 1788 is_want = !!is_want; 1789 1790 SM_HISTORY_APPEND(stream, SHE_WANTWRITE_NO + is_want); 1791 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE) 1792 && SSHS_BEGIN == stream->sm_send_headers_state) 1793 { 1794 stream->sm_saved_want_write = is_want; 1795 if (is_want) 1796 maybe_conn_to_tickable_if_writeable(stream, 1); 1797 return stream_wantwrite(stream, is_want); 1798 } 1799 else if (SSHS_BEGIN != stream->sm_send_headers_state) 1800 { 1801 old_val = stream->sm_saved_want_write; 1802 stream->sm_saved_want_write = is_want; 1803 return old_val; 1804 } 1805 else 1806 { 1807 errno = EBADF; 1808 return -1; 1809 } 1810} 1811 1812 1813struct progress 1814{ 1815 enum stream_flags s_flags; 1816 enum stream_q_flags q_flags; 1817}; 1818 1819 1820static struct progress 1821stream_progress (const struct lsquic_stream *stream) 1822{ 1823 return (struct progress) { 1824 .s_flags = stream->stream_flags 1825 & (STREAM_U_WRITE_DONE|STREAM_U_READ_DONE), 1826 .q_flags = stream->sm_qflags 1827 & (SMQF_WANT_READ|SMQF_WANT_WRITE|SMQF_WANT_FLUSH|SMQF_SEND_RST), 1828 }; 1829} 1830 1831 1832static int 1833progress_eq (struct progress a, struct progress b) 1834{ 1835 return a.s_flags == b.s_flags && a.q_flags == b.q_flags; 1836} 1837 1838 1839static void 1840stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1841{ 1842 unsigned no_progress_count, no_progress_limit; 1843 struct progress progress; 1844 uint64_t size; 1845 1846 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1847 1848 no_progress_count = 0; 1849 while ((stream->sm_qflags & SMQF_WANT_READ) 1850 && lsquic_stream_readable(stream)) 1851 { 1852 progress = stream_progress(stream); 1853 size = stream->read_offset; 1854 1855 stream->stream_if->on_read(stream, stream->st_ctx); 1856 1857 if (no_progress_limit && size == stream->read_offset && 1858 progress_eq(progress, stream_progress(stream))) 1859 { 1860 ++no_progress_count; 1861 if (no_progress_count >= no_progress_limit) 1862 { 1863 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1864 "progress) in user code reading from stream", 1865 no_progress_count, 1866 no_progress_count == 1 ? "" : "s"); 1867 break; 1868 } 1869 } 1870 else 1871 no_progress_count = 0; 1872 } 1873} 1874 1875 1876static void 1877stream_hblock_sent (struct lsquic_stream *stream) 1878{ 1879 int want_write; 1880 1881 LSQ_DEBUG("header block has been sent: restore default behavior"); 1882 stream->sm_send_headers_state = SSHS_BEGIN; 1883 stream->sm_write_avail = stream_write_avail_with_frames; 1884 1885 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 1886 if (want_write != stream->sm_saved_want_write) 1887 (void) lsquic_stream_wantwrite(stream, stream->sm_saved_want_write); 1888 1889 if (stream->stream_flags & STREAM_DELAYED_SW) 1890 { 1891 LSQ_DEBUG("performing delayed shutdown write"); 1892 stream->stream_flags &= ~STREAM_DELAYED_SW; 1893 stream_shutdown_write(stream); 1894 maybe_schedule_call_on_close(stream); 1895 maybe_finish_stream(stream); 1896 maybe_conn_to_tickable_if_writeable(stream, 1); 1897 } 1898} 1899 1900 1901static void 1902on_write_header_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 1903{ 1904 ssize_t nw; 1905 1906 nw = stream_write_buf(stream, 1907 stream->sm_header_block + stream->sm_hblock_off, 1908 stream->sm_hblock_sz - stream->sm_hblock_off); 1909 if (nw > 0) 1910 { 1911 stream->sm_hblock_off += nw; 1912 if (stream->sm_hblock_off == stream->sm_hblock_sz) 1913 { 1914 stream->stream_flags |= STREAM_HEADERS_SENT; 1915 free(stream->sm_header_block); 1916 stream->sm_header_block = NULL; 1917 stream->sm_hblock_sz = 0; 1918 stream_hblock_sent(stream); 1919 LSQ_DEBUG("header block written out successfully"); 1920 /* TODO: if there was eos, do something else */ 1921 if (stream->sm_qflags & SMQF_WANT_WRITE) 1922 stream->stream_if->on_write(stream, h); 1923 } 1924 else 1925 { 1926 LSQ_DEBUG("wrote %zd bytes more of header block; not done yet", 1927 nw); 1928 } 1929 } 1930 else if (nw < 0) 1931 { 1932 /* XXX What should happen if we hit an error? TODO */ 1933 } 1934} 1935 1936 1937static void 1938(*select_on_write (struct lsquic_stream *stream))(struct lsquic_stream *, 1939 lsquic_stream_ctx_t *) 1940{ 1941 if (0 == (stream->stream_flags & STREAM_PUSHING) 1942 && SSHS_HBLOCK_SENDING != stream->sm_send_headers_state) 1943 /* Common case */ 1944 return stream->stream_if->on_write; 1945 else if (SSHS_HBLOCK_SENDING == stream->sm_send_headers_state) 1946 return on_write_header_wrapper; 1947 else 1948 { 1949 assert(stream->stream_flags & STREAM_PUSHING); 1950 if (stream_is_pushing_promise(stream)) 1951 return on_write_pp_wrapper; 1952 else if (stream->sm_dup_push_off < stream->sm_dup_push_len) 1953 return on_write_dp_wrapper; 1954 else 1955 return stream->stream_if->on_write; 1956 } 1957} 1958 1959 1960static void 1961stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1962{ 1963 unsigned no_progress_count, no_progress_limit; 1964 void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *); 1965 struct progress progress; 1966 1967 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1968 1969 no_progress_count = 0; 1970 stream->stream_flags |= STREAM_LAST_WRITE_OK; 1971 while ((stream->sm_qflags & SMQF_WANT_WRITE) 1972 && (stream->stream_flags & STREAM_LAST_WRITE_OK) 1973 && lsquic_stream_write_avail(stream)) 1974 { 1975 progress = stream_progress(stream); 1976 1977 on_write = select_on_write(stream); 1978 on_write(stream, stream->st_ctx); 1979 1980 if (no_progress_limit && progress_eq(progress, stream_progress(stream))) 1981 { 1982 ++no_progress_count; 1983 if (no_progress_count >= no_progress_limit) 1984 { 1985 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1986 "progress) in user code writing to stream", 1987 no_progress_count, 1988 no_progress_count == 1 ? "" : "s"); 1989 break; 1990 } 1991 } 1992 else 1993 no_progress_count = 0; 1994 } 1995} 1996 1997 1998static void 1999stream_dispatch_read_events_once (lsquic_stream_t *stream) 2000{ 2001 if ((stream->sm_qflags & SMQF_WANT_READ) && lsquic_stream_readable(stream)) 2002 { 2003 stream->stream_if->on_read(stream, stream->st_ctx); 2004 } 2005} 2006 2007 2008uint64_t 2009lsquic_stream_combined_send_off (const struct lsquic_stream *stream) 2010{ 2011 size_t frames_sizes; 2012 2013 frames_sizes = active_hq_frame_sizes(stream); 2014 return stream->tosend_off + stream->sm_n_buffered + frames_sizes; 2015} 2016 2017 2018static void 2019maybe_mark_as_blocked (lsquic_stream_t *stream) 2020{ 2021 struct lsquic_conn_cap *cc; 2022 uint64_t used; 2023 2024 used = lsquic_stream_combined_send_off(stream); 2025 if (stream->max_send_off == used) 2026 { 2027 if (stream->blocked_off < stream->max_send_off) 2028 { 2029 stream->blocked_off = used; 2030 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 2031 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 2032 next_send_stream); 2033 stream->sm_qflags |= SMQF_SEND_BLOCKED; 2034 LSQ_DEBUG("marked stream-blocked at stream offset " 2035 "%"PRIu64, stream->blocked_off); 2036 } 2037 else 2038 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 2039 " has been, or is about to be, sent", stream->blocked_off); 2040 } 2041 2042 if ((stream->sm_bflags & SMBF_CONN_LIMITED) 2043 && (cc = &stream->conn_pub->conn_cap, 2044 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 2045 { 2046 if (cc->cc_blocked < cc->cc_max) 2047 { 2048 cc->cc_blocked = cc->cc_max; 2049 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 2050 LSQ_DEBUG("marked connection-blocked at connection offset " 2051 "%"PRIu64, cc->cc_max); 2052 } 2053 else 2054 LSQ_DEBUG("stream has already been marked connection-blocked " 2055 "at offset %"PRIu64, cc->cc_blocked); 2056 } 2057} 2058 2059 2060void 2061lsquic_stream_dispatch_read_events (lsquic_stream_t *stream) 2062{ 2063 assert(stream->sm_qflags & SMQF_WANT_READ); 2064 2065 if (stream->sm_bflags & SMBF_RW_ONCE) 2066 stream_dispatch_read_events_once(stream); 2067 else 2068 stream_dispatch_read_events_loop(stream); 2069} 2070 2071 2072void 2073lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 2074{ 2075 void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *); 2076 int progress; 2077 uint64_t tosend_off; 2078 unsigned short n_buffered; 2079 enum stream_q_flags q_flags; 2080 2081 assert(stream->sm_qflags & SMQF_WRITE_Q_FLAGS); 2082 q_flags = stream->sm_qflags & SMQF_WRITE_Q_FLAGS; 2083 tosend_off = stream->tosend_off; 2084 n_buffered = stream->sm_n_buffered; 2085 2086 if (stream->sm_qflags & SMQF_WANT_FLUSH) 2087 (void) stream_flush(stream); 2088 2089 if (stream->sm_bflags & SMBF_RW_ONCE) 2090 { 2091 if ((stream->sm_qflags & SMQF_WANT_WRITE) 2092 && lsquic_stream_write_avail(stream)) 2093 { 2094 on_write = select_on_write(stream); 2095 on_write(stream, stream->st_ctx); 2096 } 2097 } 2098 else 2099 stream_dispatch_write_events_loop(stream); 2100 2101 /* Progress means either flags or offsets changed: */ 2102 progress = !((stream->sm_qflags & SMQF_WRITE_Q_FLAGS) == q_flags && 2103 stream->tosend_off == tosend_off && 2104 stream->sm_n_buffered == n_buffered); 2105 2106 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 2107 { 2108 if (progress) 2109 { /* Move the stream to the end of the list to ensure fairness. */ 2110 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 2111 next_write_stream); 2112 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 2113 next_write_stream); 2114 } 2115 } 2116} 2117 2118 2119static size_t 2120inner_reader_empty_size (void *ctx) 2121{ 2122 return 0; 2123} 2124 2125 2126static size_t 2127inner_reader_empty_read (void *ctx, void *buf, size_t count) 2128{ 2129 return 0; 2130} 2131 2132 2133static int 2134stream_flush (lsquic_stream_t *stream) 2135{ 2136 struct lsquic_reader empty_reader; 2137 ssize_t nw; 2138 2139 assert(stream->sm_qflags & SMQF_WANT_FLUSH); 2140 assert(stream->sm_n_buffered > 0 || 2141 /* Flushing is also used to packetize standalone FIN: */ 2142 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 2143 == STREAM_U_WRITE_DONE)); 2144 2145 empty_reader.lsqr_size = inner_reader_empty_size; 2146 empty_reader.lsqr_read = inner_reader_empty_read; 2147 empty_reader.lsqr_ctx = NULL; /* pro forma */ 2148 nw = stream_write_to_packets(stream, &empty_reader, 0); 2149 2150 if (nw >= 0) 2151 { 2152 assert(nw == 0); /* Empty reader: must have read zero bytes */ 2153 return 0; 2154 } 2155 else 2156 return -1; 2157} 2158 2159 2160static int 2161stream_flush_nocheck (lsquic_stream_t *stream) 2162{ 2163 size_t frames; 2164 2165 frames = active_hq_frame_sizes(stream); 2166 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered + frames; 2167 stream->sm_flush_to_payload = stream->sm_payload + stream->sm_n_buffered; 2168 maybe_put_onto_write_q(stream, SMQF_WANT_FLUSH); 2169 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 2170 2171 return stream_flush(stream); 2172} 2173 2174 2175int 2176lsquic_stream_flush (lsquic_stream_t *stream) 2177{ 2178 if (stream->stream_flags & STREAM_U_WRITE_DONE) 2179 { 2180 LSQ_DEBUG("cannot flush closed stream"); 2181 errno = EBADF; 2182 return -1; 2183 } 2184 2185 if (0 == stream->sm_n_buffered) 2186 { 2187 LSQ_DEBUG("flushing 0 bytes: noop"); 2188 return 0; 2189 } 2190 2191 return stream_flush_nocheck(stream); 2192} 2193 2194 2195static size_t 2196stream_get_n_allowed (const struct lsquic_stream *stream) 2197{ 2198 if (stream->sm_n_allocated) 2199 return stream->sm_n_allocated; 2200 else 2201 return stream->conn_pub->path->np_pack_size; 2202} 2203 2204 2205/* The flush threshold is the maximum size of stream data that can be sent 2206 * in a full packet. 2207 */ 2208#ifdef NDEBUG 2209static 2210#endif 2211 size_t 2212lsquic_stream_flush_threshold (const struct lsquic_stream *stream, 2213 unsigned data_sz) 2214{ 2215 enum packet_out_flags flags; 2216 enum packno_bits bits; 2217 size_t packet_header_sz, stream_header_sz, tag_len; 2218 size_t threshold; 2219 2220 bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl); 2221 flags = bits << POBIT_SHIFT; 2222 if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)) 2223 flags |= PO_CONN_ID; 2224 if (stream_is_hsk(stream)) 2225 flags |= PO_LONGHEAD; 2226 2227 packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags, 2228 stream->conn_pub->path->np_dcid.len); 2229 stream_header_sz = stream->sm_frame_header_sz(stream, data_sz); 2230 tag_len = stream->conn_pub->lconn->cn_esf_c->esf_tag_len; 2231 2232 threshold = stream_get_n_allowed(stream) - tag_len 2233 - packet_header_sz - stream_header_sz; 2234 return threshold; 2235} 2236 2237 2238#define COMMON_WRITE_CHECKS() do { \ 2239 if ((stream->sm_bflags & SMBF_USE_HEADERS) \ 2240 && !(stream->stream_flags & STREAM_HEADERS_SENT)) \ 2241 { \ 2242 if (SSHS_BEGIN != stream->sm_send_headers_state) \ 2243 { \ 2244 LSQ_DEBUG("still sending headers: no writing allowed"); \ 2245 return 0; \ 2246 } \ 2247 else \ 2248 { \ 2249 LSQ_INFO("Attempt to write to stream before sending HTTP " \ 2250 "headers"); \ 2251 errno = EILSEQ; \ 2252 return -1; \ 2253 } \ 2254 } \ 2255 if (lsquic_stream_is_reset(stream)) \ 2256 { \ 2257 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 2258 errno = ECONNRESET; \ 2259 return -1; \ 2260 } \ 2261 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 2262 { \ 2263 LSQ_INFO("Attempt to write to stream after it was closed for " \ 2264 "writing"); \ 2265 errno = EBADF; \ 2266 return -1; \ 2267 } \ 2268} while (0) 2269 2270 2271struct frame_gen_ctx 2272{ 2273 lsquic_stream_t *fgc_stream; 2274 struct lsquic_reader *fgc_reader; 2275 /* We keep our own count of how many bytes were read from reader because 2276 * some readers are external. The external caller does not have to rely 2277 * on our count, but it can. 2278 */ 2279 size_t fgc_nread_from_reader; 2280 size_t (*fgc_size) (void *ctx); 2281 int (*fgc_fin) (void *ctx); 2282 gsf_read_f fgc_read; 2283}; 2284 2285 2286static size_t 2287frame_std_gen_size (void *ctx) 2288{ 2289 struct frame_gen_ctx *fg_ctx = ctx; 2290 size_t available, remaining; 2291 2292 /* Make sure we are not writing past available size: */ 2293 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2294 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 2295 if (available < remaining) 2296 remaining = available; 2297 2298 return remaining + fg_ctx->fgc_stream->sm_n_buffered; 2299} 2300 2301 2302static size_t 2303stream_hq_frame_size (const struct stream_hq_frame *shf) 2304{ 2305 if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))) 2306 return 1 + 1 + ((shf->shf_flags & SHF_TWO_BYTES) > 0); 2307 else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) == SHF_FIXED_SIZE) 2308 return 1 + (1 << vint_val2bits(shf->shf_frame_size)); 2309 else 2310 { 2311 assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2312 == (SHF_FIXED_SIZE|SHF_PHANTOM)); 2313 return 0; 2314 } 2315} 2316 2317 2318static size_t 2319active_hq_frame_sizes (const struct lsquic_stream *stream) 2320{ 2321 const struct stream_hq_frame *shf; 2322 size_t size; 2323 2324 size = 0; 2325 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 2326 == (SMBF_IETF|SMBF_USE_HEADERS)) 2327 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2328 if (!(shf->shf_flags & SHF_WRITTEN)) 2329 size += stream_hq_frame_size(shf); 2330 2331 return size; 2332} 2333 2334 2335static uint64_t 2336stream_hq_frame_end (const struct stream_hq_frame *shf) 2337{ 2338 if (shf->shf_flags & SHF_FIXED_SIZE) 2339 return shf->shf_off + shf->shf_frame_size; 2340 else if (shf->shf_flags & SHF_TWO_BYTES) 2341 return shf->shf_off + ((1 << 14) - 1); 2342 else 2343 return shf->shf_off + ((1 << 6) - 1); 2344} 2345 2346 2347static int 2348frame_in_stream (const struct lsquic_stream *stream, 2349 const struct stream_hq_frame *shf) 2350{ 2351 return shf >= stream->sm_hq_frame_arr 2352 && shf < stream->sm_hq_frame_arr + sizeof(stream->sm_hq_frame_arr) 2353 / sizeof(stream->sm_hq_frame_arr[0]) 2354 ; 2355} 2356 2357 2358static void 2359stream_hq_frame_put (struct lsquic_stream *stream, 2360 struct stream_hq_frame *shf) 2361{ 2362 assert(STAILQ_FIRST(&stream->sm_hq_frames) == shf); 2363 STAILQ_REMOVE_HEAD(&stream->sm_hq_frames, shf_next); 2364 if (frame_in_stream(stream, shf)) 2365 memset(shf, 0, sizeof(*shf)); 2366 else 2367 lsquic_malo_put(shf); 2368} 2369 2370 2371static void 2372stream_hq_frame_close (struct lsquic_stream *stream, 2373 struct stream_hq_frame *shf) 2374{ 2375 unsigned bits; 2376 2377 LSQ_DEBUG("close HQ frame of type 0x%X at payload offset %"PRIu64 2378 " (actual offset %"PRIu64")", shf->shf_frame_type, 2379 stream->sm_payload, stream->tosend_off); 2380 assert(shf->shf_flags & SHF_ACTIVE); 2381 if (!(shf->shf_flags & SHF_FIXED_SIZE)) 2382 { 2383 shf->shf_frame_ptr[0] = shf->shf_frame_type; 2384 bits = (shf->shf_flags & SHF_TWO_BYTES) > 0; 2385 vint_write(shf->shf_frame_ptr + 1, stream->sm_payload - shf->shf_off, 2386 bits, 1 << bits); 2387 } 2388 stream_hq_frame_put(stream, shf); 2389} 2390 2391 2392static size_t 2393frame_hq_gen_size (void *ctx) 2394{ 2395 struct frame_gen_ctx *fg_ctx = ctx; 2396 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2397 size_t available, remaining, frames; 2398 const struct stream_hq_frame *shf; 2399 2400 frames = 0; 2401 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2402 if (shf->shf_off >= stream->sm_payload) 2403 frames += stream_hq_frame_size(shf); 2404 2405 /* Make sure we are not writing past available size: */ 2406 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2407 available = lsquic_stream_write_avail(stream); 2408 if (available < remaining) 2409 remaining = available; 2410 2411 return remaining + stream->sm_n_buffered + frames; 2412} 2413 2414 2415static int 2416frame_std_gen_fin (void *ctx) 2417{ 2418 struct frame_gen_ctx *fg_ctx = ctx; 2419 return !(fg_ctx->fgc_stream->sm_bflags & SMBF_CRYPTO) 2420 && (fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE) 2421 && 0 == fg_ctx->fgc_stream->sm_n_buffered 2422 /* Do not use frame_std_gen_size() as it may chop the real size: */ 2423 && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2424} 2425 2426 2427static void 2428incr_conn_cap (struct lsquic_stream *stream, size_t incr) 2429{ 2430 if (stream->sm_bflags & SMBF_CONN_LIMITED) 2431 { 2432 stream->conn_pub->conn_cap.cc_sent += incr; 2433 assert(stream->conn_pub->conn_cap.cc_sent 2434 <= stream->conn_pub->conn_cap.cc_max); 2435 } 2436} 2437 2438 2439void 2440incr_sm_payload (struct lsquic_stream *stream, size_t incr) 2441{ 2442 stream->sm_payload += incr; 2443 stream->tosend_off += incr; 2444 assert(stream->tosend_off <= stream->max_send_off); 2445} 2446 2447 2448static size_t 2449frame_std_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 2450{ 2451 struct frame_gen_ctx *fg_ctx = ctx; 2452 unsigned char *p = begin_buf; 2453 unsigned char *const end = p + len; 2454 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 2455 size_t n_written, available, n_to_write; 2456 2457 if (stream->sm_n_buffered > 0) 2458 { 2459 if (len <= stream->sm_n_buffered) 2460 { 2461 memcpy(p, stream->sm_buf, len); 2462 memmove(stream->sm_buf, stream->sm_buf + len, 2463 stream->sm_n_buffered - len); 2464 stream->sm_n_buffered -= len; 2465 if (0 == stream->sm_n_buffered) 2466 maybe_resize_stream_buffer(stream); 2467 assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered); 2468 incr_sm_payload(stream, len); 2469 *fin = fg_ctx->fgc_fin(fg_ctx); 2470 return len; 2471 } 2472 memcpy(p, stream->sm_buf, stream->sm_n_buffered); 2473 p += stream->sm_n_buffered; 2474 stream->sm_n_buffered = 0; 2475 maybe_resize_stream_buffer(stream); 2476 } 2477 2478 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 2479 n_to_write = end - p; 2480 if (n_to_write > available) 2481 n_to_write = available; 2482 n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p, 2483 n_to_write); 2484 p += n_written; 2485 fg_ctx->fgc_nread_from_reader += n_written; 2486 *fin = fg_ctx->fgc_fin(fg_ctx); 2487 incr_sm_payload(stream, p - (const unsigned char *) begin_buf); 2488 incr_conn_cap(stream, n_written); 2489 return p - (const unsigned char *) begin_buf; 2490} 2491 2492 2493static struct stream_hq_frame * 2494find_hq_frame (const struct lsquic_stream *stream, uint64_t off) 2495{ 2496 struct stream_hq_frame *shf; 2497 2498 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2499 if (shf->shf_off <= off && stream_hq_frame_end(shf) > off) 2500 return shf; 2501 2502 return NULL; 2503} 2504 2505 2506static struct stream_hq_frame * 2507find_cur_hq_frame (const struct lsquic_stream *stream) 2508{ 2509 return find_hq_frame(stream, stream->sm_payload); 2510} 2511 2512 2513static struct stream_hq_frame * 2514open_hq_frame (struct lsquic_stream *stream) 2515{ 2516 struct stream_hq_frame *shf; 2517 2518 for (shf = stream->sm_hq_frame_arr; shf < stream->sm_hq_frame_arr 2519 + sizeof(stream->sm_hq_frame_arr) 2520 / sizeof(stream->sm_hq_frame_arr[0]); ++shf) 2521 if (!(shf->shf_flags & SHF_ACTIVE)) 2522 goto found; 2523 2524 shf = lsquic_malo_get(stream->conn_pub->mm->malo.stream_hq_frame); 2525 if (!shf) 2526 { 2527 LSQ_WARN("cannot allocate HQ frame"); 2528 return NULL; 2529 } 2530 memset(shf, 0, sizeof(*shf)); 2531 2532 found: 2533 STAILQ_INSERT_TAIL(&stream->sm_hq_frames, shf, shf_next); 2534 shf->shf_flags = SHF_ACTIVE; 2535 return shf; 2536} 2537 2538 2539/* Returns index of the new frame */ 2540static struct stream_hq_frame * 2541stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off, 2542 enum hq_frame_type frame_type, enum shf_flags flags, size_t size) 2543{ 2544 struct stream_hq_frame *shf; 2545 2546 shf = open_hq_frame(stream); 2547 if (!shf) 2548 { 2549 LSQ_WARN("could not open HQ frame"); 2550 return NULL; 2551 } 2552 2553 shf->shf_off = off; 2554 shf->shf_flags |= flags; 2555 shf->shf_frame_type = frame_type; 2556 if (shf->shf_flags & SHF_FIXED_SIZE) 2557 { 2558 shf->shf_frame_size = size; 2559 LSQ_DEBUG("activated fixed-size HQ frame of type 0x%X at offset " 2560 "%"PRIu64", size %zu", shf->shf_frame_type, shf->shf_off, size); 2561 } 2562 else 2563 { 2564 shf->shf_frame_ptr = NULL; 2565 if (size >= (1 << 6)) 2566 shf->shf_flags |= SHF_TWO_BYTES; 2567 LSQ_DEBUG("activated variable-size HQ frame of type 0x%X at offset " 2568 "%"PRIu64, shf->shf_frame_type, shf->shf_off); 2569 } 2570 2571 return shf; 2572} 2573 2574 2575static size_t 2576frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 2577{ 2578 struct frame_gen_ctx *fg_ctx = ctx; 2579 unsigned char *p = begin_buf; 2580 unsigned char *const end = p + len; 2581 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2582 struct stream_hq_frame *shf; 2583 size_t nw, frame_sz, avail, rem; 2584 unsigned bits; 2585 2586 while (p < end) 2587 { 2588 shf = find_cur_hq_frame(stream); 2589 if (shf) 2590 LSQ_DEBUG("found current HQ frame of type 0x%X at offset %"PRIu64, 2591 shf->shf_frame_type, shf->shf_off); 2592 else 2593 { 2594 rem = frame_std_gen_size(ctx); 2595 if (rem) 2596 { 2597 if (rem > ((1 << 14) - 1)) 2598 rem = (1 << 14) - 1; 2599 shf = stream_activate_hq_frame(stream, 2600 stream->sm_payload, HQFT_DATA, 0, rem); 2601 if (shf) 2602 goto insert; 2603 else 2604 { 2605 /* TODO: abort connection? Handle failure somehow */ 2606 break; 2607 } 2608 } 2609 else 2610 break; 2611 } 2612 if (shf->shf_off == stream->sm_payload 2613 && !(shf->shf_flags & SHF_WRITTEN)) 2614 { 2615 insert: 2616 frame_sz = stream_hq_frame_size(shf); 2617 if (frame_sz > (uintptr_t) (end - p)) 2618 { 2619 stream_hq_frame_put(stream, shf); 2620 break; 2621 } 2622 LSQ_DEBUG("insert %zu-byte HQ frame of type 0x%X at payload " 2623 "offset %"PRIu64" (actual offset %"PRIu64")", frame_sz, 2624 shf->shf_frame_type, stream->sm_payload, stream->tosend_off); 2625 if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))) 2626 { 2627 shf->shf_frame_ptr = p; 2628 memset(p, 0, frame_sz); 2629 p += frame_sz; 2630 } 2631 else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2632 == SHF_FIXED_SIZE) 2633 { 2634 *p++ = shf->shf_frame_type; 2635 bits = vint_val2bits(shf->shf_frame_size); 2636 vint_write(p, shf->shf_frame_size, bits, 1 << bits); 2637 p += 1 << bits; 2638 } 2639 else 2640 assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2641 == (SHF_FIXED_SIZE|SHF_PHANTOM)); 2642 if (!(shf->shf_flags & SHF_CC_PAID)) 2643 { 2644 incr_conn_cap(stream, frame_sz); 2645 shf->shf_flags |= SHF_CC_PAID; 2646 } 2647 shf->shf_flags |= SHF_WRITTEN; 2648 stream->tosend_off += frame_sz; 2649 assert(stream->tosend_off <= stream->max_send_off); 2650 } 2651 else 2652 { 2653 avail = stream->sm_n_buffered + stream->sm_write_avail(stream); 2654 len = stream_hq_frame_end(shf) - stream->sm_payload; 2655 assert(len); 2656 if (len > (unsigned) (end - p)) 2657 len = end - p; 2658 if (len > avail) 2659 len = avail; 2660 if (!len) 2661 break; 2662 nw = frame_std_gen_read(ctx, p, len, fin); 2663 p += nw; 2664 if (nw < len) 2665 break; 2666 if (stream_hq_frame_end(shf) == stream->sm_payload) 2667 stream_hq_frame_close(stream, shf); 2668 } 2669 } 2670 2671 return p - (unsigned char *) begin_buf; 2672} 2673 2674 2675static size_t 2676crypto_frame_gen_read (void *ctx, void *buf, size_t len) 2677{ 2678 int fin_ignored; 2679 2680 return frame_std_gen_read(ctx, buf, len, &fin_ignored); 2681} 2682 2683 2684static void 2685check_flush_threshold (lsquic_stream_t *stream) 2686{ 2687 if ((stream->sm_qflags & SMQF_WANT_FLUSH) && 2688 stream->tosend_off >= stream->sm_flush_to) 2689 { 2690 LSQ_DEBUG("flushed to or past required offset %"PRIu64, 2691 stream->sm_flush_to); 2692 maybe_remove_from_write_q(stream, SMQF_WANT_FLUSH); 2693 } 2694} 2695 2696 2697static int 2698write_stream_frame (struct frame_gen_ctx *fg_ctx, const size_t size, 2699 struct lsquic_packet_out *packet_out) 2700{ 2701 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 2702 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 2703 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2704 unsigned off; 2705 int len, s; 2706 2707#if LSQUIC_CONN_STATS 2708 const uint64_t begin_off = stream->tosend_off; 2709#endif 2710 off = packet_out->po_data_sz; 2711 len = pf->pf_gen_stream_frame( 2712 packet_out->po_data + packet_out->po_data_sz, 2713 lsquic_packet_out_avail(packet_out), stream->id, 2714 stream->tosend_off, 2715 fg_ctx->fgc_fin(fg_ctx), size, fg_ctx->fgc_read, fg_ctx); 2716 if (len < 0) 2717 return len; 2718 2719#if LSQUIC_CONN_STATS 2720 stream->conn_pub->conn_stats->out.stream_frames += 1; 2721 stream->conn_pub->conn_stats->out.stream_data_sz 2722 += stream->tosend_off - begin_off; 2723#endif 2724 EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf, 2725 packet_out->po_data + packet_out->po_data_sz, len); 2726 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 2727 packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM; 2728 if (0 == lsquic_packet_out_avail(packet_out)) 2729 packet_out->po_flags |= PO_STREAM_END; 2730 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 2731 stream, QUIC_FRAME_STREAM, off, len); 2732 if (s != 0) 2733 { 2734 LSQ_ERROR("adding stream to packet failed: %s", strerror(errno)); 2735 return -1; 2736 } 2737 2738 check_flush_threshold(stream); 2739 return len; 2740} 2741 2742 2743static enum swtp_status 2744stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size) 2745{ 2746 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2747 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2748 struct lsquic_packet_out *packet_out; 2749 int len; 2750 2751 packet_out = lsquic_send_ctl_new_packet_out(send_ctl, 0, PNS_APP, 2752 stream->conn_pub->path); 2753 if (!packet_out) 2754 return SWTP_STOP; 2755 packet_out->po_header_type = stream->tosend_off == 0 2756 ? HETY_INITIAL : HETY_HANDSHAKE; 2757 2758 len = write_stream_frame(fg_ctx, size, packet_out); 2759 2760 if (len > 0) 2761 { 2762 packet_out->po_flags |= PO_HELLO; 2763 lsquic_packet_out_zero_pad(packet_out); 2764 lsquic_send_ctl_scheduled_one(send_ctl, packet_out); 2765 return SWTP_OK; 2766 } 2767 else 2768 return SWTP_ERROR; 2769} 2770 2771 2772static enum swtp_status 2773stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size) 2774{ 2775 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2776 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2777 unsigned stream_header_sz, need_at_least; 2778 struct lsquic_packet_out *packet_out; 2779 struct lsquic_stream *headers_stream; 2780 int len; 2781 2782 if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED)) 2783 == STREAM_HEADERS_SENT) 2784 { 2785 if (stream->sm_bflags & SMBF_IETF) 2786 { 2787 if (stream->stream_flags & STREAM_ENCODER_DEP) 2788 headers_stream = stream->conn_pub->u.ietf.qeh->qeh_enc_sm_out; 2789 else 2790 headers_stream = NULL; 2791 } 2792 else 2793 headers_stream = 2794 lsquic_headers_stream_get_stream(stream->conn_pub->u.gquic.hs); 2795 if (headers_stream && lsquic_stream_has_data_to_flush(headers_stream)) 2796 { 2797 LSQ_DEBUG("flushing headers stream before packetizing stream data"); 2798 (void) lsquic_stream_flush(headers_stream); 2799 } 2800 /* If there is nothing to flush, some other stream must have flushed it: 2801 * this means our headers are flushed. Either way, only do this once. 2802 */ 2803 stream->stream_flags |= STREAM_HDRS_FLUSHED; 2804 } 2805 2806 stream_header_sz = stream->sm_frame_header_sz(stream, size); 2807 need_at_least = stream_header_sz; 2808 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 2809 == (SMBF_IETF|SMBF_USE_HEADERS)) 2810 { 2811 if (size > 0) 2812 need_at_least += 3; /* Enough room for HTTP/3 frame */ 2813 } 2814 else 2815 need_at_least += size > 0; 2816 get_packet: 2817 packet_out = lsquic_send_ctl_get_packet_for_stream(send_ctl, 2818 need_at_least, stream->conn_pub->path, stream); 2819 if (packet_out) 2820 { 2821 len = write_stream_frame(fg_ctx, size, packet_out); 2822 if (len > 0) 2823 return SWTP_OK; 2824 assert(len < 0); 2825 if (-len > (int) need_at_least) 2826 { 2827 LSQ_DEBUG("need more room (%d bytes) than initially calculated " 2828 "%u bytes, will try again", -len, need_at_least); 2829 need_at_least = -len; 2830 goto get_packet; 2831 } 2832 return SWTP_ERROR; 2833 } 2834 else 2835 return SWTP_STOP; 2836} 2837 2838 2839static enum swtp_status 2840stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size) 2841{ 2842 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2843 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2844 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 2845 unsigned crypto_header_sz, need_at_least; 2846 struct lsquic_packet_out *packet_out; 2847 unsigned short off; 2848 const enum packnum_space pns = lsquic_enclev2pns[ crypto_level(stream) ]; 2849 int len, s; 2850 2851 assert(size > 0); 2852 crypto_header_sz = stream->sm_frame_header_sz(stream, size); 2853 need_at_least = crypto_header_sz + 1; 2854 2855 packet_out = lsquic_send_ctl_get_packet_for_crypto(send_ctl, 2856 need_at_least, pns, stream->conn_pub->path); 2857 if (!packet_out) 2858 return SWTP_STOP; 2859 2860 off = packet_out->po_data_sz; 2861 len = pf->pf_gen_crypto_frame(packet_out->po_data + packet_out->po_data_sz, 2862 lsquic_packet_out_avail(packet_out), stream->tosend_off, 2863 size, crypto_frame_gen_read, fg_ctx); 2864 if (len < 0) 2865 return len; 2866 2867 EV_LOG_GENERATED_CRYPTO_FRAME(LSQUIC_LOG_CONN_ID, pf, 2868 packet_out->po_data + packet_out->po_data_sz, len); 2869 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 2870 packet_out->po_frame_types |= 1 << QUIC_FRAME_CRYPTO; 2871 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 2872 stream, QUIC_FRAME_CRYPTO, off, len); 2873 if (s != 0) 2874 { 2875 LSQ_WARN("adding crypto stream to packet failed: %s", strerror(errno)); 2876 return -1; 2877 } 2878 2879 packet_out->po_flags |= PO_HELLO; 2880 2881 check_flush_threshold(stream); 2882 return SWTP_OK; 2883} 2884 2885 2886static void 2887abort_connection (struct lsquic_stream *stream) 2888{ 2889 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 2890 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 2891 next_service_stream); 2892 stream->sm_qflags |= SMQF_ABORT_CONN; 2893 LSQ_WARN("connection will be aborted"); 2894 maybe_conn_to_tickable(stream); 2895} 2896 2897 2898static void 2899maybe_close_varsize_hq_frame (struct lsquic_stream *stream) 2900{ 2901 struct stream_hq_frame *shf; 2902 uint64_t size; 2903 unsigned bits; 2904 2905 shf = find_cur_hq_frame(stream); 2906 if (!shf) 2907 return; 2908 2909 if (shf->shf_flags & SHF_FIXED_SIZE) 2910 { 2911 if (shf->shf_off + shf->shf_frame_size <= stream->sm_payload) 2912 stream_hq_frame_put(stream, shf); 2913 return; 2914 } 2915 2916 bits = (shf->shf_flags & SHF_TWO_BYTES) > 0; 2917 size = stream->sm_payload + stream->sm_n_buffered - shf->shf_off; 2918 if (size && size <= VINT_MAX_B(bits) && shf->shf_frame_ptr) 2919 { 2920 if (0 == stream->sm_n_buffered) 2921 LSQ_DEBUG("close HQ frame type 0x%X of size %"PRIu64, 2922 shf->shf_frame_type, size); 2923 else 2924 LSQ_DEBUG("convert HQ frame type 0x%X of to fixed %"PRIu64, 2925 shf->shf_frame_type, size); 2926 shf->shf_frame_ptr[0] = shf->shf_frame_type; 2927 vint_write(shf->shf_frame_ptr + 1, size, bits, 1 << bits); 2928 if (0 == stream->sm_n_buffered) 2929 stream_hq_frame_put(stream, shf); 2930 else 2931 { 2932 shf->shf_frame_size = size; 2933 shf->shf_flags |= SHF_FIXED_SIZE; 2934 } 2935 } 2936 else if (!shf->shf_frame_ptr) 2937 { 2938 LSQ_WARN("dangling HTTP/3 frame"); 2939 stream->conn_pub->lconn->cn_if->ci_internal_error( 2940 stream->conn_pub->lconn, "dangling HTTP/3 frame on stream %"PRIu64, 2941 stream->id); 2942 stream_hq_frame_put(stream, shf); 2943 } 2944 else if (!size) 2945 { 2946 assert(!shf->shf_frame_ptr); 2947 LSQ_WARN("discard zero-sized HQ frame type 0x%X (off: %"PRIu64")", 2948 shf->shf_frame_type, shf->shf_off); 2949 stream_hq_frame_put(stream, shf); 2950 } 2951 else 2952 { 2953 assert(stream->sm_n_buffered); 2954 LSQ_ERROR("cannot close frame of size %"PRIu64" -- too large", size); 2955 /* TODO: abort connection */ 2956 stream_hq_frame_put(stream, shf); 2957 } 2958} 2959 2960 2961static ssize_t 2962stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, 2963 size_t thresh) 2964{ 2965 size_t size; 2966 ssize_t nw; 2967 unsigned seen_ok; 2968 int use_framing; 2969 struct frame_gen_ctx fg_ctx = { 2970 .fgc_stream = stream, 2971 .fgc_reader = reader, 2972 .fgc_nread_from_reader = 0, 2973 }; 2974 2975 use_framing = (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 2976 == (SMBF_IETF|SMBF_USE_HEADERS); 2977 if (use_framing) 2978 { 2979 fg_ctx.fgc_size = frame_hq_gen_size; 2980 fg_ctx.fgc_read = frame_hq_gen_read; 2981 fg_ctx.fgc_fin = frame_std_gen_fin; /* This seems to work for either? XXX */ 2982 } 2983 else 2984 { 2985 fg_ctx.fgc_size = frame_std_gen_size; 2986 fg_ctx.fgc_read = frame_std_gen_read; 2987 fg_ctx.fgc_fin = frame_std_gen_fin; 2988 } 2989 2990 seen_ok = 0; 2991 while ((size = fg_ctx.fgc_size(&fg_ctx), thresh ? size >= thresh : size > 0) 2992 || fg_ctx.fgc_fin(&fg_ctx)) 2993 { 2994 switch (stream->sm_write_to_packet(&fg_ctx, size)) 2995 { 2996 case SWTP_OK: 2997 if (!seen_ok++) 2998 maybe_conn_to_tickable_if_writeable(stream, 0); 2999 if (fg_ctx.fgc_fin(&fg_ctx)) 3000 { 3001 if (use_framing && seen_ok) 3002 maybe_close_varsize_hq_frame(stream); 3003 stream->stream_flags |= STREAM_FIN_SENT; 3004 goto end; 3005 } 3006 else 3007 break; 3008 case SWTP_STOP: 3009 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 3010 if (use_framing && seen_ok) 3011 maybe_close_varsize_hq_frame(stream); 3012 goto end; 3013 default: 3014 abort_connection(stream); 3015 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 3016 return -1; 3017 } 3018 } 3019 3020 if (use_framing && seen_ok) 3021 maybe_close_varsize_hq_frame(stream); 3022 3023 if (thresh) 3024 { 3025 assert(size < thresh); 3026 assert(size >= stream->sm_n_buffered); 3027 size -= stream->sm_n_buffered; 3028 if (size > 0) 3029 { 3030 nw = save_to_buffer(stream, reader, size); 3031 if (nw < 0) 3032 return -1; 3033 fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */ 3034 } 3035 } 3036 else 3037 { 3038 /* We count flushed data towards both stream and connection limits, 3039 * so we should have been able to packetize all of it: 3040 */ 3041 assert(0 == stream->sm_n_buffered); 3042 assert(size == 0); 3043 } 3044 3045 maybe_mark_as_blocked(stream); 3046 3047 end: 3048 return fg_ctx.fgc_nread_from_reader; 3049} 3050 3051 3052/* Perform an implicit flush when we hit connection limit while buffering 3053 * data. This is to prevent a (theoretical) stall: 3054 * 3055 * Imagine a number of streams, all of which buffered some data. The buffered 3056 * data is up to connection cap, which means no further writes are possible. 3057 * None of them flushes, which means that data is not sent and connection 3058 * WINDOW_UPDATE frame never arrives from peer. Stall. 3059 */ 3060static int 3061maybe_flush_stream (struct lsquic_stream *stream) 3062{ 3063 if (stream->sm_n_buffered > 0 3064 && (stream->sm_bflags & SMBF_CONN_LIMITED) 3065 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 3066 return stream_flush_nocheck(stream); 3067 else 3068 return 0; 3069} 3070 3071 3072static int 3073stream_hq_frame_extendable (const struct stream_hq_frame *shf, uint64_t cur_off, 3074 unsigned len) 3075{ 3076 return (shf->shf_flags & (SHF_TWO_BYTES|SHF_FIXED_SIZE)) == 0 3077 && cur_off - shf->shf_off < (1 << 6) 3078 && cur_off - shf->shf_off + len >= (1 << 6) 3079 ; 3080} 3081 3082 3083/* Update currently buffered HQ frame or create a new one, if possible. 3084 * Return update length to be buffered. If a HQ frame cannot be 3085 * buffered due to size, 0 is returned, thereby preventing both HQ frame 3086 * creation and buffering. 3087 */ 3088static size_t 3089update_buffered_hq_frames (struct lsquic_stream *stream, size_t len, 3090 size_t avail) 3091{ 3092 struct stream_hq_frame *shf; 3093 uint64_t cur_off, end; 3094 size_t frame_sz; 3095 unsigned extendable; 3096 3097 cur_off = stream->sm_payload + stream->sm_n_buffered; 3098 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 3099 if (shf->shf_off <= cur_off) 3100 { 3101 end = stream_hq_frame_end(shf); 3102 extendable = stream_hq_frame_extendable(shf, cur_off, len); 3103 if (cur_off < end + extendable) 3104 break; 3105 } 3106 3107 if (shf) 3108 { 3109 if (len > end + extendable - cur_off) 3110 len = end + extendable - cur_off; 3111 frame_sz = stream_hq_frame_size(shf); 3112 } 3113 else 3114 { 3115 assert(avail >= 3); 3116 shf = stream_activate_hq_frame(stream, cur_off, HQFT_DATA, 0, len); 3117 /* XXX malloc can fail */ 3118 if (len > stream_hq_frame_end(shf) - cur_off) 3119 len = stream_hq_frame_end(shf) - cur_off; 3120 extendable = 0; 3121 frame_sz = stream_hq_frame_size(shf); 3122 if (avail < frame_sz) 3123 return 0; 3124 avail -= frame_sz; 3125 } 3126 3127 if (!(shf->shf_flags & SHF_CC_PAID)) 3128 { 3129 incr_conn_cap(stream, frame_sz); 3130 shf->shf_flags |= SHF_CC_PAID; 3131 } 3132 if (extendable) 3133 { 3134 shf->shf_flags |= SHF_TWO_BYTES; 3135 incr_conn_cap(stream, 1); 3136 avail -= 1; 3137 if ((stream->sm_qflags & SMQF_WANT_FLUSH) 3138 && shf->shf_off <= stream->sm_payload 3139 && stream_hq_frame_end(shf) >= stream->sm_flush_to_payload) 3140 stream->sm_flush_to += 1; 3141 } 3142 3143 if (len <= avail) 3144 return len; 3145 else 3146 return avail; 3147} 3148 3149 3150static ssize_t 3151save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 3152 size_t len) 3153{ 3154 size_t avail, n_written, n_allowed; 3155 3156 avail = lsquic_stream_write_avail(stream); 3157 if (avail < len) 3158 len = avail; 3159 if (len == 0) 3160 { 3161 LSQ_DEBUG("zero-byte write (avail: %zu)", avail); 3162 return 0; 3163 } 3164 3165 n_allowed = stream_get_n_allowed(stream); 3166 assert(stream->sm_n_buffered + len <= n_allowed); 3167 3168 if (!stream->sm_buf) 3169 { 3170 stream->sm_buf = malloc(n_allowed); 3171 if (!stream->sm_buf) 3172 return -1; 3173 stream->sm_n_allocated = n_allowed; 3174 } 3175 3176 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 3177 == (SMBF_IETF|SMBF_USE_HEADERS)) 3178 len = update_buffered_hq_frames(stream, len, avail); 3179 3180 n_written = reader->lsqr_read(reader->lsqr_ctx, 3181 stream->sm_buf + stream->sm_n_buffered, len); 3182 stream->sm_n_buffered += n_written; 3183 assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered); 3184 incr_conn_cap(stream, n_written); 3185 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 3186 n_written, stream->sm_n_buffered); 3187 if (0 != maybe_flush_stream(stream)) 3188 return -1; 3189 return n_written; 3190} 3191 3192 3193static ssize_t 3194stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 3195{ 3196 const struct stream_hq_frame *shf; 3197 size_t thresh, len, frames, total_len, n_allowed, nwritten; 3198 ssize_t nw; 3199 3200 len = reader->lsqr_size(reader->lsqr_ctx); 3201 if (len == 0) 3202 return 0; 3203 3204 frames = 0; 3205 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 3206 == (SMBF_IETF|SMBF_USE_HEADERS)) 3207 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 3208 if (shf->shf_off >= stream->sm_payload) 3209 frames += stream_hq_frame_size(shf); 3210 total_len = len + frames + stream->sm_n_buffered; 3211 thresh = lsquic_stream_flush_threshold(stream, total_len); 3212 n_allowed = stream_get_n_allowed(stream); 3213 if (total_len <= n_allowed && total_len < thresh) 3214 { 3215 nwritten = 0; 3216 do 3217 { 3218 nw = save_to_buffer(stream, reader, len - nwritten); 3219 if (nw > 0) 3220 nwritten += (size_t) nw; 3221 else if (nw == 0) 3222 break; 3223 else 3224 return nw; 3225 } 3226 while (nwritten < len 3227 && stream->sm_n_buffered < stream->sm_n_allocated); 3228 return nwritten; 3229 } 3230 else 3231 return stream_write_to_packets(stream, reader, thresh); 3232} 3233 3234 3235ssize_t 3236lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 3237{ 3238 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 3239 return lsquic_stream_writev(stream, &iov, 1); 3240} 3241 3242 3243struct inner_reader_iovec { 3244 const struct iovec *iov; 3245 const struct iovec *end; 3246 unsigned cur_iovec_off; 3247}; 3248 3249 3250static size_t 3251inner_reader_iovec_read (void *ctx, void *buf, size_t count) 3252{ 3253 struct inner_reader_iovec *const iro = ctx; 3254 unsigned char *p = buf; 3255 unsigned char *const end = p + count; 3256 unsigned n_tocopy; 3257 3258 while (iro->iov < iro->end && p < end) 3259 { 3260 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 3261 if (n_tocopy > (unsigned) (end - p)) 3262 n_tocopy = end - p; 3263 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 3264 n_tocopy); 3265 p += n_tocopy; 3266 iro->cur_iovec_off += n_tocopy; 3267 if (iro->iov->iov_len == iro->cur_iovec_off) 3268 { 3269 ++iro->iov; 3270 iro->cur_iovec_off = 0; 3271 } 3272 } 3273 3274 return p + count - end; 3275} 3276 3277 3278static size_t 3279inner_reader_iovec_size (void *ctx) 3280{ 3281 struct inner_reader_iovec *const iro = ctx; 3282 const struct iovec *iov; 3283 size_t size; 3284 3285 size = 0; 3286 for (iov = iro->iov; iov < iro->end; ++iov) 3287 size += iov->iov_len; 3288 3289 return size - iro->cur_iovec_off; 3290} 3291 3292 3293ssize_t 3294lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 3295 int iovcnt) 3296{ 3297 COMMON_WRITE_CHECKS(); 3298 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 3299 3300 struct inner_reader_iovec iro = { 3301 .iov = iov, 3302 .end = iov + iovcnt, 3303 .cur_iovec_off = 0, 3304 }; 3305 struct lsquic_reader reader = { 3306 .lsqr_read = inner_reader_iovec_read, 3307 .lsqr_size = inner_reader_iovec_size, 3308 .lsqr_ctx = &iro, 3309 }; 3310 3311 return stream_write(stream, &reader); 3312} 3313 3314 3315ssize_t 3316lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) 3317{ 3318 COMMON_WRITE_CHECKS(); 3319 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 3320 return stream_write(stream, reader); 3321} 3322 3323 3324/* This bypasses COMMON_WRITE_CHECKS */ 3325static ssize_t 3326stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz) 3327{ 3328 struct iovec iov = { (void *) buf, sz, }; 3329 struct inner_reader_iovec iro = { 3330 .iov = &iov, 3331 .end = &iov + 1, 3332 .cur_iovec_off = 0, 3333 }; 3334 struct lsquic_reader reader = { 3335 .lsqr_read = inner_reader_iovec_read, 3336 .lsqr_size = inner_reader_iovec_size, 3337 .lsqr_ctx = &iro, 3338 }; 3339 return stream_write(stream, &reader); 3340} 3341 3342 3343/* XXX Move this define elsewhere? */ 3344#define MAX_HEADERS_SIZE (64 * 1024) 3345 3346static int 3347send_headers_ietf (struct lsquic_stream *stream, 3348 const struct lsquic_http_headers *headers, int eos) 3349{ 3350 enum qwh_status qwh; 3351 const size_t max_prefix_size = 3352 lsquic_qeh_max_prefix_size(stream->conn_pub->u.ietf.qeh); 3353 const size_t max_push_size = 1 /* Stream type */ + 8 /* Push ID */; 3354 size_t prefix_sz, headers_sz, hblock_sz, push_sz; 3355 unsigned bits; 3356 ssize_t nw; 3357 unsigned char *header_block; 3358 enum lsqpack_enc_header_flags hflags; 3359 unsigned char buf[max_push_size + max_prefix_size + MAX_HEADERS_SIZE]; 3360 3361 stream->stream_flags &= ~STREAM_PUSHING; 3362 stream->stream_flags |= STREAM_NOPUSH; 3363 3364 /* TODO: Optimize for the common case: write directly to sm_buf and fall 3365 * back to a larger buffer if that fails. 3366 */ 3367 prefix_sz = max_prefix_size; 3368 headers_sz = sizeof(buf) - max_prefix_size - max_push_size; 3369 qwh = lsquic_qeh_write_headers(stream->conn_pub->u.ietf.qeh, stream->id, 0, 3370 headers, buf + max_push_size + max_prefix_size, &prefix_sz, 3371 &headers_sz, &stream->sm_hb_compl, &hflags); 3372 3373 if (!(qwh == QWH_FULL || qwh == QWH_PARTIAL)) 3374 { 3375 if (qwh == QWH_ENOBUF) 3376 LSQ_INFO("not enough room for header block"); 3377 else 3378 LSQ_WARN("internal error encoding and sending HTTP headers"); 3379 return -1; 3380 } 3381 3382 if (hflags & LSQECH_REF_NEW_ENTRIES) 3383 stream->stream_flags |= STREAM_ENCODER_DEP; 3384 3385 if (stream->sm_promise) 3386 { 3387 assert(lsquic_stream_is_pushed(stream)); 3388 bits = vint_val2bits(stream->sm_promise->pp_id); 3389 push_sz = 1 + (1 << bits); 3390 if (!stream_activate_hq_frame(stream, 3391 stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PREAMBLE, 3392 SHF_FIXED_SIZE|SHF_PHANTOM, push_sz)) 3393 return -1; 3394 buf[max_push_size + max_prefix_size - prefix_sz - push_sz] = HQUST_PUSH; 3395 vint_write(buf + max_push_size + max_prefix_size - prefix_sz 3396 - push_sz + 1,stream->sm_promise->pp_id, bits, 1 << bits); 3397 } 3398 else 3399 push_sz = 0; 3400 3401 /* Construct contiguous header block buffer including HQ framing */ 3402 header_block = buf + max_push_size + max_prefix_size - prefix_sz - push_sz; 3403 hblock_sz = push_sz + prefix_sz + headers_sz; 3404 if (!stream_activate_hq_frame(stream, 3405 stream->sm_payload + stream->sm_n_buffered + push_sz, 3406 HQFT_HEADERS, SHF_FIXED_SIZE, hblock_sz - push_sz)) 3407 return -1; 3408 3409 if (qwh == QWH_FULL) 3410 { 3411 stream->sm_send_headers_state = SSHS_HBLOCK_SENDING; 3412 if (lsquic_stream_write_avail(stream)) 3413 { 3414 nw = stream_write_buf(stream, header_block, hblock_sz); 3415 if (nw < 0) 3416 { 3417 LSQ_WARN("cannot write to stream: %s", strerror(errno)); 3418 return -1; 3419 } 3420 if ((size_t) nw == hblock_sz) 3421 { 3422 stream->stream_flags |= STREAM_HEADERS_SENT; 3423 stream_hblock_sent(stream); 3424 LSQ_DEBUG("wrote all %zu bytes of header block", hblock_sz); 3425 return 0; 3426 } 3427 LSQ_DEBUG("wrote only %zd bytes of header block, stash", nw); 3428 } 3429 else 3430 { 3431 LSQ_DEBUG("cannot write to stream, stash all %zu bytes of " 3432 "header block", hblock_sz); 3433 nw = 0; 3434 } 3435 } 3436 else 3437 { 3438 stream->sm_send_headers_state = SSHS_ENC_SENDING; 3439 nw = 0; 3440 } 3441 3442 stream->sm_saved_want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 3443 stream_wantwrite(stream, 1); 3444 3445 stream->sm_header_block = malloc(hblock_sz - (size_t) nw); 3446 if (!stream->sm_header_block) 3447 { 3448 LSQ_WARN("cannot allocate %zd bytes to stash %s header block", 3449 hblock_sz - (size_t) nw, qwh == QWH_FULL ? "full" : "partial"); 3450 return -1; 3451 } 3452 memcpy(stream->sm_header_block, header_block + (size_t) nw, 3453 hblock_sz - (size_t) nw); 3454 stream->sm_hblock_sz = hblock_sz - (size_t) nw; 3455 stream->sm_hblock_off = 0; 3456 LSQ_DEBUG("stashed %u bytes of header block", stream->sm_hblock_sz); 3457 return 0; 3458} 3459 3460 3461static int 3462send_headers_gquic (struct lsquic_stream *stream, 3463 const struct lsquic_http_headers *headers, int eos) 3464{ 3465 int s = lsquic_headers_stream_send_headers(stream->conn_pub->u.gquic.hs, 3466 stream->id, headers, eos, lsquic_stream_priority(stream)); 3467 if (0 == s) 3468 { 3469 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 3470 stream->stream_flags |= STREAM_HEADERS_SENT; 3471 if (eos) 3472 stream->stream_flags |= STREAM_FIN_SENT; 3473 LSQ_INFO("sent headers"); 3474 } 3475 else 3476 LSQ_WARN("could not send headers: %s", strerror(errno)); 3477 return s; 3478} 3479 3480 3481int 3482lsquic_stream_send_headers (lsquic_stream_t *stream, 3483 const lsquic_http_headers_t *headers, int eos) 3484{ 3485 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3486 && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_U_WRITE_DONE))) 3487 { 3488 if (stream->sm_bflags & SMBF_IETF) 3489 return send_headers_ietf(stream, headers, eos); 3490 else 3491 return send_headers_gquic(stream, headers, eos); 3492 } 3493 else 3494 { 3495 LSQ_INFO("cannot send headers in this state"); 3496 errno = EBADMSG; 3497 return -1; 3498 } 3499} 3500 3501 3502void 3503lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 3504{ 3505 if (offset > stream->max_send_off) 3506 { 3507 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 3508 LSQ_DEBUG("update max send offset from 0x%"PRIX64" to " 3509 "0x%"PRIX64, stream->max_send_off, offset); 3510 stream->max_send_off = offset; 3511 } 3512 else 3513 LSQ_DEBUG("new offset 0x%"PRIX64" is not larger than old " 3514 "max send offset 0x%"PRIX64", ignoring", offset, 3515 stream->max_send_off); 3516} 3517 3518 3519/* This function is used to update offsets after handshake completes and we 3520 * learn of peer's limits from the handshake values. 3521 */ 3522int 3523lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset) 3524{ 3525 LSQ_DEBUG("setting max_send_off to %"PRIu64, offset); 3526 if (offset > stream->max_send_off) 3527 { 3528 lsquic_stream_window_update(stream, offset); 3529 return 0; 3530 } 3531 else if (offset < stream->tosend_off) 3532 { 3533 LSQ_INFO("new offset (%"PRIu64" bytes) is smaller than the amount of " 3534 "data already sent on this stream (%"PRIu64" bytes)", offset, 3535 stream->tosend_off); 3536 return -1; 3537 } 3538 else 3539 { 3540 stream->max_send_off = offset; 3541 return 0; 3542 } 3543} 3544 3545 3546void 3547lsquic_stream_reset (lsquic_stream_t *stream, uint64_t error_code) 3548{ 3549 lsquic_stream_reset_ext(stream, error_code, 1); 3550} 3551 3552 3553void 3554lsquic_stream_reset_ext (lsquic_stream_t *stream, uint64_t error_code, 3555 int do_close) 3556{ 3557 if ((stream->stream_flags & STREAM_RST_SENT) 3558 || (stream->sm_qflags & SMQF_SEND_RST)) 3559 { 3560 LSQ_INFO("reset already sent"); 3561 return; 3562 } 3563 3564 SM_HISTORY_APPEND(stream, SHE_RESET); 3565 3566 LSQ_INFO("reset, error code %"PRIu64, error_code); 3567 stream->error_code = error_code; 3568 3569 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 3570 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 3571 next_send_stream); 3572 stream->sm_qflags &= ~SMQF_SENDING_FLAGS; 3573 stream->sm_qflags |= SMQF_SEND_RST; 3574 3575 if (stream->sm_qflags & SMQF_QPACK_DEC) 3576 { 3577 lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream); 3578 stream->sm_qflags |= ~SMQF_QPACK_DEC; 3579 } 3580 3581 drop_buffered_data(stream); 3582 maybe_elide_stream_frames(stream); 3583 maybe_schedule_call_on_close(stream); 3584 3585 if (do_close) 3586 lsquic_stream_close(stream); 3587 else 3588 maybe_conn_to_tickable_if_writeable(stream, 1); 3589} 3590 3591 3592lsquic_stream_id_t 3593lsquic_stream_id (const lsquic_stream_t *stream) 3594{ 3595 return stream->id; 3596} 3597 3598 3599#if !defined(NDEBUG) && __GNUC__ 3600__attribute__((weak)) 3601#endif 3602struct lsquic_conn * 3603lsquic_stream_conn (const lsquic_stream_t *stream) 3604{ 3605 return stream->conn_pub->lconn; 3606} 3607 3608 3609int 3610lsquic_stream_close (lsquic_stream_t *stream) 3611{ 3612 LSQ_DEBUG("lsquic_stream_close() called"); 3613 SM_HISTORY_APPEND(stream, SHE_CLOSE); 3614 if (lsquic_stream_is_closed(stream)) 3615 { 3616 LSQ_INFO("Attempt to close an already-closed stream"); 3617 errno = EBADF; 3618 return -1; 3619 } 3620 maybe_stream_shutdown_write(stream); 3621 stream_shutdown_read(stream); 3622 maybe_schedule_call_on_close(stream); 3623 maybe_finish_stream(stream); 3624 if (!(stream->stream_flags & STREAM_DELAYED_SW)) 3625 maybe_conn_to_tickable_if_writeable(stream, 1); 3626 return 0; 3627} 3628 3629 3630#ifndef NDEBUG 3631#if __GNUC__ 3632__attribute__((weak)) 3633#endif 3634#endif 3635void 3636lsquic_stream_acked (struct lsquic_stream *stream, 3637 enum quic_frame_type frame_type) 3638{ 3639 assert(stream->n_unacked); 3640 --stream->n_unacked; 3641 LSQ_DEBUG("ACKed; n_unacked: %u", stream->n_unacked); 3642 if (frame_type == QUIC_FRAME_RST_STREAM) 3643 { 3644 SM_HISTORY_APPEND(stream, SHE_RST_ACKED); 3645 LSQ_DEBUG("RESET that we sent has been acked by peer"); 3646 stream->stream_flags |= STREAM_RST_ACKED; 3647 } 3648 if (0 == stream->n_unacked) 3649 maybe_finish_stream(stream); 3650} 3651 3652 3653void 3654lsquic_stream_push_req (lsquic_stream_t *stream, 3655 struct uncompressed_headers *push_req) 3656{ 3657 assert(!stream->push_req); 3658 stream->push_req = push_req; 3659 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 3660} 3661 3662 3663int 3664lsquic_stream_is_pushed (const lsquic_stream_t *stream) 3665{ 3666 enum stream_id_type sit; 3667 3668 if (stream->sm_bflags & SMBF_IETF) 3669 { 3670 sit = stream->id & SIT_MASK; 3671 return sit == SIT_UNI_SERVER; 3672 } 3673 else 3674 return 1 & ~stream->id; 3675} 3676 3677 3678int 3679lsquic_stream_push_info (const lsquic_stream_t *stream, 3680 lsquic_stream_id_t *ref_stream_id, void **hset) 3681{ 3682 if (lsquic_stream_is_pushed(stream)) 3683 { 3684 assert(stream->push_req); 3685 *ref_stream_id = stream->push_req->uh_stream_id; 3686 *hset = stream->push_req->uh_hset; 3687 return 0; 3688 } 3689 else 3690 return -1; 3691} 3692 3693 3694int 3695lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 3696{ 3697 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3698 && !(stream->stream_flags & STREAM_HAVE_UH)) 3699 { 3700 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 3701 LSQ_DEBUG("received uncompressed headers"); 3702 stream->stream_flags |= STREAM_HAVE_UH; 3703 if (uh->uh_flags & UH_FIN) 3704 { 3705 /* IETF QUIC only sets UH_FIN for a pushed stream on the server to 3706 * mark request as done: 3707 */ 3708 if (stream->sm_bflags & SMBF_IETF) 3709 assert((stream->sm_bflags & SMBF_SERVER) 3710 && lsquic_stream_is_pushed(stream)); 3711 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 3712 } 3713 stream->uh = uh; 3714 if (uh->uh_oth_stream_id == 0) 3715 { 3716 if (uh->uh_weight) 3717 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 3718 } 3719 else 3720 LSQ_NOTICE("don't know how to depend on stream %"PRIu64, 3721 uh->uh_oth_stream_id); 3722 return 0; 3723 } 3724 else 3725 { 3726 LSQ_ERROR("received unexpected uncompressed headers"); 3727 return -1; 3728 } 3729} 3730 3731 3732unsigned 3733lsquic_stream_priority (const lsquic_stream_t *stream) 3734{ 3735 return 256 - stream->sm_priority; 3736} 3737 3738 3739int 3740lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 3741{ 3742 /* The user should never get a reference to the special streams, 3743 * but let's check just in case: 3744 */ 3745 if (lsquic_stream_is_critical(stream)) 3746 return -1; 3747 if (priority < 1 || priority > 256) 3748 return -1; 3749 stream->sm_priority = 256 - priority; 3750 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 3751 LSQ_DEBUG("set priority to %u", priority); 3752 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 3753 return 0; 3754} 3755 3756 3757static int 3758maybe_send_priority_gquic (struct lsquic_stream *stream, unsigned priority) 3759{ 3760 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3761 && (stream->stream_flags & STREAM_HEADERS_SENT)) 3762 { 3763 /* We need to send headers only if we are a) using HEADERS stream 3764 * and b) we already sent initial headers. If initial headers 3765 * have not been sent yet, stream priority will be sent in the 3766 * HEADERS frame. 3767 */ 3768 return lsquic_headers_stream_send_priority(stream->conn_pub->u.gquic.hs, 3769 stream->id, 0, 0, priority); 3770 } 3771 else 3772 return 0; 3773} 3774 3775 3776static int 3777send_priority_ietf (struct lsquic_stream *stream, unsigned priority) 3778{ 3779 LSQ_WARN("%s: TODO", __func__); /* TODO */ 3780 return -1; 3781} 3782 3783 3784int 3785lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 3786{ 3787 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 3788 { 3789 if (stream->sm_bflags & SMBF_IETF) 3790 return send_priority_ietf(stream, priority); 3791 else 3792 return maybe_send_priority_gquic(stream, priority); 3793 } 3794 else 3795 return -1; 3796} 3797 3798 3799lsquic_stream_ctx_t * 3800lsquic_stream_get_ctx (const lsquic_stream_t *stream) 3801{ 3802 return stream->st_ctx; 3803} 3804 3805 3806int 3807lsquic_stream_refuse_push (lsquic_stream_t *stream) 3808{ 3809 if (lsquic_stream_is_pushed(stream) 3810 && !(stream->sm_qflags & SMQF_SEND_RST) 3811 && !(stream->stream_flags & STREAM_RST_SENT)) 3812 { 3813 LSQ_DEBUG("refusing pushed stream: send reset"); 3814 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 3815 return 0; 3816 } 3817 else 3818 return -1; 3819} 3820 3821 3822size_t 3823lsquic_stream_mem_used (const struct lsquic_stream *stream) 3824{ 3825 size_t size; 3826 3827 size = sizeof(stream); 3828 if (stream->sm_buf) 3829 size += stream->sm_n_allocated; 3830 if (stream->data_in) 3831 size += stream->data_in->di_if->di_mem_used(stream->data_in); 3832 3833 return size; 3834} 3835 3836 3837const lsquic_cid_t * 3838lsquic_stream_cid (const struct lsquic_stream *stream) 3839{ 3840 return LSQUIC_LOG_CONN_ID; 3841} 3842 3843 3844void 3845lsquic_stream_dump_state (const struct lsquic_stream *stream) 3846{ 3847 LSQ_DEBUG("flags: %X; read off: %"PRIu64, stream->stream_flags, 3848 stream->read_offset); 3849 stream->data_in->di_if->di_dump_state(stream->data_in); 3850} 3851 3852 3853void * 3854lsquic_stream_get_hset (struct lsquic_stream *stream) 3855{ 3856 void *hset; 3857 3858 if (lsquic_stream_is_reset(stream)) 3859 { 3860 LSQ_INFO("%s: stream is reset, no headers returned", __func__); 3861 errno = ECONNRESET; 3862 return NULL; 3863 } 3864 3865 if (!((stream->sm_bflags & SMBF_USE_HEADERS) 3866 && (stream->stream_flags & STREAM_HAVE_UH))) 3867 { 3868 LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__, 3869 stream->stream_flags); 3870 return NULL; 3871 } 3872 3873 if (!stream->uh) 3874 { 3875 LSQ_INFO("%s: headers unavailable (already fetched?)", __func__); 3876 return NULL; 3877 } 3878 3879 if (stream->uh->uh_flags & UH_H1H) 3880 { 3881 LSQ_INFO("%s: uncompressed headers have internal format", __func__); 3882 return NULL; 3883 } 3884 3885 hset = stream->uh->uh_hset; 3886 stream->uh->uh_hset = NULL; 3887 destroy_uh(stream); 3888 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 3889 { 3890 stream->stream_flags |= STREAM_FIN_REACHED; 3891 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 3892 } 3893 LSQ_DEBUG("return header set"); 3894 return hset; 3895} 3896 3897 3898/* GQUIC-only function */ 3899int 3900lsquic_stream_id_is_critical (int use_http, lsquic_stream_id_t stream_id) 3901{ 3902 return stream_id == LSQUIC_GQUIC_STREAM_HANDSHAKE 3903 || (stream_id == LSQUIC_GQUIC_STREAM_HEADERS && use_http); 3904} 3905 3906 3907int 3908lsquic_stream_is_critical (const struct lsquic_stream *stream) 3909{ 3910 return stream->sm_bflags & SMBF_CRITICAL; 3911} 3912 3913 3914void 3915lsquic_stream_set_stream_if (struct lsquic_stream *stream, 3916 const struct lsquic_stream_if *stream_if, void *stream_if_ctx) 3917{ 3918 SM_HISTORY_APPEND(stream, SHE_IF_SWITCH); 3919 stream->stream_if = stream_if; 3920 stream->sm_onnew_arg = stream_if_ctx; 3921 LSQ_DEBUG("switched interface"); 3922 assert(stream->stream_flags & STREAM_ONNEW_DONE); 3923 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 3924 stream); 3925} 3926 3927 3928static int 3929update_type_hist_and_check (struct hq_filter *filter) 3930{ 3931 /* 3-bit codes: */ 3932 enum { 3933 CODE_UNSET, 3934 CODE_HEADER, /* H Header */ 3935 CODE_DATA, /* D Data */ 3936 CODE_PLUS, /* + Plus: meaning previous frame repeats */ 3937 }; 3938 static const unsigned valid_seqs[] = { 3939 /* Ordered by expected frequency */ 3940 0123, /* HD+ */ 3941 012, /* HD */ 3942 01, /* H */ 3943 01231, /* HD+H */ 3944 0121, /* HDH */ 3945 }; 3946 unsigned code, i; 3947 3948 switch (filter->hqfi_type) 3949 { 3950 case HQFT_HEADERS: 3951 code = CODE_HEADER; 3952 break; 3953 case HQFT_DATA: 3954 code = CODE_DATA; 3955 break; 3956 default: 3957 /* Ignore unknown frames */ 3958 return 0; 3959 } 3960 3961 if (filter->hqfi_hist_idx >= MAX_HQFI_ENTRIES) 3962 return -1; 3963 3964 if (filter->hqfi_hist_idx && (filter->hqfi_hist_buf & 7) == code) 3965 { 3966 filter->hqfi_hist_buf <<= 3; 3967 filter->hqfi_hist_buf |= CODE_PLUS; 3968 filter->hqfi_hist_idx++; 3969 } 3970 else if (filter->hqfi_hist_idx > 1 3971 && ((filter->hqfi_hist_buf >> 3) & 7) == code 3972 && (filter->hqfi_hist_buf & 7) == CODE_PLUS) 3973 /* Keep it at plus, do nothing */; 3974 else 3975 { 3976 filter->hqfi_hist_buf <<= 3; 3977 filter->hqfi_hist_buf |= code; 3978 filter->hqfi_hist_idx++; 3979 } 3980 3981 for (i = 0; i < sizeof(valid_seqs) / sizeof(valid_seqs[0]); ++i) 3982 if (filter->hqfi_hist_buf == valid_seqs[i]) 3983 return 0; 3984 3985 return -1; 3986} 3987 3988 3989static size_t 3990hq_read (void *ctx, const unsigned char *buf, size_t sz, int fin) 3991{ 3992 struct lsquic_stream *const stream = ctx; 3993 struct hq_filter *const filter = &stream->sm_hq_filter; 3994 const unsigned char *p = buf, *prev; 3995 const unsigned char *const end = buf + sz; 3996 struct lsquic_conn *lconn; 3997 enum lsqpack_read_header_status rhs; 3998 int s; 3999 4000 while (p < end) 4001 { 4002 switch (filter->hqfi_state) 4003 { 4004 case HQFI_STATE_FRAME_HEADER_BEGIN: 4005 filter->hqfi_vint_state.vr2s_state = 0; 4006 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_CONTINUE; 4007 /* fall-through */ 4008 case HQFI_STATE_FRAME_HEADER_CONTINUE: 4009 s = lsquic_varint_read_two(&p, end, &filter->hqfi_vint_state); 4010 if (s < 0) 4011 break; 4012 filter->hqfi_flags |= HQFI_FLAG_BEGIN; 4013 filter->hqfi_state = HQFI_STATE_READING_PAYLOAD; 4014 LSQ_DEBUG("HQ frame type 0x%"PRIX64" at offset %"PRIu64", size %"PRIu64, 4015 filter->hqfi_type, stream->read_offset + (unsigned) (p - buf), 4016 filter->hqfi_left); 4017 if (0 != update_type_hist_and_check(filter)) 4018 { 4019 lconn = stream->conn_pub->lconn; 4020 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4021 LSQ_INFO("unexpected HTTP/3 frame sequence: %o", 4022 filter->hqfi_hist_buf); 4023 lconn->cn_if->ci_abort_error(lconn, 1, HEC_FRAME_UNEXPECTED, 4024 "unexpected HTTP/3 frame sequence on stream %"PRIu64, 4025 stream->id); 4026 goto end; 4027 } 4028 if (filter->hqfi_type == HQFT_HEADERS) 4029 { 4030 if (0 == (filter->hqfi_flags & HQFI_FLAG_GOT_HEADERS)) 4031 filter->hqfi_flags |= HQFI_FLAG_GOT_HEADERS; 4032 else 4033 { 4034 filter->hqfi_type = (1ull << 62) - 1; 4035 LSQ_DEBUG("Ignoring HEADERS frame"); 4036 } 4037 } 4038 if (filter->hqfi_left > 0) 4039 { 4040 if (filter->hqfi_type == HQFT_DATA) 4041 goto end; 4042 else if (filter->hqfi_type == HQFT_PUSH_PROMISE) 4043 { 4044 lconn = stream->conn_pub->lconn; 4045 if (stream->sm_bflags & SMBF_SERVER) 4046 lconn->cn_if->ci_abort_error(lconn, 1, 4047 HEC_FRAME_UNEXPECTED, "Received PUSH_PROMISE frame " 4048 "on stream %"PRIu64" (clients are not supposed to " 4049 "send those)", stream->id); 4050 else 4051 /* Our client implementation does not support server 4052 * push. 4053 */ 4054 lconn->cn_if->ci_abort_error(lconn, 1, 4055 HEC_FRAME_UNEXPECTED, 4056 "Received PUSH_PROMISE frame (not supported)" 4057 "on stream %"PRIu64, stream->id); 4058 goto end; 4059 } 4060 } 4061 else 4062 { 4063 switch (filter->hqfi_type) 4064 { 4065 case HQFT_CANCEL_PUSH: 4066 case HQFT_GOAWAY: 4067 case HQFT_HEADERS: 4068 case HQFT_MAX_PUSH_ID: 4069 case HQFT_PUSH_PROMISE: 4070 case HQFT_SETTINGS: 4071 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4072 LSQ_INFO("HQ frame of type %"PRIu64" cannot be size 0", 4073 filter->hqfi_type); 4074 abort_connection(stream); /* XXX Overkill? */ 4075 goto end; 4076 default: 4077 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 4078 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4079 break; 4080 } 4081 } 4082 break; 4083 case HQFI_STATE_READING_PAYLOAD: 4084 if (filter->hqfi_type == HQFT_DATA) 4085 goto end; 4086 sz = filter->hqfi_left; 4087 if (sz > (uintptr_t) (end - p)) 4088 sz = (uintptr_t) (end - p); 4089 switch (filter->hqfi_type) 4090 { 4091 case HQFT_HEADERS: 4092 prev = p; 4093 if (filter->hqfi_flags & HQFI_FLAG_BEGIN) 4094 { 4095 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 4096 rhs = lsquic_qdh_header_in_begin( 4097 stream->conn_pub->u.ietf.qdh, 4098 stream, filter->hqfi_left, &p, sz); 4099 } 4100 else 4101 rhs = lsquic_qdh_header_in_continue( 4102 stream->conn_pub->u.ietf.qdh, stream, &p, sz); 4103 assert(p > prev || LQRHS_ERROR == rhs); 4104 filter->hqfi_left -= p - prev; 4105 if (filter->hqfi_left == 0) 4106 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4107 switch (rhs) 4108 { 4109 case LQRHS_DONE: 4110 assert(filter->hqfi_left == 0); 4111 stream->sm_qflags &= ~SMQF_QPACK_DEC; 4112 break; 4113 case LQRHS_NEED: 4114 stream->sm_qflags |= SMQF_QPACK_DEC; 4115 break; 4116 case LQRHS_BLOCKED: 4117 stream->sm_qflags |= SMQF_QPACK_DEC; 4118 filter->hqfi_flags |= HQFI_FLAG_BLOCKED; 4119 goto end; 4120 default: 4121 assert(LQRHS_ERROR == rhs); 4122 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4123 LSQ_INFO("error processing header block"); 4124 abort_connection(stream); /* XXX Overkill? */ 4125 goto end; 4126 } 4127 break; 4128 default: 4129 /* Simply skip unknown frame type payload for now */ 4130 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 4131 p += sz; 4132 filter->hqfi_left -= sz; 4133 if (filter->hqfi_left == 0) 4134 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4135 break; 4136 } 4137 break; 4138 default: 4139 assert(0); 4140 goto end; 4141 } 4142 } 4143 4144 end: 4145 if (fin && p == end && filter->hqfi_state != HQFI_STATE_FRAME_HEADER_BEGIN) 4146 { 4147 LSQ_INFO("FIN at unexpected place in filter; state: %u", 4148 filter->hqfi_state); 4149 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4150/* From [draft-ietf-quic-http-16] Section 3.1: 4151 * When a stream terminates cleanly, if the last frame on 4152 * the stream was truncated, this MUST be treated as a connection error 4153 * (see HTTP_MALFORMED_FRAME in Section 8.1). 4154 */ 4155 abort_connection(stream); 4156 } 4157 4158 return p - buf; 4159} 4160 4161 4162static int 4163hq_filter_readable_now (const struct lsquic_stream *stream) 4164{ 4165 const struct hq_filter *const filter = &stream->sm_hq_filter; 4166 4167 return (filter->hqfi_type == HQFT_DATA 4168 && filter->hqfi_state == HQFI_STATE_READING_PAYLOAD) 4169 || (filter->hqfi_flags & HQFI_FLAG_ERROR) 4170 || stream->uh 4171 || (stream->stream_flags & STREAM_FIN_REACHED) 4172 ; 4173} 4174 4175 4176static int 4177hq_filter_readable (struct lsquic_stream *stream) 4178{ 4179 struct hq_filter *const filter = &stream->sm_hq_filter; 4180 struct read_frames_status rfs; 4181 4182 if (filter->hqfi_flags & HQFI_FLAG_BLOCKED) 4183 return 0; 4184 4185 if (!hq_filter_readable_now(stream)) 4186 { 4187 rfs = read_data_frames(stream, 0, hq_read, stream); 4188 if (rfs.total_nread == 0) 4189 { 4190 if (rfs.error) 4191 { 4192 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4193 abort_connection(stream); /* XXX Overkill? */ 4194 return 1; /* Collect error */ 4195 } 4196 return 0; 4197 } 4198 } 4199 4200 return hq_filter_readable_now(stream); 4201} 4202 4203 4204static size_t 4205hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame) 4206{ 4207 struct hq_filter *const filter = &stream->sm_hq_filter; 4208 size_t nr; 4209 4210 if (!(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4211 && filter->hqfi_type == HQFT_DATA)) 4212 { 4213 nr = hq_read(stream, data_frame->df_data + data_frame->df_read_off, 4214 data_frame->df_size - data_frame->df_read_off, 4215 data_frame->df_fin); 4216 if (nr) 4217 { 4218 stream->read_offset += nr; 4219 stream_consumed_bytes(stream); 4220 } 4221 } 4222 else 4223 nr = 0; 4224 4225 if (0 == (filter->hqfi_flags & HQFI_FLAG_ERROR)) 4226 { 4227 data_frame->df_read_off += nr; 4228 if (filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4229 && filter->hqfi_type == HQFT_DATA) 4230 return MIN(filter->hqfi_left, 4231 (unsigned) data_frame->df_size - data_frame->df_read_off); 4232 else 4233 { 4234 assert(data_frame->df_read_off == data_frame->df_size); 4235 return 0; 4236 } 4237 } 4238 else 4239 { 4240 data_frame->df_read_off = data_frame->df_size; 4241 return 0; 4242 } 4243} 4244 4245 4246static void 4247hq_decr_left (struct lsquic_stream *stream, size_t read) 4248{ 4249 struct hq_filter *const filter = &stream->sm_hq_filter; 4250 4251 if (read) 4252 { 4253 assert(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4254 && filter->hqfi_type == HQFT_DATA); 4255 assert(read <= filter->hqfi_left); 4256 } 4257 4258 filter->hqfi_left -= read; 4259 if (0 == filter->hqfi_left) 4260 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4261} 4262 4263 4264struct qpack_dec_hdl * 4265lsquic_stream_get_qdh (const struct lsquic_stream *stream) 4266{ 4267 return stream->conn_pub->u.ietf.qdh; 4268} 4269 4270 4271/* These are IETF QUIC states */ 4272enum stream_state_sending 4273lsquic_stream_sending_state (const struct lsquic_stream *stream) 4274{ 4275 if (0 == (stream->stream_flags & STREAM_RST_SENT)) 4276 { 4277 if (stream->stream_flags & STREAM_FIN_SENT) 4278 { 4279 if (stream->n_unacked) 4280 return SSS_DATA_SENT; 4281 else 4282 return SSS_DATA_RECVD; 4283 } 4284 else 4285 { 4286 if (stream->tosend_off 4287 || (stream->stream_flags & STREAM_BLOCKED_SENT)) 4288 return SSS_SEND; 4289 else 4290 return SSS_READY; 4291 } 4292 } 4293 else if (stream->stream_flags & STREAM_RST_ACKED) 4294 return SSS_RESET_RECVD; 4295 else 4296 return SSS_RESET_SENT; 4297} 4298 4299 4300const char *const lsquic_sss2str[] = 4301{ 4302 [SSS_READY] = "Ready", 4303 [SSS_SEND] = "Send", 4304 [SSS_DATA_SENT] = "Data Sent", 4305 [SSS_RESET_SENT] = "Reset Sent", 4306 [SSS_DATA_RECVD] = "Data Recvd", 4307 [SSS_RESET_RECVD] = "Reset Recvd", 4308}; 4309 4310 4311const char *const lsquic_ssr2str[] = 4312{ 4313 [SSR_RECV] = "Recv", 4314 [SSR_SIZE_KNOWN] = "Size Known", 4315 [SSR_DATA_RECVD] = "Data Recvd", 4316 [SSR_RESET_RECVD] = "Reset Recvd", 4317 [SSR_DATA_READ] = "Data Read", 4318 [SSR_RESET_READ] = "Reset Read", 4319}; 4320 4321 4322/* These are IETF QUIC states */ 4323enum stream_state_receiving 4324lsquic_stream_receiving_state (struct lsquic_stream *stream) 4325{ 4326 uint64_t n_bytes; 4327 4328 if (0 == (stream->stream_flags & STREAM_RST_RECVD)) 4329 { 4330 if (0 == (stream->stream_flags & STREAM_FIN_RECVD)) 4331 return SSR_RECV; 4332 if (stream->stream_flags & STREAM_FIN_REACHED) 4333 return SSR_DATA_READ; 4334 if (0 == (stream->stream_flags & STREAM_DATA_RECVD)) 4335 { 4336 n_bytes = stream->data_in->di_if->di_readable_bytes( 4337 stream->data_in, stream->read_offset); 4338 if (stream->read_offset + n_bytes == stream->sm_fin_off) 4339 { 4340 stream->stream_flags |= STREAM_DATA_RECVD; 4341 return SSR_DATA_RECVD; 4342 } 4343 else 4344 return SSR_SIZE_KNOWN; 4345 } 4346 else 4347 return SSR_DATA_RECVD; 4348 } 4349 else if (stream->stream_flags & STREAM_RST_READ) 4350 return SSR_RESET_READ; 4351 else 4352 return SSR_RESET_RECVD; 4353} 4354 4355 4356void 4357lsquic_stream_qdec_unblocked (struct lsquic_stream *stream) 4358{ 4359 struct hq_filter *const filter = &stream->sm_hq_filter; 4360 4361 assert(stream->sm_qflags & SMQF_QPACK_DEC); 4362 assert(filter->hqfi_flags & HQFI_FLAG_BLOCKED); 4363 4364 filter->hqfi_flags &= ~HQFI_FLAG_BLOCKED; 4365 stream->conn_pub->cp_flags |= CP_STREAM_UNBLOCKED; 4366 LSQ_DEBUG("QPACK decoder unblocked"); 4367} 4368 4369 4370int 4371lsquic_stream_is_rejected (const struct lsquic_stream *stream) 4372{ 4373 return stream->stream_flags & STREAM_SS_RECVD; 4374} 4375 4376 4377int 4378lsquic_stream_can_push (const struct lsquic_stream *stream) 4379{ 4380 if (lsquic_stream_is_pushed(stream)) 4381 return 0; 4382 else if (stream->sm_bflags & SMBF_IETF) 4383 return (stream->sm_bflags & SMBF_USE_HEADERS) 4384 && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_NOPUSH)) 4385 && stream->sm_send_headers_state == SSHS_BEGIN 4386 ; 4387 else 4388 return 1; 4389} 4390 4391 4392static size_t 4393dp_reader_read (void *lsqr_ctx, void *buf, size_t count) 4394{ 4395 struct lsquic_stream *const stream = lsqr_ctx; 4396 unsigned char *dst = buf; 4397 unsigned char *const end = buf + count; 4398 size_t len; 4399 4400 len = MIN((size_t) (stream->sm_dup_push_len - stream->sm_dup_push_off), 4401 (size_t) (end - dst)); 4402 memcpy(dst, stream->sm_dup_push_buf + stream->sm_dup_push_off, len); 4403 stream->sm_dup_push_off += len; 4404 4405 if (stream->sm_dup_push_len == stream->sm_dup_push_off) 4406 LSQ_DEBUG("finish writing duplicate push"); 4407 4408 return len; 4409} 4410 4411 4412static size_t 4413dp_reader_size (void *lsqr_ctx) 4414{ 4415 struct lsquic_stream *const stream = lsqr_ctx; 4416 4417 return stream->sm_dup_push_len - stream->sm_dup_push_off; 4418} 4419 4420 4421static void 4422init_dp_reader (struct lsquic_stream *stream, struct lsquic_reader *reader) 4423{ 4424 reader->lsqr_read = dp_reader_read; 4425 reader->lsqr_size = dp_reader_size; 4426 reader->lsqr_ctx = stream; 4427} 4428 4429 4430static void 4431on_write_dp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 4432{ 4433 struct lsquic_reader dp_reader; 4434 ssize_t nw; 4435 int want_write; 4436 4437 assert(stream->sm_dup_push_off < stream->sm_dup_push_len); 4438 4439 init_dp_reader(stream, &dp_reader); 4440 nw = stream_write(stream, &dp_reader); 4441 if (nw > 0) 4442 { 4443 LSQ_DEBUG("wrote %zd bytes more of duplicate push (%s)", 4444 nw, stream->sm_dup_push_off == stream->sm_dup_push_len ? 4445 "done" : "not done"); 4446 if (stream->sm_dup_push_off == stream->sm_dup_push_len) 4447 { 4448 /* Restore want_write flag */ 4449 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 4450 if (want_write != stream->sm_saved_want_write) 4451 (void) lsquic_stream_wantwrite(stream, 4452 stream->sm_saved_want_write); 4453 } 4454 } 4455 else if (nw < 0) 4456 { 4457 LSQ_WARN("could not write duplicate push (wrapper)"); 4458 /* XXX What should happen if we hit an error? TODO */ 4459 } 4460} 4461 4462 4463int 4464lsquic_stream_duplicate_push (struct lsquic_stream *stream, uint64_t push_id) 4465{ 4466 struct lsquic_reader dp_reader; 4467 unsigned bits, len; 4468 ssize_t nw; 4469 4470 assert(stream->sm_bflags & SMBF_IETF); 4471 assert(lsquic_stream_can_push(stream)); 4472 4473 bits = vint_val2bits(push_id); 4474 len = 1 << bits; 4475 4476 if (!stream_activate_hq_frame(stream, 4477 stream->sm_payload + stream->sm_n_buffered, HQFT_DUPLICATE_PUSH, 4478 SHF_FIXED_SIZE, len)) 4479 return -1; 4480 4481 stream->stream_flags |= STREAM_PUSHING; 4482 4483 stream->sm_dup_push_len = len; 4484 stream->sm_dup_push_off = 0; 4485 vint_write(stream->sm_dup_push_buf, push_id, bits, 1 << bits); 4486 4487 init_dp_reader(stream, &dp_reader); 4488 nw = stream_write(stream, &dp_reader); 4489 if (nw > 0) 4490 { 4491 if (stream->sm_dup_push_off == stream->sm_dup_push_len) 4492 LSQ_DEBUG("fully wrote DUPLICATE_PUSH %"PRIu64, push_id); 4493 else 4494 { 4495 LSQ_DEBUG("partially wrote DUPLICATE_PUSH %"PRIu64, push_id); 4496 stream->stream_flags |= STREAM_NOPUSH; 4497 stream->sm_saved_want_write = 4498 !!(stream->sm_qflags & SMQF_WANT_WRITE); 4499 stream_wantwrite(stream, 1); 4500 } 4501 return 0; 4502 } 4503 else 4504 { 4505 if (nw < 0) 4506 LSQ_WARN("failure writing DUPLICATE_PUSH"); 4507 stream->stream_flags |= STREAM_NOPUSH; 4508 stream->stream_flags &= ~STREAM_PUSHING; 4509 return -1; 4510 } 4511} 4512 4513 4514static size_t 4515pp_reader_read (void *lsqr_ctx, void *buf, size_t count) 4516{ 4517 struct push_promise *const promise = lsqr_ctx; 4518 unsigned char *dst = buf; 4519 unsigned char *const end = buf + count; 4520 size_t len; 4521 4522 while (dst < end) 4523 { 4524 switch (promise->pp_write_state) 4525 { 4526 case PPWS_ID0: 4527 case PPWS_ID1: 4528 case PPWS_ID2: 4529 case PPWS_ID3: 4530 case PPWS_ID4: 4531 case PPWS_ID5: 4532 case PPWS_ID6: 4533 case PPWS_ID7: 4534 *dst++ = promise->pp_encoded_push_id[promise->pp_write_state]; 4535 ++promise->pp_write_state; 4536 break; 4537 case PPWS_PFX0: 4538 *dst++ = 0; 4539 ++promise->pp_write_state; 4540 break; 4541 case PPWS_PFX1: 4542 *dst++ = 0; 4543 ++promise->pp_write_state; 4544 break; 4545 case PPWS_HBLOCK: 4546 len = MIN(promise->pp_content_len - promise->pp_write_off, 4547 (size_t) (end - dst)); 4548 memcpy(dst, promise->pp_content_buf + promise->pp_write_off, 4549 len); 4550 promise->pp_write_off += len; 4551 dst += len; 4552 if (promise->pp_content_len == promise->pp_write_off) 4553 { 4554 LSQ_LOG1(LSQ_LOG_DEBUG, "finish writing push promise %"PRIu64 4555 ": reset push state", promise->pp_id); 4556 promise->pp_write_state = PPWS_DONE; 4557 } 4558 goto end; 4559 default: 4560 goto end; 4561 } 4562 } 4563 4564 end: 4565 return dst - (unsigned char *) buf; 4566} 4567 4568 4569static size_t 4570pp_reader_size (void *lsqr_ctx) 4571{ 4572 struct push_promise *const promise = lsqr_ctx; 4573 size_t size; 4574 4575 size = 0; 4576 switch (promise->pp_write_state) 4577 { 4578 case PPWS_ID0: 4579 case PPWS_ID1: 4580 case PPWS_ID2: 4581 case PPWS_ID3: 4582 case PPWS_ID4: 4583 case PPWS_ID5: 4584 case PPWS_ID6: 4585 case PPWS_ID7: 4586 size += 8 - promise->pp_write_state; 4587 case PPWS_PFX0: 4588 ++size; 4589 /* fall-through */ 4590 case PPWS_PFX1: 4591 ++size; 4592 /* fall-through */ 4593 case PPWS_HBLOCK: 4594 size += promise->pp_content_len - promise->pp_write_off; 4595 break; 4596 default: 4597 break; 4598 } 4599 4600 return size; 4601} 4602 4603 4604static void 4605init_pp_reader (struct push_promise *promise, struct lsquic_reader *reader) 4606{ 4607 reader->lsqr_read = pp_reader_read; 4608 reader->lsqr_size = pp_reader_size; 4609 reader->lsqr_ctx = promise; 4610} 4611 4612 4613static void 4614on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 4615{ 4616 struct lsquic_reader pp_reader; 4617 struct push_promise *promise; 4618 ssize_t nw; 4619 int want_write; 4620 4621 assert(stream_is_pushing_promise(stream)); 4622 4623 promise = SLIST_FIRST(&stream->sm_promises); 4624 init_pp_reader(promise, &pp_reader); 4625 nw = stream_write(stream, &pp_reader); 4626 if (nw > 0) 4627 { 4628 LSQ_DEBUG("wrote %zd bytes more of push promise (%s)", 4629 nw, promise->pp_write_state == PPWS_DONE ? "done" : "not done"); 4630 if (promise->pp_write_state == PPWS_DONE) 4631 { 4632 /* Restore want_write flag */ 4633 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 4634 if (want_write != stream->sm_saved_want_write) 4635 (void) lsquic_stream_wantwrite(stream, 4636 stream->sm_saved_want_write); 4637 } 4638 } 4639 else if (nw < 0) 4640 { 4641 LSQ_WARN("could not write push promise (wrapper)"); 4642 /* XXX What should happen if we hit an error? TODO */ 4643 } 4644} 4645 4646 4647/* Success means that the push promise has been placed on sm_promises list and 4648 * the stream now owns it. Failure means that the push promise should be 4649 * destroyed by the caller. 4650 * 4651 * A push promise is written immediately. If it cannot be written to packets 4652 * or buffered whole, the stream is marked as unable to push further promises. 4653 */ 4654int 4655lsquic_stream_push_promise (struct lsquic_stream *stream, 4656 struct push_promise *promise) 4657{ 4658 struct lsquic_reader pp_reader; 4659 unsigned bits, len; 4660 ssize_t nw; 4661 4662 assert(stream->sm_bflags & SMBF_IETF); 4663 assert(lsquic_stream_can_push(stream)); 4664 4665 bits = vint_val2bits(promise->pp_id); 4666 len = 1 << bits; 4667 promise->pp_write_state = 8 - len; 4668 vint_write(promise->pp_encoded_push_id + 8 - len, promise->pp_id, 4669 bits, 1 << bits); 4670 4671 if (!stream_activate_hq_frame(stream, 4672 stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PROMISE, 4673 SHF_FIXED_SIZE, pp_reader_size(promise))) 4674 return -1; 4675 4676 stream->stream_flags |= STREAM_PUSHING; 4677 4678 init_pp_reader(promise, &pp_reader); 4679 nw = stream_write(stream, &pp_reader); 4680 if (nw > 0) 4681 { 4682 SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next); 4683 ++promise->pp_refcnt; 4684 if (promise->pp_write_state == PPWS_DONE) 4685 LSQ_DEBUG("fully wrote promise %"PRIu64, promise->pp_id); 4686 else 4687 { 4688 LSQ_DEBUG("partially wrote promise %"PRIu64" (state: %d, off: %u)" 4689 ", disable further pushing", promise->pp_id, 4690 promise->pp_write_state, promise->pp_write_off); 4691 stream->stream_flags |= STREAM_NOPUSH; 4692 stream->sm_saved_want_write = 4693 !!(stream->sm_qflags & SMQF_WANT_WRITE); 4694 stream_wantwrite(stream, 1); 4695 } 4696 return 0; 4697 } 4698 else 4699 { 4700 if (nw < 0) 4701 LSQ_WARN("failure writing push promise"); 4702 stream->stream_flags |= STREAM_NOPUSH; 4703 stream->stream_flags &= ~STREAM_PUSHING; 4704 return -1; 4705 } 4706} 4707