lsquic_engine.c revision 8ca33e0e
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_engine.c - QUIC engine 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <limits.h> 10#include <stdint.h> 11#include <stdio.h> 12#include <stdlib.h> 13#include <string.h> 14#include <sys/queue.h> 15#include <time.h> 16#ifndef WIN32 17#include <sys/time.h> 18#include <netinet/in.h> 19#include <sys/types.h> 20#include <sys/stat.h> 21#include <fcntl.h> 22#include <unistd.h> 23#include <netdb.h> 24#endif 25 26 27 28#include "lsquic.h" 29#include "lsquic_types.h" 30#include "lsquic_alarmset.h" 31#include "lsquic_parse_common.h" 32#include "lsquic_parse.h" 33#include "lsquic_packet_in.h" 34#include "lsquic_packet_out.h" 35#include "lsquic_senhist.h" 36#include "lsquic_rtt.h" 37#include "lsquic_cubic.h" 38#include "lsquic_pacer.h" 39#include "lsquic_send_ctl.h" 40#include "lsquic_set.h" 41#include "lsquic_conn_flow.h" 42#include "lsquic_sfcw.h" 43#include "lsquic_stream.h" 44#include "lsquic_conn.h" 45#include "lsquic_full_conn.h" 46#include "lsquic_util.h" 47#include "lsquic_qtags.h" 48#include "lsquic_str.h" 49#include "lsquic_handshake.h" 50#include "lsquic_mm.h" 51#include "lsquic_conn_hash.h" 52#include "lsquic_engine_public.h" 53#include "lsquic_eng_hist.h" 54#include "lsquic_ev_log.h" 55#include "lsquic_version.h" 56#include "lsquic_hash.h" 57#include "lsquic_attq.h" 58#include "lsquic_min_heap.h" 59#include "lsquic_http1x_if.h" 60 61#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE 62#include "lsquic_logger.h" 63 64#define MIN(a, b) ((a) < (b) ? (a) : (b)) 65 66 67/* The batch of outgoing packets grows and shrinks dynamically */ 68#define MAX_OUT_BATCH_SIZE 1024 69#define MIN_OUT_BATCH_SIZE 4 70#define INITIAL_OUT_BATCH_SIZE 32 71 72struct out_batch 73{ 74 lsquic_conn_t *conns [MAX_OUT_BATCH_SIZE]; 75 lsquic_packet_out_t *packets[MAX_OUT_BATCH_SIZE]; 76 struct lsquic_out_spec outs [MAX_OUT_BATCH_SIZE]; 77}; 78 79typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *); 80 81static void 82process_connections (struct lsquic_engine *engine, conn_iter_f iter, 83 lsquic_time_t now); 84 85static void 86engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag); 87 88static lsquic_conn_t * 89engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 90 enum lsquic_conn_flags flag); 91 92static void 93force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn); 94 95/* Nested calls to LSQUIC are not supported */ 96#define ENGINE_IN(e) do { \ 97 assert(!((e)->pub.enp_flags & ENPUB_PROC)); \ 98 (e)->pub.enp_flags |= ENPUB_PROC; \ 99} while (0) 100 101#define ENGINE_OUT(e) do { \ 102 assert((e)->pub.enp_flags & ENPUB_PROC); \ 103 (e)->pub.enp_flags &= ~ENPUB_PROC; \ 104} while (0) 105 106/* A connection can be referenced from one of six places: 107 * 108 * 1. Connection hash: a connection starts its life in one of those. 109 * 110 * 2. Outgoing queue. 111 * 112 * 3. Tickable queue 113 * 114 * 4. Advisory Tick Time queue. 115 * 116 * 5. Closing connections queue. This is a transient queue -- it only 117 * exists for the duration of process_connections() function call. 118 * 119 * 6. Ticked connections queue. Another transient queue, similar to (5). 120 * 121 * The idea is to destroy the connection when it is no longer referenced. 122 * For example, a connection tick may return TICK_SEND|TICK_CLOSE. In 123 * that case, the connection is referenced from two places: (2) and (5). 124 * After its packets are sent, it is only referenced in (5), and at the 125 * end of the function call, when it is removed from (5), reference count 126 * goes to zero and the connection is destroyed. If not all packets can 127 * be sent, at the end of the function call, the connection is referenced 128 * by (2) and will only be removed once all outgoing packets have been 129 * sent. 130 */ 131#define CONN_REF_FLAGS (LSCONN_HASHED \ 132 |LSCONN_HAS_OUTGOING \ 133 |LSCONN_TICKABLE \ 134 |LSCONN_TICKED \ 135 |LSCONN_CLOSING \ 136 |LSCONN_ATTQ) 137 138 139 140 141struct lsquic_engine 142{ 143 struct lsquic_engine_public pub; 144 enum { 145 ENG_SERVER = LSENG_SERVER, 146 ENG_HTTP = LSENG_HTTP, 147 ENG_COOLDOWN = (1 << 7), /* Cooldown: no new connections */ 148 ENG_PAST_DEADLINE 149 = (1 << 8), /* Previous call to a processing 150 * function went past time threshold. 151 */ 152#ifndef NDEBUG 153 ENG_DTOR = (1 << 26), /* Engine destructor */ 154#endif 155 } flags; 156 const struct lsquic_stream_if *stream_if; 157 void *stream_if_ctx; 158 lsquic_packets_out_f packets_out; 159 void *packets_out_ctx; 160 void *bad_handshake_ctx; 161 struct conn_hash conns_hash; 162 struct min_heap conns_tickable; 163 struct min_heap conns_out; 164 struct eng_hist history; 165 unsigned batch_size; 166 struct attq *attq; 167 /* Track time last time a packet was sent to give new connections 168 * priority lower than that of existing connections. 169 */ 170 lsquic_time_t last_sent; 171 unsigned n_conns; 172 lsquic_time_t deadline; 173 lsquic_time_t resume_sending_at; 174#if LSQUIC_CONN_STATS 175 struct { 176 unsigned conns; 177 } stats; 178 struct conn_stats conn_stats_sum; 179 FILE *stats_fh; 180#endif 181 struct out_batch out_batch; 182}; 183 184 185void 186lsquic_engine_init_settings (struct lsquic_engine_settings *settings, 187 unsigned flags) 188{ 189 memset(settings, 0, sizeof(*settings)); 190 settings->es_versions = LSQUIC_DF_VERSIONS; 191 if (flags & ENG_SERVER) 192 { 193 settings->es_cfcw = LSQUIC_DF_CFCW_SERVER; 194 settings->es_sfcw = LSQUIC_DF_SFCW_SERVER; 195 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER; 196 } 197 else 198 { 199 settings->es_cfcw = LSQUIC_DF_CFCW_CLIENT; 200 settings->es_sfcw = LSQUIC_DF_SFCW_CLIENT; 201 settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT; 202 } 203 settings->es_max_streams_in = LSQUIC_DF_MAX_STREAMS_IN; 204 settings->es_idle_conn_to = LSQUIC_DF_IDLE_CONN_TO; 205 settings->es_handshake_to = LSQUIC_DF_HANDSHAKE_TO; 206 settings->es_silent_close = LSQUIC_DF_SILENT_CLOSE; 207 settings->es_max_header_list_size 208 = LSQUIC_DF_MAX_HEADER_LIST_SIZE; 209 settings->es_ua = LSQUIC_DF_UA; 210 211 settings->es_pdmd = QTAG_X509; 212 settings->es_aead = QTAG_AESG; 213 settings->es_kexs = QTAG_C255; 214 settings->es_support_push = LSQUIC_DF_SUPPORT_PUSH; 215 settings->es_support_tcid0 = LSQUIC_DF_SUPPORT_TCID0; 216 settings->es_support_nstp = LSQUIC_DF_SUPPORT_NSTP; 217 settings->es_honor_prst = LSQUIC_DF_HONOR_PRST; 218 settings->es_progress_check = LSQUIC_DF_PROGRESS_CHECK; 219 settings->es_rw_once = LSQUIC_DF_RW_ONCE; 220 settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH; 221 settings->es_pace_packets = LSQUIC_DF_PACE_PACKETS; 222 settings->es_clock_granularity = LSQUIC_DF_CLOCK_GRANULARITY; 223} 224 225 226/* Note: if returning an error, err_buf must be valid if non-NULL */ 227int 228lsquic_engine_check_settings (const struct lsquic_engine_settings *settings, 229 unsigned flags, 230 char *err_buf, size_t err_buf_sz) 231{ 232 if (settings->es_cfcw < LSQUIC_MIN_FCW || 233 settings->es_sfcw < LSQUIC_MIN_FCW) 234 { 235 if (err_buf) 236 snprintf(err_buf, err_buf_sz, "%s", 237 "flow control window set too low"); 238 return -1; 239 } 240 if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS)) 241 { 242 if (err_buf) 243 snprintf(err_buf, err_buf_sz, "%s", 244 "No supported QUIC versions specified"); 245 return -1; 246 } 247 if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS) 248 { 249 if (err_buf) 250 snprintf(err_buf, err_buf_sz, "%s", 251 "one or more unsupported QUIC version is specified"); 252 return -1; 253 } 254 return 0; 255} 256 257 258static void 259free_packet (void *ctx, void *conn_ctx, void *packet_data, char is_ipv6) 260{ 261 free(packet_data); 262} 263 264 265static void * 266malloc_buf (void *ctx, void *conn_ctx, unsigned short size, char is_ipv6) 267{ 268 return malloc(size); 269} 270 271 272static const struct lsquic_packout_mem_if stock_pmi = 273{ 274 malloc_buf, free_packet, free_packet, 275}; 276 277 278static int 279hash_conns_by_addr (const struct lsquic_engine *engine) 280{ 281 if (engine->pub.enp_settings.es_versions & LSQUIC_FORCED_TCID0_VERSIONS) 282 return 1; 283 if ((engine->pub.enp_settings.es_versions & LSQUIC_GQUIC_HEADER_VERSIONS) 284 && engine->pub.enp_settings.es_support_tcid0) 285 return 1; 286 return 0; 287} 288 289 290lsquic_engine_t * 291lsquic_engine_new (unsigned flags, 292 const struct lsquic_engine_api *api) 293{ 294 lsquic_engine_t *engine; 295 char err_buf[100]; 296 297 if (!api->ea_packets_out) 298 { 299 LSQ_ERROR("packets_out callback is not specified"); 300 return NULL; 301 } 302 303 if (api->ea_settings && 304 0 != lsquic_engine_check_settings(api->ea_settings, flags, 305 err_buf, sizeof(err_buf))) 306 { 307 LSQ_ERROR("cannot create engine: %s", err_buf); 308 return NULL; 309 } 310 311 engine = calloc(1, sizeof(*engine)); 312 if (!engine) 313 return NULL; 314 if (0 != lsquic_mm_init(&engine->pub.enp_mm)) 315 { 316 free(engine); 317 return NULL; 318 } 319 if (api->ea_settings) 320 engine->pub.enp_settings = *api->ea_settings; 321 else 322 lsquic_engine_init_settings(&engine->pub.enp_settings, flags); 323 engine->pub.enp_flags = ENPUB_CAN_SEND; 324 325 engine->flags = flags; 326 engine->stream_if = api->ea_stream_if; 327 engine->stream_if_ctx = api->ea_stream_if_ctx; 328 engine->packets_out = api->ea_packets_out; 329 engine->packets_out_ctx = api->ea_packets_out_ctx; 330 if (api->ea_hsi_if) 331 { 332 engine->pub.enp_hsi_if = api->ea_hsi_if; 333 engine->pub.enp_hsi_ctx = api->ea_hsi_ctx; 334 } 335 else 336 { 337 engine->pub.enp_hsi_if = lsquic_http1x_if; 338 engine->pub.enp_hsi_ctx = NULL; 339 } 340 if (api->ea_pmi) 341 { 342 engine->pub.enp_pmi = api->ea_pmi; 343 engine->pub.enp_pmi_ctx = api->ea_pmi_ctx; 344 } 345 else 346 { 347 engine->pub.enp_pmi = &stock_pmi; 348 engine->pub.enp_pmi_ctx = NULL; 349 } 350 engine->pub.enp_verify_cert = api->ea_verify_cert; 351 engine->pub.enp_verify_ctx = api->ea_verify_ctx; 352 engine->pub.enp_engine = engine; 353 conn_hash_init(&engine->conns_hash, 354 hash_conns_by_addr(engine) ? CHF_USE_ADDR : 0); 355 engine->attq = attq_create(); 356 eng_hist_init(&engine->history); 357 engine->batch_size = INITIAL_OUT_BATCH_SIZE; 358 359#if LSQUIC_CONN_STATS 360 engine->stats_fh = api->ea_stats_fh; 361#endif 362 363 LSQ_INFO("instantiated engine"); 364 return engine; 365} 366 367 368static void 369grow_batch_size (struct lsquic_engine *engine) 370{ 371 engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE; 372} 373 374 375static void 376shrink_batch_size (struct lsquic_engine *engine) 377{ 378 engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE; 379} 380 381 382#if LSQUIC_CONN_STATS 383void 384update_stats_sum (struct lsquic_engine *engine, struct lsquic_conn *conn) 385{ 386 unsigned long *const dst = (unsigned long *) &engine->conn_stats_sum; 387 const unsigned long *src; 388 const struct conn_stats *stats; 389 unsigned i; 390 391 if (conn->cn_if->ci_get_stats && (stats = conn->cn_if->ci_get_stats(conn))) 392 { 393 ++engine->stats.conns; 394 src = (unsigned long *) stats; 395 for (i = 0; i < sizeof(*stats) / sizeof(unsigned long); ++i) 396 dst[i] += src[i]; 397 } 398} 399 400 401#endif 402 403 404/* Wrapper to make sure important things occur before the connection is 405 * really destroyed. 406 */ 407static void 408destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn) 409{ 410#if LSQUIC_CONN_STATS 411 update_stats_sum(engine, conn); 412#endif 413 --engine->n_conns; 414 conn->cn_flags |= LSCONN_NEVER_TICKABLE; 415 conn->cn_if->ci_destroy(conn); 416} 417 418 419static int 420maybe_grow_conn_heaps (struct lsquic_engine *engine) 421{ 422 struct min_heap_elem *els; 423 unsigned count; 424 425 if (engine->n_conns < lsquic_mh_nalloc(&engine->conns_tickable)) 426 return 0; /* Nothing to do */ 427 428 if (lsquic_mh_nalloc(&engine->conns_tickable)) 429 count = lsquic_mh_nalloc(&engine->conns_tickable) * 2 * 2; 430 else 431 count = 8; 432 433 els = malloc(sizeof(els[0]) * count); 434 if (!els) 435 { 436 LSQ_ERROR("%s: malloc failed", __func__); 437 return -1; 438 } 439 440 LSQ_DEBUG("grew heaps to %u elements", count / 2); 441 memcpy(&els[0], engine->conns_tickable.mh_elems, 442 sizeof(els[0]) * lsquic_mh_count(&engine->conns_tickable)); 443 memcpy(&els[count / 2], engine->conns_out.mh_elems, 444 sizeof(els[0]) * lsquic_mh_count(&engine->conns_out)); 445 free(engine->conns_tickable.mh_elems); 446 engine->conns_tickable.mh_elems = els; 447 engine->conns_out.mh_elems = &els[count / 2]; 448 engine->conns_tickable.mh_nalloc = count / 2; 449 engine->conns_out.mh_nalloc = count / 2; 450 return 0; 451} 452 453 454static lsquic_conn_t * 455new_full_conn_client (lsquic_engine_t *engine, const char *hostname, 456 unsigned short max_packet_size, const unsigned char *zero_rtt, 457 size_t zero_rtt_len) 458{ 459 lsquic_conn_t *conn; 460 unsigned flags; 461 if (0 != maybe_grow_conn_heaps(engine)) 462 return NULL; 463 flags = engine->flags & (ENG_SERVER|ENG_HTTP); 464 conn = full_conn_client_new(&engine->pub, engine->stream_if, 465 engine->stream_if_ctx, flags, hostname, 466 max_packet_size, zero_rtt, zero_rtt_len); 467 if (!conn) 468 return NULL; 469 ++engine->n_conns; 470 return conn; 471} 472 473 474static lsquic_conn_t * 475find_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 476 struct packin_parse_state *ppstate, const struct sockaddr *sa_local) 477{ 478 lsquic_conn_t *conn; 479 480 if (conn_hash_using_addr(&engine->conns_hash)) 481 conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local); 482 else if (packet_in->pi_flags & PI_CONN_ID) 483 conn = conn_hash_find_by_cid(&engine->conns_hash, 484 packet_in->pi_conn_id); 485 else 486 { 487 LSQ_DEBUG("packet header does not have connection ID: discarding"); 488 return NULL; 489 } 490 491 if (!conn) 492 return NULL; 493 494 conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate); 495 if ((packet_in->pi_flags & PI_CONN_ID) 496 && conn->cn_cid != packet_in->pi_conn_id) 497 { 498 LSQ_DEBUG("connection IDs do not match"); 499 return NULL; 500 } 501 502 return conn; 503} 504 505 506#if !defined(NDEBUG) && __GNUC__ 507__attribute__((weak)) 508#endif 509void 510lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub, 511 lsquic_conn_t *conn) 512{ 513 if (0 == (enpub->enp_flags & ENPUB_PROC) && 514 0 == (conn->cn_flags & (LSCONN_TICKABLE|LSCONN_NEVER_TICKABLE))) 515 { 516 lsquic_engine_t *engine = (lsquic_engine_t *) enpub; 517 lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); 518 engine_incref_conn(conn, LSCONN_TICKABLE); 519 } 520} 521 522 523void 524lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub, 525 lsquic_conn_t *conn, lsquic_time_t tick_time) 526{ 527 lsquic_engine_t *const engine = (lsquic_engine_t *) enpub; 528 if (conn->cn_flags & LSCONN_TICKABLE) 529 { 530 /* Optimization: no need to add the connection to the Advisory Tick 531 * Time Queue: it is about to be ticked, after which it its next tick 532 * time may be queried again. 533 */; 534 } 535 else if (conn->cn_flags & LSCONN_ATTQ) 536 { 537 if (lsquic_conn_adv_time(conn) != tick_time) 538 { 539 attq_remove(engine->attq, conn); 540 if (0 != attq_add(engine->attq, conn, tick_time)) 541 engine_decref_conn(engine, conn, LSCONN_ATTQ); 542 } 543 } 544 else if (0 == attq_add(engine->attq, conn, tick_time)) 545 engine_incref_conn(conn, LSCONN_ATTQ); 546} 547 548 549/* Return 0 if packet is being processed by a connections, otherwise return 1 */ 550static int 551process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, 552 struct packin_parse_state *ppstate, const struct sockaddr *sa_local, 553 const struct sockaddr *sa_peer, void *peer_ctx) 554{ 555 lsquic_conn_t *conn; 556 557 if (lsquic_packet_in_is_gquic_prst(packet_in) 558 && !engine->pub.enp_settings.es_honor_prst) 559 { 560 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 561 LSQ_DEBUG("public reset packet: discarding"); 562 return 1; 563 } 564 565 conn = find_conn(engine, packet_in, ppstate, sa_local); 566 567 if (!conn) 568 { 569 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 570 return 1; 571 } 572 573 if (0 == (conn->cn_flags & LSCONN_TICKABLE)) 574 { 575 lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); 576 engine_incref_conn(conn, LSCONN_TICKABLE); 577 } 578 lsquic_conn_record_sockaddr(conn, sa_local, sa_peer); 579 lsquic_packet_in_upref(packet_in); 580 conn->cn_peer_ctx = peer_ctx; 581 conn->cn_if->ci_packet_in(conn, packet_in); 582 lsquic_packet_in_put(&engine->pub.enp_mm, packet_in); 583 return 0; 584} 585 586 587void 588lsquic_engine_destroy (lsquic_engine_t *engine) 589{ 590 lsquic_conn_t *conn; 591 592 LSQ_DEBUG("destroying engine"); 593#ifndef NDEBUG 594 engine->flags |= ENG_DTOR; 595#endif 596 597 while ((conn = lsquic_mh_pop(&engine->conns_out))) 598 { 599 assert(conn->cn_flags & LSCONN_HAS_OUTGOING); 600 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 601 } 602 603 while ((conn = lsquic_mh_pop(&engine->conns_tickable))) 604 { 605 assert(conn->cn_flags & LSCONN_TICKABLE); 606 (void) engine_decref_conn(engine, conn, LSCONN_TICKABLE); 607 } 608 609 for (conn = conn_hash_first(&engine->conns_hash); conn; 610 conn = conn_hash_next(&engine->conns_hash)) 611 force_close_conn(engine, conn); 612 conn_hash_cleanup(&engine->conns_hash); 613 614 assert(0 == engine->n_conns); 615 attq_destroy(engine->attq); 616 617 assert(0 == lsquic_mh_count(&engine->conns_out)); 618 assert(0 == lsquic_mh_count(&engine->conns_tickable)); 619 lsquic_mm_cleanup(&engine->pub.enp_mm); 620 free(engine->conns_tickable.mh_elems); 621#if LSQUIC_CONN_STATS 622 if (engine->stats_fh) 623 { 624 const struct conn_stats *const stats = &engine->conn_stats_sum; 625 fprintf(engine->stats_fh, "Aggregate connection stats collected by engine:\n"); 626 fprintf(engine->stats_fh, "Connections: %u\n", engine->stats.conns); 627 fprintf(engine->stats_fh, "Ticks: %lu\n", stats->n_ticks); 628 fprintf(engine->stats_fh, "In:\n"); 629 fprintf(engine->stats_fh, " Total bytes: %lu\n", stats->in.bytes); 630 fprintf(engine->stats_fh, " packets: %lu\n", stats->in.packets); 631 fprintf(engine->stats_fh, " undecryptable packets: %lu\n", stats->in.undec_packets); 632 fprintf(engine->stats_fh, " duplicate packets: %lu\n", stats->in.dup_packets); 633 fprintf(engine->stats_fh, " error packets: %lu\n", stats->in.err_packets); 634 fprintf(engine->stats_fh, " STREAM frame count: %lu\n", stats->in.stream_frames); 635 fprintf(engine->stats_fh, " STREAM payload size: %lu\n", stats->in.stream_data_sz); 636 fprintf(engine->stats_fh, " Header bytes: %lu; uncompressed: %lu; ratio %.3lf\n", 637 stats->in.headers_comp, stats->in.headers_uncomp, 638 stats->in.headers_uncomp ? 639 (double) stats->in.headers_comp / (double) stats->in.headers_uncomp 640 : 0); 641 fprintf(engine->stats_fh, " ACK frames: %lu\n", stats->in.n_acks); 642 fprintf(engine->stats_fh, " ACK frames processed: %lu\n", stats->in.n_acks_proc); 643 fprintf(engine->stats_fh, " ACK frames merged to new: %lu\n", stats->in.n_acks_merged[0]); 644 fprintf(engine->stats_fh, " ACK frames merged to old: %lu\n", stats->in.n_acks_merged[1]); 645 fprintf(engine->stats_fh, "Out:\n"); 646 fprintf(engine->stats_fh, " Total bytes: %lu\n", stats->out.bytes); 647 fprintf(engine->stats_fh, " packets: %lu\n", stats->out.packets); 648 fprintf(engine->stats_fh, " retx packets: %lu\n", stats->out.retx_packets); 649 fprintf(engine->stats_fh, " STREAM frame count: %lu\n", stats->out.stream_frames); 650 fprintf(engine->stats_fh, " STREAM payload size: %lu\n", stats->out.stream_data_sz); 651 fprintf(engine->stats_fh, " Header bytes: %lu; uncompressed: %lu; ratio %.3lf\n", 652 stats->out.headers_comp, stats->out.headers_uncomp, 653 stats->out.headers_uncomp ? 654 (double) stats->out.headers_comp / (double) stats->out.headers_uncomp 655 : 0); 656 fprintf(engine->stats_fh, " ACKs: %lu\n", stats->out.acks); 657 } 658#endif 659 free(engine); 660} 661 662 663lsquic_conn_t * 664lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *local_sa, 665 const struct sockaddr *peer_sa, 666 void *peer_ctx, lsquic_conn_ctx_t *conn_ctx, 667 const char *hostname, unsigned short max_packet_size, 668 const unsigned char *zero_rtt, size_t zero_rtt_len) 669{ 670 lsquic_conn_t *conn; 671 ENGINE_IN(engine); 672 673 if (engine->flags & ENG_SERVER) 674 { 675 LSQ_ERROR("`%s' must only be called in client mode", __func__); 676 goto err; 677 } 678 679 if (conn_hash_using_addr(&engine->conns_hash) 680 && conn_hash_find_by_addr(&engine->conns_hash, local_sa)) 681 { 682 LSQ_ERROR("cannot have more than one connection on the same port"); 683 goto err; 684 } 685 686 if (0 == max_packet_size) 687 { 688 switch (peer_sa->sa_family) 689 { 690 case AF_INET: 691 max_packet_size = QUIC_MAX_IPv4_PACKET_SZ; 692 break; 693 default: 694 max_packet_size = QUIC_MAX_IPv6_PACKET_SZ; 695 break; 696 } 697 } 698 699 conn = new_full_conn_client(engine, hostname, max_packet_size, 700 zero_rtt, zero_rtt_len); 701 if (!conn) 702 goto err; 703 lsquic_conn_record_sockaddr(conn, local_sa, peer_sa); 704 if (0 != conn_hash_add(&engine->conns_hash, conn)) 705 { 706 LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy", 707 conn->cn_cid); 708 destroy_conn(engine, conn); 709 goto err; 710 } 711 assert(!(conn->cn_flags & 712 (CONN_REF_FLAGS 713 & ~LSCONN_TICKABLE /* This flag may be set as effect of user 714 callbacks */ 715 ))); 716 conn->cn_flags |= LSCONN_HASHED; 717 lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); 718 engine_incref_conn(conn, LSCONN_TICKABLE); 719 conn->cn_peer_ctx = peer_ctx; 720 lsquic_conn_set_ctx(conn, conn_ctx); 721 full_conn_client_call_on_new(conn); 722 end: 723 ENGINE_OUT(engine); 724 return conn; 725 err: 726 conn = NULL; 727 goto end; 728} 729 730 731static void 732remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn) 733{ 734 conn_hash_remove(&engine->conns_hash, conn); 735 (void) engine_decref_conn(engine, conn, LSCONN_HASHED); 736} 737 738 739static void 740refflags2str (enum lsquic_conn_flags flags, char s[6]) 741{ 742 *s = 'C'; s += !!(flags & LSCONN_CLOSING); 743 *s = 'H'; s += !!(flags & LSCONN_HASHED); 744 *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING); 745 *s = 'T'; s += !!(flags & LSCONN_TICKABLE); 746 *s = 'A'; s += !!(flags & LSCONN_ATTQ); 747 *s = 'K'; s += !!(flags & LSCONN_TICKED); 748 *s = '\0'; 749} 750 751 752static void 753engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag) 754{ 755 char str[2][7]; 756 assert(flag & CONN_REF_FLAGS); 757 assert(!(conn->cn_flags & flag)); 758 conn->cn_flags |= flag; 759 LSQ_DEBUG("incref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid, 760 (refflags2str(conn->cn_flags & ~flag, str[0]), str[0]), 761 (refflags2str(conn->cn_flags, str[1]), str[1])); 762} 763 764 765static lsquic_conn_t * 766engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, 767 enum lsquic_conn_flags flags) 768{ 769 char str[2][7]; 770 assert(flags & CONN_REF_FLAGS); 771 assert(conn->cn_flags & flags); 772#ifndef NDEBUG 773 if (flags & LSCONN_CLOSING) 774 assert(0 == (conn->cn_flags & LSCONN_HASHED)); 775#endif 776 conn->cn_flags &= ~flags; 777 LSQ_DEBUG("decref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid, 778 (refflags2str(conn->cn_flags | flags, str[0]), str[0]), 779 (refflags2str(conn->cn_flags, str[1]), str[1])); 780 if (0 == (conn->cn_flags & CONN_REF_FLAGS)) 781 { 782 eng_hist_inc(&engine->history, 0, sl_del_full_conns); 783 destroy_conn(engine, conn); 784 return NULL; 785 } 786 else 787 return conn; 788} 789 790 791/* This is not a general-purpose function. Only call from engine dtor. */ 792static void 793force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn) 794{ 795 assert(engine->flags & ENG_DTOR); 796 const enum lsquic_conn_flags flags = conn->cn_flags; 797 assert(conn->cn_flags & CONN_REF_FLAGS); 798 assert(!(flags & LSCONN_HAS_OUTGOING)); /* Should be removed already */ 799 assert(!(flags & LSCONN_TICKABLE)); /* Should be removed already */ 800 assert(!(flags & LSCONN_CLOSING)); /* It is in transient queue? */ 801 if (flags & LSCONN_ATTQ) 802 { 803 attq_remove(engine->attq, conn); 804 (void) engine_decref_conn(engine, conn, LSCONN_ATTQ); 805 } 806 if (flags & LSCONN_HASHED) 807 remove_conn_from_hash(engine, conn); 808} 809 810 811/* Iterator for tickable connections (those on the Tickable Queue). Before 812 * a connection is returned, it is removed from the Advisory Tick Time queue 813 * if necessary. 814 */ 815static lsquic_conn_t * 816conn_iter_next_tickable (struct lsquic_engine *engine) 817{ 818 lsquic_conn_t *conn; 819 820 conn = lsquic_mh_pop(&engine->conns_tickable); 821 822 if (conn) 823 conn = engine_decref_conn(engine, conn, LSCONN_TICKABLE); 824 if (conn && (conn->cn_flags & LSCONN_ATTQ)) 825 { 826 attq_remove(engine->attq, conn); 827 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 828 } 829 830 return conn; 831} 832 833 834void 835lsquic_engine_process_conns (lsquic_engine_t *engine) 836{ 837 lsquic_conn_t *conn; 838 lsquic_time_t now; 839 840 ENGINE_IN(engine); 841 842 now = lsquic_time_now(); 843 while ((conn = attq_pop(engine->attq, now))) 844 { 845 conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); 846 if (conn && !(conn->cn_flags & LSCONN_TICKABLE)) 847 { 848 lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); 849 engine_incref_conn(conn, LSCONN_TICKABLE); 850 } 851 } 852 853 process_connections(engine, conn_iter_next_tickable, now); 854 ENGINE_OUT(engine); 855} 856 857 858static ssize_t 859really_encrypt_packet (const lsquic_conn_t *conn, 860 struct lsquic_packet_out *packet_out, 861 unsigned char *buf, size_t bufsz) 862{ 863 int header_sz, is_hello_packet; 864 enum enc_level enc_level; 865 size_t packet_sz; 866 unsigned char header_buf[QUIC_MAX_PUBHDR_SZ]; 867 868 header_sz = conn->cn_pf->pf_gen_reg_pkt_header(conn, packet_out, 869 header_buf, sizeof(header_buf)); 870 if (header_sz < 0) 871 return -1; 872 873 is_hello_packet = !!(packet_out->po_flags & PO_HELLO); 874 enc_level = conn->cn_esf->esf_encrypt(conn->cn_enc_session, 875 conn->cn_version, 0, 876 packet_out->po_packno, header_buf, header_sz, 877 packet_out->po_data, packet_out->po_data_sz, 878 buf, bufsz, &packet_sz, is_hello_packet); 879 if ((int) enc_level >= 0) 880 { 881 lsquic_packet_out_set_enc_level(packet_out, enc_level); 882 LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %zu bytes, " 883 "ciphertext is %zd bytes", 884 packet_out->po_packno, 885 conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) + 886 packet_out->po_data_sz, 887 packet_sz); 888 return packet_sz; 889 } 890 else 891 return -1; 892} 893 894 895static int 896conn_peer_ipv6 (const struct lsquic_conn *conn) 897{ 898 return AF_INET6 == conn->cn_peer_addr_u.sa.sa_family; 899} 900 901 902static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, } 903encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn, 904 lsquic_packet_out_t *packet_out) 905{ 906 ssize_t enc_sz; 907 size_t bufsz; 908 unsigned sent_sz; 909 unsigned char *buf; 910 int ipv6; 911 912 bufsz = conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) + 913 packet_out->po_data_sz + QUIC_PACKET_HASH_SZ; 914 if (bufsz > USHRT_MAX) 915 return ENCPA_BADCRYPT; /* To cause connection to close */ 916 ipv6 = conn_peer_ipv6(conn); 917 buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, 918 conn->cn_peer_ctx, bufsz, ipv6); 919 if (!buf) 920 { 921 LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd", 922 bufsz); 923 return ENCPA_NOMEM; 924 } 925 926 { 927 enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz); 928 sent_sz = enc_sz; 929 } 930 931 if (enc_sz < 0) 932 { 933 engine->pub.enp_pmi->pmi_return(engine->pub.enp_pmi_ctx, 934 conn->cn_peer_ctx, buf, ipv6); 935 return ENCPA_BADCRYPT; 936 } 937 938 packet_out->po_enc_data = buf; 939 packet_out->po_enc_data_sz = enc_sz; 940 packet_out->po_sent_sz = sent_sz; 941 packet_out->po_flags &= ~PO_IPv6; 942 packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ|(ipv6 << POIPv6_SHIFT); 943 944 return ENCPA_OK; 945} 946 947 948static void 949release_or_return_enc_data (struct lsquic_engine *engine, 950 void (*pmi_rel_or_ret) (void *, void *, void *, char), 951 struct lsquic_conn *conn, struct lsquic_packet_out *packet_out) 952{ 953 pmi_rel_or_ret(engine->pub.enp_pmi_ctx, conn->cn_peer_ctx, 954 packet_out->po_enc_data, lsquic_packet_out_ipv6(packet_out)); 955 packet_out->po_flags &= ~PO_ENCRYPTED; 956 packet_out->po_enc_data = NULL; 957} 958 959 960static void 961release_enc_data (struct lsquic_engine *engine, struct lsquic_conn *conn, 962 struct lsquic_packet_out *packet_out) 963{ 964 release_or_return_enc_data(engine, engine->pub.enp_pmi->pmi_release, 965 conn, packet_out); 966} 967 968 969static void 970return_enc_data (struct lsquic_engine *engine, struct lsquic_conn *conn, 971 struct lsquic_packet_out *packet_out) 972{ 973 release_or_return_enc_data(engine, engine->pub.enp_pmi->pmi_return, 974 conn, packet_out); 975} 976 977 978STAILQ_HEAD(conns_stailq, lsquic_conn); 979TAILQ_HEAD(conns_tailq, lsquic_conn); 980 981 982struct conns_out_iter 983{ 984 struct min_heap *coi_heap; 985 TAILQ_HEAD(, lsquic_conn) coi_active_list, 986 coi_inactive_list; 987 lsquic_conn_t *coi_next; 988#ifndef NDEBUG 989 lsquic_time_t coi_last_sent; 990#endif 991}; 992 993 994static void 995coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine) 996{ 997 iter->coi_heap = &engine->conns_out; 998 iter->coi_next = NULL; 999 TAILQ_INIT(&iter->coi_active_list); 1000 TAILQ_INIT(&iter->coi_inactive_list); 1001#ifndef NDEBUG 1002 iter->coi_last_sent = 0; 1003#endif 1004} 1005 1006 1007static lsquic_conn_t * 1008coi_next (struct conns_out_iter *iter) 1009{ 1010 lsquic_conn_t *conn; 1011 1012 if (lsquic_mh_count(iter->coi_heap) > 0) 1013 { 1014 conn = lsquic_mh_pop(iter->coi_heap); 1015 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1016 conn->cn_flags |= LSCONN_COI_ACTIVE; 1017#ifndef NDEBUG 1018 if (iter->coi_last_sent) 1019 assert(iter->coi_last_sent <= conn->cn_last_sent); 1020 iter->coi_last_sent = conn->cn_last_sent; 1021#endif 1022 return conn; 1023 } 1024 else if (!TAILQ_EMPTY(&iter->coi_active_list)) 1025 { 1026 conn = iter->coi_next; 1027 if (!conn) 1028 conn = TAILQ_FIRST(&iter->coi_active_list); 1029 if (conn) 1030 iter->coi_next = TAILQ_NEXT(conn, cn_next_out); 1031 return conn; 1032 } 1033 else 1034 return NULL; 1035} 1036 1037 1038static void 1039coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1040{ 1041 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1042 { 1043 assert(!TAILQ_EMPTY(&iter->coi_active_list)); 1044 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1045 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1046 TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out); 1047 conn->cn_flags |= LSCONN_COI_INACTIVE; 1048 } 1049} 1050 1051 1052static void 1053coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn) 1054{ 1055 assert(conn->cn_flags & LSCONN_COI_INACTIVE); 1056 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1057 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1058 TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); 1059 conn->cn_flags |= LSCONN_COI_ACTIVE; 1060} 1061 1062 1063static void 1064coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine) 1065{ 1066 lsquic_conn_t *conn; 1067 while ((conn = TAILQ_FIRST(&iter->coi_active_list))) 1068 { 1069 TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); 1070 conn->cn_flags &= ~LSCONN_COI_ACTIVE; 1071 lsquic_mh_insert(iter->coi_heap, conn, conn->cn_last_sent); 1072 } 1073 while ((conn = TAILQ_FIRST(&iter->coi_inactive_list))) 1074 { 1075 TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out); 1076 conn->cn_flags &= ~LSCONN_COI_INACTIVE; 1077 (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); 1078 } 1079} 1080 1081 1082static unsigned 1083send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter, 1084 struct out_batch *batch, unsigned n_to_send) 1085{ 1086 int n_sent, i; 1087 lsquic_time_t now; 1088 1089 /* Set sent time before the write to avoid underestimating RTT */ 1090 now = lsquic_time_now(); 1091 for (i = 0; i < (int) n_to_send; ++i) 1092 batch->packets[i]->po_sent = now; 1093 n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs, 1094 n_to_send); 1095 if (n_sent < (int) n_to_send) 1096 { 1097 engine->pub.enp_flags &= ~ENPUB_CAN_SEND; 1098 engine->resume_sending_at = now + 1000000; 1099 LSQ_DEBUG("cannot send packets"); 1100 EV_LOG_GENERIC_EVENT("cannot send packets"); 1101 } 1102 if (n_sent >= 0) 1103 LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send); 1104 else 1105 { 1106 LSQ_DEBUG("packets out returned an error: %s", strerror(errno)); 1107 n_sent = 0; 1108 } 1109 if (n_sent > 0) 1110 engine->last_sent = now + n_sent; 1111 for (i = 0; i < n_sent; ++i) 1112 { 1113 eng_hist_inc(&engine->history, now, sl_packets_out); 1114 EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1115 batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i], 1116 batch->packets[i]); 1117 /* `i' is added to maintain relative order */ 1118 batch->conns[i]->cn_last_sent = now + i; 1119 /* Release packet out buffer as soon as the packet is sent 1120 * successfully. If not successfully sent, we hold on to 1121 * this buffer until the packet sending is attempted again 1122 * or until it times out and regenerated. 1123 */ 1124 if (batch->packets[i]->po_flags & PO_ENCRYPTED) 1125 release_enc_data(engine, batch->conns[i], batch->packets[i]); 1126 } 1127 if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT)) 1128 for ( ; i < (int) n_to_send; ++i) 1129 EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]); 1130 /* Return packets to the connection in reverse order so that the packet 1131 * ordering is maintained. 1132 */ 1133 for (i = (int) n_to_send - 1; i >= n_sent; --i) 1134 { 1135 batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i], 1136 batch->packets[i]); 1137 if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT))) 1138 coi_reactivate(conns_iter, batch->conns[i]); 1139 } 1140 return n_sent; 1141} 1142 1143 1144/* Return 1 if went past deadline, 0 otherwise */ 1145static int 1146check_deadline (lsquic_engine_t *engine) 1147{ 1148 if (engine->pub.enp_settings.es_proc_time_thresh && 1149 lsquic_time_now() > engine->deadline) 1150 { 1151 LSQ_INFO("went past threshold of %u usec, stop sending", 1152 engine->pub.enp_settings.es_proc_time_thresh); 1153 engine->flags |= ENG_PAST_DEADLINE; 1154 return 1; 1155 } 1156 else 1157 return 0; 1158} 1159 1160 1161static void 1162send_packets_out (struct lsquic_engine *engine, 1163 struct conns_tailq *ticked_conns, 1164 struct conns_stailq *closed_conns) 1165{ 1166 unsigned n, w, n_sent, n_batches_sent; 1167 lsquic_packet_out_t *packet_out; 1168 lsquic_conn_t *conn; 1169 struct out_batch *const batch = &engine->out_batch; 1170 struct conns_out_iter conns_iter; 1171 int shrink, deadline_exceeded; 1172 1173 coi_init(&conns_iter, engine); 1174 n_batches_sent = 0; 1175 n_sent = 0, n = 0; 1176 shrink = 0; 1177 deadline_exceeded = 0; 1178 1179 while ((conn = coi_next(&conns_iter))) 1180 { 1181 packet_out = conn->cn_if->ci_next_packet_to_send(conn); 1182 if (!packet_out) { 1183 LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64, 1184 conn->cn_cid); 1185 coi_deactivate(&conns_iter, conn); 1186 continue; 1187 } 1188 if ((packet_out->po_flags & PO_ENCRYPTED) 1189 && lsquic_packet_out_ipv6(packet_out) != conn_peer_ipv6(conn)) 1190 { 1191 /* Peer address changed since the packet was encrypted. Need to 1192 * reallocate. 1193 */ 1194 return_enc_data(engine, conn, packet_out); 1195 } 1196 if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT))) 1197 { 1198 switch (encrypt_packet(engine, conn, packet_out)) 1199 { 1200 case ENCPA_NOMEM: 1201 /* Send what we have and wait for a more opportune moment */ 1202 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1203 goto end_for; 1204 case ENCPA_BADCRYPT: 1205 /* This is pretty bad: close connection immediately */ 1206 conn->cn_if->ci_packet_not_sent(conn, packet_out); 1207 LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid); 1208 if (!(conn->cn_flags & LSCONN_EVANESCENT)) 1209 { 1210 if (!(conn->cn_flags & LSCONN_CLOSING)) 1211 { 1212 STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn); 1213 engine_incref_conn(conn, LSCONN_CLOSING); 1214 if (conn->cn_flags & LSCONN_HASHED) 1215 remove_conn_from_hash(engine, conn); 1216 } 1217 coi_deactivate(&conns_iter, conn); 1218 if (conn->cn_flags & LSCONN_TICKED) 1219 { 1220 TAILQ_REMOVE(ticked_conns, conn, cn_next_ticked); 1221 engine_decref_conn(engine, conn, LSCONN_TICKED); 1222 } 1223 } 1224 continue; 1225 case ENCPA_OK: 1226 break; 1227 } 1228 } 1229 LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64, 1230 packet_out->po_packno, conn->cn_cid); 1231 assert(conn->cn_flags & LSCONN_HAS_PEER_SA); 1232 if (packet_out->po_flags & PO_ENCRYPTED) 1233 { 1234 batch->outs[n].buf = packet_out->po_enc_data; 1235 batch->outs[n].sz = packet_out->po_enc_data_sz; 1236 } 1237 else 1238 { 1239 batch->outs[n].buf = packet_out->po_data; 1240 batch->outs[n].sz = packet_out->po_data_sz; 1241 } 1242 batch->outs [n].peer_ctx = conn->cn_peer_ctx; 1243 batch->outs [n].local_sa = (struct sockaddr *) conn->cn_local_addr; 1244 batch->outs [n].dest_sa = (struct sockaddr *) conn->cn_peer_addr; 1245 batch->conns [n] = conn; 1246 batch->packets[n] = packet_out; 1247 ++n; 1248 if (n == engine->batch_size) 1249 { 1250 n = 0; 1251 w = send_batch(engine, &conns_iter, batch, engine->batch_size); 1252 ++n_batches_sent; 1253 n_sent += w; 1254 if (w < engine->batch_size) 1255 { 1256 shrink = 1; 1257 break; 1258 } 1259 deadline_exceeded = check_deadline(engine); 1260 if (deadline_exceeded) 1261 break; 1262 grow_batch_size(engine); 1263 } 1264 } 1265 end_for: 1266 1267 if (n > 0) { 1268 w = send_batch(engine, &conns_iter, batch, n); 1269 n_sent += w; 1270 shrink = w < n; 1271 ++n_batches_sent; 1272 deadline_exceeded = check_deadline(engine); 1273 } 1274 1275 if (shrink) 1276 shrink_batch_size(engine); 1277 else if (n_batches_sent > 1 && !deadline_exceeded) 1278 grow_batch_size(engine); 1279 1280 coi_reheap(&conns_iter, engine); 1281 1282 LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s"); 1283} 1284 1285 1286int 1287lsquic_engine_has_unsent_packets (lsquic_engine_t *engine) 1288{ 1289 return lsquic_mh_count(&engine->conns_out) > 0 1290 ; 1291} 1292 1293 1294static void 1295reset_deadline (lsquic_engine_t *engine, lsquic_time_t now) 1296{ 1297 engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh; 1298 engine->flags &= ~ENG_PAST_DEADLINE; 1299} 1300 1301 1302/* TODO: this is a user-facing function, account for load */ 1303void 1304lsquic_engine_send_unsent_packets (lsquic_engine_t *engine) 1305{ 1306 lsquic_conn_t *conn; 1307 struct conns_stailq closed_conns; 1308 struct conns_tailq ticked_conns = TAILQ_HEAD_INITIALIZER(ticked_conns); 1309 1310 STAILQ_INIT(&closed_conns); 1311 reset_deadline(engine, lsquic_time_now()); 1312 if (!(engine->pub.enp_flags & ENPUB_CAN_SEND)) 1313 { 1314 LSQ_DEBUG("can send again"); 1315 EV_LOG_GENERIC_EVENT("can send again"); 1316 engine->pub.enp_flags |= ENPUB_CAN_SEND; 1317 } 1318 1319 send_packets_out(engine, &ticked_conns, &closed_conns); 1320 1321 while ((conn = STAILQ_FIRST(&closed_conns))) { 1322 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1323 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1324 } 1325 1326} 1327 1328 1329static void 1330process_connections (lsquic_engine_t *engine, conn_iter_f next_conn, 1331 lsquic_time_t now) 1332{ 1333 lsquic_conn_t *conn; 1334 enum tick_st tick_st; 1335 unsigned i; 1336 lsquic_time_t next_tick_time; 1337 struct conns_stailq closed_conns; 1338 struct conns_tailq ticked_conns; 1339 1340 eng_hist_tick(&engine->history, now); 1341 1342 STAILQ_INIT(&closed_conns); 1343 TAILQ_INIT(&ticked_conns); 1344 reset_deadline(engine, now); 1345 1346 if (!(engine->pub.enp_flags & ENPUB_CAN_SEND) 1347 && now > engine->resume_sending_at) 1348 { 1349 LSQ_NOTICE("failsafe activated: resume sending packets again after " 1350 "timeout"); 1351 EV_LOG_GENERIC_EVENT("resume sending packets again after timeout"); 1352 engine->pub.enp_flags |= ENPUB_CAN_SEND; 1353 } 1354 1355 i = 0; 1356 while ((conn = next_conn(engine)) 1357 ) 1358 { 1359 tick_st = conn->cn_if->ci_tick(conn, now); 1360 conn->cn_last_ticked = now + i /* Maintain relative order */ ++; 1361 if (tick_st & TICK_SEND) 1362 { 1363 if (!(conn->cn_flags & LSCONN_HAS_OUTGOING)) 1364 { 1365 lsquic_mh_insert(&engine->conns_out, conn, conn->cn_last_sent); 1366 engine_incref_conn(conn, LSCONN_HAS_OUTGOING); 1367 } 1368 } 1369 if (tick_st & TICK_CLOSE) 1370 { 1371 STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn); 1372 engine_incref_conn(conn, LSCONN_CLOSING); 1373 if (conn->cn_flags & LSCONN_HASHED) 1374 remove_conn_from_hash(engine, conn); 1375 } 1376 else 1377 { 1378 TAILQ_INSERT_TAIL(&ticked_conns, conn, cn_next_ticked); 1379 engine_incref_conn(conn, LSCONN_TICKED); 1380 } 1381 } 1382 1383 if ((engine->pub.enp_flags & ENPUB_CAN_SEND) 1384 && lsquic_engine_has_unsent_packets(engine)) 1385 send_packets_out(engine, &ticked_conns, &closed_conns); 1386 1387 while ((conn = STAILQ_FIRST(&closed_conns))) { 1388 STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn); 1389 (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); 1390 } 1391 1392 /* TODO Heapification can be optimized by switching to the Floyd method: 1393 * https://en.wikipedia.org/wiki/Binary_heap#Building_a_heap 1394 */ 1395 while ((conn = TAILQ_FIRST(&ticked_conns))) 1396 { 1397 TAILQ_REMOVE(&ticked_conns, conn, cn_next_ticked); 1398 engine_decref_conn(engine, conn, LSCONN_TICKED); 1399 if (!(conn->cn_flags & LSCONN_TICKABLE) 1400 && conn->cn_if->ci_is_tickable(conn)) 1401 { 1402 lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); 1403 engine_incref_conn(conn, LSCONN_TICKABLE); 1404 } 1405 else if (!(conn->cn_flags & LSCONN_ATTQ)) 1406 { 1407 next_tick_time = conn->cn_if->ci_next_tick_time(conn); 1408 if (next_tick_time) 1409 { 1410 if (0 == attq_add(engine->attq, conn, next_tick_time)) 1411 engine_incref_conn(conn, LSCONN_ATTQ); 1412 } 1413 else 1414 assert(0); 1415 } 1416 } 1417 1418} 1419 1420 1421/* Return 0 if packet is being processed by a real connection, 1 if the 1422 * packet was processed, but not by a connection, and -1 on error. 1423 */ 1424int 1425lsquic_engine_packet_in (lsquic_engine_t *engine, 1426 const unsigned char *packet_in_data, size_t packet_in_size, 1427 const struct sockaddr *sa_local, const struct sockaddr *sa_peer, 1428 void *peer_ctx) 1429{ 1430 struct packin_parse_state ppstate; 1431 lsquic_packet_in_t *packet_in; 1432 int (*parse_packet_in_begin) (struct lsquic_packet_in *, size_t length, 1433 int is_server, struct packin_parse_state *); 1434 1435 if (packet_in_size > QUIC_MAX_PACKET_SZ) 1436 { 1437 LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming " 1438 "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ); 1439 errno = E2BIG; 1440 return -1; 1441 } 1442 1443 if (conn_hash_using_addr(&engine->conns_hash)) 1444 { 1445 const struct lsquic_conn *conn; 1446 conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local); 1447 if (!conn) 1448 return -1; 1449 if ((1 << conn->cn_version) & LSQUIC_GQUIC_HEADER_VERSIONS) 1450 parse_packet_in_begin = lsquic_gquic_parse_packet_in_begin; 1451 else 1452 parse_packet_in_begin = lsquic_iquic_parse_packet_in_begin; 1453 } 1454 else 1455 parse_packet_in_begin = lsquic_parse_packet_in_begin; 1456 1457 packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm); 1458 if (!packet_in) 1459 return -1; 1460 1461 /* Library does not modify packet_in_data, it is not referenced after 1462 * this function returns and subsequent release of pi_data is guarded 1463 * by PI_OWN_DATA flag. 1464 */ 1465 packet_in->pi_data = (unsigned char *) packet_in_data; 1466 if (0 != parse_packet_in_begin(packet_in, packet_in_size, 1467 engine->flags & ENG_SERVER, &ppstate)) 1468 { 1469 LSQ_DEBUG("Cannot parse incoming packet's header"); 1470 lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in); 1471 errno = EINVAL; 1472 return -1; 1473 } 1474 1475 packet_in->pi_received = lsquic_time_now(); 1476 eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in); 1477 return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer, 1478 peer_ctx); 1479} 1480 1481 1482#if __GNUC__ && !defined(NDEBUG) 1483__attribute__((weak)) 1484#endif 1485unsigned 1486lsquic_engine_quic_versions (const lsquic_engine_t *engine) 1487{ 1488 return engine->pub.enp_settings.es_versions; 1489} 1490 1491 1492int 1493lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff) 1494{ 1495 const lsquic_time_t *next_attq_time; 1496 lsquic_time_t now, next_time; 1497 1498 if (((engine->flags & ENG_PAST_DEADLINE) 1499 && lsquic_mh_count(&engine->conns_out)) 1500 || lsquic_mh_count(&engine->conns_tickable)) 1501 { 1502 *diff = 0; 1503 return 1; 1504 } 1505 1506 next_attq_time = attq_next_time(engine->attq); 1507 if (engine->pub.enp_flags & ENPUB_CAN_SEND) 1508 { 1509 if (next_attq_time) 1510 next_time = *next_attq_time; 1511 else 1512 return 0; 1513 } 1514 else 1515 { 1516 if (next_attq_time) 1517 next_time = MIN(*next_attq_time, engine->resume_sending_at); 1518 else 1519 next_time = engine->resume_sending_at; 1520 } 1521 1522 now = lsquic_time_now(); 1523 *diff = (int) ((int64_t) next_time - (int64_t) now); 1524 return 1; 1525} 1526 1527 1528unsigned 1529lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now) 1530{ 1531 lsquic_time_t now; 1532 now = lsquic_time_now(); 1533 if (from_now < 0) 1534 now -= from_now; 1535 else 1536 now += from_now; 1537 return attq_count_before(engine->attq, now); 1538} 1539 1540 1541