lsquic_headers_stream.c revision 3b55e6ae
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * HEADERS stream logic 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <stdarg.h> 9#include <stdlib.h> 10#include <string.h> 11#include <sys/queue.h> 12#ifdef MSVC 13#include <vc_compat.h> 14#endif 15 16#include "lsquic.h" 17#include "lsquic_types.h" 18#include "lsquic_int_types.h" 19#include "lsquic_frame_common.h" 20#include "lsquic_frame_reader.h" 21#include "lsquic_frame_writer.h" 22#include "lsquic_mm.h" 23#include "lsquic_engine_public.h" 24#include "lshpack.h" 25 26#include "lsquic_headers_stream.h" 27 28#define MAX_HEADERS_SIZE (64 * 1024) 29#define MAX_HEADER_TABLE_SIZE (512 * 1024) 30 31#define LSQUIC_LOGGER_MODULE LSQLM_HEADERS 32#define LSQUIC_LOG_CONN_ID lsquic_conn_id(lsquic_stream_conn(hs->hs_stream)) 33#include "lsquic_logger.h" 34 35static const struct frame_reader_callbacks *frame_callbacks_ptr; 36 37struct headers_stream 38{ 39 struct lsquic_stream *hs_stream; 40 struct lsquic_frame_reader *hs_fr; 41 struct lsquic_frame_writer *hs_fw; 42 const struct headers_stream_callbacks 43 *hs_callbacks; 44 void *hs_cb_ctx; 45 struct lshpack_enc hs_henc; 46 struct lshpack_dec hs_hdec; 47 enum { 48 HS_IS_SERVER = (1 << 0), 49 HS_HENC_INITED = (1 << 1), 50 } hs_flags; 51 struct lsquic_engine_public *hs_enpub; 52}; 53 54 55int 56lsquic_headers_stream_send_settings (struct headers_stream *hs, 57 const struct lsquic_http2_setting *settings, unsigned n_settings) 58{ 59 if (0 == lsquic_frame_writer_write_settings(hs->hs_fw, settings, 60 n_settings)) 61 { 62 lsquic_stream_wantwrite(hs->hs_stream, 63 lsquic_frame_writer_have_leftovers(hs->hs_fw)); 64 return 0; 65 } 66 else 67 { 68 LSQ_WARN("Could not write settings to stream: %s", 69 strerror(errno)); 70 return -1; 71 } 72} 73 74 75static lsquic_stream_ctx_t * 76headers_on_new_stream (void *stream_if_ctx, lsquic_stream_t *stream) 77{ 78 struct headers_stream *hs = stream_if_ctx; 79 lshpack_dec_init(&hs->hs_hdec); 80 if (0 != lshpack_enc_init(&hs->hs_henc)) 81 { 82 LSQ_WARN("could not initialize HPACK encoder: %s", strerror(errno)); 83 return NULL; 84 } 85 hs->hs_flags |= HS_HENC_INITED; 86 hs->hs_stream = stream; 87 LSQ_DEBUG("stream created"); 88 hs->hs_fr = lsquic_frame_reader_new((hs->hs_flags & HS_IS_SERVER) ? FRF_SERVER : 0, 89 MAX_HEADERS_SIZE, &hs->hs_enpub->enp_mm, 90 stream, lsquic_stream_read, &hs->hs_hdec, 91 frame_callbacks_ptr, hs, 92 hs->hs_enpub->enp_hsi_if, hs->hs_enpub->enp_hsi_ctx); 93 if (!hs->hs_fr) 94 { 95 LSQ_WARN("could not create frame reader: %s", strerror(errno)); 96 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 97 return NULL; 98 } 99 hs->hs_fw = lsquic_frame_writer_new(&hs->hs_enpub->enp_mm, stream, 0, 100 &hs->hs_henc, lsquic_stream_write, (hs->hs_flags & HS_IS_SERVER)); 101 if (!hs->hs_fw) 102 { 103 LSQ_WARN("could not create frame writer: %s", strerror(errno)); 104 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 105 return NULL; 106 } 107 lsquic_stream_wantread(stream, 1); 108 return (lsquic_stream_ctx_t *) hs; 109} 110 111 112static void 113headers_on_read (lsquic_stream_t *stream, struct lsquic_stream_ctx *ctx) 114{ 115 struct headers_stream *hs = (struct headers_stream *) ctx; 116 if (0 != lsquic_frame_reader_read(hs->hs_fr)) 117 { 118 LSQ_ERROR("frame reader failed"); 119 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 120 } 121} 122 123 124static void 125headers_on_write (lsquic_stream_t *stream, struct lsquic_stream_ctx *ctx) 126{ 127 struct headers_stream *hs = (struct headers_stream *) ctx; 128 assert(lsquic_frame_writer_have_leftovers(hs->hs_fw)); 129 int s = lsquic_frame_writer_flush(hs->hs_fw); 130 if (0 == s) 131 { 132 LSQ_DEBUG("flushed"); 133 lsquic_stream_wantwrite(stream, 134 lsquic_frame_writer_have_leftovers(hs->hs_fw)); 135 } 136 else 137 { 138 LSQ_WARN("Error writing to stream: %s", strerror(errno)); 139 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 140 } 141} 142 143 144static void 145headers_on_close (lsquic_stream_t *stream, struct lsquic_stream_ctx *ctx) 146{ 147 struct headers_stream *hs = (struct headers_stream *) ctx; 148 LSQ_DEBUG("stream closed"); 149} 150 151 152int 153lsquic_headers_stream_send_headers (struct headers_stream *hs, 154 uint32_t stream_id, const struct lsquic_http_headers *headers, int eos, 155 unsigned weight) 156{ 157 LSQ_DEBUG("received compressed headers to send"); 158 int s; 159 s = lsquic_frame_writer_write_headers(hs->hs_fw, stream_id, headers, eos, 160 weight); 161 if (0 == s) 162 { 163 lsquic_stream_wantwrite(hs->hs_stream, 164 lsquic_frame_writer_have_leftovers(hs->hs_fw)); 165 } 166 else 167 LSQ_INFO("Error writing headers: %s", strerror(errno)); 168 return s; 169} 170 171 172int 173lsquic_headers_stream_send_priority (struct headers_stream *hs, 174 uint32_t stream_id, int exclusive, uint32_t dep_stream_id, unsigned weight) 175{ 176 LSQ_DEBUG("received priority to send"); 177 int s; 178 if (stream_id == dep_stream_id) 179 { 180 LSQ_INFO("stream cannot depend on itself"); /* RFC 7540, Sec. 5.3.1. */ 181 return -1; 182 } 183 s = lsquic_frame_writer_write_priority(hs->hs_fw, stream_id, exclusive, 184 dep_stream_id, weight); 185 if (0 == s) 186 { 187 lsquic_stream_wantwrite(hs->hs_stream, 188 lsquic_frame_writer_have_leftovers(hs->hs_fw)); 189 } 190 else 191 LSQ_INFO("Error writing priority frame: %s", strerror(errno)); 192 return s; 193} 194 195 196struct headers_stream * 197lsquic_headers_stream_new (int is_server, struct lsquic_engine_public *enpub, 198 const struct headers_stream_callbacks *callbacks, 199 void *cb_ctx) 200{ 201 struct headers_stream *hs = calloc(1, sizeof(*hs)); 202 if (!hs) 203 return NULL; 204 hs->hs_callbacks = callbacks; 205 hs->hs_cb_ctx = cb_ctx; 206 if (is_server) 207 hs->hs_flags = HS_IS_SERVER; 208 else 209 hs->hs_flags = 0; 210 hs->hs_enpub = enpub; 211 return hs; 212} 213 214 215void 216lsquic_headers_stream_destroy (struct headers_stream *hs) 217{ 218 if (hs->hs_fr) 219 lsquic_frame_reader_destroy(hs->hs_fr); 220 if (hs->hs_fw) 221 lsquic_frame_writer_destroy(hs->hs_fw); 222 if (hs->hs_flags & HS_HENC_INITED) 223 lshpack_enc_cleanup(&hs->hs_henc); 224 lshpack_dec_cleanup(&hs->hs_hdec); 225 free(hs); 226} 227 228 229static const struct lsquic_stream_if headers_stream_if = 230{ 231 .on_new_stream = headers_on_new_stream, 232 .on_read = headers_on_read, 233 .on_write = headers_on_write, 234 .on_close = headers_on_close, 235}; 236 237 238const struct lsquic_stream_if *const lsquic_headers_stream_if = 239 &headers_stream_if; 240 241 242static void 243headers_on_incoming_headers (void *ctx, struct uncompressed_headers *uh) 244{ 245 struct headers_stream *hs = ctx; 246 hs->hs_callbacks->hsc_on_headers(hs->hs_cb_ctx, uh); 247} 248 249 250static void 251headers_on_push_promise (void *ctx, struct uncompressed_headers *uh) 252{ 253 struct headers_stream *hs = ctx; 254 hs->hs_callbacks->hsc_on_push_promise(hs->hs_cb_ctx, uh); 255} 256 257 258static void 259headers_on_priority (void *ctx, uint32_t stream_id, int exclusive, 260 uint32_t dep_stream_id, unsigned weight) 261{ 262 struct headers_stream *hs = ctx; 263 hs->hs_callbacks->hsc_on_priority(hs->hs_cb_ctx, stream_id, exclusive, 264 dep_stream_id, weight); 265} 266 267 268static void 269headers_on_error (void *ctx, uint32_t stream_id, enum frame_reader_error err) 270{ 271 struct headers_stream *hs = ctx; 272 switch (err) 273 { 274 case FR_ERR_DUPLICATE_PSEH: 275 case FR_ERR_INCOMPL_REQ_PSEH: 276 case FR_ERR_UNNEC_REQ_PSEH: 277 case FR_ERR_INCOMPL_RESP_PSEH: 278 case FR_ERR_UNNEC_RESP_PSEH: 279 case FR_ERR_UNKNOWN_PSEH: 280 case FR_ERR_UPPERCASE_HEADER: 281 case FR_ERR_MISPLACED_PSEH: 282 case FR_ERR_MISSING_PSEH: 283 case FR_ERR_DECOMPRESS: 284 case FR_ERR_HEADERS_TOO_LARGE: 285 case FR_ERR_SELF_DEP_STREAM: 286 LSQ_INFO("error %u is a stream error (stream %u)", err, stream_id); 287 hs->hs_callbacks->hsc_on_stream_error(hs->hs_cb_ctx, stream_id); 288 break; 289 case FR_ERR_INVALID_FRAME_SIZE: 290 case FR_ERR_NONZERO_STREAM_ID: 291 case FR_ERR_UNEXPECTED_PUSH: 292 case FR_ERR_ZERO_STREAM_ID: 293 case FR_ERR_NOMEM: 294 case FR_ERR_EXPECTED_CONTIN: 295 LSQ_INFO("error %u is a connection error (stream %u)", err, stream_id); 296 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 297 break; 298 } 299} 300 301 302static void 303headers_on_settings (void *ctx, uint16_t setting_id, uint32_t setting_value) 304{ 305 struct headers_stream *hs = ctx; 306 switch (setting_id) 307 { 308 case SETTINGS_HEADER_TABLE_SIZE: 309 if (setting_value > MAX_HEADER_TABLE_SIZE) 310 { 311 LSQ_INFO("tried to update table size to %u, which is larger than " 312 "allowed maximum of %u bytes", setting_value, 313 MAX_HEADER_TABLE_SIZE); 314 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 315 } 316 else 317 { 318 LSQ_INFO("update hpack table size to %u", setting_value); 319 lshpack_enc_set_max_capacity(&hs->hs_henc, setting_value); 320 } 321 break; 322 case SETTINGS_MAX_HEADER_LIST_SIZE: 323 LSQ_INFO("set max header list size to %u", setting_value); 324 lsquic_frame_writer_max_header_list_size(hs->hs_fw, setting_value); 325 break; 326 case SETTINGS_ENABLE_PUSH: 327 LSQ_INFO("got setting enable_push: %u", setting_value); 328 if (hs->hs_flags & HS_IS_SERVER) 329 { 330 if (setting_value <= 1) 331 hs->hs_callbacks->hsc_on_enable_push(hs->hs_cb_ctx, 332 setting_value); 333 else 334 { 335 LSQ_INFO("invalid value of enable_push"); 336 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 337 } 338 } 339 else 340 { 341 LSQ_INFO("it is an error to receive enable_push setting in " 342 "client mode"); 343 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 344 } 345 break; 346 case SETTINGS_MAX_CONCURRENT_STREAMS: 347 case SETTINGS_INITIAL_WINDOW_SIZE: 348 case SETTINGS_MAX_FRAME_SIZE: 349 /* [draft-ietf-quic-http-00], Section 3 */ 350 LSQ_INFO("Specifying setting 0x%X is a QUIC error", setting_id); 351 hs->hs_callbacks->hsc_on_conn_error(hs->hs_cb_ctx); 352 break; 353 default: 354 LSQ_INFO("Ignoring unknown setting 0x%X; value 0x%X", setting_id, 355 setting_value); 356 break; 357 } 358} 359 360 361int 362lsquic_headers_stream_push_promise (struct headers_stream *hs, 363 uint32_t stream_id, uint32_t promised_stream_id, 364 const struct iovec *path, const struct iovec *host, 365 const struct lsquic_http_headers *headers) 366{ 367 int s; 368 LSQ_DEBUG("promising stream %u in response to stream %u", 369 promised_stream_id, stream_id); 370 s = lsquic_frame_writer_write_promise(hs->hs_fw, stream_id, 371 promised_stream_id, path, host, headers); 372 if (0 == s) 373 { 374 lsquic_stream_wantwrite(hs->hs_stream, 375 lsquic_frame_writer_have_leftovers(hs->hs_fw)); 376 } 377 else 378 LSQ_INFO("Error writing push promise: %s", strerror(errno)); 379 return s; 380} 381 382 383size_t 384lsquic_headers_stream_mem_used (const struct headers_stream *hs) 385{ 386 size_t size; 387 388 size = sizeof(*hs); 389 size += lsquic_frame_reader_mem_used(hs->hs_fr); 390 size += lsquic_frame_writer_mem_used(hs->hs_fw); 391 /* XXX: get rid of this mem_used business as we no longer use it? */ 392 393 return size; 394} 395 396 397struct lsquic_stream * 398lsquic_headers_stream_get_stream (const struct headers_stream *hs) 399{ 400 return hs->hs_stream; 401} 402 403 404static const struct frame_reader_callbacks frame_callbacks = { 405 .frc_on_headers = headers_on_incoming_headers, 406 .frc_on_push_promise = headers_on_push_promise, 407 .frc_on_error = headers_on_error, 408 .frc_on_settings = headers_on_settings, 409 .frc_on_priority = headers_on_priority, 410}; 411 412static const struct frame_reader_callbacks *frame_callbacks_ptr = &frame_callbacks; 413 414