lsquic_stream.c revision 82f3bcef
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <stddef.h> 28 29#include "lsquic.h" 30 31#include "lsquic_int_types.h" 32#include "lsquic_packet_common.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn_flow.h" 36#include "lsquic_rtt.h" 37#include "lsquic_sfcw.h" 38#include "lsquic_stream.h" 39#include "lsquic_conn_public.h" 40#include "lsquic_util.h" 41#include "lsquic_mm.h" 42#include "lsquic_headers_stream.h" 43#include "lsquic_frame_reader.h" 44#include "lsquic_conn.h" 45#include "lsquic_data_in_if.h" 46#include "lsquic_parse.h" 47#include "lsquic_packet_out.h" 48#include "lsquic_engine_public.h" 49#include "lsquic_senhist.h" 50#include "lsquic_pacer.h" 51#include "lsquic_cubic.h" 52#include "lsquic_send_ctl.h" 53#include "lsquic_ev_log.h" 54 55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid 57#define LSQUIC_LOG_STREAM_ID stream->id 58#include "lsquic_logger.h" 59 60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ 61 62static void 63drop_frames_in (lsquic_stream_t *stream); 64 65static void 66maybe_schedule_call_on_close (lsquic_stream_t *stream); 67 68static int 69stream_wantread (lsquic_stream_t *stream, int is_want); 70 71static int 72stream_wantwrite (lsquic_stream_t *stream, int is_want); 73 74static ssize_t 75stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 76 77static ssize_t 78save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 79 80static int 81stream_flush (lsquic_stream_t *stream); 82 83static int 84stream_flush_nocheck (lsquic_stream_t *stream); 85 86static void 87maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag); 88 89 90#if LSQUIC_KEEP_STREAM_HISTORY 91/* These values are printable ASCII characters for ease of printing the 92 * whole history in a single line of a log message. 93 * 94 * The list of events is not exhaustive: only most interesting events 95 * are recorded. 96 */ 97enum stream_history_event 98{ 99 SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ 100 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 101 SHE_REACH_FIN = 'a', 102 SHE_BLOCKED_OUT = 'b', 103 SHE_CREATED = 'C', 104 SHE_FRAME_IN = 'd', 105 SHE_FRAME_OUT = 'D', 106 SHE_RESET = 'e', 107 SHE_WINDOW_UPDATE = 'E', 108 SHE_FIN_IN = 'f', 109 SHE_FINISHED = 'F', 110 SHE_GOAWAY_IN = 'g', 111 SHE_USER_WRITE_HEADER = 'h', 112 SHE_HEADERS_IN = 'H', 113 SHE_ONCLOSE_SCHED = 'l', 114 SHE_ONCLOSE_CALL = 'L', 115 SHE_ONNEW = 'N', 116 SHE_SET_PRIO = 'p', 117 SHE_USER_READ = 'r', 118 SHE_SHUTDOWN_READ = 'R', 119 SHE_RST_IN = 's', 120 SHE_RST_OUT = 't', 121 SHE_FLUSH = 'u', 122 SHE_USER_WRITE_DATA = 'w', 123 SHE_SHUTDOWN_WRITE = 'W', 124 SHE_CLOSE = 'X', 125 SHE_FORCE_FINISH = 'Z', 126}; 127 128static void 129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 130{ 131 enum stream_history_event prev_event; 132 sm_hist_idx_t idx; 133 int plus; 134 135 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 136 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 137 idx = (idx - plus) & SM_HIST_IDX_MASK; 138 prev_event = stream->sm_hist_buf[idx]; 139 140 if (prev_event == sh_event && plus) 141 return; 142 143 if (prev_event == sh_event) 144 sh_event = SHE_PLUS; 145 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 146 147 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 148 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 149 stream->sm_hist_buf); 150} 151 152# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 153# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 154 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 155 LSQ_DEBUG("history: [%.*s]", \ 156 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 157 (stream)->sm_hist_buf); \ 158 } while (0) 159#else 160# define SM_HISTORY_APPEND(stream, event) 161# define SM_HISTORY_DUMP_REMAINING(stream) 162#endif 163 164 165static int 166stream_inside_callback (const lsquic_stream_t *stream) 167{ 168 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 169} 170 171 172static void 173maybe_conn_to_tickable (lsquic_stream_t *stream) 174{ 175 if (!stream_inside_callback(stream)) 176 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 177 stream->conn_pub->lconn); 178} 179 180 181/* Here, "readable" means that the user is able to read from the stream. */ 182static void 183maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 184{ 185 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 186 { 187 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 188 stream->conn_pub->lconn); 189 } 190} 191 192 193/* Here, "writeable" means that data can be put into packets to be 194 * scheduled to be sent out. 195 * 196 * If `check_can_send' is false, it means that we do not need to check 197 * whether packets can be sent. This check was already performed when 198 * we packetized stream data. 199 */ 200static void 201maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream, 202 int check_can_send) 203{ 204 if (!stream_inside_callback(stream) && 205 (!check_can_send 206 || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) && 207 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 208 { 209 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 210 stream->conn_pub->lconn); 211 } 212} 213 214 215static int 216stream_stalled (const lsquic_stream_t *stream) 217{ 218 return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) && 219 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 220 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 221} 222 223 224/* TODO: The logic to figure out whether the stream is connection limited 225 * should be taken out of the constructor. The caller should specify this 226 * via one of enum stream_ctor_flags. 227 */ 228lsquic_stream_t * 229lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub, 230 const struct lsquic_stream_if *stream_if, 231 void *stream_if_ctx, unsigned initial_window, 232 unsigned initial_send_off, 233 enum stream_ctor_flags ctor_flags) 234{ 235 lsquic_cfcw_t *cfcw; 236 lsquic_stream_t *stream; 237 238 stream = calloc(1, sizeof(*stream)); 239 if (!stream) 240 return NULL; 241 242 stream->stream_if = stream_if; 243 stream->id = id; 244 stream->conn_pub = conn_pub; 245 stream->sm_onnew_arg = stream_if_ctx; 246 if (!initial_window) 247 initial_window = 16 * 1024; 248 if (LSQUIC_STREAM_HANDSHAKE == id || 249 (conn_pub->hs && LSQUIC_STREAM_HEADERS == id)) 250 cfcw = NULL; 251 else 252 { 253 cfcw = &conn_pub->cfcw; 254 stream->stream_flags |= STREAM_CONN_LIMITED; 255 if (conn_pub->hs) 256 stream->stream_flags |= STREAM_USE_HEADERS; 257 lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO); 258 } 259 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 260 if (!initial_send_off) 261 initial_send_off = 16 * 1024; 262 stream->max_send_off = initial_send_off; 263 if (ctor_flags & SCF_USE_DI_HASH) 264 stream->data_in = data_in_hash_new(conn_pub, id, 0); 265 else 266 stream->data_in = data_in_nocopy_new(conn_pub, id); 267 LSQ_DEBUG("created stream %u @%p", id, stream); 268 SM_HISTORY_APPEND(stream, SHE_CREATED); 269 if (ctor_flags & SCF_DI_AUTOSWITCH) 270 stream->stream_flags |= STREAM_AUTOSWITCH; 271 if (ctor_flags & SCF_CALL_ON_NEW) 272 lsquic_stream_call_on_new(stream); 273 if (ctor_flags & SCF_DISP_RW_ONCE) 274 stream->stream_flags |= STREAM_RW_ONCE; 275 return stream; 276} 277 278 279void 280lsquic_stream_call_on_new (lsquic_stream_t *stream) 281{ 282 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 283 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 284 { 285 LSQ_DEBUG("calling on_new_stream"); 286 SM_HISTORY_APPEND(stream, SHE_ONNEW); 287 stream->stream_flags |= STREAM_ONNEW_DONE; 288 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 289 stream); 290 } 291} 292 293 294static void 295decr_conn_cap (struct lsquic_stream *stream, size_t incr) 296{ 297 if (stream->stream_flags & STREAM_CONN_LIMITED) 298 { 299 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 300 stream->conn_pub->conn_cap.cc_sent -= incr; 301 } 302} 303 304 305static void 306drop_buffered_data (struct lsquic_stream *stream) 307{ 308 decr_conn_cap(stream, stream->sm_n_buffered); 309 stream->sm_n_buffered = 0; 310 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 311 maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS); 312} 313 314 315void 316lsquic_stream_destroy (lsquic_stream_t *stream) 317{ 318 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 319 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 320 STREAM_ONNEW_DONE) 321 { 322 stream->stream_flags |= STREAM_ONCLOSE_DONE; 323 stream->stream_if->on_close(stream, stream->st_ctx); 324 } 325 if (stream->stream_flags & STREAM_SENDING_FLAGS) 326 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 327 if (stream->stream_flags & STREAM_WANT_READ) 328 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 329 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 330 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 331 if (stream->stream_flags & STREAM_SERVICE_FLAGS) 332 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 333 drop_buffered_data(stream); 334 lsquic_sfcw_consume_rem(&stream->fc); 335 drop_frames_in(stream); 336 free(stream->push_req); 337 free(stream->uh); 338 free(stream->sm_buf); 339 LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream); 340 SM_HISTORY_DUMP_REMAINING(stream); 341 free(stream); 342} 343 344 345static int 346stream_is_finished (const lsquic_stream_t *stream) 347{ 348 return lsquic_stream_is_closed(stream) 349 /* n_unacked checks that no outgoing packets that reference this 350 * stream are outstanding: 351 */ 352 && 0 == stream->n_unacked 353 /* This checks that no packets that reference this stream will 354 * become outstanding: 355 */ 356 && 0 == (stream->stream_flags & STREAM_SEND_RST) 357 && ((stream->stream_flags & STREAM_FORCE_FINISH) 358 || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)) 359 && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD)))); 360} 361 362 363static void 364maybe_finish_stream (lsquic_stream_t *stream) 365{ 366 if (0 == (stream->stream_flags & STREAM_FINISHED) && 367 stream_is_finished(stream)) 368 { 369 LSQ_DEBUG("stream %u is now finished", stream->id); 370 SM_HISTORY_APPEND(stream, SHE_FINISHED); 371 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 372 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 373 next_service_stream); 374 stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED; 375 } 376} 377 378 379static void 380maybe_schedule_call_on_close (lsquic_stream_t *stream) 381{ 382 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 383 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE)) 384 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)) 385 { 386 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 387 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 388 next_service_stream); 389 stream->stream_flags |= STREAM_CALL_ONCLOSE; 390 LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id); 391 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 392 } 393} 394 395 396void 397lsquic_stream_call_on_close (lsquic_stream_t *stream) 398{ 399 assert(stream->stream_flags & STREAM_ONNEW_DONE); 400 stream->stream_flags &= ~STREAM_CALL_ONCLOSE; 401 if (!(stream->stream_flags & STREAM_SERVICE_FLAGS)) 402 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 403 next_service_stream); 404 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 405 { 406 LSQ_DEBUG("calling on_close for stream %u", stream->id); 407 stream->stream_flags |= STREAM_ONCLOSE_DONE; 408 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 409 stream->stream_if->on_close(stream, stream->st_ctx); 410 } 411 else 412 assert(0); 413} 414 415 416int 417lsquic_stream_readable (const lsquic_stream_t *stream) 418{ 419 /* A stream is readable if one of the following is true: */ 420 return 421 /* - It is already finished: in that case, lsquic_stream_read() will 422 * return 0. 423 */ 424 (stream->stream_flags & STREAM_FIN_REACHED) 425 /* - The stream is reset, by either side. In this case, 426 * lsquic_stream_read() will return -1 (we want the user to be 427 * able to collect the error). 428 */ 429 || (stream->stream_flags & STREAM_RST_FLAGS) 430 /* - Either we are not in HTTP mode or the HTTP headers have been 431 * received and the headers or data from the stream can be read. 432 */ 433 || (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 434 == STREAM_USE_HEADERS) 435 && (stream->uh != NULL 436 || stream->data_in->di_if->di_get_frame(stream->data_in, 437 stream->read_offset))) 438 ; 439} 440 441 442size_t 443lsquic_stream_write_avail (const struct lsquic_stream *stream) 444{ 445 uint64_t stream_avail, conn_avail; 446 447 stream_avail = stream->max_send_off - stream->tosend_off 448 - stream->sm_n_buffered; 449 if (stream->stream_flags & STREAM_CONN_LIMITED) 450 { 451 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 452 if (conn_avail < stream_avail) 453 return conn_avail; 454 } 455 456 return stream_avail; 457} 458 459 460int 461lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 462{ 463 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 464 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 465 { 466 return -1; 467 } 468 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 469 { 470 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 471 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 472 next_send_stream); 473 stream->stream_flags |= STREAM_SEND_WUF; 474 } 475 return 0; 476} 477 478 479int 480lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 481{ 482 uint64_t max_off; 483 int got_next_offset; 484 enum ins_frame ins_frame; 485 486 assert(frame->packet_in); 487 488 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 489 LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; " 490 "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 491 492 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) == 493 (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) 494 { 495 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 496 lsquic_malo_put(frame); 497 return -1; 498 } 499 500 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 501 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 502 if (INS_FRAME_OK == ins_frame) 503 { 504 /* Update maximum offset in the flow controller and check for flow 505 * control violation: 506 */ 507 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 508 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 509 return -1; 510 if (frame->data_frame.df_fin) 511 { 512 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 513 stream->stream_flags |= STREAM_FIN_RECVD; 514 maybe_finish_stream(stream); 515 } 516 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 517 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 518 { 519 stream->data_in = stream->data_in->di_if->di_switch_impl( 520 stream->data_in, stream->read_offset); 521 if (!stream->data_in) 522 { 523 stream->data_in = data_in_error_new(); 524 return -1; 525 } 526 } 527 if (got_next_offset) 528 /* Checking the offset saves di_get_frame() call */ 529 maybe_conn_to_tickable_if_readable(stream); 530 return 0; 531 } 532 else if (INS_FRAME_DUP == ins_frame) 533 { 534 return 0; 535 } 536 else 537 { 538 assert(INS_FRAME_ERR == ins_frame); 539 return -1; 540 } 541} 542 543 544static void 545drop_frames_in (lsquic_stream_t *stream) 546{ 547 if (stream->data_in) 548 { 549 stream->data_in->di_if->di_destroy(stream->data_in); 550 /* To avoid checking whether `data_in` is set, just set to the error 551 * data-in stream. It does the right thing after incoming data is 552 * dropped. 553 */ 554 stream->data_in = data_in_error_new(); 555 } 556} 557 558 559static void 560maybe_elide_stream_frames (struct lsquic_stream *stream) 561{ 562 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 563 { 564 if (stream->n_unacked) 565 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 566 stream->id); 567 stream->stream_flags |= STREAM_FRAMES_ELIDED; 568 } 569} 570 571 572int 573lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 574 uint32_t error_code) 575{ 576 577 if (stream->stream_flags & STREAM_RST_RECVD) 578 { 579 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 580 return 0; 581 } 582 583 SM_HISTORY_APPEND(stream, SHE_RST_IN); 584 /* This flag must always be set, even if we are "ignoring" it: it is 585 * used by elision code. 586 */ 587 stream->stream_flags |= STREAM_RST_RECVD; 588 589 if ((stream->stream_flags & STREAM_FIN_RECVD) && 590 /* Pushed streams have fake STREAM_FIN_RECVD set, thus 591 * we need a special check: 592 */ 593 !lsquic_stream_is_pushed(stream)) 594 { 595 LSQ_DEBUG("ignore RST_STREAM frame after FIN is received"); 596 return 0; 597 } 598 599 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 600 { 601 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is " 602 "smaller than that of byte following the last byte we have seen: " 603 "0x%"PRIX64, stream->id, offset, 604 lsquic_sfcw_get_max_recv_off(&stream->fc)); 605 return -1; 606 } 607 608 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 609 { 610 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64 611 " violates flow control", stream->id, offset); 612 return -1; 613 } 614 615 /* Let user collect error: */ 616 maybe_conn_to_tickable_if_readable(stream); 617 618 lsquic_sfcw_consume_rem(&stream->fc); 619 drop_frames_in(stream); 620 drop_buffered_data(stream); 621 maybe_elide_stream_frames(stream); 622 623 if (!(stream->stream_flags & 624 (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT))) 625 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 626 627 stream->stream_flags |= STREAM_RST_RECVD; 628 629 maybe_finish_stream(stream); 630 maybe_schedule_call_on_close(stream); 631 632 return 0; 633} 634 635 636uint64_t 637lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 638{ 639 assert(stream->stream_flags & STREAM_SEND_WUF); 640 stream->stream_flags &= ~STREAM_SEND_WUF; 641 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 642 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 643 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 644} 645 646 647void 648lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 649{ 650 assert(stream->stream_flags & STREAM_SEND_BLOCKED); 651 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 652 stream->stream_flags &= ~STREAM_SEND_BLOCKED; 653 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 654 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 655} 656 657 658void 659lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 660{ 661 assert(stream->stream_flags & STREAM_SEND_RST); 662 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 663 stream->stream_flags &= ~STREAM_SEND_RST; 664 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 665 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 666 stream->stream_flags |= STREAM_RST_SENT; 667 maybe_finish_stream(stream); 668} 669 670 671static size_t 672read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len) 673{ 674 struct uncompressed_headers *uh = stream->uh; 675 size_t n_avail = uh->uh_size - uh->uh_off; 676 if (n_avail < len) 677 len = n_avail; 678 memcpy(dst, uh->uh_headers + uh->uh_off, len); 679 uh->uh_off += len; 680 if (uh->uh_off == uh->uh_size) 681 { 682 LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id); 683 free(uh); 684 stream->uh = NULL; 685 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 686 { 687 stream->stream_flags |= STREAM_FIN_REACHED; 688 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 689 } 690 } 691 return len; 692} 693 694 695/* This function returns 0 when EOF is reached. 696 */ 697ssize_t 698lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov, 699 int iovcnt) 700{ 701 size_t total_nread, nread; 702 int processed_frames, read_unc_headers, iovidx; 703 unsigned char *p, *end; 704 705 SM_HISTORY_APPEND(stream, SHE_USER_READ); 706 707#define NEXT_IOV() do { \ 708 ++iovidx; \ 709 while (iovidx < iovcnt && 0 == iov[iovidx].iov_len) \ 710 ++iovidx; \ 711 if (iovidx < iovcnt) \ 712 { \ 713 p = iov[iovidx].iov_base; \ 714 end = p + iov[iovidx].iov_len; \ 715 } \ 716 else \ 717 p = end = NULL; \ 718} while (0) 719 720#define AVAIL() (end - p) 721 722 if (stream->stream_flags & STREAM_RST_FLAGS) 723 { 724 errno = ECONNRESET; 725 return -1; 726 } 727 if (stream->stream_flags & STREAM_U_READ_DONE) 728 { 729 errno = EBADF; 730 return -1; 731 } 732 if (stream->stream_flags & STREAM_FIN_REACHED) 733 return 0; 734 735 total_nread = 0; 736 processed_frames = 0; 737 738 iovidx = -1; 739 NEXT_IOV(); 740 741 if (stream->uh && AVAIL()) 742 { 743 read_unc_headers = 1; 744 do 745 { 746 nread = read_uh(stream, p, AVAIL()); 747 p += nread; 748 total_nread += nread; 749 if (p == end) 750 NEXT_IOV(); 751 } 752 while (stream->uh && AVAIL()); 753 } 754 else 755 read_unc_headers = 0; 756 757 struct data_frame *data_frame; 758 while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset))) 759 { 760 ++processed_frames; 761 size_t navail = data_frame->df_size - data_frame->df_read_off; 762 size_t ntowrite = AVAIL(); 763 if (navail < ntowrite) 764 ntowrite = navail; 765 memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite); 766 p += ntowrite; 767 data_frame->df_read_off += ntowrite; 768 stream->read_offset += ntowrite; 769 total_nread += ntowrite; 770 if (data_frame->df_read_off == data_frame->df_size) 771 { 772 const int fin = data_frame->df_fin; 773 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 774 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 775 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 776 { 777 stream->data_in = stream->data_in->di_if->di_switch_impl( 778 stream->data_in, stream->read_offset); 779 if (!stream->data_in) 780 { 781 stream->data_in = data_in_error_new(); 782 return -1; 783 } 784 } 785 if (fin) 786 { 787 stream->stream_flags |= STREAM_FIN_REACHED; 788 break; 789 } 790 } 791 if (p == end) 792 NEXT_IOV(); 793 } 794 795 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 796 total_nread, stream->read_offset); 797 798 if (processed_frames) 799 { 800 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 801 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 802 { 803 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 804 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 805 stream->stream_flags |= STREAM_SEND_WUF; 806 maybe_conn_to_tickable_if_writeable(stream, 1); 807 } 808 } 809 810 if (processed_frames || read_unc_headers) 811 { 812 return total_nread; 813 } 814 else 815 { 816 assert(0 == total_nread); 817 errno = EWOULDBLOCK; 818 return -1; 819 } 820} 821 822 823ssize_t 824lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 825{ 826 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 827 return lsquic_stream_readv(stream, &iov, 1); 828} 829 830 831static void 832stream_shutdown_read (lsquic_stream_t *stream) 833{ 834 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 835 { 836 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 837 stream->stream_flags |= STREAM_U_READ_DONE; 838 stream_wantread(stream, 0); 839 maybe_finish_stream(stream); 840 } 841} 842 843 844static void 845stream_shutdown_write (lsquic_stream_t *stream) 846{ 847 if (stream->stream_flags & STREAM_U_WRITE_DONE) 848 return; 849 850 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 851 stream->stream_flags |= STREAM_U_WRITE_DONE; 852 stream_wantwrite(stream, 0); 853 854 /* Don't bother to check whether there is anything else to write if 855 * the flags indicate that nothing else should be written. 856 */ 857 if (!(stream->stream_flags & 858 (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT))) 859 { 860 if (stream->sm_n_buffered == 0) 861 { 862 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 863 stream)) 864 { 865 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 866 stream->stream_flags |= STREAM_FIN_SENT; 867 } 868 else 869 { 870 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 871 "flag in it"); 872 (void) stream_flush_nocheck(stream); 873 } 874 } 875 else 876 (void) stream_flush_nocheck(stream); 877 } 878} 879 880 881int 882lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 883{ 884 LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how); 885 if (lsquic_stream_is_closed(stream)) 886 { 887 LSQ_INFO("Attempt to shut down a closed stream %u", stream->id); 888 errno = EBADF; 889 return -1; 890 } 891 /* 0: read, 1: write: 2: read and write 892 */ 893 if (how < 0 || how > 2) 894 { 895 errno = EINVAL; 896 return -1; 897 } 898 899 if (how) 900 stream_shutdown_write(stream); 901 if (how != 1) 902 stream_shutdown_read(stream); 903 904 maybe_finish_stream(stream); 905 maybe_schedule_call_on_close(stream); 906 if (how) 907 maybe_conn_to_tickable_if_writeable(stream, 1); 908 909 return 0; 910} 911 912 913void 914lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 915{ 916 LSQ_DEBUG("internal shutdown of stream %u", stream->id); 917 if (LSQUIC_STREAM_HANDSHAKE == stream->id 918 || ((stream->stream_flags & STREAM_USE_HEADERS) && 919 LSQUIC_STREAM_HEADERS == stream->id)) 920 { 921 LSQ_DEBUG("add flag to force-finish special stream %u", stream->id); 922 stream->stream_flags |= STREAM_FORCE_FINISH; 923 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 924 } 925 maybe_finish_stream(stream); 926 maybe_schedule_call_on_close(stream); 927} 928 929 930static void 931fake_reset_unused_stream (lsquic_stream_t *stream) 932{ 933 stream->stream_flags |= 934 STREAM_RST_RECVD /* User will pick this up on read or write */ 935 | STREAM_RST_SENT /* Don't send anything else on this stream */ 936 ; 937 938 /* Cancel all writes to the network scheduled for this stream: */ 939 if (stream->stream_flags & STREAM_SENDING_FLAGS) 940 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 941 next_send_stream); 942 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 943 944 LSQ_DEBUG("fake-reset stream %u%s", 945 stream->id, stream_stalled(stream) ? " (stalled)" : ""); 946 maybe_finish_stream(stream); 947 maybe_schedule_call_on_close(stream); 948} 949 950 951/* This function should only be called for locally-initiated streams whose ID 952 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 953 * frame sent by peer but we have not yet received it and created a stream. 954 * In this situation, we mark the stream as reset, so that user's on_read or 955 * on_write event callback picks up the error. That, in turn, should result 956 * in stream being closed. 957 * 958 * If we have received any data frames on this stream, this probably indicates 959 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 960 * lower than this. However, we still try to handle it gracefully and peform 961 * a shutdown, as if the stream was not reset. 962 */ 963void 964lsquic_stream_received_goaway (lsquic_stream_t *stream) 965{ 966 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 967 if (0 == stream->read_offset && 968 stream->data_in->di_if->di_empty(stream->data_in)) 969 fake_reset_unused_stream(stream); /* Normal condition */ 970 else 971 { /* This is odd, let's handle it the best we can: */ 972 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 973 lsquic_stream_shutdown_internal(stream); 974 } 975} 976 977 978uint64_t 979lsquic_stream_read_offset (const lsquic_stream_t *stream) 980{ 981 return stream->read_offset; 982} 983 984 985static int 986stream_wantread (lsquic_stream_t *stream, int is_want) 987{ 988 const int old_val = !!(stream->stream_flags & STREAM_WANT_READ); 989 const int new_val = !!is_want; 990 if (old_val != new_val) 991 { 992 if (new_val) 993 { 994 if (!old_val) 995 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 996 next_read_stream); 997 stream->stream_flags |= STREAM_WANT_READ; 998 } 999 else 1000 { 1001 stream->stream_flags &= ~STREAM_WANT_READ; 1002 if (old_val) 1003 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 1004 next_read_stream); 1005 } 1006 } 1007 return old_val; 1008} 1009 1010 1011static void 1012maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1013{ 1014 assert(STREAM_WRITE_Q_FLAGS & flag); 1015 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1016 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1017 next_write_stream); 1018 stream->stream_flags |= flag; 1019} 1020 1021 1022static void 1023maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1024{ 1025 assert(STREAM_WRITE_Q_FLAGS & flag); 1026 if (stream->stream_flags & flag) 1027 { 1028 stream->stream_flags &= ~flag; 1029 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1030 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1031 next_write_stream); 1032 } 1033} 1034 1035 1036static int 1037stream_wantwrite (lsquic_stream_t *stream, int is_want) 1038{ 1039 const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE); 1040 const int new_val = !!is_want; 1041 if (old_val != new_val) 1042 { 1043 if (new_val) 1044 maybe_put_onto_write_q(stream, STREAM_WANT_WRITE); 1045 else 1046 maybe_remove_from_write_q(stream, STREAM_WANT_WRITE); 1047 } 1048 return old_val; 1049} 1050 1051 1052int 1053lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1054{ 1055 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1056 { 1057 if (is_want) 1058 maybe_conn_to_tickable_if_readable(stream); 1059 return stream_wantread(stream, is_want); 1060 } 1061 else 1062 { 1063 errno = EBADF; 1064 return -1; 1065 } 1066} 1067 1068 1069int 1070lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1071{ 1072 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)) 1073 { 1074 if (is_want) 1075 maybe_conn_to_tickable_if_writeable(stream, 1); 1076 return stream_wantwrite(stream, is_want); 1077 } 1078 else 1079 { 1080 errno = EBADF; 1081 return -1; 1082 } 1083} 1084 1085 1086#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE| \ 1087 STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST) 1088 1089 1090static void 1091stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1092{ 1093 unsigned no_progress_count, no_progress_limit; 1094 enum stream_flags flags; 1095 uint64_t size; 1096 1097 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1098 1099 no_progress_count = 0; 1100 while ((stream->stream_flags & STREAM_WANT_READ) 1101 && lsquic_stream_readable(stream)) 1102 { 1103 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1104 size = stream->read_offset; 1105 1106 stream->stream_if->on_read(stream, stream->st_ctx); 1107 1108 if (no_progress_limit && size == stream->read_offset && 1109 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1110 { 1111 ++no_progress_count; 1112 if (no_progress_count >= no_progress_limit) 1113 { 1114 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1115 "progress) in user code reading from stream", 1116 no_progress_count, 1117 no_progress_count == 1 ? "" : "s"); 1118 break; 1119 } 1120 } 1121 else 1122 no_progress_count = 0; 1123 } 1124} 1125 1126 1127static void 1128stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1129{ 1130 unsigned no_progress_count, no_progress_limit; 1131 enum stream_flags flags; 1132 1133 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1134 1135 no_progress_count = 0; 1136 stream->stream_flags |= STREAM_LAST_WRITE_OK; 1137 while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)) 1138 == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK) 1139 && lsquic_stream_write_avail(stream)) 1140 { 1141 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1142 1143 stream->stream_if->on_write(stream, stream->st_ctx); 1144 1145 if (no_progress_limit && 1146 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1147 { 1148 ++no_progress_count; 1149 if (no_progress_count >= no_progress_limit) 1150 { 1151 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1152 "progress) in user code writing to stream", 1153 no_progress_count, 1154 no_progress_count == 1 ? "" : "s"); 1155 break; 1156 } 1157 } 1158 else 1159 no_progress_count = 0; 1160 } 1161} 1162 1163 1164static void 1165stream_dispatch_read_events_once (lsquic_stream_t *stream) 1166{ 1167 if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream)) 1168 { 1169 stream->stream_if->on_read(stream, stream->st_ctx); 1170 } 1171} 1172 1173 1174static void 1175maybe_mark_as_blocked (lsquic_stream_t *stream) 1176{ 1177 struct lsquic_conn_cap *cc; 1178 1179 if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered) 1180 { 1181 if (stream->blocked_off < stream->max_send_off) 1182 { 1183 stream->blocked_off = stream->max_send_off + stream->sm_n_buffered; 1184 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1185 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1186 next_send_stream); 1187 stream->stream_flags |= STREAM_SEND_BLOCKED; 1188 LSQ_DEBUG("marked stream-blocked at stream offset " 1189 "%"PRIu64, stream->blocked_off); 1190 } 1191 else 1192 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 1193 " has been, or is about to be, sent", stream->blocked_off); 1194 } 1195 1196 if ((stream->stream_flags & STREAM_CONN_LIMITED) 1197 && (cc = &stream->conn_pub->conn_cap, 1198 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 1199 { 1200 if (cc->cc_blocked < cc->cc_max) 1201 { 1202 cc->cc_blocked = cc->cc_max; 1203 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 1204 LSQ_DEBUG("marked connection-blocked at connection offset " 1205 "%"PRIu64, cc->cc_max); 1206 } 1207 else 1208 LSQ_DEBUG("stream has already been marked connection-blocked " 1209 "at offset %"PRIu64, cc->cc_blocked); 1210 } 1211} 1212 1213 1214void 1215lsquic_stream_dispatch_read_events (lsquic_stream_t *stream) 1216{ 1217 assert(stream->stream_flags & STREAM_WANT_READ); 1218 1219 if (stream->stream_flags & STREAM_RW_ONCE) 1220 stream_dispatch_read_events_once(stream); 1221 else 1222 stream_dispatch_read_events_loop(stream); 1223} 1224 1225 1226void 1227lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 1228{ 1229 int progress; 1230 uint64_t tosend_off; 1231 unsigned short n_buffered; 1232 enum stream_flags flags; 1233 1234 assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS); 1235 flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS; 1236 tosend_off = stream->tosend_off; 1237 n_buffered = stream->sm_n_buffered; 1238 1239 if (stream->stream_flags & STREAM_WANT_FLUSH) 1240 (void) stream_flush(stream); 1241 1242 if (stream->stream_flags & STREAM_RW_ONCE) 1243 { 1244 if ((stream->stream_flags & STREAM_WANT_WRITE) 1245 && lsquic_stream_write_avail(stream)) 1246 { 1247 stream->stream_if->on_write(stream, stream->st_ctx); 1248 } 1249 } 1250 else 1251 stream_dispatch_write_events_loop(stream); 1252 1253 /* Progress means either flags or offsets changed: */ 1254 progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags && 1255 stream->tosend_off == tosend_off && 1256 stream->sm_n_buffered == n_buffered); 1257 1258 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 1259 { 1260 if (progress) 1261 { /* Move the stream to the end of the list to ensure fairness. */ 1262 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1263 next_write_stream); 1264 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1265 next_write_stream); 1266 } 1267 } 1268} 1269 1270 1271static size_t 1272inner_reader_empty_size (void *ctx) 1273{ 1274 return 0; 1275} 1276 1277 1278static size_t 1279inner_reader_empty_read (void *ctx, void *buf, size_t count) 1280{ 1281 return 0; 1282} 1283 1284 1285static int 1286stream_flush (lsquic_stream_t *stream) 1287{ 1288 struct lsquic_reader empty_reader; 1289 ssize_t nw; 1290 1291 assert(stream->stream_flags & STREAM_WANT_FLUSH); 1292 assert(stream->sm_n_buffered > 0 || 1293 /* Flushing is also used to packetize standalone FIN: */ 1294 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 1295 == STREAM_U_WRITE_DONE)); 1296 1297 empty_reader.lsqr_size = inner_reader_empty_size; 1298 empty_reader.lsqr_read = inner_reader_empty_read; 1299 empty_reader.lsqr_ctx = NULL; /* pro forma */ 1300 nw = stream_write_to_packets(stream, &empty_reader, 0); 1301 1302 if (nw >= 0) 1303 { 1304 assert(nw == 0); /* Empty reader: must have read zero bytes */ 1305 return 0; 1306 } 1307 else 1308 return -1; 1309} 1310 1311 1312static int 1313stream_flush_nocheck (lsquic_stream_t *stream) 1314{ 1315 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered; 1316 maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH); 1317 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 1318 1319 return stream_flush(stream); 1320} 1321 1322 1323int 1324lsquic_stream_flush (lsquic_stream_t *stream) 1325{ 1326 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1327 { 1328 LSQ_DEBUG("cannot flush closed stream"); 1329 errno = EBADF; 1330 return -1; 1331 } 1332 1333 if (0 == stream->sm_n_buffered) 1334 { 1335 LSQ_DEBUG("flushing 0 bytes: noop"); 1336 return 0; 1337 } 1338 1339 return stream_flush_nocheck(stream); 1340} 1341 1342 1343/* The flush threshold is the maximum size of stream data that can be sent 1344 * in a full packet. 1345 */ 1346static size_t 1347flush_threshold (const lsquic_stream_t *stream) 1348{ 1349 enum packet_out_flags flags; 1350 enum lsquic_packno_bits bits; 1351 unsigned packet_header_sz, stream_header_sz; 1352 size_t threshold; 1353 1354 bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl); 1355 flags = bits << POBIT_SHIFT; 1356 if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)) 1357 flags |= PO_CONN_ID; 1358 1359 packet_header_sz = lsquic_po_header_length(flags); 1360 stream_header_sz = stream->conn_pub->lconn->cn_pf 1361 ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off); 1362 1363 threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ 1364 - packet_header_sz - stream_header_sz; 1365 return threshold; 1366} 1367 1368 1369#define COMMON_WRITE_CHECKS() do { \ 1370 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) \ 1371 == STREAM_USE_HEADERS) \ 1372 { \ 1373 LSQ_WARN("Attempt to write to stream before sending HTTP headers"); \ 1374 errno = EILSEQ; \ 1375 return -1; \ 1376 } \ 1377 if (stream->stream_flags & STREAM_RST_FLAGS) \ 1378 { \ 1379 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 1380 errno = ECONNRESET; \ 1381 return -1; \ 1382 } \ 1383 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 1384 { \ 1385 LSQ_WARN("Attempt to write to stream after it was closed for " \ 1386 "writing"); \ 1387 errno = EBADF; \ 1388 return -1; \ 1389 } \ 1390} while (0) 1391 1392 1393struct frame_gen_ctx 1394{ 1395 lsquic_stream_t *fgc_stream; 1396 struct lsquic_reader *fgc_reader; 1397 /* We keep our own count of how many bytes were read from reader because 1398 * some readers are external. The external caller does not have to rely 1399 * on our count, but it can. 1400 */ 1401 size_t fgc_nread_from_reader; 1402}; 1403 1404 1405static size_t 1406frame_gen_size (void *ctx) 1407{ 1408 struct frame_gen_ctx *fg_ctx = ctx; 1409 size_t available, remaining; 1410 1411 /* Make sure we are not writing past available size: */ 1412 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1413 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 1414 if (available < remaining) 1415 remaining = available; 1416 1417 return remaining + fg_ctx->fgc_stream->sm_n_buffered; 1418} 1419 1420 1421static int 1422frame_gen_fin (void *ctx) 1423{ 1424 struct frame_gen_ctx *fg_ctx = ctx; 1425 return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE 1426 && 0 == fg_ctx->fgc_stream->sm_n_buffered 1427 /* Do not use frame_gen_size() as it may chop the real size: */ 1428 && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1429} 1430 1431 1432static void 1433incr_conn_cap (struct lsquic_stream *stream, size_t incr) 1434{ 1435 if (stream->stream_flags & STREAM_CONN_LIMITED) 1436 { 1437 stream->conn_pub->conn_cap.cc_sent += incr; 1438 assert(stream->conn_pub->conn_cap.cc_sent 1439 <= stream->conn_pub->conn_cap.cc_max); 1440 } 1441} 1442 1443 1444static size_t 1445frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 1446{ 1447 struct frame_gen_ctx *fg_ctx = ctx; 1448 unsigned char *p = begin_buf; 1449 unsigned char *const end = p + len; 1450 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1451 size_t n_written, available, n_to_write; 1452 1453 if (stream->sm_n_buffered > 0) 1454 { 1455 if (len <= stream->sm_n_buffered) 1456 { 1457 memcpy(p, stream->sm_buf, len); 1458 memmove(stream->sm_buf, stream->sm_buf + len, 1459 stream->sm_n_buffered - len); 1460 stream->sm_n_buffered -= len; 1461 stream->tosend_off += len; 1462 *fin = frame_gen_fin(fg_ctx); 1463 return len; 1464 } 1465 memcpy(p, stream->sm_buf, stream->sm_n_buffered); 1466 p += stream->sm_n_buffered; 1467 stream->sm_n_buffered = 0; 1468 } 1469 1470 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 1471 n_to_write = end - p; 1472 if (n_to_write > available) 1473 n_to_write = available; 1474 n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p, 1475 n_to_write); 1476 p += n_written; 1477 fg_ctx->fgc_nread_from_reader += n_written; 1478 *fin = frame_gen_fin(fg_ctx); 1479 stream->tosend_off += p - (const unsigned char *) begin_buf; 1480 incr_conn_cap(stream, n_written); 1481 return p - (const unsigned char *) begin_buf; 1482} 1483 1484 1485static void 1486check_flush_threshold (lsquic_stream_t *stream) 1487{ 1488 if ((stream->stream_flags & STREAM_WANT_FLUSH) && 1489 stream->tosend_off >= stream->sm_flush_to) 1490 { 1491 LSQ_DEBUG("flushed to or past required offset %"PRIu64, 1492 stream->sm_flush_to); 1493 maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH); 1494 } 1495} 1496 1497 1498static struct lsquic_packet_out * 1499get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least, 1500 const struct lsquic_stream *stream) 1501{ 1502 return lsquic_send_ctl_new_packet_out(ctl, need_at_least); 1503} 1504 1505 1506static struct lsquic_packet_out * (* const get_packet[])( 1507 struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) = 1508{ 1509 lsquic_send_ctl_get_packet_for_stream, 1510 get_brand_new_packet, 1511}; 1512 1513 1514static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR } 1515stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size) 1516{ 1517 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1518 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 1519 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 1520 unsigned stream_header_sz, need_at_least, off; 1521 lsquic_packet_out_t *packet_out; 1522 int len, s, hsk; 1523 1524 stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id, 1525 stream->tosend_off); 1526 need_at_least = stream_header_sz + (size > 0); 1527 hsk = LSQUIC_STREAM_HANDSHAKE == stream->id; 1528 packet_out = get_packet[hsk](send_ctl, need_at_least, stream); 1529 if (!packet_out) 1530 return SWTP_STOP; 1531 1532 off = packet_out->po_data_sz; 1533 len = pf->pf_gen_stream_frame( 1534 packet_out->po_data + packet_out->po_data_sz, 1535 lsquic_packet_out_avail(packet_out), stream->id, 1536 stream->tosend_off, 1537 frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx); 1538 if (len < 0) 1539 { 1540 LSQ_ERROR("could not generate stream frame"); 1541 return SWTP_ERROR; 1542 } 1543 1544 EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf, 1545 packet_out->po_data + packet_out->po_data_sz, len); 1546 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 1547 packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM; 1548 if (0 == lsquic_packet_out_avail(packet_out)) 1549 packet_out->po_flags |= PO_STREAM_END; 1550 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 1551 stream, QUIC_FRAME_STREAM, off, len); 1552 if (s != 0) 1553 { 1554 LSQ_ERROR("adding stream to packet failed: %s", strerror(errno)); 1555 return SWTP_ERROR; 1556 } 1557 1558 check_flush_threshold(stream); 1559 1560 /* XXX: I don't like it that this is here */ 1561 if (hsk && !(packet_out->po_flags & PO_HELLO)) 1562 { 1563 lsquic_packet_out_zero_pad(packet_out); 1564 packet_out->po_flags |= PO_HELLO; 1565 lsquic_send_ctl_scheduled_one(send_ctl, packet_out); 1566 } 1567 1568 return SWTP_OK; 1569} 1570 1571 1572static void 1573abort_connection (struct lsquic_stream *stream) 1574{ 1575 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 1576 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 1577 next_service_stream); 1578 stream->stream_flags |= STREAM_ABORT_CONN; 1579 LSQ_WARN("connection will be aborted"); 1580 maybe_conn_to_tickable(stream); 1581} 1582 1583 1584static ssize_t 1585stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, 1586 size_t thresh) 1587{ 1588 size_t size; 1589 ssize_t nw; 1590 unsigned seen_ok; 1591 struct frame_gen_ctx fg_ctx = { 1592 .fgc_stream = stream, 1593 .fgc_reader = reader, 1594 .fgc_nread_from_reader = 0, 1595 }; 1596 1597 seen_ok = 0; 1598 while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0) 1599 || frame_gen_fin(&fg_ctx)) 1600 { 1601 switch (stream_write_to_packet(&fg_ctx, size)) 1602 { 1603 case SWTP_OK: 1604 if (!seen_ok++) 1605 maybe_conn_to_tickable_if_writeable(stream, 0); 1606 if (frame_gen_fin(&fg_ctx)) 1607 { 1608 stream->stream_flags |= STREAM_FIN_SENT; 1609 goto end; 1610 } 1611 else 1612 break; 1613 case SWTP_STOP: 1614 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1615 goto end; 1616 default: 1617 abort_connection(stream); 1618 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1619 return -1; 1620 } 1621 } 1622 1623 if (thresh) 1624 { 1625 assert(size < thresh); 1626 assert(size >= stream->sm_n_buffered); 1627 size -= stream->sm_n_buffered; 1628 if (size > 0) 1629 { 1630 nw = save_to_buffer(stream, reader, size); 1631 if (nw < 0) 1632 return -1; 1633 fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */ 1634 } 1635 } 1636 else 1637 { 1638 /* We count flushed data towards both stream and connection limits, 1639 * so we should have been able to packetize all of it: 1640 */ 1641 assert(0 == stream->sm_n_buffered); 1642 assert(size == 0); 1643 } 1644 1645 maybe_mark_as_blocked(stream); 1646 1647 end: 1648 return fg_ctx.fgc_nread_from_reader; 1649} 1650 1651 1652/* Perform an implicit flush when we hit connection limit while buffering 1653 * data. This is to prevent a (theoretical) stall: 1654 * 1655 * Imagine a number of streams, all of which buffered some data. The buffered 1656 * data is up to connection cap, which means no further writes are possible. 1657 * None of them flushes, which means that data is not sent and connection 1658 * WINDOW_UPDATE frame never arrives from peer. Stall. 1659 */ 1660static int 1661maybe_flush_stream (struct lsquic_stream *stream) 1662{ 1663 if (stream->sm_n_buffered > 0 1664 && (stream->stream_flags & STREAM_CONN_LIMITED) 1665 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 1666 return stream_flush_nocheck(stream); 1667 else 1668 return 0; 1669} 1670 1671 1672static ssize_t 1673save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 1674 size_t len) 1675{ 1676 size_t avail, n_written; 1677 1678 assert(stream->sm_n_buffered + len <= SM_BUF_SIZE); 1679 1680 if (!stream->sm_buf) 1681 { 1682 stream->sm_buf = malloc(SM_BUF_SIZE); 1683 if (!stream->sm_buf) 1684 return -1; 1685 } 1686 1687 avail = lsquic_stream_write_avail(stream); 1688 if (avail < len) 1689 len = avail; 1690 1691 n_written = reader->lsqr_read(reader->lsqr_ctx, 1692 stream->sm_buf + stream->sm_n_buffered, len); 1693 stream->sm_n_buffered += n_written; 1694 incr_conn_cap(stream, n_written); 1695 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 1696 n_written, stream->sm_n_buffered); 1697 if (0 != maybe_flush_stream(stream)) 1698 return -1; 1699 return n_written; 1700} 1701 1702 1703static ssize_t 1704stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 1705{ 1706 size_t thresh, len; 1707 1708 thresh = flush_threshold(stream); 1709 len = reader->lsqr_size(reader->lsqr_ctx); 1710 if (stream->sm_n_buffered + len <= SM_BUF_SIZE && 1711 stream->sm_n_buffered + len < thresh) 1712 return save_to_buffer(stream, reader, len); 1713 else 1714 return stream_write_to_packets(stream, reader, thresh); 1715} 1716 1717 1718ssize_t 1719lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 1720{ 1721 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 1722 return lsquic_stream_writev(stream, &iov, 1); 1723} 1724 1725 1726struct inner_reader_iovec { 1727 const struct iovec *iov; 1728 const struct iovec *end; 1729 unsigned cur_iovec_off; 1730}; 1731 1732 1733static size_t 1734inner_reader_iovec_read (void *ctx, void *buf, size_t count) 1735{ 1736 struct inner_reader_iovec *const iro = ctx; 1737 unsigned char *p = buf; 1738 unsigned char *const end = p + count; 1739 unsigned n_tocopy; 1740 1741 while (iro->iov < iro->end && p < end) 1742 { 1743 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 1744 if (n_tocopy > (unsigned) (end - p)) 1745 n_tocopy = end - p; 1746 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 1747 n_tocopy); 1748 p += n_tocopy; 1749 iro->cur_iovec_off += n_tocopy; 1750 if (iro->iov->iov_len == iro->cur_iovec_off) 1751 { 1752 ++iro->iov; 1753 iro->cur_iovec_off = 0; 1754 } 1755 } 1756 1757 return p + count - end; 1758} 1759 1760 1761static size_t 1762inner_reader_iovec_size (void *ctx) 1763{ 1764 struct inner_reader_iovec *const iro = ctx; 1765 const struct iovec *iov; 1766 size_t size; 1767 1768 size = 0; 1769 for (iov = iro->iov; iov < iro->end; ++iov) 1770 size += iov->iov_len; 1771 1772 return size - iro->cur_iovec_off; 1773} 1774 1775 1776ssize_t 1777lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 1778 int iovcnt) 1779{ 1780 COMMON_WRITE_CHECKS(); 1781 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1782 1783 struct inner_reader_iovec iro = { 1784 .iov = iov, 1785 .end = iov + iovcnt, 1786 .cur_iovec_off = 0, 1787 }; 1788 struct lsquic_reader reader = { 1789 .lsqr_read = inner_reader_iovec_read, 1790 .lsqr_size = inner_reader_iovec_size, 1791 .lsqr_ctx = &iro, 1792 }; 1793 1794 return stream_write(stream, &reader); 1795} 1796 1797 1798ssize_t 1799lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) 1800{ 1801 COMMON_WRITE_CHECKS(); 1802 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1803 return stream_write(stream, reader); 1804} 1805 1806 1807int 1808lsquic_stream_send_headers (lsquic_stream_t *stream, 1809 const lsquic_http_headers_t *headers, int eos) 1810{ 1811 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT| 1812 STREAM_U_WRITE_DONE)) 1813 == STREAM_USE_HEADERS) 1814 { 1815 int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs, 1816 stream->id, headers, eos, lsquic_stream_priority(stream)); 1817 if (0 == s) 1818 { 1819 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 1820 stream->stream_flags |= STREAM_HEADERS_SENT; 1821 if (eos) 1822 stream->stream_flags |= STREAM_FIN_SENT; 1823 LSQ_INFO("sent headers for stream %u", stream->id); 1824 } 1825 else 1826 LSQ_WARN("could not send headers: %s", strerror(errno)); 1827 return s; 1828 } 1829 else 1830 { 1831 LSQ_WARN("cannot send headers for stream %u in this state", stream->id); 1832 errno = EBADMSG; 1833 return -1; 1834 } 1835} 1836 1837 1838void 1839lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 1840{ 1841 if (offset > stream->max_send_off) 1842 { 1843 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 1844 LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to " 1845 "0x%"PRIX64, stream->id, stream->max_send_off, offset); 1846 stream->max_send_off = offset; 1847 } 1848 else 1849 LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old " 1850 "max send offset 0x%"PRIX64", ignoring", stream->id, offset, 1851 stream->max_send_off); 1852} 1853 1854 1855/* This function is used to update offsets after handshake completes and we 1856 * learn of peer's limits from the handshake values. 1857 */ 1858int 1859lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset) 1860{ 1861 LSQ_DEBUG("setting max_send_off to %u", offset); 1862 if (offset > stream->max_send_off) 1863 { 1864 lsquic_stream_window_update(stream, offset); 1865 return 0; 1866 } 1867 else if (offset < stream->tosend_off) 1868 { 1869 LSQ_INFO("new offset (%u bytes) is smaller than the amount of data " 1870 "already sent on this stream (%"PRIu64" bytes)", offset, 1871 stream->tosend_off); 1872 return -1; 1873 } 1874 else 1875 { 1876 stream->max_send_off = offset; 1877 return 0; 1878 } 1879} 1880 1881 1882void 1883lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code) 1884{ 1885 lsquic_stream_reset_ext(stream, error_code, 1); 1886} 1887 1888 1889void 1890lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code, 1891 int do_close) 1892{ 1893 if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT)) 1894 { 1895 LSQ_INFO("reset already sent"); 1896 return; 1897 } 1898 1899 SM_HISTORY_APPEND(stream, SHE_RESET); 1900 1901 LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code); 1902 stream->error_code = error_code; 1903 1904 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1905 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1906 next_send_stream); 1907 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 1908 stream->stream_flags |= STREAM_SEND_RST; 1909 1910 drop_buffered_data(stream); 1911 maybe_elide_stream_frames(stream); 1912 maybe_schedule_call_on_close(stream); 1913 1914 if (do_close) 1915 lsquic_stream_close(stream); 1916 else 1917 maybe_conn_to_tickable_if_writeable(stream, 1); 1918} 1919 1920 1921unsigned 1922lsquic_stream_id (const lsquic_stream_t *stream) 1923{ 1924 return stream->id; 1925} 1926 1927 1928struct lsquic_conn * 1929lsquic_stream_conn (const lsquic_stream_t *stream) 1930{ 1931 return stream->conn_pub->lconn; 1932} 1933 1934 1935int 1936lsquic_stream_close (lsquic_stream_t *stream) 1937{ 1938 LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id); 1939 SM_HISTORY_APPEND(stream, SHE_CLOSE); 1940 if (lsquic_stream_is_closed(stream)) 1941 { 1942 LSQ_INFO("Attempt to close an already-closed stream %u", stream->id); 1943 errno = EBADF; 1944 return -1; 1945 } 1946 stream_shutdown_write(stream); 1947 stream_shutdown_read(stream); 1948 maybe_schedule_call_on_close(stream); 1949 maybe_finish_stream(stream); 1950 maybe_conn_to_tickable_if_writeable(stream, 1); 1951 return 0; 1952} 1953 1954 1955#ifndef NDEBUG 1956#if __GNUC__ 1957__attribute__((weak)) 1958#endif 1959#endif 1960void 1961lsquic_stream_acked (lsquic_stream_t *stream) 1962{ 1963 assert(stream->n_unacked); 1964 --stream->n_unacked; 1965 LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked); 1966 if (0 == stream->n_unacked) 1967 maybe_finish_stream(stream); 1968} 1969 1970 1971void 1972lsquic_stream_push_req (lsquic_stream_t *stream, 1973 struct uncompressed_headers *push_req) 1974{ 1975 assert(!stream->push_req); 1976 stream->push_req = push_req; 1977 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 1978} 1979 1980 1981int 1982lsquic_stream_is_pushed (const lsquic_stream_t *stream) 1983{ 1984 return 1 & ~stream->id; 1985} 1986 1987 1988int 1989lsquic_stream_push_info (const lsquic_stream_t *stream, 1990 uint32_t *ref_stream_id, const char **headers, size_t *headers_sz) 1991{ 1992 if (lsquic_stream_is_pushed(stream)) 1993 { 1994 assert(stream->push_req); 1995 *ref_stream_id = stream->push_req->uh_stream_id; 1996 *headers = stream->push_req->uh_headers; 1997 *headers_sz = stream->push_req->uh_size; 1998 return 0; 1999 } 2000 else 2001 return -1; 2002} 2003 2004 2005int 2006lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 2007{ 2008 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS) 2009 { 2010 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 2011 LSQ_DEBUG("received uncompressed headers for stream %u", stream->id); 2012 stream->stream_flags |= STREAM_HAVE_UH; 2013 if (uh->uh_flags & UH_FIN) 2014 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 2015 stream->uh = uh; 2016 if (uh->uh_oth_stream_id == 0) 2017 { 2018 if (uh->uh_weight) 2019 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 2020 } 2021 else 2022 LSQ_NOTICE("don't know how to depend on stream %u", 2023 uh->uh_oth_stream_id); 2024 return 0; 2025 } 2026 else 2027 { 2028 LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id); 2029 return -1; 2030 } 2031} 2032 2033 2034unsigned 2035lsquic_stream_priority (const lsquic_stream_t *stream) 2036{ 2037 return 256 - stream->sm_priority; 2038} 2039 2040 2041int 2042lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 2043{ 2044 /* The user should never get a reference to the special streams, 2045 * but let's check just in case: 2046 */ 2047 if (LSQUIC_STREAM_HANDSHAKE == stream->id 2048 || ((stream->stream_flags & STREAM_USE_HEADERS) && 2049 LSQUIC_STREAM_HEADERS == stream->id)) 2050 return -1; 2051 if (priority < 1 || priority > 256) 2052 return -1; 2053 stream->sm_priority = 256 - priority; 2054 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 2055 LSQ_DEBUG("set priority to %u", priority); 2056 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 2057 return 0; 2058} 2059 2060 2061int 2062lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 2063{ 2064 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 2065 { 2066 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) == 2067 (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) 2068 { 2069 /* We need to send headers only if we are a) using HEADERS stream 2070 * and b) we already sent initial headers. If initial headers 2071 * have not been sent yet, stream priority will be sent in the 2072 * HEADERS frame. 2073 */ 2074 return lsquic_headers_stream_send_priority(stream->conn_pub->hs, 2075 stream->id, 0, 0, priority); 2076 } 2077 else 2078 return 0; 2079 } 2080 else 2081 return -1; 2082} 2083 2084 2085lsquic_stream_ctx_t * 2086lsquic_stream_get_ctx (const lsquic_stream_t *stream) 2087{ 2088 return stream->st_ctx; 2089} 2090 2091 2092int 2093lsquic_stream_refuse_push (lsquic_stream_t *stream) 2094{ 2095 if (lsquic_stream_is_pushed(stream) && 2096 !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST))) 2097 { 2098 LSQ_DEBUG("refusing pushed stream: send reset"); 2099 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 2100 return 0; 2101 } 2102 else 2103 return -1; 2104} 2105 2106 2107size_t 2108lsquic_stream_mem_used (const struct lsquic_stream *stream) 2109{ 2110 size_t size; 2111 2112 size = sizeof(stream); 2113 if (stream->sm_buf) 2114 size += SM_BUF_SIZE; 2115 if (stream->data_in) 2116 size += stream->data_in->di_if->di_mem_used(stream->data_in); 2117 2118 return size; 2119} 2120 2121 2122lsquic_cid_t 2123lsquic_stream_cid (const struct lsquic_stream *stream) 2124{ 2125 return LSQUIC_LOG_CONN_ID; 2126} 2127