lsquic_stream.c revision 0adf085a
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <stddef.h> 28 29#include "lsquic.h" 30 31#include "lsquic_int_types.h" 32#include "lsquic_packet_common.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn_flow.h" 36#include "lsquic_rtt.h" 37#include "lsquic_sfcw.h" 38#include "lsquic_varint.h" 39#include "lsquic_hq.h" 40#include "lsquic_hash.h" 41#include "lsquic_stream.h" 42#include "lsquic_conn_public.h" 43#include "lsquic_util.h" 44#include "lsquic_mm.h" 45#include "lsquic_headers_stream.h" 46#include "lsquic_conn.h" 47#include "lsquic_data_in_if.h" 48#include "lsquic_parse.h" 49#include "lsquic_packet_out.h" 50#include "lsquic_engine_public.h" 51#include "lsquic_senhist.h" 52#include "lsquic_pacer.h" 53#include "lsquic_cubic.h" 54#include "lsquic_bw_sampler.h" 55#include "lsquic_minmax.h" 56#include "lsquic_bbr.h" 57#include "lsquic_send_ctl.h" 58#include "lsquic_headers.h" 59#include "lsquic_ev_log.h" 60#include "lsquic_enc_sess.h" 61#include "lsqpack.h" 62#include "lsquic_frab_list.h" 63#include "lsquic_http1x_if.h" 64#include "lsquic_qdec_hdl.h" 65#include "lsquic_qenc_hdl.h" 66#include "lsquic_byteswap.h" 67#include "lsquic_ietf.h" 68#include "lsquic_push_promise.h" 69 70#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 71#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(stream->conn_pub->lconn) 72#define LSQUIC_LOG_STREAM_ID stream->id 73#include "lsquic_logger.h" 74 75#define MIN(a, b) ((a) < (b) ? (a) : (b)) 76 77static void 78drop_frames_in (lsquic_stream_t *stream); 79 80static void 81maybe_schedule_call_on_close (lsquic_stream_t *stream); 82 83static int 84stream_wantread (lsquic_stream_t *stream, int is_want); 85 86static int 87stream_wantwrite (lsquic_stream_t *stream, int is_want); 88 89static ssize_t 90stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 91 92static ssize_t 93save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 94 95static int 96stream_flush (lsquic_stream_t *stream); 97 98static int 99stream_flush_nocheck (lsquic_stream_t *stream); 100 101static void 102maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag); 103 104enum swtp_status { SWTP_OK, SWTP_STOP, SWTP_ERROR }; 105 106static enum swtp_status 107stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size); 108 109static enum swtp_status 110stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size); 111 112static enum swtp_status 113stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size); 114 115static size_t 116stream_write_avail_no_frames (struct lsquic_stream *); 117 118static size_t 119stream_write_avail_with_frames (struct lsquic_stream *); 120 121static size_t 122stream_write_avail_with_headers (struct lsquic_stream *); 123 124static int 125hq_filter_readable (struct lsquic_stream *stream); 126 127static void 128hq_decr_left (struct lsquic_stream *stream, size_t); 129 130static size_t 131hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame); 132 133static int 134stream_readable_non_http (struct lsquic_stream *stream); 135 136static int 137stream_readable_http_gquic (struct lsquic_stream *stream); 138 139static int 140stream_readable_http_ietf (struct lsquic_stream *stream); 141 142static ssize_t 143stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz); 144 145static size_t 146active_hq_frame_sizes (const struct lsquic_stream *); 147 148static void 149on_write_dp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *); 150 151static void 152on_write_pp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *); 153 154static void 155stream_hq_frame_put (struct lsquic_stream *, struct stream_hq_frame *); 156 157static size_t 158stream_hq_frame_size (const struct stream_hq_frame *); 159 160const struct stream_filter_if hq_stream_filter_if = 161{ 162 .sfi_readable = hq_filter_readable, 163 .sfi_filter_df = hq_filter_df, 164 .sfi_decr_left = hq_decr_left, 165}; 166 167 168#if LSQUIC_KEEP_STREAM_HISTORY 169/* These values are printable ASCII characters for ease of printing the 170 * whole history in a single line of a log message. 171 * 172 * The list of events is not exhaustive: only most interesting events 173 * are recorded. 174 */ 175enum stream_history_event 176{ 177 SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ 178 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 179 SHE_REACH_FIN = 'a', 180 SHE_BLOCKED_OUT = 'b', 181 SHE_CREATED = 'C', 182 SHE_FRAME_IN = 'd', 183 SHE_FRAME_OUT = 'D', 184 SHE_RESET = 'e', 185 SHE_WINDOW_UPDATE = 'E', 186 SHE_FIN_IN = 'f', 187 SHE_FINISHED = 'F', 188 SHE_GOAWAY_IN = 'g', 189 SHE_USER_WRITE_HEADER = 'h', 190 SHE_HEADERS_IN = 'H', 191 SHE_IF_SWITCH = 'i', 192 SHE_ONCLOSE_SCHED = 'l', 193 SHE_ONCLOSE_CALL = 'L', 194 SHE_ONNEW = 'N', 195 SHE_SET_PRIO = 'p', 196 SHE_USER_READ = 'r', 197 SHE_SHUTDOWN_READ = 'R', 198 SHE_RST_IN = 's', 199 SHE_SS_IN = 'S', 200 SHE_RST_OUT = 't', 201 SHE_RST_ACKED = 'T', 202 SHE_FLUSH = 'u', 203 SHE_USER_WRITE_DATA = 'w', 204 SHE_SHUTDOWN_WRITE = 'W', 205 SHE_CLOSE = 'X', 206 SHE_DELAY_SW = 'y', 207 SHE_FORCE_FINISH = 'Z', 208 SHE_WANTREAD_NO = '0', /* "YES" must be one more than "NO" */ 209 SHE_WANTREAD_YES = '1', 210 SHE_WANTWRITE_NO = '2', 211 SHE_WANTWRITE_YES = '3', 212}; 213 214static void 215sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 216{ 217 enum stream_history_event prev_event; 218 sm_hist_idx_t idx; 219 int plus; 220 221 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 222 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 223 idx = (idx - plus) & SM_HIST_IDX_MASK; 224 prev_event = stream->sm_hist_buf[idx]; 225 226 if (prev_event == sh_event && plus) 227 return; 228 229 if (prev_event == sh_event) 230 sh_event = SHE_PLUS; 231 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 232 233 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 234 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 235 stream->sm_hist_buf); 236} 237 238 239# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 240# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 241 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 242 LSQ_DEBUG("history: [%.*s]", \ 243 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 244 (stream)->sm_hist_buf); \ 245 } while (0) 246#else 247# define SM_HISTORY_APPEND(stream, event) 248# define SM_HISTORY_DUMP_REMAINING(stream) 249#endif 250 251 252static int 253stream_inside_callback (const lsquic_stream_t *stream) 254{ 255 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 256} 257 258 259static void 260maybe_conn_to_tickable (lsquic_stream_t *stream) 261{ 262 if (!stream_inside_callback(stream)) 263 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 264 stream->conn_pub->lconn); 265} 266 267 268/* Here, "readable" means that the user is able to read from the stream. */ 269static void 270maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 271{ 272 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 273 { 274 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 275 stream->conn_pub->lconn); 276 } 277} 278 279 280/* Here, "writeable" means that data can be put into packets to be 281 * scheduled to be sent out. 282 * 283 * If `check_can_send' is false, it means that we do not need to check 284 * whether packets can be sent. This check was already performed when 285 * we packetized stream data. 286 */ 287static void 288maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream, 289 int check_can_send) 290{ 291 if (!stream_inside_callback(stream) && 292 (!check_can_send 293 || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) && 294 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 295 { 296 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 297 stream->conn_pub->lconn); 298 } 299} 300 301 302static int 303stream_stalled (const lsquic_stream_t *stream) 304{ 305 return 0 == (stream->sm_qflags & (SMQF_WANT_WRITE|SMQF_WANT_READ)) && 306 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 307 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 308} 309 310 311static size_t 312stream_stream_frame_header_sz (const struct lsquic_stream *stream, 313 unsigned data_sz) 314{ 315 return stream->conn_pub->lconn->cn_pf->pf_calc_stream_frame_header_sz( 316 stream->id, stream->tosend_off, data_sz); 317} 318 319 320static size_t 321stream_crypto_frame_header_sz (const struct lsquic_stream *stream, 322 unsigned data_sz_IGNORED) 323{ 324 return stream->conn_pub->lconn->cn_pf 325 ->pf_calc_crypto_frame_header_sz(stream->tosend_off); 326} 327 328 329/* GQUIC-only function */ 330static int 331stream_is_hsk (const struct lsquic_stream *stream) 332{ 333 if (stream->sm_bflags & SMBF_IETF) 334 return 0; 335 else 336 return stream->id == LSQUIC_GQUIC_STREAM_HANDSHAKE; 337} 338 339 340static struct lsquic_stream * 341stream_new_common (lsquic_stream_id_t id, struct lsquic_conn_public *conn_pub, 342 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 343 enum stream_ctor_flags ctor_flags) 344{ 345 struct lsquic_stream *stream; 346 347 stream = calloc(1, sizeof(*stream)); 348 if (!stream) 349 return NULL; 350 351 if (ctor_flags & SCF_USE_DI_HASH) 352 stream->data_in = data_in_hash_new(conn_pub, id, 0); 353 else 354 stream->data_in = data_in_nocopy_new(conn_pub, id); 355 if (!stream->data_in) 356 { 357 free(stream); 358 return NULL; 359 } 360 361 stream->id = id; 362 stream->stream_if = stream_if; 363 stream->conn_pub = conn_pub; 364 stream->sm_onnew_arg = stream_if_ctx; 365 stream->sm_write_avail = stream_write_avail_no_frames; 366 367 STAILQ_INIT(&stream->sm_hq_frames); 368 369 stream->sm_bflags |= ctor_flags & ((1 << (N_SMBF_FLAGS - 1)) - 1); 370 if (conn_pub->lconn->cn_flags & LSCONN_SERVER) 371 stream->sm_bflags |= SMBF_SERVER; 372 373 return stream; 374} 375 376 377/* TODO: The logic to figure out whether the stream is connection limited 378 * should be taken out of the constructor. The caller should specify this 379 * via one of enum stream_ctor_flags. 380 */ 381lsquic_stream_t * 382lsquic_stream_new (lsquic_stream_id_t id, 383 struct lsquic_conn_public *conn_pub, 384 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 385 unsigned initial_window, uint64_t initial_send_off, 386 enum stream_ctor_flags ctor_flags) 387{ 388 lsquic_cfcw_t *cfcw; 389 lsquic_stream_t *stream; 390 391 stream = stream_new_common(id, conn_pub, stream_if, stream_if_ctx, 392 ctor_flags); 393 if (!stream) 394 return NULL; 395 396 if (!initial_window) 397 initial_window = 16 * 1024; 398 399 if (ctor_flags & SCF_IETF) 400 { 401 cfcw = &conn_pub->cfcw; 402 stream->sm_bflags |= SMBF_CONN_LIMITED; 403 if (ctor_flags & SCF_HTTP) 404 { 405 stream->sm_write_avail = stream_write_avail_with_headers; 406 stream->sm_readable = stream_readable_http_ietf; 407 stream->sm_sfi = &hq_stream_filter_if; 408 } 409 else 410 stream->sm_readable = stream_readable_non_http; 411 lsquic_stream_set_priority_internal(stream, 412 LSQUIC_STREAM_DEFAULT_PRIO); 413 stream->sm_write_to_packet = stream_write_to_packet_std; 414 } 415 else 416 { 417 if (lsquic_stream_id_is_critical(ctor_flags & SCF_HTTP, id)) 418 cfcw = NULL; 419 else 420 { 421 cfcw = &conn_pub->cfcw; 422 stream->sm_bflags |= SMBF_CONN_LIMITED; 423 lsquic_stream_set_priority_internal(stream, 424 LSQUIC_STREAM_DEFAULT_PRIO); 425 } 426 if (stream->sm_bflags & SMBF_USE_HEADERS) 427 stream->sm_readable = stream_readable_http_gquic; 428 else 429 stream->sm_readable = stream_readable_non_http; 430 if (stream_is_hsk(stream)) 431 stream->sm_write_to_packet = stream_write_to_packet_hsk; 432 else 433 stream->sm_write_to_packet = stream_write_to_packet_std; 434 } 435 436 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 437 stream->max_send_off = initial_send_off; 438 LSQ_DEBUG("created stream"); 439 SM_HISTORY_APPEND(stream, SHE_CREATED); 440 stream->sm_frame_header_sz = stream_stream_frame_header_sz; 441 if (ctor_flags & SCF_CALL_ON_NEW) 442 lsquic_stream_call_on_new(stream); 443 return stream; 444} 445 446 447struct lsquic_stream * 448lsquic_stream_new_crypto (enum enc_level enc_level, 449 struct lsquic_conn_public *conn_pub, 450 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 451 enum stream_ctor_flags ctor_flags) 452{ 453 struct lsquic_stream *stream; 454 lsquic_stream_id_t stream_id; 455 456 assert(ctor_flags & SCF_CRITICAL); 457 458 stream_id = ~0ULL - enc_level; 459 stream = stream_new_common(stream_id, conn_pub, stream_if, 460 stream_if_ctx, ctor_flags); 461 if (!stream) 462 return NULL; 463 464 stream->sm_bflags |= SMBF_CRYPTO|SMBF_IETF; 465 stream->sm_enc_level = enc_level; 466 /* TODO: why have limit in crypto stream? Set it to UINT64_MAX? */ 467 lsquic_sfcw_init(&stream->fc, 16 * 1024, NULL, conn_pub, stream_id); 468 stream->max_send_off = 16 * 1024; 469 LSQ_DEBUG("created crypto stream"); 470 SM_HISTORY_APPEND(stream, SHE_CREATED); 471 stream->sm_frame_header_sz = stream_crypto_frame_header_sz; 472 stream->sm_write_to_packet = stream_write_to_packet_crypto; 473 stream->sm_readable = stream_readable_non_http; 474 if (ctor_flags & SCF_CALL_ON_NEW) 475 lsquic_stream_call_on_new(stream); 476 return stream; 477} 478 479 480void 481lsquic_stream_call_on_new (lsquic_stream_t *stream) 482{ 483 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 484 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 485 { 486 LSQ_DEBUG("calling on_new_stream"); 487 SM_HISTORY_APPEND(stream, SHE_ONNEW); 488 stream->stream_flags |= STREAM_ONNEW_DONE; 489 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 490 stream); 491 } 492} 493 494 495static void 496decr_conn_cap (struct lsquic_stream *stream, size_t incr) 497{ 498 if (stream->sm_bflags & SMBF_CONN_LIMITED) 499 { 500 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 501 stream->conn_pub->conn_cap.cc_sent -= incr; 502 } 503} 504 505 506static void 507maybe_resize_stream_buffer (struct lsquic_stream *stream) 508{ 509 assert(0 == stream->sm_n_buffered); 510 511 if (stream->sm_n_allocated < stream->conn_pub->path->np_pack_size) 512 { 513 free(stream->sm_buf); 514 stream->sm_buf = NULL; 515 stream->sm_n_allocated = 0; 516 } 517 else if (stream->sm_n_allocated > stream->conn_pub->path->np_pack_size) 518 stream->sm_n_allocated = stream->conn_pub->path->np_pack_size; 519} 520 521 522static void 523drop_buffered_data (struct lsquic_stream *stream) 524{ 525 decr_conn_cap(stream, stream->sm_n_buffered); 526 stream->sm_n_buffered = 0; 527 maybe_resize_stream_buffer(stream); 528 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 529 maybe_remove_from_write_q(stream, SMQF_WRITE_Q_FLAGS); 530} 531 532 533static void 534destroy_uh (struct lsquic_stream *stream) 535{ 536 if (stream->uh) 537 { 538 if (stream->uh->uh_hset) 539 stream->conn_pub->enpub->enp_hsi_if 540 ->hsi_discard_header_set(stream->uh->uh_hset); 541 free(stream->uh); 542 stream->uh = NULL; 543 } 544} 545 546 547void 548lsquic_stream_destroy (lsquic_stream_t *stream) 549{ 550 struct push_promise *promise; 551 struct stream_hq_frame *shf; 552 553 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 554 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 555 STREAM_ONNEW_DONE) 556 { 557 stream->stream_flags |= STREAM_ONCLOSE_DONE; 558 stream->stream_if->on_close(stream, stream->st_ctx); 559 } 560 if (stream->sm_qflags & SMQF_SENDING_FLAGS) 561 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 562 if (stream->sm_qflags & SMQF_WANT_READ) 563 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 564 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 565 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 566 if (stream->sm_qflags & SMQF_SERVICE_FLAGS) 567 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 568 if (stream->sm_qflags & SMQF_QPACK_DEC) 569 lsquic_qdh_unref_stream(stream->conn_pub->u.ietf.qdh, stream); 570 drop_buffered_data(stream); 571 lsquic_sfcw_consume_rem(&stream->fc); 572 drop_frames_in(stream); 573 if (stream->push_req) 574 { 575 if (stream->push_req->uh_hset) 576 stream->conn_pub->enpub->enp_hsi_if 577 ->hsi_discard_header_set(stream->push_req->uh_hset); 578 free(stream->push_req); 579 } 580 while ((promise = SLIST_FIRST(&stream->sm_promises))) 581 { 582 SLIST_REMOVE_HEAD(&stream->sm_promises, pp_next); 583 lsquic_pp_put(promise, stream->conn_pub->u.ietf.promises); 584 } 585 if (stream->sm_promise) 586 { 587 assert(stream->sm_promise->pp_pushed_stream == stream); 588 stream->sm_promise->pp_pushed_stream = NULL; 589 lsquic_pp_put(stream->sm_promise, stream->conn_pub->u.ietf.promises); 590 } 591 while ((shf = STAILQ_FIRST(&stream->sm_hq_frames))) 592 stream_hq_frame_put(stream, shf); 593 destroy_uh(stream); 594 free(stream->sm_buf); 595 free(stream->sm_header_block); 596 LSQ_DEBUG("destroyed stream"); 597 SM_HISTORY_DUMP_REMAINING(stream); 598 free(stream); 599} 600 601 602static int 603stream_is_finished (const lsquic_stream_t *stream) 604{ 605 return lsquic_stream_is_closed(stream) 606 /* n_unacked checks that no outgoing packets that reference this 607 * stream are outstanding: 608 */ 609 && 0 == stream->n_unacked 610 /* This checks that no packets that reference this stream will 611 * become outstanding: 612 */ 613 && 0 == (stream->sm_qflags & SMQF_SEND_RST) 614 && ((stream->stream_flags & STREAM_FORCE_FINISH) 615 || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))); 616} 617 618 619/* This is an internal function */ 620void 621lsquic_stream_force_finish (struct lsquic_stream *stream) 622{ 623 LSQ_DEBUG("stream is now finished"); 624 SM_HISTORY_APPEND(stream, SHE_FINISHED); 625 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 626 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 627 next_service_stream); 628 stream->sm_qflags |= SMQF_FREE_STREAM; 629 stream->stream_flags |= STREAM_FINISHED; 630} 631 632 633static void 634maybe_finish_stream (lsquic_stream_t *stream) 635{ 636 if (0 == (stream->stream_flags & STREAM_FINISHED) && 637 stream_is_finished(stream)) 638 lsquic_stream_force_finish(stream); 639} 640 641 642static void 643maybe_schedule_call_on_close (lsquic_stream_t *stream) 644{ 645 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 646 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) 647 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE) 648 && !(stream->sm_qflags & SMQF_CALL_ONCLOSE)) 649 { 650 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 651 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 652 next_service_stream); 653 stream->sm_qflags |= SMQF_CALL_ONCLOSE; 654 LSQ_DEBUG("scheduled calling on_close"); 655 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 656 } 657} 658 659 660void 661lsquic_stream_call_on_close (lsquic_stream_t *stream) 662{ 663 assert(stream->stream_flags & STREAM_ONNEW_DONE); 664 stream->sm_qflags &= ~SMQF_CALL_ONCLOSE; 665 if (!(stream->sm_qflags & SMQF_SERVICE_FLAGS)) 666 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 667 next_service_stream); 668 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 669 { 670 LSQ_DEBUG("calling on_close"); 671 stream->stream_flags |= STREAM_ONCLOSE_DONE; 672 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 673 stream->stream_if->on_close(stream, stream->st_ctx); 674 } 675 else 676 assert(0); 677} 678 679 680static int 681stream_readable_non_http (struct lsquic_stream *stream) 682{ 683 return stream->data_in->di_if->di_get_frame(stream->data_in, 684 stream->read_offset) != NULL; 685} 686 687 688static int 689stream_readable_http_gquic (struct lsquic_stream *stream) 690{ 691 return (stream->stream_flags & STREAM_HAVE_UH) 692 && (stream->uh 693 || stream->data_in->di_if->di_get_frame(stream->data_in, 694 stream->read_offset)); 695} 696 697 698static int 699stream_readable_http_ietf (struct lsquic_stream *stream) 700{ 701 return 702 /* If we have read the header set and the header set has not yet 703 * been read, the stream is readable. 704 */ 705 ((stream->stream_flags & STREAM_HAVE_UH) && stream->uh) 706 || 707 /* Alternatively, run the filter and check for payload availability. */ 708 (stream->sm_sfi->sfi_readable(stream) 709 && (/* Running the filter may result in hitting FIN: */ 710 (stream->stream_flags & STREAM_FIN_REACHED) 711 || stream->data_in->di_if->di_get_frame(stream->data_in, 712 stream->read_offset))); 713} 714 715 716int 717lsquic_stream_readable (struct lsquic_stream *stream) 718{ 719 /* A stream is readable if one of the following is true: */ 720 return 721 /* - It is already finished: in that case, lsquic_stream_read() will 722 * return 0. 723 */ 724 (stream->stream_flags & STREAM_FIN_REACHED) 725 /* - The stream is reset, by either side. In this case, 726 * lsquic_stream_read() will return -1 (we want the user to be 727 * able to collect the error). 728 */ 729 || lsquic_stream_is_reset(stream) 730 /* Type-dependent readability check: */ 731 || stream->sm_readable(stream); 732 ; 733} 734 735 736static size_t 737stream_write_avail_no_frames (struct lsquic_stream *stream) 738{ 739 uint64_t stream_avail, conn_avail; 740 741 stream_avail = stream->max_send_off - stream->tosend_off 742 - stream->sm_n_buffered; 743 744 if (stream->sm_bflags & SMBF_CONN_LIMITED) 745 { 746 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 747 if (conn_avail < stream_avail) 748 stream_avail = conn_avail; 749 } 750 751 return stream_avail; 752} 753 754 755static size_t 756stream_write_avail_with_frames (struct lsquic_stream *stream) 757{ 758 uint64_t stream_avail, conn_avail; 759 const struct stream_hq_frame *shf; 760 size_t size; 761 762 stream_avail = stream->max_send_off - stream->tosend_off 763 - stream->sm_n_buffered; 764 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 765 if (!(shf->shf_flags & SHF_WRITTEN)) 766 { 767 size = stream_hq_frame_size(shf); 768 assert(size <= stream_avail); 769 stream_avail -= size; 770 } 771 772 if (stream->sm_bflags & SMBF_CONN_LIMITED) 773 { 774 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 775 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 776 if (!(shf->shf_flags & SHF_CC_PAID)) 777 { 778 size = stream_hq_frame_size(shf); 779 if (size < conn_avail) 780 conn_avail -= size; 781 else 782 return 0; 783 } 784 if (conn_avail < stream_avail) 785 stream_avail = conn_avail; 786 } 787 788 if (stream_avail >= 3 /* Smallest new frame */) 789 return stream_avail; 790 else 791 return 0; 792} 793 794 795static int 796stream_is_pushing_promise (const struct lsquic_stream *stream) 797{ 798 return (stream->stream_flags & STREAM_PUSHING) 799 && SLIST_FIRST(&stream->sm_promises) 800 && (SLIST_FIRST(&stream->sm_promises))->pp_write_state != PPWS_DONE 801 ; 802} 803 804 805/* To prevent deadlocks, ensure that when headers are sent, the bytes 806 * sent on the encoder stream are written first. 807 * 808 * XXX If the encoder is set up in non-risking mode, it is perfectly 809 * fine to send the header block first. TODO: update the logic to 810 * reflect this. There should be two sending behaviors: risk and non-risk. 811 * For now, we assume risk for everything to be on the conservative side. 812 */ 813static size_t 814stream_write_avail_with_headers (struct lsquic_stream *stream) 815{ 816 if (stream->stream_flags & STREAM_PUSHING) 817 return stream_write_avail_with_frames(stream); 818 819 switch (stream->sm_send_headers_state) 820 { 821 case SSHS_BEGIN: 822 return lsquic_qeh_write_avail(stream->conn_pub->u.ietf.qeh); 823 case SSHS_ENC_SENDING: 824 if (stream->sm_hb_compl > 825 lsquic_qeh_enc_off(stream->conn_pub->u.ietf.qeh)) 826 return 0; 827 LSQ_DEBUG("encoder stream bytes have all been sent"); 828 stream->sm_send_headers_state = SSHS_HBLOCK_SENDING; 829 /* fall-through */ 830 default: 831 assert(SSHS_HBLOCK_SENDING == stream->sm_send_headers_state); 832 return stream_write_avail_with_frames(stream); 833 } 834} 835 836 837size_t 838lsquic_stream_write_avail (struct lsquic_stream *stream) 839{ 840 return stream->sm_write_avail(stream); 841} 842 843 844int 845lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 846{ 847 struct lsquic_conn *lconn; 848 849 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 850 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 851 { 852 if (stream->sm_bflags & SMBF_IETF) 853 { 854 lconn = stream->conn_pub->lconn; 855 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FLOW_CONTROL_ERROR, 856 "flow control violation on stream %"PRIu64, stream->id); 857 } 858 return -1; 859 } 860 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 861 { 862 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 863 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 864 next_send_stream); 865 stream->sm_qflags |= SMQF_SEND_WUF; 866 } 867 return 0; 868} 869 870 871int 872lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 873{ 874 uint64_t max_off; 875 int got_next_offset, rv, free_frame; 876 enum ins_frame ins_frame; 877 struct lsquic_conn *lconn; 878 879 assert(frame->packet_in); 880 881 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 882 LSQ_DEBUG("received stream frame, offset 0x%"PRIX64", len %u; " 883 "fin: %d", frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 884 885 if ((stream->sm_bflags & SMBF_USE_HEADERS) 886 && (stream->stream_flags & STREAM_HEAD_IN_FIN)) 887 { 888 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 889 lsquic_malo_put(frame); 890 return -1; 891 } 892 893 if (frame->data_frame.df_fin && (stream->sm_bflags & SMBF_IETF) 894 && (stream->stream_flags & STREAM_FIN_RECVD) 895 && stream->sm_fin_off != DF_END(frame)) 896 { 897 lconn = stream->conn_pub->lconn; 898 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR, 899 "new final size %"PRIu64" from STREAM frame (id: %"PRIu64") does " 900 "not match previous final size %"PRIu64, DF_END(frame), 901 stream->id, stream->sm_fin_off); 902 return -1; 903 } 904 905 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 906 insert_frame: 907 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 908 if (INS_FRAME_OK == ins_frame) 909 { 910 /* Update maximum offset in the flow controller and check for flow 911 * control violation: 912 */ 913 rv = -1; 914 free_frame = !stream->data_in->di_if->di_own_on_ok; 915 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 916 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 917 goto end_ok; 918 if (frame->data_frame.df_fin) 919 { 920 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 921 stream->stream_flags |= STREAM_FIN_RECVD; 922 stream->sm_fin_off = DF_END(frame); 923 maybe_finish_stream(stream); 924 } 925 if ((stream->sm_bflags & SMBF_AUTOSWITCH) && 926 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 927 { 928 stream->data_in = stream->data_in->di_if->di_switch_impl( 929 stream->data_in, stream->read_offset); 930 if (!stream->data_in) 931 { 932 stream->data_in = data_in_error_new(); 933 goto end_ok; 934 } 935 } 936 if (got_next_offset) 937 /* Checking the offset saves di_get_frame() call */ 938 maybe_conn_to_tickable_if_readable(stream); 939 rv = 0; 940 end_ok: 941 if (free_frame) 942 lsquic_malo_put(frame); 943 return rv; 944 } 945 else if (INS_FRAME_DUP == ins_frame) 946 { 947 return 0; 948 } 949 else if (INS_FRAME_OVERLAP == ins_frame) 950 { 951 LSQ_DEBUG("overlap: switching DATA IN implementation"); 952 stream->data_in = stream->data_in->di_if->di_switch_impl( 953 stream->data_in, stream->read_offset); 954 if (stream->data_in) 955 goto insert_frame; 956 stream->data_in = data_in_error_new(); 957 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 958 lsquic_malo_put(frame); 959 return -1; 960 } 961 else 962 { 963 assert(INS_FRAME_ERR == ins_frame); 964 return -1; 965 } 966} 967 968 969static void 970drop_frames_in (lsquic_stream_t *stream) 971{ 972 if (stream->data_in) 973 { 974 stream->data_in->di_if->di_destroy(stream->data_in); 975 /* To avoid checking whether `data_in` is set, just set to the error 976 * data-in stream. It does the right thing after incoming data is 977 * dropped. 978 */ 979 stream->data_in = data_in_error_new(); 980 } 981} 982 983 984static void 985maybe_elide_stream_frames (struct lsquic_stream *stream) 986{ 987 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 988 { 989 if (stream->n_unacked) 990 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 991 stream->id); 992 stream->stream_flags |= STREAM_FRAMES_ELIDED; 993 } 994} 995 996 997int 998lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 999 uint64_t error_code) 1000{ 1001 struct lsquic_conn *lconn; 1002 1003 if ((stream->sm_bflags & SMBF_IETF) 1004 && (stream->stream_flags & STREAM_FIN_RECVD) 1005 && stream->sm_fin_off != offset) 1006 { 1007 lconn = stream->conn_pub->lconn; 1008 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR, 1009 "final size %"PRIu64" from RESET_STREAM frame (id: %"PRIu64") " 1010 "does not match previous final size %"PRIu64, offset, 1011 stream->id, stream->sm_fin_off); 1012 return -1; 1013 } 1014 1015 if (stream->stream_flags & STREAM_RST_RECVD) 1016 { 1017 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 1018 return 0; 1019 } 1020 1021 SM_HISTORY_APPEND(stream, SHE_RST_IN); 1022 /* This flag must always be set, even if we are "ignoring" it: it is 1023 * used by elision code. 1024 */ 1025 stream->stream_flags |= STREAM_RST_RECVD; 1026 1027 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 1028 { 1029 LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64" is " 1030 "smaller than that of byte following the last byte we have seen: " 1031 "0x%"PRIX64, offset, 1032 lsquic_sfcw_get_max_recv_off(&stream->fc)); 1033 return -1; 1034 } 1035 1036 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 1037 { 1038 LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64 1039 " violates flow control", offset); 1040 return -1; 1041 } 1042 1043 /* Let user collect error: */ 1044 maybe_conn_to_tickable_if_readable(stream); 1045 1046 lsquic_sfcw_consume_rem(&stream->fc); 1047 drop_frames_in(stream); 1048 drop_buffered_data(stream); 1049 maybe_elide_stream_frames(stream); 1050 1051 if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT)) 1052 && !(stream->sm_qflags & SMQF_SEND_RST)) 1053 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 1054 1055 stream->stream_flags |= STREAM_RST_RECVD; 1056 1057 maybe_finish_stream(stream); 1058 maybe_schedule_call_on_close(stream); 1059 1060 return 0; 1061} 1062 1063 1064void 1065lsquic_stream_stop_sending_in (struct lsquic_stream *stream, 1066 uint64_t error_code) 1067{ 1068 if (stream->stream_flags & STREAM_SS_RECVD) 1069 { 1070 LSQ_DEBUG("ignore duplicate STOP_SENDING frame"); 1071 return; 1072 } 1073 1074 SM_HISTORY_APPEND(stream, SHE_SS_IN); 1075 stream->stream_flags |= STREAM_SS_RECVD; 1076 1077 /* Let user collect error: */ 1078 maybe_conn_to_tickable_if_readable(stream); 1079 1080 lsquic_sfcw_consume_rem(&stream->fc); 1081 drop_frames_in(stream); 1082 drop_buffered_data(stream); 1083 maybe_elide_stream_frames(stream); 1084 1085 if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT)) 1086 && !(stream->sm_qflags & SMQF_SEND_RST)) 1087 lsquic_stream_reset_ext(stream, error_code, 0); 1088 1089 maybe_finish_stream(stream); 1090 maybe_schedule_call_on_close(stream); 1091} 1092 1093 1094uint64_t 1095lsquic_stream_fc_recv_off_const (const struct lsquic_stream *stream) 1096{ 1097 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 1098} 1099 1100 1101void 1102lsquic_stream_max_stream_data_sent (struct lsquic_stream *stream) 1103{ 1104 assert(stream->sm_qflags & SMQF_SEND_MAX_STREAM_DATA); 1105 stream->sm_qflags &= ~SMQF_SEND_MAX_STREAM_DATA; 1106 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1107 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1108 stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc); 1109} 1110 1111 1112uint64_t 1113lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 1114{ 1115 assert(stream->sm_qflags & SMQF_SEND_WUF); 1116 stream->sm_qflags &= ~SMQF_SEND_WUF; 1117 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1118 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1119 return stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc); 1120} 1121 1122 1123void 1124lsquic_stream_peer_blocked (struct lsquic_stream *stream, uint64_t peer_off) 1125{ 1126 uint64_t last_off; 1127 1128 if (stream->sm_last_recv_off) 1129 last_off = stream->sm_last_recv_off; 1130 else 1131 /* This gets advertized in transport parameters */ 1132 last_off = lsquic_sfcw_get_max_recv_off(&stream->fc); 1133 1134 LSQ_DEBUG("Peer blocked at %"PRIu64", while the last MAX_STREAM_DATA " 1135 "frame we sent advertized the limit of %"PRIu64, peer_off, last_off); 1136 1137 if (peer_off > last_off && !(stream->sm_qflags & SMQF_SEND_WUF)) 1138 { 1139 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1140 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1141 next_send_stream); 1142 stream->sm_qflags |= SMQF_SEND_WUF; 1143 LSQ_DEBUG("marked to send MAX_STREAM_DATA frame"); 1144 } 1145 else if (stream->sm_qflags & SMQF_SEND_WUF) 1146 LSQ_DEBUG("MAX_STREAM_DATA frame is already scheduled"); 1147 else if (stream->sm_last_recv_off) 1148 LSQ_DEBUG("MAX_STREAM_DATA(%"PRIu64") has already been either " 1149 "packetized or sent", stream->sm_last_recv_off); 1150 else 1151 LSQ_INFO("Peer should have receive transport param limit " 1152 "of %"PRIu64"; odd.", last_off); 1153} 1154 1155 1156/* GQUIC's BLOCKED frame does not have an offset */ 1157void 1158lsquic_stream_peer_blocked_gquic (struct lsquic_stream *stream) 1159{ 1160 LSQ_DEBUG("Peer blocked: schedule another WINDOW_UPDATE frame"); 1161 if (!(stream->sm_qflags & SMQF_SEND_WUF)) 1162 { 1163 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1164 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1165 next_send_stream); 1166 stream->sm_qflags |= SMQF_SEND_WUF; 1167 LSQ_DEBUG("marked to send MAX_STREAM_DATA frame"); 1168 } 1169 else 1170 LSQ_DEBUG("WINDOW_UPDATE frame is already scheduled"); 1171} 1172 1173 1174void 1175lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 1176{ 1177 assert(stream->sm_qflags & SMQF_SEND_BLOCKED); 1178 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 1179 stream->sm_qflags &= ~SMQF_SEND_BLOCKED; 1180 stream->stream_flags |= STREAM_BLOCKED_SENT; 1181 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1182 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1183} 1184 1185 1186void 1187lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 1188{ 1189 assert(stream->sm_qflags & SMQF_SEND_RST); 1190 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 1191 stream->sm_qflags &= ~SMQF_SEND_RST; 1192 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1193 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1194 stream->stream_flags |= STREAM_RST_SENT; 1195 maybe_finish_stream(stream); 1196} 1197 1198 1199static size_t 1200read_uh (struct lsquic_stream *stream, 1201 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1202{ 1203 struct http1x_headers *const h1h = stream->uh->uh_hset; 1204 size_t nread; 1205 1206 nread = readf(ctx, (unsigned char *) h1h->h1h_buf + h1h->h1h_off, 1207 h1h->h1h_size - h1h->h1h_off, 1208 (stream->stream_flags & STREAM_HEAD_IN_FIN) > 0); 1209 h1h->h1h_off += nread; 1210 if (h1h->h1h_off == h1h->h1h_size) 1211 { 1212 LSQ_DEBUG("read all uncompressed headers"); 1213 destroy_uh(stream); 1214 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 1215 { 1216 stream->stream_flags |= STREAM_FIN_REACHED; 1217 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 1218 } 1219 } 1220 return nread; 1221} 1222 1223 1224static void 1225stream_consumed_bytes (struct lsquic_stream *stream) 1226{ 1227 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 1228 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 1229 { 1230 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1231 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1232 next_send_stream); 1233 stream->sm_qflags |= SMQF_SEND_WUF; 1234 maybe_conn_to_tickable_if_writeable(stream, 1); 1235 } 1236} 1237 1238 1239struct read_frames_status 1240{ 1241 int error; 1242 int processed_frames; 1243 size_t total_nread; 1244}; 1245 1246 1247static struct read_frames_status 1248read_data_frames (struct lsquic_stream *stream, int do_filtering, 1249 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1250{ 1251 struct data_frame *data_frame; 1252 size_t nread, toread, total_nread; 1253 int short_read, processed_frames; 1254 1255 processed_frames = 0; 1256 total_nread = 0; 1257 1258 while ((data_frame = stream->data_in->di_if->di_get_frame( 1259 stream->data_in, stream->read_offset))) 1260 { 1261 1262 ++processed_frames; 1263 1264 do 1265 { 1266 if (do_filtering && stream->sm_sfi) 1267 toread = stream->sm_sfi->sfi_filter_df(stream, data_frame); 1268 else 1269 toread = data_frame->df_size - data_frame->df_read_off; 1270 1271 if (toread || data_frame->df_fin) 1272 { 1273 nread = readf(ctx, data_frame->df_data + data_frame->df_read_off, 1274 toread, data_frame->df_fin); 1275 if (do_filtering && stream->sm_sfi) 1276 stream->sm_sfi->sfi_decr_left(stream, nread); 1277 data_frame->df_read_off += nread; 1278 stream->read_offset += nread; 1279 total_nread += nread; 1280 short_read = nread < toread; 1281 } 1282 else 1283 short_read = 0; 1284 1285 if (data_frame->df_read_off == data_frame->df_size) 1286 { 1287 const int fin = data_frame->df_fin; 1288 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 1289 data_frame = NULL; 1290 if ((stream->sm_bflags & SMBF_AUTOSWITCH) && 1291 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 1292 { 1293 stream->data_in = stream->data_in->di_if->di_switch_impl( 1294 stream->data_in, stream->read_offset); 1295 if (!stream->data_in) 1296 { 1297 stream->data_in = data_in_error_new(); 1298 return (struct read_frames_status) { .error = 1, }; 1299 } 1300 } 1301 if (fin) 1302 { 1303 stream->stream_flags |= STREAM_FIN_REACHED; 1304 goto end_while; 1305 } 1306 } 1307 else if (short_read) 1308 goto end_while; 1309 } 1310 while (data_frame); 1311 } 1312 end_while: 1313 1314 if (processed_frames) 1315 stream_consumed_bytes(stream); 1316 1317 return (struct read_frames_status) { 1318 .error = 0, 1319 .processed_frames = processed_frames, 1320 .total_nread = total_nread, 1321 }; 1322} 1323 1324 1325static ssize_t 1326stream_readf (struct lsquic_stream *stream, 1327 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1328{ 1329 size_t total_nread, nread; 1330 int read_unc_headers; 1331 1332 total_nread = 0; 1333 1334 if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF)) 1335 == (SMBF_USE_HEADERS|SMBF_IETF) 1336 && !(stream->stream_flags & STREAM_HAVE_UH) 1337 && !stream->uh) 1338 { 1339 if (stream->sm_readable(stream)) 1340 { 1341 if (stream->sm_hq_filter.hqfi_flags & HQFI_FLAG_ERROR) 1342 { 1343 LSQ_INFO("HQ filter hit an error: cannot read from stream"); 1344 errno = EBADMSG; 1345 return -1; 1346 } 1347 assert(stream->uh); 1348 } 1349 else 1350 { 1351 errno = EWOULDBLOCK; 1352 return -1; 1353 } 1354 } 1355 1356 if (stream->uh) 1357 { 1358 if (stream->uh->uh_flags & UH_H1H) 1359 { 1360 nread = read_uh(stream, readf, ctx); 1361 read_unc_headers = nread > 0; 1362 total_nread += nread; 1363 if (stream->uh) 1364 return total_nread; 1365 } 1366 else 1367 { 1368 LSQ_INFO("header set not claimed: cannot read from stream"); 1369 return -1; 1370 } 1371 } 1372 else if ((stream->sm_bflags & SMBF_USE_HEADERS) 1373 && !(stream->stream_flags & STREAM_HAVE_UH)) 1374 { 1375 LSQ_DEBUG("cannot read: headers not available"); 1376 errno = EWOULDBLOCK; 1377 return -1; 1378 } 1379 else 1380 read_unc_headers = 0; 1381 1382 const struct read_frames_status rfs 1383 = read_data_frames(stream, 1, readf, ctx); 1384 if (rfs.error) 1385 return -1; 1386 total_nread += rfs.total_nread; 1387 1388 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 1389 total_nread, stream->read_offset); 1390 1391 if (rfs.processed_frames || read_unc_headers) 1392 { 1393 return total_nread; 1394 } 1395 else 1396 { 1397 assert(0 == total_nread); 1398 errno = EWOULDBLOCK; 1399 return -1; 1400 } 1401} 1402 1403 1404/* This function returns 0 when EOF is reached. 1405 */ 1406ssize_t 1407lsquic_stream_readf (struct lsquic_stream *stream, 1408 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1409{ 1410 SM_HISTORY_APPEND(stream, SHE_USER_READ); 1411 1412 if (lsquic_stream_is_reset(stream)) 1413 { 1414 if (stream->stream_flags & STREAM_RST_RECVD) 1415 stream->stream_flags |= STREAM_RST_READ; 1416 errno = ECONNRESET; 1417 return -1; 1418 } 1419 if (stream->stream_flags & STREAM_U_READ_DONE) 1420 { 1421 errno = EBADF; 1422 return -1; 1423 } 1424 if ((stream->stream_flags & STREAM_FIN_REACHED) 1425 && 0 == (!!(stream->stream_flags & STREAM_HAVE_UH) 1426 ^ !!(stream->sm_bflags & SMBF_USE_HEADERS))) 1427 return 0; 1428 1429 return stream_readf(stream, readf, ctx); 1430} 1431 1432 1433struct readv_ctx 1434{ 1435 const struct iovec *iov; 1436 const struct iovec *const end; 1437 unsigned char *p; 1438}; 1439 1440 1441static size_t 1442readv_f (void *ctx_p, const unsigned char *buf, size_t len, int fin) 1443{ 1444 struct readv_ctx *const ctx = ctx_p; 1445 const unsigned char *const end = buf + len; 1446 size_t ntocopy; 1447 1448 while (ctx->iov < ctx->end && buf < end) 1449 { 1450 ntocopy = (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len 1451 - ctx->p; 1452 if (ntocopy > (size_t) (end - buf)) 1453 ntocopy = end - buf; 1454 memcpy(ctx->p, buf, ntocopy); 1455 ctx->p += ntocopy; 1456 buf += ntocopy; 1457 if (ctx->p == (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len) 1458 { 1459 do 1460 ++ctx->iov; 1461 while (ctx->iov < ctx->end && ctx->iov->iov_len == 0); 1462 if (ctx->iov < ctx->end) 1463 ctx->p = ctx->iov->iov_base; 1464 else 1465 ctx->p = NULL; 1466 } 1467 } 1468 1469 return len - (end - buf); 1470} 1471 1472 1473ssize_t 1474lsquic_stream_readv (struct lsquic_stream *stream, const struct iovec *iov, 1475 int iovcnt) 1476{ 1477 struct readv_ctx ctx = { iov, iov + iovcnt, iov->iov_base, }; 1478 return lsquic_stream_readf(stream, readv_f, &ctx); 1479} 1480 1481 1482ssize_t 1483lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 1484{ 1485 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 1486 return lsquic_stream_readv(stream, &iov, 1); 1487} 1488 1489 1490static void 1491stream_shutdown_read (lsquic_stream_t *stream) 1492{ 1493 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1494 { 1495 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 1496 stream->stream_flags |= STREAM_U_READ_DONE; 1497 stream_wantread(stream, 0); 1498 maybe_finish_stream(stream); 1499 } 1500} 1501 1502 1503static int 1504stream_is_incoming_unidir (const struct lsquic_stream *stream) 1505{ 1506 enum stream_id_type sit; 1507 1508 if (stream->sm_bflags & SMBF_IETF) 1509 { 1510 sit = stream->id & SIT_MASK; 1511 if (stream->sm_bflags & SMBF_SERVER) 1512 return sit == SIT_UNI_CLIENT; 1513 else 1514 return sit == SIT_UNI_SERVER; 1515 } 1516 else 1517 return 0; 1518} 1519 1520 1521static void 1522stream_shutdown_write (lsquic_stream_t *stream) 1523{ 1524 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1525 return; 1526 1527 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 1528 stream->stream_flags |= STREAM_U_WRITE_DONE; 1529 stream_wantwrite(stream, 0); 1530 1531 /* Don't bother to check whether there is anything else to write if 1532 * the flags indicate that nothing else should be written. 1533 */ 1534 if (!(stream->sm_bflags & SMBF_CRYPTO) 1535 && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT)) 1536 && !stream_is_incoming_unidir(stream) 1537 && !(stream->sm_qflags & SMQF_SEND_RST)) 1538 { 1539 if ((stream->sm_bflags & SMBF_USE_HEADERS) 1540 && !(stream->stream_flags & STREAM_HEADERS_SENT)) 1541 { 1542 LSQ_DEBUG("headers not sent, send a reset"); 1543 lsquic_stream_reset(stream, 0); 1544 } 1545 else if (stream->sm_n_buffered == 0) 1546 { 1547 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 1548 stream)) 1549 { 1550 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 1551 stream->stream_flags |= STREAM_FIN_SENT; 1552 } 1553 else 1554 { 1555 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 1556 "flag in it"); 1557 (void) stream_flush_nocheck(stream); 1558 } 1559 } 1560 else 1561 (void) stream_flush_nocheck(stream); 1562 } 1563} 1564 1565 1566static void 1567maybe_stream_shutdown_write (struct lsquic_stream *stream) 1568{ 1569 if (stream->sm_send_headers_state == SSHS_BEGIN) 1570 stream_shutdown_write(stream); 1571 else if (0 == (stream->stream_flags & STREAM_DELAYED_SW)) 1572 { 1573 LSQ_DEBUG("shutdown delayed"); 1574 SM_HISTORY_APPEND(stream, SHE_DELAY_SW); 1575 stream->stream_flags |= STREAM_DELAYED_SW; 1576 } 1577} 1578 1579 1580int 1581lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 1582{ 1583 LSQ_DEBUG("shutdown; how: %d", how); 1584 if (lsquic_stream_is_closed(stream)) 1585 { 1586 LSQ_INFO("Attempt to shut down a closed stream"); 1587 errno = EBADF; 1588 return -1; 1589 } 1590 /* 0: read, 1: write: 2: read and write 1591 */ 1592 if (how < 0 || how > 2) 1593 { 1594 errno = EINVAL; 1595 return -1; 1596 } 1597 1598 if (how) 1599 maybe_stream_shutdown_write(stream); 1600 if (how != 1) 1601 stream_shutdown_read(stream); 1602 1603 maybe_finish_stream(stream); 1604 maybe_schedule_call_on_close(stream); 1605 if (how && !(stream->stream_flags & STREAM_DELAYED_SW)) 1606 maybe_conn_to_tickable_if_writeable(stream, 1); 1607 1608 return 0; 1609} 1610 1611 1612void 1613lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 1614{ 1615 LSQ_DEBUG("internal shutdown"); 1616 if (lsquic_stream_is_critical(stream)) 1617 { 1618 LSQ_DEBUG("add flag to force-finish special stream"); 1619 stream->stream_flags |= STREAM_FORCE_FINISH; 1620 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 1621 } 1622 maybe_finish_stream(stream); 1623 maybe_schedule_call_on_close(stream); 1624} 1625 1626 1627static void 1628fake_reset_unused_stream (lsquic_stream_t *stream) 1629{ 1630 stream->stream_flags |= 1631 STREAM_RST_RECVD /* User will pick this up on read or write */ 1632 | STREAM_RST_SENT /* Don't send anything else on this stream */ 1633 ; 1634 1635 /* Cancel all writes to the network scheduled for this stream: */ 1636 if (stream->sm_qflags & SMQF_SENDING_FLAGS) 1637 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 1638 next_send_stream); 1639 stream->sm_qflags &= ~SMQF_SENDING_FLAGS; 1640 drop_buffered_data(stream); 1641 LSQ_DEBUG("fake-reset stream%s", 1642 stream_stalled(stream) ? " (stalled)" : ""); 1643 maybe_finish_stream(stream); 1644 maybe_schedule_call_on_close(stream); 1645} 1646 1647 1648/* This function should only be called for locally-initiated streams whose ID 1649 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 1650 * frame sent by peer but we have not yet received it and created a stream. 1651 * In this situation, we mark the stream as reset, so that user's on_read or 1652 * on_write event callback picks up the error. That, in turn, should result 1653 * in stream being closed. 1654 * 1655 * If we have received any data frames on this stream, this probably indicates 1656 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 1657 * lower than this. However, we still try to handle it gracefully and peform 1658 * a shutdown, as if the stream was not reset. 1659 */ 1660void 1661lsquic_stream_received_goaway (lsquic_stream_t *stream) 1662{ 1663 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 1664 if (0 == stream->read_offset && 1665 stream->data_in->di_if->di_empty(stream->data_in)) 1666 fake_reset_unused_stream(stream); /* Normal condition */ 1667 else 1668 { /* This is odd, let's handle it the best we can: */ 1669 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 1670 lsquic_stream_shutdown_internal(stream); 1671 } 1672} 1673 1674 1675uint64_t 1676lsquic_stream_read_offset (const lsquic_stream_t *stream) 1677{ 1678 return stream->read_offset; 1679} 1680 1681 1682static int 1683stream_wantread (lsquic_stream_t *stream, int is_want) 1684{ 1685 const int old_val = !!(stream->sm_qflags & SMQF_WANT_READ); 1686 const int new_val = !!is_want; 1687 if (old_val != new_val) 1688 { 1689 if (new_val) 1690 { 1691 if (!old_val) 1692 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 1693 next_read_stream); 1694 stream->sm_qflags |= SMQF_WANT_READ; 1695 } 1696 else 1697 { 1698 stream->sm_qflags &= ~SMQF_WANT_READ; 1699 if (old_val) 1700 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 1701 next_read_stream); 1702 } 1703 } 1704 return old_val; 1705} 1706 1707 1708static void 1709maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_q_flags flag) 1710{ 1711 assert(SMQF_WRITE_Q_FLAGS & flag); 1712 if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS)) 1713 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1714 next_write_stream); 1715 stream->sm_qflags |= flag; 1716} 1717 1718 1719static void 1720maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag) 1721{ 1722 assert(SMQF_WRITE_Q_FLAGS & flag); 1723 if (stream->sm_qflags & flag) 1724 { 1725 stream->sm_qflags &= ~flag; 1726 if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS)) 1727 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1728 next_write_stream); 1729 } 1730} 1731 1732 1733static int 1734stream_wantwrite (struct lsquic_stream *stream, int new_val) 1735{ 1736 const int old_val = !!(stream->sm_qflags & SMQF_WANT_WRITE); 1737 1738 assert(0 == (new_val & ~1)); /* new_val is either 0 or 1 */ 1739 1740 if (old_val != new_val) 1741 { 1742 if (new_val) 1743 maybe_put_onto_write_q(stream, SMQF_WANT_WRITE); 1744 else 1745 maybe_remove_from_write_q(stream, SMQF_WANT_WRITE); 1746 } 1747 return old_val; 1748} 1749 1750 1751int 1752lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1753{ 1754 SM_HISTORY_APPEND(stream, SHE_WANTREAD_NO + !!is_want); 1755 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1756 { 1757 if (is_want) 1758 maybe_conn_to_tickable_if_readable(stream); 1759 return stream_wantread(stream, is_want); 1760 } 1761 else 1762 { 1763 errno = EBADF; 1764 return -1; 1765 } 1766} 1767 1768 1769int 1770lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1771{ 1772 int old_val; 1773 1774 is_want = !!is_want; 1775 1776 SM_HISTORY_APPEND(stream, SHE_WANTWRITE_NO + is_want); 1777 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE) 1778 && SSHS_BEGIN == stream->sm_send_headers_state) 1779 { 1780 stream->sm_saved_want_write = is_want; 1781 if (is_want) 1782 maybe_conn_to_tickable_if_writeable(stream, 1); 1783 return stream_wantwrite(stream, is_want); 1784 } 1785 else if (SSHS_BEGIN != stream->sm_send_headers_state) 1786 { 1787 old_val = stream->sm_saved_want_write; 1788 stream->sm_saved_want_write = is_want; 1789 return old_val; 1790 } 1791 else 1792 { 1793 errno = EBADF; 1794 return -1; 1795 } 1796} 1797 1798 1799struct progress 1800{ 1801 enum stream_flags s_flags; 1802 enum stream_q_flags q_flags; 1803}; 1804 1805 1806static struct progress 1807stream_progress (const struct lsquic_stream *stream) 1808{ 1809 return (struct progress) { 1810 .s_flags = stream->stream_flags 1811 & (STREAM_U_WRITE_DONE|STREAM_U_READ_DONE), 1812 .q_flags = stream->sm_qflags 1813 & (SMQF_WANT_READ|SMQF_WANT_WRITE|SMQF_WANT_FLUSH|SMQF_SEND_RST), 1814 }; 1815} 1816 1817 1818static int 1819progress_eq (struct progress a, struct progress b) 1820{ 1821 return a.s_flags == b.s_flags && a.q_flags == b.q_flags; 1822} 1823 1824 1825static void 1826stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1827{ 1828 unsigned no_progress_count, no_progress_limit; 1829 struct progress progress; 1830 uint64_t size; 1831 1832 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1833 1834 no_progress_count = 0; 1835 while ((stream->sm_qflags & SMQF_WANT_READ) 1836 && lsquic_stream_readable(stream)) 1837 { 1838 progress = stream_progress(stream); 1839 size = stream->read_offset; 1840 1841 stream->stream_if->on_read(stream, stream->st_ctx); 1842 1843 if (no_progress_limit && size == stream->read_offset && 1844 progress_eq(progress, stream_progress(stream))) 1845 { 1846 ++no_progress_count; 1847 if (no_progress_count >= no_progress_limit) 1848 { 1849 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1850 "progress) in user code reading from stream", 1851 no_progress_count, 1852 no_progress_count == 1 ? "" : "s"); 1853 break; 1854 } 1855 } 1856 else 1857 no_progress_count = 0; 1858 } 1859} 1860 1861 1862static void 1863stream_hblock_sent (struct lsquic_stream *stream) 1864{ 1865 int want_write; 1866 1867 LSQ_DEBUG("header block has been sent: restore default behavior"); 1868 stream->sm_send_headers_state = SSHS_BEGIN; 1869 stream->sm_write_avail = stream_write_avail_with_frames; 1870 1871 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 1872 if (want_write != stream->sm_saved_want_write) 1873 (void) lsquic_stream_wantwrite(stream, stream->sm_saved_want_write); 1874 1875 if (stream->stream_flags & STREAM_DELAYED_SW) 1876 { 1877 LSQ_DEBUG("performing delayed shutdown write"); 1878 stream->stream_flags &= ~STREAM_DELAYED_SW; 1879 stream_shutdown_write(stream); 1880 maybe_schedule_call_on_close(stream); 1881 maybe_finish_stream(stream); 1882 maybe_conn_to_tickable_if_writeable(stream, 1); 1883 } 1884} 1885 1886 1887static void 1888on_write_header_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 1889{ 1890 ssize_t nw; 1891 1892 nw = stream_write_buf(stream, 1893 stream->sm_header_block + stream->sm_hblock_off, 1894 stream->sm_hblock_sz - stream->sm_hblock_off); 1895 if (nw > 0) 1896 { 1897 stream->sm_hblock_off += nw; 1898 if (stream->sm_hblock_off == stream->sm_hblock_sz) 1899 { 1900 stream->stream_flags |= STREAM_HEADERS_SENT; 1901 free(stream->sm_header_block); 1902 stream->sm_header_block = NULL; 1903 stream->sm_hblock_sz = 0; 1904 stream_hblock_sent(stream); 1905 LSQ_DEBUG("header block written out successfully"); 1906 /* TODO: if there was eos, do something else */ 1907 if (stream->sm_qflags & SMQF_WANT_WRITE) 1908 stream->stream_if->on_write(stream, h); 1909 } 1910 else 1911 { 1912 LSQ_DEBUG("wrote %zd bytes more of header block; not done yet", 1913 nw); 1914 } 1915 } 1916 else if (nw < 0) 1917 { 1918 /* XXX What should happen if we hit an error? TODO */ 1919 } 1920} 1921 1922 1923static void 1924(*select_on_write (struct lsquic_stream *stream))(struct lsquic_stream *, 1925 lsquic_stream_ctx_t *) 1926{ 1927 if (0 == (stream->stream_flags & STREAM_PUSHING) 1928 && SSHS_HBLOCK_SENDING != stream->sm_send_headers_state) 1929 /* Common case */ 1930 return stream->stream_if->on_write; 1931 else if (SSHS_HBLOCK_SENDING == stream->sm_send_headers_state) 1932 return on_write_header_wrapper; 1933 else 1934 { 1935 assert(stream->stream_flags & STREAM_PUSHING); 1936 if (stream_is_pushing_promise(stream)) 1937 return on_write_pp_wrapper; 1938 else if (stream->sm_dup_push_off < stream->sm_dup_push_len) 1939 return on_write_dp_wrapper; 1940 else 1941 return stream->stream_if->on_write; 1942 } 1943} 1944 1945 1946static void 1947stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1948{ 1949 unsigned no_progress_count, no_progress_limit; 1950 void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *); 1951 struct progress progress; 1952 1953 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1954 1955 no_progress_count = 0; 1956 stream->stream_flags |= STREAM_LAST_WRITE_OK; 1957 while ((stream->sm_qflags & SMQF_WANT_WRITE) 1958 && (stream->stream_flags & STREAM_LAST_WRITE_OK) 1959 && lsquic_stream_write_avail(stream)) 1960 { 1961 progress = stream_progress(stream); 1962 1963 on_write = select_on_write(stream); 1964 on_write(stream, stream->st_ctx); 1965 1966 if (no_progress_limit && progress_eq(progress, stream_progress(stream))) 1967 { 1968 ++no_progress_count; 1969 if (no_progress_count >= no_progress_limit) 1970 { 1971 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1972 "progress) in user code writing to stream", 1973 no_progress_count, 1974 no_progress_count == 1 ? "" : "s"); 1975 break; 1976 } 1977 } 1978 else 1979 no_progress_count = 0; 1980 } 1981} 1982 1983 1984static void 1985stream_dispatch_read_events_once (lsquic_stream_t *stream) 1986{ 1987 if ((stream->sm_qflags & SMQF_WANT_READ) && lsquic_stream_readable(stream)) 1988 { 1989 stream->stream_if->on_read(stream, stream->st_ctx); 1990 } 1991} 1992 1993 1994uint64_t 1995lsquic_stream_combined_send_off (const struct lsquic_stream *stream) 1996{ 1997 size_t frames_sizes; 1998 1999 frames_sizes = active_hq_frame_sizes(stream); 2000 return stream->tosend_off + stream->sm_n_buffered + frames_sizes; 2001} 2002 2003 2004static void 2005maybe_mark_as_blocked (lsquic_stream_t *stream) 2006{ 2007 struct lsquic_conn_cap *cc; 2008 uint64_t used; 2009 2010 used = lsquic_stream_combined_send_off(stream); 2011 if (stream->max_send_off == used) 2012 { 2013 if (stream->blocked_off < stream->max_send_off) 2014 { 2015 stream->blocked_off = used; 2016 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 2017 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 2018 next_send_stream); 2019 stream->sm_qflags |= SMQF_SEND_BLOCKED; 2020 LSQ_DEBUG("marked stream-blocked at stream offset " 2021 "%"PRIu64, stream->blocked_off); 2022 } 2023 else 2024 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 2025 " has been, or is about to be, sent", stream->blocked_off); 2026 } 2027 2028 if ((stream->sm_bflags & SMBF_CONN_LIMITED) 2029 && (cc = &stream->conn_pub->conn_cap, 2030 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 2031 { 2032 if (cc->cc_blocked < cc->cc_max) 2033 { 2034 cc->cc_blocked = cc->cc_max; 2035 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 2036 LSQ_DEBUG("marked connection-blocked at connection offset " 2037 "%"PRIu64, cc->cc_max); 2038 } 2039 else 2040 LSQ_DEBUG("stream has already been marked connection-blocked " 2041 "at offset %"PRIu64, cc->cc_blocked); 2042 } 2043} 2044 2045 2046void 2047lsquic_stream_dispatch_read_events (lsquic_stream_t *stream) 2048{ 2049 assert(stream->sm_qflags & SMQF_WANT_READ); 2050 2051 if (stream->sm_bflags & SMBF_RW_ONCE) 2052 stream_dispatch_read_events_once(stream); 2053 else 2054 stream_dispatch_read_events_loop(stream); 2055} 2056 2057 2058void 2059lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 2060{ 2061 void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *); 2062 int progress; 2063 uint64_t tosend_off; 2064 unsigned short n_buffered; 2065 enum stream_q_flags q_flags; 2066 2067 assert(stream->sm_qflags & SMQF_WRITE_Q_FLAGS); 2068 q_flags = stream->sm_qflags & SMQF_WRITE_Q_FLAGS; 2069 tosend_off = stream->tosend_off; 2070 n_buffered = stream->sm_n_buffered; 2071 2072 if (stream->sm_qflags & SMQF_WANT_FLUSH) 2073 (void) stream_flush(stream); 2074 2075 if (stream->sm_bflags & SMBF_RW_ONCE) 2076 { 2077 if ((stream->sm_qflags & SMQF_WANT_WRITE) 2078 && lsquic_stream_write_avail(stream)) 2079 { 2080 on_write = select_on_write(stream); 2081 on_write(stream, stream->st_ctx); 2082 } 2083 } 2084 else 2085 stream_dispatch_write_events_loop(stream); 2086 2087 /* Progress means either flags or offsets changed: */ 2088 progress = !((stream->sm_qflags & SMQF_WRITE_Q_FLAGS) == q_flags && 2089 stream->tosend_off == tosend_off && 2090 stream->sm_n_buffered == n_buffered); 2091 2092 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 2093 { 2094 if (progress) 2095 { /* Move the stream to the end of the list to ensure fairness. */ 2096 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 2097 next_write_stream); 2098 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 2099 next_write_stream); 2100 } 2101 } 2102} 2103 2104 2105static size_t 2106inner_reader_empty_size (void *ctx) 2107{ 2108 return 0; 2109} 2110 2111 2112static size_t 2113inner_reader_empty_read (void *ctx, void *buf, size_t count) 2114{ 2115 return 0; 2116} 2117 2118 2119static int 2120stream_flush (lsquic_stream_t *stream) 2121{ 2122 struct lsquic_reader empty_reader; 2123 ssize_t nw; 2124 2125 assert(stream->sm_qflags & SMQF_WANT_FLUSH); 2126 assert(stream->sm_n_buffered > 0 || 2127 /* Flushing is also used to packetize standalone FIN: */ 2128 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 2129 == STREAM_U_WRITE_DONE)); 2130 2131 empty_reader.lsqr_size = inner_reader_empty_size; 2132 empty_reader.lsqr_read = inner_reader_empty_read; 2133 empty_reader.lsqr_ctx = NULL; /* pro forma */ 2134 nw = stream_write_to_packets(stream, &empty_reader, 0); 2135 2136 if (nw >= 0) 2137 { 2138 assert(nw == 0); /* Empty reader: must have read zero bytes */ 2139 return 0; 2140 } 2141 else 2142 return -1; 2143} 2144 2145 2146static int 2147stream_flush_nocheck (lsquic_stream_t *stream) 2148{ 2149 size_t frames; 2150 2151 frames = active_hq_frame_sizes(stream); 2152 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered + frames; 2153 stream->sm_flush_to_payload = stream->sm_payload + stream->sm_n_buffered; 2154 maybe_put_onto_write_q(stream, SMQF_WANT_FLUSH); 2155 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 2156 2157 return stream_flush(stream); 2158} 2159 2160 2161int 2162lsquic_stream_flush (lsquic_stream_t *stream) 2163{ 2164 if (stream->stream_flags & STREAM_U_WRITE_DONE) 2165 { 2166 LSQ_DEBUG("cannot flush closed stream"); 2167 errno = EBADF; 2168 return -1; 2169 } 2170 2171 if (0 == stream->sm_n_buffered) 2172 { 2173 LSQ_DEBUG("flushing 0 bytes: noop"); 2174 return 0; 2175 } 2176 2177 return stream_flush_nocheck(stream); 2178} 2179 2180 2181static size_t 2182stream_get_n_allowed (const struct lsquic_stream *stream) 2183{ 2184 if (stream->sm_n_allocated) 2185 return stream->sm_n_allocated; 2186 else 2187 return stream->conn_pub->path->np_pack_size; 2188} 2189 2190 2191/* The flush threshold is the maximum size of stream data that can be sent 2192 * in a full packet. 2193 */ 2194#ifdef NDEBUG 2195static 2196#endif 2197 size_t 2198lsquic_stream_flush_threshold (const struct lsquic_stream *stream, 2199 unsigned data_sz) 2200{ 2201 enum packet_out_flags flags; 2202 enum packno_bits bits; 2203 size_t packet_header_sz, stream_header_sz, tag_len; 2204 size_t threshold; 2205 2206 bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl); 2207 flags = bits << POBIT_SHIFT; 2208 if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)) 2209 flags |= PO_CONN_ID; 2210 if (stream_is_hsk(stream)) 2211 flags |= PO_LONGHEAD; 2212 2213 packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags, 2214 stream->conn_pub->path->np_dcid.len); 2215 stream_header_sz = stream->sm_frame_header_sz(stream, data_sz); 2216 tag_len = stream->conn_pub->lconn->cn_esf_c->esf_tag_len; 2217 2218 threshold = stream_get_n_allowed(stream) - tag_len 2219 - packet_header_sz - stream_header_sz; 2220 return threshold; 2221} 2222 2223 2224#define COMMON_WRITE_CHECKS() do { \ 2225 if ((stream->sm_bflags & SMBF_USE_HEADERS) \ 2226 && !(stream->stream_flags & STREAM_HEADERS_SENT)) \ 2227 { \ 2228 if (SSHS_BEGIN != stream->sm_send_headers_state) \ 2229 { \ 2230 LSQ_DEBUG("still sending headers: no writing allowed"); \ 2231 return 0; \ 2232 } \ 2233 else \ 2234 { \ 2235 LSQ_INFO("Attempt to write to stream before sending HTTP " \ 2236 "headers"); \ 2237 errno = EILSEQ; \ 2238 return -1; \ 2239 } \ 2240 } \ 2241 if (lsquic_stream_is_reset(stream)) \ 2242 { \ 2243 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 2244 errno = ECONNRESET; \ 2245 return -1; \ 2246 } \ 2247 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 2248 { \ 2249 LSQ_INFO("Attempt to write to stream after it was closed for " \ 2250 "writing"); \ 2251 errno = EBADF; \ 2252 return -1; \ 2253 } \ 2254} while (0) 2255 2256 2257struct frame_gen_ctx 2258{ 2259 lsquic_stream_t *fgc_stream; 2260 struct lsquic_reader *fgc_reader; 2261 /* We keep our own count of how many bytes were read from reader because 2262 * some readers are external. The external caller does not have to rely 2263 * on our count, but it can. 2264 */ 2265 size_t fgc_nread_from_reader; 2266 size_t (*fgc_size) (void *ctx); 2267 int (*fgc_fin) (void *ctx); 2268 gsf_read_f fgc_read; 2269}; 2270 2271 2272static size_t 2273frame_std_gen_size (void *ctx) 2274{ 2275 struct frame_gen_ctx *fg_ctx = ctx; 2276 size_t available, remaining; 2277 2278 /* Make sure we are not writing past available size: */ 2279 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2280 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 2281 if (available < remaining) 2282 remaining = available; 2283 2284 return remaining + fg_ctx->fgc_stream->sm_n_buffered; 2285} 2286 2287 2288static size_t 2289stream_hq_frame_size (const struct stream_hq_frame *shf) 2290{ 2291 if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))) 2292 return 1 + 1 + ((shf->shf_flags & SHF_TWO_BYTES) > 0); 2293 else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) == SHF_FIXED_SIZE) 2294 return 1 + (1 << vint_val2bits(shf->shf_frame_size)); 2295 else 2296 { 2297 assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2298 == (SHF_FIXED_SIZE|SHF_PHANTOM)); 2299 return 0; 2300 } 2301} 2302 2303 2304static size_t 2305active_hq_frame_sizes (const struct lsquic_stream *stream) 2306{ 2307 const struct stream_hq_frame *shf; 2308 size_t size; 2309 2310 size = 0; 2311 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 2312 == (SMBF_IETF|SMBF_USE_HEADERS)) 2313 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2314 if (!(shf->shf_flags & SHF_WRITTEN)) 2315 size += stream_hq_frame_size(shf); 2316 2317 return size; 2318} 2319 2320 2321static uint64_t 2322stream_hq_frame_end (const struct stream_hq_frame *shf) 2323{ 2324 if (shf->shf_flags & SHF_FIXED_SIZE) 2325 return shf->shf_off + shf->shf_frame_size; 2326 else if (shf->shf_flags & SHF_TWO_BYTES) 2327 return shf->shf_off + ((1 << 14) - 1); 2328 else 2329 return shf->shf_off + ((1 << 6) - 1); 2330} 2331 2332 2333static int 2334frame_in_stream (const struct lsquic_stream *stream, 2335 const struct stream_hq_frame *shf) 2336{ 2337 return shf >= stream->sm_hq_frame_arr 2338 && shf < stream->sm_hq_frame_arr + sizeof(stream->sm_hq_frame_arr) 2339 / sizeof(stream->sm_hq_frame_arr[0]) 2340 ; 2341} 2342 2343 2344static void 2345stream_hq_frame_put (struct lsquic_stream *stream, 2346 struct stream_hq_frame *shf) 2347{ 2348 assert(STAILQ_FIRST(&stream->sm_hq_frames) == shf); 2349 STAILQ_REMOVE_HEAD(&stream->sm_hq_frames, shf_next); 2350 if (frame_in_stream(stream, shf)) 2351 memset(shf, 0, sizeof(*shf)); 2352 else 2353 lsquic_malo_put(shf); 2354} 2355 2356 2357static void 2358stream_hq_frame_close (struct lsquic_stream *stream, 2359 struct stream_hq_frame *shf) 2360{ 2361 unsigned bits; 2362 2363 LSQ_DEBUG("close HQ frame of type 0x%X at payload offset %"PRIu64 2364 " (actual offset %"PRIu64")", shf->shf_frame_type, 2365 stream->sm_payload, stream->tosend_off); 2366 assert(shf->shf_flags & SHF_ACTIVE); 2367 if (!(shf->shf_flags & SHF_FIXED_SIZE)) 2368 { 2369 shf->shf_frame_ptr[0] = shf->shf_frame_type; 2370 bits = (shf->shf_flags & SHF_TWO_BYTES) > 0; 2371 vint_write(shf->shf_frame_ptr + 1, stream->sm_payload - shf->shf_off, 2372 bits, 1 << bits); 2373 } 2374 stream_hq_frame_put(stream, shf); 2375} 2376 2377 2378static size_t 2379frame_hq_gen_size (void *ctx) 2380{ 2381 struct frame_gen_ctx *fg_ctx = ctx; 2382 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2383 size_t available, remaining, frames; 2384 const struct stream_hq_frame *shf; 2385 2386 frames = 0; 2387 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2388 if (shf->shf_off >= stream->sm_payload) 2389 frames += stream_hq_frame_size(shf); 2390 2391 /* Make sure we are not writing past available size: */ 2392 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2393 available = lsquic_stream_write_avail(stream); 2394 if (available < remaining) 2395 remaining = available; 2396 2397 return remaining + stream->sm_n_buffered + frames; 2398} 2399 2400 2401static int 2402frame_std_gen_fin (void *ctx) 2403{ 2404 struct frame_gen_ctx *fg_ctx = ctx; 2405 return !(fg_ctx->fgc_stream->sm_bflags & SMBF_CRYPTO) 2406 && (fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE) 2407 && 0 == fg_ctx->fgc_stream->sm_n_buffered 2408 /* Do not use frame_std_gen_size() as it may chop the real size: */ 2409 && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2410} 2411 2412 2413static void 2414incr_conn_cap (struct lsquic_stream *stream, size_t incr) 2415{ 2416 if (stream->sm_bflags & SMBF_CONN_LIMITED) 2417 { 2418 stream->conn_pub->conn_cap.cc_sent += incr; 2419 assert(stream->conn_pub->conn_cap.cc_sent 2420 <= stream->conn_pub->conn_cap.cc_max); 2421 } 2422} 2423 2424 2425void 2426incr_sm_payload (struct lsquic_stream *stream, size_t incr) 2427{ 2428 stream->sm_payload += incr; 2429 stream->tosend_off += incr; 2430 assert(stream->tosend_off <= stream->max_send_off); 2431} 2432 2433 2434static size_t 2435frame_std_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 2436{ 2437 struct frame_gen_ctx *fg_ctx = ctx; 2438 unsigned char *p = begin_buf; 2439 unsigned char *const end = p + len; 2440 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 2441 size_t n_written, available, n_to_write; 2442 2443 if (stream->sm_n_buffered > 0) 2444 { 2445 if (len <= stream->sm_n_buffered) 2446 { 2447 memcpy(p, stream->sm_buf, len); 2448 memmove(stream->sm_buf, stream->sm_buf + len, 2449 stream->sm_n_buffered - len); 2450 stream->sm_n_buffered -= len; 2451 if (0 == stream->sm_n_buffered) 2452 maybe_resize_stream_buffer(stream); 2453 assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered); 2454 incr_sm_payload(stream, len); 2455 *fin = fg_ctx->fgc_fin(fg_ctx); 2456 return len; 2457 } 2458 memcpy(p, stream->sm_buf, stream->sm_n_buffered); 2459 p += stream->sm_n_buffered; 2460 stream->sm_n_buffered = 0; 2461 maybe_resize_stream_buffer(stream); 2462 } 2463 2464 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 2465 n_to_write = end - p; 2466 if (n_to_write > available) 2467 n_to_write = available; 2468 n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p, 2469 n_to_write); 2470 p += n_written; 2471 fg_ctx->fgc_nread_from_reader += n_written; 2472 *fin = fg_ctx->fgc_fin(fg_ctx); 2473 incr_sm_payload(stream, p - (const unsigned char *) begin_buf); 2474 incr_conn_cap(stream, n_written); 2475 return p - (const unsigned char *) begin_buf; 2476} 2477 2478 2479static struct stream_hq_frame * 2480find_hq_frame (const struct lsquic_stream *stream, uint64_t off) 2481{ 2482 struct stream_hq_frame *shf; 2483 2484 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2485 if (shf->shf_off <= off && stream_hq_frame_end(shf) > off) 2486 return shf; 2487 2488 return NULL; 2489} 2490 2491 2492static struct stream_hq_frame * 2493find_cur_hq_frame (const struct lsquic_stream *stream) 2494{ 2495 return find_hq_frame(stream, stream->sm_payload); 2496} 2497 2498 2499static struct stream_hq_frame * 2500open_hq_frame (struct lsquic_stream *stream) 2501{ 2502 struct stream_hq_frame *shf; 2503 2504 for (shf = stream->sm_hq_frame_arr; shf < stream->sm_hq_frame_arr 2505 + sizeof(stream->sm_hq_frame_arr) 2506 / sizeof(stream->sm_hq_frame_arr[0]); ++shf) 2507 if (!(shf->shf_flags & SHF_ACTIVE)) 2508 goto found; 2509 2510 shf = lsquic_malo_get(stream->conn_pub->mm->malo.stream_hq_frame); 2511 if (!shf) 2512 { 2513 LSQ_WARN("cannot allocate HQ frame"); 2514 return NULL; 2515 } 2516 memset(shf, 0, sizeof(*shf)); 2517 2518 found: 2519 STAILQ_INSERT_TAIL(&stream->sm_hq_frames, shf, shf_next); 2520 shf->shf_flags = SHF_ACTIVE; 2521 return shf; 2522} 2523 2524 2525/* Returns index of the new frame */ 2526static struct stream_hq_frame * 2527stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off, 2528 enum hq_frame_type frame_type, enum shf_flags flags, size_t size) 2529{ 2530 struct stream_hq_frame *shf; 2531 2532 shf = open_hq_frame(stream); 2533 if (!shf) 2534 { 2535 LSQ_WARN("could not open HQ frame"); 2536 return NULL; 2537 } 2538 2539 shf->shf_off = off; 2540 shf->shf_flags |= flags; 2541 shf->shf_frame_type = frame_type; 2542 if (shf->shf_flags & SHF_FIXED_SIZE) 2543 { 2544 shf->shf_frame_size = size; 2545 LSQ_DEBUG("activated fixed-size HQ frame of type 0x%X at offset " 2546 "%"PRIu64", size %zu", shf->shf_frame_type, shf->shf_off, size); 2547 } 2548 else 2549 { 2550 shf->shf_frame_ptr = NULL; 2551 if (size >= (1 << 6)) 2552 shf->shf_flags |= SHF_TWO_BYTES; 2553 LSQ_DEBUG("activated variable-size HQ frame of type 0x%X at offset " 2554 "%"PRIu64, shf->shf_frame_type, shf->shf_off); 2555 } 2556 2557 return shf; 2558} 2559 2560 2561static size_t 2562frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 2563{ 2564 struct frame_gen_ctx *fg_ctx = ctx; 2565 unsigned char *p = begin_buf; 2566 unsigned char *const end = p + len; 2567 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2568 struct stream_hq_frame *shf; 2569 size_t nw, frame_sz, avail, rem; 2570 unsigned bits; 2571 2572 while (p < end) 2573 { 2574 shf = find_cur_hq_frame(stream); 2575 if (shf) 2576 LSQ_DEBUG("found current HQ frame of type 0x%X at offset %"PRIu64, 2577 shf->shf_frame_type, shf->shf_off); 2578 else 2579 { 2580 rem = frame_std_gen_size(ctx); 2581 if (rem) 2582 { 2583 if (rem > ((1 << 14) - 1)) 2584 rem = (1 << 14) - 1; 2585 shf = stream_activate_hq_frame(stream, 2586 stream->sm_payload, HQFT_DATA, 0, rem); 2587 if (!shf) 2588 { 2589 /* TODO: abort connection? Handle failure somehow */ 2590 break; 2591 } 2592 } 2593 else 2594 break; 2595 } 2596 avail = stream->sm_n_buffered + stream->sm_write_avail(stream); 2597 if (shf->shf_off == stream->sm_payload 2598 && !(shf->shf_flags & SHF_WRITTEN)) 2599 { 2600 frame_sz = stream_hq_frame_size(shf); 2601 if (frame_sz > (uintptr_t) (end - p)) 2602 { 2603 stream_hq_frame_put(stream, shf); 2604 break; 2605 } 2606 LSQ_DEBUG("insert %zu-byte HQ frame of type 0x%X at payload " 2607 "offset %"PRIu64" (actual offset %"PRIu64")", frame_sz, 2608 shf->shf_frame_type, stream->sm_payload, stream->tosend_off); 2609 if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))) 2610 { 2611 shf->shf_frame_ptr = p; 2612 memset(p, 0, frame_sz); 2613 p += frame_sz; 2614 } 2615 else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2616 == SHF_FIXED_SIZE) 2617 { 2618 *p++ = shf->shf_frame_type; 2619 bits = vint_val2bits(shf->shf_frame_size); 2620 vint_write(p, shf->shf_frame_size, bits, 1 << bits); 2621 p += 1 << bits; 2622 } 2623 else 2624 assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2625 == (SHF_FIXED_SIZE|SHF_PHANTOM)); 2626 if (!(shf->shf_flags & SHF_CC_PAID)) 2627 { 2628 incr_conn_cap(stream, frame_sz); 2629 shf->shf_flags |= SHF_CC_PAID; 2630 } 2631 shf->shf_flags |= SHF_WRITTEN; 2632 stream->tosend_off += frame_sz; 2633 assert(stream->tosend_off <= stream->max_send_off); 2634 } 2635 else 2636 { 2637 len = stream_hq_frame_end(shf) - stream->sm_payload; 2638 assert(len); 2639 if (len > (unsigned) (end - p)) 2640 len = end - p; 2641 if (len > avail) 2642 len = avail; 2643 if (!len) 2644 break; 2645 nw = frame_std_gen_read(ctx, p, len, fin); 2646 p += nw; 2647 if (nw < len) 2648 break; 2649 if (stream_hq_frame_end(shf) == stream->sm_payload) 2650 stream_hq_frame_close(stream, shf); 2651 } 2652 } 2653 2654 return p - (unsigned char *) begin_buf; 2655} 2656 2657 2658static size_t 2659crypto_frame_gen_read (void *ctx, void *buf, size_t len) 2660{ 2661 int fin_ignored; 2662 2663 return frame_std_gen_read(ctx, buf, len, &fin_ignored); 2664} 2665 2666 2667static void 2668check_flush_threshold (lsquic_stream_t *stream) 2669{ 2670 if ((stream->sm_qflags & SMQF_WANT_FLUSH) && 2671 stream->tosend_off >= stream->sm_flush_to) 2672 { 2673 LSQ_DEBUG("flushed to or past required offset %"PRIu64, 2674 stream->sm_flush_to); 2675 maybe_remove_from_write_q(stream, SMQF_WANT_FLUSH); 2676 } 2677} 2678 2679 2680static int 2681write_stream_frame (struct frame_gen_ctx *fg_ctx, const size_t size, 2682 struct lsquic_packet_out *packet_out) 2683{ 2684 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 2685 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 2686 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2687 unsigned off; 2688 int len, s; 2689 2690#if LSQUIC_CONN_STATS 2691 const uint64_t begin_off = stream->tosend_off; 2692#endif 2693 off = packet_out->po_data_sz; 2694 len = pf->pf_gen_stream_frame( 2695 packet_out->po_data + packet_out->po_data_sz, 2696 lsquic_packet_out_avail(packet_out), stream->id, 2697 stream->tosend_off, 2698 fg_ctx->fgc_fin(fg_ctx), size, fg_ctx->fgc_read, fg_ctx); 2699 if (len < 0) 2700 return len; 2701 2702#if LSQUIC_CONN_STATS 2703 stream->conn_pub->conn_stats->out.stream_frames += 1; 2704 stream->conn_pub->conn_stats->out.stream_data_sz 2705 += stream->tosend_off - begin_off; 2706#endif 2707 EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf, 2708 packet_out->po_data + packet_out->po_data_sz, len); 2709 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 2710 packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM; 2711 if (0 == lsquic_packet_out_avail(packet_out)) 2712 packet_out->po_flags |= PO_STREAM_END; 2713 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 2714 stream, QUIC_FRAME_STREAM, off, len); 2715 if (s != 0) 2716 { 2717 LSQ_ERROR("adding stream to packet failed: %s", strerror(errno)); 2718 return -1; 2719 } 2720 2721 check_flush_threshold(stream); 2722 return len; 2723} 2724 2725 2726static enum swtp_status 2727stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size) 2728{ 2729 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2730 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2731 struct lsquic_packet_out *packet_out; 2732 int len; 2733 2734 packet_out = lsquic_send_ctl_new_packet_out(send_ctl, 0, PNS_APP, 2735 stream->conn_pub->path); 2736 if (!packet_out) 2737 return SWTP_STOP; 2738 packet_out->po_header_type = stream->tosend_off == 0 2739 ? HETY_INITIAL : HETY_HANDSHAKE; 2740 2741 len = write_stream_frame(fg_ctx, size, packet_out); 2742 2743 if (len > 0) 2744 { 2745 packet_out->po_flags |= PO_HELLO; 2746 lsquic_packet_out_zero_pad(packet_out); 2747 lsquic_send_ctl_scheduled_one(send_ctl, packet_out); 2748 return SWTP_OK; 2749 } 2750 else 2751 return SWTP_ERROR; 2752} 2753 2754 2755static enum swtp_status 2756stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size) 2757{ 2758 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2759 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2760 unsigned stream_header_sz, need_at_least; 2761 struct lsquic_packet_out *packet_out; 2762 struct lsquic_stream *headers_stream; 2763 int len; 2764 2765 if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED)) 2766 == STREAM_HEADERS_SENT) 2767 { 2768 if (stream->sm_bflags & SMBF_IETF) 2769 { 2770 if (stream->stream_flags & STREAM_ENCODER_DEP) 2771 headers_stream = stream->conn_pub->u.ietf.qeh->qeh_enc_sm_out; 2772 else 2773 headers_stream = NULL; 2774 } 2775 else 2776 headers_stream = 2777 lsquic_headers_stream_get_stream(stream->conn_pub->u.gquic.hs); 2778 if (headers_stream && lsquic_stream_has_data_to_flush(headers_stream)) 2779 { 2780 LSQ_DEBUG("flushing headers stream before packetizing stream data"); 2781 (void) lsquic_stream_flush(headers_stream); 2782 } 2783 /* If there is nothing to flush, some other stream must have flushed it: 2784 * this means our headers are flushed. Either way, only do this once. 2785 */ 2786 stream->stream_flags |= STREAM_HDRS_FLUSHED; 2787 } 2788 2789 stream_header_sz = stream->sm_frame_header_sz(stream, size); 2790 need_at_least = stream_header_sz; 2791 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 2792 == (SMBF_IETF|SMBF_USE_HEADERS)) 2793 { 2794 if (size > 0) 2795 need_at_least += 3; /* Enough room for HTTP/3 frame */ 2796 } 2797 else 2798 need_at_least += size > 0; 2799 get_packet: 2800 packet_out = lsquic_send_ctl_get_packet_for_stream(send_ctl, 2801 need_at_least, stream->conn_pub->path, stream); 2802 if (packet_out) 2803 { 2804 len = write_stream_frame(fg_ctx, size, packet_out); 2805 if (len > 0) 2806 return SWTP_OK; 2807 assert(len < 0); 2808 if (-len > (int) need_at_least) 2809 { 2810 LSQ_DEBUG("need more room (%d bytes) than initially calculated " 2811 "%u bytes, will try again", -len, need_at_least); 2812 need_at_least = -len; 2813 goto get_packet; 2814 } 2815 return SWTP_ERROR; 2816 } 2817 else 2818 return SWTP_STOP; 2819} 2820 2821 2822static enum swtp_status 2823stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size) 2824{ 2825 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2826 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2827 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 2828 unsigned crypto_header_sz, need_at_least; 2829 struct lsquic_packet_out *packet_out; 2830 unsigned short off; 2831 const enum packnum_space pns = lsquic_enclev2pns[ crypto_level(stream) ]; 2832 int len, s; 2833 2834 assert(size > 0); 2835 crypto_header_sz = stream->sm_frame_header_sz(stream, size); 2836 need_at_least = crypto_header_sz + 1; 2837 2838 packet_out = lsquic_send_ctl_get_packet_for_crypto(send_ctl, 2839 need_at_least, pns, stream->conn_pub->path); 2840 if (!packet_out) 2841 return SWTP_STOP; 2842 2843 off = packet_out->po_data_sz; 2844 len = pf->pf_gen_crypto_frame(packet_out->po_data + packet_out->po_data_sz, 2845 lsquic_packet_out_avail(packet_out), stream->tosend_off, 2846 size, crypto_frame_gen_read, fg_ctx); 2847 if (len < 0) 2848 return len; 2849 2850 EV_LOG_GENERATED_CRYPTO_FRAME(LSQUIC_LOG_CONN_ID, pf, 2851 packet_out->po_data + packet_out->po_data_sz, len); 2852 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 2853 packet_out->po_frame_types |= 1 << QUIC_FRAME_CRYPTO; 2854 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 2855 stream, QUIC_FRAME_CRYPTO, off, len); 2856 if (s != 0) 2857 { 2858 LSQ_WARN("adding crypto stream to packet failed: %s", strerror(errno)); 2859 return -1; 2860 } 2861 2862 packet_out->po_flags |= PO_HELLO; 2863 2864 check_flush_threshold(stream); 2865 return SWTP_OK; 2866} 2867 2868 2869static void 2870abort_connection (struct lsquic_stream *stream) 2871{ 2872 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 2873 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 2874 next_service_stream); 2875 stream->sm_qflags |= SMQF_ABORT_CONN; 2876 LSQ_WARN("connection will be aborted"); 2877 maybe_conn_to_tickable(stream); 2878} 2879 2880 2881static void 2882maybe_close_varsize_hq_frame (struct lsquic_stream *stream) 2883{ 2884 struct stream_hq_frame *shf; 2885 uint64_t size; 2886 unsigned bits; 2887 2888 shf = find_cur_hq_frame(stream); 2889 if (!shf) 2890 return; 2891 2892 if (shf->shf_flags & SHF_FIXED_SIZE) 2893 { 2894 if (shf->shf_off + shf->shf_frame_size <= stream->sm_payload) 2895 stream_hq_frame_put(stream, shf); 2896 return; 2897 } 2898 2899 bits = (shf->shf_flags & SHF_TWO_BYTES) > 0; 2900 size = stream->sm_payload + stream->sm_n_buffered - shf->shf_off; 2901 if (size && size <= VINT_MAX_B(bits) && shf->shf_frame_ptr) 2902 { 2903 if (0 == stream->sm_n_buffered) 2904 LSQ_DEBUG("close HQ frame type 0x%X of size %"PRIu64, 2905 shf->shf_frame_type, size); 2906 else 2907 LSQ_DEBUG("convert HQ frame type 0x%X of to fixed %"PRIu64, 2908 shf->shf_frame_type, size); 2909 shf->shf_frame_ptr[0] = shf->shf_frame_type; 2910 vint_write(shf->shf_frame_ptr + 1, size, bits, 1 << bits); 2911 if (0 == stream->sm_n_buffered) 2912 stream_hq_frame_put(stream, shf); 2913 else 2914 { 2915 shf->shf_frame_size = size; 2916 shf->shf_flags |= SHF_FIXED_SIZE; 2917 } 2918 } 2919 else if (!shf->shf_frame_ptr) 2920 { 2921 LSQ_WARN("dangling HTTP/3 frame"); 2922 stream->conn_pub->lconn->cn_if->ci_internal_error( 2923 stream->conn_pub->lconn, "dangling HTTP/3 frame on stream %"PRIu64, 2924 stream->id); 2925 stream_hq_frame_put(stream, shf); 2926 } 2927 else if (!size) 2928 { 2929 assert(!shf->shf_frame_ptr); 2930 LSQ_WARN("discard zero-sized HQ frame type 0x%X (off: %"PRIu64")", 2931 shf->shf_frame_type, shf->shf_off); 2932 stream_hq_frame_put(stream, shf); 2933 } 2934 else 2935 { 2936 assert(stream->sm_n_buffered); 2937 LSQ_ERROR("cannot close frame of size %"PRIu64" -- too large", size); 2938 /* TODO: abort connection */ 2939 stream_hq_frame_put(stream, shf); 2940 } 2941} 2942 2943 2944static ssize_t 2945stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, 2946 size_t thresh) 2947{ 2948 size_t size; 2949 ssize_t nw; 2950 unsigned seen_ok; 2951 int use_framing; 2952 struct frame_gen_ctx fg_ctx = { 2953 .fgc_stream = stream, 2954 .fgc_reader = reader, 2955 .fgc_nread_from_reader = 0, 2956 }; 2957 2958 use_framing = (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 2959 == (SMBF_IETF|SMBF_USE_HEADERS); 2960 if (use_framing) 2961 { 2962 fg_ctx.fgc_size = frame_hq_gen_size; 2963 fg_ctx.fgc_read = frame_hq_gen_read; 2964 fg_ctx.fgc_fin = frame_std_gen_fin; /* This seems to work for either? XXX */ 2965 } 2966 else 2967 { 2968 fg_ctx.fgc_size = frame_std_gen_size; 2969 fg_ctx.fgc_read = frame_std_gen_read; 2970 fg_ctx.fgc_fin = frame_std_gen_fin; 2971 } 2972 2973 seen_ok = 0; 2974 while ((size = fg_ctx.fgc_size(&fg_ctx), thresh ? size >= thresh : size > 0) 2975 || fg_ctx.fgc_fin(&fg_ctx)) 2976 { 2977 switch (stream->sm_write_to_packet(&fg_ctx, size)) 2978 { 2979 case SWTP_OK: 2980 if (!seen_ok++) 2981 maybe_conn_to_tickable_if_writeable(stream, 0); 2982 if (fg_ctx.fgc_fin(&fg_ctx)) 2983 { 2984 if (use_framing && seen_ok) 2985 maybe_close_varsize_hq_frame(stream); 2986 stream->stream_flags |= STREAM_FIN_SENT; 2987 goto end; 2988 } 2989 else 2990 break; 2991 case SWTP_STOP: 2992 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 2993 if (use_framing && seen_ok) 2994 maybe_close_varsize_hq_frame(stream); 2995 goto end; 2996 default: 2997 abort_connection(stream); 2998 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 2999 return -1; 3000 } 3001 } 3002 3003 if (use_framing && seen_ok) 3004 maybe_close_varsize_hq_frame(stream); 3005 3006 if (thresh) 3007 { 3008 assert(size < thresh); 3009 assert(size >= stream->sm_n_buffered); 3010 size -= stream->sm_n_buffered; 3011 if (size > 0) 3012 { 3013 nw = save_to_buffer(stream, reader, size); 3014 if (nw < 0) 3015 return -1; 3016 fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */ 3017 } 3018 } 3019 else 3020 { 3021 /* We count flushed data towards both stream and connection limits, 3022 * so we should have been able to packetize all of it: 3023 */ 3024 assert(0 == stream->sm_n_buffered); 3025 assert(size == 0); 3026 } 3027 3028 maybe_mark_as_blocked(stream); 3029 3030 end: 3031 return fg_ctx.fgc_nread_from_reader; 3032} 3033 3034 3035/* Perform an implicit flush when we hit connection limit while buffering 3036 * data. This is to prevent a (theoretical) stall: 3037 * 3038 * Imagine a number of streams, all of which buffered some data. The buffered 3039 * data is up to connection cap, which means no further writes are possible. 3040 * None of them flushes, which means that data is not sent and connection 3041 * WINDOW_UPDATE frame never arrives from peer. Stall. 3042 */ 3043static int 3044maybe_flush_stream (struct lsquic_stream *stream) 3045{ 3046 if (stream->sm_n_buffered > 0 3047 && (stream->sm_bflags & SMBF_CONN_LIMITED) 3048 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 3049 return stream_flush_nocheck(stream); 3050 else 3051 return 0; 3052} 3053 3054 3055static int 3056stream_hq_frame_extendable (const struct stream_hq_frame *shf, uint64_t cur_off, 3057 unsigned len) 3058{ 3059 return (shf->shf_flags & (SHF_TWO_BYTES|SHF_FIXED_SIZE)) == 0 3060 && cur_off - shf->shf_off < (1 << 6) 3061 && cur_off - shf->shf_off + len >= (1 << 6) 3062 ; 3063} 3064 3065 3066/* Update currently buffered HQ frame or create a new one, if possible. 3067 * Return update length to be buffered. If a HQ frame cannot be 3068 * buffered due to size, 0 is returned, thereby preventing both HQ frame 3069 * creation and buffering. 3070 */ 3071static size_t 3072update_buffered_hq_frames (struct lsquic_stream *stream, size_t len, 3073 size_t avail) 3074{ 3075 struct stream_hq_frame *shf; 3076 uint64_t cur_off, end; 3077 size_t frame_sz; 3078 unsigned extendable; 3079 3080 cur_off = stream->sm_payload + stream->sm_n_buffered; 3081 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 3082 if (shf->shf_off <= cur_off) 3083 { 3084 end = stream_hq_frame_end(shf); 3085 extendable = stream_hq_frame_extendable(shf, cur_off, len); 3086 if (cur_off < end + extendable) 3087 break; 3088 } 3089 3090 if (shf) 3091 { 3092 if (len > end + extendable - cur_off) 3093 len = end + extendable - cur_off; 3094 frame_sz = stream_hq_frame_size(shf); 3095 } 3096 else 3097 { 3098 assert(avail >= 3); 3099 shf = stream_activate_hq_frame(stream, cur_off, HQFT_DATA, 0, len); 3100 /* XXX malloc can fail */ 3101 if (len > stream_hq_frame_end(shf) - cur_off) 3102 len = stream_hq_frame_end(shf) - cur_off; 3103 extendable = 0; 3104 frame_sz = stream_hq_frame_size(shf); 3105 if (avail < frame_sz) 3106 return 0; 3107 avail -= frame_sz; 3108 } 3109 3110 if (!(shf->shf_flags & SHF_CC_PAID)) 3111 { 3112 incr_conn_cap(stream, frame_sz); 3113 shf->shf_flags |= SHF_CC_PAID; 3114 } 3115 if (extendable) 3116 { 3117 shf->shf_flags |= SHF_TWO_BYTES; 3118 incr_conn_cap(stream, 1); 3119 avail -= 1; 3120 if ((stream->sm_qflags & SMQF_WANT_FLUSH) 3121 && shf->shf_off <= stream->sm_payload 3122 && stream_hq_frame_end(shf) >= stream->sm_flush_to_payload) 3123 stream->sm_flush_to += 1; 3124 } 3125 3126 if (len <= avail) 3127 return len; 3128 else 3129 return avail; 3130} 3131 3132 3133static ssize_t 3134save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 3135 size_t len) 3136{ 3137 size_t avail, n_written, n_allowed; 3138 3139 avail = lsquic_stream_write_avail(stream); 3140 if (avail < len) 3141 len = avail; 3142 if (len == 0) 3143 { 3144 LSQ_DEBUG("zero-byte write (avail: %zu)", avail); 3145 return 0; 3146 } 3147 3148 n_allowed = stream_get_n_allowed(stream); 3149 assert(stream->sm_n_buffered + len <= n_allowed); 3150 3151 if (!stream->sm_buf) 3152 { 3153 stream->sm_buf = malloc(n_allowed); 3154 if (!stream->sm_buf) 3155 return -1; 3156 stream->sm_n_allocated = n_allowed; 3157 } 3158 3159 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 3160 == (SMBF_IETF|SMBF_USE_HEADERS)) 3161 len = update_buffered_hq_frames(stream, len, avail); 3162 3163 n_written = reader->lsqr_read(reader->lsqr_ctx, 3164 stream->sm_buf + stream->sm_n_buffered, len); 3165 stream->sm_n_buffered += n_written; 3166 assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered); 3167 incr_conn_cap(stream, n_written); 3168 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 3169 n_written, stream->sm_n_buffered); 3170 if (0 != maybe_flush_stream(stream)) 3171 return -1; 3172 return n_written; 3173} 3174 3175 3176static ssize_t 3177stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 3178{ 3179 const struct stream_hq_frame *shf; 3180 size_t thresh, len, frames, total_len, n_allowed, nwritten; 3181 ssize_t nw; 3182 3183 len = reader->lsqr_size(reader->lsqr_ctx); 3184 if (len == 0) 3185 return 0; 3186 3187 frames = 0; 3188 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 3189 == (SMBF_IETF|SMBF_USE_HEADERS)) 3190 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 3191 if (shf->shf_off >= stream->sm_payload) 3192 frames += stream_hq_frame_size(shf); 3193 total_len = len + frames + stream->sm_n_buffered; 3194 thresh = lsquic_stream_flush_threshold(stream, total_len); 3195 n_allowed = stream_get_n_allowed(stream); 3196 if (total_len <= n_allowed && total_len < thresh) 3197 { 3198 nwritten = 0; 3199 do 3200 { 3201 nw = save_to_buffer(stream, reader, len - nwritten); 3202 if (nw > 0) 3203 nwritten += (size_t) nw; 3204 else if (nw == 0) 3205 break; 3206 else 3207 return nw; 3208 } 3209 while (nwritten < len 3210 && stream->sm_n_buffered < stream->sm_n_allocated); 3211 return nwritten; 3212 } 3213 else 3214 return stream_write_to_packets(stream, reader, thresh); 3215} 3216 3217 3218ssize_t 3219lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 3220{ 3221 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 3222 return lsquic_stream_writev(stream, &iov, 1); 3223} 3224 3225 3226struct inner_reader_iovec { 3227 const struct iovec *iov; 3228 const struct iovec *end; 3229 unsigned cur_iovec_off; 3230}; 3231 3232 3233static size_t 3234inner_reader_iovec_read (void *ctx, void *buf, size_t count) 3235{ 3236 struct inner_reader_iovec *const iro = ctx; 3237 unsigned char *p = buf; 3238 unsigned char *const end = p + count; 3239 unsigned n_tocopy; 3240 3241 while (iro->iov < iro->end && p < end) 3242 { 3243 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 3244 if (n_tocopy > (unsigned) (end - p)) 3245 n_tocopy = end - p; 3246 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 3247 n_tocopy); 3248 p += n_tocopy; 3249 iro->cur_iovec_off += n_tocopy; 3250 if (iro->iov->iov_len == iro->cur_iovec_off) 3251 { 3252 ++iro->iov; 3253 iro->cur_iovec_off = 0; 3254 } 3255 } 3256 3257 return p + count - end; 3258} 3259 3260 3261static size_t 3262inner_reader_iovec_size (void *ctx) 3263{ 3264 struct inner_reader_iovec *const iro = ctx; 3265 const struct iovec *iov; 3266 size_t size; 3267 3268 size = 0; 3269 for (iov = iro->iov; iov < iro->end; ++iov) 3270 size += iov->iov_len; 3271 3272 return size - iro->cur_iovec_off; 3273} 3274 3275 3276ssize_t 3277lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 3278 int iovcnt) 3279{ 3280 COMMON_WRITE_CHECKS(); 3281 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 3282 3283 struct inner_reader_iovec iro = { 3284 .iov = iov, 3285 .end = iov + iovcnt, 3286 .cur_iovec_off = 0, 3287 }; 3288 struct lsquic_reader reader = { 3289 .lsqr_read = inner_reader_iovec_read, 3290 .lsqr_size = inner_reader_iovec_size, 3291 .lsqr_ctx = &iro, 3292 }; 3293 3294 return stream_write(stream, &reader); 3295} 3296 3297 3298ssize_t 3299lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) 3300{ 3301 COMMON_WRITE_CHECKS(); 3302 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 3303 return stream_write(stream, reader); 3304} 3305 3306 3307/* This bypasses COMMON_WRITE_CHECKS */ 3308static ssize_t 3309stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz) 3310{ 3311 struct iovec iov = { (void *) buf, sz, }; 3312 struct inner_reader_iovec iro = { 3313 .iov = &iov, 3314 .end = &iov + 1, 3315 .cur_iovec_off = 0, 3316 }; 3317 struct lsquic_reader reader = { 3318 .lsqr_read = inner_reader_iovec_read, 3319 .lsqr_size = inner_reader_iovec_size, 3320 .lsqr_ctx = &iro, 3321 }; 3322 return stream_write(stream, &reader); 3323} 3324 3325 3326/* XXX Move this define elsewhere? */ 3327#define MAX_HEADERS_SIZE (64 * 1024) 3328 3329static int 3330send_headers_ietf (struct lsquic_stream *stream, 3331 const struct lsquic_http_headers *headers, int eos) 3332{ 3333 enum qwh_status qwh; 3334 const size_t max_prefix_size = 3335 lsquic_qeh_max_prefix_size(stream->conn_pub->u.ietf.qeh); 3336 const size_t max_push_size = 1 /* Stream type */ + 8 /* Push ID */; 3337 size_t prefix_sz, headers_sz, hblock_sz, push_sz; 3338 unsigned bits; 3339 ssize_t nw; 3340 unsigned char *header_block; 3341 enum lsqpack_enc_header_flags hflags; 3342 unsigned char buf[max_push_size + max_prefix_size + MAX_HEADERS_SIZE]; 3343 3344 stream->stream_flags &= ~STREAM_PUSHING; 3345 stream->stream_flags |= STREAM_NOPUSH; 3346 3347 /* TODO: Optimize for the common case: write directly to sm_buf and fall 3348 * back to a larger buffer if that fails. 3349 */ 3350 prefix_sz = max_prefix_size; 3351 headers_sz = sizeof(buf) - max_prefix_size - max_push_size; 3352 qwh = lsquic_qeh_write_headers(stream->conn_pub->u.ietf.qeh, stream->id, 0, 3353 headers, buf + max_push_size + max_prefix_size, &prefix_sz, 3354 &headers_sz, &stream->sm_hb_compl, &hflags); 3355 3356 if (!(qwh == QWH_FULL || qwh == QWH_PARTIAL)) 3357 { 3358 if (qwh == QWH_ENOBUF) 3359 LSQ_INFO("not enough room for header block"); 3360 else 3361 LSQ_WARN("internal error encoding and sending HTTP headers"); 3362 return -1; 3363 } 3364 3365 if (hflags & LSQECH_REF_NEW_ENTRIES) 3366 stream->stream_flags |= STREAM_ENCODER_DEP; 3367 3368 if (stream->sm_promise) 3369 { 3370 assert(lsquic_stream_is_pushed(stream)); 3371 bits = vint_val2bits(stream->sm_promise->pp_id); 3372 push_sz = 1 + (1 << bits); 3373 if (!stream_activate_hq_frame(stream, 3374 stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PREAMBLE, 3375 SHF_FIXED_SIZE|SHF_PHANTOM, push_sz)) 3376 return -1; 3377 buf[max_push_size + max_prefix_size - prefix_sz - push_sz] = HQUST_PUSH; 3378 vint_write(buf + max_push_size + max_prefix_size - prefix_sz 3379 - push_sz + 1,stream->sm_promise->pp_id, bits, 1 << bits); 3380 } 3381 else 3382 push_sz = 0; 3383 3384 /* Construct contiguous header block buffer including HQ framing */ 3385 header_block = buf + max_push_size + max_prefix_size - prefix_sz - push_sz; 3386 hblock_sz = push_sz + prefix_sz + headers_sz; 3387 if (!stream_activate_hq_frame(stream, 3388 stream->sm_payload + stream->sm_n_buffered + push_sz, 3389 HQFT_HEADERS, SHF_FIXED_SIZE, hblock_sz - push_sz)) 3390 return -1; 3391 3392 if (qwh == QWH_FULL) 3393 { 3394 stream->sm_send_headers_state = SSHS_HBLOCK_SENDING; 3395 if (lsquic_stream_write_avail(stream)) 3396 { 3397 nw = stream_write_buf(stream, header_block, hblock_sz); 3398 if (nw < 0) 3399 { 3400 LSQ_WARN("cannot write to stream: %s", strerror(errno)); 3401 return -1; 3402 } 3403 if ((size_t) nw == hblock_sz) 3404 { 3405 stream->stream_flags |= STREAM_HEADERS_SENT; 3406 stream_hblock_sent(stream); 3407 LSQ_DEBUG("wrote all %zu bytes of header block", hblock_sz); 3408 return 0; 3409 } 3410 LSQ_DEBUG("wrote only %zd bytes of header block, stash", nw); 3411 } 3412 else 3413 { 3414 LSQ_DEBUG("cannot write to stream, stash all %zu bytes of " 3415 "header block", hblock_sz); 3416 nw = 0; 3417 } 3418 } 3419 else 3420 { 3421 stream->sm_send_headers_state = SSHS_ENC_SENDING; 3422 nw = 0; 3423 } 3424 3425 stream->sm_saved_want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 3426 stream_wantwrite(stream, 1); 3427 3428 stream->sm_header_block = malloc(hblock_sz - (size_t) nw); 3429 if (!stream->sm_header_block) 3430 { 3431 LSQ_WARN("cannot allocate %zd bytes to stash %s header block", 3432 hblock_sz - (size_t) nw, qwh == QWH_FULL ? "full" : "partial"); 3433 return -1; 3434 } 3435 memcpy(stream->sm_header_block, header_block + (size_t) nw, 3436 hblock_sz - (size_t) nw); 3437 stream->sm_hblock_sz = hblock_sz - (size_t) nw; 3438 stream->sm_hblock_off = 0; 3439 LSQ_DEBUG("stashed %u bytes of header block", stream->sm_hblock_sz); 3440 return 0; 3441} 3442 3443 3444static int 3445send_headers_gquic (struct lsquic_stream *stream, 3446 const struct lsquic_http_headers *headers, int eos) 3447{ 3448 int s = lsquic_headers_stream_send_headers(stream->conn_pub->u.gquic.hs, 3449 stream->id, headers, eos, lsquic_stream_priority(stream)); 3450 if (0 == s) 3451 { 3452 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 3453 stream->stream_flags |= STREAM_HEADERS_SENT; 3454 if (eos) 3455 stream->stream_flags |= STREAM_FIN_SENT; 3456 LSQ_INFO("sent headers"); 3457 } 3458 else 3459 LSQ_WARN("could not send headers: %s", strerror(errno)); 3460 return s; 3461} 3462 3463 3464int 3465lsquic_stream_send_headers (lsquic_stream_t *stream, 3466 const lsquic_http_headers_t *headers, int eos) 3467{ 3468 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3469 && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_U_WRITE_DONE))) 3470 { 3471 if (stream->sm_bflags & SMBF_IETF) 3472 return send_headers_ietf(stream, headers, eos); 3473 else 3474 return send_headers_gquic(stream, headers, eos); 3475 } 3476 else 3477 { 3478 LSQ_INFO("cannot send headers in this state"); 3479 errno = EBADMSG; 3480 return -1; 3481 } 3482} 3483 3484 3485void 3486lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 3487{ 3488 if (offset > stream->max_send_off) 3489 { 3490 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 3491 LSQ_DEBUG("update max send offset from 0x%"PRIX64" to " 3492 "0x%"PRIX64, stream->max_send_off, offset); 3493 stream->max_send_off = offset; 3494 } 3495 else 3496 LSQ_DEBUG("new offset 0x%"PRIX64" is not larger than old " 3497 "max send offset 0x%"PRIX64", ignoring", offset, 3498 stream->max_send_off); 3499} 3500 3501 3502/* This function is used to update offsets after handshake completes and we 3503 * learn of peer's limits from the handshake values. 3504 */ 3505int 3506lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset) 3507{ 3508 LSQ_DEBUG("setting max_send_off to %"PRIu64, offset); 3509 if (offset > stream->max_send_off) 3510 { 3511 lsquic_stream_window_update(stream, offset); 3512 return 0; 3513 } 3514 else if (offset < stream->tosend_off) 3515 { 3516 LSQ_INFO("new offset (%"PRIu64" bytes) is smaller than the amount of " 3517 "data already sent on this stream (%"PRIu64" bytes)", offset, 3518 stream->tosend_off); 3519 return -1; 3520 } 3521 else 3522 { 3523 stream->max_send_off = offset; 3524 return 0; 3525 } 3526} 3527 3528 3529void 3530lsquic_stream_reset (lsquic_stream_t *stream, uint64_t error_code) 3531{ 3532 lsquic_stream_reset_ext(stream, error_code, 1); 3533} 3534 3535 3536void 3537lsquic_stream_reset_ext (lsquic_stream_t *stream, uint64_t error_code, 3538 int do_close) 3539{ 3540 if ((stream->stream_flags & STREAM_RST_SENT) 3541 || (stream->sm_qflags & SMQF_SEND_RST)) 3542 { 3543 LSQ_INFO("reset already sent"); 3544 return; 3545 } 3546 3547 SM_HISTORY_APPEND(stream, SHE_RESET); 3548 3549 LSQ_INFO("reset, error code %"PRIu64, error_code); 3550 stream->error_code = error_code; 3551 3552 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 3553 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 3554 next_send_stream); 3555 stream->sm_qflags &= ~SMQF_SENDING_FLAGS; 3556 stream->sm_qflags |= SMQF_SEND_RST; 3557 3558 if (stream->sm_qflags & SMQF_QPACK_DEC) 3559 { 3560 lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream); 3561 stream->sm_qflags |= ~SMQF_QPACK_DEC; 3562 } 3563 3564 drop_buffered_data(stream); 3565 maybe_elide_stream_frames(stream); 3566 maybe_schedule_call_on_close(stream); 3567 3568 if (do_close) 3569 lsquic_stream_close(stream); 3570 else 3571 maybe_conn_to_tickable_if_writeable(stream, 1); 3572} 3573 3574 3575lsquic_stream_id_t 3576lsquic_stream_id (const lsquic_stream_t *stream) 3577{ 3578 return stream->id; 3579} 3580 3581 3582#if !defined(NDEBUG) && __GNUC__ 3583__attribute__((weak)) 3584#endif 3585struct lsquic_conn * 3586lsquic_stream_conn (const lsquic_stream_t *stream) 3587{ 3588 return stream->conn_pub->lconn; 3589} 3590 3591 3592int 3593lsquic_stream_close (lsquic_stream_t *stream) 3594{ 3595 LSQ_DEBUG("lsquic_stream_close() called"); 3596 SM_HISTORY_APPEND(stream, SHE_CLOSE); 3597 if (lsquic_stream_is_closed(stream)) 3598 { 3599 LSQ_INFO("Attempt to close an already-closed stream"); 3600 errno = EBADF; 3601 return -1; 3602 } 3603 maybe_stream_shutdown_write(stream); 3604 stream_shutdown_read(stream); 3605 maybe_schedule_call_on_close(stream); 3606 maybe_finish_stream(stream); 3607 if (!(stream->stream_flags & STREAM_DELAYED_SW)) 3608 maybe_conn_to_tickable_if_writeable(stream, 1); 3609 return 0; 3610} 3611 3612 3613#ifndef NDEBUG 3614#if __GNUC__ 3615__attribute__((weak)) 3616#endif 3617#endif 3618void 3619lsquic_stream_acked (struct lsquic_stream *stream, 3620 enum quic_frame_type frame_type) 3621{ 3622 assert(stream->n_unacked); 3623 --stream->n_unacked; 3624 LSQ_DEBUG("ACKed; n_unacked: %u", stream->n_unacked); 3625 if (frame_type == QUIC_FRAME_RST_STREAM) 3626 { 3627 SM_HISTORY_APPEND(stream, SHE_RST_ACKED); 3628 LSQ_DEBUG("RESET that we sent has been acked by peer"); 3629 stream->stream_flags |= STREAM_RST_ACKED; 3630 } 3631 if (0 == stream->n_unacked) 3632 maybe_finish_stream(stream); 3633} 3634 3635 3636void 3637lsquic_stream_push_req (lsquic_stream_t *stream, 3638 struct uncompressed_headers *push_req) 3639{ 3640 assert(!stream->push_req); 3641 stream->push_req = push_req; 3642 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 3643} 3644 3645 3646int 3647lsquic_stream_is_pushed (const lsquic_stream_t *stream) 3648{ 3649 enum stream_id_type sit; 3650 3651 if (stream->sm_bflags & SMBF_IETF) 3652 { 3653 sit = stream->id & SIT_MASK; 3654 return sit == SIT_UNI_SERVER; 3655 } 3656 else 3657 return 1 & ~stream->id; 3658} 3659 3660 3661int 3662lsquic_stream_push_info (const lsquic_stream_t *stream, 3663 lsquic_stream_id_t *ref_stream_id, void **hset) 3664{ 3665 if (lsquic_stream_is_pushed(stream)) 3666 { 3667 assert(stream->push_req); 3668 *ref_stream_id = stream->push_req->uh_stream_id; 3669 *hset = stream->push_req->uh_hset; 3670 return 0; 3671 } 3672 else 3673 return -1; 3674} 3675 3676 3677int 3678lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 3679{ 3680 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3681 && !(stream->stream_flags & STREAM_HAVE_UH)) 3682 { 3683 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 3684 LSQ_DEBUG("received uncompressed headers"); 3685 stream->stream_flags |= STREAM_HAVE_UH; 3686 if (uh->uh_flags & UH_FIN) 3687 { 3688 /* IETF QUIC only sets UH_FIN for a pushed stream on the server to 3689 * mark request as done: 3690 */ 3691 if (stream->sm_bflags & SMBF_IETF) 3692 assert((stream->sm_bflags & SMBF_SERVER) 3693 && lsquic_stream_is_pushed(stream)); 3694 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 3695 } 3696 stream->uh = uh; 3697 if (uh->uh_oth_stream_id == 0) 3698 { 3699 if (uh->uh_weight) 3700 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 3701 } 3702 else 3703 LSQ_NOTICE("don't know how to depend on stream %"PRIu64, 3704 uh->uh_oth_stream_id); 3705 return 0; 3706 } 3707 else 3708 { 3709 LSQ_ERROR("received unexpected uncompressed headers"); 3710 return -1; 3711 } 3712} 3713 3714 3715unsigned 3716lsquic_stream_priority (const lsquic_stream_t *stream) 3717{ 3718 return 256 - stream->sm_priority; 3719} 3720 3721 3722int 3723lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 3724{ 3725 /* The user should never get a reference to the special streams, 3726 * but let's check just in case: 3727 */ 3728 if (lsquic_stream_is_critical(stream)) 3729 return -1; 3730 if (priority < 1 || priority > 256) 3731 return -1; 3732 stream->sm_priority = 256 - priority; 3733 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 3734 LSQ_DEBUG("set priority to %u", priority); 3735 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 3736 return 0; 3737} 3738 3739 3740static int 3741maybe_send_priority_gquic (struct lsquic_stream *stream, unsigned priority) 3742{ 3743 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3744 && (stream->stream_flags & STREAM_HEADERS_SENT)) 3745 { 3746 /* We need to send headers only if we are a) using HEADERS stream 3747 * and b) we already sent initial headers. If initial headers 3748 * have not been sent yet, stream priority will be sent in the 3749 * HEADERS frame. 3750 */ 3751 return lsquic_headers_stream_send_priority(stream->conn_pub->u.gquic.hs, 3752 stream->id, 0, 0, priority); 3753 } 3754 else 3755 return 0; 3756} 3757 3758 3759static int 3760send_priority_ietf (struct lsquic_stream *stream, unsigned priority) 3761{ 3762 LSQ_WARN("%s: TODO", __func__); /* TODO */ 3763 return -1; 3764} 3765 3766 3767int 3768lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 3769{ 3770 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 3771 { 3772 if (stream->sm_bflags & SMBF_IETF) 3773 return send_priority_ietf(stream, priority); 3774 else 3775 return maybe_send_priority_gquic(stream, priority); 3776 } 3777 else 3778 return -1; 3779} 3780 3781 3782lsquic_stream_ctx_t * 3783lsquic_stream_get_ctx (const lsquic_stream_t *stream) 3784{ 3785 return stream->st_ctx; 3786} 3787 3788 3789int 3790lsquic_stream_refuse_push (lsquic_stream_t *stream) 3791{ 3792 if (lsquic_stream_is_pushed(stream) 3793 && !(stream->sm_qflags & SMQF_SEND_RST) 3794 && !(stream->stream_flags & STREAM_RST_SENT)) 3795 { 3796 LSQ_DEBUG("refusing pushed stream: send reset"); 3797 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 3798 return 0; 3799 } 3800 else 3801 return -1; 3802} 3803 3804 3805size_t 3806lsquic_stream_mem_used (const struct lsquic_stream *stream) 3807{ 3808 size_t size; 3809 3810 size = sizeof(stream); 3811 if (stream->sm_buf) 3812 size += stream->sm_n_allocated; 3813 if (stream->data_in) 3814 size += stream->data_in->di_if->di_mem_used(stream->data_in); 3815 3816 return size; 3817} 3818 3819 3820const lsquic_cid_t * 3821lsquic_stream_cid (const struct lsquic_stream *stream) 3822{ 3823 return LSQUIC_LOG_CONN_ID; 3824} 3825 3826 3827void 3828lsquic_stream_dump_state (const struct lsquic_stream *stream) 3829{ 3830 LSQ_DEBUG("flags: %X; read off: %"PRIu64, stream->stream_flags, 3831 stream->read_offset); 3832 stream->data_in->di_if->di_dump_state(stream->data_in); 3833} 3834 3835 3836void * 3837lsquic_stream_get_hset (struct lsquic_stream *stream) 3838{ 3839 void *hset; 3840 3841 if (lsquic_stream_is_reset(stream)) 3842 { 3843 LSQ_INFO("%s: stream is reset, no headers returned", __func__); 3844 errno = ECONNRESET; 3845 return NULL; 3846 } 3847 3848 if (!((stream->sm_bflags & SMBF_USE_HEADERS) 3849 && (stream->stream_flags & STREAM_HAVE_UH))) 3850 { 3851 LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__, 3852 stream->stream_flags); 3853 return NULL; 3854 } 3855 3856 if (!stream->uh) 3857 { 3858 LSQ_INFO("%s: headers unavailable (already fetched?)", __func__); 3859 return NULL; 3860 } 3861 3862 if (stream->uh->uh_flags & UH_H1H) 3863 { 3864 LSQ_INFO("%s: uncompressed headers have internal format", __func__); 3865 return NULL; 3866 } 3867 3868 hset = stream->uh->uh_hset; 3869 stream->uh->uh_hset = NULL; 3870 destroy_uh(stream); 3871 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 3872 { 3873 stream->stream_flags |= STREAM_FIN_REACHED; 3874 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 3875 } 3876 LSQ_DEBUG("return header set"); 3877 return hset; 3878} 3879 3880 3881/* GQUIC-only function */ 3882int 3883lsquic_stream_id_is_critical (int use_http, lsquic_stream_id_t stream_id) 3884{ 3885 return stream_id == LSQUIC_GQUIC_STREAM_HANDSHAKE 3886 || (stream_id == LSQUIC_GQUIC_STREAM_HEADERS && use_http); 3887} 3888 3889 3890int 3891lsquic_stream_is_critical (const struct lsquic_stream *stream) 3892{ 3893 return stream->sm_bflags & SMBF_CRITICAL; 3894} 3895 3896 3897void 3898lsquic_stream_set_stream_if (struct lsquic_stream *stream, 3899 const struct lsquic_stream_if *stream_if, void *stream_if_ctx) 3900{ 3901 SM_HISTORY_APPEND(stream, SHE_IF_SWITCH); 3902 stream->stream_if = stream_if; 3903 stream->sm_onnew_arg = stream_if_ctx; 3904 LSQ_DEBUG("switched interface"); 3905 assert(stream->stream_flags & STREAM_ONNEW_DONE); 3906 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 3907 stream); 3908} 3909 3910 3911static int 3912update_type_hist_and_check (struct hq_filter *filter) 3913{ 3914 /* 3-bit codes: */ 3915 enum { 3916 CODE_UNSET, 3917 CODE_HEADER, /* H Header */ 3918 CODE_DATA, /* D Data */ 3919 CODE_PLUS, /* + Plus: meaning previous frame repeats */ 3920 }; 3921 static const unsigned valid_seqs[] = { 3922 /* Ordered by expected frequency */ 3923 0123, /* HD+ */ 3924 012, /* HD */ 3925 01, /* H */ 3926 01231, /* HD+H */ 3927 0121, /* HDH */ 3928 }; 3929 unsigned code, i; 3930 3931 switch (filter->hqfi_type) 3932 { 3933 case HQFT_HEADERS: 3934 code = CODE_HEADER; 3935 break; 3936 case HQFT_DATA: 3937 code = CODE_DATA; 3938 break; 3939 default: 3940 /* Ignore unknown frames */ 3941 return 0; 3942 } 3943 3944 if (filter->hqfi_hist_idx >= MAX_HQFI_ENTRIES) 3945 return -1; 3946 3947 if (filter->hqfi_hist_idx && (filter->hqfi_hist_buf & 7) == code) 3948 { 3949 filter->hqfi_hist_buf <<= 3; 3950 filter->hqfi_hist_buf |= CODE_PLUS; 3951 filter->hqfi_hist_idx++; 3952 } 3953 else if (filter->hqfi_hist_idx > 1 3954 && ((filter->hqfi_hist_buf >> 3) & 7) == code 3955 && (filter->hqfi_hist_buf & 7) == CODE_PLUS) 3956 /* Keep it at plus, do nothing */; 3957 else 3958 { 3959 filter->hqfi_hist_buf <<= 3; 3960 filter->hqfi_hist_buf |= code; 3961 filter->hqfi_hist_idx++; 3962 } 3963 3964 for (i = 0; i < sizeof(valid_seqs) / sizeof(valid_seqs[0]); ++i) 3965 if (filter->hqfi_hist_buf == valid_seqs[i]) 3966 return 0; 3967 3968 return -1; 3969} 3970 3971 3972static size_t 3973hq_read (void *ctx, const unsigned char *buf, size_t sz, int fin) 3974{ 3975 struct lsquic_stream *const stream = ctx; 3976 struct hq_filter *const filter = &stream->sm_hq_filter; 3977 const unsigned char *p = buf, *prev; 3978 const unsigned char *const end = buf + sz; 3979 struct lsquic_conn *lconn; 3980 enum lsqpack_read_header_status rhs; 3981 int s; 3982 3983 while (p < end) 3984 { 3985 switch (filter->hqfi_state) 3986 { 3987 case HQFI_STATE_FRAME_HEADER_BEGIN: 3988 filter->hqfi_vint_state.vr2s_state = 0; 3989 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_CONTINUE; 3990 /* fall-through */ 3991 case HQFI_STATE_FRAME_HEADER_CONTINUE: 3992 s = lsquic_varint_read_two(&p, end, &filter->hqfi_vint_state); 3993 if (s < 0) 3994 break; 3995 filter->hqfi_flags |= HQFI_FLAG_BEGIN; 3996 filter->hqfi_state = HQFI_STATE_READING_PAYLOAD; 3997 LSQ_DEBUG("HQ frame type 0x%"PRIX64" at offset %"PRIu64", size %"PRIu64, 3998 filter->hqfi_type, stream->read_offset + (unsigned) (p - buf), 3999 filter->hqfi_left); 4000 if (0 != update_type_hist_and_check(filter)) 4001 { 4002 lconn = stream->conn_pub->lconn; 4003 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4004 LSQ_INFO("unexpected HTTP/3 frame sequence: %o", 4005 filter->hqfi_hist_buf); 4006 lconn->cn_if->ci_abort_error(lconn, 1, HEC_FRAME_UNEXPECTED, 4007 "unexpected HTTP/3 frame sequence on stream %"PRIu64, 4008 stream->id); 4009 goto end; 4010 } 4011 if (filter->hqfi_type == HQFT_HEADERS) 4012 { 4013 if (0 == (filter->hqfi_flags & HQFI_FLAG_GOT_HEADERS)) 4014 filter->hqfi_flags |= HQFI_FLAG_GOT_HEADERS; 4015 else 4016 { 4017 filter->hqfi_type = (1ull << 62) - 1; 4018 LSQ_DEBUG("Ignoring HEADERS frame"); 4019 } 4020 } 4021 if (filter->hqfi_left > 0) 4022 { 4023 if (filter->hqfi_type == HQFT_DATA) 4024 goto end; 4025 else if (filter->hqfi_type == HQFT_PUSH_PROMISE) 4026 { 4027 lconn = stream->conn_pub->lconn; 4028 if (stream->sm_bflags & SMBF_SERVER) 4029 lconn->cn_if->ci_abort_error(lconn, 1, 4030 HEC_FRAME_UNEXPECTED, "Received PUSH_PROMISE frame " 4031 "on stream %"PRIu64" (clients are not supposed to " 4032 "send those)", stream->id); 4033 else 4034 /* Our client implementation does not support server 4035 * push. 4036 */ 4037 lconn->cn_if->ci_abort_error(lconn, 1, 4038 HEC_FRAME_UNEXPECTED, 4039 "Received PUSH_PROMISE frame (not supported)" 4040 "on stream %"PRIu64, stream->id); 4041 goto end; 4042 } 4043 } 4044 else 4045 { 4046 switch (filter->hqfi_type) 4047 { 4048 case HQFT_CANCEL_PUSH: 4049 case HQFT_GOAWAY: 4050 case HQFT_HEADERS: 4051 case HQFT_MAX_PUSH_ID: 4052 case HQFT_PUSH_PROMISE: 4053 case HQFT_SETTINGS: 4054 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4055 LSQ_INFO("HQ frame of type %"PRIu64" cannot be size 0", 4056 filter->hqfi_type); 4057 abort_connection(stream); /* XXX Overkill? */ 4058 goto end; 4059 default: 4060 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 4061 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4062 break; 4063 } 4064 } 4065 break; 4066 case HQFI_STATE_READING_PAYLOAD: 4067 if (filter->hqfi_type == HQFT_DATA) 4068 goto end; 4069 sz = filter->hqfi_left; 4070 if (sz > (uintptr_t) (end - p)) 4071 sz = (uintptr_t) (end - p); 4072 switch (filter->hqfi_type) 4073 { 4074 case HQFT_HEADERS: 4075 prev = p; 4076 if (filter->hqfi_flags & HQFI_FLAG_BEGIN) 4077 { 4078 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 4079 rhs = lsquic_qdh_header_in_begin( 4080 stream->conn_pub->u.ietf.qdh, 4081 stream, filter->hqfi_left, &p, sz); 4082 } 4083 else 4084 rhs = lsquic_qdh_header_in_continue( 4085 stream->conn_pub->u.ietf.qdh, stream, &p, sz); 4086 assert(p > prev || LQRHS_ERROR == rhs); 4087 filter->hqfi_left -= p - prev; 4088 if (filter->hqfi_left == 0) 4089 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4090 switch (rhs) 4091 { 4092 case LQRHS_DONE: 4093 assert(filter->hqfi_left == 0); 4094 stream->sm_qflags &= ~SMQF_QPACK_DEC; 4095 break; 4096 case LQRHS_NEED: 4097 stream->sm_qflags |= SMQF_QPACK_DEC; 4098 break; 4099 case LQRHS_BLOCKED: 4100 stream->sm_qflags |= SMQF_QPACK_DEC; 4101 filter->hqfi_flags |= HQFI_FLAG_BLOCKED; 4102 goto end; 4103 default: 4104 assert(LQRHS_ERROR == rhs); 4105 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4106 LSQ_INFO("error processing header block"); 4107 abort_connection(stream); /* XXX Overkill? */ 4108 goto end; 4109 } 4110 break; 4111 default: 4112 /* Simply skip unknown frame type payload for now */ 4113 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 4114 p += sz; 4115 filter->hqfi_left -= sz; 4116 if (filter->hqfi_left == 0) 4117 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4118 break; 4119 } 4120 break; 4121 default: 4122 assert(0); 4123 goto end; 4124 } 4125 } 4126 4127 end: 4128 if (fin && p == end && filter->hqfi_state != HQFI_STATE_FRAME_HEADER_BEGIN) 4129 { 4130 LSQ_INFO("FIN at unexpected place in filter; state: %u", 4131 filter->hqfi_state); 4132 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4133/* From [draft-ietf-quic-http-16] Section 3.1: 4134 * When a stream terminates cleanly, if the last frame on 4135 * the stream was truncated, this MUST be treated as a connection error 4136 * (see HTTP_MALFORMED_FRAME in Section 8.1). 4137 */ 4138 abort_connection(stream); 4139 } 4140 4141 return p - buf; 4142} 4143 4144 4145static int 4146hq_filter_readable_now (const struct lsquic_stream *stream) 4147{ 4148 const struct hq_filter *const filter = &stream->sm_hq_filter; 4149 4150 return (filter->hqfi_type == HQFT_DATA 4151 && filter->hqfi_state == HQFI_STATE_READING_PAYLOAD) 4152 || (filter->hqfi_flags & HQFI_FLAG_ERROR) 4153 || stream->uh 4154 || (stream->stream_flags & STREAM_FIN_REACHED) 4155 ; 4156} 4157 4158 4159static int 4160hq_filter_readable (struct lsquic_stream *stream) 4161{ 4162 struct hq_filter *const filter = &stream->sm_hq_filter; 4163 struct read_frames_status rfs; 4164 4165 if (filter->hqfi_flags & HQFI_FLAG_BLOCKED) 4166 return 0; 4167 4168 if (!hq_filter_readable_now(stream)) 4169 { 4170 rfs = read_data_frames(stream, 0, hq_read, stream); 4171 if (rfs.total_nread == 0) 4172 { 4173 if (rfs.error) 4174 { 4175 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4176 abort_connection(stream); /* XXX Overkill? */ 4177 return 1; /* Collect error */ 4178 } 4179 return 0; 4180 } 4181 } 4182 4183 return hq_filter_readable_now(stream); 4184} 4185 4186 4187static size_t 4188hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame) 4189{ 4190 struct hq_filter *const filter = &stream->sm_hq_filter; 4191 size_t nr; 4192 4193 if (!(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4194 && filter->hqfi_type == HQFT_DATA)) 4195 { 4196 nr = hq_read(stream, data_frame->df_data + data_frame->df_read_off, 4197 data_frame->df_size - data_frame->df_read_off, 4198 data_frame->df_fin); 4199 if (nr) 4200 { 4201 stream->read_offset += nr; 4202 stream_consumed_bytes(stream); 4203 } 4204 } 4205 else 4206 nr = 0; 4207 4208 if (0 == (filter->hqfi_flags & HQFI_FLAG_ERROR)) 4209 { 4210 data_frame->df_read_off += nr; 4211 if (filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4212 && filter->hqfi_type == HQFT_DATA) 4213 return MIN(filter->hqfi_left, 4214 (unsigned) data_frame->df_size - data_frame->df_read_off); 4215 else 4216 { 4217 assert(data_frame->df_read_off == data_frame->df_size); 4218 return 0; 4219 } 4220 } 4221 else 4222 { 4223 data_frame->df_read_off = data_frame->df_size; 4224 return 0; 4225 } 4226} 4227 4228 4229static void 4230hq_decr_left (struct lsquic_stream *stream, size_t read) 4231{ 4232 struct hq_filter *const filter = &stream->sm_hq_filter; 4233 4234 if (read) 4235 { 4236 assert(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4237 && filter->hqfi_type == HQFT_DATA); 4238 assert(read <= filter->hqfi_left); 4239 } 4240 4241 filter->hqfi_left -= read; 4242 if (0 == filter->hqfi_left) 4243 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4244} 4245 4246 4247struct qpack_dec_hdl * 4248lsquic_stream_get_qdh (const struct lsquic_stream *stream) 4249{ 4250 return stream->conn_pub->u.ietf.qdh; 4251} 4252 4253 4254/* These are IETF QUIC states */ 4255enum stream_state_sending 4256lsquic_stream_sending_state (const struct lsquic_stream *stream) 4257{ 4258 if (0 == (stream->stream_flags & STREAM_RST_SENT)) 4259 { 4260 if (stream->stream_flags & STREAM_FIN_SENT) 4261 { 4262 if (stream->n_unacked) 4263 return SSS_DATA_SENT; 4264 else 4265 return SSS_DATA_RECVD; 4266 } 4267 else 4268 { 4269 if (stream->tosend_off 4270 || (stream->stream_flags & STREAM_BLOCKED_SENT)) 4271 return SSS_SEND; 4272 else 4273 return SSS_READY; 4274 } 4275 } 4276 else if (stream->stream_flags & STREAM_RST_ACKED) 4277 return SSS_RESET_RECVD; 4278 else 4279 return SSS_RESET_SENT; 4280} 4281 4282 4283const char *const lsquic_sss2str[] = 4284{ 4285 [SSS_READY] = "Ready", 4286 [SSS_SEND] = "Send", 4287 [SSS_DATA_SENT] = "Data Sent", 4288 [SSS_RESET_SENT] = "Reset Sent", 4289 [SSS_DATA_RECVD] = "Data Recvd", 4290 [SSS_RESET_RECVD] = "Reset Recvd", 4291}; 4292 4293 4294const char *const lsquic_ssr2str[] = 4295{ 4296 [SSR_RECV] = "Recv", 4297 [SSR_SIZE_KNOWN] = "Size Known", 4298 [SSR_DATA_RECVD] = "Data Recvd", 4299 [SSR_RESET_RECVD] = "Reset Recvd", 4300 [SSR_DATA_READ] = "Data Read", 4301 [SSR_RESET_READ] = "Reset Read", 4302}; 4303 4304 4305/* These are IETF QUIC states */ 4306enum stream_state_receiving 4307lsquic_stream_receiving_state (struct lsquic_stream *stream) 4308{ 4309 uint64_t n_bytes; 4310 4311 if (0 == (stream->stream_flags & STREAM_RST_RECVD)) 4312 { 4313 if (0 == (stream->stream_flags & STREAM_FIN_RECVD)) 4314 return SSR_RECV; 4315 if (stream->stream_flags & STREAM_FIN_REACHED) 4316 return SSR_DATA_READ; 4317 if (0 == (stream->stream_flags & STREAM_DATA_RECVD)) 4318 { 4319 n_bytes = stream->data_in->di_if->di_readable_bytes( 4320 stream->data_in, stream->read_offset); 4321 if (stream->read_offset + n_bytes == stream->sm_fin_off) 4322 { 4323 stream->stream_flags |= STREAM_DATA_RECVD; 4324 return SSR_DATA_RECVD; 4325 } 4326 else 4327 return SSR_SIZE_KNOWN; 4328 } 4329 else 4330 return SSR_DATA_RECVD; 4331 } 4332 else if (stream->stream_flags & STREAM_RST_READ) 4333 return SSR_RESET_READ; 4334 else 4335 return SSR_RESET_RECVD; 4336} 4337 4338 4339void 4340lsquic_stream_qdec_unblocked (struct lsquic_stream *stream) 4341{ 4342 struct hq_filter *const filter = &stream->sm_hq_filter; 4343 4344 assert(stream->sm_qflags & SMQF_QPACK_DEC); 4345 assert(filter->hqfi_flags & HQFI_FLAG_BLOCKED); 4346 4347 filter->hqfi_flags &= ~HQFI_FLAG_BLOCKED; 4348 stream->conn_pub->cp_flags |= CP_STREAM_UNBLOCKED; 4349 LSQ_DEBUG("QPACK decoder unblocked"); 4350} 4351 4352 4353int 4354lsquic_stream_is_rejected (const struct lsquic_stream *stream) 4355{ 4356 return stream->stream_flags & STREAM_SS_RECVD; 4357} 4358 4359 4360int 4361lsquic_stream_can_push (const struct lsquic_stream *stream) 4362{ 4363 if (lsquic_stream_is_pushed(stream)) 4364 return 0; 4365 else if (stream->sm_bflags & SMBF_IETF) 4366 return (stream->sm_bflags & SMBF_USE_HEADERS) 4367 && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_NOPUSH)) 4368 && stream->sm_send_headers_state == SSHS_BEGIN 4369 ; 4370 else 4371 return 1; 4372} 4373 4374 4375static size_t 4376dp_reader_read (void *lsqr_ctx, void *buf, size_t count) 4377{ 4378 struct lsquic_stream *const stream = lsqr_ctx; 4379 unsigned char *dst = buf; 4380 unsigned char *const end = buf + count; 4381 size_t len; 4382 4383 len = MIN((size_t) (stream->sm_dup_push_len - stream->sm_dup_push_off), 4384 (size_t) (end - dst)); 4385 memcpy(dst, stream->sm_dup_push_buf + stream->sm_dup_push_off, len); 4386 stream->sm_dup_push_off += len; 4387 4388 if (stream->sm_dup_push_len == stream->sm_dup_push_off) 4389 LSQ_DEBUG("finish writing duplicate push"); 4390 4391 return len; 4392} 4393 4394 4395static size_t 4396dp_reader_size (void *lsqr_ctx) 4397{ 4398 struct lsquic_stream *const stream = lsqr_ctx; 4399 4400 return stream->sm_dup_push_len - stream->sm_dup_push_off; 4401} 4402 4403 4404static void 4405init_dp_reader (struct lsquic_stream *stream, struct lsquic_reader *reader) 4406{ 4407 reader->lsqr_read = dp_reader_read; 4408 reader->lsqr_size = dp_reader_size; 4409 reader->lsqr_ctx = stream; 4410} 4411 4412 4413static void 4414on_write_dp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 4415{ 4416 struct lsquic_reader dp_reader; 4417 ssize_t nw; 4418 int want_write; 4419 4420 assert(stream->sm_dup_push_off < stream->sm_dup_push_len); 4421 4422 init_dp_reader(stream, &dp_reader); 4423 nw = stream_write(stream, &dp_reader); 4424 if (nw > 0) 4425 { 4426 LSQ_DEBUG("wrote %zd bytes more of duplicate push (%s)", 4427 nw, stream->sm_dup_push_off == stream->sm_dup_push_len ? 4428 "done" : "not done"); 4429 if (stream->sm_dup_push_off == stream->sm_dup_push_len) 4430 { 4431 /* Restore want_write flag */ 4432 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 4433 if (want_write != stream->sm_saved_want_write) 4434 (void) lsquic_stream_wantwrite(stream, 4435 stream->sm_saved_want_write); 4436 } 4437 } 4438 else if (nw < 0) 4439 { 4440 LSQ_WARN("could not write duplicate push (wrapper)"); 4441 /* XXX What should happen if we hit an error? TODO */ 4442 } 4443} 4444 4445 4446int 4447lsquic_stream_duplicate_push (struct lsquic_stream *stream, uint64_t push_id) 4448{ 4449 struct lsquic_reader dp_reader; 4450 unsigned bits, len; 4451 ssize_t nw; 4452 4453 assert(stream->sm_bflags & SMBF_IETF); 4454 assert(lsquic_stream_can_push(stream)); 4455 4456 bits = vint_val2bits(push_id); 4457 len = 1 << bits; 4458 4459 if (!stream_activate_hq_frame(stream, 4460 stream->sm_payload + stream->sm_n_buffered, HQFT_DUPLICATE_PUSH, 4461 SHF_FIXED_SIZE, len)) 4462 return -1; 4463 4464 stream->stream_flags |= STREAM_PUSHING; 4465 4466 stream->sm_dup_push_len = len; 4467 stream->sm_dup_push_off = 0; 4468 vint_write(stream->sm_dup_push_buf, push_id, bits, 1 << bits); 4469 4470 init_dp_reader(stream, &dp_reader); 4471 nw = stream_write(stream, &dp_reader); 4472 if (nw > 0) 4473 { 4474 if (stream->sm_dup_push_off == stream->sm_dup_push_len) 4475 LSQ_DEBUG("fully wrote DUPLICATE_PUSH %"PRIu64, push_id); 4476 else 4477 { 4478 LSQ_DEBUG("partially wrote DUPLICATE_PUSH %"PRIu64, push_id); 4479 stream->stream_flags |= STREAM_NOPUSH; 4480 stream->sm_saved_want_write = 4481 !!(stream->sm_qflags & SMQF_WANT_WRITE); 4482 stream_wantwrite(stream, 1); 4483 } 4484 return 0; 4485 } 4486 else 4487 { 4488 if (nw < 0) 4489 LSQ_WARN("failure writing DUPLICATE_PUSH"); 4490 stream->stream_flags |= STREAM_NOPUSH; 4491 stream->stream_flags &= ~STREAM_PUSHING; 4492 return -1; 4493 } 4494} 4495 4496 4497static size_t 4498pp_reader_read (void *lsqr_ctx, void *buf, size_t count) 4499{ 4500 struct push_promise *const promise = lsqr_ctx; 4501 unsigned char *dst = buf; 4502 unsigned char *const end = buf + count; 4503 size_t len; 4504 4505 while (dst < end) 4506 { 4507 switch (promise->pp_write_state) 4508 { 4509 case PPWS_ID0: 4510 case PPWS_ID1: 4511 case PPWS_ID2: 4512 case PPWS_ID3: 4513 case PPWS_ID4: 4514 case PPWS_ID5: 4515 case PPWS_ID6: 4516 case PPWS_ID7: 4517 *dst++ = promise->pp_encoded_push_id[promise->pp_write_state]; 4518 ++promise->pp_write_state; 4519 break; 4520 case PPWS_PFX0: 4521 *dst++ = 0; 4522 ++promise->pp_write_state; 4523 break; 4524 case PPWS_PFX1: 4525 *dst++ = 0; 4526 ++promise->pp_write_state; 4527 break; 4528 case PPWS_HBLOCK: 4529 len = MIN(promise->pp_content_len - promise->pp_write_off, 4530 (size_t) (end - dst)); 4531 memcpy(dst, promise->pp_content_buf + promise->pp_write_off, 4532 len); 4533 promise->pp_write_off += len; 4534 dst += len; 4535 if (promise->pp_content_len == promise->pp_write_off) 4536 { 4537 LSQ_LOG1(LSQ_LOG_DEBUG, "finish writing push promise %"PRIu64 4538 ": reset push state", promise->pp_id); 4539 promise->pp_write_state = PPWS_DONE; 4540 } 4541 goto end; 4542 default: 4543 goto end; 4544 } 4545 } 4546 4547 end: 4548 return dst - (unsigned char *) buf; 4549} 4550 4551 4552static size_t 4553pp_reader_size (void *lsqr_ctx) 4554{ 4555 struct push_promise *const promise = lsqr_ctx; 4556 size_t size; 4557 4558 size = 0; 4559 switch (promise->pp_write_state) 4560 { 4561 case PPWS_ID0: 4562 case PPWS_ID1: 4563 case PPWS_ID2: 4564 case PPWS_ID3: 4565 case PPWS_ID4: 4566 case PPWS_ID5: 4567 case PPWS_ID6: 4568 case PPWS_ID7: 4569 size += 8 - promise->pp_write_state; 4570 case PPWS_PFX0: 4571 ++size; 4572 /* fall-through */ 4573 case PPWS_PFX1: 4574 ++size; 4575 /* fall-through */ 4576 case PPWS_HBLOCK: 4577 size += promise->pp_content_len - promise->pp_write_off; 4578 break; 4579 default: 4580 break; 4581 } 4582 4583 return size; 4584} 4585 4586 4587static void 4588init_pp_reader (struct push_promise *promise, struct lsquic_reader *reader) 4589{ 4590 reader->lsqr_read = pp_reader_read; 4591 reader->lsqr_size = pp_reader_size; 4592 reader->lsqr_ctx = promise; 4593} 4594 4595 4596static void 4597on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 4598{ 4599 struct lsquic_reader pp_reader; 4600 struct push_promise *promise; 4601 ssize_t nw; 4602 int want_write; 4603 4604 assert(stream_is_pushing_promise(stream)); 4605 4606 promise = SLIST_FIRST(&stream->sm_promises); 4607 init_pp_reader(promise, &pp_reader); 4608 nw = stream_write(stream, &pp_reader); 4609 if (nw > 0) 4610 { 4611 LSQ_DEBUG("wrote %zd bytes more of push promise (%s)", 4612 nw, promise->pp_write_state == PPWS_DONE ? "done" : "not done"); 4613 if (promise->pp_write_state == PPWS_DONE) 4614 { 4615 /* Restore want_write flag */ 4616 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 4617 if (want_write != stream->sm_saved_want_write) 4618 (void) lsquic_stream_wantwrite(stream, 4619 stream->sm_saved_want_write); 4620 } 4621 } 4622 else if (nw < 0) 4623 { 4624 LSQ_WARN("could not write push promise (wrapper)"); 4625 /* XXX What should happen if we hit an error? TODO */ 4626 } 4627} 4628 4629 4630/* Success means that the push promise has been placed on sm_promises list and 4631 * the stream now owns it. Failure means that the push promise should be 4632 * destroyed by the caller. 4633 * 4634 * A push promise is written immediately. If it cannot be written to packets 4635 * or buffered whole, the stream is marked as unable to push further promises. 4636 */ 4637int 4638lsquic_stream_push_promise (struct lsquic_stream *stream, 4639 struct push_promise *promise) 4640{ 4641 struct lsquic_reader pp_reader; 4642 unsigned bits, len; 4643 ssize_t nw; 4644 4645 assert(stream->sm_bflags & SMBF_IETF); 4646 assert(lsquic_stream_can_push(stream)); 4647 4648 bits = vint_val2bits(promise->pp_id); 4649 len = 1 << bits; 4650 promise->pp_write_state = 8 - len; 4651 vint_write(promise->pp_encoded_push_id + 8 - len, promise->pp_id, 4652 bits, 1 << bits); 4653 4654 if (!stream_activate_hq_frame(stream, 4655 stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PROMISE, 4656 SHF_FIXED_SIZE, pp_reader_size(promise))) 4657 return -1; 4658 4659 stream->stream_flags |= STREAM_PUSHING; 4660 4661 init_pp_reader(promise, &pp_reader); 4662 nw = stream_write(stream, &pp_reader); 4663 if (nw > 0) 4664 { 4665 SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next); 4666 ++promise->pp_refcnt; 4667 if (promise->pp_write_state == PPWS_DONE) 4668 LSQ_DEBUG("fully wrote promise %"PRIu64, promise->pp_id); 4669 else 4670 { 4671 LSQ_DEBUG("partially wrote promise %"PRIu64" (state: %d, off: %u)" 4672 ", disable further pushing", promise->pp_id, 4673 promise->pp_write_state, promise->pp_write_off); 4674 stream->stream_flags |= STREAM_NOPUSH; 4675 stream->sm_saved_want_write = 4676 !!(stream->sm_qflags & SMQF_WANT_WRITE); 4677 stream_wantwrite(stream, 1); 4678 } 4679 return 0; 4680 } 4681 else 4682 { 4683 if (nw < 0) 4684 LSQ_WARN("failure writing push promise"); 4685 stream->stream_flags |= STREAM_NOPUSH; 4686 stream->stream_flags &= ~STREAM_PUSHING; 4687 return -1; 4688 } 4689} 4690