lsquic_di_nocopy.c revision 55613f44
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_di_nocopy.c -- The "no-copy" data in stream. 4 * 5 * Data from packets is not copied: the packets are referenced by stream 6 * frames. When all data from stream frame is read, the frame is released 7 * and packet reference count is decremented, which possibly results in 8 * packet being released as well. 9 * 10 * This approach works well in regular circumstances; there are two scenarios 11 * when it does not: 12 * 13 * A. If we have many out-of-order frames, insertion into the list becomes 14 * expensive. In the degenerate case, we'd have to traverse the whole 15 * list to find appropriate position. 16 * 17 * B. Having many frames ties up resources, as each frame keeps a reference 18 * to the packet that contains it. This is a possible attack vector: 19 * send many one-byte packets; a single hole at the beginning will stop 20 * the server from being able to read the stream, thus tying up resources. 21 * 22 * If we detect that either (A) or (B) is true, we request that the stream 23 * switch to a more robust incoming stream frame handler by setting 24 * DI_SWITCH_IMPL flag. 25 * 26 * For a small number of elements, (A) and (B) do not matter and the checks 27 * are not performed. This number is defined by EFF_CHECK_THRESH_LOW. On 28 * the other side of the spectrum, if the number of frames grows very high, 29 * we want to switch to a more memory-efficient implementation even if (A) 30 * and (B) are not true. EFF_CHECK_THRESH_HIGH defines this threshold. 31 * 32 * Between the low and high thresholds, we detect efficiency problems as 33 * follows. 34 * 35 * To detect (A), we count how many elements we have to traverse during 36 * insertion. If we have to traverse at least half the list 37 * EFF_FAR_TRAVERSE_COUNT in a row, DI_SWITCH_IMPL is issued. 38 * 39 * If average stream frame size is smaller than EFF_TINY_FRAME_SZ bytes, 40 * (B) condition is true. In addition, if there are more than EFF_MAX_HOLES 41 * in the stream, this is also indicative of (B). 42 */ 43 44 45#include <assert.h> 46#include <inttypes.h> 47#include <stddef.h> 48#include <stdint.h> 49#include <stdlib.h> 50#include <sys/queue.h> 51 52#include "lsquic.h" 53#include "lsquic_types.h" 54#include "lsquic_int_types.h" 55#include "lsquic_conn_flow.h" 56#include "lsquic_packet_common.h" 57#include "lsquic_packet_in.h" 58#include "lsquic_rtt.h" 59#include "lsquic_sfcw.h" 60#include "lsquic_varint.h" 61#include "lsquic_hq.h" 62#include "lsquic_hash.h" 63#include "lsquic_stream.h" 64#include "lsquic_mm.h" 65#include "lsquic_malo.h" 66#include "lsquic_conn.h" 67#include "lsquic_conn_public.h" 68#include "lsquic_data_in_if.h" 69 70 71#define LSQUIC_LOGGER_MODULE LSQLM_DI 72#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(ncdi->ncdi_conn_pub->lconn) 73#define LSQUIC_LOG_STREAM_ID ncdi->ncdi_stream_id 74#include "lsquic_logger.h" 75 76 77/* If number of elements is at or below this number, we do not bother to check 78 * efficiency conditions. 79 */ 80#define EFF_CHECK_THRESH_LOW 10 81 82/* If number of elements is higher than this number, efficiency alert 83 * is issued unconditionally. 84 */ 85#define EFF_CHECK_THRESH_HIGH 1000 86 87/* Maximum number of consecutive far traversals */ 88#define EFF_FAR_TRAVERSE_COUNT 4 89 90/* Maximum number of holes that is not deemed suspicious */ 91#define EFF_MAX_HOLES 5 92 93/* What is deemed a tiny frame, in bytes. If it is a power of two, calculation 94 * is cheaper. 95 */ 96#define EFF_TINY_FRAME_SZ 64 97 98 99TAILQ_HEAD(stream_frames_tailq, stream_frame); 100 101 102struct nocopy_data_in 103{ 104 struct stream_frames_tailq ncdi_frames_in; 105 struct data_in ncdi_data_in; 106 struct lsquic_conn_public *ncdi_conn_pub; 107 uint64_t ncdi_byteage; 108 uint64_t ncdi_fin_off; 109 lsquic_stream_id_t ncdi_stream_id; 110 unsigned ncdi_n_frames; 111 unsigned ncdi_n_holes; 112 unsigned ncdi_cons_far; 113 enum { 114 NCDI_FIN_SET = 1 << 0, 115 NCDI_FIN_REACHED = 1 << 1, 116 } ncdi_flags; 117}; 118 119 120#define NCDI_PTR(data_in) (struct nocopy_data_in *) \ 121 ((unsigned char *) (data_in) - offsetof(struct nocopy_data_in, ncdi_data_in)) 122 123#define STREAM_FRAME_PTR(data_frame) (struct stream_frame *) \ 124 ((unsigned char *) (data_frame) - offsetof(struct stream_frame, data_frame)) 125 126 127static const struct data_in_iface *di_if_nocopy_ptr; 128 129 130struct data_in * 131lsquic_data_in_nocopy_new (struct lsquic_conn_public *conn_pub, 132 lsquic_stream_id_t stream_id) 133{ 134 struct nocopy_data_in *ncdi; 135 136 ncdi = malloc(sizeof(*ncdi)); 137 if (!ncdi) 138 return NULL; 139 140 TAILQ_INIT(&ncdi->ncdi_frames_in); 141 ncdi->ncdi_data_in.di_if = di_if_nocopy_ptr; 142 ncdi->ncdi_data_in.di_flags = 0; 143 ncdi->ncdi_conn_pub = conn_pub; 144 ncdi->ncdi_stream_id = stream_id; 145 ncdi->ncdi_byteage = 0; 146 ncdi->ncdi_n_frames = 0; 147 ncdi->ncdi_n_holes = 0; 148 ncdi->ncdi_cons_far = 0; 149 ncdi->ncdi_fin_off = 0; 150 ncdi->ncdi_flags = 0; 151 LSQ_DEBUG("initialized"); 152 return &ncdi->ncdi_data_in; 153} 154 155 156static void 157nocopy_di_destroy (struct data_in *data_in) 158{ 159 struct nocopy_data_in *const ncdi = NCDI_PTR(data_in); 160 stream_frame_t *frame; 161 while ((frame = TAILQ_FIRST(&ncdi->ncdi_frames_in))) 162 { 163 TAILQ_REMOVE(&ncdi->ncdi_frames_in, frame, next_frame); 164 lsquic_packet_in_put(ncdi->ncdi_conn_pub->mm, frame->packet_in); 165 lsquic_malo_put(frame); 166 } 167 free(ncdi); 168} 169 170 171#if LSQUIC_EXTRA_CHECKS 172static int 173frame_list_is_sane (const struct nocopy_data_in *ncdi) 174{ 175 const stream_frame_t *frame; 176 uint64_t prev_off = 0, prev_end = 0; 177 int ordered = 1, overlaps = 0; 178 TAILQ_FOREACH(frame, &ncdi->ncdi_frames_in, next_frame) 179 { 180 ordered &= prev_off <= DF_OFF(frame); 181 overlaps |= prev_end > DF_OFF(frame); 182 prev_off = DF_OFF(frame); 183 prev_end = DF_END(frame); 184 } 185 return ordered && !overlaps; 186} 187#define CHECK_ORDER(ncdi) assert(frame_list_is_sane(ncdi)) 188#else 189#define CHECK_ORDER(ncdi) 190#endif 191 192 193#define CASE(letter) ((int) (letter) << 8) 194 195/* Not all errors are picked up by this function, as it is expensive (and 196 * potentially error-prone) to check for all possible error conditions. 197 * It an error be misclassified as an overlap or dup, in the worst case 198 * we end up with an application error instead of protocol violation. 199 */ 200static int 201insert_frame (struct nocopy_data_in *ncdi, struct stream_frame *new_frame, 202 uint64_t read_offset, unsigned *p_n_frames) 203{ 204 stream_frame_t *prev_frame, *next_frame; 205 unsigned count; 206 207 if (read_offset > DF_END(new_frame)) 208 { 209 if (DF_FIN(new_frame)) 210 return INS_FRAME_ERR | CASE('A'); 211 else 212 return INS_FRAME_DUP | CASE('B'); 213 } 214 215 if (ncdi->ncdi_flags & NCDI_FIN_SET) 216 { 217 if (DF_FIN(new_frame) && DF_END(new_frame) != ncdi->ncdi_fin_off) 218 return INS_FRAME_ERR | CASE('C'); 219 if (DF_END(new_frame) > ncdi->ncdi_fin_off) 220 return INS_FRAME_ERR | CASE('D'); 221 if (read_offset == DF_END(new_frame)) 222 return INS_FRAME_DUP | CASE('M'); 223 } 224 else 225 { 226 if (read_offset == DF_END(new_frame) && !DF_FIN(new_frame)) 227 return INS_FRAME_DUP | CASE('L'); 228 } 229 230 /* Find position in the list, going backwards. We go backwards because 231 * that is the most likely scenario. 232 */ 233 next_frame = TAILQ_LAST(&ncdi->ncdi_frames_in, stream_frames_tailq); 234 if (next_frame && DF_OFF(new_frame) < DF_OFF(next_frame)) 235 { 236 count = 1; 237 prev_frame = TAILQ_PREV(next_frame, stream_frames_tailq, next_frame); 238 for ( ; prev_frame && DF_OFF(new_frame) < DF_OFF(next_frame); 239 next_frame = prev_frame, 240 prev_frame = TAILQ_PREV(prev_frame, stream_frames_tailq, next_frame)) 241 { 242 if (DF_OFF(new_frame) >= DF_OFF(prev_frame)) 243 break; 244 ++count; 245 } 246 } 247 else 248 { 249 count = 0; 250 prev_frame = NULL; 251 } 252 253 if (!prev_frame && next_frame && DF_OFF(new_frame) >= DF_OFF(next_frame)) 254 { 255 prev_frame = next_frame; 256 next_frame = TAILQ_NEXT(next_frame, next_frame); 257 } 258 259 const int select = !!prev_frame << 1 | !!next_frame; 260 switch (select) 261 { 262 default: /* No neighbors */ 263 if (read_offset == DF_END(new_frame)) 264 { 265 if (DF_SIZE(new_frame)) 266 { 267 if (DF_FIN(new_frame) 268 && !((ncdi->ncdi_flags & NCDI_FIN_REACHED) 269 && read_offset == ncdi->ncdi_fin_off)) 270 return INS_FRAME_OVERLAP | CASE('E'); 271 else 272 return INS_FRAME_DUP | CASE('F'); 273 } 274 else if (!DF_FIN(new_frame) 275 || ((ncdi->ncdi_flags & NCDI_FIN_REACHED) 276 && read_offset == ncdi->ncdi_fin_off)) 277 return INS_FRAME_DUP | CASE('G'); 278 } 279 else if (read_offset > DF_OFF(new_frame)) 280 return INS_FRAME_OVERLAP | CASE('N'); 281 goto list_was_empty; 282 case 3: /* Both left and right neighbors */ 283 case 2: /* Only left neighbor (prev_frame) */ 284 if (DF_OFF(prev_frame) == DF_OFF(new_frame) 285 && DF_SIZE(prev_frame) == DF_SIZE(new_frame)) 286 { 287 if (!DF_FIN(prev_frame) && DF_FIN(new_frame)) 288 return INS_FRAME_OVERLAP | CASE('H'); 289 else 290 return INS_FRAME_DUP | CASE('I'); 291 } 292 if (DF_END(prev_frame) > DF_OFF(new_frame)) 293 return INS_FRAME_OVERLAP | CASE('J'); 294 if (select == 2) 295 goto have_prev; 296 /* Fall-through */ 297 case 1: /* Only right neighbor (next_frame) */ 298 if (DF_END(new_frame) > DF_OFF(next_frame)) 299 return INS_FRAME_OVERLAP | CASE('K'); 300 else if (read_offset > DF_OFF(new_frame)) 301 return INS_FRAME_OVERLAP | CASE('O'); 302 break; 303 } 304 305 if (prev_frame) 306 { 307 have_prev: 308 TAILQ_INSERT_AFTER(&ncdi->ncdi_frames_in, prev_frame, new_frame, next_frame); 309 ncdi->ncdi_n_holes += DF_END(prev_frame) != DF_OFF(new_frame); 310 if (next_frame) 311 { 312 ncdi->ncdi_n_holes += DF_END(new_frame) != DF_OFF(next_frame); 313 --ncdi->ncdi_n_holes; 314 } 315 } 316 else 317 { 318 ncdi->ncdi_n_holes += next_frame 319 && DF_END(new_frame) != DF_OFF(next_frame); 320 list_was_empty: 321 TAILQ_INSERT_HEAD(&ncdi->ncdi_frames_in, new_frame, next_frame); 322 } 323 CHECK_ORDER(ncdi); 324 325 if (DF_FIN(new_frame)) 326 { 327 ncdi->ncdi_flags |= NCDI_FIN_SET; 328 ncdi->ncdi_fin_off = DF_END(new_frame); 329 LSQ_DEBUG("FIN set at %"PRIu64, DF_END(new_frame)); 330 } 331 332 ++ncdi->ncdi_n_frames; 333 ncdi->ncdi_byteage += DF_SIZE(new_frame); 334 *p_n_frames = count; 335 336 return INS_FRAME_OK | CASE('Z'); 337} 338 339 340static int 341check_efficiency (struct nocopy_data_in *ncdi, unsigned count) 342{ 343 if (ncdi->ncdi_n_frames <= EFF_CHECK_THRESH_LOW) 344 { 345 ncdi->ncdi_cons_far = 0; 346 return 0; 347 } 348 if (ncdi->ncdi_n_frames > EFF_CHECK_THRESH_HIGH) 349 return 1; 350 if (count >= ncdi->ncdi_n_frames / 2) 351 { 352 ++ncdi->ncdi_cons_far; 353 if (ncdi->ncdi_cons_far > EFF_FAR_TRAVERSE_COUNT) 354 return 1; 355 } 356 else 357 ncdi->ncdi_cons_far = 0; 358 if (ncdi->ncdi_n_holes > EFF_MAX_HOLES) 359 return 1; 360 if (ncdi->ncdi_byteage / EFF_TINY_FRAME_SZ < ncdi->ncdi_n_frames) 361 return 1; 362 return 0; 363} 364 365 366static void 367set_eff_alert (struct nocopy_data_in *ncdi) 368{ 369 LSQ_DEBUG("low efficiency: n_frames: %u; n_holes: %u; cons_far: %u; " 370 "byteage: %"PRIu64, ncdi->ncdi_n_frames, ncdi->ncdi_n_holes, 371 ncdi->ncdi_cons_far, ncdi->ncdi_byteage); 372 ncdi->ncdi_data_in.di_flags |= DI_SWITCH_IMPL; 373} 374 375 376static enum ins_frame 377nocopy_di_insert_frame (struct data_in *data_in, 378 struct stream_frame *new_frame, uint64_t read_offset) 379{ 380 struct nocopy_data_in *const ncdi = NCDI_PTR(data_in); 381 unsigned count; 382 enum ins_frame ins; 383 int ins_case; 384 385 assert(0 == (new_frame->data_frame.df_fin & ~1)); 386 ins_case = insert_frame(ncdi, new_frame, read_offset, &count); 387 ins = ins_case & 0xFF; 388 ins_case >>= 8; 389 LSQ_DEBUG("%s: ins: %d (case '%c')", __func__, ins, (char) ins_case); 390 switch (ins) 391 { 392 case INS_FRAME_OK: 393 if (check_efficiency(ncdi, count)) 394 set_eff_alert(ncdi); 395 break; 396 case INS_FRAME_DUP: 397 case INS_FRAME_ERR: 398 lsquic_packet_in_put(ncdi->ncdi_conn_pub->mm, new_frame->packet_in); 399 lsquic_malo_put(new_frame); 400 break; 401 default: 402 break; 403 } 404 405 return ins; 406} 407 408 409static struct data_frame * 410nocopy_di_get_frame (struct data_in *data_in, uint64_t read_offset) 411{ 412 struct nocopy_data_in *const ncdi = NCDI_PTR(data_in); 413 struct stream_frame *frame = TAILQ_FIRST(&ncdi->ncdi_frames_in); 414 if (frame && frame->data_frame.df_offset + 415 frame->data_frame.df_read_off == read_offset) 416 { 417 LSQ_DEBUG("get_frame: frame (off: %"PRIu64", size: %u, fin: %d), at " 418 "read offset %"PRIu64, DF_OFF(frame), DF_SIZE(frame), DF_FIN(frame), 419 read_offset); 420 return &frame->data_frame; 421 } 422 else 423 { 424 LSQ_DEBUG("get_frame: no frame at read offset %"PRIu64, read_offset); 425 return NULL; 426 } 427} 428 429 430static void 431nocopy_di_frame_done (struct data_in *data_in, struct data_frame *data_frame) 432{ 433 struct nocopy_data_in *const ncdi = NCDI_PTR(data_in); 434 struct stream_frame *const frame = STREAM_FRAME_PTR(data_frame), *first; 435 assert(data_frame->df_read_off == data_frame->df_size); 436 TAILQ_REMOVE(&ncdi->ncdi_frames_in, frame, next_frame); 437 first = TAILQ_FIRST(&ncdi->ncdi_frames_in); 438 ncdi->ncdi_n_holes -= first && frame->data_frame.df_offset + 439 frame->data_frame.df_size != first->data_frame.df_offset; 440 --ncdi->ncdi_n_frames; 441 ncdi->ncdi_byteage -= frame->data_frame.df_size; 442 if (DF_FIN(frame)) 443 { 444 ncdi->ncdi_flags |= NCDI_FIN_REACHED; 445 LSQ_DEBUG("FIN has been reached at offset %"PRIu64, DF_END(frame)); 446 } 447 LSQ_DEBUG("frame (off: %"PRIu64", size: %u, fin: %d) done", 448 DF_OFF(frame), DF_SIZE(frame), DF_FIN(frame)); 449 lsquic_packet_in_put(ncdi->ncdi_conn_pub->mm, frame->packet_in); 450 lsquic_malo_put(frame); 451} 452 453 454static int 455nocopy_di_empty (struct data_in *data_in) 456{ 457 struct nocopy_data_in *const ncdi = NCDI_PTR(data_in); 458 return TAILQ_EMPTY(&ncdi->ncdi_frames_in); 459} 460 461 462static struct data_in * 463nocopy_di_switch_impl (struct data_in *data_in, uint64_t read_offset) 464{ 465 struct nocopy_data_in *const ncdi = NCDI_PTR(data_in); 466 struct data_in *new_data_in; 467 stream_frame_t *frame; 468 enum ins_frame ins; 469 470 new_data_in = lsquic_data_in_hash_new(ncdi->ncdi_conn_pub, 471 ncdi->ncdi_stream_id, ncdi->ncdi_byteage); 472 if (!new_data_in) 473 goto end; 474 475 while ((frame = TAILQ_FIRST(&ncdi->ncdi_frames_in))) 476 { 477 TAILQ_REMOVE(&ncdi->ncdi_frames_in, frame, next_frame); 478 ins = lsquic_data_in_hash_insert_data_frame(new_data_in, 479 &frame->data_frame, read_offset); 480 lsquic_packet_in_put(ncdi->ncdi_conn_pub->mm, frame->packet_in); 481 lsquic_malo_put(frame); 482 if (INS_FRAME_ERR == ins) 483 { 484 new_data_in->di_if->di_destroy(new_data_in); 485 new_data_in = NULL; 486 goto end; 487 } 488 } 489 490 end: 491 data_in->di_if->di_destroy(data_in); 492 return new_data_in; 493} 494 495 496/* This function overestimates amount of memory because some packets are 497 * referenced by more than one stream. In the usual case, however, I 498 * expect the error not to be large. 499 */ 500static size_t 501nocopy_di_mem_used (struct data_in *data_in) 502{ 503 struct nocopy_data_in *const ncdi = NCDI_PTR(data_in); 504 const stream_frame_t *frame; 505 size_t size; 506 507 size = sizeof(*data_in); 508 TAILQ_FOREACH(frame, &ncdi->ncdi_frames_in, next_frame) 509 size += lsquic_packet_in_mem_used(frame->packet_in); 510 511 return size; 512} 513 514 515static void 516nocopy_di_dump_state (struct data_in *data_in) 517{ 518 struct nocopy_data_in *const ncdi = NCDI_PTR(data_in); 519 const struct stream_frame *frame; 520 521 LSQ_DEBUG("nocopy state: frames: %u; holes: %u; cons_far: %u", 522 ncdi->ncdi_n_frames, ncdi->ncdi_n_holes, ncdi->ncdi_cons_far); 523 TAILQ_FOREACH(frame, &ncdi->ncdi_frames_in, next_frame) 524 LSQ_DEBUG("frame: off: %"PRIu64"; read_off: %"PRIu16"; size: %"PRIu16 525 "; fin: %d", DF_OFF(frame), frame->data_frame.df_read_off, 526 DF_SIZE(frame), DF_FIN(frame)); 527} 528 529 530static uint64_t 531nocopy_di_readable_bytes (struct data_in *data_in, uint64_t read_offset) 532{ 533 const struct nocopy_data_in *const ncdi = NCDI_PTR(data_in); 534 const struct stream_frame *frame; 535 uint64_t starting_offset; 536 537 starting_offset = read_offset; 538 TAILQ_FOREACH(frame, &ncdi->ncdi_frames_in, next_frame) 539 if (DF_ROFF(frame) == read_offset) 540 read_offset += DF_END(frame) - DF_ROFF(frame); 541 else if (read_offset > starting_offset) 542 break; 543 544 return read_offset - starting_offset; 545} 546 547 548static const struct data_in_iface di_if_nocopy = { 549 .di_destroy = nocopy_di_destroy, 550 .di_dump_state = nocopy_di_dump_state, 551 .di_empty = nocopy_di_empty, 552 .di_frame_done = nocopy_di_frame_done, 553 .di_get_frame = nocopy_di_get_frame, 554 .di_insert_frame = nocopy_di_insert_frame, 555 .di_mem_used = nocopy_di_mem_used, 556 .di_own_on_ok = 1, 557 .di_readable_bytes 558 = nocopy_di_readable_bytes, 559 .di_switch_impl = nocopy_di_switch_impl, 560}; 561 562static const struct data_in_iface *di_if_nocopy_ptr = &di_if_nocopy; 563