lsquic_engine.c revision 8252b0b9
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_engine.c - QUIC engine
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <limits.h>
10#include <stdint.h>
11#include <stdio.h>
12#include <stdlib.h>
13#include <string.h>
14#include <sys/queue.h>
15#include <time.h>
16#ifndef WIN32
17#include <sys/time.h>
18#include <netinet/in.h>
19#include <sys/types.h>
20#include <sys/stat.h>
21#include <fcntl.h>
22#include <unistd.h>
23#include <netdb.h>
24#endif
25
26
27
28#include "lsquic.h"
29#include "lsquic_types.h"
30#include "lsquic_alarmset.h"
31#include "lsquic_parse_common.h"
32#include "lsquic_parse.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_packet_out.h"
35#include "lsquic_senhist.h"
36#include "lsquic_rtt.h"
37#include "lsquic_cubic.h"
38#include "lsquic_pacer.h"
39#include "lsquic_send_ctl.h"
40#include "lsquic_set.h"
41#include "lsquic_conn_flow.h"
42#include "lsquic_sfcw.h"
43#include "lsquic_stream.h"
44#include "lsquic_conn.h"
45#include "lsquic_full_conn.h"
46#include "lsquic_util.h"
47#include "lsquic_qtags.h"
48#include "lsquic_str.h"
49#include "lsquic_handshake.h"
50#include "lsquic_mm.h"
51#include "lsquic_conn_hash.h"
52#include "lsquic_engine_public.h"
53#include "lsquic_eng_hist.h"
54#include "lsquic_ev_log.h"
55#include "lsquic_version.h"
56#include "lsquic_hash.h"
57#include "lsquic_attq.h"
58#include "lsquic_min_heap.h"
59#include "lsquic_http1x_if.h"
60
61#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
62#include "lsquic_logger.h"
63
/* Smaller of two values.  Either argument may be evaluated twice, so do
 * not pass expressions that have side effects.
 */
#define MIN(a, b) ((a) < (b) ? (a) : (b))


/* The batch of outgoing packets grows and shrinks dynamically between
 * MIN_OUT_BATCH_SIZE and MAX_OUT_BATCH_SIZE; a new engine starts out at
 * INITIAL_OUT_BATCH_SIZE.
 */
#define MIN_OUT_BATCH_SIZE      256
#define INITIAL_OUT_BATCH_SIZE  512
#define MAX_OUT_BATCH_SIZE      1024
71
72struct out_batch
73{
74    lsquic_conn_t           *conns  [MAX_OUT_BATCH_SIZE];
75    lsquic_packet_out_t     *packets[MAX_OUT_BATCH_SIZE];
76    struct lsquic_out_spec   outs   [MAX_OUT_BATCH_SIZE];
77};
78
79typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);
80
81static void
82process_connections (struct lsquic_engine *engine, conn_iter_f iter,
83                     lsquic_time_t now);
84
85static void
86engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);
87
88static lsquic_conn_t *
89engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
90                                        enum lsquic_conn_flags flag);
91
92static void
93force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
94
/* Nested calls to LSQUIC are not supported.  ENGINE_IN() marks the engine
 * as being inside a call by setting ENPUB_PROC and ENGINE_OUT() clears the
 * mark again.  When assertions are enabled, each macro first verifies that
 * the flag is in the expected state.
 */
#define ENGINE_IN(eng) do {                                     \
    assert(0 == ((eng)->pub.enp_flags & ENPUB_PROC));           \
    (eng)->pub.enp_flags |= ENPUB_PROC;                         \
} while (0)

#define ENGINE_OUT(eng) do {                                    \
    assert(0 != ((eng)->pub.enp_flags & ENPUB_PROC));           \
    (eng)->pub.enp_flags &= ~ENPUB_PROC;                        \
} while (0)
105
106/* A connection can be referenced from one of six places:
107 *
108 *   1. Connection hash: a connection starts its life in one of those.
109 *
110 *   2. Outgoing queue.
111 *
112 *   3. Tickable queue
113 *
114 *   4. Advisory Tick Time queue.
115 *
116 *   5. Closing connections queue.  This is a transient queue -- it only
117 *      exists for the duration of process_connections() function call.
118 *
119 *   6. Ticked connections queue.  Another transient queue, similar to (5).
120 *
121 * The idea is to destroy the connection when it is no longer referenced.
122 * For example, a connection tick may return TICK_SEND|TICK_CLOSE.  In
123 * that case, the connection is referenced from two places: (2) and (5).
124 * After its packets are sent, it is only referenced in (5), and at the
125 * end of the function call, when it is removed from (5), reference count
126 * goes to zero and the connection is destroyed.  If not all packets can
127 * be sent, at the end of the function call, the connection is referenced
128 * by (2) and will only be removed once all outgoing packets have been
129 * sent.
130 */
/* Every connection flag that counts as a reference.  A connection is
 * destroyed by engine_decref_conn() once none of these remain set.
 */
#define CONN_REF_FLAGS  (LSCONN_HASHED   | LSCONN_HAS_OUTGOING   \
                        |LSCONN_TICKABLE | LSCONN_TICKED         \
                        |LSCONN_CLOSING  | LSCONN_ATTQ)
