lsquic_qenc_hdl.c revision 758aff32
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_qenc_hdl.c -- QPACK encoder streams handler 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdlib.h> 10#include <string.h> 11#include <sys/queue.h> 12 13#ifdef WIN32 14#include <malloc.h> 15#endif 16 17#include "lsquic.h" 18#include "lsquic_types.h" 19#include "lsquic_int_types.h" 20#include "lsquic_sfcw.h" 21#include "lsquic_varint.h" 22#include "lsquic_hq.h" 23#include "lsquic_hash.h" 24#include "lsquic_stream.h" 25#include "lsquic_frab_list.h" 26#include "lsqpack.h" 27#include "lsxpack_header.h" 28#include "lsquic_conn.h" 29#include "lsquic_qpack_exp.h" 30#include "lsquic_util.h" 31#include "lsquic_qenc_hdl.h" 32 33#define LSQUIC_LOGGER_MODULE LSQLM_QENC_HDL 34#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qeh->qeh_conn) 35#include "lsquic_logger.h" 36 37 38static int 39qeh_write_type (struct qpack_enc_hdl *qeh) 40{ 41 int s; 42 43#ifndef NDEBUG 44 const char *env = getenv("LSQUIC_RND_VARINT_LEN"); 45 if (env && atoi(env)) 46 { 47 s = rand() & 3; 48 LSQ_DEBUG("writing %d-byte stream type", 1 << s); 49 } 50 else 51#endif 52 s = 0; 53 54 switch (s) 55 { 56 case 0: 57 return lsquic_frab_list_write(&qeh->qeh_fral, 58 (unsigned char []) { HQUST_QPACK_ENC }, 1); 59 case 1: 60 return lsquic_frab_list_write(&qeh->qeh_fral, 61 (unsigned char []) { 0x40, HQUST_QPACK_ENC }, 2); 62 case 2: 63 return lsquic_frab_list_write(&qeh->qeh_fral, 64 (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_ENC }, 4); 65 default: 66 return lsquic_frab_list_write(&qeh->qeh_fral, 67 (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 68 HQUST_QPACK_ENC }, 8); 69 } 70} 71 72 73static void 74qeh_begin_out (struct qpack_enc_hdl *qeh) 75{ 76 if (0 == qeh_write_type(qeh) 77 && (qeh->qeh_tsu_sz == 0 78 || 0 == lsquic_frab_list_write(&qeh->qeh_fral, qeh->qeh_tsu_buf, 79 qeh->qeh_tsu_sz))) 80 { 81 LSQ_DEBUG("wrote %zu bytes to frab list", 1 + qeh->qeh_tsu_sz); 82 lsquic_stream_wantwrite(qeh->qeh_enc_sm_out, 1); 83 } 84 else 85 { 86 LSQ_WARN("could not write to frab list"); 87 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 88 "cannot write to frab list"); 89 } 90} 91 92 93void 94lsquic_qeh_init (struct qpack_enc_hdl *qeh, struct lsquic_conn *conn) 95{ 96 assert(!(qeh->qeh_flags & QEH_INITIALIZED)); 97 qeh->qeh_conn = conn; 98 lsquic_frab_list_init(&qeh->qeh_fral, 0x400, NULL, NULL, NULL); 99 lsqpack_enc_preinit(&qeh->qeh_encoder, (void *) conn); 100 qeh->qeh_flags |= QEH_INITIALIZED; 101 qeh->qeh_max_prefix_size = 102 lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder); 103 if (qeh->qeh_dec_sm_in) 104 lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1); 105 LSQ_DEBUG("initialized"); 106} 107 108 109int 110lsquic_qeh_settings (struct qpack_enc_hdl *qeh, unsigned max_table_size, 111 unsigned dyn_table_size, unsigned max_risked_streams, int server) 112{ 113 enum lsqpack_enc_opts enc_opts; 114 115 assert(qeh->qeh_flags & QEH_INITIALIZED); 116 117 if (qeh->qeh_flags & QEH_HAVE_SETTINGS) 118 { 119 LSQ_WARN("settings already set"); 120 return -1; 121 } 122 123 enc_opts = LSQPACK_ENC_OPT_STAGE_2 124 | (server ? LSQPACK_ENC_OPT_SERVER : 0); 125 qeh->qeh_tsu_sz = sizeof(qeh->qeh_tsu_buf); 126 if (0 != lsqpack_enc_init(&qeh->qeh_encoder, (void *) qeh->qeh_conn, 127 max_table_size, dyn_table_size, max_risked_streams, enc_opts, 128 qeh->qeh_tsu_buf, &qeh->qeh_tsu_sz)) 129 { 130 LSQ_INFO("could not initialize QPACK encoder"); 131 return -1; 132 } 133 LSQ_DEBUG("%zu-byte post-init TSU", qeh->qeh_tsu_sz); 134 qeh->qeh_flags |= QEH_HAVE_SETTINGS; 135 qeh->qeh_max_prefix_size = 136 lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder); 137 LSQ_DEBUG("have settings: max table size=%u; dyn table size=%u; max risked " 138 "streams=%u", max_table_size, dyn_table_size, max_risked_streams); 139 if (qeh->qeh_enc_sm_out) 140 qeh_begin_out(qeh); 141 return 0; 142} 143 144 145static void 146qeh_log_and_clean_exp_rec (struct qpack_enc_hdl *qeh) 147{ 148 char buf[0x400]; 149 150 qeh->qeh_exp_rec->qer_comp_ratio = lsqpack_enc_ratio(&qeh->qeh_encoder); 151 (void) lsquic_qpack_exp_to_xml(qeh->qeh_exp_rec, buf, sizeof(buf)); 152 LSQ_NOTICE("%s", buf); 153 lsquic_qpack_exp_destroy(qeh->qeh_exp_rec); 154 qeh->qeh_exp_rec = NULL; 155} 156 157 158void 159lsquic_qeh_cleanup (struct qpack_enc_hdl *qeh) 160{ 161 if (qeh->qeh_flags & QEH_INITIALIZED) 162 { 163 LSQ_DEBUG("cleanup"); 164 if (qeh->qeh_exp_rec) 165 qeh_log_and_clean_exp_rec(qeh); 166 lsqpack_enc_cleanup(&qeh->qeh_encoder); 167 lsquic_frab_list_cleanup(&qeh->qeh_fral); 168 memset(qeh, 0, sizeof(*qeh)); 169 } 170} 171 172static lsquic_stream_ctx_t * 173qeh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream) 174{ 175 struct qpack_enc_hdl *const qeh = stream_if_ctx; 176 qeh->qeh_enc_sm_out = stream; 177 if ((qeh->qeh_flags & (QEH_INITIALIZED|QEH_HAVE_SETTINGS)) 178 == (QEH_INITIALIZED|QEH_HAVE_SETTINGS)) 179 qeh_begin_out(qeh); 180 else 181 qeh->qeh_conn = lsquic_stream_conn(stream); /* Or NULL deref in log */ 182 LSQ_DEBUG("initialized outgoing encoder stream"); 183 return (void *) qeh; 184} 185 186 187static void 188qeh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 189{ 190 struct qpack_enc_hdl *const qeh = (void *) ctx; 191 struct lsquic_reader reader = { 192 .lsqr_read = lsquic_frab_list_read, 193 .lsqr_size = lsquic_frab_list_size, 194 .lsqr_ctx = &qeh->qeh_fral, 195 }; 196 ssize_t nw; 197 198 nw = lsquic_stream_writef(stream, &reader); 199 if (nw >= 0) 200 { 201 LSQ_DEBUG("wrote %zd bytes to stream", nw); 202 (void) lsquic_stream_flush(stream); 203 if (lsquic_frab_list_empty(&qeh->qeh_fral)) 204 lsquic_stream_wantwrite(stream, 0); 205 } 206 else 207 { 208 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 209 "cannot write to stream"); 210 LSQ_WARN("cannot write to stream: %s", strerror(errno)); 211 lsquic_stream_wantwrite(stream, 0); 212 } 213} 214 215 216static void 217qeh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 218{ 219 struct qpack_enc_hdl *const qeh = (void *) ctx; 220 qeh->qeh_enc_sm_out = NULL; 221 LSQ_DEBUG("closed outgoing encoder stream"); 222} 223 224 225static void 226qeh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 227{ 228 assert(0); 229} 230 231 232static const struct lsquic_stream_if qeh_enc_sm_out_if = 233{ 234 .on_new_stream = qeh_out_on_new, 235 .on_read = qeh_out_on_read, 236 .on_write = qeh_out_on_write, 237 .on_close = qeh_out_on_close, 238}; 239const struct lsquic_stream_if *const lsquic_qeh_enc_sm_out_if = 240 &qeh_enc_sm_out_if; 241 242 243static lsquic_stream_ctx_t * 244qeh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream) 245{ 246 struct qpack_enc_hdl *const qeh = stream_if_ctx; 247 qeh->qeh_dec_sm_in = stream; 248 if (qeh->qeh_flags & QEH_INITIALIZED) 249 lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1); 250 else 251 qeh->qeh_conn = lsquic_stream_conn(stream); /* Or NULL deref in log */ 252 LSQ_DEBUG("initialized incoming decoder stream"); 253 return (void *) qeh; 254} 255 256 257static size_t 258qeh_read_decoder_stream (void *ctx, const unsigned char *buf, size_t sz, 259 int fin) 260{ 261 struct qpack_enc_hdl *const qeh = (void *) ctx; 262 uint64_t offset; 263 int s; 264 265 if (fin) 266 { 267 LSQ_INFO("decoder stream is closed"); 268 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 269 HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK decoder stream"); 270 goto end; 271 } 272 273 offset = lsquic_stream_read_offset(qeh->qeh_dec_sm_in); 274 s = lsqpack_enc_decoder_in(&qeh->qeh_encoder, buf, sz); 275 if (s != 0) 276 { 277 LSQ_INFO("error reading decoder stream"); 278 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 279 HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK decoder " 280 "stream at offset %"PRIu64, offset); 281 goto end; 282 } 283 LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz); 284 285 end: 286 return sz; 287} 288 289 290static void 291qeh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 292{ 293 struct qpack_enc_hdl *const qeh = (void *) ctx; 294 ssize_t nread; 295 296 nread = lsquic_stream_readf(stream, qeh_read_decoder_stream, qeh); 297 if (nread <= 0) 298 { 299 if (nread < 0) 300 { 301 LSQ_WARN("cannot read from encoder stream: %s", strerror(errno)); 302 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 303 "cannot read from encoder stream"); 304 } 305 else 306 { 307 LSQ_INFO("encoder stream closed by peer: abort connection"); 308 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 309 HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed"); 310 } 311 lsquic_stream_wantread(stream, 0); 312 } 313} 314 315 316static void 317qeh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 318{ 319 struct qpack_enc_hdl *const qeh = (void *) ctx; 320 LSQ_DEBUG("closed incoming decoder stream"); 321 qeh->qeh_dec_sm_in = NULL; 322} 323 324 325static void 326qeh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 327{ 328 assert(0); 329} 330 331 332static const struct lsquic_stream_if qeh_dec_sm_in_if = 333{ 334 .on_new_stream = qeh_in_on_new, 335 .on_read = qeh_in_on_read, 336 .on_write = qeh_in_on_write, 337 .on_close = qeh_in_on_close, 338}; 339const struct lsquic_stream_if *const lsquic_qeh_dec_sm_in_if = 340 &qeh_dec_sm_in_if; 341 342 343static void 344qeh_maybe_set_user_agent (struct qpack_enc_hdl *qeh, 345 const struct lsquic_http_headers *headers) 346{ 347 const char *const name = qeh->qeh_exp_rec->qer_flags & QER_SERVER ? 348 "server" : "user-agent"; 349 const size_t len = qeh->qeh_exp_rec->qer_flags & QER_SERVER ? 6 : 10; 350 int i; 351 352 for (i = 0; i < headers->count; ++i) 353 if (len == headers->headers[i].name_len 354 && 0 == memcmp(name, 355 lsxpack_header_get_name(&headers->headers[i]), len)) 356 { 357 qeh->qeh_exp_rec->qer_user_agent = strndup( 358 lsxpack_header_get_value(&headers->headers[i]), 359 headers->headers[i].val_len); 360 break; 361 } 362} 363 364 365static enum qwh_status 366qeh_write_headers (struct qpack_enc_hdl *qeh, lsquic_stream_id_t stream_id, 367 unsigned seqno, const struct lsquic_http_headers *headers, 368 unsigned char *buf, size_t *prefix_sz, size_t *headers_sz, 369 uint64_t *completion_offset, enum lsqpack_enc_header_flags *hflags) 370{ 371 unsigned char *p = buf; 372 unsigned char *const end = buf + *headers_sz; 373 const unsigned char *enc_p; 374 size_t enc_sz, hea_sz, total_enc_sz; 375 ssize_t nw; 376 enum lsqpack_enc_status st; 377 int i, s, write_to_stream; 378 enum lsqpack_enc_flags enc_flags; 379 enum qwh_status retval; 380#ifndef WIN32 381 unsigned char enc_buf[ qeh->qeh_encoder.qpe_cur_max_capacity * 2 ]; 382#else 383 unsigned char *enc_buf; 384 enc_buf = _malloca(qeh->qeh_encoder.qpe_cur_max_capacity * 2); 385 if (!enc_buf) 386 return QWH_ERR; 387#endif 388 389 if (qeh->qeh_exp_rec) 390 { 391 const lsquic_time_t now = lsquic_time_now(); 392 if (qeh->qeh_exp_rec->qer_hblock_count == 0) 393 qeh->qeh_exp_rec->qer_first_req = now; 394 qeh->qeh_exp_rec->qer_last_req = now; 395 ++qeh->qeh_exp_rec->qer_hblock_count; 396 if (!qeh->qeh_exp_rec->qer_user_agent) 397 qeh_maybe_set_user_agent(qeh, headers); 398 } 399 400 s = lsqpack_enc_start_header(&qeh->qeh_encoder, stream_id, 0); 401 if (s != 0) 402 { 403 LSQ_WARN("cannot start header"); 404 retval = QWH_ERR; 405 goto end; 406 } 407 LSQ_DEBUG("begin encoding headers for stream %"PRIu64, stream_id); 408 409 if (qeh->qeh_enc_sm_out) 410 enc_flags = 0; 411 else 412 { 413 enc_flags = LQEF_NO_INDEX; 414 LSQ_DEBUG("encoder stream is unavailable, won't index headers"); 415 } 416 write_to_stream = qeh->qeh_enc_sm_out 417 && lsquic_frab_list_empty(&qeh->qeh_fral); 418 total_enc_sz = 0; 419 for (i = 0; i < headers->count; ++i) 420 { 421 if (headers->headers[i].buf == NULL) 422 continue; 423 enc_sz = sizeof(enc_buf); 424 hea_sz = end - p; 425 st = lsqpack_enc_encode(&qeh->qeh_encoder, enc_buf, &enc_sz, p, 426 &hea_sz, &headers->headers[i], enc_flags); 427 switch (st) 428 { 429 case LQES_OK: 430 LSQ_DEBUG("encoded `%.*s': `%.*s' -- %zd bytes to header block, " 431 "%zd bytes to encoder stream", 432 (int) headers->headers[i].name_len, 433 lsxpack_header_get_name(&headers->headers[i]), 434 (int) headers->headers[i].val_len, 435 lsxpack_header_get_value(&headers->headers[i]), 436 hea_sz, enc_sz); 437 total_enc_sz += enc_sz; 438 p += hea_sz; 439 if (enc_sz) 440 { 441 if (write_to_stream) 442 { 443 nw = lsquic_stream_write(qeh->qeh_enc_sm_out, enc_buf, enc_sz); 444 if ((size_t) nw == enc_sz) 445 break; 446 if (nw < 0) 447 { 448 LSQ_INFO("could not write to encoder stream: %s", 449 strerror(errno)); 450 retval = QWH_ERR; 451 goto end; 452 } 453 write_to_stream = 0; 454 enc_p = enc_buf + (size_t) nw; 455 enc_sz -= (size_t) nw; 456 } 457 else 458 enc_p = enc_buf; 459 if (0 != lsquic_frab_list_write(&qeh->qeh_fral, enc_p, enc_sz)) 460 { 461 LSQ_INFO("could not write to frab list"); 462 retval = QWH_ERR; 463 goto end; 464 } 465 } 466 break; 467 case LQES_NOBUF_HEAD: 468 retval = QWH_ENOBUF; 469 goto end; 470 default: 471 assert(0); 472 retval = QWH_ERR; 473 goto end; 474 case LQES_NOBUF_ENC: 475 LSQ_DEBUG("not enough room to write encoder stream data"); 476 retval = QWH_ERR; 477 goto end; 478 } 479 } 480 481 nw = lsqpack_enc_end_header(&qeh->qeh_encoder, buf - *prefix_sz, 482 *prefix_sz, hflags); 483 if (nw <= 0) 484 { 485 LSQ_WARN("could not end header: %zd", nw); 486 retval = QWH_ERR; 487 goto end; 488 } 489 490 if ((size_t) nw < *prefix_sz) 491 { 492 memmove(buf - nw, buf - *prefix_sz, (size_t) nw); 493 *prefix_sz = (size_t) nw; 494 } 495 *headers_sz = p - buf; 496 if (qeh->qeh_exp_rec) 497 qeh->qeh_exp_rec->qer_hblock_size += p - buf; 498 if (lsquic_frab_list_empty(&qeh->qeh_fral)) 499 { 500 LSQ_DEBUG("all %zd bytes of encoder stream written out; header block " 501 "is %zd bytes; estimated compression ratio %.3f", total_enc_sz, 502 *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder)); 503 retval = QWH_FULL; 504 goto end; 505 } 506 else 507 { 508 *completion_offset = lsquic_qeh_enc_off(qeh) 509 + lsquic_frab_list_size(&qeh->qeh_fral); 510 LSQ_DEBUG("not all %zd bytes of encoder stream written out; %zd bytes " 511 "buffered; header block is %zd bytes; estimated compression ratio " 512 "%.3f", total_enc_sz, lsquic_frab_list_size(&qeh->qeh_fral), 513 *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder)); 514 retval = QWH_PARTIAL; 515 goto end; 516 } 517 518 end: 519#ifdef WIN32 520 _freea(enc_buf); 521#endif 522 return retval; 523} 524 525 526#if !defined(NDEBUG) && __GNUC__ 527__attribute__((weak)) 528#endif 529enum qwh_status 530lsquic_qeh_write_headers (struct qpack_enc_hdl *qeh, 531 lsquic_stream_id_t stream_id, unsigned seqno, 532 const struct lsquic_http_headers *headers, unsigned char *buf, 533 size_t *prefix_sz, size_t *headers_sz, uint64_t *completion_offset, 534 enum lsqpack_enc_header_flags *hflags) 535{ 536 if (qeh->qeh_flags & QEH_INITIALIZED) 537 return qeh_write_headers(qeh, stream_id, seqno, headers, buf, 538 prefix_sz, headers_sz, completion_offset, hflags); 539 else 540 return QWH_ERR; 541} 542 543 544#if !defined(NDEBUG) && __GNUC__ 545__attribute__((weak)) 546#endif 547uint64_t 548lsquic_qeh_enc_off (struct qpack_enc_hdl *qeh) 549{ 550 if (qeh->qeh_enc_sm_out) 551 return qeh->qeh_enc_sm_out->tosend_off; 552 else 553 return 0; 554} 555 556 557size_t 558lsquic_qeh_write_avail (struct qpack_enc_hdl *qeh) 559{ 560 if ((qeh->qeh_flags & QEH_INITIALIZED) && qeh->qeh_enc_sm_out) 561 return lsquic_stream_write_avail(qeh->qeh_enc_sm_out); 562 else if (qeh->qeh_flags & QEH_INITIALIZED) 563 return ~((size_t) 0); /* Unlimited write */ 564 else 565 return 0; 566} 567 568 569size_t 570lsquic_qeh_max_prefix_size (const struct qpack_enc_hdl *qeh) 571{ 572 if (qeh->qeh_flags & QEH_HAVE_SETTINGS) 573 return qeh->qeh_max_prefix_size; 574 else 575 return LSQPACK_UINT64_ENC_SZ * 2; 576} 577