lsquic_qdec_hdl.c revision 758aff32
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_qdec_hdl.c -- QPACK decoder streams handler 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdlib.h> 10#include <string.h> 11#include <sys/queue.h> 12 13#include "lsquic.h" 14#include "lsquic_types.h" 15#include "lsxpack_header.h" 16#include "lsquic_int_types.h" 17#include "lsquic_sfcw.h" 18#include "lsquic_varint.h" 19#include "lsquic_hq.h" 20#include "lsquic_hash.h" 21#include "lsquic_stream.h" 22#include "lsquic_frab_list.h" 23#include "lsqpack.h" 24#include "lsquic_http1x_if.h" 25#include "lsquic_qdec_hdl.h" 26#include "lsquic_mm.h" 27#include "lsquic_engine_public.h" 28#include "lsquic_headers.h" 29#include "lsquic_conn.h" 30#include "lsquic_conn_flow.h" 31#include "lsquic_rtt.h" 32#include "lsquic_conn_public.h" 33#include "lsquic_hq.h" 34#include "lsquic_parse.h" 35#include "lsquic_qpack_exp.h" 36#include "lsquic_util.h" 37 38#define LSQUIC_LOGGER_MODULE LSQLM_QDEC_HDL 39#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qdh->qdh_conn) 40#include "lsquic_logger.h" 41 42static const struct lsqpack_dec_hset_if dhi_if; 43 44 45struct header_ctx 46{ 47 void *hset; 48 struct qpack_dec_hdl *qdh; 49 enum ppc_flags ppc_flags; 50 struct lsquic_ext_http_prio ehp; 51}; 52 53 54/* We need to allocate struct uncompressed_headers anyway when header set 55 * is complete and we give it to the stream using lsquic_stream_uh_in(). 56 * To save a malloc, we reuse context after we're done with it. 57 */ 58union hblock_ctx 59{ 60 struct header_ctx ctx; 61 struct uncompressed_headers uh; 62}; 63 64 65static int 66qdh_write_decoder (struct qpack_dec_hdl *qdh, const unsigned char *buf, 67 size_t sz) 68{ 69 ssize_t nw; 70 71 if (!(qdh->qdh_dec_sm_out && lsquic_frab_list_empty(&qdh->qdh_fral))) 72 { 73 write_to_frab: 74 if (0 == lsquic_frab_list_write(&qdh->qdh_fral, 75 (unsigned char *) buf, sz)) 76 { 77 LSQ_DEBUG("wrote %zu bytes to frab list", sz); 78 lsquic_stream_wantwrite(qdh->qdh_dec_sm_out, 1); 79 return 0; 80 } 81 else 82 { 83 LSQ_INFO("error writing to frab list"); 84 return -1; 85 } 86 } 87 88 nw = lsquic_stream_write(qdh->qdh_dec_sm_out, buf, sz); 89 if (nw < 0) 90 { 91 LSQ_INFO("error writing to outgoing QPACK decoder stream: %s", 92 strerror(errno)); 93 return -1; 94 } 95 LSQ_DEBUG("wrote %zd bytes to outgoing QPACK decoder stream", nw); 96 97 if ((size_t) nw == sz) 98 return 0; 99 100 buf = buf + nw; 101 sz -= (size_t) nw; 102 goto write_to_frab; 103} 104 105 106static int 107qdh_write_type (struct qpack_dec_hdl *qdh) 108{ 109 int s; 110 111#ifndef NDEBUG 112 const char *env = getenv("LSQUIC_RND_VARINT_LEN"); 113 if (env && atoi(env)) 114 { 115 s = rand() & 3; 116 LSQ_DEBUG("writing %d-byte stream type", 1 << s); 117 } 118 else 119#endif 120 s = 0; 121 122 switch (s) 123 { 124 case 0: 125 return qdh_write_decoder(qdh, 126 (unsigned char []) { HQUST_QPACK_DEC }, 1); 127 case 1: 128 return qdh_write_decoder(qdh, 129 (unsigned char []) { 0x40, HQUST_QPACK_DEC }, 2); 130 case 2: 131 return qdh_write_decoder(qdh, 132 (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_DEC }, 4); 133 default: 134 return qdh_write_decoder(qdh, 135 (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 136 HQUST_QPACK_DEC }, 8); 137 } 138} 139 140 141static void 142qdh_begin_out (struct qpack_dec_hdl *qdh) 143{ 144 if (0 != qdh_write_type(qdh)) 145 { 146 LSQ_WARN("%s: could not write to decoder", __func__); 147 qdh->qdh_conn->cn_if->ci_internal_error(qdh->qdh_conn, 148 "cannot write to decoder stream"); 149 } 150} 151 152 153int 154lsquic_qdh_init (struct qpack_dec_hdl *qdh, struct lsquic_conn *conn, 155 int is_server, const struct lsquic_engine_public *enpub, 156 unsigned dyn_table_size, unsigned max_risked_streams) 157{ 158 enum lsqpack_dec_opts dec_opts; 159 160 dec_opts = 0; 161 if (enpub->enp_hsi_if->hsi_flags & LSQUIC_HSI_HTTP1X) 162 dec_opts |= LSQPACK_DEC_OPT_HTTP1X; 163 if (enpub->enp_hsi_if->hsi_flags & LSQUIC_HSI_HASH_NAME) 164 dec_opts |= LSQPACK_DEC_OPT_HASH_NAME; 165 if (enpub->enp_hsi_if->hsi_flags & LSQUIC_HSI_HASH_NAMEVAL) 166 dec_opts |= LSQPACK_DEC_OPT_HASH_NAMEVAL; 167 168 if (enpub->enp_settings.es_qpack_experiment) 169 { 170 qdh->qdh_exp_rec = lsquic_qpack_exp_new(); 171 if (qdh->qdh_exp_rec) 172 { 173 if (conn->cn_flags & LSCONN_SERVER) 174 qdh->qdh_exp_rec->qer_flags |= QER_SERVER; 175 qdh->qdh_exp_rec->qer_used_max_size = dyn_table_size; 176 qdh->qdh_exp_rec->qer_used_max_blocked = max_risked_streams; 177 } 178 } 179 180 qdh->qdh_conn = conn; 181 lsquic_frab_list_init(&qdh->qdh_fral, 0x400, NULL, NULL, NULL); 182 lsqpack_dec_init(&qdh->qdh_decoder, (void *) conn, dyn_table_size, 183 max_risked_streams, &dhi_if, dec_opts); 184 qdh->qdh_flags |= QDH_INITIALIZED; 185 qdh->qdh_enpub = enpub; 186 if (qdh->qdh_enpub->enp_hsi_if == lsquic_http1x_if) 187 { 188 qdh->qdh_h1x_ctor_ctx = (struct http1x_ctor_ctx) { 189 .conn = conn, 190 .max_headers_sz = MAX_HTTP1X_HEADERS_SIZE, 191 .is_server = is_server, 192 }; 193 qdh->qdh_hsi_ctx = &qdh->qdh_h1x_ctor_ctx; 194 } 195 else 196 qdh->qdh_hsi_ctx = qdh->qdh_enpub->enp_hsi_ctx; 197 if (qdh->qdh_dec_sm_out) 198 qdh_begin_out(qdh); 199 if (qdh->qdh_enc_sm_in) 200 lsquic_stream_wantread(qdh->qdh_enc_sm_in, 1); 201 LSQ_DEBUG("initialized"); 202 return 0; 203} 204 205 206static void 207qdh_log_and_clean_exp_rec (struct qpack_dec_hdl *qdh) 208{ 209 char buf[0x400]; 210 211 qdh->qdh_exp_rec->qer_comp_ratio = lsqpack_dec_ratio(&qdh->qdh_decoder); 212 /* Naughty: poking inside the decoder, it's not exposed. (Should it be?) */ 213 qdh->qdh_exp_rec->qer_peer_max_size = qdh->qdh_decoder.qpd_cur_max_capacity; 214 (void) lsquic_qpack_exp_to_xml(qdh->qdh_exp_rec, buf, sizeof(buf)); 215 LSQ_NOTICE("%s", buf); 216 lsquic_qpack_exp_destroy(qdh->qdh_exp_rec); 217 qdh->qdh_exp_rec = NULL; 218} 219 220 221void 222lsquic_qdh_cleanup (struct qpack_dec_hdl *qdh) 223{ 224 if (qdh->qdh_flags & QDH_INITIALIZED) 225 { 226 LSQ_DEBUG("cleanup"); 227 if (qdh->qdh_exp_rec) 228 qdh_log_and_clean_exp_rec(qdh); 229 lsqpack_dec_cleanup(&qdh->qdh_decoder); 230 lsquic_frab_list_cleanup(&qdh->qdh_fral); 231 qdh->qdh_flags &= ~QDH_INITIALIZED; 232 } 233} 234 235static lsquic_stream_ctx_t * 236qdh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream) 237{ 238 struct qpack_dec_hdl *const qdh = stream_if_ctx; 239 qdh->qdh_dec_sm_out = stream; 240 if (qdh->qdh_flags & QDH_INITIALIZED) 241 qdh_begin_out(qdh); 242 LSQ_DEBUG("initialized outgoing decoder stream"); 243 return (void *) qdh; 244} 245 246 247static void 248qdh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 249{ 250 struct qpack_dec_hdl *const qdh = (void *) ctx; 251 struct lsquic_reader reader; 252 ssize_t nw; 253 unsigned char buf[LSQPACK_LONGEST_ICI]; 254 255 if (lsqpack_dec_ici_pending(&qdh->qdh_decoder)) 256 { 257 nw = lsqpack_dec_write_ici(&qdh->qdh_decoder, buf, sizeof(buf)); 258 if (nw > 0) 259 { 260 if (0 == qdh_write_decoder(qdh, buf, nw)) 261 LSQ_DEBUG("wrote %zd-byte TSS instruction", nw); 262 else 263 goto err; 264 } 265 else if (nw < 0) 266 { 267 LSQ_WARN("could not generate TSS instruction"); 268 goto err; 269 } 270 } 271 272 if (lsquic_frab_list_empty(&qdh->qdh_fral)) 273 { 274 LSQ_DEBUG("%s: nothing to write", __func__); 275 lsquic_stream_wantwrite(stream, 0); 276 return; 277 } 278 279 reader = (struct lsquic_reader) { 280 .lsqr_read = lsquic_frab_list_read, 281 .lsqr_size = lsquic_frab_list_size, 282 .lsqr_ctx = &qdh->qdh_fral, 283 }; 284 285 nw = lsquic_stream_writef(stream, &reader); 286 if (nw >= 0) 287 { 288 LSQ_DEBUG("wrote %zd bytes to stream", nw); 289 (void) lsquic_stream_flush(stream); 290 if (lsquic_frab_list_empty(&qdh->qdh_fral)) 291 { 292 lsquic_stream_wantwrite(stream, 0); 293 if (qdh->qdh_on_dec_sent_func) 294 { 295 LSQ_DEBUG("buffered data written: call callback"); 296 qdh->qdh_on_dec_sent_func(qdh->qdh_on_dec_sent_ctx); 297 qdh->qdh_on_dec_sent_func = NULL; 298 qdh->qdh_on_dec_sent_ctx = NULL; 299 } 300 } 301 } 302 else 303 { 304 LSQ_WARN("cannot write to stream: %s", strerror(errno)); 305 err: 306 lsquic_stream_wantwrite(stream, 0); 307 qdh->qdh_conn->cn_if->ci_internal_error(qdh->qdh_conn, 308 "cannot write to stream"); 309 } 310} 311 312 313static void 314qdh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 315{ 316 struct qpack_dec_hdl *const qdh = (void *) ctx; 317 qdh->qdh_dec_sm_out = NULL; 318 LSQ_DEBUG("closed outgoing decoder stream"); 319} 320 321 322static void 323qdh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 324{ 325 assert(0); 326} 327 328 329static const struct lsquic_stream_if qdh_dec_sm_out_if = 330{ 331 .on_new_stream = qdh_out_on_new, 332 .on_read = qdh_out_on_read, 333 .on_write = qdh_out_on_write, 334 .on_close = qdh_out_on_close, 335}; 336const struct lsquic_stream_if *const lsquic_qdh_dec_sm_out_if = 337 &qdh_dec_sm_out_if; 338 339 340static lsquic_stream_ctx_t * 341qdh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream) 342{ 343 struct qpack_dec_hdl *const qdh = stream_if_ctx; 344 qdh->qdh_enc_sm_in = stream; 345 if (qdh->qdh_flags & QDH_INITIALIZED) 346 lsquic_stream_wantread(qdh->qdh_enc_sm_in, 1); 347 LSQ_DEBUG("initialized incoming encoder stream"); 348 return (void *) qdh; 349} 350 351 352static size_t 353qdh_read_encoder_stream (void *ctx, const unsigned char *buf, size_t sz, 354 int fin) 355{ 356 struct qpack_dec_hdl *const qdh = (void *) ctx; 357 const struct lsqpack_dec_err *qerr; 358 int s; 359 360 if (fin) 361 { 362 LSQ_INFO("encoder stream is closed"); 363 qdh->qdh_conn->cn_if->ci_abort_error(qdh->qdh_conn, 1, 364 HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK encoder stream"); 365 goto end; 366 } 367 368 s = lsqpack_dec_enc_in(&qdh->qdh_decoder, buf, sz); 369 if (s != 0) 370 { 371 LSQ_INFO("error reading encoder stream"); 372 qerr = lsqpack_dec_get_err_info(&qdh->qdh_decoder); 373 qdh->qdh_conn->cn_if->ci_abort_error(qdh->qdh_conn, 1, 374 HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK encoder " 375 "stream; offset %"PRIu64", line %d", qerr->off, qerr->line); 376 goto end; 377 } 378 if (qdh->qdh_dec_sm_out 379 && lsqpack_dec_ici_pending(&qdh->qdh_decoder)) 380 lsquic_stream_wantwrite(qdh->qdh_dec_sm_out, 1); 381 382 LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz); 383 384 end: 385 return sz; 386} 387 388 389static void 390qdh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 391{ 392 struct qpack_dec_hdl *const qdh = (void *) ctx; 393 ssize_t nread; 394 395 nread = lsquic_stream_readf(stream, qdh_read_encoder_stream, qdh); 396 if (nread <= 0) 397 { 398 if (nread < 0) 399 { 400 LSQ_WARN("cannot read from encoder stream: %s", strerror(errno)); 401 qdh->qdh_conn->cn_if->ci_internal_error(qdh->qdh_conn, 402 "cannot read from encoder stream"); 403 } 404 else 405 { 406 LSQ_INFO("encoder stream closed by peer: abort connection"); 407 qdh->qdh_conn->cn_if->ci_abort_error(qdh->qdh_conn, 1, 408 HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed"); 409 } 410 lsquic_stream_wantread(stream, 0); 411 } 412} 413 414 415static void 416qdh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 417{ 418 struct qpack_dec_hdl *const qdh = (void *) ctx; 419 LSQ_DEBUG("closed incoming encoder stream"); 420 qdh->qdh_enc_sm_in = NULL; 421} 422 423 424static void 425qdh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 426{ 427 assert(0); 428} 429 430 431static const struct lsquic_stream_if qdh_enc_sm_in_if = 432{ 433 .on_new_stream = qdh_in_on_new, 434 .on_read = qdh_in_on_read, 435 .on_write = qdh_in_on_write, 436 .on_close = qdh_in_on_close, 437}; 438const struct lsquic_stream_if *const lsquic_qdh_enc_sm_in_if = 439 &qdh_enc_sm_in_if; 440 441 442static void 443qdh_hblock_unblocked (void *stream_p) 444{ 445 struct lsquic_stream *const stream = stream_p; 446 union hblock_ctx *const u = stream->sm_hblock_ctx; 447 struct qpack_dec_hdl *qdh = u->ctx.qdh; 448 449 LSQ_DEBUG("header block for stream %"PRIu64" unblocked", stream->id); 450 lsquic_stream_qdec_unblocked(stream); 451} 452 453 454struct cont_len 455{ 456 unsigned long long value; 457 int has; /* 1: set, 0: not set, -1: invalid */ 458}; 459 460 461static void 462process_content_length (const struct qpack_dec_hdl *qdh /* for logging */, 463 struct cont_len *cl, const char *val /* not NUL-terminated */, 464 unsigned len) 465{ 466 char *endcl, cont_len_buf[30]; 467 468 if (0 == cl->has) 469 { 470 if (len >= sizeof(cont_len_buf)) 471 { 472 LSQ_DEBUG("content-length has invalid value `%.*s'", 473 (int) len, val); 474 cl->has = -1; 475 return; 476 } 477 memcpy(cont_len_buf, val, len); 478 cont_len_buf[len] = '\0'; 479 cl->value = strtoull(cont_len_buf, &endcl, 10); 480 if (*endcl == '\0' && !(ULLONG_MAX == cl->value && ERANGE == errno)) 481 { 482 cl->has = 1; 483 LSQ_DEBUG("content length is %llu", cl->value); 484 } 485 else 486 { 487 cl->has = -1; 488 LSQ_DEBUG("content-length has invalid value `%.*s'", 489 (int) len, val); 490 } 491 } 492 else if (cl->has > 0) 493 { 494 LSQ_DEBUG("header set has two content-length: ambiguous, " 495 "turn off checking"); 496 cl->has = -1; 497 } 498} 499 500 501static int 502is_content_length (const struct lsxpack_header *xhdr) 503{ 504 return ((xhdr->flags & LSXPACK_QPACK_IDX) 505 && xhdr->qpack_index == LSQPACK_TNV_CONTENT_LENGTH_0) 506 || (xhdr->name_len == 14 && 0 == memcmp(lsxpack_header_get_name(xhdr), 507 "content-length", 13)) 508 ; 509} 510 511 512static int 513is_priority (const struct lsxpack_header *xhdr) 514{ 515 return xhdr->name_len == 8 516 && 0 == memcmp(lsxpack_header_get_name(xhdr), "priority", 8); 517} 518 519 520static struct lsxpack_header * 521qdh_prepare_decode (void *stream_p, struct lsxpack_header *xhdr, size_t space) 522{ 523 struct lsquic_stream *const stream = stream_p; 524 union hblock_ctx *const u = stream->sm_hblock_ctx; 525 struct qpack_dec_hdl *const qdh = u->ctx.qdh; 526 527 return qdh->qdh_enpub->enp_hsi_if->hsi_prepare_decode( 528 u->ctx.hset, xhdr, space); 529} 530 531 532static void 533qdh_maybe_set_user_agent (struct qpack_dec_hdl *qdh, 534 const struct lsxpack_header *xhdr) 535{ 536 /* Flipped: we are the *decoder* */ 537 const char *const name = qdh->qdh_exp_rec->qer_flags & QER_SERVER ? 538 "user-agent" : "server"; 539 const size_t len = qdh->qdh_exp_rec->qer_flags & QER_SERVER ? 10 : 6; 540 541 if (len == xhdr->name_len 542 && 0 == memcmp(name, lsxpack_header_get_name(xhdr), len)) 543 qdh->qdh_exp_rec->qer_user_agent 544 = strndup(lsxpack_header_get_value(xhdr), xhdr->val_len); 545} 546 547 548static int 549qdh_process_header (void *stream_p, struct lsxpack_header *xhdr) 550{ 551 struct lsquic_stream *const stream = stream_p; 552 union hblock_ctx *const u = stream->sm_hblock_ctx; 553 struct qpack_dec_hdl *const qdh = u->ctx.qdh; 554 struct cont_len cl; 555 556 if (is_content_length(xhdr)) 557 { 558 cl.has = 0; 559 process_content_length(qdh, &cl, lsxpack_header_get_value(xhdr), 560 xhdr->val_len); 561 if (cl.has > 0) 562 (void) lsquic_stream_verify_len(stream, cl.value); 563 } 564 else if ((stream->sm_bflags & (SMBF_HTTP_PRIO|SMBF_HPRIO_SET)) 565 == SMBF_HTTP_PRIO 566 && is_priority(xhdr)) 567 { 568 u->ctx.ppc_flags &= ~(PPC_INC_NAME|PPC_URG_NAME); 569 (void) lsquic_http_parse_pfv(lsxpack_header_get_value(xhdr), 570 xhdr->val_len, &u->ctx.ppc_flags, &u->ctx.ehp, 571 (char *) stream->conn_pub->mm->acki, 572 sizeof(*stream->conn_pub->mm->acki)); 573 } 574 else if (qdh->qdh_exp_rec && !qdh->qdh_exp_rec->qer_user_agent) 575 qdh_maybe_set_user_agent(qdh, xhdr); 576 577 return qdh->qdh_enpub->enp_hsi_if->hsi_process_header(u->ctx.hset, xhdr); 578} 579 580 581static const struct lsqpack_dec_hset_if dhi_if = 582{ 583 .dhi_unblocked = qdh_hblock_unblocked, 584 .dhi_prepare_decode = qdh_prepare_decode, 585 .dhi_process_header = qdh_process_header, 586}; 587 588 589static enum lsqpack_read_header_status 590qdh_header_read_results (struct qpack_dec_hdl *qdh, 591 struct lsquic_stream *stream, enum lsqpack_read_header_status rhs, 592 const unsigned char *dec_buf, size_t dec_buf_sz) 593{ 594 const struct lsqpack_dec_err *qerr; 595 struct uncompressed_headers *uh; 596 void *hset; 597 598 if (rhs == LQRHS_DONE) 599 { 600 if (!lsquic_stream_header_is_trailer(stream)) 601 { 602 if (stream->sm_hblock_ctx->ctx.ppc_flags 603 & (PPC_INC_SET|PPC_URG_SET)) 604 { 605 assert(stream->sm_bflags & SMBF_HTTP_PRIO); 606 LSQ_DEBUG("Apply Priority from headers to stream %"PRIu64, 607 stream->id); 608 (void) lsquic_stream_set_http_prio(stream, 609 &stream->sm_hblock_ctx->ctx.ehp); 610 } 611 hset = stream->sm_hblock_ctx->ctx.hset; 612 uh = &stream->sm_hblock_ctx->uh; 613 stream->sm_hblock_ctx = NULL; 614 memset(uh, 0, sizeof(*uh)); 615 uh->uh_stream_id = stream->id; 616 uh->uh_oth_stream_id = 0; 617 uh->uh_weight = 0; 618 uh->uh_exclusive = -1; 619 if (qdh->qdh_enpub->enp_hsi_if == lsquic_http1x_if) 620 uh->uh_flags |= UH_H1H; 621 if (0 != qdh->qdh_enpub->enp_hsi_if 622 ->hsi_process_header(hset, NULL)) 623 { 624 LSQ_DEBUG("finishing HTTP/1.x hset failed"); 625 free(uh); 626 return LQRHS_ERROR; 627 } 628 uh->uh_hset = hset; 629 if (0 == lsquic_stream_uh_in(stream, uh)) 630 LSQ_DEBUG("gave hset to stream %"PRIu64, stream->id); 631 else 632 { 633 LSQ_DEBUG("could not give hset to stream %"PRIu64, stream->id); 634 free(uh); 635 return LQRHS_ERROR; 636 } 637 } 638 else 639 { 640 LSQ_DEBUG("discard trailer header set"); 641 free(stream->sm_hblock_ctx); 642 stream->sm_hblock_ctx = NULL; 643 } 644 if (qdh->qdh_dec_sm_out) 645 { 646 if (dec_buf_sz 647 && 0 != qdh_write_decoder(qdh, dec_buf, dec_buf_sz)) 648 { 649 return LQRHS_ERROR; 650 } 651 if (dec_buf_sz || lsqpack_dec_ici_pending(&qdh->qdh_decoder)) 652 lsquic_stream_wantwrite(qdh->qdh_dec_sm_out, 1); 653 } 654 } 655 else if (rhs == LQRHS_ERROR) 656 { 657 qerr = lsqpack_dec_get_err_info(&qdh->qdh_decoder); 658 qdh->qdh_conn->cn_if->ci_abort_error(qdh->qdh_conn, 1, 659 HEC_QPACK_DECOMPRESSION_FAILED, "QPACK decompression error; " 660 "stream %"PRIu64", offset %"PRIu64", line %d", qerr->stream_id, 661 qerr->off, qerr->line); 662 } 663 664 return rhs; 665} 666 667 668enum lsqpack_read_header_status 669lsquic_qdh_header_in_begin (struct qpack_dec_hdl *qdh, 670 struct lsquic_stream *stream, uint64_t header_size, 671 const unsigned char **buf, size_t bufsz) 672{ 673 enum lsqpack_read_header_status rhs; 674 void *hset; 675 int is_pp; 676 size_t dec_buf_sz; 677 union hblock_ctx *u; 678 unsigned char dec_buf[LSQPACK_LONGEST_HEADER_ACK]; 679 680 assert(!(stream->stream_flags & STREAM_U_READ_DONE)); 681 682 if (!(qdh->qdh_flags & QDH_INITIALIZED)) 683 { 684 LSQ_WARN("not initialized: cannot process header block"); 685 return LQRHS_ERROR; 686 } 687 688 u = malloc(sizeof(*u)); 689 if (!u) 690 { 691 LSQ_INFO("cannot allocate hblock_ctx"); 692 return LQRHS_ERROR; 693 } 694 695 is_pp = lsquic_stream_header_is_pp(stream); 696 hset = qdh->qdh_enpub->enp_hsi_if->hsi_create_header_set( 697 qdh->qdh_hsi_ctx, stream, is_pp); 698 if (!hset) 699 { 700 free(u); 701 LSQ_DEBUG("hsi_create_header_set failure"); 702 return LQRHS_ERROR; 703 } 704 705 u->ctx.hset = hset; 706 u->ctx.qdh = qdh; 707 u->ctx.ppc_flags = 0; 708 u->ctx.ehp = (struct lsquic_ext_http_prio) { 709 .urgency = LSQUIC_DEF_HTTP_URGENCY, 710 .incremental = LSQUIC_DEF_HTTP_INCREMENTAL, 711 }; 712 stream->sm_hblock_ctx = u; 713 714 if (qdh->qdh_exp_rec) 715 { 716 const lsquic_time_t now = lsquic_time_now(); 717 if (0 == qdh->qdh_exp_rec->qer_hblock_count) 718 qdh->qdh_exp_rec->qer_first_req = now; 719 qdh->qdh_exp_rec->qer_last_req = now; 720 ++qdh->qdh_exp_rec->qer_hblock_count; 721 qdh->qdh_exp_rec->qer_hblock_size += bufsz; 722 } 723 724 dec_buf_sz = sizeof(dec_buf); 725 rhs = lsqpack_dec_header_in(&qdh->qdh_decoder, stream, stream->id, 726 header_size, buf, bufsz, dec_buf, &dec_buf_sz); 727 if (qdh->qdh_exp_rec) 728 qdh->qdh_exp_rec->qer_peer_max_blocked += rhs == LQRHS_BLOCKED; 729 return qdh_header_read_results(qdh, stream, rhs, dec_buf, dec_buf_sz); 730} 731 732 733enum lsqpack_read_header_status 734lsquic_qdh_header_in_continue (struct qpack_dec_hdl *qdh, 735 struct lsquic_stream *stream, const unsigned char **buf, size_t bufsz) 736{ 737 enum lsqpack_read_header_status rhs; 738 size_t dec_buf_sz; 739 unsigned char dec_buf[LSQPACK_LONGEST_HEADER_ACK]; 740 741 assert(!(stream->stream_flags & STREAM_U_READ_DONE)); 742 743 if (qdh->qdh_flags & QDH_INITIALIZED) 744 { 745 if (qdh->qdh_exp_rec) 746 qdh->qdh_exp_rec->qer_hblock_size += bufsz; 747 dec_buf_sz = sizeof(dec_buf); 748 rhs = lsqpack_dec_header_read(&qdh->qdh_decoder, stream, 749 buf, bufsz, dec_buf, &dec_buf_sz); 750 if (qdh->qdh_exp_rec) 751 qdh->qdh_exp_rec->qer_peer_max_blocked += rhs == LQRHS_BLOCKED; 752 return qdh_header_read_results(qdh, stream, rhs, dec_buf, dec_buf_sz); 753 } 754 else 755 { 756 LSQ_WARN("not initialized: cannot process header block"); 757 return LQRHS_ERROR; 758 } 759} 760 761 762static void 763lsquic_qdh_unref_stream (struct qpack_dec_hdl *qdh, 764 struct lsquic_stream *stream) 765{ 766 if (0 == lsqpack_dec_unref_stream(&qdh->qdh_decoder, stream)) 767 LSQ_DEBUG("unreffed stream %"PRIu64, stream->id); 768 else 769 LSQ_WARN("cannot unref stream %"PRIu64, stream->id); 770} 771 772 773void 774lsquic_qdh_cancel_stream (struct qpack_dec_hdl *qdh, 775 struct lsquic_stream *stream) 776{ 777 ssize_t nw; 778 unsigned char buf[LSQPACK_LONGEST_CANCEL]; 779 780 if (!qdh->qdh_dec_sm_out) 781 return; 782 783 nw = lsqpack_dec_cancel_stream(&qdh->qdh_decoder, stream, buf, sizeof(buf)); 784 if (nw > 0) 785 { 786 if (0 == qdh_write_decoder(qdh, buf, nw)) 787 LSQ_DEBUG("cancelled stream %"PRIu64" and wrote %zd-byte Cancel " 788 "Stream instruction to the decoder stream", stream->id, nw); 789 } 790 else if (nw == 0) 791 LSQ_WARN("cannot cancel stream %"PRIu64" -- not found", stream->id); 792 else 793 { 794 LSQ_WARN("cannot cancel stream %"PRIu64" -- not enough buffer space " 795 "to encode Cancel Stream instructin", stream->id); 796 lsquic_qdh_unref_stream(qdh, stream); 797 } 798} 799 800 801void 802lsquic_qdh_cancel_stream_id (struct qpack_dec_hdl *qdh, 803 lsquic_stream_id_t stream_id) 804{ 805 ssize_t nw; 806 unsigned char buf[LSQPACK_LONGEST_CANCEL]; 807 808 if (!qdh->qdh_dec_sm_out) 809 return; 810 811 nw = lsqpack_dec_cancel_stream_id(&qdh->qdh_decoder, stream_id, buf, 812 sizeof(buf)); 813 if (nw > 0) 814 { 815 if (0 == qdh_write_decoder(qdh, buf, nw)) 816 LSQ_DEBUG("wrote %zd-byte Cancel Stream instruction for " 817 "stream %"PRIu64" to the decoder stream", nw, stream_id); 818 } 819 else if (nw == 0) 820 LSQ_DEBUG("not generating Cancel Stream instruction for " 821 "stream %"PRIu64, stream_id); 822 else 823 LSQ_WARN("cannot generate Cancel Stream instruction for " 824 "stream %"PRIu64" -- not enough buffer space", stream_id); 825} 826 827 828int 829lsquic_qdh_arm_if_unsent (struct qpack_dec_hdl *qdh, void (*func)(void *), 830 void *ctx) 831{ 832 size_t bytes; 833 834 /* Use size of a single frab list buffer as an arbitrary threshold */ 835 bytes = lsquic_frab_list_size(&qdh->qdh_fral); 836 if (bytes <= qdh->qdh_fral.fl_buf_size) 837 return 0; 838 else 839 { 840 LSQ_DEBUG("have %zu bytes of unsent QPACK decoder stream data: set " 841 "up callback", bytes); 842 qdh->qdh_on_dec_sent_func = func; 843 qdh->qdh_on_dec_sent_ctx = ctx; 844 return 1; 845 } 846} 847