lsquic_stream.c revision a6cdaedb
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_stream.c -- stream processing 4 * 5 * To clear up terminology, here are some of our stream states (in order). 6 * They are not codified, but they are referred to in both code and comments. 7 * 8 * CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this 9 * point, on_close() gets called. 10 * FINISHED FIN or RST has been sent to peer. Stream is scheduled to be 11 * finished (freed): it gets put onto the `service_streams' 12 * list for connection to clean it up. 13 * DESTROYED All remaining memory associated with the stream is released. 14 * If on_close() has not been called yet, it is called now. 15 * The stream pointer is now invalid. 16 * 17 * When connection is aborted, a stream may go directly to DESTROYED state. 18 */ 19 20#include <assert.h> 21#include <errno.h> 22#include <inttypes.h> 23#include <stdarg.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/queue.h> 27#include <stddef.h> 28 29#include "lsquic.h" 30 31#include "lsquic_int_types.h" 32#include "lsquic_packet_common.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn_flow.h" 36#include "lsquic_rtt.h" 37#include "lsquic_sfcw.h" 38#include "lsquic_varint.h" 39#include "lsquic_hq.h" 40#include "lsquic_hash.h" 41#include "lsquic_stream.h" 42#include "lsquic_conn_public.h" 43#include "lsquic_util.h" 44#include "lsquic_mm.h" 45#include "lsquic_headers_stream.h" 46#include "lsquic_conn.h" 47#include "lsquic_data_in_if.h" 48#include "lsquic_parse.h" 49#include "lsquic_packet_out.h" 50#include "lsquic_engine_public.h" 51#include "lsquic_senhist.h" 52#include "lsquic_pacer.h" 53#include "lsquic_cubic.h" 54#include "lsquic_bw_sampler.h" 55#include "lsquic_minmax.h" 56#include "lsquic_bbr.h" 57#include "lsquic_send_ctl.h" 58#include "lsquic_headers.h" 59#include "lsquic_ev_log.h" 60#include "lsquic_enc_sess.h" 61#include "lsqpack.h" 62#include "lsquic_frab_list.h" 63#include "lsquic_http1x_if.h" 64#include "lsquic_qdec_hdl.h" 65#include "lsquic_qenc_hdl.h" 66#include "lsquic_byteswap.h" 67#include "lsquic_h3_prio.h" 68#include "lsquic_ietf.h" 69#include "lsquic_push_promise.h" 70 71#define LSQUIC_LOGGER_MODULE LSQLM_STREAM 72#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(stream->conn_pub->lconn) 73#define LSQUIC_LOG_STREAM_ID stream->id 74#include "lsquic_logger.h" 75 76#define MIN(a, b) ((a) < (b) ? (a) : (b)) 77 78static void 79drop_frames_in (lsquic_stream_t *stream); 80 81static void 82maybe_schedule_call_on_close (lsquic_stream_t *stream); 83 84static int 85stream_wantread (lsquic_stream_t *stream, int is_want); 86 87static int 88stream_wantwrite (lsquic_stream_t *stream, int is_want); 89 90static ssize_t 91stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); 92 93static ssize_t 94save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); 95 96static int 97stream_flush (lsquic_stream_t *stream); 98 99static int 100stream_flush_nocheck (lsquic_stream_t *stream); 101 102static void 103maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag); 104 105enum swtp_status { SWTP_OK, SWTP_STOP, SWTP_ERROR }; 106 107static enum swtp_status 108stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size); 109 110static enum swtp_status 111stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size); 112 113static enum swtp_status 114stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size); 115 116static size_t 117stream_write_avail (struct lsquic_stream *); 118 119static size_t 120stream_write_avail_with_headers (struct lsquic_stream *); 121 122static int 123hq_filter_readable (struct lsquic_stream *stream); 124 125static void 126hq_decr_left (struct lsquic_stream *stream, size_t); 127 128static size_t 129hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame); 130 131static int 132stream_readable_non_http (struct lsquic_stream *stream); 133 134static int 135stream_readable_http_gquic (struct lsquic_stream *stream); 136 137static int 138stream_readable_http_ietf (struct lsquic_stream *stream); 139 140static ssize_t 141stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz); 142 143static size_t 144active_hq_frame_sizes (const struct lsquic_stream *); 145 146static void 147on_write_dp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *); 148 149static void 150on_write_pp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *); 151 152static void 153stream_hq_frame_put (struct lsquic_stream *, struct stream_hq_frame *); 154 155const struct stream_filter_if hq_stream_filter_if = 156{ 157 .sfi_readable = hq_filter_readable, 158 .sfi_filter_df = hq_filter_df, 159 .sfi_decr_left = hq_decr_left, 160}; 161 162 163#if LSQUIC_KEEP_STREAM_HISTORY 164/* These values are printable ASCII characters for ease of printing the 165 * whole history in a single line of a log message. 166 * 167 * The list of events is not exhaustive: only most interesting events 168 * are recorded. 169 */ 170enum stream_history_event 171{ 172 SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ 173 SHE_PLUS = '+', /* Special entry: previous event occured more than once */ 174 SHE_REACH_FIN = 'a', 175 SHE_BLOCKED_OUT = 'b', 176 SHE_CREATED = 'C', 177 SHE_FRAME_IN = 'd', 178 SHE_FRAME_OUT = 'D', 179 SHE_RESET = 'e', 180 SHE_WINDOW_UPDATE = 'E', 181 SHE_FIN_IN = 'f', 182 SHE_FINISHED = 'F', 183 SHE_GOAWAY_IN = 'g', 184 SHE_USER_WRITE_HEADER = 'h', 185 SHE_HEADERS_IN = 'H', 186 SHE_IF_SWITCH = 'i', 187 SHE_ONCLOSE_SCHED = 'l', 188 SHE_ONCLOSE_CALL = 'L', 189 SHE_ONNEW = 'N', 190 SHE_SET_PRIO = 'p', 191 SHE_USER_READ = 'r', 192 SHE_SHUTDOWN_READ = 'R', 193 SHE_RST_IN = 's', 194 SHE_SS_IN = 'S', 195 SHE_RST_OUT = 't', 196 SHE_RST_ACKED = 'T', 197 SHE_FLUSH = 'u', 198 SHE_USER_WRITE_DATA = 'w', 199 SHE_SHUTDOWN_WRITE = 'W', 200 SHE_CLOSE = 'X', 201 SHE_DELAY_SW = 'y', 202 SHE_FORCE_FINISH = 'Z', 203}; 204 205static void 206sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event) 207{ 208 enum stream_history_event prev_event; 209 sm_hist_idx_t idx; 210 int plus; 211 212 idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK; 213 plus = SHE_PLUS == stream->sm_hist_buf[idx]; 214 idx = (idx - plus) & SM_HIST_IDX_MASK; 215 prev_event = stream->sm_hist_buf[idx]; 216 217 if (prev_event == sh_event && plus) 218 return; 219 220 if (prev_event == sh_event) 221 sh_event = SHE_PLUS; 222 stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event; 223 224 if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK)) 225 LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf), 226 stream->sm_hist_buf); 227} 228 229 230# define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event) 231# define SM_HISTORY_DUMP_REMAINING(stream) do { \ 232 if (stream->sm_hist_idx & SM_HIST_IDX_MASK) \ 233 LSQ_DEBUG("history: [%.*s]", \ 234 (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK), \ 235 (stream)->sm_hist_buf); \ 236 } while (0) 237#else 238# define SM_HISTORY_APPEND(stream, event) 239# define SM_HISTORY_DUMP_REMAINING(stream) 240#endif 241 242 243static int 244stream_inside_callback (const lsquic_stream_t *stream) 245{ 246 return stream->conn_pub->enpub->enp_flags & ENPUB_PROC; 247} 248 249 250static void 251maybe_conn_to_tickable (lsquic_stream_t *stream) 252{ 253 if (!stream_inside_callback(stream)) 254 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 255 stream->conn_pub->lconn); 256} 257 258 259/* Here, "readable" means that the user is able to read from the stream. */ 260static void 261maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) 262{ 263 if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) 264 { 265 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 266 stream->conn_pub->lconn); 267 } 268} 269 270 271/* Here, "writeable" means that data can be put into packets to be 272 * scheduled to be sent out. 273 * 274 * If `check_can_send' is false, it means that we do not need to check 275 * whether packets can be sent. This check was already performed when 276 * we packetized stream data. 277 */ 278static void 279maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream, 280 int check_can_send) 281{ 282 if (!stream_inside_callback(stream) && 283 (!check_can_send 284 || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) && 285 ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) 286 { 287 lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, 288 stream->conn_pub->lconn); 289 } 290} 291 292 293static int 294stream_stalled (const lsquic_stream_t *stream) 295{ 296 return 0 == (stream->sm_qflags & (SMQF_WANT_WRITE|SMQF_WANT_READ)) && 297 ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags) 298 != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE); 299} 300 301 302static size_t 303stream_stream_frame_header_sz (const struct lsquic_stream *stream, 304 unsigned data_sz) 305{ 306 return stream->conn_pub->lconn->cn_pf->pf_calc_stream_frame_header_sz( 307 stream->id, stream->tosend_off, data_sz); 308} 309 310 311static size_t 312stream_crypto_frame_header_sz (const struct lsquic_stream *stream, 313 unsigned data_sz_IGNORED) 314{ 315 return stream->conn_pub->lconn->cn_pf 316 ->pf_calc_crypto_frame_header_sz(stream->tosend_off); 317} 318 319 320/* GQUIC-only function */ 321static int 322stream_is_hsk (const struct lsquic_stream *stream) 323{ 324 if (stream->sm_bflags & SMBF_IETF) 325 return 0; 326 else 327 return stream->id == LSQUIC_GQUIC_STREAM_HANDSHAKE; 328} 329 330 331static struct lsquic_stream * 332stream_new_common (lsquic_stream_id_t id, struct lsquic_conn_public *conn_pub, 333 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 334 enum stream_ctor_flags ctor_flags) 335{ 336 struct lsquic_stream *stream; 337 338 stream = calloc(1, sizeof(*stream)); 339 if (!stream) 340 return NULL; 341 342 if (ctor_flags & SCF_USE_DI_HASH) 343 stream->data_in = data_in_hash_new(conn_pub, id, 0); 344 else 345 stream->data_in = data_in_nocopy_new(conn_pub, id); 346 if (!stream->data_in) 347 { 348 free(stream); 349 return NULL; 350 } 351 352 stream->id = id; 353 stream->stream_if = stream_if; 354 stream->conn_pub = conn_pub; 355 stream->sm_onnew_arg = stream_if_ctx; 356 stream->sm_write_avail = stream_write_avail; 357 358 if ((ctor_flags & (SCF_IETF|SCF_CRITICAL)) == SCF_IETF) 359 { 360 if (0 == lsquic_prio_tree_add_stream(conn_pub->u.ietf.prio_tree, 361 stream, H3ET_ROOT, 0, 0)) 362 stream->sm_qflags |= SMQF_H3_PRIO; 363 else 364 { 365 stream->data_in->di_if->di_destroy(stream->data_in); 366 free(stream); 367 return NULL; 368 } 369 } 370 371 STAILQ_INIT(&stream->sm_hq_frames); 372 373 stream->sm_bflags |= ctor_flags & ((1 << (N_SMBF_FLAGS - 1)) - 1); 374 if (conn_pub->lconn->cn_flags & LSCONN_SERVER) 375 stream->sm_bflags |= SMBF_SERVER; 376 377 return stream; 378} 379 380 381/* TODO: The logic to figure out whether the stream is connection limited 382 * should be taken out of the constructor. The caller should specify this 383 * via one of enum stream_ctor_flags. 384 */ 385lsquic_stream_t * 386lsquic_stream_new (lsquic_stream_id_t id, 387 struct lsquic_conn_public *conn_pub, 388 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 389 unsigned initial_window, uint64_t initial_send_off, 390 enum stream_ctor_flags ctor_flags) 391{ 392 lsquic_cfcw_t *cfcw; 393 lsquic_stream_t *stream; 394 395 stream = stream_new_common(id, conn_pub, stream_if, stream_if_ctx, 396 ctor_flags); 397 if (!stream) 398 return NULL; 399 400 if (!initial_window) 401 initial_window = 16 * 1024; 402 403 if (ctor_flags & SCF_IETF) 404 { 405 cfcw = &conn_pub->cfcw; 406 stream->sm_bflags |= SMBF_CONN_LIMITED; 407 if (ctor_flags & SCF_HTTP) 408 { 409 stream->sm_write_avail = stream_write_avail_with_headers; 410 stream->sm_readable = stream_readable_http_ietf; 411 stream->sm_sfi = &hq_stream_filter_if; 412 } 413 else 414 stream->sm_readable = stream_readable_non_http; 415 lsquic_stream_set_priority_internal(stream, 416 LSQUIC_STREAM_DEFAULT_PRIO); 417 stream->sm_write_to_packet = stream_write_to_packet_std; 418 } 419 else 420 { 421 if (lsquic_stream_id_is_critical(ctor_flags & SCF_HTTP, id)) 422 cfcw = NULL; 423 else 424 { 425 cfcw = &conn_pub->cfcw; 426 stream->sm_bflags |= SMBF_CONN_LIMITED; 427 lsquic_stream_set_priority_internal(stream, 428 LSQUIC_STREAM_DEFAULT_PRIO); 429 } 430 if (stream->sm_bflags & SMBF_USE_HEADERS) 431 stream->sm_readable = stream_readable_http_gquic; 432 else 433 stream->sm_readable = stream_readable_non_http; 434 if (stream_is_hsk(stream)) 435 stream->sm_write_to_packet = stream_write_to_packet_hsk; 436 else 437 stream->sm_write_to_packet = stream_write_to_packet_std; 438 } 439 440 lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id); 441 stream->max_send_off = initial_send_off; 442 LSQ_DEBUG("created stream"); 443 SM_HISTORY_APPEND(stream, SHE_CREATED); 444 stream->sm_frame_header_sz = stream_stream_frame_header_sz; 445 if (ctor_flags & SCF_CALL_ON_NEW) 446 lsquic_stream_call_on_new(stream); 447 return stream; 448} 449 450 451struct lsquic_stream * 452lsquic_stream_new_crypto (enum enc_level enc_level, 453 struct lsquic_conn_public *conn_pub, 454 const struct lsquic_stream_if *stream_if, void *stream_if_ctx, 455 enum stream_ctor_flags ctor_flags) 456{ 457 struct lsquic_stream *stream; 458 lsquic_stream_id_t stream_id; 459 460 assert(ctor_flags & SCF_CRITICAL); 461 462 stream_id = ~0ULL - enc_level; 463 stream = stream_new_common(stream_id, conn_pub, stream_if, 464 stream_if_ctx, ctor_flags); 465 if (!stream) 466 return NULL; 467 468 stream->sm_bflags |= SMBF_CRYPTO|SMBF_IETF; 469 stream->sm_enc_level = enc_level; 470 /* TODO: why have limit in crypto stream? Set it to UINT64_MAX? */ 471 lsquic_sfcw_init(&stream->fc, 16 * 1024, NULL, conn_pub, stream_id); 472 stream->max_send_off = 16 * 1024; 473 LSQ_DEBUG("created crypto stream"); 474 SM_HISTORY_APPEND(stream, SHE_CREATED); 475 stream->sm_frame_header_sz = stream_crypto_frame_header_sz; 476 stream->sm_write_to_packet = stream_write_to_packet_crypto; 477 stream->sm_readable = stream_readable_non_http; 478 if (ctor_flags & SCF_CALL_ON_NEW) 479 lsquic_stream_call_on_new(stream); 480 return stream; 481} 482 483 484void 485lsquic_stream_call_on_new (lsquic_stream_t *stream) 486{ 487 assert(!(stream->stream_flags & STREAM_ONNEW_DONE)); 488 if (!(stream->stream_flags & STREAM_ONNEW_DONE)) 489 { 490 LSQ_DEBUG("calling on_new_stream"); 491 SM_HISTORY_APPEND(stream, SHE_ONNEW); 492 stream->stream_flags |= STREAM_ONNEW_DONE; 493 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 494 stream); 495 } 496} 497 498 499static void 500decr_conn_cap (struct lsquic_stream *stream, size_t incr) 501{ 502 if (stream->sm_bflags & SMBF_CONN_LIMITED) 503 { 504 assert(stream->conn_pub->conn_cap.cc_sent >= incr); 505 stream->conn_pub->conn_cap.cc_sent -= incr; 506 } 507} 508 509 510static void 511maybe_resize_stream_buffer (struct lsquic_stream *stream) 512{ 513 assert(0 == stream->sm_n_buffered); 514 515 if (stream->sm_n_allocated < stream->conn_pub->path->np_pack_size) 516 { 517 free(stream->sm_buf); 518 stream->sm_buf = NULL; 519 stream->sm_n_allocated = 0; 520 } 521 else if (stream->sm_n_allocated > stream->conn_pub->path->np_pack_size) 522 stream->sm_n_allocated = stream->conn_pub->path->np_pack_size; 523} 524 525 526static void 527drop_buffered_data (struct lsquic_stream *stream) 528{ 529 decr_conn_cap(stream, stream->sm_n_buffered); 530 stream->sm_n_buffered = 0; 531 maybe_resize_stream_buffer(stream); 532 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 533 maybe_remove_from_write_q(stream, SMQF_WRITE_Q_FLAGS); 534} 535 536 537static void 538destroy_uh (struct lsquic_stream *stream) 539{ 540 if (stream->uh) 541 { 542 if (stream->uh->uh_hset) 543 stream->conn_pub->enpub->enp_hsi_if 544 ->hsi_discard_header_set(stream->uh->uh_hset); 545 free(stream->uh); 546 stream->uh = NULL; 547 } 548} 549 550 551void 552lsquic_stream_destroy (lsquic_stream_t *stream) 553{ 554 struct push_promise *promise; 555 struct stream_hq_frame *shf; 556 557 stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE; 558 if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) == 559 STREAM_ONNEW_DONE) 560 { 561 stream->stream_flags |= STREAM_ONCLOSE_DONE; 562 stream->stream_if->on_close(stream, stream->st_ctx); 563 } 564 if (stream->sm_qflags & SMQF_SENDING_FLAGS) 565 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 566 if (stream->sm_qflags & SMQF_WANT_READ) 567 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream); 568 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 569 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream); 570 if (stream->sm_qflags & SMQF_SERVICE_FLAGS) 571 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream); 572 if (stream->sm_qflags & SMQF_QPACK_DEC) 573 lsquic_qdh_unref_stream(stream->conn_pub->u.ietf.qdh, stream); 574 if (stream->sm_qflags & SMQF_H3_PRIO) 575 lsquic_prio_tree_remove_stream(stream->conn_pub->u.ietf.prio_tree, 576 stream, lsquic_time_now() /* XXX: do we have to call this? */); 577 drop_buffered_data(stream); 578 lsquic_sfcw_consume_rem(&stream->fc); 579 drop_frames_in(stream); 580 if (stream->push_req) 581 { 582 if (stream->push_req->uh_hset) 583 stream->conn_pub->enpub->enp_hsi_if 584 ->hsi_discard_header_set(stream->push_req->uh_hset); 585 free(stream->push_req); 586 } 587 while ((promise = SLIST_FIRST(&stream->sm_promises))) 588 { 589 SLIST_REMOVE_HEAD(&stream->sm_promises, pp_next); 590 lsquic_pp_put(promise, stream->conn_pub->u.ietf.promises); 591 } 592 if (stream->sm_promise) 593 { 594 assert(stream->sm_promise->pp_pushed_stream == stream); 595 stream->sm_promise->pp_pushed_stream = NULL; 596 lsquic_pp_put(stream->sm_promise, stream->conn_pub->u.ietf.promises); 597 } 598 while ((shf = STAILQ_FIRST(&stream->sm_hq_frames))) 599 stream_hq_frame_put(stream, shf); 600 destroy_uh(stream); 601 free(stream->sm_buf); 602 free(stream->sm_header_block); 603 LSQ_DEBUG("destroyed stream"); 604 SM_HISTORY_DUMP_REMAINING(stream); 605 free(stream); 606} 607 608 609static int 610stream_is_finished (const lsquic_stream_t *stream) 611{ 612 return lsquic_stream_is_closed(stream) 613 /* n_unacked checks that no outgoing packets that reference this 614 * stream are outstanding: 615 */ 616 && 0 == stream->n_unacked 617 /* This checks that no packets that reference this stream will 618 * become outstanding: 619 */ 620 && 0 == (stream->sm_qflags & SMQF_SEND_RST) 621 && ((stream->stream_flags & STREAM_FORCE_FINISH) 622 || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))); 623} 624 625 626static void 627maybe_finish_stream (lsquic_stream_t *stream) 628{ 629 if (0 == (stream->stream_flags & STREAM_FINISHED) && 630 stream_is_finished(stream)) 631 { 632 LSQ_DEBUG("stream is now finished"); 633 SM_HISTORY_APPEND(stream, SHE_FINISHED); 634 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 635 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 636 next_service_stream); 637 stream->sm_qflags |= SMQF_FREE_STREAM; 638 stream->stream_flags |= STREAM_FINISHED; 639 } 640} 641 642 643static void 644maybe_schedule_call_on_close (lsquic_stream_t *stream) 645{ 646 if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE| 647 STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) 648 == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE) 649 && !(stream->sm_qflags & SMQF_CALL_ONCLOSE)) 650 { 651 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 652 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 653 next_service_stream); 654 stream->sm_qflags |= SMQF_CALL_ONCLOSE; 655 LSQ_DEBUG("scheduled calling on_close"); 656 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED); 657 } 658} 659 660 661void 662lsquic_stream_call_on_close (lsquic_stream_t *stream) 663{ 664 assert(stream->stream_flags & STREAM_ONNEW_DONE); 665 stream->sm_qflags &= ~SMQF_CALL_ONCLOSE; 666 if (!(stream->sm_qflags & SMQF_SERVICE_FLAGS)) 667 TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, 668 next_service_stream); 669 if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE)) 670 { 671 LSQ_DEBUG("calling on_close"); 672 stream->stream_flags |= STREAM_ONCLOSE_DONE; 673 SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL); 674 stream->stream_if->on_close(stream, stream->st_ctx); 675 } 676 else 677 assert(0); 678} 679 680 681static int 682stream_readable_non_http (struct lsquic_stream *stream) 683{ 684 return stream->data_in->di_if->di_get_frame(stream->data_in, 685 stream->read_offset) != NULL; 686} 687 688 689static int 690stream_readable_http_gquic (struct lsquic_stream *stream) 691{ 692 return (stream->stream_flags & STREAM_HAVE_UH) 693 && (stream->uh 694 || stream->data_in->di_if->di_get_frame(stream->data_in, 695 stream->read_offset)); 696} 697 698 699static int 700stream_readable_http_ietf (struct lsquic_stream *stream) 701{ 702 return 703 /* If we have read the header set and the header set has not yet 704 * been read, the stream is readable. 705 */ 706 ((stream->stream_flags & STREAM_HAVE_UH) && stream->uh) 707 || 708 /* Alternatively, run the filter and check for payload availability. */ 709 (stream->sm_sfi->sfi_readable(stream) 710 && (/* Running the filter may result in hitting FIN: */ 711 (stream->stream_flags & STREAM_FIN_REACHED) 712 || stream->data_in->di_if->di_get_frame(stream->data_in, 713 stream->read_offset))); 714} 715 716 717int 718lsquic_stream_readable (struct lsquic_stream *stream) 719{ 720 /* A stream is readable if one of the following is true: */ 721 return 722 /* - It is already finished: in that case, lsquic_stream_read() will 723 * return 0. 724 */ 725 (stream->stream_flags & STREAM_FIN_REACHED) 726 /* - The stream is reset, by either side. In this case, 727 * lsquic_stream_read() will return -1 (we want the user to be 728 * able to collect the error). 729 */ 730 || lsquic_stream_is_reset(stream) 731 /* Type-dependent readability check: */ 732 || stream->sm_readable(stream); 733 ; 734} 735 736 737static size_t 738stream_write_avail (struct lsquic_stream *stream) 739{ 740 uint64_t stream_avail, conn_avail; 741 size_t hq_frames_sz; 742 743 stream_avail = stream->max_send_off - stream->tosend_off 744 - stream->sm_n_buffered; 745 746 if (stream->sm_bflags & SMBF_CONN_LIMITED) 747 { 748 conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap); 749 if (conn_avail < stream_avail) 750 stream_avail = conn_avail; 751 } 752 753 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 754 == (SMBF_IETF|SMBF_USE_HEADERS)) 755 { 756 hq_frames_sz = active_hq_frame_sizes(stream); 757 if (hq_frames_sz == 0) 758 hq_frames_sz = 3; /* Smallest new frame */ 759 760 if (stream_avail > hq_frames_sz) 761 stream_avail -= hq_frames_sz; 762 else 763 stream_avail = 0; 764 } 765 766 return stream_avail; 767} 768 769 770static int 771stream_is_pushing_promise (const struct lsquic_stream *stream) 772{ 773 return (stream->stream_flags & STREAM_PUSHING) 774 && SLIST_FIRST(&stream->sm_promises) 775 && (SLIST_FIRST(&stream->sm_promises))->pp_write_state != PPWS_DONE 776 ; 777} 778 779 780/* To prevent deadlocks, ensure that when headers are sent, the bytes 781 * sent on the encoder stream are written first. 782 * 783 * XXX If the encoder is set up in non-risking mode, it is perfectly 784 * fine to send the header block first. TODO: update the logic to 785 * reflect this. There should be two sending behaviors: risk and non-risk. 786 * For now, we assume risk for everything to be on the conservative side. 787 */ 788static size_t 789stream_write_avail_with_headers (struct lsquic_stream *stream) 790{ 791 if (stream->stream_flags & STREAM_PUSHING) 792 return stream_write_avail(stream); 793 794 switch (stream->sm_send_headers_state) 795 { 796 case SSHS_BEGIN: 797 return lsquic_qeh_write_avail(stream->conn_pub->u.ietf.qeh); 798 case SSHS_ENC_SENDING: 799 if (stream->sm_hb_compl > 800 lsquic_qeh_enc_off(stream->conn_pub->u.ietf.qeh)) 801 return 0; 802 LSQ_DEBUG("encoder stream bytes have all been sent"); 803 stream->sm_send_headers_state = SSHS_HBLOCK_SENDING; 804 /* fall-through */ 805 default: 806 assert(SSHS_HBLOCK_SENDING == stream->sm_send_headers_state); 807 return stream_write_avail(stream); 808 } 809} 810 811 812size_t 813lsquic_stream_write_avail (struct lsquic_stream *stream) 814{ 815 return stream->sm_write_avail(stream); 816} 817 818 819int 820lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off) 821{ 822 struct lsquic_conn *lconn; 823 824 if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) && 825 !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off)) 826 { 827 if (stream->sm_bflags & SMBF_IETF) 828 { 829 lconn = stream->conn_pub->lconn; 830 lconn->cn_if->ci_abort_error(lconn, 0, TEC_FLOW_CONTROL_ERROR, 831 "flow control violation on stream %"PRIu64, stream->id); 832 } 833 return -1; 834 } 835 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 836 { 837 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 838 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 839 next_send_stream); 840 stream->sm_qflags |= SMQF_SEND_WUF; 841 } 842 return 0; 843} 844 845 846int 847lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) 848{ 849 uint64_t max_off; 850 int got_next_offset, rv, free_frame; 851 enum ins_frame ins_frame; 852 853 assert(frame->packet_in); 854 855 SM_HISTORY_APPEND(stream, SHE_FRAME_IN); 856 LSQ_DEBUG("received stream frame, offset 0x%"PRIX64", len %u; " 857 "fin: %d", frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin); 858 859 if ((stream->sm_bflags & SMBF_USE_HEADERS) 860 && (stream->stream_flags & STREAM_HEAD_IN_FIN)) 861 { 862 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 863 lsquic_malo_put(frame); 864 return -1; 865 } 866 867 got_next_offset = frame->data_frame.df_offset == stream->read_offset; 868 insert_frame: 869 ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset); 870 if (INS_FRAME_OK == ins_frame) 871 { 872 /* Update maximum offset in the flow controller and check for flow 873 * control violation: 874 */ 875 rv = -1; 876 free_frame = !stream->data_in->di_if->di_own_on_ok; 877 max_off = frame->data_frame.df_offset + frame->data_frame.df_size; 878 if (0 != lsquic_stream_update_sfcw(stream, max_off)) 879 goto end_ok; 880 if (frame->data_frame.df_fin) 881 { 882 SM_HISTORY_APPEND(stream, SHE_FIN_IN); 883 stream->stream_flags |= STREAM_FIN_RECVD; 884 stream->sm_fin_off = DF_END(frame); 885 maybe_finish_stream(stream); 886 } 887 if ((stream->sm_bflags & SMBF_AUTOSWITCH) && 888 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 889 { 890 stream->data_in = stream->data_in->di_if->di_switch_impl( 891 stream->data_in, stream->read_offset); 892 if (!stream->data_in) 893 { 894 stream->data_in = data_in_error_new(); 895 goto end_ok; 896 } 897 } 898 if (got_next_offset) 899 /* Checking the offset saves di_get_frame() call */ 900 maybe_conn_to_tickable_if_readable(stream); 901 rv = 0; 902 end_ok: 903 if (free_frame) 904 lsquic_malo_put(frame); 905 return rv; 906 } 907 else if (INS_FRAME_DUP == ins_frame) 908 { 909 return 0; 910 } 911 else if (INS_FRAME_OVERLAP == ins_frame) 912 { 913 LSQ_DEBUG("overlap: switching DATA IN implementation"); 914 stream->data_in = stream->data_in->di_if->di_switch_impl( 915 stream->data_in, stream->read_offset); 916 if (stream->data_in) 917 goto insert_frame; 918 stream->data_in = data_in_error_new(); 919 lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in); 920 lsquic_malo_put(frame); 921 return -1; 922 } 923 else 924 { 925 assert(INS_FRAME_ERR == ins_frame); 926 return -1; 927 } 928} 929 930 931static void 932drop_frames_in (lsquic_stream_t *stream) 933{ 934 if (stream->data_in) 935 { 936 stream->data_in->di_if->di_destroy(stream->data_in); 937 /* To avoid checking whether `data_in` is set, just set to the error 938 * data-in stream. It does the right thing after incoming data is 939 * dropped. 940 */ 941 stream->data_in = data_in_error_new(); 942 } 943} 944 945 946static void 947maybe_elide_stream_frames (struct lsquic_stream *stream) 948{ 949 if (!(stream->stream_flags & STREAM_FRAMES_ELIDED)) 950 { 951 if (stream->n_unacked) 952 lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl, 953 stream->id); 954 stream->stream_flags |= STREAM_FRAMES_ELIDED; 955 } 956} 957 958 959int 960lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, 961 uint64_t error_code) 962{ 963 964 if (stream->stream_flags & STREAM_RST_RECVD) 965 { 966 LSQ_DEBUG("ignore duplicate RST_STREAM frame"); 967 return 0; 968 } 969 970 SM_HISTORY_APPEND(stream, SHE_RST_IN); 971 /* This flag must always be set, even if we are "ignoring" it: it is 972 * used by elision code. 973 */ 974 stream->stream_flags |= STREAM_RST_RECVD; 975 976 if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset) 977 { 978 LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64" is " 979 "smaller than that of byte following the last byte we have seen: " 980 "0x%"PRIX64, offset, 981 lsquic_sfcw_get_max_recv_off(&stream->fc)); 982 return -1; 983 } 984 985 if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset)) 986 { 987 LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64 988 " violates flow control", offset); 989 return -1; 990 } 991 992 /* Let user collect error: */ 993 maybe_conn_to_tickable_if_readable(stream); 994 995 lsquic_sfcw_consume_rem(&stream->fc); 996 drop_frames_in(stream); 997 drop_buffered_data(stream); 998 maybe_elide_stream_frames(stream); 999 1000 if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT)) 1001 && !(stream->sm_qflags & SMQF_SEND_RST)) 1002 lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); 1003 1004 stream->stream_flags |= STREAM_RST_RECVD; 1005 1006 maybe_finish_stream(stream); 1007 maybe_schedule_call_on_close(stream); 1008 1009 return 0; 1010} 1011 1012 1013void 1014lsquic_stream_stop_sending_in (struct lsquic_stream *stream, 1015 uint64_t error_code) 1016{ 1017 if (stream->stream_flags & STREAM_SS_RECVD) 1018 { 1019 LSQ_DEBUG("ignore duplicate STOP_SENDING frame"); 1020 return; 1021 } 1022 1023 SM_HISTORY_APPEND(stream, SHE_SS_IN); 1024 stream->stream_flags |= STREAM_SS_RECVD; 1025 1026 /* Let user collect error: */ 1027 maybe_conn_to_tickable_if_readable(stream); 1028 1029 lsquic_sfcw_consume_rem(&stream->fc); 1030 drop_frames_in(stream); 1031 drop_buffered_data(stream); 1032 maybe_elide_stream_frames(stream); 1033 1034 if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT)) 1035 && !(stream->sm_qflags & SMQF_SEND_RST)) 1036 lsquic_stream_reset_ext(stream, error_code, 0); 1037 1038 maybe_finish_stream(stream); 1039 maybe_schedule_call_on_close(stream); 1040} 1041 1042 1043uint64_t 1044lsquic_stream_fc_recv_off_const (const struct lsquic_stream *stream) 1045{ 1046 return lsquic_sfcw_get_fc_recv_off(&stream->fc); 1047} 1048 1049 1050void 1051lsquic_stream_max_stream_data_sent (struct lsquic_stream *stream) 1052{ 1053 assert(stream->sm_qflags & SMQF_SEND_MAX_STREAM_DATA); 1054 stream->sm_qflags &= ~SMQF_SEND_MAX_STREAM_DATA; 1055 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1056 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1057 stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc); 1058} 1059 1060 1061uint64_t 1062lsquic_stream_fc_recv_off (lsquic_stream_t *stream) 1063{ 1064 assert(stream->sm_qflags & SMQF_SEND_WUF); 1065 stream->sm_qflags &= ~SMQF_SEND_WUF; 1066 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1067 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1068 return stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc); 1069} 1070 1071 1072void 1073lsquic_stream_peer_blocked (struct lsquic_stream *stream, uint64_t peer_off) 1074{ 1075 uint64_t last_off; 1076 1077 if (stream->sm_last_recv_off) 1078 last_off = stream->sm_last_recv_off; 1079 else 1080 /* This gets advertized in transport parameters */ 1081 last_off = lsquic_sfcw_get_max_recv_off(&stream->fc); 1082 1083 LSQ_DEBUG("Peer blocked at %"PRIu64", while the last MAX_STREAM_DATA " 1084 "frame we sent advertized the limit of %"PRIu64, peer_off, last_off); 1085 1086 if (peer_off > last_off && !(stream->sm_qflags & SMQF_SEND_WUF)) 1087 { 1088 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1089 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1090 next_send_stream); 1091 stream->sm_qflags |= SMQF_SEND_WUF; 1092 LSQ_DEBUG("marked to send MAX_STREAM_DATA frame"); 1093 } 1094 else if (stream->sm_qflags & SMQF_SEND_WUF) 1095 LSQ_DEBUG("MAX_STREAM_DATA frame is already scheduled"); 1096 else if (stream->sm_last_recv_off) 1097 LSQ_DEBUG("MAX_STREAM_DATA(%"PRIu64") has already been either " 1098 "packetized or sent", stream->sm_last_recv_off); 1099 else 1100 LSQ_INFO("Peer should have receive transport param limit " 1101 "of %"PRIu64"; odd.", last_off); 1102} 1103 1104 1105/* GQUIC's BLOCKED frame does not have an offset */ 1106void 1107lsquic_stream_peer_blocked_gquic (struct lsquic_stream *stream) 1108{ 1109 LSQ_DEBUG("Peer blocked: schedule another WINDOW_UPDATE frame"); 1110 if (!(stream->sm_qflags & SMQF_SEND_WUF)) 1111 { 1112 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1113 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1114 next_send_stream); 1115 stream->sm_qflags |= SMQF_SEND_WUF; 1116 LSQ_DEBUG("marked to send MAX_STREAM_DATA frame"); 1117 } 1118 else 1119 LSQ_DEBUG("WINDOW_UPDATE frame is already scheduled"); 1120} 1121 1122 1123void 1124lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream) 1125{ 1126 assert(stream->sm_qflags & SMQF_SEND_BLOCKED); 1127 SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT); 1128 stream->sm_qflags &= ~SMQF_SEND_BLOCKED; 1129 stream->stream_flags |= STREAM_BLOCKED_SENT; 1130 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1131 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1132} 1133 1134 1135void 1136lsquic_stream_rst_frame_sent (lsquic_stream_t *stream) 1137{ 1138 assert(stream->sm_qflags & SMQF_SEND_RST); 1139 SM_HISTORY_APPEND(stream, SHE_RST_OUT); 1140 stream->sm_qflags &= ~SMQF_SEND_RST; 1141 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1142 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); 1143 stream->stream_flags |= STREAM_RST_SENT; 1144 maybe_finish_stream(stream); 1145} 1146 1147 1148static size_t 1149read_uh (struct lsquic_stream *stream, 1150 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1151{ 1152 struct http1x_headers *const h1h = stream->uh->uh_hset; 1153 size_t nread; 1154 1155 nread = readf(ctx, (unsigned char *) h1h->h1h_buf + h1h->h1h_off, 1156 h1h->h1h_size - h1h->h1h_off, 1157 (stream->stream_flags & STREAM_HEAD_IN_FIN) > 0); 1158 h1h->h1h_off += nread; 1159 if (h1h->h1h_off == h1h->h1h_size) 1160 { 1161 LSQ_DEBUG("read all uncompressed headers"); 1162 destroy_uh(stream); 1163 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 1164 { 1165 stream->stream_flags |= STREAM_FIN_REACHED; 1166 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 1167 } 1168 } 1169 return nread; 1170} 1171 1172 1173static void 1174stream_consumed_bytes (struct lsquic_stream *stream) 1175{ 1176 lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset); 1177 if (lsquic_sfcw_fc_offsets_changed(&stream->fc)) 1178 { 1179 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1180 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1181 next_send_stream); 1182 stream->sm_qflags |= SMQF_SEND_WUF; 1183 maybe_conn_to_tickable_if_writeable(stream, 1); 1184 } 1185} 1186 1187 1188struct read_frames_status 1189{ 1190 int error; 1191 int processed_frames; 1192 size_t total_nread; 1193}; 1194 1195 1196static struct read_frames_status 1197read_data_frames (struct lsquic_stream *stream, int do_filtering, 1198 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1199{ 1200 struct data_frame *data_frame; 1201 size_t nread, toread, total_nread; 1202 int short_read, processed_frames; 1203 1204 processed_frames = 0; 1205 total_nread = 0; 1206 1207 while ((data_frame = stream->data_in->di_if->di_get_frame( 1208 stream->data_in, stream->read_offset))) 1209 { 1210 1211 ++processed_frames; 1212 1213 do 1214 { 1215 if (do_filtering && stream->sm_sfi) 1216 toread = stream->sm_sfi->sfi_filter_df(stream, data_frame); 1217 else 1218 toread = data_frame->df_size - data_frame->df_read_off; 1219 1220 if (toread || data_frame->df_fin) 1221 { 1222 nread = readf(ctx, data_frame->df_data + data_frame->df_read_off, 1223 toread, data_frame->df_fin); 1224 if (do_filtering && stream->sm_sfi) 1225 stream->sm_sfi->sfi_decr_left(stream, nread); 1226 data_frame->df_read_off += nread; 1227 stream->read_offset += nread; 1228 total_nread += nread; 1229 short_read = nread < toread; 1230 } 1231 else 1232 short_read = 0; 1233 1234 if (data_frame->df_read_off == data_frame->df_size) 1235 { 1236 const int fin = data_frame->df_fin; 1237 stream->data_in->di_if->di_frame_done(stream->data_in, data_frame); 1238 data_frame = NULL; 1239 if ((stream->sm_bflags & SMBF_AUTOSWITCH) && 1240 (stream->data_in->di_flags & DI_SWITCH_IMPL)) 1241 { 1242 stream->data_in = stream->data_in->di_if->di_switch_impl( 1243 stream->data_in, stream->read_offset); 1244 if (!stream->data_in) 1245 { 1246 stream->data_in = data_in_error_new(); 1247 return (struct read_frames_status) { .error = 1, }; 1248 } 1249 } 1250 if (fin) 1251 { 1252 stream->stream_flags |= STREAM_FIN_REACHED; 1253 goto end_while; 1254 } 1255 } 1256 else if (short_read) 1257 goto end_while; 1258 } 1259 while (data_frame); 1260 } 1261 end_while: 1262 1263 if (processed_frames) 1264 stream_consumed_bytes(stream); 1265 1266 return (struct read_frames_status) { 1267 .error = 0, 1268 .processed_frames = processed_frames, 1269 .total_nread = total_nread, 1270 }; 1271} 1272 1273 1274static ssize_t 1275stream_readf (struct lsquic_stream *stream, 1276 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1277{ 1278 size_t total_nread, nread; 1279 int read_unc_headers; 1280 1281 total_nread = 0; 1282 1283 if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF)) 1284 == (SMBF_USE_HEADERS|SMBF_IETF) 1285 && !(stream->stream_flags & STREAM_HAVE_UH) 1286 && !stream->uh) 1287 { 1288 if (stream->sm_readable(stream)) 1289 { 1290 if (stream->sm_hq_filter.hqfi_flags & HQFI_FLAG_ERROR) 1291 { 1292 LSQ_INFO("HQ filter hit an error: cannot read from stream"); 1293 errno = EBADMSG; 1294 return -1; 1295 } 1296 assert(stream->uh); 1297 } 1298 else 1299 { 1300 errno = EWOULDBLOCK; 1301 return -1; 1302 } 1303 } 1304 1305 if (stream->uh) 1306 { 1307 if (stream->uh->uh_flags & UH_H1H) 1308 { 1309 nread = read_uh(stream, readf, ctx); 1310 read_unc_headers = nread > 0; 1311 total_nread += nread; 1312 if (stream->uh) 1313 return total_nread; 1314 } 1315 else 1316 { 1317 LSQ_INFO("header set not claimed: cannot read from stream"); 1318 return -1; 1319 } 1320 } 1321 else if ((stream->sm_bflags & SMBF_USE_HEADERS) 1322 && !(stream->stream_flags & STREAM_HAVE_UH)) 1323 { 1324 LSQ_DEBUG("cannot read: headers not available"); 1325 errno = EWOULDBLOCK; 1326 return -1; 1327 } 1328 else 1329 read_unc_headers = 0; 1330 1331 const struct read_frames_status rfs 1332 = read_data_frames(stream, 1, readf, ctx); 1333 if (rfs.error) 1334 return -1; 1335 total_nread += rfs.total_nread; 1336 1337 LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__, 1338 total_nread, stream->read_offset); 1339 1340 if (rfs.processed_frames || read_unc_headers) 1341 { 1342 return total_nread; 1343 } 1344 else 1345 { 1346 assert(0 == total_nread); 1347 errno = EWOULDBLOCK; 1348 return -1; 1349 } 1350} 1351 1352 1353/* This function returns 0 when EOF is reached. 1354 */ 1355ssize_t 1356lsquic_stream_readf (struct lsquic_stream *stream, 1357 size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx) 1358{ 1359 SM_HISTORY_APPEND(stream, SHE_USER_READ); 1360 1361 if (lsquic_stream_is_reset(stream)) 1362 { 1363 if (stream->stream_flags & STREAM_RST_RECVD) 1364 stream->stream_flags |= STREAM_RST_READ; 1365 errno = ECONNRESET; 1366 return -1; 1367 } 1368 if (stream->stream_flags & STREAM_U_READ_DONE) 1369 { 1370 errno = EBADF; 1371 return -1; 1372 } 1373 if ((stream->stream_flags & STREAM_FIN_REACHED) 1374 && 0 == (!!(stream->stream_flags & STREAM_HAVE_UH) 1375 ^ !!(stream->sm_bflags & SMBF_USE_HEADERS))) 1376 return 0; 1377 1378 return stream_readf(stream, readf, ctx); 1379} 1380 1381 1382struct readv_ctx 1383{ 1384 const struct iovec *iov; 1385 const struct iovec *const end; 1386 unsigned char *p; 1387}; 1388 1389 1390static size_t 1391readv_f (void *ctx_p, const unsigned char *buf, size_t len, int fin) 1392{ 1393 struct readv_ctx *const ctx = ctx_p; 1394 const unsigned char *const end = buf + len; 1395 size_t ntocopy; 1396 1397 while (ctx->iov < ctx->end && buf < end) 1398 { 1399 ntocopy = (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len 1400 - ctx->p; 1401 if (ntocopy > (size_t) (end - buf)) 1402 ntocopy = end - buf; 1403 memcpy(ctx->p, buf, ntocopy); 1404 ctx->p += ntocopy; 1405 buf += ntocopy; 1406 if (ctx->p == (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len) 1407 { 1408 do 1409 ++ctx->iov; 1410 while (ctx->iov < ctx->end && ctx->iov->iov_len == 0); 1411 if (ctx->iov < ctx->end) 1412 ctx->p = ctx->iov->iov_base; 1413 else 1414 ctx->p = NULL; 1415 } 1416 } 1417 1418 return len - (end - buf); 1419} 1420 1421 1422ssize_t 1423lsquic_stream_readv (struct lsquic_stream *stream, const struct iovec *iov, 1424 int iovcnt) 1425{ 1426 struct readv_ctx ctx = { iov, iov + iovcnt, iov->iov_base, }; 1427 return lsquic_stream_readf(stream, readv_f, &ctx); 1428} 1429 1430 1431ssize_t 1432lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) 1433{ 1434 struct iovec iov = { .iov_base = buf, .iov_len = len, }; 1435 return lsquic_stream_readv(stream, &iov, 1); 1436} 1437 1438 1439static void 1440stream_shutdown_read (lsquic_stream_t *stream) 1441{ 1442 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1443 { 1444 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); 1445 stream->stream_flags |= STREAM_U_READ_DONE; 1446 stream_wantread(stream, 0); 1447 maybe_finish_stream(stream); 1448 } 1449} 1450 1451 1452static void 1453stream_shutdown_write (lsquic_stream_t *stream) 1454{ 1455 if (stream->stream_flags & STREAM_U_WRITE_DONE) 1456 return; 1457 1458 SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE); 1459 stream->stream_flags |= STREAM_U_WRITE_DONE; 1460 stream_wantwrite(stream, 0); 1461 1462 /* Don't bother to check whether there is anything else to write if 1463 * the flags indicate that nothing else should be written. 1464 */ 1465 if (!(stream->sm_bflags & SMBF_CRYPTO) 1466 && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT)) 1467 && !(stream->sm_qflags & SMQF_SEND_RST)) 1468 { 1469 if (stream->sm_n_buffered == 0) 1470 { 1471 if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl, 1472 stream)) 1473 { 1474 LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame"); 1475 stream->stream_flags |= STREAM_FIN_SENT; 1476 } 1477 else 1478 { 1479 LSQ_DEBUG("have to create a separate STREAM frame with FIN " 1480 "flag in it"); 1481 (void) stream_flush_nocheck(stream); 1482 } 1483 } 1484 else 1485 (void) stream_flush_nocheck(stream); 1486 } 1487} 1488 1489 1490static void 1491maybe_stream_shutdown_write (struct lsquic_stream *stream) 1492{ 1493 if (stream->sm_send_headers_state == SSHS_BEGIN) 1494 stream_shutdown_write(stream); 1495 else if (0 == (stream->stream_flags & STREAM_DELAYED_SW)) 1496 { 1497 LSQ_DEBUG("shutdown delayed"); 1498 SM_HISTORY_APPEND(stream, SHE_DELAY_SW); 1499 stream->stream_flags |= STREAM_DELAYED_SW; 1500 } 1501} 1502 1503 1504int 1505lsquic_stream_shutdown (lsquic_stream_t *stream, int how) 1506{ 1507 LSQ_DEBUG("shutdown; how: %d", how); 1508 if (lsquic_stream_is_closed(stream)) 1509 { 1510 LSQ_INFO("Attempt to shut down a closed stream"); 1511 errno = EBADF; 1512 return -1; 1513 } 1514 /* 0: read, 1: write: 2: read and write 1515 */ 1516 if (how < 0 || how > 2) 1517 { 1518 errno = EINVAL; 1519 return -1; 1520 } 1521 1522 if (how) 1523 maybe_stream_shutdown_write(stream); 1524 if (how != 1) 1525 stream_shutdown_read(stream); 1526 1527 maybe_finish_stream(stream); 1528 maybe_schedule_call_on_close(stream); 1529 if (how && !(stream->stream_flags & STREAM_DELAYED_SW)) 1530 maybe_conn_to_tickable_if_writeable(stream, 1); 1531 1532 return 0; 1533} 1534 1535 1536void 1537lsquic_stream_shutdown_internal (lsquic_stream_t *stream) 1538{ 1539 LSQ_DEBUG("internal shutdown"); 1540 if (lsquic_stream_is_critical(stream)) 1541 { 1542 LSQ_DEBUG("add flag to force-finish special stream"); 1543 stream->stream_flags |= STREAM_FORCE_FINISH; 1544 SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH); 1545 } 1546 maybe_finish_stream(stream); 1547 maybe_schedule_call_on_close(stream); 1548} 1549 1550 1551static void 1552fake_reset_unused_stream (lsquic_stream_t *stream) 1553{ 1554 stream->stream_flags |= 1555 STREAM_RST_RECVD /* User will pick this up on read or write */ 1556 | STREAM_RST_SENT /* Don't send anything else on this stream */ 1557 ; 1558 1559 /* Cancel all writes to the network scheduled for this stream: */ 1560 if (stream->sm_qflags & SMQF_SENDING_FLAGS) 1561 TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, 1562 next_send_stream); 1563 stream->sm_qflags &= ~SMQF_SENDING_FLAGS; 1564 1565 LSQ_DEBUG("fake-reset stream%s", 1566 stream_stalled(stream) ? " (stalled)" : ""); 1567 maybe_finish_stream(stream); 1568 maybe_schedule_call_on_close(stream); 1569} 1570 1571 1572/* This function should only be called for locally-initiated streams whose ID 1573 * is larger than that received in GOAWAY frame. This may occur when GOAWAY 1574 * frame sent by peer but we have not yet received it and created a stream. 1575 * In this situation, we mark the stream as reset, so that user's on_read or 1576 * on_write event callback picks up the error. That, in turn, should result 1577 * in stream being closed. 1578 * 1579 * If we have received any data frames on this stream, this probably indicates 1580 * a bug in peer code: it should not have sent GOAWAY frame with stream ID 1581 * lower than this. However, we still try to handle it gracefully and peform 1582 * a shutdown, as if the stream was not reset. 1583 */ 1584void 1585lsquic_stream_received_goaway (lsquic_stream_t *stream) 1586{ 1587 SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN); 1588 if (0 == stream->read_offset && 1589 stream->data_in->di_if->di_empty(stream->data_in)) 1590 fake_reset_unused_stream(stream); /* Normal condition */ 1591 else 1592 { /* This is odd, let's handle it the best we can: */ 1593 LSQ_WARN("GOAWAY received but have incoming data: shut down instead"); 1594 lsquic_stream_shutdown_internal(stream); 1595 } 1596} 1597 1598 1599uint64_t 1600lsquic_stream_read_offset (const lsquic_stream_t *stream) 1601{ 1602 return stream->read_offset; 1603} 1604 1605 1606static int 1607stream_wantread (lsquic_stream_t *stream, int is_want) 1608{ 1609 const int old_val = !!(stream->sm_qflags & SMQF_WANT_READ); 1610 const int new_val = !!is_want; 1611 if (old_val != new_val) 1612 { 1613 if (new_val) 1614 { 1615 if (!old_val) 1616 TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream, 1617 next_read_stream); 1618 stream->sm_qflags |= SMQF_WANT_READ; 1619 } 1620 else 1621 { 1622 stream->sm_qflags &= ~SMQF_WANT_READ; 1623 if (old_val) 1624 TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, 1625 next_read_stream); 1626 } 1627 } 1628 return old_val; 1629} 1630 1631 1632static void 1633maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_q_flags flag) 1634{ 1635 assert(SMQF_WRITE_Q_FLAGS & flag); 1636 if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS)) 1637 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 1638 next_write_stream); 1639 stream->sm_qflags |= flag; 1640} 1641 1642 1643static void 1644maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag) 1645{ 1646 assert(SMQF_WRITE_Q_FLAGS & flag); 1647 if (stream->sm_qflags & flag) 1648 { 1649 stream->sm_qflags &= ~flag; 1650 if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS)) 1651 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 1652 next_write_stream); 1653 } 1654} 1655 1656 1657static int 1658stream_wantwrite (struct lsquic_stream *stream, int new_val) 1659{ 1660 const int old_val = !!(stream->sm_qflags & SMQF_WANT_WRITE); 1661 1662 assert(0 == (new_val & ~1)); /* new_val is either 0 or 1 */ 1663 1664 if (old_val != new_val) 1665 { 1666 if (new_val) 1667 maybe_put_onto_write_q(stream, SMQF_WANT_WRITE); 1668 else 1669 maybe_remove_from_write_q(stream, SMQF_WANT_WRITE); 1670 } 1671 return old_val; 1672} 1673 1674 1675int 1676lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) 1677{ 1678 if (!(stream->stream_flags & STREAM_U_READ_DONE)) 1679 { 1680 if (is_want) 1681 maybe_conn_to_tickable_if_readable(stream); 1682 return stream_wantread(stream, is_want); 1683 } 1684 else 1685 { 1686 errno = EBADF; 1687 return -1; 1688 } 1689} 1690 1691 1692int 1693lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want) 1694{ 1695 int old_val; 1696 1697 is_want = !!is_want; 1698 1699 if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE) 1700 && SSHS_BEGIN == stream->sm_send_headers_state) 1701 { 1702 stream->sm_saved_want_write = is_want; 1703 if (is_want) 1704 maybe_conn_to_tickable_if_writeable(stream, 1); 1705 return stream_wantwrite(stream, is_want); 1706 } 1707 else if (SSHS_BEGIN != stream->sm_send_headers_state) 1708 { 1709 old_val = stream->sm_saved_want_write; 1710 stream->sm_saved_want_write = is_want; 1711 return old_val; 1712 } 1713 else 1714 { 1715 errno = EBADF; 1716 return -1; 1717 } 1718} 1719 1720 1721struct progress 1722{ 1723 enum stream_flags s_flags; 1724 enum stream_q_flags q_flags; 1725}; 1726 1727 1728static struct progress 1729stream_progress (const struct lsquic_stream *stream) 1730{ 1731 return (struct progress) { 1732 .s_flags = stream->stream_flags 1733 & (STREAM_U_WRITE_DONE|STREAM_U_READ_DONE), 1734 .q_flags = stream->sm_qflags 1735 & (SMQF_WANT_READ|SMQF_WANT_WRITE|SMQF_WANT_FLUSH|SMQF_SEND_RST), 1736 }; 1737} 1738 1739 1740static int 1741progress_eq (struct progress a, struct progress b) 1742{ 1743 return a.s_flags == b.s_flags && a.q_flags == b.q_flags; 1744} 1745 1746 1747static void 1748stream_dispatch_read_events_loop (lsquic_stream_t *stream) 1749{ 1750 unsigned no_progress_count, no_progress_limit; 1751 struct progress progress; 1752 uint64_t size; 1753 1754 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1755 1756 no_progress_count = 0; 1757 while ((stream->sm_qflags & SMQF_WANT_READ) 1758 && lsquic_stream_readable(stream)) 1759 { 1760 progress = stream_progress(stream); 1761 size = stream->read_offset; 1762 1763 stream->stream_if->on_read(stream, stream->st_ctx); 1764 1765 if (no_progress_limit && size == stream->read_offset && 1766 progress_eq(progress, stream_progress(stream))) 1767 { 1768 ++no_progress_count; 1769 if (no_progress_count >= no_progress_limit) 1770 { 1771 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1772 "progress) in user code reading from stream", 1773 no_progress_count, 1774 no_progress_count == 1 ? "" : "s"); 1775 break; 1776 } 1777 } 1778 else 1779 no_progress_count = 0; 1780 } 1781} 1782 1783 1784static void 1785stream_hblock_sent (struct lsquic_stream *stream) 1786{ 1787 int want_write; 1788 1789 LSQ_DEBUG("header block has been sent: restore default behavior"); 1790 stream->sm_send_headers_state = SSHS_BEGIN; 1791 stream->sm_write_avail = stream_write_avail; 1792 1793 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 1794 if (want_write != stream->sm_saved_want_write) 1795 (void) lsquic_stream_wantwrite(stream, stream->sm_saved_want_write); 1796 1797 if (stream->stream_flags & STREAM_DELAYED_SW) 1798 { 1799 LSQ_DEBUG("performing delayed shutdown write"); 1800 stream->stream_flags &= ~STREAM_DELAYED_SW; 1801 stream_shutdown_write(stream); 1802 maybe_schedule_call_on_close(stream); 1803 maybe_finish_stream(stream); 1804 maybe_conn_to_tickable_if_writeable(stream, 1); 1805 } 1806} 1807 1808 1809static void 1810on_write_header_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 1811{ 1812 ssize_t nw; 1813 1814 nw = stream_write_buf(stream, 1815 stream->sm_header_block + stream->sm_hblock_off, 1816 stream->sm_hblock_sz - stream->sm_hblock_off); 1817 if (nw > 0) 1818 { 1819 stream->sm_hblock_off += nw; 1820 if (stream->sm_hblock_off == stream->sm_hblock_sz) 1821 { 1822 stream->stream_flags |= STREAM_HEADERS_SENT; 1823 free(stream->sm_header_block); 1824 stream->sm_header_block = NULL; 1825 stream->sm_hblock_sz = 0; 1826 stream_hblock_sent(stream); 1827 LSQ_DEBUG("header block written out successfully"); 1828 /* TODO: if there was eos, do something else */ 1829 if (stream->sm_qflags & SMQF_WANT_WRITE) 1830 stream->stream_if->on_write(stream, h); 1831 } 1832 else 1833 { 1834 LSQ_DEBUG("wrote %zd bytes more of header block; not done yet", 1835 nw); 1836 } 1837 } 1838 else if (nw < 0) 1839 { 1840 /* XXX What should happen if we hit an error? TODO */ 1841 } 1842} 1843 1844 1845static void 1846(*select_on_write (struct lsquic_stream *stream))(struct lsquic_stream *, 1847 lsquic_stream_ctx_t *) 1848{ 1849 if (0 == (stream->stream_flags & STREAM_PUSHING) 1850 && SSHS_HBLOCK_SENDING != stream->sm_send_headers_state) 1851 /* Common case */ 1852 return stream->stream_if->on_write; 1853 else if (SSHS_HBLOCK_SENDING == stream->sm_send_headers_state) 1854 return on_write_header_wrapper; 1855 else 1856 { 1857 assert(stream->stream_flags & STREAM_PUSHING); 1858 if (stream_is_pushing_promise(stream)) 1859 return on_write_pp_wrapper; 1860 else if (stream->sm_dup_push_off < stream->sm_dup_push_len) 1861 return on_write_dp_wrapper; 1862 else 1863 return stream->stream_if->on_write; 1864 } 1865} 1866 1867 1868static void 1869stream_dispatch_write_events_loop (lsquic_stream_t *stream) 1870{ 1871 unsigned no_progress_count, no_progress_limit; 1872 void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *); 1873 struct progress progress; 1874 1875 no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; 1876 1877 no_progress_count = 0; 1878 stream->stream_flags |= STREAM_LAST_WRITE_OK; 1879 while ((stream->sm_qflags & SMQF_WANT_WRITE) 1880 && (stream->stream_flags & STREAM_LAST_WRITE_OK) 1881 && lsquic_stream_write_avail(stream)) 1882 { 1883 progress = stream_progress(stream); 1884 1885 on_write = select_on_write(stream); 1886 on_write(stream, stream->st_ctx); 1887 1888 if (no_progress_limit && progress_eq(progress, stream_progress(stream))) 1889 { 1890 ++no_progress_count; 1891 if (no_progress_count >= no_progress_limit) 1892 { 1893 LSQ_WARN("broke suspected infinite loop (%u callback%s without " 1894 "progress) in user code writing to stream", 1895 no_progress_count, 1896 no_progress_count == 1 ? "" : "s"); 1897 break; 1898 } 1899 } 1900 else 1901 no_progress_count = 0; 1902 } 1903} 1904 1905 1906static void 1907stream_dispatch_read_events_once (lsquic_stream_t *stream) 1908{ 1909 if ((stream->sm_qflags & SMQF_WANT_READ) && lsquic_stream_readable(stream)) 1910 { 1911 stream->stream_if->on_read(stream, stream->st_ctx); 1912 } 1913} 1914 1915 1916uint64_t 1917lsquic_stream_combined_send_off (const struct lsquic_stream *stream) 1918{ 1919 size_t frames_sizes; 1920 1921 frames_sizes = active_hq_frame_sizes(stream); 1922 return stream->tosend_off + stream->sm_n_buffered + frames_sizes; 1923} 1924 1925 1926static void 1927maybe_mark_as_blocked (lsquic_stream_t *stream) 1928{ 1929 struct lsquic_conn_cap *cc; 1930 uint64_t used; 1931 1932 used = lsquic_stream_combined_send_off(stream); 1933 if (stream->max_send_off == used) 1934 { 1935 if (stream->blocked_off < stream->max_send_off) 1936 { 1937 stream->blocked_off = used; 1938 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 1939 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 1940 next_send_stream); 1941 stream->sm_qflags |= SMQF_SEND_BLOCKED; 1942 LSQ_DEBUG("marked stream-blocked at stream offset " 1943 "%"PRIu64, stream->blocked_off); 1944 } 1945 else 1946 LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64 1947 " has been, or is about to be, sent", stream->blocked_off); 1948 } 1949 1950 if ((stream->sm_bflags & SMBF_CONN_LIMITED) 1951 && (cc = &stream->conn_pub->conn_cap, 1952 stream->sm_n_buffered == lsquic_conn_cap_avail(cc))) 1953 { 1954 if (cc->cc_blocked < cc->cc_max) 1955 { 1956 cc->cc_blocked = cc->cc_max; 1957 stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED; 1958 LSQ_DEBUG("marked connection-blocked at connection offset " 1959 "%"PRIu64, cc->cc_max); 1960 } 1961 else 1962 LSQ_DEBUG("stream has already been marked connection-blocked " 1963 "at offset %"PRIu64, cc->cc_blocked); 1964 } 1965} 1966 1967 1968void 1969lsquic_stream_dispatch_read_events (lsquic_stream_t *stream) 1970{ 1971 assert(stream->sm_qflags & SMQF_WANT_READ); 1972 1973 if (stream->sm_bflags & SMBF_RW_ONCE) 1974 stream_dispatch_read_events_once(stream); 1975 else 1976 stream_dispatch_read_events_loop(stream); 1977} 1978 1979 1980void 1981lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) 1982{ 1983 void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *); 1984 int progress; 1985 uint64_t tosend_off; 1986 unsigned short n_buffered; 1987 enum stream_q_flags q_flags; 1988 1989 assert(stream->sm_qflags & SMQF_WRITE_Q_FLAGS); 1990 q_flags = stream->sm_qflags & SMQF_WRITE_Q_FLAGS; 1991 tosend_off = stream->tosend_off; 1992 n_buffered = stream->sm_n_buffered; 1993 1994 if (stream->sm_qflags & SMQF_WANT_FLUSH) 1995 (void) stream_flush(stream); 1996 1997 if (stream->sm_bflags & SMBF_RW_ONCE) 1998 { 1999 if ((stream->sm_qflags & SMQF_WANT_WRITE) 2000 && lsquic_stream_write_avail(stream)) 2001 { 2002 on_write = select_on_write(stream); 2003 on_write(stream, stream->st_ctx); 2004 } 2005 } 2006 else 2007 stream_dispatch_write_events_loop(stream); 2008 2009 /* Progress means either flags or offsets changed: */ 2010 progress = !((stream->sm_qflags & SMQF_WRITE_Q_FLAGS) == q_flags && 2011 stream->tosend_off == tosend_off && 2012 stream->sm_n_buffered == n_buffered); 2013 2014 if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS) 2015 { 2016 if (progress) 2017 { /* Move the stream to the end of the list to ensure fairness. */ 2018 TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, 2019 next_write_stream); 2020 TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream, 2021 next_write_stream); 2022 } 2023 } 2024} 2025 2026 2027static size_t 2028inner_reader_empty_size (void *ctx) 2029{ 2030 return 0; 2031} 2032 2033 2034static size_t 2035inner_reader_empty_read (void *ctx, void *buf, size_t count) 2036{ 2037 return 0; 2038} 2039 2040 2041static int 2042stream_flush (lsquic_stream_t *stream) 2043{ 2044 struct lsquic_reader empty_reader; 2045 ssize_t nw; 2046 2047 assert(stream->sm_qflags & SMQF_WANT_FLUSH); 2048 assert(stream->sm_n_buffered > 0 || 2049 /* Flushing is also used to packetize standalone FIN: */ 2050 ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) 2051 == STREAM_U_WRITE_DONE)); 2052 2053 empty_reader.lsqr_size = inner_reader_empty_size; 2054 empty_reader.lsqr_read = inner_reader_empty_read; 2055 empty_reader.lsqr_ctx = NULL; /* pro forma */ 2056 nw = stream_write_to_packets(stream, &empty_reader, 0); 2057 2058 if (nw >= 0) 2059 { 2060 assert(nw == 0); /* Empty reader: must have read zero bytes */ 2061 return 0; 2062 } 2063 else 2064 return -1; 2065} 2066 2067 2068static int 2069stream_flush_nocheck (lsquic_stream_t *stream) 2070{ 2071 size_t frames; 2072 2073 frames = active_hq_frame_sizes(stream); 2074 stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered + frames; 2075 stream->sm_flush_to_payload = stream->sm_payload + stream->sm_n_buffered; 2076 maybe_put_onto_write_q(stream, SMQF_WANT_FLUSH); 2077 LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to); 2078 2079 return stream_flush(stream); 2080} 2081 2082 2083int 2084lsquic_stream_flush (lsquic_stream_t *stream) 2085{ 2086 if (stream->stream_flags & STREAM_U_WRITE_DONE) 2087 { 2088 LSQ_DEBUG("cannot flush closed stream"); 2089 errno = EBADF; 2090 return -1; 2091 } 2092 2093 if (0 == stream->sm_n_buffered) 2094 { 2095 LSQ_DEBUG("flushing 0 bytes: noop"); 2096 return 0; 2097 } 2098 2099 return stream_flush_nocheck(stream); 2100} 2101 2102 2103static size_t 2104stream_get_n_allowed (const struct lsquic_stream *stream) 2105{ 2106 if (stream->sm_n_allocated) 2107 return stream->sm_n_allocated; 2108 else 2109 return stream->conn_pub->path->np_pack_size; 2110} 2111 2112 2113/* The flush threshold is the maximum size of stream data that can be sent 2114 * in a full packet. 2115 */ 2116#ifdef NDEBUG 2117static 2118#endif 2119 size_t 2120lsquic_stream_flush_threshold (const struct lsquic_stream *stream, 2121 unsigned data_sz) 2122{ 2123 enum packet_out_flags flags; 2124 enum packno_bits bits; 2125 size_t packet_header_sz, stream_header_sz, tag_len; 2126 size_t threshold; 2127 2128 bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl); 2129 flags = bits << POBIT_SHIFT; 2130 if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)) 2131 flags |= PO_CONN_ID; 2132 if (stream_is_hsk(stream)) 2133 flags |= PO_LONGHEAD; 2134 2135 packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags, 2136 stream->conn_pub->path->np_dcid.len); 2137 stream_header_sz = stream->sm_frame_header_sz(stream, data_sz); 2138 tag_len = stream->conn_pub->lconn->cn_esf_c->esf_tag_len; 2139 2140 threshold = stream_get_n_allowed(stream) - tag_len 2141 - packet_header_sz - stream_header_sz; 2142 return threshold; 2143} 2144 2145 2146#define COMMON_WRITE_CHECKS() do { \ 2147 if ((stream->sm_bflags & SMBF_USE_HEADERS) \ 2148 && !(stream->stream_flags & STREAM_HEADERS_SENT)) \ 2149 { \ 2150 if (SSHS_BEGIN != stream->sm_send_headers_state) \ 2151 { \ 2152 LSQ_DEBUG("still sending headers: no writing allowed"); \ 2153 return 0; \ 2154 } \ 2155 else \ 2156 { \ 2157 LSQ_INFO("Attempt to write to stream before sending HTTP " \ 2158 "headers"); \ 2159 errno = EILSEQ; \ 2160 return -1; \ 2161 } \ 2162 } \ 2163 if (lsquic_stream_is_reset(stream)) \ 2164 { \ 2165 LSQ_INFO("Attempt to write to stream after it had been reset"); \ 2166 errno = ECONNRESET; \ 2167 return -1; \ 2168 } \ 2169 if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)) \ 2170 { \ 2171 LSQ_INFO("Attempt to write to stream after it was closed for " \ 2172 "writing"); \ 2173 errno = EBADF; \ 2174 return -1; \ 2175 } \ 2176} while (0) 2177 2178 2179struct frame_gen_ctx 2180{ 2181 lsquic_stream_t *fgc_stream; 2182 struct lsquic_reader *fgc_reader; 2183 /* We keep our own count of how many bytes were read from reader because 2184 * some readers are external. The external caller does not have to rely 2185 * on our count, but it can. 2186 */ 2187 size_t fgc_nread_from_reader; 2188 size_t (*fgc_size) (void *ctx); 2189 int (*fgc_fin) (void *ctx); 2190 gsf_read_f fgc_read; 2191}; 2192 2193 2194static size_t 2195frame_std_gen_size (void *ctx) 2196{ 2197 struct frame_gen_ctx *fg_ctx = ctx; 2198 size_t available, remaining; 2199 2200 /* Make sure we are not writing past available size: */ 2201 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2202 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 2203 if (available < remaining) 2204 remaining = available; 2205 2206 return remaining + fg_ctx->fgc_stream->sm_n_buffered; 2207} 2208 2209 2210static size_t 2211stream_hq_frame_size (const struct stream_hq_frame *shf) 2212{ 2213 if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))) 2214 return 1 + 1 + ((shf->shf_flags & SHF_TWO_BYTES) > 0); 2215 else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) == SHF_FIXED_SIZE) 2216 return 1 + (1 << vint_val2bits(shf->shf_frame_size)); 2217 else 2218 { 2219 assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2220 == (SHF_FIXED_SIZE|SHF_PHANTOM)); 2221 return 0; 2222 } 2223} 2224 2225 2226static size_t 2227active_hq_frame_sizes (const struct lsquic_stream *stream) 2228{ 2229 const struct stream_hq_frame *shf; 2230 size_t size; 2231 2232 size = 0; 2233 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 2234 == (SMBF_IETF|SMBF_USE_HEADERS)) 2235 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2236 if (!(shf->shf_flags & SHF_WRITTEN)) 2237 size += stream_hq_frame_size(shf); 2238 2239 return size; 2240} 2241 2242 2243static uint64_t 2244stream_hq_frame_end (const struct stream_hq_frame *shf) 2245{ 2246 if (shf->shf_flags & SHF_FIXED_SIZE) 2247 return shf->shf_off + shf->shf_frame_size; 2248 else if (shf->shf_flags & SHF_TWO_BYTES) 2249 return shf->shf_off + ((1 << 14) - 1); 2250 else 2251 return shf->shf_off + ((1 << 6) - 1); 2252} 2253 2254 2255static int 2256frame_in_stream (const struct lsquic_stream *stream, 2257 const struct stream_hq_frame *shf) 2258{ 2259 return shf >= stream->sm_hq_frame_arr 2260 && shf < stream->sm_hq_frame_arr + sizeof(stream->sm_hq_frame_arr) 2261 / sizeof(stream->sm_hq_frame_arr[0]) 2262 ; 2263} 2264 2265 2266static void 2267stream_hq_frame_put (struct lsquic_stream *stream, 2268 struct stream_hq_frame *shf) 2269{ 2270 assert(STAILQ_FIRST(&stream->sm_hq_frames) == shf); 2271 STAILQ_REMOVE_HEAD(&stream->sm_hq_frames, shf_next); 2272 if (frame_in_stream(stream, shf)) 2273 memset(shf, 0, sizeof(*shf)); 2274 else 2275 lsquic_malo_put(shf); 2276} 2277 2278 2279static void 2280stream_hq_frame_close (struct lsquic_stream *stream, 2281 struct stream_hq_frame *shf) 2282{ 2283 unsigned bits; 2284 2285 LSQ_DEBUG("close HQ frame of type 0x%X at payload offset %"PRIu64 2286 " (actual offset %"PRIu64")", shf->shf_frame_type, 2287 stream->sm_payload, stream->tosend_off); 2288 assert(shf->shf_flags & SHF_ACTIVE); 2289 if (!(shf->shf_flags & SHF_FIXED_SIZE)) 2290 { 2291 shf->shf_frame_ptr[0] = shf->shf_frame_type; 2292 bits = (shf->shf_flags & SHF_TWO_BYTES) > 0; 2293 vint_write(shf->shf_frame_ptr + 1, stream->sm_payload - shf->shf_off, 2294 bits, 1 << bits); 2295 } 2296 stream_hq_frame_put(stream, shf); 2297} 2298 2299 2300static size_t 2301frame_hq_gen_size (void *ctx) 2302{ 2303 struct frame_gen_ctx *fg_ctx = ctx; 2304 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2305 size_t available, remaining, frames; 2306 const struct stream_hq_frame *shf; 2307 2308 frames = 0; 2309 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2310 if (shf->shf_off >= stream->sm_payload) 2311 frames += stream_hq_frame_size(shf); 2312 2313 /* Make sure we are not writing past available size: */ 2314 remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2315 available = lsquic_stream_write_avail(stream); 2316 if (available < remaining) 2317 remaining = available; 2318 2319 return remaining + stream->sm_n_buffered + frames; 2320} 2321 2322 2323static int 2324frame_std_gen_fin (void *ctx) 2325{ 2326 struct frame_gen_ctx *fg_ctx = ctx; 2327 return !(fg_ctx->fgc_stream->sm_bflags & SMBF_CRYPTO) 2328 && (fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE) 2329 && 0 == fg_ctx->fgc_stream->sm_n_buffered 2330 /* Do not use frame_std_gen_size() as it may chop the real size: */ 2331 && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); 2332} 2333 2334 2335static void 2336incr_conn_cap (struct lsquic_stream *stream, size_t incr) 2337{ 2338 if (stream->sm_bflags & SMBF_CONN_LIMITED) 2339 { 2340 stream->conn_pub->conn_cap.cc_sent += incr; 2341 assert(stream->conn_pub->conn_cap.cc_sent 2342 <= stream->conn_pub->conn_cap.cc_max); 2343 } 2344} 2345 2346 2347void 2348incr_sm_payload (struct lsquic_stream *stream, size_t incr) 2349{ 2350 stream->sm_payload += incr; 2351 stream->tosend_off += incr; 2352 assert(stream->tosend_off <= stream->max_send_off); 2353} 2354 2355 2356static size_t 2357frame_std_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 2358{ 2359 struct frame_gen_ctx *fg_ctx = ctx; 2360 unsigned char *p = begin_buf; 2361 unsigned char *const end = p + len; 2362 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 2363 size_t n_written, available, n_to_write; 2364 2365 if (stream->sm_n_buffered > 0) 2366 { 2367 if (len <= stream->sm_n_buffered) 2368 { 2369 memcpy(p, stream->sm_buf, len); 2370 memmove(stream->sm_buf, stream->sm_buf + len, 2371 stream->sm_n_buffered - len); 2372 stream->sm_n_buffered -= len; 2373 if (0 == stream->sm_n_buffered) 2374 maybe_resize_stream_buffer(stream); 2375 assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered); 2376 incr_sm_payload(stream, len); 2377 *fin = fg_ctx->fgc_fin(fg_ctx); 2378 return len; 2379 } 2380 memcpy(p, stream->sm_buf, stream->sm_n_buffered); 2381 p += stream->sm_n_buffered; 2382 stream->sm_n_buffered = 0; 2383 maybe_resize_stream_buffer(stream); 2384 } 2385 2386 available = lsquic_stream_write_avail(fg_ctx->fgc_stream); 2387 n_to_write = end - p; 2388 if (n_to_write > available) 2389 n_to_write = available; 2390 n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p, 2391 n_to_write); 2392 p += n_written; 2393 fg_ctx->fgc_nread_from_reader += n_written; 2394 *fin = fg_ctx->fgc_fin(fg_ctx); 2395 incr_sm_payload(stream, p - (const unsigned char *) begin_buf); 2396 incr_conn_cap(stream, n_written); 2397 return p - (const unsigned char *) begin_buf; 2398} 2399 2400 2401static struct stream_hq_frame * 2402find_hq_frame (const struct lsquic_stream *stream, uint64_t off) 2403{ 2404 struct stream_hq_frame *shf; 2405 2406 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2407 if (shf->shf_off <= off && stream_hq_frame_end(shf) > off) 2408 return shf; 2409 2410 return NULL; 2411} 2412 2413 2414static struct stream_hq_frame * 2415find_cur_hq_frame (const struct lsquic_stream *stream) 2416{ 2417 return find_hq_frame(stream, stream->sm_payload); 2418} 2419 2420 2421static struct stream_hq_frame * 2422open_hq_frame (struct lsquic_stream *stream) 2423{ 2424 struct stream_hq_frame *shf; 2425 2426 for (shf = stream->sm_hq_frame_arr; shf < stream->sm_hq_frame_arr 2427 + sizeof(stream->sm_hq_frame_arr) 2428 / sizeof(stream->sm_hq_frame_arr[0]); ++shf) 2429 if (!(shf->shf_flags & SHF_ACTIVE)) 2430 goto found; 2431 2432 shf = lsquic_malo_get(stream->conn_pub->mm->malo.stream_hq_frame); 2433 if (!shf) 2434 { 2435 LSQ_WARN("cannot allocate HQ frame"); 2436 return NULL; 2437 } 2438 memset(shf, 0, sizeof(*shf)); 2439 2440 found: 2441 STAILQ_INSERT_TAIL(&stream->sm_hq_frames, shf, shf_next); 2442 shf->shf_flags = SHF_ACTIVE; 2443 return shf; 2444} 2445 2446 2447/* Returns index of the new frame */ 2448static struct stream_hq_frame * 2449stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off, 2450 enum hq_frame_type frame_type, enum shf_flags flags, size_t size) 2451{ 2452 struct stream_hq_frame *shf; 2453 2454 shf = open_hq_frame(stream); 2455 if (!shf) 2456 { 2457 LSQ_WARN("could not open HQ frame"); 2458 return NULL; 2459 } 2460 2461 shf->shf_off = off; 2462 shf->shf_flags |= flags; 2463 shf->shf_frame_type = frame_type; 2464 if (shf->shf_flags & SHF_FIXED_SIZE) 2465 shf->shf_frame_size = size; 2466 else 2467 { 2468 shf->shf_frame_ptr = NULL; 2469 if (size >= (1 << 6)) 2470 shf->shf_flags |= SHF_TWO_BYTES; 2471 } 2472 2473 return shf; 2474} 2475 2476 2477static size_t 2478frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) 2479{ 2480 struct frame_gen_ctx *fg_ctx = ctx; 2481 unsigned char *p = begin_buf; 2482 unsigned char *const end = p + len; 2483 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2484 struct stream_hq_frame *shf; 2485 size_t nw, frame_sz, avail, rem; 2486 unsigned bits; 2487 2488 while (p < end) 2489 { 2490 shf = find_cur_hq_frame(stream); 2491 if (!shf) 2492 { 2493 rem = frame_std_gen_size(ctx); 2494 if (rem) 2495 { 2496 if (rem > ((1 << 14) - 1)) 2497 rem = (1 << 14) - 1; 2498 shf = stream_activate_hq_frame(stream, 2499 stream->sm_payload, HQFT_DATA, 0, rem); 2500 /* XXX malloc can fail */ 2501 } 2502 else 2503 break; 2504 } 2505 avail = stream->sm_n_buffered + stream->sm_write_avail(stream); 2506 if (shf->shf_off == stream->sm_payload 2507 && !(shf->shf_flags & SHF_WRITTEN)) 2508 { 2509 frame_sz = stream_hq_frame_size(shf); 2510 if (frame_sz > (uintptr_t) (end - p)) 2511 break; 2512 LSQ_DEBUG("insert %zu-byte HQ frame of type 0x%X at payload " 2513 "offset %"PRIu64" (actual offset %"PRIu64")", frame_sz, 2514 shf->shf_frame_type, stream->sm_payload, stream->tosend_off); 2515 if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))) 2516 { 2517 shf->shf_frame_ptr = p; 2518 memset(p, 0, frame_sz); 2519 p += frame_sz; 2520 } 2521 else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2522 == SHF_FIXED_SIZE) 2523 { 2524 *p++ = shf->shf_frame_type; 2525 bits = vint_val2bits(shf->shf_frame_size); 2526 vint_write(p, shf->shf_frame_size, bits, 1 << bits); 2527 p += 1 << bits; 2528 } 2529 else 2530 assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) 2531 == (SHF_FIXED_SIZE|SHF_PHANTOM)); 2532 if (!(shf->shf_flags & SHF_CC_PAID)) 2533 { 2534 incr_conn_cap(stream, frame_sz); 2535 shf->shf_flags |= SHF_CC_PAID; 2536 } 2537 shf->shf_flags |= SHF_WRITTEN; 2538 stream->tosend_off += frame_sz; 2539 assert(stream->tosend_off <= stream->max_send_off); 2540 } 2541 else 2542 { 2543 len = stream_hq_frame_end(shf) - stream->sm_payload; 2544 assert(len); 2545 if (len > (unsigned) (end - p)) 2546 len = end - p; 2547 if (len > avail) 2548 len = avail; 2549 if (!len) 2550 break; 2551 nw = frame_std_gen_read(ctx, p, len, fin); 2552 p += nw; 2553 if (nw < len) 2554 break; 2555 if (stream_hq_frame_end(shf) == stream->sm_payload) 2556 stream_hq_frame_close(stream, shf); 2557 } 2558 } 2559 2560 return p - (unsigned char *) begin_buf; 2561} 2562 2563 2564static size_t 2565crypto_frame_gen_read (void *ctx, void *buf, size_t len) 2566{ 2567 int fin_ignored; 2568 2569 return frame_std_gen_read(ctx, buf, len, &fin_ignored); 2570} 2571 2572 2573static void 2574check_flush_threshold (lsquic_stream_t *stream) 2575{ 2576 if ((stream->sm_qflags & SMQF_WANT_FLUSH) && 2577 stream->tosend_off >= stream->sm_flush_to) 2578 { 2579 LSQ_DEBUG("flushed to or past required offset %"PRIu64, 2580 stream->sm_flush_to); 2581 maybe_remove_from_write_q(stream, SMQF_WANT_FLUSH); 2582 } 2583} 2584 2585 2586static int 2587write_stream_frame (struct frame_gen_ctx *fg_ctx, const size_t size, 2588 struct lsquic_packet_out *packet_out) 2589{ 2590 lsquic_stream_t *const stream = fg_ctx->fgc_stream; 2591 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 2592 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2593 unsigned off; 2594 int len, s; 2595 2596#if LSQUIC_CONN_STATS 2597 const uint64_t begin_off = stream->tosend_off; 2598#endif 2599 off = packet_out->po_data_sz; 2600 len = pf->pf_gen_stream_frame( 2601 packet_out->po_data + packet_out->po_data_sz, 2602 lsquic_packet_out_avail(packet_out), stream->id, 2603 stream->tosend_off, 2604 fg_ctx->fgc_fin(fg_ctx), size, fg_ctx->fgc_read, fg_ctx); 2605 if (len < 0) 2606 return len; 2607 2608#if LSQUIC_CONN_STATS 2609 stream->conn_pub->conn_stats->out.stream_frames += 1; 2610 stream->conn_pub->conn_stats->out.stream_data_sz 2611 += stream->tosend_off - begin_off; 2612#endif 2613 EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf, 2614 packet_out->po_data + packet_out->po_data_sz, len); 2615 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 2616 packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM; 2617 if (0 == lsquic_packet_out_avail(packet_out)) 2618 packet_out->po_flags |= PO_STREAM_END; 2619 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 2620 stream, QUIC_FRAME_STREAM, off, len); 2621 if (s != 0) 2622 { 2623 LSQ_ERROR("adding stream to packet failed: %s", strerror(errno)); 2624 return -1; 2625 } 2626 2627 check_flush_threshold(stream); 2628 return len; 2629} 2630 2631 2632static enum swtp_status 2633stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size) 2634{ 2635 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2636 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2637 struct lsquic_packet_out *packet_out; 2638 int len; 2639 2640 packet_out = lsquic_send_ctl_new_packet_out(send_ctl, 0, PNS_APP, 2641 stream->conn_pub->path); 2642 if (!packet_out) 2643 return SWTP_STOP; 2644 packet_out->po_header_type = stream->tosend_off == 0 2645 ? HETY_INITIAL : HETY_HANDSHAKE; 2646 2647 len = write_stream_frame(fg_ctx, size, packet_out); 2648 2649 if (len > 0) 2650 { 2651 packet_out->po_flags |= PO_HELLO; 2652 lsquic_packet_out_zero_pad(packet_out); 2653 lsquic_send_ctl_scheduled_one(send_ctl, packet_out); 2654 return SWTP_OK; 2655 } 2656 else 2657 return SWTP_ERROR; 2658} 2659 2660 2661static enum swtp_status 2662stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size) 2663{ 2664 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2665 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2666 unsigned stream_header_sz, need_at_least; 2667 struct lsquic_packet_out *packet_out; 2668 int len; 2669 2670 if (!(stream->sm_bflags & SMBF_IETF) 2671 && (stream->stream_flags & 2672 (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED)) 2673 == STREAM_HEADERS_SENT 2674 && lsquic_send_ctl_buffered_and_same_prio_as_headers(send_ctl, stream)) 2675 { 2676 /* TODO: make this logic work for IETF streams as well XXX */ 2677 struct lsquic_stream *const headers_stream 2678 = lsquic_headers_stream_get_stream(stream->conn_pub->u.gquic.hs); 2679 if (lsquic_stream_has_data_to_flush(headers_stream)) 2680 { 2681 LSQ_DEBUG("flushing headers stream before potential write to a " 2682 "buffered packet"); 2683 (void) lsquic_stream_flush(headers_stream); 2684 } 2685 else 2686 /* Some other stream must have flushed it: this means our headers 2687 * are flushed. 2688 */ 2689 stream->stream_flags |= STREAM_HDRS_FLUSHED; 2690 } 2691 2692 stream_header_sz = stream->sm_frame_header_sz(stream, size); 2693 need_at_least = stream_header_sz + (size > 0); 2694 get_packet: 2695 packet_out = lsquic_send_ctl_get_packet_for_stream(send_ctl, 2696 need_at_least, stream->conn_pub->path, stream); 2697 if (packet_out) 2698 { 2699 len = write_stream_frame(fg_ctx, size, packet_out); 2700 if (len > 0) 2701 return SWTP_OK; 2702 assert(len < 0); 2703 if (-len > (int) need_at_least) 2704 { 2705 LSQ_DEBUG("need more room (%d bytes) than initially calculated " 2706 "%u bytes, will try again", -len, need_at_least); 2707 need_at_least = -len; 2708 goto get_packet; 2709 } 2710 return SWTP_ERROR; 2711 } 2712 else 2713 return SWTP_STOP; 2714} 2715 2716 2717static enum swtp_status 2718stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size) 2719{ 2720 struct lsquic_stream *const stream = fg_ctx->fgc_stream; 2721 struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl; 2722 const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf; 2723 unsigned crypto_header_sz, need_at_least; 2724 struct lsquic_packet_out *packet_out; 2725 unsigned short off; 2726 const enum packnum_space pns = lsquic_enclev2pns[ crypto_level(stream) ]; 2727 int len, s; 2728 2729 assert(size > 0); 2730 crypto_header_sz = stream->sm_frame_header_sz(stream, size); 2731 need_at_least = crypto_header_sz + 1; 2732 2733 packet_out = lsquic_send_ctl_get_packet_for_crypto(send_ctl, 2734 need_at_least, pns, stream->conn_pub->path); 2735 if (!packet_out) 2736 return SWTP_STOP; 2737 2738 off = packet_out->po_data_sz; 2739 len = pf->pf_gen_crypto_frame(packet_out->po_data + packet_out->po_data_sz, 2740 lsquic_packet_out_avail(packet_out), stream->tosend_off, 2741 size, crypto_frame_gen_read, fg_ctx); 2742 if (len < 0) 2743 return len; 2744 2745 EV_LOG_GENERATED_CRYPTO_FRAME(LSQUIC_LOG_CONN_ID, pf, 2746 packet_out->po_data + packet_out->po_data_sz, len); 2747 lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len); 2748 packet_out->po_frame_types |= 1 << QUIC_FRAME_CRYPTO; 2749 s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm, 2750 stream, QUIC_FRAME_CRYPTO, off, len); 2751 if (s != 0) 2752 { 2753 LSQ_WARN("adding crypto stream to packet failed: %s", strerror(errno)); 2754 return -1; 2755 } 2756 2757 packet_out->po_flags |= PO_HELLO; 2758 2759 check_flush_threshold(stream); 2760 return SWTP_OK; 2761} 2762 2763 2764static void 2765abort_connection (struct lsquic_stream *stream) 2766{ 2767 if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS)) 2768 TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream, 2769 next_service_stream); 2770 stream->sm_qflags |= SMQF_ABORT_CONN; 2771 LSQ_WARN("connection will be aborted"); 2772 maybe_conn_to_tickable(stream); 2773} 2774 2775 2776static void 2777maybe_close_varsize_hq_frame (struct lsquic_stream *stream) 2778{ 2779 struct stream_hq_frame *shf; 2780 uint64_t size; 2781 unsigned bits; 2782 2783 shf = find_cur_hq_frame(stream); 2784 if (!shf) 2785 return; 2786 2787 if (shf->shf_flags & SHF_FIXED_SIZE) 2788 { 2789 stream_hq_frame_put(stream, shf); 2790 return; 2791 } 2792 2793 bits = (shf->shf_flags & SHF_TWO_BYTES) > 0; 2794 size = stream->sm_payload + stream->sm_n_buffered - shf->shf_off; 2795 if (size && size <= VINT_MAX_B(bits)) 2796 { 2797 if (0 == stream->sm_n_buffered) 2798 LSQ_DEBUG("close HQ frame type 0x%X of size %"PRIu64, 2799 shf->shf_frame_type, size); 2800 else 2801 LSQ_DEBUG("convert HQ frame type 0x%X of to fixed %"PRIu64, 2802 shf->shf_frame_type, size); 2803 shf->shf_frame_ptr[0] = shf->shf_frame_type; 2804 vint_write(shf->shf_frame_ptr + 1, size, bits, 1 << bits); 2805 if (0 == stream->sm_n_buffered) 2806 stream_hq_frame_put(stream, shf); 2807 else 2808 { 2809 shf->shf_frame_size = size; 2810 shf->shf_flags |= SHF_FIXED_SIZE; 2811 } 2812 } 2813 else if (!size) 2814 { 2815 assert(!shf->shf_frame_ptr); 2816 LSQ_WARN("discard zero-sized HQ frame type 0x%X (off: %"PRIu64")", 2817 shf->shf_frame_type, shf->shf_off); 2818 stream_hq_frame_put(stream, shf); 2819 } 2820 else 2821 { 2822 assert(stream->sm_n_buffered); 2823 LSQ_ERROR("cannot close frame of size %"PRIu64" -- too large", size); 2824 /* TODO: abort connection */ 2825 stream_hq_frame_put(stream, shf); 2826 } 2827} 2828 2829 2830static ssize_t 2831stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, 2832 size_t thresh) 2833{ 2834 size_t size; 2835 ssize_t nw; 2836 unsigned seen_ok; 2837 int use_framing; 2838 struct frame_gen_ctx fg_ctx = { 2839 .fgc_stream = stream, 2840 .fgc_reader = reader, 2841 .fgc_nread_from_reader = 0, 2842 }; 2843 2844 use_framing = (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 2845 == (SMBF_IETF|SMBF_USE_HEADERS); 2846 if (use_framing) 2847 { 2848 fg_ctx.fgc_size = frame_hq_gen_size; 2849 fg_ctx.fgc_read = frame_hq_gen_read; 2850 fg_ctx.fgc_fin = frame_std_gen_fin; /* This seems to work for either? XXX */ 2851 } 2852 else 2853 { 2854 fg_ctx.fgc_size = frame_std_gen_size; 2855 fg_ctx.fgc_read = frame_std_gen_read; 2856 fg_ctx.fgc_fin = frame_std_gen_fin; 2857 } 2858 2859 seen_ok = 0; 2860 while ((size = fg_ctx.fgc_size(&fg_ctx), thresh ? size >= thresh : size > 0) 2861 || fg_ctx.fgc_fin(&fg_ctx)) 2862 { 2863 switch (stream->sm_write_to_packet(&fg_ctx, size)) 2864 { 2865 case SWTP_OK: 2866 if (!seen_ok++) 2867 maybe_conn_to_tickable_if_writeable(stream, 0); 2868 if (fg_ctx.fgc_fin(&fg_ctx)) 2869 { 2870 if (use_framing && seen_ok) 2871 maybe_close_varsize_hq_frame(stream); 2872 stream->stream_flags |= STREAM_FIN_SENT; 2873 goto end; 2874 } 2875 else 2876 break; 2877 case SWTP_STOP: 2878 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 2879 if (use_framing && seen_ok) 2880 maybe_close_varsize_hq_frame(stream); 2881 goto end; 2882 default: 2883 abort_connection(stream); 2884 stream->stream_flags &= ~STREAM_LAST_WRITE_OK; 2885 return -1; 2886 } 2887 } 2888 2889 if (use_framing && seen_ok) 2890 maybe_close_varsize_hq_frame(stream); 2891 2892 if (thresh) 2893 { 2894 assert(size < thresh); 2895 assert(size >= stream->sm_n_buffered); 2896 size -= stream->sm_n_buffered; 2897 if (size > 0) 2898 { 2899 nw = save_to_buffer(stream, reader, size); 2900 if (nw < 0) 2901 return -1; 2902 fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */ 2903 } 2904 } 2905 else 2906 { 2907 /* We count flushed data towards both stream and connection limits, 2908 * so we should have been able to packetize all of it: 2909 */ 2910 assert(0 == stream->sm_n_buffered); 2911 assert(size == 0); 2912 } 2913 2914 maybe_mark_as_blocked(stream); 2915 2916 end: 2917 return fg_ctx.fgc_nread_from_reader; 2918} 2919 2920 2921/* Perform an implicit flush when we hit connection limit while buffering 2922 * data. This is to prevent a (theoretical) stall: 2923 * 2924 * Imagine a number of streams, all of which buffered some data. The buffered 2925 * data is up to connection cap, which means no further writes are possible. 2926 * None of them flushes, which means that data is not sent and connection 2927 * WINDOW_UPDATE frame never arrives from peer. Stall. 2928 */ 2929static int 2930maybe_flush_stream (struct lsquic_stream *stream) 2931{ 2932 if (stream->sm_n_buffered > 0 2933 && (stream->sm_bflags & SMBF_CONN_LIMITED) 2934 && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0) 2935 return stream_flush_nocheck(stream); 2936 else 2937 return 0; 2938} 2939 2940 2941static int 2942stream_hq_frame_extendable (const struct stream_hq_frame *shf, uint64_t cur_off, 2943 unsigned len) 2944{ 2945 return (shf->shf_flags & (SHF_TWO_BYTES|SHF_FIXED_SIZE)) == 0 2946 && cur_off - shf->shf_off < (1 << 6) 2947 && cur_off - shf->shf_off + len >= (1 << 6) 2948 ; 2949} 2950 2951 2952/* Update currently buffered HQ frame or create a new one, if possible. 2953 * Return update length to be buffered. If a HQ frame cannot be 2954 * buffered due to size, 0 is returned, thereby preventing both HQ frame 2955 * creation and buffering. 2956 */ 2957static size_t 2958update_buffered_hq_frames (struct lsquic_stream *stream, size_t len, 2959 size_t avail) 2960{ 2961 struct stream_hq_frame *shf; 2962 uint64_t cur_off, end; 2963 size_t frame_sz; 2964 int extendable; 2965 2966 cur_off = stream->sm_payload + stream->sm_n_buffered; 2967 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 2968 if (shf->shf_off <= cur_off) 2969 { 2970 end = stream_hq_frame_end(shf); 2971 extendable = stream_hq_frame_extendable(shf, cur_off, len); 2972 if (cur_off < end + extendable) 2973 break; 2974 } 2975 2976 if (shf) 2977 { 2978 if (len > end - cur_off) 2979 len = end - cur_off; 2980 frame_sz = stream_hq_frame_size(shf); 2981 } 2982 else if (avail >= 3) 2983 { 2984 shf = stream_activate_hq_frame(stream, cur_off, HQFT_DATA, 0, len); 2985 /* XXX malloc can fail */ 2986 if (len > stream_hq_frame_end(shf) - cur_off) 2987 len = stream_hq_frame_end(shf) - cur_off; 2988 extendable = 0; 2989 frame_sz = stream_hq_frame_size(shf); 2990 if (avail < frame_sz) 2991 return 0; 2992 avail -= frame_sz; 2993 } 2994 else 2995 return 0; 2996 2997 if (!(shf->shf_flags & SHF_CC_PAID)) 2998 { 2999 incr_conn_cap(stream, frame_sz); 3000 shf->shf_flags |= SHF_CC_PAID; 3001 } 3002 if (extendable) 3003 { 3004 shf->shf_flags |= SHF_TWO_BYTES; 3005 incr_conn_cap(stream, 1); 3006 avail -= 1; 3007 if ((stream->sm_qflags & SMQF_WANT_FLUSH) 3008 && shf->shf_off <= stream->sm_payload 3009 && stream_hq_frame_end(shf) >= stream->sm_flush_to_payload) 3010 stream->sm_flush_to += 1; 3011 } 3012 3013 if (len <= avail) 3014 return len; 3015 else 3016 return avail; 3017} 3018 3019 3020static ssize_t 3021save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, 3022 size_t len) 3023{ 3024 size_t avail, n_written, n_allowed; 3025 3026 n_allowed = stream_get_n_allowed(stream); 3027 assert(stream->sm_n_buffered + len <= n_allowed); 3028 3029 if (!stream->sm_buf) 3030 { 3031 stream->sm_buf = malloc(n_allowed); 3032 if (!stream->sm_buf) 3033 return -1; 3034 stream->sm_n_allocated = n_allowed; 3035 } 3036 3037 avail = lsquic_stream_write_avail(stream); 3038 if (avail < len) 3039 len = avail; 3040 3041 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 3042 == (SMBF_IETF|SMBF_USE_HEADERS)) 3043 len = update_buffered_hq_frames(stream, len, avail); 3044 3045 n_written = reader->lsqr_read(reader->lsqr_ctx, 3046 stream->sm_buf + stream->sm_n_buffered, len); 3047 stream->sm_n_buffered += n_written; 3048 assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered); 3049 incr_conn_cap(stream, n_written); 3050 LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer", 3051 n_written, stream->sm_n_buffered); 3052 if (0 != maybe_flush_stream(stream)) 3053 return -1; 3054 return n_written; 3055} 3056 3057 3058static ssize_t 3059stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) 3060{ 3061 const struct stream_hq_frame *shf; 3062 size_t thresh, len, frames, total_len, n_allowed, nwritten; 3063 ssize_t nw; 3064 3065 len = reader->lsqr_size(reader->lsqr_ctx); 3066 if (len == 0) 3067 return 0; 3068 3069 frames = 0; 3070 if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS)) 3071 == (SMBF_IETF|SMBF_USE_HEADERS)) 3072 STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next) 3073 if (shf->shf_off >= stream->sm_payload) 3074 frames += stream_hq_frame_size(shf); 3075 total_len = len + frames + stream->sm_n_buffered; 3076 thresh = lsquic_stream_flush_threshold(stream, total_len); 3077 n_allowed = stream_get_n_allowed(stream); 3078 if (total_len <= n_allowed && total_len < thresh) 3079 { 3080 nwritten = 0; 3081 do 3082 { 3083 nw = save_to_buffer(stream, reader, len - nwritten); 3084 if (nw > 0) 3085 nwritten += (size_t) nw; 3086 else if (nw == 0) 3087 break; 3088 else 3089 return nw; 3090 } 3091 while (nwritten < len 3092 && stream->sm_n_buffered < stream->sm_n_allocated); 3093 return nwritten; 3094 } 3095 else 3096 return stream_write_to_packets(stream, reader, thresh); 3097} 3098 3099 3100ssize_t 3101lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len) 3102{ 3103 struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, }; 3104 return lsquic_stream_writev(stream, &iov, 1); 3105} 3106 3107 3108struct inner_reader_iovec { 3109 const struct iovec *iov; 3110 const struct iovec *end; 3111 unsigned cur_iovec_off; 3112}; 3113 3114 3115static size_t 3116inner_reader_iovec_read (void *ctx, void *buf, size_t count) 3117{ 3118 struct inner_reader_iovec *const iro = ctx; 3119 unsigned char *p = buf; 3120 unsigned char *const end = p + count; 3121 unsigned n_tocopy; 3122 3123 while (iro->iov < iro->end && p < end) 3124 { 3125 n_tocopy = iro->iov->iov_len - iro->cur_iovec_off; 3126 if (n_tocopy > (unsigned) (end - p)) 3127 n_tocopy = end - p; 3128 memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off, 3129 n_tocopy); 3130 p += n_tocopy; 3131 iro->cur_iovec_off += n_tocopy; 3132 if (iro->iov->iov_len == iro->cur_iovec_off) 3133 { 3134 ++iro->iov; 3135 iro->cur_iovec_off = 0; 3136 } 3137 } 3138 3139 return p + count - end; 3140} 3141 3142 3143static size_t 3144inner_reader_iovec_size (void *ctx) 3145{ 3146 struct inner_reader_iovec *const iro = ctx; 3147 const struct iovec *iov; 3148 size_t size; 3149 3150 size = 0; 3151 for (iov = iro->iov; iov < iro->end; ++iov) 3152 size += iov->iov_len; 3153 3154 return size - iro->cur_iovec_off; 3155} 3156 3157 3158ssize_t 3159lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, 3160 int iovcnt) 3161{ 3162 COMMON_WRITE_CHECKS(); 3163 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 3164 3165 struct inner_reader_iovec iro = { 3166 .iov = iov, 3167 .end = iov + iovcnt, 3168 .cur_iovec_off = 0, 3169 }; 3170 struct lsquic_reader reader = { 3171 .lsqr_read = inner_reader_iovec_read, 3172 .lsqr_size = inner_reader_iovec_size, 3173 .lsqr_ctx = &iro, 3174 }; 3175 3176 return stream_write(stream, &reader); 3177} 3178 3179 3180ssize_t 3181lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) 3182{ 3183 COMMON_WRITE_CHECKS(); 3184 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); 3185 return stream_write(stream, reader); 3186} 3187 3188 3189/* This bypasses COMMON_WRITE_CHECKS */ 3190static ssize_t 3191stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz) 3192{ 3193 struct iovec iov = { (void *) buf, sz, }; 3194 struct inner_reader_iovec iro = { 3195 .iov = &iov, 3196 .end = &iov + 1, 3197 .cur_iovec_off = 0, 3198 }; 3199 struct lsquic_reader reader = { 3200 .lsqr_read = inner_reader_iovec_read, 3201 .lsqr_size = inner_reader_iovec_size, 3202 .lsqr_ctx = &iro, 3203 }; 3204 return stream_write(stream, &reader); 3205} 3206 3207 3208/* XXX Move this define elsewhere? */ 3209#define MAX_HEADERS_SIZE (64 * 1024) 3210 3211static int 3212send_headers_ietf (struct lsquic_stream *stream, 3213 const struct lsquic_http_headers *headers, int eos) 3214{ 3215 enum qwh_status qwh; 3216 const size_t max_prefix_size = 3217 lsquic_qeh_max_prefix_size(stream->conn_pub->u.ietf.qeh); 3218 const size_t max_push_size = 1 /* Stream type */ + 8 /* Push ID */; 3219 size_t prefix_sz, headers_sz, hblock_sz, push_sz; 3220 unsigned bits; 3221 ssize_t nw; 3222 unsigned char *header_block; 3223 unsigned char buf[max_push_size + max_prefix_size + MAX_HEADERS_SIZE]; 3224 3225 stream->stream_flags &= ~STREAM_PUSHING; 3226 stream->stream_flags |= STREAM_NOPUSH; 3227 3228 /* TODO: Optimize for the common case: write directly to sm_buf and fall 3229 * back to a larger buffer if that fails. 3230 */ 3231 prefix_sz = max_prefix_size; 3232 headers_sz = sizeof(buf) - max_prefix_size - max_push_size; 3233 qwh = lsquic_qeh_write_headers(stream->conn_pub->u.ietf.qeh, stream->id, 0, 3234 headers, buf + max_push_size + max_prefix_size, &prefix_sz, 3235 &headers_sz, &stream->sm_hb_compl); 3236 3237 if (!(qwh == QWH_FULL || qwh == QWH_PARTIAL)) 3238 { 3239 if (qwh == QWH_ENOBUF) 3240 LSQ_INFO("not enough room for header block"); 3241 else 3242 LSQ_WARN("internal error encoding and sending HTTP headers"); 3243 return -1; 3244 } 3245 3246 if (stream->sm_promise) 3247 { 3248 assert(lsquic_stream_is_pushed(stream)); 3249 bits = vint_val2bits(stream->sm_promise->pp_id); 3250 push_sz = 1 + (1 << bits); 3251 if (!stream_activate_hq_frame(stream, 3252 stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PREAMBLE, 3253 SHF_FIXED_SIZE|SHF_PHANTOM, push_sz)) 3254 return -1; 3255 buf[max_push_size + max_prefix_size - prefix_sz - push_sz] = HQUST_PUSH; 3256 vint_write(buf + max_push_size + max_prefix_size - prefix_sz 3257 - push_sz + 1,stream->sm_promise->pp_id, bits, 1 << bits); 3258 } 3259 else 3260 push_sz = 0; 3261 3262 /* Construct contiguous header block buffer including HQ framing */ 3263 header_block = buf + max_push_size + max_prefix_size - prefix_sz - push_sz; 3264 hblock_sz = push_sz + prefix_sz + headers_sz; 3265 if (!stream_activate_hq_frame(stream, 3266 stream->sm_payload + stream->sm_n_buffered + push_sz, 3267 HQFT_HEADERS, SHF_FIXED_SIZE, hblock_sz - push_sz)) 3268 return -1; 3269 3270 if (qwh == QWH_FULL) 3271 { 3272 stream->sm_send_headers_state = SSHS_HBLOCK_SENDING; 3273 if (lsquic_stream_write_avail(stream)) 3274 { 3275 nw = stream_write_buf(stream, header_block, hblock_sz); 3276 if (nw < 0) 3277 { 3278 LSQ_WARN("cannot write to stream: %s", strerror(errno)); 3279 return -1; 3280 } 3281 if ((size_t) nw == hblock_sz) 3282 { 3283 stream->stream_flags |= STREAM_HEADERS_SENT; 3284 stream_hblock_sent(stream); 3285 LSQ_DEBUG("wrote all %zu bytes of header block", hblock_sz); 3286 return 0; 3287 } 3288 LSQ_DEBUG("wrote only %zd bytes of header block, stash", nw); 3289 } 3290 else 3291 { 3292 LSQ_DEBUG("cannot write to stream, stash all %zu bytes of " 3293 "header block", hblock_sz); 3294 nw = 0; 3295 } 3296 } 3297 else 3298 { 3299 stream->sm_send_headers_state = SSHS_ENC_SENDING; 3300 nw = 0; 3301 } 3302 3303 stream->sm_saved_want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 3304 stream_wantwrite(stream, 1); 3305 3306 stream->sm_header_block = malloc(hblock_sz - (size_t) nw); 3307 if (!stream->sm_header_block) 3308 { 3309 LSQ_WARN("cannot allocate %zd bytes to stash %s header block", 3310 hblock_sz - (size_t) nw, qwh == QWH_FULL ? "full" : "partial"); 3311 return -1; 3312 } 3313 memcpy(stream->sm_header_block, header_block + (size_t) nw, 3314 hblock_sz - (size_t) nw); 3315 stream->sm_hblock_sz = hblock_sz - (size_t) nw; 3316 stream->sm_hblock_off = 0; 3317 LSQ_DEBUG("stashed %u bytes of header block", stream->sm_hblock_sz); 3318 return 0; 3319} 3320 3321 3322static int 3323send_headers_gquic (struct lsquic_stream *stream, 3324 const struct lsquic_http_headers *headers, int eos) 3325{ 3326 int s = lsquic_headers_stream_send_headers(stream->conn_pub->u.gquic.hs, 3327 stream->id, headers, eos, lsquic_stream_priority(stream)); 3328 if (0 == s) 3329 { 3330 SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER); 3331 stream->stream_flags |= STREAM_HEADERS_SENT; 3332 if (eos) 3333 stream->stream_flags |= STREAM_FIN_SENT; 3334 LSQ_INFO("sent headers"); 3335 } 3336 else 3337 LSQ_WARN("could not send headers: %s", strerror(errno)); 3338 return s; 3339} 3340 3341 3342int 3343lsquic_stream_send_headers (lsquic_stream_t *stream, 3344 const lsquic_http_headers_t *headers, int eos) 3345{ 3346 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3347 && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_U_WRITE_DONE))) 3348 { 3349 if (stream->sm_bflags & SMBF_IETF) 3350 return send_headers_ietf(stream, headers, eos); 3351 else 3352 return send_headers_gquic(stream, headers, eos); 3353 } 3354 else 3355 { 3356 LSQ_INFO("cannot send headers in this state"); 3357 errno = EBADMSG; 3358 return -1; 3359 } 3360} 3361 3362 3363void 3364lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset) 3365{ 3366 if (offset > stream->max_send_off) 3367 { 3368 SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE); 3369 LSQ_DEBUG("update max send offset from 0x%"PRIX64" to " 3370 "0x%"PRIX64, stream->max_send_off, offset); 3371 stream->max_send_off = offset; 3372 } 3373 else 3374 LSQ_DEBUG("new offset 0x%"PRIX64" is not larger than old " 3375 "max send offset 0x%"PRIX64", ignoring", offset, 3376 stream->max_send_off); 3377} 3378 3379 3380/* This function is used to update offsets after handshake completes and we 3381 * learn of peer's limits from the handshake values. 3382 */ 3383int 3384lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset) 3385{ 3386 LSQ_DEBUG("setting max_send_off to %"PRIu64, offset); 3387 if (offset > stream->max_send_off) 3388 { 3389 lsquic_stream_window_update(stream, offset); 3390 return 0; 3391 } 3392 else if (offset < stream->tosend_off) 3393 { 3394 LSQ_INFO("new offset (%"PRIu64" bytes) is smaller than the amount of " 3395 "data already sent on this stream (%"PRIu64" bytes)", offset, 3396 stream->tosend_off); 3397 return -1; 3398 } 3399 else 3400 { 3401 stream->max_send_off = offset; 3402 return 0; 3403 } 3404} 3405 3406 3407void 3408lsquic_stream_reset (lsquic_stream_t *stream, uint64_t error_code) 3409{ 3410 lsquic_stream_reset_ext(stream, error_code, 1); 3411} 3412 3413 3414void 3415lsquic_stream_reset_ext (lsquic_stream_t *stream, uint64_t error_code, 3416 int do_close) 3417{ 3418 if ((stream->stream_flags & STREAM_RST_SENT) 3419 || (stream->sm_qflags & SMQF_SEND_RST)) 3420 { 3421 LSQ_INFO("reset already sent"); 3422 return; 3423 } 3424 3425 SM_HISTORY_APPEND(stream, SHE_RESET); 3426 3427 LSQ_INFO("reset, error code %"PRIu64, error_code); 3428 stream->error_code = error_code; 3429 3430 if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) 3431 TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, 3432 next_send_stream); 3433 stream->sm_qflags &= ~SMQF_SENDING_FLAGS; 3434 stream->sm_qflags |= SMQF_SEND_RST; 3435 3436 if (stream->sm_qflags & SMQF_QPACK_DEC) 3437 { 3438 lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream); 3439 stream->sm_qflags |= ~SMQF_QPACK_DEC; 3440 } 3441 3442 drop_buffered_data(stream); 3443 maybe_elide_stream_frames(stream); 3444 maybe_schedule_call_on_close(stream); 3445 3446 if (do_close) 3447 lsquic_stream_close(stream); 3448 else 3449 maybe_conn_to_tickable_if_writeable(stream, 1); 3450} 3451 3452 3453lsquic_stream_id_t 3454lsquic_stream_id (const lsquic_stream_t *stream) 3455{ 3456 return stream->id; 3457} 3458 3459 3460#if !defined(NDEBUG) && __GNUC__ 3461__attribute__((weak)) 3462#endif 3463struct lsquic_conn * 3464lsquic_stream_conn (const lsquic_stream_t *stream) 3465{ 3466 return stream->conn_pub->lconn; 3467} 3468 3469 3470int 3471lsquic_stream_close (lsquic_stream_t *stream) 3472{ 3473 LSQ_DEBUG("lsquic_stream_close() called"); 3474 SM_HISTORY_APPEND(stream, SHE_CLOSE); 3475 if (lsquic_stream_is_closed(stream)) 3476 { 3477 LSQ_INFO("Attempt to close an already-closed stream"); 3478 errno = EBADF; 3479 return -1; 3480 } 3481 maybe_stream_shutdown_write(stream); 3482 stream_shutdown_read(stream); 3483 maybe_schedule_call_on_close(stream); 3484 maybe_finish_stream(stream); 3485 if (!(stream->stream_flags & STREAM_DELAYED_SW)) 3486 maybe_conn_to_tickable_if_writeable(stream, 1); 3487 return 0; 3488} 3489 3490 3491#ifndef NDEBUG 3492#if __GNUC__ 3493__attribute__((weak)) 3494#endif 3495#endif 3496void 3497lsquic_stream_acked (struct lsquic_stream *stream, 3498 enum quic_frame_type frame_type) 3499{ 3500 assert(stream->n_unacked); 3501 --stream->n_unacked; 3502 LSQ_DEBUG("ACKed; n_unacked: %u", stream->n_unacked); 3503 if (frame_type == QUIC_FRAME_RST_STREAM) 3504 { 3505 SM_HISTORY_APPEND(stream, SHE_RST_ACKED); 3506 LSQ_DEBUG("RESET that we sent has been acked by peer"); 3507 stream->stream_flags |= STREAM_RST_ACKED; 3508 } 3509 if (0 == stream->n_unacked) 3510 maybe_finish_stream(stream); 3511} 3512 3513 3514void 3515lsquic_stream_push_req (lsquic_stream_t *stream, 3516 struct uncompressed_headers *push_req) 3517{ 3518 assert(!stream->push_req); 3519 stream->push_req = push_req; 3520 stream->stream_flags |= STREAM_U_WRITE_DONE; /* Writing not allowed */ 3521} 3522 3523 3524int 3525lsquic_stream_is_pushed (const lsquic_stream_t *stream) 3526{ 3527 enum stream_id_type sit; 3528 3529 if (stream->sm_bflags & SMBF_IETF) 3530 { 3531 sit = stream->id & SIT_MASK; 3532 return sit == SIT_UNI_SERVER; 3533 } 3534 else 3535 return 1 & ~stream->id; 3536} 3537 3538 3539int 3540lsquic_stream_push_info (const lsquic_stream_t *stream, 3541 lsquic_stream_id_t *ref_stream_id, void **hset) 3542{ 3543 if (lsquic_stream_is_pushed(stream)) 3544 { 3545 assert(stream->push_req); 3546 *ref_stream_id = stream->push_req->uh_stream_id; 3547 *hset = stream->push_req->uh_hset; 3548 return 0; 3549 } 3550 else 3551 return -1; 3552} 3553 3554 3555int 3556lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh) 3557{ 3558 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3559 && !(stream->stream_flags & STREAM_HAVE_UH)) 3560 { 3561 SM_HISTORY_APPEND(stream, SHE_HEADERS_IN); 3562 LSQ_DEBUG("received uncompressed headers"); 3563 stream->stream_flags |= STREAM_HAVE_UH; 3564 if (uh->uh_flags & UH_FIN) 3565 { 3566 /* IETF QUIC only sets UH_FIN for a pushed stream on the server to 3567 * mark request as done: 3568 */ 3569 if (stream->sm_bflags & SMBF_IETF) 3570 assert((stream->sm_bflags & SMBF_SERVER) 3571 && lsquic_stream_is_pushed(stream)); 3572 stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN; 3573 } 3574 stream->uh = uh; 3575 if (uh->uh_oth_stream_id == 0) 3576 { 3577 if (uh->uh_weight) 3578 lsquic_stream_set_priority_internal(stream, uh->uh_weight); 3579 } 3580 else 3581 LSQ_NOTICE("don't know how to depend on stream %"PRIu64, 3582 uh->uh_oth_stream_id); 3583 return 0; 3584 } 3585 else 3586 { 3587 LSQ_ERROR("received unexpected uncompressed headers"); 3588 return -1; 3589 } 3590} 3591 3592 3593unsigned 3594lsquic_stream_priority (const lsquic_stream_t *stream) 3595{ 3596 return 256 - stream->sm_priority; 3597} 3598 3599 3600int 3601lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority) 3602{ 3603 /* The user should never get a reference to the special streams, 3604 * but let's check just in case: 3605 */ 3606 if (lsquic_stream_is_critical(stream)) 3607 return -1; 3608 if (priority < 1 || priority > 256) 3609 return -1; 3610 stream->sm_priority = 256 - priority; 3611 lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl); 3612 LSQ_DEBUG("set priority to %u", priority); 3613 SM_HISTORY_APPEND(stream, SHE_SET_PRIO); 3614 return 0; 3615} 3616 3617 3618static int 3619maybe_send_priority_gquic (struct lsquic_stream *stream, unsigned priority) 3620{ 3621 if ((stream->sm_bflags & SMBF_USE_HEADERS) 3622 && (stream->stream_flags & STREAM_HEADERS_SENT)) 3623 { 3624 /* We need to send headers only if we are a) using HEADERS stream 3625 * and b) we already sent initial headers. If initial headers 3626 * have not been sent yet, stream priority will be sent in the 3627 * HEADERS frame. 3628 */ 3629 return lsquic_headers_stream_send_priority(stream->conn_pub->u.gquic.hs, 3630 stream->id, 0, 0, priority); 3631 } 3632 else 3633 return 0; 3634} 3635 3636 3637static int 3638send_priority_ietf (struct lsquic_stream *stream, unsigned priority) 3639{ 3640 LSQ_WARN("%s: TODO", __func__); /* TODO */ 3641 return -1; 3642} 3643 3644 3645int 3646lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority) 3647{ 3648 if (0 == lsquic_stream_set_priority_internal(stream, priority)) 3649 { 3650 if (stream->sm_bflags & SMBF_IETF) 3651 return send_priority_ietf(stream, priority); 3652 else 3653 return maybe_send_priority_gquic(stream, priority); 3654 } 3655 else 3656 return -1; 3657} 3658 3659 3660lsquic_stream_ctx_t * 3661lsquic_stream_get_ctx (const lsquic_stream_t *stream) 3662{ 3663 return stream->st_ctx; 3664} 3665 3666 3667int 3668lsquic_stream_refuse_push (lsquic_stream_t *stream) 3669{ 3670 if (lsquic_stream_is_pushed(stream) 3671 && !(stream->sm_qflags & SMQF_SEND_RST) 3672 && !(stream->stream_flags & STREAM_RST_SENT)) 3673 { 3674 LSQ_DEBUG("refusing pushed stream: send reset"); 3675 lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1); 3676 return 0; 3677 } 3678 else 3679 return -1; 3680} 3681 3682 3683size_t 3684lsquic_stream_mem_used (const struct lsquic_stream *stream) 3685{ 3686 size_t size; 3687 3688 size = sizeof(stream); 3689 if (stream->sm_buf) 3690 size += stream->sm_n_allocated; 3691 if (stream->data_in) 3692 size += stream->data_in->di_if->di_mem_used(stream->data_in); 3693 3694 return size; 3695} 3696 3697 3698const lsquic_cid_t * 3699lsquic_stream_cid (const struct lsquic_stream *stream) 3700{ 3701 return LSQUIC_LOG_CONN_ID; 3702} 3703 3704 3705void 3706lsquic_stream_dump_state (const struct lsquic_stream *stream) 3707{ 3708 LSQ_DEBUG("flags: %X; read off: %"PRIu64, stream->stream_flags, 3709 stream->read_offset); 3710 stream->data_in->di_if->di_dump_state(stream->data_in); 3711} 3712 3713 3714void * 3715lsquic_stream_get_hset (struct lsquic_stream *stream) 3716{ 3717 void *hset; 3718 3719 if (lsquic_stream_is_reset(stream)) 3720 { 3721 LSQ_INFO("%s: stream is reset, no headers returned", __func__); 3722 errno = ECONNRESET; 3723 return NULL; 3724 } 3725 3726 if (!((stream->sm_bflags & SMBF_USE_HEADERS) 3727 && (stream->stream_flags & STREAM_HAVE_UH))) 3728 { 3729 LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__, 3730 stream->stream_flags); 3731 return NULL; 3732 } 3733 3734 if (!stream->uh) 3735 { 3736 LSQ_INFO("%s: headers unavailable (already fetched?)", __func__); 3737 return NULL; 3738 } 3739 3740 if (stream->uh->uh_flags & UH_H1H) 3741 { 3742 LSQ_INFO("%s: uncompressed headers have internal format", __func__); 3743 return NULL; 3744 } 3745 3746 hset = stream->uh->uh_hset; 3747 stream->uh->uh_hset = NULL; 3748 destroy_uh(stream); 3749 if (stream->stream_flags & STREAM_HEAD_IN_FIN) 3750 { 3751 stream->stream_flags |= STREAM_FIN_REACHED; 3752 SM_HISTORY_APPEND(stream, SHE_REACH_FIN); 3753 } 3754 LSQ_DEBUG("return header set"); 3755 return hset; 3756} 3757 3758 3759/* GQUIC-only function */ 3760int 3761lsquic_stream_id_is_critical (int use_http, lsquic_stream_id_t stream_id) 3762{ 3763 return stream_id == LSQUIC_GQUIC_STREAM_HANDSHAKE 3764 || (stream_id == LSQUIC_GQUIC_STREAM_HEADERS && use_http); 3765} 3766 3767 3768int 3769lsquic_stream_is_critical (const struct lsquic_stream *stream) 3770{ 3771 return stream->sm_bflags & SMBF_CRITICAL; 3772} 3773 3774 3775void 3776lsquic_stream_set_stream_if (struct lsquic_stream *stream, 3777 const struct lsquic_stream_if *stream_if, void *stream_if_ctx) 3778{ 3779 SM_HISTORY_APPEND(stream, SHE_IF_SWITCH); 3780 stream->stream_if = stream_if; 3781 stream->sm_onnew_arg = stream_if_ctx; 3782 LSQ_DEBUG("switched interface"); 3783 assert(stream->stream_flags & STREAM_ONNEW_DONE); 3784 stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg, 3785 stream); 3786} 3787 3788 3789static int 3790update_type_hist_and_check (struct hq_filter *filter) 3791{ 3792 /* 3-bit codes: */ 3793 enum { 3794 CODE_UNSET, 3795 CODE_HEADER, /* H Header */ 3796 CODE_DATA, /* D Data */ 3797 CODE_PLUS, /* + Plus: meaning previous frame repeats */ 3798 }; 3799 static const unsigned valid_seqs[] = { 3800 /* Ordered by expected frequency */ 3801 0123, /* HD+ */ 3802 012, /* HD */ 3803 01, /* H */ 3804 01231, /* HD+H */ 3805 0121, /* HDH */ 3806 }; 3807 unsigned code, i; 3808 3809 switch (filter->hqfi_type) 3810 { 3811 case HQFT_HEADERS: 3812 code = CODE_HEADER; 3813 break; 3814 case HQFT_DATA: 3815 code = CODE_DATA; 3816 break; 3817 default: 3818 /* Ignore unknown frames */ 3819 return 0; 3820 } 3821 3822 if (filter->hqfi_hist_idx >= MAX_HQFI_ENTRIES) 3823 return -1; 3824 3825 if (filter->hqfi_hist_idx && (filter->hqfi_hist_buf & 7) == code) 3826 { 3827 filter->hqfi_hist_buf <<= 3; 3828 filter->hqfi_hist_buf |= CODE_PLUS; 3829 filter->hqfi_hist_idx++; 3830 } 3831 else if (filter->hqfi_hist_idx > 1 3832 && ((filter->hqfi_hist_buf >> 3) & 7) == code 3833 && (filter->hqfi_hist_buf & 7) == CODE_PLUS) 3834 /* Keep it at plus, do nothing */; 3835 else 3836 { 3837 filter->hqfi_hist_buf <<= 3; 3838 filter->hqfi_hist_buf |= code; 3839 filter->hqfi_hist_idx++; 3840 } 3841 3842 for (i = 0; i < sizeof(valid_seqs) / sizeof(valid_seqs[0]); ++i) 3843 if (filter->hqfi_hist_buf == valid_seqs[i]) 3844 return 0; 3845 3846 return -1; 3847} 3848 3849 3850static size_t 3851hq_read (void *ctx, const unsigned char *buf, size_t sz, int fin) 3852{ 3853 struct lsquic_stream *const stream = ctx; 3854 struct hq_filter *const filter = &stream->sm_hq_filter; 3855 const unsigned char *p = buf, *prev; 3856 const unsigned char *const end = buf + sz; 3857 struct lsquic_conn *lconn; 3858 enum lsqpack_read_header_status rhs; 3859 int s; 3860 3861 while (p < end) 3862 { 3863 switch (filter->hqfi_state) 3864 { 3865 case HQFI_STATE_FRAME_HEADER_BEGIN: 3866 filter->hqfi_vint_state.vr2s_state = 0; 3867 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_CONTINUE; 3868 /* fall-through */ 3869 case HQFI_STATE_FRAME_HEADER_CONTINUE: 3870 s = lsquic_varint_read_two(&p, end, &filter->hqfi_vint_state); 3871 if (s < 0) 3872 break; 3873 filter->hqfi_flags |= HQFI_FLAG_BEGIN; 3874 filter->hqfi_state = HQFI_STATE_READING_PAYLOAD; 3875 LSQ_DEBUG("HQ frame type 0x%"PRIX64" at offset %"PRIu64", size %"PRIu64, 3876 filter->hqfi_type, stream->read_offset + (unsigned) (p - buf), 3877 filter->hqfi_left); 3878 if (0 != update_type_hist_and_check(filter)) 3879 { 3880 lconn = stream->conn_pub->lconn; 3881 filter->hqfi_flags |= HQFI_FLAG_ERROR; 3882 LSQ_INFO("unexpected HTTP/3 frame sequence: %o", 3883 filter->hqfi_hist_buf); 3884 lconn->cn_if->ci_abort_error(lconn, 1, HEC_UNEXPECTED_FRAME, 3885 "unexpected HTTP/3 frame sequence on stream %"PRIu64, 3886 stream->id); 3887 goto end; 3888 } 3889 if (filter->hqfi_type == HQFT_HEADERS) 3890 { 3891 if (0 == (filter->hqfi_flags & HQFI_FLAG_GOT_HEADERS)) 3892 filter->hqfi_flags |= HQFI_FLAG_GOT_HEADERS; 3893 else 3894 { 3895 filter->hqfi_type = (1ull << 62) - 1; 3896 LSQ_DEBUG("Ignoring HEADERS frame"); 3897 } 3898 } 3899 if (filter->hqfi_left > 0) 3900 { 3901 if (filter->hqfi_type == HQFT_DATA) 3902 goto end; 3903 else if (filter->hqfi_type == HQFT_PUSH_PROMISE) 3904 { 3905 lconn = stream->conn_pub->lconn; 3906 if (stream->sm_bflags & SMBF_SERVER) 3907 lconn->cn_if->ci_abort_error(lconn, 1, 3908 HEC_UNEXPECTED_FRAME, "Received PUSH_PROMISE frame " 3909 "on stream %"PRIu64" (clients are not supposed to " 3910 "send those)", stream->id); 3911 else 3912 /* Our client implementation does not support server 3913 * push. 3914 */ 3915 lconn->cn_if->ci_abort_error(lconn, 1, 3916 HEC_UNEXPECTED_FRAME, /* TODO: in ID-21 ID_ERROR? */ 3917 "Received PUSH_PROMISE frame (not supported)" 3918 "on stream %"PRIu64, stream->id); 3919 goto end; 3920 } 3921 } 3922 else 3923 { 3924 switch (filter->hqfi_type) 3925 { 3926 case HQFT_CANCEL_PUSH: 3927 case HQFT_GOAWAY: 3928 case HQFT_HEADERS: 3929 case HQFT_MAX_PUSH_ID: 3930 case HQFT_PRIORITY: 3931 case HQFT_PUSH_PROMISE: 3932 case HQFT_SETTINGS: 3933 filter->hqfi_flags |= HQFI_FLAG_ERROR; 3934 LSQ_INFO("HQ frame of type %"PRIu64" cannot be size 0", 3935 filter->hqfi_type); 3936 abort_connection(stream); /* XXX Overkill? */ 3937 goto end; 3938 default: 3939 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 3940 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 3941 break; 3942 } 3943 } 3944 break; 3945 case HQFI_STATE_READING_PAYLOAD: 3946 if (filter->hqfi_type == HQFT_DATA) 3947 goto end; 3948 sz = filter->hqfi_left; 3949 if (sz > (uintptr_t) (end - p)) 3950 sz = (uintptr_t) (end - p); 3951 switch (filter->hqfi_type) 3952 { 3953 case HQFT_HEADERS: 3954 prev = p; 3955 if (filter->hqfi_flags & HQFI_FLAG_BEGIN) 3956 { 3957 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 3958 rhs = lsquic_qdh_header_in_begin( 3959 stream->conn_pub->u.ietf.qdh, 3960 stream, filter->hqfi_left, &p, sz); 3961 } 3962 else 3963 rhs = lsquic_qdh_header_in_continue( 3964 stream->conn_pub->u.ietf.qdh, stream, &p, sz); 3965 assert(p > prev || LQRHS_ERROR == rhs); 3966 filter->hqfi_left -= p - prev; 3967 if (filter->hqfi_left == 0) 3968 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 3969 switch (rhs) 3970 { 3971 case LQRHS_DONE: 3972 assert(filter->hqfi_left == 0); 3973 stream->sm_qflags &= ~SMQF_QPACK_DEC; 3974 break; 3975 case LQRHS_NEED: 3976 stream->sm_qflags |= SMQF_QPACK_DEC; 3977 break; 3978 case LQRHS_BLOCKED: 3979 stream->sm_qflags |= SMQF_QPACK_DEC; 3980 filter->hqfi_flags |= HQFI_FLAG_BLOCKED; 3981 goto end; 3982 default: 3983 assert(LQRHS_ERROR == rhs); 3984 filter->hqfi_flags |= HQFI_FLAG_ERROR; 3985 LSQ_INFO("error processing header block"); 3986 abort_connection(stream); /* XXX Overkill? */ 3987 goto end; 3988 } 3989 break; 3990 default: 3991 /* Simply skip unknown frame type payload for now */ 3992 filter->hqfi_flags &= ~HQFI_FLAG_BEGIN; 3993 p += sz; 3994 filter->hqfi_left -= sz; 3995 if (filter->hqfi_left == 0) 3996 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 3997 break; 3998 } 3999 break; 4000 default: 4001 assert(0); 4002 goto end; 4003 } 4004 } 4005 4006 end: 4007 if (fin && p == end && filter->hqfi_state != HQFI_STATE_FRAME_HEADER_BEGIN) 4008 { 4009 LSQ_INFO("FIN at unexpected place in filter; state: %u", 4010 filter->hqfi_state); 4011 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4012/* From [draft-ietf-quic-http-16] Section 3.1: 4013 * When a stream terminates cleanly, if the last frame on 4014 * the stream was truncated, this MUST be treated as a connection error 4015 * (see HTTP_MALFORMED_FRAME in Section 8.1). 4016 */ 4017 abort_connection(stream); 4018 } 4019 4020 return p - buf; 4021} 4022 4023 4024static int 4025hq_filter_readable_now (const struct lsquic_stream *stream) 4026{ 4027 const struct hq_filter *const filter = &stream->sm_hq_filter; 4028 4029 return (filter->hqfi_type == HQFT_DATA 4030 && filter->hqfi_state == HQFI_STATE_READING_PAYLOAD) 4031 || (filter->hqfi_flags & HQFI_FLAG_ERROR) 4032 || stream->uh 4033 || (stream->stream_flags & STREAM_FIN_REACHED) 4034 ; 4035} 4036 4037 4038static int 4039hq_filter_readable (struct lsquic_stream *stream) 4040{ 4041 struct hq_filter *const filter = &stream->sm_hq_filter; 4042 struct read_frames_status rfs; 4043 4044 if (filter->hqfi_flags & HQFI_FLAG_BLOCKED) 4045 return 0; 4046 4047 if (!hq_filter_readable_now(stream)) 4048 { 4049 rfs = read_data_frames(stream, 0, hq_read, stream); 4050 if (rfs.total_nread == 0) 4051 { 4052 if (rfs.error) 4053 { 4054 filter->hqfi_flags |= HQFI_FLAG_ERROR; 4055 abort_connection(stream); /* XXX Overkill? */ 4056 return 1; /* Collect error */ 4057 } 4058 return 0; 4059 } 4060 } 4061 4062 return hq_filter_readable_now(stream); 4063} 4064 4065 4066static size_t 4067hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame) 4068{ 4069 struct hq_filter *const filter = &stream->sm_hq_filter; 4070 size_t nr; 4071 4072 if (!(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4073 && filter->hqfi_type == HQFT_DATA)) 4074 { 4075 nr = hq_read(stream, data_frame->df_data + data_frame->df_read_off, 4076 data_frame->df_size - data_frame->df_read_off, 4077 data_frame->df_fin); 4078 if (nr) 4079 { 4080 stream->read_offset += nr; 4081 stream_consumed_bytes(stream); 4082 } 4083 } 4084 else 4085 nr = 0; 4086 4087 if (0 == (filter->hqfi_flags & HQFI_FLAG_ERROR)) 4088 { 4089 data_frame->df_read_off += nr; 4090 if (filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4091 && filter->hqfi_type == HQFT_DATA) 4092 return MIN(filter->hqfi_left, 4093 (unsigned) data_frame->df_size - data_frame->df_read_off); 4094 else 4095 { 4096 assert(data_frame->df_read_off == data_frame->df_size); 4097 return 0; 4098 } 4099 } 4100 else 4101 { 4102 data_frame->df_read_off = data_frame->df_size; 4103 return 0; 4104 } 4105} 4106 4107 4108static void 4109hq_decr_left (struct lsquic_stream *stream, size_t read) 4110{ 4111 struct hq_filter *const filter = &stream->sm_hq_filter; 4112 4113 if (read) 4114 { 4115 assert(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD 4116 && filter->hqfi_type == HQFT_DATA); 4117 assert(read <= filter->hqfi_left); 4118 } 4119 4120 filter->hqfi_left -= read; 4121 if (0 == filter->hqfi_left) 4122 filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN; 4123} 4124 4125 4126struct qpack_dec_hdl * 4127lsquic_stream_get_qdh (const struct lsquic_stream *stream) 4128{ 4129 return stream->conn_pub->u.ietf.qdh; 4130} 4131 4132 4133/* These are IETF QUIC states */ 4134enum stream_state_sending 4135lsquic_stream_sending_state (const struct lsquic_stream *stream) 4136{ 4137 if (0 == (stream->stream_flags & STREAM_RST_SENT)) 4138 { 4139 if (stream->stream_flags & STREAM_FIN_SENT) 4140 { 4141 if (stream->n_unacked) 4142 return SSS_DATA_SENT; 4143 else 4144 return SSS_DATA_RECVD; 4145 } 4146 else 4147 { 4148 if (stream->tosend_off 4149 || (stream->stream_flags & STREAM_BLOCKED_SENT)) 4150 return SSS_SEND; 4151 else 4152 return SSS_READY; 4153 } 4154 } 4155 else if (stream->stream_flags & STREAM_RST_ACKED) 4156 return SSS_RESET_RECVD; 4157 else 4158 return SSS_RESET_SENT; 4159} 4160 4161 4162const char *const lsquic_sss2str[] = 4163{ 4164 [SSS_READY] = "Ready", 4165 [SSS_SEND] = "Send", 4166 [SSS_DATA_SENT] = "Data Sent", 4167 [SSS_RESET_SENT] = "Reset Sent", 4168 [SSS_DATA_RECVD] = "Data Recvd", 4169 [SSS_RESET_RECVD] = "Reset Recvd", 4170}; 4171 4172 4173const char *const lsquic_ssr2str[] = 4174{ 4175 [SSR_RECV] = "Recv", 4176 [SSR_SIZE_KNOWN] = "Size Known", 4177 [SSR_DATA_RECVD] = "Data Recvd", 4178 [SSR_RESET_RECVD] = "Reset Recvd", 4179 [SSR_DATA_READ] = "Data Read", 4180 [SSR_RESET_READ] = "Reset Read", 4181}; 4182 4183 4184/* These are IETF QUIC states */ 4185enum stream_state_receiving 4186lsquic_stream_receiving_state (struct lsquic_stream *stream) 4187{ 4188 uint64_t n_bytes; 4189 4190 if (0 == (stream->stream_flags & STREAM_RST_RECVD)) 4191 { 4192 if (0 == (stream->stream_flags & STREAM_FIN_RECVD)) 4193 return SSR_RECV; 4194 if (stream->stream_flags & STREAM_FIN_REACHED) 4195 return SSR_DATA_READ; 4196 if (0 == (stream->stream_flags & STREAM_DATA_RECVD)) 4197 { 4198 n_bytes = stream->data_in->di_if->di_readable_bytes( 4199 stream->data_in, stream->read_offset); 4200 if (stream->read_offset + n_bytes == stream->sm_fin_off) 4201 { 4202 stream->stream_flags |= STREAM_DATA_RECVD; 4203 return SSR_DATA_RECVD; 4204 } 4205 else 4206 return SSR_SIZE_KNOWN; 4207 } 4208 else 4209 return SSR_DATA_RECVD; 4210 } 4211 else if (stream->stream_flags & STREAM_RST_READ) 4212 return SSR_RESET_READ; 4213 else 4214 return SSR_RESET_RECVD; 4215} 4216 4217 4218void 4219lsquic_stream_qdec_unblocked (struct lsquic_stream *stream) 4220{ 4221 struct hq_filter *const filter = &stream->sm_hq_filter; 4222 4223 assert(stream->sm_qflags & SMQF_QPACK_DEC); 4224 assert(filter->hqfi_flags & HQFI_FLAG_BLOCKED); 4225 4226 filter->hqfi_flags &= ~HQFI_FLAG_BLOCKED; 4227 stream->conn_pub->cp_flags |= CP_STREAM_UNBLOCKED; 4228 LSQ_DEBUG("QPACK decoder unblocked"); 4229} 4230 4231 4232int 4233lsquic_stream_is_rejected (const struct lsquic_stream *stream) 4234{ 4235 return stream->stream_flags & STREAM_SS_RECVD; 4236} 4237 4238 4239int 4240lsquic_stream_can_push (const struct lsquic_stream *stream) 4241{ 4242 if (lsquic_stream_is_pushed(stream)) 4243 return 0; 4244 else if (stream->sm_bflags & SMBF_IETF) 4245 return (stream->sm_bflags & SMBF_USE_HEADERS) 4246 && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_NOPUSH)) 4247 && stream->sm_send_headers_state == SSHS_BEGIN 4248 ; 4249 else 4250 return 1; 4251} 4252 4253 4254static size_t 4255dp_reader_read (void *lsqr_ctx, void *buf, size_t count) 4256{ 4257 struct lsquic_stream *const stream = lsqr_ctx; 4258 unsigned char *dst = buf; 4259 unsigned char *const end = buf + count; 4260 size_t len; 4261 4262 len = MIN((size_t) (stream->sm_dup_push_len - stream->sm_dup_push_off), 4263 (size_t) (end - dst)); 4264 memcpy(dst, stream->sm_dup_push_buf + stream->sm_dup_push_off, len); 4265 stream->sm_dup_push_off += len; 4266 4267 if (stream->sm_dup_push_len == stream->sm_dup_push_off) 4268 LSQ_DEBUG("finish writing duplicate push"); 4269 4270 return len; 4271} 4272 4273 4274static size_t 4275dp_reader_size (void *lsqr_ctx) 4276{ 4277 struct lsquic_stream *const stream = lsqr_ctx; 4278 4279 return stream->sm_dup_push_len - stream->sm_dup_push_off; 4280} 4281 4282 4283static void 4284init_dp_reader (struct lsquic_stream *stream, struct lsquic_reader *reader) 4285{ 4286 reader->lsqr_read = dp_reader_read; 4287 reader->lsqr_size = dp_reader_size; 4288 reader->lsqr_ctx = stream; 4289} 4290 4291 4292static void 4293on_write_dp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 4294{ 4295 struct lsquic_reader dp_reader; 4296 ssize_t nw; 4297 int want_write; 4298 4299 assert(stream->sm_dup_push_off < stream->sm_dup_push_len); 4300 4301 init_dp_reader(stream, &dp_reader); 4302 nw = stream_write(stream, &dp_reader); 4303 if (nw > 0) 4304 { 4305 LSQ_DEBUG("wrote %zd bytes more of duplicate push (%s)", 4306 nw, stream->sm_dup_push_off == stream->sm_dup_push_len ? 4307 "done" : "not done"); 4308 if (stream->sm_dup_push_off == stream->sm_dup_push_len) 4309 { 4310 /* Restore want_write flag */ 4311 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 4312 if (want_write != stream->sm_saved_want_write) 4313 (void) lsquic_stream_wantwrite(stream, 4314 stream->sm_saved_want_write); 4315 } 4316 } 4317 else if (nw < 0) 4318 { 4319 LSQ_WARN("could not write duplicate push (wrapper)"); 4320 /* XXX What should happen if we hit an error? TODO */ 4321 } 4322} 4323 4324 4325int 4326lsquic_stream_duplicate_push (struct lsquic_stream *stream, uint64_t push_id) 4327{ 4328 struct lsquic_reader dp_reader; 4329 unsigned bits, len; 4330 ssize_t nw; 4331 4332 assert(stream->sm_bflags & SMBF_IETF); 4333 assert(lsquic_stream_can_push(stream)); 4334 4335 bits = vint_val2bits(push_id); 4336 len = 1 << bits; 4337 4338 if (!stream_activate_hq_frame(stream, 4339 stream->sm_payload + stream->sm_n_buffered, HQFT_DUPLICATE_PUSH, 4340 SHF_FIXED_SIZE, len)) 4341 return -1; 4342 4343 stream->stream_flags |= STREAM_PUSHING; 4344 4345 stream->sm_dup_push_len = len; 4346 stream->sm_dup_push_off = 0; 4347 vint_write(stream->sm_dup_push_buf, push_id, bits, 1 << bits); 4348 4349 init_dp_reader(stream, &dp_reader); 4350 nw = stream_write(stream, &dp_reader); 4351 if (nw > 0) 4352 { 4353 if (stream->sm_dup_push_off == stream->sm_dup_push_len) 4354 LSQ_DEBUG("fully wrote DUPLICATE_PUSH %"PRIu64, push_id); 4355 else 4356 { 4357 LSQ_DEBUG("partially wrote DUPLICATE_PUSH %"PRIu64, push_id); 4358 stream->stream_flags |= STREAM_NOPUSH; 4359 stream->sm_saved_want_write = 4360 !!(stream->sm_qflags & SMQF_WANT_WRITE); 4361 stream_wantwrite(stream, 1); 4362 } 4363 return 0; 4364 } 4365 else 4366 { 4367 if (nw < 0) 4368 LSQ_WARN("failure writing DUPLICATE_PUSH"); 4369 stream->stream_flags |= STREAM_NOPUSH; 4370 stream->stream_flags &= ~STREAM_PUSHING; 4371 return -1; 4372 } 4373} 4374 4375 4376static size_t 4377pp_reader_read (void *lsqr_ctx, void *buf, size_t count) 4378{ 4379 struct push_promise *const promise = lsqr_ctx; 4380 unsigned char *dst = buf; 4381 unsigned char *const end = buf + count; 4382 size_t len; 4383 4384 while (dst < end) 4385 { 4386 switch (promise->pp_write_state) 4387 { 4388 case PPWS_ID0: 4389 case PPWS_ID1: 4390 case PPWS_ID2: 4391 case PPWS_ID3: 4392 case PPWS_ID4: 4393 case PPWS_ID5: 4394 case PPWS_ID6: 4395 case PPWS_ID7: 4396 *dst++ = promise->pp_encoded_push_id[promise->pp_write_state]; 4397 ++promise->pp_write_state; 4398 break; 4399 case PPWS_PFX0: 4400 *dst++ = 0; 4401 ++promise->pp_write_state; 4402 break; 4403 case PPWS_PFX1: 4404 *dst++ = 0; 4405 ++promise->pp_write_state; 4406 break; 4407 case PPWS_HBLOCK: 4408 len = MIN(promise->pp_content_len - promise->pp_write_off, 4409 (size_t) (end - dst)); 4410 memcpy(dst, promise->pp_content_buf + promise->pp_write_off, 4411 len); 4412 promise->pp_write_off += len; 4413 dst += len; 4414 if (promise->pp_content_len == promise->pp_write_off) 4415 { 4416 LSQ_LOG1(LSQ_LOG_DEBUG, "finish writing push promise %"PRIu64 4417 ": reset push state", promise->pp_id); 4418 promise->pp_write_state = PPWS_DONE; 4419 } 4420 goto end; 4421 default: 4422 goto end; 4423 } 4424 } 4425 4426 end: 4427 return dst - (unsigned char *) buf; 4428} 4429 4430 4431static size_t 4432pp_reader_size (void *lsqr_ctx) 4433{ 4434 struct push_promise *const promise = lsqr_ctx; 4435 size_t size; 4436 4437 size = 0; 4438 switch (promise->pp_write_state) 4439 { 4440 case PPWS_ID0: 4441 case PPWS_ID1: 4442 case PPWS_ID2: 4443 case PPWS_ID3: 4444 case PPWS_ID4: 4445 case PPWS_ID5: 4446 case PPWS_ID6: 4447 case PPWS_ID7: 4448 size += 8 - promise->pp_write_state; 4449 case PPWS_PFX0: 4450 ++size; 4451 /* fall-through */ 4452 case PPWS_PFX1: 4453 ++size; 4454 /* fall-through */ 4455 case PPWS_HBLOCK: 4456 size += promise->pp_content_len - promise->pp_write_off; 4457 break; 4458 default: 4459 break; 4460 } 4461 4462 return size; 4463} 4464 4465 4466static void 4467init_pp_reader (struct push_promise *promise, struct lsquic_reader *reader) 4468{ 4469 reader->lsqr_read = pp_reader_read; 4470 reader->lsqr_size = pp_reader_size; 4471 reader->lsqr_ctx = promise; 4472} 4473 4474 4475static void 4476on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) 4477{ 4478 struct lsquic_reader pp_reader; 4479 struct push_promise *promise; 4480 ssize_t nw; 4481 int want_write; 4482 4483 assert(stream_is_pushing_promise(stream)); 4484 4485 promise = SLIST_FIRST(&stream->sm_promises); 4486 init_pp_reader(promise, &pp_reader); 4487 nw = stream_write(stream, &pp_reader); 4488 if (nw > 0) 4489 { 4490 LSQ_DEBUG("wrote %zd bytes more of push promise (%s)", 4491 nw, promise->pp_write_state == PPWS_DONE ? "done" : "not done"); 4492 if (promise->pp_write_state == PPWS_DONE) 4493 { 4494 /* Restore want_write flag */ 4495 want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE); 4496 if (want_write != stream->sm_saved_want_write) 4497 (void) lsquic_stream_wantwrite(stream, 4498 stream->sm_saved_want_write); 4499 } 4500 } 4501 else if (nw < 0) 4502 { 4503 LSQ_WARN("could not write push promise (wrapper)"); 4504 /* XXX What should happen if we hit an error? TODO */ 4505 } 4506} 4507 4508 4509/* Success means that the push promise has been placed on sm_promises list and 4510 * the stream now owns it. Failure means that the push promise should be 4511 * destroyed by the caller. 4512 * 4513 * A push promise is written immediately. If it cannot be written to packets 4514 * or buffered whole, the stream is marked as unable to push further promises. 4515 */ 4516int 4517lsquic_stream_push_promise (struct lsquic_stream *stream, 4518 struct push_promise *promise) 4519{ 4520 struct lsquic_reader pp_reader; 4521 unsigned bits, len; 4522 ssize_t nw; 4523 4524 assert(stream->sm_bflags & SMBF_IETF); 4525 assert(lsquic_stream_can_push(stream)); 4526 4527 bits = vint_val2bits(promise->pp_id); 4528 len = 1 << bits; 4529 promise->pp_write_state = 8 - len; 4530 vint_write(promise->pp_encoded_push_id + 8 - len, promise->pp_id, 4531 bits, 1 << bits); 4532 4533 if (!stream_activate_hq_frame(stream, 4534 stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PROMISE, 4535 SHF_FIXED_SIZE, pp_reader_size(promise))) 4536 return -1; 4537 4538 stream->stream_flags |= STREAM_PUSHING; 4539 4540 init_pp_reader(promise, &pp_reader); 4541 nw = stream_write(stream, &pp_reader); 4542 if (nw > 0) 4543 { 4544 SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next); 4545 ++promise->pp_refcnt; 4546 if (promise->pp_write_state == PPWS_DONE) 4547 LSQ_DEBUG("fully wrote promise %"PRIu64, promise->pp_id); 4548 else 4549 { 4550 LSQ_DEBUG("partially wrote promise %"PRIu64" (state: %d, off: %u)" 4551 ", disable further pushing", promise->pp_id, 4552 promise->pp_write_state, promise->pp_write_off); 4553 stream->stream_flags |= STREAM_NOPUSH; 4554 stream->sm_saved_want_write = 4555 !!(stream->sm_qflags & SMQF_WANT_WRITE); 4556 stream_wantwrite(stream, 1); 4557 } 4558 return 0; 4559 } 4560 else 4561 { 4562 if (nw < 0) 4563 LSQ_WARN("failure writing push promise"); 4564 stream->stream_flags |= STREAM_NOPUSH; 4565 stream->stream_flags &= ~STREAM_PUSHING; 4566 return -1; 4567 } 4568} 4569