lsquic_engine.c revision bfc7bfd8
1/* Copyright (c) 2017 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_engine.c - QUIC engine 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdint.h> 10#include <stdio.h> 11#include <stdlib.h> 12#include <string.h> 13#include <sys/queue.h> 14#include <sys/time.h> 15#include <time.h> 16#include <netinet/in.h> 17#include <sys/types.h> 18#include <sys/stat.h> 19#include <fcntl.h> 20#include <unistd.h> 21#include <netdb.h> 22 23 24 25#include "lsquic.h" 26#include "lsquic_types.h" 27#include "lsquic_alarmset.h" 28#include "lsquic_parse.h" 29#include "lsquic_packet_in.h" 30#include "lsquic_packet_out.h" 31#include "lsquic_senhist.h" 32#include "lsquic_rtt.h" 33#include "lsquic_cubic.h" 34#include "lsquic_pacer.h" 35#include "lsquic_send_ctl.h" 36#include "lsquic_set.h" 37#include "lsquic_conn_flow.h" 38#include "lsquic_sfcw.h" 39#include "lsquic_stream.h" 40#include "lsquic_conn.h" 41#include "lsquic_full_conn.h" 42#include "lsquic_util.h" 43#include "lsquic_qtags.h" 44#include "lsquic_str.h" 45#include "lsquic_handshake.h" 46#include "lsquic_mm.h" 47#include "lsquic_conn_hash.h" 48#include "lsquic_engine_public.h" 49#include "lsquic_eng_hist.h" 50#include "lsquic_ev_log.h" 51#include "lsquic_version.h" 52#include "lsquic_hash.h" 53#include "lsquic_attq.h" 54 55#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE 56#include "lsquic_logger.h" 57 58 59/* The batch of outgoing packets grows and shrinks dynamically */ 60#define MAX_OUT_BATCH_SIZE 1024 61#define MIN_OUT_BATCH_SIZE 256 62#define INITIAL_OUT_BATCH_SIZE 512 63 64typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *); 65 66static void 67process_connections (struct lsquic_engine *engine, conn_iter_f iter); 68 69static void 70engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag); 71 72static lsquic_conn_t * 73engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 74 enum lsquic_conn_flags flag); 75 76static void 77force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn); 78 79/* Nested calls to LSQUIC are not supported */ 80#define ENGINE_IN(e) do { \ 81 assert(!((e)->pub.enp_flags & ENPUB_PROC)); \ 82 (e)->pub.enp_flags |= ENPUB_PROC; \ 83} while (0) 84 85#define ENGINE_OUT(e) do { \ 86 assert((e)->pub.enp_flags & ENPUB_PROC); \ 87 (e)->pub.enp_flags &= ~ENPUB_PROC; \ 88} while (0) 89 90/* A connection can be referenced from one of six places: 91 * 92 * 1. Connection hash: a connection starts its life in one of those. 93 * 94 * 2. Outgoing queue. 95 * 96 * 3. Incoming queue. 97 * 98 * 4. Pending RW Events queue. 99 * 100 * 5. Advisory Tick Time queue. 101 * 102 * 6. Closing connections queue. This is a transient queue -- it only 103 * exists for the duration of process_connections() function call. 104 * 105 * The idea is to destroy the connection when it is no longer referenced. 106 * For example, a connection tick may return TICK_SEND|TICK_CLOSE. In 107 * that case, the connection is referenced from two places: (2) and (6). 108 * After its packets are sent, it is only referenced in (6), and at the 109 * end of the function call, when it is removed from (6), reference count 110 * goes to zero and the connection is destroyed. If not all packets can 111 * be sent, at the end of the function call, the connection is referenced 112 * by (2) and will only be removed once all outgoing packets have been 113 * sent. 114 */ 115#define CONN_REF_FLAGS (LSCONN_HASHED \ 116 |LSCONN_HAS_OUTGOING \ 117 |LSCONN_HAS_INCOMING \ 118 |LSCONN_RW_PENDING \ 119 |LSCONN_CLOSING \ 120 |LSCONN_ATTQ) 121 122 123struct out_heap_elem 124{ 125 struct lsquic_conn *ohe_conn; 126 lsquic_time_t ohe_last_sent; 127}; 128 129 130struct out_heap 131{ 132 struct out_heap_elem *oh_elems; 133 unsigned oh_nalloc, 134 oh_nelem; 135}; 136 137 138 139 140struct lsquic_engine 141{ 142 struct lsquic_engine_public pub; 143 enum { 144 ENG_SERVER = LSENG_SERVER, 145 ENG_HTTP = LSENG_HTTP, 146 ENG_COOLDOWN = (1 << 7), /* Cooldown: no new connections */ 147 ENG_PAST_DEADLINE 148 = (1 << 8), /* Previous call to a processing 149 * function went past time threshold. 150 */ 151#ifndef NDEBUG 152 ENG_DTOR = (1 << 26), /* Engine destructor */ 153#endif 154 } flags; 155 const struct lsquic_stream_if *stream_if; 156 void *stream_if_ctx; 157 lsquic_packets_out_f packets_out; 158 void *packets_out_ctx; 159 void *bad_handshake_ctx; 160 struct conn_hash full_conns; 161 TAILQ_HEAD(, lsquic_conn) conns_in, conns_pend_rw; 162 struct out_heap conns_out; 163 /* Use a union because only one iterator is being used at any one time */ 164 union { 165 struct { 166 /* This iterator does not have any state: it uses `conns_in' */ 167 } conn_in; 168 struct { 169 /* This iterator does not have any state: it uses `conns_pend_rw' */ 170 } rw_pend; 171 struct { 172 /* Iterator state to process connections in Advisory Tick Time 173 * queue. 174 */ 175 lsquic_time_t cutoff; 176 } attq; 177 struct { 178 /* Iterator state to process all connections */ 179 } all; 180 struct { 181 lsquic_conn_t *conn; 182 } one; 183 } iter_state; 184 struct eng_hist history; 185 unsigned batch_size; 186 unsigned time_until_desired_tick; 187 struct attq *attq; 188 lsquic_time_t proc_time; 189 /* Track time last time a packet was sent to give new connections 190 * priority lower than that of existing connections. 191 */ 192 lsquic_time_t last_sent; 193 lsquic_time_t deadline; 194}; 195 196 197#define OHE_PARENT(i) ((i - 1) / 2) 198#define OHE_LCHILD(i) (2 * i + 1) 199#define OHE_RCHILD(i) (2 * i + 2) 200 201 202static void 203heapify_out_heap (struct out_heap *heap, unsigned i) 204{ 205 struct out_heap_elem el; 206 unsigned smallest; 207 208 assert(i < heap->oh_nelem); 209 210 if (OHE_LCHILD(i) < heap->oh_nelem) 211 { 212 if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent < 213 heap->oh_elems[ i ].ohe_last_sent) 214 smallest = OHE_LCHILD(i); 215 else 216 smallest = i; 217 if (OHE_RCHILD(i) < heap->oh_nelem && 218 heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent < 219 heap->oh_elems[ smallest ].ohe_last_sent) 220 smallest = OHE_RCHILD(i); 221 } 222 else 223 smallest = i; 224 225 if (smallest != i) 226 { 227 el = heap->oh_elems[ smallest ]; 228 heap->oh_elems[ smallest ] = heap->oh_elems[ i ]; 229 heap->oh_elems[ i ] = el; 230 heapify_out_heap(heap, smallest); 231 } 232} 233 234 235static void 236oh_insert (struct out_heap *heap, lsquic_conn_t *conn) 237{ 238 struct out_heap_elem el; 239 unsigned nalloc, i; 240 241 if (heap->oh_nelem == heap->oh_nalloc) 242 { 243 if (0 == heap->oh_nalloc) 244 nalloc = 4; 245 else 246 nalloc = heap->oh_nalloc * 2; 247 heap->oh_elems = realloc(heap->oh_elems, 248 nalloc * sizeof(heap->oh_elems[0])); 249 if (!heap->oh_elems) 250 { /* Not much we can do here */ 251 LSQ_ERROR("realloc failed"); 252 return; 253 } 254 heap->oh_nalloc = nalloc; 255 } 256 257 heap->oh_elems[ heap->oh_nelem ].ohe_conn = conn; 258 heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent; 259 ++heap->oh_nelem; 260 261 i = heap->oh_nelem - 1; 262 while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent > 263 heap->oh_elems[ i ].ohe_last_sent) 264 { 265 el = heap->oh_elems[ OHE_PARENT(i) ]; 266 heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ]; 267 heap->oh_elems[ i ] = el; 268 i = OHE_PARENT(i); 269 } 270} 271 272 273static struct lsquic_conn * 274oh_pop (struct out_heap *heap) 275{ 276 struct lsquic_conn *conn; 277 278 assert(heap->oh_nelem); 279 280 conn = heap->oh_elems[0].ohe_conn; 281 --heap->oh_nelem; 282 if (heap->oh_nelem > 0) 283 { 284 heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ]; 285 heapify_out_heap(heap, 0); 286 } 287 288 return conn; 289} 290 291 292void 293lsquic_engine_init_settings (struct lsquic_engine_settings *settings, 294 unsigned flags) 295{ 296 memset(settings, 0, sizeof(*settings)); 297 settings->es_versions = LSQUIC_DF_VERSIONS; 298 if (flags & ENG_SERVER) 299 { 300 settings->es_cfcw = LSQUIC_DF_CFCW_SERVER; 301 settings->es_sfcw = LSQUIC_DF_SFCW_SERVER; 302 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER; 303 } 304 else 305 { 306 settings->es_cfcw = LSQUIC_DF_CFCW_CLIENT; 307 settings->es_sfcw = LSQUIC_DF_SFCW_CLIENT; 308 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT; 309 } 310 settings->es_max_streams_in = LSQUIC_DF_MAX_STREAMS_IN; 311 settings->es_idle_conn_to = LSQUIC_DF_IDLE_CONN_TO; 312 settings->es_handshake_to = LSQUIC_DF_HANDSHAKE_TO; 313 settings->es_silent_close = LSQUIC_DF_SILENT_CLOSE; 314 settings->es_max_header_list_size 315 = LSQUIC_DF_MAX_HEADER_LIST_SIZE; 316 settings->es_ua = LSQUIC_DF_UA; 317 318 settings->es_pdmd = QTAG_X509; 319 settings->es_aead = QTAG_AESG; 320 settings->es_kexs = QTAG_C255; 321 settings->es_support_push = LSQUIC_DF_SUPPORT_PUSH; 322 settings->es_support_tcid0 = LSQUIC_DF_SUPPORT_TCID0; 323 settings->es_support_nstp = LSQUIC_DF_SUPPORT_NSTP; 324 settings->es_honor_prst = LSQUIC_DF_HONOR_PRST; 325 settings->es_progress_check = LSQUIC_DF_PROGRESS_CHECK; 326 settings->es_pendrw_check = LSQUIC_DF_PENDRW_CHECK; 327 settings->es_rw_once = LSQUIC_DF_RW_ONCE; 328 settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH; 329 settings->es_pace_packets = LSQUIC_DF_PACE_PACKETS; 330} 331 332 333/* Note: if returning an error, err_buf must be valid if non-NULL */ 334int 335lsquic_engine_check_settings (const struct lsquic_engine_settings *settings, 336 unsigned flags, 337 char *err_buf, size_t err_buf_sz) 338{ 339 if (settings->es_cfcw < LSQUIC_MIN_FCW || 340 settings->es_sfcw < LSQUIC_MIN_FCW) 341 { 342 if (err_buf) 343 snprintf(err_buf, err_buf_sz, "%s", 344 "flow control window set too low"); 345 return -1; 346 } 347 if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS)) 348 { 349 if (err_buf) 350 snprintf(err_buf, err_buf_sz, "%s", 351 "No supported QUIC versions specified"); 352 return -1; 353 } 354 if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS) 355 { 356 if (err_buf) 357 snprintf(err_buf, err_buf_sz, "%s", 358 "one or more unsupported QUIC version is specified"); 359 return -1; 360 } 361 return 0; 362} 363 364 365static void 366free_packet (void *ctx, unsigned char *packet_data) 367{ 368 free(packet_data); 369} 370 371 372static void * 373malloc_buf (void *ctx, size_t size) 374{ 375 return malloc(size); 376} 377 378 379static const struct lsquic_packout_mem_if stock_pmi = 380{ 381 malloc_buf, (void(*)(void *, void *)) free_packet, 382}; 383 384 385lsquic_engine_t * 386lsquic_engine_new (unsigned flags, 387 const struct lsquic_engine_api *api) 388{ 389 lsquic_engine_t *engine; 390 int tag_buf_len; 391 char err_buf[100]; 392 393 if (!api->ea_packets_out) 394 { 395 LSQ_ERROR("packets_out callback is not specified"); 396 return NULL; 397 } 398 399 if (api->ea_settings && 400 0 != lsquic_engine_check_settings(api->ea_settings, flags, 401 err_buf, sizeof(err_buf))) 402 { 403 LSQ_ERROR("cannot create engine: %s", err_buf); 404 return NULL; 405 } 406 407 engine = calloc(1, sizeof(*engine)); 408 if (!engine) 409 return NULL; 410 if (0 != lsquic_mm_init(&engine->pub.enp_mm)) 411 { 412 free(engine); 413 return NULL; 414 } 415 if (api->ea_settings) 416 engine->pub.enp_settings = *api->ea_settings; 417 else 418 lsquic_engine_init_settings(&engine->pub.enp_settings, flags); 419 tag_buf_len = gen_ver_tags(engine->pub.enp_ver_tags_buf, 420 sizeof(engine->pub.enp_ver_tags_buf), 421 engine->pub.enp_settings.es_versions); 422 if (tag_buf_len <= 0) 423 { 424 LSQ_ERROR("cannot generate version tags buffer"); 425 free(engine); 426 return NULL; 427 } 428 engine->pub.enp_ver_tags_len = tag_buf_len; 429 430 engine->flags = flags; 431 engine->stream_if = api->ea_stream_if; 432 engine->stream_if_ctx = api->ea_stream_if_ctx; 433 engine->packets_out = api->ea_packets_out; 434 engine->packets_out_ctx = api->ea_packets_out_ctx; 435 if (api->ea_pmi) 436 { 437 engine->pub.enp_pmi = api->ea_pmi; 438 engine->pub.enp_pmi_ctx = api->ea_pmi_ctx; 439 } 440 else 441 { 442 engine->pub.enp_pmi = &stock_pmi; 443 engine->pub.enp_pmi_ctx = NULL; 444 } 445 engine->pub.enp_engine = engine; 446 TAILQ_INIT(&engine->conns_in); 447 TAILQ_INIT(&engine->conns_pend_rw); 448 conn_hash_init(&engine->full_conns, ~0); 449 engine->attq = attq_create(); 450 eng_hist_init(&engine->history); 451 engine->batch_size = INITIAL_OUT_BATCH_SIZE; 452 453 454 LSQ_INFO("instantiated engine"); 455 return engine; 456} 457 458 459static void 460grow_batch_size (struct lsquic_engine *engine) 461{ 462 engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE; 463} 464 465 466static void 467shrink_batch_size (struct lsquic_engine *engine) 468{ 469 engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE; 470} 471 472 473/* Wrapper to make sure important things occur before the connection is 474 * really destroyed. 475 */ 476static void 477destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn) 478{ 479 conn->cn_flags |= LSCONN_NEVER_PEND_RW; 480 conn->cn_if->ci_destroy(conn); 481} 482 483 484static lsquic_conn_t * 485new_full_conn_client (lsquic_engine_t *engine, const char *hostname, 486 unsigned short max_packet_size) 487{ 488 lsquic_conn_t *conn; 489 unsigned flags; 490 flags = engine->flags & (ENG_SERVER|ENG_HTTP); 491 conn = full_conn_client_new(&engine->pub, engine->stream_if, 492 engine->stream_if_ctx, flags, hostname, max_packet_size); 493 if (!conn) 494 return NULL; 495 if (0 != conn_hash_add_new(&engine->full_conns, conn)) 496 { 497 LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy", 498 conn->cn_cid); 499 destroy_conn(engine, conn); 500 return NULL; 501 } 502 assert(!(conn->cn_flags & 503 (CONN_REF_FLAGS 504 & ~LSCONN_RW_PENDING /* This flag may be set as effect of user 505 callbacks */ 506 ))); 507 conn->cn_flags |= LSCONN_HASHED; 508 return conn; 509} 510 511 512static lsquic_conn_t * 513find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 514 struct packin_parse_state *ppstate, const struct sockaddr *sa_peer, 515 void *peer_ctx) 516{ 517 lsquic_conn_t *conn; 518 519 if (lsquic_packet_in_is_prst(packet_in) 520 && !engine->pub.enp_settings.es_honor_prst) 521 { 522 LSQ_DEBUG("public reset packet: discarding"); 523 return NULL; 524 } 525 526 if (!(packet_in->pi_flags & PI_CONN_ID)) 527 { 528 LSQ_DEBUG("packet header does not have connection ID: discarding"); 529 return NULL; 530 } 531 532 conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id, NULL); 533 if (conn) 534 { 535 conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate); 536 return conn; 537 } 538 539 return conn; 540} 541 542 543static void 544add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn, 545 enum rw_reason reason) 546{ 547 int hist_idx; 548 549 TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw); 550 engine_incref_conn(conn, LSCONN_RW_PENDING); 551 552 hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); 553 conn->cn_rw_hist_buf[ hist_idx ] = reason; 554 ++conn->cn_rw_hist_idx; 555 556 if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx) 557 EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), " 558 "rw_hist: %.*s", (char) reason, 559 (int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); 560 else 561 EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')", 562 (char) reason); 563} 564 565 566#if !defined(NDEBUG) && __GNUC__ 567__attribute__((weak)) 568#endif 569void 570lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub, 571 lsquic_conn_t *conn, enum rw_reason reason) 572{ 573 if (0 == (enpub->enp_flags & ENPUB_PROC) && 574 0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW))) 575 { 576 lsquic_engine_t *engine = (lsquic_engine_t *) enpub; 577 add_conn_to_pend_rw(engine, conn, reason); 578 } 579} 580 581 582void 583lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub, 584 lsquic_conn_t *conn, lsquic_time_t tick_time) 585{ 586 lsquic_engine_t *const engine = (lsquic_engine_t *) enpub; 587 /* Instead of performing an update, we simply remove the connection from 588 * the queue and add it back. This should not happen in at the time of 589 * this writing. 590 */ 591 if (conn->cn_flags & LSCONN_ATTQ) 592 { 593 attq_remove(engine->attq, conn); 594 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 595 } 596 if (conn && !(conn->cn_flags & LSCONN_ATTQ) && 597 0 == attq_maybe_add(engine->attq, conn, tick_time)) 598 engine_incref_conn(conn, LSCONN_ATTQ); 599} 600 601 602static void 603update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn, 604 int progress_made) 605{ 606 rw_hist_idx_t hist_idx; 607 const unsigned char *empty; 608 const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check; 609 610 if (!pendrw_check) 611 return; 612 613 /* Convert previous entry to uppercase: */ 614 hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1); 615 conn->cn_rw_hist_buf[ hist_idx ] -= 0x20; 616 617 LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made); 618 if (progress_made) 619 { 620 conn->cn_noprogress_count = 0; 621 return; 622 } 623 624 EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made " 625 "no progress"); 626 ++conn->cn_noprogress_count; 627 if (conn->cn_noprogress_count <= pendrw_check) 628 return; 629 630 conn->cn_flags |= LSCONN_NEVER_PEND_RW; 631 empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY, 632 sizeof(conn->cn_rw_hist_buf)); 633 if (empty) 634 LSQ_WARN("conn %"PRIu64" noprogress count reached %u " 635 "(rw_hist: %.*s): will not put it onto Pend RW queue again", 636 conn->cn_cid, conn->cn_noprogress_count, 637 (int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); 638 else 639 { 640 hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); 641 LSQ_WARN("conn %"PRIu64" noprogress count reached %u " 642 "(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again", 643 conn->cn_cid, conn->cn_noprogress_count, 644 /* First part of history: */ 645 (int) (sizeof(conn->cn_rw_hist_buf) - hist_idx), 646 conn->cn_rw_hist_buf + hist_idx, 647 /* Second part of history: */ 648 hist_idx, conn->cn_rw_hist_buf); 649 } 650} 651 652 653/* Return 0 if packet is being processed by a connections, otherwise return 1 */ 654static int 655process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 656 struct packin_parse_state *ppstate, const struct sockaddr *sa_local, 657 const struct sockaddr *sa_peer, void *peer_ctx) 658{ 659 lsquic_conn_t *conn; 660 661 conn = find_or_create_conn(engine, packet_in, ppstate, sa_peer, peer_ctx); 662 if (!conn) 663 { 664 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 665 return 1; 666 } 667 668 if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) { 669 TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in); 670 engine_incref_conn(conn, LSCONN_HAS_INCOMING); 671 } 672 lsquic_conn_record_sockaddr(conn, sa_local, sa_peer); 673 lsquic_packet_in_upref(packet_in); 674 conn->cn_peer_ctx = peer_ctx; 675 conn->cn_if->ci_packet_in(conn, packet_in); 676 lsquic_packet_in_put(&engine->pub.enp_mm, packet_in); 677 return 0; 678} 679 680 681static int 682conn_attq_expired (const struct lsquic_engine *engine, 683 const lsquic_conn_t *conn) 684{ 685 assert(conn->cn_attq_elem); 686 return lsquic_conn_adv_time(conn) < engine->proc_time; 687} 688 689 690/* Iterator for connections with incoming packets */ 691static lsquic_conn_t * 692conn_iter_next_incoming (struct lsquic_engine *engine) 693{ 694 enum lsquic_conn_flags addl_flags; 695 lsquic_conn_t *conn; 696 while ((conn = TAILQ_FIRST(&engine->conns_in))) 697 { 698 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 699 if (conn->cn_flags & LSCONN_RW_PENDING) 700 { 701 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 702 EV_LOG_CONN_EVENT(conn->cn_cid, 703 "removed from pending RW queue (processing incoming)"); 704 } 705 if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) 706 { 707 addl_flags = LSCONN_ATTQ; 708 attq_remove(engine->attq, conn); 709 } 710 else 711 addl_flags = 0; 712 conn = engine_decref_conn(engine, conn, 713 LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); 714 if (conn) 715 break; 716 } 717 return conn; 718} 719 720 721/* Iterator for connections with that have pending read/write events */ 722static lsquic_conn_t * 723conn_iter_next_rw_pend (struct lsquic_engine *engine) 724{ 725 enum lsquic_conn_flags addl_flags; 726 lsquic_conn_t *conn; 727 while ((conn = TAILQ_FIRST(&engine->conns_pend_rw))) 728 { 729 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 730 EV_LOG_CONN_EVENT(conn->cn_cid, 731 "removed from pending RW queue (processing pending RW conns)"); 732 if (conn->cn_flags & LSCONN_HAS_INCOMING) 733 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 734 if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) 735 { 736 addl_flags = LSCONN_ATTQ; 737 attq_remove(engine->attq, conn); 738 } 739 else 740 addl_flags = 0; 741 conn = engine_decref_conn(engine, conn, 742 LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); 743 if (conn) 744 break; 745 } 746 return conn; 747} 748 749 750void 751lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine) 752{ 753 LSQ_DEBUG("process connections with incoming packets"); 754 ENGINE_IN(engine); 755 process_connections(engine, conn_iter_next_incoming); 756 assert(TAILQ_EMPTY(&engine->conns_in)); 757 ENGINE_OUT(engine); 758} 759 760 761int 762lsquic_engine_has_pend_rw (lsquic_engine_t *engine) 763{ 764 return !(engine->flags & ENG_PAST_DEADLINE) 765 && !TAILQ_EMPTY(&engine->conns_pend_rw); 766} 767 768 769void 770lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine) 771{ 772 LSQ_DEBUG("process connections with pending RW events"); 773 ENGINE_IN(engine); 774 process_connections(engine, conn_iter_next_rw_pend); 775 ENGINE_OUT(engine); 776} 777 778 779void 780lsquic_engine_destroy (lsquic_engine_t *engine) 781{ 782 lsquic_conn_t *conn; 783 784 LSQ_DEBUG("destroying engine"); 785#ifndef NDEBUG 786 engine->flags |= ENG_DTOR; 787#endif 788 789 while (engine->conns_out.oh_nelem > 0) 790 { 791 --engine->conns_out.oh_nelem; 792 conn = engine->conns_out.oh_elems[ 793 engine->conns_out.oh_nelem ].ohe_conn; 794 assert(conn->cn_flags & LSCONN_HAS_OUTGOING); 795 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 796 } 797 798 for (conn = conn_hash_first(&engine->full_conns); conn; 799 conn = conn_hash_next(&engine->full_conns)) 800 force_close_conn(engine, conn); 801 conn_hash_cleanup(&engine->full_conns); 802 803 804 attq_destroy(engine->attq); 805 806 assert(0 == engine->conns_out.oh_nelem); 807 assert(TAILQ_EMPTY(&engine->conns_pend_rw)); 808 lsquic_mm_cleanup(&engine->pub.enp_mm); 809 free(engine->conns_out.oh_elems); 810 free(engine); 811} 812 813 814#if __GNUC__ 815__attribute__((nonnull(3))) 816#endif 817static lsquic_conn_t * 818remove_from_inc_andor_pend_rw (lsquic_engine_t *engine, 819 lsquic_conn_t *conn, const char *reason) 820{ 821 assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)); 822 if (conn->cn_flags & LSCONN_HAS_INCOMING) 823 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 824 if (conn->cn_flags & LSCONN_RW_PENDING) 825 { 826 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 827 EV_LOG_CONN_EVENT(conn->cn_cid, 828 "removed from pending RW queue (%s)", reason); 829 } 830 conn = engine_decref_conn(engine, conn, 831 LSCONN_HAS_INCOMING|LSCONN_RW_PENDING); 832 assert(conn); 833 return conn; 834} 835 836 837static lsquic_conn_t * 838conn_iter_next_one (lsquic_engine_t *engine) 839{ 840 lsquic_conn_t *conn = engine->iter_state.one.conn; 841 if (conn) 842 { 843 if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) 844 conn = remove_from_inc_andor_pend_rw(engine, conn, "connect"); 845 if (conn && (conn->cn_flags & LSCONN_ATTQ) && 846 conn_attq_expired(engine, conn)) 847 { 848 attq_remove(engine->attq, conn); 849 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 850 } 851 engine->iter_state.one.conn = NULL; 852 } 853 return conn; 854} 855 856 857lsquic_conn_t * 858lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa, 859 void *peer_ctx, lsquic_conn_ctx_t *conn_ctx, 860 const char *hostname, unsigned short max_packet_size) 861{ 862 lsquic_conn_t *conn; 863 864 if (engine->flags & ENG_SERVER) 865 { 866 LSQ_ERROR("`%s' must only be called in client mode", __func__); 867 return NULL; 868 } 869 870 if (0 == max_packet_size) 871 { 872 switch (peer_sa->sa_family) 873 { 874 case AF_INET: 875 max_packet_size = QUIC_MAX_IPv4_PACKET_SZ; 876 break; 877 default: 878 max_packet_size = QUIC_MAX_IPv6_PACKET_SZ; 879 break; 880 } 881 } 882 883 conn = new_full_conn_client(engine, hostname, max_packet_size); 884 if (!conn) 885 return NULL; 886 ENGINE_IN(engine); 887 lsquic_conn_record_peer_sa(conn, peer_sa); 888 conn->cn_peer_ctx = peer_ctx; 889 lsquic_conn_set_ctx(conn, conn_ctx); 890 engine->iter_state.one.conn = conn; 891 full_conn_client_call_on_new(conn); 892 process_connections(engine, conn_iter_next_one); 893 ENGINE_OUT(engine); 894 return conn; 895} 896 897 898static void 899remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn) 900{ 901 conn_hash_remove(&engine->full_conns, conn); 902 (void) engine_decref_conn(engine, conn, LSCONN_HASHED); 903} 904 905 906static void 907refflags2str (enum lsquic_conn_flags flags, char s[7]) 908{ 909 *s = 'C'; s += !!(flags & LSCONN_CLOSING); 910 *s = 'H'; s += !!(flags & LSCONN_HASHED); 911 *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING); 912 *s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING); 913 *s = 'R'; s += !!(flags & LSCONN_RW_PENDING); 914 *s = 'A'; s += !!(flags & LSCONN_ATTQ); 915 *s = '\0'; 916} 917 918 919static void 920engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag) 921{ 922 char str[7]; 923 assert(flag & CONN_REF_FLAGS); 924 assert(!(conn->cn_flags & flag)); 925 conn->cn_flags |= flag; 926 LSQ_DEBUG("incref conn %"PRIu64", now '%s'", conn->cn_cid, 927 (refflags2str(conn->cn_flags, str), str)); 928} 929 930 931static lsquic_conn_t * 932engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 933 enum lsquic_conn_flags flags) 934{ 935 char str[7]; 936 assert(flags & CONN_REF_FLAGS); 937 assert(conn->cn_flags & flags); 938#ifndef NDEBUG 939 if (flags & LSCONN_CLOSING) 940 assert(0 == (conn->cn_flags & LSCONN_HASHED)); 941#endif 942 conn->cn_flags &= ~flags; 943 LSQ_DEBUG("decref conn %"PRIu64", now '%s'", conn->cn_cid, 944 (refflags2str(conn->cn_flags, str), str)); 945 if (0 == (conn->cn_flags & CONN_REF_FLAGS)) 946 { 947 eng_hist_inc(&engine->history, 0, sl_del_full_conns); 948 destroy_conn(engine, conn); 949 return NULL; 950 } 951 else 952 return conn; 953} 954 955 956/* This is not a general-purpose function. Only call from engine dtor. */ 957static void 958force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn) 959{ 960 assert(engine->flags & ENG_DTOR); 961 const enum lsquic_conn_flags flags = conn->cn_flags; 962 assert(conn->cn_flags & CONN_REF_FLAGS); 963 assert(!(flags & LSCONN_HAS_OUTGOING)); /* Should be removed already */ 964 assert(!(flags & LSCONN_CLOSING)); /* It is in transient queue? */ 965 if (flags & LSCONN_HAS_INCOMING) 966 { 967 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 968 (void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING); 969 } 970 if (flags & LSCONN_RW_PENDING) 971 { 972 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 973 EV_LOG_CONN_EVENT(conn->cn_cid, 974 "removed from pending RW queue (engine destruction)"); 975 (void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING); 976 } 977 if (flags & LSCONN_ATTQ) 978 attq_remove(engine->attq, conn); 979 if (flags & LSCONN_HASHED) 980 remove_conn_from_hash(engine, conn); 981} 982 983 984/* Iterator for all connections. 985 * Returned connections are removed from the Incoming, Pending RW Event, 986 * and Advisory Tick Time queues if necessary. 987 */ 988static lsquic_conn_t * 989conn_iter_next_all (struct lsquic_engine *engine) 990{ 991 lsquic_conn_t *conn; 992 993 conn = conn_hash_next(&engine->full_conns); 994 995 if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))) 996 conn = remove_from_inc_andor_pend_rw(engine, conn, "process all"); 997 if (conn && (conn->cn_flags & LSCONN_ATTQ) 998 && conn_attq_expired(engine, conn)) 999 { 1000 attq_remove(engine->attq, conn); 1001 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 1002 } 1003 1004 return conn; 1005} 1006 1007 1008static lsquic_conn_t * 1009conn_iter_next_attq (struct lsquic_engine *engine) 1010{ 1011 lsquic_conn_t *conn; 1012 1013 conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff); 1014 if (conn) 1015 { 1016 assert(conn->cn_flags & LSCONN_ATTQ); 1017 if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) 1018 conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq"); 1019 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 1020 } 1021 1022 return conn; 1023} 1024 1025 1026void 1027lsquic_engine_proc_all (lsquic_engine_t *engine) 1028{ 1029 ENGINE_IN(engine); 1030 /* We poke each connection every time as initial implementation. If it 1031 * proves to be too inefficient, we will need to figure out 1032 * a) when to stop processing; and 1033 * b) how to remember state between calls. 1034 */ 1035 conn_hash_reset_iter(&engine->full_conns); 1036 process_connections(engine, conn_iter_next_all); 1037 ENGINE_OUT(engine); 1038} 1039 1040 1041void 1042lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine) 1043{ 1044 lsquic_time_t prev_min, now; 1045 1046 now = lsquic_time_now(); 1047 if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG)) 1048 { 1049 const lsquic_time_t *expected_time; 1050 int64_t diff; 1051 expected_time = attq_next_time(engine->attq); 1052 if (expected_time) 1053 diff = *expected_time - now; 1054 else 1055 diff = -1; 1056 LSQ_DEBUG("process connections in attq; time diff: %"PRIi64, diff); 1057 } 1058 1059 ENGINE_IN(engine); 1060 prev_min = attq_set_min(engine->attq, now); /* Prevent infinite loop */ 1061 engine->iter_state.attq.cutoff = now; 1062 process_connections(engine, conn_iter_next_attq); 1063 attq_set_min(engine->attq, prev_min); /* Restore previos value */ 1064 ENGINE_OUT(engine); 1065} 1066 1067 1068static int 1069generate_header (const lsquic_packet_out_t *packet_out, 1070 const struct parse_funcs *pf, lsquic_cid_t cid, 1071 unsigned char *buf, size_t bufsz) 1072{ 1073 return pf->pf_gen_reg_pkt_header(buf, bufsz, 1074 packet_out->po_flags & PO_CONN_ID ? &cid : NULL, 1075 packet_out->po_flags & PO_VERSION ? &packet_out->po_ver_tag : NULL, 1076 packet_out->po_flags & PO_NONCE ? packet_out->po_nonce : NULL, 1077 packet_out->po_packno, lsquic_packet_out_packno_bits(packet_out)); 1078} 1079 1080 1081static ssize_t 1082really_encrypt_packet (const lsquic_conn_t *conn, 1083 const lsquic_packet_out_t *packet_out, 1084 unsigned char *buf, size_t bufsz) 1085{ 1086 int enc, header_sz, is_hello_packet; 1087 size_t packet_sz; 1088 unsigned char header_buf[QUIC_MAX_PUBHDR_SZ]; 1089 1090 header_sz = generate_header(packet_out, conn->cn_pf, conn->cn_cid, 1091 header_buf, sizeof(header_buf)); 1092 if (header_sz < 0) 1093 return -1; 1094 1095 is_hello_packet = !!(packet_out->po_flags & PO_HELLO); 1096 enc = conn->cn_esf->esf_encrypt(conn->cn_enc_session, conn->cn_version, 0, 1097 packet_out->po_packno, header_buf, header_sz, 1098 packet_out->po_data, packet_out->po_data_sz, 1099 buf, bufsz, &packet_sz, is_hello_packet); 1100 if (0 == enc) 1101 { 1102 LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %u bytes, " 1103 "ciphertext is %zd bytes", 1104 packet_out->po_packno, 1105 lsquic_po_header_length(packet_out->po_flags) + 1106 packet_out->po_data_sz, 1107 packet_sz); 1108 return packet_sz; 1109 } 1110 else 1111 return -1; 1112} 1113 1114 1115static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, } 1116encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn, 1117 lsquic_packet_out_t *packet_out) 1118{ 1119 ssize_t enc_sz; 1120 size_t bufsz; 1121 unsigned char *buf; 1122 1123 bufsz = lsquic_po_header_length(packet_out->po_flags) + 1124 packet_out->po_data_sz + QUIC_PACKET_HASH_SZ; 1125 buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz); 1126 if (!buf) 1127 { 1128 LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd", 1129 bufsz); 1130 return ENCPA_NOMEM; 1131 } 1132 1133 enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz); 1134 1135 if (enc_sz < 0) 1136 { 1137 engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf); 1138 return ENCPA_BADCRYPT; 1139 } 1140 1141 packet_out->po_enc_data = buf; 1142 packet_out->po_enc_data_sz = enc_sz; 1143 packet_out->po_flags |= PO_ENCRYPTED; 1144 1145 return ENCPA_OK; 1146} 1147 1148 1149struct out_batch 1150{ 1151 lsquic_conn_t *conns [MAX_OUT_BATCH_SIZE]; 1152 lsquic_packet_out_t *packets[MAX_OUT_BATCH_SIZE]; 1153 struct lsquic_out_spec outs [MAX_OUT_BATCH_SIZE]; 1154}; 1155 1156 1157STAILQ_HEAD(closed_conns, lsquic_conn); 1158 1159 1160struct conns_out_iter 1161{ 1162 struct out_heap *coi_heap; 1163 TAILQ_HEAD(, lsquic_conn) coi_active_list, 1164 coi_inactive_list; 1165 lsquic_conn_t *coi_next; 1166#ifndef NDEBUG 1167 lsquic_time_t coi_last_sent; 1168#endif 1169}; 1170 1171 1172static void 1173coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine) 1174{ 1175 iter->coi_heap = &engine->conns_out; 1176 iter->coi_next = NULL; 1177 TAILQ_INIT(&iter->coi_active_list); 1178 TAILQ_INIT(&iter->coi_inactive_list); 1179#ifndef NDEBUG 1180 iter->coi_last_sent = 0; 1181#endif 1182} 1183 1184 1185static lsquic_conn_t * 1186coi_next (struct conns_out_iter *iter) 1187{ 1188 lsquic_conn_t *conn; 1189 1190 if (iter->coi_heap->oh_nelem > 0) 1191 { 1192 conn = oh_pop(iter->coi_heap); 1193 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1194 conn->cn_flags |= LSCONN_COI_ACTIVE; 1195#ifndef NDEBUG 1196 if (iter->coi_last_sent) 1197 assert(iter->coi_last_sent <= conn->cn_last_sent); 1198 iter->coi_last_sent = conn->cn_last_sent; 1199#endif 1200 return conn; 1201 } 1202 else if (!TAILQ_EMPTY(&iter->coi_active_list)) 1203 { 1204 conn = iter->coi_next; 1205 if (!conn) 1206 conn = TAILQ_FIRST(&iter->coi_active_list); 1207 if (conn) 1208 iter->coi_next = TAILQ_NEXT(conn, cn_next_out); 1209 return conn; 1210 } 1211 else 1212 return NULL; 1213} 1214 1215 1216static void 1217coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1218{ 1219 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1220 { 1221 assert(!TAILQ_EMPTY(&iter->coi_active_list)); 1222 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1223 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1224 TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out); 1225 conn->cn_flags |= LSCONN_COI_INACTIVE; 1226 } 1227} 1228 1229 1230static void 1231coi_remove (struct conns_out_iter *iter, lsquic_conn_t *conn) 1232{ 1233 assert(conn->cn_flags & LSCONN_COI_ACTIVE); 1234 if (conn->cn_flags & LSCONN_COI_ACTIVE) 1235 { 1236 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1237 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1238 } 1239} 1240 1241 1242static void 1243coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1244{ 1245 assert(conn->cn_flags & LSCONN_COI_INACTIVE); 1246 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1247 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1248 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1249 conn->cn_flags |= LSCONN_COI_ACTIVE; 1250} 1251 1252 1253static void 1254coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine) 1255{ 1256 lsquic_conn_t *conn; 1257 while ((conn = TAILQ_FIRST(&iter->coi_active_list))) 1258 { 1259 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1260 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1261 oh_insert(iter->coi_heap, conn); 1262 } 1263 while ((conn = TAILQ_FIRST(&iter->coi_inactive_list))) 1264 { 1265 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1266 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1267 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 1268 } 1269} 1270 1271 1272static unsigned 1273send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter, 1274 struct out_batch *batch, unsigned n_to_send) 1275{ 1276 int n_sent, i; 1277 lsquic_time_t now; 1278 1279 /* Set sent time before the write to avoid underestimating RTT */ 1280 now = lsquic_time_now(); 1281 for (i = 0; i < (int) n_to_send; ++i) 1282 batch->packets[i]->po_sent = now; 1283 n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs, 1284 n_to_send); 1285 if (n_sent >= 0) 1286 LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send); 1287 else 1288 { 1289 LSQ_DEBUG("packets out returned an error: %s", strerror(errno)); 1290 n_sent = 0; 1291 } 1292 if (n_sent > 0) 1293 engine->last_sent = now + n_sent; 1294 for (i = 0; i < n_sent; ++i) 1295 { 1296 eng_hist_inc(&engine->history, now, sl_packets_out); 1297 EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1298 batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i], 1299 batch->packets[i]); 1300 /* `i' is added to maintain relative order */ 1301 batch->conns[i]->cn_last_sent = now + i; 1302 /* Release packet out buffer as soon as the packet is sent 1303 * successfully. If not successfully sent, we hold on to 1304 * this buffer until the packet sending is attempted again 1305 * or until it times out and regenerated. 1306 */ 1307 if (batch->packets[i]->po_flags & PO_ENCRYPTED) 1308 { 1309 batch->packets[i]->po_flags &= ~PO_ENCRYPTED; 1310 engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, 1311 batch->packets[i]->po_enc_data); 1312 batch->packets[i]->po_enc_data = NULL; /* JIC */ 1313 } 1314 } 1315 if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT)) 1316 for ( ; i < (int) n_to_send; ++i) 1317 EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1318 /* Return packets to the connection in reverse order so that the packet 1319 * ordering is maintained. 1320 */ 1321 for (i = (int) n_to_send - 1; i >= n_sent; --i) 1322 { 1323 batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i], 1324 batch->packets[i]); 1325 if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT))) 1326 coi_reactivate(conns_iter, batch->conns[i]); 1327 } 1328 return n_sent; 1329} 1330 1331 1332/* Return 1 if went past deadline, 0 otherwise */ 1333static int 1334check_deadline (lsquic_engine_t *engine) 1335{ 1336 if (engine->pub.enp_settings.es_proc_time_thresh && 1337 lsquic_time_now() > engine->deadline) 1338 { 1339 LSQ_INFO("went past threshold of %u usec, stop sending", 1340 engine->pub.enp_settings.es_proc_time_thresh); 1341 engine->flags |= ENG_PAST_DEADLINE; 1342 return 1; 1343 } 1344 else 1345 return 0; 1346} 1347 1348 1349static void 1350send_packets_out (struct lsquic_engine *engine, 1351 struct closed_conns *closed_conns) 1352{ 1353 unsigned n, w, n_sent, n_batches_sent; 1354 lsquic_packet_out_t *packet_out; 1355 lsquic_conn_t *conn; 1356 struct out_batch batch; 1357 struct conns_out_iter conns_iter; 1358 int shrink, deadline_exceeded; 1359 1360 coi_init(&conns_iter, engine); 1361 n_batches_sent = 0; 1362 n_sent = 0, n = 0; 1363 shrink = 0; 1364 deadline_exceeded = 0; 1365 1366 while ((conn = coi_next(&conns_iter))) 1367 { 1368 packet_out = conn->cn_if->ci_next_packet_to_send(conn); 1369 if (!packet_out) { 1370 LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64, 1371 conn->cn_cid); 1372 coi_deactivate(&conns_iter, conn); 1373 continue; 1374 } 1375 if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT))) 1376 { 1377 switch (encrypt_packet(engine, conn, packet_out)) 1378 { 1379 case ENCPA_NOMEM: 1380 /* Send what we have and wait for a more opportune moment */ 1381 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1382 goto end_for; 1383 case ENCPA_BADCRYPT: 1384 /* This is pretty bad: close connection immediately */ 1385 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1386 LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid); 1387 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1388 { 1389 if (!(conn->cn_flags & LSCONN_CLOSING)) 1390 { 1391 STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn); 1392 engine_incref_conn(conn, LSCONN_CLOSING); 1393 if (conn->cn_flags & LSCONN_HASHED) 1394 remove_conn_from_hash(engine, conn); 1395 } 1396 coi_remove(&conns_iter, conn); 1397 } 1398 continue; 1399 case ENCPA_OK: 1400 break; 1401 } 1402 } 1403 LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64, 1404 packet_out->po_packno, conn->cn_cid); 1405 assert(conn->cn_flags & LSCONN_HAS_PEER_SA); 1406 if (packet_out->po_flags & PO_ENCRYPTED) 1407 { 1408 batch.outs[n].buf = packet_out->po_enc_data; 1409 batch.outs[n].sz = packet_out->po_enc_data_sz; 1410 } 1411 else 1412 { 1413 batch.outs[n].buf = packet_out->po_data; 1414 batch.outs[n].sz = packet_out->po_data_sz; 1415 } 1416 batch.outs [n].peer_ctx = conn->cn_peer_ctx; 1417 batch.outs [n].local_sa = (struct sockaddr *) conn->cn_local_addr; 1418 batch.outs [n].dest_sa = (struct sockaddr *) conn->cn_peer_addr; 1419 batch.conns [n] = conn; 1420 batch.packets[n] = packet_out; 1421 ++n; 1422 if (n == engine->batch_size) 1423 { 1424 n = 0; 1425 w = send_batch(engine, &conns_iter, &batch, engine->batch_size); 1426 ++n_batches_sent; 1427 n_sent += w; 1428 if (w < engine->batch_size) 1429 { 1430 shrink = 1; 1431 break; 1432 } 1433 deadline_exceeded = check_deadline(engine); 1434 if (deadline_exceeded) 1435 break; 1436 grow_batch_size(engine); 1437 } 1438 } 1439 end_for: 1440 1441 if (n > 0) { 1442 w = send_batch(engine, &conns_iter, &batch, n); 1443 n_sent += w; 1444 shrink = w < n; 1445 ++n_batches_sent; 1446 deadline_exceeded = check_deadline(engine); 1447 } 1448 1449 if (shrink) 1450 shrink_batch_size(engine); 1451 else if (n_batches_sent > 1 && !deadline_exceeded) 1452 grow_batch_size(engine); 1453 1454 coi_reheap(&conns_iter, engine); 1455 1456 LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s"); 1457} 1458 1459 1460int 1461lsquic_engine_has_unsent_packets (lsquic_engine_t *engine) 1462{ 1463 return !(engine->flags & ENG_PAST_DEADLINE) 1464 && ( engine->conns_out.oh_nelem > 0 1465 ) 1466 ; 1467} 1468 1469 1470static void 1471reset_deadline (lsquic_engine_t *engine, lsquic_time_t now) 1472{ 1473 engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh; 1474 engine->flags &= ~ENG_PAST_DEADLINE; 1475} 1476 1477 1478/* TODO: this is a user-facing function, account for load */ 1479void 1480lsquic_engine_send_unsent_packets (lsquic_engine_t *engine) 1481{ 1482 lsquic_conn_t *conn; 1483 struct closed_conns closed_conns; 1484 1485 STAILQ_INIT(&closed_conns); 1486 reset_deadline(engine, lsquic_time_now()); 1487 1488 send_packets_out(engine, &closed_conns); 1489 1490 while ((conn = STAILQ_FIRST(&closed_conns))) { 1491 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1492 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1493 } 1494 1495} 1496 1497 1498static void 1499process_connections (lsquic_engine_t *engine, conn_iter_f next_conn) 1500{ 1501 lsquic_conn_t *conn; 1502 enum tick_st tick_st; 1503 lsquic_time_t now = lsquic_time_now(); 1504 struct closed_conns closed_conns; 1505 1506 engine->proc_time = now; 1507 eng_hist_tick(&engine->history, now); 1508 1509 STAILQ_INIT(&closed_conns); 1510 reset_deadline(engine, now); 1511 1512 while ((conn = next_conn(engine))) 1513 { 1514 tick_st = conn->cn_if->ci_tick(conn, now); 1515 if (conn_iter_next_rw_pend == next_conn) 1516 update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS); 1517 if (tick_st & TICK_SEND) 1518 { 1519 if (!(conn->cn_flags & LSCONN_HAS_OUTGOING)) 1520 { 1521 oh_insert(&engine->conns_out, conn); 1522 engine_incref_conn(conn, LSCONN_HAS_OUTGOING); 1523 } 1524 } 1525 if (tick_st & TICK_CLOSE) 1526 { 1527 STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn); 1528 engine_incref_conn(conn, LSCONN_CLOSING); 1529 if (conn->cn_flags & LSCONN_HASHED) 1530 remove_conn_from_hash(engine, conn); 1531 } 1532 } 1533 1534 if (lsquic_engine_has_unsent_packets(engine)) 1535 send_packets_out(engine, &closed_conns); 1536 1537 while ((conn = STAILQ_FIRST(&closed_conns))) { 1538 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1539 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1540 } 1541 1542} 1543 1544 1545/* Return 0 if packet is being processed by a real connection, 1 if the 1546 * packet was processed, but not by a connection, and -1 on error. 1547 */ 1548int 1549lsquic_engine_packet_in (lsquic_engine_t *engine, 1550 const unsigned char *packet_in_data, size_t packet_in_size, 1551 const struct sockaddr *sa_local, const struct sockaddr *sa_peer, 1552 void *peer_ctx) 1553{ 1554 struct packin_parse_state ppstate; 1555 lsquic_packet_in_t *packet_in; 1556 1557 if (packet_in_size > QUIC_MAX_PACKET_SZ) 1558 { 1559 LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming " 1560 "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ); 1561 errno = E2BIG; 1562 return -1; 1563 } 1564 1565 packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm); 1566 if (!packet_in) 1567 return -1; 1568 1569 /* Library does not modify packet_in_data, it is not referenced after 1570 * this function returns and subsequent release of pi_data is guarded 1571 * by PI_OWN_DATA flag. 1572 */ 1573 packet_in->pi_data = (unsigned char *) packet_in_data; 1574 if (0 != parse_packet_in_begin(packet_in, packet_in_size, 1575 engine->flags & ENG_SERVER, &ppstate)) 1576 { 1577 LSQ_DEBUG("Cannot parse incoming packet's header"); 1578 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 1579 errno = EINVAL; 1580 return -1; 1581 } 1582 1583 packet_in->pi_received = lsquic_time_now(); 1584 eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in); 1585 return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer, 1586 peer_ctx); 1587} 1588 1589 1590#if __GNUC__ && !defined(NDEBUG) 1591__attribute__((weak)) 1592#endif 1593unsigned 1594lsquic_engine_quic_versions (const lsquic_engine_t *engine) 1595{ 1596 return engine->pub.enp_settings.es_versions; 1597} 1598 1599 1600int 1601lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff) 1602{ 1603 const lsquic_time_t *next_time; 1604 lsquic_time_t now; 1605 1606 next_time = attq_next_time(engine->attq); 1607 if (!next_time) 1608 return 0; 1609 1610 now = lsquic_time_now(); 1611 *diff = (int) ((int64_t) *next_time - (int64_t) now); 1612 return 1; 1613} 1614 1615 1616unsigned 1617lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now) 1618{ 1619 lsquic_time_t now; 1620 now = lsquic_time_now(); 1621 if (from_now < 0) 1622 now -= from_now; 1623 else 1624 now += from_now; 1625 return attq_count_before(engine->attq, now); 1626} 1627 1628 1629