lsquic_engine.c revision 50aadb33
1/* Copyright (c) 2017 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_engine.c - QUIC engine 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdint.h> 10#include <stdio.h> 11#include <stdlib.h> 12#include <string.h> 13#include <sys/queue.h> 14#include <sys/time.h> 15#include <time.h> 16#include <netinet/in.h> 17#include <sys/types.h> 18#include <sys/stat.h> 19#include <fcntl.h> 20#include <unistd.h> 21#include <netdb.h> 22 23#ifndef NDEBUG 24#include <sys/types.h> 25#include <regex.h> /* For code that loses packets */ 26#endif 27 28 29 30#include "lsquic.h" 31#include "lsquic_types.h" 32#include "lsquic_alarmset.h" 33#include "lsquic_parse.h" 34#include "lsquic_packet_in.h" 35#include "lsquic_packet_out.h" 36#include "lsquic_senhist.h" 37#include "lsquic_rtt.h" 38#include "lsquic_cubic.h" 39#include "lsquic_pacer.h" 40#include "lsquic_send_ctl.h" 41#include "lsquic_set.h" 42#include "lsquic_conn_flow.h" 43#include "lsquic_sfcw.h" 44#include "lsquic_stream.h" 45#include "lsquic_conn.h" 46#include "lsquic_full_conn.h" 47#include "lsquic_util.h" 48#include "lsquic_qtags.h" 49#include "lsquic_handshake.h" 50#include "lsquic_mm.h" 51#include "lsquic_conn_hash.h" 52#include "lsquic_engine_public.h" 53#include "lsquic_eng_hist.h" 54#include "lsquic_ev_log.h" 55#include "lsquic_version.h" 56#include "lsquic_attq.h" 57 58#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE 59#include "lsquic_logger.h" 60 61 62/* The batch of outgoing packets grows and shrinks dynamically */ 63#define MAX_OUT_BATCH_SIZE 1024 64#define MIN_OUT_BATCH_SIZE 256 65#define INITIAL_OUT_BATCH_SIZE 512 66 67typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *); 68 69static void 70process_connections (struct lsquic_engine *engine, conn_iter_f iter); 71 72static void 73engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag); 74 75static lsquic_conn_t * 76engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 77 enum lsquic_conn_flags flag); 78 79static void 80force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn); 81 82/* Nested calls to LSQUIC are not supported */ 83#define ENGINE_IN(e) do { \ 84 assert(!((e)->pub.enp_flags & ENPUB_PROC)); \ 85 (e)->pub.enp_flags |= ENPUB_PROC; \ 86} while (0) 87 88#define ENGINE_OUT(e) do { \ 89 assert((e)->pub.enp_flags & ENPUB_PROC); \ 90 (e)->pub.enp_flags &= ~ENPUB_PROC; \ 91} while (0) 92 93/* A connection can be referenced from one of six places: 94 * 95 * 1. Connection hash: a connection starts its life in one of those. 96 * 97 * 2. Outgoing queue. 98 * 99 * 3. Incoming queue. 100 * 101 * 4. Pending RW Events queue. 102 * 103 * 5. Advisory Tick Time queue. 104 * 105 * 6. Closing connections queue. This is a transient queue -- it only 106 * exists for the duration of process_connections() function call. 107 * 108 * The idea is to destroy the connection when it is no longer referenced. 109 * For example, a connection tick may return TICK_SEND|TICK_CLOSE. In 110 * that case, the connection is referenced from two places: (2) and (6). 111 * After its packets are sent, it is only referenced in (6), and at the 112 * end of the function call, when it is removed from (6), reference count 113 * goes to zero and the connection is destroyed. If not all packets can 114 * be sent, at the end of the function call, the connection is referenced 115 * by (2) and will only be removed once all outgoing packets have been 116 * sent. 117 */ 118#define CONN_REF_FLAGS (LSCONN_HASHED \ 119 |LSCONN_HAS_OUTGOING \ 120 |LSCONN_HAS_INCOMING \ 121 |LSCONN_RW_PENDING \ 122 |LSCONN_CLOSING \ 123 |LSCONN_ATTQ) 124 125 126struct out_heap_elem 127{ 128 struct lsquic_conn *ohe_conn; 129 lsquic_time_t ohe_last_sent; 130}; 131 132 133struct out_heap 134{ 135 struct out_heap_elem *oh_elems; 136 unsigned oh_nalloc, 137 oh_nelem; 138}; 139 140 141struct lsquic_engine 142{ 143 struct lsquic_engine_public pub; 144 enum { 145 ENG_SERVER = LSENG_SERVER, 146 ENG_HTTP = LSENG_HTTP, 147 ENG_COOLDOWN = (1 << 7), /* Cooldown: no new connections */ 148 ENG_PAST_DEADLINE 149 = (1 << 8), /* Previous call to a processing 150 * function went past time threshold. 151 */ 152#ifndef NDEBUG 153 ENG_DTOR = (1 << 26), /* Engine destructor */ 154#endif 155 } flags; 156 const struct lsquic_stream_if *stream_if; 157 void *stream_if_ctx; 158 lsquic_packets_out_f packets_out; 159 void *packets_out_ctx; 160 void *bad_handshake_ctx; 161 struct conn_hash full_conns; 162 TAILQ_HEAD(, lsquic_conn) conns_in, conns_pend_rw; 163 struct out_heap conns_out; 164 /* Use a union because only one iterator is being used at any one time */ 165 union { 166 struct { 167 /* This iterator does not have any state: it uses `conns_in' */ 168 } conn_in; 169 struct { 170 /* This iterator does not have any state: it uses `conns_pend_rw' */ 171 } rw_pend; 172 struct { 173 /* Iterator state to process connections in Advisory Tick Time 174 * queue. 175 */ 176 lsquic_time_t cutoff; 177 } attq; 178 struct { 179 /* Iterator state to process all connections */ 180 } all; 181 struct { 182 lsquic_conn_t *conn; 183 } one; 184 } iter_state; 185 struct eng_hist history; 186 unsigned batch_size; 187 unsigned time_until_desired_tick; 188 struct attq *attq; 189 lsquic_time_t proc_time; 190 /* Track time last time a packet was sent to give new connections 191 * priority lower than that of existing connections. 192 */ 193 lsquic_time_t last_sent; 194 lsquic_time_t deadline; 195}; 196 197 198#define OHE_PARENT(i) ((i - 1) / 2) 199#define OHE_LCHILD(i) (2 * i + 1) 200#define OHE_RCHILD(i) (2 * i + 2) 201 202 203static void 204heapify_out_heap (struct out_heap *heap, unsigned i) 205{ 206 struct out_heap_elem el; 207 unsigned smallest; 208 209 assert(i < heap->oh_nelem); 210 211 if (OHE_LCHILD(i) < heap->oh_nelem) 212 { 213 if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent < 214 heap->oh_elems[ i ].ohe_last_sent) 215 smallest = OHE_LCHILD(i); 216 else 217 smallest = i; 218 if (OHE_RCHILD(i) < heap->oh_nelem && 219 heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent < 220 heap->oh_elems[ smallest ].ohe_last_sent) 221 smallest = OHE_RCHILD(i); 222 } 223 else 224 smallest = i; 225 226 if (smallest != i) 227 { 228 el = heap->oh_elems[ smallest ]; 229 heap->oh_elems[ smallest ] = heap->oh_elems[ i ]; 230 heap->oh_elems[ i ] = el; 231 heapify_out_heap(heap, smallest); 232 } 233} 234 235 236static void 237oh_insert (struct out_heap *heap, lsquic_conn_t *conn) 238{ 239 struct out_heap_elem el; 240 unsigned nalloc, i; 241 242 if (heap->oh_nelem == heap->oh_nalloc) 243 { 244 if (0 == heap->oh_nalloc) 245 nalloc = 4; 246 else 247 nalloc = heap->oh_nalloc * 2; 248 heap->oh_elems = realloc(heap->oh_elems, 249 nalloc * sizeof(heap->oh_elems[0])); 250 if (!heap->oh_elems) 251 { /* Not much we can do here */ 252 LSQ_ERROR("realloc failed"); 253 return; 254 } 255 heap->oh_nalloc = nalloc; 256 } 257 258 heap->oh_elems[ heap->oh_nelem ].ohe_conn = conn; 259 heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent; 260 ++heap->oh_nelem; 261 262 i = heap->oh_nelem - 1; 263 while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent > 264 heap->oh_elems[ i ].ohe_last_sent) 265 { 266 el = heap->oh_elems[ OHE_PARENT(i) ]; 267 heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ]; 268 heap->oh_elems[ i ] = el; 269 i = OHE_PARENT(i); 270 } 271} 272 273 274static struct lsquic_conn * 275oh_pop (struct out_heap *heap) 276{ 277 struct lsquic_conn *conn; 278 279 assert(heap->oh_nelem); 280 281 conn = heap->oh_elems[0].ohe_conn; 282 --heap->oh_nelem; 283 if (heap->oh_nelem > 0) 284 { 285 heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ]; 286 heapify_out_heap(heap, 0); 287 } 288 289 return conn; 290} 291 292 293void 294lsquic_engine_init_settings (struct lsquic_engine_settings *settings, 295 unsigned flags) 296{ 297 memset(settings, 0, sizeof(*settings)); 298 settings->es_versions = LSQUIC_DF_VERSIONS; 299 if (flags & ENG_SERVER) 300 { 301 settings->es_cfcw = LSQUIC_DF_CFCW_SERVER; 302 settings->es_sfcw = LSQUIC_DF_SFCW_SERVER; 303 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER; 304 } 305 else 306 { 307 settings->es_cfcw = LSQUIC_DF_CFCW_CLIENT; 308 settings->es_sfcw = LSQUIC_DF_SFCW_CLIENT; 309 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT; 310 } 311 settings->es_max_streams_in = LSQUIC_DF_MAX_STREAMS_IN; 312 settings->es_idle_conn_to = LSQUIC_DF_IDLE_CONN_TO; 313 settings->es_handshake_to = LSQUIC_DF_HANDSHAKE_TO; 314 settings->es_silent_close = LSQUIC_DF_SILENT_CLOSE; 315 settings->es_max_header_list_size 316 = LSQUIC_DF_MAX_HEADER_LIST_SIZE; 317 settings->es_ua = LSQUIC_DF_UA; 318 319 settings->es_pdmd = QTAG_X509; 320 settings->es_aead = QTAG_AESG; 321 settings->es_kexs = QTAG_C255; 322 settings->es_support_push = LSQUIC_DF_SUPPORT_PUSH; 323 settings->es_support_tcid0 = LSQUIC_DF_SUPPORT_TCID0; 324 settings->es_support_nstp = LSQUIC_DF_SUPPORT_NSTP; 325 settings->es_honor_prst = LSQUIC_DF_HONOR_PRST; 326 settings->es_progress_check = LSQUIC_DF_PROGRESS_CHECK; 327 settings->es_pendrw_check = LSQUIC_DF_PENDRW_CHECK; 328 settings->es_rw_once = LSQUIC_DF_RW_ONCE; 329 settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH; 330 settings->es_pace_packets = LSQUIC_DF_PACE_PACKETS; 331} 332 333 334/* Note: if returning an error, err_buf must be valid if non-NULL */ 335int 336lsquic_engine_check_settings (const struct lsquic_engine_settings *settings, 337 unsigned flags, 338 char *err_buf, size_t err_buf_sz) 339{ 340 if (settings->es_cfcw < LSQUIC_MIN_FCW || 341 settings->es_sfcw < LSQUIC_MIN_FCW) 342 { 343 if (err_buf) 344 snprintf(err_buf, err_buf_sz, "%s", 345 "flow control window set too low"); 346 return -1; 347 } 348 if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS)) 349 { 350 if (err_buf) 351 snprintf(err_buf, err_buf_sz, "%s", 352 "No supported QUIC versions specified"); 353 return -1; 354 } 355 if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS) 356 { 357 if (err_buf) 358 snprintf(err_buf, err_buf_sz, "%s", 359 "one or more unsupported QUIC version is specified"); 360 return -1; 361 } 362 return 0; 363} 364 365 366static void 367free_packet (void *ctx, unsigned char *packet_data) 368{ 369 free(packet_data); 370} 371 372 373static void * 374malloc_buf (void *ctx, size_t size) 375{ 376 return malloc(size); 377} 378 379 380static const struct lsquic_packout_mem_if stock_pmi = 381{ 382 malloc_buf, (void(*)(void *, void *)) free_packet, 383}; 384 385 386lsquic_engine_t * 387lsquic_engine_new (unsigned flags, 388 const struct lsquic_engine_api *api) 389{ 390 lsquic_engine_t *engine; 391 int tag_buf_len; 392 char err_buf[100]; 393 394 if (!api->ea_packets_out) 395 { 396 LSQ_ERROR("packets_out callback is not specified"); 397 return NULL; 398 } 399 400 if (api->ea_settings && 401 0 != lsquic_engine_check_settings(api->ea_settings, flags, 402 err_buf, sizeof(err_buf))) 403 { 404 LSQ_ERROR("cannot create engine: %s", err_buf); 405 return NULL; 406 } 407 408 engine = calloc(1, sizeof(*engine)); 409 if (!engine) 410 return NULL; 411 if (0 != lsquic_mm_init(&engine->pub.enp_mm)) 412 { 413 free(engine); 414 return NULL; 415 } 416 if (api->ea_settings) 417 engine->pub.enp_settings = *api->ea_settings; 418 else 419 lsquic_engine_init_settings(&engine->pub.enp_settings, flags); 420 tag_buf_len = gen_ver_tags(engine->pub.enp_ver_tags_buf, 421 sizeof(engine->pub.enp_ver_tags_buf), 422 engine->pub.enp_settings.es_versions); 423 if (tag_buf_len <= 0) 424 { 425 LSQ_ERROR("cannot generate version tags buffer"); 426 free(engine); 427 return NULL; 428 } 429 engine->pub.enp_ver_tags_len = tag_buf_len; 430 431 engine->flags = flags; 432 engine->stream_if = api->ea_stream_if; 433 engine->stream_if_ctx = api->ea_stream_if_ctx; 434 engine->packets_out = api->ea_packets_out; 435 engine->packets_out_ctx = api->ea_packets_out_ctx; 436 if (api->ea_pmi) 437 { 438 engine->pub.enp_pmi = api->ea_pmi; 439 engine->pub.enp_pmi_ctx = api->ea_pmi_ctx; 440 } 441 else 442 { 443 engine->pub.enp_pmi = &stock_pmi; 444 engine->pub.enp_pmi_ctx = NULL; 445 } 446 engine->pub.enp_engine = engine; 447 TAILQ_INIT(&engine->conns_in); 448 TAILQ_INIT(&engine->conns_pend_rw); 449 conn_hash_init(&engine->full_conns, ~0); 450 engine->attq = attq_create(); 451 eng_hist_init(&engine->history); 452 engine->batch_size = INITIAL_OUT_BATCH_SIZE; 453 454 455 LSQ_INFO("instantiated engine"); 456 return engine; 457} 458 459 460static void 461grow_batch_size (struct lsquic_engine *engine) 462{ 463 engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE; 464} 465 466 467static void 468shrink_batch_size (struct lsquic_engine *engine) 469{ 470 engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE; 471} 472 473 474/* Wrapper to make sure LSCONN_NEVER_PEND_RW gets set */ 475static void 476destroy_conn (lsquic_conn_t *conn) 477{ 478 conn->cn_flags |= LSCONN_NEVER_PEND_RW; 479 conn->cn_if->ci_destroy(conn); 480} 481 482 483static lsquic_conn_t * 484new_full_conn_client (lsquic_engine_t *engine, const char *hostname, 485 unsigned short max_packet_size) 486{ 487 lsquic_conn_t *conn; 488 unsigned flags; 489 flags = engine->flags & (ENG_SERVER|ENG_HTTP); 490 conn = full_conn_client_new(&engine->pub, engine->stream_if, 491 engine->stream_if_ctx, flags, hostname, max_packet_size); 492 if (!conn) 493 return NULL; 494 if (0 != conn_hash_add_new(&engine->full_conns, conn)) 495 { 496 LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy", 497 conn->cn_cid); 498 destroy_conn(conn); 499 return NULL; 500 } 501 assert(!(conn->cn_flags & 502 (CONN_REF_FLAGS 503 & ~LSCONN_RW_PENDING /* This flag may be set as effect of user 504 callbacks */ 505 ))); 506 conn->cn_flags |= LSCONN_HASHED; 507 return conn; 508} 509 510 511static lsquic_conn_t * 512find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 513 struct packin_parse_state *ppstate, const struct sockaddr *sa_peer, 514 void *peer_ctx) 515{ 516 lsquic_conn_t *conn; 517 518 if (lsquic_packet_in_is_prst(packet_in) 519 && !engine->pub.enp_settings.es_honor_prst) 520 { 521 LSQ_DEBUG("public reset packet: discarding"); 522 return NULL; 523 } 524 525 if (!(packet_in->pi_flags & PI_CONN_ID)) 526 { 527 LSQ_DEBUG("packet header does not have connection ID: discarding"); 528 return NULL; 529 } 530 531 conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id, NULL); 532 if (conn) 533 { 534 conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate); 535 return conn; 536 } 537 538 return conn; 539} 540 541 542static void 543add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn, 544 enum rw_reason reason) 545{ 546 int hist_idx; 547 548 TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw); 549 engine_incref_conn(conn, LSCONN_RW_PENDING); 550 551 hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); 552 conn->cn_rw_hist_buf[ hist_idx ] = reason; 553 ++conn->cn_rw_hist_idx; 554 555 if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx) 556 EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), " 557 "rw_hist: %.*s", (char) reason, 558 (int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); 559 else 560 EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')", 561 (char) reason); 562} 563 564 565#if !defined(NDEBUG) && __GNUC__ 566__attribute__((weak)) 567#endif 568void 569lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub, 570 lsquic_conn_t *conn, enum rw_reason reason) 571{ 572 if (0 == (enpub->enp_flags & ENPUB_PROC) && 573 0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW))) 574 { 575 lsquic_engine_t *engine = (lsquic_engine_t *) enpub; 576 add_conn_to_pend_rw(engine, conn, reason); 577 } 578} 579 580 581void 582lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub, 583 lsquic_conn_t *conn, lsquic_time_t tick_time) 584{ 585 lsquic_engine_t *const engine = (lsquic_engine_t *) enpub; 586 /* Instead of performing an update, we simply remove the connection from 587 * the queue and add it back. This should not happen in at the time of 588 * this writing. 589 */ 590 if (conn->cn_flags & LSCONN_ATTQ) 591 { 592 attq_remove(engine->attq, conn); 593 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 594 } 595 if (conn && !(conn->cn_flags & LSCONN_ATTQ) && 596 0 == attq_maybe_add(engine->attq, conn, tick_time)) 597 engine_incref_conn(conn, LSCONN_ATTQ); 598} 599 600 601static void 602update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn, 603 int progress_made) 604{ 605 rw_hist_idx_t hist_idx; 606 const unsigned char *empty; 607 const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check; 608 609 if (!pendrw_check) 610 return; 611 612 /* Convert previous entry to uppercase: */ 613 hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1); 614 conn->cn_rw_hist_buf[ hist_idx ] -= 0x20; 615 616 LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made); 617 if (progress_made) 618 { 619 conn->cn_noprogress_count = 0; 620 return; 621 } 622 623 EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made " 624 "no progress"); 625 ++conn->cn_noprogress_count; 626 if (conn->cn_noprogress_count <= pendrw_check) 627 return; 628 629 conn->cn_flags |= LSCONN_NEVER_PEND_RW; 630 empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY, 631 sizeof(conn->cn_rw_hist_buf)); 632 if (empty) 633 LSQ_WARN("conn %"PRIu64" noprogress count reached %u " 634 "(rw_hist: %.*s): will not put it onto Pend RW queue again", 635 conn->cn_cid, conn->cn_noprogress_count, 636 (int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); 637 else 638 { 639 hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); 640 LSQ_WARN("conn %"PRIu64" noprogress count reached %u " 641 "(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again", 642 conn->cn_cid, conn->cn_noprogress_count, 643 /* First part of history: */ 644 (int) (sizeof(conn->cn_rw_hist_buf) - hist_idx), 645 conn->cn_rw_hist_buf + hist_idx, 646 /* Second part of history: */ 647 hist_idx, conn->cn_rw_hist_buf); 648 } 649} 650 651 652/* Return 0 if packet is being processed by a connections, otherwise return 1 */ 653static int 654process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 655 struct packin_parse_state *ppstate, const struct sockaddr *sa_local, 656 const struct sockaddr *sa_peer, void *peer_ctx) 657{ 658 lsquic_conn_t *conn; 659 660 conn = find_or_create_conn(engine, packet_in, ppstate, sa_peer, peer_ctx); 661 if (!conn) 662 { 663 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 664 return 1; 665 } 666 667 if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) { 668 TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in); 669 engine_incref_conn(conn, LSCONN_HAS_INCOMING); 670 } 671 lsquic_conn_record_sockaddr(conn, sa_local, sa_peer); 672 lsquic_packet_in_upref(packet_in); 673 conn->cn_peer_ctx = peer_ctx; 674 conn->cn_if->ci_packet_in(conn, packet_in); 675 lsquic_packet_in_put(&engine->pub.enp_mm, packet_in); 676 return 0; 677} 678 679 680static int 681conn_attq_expired (const struct lsquic_engine *engine, 682 const lsquic_conn_t *conn) 683{ 684 assert(conn->cn_attq_elem); 685 return lsquic_conn_adv_time(conn) < engine->proc_time; 686} 687 688 689/* Iterator for connections with incoming packets */ 690static lsquic_conn_t * 691conn_iter_next_incoming (struct lsquic_engine *engine) 692{ 693 enum lsquic_conn_flags addl_flags; 694 lsquic_conn_t *conn; 695 while ((conn = TAILQ_FIRST(&engine->conns_in))) 696 { 697 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 698 if (conn->cn_flags & LSCONN_RW_PENDING) 699 { 700 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 701 EV_LOG_CONN_EVENT(conn->cn_cid, 702 "removed from pending RW queue (processing incoming)"); 703 } 704 if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) 705 { 706 addl_flags = LSCONN_ATTQ; 707 attq_remove(engine->attq, conn); 708 } 709 else 710 addl_flags = 0; 711 conn = engine_decref_conn(engine, conn, 712 LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); 713 if (conn) 714 break; 715 } 716 return conn; 717} 718 719 720/* Iterator for connections with that have pending read/write events */ 721static lsquic_conn_t * 722conn_iter_next_rw_pend (struct lsquic_engine *engine) 723{ 724 enum lsquic_conn_flags addl_flags; 725 lsquic_conn_t *conn; 726 while ((conn = TAILQ_FIRST(&engine->conns_pend_rw))) 727 { 728 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 729 EV_LOG_CONN_EVENT(conn->cn_cid, 730 "removed from pending RW queue (processing pending RW conns)"); 731 if (conn->cn_flags & LSCONN_HAS_INCOMING) 732 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 733 if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) 734 { 735 addl_flags = LSCONN_ATTQ; 736 attq_remove(engine->attq, conn); 737 } 738 else 739 addl_flags = 0; 740 conn = engine_decref_conn(engine, conn, 741 LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); 742 if (conn) 743 break; 744 } 745 return conn; 746} 747 748 749void 750lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine) 751{ 752 LSQ_DEBUG("process connections with incoming packets"); 753 ENGINE_IN(engine); 754 process_connections(engine, conn_iter_next_incoming); 755 assert(TAILQ_EMPTY(&engine->conns_in)); 756 ENGINE_OUT(engine); 757} 758 759 760int 761lsquic_engine_has_pend_rw (lsquic_engine_t *engine) 762{ 763 return !(engine->flags & ENG_PAST_DEADLINE) 764 && !TAILQ_EMPTY(&engine->conns_pend_rw); 765} 766 767 768void 769lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine) 770{ 771 LSQ_DEBUG("process connections with pending RW events"); 772 ENGINE_IN(engine); 773 process_connections(engine, conn_iter_next_rw_pend); 774 ENGINE_OUT(engine); 775} 776 777 778void 779lsquic_engine_destroy (lsquic_engine_t *engine) 780{ 781 lsquic_conn_t *conn; 782 783 LSQ_DEBUG("destroying engine"); 784#ifndef NDEBUG 785 engine->flags |= ENG_DTOR; 786#endif 787 788 while (engine->conns_out.oh_nelem > 0) 789 { 790 --engine->conns_out.oh_nelem; 791 conn = engine->conns_out.oh_elems[ 792 engine->conns_out.oh_nelem ].ohe_conn; 793 assert(conn->cn_flags & LSCONN_HAS_OUTGOING); 794 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 795 } 796 797 for (conn = conn_hash_first(&engine->full_conns); conn; 798 conn = conn_hash_next(&engine->full_conns)) 799 force_close_conn(engine, conn); 800 conn_hash_cleanup(&engine->full_conns); 801 802 803 attq_destroy(engine->attq); 804 805 assert(0 == engine->conns_out.oh_nelem); 806 assert(TAILQ_EMPTY(&engine->conns_pend_rw)); 807 lsquic_mm_cleanup(&engine->pub.enp_mm); 808 free(engine->conns_out.oh_elems); 809 free(engine); 810} 811 812 813#if __GNUC__ 814__attribute__((nonnull(3))) 815#endif 816static lsquic_conn_t * 817remove_from_inc_andor_pend_rw (lsquic_engine_t *engine, 818 lsquic_conn_t *conn, const char *reason) 819{ 820 assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)); 821 if (conn->cn_flags & LSCONN_HAS_INCOMING) 822 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 823 if (conn->cn_flags & LSCONN_RW_PENDING) 824 { 825 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 826 EV_LOG_CONN_EVENT(conn->cn_cid, 827 "removed from pending RW queue (%s)", reason); 828 } 829 conn = engine_decref_conn(engine, conn, 830 LSCONN_HAS_INCOMING|LSCONN_RW_PENDING); 831 assert(conn); 832 return conn; 833} 834 835 836static lsquic_conn_t * 837conn_iter_next_one (lsquic_engine_t *engine) 838{ 839 lsquic_conn_t *conn = engine->iter_state.one.conn; 840 if (conn) 841 { 842 if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) 843 conn = remove_from_inc_andor_pend_rw(engine, conn, "connect"); 844 if (conn && (conn->cn_flags & LSCONN_ATTQ) && 845 conn_attq_expired(engine, conn)) 846 { 847 attq_remove(engine->attq, conn); 848 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 849 } 850 engine->iter_state.one.conn = NULL; 851 } 852 return conn; 853} 854 855 856int 857lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa, 858 void *conn_ctx, const char *hostname, 859 unsigned short max_packet_size) 860{ 861 lsquic_conn_t *conn; 862 863 if (engine->flags & ENG_SERVER) 864 { 865 LSQ_ERROR("`%s' must only be called in client mode", __func__); 866 return -1; 867 } 868 869 if (0 == max_packet_size) 870 { 871 switch (peer_sa->sa_family) 872 { 873 case AF_INET: 874 max_packet_size = QUIC_MAX_IPv4_PACKET_SZ; 875 break; 876 default: 877 max_packet_size = QUIC_MAX_IPv6_PACKET_SZ; 878 break; 879 } 880 } 881 882 conn = new_full_conn_client(engine, hostname, max_packet_size); 883 if (!conn) 884 return -1; 885 lsquic_conn_record_peer_sa(conn, peer_sa); 886 conn->cn_peer_ctx = conn_ctx; 887 engine->iter_state.one.conn = conn; 888 process_connections(engine, conn_iter_next_one); 889 return 0; 890} 891 892 893static void 894remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn) 895{ 896 conn_hash_remove(&engine->full_conns, conn); 897 (void) engine_decref_conn(engine, conn, LSCONN_HASHED); 898} 899 900 901static void 902refflags2str (enum lsquic_conn_flags flags, char s[7]) 903{ 904 *s = 'C'; s += !!(flags & LSCONN_CLOSING); 905 *s = 'H'; s += !!(flags & LSCONN_HASHED); 906 *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING); 907 *s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING); 908 *s = 'R'; s += !!(flags & LSCONN_RW_PENDING); 909 *s = 'A'; s += !!(flags & LSCONN_ATTQ); 910 *s = '\0'; 911} 912 913 914static void 915engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag) 916{ 917 char str[7]; 918 assert(flag & CONN_REF_FLAGS); 919 assert(!(conn->cn_flags & flag)); 920 conn->cn_flags |= flag; 921 LSQ_DEBUG("incref conn %"PRIu64", now '%s'", conn->cn_cid, 922 (refflags2str(conn->cn_flags, str), str)); 923} 924 925 926static lsquic_conn_t * 927engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 928 enum lsquic_conn_flags flags) 929{ 930 char str[7]; 931 assert(flags & CONN_REF_FLAGS); 932 assert(conn->cn_flags & flags); 933#ifndef NDEBUG 934 if (flags & LSCONN_CLOSING) 935 assert(0 == (conn->cn_flags & LSCONN_HASHED)); 936#endif 937 conn->cn_flags &= ~flags; 938 LSQ_DEBUG("decref conn %"PRIu64", now '%s'", conn->cn_cid, 939 (refflags2str(conn->cn_flags, str), str)); 940 if (0 == (conn->cn_flags & CONN_REF_FLAGS)) 941 { 942 eng_hist_inc(&engine->history, 0, sl_del_full_conns); 943 destroy_conn(conn); 944 return NULL; 945 } 946 else 947 return conn; 948} 949 950 951/* This is not a general-purpose function. Only call from engine dtor. */ 952static void 953force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn) 954{ 955 assert(engine->flags & ENG_DTOR); 956 const enum lsquic_conn_flags flags = conn->cn_flags; 957 assert(conn->cn_flags & CONN_REF_FLAGS); 958 assert(!(flags & LSCONN_HAS_OUTGOING)); /* Should be removed already */ 959 assert(!(flags & LSCONN_CLOSING)); /* It is in transient queue? */ 960 if (flags & LSCONN_HAS_INCOMING) 961 { 962 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 963 (void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING); 964 } 965 if (flags & LSCONN_RW_PENDING) 966 { 967 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 968 EV_LOG_CONN_EVENT(conn->cn_cid, 969 "removed from pending RW queue (engine destruction)"); 970 (void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING); 971 } 972 if (flags & LSCONN_ATTQ) 973 attq_remove(engine->attq, conn); 974 if (flags & LSCONN_HASHED) 975 remove_conn_from_hash(engine, conn); 976} 977 978 979/* Iterator for all connections. 980 * Returned connections are removed from the Incoming, Pending RW Event, 981 * and Advisory Tick Time queues if necessary. 982 */ 983static lsquic_conn_t * 984conn_iter_next_all (struct lsquic_engine *engine) 985{ 986 lsquic_conn_t *conn; 987 988 conn = conn_hash_next(&engine->full_conns); 989 990 if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))) 991 conn = remove_from_inc_andor_pend_rw(engine, conn, "process all"); 992 if (conn && (conn->cn_flags & LSCONN_ATTQ) 993 && conn_attq_expired(engine, conn)) 994 { 995 attq_remove(engine->attq, conn); 996 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 997 } 998 999 return conn; 1000} 1001 1002 1003static lsquic_conn_t * 1004conn_iter_next_attq (struct lsquic_engine *engine) 1005{ 1006 lsquic_conn_t *conn; 1007 1008 conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff); 1009 if (conn) 1010 { 1011 assert(conn->cn_flags & LSCONN_ATTQ); 1012 if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) 1013 conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq"); 1014 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 1015 } 1016 1017 return conn; 1018} 1019 1020 1021void 1022lsquic_engine_proc_all (lsquic_engine_t *engine) 1023{ 1024 ENGINE_IN(engine); 1025 /* We poke each connection every time as initial implementation. If it 1026 * proves to be too inefficient, we will need to figure out 1027 * a) when to stop processing; and 1028 * b) how to remember state between calls. 1029 */ 1030 conn_hash_reset_iter(&engine->full_conns); 1031 process_connections(engine, conn_iter_next_all); 1032 ENGINE_OUT(engine); 1033} 1034 1035 1036void 1037lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine) 1038{ 1039 lsquic_time_t prev_min, cutoff; 1040 1041 LSQ_DEBUG("process connections in attq"); 1042 ENGINE_IN(engine); 1043 cutoff = lsquic_time_now(); 1044 prev_min = attq_set_min(engine->attq, cutoff); /* Prevent infinite loop */ 1045 engine->iter_state.attq.cutoff = cutoff; 1046 process_connections(engine, conn_iter_next_attq); 1047 attq_set_min(engine->attq, prev_min); /* Restore previos value */ 1048 ENGINE_OUT(engine); 1049} 1050 1051 1052static int 1053generate_header (const lsquic_packet_out_t *packet_out, 1054 const struct parse_funcs *pf, lsquic_cid_t cid, 1055 unsigned char *buf, size_t bufsz) 1056{ 1057 return pf->pf_gen_reg_pkt_header(buf, bufsz, 1058 packet_out->po_flags & PO_CONN_ID ? &cid : NULL, 1059 packet_out->po_flags & PO_VERSION ? &packet_out->po_ver_tag : NULL, 1060 packet_out->po_flags & PO_NONCE ? packet_out->po_nonce : NULL, 1061 packet_out->po_packno, lsquic_packet_out_packno_bits(packet_out)); 1062} 1063 1064 1065static ssize_t 1066really_encrypt_packet (const lsquic_conn_t *conn, 1067 const lsquic_packet_out_t *packet_out, 1068 unsigned char *buf, size_t bufsz) 1069{ 1070 int enc, header_sz, is_hello_packet; 1071 size_t packet_sz; 1072 unsigned char header_buf[QUIC_MAX_PUBHDR_SZ]; 1073 1074 header_sz = generate_header(packet_out, conn->cn_pf, conn->cn_cid, 1075 header_buf, sizeof(header_buf)); 1076 if (header_sz < 0) 1077 return -1; 1078 1079 is_hello_packet = !!(packet_out->po_flags & PO_HELLO); 1080 enc = lsquic_enc(conn->cn_enc_session, conn->cn_version, 0, 1081 packet_out->po_packno, header_buf, header_sz, 1082 packet_out->po_data, packet_out->po_data_sz, 1083 buf, bufsz, &packet_sz, is_hello_packet); 1084 if (0 == enc) 1085 { 1086 LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %u bytes, " 1087 "ciphertext is %zd bytes", 1088 packet_out->po_packno, 1089 lsquic_po_header_length(packet_out->po_flags) + 1090 packet_out->po_data_sz, 1091 packet_sz); 1092 return packet_sz; 1093 } 1094 else 1095 return -1; 1096} 1097 1098 1099static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, } 1100encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn, 1101 lsquic_packet_out_t *packet_out) 1102{ 1103 ssize_t enc_sz; 1104 size_t bufsz; 1105 unsigned char *buf; 1106 1107 bufsz = lsquic_po_header_length(packet_out->po_flags) + 1108 packet_out->po_data_sz + QUIC_PACKET_HASH_SZ; 1109 buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz); 1110 if (!buf) 1111 { 1112 LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd", 1113 bufsz); 1114 return ENCPA_NOMEM; 1115 } 1116 1117 enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz); 1118 1119 if (enc_sz < 0) 1120 { 1121 engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf); 1122 return ENCPA_BADCRYPT; 1123 } 1124 1125 packet_out->po_enc_data = buf; 1126 packet_out->po_enc_data_sz = enc_sz; 1127 packet_out->po_flags |= PO_ENCRYPTED; 1128 1129 return ENCPA_OK; 1130} 1131 1132 1133struct out_batch 1134{ 1135 lsquic_conn_t *conns [MAX_OUT_BATCH_SIZE]; 1136 lsquic_packet_out_t *packets[MAX_OUT_BATCH_SIZE]; 1137 struct lsquic_out_spec outs [MAX_OUT_BATCH_SIZE]; 1138}; 1139 1140 1141STAILQ_HEAD(closed_conns, lsquic_conn); 1142 1143 1144struct conns_out_iter 1145{ 1146 struct out_heap *coi_heap; 1147 TAILQ_HEAD(, lsquic_conn) coi_active_list, 1148 coi_inactive_list; 1149 lsquic_conn_t *coi_next; 1150#ifndef NDEBUG 1151 lsquic_time_t coi_last_sent; 1152#endif 1153}; 1154 1155 1156static void 1157coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine) 1158{ 1159 iter->coi_heap = &engine->conns_out; 1160 iter->coi_next = NULL; 1161 TAILQ_INIT(&iter->coi_active_list); 1162 TAILQ_INIT(&iter->coi_inactive_list); 1163#ifndef NDEBUG 1164 iter->coi_last_sent = 0; 1165#endif 1166} 1167 1168 1169static lsquic_conn_t * 1170coi_next (struct conns_out_iter *iter) 1171{ 1172 lsquic_conn_t *conn; 1173 1174 if (iter->coi_heap->oh_nelem > 0) 1175 { 1176 conn = oh_pop(iter->coi_heap); 1177 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1178 conn->cn_flags |= LSCONN_COI_ACTIVE; 1179#ifndef NDEBUG 1180 if (iter->coi_last_sent) 1181 assert(iter->coi_last_sent <= conn->cn_last_sent); 1182 iter->coi_last_sent = conn->cn_last_sent; 1183#endif 1184 return conn; 1185 } 1186 else if (!TAILQ_EMPTY(&iter->coi_active_list)) 1187 { 1188 conn = iter->coi_next; 1189 if (!conn) 1190 conn = TAILQ_FIRST(&iter->coi_active_list); 1191 if (conn) 1192 iter->coi_next = TAILQ_NEXT(conn, cn_next_out); 1193 return conn; 1194 } 1195 else 1196 return NULL; 1197} 1198 1199 1200static void 1201coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1202{ 1203 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1204 { 1205 assert(!TAILQ_EMPTY(&iter->coi_active_list)); 1206 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1207 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1208 TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out); 1209 conn->cn_flags |= LSCONN_COI_INACTIVE; 1210 } 1211} 1212 1213 1214static void 1215coi_remove (struct conns_out_iter *iter, lsquic_conn_t *conn) 1216{ 1217 assert(conn->cn_flags & LSCONN_COI_ACTIVE); 1218 if (conn->cn_flags & LSCONN_COI_ACTIVE) 1219 { 1220 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1221 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1222 } 1223} 1224 1225 1226static void 1227coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1228{ 1229 assert(conn->cn_flags & LSCONN_COI_INACTIVE); 1230 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1231 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1232 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1233 conn->cn_flags |= LSCONN_COI_ACTIVE; 1234} 1235 1236 1237static void 1238coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine) 1239{ 1240 lsquic_conn_t *conn; 1241 while ((conn = TAILQ_FIRST(&iter->coi_active_list))) 1242 { 1243 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1244 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1245 oh_insert(iter->coi_heap, conn); 1246 } 1247 while ((conn = TAILQ_FIRST(&iter->coi_inactive_list))) 1248 { 1249 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1250 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1251 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 1252 } 1253} 1254 1255 1256static unsigned 1257send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter, 1258 struct out_batch *batch, unsigned n_to_send) 1259{ 1260 int n_sent, i; 1261 lsquic_time_t now; 1262 1263 /* Set sent time before the write to avoid underestimating RTT */ 1264 now = lsquic_time_now(); 1265 for (i = 0; i < (int) n_to_send; ++i) 1266 batch->packets[i]->po_sent = now; 1267 n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs, 1268 n_to_send); 1269 if (n_sent >= 0) 1270 LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send); 1271 else 1272 { 1273 LSQ_DEBUG("packets out returned an error: %s", strerror(errno)); 1274 n_sent = 0; 1275 } 1276 if (n_sent > 0) 1277 engine->last_sent = now + n_sent; 1278 for (i = 0; i < n_sent; ++i) 1279 { 1280 eng_hist_inc(&engine->history, now, sl_packets_out); 1281 EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1282 batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i], 1283 batch->packets[i]); 1284 /* `i' is added to maintain relative order */ 1285 batch->conns[i]->cn_last_sent = now + i; 1286 /* Release packet out buffer as soon as the packet is sent 1287 * successfully. If not successfully sent, we hold on to 1288 * this buffer until the packet sending is attempted again 1289 * or until it times out and regenerated. 1290 */ 1291 if (batch->packets[i]->po_flags & PO_ENCRYPTED) 1292 { 1293 batch->packets[i]->po_flags &= ~PO_ENCRYPTED; 1294 engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, 1295 batch->packets[i]->po_enc_data); 1296 batch->packets[i]->po_enc_data = NULL; /* JIC */ 1297 } 1298 } 1299 if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT)) 1300 for ( ; i < (int) n_to_send; ++i) 1301 EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1302 /* Return packets to the connection in reverse order so that the packet 1303 * ordering is maintained. 1304 */ 1305 for (i = (int) n_to_send - 1; i >= n_sent; --i) 1306 { 1307 batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i], 1308 batch->packets[i]); 1309 if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT))) 1310 coi_reactivate(conns_iter, batch->conns[i]); 1311 } 1312 return n_sent; 1313} 1314 1315 1316/* Return 1 if went past deadline, 0 otherwise */ 1317static int 1318check_deadline (lsquic_engine_t *engine) 1319{ 1320 if (engine->pub.enp_settings.es_proc_time_thresh && 1321 lsquic_time_now() > engine->deadline) 1322 { 1323 LSQ_INFO("went past threshold of %u usec, stop sending", 1324 engine->pub.enp_settings.es_proc_time_thresh); 1325 engine->flags |= ENG_PAST_DEADLINE; 1326 return 1; 1327 } 1328 else 1329 return 0; 1330} 1331 1332 1333static void 1334send_packets_out (struct lsquic_engine *engine, 1335 struct closed_conns *closed_conns) 1336{ 1337 unsigned n, w, n_sent, n_batches_sent; 1338 lsquic_packet_out_t *packet_out; 1339 lsquic_conn_t *conn; 1340 struct out_batch batch; 1341 struct conns_out_iter conns_iter; 1342 int shrink, deadline_exceeded; 1343 1344 coi_init(&conns_iter, engine); 1345 n_batches_sent = 0; 1346 n_sent = 0, n = 0; 1347 shrink = 0; 1348 deadline_exceeded = 0; 1349 1350 while ((conn = coi_next(&conns_iter))) 1351 { 1352 packet_out = conn->cn_if->ci_next_packet_to_send(conn); 1353 if (!packet_out) { 1354 LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64, 1355 conn->cn_cid); 1356 coi_deactivate(&conns_iter, conn); 1357 continue; 1358 } 1359 if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT))) 1360 { 1361 switch (encrypt_packet(engine, conn, packet_out)) 1362 { 1363 case ENCPA_NOMEM: 1364 /* Send what we have and wait for a more opportune moment */ 1365 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1366 goto end_for; 1367 case ENCPA_BADCRYPT: 1368 /* This is pretty bad: close connection immediately */ 1369 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1370 LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid); 1371 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1372 { 1373 if (!(conn->cn_flags & LSCONN_CLOSING)) 1374 { 1375 STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn); 1376 engine_incref_conn(conn, LSCONN_CLOSING); 1377 if (conn->cn_flags & LSCONN_HASHED) 1378 remove_conn_from_hash(engine, conn); 1379 } 1380 coi_remove(&conns_iter, conn); 1381 } 1382 continue; 1383 case ENCPA_OK: 1384 break; 1385 } 1386 } 1387 LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64, 1388 packet_out->po_packno, conn->cn_cid); 1389 assert(conn->cn_flags & LSCONN_HAS_PEER_SA); 1390 if (packet_out->po_flags & PO_ENCRYPTED) 1391 { 1392 batch.outs[n].buf = packet_out->po_enc_data; 1393 batch.outs[n].sz = packet_out->po_enc_data_sz; 1394 } 1395 else 1396 { 1397 batch.outs[n].buf = packet_out->po_data; 1398 batch.outs[n].sz = packet_out->po_data_sz; 1399 } 1400 batch.outs [n].peer_ctx = conn->cn_peer_ctx; 1401 batch.outs [n].local_sa = (struct sockaddr *) conn->cn_local_addr; 1402 batch.outs [n].dest_sa = (struct sockaddr *) conn->cn_peer_addr; 1403 batch.conns [n] = conn; 1404 batch.packets[n] = packet_out; 1405 ++n; 1406 if (n == engine->batch_size) 1407 { 1408 n = 0; 1409 w = send_batch(engine, &conns_iter, &batch, engine->batch_size); 1410 ++n_batches_sent; 1411 n_sent += w; 1412 if (w < engine->batch_size) 1413 { 1414 shrink = 1; 1415 break; 1416 } 1417 deadline_exceeded = check_deadline(engine); 1418 if (deadline_exceeded) 1419 break; 1420 grow_batch_size(engine); 1421 } 1422 } 1423 end_for: 1424 1425 if (n > 0) { 1426 w = send_batch(engine, &conns_iter, &batch, n); 1427 n_sent += w; 1428 shrink = w < n; 1429 ++n_batches_sent; 1430 deadline_exceeded = check_deadline(engine); 1431 } 1432 1433 if (shrink) 1434 shrink_batch_size(engine); 1435 else if (n_batches_sent > 1 && !deadline_exceeded) 1436 grow_batch_size(engine); 1437 1438 coi_reheap(&conns_iter, engine); 1439 1440 LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s"); 1441} 1442 1443 1444int 1445lsquic_engine_has_unsent_packets (lsquic_engine_t *engine) 1446{ 1447 return !(engine->flags & ENG_PAST_DEADLINE) 1448 && ( engine->conns_out.oh_nelem > 0 1449 ) 1450 ; 1451} 1452 1453 1454static void 1455reset_deadline (lsquic_engine_t *engine, lsquic_time_t now) 1456{ 1457 engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh; 1458 engine->flags &= ~ENG_PAST_DEADLINE; 1459} 1460 1461 1462/* TODO: this is a user-facing function, account for load */ 1463void 1464lsquic_engine_send_unsent_packets (lsquic_engine_t *engine) 1465{ 1466 lsquic_conn_t *conn; 1467 struct closed_conns closed_conns; 1468 1469 STAILQ_INIT(&closed_conns); 1470 reset_deadline(engine, lsquic_time_now()); 1471 1472 send_packets_out(engine, &closed_conns); 1473 1474 while ((conn = STAILQ_FIRST(&closed_conns))) { 1475 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1476 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1477 } 1478 1479} 1480 1481 1482static void 1483process_connections (lsquic_engine_t *engine, conn_iter_f next_conn) 1484{ 1485 lsquic_conn_t *conn; 1486 enum tick_st tick_st; 1487 lsquic_time_t now = lsquic_time_now(); 1488 struct closed_conns closed_conns; 1489 1490 engine->proc_time = now; 1491 eng_hist_tick(&engine->history, now); 1492 1493 STAILQ_INIT(&closed_conns); 1494 reset_deadline(engine, now); 1495 1496 while ((conn = next_conn(engine))) 1497 { 1498 tick_st = conn->cn_if->ci_tick(conn, now); 1499 if (conn_iter_next_rw_pend == next_conn) 1500 update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS); 1501 if (tick_st & TICK_SEND) 1502 { 1503 if (!(conn->cn_flags & LSCONN_HAS_OUTGOING)) 1504 { 1505 oh_insert(&engine->conns_out, conn); 1506 engine_incref_conn(conn, LSCONN_HAS_OUTGOING); 1507 } 1508 } 1509 if (tick_st & TICK_CLOSE) 1510 { 1511 STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn); 1512 engine_incref_conn(conn, LSCONN_CLOSING); 1513 if (conn->cn_flags & LSCONN_HASHED) 1514 remove_conn_from_hash(engine, conn); 1515 } 1516 } 1517 1518 if (lsquic_engine_has_unsent_packets(engine)) 1519 send_packets_out(engine, &closed_conns); 1520 1521 while ((conn = STAILQ_FIRST(&closed_conns))) { 1522 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1523 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1524 } 1525 1526} 1527 1528 1529/* Return 0 if packet is being processed by a real connection, 1 if the 1530 * packet was processed, but not by a connection, and -1 on error. 1531 */ 1532int 1533lsquic_engine_packet_in (lsquic_engine_t *engine, 1534 const unsigned char *packet_in_data, size_t packet_in_size, 1535 const struct sockaddr *sa_local, const struct sockaddr *sa_peer, 1536 void *peer_ctx) 1537{ 1538 struct packin_parse_state ppstate; 1539 lsquic_packet_in_t *packet_in; 1540 1541 if (packet_in_size > QUIC_MAX_PACKET_SZ) 1542 { 1543 LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming " 1544 "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ); 1545 errno = E2BIG; 1546 return -1; 1547 } 1548 1549 packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm); 1550 if (!packet_in) 1551 return -1; 1552 1553 /* Library does not modify packet_in_data, it is not referenced after 1554 * this function returns and subsequent release of pi_data is guarded 1555 * by PI_OWN_DATA flag. 1556 */ 1557 packet_in->pi_data = (unsigned char *) packet_in_data; 1558 if (0 != parse_packet_in_begin(packet_in, packet_in_size, 1559 engine->flags & ENG_SERVER, &ppstate)) 1560 { 1561 LSQ_DEBUG("Cannot parse incoming packet's header"); 1562 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 1563 errno = EINVAL; 1564 return -1; 1565 } 1566 1567 packet_in->pi_received = lsquic_time_now(); 1568 eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in); 1569 return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer, 1570 peer_ctx); 1571} 1572 1573 1574#if __GNUC__ && !defined(NDEBUG) 1575__attribute__((weak)) 1576#endif 1577unsigned 1578lsquic_engine_quic_versions (const lsquic_engine_t *engine) 1579{ 1580 return engine->pub.enp_settings.es_versions; 1581} 1582 1583 1584int 1585lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff) 1586{ 1587 const lsquic_time_t *next_time; 1588 lsquic_time_t now; 1589 1590 next_time = attq_next_time(engine->attq); 1591 if (!next_time) 1592 return 0; 1593 1594 now = lsquic_time_now(); 1595 *diff = (int) ((int64_t) *next_time - (int64_t) now); 1596 return 1; 1597} 1598 1599 1600unsigned 1601lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now) 1602{ 1603 lsquic_time_t now; 1604 now = lsquic_time_now(); 1605 if (from_now < 0) 1606 now -= from_now; 1607 else 1608 now += from_now; 1609 return attq_count_before(engine->attq, now); 1610} 1611 1612 1613