137
138
139
140
141struct lsquic_engine
142{
143    struct lsquic_engine_public        pub;
144    enum {
145        ENG_SERVER      = LSENG_SERVER,
146        ENG_HTTP        = LSENG_HTTP,
147        ENG_COOLDOWN    = (1 <<  7),    /* Cooldown: no new connections */
148        ENG_PAST_DEADLINE
149                        = (1 <<  8),    /* Previous call to a processing
150                                         * function went past time threshold.
151                                         */
152#ifndef NDEBUG
153        ENG_DTOR        = (1 << 26),    /* Engine destructor */
154#endif
155    }                                  flags;
156    const struct lsquic_stream_if     *stream_if;
157    void                              *stream_if_ctx;
158    lsquic_packets_out_f               packets_out;
159    void                              *packets_out_ctx;
160    void                              *bad_handshake_ctx;
161    struct conn_hash                   conns_hash;
162    struct min_heap                    conns_tickable;
163    struct min_heap                    conns_out;
164    struct eng_hist                    history;
165    unsigned                           batch_size;
166    struct attq                       *attq;
167    /* Track time last time a packet was sent to give new connections
168     * priority lower than that of existing connections.
169     */
170    lsquic_time_t                      last_sent;
171    unsigned                           n_conns;
172    lsquic_time_t                      deadline;
173    lsquic_time_t                      resume_sending_at;
174    struct out_batch                   out_batch;
175};
176
177
178void
179lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
180                             unsigned flags)
181{
182    memset(settings, 0, sizeof(*settings));
183    settings->es_versions        = LSQUIC_DF_VERSIONS;
184    if (flags & ENG_SERVER)
185    {
186        settings->es_cfcw        = LSQUIC_DF_CFCW_SERVER;
187        settings->es_sfcw        = LSQUIC_DF_SFCW_SERVER;
188        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER;
189    }
190    else
191    {
192        settings->es_cfcw        = LSQUIC_DF_CFCW_CLIENT;
193        settings->es_sfcw        = LSQUIC_DF_SFCW_CLIENT;
194        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT;
195    }
196    settings->es_max_streams_in  = LSQUIC_DF_MAX_STREAMS_IN;
197    settings->es_idle_conn_to    = LSQUIC_DF_IDLE_CONN_TO;
198    settings->es_handshake_to    = LSQUIC_DF_HANDSHAKE_TO;
199    settings->es_silent_close    = LSQUIC_DF_SILENT_CLOSE;
200    settings->es_max_header_list_size
201                                 = LSQUIC_DF_MAX_HEADER_LIST_SIZE;
202    settings->es_ua              = LSQUIC_DF_UA;
203
204    settings->es_pdmd            = QTAG_X509;
205    settings->es_aead            = QTAG_AESG;
206    settings->es_kexs            = QTAG_C255;
207    settings->es_support_push    = LSQUIC_DF_SUPPORT_PUSH;
208    settings->es_support_tcid0   = LSQUIC_DF_SUPPORT_TCID0;
209    settings->es_support_nstp    = LSQUIC_DF_SUPPORT_NSTP;
210    settings->es_honor_prst      = LSQUIC_DF_HONOR_PRST;
211    settings->es_progress_check  = LSQUIC_DF_PROGRESS_CHECK;
212    settings->es_rw_once         = LSQUIC_DF_RW_ONCE;
213    settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
214    settings->es_pace_packets    = LSQUIC_DF_PACE_PACKETS;
215}
216
217
218/* Note: if returning an error, err_buf must be valid if non-NULL */
219int
220lsquic_engine_check_settings (const struct lsquic_engine_settings *settings,
221                              unsigned flags,
222                              char *err_buf, size_t err_buf_sz)
223{
224    if (settings->es_cfcw < LSQUIC_MIN_FCW ||
225        settings->es_sfcw < LSQUIC_MIN_FCW)
226    {
227        if (err_buf)
228            snprintf(err_buf, err_buf_sz, "%s",
229                                            "flow control window set too low");
230        return -1;
231    }
232    if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS))
233    {
234        if (err_buf)
235            snprintf(err_buf, err_buf_sz, "%s",
236                        "No supported QUIC versions specified");
237        return -1;
238    }
239    if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS)
240    {
241        if (err_buf)
242            snprintf(err_buf, err_buf_sz, "%s",
243                        "one or more unsupported QUIC version is specified");
244        return -1;
245    }
246    return 0;
247}
248
249
/* Stock callback used for both releasing and returning packet memory:
 * `packet_data' is handed to free().  The other arguments are ignored.
 */
static void
free_packet (void *ctx, void *conn_ctx, void *packet_data, char is_ipv6)
{
    (void) ctx;
    (void) conn_ctx;
    (void) is_ipv6;
    free(packet_data);
}
255
256
/* Stock allocation callback: returns `size' bytes obtained from malloc(),
 * or NULL if malloc() fails.  The other arguments are ignored.
 */
