lsquic_qenc_hdl.c revision fb3e20e0
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_qenc_hdl.c -- QPACK encoder streams handler 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdlib.h> 10#include <string.h> 11#include <sys/queue.h> 12 13#ifdef WIN32 14#include <malloc.h> 15#endif 16 17#include "lsquic.h" 18#include "lsquic_types.h" 19#include "lsquic_int_types.h" 20#include "lsquic_sfcw.h" 21#include "lsquic_varint.h" 22#include "lsquic_hq.h" 23#include "lsquic_hash.h" 24#include "lsquic_stream.h" 25#include "lsquic_frab_list.h" 26#include "lsqpack.h" 27#include "lsxpack_header.h" 28#include "lsquic_conn.h" 29#include "lsquic_qenc_hdl.h" 30 31#define LSQUIC_LOGGER_MODULE LSQLM_QENC_HDL 32#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qeh->qeh_conn) 33#include "lsquic_logger.h" 34 35 36static int 37qeh_write_type (struct qpack_enc_hdl *qeh) 38{ 39 int s; 40 41#ifndef NDEBUG 42 const char *env = getenv("LSQUIC_RND_VARINT_LEN"); 43 if (env && atoi(env)) 44 { 45 s = rand() & 3; 46 LSQ_DEBUG("writing %d-byte stream type", 1 << s); 47 } 48 else 49#endif 50 s = 0; 51 52 switch (s) 53 { 54 case 0: 55 return lsquic_frab_list_write(&qeh->qeh_fral, 56 (unsigned char []) { HQUST_QPACK_ENC }, 1); 57 case 1: 58 return lsquic_frab_list_write(&qeh->qeh_fral, 59 (unsigned char []) { 0x40, HQUST_QPACK_ENC }, 2); 60 case 2: 61 return lsquic_frab_list_write(&qeh->qeh_fral, 62 (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_ENC }, 4); 63 default: 64 return lsquic_frab_list_write(&qeh->qeh_fral, 65 (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 66 HQUST_QPACK_ENC }, 8); 67 } 68} 69 70 71static void 72qeh_begin_out (struct qpack_enc_hdl *qeh) 73{ 74 if (0 == qeh_write_type(qeh) 75 && (qeh->qeh_tsu_sz == 0 76 || 0 == lsquic_frab_list_write(&qeh->qeh_fral, qeh->qeh_tsu_buf, 77 qeh->qeh_tsu_sz))) 78 { 79 LSQ_DEBUG("wrote %zu bytes to frab list", 1 + qeh->qeh_tsu_sz); 80 lsquic_stream_wantwrite(qeh->qeh_enc_sm_out, 1); 81 } 82 else 83 { 84 LSQ_WARN("could not write to frab list"); 85 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 86 "cannot write to frab list"); 87 } 88} 89 90 91void 92lsquic_qeh_init (struct qpack_enc_hdl *qeh, struct lsquic_conn *conn) 93{ 94 assert(!(qeh->qeh_flags & QEH_INITIALIZED)); 95 qeh->qeh_conn = conn; 96 lsquic_frab_list_init(&qeh->qeh_fral, 0x400, NULL, NULL, NULL); 97 lsqpack_enc_preinit(&qeh->qeh_encoder, (void *) conn); 98 qeh->qeh_flags |= QEH_INITIALIZED; 99 qeh->qeh_max_prefix_size = 100 lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder); 101 if (qeh->qeh_dec_sm_in) 102 lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1); 103 LSQ_DEBUG("initialized"); 104} 105 106 107int 108lsquic_qeh_settings (struct qpack_enc_hdl *qeh, unsigned max_table_size, 109 unsigned dyn_table_size, unsigned max_risked_streams, int server) 110{ 111 enum lsqpack_enc_opts enc_opts; 112 113 assert(qeh->qeh_flags & QEH_INITIALIZED); 114 115 if (qeh->qeh_flags & QEH_HAVE_SETTINGS) 116 { 117 LSQ_WARN("settings already set"); 118 return -1; 119 } 120 121 enc_opts = LSQPACK_ENC_OPT_STAGE_2 122 | (server ? LSQPACK_ENC_OPT_SERVER : 0); 123 qeh->qeh_tsu_sz = sizeof(qeh->qeh_tsu_buf); 124 if (0 != lsqpack_enc_init(&qeh->qeh_encoder, (void *) qeh->qeh_conn, 125 max_table_size, dyn_table_size, max_risked_streams, enc_opts, 126 qeh->qeh_tsu_buf, &qeh->qeh_tsu_sz)) 127 { 128 LSQ_INFO("could not initialize QPACK encoder"); 129 return -1; 130 } 131 LSQ_DEBUG("%zu-byte post-init TSU", qeh->qeh_tsu_sz); 132 qeh->qeh_flags |= QEH_HAVE_SETTINGS; 133 qeh->qeh_max_prefix_size = 134 lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder); 135 LSQ_DEBUG("have settings: max table size=%u; dyn table size=%u; max risked " 136 "streams=%u", max_table_size, dyn_table_size, max_risked_streams); 137 if (qeh->qeh_enc_sm_out) 138 qeh_begin_out(qeh); 139 return 0; 140} 141 142 143void 144lsquic_qeh_cleanup (struct qpack_enc_hdl *qeh) 145{ 146 if (qeh->qeh_flags & QEH_INITIALIZED) 147 { 148 LSQ_DEBUG("cleanup"); 149 lsqpack_enc_cleanup(&qeh->qeh_encoder); 150 lsquic_frab_list_cleanup(&qeh->qeh_fral); 151 memset(qeh, 0, sizeof(*qeh)); 152 } 153} 154 155static lsquic_stream_ctx_t * 156qeh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream) 157{ 158 struct qpack_enc_hdl *const qeh = stream_if_ctx; 159 qeh->qeh_enc_sm_out = stream; 160 if ((qeh->qeh_flags & (QEH_INITIALIZED|QEH_HAVE_SETTINGS)) 161 == (QEH_INITIALIZED|QEH_HAVE_SETTINGS)) 162 qeh_begin_out(qeh); 163 else 164 qeh->qeh_conn = lsquic_stream_conn(stream); /* Or NULL deref in log */ 165 LSQ_DEBUG("initialized outgoing encoder stream"); 166 return (void *) qeh; 167} 168 169 170static void 171qeh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 172{ 173 struct qpack_enc_hdl *const qeh = (void *) ctx; 174 struct lsquic_reader reader = { 175 .lsqr_read = lsquic_frab_list_read, 176 .lsqr_size = lsquic_frab_list_size, 177 .lsqr_ctx = &qeh->qeh_fral, 178 }; 179 ssize_t nw; 180 181 nw = lsquic_stream_writef(stream, &reader); 182 if (nw >= 0) 183 { 184 LSQ_DEBUG("wrote %zd bytes to stream", nw); 185 (void) lsquic_stream_flush(stream); 186 if (lsquic_frab_list_empty(&qeh->qeh_fral)) 187 lsquic_stream_wantwrite(stream, 0); 188 } 189 else 190 { 191 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 192 "cannot write to stream"); 193 LSQ_WARN("cannot write to stream: %s", strerror(errno)); 194 lsquic_stream_wantwrite(stream, 0); 195 } 196} 197 198 199static void 200qeh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 201{ 202 struct qpack_enc_hdl *const qeh = (void *) ctx; 203 qeh->qeh_enc_sm_out = NULL; 204 LSQ_DEBUG("closed outgoing encoder stream"); 205} 206 207 208static void 209qeh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 210{ 211 assert(0); 212} 213 214 215static const struct lsquic_stream_if qeh_enc_sm_out_if = 216{ 217 .on_new_stream = qeh_out_on_new, 218 .on_read = qeh_out_on_read, 219 .on_write = qeh_out_on_write, 220 .on_close = qeh_out_on_close, 221}; 222const struct lsquic_stream_if *const lsquic_qeh_enc_sm_out_if = 223 &qeh_enc_sm_out_if; 224 225 226static lsquic_stream_ctx_t * 227qeh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream) 228{ 229 struct qpack_enc_hdl *const qeh = stream_if_ctx; 230 qeh->qeh_dec_sm_in = stream; 231 if (qeh->qeh_flags & QEH_INITIALIZED) 232 lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1); 233 else 234 qeh->qeh_conn = lsquic_stream_conn(stream); /* Or NULL deref in log */ 235 LSQ_DEBUG("initialized incoming decoder stream"); 236 return (void *) qeh; 237} 238 239 240static size_t 241qeh_read_decoder_stream (void *ctx, const unsigned char *buf, size_t sz, 242 int fin) 243{ 244 struct qpack_enc_hdl *const qeh = (void *) ctx; 245 uint64_t offset; 246 int s; 247 248 if (fin) 249 { 250 LSQ_INFO("decoder stream is closed"); 251 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 252 HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK decoder stream"); 253 goto end; 254 } 255 256 offset = lsquic_stream_read_offset(qeh->qeh_dec_sm_in); 257 s = lsqpack_enc_decoder_in(&qeh->qeh_encoder, buf, sz); 258 if (s != 0) 259 { 260 LSQ_INFO("error reading decoder stream"); 261 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 262 HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK decoder " 263 "stream at offset %"PRIu64, offset); 264 goto end; 265 } 266 LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz); 267 268 end: 269 return sz; 270} 271 272 273static void 274qeh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 275{ 276 struct qpack_enc_hdl *const qeh = (void *) ctx; 277 ssize_t nread; 278 279 nread = lsquic_stream_readf(stream, qeh_read_decoder_stream, qeh); 280 if (nread <= 0) 281 { 282 if (nread < 0) 283 { 284 LSQ_WARN("cannot read from encoder stream: %s", strerror(errno)); 285 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 286 "cannot read from encoder stream"); 287 } 288 else 289 { 290 LSQ_INFO("encoder stream closed by peer: abort connection"); 291 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 292 HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed"); 293 } 294 lsquic_stream_wantread(stream, 0); 295 } 296} 297 298 299static void 300qeh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 301{ 302 struct qpack_enc_hdl *const qeh = (void *) ctx; 303 LSQ_DEBUG("closed incoming decoder stream"); 304 qeh->qeh_dec_sm_in = NULL; 305} 306 307 308static void 309qeh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 310{ 311 assert(0); 312} 313 314 315static const struct lsquic_stream_if qeh_dec_sm_in_if = 316{ 317 .on_new_stream = qeh_in_on_new, 318 .on_read = qeh_in_on_read, 319 .on_write = qeh_in_on_write, 320 .on_close = qeh_in_on_close, 321}; 322const struct lsquic_stream_if *const lsquic_qeh_dec_sm_in_if = 323 &qeh_dec_sm_in_if; 324 325 326static enum qwh_status 327qeh_write_headers (struct qpack_enc_hdl *qeh, lsquic_stream_id_t stream_id, 328 unsigned seqno, const struct lsquic_http_headers *headers, 329 unsigned char *buf, size_t *prefix_sz, size_t *headers_sz, 330 uint64_t *completion_offset, enum lsqpack_enc_header_flags *hflags) 331{ 332 unsigned char *p = buf; 333 unsigned char *const end = buf + *headers_sz; 334 const unsigned char *enc_p; 335 size_t enc_sz, hea_sz, total_enc_sz; 336 ssize_t nw; 337 enum lsqpack_enc_status st; 338 int i, s, write_to_stream; 339 enum lsqpack_enc_flags enc_flags; 340 enum qwh_status retval; 341#ifndef WIN32 342 unsigned char enc_buf[ qeh->qeh_encoder.qpe_cur_max_capacity * 2 ]; 343#else 344 unsigned char *enc_buf; 345 enc_buf = _malloca(qeh->qeh_encoder.qpe_cur_max_capacity * 2); 346 if (!enc_buf) 347 return QWH_ERR; 348#endif 349 350 s = lsqpack_enc_start_header(&qeh->qeh_encoder, stream_id, 0); 351 if (s != 0) 352 { 353 LSQ_WARN("cannot start header"); 354 retval = QWH_ERR; 355 goto end; 356 } 357 LSQ_DEBUG("begin encoding headers for stream %"PRIu64, stream_id); 358 359 if (qeh->qeh_enc_sm_out) 360 enc_flags = 0; 361 else 362 { 363 enc_flags = LQEF_NO_INDEX; 364 LSQ_DEBUG("encoder stream is unavailable, won't index headers"); 365 } 366 write_to_stream = qeh->qeh_enc_sm_out 367 && lsquic_frab_list_empty(&qeh->qeh_fral); 368 total_enc_sz = 0; 369 for (i = 0; i < headers->count; ++i) 370 { 371 if (headers->headers[i].buf == NULL) 372 continue; 373 enc_sz = sizeof(enc_buf); 374 hea_sz = end - p; 375 st = lsqpack_enc_encode(&qeh->qeh_encoder, enc_buf, &enc_sz, p, 376 &hea_sz, &headers->headers[i], enc_flags); 377 switch (st) 378 { 379 case LQES_OK: 380 LSQ_DEBUG("encoded `%.*s': `%.*s' -- %zd bytes to header block, " 381 "%zd bytes to encoder stream", 382 (int) headers->headers[i].name_len, 383 lsxpack_header_get_name(&headers->headers[i]), 384 (int) headers->headers[i].val_len, 385 lsxpack_header_get_value(&headers->headers[i]), 386 hea_sz, enc_sz); 387 total_enc_sz += enc_sz; 388 p += hea_sz; 389 if (enc_sz) 390 { 391 if (write_to_stream) 392 { 393 nw = lsquic_stream_write(qeh->qeh_enc_sm_out, enc_buf, enc_sz); 394 if ((size_t) nw == enc_sz) 395 break; 396 if (nw < 0) 397 { 398 LSQ_INFO("could not write to encoder stream: %s", 399 strerror(errno)); 400 retval = QWH_ERR; 401 goto end; 402 } 403 write_to_stream = 0; 404 enc_p = enc_buf + (size_t) nw; 405 enc_sz -= (size_t) nw; 406 } 407 else 408 enc_p = enc_buf; 409 if (0 != lsquic_frab_list_write(&qeh->qeh_fral, enc_p, enc_sz)) 410 { 411 LSQ_INFO("could not write to frab list"); 412 retval = QWH_ERR; 413 goto end; 414 } 415 } 416 break; 417 case LQES_NOBUF_HEAD: 418 retval = QWH_ENOBUF; 419 goto end; 420 default: 421 assert(0); 422 retval = QWH_ERR; 423 goto end; 424 case LQES_NOBUF_ENC: 425 LSQ_DEBUG("not enough room to write encoder stream data"); 426 retval = QWH_ERR; 427 goto end; 428 } 429 } 430 431 nw = lsqpack_enc_end_header(&qeh->qeh_encoder, buf - *prefix_sz, 432 *prefix_sz, hflags); 433 if (nw <= 0) 434 { 435 LSQ_WARN("could not end header: %zd", nw); 436 retval = QWH_ERR; 437 goto end; 438 } 439 440 if ((size_t) nw < *prefix_sz) 441 { 442 memmove(buf - nw, buf - *prefix_sz, (size_t) nw); 443 *prefix_sz = (size_t) nw; 444 } 445 *headers_sz = p - buf; 446 if (lsquic_frab_list_empty(&qeh->qeh_fral)) 447 { 448 LSQ_DEBUG("all %zd bytes of encoder stream written out; header block " 449 "is %zd bytes; estimated compression ratio %.3f", total_enc_sz, 450 *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder)); 451 retval = QWH_FULL; 452 goto end; 453 } 454 else 455 { 456 *completion_offset = lsquic_qeh_enc_off(qeh) 457 + lsquic_frab_list_size(&qeh->qeh_fral); 458 LSQ_DEBUG("not all %zd bytes of encoder stream written out; %zd bytes " 459 "buffered; header block is %zd bytes; estimated compression ratio " 460 "%.3f", total_enc_sz, lsquic_frab_list_size(&qeh->qeh_fral), 461 *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder)); 462 retval = QWH_PARTIAL; 463 goto end; 464 } 465 466 end: 467#ifdef WIN32 468 _freea(enc_buf); 469#endif 470 return retval; 471} 472 473 474#if !defined(NDEBUG) && __GNUC__ 475__attribute__((weak)) 476#endif 477enum qwh_status 478lsquic_qeh_write_headers (struct qpack_enc_hdl *qeh, 479 lsquic_stream_id_t stream_id, unsigned seqno, 480 const struct lsquic_http_headers *headers, unsigned char *buf, 481 size_t *prefix_sz, size_t *headers_sz, uint64_t *completion_offset, 482 enum lsqpack_enc_header_flags *hflags) 483{ 484 if (qeh->qeh_flags & QEH_INITIALIZED) 485 return qeh_write_headers(qeh, stream_id, seqno, headers, buf, 486 prefix_sz, headers_sz, completion_offset, hflags); 487 else 488 return QWH_ERR; 489} 490 491 492#if !defined(NDEBUG) && __GNUC__ 493__attribute__((weak)) 494#endif 495uint64_t 496lsquic_qeh_enc_off (struct qpack_enc_hdl *qeh) 497{ 498 if (qeh->qeh_enc_sm_out) 499 return qeh->qeh_enc_sm_out->tosend_off; 500 else 501 return 0; 502} 503 504 505size_t 506lsquic_qeh_write_avail (struct qpack_enc_hdl *qeh) 507{ 508 if ((qeh->qeh_flags & QEH_INITIALIZED) && qeh->qeh_enc_sm_out) 509 return lsquic_stream_write_avail(qeh->qeh_enc_sm_out); 510 else if (qeh->qeh_flags & QEH_INITIALIZED) 511 return ~((size_t) 0); /* Unlimited write */ 512 else 513 return 0; 514} 515 516 517size_t 518lsquic_qeh_max_prefix_size (const struct qpack_enc_hdl *qeh) 519{ 520 if (qeh->qeh_flags & QEH_HAVE_SETTINGS) 521 return qeh->qeh_max_prefix_size; 522 else 523 return LSQPACK_UINT64_ENC_SZ * 2; 524} 525