lsquic_engine.c revision 8ca33e0e
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_engine.c - QUIC engine
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <limits.h>
10#include <stdint.h>
11#include <stdio.h>
12#include <stdlib.h>
13#include <string.h>
14#include <sys/queue.h>
15#include <time.h>
16#ifndef WIN32
17#include <sys/time.h>
18#include <netinet/in.h>
19#include <sys/types.h>
20#include <sys/stat.h>
21#include <fcntl.h>
22#include <unistd.h>
23#include <netdb.h>
24#endif
25
26
27
28#include "lsquic.h"
29#include "lsquic_types.h"
30#include "lsquic_alarmset.h"
31#include "lsquic_parse_common.h"
32#include "lsquic_parse.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_packet_out.h"
35#include "lsquic_senhist.h"
36#include "lsquic_rtt.h"
37#include "lsquic_cubic.h"
38#include "lsquic_pacer.h"
39#include "lsquic_send_ctl.h"
40#include "lsquic_set.h"
41#include "lsquic_conn_flow.h"
42#include "lsquic_sfcw.h"
43#include "lsquic_stream.h"
44#include "lsquic_conn.h"
45#include "lsquic_full_conn.h"
46#include "lsquic_util.h"
47#include "lsquic_qtags.h"
48#include "lsquic_str.h"
49#include "lsquic_handshake.h"
50#include "lsquic_mm.h"
51#include "lsquic_conn_hash.h"
52#include "lsquic_engine_public.h"
53#include "lsquic_eng_hist.h"
54#include "lsquic_ev_log.h"
55#include "lsquic_version.h"
56#include "lsquic_hash.h"
57#include "lsquic_attq.h"
58#include "lsquic_min_heap.h"
59#include "lsquic_http1x_if.h"
60
61#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
62#include "lsquic_logger.h"
63
64#define MIN(a, b) ((a) < (b) ? (a) : (b))
65
66
67/* The batch of outgoing packets grows and shrinks dynamically */
68#define MAX_OUT_BATCH_SIZE 1024
69#define MIN_OUT_BATCH_SIZE 4
70#define INITIAL_OUT_BATCH_SIZE 32
71
72struct out_batch
73{
74    lsquic_conn_t           *conns  [MAX_OUT_BATCH_SIZE];
75    lsquic_packet_out_t     *packets[MAX_OUT_BATCH_SIZE];
76    struct lsquic_out_spec   outs   [MAX_OUT_BATCH_SIZE];
77};
78
79typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);
80
81static void
82process_connections (struct lsquic_engine *engine, conn_iter_f iter,
83                     lsquic_time_t now);
84
85static void
86engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);
87
88static lsquic_conn_t *
89engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
90                                        enum lsquic_conn_flags flag);
91
92static void
93force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
94
95/* Nested calls to LSQUIC are not supported */
96#define ENGINE_IN(e) do {                               \
97    assert(!((e)->pub.enp_flags & ENPUB_PROC));         \
98    (e)->pub.enp_flags |= ENPUB_PROC;                   \
99} while (0)
100
101#define ENGINE_OUT(e) do {                              \
102    assert((e)->pub.enp_flags & ENPUB_PROC);            \
103    (e)->pub.enp_flags &= ~ENPUB_PROC;                  \
104} while (0)
105
106/* A connection can be referenced from one of six places:
107 *
108 *   1. Connection hash: a connection starts its life in one of those.
109 *
110 *   2. Outgoing queue.
111 *
112 *   3. Tickable queue
113 *
114 *   4. Advisory Tick Time queue.
115 *
116 *   5. Closing connections queue.  This is a transient queue -- it only
117 *      exists for the duration of process_connections() function call.
118 *
119 *   6. Ticked connections queue.  Another transient queue, similar to (5).
120 *
121 * The idea is to destroy the connection when it is no longer referenced.
122 * For example, a connection tick may return TICK_SEND|TICK_CLOSE.  In
123 * that case, the connection is referenced from two places: (2) and (5).
124 * After its packets are sent, it is only referenced in (5), and at the
125 * end of the function call, when it is removed from (5), reference count
126 * goes to zero and the connection is destroyed.  If not all packets can
127 * be sent, at the end of the function call, the connection is referenced
128 * by (2) and will only be removed once all outgoing packets have been
129 * sent.
130 */
131#define CONN_REF_FLAGS  (LSCONN_HASHED          \
132                        |LSCONN_HAS_OUTGOING    \
133                        |LSCONN_TICKABLE        \
134                        |LSCONN_TICKED          \
135                        |LSCONN_CLOSING         \
136                        |LSCONN_ATTQ)
137
138
139
140
141struct lsquic_engine
142{
143    struct lsquic_engine_public        pub;
144    enum {
145        ENG_SERVER      = LSENG_SERVER,
146        ENG_HTTP        = LSENG_HTTP,
147        ENG_COOLDOWN    = (1 <<  7),    /* Cooldown: no new connections */
148        ENG_PAST_DEADLINE
149                        = (1 <<  8),    /* Previous call to a processing
150                                         * function went past time threshold.
151                                         */
152#ifndef NDEBUG
153        ENG_DTOR        = (1 << 26),    /* Engine destructor */
154#endif
155    }                                  flags;
156    const struct lsquic_stream_if     *stream_if;
157    void                              *stream_if_ctx;
158    lsquic_packets_out_f               packets_out;
159    void                              *packets_out_ctx;
160    void                              *bad_handshake_ctx;
161    struct conn_hash                   conns_hash;
162    struct min_heap                    conns_tickable;
163    struct min_heap                    conns_out;
164    struct eng_hist                    history;
165    unsigned                           batch_size;
166    struct attq                       *attq;
167    /* Track time last time a packet was sent to give new connections
168     * priority lower than that of existing connections.
169     */
170    lsquic_time_t                      last_sent;
171    unsigned                           n_conns;
172    lsquic_time_t                      deadline;
173    lsquic_time_t                      resume_sending_at;
174#if LSQUIC_CONN_STATS
175    struct {
176        unsigned                conns;
177    }                                  stats;
178    struct conn_stats                  conn_stats_sum;
179    FILE                              *stats_fh;
180#endif
181    struct out_batch                   out_batch;
182};
183
184
185void
186lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
187                             unsigned flags)
188{
189    memset(settings, 0, sizeof(*settings));
190    settings->es_versions        = LSQUIC_DF_VERSIONS;
191    if (flags & ENG_SERVER)
192    {
193        settings->es_cfcw        = LSQUIC_DF_CFCW_SERVER;
194        settings->es_sfcw        = LSQUIC_DF_SFCW_SERVER;
195        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER;
196    }
197    else
198    {
199        settings->es_cfcw        = LSQUIC_DF_CFCW_CLIENT;
200        settings->es_sfcw        = LSQUIC_DF_SFCW_CLIENT;
201        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT;
202    }
203    settings->es_max_streams_in  = LSQUIC_DF_MAX_STREAMS_IN;
204    settings->es_idle_conn_to    = LSQUIC_DF_IDLE_CONN_TO;
205    settings->es_handshake_to    = LSQUIC_DF_HANDSHAKE_TO;
206    settings->es_silent_close    = LSQUIC_DF_SILENT_CLOSE;
207    settings->es_max_header_list_size
208                                 = LSQUIC_DF_MAX_HEADER_LIST_SIZE;
209    settings->es_ua              = LSQUIC_DF_UA;
210
211    settings->es_pdmd            = QTAG_X509;
212    settings->es_aead            = QTAG_AESG;
213    settings->es_kexs            = QTAG_C255;
214    settings->es_support_push    = LSQUIC_DF_SUPPORT_PUSH;
215    settings->es_support_tcid0   = LSQUIC_DF_SUPPORT_TCID0;
216    settings->es_support_nstp    = LSQUIC_DF_SUPPORT_NSTP;
217    settings->es_honor_prst      = LSQUIC_DF_HONOR_PRST;
218    settings->es_progress_check  = LSQUIC_DF_PROGRESS_CHECK;
219    settings->es_rw_once         = LSQUIC_DF_RW_ONCE;
220    settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
221    settings->es_pace_packets    = LSQUIC_DF_PACE_PACKETS;
222    settings->es_clock_granularity = LSQUIC_DF_CLOCK_GRANULARITY;
223}
224
225
226/* Note: if returning an error, err_buf must be valid if non-NULL */
227int
228lsquic_engine_check_settings (const struct lsquic_engine_settings *settings,
229                              unsigned flags,
230                              char *err_buf, size_t err_buf_sz)
231{
232    if (settings->es_cfcw < LSQUIC_MIN_FCW ||
233        settings->es_sfcw < LSQUIC_MIN_FCW)
234    {
235        if (err_buf)
236            snprintf(err_buf, err_buf_sz, "%s",
237                                            "flow control window set too low");
238        return -1;
239    }
240    if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS))
241    {
242        if (err_buf)
243            snprintf(err_buf, err_buf_sz, "%s",
244                        "No supported QUIC versions specified");
245        return -1;
246    }
247    if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS)
248    {
249        if (err_buf)
250            snprintf(err_buf, err_buf_sz, "%s",
251                        "one or more unsupported QUIC version is specified");
252        return -1;
253    }
254    return 0;
255}
256
257
258static void
259free_packet (void *ctx, void *conn_ctx, void *packet_data, char is_ipv6)
260{
261    free(packet_data);
262}
263
264
265static void *
266malloc_buf (void *ctx, void *conn_ctx, unsigned short size, char is_ipv6)
267{
268    return malloc(size);
269}
270
271
272static const struct lsquic_packout_mem_if stock_pmi =
273{
274    malloc_buf, free_packet, free_packet,
275};
276
277
278static int
279hash_conns_by_addr (const struct lsquic_engine *engine)
280{
281    if (engine->pub.enp_settings.es_versions & LSQUIC_FORCED_TCID0_VERSIONS)
282        return 1;
283    if ((engine->pub.enp_settings.es_versions & LSQUIC_GQUIC_HEADER_VERSIONS)
284                                && engine->pub.enp_settings.es_support_tcid0)
285        return 1;
286    return 0;
287}
288
289
290lsquic_engine_t *
291lsquic_engine_new (unsigned flags,
292                   const struct lsquic_engine_api *api)
293{
294    lsquic_engine_t *engine;
295    char err_buf[100];
296
297    if (!api->ea_packets_out)
298    {
299        LSQ_ERROR("packets_out callback is not specified");
300        return NULL;
301    }
302
303    if (api->ea_settings &&
304                0 != lsquic_engine_check_settings(api->ea_settings, flags,
305                                                    err_buf, sizeof(err_buf)))
306    {
307        LSQ_ERROR("cannot create engine: %s", err_buf);
308        return NULL;
309    }
310
311    engine = calloc(1, sizeof(*engine));
312    if (!engine)
313        return NULL;
314    if (0 != lsquic_mm_init(&engine->pub.enp_mm))
315    {
316        free(engine);
317        return NULL;
318    }
319    if (api->ea_settings)
320        engine->pub.enp_settings        = *api->ea_settings;
321    else
322        lsquic_engine_init_settings(&engine->pub.enp_settings, flags);
323    engine->pub.enp_flags = ENPUB_CAN_SEND;
324
325    engine->flags           = flags;
326    engine->stream_if       = api->ea_stream_if;
327    engine->stream_if_ctx   = api->ea_stream_if_ctx;
328    engine->packets_out     = api->ea_packets_out;
329    engine->packets_out_ctx = api->ea_packets_out_ctx;
330    if (api->ea_hsi_if)
331    {
332        engine->pub.enp_hsi_if  = api->ea_hsi_if;
333        engine->pub.enp_hsi_ctx = api->ea_hsi_ctx;
334    }
335    else
336    {
337        engine->pub.enp_hsi_if  = lsquic_http1x_if;
338        engine->pub.enp_hsi_ctx = NULL;
339    }
340    if (api->ea_pmi)
341    {
342        engine->pub.enp_pmi      = api->ea_pmi;
343        engine->pub.enp_pmi_ctx  = api->ea_pmi_ctx;
344    }
345    else
346    {
347        engine->pub.enp_pmi      = &stock_pmi;
348        engine->pub.enp_pmi_ctx  = NULL;
349    }
350    engine->pub.enp_verify_cert  = api->ea_verify_cert;
351    engine->pub.enp_verify_ctx   = api->ea_verify_ctx;
352    engine->pub.enp_engine = engine;
353    conn_hash_init(&engine->conns_hash,
354                        hash_conns_by_addr(engine) ?  CHF_USE_ADDR : 0);
355    engine->attq = attq_create();
356    eng_hist_init(&engine->history);
357    engine->batch_size = INITIAL_OUT_BATCH_SIZE;
358
359#if LSQUIC_CONN_STATS
360    engine->stats_fh = api->ea_stats_fh;
361#endif
362
363    LSQ_INFO("instantiated engine");
364    return engine;
365}
366
367
368static void
369grow_batch_size (struct lsquic_engine *engine)
370{
371    engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE;
372}
373
374
375static void
376shrink_batch_size (struct lsquic_engine *engine)
377{
378    engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE;
379}
380
381
382#if LSQUIC_CONN_STATS
383void
384update_stats_sum (struct lsquic_engine *engine, struct lsquic_conn *conn)
385{
386    unsigned long *const dst = (unsigned long *) &engine->conn_stats_sum;
387    const unsigned long *src;
388    const struct conn_stats *stats;
389    unsigned i;
390
391    if (conn->cn_if->ci_get_stats && (stats = conn->cn_if->ci_get_stats(conn)))
392    {
393        ++engine->stats.conns;
394        src = (unsigned long *) stats;
395        for (i = 0; i < sizeof(*stats) / sizeof(unsigned long); ++i)
396            dst[i] += src[i];
397    }
398}
399
400
401#endif
402
403
404/* Wrapper to make sure important things occur before the connection is
405 * really destroyed.
406 */
407static void
408destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn)
409{
410#if LSQUIC_CONN_STATS
411    update_stats_sum(engine, conn);
412#endif
413    --engine->n_conns;
414    conn->cn_flags |= LSCONN_NEVER_TICKABLE;
415    conn->cn_if->ci_destroy(conn);
416}
417
418
419static int
420maybe_grow_conn_heaps (struct lsquic_engine *engine)
421{
422    struct min_heap_elem *els;
423    unsigned count;
424
425    if (engine->n_conns < lsquic_mh_nalloc(&engine->conns_tickable))
426        return 0;   /* Nothing to do */
427
428    if (lsquic_mh_nalloc(&engine->conns_tickable))
429        count = lsquic_mh_nalloc(&engine->conns_tickable) * 2 * 2;
430    else
431        count = 8;
432
433    els = malloc(sizeof(els[0]) * count);
434    if (!els)
435    {
436        LSQ_ERROR("%s: malloc failed", __func__);
437        return -1;
438    }
439
440    LSQ_DEBUG("grew heaps to %u elements", count / 2);
441    memcpy(&els[0], engine->conns_tickable.mh_elems,
442                sizeof(els[0]) * lsquic_mh_count(&engine->conns_tickable));
443    memcpy(&els[count / 2], engine->conns_out.mh_elems,
444                sizeof(els[0]) * lsquic_mh_count(&engine->conns_out));
445    free(engine->conns_tickable.mh_elems);
446    engine->conns_tickable.mh_elems = els;
447    engine->conns_out.mh_elems = &els[count / 2];
448    engine->conns_tickable.mh_nalloc = count / 2;
449    engine->conns_out.mh_nalloc = count / 2;
450    return 0;
451}
452
453
454static lsquic_conn_t *
455new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
456                      unsigned short max_packet_size, const unsigned char *zero_rtt,
457                                                        size_t zero_rtt_len)
458{
459    lsquic_conn_t *conn;
460    unsigned flags;
461    if (0 != maybe_grow_conn_heaps(engine))
462        return NULL;
463    flags = engine->flags & (ENG_SERVER|ENG_HTTP);
464    conn = full_conn_client_new(&engine->pub, engine->stream_if,
465                                engine->stream_if_ctx, flags, hostname,
466                                max_packet_size, zero_rtt, zero_rtt_len);
467    if (!conn)
468        return NULL;
469    ++engine->n_conns;
470    return conn;
471}
472
473
474static lsquic_conn_t *
475find_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
476         struct packin_parse_state *ppstate, const struct sockaddr *sa_local)
477{
478    lsquic_conn_t *conn;
479
480    if (conn_hash_using_addr(&engine->conns_hash))
481        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
482    else if (packet_in->pi_flags & PI_CONN_ID)
483        conn = conn_hash_find_by_cid(&engine->conns_hash,
484                                                    packet_in->pi_conn_id);
485    else
486    {
487        LSQ_DEBUG("packet header does not have connection ID: discarding");
488        return NULL;
489    }
490
491    if (!conn)
492        return NULL;
493
494    conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
495    if ((packet_in->pi_flags & PI_CONN_ID)
496        && conn->cn_cid != packet_in->pi_conn_id)
497    {
498        LSQ_DEBUG("connection IDs do not match");
499        return NULL;
500    }
501
502    return conn;
503}
504
505
506#if !defined(NDEBUG) && __GNUC__
507__attribute__((weak))
508#endif
509void
510lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub,
511                                    lsquic_conn_t *conn)
512{
513    if (0 == (enpub->enp_flags & ENPUB_PROC) &&
514        0 == (conn->cn_flags & (LSCONN_TICKABLE|LSCONN_NEVER_TICKABLE)))
515    {
516        lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
517        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
518        engine_incref_conn(conn, LSCONN_TICKABLE);
519    }
520}
521
522
523void
524lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
525                                lsquic_conn_t *conn, lsquic_time_t tick_time)
526{
527    lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
528    if (conn->cn_flags & LSCONN_TICKABLE)
529    {
530        /* Optimization: no need to add the connection to the Advisory Tick
531         * Time Queue: it is about to be ticked, after which it its next tick
532         * time may be queried again.
533         */;
534    }
535    else if (conn->cn_flags & LSCONN_ATTQ)
536    {
537        if (lsquic_conn_adv_time(conn) != tick_time)
538        {
539            attq_remove(engine->attq, conn);
540            if (0 != attq_add(engine->attq, conn, tick_time))
541                engine_decref_conn(engine, conn, LSCONN_ATTQ);
542        }
543    }
544    else if (0 == attq_add(engine->attq, conn, tick_time))
545        engine_incref_conn(conn, LSCONN_ATTQ);
546}
547
548
549/* Return 0 if packet is being processed by a connections, otherwise return 1 */
550static int
551process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
552       struct packin_parse_state *ppstate, const struct sockaddr *sa_local,
553       const struct sockaddr *sa_peer, void *peer_ctx)
554{
555    lsquic_conn_t *conn;
556
557    if (lsquic_packet_in_is_gquic_prst(packet_in)
558                                && !engine->pub.enp_settings.es_honor_prst)
559    {
560        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
561        LSQ_DEBUG("public reset packet: discarding");
562        return 1;
563    }
564
565    conn = find_conn(engine, packet_in, ppstate, sa_local);
566
567    if (!conn)
568    {
569        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
570        return 1;
571    }
572
573    if (0 == (conn->cn_flags & LSCONN_TICKABLE))
574    {
575        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
576        engine_incref_conn(conn, LSCONN_TICKABLE);
577    }
578    lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
579    lsquic_packet_in_upref(packet_in);
580    conn->cn_peer_ctx = peer_ctx;
581    conn->cn_if->ci_packet_in(conn, packet_in);
582    lsquic_packet_in_put(&engine->pub.enp_mm, packet_in);
583    return 0;
584}
585
586
587void
588lsquic_engine_destroy (lsquic_engine_t *engine)
589{
590    lsquic_conn_t *conn;
591
592    LSQ_DEBUG("destroying engine");
593#ifndef NDEBUG
594    engine->flags |= ENG_DTOR;
595#endif
596
597    while ((conn = lsquic_mh_pop(&engine->conns_out)))
598    {
599        assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
600        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
601    }
602
603    while ((conn = lsquic_mh_pop(&engine->conns_tickable)))
604    {
605        assert(conn->cn_flags & LSCONN_TICKABLE);
606        (void) engine_decref_conn(engine, conn, LSCONN_TICKABLE);
607    }
608
609    for (conn = conn_hash_first(&engine->conns_hash); conn;
610                            conn = conn_hash_next(&engine->conns_hash))
611        force_close_conn(engine, conn);
612    conn_hash_cleanup(&engine->conns_hash);
613
614    assert(0 == engine->n_conns);
615    attq_destroy(engine->attq);
616
617    assert(0 == lsquic_mh_count(&engine->conns_out));
618    assert(0 == lsquic_mh_count(&engine->conns_tickable));
619    lsquic_mm_cleanup(&engine->pub.enp_mm);
620    free(engine->conns_tickable.mh_elems);
621#if LSQUIC_CONN_STATS
622    if (engine->stats_fh)
623    {
624        const struct conn_stats *const stats = &engine->conn_stats_sum;
625        fprintf(engine->stats_fh, "Aggregate connection stats collected by engine:\n");
626        fprintf(engine->stats_fh, "Connections: %u\n", engine->stats.conns);
627        fprintf(engine->stats_fh, "Ticks: %lu\n", stats->n_ticks);
628        fprintf(engine->stats_fh, "In:\n");
629        fprintf(engine->stats_fh, "    Total bytes: %lu\n", stats->in.bytes);
630        fprintf(engine->stats_fh, "    packets: %lu\n", stats->in.packets);
631        fprintf(engine->stats_fh, "    undecryptable packets: %lu\n", stats->in.undec_packets);
632        fprintf(engine->stats_fh, "    duplicate packets: %lu\n", stats->in.dup_packets);
633        fprintf(engine->stats_fh, "    error packets: %lu\n", stats->in.err_packets);
634        fprintf(engine->stats_fh, "    STREAM frame count: %lu\n", stats->in.stream_frames);
635        fprintf(engine->stats_fh, "    STREAM payload size: %lu\n", stats->in.stream_data_sz);
636        fprintf(engine->stats_fh, "    Header bytes: %lu; uncompressed: %lu; ratio %.3lf\n",
637            stats->in.headers_comp, stats->in.headers_uncomp,
638            stats->in.headers_uncomp ?
639            (double) stats->in.headers_comp / (double) stats->in.headers_uncomp
640            : 0);
641        fprintf(engine->stats_fh, "    ACK frames: %lu\n", stats->in.n_acks);
642        fprintf(engine->stats_fh, "    ACK frames processed: %lu\n", stats->in.n_acks_proc);
643        fprintf(engine->stats_fh, "    ACK frames merged to new: %lu\n", stats->in.n_acks_merged[0]);
644        fprintf(engine->stats_fh, "    ACK frames merged to old: %lu\n", stats->in.n_acks_merged[1]);
645        fprintf(engine->stats_fh, "Out:\n");
646        fprintf(engine->stats_fh, "    Total bytes: %lu\n", stats->out.bytes);
647        fprintf(engine->stats_fh, "    packets: %lu\n", stats->out.packets);
648        fprintf(engine->stats_fh, "    retx packets: %lu\n", stats->out.retx_packets);
649        fprintf(engine->stats_fh, "    STREAM frame count: %lu\n", stats->out.stream_frames);
650        fprintf(engine->stats_fh, "    STREAM payload size: %lu\n", stats->out.stream_data_sz);
651        fprintf(engine->stats_fh, "    Header bytes: %lu; uncompressed: %lu; ratio %.3lf\n",
652            stats->out.headers_comp, stats->out.headers_uncomp,
653            stats->out.headers_uncomp ?
654            (double) stats->out.headers_comp / (double) stats->out.headers_uncomp
655            : 0);
656        fprintf(engine->stats_fh, "    ACKs: %lu\n", stats->out.acks);
657    }
658#endif
659    free(engine);
660}
661
662
663lsquic_conn_t *
664lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *local_sa,
665                       const struct sockaddr *peer_sa,
666                       void *peer_ctx, lsquic_conn_ctx_t *conn_ctx,
667                       const char *hostname, unsigned short max_packet_size,
668                       const unsigned char *zero_rtt, size_t zero_rtt_len)
669{
670    lsquic_conn_t *conn;
671    ENGINE_IN(engine);
672
673    if (engine->flags & ENG_SERVER)
674    {
675        LSQ_ERROR("`%s' must only be called in client mode", __func__);
676        goto err;
677    }
678
679    if (conn_hash_using_addr(&engine->conns_hash)
680                && conn_hash_find_by_addr(&engine->conns_hash, local_sa))
681    {
682        LSQ_ERROR("cannot have more than one connection on the same port");
683        goto err;
684    }
685
686    if (0 == max_packet_size)
687    {
688        switch (peer_sa->sa_family)
689        {
690        case AF_INET:
691            max_packet_size = QUIC_MAX_IPv4_PACKET_SZ;
692            break;
693        default:
694            max_packet_size = QUIC_MAX_IPv6_PACKET_SZ;
695            break;
696        }
697    }
698
699    conn = new_full_conn_client(engine, hostname, max_packet_size,
700                                                    zero_rtt, zero_rtt_len);
701    if (!conn)
702        goto err;
703    lsquic_conn_record_sockaddr(conn, local_sa, peer_sa);
704    if (0 != conn_hash_add(&engine->conns_hash, conn))
705    {
706        LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
707            conn->cn_cid);
708        destroy_conn(engine, conn);
709        goto err;
710    }
711    assert(!(conn->cn_flags &
712        (CONN_REF_FLAGS
713         & ~LSCONN_TICKABLE /* This flag may be set as effect of user
714                                 callbacks */
715                             )));
716    conn->cn_flags |= LSCONN_HASHED;
717    lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
718    engine_incref_conn(conn, LSCONN_TICKABLE);
719    conn->cn_peer_ctx = peer_ctx;
720    lsquic_conn_set_ctx(conn, conn_ctx);
721    full_conn_client_call_on_new(conn);
722  end:
723    ENGINE_OUT(engine);
724    return conn;
725  err:
726    conn = NULL;
727    goto end;
728}
729
730
731static void
732remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
733{
734    conn_hash_remove(&engine->conns_hash, conn);
735    (void) engine_decref_conn(engine, conn, LSCONN_HASHED);
736}
737
738
739static void
740refflags2str (enum lsquic_conn_flags flags, char s[6])
741{
742    *s = 'C'; s += !!(flags & LSCONN_CLOSING);
743    *s = 'H'; s += !!(flags & LSCONN_HASHED);
744    *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
745    *s = 'T'; s += !!(flags & LSCONN_TICKABLE);
746    *s = 'A'; s += !!(flags & LSCONN_ATTQ);
747    *s = 'K'; s += !!(flags & LSCONN_TICKED);
748    *s = '\0';
749}
750
751
752static void
753engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
754{
755    char str[2][7];
756    assert(flag & CONN_REF_FLAGS);
757    assert(!(conn->cn_flags & flag));
758    conn->cn_flags |= flag;
759    LSQ_DEBUG("incref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
760                    (refflags2str(conn->cn_flags & ~flag, str[0]), str[0]),
761                    (refflags2str(conn->cn_flags, str[1]), str[1]));
762}
763
764
765static lsquic_conn_t *
766engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
767                                        enum lsquic_conn_flags flags)
768{
769    char str[2][7];
770    assert(flags & CONN_REF_FLAGS);
771    assert(conn->cn_flags & flags);
772#ifndef NDEBUG
773    if (flags & LSCONN_CLOSING)
774        assert(0 == (conn->cn_flags & LSCONN_HASHED));
775#endif
776    conn->cn_flags &= ~flags;
777    LSQ_DEBUG("decref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
778                    (refflags2str(conn->cn_flags | flags, str[0]), str[0]),
779                    (refflags2str(conn->cn_flags, str[1]), str[1]));
780    if (0 == (conn->cn_flags & CONN_REF_FLAGS))
781    {
782        eng_hist_inc(&engine->history, 0, sl_del_full_conns);
783        destroy_conn(engine, conn);
784        return NULL;
785    }
786    else
787        return conn;
788}
789
790
791/* This is not a general-purpose function.  Only call from engine dtor. */
792static void
793force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
794{
795    assert(engine->flags & ENG_DTOR);
796    const enum lsquic_conn_flags flags = conn->cn_flags;
797    assert(conn->cn_flags & CONN_REF_FLAGS);
798    assert(!(flags & LSCONN_HAS_OUTGOING));  /* Should be removed already */
799    assert(!(flags & LSCONN_TICKABLE));    /* Should be removed already */
800    assert(!(flags & LSCONN_CLOSING));  /* It is in transient queue? */
801    if (flags & LSCONN_ATTQ)
802    {
803        attq_remove(engine->attq, conn);
804        (void) engine_decref_conn(engine, conn, LSCONN_ATTQ);
805    }
806    if (flags & LSCONN_HASHED)
807        remove_conn_from_hash(engine, conn);
808}
809
810
811/* Iterator for tickable connections (those on the Tickable Queue).  Before
812 * a connection is returned, it is removed from the Advisory Tick Time queue
813 * if necessary.
814 */
815static lsquic_conn_t *
816conn_iter_next_tickable (struct lsquic_engine *engine)
817{
818    lsquic_conn_t *conn;
819
820    conn = lsquic_mh_pop(&engine->conns_tickable);
821
822    if (conn)
823        conn = engine_decref_conn(engine, conn, LSCONN_TICKABLE);
824    if (conn && (conn->cn_flags & LSCONN_ATTQ))
825    {
826        attq_remove(engine->attq, conn);
827        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
828    }
829
830    return conn;
831}
832
833
834void
835lsquic_engine_process_conns (lsquic_engine_t *engine)
836{
837    lsquic_conn_t *conn;
838    lsquic_time_t now;
839
840    ENGINE_IN(engine);
841
842    now = lsquic_time_now();
843    while ((conn = attq_pop(engine->attq, now)))
844    {
845        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
846        if (conn && !(conn->cn_flags & LSCONN_TICKABLE))
847        {
848            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
849            engine_incref_conn(conn, LSCONN_TICKABLE);
850        }
851    }
852
853    process_connections(engine, conn_iter_next_tickable, now);
854    ENGINE_OUT(engine);
855}
856
857
858static ssize_t
859really_encrypt_packet (const lsquic_conn_t *conn,
860                       struct lsquic_packet_out *packet_out,
861                       unsigned char *buf, size_t bufsz)
862{
863    int header_sz, is_hello_packet;
864    enum enc_level enc_level;
865    size_t packet_sz;
866    unsigned char header_buf[QUIC_MAX_PUBHDR_SZ];
867
868    header_sz = conn->cn_pf->pf_gen_reg_pkt_header(conn, packet_out,
869                                            header_buf, sizeof(header_buf));
870    if (header_sz < 0)
871        return -1;
872
873    is_hello_packet = !!(packet_out->po_flags & PO_HELLO);
874    enc_level = conn->cn_esf->esf_encrypt(conn->cn_enc_session,
875                conn->cn_version, 0,
876                packet_out->po_packno, header_buf, header_sz,
877                packet_out->po_data, packet_out->po_data_sz,
878                buf, bufsz, &packet_sz, is_hello_packet);
879    if ((int) enc_level >= 0)
880    {
881        lsquic_packet_out_set_enc_level(packet_out, enc_level);
882        LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %zu bytes, "
883            "ciphertext is %zd bytes",
884            packet_out->po_packno,
885            conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) +
886                                                packet_out->po_data_sz,
887            packet_sz);
888        return packet_sz;
889    }
890    else
891        return -1;
892}
893
894
895static int
896conn_peer_ipv6 (const struct lsquic_conn *conn)
897{
898    return AF_INET6 == conn->cn_peer_addr_u.sa.sa_family;
899}
900
901
902static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, }
903encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
904                                            lsquic_packet_out_t *packet_out)
905{
906    ssize_t enc_sz;
907    size_t bufsz;
908    unsigned sent_sz;
909    unsigned char *buf;
910    int ipv6;
911
912    bufsz = conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) +
913                                packet_out->po_data_sz + QUIC_PACKET_HASH_SZ;
914    if (bufsz > USHRT_MAX)
915        return ENCPA_BADCRYPT;  /* To cause connection to close */
916    ipv6 = conn_peer_ipv6(conn);
917    buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx,
918                                            conn->cn_peer_ctx, bufsz, ipv6);
919    if (!buf)
920    {
921        LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd",
922                                                                        bufsz);
923        return ENCPA_NOMEM;
924    }
925
926    {
927        enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz);
928        sent_sz = enc_sz;
929    }
930
931    if (enc_sz < 0)
932    {
933        engine->pub.enp_pmi->pmi_return(engine->pub.enp_pmi_ctx,
934                                                conn->cn_peer_ctx, buf, ipv6);
935        return ENCPA_BADCRYPT;
936    }
937
938    packet_out->po_enc_data    = buf;
939    packet_out->po_enc_data_sz = enc_sz;
940    packet_out->po_sent_sz     = sent_sz;
941    packet_out->po_flags &= ~PO_IPv6;
942    packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ|(ipv6 << POIPv6_SHIFT);
943
944    return ENCPA_OK;
945}
946
947
948static void
949release_or_return_enc_data (struct lsquic_engine *engine,
950                void (*pmi_rel_or_ret) (void *, void *, void *, char),
951                struct lsquic_conn *conn, struct lsquic_packet_out *packet_out)
952{
953    pmi_rel_or_ret(engine->pub.enp_pmi_ctx, conn->cn_peer_ctx,
954                packet_out->po_enc_data, lsquic_packet_out_ipv6(packet_out));
955    packet_out->po_flags &= ~PO_ENCRYPTED;
956    packet_out->po_enc_data = NULL;
957}
958
959
960static void
961release_enc_data (struct lsquic_engine *engine, struct lsquic_conn *conn,
962                                        struct lsquic_packet_out *packet_out)
963{
964    release_or_return_enc_data(engine, engine->pub.enp_pmi->pmi_release,
965                                conn, packet_out);
966}
967
968
969static void
970return_enc_data (struct lsquic_engine *engine, struct lsquic_conn *conn,
971                                        struct lsquic_packet_out *packet_out)
972{
973    release_or_return_enc_data(engine, engine->pub.enp_pmi->pmi_return,
974                                conn, packet_out);
975}
976
977
978STAILQ_HEAD(conns_stailq, lsquic_conn);
979TAILQ_HEAD(conns_tailq, lsquic_conn);
980
981
982struct conns_out_iter
983{
984    struct min_heap            *coi_heap;
985    TAILQ_HEAD(, lsquic_conn)   coi_active_list,
986                                coi_inactive_list;
987    lsquic_conn_t              *coi_next;
988#ifndef NDEBUG
989    lsquic_time_t               coi_last_sent;
990#endif
991};
992
993
994static void
995coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine)
996{
997    iter->coi_heap = &engine->conns_out;
998    iter->coi_next = NULL;
999    TAILQ_INIT(&iter->coi_active_list);
1000    TAILQ_INIT(&iter->coi_inactive_list);
1001#ifndef NDEBUG
1002    iter->coi_last_sent = 0;
1003#endif
1004}
1005
1006
1007static lsquic_conn_t *
1008coi_next (struct conns_out_iter *iter)
1009{
1010    lsquic_conn_t *conn;
1011
1012    if (lsquic_mh_count(iter->coi_heap) > 0)
1013    {
1014        conn = lsquic_mh_pop(iter->coi_heap);
1015        TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
1016        conn->cn_flags |= LSCONN_COI_ACTIVE;
1017#ifndef NDEBUG
1018        if (iter->coi_last_sent)
1019            assert(iter->coi_last_sent <= conn->cn_last_sent);
1020        iter->coi_last_sent = conn->cn_last_sent;
1021#endif
1022        return conn;
1023    }
1024    else if (!TAILQ_EMPTY(&iter->coi_active_list))
1025    {
1026        conn = iter->coi_next;
1027        if (!conn)
1028            conn = TAILQ_FIRST(&iter->coi_active_list);
1029        if (conn)
1030            iter->coi_next = TAILQ_NEXT(conn, cn_next_out);
1031        return conn;
1032    }
1033    else
1034        return NULL;
1035}
1036
1037
1038static void
1039coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
1040{
1041    if (!(conn->cn_flags & LSCONN_EVANESCENT))
1042    {
1043        assert(!TAILQ_EMPTY(&iter->coi_active_list));
1044        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1045        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1046        TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out);
1047        conn->cn_flags |= LSCONN_COI_INACTIVE;
1048    }
1049}
1050
1051
1052static void
1053coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
1054{
1055    assert(conn->cn_flags & LSCONN_COI_INACTIVE);
1056    TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1057    conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1058    TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
1059    conn->cn_flags |= LSCONN_COI_ACTIVE;
1060}
1061
1062
1063static void
1064coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
1065{
1066    lsquic_conn_t *conn;
1067    while ((conn = TAILQ_FIRST(&iter->coi_active_list)))
1068    {
1069        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1070        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1071        lsquic_mh_insert(iter->coi_heap, conn, conn->cn_last_sent);
1072    }
1073    while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
1074    {
1075        TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1076        conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1077        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
1078    }
1079}
1080
1081
1082static unsigned
1083send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter,
1084                  struct out_batch *batch, unsigned n_to_send)
1085{
1086    int n_sent, i;
1087    lsquic_time_t now;
1088
1089    /* Set sent time before the write to avoid underestimating RTT */
1090    now = lsquic_time_now();
1091    for (i = 0; i < (int) n_to_send; ++i)
1092        batch->packets[i]->po_sent = now;
1093    n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs,
1094                                                                n_to_send);
1095    if (n_sent < (int) n_to_send)
1096    {
1097        engine->pub.enp_flags &= ~ENPUB_CAN_SEND;
1098        engine->resume_sending_at = now + 1000000;
1099        LSQ_DEBUG("cannot send packets");
1100        EV_LOG_GENERIC_EVENT("cannot send packets");
1101    }
1102    if (n_sent >= 0)
1103        LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send);
1104    else
1105    {
1106        LSQ_DEBUG("packets out returned an error: %s", strerror(errno));
1107        n_sent = 0;
1108    }
1109    if (n_sent > 0)
1110        engine->last_sent = now + n_sent;
1111    for (i = 0; i < n_sent; ++i)
1112    {
1113        eng_hist_inc(&engine->history, now, sl_packets_out);
1114        EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1115        batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i],
1116                                                    batch->packets[i]);
1117        /* `i' is added to maintain relative order */
1118        batch->conns[i]->cn_last_sent = now + i;
1119        /* Release packet out buffer as soon as the packet is sent
1120         * successfully.  If not successfully sent, we hold on to
1121         * this buffer until the packet sending is attempted again
1122         * or until it times out and regenerated.
1123         */
1124        if (batch->packets[i]->po_flags & PO_ENCRYPTED)
1125            release_enc_data(engine, batch->conns[i], batch->packets[i]);
1126    }
1127    if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT))
1128        for ( ; i < (int) n_to_send; ++i)
1129            EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1130    /* Return packets to the connection in reverse order so that the packet
1131     * ordering is maintained.
1132     */
1133    for (i = (int) n_to_send - 1; i >= n_sent; --i)
1134    {
1135        batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i],
1136                                                    batch->packets[i]);
1137        if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT)))
1138            coi_reactivate(conns_iter, batch->conns[i]);
1139    }
1140    return n_sent;
1141}
1142
1143
1144/* Return 1 if went past deadline, 0 otherwise */
1145static int
1146check_deadline (lsquic_engine_t *engine)
1147{
1148    if (engine->pub.enp_settings.es_proc_time_thresh &&
1149                                lsquic_time_now() > engine->deadline)
1150    {
1151        LSQ_INFO("went past threshold of %u usec, stop sending",
1152                            engine->pub.enp_settings.es_proc_time_thresh);
1153        engine->flags |= ENG_PAST_DEADLINE;
1154        return 1;
1155    }
1156    else
1157        return 0;
1158}
1159
1160
1161static void
1162send_packets_out (struct lsquic_engine *engine,
1163                  struct conns_tailq *ticked_conns,
1164                  struct conns_stailq *closed_conns)
1165{
1166    unsigned n, w, n_sent, n_batches_sent;
1167    lsquic_packet_out_t *packet_out;
1168    lsquic_conn_t *conn;
1169    struct out_batch *const batch = &engine->out_batch;
1170    struct conns_out_iter conns_iter;
1171    int shrink, deadline_exceeded;
1172
1173    coi_init(&conns_iter, engine);
1174    n_batches_sent = 0;
1175    n_sent = 0, n = 0;
1176    shrink = 0;
1177    deadline_exceeded = 0;
1178
1179    while ((conn = coi_next(&conns_iter)))
1180    {
1181        packet_out = conn->cn_if->ci_next_packet_to_send(conn);
1182        if (!packet_out) {
1183            LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64,
1184                                                            conn->cn_cid);
1185            coi_deactivate(&conns_iter, conn);
1186            continue;
1187        }
1188        if ((packet_out->po_flags & PO_ENCRYPTED)
1189                && lsquic_packet_out_ipv6(packet_out) != conn_peer_ipv6(conn))
1190        {
1191            /* Peer address changed since the packet was encrypted.  Need to
1192             * reallocate.
1193             */
1194            return_enc_data(engine, conn, packet_out);
1195        }
1196        if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT)))
1197        {
1198            switch (encrypt_packet(engine, conn, packet_out))
1199            {
1200            case ENCPA_NOMEM:
1201                /* Send what we have and wait for a more opportune moment */
1202                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1203                goto end_for;
1204            case ENCPA_BADCRYPT:
1205                /* This is pretty bad: close connection immediately */
1206                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1207                LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid);
1208                if (!(conn->cn_flags & LSCONN_EVANESCENT))
1209                {
1210                    if (!(conn->cn_flags & LSCONN_CLOSING))
1211                    {
1212                        STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn);
1213                        engine_incref_conn(conn, LSCONN_CLOSING);
1214                        if (conn->cn_flags & LSCONN_HASHED)
1215                            remove_conn_from_hash(engine, conn);
1216                    }
1217                    coi_deactivate(&conns_iter, conn);
1218                    if (conn->cn_flags & LSCONN_TICKED)
1219                    {
1220                        TAILQ_REMOVE(ticked_conns, conn, cn_next_ticked);
1221                        engine_decref_conn(engine, conn, LSCONN_TICKED);
1222                    }
1223                }
1224                continue;
1225            case ENCPA_OK:
1226                break;
1227            }
1228        }
1229        LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64,
1230                                        packet_out->po_packno, conn->cn_cid);
1231        assert(conn->cn_flags & LSCONN_HAS_PEER_SA);
1232        if (packet_out->po_flags & PO_ENCRYPTED)
1233        {
1234            batch->outs[n].buf     = packet_out->po_enc_data;
1235            batch->outs[n].sz      = packet_out->po_enc_data_sz;
1236        }
1237        else
1238        {
1239            batch->outs[n].buf     = packet_out->po_data;
1240            batch->outs[n].sz      = packet_out->po_data_sz;
1241        }
1242        batch->outs   [n].peer_ctx = conn->cn_peer_ctx;
1243        batch->outs   [n].local_sa = (struct sockaddr *) conn->cn_local_addr;
1244        batch->outs   [n].dest_sa  = (struct sockaddr *) conn->cn_peer_addr;
1245        batch->conns  [n]          = conn;
1246        batch->packets[n]          = packet_out;
1247        ++n;
1248        if (n == engine->batch_size)
1249        {
1250            n = 0;
1251            w = send_batch(engine, &conns_iter, batch, engine->batch_size);
1252            ++n_batches_sent;
1253            n_sent += w;
1254            if (w < engine->batch_size)
1255            {
1256                shrink = 1;
1257                break;
1258            }
1259            deadline_exceeded = check_deadline(engine);
1260            if (deadline_exceeded)
1261                break;
1262            grow_batch_size(engine);
1263        }
1264    }
1265  end_for:
1266
1267    if (n > 0) {
1268        w = send_batch(engine, &conns_iter, batch, n);
1269        n_sent += w;
1270        shrink = w < n;
1271        ++n_batches_sent;
1272        deadline_exceeded = check_deadline(engine);
1273    }
1274
1275    if (shrink)
1276        shrink_batch_size(engine);
1277    else if (n_batches_sent > 1 && !deadline_exceeded)
1278        grow_batch_size(engine);
1279
1280    coi_reheap(&conns_iter, engine);
1281
1282    LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s");
1283}
1284
1285
1286int
1287lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
1288{
1289    return lsquic_mh_count(&engine->conns_out) > 0
1290    ;
1291}
1292
1293
1294static void
1295reset_deadline (lsquic_engine_t *engine, lsquic_time_t now)
1296{
1297    engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh;
1298    engine->flags &= ~ENG_PAST_DEADLINE;
1299}
1300
1301
1302/* TODO: this is a user-facing function, account for load */
1303void
1304lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
1305{
1306    lsquic_conn_t *conn;
1307    struct conns_stailq closed_conns;
1308    struct conns_tailq ticked_conns = TAILQ_HEAD_INITIALIZER(ticked_conns);
1309
1310    STAILQ_INIT(&closed_conns);
1311    reset_deadline(engine, lsquic_time_now());
1312    if (!(engine->pub.enp_flags & ENPUB_CAN_SEND))
1313    {
1314        LSQ_DEBUG("can send again");
1315        EV_LOG_GENERIC_EVENT("can send again");
1316        engine->pub.enp_flags |= ENPUB_CAN_SEND;
1317    }
1318
1319    send_packets_out(engine, &ticked_conns, &closed_conns);
1320
1321    while ((conn = STAILQ_FIRST(&closed_conns))) {
1322        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1323        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1324    }
1325
1326}
1327
1328
1329static void
1330process_connections (lsquic_engine_t *engine, conn_iter_f next_conn,
1331                     lsquic_time_t now)
1332{
1333    lsquic_conn_t *conn;
1334    enum tick_st tick_st;
1335    unsigned i;
1336    lsquic_time_t next_tick_time;
1337    struct conns_stailq closed_conns;
1338    struct conns_tailq ticked_conns;
1339
1340    eng_hist_tick(&engine->history, now);
1341
1342    STAILQ_INIT(&closed_conns);
1343    TAILQ_INIT(&ticked_conns);
1344    reset_deadline(engine, now);
1345
1346    if (!(engine->pub.enp_flags & ENPUB_CAN_SEND)
1347                                        && now > engine->resume_sending_at)
1348    {
1349        LSQ_NOTICE("failsafe activated: resume sending packets again after "
1350                    "timeout");
1351        EV_LOG_GENERIC_EVENT("resume sending packets again after timeout");
1352        engine->pub.enp_flags |= ENPUB_CAN_SEND;
1353    }
1354
1355    i = 0;
1356    while ((conn = next_conn(engine))
1357          )
1358    {
1359        tick_st = conn->cn_if->ci_tick(conn, now);
1360        conn->cn_last_ticked = now + i /* Maintain relative order */ ++;
1361        if (tick_st & TICK_SEND)
1362        {
1363            if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
1364            {
1365                lsquic_mh_insert(&engine->conns_out, conn, conn->cn_last_sent);
1366                engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
1367            }
1368        }
1369        if (tick_st & TICK_CLOSE)
1370        {
1371            STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn);
1372            engine_incref_conn(conn, LSCONN_CLOSING);
1373            if (conn->cn_flags & LSCONN_HASHED)
1374                remove_conn_from_hash(engine, conn);
1375        }
1376        else
1377        {
1378            TAILQ_INSERT_TAIL(&ticked_conns, conn, cn_next_ticked);
1379            engine_incref_conn(conn, LSCONN_TICKED);
1380        }
1381    }
1382
1383    if ((engine->pub.enp_flags & ENPUB_CAN_SEND)
1384                        && lsquic_engine_has_unsent_packets(engine))
1385        send_packets_out(engine, &ticked_conns, &closed_conns);
1386
1387    while ((conn = STAILQ_FIRST(&closed_conns))) {
1388        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1389        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1390    }
1391
1392    /* TODO Heapification can be optimized by switching to the Floyd method:
1393     * https://en.wikipedia.org/wiki/Binary_heap#Building_a_heap
1394     */
1395    while ((conn = TAILQ_FIRST(&ticked_conns)))
1396    {
1397        TAILQ_REMOVE(&ticked_conns, conn, cn_next_ticked);
1398        engine_decref_conn(engine, conn, LSCONN_TICKED);
1399        if (!(conn->cn_flags & LSCONN_TICKABLE)
1400            && conn->cn_if->ci_is_tickable(conn))
1401        {
1402            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
1403            engine_incref_conn(conn, LSCONN_TICKABLE);
1404        }
1405        else if (!(conn->cn_flags & LSCONN_ATTQ))
1406        {
1407            next_tick_time = conn->cn_if->ci_next_tick_time(conn);
1408            if (next_tick_time)
1409            {
1410                if (0 == attq_add(engine->attq, conn, next_tick_time))
1411                    engine_incref_conn(conn, LSCONN_ATTQ);
1412            }
1413            else
1414                assert(0);
1415        }
1416    }
1417
1418}
1419
1420
1421/* Return 0 if packet is being processed by a real connection, 1 if the
1422 * packet was processed, but not by a connection, and -1 on error.
1423 */
1424int
1425lsquic_engine_packet_in (lsquic_engine_t *engine,
1426    const unsigned char *packet_in_data, size_t packet_in_size,
1427    const struct sockaddr *sa_local, const struct sockaddr *sa_peer,
1428    void *peer_ctx)
1429{
1430    struct packin_parse_state ppstate;
1431    lsquic_packet_in_t *packet_in;
1432    int (*parse_packet_in_begin) (struct lsquic_packet_in *, size_t length,
1433                                int is_server, struct packin_parse_state *);
1434
1435    if (packet_in_size > QUIC_MAX_PACKET_SZ)
1436    {
1437        LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming "
1438            "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ);
1439        errno = E2BIG;
1440        return -1;
1441    }
1442
1443    if (conn_hash_using_addr(&engine->conns_hash))
1444    {
1445        const struct lsquic_conn *conn;
1446        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
1447        if (!conn)
1448            return -1;
1449        if ((1 << conn->cn_version) & LSQUIC_GQUIC_HEADER_VERSIONS)
1450            parse_packet_in_begin = lsquic_gquic_parse_packet_in_begin;
1451        else
1452            parse_packet_in_begin = lsquic_iquic_parse_packet_in_begin;
1453    }
1454    else
1455        parse_packet_in_begin = lsquic_parse_packet_in_begin;
1456
1457    packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm);
1458    if (!packet_in)
1459        return -1;
1460
1461    /* Library does not modify packet_in_data, it is not referenced after
1462     * this function returns and subsequent release of pi_data is guarded
1463     * by PI_OWN_DATA flag.
1464     */
1465    packet_in->pi_data = (unsigned char *) packet_in_data;
1466    if (0 != parse_packet_in_begin(packet_in, packet_in_size,
1467                                        engine->flags & ENG_SERVER, &ppstate))
1468    {
1469        LSQ_DEBUG("Cannot parse incoming packet's header");
1470        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
1471        errno = EINVAL;
1472        return -1;
1473    }
1474
1475    packet_in->pi_received = lsquic_time_now();
1476    eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in);
1477    return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer,
1478                                                                    peer_ctx);
1479}
1480
1481
1482#if __GNUC__ && !defined(NDEBUG)
1483__attribute__((weak))
1484#endif
1485unsigned
1486lsquic_engine_quic_versions (const lsquic_engine_t *engine)
1487{
1488    return engine->pub.enp_settings.es_versions;
1489}
1490
1491
1492int
1493lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
1494{
1495    const lsquic_time_t *next_attq_time;
1496    lsquic_time_t now, next_time;
1497
1498    if (((engine->flags & ENG_PAST_DEADLINE)
1499                                    && lsquic_mh_count(&engine->conns_out))
1500        || lsquic_mh_count(&engine->conns_tickable))
1501    {
1502        *diff = 0;
1503        return 1;
1504    }
1505
1506    next_attq_time = attq_next_time(engine->attq);
1507    if (engine->pub.enp_flags & ENPUB_CAN_SEND)
1508    {
1509        if (next_attq_time)
1510            next_time = *next_attq_time;
1511        else
1512            return 0;
1513    }
1514    else
1515    {
1516        if (next_attq_time)
1517            next_time = MIN(*next_attq_time, engine->resume_sending_at);
1518        else
1519            next_time = engine->resume_sending_at;
1520    }
1521
1522    now = lsquic_time_now();
1523    *diff = (int) ((int64_t) next_time - (int64_t) now);
1524    return 1;
1525}
1526
1527
1528unsigned
1529lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now)
1530{
1531    lsquic_time_t now;
1532    now = lsquic_time_now();
1533    if (from_now < 0)
1534        now -= from_now;
1535    else
1536        now += from_now;
1537    return attq_count_before(engine->attq, now);
1538}
1539
1540
1541