/* lsquic_stream.c revision 229fce07 */
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <stddef.h> 28 29#include "lsquic.h" 30 31#include "lsquic_int_types.h" 32#include "lsquic_packet_common.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn_flow.h" 36#include "lsquic_rtt.h" 37#include "lsquic_sfcw.h" 38#include "lsquic_stream.h" 39#include "lsquic_conn_public.h" 40#include "lsquic_util.h" 41#include "lsquic_mm.h" 42#include "lsquic_headers_stream.h" 43#include "lsquic_conn.h" 44#include "lsquic_data_in_if.h" 45#include "lsquic_parse.h" 46#include "lsquic_packet_out.h" 47#include "lsquic_engine_public.h" 48#include "lsquic_senhist.h" 49#include "lsquic_pacer.h" 50#include "lsquic_cubic.h" 51#include "lsquic_send_ctl.h" 52#include "lsquic_headers.h" 53#include "lsquic_ev_log.h" 54 55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid 57#define LSQUIC_LOG_STREAM_ID stream->id 58#include "lsquic_logger.h" 59 60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ 61 62static void 63drop_frames_in 
(lsquic_stream_t *stream); 64 65static void 66maybe_schedule_call_on_close (lsquic_stream_t *stream); 67 68static int 69stream_wantread (lsquic_stream_t *stream, int is_want); 70 71static int 72stream_wantwrite (lsquic_stream_t *stream, int is_want); 73 74static ssize_t 75stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 76 77static ssize_t 78save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 79 80static int 81stream_flush (lsquic_stream_t *stream); 82 83static int 84stream_flush_nocheck (lsquic_stream_t *stream); 85 86static void 87maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag); 88 89 90#if LSQUIC_KEEP_STREAM_HISTORY 91/* These values are printable ASCII characters for ease of printing the 92 * whole history in a single line of a log message. 93 * 94 * The list of events is not exhaustive: only most interesting events 95 * are recorded. 96 */ 97enum stream_history_event 98{ 99 SHE_EMPTY = '\0', /* Special entry. 
No init besides memset required */ 100 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 101 SHE_REACH_FIN = 'a', 102 SHE_BLOCKED_OUT = 'b', 103 SHE_CREATED = 'C', 104 SHE_FRAME_IN = 'd', 105 SHE_FRAME_OUT = 'D', 106 SHE_RESET = 'e', 107 SHE_WINDOW_UPDATE = 'E', 108 SHE_FIN_IN = 'f', 109 SHE_FINISHED = 'F', 110 SHE_GOAWAY_IN = 'g', 111 SHE_USER_WRITE_HEADER = 'h', 112 SHE_HEADERS_IN = 'H', 113 SHE_ONCLOSE_SCHED = 'l', 114 SHE_ONCLOSE_CALL = 'L', 115 SHE_ONNEW = 'N', 116 SHE_SET_PRIO = 'p', 117 SHE_USER_READ = 'r', 118 SHE_SHUTDOWN_READ = 'R', 119 SHE_RST_IN = 's', 120 SHE_RST_OUT = 't', 121 SHE_FLUSH = 'u', 122 SHE_USER_WRITE_DATA = 'w', 123 SHE_SHUTDOWN_WRITE = 'W', 124 SHE_CLOSE = 'X', 125 SHE_FORCE_FINISH = 'Z', 126}; 127 128static void 129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 130{ 131 enum stream_history_event prev_event; 132 sm_hist_idx_t idx; 133 int plus; 134 135 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 136 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 137 idx = (idx - plus) & SM_HIST_IDX_MASK; 138 prev_event = stream->sm_hist_buf[idx]; 139 140 if (prev_event == sh_event && plus) 141 return; 142 143 if (prev_event == sh_event) 144 sh_event = SHE_PLUS; 145 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 146 147 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 148 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 149 stream->sm_hist_buf); 150} 151 152 153# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 154# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 155 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 156 LSQ_DEBUG("history: [%.*s]", \ 157 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 158 (stream)->sm_hist_buf); \ 159 } while (0) 160#else 161# define SM_HISTORY_APPEND(stream, event) 162# define SM_HISTORY_DUMP_REMAINING(stream) 163#endif 164 165 166static int 167stream_inside_callback (const lsquic_stream_t 
*stream) 168{ 169 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 170} 171 172 173static void 174maybe_conn_to_tickable (lsquic_stream_t *stream) 175{ 176 if (!stream_inside_callback(stream)) 177 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 178 stream->conn_pub->lconn); 179} 180 181 182/* Here, "readable" means that the user is able to read from the stream. */ 183static void 184maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 185{ 186 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 187 { 188 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 189 stream->conn_pub->lconn); 190 } 191} 192 193 194/* Here, "writeable" means that data can be put into packets to be 195 * scheduled to be sent out. 196 * 197 * If `check_can_send' is false, it means that we do not need to check 198 * whether packets can be sent. This check was already performed when 199 * we packetized stream data. 200 */ 201static void 202maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream, 203 int check_can_send) 204{ 205 if (!stream_inside_callback(stream) && 206 (!check_can_send 207 || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) && 208 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 209 { 210 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 211 stream->conn_pub->lconn); 212 } 213} 214 215 216static int 217stream_stalled (const lsquic_stream_t *stream) 218{ 219 return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) && 220 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 221 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 222} 223 224 225/* TODO: The logic to figure out whether the stream is connection limited 226 * should be taken out of the constructor. The caller should specify this 227 * via one of enum stream_ctor_flags. 
228 */ 229lsquic_stream_t * 230lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub, 231 const struct lsquic_stream_if *stream_if, 232 void *stream_if_ctx, unsigned initial_window, 233 unsigned initial_send_off, 234 enum stream_ctor_flags ctor_flags) 235{ 236 lsquic_cfcw_t *cfcw; 237 lsquic_stream_t *stream; 238 239 stream = calloc(1, sizeof(*stream)); 240 if (!stream) 241 return NULL; 242 243 stream->stream_if = stream_if; 244 stream->id = id; 245 stream->conn_pub = conn_pub; 246 stream->sm_onnew_arg = stream_if_ctx; 247 if (!initial_window) 248 initial_window = 16 * 1024; 249 if (LSQUIC_STREAM_HANDSHAKE == id || 250 (conn_pub->hs && LSQUIC_STREAM_HEADERS == id)) 251 cfcw = NULL; 252 else 253 { 254 cfcw = &conn_pub->cfcw; 255 stream->stream_flags |= STREAM_CONN_LIMITED; 256 if (conn_pub->hs) 257 stream->stream_flags |= STREAM_USE_HEADERS; 258 lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO); 259 } 260 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 261 if (!initial_send_off) 262 initial_send_off = 16 * 1024; 263 stream->max_send_off = initial_send_off; 264 if (ctor_flags & SCF_USE_DI_HASH) 265 stream->data_in = data_in_hash_new(conn_pub, id, 0); 266 else 267 stream->data_in = data_in_nocopy_new(conn_pub, id); 268 LSQ_DEBUG("created stream %u @%p", id, stream); 269 SM_HISTORY_APPEND(stream, SHE_CREATED); 270 if (ctor_flags & SCF_DI_AUTOSWITCH) 271 stream->stream_flags |= STREAM_AUTOSWITCH; 272 if (ctor_flags & SCF_CALL_ON_NEW) 273 lsquic_stream_call_on_new(stream); 274 if (ctor_flags & SCF_DISP_RW_ONCE) 275 stream->stream_flags |= STREAM_RW_ONCE; 276 return stream; 277} 278 279 280void 281lsquic_stream_call_on_new (lsquic_stream_t *stream) 282{ 283 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 284 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 285 { 286 LSQ_DEBUG("calling on_new_stream"); 287 SM_HISTORY_APPEND(stream, SHE_ONNEW); 288 stream->stream_flags |= STREAM_ONNEW_DONE; 289 stream->st_ctx = 
stream->stream_if->on_new_stream(stream->sm_onnew_arg, 290 stream); 291 } 292} 293 294 295static void 296decr_conn_cap (struct lsquic_stream *stream, size_t incr) 297{ 298 if (stream->stream_flags & STREAM_CONN_LIMITED) 299 { 300 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 301 stream->conn_pub->conn_cap.cc_sent -= incr; 302 } 303} 304 305 306static void 307drop_buffered_data (struct lsquic_stream *stream) 308{ 309 decr_conn_cap(stream, stream->sm_n_buffered); 310 stream->sm_n_buffered = 0; 311 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 312 maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS); 313} 314 315 316static void 317destroy_uh (struct lsquic_stream *stream) 318{ 319 if (stream->uh) 320 { 321 if (stream->uh->uh_hset) 322 stream->conn_pub->enpub->enp_hsi_if 323 ->hsi_discard_header_set(stream->uh->uh_hset); 324 free(stream->uh); 325 stream->uh = NULL; 326 } 327} 328 329 330void 331lsquic_stream_destroy (lsquic_stream_t *stream) 332{ 333 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 334 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 335 STREAM_ONNEW_DONE) 336 { 337 stream->stream_flags |= STREAM_ONCLOSE_DONE; 338 stream->stream_if->on_close(stream, stream->st_ctx); 339 } 340 if (stream->stream_flags & STREAM_SENDING_FLAGS) 341 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 342 if (stream->stream_flags & STREAM_WANT_READ) 343 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 344 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 345 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 346 if (stream->stream_flags & STREAM_SERVICE_FLAGS) 347 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 348 drop_buffered_data(stream); 349 lsquic_sfcw_consume_rem(&stream->fc); 350 drop_frames_in(stream); 351 if (stream->push_req) 352 { 353 if (stream->push_req->uh_hset) 354 stream->conn_pub->enpub->enp_hsi_if 355 
->hsi_discard_header_set(stream->push_req->uh_hset); 356 free(stream->push_req); 357 } 358 destroy_uh(stream); 359 free(stream->sm_buf); 360 LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream); 361 SM_HISTORY_DUMP_REMAINING(stream); 362 free(stream); 363} 364 365 366static int 367stream_is_finished (const lsquic_stream_t *stream) 368{ 369 return lsquic_stream_is_closed(stream) 370 /* n_unacked checks that no outgoing packets that reference this 371 * stream are outstanding: 372 */ 373 && 0 == stream->n_unacked 374 /* This checks that no packets that reference this stream will 375 * become outstanding: 376 */ 377 && 0 == (stream->stream_flags & STREAM_SEND_RST) 378 && ((stream->stream_flags & STREAM_FORCE_FINISH) 379 || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)) 380 && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD)))); 381} 382 383 384static void 385maybe_finish_stream (lsquic_stream_t *stream) 386{ 387 if (0 == (stream->stream_flags & STREAM_FINISHED) && 388 stream_is_finished(stream)) 389 { 390 LSQ_DEBUG("stream %u is now finished", stream->id); 391 SM_HISTORY_APPEND(stream, SHE_FINISHED); 392 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 393 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 394 next_service_stream); 395 stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED; 396 } 397} 398 399 400static void 401maybe_schedule_call_on_close (lsquic_stream_t *stream) 402{ 403 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 404 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE)) 405 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)) 406 { 407 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 408 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 409 next_service_stream); 410 stream->stream_flags |= STREAM_CALL_ONCLOSE; 411 LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id); 412 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 413 } 414} 
415 416 417void 418lsquic_stream_call_on_close (lsquic_stream_t *stream) 419{ 420 assert(stream->stream_flags & STREAM_ONNEW_DONE); 421 stream->stream_flags &= ~STREAM_CALL_ONCLOSE; 422 if (!(stream->stream_flags & STREAM_SERVICE_FLAGS)) 423 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 424 next_service_stream); 425 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 426 { 427 LSQ_DEBUG("calling on_close for stream %u", stream->id); 428 stream->stream_flags |= STREAM_ONCLOSE_DONE; 429 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 430 stream->stream_if->on_close(stream, stream->st_ctx); 431 } 432 else 433 assert(0); 434} 435 436 437int 438lsquic_stream_readable (const lsquic_stream_t *stream) 439{ 440 /* A stream is readable if one of the following is true: */ 441 return 442 /* - It is already finished: in that case, lsquic_stream_read() will 443 * return 0. 444 */ 445 (stream->stream_flags & STREAM_FIN_REACHED) 446 /* - The stream is reset, by either side. In this case, 447 * lsquic_stream_read() will return -1 (we want the user to be 448 * able to collect the error). 449 */ 450 || (stream->stream_flags & STREAM_RST_FLAGS) 451 /* - Either we are not in HTTP mode or the HTTP headers have been 452 * received and the headers or data from the stream can be read. 
453 */ 454 || (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 455 == STREAM_USE_HEADERS) 456 && (stream->uh != NULL 457 || stream->data_in->di_if->di_get_frame(stream->data_in, 458 stream->read_offset))) 459 ; 460} 461 462 463size_t 464lsquic_stream_write_avail (const struct lsquic_stream *stream) 465{ 466 uint64_t stream_avail, conn_avail; 467 468 stream_avail = stream->max_send_off - stream->tosend_off 469 - stream->sm_n_buffered; 470 if (stream->stream_flags & STREAM_CONN_LIMITED) 471 { 472 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 473 if (conn_avail < stream_avail) 474 return conn_avail; 475 } 476 477 return stream_avail; 478} 479 480 481int 482lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 483{ 484 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 485 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 486 { 487 return -1; 488 } 489 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 490 { 491 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 492 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 493 next_send_stream); 494 stream->stream_flags |= STREAM_SEND_WUF; 495 } 496 return 0; 497} 498 499 500int 501lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 502{ 503 uint64_t max_off; 504 int got_next_offset; 505 enum ins_frame ins_frame; 506 507 assert(frame->packet_in); 508 509 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 510 LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; " 511 "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 512 513 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) == 514 (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) 515 { 516 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 517 lsquic_malo_put(frame); 518 return -1; 519 } 520 521 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 522 insert_frame: 523 ins_frame = 
stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 524 if (INS_FRAME_OK == ins_frame) 525 { 526 /* Update maximum offset in the flow controller and check for flow 527 * control violation: 528 */ 529 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 530 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 531 return -1; 532 if (frame->data_frame.df_fin) 533 { 534 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 535 stream->stream_flags |= STREAM_FIN_RECVD; 536 maybe_finish_stream(stream); 537 } 538 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 539 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 540 { 541 stream->data_in = stream->data_in->di_if->di_switch_impl( 542 stream->data_in, stream->read_offset); 543 if (!stream->data_in) 544 { 545 stream->data_in = data_in_error_new(); 546 return -1; 547 } 548 } 549 if (got_next_offset) 550 /* Checking the offset saves di_get_frame() call */ 551 maybe_conn_to_tickable_if_readable(stream); 552 return 0; 553 } 554 else if (INS_FRAME_DUP == ins_frame) 555 { 556 return 0; 557 } 558 else if (INS_FRAME_OVERLAP == ins_frame) 559 { 560 LSQ_DEBUG("overlap: switching DATA IN implementation"); 561 stream->data_in = stream->data_in->di_if->di_switch_impl( 562 stream->data_in, stream->read_offset); 563 if (stream->data_in) 564 goto insert_frame; 565 stream->data_in = data_in_error_new(); 566 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 567 lsquic_malo_put(frame); 568 return -1; 569 } 570 else 571 { 572 assert(INS_FRAME_ERR == ins_frame); 573 return -1; 574 } 575} 576 577 578static void 579drop_frames_in (lsquic_stream_t *stream) 580{ 581 if (stream->data_in) 582 { 583 stream->data_in->di_if->di_destroy(stream->data_in); 584 /* To avoid checking whether `data_in` is set, just set to the error 585 * data-in stream. It does the right thing after incoming data is 586 * dropped. 
587 */ 588 stream->data_in = data_in_error_new(); 589 } 590} 591 592 593static void 594maybe_elide_stream_frames (struct lsquic_stream *stream) 595{ 596 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 597 { 598 if (stream->n_unacked) 599 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 600 stream->id); 601 stream->stream_flags |= STREAM_FRAMES_ELIDED; 602 } 603} 604 605 606int 607lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 608 uint32_t error_code) 609{ 610 611 if (stream->stream_flags & STREAM_RST_RECVD) 612 { 613 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 614 return 0; 615 } 616 617 SM_HISTORY_APPEND(stream, SHE_RST_IN); 618 /* This flag must always be set, even if we are "ignoring" it: it is 619 * used by elision code. 620 */ 621 stream->stream_flags |= STREAM_RST_RECVD; 622 623 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 624 { 625 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is " 626 "smaller than that of byte following the last byte we have seen: " 627 "0x%"PRIX64, stream->id, offset, 628 lsquic_sfcw_get_max_recv_off(&stream->fc)); 629 return -1; 630 } 631 632 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 633 { 634 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64 635 " violates flow control", stream->id, offset); 636 return -1; 637 } 638 639 /* Let user collect error: */ 640 maybe_conn_to_tickable_if_readable(stream); 641 642 lsquic_sfcw_consume_rem(&stream->fc); 643 drop_frames_in(stream); 644 drop_buffered_data(stream); 645 maybe_elide_stream_frames(stream); 646 647 if (!(stream->stream_flags & 648 (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT))) 649 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 650 651 stream->stream_flags |= STREAM_RST_RECVD; 652 653 maybe_finish_stream(stream); 654 maybe_schedule_call_on_close(stream); 655 656 return 0; 657} 658 659 660uint64_t 661lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 662{ 663 
assert(stream->stream_flags & STREAM_SEND_WUF); 664 stream->stream_flags &= ~STREAM_SEND_WUF; 665 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 666 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 667 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 668} 669 670 671void 672lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 673{ 674 assert(stream->stream_flags & STREAM_SEND_BLOCKED); 675 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 676 stream->stream_flags &= ~STREAM_SEND_BLOCKED; 677 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 678 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 679} 680 681 682void 683lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 684{ 685 assert(stream->stream_flags & STREAM_SEND_RST); 686 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 687 stream->stream_flags &= ~STREAM_SEND_RST; 688 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 689 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 690 stream->stream_flags |= STREAM_RST_SENT; 691 maybe_finish_stream(stream); 692} 693 694 695static size_t 696read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len) 697{ 698 struct http1x_headers *h1h = stream->uh->uh_hset; 699 size_t n_avail = h1h->h1h_size - h1h->h1h_off; 700 if (n_avail < len) 701 len = n_avail; 702 memcpy(dst, h1h->h1h_buf + h1h->h1h_off, len); 703 h1h->h1h_off += len; 704 if (h1h->h1h_off == h1h->h1h_size) 705 { 706 LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id); 707 destroy_uh(stream); 708 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 709 { 710 stream->stream_flags |= STREAM_FIN_REACHED; 711 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 712 } 713 } 714 return len; 715} 716 717 718/* This function returns 0 when EOF is reached. 
719 */ 720ssize_t 721lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov, 722 int iovcnt) 723{ 724 size_t total_nread, nread; 725 int processed_frames, read_unc_headers, iovidx; 726 unsigned char *p, *end; 727 728 SM_HISTORY_APPEND(stream, SHE_USER_READ); 729 730#define NEXT_IOV() do { \ 731 ++iovidx; \ 732 while (iovidx < iovcnt && 0 == iov[iovidx].iov_len) \ 733 ++iovidx; \ 734 if (iovidx < iovcnt) \ 735 { \ 736 p = iov[iovidx].iov_base; \ 737 end = p + iov[iovidx].iov_len; \ 738 } \ 739 else \ 740 p = end = NULL; \ 741} while (0) 742 743#define AVAIL() (end - p) 744 745 if (stream->stream_flags & STREAM_RST_FLAGS) 746 { 747 errno = ECONNRESET; 748 return -1; 749 } 750 if (stream->stream_flags & STREAM_U_READ_DONE) 751 { 752 errno = EBADF; 753 return -1; 754 } 755 if (stream->stream_flags & STREAM_FIN_REACHED) 756 return 0; 757 758 total_nread = 0; 759 processed_frames = 0; 760 761 iovidx = -1; 762 NEXT_IOV(); 763 764 if (stream->uh) 765 { 766 if (stream->uh->uh_flags & UH_H1H) 767 { 768 if (AVAIL()) 769 { 770 read_unc_headers = 1; 771 do 772 { 773 nread = read_uh(stream, p, AVAIL()); 774 p += nread; 775 total_nread += nread; 776 if (p == end) 777 NEXT_IOV(); 778 } 779 while (stream->uh && AVAIL()); 780 } 781 else 782 read_unc_headers = 0; 783 } 784 else 785 { 786 LSQ_INFO("header set not claimed: cannot read from stream"); 787 return -1; 788 } 789 } 790 else 791 read_unc_headers = 0; 792 793 struct data_frame *data_frame; 794 while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset))) 795 { 796 ++processed_frames; 797 size_t navail = data_frame->df_size - data_frame->df_read_off; 798 size_t ntowrite = AVAIL(); 799 if (navail < ntowrite) 800 ntowrite = navail; 801 memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite); 802 p += ntowrite; 803 data_frame->df_read_off += ntowrite; 804 stream->read_offset += ntowrite; 805 total_nread += ntowrite; 806 if (data_frame->df_read_off == 
data_frame->df_size) 807 { 808 const int fin = data_frame->df_fin; 809 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 810 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 811 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 812 { 813 stream->data_in = stream->data_in->di_if->di_switch_impl( 814 stream->data_in, stream->read_offset); 815 if (!stream->data_in) 816 { 817 stream->data_in = data_in_error_new(); 818 return -1; 819 } 820 } 821 if (fin) 822 { 823 stream->stream_flags |= STREAM_FIN_REACHED; 824 break; 825 } 826 } 827 if (p == end) 828 NEXT_IOV(); 829 } 830 831 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 832 total_nread, stream->read_offset); 833 834 if (processed_frames) 835 { 836 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 837 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 838 { 839 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 840 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 841 stream->stream_flags |= STREAM_SEND_WUF; 842 maybe_conn_to_tickable_if_writeable(stream, 1); 843 } 844 } 845 846 if (processed_frames || read_unc_headers) 847 { 848 return total_nread; 849 } 850 else 851 { 852 assert(0 == total_nread); 853 errno = EWOULDBLOCK; 854 return -1; 855 } 856} 857 858 859ssize_t 860lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 861{ 862 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 863 return lsquic_stream_readv(stream, &iov, 1); 864} 865 866 867static void 868stream_shutdown_read (lsquic_stream_t *stream) 869{ 870 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 871 { 872 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 873 stream->stream_flags |= STREAM_U_READ_DONE; 874 stream_wantread(stream, 0); 875 maybe_finish_stream(stream); 876 } 877} 878 879 880static void 881stream_shutdown_write (lsquic_stream_t *stream) 882{ 883 if (stream->stream_flags & STREAM_U_WRITE_DONE) 884 return; 885 886 SM_HISTORY_APPEND(stream, 
SHE_SHUTDOWN_WRITE); 887 stream->stream_flags |= STREAM_U_WRITE_DONE; 888 stream_wantwrite(stream, 0); 889 890 /* Don't bother to check whether there is anything else to write if 891 * the flags indicate that nothing else should be written. 892 */ 893 if (!(stream->stream_flags & 894 (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT))) 895 { 896 if (stream->sm_n_buffered == 0) 897 { 898 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 899 stream)) 900 { 901 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 902 stream->stream_flags |= STREAM_FIN_SENT; 903 } 904 else 905 { 906 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 907 "flag in it"); 908 (void) stream_flush_nocheck(stream); 909 } 910 } 911 else 912 (void) stream_flush_nocheck(stream); 913 } 914} 915 916 917int 918lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 919{ 920 LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how); 921 if (lsquic_stream_is_closed(stream)) 922 { 923 LSQ_INFO("Attempt to shut down a closed stream %u", stream->id); 924 errno = EBADF; 925 return -1; 926 } 927 /* 0: read, 1: write: 2: read and write 928 */ 929 if (how < 0 || how > 2) 930 { 931 errno = EINVAL; 932 return -1; 933 } 934 935 if (how) 936 stream_shutdown_write(stream); 937 if (how != 1) 938 stream_shutdown_read(stream); 939 940 maybe_finish_stream(stream); 941 maybe_schedule_call_on_close(stream); 942 if (how) 943 maybe_conn_to_tickable_if_writeable(stream, 1); 944 945 return 0; 946} 947 948 949void 950lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 951{ 952 LSQ_DEBUG("internal shutdown of stream %u", stream->id); 953 if (LSQUIC_STREAM_HANDSHAKE == stream->id 954 || ((stream->stream_flags & STREAM_USE_HEADERS) && 955 LSQUIC_STREAM_HEADERS == stream->id)) 956 { 957 LSQ_DEBUG("add flag to force-finish special stream %u", stream->id); 958 stream->stream_flags |= STREAM_FORCE_FINISH; 959 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 960 } 961 
maybe_finish_stream(stream); 962 maybe_schedule_call_on_close(stream); 963} 964 965 966static void 967fake_reset_unused_stream (lsquic_stream_t *stream) 968{ 969 stream->stream_flags |= 970 STREAM_RST_RECVD /* User will pick this up on read or write */ 971 | STREAM_RST_SENT /* Don't send anything else on this stream */ 972 ; 973 974 /* Cancel all writes to the network scheduled for this stream: */ 975 if (stream->stream_flags & STREAM_SENDING_FLAGS) 976 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 977 next_send_stream); 978 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 979 980 LSQ_DEBUG("fake-reset stream %u%s", 981 stream->id, stream_stalled(stream) ? " (stalled)" : ""); 982 maybe_finish_stream(stream); 983 maybe_schedule_call_on_close(stream); 984} 985 986 987/* This function should only be called for locally-initiated streams whose ID 988 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 989 * frame sent by peer but we have not yet received it and created a stream. 990 * In this situation, we mark the stream as reset, so that user's on_read or 991 * on_write event callback picks up the error. That, in turn, should result 992 * in stream being closed. 993 * 994 * If we have received any data frames on this stream, this probably indicates 995 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 996 * lower than this. However, we still try to handle it gracefully and peform 997 * a shutdown, as if the stream was not reset. 
998 */ 999void 1000lsquic_stream_received_goaway (lsquic_stream_t *stream) 1001{ 1002 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 1003 if (0 == stream->read_offset && 1004 stream->data_in->di_if->di_empty(stream->data_in)) 1005 fake_reset_unused_stream(stream); /* Normal condition */ 1006 else 1007 { /* This is odd, let's handle it the best we can: */ 1008 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 1009 lsquic_stream_shutdown_internal(stream); 1010 } 1011} 1012 1013 1014uint64_t 1015lsquic_stream_read_offset (const lsquic_stream_t *stream) 1016{ 1017 return stream->read_offset; 1018} 1019 1020 1021static int 1022stream_wantread (lsquic_stream_t *stream, int is_want) 1023{ 1024 const int old_val = !!(stream->stream_flags & STREAM_WANT_READ); 1025 const int new_val = !!is_want; 1026 if (old_val != new_val) 1027 { 1028 if (new_val) 1029 { 1030 if (!old_val) 1031 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 1032 next_read_stream); 1033 stream->stream_flags |= STREAM_WANT_READ; 1034 } 1035 else 1036 { 1037 stream->stream_flags &= ~STREAM_WANT_READ; 1038 if (old_val) 1039 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 1040 next_read_stream); 1041 } 1042 } 1043 return old_val; 1044} 1045 1046 1047static void 1048maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1049{ 1050 assert(STREAM_WRITE_Q_FLAGS & flag); 1051 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1052 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1053 next_write_stream); 1054 stream->stream_flags |= flag; 1055} 1056 1057 1058static void 1059maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1060{ 1061 assert(STREAM_WRITE_Q_FLAGS & flag); 1062 if (stream->stream_flags & flag) 1063 { 1064 stream->stream_flags &= ~flag; 1065 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1066 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1067 next_write_stream); 1068 } 1069} 1070 1071 1072static int 
1073stream_wantwrite (lsquic_stream_t *stream, int is_want) 1074{ 1075 const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE); 1076 const int new_val = !!is_want; 1077 if (old_val != new_val) 1078 { 1079 if (new_val) 1080 maybe_put_onto_write_q(stream, STREAM_WANT_WRITE); 1081 else 1082 maybe_remove_from_write_q(stream, STREAM_WANT_WRITE); 1083 } 1084 return old_val; 1085} 1086 1087 1088int 1089lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1090{ 1091 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1092 { 1093 if (is_want) 1094 maybe_conn_to_tickable_if_readable(stream); 1095 return stream_wantread(stream, is_want); 1096 } 1097 else 1098 { 1099 errno = EBADF; 1100 return -1; 1101 } 1102} 1103 1104 1105int 1106lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1107{ 1108 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)) 1109 { 1110 if (is_want) 1111 maybe_conn_to_tickable_if_writeable(stream, 1); 1112 return stream_wantwrite(stream, is_want); 1113 } 1114 else 1115 { 1116 errno = EBADF; 1117 return -1; 1118 } 1119} 1120 1121 1122#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE| \ 1123 STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST) 1124 1125 1126static void 1127stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1128{ 1129 unsigned no_progress_count, no_progress_limit; 1130 enum stream_flags flags; 1131 uint64_t size; 1132 1133 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1134 1135 no_progress_count = 0; 1136 while ((stream->stream_flags & STREAM_WANT_READ) 1137 && lsquic_stream_readable(stream)) 1138 { 1139 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1140 size = stream->read_offset; 1141 1142 stream->stream_if->on_read(stream, stream->st_ctx); 1143 1144 if (no_progress_limit && size == stream->read_offset && 1145 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1146 { 1147 ++no_progress_count; 1148 if (no_progress_count >= 
no_progress_limit) 1149 { 1150 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1151 "progress) in user code reading from stream", 1152 no_progress_count, 1153 no_progress_count == 1 ? "" : "s"); 1154 break; 1155 } 1156 } 1157 else 1158 no_progress_count = 0; 1159 } 1160} 1161 1162 1163static void 1164stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1165{ 1166 unsigned no_progress_count, no_progress_limit; 1167 enum stream_flags flags; 1168 1169 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1170 1171 no_progress_count = 0; 1172 stream->stream_flags |= STREAM_LAST_WRITE_OK; 1173 while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)) 1174 == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK) 1175 && lsquic_stream_write_avail(stream)) 1176 { 1177 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1178 1179 stream->stream_if->on_write(stream, stream->st_ctx); 1180 1181 if (no_progress_limit && 1182 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1183 { 1184 ++no_progress_count; 1185 if (no_progress_count >= no_progress_limit) 1186 { 1187 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1188 "progress) in user code writing to stream", 1189 no_progress_count, 1190 no_progress_count == 1 ? 
"" : "s"); 1191 break; 1192 } 1193 } 1194 else 1195 no_progress_count = 0; 1196 } 1197} 1198 1199 1200static void 1201stream_dispatch_read_events_once (lsquic_stream_t *stream) 1202{ 1203 if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream)) 1204 { 1205 stream->stream_if->on_read(stream, stream->st_ctx); 1206 } 1207} 1208 1209 1210static void 1211maybe_mark_as_blocked (lsquic_stream_t *stream) 1212{ 1213 struct lsquic_conn_cap *cc; 1214 1215 if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered) 1216 { 1217 if (stream->blocked_off < stream->max_send_off) 1218 { 1219 stream->blocked_off = stream->max_send_off + stream->sm_n_buffered; 1220 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1221 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1222 next_send_stream); 1223 stream->stream_flags |= STREAM_SEND_BLOCKED; 1224 LSQ_DEBUG("marked stream-blocked at stream offset " 1225 "%"PRIu64, stream->blocked_off); 1226 } 1227 else 1228 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 1229 " has been, or is about to be, sent", stream->blocked_off); 1230 } 1231 1232 if ((stream->stream_flags & STREAM_CONN_LIMITED) 1233 && (cc = &stream->conn_pub->conn_cap, 1234 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 1235 { 1236 if (cc->cc_blocked < cc->cc_max) 1237 { 1238 cc->cc_blocked = cc->cc_max; 1239 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 1240 LSQ_DEBUG("marked connection-blocked at connection offset " 1241 "%"PRIu64, cc->cc_max); 1242 } 1243 else 1244 LSQ_DEBUG("stream has already been marked connection-blocked " 1245 "at offset %"PRIu64, cc->cc_blocked); 1246 } 1247} 1248 1249 1250void 1251lsquic_stream_dispatch_read_events (lsquic_stream_t *stream) 1252{ 1253 assert(stream->stream_flags & STREAM_WANT_READ); 1254 1255 if (stream->stream_flags & STREAM_RW_ONCE) 1256 stream_dispatch_read_events_once(stream); 1257 else 1258 stream_dispatch_read_events_loop(stream); 1259} 1260 
1261 1262void 1263lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 1264{ 1265 int progress; 1266 uint64_t tosend_off; 1267 unsigned short n_buffered; 1268 enum stream_flags flags; 1269 1270 assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS); 1271 flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS; 1272 tosend_off = stream->tosend_off; 1273 n_buffered = stream->sm_n_buffered; 1274 1275 if (stream->stream_flags & STREAM_WANT_FLUSH) 1276 (void) stream_flush(stream); 1277 1278 if (stream->stream_flags & STREAM_RW_ONCE) 1279 { 1280 if ((stream->stream_flags & STREAM_WANT_WRITE) 1281 && lsquic_stream_write_avail(stream)) 1282 { 1283 stream->stream_if->on_write(stream, stream->st_ctx); 1284 } 1285 } 1286 else 1287 stream_dispatch_write_events_loop(stream); 1288 1289 /* Progress means either flags or offsets changed: */ 1290 progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags && 1291 stream->tosend_off == tosend_off && 1292 stream->sm_n_buffered == n_buffered); 1293 1294 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 1295 { 1296 if (progress) 1297 { /* Move the stream to the end of the list to ensure fairness. 
*/ 1298 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1299 next_write_stream); 1300 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1301 next_write_stream); 1302 } 1303 } 1304} 1305 1306 1307static size_t 1308inner_reader_empty_size (void *ctx) 1309{ 1310 return 0; 1311} 1312 1313 1314static size_t 1315inner_reader_empty_read (void *ctx, void *buf, size_t count) 1316{ 1317 return 0; 1318} 1319 1320 1321static int 1322stream_flush (lsquic_stream_t *stream) 1323{ 1324 struct lsquic_reader empty_reader; 1325 ssize_t nw; 1326 1327 assert(stream->stream_flags & STREAM_WANT_FLUSH); 1328 assert(stream->sm_n_buffered > 0 || 1329 /* Flushing is also used to packetize standalone FIN: */ 1330 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 1331 == STREAM_U_WRITE_DONE)); 1332 1333 empty_reader.lsqr_size = inner_reader_empty_size; 1334 empty_reader.lsqr_read = inner_reader_empty_read; 1335 empty_reader.lsqr_ctx = NULL; /* pro forma */ 1336 nw = stream_write_to_packets(stream, &empty_reader, 0); 1337 1338 if (nw >= 0) 1339 { 1340 assert(nw == 0); /* Empty reader: must have read zero bytes */ 1341 return 0; 1342 } 1343 else 1344 return -1; 1345} 1346 1347 1348static int 1349stream_flush_nocheck (lsquic_stream_t *stream) 1350{ 1351 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered; 1352 maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH); 1353 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 1354 1355 return stream_flush(stream); 1356} 1357 1358 1359int 1360lsquic_stream_flush (lsquic_stream_t *stream) 1361{ 1362 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1363 { 1364 LSQ_DEBUG("cannot flush closed stream"); 1365 errno = EBADF; 1366 return -1; 1367 } 1368 1369 if (0 == stream->sm_n_buffered) 1370 { 1371 LSQ_DEBUG("flushing 0 bytes: noop"); 1372 return 0; 1373 } 1374 1375 return stream_flush_nocheck(stream); 1376} 1377 1378 1379/* The flush threshold is the maximum size of stream data that can be sent 1380 * in 
a full packet.
 */
