lsquic_stream.c revision bfc7bfd8
1/* Copyright (c) 2017 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <stddef.h> 28 29#include "lsquic.h" 30 31#include "lsquic_int_types.h" 32#include "lsquic_packet_common.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn_flow.h" 36#include "lsquic_rtt.h" 37#include "lsquic_sfcw.h" 38#include "lsquic_stream.h" 39#include "lsquic_conn_public.h" 40#include "lsquic_util.h" 41#include "lsquic_mm.h" 42#include "lsquic_headers_stream.h" 43#include "lsquic_frame_reader.h" 44#include "lsquic_conn.h" 45#include "lsquic_data_in_if.h" 46#include "lsquic_parse.h" 47#include "lsquic_packet_out.h" 48#include "lsquic_engine_public.h" 49#include "lsquic_senhist.h" 50#include "lsquic_pacer.h" 51#include "lsquic_cubic.h" 52#include "lsquic_send_ctl.h" 53#include "lsquic_ev_log.h" 54 55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid 57#define LSQUIC_LOG_STREAM_ID stream->id 58#include "lsquic_logger.h" 59 60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ 61 62static void 63drop_frames_in (lsquic_stream_t *stream); 64 65static void 66maybe_schedule_call_on_close (lsquic_stream_t *stream); 67 68static int 69stream_wantread (lsquic_stream_t *stream, int is_want); 70 71static int 72stream_wantwrite (lsquic_stream_t *stream, int is_want); 73 74static int 75stream_readable (const lsquic_stream_t *stream); 76 77static ssize_t 78stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 79 80static ssize_t 81save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 82 83static int 84stream_flush (lsquic_stream_t *stream); 85 86static int 87stream_flush_nocheck (lsquic_stream_t *stream); 88 89static void 90maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag); 91 92 93#if LSQUIC_KEEP_STREAM_HISTORY 94/* These values are printable ASCII characters for ease of printing the 95 * whole history in a single line of a log message. 96 * 97 * The list of events is not exhaustive: only most interesting events 98 * are recorded. 99 */ 100enum stream_history_event 101{ 102 SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ 103 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 104 SHE_REACH_FIN = 'a', 105 SHE_BLOCKED_OUT = 'b', 106 SHE_CREATED = 'C', 107 SHE_FRAME_IN = 'd', 108 SHE_FRAME_OUT = 'D', 109 SHE_RESET = 'e', 110 SHE_WINDOW_UPDATE = 'E', 111 SHE_FIN_IN = 'f', 112 SHE_FINISHED = 'F', 113 SHE_GOAWAY_IN = 'g', 114 SHE_USER_WRITE_HEADER = 'h', 115 SHE_HEADERS_IN = 'H', 116 SHE_ONCLOSE_SCHED = 'l', 117 SHE_ONCLOSE_CALL = 'L', 118 SHE_ONNEW = 'N', 119 SHE_SET_PRIO = 'p', 120 SHE_USER_READ = 'r', 121 SHE_SHUTDOWN_READ = 'R', 122 SHE_RST_IN = 's', 123 SHE_RST_OUT = 't', 124 SHE_FLUSH = 'u', 125 SHE_USER_WRITE_DATA = 'w', 126 SHE_SHUTDOWN_WRITE = 'W', 127 SHE_CLOSE = 'X', 128 SHE_FORCE_FINISH = 'Z', 129}; 130 131static void 132sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 133{ 134 enum stream_history_event prev_event; 135 sm_hist_idx_t idx; 136 int plus; 137 138 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 139 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 140 idx = (idx - plus) & SM_HIST_IDX_MASK; 141 prev_event = stream->sm_hist_buf[idx]; 142 143 if (prev_event == sh_event && plus) 144 return; 145 146 if (prev_event == sh_event) 147 sh_event = SHE_PLUS; 148 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 149 150 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 151 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 152 stream->sm_hist_buf); 153} 154 155# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 156# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 157 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 158 LSQ_DEBUG("history: [%.*s]", \ 159 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 160 (stream)->sm_hist_buf); \ 161 } while (0) 162#else 163# define SM_HISTORY_APPEND(stream, event) 164# define SM_HISTORY_DUMP_REMAINING(stream) 165#endif 166 167 168static int 169stream_inside_callback (const lsquic_stream_t *stream) 170{ 171 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 172} 173 174 175/* Here, "readable" means that the user is able to read from the stream. */ 176static void 177maybe_conn_to_pendrw_if_readable (lsquic_stream_t *stream, 178 enum rw_reason reason) 179{ 180 if (!stream_inside_callback(stream) && stream_readable(stream)) 181 { 182 lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub, 183 stream->conn_pub->lconn, reason); 184 } 185} 186 187 188/* Here, "writeable" means that data can be put into packets to be 189 * scheduled to be sent out. 190 */ 191static void 192maybe_conn_to_pendrw_if_writeable (lsquic_stream_t *stream, 193 enum rw_reason reason) 194{ 195 if (!stream_inside_callback(stream) && 196 lsquic_send_ctl_can_send(stream->conn_pub->send_ctl) && 197 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 198 { 199 lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub, 200 stream->conn_pub->lconn, reason); 201 } 202} 203 204 205static int 206stream_stalled (const lsquic_stream_t *stream) 207{ 208 return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) && 209 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 210 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 211} 212 213 214/* TODO: The logic to figure out whether the stream is connection limited 215 * should be taken out of the constructor. The caller should specify this 216 * via one of enum stream_ctor_flags. 217 */ 218lsquic_stream_t * 219lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub, 220 const struct lsquic_stream_if *stream_if, 221 void *stream_if_ctx, unsigned initial_window, 222 unsigned initial_send_off, 223 enum stream_ctor_flags ctor_flags) 224{ 225 lsquic_cfcw_t *cfcw; 226 lsquic_stream_t *stream; 227 228 stream = calloc(1, sizeof(*stream)); 229 if (!stream) 230 return NULL; 231 232 stream->stream_if = stream_if; 233 stream->id = id; 234 stream->conn_pub = conn_pub; 235 stream->sm_onnew_arg = stream_if_ctx; 236 if (!initial_window) 237 initial_window = 16 * 1024; 238 if (LSQUIC_STREAM_HANDSHAKE == id || 239 (conn_pub->hs && LSQUIC_STREAM_HEADERS == id)) 240 cfcw = NULL; 241 else 242 { 243 cfcw = &conn_pub->cfcw; 244 stream->stream_flags |= STREAM_CONN_LIMITED; 245 if (conn_pub->hs) 246 stream->stream_flags |= STREAM_USE_HEADERS; 247 lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO); 248 } 249 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 250 if (!initial_send_off) 251 initial_send_off = 16 * 1024; 252 stream->max_send_off = initial_send_off; 253 if (ctor_flags & SCF_USE_DI_HASH) 254 stream->data_in = data_in_hash_new(conn_pub, id, 0); 255 else 256 stream->data_in = data_in_nocopy_new(conn_pub, id); 257 LSQ_DEBUG("created stream %u @%p", id, stream); 258 SM_HISTORY_APPEND(stream, SHE_CREATED); 259 if (ctor_flags & SCF_DI_AUTOSWITCH) 260 stream->stream_flags |= STREAM_AUTOSWITCH; 261 if (ctor_flags & SCF_CALL_ON_NEW) 262 lsquic_stream_call_on_new(stream); 263 if (ctor_flags & SCF_DISP_RW_ONCE) 264 stream->stream_flags |= STREAM_RW_ONCE; 265 return stream; 266} 267 268 269void 270lsquic_stream_call_on_new (lsquic_stream_t *stream) 271{ 272 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 273 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 274 { 275 LSQ_DEBUG("calling on_new_stream"); 276 SM_HISTORY_APPEND(stream, SHE_ONNEW); 277 stream->stream_flags |= STREAM_ONNEW_DONE; 278 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 279 stream); 280 } 281} 282 283 284static void 285decr_conn_cap (struct lsquic_stream *stream, size_t incr) 286{ 287 if (stream->stream_flags & STREAM_CONN_LIMITED) 288 { 289 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 290 stream->conn_pub->conn_cap.cc_sent -= incr; 291 } 292} 293 294 295static void 296drop_buffered_data (struct lsquic_stream *stream) 297{ 298 decr_conn_cap(stream, stream->sm_n_buffered); 299 stream->sm_n_buffered = 0; 300 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 301 maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS); 302} 303 304 305void 306lsquic_stream_destroy (lsquic_stream_t *stream) 307{ 308 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 309 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 310 STREAM_ONNEW_DONE) 311 { 312 stream->stream_flags |= STREAM_ONCLOSE_DONE; 313 stream->stream_if->on_close(stream, stream->st_ctx); 314 } 315 if (stream->stream_flags & STREAM_SENDING_FLAGS) 316 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 317 if (stream->stream_flags & STREAM_WANT_READ) 318 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 319 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 320 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 321 if (stream->stream_flags & STREAM_SERVICE_FLAGS) 322 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 323 drop_buffered_data(stream); 324 lsquic_sfcw_consume_rem(&stream->fc); 325 drop_frames_in(stream); 326 free(stream->push_req); 327 free(stream->uh); 328 free(stream->sm_buf); 329 LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream); 330 SM_HISTORY_DUMP_REMAINING(stream); 331 free(stream); 332} 333 334 335static int 336stream_is_finished (const lsquic_stream_t *stream) 337{ 338 return lsquic_stream_is_closed(stream) 339 /* n_unacked checks that no outgoing packets that reference this 340 * stream are outstanding: 341 */ 342 && 0 == stream->n_unacked 343 /* This checks that no packets that reference this stream will 344 * become outstanding: 345 */ 346 && 0 == (stream->stream_flags & STREAM_SEND_RST) 347 && ((stream->stream_flags & STREAM_FORCE_FINISH) 348 || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)) 349 && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD)))); 350} 351 352 353static void 354maybe_finish_stream (lsquic_stream_t *stream) 355{ 356 if (0 == (stream->stream_flags & STREAM_FINISHED) && 357 stream_is_finished(stream)) 358 { 359 LSQ_DEBUG("stream %u is now finished", stream->id); 360 SM_HISTORY_APPEND(stream, SHE_FINISHED); 361 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 362 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 363 next_service_stream); 364 stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED; 365 } 366} 367 368 369static void 370maybe_schedule_call_on_close (lsquic_stream_t *stream) 371{ 372 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 373 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE)) 374 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)) 375 { 376 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 377 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 378 next_service_stream); 379 stream->stream_flags |= STREAM_CALL_ONCLOSE; 380 LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id); 381 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 382 } 383} 384 385 386void 387lsquic_stream_call_on_close (lsquic_stream_t *stream) 388{ 389 assert(stream->stream_flags & STREAM_ONNEW_DONE); 390 stream->stream_flags &= ~STREAM_CALL_ONCLOSE; 391 if (!(stream->stream_flags & STREAM_SERVICE_FLAGS)) 392 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 393 next_service_stream); 394 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 395 { 396 LSQ_DEBUG("calling on_close for stream %u", stream->id); 397 stream->stream_flags |= STREAM_ONCLOSE_DONE; 398 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 399 stream->stream_if->on_close(stream, stream->st_ctx); 400 } 401 else 402 assert(0); 403} 404 405 406static int 407stream_readable (const lsquic_stream_t *stream) 408{ 409 /* A stream is readable if one of the following is true: */ 410 return 411 /* - It is already finished: in that case, lsquic_stream_read() will 412 * return 0. 413 */ 414 (stream->stream_flags & STREAM_FIN_REACHED) 415 /* - The stream is reset, by either side. In this case, 416 * lsquic_stream_read() will return -1 (we want the user to be 417 * able to collect the error). 418 */ 419 || (stream->stream_flags & STREAM_RST_FLAGS) 420 /* - Either we are not in HTTP mode or the HTTP headers have been 421 * received and the headers or data from the stream can be read. 422 */ 423 || (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 424 == STREAM_USE_HEADERS) 425 && (stream->uh != NULL 426 || stream->data_in->di_if->di_get_frame(stream->data_in, 427 stream->read_offset))) 428 ; 429} 430 431 432static size_t 433stream_write_avail (const struct lsquic_stream *stream) 434{ 435 uint64_t stream_avail, conn_avail; 436 437 stream_avail = stream->max_send_off - stream->tosend_off 438 - stream->sm_n_buffered; 439 if (stream->stream_flags & STREAM_CONN_LIMITED) 440 { 441 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 442 if (conn_avail < stream_avail) 443 return conn_avail; 444 } 445 446 return stream_avail; 447} 448 449 450int 451lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 452{ 453 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 454 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 455 { 456 return -1; 457 } 458 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 459 { 460 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 461 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 462 next_send_stream); 463 stream->stream_flags |= STREAM_SEND_WUF; 464 } 465 return 0; 466} 467 468 469int 470lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 471{ 472 uint64_t max_off; 473 int got_next_offset; 474 enum ins_frame ins_frame; 475 476 assert(frame->packet_in); 477 478 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 479 LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; " 480 "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 481 482 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) == 483 (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) 484 { 485 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 486 lsquic_malo_put(frame); 487 return -1; 488 } 489 490 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 491 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 492 if (INS_FRAME_OK == ins_frame) 493 { 494 /* Update maximum offset in the flow controller and check for flow 495 * control violation: 496 */ 497 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 498 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 499 return -1; 500 if (frame->data_frame.df_fin) 501 { 502 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 503 stream->stream_flags |= STREAM_FIN_RECVD; 504 maybe_finish_stream(stream); 505 } 506 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 507 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 508 { 509 stream->data_in = stream->data_in->di_if->di_switch_impl( 510 stream->data_in, stream->read_offset); 511 if (!stream->data_in) 512 { 513 stream->data_in = data_in_error_new(); 514 return -1; 515 } 516 } 517 if (got_next_offset) 518 /* Checking the offset saves di_get_frame() call */ 519 maybe_conn_to_pendrw_if_readable(stream, RW_REASON_STREAM_IN); 520 return 0; 521 } 522 else if (INS_FRAME_DUP == ins_frame) 523 { 524 return 0; 525 } 526 else 527 { 528 assert(INS_FRAME_ERR == ins_frame); 529 return -1; 530 } 531} 532 533 534static void 535drop_frames_in (lsquic_stream_t *stream) 536{ 537 if (stream->data_in) 538 { 539 stream->data_in->di_if->di_destroy(stream->data_in); 540 /* To avoid checking whether `data_in` is set, just set to the error 541 * data-in stream. It does the right thing after incoming data is 542 * dropped. 543 */ 544 stream->data_in = data_in_error_new(); 545 } 546} 547 548 549static void 550maybe_elide_stream_frames (struct lsquic_stream *stream) 551{ 552 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 553 { 554 if (stream->n_unacked) 555 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 556 stream->id); 557 stream->stream_flags |= STREAM_FRAMES_ELIDED; 558 } 559} 560 561 562int 563lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 564 uint32_t error_code) 565{ 566 567 if (stream->stream_flags & STREAM_RST_RECVD) 568 { 569 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 570 return 0; 571 } 572 573 SM_HISTORY_APPEND(stream, SHE_RST_IN); 574 /* This flag must always be set, even if we are "ignoring" it: it is 575 * used by elision code. 576 */ 577 stream->stream_flags |= STREAM_RST_RECVD; 578 579 if ((stream->stream_flags & STREAM_FIN_RECVD) && 580 /* Pushed streams have fake STREAM_FIN_RECVD set, thus 581 * we need a special check: 582 */ 583 !lsquic_stream_is_pushed(stream)) 584 { 585 LSQ_DEBUG("ignore RST_STREAM frame after FIN is received"); 586 return 0; 587 } 588 589 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 590 { 591 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is " 592 "smaller than that of byte following the last byte we have seen: " 593 "0x%"PRIX64, stream->id, offset, 594 lsquic_sfcw_get_max_recv_off(&stream->fc)); 595 return -1; 596 } 597 598 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 599 { 600 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64 601 " violates flow control", stream->id, offset); 602 return -1; 603 } 604 605 /* Let user collect error: */ 606 maybe_conn_to_pendrw_if_readable(stream, RW_REASON_RST_IN); 607 608 lsquic_sfcw_consume_rem(&stream->fc); 609 drop_frames_in(stream); 610 drop_buffered_data(stream); 611 maybe_elide_stream_frames(stream); 612 613 if (!(stream->stream_flags & 614 (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT))) 615 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 616 617 stream->stream_flags |= STREAM_RST_RECVD; 618 619 maybe_finish_stream(stream); 620 maybe_schedule_call_on_close(stream); 621 622 return 0; 623} 624 625 626uint64_t 627lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 628{ 629 assert(stream->stream_flags & STREAM_SEND_WUF); 630 stream->stream_flags &= ~STREAM_SEND_WUF; 631 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 632 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 633 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 634} 635 636 637void 638lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 639{ 640 assert(stream->stream_flags & STREAM_SEND_BLOCKED); 641 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 642 stream->stream_flags &= ~STREAM_SEND_BLOCKED; 643 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 644 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 645} 646 647 648void 649lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 650{ 651 assert(stream->stream_flags & STREAM_SEND_RST); 652 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 653 stream->stream_flags &= ~STREAM_SEND_RST; 654 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 655 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 656 stream->stream_flags |= STREAM_RST_SENT; 657 maybe_finish_stream(stream); 658} 659 660 661static size_t 662read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len) 663{ 664 struct uncompressed_headers *uh = stream->uh; 665 size_t n_avail = uh->uh_size - uh->uh_off; 666 if (n_avail < len) 667 len = n_avail; 668 memcpy(dst, uh->uh_headers + uh->uh_off, len); 669 uh->uh_off += len; 670 if (uh->uh_off == uh->uh_size) 671 { 672 LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id); 673 free(uh); 674 stream->uh = NULL; 675 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 676 { 677 stream->stream_flags |= STREAM_FIN_REACHED; 678 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 679 } 680 } 681 return len; 682} 683 684 685/* This function returns 0 when EOF is reached. 686 */ 687ssize_t 688lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov, 689 int iovcnt) 690{ 691 size_t total_nread, nread; 692 int processed_frames, read_unc_headers, iovidx; 693 unsigned char *p, *end; 694 695 SM_HISTORY_APPEND(stream, SHE_USER_READ); 696 697#define NEXT_IOV() do { \ 698 ++iovidx; \ 699 while (iovidx < iovcnt && 0 == iov[iovidx].iov_len) \ 700 ++iovidx; \ 701 if (iovidx < iovcnt) \ 702 { \ 703 p = iov[iovidx].iov_base; \ 704 end = p + iov[iovidx].iov_len; \ 705 } \ 706 else \ 707 p = end = NULL; \ 708} while (0) 709 710#define AVAIL() (end - p) 711 712 if (stream->stream_flags & STREAM_RST_FLAGS) 713 { 714 errno = ECONNRESET; 715 return -1; 716 } 717 if (stream->stream_flags & STREAM_U_READ_DONE) 718 { 719 errno = EBADF; 720 return -1; 721 } 722 if (stream->stream_flags & STREAM_FIN_REACHED) 723 return 0; 724 725 total_nread = 0; 726 processed_frames = 0; 727 728 iovidx = -1; 729 NEXT_IOV(); 730 731 if (stream->uh && AVAIL()) 732 { 733 read_unc_headers = 1; 734 do 735 { 736 nread = read_uh(stream, p, AVAIL()); 737 p += nread; 738 total_nread += nread; 739 if (p == end) 740 NEXT_IOV(); 741 } 742 while (stream->uh && AVAIL()); 743 } 744 else 745 read_unc_headers = 0; 746 747 struct data_frame *data_frame; 748 while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset))) 749 { 750 ++processed_frames; 751 size_t navail = data_frame->df_size - data_frame->df_read_off; 752 size_t ntowrite = AVAIL(); 753 if (navail < ntowrite) 754 ntowrite = navail; 755 memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite); 756 p += ntowrite; 757 data_frame->df_read_off += ntowrite; 758 stream->read_offset += ntowrite; 759 total_nread += ntowrite; 760 if (data_frame->df_read_off == data_frame->df_size) 761 { 762 const int fin = data_frame->df_fin; 763 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 764 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 765 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 766 { 767 stream->data_in = stream->data_in->di_if->di_switch_impl( 768 stream->data_in, stream->read_offset); 769 if (!stream->data_in) 770 { 771 stream->data_in = data_in_error_new(); 772 return -1; 773 } 774 } 775 if (fin) 776 { 777 stream->stream_flags |= STREAM_FIN_REACHED; 778 break; 779 } 780 } 781 if (p == end) 782 NEXT_IOV(); 783 } 784 785 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 786 total_nread, stream->read_offset); 787 788 if (processed_frames) 789 { 790 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 791 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 792 { 793 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 794 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 795 stream->stream_flags |= STREAM_SEND_WUF; 796 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_READ); 797 } 798 } 799 800 if (processed_frames || read_unc_headers) 801 { 802 return total_nread; 803 } 804 else 805 { 806 assert(0 == total_nread); 807 errno = EWOULDBLOCK; 808 return -1; 809 } 810} 811 812 813ssize_t 814lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 815{ 816 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 817 return lsquic_stream_readv(stream, &iov, 1); 818} 819 820 821static void 822stream_shutdown_read (lsquic_stream_t *stream) 823{ 824 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 825 { 826 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 827 stream->stream_flags |= STREAM_U_READ_DONE; 828 stream_wantread(stream, 0); 829 maybe_finish_stream(stream); 830 } 831} 832 833 834static void 835stream_shutdown_write (lsquic_stream_t *stream) 836{ 837 if (stream->stream_flags & STREAM_U_WRITE_DONE) 838 return; 839 840 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 841 stream->stream_flags |= STREAM_U_WRITE_DONE; 842 stream_wantwrite(stream, 0); 843 844 /* Don't bother to check whether there is anything else to write if 845 * the flags indicate that nothing else should be written. 846 */ 847 if (!(stream->stream_flags & 848 (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT))) 849 { 850 if (stream->sm_n_buffered == 0) 851 { 852 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 853 stream)) 854 { 855 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 856 stream->stream_flags |= STREAM_FIN_SENT; 857 } 858 else 859 { 860 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 861 "flag in it"); 862 (void) stream_flush_nocheck(stream); 863 } 864 } 865 else 866 (void) stream_flush_nocheck(stream); 867 } 868} 869 870 871int 872lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 873{ 874 LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how); 875 if (lsquic_stream_is_closed(stream)) 876 { 877 LSQ_INFO("Attempt to shut down a closed stream %u", stream->id); 878 errno = EBADF; 879 return -1; 880 } 881 /* 0: read, 1: write: 2: read and write 882 */ 883 if (how < 0 || how > 2) 884 { 885 errno = EINVAL; 886 return -1; 887 } 888 889 if (how) 890 stream_shutdown_write(stream); 891 if (how != 1) 892 stream_shutdown_read(stream); 893 894 maybe_finish_stream(stream); 895 maybe_schedule_call_on_close(stream); 896 if (how) 897 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_SHUTDOWN); 898 899 return 0; 900} 901 902 903void 904lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 905{ 906 LSQ_DEBUG("internal shutdown of stream %u", stream->id); 907 if (LSQUIC_STREAM_HANDSHAKE == stream->id 908 || ((stream->stream_flags & STREAM_USE_HEADERS) && 909 LSQUIC_STREAM_HEADERS == stream->id)) 910 { 911 LSQ_DEBUG("add flag to force-finish special stream %u", stream->id); 912 stream->stream_flags |= STREAM_FORCE_FINISH; 913 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 914 } 915 maybe_finish_stream(stream); 916 maybe_schedule_call_on_close(stream); 917} 918 919 920static void 921fake_reset_unused_stream (lsquic_stream_t *stream) 922{ 923 stream->stream_flags |= 924 STREAM_RST_RECVD /* User will pick this up on read or write */ 925 | STREAM_RST_SENT /* Don't send anything else on this stream */ 926 ; 927 928 /* Cancel all writes to the network scheduled for this stream: */ 929 if (stream->stream_flags & STREAM_SENDING_FLAGS) 930 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 931 next_send_stream); 932 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 933 934 LSQ_DEBUG("fake-reset stream %u%s", 935 stream->id, stream_stalled(stream) ? " (stalled)" : ""); 936 maybe_finish_stream(stream); 937 maybe_schedule_call_on_close(stream); 938} 939 940 941/* This function should only be called for locally-initiated streams whose ID 942 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 943 * frame sent by peer but we have not yet received it and created a stream. 944 * In this situation, we mark the stream as reset, so that user's on_read or 945 * on_write event callback picks up the error. That, in turn, should result 946 * in stream being closed. 947 * 948 * If we have received any data frames on this stream, this probably indicates 949 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 950 * lower than this. However, we still try to handle it gracefully and peform 951 * a shutdown, as if the stream was not reset. 952 */ 953void 954lsquic_stream_received_goaway (lsquic_stream_t *stream) 955{ 956 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 957 if (0 == stream->read_offset && 958 stream->data_in->di_if->di_empty(stream->data_in)) 959 fake_reset_unused_stream(stream); /* Normal condition */ 960 else 961 { /* This is odd, let's handle it the best we can: */ 962 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 963 lsquic_stream_shutdown_internal(stream); 964 } 965} 966 967 968uint64_t 969lsquic_stream_read_offset (const lsquic_stream_t *stream) 970{ 971 return stream->read_offset; 972} 973 974 975static int 976stream_wantread (lsquic_stream_t *stream, int is_want) 977{ 978 const int old_val = !!(stream->stream_flags & STREAM_WANT_READ); 979 const int new_val = !!is_want; 980 if (old_val != new_val) 981 { 982 if (new_val) 983 { 984 if (!old_val) 985 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 986 next_read_stream); 987 stream->stream_flags |= STREAM_WANT_READ; 988 } 989 else 990 { 991 stream->stream_flags &= ~STREAM_WANT_READ; 992 if (old_val) 993 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 994 next_read_stream); 995 } 996 } 997 return old_val; 998} 999 1000 1001static void 1002maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1003{ 1004 assert(STREAM_WRITE_Q_FLAGS & flag); 1005 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1006 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1007 next_write_stream); 1008 stream->stream_flags |= flag; 1009} 1010 1011 1012static void 1013maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1014{ 1015 assert(STREAM_WRITE_Q_FLAGS & flag); 1016 if (stream->stream_flags & flag) 1017 { 1018 stream->stream_flags &= ~flag; 1019 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1020 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1021 next_write_stream); 1022 } 1023} 1024 1025 1026static int 1027stream_wantwrite (lsquic_stream_t *stream, int is_want) 1028{ 1029 const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE); 1030 const int new_val = !!is_want; 1031 if (old_val != new_val) 1032 { 1033 if (new_val) 1034 maybe_put_onto_write_q(stream, STREAM_WANT_WRITE); 1035 else 1036 maybe_remove_from_write_q(stream, STREAM_WANT_WRITE); 1037 } 1038 return old_val; 1039} 1040 1041 1042int 1043lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1044{ 1045 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1046 { 1047 if (is_want) 1048 maybe_conn_to_pendrw_if_readable(stream, RW_REASON_WANTREAD); 1049 return stream_wantread(stream, is_want); 1050 } 1051 else 1052 { 1053 errno = EBADF; 1054 return -1; 1055 } 1056} 1057 1058 1059int 1060lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1061{ 1062 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)) 1063 return stream_wantwrite(stream, is_want); 1064 else 1065 { 1066 errno = EBADF; 1067 return -1; 1068 } 1069} 1070 1071 1072#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE| \ 1073 STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST) 1074 1075 1076static void 1077stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1078{ 1079 unsigned no_progress_count, no_progress_limit; 1080 enum stream_flags flags; 1081 uint64_t size; 1082 1083 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1084 1085 no_progress_count = 0; 1086 while ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream)) 1087 { 1088 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1089 size = stream->read_offset; 1090 1091 stream->stream_if->on_read(stream, stream->st_ctx); 1092 1093 if (no_progress_limit && size == stream->read_offset && 1094 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1095 { 1096 ++no_progress_count; 1097 if (no_progress_count >= no_progress_limit) 1098 { 1099 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1100 "progress) in user code reading from stream", 1101 no_progress_count, 1102 no_progress_count == 1 ? "" : "s"); 1103 break; 1104 } 1105 } 1106 else 1107 no_progress_count = 0; 1108 } 1109} 1110 1111 1112static void 1113stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1114{ 1115 unsigned no_progress_count, no_progress_limit; 1116 enum stream_flags flags; 1117 1118 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1119 1120 no_progress_count = 0; 1121 stream->stream_flags |= STREAM_LAST_WRITE_OK; 1122 while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)) 1123 == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK) 1124 && stream_write_avail(stream)) 1125 { 1126 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1127 1128 stream->stream_if->on_write(stream, stream->st_ctx); 1129 1130 if (no_progress_limit && 1131 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1132 { 1133 ++no_progress_count; 1134 if (no_progress_count >= no_progress_limit) 1135 { 1136 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1137 "progress) in user code writing to stream", 1138 no_progress_count, 1139 no_progress_count == 1 ? "" : "s"); 1140 break; 1141 } 1142 } 1143 else 1144 no_progress_count = 0; 1145 } 1146} 1147 1148 1149static void 1150stream_dispatch_read_events_once (lsquic_stream_t *stream) 1151{ 1152 if ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream)) 1153 { 1154 stream->stream_if->on_read(stream, stream->st_ctx); 1155 } 1156} 1157 1158 1159static void 1160maybe_mark_as_blocked (lsquic_stream_t *stream) 1161{ 1162 struct lsquic_conn_cap *cc; 1163 1164 if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered) 1165 { 1166 if (stream->blocked_off < stream->max_send_off) 1167 { 1168 stream->blocked_off = stream->max_send_off + stream->sm_n_buffered; 1169 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1170 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1171 next_send_stream); 1172 stream->stream_flags |= STREAM_SEND_BLOCKED; 1173 LSQ_DEBUG("marked stream-blocked at stream offset " 1174 "%"PRIu64, stream->blocked_off); 1175 } 1176 else 1177 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 1178 " has been, or is about to be, sent", stream->blocked_off); 1179 } 1180 1181 if ((stream->stream_flags & STREAM_CONN_LIMITED) 1182 && (cc = &stream->conn_pub->conn_cap, 1183 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 1184 { 1185 if (cc->cc_blocked < cc->cc_max) 1186 { 1187 cc->cc_blocked = cc->cc_max; 1188 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 1189 LSQ_DEBUG("marked connection-blocked at connection offset " 1190 "%"PRIu64, cc->cc_max); 1191 } 1192 else 1193 LSQ_DEBUG("stream has already been marked connection-blocked " 1194 "at offset %"PRIu64, cc->cc_blocked); 1195 } 1196} 1197 1198 1199void 1200lsquic_stream_dispatch_read_events (lsquic_stream_t *stream) 1201{ 1202 assert(stream->stream_flags & STREAM_WANT_READ); 1203 1204 if (stream->stream_flags & STREAM_RW_ONCE) 1205 stream_dispatch_read_events_once(stream); 1206 else 1207 stream_dispatch_read_events_loop(stream); 1208} 1209 1210 1211void 1212lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 1213{ 1214 int progress; 1215 uint64_t tosend_off; 1216 unsigned short n_buffered; 1217 enum stream_flags flags; 1218 1219 assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS); 1220 flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS; 1221 tosend_off = stream->tosend_off; 1222 n_buffered = stream->sm_n_buffered; 1223 1224 if (stream->stream_flags & STREAM_WANT_FLUSH) 1225 (void) stream_flush(stream); 1226 1227 if (stream->stream_flags & STREAM_RW_ONCE) 1228 { 1229 if ((stream->stream_flags & STREAM_WANT_WRITE) 1230 && stream_write_avail(stream)) 1231 { 1232 stream->stream_if->on_write(stream, stream->st_ctx); 1233 } 1234 } 1235 else 1236 stream_dispatch_write_events_loop(stream); 1237 1238 /* Progress means either flags or offsets changed: */ 1239 progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags && 1240 stream->tosend_off == tosend_off && 1241 stream->sm_n_buffered == n_buffered); 1242 1243 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 1244 { 1245 if (progress) 1246 { /* Move the stream to the end of the list to ensure fairness. */ 1247 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1248 next_write_stream); 1249 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1250 next_write_stream); 1251 } 1252 } 1253} 1254 1255 1256static size_t 1257inner_reader_empty_size (void *ctx) 1258{ 1259 return 0; 1260} 1261 1262 1263static size_t 1264inner_reader_empty_read (void *ctx, void *buf, size_t count) 1265{ 1266 return 0; 1267} 1268 1269 1270static int 1271stream_flush (lsquic_stream_t *stream) 1272{ 1273 struct lsquic_reader empty_reader; 1274 ssize_t nw; 1275 1276 assert(stream->stream_flags & STREAM_WANT_FLUSH); 1277 assert(stream->sm_n_buffered > 0 || 1278 /* Flushing is also used to packetize standalone FIN: */ 1279 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 1280 == STREAM_U_WRITE_DONE)); 1281 1282 empty_reader.lsqr_size = inner_reader_empty_size; 1283 empty_reader.lsqr_read = inner_reader_empty_read; 1284 empty_reader.lsqr_ctx = NULL; /* pro forma */ 1285 nw = stream_write_to_packets(stream, &empty_reader, 0); 1286 1287 if (nw >= 0) 1288 { 1289 assert(nw == 0); /* Empty reader: must have read zero bytes */ 1290 return 0; 1291 } 1292 else 1293 return -1; 1294} 1295 1296 1297static int 1298stream_flush_nocheck (lsquic_stream_t *stream) 1299{ 1300 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered; 1301 maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH); 1302 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 1303 1304 return stream_flush(stream); 1305} 1306 1307 1308int 1309lsquic_stream_flush (lsquic_stream_t *stream) 1310{ 1311 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1312 { 1313 LSQ_DEBUG("cannot flush closed stream"); 1314 errno = EBADF; 1315 return -1; 1316 } 1317 1318 if (0 == stream->sm_n_buffered) 1319 { 1320 LSQ_DEBUG("flushing 0 bytes: noop"); 1321 return 0; 1322 } 1323 1324 return stream_flush_nocheck(stream); 1325} 1326 1327 1328/* The flush threshold is the maximum size of stream data that can be sent 1329 * in a full packet. 1330 */ 1331static size_t 1332flush_threshold (const lsquic_stream_t *stream) 1333{ 1334 enum packet_out_flags flags; 1335 enum lsquic_packno_bits bits; 1336 unsigned packet_header_sz, stream_header_sz; 1337 size_t threshold; 1338 1339 bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl); 1340 flags = bits << POBIT_SHIFT; 1341 if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)) 1342 flags |= PO_CONN_ID; 1343 1344 packet_header_sz = lsquic_po_header_length(flags); 1345 stream_header_sz = stream->conn_pub->lconn->cn_pf 1346 ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off); 1347 1348 threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ 1349 - packet_header_sz - stream_header_sz; 1350 return threshold; 1351} 1352 1353 1354#define COMMON_WRITE_CHECKS() do { \ 1355 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) \ 1356 == STREAM_USE_HEADERS) \ 1357 { \ 1358 LSQ_WARN("Attempt to write to stream before sending HTTP headers"); \ 1359 errno = EILSEQ; \ 1360 return -1; \ 1361 } \ 1362 if (stream->stream_flags & STREAM_RST_FLAGS) \ 1363 { \ 1364 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 1365 errno = ECONNRESET; \ 1366 return -1; \ 1367 } \ 1368 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 1369 { \ 1370 LSQ_WARN("Attempt to write to stream after it was closed for " \ 1371 "writing"); \ 1372 errno = EBADF; \ 1373 return -1; \ 1374 } \ 1375} while (0) 1376 1377 1378struct frame_gen_ctx 1379{ 1380 lsquic_stream_t *fgc_stream; 1381 struct lsquic_reader *fgc_reader; 1382 /* We keep our own count of how many bytes were read from reader because 1383 * some readers are external. The external caller does not have to rely 1384 * on our count, but it can. 1385 */ 1386 size_t fgc_nread_from_reader; 1387}; 1388 1389 1390static size_t 1391frame_gen_size (void *ctx) 1392{ 1393 struct frame_gen_ctx *fg_ctx = ctx; 1394 size_t available, remaining; 1395 1396 /* Make sure we are not writing past available size: */ 1397 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1398 available = stream_write_avail(fg_ctx->fgc_stream); 1399 if (available < remaining) 1400 remaining = available; 1401 1402 return remaining + fg_ctx->fgc_stream->sm_n_buffered; 1403} 1404 1405 1406static int 1407frame_gen_fin (void *ctx) 1408{ 1409 struct frame_gen_ctx *fg_ctx = ctx; 1410 return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE 1411 && 0 == fg_ctx->fgc_stream->sm_n_buffered 1412 /* Do not use frame_gen_size() as it may chop the real size: */ 1413 && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1414} 1415 1416 1417static void 1418incr_conn_cap (struct lsquic_stream *stream, size_t incr) 1419{ 1420 if (stream->stream_flags & STREAM_CONN_LIMITED) 1421 { 1422 stream->conn_pub->conn_cap.cc_sent += incr; 1423 assert(stream->conn_pub->conn_cap.cc_sent 1424 <= stream->conn_pub->conn_cap.cc_max); 1425 } 1426} 1427 1428 1429static size_t 1430frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 1431{ 1432 struct frame_gen_ctx *fg_ctx = ctx; 1433 unsigned char *p = begin_buf; 1434 unsigned char *const end = p + len; 1435 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1436 size_t n_written, available, n_to_write; 1437 1438 if (stream->sm_n_buffered > 0) 1439 { 1440 if (len <= stream->sm_n_buffered) 1441 { 1442 memcpy(p, stream->sm_buf, len); 1443 memmove(stream->sm_buf, stream->sm_buf + len, 1444 stream->sm_n_buffered - len); 1445 stream->sm_n_buffered -= len; 1446 stream->tosend_off += len; 1447 *fin = frame_gen_fin(fg_ctx); 1448 return len; 1449 } 1450 memcpy(p, stream->sm_buf, stream->sm_n_buffered); 1451 p += stream->sm_n_buffered; 1452 stream->sm_n_buffered = 0; 1453 } 1454 1455 available = stream_write_avail(fg_ctx->fgc_stream); 1456 n_to_write = end - p; 1457 if (n_to_write > available) 1458 n_to_write = available; 1459 n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p, 1460 n_to_write); 1461 p += n_written; 1462 fg_ctx->fgc_nread_from_reader += n_written; 1463 *fin = frame_gen_fin(fg_ctx); 1464 stream->tosend_off += p - (const unsigned char *) begin_buf; 1465 incr_conn_cap(stream, n_written); 1466 return p - (const unsigned char *) begin_buf; 1467} 1468 1469 1470static void 1471check_flush_threshold (lsquic_stream_t *stream) 1472{ 1473 if ((stream->stream_flags & STREAM_WANT_FLUSH) && 1474 stream->tosend_off >= stream->sm_flush_to) 1475 { 1476 LSQ_DEBUG("flushed to or past required offset %"PRIu64, 1477 stream->sm_flush_to); 1478 maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH); 1479 } 1480} 1481 1482 1483static struct lsquic_packet_out * 1484get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least, 1485 const struct lsquic_stream *stream) 1486{ 1487 return lsquic_send_ctl_new_packet_out(ctl, need_at_least); 1488} 1489 1490 1491static struct lsquic_packet_out * (* const get_packet[])( 1492 struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) = 1493{ 1494 lsquic_send_ctl_get_packet_for_stream, 1495 get_brand_new_packet, 1496}; 1497 1498 1499static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR } 1500stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size) 1501{ 1502 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1503 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 1504 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 1505 unsigned stream_header_sz, need_at_least, off; 1506 lsquic_packet_out_t *packet_out; 1507 int len, s, hsk; 1508 1509 stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id, 1510 stream->tosend_off); 1511 need_at_least = stream_header_sz + (size > 0); 1512 hsk = LSQUIC_STREAM_HANDSHAKE == stream->id; 1513 packet_out = get_packet[hsk](send_ctl, need_at_least, stream); 1514 if (!packet_out) 1515 return SWTP_STOP; 1516 1517 off = packet_out->po_data_sz; 1518 len = pf->pf_gen_stream_frame( 1519 packet_out->po_data + packet_out->po_data_sz, 1520 lsquic_packet_out_avail(packet_out), stream->id, 1521 stream->tosend_off, 1522 frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx); 1523 if (len < 0) 1524 { 1525 LSQ_ERROR("could not generate stream frame"); 1526 return SWTP_ERROR; 1527 } 1528 1529 EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf, 1530 packet_out->po_data + packet_out->po_data_sz, len); 1531 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 1532 packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM; 1533 if (0 == lsquic_packet_out_avail(packet_out)) 1534 packet_out->po_flags |= PO_STREAM_END; 1535 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 1536 stream, QUIC_FRAME_STREAM, off, len); 1537 if (s != 0) 1538 { 1539 LSQ_ERROR("adding stream to packet failed: %s", strerror(errno)); 1540 return SWTP_ERROR; 1541 } 1542 1543 check_flush_threshold(stream); 1544 1545 /* XXX: I don't like it that this is here */ 1546 if (hsk && !(packet_out->po_flags & PO_HELLO)) 1547 { 1548 lsquic_packet_out_zero_pad(packet_out); 1549 packet_out->po_flags |= PO_HELLO; 1550 lsquic_send_ctl_scheduled_one(send_ctl, packet_out); 1551 } 1552 1553 return SWTP_OK; 1554} 1555 1556 1557static void 1558abort_connection (struct lsquic_stream *stream) 1559{ 1560 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 1561 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 1562 next_service_stream); 1563 stream->stream_flags |= STREAM_ABORT_CONN; 1564 LSQ_WARN("connection will be aborted"); 1565} 1566 1567 1568static ssize_t 1569stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, 1570 size_t thresh) 1571{ 1572 size_t size; 1573 ssize_t nw; 1574 struct frame_gen_ctx fg_ctx = { 1575 .fgc_stream = stream, 1576 .fgc_reader = reader, 1577 .fgc_nread_from_reader = 0, 1578 }; 1579 1580 while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0) 1581 || frame_gen_fin(&fg_ctx)) 1582 { 1583 switch (stream_write_to_packet(&fg_ctx, size)) 1584 { 1585 case SWTP_OK: 1586 if (frame_gen_fin(&fg_ctx)) 1587 { 1588 stream->stream_flags |= STREAM_FIN_SENT; 1589 goto end; 1590 } 1591 else 1592 break; 1593 case SWTP_STOP: 1594 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1595 goto end; 1596 default: 1597 abort_connection(stream); 1598 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1599 return -1; 1600 } 1601 } 1602 1603 if (thresh) 1604 { 1605 assert(size < thresh); 1606 assert(size >= stream->sm_n_buffered); 1607 size -= stream->sm_n_buffered; 1608 if (size > 0) 1609 { 1610 nw = save_to_buffer(stream, reader, size); 1611 if (nw < 0) 1612 return -1; 1613 fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */ 1614 } 1615 } 1616 else 1617 { 1618 /* We count flushed data towards both stream and connection limits, 1619 * so we should have been able to packetize all of it: 1620 */ 1621 assert(0 == stream->sm_n_buffered); 1622 assert(size == 0); 1623 } 1624 1625 maybe_mark_as_blocked(stream); 1626 1627 end: 1628 return fg_ctx.fgc_nread_from_reader; 1629} 1630 1631 1632/* Perform an implicit flush when we hit connection limit while buffering 1633 * data. This is to prevent a (theoretical) stall: 1634 * 1635 * Imagine a number of streams, all of which buffered some data. The buffered 1636 * data is up to connection cap, which means no further writes are possible. 1637 * None of them flushes, which means that data is not sent and connection 1638 * WINDOW_UPDATE frame never arrives from peer. Stall. 1639 */ 1640static void 1641maybe_flush_stream (struct lsquic_stream *stream) 1642{ 1643 if (stream->sm_n_buffered > 0 1644 && (stream->stream_flags & STREAM_CONN_LIMITED) 1645 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 1646 stream_flush_nocheck(stream); 1647} 1648 1649 1650static ssize_t 1651save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 1652 size_t len) 1653{ 1654 size_t avail, n_written; 1655 1656 assert(stream->sm_n_buffered + len <= SM_BUF_SIZE); 1657 1658 if (!stream->sm_buf) 1659 { 1660 stream->sm_buf = malloc(SM_BUF_SIZE); 1661 if (!stream->sm_buf) 1662 return -1; 1663 } 1664 1665 avail = stream_write_avail(stream); 1666 if (avail < len) 1667 len = avail; 1668 1669 n_written = reader->lsqr_read(reader->lsqr_ctx, 1670 stream->sm_buf + stream->sm_n_buffered, len); 1671 stream->sm_n_buffered += n_written; 1672 incr_conn_cap(stream, n_written); 1673 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 1674 n_written, stream->sm_n_buffered); 1675 maybe_flush_stream(stream); 1676 return n_written; 1677} 1678 1679 1680static ssize_t 1681stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 1682{ 1683 size_t thresh, len; 1684 1685 thresh = flush_threshold(stream); 1686 len = reader->lsqr_size(reader->lsqr_ctx); 1687 if (stream->sm_n_buffered + len <= SM_BUF_SIZE && 1688 stream->sm_n_buffered + len < thresh) 1689 return save_to_buffer(stream, reader, len); 1690 else 1691 return stream_write_to_packets(stream, reader, thresh); 1692} 1693 1694 1695ssize_t 1696lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 1697{ 1698 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 1699 return lsquic_stream_writev(stream, &iov, 1); 1700} 1701 1702 1703struct inner_reader_iovec { 1704 const struct iovec *iov; 1705 const struct iovec *const end; 1706 unsigned cur_iovec_off; 1707}; 1708 1709 1710static size_t 1711inner_reader_iovec_read (void *ctx, void *buf, size_t count) 1712{ 1713 struct inner_reader_iovec *const iro = ctx; 1714 unsigned char *p = buf; 1715 unsigned char *const end = p + count; 1716 unsigned n_tocopy; 1717 1718 while (iro->iov < iro->end && p < end) 1719 { 1720 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 1721 if (n_tocopy > (unsigned) (end - p)) 1722 n_tocopy = end - p; 1723 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 1724 n_tocopy); 1725 p += n_tocopy; 1726 iro->cur_iovec_off += n_tocopy; 1727 if (iro->iov->iov_len == iro->cur_iovec_off) 1728 { 1729 ++iro->iov; 1730 iro->cur_iovec_off = 0; 1731 } 1732 } 1733 1734 return p + count - end; 1735} 1736 1737 1738static size_t 1739inner_reader_iovec_size (void *ctx) 1740{ 1741 struct inner_reader_iovec *const iro = ctx; 1742 const struct iovec *iov; 1743 size_t size; 1744 1745 size = 0; 1746 for (iov = iro->iov; iov < iro->end; ++iov) 1747 size += iov->iov_len; 1748 1749 return size - iro->cur_iovec_off; 1750} 1751 1752 1753ssize_t 1754lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 1755 int iovcnt) 1756{ 1757 COMMON_WRITE_CHECKS(); 1758 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1759 1760 struct inner_reader_iovec iro = { 1761 .iov = iov, 1762 .end = iov + iovcnt, 1763 .cur_iovec_off = 0, 1764 }; 1765 struct lsquic_reader reader = { 1766 .lsqr_read = inner_reader_iovec_read, 1767 .lsqr_size = inner_reader_iovec_size, 1768 .lsqr_ctx = &iro, 1769 }; 1770 1771 return stream_write(stream, &reader); 1772} 1773 1774 1775ssize_t 1776lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) 1777{ 1778 COMMON_WRITE_CHECKS(); 1779 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1780 return stream_write(stream, reader); 1781} 1782 1783 1784int 1785lsquic_stream_send_headers (lsquic_stream_t *stream, 1786 const lsquic_http_headers_t *headers, int eos) 1787{ 1788 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT| 1789 STREAM_U_WRITE_DONE)) 1790 == STREAM_USE_HEADERS) 1791 { 1792 int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs, 1793 stream->id, headers, eos, lsquic_stream_priority(stream)); 1794 if (0 == s) 1795 { 1796 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 1797 stream->stream_flags |= STREAM_HEADERS_SENT; 1798 if (eos) 1799 stream->stream_flags |= STREAM_FIN_SENT; 1800 LSQ_INFO("sent headers for stream %u", stream->id); 1801 } 1802 else 1803 LSQ_WARN("could not send headers: %s", strerror(errno)); 1804 return s; 1805 } 1806 else 1807 { 1808 LSQ_WARN("cannot send headers for stream %u in this state", stream->id); 1809 errno = EBADMSG; 1810 return -1; 1811 } 1812} 1813 1814 1815void 1816lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 1817{ 1818 if (offset > stream->max_send_off) 1819 { 1820 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 1821 LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to " 1822 "0x%"PRIX64, stream->id, stream->max_send_off, offset); 1823 stream->max_send_off = offset; 1824 } 1825 else 1826 LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old " 1827 "max send offset 0x%"PRIX64", ignoring", stream->id, offset, 1828 stream->max_send_off); 1829} 1830 1831 1832/* This function is used to update offsets after handshake completes and we 1833 * learn of peer's limits from the handshake values. 1834 */ 1835int 1836lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset) 1837{ 1838 LSQ_DEBUG("setting max_send_off to %u", offset); 1839 if (offset > stream->max_send_off) 1840 { 1841 lsquic_stream_window_update(stream, offset); 1842 return 0; 1843 } 1844 else if (offset < stream->tosend_off) 1845 { 1846 LSQ_INFO("new offset (%u bytes) is smaller than the amount of data " 1847 "already sent on this stream (%"PRIu64" bytes)", offset, 1848 stream->tosend_off); 1849 return -1; 1850 } 1851 else 1852 { 1853 stream->max_send_off = offset; 1854 return 0; 1855 } 1856} 1857 1858 1859void 1860lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code) 1861{ 1862 lsquic_stream_reset_ext(stream, error_code, 1); 1863} 1864 1865 1866void 1867lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code, 1868 int do_close) 1869{ 1870 if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT)) 1871 { 1872 LSQ_INFO("reset already sent"); 1873 return; 1874 } 1875 1876 SM_HISTORY_APPEND(stream, SHE_RESET); 1877 1878 LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code); 1879 stream->error_code = error_code; 1880 1881 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1882 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1883 next_send_stream); 1884 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 1885 stream->stream_flags |= STREAM_SEND_RST; 1886 1887 drop_buffered_data(stream); 1888 maybe_elide_stream_frames(stream); 1889 maybe_schedule_call_on_close(stream); 1890 1891 if (do_close) 1892 lsquic_stream_close(stream); 1893 else 1894 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_RESET_EXT); 1895} 1896 1897 1898unsigned 1899lsquic_stream_id (const lsquic_stream_t *stream) 1900{ 1901 return stream->id; 1902} 1903 1904 1905struct lsquic_conn * 1906lsquic_stream_conn (const lsquic_stream_t *stream) 1907{ 1908 return stream->conn_pub->lconn; 1909} 1910 1911 1912int 1913lsquic_stream_close (lsquic_stream_t *stream) 1914{ 1915 LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id); 1916 SM_HISTORY_APPEND(stream, SHE_CLOSE); 1917 if (lsquic_stream_is_closed(stream)) 1918 { 1919 LSQ_INFO("Attempt to close an already-closed stream %u", stream->id); 1920 errno = EBADF; 1921 return -1; 1922 } 1923 stream_shutdown_write(stream); 1924 stream_shutdown_read(stream); 1925 maybe_schedule_call_on_close(stream); 1926 maybe_finish_stream(stream); 1927 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_STREAM_CLOSE); 1928 return 0; 1929} 1930 1931 1932#ifndef NDEBUG 1933#if __GNUC__ 1934__attribute__((weak)) 1935#endif 1936#endif 1937void 1938lsquic_stream_acked (lsquic_stream_t *stream) 1939{ 1940 assert(stream->n_unacked); 1941 --stream->n_unacked; 1942 LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked); 1943 if (0 == stream->n_unacked) 1944 maybe_finish_stream(stream); 1945} 1946 1947 1948void 1949lsquic_stream_push_req (lsquic_stream_t *stream, 1950 struct uncompressed_headers *push_req) 1951{ 1952 assert(!stream->push_req); 1953 stream->push_req = push_req; 1954 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 1955} 1956 1957 1958int 1959lsquic_stream_is_pushed (const lsquic_stream_t *stream) 1960{ 1961 return 1 & ~stream->id; 1962} 1963 1964 1965int 1966lsquic_stream_push_info (const lsquic_stream_t *stream, 1967 uint32_t *ref_stream_id, const char **headers, size_t *headers_sz) 1968{ 1969 if (lsquic_stream_is_pushed(stream)) 1970 { 1971 assert(stream->push_req); 1972 *ref_stream_id = stream->push_req->uh_stream_id; 1973 *headers = stream->push_req->uh_headers; 1974 *headers_sz = stream->push_req->uh_size; 1975 return 0; 1976 } 1977 else 1978 return -1; 1979} 1980 1981 1982int 1983lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 1984{ 1985 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS) 1986 { 1987 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 1988 LSQ_DEBUG("received uncompressed headers for stream %u", stream->id); 1989 stream->stream_flags |= STREAM_HAVE_UH; 1990 if (uh->uh_flags & UH_FIN) 1991 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 1992 stream->uh = uh; 1993 if (uh->uh_oth_stream_id == 0) 1994 { 1995 if (uh->uh_weight) 1996 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 1997 } 1998 else 1999 LSQ_NOTICE("don't know how to depend on stream %u", 2000 uh->uh_oth_stream_id); 2001 return 0; 2002 } 2003 else 2004 { 2005 LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id); 2006 return -1; 2007 } 2008} 2009 2010 2011unsigned 2012lsquic_stream_priority (const lsquic_stream_t *stream) 2013{ 2014 return 256 - stream->sm_priority; 2015} 2016 2017 2018int 2019lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 2020{ 2021 /* The user should never get a reference to the special streams, 2022 * but let's check just in case: 2023 */ 2024 if (LSQUIC_STREAM_HANDSHAKE == stream->id 2025 || ((stream->stream_flags & STREAM_USE_HEADERS) && 2026 LSQUIC_STREAM_HEADERS == stream->id)) 2027 return -1; 2028 if (priority < 1 || priority > 256) 2029 return -1; 2030 stream->sm_priority = 256 - priority; 2031 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 2032 LSQ_DEBUG("set priority to %u", priority); 2033 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 2034 return 0; 2035} 2036 2037 2038int 2039lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 2040{ 2041 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 2042 { 2043 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) == 2044 (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) 2045 { 2046 /* We need to send headers only if we are a) using HEADERS stream 2047 * and b) we already sent initial headers. If initial headers 2048 * have not been sent yet, stream priority will be sent in the 2049 * HEADERS frame. 2050 */ 2051 return lsquic_headers_stream_send_priority(stream->conn_pub->hs, 2052 stream->id, 0, 0, priority); 2053 } 2054 else 2055 return 0; 2056 } 2057 else 2058 return -1; 2059} 2060 2061 2062lsquic_stream_ctx_t * 2063lsquic_stream_get_ctx (const lsquic_stream_t *stream) 2064{ 2065 return stream->st_ctx; 2066} 2067 2068 2069int 2070lsquic_stream_refuse_push (lsquic_stream_t *stream) 2071{ 2072 if (lsquic_stream_is_pushed(stream) && 2073 !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST))) 2074 { 2075 LSQ_DEBUG("refusing pushed stream: send reset"); 2076 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 2077 return 0; 2078 } 2079 else 2080 return -1; 2081} 2082 2083 2084size_t 2085lsquic_stream_mem_used (const struct lsquic_stream *stream) 2086{ 2087 size_t size; 2088 2089 size = sizeof(stream); 2090 if (stream->sm_buf) 2091 size += SM_BUF_SIZE; 2092 if (stream->data_in) 2093 size += stream->data_in->di_if->di_mem_used(stream->data_in); 2094 2095 return size; 2096} 2097 2098 2099lsquic_cid_t 2100lsquic_stream_cid (const struct lsquic_stream *stream) 2101{ 2102 return LSQUIC_LOG_CONN_ID; 2103} 2104