lsquic_engine.c revision c51ce338
1/* Copyright (c) 2017 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_engine.c - QUIC engine 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdint.h> 10#include <stdio.h> 11#include <stdlib.h> 12#include <string.h> 13#include <sys/queue.h> 14#include <sys/time.h> 15#include <time.h> 16#include <netinet/in.h> 17#include <sys/types.h> 18#include <sys/stat.h> 19#include <fcntl.h> 20#include <unistd.h> 21#include <netdb.h> 22 23 24 25#include "lsquic.h" 26#include "lsquic_types.h" 27#include "lsquic_alarmset.h" 28#include "lsquic_parse.h" 29#include "lsquic_packet_in.h" 30#include "lsquic_packet_out.h" 31#include "lsquic_senhist.h" 32#include "lsquic_rtt.h" 33#include "lsquic_cubic.h" 34#include "lsquic_pacer.h" 35#include "lsquic_send_ctl.h" 36#include "lsquic_set.h" 37#include "lsquic_conn_flow.h" 38#include "lsquic_sfcw.h" 39#include "lsquic_stream.h" 40#include "lsquic_conn.h" 41#include "lsquic_full_conn.h" 42#include "lsquic_util.h" 43#include "lsquic_qtags.h" 44#include "lsquic_str.h" 45#include "lsquic_handshake.h" 46#include "lsquic_mm.h" 47#include "lsquic_conn_hash.h" 48#include "lsquic_engine_public.h" 49#include "lsquic_eng_hist.h" 50#include "lsquic_ev_log.h" 51#include "lsquic_version.h" 52#include "lsquic_attq.h" 53 54#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE 55#include "lsquic_logger.h" 56 57 58/* The batch of outgoing packets grows and shrinks dynamically */ 59#define MAX_OUT_BATCH_SIZE 1024 60#define MIN_OUT_BATCH_SIZE 256 61#define INITIAL_OUT_BATCH_SIZE 512 62 63typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *); 64 65static void 66process_connections (struct lsquic_engine *engine, conn_iter_f iter); 67 68static void 69engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag); 70 71static lsquic_conn_t * 72engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 73 enum lsquic_conn_flags flag); 74 75static void 76force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn); 77 78/* Nested calls to LSQUIC are not supported */ 79#define ENGINE_IN(e) do { \ 80 assert(!((e)->pub.enp_flags & ENPUB_PROC)); \ 81 (e)->pub.enp_flags |= ENPUB_PROC; \ 82} while (0) 83 84#define ENGINE_OUT(e) do { \ 85 assert((e)->pub.enp_flags & ENPUB_PROC); \ 86 (e)->pub.enp_flags &= ~ENPUB_PROC; \ 87} while (0) 88 89/* A connection can be referenced from one of six places: 90 * 91 * 1. Connection hash: a connection starts its life in one of those. 92 * 93 * 2. Outgoing queue. 94 * 95 * 3. Incoming queue. 96 * 97 * 4. Pending RW Events queue. 98 * 99 * 5. Advisory Tick Time queue. 100 * 101 * 6. Closing connections queue. This is a transient queue -- it only 102 * exists for the duration of process_connections() function call. 103 * 104 * The idea is to destroy the connection when it is no longer referenced. 105 * For example, a connection tick may return TICK_SEND|TICK_CLOSE. In 106 * that case, the connection is referenced from two places: (2) and (6). 107 * After its packets are sent, it is only referenced in (6), and at the 108 * end of the function call, when it is removed from (6), reference count 109 * goes to zero and the connection is destroyed. If not all packets can 110 * be sent, at the end of the function call, the connection is referenced 111 * by (2) and will only be removed once all outgoing packets have been 112 * sent. 113 */ 114#define CONN_REF_FLAGS (LSCONN_HASHED \ 115 |LSCONN_HAS_OUTGOING \ 116 |LSCONN_HAS_INCOMING \ 117 |LSCONN_RW_PENDING \ 118 |LSCONN_CLOSING \ 119 |LSCONN_ATTQ) 120 121 122struct out_heap_elem 123{ 124 struct lsquic_conn *ohe_conn; 125 lsquic_time_t ohe_last_sent; 126}; 127 128 129struct out_heap 130{ 131 struct out_heap_elem *oh_elems; 132 unsigned oh_nalloc, 133 oh_nelem; 134}; 135 136 137struct lsquic_engine 138{ 139 struct lsquic_engine_public pub; 140 enum { 141 ENG_SERVER = LSENG_SERVER, 142 ENG_HTTP = LSENG_HTTP, 143 ENG_COOLDOWN = (1 << 7), /* Cooldown: no new connections */ 144 ENG_PAST_DEADLINE 145 = (1 << 8), /* Previous call to a processing 146 * function went past time threshold. 147 */ 148#ifndef NDEBUG 149 ENG_DTOR = (1 << 26), /* Engine destructor */ 150#endif 151 } flags; 152 const struct lsquic_stream_if *stream_if; 153 void *stream_if_ctx; 154 lsquic_packets_out_f packets_out; 155 void *packets_out_ctx; 156 void *bad_handshake_ctx; 157 struct conn_hash full_conns; 158 TAILQ_HEAD(, lsquic_conn) conns_in, conns_pend_rw; 159 struct out_heap conns_out; 160 /* Use a union because only one iterator is being used at any one time */ 161 union { 162 struct { 163 /* This iterator does not have any state: it uses `conns_in' */ 164 } conn_in; 165 struct { 166 /* This iterator does not have any state: it uses `conns_pend_rw' */ 167 } rw_pend; 168 struct { 169 /* Iterator state to process connections in Advisory Tick Time 170 * queue. 171 */ 172 lsquic_time_t cutoff; 173 } attq; 174 struct { 175 /* Iterator state to process all connections */ 176 } all; 177 struct { 178 lsquic_conn_t *conn; 179 } one; 180 } iter_state; 181 struct eng_hist history; 182 unsigned batch_size; 183 unsigned time_until_desired_tick; 184 struct attq *attq; 185 lsquic_time_t proc_time; 186 /* Track time last time a packet was sent to give new connections 187 * priority lower than that of existing connections. 188 */ 189 lsquic_time_t last_sent; 190 lsquic_time_t deadline; 191}; 192 193 194#define OHE_PARENT(i) ((i - 1) / 2) 195#define OHE_LCHILD(i) (2 * i + 1) 196#define OHE_RCHILD(i) (2 * i + 2) 197 198 199static void 200heapify_out_heap (struct out_heap *heap, unsigned i) 201{ 202 struct out_heap_elem el; 203 unsigned smallest; 204 205 assert(i < heap->oh_nelem); 206 207 if (OHE_LCHILD(i) < heap->oh_nelem) 208 { 209 if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent < 210 heap->oh_elems[ i ].ohe_last_sent) 211 smallest = OHE_LCHILD(i); 212 else 213 smallest = i; 214 if (OHE_RCHILD(i) < heap->oh_nelem && 215 heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent < 216 heap->oh_elems[ smallest ].ohe_last_sent) 217 smallest = OHE_RCHILD(i); 218 } 219 else 220 smallest = i; 221 222 if (smallest != i) 223 { 224 el = heap->oh_elems[ smallest ]; 225 heap->oh_elems[ smallest ] = heap->oh_elems[ i ]; 226 heap->oh_elems[ i ] = el; 227 heapify_out_heap(heap, smallest); 228 } 229} 230 231 232static void 233oh_insert (struct out_heap *heap, lsquic_conn_t *conn) 234{ 235 struct out_heap_elem el; 236 unsigned nalloc, i; 237 238 if (heap->oh_nelem == heap->oh_nalloc) 239 { 240 if (0 == heap->oh_nalloc) 241 nalloc = 4; 242 else 243 nalloc = heap->oh_nalloc * 2; 244 heap->oh_elems = realloc(heap->oh_elems, 245 nalloc * sizeof(heap->oh_elems[0])); 246 if (!heap->oh_elems) 247 { /* Not much we can do here */ 248 LSQ_ERROR("realloc failed"); 249 return; 250 } 251 heap->oh_nalloc = nalloc; 252 } 253 254 heap->oh_elems[ heap->oh_nelem ].ohe_conn = conn; 255 heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent; 256 ++heap->oh_nelem; 257 258 i = heap->oh_nelem - 1; 259 while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent > 260 heap->oh_elems[ i ].ohe_last_sent) 261 { 262 el = heap->oh_elems[ OHE_PARENT(i) ]; 263 heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ]; 264 heap->oh_elems[ i ] = el; 265 i = OHE_PARENT(i); 266 } 267} 268 269 270static struct lsquic_conn * 271oh_pop (struct out_heap *heap) 272{ 273 struct lsquic_conn *conn; 274 275 assert(heap->oh_nelem); 276 277 conn = heap->oh_elems[0].ohe_conn; 278 --heap->oh_nelem; 279 if (heap->oh_nelem > 0) 280 { 281 heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ]; 282 heapify_out_heap(heap, 0); 283 } 284 285 return conn; 286} 287 288 289void 290lsquic_engine_init_settings (struct lsquic_engine_settings *settings, 291 unsigned flags) 292{ 293 memset(settings, 0, sizeof(*settings)); 294 settings->es_versions = LSQUIC_DF_VERSIONS; 295 if (flags & ENG_SERVER) 296 { 297 settings->es_cfcw = LSQUIC_DF_CFCW_SERVER; 298 settings->es_sfcw = LSQUIC_DF_SFCW_SERVER; 299 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER; 300 } 301 else 302 { 303 settings->es_cfcw = LSQUIC_DF_CFCW_CLIENT; 304 settings->es_sfcw = LSQUIC_DF_SFCW_CLIENT; 305 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT; 306 } 307 settings->es_max_streams_in = LSQUIC_DF_MAX_STREAMS_IN; 308 settings->es_idle_conn_to = LSQUIC_DF_IDLE_CONN_TO; 309 settings->es_handshake_to = LSQUIC_DF_HANDSHAKE_TO; 310 settings->es_silent_close = LSQUIC_DF_SILENT_CLOSE; 311 settings->es_max_header_list_size 312 = LSQUIC_DF_MAX_HEADER_LIST_SIZE; 313 settings->es_ua = LSQUIC_DF_UA; 314 315 settings->es_pdmd = QTAG_X509; 316 settings->es_aead = QTAG_AESG; 317 settings->es_kexs = QTAG_C255; 318 settings->es_support_push = LSQUIC_DF_SUPPORT_PUSH; 319 settings->es_support_tcid0 = LSQUIC_DF_SUPPORT_TCID0; 320 settings->es_support_nstp = LSQUIC_DF_SUPPORT_NSTP; 321 settings->es_honor_prst = LSQUIC_DF_HONOR_PRST; 322 settings->es_progress_check = LSQUIC_DF_PROGRESS_CHECK; 323 settings->es_pendrw_check = LSQUIC_DF_PENDRW_CHECK; 324 settings->es_rw_once = LSQUIC_DF_RW_ONCE; 325 settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH; 326 settings->es_pace_packets = LSQUIC_DF_PACE_PACKETS; 327} 328 329 330/* Note: if returning an error, err_buf must be valid if non-NULL */ 331int 332lsquic_engine_check_settings (const struct lsquic_engine_settings *settings, 333 unsigned flags, 334 char *err_buf, size_t err_buf_sz) 335{ 336 if (settings->es_cfcw < LSQUIC_MIN_FCW || 337 settings->es_sfcw < LSQUIC_MIN_FCW) 338 { 339 if (err_buf) 340 snprintf(err_buf, err_buf_sz, "%s", 341 "flow control window set too low"); 342 return -1; 343 } 344 if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS)) 345 { 346 if (err_buf) 347 snprintf(err_buf, err_buf_sz, "%s", 348 "No supported QUIC versions specified"); 349 return -1; 350 } 351 if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS) 352 { 353 if (err_buf) 354 snprintf(err_buf, err_buf_sz, "%s", 355 "one or more unsupported QUIC version is specified"); 356 return -1; 357 } 358 return 0; 359} 360 361 362static void 363free_packet (void *ctx, unsigned char *packet_data) 364{ 365 free(packet_data); 366} 367 368 369static void * 370malloc_buf (void *ctx, size_t size) 371{ 372 return malloc(size); 373} 374 375 376static const struct lsquic_packout_mem_if stock_pmi = 377{ 378 malloc_buf, (void(*)(void *, void *)) free_packet, 379}; 380 381 382lsquic_engine_t * 383lsquic_engine_new (unsigned flags, 384 const struct lsquic_engine_api *api) 385{ 386 lsquic_engine_t *engine; 387 int tag_buf_len; 388 char err_buf[100]; 389 390 if (!api->ea_packets_out) 391 { 392 LSQ_ERROR("packets_out callback is not specified"); 393 return NULL; 394 } 395 396 if (api->ea_settings && 397 0 != lsquic_engine_check_settings(api->ea_settings, flags, 398 err_buf, sizeof(err_buf))) 399 { 400 LSQ_ERROR("cannot create engine: %s", err_buf); 401 return NULL; 402 } 403 404 engine = calloc(1, sizeof(*engine)); 405 if (!engine) 406 return NULL; 407 if (0 != lsquic_mm_init(&engine->pub.enp_mm)) 408 { 409 free(engine); 410 return NULL; 411 } 412 if (api->ea_settings) 413 engine->pub.enp_settings = *api->ea_settings; 414 else 415 lsquic_engine_init_settings(&engine->pub.enp_settings, flags); 416 tag_buf_len = gen_ver_tags(engine->pub.enp_ver_tags_buf, 417 sizeof(engine->pub.enp_ver_tags_buf), 418 engine->pub.enp_settings.es_versions); 419 if (tag_buf_len <= 0) 420 { 421 LSQ_ERROR("cannot generate version tags buffer"); 422 free(engine); 423 return NULL; 424 } 425 engine->pub.enp_ver_tags_len = tag_buf_len; 426 427 engine->flags = flags; 428 engine->stream_if = api->ea_stream_if; 429 engine->stream_if_ctx = api->ea_stream_if_ctx; 430 engine->packets_out = api->ea_packets_out; 431 engine->packets_out_ctx = api->ea_packets_out_ctx; 432 if (api->ea_pmi) 433 { 434 engine->pub.enp_pmi = api->ea_pmi; 435 engine->pub.enp_pmi_ctx = api->ea_pmi_ctx; 436 } 437 else 438 { 439 engine->pub.enp_pmi = &stock_pmi; 440 engine->pub.enp_pmi_ctx = NULL; 441 } 442 engine->pub.enp_engine = engine; 443 TAILQ_INIT(&engine->conns_in); 444 TAILQ_INIT(&engine->conns_pend_rw); 445 conn_hash_init(&engine->full_conns, ~0); 446 engine->attq = attq_create(); 447 eng_hist_init(&engine->history); 448 engine->batch_size = INITIAL_OUT_BATCH_SIZE; 449 450 451 LSQ_INFO("instantiated engine"); 452 return engine; 453} 454 455 456static void 457grow_batch_size (struct lsquic_engine *engine) 458{ 459 engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE; 460} 461 462 463static void 464shrink_batch_size (struct lsquic_engine *engine) 465{ 466 engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE; 467} 468 469 470/* Wrapper to make sure LSCONN_NEVER_PEND_RW gets set */ 471static void 472destroy_conn (lsquic_conn_t *conn) 473{ 474 conn->cn_flags |= LSCONN_NEVER_PEND_RW; 475 conn->cn_if->ci_destroy(conn); 476} 477 478 479static lsquic_conn_t * 480new_full_conn_client (lsquic_engine_t *engine, const char *hostname, 481 unsigned short max_packet_size) 482{ 483 lsquic_conn_t *conn; 484 unsigned flags; 485 flags = engine->flags & (ENG_SERVER|ENG_HTTP); 486 conn = full_conn_client_new(&engine->pub, engine->stream_if, 487 engine->stream_if_ctx, flags, hostname, max_packet_size); 488 if (!conn) 489 return NULL; 490 if (0 != conn_hash_add_new(&engine->full_conns, conn)) 491 { 492 LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy", 493 conn->cn_cid); 494 destroy_conn(conn); 495 return NULL; 496 } 497 assert(!(conn->cn_flags & 498 (CONN_REF_FLAGS 499 & ~LSCONN_RW_PENDING /* This flag may be set as effect of user 500 callbacks */ 501 ))); 502 conn->cn_flags |= LSCONN_HASHED; 503 return conn; 504} 505 506 507static lsquic_conn_t * 508find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 509 struct packin_parse_state *ppstate, const struct sockaddr *sa_peer, 510 void *peer_ctx) 511{ 512 lsquic_conn_t *conn; 513 514 if (lsquic_packet_in_is_prst(packet_in) 515 && !engine->pub.enp_settings.es_honor_prst) 516 { 517 LSQ_DEBUG("public reset packet: discarding"); 518 return NULL; 519 } 520 521 if (!(packet_in->pi_flags & PI_CONN_ID)) 522 { 523 LSQ_DEBUG("packet header does not have connection ID: discarding"); 524 return NULL; 525 } 526 527 conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id, NULL); 528 if (conn) 529 { 530 conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate); 531 return conn; 532 } 533 534 return conn; 535} 536 537 538static void 539add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn, 540 enum rw_reason reason) 541{ 542 int hist_idx; 543 544 TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw); 545 engine_incref_conn(conn, LSCONN_RW_PENDING); 546 547 hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); 548 conn->cn_rw_hist_buf[ hist_idx ] = reason; 549 ++conn->cn_rw_hist_idx; 550 551 if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx) 552 EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), " 553 "rw_hist: %.*s", (char) reason, 554 (int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); 555 else 556 EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')", 557 (char) reason); 558} 559 560 561#if !defined(NDEBUG) && __GNUC__ 562__attribute__((weak)) 563#endif 564void 565lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub, 566 lsquic_conn_t *conn, enum rw_reason reason) 567{ 568 if (0 == (enpub->enp_flags & ENPUB_PROC) && 569 0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW))) 570 { 571 lsquic_engine_t *engine = (lsquic_engine_t *) enpub; 572 add_conn_to_pend_rw(engine, conn, reason); 573 } 574} 575 576 577void 578lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub, 579 lsquic_conn_t *conn, lsquic_time_t tick_time) 580{ 581 lsquic_engine_t *const engine = (lsquic_engine_t *) enpub; 582 /* Instead of performing an update, we simply remove the connection from 583 * the queue and add it back. This should not happen in at the time of 584 * this writing. 585 */ 586 if (conn->cn_flags & LSCONN_ATTQ) 587 { 588 attq_remove(engine->attq, conn); 589 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 590 } 591 if (conn && !(conn->cn_flags & LSCONN_ATTQ) && 592 0 == attq_maybe_add(engine->attq, conn, tick_time)) 593 engine_incref_conn(conn, LSCONN_ATTQ); 594} 595 596 597static void 598update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn, 599 int progress_made) 600{ 601 rw_hist_idx_t hist_idx; 602 const unsigned char *empty; 603 const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check; 604 605 if (!pendrw_check) 606 return; 607 608 /* Convert previous entry to uppercase: */ 609 hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1); 610 conn->cn_rw_hist_buf[ hist_idx ] -= 0x20; 611 612 LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made); 613 if (progress_made) 614 { 615 conn->cn_noprogress_count = 0; 616 return; 617 } 618 619 EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made " 620 "no progress"); 621 ++conn->cn_noprogress_count; 622 if (conn->cn_noprogress_count <= pendrw_check) 623 return; 624 625 conn->cn_flags |= LSCONN_NEVER_PEND_RW; 626 empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY, 627 sizeof(conn->cn_rw_hist_buf)); 628 if (empty) 629 LSQ_WARN("conn %"PRIu64" noprogress count reached %u " 630 "(rw_hist: %.*s): will not put it onto Pend RW queue again", 631 conn->cn_cid, conn->cn_noprogress_count, 632 (int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); 633 else 634 { 635 hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); 636 LSQ_WARN("conn %"PRIu64" noprogress count reached %u " 637 "(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again", 638 conn->cn_cid, conn->cn_noprogress_count, 639 /* First part of history: */ 640 (int) (sizeof(conn->cn_rw_hist_buf) - hist_idx), 641 conn->cn_rw_hist_buf + hist_idx, 642 /* Second part of history: */ 643 hist_idx, conn->cn_rw_hist_buf); 644 } 645} 646 647 648/* Return 0 if packet is being processed by a connections, otherwise return 1 */ 649static int 650process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 651 struct packin_parse_state *ppstate, const struct sockaddr *sa_local, 652 const struct sockaddr *sa_peer, void *peer_ctx) 653{ 654 lsquic_conn_t *conn; 655 656 conn = find_or_create_conn(engine, packet_in, ppstate, sa_peer, peer_ctx); 657 if (!conn) 658 { 659 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 660 return 1; 661 } 662 663 if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) { 664 TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in); 665 engine_incref_conn(conn, LSCONN_HAS_INCOMING); 666 } 667 lsquic_conn_record_sockaddr(conn, sa_local, sa_peer); 668 lsquic_packet_in_upref(packet_in); 669 conn->cn_peer_ctx = peer_ctx; 670 conn->cn_if->ci_packet_in(conn, packet_in); 671 lsquic_packet_in_put(&engine->pub.enp_mm, packet_in); 672 return 0; 673} 674 675 676static int 677conn_attq_expired (const struct lsquic_engine *engine, 678 const lsquic_conn_t *conn) 679{ 680 assert(conn->cn_attq_elem); 681 return lsquic_conn_adv_time(conn) < engine->proc_time; 682} 683 684 685/* Iterator for connections with incoming packets */ 686static lsquic_conn_t * 687conn_iter_next_incoming (struct lsquic_engine *engine) 688{ 689 enum lsquic_conn_flags addl_flags; 690 lsquic_conn_t *conn; 691 while ((conn = TAILQ_FIRST(&engine->conns_in))) 692 { 693 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 694 if (conn->cn_flags & LSCONN_RW_PENDING) 695 { 696 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 697 EV_LOG_CONN_EVENT(conn->cn_cid, 698 "removed from pending RW queue (processing incoming)"); 699 } 700 if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) 701 { 702 addl_flags = LSCONN_ATTQ; 703 attq_remove(engine->attq, conn); 704 } 705 else 706 addl_flags = 0; 707 conn = engine_decref_conn(engine, conn, 708 LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); 709 if (conn) 710 break; 711 } 712 return conn; 713} 714 715 716/* Iterator for connections with that have pending read/write events */ 717static lsquic_conn_t * 718conn_iter_next_rw_pend (struct lsquic_engine *engine) 719{ 720 enum lsquic_conn_flags addl_flags; 721 lsquic_conn_t *conn; 722 while ((conn = TAILQ_FIRST(&engine->conns_pend_rw))) 723 { 724 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 725 EV_LOG_CONN_EVENT(conn->cn_cid, 726 "removed from pending RW queue (processing pending RW conns)"); 727 if (conn->cn_flags & LSCONN_HAS_INCOMING) 728 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 729 if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) 730 { 731 addl_flags = LSCONN_ATTQ; 732 attq_remove(engine->attq, conn); 733 } 734 else 735 addl_flags = 0; 736 conn = engine_decref_conn(engine, conn, 737 LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); 738 if (conn) 739 break; 740 } 741 return conn; 742} 743 744 745void 746lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine) 747{ 748 LSQ_DEBUG("process connections with incoming packets"); 749 ENGINE_IN(engine); 750 process_connections(engine, conn_iter_next_incoming); 751 assert(TAILQ_EMPTY(&engine->conns_in)); 752 ENGINE_OUT(engine); 753} 754 755 756int 757lsquic_engine_has_pend_rw (lsquic_engine_t *engine) 758{ 759 return !(engine->flags & ENG_PAST_DEADLINE) 760 && !TAILQ_EMPTY(&engine->conns_pend_rw); 761} 762 763 764void 765lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine) 766{ 767 LSQ_DEBUG("process connections with pending RW events"); 768 ENGINE_IN(engine); 769 process_connections(engine, conn_iter_next_rw_pend); 770 ENGINE_OUT(engine); 771} 772 773 774void 775lsquic_engine_destroy (lsquic_engine_t *engine) 776{ 777 lsquic_conn_t *conn; 778 779 LSQ_DEBUG("destroying engine"); 780#ifndef NDEBUG 781 engine->flags |= ENG_DTOR; 782#endif 783 784 while (engine->conns_out.oh_nelem > 0) 785 { 786 --engine->conns_out.oh_nelem; 787 conn = engine->conns_out.oh_elems[ 788 engine->conns_out.oh_nelem ].ohe_conn; 789 assert(conn->cn_flags & LSCONN_HAS_OUTGOING); 790 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 791 } 792 793 for (conn = conn_hash_first(&engine->full_conns); conn; 794 conn = conn_hash_next(&engine->full_conns)) 795 force_close_conn(engine, conn); 796 conn_hash_cleanup(&engine->full_conns); 797 798 799 attq_destroy(engine->attq); 800 801 assert(0 == engine->conns_out.oh_nelem); 802 assert(TAILQ_EMPTY(&engine->conns_pend_rw)); 803 lsquic_mm_cleanup(&engine->pub.enp_mm); 804 free(engine->conns_out.oh_elems); 805 free(engine); 806} 807 808 809#if __GNUC__ 810__attribute__((nonnull(3))) 811#endif 812static lsquic_conn_t * 813remove_from_inc_andor_pend_rw (lsquic_engine_t *engine, 814 lsquic_conn_t *conn, const char *reason) 815{ 816 assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)); 817 if (conn->cn_flags & LSCONN_HAS_INCOMING) 818 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 819 if (conn->cn_flags & LSCONN_RW_PENDING) 820 { 821 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 822 EV_LOG_CONN_EVENT(conn->cn_cid, 823 "removed from pending RW queue (%s)", reason); 824 } 825 conn = engine_decref_conn(engine, conn, 826 LSCONN_HAS_INCOMING|LSCONN_RW_PENDING); 827 assert(conn); 828 return conn; 829} 830 831 832static lsquic_conn_t * 833conn_iter_next_one (lsquic_engine_t *engine) 834{ 835 lsquic_conn_t *conn = engine->iter_state.one.conn; 836 if (conn) 837 { 838 if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) 839 conn = remove_from_inc_andor_pend_rw(engine, conn, "connect"); 840 if (conn && (conn->cn_flags & LSCONN_ATTQ) && 841 conn_attq_expired(engine, conn)) 842 { 843 attq_remove(engine->attq, conn); 844 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 845 } 846 engine->iter_state.one.conn = NULL; 847 } 848 return conn; 849} 850 851 852int 853lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa, 854 void *conn_ctx, const char *hostname, 855 unsigned short max_packet_size) 856{ 857 lsquic_conn_t *conn; 858 859 if (engine->flags & ENG_SERVER) 860 { 861 LSQ_ERROR("`%s' must only be called in client mode", __func__); 862 return -1; 863 } 864 865 if (0 == max_packet_size) 866 { 867 switch (peer_sa->sa_family) 868 { 869 case AF_INET: 870 max_packet_size = QUIC_MAX_IPv4_PACKET_SZ; 871 break; 872 default: 873 max_packet_size = QUIC_MAX_IPv6_PACKET_SZ; 874 break; 875 } 876 } 877 878 conn = new_full_conn_client(engine, hostname, max_packet_size); 879 if (!conn) 880 return -1; 881 ENGINE_IN(engine); 882 lsquic_conn_record_peer_sa(conn, peer_sa); 883 conn->cn_peer_ctx = conn_ctx; 884 engine->iter_state.one.conn = conn; 885 process_connections(engine, conn_iter_next_one); 886 ENGINE_OUT(engine); 887 return 0; 888} 889 890 891static void 892remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn) 893{ 894 conn_hash_remove(&engine->full_conns, conn); 895 (void) engine_decref_conn(engine, conn, LSCONN_HASHED); 896} 897 898 899static void 900refflags2str (enum lsquic_conn_flags flags, char s[7]) 901{ 902 *s = 'C'; s += !!(flags & LSCONN_CLOSING); 903 *s = 'H'; s += !!(flags & LSCONN_HASHED); 904 *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING); 905 *s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING); 906 *s = 'R'; s += !!(flags & LSCONN_RW_PENDING); 907 *s = 'A'; s += !!(flags & LSCONN_ATTQ); 908 *s = '\0'; 909} 910 911 912static void 913engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag) 914{ 915 char str[7]; 916 assert(flag & CONN_REF_FLAGS); 917 assert(!(conn->cn_flags & flag)); 918 conn->cn_flags |= flag; 919 LSQ_DEBUG("incref conn %"PRIu64", now '%s'", conn->cn_cid, 920 (refflags2str(conn->cn_flags, str), str)); 921} 922 923 924static lsquic_conn_t * 925engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 926 enum lsquic_conn_flags flags) 927{ 928 char str[7]; 929 assert(flags & CONN_REF_FLAGS); 930 assert(conn->cn_flags & flags); 931#ifndef NDEBUG 932 if (flags & LSCONN_CLOSING) 933 assert(0 == (conn->cn_flags & LSCONN_HASHED)); 934#endif 935 conn->cn_flags &= ~flags; 936 LSQ_DEBUG("decref conn %"PRIu64", now '%s'", conn->cn_cid, 937 (refflags2str(conn->cn_flags, str), str)); 938 if (0 == (conn->cn_flags & CONN_REF_FLAGS)) 939 { 940 eng_hist_inc(&engine->history, 0, sl_del_full_conns); 941 destroy_conn(conn); 942 return NULL; 943 } 944 else 945 return conn; 946} 947 948 949/* This is not a general-purpose function. Only call from engine dtor. */ 950static void 951force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn) 952{ 953 assert(engine->flags & ENG_DTOR); 954 const enum lsquic_conn_flags flags = conn->cn_flags; 955 assert(conn->cn_flags & CONN_REF_FLAGS); 956 assert(!(flags & LSCONN_HAS_OUTGOING)); /* Should be removed already */ 957 assert(!(flags & LSCONN_CLOSING)); /* It is in transient queue? */ 958 if (flags & LSCONN_HAS_INCOMING) 959 { 960 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 961 (void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING); 962 } 963 if (flags & LSCONN_RW_PENDING) 964 { 965 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 966 EV_LOG_CONN_EVENT(conn->cn_cid, 967 "removed from pending RW queue (engine destruction)"); 968 (void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING); 969 } 970 if (flags & LSCONN_ATTQ) 971 attq_remove(engine->attq, conn); 972 if (flags & LSCONN_HASHED) 973 remove_conn_from_hash(engine, conn); 974} 975 976 977/* Iterator for all connections. 978 * Returned connections are removed from the Incoming, Pending RW Event, 979 * and Advisory Tick Time queues if necessary. 980 */ 981static lsquic_conn_t * 982conn_iter_next_all (struct lsquic_engine *engine) 983{ 984 lsquic_conn_t *conn; 985 986 conn = conn_hash_next(&engine->full_conns); 987 988 if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))) 989 conn = remove_from_inc_andor_pend_rw(engine, conn, "process all"); 990 if (conn && (conn->cn_flags & LSCONN_ATTQ) 991 && conn_attq_expired(engine, conn)) 992 { 993 attq_remove(engine->attq, conn); 994 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 995 } 996 997 return conn; 998} 999 1000 1001static lsquic_conn_t * 1002conn_iter_next_attq (struct lsquic_engine *engine) 1003{ 1004 lsquic_conn_t *conn; 1005 1006 conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff); 1007 if (conn) 1008 { 1009 assert(conn->cn_flags & LSCONN_ATTQ); 1010 if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) 1011 conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq"); 1012 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 1013 } 1014 1015 return conn; 1016} 1017 1018 1019void 1020lsquic_engine_proc_all (lsquic_engine_t *engine) 1021{ 1022 ENGINE_IN(engine); 1023 /* We poke each connection every time as initial implementation. If it 1024 * proves to be too inefficient, we will need to figure out 1025 * a) when to stop processing; and 1026 * b) how to remember state between calls. 1027 */ 1028 conn_hash_reset_iter(&engine->full_conns); 1029 process_connections(engine, conn_iter_next_all); 1030 ENGINE_OUT(engine); 1031} 1032 1033 1034void 1035lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine) 1036{ 1037 lsquic_time_t prev_min, cutoff; 1038 1039 LSQ_DEBUG("process connections in attq"); 1040 ENGINE_IN(engine); 1041 cutoff = lsquic_time_now(); 1042 prev_min = attq_set_min(engine->attq, cutoff); /* Prevent infinite loop */ 1043 engine->iter_state.attq.cutoff = cutoff; 1044 process_connections(engine, conn_iter_next_attq); 1045 attq_set_min(engine->attq, prev_min); /* Restore previos value */ 1046 ENGINE_OUT(engine); 1047} 1048 1049 1050static int 1051generate_header (const lsquic_packet_out_t *packet_out, 1052 const struct parse_funcs *pf, lsquic_cid_t cid, 1053 unsigned char *buf, size_t bufsz) 1054{ 1055 return pf->pf_gen_reg_pkt_header(buf, bufsz, 1056 packet_out->po_flags & PO_CONN_ID ? &cid : NULL, 1057 packet_out->po_flags & PO_VERSION ? &packet_out->po_ver_tag : NULL, 1058 packet_out->po_flags & PO_NONCE ? packet_out->po_nonce : NULL, 1059 packet_out->po_packno, lsquic_packet_out_packno_bits(packet_out)); 1060} 1061 1062 1063static ssize_t 1064really_encrypt_packet (const lsquic_conn_t *conn, 1065 const lsquic_packet_out_t *packet_out, 1066 unsigned char *buf, size_t bufsz) 1067{ 1068 int enc, header_sz, is_hello_packet; 1069 size_t packet_sz; 1070 unsigned char header_buf[QUIC_MAX_PUBHDR_SZ]; 1071 1072 header_sz = generate_header(packet_out, conn->cn_pf, conn->cn_cid, 1073 header_buf, sizeof(header_buf)); 1074 if (header_sz < 0) 1075 return -1; 1076 1077 is_hello_packet = !!(packet_out->po_flags & PO_HELLO); 1078 enc = conn->cn_esf->esf_encrypt(conn->cn_enc_session, conn->cn_version, 0, 1079 packet_out->po_packno, header_buf, header_sz, 1080 packet_out->po_data, packet_out->po_data_sz, 1081 buf, bufsz, &packet_sz, is_hello_packet); 1082 if (0 == enc) 1083 { 1084 LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %u bytes, " 1085 "ciphertext is %zd bytes", 1086 packet_out->po_packno, 1087 lsquic_po_header_length(packet_out->po_flags) + 1088 packet_out->po_data_sz, 1089 packet_sz); 1090 return packet_sz; 1091 } 1092 else 1093 return -1; 1094} 1095 1096 1097static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, } 1098encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn, 1099 lsquic_packet_out_t *packet_out) 1100{ 1101 ssize_t enc_sz; 1102 size_t bufsz; 1103 unsigned char *buf; 1104 1105 bufsz = lsquic_po_header_length(packet_out->po_flags) + 1106 packet_out->po_data_sz + QUIC_PACKET_HASH_SZ; 1107 buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz); 1108 if (!buf) 1109 { 1110 LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd", 1111 bufsz); 1112 return ENCPA_NOMEM; 1113 } 1114 1115 enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz); 1116 1117 if (enc_sz < 0) 1118 { 1119 engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf); 1120 return ENCPA_BADCRYPT; 1121 } 1122 1123 packet_out->po_enc_data = buf; 1124 packet_out->po_enc_data_sz = enc_sz; 1125 packet_out->po_flags |= PO_ENCRYPTED; 1126 1127 return ENCPA_OK; 1128} 1129 1130 1131struct out_batch 1132{ 1133 lsquic_conn_t *conns [MAX_OUT_BATCH_SIZE]; 1134 lsquic_packet_out_t *packets[MAX_OUT_BATCH_SIZE]; 1135 struct lsquic_out_spec outs [MAX_OUT_BATCH_SIZE]; 1136}; 1137 1138 1139STAILQ_HEAD(closed_conns, lsquic_conn); 1140 1141 1142struct conns_out_iter 1143{ 1144 struct out_heap *coi_heap; 1145 TAILQ_HEAD(, lsquic_conn) coi_active_list, 1146 coi_inactive_list; 1147 lsquic_conn_t *coi_next; 1148#ifndef NDEBUG 1149 lsquic_time_t coi_last_sent; 1150#endif 1151}; 1152 1153 1154static void 1155coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine) 1156{ 1157 iter->coi_heap = &engine->conns_out; 1158 iter->coi_next = NULL; 1159 TAILQ_INIT(&iter->coi_active_list); 1160 TAILQ_INIT(&iter->coi_inactive_list); 1161#ifndef NDEBUG 1162 iter->coi_last_sent = 0; 1163#endif 1164} 1165 1166 1167static lsquic_conn_t * 1168coi_next (struct conns_out_iter *iter) 1169{ 1170 lsquic_conn_t *conn; 1171 1172 if (iter->coi_heap->oh_nelem > 0) 1173 { 1174 conn = oh_pop(iter->coi_heap); 1175 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1176 conn->cn_flags |= LSCONN_COI_ACTIVE; 1177#ifndef NDEBUG 1178 if (iter->coi_last_sent) 1179 assert(iter->coi_last_sent <= conn->cn_last_sent); 1180 iter->coi_last_sent = conn->cn_last_sent; 1181#endif 1182 return conn; 1183 } 1184 else if (!TAILQ_EMPTY(&iter->coi_active_list)) 1185 { 1186 conn = iter->coi_next; 1187 if (!conn) 1188 conn = TAILQ_FIRST(&iter->coi_active_list); 1189 if (conn) 1190 iter->coi_next = TAILQ_NEXT(conn, cn_next_out); 1191 return conn; 1192 } 1193 else 1194 return NULL; 1195} 1196 1197 1198static void 1199coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1200{ 1201 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1202 { 1203 assert(!TAILQ_EMPTY(&iter->coi_active_list)); 1204 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1205 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1206 TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out); 1207 conn->cn_flags |= LSCONN_COI_INACTIVE; 1208 } 1209} 1210 1211 1212static void 1213coi_remove (struct conns_out_iter *iter, lsquic_conn_t *conn) 1214{ 1215 assert(conn->cn_flags & LSCONN_COI_ACTIVE); 1216 if (conn->cn_flags & LSCONN_COI_ACTIVE) 1217 { 1218 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1219 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1220 } 1221} 1222 1223 1224static void 1225coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1226{ 1227 assert(conn->cn_flags & LSCONN_COI_INACTIVE); 1228 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1229 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1230 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1231 conn->cn_flags |= LSCONN_COI_ACTIVE; 1232} 1233 1234 1235static void 1236coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine) 1237{ 1238 lsquic_conn_t *conn; 1239 while ((conn = TAILQ_FIRST(&iter->coi_active_list))) 1240 { 1241 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1242 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1243 oh_insert(iter->coi_heap, conn); 1244 } 1245 while ((conn = TAILQ_FIRST(&iter->coi_inactive_list))) 1246 { 1247 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1248 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1249 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 1250 } 1251} 1252 1253 1254static unsigned 1255send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter, 1256 struct out_batch *batch, unsigned n_to_send) 1257{ 1258 int n_sent, i; 1259 lsquic_time_t now; 1260 1261 /* Set sent time before the write to avoid underestimating RTT */ 1262 now = lsquic_time_now(); 1263 for (i = 0; i < (int) n_to_send; ++i) 1264 batch->packets[i]->po_sent = now; 1265 n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs, 1266 n_to_send); 1267 if (n_sent >= 0) 1268 LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send); 1269 else 1270 { 1271 LSQ_DEBUG("packets out returned an error: %s", strerror(errno)); 1272 n_sent = 0; 1273 } 1274 if (n_sent > 0) 1275 engine->last_sent = now + n_sent; 1276 for (i = 0; i < n_sent; ++i) 1277 { 1278 eng_hist_inc(&engine->history, now, sl_packets_out); 1279 EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1280 batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i], 1281 batch->packets[i]); 1282 /* `i' is added to maintain relative order */ 1283 batch->conns[i]->cn_last_sent = now + i; 1284 /* Release packet out buffer as soon as the packet is sent 1285 * successfully. If not successfully sent, we hold on to 1286 * this buffer until the packet sending is attempted again 1287 * or until it times out and regenerated. 1288 */ 1289 if (batch->packets[i]->po_flags & PO_ENCRYPTED) 1290 { 1291 batch->packets[i]->po_flags &= ~PO_ENCRYPTED; 1292 engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, 1293 batch->packets[i]->po_enc_data); 1294 batch->packets[i]->po_enc_data = NULL; /* JIC */ 1295 } 1296 } 1297 if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT)) 1298 for ( ; i < (int) n_to_send; ++i) 1299 EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1300 /* Return packets to the connection in reverse order so that the packet 1301 * ordering is maintained. 1302 */ 1303 for (i = (int) n_to_send - 1; i >= n_sent; --i) 1304 { 1305 batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i], 1306 batch->packets[i]); 1307 if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT))) 1308 coi_reactivate(conns_iter, batch->conns[i]); 1309 } 1310 return n_sent; 1311} 1312 1313 1314/* Return 1 if went past deadline, 0 otherwise */ 1315static int 1316check_deadline (lsquic_engine_t *engine) 1317{ 1318 if (engine->pub.enp_settings.es_proc_time_thresh && 1319 lsquic_time_now() > engine->deadline) 1320 { 1321 LSQ_INFO("went past threshold of %u usec, stop sending", 1322 engine->pub.enp_settings.es_proc_time_thresh); 1323 engine->flags |= ENG_PAST_DEADLINE; 1324 return 1; 1325 } 1326 else 1327 return 0; 1328} 1329 1330 1331static void 1332send_packets_out (struct lsquic_engine *engine, 1333 struct closed_conns *closed_conns) 1334{ 1335 unsigned n, w, n_sent, n_batches_sent; 1336 lsquic_packet_out_t *packet_out; 1337 lsquic_conn_t *conn; 1338 struct out_batch batch; 1339 struct conns_out_iter conns_iter; 1340 int shrink, deadline_exceeded; 1341 1342 coi_init(&conns_iter, engine); 1343 n_batches_sent = 0; 1344 n_sent = 0, n = 0; 1345 shrink = 0; 1346 deadline_exceeded = 0; 1347 1348 while ((conn = coi_next(&conns_iter))) 1349 { 1350 packet_out = conn->cn_if->ci_next_packet_to_send(conn); 1351 if (!packet_out) { 1352 LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64, 1353 conn->cn_cid); 1354 coi_deactivate(&conns_iter, conn); 1355 continue; 1356 } 1357 if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT))) 1358 { 1359 switch (encrypt_packet(engine, conn, packet_out)) 1360 { 1361 case ENCPA_NOMEM: 1362 /* Send what we have and wait for a more opportune moment */ 1363 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1364 goto end_for; 1365 case ENCPA_BADCRYPT: 1366 /* This is pretty bad: close connection immediately */ 1367 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1368 LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid); 1369 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1370 { 1371 if (!(conn->cn_flags & LSCONN_CLOSING)) 1372 { 1373 STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn); 1374 engine_incref_conn(conn, LSCONN_CLOSING); 1375 if (conn->cn_flags & LSCONN_HASHED) 1376 remove_conn_from_hash(engine, conn); 1377 } 1378 coi_remove(&conns_iter, conn); 1379 } 1380 continue; 1381 case ENCPA_OK: 1382 break; 1383 } 1384 } 1385 LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64, 1386 packet_out->po_packno, conn->cn_cid); 1387 assert(conn->cn_flags & LSCONN_HAS_PEER_SA); 1388 if (packet_out->po_flags & PO_ENCRYPTED) 1389 { 1390 batch.outs[n].buf = packet_out->po_enc_data; 1391 batch.outs[n].sz = packet_out->po_enc_data_sz; 1392 } 1393 else 1394 { 1395 batch.outs[n].buf = packet_out->po_data; 1396 batch.outs[n].sz = packet_out->po_data_sz; 1397 } 1398 batch.outs [n].peer_ctx = conn->cn_peer_ctx; 1399 batch.outs [n].local_sa = (struct sockaddr *) conn->cn_local_addr; 1400 batch.outs [n].dest_sa = (struct sockaddr *) conn->cn_peer_addr; 1401 batch.conns [n] = conn; 1402 batch.packets[n] = packet_out; 1403 ++n; 1404 if (n == engine->batch_size) 1405 { 1406 n = 0; 1407 w = send_batch(engine, &conns_iter, &batch, engine->batch_size); 1408 ++n_batches_sent; 1409 n_sent += w; 1410 if (w < engine->batch_size) 1411 { 1412 shrink = 1; 1413 break; 1414 } 1415 deadline_exceeded = check_deadline(engine); 1416 if (deadline_exceeded) 1417 break; 1418 grow_batch_size(engine); 1419 } 1420 } 1421 end_for: 1422 1423 if (n > 0) { 1424 w = send_batch(engine, &conns_iter, &batch, n); 1425 n_sent += w; 1426 shrink = w < n; 1427 ++n_batches_sent; 1428 deadline_exceeded = check_deadline(engine); 1429 } 1430 1431 if (shrink) 1432 shrink_batch_size(engine); 1433 else if (n_batches_sent > 1 && !deadline_exceeded) 1434 grow_batch_size(engine); 1435 1436 coi_reheap(&conns_iter, engine); 1437 1438 LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s"); 1439} 1440 1441 1442int 1443lsquic_engine_has_unsent_packets (lsquic_engine_t *engine) 1444{ 1445 return !(engine->flags & ENG_PAST_DEADLINE) 1446 && ( engine->conns_out.oh_nelem > 0 1447 ) 1448 ; 1449} 1450 1451 1452static void 1453reset_deadline (lsquic_engine_t *engine, lsquic_time_t now) 1454{ 1455 engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh; 1456 engine->flags &= ~ENG_PAST_DEADLINE; 1457} 1458 1459 1460/* TODO: this is a user-facing function, account for load */ 1461void 1462lsquic_engine_send_unsent_packets (lsquic_engine_t *engine) 1463{ 1464 lsquic_conn_t *conn; 1465 struct closed_conns closed_conns; 1466 1467 STAILQ_INIT(&closed_conns); 1468 reset_deadline(engine, lsquic_time_now()); 1469 1470 send_packets_out(engine, &closed_conns); 1471 1472 while ((conn = STAILQ_FIRST(&closed_conns))) { 1473 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1474 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1475 } 1476 1477} 1478 1479 1480static void 1481process_connections (lsquic_engine_t *engine, conn_iter_f next_conn) 1482{ 1483 lsquic_conn_t *conn; 1484 enum tick_st tick_st; 1485 lsquic_time_t now = lsquic_time_now(); 1486 struct closed_conns closed_conns; 1487 1488 engine->proc_time = now; 1489 eng_hist_tick(&engine->history, now); 1490 1491 STAILQ_INIT(&closed_conns); 1492 reset_deadline(engine, now); 1493 1494 while ((conn = next_conn(engine))) 1495 { 1496 tick_st = conn->cn_if->ci_tick(conn, now); 1497 if (conn_iter_next_rw_pend == next_conn) 1498 update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS); 1499 if (tick_st & TICK_SEND) 1500 { 1501 if (!(conn->cn_flags & LSCONN_HAS_OUTGOING)) 1502 { 1503 oh_insert(&engine->conns_out, conn); 1504 engine_incref_conn(conn, LSCONN_HAS_OUTGOING); 1505 } 1506 } 1507 if (tick_st & TICK_CLOSE) 1508 { 1509 STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn); 1510 engine_incref_conn(conn, LSCONN_CLOSING); 1511 if (conn->cn_flags & LSCONN_HASHED) 1512 remove_conn_from_hash(engine, conn); 1513 } 1514 } 1515 1516 if (lsquic_engine_has_unsent_packets(engine)) 1517 send_packets_out(engine, &closed_conns); 1518 1519 while ((conn = STAILQ_FIRST(&closed_conns))) { 1520 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1521 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1522 } 1523 1524} 1525 1526 1527/* Return 0 if packet is being processed by a real connection, 1 if the 1528 * packet was processed, but not by a connection, and -1 on error. 1529 */ 1530int 1531lsquic_engine_packet_in (lsquic_engine_t *engine, 1532 const unsigned char *packet_in_data, size_t packet_in_size, 1533 const struct sockaddr *sa_local, const struct sockaddr *sa_peer, 1534 void *peer_ctx) 1535{ 1536 struct packin_parse_state ppstate; 1537 lsquic_packet_in_t *packet_in; 1538 1539 if (packet_in_size > QUIC_MAX_PACKET_SZ) 1540 { 1541 LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming " 1542 "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ); 1543 errno = E2BIG; 1544 return -1; 1545 } 1546 1547 packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm); 1548 if (!packet_in) 1549 return -1; 1550 1551 /* Library does not modify packet_in_data, it is not referenced after 1552 * this function returns and subsequent release of pi_data is guarded 1553 * by PI_OWN_DATA flag. 1554 */ 1555 packet_in->pi_data = (unsigned char *) packet_in_data; 1556 if (0 != parse_packet_in_begin(packet_in, packet_in_size, 1557 engine->flags & ENG_SERVER, &ppstate)) 1558 { 1559 LSQ_DEBUG("Cannot parse incoming packet's header"); 1560 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 1561 errno = EINVAL; 1562 return -1; 1563 } 1564 1565 packet_in->pi_received = lsquic_time_now(); 1566 eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in); 1567 return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer, 1568 peer_ctx); 1569} 1570 1571 1572#if __GNUC__ && !defined(NDEBUG) 1573__attribute__((weak)) 1574#endif 1575unsigned 1576lsquic_engine_quic_versions (const lsquic_engine_t *engine) 1577{ 1578 return engine->pub.enp_settings.es_versions; 1579} 1580 1581 1582int 1583lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff) 1584{ 1585 const lsquic_time_t *next_time; 1586 lsquic_time_t now; 1587 1588 next_time = attq_next_time(engine->attq); 1589 if (!next_time) 1590 return 0; 1591 1592 now = lsquic_time_now(); 1593 *diff = (int) ((int64_t) *next_time - (int64_t) now); 1594 return 1; 1595} 1596 1597 1598unsigned 1599lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now) 1600{ 1601 lsquic_time_t now; 1602 now = lsquic_time_now(); 1603 if (from_now < 0) 1604 now -= from_now; 1605 else 1606 now += from_now; 1607 return attq_count_before(engine->attq, now); 1608} 1609 1610 1611