#ifdef NDEBUG
static
#endif
       size_t
lsquic_stream_flush_threshold (const struct lsquic_stream *stream)
{
    enum packet_out_flags flags;
    enum lsquic_packno_bits bits;
    unsigned packet_header_sz, stream_header_sz;
    size_t threshold;

    /* Build the packet flags that lsquic_po_header_length() is given:
     * packet number bits, connection ID unless LSCONN_TCID0 is set, and
     * PO_LONGHEAD for the handshake stream.
     */
    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
    flags = bits << POBIT_SHIFT;
    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
        flags |= PO_CONN_ID;
    if (LSQUIC_STREAM_HANDSHAKE == stream->id)
        flags |= PO_LONGHEAD;

    packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags);
    stream_header_sz = stream->conn_pub->lconn->cn_pf
            ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off);

    /* What remains of the packet after the hash and both headers: */
    threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ
              - packet_header_sz - stream_header_sz;
    return threshold;
}


/* Checks shared by the user-facing write functions.  The macro returns -1
 * from the enclosing function, with errno set, when:
 *   - headers are in use but have not been sent yet (EILSEQ);
 *   - any of STREAM_RST_FLAGS is set (ECONNRESET);
 *   - the stream is closed for writing or FIN has been sent (EBADF).
 * A variable named `stream' must be in scope.
 */