static void *
malloc_buf (void *ctx, void *conn_ctx, unsigned short size, char is_ipv6)
{
    (void) ctx;
    (void) conn_ctx;
    (void) is_ipv6;
    return malloc(size);
}
262
263
264static const struct lsquic_packout_mem_if stock_pmi =
265{
266    malloc_buf, free_packet, free_packet,
267};
268
269
270static int
271hash_conns_by_addr (const struct lsquic_engine *engine)
272{
273    if (engine->pub.enp_settings.es_versions & LSQUIC_FORCED_TCID0_VERSIONS)
274        return 1;
275    if ((engine->pub.enp_settings.es_versions & LSQUIC_GQUIC_HEADER_VERSIONS)
276                                && engine->pub.enp_settings.es_support_tcid0)
277        return 1;
278    return 0;
279}
280
281
282lsquic_engine_t *
283lsquic_engine_new (unsigned flags,
284                   const struct lsquic_engine_api *api)
285{
286    lsquic_engine_t *engine;
287    int tag_buf_len;
288    char err_buf[100];
289
290    if (!api->ea_packets_out)
291    {
292        LSQ_ERROR("packets_out callback is not specified");
293        return NULL;
294    }
295
296    if (api->ea_settings &&
297                0 != lsquic_engine_check_settings(api->ea_settings, flags,
298                                                    err_buf, sizeof(err_buf)))
299    {
300        LSQ_ERROR("cannot create engine: %s", err_buf);
301        return NULL;
302    }
303
304    engine = calloc(1, sizeof(*engine));
305    if (!engine)
306        return NULL;
307    if (0 != lsquic_mm_init(&engine->pub.enp_mm))
308    {
309        free(engine);
310        return NULL;
311    }
312    if (api->ea_settings)
313        engine->pub.enp_settings        = *api->ea_settings;
314    else
315        lsquic_engine_init_settings(&engine->pub.enp_settings, flags);
316    tag_buf_len = lsquic_gen_ver_tags(engine->pub.enp_ver_tags_buf,
317                                    sizeof(engine->pub.enp_ver_tags_buf),
318                                    engine->pub.enp_settings.es_versions);
319    if (tag_buf_len <= 0)
320    {
321        LSQ_ERROR("cannot generate version tags buffer");
322        free(engine);
323        return NULL;
324    }
325    engine->pub.enp_ver_tags_len = tag_buf_len;
326    engine->pub.enp_flags = ENPUB_CAN_SEND;
327
328    engine->flags           = flags;
329    engine->stream_if       = api->ea_stream_if;
330    engine->stream_if_ctx   = api->ea_stream_if_ctx;
331    engine->packets_out     = api->ea_packets_out;
332    engine->packets_out_ctx = api->ea_packets_out_ctx;
333    if (api->ea_hsi_if)
334    {
335        engine->pub.enp_hsi_if  = api->ea_hsi_if;
336        engine->pub.enp_hsi_ctx = api->ea_hsi_ctx;
337    }
338    else
339    {
340        engine->pub.enp_hsi_if  = lsquic_http1x_if;
341        engine->pub.enp_hsi_ctx = NULL;
342    }
343    if (api->ea_pmi)
344    {
345        engine->pub.enp_pmi      = api->ea_pmi;
346        engine->pub.enp_pmi_ctx  = api->ea_pmi_ctx;
347    }
348    else
349    {
350        engine->pub.enp_pmi      = &stock_pmi;
351        engine->pub.enp_pmi_ctx  = NULL;
352    }
353    engine->pub.enp_verify_cert  = api->ea_verify_cert;
354    engine->pub.enp_verify_ctx   = api->ea_verify_ctx;
355    engine->pub.enp_engine = engine;
356    conn_hash_init(&engine->conns_hash,
357                        hash_conns_by_addr(engine) ?  CHF_USE_ADDR : 0);
358    engine->attq = attq_create();
359    eng_hist_init(&engine->history);
360    engine->batch_size = INITIAL_OUT_BATCH_SIZE;
361
362
363    LSQ_INFO("instantiated engine");
364    return engine;
365}
366
367
368static void
369grow_batch_size (struct lsquic_engine *engine)
370{
371    engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE;
372}
373
374
375static void
376shrink_batch_size (struct lsquic_engine *engine)
377{
378    engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE;
379}
380
381
382/* Wrapper to make sure important things occur before the connection is
383 * really destroyed.
384 */
385static void
386destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn)
387{
388    --engine->n_conns;
389    conn->cn_flags |= LSCONN_NEVER_TICKABLE;
390    conn->cn_if->ci_destroy(conn);
391}
392
393
394static int
395maybe_grow_conn_heaps (struct lsquic_engine *engine)
396{
397    struct min_heap_elem *els;
398    unsigned count;
399
400    if (engine->n_conns < lsquic_mh_nalloc(&engine->conns_tickable))
401        return 0;   /* Nothing to do */
402
403    if (lsquic_mh_nalloc(&engine->conns_tickable))
404        count = lsquic_mh_nalloc(&engine->conns_tickable) * 2 * 2;
405    else
406        count = 8;
407
408    els = malloc(sizeof(els[0]) * count);
409    if (!els)
410    {
411        LSQ_ERROR("%s: malloc failed", __func__);
412        return -1;
413    }
414
415    LSQ_DEBUG("grew heaps to %u elements", count / 2);
416    memcpy(&els[0], engine->conns_tickable.mh_elems,
417                sizeof(els[0]) * lsquic_mh_count(&engine->conns_tickable));
418    memcpy(&els[count / 2], engine->conns_out.mh_elems,
419                sizeof(els[0]) * lsquic_mh_count(&engine->conns_out));
420    free(engine->conns_tickable.mh_elems);
421    engine->conns_tickable.mh_elems = els;
422    engine->conns_out.mh_elems = &els[count / 2];
423    engine->conns_tickable.mh_nalloc = count / 2;
424    engine->conns_out.mh_nalloc = count / 2;
425    return 0;
426}
427
428
429static lsquic_conn_t *
430new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
431                      unsigned short max_packet_size)
432{
433    lsquic_conn_t *conn;
434    unsigned flags;
435    if (0 != maybe_grow_conn_heaps(engine))
436        return NULL;
437    flags = engine->flags & (ENG_SERVER|ENG_HTTP);
438    conn = full_conn_client_new(&engine->pub, engine->stream_if,
439                    engine->stream_if_ctx, flags, hostname, max_packet_size);
440    if (!conn)
441        return NULL;
442    ++engine->n_conns;
443    return conn;
444}
445
446
447static lsquic_conn_t *
448find_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
449         struct packin_parse_state *ppstate, const struct sockaddr *sa_local)
450{
451    lsquic_conn_t *conn;
452
453    if (conn_hash_using_addr(&engine->conns_hash))
454        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
455    else if (packet_in->pi_flags & PI_CONN_ID)
456        conn = conn_hash_find_by_cid(&engine->conns_hash,
457                                                    packet_in->pi_conn_id);
458    else
459    {
460        LSQ_DEBUG("packet header does not have connection ID: discarding");
461        return NULL;
462    }
463
464    if (!conn)
465        return NULL;
466
467    conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
468    if ((packet_in->pi_flags & PI_CONN_ID)
469        && conn->cn_cid != packet_in->pi_conn_id)
470    {
471        LSQ_DEBUG("connection IDs do not match");
472        return NULL;
473    }
474
475    return conn;
476}
477
478
479#if !defined(NDEBUG) && __GNUC__
480__attribute__((weak))
481#endif
482void
483lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub,
484                                    lsquic_conn_t *conn)
485{
486    if (0 == (enpub->enp_flags & ENPUB_PROC) &&
487        0 == (conn->cn_flags & (LSCONN_TICKABLE|LSCONN_NEVER_TICKABLE)))
488    {
489        lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
490        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
491        engine_incref_conn(conn, LSCONN_TICKABLE);
492    }
493}
494
495
496void
497lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
498                                lsquic_conn_t *conn, lsquic_time_t tick_time)
499{
500    lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
501    if (conn->cn_flags & LSCONN_TICKABLE)
502    {
503        /* Optimization: no need to add the connection to the Advisory Tick
504         * Time Queue: it is about to be ticked, after which it its next tick
505         * time may be queried again.
506         */;
507    }
508    else if (conn->cn_flags & LSCONN_ATTQ)
509    {
510        if (lsquic_conn_adv_time(conn) != tick_time)
511        {
512            attq_remove(engine->attq, conn);
513            if (0 != attq_add(engine->attq, conn, tick_time))
514                engine_decref_conn(engine, conn, LSCONN_ATTQ);
515        }
516    }
517    else if (0 == attq_add(engine->attq, conn, tick_time))
518        engine_incref_conn(conn, LSCONN_ATTQ);
519}
520
521
522/* Return 0 if packet is being processed by a connections, otherwise return 1 */
523static int
524process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
525       struct packin_parse_state *ppstate, const struct sockaddr *sa_local,
526       const struct sockaddr *sa_peer, void *peer_ctx)
527{
528    lsquic_conn_t *conn;
529
530    if (lsquic_packet_in_is_gquic_prst(packet_in)
531                                && !engine->pub.enp_settings.es_honor_prst)
532    {
533        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
534        LSQ_DEBUG("public reset packet: discarding");
535        return 1;
536    }
537
538    conn = find_conn(engine, packet_in, ppstate, sa_local);
539
540    if (!conn)
541    {
542        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
543        return 1;
544    }
545
546    if (0 == (conn->cn_flags & LSCONN_TICKABLE))
547    {
548        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
549        engine_incref_conn(conn, LSCONN_TICKABLE);
550    }
551    lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
552    lsquic_packet_in_upref(packet_in);
553    conn->cn_peer_ctx = peer_ctx;
554    conn->cn_if->ci_packet_in(conn, packet_in);
555    lsquic_packet_in_put(&engine->pub.enp_mm, packet_in);
556    return 0;
557}
558
559
560void
561lsquic_engine_destroy (lsquic_engine_t *engine)
562{
563    lsquic_conn_t *conn;
564
565    LSQ_DEBUG("destroying engine");
566#ifndef NDEBUG
567    engine->flags |= ENG_DTOR;
568#endif
569
570    while ((conn = lsquic_mh_pop(&engine->conns_out)))
571    {
572        assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
573        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
574    }
575
576    while ((conn = lsquic_mh_pop(&engine->conns_tickable)))
577    {
578        assert(conn->cn_flags & LSCONN_TICKABLE);
579        (void) engine_decref_conn(engine, conn, LSCONN_TICKABLE);
580    }
581
582    for (conn = conn_hash_first(&engine->conns_hash); conn;
583                            conn = conn_hash_next(&engine->conns_hash))
584        force_close_conn(engine, conn);
585    conn_hash_cleanup(&engine->conns_hash);
586
587    assert(0 == engine->n_conns);
588    attq_destroy(engine->attq);
589
590    assert(0 == lsquic_mh_count(&engine->conns_out));
591    assert(0 == lsquic_mh_count(&engine->conns_tickable));
592    lsquic_mm_cleanup(&engine->pub.enp_mm);
593    free(engine->conns_tickable.mh_elems);
594    free(engine);
595}
596
597
598lsquic_conn_t *
599lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *local_sa,
600                       const struct sockaddr *peer_sa,
601                       void *peer_ctx, lsquic_conn_ctx_t *conn_ctx,
602                       const char *hostname, unsigned short max_packet_size)
603{
604    lsquic_conn_t *conn;
605    ENGINE_IN(engine);
606
607    if (engine->flags & ENG_SERVER)
608    {
609        LSQ_ERROR("`%s' must only be called in client mode", __func__);
610        goto err;
611    }
612
613    if (conn_hash_using_addr(&engine->conns_hash)
614                && conn_hash_find_by_addr(&engine->conns_hash, local_sa))
615    {
616        LSQ_ERROR("cannot have more than one connection on the same port");
617        goto err;
618    }
619
620    if (0 == max_packet_size)
621    {
622        switch (peer_sa->sa_family)
623        {
624        case AF_INET:
625            max_packet_size = QUIC_MAX_IPv4_PACKET_SZ;
626            break;
627        default:
628            max_packet_size = QUIC_MAX_IPv6_PACKET_SZ;
629            break;
630        }
631    }
632
633    conn = new_full_conn_client(engine, hostname, max_packet_size);
634    if (!conn)
635        goto err;
636    lsquic_conn_record_sockaddr(conn, local_sa, peer_sa);
637    if (0 != conn_hash_add(&engine->conns_hash, conn))
638    {
639        LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
640            conn->cn_cid);
641        destroy_conn(engine, conn);
642        goto err;
643    }
644    assert(!(conn->cn_flags &
645        (CONN_REF_FLAGS
646         & ~LSCONN_TICKABLE /* This flag may be set as effect of user
647                                 callbacks */
648                             )));
649    conn->cn_flags |= LSCONN_HASHED;
650    lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
651    engine_incref_conn(conn, LSCONN_TICKABLE);
652    conn->cn_peer_ctx = peer_ctx;
653    lsquic_conn_set_ctx(conn, conn_ctx);
654    full_conn_client_call_on_new(conn);
655  end:
656    ENGINE_OUT(engine);
657    return conn;
658  err:
659    conn = NULL;
660    goto end;
661}
662
663
664static void
665remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
666{
667    conn_hash_remove(&engine->conns_hash, conn);
668    (void) engine_decref_conn(engine, conn, LSCONN_HASHED);
669}
670
671
672static void
673refflags2str (enum lsquic_conn_flags flags, char s[6])
674{
675    *s = 'C'; s += !!(flags & LSCONN_CLOSING);
676    *s = 'H'; s += !!(flags & LSCONN_HASHED);
677    *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
678    *s = 'T'; s += !!(flags & LSCONN_TICKABLE);
679    *s = 'A'; s += !!(flags & LSCONN_ATTQ);
680    *s = 'K'; s += !!(flags & LSCONN_TICKED);
681    *s = '\0';
682}
683
684
685static void
686engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
687{
688    char str[2][7];
689    assert(flag & CONN_REF_FLAGS);
690    assert(!(conn->cn_flags & flag));
691    conn->cn_flags |= flag;
692    LSQ_DEBUG("incref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
693                    (refflags2str(conn->cn_flags & ~flag, str[0]), str[0]),
694                    (refflags2str(conn->cn_flags, str[1]), str[1]));
695}
696
697
698static lsquic_conn_t *
699engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
700                                        enum lsquic_conn_flags flags)
701{
702    char str[2][7];
703    assert(flags & CONN_REF_FLAGS);
704    assert(conn->cn_flags & flags);
705#ifndef NDEBUG
706    if (flags & LSCONN_CLOSING)
707        assert(0 == (conn->cn_flags & LSCONN_HASHED));
708#endif
709    conn->cn_flags &= ~flags;
710    LSQ_DEBUG("decref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
711                    (refflags2str(conn->cn_flags | flags, str[0]), str[0]),
712                    (refflags2str(conn->cn_flags, str[1]), str[1]));
713    if (0 == (conn->cn_flags & CONN_REF_FLAGS))
714    {
715        eng_hist_inc(&engine->history, 0, sl_del_full_conns);
716        destroy_conn(engine, conn);
717        return NULL;
718    }
719    else
720        return conn;
721}
722
723
724/* This is not a general-purpose function.  Only call from engine dtor. */
725static void
726force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
727{
728    assert(engine->flags & ENG_DTOR);
729    const enum lsquic_conn_flags flags = conn->cn_flags;
730    assert(conn->cn_flags & CONN_REF_FLAGS);
731    assert(!(flags & LSCONN_HAS_OUTGOING));  /* Should be removed already */
732    assert(!(flags & LSCONN_TICKABLE));    /* Should be removed already */
733    assert(!(flags & LSCONN_CLOSING));  /* It is in transient queue? */
734    if (flags & LSCONN_ATTQ)
735    {
736        attq_remove(engine->attq, conn);
737        (void) engine_decref_conn(engine, conn, LSCONN_ATTQ);
738    }
739    if (flags & LSCONN_HASHED)
740        remove_conn_from_hash(engine, conn);
741}
742
743
744/* Iterator for tickable connections (those on the Tickable Queue).  Before
745 * a connection is returned, it is removed from the Advisory Tick Time queue
746 * if necessary.
747 */
748static lsquic_conn_t *
749conn_iter_next_tickable (struct lsquic_engine *engine)
750{
751    lsquic_conn_t *conn;
752
753    conn = lsquic_mh_pop(&engine->conns_tickable);
754
755    if (conn)
756        conn = engine_decref_conn(engine, conn, LSCONN_TICKABLE);
757    if (conn && (conn->cn_flags & LSCONN_ATTQ))
758    {
759        attq_remove(engine->attq, conn);
760        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
761    }
762
763    return conn;
764}
765
766
767void
768lsquic_engine_process_conns (lsquic_engine_t *engine)
769{
770    lsquic_conn_t *conn;
771    lsquic_time_t now;
772
773    ENGINE_IN(engine);
774
775    now = lsquic_time_now();
776    while ((conn = attq_pop(engine->attq, now)))
777    {
778        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
779        if (conn && !(conn->cn_flags & LSCONN_TICKABLE))
780        {
781            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
782            engine_incref_conn(conn, LSCONN_TICKABLE);
783        }
784    }
785
786    process_connections(engine, conn_iter_next_tickable, now);
787    ENGINE_OUT(engine);
788}
789
790
791static ssize_t
792really_encrypt_packet (const lsquic_conn_t *conn,
793                       struct lsquic_packet_out *packet_out,
794                       unsigned char *buf, size_t bufsz)
795{
796    int header_sz, is_hello_packet;
797    enum enc_level enc_level;
798    size_t packet_sz;
799    unsigned char header_buf[QUIC_MAX_PUBHDR_SZ];
800
801    header_sz = conn->cn_pf->pf_gen_reg_pkt_header(conn, packet_out,
802                                            header_buf, sizeof(header_buf));
803    if (header_sz < 0)
804        return -1;
805
806    is_hello_packet = !!(packet_out->po_flags & PO_HELLO);
807    enc_level = conn->cn_esf->esf_encrypt(conn->cn_enc_session,
808                conn->cn_version, 0,
809                packet_out->po_packno, header_buf, header_sz,
810                packet_out->po_data, packet_out->po_data_sz,
811                buf, bufsz, &packet_sz, is_hello_packet);
812    if ((int) enc_level >= 0)
813    {
814        lsquic_packet_out_set_enc_level(packet_out, enc_level);
815        LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %zu bytes, "
816            "ciphertext is %zd bytes",
817            packet_out->po_packno,
818            conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) +
819                                                packet_out->po_data_sz,
820            packet_sz);
821        return packet_sz;
822    }
823    else
824        return -1;
825}
826
827
828static int
829conn_peer_ipv6 (const struct lsquic_conn *conn)
830{
831    return AF_INET6 == ((struct sockaddr *) conn->cn_peer_addr)->sa_family;
832}
833
834
835static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, }
836encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
837                                            lsquic_packet_out_t *packet_out)
838{
839    ssize_t enc_sz;
840    size_t bufsz;
841    unsigned sent_sz;
842    unsigned char *buf;
843    int ipv6;
844
845    bufsz = conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) +
846                                packet_out->po_data_sz + QUIC_PACKET_HASH_SZ;
847    if (bufsz > USHRT_MAX)
848        return ENCPA_BADCRYPT;  /* To cause connection to close */
849    ipv6 = conn_peer_ipv6(conn);
850    buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx,
851                                            conn->cn_peer_ctx, bufsz, ipv6);
852    if (!buf)
853    {
854        LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd",
855                                                                        bufsz);
856        return ENCPA_NOMEM;
857    }
858
859    {
860        enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz);
861        sent_sz = enc_sz;
862    }
863
864    if (enc_sz < 0)
865    {
866        engine->pub.enp_pmi->pmi_return(engine->pub.enp_pmi_ctx,
867                                                conn->cn_peer_ctx, buf, ipv6);
868        return ENCPA_BADCRYPT;
869    }
870
871    packet_out->po_enc_data    = buf;
872    packet_out->po_enc_data_sz = enc_sz;
873    packet_out->po_sent_sz     = sent_sz;
874    packet_out->po_flags &= ~PO_IPv6;
875    packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ|(ipv6 << POIPv6_SHIFT);
876
877    return ENCPA_OK;
878}
879
880
881static void
882release_or_return_enc_data (struct lsquic_engine *engine,
883                void (*pmi_rel_or_ret) (void *, void *, void *, char),
884                struct lsquic_conn *conn, struct lsquic_packet_out *packet_out)
885{
886    pmi_rel_or_ret(engine->pub.enp_pmi_ctx, conn->cn_peer_ctx,
887                packet_out->po_enc_data, lsquic_packet_out_ipv6(packet_out));
888    packet_out->po_flags &= ~PO_ENCRYPTED;
889    packet_out->po_enc_data = NULL;
890}
891
892
893static void
894release_enc_data (struct lsquic_engine *engine, struct lsquic_conn *conn,
895                                        struct lsquic_packet_out *packet_out)
896{
897    release_or_return_enc_data(engine, engine->pub.enp_pmi->pmi_release,
898                                conn, packet_out);
899}
900
901
902static void
903return_enc_data (struct lsquic_engine *engine, struct lsquic_conn *conn,
904                                        struct lsquic_packet_out *packet_out)
905{
906    release_or_return_enc_data(engine, engine->pub.enp_pmi->pmi_return,
907                                conn, packet_out);
908}
909
910
911STAILQ_HEAD(conns_stailq, lsquic_conn);
912TAILQ_HEAD(conns_tailq, lsquic_conn);
913
914
915struct conns_out_iter
916{
917    struct min_heap            *coi_heap;
918    TAILQ_HEAD(, lsquic_conn)   coi_active_list,
919                                coi_inactive_list;
920    lsquic_conn_t              *coi_next;
921#ifndef NDEBUG
922    lsquic_time_t               coi_last_sent;
923#endif
924};
925
926
927static void
928coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine)
929{
930    iter->coi_heap = &engine->conns_out;
931    iter->coi_next = NULL;
932    TAILQ_INIT(&iter->coi_active_list);
933    TAILQ_INIT(&iter->coi_inactive_list);
934#ifndef NDEBUG
935    iter->coi_last_sent = 0;
936#endif
937}
938
939
940static lsquic_conn_t *
941coi_next (struct conns_out_iter *iter)
942{
943    lsquic_conn_t *conn;
944
945    if (lsquic_mh_count(iter->coi_heap) > 0)
946    {
947        conn = lsquic_mh_pop(iter->coi_heap);
948        TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
949        conn->cn_flags |= LSCONN_COI_ACTIVE;
950#ifndef NDEBUG
951        if (iter->coi_last_sent)
952            assert(iter->coi_last_sent <= conn->cn_last_sent);
953        iter->coi_last_sent = conn->cn_last_sent;
954#endif
955        return conn;
956    }
957    else if (!TAILQ_EMPTY(&iter->coi_active_list))
958    {
959        conn = iter->coi_next;
960        if (!conn)
961            conn = TAILQ_FIRST(&iter->coi_active_list);
962        if (conn)
963            iter->coi_next = TAILQ_NEXT(conn, cn_next_out);
964        return conn;
965    }
966    else
967        return NULL;
968}
969
970
971static void
972coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
973{
974    if (!(conn->cn_flags & LSCONN_EVANESCENT))
975    {
976        assert(!TAILQ_EMPTY(&iter->coi_active_list));
977        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
978        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
979        TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out);
980        conn->cn_flags |= LSCONN_COI_INACTIVE;
981    }
982}
983
984
985static void
986coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
987{
988    assert(conn->cn_flags & LSCONN_COI_INACTIVE);
989    TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
990    conn->cn_flags &= ~LSCONN_COI_INACTIVE;
991    TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
992    conn->cn_flags |= LSCONN_COI_ACTIVE;
993}
994
995
996static void
997coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
998{
999    lsquic_conn_t *conn;
1000    while ((conn = TAILQ_FIRST(&iter->coi_active_list)))
1001    {
1002        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1003        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1004        lsquic_mh_insert(iter->coi_heap, conn, conn->cn_last_sent);
1005    }
1006    while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
1007    {
1008        TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1009        conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1010        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
1011    }
1012}
1013
1014
1015static unsigned
1016send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter,
1017                  struct out_batch *batch, unsigned n_to_send)
1018{
1019    int n_sent, i;
1020    lsquic_time_t now;
1021
1022    /* Set sent time before the write to avoid underestimating RTT */
1023    now = lsquic_time_now();
1024    for (i = 0; i < (int) n_to_send; ++i)
1025        batch->packets[i]->po_sent = now;
1026    n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs,
1027                                                                n_to_send);
1028    if (n_sent < (int) n_to_send)
1029    {
1030        engine->pub.enp_flags &= ~ENPUB_CAN_SEND;
1031        engine->resume_sending_at = now + 1000000;
1032        LSQ_DEBUG("cannot send packets");
1033        EV_LOG_GENERIC_EVENT("cannot send packets");
1034    }
1035    if (n_sent >= 0)
1036        LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send);
1037    else
1038    {
1039        LSQ_DEBUG("packets out returned an error: %s", strerror(errno));
1040        n_sent = 0;
1041    }
1042    if (n_sent > 0)
1043        engine->last_sent = now + n_sent;
1044    for (i = 0; i < n_sent; ++i)
1045    {
1046        eng_hist_inc(&engine->history, now, sl_packets_out);
1047        EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1048        batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i],
1049                                                    batch->packets[i]);
1050        /* `i' is added to maintain relative order */
1051        batch->conns[i]->cn_last_sent = now + i;
1052        /* Release packet out buffer as soon as the packet is sent
1053         * successfully.  If not successfully sent, we hold on to
1054         * this buffer until the packet sending is attempted again
1055         * or until it times out and regenerated.
1056         */
1057        if (batch->packets[i]->po_flags & PO_ENCRYPTED)
1058            release_enc_data(engine, batch->conns[i], batch->packets[i]);
1059    }
1060    if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT))
1061        for ( ; i < (int) n_to_send; ++i)
1062            EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1063    /* Return packets to the connection in reverse order so that the packet
1064     * ordering is maintained.
1065     */
1066    for (i = (int) n_to_send - 1; i >= n_sent; --i)
1067    {
1068        batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i],
1069                                                    batch->packets[i]);
1070        if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT)))
1071            coi_reactivate(conns_iter, batch->conns[i]);
1072    }
1073    return n_sent;
1074}
1075
1076
1077/* Return 1 if went past deadline, 0 otherwise */
1078static int
1079check_deadline (lsquic_engine_t *engine)
1080{
1081    if (engine->pub.enp_settings.es_proc_time_thresh &&
1082                                lsquic_time_now() > engine->deadline)
1083    {
1084        LSQ_INFO("went past threshold of %u usec, stop sending",
1085                            engine->pub.enp_settings.es_proc_time_thresh);
1086        engine->flags |= ENG_PAST_DEADLINE;
1087        return 1;
1088    }
1089    else
1090        return 0;
1091}
1092
1093
1094static void
1095send_packets_out (struct lsquic_engine *engine,
1096                  struct conns_tailq *ticked_conns,
1097                  struct conns_stailq *closed_conns)
1098{
1099    unsigned n, w, n_sent, n_batches_sent;
1100    lsquic_packet_out_t *packet_out;
1101    lsquic_conn_t *conn;
1102    struct out_batch *const batch = &engine->out_batch;
1103    struct conns_out_iter conns_iter;
1104    int shrink, deadline_exceeded;
1105
1106    coi_init(&conns_iter, engine);
1107    n_batches_sent = 0;
1108    n_sent = 0, n = 0;
1109    shrink = 0;
1110    deadline_exceeded = 0;
1111
1112    while ((conn = coi_next(&conns_iter)))
1113    {
1114        packet_out = conn->cn_if->ci_next_packet_to_send(conn);
1115        if (!packet_out) {
1116            LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64,
1117                                                            conn->cn_cid);
1118            coi_deactivate(&conns_iter, conn);
1119            continue;
1120        }
1121        if ((packet_out->po_flags & PO_ENCRYPTED)
1122                && lsquic_packet_out_ipv6(packet_out) != conn_peer_ipv6(conn))
1123        {
1124            /* Peer address changed since the packet was encrypted.  Need to
1125             * reallocate.
1126             */
1127            return_enc_data(engine, conn, packet_out);
1128        }
1129        if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT)))
1130        {
1131            switch (encrypt_packet(engine, conn, packet_out))
1132            {
1133            case ENCPA_NOMEM:
1134                /* Send what we have and wait for a more opportune moment */
1135                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1136                goto end_for;
1137            case ENCPA_BADCRYPT:
1138                /* This is pretty bad: close connection immediately */
1139                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1140                LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid);
1141                if (!(conn->cn_flags & LSCONN_EVANESCENT))
1142                {
1143                    if (!(conn->cn_flags & LSCONN_CLOSING))
1144                    {
1145                        STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn);
1146                        engine_incref_conn(conn, LSCONN_CLOSING);
1147                        if (conn->cn_flags & LSCONN_HASHED)
1148                            remove_conn_from_hash(engine, conn);
1149                    }
1150                    coi_deactivate(&conns_iter, conn);
1151                    if (conn->cn_flags & LSCONN_TICKED)
1152                    {
1153                        TAILQ_REMOVE(ticked_conns, conn, cn_next_ticked);
1154                        engine_decref_conn(engine, conn, LSCONN_TICKED);
1155                    }
1156                }
1157                continue;
1158            case ENCPA_OK:
1159                break;
1160            }
1161        }
1162        LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64,
1163                                        packet_out->po_packno, conn->cn_cid);
1164        assert(conn->cn_flags & LSCONN_HAS_PEER_SA);
1165        if (packet_out->po_flags & PO_ENCRYPTED)
1166        {
1167            batch->outs[n].buf     = packet_out->po_enc_data;
1168            batch->outs[n].sz      = packet_out->po_enc_data_sz;
1169        }
1170        else
1171        {
1172            batch->outs[n].buf     = packet_out->po_data;
1173            batch->outs[n].sz      = packet_out->po_data_sz;
1174        }
1175        batch->outs   [n].peer_ctx = conn->cn_peer_ctx;
1176        batch->outs   [n].local_sa = (struct sockaddr *) conn->cn_local_addr;
1177        batch->outs   [n].dest_sa  = (struct sockaddr *) conn->cn_peer_addr;
1178        batch->conns  [n]          = conn;
1179        batch->packets[n]          = packet_out;
1180        ++n;
1181        if (n == engine->batch_size)
1182        {
1183            n = 0;
1184            w = send_batch(engine, &conns_iter, batch, engine->batch_size);
1185            ++n_batches_sent;
1186            n_sent += w;
1187            if (w < engine->batch_size)
1188            {
1189                shrink = 1;
1190                break;
1191            }
1192            deadline_exceeded = check_deadline(engine);
1193            if (deadline_exceeded)
1194                break;
1195            grow_batch_size(engine);
1196        }
1197    }
1198  end_for:
1199
1200    if (n > 0) {
1201        w = send_batch(engine, &conns_iter, batch, n);
1202        n_sent += w;
1203        shrink = w < n;
1204        ++n_batches_sent;
1205        deadline_exceeded = check_deadline(engine);
1206    }
1207
1208    if (shrink)
1209        shrink_batch_size(engine);
1210    else if (n_batches_sent > 1 && !deadline_exceeded)
1211        grow_batch_size(engine);
1212
1213    coi_reheap(&conns_iter, engine);
1214
1215    LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s");
1216}
1217
1218
1219int
1220lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
1221{
1222    return lsquic_mh_count(&engine->conns_out) > 0
1223    ;
1224}
1225
1226
1227static void
1228reset_deadline (lsquic_engine_t *engine, lsquic_time_t now)
1229{
1230    engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh;
1231    engine->flags &= ~ENG_PAST_DEADLINE;
1232}
1233
1234
1235/* TODO: this is a user-facing function, account for load */
1236void
1237lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
1238{
1239    lsquic_conn_t *conn;
1240    struct conns_stailq closed_conns;
1241    struct conns_tailq ticked_conns = TAILQ_HEAD_INITIALIZER(ticked_conns);
1242
1243    STAILQ_INIT(&closed_conns);
1244    reset_deadline(engine, lsquic_time_now());
1245    if (!(engine->pub.enp_flags & ENPUB_CAN_SEND))
1246    {
1247        LSQ_DEBUG("can send again");
1248        EV_LOG_GENERIC_EVENT("can send again");
1249        engine->pub.enp_flags |= ENPUB_CAN_SEND;
1250    }
1251
1252    send_packets_out(engine, &ticked_conns, &closed_conns);
1253
1254    while ((conn = STAILQ_FIRST(&closed_conns))) {
1255        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1256        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1257    }
1258
1259}
1260
1261
1262static void
1263process_connections (lsquic_engine_t *engine, conn_iter_f next_conn,
1264                     lsquic_time_t now)
1265{
1266    lsquic_conn_t *conn;
1267    enum tick_st tick_st;
1268    unsigned i;
1269    lsquic_time_t next_tick_time;
1270    struct conns_stailq closed_conns;
1271    struct conns_tailq ticked_conns;
1272
1273    eng_hist_tick(&engine->history, now);
1274
1275    STAILQ_INIT(&closed_conns);
1276    TAILQ_INIT(&ticked_conns);
1277    reset_deadline(engine, now);
1278
1279    if (!(engine->pub.enp_flags & ENPUB_CAN_SEND)
1280                                        && now > engine->resume_sending_at)
1281    {
1282        LSQ_NOTICE("failsafe activated: resume sending packets again after "
1283                    "timeout");
1284        EV_LOG_GENERIC_EVENT("resume sending packets again after timeout");
1285        engine->pub.enp_flags |= ENPUB_CAN_SEND;
1286    }
1287
1288    i = 0;
1289    while ((conn = next_conn(engine))
1290          )
1291    {
1292        tick_st = conn->cn_if->ci_tick(conn, now);
1293        conn->cn_last_ticked = now + i /* Maintain relative order */ ++;
1294        if (tick_st & TICK_SEND)
1295        {
1296            if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
1297            {
1298                lsquic_mh_insert(&engine->conns_out, conn, conn->cn_last_sent);
1299                engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
1300            }
1301        }
1302        if (tick_st & TICK_CLOSE)
1303        {
1304            STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn);
1305            engine_incref_conn(conn, LSCONN_CLOSING);
1306            if (conn->cn_flags & LSCONN_HASHED)
1307                remove_conn_from_hash(engine, conn);
1308        }
1309        else
1310        {
1311            TAILQ_INSERT_TAIL(&ticked_conns, conn, cn_next_ticked);
1312            engine_incref_conn(conn, LSCONN_TICKED);
1313        }
1314    }
1315
1316    if ((engine->pub.enp_flags & ENPUB_CAN_SEND)
1317                        && lsquic_engine_has_unsent_packets(engine))
1318        send_packets_out(engine, &ticked_conns, &closed_conns);
1319
1320    while ((conn = STAILQ_FIRST(&closed_conns))) {
1321        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1322        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1323    }
1324
1325    /* TODO Heapification can be optimized by switching to the Floyd method:
1326     * https://en.wikipedia.org/wiki/Binary_heap#Building_a_heap
1327     */
1328    while ((conn = TAILQ_FIRST(&ticked_conns)))
1329    {
1330        TAILQ_REMOVE(&ticked_conns, conn, cn_next_ticked);
1331        engine_decref_conn(engine, conn, LSCONN_TICKED);
1332        if (!(conn->cn_flags & LSCONN_TICKABLE)
1333            && conn->cn_if->ci_is_tickable(conn))
1334        {
1335            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
1336            engine_incref_conn(conn, LSCONN_TICKABLE);
1337        }
1338        else if (!(conn->cn_flags & LSCONN_ATTQ))
1339        {
1340            next_tick_time = conn->cn_if->ci_next_tick_time(conn);
1341            if (next_tick_time)
1342            {
1343                if (0 == attq_add(engine->attq, conn, next_tick_time))
1344                    engine_incref_conn(conn, LSCONN_ATTQ);
1345            }
1346            else
1347                assert(0);
1348        }
1349    }
1350
1351}
1352
1353
1354/* Return 0 if packet is being processed by a real connection, 1 if the
1355 * packet was processed, but not by a connection, and -1 on error.
1356 */
1357int
1358lsquic_engine_packet_in (lsquic_engine_t *engine,
1359    const unsigned char *packet_in_data, size_t packet_in_size,
1360    const struct sockaddr *sa_local, const struct sockaddr *sa_peer,
1361    void *peer_ctx)
1362{
1363    struct packin_parse_state ppstate;
1364    lsquic_packet_in_t *packet_in;
1365    int (*parse_packet_in_begin) (struct lsquic_packet_in *, size_t length,
1366                                int is_server, struct packin_parse_state *);
1367
1368    if (packet_in_size > QUIC_MAX_PACKET_SZ)
1369    {
1370        LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming "
1371            "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ);
1372        errno = E2BIG;
1373        return -1;
1374    }
1375
1376    if (conn_hash_using_addr(&engine->conns_hash))
1377    {
1378        const struct lsquic_conn *conn;
1379        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
1380        if (!conn)
1381            return -1;
1382        if ((1 << conn->cn_version) & LSQUIC_GQUIC_HEADER_VERSIONS)
1383            parse_packet_in_begin = lsquic_gquic_parse_packet_in_begin;
1384        else
1385            parse_packet_in_begin = lsquic_iquic_parse_packet_in_begin;
1386    }
1387    else
1388        parse_packet_in_begin = lsquic_parse_packet_in_begin;
1389
1390    packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm);
1391    if (!packet_in)
1392        return -1;
1393
1394    /* Library does not modify packet_in_data, it is not referenced after
1395     * this function returns and subsequent release of pi_data is guarded
1396     * by PI_OWN_DATA flag.
1397     */
1398    packet_in->pi_data = (unsigned char *) packet_in_data;
1399    if (0 != parse_packet_in_begin(packet_in, packet_in_size,
1400                                        engine->flags & ENG_SERVER, &ppstate))
1401    {
1402        LSQ_DEBUG("Cannot parse incoming packet's header");
1403        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
1404        errno = EINVAL;
1405        return -1;
1406    }
1407
1408    packet_in->pi_received = lsquic_time_now();
1409    eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in);
1410    return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer,
1411                                                                    peer_ctx);
1412}
1413
1414
1415#if __GNUC__ && !defined(NDEBUG)
1416__attribute__((weak))
1417#endif
1418unsigned
1419lsquic_engine_quic_versions (const lsquic_engine_t *engine)
1420{
1421    return engine->pub.enp_settings.es_versions;
1422}
1423
1424
1425int
1426lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
1427{
1428    const lsquic_time_t *next_attq_time;
1429    lsquic_time_t now, next_time;
1430
1431    if (((engine->flags & ENG_PAST_DEADLINE)
1432                                    && lsquic_mh_count(&engine->conns_out))
1433        || lsquic_mh_count(&engine->conns_tickable))
1434    {
1435        *diff = 0;
1436        return 1;
1437    }
1438
1439    next_attq_time = attq_next_time(engine->attq);
1440    if (engine->pub.enp_flags & ENPUB_CAN_SEND)
1441    {
1442        if (next_attq_time)
1443            next_time = *next_attq_time;
1444        else
1445            return 0;
1446    }
1447    else
1448    {
1449        if (next_attq_time)
1450            next_time = MIN(*next_attq_time, engine->resume_sending_at);
1451        else
1452            next_time = engine->resume_sending_at;
1453    }
1454
1455    now = lsquic_time_now();
1456    *diff = (int) ((int64_t) next_time - (int64_t) now);
1457    return 1;
1458}
1459
1460
1461unsigned
1462lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now)
1463{
1464    lsquic_time_t now;
1465    now = lsquic_time_now();
1466    if (from_now < 0)
1467        now -= from_now;
1468    else
1469        now += from_now;
1470    return attq_count_before(engine->attq, now);
1471}
1472
1473
1474