/* lsquic_stream.c revision d539a752 */
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <stddef.h> 28 29#include "lsquic.h" 30 31#include "lsquic_int_types.h" 32#include "lsquic_packet_common.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn_flow.h" 36#include "lsquic_rtt.h" 37#include "lsquic_sfcw.h" 38#include "lsquic_stream.h" 39#include "lsquic_conn_public.h" 40#include "lsquic_util.h" 41#include "lsquic_mm.h" 42#include "lsquic_headers_stream.h" 43#include "lsquic_conn.h" 44#include "lsquic_data_in_if.h" 45#include "lsquic_parse.h" 46#include "lsquic_packet_out.h" 47#include "lsquic_engine_public.h" 48#include "lsquic_senhist.h" 49#include "lsquic_pacer.h" 50#include "lsquic_cubic.h" 51#include "lsquic_send_ctl.h" 52#include "lsquic_headers.h" 53#include "lsquic_ev_log.h" 54 55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid 57#define LSQUIC_LOG_STREAM_ID stream->id 58#include "lsquic_logger.h" 59 60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ 61 62static void 63drop_frames_in 
(lsquic_stream_t *stream); 64 65static void 66maybe_schedule_call_on_close (lsquic_stream_t *stream); 67 68static int 69stream_wantread (lsquic_stream_t *stream, int is_want); 70 71static int 72stream_wantwrite (lsquic_stream_t *stream, int is_want); 73 74static ssize_t 75stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 76 77static ssize_t 78save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 79 80static int 81stream_flush (lsquic_stream_t *stream); 82 83static int 84stream_flush_nocheck (lsquic_stream_t *stream); 85 86static void 87maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag); 88 89 90#if LSQUIC_KEEP_STREAM_HISTORY 91/* These values are printable ASCII characters for ease of printing the 92 * whole history in a single line of a log message. 93 * 94 * The list of events is not exhaustive: only most interesting events 95 * are recorded. 96 */ 97enum stream_history_event 98{ 99 SHE_EMPTY = '\0', /* Special entry. 
No init besides memset required */ 100 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 101 SHE_REACH_FIN = 'a', 102 SHE_BLOCKED_OUT = 'b', 103 SHE_CREATED = 'C', 104 SHE_FRAME_IN = 'd', 105 SHE_FRAME_OUT = 'D', 106 SHE_RESET = 'e', 107 SHE_WINDOW_UPDATE = 'E', 108 SHE_FIN_IN = 'f', 109 SHE_FINISHED = 'F', 110 SHE_GOAWAY_IN = 'g', 111 SHE_USER_WRITE_HEADER = 'h', 112 SHE_HEADERS_IN = 'H', 113 SHE_ONCLOSE_SCHED = 'l', 114 SHE_ONCLOSE_CALL = 'L', 115 SHE_ONNEW = 'N', 116 SHE_SET_PRIO = 'p', 117 SHE_USER_READ = 'r', 118 SHE_SHUTDOWN_READ = 'R', 119 SHE_RST_IN = 's', 120 SHE_RST_OUT = 't', 121 SHE_FLUSH = 'u', 122 SHE_USER_WRITE_DATA = 'w', 123 SHE_SHUTDOWN_WRITE = 'W', 124 SHE_CLOSE = 'X', 125 SHE_FORCE_FINISH = 'Z', 126}; 127 128static void 129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 130{ 131 enum stream_history_event prev_event; 132 sm_hist_idx_t idx; 133 int plus; 134 135 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 136 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 137 idx = (idx - plus) & SM_HIST_IDX_MASK; 138 prev_event = stream->sm_hist_buf[idx]; 139 140 if (prev_event == sh_event && plus) 141 return; 142 143 if (prev_event == sh_event) 144 sh_event = SHE_PLUS; 145 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 146 147 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 148 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 149 stream->sm_hist_buf); 150} 151 152 153# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 154# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 155 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 156 LSQ_DEBUG("history: [%.*s]", \ 157 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 158 (stream)->sm_hist_buf); \ 159 } while (0) 160#else 161# define SM_HISTORY_APPEND(stream, event) 162# define SM_HISTORY_DUMP_REMAINING(stream) 163#endif 164 165 166static int 167stream_inside_callback (const lsquic_stream_t 
*stream) 168{ 169 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 170} 171 172 173static void 174maybe_conn_to_tickable (lsquic_stream_t *stream) 175{ 176 if (!stream_inside_callback(stream)) 177 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 178 stream->conn_pub->lconn); 179} 180 181 182/* Here, "readable" means that the user is able to read from the stream. */ 183static void 184maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 185{ 186 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 187 { 188 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 189 stream->conn_pub->lconn); 190 } 191} 192 193 194/* Here, "writeable" means that data can be put into packets to be 195 * scheduled to be sent out. 196 * 197 * If `check_can_send' is false, it means that we do not need to check 198 * whether packets can be sent. This check was already performed when 199 * we packetized stream data. 200 */ 201static void 202maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream, 203 int check_can_send) 204{ 205 if (!stream_inside_callback(stream) && 206 (!check_can_send 207 || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) && 208 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 209 { 210 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 211 stream->conn_pub->lconn); 212 } 213} 214 215 216static int 217stream_stalled (const lsquic_stream_t *stream) 218{ 219 return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) && 220 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 221 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 222} 223 224 225/* TODO: The logic to figure out whether the stream is connection limited 226 * should be taken out of the constructor. The caller should specify this 227 * via one of enum stream_ctor_flags. 
228 */ 229lsquic_stream_t * 230lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub, 231 const struct lsquic_stream_if *stream_if, 232 void *stream_if_ctx, unsigned initial_window, 233 unsigned initial_send_off, 234 enum stream_ctor_flags ctor_flags) 235{ 236 lsquic_cfcw_t *cfcw; 237 lsquic_stream_t *stream; 238 239 stream = calloc(1, sizeof(*stream)); 240 if (!stream) 241 return NULL; 242 243 stream->stream_if = stream_if; 244 stream->id = id; 245 stream->conn_pub = conn_pub; 246 stream->sm_onnew_arg = stream_if_ctx; 247 if (!initial_window) 248 initial_window = 16 * 1024; 249 if (LSQUIC_STREAM_HANDSHAKE == id || 250 (conn_pub->hs && LSQUIC_STREAM_HEADERS == id)) 251 cfcw = NULL; 252 else 253 { 254 cfcw = &conn_pub->cfcw; 255 stream->stream_flags |= STREAM_CONN_LIMITED; 256 if (conn_pub->hs) 257 stream->stream_flags |= STREAM_USE_HEADERS; 258 lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO); 259 } 260 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 261 if (!initial_send_off) 262 initial_send_off = 16 * 1024; 263 stream->max_send_off = initial_send_off; 264 if (ctor_flags & SCF_USE_DI_HASH) 265 stream->data_in = data_in_hash_new(conn_pub, id, 0); 266 else 267 stream->data_in = data_in_nocopy_new(conn_pub, id); 268 LSQ_DEBUG("created stream %u @%p", id, stream); 269 SM_HISTORY_APPEND(stream, SHE_CREATED); 270 if (ctor_flags & SCF_DI_AUTOSWITCH) 271 stream->stream_flags |= STREAM_AUTOSWITCH; 272 if (ctor_flags & SCF_CALL_ON_NEW) 273 lsquic_stream_call_on_new(stream); 274 if (ctor_flags & SCF_DISP_RW_ONCE) 275 stream->stream_flags |= STREAM_RW_ONCE; 276 if (ctor_flags & SCF_CRITICAL) 277 stream->stream_flags |= STREAM_CRITICAL; 278 return stream; 279} 280 281 282void 283lsquic_stream_call_on_new (lsquic_stream_t *stream) 284{ 285 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 286 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 287 { 288 LSQ_DEBUG("calling on_new_stream"); 289 SM_HISTORY_APPEND(stream, 
SHE_ONNEW); 290 stream->stream_flags |= STREAM_ONNEW_DONE; 291 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 292 stream); 293 } 294} 295 296 297static void 298decr_conn_cap (struct lsquic_stream *stream, size_t incr) 299{ 300 if (stream->stream_flags & STREAM_CONN_LIMITED) 301 { 302 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 303 stream->conn_pub->conn_cap.cc_sent -= incr; 304 } 305} 306 307 308static void 309drop_buffered_data (struct lsquic_stream *stream) 310{ 311 decr_conn_cap(stream, stream->sm_n_buffered); 312 stream->sm_n_buffered = 0; 313 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 314 maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS); 315} 316 317 318static void 319destroy_uh (struct lsquic_stream *stream) 320{ 321 if (stream->uh) 322 { 323 if (stream->uh->uh_hset) 324 stream->conn_pub->enpub->enp_hsi_if 325 ->hsi_discard_header_set(stream->uh->uh_hset); 326 free(stream->uh); 327 stream->uh = NULL; 328 } 329} 330 331 332void 333lsquic_stream_destroy (lsquic_stream_t *stream) 334{ 335 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 336 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 337 STREAM_ONNEW_DONE) 338 { 339 stream->stream_flags |= STREAM_ONCLOSE_DONE; 340 stream->stream_if->on_close(stream, stream->st_ctx); 341 } 342 if (stream->stream_flags & STREAM_SENDING_FLAGS) 343 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 344 if (stream->stream_flags & STREAM_WANT_READ) 345 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 346 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 347 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 348 if (stream->stream_flags & STREAM_SERVICE_FLAGS) 349 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 350 drop_buffered_data(stream); 351 lsquic_sfcw_consume_rem(&stream->fc); 352 drop_frames_in(stream); 353 if (stream->push_req) 354 { 355 
if (stream->push_req->uh_hset) 356 stream->conn_pub->enpub->enp_hsi_if 357 ->hsi_discard_header_set(stream->push_req->uh_hset); 358 free(stream->push_req); 359 } 360 destroy_uh(stream); 361 free(stream->sm_buf); 362 LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream); 363 SM_HISTORY_DUMP_REMAINING(stream); 364 free(stream); 365} 366 367 368static int 369stream_is_finished (const lsquic_stream_t *stream) 370{ 371 return lsquic_stream_is_closed(stream) 372 /* n_unacked checks that no outgoing packets that reference this 373 * stream are outstanding: 374 */ 375 && 0 == stream->n_unacked 376 /* This checks that no packets that reference this stream will 377 * become outstanding: 378 */ 379 && 0 == (stream->stream_flags & STREAM_SEND_RST) 380 && ((stream->stream_flags & STREAM_FORCE_FINISH) 381 || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))); 382} 383 384 385static void 386maybe_finish_stream (lsquic_stream_t *stream) 387{ 388 if (0 == (stream->stream_flags & STREAM_FINISHED) && 389 stream_is_finished(stream)) 390 { 391 LSQ_DEBUG("stream %u is now finished", stream->id); 392 SM_HISTORY_APPEND(stream, SHE_FINISHED); 393 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 394 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 395 next_service_stream); 396 stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED; 397 } 398} 399 400 401static void 402maybe_schedule_call_on_close (lsquic_stream_t *stream) 403{ 404 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 405 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE)) 406 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)) 407 { 408 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 409 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 410 next_service_stream); 411 stream->stream_flags |= STREAM_CALL_ONCLOSE; 412 LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id); 413 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 414 } 
415} 416 417 418void 419lsquic_stream_call_on_close (lsquic_stream_t *stream) 420{ 421 assert(stream->stream_flags & STREAM_ONNEW_DONE); 422 stream->stream_flags &= ~STREAM_CALL_ONCLOSE; 423 if (!(stream->stream_flags & STREAM_SERVICE_FLAGS)) 424 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 425 next_service_stream); 426 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 427 { 428 LSQ_DEBUG("calling on_close for stream %u", stream->id); 429 stream->stream_flags |= STREAM_ONCLOSE_DONE; 430 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 431 stream->stream_if->on_close(stream, stream->st_ctx); 432 } 433 else 434 assert(0); 435} 436 437 438int 439lsquic_stream_readable (const lsquic_stream_t *stream) 440{ 441 /* A stream is readable if one of the following is true: */ 442 return 443 /* - It is already finished: in that case, lsquic_stream_read() will 444 * return 0. 445 */ 446 (stream->stream_flags & STREAM_FIN_REACHED) 447 /* - The stream is reset, by either side. In this case, 448 * lsquic_stream_read() will return -1 (we want the user to be 449 * able to collect the error). 450 */ 451 || (stream->stream_flags & STREAM_RST_FLAGS) 452 /* - Either we are not in HTTP mode or the HTTP headers have been 453 * received and the headers or data from the stream can be read. 
454 */ 455 || (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 456 == STREAM_USE_HEADERS) 457 && (stream->uh != NULL 458 || stream->data_in->di_if->di_get_frame(stream->data_in, 459 stream->read_offset))) 460 ; 461} 462 463 464size_t 465lsquic_stream_write_avail (const struct lsquic_stream *stream) 466{ 467 uint64_t stream_avail, conn_avail; 468 469 stream_avail = stream->max_send_off - stream->tosend_off 470 - stream->sm_n_buffered; 471 if (stream->stream_flags & STREAM_CONN_LIMITED) 472 { 473 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 474 if (conn_avail < stream_avail) 475 return conn_avail; 476 } 477 478 return stream_avail; 479} 480 481 482int 483lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 484{ 485 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 486 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 487 { 488 return -1; 489 } 490 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 491 { 492 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 493 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 494 next_send_stream); 495 stream->stream_flags |= STREAM_SEND_WUF; 496 } 497 return 0; 498} 499 500 501int 502lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 503{ 504 uint64_t max_off; 505 int got_next_offset; 506 enum ins_frame ins_frame; 507 508 assert(frame->packet_in); 509 510 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 511 LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; " 512 "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 513 514 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) == 515 (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) 516 { 517 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 518 lsquic_malo_put(frame); 519 return -1; 520 } 521 522 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 523 insert_frame: 524 ins_frame = 
stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 525 if (INS_FRAME_OK == ins_frame) 526 { 527 /* Update maximum offset in the flow controller and check for flow 528 * control violation: 529 */ 530 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 531 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 532 return -1; 533 if (frame->data_frame.df_fin) 534 { 535 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 536 stream->stream_flags |= STREAM_FIN_RECVD; 537 maybe_finish_stream(stream); 538 } 539 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 540 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 541 { 542 stream->data_in = stream->data_in->di_if->di_switch_impl( 543 stream->data_in, stream->read_offset); 544 if (!stream->data_in) 545 { 546 stream->data_in = data_in_error_new(); 547 return -1; 548 } 549 } 550 if (got_next_offset) 551 /* Checking the offset saves di_get_frame() call */ 552 maybe_conn_to_tickable_if_readable(stream); 553 return 0; 554 } 555 else if (INS_FRAME_DUP == ins_frame) 556 { 557 return 0; 558 } 559 else if (INS_FRAME_OVERLAP == ins_frame) 560 { 561 LSQ_DEBUG("overlap: switching DATA IN implementation"); 562 stream->data_in = stream->data_in->di_if->di_switch_impl( 563 stream->data_in, stream->read_offset); 564 if (stream->data_in) 565 goto insert_frame; 566 stream->data_in = data_in_error_new(); 567 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 568 lsquic_malo_put(frame); 569 return -1; 570 } 571 else 572 { 573 assert(INS_FRAME_ERR == ins_frame); 574 return -1; 575 } 576} 577 578 579static void 580drop_frames_in (lsquic_stream_t *stream) 581{ 582 if (stream->data_in) 583 { 584 stream->data_in->di_if->di_destroy(stream->data_in); 585 /* To avoid checking whether `data_in` is set, just set to the error 586 * data-in stream. It does the right thing after incoming data is 587 * dropped. 
588 */ 589 stream->data_in = data_in_error_new(); 590 } 591} 592 593 594static void 595maybe_elide_stream_frames (struct lsquic_stream *stream) 596{ 597 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 598 { 599 if (stream->n_unacked) 600 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 601 stream->id); 602 stream->stream_flags |= STREAM_FRAMES_ELIDED; 603 } 604} 605 606 607int 608lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 609 uint32_t error_code) 610{ 611 612 if (stream->stream_flags & STREAM_RST_RECVD) 613 { 614 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 615 return 0; 616 } 617 618 SM_HISTORY_APPEND(stream, SHE_RST_IN); 619 /* This flag must always be set, even if we are "ignoring" it: it is 620 * used by elision code. 621 */ 622 stream->stream_flags |= STREAM_RST_RECVD; 623 624 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 625 { 626 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is " 627 "smaller than that of byte following the last byte we have seen: " 628 "0x%"PRIX64, stream->id, offset, 629 lsquic_sfcw_get_max_recv_off(&stream->fc)); 630 return -1; 631 } 632 633 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 634 { 635 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64 636 " violates flow control", stream->id, offset); 637 return -1; 638 } 639 640 /* Let user collect error: */ 641 maybe_conn_to_tickable_if_readable(stream); 642 643 lsquic_sfcw_consume_rem(&stream->fc); 644 drop_frames_in(stream); 645 drop_buffered_data(stream); 646 maybe_elide_stream_frames(stream); 647 648 if (!(stream->stream_flags & 649 (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT))) 650 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 651 652 stream->stream_flags |= STREAM_RST_RECVD; 653 654 maybe_finish_stream(stream); 655 maybe_schedule_call_on_close(stream); 656 657 return 0; 658} 659 660 661uint64_t 662lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 663{ 664 
assert(stream->stream_flags & STREAM_SEND_WUF); 665 stream->stream_flags &= ~STREAM_SEND_WUF; 666 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 667 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 668 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 669} 670 671 672void 673lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 674{ 675 assert(stream->stream_flags & STREAM_SEND_BLOCKED); 676 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 677 stream->stream_flags &= ~STREAM_SEND_BLOCKED; 678 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 679 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 680} 681 682 683void 684lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 685{ 686 assert(stream->stream_flags & STREAM_SEND_RST); 687 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 688 stream->stream_flags &= ~STREAM_SEND_RST; 689 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 690 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 691 stream->stream_flags |= STREAM_RST_SENT; 692 maybe_finish_stream(stream); 693} 694 695 696static size_t 697read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len) 698{ 699 struct http1x_headers *h1h = stream->uh->uh_hset; 700 size_t n_avail = h1h->h1h_size - h1h->h1h_off; 701 if (n_avail < len) 702 len = n_avail; 703 memcpy(dst, h1h->h1h_buf + h1h->h1h_off, len); 704 h1h->h1h_off += len; 705 if (h1h->h1h_off == h1h->h1h_size) 706 { 707 LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id); 708 destroy_uh(stream); 709 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 710 { 711 stream->stream_flags |= STREAM_FIN_REACHED; 712 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 713 } 714 } 715 return len; 716} 717 718 719/* This function returns 0 when EOF is reached. 
720 */ 721ssize_t 722lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov, 723 int iovcnt) 724{ 725 size_t total_nread, nread; 726 int processed_frames, read_unc_headers, iovidx; 727 unsigned char *p, *end; 728 729 SM_HISTORY_APPEND(stream, SHE_USER_READ); 730 731#define NEXT_IOV() do { \ 732 ++iovidx; \ 733 while (iovidx < iovcnt && 0 == iov[iovidx].iov_len) \ 734 ++iovidx; \ 735 if (iovidx < iovcnt) \ 736 { \ 737 p = iov[iovidx].iov_base; \ 738 end = p + iov[iovidx].iov_len; \ 739 } \ 740 else \ 741 p = end = NULL; \ 742} while (0) 743 744#define AVAIL() (end - p) 745 746 if (stream->stream_flags & STREAM_RST_FLAGS) 747 { 748 errno = ECONNRESET; 749 return -1; 750 } 751 if (stream->stream_flags & STREAM_U_READ_DONE) 752 { 753 errno = EBADF; 754 return -1; 755 } 756 if (stream->stream_flags & STREAM_FIN_REACHED) 757 return 0; 758 759 total_nread = 0; 760 processed_frames = 0; 761 762 iovidx = -1; 763 NEXT_IOV(); 764 765 if (stream->uh) 766 { 767 if (stream->uh->uh_flags & UH_H1H) 768 { 769 if (AVAIL()) 770 { 771 read_unc_headers = 1; 772 do 773 { 774 nread = read_uh(stream, p, AVAIL()); 775 p += nread; 776 total_nread += nread; 777 if (p == end) 778 NEXT_IOV(); 779 } 780 while (stream->uh && AVAIL()); 781 } 782 else 783 read_unc_headers = 0; 784 } 785 else 786 { 787 LSQ_INFO("header set not claimed: cannot read from stream"); 788 return -1; 789 } 790 } 791 else 792 read_unc_headers = 0; 793 794 struct data_frame *data_frame; 795 while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset))) 796 { 797 ++processed_frames; 798 size_t navail = data_frame->df_size - data_frame->df_read_off; 799 size_t ntowrite = AVAIL(); 800 if (navail < ntowrite) 801 ntowrite = navail; 802 memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite); 803 p += ntowrite; 804 data_frame->df_read_off += ntowrite; 805 stream->read_offset += ntowrite; 806 total_nread += ntowrite; 807 if (data_frame->df_read_off == 
data_frame->df_size) 808 { 809 const int fin = data_frame->df_fin; 810 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 811 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 812 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 813 { 814 stream->data_in = stream->data_in->di_if->di_switch_impl( 815 stream->data_in, stream->read_offset); 816 if (!stream->data_in) 817 { 818 stream->data_in = data_in_error_new(); 819 return -1; 820 } 821 } 822 if (fin) 823 { 824 stream->stream_flags |= STREAM_FIN_REACHED; 825 break; 826 } 827 } 828 if (p == end) 829 NEXT_IOV(); 830 } 831 832 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 833 total_nread, stream->read_offset); 834 835 if (processed_frames) 836 { 837 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 838 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 839 { 840 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 841 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 842 stream->stream_flags |= STREAM_SEND_WUF; 843 maybe_conn_to_tickable_if_writeable(stream, 1); 844 } 845 } 846 847 if (processed_frames || read_unc_headers) 848 { 849 return total_nread; 850 } 851 else 852 { 853 assert(0 == total_nread); 854 errno = EWOULDBLOCK; 855 return -1; 856 } 857} 858 859 860ssize_t 861lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 862{ 863 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 864 return lsquic_stream_readv(stream, &iov, 1); 865} 866 867 868static void 869stream_shutdown_read (lsquic_stream_t *stream) 870{ 871 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 872 { 873 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 874 stream->stream_flags |= STREAM_U_READ_DONE; 875 stream_wantread(stream, 0); 876 maybe_finish_stream(stream); 877 } 878} 879 880 881static void 882stream_shutdown_write (lsquic_stream_t *stream) 883{ 884 if (stream->stream_flags & STREAM_U_WRITE_DONE) 885 return; 886 887 SM_HISTORY_APPEND(stream, 
SHE_SHUTDOWN_WRITE); 888 stream->stream_flags |= STREAM_U_WRITE_DONE; 889 stream_wantwrite(stream, 0); 890 891 /* Don't bother to check whether there is anything else to write if 892 * the flags indicate that nothing else should be written. 893 */ 894 if (!(stream->stream_flags & 895 (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT))) 896 { 897 if (stream->sm_n_buffered == 0) 898 { 899 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 900 stream)) 901 { 902 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 903 stream->stream_flags |= STREAM_FIN_SENT; 904 } 905 else 906 { 907 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 908 "flag in it"); 909 (void) stream_flush_nocheck(stream); 910 } 911 } 912 else 913 (void) stream_flush_nocheck(stream); 914 } 915} 916 917 918int 919lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 920{ 921 LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how); 922 if (lsquic_stream_is_closed(stream)) 923 { 924 LSQ_INFO("Attempt to shut down a closed stream %u", stream->id); 925 errno = EBADF; 926 return -1; 927 } 928 /* 0: read, 1: write: 2: read and write 929 */ 930 if (how < 0 || how > 2) 931 { 932 errno = EINVAL; 933 return -1; 934 } 935 936 if (how) 937 stream_shutdown_write(stream); 938 if (how != 1) 939 stream_shutdown_read(stream); 940 941 maybe_finish_stream(stream); 942 maybe_schedule_call_on_close(stream); 943 if (how) 944 maybe_conn_to_tickable_if_writeable(stream, 1); 945 946 return 0; 947} 948 949 950void 951lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 952{ 953 LSQ_DEBUG("internal shutdown of stream %u", stream->id); 954 if (LSQUIC_STREAM_HANDSHAKE == stream->id 955 || ((stream->stream_flags & STREAM_USE_HEADERS) && 956 LSQUIC_STREAM_HEADERS == stream->id)) 957 { 958 LSQ_DEBUG("add flag to force-finish special stream %u", stream->id); 959 stream->stream_flags |= STREAM_FORCE_FINISH; 960 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 961 } 962 
maybe_finish_stream(stream); 963 maybe_schedule_call_on_close(stream); 964} 965 966 967static void 968fake_reset_unused_stream (lsquic_stream_t *stream) 969{ 970 stream->stream_flags |= 971 STREAM_RST_RECVD /* User will pick this up on read or write */ 972 | STREAM_RST_SENT /* Don't send anything else on this stream */ 973 ; 974 975 /* Cancel all writes to the network scheduled for this stream: */ 976 if (stream->stream_flags & STREAM_SENDING_FLAGS) 977 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 978 next_send_stream); 979 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 980 981 LSQ_DEBUG("fake-reset stream %u%s", 982 stream->id, stream_stalled(stream) ? " (stalled)" : ""); 983 maybe_finish_stream(stream); 984 maybe_schedule_call_on_close(stream); 985} 986 987 988/* This function should only be called for locally-initiated streams whose ID 989 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 990 * frame sent by peer but we have not yet received it and created a stream. 991 * In this situation, we mark the stream as reset, so that user's on_read or 992 * on_write event callback picks up the error. That, in turn, should result 993 * in stream being closed. 994 * 995 * If we have received any data frames on this stream, this probably indicates 996 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 997 * lower than this. However, we still try to handle it gracefully and peform 998 * a shutdown, as if the stream was not reset. 
999 */ 1000void 1001lsquic_stream_received_goaway (lsquic_stream_t *stream) 1002{ 1003 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 1004 if (0 == stream->read_offset && 1005 stream->data_in->di_if->di_empty(stream->data_in)) 1006 fake_reset_unused_stream(stream); /* Normal condition */ 1007 else 1008 { /* This is odd, let's handle it the best we can: */ 1009 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 1010 lsquic_stream_shutdown_internal(stream); 1011 } 1012} 1013 1014 1015uint64_t 1016lsquic_stream_read_offset (const lsquic_stream_t *stream) 1017{ 1018 return stream->read_offset; 1019} 1020 1021 1022static int 1023stream_wantread (lsquic_stream_t *stream, int is_want) 1024{ 1025 const int old_val = !!(stream->stream_flags & STREAM_WANT_READ); 1026 const int new_val = !!is_want; 1027 if (old_val != new_val) 1028 { 1029 if (new_val) 1030 { 1031 if (!old_val) 1032 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 1033 next_read_stream); 1034 stream->stream_flags |= STREAM_WANT_READ; 1035 } 1036 else 1037 { 1038 stream->stream_flags &= ~STREAM_WANT_READ; 1039 if (old_val) 1040 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 1041 next_read_stream); 1042 } 1043 } 1044 return old_val; 1045} 1046 1047 1048static void 1049maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1050{ 1051 assert(STREAM_WRITE_Q_FLAGS & flag); 1052 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1053 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1054 next_write_stream); 1055 stream->stream_flags |= flag; 1056} 1057 1058 1059static void 1060maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1061{ 1062 assert(STREAM_WRITE_Q_FLAGS & flag); 1063 if (stream->stream_flags & flag) 1064 { 1065 stream->stream_flags &= ~flag; 1066 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1067 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1068 next_write_stream); 1069 } 1070} 1071 1072 1073static int 
1074stream_wantwrite (lsquic_stream_t *stream, int is_want) 1075{ 1076 const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE); 1077 const int new_val = !!is_want; 1078 if (old_val != new_val) 1079 { 1080 if (new_val) 1081 maybe_put_onto_write_q(stream, STREAM_WANT_WRITE); 1082 else 1083 maybe_remove_from_write_q(stream, STREAM_WANT_WRITE); 1084 } 1085 return old_val; 1086} 1087 1088 1089int 1090lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1091{ 1092 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1093 { 1094 if (is_want) 1095 maybe_conn_to_tickable_if_readable(stream); 1096 return stream_wantread(stream, is_want); 1097 } 1098 else 1099 { 1100 errno = EBADF; 1101 return -1; 1102 } 1103} 1104 1105 1106int 1107lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1108{ 1109 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)) 1110 { 1111 if (is_want) 1112 maybe_conn_to_tickable_if_writeable(stream, 1); 1113 return stream_wantwrite(stream, is_want); 1114 } 1115 else 1116 { 1117 errno = EBADF; 1118 return -1; 1119 } 1120} 1121 1122 1123#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE| \ 1124 STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST) 1125 1126 1127static void 1128stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1129{ 1130 unsigned no_progress_count, no_progress_limit; 1131 enum stream_flags flags; 1132 uint64_t size; 1133 1134 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1135 1136 no_progress_count = 0; 1137 while ((stream->stream_flags & STREAM_WANT_READ) 1138 && lsquic_stream_readable(stream)) 1139 { 1140 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1141 size = stream->read_offset; 1142 1143 stream->stream_if->on_read(stream, stream->st_ctx); 1144 1145 if (no_progress_limit && size == stream->read_offset && 1146 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1147 { 1148 ++no_progress_count; 1149 if (no_progress_count >= 
no_progress_limit) 1150 { 1151 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1152 "progress) in user code reading from stream", 1153 no_progress_count, 1154 no_progress_count == 1 ? "" : "s"); 1155 break; 1156 } 1157 } 1158 else 1159 no_progress_count = 0; 1160 } 1161} 1162 1163 1164static void 1165stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1166{ 1167 unsigned no_progress_count, no_progress_limit; 1168 enum stream_flags flags; 1169 1170 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1171 1172 no_progress_count = 0; 1173 stream->stream_flags |= STREAM_LAST_WRITE_OK; 1174 while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)) 1175 == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK) 1176 && lsquic_stream_write_avail(stream)) 1177 { 1178 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1179 1180 stream->stream_if->on_write(stream, stream->st_ctx); 1181 1182 if (no_progress_limit && 1183 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1184 { 1185 ++no_progress_count; 1186 if (no_progress_count >= no_progress_limit) 1187 { 1188 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1189 "progress) in user code writing to stream", 1190 no_progress_count, 1191 no_progress_count == 1 ? 
"" : "s"); 1192 break; 1193 } 1194 } 1195 else 1196 no_progress_count = 0; 1197 } 1198} 1199 1200 1201static void 1202stream_dispatch_read_events_once (lsquic_stream_t *stream) 1203{ 1204 if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream)) 1205 { 1206 stream->stream_if->on_read(stream, stream->st_ctx); 1207 } 1208} 1209 1210 1211static void 1212maybe_mark_as_blocked (lsquic_stream_t *stream) 1213{ 1214 struct lsquic_conn_cap *cc; 1215 1216 if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered) 1217 { 1218 if (stream->blocked_off < stream->max_send_off) 1219 { 1220 stream->blocked_off = stream->max_send_off + stream->sm_n_buffered; 1221 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1222 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1223 next_send_stream); 1224 stream->stream_flags |= STREAM_SEND_BLOCKED; 1225 LSQ_DEBUG("marked stream-blocked at stream offset " 1226 "%"PRIu64, stream->blocked_off); 1227 } 1228 else 1229 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 1230 " has been, or is about to be, sent", stream->blocked_off); 1231 } 1232 1233 if ((stream->stream_flags & STREAM_CONN_LIMITED) 1234 && (cc = &stream->conn_pub->conn_cap, 1235 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 1236 { 1237 if (cc->cc_blocked < cc->cc_max) 1238 { 1239 cc->cc_blocked = cc->cc_max; 1240 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 1241 LSQ_DEBUG("marked connection-blocked at connection offset " 1242 "%"PRIu64, cc->cc_max); 1243 } 1244 else 1245 LSQ_DEBUG("stream has already been marked connection-blocked " 1246 "at offset %"PRIu64, cc->cc_blocked); 1247 } 1248} 1249 1250 1251void 1252lsquic_stream_dispatch_read_events (lsquic_stream_t *stream) 1253{ 1254 assert(stream->stream_flags & STREAM_WANT_READ); 1255 1256 if (stream->stream_flags & STREAM_RW_ONCE) 1257 stream_dispatch_read_events_once(stream); 1258 else 1259 stream_dispatch_read_events_loop(stream); 1260} 1261 
1262 1263void 1264lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 1265{ 1266 int progress; 1267 uint64_t tosend_off; 1268 unsigned short n_buffered; 1269 enum stream_flags flags; 1270 1271 assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS); 1272 flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS; 1273 tosend_off = stream->tosend_off; 1274 n_buffered = stream->sm_n_buffered; 1275 1276 if (stream->stream_flags & STREAM_WANT_FLUSH) 1277 (void) stream_flush(stream); 1278 1279 if (stream->stream_flags & STREAM_RW_ONCE) 1280 { 1281 if ((stream->stream_flags & STREAM_WANT_WRITE) 1282 && lsquic_stream_write_avail(stream)) 1283 { 1284 stream->stream_if->on_write(stream, stream->st_ctx); 1285 } 1286 } 1287 else 1288 stream_dispatch_write_events_loop(stream); 1289 1290 /* Progress means either flags or offsets changed: */ 1291 progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags && 1292 stream->tosend_off == tosend_off && 1293 stream->sm_n_buffered == n_buffered); 1294 1295 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 1296 { 1297 if (progress) 1298 { /* Move the stream to the end of the list to ensure fairness. 
*/ 1299 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1300 next_write_stream); 1301 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1302 next_write_stream); 1303 } 1304 } 1305} 1306 1307 1308static size_t 1309inner_reader_empty_size (void *ctx) 1310{ 1311 return 0; 1312} 1313 1314 1315static size_t 1316inner_reader_empty_read (void *ctx, void *buf, size_t count) 1317{ 1318 return 0; 1319} 1320 1321 1322static int 1323stream_flush (lsquic_stream_t *stream) 1324{ 1325 struct lsquic_reader empty_reader; 1326 ssize_t nw; 1327 1328 assert(stream->stream_flags & STREAM_WANT_FLUSH); 1329 assert(stream->sm_n_buffered > 0 || 1330 /* Flushing is also used to packetize standalone FIN: */ 1331 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 1332 == STREAM_U_WRITE_DONE)); 1333 1334 empty_reader.lsqr_size = inner_reader_empty_size; 1335 empty_reader.lsqr_read = inner_reader_empty_read; 1336 empty_reader.lsqr_ctx = NULL; /* pro forma */ 1337 nw = stream_write_to_packets(stream, &empty_reader, 0); 1338 1339 if (nw >= 0) 1340 { 1341 assert(nw == 0); /* Empty reader: must have read zero bytes */ 1342 return 0; 1343 } 1344 else 1345 return -1; 1346} 1347 1348 1349static int 1350stream_flush_nocheck (lsquic_stream_t *stream) 1351{ 1352 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered; 1353 maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH); 1354 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 1355 1356 return stream_flush(stream); 1357} 1358 1359 1360int 1361lsquic_stream_flush (lsquic_stream_t *stream) 1362{ 1363 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1364 { 1365 LSQ_DEBUG("cannot flush closed stream"); 1366 errno = EBADF; 1367 return -1; 1368 } 1369 1370 if (0 == stream->sm_n_buffered) 1371 { 1372 LSQ_DEBUG("flushing 0 bytes: noop"); 1373 return 0; 1374 } 1375 1376 return stream_flush_nocheck(stream); 1377} 1378 1379 1380/* The flush threshold is the maximum size of stream data that can be sent 1381 * in 
a full packet. 1382 */ 1383#ifdef NDEBUG 1384static 1385#endif 1386 size_t 1387lsquic_stream_flush_threshold (const struct lsquic_stream *stream) 1388{ 1389 enum packet_out_flags flags; 1390 enum packno_bits bits; 1391 unsigned packet_header_sz, stream_header_sz; 1392 size_t threshold; 1393 1394 bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl); 1395 flags = bits << POBIT_SHIFT; 1396 if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)) 1397 flags |= PO_CONN_ID; 1398 if (LSQUIC_STREAM_HANDSHAKE == stream->id) 1399 flags |= PO_LONGHEAD; 1400 1401 packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags); 1402 stream_header_sz = stream->conn_pub->lconn->cn_pf 1403 ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off); 1404 1405 threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ 1406 - packet_header_sz - stream_header_sz; 1407 return threshold; 1408} 1409 1410 1411#define COMMON_WRITE_CHECKS() do { \ 1412 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) \ 1413 == STREAM_USE_HEADERS) \ 1414 { \ 1415 LSQ_INFO("Attempt to write to stream before sending HTTP headers"); \ 1416 errno = EILSEQ; \ 1417 return -1; \ 1418 } \ 1419 if (stream->stream_flags & STREAM_RST_FLAGS) \ 1420 { \ 1421 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 1422 errno = ECONNRESET; \ 1423 return -1; \ 1424 } \ 1425 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 1426 { \ 1427 LSQ_INFO("Attempt to write to stream after it was closed for " \ 1428 "writing"); \ 1429 errno = EBADF; \ 1430 return -1; \ 1431 } \ 1432} while (0) 1433 1434 1435struct frame_gen_ctx 1436{ 1437 lsquic_stream_t *fgc_stream; 1438 struct lsquic_reader *fgc_reader; 1439 /* We keep our own count of how many bytes were read from reader because 1440 * some readers are external. The external caller does not have to rely 1441 * on our count, but it can. 
1442 */ 1443 size_t fgc_nread_from_reader; 1444}; 1445 1446 1447static size_t 1448frame_gen_size (void *ctx) 1449{ 1450 struct frame_gen_ctx *fg_ctx = ctx; 1451 size_t available, remaining; 1452 1453 /* Make sure we are not writing past available size: */ 1454 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1455 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 1456 if (available < remaining) 1457 remaining = available; 1458 1459 return remaining + fg_ctx->fgc_stream->sm_n_buffered; 1460} 1461 1462 1463static int 1464frame_gen_fin (void *ctx) 1465{ 1466 struct frame_gen_ctx *fg_ctx = ctx; 1467 return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE 1468 && 0 == fg_ctx->fgc_stream->sm_n_buffered 1469 /* Do not use frame_gen_size() as it may chop the real size: */ 1470 && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1471} 1472 1473 1474static void 1475incr_conn_cap (struct lsquic_stream *stream, size_t incr) 1476{ 1477 if (stream->stream_flags & STREAM_CONN_LIMITED) 1478 { 1479 stream->conn_pub->conn_cap.cc_sent += incr; 1480 assert(stream->conn_pub->conn_cap.cc_sent 1481 <= stream->conn_pub->conn_cap.cc_max); 1482 } 1483} 1484 1485 1486static size_t 1487frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 1488{ 1489 struct frame_gen_ctx *fg_ctx = ctx; 1490 unsigned char *p = begin_buf; 1491 unsigned char *const end = p + len; 1492 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1493 size_t n_written, available, n_to_write; 1494 1495 if (stream->sm_n_buffered > 0) 1496 { 1497 if (len <= stream->sm_n_buffered) 1498 { 1499 memcpy(p, stream->sm_buf, len); 1500 memmove(stream->sm_buf, stream->sm_buf + len, 1501 stream->sm_n_buffered - len); 1502 stream->sm_n_buffered -= len; 1503 stream->tosend_off += len; 1504 *fin = frame_gen_fin(fg_ctx); 1505 return len; 1506 } 1507 memcpy(p, stream->sm_buf, stream->sm_n_buffered); 1508 p += stream->sm_n_buffered; 1509 stream->sm_n_buffered = 0; 1510 } 1511 
1512 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 1513 n_to_write = end - p; 1514 if (n_to_write > available) 1515 n_to_write = available; 1516 n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p, 1517 n_to_write); 1518 p += n_written; 1519 fg_ctx->fgc_nread_from_reader += n_written; 1520 *fin = frame_gen_fin(fg_ctx); 1521 stream->tosend_off += p - (const unsigned char *) begin_buf; 1522 incr_conn_cap(stream, n_written); 1523 return p - (const unsigned char *) begin_buf; 1524} 1525 1526 1527static void 1528check_flush_threshold (lsquic_stream_t *stream) 1529{ 1530 if ((stream->stream_flags & STREAM_WANT_FLUSH) && 1531 stream->tosend_off >= stream->sm_flush_to) 1532 { 1533 LSQ_DEBUG("flushed to or past required offset %"PRIu64, 1534 stream->sm_flush_to); 1535 maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH); 1536 } 1537} 1538 1539 1540static struct lsquic_packet_out * 1541get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least, 1542 const struct lsquic_stream *stream) 1543{ 1544 return lsquic_send_ctl_new_packet_out(ctl, need_at_least); 1545} 1546 1547 1548static struct lsquic_packet_out * (* const get_packet[])( 1549 struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) = 1550{ 1551 lsquic_send_ctl_get_packet_for_stream, 1552 get_brand_new_packet, 1553}; 1554 1555 1556static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR } 1557stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size) 1558{ 1559 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1560 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 1561 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 1562 unsigned stream_header_sz, need_at_least, off; 1563 lsquic_packet_out_t *packet_out; 1564 int len, s, hsk; 1565 1566 if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED)) 1567 == STREAM_HEADERS_SENT 1568 && lsquic_send_ctl_buffered_and_same_prio_as_headers(send_ctl, stream)) 1569 { 
1570 struct lsquic_stream *const headers_stream 1571 = lsquic_headers_stream_get_stream(stream->conn_pub->hs); 1572 if (lsquic_stream_has_data_to_flush(headers_stream)) 1573 { 1574 LSQ_DEBUG("flushing headers stream before potential write to a " 1575 "buffered packet"); 1576 (void) lsquic_stream_flush(headers_stream); 1577 } 1578 else 1579 /* Some other stream must have flushed it: this means our headers 1580 * are flushed. 1581 */ 1582 stream->stream_flags |= STREAM_HDRS_FLUSHED; 1583 } 1584 1585 stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id, 1586 stream->tosend_off); 1587 need_at_least = stream_header_sz + (size > 0); 1588 hsk = LSQUIC_STREAM_HANDSHAKE == stream->id; 1589 get_packet: 1590 packet_out = get_packet[hsk](send_ctl, need_at_least, stream); 1591 if (!packet_out) 1592 return SWTP_STOP; 1593 if (hsk) 1594 packet_out->po_header_type = stream->tosend_off == 0 1595 ? HETY_INITIAL : HETY_HANDSHAKE; 1596 1597#if LSQUIC_CONN_STATS 1598 const uint64_t begin_off = stream->tosend_off; 1599#endif 1600 off = packet_out->po_data_sz; 1601 len = pf->pf_gen_stream_frame( 1602 packet_out->po_data + packet_out->po_data_sz, 1603 lsquic_packet_out_avail(packet_out), stream->id, 1604 stream->tosend_off, 1605 frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx); 1606 if (len < 0) 1607 { 1608 if (-len > (int) need_at_least) 1609 { 1610 LSQ_DEBUG("need more room (%d bytes) than initially calculated " 1611 "%u bytes, will try again", -len, need_at_least); 1612 need_at_least = -len; 1613 goto get_packet; 1614 } 1615 else 1616 { 1617 LSQ_ERROR("could not generate stream frame"); 1618 return SWTP_ERROR; 1619 } 1620 } 1621 1622#if LSQUIC_CONN_STATS 1623 stream->conn_pub->conn_stats->out.stream_frames += 1; 1624 stream->conn_pub->conn_stats->out.stream_data_sz 1625 += stream->tosend_off - begin_off; 1626#endif 1627 EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf, 1628 packet_out->po_data + packet_out->po_data_sz, len); 1629 
lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 1630 packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM; 1631 if (0 == lsquic_packet_out_avail(packet_out)) 1632 packet_out->po_flags |= PO_STREAM_END; 1633 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 1634 stream, QUIC_FRAME_STREAM, off, len); 1635 if (s != 0) 1636 { 1637 LSQ_ERROR("adding stream to packet failed: %s", strerror(errno)); 1638 return SWTP_ERROR; 1639 } 1640 1641 check_flush_threshold(stream); 1642 1643 /* XXX: I don't like it that this is here */ 1644 if (hsk && !(packet_out->po_flags & PO_HELLO)) 1645 { 1646 lsquic_packet_out_zero_pad(packet_out); 1647 packet_out->po_flags |= PO_HELLO; 1648 lsquic_send_ctl_scheduled_one(send_ctl, packet_out); 1649 } 1650 1651 return SWTP_OK; 1652} 1653 1654 1655static void 1656abort_connection (struct lsquic_stream *stream) 1657{ 1658 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 1659 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 1660 next_service_stream); 1661 stream->stream_flags |= STREAM_ABORT_CONN; 1662 LSQ_WARN("connection will be aborted"); 1663 maybe_conn_to_tickable(stream); 1664} 1665 1666 1667static ssize_t 1668stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, 1669 size_t thresh) 1670{ 1671 size_t size; 1672 ssize_t nw; 1673 unsigned seen_ok; 1674 struct frame_gen_ctx fg_ctx = { 1675 .fgc_stream = stream, 1676 .fgc_reader = reader, 1677 .fgc_nread_from_reader = 0, 1678 }; 1679 1680 seen_ok = 0; 1681 while ((size = frame_gen_size(&fg_ctx), thresh ? 
size >= thresh : size > 0) 1682 || frame_gen_fin(&fg_ctx)) 1683 { 1684 switch (stream_write_to_packet(&fg_ctx, size)) 1685 { 1686 case SWTP_OK: 1687 if (!seen_ok++) 1688 maybe_conn_to_tickable_if_writeable(stream, 0); 1689 if (frame_gen_fin(&fg_ctx)) 1690 { 1691 stream->stream_flags |= STREAM_FIN_SENT; 1692 goto end; 1693 } 1694 else 1695 break; 1696 case SWTP_STOP: 1697 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1698 goto end; 1699 default: 1700 abort_connection(stream); 1701 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1702 return -1; 1703 } 1704 } 1705 1706 if (thresh) 1707 { 1708 assert(size < thresh); 1709 assert(size >= stream->sm_n_buffered); 1710 size -= stream->sm_n_buffered; 1711 if (size > 0) 1712 { 1713 nw = save_to_buffer(stream, reader, size); 1714 if (nw < 0) 1715 return -1; 1716 fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */ 1717 } 1718 } 1719 else 1720 { 1721 /* We count flushed data towards both stream and connection limits, 1722 * so we should have been able to packetize all of it: 1723 */ 1724 assert(0 == stream->sm_n_buffered); 1725 assert(size == 0); 1726 } 1727 1728 maybe_mark_as_blocked(stream); 1729 1730 end: 1731 return fg_ctx.fgc_nread_from_reader; 1732} 1733 1734 1735/* Perform an implicit flush when we hit connection limit while buffering 1736 * data. This is to prevent a (theoretical) stall: 1737 * 1738 * Imagine a number of streams, all of which buffered some data. The buffered 1739 * data is up to connection cap, which means no further writes are possible. 1740 * None of them flushes, which means that data is not sent and connection 1741 * WINDOW_UPDATE frame never arrives from peer. Stall. 
1742 */ 1743static int 1744maybe_flush_stream (struct lsquic_stream *stream) 1745{ 1746 if (stream->sm_n_buffered > 0 1747 && (stream->stream_flags & STREAM_CONN_LIMITED) 1748 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 1749 return stream_flush_nocheck(stream); 1750 else 1751 return 0; 1752} 1753 1754 1755static ssize_t 1756save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 1757 size_t len) 1758{ 1759 size_t avail, n_written; 1760 1761 assert(stream->sm_n_buffered + len <= SM_BUF_SIZE); 1762 1763 if (!stream->sm_buf) 1764 { 1765 stream->sm_buf = malloc(SM_BUF_SIZE); 1766 if (!stream->sm_buf) 1767 return -1; 1768 } 1769 1770 avail = lsquic_stream_write_avail(stream); 1771 if (avail < len) 1772 len = avail; 1773 1774 n_written = reader->lsqr_read(reader->lsqr_ctx, 1775 stream->sm_buf + stream->sm_n_buffered, len); 1776 stream->sm_n_buffered += n_written; 1777 incr_conn_cap(stream, n_written); 1778 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 1779 n_written, stream->sm_n_buffered); 1780 if (0 != maybe_flush_stream(stream)) 1781 return -1; 1782 return n_written; 1783} 1784 1785 1786static ssize_t 1787stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 1788{ 1789 size_t thresh, len; 1790 1791 thresh = lsquic_stream_flush_threshold(stream); 1792 len = reader->lsqr_size(reader->lsqr_ctx); 1793 if (stream->sm_n_buffered + len <= SM_BUF_SIZE && 1794 stream->sm_n_buffered + len < thresh) 1795 return save_to_buffer(stream, reader, len); 1796 else 1797 return stream_write_to_packets(stream, reader, thresh); 1798} 1799 1800 1801ssize_t 1802lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 1803{ 1804 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 1805 return lsquic_stream_writev(stream, &iov, 1); 1806} 1807 1808 1809struct inner_reader_iovec { 1810 const struct iovec *iov; 1811 const struct iovec *end; 1812 unsigned cur_iovec_off; 1813}; 1814 1815 1816static size_t 
1817inner_reader_iovec_read (void *ctx, void *buf, size_t count) 1818{ 1819 struct inner_reader_iovec *const iro = ctx; 1820 unsigned char *p = buf; 1821 unsigned char *const end = p + count; 1822 unsigned n_tocopy; 1823 1824 while (iro->iov < iro->end && p < end) 1825 { 1826 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 1827 if (n_tocopy > (unsigned) (end - p)) 1828 n_tocopy = end - p; 1829 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 1830 n_tocopy); 1831 p += n_tocopy; 1832 iro->cur_iovec_off += n_tocopy; 1833 if (iro->iov->iov_len == iro->cur_iovec_off) 1834 { 1835 ++iro->iov; 1836 iro->cur_iovec_off = 0; 1837 } 1838 } 1839 1840 return p + count - end; 1841} 1842 1843 1844static size_t 1845inner_reader_iovec_size (void *ctx) 1846{ 1847 struct inner_reader_iovec *const iro = ctx; 1848 const struct iovec *iov; 1849 size_t size; 1850 1851 size = 0; 1852 for (iov = iro->iov; iov < iro->end; ++iov) 1853 size += iov->iov_len; 1854 1855 return size - iro->cur_iovec_off; 1856} 1857 1858 1859ssize_t 1860lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 1861 int iovcnt) 1862{ 1863 COMMON_WRITE_CHECKS(); 1864 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1865 1866 struct inner_reader_iovec iro = { 1867 .iov = iov, 1868 .end = iov + iovcnt, 1869 .cur_iovec_off = 0, 1870 }; 1871 struct lsquic_reader reader = { 1872 .lsqr_read = inner_reader_iovec_read, 1873 .lsqr_size = inner_reader_iovec_size, 1874 .lsqr_ctx = &iro, 1875 }; 1876 1877 return stream_write(stream, &reader); 1878} 1879 1880 1881ssize_t 1882lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) 1883{ 1884 COMMON_WRITE_CHECKS(); 1885 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1886 return stream_write(stream, reader); 1887} 1888 1889 1890int 1891lsquic_stream_send_headers (lsquic_stream_t *stream, 1892 const lsquic_http_headers_t *headers, int eos) 1893{ 1894 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT| 1895 
STREAM_U_WRITE_DONE)) 1896 == STREAM_USE_HEADERS) 1897 { 1898 int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs, 1899 stream->id, headers, eos, lsquic_stream_priority(stream)); 1900 if (0 == s) 1901 { 1902 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 1903 stream->stream_flags |= STREAM_HEADERS_SENT; 1904 if (eos) 1905 stream->stream_flags |= STREAM_FIN_SENT; 1906 LSQ_INFO("sent headers for stream %u", stream->id); 1907 } 1908 else 1909 LSQ_WARN("could not send headers: %s", strerror(errno)); 1910 return s; 1911 } 1912 else 1913 { 1914 LSQ_INFO("cannot send headers for stream %u in this state", stream->id); 1915 errno = EBADMSG; 1916 return -1; 1917 } 1918} 1919 1920 1921void 1922lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 1923{ 1924 if (offset > stream->max_send_off) 1925 { 1926 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 1927 LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to " 1928 "0x%"PRIX64, stream->id, stream->max_send_off, offset); 1929 stream->max_send_off = offset; 1930 } 1931 else 1932 LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old " 1933 "max send offset 0x%"PRIX64", ignoring", stream->id, offset, 1934 stream->max_send_off); 1935} 1936 1937 1938/* This function is used to update offsets after handshake completes and we 1939 * learn of peer's limits from the handshake values. 
1940 */ 1941int 1942lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset) 1943{ 1944 LSQ_DEBUG("setting max_send_off to %u", offset); 1945 if (offset > stream->max_send_off) 1946 { 1947 lsquic_stream_window_update(stream, offset); 1948 return 0; 1949 } 1950 else if (offset < stream->tosend_off) 1951 { 1952 LSQ_INFO("new offset (%u bytes) is smaller than the amount of data " 1953 "already sent on this stream (%"PRIu64" bytes)", offset, 1954 stream->tosend_off); 1955 return -1; 1956 } 1957 else 1958 { 1959 stream->max_send_off = offset; 1960 return 0; 1961 } 1962} 1963 1964 1965void 1966lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code) 1967{ 1968 lsquic_stream_reset_ext(stream, error_code, 1); 1969} 1970 1971 1972void 1973lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code, 1974 int do_close) 1975{ 1976 if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT)) 1977 { 1978 LSQ_INFO("reset already sent"); 1979 return; 1980 } 1981 1982 SM_HISTORY_APPEND(stream, SHE_RESET); 1983 1984 LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code); 1985 stream->error_code = error_code; 1986 1987 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1988 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1989 next_send_stream); 1990 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 1991 stream->stream_flags |= STREAM_SEND_RST; 1992 1993 drop_buffered_data(stream); 1994 maybe_elide_stream_frames(stream); 1995 maybe_schedule_call_on_close(stream); 1996 1997 if (do_close) 1998 lsquic_stream_close(stream); 1999 else 2000 maybe_conn_to_tickable_if_writeable(stream, 1); 2001} 2002 2003 2004unsigned 2005lsquic_stream_id (const lsquic_stream_t *stream) 2006{ 2007 return stream->id; 2008} 2009 2010 2011struct lsquic_conn * 2012lsquic_stream_conn (const lsquic_stream_t *stream) 2013{ 2014 return stream->conn_pub->lconn; 2015} 2016 2017 2018int 2019lsquic_stream_close (lsquic_stream_t *stream) 2020{ 2021 
LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id); 2022 SM_HISTORY_APPEND(stream, SHE_CLOSE); 2023 if (lsquic_stream_is_closed(stream)) 2024 { 2025 LSQ_INFO("Attempt to close an already-closed stream %u", stream->id); 2026 errno = EBADF; 2027 return -1; 2028 } 2029 stream_shutdown_write(stream); 2030 stream_shutdown_read(stream); 2031 maybe_schedule_call_on_close(stream); 2032 maybe_finish_stream(stream); 2033 maybe_conn_to_tickable_if_writeable(stream, 1); 2034 return 0; 2035} 2036 2037 2038#ifndef NDEBUG 2039#if __GNUC__ 2040__attribute__((weak)) 2041#endif 2042#endif 2043void 2044lsquic_stream_acked (lsquic_stream_t *stream) 2045{ 2046 assert(stream->n_unacked); 2047 --stream->n_unacked; 2048 LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked); 2049 if (0 == stream->n_unacked) 2050 maybe_finish_stream(stream); 2051} 2052 2053 2054void 2055lsquic_stream_push_req (lsquic_stream_t *stream, 2056 struct uncompressed_headers *push_req) 2057{ 2058 assert(!stream->push_req); 2059 stream->push_req = push_req; 2060 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 2061} 2062 2063 2064int 2065lsquic_stream_is_pushed (const lsquic_stream_t *stream) 2066{ 2067 return 1 & ~stream->id; 2068} 2069 2070 2071int 2072lsquic_stream_push_info (const lsquic_stream_t *stream, 2073 uint32_t *ref_stream_id, void **hset) 2074{ 2075 if (lsquic_stream_is_pushed(stream)) 2076 { 2077 assert(stream->push_req); 2078 *ref_stream_id = stream->push_req->uh_stream_id; 2079 *hset = stream->push_req->uh_hset; 2080 return 0; 2081 } 2082 else 2083 return -1; 2084} 2085 2086 2087int 2088lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 2089{ 2090 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS) 2091 { 2092 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 2093 LSQ_DEBUG("received uncompressed headers for stream %u", stream->id); 2094 stream->stream_flags |= STREAM_HAVE_UH; 2095 if 
(uh->uh_flags & UH_FIN) 2096 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 2097 stream->uh = uh; 2098 if (uh->uh_oth_stream_id == 0) 2099 { 2100 if (uh->uh_weight) 2101 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 2102 } 2103 else 2104 LSQ_NOTICE("don't know how to depend on stream %u", 2105 uh->uh_oth_stream_id); 2106 return 0; 2107 } 2108 else 2109 { 2110 LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id); 2111 return -1; 2112 } 2113} 2114 2115 2116unsigned 2117lsquic_stream_priority (const lsquic_stream_t *stream) 2118{ 2119 return 256 - stream->sm_priority; 2120} 2121 2122 2123int 2124lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 2125{ 2126 /* The user should never get a reference to the special streams, 2127 * but let's check just in case: 2128 */ 2129 if (LSQUIC_STREAM_HANDSHAKE == stream->id 2130 || ((stream->stream_flags & STREAM_USE_HEADERS) && 2131 LSQUIC_STREAM_HEADERS == stream->id)) 2132 return -1; 2133 if (priority < 1 || priority > 256) 2134 return -1; 2135 stream->sm_priority = 256 - priority; 2136 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 2137 LSQ_DEBUG("set priority to %u", priority); 2138 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 2139 return 0; 2140} 2141 2142 2143int 2144lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 2145{ 2146 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 2147 { 2148 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) == 2149 (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) 2150 { 2151 /* We need to send headers only if we are a) using HEADERS stream 2152 * and b) we already sent initial headers. If initial headers 2153 * have not been sent yet, stream priority will be sent in the 2154 * HEADERS frame. 
2155 */ 2156 return lsquic_headers_stream_send_priority(stream->conn_pub->hs, 2157 stream->id, 0, 0, priority); 2158 } 2159 else 2160 return 0; 2161 } 2162 else 2163 return -1; 2164} 2165 2166 2167lsquic_stream_ctx_t * 2168lsquic_stream_get_ctx (const lsquic_stream_t *stream) 2169{ 2170 return stream->st_ctx; 2171} 2172 2173 2174int 2175lsquic_stream_refuse_push (lsquic_stream_t *stream) 2176{ 2177 if (lsquic_stream_is_pushed(stream) && 2178 !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST))) 2179 { 2180 LSQ_DEBUG("refusing pushed stream: send reset"); 2181 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 2182 return 0; 2183 } 2184 else 2185 return -1; 2186} 2187 2188 2189size_t 2190lsquic_stream_mem_used (const struct lsquic_stream *stream) 2191{ 2192 size_t size; 2193 2194 size = sizeof(stream); 2195 if (stream->sm_buf) 2196 size += SM_BUF_SIZE; 2197 if (stream->data_in) 2198 size += stream->data_in->di_if->di_mem_used(stream->data_in); 2199 2200 return size; 2201} 2202 2203 2204lsquic_cid_t 2205lsquic_stream_cid (const struct lsquic_stream *stream) 2206{ 2207 return LSQUIC_LOG_CONN_ID; 2208} 2209 2210 2211void * 2212lsquic_stream_get_hset (struct lsquic_stream *stream) 2213{ 2214 void *hset; 2215 2216 if (stream->stream_flags & STREAM_RST_FLAGS) 2217 { 2218 LSQ_INFO("%s: stream is reset, no headers returned", __func__); 2219 errno = ECONNRESET; 2220 return NULL; 2221 } 2222 2223 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 2224 != (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 2225 { 2226 LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__, 2227 stream->stream_flags); 2228 return NULL; 2229 } 2230 2231 if (!stream->uh) 2232 { 2233 LSQ_INFO("%s: headers unavailable (already fetched?)", __func__); 2234 return NULL; 2235 } 2236 2237 if (stream->uh->uh_flags & UH_H1H) 2238 { 2239 LSQ_INFO("%s: uncompressed headers have internal format", __func__); 2240 return NULL; 2241 } 2242 2243 hset = stream->uh->uh_hset; 2244 
stream->uh->uh_hset = NULL; 2245 destroy_uh(stream); 2246 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 2247 { 2248 stream->stream_flags |= STREAM_FIN_REACHED; 2249 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 2250 } 2251 LSQ_DEBUG("return header set"); 2252 return hset; 2253} 2254