#define COMMON_WRITE_CHECKS() do {                                          \
    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))   \
                                                    == STREAM_USE_HEADERS)  \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream before sending HTTP headers"); \
        errno = EILSEQ;                                                     \
        return -1;                                                          \
    }                                                                       \
    if (stream->stream_flags & STREAM_RST_FLAGS)                            \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it had been reset");     \
        errno = ECONNRESET;                                                 \
        return -1;                                                          \
    }                                                                       \
    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it was closed for "      \
                                                                "writing"); \
        errno = EBADF;                                                      \
        return -1;                                                          \
    }                                                                       \
} while (0)


/* Context handed to the frame generation callbacks below. */
struct frame_gen_ctx
{
    lsquic_stream_t      *fgc_stream;
    struct lsquic_reader *fgc_reader;
    /* We keep our own count of how many bytes were read from reader because
     * some readers are external.  The external caller does not have to rely
     * on our count, but it can.
     */
    size_t                fgc_nread_from_reader;
};


/* Number of bytes that can go into stream frames right now: what the reader
 * has, capped by lsquic_stream_write_avail(), plus the bytes already in the
 * stream buffer.
 */
static size_t
frame_gen_size (void *ctx)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    size_t available, remaining;

    /* Make sure we are not writing past available size: */
    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
    if (available < remaining)
        remaining = available;

    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
}


