lsquic_stream.c revision 0ae3fccd
1/* Copyright (c) 2017 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <sys/types.h> 28#include <sys/stat.h> 29#include <fcntl.h> 30#include <unistd.h> 31#include <stddef.h> 32 33#include "lsquic.h" 34 35#include "lsquic_int_types.h" 36#include "lsquic_packet_common.h" 37#include "lsquic_packet_in.h" 38#include "lsquic_malo.h" 39#include "lsquic_conn_flow.h" 40#include "lsquic_rtt.h" 41#include "lsquic_sfcw.h" 42#include "lsquic_stream.h" 43#include "lsquic_conn_public.h" 44#include "lsquic_util.h" 45#include "lsquic_mm.h" 46#include "lsquic_headers_stream.h" 47#include "lsquic_frame_reader.h" 48#include "lsquic_conn.h" 49#include "lsquic_data_in_if.h" 50#include "lsquic_parse.h" 51#include "lsquic_packet_out.h" 52#include "lsquic_engine_public.h" 53#include "lsquic_senhist.h" 54#include "lsquic_pacer.h" 55#include "lsquic_cubic.h" 56#include "lsquic_send_ctl.h" 57#include "lsquic_ev_log.h" 58 59#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 60#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid 61#define LSQUIC_LOG_STREAM_ID stream->id 62#include "lsquic_logger.h" 63 64enum sbt_type { 65 SBT_BUF, 66 SBT_FILE, 67}; 68 69struct stream_buf_tosend 70{ 71 TAILQ_ENTRY(stream_buf_tosend) next_sbt; 72 enum sbt_type sbt_type; 73 /* On 64-bit platform, here is a four-byte hole */ 74 union { 75 struct { 76 size_t sbt_sz; 77 size_t sbt_off; 78 unsigned char sbt_data[ 79 0x1000 - sizeof(enum sbt_type) - sizeof(long) + 4 - sizeof(size_t) * 2 80 - sizeof(TAILQ_ENTRY(stream_buf_tosend)) 81 ]; 82 } buf; 83 struct { 84 struct lsquic_stream *sbt_stream; 85 off_t sbt_sz; 86 off_t sbt_off; 87 int sbt_fd; 88 signed char sbt_last; 89 } file; 90 } u; 91}; 92 93typedef char _sbt_is_4K[(sizeof(struct stream_buf_tosend) == 0x1000) - 1]; 94 95static size_t 96sum_sbts (const lsquic_stream_t *stream); 97 98static void 99drop_sbts (lsquic_stream_t *stream); 100 101static void 102drop_frames_in (lsquic_stream_t *stream); 103 104static void 105maybe_schedule_call_on_close (lsquic_stream_t *stream); 106 107static void 108stream_file_on_write (lsquic_stream_t *, struct lsquic_stream_ctx *); 109 110static void 111stream_flush_on_write (lsquic_stream_t *, struct lsquic_stream_ctx *); 112 113static int 114stream_wantread (lsquic_stream_t *stream, int is_want); 115 116static int 117stream_wantwrite (lsquic_stream_t *stream, int is_want); 118 119static void 120stop_reading_from_file (lsquic_stream_t *stream); 121 122static int 123stream_readable (const lsquic_stream_t *stream); 124 125static int 126stream_writeable (const lsquic_stream_t *stream); 127 128static size_t 129stream_flush_internal (lsquic_stream_t *stream, size_t size); 130 131static void 132incr_tosend_sz (lsquic_stream_t *stream, uint64_t incr); 133 134static void 135maybe_put_on_sending_streams (lsquic_stream_t *stream); 136 137 138#if LSQUIC_KEEP_STREAM_HISTORY 139/* These values are printable ASCII characters for ease of printing the 140 * whole history in a single line of a log message. 141 * 142 * The list of events is not exhaustive: only most interesting events 143 * are recorded. 144 */ 145enum stream_history_event 146{ 147 SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ 148 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 149 SHE_REACH_FIN = 'a', 150 SHE_BLOCKED_OUT = 'b', 151 SHE_CREATED = 'C', 152 SHE_FRAME_IN = 'd', 153 SHE_FRAME_OUT = 'D', 154 SHE_RESET = 'e', 155 SHE_WINDOW_UPDATE = 'E', 156 SHE_FIN_IN = 'f', 157 SHE_FINISHED = 'F', 158 SHE_GOAWAY_IN = 'g', 159 SHE_USER_WRITE_HEADER = 'h', 160 SHE_HEADERS_IN = 'H', 161 SHE_ONCLOSE_SCHED = 'l', 162 SHE_ONCLOSE_CALL = 'L', 163 SHE_ONNEW = 'N', 164 SHE_SET_PRIO = 'p', 165 SHE_USER_READ = 'r', 166 SHE_SHUTDOWN_READ = 'R', 167 SHE_RST_IN = 's', 168 SHE_RST_OUT = 't', 169 SHE_FLUSH = 'u', 170 SHE_USER_WRITE_DATA = 'w', 171 SHE_SHUTDOWN_WRITE = 'W', 172 SHE_CLOSE = 'X', 173 SHE_FORCE_FINISH = 'Z', 174}; 175 176static void 177sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 178{ 179 enum stream_history_event prev_event; 180 sm_hist_idx_t idx; 181 int plus; 182 183 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 184 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 185 idx = (idx - plus) & SM_HIST_IDX_MASK; 186 prev_event = stream->sm_hist_buf[idx]; 187 188 if (prev_event == sh_event && plus) 189 return; 190 191 if (prev_event == sh_event) 192 sh_event = SHE_PLUS; 193 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 194 195 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 196 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 197 stream->sm_hist_buf); 198} 199 200# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 201# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 202 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 203 LSQ_DEBUG("history: [%.*s]", \ 204 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 205 (stream)->sm_hist_buf); \ 206 } while (0) 207#else 208# define SM_HISTORY_APPEND(stream, event) 209# define SM_HISTORY_DUMP_REMAINING(stream) 210#endif 211 212 213/* Here, "readable" means that the user is able to read from the stream. */ 214static void 215maybe_conn_to_pendrw_if_readable (lsquic_stream_t *stream, 216 enum rw_reason reason) 217{ 218 if (!(stream->conn_pub->enpub->enp_flags & ENPUB_PROC) && 219 stream_readable(stream)) 220 { 221 lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub, 222 stream->conn_pub->lconn, reason); 223 } 224} 225 226 227/* Here, "writeable" means that data can be put into packets to be 228 * scheduled to be sent out. 229 */ 230static void 231maybe_conn_to_pendrw_if_writeable (lsquic_stream_t *stream, 232 enum rw_reason reason) 233{ 234 if (!(stream->conn_pub->enpub->enp_flags & ENPUB_PROC) && 235 lsquic_send_ctl_can_send(stream->conn_pub->send_ctl) && 236 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 237 { 238 lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub, 239 stream->conn_pub->lconn, reason); 240 } 241} 242 243 244static int 245stream_stalled (const lsquic_stream_t *stream) 246{ 247 return 0 == (stream->stream_flags & STREAM_RW_EVENT_FLAGS) && 248 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 249 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 250} 251 252 253static void 254use_user_on_write (lsquic_stream_t *stream) 255{ 256 LSQ_DEBUG("stream %u: use user-supplied on-write callback", stream->id); 257 stream->on_write_cb = stream->stream_if->on_write; 258} 259 260 261static void 262use_internal_on_write_file (lsquic_stream_t *stream) 263{ 264 LSQ_DEBUG("use internal on-write callback (file)"); 265 stream->on_write_cb = stream_file_on_write; 266} 267 268 269static void 270use_internal_on_write_flush (lsquic_stream_t *stream) 271{ 272 LSQ_DEBUG("use internal on-write callback (flush)"); 273 stream->on_write_cb = stream_flush_on_write; 274} 275 276 277#define writing_file(stream) ((stream)->file_fd >= 0) 278 279 280/* TODO: The logic to figure out whether the stream is connection limited 281 * should be taken out of the constructor. The caller should specify this 282 * via one of enum stream_ctor_flags. 283 */ 284lsquic_stream_t * 285lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub, 286 const struct lsquic_stream_if *stream_if, 287 void *stream_if_ctx, unsigned initial_window, 288 unsigned initial_send_off, 289 enum stream_ctor_flags ctor_flags) 290{ 291 lsquic_cfcw_t *cfcw; 292 lsquic_stream_t *stream; 293 294 stream = calloc(1, sizeof(*stream)); 295 if (!stream) 296 return NULL; 297 298 stream->stream_if = stream_if; 299 stream->id = id; 300 stream->file_fd = -1; 301 stream->conn_pub = conn_pub; 302 if (!initial_window) 303 initial_window = 16 * 1024; 304 if (LSQUIC_STREAM_HANDSHAKE == id || 305 (conn_pub->hs && LSQUIC_STREAM_HEADERS == id)) 306 cfcw = NULL; 307 else 308 { 309 cfcw = &conn_pub->cfcw; 310 stream->stream_flags |= STREAM_CONN_LIMITED; 311 if (conn_pub->hs) 312 stream->stream_flags |= STREAM_USE_HEADERS; 313 lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO); 314 } 315 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 316 if (!initial_send_off) 317 initial_send_off = 16 * 1024; 318 stream->max_send_off = initial_send_off; 319 TAILQ_INIT(&stream->bufs_tosend); 320 if (ctor_flags & SCF_USE_DI_HASH) 321 stream->data_in = data_in_hash_new(conn_pub, id, 0); 322 else 323 stream->data_in = data_in_nocopy_new(conn_pub, id); 324 LSQ_DEBUG("created stream %u", id); 325 SM_HISTORY_APPEND(stream, SHE_CREATED); 326 if (ctor_flags & SCF_DI_AUTOSWITCH) 327 stream->stream_flags |= STREAM_AUTOSWITCH; 328 if (ctor_flags & SCF_CALL_ON_NEW) 329 lsquic_stream_call_on_new(stream, stream_if_ctx); 330 if (ctor_flags & SCF_DISP_RW_ONCE) 331 stream->stream_flags |= STREAM_RW_ONCE; 332 use_user_on_write(stream); 333 return stream; 334} 335 336 337void 338lsquic_stream_call_on_new (lsquic_stream_t *stream, void *stream_if_ctx) 339{ 340 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 341 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 342 { 343 LSQ_DEBUG("calling on_new_stream"); 344 SM_HISTORY_APPEND(stream, SHE_ONNEW); 345 stream->stream_flags |= STREAM_ONNEW_DONE; 346 stream->st_ctx = stream->stream_if->on_new_stream(stream_if_ctx, stream); 347 } 348} 349 350 351void 352lsquic_stream_destroy (lsquic_stream_t *stream) 353{ 354 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 355 STREAM_ONNEW_DONE) 356 { 357 stream->stream_flags |= STREAM_ONCLOSE_DONE; 358 stream->stream_if->on_close(stream, stream->st_ctx); 359 } 360 if (stream->file_fd >= 0) 361 (void) close(stream->file_fd); 362 if (stream->stream_flags & STREAM_SENDING_FLAGS) 363 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 364 if (stream->stream_flags & STREAM_RW_EVENT_FLAGS) 365 TAILQ_REMOVE(&stream->conn_pub->rw_streams, stream, next_rw_stream); 366 if (stream->stream_flags & STREAM_SERVICE_FLAGS) 367 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 368 lsquic_sfcw_consume_rem(&stream->fc); 369 drop_frames_in(stream); 370 drop_sbts(stream); 371 free(stream->push_req); 372 free(stream->uh); 373 LSQ_DEBUG("destroyed stream %u", stream->id); 374 SM_HISTORY_DUMP_REMAINING(stream); 375 free(stream); 376} 377 378 379static int 380stream_is_finished (const lsquic_stream_t *stream) 381{ 382 return lsquic_stream_is_closed(stream) 383 /* n_unacked checks that no outgoing packets that reference this 384 * stream are outstanding: 385 */ 386 && 0 == stream->n_unacked 387 /* This checks that no packets that reference this stream will 388 * become outstanding: 389 */ 390 && 0 == (stream->stream_flags & (STREAM_SEND_DATA|STREAM_SEND_RST)) 391 && ((stream->stream_flags & STREAM_FORCE_FINISH) 392 || (((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)) 393 || lsquic_stream_is_pushed(stream)) 394 && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD)))); 395} 396 397 398static void 399maybe_finish_stream (lsquic_stream_t *stream) 400{ 401 if (0 == (stream->stream_flags & STREAM_FINISHED) && 402 stream_is_finished(stream)) 403 { 404 LSQ_DEBUG("stream %u is now finished", stream->id); 405 SM_HISTORY_APPEND(stream, SHE_FINISHED); 406 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 407 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 408 next_service_stream); 409 stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED; 410 } 411} 412 413 414static void 415maybe_schedule_call_on_close (lsquic_stream_t *stream) 416{ 417 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 418 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE)) 419 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)) 420 { 421 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 422 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 423 next_service_stream); 424 stream->stream_flags |= STREAM_CALL_ONCLOSE; 425 LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id); 426 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 427 } 428} 429 430 431void 432lsquic_stream_call_on_close (lsquic_stream_t *stream) 433{ 434 assert(stream->stream_flags & STREAM_ONNEW_DONE); 435 stream->stream_flags &= ~STREAM_CALL_ONCLOSE; 436 if (!(stream->stream_flags & STREAM_SERVICE_FLAGS)) 437 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 438 next_service_stream); 439 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 440 { 441 LSQ_DEBUG("calling on_close for stream %u", stream->id); 442 stream->stream_flags |= STREAM_ONCLOSE_DONE; 443 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 444 stream->stream_if->on_close(stream, stream->st_ctx); 445 } 446 else 447 assert(0); 448} 449 450 451static int 452stream_readable (const lsquic_stream_t *stream) 453{ 454 /* A stream is readable if one of the following is true: */ 455 return 456 /* - It is already finished: in that case, lsquic_stream_read() will 457 * return 0. 458 */ 459 (stream->stream_flags & STREAM_FIN_REACHED) 460 /* - The stream is reset, by either side. In this case, 461 * lsquic_stream_read() will return -1 (we want the user to be 462 * able to collect the error). 463 */ 464 || (stream->stream_flags & STREAM_RST_FLAGS) 465 /* - Either we are not in HTTP mode or the HTTP headers have been 466 * received and the headers or data from the stream can be read. 467 */ 468 || (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 469 == STREAM_USE_HEADERS) 470 && (stream->uh != NULL 471 || stream->data_in->di_if->di_get_frame(stream->data_in, 472 stream->read_offset))) 473 ; 474} 475 476 477size_t 478lsquic_stream_write_avail (const lsquic_stream_t *stream) 479{ 480 uint64_t stream_avail, conn_avail; 481 size_t unflushed_sum; 482 483 assert(stream->tosend_off + stream->tosend_sz <= stream->max_send_off); 484 stream_avail = stream->max_send_off - stream->tosend_off 485 - stream->tosend_sz; 486 if (0 == stream->tosend_sz) 487 { 488 unflushed_sum = sum_sbts(stream); 489 if (unflushed_sum > stream_avail) 490 stream_avail = 0; 491 else 492 stream_avail -= unflushed_sum; 493 } 494 495 if (stream->stream_flags & STREAM_CONN_LIMITED) 496 { 497 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 498 if (conn_avail < stream_avail) 499 { 500 LSQ_DEBUG("stream %u write buffer is limited by connection: " 501 "%"PRIu64, stream->id, conn_avail); 502 return conn_avail; 503 } 504 } 505 506 LSQ_DEBUG("stream %u write buffer is limited by stream: %"PRIu64, 507 stream->id, stream_avail); 508 return stream_avail; 509} 510 511 512static int 513stream_writeable (const lsquic_stream_t *stream) 514{ 515 /* A stream is writeable if one of the following is true: */ 516 return 517 /* - The stream is reset, by either side. In this case, 518 * lsquic_stream_write() will return -1 (we want the user to be 519 * able to collect the error). 520 */ 521 (stream->stream_flags & STREAM_RST_FLAGS) 522 /* - There is room to write to the stream. 523 */ 524 || lsquic_stream_write_avail(stream) > 0 525 ; 526} 527 528 529int 530lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 531{ 532 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 533 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 534 { 535 return -1; 536 } 537 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 538 { 539 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 540 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 541 next_send_stream); 542 stream->stream_flags |= STREAM_SEND_WUF; 543 } 544 return 0; 545} 546 547 548int 549lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 550{ 551 uint64_t max_off; 552 int got_next_offset; 553 enum ins_frame ins_frame; 554 555 assert(frame->packet_in); 556 557 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 558 LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; " 559 "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 560 561 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) == 562 (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) 563 { 564 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 565 lsquic_malo_put(frame); 566 return -1; 567 } 568 569 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 570 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 571 if (INS_FRAME_OK == ins_frame) 572 { 573 /* Update maximum offset in the flow controller and check for flow 574 * control violation: 575 */ 576 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 577 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 578 return -1; 579 if (frame->data_frame.df_fin) 580 { 581 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 582 stream->stream_flags |= STREAM_FIN_RECVD; 583 maybe_finish_stream(stream); 584 } 585 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 586 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 587 { 588 stream->data_in = stream->data_in->di_if->di_switch_impl( 589 stream->data_in, stream->read_offset); 590 if (!stream->data_in) 591 { 592 stream->data_in = data_in_error_new(); 593 return -1; 594 } 595 } 596 if (got_next_offset) 597 /* Checking the offset saves di_get_frame() call */ 598 maybe_conn_to_pendrw_if_readable(stream, RW_REASON_STREAM_IN); 599 return 0; 600 } 601 else if (INS_FRAME_DUP == ins_frame) 602 { 603 return 0; 604 } 605 else 606 { 607 assert(INS_FRAME_ERR == ins_frame); 608 return -1; 609 } 610} 611 612 613static void 614drop_frames_in (lsquic_stream_t *stream) 615{ 616 if (stream->data_in) 617 { 618 stream->data_in->di_if->di_destroy(stream->data_in); 619 /* To avoid checking whether `data_in` is set, just set to the error 620 * data-in stream. It does the right thing after incoming data is 621 * dropped. 622 */ 623 stream->data_in = data_in_error_new(); 624 } 625} 626 627 628int 629lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 630 uint32_t error_code) 631{ 632 633 if (stream->stream_flags & STREAM_RST_RECVD) 634 { 635 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 636 return 0; 637 } 638 639 SM_HISTORY_APPEND(stream, SHE_RST_IN); 640 /* This flag must always be set, even if we are "ignoring" it: it is 641 * used by elision code. 642 */ 643 stream->stream_flags |= STREAM_RST_RECVD; 644 645 if ((stream->stream_flags & STREAM_FIN_RECVD) && 646 /* Pushed streams have fake STREAM_FIN_RECVD set, thus 647 * we need a special check: 648 */ 649 !lsquic_stream_is_pushed(stream)) 650 { 651 LSQ_DEBUG("ignore RST_STREAM frame after FIN is received"); 652 return 0; 653 } 654 655 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 656 { 657 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is " 658 "smaller than that of byte following the last byte we have seen: " 659 "0x%"PRIX64, stream->id, offset, 660 lsquic_sfcw_get_max_recv_off(&stream->fc)); 661 return -1; 662 } 663 664 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 665 { 666 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64 667 " violates flow control", stream->id, offset); 668 return -1; 669 } 670 671 /* Let user collect error: */ 672 maybe_conn_to_pendrw_if_readable(stream, RW_REASON_RST_IN); 673 674 lsquic_sfcw_consume_rem(&stream->fc); 675 drop_frames_in(stream); 676 677 if (!(stream->stream_flags & 678 (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT))) 679 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 680 681 stream->stream_flags |= STREAM_RST_RECVD; 682 683 maybe_finish_stream(stream); 684 maybe_schedule_call_on_close(stream); 685 686 return 0; 687} 688 689 690uint64_t 691lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 692{ 693 assert(stream->stream_flags & STREAM_SEND_WUF); 694 stream->stream_flags &= ~STREAM_SEND_WUF; 695 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 696 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 697 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 698} 699 700 701void 702lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 703{ 704 assert(stream->stream_flags & STREAM_SEND_BLOCKED); 705 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 706 stream->stream_flags &= ~STREAM_SEND_BLOCKED; 707 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 708 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 709} 710 711 712void 713lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 714{ 715 assert(stream->stream_flags & STREAM_SEND_RST); 716 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 717 stream->stream_flags &= ~STREAM_SEND_RST; 718 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 719 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 720 stream->stream_flags |= STREAM_RST_SENT; 721 maybe_finish_stream(stream); 722} 723 724 725static size_t 726read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len) 727{ 728 struct uncompressed_headers *uh = stream->uh; 729 size_t n_avail = uh->uh_size - uh->uh_off; 730 if (n_avail < len) 731 len = n_avail; 732 memcpy(dst, uh->uh_headers + uh->uh_off, len); 733 uh->uh_off += len; 734 if (uh->uh_off == uh->uh_size) 735 { 736 LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id); 737 free(uh); 738 stream->uh = NULL; 739 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 740 { 741 stream->stream_flags |= STREAM_FIN_REACHED; 742 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 743 } 744 } 745 return len; 746} 747 748 749/* This function returns 0 when EOF is reached. 750 */ 751ssize_t 752lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov, 753 int iovcnt) 754{ 755 size_t total_nread, nread; 756 int processed_frames, read_unc_headers, iovidx; 757 unsigned char *p, *end; 758 759 SM_HISTORY_APPEND(stream, SHE_USER_READ); 760 761#define NEXT_IOV() do { \ 762 ++iovidx; \ 763 while (iovidx < iovcnt && 0 == iov[iovidx].iov_len) \ 764 ++iovidx; \ 765 if (iovidx < iovcnt) \ 766 { \ 767 p = iov[iovidx].iov_base; \ 768 end = p + iov[iovidx].iov_len; \ 769 } \ 770 else \ 771 p = end = NULL; \ 772} while (0) 773 774#define AVAIL() (end - p) 775 776 if (stream->stream_flags & STREAM_RST_FLAGS) 777 { 778 errno = ECONNRESET; 779 return -1; 780 } 781 if (stream->stream_flags & STREAM_U_READ_DONE) 782 { 783 errno = EBADF; 784 return -1; 785 } 786 if (stream->stream_flags & STREAM_FIN_REACHED) 787 return 0; 788 789 total_nread = 0; 790 processed_frames = 0; 791 792 iovidx = -1; 793 NEXT_IOV(); 794 795 if (stream->uh && AVAIL()) 796 { 797 read_unc_headers = 1; 798 do 799 { 800 nread = read_uh(stream, p, AVAIL()); 801 p += nread; 802 total_nread += nread; 803 if (p == end) 804 NEXT_IOV(); 805 } 806 while (stream->uh && AVAIL()); 807 } 808 else 809 read_unc_headers = 0; 810 811 struct data_frame *data_frame; 812 while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset))) 813 { 814 ++processed_frames; 815 size_t navail = data_frame->df_size - data_frame->df_read_off; 816 size_t ntowrite = AVAIL(); 817 if (navail < ntowrite) 818 ntowrite = navail; 819 memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite); 820 p += ntowrite; 821 data_frame->df_read_off += ntowrite; 822 stream->read_offset += ntowrite; 823 total_nread += ntowrite; 824 if (data_frame->df_read_off == data_frame->df_size) 825 { 826 const int fin = data_frame->df_fin; 827 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 828 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 829 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 830 { 831 stream->data_in = stream->data_in->di_if->di_switch_impl( 832 stream->data_in, stream->read_offset); 833 if (!stream->data_in) 834 { 835 stream->data_in = data_in_error_new(); 836 return -1; 837 } 838 } 839 if (fin) 840 { 841 stream->stream_flags |= STREAM_FIN_REACHED; 842 break; 843 } 844 } 845 if (p == end) 846 NEXT_IOV(); 847 } 848 849 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 850 total_nread, stream->read_offset); 851 852 if (processed_frames) 853 { 854 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 855 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 856 { 857 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 858 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 859 stream->stream_flags |= STREAM_SEND_WUF; 860 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_READ); 861 } 862 } 863 864 if (processed_frames || read_unc_headers) 865 { 866 return total_nread; 867 } 868 else 869 { 870 assert(0 == total_nread); 871 errno = EWOULDBLOCK; 872 return -1; 873 } 874} 875 876 877ssize_t 878lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 879{ 880 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 881 return lsquic_stream_readv(stream, &iov, 1); 882} 883 884 885#ifndef NDEBUG 886/* Use weak linkage so that tests can override this function */ 887int 888lsquic_stream_tosend_fin (const lsquic_stream_t *stream) 889 __attribute__((weak)) 890 ; 891#endif 892int 893lsquic_stream_tosend_fin (const lsquic_stream_t *stream) 894{ 895 return (stream->stream_flags & STREAM_U_WRITE_DONE) 896 && !writing_file(stream) 897 && 0 == stream->tosend_sz 898 && 0 == sum_sbts(stream); 899} 900 901 902static void 903stream_shutdown_read (lsquic_stream_t *stream) 904{ 905 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 906 { 907 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 908 stream->stream_flags |= STREAM_U_READ_DONE; 909 stream_wantread(stream, 0); 910 maybe_finish_stream(stream); 911 } 912} 913 914 915static void 916stream_shutdown_write (lsquic_stream_t *stream) 917{ 918 int flushed_all, data_send_ok; 919 920 if (stream->stream_flags & STREAM_U_WRITE_DONE) 921 return; 922 923 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 924 stream->stream_flags |= STREAM_U_WRITE_DONE; 925 926 data_send_ok = !(stream->stream_flags & 927 (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT)); 928 if (!data_send_ok) 929 { 930 stream_wantwrite(stream, 0); 931 return; 932 } 933 934 lsquic_stream_flush(stream); 935 flushed_all = stream->tosend_sz == sum_sbts(stream); 936 if (flushed_all) 937 stream_wantwrite(stream, 0); 938 else 939 { 940 stream_wantwrite(stream, 1); 941 use_internal_on_write_flush(stream); 942 } 943 944 if (flushed_all || stream->tosend_sz > 0) 945 { 946 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 947 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 948 stream->stream_flags |= STREAM_SEND_DATA; 949 } 950} 951 952 953int 954lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 955{ 956 LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how); 957 if (lsquic_stream_is_closed(stream)) 958 { 959 LSQ_INFO("Attempt to shut down a closed stream %u", stream->id); 960 errno = EBADF; 961 return -1; 962 } 963 /* 0: read, 1: write: 2: read and write 964 */ 965 if (how < 0 || how > 2) 966 { 967 errno = EINVAL; 968 return -1; 969 } 970 971 if (how) 972 stream_shutdown_write(stream); 973 if (how != 1) 974 stream_shutdown_read(stream); 975 976 maybe_finish_stream(stream); 977 maybe_schedule_call_on_close(stream); 978 if (how) 979 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_SHUTDOWN); 980 981 return 0; 982} 983 984 985void 986lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 987{ 988 LSQ_DEBUG("internal shutdown of stream %u", stream->id); 989 if (LSQUIC_STREAM_HANDSHAKE == stream->id 990 || ((stream->stream_flags & STREAM_USE_HEADERS) && 991 LSQUIC_STREAM_HEADERS == stream->id)) 992 { 993 LSQ_DEBUG("add flag to force-finish special stream %u", stream->id); 994 stream->stream_flags |= STREAM_FORCE_FINISH; 995 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 996 } 997 maybe_finish_stream(stream); 998 maybe_schedule_call_on_close(stream); 999} 1000 1001 1002static void 1003fake_reset_unused_stream (lsquic_stream_t *stream) 1004{ 1005 stream->stream_flags |= 1006 STREAM_RST_RECVD /* User will pick this up on read or write */ 1007 | STREAM_RST_SENT /* Don't send anything else on this stream */ 1008 ; 1009 1010 /* Cancel all writes to the network scheduled for this stream: */ 1011 if (stream->stream_flags & STREAM_SENDING_FLAGS) 1012 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 1013 next_send_stream); 1014 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 1015 1016 if (writing_file(stream)) 1017 stop_reading_from_file(stream); 1018 1019 LSQ_DEBUG("fake-reset stream %u%s", 1020 stream->id, stream_stalled(stream) ? " (stalled)" : ""); 1021 maybe_finish_stream(stream); 1022 maybe_schedule_call_on_close(stream); 1023} 1024 1025 1026/* This function should only be called for locally-initiated streams whose ID 1027 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 1028 * frame sent by peer but we have not yet received it and created a stream. 1029 * In this situation, we mark the stream as reset, so that user's on_read or 1030 * on_write event callback picks up the error. That, in turn, should result 1031 * in stream being closed. 1032 * 1033 * If we have received any data frames on this stream, this probably indicates 1034 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 1035 * lower than this. However, we still try to handle it gracefully and peform 1036 * a shutdown, as if the stream was not reset. 1037 */ 1038void 1039lsquic_stream_received_goaway (lsquic_stream_t *stream) 1040{ 1041 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 1042 if (0 == stream->read_offset && 1043 stream->data_in->di_if->di_empty(stream->data_in)) 1044 fake_reset_unused_stream(stream); /* Normal condition */ 1045 else 1046 { /* This is odd, let's handle it the best we can: */ 1047 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 1048 lsquic_stream_shutdown_internal(stream); 1049 } 1050} 1051 1052 1053uint64_t 1054lsquic_stream_read_offset (const lsquic_stream_t *stream) 1055{ 1056 return stream->read_offset; 1057} 1058 1059 1060static int 1061stream_want_read_or_write (lsquic_stream_t *stream, int is_want, int flag) 1062{ 1063 const int old_val = !!(stream->stream_flags & flag); 1064 if (old_val != is_want) 1065 { 1066 if (is_want) 1067 { 1068 if (!(stream->stream_flags & STREAM_RW_EVENT_FLAGS)) 1069 TAILQ_INSERT_TAIL(&stream->conn_pub->rw_streams, stream, next_rw_stream); 1070 stream->stream_flags |= flag; 1071 } 1072 else 1073 { 1074 stream->stream_flags &= ~flag; 1075 if (!(stream->stream_flags & STREAM_RW_EVENT_FLAGS)) 1076 TAILQ_REMOVE(&stream->conn_pub->rw_streams, stream, next_rw_stream); 1077 } 1078 } 1079 return old_val; 1080} 1081 1082 1083static int 1084stream_wantread (lsquic_stream_t *stream, int is_want) 1085{ 1086 return stream_want_read_or_write(stream, is_want, STREAM_WANT_READ); 1087} 1088 1089 1090int 1091lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1092{ 1093 if (0 == (stream->stream_flags & STREAM_U_READ_DONE)) 1094 { 1095 if (is_want) 1096 maybe_conn_to_pendrw_if_readable(stream, RW_REASON_WANTREAD); 1097 return stream_wantread(stream, is_want); 1098 } 1099 else 1100 { 1101 errno = EBADF; 1102 return -1; 1103 } 1104} 1105 1106 1107static int 1108stream_wantwrite (lsquic_stream_t *stream, int is_want) 1109{ 1110 if (writing_file(stream)) 1111 { 1112 int old_val = stream->stream_flags & STREAM_SAVED_WANTWR; 1113 stream->stream_flags |= old_val & (0 - !!is_want); 1114 return !!old_val; 1115 } 1116 else 1117 return stream_want_read_or_write(stream, is_want, STREAM_WANT_WRITE); 1118} 1119 1120 1121int 1122lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1123{ 1124 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)) 1125 { 1126 return stream_wantwrite(stream, is_want); 1127 } 1128 else 1129 { 1130 errno = EBADF; 1131 return -1; 1132 } 1133} 1134 1135 1136static void 1137stream_flush_on_write (lsquic_stream_t *stream, 1138 struct lsquic_stream_ctx *stream_ctx) 1139{ 1140 size_t sum, n_flushed; 1141 1142 assert(stream->stream_flags & STREAM_U_WRITE_DONE); 1143 1144 sum = sum_sbts(stream) - stream->tosend_sz; 1145 if (sum == 0) 1146 { /* This can occur if the stream has been written to and closed by 1147 * the user, but a RST_STREAM comes in that drops all data. 1148 */ 1149 LSQ_DEBUG("%s: no more data to send", __func__); 1150 stream_wantwrite(stream, 0); 1151 return; 1152 } 1153 1154 n_flushed = stream_flush_internal(stream, sum); 1155 if (n_flushed == sum) 1156 { 1157 LSQ_DEBUG("Flushed all remaining data (%zd bytes)", n_flushed); 1158 stream_wantwrite(stream, 0); 1159 } 1160 else 1161 LSQ_DEBUG("Flushed %zd out of %zd remaining data", n_flushed, sum); 1162} 1163 1164 1165#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE| \ 1166 STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST) 1167 1168 1169static void 1170stream_dispatch_rw_events_loop (lsquic_stream_t *stream, int *processed) 1171{ 1172 unsigned no_progress_count, no_progress_limit; 1173 enum stream_flags flags; 1174 uint64_t size; 1175 size_t sbt_size; 1176 1177 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1178 *processed = 0; 1179 1180 no_progress_count = 0; 1181 while ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream)) 1182 { 1183 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1184 size = stream->read_offset; 1185 1186 stream->stream_if->on_read(stream, stream->st_ctx); 1187 *processed = 1; 1188 1189 if (no_progress_limit && size == stream->read_offset && 1190 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1191 { 1192 ++no_progress_count; 1193 if (no_progress_count >= no_progress_limit) 1194 { 1195 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1196 "progress) in user code reading from stream", 1197 no_progress_count, 1198 no_progress_count == 1 ? "" : "s"); 1199 break; 1200 } 1201 } 1202 else 1203 no_progress_count = 0; 1204 } 1205 1206 no_progress_count = 0; 1207 while ((stream->stream_flags & STREAM_WANT_WRITE) && stream_writeable(stream)) 1208 { 1209 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1210 size = stream->tosend_sz; 1211 if (0 == size) 1212 sbt_size = sum_sbts(stream); 1213 1214 stream->on_write_cb(stream, stream->st_ctx); 1215 *processed = 1; 1216 1217 if (no_progress_limit && 1218 flags == (stream->stream_flags & USER_PROGRESS_FLAGS) && 1219 (0 == size ? sbt_size == sum_sbts(stream) : 1220 size == stream->tosend_sz)) 1221 { 1222 ++no_progress_count; 1223 if (no_progress_count >= no_progress_limit) 1224 { 1225 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1226 "progress) in user code writing to stream", 1227 no_progress_count, 1228 no_progress_count == 1 ? "" : "s"); 1229 break; 1230 } 1231 } 1232 else 1233 no_progress_count = 0; 1234 } 1235} 1236 1237 1238static void 1239stream_dispatch_rw_events_once (lsquic_stream_t *stream, int *processed) 1240{ 1241 *processed = 0; 1242 1243 if ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream)) 1244 { 1245 stream->stream_if->on_read(stream, stream->st_ctx); 1246 *processed = 1; 1247 } 1248 1249 if ((stream->stream_flags & STREAM_WANT_WRITE) && stream_writeable(stream)) 1250 { 1251 stream->on_write_cb(stream, stream->st_ctx); 1252 *processed = 1; 1253 } 1254} 1255 1256 1257static void 1258maybe_mark_as_blocked (lsquic_stream_t *stream) 1259{ 1260 uint64_t off, stream_data_sz; 1261 1262 if (stream->tosend_sz) 1263 stream_data_sz = stream->tosend_sz; 1264 else 1265 stream_data_sz = sum_sbts(stream); 1266 1267 off = stream->tosend_off + stream_data_sz; 1268 if (off >= stream->max_send_off) 1269 { 1270 assert(off == stream->max_send_off); 1271 if (stream->blocked_off < stream->max_send_off) 1272 { 1273 stream->blocked_off = stream->max_send_off; 1274 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1275 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1276 next_send_stream); 1277 stream->stream_flags |= STREAM_SEND_BLOCKED; 1278 LSQ_DEBUG("marked stream-blocked at stream offset " 1279 "%"PRIu64, stream->blocked_off); 1280 return; 1281 } 1282 } 1283 1284 if (stream->stream_flags & STREAM_CONN_LIMITED) 1285 { 1286 struct lsquic_conn_cap *const cc = &stream->conn_pub->conn_cap; 1287 off = cc->cc_sent + cc->cc_tosend; 1288 if (off >= cc->cc_max) 1289 { 1290 assert(off == cc->cc_max); 1291 if (cc->cc_blocked < cc->cc_max) 1292 { 1293 cc->cc_blocked = cc->cc_max; 1294 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 1295 LSQ_DEBUG("marked connection-blocked at connection offset " 1296 "%"PRIu64, cc->cc_max); 1297 } 1298 } 1299 } 1300} 1301 1302void 1303lsquic_stream_dispatch_rw_events (lsquic_stream_t *stream) 1304{ 1305 int processed; 1306 uint64_t tosend_off; 1307 1308 assert(stream->stream_flags & STREAM_RW_EVENT_FLAGS); 1309 tosend_off = stream->tosend_off; 1310 1311 if (stream->stream_flags & STREAM_RW_ONCE) 1312 stream_dispatch_rw_events_once(stream, &processed); 1313 else 1314 stream_dispatch_rw_events_loop(stream, &processed); 1315 1316 /* User wants to write, but no progress has been made: either stream 1317 * or connection is blocked. 1318 */ 1319 if ((stream->stream_flags & STREAM_WANT_WRITE) && 1320 stream->tosend_off == tosend_off && 1321 (stream->tosend_sz == 0 && sum_sbts(stream) == 0)) 1322 maybe_mark_as_blocked(stream); 1323 1324 if (stream->stream_flags & STREAM_RW_EVENT_FLAGS) 1325 { 1326 if (processed) 1327 { /* Move the stream to the end of the list to ensure fairness. */ 1328 TAILQ_REMOVE(&stream->conn_pub->rw_streams, stream, next_rw_stream); 1329 TAILQ_INSERT_TAIL(&stream->conn_pub->rw_streams, stream, next_rw_stream); 1330 } 1331 } 1332 else if (((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 1333 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE)) 1334 LSQ_DEBUG("stream %u stalled", stream->id); 1335} 1336 1337 1338static struct stream_buf_tosend * 1339get_sbt_buf (lsquic_stream_t *stream) 1340{ 1341 struct stream_buf_tosend *sbt; 1342 1343 sbt = TAILQ_LAST(&stream->bufs_tosend, sbts_tailq); 1344 if (!(sbt && SBT_BUF == sbt->sbt_type && (sizeof(sbt->u.buf.sbt_data) - sbt->u.buf.sbt_sz) > 0)) 1345 { 1346 sbt = lsquic_mm_get_4k(stream->conn_pub->mm); 1347 if (!sbt) 1348 return NULL; 1349 sbt->sbt_type = SBT_BUF; 1350 sbt->u.buf.sbt_sz = 0; 1351 sbt->u.buf.sbt_off = 0; 1352 TAILQ_INSERT_TAIL(&stream->bufs_tosend, sbt, next_sbt); 1353 } 1354 1355 return sbt; 1356} 1357 1358 1359static size_t 1360sbt_write (struct stream_buf_tosend *sbt, const void *buf, size_t len) 1361{ 1362 assert(SBT_BUF == sbt->sbt_type); 1363 size_t ntowrite = sizeof(sbt->u.buf.sbt_data) - sbt->u.buf.sbt_sz; 1364 if (len < ntowrite) 1365 ntowrite = len; 1366 memcpy(sbt->u.buf.sbt_data + sbt->u.buf.sbt_sz, buf, ntowrite); 1367 sbt->u.buf.sbt_sz += ntowrite; 1368 return ntowrite; 1369} 1370 1371static size_t 1372sbt_read_buf (struct stream_buf_tosend *sbt, void *buf, size_t len) 1373{ 1374 size_t navail = sbt->u.buf.sbt_sz - sbt->u.buf.sbt_off; 1375 if (len > navail) 1376 len = navail; 1377 memcpy(buf, sbt->u.buf.sbt_data + sbt->u.buf.sbt_off, len); 1378 sbt->u.buf.sbt_off += len; 1379 return len; 1380} 1381 1382 1383static void 1384incr_tosend_sz (lsquic_stream_t *stream, uint64_t incr) 1385{ 1386 stream->tosend_sz += incr; 1387 if (stream->stream_flags & STREAM_CONN_LIMITED) 1388 { 1389 assert(stream->conn_pub->conn_cap.cc_tosend + 1390 stream->conn_pub->conn_cap.cc_sent + incr <= 1391 stream->conn_pub->conn_cap.cc_max); 1392 stream->conn_pub->conn_cap.cc_tosend += incr; 1393 } 1394} 1395 1396 1397static void 1398decr_tosend_sz (lsquic_stream_t *stream, uint64_t decr) 1399{ 1400 assert(decr <= stream->tosend_sz); 1401 stream->tosend_sz -= decr; 1402 if (stream->stream_flags & STREAM_CONN_LIMITED) 1403 { 1404 assert(decr <= stream->conn_pub->conn_cap.cc_tosend); 1405 stream->conn_pub->conn_cap.cc_tosend -= decr; 1406 } 1407} 1408 1409 1410static void 1411sbt_truncated_file (struct stream_buf_tosend *sbt) 1412{ 1413 off_t delta = sbt->u.file.sbt_sz - sbt->u.file.sbt_off; 1414 decr_tosend_sz(sbt->u.file.sbt_stream, delta); 1415 sbt->u.file.sbt_sz = sbt->u.file.sbt_off; 1416 sbt->u.file.sbt_last = 1; 1417} 1418 1419 1420static void 1421stop_reading_from_file (lsquic_stream_t *stream) 1422{ 1423 assert(stream->file_fd >= 0); 1424 if (stream->stream_flags & STREAM_CLOSE_FILE) 1425 (void) close(stream->file_fd); 1426 stream->file_fd = -1; 1427 stream_wantwrite(stream, !!(stream->stream_flags & STREAM_SAVED_WANTWR)); 1428 use_user_on_write(stream); 1429} 1430 1431 1432static size_t 1433sbt_read_file (struct stream_buf_tosend *sbt, void *pbuf, size_t len) 1434{ 1435 const lsquic_stream_t *const stream = sbt->u.file.sbt_stream; 1436 size_t navail; 1437 ssize_t nread; 1438 unsigned char *buf = pbuf; 1439 1440 navail = sbt->u.file.sbt_sz - sbt->u.file.sbt_off; 1441 if (len > navail) 1442 len = navail; 1443 1444 assert(len > 0); 1445 1446 *buf++ = sbt->u.file.sbt_stream->file_byte; 1447 sbt->u.file.sbt_off += 1; 1448 len -= 1; 1449 1450 while (len > 0) 1451 { 1452 nread = read(sbt->u.file.sbt_fd, buf, len); 1453 if (-1 == nread) 1454 { 1455 LSQ_WARN("error reading: %s", strerror(errno)); 1456 LSQ_WARN("could only send %jd bytes instead of intended %jd", 1457 (intmax_t) sbt->u.file.sbt_off, 1458 (intmax_t) sbt->u.file.sbt_sz); 1459 sbt_truncated_file(sbt); 1460 break; 1461 } 1462 else if (0 == nread) 1463 { 1464 LSQ_WARN("could only send %jd bytes instead of intended %jd", 1465 (intmax_t) sbt->u.file.sbt_off, 1466 (intmax_t) sbt->u.file.sbt_sz); 1467 sbt_truncated_file(sbt); 1468 break; 1469 } 1470 buf += nread; 1471 len -= nread; 1472 sbt->u.file.sbt_off += nread; 1473 } 1474 1475 len = buf - (unsigned char *) pbuf; 1476 1477 if (sbt->u.file.sbt_off < sbt->u.file.sbt_sz || !sbt->u.file.sbt_last) 1478 { 1479 nread = read(sbt->u.file.sbt_fd, &sbt->u.file.sbt_stream->file_byte, 1); 1480 if (-1 == nread) 1481 { 1482 LSQ_WARN("error reading: %s", strerror(errno)); 1483 LSQ_WARN("could only send %jd bytes instead of intended %jd", 1484 (intmax_t) sbt->u.file.sbt_off, 1485 (intmax_t) sbt->u.file.sbt_sz); 1486 sbt_truncated_file(sbt); 1487 } 1488 else if (0 == nread) 1489 { 1490 LSQ_WARN("could only send %jd bytes instead of intended %jd", 1491 (intmax_t) sbt->u.file.sbt_off, 1492 (intmax_t) sbt->u.file.sbt_sz); 1493 sbt_truncated_file(sbt); 1494 } 1495 } 1496 1497 if (sbt->u.file.sbt_last && sbt->u.file.sbt_off == sbt->u.file.sbt_sz) 1498 stop_reading_from_file(sbt->u.file.sbt_stream); 1499 1500 return len; 1501} 1502 1503 1504static size_t 1505sbt_read (struct stream_buf_tosend *sbt, void *buf, size_t len) 1506{ 1507 switch (sbt->sbt_type) 1508 { 1509 case SBT_BUF: 1510 return sbt_read_buf(sbt, buf, len); 1511 default: 1512 assert(SBT_FILE == sbt->sbt_type); 1513 return sbt_read_file(sbt, buf, len); 1514 } 1515} 1516 1517 1518static int 1519sbt_done (const struct stream_buf_tosend *sbt) 1520{ 1521 switch (sbt->sbt_type) 1522 { 1523 case SBT_BUF: 1524 return sbt->u.buf.sbt_off == sbt->u.buf.sbt_sz; 1525 default: 1526 assert(SBT_FILE == sbt->sbt_type); 1527 return sbt->u.file.sbt_off == sbt->u.file.sbt_sz; 1528 } 1529} 1530 1531 1532static void 1533sbt_destroy (lsquic_stream_t *stream, struct stream_buf_tosend *sbt) 1534{ 1535 switch (sbt->sbt_type) 1536 { 1537 case SBT_BUF: 1538 lsquic_mm_put_4k(stream->conn_pub->mm, sbt); 1539 break; 1540 default: 1541 assert(SBT_FILE == sbt->sbt_type); 1542 free(sbt); 1543 } 1544} 1545 1546 1547static size_t 1548sbt_size (const struct stream_buf_tosend *sbt) 1549{ 1550 switch (sbt->sbt_type) 1551 { 1552 case SBT_BUF: 1553 return sbt->u.buf.sbt_sz - sbt->u.buf.sbt_off; 1554 default: 1555 return sbt->u.file.sbt_sz - sbt->u.file.sbt_off; 1556 } 1557} 1558 1559 1560static size_t 1561sum_sbts (const lsquic_stream_t *stream) 1562{ 1563 size_t sum; 1564 struct stream_buf_tosend *sbt; 1565 1566 sum = 0; 1567 TAILQ_FOREACH(sbt, &stream->bufs_tosend, next_sbt) 1568 sum += sbt_size(sbt); 1569 1570 return sum; 1571} 1572 1573 1574static void 1575maybe_put_on_sending_streams (lsquic_stream_t *stream) 1576{ 1577 if (lsquic_stream_tosend_sz(stream)) 1578 { 1579 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1580 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 1581 stream->stream_flags |= STREAM_SEND_DATA; 1582 } 1583} 1584 1585 1586static size_t 1587stream_flush_internal (lsquic_stream_t *stream, size_t size) 1588{ 1589 uint64_t conn_avail; 1590 1591 assert(0 == stream->tosend_sz || 1592 (stream->stream_flags & STREAM_U_WRITE_DONE)); 1593 if (stream->stream_flags & STREAM_CONN_LIMITED) 1594 { 1595 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 1596 if (size > conn_avail) 1597 { 1598 LSQ_DEBUG("connection-limited: flushing only %"PRIu64 1599 " out of %zd bytes", conn_avail, size); 1600 size = conn_avail; 1601 } 1602 } 1603 LSQ_DEBUG("flushed %zd bytes of stream %u", size, stream->id); 1604 SM_HISTORY_APPEND(stream, SHE_FLUSH); 1605 incr_tosend_sz(stream, size); 1606 maybe_put_on_sending_streams(stream); 1607 return size; 1608} 1609 1610 1611/* When stream->tosend_sz is zero and we have anything in SBT list, this 1612 * means that we have unflushed data. 1613 */ 1614int 1615lsquic_stream_flush (lsquic_stream_t *stream) 1616{ 1617 size_t sum; 1618 if (0 == stream->tosend_sz && !TAILQ_EMPTY(&stream->bufs_tosend)) 1619 { 1620 sum = sum_sbts(stream); 1621 if (stream_flush_internal(stream, sum) > 0) 1622 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_FLUSH); 1623 } 1624 return 0; 1625} 1626 1627 1628/* The flush threshold is the maximum size of stream data that can be sent 1629 * in a full packet. 1630 */ 1631static size_t 1632flush_threshold (const lsquic_stream_t *stream) 1633{ 1634 enum packet_out_flags flags; 1635 unsigned packet_header_sz, stream_header_sz; 1636 size_t threshold; 1637 1638 /* We are guessing the number of bytes that will be used to encode 1639 * packet number, because we do not have this information at this 1640 * point in time. 1641 */ 1642 flags = PACKNO_LEN_2 << POBIT_SHIFT; 1643 if (stream->conn_pub->lconn->cn_flags & LSCONN_TCID0) 1644 flags |= PO_CONN_ID; 1645 1646 packet_header_sz = lsquic_po_header_length(flags); 1647 stream_header_sz = stream->conn_pub->lconn->cn_pf 1648 ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off); 1649 1650 threshold = stream->conn_pub->lconn->cn_pack_size - packet_header_sz 1651 - stream_header_sz; 1652 return threshold; 1653} 1654 1655 1656static size_t 1657flush_or_check_flags (lsquic_stream_t *stream, size_t sz) 1658{ 1659 size_t sum; 1660 if (0 == stream->tosend_sz) 1661 { 1662 sum = sum_sbts(stream); 1663 if (sum >= flush_threshold(stream)) 1664 return stream_flush_internal(stream, sum); 1665 else 1666 return 0; 1667 } 1668 else 1669 { 1670 incr_tosend_sz(stream, sz); 1671 maybe_put_on_sending_streams(stream); 1672 return sz; 1673 } 1674} 1675 1676 1677static void 1678stream_file_on_write (lsquic_stream_t *stream, struct lsquic_stream_ctx *st_ctx) 1679{ 1680 struct stream_buf_tosend *sbt; 1681 ssize_t nr; 1682 size_t size, left; 1683 int last; 1684 1685 if (stream->stream_flags & STREAM_RST_FLAGS) 1686 { 1687 LSQ_INFO("stream was reset: stopping sending the file at offset %jd", 1688 (intmax_t) stream->file_off); 1689 stop_reading_from_file(stream); 1690 return; 1691 } 1692 1693 /* Write as much as we can */ 1694 size = lsquic_stream_write_avail(stream); 1695 left = stream->file_size - stream->file_off; 1696 if (left < size) 1697 size = left; 1698 1699 if (0 == stream->file_off) 1700 { 1701 /* Try to read in 1 byte to check for truncation. Having a byte in 1702 * store guarantees that we can generate a frame even if the file is 1703 * truncated later. This function only does it once, when the first 1704 * SBT is queued. Subsequent SBT use the byte read in by previous 1705 * SBT in sbt_read_file(). 1706 */ 1707 nr = read(stream->file_fd, &stream->file_byte, 1); 1708 if (nr != 1) 1709 { 1710 if (nr < 0) 1711 LSQ_WARN("cannot read from file: %s", strerror(errno)); 1712 LSQ_INFO("stopping sending the file at offset %jd", 1713 (intmax_t) stream->file_off); 1714 stop_reading_from_file(stream); 1715 return; 1716 } 1717 } 1718 1719 last = stream->file_off + (off_t) size == stream->file_size; 1720 1721 sbt = malloc(offsetof(struct stream_buf_tosend, u.file.sbt_last) + 1722 sizeof(((struct stream_buf_tosend *)0)->u.file.sbt_last)); 1723 if (!sbt) 1724 { 1725 LSQ_WARN("malloc failed: %s", strerror(errno)); 1726 LSQ_INFO("stopping sending the file at offset %jd", 1727 (intmax_t) stream->file_off); 1728 stop_reading_from_file(stream); 1729 return; 1730 } 1731 1732 sbt->sbt_type = SBT_FILE; 1733 sbt->u.file.sbt_stream = stream; 1734 sbt->u.file.sbt_fd = stream->file_fd; 1735 sbt->u.file.sbt_sz = size; 1736 sbt->u.file.sbt_off = 0; 1737 sbt->u.file.sbt_last = last; 1738 TAILQ_INSERT_TAIL(&stream->bufs_tosend, sbt, next_sbt); 1739 1740 LSQ_DEBUG("inserted %zd-byte sbt at offset %jd, last: %d", size, 1741 (intmax_t) stream->file_off, last); 1742 1743 stream->file_off += size; 1744 1745 incr_tosend_sz(stream, size); 1746 maybe_put_on_sending_streams(stream); 1747 1748 if (last) 1749 stream_want_read_or_write(stream, 0, STREAM_WANT_WRITE); 1750} 1751 1752 1753static void 1754stream_sendfile (lsquic_stream_t *stream, int fd, off_t off, size_t size, 1755 int close) 1756{ 1757 int want_write; 1758 1759 /* STREAM_WANT_WRITE is not guaranteed to be set: the user may have 1760 * already unset it. 1761 */ 1762 want_write = !!(stream->stream_flags & STREAM_WANT_WRITE); 1763 stream_wantwrite(stream, 1); 1764 1765 stream->file_fd = fd; 1766 stream->file_off = off; 1767 stream->file_size = size; 1768 1769 stream_wantwrite(stream, want_write); 1770 1771 if (close) 1772 stream->stream_flags |= STREAM_CLOSE_FILE; 1773 else 1774 stream->stream_flags &= ~STREAM_CLOSE_FILE; 1775 1776 use_internal_on_write_file(stream); 1777 stream->on_write_cb(stream, stream->st_ctx); 1778} 1779 1780 1781#define COMMON_WRITE_CHECKS() do { \ 1782 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) \ 1783 == STREAM_USE_HEADERS) \ 1784 { \ 1785 LSQ_WARN("Attempt to write to stream before sending HTTP headers"); \ 1786 errno = EILSEQ; \ 1787 return -1; \ 1788 } \ 1789 if (stream->stream_flags & STREAM_RST_FLAGS) \ 1790 { \ 1791 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 1792 errno = ECONNRESET; \ 1793 return -1; \ 1794 } \ 1795 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 1796 { \ 1797 LSQ_WARN("Attempt to write to stream after it was closed for " \ 1798 "writing"); \ 1799 errno = EBADF; \ 1800 return -1; \ 1801 } \ 1802} while (0) 1803 1804 1805int 1806lsquic_stream_write_file (lsquic_stream_t *stream, const char *filename) 1807{ 1808 int fd, saved_errno; 1809 struct stat st; 1810 1811 COMMON_WRITE_CHECKS(); 1812 1813 fd = open(filename, O_RDONLY); 1814 if (fd < 0) 1815 { 1816 LSQ_WARN("could not open `%s' for reading: %s", filename, strerror(errno)); 1817 return -1; 1818 } 1819 1820 if (fstat(fd, &st) < 0) 1821 { 1822 LSQ_WARN("fstat64(%s) failed: %s", filename, strerror(errno)); 1823 saved_errno = errno; 1824 (void) close(fd); 1825 errno = saved_errno; 1826 return -1; 1827 } 1828 1829 if (0 == st.st_size) 1830 { 1831 LSQ_INFO("Writing zero-sized file `%s' is a no-op", filename); 1832 (void) close(fd); 1833 return 0; 1834 } 1835 1836 LSQ_DEBUG("Inserted `%s' into SBT queue; size: %jd", filename, 1837 (intmax_t) st.st_size); 1838 1839 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1840 stream_sendfile(stream, fd, 0, st.st_size, 1); 1841 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_WRITEFILE); 1842 return 0; 1843} 1844 1845 1846int 1847lsquic_stream_sendfile (lsquic_stream_t *stream, int fd, off_t off, 1848 size_t size) 1849{ 1850 COMMON_WRITE_CHECKS(); 1851 if ((off_t) -1 == lseek(fd, off, SEEK_SET)) 1852 { 1853 LSQ_INFO("lseek failed: %s", strerror(errno)); 1854 return -1; 1855 } 1856 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1857 stream_sendfile(stream, fd, off, size, 0); 1858 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_SENDFILE); 1859 return 0; 1860} 1861 1862 1863ssize_t 1864lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 1865{ 1866 struct stream_buf_tosend *sbt; 1867 size_t nw, stream_avail, n_flushed; 1868 const unsigned char *p = buf; 1869 1870 COMMON_WRITE_CHECKS(); 1871 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1872 1873 stream_avail = lsquic_stream_write_avail(stream); 1874 if (stream_avail < len) 1875 { 1876 LSQ_DEBUG("cap length from %zd to %zd bytes", len, stream_avail); 1877 len = stream_avail; 1878 } 1879 1880 while (len > 0) 1881 { 1882 sbt = get_sbt_buf(stream); 1883 if (!sbt) 1884 { 1885 LSQ_WARN("could not allocate SBT buffer: %s", strerror(errno)); 1886 break; 1887 } 1888 nw = sbt_write(sbt, p, len); 1889 len -= nw; 1890 p += nw; 1891 } 1892 1893 const size_t n_written = p - (unsigned char *) buf; 1894 LSQ_DEBUG("wrote %"PRIiPTR" bytes to stream %u", n_written, stream->id); 1895 1896 n_flushed = flush_or_check_flags(stream, n_written); 1897 if (n_flushed) 1898 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_WRITE); 1899 1900 return n_written; 1901} 1902 1903 1904ssize_t 1905lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 1906 int iovcnt) 1907{ 1908 int i; 1909 const unsigned char *p; 1910 struct stream_buf_tosend *sbt; 1911 size_t nw, stream_avail, len, n_flushed; 1912 ssize_t nw_total; 1913 1914 COMMON_WRITE_CHECKS(); 1915 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1916 1917 nw_total = 0; 1918 stream_avail = lsquic_stream_write_avail(stream); 1919 1920 for (i = 0; i < iovcnt && stream_avail > 0; ++i) 1921 { 1922 len = iov[i].iov_len; 1923 p = iov[i].iov_base; 1924 if (len > stream_avail) 1925 { 1926 LSQ_DEBUG("cap length from %zd to %zd bytes", nw_total + len, 1927 nw_total + stream_avail); 1928 len = stream_avail; 1929 } 1930 nw_total += len; 1931 stream_avail -= len; 1932 while (len > 0) 1933 { 1934 sbt = get_sbt_buf(stream); 1935 if (!sbt) 1936 { 1937 LSQ_WARN("could not allocate SBT buffer: %s", strerror(errno)); 1938 break; 1939 } 1940 nw = sbt_write(sbt, p, len); 1941 len -= nw; 1942 p += nw; 1943 } 1944 } 1945 1946 LSQ_DEBUG("wrote %zd bytes to stream", nw_total); 1947 1948 n_flushed = flush_or_check_flags(stream, nw_total); 1949 if (n_flushed) 1950 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_WRITEV); 1951 1952 return nw_total; 1953} 1954 1955 1956int 1957lsquic_stream_send_headers (lsquic_stream_t *stream, 1958 const lsquic_http_headers_t *headers, int eos) 1959{ 1960 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT| 1961 STREAM_U_WRITE_DONE)) 1962 == STREAM_USE_HEADERS) 1963 { 1964 int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs, 1965 stream->id, headers, eos, lsquic_stream_priority(stream)); 1966 if (0 == s) 1967 { 1968 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 1969 stream->stream_flags |= STREAM_HEADERS_SENT; 1970 if (eos) 1971 stream->stream_flags |= STREAM_FIN_SENT; 1972 LSQ_INFO("sent headers for stream %u", stream->id); 1973 } 1974 else 1975 LSQ_WARN("could not send headers: %s", strerror(errno)); 1976 return s; 1977 } 1978 else 1979 { 1980 LSQ_WARN("cannot send headers for stream %u in this state", stream->id); 1981 errno = EBADMSG; 1982 return -1; 1983 } 1984} 1985 1986 1987void 1988lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 1989{ 1990 if (offset > stream->max_send_off) 1991 { 1992 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 1993 LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to " 1994 "0x%"PRIX64, stream->id, stream->max_send_off, offset); 1995 stream->max_send_off = offset; 1996 if (lsquic_stream_tosend_sz(stream)) 1997 { 1998 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1999 { 2000 LSQ_DEBUG("stream %u unblocked, schedule sending again", 2001 stream->id); 2002 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 2003 next_send_stream); 2004 } 2005 stream->stream_flags |= STREAM_SEND_DATA; 2006 } 2007 } 2008 else 2009 LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old " 2010 "max send offset 0x%"PRIX64", ignoring", stream->id, offset, 2011 stream->max_send_off); 2012} 2013 2014 2015/* This function is used to update offsets after handshake completes and we 2016 * learn of peer's limits from the handshake values. 2017 */ 2018int 2019lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset) 2020{ 2021 LSQ_DEBUG("setting max_send_off to %u", offset); 2022 if (offset > stream->max_send_off) 2023 { 2024 lsquic_stream_window_update(stream, offset); 2025 return 0; 2026 } 2027 else if (offset < stream->tosend_off) 2028 { 2029 LSQ_INFO("new offset (%u bytes) is smaller than the amount of data " 2030 "already sent on this stream (%"PRIu64" bytes)", offset, 2031 stream->tosend_off); 2032 return -1; 2033 } 2034 else 2035 { 2036 stream->max_send_off = offset; 2037 return 0; 2038 } 2039} 2040 2041 2042#ifndef NDEBUG 2043/* Use weak linkage so that tests can override this function */ 2044size_t 2045lsquic_stream_tosend_sz (const lsquic_stream_t *stream) 2046 __attribute__((weak)) 2047 ; 2048#endif 2049size_t 2050lsquic_stream_tosend_sz (const lsquic_stream_t *stream) 2051{ 2052 assert(stream->tosend_off + stream->tosend_sz <= stream->max_send_off); 2053 return stream->tosend_sz; 2054} 2055 2056 2057#ifndef NDEBUG 2058/* Use weak linkage so that tests can override this function */ 2059size_t 2060lsquic_stream_tosend_read (lsquic_stream_t *stream, void *buf, size_t len, 2061 int *reached_fin) 2062 __attribute__((weak)) 2063 ; 2064#endif 2065size_t 2066lsquic_stream_tosend_read (lsquic_stream_t *stream, void *buf, size_t len, 2067 int *reached_fin) 2068{ 2069 assert(stream->tosend_sz > 0); 2070 assert(stream->stream_flags & STREAM_SEND_DATA); 2071 const size_t tosend_sz = lsquic_stream_tosend_sz(stream); 2072 if (tosend_sz < len) 2073 len = tosend_sz; 2074 struct stream_buf_tosend *sbt; 2075 unsigned char *p = buf; 2076 unsigned char *const end = p + len; 2077 while (p < end && (sbt = TAILQ_FIRST(&stream->bufs_tosend))) 2078 { 2079 size_t nread = sbt_read(sbt, p, len); 2080 p += nread; 2081 len -= nread; 2082 if (sbt_done(sbt)) 2083 { 2084 TAILQ_REMOVE(&stream->bufs_tosend, sbt, next_sbt); 2085 sbt_destroy(stream, sbt); 2086 LSQ_DEBUG("destroyed SBT"); 2087 } 2088 else 2089 break; 2090 } 2091 const size_t n_read = p - (unsigned char *) buf; 2092 decr_tosend_sz(stream, n_read); 2093 stream->tosend_off += n_read; 2094 if (stream->stream_flags & STREAM_CONN_LIMITED) 2095 { 2096 stream->conn_pub->conn_cap.cc_sent += n_read; 2097 assert(stream->conn_pub->conn_cap.cc_sent <= 2098 stream->conn_pub->conn_cap.cc_max); 2099 } 2100 *reached_fin = lsquic_stream_tosend_fin(stream); 2101 return n_read; 2102} 2103 2104 2105void 2106lsquic_stream_stream_frame_sent (lsquic_stream_t *stream) 2107{ 2108 assert(stream->stream_flags & STREAM_SEND_DATA); 2109 SM_HISTORY_APPEND(stream, SHE_FRAME_OUT); 2110 if (0 == lsquic_stream_tosend_sz(stream)) 2111 { 2112 /* Mark the stream as having no sendable data independent of reason 2113 * why there is no data to send. 2114 */ 2115 if (0 == stream->tosend_sz) 2116 { 2117 LSQ_DEBUG("all stream %u data has been scheduled for sending, " 2118 "now at offset 0x%"PRIX64, stream->id, stream->tosend_off); 2119 stream->stream_flags &= ~STREAM_SEND_DATA; 2120 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 2121 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 2122 next_send_stream); 2123 2124 if ((stream->stream_flags & STREAM_U_WRITE_DONE) && !writing_file(stream)) 2125 { 2126 stream->stream_flags |= STREAM_FIN_SENT; 2127 maybe_finish_stream(stream); 2128 } 2129 } 2130 else 2131 { 2132 LSQ_DEBUG("stream %u blocked from sending", stream->id); 2133 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 2134 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 2135 next_send_stream); 2136 stream->stream_flags &= ~STREAM_SEND_DATA; 2137 stream->stream_flags |= STREAM_SEND_BLOCKED; 2138 } 2139 } 2140} 2141 2142 2143#ifndef NDEBUG 2144/* Use weak linkage so that tests can override this function */ 2145uint64_t 2146lsquic_stream_tosend_offset (const lsquic_stream_t *stream) 2147 __attribute__((weak)) 2148 ; 2149#endif 2150uint64_t 2151lsquic_stream_tosend_offset (const lsquic_stream_t *stream) 2152{ 2153 return stream->tosend_off; 2154} 2155 2156 2157static void 2158drop_sbts (lsquic_stream_t *stream) 2159{ 2160 struct stream_buf_tosend *sbt; 2161 2162 while ((sbt = TAILQ_FIRST(&stream->bufs_tosend))) 2163 { 2164 TAILQ_REMOVE(&stream->bufs_tosend, sbt, next_sbt); 2165 sbt_destroy(stream, sbt); 2166 } 2167 2168 decr_tosend_sz(stream, stream->tosend_sz); 2169 stream->tosend_sz = 0; 2170} 2171 2172 2173void 2174lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code) 2175{ 2176 lsquic_stream_reset_ext(stream, error_code, 1); 2177} 2178 2179 2180void 2181lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code, 2182 int do_close) 2183{ 2184 if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT)) 2185 { 2186 LSQ_INFO("reset already sent"); 2187 return; 2188 } 2189 2190 SM_HISTORY_APPEND(stream, SHE_RESET); 2191 2192 LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code); 2193 stream->error_code = error_code; 2194 2195 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 2196 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 2197 next_send_stream); 2198 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 2199 stream->stream_flags |= STREAM_SEND_RST; 2200 2201 drop_sbts(stream); 2202 maybe_schedule_call_on_close(stream); 2203 2204 if (do_close) 2205 lsquic_stream_close(stream); 2206 else 2207 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_RESET_EXT); 2208} 2209 2210 2211unsigned 2212lsquic_stream_id (const lsquic_stream_t *stream) 2213{ 2214 return stream->id; 2215} 2216 2217 2218struct lsquic_conn * 2219lsquic_stream_conn (const lsquic_stream_t *stream) 2220{ 2221 return stream->conn_pub->lconn; 2222} 2223 2224 2225int 2226lsquic_stream_close (lsquic_stream_t *stream) 2227{ 2228 LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id); 2229 SM_HISTORY_APPEND(stream, SHE_CLOSE); 2230 if (lsquic_stream_is_closed(stream)) 2231 { 2232 LSQ_INFO("Attempt to close an already-closed stream %u", stream->id); 2233 errno = EBADF; 2234 return -1; 2235 } 2236 stream_shutdown_write(stream); 2237 stream_shutdown_read(stream); 2238 maybe_schedule_call_on_close(stream); 2239 maybe_finish_stream(stream); 2240 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_STREAM_CLOSE); 2241 return 0; 2242} 2243 2244 2245void 2246lsquic_stream_acked (lsquic_stream_t *stream) 2247{ 2248 assert(stream->n_unacked); 2249 --stream->n_unacked; 2250 LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked); 2251 if (0 == stream->n_unacked) 2252 maybe_finish_stream(stream); 2253} 2254 2255 2256void 2257lsquic_stream_push_req (lsquic_stream_t *stream, 2258 struct uncompressed_headers *push_req) 2259{ 2260 assert(!stream->push_req); 2261 stream->push_req = push_req; 2262 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 2263} 2264 2265 2266int 2267lsquic_stream_is_pushed (const lsquic_stream_t *stream) 2268{ 2269 return 1 & ~stream->id; 2270} 2271 2272 2273int 2274lsquic_stream_push_info (const lsquic_stream_t *stream, 2275 uint32_t *ref_stream_id, const char **headers, size_t *headers_sz) 2276{ 2277 if (lsquic_stream_is_pushed(stream)) 2278 { 2279 assert(stream->push_req); 2280 *ref_stream_id = stream->push_req->uh_stream_id; 2281 *headers = stream->push_req->uh_headers; 2282 *headers_sz = stream->push_req->uh_size; 2283 return 0; 2284 } 2285 else 2286 return -1; 2287} 2288 2289 2290int 2291lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 2292{ 2293 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS) 2294 { 2295 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 2296 LSQ_DEBUG("received uncompressed headers for stream %u", stream->id); 2297 stream->stream_flags |= STREAM_HAVE_UH; 2298 if (uh->uh_flags & UH_FIN) 2299 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 2300 stream->uh = uh; 2301 if (uh->uh_oth_stream_id == 0) 2302 { 2303 if (uh->uh_weight) 2304 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 2305 } 2306 else 2307 LSQ_NOTICE("don't know how to depend on stream %u", 2308 uh->uh_oth_stream_id); 2309 return 0; 2310 } 2311 else 2312 { 2313 LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id); 2314 return -1; 2315 } 2316} 2317 2318 2319unsigned 2320lsquic_stream_priority (const lsquic_stream_t *stream) 2321{ 2322 return 256 - stream->sm_priority; 2323} 2324 2325 2326int 2327lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 2328{ 2329 /* The user should never get a reference to the special streams, 2330 * but let's check just in case: 2331 */ 2332 if (LSQUIC_STREAM_HANDSHAKE == stream->id 2333 || ((stream->stream_flags & STREAM_USE_HEADERS) && 2334 LSQUIC_STREAM_HEADERS == stream->id)) 2335 return -1; 2336 if (priority < 1 || priority > 256) 2337 return -1; 2338 stream->sm_priority = 256 - priority; 2339 LSQ_DEBUG("set priority to %u", priority); 2340 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 2341 return 0; 2342} 2343 2344 2345int 2346lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 2347{ 2348 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 2349 { 2350 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) == 2351 (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) 2352 { 2353 /* We need to send headers only if we are a) using HEADERS stream 2354 * and b) we already sent initial headers. If initial headers 2355 * have not been sent yet, stream priority will be sent in the 2356 * HEADERS frame. 2357 */ 2358 return lsquic_headers_stream_send_priority(stream->conn_pub->hs, 2359 stream->id, 0, 0, priority); 2360 } 2361 else 2362 return 0; 2363 } 2364 else 2365 return -1; 2366} 2367 2368 2369lsquic_stream_ctx_t * 2370lsquic_stream_get_ctx (const lsquic_stream_t *stream) 2371{ 2372 return stream->st_ctx; 2373} 2374 2375 2376int 2377lsquic_stream_refuse_push (lsquic_stream_t *stream) 2378{ 2379 if (lsquic_stream_is_pushed(stream) && 2380 !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST))) 2381 { 2382 LSQ_DEBUG("refusing pushed stream: send reset"); 2383 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 2384 return 0; 2385 } 2386 else 2387 return -1; 2388} 2389