lsquic_engine.c revision 1b97e4af
1/* Copyright (c) 2017 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_engine.c - QUIC engine 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <stdint.h> 10#include <stdio.h> 11#include <stdlib.h> 12#include <string.h> 13#include <sys/queue.h> 14#include <sys/time.h> 15#include <time.h> 16#include <netinet/in.h> 17#include <sys/types.h> 18#include <sys/stat.h> 19#include <fcntl.h> 20#include <unistd.h> 21#include <netdb.h> 22 23 24 25#include "lsquic.h" 26#include "lsquic_types.h" 27#include "lsquic_alarmset.h" 28#include "lsquic_parse.h" 29#include "lsquic_packet_in.h" 30#include "lsquic_packet_out.h" 31#include "lsquic_senhist.h" 32#include "lsquic_rtt.h" 33#include "lsquic_cubic.h" 34#include "lsquic_pacer.h" 35#include "lsquic_send_ctl.h" 36#include "lsquic_set.h" 37#include "lsquic_conn_flow.h" 38#include "lsquic_sfcw.h" 39#include "lsquic_stream.h" 40#include "lsquic_conn.h" 41#include "lsquic_full_conn.h" 42#include "lsquic_util.h" 43#include "lsquic_qtags.h" 44#include "lsquic_handshake.h" 45#include "lsquic_mm.h" 46#include "lsquic_conn_hash.h" 47#include "lsquic_engine_public.h" 48#include "lsquic_eng_hist.h" 49#include "lsquic_ev_log.h" 50#include "lsquic_version.h" 51#include "lsquic_attq.h" 52 53#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE 54#include "lsquic_logger.h" 55 56 57/* The batch of outgoing packets grows and shrinks dynamically */ 58#define MAX_OUT_BATCH_SIZE 1024 59#define MIN_OUT_BATCH_SIZE 256 60#define INITIAL_OUT_BATCH_SIZE 512 61 62typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *); 63 64static void 65process_connections (struct lsquic_engine *engine, conn_iter_f iter); 66 67static void 68engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag); 69 70static lsquic_conn_t * 71engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 72 enum lsquic_conn_flags flag); 73 74static void 75force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn); 76 77/* Nested calls to LSQUIC are not supported */ 78#define ENGINE_IN(e) do { \ 79 assert(!((e)->pub.enp_flags & ENPUB_PROC)); \ 80 (e)->pub.enp_flags |= ENPUB_PROC; \ 81} while (0) 82 83#define ENGINE_OUT(e) do { \ 84 assert((e)->pub.enp_flags & ENPUB_PROC); \ 85 (e)->pub.enp_flags &= ~ENPUB_PROC; \ 86} while (0) 87 88/* A connection can be referenced from one of six places: 89 * 90 * 1. Connection hash: a connection starts its life in one of those. 91 * 92 * 2. Outgoing queue. 93 * 94 * 3. Incoming queue. 95 * 96 * 4. Pending RW Events queue. 97 * 98 * 5. Advisory Tick Time queue. 99 * 100 * 6. Closing connections queue. This is a transient queue -- it only 101 * exists for the duration of process_connections() function call. 102 * 103 * The idea is to destroy the connection when it is no longer referenced. 104 * For example, a connection tick may return TICK_SEND|TICK_CLOSE. In 105 * that case, the connection is referenced from two places: (2) and (6). 106 * After its packets are sent, it is only referenced in (6), and at the 107 * end of the function call, when it is removed from (6), reference count 108 * goes to zero and the connection is destroyed. If not all packets can 109 * be sent, at the end of the function call, the connection is referenced 110 * by (2) and will only be removed once all outgoing packets have been 111 * sent. 112 */ 113#define CONN_REF_FLAGS (LSCONN_HASHED \ 114 |LSCONN_HAS_OUTGOING \ 115 |LSCONN_HAS_INCOMING \ 116 |LSCONN_RW_PENDING \ 117 |LSCONN_CLOSING \ 118 |LSCONN_ATTQ) 119 120 121struct out_heap_elem 122{ 123 struct lsquic_conn *ohe_conn; 124 lsquic_time_t ohe_last_sent; 125}; 126 127 128struct out_heap 129{ 130 struct out_heap_elem *oh_elems; 131 unsigned oh_nalloc, 132 oh_nelem; 133}; 134 135 136struct lsquic_engine 137{ 138 struct lsquic_engine_public pub; 139 enum { 140 ENG_SERVER = LSENG_SERVER, 141 ENG_HTTP = LSENG_HTTP, 142 ENG_COOLDOWN = (1 << 7), /* Cooldown: no new connections */ 143 ENG_PAST_DEADLINE 144 = (1 << 8), /* Previous call to a processing 145 * function went past time threshold. 146 */ 147#ifndef NDEBUG 148 ENG_DTOR = (1 << 26), /* Engine destructor */ 149#endif 150 } flags; 151 const struct lsquic_stream_if *stream_if; 152 void *stream_if_ctx; 153 lsquic_packets_out_f packets_out; 154 void *packets_out_ctx; 155 void *bad_handshake_ctx; 156 struct conn_hash full_conns; 157 TAILQ_HEAD(, lsquic_conn) conns_in, conns_pend_rw; 158 struct out_heap conns_out; 159 /* Use a union because only one iterator is being used at any one time */ 160 union { 161 struct { 162 /* This iterator does not have any state: it uses `conns_in' */ 163 } conn_in; 164 struct { 165 /* This iterator does not have any state: it uses `conns_pend_rw' */ 166 } rw_pend; 167 struct { 168 /* Iterator state to process connections in Advisory Tick Time 169 * queue. 170 */ 171 lsquic_time_t cutoff; 172 } attq; 173 struct { 174 /* Iterator state to process all connections */ 175 } all; 176 struct { 177 lsquic_conn_t *conn; 178 } one; 179 } iter_state; 180 struct eng_hist history; 181 unsigned batch_size; 182 unsigned time_until_desired_tick; 183 struct attq *attq; 184 lsquic_time_t proc_time; 185 /* Track time last time a packet was sent to give new connections 186 * priority lower than that of existing connections. 187 */ 188 lsquic_time_t last_sent; 189 lsquic_time_t deadline; 190}; 191 192 193#define OHE_PARENT(i) ((i - 1) / 2) 194#define OHE_LCHILD(i) (2 * i + 1) 195#define OHE_RCHILD(i) (2 * i + 2) 196 197 198static void 199heapify_out_heap (struct out_heap *heap, unsigned i) 200{ 201 struct out_heap_elem el; 202 unsigned smallest; 203 204 assert(i < heap->oh_nelem); 205 206 if (OHE_LCHILD(i) < heap->oh_nelem) 207 { 208 if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent < 209 heap->oh_elems[ i ].ohe_last_sent) 210 smallest = OHE_LCHILD(i); 211 else 212 smallest = i; 213 if (OHE_RCHILD(i) < heap->oh_nelem && 214 heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent < 215 heap->oh_elems[ smallest ].ohe_last_sent) 216 smallest = OHE_RCHILD(i); 217 } 218 else 219 smallest = i; 220 221 if (smallest != i) 222 { 223 el = heap->oh_elems[ smallest ]; 224 heap->oh_elems[ smallest ] = heap->oh_elems[ i ]; 225 heap->oh_elems[ i ] = el; 226 heapify_out_heap(heap, smallest); 227 } 228} 229 230 231static void 232oh_insert (struct out_heap *heap, lsquic_conn_t *conn) 233{ 234 struct out_heap_elem el; 235 unsigned nalloc, i; 236 237 if (heap->oh_nelem == heap->oh_nalloc) 238 { 239 if (0 == heap->oh_nalloc) 240 nalloc = 4; 241 else 242 nalloc = heap->oh_nalloc * 2; 243 heap->oh_elems = realloc(heap->oh_elems, 244 nalloc * sizeof(heap->oh_elems[0])); 245 if (!heap->oh_elems) 246 { /* Not much we can do here */ 247 LSQ_ERROR("realloc failed"); 248 return; 249 } 250 heap->oh_nalloc = nalloc; 251 } 252 253 heap->oh_elems[ heap->oh_nelem ].ohe_conn = conn; 254 heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent; 255 ++heap->oh_nelem; 256 257 i = heap->oh_nelem - 1; 258 while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent > 259 heap->oh_elems[ i ].ohe_last_sent) 260 { 261 el = heap->oh_elems[ OHE_PARENT(i) ]; 262 heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ]; 263 heap->oh_elems[ i ] = el; 264 i = OHE_PARENT(i); 265 } 266} 267 268 269static struct lsquic_conn * 270oh_pop (struct out_heap *heap) 271{ 272 struct lsquic_conn *conn; 273 274 assert(heap->oh_nelem); 275 276 conn = heap->oh_elems[0].ohe_conn; 277 --heap->oh_nelem; 278 if (heap->oh_nelem > 0) 279 { 280 heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ]; 281 heapify_out_heap(heap, 0); 282 } 283 284 return conn; 285} 286 287 288void 289lsquic_engine_init_settings (struct lsquic_engine_settings *settings, 290 unsigned flags) 291{ 292 memset(settings, 0, sizeof(*settings)); 293 settings->es_versions = LSQUIC_DF_VERSIONS; 294 if (flags & ENG_SERVER) 295 { 296 settings->es_cfcw = LSQUIC_DF_CFCW_SERVER; 297 settings->es_sfcw = LSQUIC_DF_SFCW_SERVER; 298 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER; 299 } 300 else 301 { 302 settings->es_cfcw = LSQUIC_DF_CFCW_CLIENT; 303 settings->es_sfcw = LSQUIC_DF_SFCW_CLIENT; 304 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT; 305 } 306 settings->es_max_streams_in = LSQUIC_DF_MAX_STREAMS_IN; 307 settings->es_idle_conn_to = LSQUIC_DF_IDLE_CONN_TO; 308 settings->es_handshake_to = LSQUIC_DF_HANDSHAKE_TO; 309 settings->es_silent_close = LSQUIC_DF_SILENT_CLOSE; 310 settings->es_max_header_list_size 311 = LSQUIC_DF_MAX_HEADER_LIST_SIZE; 312 settings->es_ua = LSQUIC_DF_UA; 313 314 settings->es_pdmd = QTAG_X509; 315 settings->es_aead = QTAG_AESG; 316 settings->es_kexs = QTAG_C255; 317 settings->es_support_push = LSQUIC_DF_SUPPORT_PUSH; 318 settings->es_support_tcid0 = LSQUIC_DF_SUPPORT_TCID0; 319 settings->es_support_nstp = LSQUIC_DF_SUPPORT_NSTP; 320 settings->es_honor_prst = LSQUIC_DF_HONOR_PRST; 321 settings->es_progress_check = LSQUIC_DF_PROGRESS_CHECK; 322 settings->es_pendrw_check = LSQUIC_DF_PENDRW_CHECK; 323 settings->es_rw_once = LSQUIC_DF_RW_ONCE; 324 settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH; 325 settings->es_pace_packets = LSQUIC_DF_PACE_PACKETS; 326} 327 328 329/* Note: if returning an error, err_buf must be valid if non-NULL */ 330int 331lsquic_engine_check_settings (const struct lsquic_engine_settings *settings, 332 unsigned flags, 333 char *err_buf, size_t err_buf_sz) 334{ 335 if (settings->es_cfcw < LSQUIC_MIN_FCW || 336 settings->es_sfcw < LSQUIC_MIN_FCW) 337 { 338 if (err_buf) 339 snprintf(err_buf, err_buf_sz, "%s", 340 "flow control window set too low"); 341 return -1; 342 } 343 if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS)) 344 { 345 if (err_buf) 346 snprintf(err_buf, err_buf_sz, "%s", 347 "No supported QUIC versions specified"); 348 return -1; 349 } 350 if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS) 351 { 352 if (err_buf) 353 snprintf(err_buf, err_buf_sz, "%s", 354 "one or more unsupported QUIC version is specified"); 355 return -1; 356 } 357 return 0; 358} 359 360 361static void 362free_packet (void *ctx, unsigned char *packet_data) 363{ 364 free(packet_data); 365} 366 367 368static void * 369malloc_buf (void *ctx, size_t size) 370{ 371 return malloc(size); 372} 373 374 375static const struct lsquic_packout_mem_if stock_pmi = 376{ 377 malloc_buf, (void(*)(void *, void *)) free_packet, 378}; 379 380 381lsquic_engine_t * 382lsquic_engine_new (unsigned flags, 383 const struct lsquic_engine_api *api) 384{ 385 lsquic_engine_t *engine; 386 int tag_buf_len; 387 char err_buf[100]; 388 389 if (!api->ea_packets_out) 390 { 391 LSQ_ERROR("packets_out callback is not specified"); 392 return NULL; 393 } 394 395 if (api->ea_settings && 396 0 != lsquic_engine_check_settings(api->ea_settings, flags, 397 err_buf, sizeof(err_buf))) 398 { 399 LSQ_ERROR("cannot create engine: %s", err_buf); 400 return NULL; 401 } 402 403 engine = calloc(1, sizeof(*engine)); 404 if (!engine) 405 return NULL; 406 if (0 != lsquic_mm_init(&engine->pub.enp_mm)) 407 { 408 free(engine); 409 return NULL; 410 } 411 if (api->ea_settings) 412 engine->pub.enp_settings = *api->ea_settings; 413 else 414 lsquic_engine_init_settings(&engine->pub.enp_settings, flags); 415 tag_buf_len = gen_ver_tags(engine->pub.enp_ver_tags_buf, 416 sizeof(engine->pub.enp_ver_tags_buf), 417 engine->pub.enp_settings.es_versions); 418 if (tag_buf_len <= 0) 419 { 420 LSQ_ERROR("cannot generate version tags buffer"); 421 free(engine); 422 return NULL; 423 } 424 engine->pub.enp_ver_tags_len = tag_buf_len; 425 426 engine->flags = flags; 427 engine->stream_if = api->ea_stream_if; 428 engine->stream_if_ctx = api->ea_stream_if_ctx; 429 engine->packets_out = api->ea_packets_out; 430 engine->packets_out_ctx = api->ea_packets_out_ctx; 431 if (api->ea_pmi) 432 { 433 engine->pub.enp_pmi = api->ea_pmi; 434 engine->pub.enp_pmi_ctx = api->ea_pmi_ctx; 435 } 436 else 437 { 438 engine->pub.enp_pmi = &stock_pmi; 439 engine->pub.enp_pmi_ctx = NULL; 440 } 441 engine->pub.enp_engine = engine; 442 TAILQ_INIT(&engine->conns_in); 443 TAILQ_INIT(&engine->conns_pend_rw); 444 conn_hash_init(&engine->full_conns, ~0); 445 engine->attq = attq_create(); 446 eng_hist_init(&engine->history); 447 engine->batch_size = INITIAL_OUT_BATCH_SIZE; 448 449 450 LSQ_INFO("instantiated engine"); 451 return engine; 452} 453 454 455static void 456grow_batch_size (struct lsquic_engine *engine) 457{ 458 engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE; 459} 460 461 462static void 463shrink_batch_size (struct lsquic_engine *engine) 464{ 465 engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE; 466} 467 468 469/* Wrapper to make sure LSCONN_NEVER_PEND_RW gets set */ 470static void 471destroy_conn (lsquic_conn_t *conn) 472{ 473 conn->cn_flags |= LSCONN_NEVER_PEND_RW; 474 conn->cn_if->ci_destroy(conn); 475} 476 477 478static lsquic_conn_t * 479new_full_conn_client (lsquic_engine_t *engine, const char *hostname, 480 unsigned short max_packet_size) 481{ 482 lsquic_conn_t *conn; 483 unsigned flags; 484 flags = engine->flags & (ENG_SERVER|ENG_HTTP); 485 conn = full_conn_client_new(&engine->pub, engine->stream_if, 486 engine->stream_if_ctx, flags, hostname, max_packet_size); 487 if (!conn) 488 return NULL; 489 if (0 != conn_hash_add_new(&engine->full_conns, conn)) 490 { 491 LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy", 492 conn->cn_cid); 493 destroy_conn(conn); 494 return NULL; 495 } 496 assert(!(conn->cn_flags & 497 (CONN_REF_FLAGS 498 & ~LSCONN_RW_PENDING /* This flag may be set as effect of user 499 callbacks */ 500 ))); 501 conn->cn_flags |= LSCONN_HASHED; 502 return conn; 503} 504 505 506static lsquic_conn_t * 507find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 508 struct packin_parse_state *ppstate, const struct sockaddr *sa_peer, 509 void *peer_ctx) 510{ 511 lsquic_conn_t *conn; 512 513 if (lsquic_packet_in_is_prst(packet_in) 514 && !engine->pub.enp_settings.es_honor_prst) 515 { 516 LSQ_DEBUG("public reset packet: discarding"); 517 return NULL; 518 } 519 520 if (!(packet_in->pi_flags & PI_CONN_ID)) 521 { 522 LSQ_DEBUG("packet header does not have connection ID: discarding"); 523 return NULL; 524 } 525 526 conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id, NULL); 527 if (conn) 528 { 529 conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate); 530 return conn; 531 } 532 533 return conn; 534} 535 536 537static void 538add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn, 539 enum rw_reason reason) 540{ 541 int hist_idx; 542 543 TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw); 544 engine_incref_conn(conn, LSCONN_RW_PENDING); 545 546 hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); 547 conn->cn_rw_hist_buf[ hist_idx ] = reason; 548 ++conn->cn_rw_hist_idx; 549 550 if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx) 551 EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), " 552 "rw_hist: %.*s", (char) reason, 553 (int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); 554 else 555 EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')", 556 (char) reason); 557} 558 559 560#if !defined(NDEBUG) && __GNUC__ 561__attribute__((weak)) 562#endif 563void 564lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub, 565 lsquic_conn_t *conn, enum rw_reason reason) 566{ 567 if (0 == (enpub->enp_flags & ENPUB_PROC) && 568 0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW))) 569 { 570 lsquic_engine_t *engine = (lsquic_engine_t *) enpub; 571 add_conn_to_pend_rw(engine, conn, reason); 572 } 573} 574 575 576void 577lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub, 578 lsquic_conn_t *conn, lsquic_time_t tick_time) 579{ 580 lsquic_engine_t *const engine = (lsquic_engine_t *) enpub; 581 /* Instead of performing an update, we simply remove the connection from 582 * the queue and add it back. This should not happen in at the time of 583 * this writing. 584 */ 585 if (conn->cn_flags & LSCONN_ATTQ) 586 { 587 attq_remove(engine->attq, conn); 588 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 589 } 590 if (conn && !(conn->cn_flags & LSCONN_ATTQ) && 591 0 == attq_maybe_add(engine->attq, conn, tick_time)) 592 engine_incref_conn(conn, LSCONN_ATTQ); 593} 594 595 596static void 597update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn, 598 int progress_made) 599{ 600 rw_hist_idx_t hist_idx; 601 const unsigned char *empty; 602 const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check; 603 604 if (!pendrw_check) 605 return; 606 607 /* Convert previous entry to uppercase: */ 608 hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1); 609 conn->cn_rw_hist_buf[ hist_idx ] -= 0x20; 610 611 LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made); 612 if (progress_made) 613 { 614 conn->cn_noprogress_count = 0; 615 return; 616 } 617 618 EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made " 619 "no progress"); 620 ++conn->cn_noprogress_count; 621 if (conn->cn_noprogress_count <= pendrw_check) 622 return; 623 624 conn->cn_flags |= LSCONN_NEVER_PEND_RW; 625 empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY, 626 sizeof(conn->cn_rw_hist_buf)); 627 if (empty) 628 LSQ_WARN("conn %"PRIu64" noprogress count reached %u " 629 "(rw_hist: %.*s): will not put it onto Pend RW queue again", 630 conn->cn_cid, conn->cn_noprogress_count, 631 (int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); 632 else 633 { 634 hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); 635 LSQ_WARN("conn %"PRIu64" noprogress count reached %u " 636 "(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again", 637 conn->cn_cid, conn->cn_noprogress_count, 638 /* First part of history: */ 639 (int) (sizeof(conn->cn_rw_hist_buf) - hist_idx), 640 conn->cn_rw_hist_buf + hist_idx, 641 /* Second part of history: */ 642 hist_idx, conn->cn_rw_hist_buf); 643 } 644} 645 646 647/* Return 0 if packet is being processed by a connections, otherwise return 1 */ 648static int 649process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 650 struct packin_parse_state *ppstate, const struct sockaddr *sa_local, 651 const struct sockaddr *sa_peer, void *peer_ctx) 652{ 653 lsquic_conn_t *conn; 654 655 conn = find_or_create_conn(engine, packet_in, ppstate, sa_peer, peer_ctx); 656 if (!conn) 657 { 658 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 659 return 1; 660 } 661 662 if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) { 663 TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in); 664 engine_incref_conn(conn, LSCONN_HAS_INCOMING); 665 } 666 lsquic_conn_record_sockaddr(conn, sa_local, sa_peer); 667 lsquic_packet_in_upref(packet_in); 668 conn->cn_peer_ctx = peer_ctx; 669 conn->cn_if->ci_packet_in(conn, packet_in); 670 lsquic_packet_in_put(&engine->pub.enp_mm, packet_in); 671 return 0; 672} 673 674 675static int 676conn_attq_expired (const struct lsquic_engine *engine, 677 const lsquic_conn_t *conn) 678{ 679 assert(conn->cn_attq_elem); 680 return lsquic_conn_adv_time(conn) < engine->proc_time; 681} 682 683 684/* Iterator for connections with incoming packets */ 685static lsquic_conn_t * 686conn_iter_next_incoming (struct lsquic_engine *engine) 687{ 688 enum lsquic_conn_flags addl_flags; 689 lsquic_conn_t *conn; 690 while ((conn = TAILQ_FIRST(&engine->conns_in))) 691 { 692 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 693 if (conn->cn_flags & LSCONN_RW_PENDING) 694 { 695 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 696 EV_LOG_CONN_EVENT(conn->cn_cid, 697 "removed from pending RW queue (processing incoming)"); 698 } 699 if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) 700 { 701 addl_flags = LSCONN_ATTQ; 702 attq_remove(engine->attq, conn); 703 } 704 else 705 addl_flags = 0; 706 conn = engine_decref_conn(engine, conn, 707 LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); 708 if (conn) 709 break; 710 } 711 return conn; 712} 713 714 715/* Iterator for connections with that have pending read/write events */ 716static lsquic_conn_t * 717conn_iter_next_rw_pend (struct lsquic_engine *engine) 718{ 719 enum lsquic_conn_flags addl_flags; 720 lsquic_conn_t *conn; 721 while ((conn = TAILQ_FIRST(&engine->conns_pend_rw))) 722 { 723 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 724 EV_LOG_CONN_EVENT(conn->cn_cid, 725 "removed from pending RW queue (processing pending RW conns)"); 726 if (conn->cn_flags & LSCONN_HAS_INCOMING) 727 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 728 if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) 729 { 730 addl_flags = LSCONN_ATTQ; 731 attq_remove(engine->attq, conn); 732 } 733 else 734 addl_flags = 0; 735 conn = engine_decref_conn(engine, conn, 736 LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); 737 if (conn) 738 break; 739 } 740 return conn; 741} 742 743 744void 745lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine) 746{ 747 LSQ_DEBUG("process connections with incoming packets"); 748 ENGINE_IN(engine); 749 process_connections(engine, conn_iter_next_incoming); 750 assert(TAILQ_EMPTY(&engine->conns_in)); 751 ENGINE_OUT(engine); 752} 753 754 755int 756lsquic_engine_has_pend_rw (lsquic_engine_t *engine) 757{ 758 return !(engine->flags & ENG_PAST_DEADLINE) 759 && !TAILQ_EMPTY(&engine->conns_pend_rw); 760} 761 762 763void 764lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine) 765{ 766 LSQ_DEBUG("process connections with pending RW events"); 767 ENGINE_IN(engine); 768 process_connections(engine, conn_iter_next_rw_pend); 769 ENGINE_OUT(engine); 770} 771 772 773void 774lsquic_engine_destroy (lsquic_engine_t *engine) 775{ 776 lsquic_conn_t *conn; 777 778 LSQ_DEBUG("destroying engine"); 779#ifndef NDEBUG 780 engine->flags |= ENG_DTOR; 781#endif 782 783 while (engine->conns_out.oh_nelem > 0) 784 { 785 --engine->conns_out.oh_nelem; 786 conn = engine->conns_out.oh_elems[ 787 engine->conns_out.oh_nelem ].ohe_conn; 788 assert(conn->cn_flags & LSCONN_HAS_OUTGOING); 789 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 790 } 791 792 for (conn = conn_hash_first(&engine->full_conns); conn; 793 conn = conn_hash_next(&engine->full_conns)) 794 force_close_conn(engine, conn); 795 conn_hash_cleanup(&engine->full_conns); 796 797 798 attq_destroy(engine->attq); 799 800 assert(0 == engine->conns_out.oh_nelem); 801 assert(TAILQ_EMPTY(&engine->conns_pend_rw)); 802 lsquic_mm_cleanup(&engine->pub.enp_mm); 803 free(engine->conns_out.oh_elems); 804 free(engine); 805} 806 807 808#if __GNUC__ 809__attribute__((nonnull(3))) 810#endif 811static lsquic_conn_t * 812remove_from_inc_andor_pend_rw (lsquic_engine_t *engine, 813 lsquic_conn_t *conn, const char *reason) 814{ 815 assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)); 816 if (conn->cn_flags & LSCONN_HAS_INCOMING) 817 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 818 if (conn->cn_flags & LSCONN_RW_PENDING) 819 { 820 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 821 EV_LOG_CONN_EVENT(conn->cn_cid, 822 "removed from pending RW queue (%s)", reason); 823 } 824 conn = engine_decref_conn(engine, conn, 825 LSCONN_HAS_INCOMING|LSCONN_RW_PENDING); 826 assert(conn); 827 return conn; 828} 829 830 831static lsquic_conn_t * 832conn_iter_next_one (lsquic_engine_t *engine) 833{ 834 lsquic_conn_t *conn = engine->iter_state.one.conn; 835 if (conn) 836 { 837 if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) 838 conn = remove_from_inc_andor_pend_rw(engine, conn, "connect"); 839 if (conn && (conn->cn_flags & LSCONN_ATTQ) && 840 conn_attq_expired(engine, conn)) 841 { 842 attq_remove(engine->attq, conn); 843 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 844 } 845 engine->iter_state.one.conn = NULL; 846 } 847 return conn; 848} 849 850 851int 852lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa, 853 void *conn_ctx, const char *hostname, 854 unsigned short max_packet_size) 855{ 856 lsquic_conn_t *conn; 857 858 if (engine->flags & ENG_SERVER) 859 { 860 LSQ_ERROR("`%s' must only be called in client mode", __func__); 861 return -1; 862 } 863 864 if (0 == max_packet_size) 865 { 866 switch (peer_sa->sa_family) 867 { 868 case AF_INET: 869 max_packet_size = QUIC_MAX_IPv4_PACKET_SZ; 870 break; 871 default: 872 max_packet_size = QUIC_MAX_IPv6_PACKET_SZ; 873 break; 874 } 875 } 876 877 conn = new_full_conn_client(engine, hostname, max_packet_size); 878 if (!conn) 879 return -1; 880 lsquic_conn_record_peer_sa(conn, peer_sa); 881 conn->cn_peer_ctx = conn_ctx; 882 engine->iter_state.one.conn = conn; 883 process_connections(engine, conn_iter_next_one); 884 return 0; 885} 886 887 888static void 889remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn) 890{ 891 conn_hash_remove(&engine->full_conns, conn); 892 (void) engine_decref_conn(engine, conn, LSCONN_HASHED); 893} 894 895 896static void 897refflags2str (enum lsquic_conn_flags flags, char s[7]) 898{ 899 *s = 'C'; s += !!(flags & LSCONN_CLOSING); 900 *s = 'H'; s += !!(flags & LSCONN_HASHED); 901 *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING); 902 *s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING); 903 *s = 'R'; s += !!(flags & LSCONN_RW_PENDING); 904 *s = 'A'; s += !!(flags & LSCONN_ATTQ); 905 *s = '\0'; 906} 907 908 909static void 910engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag) 911{ 912 char str[7]; 913 assert(flag & CONN_REF_FLAGS); 914 assert(!(conn->cn_flags & flag)); 915 conn->cn_flags |= flag; 916 LSQ_DEBUG("incref conn %"PRIu64", now '%s'", conn->cn_cid, 917 (refflags2str(conn->cn_flags, str), str)); 918} 919 920 921static lsquic_conn_t * 922engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 923 enum lsquic_conn_flags flags) 924{ 925 char str[7]; 926 assert(flags & CONN_REF_FLAGS); 927 assert(conn->cn_flags & flags); 928#ifndef NDEBUG 929 if (flags & LSCONN_CLOSING) 930 assert(0 == (conn->cn_flags & LSCONN_HASHED)); 931#endif 932 conn->cn_flags &= ~flags; 933 LSQ_DEBUG("decref conn %"PRIu64", now '%s'", conn->cn_cid, 934 (refflags2str(conn->cn_flags, str), str)); 935 if (0 == (conn->cn_flags & CONN_REF_FLAGS)) 936 { 937 eng_hist_inc(&engine->history, 0, sl_del_full_conns); 938 destroy_conn(conn); 939 return NULL; 940 } 941 else 942 return conn; 943} 944 945 946/* This is not a general-purpose function. Only call from engine dtor. */ 947static void 948force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn) 949{ 950 assert(engine->flags & ENG_DTOR); 951 const enum lsquic_conn_flags flags = conn->cn_flags; 952 assert(conn->cn_flags & CONN_REF_FLAGS); 953 assert(!(flags & LSCONN_HAS_OUTGOING)); /* Should be removed already */ 954 assert(!(flags & LSCONN_CLOSING)); /* It is in transient queue? */ 955 if (flags & LSCONN_HAS_INCOMING) 956 { 957 TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); 958 (void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING); 959 } 960 if (flags & LSCONN_RW_PENDING) 961 { 962 TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); 963 EV_LOG_CONN_EVENT(conn->cn_cid, 964 "removed from pending RW queue (engine destruction)"); 965 (void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING); 966 } 967 if (flags & LSCONN_ATTQ) 968 attq_remove(engine->attq, conn); 969 if (flags & LSCONN_HASHED) 970 remove_conn_from_hash(engine, conn); 971} 972 973 974/* Iterator for all connections. 975 * Returned connections are removed from the Incoming, Pending RW Event, 976 * and Advisory Tick Time queues if necessary. 977 */ 978static lsquic_conn_t * 979conn_iter_next_all (struct lsquic_engine *engine) 980{ 981 lsquic_conn_t *conn; 982 983 conn = conn_hash_next(&engine->full_conns); 984 985 if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))) 986 conn = remove_from_inc_andor_pend_rw(engine, conn, "process all"); 987 if (conn && (conn->cn_flags & LSCONN_ATTQ) 988 && conn_attq_expired(engine, conn)) 989 { 990 attq_remove(engine->attq, conn); 991 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 992 } 993 994 return conn; 995} 996 997 998static lsquic_conn_t * 999conn_iter_next_attq (struct lsquic_engine *engine) 1000{ 1001 lsquic_conn_t *conn; 1002 1003 conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff); 1004 if (conn) 1005 { 1006 assert(conn->cn_flags & LSCONN_ATTQ); 1007 if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) 1008 conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq"); 1009 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 1010 } 1011 1012 return conn; 1013} 1014 1015 1016void 1017lsquic_engine_proc_all (lsquic_engine_t *engine) 1018{ 1019 ENGINE_IN(engine); 1020 /* We poke each connection every time as initial implementation. If it 1021 * proves to be too inefficient, we will need to figure out 1022 * a) when to stop processing; and 1023 * b) how to remember state between calls. 1024 */ 1025 conn_hash_reset_iter(&engine->full_conns); 1026 process_connections(engine, conn_iter_next_all); 1027 ENGINE_OUT(engine); 1028} 1029 1030 1031void 1032lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine) 1033{ 1034 lsquic_time_t prev_min, cutoff; 1035 1036 LSQ_DEBUG("process connections in attq"); 1037 ENGINE_IN(engine); 1038 cutoff = lsquic_time_now(); 1039 prev_min = attq_set_min(engine->attq, cutoff); /* Prevent infinite loop */ 1040 engine->iter_state.attq.cutoff = cutoff; 1041 process_connections(engine, conn_iter_next_attq); 1042 attq_set_min(engine->attq, prev_min); /* Restore previos value */ 1043 ENGINE_OUT(engine); 1044} 1045 1046 1047static int 1048generate_header (const lsquic_packet_out_t *packet_out, 1049 const struct parse_funcs *pf, lsquic_cid_t cid, 1050 unsigned char *buf, size_t bufsz) 1051{ 1052 return pf->pf_gen_reg_pkt_header(buf, bufsz, 1053 packet_out->po_flags & PO_CONN_ID ? &cid : NULL, 1054 packet_out->po_flags & PO_VERSION ? &packet_out->po_ver_tag : NULL, 1055 packet_out->po_flags & PO_NONCE ? packet_out->po_nonce : NULL, 1056 packet_out->po_packno, lsquic_packet_out_packno_bits(packet_out)); 1057} 1058 1059 1060static ssize_t 1061really_encrypt_packet (const lsquic_conn_t *conn, 1062 const lsquic_packet_out_t *packet_out, 1063 unsigned char *buf, size_t bufsz) 1064{ 1065 int enc, header_sz, is_hello_packet; 1066 size_t packet_sz; 1067 unsigned char header_buf[QUIC_MAX_PUBHDR_SZ]; 1068 1069 header_sz = generate_header(packet_out, conn->cn_pf, conn->cn_cid, 1070 header_buf, sizeof(header_buf)); 1071 if (header_sz < 0) 1072 return -1; 1073 1074 is_hello_packet = !!(packet_out->po_flags & PO_HELLO); 1075 enc = lsquic_enc(conn->cn_enc_session, conn->cn_version, 0, 1076 packet_out->po_packno, header_buf, header_sz, 1077 packet_out->po_data, packet_out->po_data_sz, 1078 buf, bufsz, &packet_sz, is_hello_packet); 1079 if (0 == enc) 1080 { 1081 LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %u bytes, " 1082 "ciphertext is %zd bytes", 1083 packet_out->po_packno, 1084 lsquic_po_header_length(packet_out->po_flags) + 1085 packet_out->po_data_sz, 1086 packet_sz); 1087 return packet_sz; 1088 } 1089 else 1090 return -1; 1091} 1092 1093 1094static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, } 1095encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn, 1096 lsquic_packet_out_t *packet_out) 1097{ 1098 ssize_t enc_sz; 1099 size_t bufsz; 1100 unsigned char *buf; 1101 1102 bufsz = lsquic_po_header_length(packet_out->po_flags) + 1103 packet_out->po_data_sz + QUIC_PACKET_HASH_SZ; 1104 buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz); 1105 if (!buf) 1106 { 1107 LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd", 1108 bufsz); 1109 return ENCPA_NOMEM; 1110 } 1111 1112 enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz); 1113 1114 if (enc_sz < 0) 1115 { 1116 engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf); 1117 return ENCPA_BADCRYPT; 1118 } 1119 1120 packet_out->po_enc_data = buf; 1121 packet_out->po_enc_data_sz = enc_sz; 1122 packet_out->po_flags |= PO_ENCRYPTED; 1123 1124 return ENCPA_OK; 1125} 1126 1127 1128struct out_batch 1129{ 1130 lsquic_conn_t *conns [MAX_OUT_BATCH_SIZE]; 1131 lsquic_packet_out_t *packets[MAX_OUT_BATCH_SIZE]; 1132 struct lsquic_out_spec outs [MAX_OUT_BATCH_SIZE]; 1133}; 1134 1135 1136STAILQ_HEAD(closed_conns, lsquic_conn); 1137 1138 1139struct conns_out_iter 1140{ 1141 struct out_heap *coi_heap; 1142 TAILQ_HEAD(, lsquic_conn) coi_active_list, 1143 coi_inactive_list; 1144 lsquic_conn_t *coi_next; 1145#ifndef NDEBUG 1146 lsquic_time_t coi_last_sent; 1147#endif 1148}; 1149 1150 1151static void 1152coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine) 1153{ 1154 iter->coi_heap = &engine->conns_out; 1155 iter->coi_next = NULL; 1156 TAILQ_INIT(&iter->coi_active_list); 1157 TAILQ_INIT(&iter->coi_inactive_list); 1158#ifndef NDEBUG 1159 iter->coi_last_sent = 0; 1160#endif 1161} 1162 1163 1164static lsquic_conn_t * 1165coi_next (struct conns_out_iter *iter) 1166{ 1167 lsquic_conn_t *conn; 1168 1169 if (iter->coi_heap->oh_nelem > 0) 1170 { 1171 conn = oh_pop(iter->coi_heap); 1172 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1173 conn->cn_flags |= LSCONN_COI_ACTIVE; 1174#ifndef NDEBUG 1175 if (iter->coi_last_sent) 1176 assert(iter->coi_last_sent <= conn->cn_last_sent); 1177 iter->coi_last_sent = conn->cn_last_sent; 1178#endif 1179 return conn; 1180 } 1181 else if (!TAILQ_EMPTY(&iter->coi_active_list)) 1182 { 1183 conn = iter->coi_next; 1184 if (!conn) 1185 conn = TAILQ_FIRST(&iter->coi_active_list); 1186 if (conn) 1187 iter->coi_next = TAILQ_NEXT(conn, cn_next_out); 1188 return conn; 1189 } 1190 else 1191 return NULL; 1192} 1193 1194 1195static void 1196coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1197{ 1198 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1199 { 1200 assert(!TAILQ_EMPTY(&iter->coi_active_list)); 1201 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1202 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1203 TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out); 1204 conn->cn_flags |= LSCONN_COI_INACTIVE; 1205 } 1206} 1207 1208 1209static void 1210coi_remove (struct conns_out_iter *iter, lsquic_conn_t *conn) 1211{ 1212 assert(conn->cn_flags & LSCONN_COI_ACTIVE); 1213 if (conn->cn_flags & LSCONN_COI_ACTIVE) 1214 { 1215 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1216 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1217 } 1218} 1219 1220 1221static void 1222coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1223{ 1224 assert(conn->cn_flags & LSCONN_COI_INACTIVE); 1225 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1226 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1227 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1228 conn->cn_flags |= LSCONN_COI_ACTIVE; 1229} 1230 1231 1232static void 1233coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine) 1234{ 1235 lsquic_conn_t *conn; 1236 while ((conn = TAILQ_FIRST(&iter->coi_active_list))) 1237 { 1238 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1239 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1240 oh_insert(iter->coi_heap, conn); 1241 } 1242 while ((conn = TAILQ_FIRST(&iter->coi_inactive_list))) 1243 { 1244 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1245 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1246 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 1247 } 1248} 1249 1250 1251static unsigned 1252send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter, 1253 struct out_batch *batch, unsigned n_to_send) 1254{ 1255 int n_sent, i; 1256 lsquic_time_t now; 1257 1258 /* Set sent time before the write to avoid underestimating RTT */ 1259 now = lsquic_time_now(); 1260 for (i = 0; i < (int) n_to_send; ++i) 1261 batch->packets[i]->po_sent = now; 1262 n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs, 1263 n_to_send); 1264 if (n_sent >= 0) 1265 LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send); 1266 else 1267 { 1268 LSQ_DEBUG("packets out returned an error: %s", strerror(errno)); 1269 n_sent = 0; 1270 } 1271 if (n_sent > 0) 1272 engine->last_sent = now + n_sent; 1273 for (i = 0; i < n_sent; ++i) 1274 { 1275 eng_hist_inc(&engine->history, now, sl_packets_out); 1276 EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1277 batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i], 1278 batch->packets[i]); 1279 /* `i' is added to maintain relative order */ 1280 batch->conns[i]->cn_last_sent = now + i; 1281 /* Release packet out buffer as soon as the packet is sent 1282 * successfully. If not successfully sent, we hold on to 1283 * this buffer until the packet sending is attempted again 1284 * or until it times out and regenerated. 1285 */ 1286 if (batch->packets[i]->po_flags & PO_ENCRYPTED) 1287 { 1288 batch->packets[i]->po_flags &= ~PO_ENCRYPTED; 1289 engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, 1290 batch->packets[i]->po_enc_data); 1291 batch->packets[i]->po_enc_data = NULL; /* JIC */ 1292 } 1293 } 1294 if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT)) 1295 for ( ; i < (int) n_to_send; ++i) 1296 EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1297 /* Return packets to the connection in reverse order so that the packet 1298 * ordering is maintained. 1299 */ 1300 for (i = (int) n_to_send - 1; i >= n_sent; --i) 1301 { 1302 batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i], 1303 batch->packets[i]); 1304 if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT))) 1305 coi_reactivate(conns_iter, batch->conns[i]); 1306 } 1307 return n_sent; 1308} 1309 1310 1311/* Return 1 if went past deadline, 0 otherwise */ 1312static int 1313check_deadline (lsquic_engine_t *engine) 1314{ 1315 if (engine->pub.enp_settings.es_proc_time_thresh && 1316 lsquic_time_now() > engine->deadline) 1317 { 1318 LSQ_INFO("went past threshold of %u usec, stop sending", 1319 engine->pub.enp_settings.es_proc_time_thresh); 1320 engine->flags |= ENG_PAST_DEADLINE; 1321 return 1; 1322 } 1323 else 1324 return 0; 1325} 1326 1327 1328static void 1329send_packets_out (struct lsquic_engine *engine, 1330 struct closed_conns *closed_conns) 1331{ 1332 unsigned n, w, n_sent, n_batches_sent; 1333 lsquic_packet_out_t *packet_out; 1334 lsquic_conn_t *conn; 1335 struct out_batch batch; 1336 struct conns_out_iter conns_iter; 1337 int shrink, deadline_exceeded; 1338 1339 coi_init(&conns_iter, engine); 1340 n_batches_sent = 0; 1341 n_sent = 0, n = 0; 1342 shrink = 0; 1343 deadline_exceeded = 0; 1344 1345 while ((conn = coi_next(&conns_iter))) 1346 { 1347 packet_out = conn->cn_if->ci_next_packet_to_send(conn); 1348 if (!packet_out) { 1349 LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64, 1350 conn->cn_cid); 1351 coi_deactivate(&conns_iter, conn); 1352 continue; 1353 } 1354 if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT))) 1355 { 1356 switch (encrypt_packet(engine, conn, packet_out)) 1357 { 1358 case ENCPA_NOMEM: 1359 /* Send what we have and wait for a more opportune moment */ 1360 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1361 goto end_for; 1362 case ENCPA_BADCRYPT: 1363 /* This is pretty bad: close connection immediately */ 1364 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1365 LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid); 1366 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1367 { 1368 if (!(conn->cn_flags & LSCONN_CLOSING)) 1369 { 1370 STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn); 1371 engine_incref_conn(conn, LSCONN_CLOSING); 1372 if (conn->cn_flags & LSCONN_HASHED) 1373 remove_conn_from_hash(engine, conn); 1374 } 1375 coi_remove(&conns_iter, conn); 1376 } 1377 continue; 1378 case ENCPA_OK: 1379 break; 1380 } 1381 } 1382 LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64, 1383 packet_out->po_packno, conn->cn_cid); 1384 assert(conn->cn_flags & LSCONN_HAS_PEER_SA); 1385 if (packet_out->po_flags & PO_ENCRYPTED) 1386 { 1387 batch.outs[n].buf = packet_out->po_enc_data; 1388 batch.outs[n].sz = packet_out->po_enc_data_sz; 1389 } 1390 else 1391 { 1392 batch.outs[n].buf = packet_out->po_data; 1393 batch.outs[n].sz = packet_out->po_data_sz; 1394 } 1395 batch.outs [n].peer_ctx = conn->cn_peer_ctx; 1396 batch.outs [n].local_sa = (struct sockaddr *) conn->cn_local_addr; 1397 batch.outs [n].dest_sa = (struct sockaddr *) conn->cn_peer_addr; 1398 batch.conns [n] = conn; 1399 batch.packets[n] = packet_out; 1400 ++n; 1401 if (n == engine->batch_size) 1402 { 1403 n = 0; 1404 w = send_batch(engine, &conns_iter, &batch, engine->batch_size); 1405 ++n_batches_sent; 1406 n_sent += w; 1407 if (w < engine->batch_size) 1408 { 1409 shrink = 1; 1410 break; 1411 } 1412 deadline_exceeded = check_deadline(engine); 1413 if (deadline_exceeded) 1414 break; 1415 grow_batch_size(engine); 1416 } 1417 } 1418 end_for: 1419 1420 if (n > 0) { 1421 w = send_batch(engine, &conns_iter, &batch, n); 1422 n_sent += w; 1423 shrink = w < n; 1424 ++n_batches_sent; 1425 deadline_exceeded = check_deadline(engine); 1426 } 1427 1428 if (shrink) 1429 shrink_batch_size(engine); 1430 else if (n_batches_sent > 1 && !deadline_exceeded) 1431 grow_batch_size(engine); 1432 1433 coi_reheap(&conns_iter, engine); 1434 1435 LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s"); 1436} 1437 1438 1439int 1440lsquic_engine_has_unsent_packets (lsquic_engine_t *engine) 1441{ 1442 return !(engine->flags & ENG_PAST_DEADLINE) 1443 && ( engine->conns_out.oh_nelem > 0 1444 ) 1445 ; 1446} 1447 1448 1449static void 1450reset_deadline (lsquic_engine_t *engine, lsquic_time_t now) 1451{ 1452 engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh; 1453 engine->flags &= ~ENG_PAST_DEADLINE; 1454} 1455 1456 1457/* TODO: this is a user-facing function, account for load */ 1458void 1459lsquic_engine_send_unsent_packets (lsquic_engine_t *engine) 1460{ 1461 lsquic_conn_t *conn; 1462 struct closed_conns closed_conns; 1463 1464 STAILQ_INIT(&closed_conns); 1465 reset_deadline(engine, lsquic_time_now()); 1466 1467 send_packets_out(engine, &closed_conns); 1468 1469 while ((conn = STAILQ_FIRST(&closed_conns))) { 1470 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1471 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1472 } 1473 1474} 1475 1476 1477static void 1478process_connections (lsquic_engine_t *engine, conn_iter_f next_conn) 1479{ 1480 lsquic_conn_t *conn; 1481 enum tick_st tick_st; 1482 lsquic_time_t now = lsquic_time_now(); 1483 struct closed_conns closed_conns; 1484 1485 engine->proc_time = now; 1486 eng_hist_tick(&engine->history, now); 1487 1488 STAILQ_INIT(&closed_conns); 1489 reset_deadline(engine, now); 1490 1491 while ((conn = next_conn(engine))) 1492 { 1493 tick_st = conn->cn_if->ci_tick(conn, now); 1494 if (conn_iter_next_rw_pend == next_conn) 1495 update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS); 1496 if (tick_st & TICK_SEND) 1497 { 1498 if (!(conn->cn_flags & LSCONN_HAS_OUTGOING)) 1499 { 1500 oh_insert(&engine->conns_out, conn); 1501 engine_incref_conn(conn, LSCONN_HAS_OUTGOING); 1502 } 1503 } 1504 if (tick_st & TICK_CLOSE) 1505 { 1506 STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn); 1507 engine_incref_conn(conn, LSCONN_CLOSING); 1508 if (conn->cn_flags & LSCONN_HASHED) 1509 remove_conn_from_hash(engine, conn); 1510 } 1511 } 1512 1513 if (lsquic_engine_has_unsent_packets(engine)) 1514 send_packets_out(engine, &closed_conns); 1515 1516 while ((conn = STAILQ_FIRST(&closed_conns))) { 1517 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1518 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1519 } 1520 1521} 1522 1523 1524/* Return 0 if packet is being processed by a real connection, 1 if the 1525 * packet was processed, but not by a connection, and -1 on error. 1526 */ 1527int 1528lsquic_engine_packet_in (lsquic_engine_t *engine, 1529 const unsigned char *packet_in_data, size_t packet_in_size, 1530 const struct sockaddr *sa_local, const struct sockaddr *sa_peer, 1531 void *peer_ctx) 1532{ 1533 struct packin_parse_state ppstate; 1534 lsquic_packet_in_t *packet_in; 1535 1536 if (packet_in_size > QUIC_MAX_PACKET_SZ) 1537 { 1538 LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming " 1539 "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ); 1540 errno = E2BIG; 1541 return -1; 1542 } 1543 1544 packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm); 1545 if (!packet_in) 1546 return -1; 1547 1548 /* Library does not modify packet_in_data, it is not referenced after 1549 * this function returns and subsequent release of pi_data is guarded 1550 * by PI_OWN_DATA flag. 1551 */ 1552 packet_in->pi_data = (unsigned char *) packet_in_data; 1553 if (0 != parse_packet_in_begin(packet_in, packet_in_size, 1554 engine->flags & ENG_SERVER, &ppstate)) 1555 { 1556 LSQ_DEBUG("Cannot parse incoming packet's header"); 1557 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 1558 errno = EINVAL; 1559 return -1; 1560 } 1561 1562 packet_in->pi_received = lsquic_time_now(); 1563 eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in); 1564 return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer, 1565 peer_ctx); 1566} 1567 1568 1569#if __GNUC__ && !defined(NDEBUG) 1570__attribute__((weak)) 1571#endif 1572unsigned 1573lsquic_engine_quic_versions (const lsquic_engine_t *engine) 1574{ 1575 return engine->pub.enp_settings.es_versions; 1576} 1577 1578 1579int 1580lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff) 1581{ 1582 const lsquic_time_t *next_time; 1583 lsquic_time_t now; 1584 1585 next_time = attq_next_time(engine->attq); 1586 if (!next_time) 1587 return 0; 1588 1589 now = lsquic_time_now(); 1590 *diff = (int) ((int64_t) *next_time - (int64_t) now); 1591 return 1; 1592} 1593 1594 1595unsigned 1596lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now) 1597{ 1598 lsquic_time_t now; 1599 now = lsquic_time_now(); 1600 if (from_now < 0) 1601 now -= from_now; 1602 else 1603 now += from_now; 1604 return attq_count_before(engine->attq, now); 1605} 1606 1607 1608