/* True when FIN should be set: the user is done writing, the stream buffer
 * is empty and the reader has nothing left.
 */
static int
frame_gen_fin (void *ctx)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE
        && 0 == fg_ctx->fgc_stream->sm_n_buffered
        /* Do not use frame_gen_size() as it may chop the real size: */
        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
}


/* Charge `incr' bytes against the connection cap if the stream has
 * STREAM_CONN_LIMITED set.
 */
static void
incr_conn_cap (struct lsquic_stream *stream, size_t incr)
{
    if (stream->stream_flags & STREAM_CONN_LIMITED)
    {
        stream->conn_pub->conn_cap.cc_sent += incr;
        assert(stream->conn_pub->conn_cap.cc_sent
                                    <= stream->conn_pub->conn_cap.cc_max);
    }
}


/* Read callback passed to pf_gen_stream_frame(): fill up to `len' bytes at
 * `begin_buf', taking buffered bytes first and then bytes from the reader.
 * Advances `tosend_off' by the number of bytes produced, sets `*fin' from
 * frame_gen_fin() and returns the number of bytes produced.
 */
static size_t
frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    unsigned char *p = begin_buf;
    unsigned char *const end = p + len;
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    size_t n_written, available, n_to_write;

    if (stream->sm_n_buffered > 0)
    {
        if (len <= stream->sm_n_buffered)
        {
            /* The buffer alone satisfies the request: copy out `len' bytes
             * and shift the remainder to the front of the buffer.
             */
            memcpy(p, stream->sm_buf, len);
            memmove(stream->sm_buf, stream->sm_buf + len,
                                                stream->sm_n_buffered - len);
            stream->sm_n_buffered -= len;
            stream->tosend_off += len;
            *fin = frame_gen_fin(fg_ctx);
            return len;
        }
        /* Drain the whole buffer, then continue with the reader: */
        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
        p += stream->sm_n_buffered;
        stream->sm_n_buffered = 0;
    }

    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
    n_to_write = end - p;
    if (n_to_write > available)
        n_to_write = available;
    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
                                                                n_to_write);
    p += n_written;
    fg_ctx->fgc_nread_from_reader += n_written;
    *fin = frame_gen_fin(fg_ctx);
    stream->tosend_off += p - (const unsigned char *) begin_buf;
    /* Only bytes taken from the reader are charged to the connection cap
     * here:
     */
    incr_conn_cap(stream, n_written);
    return p - (const unsigned char *) begin_buf;
}


