lsquic_di_hash.c revision 0a4f8953
1/* Copyright (c) 2017 - 2022 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_di_hash.c -- Copy incoming data into a hash 4 * 5 * While this implementation copies the data, its memory use is limited, 6 * which makes it a good choice when we have a lot of stream frames 7 * coming in. 8 * 9 * Another difference is that incoming STREAM frames are allowed to overlap. 10 */ 11 12 13#include <assert.h> 14#include <inttypes.h> 15#include <stddef.h> 16#include <stdint.h> 17#include <stdlib.h> 18#include <string.h> 19#include <sys/queue.h> 20 21#include "lsquic.h" 22#include "lsquic_int_types.h" 23#include "lsquic_types.h" 24#include "lsquic_conn_flow.h" 25#include "lsquic_packet_common.h" 26#include "lsquic_packet_in.h" 27#include "lsquic_rtt.h" 28#include "lsquic_sfcw.h" 29#include "lsquic_varint.h" 30#include "lsquic_hq.h" 31#include "lsquic_hash.h" 32#include "lsquic_stream.h" 33#include "lsquic_mm.h" 34#include "lsquic_malo.h" 35#include "lsquic_conn.h" 36#include "lsquic_conn_public.h" 37#include "lsquic_data_in_if.h" 38 39 40#define LSQUIC_LOGGER_MODULE LSQLM_DI 41#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(hdi->hdi_conn_pub->lconn) 42#define LSQUIC_LOG_STREAM_ID hdi->hdi_stream_id 43#include "lsquic_logger.h" 44 45 46#define N_DB_SETS 57 47 48#define DB_DATA_SIZE (0x1000 - sizeof(TAILQ_ENTRY(data_block)) - \ 49 sizeof(uint64_t) - N_DB_SETS * sizeof(uint64_t)) 50 51struct data_block 52{ 53 TAILQ_ENTRY(data_block) db_next; 54 uint64_t db_off; 55 uint64_t db_set[N_DB_SETS]; /* bit for each valid byte */ 56 unsigned char db_data[DB_DATA_SIZE]; 57}; 58 59typedef char db_set_covers_all_db_data[(N_DB_SETS * 64 >= DB_DATA_SIZE) ?1: - 1]; 60typedef char db_set_no_waste[(N_DB_SETS * 64 - 64 <= DB_DATA_SIZE)?1: - 1]; 61typedef char db_block_is_4K[(sizeof(struct data_block) == 0x1000) ?1:- 1]; 62 63 64TAILQ_HEAD(dblock_head, data_block); 65 66 67static const struct data_in_iface *di_if_hash_ptr; 68 69 70struct hash_data_in 71{ 72 struct data_in hdi_data_in; 73 struct lsquic_conn_public *hdi_conn_pub; 74 uint64_t hdi_fin_off; 75 struct dblock_head *hdi_buckets; 76 struct data_block *hdi_last_block; 77 struct data_frame hdi_data_frame; 78 lsquic_stream_id_t hdi_stream_id; 79 unsigned hdi_count; 80 unsigned hdi_nbits; 81 enum { 82 HDI_FIN = (1 << 0), 83 } hdi_flags; 84}; 85 86 87#define HDI_PTR(data_in) (struct hash_data_in *) \ 88 ((unsigned char *) (data_in) - offsetof(struct hash_data_in, hdi_data_in)) 89 90 91#define N_BUCKETS(n_bits) (1U << (n_bits)) 92#define BUCKNO(n_bits, off) ((off / DB_DATA_SIZE) & (N_BUCKETS(n_bits) - 1)) 93 94 95static unsigned 96my_log2 /* silly name to suppress compiler warning */ (unsigned sz) 97{ 98#if __GNUC__ 99 unsigned clz = __builtin_clz(sz); 100 return 32 - clz; 101#else 102 unsigned clz; 103 size_t y; 104 clz = 32; 105 y = sz >> 16; if (y) { clz -= 16; sz = y; } 106 y = sz >> 8; if (y) { clz -= 8; sz = y; } 107 y = sz >> 4; if (y) { clz -= 4; sz = y; } 108 y = sz >> 2; if (y) { clz -= 2; sz = y; } 109 y = sz >> 1; if (y) return 32 - clz + 1; 110 return 32 - clz + sz; 111#endif 112} 113 114 115struct data_in * 116lsquic_data_in_hash_new (struct lsquic_conn_public *conn_pub, 117 lsquic_stream_id_t stream_id, uint64_t byteage) 118{ 119 struct hash_data_in *hdi; 120 unsigned n; 121 122 hdi = malloc(sizeof(*hdi)); 123 if (!hdi) 124 return NULL; 125 126 hdi->hdi_data_in.di_if = di_if_hash_ptr; 127 hdi->hdi_data_in.di_flags = 0; 128 hdi->hdi_conn_pub = conn_pub; 129 hdi->hdi_stream_id = stream_id; 130 hdi->hdi_fin_off = 0; 131 hdi->hdi_flags = 0; 132 hdi->hdi_last_block = NULL; 133 if (byteage >= DB_DATA_SIZE /* __builtin_clz is undefined if 134 argument is 0 */) 135 hdi->hdi_nbits = my_log2(byteage / DB_DATA_SIZE) + 2; 136 else 137 hdi->hdi_nbits = 3; 138 hdi->hdi_count = 0; 139 hdi->hdi_buckets = malloc(sizeof(hdi->hdi_buckets[0]) * 140 N_BUCKETS(hdi->hdi_nbits)); 141 if (!hdi->hdi_buckets) 142 { 143 free(hdi); 144 return NULL; 145 } 146 147 for (n = 0; n < N_BUCKETS(hdi->hdi_nbits); ++n) 148 TAILQ_INIT(&hdi->hdi_buckets[n]); 149 150 return &hdi->hdi_data_in; 151} 152 153 154static void 155hash_di_destroy (struct data_in *data_in) 156{ 157 struct hash_data_in *const hdi = HDI_PTR(data_in); 158 struct data_block *block; 159 unsigned n; 160 161 for (n = 0; n < N_BUCKETS(hdi->hdi_nbits); ++n) 162 { 163 while ((block = TAILQ_FIRST(&hdi->hdi_buckets[n]))) 164 { 165 TAILQ_REMOVE(&hdi->hdi_buckets[n], block, db_next); 166 free(block); 167 } 168 } 169 free(hdi->hdi_buckets); 170 free(hdi); 171} 172 173 174static int 175hash_grow (struct hash_data_in *hdi) 176{ 177 struct dblock_head *new_buckets, *new[2]; 178 struct data_block *block; 179 unsigned n, old_nbits; 180 int idx; 181 182 old_nbits = hdi->hdi_nbits; 183 LSQ_DEBUG("doubling number of buckets to %u", N_BUCKETS(old_nbits + 1)); 184 new_buckets = malloc(sizeof(hdi->hdi_buckets[0]) 185 * N_BUCKETS(old_nbits + 1)); 186 if (!new_buckets) 187 { 188 LSQ_WARN("malloc failed: potential trouble ahead"); 189 return -1; 190 } 191 192 for (n = 0; n < N_BUCKETS(old_nbits); ++n) 193 { 194 new[0] = &new_buckets[n]; 195 new[1] = &new_buckets[n + N_BUCKETS(old_nbits)]; 196 TAILQ_INIT(new[0]); 197 TAILQ_INIT(new[1]); 198 while ((block = TAILQ_FIRST(&hdi->hdi_buckets[n]))) 199 { 200 TAILQ_REMOVE(&hdi->hdi_buckets[n], block, db_next); 201 idx = (BUCKNO(old_nbits + 1, block->db_off) >> old_nbits) & 1; 202 TAILQ_INSERT_TAIL(new[idx], block, db_next); 203 } 204 } 205 free(hdi->hdi_buckets); 206 hdi->hdi_nbits = old_nbits + 1; 207 hdi->hdi_buckets = new_buckets; 208 return 0; 209} 210 211 212static int 213hash_insert (struct hash_data_in *hdi, struct data_block *block) 214{ 215 unsigned buckno; 216 217 if (hdi->hdi_count >= N_BUCKETS(hdi->hdi_nbits) / 2 && 0 != hash_grow(hdi)) 218 return -1; 219 220 buckno = BUCKNO(hdi->hdi_nbits, block->db_off); 221 TAILQ_INSERT_TAIL(&hdi->hdi_buckets[buckno], block, db_next); 222 ++hdi->hdi_count; 223 return 0; 224} 225 226 227static struct data_block * 228hash_find (const struct hash_data_in *hdi, uint64_t off) 229{ 230 struct data_block *block; 231 unsigned buckno; 232 233 buckno = BUCKNO(hdi->hdi_nbits, off); 234 TAILQ_FOREACH(block, &hdi->hdi_buckets[buckno], db_next) 235 if (off == block->db_off) 236 return block; 237 return NULL; 238} 239 240 241static void 242hash_remove (struct hash_data_in *hdi, struct data_block *block) 243{ 244 unsigned buckno; 245 246 buckno = BUCKNO(hdi->hdi_nbits, block->db_off); 247 TAILQ_REMOVE(&hdi->hdi_buckets[buckno], block, db_next); 248 --hdi->hdi_count; 249} 250 251 252static struct data_block * 253new_block (struct hash_data_in *hdi, uint64_t off) 254{ 255 struct data_block *block; 256 257 assert(0 == off % DB_DATA_SIZE); 258 259 block = malloc(sizeof(*block)); 260 if (!block) 261 return NULL; 262 263 block->db_off = off; 264 if (0 != hash_insert(hdi, block)) 265 { 266 free(block); 267 return NULL; 268 } 269 270 memset(block->db_set, 0, sizeof(block->db_set)); 271 return block; 272} 273 274 275static unsigned 276block_write (struct data_block *block, unsigned block_off, 277 const unsigned char *data, unsigned data_sz) 278{ 279 const unsigned char *begin, *end; 280 unsigned set, bit, n_full_sets, n; 281 uint64_t mask; 282 283 assert(block_off < DB_DATA_SIZE); 284 if (data_sz > DB_DATA_SIZE - block_off) 285 data_sz = DB_DATA_SIZE - block_off; 286 287 begin = data; 288 end = begin + data_sz; 289 set = block_off >> 6; 290 bit = block_off & 0x3F; 291 292 assert(set < N_DB_SETS); 293 294 if (bit) 295 { 296 n = 64 - bit; 297 if (n > data_sz) 298 n = data_sz; 299 mask = ~((1ULL << bit ) - 1) 300 & ((1ULL << (bit + n - 1)) | ((1ULL << (bit + n - 1)) - 1)); 301 block->db_set[ set ] |= mask; 302 memcpy(block->db_data + block_off, data, n); 303 data += n; 304 block_off += n; 305 ++set; 306 } 307 308 n_full_sets = (end - data) >> 6; 309 if (n_full_sets) 310 { 311 memcpy(block->db_data + block_off, data, n_full_sets * 64); 312 data += n_full_sets * 64; 313 block_off += n_full_sets * 64; 314 memset(&block->db_set[ set ], 0xFF, n_full_sets * 8); 315 set += n_full_sets; 316 } 317 318 if (data < end) 319 { 320 assert(end - data < 64); 321 block->db_set[ set ] |= ((1ULL << (end - data)) - 1); 322 memcpy(block->db_data + block_off, data, end - data); 323 data = end; 324 } 325 326 assert(set <= N_DB_SETS); 327 328 return data - begin; 329} 330 331 332static int 333has_bytes_after (const struct data_block *block, unsigned off) 334{ 335 unsigned bit, set; 336 int has; 337 338 set = off >> 6; 339 bit = off & 0x3F; 340 341 has = 0 != (block->db_set[ set ] >> bit); 342 ++set; 343 344 for ( ; set < N_DB_SETS; ++set) 345 has += 0 != block->db_set[ set ]; 346 347 return has > 0; 348} 349 350 351enum ins_frame 352lsquic_data_in_hash_insert_data_frame (struct data_in *data_in, 353 const struct data_frame *data_frame, uint64_t read_offset) 354{ 355 struct hash_data_in *const hdi = HDI_PTR(data_in); 356 struct data_block *block; 357 uint64_t key, off, diff, fin_off; 358 const unsigned char *data; 359 unsigned size, nw; 360 361 if (data_frame->df_offset + data_frame->df_size < read_offset) 362 { 363 if (data_frame->df_fin) 364 return INS_FRAME_ERR; 365 else 366 return INS_FRAME_DUP; 367 } 368 369 if ((hdi->hdi_flags & HDI_FIN) && 370 ( 371 (data_frame->df_fin && 372 data_frame->df_offset + data_frame->df_size != hdi->hdi_fin_off) 373 || 374 data_frame->df_offset + data_frame->df_size > hdi->hdi_fin_off 375 ) 376 ) 377 { 378 return INS_FRAME_ERR; 379 } 380 381 if (data_frame->df_offset < read_offset) 382 { 383 diff = read_offset - data_frame->df_offset; 384 assert(diff <= data_frame->df_size); 385 size = data_frame->df_size - diff; 386 off = data_frame->df_offset + diff; 387 data = data_frame->df_data + diff; 388 } 389 else 390 { 391 size = data_frame->df_size; 392 off = data_frame->df_offset; 393 data = data_frame->df_data; 394 } 395 396 key = off - (off % DB_DATA_SIZE); 397 do 398 { 399 block = hash_find(hdi, key); 400 if (!block) 401 { 402 block = new_block(hdi, key); 403 if (!block) 404 return INS_FRAME_ERR; 405 } 406 nw = block_write(block, off % DB_DATA_SIZE, data, size); 407 size -= nw; 408 off += nw; 409 data += nw; 410 key += DB_DATA_SIZE; 411 } 412 while (size > 0); 413 414 if (data_frame->df_fin) 415 { 416 fin_off = data_frame->df_offset + data_frame->df_size; 417 if (has_bytes_after(block, fin_off - block->db_off) || 418 hash_find(hdi, key)) 419 { 420 return INS_FRAME_ERR; 421 } 422 hdi->hdi_flags |= HDI_FIN; 423 hdi->hdi_fin_off = fin_off; 424 } 425 426 return INS_FRAME_OK; 427} 428 429 430static enum ins_frame 431hash_di_insert_frame (struct data_in *data_in, 432 struct stream_frame *new_frame, uint64_t read_offset) 433{ 434 struct hash_data_in *const hdi = HDI_PTR(data_in); 435 const struct data_frame *const data_frame = &new_frame->data_frame; 436 enum ins_frame ins; 437 438 ins = lsquic_data_in_hash_insert_data_frame(data_in, data_frame, 439 read_offset); 440 assert(ins != INS_FRAME_OVERLAP); 441 /* NOTE: Only release packet and frame for INS_FRAME_OK, 442 * other cases are handled by caller */ 443 if (ins == INS_FRAME_OK) 444 { 445 lsquic_packet_in_put(hdi->hdi_conn_pub->mm, new_frame->packet_in); 446 lsquic_malo_put(new_frame); 447 } 448 return ins; 449} 450 451 452#if __GNUC__ 453# define ctz __builtin_ctzll 454#else 455static unsigned 456ctz (unsigned long long x) 457{ 458 unsigned n = 0; 459 if (0 == (x & ((1ULL << 32) - 1))) { n += 32; x >>= 32; } 460 if (0 == (x & ((1ULL << 16) - 1))) { n += 16; x >>= 16; } 461 if (0 == (x & ((1ULL << 8) - 1))) { n += 8; x >>= 8; } 462 if (0 == (x & ((1ULL << 4) - 1))) { n += 4; x >>= 4; } 463 if (0 == (x & ((1ULL << 2) - 1))) { n += 2; x >>= 2; } 464 if (0 == (x & ((1ULL << 1) - 1))) { n += 1; x >>= 1; } 465 return n; 466} 467#endif 468 469 470static unsigned 471n_avail_bytes (const struct data_block *block, unsigned set, unsigned bit) 472{ 473 unsigned count; 474 uint64_t part; 475 476 part = ~(block->db_set[ set ] >> bit); 477 if (part) 478 { 479 count = ctz(part); 480 if (count < 64 - bit) 481 return count; 482 } 483 else 484 count = 64; 485 ++set; 486 487 for ( ; set < N_DB_SETS && ~0ULL == block->db_set[ set ]; ++set) 488 count += 64; 489 490 if (set < N_DB_SETS) 491 { 492 part = ~block->db_set[ set ]; 493 if (part) 494 count += ctz(part); 495 else 496 count += 64; 497 } 498 499 return count; 500} 501 502 503/* Data block is readable if there is at least one readable byte at 504 * `read_offset' or there is FIN at that offset. 505 */ 506static int 507setup_data_frame (struct hash_data_in *hdi, const uint64_t read_offset, 508 struct data_block *block) 509{ 510 unsigned set, bit; 511 uint64_t offset; 512 513 offset = read_offset % DB_DATA_SIZE; 514 set = offset >> 6; 515 bit = offset & 0x3F; 516 517 if (block->db_set[ set ] & (1ULL << bit)) 518 { 519 hdi->hdi_last_block = block; 520 hdi->hdi_data_frame.df_data = block->db_data; 521 hdi->hdi_data_frame.df_offset = block->db_off; 522 hdi->hdi_data_frame.df_read_off = offset; 523 hdi->hdi_data_frame.df_size = offset + 524 n_avail_bytes(block, set, bit); 525 hdi->hdi_data_frame.df_fin = 526 (hdi->hdi_flags & HDI_FIN) && 527 hdi->hdi_data_frame.df_read_off + 528 hdi->hdi_data_frame.df_size == hdi->hdi_fin_off; 529 return 1; 530 } 531 else if ((hdi->hdi_flags & HDI_FIN) && read_offset == hdi->hdi_fin_off) 532 { 533 hdi->hdi_last_block = block; 534 hdi->hdi_data_frame.df_data = NULL; 535 hdi->hdi_data_frame.df_offset = block->db_off; 536 hdi->hdi_data_frame.df_read_off = offset; 537 hdi->hdi_data_frame.df_size = offset; 538 hdi->hdi_data_frame.df_fin = 1; 539 return 1; 540 } 541 else 542 return 0; 543} 544 545 546static struct data_frame * 547hash_di_get_frame (struct data_in *data_in, uint64_t read_offset) 548{ 549 struct hash_data_in *const hdi = HDI_PTR(data_in); 550 struct data_block *block; 551 uint64_t key; 552 553 key = read_offset - (read_offset % DB_DATA_SIZE); 554 block = hash_find(hdi, key); 555 if (!block) 556 { 557 if ((hdi->hdi_flags & HDI_FIN) && read_offset == hdi->hdi_fin_off) 558 { 559 hdi->hdi_last_block = NULL; 560 hdi->hdi_data_frame.df_data = NULL; 561 hdi->hdi_data_frame.df_offset = read_offset - 562 read_offset % DB_DATA_SIZE; 563 hdi->hdi_data_frame.df_read_off = 0; 564 hdi->hdi_data_frame.df_size = 0; 565 hdi->hdi_data_frame.df_fin = 1; 566 return &hdi->hdi_data_frame; 567 } 568 else 569 return NULL; 570 } 571 572 if (setup_data_frame(hdi, read_offset, block)) 573 return &hdi->hdi_data_frame; 574 else 575 return NULL; 576} 577 578 579static void 580hash_di_frame_done (struct data_in *data_in, struct data_frame *data_frame) 581{ 582 struct hash_data_in *const hdi = HDI_PTR(data_in); 583 struct data_block *const block = hdi->hdi_last_block; 584 585 if (block) 586 { 587 if (data_frame->df_read_off == DB_DATA_SIZE || 588 !has_bytes_after(block, data_frame->df_read_off)) 589 { 590 hash_remove(hdi, block); 591 free(block); 592 if (0 == hdi->hdi_count && 0 == (hdi->hdi_flags & HDI_FIN)) 593 { 594 LSQ_DEBUG("hash empty, want to switch"); 595 hdi->hdi_data_in.di_flags |= DI_SWITCH_IMPL; 596 } 597 } 598 } 599 else 600 assert(data_frame->df_fin && data_frame->df_size == 0); 601} 602 603 604static int 605hash_di_empty (struct data_in *data_in) 606{ 607 struct hash_data_in *const hdi = HDI_PTR(data_in); 608 return hdi->hdi_count == 0; 609} 610 611 612static struct data_in * 613hash_di_switch_impl (struct data_in *data_in, uint64_t read_offset) 614{ 615 struct hash_data_in *const hdi = HDI_PTR(data_in); 616 struct data_in *new_data_in; 617 618 assert(hdi->hdi_count == 0); 619 620 new_data_in = lsquic_data_in_nocopy_new(hdi->hdi_conn_pub, 621 hdi->hdi_stream_id); 622 data_in->di_if->di_destroy(data_in); 623 624 return new_data_in; 625} 626 627 628static size_t 629hash_di_mem_used (struct data_in *data_in) 630{ 631 struct hash_data_in *const hdi = HDI_PTR(data_in); 632 const struct data_block *block; 633 size_t size; 634 unsigned n; 635 636 size = sizeof(*data_in); 637 638 for (n = 0; n < N_BUCKETS(hdi->hdi_nbits); ++n) 639 TAILQ_FOREACH(block, &hdi->hdi_buckets[n], db_next) 640 size += sizeof(*block); 641 642 size += N_BUCKETS(hdi->hdi_nbits) * sizeof(hdi->hdi_buckets[0]); 643 644 return size; 645} 646 647 648static void 649hash_di_dump_state (struct data_in *data_in) 650{ 651 const struct hash_data_in *const hdi = HDI_PTR(data_in); 652 const struct data_block *block; 653 unsigned n; 654 655 LSQ_DEBUG("hash state: flags: %X; fin off: %"PRIu64"; count: %u", 656 hdi->hdi_flags, hdi->hdi_fin_off, hdi->hdi_count); 657 for (n = 0; n < N_BUCKETS(hdi->hdi_nbits); ++n) 658 TAILQ_FOREACH(block, &hdi->hdi_buckets[n], db_next) 659 LSQ_DEBUG("block: off: %"PRIu64, block->db_off); 660} 661 662 663static uint64_t 664hash_di_readable_bytes (struct data_in *data_in, uint64_t read_offset) 665{ 666 const struct data_frame *data_frame; 667 uint64_t starting_offset; 668 669 starting_offset = read_offset; 670 while (data_frame = hash_di_get_frame(data_in, read_offset), 671 data_frame && data_frame->df_size - data_frame->df_read_off) 672 read_offset += data_frame->df_size - data_frame->df_read_off; 673 674 return read_offset - starting_offset; 675} 676 677 678static const struct data_in_iface di_if_hash = { 679 .di_destroy = hash_di_destroy, 680 .di_dump_state = hash_di_dump_state, 681 .di_empty = hash_di_empty, 682 .di_frame_done = hash_di_frame_done, 683 .di_get_frame = hash_di_get_frame, 684 .di_insert_frame = hash_di_insert_frame, 685 .di_mem_used = hash_di_mem_used, 686 .di_own_on_ok = 0, 687 .di_readable_bytes 688 = hash_di_readable_bytes, 689 .di_switch_impl = hash_di_switch_impl, 690}; 691 692static const struct data_in_iface *di_if_hash_ptr = &di_if_hash; 693