lsquic_engine.c revision bf2c7037
/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc.  See LICENSE. */
/*
 * lsquic_engine.c - QUIC engine
 */

#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/queue.h>
#include <time.h>
#ifndef WIN32
#include <sys/time.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <netdb.h>
#endif



#include "lsquic.h"
#include "lsquic_types.h"
#include "lsquic_alarmset.h"
#include "lsquic_parse_common.h"
#include "lsquic_parse.h"
#include "lsquic_packet_in.h"
#include "lsquic_packet_out.h"
#include "lsquic_senhist.h"
#include "lsquic_rtt.h"
#include "lsquic_cubic.h"
#include "lsquic_pacer.h"
#include "lsquic_send_ctl.h"
#include "lsquic_set.h"
#include "lsquic_conn_flow.h"
#include "lsquic_sfcw.h"
#include "lsquic_stream.h"
#include "lsquic_conn.h"
#include "lsquic_full_conn.h"
#include "lsquic_util.h"
#include "lsquic_qtags.h"
#include "lsquic_str.h"
#include "lsquic_handshake.h"
#include "lsquic_mm.h"
#include "lsquic_conn_hash.h"
#include "lsquic_engine_public.h"
#include "lsquic_eng_hist.h"
#include "lsquic_ev_log.h"
#include "lsquic_version.h"
#include "lsquic_hash.h"
#include "lsquic_attq.h"
#include "lsquic_min_heap.h"
#include "lsquic_http1x_if.h"

#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
#include "lsquic_logger.h"


/* The batch of outgoing packets grows and shrinks dynamically */
#define MAX_OUT_BATCH_SIZE 1024
#define MIN_OUT_BATCH_SIZE 256
#define INITIAL_OUT_BATCH_SIZE 512

struct out_batch
{
    lsquic_conn_t           *conns  [MAX_OUT_BATCH_SIZE];
    lsquic_packet_out_t     *packets[MAX_OUT_BATCH_SIZE];
    struct lsquic_out_spec   outs   [MAX_OUT_BATCH_SIZE];
};

typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);

static void
process_connections (struct lsquic_engine *engine, conn_iter_f iter,
                     lsquic_time_t now);

static void
engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);

static lsquic_conn_t *
engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
                    enum lsquic_conn_flags flag);

static void
force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);

/* Nested calls to LSQUIC are not supported */
#define ENGINE_IN(e) do {                               \
    assert(!((e)->pub.enp_flags & ENPUB_PROC));         \
    (e)->pub.enp_flags |= ENPUB_PROC;                   \
} while (0)

#define ENGINE_OUT(e) do {                              \
    assert((e)->pub.enp_flags & ENPUB_PROC);            \
    (e)->pub.enp_flags &= ~ENPUB_PROC;                  \
} while (0)

/* A connection can be referenced from one of six places:
 *
 *   1. Connection hash: a connection starts its life in one of those.
 *
 *   2. Outgoing queue.
 *
 *   3. Tickable queue.
 *
 *   4. Advisory Tick Time queue.
 *
 *   5. Closing connections queue.  This is a transient queue -- it only
 *      exists for the duration of process_connections() function call.
 *
 *   6. Ticked connections queue.  Another transient queue, similar to (5).
 *
 * The idea is to destroy the connection when it is no longer referenced.
 * For example, a connection tick may return TICK_SEND|TICK_CLOSE.  In
 * that case, the connection is referenced from two places: (2) and (5).
 * After its packets are sent, it is only referenced in (5), and at the
 * end of the function call, when it is removed from (5), reference count
 * goes to zero and the connection is destroyed.  If not all packets can
 * be sent, at the end of the function call, the connection is referenced
 * by (2) and will only be removed once all outgoing packets have been
 * sent.
 */
#define CONN_REF_FLAGS  (LSCONN_HASHED          \
                        |LSCONN_HAS_OUTGOING    \
                        |LSCONN_TICKABLE        \
                        |LSCONN_TICKED          \
                        |LSCONN_CLOSING         \
                        |LSCONN_ATTQ)
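
/* Illustrative sketch (not part of the engine): each flag in CONN_REF_FLAGS
 * acts as one reference.  Setting a flag takes a reference; clearing the last
 * one triggers destruction.  This mirrors what engine_incref_conn() and
 * engine_decref_conn() below do:
 *
 *     conn->cn_flags |= LSCONN_CLOSING;            // take a reference
 *     ...
 *     conn->cn_flags &= ~LSCONN_CLOSING;           // drop it
 *     if (0 == (conn->cn_flags & CONN_REF_FLAGS))
 *         destroy_conn(engine, conn);              // no references remain
 */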



struct lsquic_engine
{
    struct lsquic_engine_public        pub;
    enum {
        ENG_SERVER      = LSENG_SERVER,
        ENG_HTTP        = LSENG_HTTP,
        ENG_COOLDOWN    = (1 <<  7),    /* Cooldown: no new connections */
        ENG_PAST_DEADLINE
                        = (1 <<  8),    /* Previous call to a processing
                                         * function went past time threshold.
                                         */
#ifndef NDEBUG
        ENG_DTOR        = (1 << 26),    /* Engine destructor */
#endif
    }                                  flags;
    const struct lsquic_stream_if     *stream_if;
    void                              *stream_if_ctx;
    lsquic_packets_out_f               packets_out;
    void                              *packets_out_ctx;
    void                              *bad_handshake_ctx;
    struct conn_hash                   conns_hash;
    struct min_heap                    conns_tickable;
    struct min_heap                    conns_out;
    struct eng_hist                    history;
    unsigned                           batch_size;
    struct attq                       *attq;
    /* Track the last time a packet was sent to give new connections
     * priority lower than that of existing connections.
     */
    lsquic_time_t                      last_sent;
    unsigned                           n_conns;
    lsquic_time_t                      deadline;
    struct out_batch                   out_batch;
};


void
lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
                             unsigned flags)
{
    memset(settings, 0, sizeof(*settings));
    settings->es_versions        = LSQUIC_DF_VERSIONS;
    if (flags & ENG_SERVER)
    {
        settings->es_cfcw        = LSQUIC_DF_CFCW_SERVER;
        settings->es_sfcw        = LSQUIC_DF_SFCW_SERVER;
        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER;
    }
    else
    {
        settings->es_cfcw        = LSQUIC_DF_CFCW_CLIENT;
        settings->es_sfcw        = LSQUIC_DF_SFCW_CLIENT;
        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT;
    }
    settings->es_max_streams_in  = LSQUIC_DF_MAX_STREAMS_IN;
    settings->es_idle_conn_to    = LSQUIC_DF_IDLE_CONN_TO;
    settings->es_handshake_to    = LSQUIC_DF_HANDSHAKE_TO;
    settings->es_silent_close    = LSQUIC_DF_SILENT_CLOSE;
    settings->es_max_header_list_size
                                 = LSQUIC_DF_MAX_HEADER_LIST_SIZE;
    settings->es_ua              = LSQUIC_DF_UA;

    settings->es_pdmd            = QTAG_X509;
    settings->es_aead            = QTAG_AESG;
    settings->es_kexs            = QTAG_C255;
    settings->es_support_push    = LSQUIC_DF_SUPPORT_PUSH;
    settings->es_support_tcid0   = LSQUIC_DF_SUPPORT_TCID0;
    settings->es_support_nstp    = LSQUIC_DF_SUPPORT_NSTP;
    settings->es_honor_prst      = LSQUIC_DF_HONOR_PRST;
    settings->es_progress_check  = LSQUIC_DF_PROGRESS_CHECK;
    settings->es_rw_once         = LSQUIC_DF_RW_ONCE;
    settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
    settings->es_pace_packets    = LSQUIC_DF_PACE_PACKETS;
}


/* Note: if returning an error, err_buf must be valid if non-NULL */
int
lsquic_engine_check_settings (const struct lsquic_engine_settings *settings,
                              unsigned flags,
                              char *err_buf, size_t err_buf_sz)
{
    if (settings->es_cfcw < LSQUIC_MIN_FCW ||
        settings->es_sfcw < LSQUIC_MIN_FCW)
    {
        if (err_buf)
            snprintf(err_buf, err_buf_sz, "%s",
                            "flow control window set too low");
        return -1;
    }
    if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS))
    {
        if (err_buf)
            snprintf(err_buf, err_buf_sz, "%s",
                            "No supported QUIC versions specified");
        return -1;
    }
    if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS)
    {
        if (err_buf)
            snprintf(err_buf, err_buf_sz, "%s",
                            "one or more unsupported QUIC version is specified");
        return -1;
    }
    return 0;
}


static void
free_packet (void *ctx, void *conn_ctx, void *packet_data, char is_ipv6)
{
    free(packet_data);
}


static void *
malloc_buf (void *ctx, void *conn_ctx, unsigned short size, char is_ipv6)
{
    return malloc(size);
}


static const struct lsquic_packout_mem_if stock_pmi =
{
    malloc_buf, free_packet, free_packet,
};


static int
hash_conns_by_addr (const struct lsquic_engine *engine)
{
    if (engine->pub.enp_settings.es_versions & LSQUIC_FORCED_TCID0_VERSIONS)
        return 1;
    if ((engine->pub.enp_settings.es_versions & LSQUIC_GQUIC_HEADER_VERSIONS)
        && engine->pub.enp_settings.es_support_tcid0)
        return 1;
    return 0;
}


lsquic_engine_t *
lsquic_engine_new (unsigned flags,
                   const struct lsquic_engine_api *api)
{
    lsquic_engine_t *engine;
    int tag_buf_len;
    char err_buf[100];

    if (!api->ea_packets_out)
    {
        LSQ_ERROR("packets_out callback is not specified");
        return NULL;
    }

    if (api->ea_settings &&
        0 != lsquic_engine_check_settings(api->ea_settings, flags,
                                          err_buf, sizeof(err_buf)))
    {
        LSQ_ERROR("cannot create engine: %s", err_buf);
        return NULL;
    }

    engine = calloc(1, sizeof(*engine));
    if (!engine)
        return NULL;
    if (0 != lsquic_mm_init(&engine->pub.enp_mm))
    {
        free(engine);
        return NULL;
    }
    if (api->ea_settings)
        engine->pub.enp_settings = *api->ea_settings;
    else
        lsquic_engine_init_settings(&engine->pub.enp_settings, flags);
    tag_buf_len = lsquic_gen_ver_tags(engine->pub.enp_ver_tags_buf,
                                      sizeof(engine->pub.enp_ver_tags_buf),
                                      engine->pub.enp_settings.es_versions);
    if (tag_buf_len <= 0)
    {
        LSQ_ERROR("cannot generate version tags buffer");
        free(engine);
        return NULL;
    }
    engine->pub.enp_ver_tags_len = tag_buf_len;
    engine->pub.enp_flags = ENPUB_CAN_SEND;

    engine->flags           = flags;
    engine->stream_if       = api->ea_stream_if;
    engine->stream_if_ctx   = api->ea_stream_if_ctx;
    engine->packets_out     = api->ea_packets_out;
    engine->packets_out_ctx = api->ea_packets_out_ctx;
    if (api->ea_hsi_if)
    {
        engine->pub.enp_hsi_if  = api->ea_hsi_if;
        engine->pub.enp_hsi_ctx = api->ea_hsi_ctx;
    }
    else
    {
        engine->pub.enp_hsi_if  = lsquic_http1x_if;
        engine->pub.enp_hsi_ctx = NULL;
    }
    if (api->ea_pmi)
    {
        engine->pub.enp_pmi     = api->ea_pmi;
        engine->pub.enp_pmi_ctx = api->ea_pmi_ctx;
    }
    else
    {
        engine->pub.enp_pmi     = &stock_pmi;
        engine->pub.enp_pmi_ctx = NULL;
    }
    engine->pub.enp_verify_cert = api->ea_verify_cert;
    engine->pub.enp_verify_ctx  = api->ea_verify_ctx;
    engine->pub.enp_engine = engine;
    conn_hash_init(&engine->conns_hash,
                   hash_conns_by_addr(engine) ? CHF_USE_ADDR : 0);
    engine->attq = attq_create();
    eng_hist_init(&engine->history);
    engine->batch_size = INITIAL_OUT_BATCH_SIZE;


    LSQ_INFO("instantiated engine");
    return engine;
}
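
/* Illustrative sketch of client-side engine creation (not part of the
 * library; the callback names are hypothetical).  Only ea_packets_out is
 * mandatory; ea_settings may be NULL, in which case defaults from
 * lsquic_engine_init_settings() are used:
 *
 *     struct lsquic_engine_settings settings;
 *     struct lsquic_engine_api api;
 *     char err_buf[100];
 *
 *     lsquic_engine_init_settings(&settings, 0);
 *     if (0 != lsquic_engine_check_settings(&settings, 0,
 *                                           err_buf, sizeof(err_buf)))
 *         ... handle error ...
 *     memset(&api, 0, sizeof(api));
 *     api.ea_settings        = &settings;
 *     api.ea_stream_if       = &my_stream_callbacks;   // hypothetical
 *     api.ea_stream_if_ctx   = my_ctx;                 // hypothetical
 *     api.ea_packets_out     = my_packets_out;         // hypothetical
 *     api.ea_packets_out_ctx = my_ctx;
 *     engine = lsquic_engine_new(LSENG_HTTP, &api);    // or 0 for non-HTTP
 */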


static void
grow_batch_size (struct lsquic_engine *engine)
{
    engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE;
}


static void
shrink_batch_size (struct lsquic_engine *engine)
{
    engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE;
}


/* Wrapper to make sure important things occur before the connection is
 * really destroyed.
 */
static void
destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn)
{
    --engine->n_conns;
    conn->cn_flags |= LSCONN_NEVER_TICKABLE;
    conn->cn_if->ci_destroy(conn);
}


static int
maybe_grow_conn_heaps (struct lsquic_engine *engine)
{
    struct min_heap_elem *els;
    unsigned count;

    if (engine->n_conns < lsquic_mh_nalloc(&engine->conns_tickable))
        return 0;   /* Nothing to do */

    if (lsquic_mh_nalloc(&engine->conns_tickable))
        count = lsquic_mh_nalloc(&engine->conns_tickable) * 2 * 2;
    else
        count = 8;

    els = malloc(sizeof(els[0]) * count);
    if (!els)
    {
        LSQ_ERROR("%s: malloc failed", __func__);
        return -1;
    }

    LSQ_DEBUG("grew heaps to %u elements", count / 2);
    memcpy(&els[0], engine->conns_tickable.mh_elems,
           sizeof(els[0]) * lsquic_mh_count(&engine->conns_tickable));
    memcpy(&els[count / 2], engine->conns_out.mh_elems,
           sizeof(els[0]) * lsquic_mh_count(&engine->conns_out));
    free(engine->conns_tickable.mh_elems);
    engine->conns_tickable.mh_elems = els;
    engine->conns_out.mh_elems      = &els[count / 2];
    engine->conns_tickable.mh_nalloc = count / 2;
    engine->conns_out.mh_nalloc      = count / 2;
    return 0;
}


static lsquic_conn_t *
new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
                      unsigned short max_packet_size)
{
    lsquic_conn_t *conn;
    unsigned flags;
    if (0 != maybe_grow_conn_heaps(engine))
        return NULL;
    flags = engine->flags & (ENG_SERVER|ENG_HTTP);
    conn = full_conn_client_new(&engine->pub, engine->stream_if,
                    engine->stream_if_ctx, flags, hostname, max_packet_size);
    if (!conn)
        return NULL;
    ++engine->n_conns;
    return conn;
}


static lsquic_conn_t *
find_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
           struct packin_parse_state *ppstate, const struct sockaddr *sa_local)
{
    lsquic_conn_t *conn;

    if (conn_hash_using_addr(&engine->conns_hash))
        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
    else if (packet_in->pi_flags & PI_CONN_ID)
        conn = conn_hash_find_by_cid(&engine->conns_hash,
                                     packet_in->pi_conn_id);
    else
    {
        LSQ_DEBUG("packet header does not have connection ID: discarding");
        return NULL;
    }

    if (!conn)
        return NULL;

    conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
    if ((packet_in->pi_flags & PI_CONN_ID)
        && conn->cn_cid != packet_in->pi_conn_id)
    {
        LSQ_DEBUG("connection IDs do not match");
        return NULL;
    }

    return conn;
}


#if !defined(NDEBUG) && __GNUC__
__attribute__((weak))
#endif
void
lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub,
                                    lsquic_conn_t *conn)
{
    if (0 == (enpub->enp_flags & ENPUB_PROC) &&
        0 == (conn->cn_flags & (LSCONN_TICKABLE|LSCONN_NEVER_TICKABLE)))
    {
        lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
        engine_incref_conn(conn, LSCONN_TICKABLE);
    }
}


void
lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
                                lsquic_conn_t *conn, lsquic_time_t tick_time)
{
    lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
    if (conn->cn_flags & LSCONN_TICKABLE)
    {
        /* Optimization: no need to add the connection to the Advisory Tick
         * Time Queue: it is about to be ticked, after which its next tick
         * time may be queried again.
         */;
    }
    else if (conn->cn_flags & LSCONN_ATTQ)
    {
        if (lsquic_conn_adv_time(conn) != tick_time)
        {
            attq_remove(engine->attq, conn);
            if (0 != attq_add(engine->attq, conn, tick_time))
                engine_decref_conn(engine, conn, LSCONN_ATTQ);
        }
    }
    else if (0 == attq_add(engine->attq, conn, tick_time))
        engine_incref_conn(conn, LSCONN_ATTQ);
}


/* Return 0 if packet is being processed by a connection, otherwise return 1 */
static int
process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
        struct packin_parse_state *ppstate, const struct sockaddr *sa_local,
        const struct sockaddr *sa_peer, void *peer_ctx)
{
    lsquic_conn_t *conn;

    if (lsquic_packet_in_is_gquic_prst(packet_in)
        && !engine->pub.enp_settings.es_honor_prst)
    {
        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
        LSQ_DEBUG("public reset packet: discarding");
        return 1;
    }

    conn = find_conn(engine, packet_in, ppstate, sa_local);

    if (!conn)
    {
        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
        return 1;
    }

    if (0 == (conn->cn_flags & LSCONN_TICKABLE))
    {
        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
        engine_incref_conn(conn, LSCONN_TICKABLE);
    }
    lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
    lsquic_packet_in_upref(packet_in);
    conn->cn_peer_ctx = peer_ctx;
    conn->cn_if->ci_packet_in(conn, packet_in);
    lsquic_packet_in_put(&engine->pub.enp_mm, packet_in);
    return 0;
}


void
lsquic_engine_destroy (lsquic_engine_t *engine)
{
    lsquic_conn_t *conn;

    LSQ_DEBUG("destroying engine");
#ifndef NDEBUG
    engine->flags |= ENG_DTOR;
#endif

    while ((conn = lsquic_mh_pop(&engine->conns_out)))
    {
        assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
    }

    while ((conn = lsquic_mh_pop(&engine->conns_tickable)))
    {
        assert(conn->cn_flags & LSCONN_TICKABLE);
        (void) engine_decref_conn(engine, conn, LSCONN_TICKABLE);
    }

    for (conn = conn_hash_first(&engine->conns_hash); conn;
         conn = conn_hash_next(&engine->conns_hash))
        force_close_conn(engine, conn);
    conn_hash_cleanup(&engine->conns_hash);

    assert(0 == engine->n_conns);
    attq_destroy(engine->attq);

    assert(0 == lsquic_mh_count(&engine->conns_out));
    assert(0 == lsquic_mh_count(&engine->conns_tickable));
    lsquic_mm_cleanup(&engine->pub.enp_mm);
    free(engine->conns_tickable.mh_elems);
    free(engine);
}


lsquic_conn_t *
lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *local_sa,
                       const struct sockaddr *peer_sa,
                       void *peer_ctx, lsquic_conn_ctx_t *conn_ctx,
                       const char *hostname, unsigned short max_packet_size)
{
    lsquic_conn_t *conn;
    ENGINE_IN(engine);

    if (engine->flags & ENG_SERVER)
    {
        LSQ_ERROR("`%s' must only be called in client mode", __func__);
        goto err;
    }

    if (conn_hash_using_addr(&engine->conns_hash)
        && conn_hash_find_by_addr(&engine->conns_hash, local_sa))
    {
        LSQ_ERROR("cannot have more than one connection on the same port");
        goto err;
    }

    if (0 == max_packet_size)
    {
        switch (peer_sa->sa_family)
        {
        case AF_INET:
            max_packet_size = QUIC_MAX_IPv4_PACKET_SZ;
            break;
        default:
            max_packet_size = QUIC_MAX_IPv6_PACKET_SZ;
            break;
        }
    }

    conn = new_full_conn_client(engine, hostname, max_packet_size);
    if (!conn)
        goto err;
    lsquic_conn_record_sockaddr(conn, local_sa, peer_sa);
    if (0 != conn_hash_add(&engine->conns_hash, conn))
    {
        LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
                 conn->cn_cid);
        destroy_conn(engine, conn);
        goto err;
    }
    assert(!(conn->cn_flags &
             (CONN_REF_FLAGS
              & ~LSCONN_TICKABLE /* This flag may be set as effect of user
                                    callbacks */
             )));
    conn->cn_flags |= LSCONN_HASHED;
    lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
    engine_incref_conn(conn, LSCONN_TICKABLE);
    conn->cn_peer_ctx = peer_ctx;
    lsquic_conn_set_ctx(conn, conn_ctx);
    full_conn_client_call_on_new(conn);
  end:
    ENGINE_OUT(engine);
    return conn;
  err:
    conn = NULL;
    goto end;
}


static void
remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
{
    conn_hash_remove(&engine->conns_hash, conn);
    (void) engine_decref_conn(engine, conn, LSCONN_HASHED);
}


static void
refflags2str (enum lsquic_conn_flags flags, char s[6])
{
    *s = 'C'; s += !!(flags & LSCONN_CLOSING);
    *s = 'H'; s += !!(flags & LSCONN_HASHED);
    *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
    *s = 'T'; s += !!(flags & LSCONN_TICKABLE);
    *s = 'A'; s += !!(flags & LSCONN_ATTQ);
    *s = 'K'; s += !!(flags & LSCONN_TICKED);
    *s = '\0';
}


static void
engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
{
    char str[2][7];
    assert(flag & CONN_REF_FLAGS);
    assert(!(conn->cn_flags & flag));
    conn->cn_flags |= flag;
    LSQ_DEBUG("incref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
              (refflags2str(conn->cn_flags & ~flag, str[0]), str[0]),
              (refflags2str(conn->cn_flags, str[1]), str[1]));
}


static lsquic_conn_t *
engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
                    enum lsquic_conn_flags flags)
{
    char str[2][7];
    assert(flags & CONN_REF_FLAGS);
    assert(conn->cn_flags & flags);
#ifndef NDEBUG
    if (flags & LSCONN_CLOSING)
        assert(0 == (conn->cn_flags & LSCONN_HASHED));
#endif
    conn->cn_flags &= ~flags;
    LSQ_DEBUG("decref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
              (refflags2str(conn->cn_flags | flags, str[0]), str[0]),
              (refflags2str(conn->cn_flags, str[1]), str[1]));
    if (0 == (conn->cn_flags & CONN_REF_FLAGS))
    {
        eng_hist_inc(&engine->history, 0, sl_del_full_conns);
        destroy_conn(engine, conn);
        return NULL;
    }
    else
        return conn;
}

/* This is not a general-purpose function.  Only call from engine dtor. */
static void
force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
{
    assert(engine->flags & ENG_DTOR);
    const enum lsquic_conn_flags flags = conn->cn_flags;
    assert(conn->cn_flags & CONN_REF_FLAGS);
    assert(!(flags & LSCONN_HAS_OUTGOING)); /* Should be removed already */
    assert(!(flags & LSCONN_TICKABLE));     /* Should be removed already */
    assert(!(flags & LSCONN_CLOSING));      /* It is in transient queue? */
    if (flags & LSCONN_ATTQ)
    {
        attq_remove(engine->attq, conn);
        (void) engine_decref_conn(engine, conn, LSCONN_ATTQ);
    }
    if (flags & LSCONN_HASHED)
        remove_conn_from_hash(engine, conn);
}


/* Iterator for tickable connections (those on the Tickable Queue).  Before
 * a connection is returned, it is removed from the Advisory Tick Time queue
 * if necessary.
 */
static lsquic_conn_t *
conn_iter_next_tickable (struct lsquic_engine *engine)
{
    lsquic_conn_t *conn;

    conn = lsquic_mh_pop(&engine->conns_tickable);

    if (conn)
        conn = engine_decref_conn(engine, conn, LSCONN_TICKABLE);
    if (conn && (conn->cn_flags & LSCONN_ATTQ))
    {
        attq_remove(engine->attq, conn);
        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
    }

    return conn;
}


void
lsquic_engine_process_conns (lsquic_engine_t *engine)
{
    lsquic_conn_t *conn;
    lsquic_time_t now;

    ENGINE_IN(engine);

    now = lsquic_time_now();
    while ((conn = attq_pop(engine->attq, now)))
    {
        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
        if (conn && !(conn->cn_flags & LSCONN_TICKABLE))
        {
            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
            engine_incref_conn(conn, LSCONN_TICKABLE);
        }
    }

    process_connections(engine, conn_iter_next_tickable, now);
    ENGINE_OUT(engine);
}


static ssize_t
really_encrypt_packet (const lsquic_conn_t *conn,
                       struct lsquic_packet_out *packet_out,
                       unsigned char *buf, size_t bufsz)
{
    int header_sz, is_hello_packet;
    enum enc_level enc_level;
    size_t packet_sz;
    unsigned char header_buf[QUIC_MAX_PUBHDR_SZ];

    header_sz = conn->cn_pf->pf_gen_reg_pkt_header(conn, packet_out,
                                            header_buf, sizeof(header_buf));
    if (header_sz < 0)
        return -1;

    is_hello_packet = !!(packet_out->po_flags & PO_HELLO);
    enc_level = conn->cn_esf->esf_encrypt(conn->cn_enc_session,
                conn->cn_version, 0,
                packet_out->po_packno, header_buf, header_sz,
                packet_out->po_data, packet_out->po_data_sz,
                buf, bufsz, &packet_sz, is_hello_packet);
    if ((int) enc_level >= 0)
    {
        lsquic_packet_out_set_enc_level(packet_out, enc_level);
        LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %zu bytes, "
                  "ciphertext is %zd bytes",
                  packet_out->po_packno,
                  conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) +
                  packet_out->po_data_sz,
                  packet_sz);
        return packet_sz;
    }
    else
        return -1;
}


static int
conn_peer_ipv6 (const struct lsquic_conn *conn)
{
    return AF_INET6 == ((struct sockaddr *) conn->cn_peer_addr)->sa_family;
}


static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, }
encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
                lsquic_packet_out_t *packet_out)
{
    ssize_t enc_sz;
    size_t bufsz;
    unsigned sent_sz;
    unsigned char *buf;
    int ipv6;

    bufsz = conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) +
                packet_out->po_data_sz + QUIC_PACKET_HASH_SZ;
    if (bufsz > USHRT_MAX)
        return ENCPA_BADCRYPT;  /* To cause connection to close */
    ipv6 = conn_peer_ipv6(conn);
    buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx,
                                            conn->cn_peer_ctx, bufsz, ipv6);
    if (!buf)
    {
        LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd",
                  bufsz);
        return ENCPA_NOMEM;
    }

    {
        enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz);
        sent_sz = enc_sz;
    }

    if (enc_sz < 0)
    {
        engine->pub.enp_pmi->pmi_return(engine->pub.enp_pmi_ctx,
                                        conn->cn_peer_ctx, buf, ipv6);
        return ENCPA_BADCRYPT;
    }

    packet_out->po_enc_data    = buf;
    packet_out->po_enc_data_sz = enc_sz;
    packet_out->po_sent_sz     = sent_sz;
    packet_out->po_flags &= ~PO_IPv6;
    packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ|(ipv6 << POIPv6_SHIFT);

    return ENCPA_OK;
}


static void
release_or_return_enc_data (struct lsquic_engine *engine,
                void (*pmi_rel_or_ret) (void *, void *, void *, char),
                struct lsquic_conn *conn, struct lsquic_packet_out *packet_out)
{
    pmi_rel_or_ret(engine->pub.enp_pmi_ctx, conn->cn_peer_ctx,
                   packet_out->po_enc_data, lsquic_packet_out_ipv6(packet_out));
    packet_out->po_flags &= ~PO_ENCRYPTED;
    packet_out->po_enc_data = NULL;
}


static void
release_enc_data (struct lsquic_engine *engine, struct lsquic_conn *conn,
                  struct lsquic_packet_out *packet_out)
{
    release_or_return_enc_data(engine, engine->pub.enp_pmi->pmi_release,
                               conn, packet_out);
}


static void
return_enc_data (struct lsquic_engine *engine, struct lsquic_conn *conn,
                 struct lsquic_packet_out *packet_out)
{
    release_or_return_enc_data(engine, engine->pub.enp_pmi->pmi_return,
                               conn, packet_out);
}


STAILQ_HEAD(conns_stailq, lsquic_conn);
TAILQ_HEAD(conns_tailq, lsquic_conn);


struct conns_out_iter
{
    struct min_heap            *coi_heap;
    TAILQ_HEAD(, lsquic_conn)   coi_active_list,
                                coi_inactive_list;
    lsquic_conn_t              *coi_next;
#ifndef NDEBUG
    lsquic_time_t               coi_last_sent;
#endif
};


static void
coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine)
{
    iter->coi_heap = &engine->conns_out;
    iter->coi_next = NULL;
    TAILQ_INIT(&iter->coi_active_list);
    TAILQ_INIT(&iter->coi_inactive_list);
#ifndef NDEBUG
    iter->coi_last_sent = 0;
#endif
}


static lsquic_conn_t *
coi_next (struct conns_out_iter *iter)
{
    lsquic_conn_t *conn;

    if (lsquic_mh_count(iter->coi_heap) > 0)
    {
        conn = lsquic_mh_pop(iter->coi_heap);
        TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
        conn->cn_flags |= LSCONN_COI_ACTIVE;
#ifndef NDEBUG
        if (iter->coi_last_sent)
            assert(iter->coi_last_sent <= conn->cn_last_sent);
        iter->coi_last_sent = conn->cn_last_sent;
#endif
        return conn;
    }
    else if (!TAILQ_EMPTY(&iter->coi_active_list))
    {
        conn = iter->coi_next;
        if (!conn)
            conn = TAILQ_FIRST(&iter->coi_active_list);
        if (conn)
            iter->coi_next = TAILQ_NEXT(conn, cn_next_out);
        return conn;
    }
    else
        return NULL;
}


static void
coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
{
    if (!(conn->cn_flags & LSCONN_EVANESCENT))
    {
        assert(!TAILQ_EMPTY(&iter->coi_active_list));
        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
        TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out);
        conn->cn_flags |= LSCONN_COI_INACTIVE;
    }
}


static void
coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
{
    assert(conn->cn_flags & LSCONN_COI_INACTIVE);
    TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
    conn->cn_flags &= ~LSCONN_COI_INACTIVE;
    TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
    conn->cn_flags |= LSCONN_COI_ACTIVE;
}


static void
coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
{
    lsquic_conn_t *conn;
    while ((conn = TAILQ_FIRST(&iter->coi_active_list)))
    {
        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
        lsquic_mh_insert(iter->coi_heap, conn, conn->cn_last_sent);
    }
    while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
    {
        TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
        conn->cn_flags &= ~LSCONN_COI_INACTIVE;
        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
    }
}
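
/* Illustrative sketch (not part of the library) of a packets_out callback of
 * the kind send_batch() below invokes via engine->packets_out.  It assumes
 * peer_ctx carries the UDP socket descriptor; that is an application choice,
 * not something the engine requires.  The callback returns the number of
 * packets actually sent; returning fewer than n_specs makes the engine stop
 * sending until lsquic_engine_send_unsent_packets() is called:
 *
 *     static int
 *     my_packets_out (void *ctx, const struct lsquic_out_spec *specs,
 *                     unsigned n_specs)
 *     {
 *         unsigned n;
 *         for (n = 0; n < n_specs; ++n)
 *         {
 *             int fd = (int) (intptr_t) specs[n].peer_ctx;   // assumption
 *             socklen_t len = AF_INET == specs[n].dest_sa->sa_family
 *                 ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
 *             if (sendto(fd, specs[n].buf, specs[n].sz, 0,
 *                        specs[n].dest_sa, len) < 0)
 *                 break;
 *         }
 *         return n > 0 ? (int) n : -1;
 *     }
 */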


static unsigned
send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter,
            struct out_batch *batch, unsigned n_to_send)
{
    int n_sent, i;
    lsquic_time_t now;

    /* Set sent time before the write to avoid underestimating RTT */
    now = lsquic_time_now();
    for (i = 0; i < (int) n_to_send; ++i)
        batch->packets[i]->po_sent = now;
    n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs,
                                 n_to_send);
    if (n_sent < (int) n_to_send)
    {
        engine->pub.enp_flags &= ~ENPUB_CAN_SEND;
        LSQ_DEBUG("cannot send packets");
        EV_LOG_GENERIC_EVENT("cannot send packets");
    }
    if (n_sent >= 0)
        LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send);
    else
    {
        LSQ_DEBUG("packets out returned an error: %s", strerror(errno));
        n_sent = 0;
    }
    if (n_sent > 0)
        engine->last_sent = now + n_sent;
    for (i = 0; i < n_sent; ++i)
    {
        eng_hist_inc(&engine->history, now, sl_packets_out);
        EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
        batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i],
                                               batch->packets[i]);
        /* `i' is added to maintain relative order */
        batch->conns[i]->cn_last_sent = now + i;
        /* Release packet out buffer as soon as the packet is sent
         * successfully.  If not successfully sent, we hold on to
         * this buffer until the packet sending is attempted again
         * or until it times out and is regenerated.
         */
        if (batch->packets[i]->po_flags & PO_ENCRYPTED)
            release_enc_data(engine, batch->conns[i], batch->packets[i]);
    }
    if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT))
        for ( ; i < (int) n_to_send; ++i)
            EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
    /* Return packets to the connection in reverse order so that the packet
     * ordering is maintained.
     */
    for (i = (int) n_to_send - 1; i >= n_sent; --i)
    {
        batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i],
                                                   batch->packets[i]);
        if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT)))
            coi_reactivate(conns_iter, batch->conns[i]);
    }
    return n_sent;
}


/* Return 1 if went past deadline, 0 otherwise */
static int
check_deadline (lsquic_engine_t *engine)
{
    if (engine->pub.enp_settings.es_proc_time_thresh &&
        lsquic_time_now() > engine->deadline)
    {
        LSQ_INFO("went past threshold of %u usec, stop sending",
                 engine->pub.enp_settings.es_proc_time_thresh);
        engine->flags |= ENG_PAST_DEADLINE;
        return 1;
    }
    else
        return 0;
}


static void
send_packets_out (struct lsquic_engine *engine,
                  struct conns_tailq *ticked_conns,
                  struct conns_stailq *closed_conns)
{
    unsigned n, w, n_sent, n_batches_sent;
    lsquic_packet_out_t *packet_out;
    lsquic_conn_t *conn;
    struct out_batch *const batch = &engine->out_batch;
    struct conns_out_iter conns_iter;
    int shrink, deadline_exceeded;

    coi_init(&conns_iter, engine);
    n_batches_sent = 0;
    n_sent = 0, n = 0;
    shrink = 0;
    deadline_exceeded = 0;

    while ((conn = coi_next(&conns_iter)))
    {
        packet_out = conn->cn_if->ci_next_packet_to_send(conn);
        if (!packet_out) {
            LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64,
                      conn->cn_cid);
            coi_deactivate(&conns_iter, conn);
            continue;
        }
        if ((packet_out->po_flags & PO_ENCRYPTED)
            && lsquic_packet_out_ipv6(packet_out) != conn_peer_ipv6(conn))
        {
            /* Peer address changed since the packet was encrypted.  Need to
             * reallocate.
             */
            return_enc_data(engine, conn, packet_out);
        }
        if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT)))
        {
            switch (encrypt_packet(engine, conn, packet_out))
            {
            case ENCPA_NOMEM:
                /* Send what we have and wait for a more opportune moment */
                conn->cn_if->ci_packet_not_sent(conn, packet_out);
                goto end_for;
            case ENCPA_BADCRYPT:
                /* This is pretty bad: close connection immediately */
                conn->cn_if->ci_packet_not_sent(conn, packet_out);
                LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid);
                if (!(conn->cn_flags & LSCONN_EVANESCENT))
                {
                    if (!(conn->cn_flags & LSCONN_CLOSING))
                    {
                        STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn);
                        engine_incref_conn(conn, LSCONN_CLOSING);
                        if (conn->cn_flags & LSCONN_HASHED)
                            remove_conn_from_hash(engine, conn);
                    }
                    coi_deactivate(&conns_iter, conn);
                    if (conn->cn_flags & LSCONN_TICKED)
                    {
                        TAILQ_REMOVE(ticked_conns, conn, cn_next_ticked);
                        engine_decref_conn(engine, conn, LSCONN_TICKED);
                    }
                }
                continue;
            case ENCPA_OK:
                break;
            }
        }
        LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64,
                  packet_out->po_packno, conn->cn_cid);
        assert(conn->cn_flags & LSCONN_HAS_PEER_SA);
        if (packet_out->po_flags & PO_ENCRYPTED)
        {
            batch->outs[n].buf = packet_out->po_enc_data;
            batch->outs[n].sz  = packet_out->po_enc_data_sz;
        }
        else
        {
            batch->outs[n].buf = packet_out->po_data;
            batch->outs[n].sz  = packet_out->po_data_sz;
        }
        batch->outs   [n].peer_ctx = conn->cn_peer_ctx;
        batch->outs   [n].local_sa = (struct sockaddr *) conn->cn_local_addr;
        batch->outs   [n].dest_sa  = (struct sockaddr *) conn->cn_peer_addr;
        batch->conns  [n] = conn;
        batch->packets[n] = packet_out;
        ++n;
        if (n == engine->batch_size)
        {
            n = 0;
            w = send_batch(engine, &conns_iter, batch, engine->batch_size);
            ++n_batches_sent;
            n_sent += w;
            if (w < engine->batch_size)
            {
                shrink = 1;
                break;
            }
            deadline_exceeded = check_deadline(engine);
            if (deadline_exceeded)
                break;
            grow_batch_size(engine);
        }
    }
  end_for:

    if (n > 0) {
        w = send_batch(engine, &conns_iter, batch, n);
        n_sent += w;
        shrink = w < n;
        ++n_batches_sent;
        deadline_exceeded = check_deadline(engine);
    }

    if (shrink)
        shrink_batch_size(engine);
    else if (n_batches_sent > 1 && !deadline_exceeded)
        grow_batch_size(engine);

    coi_reheap(&conns_iter, engine);

    LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s");
}


int
lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
{
    return lsquic_mh_count(&engine->conns_out) > 0;
}


static void
reset_deadline (lsquic_engine_t *engine, lsquic_time_t now)
{
    engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh;
    engine->flags &= ~ENG_PAST_DEADLINE;
}


/* TODO: this is a user-facing function, account for load */
void
lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
{
    lsquic_conn_t *conn;
    struct conns_stailq closed_conns;
    struct conns_tailq ticked_conns = TAILQ_HEAD_INITIALIZER(ticked_conns);

    STAILQ_INIT(&closed_conns);
    reset_deadline(engine, lsquic_time_now());
    if (!(engine->pub.enp_flags & ENPUB_CAN_SEND))
    {
        LSQ_DEBUG("can send again");
        EV_LOG_GENERIC_EVENT("can send again");
        engine->pub.enp_flags |= ENPUB_CAN_SEND;
    }

    send_packets_out(engine, &ticked_conns, &closed_conns);

    while ((conn = STAILQ_FIRST(&closed_conns))) {
        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
    }

}


static void
process_connections (lsquic_engine_t *engine, conn_iter_f next_conn,
                     lsquic_time_t now)
{
    lsquic_conn_t *conn;
    enum tick_st tick_st;
    unsigned i;
    lsquic_time_t next_tick_time;
    struct conns_stailq closed_conns;
    struct conns_tailq ticked_conns;

    eng_hist_tick(&engine->history, now);

    STAILQ_INIT(&closed_conns);
    TAILQ_INIT(&ticked_conns);
    reset_deadline(engine, now);

    i = 0;
    while ((conn = next_conn(engine)))
    {
        tick_st = conn->cn_if->ci_tick(conn, now);
        conn->cn_last_ticked = now + i /* Maintain relative order */ ++;
        if (tick_st & TICK_SEND)
        {
            if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
            {
                lsquic_mh_insert(&engine->conns_out, conn, conn->cn_last_sent);
                engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
            }
        }
        if (tick_st & TICK_CLOSE)
        {
            STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn);
            engine_incref_conn(conn, LSCONN_CLOSING);
            if (conn->cn_flags & LSCONN_HASHED)
                remove_conn_from_hash(engine, conn);
        }
        else
        {
            TAILQ_INSERT_TAIL(&ticked_conns, conn, cn_next_ticked);
            engine_incref_conn(conn, LSCONN_TICKED);
        }
    }

    if ((engine->pub.enp_flags & ENPUB_CAN_SEND)
        && lsquic_engine_has_unsent_packets(engine))
        send_packets_out(engine, &ticked_conns, &closed_conns);

    while ((conn = STAILQ_FIRST(&closed_conns))) {
        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
    }

    /* TODO Heapification can be optimized by switching to the Floyd method:
     * https://en.wikipedia.org/wiki/Binary_heap#Building_a_heap
     */
    while ((conn = TAILQ_FIRST(&ticked_conns)))
    {
        TAILQ_REMOVE(&ticked_conns, conn, cn_next_ticked);
        engine_decref_conn(engine, conn, LSCONN_TICKED);
        if (!(conn->cn_flags & LSCONN_TICKABLE)
            && conn->cn_if->ci_is_tickable(conn))
        {
            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
            engine_incref_conn(conn, LSCONN_TICKABLE);
        }
        else if (!(conn->cn_flags & LSCONN_ATTQ))
        {
            next_tick_time = conn->cn_if->ci_next_tick_time(conn);
            if (next_tick_time)
            {
                if (0 == attq_add(engine->attq, conn, next_tick_time))
                    engine_incref_conn(conn, LSCONN_ATTQ);
            }
            else
                assert(0);
        }
    }

}


/* Return 0 if packet is being processed by a real connection, 1 if the
 * packet was processed, but not by a connection, and -1 on error.
 */
int
lsquic_engine_packet_in (lsquic_engine_t *engine,
        const unsigned char *packet_in_data, size_t packet_in_size,
        const struct sockaddr *sa_local, const struct sockaddr *sa_peer,
        void *peer_ctx)
{
    struct packin_parse_state ppstate;
    lsquic_packet_in_t *packet_in;
    int (*parse_packet_in_begin) (struct lsquic_packet_in *, size_t length,
                                  int is_server, struct packin_parse_state *);

    if (packet_in_size > QUIC_MAX_PACKET_SZ)
    {
        LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d",
                  packet_in_size, QUIC_MAX_PACKET_SZ);
        errno = E2BIG;
        return -1;
    }

    if (conn_hash_using_addr(&engine->conns_hash))
    {
        const struct lsquic_conn *conn;
        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
        if (!conn)
            return -1;
        if ((1 << conn->cn_version) & LSQUIC_GQUIC_HEADER_VERSIONS)
            parse_packet_in_begin = lsquic_gquic_parse_packet_in_begin;
        else
            parse_packet_in_begin = lsquic_iquic_parse_packet_in_begin;
    }
    else
        parse_packet_in_begin = lsquic_parse_packet_in_begin;

    packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm);
    if (!packet_in)
        return -1;

    /* Library does not modify packet_in_data, it is not referenced after
     * this function returns and subsequent release of pi_data is guarded
     * by PI_OWN_DATA flag.
     */
    packet_in->pi_data = (unsigned char *) packet_in_data;
    if (0 != parse_packet_in_begin(packet_in, packet_in_size,
                                   engine->flags & ENG_SERVER, &ppstate))
    {
        LSQ_DEBUG("Cannot parse incoming packet's header");
        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
        errno = EINVAL;
        return -1;
    }

    packet_in->pi_received = lsquic_time_now();
    eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in);
    return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer,
                             peer_ctx);
}
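
/* Illustrative sketch (not part of the library): feeding a received UDP
 * datagram to the engine.  Variable names are hypothetical; local_sa and
 * peer_sa are the addresses of the receiving socket and of the sender as
 * reported by recvfrom():
 *
 *     nread = recvfrom(fd, buf, sizeof(buf), 0,
 *                      (struct sockaddr *) &peer_sa, &peer_sa_len);
 *     if (nread > 0)
 *         (void) lsquic_engine_packet_in(engine, buf, (size_t) nread,
 *                     (struct sockaddr *) &local_sa,
 *                     (struct sockaddr *) &peer_sa,
 *                     (void *) (intptr_t) fd);   // becomes conn->cn_peer_ctx
 */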


#if __GNUC__ && !defined(NDEBUG)
__attribute__((weak))
#endif
unsigned
lsquic_engine_quic_versions (const lsquic_engine_t *engine)
{
    return engine->pub.enp_settings.es_versions;
}


int
lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
{
    const lsquic_time_t *next_time;
    lsquic_time_t now;

    if (((engine->flags & ENG_PAST_DEADLINE)
         && lsquic_mh_count(&engine->conns_out))
        || lsquic_mh_count(&engine->conns_tickable))
    {
        *diff = 0;
        return 1;
    }

    next_time = attq_next_time(engine->attq);
    if (!next_time)
        return 0;

    now = lsquic_time_now();
    *diff = (int) ((int64_t) *next_time - (int64_t) now);
    return 1;
}


unsigned
lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now)
{
    lsquic_time_t now;
    now = lsquic_time_now();
    if (from_now < 0)
        now -= from_now;
    else
        now += from_now;
    return attq_count_before(engine->attq, now);
}
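
/* Illustrative sketch (not part of the library) of driving the engine from an
 * event loop using the functions defined in this file.  The loop structure
 * and timer handling are assumptions about the application, not requirements:
 *
 *     int diff;
 *     for (;;)
 *     {
 *         // ...wait for socket readability, writability, or timer expiry...
 *         // ...feed received datagrams via lsquic_engine_packet_in()...
 *         if (lsquic_engine_earliest_adv_tick(engine, &diff) && diff <= 0)
 *             lsquic_engine_process_conns(engine);
 *         // e.g., once the socket becomes writable again:
 *         if (lsquic_engine_has_unsent_packets(engine))
 *             lsquic_engine_send_unsent_packets(engine);
 *     }
 */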