lsquic_engine.c revision 16a9b66a
1/* Copyright (c) 2017 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_engine.c - QUIC engine 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdint.h> 10#include <stdio.h> 11#include <stdlib.h> 12#include <string.h> 13#include <sys/queue.h> 14#include <sys/time.h> 15#include <time.h> 16#include <netinet/in.h> 17#include <sys/types.h> 18#include <sys/stat.h> 19#include <fcntl.h> 20#include <unistd.h> 21#include <netdb.h> 22 23 24 25#include "lsquic.h" 26#include "lsquic_types.h" 27#include "lsquic_alarmset.h" 28#include "lsquic_parse.h" 29#include "lsquic_packet_in.h" 30#include "lsquic_packet_out.h" 31#include "lsquic_senhist.h" 32#include "lsquic_rtt.h" 33#include "lsquic_cubic.h" 34#include "lsquic_pacer.h" 35#include "lsquic_send_ctl.h" 36#include "lsquic_set.h" 37#include "lsquic_conn_flow.h" 38#include "lsquic_sfcw.h" 39#include "lsquic_stream.h" 40#include "lsquic_conn.h" 41#include "lsquic_full_conn.h" 42#include "lsquic_util.h" 43#include "lsquic_qtags.h" 44#include "lsquic_str.h" 45#include "lsquic_handshake.h" 46#include "lsquic_mm.h" 47#include "lsquic_conn_hash.h" 48#include "lsquic_engine_public.h" 49#include "lsquic_eng_hist.h" 50#include "lsquic_ev_log.h" 51#include "lsquic_version.h" 52#include "lsquic_hash.h" 53#include "lsquic_attq.h" 54 55#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE 56#include "lsquic_logger.h" 57 58 59/* The batch of outgoing packets grows and shrinks dynamically */ 60#define MAX_OUT_BATCH_SIZE 1024 61#define MIN_OUT_BATCH_SIZE 256 62#define INITIAL_OUT_BATCH_SIZE 512 63 64typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *); 65 66static void 67process_connections (struct lsquic_engine *engine, conn_iter_f iter); 68 69static void 70engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag); 71 72static lsquic_conn_t * 73engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 74 enum lsquic_conn_flags flag); 75 76static void 77force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn); 78 79/* Nested calls to LSQUIC are not supported */ 80#define ENGINE_IN(e) do { \ 81 assert(!((e)->pub.enp_flags & ENPUB_PROC)); \ 82 (e)->pub.enp_flags |= ENPUB_PROC; \ 83} while (0) 84 85#define ENGINE_OUT(e) do { \ 86 assert((e)->pub.enp_flags & ENPUB_PROC); \ 87 (e)->pub.enp_flags &= ~ENPUB_PROC; \ 88} while (0) 89 90/* A connection can be referenced from one of six places: 91 * 92 * 1. Connection hash: a connection starts its life in one of those. 93 * 94 * 2. Outgoing queue. 95 * 96 * 3. Incoming queue. 97 * 98 * 4. Pending RW Events queue. 99 * 100 * 5. Advisory Tick Time queue. 101 * 102 * 6. Closing connections queue. This is a transient queue -- it only 103 * exists for the duration of process_connections() function call. 104 * 105 * The idea is to destroy the connection when it is no longer referenced. 106 * For example, a connection tick may return TICK_SEND|TICK_CLOSE. In 107 * that case, the connection is referenced from two places: (2) and (6). 108 * After its packets are sent, it is only referenced in (6), and at the 109 * end of the function call, when it is removed from (6), reference count 110 * goes to zero and the connection is destroyed. If not all packets can 111 * be sent, at the end of the function call, the connection is referenced 112 * by (2) and will only be removed once all outgoing packets have been 113 * sent. 114 */ 115#define CONN_REF_FLAGS (LSCONN_HASHED \ 116 |LSCONN_HAS_OUTGOING \ 117 |LSCONN_HAS_INCOMING \ 118 |LSCONN_RW_PENDING \ 119 |LSCONN_CLOSING \ 120 |LSCONN_ATTQ) 121 122 123struct out_heap_elem 124{ 125 struct lsquic_conn *ohe_conn; 126 lsquic_time_t ohe_last_sent; 127}; 128 129 130struct out_heap 131{ 132 struct out_heap_elem *oh_elems; 133 unsigned oh_nalloc, 134 oh_nelem; 135}; 136 137 138 139 140struct lsquic_engine 141{ 142 struct lsquic_engine_public pub; 143 enum { 144 ENG_SERVER = LSENG_SERVER, 145 ENG_HTTP = LSENG_HTTP, 146 ENG_COOLDOWN = (1 << 7), /* Cooldown: no new connections */ 147 ENG_PAST_DEADLINE 148 = (1 << 8), /* Previous call to a processing 149 * function went past time threshold. 150 */ 151#ifndef NDEBUG 152 ENG_DTOR = (1 << 26), /* Engine destructor */ 153#endif 154 } flags; 155 const struct lsquic_stream_if *stream_if; 156 void *stream_if_ctx; 157 lsquic_packets_out_f packets_out; 158 void *packets_out_ctx; 159 void *bad_handshake_ctx; 160 struct conn_hash full_conns; 161 TAILQ_HEAD(, lsquic_conn) conns_in, conns_pend_rw; 162 struct out_heap conns_out; 163 /* Use a union because only one iterator is being used at any one time */ 164 union { 165 struct { 166 /* This iterator does not have any state: it uses `conns_in' */ 167 } conn_in; 168 struct { 169 /* This iterator does not have any state: it uses `conns_pend_rw' */ 170 } rw_pend; 171 struct { 172 /* Iterator state to process connections in Advisory Tick Time 173 * queue. 174 */ 175 lsquic_time_t cutoff; 176 } attq; 177 struct { 178 /* Iterator state to process all connections */ 179 } all; 180 struct { 181 lsquic_conn_t *conn; 182 } one; 183 } iter_state; 184 struct eng_hist history; 185 unsigned batch_size; 186 unsigned time_until_desired_tick; 187 struct attq *attq; 188 lsquic_time_t proc_time; 189 /* Track time last time a packet was sent to give new connections 190 * priority lower than that of existing connections. 191 */ 192 lsquic_time_t last_sent; 193 lsquic_time_t deadline; 194}; 195 196 197#define OHE_PARENT(i) ((i - 1) / 2) 198#define OHE_LCHILD(i) (2 * i + 1) 199#define OHE_RCHILD(i) (2 * i + 2) 200 201 202static void 203heapify_out_heap (struct out_heap *heap, unsigned i) 204{ 205 struct out_heap_elem el; 206 unsigned smallest; 207 208 assert(i < heap->oh_nelem); 209 210 if (OHE_LCHILD(i) < heap->oh_nelem) 211 { 212 if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent < 213 heap->oh_elems[ i ].ohe_last_sent) 214 smallest = OHE_LCHILD(i); 215 else 216 smallest = i; 217 if (OHE_RCHILD(i) < heap->oh_nelem && 218 heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent < 219 heap->oh_elems[ smallest ].ohe_last_sent) 220 smallest = OHE_RCHILD(i); 221 } 222 else 223 smallest = i; 224 225 if (smallest != i) 226 { 227 el = heap->oh_elems[ smallest ]; 228 heap->oh_elems[ smallest ] = heap->oh_elems[ i ]; 229 heap->oh_elems[ i ] = el; 230 heapify_out_heap(heap, smallest); 231 } 232} 233 234 235static void 236oh_insert (struct out_heap *heap, lsquic_conn_t *conn) 237{ 238 struct out_heap_elem el; 239 unsigned nalloc, i; 240 241 if (heap->oh_nelem == heap->oh_nalloc) 242 { 243 if (0 == heap->oh_nalloc) 244 nalloc = 4; 245 else 246 nalloc = heap->oh_nalloc * 2; 247 heap->oh_elems = realloc(heap->oh_elems, 248 nalloc * sizeof(heap->oh_elems[0])); 249 if (!heap->oh_elems) 250 { /* Not much we can do here */ 251 LSQ_ERROR("realloc failed"); 252 return; 253 } 254 heap->oh_nalloc = nalloc; 255 } 256 257 heap->oh_elems[ heap->oh_nelem ].ohe_conn = conn; 258 heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent; 259 ++heap->oh_nelem; 260 261 i = heap->oh_nelem - 1; 262 while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent > 263 heap->oh_elems[ i ].ohe_last_sent) 264 { 265 el = heap->oh_elems[ OHE_PARENT(i) ]; 266 heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ]; 267 heap->oh_elems[ i ] = el; 268 i = OHE_PARENT(i); 269 } 270} 271 272 273static struct lsquic_conn * 274oh_pop (struct out_heap *heap) 275{ 276 struct lsquic_conn *conn; 277 278 assert(heap->oh_nelem); 279 280 conn = heap->oh_elems[0].ohe_conn; 281 --heap->oh_nelem; 282 if (heap->oh_nelem > 0) 283 { 284 heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ]; 285 heapify_out_heap(heap, 0); 286 } 287 288 return conn; 289} 290 291 292void 293lsquic_engine_init_settings (struct lsquic_engine_settings *settings, 294 unsigned flags) 295{ 296 memset(settings, 0, sizeof(*settings)); 297 settings->es_versions = LSQUIC_DF_VERSIONS; 298 if (flags & ENG_SERVER) 299 { 300 settings->es_cfcw = LSQUIC_DF_CFCW_SERVER; 301 settings->es_sfcw = LSQUIC_DF_SFCW_SERVER; 302 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER; 303 } 304 else 305 { 306 settings->es_cfcw = LSQUIC_DF_CFCW_CLIENT; 307 settings->es_sfcw = LSQUIC_DF_SFCW_CLIENT; 308 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT; 309 } 310 settings->es_max_streams_in = LSQUIC_DF_MAX_STREAMS_IN; 311 settings->es_idle_conn_to = LSQUIC_DF_IDLE_CONN_TO; 312 settings->es_handshake_to = LSQUIC_DF_HANDSHAKE_TO; 313 settings->es_silent_close = LSQUIC_DF_SILENT_CLOSE; 314 settings->es_max_header_list_size 315 = LSQUIC_DF_MAX_HEADER_LIST_SIZE; 316 settings->es_ua = LSQUIC_DF_UA; 317 318 settings->es_pdmd = QTAG_X509; 319 settings->es_aead = QTAG_AESG; 320 settings->es_kexs = QTAG_C255; 321 settings->es_support_push = LSQUIC_DF_SUPPORT_PUSH; 322 settings->es_support_tcid0 = LSQUIC_DF_SUPPORT_TCID0; 323 settings->es_support_nstp = LSQUIC_DF_SUPPORT_NSTP; 324 settings->es_honor_prst = LSQUIC_DF_HONOR_PRST; 325 settings->es_progress_check = LSQUIC_DF_PROGRESS_CHECK; 326 settings->es_pendrw_check = LSQUIC_DF_PENDRW_CHECK; 327 settings->es_rw_once = LSQUIC_DF_RW_ONCE; 328 settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH; 329 settings->es_pace_packets = LSQUIC_DF_PACE_PACKETS; 330} 331 332 333/* Note: if returning an error, err_buf must be valid if non-NULL */ 334int 335lsquic_engine_check_settings (const struct lsquic_engine_settings *settings, 336 unsigned flags, 337 char *err_buf, size_t err_buf_sz) 338{ 339 if (settings->es_cfcw < LSQUIC_MIN_FCW || 340 settings->es_sfcw < LSQUIC_MIN_FCW) 341 { 342 if (err_buf) 343 snprintf(err_buf, err_buf_sz, "%s", 344 "flow control window set too low"); 345 return -1; 346 } 347 if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS)) 348 { 349 if (err_buf) 350 snprintf(err_buf, err_buf_sz, "%s", 351 "No supported QUIC versions specified"); 352 return -1; 353 } 354 if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS) 355 { 356 if (err_buf) 357 snprintf(err_buf, err_buf_sz, "%s", 358 "one or more unsupported QUIC version is specified"); 359 return -1; 360 } 361 return 0; 362} 363 364 365static void 366free_packet (void *ctx, unsigned char *packet_data) 367{ 368 free(packet_data); 369} 370 371 372static void * 373malloc_buf (void *ctx, size_t size) 374{ 375 return malloc(size); 376} 377 378 379static const struct lsquic_packout_mem_if stock_pmi = 380{ 381 malloc_buf, (void(*)(void *, void *)) free_packet, 382}; 383 384 385lsquic_engine_t * 386lsquic_engine_new (unsigned flags, 387 const struct lsquic_engine_api *api) 388{ 389 lsquic_engine_t *engine; 390 int tag_buf_len; 391 char err_buf[100]; 392 393 if (!api->ea_packets_out) 394 { 395 LSQ_ERROR("packets_out callback is not specified"); 396 return NULL; 397 } 398 399 if (api->ea_settings && 400 0 != lsquic_engine_check_settings(api->ea_settings, flags, 401 err_buf, sizeof(err_buf))) 402 { 403 LSQ_ERROR("cannot create engine: %s", err_buf); 404 return NULL; 405 } 406 407 engine = calloc(1, sizeof(*engine)); 408 if (!engine) 409 return NULL; 410 if (0 != lsquic_mm_init(&engine->pub.enp_mm)) 411 { 412 free(engine); 413 return NULL; 414 } 415 if (api->ea_settings) 416 engine->pub.enp_settings = *api->ea_settings; 417 else 418 lsquic_engine_init_settings(&engine->pub.enp_settings, flags); 419 tag_buf_len = gen_ver_tags(engine->pub.enp_ver_tags_buf, 420 sizeof(engine->pub.enp_ver_tags_buf), 421 engine->pub.enp_settings.es_versions); 422 if (tag_buf_len <= 0) 423 { 424 LSQ_ERROR("cannot generate version tags buffer"); 425 free(engine); 426 return NULL; 427 } 428 engine->pub.enp_ver_tags_len = tag_buf_len; 429 430 engine->flags = flags; 431 engine->stream_if = api->ea_stream_if; 432 engine->stream_if_ctx = api->ea_stream_if_ctx; 433 engine->packets_out = api->ea_packets_out; 434 engine->packets_out_ctx = api->ea_packets_out_ctx; 435 if (api->ea_pmi) 436 { 437 engine->pub.enp_pmi = api->ea_pmi; 438 engine->pub.enp_pmi_ctx = api->ea_pmi_ctx; 439 } 440 else 441 { 442 engine->pub.enp_pmi = &stock_pmi; 443 engine->pub.enp_pmi_ctx = NULL; 444 } 445 engine->pub.enp_engine = engine; 446 TAILQ_INIT(&engine->conns_in); 447 TAILQ_INIT(&engine->conns_pend_rw); 448 conn_hash_init(&engine->full_conns, ~0); 449 engine->attq = attq_create(); 450 eng_hist_init(&engine->history); 451 engine->batch_size = INITIAL_OUT_BATCH_SIZE; 452 453 454 LSQ_INFO("instantiated engine"); 455 return engine; 456} 457 458 459static void 460grow_batch_size (struct lsquic_engine *engine) 461{ 462 engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE; 463} 464 465 466static void 467shrink_batch_size (struct lsquic_engine *engine) 468{ 469 engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE; 470} 471 472 473/* Wrapper to make sure important things occur before the connection is 474 * really destroyed. 475 */ 476static void 477destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn) 478{ 479 conn->cn_flags |= LSCONN_NEVER_PEND_RW; 480 conn->cn_if->ci_destroy(conn); 481} 482 483 484static lsquic_conn_t * 485new_full_conn_client (lsquic_engine_t *engine, const char *hostname, 486 unsigned short max_packet_size) 487{ 488 lsquic_conn_t *conn; 489 unsigned flags; 490 flags = engine->flags & (ENG_SERVER|ENG_HTTP); 491 conn = full_conn_client_new(&engine->pub, engine->stream_if, 492 engine->stream_if_ctx, flags, hostname, max_packet_size); 493 if (!conn) 494 return NULL; 495 if (0 != conn_hash_add(&engine->full_conns, conn)) 496 { 497 LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy", 498 conn->cn_cid); 499 destroy_conn(engine, conn); 500 return NULL; 501 } 502 assert(!(conn->cn_flags & 503 (CONN_REF_FLAGS 504 & ~LSCONN_RW_PENDING /* This flag may be set as effect of user 505 callbacks */ 506 ))); 507 conn->cn_flags |= LSCONN_HASHED; 508 return conn; 509} 510 511 512static lsquic_conn_t * 513find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 514 struct packin_parse_state *ppstate, const struct sockaddr *sa_peer, 515 void *peer_ctx) 516{ 517 lsquic_conn_t *conn; 518 519 if (lsquic_packet_in_is_prst(packet_in) 520 && !engine->pub.enp_settings.es_honor_prst) 521 { 522 LSQ_DEBUG("public reset packet: discarding"); 523 return NULL; 524 } 525 526 if (!(packet_in->pi_flags & PI_CONN_ID)) 527 { 528 LSQ_DEBUG("packet header does not have connection ID: discarding"); 529 return NULL; 530 } 531 532 conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id); 533 if (conn) 534 { 535 conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate); 536 return conn; 537 } 538 539 return conn; 540} 541 542 543static void 544add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn, 545 enum rw_reason reason) 546{ 547 int hist_idx; 548 549 TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw); 550 engine_incref_conn(conn, LSCONN_RW_PENDING); 551 552 hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); 553 conn->cn_rw_hist_buf[ hist_idx ] = reason; 554 ++conn->cn_rw_hist_idx; 555 556 if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx) 557 EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), " 558 "rw_hist: %.*s", (char) reason, 559 (int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); 560 else 561 EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')", 562 (char) reason); 563} 564 565 566#if !defined(NDEBUG) && __GNUC__ 567__attribute__((weak)) 568#endif 569void 570lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub, 571 lsquic_conn_t *conn, enum rw_reason reason) 572{ 573 if (0 == (enpub->enp_flags & ENPUB_PROC) && 574 0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW))) 575 { 576 lsquic_engine_t *engine = (lsquic_engine_t *) enpub; 577 add_conn_to_pend_rw(engine, conn, reason); 578 } 579} 580 581 582void 583lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub, 584 lsquic_conn_t *conn, lsquic_time_t tick_time) 585{ 586 lsquic_engine_t *const engine = (lsquic_engine_t *) enpub; 587 /* Instead of performing an update, we simply remove the connection from 588 * the queue and add it back. This should not happen in at the time of 589 * this writing. 590 */ 591 if (conn->cn_flags & LSCONN_ATTQ) 592 { 593 attq_remove(engine->attq, conn); 594 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 595 } 596 if (conn && !(conn->cn_flags & LSCONN_ATTQ) && 597 0 == attq_maybe_add(engine->attq, conn, tick_time)) 598 engine_incref_conn(conn, LSCONN_ATTQ); 599} 600 601 602static void 603update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn, 604 int progress_made) 605{ 606 rw_hist_idx_t hist_idx; 607 const unsigned char *empty; 608 const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check; 609 610 if (!pendrw_check) 611 return; 612 613 /* Convert previous entry to uppercase: */ 614 hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1); 615 conn->cn_rw_hist_buf[ hist_idx ] -= 0x20; 616 617 LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made); 618 if (progress_made) 619 { 620 conn->cn_noprogress_count = 0; 621 return; 622 } 623 624 EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made " 625 "no progress"); 626 ++conn->cn_noprogress_count; 627 if (conn->cn_noprogress_count <= pendrw_check) 628 return; 629 630 conn->cn_flags |= LSCONN_NEVER_PEND_RW; 631 empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY, 632 sizeof(conn->cn_rw_hist_buf)); 633 if (empty) 634 LSQ_WARN("conn %"PRIu64" noprogress count reached %u " 635 "(rw_hist: %.*s): will not put it onto Pend RW queue again", 636 conn->cn_cid, conn->cn_noprogress_count, 637 (int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); 638 else 639 { 640 hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); 641 LSQ_WARN("conn %"PRIu64" noprogress count reached %u " 642 "(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again", 643 conn->cn_cid, conn->cn_noprogress_count, 644 /* First part of history: */ 645 (int) (sizeof(conn->cn_rw_hist_buf) - hist_idx), 646 conn->cn_rw_hist_buf + hist_idx, 647 /* Second part of history: */ 648 hist_idx, conn->cn_rw_hist_buf); 649 } 650} 651 652 653/* Return 0 if packet is being processed by a connections, otherwise return 1 */ 654static int 655process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 656 struct packin_parse_state *ppstate, const struct sockaddr *sa_local, 657 const struct sockaddr *sa_peer, void *peer_ctx) 658{ 659 lsquic_conn_t *conn; 660 661 conn = find_or_create_conn(engine, packet_in, ppstate, sa_peer, peer_ctx); 662 if (!conn) 663 { 664 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 665 return 1; 666 } 667 668 if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) { 669 TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in); 670 engine_incref_conn(conn, LSCONN_HAS_INCOMING); 671 } 672 lsquic_conn_record_sockaddr(conn, sa_local, sa_peer); 673 lsquic_packet_in_upref(packet_in); 674 conn->cn_peer_ctx = peer_ctx; 675 conn->cn_if->ci_packet_in(conn, packet_in); 676 lsquic_packet_in_put(&engine->pub.enp_mm, packet_in); 677 return 0; 678} 679 680 681static int 682conn_attq_expired (const struct lsquic_engine *engine, 683 const lsquic_conn_t *conn) 684{ 685 assert(conn->cn_attq_elem); 686 return lsquic_conn_adv_time(conn) < engine->proc_time; 687} 688 689 690/* Iterator for connections with incoming packets */ 691static lsquic_conn_t * 692conn_iter_next_incoming (struct lsquic_engine *engine) 693{ 694 enum lsquic_conn_flags addl_flags; 695 lsquic_conn_t *conn; 696 while ((conn = TAILQ_FIRST(&engine->conns_in))) 697 { 698 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 699 if (conn->cn_flags & LSCONN_RW_PENDING) 700 { 701 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 702 EV_LOG_CONN_EVENT(conn->cn_cid, 703 "removed from pending RW queue (processing incoming)"); 704 } 705 if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) 706 { 707 addl_flags = LSCONN_ATTQ; 708 attq_remove(engine->attq, conn); 709 } 710 else 711 addl_flags = 0; 712 conn = engine_decref_conn(engine, conn, 713 LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); 714 if (conn) 715 break; 716 } 717 return conn; 718} 719 720 721/* Iterator for connections with that have pending read/write events */ 722static lsquic_conn_t * 723conn_iter_next_rw_pend (struct lsquic_engine *engine) 724{ 725 enum lsquic_conn_flags addl_flags; 726 lsquic_conn_t *conn; 727 while ((conn = TAILQ_FIRST(&engine->conns_pend_rw))) 728 { 729 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 730 EV_LOG_CONN_EVENT(conn->cn_cid, 731 "removed from pending RW queue (processing pending RW conns)"); 732 if (conn->cn_flags & LSCONN_HAS_INCOMING) 733 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 734 if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) 735 { 736 addl_flags = LSCONN_ATTQ; 737 attq_remove(engine->attq, conn); 738 } 739 else 740 addl_flags = 0; 741 conn = engine_decref_conn(engine, conn, 742 LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); 743 if (conn) 744 break; 745 } 746 return conn; 747} 748 749 750void 751lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine) 752{ 753 LSQ_DEBUG("process connections with incoming packets"); 754 ENGINE_IN(engine); 755 process_connections(engine, conn_iter_next_incoming); 756 assert(TAILQ_EMPTY(&engine->conns_in)); 757 ENGINE_OUT(engine); 758} 759 760 761int 762lsquic_engine_has_pend_rw (lsquic_engine_t *engine) 763{ 764 return !(engine->flags & ENG_PAST_DEADLINE) 765 && !TAILQ_EMPTY(&engine->conns_pend_rw); 766} 767 768 769void 770lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine) 771{ 772 LSQ_DEBUG("process connections with pending RW events"); 773 ENGINE_IN(engine); 774 process_connections(engine, conn_iter_next_rw_pend); 775 ENGINE_OUT(engine); 776} 777 778 779void 780lsquic_engine_destroy (lsquic_engine_t *engine) 781{ 782 lsquic_conn_t *conn; 783 784 LSQ_DEBUG("destroying engine"); 785#ifndef NDEBUG 786 engine->flags |= ENG_DTOR; 787#endif 788 789 while (engine->conns_out.oh_nelem > 0) 790 { 791 --engine->conns_out.oh_nelem; 792 conn = engine->conns_out.oh_elems[ 793 engine->conns_out.oh_nelem ].ohe_conn; 794 assert(conn->cn_flags & LSCONN_HAS_OUTGOING); 795 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 796 } 797 798 for (conn = conn_hash_first(&engine->full_conns); conn; 799 conn = conn_hash_next(&engine->full_conns)) 800 force_close_conn(engine, conn); 801 conn_hash_cleanup(&engine->full_conns); 802 803 804 attq_destroy(engine->attq); 805 806 assert(0 == engine->conns_out.oh_nelem); 807 assert(TAILQ_EMPTY(&engine->conns_pend_rw)); 808 lsquic_mm_cleanup(&engine->pub.enp_mm); 809 free(engine->conns_out.oh_elems); 810 free(engine); 811} 812 813 814#if __GNUC__ 815__attribute__((nonnull(3))) 816#endif 817static lsquic_conn_t * 818remove_from_inc_andor_pend_rw (lsquic_engine_t *engine, 819 lsquic_conn_t *conn, const char *reason) 820{ 821 assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)); 822 if (conn->cn_flags & LSCONN_HAS_INCOMING) 823 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 824 if (conn->cn_flags & LSCONN_RW_PENDING) 825 { 826 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 827 EV_LOG_CONN_EVENT(conn->cn_cid, 828 "removed from pending RW queue (%s)", reason); 829 } 830 conn = engine_decref_conn(engine, conn, 831 LSCONN_HAS_INCOMING|LSCONN_RW_PENDING); 832 assert(conn); 833 return conn; 834} 835 836 837static lsquic_conn_t * 838conn_iter_next_one (lsquic_engine_t *engine) 839{ 840 lsquic_conn_t *conn = engine->iter_state.one.conn; 841 if (conn) 842 { 843 if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) 844 conn = remove_from_inc_andor_pend_rw(engine, conn, "connect"); 845 if (conn && (conn->cn_flags & LSCONN_ATTQ) && 846 conn_attq_expired(engine, conn)) 847 { 848 attq_remove(engine->attq, conn); 849 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 850 } 851 engine->iter_state.one.conn = NULL; 852 } 853 return conn; 854} 855 856 857lsquic_conn_t * 858lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa, 859 void *peer_ctx, lsquic_conn_ctx_t *conn_ctx, 860 const char *hostname, unsigned short max_packet_size) 861{ 862 lsquic_conn_t *conn; 863 864 if (engine->flags & ENG_SERVER) 865 { 866 LSQ_ERROR("`%s' must only be called in client mode", __func__); 867 return NULL; 868 } 869 870 if (0 == max_packet_size) 871 { 872 switch (peer_sa->sa_family) 873 { 874 case AF_INET: 875 max_packet_size = QUIC_MAX_IPv4_PACKET_SZ; 876 break; 877 default: 878 max_packet_size = QUIC_MAX_IPv6_PACKET_SZ; 879 break; 880 } 881 } 882 883 conn = new_full_conn_client(engine, hostname, max_packet_size); 884 if (!conn) 885 return NULL; 886 ENGINE_IN(engine); 887 lsquic_conn_record_peer_sa(conn, peer_sa); 888 conn->cn_peer_ctx = peer_ctx; 889 lsquic_conn_set_ctx(conn, conn_ctx); 890 engine->iter_state.one.conn = conn; 891 full_conn_client_call_on_new(conn); 892 process_connections(engine, conn_iter_next_one); 893 ENGINE_OUT(engine); 894 return conn; 895} 896 897 898static void 899remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn) 900{ 901 conn_hash_remove(&engine->full_conns, conn); 902 (void) engine_decref_conn(engine, conn, LSCONN_HASHED); 903} 904 905 906static void 907refflags2str (enum lsquic_conn_flags flags, char s[7]) 908{ 909 *s = 'C'; s += !!(flags & LSCONN_CLOSING); 910 *s = 'H'; s += !!(flags & LSCONN_HASHED); 911 *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING); 912 *s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING); 913 *s = 'R'; s += !!(flags & LSCONN_RW_PENDING); 914 *s = 'A'; s += !!(flags & LSCONN_ATTQ); 915 *s = '\0'; 916} 917 918 919static void 920engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag) 921{ 922 char str[7]; 923 assert(flag & CONN_REF_FLAGS); 924 assert(!(conn->cn_flags & flag)); 925 conn->cn_flags |= flag; 926 LSQ_DEBUG("incref conn %"PRIu64", now '%s'", conn->cn_cid, 927 (refflags2str(conn->cn_flags, str), str)); 928} 929 930 931static lsquic_conn_t * 932engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 933 enum lsquic_conn_flags flags) 934{ 935 char str[7]; 936 assert(flags & CONN_REF_FLAGS); 937 assert(conn->cn_flags & flags); 938#ifndef NDEBUG 939 if (flags & LSCONN_CLOSING) 940 assert(0 == (conn->cn_flags & LSCONN_HASHED)); 941#endif 942 conn->cn_flags &= ~flags; 943 LSQ_DEBUG("decref conn %"PRIu64", now '%s'", conn->cn_cid, 944 (refflags2str(conn->cn_flags, str), str)); 945 if (0 == (conn->cn_flags & CONN_REF_FLAGS)) 946 { 947 eng_hist_inc(&engine->history, 0, sl_del_full_conns); 948 destroy_conn(engine, conn); 949 return NULL; 950 } 951 else 952 return conn; 953} 954 955 956/* This is not a general-purpose function. Only call from engine dtor. */ 957static void 958force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn) 959{ 960 assert(engine->flags & ENG_DTOR); 961 const enum lsquic_conn_flags flags = conn->cn_flags; 962 assert(conn->cn_flags & CONN_REF_FLAGS); 963 assert(!(flags & LSCONN_HAS_OUTGOING)); /* Should be removed already */ 964 assert(!(flags & LSCONN_CLOSING)); /* It is in transient queue? */ 965 if (flags & LSCONN_HAS_INCOMING) 966 { 967 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 968 (void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING); 969 } 970 if (flags & LSCONN_RW_PENDING) 971 { 972 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 973 EV_LOG_CONN_EVENT(conn->cn_cid, 974 "removed from pending RW queue (engine destruction)"); 975 (void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING); 976 } 977 if (flags & LSCONN_ATTQ) 978 attq_remove(engine->attq, conn); 979 if (flags & LSCONN_HASHED) 980 remove_conn_from_hash(engine, conn); 981} 982 983 984/* Iterator for all connections. 985 * Returned connections are removed from the Incoming, Pending RW Event, 986 * and Advisory Tick Time queues if necessary. 987 */ 988static lsquic_conn_t * 989conn_iter_next_all (struct lsquic_engine *engine) 990{ 991 lsquic_conn_t *conn; 992 993 conn = conn_hash_next(&engine->full_conns); 994 995 if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))) 996 conn = remove_from_inc_andor_pend_rw(engine, conn, "process all"); 997 if (conn && (conn->cn_flags & LSCONN_ATTQ) 998 && conn_attq_expired(engine, conn)) 999 { 1000 attq_remove(engine->attq, conn); 1001 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 1002 } 1003 1004 return conn; 1005} 1006 1007 1008static lsquic_conn_t * 1009conn_iter_next_attq (struct lsquic_engine *engine) 1010{ 1011 lsquic_conn_t *conn; 1012 1013 conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff); 1014 if (conn) 1015 { 1016 assert(conn->cn_flags & LSCONN_ATTQ); 1017 if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) 1018 conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq"); 1019 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 1020 } 1021 1022 return conn; 1023} 1024 1025 1026void 1027lsquic_engine_proc_all (lsquic_engine_t *engine) 1028{ 1029 ENGINE_IN(engine); 1030 /* We poke each connection every time as initial implementation. If it 1031 * proves to be too inefficient, we will need to figure out 1032 * a) when to stop processing; and 1033 * b) how to remember state between calls. 1034 */ 1035 conn_hash_reset_iter(&engine->full_conns); 1036 process_connections(engine, conn_iter_next_all); 1037 ENGINE_OUT(engine); 1038} 1039 1040 1041void 1042lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine) 1043{ 1044 lsquic_time_t prev_min, now; 1045 1046 now = lsquic_time_now(); 1047 if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG)) 1048 { 1049 const lsquic_time_t *expected_time; 1050 int64_t diff; 1051 expected_time = attq_next_time(engine->attq); 1052 if (expected_time) 1053 diff = *expected_time - now; 1054 else 1055 diff = -1; 1056 LSQ_DEBUG("process connections in attq; time diff: %"PRIi64, diff); 1057 } 1058 1059 ENGINE_IN(engine); 1060 prev_min = attq_set_min(engine->attq, now); /* Prevent infinite loop */ 1061 engine->iter_state.attq.cutoff = now; 1062 process_connections(engine, conn_iter_next_attq); 1063 attq_set_min(engine->attq, prev_min); /* Restore previos value */ 1064 ENGINE_OUT(engine); 1065} 1066 1067 1068static int 1069generate_header (const lsquic_packet_out_t *packet_out, 1070 const struct parse_funcs *pf, lsquic_cid_t cid, 1071 unsigned char *buf, size_t bufsz) 1072{ 1073 return pf->pf_gen_reg_pkt_header(buf, bufsz, 1074 packet_out->po_flags & PO_CONN_ID ? &cid : NULL, 1075 packet_out->po_flags & PO_VERSION ? &packet_out->po_ver_tag : NULL, 1076 packet_out->po_flags & PO_NONCE ? packet_out->po_nonce : NULL, 1077 packet_out->po_packno, lsquic_packet_out_packno_bits(packet_out)); 1078} 1079 1080 1081static ssize_t 1082really_encrypt_packet (const lsquic_conn_t *conn, 1083 const lsquic_packet_out_t *packet_out, 1084 unsigned char *buf, size_t bufsz) 1085{ 1086 int enc, header_sz, is_hello_packet; 1087 size_t packet_sz; 1088 unsigned char header_buf[QUIC_MAX_PUBHDR_SZ]; 1089 1090 header_sz = generate_header(packet_out, conn->cn_pf, conn->cn_cid, 1091 header_buf, sizeof(header_buf)); 1092 if (header_sz < 0) 1093 return -1; 1094 1095 is_hello_packet = !!(packet_out->po_flags & PO_HELLO); 1096 enc = conn->cn_esf->esf_encrypt(conn->cn_enc_session, conn->cn_version, 0, 1097 packet_out->po_packno, header_buf, header_sz, 1098 packet_out->po_data, packet_out->po_data_sz, 1099 buf, bufsz, &packet_sz, is_hello_packet); 1100 if (0 == enc) 1101 { 1102 LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %u bytes, " 1103 "ciphertext is %zd bytes", 1104 packet_out->po_packno, 1105 lsquic_po_header_length(packet_out->po_flags) + 1106 packet_out->po_data_sz, 1107 packet_sz); 1108 return packet_sz; 1109 } 1110 else 1111 return -1; 1112} 1113 1114 1115static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, } 1116encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn, 1117 lsquic_packet_out_t *packet_out) 1118{ 1119 ssize_t enc_sz; 1120 size_t bufsz; 1121 unsigned sent_sz; 1122 unsigned char *buf; 1123 1124 bufsz = lsquic_po_header_length(packet_out->po_flags) + 1125 packet_out->po_data_sz + QUIC_PACKET_HASH_SZ; 1126 buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz); 1127 if (!buf) 1128 { 1129 LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd", 1130 bufsz); 1131 return ENCPA_NOMEM; 1132 } 1133 1134 { 1135 enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz); 1136 sent_sz = enc_sz; 1137 } 1138 1139 if (enc_sz < 0) 1140 { 1141 engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf); 1142 return ENCPA_BADCRYPT; 1143 } 1144 1145 packet_out->po_enc_data = buf; 1146 packet_out->po_enc_data_sz = enc_sz; 1147 packet_out->po_sent_sz = sent_sz; 1148 packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ; 1149 1150 return ENCPA_OK; 1151} 1152 1153 1154struct out_batch 1155{ 1156 lsquic_conn_t *conns [MAX_OUT_BATCH_SIZE]; 1157 lsquic_packet_out_t *packets[MAX_OUT_BATCH_SIZE]; 1158 struct lsquic_out_spec outs [MAX_OUT_BATCH_SIZE]; 1159}; 1160 1161 1162STAILQ_HEAD(closed_conns, lsquic_conn); 1163 1164 1165struct conns_out_iter 1166{ 1167 struct out_heap *coi_heap; 1168 TAILQ_HEAD(, lsquic_conn) coi_active_list, 1169 coi_inactive_list; 1170 lsquic_conn_t *coi_next; 1171#ifndef NDEBUG 1172 lsquic_time_t coi_last_sent; 1173#endif 1174}; 1175 1176 1177static void 1178coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine) 1179{ 1180 iter->coi_heap = &engine->conns_out; 1181 iter->coi_next = NULL; 1182 TAILQ_INIT(&iter->coi_active_list); 1183 TAILQ_INIT(&iter->coi_inactive_list); 1184#ifndef NDEBUG 1185 iter->coi_last_sent = 0; 1186#endif 1187} 1188 1189 1190static lsquic_conn_t * 1191coi_next (struct conns_out_iter *iter) 1192{ 1193 lsquic_conn_t *conn; 1194 1195 if (iter->coi_heap->oh_nelem > 0) 1196 { 1197 conn = oh_pop(iter->coi_heap); 1198 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1199 conn->cn_flags |= LSCONN_COI_ACTIVE; 1200#ifndef NDEBUG 1201 if (iter->coi_last_sent) 1202 assert(iter->coi_last_sent <= conn->cn_last_sent); 1203 iter->coi_last_sent = conn->cn_last_sent; 1204#endif 1205 return conn; 1206 } 1207 else if (!TAILQ_EMPTY(&iter->coi_active_list)) 1208 { 1209 conn = iter->coi_next; 1210 if (!conn) 1211 conn = TAILQ_FIRST(&iter->coi_active_list); 1212 if (conn) 1213 iter->coi_next = TAILQ_NEXT(conn, cn_next_out); 1214 return conn; 1215 } 1216 else 1217 return NULL; 1218} 1219 1220 1221static void 1222coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1223{ 1224 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1225 { 1226 assert(!TAILQ_EMPTY(&iter->coi_active_list)); 1227 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1228 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1229 TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out); 1230 conn->cn_flags |= LSCONN_COI_INACTIVE; 1231 } 1232} 1233 1234 1235static void 1236coi_remove (struct conns_out_iter *iter, lsquic_conn_t *conn) 1237{ 1238 assert(conn->cn_flags & LSCONN_COI_ACTIVE); 1239 if (conn->cn_flags & LSCONN_COI_ACTIVE) 1240 { 1241 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1242 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1243 } 1244} 1245 1246 1247static void 1248coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1249{ 1250 assert(conn->cn_flags & LSCONN_COI_INACTIVE); 1251 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1252 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1253 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1254 conn->cn_flags |= LSCONN_COI_ACTIVE; 1255} 1256 1257 1258static void 1259coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine) 1260{ 1261 lsquic_conn_t *conn; 1262 while ((conn = TAILQ_FIRST(&iter->coi_active_list))) 1263 { 1264 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1265 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1266 oh_insert(iter->coi_heap, conn); 1267 } 1268 while ((conn = TAILQ_FIRST(&iter->coi_inactive_list))) 1269 { 1270 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1271 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1272 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 1273 } 1274} 1275 1276 1277static unsigned 1278send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter, 1279 struct out_batch *batch, unsigned n_to_send) 1280{ 1281 int n_sent, i; 1282 lsquic_time_t now; 1283 1284 /* Set sent time before the write to avoid underestimating RTT */ 1285 now = lsquic_time_now(); 1286 for (i = 0; i < (int) n_to_send; ++i) 1287 batch->packets[i]->po_sent = now; 1288 n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs, 1289 n_to_send); 1290 if (n_sent >= 0) 1291 LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send); 1292 else 1293 { 1294 LSQ_DEBUG("packets out returned an error: %s", strerror(errno)); 1295 n_sent = 0; 1296 } 1297 if (n_sent > 0) 1298 engine->last_sent = now + n_sent; 1299 for (i = 0; i < n_sent; ++i) 1300 { 1301 eng_hist_inc(&engine->history, now, sl_packets_out); 1302 EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1303 batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i], 1304 batch->packets[i]); 1305 /* `i' is added to maintain relative order */ 1306 batch->conns[i]->cn_last_sent = now + i; 1307 /* Release packet out buffer as soon as the packet is sent 1308 * successfully. If not successfully sent, we hold on to 1309 * this buffer until the packet sending is attempted again 1310 * or until it times out and regenerated. 1311 */ 1312 if (batch->packets[i]->po_flags & PO_ENCRYPTED) 1313 { 1314 batch->packets[i]->po_flags &= ~PO_ENCRYPTED; 1315 engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, 1316 batch->packets[i]->po_enc_data); 1317 batch->packets[i]->po_enc_data = NULL; /* JIC */ 1318 } 1319 } 1320 if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT)) 1321 for ( ; i < (int) n_to_send; ++i) 1322 EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1323 /* Return packets to the connection in reverse order so that the packet 1324 * ordering is maintained. 1325 */ 1326 for (i = (int) n_to_send - 1; i >= n_sent; --i) 1327 { 1328 batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i], 1329 batch->packets[i]); 1330 if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT))) 1331 coi_reactivate(conns_iter, batch->conns[i]); 1332 } 1333 return n_sent; 1334} 1335 1336 1337/* Return 1 if went past deadline, 0 otherwise */ 1338static int 1339check_deadline (lsquic_engine_t *engine) 1340{ 1341 if (engine->pub.enp_settings.es_proc_time_thresh && 1342 lsquic_time_now() > engine->deadline) 1343 { 1344 LSQ_INFO("went past threshold of %u usec, stop sending", 1345 engine->pub.enp_settings.es_proc_time_thresh); 1346 engine->flags |= ENG_PAST_DEADLINE; 1347 return 1; 1348 } 1349 else 1350 return 0; 1351} 1352 1353 1354static void 1355send_packets_out (struct lsquic_engine *engine, 1356 struct closed_conns *closed_conns) 1357{ 1358 unsigned n, w, n_sent, n_batches_sent; 1359 lsquic_packet_out_t *packet_out; 1360 lsquic_conn_t *conn; 1361 struct out_batch batch; 1362 struct conns_out_iter conns_iter; 1363 int shrink, deadline_exceeded; 1364 1365 coi_init(&conns_iter, engine); 1366 n_batches_sent = 0; 1367 n_sent = 0, n = 0; 1368 shrink = 0; 1369 deadline_exceeded = 0; 1370 1371 while ((conn = coi_next(&conns_iter))) 1372 { 1373 packet_out = conn->cn_if->ci_next_packet_to_send(conn); 1374 if (!packet_out) { 1375 LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64, 1376 conn->cn_cid); 1377 coi_deactivate(&conns_iter, conn); 1378 continue; 1379 } 1380 if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT))) 1381 { 1382 switch (encrypt_packet(engine, conn, packet_out)) 1383 { 1384 case ENCPA_NOMEM: 1385 /* Send what we have and wait for a more opportune moment */ 1386 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1387 goto end_for; 1388 case ENCPA_BADCRYPT: 1389 /* This is pretty bad: close connection immediately */ 1390 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1391 LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid); 1392 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1393 { 1394 if (!(conn->cn_flags & LSCONN_CLOSING)) 1395 { 1396 STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn); 1397 engine_incref_conn(conn, LSCONN_CLOSING); 1398 if (conn->cn_flags & LSCONN_HASHED) 1399 remove_conn_from_hash(engine, conn); 1400 } 1401 coi_remove(&conns_iter, conn); 1402 } 1403 continue; 1404 case ENCPA_OK: 1405 break; 1406 } 1407 } 1408 LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64, 1409 packet_out->po_packno, conn->cn_cid); 1410 assert(conn->cn_flags & LSCONN_HAS_PEER_SA); 1411 if (packet_out->po_flags & PO_ENCRYPTED) 1412 { 1413 batch.outs[n].buf = packet_out->po_enc_data; 1414 batch.outs[n].sz = packet_out->po_enc_data_sz; 1415 } 1416 else 1417 { 1418 batch.outs[n].buf = packet_out->po_data; 1419 batch.outs[n].sz = packet_out->po_data_sz; 1420 } 1421 batch.outs [n].peer_ctx = conn->cn_peer_ctx; 1422 batch.outs [n].local_sa = (struct sockaddr *) conn->cn_local_addr; 1423 batch.outs [n].dest_sa = (struct sockaddr *) conn->cn_peer_addr; 1424 batch.conns [n] = conn; 1425 batch.packets[n] = packet_out; 1426 ++n; 1427 if (n == engine->batch_size) 1428 { 1429 n = 0; 1430 w = send_batch(engine, &conns_iter, &batch, engine->batch_size); 1431 ++n_batches_sent; 1432 n_sent += w; 1433 if (w < engine->batch_size) 1434 { 1435 shrink = 1; 1436 break; 1437 } 1438 deadline_exceeded = check_deadline(engine); 1439 if (deadline_exceeded) 1440 break; 1441 grow_batch_size(engine); 1442 } 1443 } 1444 end_for: 1445 1446 if (n > 0) { 1447 w = send_batch(engine, &conns_iter, &batch, n); 1448 n_sent += w; 1449 shrink = w < n; 1450 ++n_batches_sent; 1451 deadline_exceeded = check_deadline(engine); 1452 } 1453 1454 if (shrink) 1455 shrink_batch_size(engine); 1456 else if (n_batches_sent > 1 && !deadline_exceeded) 1457 grow_batch_size(engine); 1458 1459 coi_reheap(&conns_iter, engine); 1460 1461 LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s"); 1462} 1463 1464 1465int 1466lsquic_engine_has_unsent_packets (lsquic_engine_t *engine) 1467{ 1468 return !(engine->flags & ENG_PAST_DEADLINE) 1469 && ( engine->conns_out.oh_nelem > 0 1470 ) 1471 ; 1472} 1473 1474 1475static void 1476reset_deadline (lsquic_engine_t *engine, lsquic_time_t now) 1477{ 1478 engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh; 1479 engine->flags &= ~ENG_PAST_DEADLINE; 1480} 1481 1482 1483/* TODO: this is a user-facing function, account for load */ 1484void 1485lsquic_engine_send_unsent_packets (lsquic_engine_t *engine) 1486{ 1487 lsquic_conn_t *conn; 1488 struct closed_conns closed_conns; 1489 1490 STAILQ_INIT(&closed_conns); 1491 reset_deadline(engine, lsquic_time_now()); 1492 1493 send_packets_out(engine, &closed_conns); 1494 1495 while ((conn = STAILQ_FIRST(&closed_conns))) { 1496 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1497 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1498 } 1499 1500} 1501 1502 1503static void 1504process_connections (lsquic_engine_t *engine, conn_iter_f next_conn) 1505{ 1506 lsquic_conn_t *conn; 1507 enum tick_st tick_st; 1508 lsquic_time_t now = lsquic_time_now(); 1509 struct closed_conns closed_conns; 1510 1511 engine->proc_time = now; 1512 eng_hist_tick(&engine->history, now); 1513 1514 STAILQ_INIT(&closed_conns); 1515 reset_deadline(engine, now); 1516 1517 while ((conn = next_conn(engine))) 1518 { 1519 tick_st = conn->cn_if->ci_tick(conn, now); 1520 if (conn_iter_next_rw_pend == next_conn) 1521 update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS); 1522 if (tick_st & TICK_SEND) 1523 { 1524 if (!(conn->cn_flags & LSCONN_HAS_OUTGOING)) 1525 { 1526 oh_insert(&engine->conns_out, conn); 1527 engine_incref_conn(conn, LSCONN_HAS_OUTGOING); 1528 } 1529 } 1530 if (tick_st & TICK_CLOSE) 1531 { 1532 STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn); 1533 engine_incref_conn(conn, LSCONN_CLOSING); 1534 if (conn->cn_flags & LSCONN_HASHED) 1535 remove_conn_from_hash(engine, conn); 1536 } 1537 } 1538 1539 if (lsquic_engine_has_unsent_packets(engine)) 1540 send_packets_out(engine, &closed_conns); 1541 1542 while ((conn = STAILQ_FIRST(&closed_conns))) { 1543 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1544 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1545 } 1546 1547} 1548 1549 1550/* Return 0 if packet is being processed by a real connection, 1 if the 1551 * packet was processed, but not by a connection, and -1 on error. 1552 */ 1553int 1554lsquic_engine_packet_in (lsquic_engine_t *engine, 1555 const unsigned char *packet_in_data, size_t packet_in_size, 1556 const struct sockaddr *sa_local, const struct sockaddr *sa_peer, 1557 void *peer_ctx) 1558{ 1559 struct packin_parse_state ppstate; 1560 lsquic_packet_in_t *packet_in; 1561 1562 if (packet_in_size > QUIC_MAX_PACKET_SZ) 1563 { 1564 LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming " 1565 "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ); 1566 errno = E2BIG; 1567 return -1; 1568 } 1569 1570 packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm); 1571 if (!packet_in) 1572 return -1; 1573 1574 /* Library does not modify packet_in_data, it is not referenced after 1575 * this function returns and subsequent release of pi_data is guarded 1576 * by PI_OWN_DATA flag. 1577 */ 1578 packet_in->pi_data = (unsigned char *) packet_in_data; 1579 if (0 != parse_packet_in_begin(packet_in, packet_in_size, 1580 engine->flags & ENG_SERVER, &ppstate)) 1581 { 1582 LSQ_DEBUG("Cannot parse incoming packet's header"); 1583 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 1584 errno = EINVAL; 1585 return -1; 1586 } 1587 1588 packet_in->pi_received = lsquic_time_now(); 1589 eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in); 1590 return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer, 1591 peer_ctx); 1592} 1593 1594 1595#if __GNUC__ && !defined(NDEBUG) 1596__attribute__((weak)) 1597#endif 1598unsigned 1599lsquic_engine_quic_versions (const lsquic_engine_t *engine) 1600{ 1601 return engine->pub.enp_settings.es_versions; 1602} 1603 1604 1605int 1606lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff) 1607{ 1608 const lsquic_time_t *next_time; 1609 lsquic_time_t now; 1610 1611 next_time = attq_next_time(engine->attq); 1612 if (!next_time) 1613 return 0; 1614 1615 now = lsquic_time_now(); 1616 *diff = (int) ((int64_t) *next_time - (int64_t) now); 1617 return 1; 1618} 1619 1620 1621unsigned 1622lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now) 1623{ 1624 lsquic_time_t now; 1625 now = lsquic_time_now(); 1626 if (from_now < 0) 1627 now -= from_now; 1628 else 1629 now += from_now; 1630 return attq_count_before(engine->attq, now); 1631} 1632 1633 1634