/* Drop STREAM_WANT_FLUSH once `tosend_off' has reached `sm_flush_to'. */
static void
check_flush_threshold (lsquic_stream_t *stream)
{
    if ((stream->stream_flags & STREAM_WANT_FLUSH) &&
                            stream->tosend_off >= stream->sm_flush_to)
    {
        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
                                                    stream->sm_flush_to);
        maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH);
    }
}


/* Adapter with the same signature as lsquic_send_ctl_get_packet_for_stream()
 * that always asks the send controller for a new packet; `stream' is unused.
 */
static struct lsquic_packet_out *
get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least,
                      const struct lsquic_stream *stream)
{
    return lsquic_send_ctl_new_packet_out(ctl, need_at_least);
}


/* Packet getters, indexed by whether the stream is the handshake stream
 * (index 1) or not (index 0); see `hsk' in stream_write_to_packet().
 */
static struct lsquic_packet_out * (* const get_packet[])(
    struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) =
{
    lsquic_send_ctl_get_packet_for_stream,
    get_brand_new_packet,
};


/* Generate a single STREAM frame into one packet.
 *
 * Returns SWTP_OK when a frame was written, SWTP_STOP when no packet could
 * be obtained, and SWTP_ERROR when the frame could not be generated or
 * recorded in the packet.
 */
