lsquic_stream.c revision be4cfad0
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <stddef.h> 28 29#include "lsquic.h" 30 31#include "lsquic_int_types.h" 32#include "lsquic_packet_common.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn_flow.h" 36#include "lsquic_rtt.h" 37#include "lsquic_sfcw.h" 38#include "lsquic_stream.h" 39#include "lsquic_conn_public.h" 40#include "lsquic_util.h" 41#include "lsquic_mm.h" 42#include "lsquic_headers_stream.h" 43#include "lsquic_frame_reader.h" 44#include "lsquic_conn.h" 45#include "lsquic_data_in_if.h" 46#include "lsquic_parse.h" 47#include "lsquic_packet_out.h" 48#include "lsquic_engine_public.h" 49#include "lsquic_senhist.h" 50#include "lsquic_pacer.h" 51#include "lsquic_cubic.h" 52#include "lsquic_send_ctl.h" 53#include "lsquic_ev_log.h" 54 55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid 57#define LSQUIC_LOG_STREAM_ID stream->id 58#include "lsquic_logger.h" 59 60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ 61 62static void 63drop_frames_in (lsquic_stream_t *stream); 64 65static void 66maybe_schedule_call_on_close (lsquic_stream_t *stream); 67 68static int 69stream_wantread (lsquic_stream_t *stream, int is_want); 70 71static int 72stream_wantwrite (lsquic_stream_t *stream, int is_want); 73 74static ssize_t 75stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 76 77static ssize_t 78save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 79 80static int 81stream_flush (lsquic_stream_t *stream); 82 83static int 84stream_flush_nocheck (lsquic_stream_t *stream); 85 86static void 87maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag); 88 89 90#if LSQUIC_KEEP_STREAM_HISTORY 91/* These values are printable ASCII characters for ease of printing the 92 * whole history in a single line of a log message. 93 * 94 * The list of events is not exhaustive: only most interesting events 95 * are recorded. 96 */ 97enum stream_history_event 98{ 99 SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ 100 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 101 SHE_REACH_FIN = 'a', 102 SHE_BLOCKED_OUT = 'b', 103 SHE_CREATED = 'C', 104 SHE_FRAME_IN = 'd', 105 SHE_FRAME_OUT = 'D', 106 SHE_RESET = 'e', 107 SHE_WINDOW_UPDATE = 'E', 108 SHE_FIN_IN = 'f', 109 SHE_FINISHED = 'F', 110 SHE_GOAWAY_IN = 'g', 111 SHE_USER_WRITE_HEADER = 'h', 112 SHE_HEADERS_IN = 'H', 113 SHE_ONCLOSE_SCHED = 'l', 114 SHE_ONCLOSE_CALL = 'L', 115 SHE_ONNEW = 'N', 116 SHE_SET_PRIO = 'p', 117 SHE_USER_READ = 'r', 118 SHE_SHUTDOWN_READ = 'R', 119 SHE_RST_IN = 's', 120 SHE_RST_OUT = 't', 121 SHE_FLUSH = 'u', 122 SHE_USER_WRITE_DATA = 'w', 123 SHE_SHUTDOWN_WRITE = 'W', 124 SHE_CLOSE = 'X', 125 SHE_FORCE_FINISH = 'Z', 126}; 127 128static void 129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 130{ 131 enum stream_history_event prev_event; 132 sm_hist_idx_t idx; 133 int plus; 134 135 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 136 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 137 idx = (idx - plus) & SM_HIST_IDX_MASK; 138 prev_event = stream->sm_hist_buf[idx]; 139 140 if (prev_event == sh_event && plus) 141 return; 142 143 if (prev_event == sh_event) 144 sh_event = SHE_PLUS; 145 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 146 147 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 148 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 149 stream->sm_hist_buf); 150} 151 152# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 153# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 154 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 155 LSQ_DEBUG("history: [%.*s]", \ 156 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 157 (stream)->sm_hist_buf); \ 158 } while (0) 159#else 160# define SM_HISTORY_APPEND(stream, event) 161# define SM_HISTORY_DUMP_REMAINING(stream) 162#endif 163 164 165static int 166stream_inside_callback (const lsquic_stream_t *stream) 167{ 168 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 169} 170 171 172static void 173maybe_conn_to_tickable (lsquic_stream_t *stream) 174{ 175 if (!stream_inside_callback(stream)) 176 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 177 stream->conn_pub->lconn); 178} 179 180 181/* Here, "readable" means that the user is able to read from the stream. */ 182static void 183maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 184{ 185 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 186 { 187 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 188 stream->conn_pub->lconn); 189 } 190} 191 192 193/* Here, "writeable" means that data can be put into packets to be 194 * scheduled to be sent out. 195 * 196 * If `check_can_send' is false, it means that we do not need to check 197 * whether packets can be sent. This check was already performed when 198 * we packetized stream data. 199 */ 200static void 201maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream, 202 int check_can_send) 203{ 204 if (!stream_inside_callback(stream) && 205 (!check_can_send 206 || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) && 207 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 208 { 209 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 210 stream->conn_pub->lconn); 211 } 212} 213 214 215static int 216stream_stalled (const lsquic_stream_t *stream) 217{ 218 return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) && 219 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 220 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 221} 222 223 224/* TODO: The logic to figure out whether the stream is connection limited 225 * should be taken out of the constructor. The caller should specify this 226 * via one of enum stream_ctor_flags. 227 */ 228lsquic_stream_t * 229lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub, 230 const struct lsquic_stream_if *stream_if, 231 void *stream_if_ctx, unsigned initial_window, 232 unsigned initial_send_off, 233 enum stream_ctor_flags ctor_flags) 234{ 235 lsquic_cfcw_t *cfcw; 236 lsquic_stream_t *stream; 237 238 stream = calloc(1, sizeof(*stream)); 239 if (!stream) 240 return NULL; 241 242 stream->stream_if = stream_if; 243 stream->id = id; 244 stream->conn_pub = conn_pub; 245 stream->sm_onnew_arg = stream_if_ctx; 246 if (!initial_window) 247 initial_window = 16 * 1024; 248 if (LSQUIC_STREAM_HANDSHAKE == id || 249 (conn_pub->hs && LSQUIC_STREAM_HEADERS == id)) 250 cfcw = NULL; 251 else 252 { 253 cfcw = &conn_pub->cfcw; 254 stream->stream_flags |= STREAM_CONN_LIMITED; 255 if (conn_pub->hs) 256 stream->stream_flags |= STREAM_USE_HEADERS; 257 lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO); 258 } 259 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 260 if (!initial_send_off) 261 initial_send_off = 16 * 1024; 262 stream->max_send_off = initial_send_off; 263 if (ctor_flags & SCF_USE_DI_HASH) 264 stream->data_in = data_in_hash_new(conn_pub, id, 0); 265 else 266 stream->data_in = data_in_nocopy_new(conn_pub, id); 267 LSQ_DEBUG("created stream %u @%p", id, stream); 268 SM_HISTORY_APPEND(stream, SHE_CREATED); 269 if (ctor_flags & SCF_DI_AUTOSWITCH) 270 stream->stream_flags |= STREAM_AUTOSWITCH; 271 if (ctor_flags & SCF_CALL_ON_NEW) 272 lsquic_stream_call_on_new(stream); 273 if (ctor_flags & SCF_DISP_RW_ONCE) 274 stream->stream_flags |= STREAM_RW_ONCE; 275 if (ctor_flags & SCF_ALLOW_OVERLAP) 276 stream->stream_flags |= STREAM_ALLOW_OVERLAP; 277 return stream; 278} 279 280 281void 282lsquic_stream_call_on_new (lsquic_stream_t *stream) 283{ 284 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 285 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 286 { 287 LSQ_DEBUG("calling on_new_stream"); 288 SM_HISTORY_APPEND(stream, SHE_ONNEW); 289 stream->stream_flags |= STREAM_ONNEW_DONE; 290 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 291 stream); 292 } 293} 294 295 296static void 297decr_conn_cap (struct lsquic_stream *stream, size_t incr) 298{ 299 if (stream->stream_flags & STREAM_CONN_LIMITED) 300 { 301 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 302 stream->conn_pub->conn_cap.cc_sent -= incr; 303 } 304} 305 306 307static void 308drop_buffered_data (struct lsquic_stream *stream) 309{ 310 decr_conn_cap(stream, stream->sm_n_buffered); 311 stream->sm_n_buffered = 0; 312 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 313 maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS); 314} 315 316 317void 318lsquic_stream_destroy (lsquic_stream_t *stream) 319{ 320 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 321 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 322 STREAM_ONNEW_DONE) 323 { 324 stream->stream_flags |= STREAM_ONCLOSE_DONE; 325 stream->stream_if->on_close(stream, stream->st_ctx); 326 } 327 if (stream->stream_flags & STREAM_SENDING_FLAGS) 328 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 329 if (stream->stream_flags & STREAM_WANT_READ) 330 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 331 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 332 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 333 if (stream->stream_flags & STREAM_SERVICE_FLAGS) 334 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 335 drop_buffered_data(stream); 336 lsquic_sfcw_consume_rem(&stream->fc); 337 drop_frames_in(stream); 338 free(stream->push_req); 339 free(stream->uh); 340 free(stream->sm_buf); 341 LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream); 342 SM_HISTORY_DUMP_REMAINING(stream); 343 free(stream); 344} 345 346 347static int 348stream_is_finished (const lsquic_stream_t *stream) 349{ 350 return lsquic_stream_is_closed(stream) 351 /* n_unacked checks that no outgoing packets that reference this 352 * stream are outstanding: 353 */ 354 && 0 == stream->n_unacked 355 /* This checks that no packets that reference this stream will 356 * become outstanding: 357 */ 358 && 0 == (stream->stream_flags & STREAM_SEND_RST) 359 && ((stream->stream_flags & STREAM_FORCE_FINISH) 360 || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)) 361 && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD)))); 362} 363 364 365static void 366maybe_finish_stream (lsquic_stream_t *stream) 367{ 368 if (0 == (stream->stream_flags & STREAM_FINISHED) && 369 stream_is_finished(stream)) 370 { 371 LSQ_DEBUG("stream %u is now finished", stream->id); 372 SM_HISTORY_APPEND(stream, SHE_FINISHED); 373 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 374 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 375 next_service_stream); 376 stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED; 377 } 378} 379 380 381static void 382maybe_schedule_call_on_close (lsquic_stream_t *stream) 383{ 384 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 385 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE)) 386 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)) 387 { 388 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 389 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 390 next_service_stream); 391 stream->stream_flags |= STREAM_CALL_ONCLOSE; 392 LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id); 393 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 394 } 395} 396 397 398void 399lsquic_stream_call_on_close (lsquic_stream_t *stream) 400{ 401 assert(stream->stream_flags & STREAM_ONNEW_DONE); 402 stream->stream_flags &= ~STREAM_CALL_ONCLOSE; 403 if (!(stream->stream_flags & STREAM_SERVICE_FLAGS)) 404 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 405 next_service_stream); 406 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 407 { 408 LSQ_DEBUG("calling on_close for stream %u", stream->id); 409 stream->stream_flags |= STREAM_ONCLOSE_DONE; 410 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 411 stream->stream_if->on_close(stream, stream->st_ctx); 412 } 413 else 414 assert(0); 415} 416 417 418int 419lsquic_stream_readable (const lsquic_stream_t *stream) 420{ 421 /* A stream is readable if one of the following is true: */ 422 return 423 /* - It is already finished: in that case, lsquic_stream_read() will 424 * return 0. 425 */ 426 (stream->stream_flags & STREAM_FIN_REACHED) 427 /* - The stream is reset, by either side. In this case, 428 * lsquic_stream_read() will return -1 (we want the user to be 429 * able to collect the error). 430 */ 431 || (stream->stream_flags & STREAM_RST_FLAGS) 432 /* - Either we are not in HTTP mode or the HTTP headers have been 433 * received and the headers or data from the stream can be read. 434 */ 435 || (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 436 == STREAM_USE_HEADERS) 437 && (stream->uh != NULL 438 || stream->data_in->di_if->di_get_frame(stream->data_in, 439 stream->read_offset))) 440 ; 441} 442 443 444size_t 445lsquic_stream_write_avail (const struct lsquic_stream *stream) 446{ 447 uint64_t stream_avail, conn_avail; 448 449 stream_avail = stream->max_send_off - stream->tosend_off 450 - stream->sm_n_buffered; 451 if (stream->stream_flags & STREAM_CONN_LIMITED) 452 { 453 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 454 if (conn_avail < stream_avail) 455 return conn_avail; 456 } 457 458 return stream_avail; 459} 460 461 462int 463lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 464{ 465 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 466 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 467 { 468 return -1; 469 } 470 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 471 { 472 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 473 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 474 next_send_stream); 475 stream->stream_flags |= STREAM_SEND_WUF; 476 } 477 return 0; 478} 479 480 481int 482lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 483{ 484 uint64_t max_off; 485 int got_next_offset; 486 enum ins_frame ins_frame; 487 488 assert(frame->packet_in); 489 490 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 491 LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; " 492 "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 493 494 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) == 495 (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) 496 { 497 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 498 lsquic_malo_put(frame); 499 return -1; 500 } 501 502 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 503 insert_frame: 504 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 505 if (INS_FRAME_OK == ins_frame) 506 { 507 /* Update maximum offset in the flow controller and check for flow 508 * control violation: 509 */ 510 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 511 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 512 return -1; 513 if (frame->data_frame.df_fin) 514 { 515 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 516 stream->stream_flags |= STREAM_FIN_RECVD; 517 maybe_finish_stream(stream); 518 } 519 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 520 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 521 { 522 stream->data_in = stream->data_in->di_if->di_switch_impl( 523 stream->data_in, stream->read_offset); 524 if (!stream->data_in) 525 { 526 stream->data_in = data_in_error_new(); 527 return -1; 528 } 529 } 530 if (got_next_offset) 531 /* Checking the offset saves di_get_frame() call */ 532 maybe_conn_to_tickable_if_readable(stream); 533 return 0; 534 } 535 else if (INS_FRAME_DUP == ins_frame) 536 { 537 return 0; 538 } 539 else if (INS_FRAME_OVERLAP == ins_frame) 540 { 541 if (stream->stream_flags & STREAM_ALLOW_OVERLAP) 542 { 543 LSQ_DEBUG("overlap: switching DATA IN implementation"); 544 stream->data_in = stream->data_in->di_if->di_switch_impl( 545 stream->data_in, stream->read_offset); 546 if (stream->data_in) 547 goto insert_frame; 548 stream->data_in = data_in_error_new(); 549 } 550 else 551 LSQ_DEBUG("overlap not supported"); 552 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 553 lsquic_malo_put(frame); 554 return -1; 555 } 556 else 557 { 558 assert(INS_FRAME_ERR == ins_frame); 559 return -1; 560 } 561} 562 563 564static void 565drop_frames_in (lsquic_stream_t *stream) 566{ 567 if (stream->data_in) 568 { 569 stream->data_in->di_if->di_destroy(stream->data_in); 570 /* To avoid checking whether `data_in` is set, just set to the error 571 * data-in stream. It does the right thing after incoming data is 572 * dropped. 573 */ 574 stream->data_in = data_in_error_new(); 575 } 576} 577 578 579static void 580maybe_elide_stream_frames (struct lsquic_stream *stream) 581{ 582 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 583 { 584 if (stream->n_unacked) 585 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 586 stream->id); 587 stream->stream_flags |= STREAM_FRAMES_ELIDED; 588 } 589} 590 591 592int 593lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 594 uint32_t error_code) 595{ 596 597 if (stream->stream_flags & STREAM_RST_RECVD) 598 { 599 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 600 return 0; 601 } 602 603 SM_HISTORY_APPEND(stream, SHE_RST_IN); 604 /* This flag must always be set, even if we are "ignoring" it: it is 605 * used by elision code. 606 */ 607 stream->stream_flags |= STREAM_RST_RECVD; 608 609 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 610 { 611 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is " 612 "smaller than that of byte following the last byte we have seen: " 613 "0x%"PRIX64, stream->id, offset, 614 lsquic_sfcw_get_max_recv_off(&stream->fc)); 615 return -1; 616 } 617 618 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 619 { 620 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64 621 " violates flow control", stream->id, offset); 622 return -1; 623 } 624 625 /* Let user collect error: */ 626 maybe_conn_to_tickable_if_readable(stream); 627 628 lsquic_sfcw_consume_rem(&stream->fc); 629 drop_frames_in(stream); 630 drop_buffered_data(stream); 631 maybe_elide_stream_frames(stream); 632 633 if (!(stream->stream_flags & 634 (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT))) 635 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 636 637 stream->stream_flags |= STREAM_RST_RECVD; 638 639 maybe_finish_stream(stream); 640 maybe_schedule_call_on_close(stream); 641 642 return 0; 643} 644 645 646uint64_t 647lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 648{ 649 assert(stream->stream_flags & STREAM_SEND_WUF); 650 stream->stream_flags &= ~STREAM_SEND_WUF; 651 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 652 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 653 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 654} 655 656 657void 658lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 659{ 660 assert(stream->stream_flags & STREAM_SEND_BLOCKED); 661 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 662 stream->stream_flags &= ~STREAM_SEND_BLOCKED; 663 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 664 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 665} 666 667 668void 669lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 670{ 671 assert(stream->stream_flags & STREAM_SEND_RST); 672 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 673 stream->stream_flags &= ~STREAM_SEND_RST; 674 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 675 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 676 stream->stream_flags |= STREAM_RST_SENT; 677 maybe_finish_stream(stream); 678} 679 680 681static size_t 682read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len) 683{ 684 struct uncompressed_headers *uh = stream->uh; 685 size_t n_avail = uh->uh_size - uh->uh_off; 686 if (n_avail < len) 687 len = n_avail; 688 memcpy(dst, uh->uh_headers + uh->uh_off, len); 689 uh->uh_off += len; 690 if (uh->uh_off == uh->uh_size) 691 { 692 LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id); 693 free(uh); 694 stream->uh = NULL; 695 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 696 { 697 stream->stream_flags |= STREAM_FIN_REACHED; 698 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 699 } 700 } 701 return len; 702} 703 704 705/* This function returns 0 when EOF is reached. 706 */ 707ssize_t 708lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov, 709 int iovcnt) 710{ 711 size_t total_nread, nread; 712 int processed_frames, read_unc_headers, iovidx; 713 unsigned char *p, *end; 714 715 SM_HISTORY_APPEND(stream, SHE_USER_READ); 716 717#define NEXT_IOV() do { \ 718 ++iovidx; \ 719 while (iovidx < iovcnt && 0 == iov[iovidx].iov_len) \ 720 ++iovidx; \ 721 if (iovidx < iovcnt) \ 722 { \ 723 p = iov[iovidx].iov_base; \ 724 end = p + iov[iovidx].iov_len; \ 725 } \ 726 else \ 727 p = end = NULL; \ 728} while (0) 729 730#define AVAIL() (end - p) 731 732 if (stream->stream_flags & STREAM_RST_FLAGS) 733 { 734 errno = ECONNRESET; 735 return -1; 736 } 737 if (stream->stream_flags & STREAM_U_READ_DONE) 738 { 739 errno = EBADF; 740 return -1; 741 } 742 if (stream->stream_flags & STREAM_FIN_REACHED) 743 return 0; 744 745 total_nread = 0; 746 processed_frames = 0; 747 748 iovidx = -1; 749 NEXT_IOV(); 750 751 if (stream->uh && AVAIL()) 752 { 753 read_unc_headers = 1; 754 do 755 { 756 nread = read_uh(stream, p, AVAIL()); 757 p += nread; 758 total_nread += nread; 759 if (p == end) 760 NEXT_IOV(); 761 } 762 while (stream->uh && AVAIL()); 763 } 764 else 765 read_unc_headers = 0; 766 767 struct data_frame *data_frame; 768 while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset))) 769 { 770 ++processed_frames; 771 size_t navail = data_frame->df_size - data_frame->df_read_off; 772 size_t ntowrite = AVAIL(); 773 if (navail < ntowrite) 774 ntowrite = navail; 775 memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite); 776 p += ntowrite; 777 data_frame->df_read_off += ntowrite; 778 stream->read_offset += ntowrite; 779 total_nread += ntowrite; 780 if (data_frame->df_read_off == data_frame->df_size) 781 { 782 const int fin = data_frame->df_fin; 783 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 784 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 785 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 786 { 787 stream->data_in = stream->data_in->di_if->di_switch_impl( 788 stream->data_in, stream->read_offset); 789 if (!stream->data_in) 790 { 791 stream->data_in = data_in_error_new(); 792 return -1; 793 } 794 } 795 if (fin) 796 { 797 stream->stream_flags |= STREAM_FIN_REACHED; 798 break; 799 } 800 } 801 if (p == end) 802 NEXT_IOV(); 803 } 804 805 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 806 total_nread, stream->read_offset); 807 808 if (processed_frames) 809 { 810 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 811 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 812 { 813 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 814 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 815 stream->stream_flags |= STREAM_SEND_WUF; 816 maybe_conn_to_tickable_if_writeable(stream, 1); 817 } 818 } 819 820 if (processed_frames || read_unc_headers) 821 { 822 return total_nread; 823 } 824 else 825 { 826 assert(0 == total_nread); 827 errno = EWOULDBLOCK; 828 return -1; 829 } 830} 831 832 833ssize_t 834lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 835{ 836 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 837 return lsquic_stream_readv(stream, &iov, 1); 838} 839 840 841static void 842stream_shutdown_read (lsquic_stream_t *stream) 843{ 844 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 845 { 846 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 847 stream->stream_flags |= STREAM_U_READ_DONE; 848 stream_wantread(stream, 0); 849 maybe_finish_stream(stream); 850 } 851} 852 853 854static void 855stream_shutdown_write (lsquic_stream_t *stream) 856{ 857 if (stream->stream_flags & STREAM_U_WRITE_DONE) 858 return; 859 860 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 861 stream->stream_flags |= STREAM_U_WRITE_DONE; 862 stream_wantwrite(stream, 0); 863 864 /* Don't bother to check whether there is anything else to write if 865 * the flags indicate that nothing else should be written. 866 */ 867 if (!(stream->stream_flags & 868 (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT))) 869 { 870 if (stream->sm_n_buffered == 0) 871 { 872 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 873 stream)) 874 { 875 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 876 stream->stream_flags |= STREAM_FIN_SENT; 877 } 878 else 879 { 880 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 881 "flag in it"); 882 (void) stream_flush_nocheck(stream); 883 } 884 } 885 else 886 (void) stream_flush_nocheck(stream); 887 } 888} 889 890 891int 892lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 893{ 894 LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how); 895 if (lsquic_stream_is_closed(stream)) 896 { 897 LSQ_INFO("Attempt to shut down a closed stream %u", stream->id); 898 errno = EBADF; 899 return -1; 900 } 901 /* 0: read, 1: write: 2: read and write 902 */ 903 if (how < 0 || how > 2) 904 { 905 errno = EINVAL; 906 return -1; 907 } 908 909 if (how) 910 stream_shutdown_write(stream); 911 if (how != 1) 912 stream_shutdown_read(stream); 913 914 maybe_finish_stream(stream); 915 maybe_schedule_call_on_close(stream); 916 if (how) 917 maybe_conn_to_tickable_if_writeable(stream, 1); 918 919 return 0; 920} 921 922 923void 924lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 925{ 926 LSQ_DEBUG("internal shutdown of stream %u", stream->id); 927 if (LSQUIC_STREAM_HANDSHAKE == stream->id 928 || ((stream->stream_flags & STREAM_USE_HEADERS) && 929 LSQUIC_STREAM_HEADERS == stream->id)) 930 { 931 LSQ_DEBUG("add flag to force-finish special stream %u", stream->id); 932 stream->stream_flags |= STREAM_FORCE_FINISH; 933 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 934 } 935 maybe_finish_stream(stream); 936 maybe_schedule_call_on_close(stream); 937} 938 939 940static void 941fake_reset_unused_stream (lsquic_stream_t *stream) 942{ 943 stream->stream_flags |= 944 STREAM_RST_RECVD /* User will pick this up on read or write */ 945 | STREAM_RST_SENT /* Don't send anything else on this stream */ 946 ; 947 948 /* Cancel all writes to the network scheduled for this stream: */ 949 if (stream->stream_flags & STREAM_SENDING_FLAGS) 950 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 951 next_send_stream); 952 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 953 954 LSQ_DEBUG("fake-reset stream %u%s", 955 stream->id, stream_stalled(stream) ? " (stalled)" : ""); 956 maybe_finish_stream(stream); 957 maybe_schedule_call_on_close(stream); 958} 959 960 961/* This function should only be called for locally-initiated streams whose ID 962 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 963 * frame sent by peer but we have not yet received it and created a stream. 964 * In this situation, we mark the stream as reset, so that user's on_read or 965 * on_write event callback picks up the error. That, in turn, should result 966 * in stream being closed. 967 * 968 * If we have received any data frames on this stream, this probably indicates 969 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 970 * lower than this. However, we still try to handle it gracefully and peform 971 * a shutdown, as if the stream was not reset. 972 */ 973void 974lsquic_stream_received_goaway (lsquic_stream_t *stream) 975{ 976 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 977 if (0 == stream->read_offset && 978 stream->data_in->di_if->di_empty(stream->data_in)) 979 fake_reset_unused_stream(stream); /* Normal condition */ 980 else 981 { /* This is odd, let's handle it the best we can: */ 982 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 983 lsquic_stream_shutdown_internal(stream); 984 } 985} 986 987 988uint64_t 989lsquic_stream_read_offset (const lsquic_stream_t *stream) 990{ 991 return stream->read_offset; 992} 993 994 995static int 996stream_wantread (lsquic_stream_t *stream, int is_want) 997{ 998 const int old_val = !!(stream->stream_flags & STREAM_WANT_READ); 999 const int new_val = !!is_want; 1000 if (old_val != new_val) 1001 { 1002 if (new_val) 1003 { 1004 if (!old_val) 1005 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 1006 next_read_stream); 1007 stream->stream_flags |= STREAM_WANT_READ; 1008 } 1009 else 1010 { 1011 stream->stream_flags &= ~STREAM_WANT_READ; 1012 if (old_val) 1013 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 1014 next_read_stream); 1015 } 1016 } 1017 return old_val; 1018} 1019 1020 1021static void 1022maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1023{ 1024 assert(STREAM_WRITE_Q_FLAGS & flag); 1025 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1026 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1027 next_write_stream); 1028 stream->stream_flags |= flag; 1029} 1030 1031 1032static void 1033maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1034{ 1035 assert(STREAM_WRITE_Q_FLAGS & flag); 1036 if (stream->stream_flags & flag) 1037 { 1038 stream->stream_flags &= ~flag; 1039 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1040 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1041 next_write_stream); 1042 } 1043} 1044 1045 1046static int 1047stream_wantwrite (lsquic_stream_t *stream, int is_want) 1048{ 1049 const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE); 1050 const int new_val = !!is_want; 1051 if (old_val != new_val) 1052 { 1053 if (new_val) 1054 maybe_put_onto_write_q(stream, STREAM_WANT_WRITE); 1055 else 1056 maybe_remove_from_write_q(stream, STREAM_WANT_WRITE); 1057 } 1058 return old_val; 1059} 1060 1061 1062int 1063lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1064{ 1065 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1066 { 1067 if (is_want) 1068 maybe_conn_to_tickable_if_readable(stream); 1069 return stream_wantread(stream, is_want); 1070 } 1071 else 1072 { 1073 errno = EBADF; 1074 return -1; 1075 } 1076} 1077 1078 1079int 1080lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1081{ 1082 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)) 1083 { 1084 if (is_want) 1085 maybe_conn_to_tickable_if_writeable(stream, 1); 1086 return stream_wantwrite(stream, is_want); 1087 } 1088 else 1089 { 1090 errno = EBADF; 1091 return -1; 1092 } 1093} 1094 1095 1096#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE| \ 1097 STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST) 1098 1099 1100static void 1101stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1102{ 1103 unsigned no_progress_count, no_progress_limit; 1104 enum stream_flags flags; 1105 uint64_t size; 1106 1107 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1108 1109 no_progress_count = 0; 1110 while ((stream->stream_flags & STREAM_WANT_READ) 1111 && lsquic_stream_readable(stream)) 1112 { 1113 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1114 size = stream->read_offset; 1115 1116 stream->stream_if->on_read(stream, stream->st_ctx); 1117 1118 if (no_progress_limit && size == stream->read_offset && 1119 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1120 { 1121 ++no_progress_count; 1122 if (no_progress_count >= no_progress_limit) 1123 { 1124 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1125 "progress) in user code reading from stream", 1126 no_progress_count, 1127 no_progress_count == 1 ? "" : "s"); 1128 break; 1129 } 1130 } 1131 else 1132 no_progress_count = 0; 1133 } 1134} 1135 1136 1137static void 1138stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1139{ 1140 unsigned no_progress_count, no_progress_limit; 1141 enum stream_flags flags; 1142 1143 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1144 1145 no_progress_count = 0; 1146 stream->stream_flags |= STREAM_LAST_WRITE_OK; 1147 while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)) 1148 == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK) 1149 && lsquic_stream_write_avail(stream)) 1150 { 1151 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1152 1153 stream->stream_if->on_write(stream, stream->st_ctx); 1154 1155 if (no_progress_limit && 1156 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1157 { 1158 ++no_progress_count; 1159 if (no_progress_count >= no_progress_limit) 1160 { 1161 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1162 "progress) in user code writing to stream", 1163 no_progress_count, 1164 no_progress_count == 1 ? "" : "s"); 1165 break; 1166 } 1167 } 1168 else 1169 no_progress_count = 0; 1170 } 1171} 1172 1173 1174static void 1175stream_dispatch_read_events_once (lsquic_stream_t *stream) 1176{ 1177 if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream)) 1178 { 1179 stream->stream_if->on_read(stream, stream->st_ctx); 1180 } 1181} 1182 1183 1184static void 1185maybe_mark_as_blocked (lsquic_stream_t *stream) 1186{ 1187 struct lsquic_conn_cap *cc; 1188 1189 if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered) 1190 { 1191 if (stream->blocked_off < stream->max_send_off) 1192 { 1193 stream->blocked_off = stream->max_send_off + stream->sm_n_buffered; 1194 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1195 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1196 next_send_stream); 1197 stream->stream_flags |= STREAM_SEND_BLOCKED; 1198 LSQ_DEBUG("marked stream-blocked at stream offset " 1199 "%"PRIu64, stream->blocked_off); 1200 } 1201 else 1202 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 1203 " has been, or is about to be, sent", stream->blocked_off); 1204 } 1205 1206 if ((stream->stream_flags & STREAM_CONN_LIMITED) 1207 && (cc = &stream->conn_pub->conn_cap, 1208 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 1209 { 1210 if (cc->cc_blocked < cc->cc_max) 1211 { 1212 cc->cc_blocked = cc->cc_max; 1213 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 1214 LSQ_DEBUG("marked connection-blocked at connection offset " 1215 "%"PRIu64, cc->cc_max); 1216 } 1217 else 1218 LSQ_DEBUG("stream has already been marked connection-blocked " 1219 "at offset %"PRIu64, cc->cc_blocked); 1220 } 1221} 1222 1223 1224void 1225lsquic_stream_dispatch_read_events (lsquic_stream_t *stream) 1226{ 1227 assert(stream->stream_flags & STREAM_WANT_READ); 1228 1229 if (stream->stream_flags & STREAM_RW_ONCE) 1230 stream_dispatch_read_events_once(stream); 1231 else 1232 stream_dispatch_read_events_loop(stream); 1233} 1234 1235 1236void 1237lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 1238{ 1239 int progress; 1240 uint64_t tosend_off; 1241 unsigned short n_buffered; 1242 enum stream_flags flags; 1243 1244 assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS); 1245 flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS; 1246 tosend_off = stream->tosend_off; 1247 n_buffered = stream->sm_n_buffered; 1248 1249 if (stream->stream_flags & STREAM_WANT_FLUSH) 1250 (void) stream_flush(stream); 1251 1252 if (stream->stream_flags & STREAM_RW_ONCE) 1253 { 1254 if ((stream->stream_flags & STREAM_WANT_WRITE) 1255 && lsquic_stream_write_avail(stream)) 1256 { 1257 stream->stream_if->on_write(stream, stream->st_ctx); 1258 } 1259 } 1260 else 1261 stream_dispatch_write_events_loop(stream); 1262 1263 /* Progress means either flags or offsets changed: */ 1264 progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags && 1265 stream->tosend_off == tosend_off && 1266 stream->sm_n_buffered == n_buffered); 1267 1268 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 1269 { 1270 if (progress) 1271 { /* Move the stream to the end of the list to ensure fairness. */ 1272 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1273 next_write_stream); 1274 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1275 next_write_stream); 1276 } 1277 } 1278} 1279 1280 1281static size_t 1282inner_reader_empty_size (void *ctx) 1283{ 1284 return 0; 1285} 1286 1287 1288static size_t 1289inner_reader_empty_read (void *ctx, void *buf, size_t count) 1290{ 1291 return 0; 1292} 1293 1294 1295static int 1296stream_flush (lsquic_stream_t *stream) 1297{ 1298 struct lsquic_reader empty_reader; 1299 ssize_t nw; 1300 1301 assert(stream->stream_flags & STREAM_WANT_FLUSH); 1302 assert(stream->sm_n_buffered > 0 || 1303 /* Flushing is also used to packetize standalone FIN: */ 1304 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 1305 == STREAM_U_WRITE_DONE)); 1306 1307 empty_reader.lsqr_size = inner_reader_empty_size; 1308 empty_reader.lsqr_read = inner_reader_empty_read; 1309 empty_reader.lsqr_ctx = NULL; /* pro forma */ 1310 nw = stream_write_to_packets(stream, &empty_reader, 0); 1311 1312 if (nw >= 0) 1313 { 1314 assert(nw == 0); /* Empty reader: must have read zero bytes */ 1315 return 0; 1316 } 1317 else 1318 return -1; 1319} 1320 1321 1322static int 1323stream_flush_nocheck (lsquic_stream_t *stream) 1324{ 1325 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered; 1326 maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH); 1327 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 1328 1329 return stream_flush(stream); 1330} 1331 1332 1333int 1334lsquic_stream_flush (lsquic_stream_t *stream) 1335{ 1336 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1337 { 1338 LSQ_DEBUG("cannot flush closed stream"); 1339 errno = EBADF; 1340 return -1; 1341 } 1342 1343 if (0 == stream->sm_n_buffered) 1344 { 1345 LSQ_DEBUG("flushing 0 bytes: noop"); 1346 return 0; 1347 } 1348 1349 return stream_flush_nocheck(stream); 1350} 1351 1352 1353/* The flush threshold is the maximum size of stream data that can be sent 1354 * in a full packet. 1355 */ 1356static size_t 1357flush_threshold (const lsquic_stream_t *stream) 1358{ 1359 enum packet_out_flags flags; 1360 enum lsquic_packno_bits bits; 1361 unsigned packet_header_sz, stream_header_sz; 1362 size_t threshold; 1363 1364 bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl); 1365 flags = bits << POBIT_SHIFT; 1366 if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)) 1367 flags |= PO_CONN_ID; 1368 1369 packet_header_sz = lsquic_po_header_length(flags); 1370 stream_header_sz = stream->conn_pub->lconn->cn_pf 1371 ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off); 1372 1373 threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ 1374 - packet_header_sz - stream_header_sz; 1375 return threshold; 1376} 1377 1378 1379#define COMMON_WRITE_CHECKS() do { \ 1380 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) \ 1381 == STREAM_USE_HEADERS) \ 1382 { \ 1383 LSQ_WARN("Attempt to write to stream before sending HTTP headers"); \ 1384 errno = EILSEQ; \ 1385 return -1; \ 1386 } \ 1387 if (stream->stream_flags & STREAM_RST_FLAGS) \ 1388 { \ 1389 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 1390 errno = ECONNRESET; \ 1391 return -1; \ 1392 } \ 1393 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 1394 { \ 1395 LSQ_WARN("Attempt to write to stream after it was closed for " \ 1396 "writing"); \ 1397 errno = EBADF; \ 1398 return -1; \ 1399 } \ 1400} while (0) 1401 1402 1403struct frame_gen_ctx 1404{ 1405 lsquic_stream_t *fgc_stream; 1406 struct lsquic_reader *fgc_reader; 1407 /* We keep our own count of how many bytes were read from reader because 1408 * some readers are external. The external caller does not have to rely 1409 * on our count, but it can. 1410 */ 1411 size_t fgc_nread_from_reader; 1412}; 1413 1414 1415static size_t 1416frame_gen_size (void *ctx) 1417{ 1418 struct frame_gen_ctx *fg_ctx = ctx; 1419 size_t available, remaining; 1420 1421 /* Make sure we are not writing past available size: */ 1422 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1423 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 1424 if (available < remaining) 1425 remaining = available; 1426 1427 return remaining + fg_ctx->fgc_stream->sm_n_buffered; 1428} 1429 1430 1431static int 1432frame_gen_fin (void *ctx) 1433{ 1434 struct frame_gen_ctx *fg_ctx = ctx; 1435 return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE 1436 && 0 == fg_ctx->fgc_stream->sm_n_buffered 1437 /* Do not use frame_gen_size() as it may chop the real size: */ 1438 && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1439} 1440 1441 1442static void 1443incr_conn_cap (struct lsquic_stream *stream, size_t incr) 1444{ 1445 if (stream->stream_flags & STREAM_CONN_LIMITED) 1446 { 1447 stream->conn_pub->conn_cap.cc_sent += incr; 1448 assert(stream->conn_pub->conn_cap.cc_sent 1449 <= stream->conn_pub->conn_cap.cc_max); 1450 } 1451} 1452 1453 1454static size_t 1455frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 1456{ 1457 struct frame_gen_ctx *fg_ctx = ctx; 1458 unsigned char *p = begin_buf; 1459 unsigned char *const end = p + len; 1460 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1461 size_t n_written, available, n_to_write; 1462 1463 if (stream->sm_n_buffered > 0) 1464 { 1465 if (len <= stream->sm_n_buffered) 1466 { 1467 memcpy(p, stream->sm_buf, len); 1468 memmove(stream->sm_buf, stream->sm_buf + len, 1469 stream->sm_n_buffered - len); 1470 stream->sm_n_buffered -= len; 1471 stream->tosend_off += len; 1472 *fin = frame_gen_fin(fg_ctx); 1473 return len; 1474 } 1475 memcpy(p, stream->sm_buf, stream->sm_n_buffered); 1476 p += stream->sm_n_buffered; 1477 stream->sm_n_buffered = 0; 1478 } 1479 1480 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 1481 n_to_write = end - p; 1482 if (n_to_write > available) 1483 n_to_write = available; 1484 n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p, 1485 n_to_write); 1486 p += n_written; 1487 fg_ctx->fgc_nread_from_reader += n_written; 1488 *fin = frame_gen_fin(fg_ctx); 1489 stream->tosend_off += p - (const unsigned char *) begin_buf; 1490 incr_conn_cap(stream, n_written); 1491 return p - (const unsigned char *) begin_buf; 1492} 1493 1494 1495static void 1496check_flush_threshold (lsquic_stream_t *stream) 1497{ 1498 if ((stream->stream_flags & STREAM_WANT_FLUSH) && 1499 stream->tosend_off >= stream->sm_flush_to) 1500 { 1501 LSQ_DEBUG("flushed to or past required offset %"PRIu64, 1502 stream->sm_flush_to); 1503 maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH); 1504 } 1505} 1506 1507 1508static struct lsquic_packet_out * 1509get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least, 1510 const struct lsquic_stream *stream) 1511{ 1512 return lsquic_send_ctl_new_packet_out(ctl, need_at_least); 1513} 1514 1515 1516static struct lsquic_packet_out * (* const get_packet[])( 1517 struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) = 1518{ 1519 lsquic_send_ctl_get_packet_for_stream, 1520 get_brand_new_packet, 1521}; 1522 1523 1524static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR } 1525stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size) 1526{ 1527 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1528 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 1529 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 1530 unsigned stream_header_sz, need_at_least, off; 1531 lsquic_packet_out_t *packet_out; 1532 int len, s, hsk; 1533 1534 stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id, 1535 stream->tosend_off); 1536 need_at_least = stream_header_sz + (size > 0); 1537 hsk = LSQUIC_STREAM_HANDSHAKE == stream->id; 1538 packet_out = get_packet[hsk](send_ctl, need_at_least, stream); 1539 if (!packet_out) 1540 return SWTP_STOP; 1541 1542 off = packet_out->po_data_sz; 1543 len = pf->pf_gen_stream_frame( 1544 packet_out->po_data + packet_out->po_data_sz, 1545 lsquic_packet_out_avail(packet_out), stream->id, 1546 stream->tosend_off, 1547 frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx); 1548 if (len < 0) 1549 { 1550 LSQ_ERROR("could not generate stream frame"); 1551 return SWTP_ERROR; 1552 } 1553 1554 EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf, 1555 packet_out->po_data + packet_out->po_data_sz, len); 1556 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 1557 packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM; 1558 if (0 == lsquic_packet_out_avail(packet_out)) 1559 packet_out->po_flags |= PO_STREAM_END; 1560 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 1561 stream, QUIC_FRAME_STREAM, off, len); 1562 if (s != 0) 1563 { 1564 LSQ_ERROR("adding stream to packet failed: %s", strerror(errno)); 1565 return SWTP_ERROR; 1566 } 1567 1568 check_flush_threshold(stream); 1569 1570 /* XXX: I don't like it that this is here */ 1571 if (hsk && !(packet_out->po_flags & PO_HELLO)) 1572 { 1573 lsquic_packet_out_zero_pad(packet_out); 1574 packet_out->po_flags |= PO_HELLO; 1575 lsquic_send_ctl_scheduled_one(send_ctl, packet_out); 1576 } 1577 1578 return SWTP_OK; 1579} 1580 1581 1582static void 1583abort_connection (struct lsquic_stream *stream) 1584{ 1585 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 1586 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 1587 next_service_stream); 1588 stream->stream_flags |= STREAM_ABORT_CONN; 1589 LSQ_WARN("connection will be aborted"); 1590 maybe_conn_to_tickable(stream); 1591} 1592 1593 1594static ssize_t 1595stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, 1596 size_t thresh) 1597{ 1598 size_t size; 1599 ssize_t nw; 1600 unsigned seen_ok; 1601 struct frame_gen_ctx fg_ctx = { 1602 .fgc_stream = stream, 1603 .fgc_reader = reader, 1604 .fgc_nread_from_reader = 0, 1605 }; 1606 1607 seen_ok = 0; 1608 while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0) 1609 || frame_gen_fin(&fg_ctx)) 1610 { 1611 switch (stream_write_to_packet(&fg_ctx, size)) 1612 { 1613 case SWTP_OK: 1614 if (!seen_ok++) 1615 maybe_conn_to_tickable_if_writeable(stream, 0); 1616 if (frame_gen_fin(&fg_ctx)) 1617 { 1618 stream->stream_flags |= STREAM_FIN_SENT; 1619 goto end; 1620 } 1621 else 1622 break; 1623 case SWTP_STOP: 1624 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1625 goto end; 1626 default: 1627 abort_connection(stream); 1628 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1629 return -1; 1630 } 1631 } 1632 1633 if (thresh) 1634 { 1635 assert(size < thresh); 1636 assert(size >= stream->sm_n_buffered); 1637 size -= stream->sm_n_buffered; 1638 if (size > 0) 1639 { 1640 nw = save_to_buffer(stream, reader, size); 1641 if (nw < 0) 1642 return -1; 1643 fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */ 1644 } 1645 } 1646 else 1647 { 1648 /* We count flushed data towards both stream and connection limits, 1649 * so we should have been able to packetize all of it: 1650 */ 1651 assert(0 == stream->sm_n_buffered); 1652 assert(size == 0); 1653 } 1654 1655 maybe_mark_as_blocked(stream); 1656 1657 end: 1658 return fg_ctx.fgc_nread_from_reader; 1659} 1660 1661 1662/* Perform an implicit flush when we hit connection limit while buffering 1663 * data. This is to prevent a (theoretical) stall: 1664 * 1665 * Imagine a number of streams, all of which buffered some data. The buffered 1666 * data is up to connection cap, which means no further writes are possible. 1667 * None of them flushes, which means that data is not sent and connection 1668 * WINDOW_UPDATE frame never arrives from peer. Stall. 1669 */ 1670static int 1671maybe_flush_stream (struct lsquic_stream *stream) 1672{ 1673 if (stream->sm_n_buffered > 0 1674 && (stream->stream_flags & STREAM_CONN_LIMITED) 1675 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 1676 return stream_flush_nocheck(stream); 1677 else 1678 return 0; 1679} 1680 1681 1682static ssize_t 1683save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 1684 size_t len) 1685{ 1686 size_t avail, n_written; 1687 1688 assert(stream->sm_n_buffered + len <= SM_BUF_SIZE); 1689 1690 if (!stream->sm_buf) 1691 { 1692 stream->sm_buf = malloc(SM_BUF_SIZE); 1693 if (!stream->sm_buf) 1694 return -1; 1695 } 1696 1697 avail = lsquic_stream_write_avail(stream); 1698 if (avail < len) 1699 len = avail; 1700 1701 n_written = reader->lsqr_read(reader->lsqr_ctx, 1702 stream->sm_buf + stream->sm_n_buffered, len); 1703 stream->sm_n_buffered += n_written; 1704 incr_conn_cap(stream, n_written); 1705 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 1706 n_written, stream->sm_n_buffered); 1707 if (0 != maybe_flush_stream(stream)) 1708 return -1; 1709 return n_written; 1710} 1711 1712 1713static ssize_t 1714stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 1715{ 1716 size_t thresh, len; 1717 1718 thresh = flush_threshold(stream); 1719 len = reader->lsqr_size(reader->lsqr_ctx); 1720 if (stream->sm_n_buffered + len <= SM_BUF_SIZE && 1721 stream->sm_n_buffered + len < thresh) 1722 return save_to_buffer(stream, reader, len); 1723 else 1724 return stream_write_to_packets(stream, reader, thresh); 1725} 1726 1727 1728ssize_t 1729lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 1730{ 1731 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 1732 return lsquic_stream_writev(stream, &iov, 1); 1733} 1734 1735 1736struct inner_reader_iovec { 1737 const struct iovec *iov; 1738 const struct iovec *end; 1739 unsigned cur_iovec_off; 1740}; 1741 1742 1743static size_t 1744inner_reader_iovec_read (void *ctx, void *buf, size_t count) 1745{ 1746 struct inner_reader_iovec *const iro = ctx; 1747 unsigned char *p = buf; 1748 unsigned char *const end = p + count; 1749 unsigned n_tocopy; 1750 1751 while (iro->iov < iro->end && p < end) 1752 { 1753 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 1754 if (n_tocopy > (unsigned) (end - p)) 1755 n_tocopy = end - p; 1756 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 1757 n_tocopy); 1758 p += n_tocopy; 1759 iro->cur_iovec_off += n_tocopy; 1760 if (iro->iov->iov_len == iro->cur_iovec_off) 1761 { 1762 ++iro->iov; 1763 iro->cur_iovec_off = 0; 1764 } 1765 } 1766 1767 return p + count - end; 1768} 1769 1770 1771static size_t 1772inner_reader_iovec_size (void *ctx) 1773{ 1774 struct inner_reader_iovec *const iro = ctx; 1775 const struct iovec *iov; 1776 size_t size; 1777 1778 size = 0; 1779 for (iov = iro->iov; iov < iro->end; ++iov) 1780 size += iov->iov_len; 1781 1782 return size - iro->cur_iovec_off; 1783} 1784 1785 1786ssize_t 1787lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 1788 int iovcnt) 1789{ 1790 COMMON_WRITE_CHECKS(); 1791 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1792 1793 struct inner_reader_iovec iro = { 1794 .iov = iov, 1795 .end = iov + iovcnt, 1796 .cur_iovec_off = 0, 1797 }; 1798 struct lsquic_reader reader = { 1799 .lsqr_read = inner_reader_iovec_read, 1800 .lsqr_size = inner_reader_iovec_size, 1801 .lsqr_ctx = &iro, 1802 }; 1803 1804 return stream_write(stream, &reader); 1805} 1806 1807 1808ssize_t 1809lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) 1810{ 1811 COMMON_WRITE_CHECKS(); 1812 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1813 return stream_write(stream, reader); 1814} 1815 1816 1817int 1818lsquic_stream_send_headers (lsquic_stream_t *stream, 1819 const lsquic_http_headers_t *headers, int eos) 1820{ 1821 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT| 1822 STREAM_U_WRITE_DONE)) 1823 == STREAM_USE_HEADERS) 1824 { 1825 int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs, 1826 stream->id, headers, eos, lsquic_stream_priority(stream)); 1827 if (0 == s) 1828 { 1829 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 1830 stream->stream_flags |= STREAM_HEADERS_SENT; 1831 if (eos) 1832 stream->stream_flags |= STREAM_FIN_SENT; 1833 LSQ_INFO("sent headers for stream %u", stream->id); 1834 } 1835 else 1836 LSQ_WARN("could not send headers: %s", strerror(errno)); 1837 return s; 1838 } 1839 else 1840 { 1841 LSQ_WARN("cannot send headers for stream %u in this state", stream->id); 1842 errno = EBADMSG; 1843 return -1; 1844 } 1845} 1846 1847 1848void 1849lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 1850{ 1851 if (offset > stream->max_send_off) 1852 { 1853 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 1854 LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to " 1855 "0x%"PRIX64, stream->id, stream->max_send_off, offset); 1856 stream->max_send_off = offset; 1857 } 1858 else 1859 LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old " 1860 "max send offset 0x%"PRIX64", ignoring", stream->id, offset, 1861 stream->max_send_off); 1862} 1863 1864 1865/* This function is used to update offsets after handshake completes and we 1866 * learn of peer's limits from the handshake values. 1867 */ 1868int 1869lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset) 1870{ 1871 LSQ_DEBUG("setting max_send_off to %u", offset); 1872 if (offset > stream->max_send_off) 1873 { 1874 lsquic_stream_window_update(stream, offset); 1875 return 0; 1876 } 1877 else if (offset < stream->tosend_off) 1878 { 1879 LSQ_INFO("new offset (%u bytes) is smaller than the amount of data " 1880 "already sent on this stream (%"PRIu64" bytes)", offset, 1881 stream->tosend_off); 1882 return -1; 1883 } 1884 else 1885 { 1886 stream->max_send_off = offset; 1887 return 0; 1888 } 1889} 1890 1891 1892void 1893lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code) 1894{ 1895 lsquic_stream_reset_ext(stream, error_code, 1); 1896} 1897 1898 1899void 1900lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code, 1901 int do_close) 1902{ 1903 if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT)) 1904 { 1905 LSQ_INFO("reset already sent"); 1906 return; 1907 } 1908 1909 SM_HISTORY_APPEND(stream, SHE_RESET); 1910 1911 LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code); 1912 stream->error_code = error_code; 1913 1914 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1915 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1916 next_send_stream); 1917 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 1918 stream->stream_flags |= STREAM_SEND_RST; 1919 1920 drop_buffered_data(stream); 1921 maybe_elide_stream_frames(stream); 1922 maybe_schedule_call_on_close(stream); 1923 1924 if (do_close) 1925 lsquic_stream_close(stream); 1926 else 1927 maybe_conn_to_tickable_if_writeable(stream, 1); 1928} 1929 1930 1931unsigned 1932lsquic_stream_id (const lsquic_stream_t *stream) 1933{ 1934 return stream->id; 1935} 1936 1937 1938struct lsquic_conn * 1939lsquic_stream_conn (const lsquic_stream_t *stream) 1940{ 1941 return stream->conn_pub->lconn; 1942} 1943 1944 1945int 1946lsquic_stream_close (lsquic_stream_t *stream) 1947{ 1948 LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id); 1949 SM_HISTORY_APPEND(stream, SHE_CLOSE); 1950 if (lsquic_stream_is_closed(stream)) 1951 { 1952 LSQ_INFO("Attempt to close an already-closed stream %u", stream->id); 1953 errno = EBADF; 1954 return -1; 1955 } 1956 stream_shutdown_write(stream); 1957 stream_shutdown_read(stream); 1958 maybe_schedule_call_on_close(stream); 1959 maybe_finish_stream(stream); 1960 maybe_conn_to_tickable_if_writeable(stream, 1); 1961 return 0; 1962} 1963 1964 1965#ifndef NDEBUG 1966#if __GNUC__ 1967__attribute__((weak)) 1968#endif 1969#endif 1970void 1971lsquic_stream_acked (lsquic_stream_t *stream) 1972{ 1973 assert(stream->n_unacked); 1974 --stream->n_unacked; 1975 LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked); 1976 if (0 == stream->n_unacked) 1977 maybe_finish_stream(stream); 1978} 1979 1980 1981void 1982lsquic_stream_push_req (lsquic_stream_t *stream, 1983 struct uncompressed_headers *push_req) 1984{ 1985 assert(!stream->push_req); 1986 stream->push_req = push_req; 1987 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 1988} 1989 1990 1991int 1992lsquic_stream_is_pushed (const lsquic_stream_t *stream) 1993{ 1994 return 1 & ~stream->id; 1995} 1996 1997 1998int 1999lsquic_stream_push_info (const lsquic_stream_t *stream, 2000 uint32_t *ref_stream_id, const char **headers, size_t *headers_sz) 2001{ 2002 if (lsquic_stream_is_pushed(stream)) 2003 { 2004 assert(stream->push_req); 2005 *ref_stream_id = stream->push_req->uh_stream_id; 2006 *headers = stream->push_req->uh_headers; 2007 *headers_sz = stream->push_req->uh_size; 2008 return 0; 2009 } 2010 else 2011 return -1; 2012} 2013 2014 2015int 2016lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 2017{ 2018 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS) 2019 { 2020 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 2021 LSQ_DEBUG("received uncompressed headers for stream %u", stream->id); 2022 stream->stream_flags |= STREAM_HAVE_UH; 2023 if (uh->uh_flags & UH_FIN) 2024 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 2025 stream->uh = uh; 2026 if (uh->uh_oth_stream_id == 0) 2027 { 2028 if (uh->uh_weight) 2029 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 2030 } 2031 else 2032 LSQ_NOTICE("don't know how to depend on stream %u", 2033 uh->uh_oth_stream_id); 2034 return 0; 2035 } 2036 else 2037 { 2038 LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id); 2039 return -1; 2040 } 2041} 2042 2043 2044unsigned 2045lsquic_stream_priority (const lsquic_stream_t *stream) 2046{ 2047 return 256 - stream->sm_priority; 2048} 2049 2050 2051int 2052lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 2053{ 2054 /* The user should never get a reference to the special streams, 2055 * but let's check just in case: 2056 */ 2057 if (LSQUIC_STREAM_HANDSHAKE == stream->id 2058 || ((stream->stream_flags & STREAM_USE_HEADERS) && 2059 LSQUIC_STREAM_HEADERS == stream->id)) 2060 return -1; 2061 if (priority < 1 || priority > 256) 2062 return -1; 2063 stream->sm_priority = 256 - priority; 2064 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 2065 LSQ_DEBUG("set priority to %u", priority); 2066 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 2067 return 0; 2068} 2069 2070 2071int 2072lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 2073{ 2074 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 2075 { 2076 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) == 2077 (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) 2078 { 2079 /* We need to send headers only if we are a) using HEADERS stream 2080 * and b) we already sent initial headers. If initial headers 2081 * have not been sent yet, stream priority will be sent in the 2082 * HEADERS frame. 2083 */ 2084 return lsquic_headers_stream_send_priority(stream->conn_pub->hs, 2085 stream->id, 0, 0, priority); 2086 } 2087 else 2088 return 0; 2089 } 2090 else 2091 return -1; 2092} 2093 2094 2095lsquic_stream_ctx_t * 2096lsquic_stream_get_ctx (const lsquic_stream_t *stream) 2097{ 2098 return stream->st_ctx; 2099} 2100 2101 2102int 2103lsquic_stream_refuse_push (lsquic_stream_t *stream) 2104{ 2105 if (lsquic_stream_is_pushed(stream) && 2106 !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST))) 2107 { 2108 LSQ_DEBUG("refusing pushed stream: send reset"); 2109 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 2110 return 0; 2111 } 2112 else 2113 return -1; 2114} 2115 2116 2117size_t 2118lsquic_stream_mem_used (const struct lsquic_stream *stream) 2119{ 2120 size_t size; 2121 2122 size = sizeof(stream); 2123 if (stream->sm_buf) 2124 size += SM_BUF_SIZE; 2125 if (stream->data_in) 2126 size += stream->data_in->di_if->di_mem_used(stream->data_in); 2127 2128 return size; 2129} 2130 2131 2132lsquic_cid_t 2133lsquic_stream_cid (const struct lsquic_stream *stream) 2134{ 2135 return LSQUIC_LOG_CONN_ID; 2136} 2137