lsquic_qenc_hdl.c revision 55613f44
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_qenc_hdl.c -- QPACK encoder streams handler 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdlib.h> 10#include <string.h> 11#include <sys/queue.h> 12 13#include "lsquic.h" 14#include "lsquic_types.h" 15#include "lsquic_int_types.h" 16#include "lsquic_sfcw.h" 17#include "lsquic_varint.h" 18#include "lsquic_hq.h" 19#include "lsquic_hash.h" 20#include "lsquic_stream.h" 21#include "lsquic_frab_list.h" 22#include "lsqpack.h" 23#include "lsxpack_header.h" 24#include "lsquic_conn.h" 25#include "lsquic_qenc_hdl.h" 26 27#define LSQUIC_LOGGER_MODULE LSQLM_QENC_HDL 28#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qeh->qeh_conn) 29#include "lsquic_logger.h" 30 31 32static int 33qeh_write_type (struct qpack_enc_hdl *qeh) 34{ 35 int s; 36 37#ifndef NDEBUG 38 const char *env = getenv("LSQUIC_RND_VARINT_LEN"); 39 if (env && atoi(env)) 40 { 41 s = rand() & 3; 42 LSQ_DEBUG("writing %d-byte stream type", 1 << s); 43 } 44 else 45#endif 46 s = 0; 47 48 switch (s) 49 { 50 case 0: 51 return lsquic_frab_list_write(&qeh->qeh_fral, 52 (unsigned char []) { HQUST_QPACK_ENC }, 1); 53 case 1: 54 return lsquic_frab_list_write(&qeh->qeh_fral, 55 (unsigned char []) { 0x40, HQUST_QPACK_ENC }, 2); 56 case 2: 57 return lsquic_frab_list_write(&qeh->qeh_fral, 58 (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_ENC }, 4); 59 default: 60 return lsquic_frab_list_write(&qeh->qeh_fral, 61 (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 62 HQUST_QPACK_ENC }, 8); 63 } 64} 65 66 67static void 68qeh_begin_out (struct qpack_enc_hdl *qeh) 69{ 70 if (0 == qeh_write_type(qeh) 71 && (qeh->qeh_tsu_sz == 0 72 || 0 == lsquic_frab_list_write(&qeh->qeh_fral, qeh->qeh_tsu_buf, 73 qeh->qeh_tsu_sz))) 74 { 75 LSQ_DEBUG("wrote %zu bytes to frab list", 1 + qeh->qeh_tsu_sz); 76 lsquic_stream_wantwrite(qeh->qeh_enc_sm_out, 1); 77 } 78 else 79 { 80 LSQ_WARN("could not write to frab list"); 81 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 82 "cannot write to frab list"); 83 } 84} 85 86 87void 88lsquic_qeh_init (struct qpack_enc_hdl *qeh, struct lsquic_conn *conn) 89{ 90 assert(!(qeh->qeh_flags & QEH_INITIALIZED)); 91 qeh->qeh_conn = conn; 92 lsquic_frab_list_init(&qeh->qeh_fral, 0x400, NULL, NULL, NULL); 93 lsqpack_enc_preinit(&qeh->qeh_encoder, (void *) conn); 94 qeh->qeh_flags |= QEH_INITIALIZED; 95 qeh->qeh_max_prefix_size = 96 lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder); 97 if (qeh->qeh_dec_sm_in) 98 lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1); 99 LSQ_DEBUG("initialized"); 100} 101 102 103int 104lsquic_qeh_settings (struct qpack_enc_hdl *qeh, unsigned max_table_size, 105 unsigned dyn_table_size, unsigned max_risked_streams, int server) 106{ 107 enum lsqpack_enc_opts enc_opts; 108 109 assert(qeh->qeh_flags & QEH_INITIALIZED); 110 111 if (qeh->qeh_flags & QEH_HAVE_SETTINGS) 112 { 113 LSQ_WARN("settings already set"); 114 return -1; 115 } 116 117 enc_opts = LSQPACK_ENC_OPT_STAGE_2 118 | (server ? LSQPACK_ENC_OPT_SERVER : 0); 119 qeh->qeh_tsu_sz = sizeof(qeh->qeh_tsu_buf); 120 if (0 != lsqpack_enc_init(&qeh->qeh_encoder, (void *) qeh->qeh_conn, 121 max_table_size, dyn_table_size, max_risked_streams, enc_opts, 122 qeh->qeh_tsu_buf, &qeh->qeh_tsu_sz)) 123 { 124 LSQ_INFO("could not initialize QPACK encoder"); 125 return -1; 126 } 127 LSQ_DEBUG("%zu-byte post-init TSU", qeh->qeh_tsu_sz); 128 qeh->qeh_flags |= QEH_HAVE_SETTINGS; 129 qeh->qeh_max_prefix_size = 130 lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder); 131 LSQ_DEBUG("have settings: max table size=%u; dyn table size=%u; max risked " 132 "streams=%u", max_table_size, dyn_table_size, max_risked_streams); 133 if (qeh->qeh_enc_sm_out) 134 qeh_begin_out(qeh); 135 return 0; 136} 137 138 139void 140lsquic_qeh_cleanup (struct qpack_enc_hdl *qeh) 141{ 142 if (qeh->qeh_flags & QEH_INITIALIZED) 143 { 144 LSQ_DEBUG("cleanup"); 145 lsqpack_enc_cleanup(&qeh->qeh_encoder); 146 lsquic_frab_list_cleanup(&qeh->qeh_fral); 147 memset(qeh, 0, sizeof(*qeh)); 148 } 149} 150 151static lsquic_stream_ctx_t * 152qeh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream) 153{ 154 struct qpack_enc_hdl *const qeh = stream_if_ctx; 155 qeh->qeh_enc_sm_out = stream; 156 if ((qeh->qeh_flags & (QEH_INITIALIZED|QEH_HAVE_SETTINGS)) 157 == (QEH_INITIALIZED|QEH_HAVE_SETTINGS)) 158 qeh_begin_out(qeh); 159 else 160 qeh->qeh_conn = lsquic_stream_conn(stream); /* Or NULL deref in log */ 161 LSQ_DEBUG("initialized outgoing encoder stream"); 162 return (void *) qeh; 163} 164 165 166static void 167qeh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 168{ 169 struct qpack_enc_hdl *const qeh = (void *) ctx; 170 struct lsquic_reader reader = { 171 .lsqr_read = lsquic_frab_list_read, 172 .lsqr_size = lsquic_frab_list_size, 173 .lsqr_ctx = &qeh->qeh_fral, 174 }; 175 ssize_t nw; 176 177 nw = lsquic_stream_writef(stream, &reader); 178 if (nw >= 0) 179 { 180 LSQ_DEBUG("wrote %zd bytes to stream", nw); 181 (void) lsquic_stream_flush(stream); 182 if (lsquic_frab_list_empty(&qeh->qeh_fral)) 183 lsquic_stream_wantwrite(stream, 0); 184 } 185 else 186 { 187 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 188 "cannot write to stream"); 189 LSQ_WARN("cannot write to stream: %s", strerror(errno)); 190 lsquic_stream_wantwrite(stream, 0); 191 } 192} 193 194 195static void 196qeh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 197{ 198 struct qpack_enc_hdl *const qeh = (void *) ctx; 199 qeh->qeh_enc_sm_out = NULL; 200 LSQ_DEBUG("closed outgoing encoder stream"); 201} 202 203 204static void 205qeh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 206{ 207 assert(0); 208} 209 210 211static const struct lsquic_stream_if qeh_enc_sm_out_if = 212{ 213 .on_new_stream = qeh_out_on_new, 214 .on_read = qeh_out_on_read, 215 .on_write = qeh_out_on_write, 216 .on_close = qeh_out_on_close, 217}; 218const struct lsquic_stream_if *const lsquic_qeh_enc_sm_out_if = 219 &qeh_enc_sm_out_if; 220 221 222static lsquic_stream_ctx_t * 223qeh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream) 224{ 225 struct qpack_enc_hdl *const qeh = stream_if_ctx; 226 qeh->qeh_dec_sm_in = stream; 227 if (qeh->qeh_flags & QEH_INITIALIZED) 228 lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1); 229 else 230 qeh->qeh_conn = lsquic_stream_conn(stream); /* Or NULL deref in log */ 231 LSQ_DEBUG("initialized incoming decoder stream"); 232 return (void *) qeh; 233} 234 235 236static size_t 237qeh_read_decoder_stream (void *ctx, const unsigned char *buf, size_t sz, 238 int fin) 239{ 240 struct qpack_enc_hdl *const qeh = (void *) ctx; 241 uint64_t offset; 242 int s; 243 244 if (fin) 245 { 246 LSQ_INFO("decoder stream is closed"); 247 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 248 HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK decoder stream"); 249 goto end; 250 } 251 252 offset = lsquic_stream_read_offset(qeh->qeh_dec_sm_in); 253 s = lsqpack_enc_decoder_in(&qeh->qeh_encoder, buf, sz); 254 if (s != 0) 255 { 256 LSQ_INFO("error reading decoder stream"); 257 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 258 HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK decoder " 259 "stream at offset %"PRIu64, offset); 260 goto end; 261 } 262 LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz); 263 264 end: 265 return sz; 266} 267 268 269static void 270qeh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 271{ 272 struct qpack_enc_hdl *const qeh = (void *) ctx; 273 ssize_t nread; 274 275 nread = lsquic_stream_readf(stream, qeh_read_decoder_stream, qeh); 276 if (nread <= 0) 277 { 278 if (nread < 0) 279 { 280 LSQ_WARN("cannot read from encoder stream: %s", strerror(errno)); 281 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 282 "cannot read from encoder stream"); 283 } 284 else 285 { 286 LSQ_INFO("encoder stream closed by peer: abort connection"); 287 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 288 HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed"); 289 } 290 lsquic_stream_wantread(stream, 0); 291 } 292} 293 294 295static void 296qeh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 297{ 298 struct qpack_enc_hdl *const qeh = (void *) ctx; 299 LSQ_DEBUG("closed incoming decoder stream"); 300 qeh->qeh_dec_sm_in = NULL; 301} 302 303 304static void 305qeh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 306{ 307 assert(0); 308} 309 310 311static const struct lsquic_stream_if qeh_dec_sm_in_if = 312{ 313 .on_new_stream = qeh_in_on_new, 314 .on_read = qeh_in_on_read, 315 .on_write = qeh_in_on_write, 316 .on_close = qeh_in_on_close, 317}; 318const struct lsquic_stream_if *const lsquic_qeh_dec_sm_in_if = 319 &qeh_dec_sm_in_if; 320 321 322static enum qwh_status 323qeh_write_headers (struct qpack_enc_hdl *qeh, lsquic_stream_id_t stream_id, 324 unsigned seqno, const struct lsquic_http_headers *headers, 325 unsigned char *buf, size_t *prefix_sz, size_t *headers_sz, 326 uint64_t *completion_offset, enum lsqpack_enc_header_flags *hflags) 327{ 328 unsigned char *p = buf; 329 unsigned char *const end = buf + *headers_sz; 330 const unsigned char *enc_p; 331 size_t enc_sz, hea_sz, total_enc_sz; 332 ssize_t nw; 333 enum lsqpack_enc_status st; 334 int i, s, write_to_stream; 335 enum lsqpack_enc_flags enc_flags; 336 unsigned char enc_buf[ qeh->qeh_encoder.qpe_cur_max_capacity * 2 ]; 337 338 s = lsqpack_enc_start_header(&qeh->qeh_encoder, stream_id, 0); 339 if (s != 0) 340 { 341 LSQ_WARN("cannot start header"); 342 return QWH_ERR; 343 } 344 LSQ_DEBUG("begin encoding headers for stream %"PRIu64, stream_id); 345 346 if (qeh->qeh_enc_sm_out) 347 enc_flags = 0; 348 else 349 { 350 enc_flags = LQEF_NO_INDEX; 351 LSQ_DEBUG("encoder stream is unavailable, won't index headers"); 352 } 353 write_to_stream = qeh->qeh_enc_sm_out 354 && lsquic_frab_list_empty(&qeh->qeh_fral); 355 total_enc_sz = 0; 356 for (i = 0; i < headers->count; ++i) 357 { 358 if (headers->headers[i].buf == NULL) 359 continue; 360 enc_sz = sizeof(enc_buf); 361 hea_sz = end - p; 362 st = lsqpack_enc_encode(&qeh->qeh_encoder, enc_buf, &enc_sz, p, 363 &hea_sz, &headers->headers[i], enc_flags); 364 switch (st) 365 { 366 case LQES_OK: 367 LSQ_DEBUG("encoded `%.*s': `%.*s' -- %zd bytes to header block, " 368 "%zd bytes to encoder stream", 369 (int) headers->headers[i].name_len, 370 lsxpack_header_get_name(&headers->headers[i]), 371 (int) headers->headers[i].val_len, 372 lsxpack_header_get_value(&headers->headers[i]), 373 hea_sz, enc_sz); 374 total_enc_sz += enc_sz; 375 p += hea_sz; 376 if (enc_sz) 377 { 378 if (write_to_stream) 379 { 380 nw = lsquic_stream_write(qeh->qeh_enc_sm_out, enc_buf, enc_sz); 381 if ((size_t) nw == enc_sz) 382 break; 383 if (nw < 0) 384 { 385 LSQ_INFO("could not write to encoder stream: %s", 386 strerror(errno)); 387 return QWH_ERR; 388 } 389 write_to_stream = 0; 390 enc_p = enc_buf + (size_t) nw; 391 enc_sz -= (size_t) nw; 392 } 393 else 394 enc_p = enc_buf; 395 if (0 != lsquic_frab_list_write(&qeh->qeh_fral, enc_p, enc_sz)) 396 { 397 LSQ_INFO("could not write to frab list"); 398 return QWH_ERR; 399 } 400 } 401 break; 402 case LQES_NOBUF_HEAD: 403 return QWH_ENOBUF; 404 default: 405 assert(0); 406 return QWH_ERR; 407 case LQES_NOBUF_ENC: 408 LSQ_DEBUG("not enough room to write encoder stream data"); 409 return QWH_ERR; 410 } 411 } 412 413 nw = lsqpack_enc_end_header(&qeh->qeh_encoder, buf - *prefix_sz, 414 *prefix_sz, hflags); 415 if (nw <= 0) 416 { 417 LSQ_WARN("could not end header: %zd", nw); 418 return QWH_ERR; 419 } 420 421 if ((size_t) nw < *prefix_sz) 422 { 423 memmove(buf - nw, buf - *prefix_sz, (size_t) nw); 424 *prefix_sz = (size_t) nw; 425 } 426 *headers_sz = p - buf; 427 if (lsquic_frab_list_empty(&qeh->qeh_fral)) 428 { 429 LSQ_DEBUG("all %zd bytes of encoder stream written out; header block " 430 "is %zd bytes; estimated compression ratio %.3f", total_enc_sz, 431 *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder)); 432 return QWH_FULL; 433 } 434 else 435 { 436 *completion_offset = lsquic_qeh_enc_off(qeh) 437 + lsquic_frab_list_size(&qeh->qeh_fral); 438 LSQ_DEBUG("not all %zd bytes of encoder stream written out; %zd bytes " 439 "buffered; header block is %zd bytes; estimated compression ratio " 440 "%.3f", total_enc_sz, lsquic_frab_list_size(&qeh->qeh_fral), 441 *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder)); 442 return QWH_PARTIAL; 443 } 444} 445 446 447#if !defined(NDEBUG) && __GNUC__ 448__attribute__((weak)) 449#endif 450enum qwh_status 451lsquic_qeh_write_headers (struct qpack_enc_hdl *qeh, 452 lsquic_stream_id_t stream_id, unsigned seqno, 453 const struct lsquic_http_headers *headers, unsigned char *buf, 454 size_t *prefix_sz, size_t *headers_sz, uint64_t *completion_offset, 455 enum lsqpack_enc_header_flags *hflags) 456{ 457 if (qeh->qeh_flags & QEH_INITIALIZED) 458 return qeh_write_headers(qeh, stream_id, seqno, headers, buf, 459 prefix_sz, headers_sz, completion_offset, hflags); 460 else 461 return QWH_ERR; 462} 463 464 465#if !defined(NDEBUG) && __GNUC__ 466__attribute__((weak)) 467#endif 468uint64_t 469lsquic_qeh_enc_off (struct qpack_enc_hdl *qeh) 470{ 471 if (qeh->qeh_enc_sm_out) 472 return qeh->qeh_enc_sm_out->tosend_off; 473 else 474 return 0; 475} 476 477 478size_t 479lsquic_qeh_write_avail (struct qpack_enc_hdl *qeh) 480{ 481 if ((qeh->qeh_flags & QEH_INITIALIZED) && qeh->qeh_enc_sm_out) 482 return lsquic_stream_write_avail(qeh->qeh_enc_sm_out); 483 else if (qeh->qeh_flags & QEH_INITIALIZED) 484 return ~((size_t) 0); /* Unlimited write */ 485 else 486 return 0; 487} 488 489 490size_t 491lsquic_qeh_max_prefix_size (const struct qpack_enc_hdl *qeh) 492{ 493 if (qeh->qeh_flags & QEH_HAVE_SETTINGS) 494 return qeh->qeh_max_prefix_size; 495 else 496 return LSQPACK_UINT64_ENC_SZ * 2; 497} 498