static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR }
stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size)
{
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
    unsigned stream_header_sz, need_at_least, off;
    lsquic_packet_out_t *packet_out;
    int len, s, hsk;

    stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id,
                                                        stream->tosend_off);
    /* Room for the frame header plus one byte of payload if there is any: */
    need_at_least = stream_header_sz + (size > 0);
    hsk = LSQUIC_STREAM_HANDSHAKE == stream->id;
  get_packet:
    packet_out = get_packet[hsk](send_ctl, need_at_least, stream);
    if (!packet_out)
        return SWTP_STOP;
    if (hsk)
        packet_out->po_header_type = stream->tosend_off == 0
                                        ? HETY_INITIAL : HETY_HANDSHAKE;

    /* Remember where the frame starts before the packet size is bumped: */
    off = packet_out->po_data_sz;
    len = pf->pf_gen_stream_frame(
                packet_out->po_data + packet_out->po_data_sz,
                lsquic_packet_out_avail(packet_out), stream->id,
                stream->tosend_off,
                frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx);
    if (len < 0)
    {
        /* A negative return whose magnitude exceeds `need_at_least' is
         * treated as the amount of room required: retry with that much.
         */
        if (-len > (int) need_at_least)
        {
            LSQ_DEBUG("need more room (%d bytes) than initially calculated "
                "%u bytes, will try again", -len, need_at_least);
            need_at_least = -len;
            goto get_packet;
        }
        else
        {
            LSQ_ERROR("could not generate stream frame");
            return SWTP_ERROR;
        }
    }

    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
                        packet_out->po_data + packet_out->po_data_sz, len);
    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
    if (0 == lsquic_packet_out_avail(packet_out))
        packet_out->po_flags |= PO_STREAM_END;
    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
                                     stream, QUIC_FRAME_STREAM, off, len);
    if (s != 0)
    {
        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
        return SWTP_ERROR;
    }

    check_flush_threshold(stream);

    /* XXX: I don't like it that this is here */
    if (hsk && !(packet_out->po_flags & PO_HELLO))
    {
        lsquic_packet_out_zero_pad(packet_out);
        packet_out->po_flags |= PO_HELLO;
        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
    }

    return SWTP_OK;
}


/* Put the stream onto the `service_streams' list (unless a service flag
 * already has it there) with STREAM_ABORT_CONN set.
 */
static void
abort_connection (struct lsquic_stream *stream)
{
    if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
                                                next_service_stream);
    stream->stream_flags |= STREAM_ABORT_CONN;
    LSQ_WARN("connection will be aborted");
    maybe_conn_to_tickable(stream);
}


/* Packetize buffered data plus data from `reader'.
 *
 * With a non-zero `thresh', frames are generated only while at least
 * `thresh' bytes are available; the remainder is saved to the stream
 * buffer.  With `thresh' of zero, everything available is packetized.
 *
 * Returns the number of bytes consumed from `reader', or -1 on error.
 */
