lsquic_stream.c revision c7d81ce1
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <stddef.h> 28 29#include "lsquic.h" 30 31#include "lsquic_int_types.h" 32#include "lsquic_packet_common.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn_flow.h" 36#include "lsquic_rtt.h" 37#include "lsquic_sfcw.h" 38#include "lsquic_stream.h" 39#include "lsquic_conn_public.h" 40#include "lsquic_util.h" 41#include "lsquic_mm.h" 42#include "lsquic_headers_stream.h" 43#include "lsquic_conn.h" 44#include "lsquic_data_in_if.h" 45#include "lsquic_parse.h" 46#include "lsquic_packet_out.h" 47#include "lsquic_engine_public.h" 48#include "lsquic_senhist.h" 49#include "lsquic_pacer.h" 50#include "lsquic_cubic.h" 51#include "lsquic_send_ctl.h" 52#include "lsquic_headers.h" 53#include "lsquic_ev_log.h" 54 55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid 57#define LSQUIC_LOG_STREAM_ID stream->id 58#include "lsquic_logger.h" 59 60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ 61 62static void 63drop_frames_in (lsquic_stream_t *stream); 64 65static void 66maybe_schedule_call_on_close (lsquic_stream_t *stream); 67 68static int 69stream_wantread (lsquic_stream_t *stream, int is_want); 70 71static int 72stream_wantwrite (lsquic_stream_t *stream, int is_want); 73 74static ssize_t 75stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 76 77static ssize_t 78save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 79 80static int 81stream_flush (lsquic_stream_t *stream); 82 83static int 84stream_flush_nocheck (lsquic_stream_t *stream); 85 86static void 87maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag); 88 89 90#if LSQUIC_KEEP_STREAM_HISTORY 91/* These values are printable ASCII characters for ease of printing the 92 * whole history in a single line of a log message. 93 * 94 * The list of events is not exhaustive: only most interesting events 95 * are recorded. 96 */ 97enum stream_history_event 98{ 99 SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ 100 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 101 SHE_REACH_FIN = 'a', 102 SHE_BLOCKED_OUT = 'b', 103 SHE_CREATED = 'C', 104 SHE_FRAME_IN = 'd', 105 SHE_FRAME_OUT = 'D', 106 SHE_RESET = 'e', 107 SHE_WINDOW_UPDATE = 'E', 108 SHE_FIN_IN = 'f', 109 SHE_FINISHED = 'F', 110 SHE_GOAWAY_IN = 'g', 111 SHE_USER_WRITE_HEADER = 'h', 112 SHE_HEADERS_IN = 'H', 113 SHE_ONCLOSE_SCHED = 'l', 114 SHE_ONCLOSE_CALL = 'L', 115 SHE_ONNEW = 'N', 116 SHE_SET_PRIO = 'p', 117 SHE_USER_READ = 'r', 118 SHE_SHUTDOWN_READ = 'R', 119 SHE_RST_IN = 's', 120 SHE_RST_OUT = 't', 121 SHE_FLUSH = 'u', 122 SHE_USER_WRITE_DATA = 'w', 123 SHE_SHUTDOWN_WRITE = 'W', 124 SHE_CLOSE = 'X', 125 SHE_FORCE_FINISH = 'Z', 126}; 127 128static void 129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 130{ 131 enum stream_history_event prev_event; 132 sm_hist_idx_t idx; 133 int plus; 134 135 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 136 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 137 idx = (idx - plus) & SM_HIST_IDX_MASK; 138 prev_event = stream->sm_hist_buf[idx]; 139 140 if (prev_event == sh_event && plus) 141 return; 142 143 if (prev_event == sh_event) 144 sh_event = SHE_PLUS; 145 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 146 147 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 148 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 149 stream->sm_hist_buf); 150} 151 152 153# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 154# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 155 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 156 LSQ_DEBUG("history: [%.*s]", \ 157 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 158 (stream)->sm_hist_buf); \ 159 } while (0) 160#else 161# define SM_HISTORY_APPEND(stream, event) 162# define SM_HISTORY_DUMP_REMAINING(stream) 163#endif 164 165 166static int 167stream_inside_callback (const lsquic_stream_t *stream) 168{ 169 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 170} 171 172 173static void 174maybe_conn_to_tickable (lsquic_stream_t *stream) 175{ 176 if (!stream_inside_callback(stream)) 177 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 178 stream->conn_pub->lconn); 179} 180 181 182/* Here, "readable" means that the user is able to read from the stream. */ 183static void 184maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 185{ 186 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 187 { 188 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 189 stream->conn_pub->lconn); 190 } 191} 192 193 194/* Here, "writeable" means that data can be put into packets to be 195 * scheduled to be sent out. 196 * 197 * If `check_can_send' is false, it means that we do not need to check 198 * whether packets can be sent. This check was already performed when 199 * we packetized stream data. 200 */ 201static void 202maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream, 203 int check_can_send) 204{ 205 if (!stream_inside_callback(stream) && 206 (!check_can_send 207 || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) && 208 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 209 { 210 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 211 stream->conn_pub->lconn); 212 } 213} 214 215 216static int 217stream_stalled (const lsquic_stream_t *stream) 218{ 219 return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) && 220 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 221 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 222} 223 224 225/* TODO: The logic to figure out whether the stream is connection limited 226 * should be taken out of the constructor. The caller should specify this 227 * via one of enum stream_ctor_flags. 228 */ 229lsquic_stream_t * 230lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub, 231 const struct lsquic_stream_if *stream_if, 232 void *stream_if_ctx, unsigned initial_window, 233 unsigned initial_send_off, 234 enum stream_ctor_flags ctor_flags) 235{ 236 lsquic_cfcw_t *cfcw; 237 lsquic_stream_t *stream; 238 239 stream = calloc(1, sizeof(*stream)); 240 if (!stream) 241 return NULL; 242 243 stream->stream_if = stream_if; 244 stream->id = id; 245 stream->conn_pub = conn_pub; 246 stream->sm_onnew_arg = stream_if_ctx; 247 if (!initial_window) 248 initial_window = 16 * 1024; 249 if (LSQUIC_STREAM_HANDSHAKE == id || 250 (conn_pub->hs && LSQUIC_STREAM_HEADERS == id)) 251 cfcw = NULL; 252 else 253 { 254 cfcw = &conn_pub->cfcw; 255 stream->stream_flags |= STREAM_CONN_LIMITED; 256 if (conn_pub->hs) 257 stream->stream_flags |= STREAM_USE_HEADERS; 258 lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO); 259 } 260 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 261 if (!initial_send_off) 262 initial_send_off = 16 * 1024; 263 stream->max_send_off = initial_send_off; 264 if (ctor_flags & SCF_USE_DI_HASH) 265 stream->data_in = data_in_hash_new(conn_pub, id, 0); 266 else 267 stream->data_in = data_in_nocopy_new(conn_pub, id); 268 LSQ_DEBUG("created stream %u @%p", id, stream); 269 SM_HISTORY_APPEND(stream, SHE_CREATED); 270 if (ctor_flags & SCF_DI_AUTOSWITCH) 271 stream->stream_flags |= STREAM_AUTOSWITCH; 272 if (ctor_flags & SCF_CALL_ON_NEW) 273 lsquic_stream_call_on_new(stream); 274 if (ctor_flags & SCF_DISP_RW_ONCE) 275 stream->stream_flags |= STREAM_RW_ONCE; 276 if (ctor_flags & SCF_CRITICAL) 277 stream->stream_flags |= STREAM_CRITICAL; 278 return stream; 279} 280 281 282void 283lsquic_stream_call_on_new (lsquic_stream_t *stream) 284{ 285 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 286 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 287 { 288 LSQ_DEBUG("calling on_new_stream"); 289 SM_HISTORY_APPEND(stream, SHE_ONNEW); 290 stream->stream_flags |= STREAM_ONNEW_DONE; 291 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 292 stream); 293 } 294} 295 296 297static void 298decr_conn_cap (struct lsquic_stream *stream, size_t incr) 299{ 300 if (stream->stream_flags & STREAM_CONN_LIMITED) 301 { 302 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 303 stream->conn_pub->conn_cap.cc_sent -= incr; 304 } 305} 306 307 308static void 309drop_buffered_data (struct lsquic_stream *stream) 310{ 311 decr_conn_cap(stream, stream->sm_n_buffered); 312 stream->sm_n_buffered = 0; 313 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 314 maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS); 315} 316 317 318static void 319destroy_uh (struct lsquic_stream *stream) 320{ 321 if (stream->uh) 322 { 323 if (stream->uh->uh_hset) 324 stream->conn_pub->enpub->enp_hsi_if 325 ->hsi_discard_header_set(stream->uh->uh_hset); 326 free(stream->uh); 327 stream->uh = NULL; 328 } 329} 330 331 332void 333lsquic_stream_destroy (lsquic_stream_t *stream) 334{ 335 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 336 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 337 STREAM_ONNEW_DONE) 338 { 339 stream->stream_flags |= STREAM_ONCLOSE_DONE; 340 stream->stream_if->on_close(stream, stream->st_ctx); 341 } 342 if (stream->stream_flags & STREAM_SENDING_FLAGS) 343 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 344 if (stream->stream_flags & STREAM_WANT_READ) 345 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 346 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 347 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 348 if (stream->stream_flags & STREAM_SERVICE_FLAGS) 349 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 350 drop_buffered_data(stream); 351 lsquic_sfcw_consume_rem(&stream->fc); 352 drop_frames_in(stream); 353 if (stream->push_req) 354 { 355 if (stream->push_req->uh_hset) 356 stream->conn_pub->enpub->enp_hsi_if 357 ->hsi_discard_header_set(stream->push_req->uh_hset); 358 free(stream->push_req); 359 } 360 destroy_uh(stream); 361 free(stream->sm_buf); 362 LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream); 363 SM_HISTORY_DUMP_REMAINING(stream); 364 free(stream); 365} 366 367 368static int 369stream_is_finished (const lsquic_stream_t *stream) 370{ 371 return lsquic_stream_is_closed(stream) 372 /* n_unacked checks that no outgoing packets that reference this 373 * stream are outstanding: 374 */ 375 && 0 == stream->n_unacked 376 /* This checks that no packets that reference this stream will 377 * become outstanding: 378 */ 379 && 0 == (stream->stream_flags & STREAM_SEND_RST) 380 && ((stream->stream_flags & STREAM_FORCE_FINISH) 381 || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)) 382 && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD)))); 383} 384 385 386static void 387maybe_finish_stream (lsquic_stream_t *stream) 388{ 389 if (0 == (stream->stream_flags & STREAM_FINISHED) && 390 stream_is_finished(stream)) 391 { 392 LSQ_DEBUG("stream %u is now finished", stream->id); 393 SM_HISTORY_APPEND(stream, SHE_FINISHED); 394 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 395 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 396 next_service_stream); 397 stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED; 398 } 399} 400 401 402static void 403maybe_schedule_call_on_close (lsquic_stream_t *stream) 404{ 405 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 406 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE)) 407 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)) 408 { 409 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 410 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 411 next_service_stream); 412 stream->stream_flags |= STREAM_CALL_ONCLOSE; 413 LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id); 414 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 415 } 416} 417 418 419void 420lsquic_stream_call_on_close (lsquic_stream_t *stream) 421{ 422 assert(stream->stream_flags & STREAM_ONNEW_DONE); 423 stream->stream_flags &= ~STREAM_CALL_ONCLOSE; 424 if (!(stream->stream_flags & STREAM_SERVICE_FLAGS)) 425 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 426 next_service_stream); 427 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 428 { 429 LSQ_DEBUG("calling on_close for stream %u", stream->id); 430 stream->stream_flags |= STREAM_ONCLOSE_DONE; 431 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 432 stream->stream_if->on_close(stream, stream->st_ctx); 433 } 434 else 435 assert(0); 436} 437 438 439int 440lsquic_stream_readable (const lsquic_stream_t *stream) 441{ 442 /* A stream is readable if one of the following is true: */ 443 return 444 /* - It is already finished: in that case, lsquic_stream_read() will 445 * return 0. 446 */ 447 (stream->stream_flags & STREAM_FIN_REACHED) 448 /* - The stream is reset, by either side. In this case, 449 * lsquic_stream_read() will return -1 (we want the user to be 450 * able to collect the error). 451 */ 452 || (stream->stream_flags & STREAM_RST_FLAGS) 453 /* - Either we are not in HTTP mode or the HTTP headers have been 454 * received and the headers or data from the stream can be read. 455 */ 456 || (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 457 == STREAM_USE_HEADERS) 458 && (stream->uh != NULL 459 || stream->data_in->di_if->di_get_frame(stream->data_in, 460 stream->read_offset))) 461 ; 462} 463 464 465size_t 466lsquic_stream_write_avail (const struct lsquic_stream *stream) 467{ 468 uint64_t stream_avail, conn_avail; 469 470 stream_avail = stream->max_send_off - stream->tosend_off 471 - stream->sm_n_buffered; 472 if (stream->stream_flags & STREAM_CONN_LIMITED) 473 { 474 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 475 if (conn_avail < stream_avail) 476 return conn_avail; 477 } 478 479 return stream_avail; 480} 481 482 483int 484lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 485{ 486 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 487 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 488 { 489 return -1; 490 } 491 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 492 { 493 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 494 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 495 next_send_stream); 496 stream->stream_flags |= STREAM_SEND_WUF; 497 } 498 return 0; 499} 500 501 502int 503lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 504{ 505 uint64_t max_off; 506 int got_next_offset; 507 enum ins_frame ins_frame; 508 509 assert(frame->packet_in); 510 511 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 512 LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; " 513 "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 514 515 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) == 516 (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) 517 { 518 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 519 lsquic_malo_put(frame); 520 return -1; 521 } 522 523 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 524 insert_frame: 525 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 526 if (INS_FRAME_OK == ins_frame) 527 { 528 /* Update maximum offset in the flow controller and check for flow 529 * control violation: 530 */ 531 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 532 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 533 return -1; 534 if (frame->data_frame.df_fin) 535 { 536 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 537 stream->stream_flags |= STREAM_FIN_RECVD; 538 maybe_finish_stream(stream); 539 } 540 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 541 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 542 { 543 stream->data_in = stream->data_in->di_if->di_switch_impl( 544 stream->data_in, stream->read_offset); 545 if (!stream->data_in) 546 { 547 stream->data_in = data_in_error_new(); 548 return -1; 549 } 550 } 551 if (got_next_offset) 552 /* Checking the offset saves di_get_frame() call */ 553 maybe_conn_to_tickable_if_readable(stream); 554 return 0; 555 } 556 else if (INS_FRAME_DUP == ins_frame) 557 { 558 return 0; 559 } 560 else if (INS_FRAME_OVERLAP == ins_frame) 561 { 562 LSQ_DEBUG("overlap: switching DATA IN implementation"); 563 stream->data_in = stream->data_in->di_if->di_switch_impl( 564 stream->data_in, stream->read_offset); 565 if (stream->data_in) 566 goto insert_frame; 567 stream->data_in = data_in_error_new(); 568 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 569 lsquic_malo_put(frame); 570 return -1; 571 } 572 else 573 { 574 assert(INS_FRAME_ERR == ins_frame); 575 return -1; 576 } 577} 578 579 580static void 581drop_frames_in (lsquic_stream_t *stream) 582{ 583 if (stream->data_in) 584 { 585 stream->data_in->di_if->di_destroy(stream->data_in); 586 /* To avoid checking whether `data_in` is set, just set to the error 587 * data-in stream. It does the right thing after incoming data is 588 * dropped. 589 */ 590 stream->data_in = data_in_error_new(); 591 } 592} 593 594 595static void 596maybe_elide_stream_frames (struct lsquic_stream *stream) 597{ 598 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 599 { 600 if (stream->n_unacked) 601 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 602 stream->id); 603 stream->stream_flags |= STREAM_FRAMES_ELIDED; 604 } 605} 606 607 608int 609lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 610 uint32_t error_code) 611{ 612 613 if (stream->stream_flags & STREAM_RST_RECVD) 614 { 615 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 616 return 0; 617 } 618 619 SM_HISTORY_APPEND(stream, SHE_RST_IN); 620 /* This flag must always be set, even if we are "ignoring" it: it is 621 * used by elision code. 622 */ 623 stream->stream_flags |= STREAM_RST_RECVD; 624 625 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 626 { 627 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is " 628 "smaller than that of byte following the last byte we have seen: " 629 "0x%"PRIX64, stream->id, offset, 630 lsquic_sfcw_get_max_recv_off(&stream->fc)); 631 return -1; 632 } 633 634 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 635 { 636 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64 637 " violates flow control", stream->id, offset); 638 return -1; 639 } 640 641 /* Let user collect error: */ 642 maybe_conn_to_tickable_if_readable(stream); 643 644 lsquic_sfcw_consume_rem(&stream->fc); 645 drop_frames_in(stream); 646 drop_buffered_data(stream); 647 maybe_elide_stream_frames(stream); 648 649 if (!(stream->stream_flags & 650 (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT))) 651 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 652 653 stream->stream_flags |= STREAM_RST_RECVD; 654 655 maybe_finish_stream(stream); 656 maybe_schedule_call_on_close(stream); 657 658 return 0; 659} 660 661 662uint64_t 663lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 664{ 665 assert(stream->stream_flags & STREAM_SEND_WUF); 666 stream->stream_flags &= ~STREAM_SEND_WUF; 667 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 668 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 669 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 670} 671 672 673void 674lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 675{ 676 assert(stream->stream_flags & STREAM_SEND_BLOCKED); 677 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 678 stream->stream_flags &= ~STREAM_SEND_BLOCKED; 679 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 680 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 681} 682 683 684void 685lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 686{ 687 assert(stream->stream_flags & STREAM_SEND_RST); 688 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 689 stream->stream_flags &= ~STREAM_SEND_RST; 690 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 691 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 692 stream->stream_flags |= STREAM_RST_SENT; 693 maybe_finish_stream(stream); 694} 695 696 697static size_t 698read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len) 699{ 700 struct http1x_headers *h1h = stream->uh->uh_hset; 701 size_t n_avail = h1h->h1h_size - h1h->h1h_off; 702 if (n_avail < len) 703 len = n_avail; 704 memcpy(dst, h1h->h1h_buf + h1h->h1h_off, len); 705 h1h->h1h_off += len; 706 if (h1h->h1h_off == h1h->h1h_size) 707 { 708 LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id); 709 destroy_uh(stream); 710 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 711 { 712 stream->stream_flags |= STREAM_FIN_REACHED; 713 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 714 } 715 } 716 return len; 717} 718 719 720/* This function returns 0 when EOF is reached. 721 */ 722ssize_t 723lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov, 724 int iovcnt) 725{ 726 size_t total_nread, nread; 727 int processed_frames, read_unc_headers, iovidx; 728 unsigned char *p, *end; 729 730 SM_HISTORY_APPEND(stream, SHE_USER_READ); 731 732#define NEXT_IOV() do { \ 733 ++iovidx; \ 734 while (iovidx < iovcnt && 0 == iov[iovidx].iov_len) \ 735 ++iovidx; \ 736 if (iovidx < iovcnt) \ 737 { \ 738 p = iov[iovidx].iov_base; \ 739 end = p + iov[iovidx].iov_len; \ 740 } \ 741 else \ 742 p = end = NULL; \ 743} while (0) 744 745#define AVAIL() (end - p) 746 747 if (stream->stream_flags & STREAM_RST_FLAGS) 748 { 749 errno = ECONNRESET; 750 return -1; 751 } 752 if (stream->stream_flags & STREAM_U_READ_DONE) 753 { 754 errno = EBADF; 755 return -1; 756 } 757 if (stream->stream_flags & STREAM_FIN_REACHED) 758 return 0; 759 760 total_nread = 0; 761 processed_frames = 0; 762 763 iovidx = -1; 764 NEXT_IOV(); 765 766 if (stream->uh) 767 { 768 if (stream->uh->uh_flags & UH_H1H) 769 { 770 if (AVAIL()) 771 { 772 read_unc_headers = 1; 773 do 774 { 775 nread = read_uh(stream, p, AVAIL()); 776 p += nread; 777 total_nread += nread; 778 if (p == end) 779 NEXT_IOV(); 780 } 781 while (stream->uh && AVAIL()); 782 } 783 else 784 read_unc_headers = 0; 785 } 786 else 787 { 788 LSQ_INFO("header set not claimed: cannot read from stream"); 789 return -1; 790 } 791 } 792 else 793 read_unc_headers = 0; 794 795 struct data_frame *data_frame; 796 while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset))) 797 { 798 ++processed_frames; 799 size_t navail = data_frame->df_size - data_frame->df_read_off; 800 size_t ntowrite = AVAIL(); 801 if (navail < ntowrite) 802 ntowrite = navail; 803 memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite); 804 p += ntowrite; 805 data_frame->df_read_off += ntowrite; 806 stream->read_offset += ntowrite; 807 total_nread += ntowrite; 808 if (data_frame->df_read_off == data_frame->df_size) 809 { 810 const int fin = data_frame->df_fin; 811 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 812 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 813 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 814 { 815 stream->data_in = stream->data_in->di_if->di_switch_impl( 816 stream->data_in, stream->read_offset); 817 if (!stream->data_in) 818 { 819 stream->data_in = data_in_error_new(); 820 return -1; 821 } 822 } 823 if (fin) 824 { 825 stream->stream_flags |= STREAM_FIN_REACHED; 826 break; 827 } 828 } 829 if (p == end) 830 NEXT_IOV(); 831 } 832 833 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 834 total_nread, stream->read_offset); 835 836 if (processed_frames) 837 { 838 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 839 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 840 { 841 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 842 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 843 stream->stream_flags |= STREAM_SEND_WUF; 844 maybe_conn_to_tickable_if_writeable(stream, 1); 845 } 846 } 847 848 if (processed_frames || read_unc_headers) 849 { 850 return total_nread; 851 } 852 else 853 { 854 assert(0 == total_nread); 855 errno = EWOULDBLOCK; 856 return -1; 857 } 858} 859 860 861ssize_t 862lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 863{ 864 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 865 return lsquic_stream_readv(stream, &iov, 1); 866} 867 868 869static void 870stream_shutdown_read (lsquic_stream_t *stream) 871{ 872 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 873 { 874 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 875 stream->stream_flags |= STREAM_U_READ_DONE; 876 stream_wantread(stream, 0); 877 maybe_finish_stream(stream); 878 } 879} 880 881 882static void 883stream_shutdown_write (lsquic_stream_t *stream) 884{ 885 if (stream->stream_flags & STREAM_U_WRITE_DONE) 886 return; 887 888 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 889 stream->stream_flags |= STREAM_U_WRITE_DONE; 890 stream_wantwrite(stream, 0); 891 892 /* Don't bother to check whether there is anything else to write if 893 * the flags indicate that nothing else should be written. 894 */ 895 if (!(stream->stream_flags & 896 (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT))) 897 { 898 if (stream->sm_n_buffered == 0) 899 { 900 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 901 stream)) 902 { 903 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 904 stream->stream_flags |= STREAM_FIN_SENT; 905 } 906 else 907 { 908 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 909 "flag in it"); 910 (void) stream_flush_nocheck(stream); 911 } 912 } 913 else 914 (void) stream_flush_nocheck(stream); 915 } 916} 917 918 919int 920lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 921{ 922 LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how); 923 if (lsquic_stream_is_closed(stream)) 924 { 925 LSQ_INFO("Attempt to shut down a closed stream %u", stream->id); 926 errno = EBADF; 927 return -1; 928 } 929 /* 0: read, 1: write: 2: read and write 930 */ 931 if (how < 0 || how > 2) 932 { 933 errno = EINVAL; 934 return -1; 935 } 936 937 if (how) 938 stream_shutdown_write(stream); 939 if (how != 1) 940 stream_shutdown_read(stream); 941 942 maybe_finish_stream(stream); 943 maybe_schedule_call_on_close(stream); 944 if (how) 945 maybe_conn_to_tickable_if_writeable(stream, 1); 946 947 return 0; 948} 949 950 951void 952lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 953{ 954 LSQ_DEBUG("internal shutdown of stream %u", stream->id); 955 if (LSQUIC_STREAM_HANDSHAKE == stream->id 956 || ((stream->stream_flags & STREAM_USE_HEADERS) && 957 LSQUIC_STREAM_HEADERS == stream->id)) 958 { 959 LSQ_DEBUG("add flag to force-finish special stream %u", stream->id); 960 stream->stream_flags |= STREAM_FORCE_FINISH; 961 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 962 } 963 maybe_finish_stream(stream); 964 maybe_schedule_call_on_close(stream); 965} 966 967 968static void 969fake_reset_unused_stream (lsquic_stream_t *stream) 970{ 971 stream->stream_flags |= 972 STREAM_RST_RECVD /* User will pick this up on read or write */ 973 | STREAM_RST_SENT /* Don't send anything else on this stream */ 974 ; 975 976 /* Cancel all writes to the network scheduled for this stream: */ 977 if (stream->stream_flags & STREAM_SENDING_FLAGS) 978 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 979 next_send_stream); 980 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 981 982 LSQ_DEBUG("fake-reset stream %u%s", 983 stream->id, stream_stalled(stream) ? " (stalled)" : ""); 984 maybe_finish_stream(stream); 985 maybe_schedule_call_on_close(stream); 986} 987 988 989/* This function should only be called for locally-initiated streams whose ID 990 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 991 * frame sent by peer but we have not yet received it and created a stream. 992 * In this situation, we mark the stream as reset, so that user's on_read or 993 * on_write event callback picks up the error. That, in turn, should result 994 * in stream being closed. 995 * 996 * If we have received any data frames on this stream, this probably indicates 997 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 998 * lower than this. However, we still try to handle it gracefully and peform 999 * a shutdown, as if the stream was not reset. 1000 */ 1001void 1002lsquic_stream_received_goaway (lsquic_stream_t *stream) 1003{ 1004 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 1005 if (0 == stream->read_offset && 1006 stream->data_in->di_if->di_empty(stream->data_in)) 1007 fake_reset_unused_stream(stream); /* Normal condition */ 1008 else 1009 { /* This is odd, let's handle it the best we can: */ 1010 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 1011 lsquic_stream_shutdown_internal(stream); 1012 } 1013} 1014 1015 1016uint64_t 1017lsquic_stream_read_offset (const lsquic_stream_t *stream) 1018{ 1019 return stream->read_offset; 1020} 1021 1022 1023static int 1024stream_wantread (lsquic_stream_t *stream, int is_want) 1025{ 1026 const int old_val = !!(stream->stream_flags & STREAM_WANT_READ); 1027 const int new_val = !!is_want; 1028 if (old_val != new_val) 1029 { 1030 if (new_val) 1031 { 1032 if (!old_val) 1033 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 1034 next_read_stream); 1035 stream->stream_flags |= STREAM_WANT_READ; 1036 } 1037 else 1038 { 1039 stream->stream_flags &= ~STREAM_WANT_READ; 1040 if (old_val) 1041 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 1042 next_read_stream); 1043 } 1044 } 1045 return old_val; 1046} 1047 1048 1049static void 1050maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1051{ 1052 assert(STREAM_WRITE_Q_FLAGS & flag); 1053 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1054 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1055 next_write_stream); 1056 stream->stream_flags |= flag; 1057} 1058 1059 1060static void 1061maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag) 1062{ 1063 assert(STREAM_WRITE_Q_FLAGS & flag); 1064 if (stream->stream_flags & flag) 1065 { 1066 stream->stream_flags &= ~flag; 1067 if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS)) 1068 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1069 next_write_stream); 1070 } 1071} 1072 1073 1074static int 1075stream_wantwrite (lsquic_stream_t *stream, int is_want) 1076{ 1077 const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE); 1078 const int new_val = !!is_want; 1079 if (old_val != new_val) 1080 { 1081 if (new_val) 1082 maybe_put_onto_write_q(stream, STREAM_WANT_WRITE); 1083 else 1084 maybe_remove_from_write_q(stream, STREAM_WANT_WRITE); 1085 } 1086 return old_val; 1087} 1088 1089 1090int 1091lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1092{ 1093 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1094 { 1095 if (is_want) 1096 maybe_conn_to_tickable_if_readable(stream); 1097 return stream_wantread(stream, is_want); 1098 } 1099 else 1100 { 1101 errno = EBADF; 1102 return -1; 1103 } 1104} 1105 1106 1107int 1108lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1109{ 1110 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)) 1111 { 1112 if (is_want) 1113 maybe_conn_to_tickable_if_writeable(stream, 1); 1114 return stream_wantwrite(stream, is_want); 1115 } 1116 else 1117 { 1118 errno = EBADF; 1119 return -1; 1120 } 1121} 1122 1123 1124#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE| \ 1125 STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST) 1126 1127 1128static void 1129stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1130{ 1131 unsigned no_progress_count, no_progress_limit; 1132 enum stream_flags flags; 1133 uint64_t size; 1134 1135 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1136 1137 no_progress_count = 0; 1138 while ((stream->stream_flags & STREAM_WANT_READ) 1139 && lsquic_stream_readable(stream)) 1140 { 1141 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1142 size = stream->read_offset; 1143 1144 stream->stream_if->on_read(stream, stream->st_ctx); 1145 1146 if (no_progress_limit && size == stream->read_offset && 1147 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1148 { 1149 ++no_progress_count; 1150 if (no_progress_count >= no_progress_limit) 1151 { 1152 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1153 "progress) in user code reading from stream", 1154 no_progress_count, 1155 no_progress_count == 1 ? "" : "s"); 1156 break; 1157 } 1158 } 1159 else 1160 no_progress_count = 0; 1161 } 1162} 1163 1164 1165static void 1166stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1167{ 1168 unsigned no_progress_count, no_progress_limit; 1169 enum stream_flags flags; 1170 1171 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1172 1173 no_progress_count = 0; 1174 stream->stream_flags |= STREAM_LAST_WRITE_OK; 1175 while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)) 1176 == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK) 1177 && lsquic_stream_write_avail(stream)) 1178 { 1179 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1180 1181 stream->stream_if->on_write(stream, stream->st_ctx); 1182 1183 if (no_progress_limit && 1184 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1185 { 1186 ++no_progress_count; 1187 if (no_progress_count >= no_progress_limit) 1188 { 1189 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1190 "progress) in user code writing to stream", 1191 no_progress_count, 1192 no_progress_count == 1 ? "" : "s"); 1193 break; 1194 } 1195 } 1196 else 1197 no_progress_count = 0; 1198 } 1199} 1200 1201 1202static void 1203stream_dispatch_read_events_once (lsquic_stream_t *stream) 1204{ 1205 if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream)) 1206 { 1207 stream->stream_if->on_read(stream, stream->st_ctx); 1208 } 1209} 1210 1211 1212static void 1213maybe_mark_as_blocked (lsquic_stream_t *stream) 1214{ 1215 struct lsquic_conn_cap *cc; 1216 1217 if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered) 1218 { 1219 if (stream->blocked_off < stream->max_send_off) 1220 { 1221 stream->blocked_off = stream->max_send_off + stream->sm_n_buffered; 1222 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1223 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1224 next_send_stream); 1225 stream->stream_flags |= STREAM_SEND_BLOCKED; 1226 LSQ_DEBUG("marked stream-blocked at stream offset " 1227 "%"PRIu64, stream->blocked_off); 1228 } 1229 else 1230 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 1231 " has been, or is about to be, sent", stream->blocked_off); 1232 } 1233 1234 if ((stream->stream_flags & STREAM_CONN_LIMITED) 1235 && (cc = &stream->conn_pub->conn_cap, 1236 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 1237 { 1238 if (cc->cc_blocked < cc->cc_max) 1239 { 1240 cc->cc_blocked = cc->cc_max; 1241 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 1242 LSQ_DEBUG("marked connection-blocked at connection offset " 1243 "%"PRIu64, cc->cc_max); 1244 } 1245 else 1246 LSQ_DEBUG("stream has already been marked connection-blocked " 1247 "at offset %"PRIu64, cc->cc_blocked); 1248 } 1249} 1250 1251 1252void 1253lsquic_stream_dispatch_read_events (lsquic_stream_t *stream) 1254{ 1255 assert(stream->stream_flags & STREAM_WANT_READ); 1256 1257 if (stream->stream_flags & STREAM_RW_ONCE) 1258 stream_dispatch_read_events_once(stream); 1259 else 1260 stream_dispatch_read_events_loop(stream); 1261} 1262 1263 1264void 1265lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 1266{ 1267 int progress; 1268 uint64_t tosend_off; 1269 unsigned short n_buffered; 1270 enum stream_flags flags; 1271 1272 assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS); 1273 flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS; 1274 tosend_off = stream->tosend_off; 1275 n_buffered = stream->sm_n_buffered; 1276 1277 if (stream->stream_flags & STREAM_WANT_FLUSH) 1278 (void) stream_flush(stream); 1279 1280 if (stream->stream_flags & STREAM_RW_ONCE) 1281 { 1282 if ((stream->stream_flags & STREAM_WANT_WRITE) 1283 && lsquic_stream_write_avail(stream)) 1284 { 1285 stream->stream_if->on_write(stream, stream->st_ctx); 1286 } 1287 } 1288 else 1289 stream_dispatch_write_events_loop(stream); 1290 1291 /* Progress means either flags or offsets changed: */ 1292 progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags && 1293 stream->tosend_off == tosend_off && 1294 stream->sm_n_buffered == n_buffered); 1295 1296 if (stream->stream_flags & STREAM_WRITE_Q_FLAGS) 1297 { 1298 if (progress) 1299 { /* Move the stream to the end of the list to ensure fairness. */ 1300 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1301 next_write_stream); 1302 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1303 next_write_stream); 1304 } 1305 } 1306} 1307 1308 1309static size_t 1310inner_reader_empty_size (void *ctx) 1311{ 1312 return 0; 1313} 1314 1315 1316static size_t 1317inner_reader_empty_read (void *ctx, void *buf, size_t count) 1318{ 1319 return 0; 1320} 1321 1322 1323static int 1324stream_flush (lsquic_stream_t *stream) 1325{ 1326 struct lsquic_reader empty_reader; 1327 ssize_t nw; 1328 1329 assert(stream->stream_flags & STREAM_WANT_FLUSH); 1330 assert(stream->sm_n_buffered > 0 || 1331 /* Flushing is also used to packetize standalone FIN: */ 1332 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 1333 == STREAM_U_WRITE_DONE)); 1334 1335 empty_reader.lsqr_size = inner_reader_empty_size; 1336 empty_reader.lsqr_read = inner_reader_empty_read; 1337 empty_reader.lsqr_ctx = NULL; /* pro forma */ 1338 nw = stream_write_to_packets(stream, &empty_reader, 0); 1339 1340 if (nw >= 0) 1341 { 1342 assert(nw == 0); /* Empty reader: must have read zero bytes */ 1343 return 0; 1344 } 1345 else 1346 return -1; 1347} 1348 1349 1350static int 1351stream_flush_nocheck (lsquic_stream_t *stream) 1352{ 1353 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered; 1354 maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH); 1355 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 1356 1357 return stream_flush(stream); 1358} 1359 1360 1361int 1362lsquic_stream_flush (lsquic_stream_t *stream) 1363{ 1364 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1365 { 1366 LSQ_DEBUG("cannot flush closed stream"); 1367 errno = EBADF; 1368 return -1; 1369 } 1370 1371 if (0 == stream->sm_n_buffered) 1372 { 1373 LSQ_DEBUG("flushing 0 bytes: noop"); 1374 return 0; 1375 } 1376 1377 return stream_flush_nocheck(stream); 1378} 1379 1380 1381/* The flush threshold is the maximum size of stream data that can be sent 1382 * in a full packet. 1383 */ 1384#ifdef NDEBUG 1385static 1386#endif 1387 size_t 1388lsquic_stream_flush_threshold (const struct lsquic_stream *stream) 1389{ 1390 enum packet_out_flags flags; 1391 enum packno_bits bits; 1392 unsigned packet_header_sz, stream_header_sz; 1393 size_t threshold; 1394 1395 bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl); 1396 flags = bits << POBIT_SHIFT; 1397 if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)) 1398 flags |= PO_CONN_ID; 1399 if (LSQUIC_STREAM_HANDSHAKE == stream->id) 1400 flags |= PO_LONGHEAD; 1401 1402 packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags); 1403 stream_header_sz = stream->conn_pub->lconn->cn_pf 1404 ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off); 1405 1406 threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ 1407 - packet_header_sz - stream_header_sz; 1408 return threshold; 1409} 1410 1411 1412#define COMMON_WRITE_CHECKS() do { \ 1413 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) \ 1414 == STREAM_USE_HEADERS) \ 1415 { \ 1416 LSQ_INFO("Attempt to write to stream before sending HTTP headers"); \ 1417 errno = EILSEQ; \ 1418 return -1; \ 1419 } \ 1420 if (stream->stream_flags & STREAM_RST_FLAGS) \ 1421 { \ 1422 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 1423 errno = ECONNRESET; \ 1424 return -1; \ 1425 } \ 1426 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 1427 { \ 1428 LSQ_INFO("Attempt to write to stream after it was closed for " \ 1429 "writing"); \ 1430 errno = EBADF; \ 1431 return -1; \ 1432 } \ 1433} while (0) 1434 1435 1436struct frame_gen_ctx 1437{ 1438 lsquic_stream_t *fgc_stream; 1439 struct lsquic_reader *fgc_reader; 1440 /* We keep our own count of how many bytes were read from reader because 1441 * some readers are external. The external caller does not have to rely 1442 * on our count, but it can. 1443 */ 1444 size_t fgc_nread_from_reader; 1445}; 1446 1447 1448static size_t 1449frame_gen_size (void *ctx) 1450{ 1451 struct frame_gen_ctx *fg_ctx = ctx; 1452 size_t available, remaining; 1453 1454 /* Make sure we are not writing past available size: */ 1455 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1456 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 1457 if (available < remaining) 1458 remaining = available; 1459 1460 return remaining + fg_ctx->fgc_stream->sm_n_buffered; 1461} 1462 1463 1464static int 1465frame_gen_fin (void *ctx) 1466{ 1467 struct frame_gen_ctx *fg_ctx = ctx; 1468 return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE 1469 && 0 == fg_ctx->fgc_stream->sm_n_buffered 1470 /* Do not use frame_gen_size() as it may chop the real size: */ 1471 && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 1472} 1473 1474 1475static void 1476incr_conn_cap (struct lsquic_stream *stream, size_t incr) 1477{ 1478 if (stream->stream_flags & STREAM_CONN_LIMITED) 1479 { 1480 stream->conn_pub->conn_cap.cc_sent += incr; 1481 assert(stream->conn_pub->conn_cap.cc_sent 1482 <= stream->conn_pub->conn_cap.cc_max); 1483 } 1484} 1485 1486 1487static size_t 1488frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 1489{ 1490 struct frame_gen_ctx *fg_ctx = ctx; 1491 unsigned char *p = begin_buf; 1492 unsigned char *const end = p + len; 1493 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1494 size_t n_written, available, n_to_write; 1495 1496 if (stream->sm_n_buffered > 0) 1497 { 1498 if (len <= stream->sm_n_buffered) 1499 { 1500 memcpy(p, stream->sm_buf, len); 1501 memmove(stream->sm_buf, stream->sm_buf + len, 1502 stream->sm_n_buffered - len); 1503 stream->sm_n_buffered -= len; 1504 stream->tosend_off += len; 1505 *fin = frame_gen_fin(fg_ctx); 1506 return len; 1507 } 1508 memcpy(p, stream->sm_buf, stream->sm_n_buffered); 1509 p += stream->sm_n_buffered; 1510 stream->sm_n_buffered = 0; 1511 } 1512 1513 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 1514 n_to_write = end - p; 1515 if (n_to_write > available) 1516 n_to_write = available; 1517 n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p, 1518 n_to_write); 1519 p += n_written; 1520 fg_ctx->fgc_nread_from_reader += n_written; 1521 *fin = frame_gen_fin(fg_ctx); 1522 stream->tosend_off += p - (const unsigned char *) begin_buf; 1523 incr_conn_cap(stream, n_written); 1524 return p - (const unsigned char *) begin_buf; 1525} 1526 1527 1528static void 1529check_flush_threshold (lsquic_stream_t *stream) 1530{ 1531 if ((stream->stream_flags & STREAM_WANT_FLUSH) && 1532 stream->tosend_off >= stream->sm_flush_to) 1533 { 1534 LSQ_DEBUG("flushed to or past required offset %"PRIu64, 1535 stream->sm_flush_to); 1536 maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH); 1537 } 1538} 1539 1540 1541static struct lsquic_packet_out * 1542get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least, 1543 const struct lsquic_stream *stream) 1544{ 1545 return lsquic_send_ctl_new_packet_out(ctl, need_at_least); 1546} 1547 1548 1549static struct lsquic_packet_out * (* const get_packet[])( 1550 struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) = 1551{ 1552 lsquic_send_ctl_get_packet_for_stream, 1553 get_brand_new_packet, 1554}; 1555 1556 1557static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR } 1558stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size) 1559{ 1560 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 1561 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 1562 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 1563 unsigned stream_header_sz, need_at_least, off; 1564 lsquic_packet_out_t *packet_out; 1565 int len, s, hsk; 1566 1567 if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED)) 1568 == STREAM_HEADERS_SENT 1569 && lsquic_send_ctl_buffered_and_same_prio_as_headers(send_ctl, stream)) 1570 { 1571 struct lsquic_stream *const headers_stream 1572 = lsquic_headers_stream_get_stream(stream->conn_pub->hs); 1573 if (lsquic_stream_has_data_to_flush(headers_stream)) 1574 { 1575 LSQ_DEBUG("flushing headers stream before potential write to a " 1576 "buffered packet"); 1577 (void) lsquic_stream_flush(headers_stream); 1578 } 1579 else 1580 /* Some other stream must have flushed it: this means our headers 1581 * are flushed. 1582 */ 1583 stream->stream_flags |= STREAM_HDRS_FLUSHED; 1584 } 1585 1586 stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id, 1587 stream->tosend_off); 1588 need_at_least = stream_header_sz + (size > 0); 1589 hsk = LSQUIC_STREAM_HANDSHAKE == stream->id; 1590 get_packet: 1591 packet_out = get_packet[hsk](send_ctl, need_at_least, stream); 1592 if (!packet_out) 1593 return SWTP_STOP; 1594 if (hsk) 1595 packet_out->po_header_type = stream->tosend_off == 0 1596 ? HETY_INITIAL : HETY_HANDSHAKE; 1597 1598#if LSQUIC_CONN_STATS 1599 const uint64_t begin_off = stream->tosend_off; 1600#endif 1601 off = packet_out->po_data_sz; 1602 len = pf->pf_gen_stream_frame( 1603 packet_out->po_data + packet_out->po_data_sz, 1604 lsquic_packet_out_avail(packet_out), stream->id, 1605 stream->tosend_off, 1606 frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx); 1607 if (len < 0) 1608 { 1609 if (-len > (int) need_at_least) 1610 { 1611 LSQ_DEBUG("need more room (%d bytes) than initially calculated " 1612 "%u bytes, will try again", -len, need_at_least); 1613 need_at_least = -len; 1614 goto get_packet; 1615 } 1616 else 1617 { 1618 LSQ_ERROR("could not generate stream frame"); 1619 return SWTP_ERROR; 1620 } 1621 } 1622 1623#if LSQUIC_CONN_STATS 1624 stream->conn_pub->conn_stats->out.stream_frames += 1; 1625 stream->conn_pub->conn_stats->out.stream_data_sz 1626 += stream->tosend_off - begin_off; 1627#endif 1628 EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf, 1629 packet_out->po_data + packet_out->po_data_sz, len); 1630 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 1631 packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM; 1632 if (0 == lsquic_packet_out_avail(packet_out)) 1633 packet_out->po_flags |= PO_STREAM_END; 1634 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 1635 stream, QUIC_FRAME_STREAM, off, len); 1636 if (s != 0) 1637 { 1638 LSQ_ERROR("adding stream to packet failed: %s", strerror(errno)); 1639 return SWTP_ERROR; 1640 } 1641 1642 check_flush_threshold(stream); 1643 1644 /* XXX: I don't like it that this is here */ 1645 if (hsk && !(packet_out->po_flags & PO_HELLO)) 1646 { 1647 lsquic_packet_out_zero_pad(packet_out); 1648 packet_out->po_flags |= PO_HELLO; 1649 lsquic_send_ctl_scheduled_one(send_ctl, packet_out); 1650 } 1651 1652 return SWTP_OK; 1653} 1654 1655 1656static void 1657abort_connection (struct lsquic_stream *stream) 1658{ 1659 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 1660 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 1661 next_service_stream); 1662 stream->stream_flags |= STREAM_ABORT_CONN; 1663 LSQ_WARN("connection will be aborted"); 1664 maybe_conn_to_tickable(stream); 1665} 1666 1667 1668static ssize_t 1669stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, 1670 size_t thresh) 1671{ 1672 size_t size; 1673 ssize_t nw; 1674 unsigned seen_ok; 1675 struct frame_gen_ctx fg_ctx = { 1676 .fgc_stream = stream, 1677 .fgc_reader = reader, 1678 .fgc_nread_from_reader = 0, 1679 }; 1680 1681 seen_ok = 0; 1682 while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0) 1683 || frame_gen_fin(&fg_ctx)) 1684 { 1685 switch (stream_write_to_packet(&fg_ctx, size)) 1686 { 1687 case SWTP_OK: 1688 if (!seen_ok++) 1689 maybe_conn_to_tickable_if_writeable(stream, 0); 1690 if (frame_gen_fin(&fg_ctx)) 1691 { 1692 stream->stream_flags |= STREAM_FIN_SENT; 1693 goto end; 1694 } 1695 else 1696 break; 1697 case SWTP_STOP: 1698 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1699 goto end; 1700 default: 1701 abort_connection(stream); 1702 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 1703 return -1; 1704 } 1705 } 1706 1707 if (thresh) 1708 { 1709 assert(size < thresh); 1710 assert(size >= stream->sm_n_buffered); 1711 size -= stream->sm_n_buffered; 1712 if (size > 0) 1713 { 1714 nw = save_to_buffer(stream, reader, size); 1715 if (nw < 0) 1716 return -1; 1717 fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */ 1718 } 1719 } 1720 else 1721 { 1722 /* We count flushed data towards both stream and connection limits, 1723 * so we should have been able to packetize all of it: 1724 */ 1725 assert(0 == stream->sm_n_buffered); 1726 assert(size == 0); 1727 } 1728 1729 maybe_mark_as_blocked(stream); 1730 1731 end: 1732 return fg_ctx.fgc_nread_from_reader; 1733} 1734 1735 1736/* Perform an implicit flush when we hit connection limit while buffering 1737 * data. This is to prevent a (theoretical) stall: 1738 * 1739 * Imagine a number of streams, all of which buffered some data. The buffered 1740 * data is up to connection cap, which means no further writes are possible. 1741 * None of them flushes, which means that data is not sent and connection 1742 * WINDOW_UPDATE frame never arrives from peer. Stall. 1743 */ 1744static int 1745maybe_flush_stream (struct lsquic_stream *stream) 1746{ 1747 if (stream->sm_n_buffered > 0 1748 && (stream->stream_flags & STREAM_CONN_LIMITED) 1749 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 1750 return stream_flush_nocheck(stream); 1751 else 1752 return 0; 1753} 1754 1755 1756static ssize_t 1757save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 1758 size_t len) 1759{ 1760 size_t avail, n_written; 1761 1762 assert(stream->sm_n_buffered + len <= SM_BUF_SIZE); 1763 1764 if (!stream->sm_buf) 1765 { 1766 stream->sm_buf = malloc(SM_BUF_SIZE); 1767 if (!stream->sm_buf) 1768 return -1; 1769 } 1770 1771 avail = lsquic_stream_write_avail(stream); 1772 if (avail < len) 1773 len = avail; 1774 1775 n_written = reader->lsqr_read(reader->lsqr_ctx, 1776 stream->sm_buf + stream->sm_n_buffered, len); 1777 stream->sm_n_buffered += n_written; 1778 incr_conn_cap(stream, n_written); 1779 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 1780 n_written, stream->sm_n_buffered); 1781 if (0 != maybe_flush_stream(stream)) 1782 return -1; 1783 return n_written; 1784} 1785 1786 1787static ssize_t 1788stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 1789{ 1790 size_t thresh, len; 1791 1792 thresh = lsquic_stream_flush_threshold(stream); 1793 len = reader->lsqr_size(reader->lsqr_ctx); 1794 if (stream->sm_n_buffered + len <= SM_BUF_SIZE && 1795 stream->sm_n_buffered + len < thresh) 1796 return save_to_buffer(stream, reader, len); 1797 else 1798 return stream_write_to_packets(stream, reader, thresh); 1799} 1800 1801 1802ssize_t 1803lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 1804{ 1805 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 1806 return lsquic_stream_writev(stream, &iov, 1); 1807} 1808 1809 1810struct inner_reader_iovec { 1811 const struct iovec *iov; 1812 const struct iovec *end; 1813 unsigned cur_iovec_off; 1814}; 1815 1816 1817static size_t 1818inner_reader_iovec_read (void *ctx, void *buf, size_t count) 1819{ 1820 struct inner_reader_iovec *const iro = ctx; 1821 unsigned char *p = buf; 1822 unsigned char *const end = p + count; 1823 unsigned n_tocopy; 1824 1825 while (iro->iov < iro->end && p < end) 1826 { 1827 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 1828 if (n_tocopy > (unsigned) (end - p)) 1829 n_tocopy = end - p; 1830 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 1831 n_tocopy); 1832 p += n_tocopy; 1833 iro->cur_iovec_off += n_tocopy; 1834 if (iro->iov->iov_len == iro->cur_iovec_off) 1835 { 1836 ++iro->iov; 1837 iro->cur_iovec_off = 0; 1838 } 1839 } 1840 1841 return p + count - end; 1842} 1843 1844 1845static size_t 1846inner_reader_iovec_size (void *ctx) 1847{ 1848 struct inner_reader_iovec *const iro = ctx; 1849 const struct iovec *iov; 1850 size_t size; 1851 1852 size = 0; 1853 for (iov = iro->iov; iov < iro->end; ++iov) 1854 size += iov->iov_len; 1855 1856 return size - iro->cur_iovec_off; 1857} 1858 1859 1860ssize_t 1861lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 1862 int iovcnt) 1863{ 1864 COMMON_WRITE_CHECKS(); 1865 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1866 1867 struct inner_reader_iovec iro = { 1868 .iov = iov, 1869 .end = iov + iovcnt, 1870 .cur_iovec_off = 0, 1871 }; 1872 struct lsquic_reader reader = { 1873 .lsqr_read = inner_reader_iovec_read, 1874 .lsqr_size = inner_reader_iovec_size, 1875 .lsqr_ctx = &iro, 1876 }; 1877 1878 return stream_write(stream, &reader); 1879} 1880 1881 1882ssize_t 1883lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) 1884{ 1885 COMMON_WRITE_CHECKS(); 1886 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1887 return stream_write(stream, reader); 1888} 1889 1890 1891int 1892lsquic_stream_send_headers (lsquic_stream_t *stream, 1893 const lsquic_http_headers_t *headers, int eos) 1894{ 1895 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT| 1896 STREAM_U_WRITE_DONE)) 1897 == STREAM_USE_HEADERS) 1898 { 1899 int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs, 1900 stream->id, headers, eos, lsquic_stream_priority(stream)); 1901 if (0 == s) 1902 { 1903 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 1904 stream->stream_flags |= STREAM_HEADERS_SENT; 1905 if (eos) 1906 stream->stream_flags |= STREAM_FIN_SENT; 1907 LSQ_INFO("sent headers for stream %u", stream->id); 1908 } 1909 else 1910 LSQ_WARN("could not send headers: %s", strerror(errno)); 1911 return s; 1912 } 1913 else 1914 { 1915 LSQ_INFO("cannot send headers for stream %u in this state", stream->id); 1916 errno = EBADMSG; 1917 return -1; 1918 } 1919} 1920 1921 1922void 1923lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 1924{ 1925 if (offset > stream->max_send_off) 1926 { 1927 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 1928 LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to " 1929 "0x%"PRIX64, stream->id, stream->max_send_off, offset); 1930 stream->max_send_off = offset; 1931 } 1932 else 1933 LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old " 1934 "max send offset 0x%"PRIX64", ignoring", stream->id, offset, 1935 stream->max_send_off); 1936} 1937 1938 1939/* This function is used to update offsets after handshake completes and we 1940 * learn of peer's limits from the handshake values. 1941 */ 1942int 1943lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset) 1944{ 1945 LSQ_DEBUG("setting max_send_off to %u", offset); 1946 if (offset > stream->max_send_off) 1947 { 1948 lsquic_stream_window_update(stream, offset); 1949 return 0; 1950 } 1951 else if (offset < stream->tosend_off) 1952 { 1953 LSQ_INFO("new offset (%u bytes) is smaller than the amount of data " 1954 "already sent on this stream (%"PRIu64" bytes)", offset, 1955 stream->tosend_off); 1956 return -1; 1957 } 1958 else 1959 { 1960 stream->max_send_off = offset; 1961 return 0; 1962 } 1963} 1964 1965 1966void 1967lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code) 1968{ 1969 lsquic_stream_reset_ext(stream, error_code, 1); 1970} 1971 1972 1973void 1974lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code, 1975 int do_close) 1976{ 1977 if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT)) 1978 { 1979 LSQ_INFO("reset already sent"); 1980 return; 1981 } 1982 1983 SM_HISTORY_APPEND(stream, SHE_RESET); 1984 1985 LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code); 1986 stream->error_code = error_code; 1987 1988 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1989 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1990 next_send_stream); 1991 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 1992 stream->stream_flags |= STREAM_SEND_RST; 1993 1994 drop_buffered_data(stream); 1995 maybe_elide_stream_frames(stream); 1996 maybe_schedule_call_on_close(stream); 1997 1998 if (do_close) 1999 lsquic_stream_close(stream); 2000 else 2001 maybe_conn_to_tickable_if_writeable(stream, 1); 2002} 2003 2004 2005unsigned 2006lsquic_stream_id (const lsquic_stream_t *stream) 2007{ 2008 return stream->id; 2009} 2010 2011 2012struct lsquic_conn * 2013lsquic_stream_conn (const lsquic_stream_t *stream) 2014{ 2015 return stream->conn_pub->lconn; 2016} 2017 2018 2019int 2020lsquic_stream_close (lsquic_stream_t *stream) 2021{ 2022 LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id); 2023 SM_HISTORY_APPEND(stream, SHE_CLOSE); 2024 if (lsquic_stream_is_closed(stream)) 2025 { 2026 LSQ_INFO("Attempt to close an already-closed stream %u", stream->id); 2027 errno = EBADF; 2028 return -1; 2029 } 2030 stream_shutdown_write(stream); 2031 stream_shutdown_read(stream); 2032 maybe_schedule_call_on_close(stream); 2033 maybe_finish_stream(stream); 2034 maybe_conn_to_tickable_if_writeable(stream, 1); 2035 return 0; 2036} 2037 2038 2039#ifndef NDEBUG 2040#if __GNUC__ 2041__attribute__((weak)) 2042#endif 2043#endif 2044void 2045lsquic_stream_acked (lsquic_stream_t *stream) 2046{ 2047 assert(stream->n_unacked); 2048 --stream->n_unacked; 2049 LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked); 2050 if (0 == stream->n_unacked) 2051 maybe_finish_stream(stream); 2052} 2053 2054 2055void 2056lsquic_stream_push_req (lsquic_stream_t *stream, 2057 struct uncompressed_headers *push_req) 2058{ 2059 assert(!stream->push_req); 2060 stream->push_req = push_req; 2061 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 2062} 2063 2064 2065int 2066lsquic_stream_is_pushed (const lsquic_stream_t *stream) 2067{ 2068 return 1 & ~stream->id; 2069} 2070 2071 2072int 2073lsquic_stream_push_info (const lsquic_stream_t *stream, 2074 uint32_t *ref_stream_id, void **hset) 2075{ 2076 if (lsquic_stream_is_pushed(stream)) 2077 { 2078 assert(stream->push_req); 2079 *ref_stream_id = stream->push_req->uh_stream_id; 2080 *hset = stream->push_req->uh_hset; 2081 return 0; 2082 } 2083 else 2084 return -1; 2085} 2086 2087 2088int 2089lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 2090{ 2091 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS) 2092 { 2093 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 2094 LSQ_DEBUG("received uncompressed headers for stream %u", stream->id); 2095 stream->stream_flags |= STREAM_HAVE_UH; 2096 if (uh->uh_flags & UH_FIN) 2097 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 2098 stream->uh = uh; 2099 if (uh->uh_oth_stream_id == 0) 2100 { 2101 if (uh->uh_weight) 2102 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 2103 } 2104 else 2105 LSQ_NOTICE("don't know how to depend on stream %u", 2106 uh->uh_oth_stream_id); 2107 return 0; 2108 } 2109 else 2110 { 2111 LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id); 2112 return -1; 2113 } 2114} 2115 2116 2117unsigned 2118lsquic_stream_priority (const lsquic_stream_t *stream) 2119{ 2120 return 256 - stream->sm_priority; 2121} 2122 2123 2124int 2125lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 2126{ 2127 /* The user should never get a reference to the special streams, 2128 * but let's check just in case: 2129 */ 2130 if (LSQUIC_STREAM_HANDSHAKE == stream->id 2131 || ((stream->stream_flags & STREAM_USE_HEADERS) && 2132 LSQUIC_STREAM_HEADERS == stream->id)) 2133 return -1; 2134 if (priority < 1 || priority > 256) 2135 return -1; 2136 stream->sm_priority = 256 - priority; 2137 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 2138 LSQ_DEBUG("set priority to %u", priority); 2139 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 2140 return 0; 2141} 2142 2143 2144int 2145lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 2146{ 2147 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 2148 { 2149 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) == 2150 (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) 2151 { 2152 /* We need to send headers only if we are a) using HEADERS stream 2153 * and b) we already sent initial headers. If initial headers 2154 * have not been sent yet, stream priority will be sent in the 2155 * HEADERS frame. 2156 */ 2157 return lsquic_headers_stream_send_priority(stream->conn_pub->hs, 2158 stream->id, 0, 0, priority); 2159 } 2160 else 2161 return 0; 2162 } 2163 else 2164 return -1; 2165} 2166 2167 2168lsquic_stream_ctx_t * 2169lsquic_stream_get_ctx (const lsquic_stream_t *stream) 2170{ 2171 return stream->st_ctx; 2172} 2173 2174 2175int 2176lsquic_stream_refuse_push (lsquic_stream_t *stream) 2177{ 2178 if (lsquic_stream_is_pushed(stream) && 2179 !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST))) 2180 { 2181 LSQ_DEBUG("refusing pushed stream: send reset"); 2182 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 2183 return 0; 2184 } 2185 else 2186 return -1; 2187} 2188 2189 2190size_t 2191lsquic_stream_mem_used (const struct lsquic_stream *stream) 2192{ 2193 size_t size; 2194 2195 size = sizeof(stream); 2196 if (stream->sm_buf) 2197 size += SM_BUF_SIZE; 2198 if (stream->data_in) 2199 size += stream->data_in->di_if->di_mem_used(stream->data_in); 2200 2201 return size; 2202} 2203 2204 2205lsquic_cid_t 2206lsquic_stream_cid (const struct lsquic_stream *stream) 2207{ 2208 return LSQUIC_LOG_CONN_ID; 2209} 2210 2211 2212void * 2213lsquic_stream_get_hset (struct lsquic_stream *stream) 2214{ 2215 void *hset; 2216 2217 if (stream->stream_flags & STREAM_RST_FLAGS) 2218 { 2219 LSQ_INFO("%s: stream is reset, no headers returned", __func__); 2220 errno = ECONNRESET; 2221 return NULL; 2222 } 2223 2224 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 2225 != (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 2226 { 2227 LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__, 2228 stream->stream_flags); 2229 return NULL; 2230 } 2231 2232 if (!stream->uh) 2233 { 2234 LSQ_INFO("%s: headers unavailable (already fetched?)", __func__); 2235 return NULL; 2236 } 2237 2238 if (stream->uh->uh_flags & UH_H1H) 2239 { 2240 LSQ_INFO("%s: uncompressed headers have internal format", __func__); 2241 return NULL; 2242 } 2243 2244 hset = stream->uh->uh_hset; 2245 stream->uh->uh_hset = NULL; 2246 destroy_uh(stream); 2247 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 2248 { 2249 stream->stream_flags |= STREAM_FIN_REACHED; 2250 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 2251 } 2252 LSQ_DEBUG("return header set"); 2253 return hset; 2254} 2255