lsquic_engine.c revision da710add
1/* Copyright (c) 2017 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_engine.c - QUIC engine 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdint.h> 10#include <stdio.h> 11#include <stdlib.h> 12#include <string.h> 13#include <sys/queue.h> 14#include <time.h> 15#ifndef WIN32 16#include <sys/time.h> 17#include <netinet/in.h> 18#include <sys/types.h> 19#include <sys/stat.h> 20#include <fcntl.h> 21#include <unistd.h> 22#include <netdb.h> 23#endif 24 25 26 27#include "lsquic.h" 28#include "lsquic_types.h" 29#include "lsquic_alarmset.h" 30#include "lsquic_parse.h" 31#include "lsquic_packet_in.h" 32#include "lsquic_packet_out.h" 33#include "lsquic_senhist.h" 34#include "lsquic_rtt.h" 35#include "lsquic_cubic.h" 36#include "lsquic_pacer.h" 37#include "lsquic_send_ctl.h" 38#include "lsquic_set.h" 39#include "lsquic_conn_flow.h" 40#include "lsquic_sfcw.h" 41#include "lsquic_stream.h" 42#include "lsquic_conn.h" 43#include "lsquic_full_conn.h" 44#include "lsquic_util.h" 45#include "lsquic_qtags.h" 46#include "lsquic_str.h" 47#include "lsquic_handshake.h" 48#include "lsquic_mm.h" 49#include "lsquic_conn_hash.h" 50#include "lsquic_engine_public.h" 51#include "lsquic_eng_hist.h" 52#include "lsquic_ev_log.h" 53#include "lsquic_version.h" 54#include "lsquic_hash.h" 55#include "lsquic_attq.h" 56 57#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE 58#include "lsquic_logger.h" 59 60 61/* The batch of outgoing packets grows and shrinks dynamically */ 62#define MAX_OUT_BATCH_SIZE 1024 63#define MIN_OUT_BATCH_SIZE 256 64#define INITIAL_OUT_BATCH_SIZE 512 65 66struct out_batch 67{ 68 lsquic_conn_t *conns [MAX_OUT_BATCH_SIZE]; 69 lsquic_packet_out_t *packets[MAX_OUT_BATCH_SIZE]; 70 struct lsquic_out_spec outs [MAX_OUT_BATCH_SIZE]; 71}; 72 73typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *); 74 75static void 76process_connections (struct lsquic_engine *engine, conn_iter_f iter); 77 78static void 79engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag); 80 81static lsquic_conn_t * 82engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 83 enum lsquic_conn_flags flag); 84 85static void 86force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn); 87 88/* Nested calls to LSQUIC are not supported */ 89#define ENGINE_IN(e) do { \ 90 assert(!((e)->pub.enp_flags & ENPUB_PROC)); \ 91 (e)->pub.enp_flags |= ENPUB_PROC; \ 92} while (0) 93 94#define ENGINE_OUT(e) do { \ 95 assert((e)->pub.enp_flags & ENPUB_PROC); \ 96 (e)->pub.enp_flags &= ~ENPUB_PROC; \ 97} while (0) 98 99/* A connection can be referenced from one of six places: 100 * 101 * 1. Connection hash: a connection starts its life in one of those. 102 * 103 * 2. Outgoing queue. 104 * 105 * 3. Incoming queue. 106 * 107 * 4. Pending RW Events queue. 108 * 109 * 5. Advisory Tick Time queue. 110 * 111 * 6. Closing connections queue. This is a transient queue -- it only 112 * exists for the duration of process_connections() function call. 113 * 114 * The idea is to destroy the connection when it is no longer referenced. 115 * For example, a connection tick may return TICK_SEND|TICK_CLOSE. In 116 * that case, the connection is referenced from two places: (2) and (6). 117 * After its packets are sent, it is only referenced in (6), and at the 118 * end of the function call, when it is removed from (6), reference count 119 * goes to zero and the connection is destroyed. If not all packets can 120 * be sent, at the end of the function call, the connection is referenced 121 * by (2) and will only be removed once all outgoing packets have been 122 * sent. 123 */ 124#define CONN_REF_FLAGS (LSCONN_HASHED \ 125 |LSCONN_HAS_OUTGOING \ 126 |LSCONN_HAS_INCOMING \ 127 |LSCONN_RW_PENDING \ 128 |LSCONN_CLOSING \ 129 |LSCONN_ATTQ) 130 131 132struct out_heap_elem 133{ 134 struct lsquic_conn *ohe_conn; 135 lsquic_time_t ohe_last_sent; 136}; 137 138 139struct out_heap 140{ 141 struct out_heap_elem *oh_elems; 142 unsigned oh_nalloc, 143 oh_nelem; 144}; 145 146 147 148 149struct lsquic_engine 150{ 151 struct lsquic_engine_public pub; 152 enum { 153 ENG_SERVER = LSENG_SERVER, 154 ENG_HTTP = LSENG_HTTP, 155 ENG_COOLDOWN = (1 << 7), /* Cooldown: no new connections */ 156 ENG_PAST_DEADLINE 157 = (1 << 8), /* Previous call to a processing 158 * function went past time threshold. 159 */ 160#ifndef NDEBUG 161 ENG_DTOR = (1 << 26), /* Engine destructor */ 162#endif 163 } flags; 164 const struct lsquic_stream_if *stream_if; 165 void *stream_if_ctx; 166 lsquic_packets_out_f packets_out; 167 void *packets_out_ctx; 168 void *bad_handshake_ctx; 169 struct conn_hash full_conns; 170 TAILQ_HEAD(, lsquic_conn) conns_in, conns_pend_rw; 171 struct out_heap conns_out; 172 /* Use a union because only one iterator is being used at any one time */ 173 union { 174 struct { 175 /* This iterator does not have any state: it uses `conns_in' */ 176 int ignore; 177 } conn_in; 178 struct { 179 /* This iterator does not have any state: it uses `conns_pend_rw' */ 180 int ignore; 181 } rw_pend; 182 struct { 183 /* Iterator state to process connections in Advisory Tick Time 184 * queue. 185 */ 186 lsquic_time_t cutoff; 187 } attq; 188 struct { 189 /* Iterator state to process all connections */ 190 int ignore; 191 } all; 192 struct { 193 lsquic_conn_t *conn; 194 } one; 195 } iter_state; 196 struct eng_hist history; 197 unsigned batch_size; 198 unsigned time_until_desired_tick; 199 struct attq *attq; 200 lsquic_time_t proc_time; 201 /* Track time last time a packet was sent to give new connections 202 * priority lower than that of existing connections. 203 */ 204 lsquic_time_t last_sent; 205 lsquic_time_t deadline; 206 struct out_batch out_batch; 207}; 208 209 210#define OHE_PARENT(i) ((i - 1) / 2) 211#define OHE_LCHILD(i) (2 * i + 1) 212#define OHE_RCHILD(i) (2 * i + 2) 213 214 215static void 216heapify_out_heap (struct out_heap *heap, unsigned i) 217{ 218 struct out_heap_elem el; 219 unsigned smallest; 220 221 assert(i < heap->oh_nelem); 222 223 if (OHE_LCHILD(i) < heap->oh_nelem) 224 { 225 if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent < 226 heap->oh_elems[ i ].ohe_last_sent) 227 smallest = OHE_LCHILD(i); 228 else 229 smallest = i; 230 if (OHE_RCHILD(i) < heap->oh_nelem && 231 heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent < 232 heap->oh_elems[ smallest ].ohe_last_sent) 233 smallest = OHE_RCHILD(i); 234 } 235 else 236 smallest = i; 237 238 if (smallest != i) 239 { 240 el = heap->oh_elems[ smallest ]; 241 heap->oh_elems[ smallest ] = heap->oh_elems[ i ]; 242 heap->oh_elems[ i ] = el; 243 heapify_out_heap(heap, smallest); 244 } 245} 246 247 248static void 249oh_insert (struct out_heap *heap, lsquic_conn_t *conn) 250{ 251 struct out_heap_elem el; 252 unsigned nalloc, i; 253 254 if (heap->oh_nelem == heap->oh_nalloc) 255 { 256 if (0 == heap->oh_nalloc) 257 nalloc = 4; 258 else 259 nalloc = heap->oh_nalloc * 2; 260 heap->oh_elems = realloc(heap->oh_elems, 261 nalloc * sizeof(heap->oh_elems[0])); 262 if (!heap->oh_elems) 263 { /* Not much we can do here */ 264 LSQ_ERROR("realloc failed"); 265 return; 266 } 267 heap->oh_nalloc = nalloc; 268 } 269 270 heap->oh_elems[ heap->oh_nelem ].ohe_conn = conn; 271 heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent; 272 ++heap->oh_nelem; 273 274 i = heap->oh_nelem - 1; 275 while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent > 276 heap->oh_elems[ i ].ohe_last_sent) 277 { 278 el = heap->oh_elems[ OHE_PARENT(i) ]; 279 heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ]; 280 heap->oh_elems[ i ] = el; 281 i = OHE_PARENT(i); 282 } 283} 284 285 286static struct lsquic_conn * 287oh_pop (struct out_heap *heap) 288{ 289 struct lsquic_conn *conn; 290 291 assert(heap->oh_nelem); 292 293 conn = heap->oh_elems[0].ohe_conn; 294 --heap->oh_nelem; 295 if (heap->oh_nelem > 0) 296 { 297 heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ]; 298 heapify_out_heap(heap, 0); 299 } 300 301 return conn; 302} 303 304 305void 306lsquic_engine_init_settings (struct lsquic_engine_settings *settings, 307 unsigned flags) 308{ 309 memset(settings, 0, sizeof(*settings)); 310 settings->es_versions = LSQUIC_DF_VERSIONS; 311 if (flags & ENG_SERVER) 312 { 313 settings->es_cfcw = LSQUIC_DF_CFCW_SERVER; 314 settings->es_sfcw = LSQUIC_DF_SFCW_SERVER; 315 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER; 316 } 317 else 318 { 319 settings->es_cfcw = LSQUIC_DF_CFCW_CLIENT; 320 settings->es_sfcw = LSQUIC_DF_SFCW_CLIENT; 321 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT; 322 } 323 settings->es_max_streams_in = LSQUIC_DF_MAX_STREAMS_IN; 324 settings->es_idle_conn_to = LSQUIC_DF_IDLE_CONN_TO; 325 settings->es_handshake_to = LSQUIC_DF_HANDSHAKE_TO; 326 settings->es_silent_close = LSQUIC_DF_SILENT_CLOSE; 327 settings->es_max_header_list_size 328 = LSQUIC_DF_MAX_HEADER_LIST_SIZE; 329 settings->es_ua = LSQUIC_DF_UA; 330 331 settings->es_pdmd = QTAG_X509; 332 settings->es_aead = QTAG_AESG; 333 settings->es_kexs = QTAG_C255; 334 settings->es_support_push = LSQUIC_DF_SUPPORT_PUSH; 335 settings->es_support_tcid0 = LSQUIC_DF_SUPPORT_TCID0; 336 settings->es_support_nstp = LSQUIC_DF_SUPPORT_NSTP; 337 settings->es_honor_prst = LSQUIC_DF_HONOR_PRST; 338 settings->es_progress_check = LSQUIC_DF_PROGRESS_CHECK; 339 settings->es_pendrw_check = LSQUIC_DF_PENDRW_CHECK; 340 settings->es_rw_once = LSQUIC_DF_RW_ONCE; 341 settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH; 342 settings->es_pace_packets = LSQUIC_DF_PACE_PACKETS; 343} 344 345 346/* Note: if returning an error, err_buf must be valid if non-NULL */ 347int 348lsquic_engine_check_settings (const struct lsquic_engine_settings *settings, 349 unsigned flags, 350 char *err_buf, size_t err_buf_sz) 351{ 352 if (settings->es_cfcw < LSQUIC_MIN_FCW || 353 settings->es_sfcw < LSQUIC_MIN_FCW) 354 { 355 if (err_buf) 356 snprintf(err_buf, err_buf_sz, "%s", 357 "flow control window set too low"); 358 return -1; 359 } 360 if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS)) 361 { 362 if (err_buf) 363 snprintf(err_buf, err_buf_sz, "%s", 364 "No supported QUIC versions specified"); 365 return -1; 366 } 367 if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS) 368 { 369 if (err_buf) 370 snprintf(err_buf, err_buf_sz, "%s", 371 "one or more unsupported QUIC version is specified"); 372 return -1; 373 } 374 return 0; 375} 376 377 378static void 379free_packet (void *ctx, unsigned char *packet_data) 380{ 381 free(packet_data); 382} 383 384 385static void * 386malloc_buf (void *ctx, size_t size) 387{ 388 return malloc(size); 389} 390 391 392static const struct lsquic_packout_mem_if stock_pmi = 393{ 394 malloc_buf, (void(*)(void *, void *)) free_packet, 395}; 396 397 398lsquic_engine_t * 399lsquic_engine_new (unsigned flags, 400 const struct lsquic_engine_api *api) 401{ 402 lsquic_engine_t *engine; 403 int tag_buf_len; 404 char err_buf[100]; 405 406 if (!api->ea_packets_out) 407 { 408 LSQ_ERROR("packets_out callback is not specified"); 409 return NULL; 410 } 411 412 if (api->ea_settings && 413 0 != lsquic_engine_check_settings(api->ea_settings, flags, 414 err_buf, sizeof(err_buf))) 415 { 416 LSQ_ERROR("cannot create engine: %s", err_buf); 417 return NULL; 418 } 419 420 engine = calloc(1, sizeof(*engine)); 421 if (!engine) 422 return NULL; 423 if (0 != lsquic_mm_init(&engine->pub.enp_mm)) 424 { 425 free(engine); 426 return NULL; 427 } 428 if (api->ea_settings) 429 engine->pub.enp_settings = *api->ea_settings; 430 else 431 lsquic_engine_init_settings(&engine->pub.enp_settings, flags); 432 tag_buf_len = gen_ver_tags(engine->pub.enp_ver_tags_buf, 433 sizeof(engine->pub.enp_ver_tags_buf), 434 engine->pub.enp_settings.es_versions); 435 if (tag_buf_len <= 0) 436 { 437 LSQ_ERROR("cannot generate version tags buffer"); 438 free(engine); 439 return NULL; 440 } 441 engine->pub.enp_ver_tags_len = tag_buf_len; 442 443 engine->flags = flags; 444 engine->stream_if = api->ea_stream_if; 445 engine->stream_if_ctx = api->ea_stream_if_ctx; 446 engine->packets_out = api->ea_packets_out; 447 engine->packets_out_ctx = api->ea_packets_out_ctx; 448 if (api->ea_pmi) 449 { 450 engine->pub.enp_pmi = api->ea_pmi; 451 engine->pub.enp_pmi_ctx = api->ea_pmi_ctx; 452 } 453 else 454 { 455 engine->pub.enp_pmi = &stock_pmi; 456 engine->pub.enp_pmi_ctx = NULL; 457 } 458 engine->pub.enp_engine = engine; 459 TAILQ_INIT(&engine->conns_in); 460 TAILQ_INIT(&engine->conns_pend_rw); 461 conn_hash_init(&engine->full_conns, ~0); 462 engine->attq = attq_create(); 463 eng_hist_init(&engine->history); 464 engine->batch_size = INITIAL_OUT_BATCH_SIZE; 465 466 467 LSQ_INFO("instantiated engine"); 468 return engine; 469} 470 471 472static void 473grow_batch_size (struct lsquic_engine *engine) 474{ 475 engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE; 476} 477 478 479static void 480shrink_batch_size (struct lsquic_engine *engine) 481{ 482 engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE; 483} 484 485 486/* Wrapper to make sure important things occur before the connection is 487 * really destroyed. 488 */ 489static void 490destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn) 491{ 492 conn->cn_flags |= LSCONN_NEVER_PEND_RW; 493 conn->cn_if->ci_destroy(conn); 494} 495 496 497static lsquic_conn_t * 498new_full_conn_client (lsquic_engine_t *engine, const char *hostname, 499 unsigned short max_packet_size) 500{ 501 lsquic_conn_t *conn; 502 unsigned flags; 503 flags = engine->flags & (ENG_SERVER|ENG_HTTP); 504 conn = full_conn_client_new(&engine->pub, engine->stream_if, 505 engine->stream_if_ctx, flags, hostname, max_packet_size); 506 if (!conn) 507 return NULL; 508 if (0 != conn_hash_add(&engine->full_conns, conn)) 509 { 510 LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy", 511 conn->cn_cid); 512 destroy_conn(engine, conn); 513 return NULL; 514 } 515 assert(!(conn->cn_flags & 516 (CONN_REF_FLAGS 517 & ~LSCONN_RW_PENDING /* This flag may be set as effect of user 518 callbacks */ 519 ))); 520 conn->cn_flags |= LSCONN_HASHED; 521 return conn; 522} 523 524 525static lsquic_conn_t * 526find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 527 struct packin_parse_state *ppstate, const struct sockaddr *sa_peer, 528 void *peer_ctx) 529{ 530 lsquic_conn_t *conn; 531 532 if (lsquic_packet_in_is_prst(packet_in) 533 && !engine->pub.enp_settings.es_honor_prst) 534 { 535 LSQ_DEBUG("public reset packet: discarding"); 536 return NULL; 537 } 538 539 if (!(packet_in->pi_flags & PI_CONN_ID)) 540 { 541 LSQ_DEBUG("packet header does not have connection ID: discarding"); 542 return NULL; 543 } 544 545 conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id); 546 if (conn) 547 { 548 conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate); 549 return conn; 550 } 551 552 return conn; 553} 554 555 556static void 557add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn, 558 enum rw_reason reason) 559{ 560 int hist_idx; 561 562 TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw); 563 engine_incref_conn(conn, LSCONN_RW_PENDING); 564 565 hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); 566 conn->cn_rw_hist_buf[ hist_idx ] = reason; 567 ++conn->cn_rw_hist_idx; 568 569 if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx) 570 EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), " 571 "rw_hist: %.*s", (char) reason, 572 (int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); 573 else 574 EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')", 575 (char) reason); 576} 577 578 579#if !defined(NDEBUG) && __GNUC__ 580__attribute__((weak)) 581#endif 582void 583lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub, 584 lsquic_conn_t *conn, enum rw_reason reason) 585{ 586 if (0 == (enpub->enp_flags & ENPUB_PROC) && 587 0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW))) 588 { 589 lsquic_engine_t *engine = (lsquic_engine_t *) enpub; 590 add_conn_to_pend_rw(engine, conn, reason); 591 } 592} 593 594 595void 596lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub, 597 lsquic_conn_t *conn, lsquic_time_t tick_time) 598{ 599 lsquic_engine_t *const engine = (lsquic_engine_t *) enpub; 600 /* Instead of performing an update, we simply remove the connection from 601 * the queue and add it back. This should not happen in at the time of 602 * this writing. 603 */ 604 if (conn->cn_flags & LSCONN_ATTQ) 605 { 606 attq_remove(engine->attq, conn); 607 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 608 } 609 if (conn && !(conn->cn_flags & LSCONN_ATTQ) && 610 0 == attq_maybe_add(engine->attq, conn, tick_time)) 611 engine_incref_conn(conn, LSCONN_ATTQ); 612} 613 614 615static void 616update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn, 617 int progress_made) 618{ 619 rw_hist_idx_t hist_idx; 620 const unsigned char *empty; 621 const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check; 622 623 if (!pendrw_check) 624 return; 625 626 /* Convert previous entry to uppercase: */ 627 hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1); 628 conn->cn_rw_hist_buf[ hist_idx ] -= 0x20; 629 630 LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made); 631 if (progress_made) 632 { 633 conn->cn_noprogress_count = 0; 634 return; 635 } 636 637 EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made " 638 "no progress"); 639 ++conn->cn_noprogress_count; 640 if (conn->cn_noprogress_count <= pendrw_check) 641 return; 642 643 conn->cn_flags |= LSCONN_NEVER_PEND_RW; 644 empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY, 645 sizeof(conn->cn_rw_hist_buf)); 646 if (empty) 647 LSQ_WARN("conn %"PRIu64" noprogress count reached %u " 648 "(rw_hist: %.*s): will not put it onto Pend RW queue again", 649 conn->cn_cid, conn->cn_noprogress_count, 650 (int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); 651 else 652 { 653 hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); 654 LSQ_WARN("conn %"PRIu64" noprogress count reached %u " 655 "(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again", 656 conn->cn_cid, conn->cn_noprogress_count, 657 /* First part of history: */ 658 (int) (sizeof(conn->cn_rw_hist_buf) - hist_idx), 659 conn->cn_rw_hist_buf + hist_idx, 660 /* Second part of history: */ 661 hist_idx, conn->cn_rw_hist_buf); 662 } 663} 664 665 666/* Return 0 if packet is being processed by a connections, otherwise return 1 */ 667static int 668process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 669 struct packin_parse_state *ppstate, const struct sockaddr *sa_local, 670 const struct sockaddr *sa_peer, void *peer_ctx) 671{ 672 lsquic_conn_t *conn; 673 674 conn = find_or_create_conn(engine, packet_in, ppstate, sa_peer, peer_ctx); 675 if (!conn) 676 { 677 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 678 return 1; 679 } 680 681 if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) { 682 TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in); 683 engine_incref_conn(conn, LSCONN_HAS_INCOMING); 684 } 685 lsquic_conn_record_sockaddr(conn, sa_local, sa_peer); 686 lsquic_packet_in_upref(packet_in); 687 conn->cn_peer_ctx = peer_ctx; 688 conn->cn_if->ci_packet_in(conn, packet_in); 689 lsquic_packet_in_put(&engine->pub.enp_mm, packet_in); 690 return 0; 691} 692 693 694static int 695conn_attq_expired (const struct lsquic_engine *engine, 696 const lsquic_conn_t *conn) 697{ 698 assert(conn->cn_attq_elem); 699 return lsquic_conn_adv_time(conn) < engine->proc_time; 700} 701 702 703/* Iterator for connections with incoming packets */ 704static lsquic_conn_t * 705conn_iter_next_incoming (struct lsquic_engine *engine) 706{ 707 enum lsquic_conn_flags addl_flags; 708 lsquic_conn_t *conn; 709 while ((conn = TAILQ_FIRST(&engine->conns_in))) 710 { 711 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 712 if (conn->cn_flags & LSCONN_RW_PENDING) 713 { 714 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 715 EV_LOG_CONN_EVENT(conn->cn_cid, 716 "removed from pending RW queue (processing incoming)"); 717 } 718 if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) 719 { 720 addl_flags = LSCONN_ATTQ; 721 attq_remove(engine->attq, conn); 722 } 723 else 724 addl_flags = 0; 725 conn = engine_decref_conn(engine, conn, 726 LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); 727 if (conn) 728 break; 729 } 730 return conn; 731} 732 733 734/* Iterator for connections with that have pending read/write events */ 735static lsquic_conn_t * 736conn_iter_next_rw_pend (struct lsquic_engine *engine) 737{ 738 enum lsquic_conn_flags addl_flags; 739 lsquic_conn_t *conn; 740 while ((conn = TAILQ_FIRST(&engine->conns_pend_rw))) 741 { 742 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 743 EV_LOG_CONN_EVENT(conn->cn_cid, 744 "removed from pending RW queue (processing pending RW conns)"); 745 if (conn->cn_flags & LSCONN_HAS_INCOMING) 746 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 747 if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) 748 { 749 addl_flags = LSCONN_ATTQ; 750 attq_remove(engine->attq, conn); 751 } 752 else 753 addl_flags = 0; 754 conn = engine_decref_conn(engine, conn, 755 LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); 756 if (conn) 757 break; 758 } 759 return conn; 760} 761 762 763void 764lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine) 765{ 766 LSQ_DEBUG("process connections with incoming packets"); 767 ENGINE_IN(engine); 768 process_connections(engine, conn_iter_next_incoming); 769 assert(TAILQ_EMPTY(&engine->conns_in)); 770 ENGINE_OUT(engine); 771} 772 773 774int 775lsquic_engine_has_pend_rw (lsquic_engine_t *engine) 776{ 777 return !(engine->flags & ENG_PAST_DEADLINE) 778 && !TAILQ_EMPTY(&engine->conns_pend_rw); 779} 780 781 782void 783lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine) 784{ 785 LSQ_DEBUG("process connections with pending RW events"); 786 ENGINE_IN(engine); 787 process_connections(engine, conn_iter_next_rw_pend); 788 ENGINE_OUT(engine); 789} 790 791 792void 793lsquic_engine_destroy (lsquic_engine_t *engine) 794{ 795 lsquic_conn_t *conn; 796 797 LSQ_DEBUG("destroying engine"); 798#ifndef NDEBUG 799 engine->flags |= ENG_DTOR; 800#endif 801 802 while (engine->conns_out.oh_nelem > 0) 803 { 804 --engine->conns_out.oh_nelem; 805 conn = engine->conns_out.oh_elems[ 806 engine->conns_out.oh_nelem ].ohe_conn; 807 assert(conn->cn_flags & LSCONN_HAS_OUTGOING); 808 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 809 } 810 811 for (conn = conn_hash_first(&engine->full_conns); conn; 812 conn = conn_hash_next(&engine->full_conns)) 813 force_close_conn(engine, conn); 814 conn_hash_cleanup(&engine->full_conns); 815 816 817 attq_destroy(engine->attq); 818 819 assert(0 == engine->conns_out.oh_nelem); 820 assert(TAILQ_EMPTY(&engine->conns_pend_rw)); 821 lsquic_mm_cleanup(&engine->pub.enp_mm); 822 free(engine->conns_out.oh_elems); 823 free(engine); 824} 825 826 827#if __GNUC__ 828__attribute__((nonnull(3))) 829#endif 830static lsquic_conn_t * 831remove_from_inc_andor_pend_rw (lsquic_engine_t *engine, 832 lsquic_conn_t *conn, const char *reason) 833{ 834 assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)); 835 if (conn->cn_flags & LSCONN_HAS_INCOMING) 836 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 837 if (conn->cn_flags & LSCONN_RW_PENDING) 838 { 839 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 840 EV_LOG_CONN_EVENT(conn->cn_cid, 841 "removed from pending RW queue (%s)", reason); 842 } 843 conn = engine_decref_conn(engine, conn, 844 LSCONN_HAS_INCOMING|LSCONN_RW_PENDING); 845 assert(conn); 846 return conn; 847} 848 849 850static lsquic_conn_t * 851conn_iter_next_one (lsquic_engine_t *engine) 852{ 853 lsquic_conn_t *conn = engine->iter_state.one.conn; 854 if (conn) 855 { 856 if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) 857 conn = remove_from_inc_andor_pend_rw(engine, conn, "connect"); 858 if (conn && (conn->cn_flags & LSCONN_ATTQ) && 859 conn_attq_expired(engine, conn)) 860 { 861 attq_remove(engine->attq, conn); 862 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 863 } 864 engine->iter_state.one.conn = NULL; 865 } 866 return conn; 867} 868 869 870lsquic_conn_t * 871lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa, 872 void *peer_ctx, lsquic_conn_ctx_t *conn_ctx, 873 const char *hostname, unsigned short max_packet_size) 874{ 875 lsquic_conn_t *conn; 876 877 if (engine->flags & ENG_SERVER) 878 { 879 LSQ_ERROR("`%s' must only be called in client mode", __func__); 880 return NULL; 881 } 882 883 if (0 == max_packet_size) 884 { 885 switch (peer_sa->sa_family) 886 { 887 case AF_INET: 888 max_packet_size = QUIC_MAX_IPv4_PACKET_SZ; 889 break; 890 default: 891 max_packet_size = QUIC_MAX_IPv6_PACKET_SZ; 892 break; 893 } 894 } 895 896 conn = new_full_conn_client(engine, hostname, max_packet_size); 897 if (!conn) 898 return NULL; 899 ENGINE_IN(engine); 900 lsquic_conn_record_peer_sa(conn, peer_sa); 901 conn->cn_peer_ctx = peer_ctx; 902 lsquic_conn_set_ctx(conn, conn_ctx); 903 engine->iter_state.one.conn = conn; 904 full_conn_client_call_on_new(conn); 905 process_connections(engine, conn_iter_next_one); 906 ENGINE_OUT(engine); 907 return conn; 908} 909 910 911static void 912remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn) 913{ 914 conn_hash_remove(&engine->full_conns, conn); 915 (void) engine_decref_conn(engine, conn, LSCONN_HASHED); 916} 917 918 919static void 920refflags2str (enum lsquic_conn_flags flags, char s[7]) 921{ 922 *s = 'C'; s += !!(flags & LSCONN_CLOSING); 923 *s = 'H'; s += !!(flags & LSCONN_HASHED); 924 *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING); 925 *s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING); 926 *s = 'R'; s += !!(flags & LSCONN_RW_PENDING); 927 *s = 'A'; s += !!(flags & LSCONN_ATTQ); 928 *s = '\0'; 929} 930 931 932static void 933engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag) 934{ 935 char str[7]; 936 assert(flag & CONN_REF_FLAGS); 937 assert(!(conn->cn_flags & flag)); 938 conn->cn_flags |= flag; 939 LSQ_DEBUG("incref conn %"PRIu64", now '%s'", conn->cn_cid, 940 (refflags2str(conn->cn_flags, str), str)); 941} 942 943 944static lsquic_conn_t * 945engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 946 enum lsquic_conn_flags flags) 947{ 948 char str[7]; 949 assert(flags & CONN_REF_FLAGS); 950 assert(conn->cn_flags & flags); 951#ifndef NDEBUG 952 if (flags & LSCONN_CLOSING) 953 assert(0 == (conn->cn_flags & LSCONN_HASHED)); 954#endif 955 conn->cn_flags &= ~flags; 956 LSQ_DEBUG("decref conn %"PRIu64", now '%s'", conn->cn_cid, 957 (refflags2str(conn->cn_flags, str), str)); 958 if (0 == (conn->cn_flags & CONN_REF_FLAGS)) 959 { 960 eng_hist_inc(&engine->history, 0, sl_del_full_conns); 961 destroy_conn(engine, conn); 962 return NULL; 963 } 964 else 965 return conn; 966} 967 968 969/* This is not a general-purpose function. Only call from engine dtor. */ 970static void 971force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn) 972{ 973 assert(engine->flags & ENG_DTOR); 974 const enum lsquic_conn_flags flags = conn->cn_flags; 975 assert(conn->cn_flags & CONN_REF_FLAGS); 976 assert(!(flags & LSCONN_HAS_OUTGOING)); /* Should be removed already */ 977 assert(!(flags & LSCONN_CLOSING)); /* It is in transient queue? */ 978 if (flags & LSCONN_HAS_INCOMING) 979 { 980 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 981 (void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING); 982 } 983 if (flags & LSCONN_RW_PENDING) 984 { 985 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 986 EV_LOG_CONN_EVENT(conn->cn_cid, 987 "removed from pending RW queue (engine destruction)"); 988 (void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING); 989 } 990 if (flags & LSCONN_ATTQ) 991 attq_remove(engine->attq, conn); 992 if (flags & LSCONN_HASHED) 993 remove_conn_from_hash(engine, conn); 994} 995 996 997/* Iterator for all connections. 998 * Returned connections are removed from the Incoming, Pending RW Event, 999 * and Advisory Tick Time queues if necessary. 1000 */ 1001static lsquic_conn_t * 1002conn_iter_next_all (struct lsquic_engine *engine) 1003{ 1004 lsquic_conn_t *conn; 1005 1006 conn = conn_hash_next(&engine->full_conns); 1007 1008 if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))) 1009 conn = remove_from_inc_andor_pend_rw(engine, conn, "process all"); 1010 if (conn && (conn->cn_flags & LSCONN_ATTQ) 1011 && conn_attq_expired(engine, conn)) 1012 { 1013 attq_remove(engine->attq, conn); 1014 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 1015 } 1016 1017 return conn; 1018} 1019 1020 1021static lsquic_conn_t * 1022conn_iter_next_attq (struct lsquic_engine *engine) 1023{ 1024 lsquic_conn_t *conn; 1025 1026 conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff); 1027 if (conn) 1028 { 1029 assert(conn->cn_flags & LSCONN_ATTQ); 1030 if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) 1031 conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq"); 1032 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 1033 } 1034 1035 return conn; 1036} 1037 1038 1039void 1040lsquic_engine_proc_all (lsquic_engine_t *engine) 1041{ 1042 ENGINE_IN(engine); 1043 /* We poke each connection every time as initial implementation. If it 1044 * proves to be too inefficient, we will need to figure out 1045 * a) when to stop processing; and 1046 * b) how to remember state between calls. 1047 */ 1048 conn_hash_reset_iter(&engine->full_conns); 1049 process_connections(engine, conn_iter_next_all); 1050 ENGINE_OUT(engine); 1051} 1052 1053 1054void 1055lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine) 1056{ 1057 lsquic_time_t prev_min, now; 1058 1059 now = lsquic_time_now(); 1060 if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG)) 1061 { 1062 const lsquic_time_t *expected_time; 1063 int64_t diff; 1064 expected_time = attq_next_time(engine->attq); 1065 if (expected_time) 1066 diff = *expected_time - now; 1067 else 1068 diff = -1; 1069 LSQ_DEBUG("process connections in attq; time diff: %"PRIi64, diff); 1070 } 1071 1072 ENGINE_IN(engine); 1073 prev_min = attq_set_min(engine->attq, now); /* Prevent infinite loop */ 1074 engine->iter_state.attq.cutoff = now; 1075 process_connections(engine, conn_iter_next_attq); 1076 attq_set_min(engine->attq, prev_min); /* Restore previos value */ 1077 ENGINE_OUT(engine); 1078} 1079 1080 1081static int 1082generate_header (const lsquic_packet_out_t *packet_out, 1083 const struct parse_funcs *pf, lsquic_cid_t cid, 1084 unsigned char *buf, size_t bufsz) 1085{ 1086 return pf->pf_gen_reg_pkt_header(buf, bufsz, 1087 packet_out->po_flags & PO_CONN_ID ? &cid : NULL, 1088 packet_out->po_flags & PO_VERSION ? &packet_out->po_ver_tag : NULL, 1089 packet_out->po_flags & PO_NONCE ? packet_out->po_nonce : NULL, 1090 packet_out->po_packno, lsquic_packet_out_packno_bits(packet_out)); 1091} 1092 1093 1094static ssize_t 1095really_encrypt_packet (const lsquic_conn_t *conn, 1096 const lsquic_packet_out_t *packet_out, 1097 unsigned char *buf, size_t bufsz) 1098{ 1099 int enc, header_sz, is_hello_packet; 1100 size_t packet_sz; 1101 unsigned char header_buf[QUIC_MAX_PUBHDR_SZ]; 1102 1103 header_sz = generate_header(packet_out, conn->cn_pf, conn->cn_cid, 1104 header_buf, sizeof(header_buf)); 1105 if (header_sz < 0) 1106 return -1; 1107 1108 is_hello_packet = !!(packet_out->po_flags & PO_HELLO); 1109 enc = conn->cn_esf->esf_encrypt(conn->cn_enc_session, conn->cn_version, 0, 1110 packet_out->po_packno, header_buf, header_sz, 1111 packet_out->po_data, packet_out->po_data_sz, 1112 buf, bufsz, &packet_sz, is_hello_packet); 1113 if (0 == enc) 1114 { 1115 LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %u bytes, " 1116 "ciphertext is %zd bytes", 1117 packet_out->po_packno, 1118 lsquic_po_header_length(packet_out->po_flags) + 1119 packet_out->po_data_sz, 1120 packet_sz); 1121 return packet_sz; 1122 } 1123 else 1124 return -1; 1125} 1126 1127 1128static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, } 1129encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn, 1130 lsquic_packet_out_t *packet_out) 1131{ 1132 ssize_t enc_sz; 1133 size_t bufsz; 1134 unsigned sent_sz; 1135 unsigned char *buf; 1136 1137 bufsz = lsquic_po_header_length(packet_out->po_flags) + 1138 packet_out->po_data_sz + QUIC_PACKET_HASH_SZ; 1139 buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz); 1140 if (!buf) 1141 { 1142 LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd", 1143 bufsz); 1144 return ENCPA_NOMEM; 1145 } 1146 1147 { 1148 enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz); 1149 sent_sz = enc_sz; 1150 } 1151 1152 if (enc_sz < 0) 1153 { 1154 engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf); 1155 return ENCPA_BADCRYPT; 1156 } 1157 1158 packet_out->po_enc_data = buf; 1159 packet_out->po_enc_data_sz = enc_sz; 1160 packet_out->po_sent_sz = sent_sz; 1161 packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ; 1162 1163 return ENCPA_OK; 1164} 1165 1166 1167STAILQ_HEAD(closed_conns, lsquic_conn); 1168 1169 1170struct conns_out_iter 1171{ 1172 struct out_heap *coi_heap; 1173 TAILQ_HEAD(, lsquic_conn) coi_active_list, 1174 coi_inactive_list; 1175 lsquic_conn_t *coi_next; 1176#ifndef NDEBUG 1177 lsquic_time_t coi_last_sent; 1178#endif 1179}; 1180 1181 1182static void 1183coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine) 1184{ 1185 iter->coi_heap = &engine->conns_out; 1186 iter->coi_next = NULL; 1187 TAILQ_INIT(&iter->coi_active_list); 1188 TAILQ_INIT(&iter->coi_inactive_list); 1189#ifndef NDEBUG 1190 iter->coi_last_sent = 0; 1191#endif 1192} 1193 1194 1195static lsquic_conn_t * 1196coi_next (struct conns_out_iter *iter) 1197{ 1198 lsquic_conn_t *conn; 1199 1200 if (iter->coi_heap->oh_nelem > 0) 1201 { 1202 conn = oh_pop(iter->coi_heap); 1203 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1204 conn->cn_flags |= LSCONN_COI_ACTIVE; 1205#ifndef NDEBUG 1206 if (iter->coi_last_sent) 1207 assert(iter->coi_last_sent <= conn->cn_last_sent); 1208 iter->coi_last_sent = conn->cn_last_sent; 1209#endif 1210 return conn; 1211 } 1212 else if (!TAILQ_EMPTY(&iter->coi_active_list)) 1213 { 1214 conn = iter->coi_next; 1215 if (!conn) 1216 conn = TAILQ_FIRST(&iter->coi_active_list); 1217 if (conn) 1218 iter->coi_next = TAILQ_NEXT(conn, cn_next_out); 1219 return conn; 1220 } 1221 else 1222 return NULL; 1223} 1224 1225 1226static void 1227coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1228{ 1229 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1230 { 1231 assert(!TAILQ_EMPTY(&iter->coi_active_list)); 1232 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1233 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1234 TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out); 1235 conn->cn_flags |= LSCONN_COI_INACTIVE; 1236 } 1237} 1238 1239 1240static void 1241coi_remove (struct conns_out_iter *iter, lsquic_conn_t *conn) 1242{ 1243 assert(conn->cn_flags & LSCONN_COI_ACTIVE); 1244 if (conn->cn_flags & LSCONN_COI_ACTIVE) 1245 { 1246 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1247 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1248 } 1249} 1250 1251 1252static void 1253coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1254{ 1255 assert(conn->cn_flags & LSCONN_COI_INACTIVE); 1256 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1257 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1258 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1259 conn->cn_flags |= LSCONN_COI_ACTIVE; 1260} 1261 1262 1263static void 1264coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine) 1265{ 1266 lsquic_conn_t *conn; 1267 while ((conn = TAILQ_FIRST(&iter->coi_active_list))) 1268 { 1269 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1270 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1271 oh_insert(iter->coi_heap, conn); 1272 } 1273 while ((conn = TAILQ_FIRST(&iter->coi_inactive_list))) 1274 { 1275 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1276 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1277 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 1278 } 1279} 1280 1281 1282static unsigned 1283send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter, 1284 struct out_batch *batch, unsigned n_to_send) 1285{ 1286 int n_sent, i; 1287 lsquic_time_t now; 1288 1289 /* Set sent time before the write to avoid underestimating RTT */ 1290 now = lsquic_time_now(); 1291 for (i = 0; i < (int) n_to_send; ++i) 1292 batch->packets[i]->po_sent = now; 1293 n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs, 1294 n_to_send); 1295 if (n_sent >= 0) 1296 LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send); 1297 else 1298 { 1299 LSQ_DEBUG("packets out returned an error: %s", strerror(errno)); 1300 n_sent = 0; 1301 } 1302 if (n_sent > 0) 1303 engine->last_sent = now + n_sent; 1304 for (i = 0; i < n_sent; ++i) 1305 { 1306 eng_hist_inc(&engine->history, now, sl_packets_out); 1307 EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1308 batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i], 1309 batch->packets[i]); 1310 /* `i' is added to maintain relative order */ 1311 batch->conns[i]->cn_last_sent = now + i; 1312 /* Release packet out buffer as soon as the packet is sent 1313 * successfully. If not successfully sent, we hold on to 1314 * this buffer until the packet sending is attempted again 1315 * or until it times out and regenerated. 1316 */ 1317 if (batch->packets[i]->po_flags & PO_ENCRYPTED) 1318 { 1319 batch->packets[i]->po_flags &= ~PO_ENCRYPTED; 1320 engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, 1321 batch->packets[i]->po_enc_data); 1322 batch->packets[i]->po_enc_data = NULL; /* JIC */ 1323 } 1324 } 1325 if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT)) 1326 for ( ; i < (int) n_to_send; ++i) 1327 EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1328 /* Return packets to the connection in reverse order so that the packet 1329 * ordering is maintained. 1330 */ 1331 for (i = (int) n_to_send - 1; i >= n_sent; --i) 1332 { 1333 batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i], 1334 batch->packets[i]); 1335 if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT))) 1336 coi_reactivate(conns_iter, batch->conns[i]); 1337 } 1338 return n_sent; 1339} 1340 1341 1342/* Return 1 if went past deadline, 0 otherwise */ 1343static int 1344check_deadline (lsquic_engine_t *engine) 1345{ 1346 if (engine->pub.enp_settings.es_proc_time_thresh && 1347 lsquic_time_now() > engine->deadline) 1348 { 1349 LSQ_INFO("went past threshold of %u usec, stop sending", 1350 engine->pub.enp_settings.es_proc_time_thresh); 1351 engine->flags |= ENG_PAST_DEADLINE; 1352 return 1; 1353 } 1354 else 1355 return 0; 1356} 1357 1358 1359static void 1360send_packets_out (struct lsquic_engine *engine, 1361 struct closed_conns *closed_conns) 1362{ 1363 unsigned n, w, n_sent, n_batches_sent; 1364 lsquic_packet_out_t *packet_out; 1365 lsquic_conn_t *conn; 1366 struct out_batch *const batch = &engine->out_batch; 1367 struct conns_out_iter conns_iter; 1368 int shrink, deadline_exceeded; 1369 1370 coi_init(&conns_iter, engine); 1371 n_batches_sent = 0; 1372 n_sent = 0, n = 0; 1373 shrink = 0; 1374 deadline_exceeded = 0; 1375 1376 while ((conn = coi_next(&conns_iter))) 1377 { 1378 packet_out = conn->cn_if->ci_next_packet_to_send(conn); 1379 if (!packet_out) { 1380 LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64, 1381 conn->cn_cid); 1382 coi_deactivate(&conns_iter, conn); 1383 continue; 1384 } 1385 if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT))) 1386 { 1387 switch (encrypt_packet(engine, conn, packet_out)) 1388 { 1389 case ENCPA_NOMEM: 1390 /* Send what we have and wait for a more opportune moment */ 1391 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1392 goto end_for; 1393 case ENCPA_BADCRYPT: 1394 /* This is pretty bad: close connection immediately */ 1395 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1396 LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid); 1397 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1398 { 1399 if (!(conn->cn_flags & LSCONN_CLOSING)) 1400 { 1401 STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn); 1402 engine_incref_conn(conn, LSCONN_CLOSING); 1403 if (conn->cn_flags & LSCONN_HASHED) 1404 remove_conn_from_hash(engine, conn); 1405 } 1406 coi_remove(&conns_iter, conn); 1407 } 1408 continue; 1409 case ENCPA_OK: 1410 break; 1411 } 1412 } 1413 LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64, 1414 packet_out->po_packno, conn->cn_cid); 1415 assert(conn->cn_flags & LSCONN_HAS_PEER_SA); 1416 if (packet_out->po_flags & PO_ENCRYPTED) 1417 { 1418 batch->outs[n].buf = packet_out->po_enc_data; 1419 batch->outs[n].sz = packet_out->po_enc_data_sz; 1420 } 1421 else 1422 { 1423 batch->outs[n].buf = packet_out->po_data; 1424 batch->outs[n].sz = packet_out->po_data_sz; 1425 } 1426 batch->outs [n].peer_ctx = conn->cn_peer_ctx; 1427 batch->outs [n].local_sa = (struct sockaddr *) conn->cn_local_addr; 1428 batch->outs [n].dest_sa = (struct sockaddr *) conn->cn_peer_addr; 1429 batch->conns [n] = conn; 1430 batch->packets[n] = packet_out; 1431 ++n; 1432 if (n == engine->batch_size) 1433 { 1434 n = 0; 1435 w = send_batch(engine, &conns_iter, batch, engine->batch_size); 1436 ++n_batches_sent; 1437 n_sent += w; 1438 if (w < engine->batch_size) 1439 { 1440 shrink = 1; 1441 break; 1442 } 1443 deadline_exceeded = check_deadline(engine); 1444 if (deadline_exceeded) 1445 break; 1446 grow_batch_size(engine); 1447 } 1448 } 1449 end_for: 1450 1451 if (n > 0) { 1452 w = send_batch(engine, &conns_iter, batch, n); 1453 n_sent += w; 1454 shrink = w < n; 1455 ++n_batches_sent; 1456 deadline_exceeded = check_deadline(engine); 1457 } 1458 1459 if (shrink) 1460 shrink_batch_size(engine); 1461 else if (n_batches_sent > 1 && !deadline_exceeded) 1462 grow_batch_size(engine); 1463 1464 coi_reheap(&conns_iter, engine); 1465 1466 LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s"); 1467} 1468 1469 1470int 1471lsquic_engine_has_unsent_packets (lsquic_engine_t *engine) 1472{ 1473 return !(engine->flags & ENG_PAST_DEADLINE) 1474 && ( engine->conns_out.oh_nelem > 0 1475 ) 1476 ; 1477} 1478 1479 1480static void 1481reset_deadline (lsquic_engine_t *engine, lsquic_time_t now) 1482{ 1483 engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh; 1484 engine->flags &= ~ENG_PAST_DEADLINE; 1485} 1486 1487 1488/* TODO: this is a user-facing function, account for load */ 1489void 1490lsquic_engine_send_unsent_packets (lsquic_engine_t *engine) 1491{ 1492 lsquic_conn_t *conn; 1493 struct closed_conns closed_conns; 1494 1495 STAILQ_INIT(&closed_conns); 1496 reset_deadline(engine, lsquic_time_now()); 1497 1498 send_packets_out(engine, &closed_conns); 1499 1500 while ((conn = STAILQ_FIRST(&closed_conns))) { 1501 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1502 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1503 } 1504 1505} 1506 1507 1508static void 1509process_connections (lsquic_engine_t *engine, conn_iter_f next_conn) 1510{ 1511 lsquic_conn_t *conn; 1512 enum tick_st tick_st; 1513 lsquic_time_t now = lsquic_time_now(); 1514 struct closed_conns closed_conns; 1515 1516 engine->proc_time = now; 1517 eng_hist_tick(&engine->history, now); 1518 1519 STAILQ_INIT(&closed_conns); 1520 reset_deadline(engine, now); 1521 1522 while ((conn = next_conn(engine))) 1523 { 1524 tick_st = conn->cn_if->ci_tick(conn, now); 1525 if (conn_iter_next_rw_pend == next_conn) 1526 update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS); 1527 if (tick_st & TICK_SEND) 1528 { 1529 if (!(conn->cn_flags & LSCONN_HAS_OUTGOING)) 1530 { 1531 oh_insert(&engine->conns_out, conn); 1532 engine_incref_conn(conn, LSCONN_HAS_OUTGOING); 1533 } 1534 } 1535 if (tick_st & TICK_CLOSE) 1536 { 1537 STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn); 1538 engine_incref_conn(conn, LSCONN_CLOSING); 1539 if (conn->cn_flags & LSCONN_HASHED) 1540 remove_conn_from_hash(engine, conn); 1541 } 1542 } 1543 1544 if (lsquic_engine_has_unsent_packets(engine)) 1545 send_packets_out(engine, &closed_conns); 1546 1547 while ((conn = STAILQ_FIRST(&closed_conns))) { 1548 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1549 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1550 } 1551 1552} 1553 1554 1555/* Return 0 if packet is being processed by a real connection, 1 if the 1556 * packet was processed, but not by a connection, and -1 on error. 1557 */ 1558int 1559lsquic_engine_packet_in (lsquic_engine_t *engine, 1560 const unsigned char *packet_in_data, size_t packet_in_size, 1561 const struct sockaddr *sa_local, const struct sockaddr *sa_peer, 1562 void *peer_ctx) 1563{ 1564 struct packin_parse_state ppstate; 1565 lsquic_packet_in_t *packet_in; 1566 1567 if (packet_in_size > QUIC_MAX_PACKET_SZ) 1568 { 1569 LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming " 1570 "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ); 1571 errno = E2BIG; 1572 return -1; 1573 } 1574 1575 packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm); 1576 if (!packet_in) 1577 return -1; 1578 1579 /* Library does not modify packet_in_data, it is not referenced after 1580 * this function returns and subsequent release of pi_data is guarded 1581 * by PI_OWN_DATA flag. 1582 */ 1583 packet_in->pi_data = (unsigned char *) packet_in_data; 1584 if (0 != parse_packet_in_begin(packet_in, packet_in_size, 1585 engine->flags & ENG_SERVER, &ppstate)) 1586 { 1587 LSQ_DEBUG("Cannot parse incoming packet's header"); 1588 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 1589 errno = EINVAL; 1590 return -1; 1591 } 1592 1593 packet_in->pi_received = lsquic_time_now(); 1594 eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in); 1595 return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer, 1596 peer_ctx); 1597} 1598 1599 1600#if __GNUC__ && !defined(NDEBUG) 1601__attribute__((weak)) 1602#endif 1603unsigned 1604lsquic_engine_quic_versions (const lsquic_engine_t *engine) 1605{ 1606 return engine->pub.enp_settings.es_versions; 1607} 1608 1609 1610int 1611lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff) 1612{ 1613 const lsquic_time_t *next_time; 1614 lsquic_time_t now; 1615 1616 next_time = attq_next_time(engine->attq); 1617 if (!next_time) 1618 return 0; 1619 1620 now = lsquic_time_now(); 1621 *diff = (int) ((int64_t) *next_time - (int64_t) now); 1622 return 1; 1623} 1624 1625 1626unsigned 1627lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now) 1628{ 1629 lsquic_time_t now; 1630 now = lsquic_time_now(); 1631 if (from_now < 0) 1632 now -= from_now; 1633 else 1634 now += from_now; 1635 return attq_count_before(engine->attq, now); 1636} 1637 1638 1639