static ssize_t
stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
                         size_t thresh)
{
    size_t size;
    ssize_t nw;
    unsigned seen_ok;
    struct frame_gen_ctx fg_ctx = {
        .fgc_stream = stream,
        .fgc_reader = reader,
        .fgc_nread_from_reader = 0,
    };

    seen_ok = 0;
    /* `size' is recomputed on every iteration by the comma expression: */
    while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0)
           || frame_gen_fin(&fg_ctx))
    {
        switch (stream_write_to_packet(&fg_ctx, size))
        {
        case SWTP_OK:
            if (!seen_ok++)
                maybe_conn_to_tickable_if_writeable(stream, 0);
            if (frame_gen_fin(&fg_ctx))
            {
                stream->stream_flags |= STREAM_FIN_SENT;
                goto end;
            }
            else
                break;
        case SWTP_STOP:
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            goto end;
        default:
            abort_connection(stream);
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            return -1;
        }
    }

    if (thresh)
    {
        assert(size < thresh);
        assert(size >= stream->sm_n_buffered);
        /* What is left after subtracting already-buffered bytes is the
         * reader's share: buffer it.
         */
        size -= stream->sm_n_buffered;
        if (size > 0)
        {
            nw = save_to_buffer(stream, reader, size);
            if (nw < 0)
                return -1;
            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
        }
    }
    else
    {
        /* We count flushed data towards both stream and connection limits,
         * so we should have been able to packetize all of it:
         */
        assert(0 == stream->sm_n_buffered);
        assert(size == 0);
    }

    maybe_mark_as_blocked(stream);

  end:
    return fg_ctx.fgc_nread_from_reader;
}


