lsquic_qenc_hdl.c revision a74702c6
1/* Copyright (c) 2017 - 2022 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_qenc_hdl.c -- QPACK encoder streams handler 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdlib.h> 10#include <string.h> 11#include <sys/queue.h> 12 13#ifdef WIN32 14#include <malloc.h> 15#endif 16 17#include "lsquic.h" 18#include "lsquic_types.h" 19#include "lsquic_int_types.h" 20#include "lsquic_sfcw.h" 21#include "lsquic_varint.h" 22#include "lsquic_hq.h" 23#include "lsquic_hash.h" 24#include "lsquic_stream.h" 25#include "lsquic_frab_list.h" 26#include "lsqpack.h" 27#include "lsxpack_header.h" 28#include "lsquic_conn.h" 29#include "lsquic_qpack_exp.h" 30#include "lsquic_util.h" 31#include "lsquic_qenc_hdl.h" 32 33#define LSQUIC_LOGGER_MODULE LSQLM_QENC_HDL 34#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qeh->qeh_conn) 35#include "lsquic_logger.h" 36 37#define QENC_MIN_DYN_TABLE_SIZE 32u 38 39static int 40qeh_write_type (struct qpack_enc_hdl *qeh) 41{ 42 int s; 43 44#ifndef NDEBUG 45 const char *env = getenv("LSQUIC_RND_VARINT_LEN"); 46 if (env && atoi(env)) 47 { 48 s = rand() & 3; 49 LSQ_DEBUG("writing %d-byte stream type", 1 << s); 50 } 51 else 52#endif 53 s = 0; 54 55 switch (s) 56 { 57 case 0: 58 return lsquic_frab_list_write(&qeh->qeh_fral, 59 (unsigned char []) { HQUST_QPACK_ENC }, 1); 60 case 1: 61 return lsquic_frab_list_write(&qeh->qeh_fral, 62 (unsigned char []) { 0x40, HQUST_QPACK_ENC }, 2); 63 case 2: 64 return lsquic_frab_list_write(&qeh->qeh_fral, 65 (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_ENC }, 4); 66 default: 67 return lsquic_frab_list_write(&qeh->qeh_fral, 68 (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 69 HQUST_QPACK_ENC }, 8); 70 } 71} 72 73 74static void 75qeh_begin_out (struct qpack_enc_hdl *qeh) 76{ 77 if (0 == qeh_write_type(qeh) 78 && (qeh->qeh_tsu_sz == 0 79 || 0 == lsquic_frab_list_write(&qeh->qeh_fral, qeh->qeh_tsu_buf, 80 qeh->qeh_tsu_sz))) 81 { 82 LSQ_DEBUG("wrote %zu bytes to frab list", 1 + qeh->qeh_tsu_sz); 83 lsquic_stream_wantwrite(qeh->qeh_enc_sm_out, 1); 84 } 85 else 86 { 87 LSQ_WARN("could not write to frab list"); 88 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 89 "cannot write to frab list"); 90 } 91} 92 93 94void 95lsquic_qeh_init (struct qpack_enc_hdl *qeh, struct lsquic_conn *conn) 96{ 97 assert(!(qeh->qeh_flags & QEH_INITIALIZED)); 98 qeh->qeh_conn = conn; 99 lsquic_frab_list_init(&qeh->qeh_fral, 0x400, NULL, NULL, NULL); 100 lsqpack_enc_preinit(&qeh->qeh_encoder, (void *) conn); 101 qeh->qeh_flags |= QEH_INITIALIZED; 102 qeh->qeh_max_prefix_size = 103 lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder); 104 if (qeh->qeh_dec_sm_in) 105 lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1); 106 LSQ_DEBUG("initialized"); 107} 108 109 110int 111lsquic_qeh_settings (struct qpack_enc_hdl *qeh, unsigned max_table_size, 112 unsigned dyn_table_size, unsigned max_risked_streams, int server) 113{ 114 enum lsqpack_enc_opts enc_opts; 115 116 assert(qeh->qeh_flags & QEH_INITIALIZED); 117 118 if (qeh->qeh_flags & QEH_HAVE_SETTINGS) 119 { 120 LSQ_WARN("settings already set"); 121 return -1; 122 } 123 124 enc_opts = LSQPACK_ENC_OPT_STAGE_2 125 | (server ? LSQPACK_ENC_OPT_SERVER : 0); 126 qeh->qeh_tsu_sz = sizeof(qeh->qeh_tsu_buf); 127 if (QENC_MIN_DYN_TABLE_SIZE > dyn_table_size) 128 dyn_table_size = 0; 129 if (0 != lsqpack_enc_init(&qeh->qeh_encoder, (void *) qeh->qeh_conn, 130 max_table_size, dyn_table_size, max_risked_streams, enc_opts, 131 qeh->qeh_tsu_buf, &qeh->qeh_tsu_sz)) 132 { 133 LSQ_INFO("could not initialize QPACK encoder"); 134 return -1; 135 } 136 LSQ_DEBUG("%zu-byte post-init TSU", qeh->qeh_tsu_sz); 137 qeh->qeh_flags |= QEH_HAVE_SETTINGS; 138 qeh->qeh_max_prefix_size = 139 lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder); 140 LSQ_DEBUG("have settings: max table size=%u; dyn table size=%u; max risked " 141 "streams=%u", max_table_size, dyn_table_size, max_risked_streams); 142 if (qeh->qeh_enc_sm_out) 143 qeh_begin_out(qeh); 144 return 0; 145} 146 147 148static void 149qeh_log_and_clean_exp_rec (struct qpack_enc_hdl *qeh) 150{ 151 char buf[0x400]; 152 153 qeh->qeh_exp_rec->qer_comp_ratio = lsqpack_enc_ratio(&qeh->qeh_encoder); 154 (void) lsquic_qpack_exp_to_xml(qeh->qeh_exp_rec, buf, sizeof(buf)); 155 LSQ_NOTICE("%s", buf); 156 lsquic_qpack_exp_destroy(qeh->qeh_exp_rec); 157 qeh->qeh_exp_rec = NULL; 158} 159 160 161void 162lsquic_qeh_cleanup (struct qpack_enc_hdl *qeh) 163{ 164 if (qeh->qeh_flags & QEH_INITIALIZED) 165 { 166 LSQ_DEBUG("cleanup"); 167 if (qeh->qeh_exp_rec) 168 qeh_log_and_clean_exp_rec(qeh); 169 lsqpack_enc_cleanup(&qeh->qeh_encoder); 170 lsquic_frab_list_cleanup(&qeh->qeh_fral); 171 memset(qeh, 0, sizeof(*qeh)); 172 } 173} 174 175static lsquic_stream_ctx_t * 176qeh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream) 177{ 178 struct qpack_enc_hdl *const qeh = stream_if_ctx; 179 qeh->qeh_enc_sm_out = stream; 180 if ((qeh->qeh_flags & (QEH_INITIALIZED|QEH_HAVE_SETTINGS)) 181 == (QEH_INITIALIZED|QEH_HAVE_SETTINGS)) 182 qeh_begin_out(qeh); 183 else 184 qeh->qeh_conn = lsquic_stream_conn(stream); /* Or NULL deref in log */ 185 LSQ_DEBUG("initialized outgoing encoder stream"); 186 return (void *) qeh; 187} 188 189 190static void 191qeh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 192{ 193 struct qpack_enc_hdl *const qeh = (void *) ctx; 194 struct lsquic_reader reader = { 195 .lsqr_read = lsquic_frab_list_read, 196 .lsqr_size = lsquic_frab_list_size, 197 .lsqr_ctx = &qeh->qeh_fral, 198 }; 199 ssize_t nw; 200 201 nw = lsquic_stream_writef(stream, &reader); 202 if (nw >= 0) 203 { 204 LSQ_DEBUG("wrote %zd bytes to stream", nw); 205 (void) lsquic_stream_flush(stream); 206 if (lsquic_frab_list_empty(&qeh->qeh_fral)) 207 lsquic_stream_wantwrite(stream, 0); 208 } 209 else 210 { 211 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 212 "cannot write to stream"); 213 LSQ_WARN("cannot write to stream: %s", strerror(errno)); 214 lsquic_stream_wantwrite(stream, 0); 215 } 216} 217 218 219static void 220qeh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 221{ 222 struct qpack_enc_hdl *const qeh = (void *) ctx; 223 qeh->qeh_enc_sm_out = NULL; 224 LSQ_DEBUG("closed outgoing encoder stream"); 225} 226 227 228static void 229qeh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 230{ 231 assert(0); 232} 233 234 235static const struct lsquic_stream_if qeh_enc_sm_out_if = 236{ 237 .on_new_stream = qeh_out_on_new, 238 .on_read = qeh_out_on_read, 239 .on_write = qeh_out_on_write, 240 .on_close = qeh_out_on_close, 241}; 242const struct lsquic_stream_if *const lsquic_qeh_enc_sm_out_if = 243 &qeh_enc_sm_out_if; 244 245 246static lsquic_stream_ctx_t * 247qeh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream) 248{ 249 struct qpack_enc_hdl *const qeh = stream_if_ctx; 250 qeh->qeh_dec_sm_in = stream; 251 if (qeh->qeh_flags & QEH_INITIALIZED) 252 lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1); 253 else 254 qeh->qeh_conn = lsquic_stream_conn(stream); /* Or NULL deref in log */ 255 LSQ_DEBUG("initialized incoming decoder stream"); 256 return (void *) qeh; 257} 258 259 260static size_t 261qeh_read_decoder_stream (void *ctx, const unsigned char *buf, size_t sz, 262 int fin) 263{ 264 struct qpack_enc_hdl *const qeh = (void *) ctx; 265 uint64_t offset; 266 int s; 267 268 if (fin) 269 { 270 LSQ_INFO("decoder stream is closed"); 271 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 272 HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK decoder stream"); 273 goto end; 274 } 275 276 offset = lsquic_stream_read_offset(qeh->qeh_dec_sm_in); 277 s = lsqpack_enc_decoder_in(&qeh->qeh_encoder, buf, sz); 278 if (s != 0) 279 { 280 LSQ_INFO("error reading decoder stream"); 281 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 282 HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK decoder " 283 "stream at offset %"PRIu64, offset); 284 goto end; 285 } 286 LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz); 287 288 end: 289 return sz; 290} 291 292 293static void 294qeh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 295{ 296 struct qpack_enc_hdl *const qeh = (void *) ctx; 297 ssize_t nread; 298 299 nread = lsquic_stream_readf(stream, qeh_read_decoder_stream, qeh); 300 if (nread <= 0) 301 { 302 if (nread < 0) 303 { 304 LSQ_WARN("cannot read from encoder stream: %s", strerror(errno)); 305 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 306 "cannot read from encoder stream"); 307 } 308 else 309 { 310 LSQ_INFO("encoder stream closed by peer: abort connection"); 311 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 312 HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed"); 313 } 314 lsquic_stream_wantread(stream, 0); 315 } 316} 317 318 319static void 320qeh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 321{ 322 struct qpack_enc_hdl *const qeh = (void *) ctx; 323 LSQ_DEBUG("closed incoming decoder stream"); 324 qeh->qeh_dec_sm_in = NULL; 325} 326 327 328static void 329qeh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 330{ 331 assert(0); 332} 333 334 335static const struct lsquic_stream_if qeh_dec_sm_in_if = 336{ 337 .on_new_stream = qeh_in_on_new, 338 .on_read = qeh_in_on_read, 339 .on_write = qeh_in_on_write, 340 .on_close = qeh_in_on_close, 341}; 342const struct lsquic_stream_if *const lsquic_qeh_dec_sm_in_if = 343 &qeh_dec_sm_in_if; 344 345 346static void 347qeh_maybe_set_user_agent (struct qpack_enc_hdl *qeh, 348 const struct lsquic_http_headers *headers) 349{ 350 const char *const name = qeh->qeh_exp_rec->qer_flags & QER_SERVER ? 351 "server" : "user-agent"; 352 const size_t len = qeh->qeh_exp_rec->qer_flags & QER_SERVER ? 6 : 10; 353 int i; 354 355 for (i = 0; i < headers->count; ++i) 356 if (len == headers->headers[i].name_len 357 && 0 == memcmp(name, 358 lsxpack_header_get_name(&headers->headers[i]), len)) 359 { 360 qeh->qeh_exp_rec->qer_user_agent = strndup( 361 lsxpack_header_get_value(&headers->headers[i]), 362 headers->headers[i].val_len); 363 break; 364 } 365} 366 367 368static enum qwh_status 369qeh_write_headers (struct qpack_enc_hdl *qeh, lsquic_stream_id_t stream_id, 370 unsigned seqno, const struct lsquic_http_headers *headers, 371 unsigned char *buf, size_t *prefix_sz, size_t *headers_sz, 372 uint64_t *completion_offset, enum lsqpack_enc_header_flags *hflags) 373{ 374 unsigned char *p = buf; 375 unsigned char *const end = buf + *headers_sz; 376 const unsigned char *enc_p; 377 size_t enc_sz, hea_sz, total_enc_sz; 378 ssize_t nw; 379 enum lsqpack_enc_status st; 380 int i, s, write_to_stream; 381 enum lsqpack_enc_flags enc_flags; 382 enum qwh_status retval; 383#ifndef WIN32 384 unsigned char enc_buf[ qeh->qeh_encoder.qpe_cur_max_capacity * 2 ]; 385#else 386 unsigned char *enc_buf; 387 enc_buf = _malloca(qeh->qeh_encoder.qpe_cur_max_capacity * 2); 388 if (!enc_buf) 389 return QWH_ERR; 390#endif 391 392 if (qeh->qeh_exp_rec) 393 { 394 const lsquic_time_t now = lsquic_time_now(); 395 if (qeh->qeh_exp_rec->qer_hblock_count == 0) 396 qeh->qeh_exp_rec->qer_first_req = now; 397 qeh->qeh_exp_rec->qer_last_req = now; 398 ++qeh->qeh_exp_rec->qer_hblock_count; 399 if (!qeh->qeh_exp_rec->qer_user_agent) 400 qeh_maybe_set_user_agent(qeh, headers); 401 } 402 403 s = lsqpack_enc_start_header(&qeh->qeh_encoder, stream_id, 0); 404 if (s != 0) 405 { 406 LSQ_WARN("cannot start header"); 407 retval = QWH_ERR; 408 goto end; 409 } 410 LSQ_DEBUG("begin encoding headers for stream %"PRIu64, stream_id); 411 412 if (qeh->qeh_enc_sm_out) 413 enc_flags = 0; 414 else 415 { 416 enc_flags = LQEF_NO_INDEX; 417 LSQ_DEBUG("encoder stream is unavailable, won't index headers"); 418 } 419 write_to_stream = qeh->qeh_enc_sm_out 420 && lsquic_frab_list_empty(&qeh->qeh_fral); 421 total_enc_sz = 0; 422 for (i = 0; i < headers->count; ++i) 423 { 424 if (headers->headers[i].buf == NULL) 425 continue; 426 enc_sz = sizeof(enc_buf); 427 hea_sz = end - p; 428 st = lsqpack_enc_encode(&qeh->qeh_encoder, enc_buf, &enc_sz, p, 429 &hea_sz, &headers->headers[i], enc_flags); 430 switch (st) 431 { 432 case LQES_OK: 433 LSQ_DEBUG("encoded `%.*s': `%.*s' -- %zd bytes to header block, " 434 "%zd bytes to encoder stream", 435 (int) headers->headers[i].name_len, 436 lsxpack_header_get_name(&headers->headers[i]), 437 (int) headers->headers[i].val_len, 438 lsxpack_header_get_value(&headers->headers[i]), 439 hea_sz, enc_sz); 440 total_enc_sz += enc_sz; 441 p += hea_sz; 442 if (enc_sz) 443 { 444 if (write_to_stream) 445 { 446 nw = lsquic_stream_write(qeh->qeh_enc_sm_out, enc_buf, enc_sz); 447 if ((size_t) nw == enc_sz) 448 break; 449 if (nw < 0) 450 { 451 LSQ_INFO("could not write to encoder stream: %s", 452 strerror(errno)); 453 retval = QWH_ERR; 454 goto end; 455 } 456 write_to_stream = 0; 457 enc_p = enc_buf + (size_t) nw; 458 enc_sz -= (size_t) nw; 459 } 460 else 461 enc_p = enc_buf; 462 if (0 != lsquic_frab_list_write(&qeh->qeh_fral, enc_p, enc_sz)) 463 { 464 LSQ_INFO("could not write to frab list"); 465 retval = QWH_ERR; 466 goto end; 467 } 468 } 469 break; 470 case LQES_NOBUF_HEAD: 471 retval = QWH_ENOBUF; 472 goto end; 473 default: 474 assert(0); 475 retval = QWH_ERR; 476 goto end; 477 case LQES_NOBUF_ENC: 478 LSQ_DEBUG("not enough room to write encoder stream data"); 479 retval = QWH_ERR; 480 goto end; 481 } 482 } 483 484 nw = lsqpack_enc_end_header(&qeh->qeh_encoder, buf - *prefix_sz, 485 *prefix_sz, hflags); 486 if (nw <= 0) 487 { 488 LSQ_WARN("could not end header: %zd", nw); 489 retval = QWH_ERR; 490 goto end; 491 } 492 493 if ((size_t) nw < *prefix_sz) 494 { 495 memmove(buf - nw, buf - *prefix_sz, (size_t) nw); 496 *prefix_sz = (size_t) nw; 497 } 498 *headers_sz = p - buf; 499 if (qeh->qeh_exp_rec) 500 qeh->qeh_exp_rec->qer_hblock_size += p - buf; 501 if (lsquic_frab_list_empty(&qeh->qeh_fral)) 502 { 503 LSQ_DEBUG("all %zd bytes of encoder stream written out; header block " 504 "is %zd bytes; estimated compression ratio %.3f", total_enc_sz, 505 *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder)); 506 retval = QWH_FULL; 507 goto end; 508 } 509 else 510 { 511 *completion_offset = lsquic_qeh_enc_off(qeh) 512 + lsquic_frab_list_size(&qeh->qeh_fral); 513 LSQ_DEBUG("not all %zd bytes of encoder stream written out; %zd bytes " 514 "buffered; header block is %zd bytes; estimated compression ratio " 515 "%.3f", total_enc_sz, lsquic_frab_list_size(&qeh->qeh_fral), 516 *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder)); 517 retval = QWH_PARTIAL; 518 goto end; 519 } 520 521 end: 522#ifdef WIN32 523 _freea(enc_buf); 524#endif 525 return retval; 526} 527 528 529#if !defined(NDEBUG) && __GNUC__ 530__attribute__((weak)) 531#endif 532enum qwh_status 533lsquic_qeh_write_headers (struct qpack_enc_hdl *qeh, 534 lsquic_stream_id_t stream_id, unsigned seqno, 535 const struct lsquic_http_headers *headers, unsigned char *buf, 536 size_t *prefix_sz, size_t *headers_sz, uint64_t *completion_offset, 537 enum lsqpack_enc_header_flags *hflags) 538{ 539 if (qeh->qeh_flags & QEH_INITIALIZED) 540 return qeh_write_headers(qeh, stream_id, seqno, headers, buf, 541 prefix_sz, headers_sz, completion_offset, hflags); 542 else 543 return QWH_ERR; 544} 545 546 547#if !defined(NDEBUG) && __GNUC__ 548__attribute__((weak)) 549#endif 550uint64_t 551lsquic_qeh_enc_off (struct qpack_enc_hdl *qeh) 552{ 553 if (qeh->qeh_enc_sm_out) 554 return qeh->qeh_enc_sm_out->tosend_off; 555 else 556 return 0; 557} 558 559 560size_t 561lsquic_qeh_write_avail (struct qpack_enc_hdl *qeh) 562{ 563 if ((qeh->qeh_flags & QEH_INITIALIZED) && qeh->qeh_enc_sm_out) 564 return lsquic_stream_write_avail(qeh->qeh_enc_sm_out); 565 else if (qeh->qeh_flags & QEH_INITIALIZED) 566 return ~((size_t) 0); /* Unlimited write */ 567 else 568 return 0; 569} 570 571 572size_t 573lsquic_qeh_max_prefix_size (const struct qpack_enc_hdl *qeh) 574{ 575 if (qeh->qeh_flags & QEH_HAVE_SETTINGS) 576 return qeh->qeh_max_prefix_size; 577 else 578 return LSQPACK_UINT64_ENC_SZ * 2; 579} 580