lsquic_headers_stream.c revision a74702c6
1/* Copyright (c) 2017 - 2022 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * HEADERS stream logic 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdarg.h> 10#include <stdlib.h> 11#include <string.h> 12#include <sys/queue.h> 13 14#include "lsquic.h" 15#include "lsquic_types.h" 16#include "lsquic_int_types.h" 17#include "lsquic_frame_common.h" 18#include "lsquic_frame_reader.h" 19#include "lsquic_frame_writer.h" 20#include "lsquic_mm.h" 21#include "lsquic_engine_public.h" 22#include "lshpack.h" 23 24#include "lsquic_headers_stream.h" 25 26#define MAX_HEADERS_SIZE (64 * 1024) 27#define MAX_HEADER_TABLE_SIZE (512 * 1024) 28 29#define LSQUIC_LOGGER_MODULE LSQLM_HEADERS 30#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(\ 31 lsquic_stream_conn(hs->hs_stream)) 32#include "lsquic_logger.h" 33 34static const struct frame_reader_callbacks *frame_callbacks_ptr; 35 36struct headers_stream 37{ 38 struct lsquic_stream *hs_stream; 39 struct lsquic_frame_reader *hs_fr; 40 struct lsquic_frame_writer *hs_fw; 41 const struct headers_stream_callbacks 42 *hs_callbacks; 43 void *hs_cb_ctx; 44 struct lshpack_enc hs_henc; 45 struct lshpack_dec hs_hdec; 46 enum { 47 HS_IS_SERVER = (1 << 0), 48 HS_HENC_INITED = (1 << 1), 49 } hs_flags; 50 struct lsquic_engine_public *hs_enpub; 51#if LSQUIC_CONN_STATS 52 struct conn_stats *hs_conn_stats; 53#endif 54}; 55 56 57int 58lsquic_headers_stream_send_settings (struct headers_stream *hs, 59 const struct lsquic_http2_setting *settings, unsigned n_settings) 60{ 61 if (0 == lsquic_frame_writer_write_settings(hs->hs_fw, settings, 62 n_settings)) 63 { 64 lsquic_stream_wantwrite(hs->hs_stream, 65 lsquic_frame_writer_have_leftovers(hs->hs_fw)); 66 return 0; 67 } 68 else 69 { 70 LSQ_WARN("Could not write settings to stream: %s", 71 strerror(errno)); 72 return -1; 73 } 74} 75 76 77static lsquic_stream_ctx_t * 78headers_on_new_stream (void *stream_if_ctx, lsquic_stream_t *stream) 79{ 80 struct headers_stream *hs = stream_if_ctx; 81 82 lshpack_dec_init(&hs->hs_hdec); 83 if (0 != lshpack_enc_init(&hs->hs_henc)) 84 { 85 LSQ_WARN("could not initialize HPACK encoder: %s", strerror(errno)); 86 return NULL; 87 } 88 (void) lshpack_enc_use_hist(&hs->hs_henc, 1); 89 hs->hs_flags |= HS_HENC_INITED; 90 hs->hs_stream = stream; 91 LSQ_DEBUG("stream created"); 92 hs->hs_fr = lsquic_frame_reader_new((hs->hs_flags & HS_IS_SERVER) ? FRF_SERVER : 0, 93 MAX_HEADERS_SIZE, &hs->hs_enpub->enp_mm, 94 stream, lsquic_stream_read, &hs->hs_hdec, 95 frame_callbacks_ptr, hs, 96#if LSQUIC_CONN_STATS 97 hs->hs_conn_stats, 98#endif 99 hs->hs_enpub->enp_hsi_if, hs->hs_enpub->enp_hsi_ctx); 100 if (!hs->hs_fr) 101 { 102 LSQ_WARN("could not create frame reader: %s", strerror(errno)); 103 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 104 return NULL; 105 } 106 hs->hs_fw = lsquic_frame_writer_new(&hs->hs_enpub->enp_mm, stream, 0, 107 &hs->hs_henc, lsquic_stream_writef, 108#if LSQUIC_CONN_STATS 109 hs->hs_conn_stats, 110#endif 111 (hs->hs_flags & HS_IS_SERVER)); 112 if (!hs->hs_fw) 113 { 114 LSQ_WARN("could not create frame writer: %s", strerror(errno)); 115 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 116 return NULL; 117 } 118 lsquic_stream_wantread(stream, 1); 119 return (lsquic_stream_ctx_t *) hs; 120} 121 122 123static void 124headers_on_read (lsquic_stream_t *stream, struct lsquic_stream_ctx *ctx) 125{ 126 struct headers_stream *hs = (struct headers_stream *) ctx; 127 if (0 != lsquic_frame_reader_read(hs->hs_fr)) 128 { 129 LSQ_ERROR("frame reader failed"); 130 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 131 } 132} 133 134 135static void 136headers_on_write (lsquic_stream_t *stream, struct lsquic_stream_ctx *ctx) 137{ 138 struct headers_stream *hs = (struct headers_stream *) ctx; 139 assert(lsquic_frame_writer_have_leftovers(hs->hs_fw)); 140 int s = lsquic_frame_writer_flush(hs->hs_fw); 141 if (0 == s) 142 { 143 LSQ_DEBUG("flushed"); 144 lsquic_stream_wantwrite(stream, 145 lsquic_frame_writer_have_leftovers(hs->hs_fw)); 146 } 147 else 148 { 149 LSQ_WARN("Error writing to stream: %s", strerror(errno)); 150 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 151 } 152} 153 154 155static void 156headers_on_close (lsquic_stream_t *stream, struct lsquic_stream_ctx *ctx) 157{ 158 struct headers_stream *hs = (struct headers_stream *) ctx; 159 LSQ_DEBUG("stream closed"); 160} 161 162 163int 164lsquic_headers_stream_send_headers (struct headers_stream *hs, 165 lsquic_stream_id_t stream_id, const struct lsquic_http_headers *headers, 166 int eos, unsigned weight) 167{ 168 LSQ_DEBUG("received compressed headers to send"); 169 int s; 170 s = lsquic_frame_writer_write_headers(hs->hs_fw, stream_id, headers, eos, 171 weight); 172 if (0 == s) 173 { 174 lsquic_stream_wantwrite(hs->hs_stream, 175 lsquic_frame_writer_have_leftovers(hs->hs_fw)); 176 } 177 else 178 LSQ_INFO("Error writing headers: %s", strerror(errno)); 179 return s; 180} 181 182 183int 184lsquic_headers_stream_send_priority (struct headers_stream *hs, 185 lsquic_stream_id_t stream_id, int exclusive, 186 lsquic_stream_id_t dep_stream_id, unsigned weight) 187{ 188 LSQ_DEBUG("received priority to send"); 189 int s; 190 if (stream_id == dep_stream_id) 191 { 192 LSQ_INFO("stream cannot depend on itself"); /* RFC 7540, Sec. 5.3.1. */ 193 return -1; 194 } 195 s = lsquic_frame_writer_write_priority(hs->hs_fw, stream_id, exclusive, 196 dep_stream_id, weight); 197 if (0 == s) 198 { 199 lsquic_stream_wantwrite(hs->hs_stream, 200 lsquic_frame_writer_have_leftovers(hs->hs_fw)); 201 } 202 else 203 LSQ_INFO("Error writing priority frame: %s", strerror(errno)); 204 return s; 205} 206 207 208struct headers_stream * 209lsquic_headers_stream_new (int is_server, struct lsquic_engine_public *enpub, 210 const struct headers_stream_callbacks *callbacks, 211#if LSQUIC_CONN_STATS 212 struct conn_stats *conn_stats, 213#endif 214 void *cb_ctx) 215{ 216 struct headers_stream *hs = calloc(1, sizeof(*hs)); 217 if (!hs) 218 return NULL; 219 hs->hs_callbacks = callbacks; 220 hs->hs_cb_ctx = cb_ctx; 221 if (is_server) 222 hs->hs_flags = HS_IS_SERVER; 223 else 224 hs->hs_flags = 0; 225 hs->hs_enpub = enpub; 226#if LSQUIC_CONN_STATS 227 hs->hs_conn_stats= conn_stats; 228#endif 229 return hs; 230} 231 232 233void 234lsquic_headers_stream_destroy (struct headers_stream *hs) 235{ 236 if (hs->hs_fr) 237 lsquic_frame_reader_destroy(hs->hs_fr); 238 if (hs->hs_fw) 239 lsquic_frame_writer_destroy(hs->hs_fw); 240 if (hs->hs_flags & HS_HENC_INITED) 241 lshpack_enc_cleanup(&hs->hs_henc); 242 lshpack_dec_cleanup(&hs->hs_hdec); 243 free(hs); 244} 245 246 247static const struct lsquic_stream_if headers_stream_if = 248{ 249 .on_new_stream = headers_on_new_stream, 250 .on_read = headers_on_read, 251 .on_write = headers_on_write, 252 .on_close = headers_on_close, 253}; 254 255 256const struct lsquic_stream_if *const lsquic_headers_stream_if = 257 &headers_stream_if; 258 259 260static void 261headers_on_incoming_headers (void *ctx, struct uncompressed_headers *uh) 262{ 263 struct headers_stream *hs = ctx; 264 hs->hs_callbacks->hsc_on_headers(hs->hs_cb_ctx, uh); 265} 266 267 268static void 269headers_on_push_promise (void *ctx, struct uncompressed_headers *uh) 270{ 271 struct headers_stream *hs = ctx; 272 hs->hs_callbacks->hsc_on_push_promise(hs->hs_cb_ctx, uh); 273} 274 275 276static void 277headers_on_priority (void *ctx, lsquic_stream_id_t stream_id, int exclusive, 278 lsquic_stream_id_t dep_stream_id, unsigned weight) 279{ 280 struct headers_stream *hs = ctx; 281 hs->hs_callbacks->hsc_on_priority(hs->hs_cb_ctx, stream_id, exclusive, 282 dep_stream_id, weight); 283} 284 285 286static void 287headers_on_error (void *ctx, lsquic_stream_id_t stream_id, 288 enum frame_reader_error err) 289{ 290 struct headers_stream *hs = ctx; 291 switch (err) 292 { 293 case FR_ERR_BAD_HEADER: 294 case FR_ERR_DECOMPRESS: 295 case FR_ERR_SELF_DEP_STREAM: 296 LSQ_INFO("error %u is a stream error (stream %"PRIu64")", err, 297 stream_id); 298 hs->hs_callbacks->hsc_on_stream_error(hs->hs_cb_ctx, stream_id); 299 break; 300 case FR_ERR_INVALID_FRAME_SIZE: 301 case FR_ERR_NONZERO_STREAM_ID: 302 case FR_ERR_UNEXPECTED_PUSH: 303 case FR_ERR_ZERO_STREAM_ID: 304 case FR_ERR_EXPECTED_CONTIN: 305 case FR_ERR_OTHER_ERROR: 306 LSQ_INFO("error %u is a connection error (stream %"PRIu64")", err, 307 stream_id); 308 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 309 break; 310 } 311} 312 313 314static void 315headers_on_settings (void *ctx, uint16_t setting_id, uint32_t setting_value) 316{ 317 struct headers_stream *hs = ctx; 318 switch (setting_id) 319 { 320 case SETTINGS_HEADER_TABLE_SIZE: 321 if (setting_value > MAX_HEADER_TABLE_SIZE) 322 { 323 LSQ_INFO("tried to update table size to %u, which is larger than " 324 "allowed maximum of %u bytes", setting_value, 325 MAX_HEADER_TABLE_SIZE); 326 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 327 } 328 else 329 { 330 LSQ_INFO("update hpack table size to %u", setting_value); 331 lshpack_enc_set_max_capacity(&hs->hs_henc, setting_value); 332 } 333 break; 334 case SETTINGS_MAX_HEADER_LIST_SIZE: 335 LSQ_INFO("set max header list size to %u", setting_value); 336 lsquic_frame_writer_max_header_list_size(hs->hs_fw, setting_value); 337 break; 338 case SETTINGS_ENABLE_PUSH: 339 LSQ_INFO("got setting enable_push: %u", setting_value); 340 if (hs->hs_flags & HS_IS_SERVER) 341 { 342 if (setting_value <= 1) 343 hs->hs_callbacks->hsc_on_enable_push(hs->hs_cb_ctx, 344 setting_value); 345 else 346 { 347 LSQ_INFO("invalid value of enable_push"); 348 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 349 } 350 } 351 else 352 { 353 LSQ_INFO("it is an error to receive enable_push setting in " 354 "client mode"); 355 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 356 } 357 break; 358 case SETTINGS_MAX_CONCURRENT_STREAMS: 359 case SETTINGS_INITIAL_WINDOW_SIZE: 360 case SETTINGS_MAX_FRAME_SIZE: 361 /* [draft-ietf-quic-http-00], Section 3 */ 362 LSQ_INFO("Specifying setting 0x%X is a QUIC error", setting_id); 363 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 364 break; 365 default: 366 LSQ_INFO("Ignoring unknown setting 0x%X; value 0x%X", setting_id, 367 setting_value); 368 break; 369 } 370} 371 372 373int 374lsquic_headers_stream_push_promise (struct headers_stream *hs, 375 lsquic_stream_id_t stream_id64, lsquic_stream_id_t promised_stream_id64, 376 const struct lsquic_http_headers *headers) 377{ 378 uint32_t stream_id = stream_id64; 379 uint32_t promised_stream_id = promised_stream_id64; 380 int s; 381 LSQ_DEBUG("promising stream %u in response to stream %u", 382 promised_stream_id, stream_id); 383 s = lsquic_frame_writer_write_promise(hs->hs_fw, stream_id, 384 promised_stream_id, headers); 385 if (0 == s) 386 { 387 lsquic_stream_wantwrite(hs->hs_stream, 388 lsquic_frame_writer_have_leftovers(hs->hs_fw)); 389 } 390 else 391 LSQ_INFO("Error writing push promise: %s", strerror(errno)); 392 return s; 393} 394 395 396size_t 397lsquic_headers_stream_mem_used (const struct headers_stream *hs) 398{ 399 size_t size; 400 401 size = sizeof(*hs); 402 size += lsquic_frame_reader_mem_used(hs->hs_fr); 403 size += lsquic_frame_writer_mem_used(hs->hs_fw); 404 /* XXX: does not cover HPACK encoder and HPACK decoder */ 405 406 return size; 407} 408 409 410struct lsquic_stream * 411lsquic_headers_stream_get_stream (const struct headers_stream *hs) 412{ 413 return hs->hs_stream; 414} 415 416 417static const struct frame_reader_callbacks frame_callbacks = { 418 .frc_on_headers = headers_on_incoming_headers, 419 .frc_on_push_promise = headers_on_push_promise, 420 .frc_on_error = headers_on_error, 421 .frc_on_settings = headers_on_settings, 422 .frc_on_priority = headers_on_priority, 423}; 424 425static const struct frame_reader_callbacks *frame_callbacks_ptr = &frame_callbacks; 426 427