/* Perform an implicit flush when we hit connection limit while buffering
 * data.
This is to prevent a (theoretical) stall: 1709 * 1710 * Imagine a number of streams, all of which buffered some data. The buffered 1711 * data is up to connection cap, which means no further writes are possible. 1712 * None of them flushes, which means that data is not sent and connection 1713 * WINDOW_UPDATE frame never arrives from peer. Stall. 1714 */ 1715static int 1716maybe_flush_stream (struct lsquic_stream *stream) 1717{ 1718 if (stream->sm_n_buffered > 0 1719 && (stream->stream_flags & STREAM_CONN_LIMITED) 1720 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 1721 return stream_flush_nocheck(stream); 1722 else 1723 return 0; 1724} 1725 1726 1727static ssize_t 1728save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 1729 size_t len) 1730{ 1731 size_t avail, n_written; 1732 1733 assert(stream->sm_n_buffered + len <= SM_BUF_SIZE); 1734 1735 if (!stream->sm_buf) 1736 { 1737 stream->sm_buf = malloc(SM_BUF_SIZE); 1738 if (!stream->sm_buf) 1739 return -1; 1740 } 1741 1742 avail = lsquic_stream_write_avail(stream); 1743 if (avail < len) 1744 len = avail; 1745 1746 n_written = reader->lsqr_read(reader->lsqr_ctx, 1747 stream->sm_buf + stream->sm_n_buffered, len); 1748 stream->sm_n_buffered += n_written; 1749 incr_conn_cap(stream, n_written); 1750 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 1751 n_written, stream->sm_n_buffered); 1752 if (0 != maybe_flush_stream(stream)) 1753 return -1; 1754 return n_written; 1755} 1756 1757 1758static ssize_t 1759stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 1760{ 1761 size_t thresh, len; 1762 1763 thresh = lsquic_stream_flush_threshold(stream); 1764 len = reader->lsqr_size(reader->lsqr_ctx); 1765 if (stream->sm_n_buffered + len <= SM_BUF_SIZE && 1766 stream->sm_n_buffered + len < thresh) 1767 return save_to_buffer(stream, reader, len); 1768 else 1769 return stream_write_to_packets(stream, reader, thresh); 1770} 1771 1772 1773ssize_t 1774lsquic_stream_write 
(lsquic_stream_t *stream, const void *buf, size_t len) 1775{ 1776 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 1777 return lsquic_stream_writev(stream, &iov, 1); 1778} 1779 1780 1781struct inner_reader_iovec { 1782 const struct iovec *iov; 1783 const struct iovec *end; 1784 unsigned cur_iovec_off; 1785}; 1786 1787 1788static size_t 1789inner_reader_iovec_read (void *ctx, void *buf, size_t count) 1790{ 1791 struct inner_reader_iovec *const iro = ctx; 1792 unsigned char *p = buf; 1793 unsigned char *const end = p + count; 1794 unsigned n_tocopy; 1795 1796 while (iro->iov < iro->end && p < end) 1797 { 1798 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 1799 if (n_tocopy > (unsigned) (end - p)) 1800 n_tocopy = end - p; 1801 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 1802 n_tocopy); 1803 p += n_tocopy; 1804 iro->cur_iovec_off += n_tocopy; 1805 if (iro->iov->iov_len == iro->cur_iovec_off) 1806 { 1807 ++iro->iov; 1808 iro->cur_iovec_off = 0; 1809 } 1810 } 1811 1812 return p + count - end; 1813} 1814 1815 1816static size_t 1817inner_reader_iovec_size (void *ctx) 1818{ 1819 struct inner_reader_iovec *const iro = ctx; 1820 const struct iovec *iov; 1821 size_t size; 1822 1823 size = 0; 1824 for (iov = iro->iov; iov < iro->end; ++iov) 1825 size += iov->iov_len; 1826 1827 return size - iro->cur_iovec_off; 1828} 1829 1830 1831ssize_t 1832lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 1833 int iovcnt) 1834{ 1835 COMMON_WRITE_CHECKS(); 1836 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1837 1838 struct inner_reader_iovec iro = { 1839 .iov = iov, 1840 .end = iov + iovcnt, 1841 .cur_iovec_off = 0, 1842 }; 1843 struct lsquic_reader reader = { 1844 .lsqr_read = inner_reader_iovec_read, 1845 .lsqr_size = inner_reader_iovec_size, 1846 .lsqr_ctx = &iro, 1847 }; 1848 1849 return stream_write(stream, &reader); 1850} 1851 1852 1853ssize_t 1854lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader 
*reader) 1855{ 1856 COMMON_WRITE_CHECKS(); 1857 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1858 return stream_write(stream, reader); 1859} 1860 1861 1862int 1863lsquic_stream_send_headers (lsquic_stream_t *stream, 1864 const lsquic_http_headers_t *headers, int eos) 1865{ 1866 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT| 1867 STREAM_U_WRITE_DONE)) 1868 == STREAM_USE_HEADERS) 1869 { 1870 int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs, 1871 stream->id, headers, eos, lsquic_stream_priority(stream)); 1872 if (0 == s) 1873 { 1874 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 1875 stream->stream_flags |= STREAM_HEADERS_SENT; 1876 if (eos) 1877 stream->stream_flags |= STREAM_FIN_SENT; 1878 LSQ_INFO("sent headers for stream %u", stream->id); 1879 } 1880 else 1881 LSQ_WARN("could not send headers: %s", strerror(errno)); 1882 return s; 1883 } 1884 else 1885 { 1886 LSQ_INFO("cannot send headers for stream %u in this state", stream->id); 1887 errno = EBADMSG; 1888 return -1; 1889 } 1890} 1891 1892 1893void 1894lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 1895{ 1896 if (offset > stream->max_send_off) 1897 { 1898 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 1899 LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to " 1900 "0x%"PRIX64, stream->id, stream->max_send_off, offset); 1901 stream->max_send_off = offset; 1902 } 1903 else 1904 LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old " 1905 "max send offset 0x%"PRIX64", ignoring", stream->id, offset, 1906 stream->max_send_off); 1907} 1908 1909 1910/* This function is used to update offsets after handshake completes and we 1911 * learn of peer's limits from the handshake values. 
1912 */ 1913int 1914lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset) 1915{ 1916 LSQ_DEBUG("setting max_send_off to %u", offset); 1917 if (offset > stream->max_send_off) 1918 { 1919 lsquic_stream_window_update(stream, offset); 1920 return 0; 1921 } 1922 else if (offset < stream->tosend_off) 1923 { 1924 LSQ_INFO("new offset (%u bytes) is smaller than the amount of data " 1925 "already sent on this stream (%"PRIu64" bytes)", offset, 1926 stream->tosend_off); 1927 return -1; 1928 } 1929 else 1930 { 1931 stream->max_send_off = offset; 1932 return 0; 1933 } 1934} 1935 1936 1937void 1938lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code) 1939{ 1940 lsquic_stream_reset_ext(stream, error_code, 1); 1941} 1942 1943 1944void 1945lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code, 1946 int do_close) 1947{ 1948 if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT)) 1949 { 1950 LSQ_INFO("reset already sent"); 1951 return; 1952 } 1953 1954 SM_HISTORY_APPEND(stream, SHE_RESET); 1955 1956 LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code); 1957 stream->error_code = error_code; 1958 1959 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1960 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1961 next_send_stream); 1962 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 1963 stream->stream_flags |= STREAM_SEND_RST; 1964 1965 drop_buffered_data(stream); 1966 maybe_elide_stream_frames(stream); 1967 maybe_schedule_call_on_close(stream); 1968 1969 if (do_close) 1970 lsquic_stream_close(stream); 1971 else 1972 maybe_conn_to_tickable_if_writeable(stream, 1); 1973} 1974 1975 1976unsigned 1977lsquic_stream_id (const lsquic_stream_t *stream) 1978{ 1979 return stream->id; 1980} 1981 1982 1983struct lsquic_conn * 1984lsquic_stream_conn (const lsquic_stream_t *stream) 1985{ 1986 return stream->conn_pub->lconn; 1987} 1988 1989 1990int 1991lsquic_stream_close (lsquic_stream_t *stream) 1992{ 1993 
LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id); 1994 SM_HISTORY_APPEND(stream, SHE_CLOSE); 1995 if (lsquic_stream_is_closed(stream)) 1996 { 1997 LSQ_INFO("Attempt to close an already-closed stream %u", stream->id); 1998 errno = EBADF; 1999 return -1; 2000 } 2001 stream_shutdown_write(stream); 2002 stream_shutdown_read(stream); 2003 maybe_schedule_call_on_close(stream); 2004 maybe_finish_stream(stream); 2005 maybe_conn_to_tickable_if_writeable(stream, 1); 2006 return 0; 2007} 2008 2009 2010#ifndef NDEBUG 2011#if __GNUC__ 2012__attribute__((weak)) 2013#endif 2014#endif 2015void 2016lsquic_stream_acked (lsquic_stream_t *stream) 2017{ 2018 assert(stream->n_unacked); 2019 --stream->n_unacked; 2020 LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked); 2021 if (0 == stream->n_unacked) 2022 maybe_finish_stream(stream); 2023} 2024 2025 2026void 2027lsquic_stream_push_req (lsquic_stream_t *stream, 2028 struct uncompressed_headers *push_req) 2029{ 2030 assert(!stream->push_req); 2031 stream->push_req = push_req; 2032 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 2033} 2034 2035 2036int 2037lsquic_stream_is_pushed (const lsquic_stream_t *stream) 2038{ 2039 return 1 & ~stream->id; 2040} 2041 2042 2043int 2044lsquic_stream_push_info (const lsquic_stream_t *stream, 2045 uint32_t *ref_stream_id, void **hset) 2046{ 2047 if (lsquic_stream_is_pushed(stream)) 2048 { 2049 assert(stream->push_req); 2050 *ref_stream_id = stream->push_req->uh_stream_id; 2051 *hset = stream->push_req->uh_hset; 2052 return 0; 2053 } 2054 else 2055 return -1; 2056} 2057 2058 2059int 2060lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 2061{ 2062 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS) 2063 { 2064 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 2065 LSQ_DEBUG("received uncompressed headers for stream %u", stream->id); 2066 stream->stream_flags |= STREAM_HAVE_UH; 2067 if 
(uh->uh_flags & UH_FIN) 2068 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 2069 stream->uh = uh; 2070 if (uh->uh_oth_stream_id == 0) 2071 { 2072 if (uh->uh_weight) 2073 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 2074 } 2075 else 2076 LSQ_NOTICE("don't know how to depend on stream %u", 2077 uh->uh_oth_stream_id); 2078 return 0; 2079 } 2080 else 2081 { 2082 LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id); 2083 return -1; 2084 } 2085} 2086 2087 2088unsigned 2089lsquic_stream_priority (const lsquic_stream_t *stream) 2090{ 2091 return 256 - stream->sm_priority; 2092} 2093 2094 2095int 2096lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 2097{ 2098 /* The user should never get a reference to the special streams, 2099 * but let's check just in case: 2100 */ 2101 if (LSQUIC_STREAM_HANDSHAKE == stream->id 2102 || ((stream->stream_flags & STREAM_USE_HEADERS) && 2103 LSQUIC_STREAM_HEADERS == stream->id)) 2104 return -1; 2105 if (priority < 1 || priority > 256) 2106 return -1; 2107 stream->sm_priority = 256 - priority; 2108 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 2109 LSQ_DEBUG("set priority to %u", priority); 2110 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 2111 return 0; 2112} 2113 2114 2115int 2116lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 2117{ 2118 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 2119 { 2120 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) == 2121 (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) 2122 { 2123 /* We need to send headers only if we are a) using HEADERS stream 2124 * and b) we already sent initial headers. If initial headers 2125 * have not been sent yet, stream priority will be sent in the 2126 * HEADERS frame. 
2127 */ 2128 return lsquic_headers_stream_send_priority(stream->conn_pub->hs, 2129 stream->id, 0, 0, priority); 2130 } 2131 else 2132 return 0; 2133 } 2134 else 2135 return -1; 2136} 2137 2138 2139lsquic_stream_ctx_t * 2140lsquic_stream_get_ctx (const lsquic_stream_t *stream) 2141{ 2142 return stream->st_ctx; 2143} 2144 2145 2146int 2147lsquic_stream_refuse_push (lsquic_stream_t *stream) 2148{ 2149 if (lsquic_stream_is_pushed(stream) && 2150 !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST))) 2151 { 2152 LSQ_DEBUG("refusing pushed stream: send reset"); 2153 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 2154 return 0; 2155 } 2156 else 2157 return -1; 2158} 2159 2160 2161size_t 2162lsquic_stream_mem_used (const struct lsquic_stream *stream) 2163{ 2164 size_t size; 2165 2166 size = sizeof(stream); 2167 if (stream->sm_buf) 2168 size += SM_BUF_SIZE; 2169 if (stream->data_in) 2170 size += stream->data_in->di_if->di_mem_used(stream->data_in); 2171 2172 return size; 2173} 2174 2175 2176lsquic_cid_t 2177lsquic_stream_cid (const struct lsquic_stream *stream) 2178{ 2179 return LSQUIC_LOG_CONN_ID; 2180} 2181 2182 2183void * 2184lsquic_stream_get_hset (struct lsquic_stream *stream) 2185{ 2186 void *hset; 2187 2188 if (stream->stream_flags & STREAM_RST_FLAGS) 2189 { 2190 LSQ_INFO("%s: stream is reset, no headers returned", __func__); 2191 errno = ECONNRESET; 2192 return NULL; 2193 } 2194 2195 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 2196 != (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 2197 { 2198 LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__, 2199 stream->stream_flags); 2200 return NULL; 2201 } 2202 2203 if (!stream->uh) 2204 { 2205 LSQ_INFO("%s: headers unavailable (already fetched?)", __func__); 2206 return NULL; 2207 } 2208 2209 if (stream->uh->uh_flags & UH_H1H) 2210 { 2211 LSQ_INFO("%s: uncompressed headers have internal format", __func__); 2212 return NULL; 2213 } 2214 2215 hset = stream->uh->uh_hset; 2216 
stream->uh->uh_hset = NULL; 2217 destroy_uh(stream); 2218 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 2219 { 2220 stream->stream_flags |= STREAM_FIN_REACHED; 2221 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 2222 } 2223 LSQ_DEBUG("return header set"); 2224 return hset; 2225} 2226