/* lsquic_engine.c revision 8252b0b9 */
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_engine.c - QUIC engine 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <limits.h> 10#include <stdint.h> 11#include <stdio.h> 12#include <stdlib.h> 13#include <string.h> 14#include <sys/queue.h> 15#include <time.h> 16#ifndef WIN32 17#include <sys/time.h> 18#include <netinet/in.h> 19#include <sys/types.h> 20#include <sys/stat.h> 21#include <fcntl.h> 22#include <unistd.h> 23#include <netdb.h> 24#endif 25 26 27 28#include "lsquic.h" 29#include "lsquic_types.h" 30#include "lsquic_alarmset.h" 31#include "lsquic_parse_common.h" 32#include "lsquic_parse.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_packet_out.h" 35#include "lsquic_senhist.h" 36#include "lsquic_rtt.h" 37#include "lsquic_cubic.h" 38#include "lsquic_pacer.h" 39#include "lsquic_send_ctl.h" 40#include "lsquic_set.h" 41#include "lsquic_conn_flow.h" 42#include "lsquic_sfcw.h" 43#include "lsquic_stream.h" 44#include "lsquic_conn.h" 45#include "lsquic_full_conn.h" 46#include "lsquic_util.h" 47#include "lsquic_qtags.h" 48#include "lsquic_str.h" 49#include "lsquic_handshake.h" 50#include "lsquic_mm.h" 51#include "lsquic_conn_hash.h" 52#include "lsquic_engine_public.h" 53#include "lsquic_eng_hist.h" 54#include "lsquic_ev_log.h" 55#include "lsquic_version.h" 56#include "lsquic_hash.h" 57#include "lsquic_attq.h" 58#include "lsquic_min_heap.h" 59#include "lsquic_http1x_if.h" 60 61#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE 62#include "lsquic_logger.h" 63 64#define MIN(a, b) ((a) < (b) ? 
(a) : (b)) 65 66 67/* The batch of outgoing packets grows and shrinks dynamically */ 68#define MAX_OUT_BATCH_SIZE 1024 69#define MIN_OUT_BATCH_SIZE 256 70#define INITIAL_OUT_BATCH_SIZE 512 71 72struct out_batch 73{ 74 lsquic_conn_t *conns [MAX_OUT_BATCH_SIZE]; 75 lsquic_packet_out_t *packets[MAX_OUT_BATCH_SIZE]; 76 struct lsquic_out_spec outs [MAX_OUT_BATCH_SIZE]; 77}; 78 79typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *); 80 81static void 82process_connections (struct lsquic_engine *engine, conn_iter_f iter, 83 lsquic_time_t now); 84 85static void 86engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag); 87 88static lsquic_conn_t * 89engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 90 enum lsquic_conn_flags flag); 91 92static void 93force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn); 94 95/* Nested calls to LSQUIC are not supported */ 96#define ENGINE_IN(e) do { \ 97 assert(!((e)->pub.enp_flags & ENPUB_PROC)); \ 98 (e)->pub.enp_flags |= ENPUB_PROC; \ 99} while (0) 100 101#define ENGINE_OUT(e) do { \ 102 assert((e)->pub.enp_flags & ENPUB_PROC); \ 103 (e)->pub.enp_flags &= ~ENPUB_PROC; \ 104} while (0) 105 106/* A connection can be referenced from one of six places: 107 * 108 * 1. Connection hash: a connection starts its life in one of those. 109 * 110 * 2. Outgoing queue. 111 * 112 * 3. Tickable queue 113 * 114 * 4. Advisory Tick Time queue. 115 * 116 * 5. Closing connections queue. This is a transient queue -- it only 117 * exists for the duration of process_connections() function call. 118 * 119 * 6. Ticked connections queue. Another transient queue, similar to (5). 120 * 121 * The idea is to destroy the connection when it is no longer referenced. 122 * For example, a connection tick may return TICK_SEND|TICK_CLOSE. In 123 * that case, the connection is referenced from two places: (2) and (5). 
124 * After its packets are sent, it is only referenced in (5), and at the 125 * end of the function call, when it is removed from (5), reference count 126 * goes to zero and the connection is destroyed. If not all packets can 127 * be sent, at the end of the function call, the connection is referenced 128 * by (2) and will only be removed once all outgoing packets have been 129 * sent. 130 */ 131#define CONN_REF_FLAGS (LSCONN_HASHED \ 132 |LSCONN_HAS_OUTGOING \ 133 |LSCONN_TICKABLE \ 134 |LSCONN_TICKED \ 135 |LSCONN_CLOSING \ 136 |LSCONN_ATTQ) 137 138 139 140 141struct lsquic_engine 142{ 143 struct lsquic_engine_public pub; 144 enum { 145 ENG_SERVER = LSENG_SERVER, 146 ENG_HTTP = LSENG_HTTP, 147 ENG_COOLDOWN = (1 << 7), /* Cooldown: no new connections */ 148 ENG_PAST_DEADLINE 149 = (1 << 8), /* Previous call to a processing 150 * function went past time threshold. 151 */ 152#ifndef NDEBUG 153 ENG_DTOR = (1 << 26), /* Engine destructor */ 154#endif 155 } flags; 156 const struct lsquic_stream_if *stream_if; 157 void *stream_if_ctx; 158 lsquic_packets_out_f packets_out; 159 void *packets_out_ctx; 160 void *bad_handshake_ctx; 161 struct conn_hash conns_hash; 162 struct min_heap conns_tickable; 163 struct min_heap conns_out; 164 struct eng_hist history; 165 unsigned batch_size; 166 struct attq *attq; 167 /* Track time last time a packet was sent to give new connections 168 * priority lower than that of existing connections. 
169 */ 170 lsquic_time_t last_sent; 171 unsigned n_conns; 172 lsquic_time_t deadline; 173 lsquic_time_t resume_sending_at; 174 struct out_batch out_batch; 175}; 176 177 178void 179lsquic_engine_init_settings (struct lsquic_engine_settings *settings, 180 unsigned flags) 181{ 182 memset(settings, 0, sizeof(*settings)); 183 settings->es_versions = LSQUIC_DF_VERSIONS; 184 if (flags & ENG_SERVER) 185 { 186 settings->es_cfcw = LSQUIC_DF_CFCW_SERVER; 187 settings->es_sfcw = LSQUIC_DF_SFCW_SERVER; 188 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER; 189 } 190 else 191 { 192 settings->es_cfcw = LSQUIC_DF_CFCW_CLIENT; 193 settings->es_sfcw = LSQUIC_DF_SFCW_CLIENT; 194 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT; 195 } 196 settings->es_max_streams_in = LSQUIC_DF_MAX_STREAMS_IN; 197 settings->es_idle_conn_to = LSQUIC_DF_IDLE_CONN_TO; 198 settings->es_handshake_to = LSQUIC_DF_HANDSHAKE_TO; 199 settings->es_silent_close = LSQUIC_DF_SILENT_CLOSE; 200 settings->es_max_header_list_size 201 = LSQUIC_DF_MAX_HEADER_LIST_SIZE; 202 settings->es_ua = LSQUIC_DF_UA; 203 204 settings->es_pdmd = QTAG_X509; 205 settings->es_aead = QTAG_AESG; 206 settings->es_kexs = QTAG_C255; 207 settings->es_support_push = LSQUIC_DF_SUPPORT_PUSH; 208 settings->es_support_tcid0 = LSQUIC_DF_SUPPORT_TCID0; 209 settings->es_support_nstp = LSQUIC_DF_SUPPORT_NSTP; 210 settings->es_honor_prst = LSQUIC_DF_HONOR_PRST; 211 settings->es_progress_check = LSQUIC_DF_PROGRESS_CHECK; 212 settings->es_rw_once = LSQUIC_DF_RW_ONCE; 213 settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH; 214 settings->es_pace_packets = LSQUIC_DF_PACE_PACKETS; 215} 216 217 218/* Note: if returning an error, err_buf must be valid if non-NULL */ 219int 220lsquic_engine_check_settings (const struct lsquic_engine_settings *settings, 221 unsigned flags, 222 char *err_buf, size_t err_buf_sz) 223{ 224 if (settings->es_cfcw < LSQUIC_MIN_FCW || 225 settings->es_sfcw < LSQUIC_MIN_FCW) 226 { 227 if (err_buf) 228 
snprintf(err_buf, err_buf_sz, "%s", 229 "flow control window set too low"); 230 return -1; 231 } 232 if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS)) 233 { 234 if (err_buf) 235 snprintf(err_buf, err_buf_sz, "%s", 236 "No supported QUIC versions specified"); 237 return -1; 238 } 239 if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS) 240 { 241 if (err_buf) 242 snprintf(err_buf, err_buf_sz, "%s", 243 "one or more unsupported QUIC version is specified"); 244 return -1; 245 } 246 return 0; 247} 248 249 250static void 251free_packet (void *ctx, void *conn_ctx, void *packet_data, char is_ipv6) 252{ 253 free(packet_data); 254} 255 256 257static void * 258malloc_buf (void *ctx, void *conn_ctx, unsigned short size, char is_ipv6) 259{ 260 return malloc(size); 261} 262 263 264static const struct lsquic_packout_mem_if stock_pmi = 265{ 266 malloc_buf, free_packet, free_packet, 267}; 268 269 270static int 271hash_conns_by_addr (const struct lsquic_engine *engine) 272{ 273 if (engine->pub.enp_settings.es_versions & LSQUIC_FORCED_TCID0_VERSIONS) 274 return 1; 275 if ((engine->pub.enp_settings.es_versions & LSQUIC_GQUIC_HEADER_VERSIONS) 276 && engine->pub.enp_settings.es_support_tcid0) 277 return 1; 278 return 0; 279} 280 281 282lsquic_engine_t * 283lsquic_engine_new (unsigned flags, 284 const struct lsquic_engine_api *api) 285{ 286 lsquic_engine_t *engine; 287 int tag_buf_len; 288 char err_buf[100]; 289 290 if (!api->ea_packets_out) 291 { 292 LSQ_ERROR("packets_out callback is not specified"); 293 return NULL; 294 } 295 296 if (api->ea_settings && 297 0 != lsquic_engine_check_settings(api->ea_settings, flags, 298 err_buf, sizeof(err_buf))) 299 { 300 LSQ_ERROR("cannot create engine: %s", err_buf); 301 return NULL; 302 } 303 304 engine = calloc(1, sizeof(*engine)); 305 if (!engine) 306 return NULL; 307 if (0 != lsquic_mm_init(&engine->pub.enp_mm)) 308 { 309 free(engine); 310 return NULL; 311 } 312 if (api->ea_settings) 313 engine->pub.enp_settings = 
*api->ea_settings; 314 else 315 lsquic_engine_init_settings(&engine->pub.enp_settings, flags); 316 tag_buf_len = lsquic_gen_ver_tags(engine->pub.enp_ver_tags_buf, 317 sizeof(engine->pub.enp_ver_tags_buf), 318 engine->pub.enp_settings.es_versions); 319 if (tag_buf_len <= 0) 320 { 321 LSQ_ERROR("cannot generate version tags buffer"); 322 free(engine); 323 return NULL; 324 } 325 engine->pub.enp_ver_tags_len = tag_buf_len; 326 engine->pub.enp_flags = ENPUB_CAN_SEND; 327 328 engine->flags = flags; 329 engine->stream_if = api->ea_stream_if; 330 engine->stream_if_ctx = api->ea_stream_if_ctx; 331 engine->packets_out = api->ea_packets_out; 332 engine->packets_out_ctx = api->ea_packets_out_ctx; 333 if (api->ea_hsi_if) 334 { 335 engine->pub.enp_hsi_if = api->ea_hsi_if; 336 engine->pub.enp_hsi_ctx = api->ea_hsi_ctx; 337 } 338 else 339 { 340 engine->pub.enp_hsi_if = lsquic_http1x_if; 341 engine->pub.enp_hsi_ctx = NULL; 342 } 343 if (api->ea_pmi) 344 { 345 engine->pub.enp_pmi = api->ea_pmi; 346 engine->pub.enp_pmi_ctx = api->ea_pmi_ctx; 347 } 348 else 349 { 350 engine->pub.enp_pmi = &stock_pmi; 351 engine->pub.enp_pmi_ctx = NULL; 352 } 353 engine->pub.enp_verify_cert = api->ea_verify_cert; 354 engine->pub.enp_verify_ctx = api->ea_verify_ctx; 355 engine->pub.enp_engine = engine; 356 conn_hash_init(&engine->conns_hash, 357 hash_conns_by_addr(engine) ? CHF_USE_ADDR : 0); 358 engine->attq = attq_create(); 359 eng_hist_init(&engine->history); 360 engine->batch_size = INITIAL_OUT_BATCH_SIZE; 361 362 363 LSQ_INFO("instantiated engine"); 364 return engine; 365} 366 367 368static void 369grow_batch_size (struct lsquic_engine *engine) 370{ 371 engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE; 372} 373 374 375static void 376shrink_batch_size (struct lsquic_engine *engine) 377{ 378 engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE; 379} 380 381 382/* Wrapper to make sure important things occur before the connection is 383 * really destroyed. 
384 */ 385static void 386destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn) 387{ 388 --engine->n_conns; 389 conn->cn_flags |= LSCONN_NEVER_TICKABLE; 390 conn->cn_if->ci_destroy(conn); 391} 392 393 394static int 395maybe_grow_conn_heaps (struct lsquic_engine *engine) 396{ 397 struct min_heap_elem *els; 398 unsigned count; 399 400 if (engine->n_conns < lsquic_mh_nalloc(&engine->conns_tickable)) 401 return 0; /* Nothing to do */ 402 403 if (lsquic_mh_nalloc(&engine->conns_tickable)) 404 count = lsquic_mh_nalloc(&engine->conns_tickable) * 2 * 2; 405 else 406 count = 8; 407 408 els = malloc(sizeof(els[0]) * count); 409 if (!els) 410 { 411 LSQ_ERROR("%s: malloc failed", __func__); 412 return -1; 413 } 414 415 LSQ_DEBUG("grew heaps to %u elements", count / 2); 416 memcpy(&els[0], engine->conns_tickable.mh_elems, 417 sizeof(els[0]) * lsquic_mh_count(&engine->conns_tickable)); 418 memcpy(&els[count / 2], engine->conns_out.mh_elems, 419 sizeof(els[0]) * lsquic_mh_count(&engine->conns_out)); 420 free(engine->conns_tickable.mh_elems); 421 engine->conns_tickable.mh_elems = els; 422 engine->conns_out.mh_elems = &els[count / 2]; 423 engine->conns_tickable.mh_nalloc = count / 2; 424 engine->conns_out.mh_nalloc = count / 2; 425 return 0; 426} 427 428 429static lsquic_conn_t * 430new_full_conn_client (lsquic_engine_t *engine, const char *hostname, 431 unsigned short max_packet_size) 432{ 433 lsquic_conn_t *conn; 434 unsigned flags; 435 if (0 != maybe_grow_conn_heaps(engine)) 436 return NULL; 437 flags = engine->flags & (ENG_SERVER|ENG_HTTP); 438 conn = full_conn_client_new(&engine->pub, engine->stream_if, 439 engine->stream_if_ctx, flags, hostname, max_packet_size); 440 if (!conn) 441 return NULL; 442 ++engine->n_conns; 443 return conn; 444} 445 446 447static lsquic_conn_t * 448find_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 449 struct packin_parse_state *ppstate, const struct sockaddr *sa_local) 450{ 451 lsquic_conn_t *conn; 452 453 if 
(conn_hash_using_addr(&engine->conns_hash)) 454 conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local); 455 else if (packet_in->pi_flags & PI_CONN_ID) 456 conn = conn_hash_find_by_cid(&engine->conns_hash, 457 packet_in->pi_conn_id); 458 else 459 { 460 LSQ_DEBUG("packet header does not have connection ID: discarding"); 461 return NULL; 462 } 463 464 if (!conn) 465 return NULL; 466 467 conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate); 468 if ((packet_in->pi_flags & PI_CONN_ID) 469 && conn->cn_cid != packet_in->pi_conn_id) 470 { 471 LSQ_DEBUG("connection IDs do not match"); 472 return NULL; 473 } 474 475 return conn; 476} 477 478 479#if !defined(NDEBUG) && __GNUC__ 480__attribute__((weak)) 481#endif 482void 483lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub, 484 lsquic_conn_t *conn) 485{ 486 if (0 == (enpub->enp_flags & ENPUB_PROC) && 487 0 == (conn->cn_flags & (LSCONN_TICKABLE|LSCONN_NEVER_TICKABLE))) 488 { 489 lsquic_engine_t *engine = (lsquic_engine_t *) enpub; 490 lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); 491 engine_incref_conn(conn, LSCONN_TICKABLE); 492 } 493} 494 495 496void 497lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub, 498 lsquic_conn_t *conn, lsquic_time_t tick_time) 499{ 500 lsquic_engine_t *const engine = (lsquic_engine_t *) enpub; 501 if (conn->cn_flags & LSCONN_TICKABLE) 502 { 503 /* Optimization: no need to add the connection to the Advisory Tick 504 * Time Queue: it is about to be ticked, after which it its next tick 505 * time may be queried again. 
506 */; 507 } 508 else if (conn->cn_flags & LSCONN_ATTQ) 509 { 510 if (lsquic_conn_adv_time(conn) != tick_time) 511 { 512 attq_remove(engine->attq, conn); 513 if (0 != attq_add(engine->attq, conn, tick_time)) 514 engine_decref_conn(engine, conn, LSCONN_ATTQ); 515 } 516 } 517 else if (0 == attq_add(engine->attq, conn, tick_time)) 518 engine_incref_conn(conn, LSCONN_ATTQ); 519} 520 521 522/* Return 0 if packet is being processed by a connections, otherwise return 1 */ 523static int 524process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 525 struct packin_parse_state *ppstate, const struct sockaddr *sa_local, 526 const struct sockaddr *sa_peer, void *peer_ctx) 527{ 528 lsquic_conn_t *conn; 529 530 if (lsquic_packet_in_is_gquic_prst(packet_in) 531 && !engine->pub.enp_settings.es_honor_prst) 532 { 533 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 534 LSQ_DEBUG("public reset packet: discarding"); 535 return 1; 536 } 537 538 conn = find_conn(engine, packet_in, ppstate, sa_local); 539 540 if (!conn) 541 { 542 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 543 return 1; 544 } 545 546 if (0 == (conn->cn_flags & LSCONN_TICKABLE)) 547 { 548 lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); 549 engine_incref_conn(conn, LSCONN_TICKABLE); 550 } 551 lsquic_conn_record_sockaddr(conn, sa_local, sa_peer); 552 lsquic_packet_in_upref(packet_in); 553 conn->cn_peer_ctx = peer_ctx; 554 conn->cn_if->ci_packet_in(conn, packet_in); 555 lsquic_packet_in_put(&engine->pub.enp_mm, packet_in); 556 return 0; 557} 558 559 560void 561lsquic_engine_destroy (lsquic_engine_t *engine) 562{ 563 lsquic_conn_t *conn; 564 565 LSQ_DEBUG("destroying engine"); 566#ifndef NDEBUG 567 engine->flags |= ENG_DTOR; 568#endif 569 570 while ((conn = lsquic_mh_pop(&engine->conns_out))) 571 { 572 assert(conn->cn_flags & LSCONN_HAS_OUTGOING); 573 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 574 } 575 576 while ((conn = 
lsquic_mh_pop(&engine->conns_tickable))) 577 { 578 assert(conn->cn_flags & LSCONN_TICKABLE); 579 (void) engine_decref_conn(engine, conn, LSCONN_TICKABLE); 580 } 581 582 for (conn = conn_hash_first(&engine->conns_hash); conn; 583 conn = conn_hash_next(&engine->conns_hash)) 584 force_close_conn(engine, conn); 585 conn_hash_cleanup(&engine->conns_hash); 586 587 assert(0 == engine->n_conns); 588 attq_destroy(engine->attq); 589 590 assert(0 == lsquic_mh_count(&engine->conns_out)); 591 assert(0 == lsquic_mh_count(&engine->conns_tickable)); 592 lsquic_mm_cleanup(&engine->pub.enp_mm); 593 free(engine->conns_tickable.mh_elems); 594 free(engine); 595} 596 597 598lsquic_conn_t * 599lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *local_sa, 600 const struct sockaddr *peer_sa, 601 void *peer_ctx, lsquic_conn_ctx_t *conn_ctx, 602 const char *hostname, unsigned short max_packet_size) 603{ 604 lsquic_conn_t *conn; 605 ENGINE_IN(engine); 606 607 if (engine->flags & ENG_SERVER) 608 { 609 LSQ_ERROR("`%s' must only be called in client mode", __func__); 610 goto err; 611 } 612 613 if (conn_hash_using_addr(&engine->conns_hash) 614 && conn_hash_find_by_addr(&engine->conns_hash, local_sa)) 615 { 616 LSQ_ERROR("cannot have more than one connection on the same port"); 617 goto err; 618 } 619 620 if (0 == max_packet_size) 621 { 622 switch (peer_sa->sa_family) 623 { 624 case AF_INET: 625 max_packet_size = QUIC_MAX_IPv4_PACKET_SZ; 626 break; 627 default: 628 max_packet_size = QUIC_MAX_IPv6_PACKET_SZ; 629 break; 630 } 631 } 632 633 conn = new_full_conn_client(engine, hostname, max_packet_size); 634 if (!conn) 635 goto err; 636 lsquic_conn_record_sockaddr(conn, local_sa, peer_sa); 637 if (0 != conn_hash_add(&engine->conns_hash, conn)) 638 { 639 LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy", 640 conn->cn_cid); 641 destroy_conn(engine, conn); 642 goto err; 643 } 644 assert(!(conn->cn_flags & 645 (CONN_REF_FLAGS 646 & ~LSCONN_TICKABLE /* This flag may be set as 
effect of user 647 callbacks */ 648 ))); 649 conn->cn_flags |= LSCONN_HASHED; 650 lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); 651 engine_incref_conn(conn, LSCONN_TICKABLE); 652 conn->cn_peer_ctx = peer_ctx; 653 lsquic_conn_set_ctx(conn, conn_ctx); 654 full_conn_client_call_on_new(conn); 655 end: 656 ENGINE_OUT(engine); 657 return conn; 658 err: 659 conn = NULL; 660 goto end; 661} 662 663 664static void 665remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn) 666{ 667 conn_hash_remove(&engine->conns_hash, conn); 668 (void) engine_decref_conn(engine, conn, LSCONN_HASHED); 669} 670 671 672static void 673refflags2str (enum lsquic_conn_flags flags, char s[6]) 674{ 675 *s = 'C'; s += !!(flags & LSCONN_CLOSING); 676 *s = 'H'; s += !!(flags & LSCONN_HASHED); 677 *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING); 678 *s = 'T'; s += !!(flags & LSCONN_TICKABLE); 679 *s = 'A'; s += !!(flags & LSCONN_ATTQ); 680 *s = 'K'; s += !!(flags & LSCONN_TICKED); 681 *s = '\0'; 682} 683 684 685static void 686engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag) 687{ 688 char str[2][7]; 689 assert(flag & CONN_REF_FLAGS); 690 assert(!(conn->cn_flags & flag)); 691 conn->cn_flags |= flag; 692 LSQ_DEBUG("incref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid, 693 (refflags2str(conn->cn_flags & ~flag, str[0]), str[0]), 694 (refflags2str(conn->cn_flags, str[1]), str[1])); 695} 696 697 698static lsquic_conn_t * 699engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 700 enum lsquic_conn_flags flags) 701{ 702 char str[2][7]; 703 assert(flags & CONN_REF_FLAGS); 704 assert(conn->cn_flags & flags); 705#ifndef NDEBUG 706 if (flags & LSCONN_CLOSING) 707 assert(0 == (conn->cn_flags & LSCONN_HASHED)); 708#endif 709 conn->cn_flags &= ~flags; 710 LSQ_DEBUG("decref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid, 711 (refflags2str(conn->cn_flags | flags, str[0]), str[0]), 712 (refflags2str(conn->cn_flags, str[1]), str[1])); 713 if (0 == 
(conn->cn_flags & CONN_REF_FLAGS)) 714 { 715 eng_hist_inc(&engine->history, 0, sl_del_full_conns); 716 destroy_conn(engine, conn); 717 return NULL; 718 } 719 else 720 return conn; 721} 722 723 724/* This is not a general-purpose function. Only call from engine dtor. */ 725static void 726force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn) 727{ 728 assert(engine->flags & ENG_DTOR); 729 const enum lsquic_conn_flags flags = conn->cn_flags; 730 assert(conn->cn_flags & CONN_REF_FLAGS); 731 assert(!(flags & LSCONN_HAS_OUTGOING)); /* Should be removed already */ 732 assert(!(flags & LSCONN_TICKABLE)); /* Should be removed already */ 733 assert(!(flags & LSCONN_CLOSING)); /* It is in transient queue? */ 734 if (flags & LSCONN_ATTQ) 735 { 736 attq_remove(engine->attq, conn); 737 (void) engine_decref_conn(engine, conn, LSCONN_ATTQ); 738 } 739 if (flags & LSCONN_HASHED) 740 remove_conn_from_hash(engine, conn); 741} 742 743 744/* Iterator for tickable connections (those on the Tickable Queue). Before 745 * a connection is returned, it is removed from the Advisory Tick Time queue 746 * if necessary. 
747 */ 748static lsquic_conn_t * 749conn_iter_next_tickable (struct lsquic_engine *engine) 750{ 751 lsquic_conn_t *conn; 752 753 conn = lsquic_mh_pop(&engine->conns_tickable); 754 755 if (conn) 756 conn = engine_decref_conn(engine, conn, LSCONN_TICKABLE); 757 if (conn && (conn->cn_flags & LSCONN_ATTQ)) 758 { 759 attq_remove(engine->attq, conn); 760 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 761 } 762 763 return conn; 764} 765 766 767void 768lsquic_engine_process_conns (lsquic_engine_t *engine) 769{ 770 lsquic_conn_t *conn; 771 lsquic_time_t now; 772 773 ENGINE_IN(engine); 774 775 now = lsquic_time_now(); 776 while ((conn = attq_pop(engine->attq, now))) 777 { 778 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 779 if (conn && !(conn->cn_flags & LSCONN_TICKABLE)) 780 { 781 lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); 782 engine_incref_conn(conn, LSCONN_TICKABLE); 783 } 784 } 785 786 process_connections(engine, conn_iter_next_tickable, now); 787 ENGINE_OUT(engine); 788} 789 790 791static ssize_t 792really_encrypt_packet (const lsquic_conn_t *conn, 793 struct lsquic_packet_out *packet_out, 794 unsigned char *buf, size_t bufsz) 795{ 796 int header_sz, is_hello_packet; 797 enum enc_level enc_level; 798 size_t packet_sz; 799 unsigned char header_buf[QUIC_MAX_PUBHDR_SZ]; 800 801 header_sz = conn->cn_pf->pf_gen_reg_pkt_header(conn, packet_out, 802 header_buf, sizeof(header_buf)); 803 if (header_sz < 0) 804 return -1; 805 806 is_hello_packet = !!(packet_out->po_flags & PO_HELLO); 807 enc_level = conn->cn_esf->esf_encrypt(conn->cn_enc_session, 808 conn->cn_version, 0, 809 packet_out->po_packno, header_buf, header_sz, 810 packet_out->po_data, packet_out->po_data_sz, 811 buf, bufsz, &packet_sz, is_hello_packet); 812 if ((int) enc_level >= 0) 813 { 814 lsquic_packet_out_set_enc_level(packet_out, enc_level); 815 LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %zu bytes, " 816 "ciphertext is %zd bytes", 817 packet_out->po_packno, 818 
conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) + 819 packet_out->po_data_sz, 820 packet_sz); 821 return packet_sz; 822 } 823 else 824 return -1; 825} 826 827 828static int 829conn_peer_ipv6 (const struct lsquic_conn *conn) 830{ 831 return AF_INET6 == ((struct sockaddr *) conn->cn_peer_addr)->sa_family; 832} 833 834 835static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, } 836encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn, 837 lsquic_packet_out_t *packet_out) 838{ 839 ssize_t enc_sz; 840 size_t bufsz; 841 unsigned sent_sz; 842 unsigned char *buf; 843 int ipv6; 844 845 bufsz = conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) + 846 packet_out->po_data_sz + QUIC_PACKET_HASH_SZ; 847 if (bufsz > USHRT_MAX) 848 return ENCPA_BADCRYPT; /* To cause connection to close */ 849 ipv6 = conn_peer_ipv6(conn); 850 buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, 851 conn->cn_peer_ctx, bufsz, ipv6); 852 if (!buf) 853 { 854 LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd", 855 bufsz); 856 return ENCPA_NOMEM; 857 } 858 859 { 860 enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz); 861 sent_sz = enc_sz; 862 } 863 864 if (enc_sz < 0) 865 { 866 engine->pub.enp_pmi->pmi_return(engine->pub.enp_pmi_ctx, 867 conn->cn_peer_ctx, buf, ipv6); 868 return ENCPA_BADCRYPT; 869 } 870 871 packet_out->po_enc_data = buf; 872 packet_out->po_enc_data_sz = enc_sz; 873 packet_out->po_sent_sz = sent_sz; 874 packet_out->po_flags &= ~PO_IPv6; 875 packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ|(ipv6 << POIPv6_SHIFT); 876 877 return ENCPA_OK; 878} 879 880 881static void 882release_or_return_enc_data (struct lsquic_engine *engine, 883 void (*pmi_rel_or_ret) (void *, void *, void *, char), 884 struct lsquic_conn *conn, struct lsquic_packet_out *packet_out) 885{ 886 pmi_rel_or_ret(engine->pub.enp_pmi_ctx, conn->cn_peer_ctx, 887 packet_out->po_enc_data, lsquic_packet_out_ipv6(packet_out)); 888 packet_out->po_flags &= 
~PO_ENCRYPTED; 889 packet_out->po_enc_data = NULL; 890} 891 892 893static void 894release_enc_data (struct lsquic_engine *engine, struct lsquic_conn *conn, 895 struct lsquic_packet_out *packet_out) 896{ 897 release_or_return_enc_data(engine, engine->pub.enp_pmi->pmi_release, 898 conn, packet_out); 899} 900 901 902static void 903return_enc_data (struct lsquic_engine *engine, struct lsquic_conn *conn, 904 struct lsquic_packet_out *packet_out) 905{ 906 release_or_return_enc_data(engine, engine->pub.enp_pmi->pmi_return, 907 conn, packet_out); 908} 909 910 911STAILQ_HEAD(conns_stailq, lsquic_conn); 912TAILQ_HEAD(conns_tailq, lsquic_conn); 913 914 915struct conns_out_iter 916{ 917 struct min_heap *coi_heap; 918 TAILQ_HEAD(, lsquic_conn) coi_active_list, 919 coi_inactive_list; 920 lsquic_conn_t *coi_next; 921#ifndef NDEBUG 922 lsquic_time_t coi_last_sent; 923#endif 924}; 925 926 927static void 928coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine) 929{ 930 iter->coi_heap = &engine->conns_out; 931 iter->coi_next = NULL; 932 TAILQ_INIT(&iter->coi_active_list); 933 TAILQ_INIT(&iter->coi_inactive_list); 934#ifndef NDEBUG 935 iter->coi_last_sent = 0; 936#endif 937} 938 939 940static lsquic_conn_t * 941coi_next (struct conns_out_iter *iter) 942{ 943 lsquic_conn_t *conn; 944 945 if (lsquic_mh_count(iter->coi_heap) > 0) 946 { 947 conn = lsquic_mh_pop(iter->coi_heap); 948 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 949 conn->cn_flags |= LSCONN_COI_ACTIVE; 950#ifndef NDEBUG 951 if (iter->coi_last_sent) 952 assert(iter->coi_last_sent <= conn->cn_last_sent); 953 iter->coi_last_sent = conn->cn_last_sent; 954#endif 955 return conn; 956 } 957 else if (!TAILQ_EMPTY(&iter->coi_active_list)) 958 { 959 conn = iter->coi_next; 960 if (!conn) 961 conn = TAILQ_FIRST(&iter->coi_active_list); 962 if (conn) 963 iter->coi_next = TAILQ_NEXT(conn, cn_next_out); 964 return conn; 965 } 966 else 967 return NULL; 968} 969 970 971static void 972coi_deactivate (struct 
conns_out_iter *iter, lsquic_conn_t *conn) 973{ 974 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 975 { 976 assert(!TAILQ_EMPTY(&iter->coi_active_list)); 977 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 978 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 979 TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out); 980 conn->cn_flags |= LSCONN_COI_INACTIVE; 981 } 982} 983 984 985static void 986coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 987{ 988 assert(conn->cn_flags & LSCONN_COI_INACTIVE); 989 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 990 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 991 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 992 conn->cn_flags |= LSCONN_COI_ACTIVE; 993} 994 995 996static void 997coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine) 998{ 999 lsquic_conn_t *conn; 1000 while ((conn = TAILQ_FIRST(&iter->coi_active_list))) 1001 { 1002 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1003 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1004 lsquic_mh_insert(iter->coi_heap, conn, conn->cn_last_sent); 1005 } 1006 while ((conn = TAILQ_FIRST(&iter->coi_inactive_list))) 1007 { 1008 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1009 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1010 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 1011 } 1012} 1013 1014 1015static unsigned 1016send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter, 1017 struct out_batch *batch, unsigned n_to_send) 1018{ 1019 int n_sent, i; 1020 lsquic_time_t now; 1021 1022 /* Set sent time before the write to avoid underestimating RTT */ 1023 now = lsquic_time_now(); 1024 for (i = 0; i < (int) n_to_send; ++i) 1025 batch->packets[i]->po_sent = now; 1026 n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs, 1027 n_to_send); 1028 if (n_sent < (int) n_to_send) 1029 { 1030 engine->pub.enp_flags &= ~ENPUB_CAN_SEND; 1031 engine->resume_sending_at = now + 1000000; 1032 
LSQ_DEBUG("cannot send packets"); 1033 EV_LOG_GENERIC_EVENT("cannot send packets"); 1034 } 1035 if (n_sent >= 0) 1036 LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send); 1037 else 1038 { 1039 LSQ_DEBUG("packets out returned an error: %s", strerror(errno)); 1040 n_sent = 0; 1041 } 1042 if (n_sent > 0) 1043 engine->last_sent = now + n_sent; 1044 for (i = 0; i < n_sent; ++i) 1045 { 1046 eng_hist_inc(&engine->history, now, sl_packets_out); 1047 EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1048 batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i], 1049 batch->packets[i]); 1050 /* `i' is added to maintain relative order */ 1051 batch->conns[i]->cn_last_sent = now + i; 1052 /* Release packet out buffer as soon as the packet is sent 1053 * successfully. If not successfully sent, we hold on to 1054 * this buffer until the packet sending is attempted again 1055 * or until it times out and regenerated. 1056 */ 1057 if (batch->packets[i]->po_flags & PO_ENCRYPTED) 1058 release_enc_data(engine, batch->conns[i], batch->packets[i]); 1059 } 1060 if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT)) 1061 for ( ; i < (int) n_to_send; ++i) 1062 EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1063 /* Return packets to the connection in reverse order so that the packet 1064 * ordering is maintained. 
1065 */ 1066 for (i = (int) n_to_send - 1; i >= n_sent; --i) 1067 { 1068 batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i], 1069 batch->packets[i]); 1070 if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT))) 1071 coi_reactivate(conns_iter, batch->conns[i]); 1072 } 1073 return n_sent; 1074} 1075 1076 1077/* Return 1 if went past deadline, 0 otherwise */ 1078static int 1079check_deadline (lsquic_engine_t *engine) 1080{ 1081 if (engine->pub.enp_settings.es_proc_time_thresh && 1082 lsquic_time_now() > engine->deadline) 1083 { 1084 LSQ_INFO("went past threshold of %u usec, stop sending", 1085 engine->pub.enp_settings.es_proc_time_thresh); 1086 engine->flags |= ENG_PAST_DEADLINE; 1087 return 1; 1088 } 1089 else 1090 return 0; 1091} 1092 1093 1094static void 1095send_packets_out (struct lsquic_engine *engine, 1096 struct conns_tailq *ticked_conns, 1097 struct conns_stailq *closed_conns) 1098{ 1099 unsigned n, w, n_sent, n_batches_sent; 1100 lsquic_packet_out_t *packet_out; 1101 lsquic_conn_t *conn; 1102 struct out_batch *const batch = &engine->out_batch; 1103 struct conns_out_iter conns_iter; 1104 int shrink, deadline_exceeded; 1105 1106 coi_init(&conns_iter, engine); 1107 n_batches_sent = 0; 1108 n_sent = 0, n = 0; 1109 shrink = 0; 1110 deadline_exceeded = 0; 1111 1112 while ((conn = coi_next(&conns_iter))) 1113 { 1114 packet_out = conn->cn_if->ci_next_packet_to_send(conn); 1115 if (!packet_out) { 1116 LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64, 1117 conn->cn_cid); 1118 coi_deactivate(&conns_iter, conn); 1119 continue; 1120 } 1121 if ((packet_out->po_flags & PO_ENCRYPTED) 1122 && lsquic_packet_out_ipv6(packet_out) != conn_peer_ipv6(conn)) 1123 { 1124 /* Peer address changed since the packet was encrypted. Need to 1125 * reallocate. 
1126 */ 1127 return_enc_data(engine, conn, packet_out); 1128 } 1129 if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT))) 1130 { 1131 switch (encrypt_packet(engine, conn, packet_out)) 1132 { 1133 case ENCPA_NOMEM: 1134 /* Send what we have and wait for a more opportune moment */ 1135 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1136 goto end_for; 1137 case ENCPA_BADCRYPT: 1138 /* This is pretty bad: close connection immediately */ 1139 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1140 LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid); 1141 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1142 { 1143 if (!(conn->cn_flags & LSCONN_CLOSING)) 1144 { 1145 STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn); 1146 engine_incref_conn(conn, LSCONN_CLOSING); 1147 if (conn->cn_flags & LSCONN_HASHED) 1148 remove_conn_from_hash(engine, conn); 1149 } 1150 coi_deactivate(&conns_iter, conn); 1151 if (conn->cn_flags & LSCONN_TICKED) 1152 { 1153 TAILQ_REMOVE(ticked_conns, conn, cn_next_ticked); 1154 engine_decref_conn(engine, conn, LSCONN_TICKED); 1155 } 1156 } 1157 continue; 1158 case ENCPA_OK: 1159 break; 1160 } 1161 } 1162 LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64, 1163 packet_out->po_packno, conn->cn_cid); 1164 assert(conn->cn_flags & LSCONN_HAS_PEER_SA); 1165 if (packet_out->po_flags & PO_ENCRYPTED) 1166 { 1167 batch->outs[n].buf = packet_out->po_enc_data; 1168 batch->outs[n].sz = packet_out->po_enc_data_sz; 1169 } 1170 else 1171 { 1172 batch->outs[n].buf = packet_out->po_data; 1173 batch->outs[n].sz = packet_out->po_data_sz; 1174 } 1175 batch->outs [n].peer_ctx = conn->cn_peer_ctx; 1176 batch->outs [n].local_sa = (struct sockaddr *) conn->cn_local_addr; 1177 batch->outs [n].dest_sa = (struct sockaddr *) conn->cn_peer_addr; 1178 batch->conns [n] = conn; 1179 batch->packets[n] = packet_out; 1180 ++n; 1181 if (n == engine->batch_size) 1182 { 1183 n = 0; 1184 w = send_batch(engine, &conns_iter, batch, engine->batch_size); 1185 
++n_batches_sent; 1186 n_sent += w; 1187 if (w < engine->batch_size) 1188 { 1189 shrink = 1; 1190 break; 1191 } 1192 deadline_exceeded = check_deadline(engine); 1193 if (deadline_exceeded) 1194 break; 1195 grow_batch_size(engine); 1196 } 1197 } 1198 end_for: 1199 1200 if (n > 0) { 1201 w = send_batch(engine, &conns_iter, batch, n); 1202 n_sent += w; 1203 shrink = w < n; 1204 ++n_batches_sent; 1205 deadline_exceeded = check_deadline(engine); 1206 } 1207 1208 if (shrink) 1209 shrink_batch_size(engine); 1210 else if (n_batches_sent > 1 && !deadline_exceeded) 1211 grow_batch_size(engine); 1212 1213 coi_reheap(&conns_iter, engine); 1214 1215 LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s"); 1216} 1217 1218 1219int 1220lsquic_engine_has_unsent_packets (lsquic_engine_t *engine) 1221{ 1222 return lsquic_mh_count(&engine->conns_out) > 0 1223 ; 1224} 1225 1226 1227static void 1228reset_deadline (lsquic_engine_t *engine, lsquic_time_t now) 1229{ 1230 engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh; 1231 engine->flags &= ~ENG_PAST_DEADLINE; 1232} 1233 1234 1235/* TODO: this is a user-facing function, account for load */ 1236void 1237lsquic_engine_send_unsent_packets (lsquic_engine_t *engine) 1238{ 1239 lsquic_conn_t *conn; 1240 struct conns_stailq closed_conns; 1241 struct conns_tailq ticked_conns = TAILQ_HEAD_INITIALIZER(ticked_conns); 1242 1243 STAILQ_INIT(&closed_conns); 1244 reset_deadline(engine, lsquic_time_now()); 1245 if (!(engine->pub.enp_flags & ENPUB_CAN_SEND)) 1246 { 1247 LSQ_DEBUG("can send again"); 1248 EV_LOG_GENERIC_EVENT("can send again"); 1249 engine->pub.enp_flags |= ENPUB_CAN_SEND; 1250 } 1251 1252 send_packets_out(engine, &ticked_conns, &closed_conns); 1253 1254 while ((conn = STAILQ_FIRST(&closed_conns))) { 1255 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1256 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1257 } 1258 1259} 1260 1261 1262static void 1263process_connections (lsquic_engine_t 
*engine, conn_iter_f next_conn, 1264 lsquic_time_t now) 1265{ 1266 lsquic_conn_t *conn; 1267 enum tick_st tick_st; 1268 unsigned i; 1269 lsquic_time_t next_tick_time; 1270 struct conns_stailq closed_conns; 1271 struct conns_tailq ticked_conns; 1272 1273 eng_hist_tick(&engine->history, now); 1274 1275 STAILQ_INIT(&closed_conns); 1276 TAILQ_INIT(&ticked_conns); 1277 reset_deadline(engine, now); 1278 1279 if (!(engine->pub.enp_flags & ENPUB_CAN_SEND) 1280 && now > engine->resume_sending_at) 1281 { 1282 LSQ_NOTICE("failsafe activated: resume sending packets again after " 1283 "timeout"); 1284 EV_LOG_GENERIC_EVENT("resume sending packets again after timeout"); 1285 engine->pub.enp_flags |= ENPUB_CAN_SEND; 1286 } 1287 1288 i = 0; 1289 while ((conn = next_conn(engine)) 1290 ) 1291 { 1292 tick_st = conn->cn_if->ci_tick(conn, now); 1293 conn->cn_last_ticked = now + i /* Maintain relative order */ ++; 1294 if (tick_st & TICK_SEND) 1295 { 1296 if (!(conn->cn_flags & LSCONN_HAS_OUTGOING)) 1297 { 1298 lsquic_mh_insert(&engine->conns_out, conn, conn->cn_last_sent); 1299 engine_incref_conn(conn, LSCONN_HAS_OUTGOING); 1300 } 1301 } 1302 if (tick_st & TICK_CLOSE) 1303 { 1304 STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn); 1305 engine_incref_conn(conn, LSCONN_CLOSING); 1306 if (conn->cn_flags & LSCONN_HASHED) 1307 remove_conn_from_hash(engine, conn); 1308 } 1309 else 1310 { 1311 TAILQ_INSERT_TAIL(&ticked_conns, conn, cn_next_ticked); 1312 engine_incref_conn(conn, LSCONN_TICKED); 1313 } 1314 } 1315 1316 if ((engine->pub.enp_flags & ENPUB_CAN_SEND) 1317 && lsquic_engine_has_unsent_packets(engine)) 1318 send_packets_out(engine, &ticked_conns, &closed_conns); 1319 1320 while ((conn = STAILQ_FIRST(&closed_conns))) { 1321 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1322 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1323 } 1324 1325 /* TODO Heapification can be optimized by switching to the Floyd method: 1326 * 
https://en.wikipedia.org/wiki/Binary_heap#Building_a_heap 1327 */ 1328 while ((conn = TAILQ_FIRST(&ticked_conns))) 1329 { 1330 TAILQ_REMOVE(&ticked_conns, conn, cn_next_ticked); 1331 engine_decref_conn(engine, conn, LSCONN_TICKED); 1332 if (!(conn->cn_flags & LSCONN_TICKABLE) 1333 && conn->cn_if->ci_is_tickable(conn)) 1334 { 1335 lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); 1336 engine_incref_conn(conn, LSCONN_TICKABLE); 1337 } 1338 else if (!(conn->cn_flags & LSCONN_ATTQ)) 1339 { 1340 next_tick_time = conn->cn_if->ci_next_tick_time(conn); 1341 if (next_tick_time) 1342 { 1343 if (0 == attq_add(engine->attq, conn, next_tick_time)) 1344 engine_incref_conn(conn, LSCONN_ATTQ); 1345 } 1346 else 1347 assert(0); 1348 } 1349 } 1350 1351} 1352 1353 1354/* Return 0 if packet is being processed by a real connection, 1 if the 1355 * packet was processed, but not by a connection, and -1 on error. 1356 */ 1357int 1358lsquic_engine_packet_in (lsquic_engine_t *engine, 1359 const unsigned char *packet_in_data, size_t packet_in_size, 1360 const struct sockaddr *sa_local, const struct sockaddr *sa_peer, 1361 void *peer_ctx) 1362{ 1363 struct packin_parse_state ppstate; 1364 lsquic_packet_in_t *packet_in; 1365 int (*parse_packet_in_begin) (struct lsquic_packet_in *, size_t length, 1366 int is_server, struct packin_parse_state *); 1367 1368 if (packet_in_size > QUIC_MAX_PACKET_SZ) 1369 { 1370 LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming " 1371 "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ); 1372 errno = E2BIG; 1373 return -1; 1374 } 1375 1376 if (conn_hash_using_addr(&engine->conns_hash)) 1377 { 1378 const struct lsquic_conn *conn; 1379 conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local); 1380 if (!conn) 1381 return -1; 1382 if ((1 << conn->cn_version) & LSQUIC_GQUIC_HEADER_VERSIONS) 1383 parse_packet_in_begin = lsquic_gquic_parse_packet_in_begin; 1384 else 1385 parse_packet_in_begin = lsquic_iquic_parse_packet_in_begin; 
1386 } 1387 else 1388 parse_packet_in_begin = lsquic_parse_packet_in_begin; 1389 1390 packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm); 1391 if (!packet_in) 1392 return -1; 1393 1394 /* Library does not modify packet_in_data, it is not referenced after 1395 * this function returns and subsequent release of pi_data is guarded 1396 * by PI_OWN_DATA flag. 1397 */ 1398 packet_in->pi_data = (unsigned char *) packet_in_data; 1399 if (0 != parse_packet_in_begin(packet_in, packet_in_size, 1400 engine->flags & ENG_SERVER, &ppstate)) 1401 { 1402 LSQ_DEBUG("Cannot parse incoming packet's header"); 1403 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 1404 errno = EINVAL; 1405 return -1; 1406 } 1407 1408 packet_in->pi_received = lsquic_time_now(); 1409 eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in); 1410 return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer, 1411 peer_ctx); 1412} 1413 1414 1415#if __GNUC__ && !defined(NDEBUG) 1416__attribute__((weak)) 1417#endif 1418unsigned 1419lsquic_engine_quic_versions (const lsquic_engine_t *engine) 1420{ 1421 return engine->pub.enp_settings.es_versions; 1422} 1423 1424 1425int 1426lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff) 1427{ 1428 const lsquic_time_t *next_attq_time; 1429 lsquic_time_t now, next_time; 1430 1431 if (((engine->flags & ENG_PAST_DEADLINE) 1432 && lsquic_mh_count(&engine->conns_out)) 1433 || lsquic_mh_count(&engine->conns_tickable)) 1434 { 1435 *diff = 0; 1436 return 1; 1437 } 1438 1439 next_attq_time = attq_next_time(engine->attq); 1440 if (engine->pub.enp_flags & ENPUB_CAN_SEND) 1441 { 1442 if (next_attq_time) 1443 next_time = *next_attq_time; 1444 else 1445 return 0; 1446 } 1447 else 1448 { 1449 if (next_attq_time) 1450 next_time = MIN(*next_attq_time, engine->resume_sending_at); 1451 else 1452 next_time = engine->resume_sending_at; 1453 } 1454 1455 now = lsquic_time_now(); 1456 *diff = (int) ((int64_t) next_time - (int64_t) now); 1457 
return 1; 1458} 1459 1460 1461unsigned 1462lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now) 1463{ 1464 lsquic_time_t now; 1465 now = lsquic_time_now(); 1466 if (from_now < 0) 1467 now -= from_now; 1468 else 1469 now += from_now; 1470 return attq_count_before(engine->attq, now); 1471} 1472 1473 1474