lsquic_stream.c revision 50aadb33
1/* Copyright (c) 2017 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <sys/types.h> 28#include <sys/stat.h> 29#include <fcntl.h> 30#include <unistd.h> 31#include <stddef.h> 32 33#include "lsquic.h" 34 35#include "lsquic_int_types.h" 36#include "lsquic_packet_common.h" 37#include "lsquic_packet_in.h" 38#include "lsquic_malo.h" 39#include "lsquic_conn_flow.h" 40#include "lsquic_rtt.h" 41#include "lsquic_sfcw.h" 42#include "lsquic_stream.h" 43#include "lsquic_conn_public.h" 44#include "lsquic_util.h" 45#include "lsquic_mm.h" 46#include "lsquic_headers_stream.h" 47#include "lsquic_frame_reader.h" 48#include "lsquic_conn.h" 49#include "lsquic_data_in_if.h" 50#include "lsquic_parse.h" 51#include "lsquic_packet_out.h" 52#include "lsquic_engine_public.h" 53#include "lsquic_senhist.h" 54#include "lsquic_pacer.h" 55#include "lsquic_cubic.h" 56#include "lsquic_send_ctl.h" 57#include "lsquic_ev_log.h" 58 59#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 60#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid 61#define LSQUIC_LOG_STREAM_ID stream->id 62#include "lsquic_logger.h" 63 64enum sbt_type { 65 SBT_BUF, 66 SBT_FILE, 67}; 68 69struct stream_buf_tosend 70{ 71 TAILQ_ENTRY(stream_buf_tosend) next_sbt; 72 enum sbt_type sbt_type; 73 /* On 64-bit platform, here is a four-byte hole */ 74 union { 75 struct { 76 size_t sbt_sz; 77 size_t sbt_off; 78 unsigned char sbt_data[ 79 0x1000 - sizeof(enum sbt_type) - sizeof(long) + 4 - sizeof(size_t) * 2 80 - sizeof(TAILQ_ENTRY(stream_buf_tosend)) 81 ]; 82 } buf; 83 struct { 84 struct lsquic_stream *sbt_stream; 85 off_t sbt_sz; 86 off_t sbt_off; 87 int sbt_fd; 88 signed char sbt_last; 89 } file; 90 } u; 91}; 92 93typedef char _sbt_is_4K[(sizeof(struct stream_buf_tosend) == 0x1000) - 1]; 94 95static size_t 96sum_sbts (const lsquic_stream_t *stream); 97 98static void 99drop_sbts (lsquic_stream_t *stream); 100 101static void 102drop_frames_in (lsquic_stream_t *stream); 103 104static void 105maybe_schedule_call_on_close (lsquic_stream_t *stream); 106 107static void 108stream_file_on_write (lsquic_stream_t *, struct lsquic_stream_ctx *); 109 110static void 111stream_flush_on_write (lsquic_stream_t *, struct lsquic_stream_ctx *); 112 113static int 114stream_wantread (lsquic_stream_t *stream, int is_want); 115 116static int 117stream_wantwrite (lsquic_stream_t *stream, int is_want); 118 119static void 120stop_reading_from_file (lsquic_stream_t *stream); 121 122static int 123stream_readable (const lsquic_stream_t *stream); 124 125static int 126stream_writeable (const lsquic_stream_t *stream); 127 128static size_t 129stream_flush_internal (lsquic_stream_t *stream, size_t size); 130 131static void 132incr_tosend_sz (lsquic_stream_t *stream, uint64_t incr); 133 134static void 135maybe_put_on_sending_streams (lsquic_stream_t *stream); 136 137 138#if LSQUIC_KEEP_STREAM_HISTORY 139/* These values are printable ASCII characters for ease of printing the 140 * whole history in a single line of a log message. 141 * 142 * The list of events is not exhaustive: only most interesting events 143 * are recorded. 144 */ 145enum stream_history_event 146{ 147 SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ 148 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 149 SHE_REACH_FIN = 'a', 150 SHE_BLOCKED_OUT = 'b', 151 SHE_CREATED = 'C', 152 SHE_FRAME_IN = 'd', 153 SHE_FRAME_OUT = 'D', 154 SHE_RESET = 'e', 155 SHE_WINDOW_UPDATE = 'E', 156 SHE_FIN_IN = 'f', 157 SHE_FINISHED = 'F', 158 SHE_GOAWAY_IN = 'g', 159 SHE_USER_WRITE_HEADER = 'h', 160 SHE_HEADERS_IN = 'H', 161 SHE_ONCLOSE_SCHED = 'l', 162 SHE_ONCLOSE_CALL = 'L', 163 SHE_ONNEW = 'N', 164 SHE_SET_PRIO = 'p', 165 SHE_USER_READ = 'r', 166 SHE_SHUTDOWN_READ = 'R', 167 SHE_RST_IN = 's', 168 SHE_RST_OUT = 't', 169 SHE_FLUSH = 'u', 170 SHE_USER_WRITE_DATA = 'w', 171 SHE_SHUTDOWN_WRITE = 'W', 172 SHE_CLOSE = 'X', 173 SHE_FORCE_FINISH = 'Z', 174}; 175 176static void 177sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 178{ 179 enum stream_history_event prev_event; 180 sm_hist_idx_t idx; 181 int plus; 182 183 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 184 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 185 idx = (idx - plus) & SM_HIST_IDX_MASK; 186 prev_event = stream->sm_hist_buf[idx]; 187 188 if (prev_event == sh_event && plus) 189 return; 190 191 if (prev_event == sh_event) 192 sh_event = SHE_PLUS; 193 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 194 195 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 196 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 197 stream->sm_hist_buf); 198} 199 200 201# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 202# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 203 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 204 LSQ_DEBUG("history: [%.*s]", \ 205 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 206 (stream)->sm_hist_buf); \ 207 } while (0) 208#else 209# define SM_HISTORY_APPEND(stream, event) 210# define SM_HISTORY_DUMP_REMAINING(stream) 211#endif 212 213 214/* Here, "readable" means that the user is able to read from the stream. */ 215static void 216maybe_conn_to_pendrw_if_readable (lsquic_stream_t *stream, 217 enum rw_reason reason) 218{ 219 if (!(stream->conn_pub->enpub->enp_flags & ENPUB_PROC) && 220 stream_readable(stream)) 221 { 222 lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub, 223 stream->conn_pub->lconn, reason); 224 } 225} 226 227 228/* Here, "writeable" means that data can be put into packets to be 229 * scheduled to be sent out. 230 */ 231static void 232maybe_conn_to_pendrw_if_writeable (lsquic_stream_t *stream, 233 enum rw_reason reason) 234{ 235 if (!(stream->conn_pub->enpub->enp_flags & ENPUB_PROC) && 236 lsquic_send_ctl_can_send(stream->conn_pub->send_ctl) && 237 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 238 { 239 lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub, 240 stream->conn_pub->lconn, reason); 241 } 242} 243 244 245static int 246stream_stalled (const lsquic_stream_t *stream) 247{ 248 return 0 == (stream->stream_flags & STREAM_RW_EVENT_FLAGS) && 249 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 250 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 251} 252 253 254static void 255use_user_on_write (lsquic_stream_t *stream) 256{ 257 LSQ_DEBUG("stream %u: use user-supplied on-write callback", stream->id); 258 stream->on_write_cb = stream->stream_if->on_write; 259} 260 261 262static void 263use_internal_on_write_file (lsquic_stream_t *stream) 264{ 265 LSQ_DEBUG("use internal on-write callback (file)"); 266 stream->on_write_cb = stream_file_on_write; 267} 268 269 270static void 271use_internal_on_write_flush (lsquic_stream_t *stream) 272{ 273 LSQ_DEBUG("use internal on-write callback (flush)"); 274 stream->on_write_cb = stream_flush_on_write; 275} 276 277 278#define writing_file(stream) ((stream)->file_fd >= 0) 279 280 281/* TODO: The logic to figure out whether the stream is connection limited 282 * should be taken out of the constructor. The caller should specify this 283 * via one of enum stream_ctor_flags. 284 */ 285lsquic_stream_t * 286lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub, 287 const struct lsquic_stream_if *stream_if, 288 void *stream_if_ctx, unsigned initial_window, 289 unsigned initial_send_off, 290 enum stream_ctor_flags ctor_flags) 291{ 292 lsquic_cfcw_t *cfcw; 293 lsquic_stream_t *stream; 294 295 stream = calloc(1, sizeof(*stream)); 296 if (!stream) 297 return NULL; 298 299 stream->stream_if = stream_if; 300 stream->id = id; 301 stream->file_fd = -1; 302 stream->conn_pub = conn_pub; 303 if (!initial_window) 304 initial_window = 16 * 1024; 305 if (LSQUIC_STREAM_HANDSHAKE == id || 306 (conn_pub->hs && LSQUIC_STREAM_HEADERS == id)) 307 cfcw = NULL; 308 else 309 { 310 cfcw = &conn_pub->cfcw; 311 stream->stream_flags |= STREAM_CONN_LIMITED; 312 if (conn_pub->hs) 313 stream->stream_flags |= STREAM_USE_HEADERS; 314 lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO); 315 } 316 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 317 if (!initial_send_off) 318 initial_send_off = 16 * 1024; 319 stream->max_send_off = initial_send_off; 320 TAILQ_INIT(&stream->bufs_tosend); 321 if (ctor_flags & SCF_USE_DI_HASH) 322 stream->data_in = data_in_hash_new(conn_pub, id, 0); 323 else 324 stream->data_in = data_in_nocopy_new(conn_pub, id); 325 LSQ_DEBUG("created stream %u", id); 326 SM_HISTORY_APPEND(stream, SHE_CREATED); 327 if (ctor_flags & SCF_DI_AUTOSWITCH) 328 stream->stream_flags |= STREAM_AUTOSWITCH; 329 if (ctor_flags & SCF_CALL_ON_NEW) 330 lsquic_stream_call_on_new(stream, stream_if_ctx); 331 if (ctor_flags & SCF_DISP_RW_ONCE) 332 stream->stream_flags |= STREAM_RW_ONCE; 333 use_user_on_write(stream); 334 return stream; 335} 336 337 338void 339lsquic_stream_call_on_new (lsquic_stream_t *stream, void *stream_if_ctx) 340{ 341 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 342 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 343 { 344 LSQ_DEBUG("calling on_new_stream"); 345 SM_HISTORY_APPEND(stream, SHE_ONNEW); 346 stream->stream_flags |= STREAM_ONNEW_DONE; 347 stream->st_ctx = stream->stream_if->on_new_stream(stream_if_ctx, stream); 348 } 349} 350 351 352void 353lsquic_stream_destroy (lsquic_stream_t *stream) 354{ 355 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 356 STREAM_ONNEW_DONE) 357 { 358 stream->stream_flags |= STREAM_ONCLOSE_DONE; 359 stream->stream_if->on_close(stream, stream->st_ctx); 360 } 361 if (stream->file_fd >= 0) 362 (void) close(stream->file_fd); 363 if (stream->stream_flags & STREAM_SENDING_FLAGS) 364 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 365 if (stream->stream_flags & STREAM_RW_EVENT_FLAGS) 366 TAILQ_REMOVE(&stream->conn_pub->rw_streams, stream, next_rw_stream); 367 if (stream->stream_flags & STREAM_SERVICE_FLAGS) 368 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 369 lsquic_sfcw_consume_rem(&stream->fc); 370 drop_frames_in(stream); 371 drop_sbts(stream); 372 free(stream->push_req); 373 free(stream->uh); 374 LSQ_DEBUG("destroyed stream %u", stream->id); 375 SM_HISTORY_DUMP_REMAINING(stream); 376 free(stream); 377} 378 379 380static int 381stream_is_finished (const lsquic_stream_t *stream) 382{ 383 return lsquic_stream_is_closed(stream) 384 /* n_unacked checks that no outgoing packets that reference this 385 * stream are outstanding: 386 */ 387 && 0 == stream->n_unacked 388 /* This checks that no packets that reference this stream will 389 * become outstanding: 390 */ 391 && 0 == (stream->stream_flags & (STREAM_SEND_DATA|STREAM_SEND_RST)) 392 && ((stream->stream_flags & STREAM_FORCE_FINISH) 393 || (((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)) 394 || lsquic_stream_is_pushed(stream)) 395 && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD)))); 396} 397 398 399static void 400maybe_finish_stream (lsquic_stream_t *stream) 401{ 402 if (0 == (stream->stream_flags & STREAM_FINISHED) && 403 stream_is_finished(stream)) 404 { 405 LSQ_DEBUG("stream %u is now finished", stream->id); 406 SM_HISTORY_APPEND(stream, SHE_FINISHED); 407 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 408 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 409 next_service_stream); 410 stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED; 411 } 412} 413 414 415static void 416maybe_schedule_call_on_close (lsquic_stream_t *stream) 417{ 418 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 419 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE)) 420 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)) 421 { 422 if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS)) 423 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 424 next_service_stream); 425 stream->stream_flags |= STREAM_CALL_ONCLOSE; 426 LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id); 427 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 428 } 429} 430 431 432void 433lsquic_stream_call_on_close (lsquic_stream_t *stream) 434{ 435 assert(stream->stream_flags & STREAM_ONNEW_DONE); 436 stream->stream_flags &= ~STREAM_CALL_ONCLOSE; 437 if (!(stream->stream_flags & STREAM_SERVICE_FLAGS)) 438 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 439 next_service_stream); 440 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 441 { 442 LSQ_DEBUG("calling on_close for stream %u", stream->id); 443 stream->stream_flags |= STREAM_ONCLOSE_DONE; 444 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 445 stream->stream_if->on_close(stream, stream->st_ctx); 446 } 447 else 448 assert(0); 449} 450 451 452static int 453stream_readable (const lsquic_stream_t *stream) 454{ 455 /* A stream is readable if one of the following is true: */ 456 return 457 /* - It is already finished: in that case, lsquic_stream_read() will 458 * return 0. 459 */ 460 (stream->stream_flags & STREAM_FIN_REACHED) 461 /* - The stream is reset, by either side. In this case, 462 * lsquic_stream_read() will return -1 (we want the user to be 463 * able to collect the error). 464 */ 465 || (stream->stream_flags & STREAM_RST_FLAGS) 466 /* - Either we are not in HTTP mode or the HTTP headers have been 467 * received and the headers or data from the stream can be read. 468 */ 469 || (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) 470 == STREAM_USE_HEADERS) 471 && (stream->uh != NULL 472 || stream->data_in->di_if->di_get_frame(stream->data_in, 473 stream->read_offset))) 474 ; 475} 476 477 478size_t 479lsquic_stream_write_avail (const lsquic_stream_t *stream) 480{ 481 uint64_t stream_avail, conn_avail; 482 size_t unflushed_sum; 483 484 assert(stream->tosend_off + stream->tosend_sz <= stream->max_send_off); 485 stream_avail = stream->max_send_off - stream->tosend_off 486 - stream->tosend_sz; 487 if (0 == stream->tosend_sz) 488 { 489 unflushed_sum = sum_sbts(stream); 490 if (unflushed_sum > stream_avail) 491 stream_avail = 0; 492 else 493 stream_avail -= unflushed_sum; 494 } 495 496 if (stream->stream_flags & STREAM_CONN_LIMITED) 497 { 498 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 499 if (conn_avail < stream_avail) 500 { 501 LSQ_DEBUG("stream %u write buffer is limited by connection: %jd", 502 stream->id, conn_avail); 503 return conn_avail; 504 } 505 } 506 507 LSQ_DEBUG("stream %u write buffer is limited by stream: %jd", 508 stream->id, stream_avail); 509 return stream_avail; 510} 511 512 513static int 514stream_writeable (const lsquic_stream_t *stream) 515{ 516 /* A stream is writeable if one of the following is true: */ 517 return 518 /* - The stream is reset, by either side. In this case, 519 * lsquic_stream_write() will return -1 (we want the user to be 520 * able to collect the error). 521 */ 522 (stream->stream_flags & STREAM_RST_FLAGS) 523 /* - There is room to write to the stream. 524 */ 525 || lsquic_stream_write_avail(stream) > 0 526 ; 527} 528 529 530int 531lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 532{ 533 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 534 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 535 { 536 return -1; 537 } 538 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 539 { 540 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 541 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 542 next_send_stream); 543 stream->stream_flags |= STREAM_SEND_WUF; 544 } 545 return 0; 546} 547 548 549int 550lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 551{ 552 uint64_t max_off; 553 int got_next_offset; 554 enum ins_frame ins_frame; 555 556 assert(frame->packet_in); 557 558 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 559 LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; " 560 "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 561 562 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) == 563 (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) 564 { 565 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 566 lsquic_malo_put(frame); 567 return -1; 568 } 569 570 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 571 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 572 if (INS_FRAME_OK == ins_frame) 573 { 574 /* Update maximum offset in the flow controller and check for flow 575 * control violation: 576 */ 577 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 578 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 579 return -1; 580 if ((stream->stream_flags & STREAM_U_READ_DONE)) 581 lsquic_stream_reset_ext(stream, 1, 0); 582 if (frame->data_frame.df_fin) 583 { 584 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 585 stream->stream_flags |= STREAM_FIN_RECVD; 586 maybe_finish_stream(stream); 587 } 588 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 589 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 590 { 591 stream->data_in = stream->data_in->di_if->di_switch_impl( 592 stream->data_in, stream->read_offset); 593 if (!stream->data_in) 594 { 595 stream->data_in = data_in_error_new(); 596 return -1; 597 } 598 } 599 if (got_next_offset) 600 /* Checking the offset saves di_get_frame() call */ 601 maybe_conn_to_pendrw_if_readable(stream, RW_REASON_STREAM_IN); 602 return 0; 603 } 604 else if (INS_FRAME_DUP == ins_frame) 605 { 606 return 0; 607 } 608 else 609 { 610 assert(INS_FRAME_ERR == ins_frame); 611 return -1; 612 } 613} 614 615 616static void 617drop_frames_in (lsquic_stream_t *stream) 618{ 619 if (stream->data_in) 620 { 621 stream->data_in->di_if->di_destroy(stream->data_in); 622 /* To avoid checking whether `data_in` is set, just set to the error 623 * data-in stream. It does the right thing after incoming data is 624 * dropped. 625 */ 626 stream->data_in = data_in_error_new(); 627 } 628} 629 630 631int 632lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 633 uint32_t error_code) 634{ 635 636 if (stream->stream_flags & STREAM_RST_RECVD) 637 { 638 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 639 return 0; 640 } 641 642 SM_HISTORY_APPEND(stream, SHE_RST_IN); 643 /* This flag must always be set, even if we are "ignoring" it: it is 644 * used by elision code. 645 */ 646 stream->stream_flags |= STREAM_RST_RECVD; 647 648 if ((stream->stream_flags & STREAM_FIN_RECVD) && 649 /* Pushed streams have fake STREAM_FIN_RECVD set, thus 650 * we need a special check: 651 */ 652 !lsquic_stream_is_pushed(stream)) 653 { 654 LSQ_DEBUG("ignore RST_STREAM frame after FIN is received"); 655 return 0; 656 } 657 658 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 659 { 660 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is " 661 "smaller than that of byte following the last byte we have seen: " 662 "0x%"PRIX64, stream->id, offset, 663 lsquic_sfcw_get_max_recv_off(&stream->fc)); 664 return -1; 665 } 666 667 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 668 { 669 LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64 670 " violates flow control", stream->id, offset); 671 return -1; 672 } 673 674 /* Let user collect error: */ 675 maybe_conn_to_pendrw_if_readable(stream, RW_REASON_RST_IN); 676 677 lsquic_sfcw_consume_rem(&stream->fc); 678 drop_frames_in(stream); 679 680 if (!(stream->stream_flags & 681 (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT))) 682 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 683 684 stream->stream_flags |= STREAM_RST_RECVD; 685 686 maybe_finish_stream(stream); 687 maybe_schedule_call_on_close(stream); 688 689 return 0; 690} 691 692 693uint64_t 694lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 695{ 696 assert(stream->stream_flags & STREAM_SEND_WUF); 697 stream->stream_flags &= ~STREAM_SEND_WUF; 698 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 699 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 700 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 701} 702 703 704void 705lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 706{ 707 assert(stream->stream_flags & STREAM_SEND_BLOCKED); 708 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 709 stream->stream_flags &= ~STREAM_SEND_BLOCKED; 710 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 711 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 712} 713 714 715void 716lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 717{ 718 assert(stream->stream_flags & STREAM_SEND_RST); 719 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 720 stream->stream_flags &= ~STREAM_SEND_RST; 721 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 722 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 723 stream->stream_flags |= STREAM_RST_SENT; 724 maybe_finish_stream(stream); 725} 726 727 728static size_t 729read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len) 730{ 731 struct uncompressed_headers *uh = stream->uh; 732 size_t n_avail = uh->uh_size - uh->uh_off; 733 if (n_avail < len) 734 len = n_avail; 735 memcpy(dst, uh->uh_headers + uh->uh_off, len); 736 uh->uh_off += len; 737 if (uh->uh_off == uh->uh_size) 738 { 739 LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id); 740 free(uh); 741 stream->uh = NULL; 742 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 743 { 744 stream->stream_flags |= STREAM_FIN_REACHED; 745 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 746 } 747 } 748 return len; 749} 750 751 752/* This function returns 0 when EOF is reached. 753 */ 754ssize_t 755lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov, 756 int iovcnt) 757{ 758 size_t total_nread, nread; 759 int processed_frames, read_unc_headers, iovidx; 760 unsigned char *p, *end; 761 762 SM_HISTORY_APPEND(stream, SHE_USER_READ); 763 764#define NEXT_IOV() do { \ 765 ++iovidx; \ 766 while (iovidx < iovcnt && 0 == iov[iovidx].iov_len) \ 767 ++iovidx; \ 768 if (iovidx < iovcnt) \ 769 { \ 770 p = iov[iovidx].iov_base; \ 771 end = p + iov[iovidx].iov_len; \ 772 } \ 773 else \ 774 p = end = NULL; \ 775} while (0) 776 777#define AVAIL() (end - p) 778 779 if (stream->stream_flags & STREAM_RST_FLAGS) 780 { 781 errno = ECONNRESET; 782 return -1; 783 } 784 if (stream->stream_flags & STREAM_U_READ_DONE) 785 { 786 errno = EBADF; 787 return -1; 788 } 789 if (stream->stream_flags & STREAM_FIN_REACHED) 790 return 0; 791 792 total_nread = 0; 793 processed_frames = 0; 794 795 iovidx = -1; 796 NEXT_IOV(); 797 798 if (stream->uh && AVAIL()) 799 { 800 read_unc_headers = 1; 801 do 802 { 803 nread = read_uh(stream, p, AVAIL()); 804 p += nread; 805 total_nread += nread; 806 if (p == end) 807 NEXT_IOV(); 808 } 809 while (stream->uh && AVAIL()); 810 } 811 else 812 read_unc_headers = 0; 813 814 struct data_frame *data_frame; 815 while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset))) 816 { 817 ++processed_frames; 818 size_t navail = data_frame->df_size - data_frame->df_read_off; 819 size_t ntowrite = AVAIL(); 820 if (navail < ntowrite) 821 ntowrite = navail; 822 memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite); 823 p += ntowrite; 824 data_frame->df_read_off += ntowrite; 825 stream->read_offset += ntowrite; 826 total_nread += ntowrite; 827 if (data_frame->df_read_off == data_frame->df_size) 828 { 829 const int fin = data_frame->df_fin; 830 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 831 if ((stream->stream_flags & STREAM_AUTOSWITCH) && 832 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 833 { 834 stream->data_in = stream->data_in->di_if->di_switch_impl( 835 stream->data_in, stream->read_offset); 836 if (!stream->data_in) 837 { 838 stream->data_in = data_in_error_new(); 839 return -1; 840 } 841 } 842 if (fin) 843 { 844 stream->stream_flags |= STREAM_FIN_REACHED; 845 break; 846 } 847 } 848 if (p == end) 849 NEXT_IOV(); 850 } 851 852 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 853 total_nread, stream->read_offset); 854 855 if (processed_frames) 856 { 857 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 858 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 859 { 860 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 861 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 862 stream->stream_flags |= STREAM_SEND_WUF; 863 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_READ); 864 } 865 } 866 867 if (processed_frames || read_unc_headers) 868 { 869 return total_nread; 870 } 871 else 872 { 873 assert(0 == total_nread); 874 errno = EWOULDBLOCK; 875 return -1; 876 } 877} 878 879 880ssize_t 881lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 882{ 883 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 884 return lsquic_stream_readv(stream, &iov, 1); 885} 886 887 888#ifndef NDEBUG 889/* Use weak linkage so that tests can override this function */ 890int 891lsquic_stream_tosend_fin (const lsquic_stream_t *stream) 892 __attribute__((weak)) 893 ; 894#endif 895int 896lsquic_stream_tosend_fin (const lsquic_stream_t *stream) 897{ 898 return (stream->stream_flags & STREAM_U_WRITE_DONE) 899 && !writing_file(stream) 900 && 0 == stream->tosend_sz 901 && 0 == sum_sbts(stream); 902} 903 904 905static int 906readable_data_frame_remains (lsquic_stream_t *stream) 907{ 908 return !stream->data_in->di_if->di_empty(stream->data_in); 909} 910 911 912static void 913stream_shutdown_read (lsquic_stream_t *stream) 914{ 915 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 916 { 917 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 918 if (stream->uh || readable_data_frame_remains(stream)) 919 { 920 LSQ_INFO("read shut down, but there is still data to be read"); 921 lsquic_stream_reset_ext(stream, 1, 1); 922 } 923 stream->stream_flags |= STREAM_U_READ_DONE; 924 stream_wantread(stream, 0); 925 maybe_finish_stream(stream); 926 } 927} 928 929 930static void 931stream_shutdown_write (lsquic_stream_t *stream) 932{ 933 int flushed_all, data_send_ok; 934 935 if (stream->stream_flags & STREAM_U_WRITE_DONE) 936 return; 937 938 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 939 stream->stream_flags |= STREAM_U_WRITE_DONE; 940 941 data_send_ok = !(stream->stream_flags & 942 (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT)); 943 if (!data_send_ok) 944 { 945 stream_wantwrite(stream, 0); 946 return; 947 } 948 949 lsquic_stream_flush(stream); 950 flushed_all = stream->tosend_sz == sum_sbts(stream); 951 if (flushed_all) 952 stream_wantwrite(stream, 0); 953 else 954 { 955 stream_wantwrite(stream, 1); 956 use_internal_on_write_flush(stream); 957 } 958 959 if (flushed_all || stream->tosend_sz > 0) 960 { 961 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 962 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 963 stream->stream_flags |= STREAM_SEND_DATA; 964 } 965} 966 967 968int 969lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 970{ 971 LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how); 972 if (lsquic_stream_is_closed(stream)) 973 { 974 LSQ_INFO("Attempt to shut down a closed stream %u", stream->id); 975 errno = EBADF; 976 return -1; 977 } 978 /* 0: read, 1: write: 2: read and write 979 */ 980 if (how < 0 || how > 2) 981 { 982 errno = EINVAL; 983 return -1; 984 } 985 986 if (how) 987 stream_shutdown_write(stream); 988 if (how != 1) 989 stream_shutdown_read(stream); 990 991 maybe_finish_stream(stream); 992 maybe_schedule_call_on_close(stream); 993 if (how) 994 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_SHUTDOWN); 995 996 return 0; 997} 998 999 1000void 1001lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 1002{ 1003 LSQ_DEBUG("internal shutdown of stream %u", stream->id); 1004 if (LSQUIC_STREAM_HANDSHAKE == stream->id 1005 || ((stream->stream_flags & STREAM_USE_HEADERS) && 1006 LSQUIC_STREAM_HEADERS == stream->id)) 1007 { 1008 LSQ_DEBUG("add flag to force-finish special stream %u", stream->id); 1009 stream->stream_flags |= STREAM_FORCE_FINISH; 1010 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 1011 } 1012 maybe_finish_stream(stream); 1013 maybe_schedule_call_on_close(stream); 1014} 1015 1016 1017static void 1018fake_reset_unused_stream (lsquic_stream_t *stream) 1019{ 1020 stream->stream_flags |= 1021 STREAM_RST_RECVD /* User will pick this up on read or write */ 1022 | STREAM_RST_SENT /* Don't send anything else on this stream */ 1023 ; 1024 1025 /* Cancel all writes to the network scheduled for this stream: */ 1026 if (stream->stream_flags & STREAM_SENDING_FLAGS) 1027 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 1028 next_send_stream); 1029 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 1030 1031 if (writing_file(stream)) 1032 stop_reading_from_file(stream); 1033 1034 LSQ_DEBUG("fake-reset stream %u%s", 1035 stream->id, stream_stalled(stream) ? " (stalled)" : ""); 1036 maybe_finish_stream(stream); 1037 maybe_schedule_call_on_close(stream); 1038} 1039 1040 1041/* This function should only be called for locally-initiated streams whose ID 1042 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 1043 * frame sent by peer but we have not yet received it and created a stream. 1044 * In this situation, we mark the stream as reset, so that user's on_read or 1045 * on_write event callback picks up the error. That, in turn, should result 1046 * in stream being closed. 1047 * 1048 * If we have received any data frames on this stream, this probably indicates 1049 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 1050 * lower than this. However, we still try to handle it gracefully and peform 1051 * a shutdown, as if the stream was not reset. 1052 */ 1053void 1054lsquic_stream_received_goaway (lsquic_stream_t *stream) 1055{ 1056 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 1057 if (0 == stream->read_offset && 1058 stream->data_in->di_if->di_empty(stream->data_in)) 1059 fake_reset_unused_stream(stream); /* Normal condition */ 1060 else 1061 { /* This is odd, let's handle it the best we can: */ 1062 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 1063 lsquic_stream_shutdown_internal(stream); 1064 } 1065} 1066 1067 1068uint64_t 1069lsquic_stream_read_offset (const lsquic_stream_t *stream) 1070{ 1071 return stream->read_offset; 1072} 1073 1074 1075static int 1076stream_want_read_or_write (lsquic_stream_t *stream, int is_want, int flag) 1077{ 1078 const int old_val = !!(stream->stream_flags & flag); 1079 if (old_val != is_want) 1080 { 1081 if (is_want) 1082 { 1083 if (!(stream->stream_flags & STREAM_RW_EVENT_FLAGS)) 1084 TAILQ_INSERT_TAIL(&stream->conn_pub->rw_streams, stream, next_rw_stream); 1085 stream->stream_flags |= flag; 1086 } 1087 else 1088 { 1089 stream->stream_flags &= ~flag; 1090 if (!(stream->stream_flags & STREAM_RW_EVENT_FLAGS)) 1091 TAILQ_REMOVE(&stream->conn_pub->rw_streams, stream, next_rw_stream); 1092 } 1093 } 1094 return old_val; 1095} 1096 1097 1098static int 1099stream_wantread (lsquic_stream_t *stream, int is_want) 1100{ 1101 return stream_want_read_or_write(stream, is_want, STREAM_WANT_READ); 1102} 1103 1104 1105int 1106lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1107{ 1108 if (0 == (stream->stream_flags & STREAM_U_READ_DONE)) 1109 { 1110 if (is_want) 1111 maybe_conn_to_pendrw_if_readable(stream, RW_REASON_WANTREAD); 1112 return stream_wantread(stream, is_want); 1113 } 1114 else 1115 { 1116 errno = EBADF; 1117 return -1; 1118 } 1119} 1120 1121 1122static int 1123stream_wantwrite (lsquic_stream_t *stream, int is_want) 1124{ 1125 if (writing_file(stream)) 1126 { 1127 int old_val = stream->stream_flags & STREAM_SAVED_WANTWR; 1128 stream->stream_flags |= old_val & (0 - !!is_want); 1129 return !!old_val; 1130 } 1131 else 1132 return stream_want_read_or_write(stream, is_want, STREAM_WANT_WRITE); 1133} 1134 1135 1136int 1137lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1138{ 1139 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)) 1140 { 1141 return stream_wantwrite(stream, is_want); 1142 } 1143 else 1144 { 1145 errno = EBADF; 1146 return -1; 1147 } 1148} 1149 1150 1151static void 1152stream_flush_on_write (lsquic_stream_t *stream, 1153 struct lsquic_stream_ctx *stream_ctx) 1154{ 1155 size_t sum, n_flushed; 1156 1157 assert(stream->stream_flags & STREAM_U_WRITE_DONE); 1158 1159 sum = sum_sbts(stream) - stream->tosend_sz; 1160 if (sum == 0) 1161 { /* This can occur if the stream has been written to and closed by 1162 * the user, but a RST_STREAM comes in that drops all data. 1163 */ 1164 LSQ_DEBUG("%s: no more data to send", __func__); 1165 stream_wantwrite(stream, 0); 1166 return; 1167 } 1168 1169 n_flushed = stream_flush_internal(stream, sum); 1170 if (n_flushed == sum) 1171 { 1172 LSQ_DEBUG("Flushed all remaining data (%zd bytes)", n_flushed); 1173 stream_wantwrite(stream, 0); 1174 } 1175 else 1176 LSQ_DEBUG("Flushed %zd out of %zd remaining data", n_flushed, sum); 1177} 1178 1179 1180#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE| \ 1181 STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST) 1182 1183 1184static void 1185stream_dispatch_rw_events_loop (lsquic_stream_t *stream, int *processed) 1186{ 1187 unsigned no_progress_count, no_progress_limit; 1188 enum stream_flags flags; 1189 uint64_t size; 1190 size_t sbt_size; 1191 1192 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1193 *processed = 0; 1194 1195 no_progress_count = 0; 1196 while ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream)) 1197 { 1198 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1199 size = stream->read_offset; 1200 1201 stream->stream_if->on_read(stream, stream->st_ctx); 1202 *processed = 1; 1203 1204 if (no_progress_limit && size == stream->read_offset && 1205 flags == (stream->stream_flags & USER_PROGRESS_FLAGS)) 1206 { 1207 ++no_progress_count; 1208 if (no_progress_count >= no_progress_limit) 1209 { 1210 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1211 "progress) in user code reading from stream", 1212 no_progress_count, 1213 no_progress_count == 1 ? "" : "s"); 1214 break; 1215 } 1216 } 1217 else 1218 no_progress_count = 0; 1219 } 1220 1221 no_progress_count = 0; 1222 while ((stream->stream_flags & STREAM_WANT_WRITE) && stream_writeable(stream)) 1223 { 1224 flags = stream->stream_flags & USER_PROGRESS_FLAGS; 1225 size = stream->tosend_sz; 1226 if (0 == size) 1227 sbt_size = sum_sbts(stream); 1228 1229 stream->on_write_cb(stream, stream->st_ctx); 1230 *processed = 1; 1231 1232 if (no_progress_limit && 1233 flags == (stream->stream_flags & USER_PROGRESS_FLAGS) && 1234 (0 == size ? sbt_size == sum_sbts(stream) : 1235 size == stream->tosend_sz)) 1236 { 1237 ++no_progress_count; 1238 if (no_progress_count >= no_progress_limit) 1239 { 1240 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1241 "progress) in user code writing to stream", 1242 no_progress_count, 1243 no_progress_count == 1 ? "" : "s"); 1244 break; 1245 } 1246 } 1247 else 1248 no_progress_count = 0; 1249 } 1250} 1251 1252 1253static void 1254stream_dispatch_rw_events_once (lsquic_stream_t *stream, int *processed) 1255{ 1256 *processed = 0; 1257 1258 if ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream)) 1259 { 1260 stream->stream_if->on_read(stream, stream->st_ctx); 1261 *processed = 1; 1262 } 1263 1264 if ((stream->stream_flags & STREAM_WANT_WRITE) && stream_writeable(stream)) 1265 { 1266 stream->on_write_cb(stream, stream->st_ctx); 1267 *processed = 1; 1268 } 1269} 1270 1271 1272static void 1273maybe_mark_as_blocked (lsquic_stream_t *stream) 1274{ 1275 uint64_t off, stream_data_sz; 1276 1277 if (stream->tosend_sz) 1278 stream_data_sz = stream->tosend_sz; 1279 else 1280 stream_data_sz = sum_sbts(stream); 1281 1282 off = stream->tosend_off + stream_data_sz; 1283 if (off >= stream->max_send_off) 1284 { 1285 assert(off == stream->max_send_off); 1286 if (stream->blocked_off < stream->max_send_off) 1287 { 1288 stream->blocked_off = stream->max_send_off; 1289 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1290 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1291 next_send_stream); 1292 stream->stream_flags |= STREAM_SEND_BLOCKED; 1293 LSQ_DEBUG("marked stream-blocked at stream offset " 1294 "%"PRIu64, stream->blocked_off); 1295 return; 1296 } 1297 } 1298 1299 if (stream->stream_flags & STREAM_CONN_LIMITED) 1300 { 1301 struct lsquic_conn_cap *const cc = &stream->conn_pub->conn_cap; 1302 off = cc->cc_sent + cc->cc_tosend; 1303 if (off >= cc->cc_max) 1304 { 1305 assert(off == cc->cc_max); 1306 if (cc->cc_blocked < cc->cc_max) 1307 { 1308 cc->cc_blocked = cc->cc_max; 1309 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 1310 LSQ_DEBUG("marked connection-blocked at connection offset " 1311 "%"PRIu64, cc->cc_max); 1312 } 1313 } 1314 } 1315} 1316 1317 1318void 1319lsquic_stream_dispatch_rw_events (lsquic_stream_t *stream) 1320{ 1321 int processed; 1322 uint64_t tosend_off; 1323 1324 assert(stream->stream_flags & STREAM_RW_EVENT_FLAGS); 1325 tosend_off = stream->tosend_off; 1326 1327 if (stream->stream_flags & STREAM_RW_ONCE) 1328 stream_dispatch_rw_events_once(stream, &processed); 1329 else 1330 stream_dispatch_rw_events_loop(stream, &processed); 1331 1332 /* User wants to write, but no progress has been made: either stream 1333 * or connection is blocked. 1334 */ 1335 if ((stream->stream_flags & STREAM_WANT_WRITE) && 1336 stream->tosend_off == tosend_off && 1337 (stream->tosend_sz == 0 && sum_sbts(stream) == 0)) 1338 maybe_mark_as_blocked(stream); 1339 1340 if (stream->stream_flags & STREAM_RW_EVENT_FLAGS) 1341 { 1342 if (processed) 1343 { /* Move the stream to the end of the list to ensure fairness. */ 1344 TAILQ_REMOVE(&stream->conn_pub->rw_streams, stream, next_rw_stream); 1345 TAILQ_INSERT_TAIL(&stream->conn_pub->rw_streams, stream, next_rw_stream); 1346 } 1347 } 1348 else if (((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 1349 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE)) 1350 LSQ_DEBUG("stream %u stalled", stream->id); 1351} 1352 1353 1354static struct stream_buf_tosend * 1355get_sbt_buf (lsquic_stream_t *stream) 1356{ 1357 struct stream_buf_tosend *sbt; 1358 1359 sbt = TAILQ_LAST(&stream->bufs_tosend, sbts_tailq); 1360 if (!(sbt && SBT_BUF == sbt->sbt_type && (sizeof(sbt->u.buf.sbt_data) - sbt->u.buf.sbt_sz) > 0)) 1361 { 1362 sbt = lsquic_mm_get_4k(stream->conn_pub->mm); 1363 if (!sbt) 1364 return NULL; 1365 sbt->sbt_type = SBT_BUF; 1366 sbt->u.buf.sbt_sz = 0; 1367 sbt->u.buf.sbt_off = 0; 1368 TAILQ_INSERT_TAIL(&stream->bufs_tosend, sbt, next_sbt); 1369 } 1370 1371 return sbt; 1372} 1373 1374 1375static size_t 1376sbt_write (struct stream_buf_tosend *sbt, const void *buf, size_t len) 1377{ 1378 assert(SBT_BUF == sbt->sbt_type); 1379 size_t ntowrite = sizeof(sbt->u.buf.sbt_data) - sbt->u.buf.sbt_sz; 1380 if (len < ntowrite) 1381 ntowrite = len; 1382 memcpy(sbt->u.buf.sbt_data + sbt->u.buf.sbt_sz, buf, ntowrite); 1383 sbt->u.buf.sbt_sz += ntowrite; 1384 return ntowrite; 1385} 1386 1387 1388static size_t 1389sbt_read_buf (struct stream_buf_tosend *sbt, void *buf, size_t len) 1390{ 1391 size_t navail = sbt->u.buf.sbt_sz - sbt->u.buf.sbt_off; 1392 if (len > navail) 1393 len = navail; 1394 memcpy(buf, sbt->u.buf.sbt_data + sbt->u.buf.sbt_off, len); 1395 sbt->u.buf.sbt_off += len; 1396 return len; 1397} 1398 1399 1400static void 1401incr_tosend_sz (lsquic_stream_t *stream, uint64_t incr) 1402{ 1403 stream->tosend_sz += incr; 1404 if (stream->stream_flags & STREAM_CONN_LIMITED) 1405 { 1406 assert(stream->conn_pub->conn_cap.cc_tosend + 1407 stream->conn_pub->conn_cap.cc_sent + incr <= 1408 stream->conn_pub->conn_cap.cc_max); 1409 stream->conn_pub->conn_cap.cc_tosend += incr; 1410 } 1411} 1412 1413 1414static void 1415decr_tosend_sz (lsquic_stream_t *stream, uint64_t decr) 1416{ 1417 assert(decr <= stream->tosend_sz); 1418 stream->tosend_sz -= decr; 1419 if (stream->stream_flags & STREAM_CONN_LIMITED) 1420 { 1421 assert(decr <= stream->conn_pub->conn_cap.cc_tosend); 1422 stream->conn_pub->conn_cap.cc_tosend -= decr; 1423 } 1424} 1425 1426 1427static void 1428sbt_truncated_file (struct stream_buf_tosend *sbt) 1429{ 1430 off_t delta = sbt->u.file.sbt_sz - sbt->u.file.sbt_off; 1431 decr_tosend_sz(sbt->u.file.sbt_stream, delta); 1432 sbt->u.file.sbt_sz = sbt->u.file.sbt_off; 1433 sbt->u.file.sbt_last = 1; 1434} 1435 1436 1437static void 1438stop_reading_from_file (lsquic_stream_t *stream) 1439{ 1440 assert(stream->file_fd >= 0); 1441 if (stream->stream_flags & STREAM_CLOSE_FILE) 1442 (void) close(stream->file_fd); 1443 stream->file_fd = -1; 1444 stream_wantwrite(stream, !!(stream->stream_flags & STREAM_SAVED_WANTWR)); 1445 use_user_on_write(stream); 1446} 1447 1448 1449static size_t 1450sbt_read_file (struct stream_buf_tosend *sbt, void *pbuf, size_t len) 1451{ 1452 const lsquic_stream_t *const stream = sbt->u.file.sbt_stream; 1453 size_t navail; 1454 ssize_t nread; 1455 unsigned char *buf = pbuf; 1456 1457 navail = sbt->u.file.sbt_sz - sbt->u.file.sbt_off; 1458 if (len > navail) 1459 len = navail; 1460 1461 assert(len > 0); 1462 1463 *buf++ = sbt->u.file.sbt_stream->file_byte; 1464 sbt->u.file.sbt_off += 1; 1465 len -= 1; 1466 1467 while (len > 0) 1468 { 1469 nread = read(sbt->u.file.sbt_fd, buf, len); 1470 if (-1 == nread) 1471 { 1472 LSQ_WARN("error reading: %s", strerror(errno)); 1473 LSQ_WARN("could only send %jd bytes instead of intended %jd", 1474 (intmax_t) sbt->u.file.sbt_off, 1475 (intmax_t) sbt->u.file.sbt_sz); 1476 sbt_truncated_file(sbt); 1477 break; 1478 } 1479 else if (0 == nread) 1480 { 1481 LSQ_WARN("could only send %jd bytes instead of intended %jd", 1482 (intmax_t) sbt->u.file.sbt_off, 1483 (intmax_t) sbt->u.file.sbt_sz); 1484 sbt_truncated_file(sbt); 1485 break; 1486 } 1487 buf += nread; 1488 len -= nread; 1489 sbt->u.file.sbt_off += nread; 1490 } 1491 1492 len = buf - (unsigned char *) pbuf; 1493 1494 if (sbt->u.file.sbt_off < sbt->u.file.sbt_sz || !sbt->u.file.sbt_last) 1495 { 1496 nread = read(sbt->u.file.sbt_fd, &sbt->u.file.sbt_stream->file_byte, 1); 1497 if (-1 == nread) 1498 { 1499 LSQ_WARN("error reading: %s", strerror(errno)); 1500 LSQ_WARN("could only send %jd bytes instead of intended %jd", 1501 (intmax_t) sbt->u.file.sbt_off, 1502 (intmax_t) sbt->u.file.sbt_sz); 1503 sbt_truncated_file(sbt); 1504 } 1505 else if (0 == nread) 1506 { 1507 LSQ_WARN("could only send %jd bytes instead of intended %jd", 1508 (intmax_t) sbt->u.file.sbt_off, 1509 (intmax_t) sbt->u.file.sbt_sz); 1510 sbt_truncated_file(sbt); 1511 } 1512 } 1513 1514 if (sbt->u.file.sbt_last && sbt->u.file.sbt_off == sbt->u.file.sbt_sz) 1515 stop_reading_from_file(sbt->u.file.sbt_stream); 1516 1517 return len; 1518} 1519 1520 1521static size_t 1522sbt_read (struct stream_buf_tosend *sbt, void *buf, size_t len) 1523{ 1524 switch (sbt->sbt_type) 1525 { 1526 case SBT_BUF: 1527 return sbt_read_buf(sbt, buf, len); 1528 default: 1529 assert(SBT_FILE == sbt->sbt_type); 1530 return sbt_read_file(sbt, buf, len); 1531 } 1532} 1533 1534 1535static int 1536sbt_done (const struct stream_buf_tosend *sbt) 1537{ 1538 switch (sbt->sbt_type) 1539 { 1540 case SBT_BUF: 1541 return sbt->u.buf.sbt_off == sbt->u.buf.sbt_sz; 1542 default: 1543 assert(SBT_FILE == sbt->sbt_type); 1544 return sbt->u.file.sbt_off == sbt->u.file.sbt_sz; 1545 } 1546} 1547 1548 1549static void 1550sbt_destroy (lsquic_stream_t *stream, struct stream_buf_tosend *sbt) 1551{ 1552 switch (sbt->sbt_type) 1553 { 1554 case SBT_BUF: 1555 lsquic_mm_put_4k(stream->conn_pub->mm, sbt); 1556 break; 1557 default: 1558 assert(SBT_FILE == sbt->sbt_type); 1559 free(sbt); 1560 } 1561} 1562 1563 1564static size_t 1565sbt_size (const struct stream_buf_tosend *sbt) 1566{ 1567 switch (sbt->sbt_type) 1568 { 1569 case SBT_BUF: 1570 return sbt->u.buf.sbt_sz - sbt->u.buf.sbt_off; 1571 default: 1572 return sbt->u.file.sbt_sz - sbt->u.file.sbt_off; 1573 } 1574} 1575 1576 1577static size_t 1578sum_sbts (const lsquic_stream_t *stream) 1579{ 1580 size_t sum; 1581 struct stream_buf_tosend *sbt; 1582 1583 sum = 0; 1584 TAILQ_FOREACH(sbt, &stream->bufs_tosend, next_sbt) 1585 sum += sbt_size(sbt); 1586 1587 return sum; 1588} 1589 1590 1591static void 1592maybe_put_on_sending_streams (lsquic_stream_t *stream) 1593{ 1594 if (lsquic_stream_tosend_sz(stream)) 1595 { 1596 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 1597 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); 1598 stream->stream_flags |= STREAM_SEND_DATA; 1599 } 1600} 1601 1602 1603static size_t 1604stream_flush_internal (lsquic_stream_t *stream, size_t size) 1605{ 1606 uint64_t conn_avail; 1607 1608 assert(0 == stream->tosend_sz || 1609 (stream->stream_flags & STREAM_U_WRITE_DONE)); 1610 if (stream->stream_flags & STREAM_CONN_LIMITED) 1611 { 1612 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 1613 if (size > conn_avail) 1614 { 1615 LSQ_DEBUG("connection-limited: flushing only %"PRIu64 1616 " out of %zd bytes", conn_avail, size); 1617 size = conn_avail; 1618 } 1619 } 1620 LSQ_DEBUG("flushed %zd bytes of stream %u", size, stream->id); 1621 SM_HISTORY_APPEND(stream, SHE_FLUSH); 1622 incr_tosend_sz(stream, size); 1623 maybe_put_on_sending_streams(stream); 1624 return size; 1625} 1626 1627 1628/* When stream->tosend_sz is zero and we have anything in SBT list, this 1629 * means that we have unflushed data. 1630 */ 1631int 1632lsquic_stream_flush (lsquic_stream_t *stream) 1633{ 1634 size_t sum; 1635 if (0 == stream->tosend_sz && !TAILQ_EMPTY(&stream->bufs_tosend)) 1636 { 1637 sum = sum_sbts(stream); 1638 if (stream_flush_internal(stream, sum) > 0) 1639 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_FLUSH); 1640 } 1641 return 0; 1642} 1643 1644 1645/* The flush threshold is the maximum size of stream data that can be sent 1646 * in a full packet. 1647 */ 1648static size_t 1649flush_threshold (const lsquic_stream_t *stream) 1650{ 1651 enum packet_out_flags flags; 1652 unsigned packet_header_sz, stream_header_sz; 1653 size_t threshold; 1654 1655 /* We are guessing the number of bytes that will be used to encode 1656 * packet number, because we do not have this information at this 1657 * point in time. 1658 */ 1659 flags = PACKNO_LEN_2 << POBIT_SHIFT; 1660 if (stream->conn_pub->lconn->cn_flags & LSCONN_TCID0) 1661 flags |= PO_CONN_ID; 1662 1663 packet_header_sz = lsquic_po_header_length(flags); 1664 stream_header_sz = stream->conn_pub->lconn->cn_pf 1665 ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off); 1666 1667 threshold = stream->conn_pub->lconn->cn_pack_size - packet_header_sz 1668 - stream_header_sz; 1669 return threshold; 1670} 1671 1672 1673static size_t 1674flush_or_check_flags (lsquic_stream_t *stream, size_t sz) 1675{ 1676 size_t sum; 1677 if (0 == stream->tosend_sz) 1678 { 1679 sum = sum_sbts(stream); 1680 if (sum >= flush_threshold(stream)) 1681 return stream_flush_internal(stream, sum); 1682 else 1683 return 0; 1684 } 1685 else 1686 { 1687 incr_tosend_sz(stream, sz); 1688 maybe_put_on_sending_streams(stream); 1689 return sz; 1690 } 1691} 1692 1693 1694static void 1695stream_file_on_write (lsquic_stream_t *stream, struct lsquic_stream_ctx *st_ctx) 1696{ 1697 struct stream_buf_tosend *sbt; 1698 ssize_t nr; 1699 size_t size, left; 1700 int last; 1701 1702 if (stream->stream_flags & STREAM_RST_FLAGS) 1703 { 1704 LSQ_INFO("stream was reset: stopping sending the file at offset %jd", 1705 (intmax_t) stream->file_off); 1706 stop_reading_from_file(stream); 1707 return; 1708 } 1709 1710 /* Write as much as we can */ 1711 size = lsquic_stream_write_avail(stream); 1712 left = stream->file_size - stream->file_off; 1713 if (left < size) 1714 size = left; 1715 1716 if (0 == stream->file_off) 1717 { 1718 /* Try to read in 1 byte to check for truncation. Having a byte in 1719 * store guarantees that we can generate a frame even if the file is 1720 * truncated later. This function only does it once, when the first 1721 * SBT is queued. Subsequent SBT use the byte read in by previous 1722 * SBT in sbt_read_file(). 1723 */ 1724 nr = read(stream->file_fd, &stream->file_byte, 1); 1725 if (nr != 1) 1726 { 1727 if (nr < 0) 1728 LSQ_WARN("cannot read from file: %s", strerror(errno)); 1729 LSQ_INFO("stopping sending the file at offset %jd", 1730 (intmax_t) stream->file_off); 1731 stop_reading_from_file(stream); 1732 return; 1733 } 1734 } 1735 1736 last = stream->file_off + (off_t) size == stream->file_size; 1737 1738 sbt = malloc(offsetof(struct stream_buf_tosend, u.file.sbt_last) + 1739 sizeof(((struct stream_buf_tosend *)0)->u.file.sbt_last)); 1740 if (!sbt) 1741 { 1742 LSQ_WARN("malloc failed: %s", strerror(errno)); 1743 LSQ_INFO("stopping sending the file at offset %jd", 1744 (intmax_t) stream->file_off); 1745 stop_reading_from_file(stream); 1746 return; 1747 } 1748 1749 sbt->sbt_type = SBT_FILE; 1750 sbt->u.file.sbt_stream = stream; 1751 sbt->u.file.sbt_fd = stream->file_fd; 1752 sbt->u.file.sbt_sz = size; 1753 sbt->u.file.sbt_off = 0; 1754 sbt->u.file.sbt_last = last; 1755 TAILQ_INSERT_TAIL(&stream->bufs_tosend, sbt, next_sbt); 1756 1757 LSQ_DEBUG("inserted %zd-byte sbt at offset %jd, last: %d", size, 1758 (intmax_t) stream->file_off, last); 1759 1760 stream->file_off += size; 1761 1762 incr_tosend_sz(stream, size); 1763 maybe_put_on_sending_streams(stream); 1764 1765 if (last) 1766 stream_want_read_or_write(stream, 0, STREAM_WANT_WRITE); 1767} 1768 1769 1770static void 1771stream_sendfile (lsquic_stream_t *stream, int fd, off_t off, size_t size, 1772 int close) 1773{ 1774 int want_write; 1775 1776 /* STREAM_WANT_WRITE is not guaranteed to be set: the user may have 1777 * already unset it. 1778 */ 1779 want_write = !!(stream->stream_flags & STREAM_WANT_WRITE); 1780 stream_wantwrite(stream, 1); 1781 1782 stream->file_fd = fd; 1783 stream->file_off = off; 1784 stream->file_size = size; 1785 1786 stream_wantwrite(stream, want_write); 1787 1788 if (close) 1789 stream->stream_flags |= STREAM_CLOSE_FILE; 1790 else 1791 stream->stream_flags &= ~STREAM_CLOSE_FILE; 1792 1793 use_internal_on_write_file(stream); 1794 stream->on_write_cb(stream, stream->st_ctx); 1795} 1796 1797 1798#define COMMON_WRITE_CHECKS() do { \ 1799 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) \ 1800 == STREAM_USE_HEADERS) \ 1801 { \ 1802 LSQ_WARN("Attempt to write to stream before sending HTTP headers"); \ 1803 errno = EILSEQ; \ 1804 return -1; \ 1805 } \ 1806 if (stream->stream_flags & STREAM_RST_FLAGS) \ 1807 { \ 1808 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 1809 errno = ECONNRESET; \ 1810 return -1; \ 1811 } \ 1812 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 1813 { \ 1814 LSQ_WARN("Attempt to write to stream after it was closed for " \ 1815 "writing"); \ 1816 errno = EBADF; \ 1817 return -1; \ 1818 } \ 1819} while (0) 1820 1821 1822int 1823lsquic_stream_write_file (lsquic_stream_t *stream, const char *filename) 1824{ 1825 int fd, saved_errno; 1826 struct stat st; 1827 1828 COMMON_WRITE_CHECKS(); 1829 1830 fd = open(filename, O_RDONLY); 1831 if (fd < 0) 1832 { 1833 LSQ_WARN("could not open `%s' for reading: %s", filename, strerror(errno)); 1834 return -1; 1835 } 1836 1837 if (fstat(fd, &st) < 0) 1838 { 1839 LSQ_WARN("fstat64(%s) failed: %s", filename, strerror(errno)); 1840 saved_errno = errno; 1841 (void) close(fd); 1842 errno = saved_errno; 1843 return -1; 1844 } 1845 1846 if (0 == st.st_size) 1847 { 1848 LSQ_INFO("Writing zero-sized file `%s' is a no-op", filename); 1849 (void) close(fd); 1850 return 0; 1851 } 1852 1853 LSQ_DEBUG("Inserted `%s' into SBT queue; size: %jd", filename, 1854 (intmax_t) st.st_size); 1855 1856 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1857 stream_sendfile(stream, fd, 0, st.st_size, 1); 1858 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_WRITEFILE); 1859 return 0; 1860} 1861 1862 1863int 1864lsquic_stream_sendfile (lsquic_stream_t *stream, int fd, off_t off, 1865 size_t size) 1866{ 1867 COMMON_WRITE_CHECKS(); 1868 if ((off_t) -1 == lseek(fd, off, SEEK_SET)) 1869 { 1870 LSQ_INFO("lseek failed: %s", strerror(errno)); 1871 return -1; 1872 } 1873 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1874 stream_sendfile(stream, fd, off, size, 0); 1875 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_SENDFILE); 1876 return 0; 1877} 1878 1879 1880ssize_t 1881lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 1882{ 1883 struct stream_buf_tosend *sbt; 1884 size_t nw, stream_avail, n_flushed; 1885 const unsigned char *p = buf; 1886 1887 COMMON_WRITE_CHECKS(); 1888 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1889 1890 stream_avail = lsquic_stream_write_avail(stream); 1891 if (stream_avail < len) 1892 { 1893 LSQ_DEBUG("cap length from %zd to %zd bytes", len, stream_avail); 1894 len = stream_avail; 1895 } 1896 1897 while (len > 0) 1898 { 1899 sbt = get_sbt_buf(stream); 1900 if (!sbt) 1901 { 1902 LSQ_WARN("could not allocate SBT buffer: %s", strerror(errno)); 1903 break; 1904 } 1905 nw = sbt_write(sbt, p, len); 1906 len -= nw; 1907 p += nw; 1908 } 1909 1910 const size_t n_written = p - (unsigned char *) buf; 1911 LSQ_DEBUG("wrote %"PRIiPTR" bytes to stream %u", n_written, stream->id); 1912 1913 n_flushed = flush_or_check_flags(stream, n_written); 1914 if (n_flushed) 1915 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_WRITE); 1916 1917 return n_written; 1918} 1919 1920 1921ssize_t 1922lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 1923 int iovcnt) 1924{ 1925 int i; 1926 const unsigned char *p; 1927 struct stream_buf_tosend *sbt; 1928 size_t nw, stream_avail, len, n_flushed; 1929 ssize_t nw_total; 1930 1931 COMMON_WRITE_CHECKS(); 1932 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 1933 1934 nw_total = 0; 1935 stream_avail = lsquic_stream_write_avail(stream); 1936 1937 for (i = 0; i < iovcnt && stream_avail > 0; ++i) 1938 { 1939 len = iov[i].iov_len; 1940 p = iov[i].iov_base; 1941 if (len > stream_avail) 1942 { 1943 LSQ_DEBUG("cap length from %zd to %zd bytes", nw_total + len, 1944 nw_total + stream_avail); 1945 len = stream_avail; 1946 } 1947 nw_total += len; 1948 stream_avail -= len; 1949 while (len > 0) 1950 { 1951 sbt = get_sbt_buf(stream); 1952 if (!sbt) 1953 { 1954 LSQ_WARN("could not allocate SBT buffer: %s", strerror(errno)); 1955 break; 1956 } 1957 nw = sbt_write(sbt, p, len); 1958 len -= nw; 1959 p += nw; 1960 } 1961 } 1962 1963 LSQ_DEBUG("wrote %zd bytes to stream", nw_total); 1964 1965 n_flushed = flush_or_check_flags(stream, nw_total); 1966 if (n_flushed) 1967 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_WRITEV); 1968 1969 return nw_total; 1970} 1971 1972 1973int 1974lsquic_stream_send_headers (lsquic_stream_t *stream, 1975 const lsquic_http_headers_t *headers, int eos) 1976{ 1977 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT| 1978 STREAM_U_WRITE_DONE)) 1979 == STREAM_USE_HEADERS) 1980 { 1981 int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs, 1982 stream->id, headers, eos, lsquic_stream_priority(stream)); 1983 if (0 == s) 1984 { 1985 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 1986 stream->stream_flags |= STREAM_HEADERS_SENT; 1987 if (eos) 1988 stream->stream_flags |= STREAM_FIN_SENT; 1989 LSQ_INFO("sent headers for stream %u", stream->id); 1990 } 1991 else 1992 LSQ_WARN("could not send headers: %s", strerror(errno)); 1993 return s; 1994 } 1995 else 1996 { 1997 LSQ_WARN("cannot send headers for stream %u in this state", stream->id); 1998 errno = EBADMSG; 1999 return -1; 2000 } 2001} 2002 2003 2004void 2005lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 2006{ 2007 if (offset > stream->max_send_off) 2008 { 2009 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 2010 LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to " 2011 "0x%"PRIX64, stream->id, stream->max_send_off, offset); 2012 stream->max_send_off = offset; 2013 if (lsquic_stream_tosend_sz(stream)) 2014 { 2015 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 2016 { 2017 LSQ_DEBUG("stream %u unblocked, schedule sending again", 2018 stream->id); 2019 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 2020 next_send_stream); 2021 } 2022 stream->stream_flags |= STREAM_SEND_DATA; 2023 } 2024 } 2025 else 2026 LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old " 2027 "max send offset 0x%"PRIX64", ignoring", stream->id, offset, 2028 stream->max_send_off); 2029} 2030 2031 2032/* This function is used to update offsets after handshake completes and we 2033 * learn of peer's limits from the handshake values. 2034 */ 2035int 2036lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset) 2037{ 2038 LSQ_DEBUG("setting max_send_off to %u", offset); 2039 if (offset > stream->max_send_off) 2040 { 2041 lsquic_stream_window_update(stream, offset); 2042 return 0; 2043 } 2044 else if (offset < stream->tosend_off) 2045 { 2046 LSQ_INFO("new offset (%u bytes) is smaller than the amount of data " 2047 "already sent on this stream (%"PRIu64" bytes)", offset, 2048 stream->tosend_off); 2049 return -1; 2050 } 2051 else 2052 { 2053 stream->max_send_off = offset; 2054 return 0; 2055 } 2056} 2057 2058 2059#ifndef NDEBUG 2060/* Use weak linkage so that tests can override this function */ 2061size_t 2062lsquic_stream_tosend_sz (const lsquic_stream_t *stream) 2063 __attribute__((weak)) 2064 ; 2065#endif 2066size_t 2067lsquic_stream_tosend_sz (const lsquic_stream_t *stream) 2068{ 2069 assert(stream->tosend_off + stream->tosend_sz <= stream->max_send_off); 2070 return stream->tosend_sz; 2071} 2072 2073 2074#ifndef NDEBUG 2075/* Use weak linkage so that tests can override this function */ 2076size_t 2077lsquic_stream_tosend_read (lsquic_stream_t *stream, void *buf, size_t len, 2078 int *reached_fin) 2079 __attribute__((weak)) 2080 ; 2081#endif 2082size_t 2083lsquic_stream_tosend_read (lsquic_stream_t *stream, void *buf, size_t len, 2084 int *reached_fin) 2085{ 2086 assert(stream->tosend_sz > 0); 2087 assert(stream->stream_flags & STREAM_SEND_DATA); 2088 const size_t tosend_sz = lsquic_stream_tosend_sz(stream); 2089 if (tosend_sz < len) 2090 len = tosend_sz; 2091 struct stream_buf_tosend *sbt; 2092 unsigned char *p = buf; 2093 unsigned char *const end = p + len; 2094 while (p < end && (sbt = TAILQ_FIRST(&stream->bufs_tosend))) 2095 { 2096 size_t nread = sbt_read(sbt, p, len); 2097 p += nread; 2098 len -= nread; 2099 if (sbt_done(sbt)) 2100 { 2101 TAILQ_REMOVE(&stream->bufs_tosend, sbt, next_sbt); 2102 sbt_destroy(stream, sbt); 2103 LSQ_DEBUG("destroyed SBT"); 2104 } 2105 else 2106 break; 2107 } 2108 const size_t n_read = p - (unsigned char *) buf; 2109 decr_tosend_sz(stream, n_read); 2110 stream->tosend_off += n_read; 2111 if (stream->stream_flags & STREAM_CONN_LIMITED) 2112 { 2113 stream->conn_pub->conn_cap.cc_sent += n_read; 2114 assert(stream->conn_pub->conn_cap.cc_sent <= 2115 stream->conn_pub->conn_cap.cc_max); 2116 } 2117 *reached_fin = lsquic_stream_tosend_fin(stream); 2118 return n_read; 2119} 2120 2121 2122void 2123lsquic_stream_stream_frame_sent (lsquic_stream_t *stream) 2124{ 2125 assert(stream->stream_flags & STREAM_SEND_DATA); 2126 SM_HISTORY_APPEND(stream, SHE_FRAME_OUT); 2127 if (0 == lsquic_stream_tosend_sz(stream)) 2128 { 2129 /* Mark the stream as having no sendable data independent of reason 2130 * why there is no data to send. 2131 */ 2132 if (0 == stream->tosend_sz) 2133 { 2134 LSQ_DEBUG("all stream %u data has been scheduled for sending, " 2135 "now at offset 0x%"PRIX64, stream->id, stream->tosend_off); 2136 stream->stream_flags &= ~STREAM_SEND_DATA; 2137 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 2138 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 2139 next_send_stream); 2140 2141 if ((stream->stream_flags & STREAM_U_WRITE_DONE) && !writing_file(stream)) 2142 { 2143 stream->stream_flags |= STREAM_FIN_SENT; 2144 maybe_finish_stream(stream); 2145 } 2146 } 2147 else 2148 { 2149 LSQ_DEBUG("stream %u blocked from sending", stream->id); 2150 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 2151 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 2152 next_send_stream); 2153 stream->stream_flags &= ~STREAM_SEND_DATA; 2154 stream->stream_flags |= STREAM_SEND_BLOCKED; 2155 } 2156 } 2157} 2158 2159 2160#ifndef NDEBUG 2161/* Use weak linkage so that tests can override this function */ 2162uint64_t 2163lsquic_stream_tosend_offset (const lsquic_stream_t *stream) 2164 __attribute__((weak)) 2165 ; 2166#endif 2167uint64_t 2168lsquic_stream_tosend_offset (const lsquic_stream_t *stream) 2169{ 2170 return stream->tosend_off; 2171} 2172 2173 2174static void 2175drop_sbts (lsquic_stream_t *stream) 2176{ 2177 struct stream_buf_tosend *sbt; 2178 2179 while ((sbt = TAILQ_FIRST(&stream->bufs_tosend))) 2180 { 2181 TAILQ_REMOVE(&stream->bufs_tosend, sbt, next_sbt); 2182 sbt_destroy(stream, sbt); 2183 } 2184 2185 decr_tosend_sz(stream, stream->tosend_sz); 2186 stream->tosend_sz = 0; 2187} 2188 2189 2190void 2191lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code) 2192{ 2193 lsquic_stream_reset_ext(stream, error_code, 1); 2194} 2195 2196 2197void 2198lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code, 2199 int do_close) 2200{ 2201 if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT)) 2202 { 2203 LSQ_INFO("reset already sent"); 2204 return; 2205 } 2206 2207 SM_HISTORY_APPEND(stream, SHE_RESET); 2208 2209 LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code); 2210 stream->error_code = error_code; 2211 2212 if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) 2213 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 2214 next_send_stream); 2215 stream->stream_flags &= ~STREAM_SENDING_FLAGS; 2216 stream->stream_flags |= STREAM_SEND_RST; 2217 2218 drop_sbts(stream); 2219 maybe_schedule_call_on_close(stream); 2220 2221 if (do_close) 2222 lsquic_stream_close(stream); 2223 else 2224 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_RESET_EXT); 2225} 2226 2227 2228unsigned 2229lsquic_stream_id (const lsquic_stream_t *stream) 2230{ 2231 return stream->id; 2232} 2233 2234 2235struct lsquic_conn * 2236lsquic_stream_conn (const lsquic_stream_t *stream) 2237{ 2238 return stream->conn_pub->lconn; 2239} 2240 2241 2242int 2243lsquic_stream_close (lsquic_stream_t *stream) 2244{ 2245 LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id); 2246 SM_HISTORY_APPEND(stream, SHE_CLOSE); 2247 if (lsquic_stream_is_closed(stream)) 2248 { 2249 LSQ_INFO("Attempt to close an already-closed stream %u", stream->id); 2250 errno = EBADF; 2251 return -1; 2252 } 2253 stream_shutdown_write(stream); 2254 stream_shutdown_read(stream); 2255 maybe_schedule_call_on_close(stream); 2256 maybe_finish_stream(stream); 2257 maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_STREAM_CLOSE); 2258 return 0; 2259} 2260 2261 2262void 2263lsquic_stream_acked (lsquic_stream_t *stream) 2264{ 2265 assert(stream->n_unacked); 2266 --stream->n_unacked; 2267 LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked); 2268 if (0 == stream->n_unacked) 2269 maybe_finish_stream(stream); 2270} 2271 2272 2273void 2274lsquic_stream_push_req (lsquic_stream_t *stream, 2275 struct uncompressed_headers *push_req) 2276{ 2277 assert(!stream->push_req); 2278 stream->push_req = push_req; 2279 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 2280} 2281 2282 2283int 2284lsquic_stream_is_pushed (const lsquic_stream_t *stream) 2285{ 2286 return 1 & ~stream->id; 2287} 2288 2289 2290int 2291lsquic_stream_push_info (const lsquic_stream_t *stream, 2292 uint32_t *ref_stream_id, const char **headers, size_t *headers_sz) 2293{ 2294 if (lsquic_stream_is_pushed(stream)) 2295 { 2296 assert(stream->push_req); 2297 *ref_stream_id = stream->push_req->uh_stream_id; 2298 *headers = stream->push_req->uh_headers; 2299 *headers_sz = stream->push_req->uh_size; 2300 return 0; 2301 } 2302 else 2303 return -1; 2304} 2305 2306 2307int 2308lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 2309{ 2310 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS) 2311 { 2312 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 2313 LSQ_DEBUG("received uncompressed headers for stream %u", stream->id); 2314 stream->stream_flags |= STREAM_HAVE_UH; 2315 if (uh->uh_flags & UH_FIN) 2316 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 2317 stream->uh = uh; 2318 if (uh->uh_oth_stream_id == 0) 2319 { 2320 if (uh->uh_weight) 2321 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 2322 } 2323 else 2324 LSQ_NOTICE("don't know how to depend on stream %u", 2325 uh->uh_oth_stream_id); 2326 return 0; 2327 } 2328 else 2329 { 2330 LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id); 2331 return -1; 2332 } 2333} 2334 2335 2336unsigned 2337lsquic_stream_priority (const lsquic_stream_t *stream) 2338{ 2339 return 256 - stream->sm_priority; 2340} 2341 2342 2343int 2344lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 2345{ 2346 /* The user should never get a reference to the special streams, 2347 * but let's check just in case: 2348 */ 2349 if (LSQUIC_STREAM_HANDSHAKE == stream->id 2350 || ((stream->stream_flags & STREAM_USE_HEADERS) && 2351 LSQUIC_STREAM_HEADERS == stream->id)) 2352 return -1; 2353 if (priority < 1 || priority > 256) 2354 return -1; 2355 stream->sm_priority = 256 - priority; 2356 LSQ_DEBUG("set priority to %u", priority); 2357 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 2358 return 0; 2359} 2360 2361 2362int 2363lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 2364{ 2365 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 2366 { 2367 if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) == 2368 (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) 2369 { 2370 /* We need to send headers only if we are a) using HEADERS stream 2371 * and b) we already sent initial headers. If initial headers 2372 * have not been sent yet, stream priority will be sent in the 2373 * HEADERS frame. 2374 */ 2375 return lsquic_headers_stream_send_priority(stream->conn_pub->hs, 2376 stream->id, 0, 0, priority); 2377 } 2378 else 2379 return 0; 2380 } 2381 else 2382 return -1; 2383} 2384 2385 2386lsquic_stream_ctx_t * 2387lsquic_stream_get_ctx (const lsquic_stream_t *stream) 2388{ 2389 return stream->st_ctx; 2390} 2391 2392 2393int 2394lsquic_stream_refuse_push (lsquic_stream_t *stream) 2395{ 2396 if (lsquic_stream_is_pushed(stream) && 2397 !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST))) 2398 { 2399 LSQ_DEBUG("refusing pushed stream: send reset"); 2400 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 2401 return 0; 2402 } 2403 else 2404 return -1; 2405} 2406 2407 2408