lsquic_qenc_hdl.c revision a0e1aeee
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_qenc_hdl.c -- QPACK encoder streams handler 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdlib.h> 10#include <string.h> 11#include <sys/queue.h> 12 13#include "lsquic.h" 14#include "lsquic_types.h" 15#include "lsquic_int_types.h" 16#include "lsquic_sfcw.h" 17#include "lsquic_varint.h" 18#include "lsquic_hq.h" 19#include "lsquic_hash.h" 20#include "lsquic_stream.h" 21#include "lsquic_frab_list.h" 22#include "lsqpack.h" 23#include "lsquic_conn.h" 24#include "lsquic_qenc_hdl.h" 25 26#define LSQUIC_LOGGER_MODULE LSQLM_QENC_HDL 27#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(qeh->qeh_conn) 28#include "lsquic_logger.h" 29 30 31static int 32qeh_write_type (struct qpack_enc_hdl *qeh) 33{ 34 int s; 35 36#ifndef NDEBUG 37 const char *env = getenv("LSQUIC_RND_VARINT_LEN"); 38 if (env && atoi(env)) 39 { 40 s = rand() & 3; 41 LSQ_DEBUG("writing %d-byte stream type", 1 << s); 42 } 43 else 44#endif 45 s = 0; 46 47 switch (s) 48 { 49 case 0: 50 return lsquic_frab_list_write(&qeh->qeh_fral, 51 (unsigned char []) { HQUST_QPACK_ENC }, 1); 52 case 1: 53 return lsquic_frab_list_write(&qeh->qeh_fral, 54 (unsigned char []) { 0x40, HQUST_QPACK_ENC }, 2); 55 case 2: 56 return lsquic_frab_list_write(&qeh->qeh_fral, 57 (unsigned char []) { 0x80, 0x00, 0x00, HQUST_QPACK_ENC }, 4); 58 default: 59 return lsquic_frab_list_write(&qeh->qeh_fral, 60 (unsigned char []) { 0xC0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 61 HQUST_QPACK_ENC }, 8); 62 } 63} 64 65 66static void 67qeh_begin_out (struct qpack_enc_hdl *qeh) 68{ 69 if (0 == qeh_write_type(qeh) 70 && (qeh->qeh_tsu_sz == 0 71 || 0 == lsquic_frab_list_write(&qeh->qeh_fral, qeh->qeh_tsu_buf, 72 qeh->qeh_tsu_sz))) 73 { 74 LSQ_DEBUG("wrote %zu bytes to frab list", 1 + qeh->qeh_tsu_sz); 75 lsquic_stream_wantwrite(qeh->qeh_enc_sm_out, 1); 76 } 77 else 78 { 79 LSQ_WARN("could not write to frab list"); 80 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 81 "cannot write to frab list"); 82 } 83} 84 85 86void 87lsquic_qeh_init (struct qpack_enc_hdl *qeh, struct lsquic_conn *conn) 88{ 89 assert(!(qeh->qeh_flags & QEH_INITIALIZED)); 90 qeh->qeh_conn = conn; 91 lsquic_frab_list_init(&qeh->qeh_fral, 0x400, NULL, NULL, NULL); 92 lsqpack_enc_preinit(&qeh->qeh_encoder, (void *) conn); 93 qeh->qeh_flags |= QEH_INITIALIZED; 94 qeh->qeh_max_prefix_size = 95 lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder); 96 if (qeh->qeh_dec_sm_in) 97 lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1); 98 LSQ_DEBUG("initialized"); 99} 100 101 102int 103lsquic_qeh_settings (struct qpack_enc_hdl *qeh, unsigned max_table_size, 104 unsigned dyn_table_size, unsigned max_risked_streams, int server) 105{ 106 enum lsqpack_enc_opts enc_opts; 107 108 assert(qeh->qeh_flags & QEH_INITIALIZED); 109 110 if (qeh->qeh_flags & QEH_HAVE_SETTINGS) 111 { 112 LSQ_WARN("settings already set"); 113 return -1; 114 } 115 116 enc_opts = LSQPACK_ENC_OPT_STAGE_2 117 | (server ? LSQPACK_ENC_OPT_SERVER : 0); 118 qeh->qeh_tsu_sz = sizeof(qeh->qeh_tsu_buf); 119 if (0 != lsqpack_enc_init(&qeh->qeh_encoder, (void *) qeh->qeh_conn, 120 max_table_size, dyn_table_size, max_risked_streams, enc_opts, 121 qeh->qeh_tsu_buf, &qeh->qeh_tsu_sz)) 122 { 123 LSQ_INFO("could not initialize QPACK encoder"); 124 return -1; 125 } 126 LSQ_DEBUG("%zu-byte post-init TSU", qeh->qeh_tsu_sz); 127 qeh->qeh_flags |= QEH_HAVE_SETTINGS; 128 qeh->qeh_max_prefix_size = 129 lsqpack_enc_header_block_prefix_size(&qeh->qeh_encoder); 130 LSQ_DEBUG("have settings: max table size=%u; dyn table size=%u; max risked " 131 "streams=%u", max_table_size, dyn_table_size, max_risked_streams); 132 if (qeh->qeh_enc_sm_out) 133 qeh_begin_out(qeh); 134 return 0; 135} 136 137 138void 139lsquic_qeh_cleanup (struct qpack_enc_hdl *qeh) 140{ 141 if (qeh->qeh_flags & QEH_INITIALIZED) 142 { 143 LSQ_DEBUG("cleanup"); 144 lsqpack_enc_cleanup(&qeh->qeh_encoder); 145 lsquic_frab_list_cleanup(&qeh->qeh_fral); 146 memset(qeh, 0, sizeof(*qeh)); 147 } 148} 149 150static lsquic_stream_ctx_t * 151qeh_out_on_new (void *stream_if_ctx, struct lsquic_stream *stream) 152{ 153 struct qpack_enc_hdl *const qeh = stream_if_ctx; 154 qeh->qeh_enc_sm_out = stream; 155 if ((qeh->qeh_flags & (QEH_INITIALIZED|QEH_HAVE_SETTINGS)) 156 == (QEH_INITIALIZED|QEH_HAVE_SETTINGS)) 157 qeh_begin_out(qeh); 158 else 159 qeh->qeh_conn = lsquic_stream_conn(stream); /* Or NULL deref in log */ 160 LSQ_DEBUG("initialized outgoing encoder stream"); 161 return (void *) qeh; 162} 163 164 165static void 166qeh_out_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 167{ 168 struct qpack_enc_hdl *const qeh = (void *) ctx; 169 struct lsquic_reader reader = { 170 .lsqr_read = lsquic_frab_list_read, 171 .lsqr_size = lsquic_frab_list_size, 172 .lsqr_ctx = &qeh->qeh_fral, 173 }; 174 ssize_t nw; 175 176 nw = lsquic_stream_writef(stream, &reader); 177 if (nw >= 0) 178 { 179 LSQ_DEBUG("wrote %zd bytes to stream", nw); 180 (void) lsquic_stream_flush(stream); 181 if (lsquic_frab_list_empty(&qeh->qeh_fral)) 182 lsquic_stream_wantwrite(stream, 0); 183 } 184 else 185 { 186 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 187 "cannot write to stream"); 188 LSQ_WARN("cannot write to stream: %s", strerror(errno)); 189 lsquic_stream_wantwrite(stream, 0); 190 } 191} 192 193 194static void 195qeh_out_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 196{ 197 struct qpack_enc_hdl *const qeh = (void *) ctx; 198 qeh->qeh_enc_sm_out = NULL; 199 LSQ_DEBUG("closed outgoing encoder stream"); 200} 201 202 203static void 204qeh_out_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 205{ 206 assert(0); 207} 208 209 210static const struct lsquic_stream_if qeh_enc_sm_out_if = 211{ 212 .on_new_stream = qeh_out_on_new, 213 .on_read = qeh_out_on_read, 214 .on_write = qeh_out_on_write, 215 .on_close = qeh_out_on_close, 216}; 217const struct lsquic_stream_if *const lsquic_qeh_enc_sm_out_if = 218 &qeh_enc_sm_out_if; 219 220 221static lsquic_stream_ctx_t * 222qeh_in_on_new (void *stream_if_ctx, struct lsquic_stream *stream) 223{ 224 struct qpack_enc_hdl *const qeh = stream_if_ctx; 225 qeh->qeh_dec_sm_in = stream; 226 if (qeh->qeh_flags & QEH_INITIALIZED) 227 lsquic_stream_wantread(qeh->qeh_dec_sm_in, 1); 228 else 229 qeh->qeh_conn = lsquic_stream_conn(stream); /* Or NULL deref in log */ 230 LSQ_DEBUG("initialized incoming decoder stream"); 231 return (void *) qeh; 232} 233 234 235static size_t 236qeh_read_decoder_stream (void *ctx, const unsigned char *buf, size_t sz, 237 int fin) 238{ 239 struct qpack_enc_hdl *const qeh = (void *) ctx; 240 uint64_t offset; 241 int s; 242 243 if (fin) 244 { 245 LSQ_INFO("decoder stream is closed"); 246 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 247 HEC_CLOSED_CRITICAL_STREAM, "Peer closed QPACK decoder stream"); 248 goto end; 249 } 250 251 offset = lsquic_stream_read_offset(qeh->qeh_dec_sm_in); 252 s = lsqpack_enc_decoder_in(&qeh->qeh_encoder, buf, sz); 253 if (s != 0) 254 { 255 LSQ_INFO("error reading decoder stream"); 256 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 257 HEC_QPACK_DECODER_STREAM_ERROR, "Error interpreting QPACK decoder " 258 "stream at offset %"PRIu64, offset); 259 goto end; 260 } 261 LSQ_DEBUG("successfully fed %zu bytes to QPACK decoder", sz); 262 263 end: 264 return sz; 265} 266 267 268static void 269qeh_in_on_read (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 270{ 271 struct qpack_enc_hdl *const qeh = (void *) ctx; 272 ssize_t nread; 273 274 nread = lsquic_stream_readf(stream, qeh_read_decoder_stream, qeh); 275 if (nread <= 0) 276 { 277 if (nread < 0) 278 { 279 LSQ_WARN("cannot read from encoder stream: %s", strerror(errno)); 280 qeh->qeh_conn->cn_if->ci_internal_error(qeh->qeh_conn, 281 "cannot read from encoder stream"); 282 } 283 else 284 { 285 LSQ_INFO("encoder stream closed by peer: abort connection"); 286 qeh->qeh_conn->cn_if->ci_abort_error(qeh->qeh_conn, 1, 287 HEC_CLOSED_CRITICAL_STREAM, "encoder stream closed"); 288 } 289 lsquic_stream_wantread(stream, 0); 290 } 291} 292 293 294static void 295qeh_in_on_close (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 296{ 297 struct qpack_enc_hdl *const qeh = (void *) ctx; 298 LSQ_DEBUG("closed incoming decoder stream"); 299 qeh->qeh_dec_sm_in = NULL; 300} 301 302 303static void 304qeh_in_on_write (struct lsquic_stream *stream, lsquic_stream_ctx_t *ctx) 305{ 306 assert(0); 307} 308 309 310static const struct lsquic_stream_if qeh_dec_sm_in_if = 311{ 312 .on_new_stream = qeh_in_on_new, 313 .on_read = qeh_in_on_read, 314 .on_write = qeh_in_on_write, 315 .on_close = qeh_in_on_close, 316}; 317const struct lsquic_stream_if *const lsquic_qeh_dec_sm_in_if = 318 &qeh_dec_sm_in_if; 319 320 321static enum qwh_status 322qeh_write_headers (struct qpack_enc_hdl *qeh, lsquic_stream_id_t stream_id, 323 unsigned seqno, const struct lsquic_http_headers *headers, 324 unsigned char *buf, size_t *prefix_sz, size_t *headers_sz, 325 uint64_t *completion_offset, enum lsqpack_enc_header_flags *hflags) 326{ 327 unsigned char *p = buf; 328 unsigned char *const end = buf + *headers_sz; 329 const unsigned char *enc_p; 330 size_t enc_sz, hea_sz, total_enc_sz; 331 ssize_t nw; 332 enum lsqpack_enc_status st; 333 int i, s, write_to_stream; 334 enum lsqpack_enc_flags enc_flags; 335 unsigned char enc_buf[ qeh->qeh_encoder.qpe_cur_max_capacity * 2 ]; 336 337 s = lsqpack_enc_start_header(&qeh->qeh_encoder, stream_id, 0); 338 if (s != 0) 339 { 340 LSQ_WARN("cannot start header"); 341 return QWH_ERR; 342 } 343 LSQ_DEBUG("begin encoding headers for stream %"PRIu64, stream_id); 344 345 if (qeh->qeh_enc_sm_out) 346 enc_flags = 0; 347 else 348 { 349 enc_flags = LQEF_NO_INDEX; 350 LSQ_DEBUG("encoder stream is unavailable, won't index headers"); 351 } 352 write_to_stream = qeh->qeh_enc_sm_out 353 && lsquic_frab_list_empty(&qeh->qeh_fral); 354 total_enc_sz = 0; 355 for (i = 0; i < headers->count; ++i) 356 { 357 enc_sz = sizeof(enc_buf); 358 hea_sz = end - p; 359 st = lsqpack_enc_encode(&qeh->qeh_encoder, enc_buf, &enc_sz, p, 360 &hea_sz, headers->headers[i].name.iov_base, 361 headers->headers[i].name.iov_len, 362 headers->headers[i].value.iov_base, 363 headers->headers[i].value.iov_len, enc_flags); 364 switch (st) 365 { 366 case LQES_OK: 367 LSQ_DEBUG("encoded `%.*s': `%.*s' -- %zd bytes to header block, " 368 "%zd bytes to encoder stream", 369 (int) headers->headers[i].name.iov_len, 370 (char *) headers->headers[i].name.iov_base, 371 (int) headers->headers[i].value.iov_len, 372 (char *) headers->headers[i].value.iov_base, 373 hea_sz, enc_sz); 374 total_enc_sz += enc_sz; 375 p += hea_sz; 376 if (enc_sz) 377 { 378 if (write_to_stream) 379 { 380 nw = lsquic_stream_write(qeh->qeh_enc_sm_out, enc_buf, enc_sz); 381 if ((size_t) nw == enc_sz) 382 break; 383 if (nw < 0) 384 { 385 LSQ_INFO("could not write to encoder stream: %s", 386 strerror(errno)); 387 return QWH_ERR; 388 } 389 write_to_stream = 0; 390 enc_p = enc_buf + (size_t) nw; 391 enc_sz -= (size_t) nw; 392 } 393 else 394 enc_p = enc_buf; 395 if (0 != lsquic_frab_list_write(&qeh->qeh_fral, enc_p, enc_sz)) 396 { 397 LSQ_INFO("could not write to frab list"); 398 return QWH_ERR; 399 } 400 } 401 break; 402 case LQES_NOBUF_HEAD: 403 return QWH_ENOBUF; 404 default: 405 assert(0); 406 return QWH_ERR; 407 case LQES_NOBUF_ENC: 408 LSQ_DEBUG("not enough room to write encoder stream data"); 409 return QWH_ERR; 410 } 411 } 412 413 nw = lsqpack_enc_end_header(&qeh->qeh_encoder, buf - *prefix_sz, 414 *prefix_sz, hflags); 415 if (nw <= 0) 416 { 417 LSQ_WARN("could not end header: %zd", nw); 418 return QWH_ERR; 419 } 420 421 if ((size_t) nw < *prefix_sz) 422 { 423 memmove(buf - nw, buf - *prefix_sz, (size_t) nw); 424 *prefix_sz = (size_t) nw; 425 } 426 *headers_sz = p - buf; 427 if (lsquic_frab_list_empty(&qeh->qeh_fral)) 428 { 429 LSQ_DEBUG("all %zd bytes of encoder stream written out; header block " 430 "is %zd bytes; estimated compression ratio %.3f", total_enc_sz, 431 *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder)); 432 return QWH_FULL; 433 } 434 else 435 { 436 *completion_offset = lsquic_qeh_enc_off(qeh) 437 + lsquic_frab_list_size(&qeh->qeh_fral); 438 LSQ_DEBUG("not all %zd bytes of encoder stream written out; %zd bytes " 439 "buffered; header block is %zd bytes; estimated compression ratio " 440 "%.3f", total_enc_sz, lsquic_frab_list_size(&qeh->qeh_fral), 441 *headers_sz, lsqpack_enc_ratio(&qeh->qeh_encoder)); 442 return QWH_PARTIAL; 443 } 444} 445 446 447#if !defined(NDEBUG) && __GNUC__ 448__attribute__((weak)) 449#endif 450enum qwh_status 451lsquic_qeh_write_headers (struct qpack_enc_hdl *qeh, 452 lsquic_stream_id_t stream_id, unsigned seqno, 453 const struct lsquic_http_headers *headers, unsigned char *buf, 454 size_t *prefix_sz, size_t *headers_sz, uint64_t *completion_offset, 455 enum lsqpack_enc_header_flags *hflags) 456{ 457 if (qeh->qeh_flags & QEH_INITIALIZED) 458 return qeh_write_headers(qeh, stream_id, seqno, headers, buf, 459 prefix_sz, headers_sz, completion_offset, hflags); 460 else 461 return QWH_ERR; 462} 463 464 465#if !defined(NDEBUG) && __GNUC__ 466__attribute__((weak)) 467#endif 468uint64_t 469lsquic_qeh_enc_off (struct qpack_enc_hdl *qeh) 470{ 471 if (qeh->qeh_enc_sm_out) 472 return qeh->qeh_enc_sm_out->tosend_off; 473 else 474 return 0; 475} 476 477 478size_t 479lsquic_qeh_write_avail (struct qpack_enc_hdl *qeh) 480{ 481 if ((qeh->qeh_flags & QEH_INITIALIZED) && qeh->qeh_enc_sm_out) 482 return lsquic_stream_write_avail(qeh->qeh_enc_sm_out); 483 else if (qeh->qeh_flags & QEH_INITIALIZED) 484 return ~((size_t) 0); /* Unlimited write */ 485 else 486 return 0; 487} 488 489 490size_t 491lsquic_qeh_max_prefix_size (const struct qpack_enc_hdl *qeh) 492{ 493 if (qeh->qeh_flags & QEH_HAVE_SETTINGS) 494 return qeh->qeh_max_prefix_size; 495 else 496 return LSQPACK_UINT64_ENC_SZ * 2; 497} 498