lsquic_stream.c revision e8bd737d
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <stddef.h> 28 29#include "lsquic.h" 30 31#include "lsquic_int_types.h" 32#include "lsquic_packet_common.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn_flow.h" 36#include "lsquic_rtt.h" 37#include "lsquic_sfcw.h" 38#include "lsquic_stream.h" 39#include "lsquic_conn_public.h" 40#include "lsquic_util.h" 41#include "lsquic_mm.h" 42#include "lsquic_headers_stream.h" 43#include "lsquic_frame_reader.h" 44#include "lsquic_conn.h" 45#include "lsquic_data_in_if.h" 46#include "lsquic_parse.h" 47#include "lsquic_packet_out.h" 48#include "lsquic_engine_public.h" 49#include "lsquic_senhist.h" 50#include "lsquic_pacer.h" 51#include "lsquic_cubic.h" 52#include "lsquic_send_ctl.h" 53#include "lsquic_ev_log.h" 54 55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid 57#define LSQUIC_LOG_STREAM_ID stream->id 58#include "lsquic_logger.h" 59 60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ 61 62static void 63drop_frames_in (lsquic_stream_t *stream); 64 65static void 66maybe_schedule_call_on_close (lsquic_stream_t *stream); 67 68static int 69stream_wantread (lsquic_stream_t *stream, int is_want); 70 71static int 72stream_wantwrite (lsquic_stream_t *stream, int is_want); 73 74static ssize_t 75stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 76 77static ssize_t 78save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 79 80static int 81stream_flush (lsquic_stream_t *stream); 82 83static int 84stream_flush_nocheck (lsquic_stream_t *stream); 85 86static void 87maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag); 88 89 90#if LSQUIC_KEEP_STREAM_HISTORY 91/* These values are printable ASCII characters for ease of printing the 92 * whole history in a single line of a log message. 93 * 94 * The list of events is not exhaustive: only most interesting events 95 * are recorded. 96 */ 97enum stream_history_event 98{ 99 SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ 100 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 101 SHE_REACH_FIN = 'a', 102 SHE_BLOCKED_OUT = 'b', 103 SHE_CREATED = 'C', 104 SHE_FRAME_IN = 'd', 105 SHE_FRAME_OUT = 'D', 106 SHE_RESET = 'e', 107 SHE_WINDOW_UPDATE = 'E', 108 SHE_FIN_IN = 'f', 109 SHE_FINISHED = 'F', 110 SHE_GOAWAY_IN = 'g', 111 SHE_USER_WRITE_HEADER = 'h', 112 SHE_HEADERS_IN = 'H', 113 SHE_ONCLOSE_SCHED = 'l', 114 SHE_ONCLOSE_CALL = 'L', 115 SHE_ONNEW = 'N', 116 SHE_SET_PRIO = 'p', 117 SHE_USER_READ = 'r', 118 SHE_SHUTDOWN_READ = 'R', 119 SHE_RST_IN = 's', 120 SHE_RST_OUT = 't', 121 SHE_FLUSH = 'u', 122 SHE_USER_WRITE_DATA = 'w', 123 SHE_SHUTDOWN_WRITE = 'W', 124 SHE_CLOSE = 'X', 125 SHE_FORCE_FINISH = 'Z', 126}; 127 128static void 129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 130{ 131 enum stream_history_event prev_event; 132 sm_hist_idx_t idx; 133 int plus; 134 135 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 136 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 137 idx = (idx - plus) & SM_HIST_IDX_MASK; 138 prev_event = stream->sm_hist_buf[idx]; 139 140 if (prev_event == sh_event && plus) 141 return; 142 143 if (prev_event == sh_event) 144 sh_event = SHE_PLUS; 145 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 146 147 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 148 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 149 stream->sm_hist_buf); 150} 151 152# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 153# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 154 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 155 LSQ_DEBUG("history: [%.*s]", \ 156 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 157 (stream)->sm_hist_buf); \ 158 } while (0) 159#else 160# define SM_HISTORY_APPEND(stream, event) 161# define SM_HISTORY_DUMP_REMAINING(stream) 162#endif 163 164 165static int 166stream_inside_callback (const lsquic_stream_t *stream) 167{ 168 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 169} 170 171 172/* Here, "readable" means that the user is able to read from the stream. */ 173static void 174maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 175{ 176 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 177 { 178 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 179 stream->conn_pub->lconn); 180 } 181} 182 183 184/* Here, "writeable" means that data can be put into packets to be 185 * scheduled to be sent out. 186 */ 187static void 188maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream) 189{ 190 if (!stream_inside_callback(stream) && 191 lsquic_send_ctl_can_send(stream->conn_pub->send_ctl) && 192 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 193 { 194 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 195 stream->conn_pub->lconn); 196 } 197} 198 199 200static int 201stream_stalled (const lsquic_stream_t *stream) 202{ 203 return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) && 204 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 205 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 206} 207 208 209/* TODO: The logic to figure out whether the stream is connection limited 210 * should be taken out of the constructor. The caller should specify this 211 * via one of enum stream_ctor_flags. 212 */ 213lsquic_stream_t * 214lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub, 215 const struct lsquic_stream_if *stream_if, 216 void *stream_if_ctx, unsigned initial_window, 217 unsigned initial_send_off, 218 enum stream_ctor_flags ctor_flags) 219{ 220 lsquic_cfcw_t *cfcw; 221 lsquic_stream_t *stream; 222 223 stream = calloc(1, sizeof(*stream)); 224 if (!stream) 225 return NULL; 226 227 stream->stream_if = stream_if; 228 stream->id = id; 229 stream->conn_pub = conn_pub; 230 stream->sm_onnew_arg = stream_if_ctx; 231 if (!initial_window) 232 initial_window = 16 * 1024; 233 if (LSQUIC_STREAM_HANDSHAKE == id || 234 (conn_pub->hs && LSQUIC_STREAM_HEADERS == id)) 235 cfcw = NULL; 236 else 237 { 238 cfcw = &conn_pub->cfcw; 239 stream->stream_flags |= STREAM_CONN_LIMITED; 240 if (conn_pub->hs) 241 stream->stream_flags |= STREAM_USE_HEADERS; 242 lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO); 243 } 244 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 245 if (!initial_send_off) 246 initial_send_off = 16 * 1024; 247 stream->max_send_off = initial_send_off; 248 if (ctor_flags & SCF_USE_DI_HASH) 249 stream->data_in = data_in_hash_new(conn_pub, id, 0); 250 else 251 stream->data_in = data_in_nocopy_new(conn_pub, id); 252 LSQ_DEBUG("created stream %u @%p", id, stream); 253 SM_HISTORY_APPEND(stream, SHE_CREATED); 254 if (ctor_flags & SCF_DI_AUTOSWITCH) 255 stream->stream_flags |= STREAM_AUTOSWITCH; 256 if (ctor_flags & SCF_CALL_ON_NEW) 257 lsquic_stream_call_on_new(stream); 258 if (ctor_flags & SCF_DISP_RW_ONCE) 259 stream->stream_flags |= STREAM_RW_ONCE; 260 return stream; 261} 262 263 264void 265lsquic_stream_call_on_new (lsquic_stream_t *stream) 266{ 267 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 268 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 269 { 270 LSQ_DEBUG("calling on_new_stream"); 271 SM_HISTORY_APPEND(stream, SHE_ONNEW); 272 stream->stream_flags |= STREAM_ONNEW_DONE; 273 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 274 stream); 275 } 276} 277 278 279static void 280decr_conn_cap (struct lsquic_stream *stream, size_t incr) 281{ 282 if (stream->stream_flags & STREAM_CONN_LIMITED) 283 { 284 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 285 stream->conn_pub->conn_cap.cc_sent -= incr; 286 } 287} 288 289 290static void 291drop_buffered_data (struct lsquic_stream *stream) 292{ 293 decr_conn_cap(stream, stream->sm_n_buffered); 294 stream->sm_n_buffered = 0; 295 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 296 maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS); 297} 298 299 300void 301lsquic_stream_destroy (lsquic_stream_t *stream) 302{ 303 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 304 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 305 STREAM_ONNEW_DONE) 306 { 307 stream->stream_flags |= STREAM_ONCLOSE_DONE; 308 stream->stream_if->on_close(stream, stream->st_ctx); 309 } 310 if (stream->stream_flags & STREAM_SENDING_FLAGS) 311 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 312 if (stream->stream_flags & STREAM_WANT_READ) 313 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 314 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 315 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 316 if (stream->stream_flags & STREAM_SERVICE_FLAGS) 317 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 318 drop_buffered_data(stream); 319 lsquic_sfcw_consume_rem(&stream->fc); 320 drop_frames_in(stream); 321 free(stream->push_req); 322 free(stream->uh); 323 free(stream->sm_buf); 324 LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream); 325 SM_HISTORY_DUMP_REMAINING(stream); 326 free(stream); 327} 328 329 330static int 331stream_is_finished (const lsquic_stream_t *stream) 332{ 333 return lsquic_stream_is_closed(stream) 334 /* n_unacked checks that no outgoing packets that reference this 335 * stream are outstanding: 336 */ 337 && 0 == stream->n_unacked 338 /* This checks that no packets that reference this stream will 339 * become outstanding: 340 */ 341 && 0 == (stream->stream_flags & STREAM_SEND_RST) 342 && ((stream->stream_flags & STREAM_FORCE_FINISH) 343 || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)) 344 && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD)))); 345} 346 347 348static void 349maybe_finish_stream (lsquic_stream_t *stream) 350{ 351 if (0 == (stream->stream_flags & STREAM_FINISHED) && 352 stream_is_finished(stream)) 353 { 354 LSQ_DEBUG("stream %u is now finished", stream->id); 355 SM_HISTORY_APPEND(stream, SHE_FINISHED); 356 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 357 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 358 next_service_stream); 359 stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED; 360 } 361} 362 363 364static void 365maybe_schedule_call_on_close (lsquic_stream_t *stream) 366{ 367 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 368 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE)) 369 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)) 370 { 371 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 372 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 373 next_service_stream); 374 stream->stream_flags |= STREAM_CALL_ONCLOSE; 375 LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id); 376 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 377 } 378} 379 380 381void 382lsquic_stream_call_on_close (lsquic_stream_t *stream) 383{ 384 assert(stream->stream_flags & STREAM_ONNEW_DONE); 385 stream->stream_flags &= ~STREAM_CALL_ONCLOSE; 386 if (!(stream->stream_flags & STREAM_SERVICE_FLAGS)) 387 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 388 next_service_stream); 389 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 390 { 391 LSQ_DEBUG("calling on_close for stream %u", stream->id); 392 stream->stream_flags |= STREAM_ONCLOSE_DONE; 393 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 394 stream->stream_if->on_close(stream, stream->st_ctx); 395 } 396 else 397 assert(0); 398} 399 400 401int 402lsquic_stream_readable (const lsquic_stream_t *stream) 403{ 404 /* A stream is readable if one of the following is true: */ 405 return 406 /* - It is already finished: in that case, lsquic_stream_read() will 407 * return 0. 408 */ 409 (stream->stream_flags & STREAM_FIN_REACHED) 410 /* - The stream is reset, by either side. In this case, 411 * lsquic_stream_read() will return -1 (we want the user to be 412 * able to collect the error). 413 */ 414 || (stream->stream_flags & STREAM_RST_FLAGS) 415 /* - Either we are not in HTTP mode or the HTTP headers have been 416 * received and the headers or data from the stream can be read. 417 */ 418 || (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 419 == STREAM_USE_HEADERS) 420 && (stream->uh != NULL 421 || stream->data_in->di_if->di_get_frame(stream->data_in, 422 stream->read_offset))) 423 ; 424} 425 426 427size_t 428lsquic_stream_write_avail (const struct lsquic_stream *stream) 429{ 430 uint64_t stream_avail, conn_avail; 431 432 stream_avail = stream->max_send_off - stream->tosend_off 433 - stream->sm_n_buffered; 434 if (stream->stream_flags & STREAM_CONN_LIMITED) 435 { 436 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 437 if (conn_avail < stream_avail) 438 return conn_avail; 439 } 440 441 return stream_avail; 442} 443 444 445int 446lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 447{ 448 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 449 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 450 { 451 return -1; 452 } 453 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 454 { 455 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 456 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 457 next_send_stream); 458 stream->stream_flags |= STREAM_SEND_WUF; 459 } 460 return 0; 461} 462 463 464int 465lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 466{ 467 uint64_t max_off; 468 int got_next_offset; 469 enum ins_frame ins_frame; 470 471 assert(frame->packet_in); 472 473 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 474 LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; " 475 "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 476 477 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) == 478 (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) 479 { 480 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 481 lsquic_malo_put(frame); 482 return -1; 483 } 484 485 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 486 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 487 if (INS_FRAME_OK == ins_frame) 488 { 489 /* Update maximum offset in the flow controller and check for flow 490 * control violation: 491 */ 492 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 493 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 494 return -1; 495 if (frame->data_frame.df_fin) 496 { 497 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 498 stream->stream_flags |= STREAM_FIN_RECVD; 499 maybe_finish_stream(stream); 500 } 501 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 502 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 503 { 504 stream->data_in = stream->data_in->di_if->di_switch_impl( 505 stream->data_in, stream->read_offset); 506 if (!stream->data_in) 507 { 508 stream->data_in = data_in_error_new(); 509 return -1; 510 } 511 } 512 if (got_next_offset) 513 /* Checking the offset saves di_get_frame() call */ 514 maybe_conn_to_tickable_if_readable(stream); 515 return 0; 516 } 517 else if (INS_FRAME_DUP == ins_frame) 518 { 519 return 0; 520 } 521 else 522 { 523 assert(INS_FRAME_ERR == ins_frame); 524 return -1; 525 } 526} 527 528 529static void 530drop_frames_in (lsquic_stream_t *stream) 531{ 532 if (stream->data_in) 533 { 534 stream->data_in->di_if->di_destroy(stream->data_in); 535 /* To avoid checking whether `data_in` is set, just set to the error 536 * data-in stream. It does the right thing after incoming data is 537 * dropped. 538 */ 539 stream->data_in = data_in_error_new(); 540 } 541} 542 543 544static void 545maybe_elide_stream_frames (struct lsquic_stream *stream) 546{ 547 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 548 { 549 if (stream->n_unacked) 550 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 551 stream->id); 552 stream->stream_flags |= STREAM_FRAMES_ELIDED; 553 } 554} 555 556 557int 558lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 559 uint32_t error_code) 560{ 561 562 if (stream->stream_flags & STREAM_RST_RECVD) 563 { 564 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 565 return 0; 566 } 567 568 SM_HISTORY_APPEND(stream, SHE_RST_IN); 569 /* This flag must always be set, even if we are "ignoring" it: it is 570 * used by elision code. 571 */ 572 stream->stream_flags |= STREAM_RST_RECVD; 573 574 if ((stream->stream_flags & STREAM_FIN_RECVD) && 575 /* Pushed streams have fake STREAM_FIN_RECVD set, thus 576 * we need a special check: 577 */ 578 !lsquic_stream_is_pushed(stream)) 579 { 580 LSQ_DEBUG("ignore RST_STREAM frame after FIN is received"); 581 return 0; 582 } 583 584 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 585 { 586 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is " 587 "smaller than that of byte following the last byte we have seen: " 588 "0x%"PRIX64, stream->id, offset, 589 lsquic_sfcw_get_max_recv_off(&stream->fc)); 590 return -1; 591 } 592 593 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 594 { 595 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64 596 " violates flow control", stream->id, offset); 597 return -1; 598 } 599 600 /* Let user collect error: */ 601 maybe_conn_to_tickable_if_readable(stream); 602 603 lsquic_sfcw_consume_rem(&stream->fc); 604 drop_frames_in(stream); 605 drop_buffered_data(stream); 606 maybe_elide_stream_frames(stream); 607 608 if (!(stream->stream_flags & 609 (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT))) 610 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 611 612 stream->stream_flags |= STREAM_RST_RECVD; 613 614 maybe_finish_stream(stream); 615 maybe_schedule_call_on_close(stream); 616 617 return 0; 618} 619 620 621uint64_t 622lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 623{ 624 assert(stream->stream_flags & STREAM_SEND_WUF); 625 stream->stream_flags &= ~STREAM_SEND_WUF; 626 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 627 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 628 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 629} 630 631 632void 633lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 634{ 635 assert(stream->stream_flags & STREAM_SEND_BLOCKED); 636 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 637 stream->stream_flags &= ~STREAM_SEND_BLOCKED; 638 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 639 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 640} 641 642 643void 644lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 645{ 646 assert(stream->stream_flags & STREAM_SEND_RST); 647 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 648 stream->stream_flags &= ~STREAM_SEND_RST; 649 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 650 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 651 stream->stream_flags |= STREAM_RST_SENT; 652 maybe_finish_stream(stream); 653} 654 655 656static size_t 657read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len) 658{ 659 struct uncompressed_headers *uh = stream->uh; 660 size_t n_avail = uh->uh_size - uh->uh_off; 661 if (n_avail < len) 662 len = n_avail; 663 memcpy(dst, uh->uh_headers + uh->uh_off, len); 664 uh->uh_off += len; 665 if (uh->uh_off == uh->uh_size) 666 { 667 LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id); 668 free(uh); 669 stream->uh = NULL; 670 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 671 { 672 stream->stream_flags |= STREAM_FIN_REACHED; 673 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 674 } 675 } 676 return len; 677} 678 679 680/* This function returns 0 when EOF is reached. 681 */ 682ssize_t 683lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov, 684 int iovcnt) 685{ 686 size_t total_nread, nread; 687 int processed_frames, read_unc_headers, iovidx; 688 unsigned char *p, *end; 689 690 SM_HISTORY_APPEND(stream, SHE_USER_READ); 691 692#define NEXT_IOV() do { \ 693 ++iovidx; \ 694 while (iovidx < iovcnt && 0 == iov[iovidx].iov_len) \ 695 ++iovidx; \ 696 if (iovidx < iovcnt) \ 697 { \ 698 p = iov[iovidx].iov_base; \ 699 end = p + iov[iovidx].iov_len; \ 700 } \ 701 else \ 702 p = end = NULL; \ 703} while (0) 704 705#define AVAIL() (end - p) 706 707 if (stream->stream_flags & STREAM_RST_FLAGS) 708 { 709 errno = ECONNRESET; 710 return -1; 711 } 712 if (stream->stream_flags & STREAM_U_READ_DONE) 713 { 714 errno = EBADF; 715 return -1; 716 } 717 if (stream->stream_flags & STREAM_FIN_REACHED) 718 return 0; 719 720 total_nread = 0; 721 processed_frames = 0; 722 723 iovidx = -1; 724 NEXT_IOV(); 725 726 if (stream->uh && AVAIL()) 727 { 728 read_unc_headers = 1; 729 do 730 { 731 nread = read_uh(stream, p, AVAIL()); 732 p += nread; 733 total_nread += nread; 734 if (p == end) 735 NEXT_IOV(); 736 } 737 while (stream->uh && AVAIL()); 738 } 739 else 740 read_unc_headers = 0; 741 742 struct data_frame *data_frame; 743 while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset))) 744 { 745 ++processed_frames; 746 size_t navail = data_frame->df_size - data_frame->df_read_off; 747 size_t ntowrite = AVAIL(); 748 if (navail < ntowrite) 749 ntowrite = navail; 750 memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite); 751 p += ntowrite; 752 data_frame->df_read_off += ntowrite; 753 stream->read_offset += ntowrite; 754 total_nread += ntowrite; 755 if (data_frame->df_read_off == data_frame->df_size) 756 { 757 const int fin = data_frame->df_fin; 758 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 759 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 760 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 761 { 762 stream->data_in = stream->data_in->di_if->di_switch_impl( 763 stream->data_in, stream->read_offset); 764 if (!stream->data_in) 765 { 766 stream->data_in = data_in_error_new(); 767 return -1; 768 } 769 } 770 if (fin) 771 { 772 stream->stream_flags |= STREAM_FIN_REACHED; 773 break; 774 } 775 } 776 if (p == end) 777 NEXT_IOV(); 778 } 779 780 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 781 total_nread, stream->read_offset); 782 783 if (processed_frames) 784 { 785 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 786 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 787 { 788 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 789 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 790 stream->stream_flags |= STREAM_SEND_WUF; 791 maybe_conn_to_tickable_if_writeable(stream); 792 } 793 } 794 795 if (processed_frames || read_unc_headers) 796 { 797 return total_nread; 798 } 799 else 800 { 801 assert(0 == total_nread); 802 errno = EWOULDBLOCK; 803 return -1; 804 } 805} 806 807 808ssize_t 809lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 810{ 811 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 812 return lsquic_stream_readv(stream, &iov, 1); 813} 814 815 816static void 817stream_shutdown_read (lsquic_stream_t *stream) 818{ 819 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 820 { 821 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 822 stream->stream_flags |= STREAM_U_READ_DONE; 823 stream_wantread(stream, 0); 824 maybe_finish_stream(stream); 825 } 826} 827 828 829static void 830stream_shutdown_write (lsquic_stream_t *stream) 831{ 832 if (stream->stream_flags & STREAM_U_WRITE_DONE) 833 return; 834 835 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 836 stream->stream_flags |= STREAM_U_WRITE_DONE; 837 stream_wantwrite(stream, 0); 838 839 /* Don't bother to check whether there is anything else to write if 840 * the flags indicate that nothing else should be written. 841 */ 842 if (!(stream->stream_flags & 843 (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT))) 844 { 845 if (stream->sm_n_buffered == 0) 846 { 847 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 848 stream)) 849 { 850 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 851 stream->stream_flags |= STREAM_FIN_SENT; 852 } 853 else 854 { 855 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 856 "flag in it"); 857 (void) stream_flush_nocheck(stream); 858 } 859 } 860 else 861 (void) stream_flush_nocheck(stream); 862 } 863} 864 865 866int 867lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 868{ 869 LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how); 870 if (lsquic_stream_is_closed(stream)) 871 { 872 LSQ_INFO("Attempt to shut down a closed stream %u", stream->id); 873 errno = EBADF; 874 return -1; 875 } 876 /* 0: read, 1: write: 2: read and write 877 */ 878 if (how < 0 || how > 2) 879 { 880 errno = EINVAL; 881 return -1; 882 } 883 884 if (how) 885 stream_shutdown_write(stream); 886 if (how != 1) 887 stream_shutdown_read(stream); 888 889 maybe_finish_stream(stream); 890 maybe_schedule_call_on_close(stream); 891 if (how) 892 maybe_conn_to_tickable_if_writeable(stream); 893 894 return 0; 895} 896 897 898void 899lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 900{ 901 LSQ_DEBUG("internal shutdown of stream %u", stream->id); 902 if (LSQUIC_STREAM_HANDSHAKE == stream->id 903 || ((stream->stream_flags & STREAM_USE_HEADERS) && 904 LSQUIC_STREAM_HEADERS == stream->id)) 905 { 906 LSQ_DEBUG("add flag to force-finish special stream %u", stream->id); 907 stream->stream_flags |= STREAM_FORCE_FINISH; 908 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 909 } 910 maybe_finish_stream(stream); 911 maybe_schedule_call_on_close(stream); 912} 913 914 915static void 916fake_reset_unused_stream (lsquic_stream_t *stream) 917{ 918 stream->stream_flags |= 919 STREAM_RST_RECVD /* User will pick this up on read or write */ 920 | STREAM_RST_SENT /* Don't send anything else on this stream */ 921 ; 922 923 /* Cancel all writes to the network scheduled for this stream: */ 924 if (stream->stream_flags & STREAM_SENDING_FLAGS) 925 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 926 next_send_stream); 927 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 928 929 LSQ_DEBUG("fake-reset stream %u%s", 930 stream->id, stream_stalled(stream) ? " (stalled)" : ""); 931 maybe_finish_stream(stream); 932 maybe_schedule_call_on_close(stream); 933} 934 935 936/* This function should only be called for locally-initiated streams whose ID 937 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 938 * frame sent by peer but we have not yet received it and created a stream. 939 * In this situation, we mark the stream as reset, so that user's on_read or 940 * on_write event callback picks up the error. That, in turn, should result 941 * in stream being closed. 942 * 943 * If we have received any data frames on this stream, this probably indicates 944 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 945 * lower than this. However, we still try to handle it gracefully and peform 946 * a shutdown, as if the stream was not reset. 947 */ 948void 949lsquic_stream_received_goaway (lsquic_stream_t *stream) 950{ 951 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 952 if (0 == stream->read_offset && 953 stream->data_in->di_if->di_empty(stream->data_in)) 954 fake_reset_unused_stream(stream); /* Normal condition */ 955 else 956 { /* This is odd, let's handle it the best we can: */ 957 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 958 lsquic_stream_shutdown_internal(stream); 959 } 960} 961 962 963uint64_t 964lsquic_stream_read_offset (const lsquic_stream_t *stream) 965{ 966 return stream->read_offset; 967} 968 969 970static int 971stream_wantread (lsquic_stream_t *stream, int is_want) 972{ 973 const int old_val = !!(stream->stream_flags & STREAM_WANT_READ); 974 const int new_val = !!is_want; 975 if (old_val != new_val) 976 { 977 if (new_val) 978 { 979 if (!old_val) 980 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 981 next_read_stream); 982 stream->stream_flags |= STREAM_WANT_READ; 983 } 984 else 985 { 986 stream->stream_flags &= ~STREAM_WANT_READ; 987 if (old_val) 988 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 989 next_read_stream); 990 } 991 } 992 return old_val; 993} 994 995 996static void 997maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag) 998{ 999 assert(STREAM_WRITE_Q_FLAGS & flag); 1000 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1001 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1002 next_write_stream); 1003 stream->stream_flags |= flag; 1004} 1005 1006 1007static void 1008maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1009{ 1010 assert(STREAM_WRITE_Q_FLAGS & flag); 1011 if (stream->stream_flags & flag) 1012 { 1013 stream->stream_flags &= ~flag; 1014 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1015 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1016 next_write_stream); 1017 } 1018} 1019 1020 1021static int 1022stream_wantwrite (lsquic_stream_t *stream, int is_want) 1023{ 1024 const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE); 1025 const int new_val = !!is_want; 1026 if (old_val != new_val) 1027 { 1028 if (new_val) 1029 maybe_put_onto_write_q(stream, STREAM_WANT_WRITE); 1030 else 1031 maybe_remove_from_write_q(stream, STREAM_WANT_WRITE); 1032 } 1033 return old_val; 1034} 1035 1036 1037int 1038lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1039{ 1040 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1041 { 1042 if (is_want) 1043 maybe_conn_to_tickable_if_readable(stream); 1044 return stream_wantread(stream, is_want); 1045 } 1046 else 1047 { 1048 errno = EBADF; 1049 return -1; 1050 } 1051} 1052 1053 1054int 1055lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1056{ 1057 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)) 1058 return stream_wantwrite(stream, is_want); 1059 else 1060 { 1061 errno = EBADF; 1062 return -1; 1063 } 1064} 1065 1066 1067#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE| \ 1068 STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST) 1069 1070 1071static void 1072stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1073{ 1074 unsigned no_progress_count, no_progress_limit; 1075 enum stream_flags flags; 1076 uint64_t size; 1077 1078 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1079 1080 no_progress_count = 0; 1081 while ((stream->stream_flags & STREAM_WANT_READ) 1082 && lsquic_stream_readable(stream)) 1083 { 1084 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1085 size = stream->read_offset; 1086 1087 stream->stream_if->on_read(stream, stream->st_ctx); 1088 1089 if (no_progress_limit && size == stream->read_offset && 1090 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1091 { 1092 ++no_progress_count; 1093 if (no_progress_count >= no_progress_limit) 1094 { 1095 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1096 "progress) in user code reading from stream", 1097 no_progress_count, 1098 no_progress_count == 1 ? "" : "s"); 1099 break; 1100 } 1101 } 1102 else 1103 no_progress_count = 0; 1104 } 1105} 1106 1107 1108static void 1109stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1110{ 1111 unsigned no_progress_count, no_progress_limit; 1112 enum stream_flags flags; 1113 1114 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1115 1116 no_progress_count = 0; 1117 stream->stream_flags |= STREAM_LAST_WRITE_OK; 1118 while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)) 1119 == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK) 1120 && lsquic_stream_write_avail(stream)) 1121 { 1122 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1123 1124 stream->stream_if->on_write(stream, stream->st_ctx); 1125 1126 if (no_progress_limit && 1127 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1128 { 1129 ++no_progress_count; 1130 if (no_progress_count >= no_progress_limit) 1131 { 1132 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1133 "progress) in user code writing to stream", 1134 no_progress_count, 1135 no_progress_count == 1 ? "" : "s"); 1136 break; 1137 } 1138 } 1139 else 1140 no_progress_count = 0; 1141 } 1142} 1143 1144 1145static void 1146stream_dispatch_read_events_once (lsquic_stream_t *stream) 1147{ 1148 if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream)) 1149 { 1150 stream->stream_if->on_read(stream, stream->st_ctx); 1151 } 1152} 1153 1154 1155static void 1156maybe_mark_as_blocked (lsquic_stream_t *stream) 1157{ 1158 struct lsquic_conn_cap *cc; 1159 1160 if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered) 1161 { 1162 if (stream->blocked_off < stream->max_send_off) 1163 { 1164 stream->blocked_off = stream->max_send_off + stream->sm_n_buffered; 1165 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1166 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1167 next_send_stream); 1168 stream->stream_flags |= STREAM_SEND_BLOCKED; 1169 LSQ_DEBUG("marked stream-blocked at stream offset " 1170 "%"PRIu64, stream->blocked_off); 1171 } 1172 else 1173 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 1174 " has been, or is about to be, sent", stream->blocked_off); 1175 } 1176 1177 if ((stream->stream_flags & STREAM_CONN_LIMITED) 1178 && (cc = &stream->conn_pub->conn_cap, 1179 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 1180 { 1181 if (cc->cc_blocked < cc->cc_max) 1182 { 1183 cc->cc_blocked = cc->cc_max; 1184 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 1185 LSQ_DEBUG("marked connection-blocked at connection offset " 1186 "%"PRIu64, cc->cc_max); 1187 } 1188 else 1189 LSQ_DEBUG("stream has already been marked connection-blocked " 1190 "at offset %"PRIu64, cc->cc_blocked); 1191 } 1192} 1193 1194 1195void 1196lsquic_stream_dispatch_read_events (lsquic_stream_t *stream) 1197{ 1198 assert(stream->stream_flags & STREAM_WANT_READ); 1199 1200 if (stream->stream_flags & STREAM_RW_ONCE) 1201 stream_dispatch_read_events_once(stream); 1202 else 1203 stream_dispatch_read_events_loop(stream); 1204} 1205 1206 1207void 1208lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 1209{ 1210 int progress; 1211 uint64_t tosend_off; 1212 unsigned short n_buffered; 1213 enum stream_flags flags; 1214 1215 assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS); 1216 flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS; 1217 tosend_off = stream->tosend_off; 1218 n_buffered = stream->sm_n_buffered; 1219 1220 if (stream->stream_flags & STREAM_WANT_FLUSH) 1221 (void) stream_flush(stream); 1222 1223 if (stream->stream_flags & STREAM_RW_ONCE) 1224 { 1225 if ((stream->stream_flags & STREAM_WANT_WRITE) 1226 && lsquic_stream_write_avail(stream)) 1227 { 1228 stream->stream_if->on_write(stream, stream->st_ctx); 1229 } 1230 } 1231 else 1232 stream_dispatch_write_events_loop(stream); 1233 1234 /* Progress means either flags or offsets changed: */ 1235 progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags && 1236 stream->tosend_off == tosend_off && 1237 stream->sm_n_buffered == n_buffered); 1238 1239 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 1240 { 1241 if (progress) 1242 { /* Move the stream to the end of the list to ensure fairness. */ 1243 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1244 next_write_stream); 1245 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1246 next_write_stream); 1247 } 1248 } 1249} 1250 1251 1252static size_t 1253inner_reader_empty_size (void *ctx) 1254{ 1255 return 0; 1256} 1257 1258 1259static size_t 1260inner_reader_empty_read (void *ctx, void *buf, size_t count) 1261{ 1262 return 0; 1263} 1264 1265 1266static int 1267stream_flush (lsquic_stream_t *stream) 1268{ 1269 struct lsquic_reader empty_reader; 1270 ssize_t nw; 1271 1272 assert(stream->stream_flags & STREAM_WANT_FLUSH); 1273 assert(stream->sm_n_buffered > 0 || 1274 /* Flushing is also used to packetize standalone FIN: */ 1275 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 1276 == STREAM_U_WRITE_DONE)); 1277 1278 empty_reader.lsqr_size = inner_reader_empty_size; 1279 empty_reader.lsqr_read = inner_reader_empty_read; 1280 empty_reader.lsqr_ctx = NULL; /* pro forma */ 1281 nw = stream_write_to_packets(stream, &empty_reader, 0); 1282 1283 if (nw >= 0) 1284 { 1285 assert(nw == 0); /* Empty reader: must have read zero bytes */ 1286 return 0; 1287 } 1288 else 1289 return -1; 1290} 1291 1292 1293static int 1294stream_flush_nocheck (lsquic_stream_t *stream) 1295{ 1296 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered; 1297 maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH); 1298 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 1299 1300 return stream_flush(stream); 1301} 1302 1303 1304int 1305lsquic_stream_flush (lsquic_stream_t *stream) 1306{ 1307 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1308 { 1309 LSQ_DEBUG("cannot flush closed stream"); 1310 errno = EBADF; 1311 return -1; 1312 } 1313 1314 if (0 == stream->sm_n_buffered) 1315 { 1316 LSQ_DEBUG("flushing 0 bytes: noop"); 1317 return 0; 1318 } 1319 1320 return stream_flush_nocheck(stream); 1321} 1322 1323 1324/* The flush threshold is the maximum size of stream data that can be sent 1325 * in a full packet. 1326 */ 1327static size_t 1328flush_threshold (const lsquic_stream_t *stream) 1329{ 1330 enum packet_out_flags flags; 1331 enum lsquic_packno_bits bits; 1332 unsigned packet_header_sz, stream_header_sz; 1333 size_t threshold; 1334 1335 bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl); 1336 flags = bits << POBIT_SHIFT; 1337 if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)) 1338 flags |= PO_CONN_ID; 1339 1340 packet_header_sz = lsquic_po_header_length(flags); 1341 stream_header_sz = stream->conn_pub->lconn->cn_pf 1342 ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off); 1343 1344 threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ 1345 - packet_header_sz - stream_header_sz; 1346 return threshold; 1347} 1348 1349 1350#define COMMON_WRITE_CHECKS() do { \ 1351 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) \ 1352 == STREAM_USE_HEADERS) \ 1353 { \ 1354 LSQ_WARN("Attempt to write to stream before sending HTTP headers"); \ 1355 errno = EILSEQ; \ 1356 return -1; \ 1357 } \ 1358 if (stream->stream_flags & STREAM_RST_FLAGS) \ 1359 { \ 1360 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 1361 errno = ECONNRESET; \ 1362 return -1; \ 1363 } \ 1364 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 1365 { \ 1366 LSQ_WARN("Attempt to write to stream after it was closed for " \ 1367 "writing"); \ 1368 errno = EBADF; \ 1369 return -1; \ 1370 } \ 1371} while (0) 1372 1373 1374struct frame_gen_ctx 1375{ 1376 lsquic_stream_t *fgc_stream; 1377 struct lsquic_reader *fgc_reader; 1378 /* We keep our own count of how many bytes were read from reader because 1379 * some readers are external. The external caller does not have to rely 1380 * on our count, but it can. 1381 */ 1382 size_t fgc_nread_from_reader; 1383}; 1384 1385 1386static size_t 1387frame_gen_size (void *ctx) 1388{ 1389 struct frame_gen_ctx *fg_ctx = ctx; 1390 size_t available, remaining; 1391 1392 /* Make sure we are not writing past available size: */ 1393 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1394 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 1395 if (available < remaining) 1396 remaining = available; 1397 1398 return remaining + fg_ctx->fgc_stream->sm_n_buffered; 1399} 1400 1401 1402static int 1403frame_gen_fin (void *ctx) 1404{ 1405 struct frame_gen_ctx *fg_ctx = ctx; 1406 return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE 1407 && 0 == fg_ctx->fgc_stream->sm_n_buffered 1408 /* Do not use frame_gen_size() as it may chop the real size: */ 1409 && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1410} 1411 1412 1413static void 1414incr_conn_cap (struct lsquic_stream *stream, size_t incr) 1415{ 1416 if (stream->stream_flags & STREAM_CONN_LIMITED) 1417 { 1418 stream->conn_pub->conn_cap.cc_sent += incr; 1419 assert(stream->conn_pub->conn_cap.cc_sent 1420 <= stream->conn_pub->conn_cap.cc_max); 1421 } 1422} 1423 1424 1425static size_t 1426frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 1427{ 1428 struct frame_gen_ctx *fg_ctx = ctx; 1429 unsigned char *p = begin_buf; 1430 unsigned char *const end = p + len; 1431 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1432 size_t n_written, available, n_to_write; 1433 1434 if (stream->sm_n_buffered > 0) 1435 { 1436 if (len <= stream->sm_n_buffered) 1437 { 1438 memcpy(p, stream->sm_buf, len); 1439 memmove(stream->sm_buf, stream->sm_buf + len, 1440 stream->sm_n_buffered - len); 1441 stream->sm_n_buffered -= len; 1442 stream->tosend_off += len; 1443 *fin = frame_gen_fin(fg_ctx); 1444 return len; 1445 } 1446 memcpy(p, stream->sm_buf, stream->sm_n_buffered); 1447 p += stream->sm_n_buffered; 1448 stream->sm_n_buffered = 0; 1449 } 1450 1451 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 1452 n_to_write = end - p; 1453 if (n_to_write > available) 1454 n_to_write = available; 1455 n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p, 1456 n_to_write); 1457 p += n_written; 1458 fg_ctx->fgc_nread_from_reader += n_written; 1459 *fin = frame_gen_fin(fg_ctx); 1460 stream->tosend_off += p - (const unsigned char *) begin_buf; 1461 incr_conn_cap(stream, n_written); 1462 return p - (const unsigned char *) begin_buf; 1463} 1464 1465 1466static void 1467check_flush_threshold (lsquic_stream_t *stream) 1468{ 1469 if ((stream->stream_flags & STREAM_WANT_FLUSH) && 1470 stream->tosend_off >= stream->sm_flush_to) 1471 { 1472 LSQ_DEBUG("flushed to or past required offset %"PRIu64, 1473 stream->sm_flush_to); 1474 maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH); 1475 } 1476} 1477 1478 1479static struct lsquic_packet_out * 1480get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least, 1481 const struct lsquic_stream *stream) 1482{ 1483 return lsquic_send_ctl_new_packet_out(ctl, need_at_least); 1484} 1485 1486 1487static struct lsquic_packet_out * (* const get_packet[])( 1488 struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) = 1489{ 1490 lsquic_send_ctl_get_packet_for_stream, 1491 get_brand_new_packet, 1492}; 1493 1494 1495static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR } 1496stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size) 1497{ 1498 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1499 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 1500 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 1501 unsigned stream_header_sz, need_at_least, off; 1502 lsquic_packet_out_t *packet_out; 1503 int len, s, hsk; 1504 1505 stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id, 1506 stream->tosend_off); 1507 need_at_least = stream_header_sz + (size > 0); 1508 hsk = LSQUIC_STREAM_HANDSHAKE == stream->id; 1509 packet_out = get_packet[hsk](send_ctl, need_at_least, stream); 1510 if (!packet_out) 1511 return SWTP_STOP; 1512 1513 off = packet_out->po_data_sz; 1514 len = pf->pf_gen_stream_frame( 1515 packet_out->po_data + packet_out->po_data_sz, 1516 lsquic_packet_out_avail(packet_out), stream->id, 1517 stream->tosend_off, 1518 frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx); 1519 if (len < 0) 1520 { 1521 LSQ_ERROR("could not generate stream frame"); 1522 return SWTP_ERROR; 1523 } 1524 1525 EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf, 1526 packet_out->po_data + packet_out->po_data_sz, len); 1527 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 1528 packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM; 1529 if (0 == lsquic_packet_out_avail(packet_out)) 1530 packet_out->po_flags |= PO_STREAM_END; 1531 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 1532 stream, QUIC_FRAME_STREAM, off, len); 1533 if (s != 0) 1534 { 1535 LSQ_ERROR("adding stream to packet failed: %s", strerror(errno)); 1536 return SWTP_ERROR; 1537 } 1538 1539 check_flush_threshold(stream); 1540 1541 /* XXX: I don't like it that this is here */ 1542 if (hsk && !(packet_out->po_flags & PO_HELLO)) 1543 { 1544 lsquic_packet_out_zero_pad(packet_out); 1545 packet_out->po_flags |= PO_HELLO; 1546 lsquic_send_ctl_scheduled_one(send_ctl, packet_out); 1547 } 1548 1549 return SWTP_OK; 1550} 1551 1552 1553static void 1554abort_connection (struct lsquic_stream *stream) 1555{ 1556 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 1557 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 1558 next_service_stream); 1559 stream->stream_flags |= STREAM_ABORT_CONN; 1560 LSQ_WARN("connection will be aborted"); 1561} 1562 1563 1564static ssize_t 1565stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, 1566 size_t thresh) 1567{ 1568 size_t size; 1569 ssize_t nw; 1570 struct frame_gen_ctx fg_ctx = { 1571 .fgc_stream = stream, 1572 .fgc_reader = reader, 1573 .fgc_nread_from_reader = 0, 1574 }; 1575 1576 while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0) 1577 || frame_gen_fin(&fg_ctx)) 1578 { 1579 switch (stream_write_to_packet(&fg_ctx, size)) 1580 { 1581 case SWTP_OK: 1582 if (frame_gen_fin(&fg_ctx)) 1583 { 1584 stream->stream_flags |= STREAM_FIN_SENT; 1585 goto end; 1586 } 1587 else 1588 break; 1589 case SWTP_STOP: 1590 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1591 goto end; 1592 default: 1593 abort_connection(stream); 1594 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1595 return -1; 1596 } 1597 } 1598 1599 if (thresh) 1600 { 1601 assert(size < thresh); 1602 assert(size >= stream->sm_n_buffered); 1603 size -= stream->sm_n_buffered; 1604 if (size > 0) 1605 { 1606 nw = save_to_buffer(stream, reader, size); 1607 if (nw < 0) 1608 return -1; 1609 fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */ 1610 } 1611 } 1612 else 1613 { 1614 /* We count flushed data towards both stream and connection limits, 1615 * so we should have been able to packetize all of it: 1616 */ 1617 assert(0 == stream->sm_n_buffered); 1618 assert(size == 0); 1619 } 1620 1621 maybe_mark_as_blocked(stream); 1622 1623 end: 1624 return fg_ctx.fgc_nread_from_reader; 1625} 1626 1627 1628/* Perform an implicit flush when we hit connection limit while buffering 1629 * data. This is to prevent a (theoretical) stall: 1630 * 1631 * Imagine a number of streams, all of which buffered some data. The buffered 1632 * data is up to connection cap, which means no further writes are possible. 1633 * None of them flushes, which means that data is not sent and connection 1634 * WINDOW_UPDATE frame never arrives from peer. Stall. 1635 */ 1636static void 1637maybe_flush_stream (struct lsquic_stream *stream) 1638{ 1639 if (stream->sm_n_buffered > 0 1640 && (stream->stream_flags & STREAM_CONN_LIMITED) 1641 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 1642 stream_flush_nocheck(stream); 1643} 1644 1645 1646static ssize_t 1647save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 1648 size_t len) 1649{ 1650 size_t avail, n_written; 1651 1652 assert(stream->sm_n_buffered + len <= SM_BUF_SIZE); 1653 1654 if (!stream->sm_buf) 1655 { 1656 stream->sm_buf = malloc(SM_BUF_SIZE); 1657 if (!stream->sm_buf) 1658 return -1; 1659 } 1660 1661 avail = lsquic_stream_write_avail(stream); 1662 if (avail < len) 1663 len = avail; 1664 1665 n_written = reader->lsqr_read(reader->lsqr_ctx, 1666 stream->sm_buf + stream->sm_n_buffered, len); 1667 stream->sm_n_buffered += n_written; 1668 incr_conn_cap(stream, n_written); 1669 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 1670 n_written, stream->sm_n_buffered); 1671 maybe_flush_stream(stream); 1672 return n_written; 1673} 1674 1675 1676static ssize_t 1677stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 1678{ 1679 size_t thresh, len; 1680 1681 thresh = flush_threshold(stream); 1682 len = reader->lsqr_size(reader->lsqr_ctx); 1683 if (stream->sm_n_buffered + len <= SM_BUF_SIZE && 1684 stream->sm_n_buffered + len < thresh) 1685 return save_to_buffer(stream, reader, len); 1686 else 1687 return stream_write_to_packets(stream, reader, thresh); 1688} 1689 1690 1691ssize_t 1692lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 1693{ 1694 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 1695 return lsquic_stream_writev(stream, &iov, 1); 1696} 1697 1698 1699struct inner_reader_iovec { 1700 const struct iovec *iov; 1701 const struct iovec *end; 1702 unsigned cur_iovec_off; 1703}; 1704 1705 1706static size_t 1707inner_reader_iovec_read (void *ctx, void *buf, size_t count) 1708{ 1709 struct inner_reader_iovec *const iro = ctx; 1710 unsigned char *p = buf; 1711 unsigned char *const end = p + count; 1712 unsigned n_tocopy; 1713 1714 while (iro->iov < iro->end && p < end) 1715 { 1716 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 1717 if (n_tocopy > (unsigned) (end - p)) 1718 n_tocopy = end - p; 1719 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 1720 n_tocopy); 1721 p += n_tocopy; 1722 iro->cur_iovec_off += n_tocopy; 1723 if (iro->iov->iov_len == iro->cur_iovec_off) 1724 { 1725 ++iro->iov; 1726 iro->cur_iovec_off = 0; 1727 } 1728 } 1729 1730 return p + count - end; 1731} 1732 1733 1734static size_t 1735inner_reader_iovec_size (void *ctx) 1736{ 1737 struct inner_reader_iovec *const iro = ctx; 1738 const struct iovec *iov; 1739 size_t size; 1740 1741 size = 0; 1742 for (iov = iro->iov; iov < iro->end; ++iov) 1743 size += iov->iov_len; 1744 1745 return size - iro->cur_iovec_off; 1746} 1747 1748 1749ssize_t 1750lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 1751 int iovcnt) 1752{ 1753 COMMON_WRITE_CHECKS(); 1754 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1755 1756 struct inner_reader_iovec iro = { 1757 .iov = iov, 1758 .end = iov + iovcnt, 1759 .cur_iovec_off = 0, 1760 }; 1761 struct lsquic_reader reader = { 1762 .lsqr_read = inner_reader_iovec_read, 1763 .lsqr_size = inner_reader_iovec_size, 1764 .lsqr_ctx = &iro, 1765 }; 1766 1767 return stream_write(stream, &reader); 1768} 1769 1770 1771ssize_t 1772lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) 1773{ 1774 COMMON_WRITE_CHECKS(); 1775 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1776 return stream_write(stream, reader); 1777} 1778 1779 1780int 1781lsquic_stream_send_headers (lsquic_stream_t *stream, 1782 const lsquic_http_headers_t *headers, int eos) 1783{ 1784 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT| 1785 STREAM_U_WRITE_DONE)) 1786 == STREAM_USE_HEADERS) 1787 { 1788 int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs, 1789 stream->id, headers, eos, lsquic_stream_priority(stream)); 1790 if (0 == s) 1791 { 1792 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 1793 stream->stream_flags |= STREAM_HEADERS_SENT; 1794 if (eos) 1795 stream->stream_flags |= STREAM_FIN_SENT; 1796 LSQ_INFO("sent headers for stream %u", stream->id); 1797 } 1798 else 1799 LSQ_WARN("could not send headers: %s", strerror(errno)); 1800 return s; 1801 } 1802 else 1803 { 1804 LSQ_WARN("cannot send headers for stream %u in this state", stream->id); 1805 errno = EBADMSG; 1806 return -1; 1807 } 1808} 1809 1810 1811void 1812lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 1813{ 1814 if (offset > stream->max_send_off) 1815 { 1816 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 1817 LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to " 1818 "0x%"PRIX64, stream->id, stream->max_send_off, offset); 1819 stream->max_send_off = offset; 1820 } 1821 else 1822 LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old " 1823 "max send offset 0x%"PRIX64", ignoring", stream->id, offset, 1824 stream->max_send_off); 1825} 1826 1827 1828/* This function is used to update offsets after handshake completes and we 1829 * learn of peer's limits from the handshake values. 1830 */ 1831int 1832lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset) 1833{ 1834 LSQ_DEBUG("setting max_send_off to %u", offset); 1835 if (offset > stream->max_send_off) 1836 { 1837 lsquic_stream_window_update(stream, offset); 1838 return 0; 1839 } 1840 else if (offset < stream->tosend_off) 1841 { 1842 LSQ_INFO("new offset (%u bytes) is smaller than the amount of data " 1843 "already sent on this stream (%"PRIu64" bytes)", offset, 1844 stream->tosend_off); 1845 return -1; 1846 } 1847 else 1848 { 1849 stream->max_send_off = offset; 1850 return 0; 1851 } 1852} 1853 1854 1855void 1856lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code) 1857{ 1858 lsquic_stream_reset_ext(stream, error_code, 1); 1859} 1860 1861 1862void 1863lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code, 1864 int do_close) 1865{ 1866 if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT)) 1867 { 1868 LSQ_INFO("reset already sent"); 1869 return; 1870 } 1871 1872 SM_HISTORY_APPEND(stream, SHE_RESET); 1873 1874 LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code); 1875 stream->error_code = error_code; 1876 1877 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1878 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1879 next_send_stream); 1880 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 1881 stream->stream_flags |= STREAM_SEND_RST; 1882 1883 drop_buffered_data(stream); 1884 maybe_elide_stream_frames(stream); 1885 maybe_schedule_call_on_close(stream); 1886 1887 if (do_close) 1888 lsquic_stream_close(stream); 1889 else 1890 maybe_conn_to_tickable_if_writeable(stream); 1891} 1892 1893 1894unsigned 1895lsquic_stream_id (const lsquic_stream_t *stream) 1896{ 1897 return stream->id; 1898} 1899 1900 1901struct lsquic_conn * 1902lsquic_stream_conn (const lsquic_stream_t *stream) 1903{ 1904 return stream->conn_pub->lconn; 1905} 1906 1907 1908int 1909lsquic_stream_close (lsquic_stream_t *stream) 1910{ 1911 LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id); 1912 SM_HISTORY_APPEND(stream, SHE_CLOSE); 1913 if (lsquic_stream_is_closed(stream)) 1914 { 1915 LSQ_INFO("Attempt to close an already-closed stream %u", stream->id); 1916 errno = EBADF; 1917 return -1; 1918 } 1919 stream_shutdown_write(stream); 1920 stream_shutdown_read(stream); 1921 maybe_schedule_call_on_close(stream); 1922 maybe_finish_stream(stream); 1923 maybe_conn_to_tickable_if_writeable(stream); 1924 return 0; 1925} 1926 1927 1928#ifndef NDEBUG 1929#if __GNUC__ 1930__attribute__((weak)) 1931#endif 1932#endif 1933void 1934lsquic_stream_acked (lsquic_stream_t *stream) 1935{ 1936 assert(stream->n_unacked); 1937 --stream->n_unacked; 1938 LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked); 1939 if (0 == stream->n_unacked) 1940 maybe_finish_stream(stream); 1941} 1942 1943 1944void 1945lsquic_stream_push_req (lsquic_stream_t *stream, 1946 struct uncompressed_headers *push_req) 1947{ 1948 assert(!stream->push_req); 1949 stream->push_req = push_req; 1950 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 1951} 1952 1953 1954int 1955lsquic_stream_is_pushed (const lsquic_stream_t *stream) 1956{ 1957 return 1 & ~stream->id; 1958} 1959 1960 1961int 1962lsquic_stream_push_info (const lsquic_stream_t *stream, 1963 uint32_t *ref_stream_id, const char **headers, size_t *headers_sz) 1964{ 1965 if (lsquic_stream_is_pushed(stream)) 1966 { 1967 assert(stream->push_req); 1968 *ref_stream_id = stream->push_req->uh_stream_id; 1969 *headers = stream->push_req->uh_headers; 1970 *headers_sz = stream->push_req->uh_size; 1971 return 0; 1972 } 1973 else 1974 return -1; 1975} 1976 1977 1978int 1979lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 1980{ 1981 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS) 1982 { 1983 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 1984 LSQ_DEBUG("received uncompressed headers for stream %u", stream->id); 1985 stream->stream_flags |= STREAM_HAVE_UH; 1986 if (uh->uh_flags & UH_FIN) 1987 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 1988 stream->uh = uh; 1989 if (uh->uh_oth_stream_id == 0) 1990 { 1991 if (uh->uh_weight) 1992 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 1993 } 1994 else 1995 LSQ_NOTICE("don't know how to depend on stream %u", 1996 uh->uh_oth_stream_id); 1997 return 0; 1998 } 1999 else 2000 { 2001 LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id); 2002 return -1; 2003 } 2004} 2005 2006 2007unsigned 2008lsquic_stream_priority (const lsquic_stream_t *stream) 2009{ 2010 return 256 - stream->sm_priority; 2011} 2012 2013 2014int 2015lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 2016{ 2017 /* The user should never get a reference to the special streams, 2018 * but let's check just in case: 2019 */ 2020 if (LSQUIC_STREAM_HANDSHAKE == stream->id 2021 || ((stream->stream_flags & STREAM_USE_HEADERS) && 2022 LSQUIC_STREAM_HEADERS == stream->id)) 2023 return -1; 2024 if (priority < 1 || priority > 256) 2025 return -1; 2026 stream->sm_priority = 256 - priority; 2027 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 2028 LSQ_DEBUG("set priority to %u", priority); 2029 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 2030 return 0; 2031} 2032 2033 2034int 2035lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 2036{ 2037 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 2038 { 2039 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) == 2040 (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) 2041 { 2042 /* We need to send headers only if we are a) using HEADERS stream 2043 * and b) we already sent initial headers. If initial headers 2044 * have not been sent yet, stream priority will be sent in the 2045 * HEADERS frame. 2046 */ 2047 return lsquic_headers_stream_send_priority(stream->conn_pub->hs, 2048 stream->id, 0, 0, priority); 2049 } 2050 else 2051 return 0; 2052 } 2053 else 2054 return -1; 2055} 2056 2057 2058lsquic_stream_ctx_t * 2059lsquic_stream_get_ctx (const lsquic_stream_t *stream) 2060{ 2061 return stream->st_ctx; 2062} 2063 2064 2065int 2066lsquic_stream_refuse_push (lsquic_stream_t *stream) 2067{ 2068 if (lsquic_stream_is_pushed(stream) && 2069 !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST))) 2070 { 2071 LSQ_DEBUG("refusing pushed stream: send reset"); 2072 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 2073 return 0; 2074 } 2075 else 2076 return -1; 2077} 2078 2079 2080size_t 2081lsquic_stream_mem_used (const struct lsquic_stream *stream) 2082{ 2083 size_t size; 2084 2085 size = sizeof(stream); 2086 if (stream->sm_buf) 2087 size += SM_BUF_SIZE; 2088 if (stream->data_in) 2089 size += stream->data_in->di_if->di_mem_used(stream->data_in); 2090 2091 return size; 2092} 2093 2094 2095lsquic_cid_t 2096lsquic_stream_cid (const struct lsquic_stream *stream) 2097{ 2098 return LSQUIC_LOG_CONN_ID; 2099} 2100