lsquic_stream.c revision 483646eb
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <stddef.h> 28 29#include "lsquic.h" 30 31#include "lsquic_int_types.h" 32#include "lsquic_packet_common.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn_flow.h" 36#include "lsquic_rtt.h" 37#include "lsquic_sfcw.h" 38#include "lsquic_stream.h" 39#include "lsquic_conn_public.h" 40#include "lsquic_util.h" 41#include "lsquic_mm.h" 42#include "lsquic_headers_stream.h" 43#include "lsquic_frame_reader.h" 44#include "lsquic_conn.h" 45#include "lsquic_data_in_if.h" 46#include "lsquic_parse.h" 47#include "lsquic_packet_out.h" 48#include "lsquic_engine_public.h" 49#include "lsquic_senhist.h" 50#include "lsquic_pacer.h" 51#include "lsquic_cubic.h" 52#include "lsquic_send_ctl.h" 53#include "lsquic_ev_log.h" 54 55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid 57#define LSQUIC_LOG_STREAM_ID stream->id 58#include "lsquic_logger.h" 59 60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ 61 62static void 63drop_frames_in (lsquic_stream_t *stream); 64 65static void 66maybe_schedule_call_on_close (lsquic_stream_t *stream); 67 68static int 69stream_wantread (lsquic_stream_t *stream, int is_want); 70 71static int 72stream_wantwrite (lsquic_stream_t *stream, int is_want); 73 74static ssize_t 75stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 76 77static ssize_t 78save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 79 80static int 81stream_flush (lsquic_stream_t *stream); 82 83static int 84stream_flush_nocheck (lsquic_stream_t *stream); 85 86static void 87maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag); 88 89 90#if LSQUIC_KEEP_STREAM_HISTORY 91/* These values are printable ASCII characters for ease of printing the 92 * whole history in a single line of a log message. 93 * 94 * The list of events is not exhaustive: only most interesting events 95 * are recorded. 96 */ 97enum stream_history_event 98{ 99 SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ 100 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 101 SHE_REACH_FIN = 'a', 102 SHE_BLOCKED_OUT = 'b', 103 SHE_CREATED = 'C', 104 SHE_FRAME_IN = 'd', 105 SHE_FRAME_OUT = 'D', 106 SHE_RESET = 'e', 107 SHE_WINDOW_UPDATE = 'E', 108 SHE_FIN_IN = 'f', 109 SHE_FINISHED = 'F', 110 SHE_GOAWAY_IN = 'g', 111 SHE_USER_WRITE_HEADER = 'h', 112 SHE_HEADERS_IN = 'H', 113 SHE_ONCLOSE_SCHED = 'l', 114 SHE_ONCLOSE_CALL = 'L', 115 SHE_ONNEW = 'N', 116 SHE_SET_PRIO = 'p', 117 SHE_USER_READ = 'r', 118 SHE_SHUTDOWN_READ = 'R', 119 SHE_RST_IN = 's', 120 SHE_RST_OUT = 't', 121 SHE_FLUSH = 'u', 122 SHE_USER_WRITE_DATA = 'w', 123 SHE_SHUTDOWN_WRITE = 'W', 124 SHE_CLOSE = 'X', 125 SHE_FORCE_FINISH = 'Z', 126}; 127 128static void 129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 130{ 131 enum stream_history_event prev_event; 132 sm_hist_idx_t idx; 133 int plus; 134 135 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 136 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 137 idx = (idx - plus) & SM_HIST_IDX_MASK; 138 prev_event = stream->sm_hist_buf[idx]; 139 140 if (prev_event == sh_event && plus) 141 return; 142 143 if (prev_event == sh_event) 144 sh_event = SHE_PLUS; 145 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 146 147 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 148 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 149 stream->sm_hist_buf); 150} 151 152 153# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 154# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 155 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 156 LSQ_DEBUG("history: [%.*s]", \ 157 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 158 (stream)->sm_hist_buf); \ 159 } while (0) 160#else 161# define SM_HISTORY_APPEND(stream, event) 162# define SM_HISTORY_DUMP_REMAINING(stream) 163#endif 164 165 166static int 167stream_inside_callback (const lsquic_stream_t *stream) 168{ 169 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 170} 171 172 173static void 174maybe_conn_to_tickable (lsquic_stream_t *stream) 175{ 176 if (!stream_inside_callback(stream)) 177 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 178 stream->conn_pub->lconn); 179} 180 181 182/* Here, "readable" means that the user is able to read from the stream. */ 183static void 184maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 185{ 186 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 187 { 188 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 189 stream->conn_pub->lconn); 190 } 191} 192 193 194/* Here, "writeable" means that data can be put into packets to be 195 * scheduled to be sent out. 196 * 197 * If `check_can_send' is false, it means that we do not need to check 198 * whether packets can be sent. This check was already performed when 199 * we packetized stream data. 200 */ 201static void 202maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream, 203 int check_can_send) 204{ 205 if (!stream_inside_callback(stream) && 206 (!check_can_send 207 || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) && 208 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 209 { 210 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 211 stream->conn_pub->lconn); 212 } 213} 214 215 216static int 217stream_stalled (const lsquic_stream_t *stream) 218{ 219 return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) && 220 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 221 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 222} 223 224 225/* TODO: The logic to figure out whether the stream is connection limited 226 * should be taken out of the constructor. The caller should specify this 227 * via one of enum stream_ctor_flags. 228 */ 229lsquic_stream_t * 230lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub, 231 const struct lsquic_stream_if *stream_if, 232 void *stream_if_ctx, unsigned initial_window, 233 unsigned initial_send_off, 234 enum stream_ctor_flags ctor_flags) 235{ 236 lsquic_cfcw_t *cfcw; 237 lsquic_stream_t *stream; 238 239 stream = calloc(1, sizeof(*stream)); 240 if (!stream) 241 return NULL; 242 243 stream->stream_if = stream_if; 244 stream->id = id; 245 stream->conn_pub = conn_pub; 246 stream->sm_onnew_arg = stream_if_ctx; 247 if (!initial_window) 248 initial_window = 16 * 1024; 249 if (LSQUIC_STREAM_HANDSHAKE == id || 250 (conn_pub->hs && LSQUIC_STREAM_HEADERS == id)) 251 cfcw = NULL; 252 else 253 { 254 cfcw = &conn_pub->cfcw; 255 stream->stream_flags |= STREAM_CONN_LIMITED; 256 if (conn_pub->hs) 257 stream->stream_flags |= STREAM_USE_HEADERS; 258 lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO); 259 } 260 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 261 if (!initial_send_off) 262 initial_send_off = 16 * 1024; 263 stream->max_send_off = initial_send_off; 264 if (ctor_flags & SCF_USE_DI_HASH) 265 stream->data_in = data_in_hash_new(conn_pub, id, 0); 266 else 267 stream->data_in = data_in_nocopy_new(conn_pub, id); 268 LSQ_DEBUG("created stream %u @%p", id, stream); 269 SM_HISTORY_APPEND(stream, SHE_CREATED); 270 if (ctor_flags & SCF_DI_AUTOSWITCH) 271 stream->stream_flags |= STREAM_AUTOSWITCH; 272 if (ctor_flags & SCF_CALL_ON_NEW) 273 lsquic_stream_call_on_new(stream); 274 if (ctor_flags & SCF_DISP_RW_ONCE) 275 stream->stream_flags |= STREAM_RW_ONCE; 276 if (ctor_flags & SCF_ALLOW_OVERLAP) 277 stream->stream_flags |= STREAM_ALLOW_OVERLAP; 278 return stream; 279} 280 281 282void 283lsquic_stream_call_on_new (lsquic_stream_t *stream) 284{ 285 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 286 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 287 { 288 LSQ_DEBUG("calling on_new_stream"); 289 SM_HISTORY_APPEND(stream, SHE_ONNEW); 290 stream->stream_flags |= STREAM_ONNEW_DONE; 291 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 292 stream); 293 } 294} 295 296 297static void 298decr_conn_cap (struct lsquic_stream *stream, size_t incr) 299{ 300 if (stream->stream_flags & STREAM_CONN_LIMITED) 301 { 302 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 303 stream->conn_pub->conn_cap.cc_sent -= incr; 304 } 305} 306 307 308static void 309drop_buffered_data (struct lsquic_stream *stream) 310{ 311 decr_conn_cap(stream, stream->sm_n_buffered); 312 stream->sm_n_buffered = 0; 313 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 314 maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS); 315} 316 317 318void 319lsquic_stream_destroy (lsquic_stream_t *stream) 320{ 321 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 322 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 323 STREAM_ONNEW_DONE) 324 { 325 stream->stream_flags |= STREAM_ONCLOSE_DONE; 326 stream->stream_if->on_close(stream, stream->st_ctx); 327 } 328 if (stream->stream_flags & STREAM_SENDING_FLAGS) 329 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 330 if (stream->stream_flags & STREAM_WANT_READ) 331 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 332 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 333 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 334 if (stream->stream_flags & STREAM_SERVICE_FLAGS) 335 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 336 drop_buffered_data(stream); 337 lsquic_sfcw_consume_rem(&stream->fc); 338 drop_frames_in(stream); 339 free(stream->push_req); 340 free(stream->uh); 341 free(stream->sm_buf); 342 LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream); 343 SM_HISTORY_DUMP_REMAINING(stream); 344 free(stream); 345} 346 347 348static int 349stream_is_finished (const lsquic_stream_t *stream) 350{ 351 return lsquic_stream_is_closed(stream) 352 /* n_unacked checks that no outgoing packets that reference this 353 * stream are outstanding: 354 */ 355 && 0 == stream->n_unacked 356 /* This checks that no packets that reference this stream will 357 * become outstanding: 358 */ 359 && 0 == (stream->stream_flags & STREAM_SEND_RST) 360 && ((stream->stream_flags & STREAM_FORCE_FINISH) 361 || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)) 362 && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD)))); 363} 364 365 366static void 367maybe_finish_stream (lsquic_stream_t *stream) 368{ 369 if (0 == (stream->stream_flags & STREAM_FINISHED) && 370 stream_is_finished(stream)) 371 { 372 LSQ_DEBUG("stream %u is now finished", stream->id); 373 SM_HISTORY_APPEND(stream, SHE_FINISHED); 374 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 375 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 376 next_service_stream); 377 stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED; 378 } 379} 380 381 382static void 383maybe_schedule_call_on_close (lsquic_stream_t *stream) 384{ 385 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 386 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE)) 387 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)) 388 { 389 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 390 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 391 next_service_stream); 392 stream->stream_flags |= STREAM_CALL_ONCLOSE; 393 LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id); 394 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 395 } 396} 397 398 399void 400lsquic_stream_call_on_close (lsquic_stream_t *stream) 401{ 402 assert(stream->stream_flags & STREAM_ONNEW_DONE); 403 stream->stream_flags &= ~STREAM_CALL_ONCLOSE; 404 if (!(stream->stream_flags & STREAM_SERVICE_FLAGS)) 405 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 406 next_service_stream); 407 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 408 { 409 LSQ_DEBUG("calling on_close for stream %u", stream->id); 410 stream->stream_flags |= STREAM_ONCLOSE_DONE; 411 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 412 stream->stream_if->on_close(stream, stream->st_ctx); 413 } 414 else 415 assert(0); 416} 417 418 419int 420lsquic_stream_readable (const lsquic_stream_t *stream) 421{ 422 /* A stream is readable if one of the following is true: */ 423 return 424 /* - It is already finished: in that case, lsquic_stream_read() will 425 * return 0. 426 */ 427 (stream->stream_flags & STREAM_FIN_REACHED) 428 /* - The stream is reset, by either side. In this case, 429 * lsquic_stream_read() will return -1 (we want the user to be 430 * able to collect the error). 431 */ 432 || (stream->stream_flags & STREAM_RST_FLAGS) 433 /* - Either we are not in HTTP mode or the HTTP headers have been 434 * received and the headers or data from the stream can be read. 435 */ 436 || (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 437 == STREAM_USE_HEADERS) 438 && (stream->uh != NULL 439 || stream->data_in->di_if->di_get_frame(stream->data_in, 440 stream->read_offset))) 441 ; 442} 443 444 445size_t 446lsquic_stream_write_avail (const struct lsquic_stream *stream) 447{ 448 uint64_t stream_avail, conn_avail; 449 450 stream_avail = stream->max_send_off - stream->tosend_off 451 - stream->sm_n_buffered; 452 if (stream->stream_flags & STREAM_CONN_LIMITED) 453 { 454 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 455 if (conn_avail < stream_avail) 456 return conn_avail; 457 } 458 459 return stream_avail; 460} 461 462 463int 464lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 465{ 466 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 467 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 468 { 469 return -1; 470 } 471 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 472 { 473 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 474 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 475 next_send_stream); 476 stream->stream_flags |= STREAM_SEND_WUF; 477 } 478 return 0; 479} 480 481 482int 483lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 484{ 485 uint64_t max_off; 486 int got_next_offset; 487 enum ins_frame ins_frame; 488 489 assert(frame->packet_in); 490 491 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 492 LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; " 493 "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 494 495 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) == 496 (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) 497 { 498 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 499 lsquic_malo_put(frame); 500 return -1; 501 } 502 503 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 504 insert_frame: 505 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 506 if (INS_FRAME_OK == ins_frame) 507 { 508 /* Update maximum offset in the flow controller and check for flow 509 * control violation: 510 */ 511 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 512 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 513 return -1; 514 if (frame->data_frame.df_fin) 515 { 516 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 517 stream->stream_flags |= STREAM_FIN_RECVD; 518 maybe_finish_stream(stream); 519 } 520 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 521 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 522 { 523 stream->data_in = stream->data_in->di_if->di_switch_impl( 524 stream->data_in, stream->read_offset); 525 if (!stream->data_in) 526 { 527 stream->data_in = data_in_error_new(); 528 return -1; 529 } 530 } 531 if (got_next_offset) 532 /* Checking the offset saves di_get_frame() call */ 533 maybe_conn_to_tickable_if_readable(stream); 534 return 0; 535 } 536 else if (INS_FRAME_DUP == ins_frame) 537 { 538 return 0; 539 } 540 else if (INS_FRAME_OVERLAP == ins_frame) 541 { 542 if (stream->stream_flags & STREAM_ALLOW_OVERLAP) 543 { 544 LSQ_DEBUG("overlap: switching DATA IN implementation"); 545 stream->data_in = stream->data_in->di_if->di_switch_impl( 546 stream->data_in, stream->read_offset); 547 if (stream->data_in) 548 goto insert_frame; 549 stream->data_in = data_in_error_new(); 550 } 551 else 552 LSQ_DEBUG("overlap not supported"); 553 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 554 lsquic_malo_put(frame); 555 return -1; 556 } 557 else 558 { 559 assert(INS_FRAME_ERR == ins_frame); 560 return -1; 561 } 562} 563 564 565static void 566drop_frames_in (lsquic_stream_t *stream) 567{ 568 if (stream->data_in) 569 { 570 stream->data_in->di_if->di_destroy(stream->data_in); 571 /* To avoid checking whether `data_in` is set, just set to the error 572 * data-in stream. It does the right thing after incoming data is 573 * dropped. 574 */ 575 stream->data_in = data_in_error_new(); 576 } 577} 578 579 580static void 581maybe_elide_stream_frames (struct lsquic_stream *stream) 582{ 583 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 584 { 585 if (stream->n_unacked) 586 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 587 stream->id); 588 stream->stream_flags |= STREAM_FRAMES_ELIDED; 589 } 590} 591 592 593int 594lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 595 uint32_t error_code) 596{ 597 598 if (stream->stream_flags & STREAM_RST_RECVD) 599 { 600 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 601 return 0; 602 } 603 604 SM_HISTORY_APPEND(stream, SHE_RST_IN); 605 /* This flag must always be set, even if we are "ignoring" it: it is 606 * used by elision code. 607 */ 608 stream->stream_flags |= STREAM_RST_RECVD; 609 610 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 611 { 612 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is " 613 "smaller than that of byte following the last byte we have seen: " 614 "0x%"PRIX64, stream->id, offset, 615 lsquic_sfcw_get_max_recv_off(&stream->fc)); 616 return -1; 617 } 618 619 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 620 { 621 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64 622 " violates flow control", stream->id, offset); 623 return -1; 624 } 625 626 /* Let user collect error: */ 627 maybe_conn_to_tickable_if_readable(stream); 628 629 lsquic_sfcw_consume_rem(&stream->fc); 630 drop_frames_in(stream); 631 drop_buffered_data(stream); 632 maybe_elide_stream_frames(stream); 633 634 if (!(stream->stream_flags & 635 (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT))) 636 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 637 638 stream->stream_flags |= STREAM_RST_RECVD; 639 640 maybe_finish_stream(stream); 641 maybe_schedule_call_on_close(stream); 642 643 return 0; 644} 645 646 647uint64_t 648lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 649{ 650 assert(stream->stream_flags & STREAM_SEND_WUF); 651 stream->stream_flags &= ~STREAM_SEND_WUF; 652 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 653 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 654 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 655} 656 657 658void 659lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 660{ 661 assert(stream->stream_flags & STREAM_SEND_BLOCKED); 662 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 663 stream->stream_flags &= ~STREAM_SEND_BLOCKED; 664 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 665 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 666} 667 668 669void 670lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 671{ 672 assert(stream->stream_flags & STREAM_SEND_RST); 673 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 674 stream->stream_flags &= ~STREAM_SEND_RST; 675 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 676 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 677 stream->stream_flags |= STREAM_RST_SENT; 678 maybe_finish_stream(stream); 679} 680 681 682static size_t 683read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len) 684{ 685 struct uncompressed_headers *uh = stream->uh; 686 size_t n_avail = uh->uh_size - uh->uh_off; 687 if (n_avail < len) 688 len = n_avail; 689 memcpy(dst, uh->uh_headers + uh->uh_off, len); 690 uh->uh_off += len; 691 if (uh->uh_off == uh->uh_size) 692 { 693 LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id); 694 free(uh); 695 stream->uh = NULL; 696 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 697 { 698 stream->stream_flags |= STREAM_FIN_REACHED; 699 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 700 } 701 } 702 return len; 703} 704 705 706/* This function returns 0 when EOF is reached. 707 */ 708ssize_t 709lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov, 710 int iovcnt) 711{ 712 size_t total_nread, nread; 713 int processed_frames, read_unc_headers, iovidx; 714 unsigned char *p, *end; 715 716 SM_HISTORY_APPEND(stream, SHE_USER_READ); 717 718#define NEXT_IOV() do { \ 719 ++iovidx; \ 720 while (iovidx < iovcnt && 0 == iov[iovidx].iov_len) \ 721 ++iovidx; \ 722 if (iovidx < iovcnt) \ 723 { \ 724 p = iov[iovidx].iov_base; \ 725 end = p + iov[iovidx].iov_len; \ 726 } \ 727 else \ 728 p = end = NULL; \ 729} while (0) 730 731#define AVAIL() (end - p) 732 733 if (stream->stream_flags & STREAM_RST_FLAGS) 734 { 735 errno = ECONNRESET; 736 return -1; 737 } 738 if (stream->stream_flags & STREAM_U_READ_DONE) 739 { 740 errno = EBADF; 741 return -1; 742 } 743 if (stream->stream_flags & STREAM_FIN_REACHED) 744 return 0; 745 746 total_nread = 0; 747 processed_frames = 0; 748 749 iovidx = -1; 750 NEXT_IOV(); 751 752 if (stream->uh && AVAIL()) 753 { 754 read_unc_headers = 1; 755 do 756 { 757 nread = read_uh(stream, p, AVAIL()); 758 p += nread; 759 total_nread += nread; 760 if (p == end) 761 NEXT_IOV(); 762 } 763 while (stream->uh && AVAIL()); 764 } 765 else 766 read_unc_headers = 0; 767 768 struct data_frame *data_frame; 769 while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset))) 770 { 771 ++processed_frames; 772 size_t navail = data_frame->df_size - data_frame->df_read_off; 773 size_t ntowrite = AVAIL(); 774 if (navail < ntowrite) 775 ntowrite = navail; 776 memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite); 777 p += ntowrite; 778 data_frame->df_read_off += ntowrite; 779 stream->read_offset += ntowrite; 780 total_nread += ntowrite; 781 if (data_frame->df_read_off == data_frame->df_size) 782 { 783 const int fin = data_frame->df_fin; 784 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 785 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 786 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 787 { 788 stream->data_in = stream->data_in->di_if->di_switch_impl( 789 stream->data_in, stream->read_offset); 790 if (!stream->data_in) 791 { 792 stream->data_in = data_in_error_new(); 793 return -1; 794 } 795 } 796 if (fin) 797 { 798 stream->stream_flags |= STREAM_FIN_REACHED; 799 break; 800 } 801 } 802 if (p == end) 803 NEXT_IOV(); 804 } 805 806 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 807 total_nread, stream->read_offset); 808 809 if (processed_frames) 810 { 811 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 812 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 813 { 814 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 815 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 816 stream->stream_flags |= STREAM_SEND_WUF; 817 maybe_conn_to_tickable_if_writeable(stream, 1); 818 } 819 } 820 821 if (processed_frames || read_unc_headers) 822 { 823 return total_nread; 824 } 825 else 826 { 827 assert(0 == total_nread); 828 errno = EWOULDBLOCK; 829 return -1; 830 } 831} 832 833 834ssize_t 835lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 836{ 837 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 838 return lsquic_stream_readv(stream, &iov, 1); 839} 840 841 842static void 843stream_shutdown_read (lsquic_stream_t *stream) 844{ 845 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 846 { 847 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 848 stream->stream_flags |= STREAM_U_READ_DONE; 849 stream_wantread(stream, 0); 850 maybe_finish_stream(stream); 851 } 852} 853 854 855static void 856stream_shutdown_write (lsquic_stream_t *stream) 857{ 858 if (stream->stream_flags & STREAM_U_WRITE_DONE) 859 return; 860 861 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 862 stream->stream_flags |= STREAM_U_WRITE_DONE; 863 stream_wantwrite(stream, 0); 864 865 /* Don't bother to check whether there is anything else to write if 866 * the flags indicate that nothing else should be written. 867 */ 868 if (!(stream->stream_flags & 869 (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT))) 870 { 871 if (stream->sm_n_buffered == 0) 872 { 873 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 874 stream)) 875 { 876 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 877 stream->stream_flags |= STREAM_FIN_SENT; 878 } 879 else 880 { 881 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 882 "flag in it"); 883 (void) stream_flush_nocheck(stream); 884 } 885 } 886 else 887 (void) stream_flush_nocheck(stream); 888 } 889} 890 891 892int 893lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 894{ 895 LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how); 896 if (lsquic_stream_is_closed(stream)) 897 { 898 LSQ_INFO("Attempt to shut down a closed stream %u", stream->id); 899 errno = EBADF; 900 return -1; 901 } 902 /* 0: read, 1: write: 2: read and write 903 */ 904 if (how < 0 || how > 2) 905 { 906 errno = EINVAL; 907 return -1; 908 } 909 910 if (how) 911 stream_shutdown_write(stream); 912 if (how != 1) 913 stream_shutdown_read(stream); 914 915 maybe_finish_stream(stream); 916 maybe_schedule_call_on_close(stream); 917 if (how) 918 maybe_conn_to_tickable_if_writeable(stream, 1); 919 920 return 0; 921} 922 923 924void 925lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 926{ 927 LSQ_DEBUG("internal shutdown of stream %u", stream->id); 928 if (LSQUIC_STREAM_HANDSHAKE == stream->id 929 || ((stream->stream_flags & STREAM_USE_HEADERS) && 930 LSQUIC_STREAM_HEADERS == stream->id)) 931 { 932 LSQ_DEBUG("add flag to force-finish special stream %u", stream->id); 933 stream->stream_flags |= STREAM_FORCE_FINISH; 934 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 935 } 936 maybe_finish_stream(stream); 937 maybe_schedule_call_on_close(stream); 938} 939 940 941static void 942fake_reset_unused_stream (lsquic_stream_t *stream) 943{ 944 stream->stream_flags |= 945 STREAM_RST_RECVD /* User will pick this up on read or write */ 946 | STREAM_RST_SENT /* Don't send anything else on this stream */ 947 ; 948 949 /* Cancel all writes to the network scheduled for this stream: */ 950 if (stream->stream_flags & STREAM_SENDING_FLAGS) 951 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 952 next_send_stream); 953 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 954 955 LSQ_DEBUG("fake-reset stream %u%s", 956 stream->id, stream_stalled(stream) ? " (stalled)" : ""); 957 maybe_finish_stream(stream); 958 maybe_schedule_call_on_close(stream); 959} 960 961 962/* This function should only be called for locally-initiated streams whose ID 963 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 964 * frame sent by peer but we have not yet received it and created a stream. 965 * In this situation, we mark the stream as reset, so that user's on_read or 966 * on_write event callback picks up the error. That, in turn, should result 967 * in stream being closed. 968 * 969 * If we have received any data frames on this stream, this probably indicates 970 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 971 * lower than this. However, we still try to handle it gracefully and peform 972 * a shutdown, as if the stream was not reset. 973 */ 974void 975lsquic_stream_received_goaway (lsquic_stream_t *stream) 976{ 977 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 978 if (0 == stream->read_offset && 979 stream->data_in->di_if->di_empty(stream->data_in)) 980 fake_reset_unused_stream(stream); /* Normal condition */ 981 else 982 { /* This is odd, let's handle it the best we can: */ 983 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 984 lsquic_stream_shutdown_internal(stream); 985 } 986} 987 988 989uint64_t 990lsquic_stream_read_offset (const lsquic_stream_t *stream) 991{ 992 return stream->read_offset; 993} 994 995 996static int 997stream_wantread (lsquic_stream_t *stream, int is_want) 998{ 999 const int old_val = !!(stream->stream_flags & STREAM_WANT_READ); 1000 const int new_val = !!is_want; 1001 if (old_val != new_val) 1002 { 1003 if (new_val) 1004 { 1005 if (!old_val) 1006 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 1007 next_read_stream); 1008 stream->stream_flags |= STREAM_WANT_READ; 1009 } 1010 else 1011 { 1012 stream->stream_flags &= ~STREAM_WANT_READ; 1013 if (old_val) 1014 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 1015 next_read_stream); 1016 } 1017 } 1018 return old_val; 1019} 1020 1021 1022static void 1023maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1024{ 1025 assert(STREAM_WRITE_Q_FLAGS & flag); 1026 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1027 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1028 next_write_stream); 1029 stream->stream_flags |= flag; 1030} 1031 1032 1033static void 1034maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1035{ 1036 assert(STREAM_WRITE_Q_FLAGS & flag); 1037 if (stream->stream_flags & flag) 1038 { 1039 stream->stream_flags &= ~flag; 1040 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1041 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1042 next_write_stream); 1043 } 1044} 1045 1046 1047static int 1048stream_wantwrite (lsquic_stream_t *stream, int is_want) 1049{ 1050 const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE); 1051 const int new_val = !!is_want; 1052 if (old_val != new_val) 1053 { 1054 if (new_val) 1055 maybe_put_onto_write_q(stream, STREAM_WANT_WRITE); 1056 else 1057 maybe_remove_from_write_q(stream, STREAM_WANT_WRITE); 1058 } 1059 return old_val; 1060} 1061 1062 1063int 1064lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1065{ 1066 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1067 { 1068 if (is_want) 1069 maybe_conn_to_tickable_if_readable(stream); 1070 return stream_wantread(stream, is_want); 1071 } 1072 else 1073 { 1074 errno = EBADF; 1075 return -1; 1076 } 1077} 1078 1079 1080int 1081lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1082{ 1083 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)) 1084 { 1085 if (is_want) 1086 maybe_conn_to_tickable_if_writeable(stream, 1); 1087 return stream_wantwrite(stream, is_want); 1088 } 1089 else 1090 { 1091 errno = EBADF; 1092 return -1; 1093 } 1094} 1095 1096 1097#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE| \ 1098 STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST) 1099 1100 1101static void 1102stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1103{ 1104 unsigned no_progress_count, no_progress_limit; 1105 enum stream_flags flags; 1106 uint64_t size; 1107 1108 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1109 1110 no_progress_count = 0; 1111 while ((stream->stream_flags & STREAM_WANT_READ) 1112 && lsquic_stream_readable(stream)) 1113 { 1114 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1115 size = stream->read_offset; 1116 1117 stream->stream_if->on_read(stream, stream->st_ctx); 1118 1119 if (no_progress_limit && size == stream->read_offset && 1120 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1121 { 1122 ++no_progress_count; 1123 if (no_progress_count >= no_progress_limit) 1124 { 1125 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1126 "progress) in user code reading from stream", 1127 no_progress_count, 1128 no_progress_count == 1 ? "" : "s"); 1129 break; 1130 } 1131 } 1132 else 1133 no_progress_count = 0; 1134 } 1135} 1136 1137 1138static void 1139stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1140{ 1141 unsigned no_progress_count, no_progress_limit; 1142 enum stream_flags flags; 1143 1144 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1145 1146 no_progress_count = 0; 1147 stream->stream_flags |= STREAM_LAST_WRITE_OK; 1148 while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)) 1149 == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK) 1150 && lsquic_stream_write_avail(stream)) 1151 { 1152 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1153 1154 stream->stream_if->on_write(stream, stream->st_ctx); 1155 1156 if (no_progress_limit && 1157 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1158 { 1159 ++no_progress_count; 1160 if (no_progress_count >= no_progress_limit) 1161 { 1162 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1163 "progress) in user code writing to stream", 1164 no_progress_count, 1165 no_progress_count == 1 ? "" : "s"); 1166 break; 1167 } 1168 } 1169 else 1170 no_progress_count = 0; 1171 } 1172} 1173 1174 1175static void 1176stream_dispatch_read_events_once (lsquic_stream_t *stream) 1177{ 1178 if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream)) 1179 { 1180 stream->stream_if->on_read(stream, stream->st_ctx); 1181 } 1182} 1183 1184 1185static void 1186maybe_mark_as_blocked (lsquic_stream_t *stream) 1187{ 1188 struct lsquic_conn_cap *cc; 1189 1190 if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered) 1191 { 1192 if (stream->blocked_off < stream->max_send_off) 1193 { 1194 stream->blocked_off = stream->max_send_off + stream->sm_n_buffered; 1195 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1196 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1197 next_send_stream); 1198 stream->stream_flags |= STREAM_SEND_BLOCKED; 1199 LSQ_DEBUG("marked stream-blocked at stream offset " 1200 "%"PRIu64, stream->blocked_off); 1201 } 1202 else 1203 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 1204 " has been, or is about to be, sent", stream->blocked_off); 1205 } 1206 1207 if ((stream->stream_flags & STREAM_CONN_LIMITED) 1208 && (cc = &stream->conn_pub->conn_cap, 1209 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 1210 { 1211 if (cc->cc_blocked < cc->cc_max) 1212 { 1213 cc->cc_blocked = cc->cc_max; 1214 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 1215 LSQ_DEBUG("marked connection-blocked at connection offset " 1216 "%"PRIu64, cc->cc_max); 1217 } 1218 else 1219 LSQ_DEBUG("stream has already been marked connection-blocked " 1220 "at offset %"PRIu64, cc->cc_blocked); 1221 } 1222} 1223 1224 1225void 1226lsquic_stream_dispatch_read_events (lsquic_stream_t *stream) 1227{ 1228 assert(stream->stream_flags & STREAM_WANT_READ); 1229 1230 if (stream->stream_flags & STREAM_RW_ONCE) 1231 stream_dispatch_read_events_once(stream); 1232 else 1233 stream_dispatch_read_events_loop(stream); 1234} 1235 1236 1237void 1238lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 1239{ 1240 int progress; 1241 uint64_t tosend_off; 1242 unsigned short n_buffered; 1243 enum stream_flags flags; 1244 1245 assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS); 1246 flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS; 1247 tosend_off = stream->tosend_off; 1248 n_buffered = stream->sm_n_buffered; 1249 1250 if (stream->stream_flags & STREAM_WANT_FLUSH) 1251 (void) stream_flush(stream); 1252 1253 if (stream->stream_flags & STREAM_RW_ONCE) 1254 { 1255 if ((stream->stream_flags & STREAM_WANT_WRITE) 1256 && lsquic_stream_write_avail(stream)) 1257 { 1258 stream->stream_if->on_write(stream, stream->st_ctx); 1259 } 1260 } 1261 else 1262 stream_dispatch_write_events_loop(stream); 1263 1264 /* Progress means either flags or offsets changed: */ 1265 progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags && 1266 stream->tosend_off == tosend_off && 1267 stream->sm_n_buffered == n_buffered); 1268 1269 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 1270 { 1271 if (progress) 1272 { /* Move the stream to the end of the list to ensure fairness. */ 1273 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1274 next_write_stream); 1275 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1276 next_write_stream); 1277 } 1278 } 1279} 1280 1281 1282static size_t 1283inner_reader_empty_size (void *ctx) 1284{ 1285 return 0; 1286} 1287 1288 1289static size_t 1290inner_reader_empty_read (void *ctx, void *buf, size_t count) 1291{ 1292 return 0; 1293} 1294 1295 1296static int 1297stream_flush (lsquic_stream_t *stream) 1298{ 1299 struct lsquic_reader empty_reader; 1300 ssize_t nw; 1301 1302 assert(stream->stream_flags & STREAM_WANT_FLUSH); 1303 assert(stream->sm_n_buffered > 0 || 1304 /* Flushing is also used to packetize standalone FIN: */ 1305 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 1306 == STREAM_U_WRITE_DONE)); 1307 1308 empty_reader.lsqr_size = inner_reader_empty_size; 1309 empty_reader.lsqr_read = inner_reader_empty_read; 1310 empty_reader.lsqr_ctx = NULL; /* pro forma */ 1311 nw = stream_write_to_packets(stream, &empty_reader, 0); 1312 1313 if (nw >= 0) 1314 { 1315 assert(nw == 0); /* Empty reader: must have read zero bytes */ 1316 return 0; 1317 } 1318 else 1319 return -1; 1320} 1321 1322 1323static int 1324stream_flush_nocheck (lsquic_stream_t *stream) 1325{ 1326 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered; 1327 maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH); 1328 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 1329 1330 return stream_flush(stream); 1331} 1332 1333 1334int 1335lsquic_stream_flush (lsquic_stream_t *stream) 1336{ 1337 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1338 { 1339 LSQ_DEBUG("cannot flush closed stream"); 1340 errno = EBADF; 1341 return -1; 1342 } 1343 1344 if (0 == stream->sm_n_buffered) 1345 { 1346 LSQ_DEBUG("flushing 0 bytes: noop"); 1347 return 0; 1348 } 1349 1350 return stream_flush_nocheck(stream); 1351} 1352 1353 1354/* The flush threshold is the maximum size of stream data that can be sent 1355 * in a full packet. 1356 */ 1357#ifdef NDEBUG 1358static 1359#endif 1360 size_t 1361lsquic_stream_flush_threshold (const struct lsquic_stream *stream) 1362{ 1363 enum packet_out_flags flags; 1364 enum lsquic_packno_bits bits; 1365 unsigned packet_header_sz, stream_header_sz; 1366 size_t threshold; 1367 1368 bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl); 1369 flags = bits << POBIT_SHIFT; 1370 if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)) 1371 flags |= PO_CONN_ID; 1372 if (LSQUIC_STREAM_HANDSHAKE == stream->id) 1373 flags |= PO_LONGHEAD; 1374 1375 packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags); 1376 stream_header_sz = stream->conn_pub->lconn->cn_pf 1377 ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off); 1378 1379 threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ 1380 - packet_header_sz - stream_header_sz; 1381 return threshold; 1382} 1383 1384 1385#define COMMON_WRITE_CHECKS() do { \ 1386 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) \ 1387 == STREAM_USE_HEADERS) \ 1388 { \ 1389 LSQ_INFO("Attempt to write to stream before sending HTTP headers"); \ 1390 errno = EILSEQ; \ 1391 return -1; \ 1392 } \ 1393 if (stream->stream_flags & STREAM_RST_FLAGS) \ 1394 { \ 1395 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 1396 errno = ECONNRESET; \ 1397 return -1; \ 1398 } \ 1399 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 1400 { \ 1401 LSQ_INFO("Attempt to write to stream after it was closed for " \ 1402 "writing"); \ 1403 errno = EBADF; \ 1404 return -1; \ 1405 } \ 1406} while (0) 1407 1408 1409struct frame_gen_ctx 1410{ 1411 lsquic_stream_t *fgc_stream; 1412 struct lsquic_reader *fgc_reader; 1413 /* We keep our own count of how many bytes were read from reader because 1414 * some readers are external. The external caller does not have to rely 1415 * on our count, but it can. 1416 */ 1417 size_t fgc_nread_from_reader; 1418}; 1419 1420 1421static size_t 1422frame_gen_size (void *ctx) 1423{ 1424 struct frame_gen_ctx *fg_ctx = ctx; 1425 size_t available, remaining; 1426 1427 /* Make sure we are not writing past available size: */ 1428 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1429 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 1430 if (available < remaining) 1431 remaining = available; 1432 1433 return remaining + fg_ctx->fgc_stream->sm_n_buffered; 1434} 1435 1436 1437static int 1438frame_gen_fin (void *ctx) 1439{ 1440 struct frame_gen_ctx *fg_ctx = ctx; 1441 return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE 1442 && 0 == fg_ctx->fgc_stream->sm_n_buffered 1443 /* Do not use frame_gen_size() as it may chop the real size: */ 1444 && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1445} 1446 1447 1448static void 1449incr_conn_cap (struct lsquic_stream *stream, size_t incr) 1450{ 1451 if (stream->stream_flags & STREAM_CONN_LIMITED) 1452 { 1453 stream->conn_pub->conn_cap.cc_sent += incr; 1454 assert(stream->conn_pub->conn_cap.cc_sent 1455 <= stream->conn_pub->conn_cap.cc_max); 1456 } 1457} 1458 1459 1460static size_t 1461frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 1462{ 1463 struct frame_gen_ctx *fg_ctx = ctx; 1464 unsigned char *p = begin_buf; 1465 unsigned char *const end = p + len; 1466 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1467 size_t n_written, available, n_to_write; 1468 1469 if (stream->sm_n_buffered > 0) 1470 { 1471 if (len <= stream->sm_n_buffered) 1472 { 1473 memcpy(p, stream->sm_buf, len); 1474 memmove(stream->sm_buf, stream->sm_buf + len, 1475 stream->sm_n_buffered - len); 1476 stream->sm_n_buffered -= len; 1477 stream->tosend_off += len; 1478 *fin = frame_gen_fin(fg_ctx); 1479 return len; 1480 } 1481 memcpy(p, stream->sm_buf, stream->sm_n_buffered); 1482 p += stream->sm_n_buffered; 1483 stream->sm_n_buffered = 0; 1484 } 1485 1486 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 1487 n_to_write = end - p; 1488 if (n_to_write > available) 1489 n_to_write = available; 1490 n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p, 1491 n_to_write); 1492 p += n_written; 1493 fg_ctx->fgc_nread_from_reader += n_written; 1494 *fin = frame_gen_fin(fg_ctx); 1495 stream->tosend_off += p - (const unsigned char *) begin_buf; 1496 incr_conn_cap(stream, n_written); 1497 return p - (const unsigned char *) begin_buf; 1498} 1499 1500 1501static void 1502check_flush_threshold (lsquic_stream_t *stream) 1503{ 1504 if ((stream->stream_flags & STREAM_WANT_FLUSH) && 1505 stream->tosend_off >= stream->sm_flush_to) 1506 { 1507 LSQ_DEBUG("flushed to or past required offset %"PRIu64, 1508 stream->sm_flush_to); 1509 maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH); 1510 } 1511} 1512 1513 1514static struct lsquic_packet_out * 1515get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least, 1516 const struct lsquic_stream *stream) 1517{ 1518 return lsquic_send_ctl_new_packet_out(ctl, need_at_least); 1519} 1520 1521 1522static struct lsquic_packet_out * (* const get_packet[])( 1523 struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) = 1524{ 1525 lsquic_send_ctl_get_packet_for_stream, 1526 get_brand_new_packet, 1527}; 1528 1529 1530static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR } 1531stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size) 1532{ 1533 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1534 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 1535 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 1536 unsigned stream_header_sz, need_at_least, off; 1537 lsquic_packet_out_t *packet_out; 1538 int len, s, hsk; 1539 1540 stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id, 1541 stream->tosend_off); 1542 need_at_least = stream_header_sz + (size > 0); 1543 hsk = LSQUIC_STREAM_HANDSHAKE == stream->id; 1544 packet_out = get_packet[hsk](send_ctl, need_at_least, stream); 1545 if (!packet_out) 1546 return SWTP_STOP; 1547 if (hsk) 1548 packet_out->po_header_type = stream->tosend_off == 0 1549 ? HETY_INITIAL : HETY_HANDSHAKE; 1550 1551 off = packet_out->po_data_sz; 1552 len = pf->pf_gen_stream_frame( 1553 packet_out->po_data + packet_out->po_data_sz, 1554 lsquic_packet_out_avail(packet_out), stream->id, 1555 stream->tosend_off, 1556 frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx); 1557 if (len < 0) 1558 { 1559 LSQ_ERROR("could not generate stream frame"); 1560 return SWTP_ERROR; 1561 } 1562 1563 EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf, 1564 packet_out->po_data + packet_out->po_data_sz, len); 1565 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 1566 packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM; 1567 if (0 == lsquic_packet_out_avail(packet_out)) 1568 packet_out->po_flags |= PO_STREAM_END; 1569 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 1570 stream, QUIC_FRAME_STREAM, off, len); 1571 if (s != 0) 1572 { 1573 LSQ_ERROR("adding stream to packet failed: %s", strerror(errno)); 1574 return SWTP_ERROR; 1575 } 1576 1577 check_flush_threshold(stream); 1578 1579 /* XXX: I don't like it that this is here */ 1580 if (hsk && !(packet_out->po_flags & PO_HELLO)) 1581 { 1582 lsquic_packet_out_zero_pad(packet_out); 1583 packet_out->po_flags |= PO_HELLO; 1584 lsquic_send_ctl_scheduled_one(send_ctl, packet_out); 1585 } 1586 1587 return SWTP_OK; 1588} 1589 1590 1591static void 1592abort_connection (struct lsquic_stream *stream) 1593{ 1594 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 1595 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 1596 next_service_stream); 1597 stream->stream_flags |= STREAM_ABORT_CONN; 1598 LSQ_WARN("connection will be aborted"); 1599 maybe_conn_to_tickable(stream); 1600} 1601 1602 1603static ssize_t 1604stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, 1605 size_t thresh) 1606{ 1607 size_t size; 1608 ssize_t nw; 1609 unsigned seen_ok; 1610 struct frame_gen_ctx fg_ctx = { 1611 .fgc_stream = stream, 1612 .fgc_reader = reader, 1613 .fgc_nread_from_reader = 0, 1614 }; 1615 1616 seen_ok = 0; 1617 while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0) 1618 || frame_gen_fin(&fg_ctx)) 1619 { 1620 switch (stream_write_to_packet(&fg_ctx, size)) 1621 { 1622 case SWTP_OK: 1623 if (!seen_ok++) 1624 maybe_conn_to_tickable_if_writeable(stream, 0); 1625 if (frame_gen_fin(&fg_ctx)) 1626 { 1627 stream->stream_flags |= STREAM_FIN_SENT; 1628 goto end; 1629 } 1630 else 1631 break; 1632 case SWTP_STOP: 1633 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1634 goto end; 1635 default: 1636 abort_connection(stream); 1637 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1638 return -1; 1639 } 1640 } 1641 1642 if (thresh) 1643 { 1644 assert(size < thresh); 1645 assert(size >= stream->sm_n_buffered); 1646 size -= stream->sm_n_buffered; 1647 if (size > 0) 1648 { 1649 nw = save_to_buffer(stream, reader, size); 1650 if (nw < 0) 1651 return -1; 1652 fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */ 1653 } 1654 } 1655 else 1656 { 1657 /* We count flushed data towards both stream and connection limits, 1658 * so we should have been able to packetize all of it: 1659 */ 1660 assert(0 == stream->sm_n_buffered); 1661 assert(size == 0); 1662 } 1663 1664 maybe_mark_as_blocked(stream); 1665 1666 end: 1667 return fg_ctx.fgc_nread_from_reader; 1668} 1669 1670 1671/* Perform an implicit flush when we hit connection limit while buffering 1672 * data. This is to prevent a (theoretical) stall: 1673 * 1674 * Imagine a number of streams, all of which buffered some data. The buffered 1675 * data is up to connection cap, which means no further writes are possible. 1676 * None of them flushes, which means that data is not sent and connection 1677 * WINDOW_UPDATE frame never arrives from peer. Stall. 1678 */ 1679static int 1680maybe_flush_stream (struct lsquic_stream *stream) 1681{ 1682 if (stream->sm_n_buffered > 0 1683 && (stream->stream_flags & STREAM_CONN_LIMITED) 1684 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 1685 return stream_flush_nocheck(stream); 1686 else 1687 return 0; 1688} 1689 1690 1691static ssize_t 1692save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 1693 size_t len) 1694{ 1695 size_t avail, n_written; 1696 1697 assert(stream->sm_n_buffered + len <= SM_BUF_SIZE); 1698 1699 if (!stream->sm_buf) 1700 { 1701 stream->sm_buf = malloc(SM_BUF_SIZE); 1702 if (!stream->sm_buf) 1703 return -1; 1704 } 1705 1706 avail = lsquic_stream_write_avail(stream); 1707 if (avail < len) 1708 len = avail; 1709 1710 n_written = reader->lsqr_read(reader->lsqr_ctx, 1711 stream->sm_buf + stream->sm_n_buffered, len); 1712 stream->sm_n_buffered += n_written; 1713 incr_conn_cap(stream, n_written); 1714 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 1715 n_written, stream->sm_n_buffered); 1716 if (0 != maybe_flush_stream(stream)) 1717 return -1; 1718 return n_written; 1719} 1720 1721 1722static ssize_t 1723stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 1724{ 1725 size_t thresh, len; 1726 1727 thresh = lsquic_stream_flush_threshold(stream); 1728 len = reader->lsqr_size(reader->lsqr_ctx); 1729 if (stream->sm_n_buffered + len <= SM_BUF_SIZE && 1730 stream->sm_n_buffered + len < thresh) 1731 return save_to_buffer(stream, reader, len); 1732 else 1733 return stream_write_to_packets(stream, reader, thresh); 1734} 1735 1736 1737ssize_t 1738lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 1739{ 1740 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 1741 return lsquic_stream_writev(stream, &iov, 1); 1742} 1743 1744 1745struct inner_reader_iovec { 1746 const struct iovec *iov; 1747 const struct iovec *end; 1748 unsigned cur_iovec_off; 1749}; 1750 1751 1752static size_t 1753inner_reader_iovec_read (void *ctx, void *buf, size_t count) 1754{ 1755 struct inner_reader_iovec *const iro = ctx; 1756 unsigned char *p = buf; 1757 unsigned char *const end = p + count; 1758 unsigned n_tocopy; 1759 1760 while (iro->iov < iro->end && p < end) 1761 { 1762 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 1763 if (n_tocopy > (unsigned) (end - p)) 1764 n_tocopy = end - p; 1765 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 1766 n_tocopy); 1767 p += n_tocopy; 1768 iro->cur_iovec_off += n_tocopy; 1769 if (iro->iov->iov_len == iro->cur_iovec_off) 1770 { 1771 ++iro->iov; 1772 iro->cur_iovec_off = 0; 1773 } 1774 } 1775 1776 return p + count - end; 1777} 1778 1779 1780static size_t 1781inner_reader_iovec_size (void *ctx) 1782{ 1783 struct inner_reader_iovec *const iro = ctx; 1784 const struct iovec *iov; 1785 size_t size; 1786 1787 size = 0; 1788 for (iov = iro->iov; iov < iro->end; ++iov) 1789 size += iov->iov_len; 1790 1791 return size - iro->cur_iovec_off; 1792} 1793 1794 1795ssize_t 1796lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 1797 int iovcnt) 1798{ 1799 COMMON_WRITE_CHECKS(); 1800 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1801 1802 struct inner_reader_iovec iro = { 1803 .iov = iov, 1804 .end = iov + iovcnt, 1805 .cur_iovec_off = 0, 1806 }; 1807 struct lsquic_reader reader = { 1808 .lsqr_read = inner_reader_iovec_read, 1809 .lsqr_size = inner_reader_iovec_size, 1810 .lsqr_ctx = &iro, 1811 }; 1812 1813 return stream_write(stream, &reader); 1814} 1815 1816 1817ssize_t 1818lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) 1819{ 1820 COMMON_WRITE_CHECKS(); 1821 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1822 return stream_write(stream, reader); 1823} 1824 1825 1826int 1827lsquic_stream_send_headers (lsquic_stream_t *stream, 1828 const lsquic_http_headers_t *headers, int eos) 1829{ 1830 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT| 1831 STREAM_U_WRITE_DONE)) 1832 == STREAM_USE_HEADERS) 1833 { 1834 int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs, 1835 stream->id, headers, eos, lsquic_stream_priority(stream)); 1836 if (0 == s) 1837 { 1838 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 1839 stream->stream_flags |= STREAM_HEADERS_SENT; 1840 if (eos) 1841 stream->stream_flags |= STREAM_FIN_SENT; 1842 LSQ_INFO("sent headers for stream %u", stream->id); 1843 } 1844 else 1845 LSQ_WARN("could not send headers: %s", strerror(errno)); 1846 return s; 1847 } 1848 else 1849 { 1850 LSQ_INFO("cannot send headers for stream %u in this state", stream->id); 1851 errno = EBADMSG; 1852 return -1; 1853 } 1854} 1855 1856 1857void 1858lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 1859{ 1860 if (offset > stream->max_send_off) 1861 { 1862 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 1863 LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to " 1864 "0x%"PRIX64, stream->id, stream->max_send_off, offset); 1865 stream->max_send_off = offset; 1866 } 1867 else 1868 LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old " 1869 "max send offset 0x%"PRIX64", ignoring", stream->id, offset, 1870 stream->max_send_off); 1871} 1872 1873 1874/* This function is used to update offsets after handshake completes and we 1875 * learn of peer's limits from the handshake values. 1876 */ 1877int 1878lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset) 1879{ 1880 LSQ_DEBUG("setting max_send_off to %u", offset); 1881 if (offset > stream->max_send_off) 1882 { 1883 lsquic_stream_window_update(stream, offset); 1884 return 0; 1885 } 1886 else if (offset < stream->tosend_off) 1887 { 1888 LSQ_INFO("new offset (%u bytes) is smaller than the amount of data " 1889 "already sent on this stream (%"PRIu64" bytes)", offset, 1890 stream->tosend_off); 1891 return -1; 1892 } 1893 else 1894 { 1895 stream->max_send_off = offset; 1896 return 0; 1897 } 1898} 1899 1900 1901void 1902lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code) 1903{ 1904 lsquic_stream_reset_ext(stream, error_code, 1); 1905} 1906 1907 1908void 1909lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code, 1910 int do_close) 1911{ 1912 if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT)) 1913 { 1914 LSQ_INFO("reset already sent"); 1915 return; 1916 } 1917 1918 SM_HISTORY_APPEND(stream, SHE_RESET); 1919 1920 LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code); 1921 stream->error_code = error_code; 1922 1923 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1924 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1925 next_send_stream); 1926 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 1927 stream->stream_flags |= STREAM_SEND_RST; 1928 1929 drop_buffered_data(stream); 1930 maybe_elide_stream_frames(stream); 1931 maybe_schedule_call_on_close(stream); 1932 1933 if (do_close) 1934 lsquic_stream_close(stream); 1935 else 1936 maybe_conn_to_tickable_if_writeable(stream, 1); 1937} 1938 1939 1940unsigned 1941lsquic_stream_id (const lsquic_stream_t *stream) 1942{ 1943 return stream->id; 1944} 1945 1946 1947struct lsquic_conn * 1948lsquic_stream_conn (const lsquic_stream_t *stream) 1949{ 1950 return stream->conn_pub->lconn; 1951} 1952 1953 1954int 1955lsquic_stream_close (lsquic_stream_t *stream) 1956{ 1957 LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id); 1958 SM_HISTORY_APPEND(stream, SHE_CLOSE); 1959 if (lsquic_stream_is_closed(stream)) 1960 { 1961 LSQ_INFO("Attempt to close an already-closed stream %u", stream->id); 1962 errno = EBADF; 1963 return -1; 1964 } 1965 stream_shutdown_write(stream); 1966 stream_shutdown_read(stream); 1967 maybe_schedule_call_on_close(stream); 1968 maybe_finish_stream(stream); 1969 maybe_conn_to_tickable_if_writeable(stream, 1); 1970 return 0; 1971} 1972 1973 1974#ifndef NDEBUG 1975#if __GNUC__ 1976__attribute__((weak)) 1977#endif 1978#endif 1979void 1980lsquic_stream_acked (lsquic_stream_t *stream) 1981{ 1982 assert(stream->n_unacked); 1983 --stream->n_unacked; 1984 LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked); 1985 if (0 == stream->n_unacked) 1986 maybe_finish_stream(stream); 1987} 1988 1989 1990void 1991lsquic_stream_push_req (lsquic_stream_t *stream, 1992 struct uncompressed_headers *push_req) 1993{ 1994 assert(!stream->push_req); 1995 stream->push_req = push_req; 1996 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 1997} 1998 1999 2000int 2001lsquic_stream_is_pushed (const lsquic_stream_t *stream) 2002{ 2003 return 1 & ~stream->id; 2004} 2005 2006 2007int 2008lsquic_stream_push_info (const lsquic_stream_t *stream, 2009 uint32_t *ref_stream_id, const char **headers, size_t *headers_sz) 2010{ 2011 if (lsquic_stream_is_pushed(stream)) 2012 { 2013 assert(stream->push_req); 2014 *ref_stream_id = stream->push_req->uh_stream_id; 2015 *headers = stream->push_req->uh_headers; 2016 *headers_sz = stream->push_req->uh_size; 2017 return 0; 2018 } 2019 else 2020 return -1; 2021} 2022 2023 2024int 2025lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 2026{ 2027 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS) 2028 { 2029 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 2030 LSQ_DEBUG("received uncompressed headers for stream %u", stream->id); 2031 stream->stream_flags |= STREAM_HAVE_UH; 2032 if (uh->uh_flags & UH_FIN) 2033 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 2034 stream->uh = uh; 2035 if (uh->uh_oth_stream_id == 0) 2036 { 2037 if (uh->uh_weight) 2038 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 2039 } 2040 else 2041 LSQ_NOTICE("don't know how to depend on stream %u", 2042 uh->uh_oth_stream_id); 2043 return 0; 2044 } 2045 else 2046 { 2047 LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id); 2048 return -1; 2049 } 2050} 2051 2052 2053unsigned 2054lsquic_stream_priority (const lsquic_stream_t *stream) 2055{ 2056 return 256 - stream->sm_priority; 2057} 2058 2059 2060int 2061lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 2062{ 2063 /* The user should never get a reference to the special streams, 2064 * but let's check just in case: 2065 */ 2066 if (LSQUIC_STREAM_HANDSHAKE == stream->id 2067 || ((stream->stream_flags & STREAM_USE_HEADERS) && 2068 LSQUIC_STREAM_HEADERS == stream->id)) 2069 return -1; 2070 if (priority < 1 || priority > 256) 2071 return -1; 2072 stream->sm_priority = 256 - priority; 2073 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 2074 LSQ_DEBUG("set priority to %u", priority); 2075 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 2076 return 0; 2077} 2078 2079 2080int 2081lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 2082{ 2083 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 2084 { 2085 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) == 2086 (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) 2087 { 2088 /* We need to send headers only if we are a) using HEADERS stream 2089 * and b) we already sent initial headers. If initial headers 2090 * have not been sent yet, stream priority will be sent in the 2091 * HEADERS frame. 2092 */ 2093 return lsquic_headers_stream_send_priority(stream->conn_pub->hs, 2094 stream->id, 0, 0, priority); 2095 } 2096 else 2097 return 0; 2098 } 2099 else 2100 return -1; 2101} 2102 2103 2104lsquic_stream_ctx_t * 2105lsquic_stream_get_ctx (const lsquic_stream_t *stream) 2106{ 2107 return stream->st_ctx; 2108} 2109 2110 2111int 2112lsquic_stream_refuse_push (lsquic_stream_t *stream) 2113{ 2114 if (lsquic_stream_is_pushed(stream) && 2115 !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST))) 2116 { 2117 LSQ_DEBUG("refusing pushed stream: send reset"); 2118 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 2119 return 0; 2120 } 2121 else 2122 return -1; 2123} 2124 2125 2126size_t 2127lsquic_stream_mem_used (const struct lsquic_stream *stream) 2128{ 2129 size_t size; 2130 2131 size = sizeof(stream); 2132 if (stream->sm_buf) 2133 size += SM_BUF_SIZE; 2134 if (stream->data_in) 2135 size += stream->data_in->di_if->di_mem_used(stream->data_in); 2136 2137 return size; 2138} 2139 2140 2141lsquic_cid_t 2142lsquic_stream_cid (const struct lsquic_stream *stream) 2143{ 2144 return LSQUIC_LOG_CONN_ID; 2145} 2146 2147 2148