lsquic_engine.c revision bf2c7037
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_engine.c - QUIC engine
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <limits.h>
10#include <stdint.h>
11#include <stdio.h>
12#include <stdlib.h>
13#include <string.h>
14#include <sys/queue.h>
15#include <time.h>
16#ifndef WIN32
17#include <sys/time.h>
18#include <netinet/in.h>
19#include <sys/types.h>
20#include <sys/stat.h>
21#include <fcntl.h>
22#include <unistd.h>
23#include <netdb.h>
24#endif
25
26
27
28#include "lsquic.h"
29#include "lsquic_types.h"
30#include "lsquic_alarmset.h"
31#include "lsquic_parse_common.h"
32#include "lsquic_parse.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_packet_out.h"
35#include "lsquic_senhist.h"
36#include "lsquic_rtt.h"
37#include "lsquic_cubic.h"
38#include "lsquic_pacer.h"
39#include "lsquic_send_ctl.h"
40#include "lsquic_set.h"
41#include "lsquic_conn_flow.h"
42#include "lsquic_sfcw.h"
43#include "lsquic_stream.h"
44#include "lsquic_conn.h"
45#include "lsquic_full_conn.h"
46#include "lsquic_util.h"
47#include "lsquic_qtags.h"
48#include "lsquic_str.h"
49#include "lsquic_handshake.h"
50#include "lsquic_mm.h"
51#include "lsquic_conn_hash.h"
52#include "lsquic_engine_public.h"
53#include "lsquic_eng_hist.h"
54#include "lsquic_ev_log.h"
55#include "lsquic_version.h"
56#include "lsquic_hash.h"
57#include "lsquic_attq.h"
58#include "lsquic_min_heap.h"
59#include "lsquic_http1x_if.h"
60
61#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
62#include "lsquic_logger.h"
63
64
/* The batch of outgoing packets grows and shrinks dynamically: it starts
 * out at INITIAL_OUT_BATCH_SIZE and is kept between MIN_OUT_BATCH_SIZE and
 * MAX_OUT_BATCH_SIZE.
 */
#define MIN_OUT_BATCH_SIZE      256
#define INITIAL_OUT_BATCH_SIZE  512
#define MAX_OUT_BATCH_SIZE      1024
69
70struct out_batch
71{
72    lsquic_conn_t           *conns  [MAX_OUT_BATCH_SIZE];
73    lsquic_packet_out_t     *packets[MAX_OUT_BATCH_SIZE];
74    struct lsquic_out_spec   outs   [MAX_OUT_BATCH_SIZE];
75};
76
77typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);
78
79static void
80process_connections (struct lsquic_engine *engine, conn_iter_f iter,
81                     lsquic_time_t now);
82
83static void
84engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);
85
86static lsquic_conn_t *
87engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
88                                        enum lsquic_conn_flags flag);
89
90static void
91force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
92
/* Nested calls to LSQUIC are not supported.  ENGINE_IN() flags the engine
 * as being inside a processing call and ENGINE_OUT() clears the flag; in
 * debug builds each asserts that the flag was in the expected state.
 */
#define ENGINE_IN(e) do {                                       \
    assert(0 == ((e)->pub.enp_flags & ENPUB_PROC));             \
    (e)->pub.enp_flags |= ENPUB_PROC;                           \
} while (0)

#define ENGINE_OUT(e) do {                                      \
    assert(0 != ((e)->pub.enp_flags & ENPUB_PROC));             \
    (e)->pub.enp_flags &= ~ENPUB_PROC;                          \
} while (0)
103
104/* A connection can be referenced from one of six places:
105 *
106 *   1. Connection hash: a connection starts its life in one of those.
107 *
108 *   2. Outgoing queue.
109 *
110 *   3. Tickable queue
111 *
112 *   4. Advisory Tick Time queue.
113 *
114 *   5. Closing connections queue.  This is a transient queue -- it only
115 *      exists for the duration of process_connections() function call.
116 *
117 *   6. Ticked connections queue.  Another transient queue, similar to (5).
118 *
119 * The idea is to destroy the connection when it is no longer referenced.
120 * For example, a connection tick may return TICK_SEND|TICK_CLOSE.  In
121 * that case, the connection is referenced from two places: (2) and (5).
122 * After its packets are sent, it is only referenced in (5), and at the
123 * end of the function call, when it is removed from (5), reference count
124 * goes to zero and the connection is destroyed.  If not all packets can
125 * be sent, at the end of the function call, the connection is referenced
126 * by (2) and will only be removed once all outgoing packets have been
127 * sent.
128 */
/* Union of all flags that count as a reference to a connection */
#define CONN_REF_FLAGS  (LSCONN_HASHED | LSCONN_HAS_OUTGOING                \
                        | LSCONN_TICKABLE | LSCONN_ATTQ                     \
                        | LSCONN_CLOSING | LSCONN_TICKED)
135
136
137
138
139struct lsquic_engine
140{
141    struct lsquic_engine_public        pub;
142    enum {
143        ENG_SERVER      = LSENG_SERVER,
144        ENG_HTTP        = LSENG_HTTP,
145        ENG_COOLDOWN    = (1 <<  7),    /* Cooldown: no new connections */
146        ENG_PAST_DEADLINE
147                        = (1 <<  8),    /* Previous call to a processing
148                                         * function went past time threshold.
149                                         */
150#ifndef NDEBUG
151        ENG_DTOR        = (1 << 26),    /* Engine destructor */
152#endif
153    }                                  flags;
154    const struct lsquic_stream_if     *stream_if;
155    void                              *stream_if_ctx;
156    lsquic_packets_out_f               packets_out;
157    void                              *packets_out_ctx;
158    void                              *bad_handshake_ctx;
159    struct conn_hash                   conns_hash;
160    struct min_heap                    conns_tickable;
161    struct min_heap                    conns_out;
162    struct eng_hist                    history;
163    unsigned                           batch_size;
164    struct attq                       *attq;
165    /* Track time last time a packet was sent to give new connections
166     * priority lower than that of existing connections.
167     */
168    lsquic_time_t                      last_sent;
169    unsigned                           n_conns;
170    lsquic_time_t                      deadline;
171    struct out_batch                   out_batch;
172};
173
174
175void
176lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
177                             unsigned flags)
178{
179    memset(settings, 0, sizeof(*settings));
180    settings->es_versions        = LSQUIC_DF_VERSIONS;
181    if (flags & ENG_SERVER)
182    {
183        settings->es_cfcw        = LSQUIC_DF_CFCW_SERVER;
184        settings->es_sfcw        = LSQUIC_DF_SFCW_SERVER;
185        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER;
186    }
187    else
188    {
189        settings->es_cfcw        = LSQUIC_DF_CFCW_CLIENT;
190        settings->es_sfcw        = LSQUIC_DF_SFCW_CLIENT;
191        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT;
192    }
193    settings->es_max_streams_in  = LSQUIC_DF_MAX_STREAMS_IN;
194    settings->es_idle_conn_to    = LSQUIC_DF_IDLE_CONN_TO;
195    settings->es_handshake_to    = LSQUIC_DF_HANDSHAKE_TO;
196    settings->es_silent_close    = LSQUIC_DF_SILENT_CLOSE;
197    settings->es_max_header_list_size
198                                 = LSQUIC_DF_MAX_HEADER_LIST_SIZE;
199    settings->es_ua              = LSQUIC_DF_UA;
200
201    settings->es_pdmd            = QTAG_X509;
202    settings->es_aead            = QTAG_AESG;
203    settings->es_kexs            = QTAG_C255;
204    settings->es_support_push    = LSQUIC_DF_SUPPORT_PUSH;
205    settings->es_support_tcid0   = LSQUIC_DF_SUPPORT_TCID0;
206    settings->es_support_nstp    = LSQUIC_DF_SUPPORT_NSTP;
207    settings->es_honor_prst      = LSQUIC_DF_HONOR_PRST;
208    settings->es_progress_check  = LSQUIC_DF_PROGRESS_CHECK;
209    settings->es_rw_once         = LSQUIC_DF_RW_ONCE;
210    settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
211    settings->es_pace_packets    = LSQUIC_DF_PACE_PACKETS;
212}
213
214
215/* Note: if returning an error, err_buf must be valid if non-NULL */
216int
217lsquic_engine_check_settings (const struct lsquic_engine_settings *settings,
218                              unsigned flags,
219                              char *err_buf, size_t err_buf_sz)
220{
221    if (settings->es_cfcw < LSQUIC_MIN_FCW ||
222        settings->es_sfcw < LSQUIC_MIN_FCW)
223    {
224        if (err_buf)
225            snprintf(err_buf, err_buf_sz, "%s",
226                                            "flow control window set too low");
227        return -1;
228    }
229    if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS))
230    {
231        if (err_buf)
232            snprintf(err_buf, err_buf_sz, "%s",
233                        "No supported QUIC versions specified");
234        return -1;
235    }
236    if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS)
237    {
238        if (err_buf)
239            snprintf(err_buf, err_buf_sz, "%s",
240                        "one or more unsupported QUIC version is specified");
241        return -1;
242    }
243    return 0;
244}
245
246
/* Stock packet memory release/return callback: frees `packet_data' with
 * free().  The other arguments are ignored.
 */
static void
free_packet (void *ctx, void *conn_ctx, void *packet_data, char is_ipv6)
{
    (void) ctx;
    (void) conn_ctx;
    (void) is_ipv6;
    free(packet_data);
}
252
253
/* Stock packet memory allocation callback: returns `size' bytes obtained
 * from malloc(), or NULL if malloc() fails.  The other arguments are
 * ignored.
 */
static void *
malloc_buf (void *ctx, void *conn_ctx, unsigned short size, char is_ipv6)
{
    void *buf;

    (void) ctx;
    (void) conn_ctx;
    (void) is_ipv6;
    buf = malloc(size);
    return buf;
}
259
260
261static const struct lsquic_packout_mem_if stock_pmi =
262{
263    malloc_buf, free_packet, free_packet,
264};
265
266
267static int
268hash_conns_by_addr (const struct lsquic_engine *engine)
269{
270    if (engine->pub.enp_settings.es_versions & LSQUIC_FORCED_TCID0_VERSIONS)
271        return 1;
272    if ((engine->pub.enp_settings.es_versions & LSQUIC_GQUIC_HEADER_VERSIONS)
273                                && engine->pub.enp_settings.es_support_tcid0)
274        return 1;
275    return 0;
276}
277
278
279lsquic_engine_t *
280lsquic_engine_new (unsigned flags,
281                   const struct lsquic_engine_api *api)
282{
283    lsquic_engine_t *engine;
284    int tag_buf_len;
285    char err_buf[100];
286
287    if (!api->ea_packets_out)
288    {
289        LSQ_ERROR("packets_out callback is not specified");
290        return NULL;
291    }
292
293    if (api->ea_settings &&
294                0 != lsquic_engine_check_settings(api->ea_settings, flags,
295                                                    err_buf, sizeof(err_buf)))
296    {
297        LSQ_ERROR("cannot create engine: %s", err_buf);
298        return NULL;
299    }
300
301    engine = calloc(1, sizeof(*engine));
302    if (!engine)
303        return NULL;
304    if (0 != lsquic_mm_init(&engine->pub.enp_mm))
305    {
306        free(engine);
307        return NULL;
308    }
309    if (api->ea_settings)
310        engine->pub.enp_settings        = *api->ea_settings;
311    else
312        lsquic_engine_init_settings(&engine->pub.enp_settings, flags);
313    tag_buf_len = lsquic_gen_ver_tags(engine->pub.enp_ver_tags_buf,
314                                    sizeof(engine->pub.enp_ver_tags_buf),
315                                    engine->pub.enp_settings.es_versions);
316    if (tag_buf_len <= 0)
317    {
318        LSQ_ERROR("cannot generate version tags buffer");
319        free(engine);
320        return NULL;
321    }
322    engine->pub.enp_ver_tags_len = tag_buf_len;
323    engine->pub.enp_flags = ENPUB_CAN_SEND;
324
325    engine->flags           = flags;
326    engine->stream_if       = api->ea_stream_if;
327    engine->stream_if_ctx   = api->ea_stream_if_ctx;
328    engine->packets_out     = api->ea_packets_out;
329    engine->packets_out_ctx = api->ea_packets_out_ctx;
330    if (api->ea_hsi_if)
331    {
332        engine->pub.enp_hsi_if  = api->ea_hsi_if;
333        engine->pub.enp_hsi_ctx = api->ea_hsi_ctx;
334    }
335    else
336    {
337        engine->pub.enp_hsi_if  = lsquic_http1x_if;
338        engine->pub.enp_hsi_ctx = NULL;
339    }
340    if (api->ea_pmi)
341    {
342        engine->pub.enp_pmi      = api->ea_pmi;
343        engine->pub.enp_pmi_ctx  = api->ea_pmi_ctx;
344    }
345    else
346    {
347        engine->pub.enp_pmi      = &stock_pmi;
348        engine->pub.enp_pmi_ctx  = NULL;
349    }
350    engine->pub.enp_verify_cert  = api->ea_verify_cert;
351    engine->pub.enp_verify_ctx   = api->ea_verify_ctx;
352    engine->pub.enp_engine = engine;
353    conn_hash_init(&engine->conns_hash,
354                        hash_conns_by_addr(engine) ?  CHF_USE_ADDR : 0);
355    engine->attq = attq_create();
356    eng_hist_init(&engine->history);
357    engine->batch_size = INITIAL_OUT_BATCH_SIZE;
358
359
360    LSQ_INFO("instantiated engine");
361    return engine;
362}
363
364
365static void
366grow_batch_size (struct lsquic_engine *engine)
367{
368    engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE;
369}
370
371
372static void
373shrink_batch_size (struct lsquic_engine *engine)
374{
375    engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE;
376}
377
378
379/* Wrapper to make sure important things occur before the connection is
380 * really destroyed.
381 */
382static void
383destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn)
384{
385    --engine->n_conns;
386    conn->cn_flags |= LSCONN_NEVER_TICKABLE;
387    conn->cn_if->ci_destroy(conn);
388}
389
390
391static int
392maybe_grow_conn_heaps (struct lsquic_engine *engine)
393{
394    struct min_heap_elem *els;
395    unsigned count;
396
397    if (engine->n_conns < lsquic_mh_nalloc(&engine->conns_tickable))
398        return 0;   /* Nothing to do */
399
400    if (lsquic_mh_nalloc(&engine->conns_tickable))
401        count = lsquic_mh_nalloc(&engine->conns_tickable) * 2 * 2;
402    else
403        count = 8;
404
405    els = malloc(sizeof(els[0]) * count);
406    if (!els)
407    {
408        LSQ_ERROR("%s: malloc failed", __func__);
409        return -1;
410    }
411
412    LSQ_DEBUG("grew heaps to %u elements", count / 2);
413    memcpy(&els[0], engine->conns_tickable.mh_elems,
414                sizeof(els[0]) * lsquic_mh_count(&engine->conns_tickable));
415    memcpy(&els[count / 2], engine->conns_out.mh_elems,
416                sizeof(els[0]) * lsquic_mh_count(&engine->conns_out));
417    free(engine->conns_tickable.mh_elems);
418    engine->conns_tickable.mh_elems = els;
419    engine->conns_out.mh_elems = &els[count / 2];
420    engine->conns_tickable.mh_nalloc = count / 2;
421    engine->conns_out.mh_nalloc = count / 2;
422    return 0;
423}
424
425
426static lsquic_conn_t *
427new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
428                      unsigned short max_packet_size)
429{
430    lsquic_conn_t *conn;
431    unsigned flags;
432    if (0 != maybe_grow_conn_heaps(engine))
433        return NULL;
434    flags = engine->flags & (ENG_SERVER|ENG_HTTP);
435    conn = full_conn_client_new(&engine->pub, engine->stream_if,
436                    engine->stream_if_ctx, flags, hostname, max_packet_size);
437    if (!conn)
438        return NULL;
439    ++engine->n_conns;
440    return conn;
441}
442
443
444static lsquic_conn_t *
445find_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
446         struct packin_parse_state *ppstate, const struct sockaddr *sa_local)
447{
448    lsquic_conn_t *conn;
449
450    if (conn_hash_using_addr(&engine->conns_hash))
451        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
452    else if (packet_in->pi_flags & PI_CONN_ID)
453        conn = conn_hash_find_by_cid(&engine->conns_hash,
454                                                    packet_in->pi_conn_id);
455    else
456    {
457        LSQ_DEBUG("packet header does not have connection ID: discarding");
458        return NULL;
459    }
460
461    if (!conn)
462        return NULL;
463
464    conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
465    if ((packet_in->pi_flags & PI_CONN_ID)
466        && conn->cn_cid != packet_in->pi_conn_id)
467    {
468        LSQ_DEBUG("connection IDs do not match");
469        return NULL;
470    }
471
472    return conn;
473}
474
475
476#if !defined(NDEBUG) && __GNUC__
477__attribute__((weak))
478#endif
479void
480lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub,
481                                    lsquic_conn_t *conn)
482{
483    if (0 == (enpub->enp_flags & ENPUB_PROC) &&
484        0 == (conn->cn_flags & (LSCONN_TICKABLE|LSCONN_NEVER_TICKABLE)))
485    {
486        lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
487        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
488        engine_incref_conn(conn, LSCONN_TICKABLE);
489    }
490}
491
492
493void
494lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
495                                lsquic_conn_t *conn, lsquic_time_t tick_time)
496{
497    lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
498    if (conn->cn_flags & LSCONN_TICKABLE)
499    {
500        /* Optimization: no need to add the connection to the Advisory Tick
501         * Time Queue: it is about to be ticked, after which it its next tick
502         * time may be queried again.
503         */;
504    }
505    else if (conn->cn_flags & LSCONN_ATTQ)
506    {
507        if (lsquic_conn_adv_time(conn) != tick_time)
508        {
509            attq_remove(engine->attq, conn);
510            if (0 != attq_add(engine->attq, conn, tick_time))
511                engine_decref_conn(engine, conn, LSCONN_ATTQ);
512        }
513    }
514    else if (0 == attq_add(engine->attq, conn, tick_time))
515        engine_incref_conn(conn, LSCONN_ATTQ);
516}
517
518
519/* Return 0 if packet is being processed by a connections, otherwise return 1 */
520static int
521process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
522       struct packin_parse_state *ppstate, const struct sockaddr *sa_local,
523       const struct sockaddr *sa_peer, void *peer_ctx)
524{
525    lsquic_conn_t *conn;
526
527    if (lsquic_packet_in_is_gquic_prst(packet_in)
528                                && !engine->pub.enp_settings.es_honor_prst)
529    {
530        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
531        LSQ_DEBUG("public reset packet: discarding");
532        return 1;
533    }
534
535    conn = find_conn(engine, packet_in, ppstate, sa_local);
536
537    if (!conn)
538    {
539        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
540        return 1;
541    }
542
543    if (0 == (conn->cn_flags & LSCONN_TICKABLE))
544    {
545        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
546        engine_incref_conn(conn, LSCONN_TICKABLE);
547    }
548    lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
549    lsquic_packet_in_upref(packet_in);
550    conn->cn_peer_ctx = peer_ctx;
551    conn->cn_if->ci_packet_in(conn, packet_in);
552    lsquic_packet_in_put(&engine->pub.enp_mm, packet_in);
553    return 0;
554}
555
556
557void
558lsquic_engine_destroy (lsquic_engine_t *engine)
559{
560    lsquic_conn_t *conn;
561
562    LSQ_DEBUG("destroying engine");
563#ifndef NDEBUG
564    engine->flags |= ENG_DTOR;
565#endif
566
567    while ((conn = lsquic_mh_pop(&engine->conns_out)))
568    {
569        assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
570        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
571    }
572
573    while ((conn = lsquic_mh_pop(&engine->conns_tickable)))
574    {
575        assert(conn->cn_flags & LSCONN_TICKABLE);
576        (void) engine_decref_conn(engine, conn, LSCONN_TICKABLE);
577    }
578
579    for (conn = conn_hash_first(&engine->conns_hash); conn;
580                            conn = conn_hash_next(&engine->conns_hash))
581        force_close_conn(engine, conn);
582    conn_hash_cleanup(&engine->conns_hash);
583
584    assert(0 == engine->n_conns);
585    attq_destroy(engine->attq);
586
587    assert(0 == lsquic_mh_count(&engine->conns_out));
588    assert(0 == lsquic_mh_count(&engine->conns_tickable));
589    lsquic_mm_cleanup(&engine->pub.enp_mm);
590    free(engine->conns_tickable.mh_elems);
591    free(engine);
592}
593
594
595lsquic_conn_t *
596lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *local_sa,
597                       const struct sockaddr *peer_sa,
598                       void *peer_ctx, lsquic_conn_ctx_t *conn_ctx,
599                       const char *hostname, unsigned short max_packet_size)
600{
601    lsquic_conn_t *conn;
602    ENGINE_IN(engine);
603
604    if (engine->flags & ENG_SERVER)
605    {
606        LSQ_ERROR("`%s' must only be called in client mode", __func__);
607        goto err;
608    }
609
610    if (conn_hash_using_addr(&engine->conns_hash)
611                && conn_hash_find_by_addr(&engine->conns_hash, local_sa))
612    {
613        LSQ_ERROR("cannot have more than one connection on the same port");
614        goto err;
615    }
616
617    if (0 == max_packet_size)
618    {
619        switch (peer_sa->sa_family)
620        {
621        case AF_INET:
622            max_packet_size = QUIC_MAX_IPv4_PACKET_SZ;
623            break;
624        default:
625            max_packet_size = QUIC_MAX_IPv6_PACKET_SZ;
626            break;
627        }
628    }
629
630    conn = new_full_conn_client(engine, hostname, max_packet_size);
631    if (!conn)
632        goto err;
633    lsquic_conn_record_sockaddr(conn, local_sa, peer_sa);
634    if (0 != conn_hash_add(&engine->conns_hash, conn))
635    {
636        LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
637            conn->cn_cid);
638        destroy_conn(engine, conn);
639        goto err;
640    }
641    assert(!(conn->cn_flags &
642        (CONN_REF_FLAGS
643         & ~LSCONN_TICKABLE /* This flag may be set as effect of user
644                                 callbacks */
645                             )));
646    conn->cn_flags |= LSCONN_HASHED;
647    lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
648    engine_incref_conn(conn, LSCONN_TICKABLE);
649    conn->cn_peer_ctx = peer_ctx;
650    lsquic_conn_set_ctx(conn, conn_ctx);
651    full_conn_client_call_on_new(conn);
652  end:
653    ENGINE_OUT(engine);
654    return conn;
655  err:
656    conn = NULL;
657    goto end;
658}
659
660
661static void
662remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
663{
664    conn_hash_remove(&engine->conns_hash, conn);
665    (void) engine_decref_conn(engine, conn, LSCONN_HASHED);
666}
667
668
669static void
670refflags2str (enum lsquic_conn_flags flags, char s[6])
671{
672    *s = 'C'; s += !!(flags & LSCONN_CLOSING);
673    *s = 'H'; s += !!(flags & LSCONN_HASHED);
674    *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
675    *s = 'T'; s += !!(flags & LSCONN_TICKABLE);
676    *s = 'A'; s += !!(flags & LSCONN_ATTQ);
677    *s = 'K'; s += !!(flags & LSCONN_TICKED);
678    *s = '\0';
679}
680
681
682static void
683engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
684{
685    char str[2][7];
686    assert(flag & CONN_REF_FLAGS);
687    assert(!(conn->cn_flags & flag));
688    conn->cn_flags |= flag;
689    LSQ_DEBUG("incref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
690                    (refflags2str(conn->cn_flags & ~flag, str[0]), str[0]),
691                    (refflags2str(conn->cn_flags, str[1]), str[1]));
692}
693
694
695static lsquic_conn_t *
696engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
697                                        enum lsquic_conn_flags flags)
698{
699    char str[2][7];
700    assert(flags & CONN_REF_FLAGS);
701    assert(conn->cn_flags & flags);
702#ifndef NDEBUG
703    if (flags & LSCONN_CLOSING)
704        assert(0 == (conn->cn_flags & LSCONN_HASHED));
705#endif
706    conn->cn_flags &= ~flags;
707    LSQ_DEBUG("decref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
708                    (refflags2str(conn->cn_flags | flags, str[0]), str[0]),
709                    (refflags2str(conn->cn_flags, str[1]), str[1]));
710    if (0 == (conn->cn_flags & CONN_REF_FLAGS))
711    {
712        eng_hist_inc(&engine->history, 0, sl_del_full_conns);
713        destroy_conn(engine, conn);
714        return NULL;
715    }
716    else
717        return conn;
718}
719
720
721/* This is not a general-purpose function.  Only call from engine dtor. */
722static void
723force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
724{
725    assert(engine->flags & ENG_DTOR);
726    const enum lsquic_conn_flags flags = conn->cn_flags;
727    assert(conn->cn_flags & CONN_REF_FLAGS);
728    assert(!(flags & LSCONN_HAS_OUTGOING));  /* Should be removed already */
729    assert(!(flags & LSCONN_TICKABLE));    /* Should be removed already */
730    assert(!(flags & LSCONN_CLOSING));  /* It is in transient queue? */
731    if (flags & LSCONN_ATTQ)
732    {
733        attq_remove(engine->attq, conn);
734        (void) engine_decref_conn(engine, conn, LSCONN_ATTQ);
735    }
736    if (flags & LSCONN_HASHED)
737        remove_conn_from_hash(engine, conn);
738}
739
740
741/* Iterator for tickable connections (those on the Tickable Queue).  Before
742 * a connection is returned, it is removed from the Advisory Tick Time queue
743 * if necessary.
744 */
745static lsquic_conn_t *
746conn_iter_next_tickable (struct lsquic_engine *engine)
747{
748    lsquic_conn_t *conn;
749
750    conn = lsquic_mh_pop(&engine->conns_tickable);
751
752    if (conn)
753        conn = engine_decref_conn(engine, conn, LSCONN_TICKABLE);
754    if (conn && (conn->cn_flags & LSCONN_ATTQ))
755    {
756        attq_remove(engine->attq, conn);
757        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
758    }
759
760    return conn;
761}
762
763
764void
765lsquic_engine_process_conns (lsquic_engine_t *engine)
766{
767    lsquic_conn_t *conn;
768    lsquic_time_t now;
769
770    ENGINE_IN(engine);
771
772    now = lsquic_time_now();
773    while ((conn = attq_pop(engine->attq, now)))
774    {
775        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
776        if (conn && !(conn->cn_flags & LSCONN_TICKABLE))
777        {
778            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
779            engine_incref_conn(conn, LSCONN_TICKABLE);
780        }
781    }
782
783    process_connections(engine, conn_iter_next_tickable, now);
784    ENGINE_OUT(engine);
785}
786
787
788static ssize_t
789really_encrypt_packet (const lsquic_conn_t *conn,
790                       struct lsquic_packet_out *packet_out,
791                       unsigned char *buf, size_t bufsz)
792{
793    int header_sz, is_hello_packet;
794    enum enc_level enc_level;
795    size_t packet_sz;
796    unsigned char header_buf[QUIC_MAX_PUBHDR_SZ];
797
798    header_sz = conn->cn_pf->pf_gen_reg_pkt_header(conn, packet_out,
799                                            header_buf, sizeof(header_buf));
800    if (header_sz < 0)
801        return -1;
802
803    is_hello_packet = !!(packet_out->po_flags & PO_HELLO);
804    enc_level = conn->cn_esf->esf_encrypt(conn->cn_enc_session,
805                conn->cn_version, 0,
806                packet_out->po_packno, header_buf, header_sz,
807                packet_out->po_data, packet_out->po_data_sz,
808                buf, bufsz, &packet_sz, is_hello_packet);
809    if ((int) enc_level >= 0)
810    {
811        lsquic_packet_out_set_enc_level(packet_out, enc_level);
812        LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %zu bytes, "
813            "ciphertext is %zd bytes",
814            packet_out->po_packno,
815            conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) +
816                                                packet_out->po_data_sz,
817            packet_sz);
818        return packet_sz;
819    }
820    else
821        return -1;
822}
823
824
825static int
826conn_peer_ipv6 (const struct lsquic_conn *conn)
827{
828    return AF_INET6 == ((struct sockaddr *) conn->cn_peer_addr)->sa_family;
829}
830
831
832static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, }
833encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
834                                            lsquic_packet_out_t *packet_out)
835{
836    ssize_t enc_sz;
837    size_t bufsz;
838    unsigned sent_sz;
839    unsigned char *buf;
840    int ipv6;
841
842    bufsz = conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) +
843                                packet_out->po_data_sz + QUIC_PACKET_HASH_SZ;
844    if (bufsz > USHRT_MAX)
845        return ENCPA_BADCRYPT;  /* To cause connection to close */
846    ipv6 = conn_peer_ipv6(conn);
847    buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx,
848                                            conn->cn_peer_ctx, bufsz, ipv6);
849    if (!buf)
850    {
851        LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd",
852                                                                        bufsz);
853        return ENCPA_NOMEM;
854    }
855
856    {
857        enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz);
858        sent_sz = enc_sz;
859    }
860
861    if (enc_sz < 0)
862    {
863        engine->pub.enp_pmi->pmi_return(engine->pub.enp_pmi_ctx,
864                                                conn->cn_peer_ctx, buf, ipv6);
865        return ENCPA_BADCRYPT;
866    }
867
868    packet_out->po_enc_data    = buf;
869    packet_out->po_enc_data_sz = enc_sz;
870    packet_out->po_sent_sz     = sent_sz;
871    packet_out->po_flags &= ~PO_IPv6;
872    packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ|(ipv6 << POIPv6_SHIFT);
873
874    return ENCPA_OK;
875}
876
877
878static void
879release_or_return_enc_data (struct lsquic_engine *engine,
880                void (*pmi_rel_or_ret) (void *, void *, void *, char),
881                struct lsquic_conn *conn, struct lsquic_packet_out *packet_out)
882{
883    pmi_rel_or_ret(engine->pub.enp_pmi_ctx, conn->cn_peer_ctx,
884                packet_out->po_enc_data, lsquic_packet_out_ipv6(packet_out));
885    packet_out->po_flags &= ~PO_ENCRYPTED;
886    packet_out->po_enc_data = NULL;
887}
888
889
890static void
891release_enc_data (struct lsquic_engine *engine, struct lsquic_conn *conn,
892                                        struct lsquic_packet_out *packet_out)
893{
894    release_or_return_enc_data(engine, engine->pub.enp_pmi->pmi_release,
895                                conn, packet_out);
896}
897
898
899static void
900return_enc_data (struct lsquic_engine *engine, struct lsquic_conn *conn,
901                                        struct lsquic_packet_out *packet_out)
902{
903    release_or_return_enc_data(engine, engine->pub.enp_pmi->pmi_return,
904                                conn, packet_out);
905}
906
907
/* Named list head types for queues of connections: a tail queue and a
 * singly-linked tail queue, as provided by <sys/queue.h>.
 */
TAILQ_HEAD(conns_tailq, lsquic_conn);
STAILQ_HEAD(conns_stailq, lsquic_conn);
910
911
912struct conns_out_iter
913{
914    struct min_heap            *coi_heap;
915    TAILQ_HEAD(, lsquic_conn)   coi_active_list,
916                                coi_inactive_list;
917    lsquic_conn_t              *coi_next;
918#ifndef NDEBUG
919    lsquic_time_t               coi_last_sent;
920#endif
921};
922
923
924static void
925coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine)
926{
927    iter->coi_heap = &engine->conns_out;
928    iter->coi_next = NULL;
929    TAILQ_INIT(&iter->coi_active_list);
930    TAILQ_INIT(&iter->coi_inactive_list);
931#ifndef NDEBUG
932    iter->coi_last_sent = 0;
933#endif
934}
935
936
937static lsquic_conn_t *
938coi_next (struct conns_out_iter *iter)
939{
940    lsquic_conn_t *conn;
941
942    if (lsquic_mh_count(iter->coi_heap) > 0)
943    {
944        conn = lsquic_mh_pop(iter->coi_heap);
945        TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
946        conn->cn_flags |= LSCONN_COI_ACTIVE;
947#ifndef NDEBUG
948        if (iter->coi_last_sent)
949            assert(iter->coi_last_sent <= conn->cn_last_sent);
950        iter->coi_last_sent = conn->cn_last_sent;
951#endif
952        return conn;
953    }
954    else if (!TAILQ_EMPTY(&iter->coi_active_list))
955    {
956        conn = iter->coi_next;
957        if (!conn)
958            conn = TAILQ_FIRST(&iter->coi_active_list);
959        if (conn)
960            iter->coi_next = TAILQ_NEXT(conn, cn_next_out);
961        return conn;
962    }
963    else
964        return NULL;
965}
966
967
968static void
969coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
970{
971    if (!(conn->cn_flags & LSCONN_EVANESCENT))
972    {
973        assert(!TAILQ_EMPTY(&iter->coi_active_list));
974        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
975        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
976        TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out);
977        conn->cn_flags |= LSCONN_COI_INACTIVE;
978    }
979}
980
981
982static void
983coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
984{
985    assert(conn->cn_flags & LSCONN_COI_INACTIVE);
986    TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
987    conn->cn_flags &= ~LSCONN_COI_INACTIVE;
988    TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
989    conn->cn_flags |= LSCONN_COI_ACTIVE;
990}
991
992
993static void
994coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
995{
996    lsquic_conn_t *conn;
997    while ((conn = TAILQ_FIRST(&iter->coi_active_list)))
998    {
999        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1000        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1001        lsquic_mh_insert(iter->coi_heap, conn, conn->cn_last_sent);
1002    }
1003    while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
1004    {
1005        TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1006        conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1007        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
1008    }
1009}
1010
1011
1012static unsigned
1013send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter,
1014                  struct out_batch *batch, unsigned n_to_send)
1015{
1016    int n_sent, i;
1017    lsquic_time_t now;
1018
1019    /* Set sent time before the write to avoid underestimating RTT */
1020    now = lsquic_time_now();
1021    for (i = 0; i < (int) n_to_send; ++i)
1022        batch->packets[i]->po_sent = now;
1023    n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs,
1024                                                                n_to_send);
1025    if (n_sent < (int) n_to_send)
1026    {
1027        engine->pub.enp_flags &= ~ENPUB_CAN_SEND;
1028        LSQ_DEBUG("cannot send packets");
1029        EV_LOG_GENERIC_EVENT("cannot send packets");
1030    }
1031    if (n_sent >= 0)
1032        LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send);
1033    else
1034    {
1035        LSQ_DEBUG("packets out returned an error: %s", strerror(errno));
1036        n_sent = 0;
1037    }
1038    if (n_sent > 0)
1039        engine->last_sent = now + n_sent;
1040    for (i = 0; i < n_sent; ++i)
1041    {
1042        eng_hist_inc(&engine->history, now, sl_packets_out);
1043        EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1044        batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i],
1045                                                    batch->packets[i]);
1046        /* `i' is added to maintain relative order */
1047        batch->conns[i]->cn_last_sent = now + i;
1048        /* Release packet out buffer as soon as the packet is sent
1049         * successfully.  If not successfully sent, we hold on to
1050         * this buffer until the packet sending is attempted again
1051         * or until it times out and regenerated.
1052         */
1053        if (batch->packets[i]->po_flags & PO_ENCRYPTED)
1054            release_enc_data(engine, batch->conns[i], batch->packets[i]);
1055    }
1056    if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT))
1057        for ( ; i < (int) n_to_send; ++i)
1058            EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1059    /* Return packets to the connection in reverse order so that the packet
1060     * ordering is maintained.
1061     */
1062    for (i = (int) n_to_send - 1; i >= n_sent; --i)
1063    {
1064        batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i],
1065                                                    batch->packets[i]);
1066        if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT)))
1067            coi_reactivate(conns_iter, batch->conns[i]);
1068    }
1069    return n_sent;
1070}
1071
1072
1073/* Return 1 if went past deadline, 0 otherwise */
1074static int
1075check_deadline (lsquic_engine_t *engine)
1076{
1077    if (engine->pub.enp_settings.es_proc_time_thresh &&
1078                                lsquic_time_now() > engine->deadline)
1079    {
1080        LSQ_INFO("went past threshold of %u usec, stop sending",
1081                            engine->pub.enp_settings.es_proc_time_thresh);
1082        engine->flags |= ENG_PAST_DEADLINE;
1083        return 1;
1084    }
1085    else
1086        return 0;
1087}
1088
1089
1090static void
1091send_packets_out (struct lsquic_engine *engine,
1092                  struct conns_tailq *ticked_conns,
1093                  struct conns_stailq *closed_conns)
1094{
1095    unsigned n, w, n_sent, n_batches_sent;
1096    lsquic_packet_out_t *packet_out;
1097    lsquic_conn_t *conn;
1098    struct out_batch *const batch = &engine->out_batch;
1099    struct conns_out_iter conns_iter;
1100    int shrink, deadline_exceeded;
1101
1102    coi_init(&conns_iter, engine);
1103    n_batches_sent = 0;
1104    n_sent = 0, n = 0;
1105    shrink = 0;
1106    deadline_exceeded = 0;
1107
1108    while ((conn = coi_next(&conns_iter)))
1109    {
1110        packet_out = conn->cn_if->ci_next_packet_to_send(conn);
1111        if (!packet_out) {
1112            LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64,
1113                                                            conn->cn_cid);
1114            coi_deactivate(&conns_iter, conn);
1115            continue;
1116        }
1117        if ((packet_out->po_flags & PO_ENCRYPTED)
1118                && lsquic_packet_out_ipv6(packet_out) != conn_peer_ipv6(conn))
1119        {
1120            /* Peer address changed since the packet was encrypted.  Need to
1121             * reallocate.
1122             */
1123            return_enc_data(engine, conn, packet_out);
1124        }
1125        if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT)))
1126        {
1127            switch (encrypt_packet(engine, conn, packet_out))
1128            {
1129            case ENCPA_NOMEM:
1130                /* Send what we have and wait for a more opportune moment */
1131                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1132                goto end_for;
1133            case ENCPA_BADCRYPT:
1134                /* This is pretty bad: close connection immediately */
1135                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1136                LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid);
1137                if (!(conn->cn_flags & LSCONN_EVANESCENT))
1138                {
1139                    if (!(conn->cn_flags & LSCONN_CLOSING))
1140                    {
1141                        STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn);
1142                        engine_incref_conn(conn, LSCONN_CLOSING);
1143                        if (conn->cn_flags & LSCONN_HASHED)
1144                            remove_conn_from_hash(engine, conn);
1145                    }
1146                    coi_deactivate(&conns_iter, conn);
1147                    if (conn->cn_flags & LSCONN_TICKED)
1148                    {
1149                        TAILQ_REMOVE(ticked_conns, conn, cn_next_ticked);
1150                        engine_decref_conn(engine, conn, LSCONN_TICKED);
1151                    }
1152                }
1153                continue;
1154            case ENCPA_OK:
1155                break;
1156            }
1157        }
1158        LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64,
1159                                        packet_out->po_packno, conn->cn_cid);
1160        assert(conn->cn_flags & LSCONN_HAS_PEER_SA);
1161        if (packet_out->po_flags & PO_ENCRYPTED)
1162        {
1163            batch->outs[n].buf     = packet_out->po_enc_data;
1164            batch->outs[n].sz      = packet_out->po_enc_data_sz;
1165        }
1166        else
1167        {
1168            batch->outs[n].buf     = packet_out->po_data;
1169            batch->outs[n].sz      = packet_out->po_data_sz;
1170        }
1171        batch->outs   [n].peer_ctx = conn->cn_peer_ctx;
1172        batch->outs   [n].local_sa = (struct sockaddr *) conn->cn_local_addr;
1173        batch->outs   [n].dest_sa  = (struct sockaddr *) conn->cn_peer_addr;
1174        batch->conns  [n]          = conn;
1175        batch->packets[n]          = packet_out;
1176        ++n;
1177        if (n == engine->batch_size)
1178        {
1179            n = 0;
1180            w = send_batch(engine, &conns_iter, batch, engine->batch_size);
1181            ++n_batches_sent;
1182            n_sent += w;
1183            if (w < engine->batch_size)
1184            {
1185                shrink = 1;
1186                break;
1187            }
1188            deadline_exceeded = check_deadline(engine);
1189            if (deadline_exceeded)
1190                break;
1191            grow_batch_size(engine);
1192        }
1193    }
1194  end_for:
1195
1196    if (n > 0) {
1197        w = send_batch(engine, &conns_iter, batch, n);
1198        n_sent += w;
1199        shrink = w < n;
1200        ++n_batches_sent;
1201        deadline_exceeded = check_deadline(engine);
1202    }
1203
1204    if (shrink)
1205        shrink_batch_size(engine);
1206    else if (n_batches_sent > 1 && !deadline_exceeded)
1207        grow_batch_size(engine);
1208
1209    coi_reheap(&conns_iter, engine);
1210
1211    LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s");
1212}
1213
1214
1215int
1216lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
1217{
1218    return lsquic_mh_count(&engine->conns_out) > 0
1219    ;
1220}
1221
1222
1223static void
1224reset_deadline (lsquic_engine_t *engine, lsquic_time_t now)
1225{
1226    engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh;
1227    engine->flags &= ~ENG_PAST_DEADLINE;
1228}
1229
1230
1231/* TODO: this is a user-facing function, account for load */
1232void
1233lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
1234{
1235    lsquic_conn_t *conn;
1236    struct conns_stailq closed_conns;
1237    struct conns_tailq ticked_conns = TAILQ_HEAD_INITIALIZER(ticked_conns);
1238
1239    STAILQ_INIT(&closed_conns);
1240    reset_deadline(engine, lsquic_time_now());
1241    if (!(engine->pub.enp_flags & ENPUB_CAN_SEND))
1242    {
1243        LSQ_DEBUG("can send again");
1244        EV_LOG_GENERIC_EVENT("can send again");
1245        engine->pub.enp_flags |= ENPUB_CAN_SEND;
1246    }
1247
1248    send_packets_out(engine, &ticked_conns, &closed_conns);
1249
1250    while ((conn = STAILQ_FIRST(&closed_conns))) {
1251        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1252        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1253    }
1254
1255}
1256
1257
1258static void
1259process_connections (lsquic_engine_t *engine, conn_iter_f next_conn,
1260                     lsquic_time_t now)
1261{
1262    lsquic_conn_t *conn;
1263    enum tick_st tick_st;
1264    unsigned i;
1265    lsquic_time_t next_tick_time;
1266    struct conns_stailq closed_conns;
1267    struct conns_tailq ticked_conns;
1268
1269    eng_hist_tick(&engine->history, now);
1270
1271    STAILQ_INIT(&closed_conns);
1272    TAILQ_INIT(&ticked_conns);
1273    reset_deadline(engine, now);
1274
1275    i = 0;
1276    while ((conn = next_conn(engine))
1277          )
1278    {
1279        tick_st = conn->cn_if->ci_tick(conn, now);
1280        conn->cn_last_ticked = now + i /* Maintain relative order */ ++;
1281        if (tick_st & TICK_SEND)
1282        {
1283            if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
1284            {
1285                lsquic_mh_insert(&engine->conns_out, conn, conn->cn_last_sent);
1286                engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
1287            }
1288        }
1289        if (tick_st & TICK_CLOSE)
1290        {
1291            STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn);
1292            engine_incref_conn(conn, LSCONN_CLOSING);
1293            if (conn->cn_flags & LSCONN_HASHED)
1294                remove_conn_from_hash(engine, conn);
1295        }
1296        else
1297        {
1298            TAILQ_INSERT_TAIL(&ticked_conns, conn, cn_next_ticked);
1299            engine_incref_conn(conn, LSCONN_TICKED);
1300        }
1301    }
1302
1303    if ((engine->pub.enp_flags & ENPUB_CAN_SEND)
1304                        && lsquic_engine_has_unsent_packets(engine))
1305        send_packets_out(engine, &ticked_conns, &closed_conns);
1306
1307    while ((conn = STAILQ_FIRST(&closed_conns))) {
1308        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1309        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1310    }
1311
1312    /* TODO Heapification can be optimized by switching to the Floyd method:
1313     * https://en.wikipedia.org/wiki/Binary_heap#Building_a_heap
1314     */
1315    while ((conn = TAILQ_FIRST(&ticked_conns)))
1316    {
1317        TAILQ_REMOVE(&ticked_conns, conn, cn_next_ticked);
1318        engine_decref_conn(engine, conn, LSCONN_TICKED);
1319        if (!(conn->cn_flags & LSCONN_TICKABLE)
1320            && conn->cn_if->ci_is_tickable(conn))
1321        {
1322            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
1323            engine_incref_conn(conn, LSCONN_TICKABLE);
1324        }
1325        else if (!(conn->cn_flags & LSCONN_ATTQ))
1326        {
1327            next_tick_time = conn->cn_if->ci_next_tick_time(conn);
1328            if (next_tick_time)
1329            {
1330                if (0 == attq_add(engine->attq, conn, next_tick_time))
1331                    engine_incref_conn(conn, LSCONN_ATTQ);
1332            }
1333            else
1334                assert(0);
1335        }
1336    }
1337
1338}
1339
1340
1341/* Return 0 if packet is being processed by a real connection, 1 if the
1342 * packet was processed, but not by a connection, and -1 on error.
1343 */
1344int
1345lsquic_engine_packet_in (lsquic_engine_t *engine,
1346    const unsigned char *packet_in_data, size_t packet_in_size,
1347    const struct sockaddr *sa_local, const struct sockaddr *sa_peer,
1348    void *peer_ctx)
1349{
1350    struct packin_parse_state ppstate;
1351    lsquic_packet_in_t *packet_in;
1352    int (*parse_packet_in_begin) (struct lsquic_packet_in *, size_t length,
1353                                int is_server, struct packin_parse_state *);
1354
1355    if (packet_in_size > QUIC_MAX_PACKET_SZ)
1356    {
1357        LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming "
1358            "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ);
1359        errno = E2BIG;
1360        return -1;
1361    }
1362
1363    if (conn_hash_using_addr(&engine->conns_hash))
1364    {
1365        const struct lsquic_conn *conn;
1366        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
1367        if (!conn)
1368            return -1;
1369        if ((1 << conn->cn_version) & LSQUIC_GQUIC_HEADER_VERSIONS)
1370            parse_packet_in_begin = lsquic_gquic_parse_packet_in_begin;
1371        else
1372            parse_packet_in_begin = lsquic_iquic_parse_packet_in_begin;
1373    }
1374    else
1375        parse_packet_in_begin = lsquic_parse_packet_in_begin;
1376
1377    packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm);
1378    if (!packet_in)
1379        return -1;
1380
1381    /* Library does not modify packet_in_data, it is not referenced after
1382     * this function returns and subsequent release of pi_data is guarded
1383     * by PI_OWN_DATA flag.
1384     */
1385    packet_in->pi_data = (unsigned char *) packet_in_data;
1386    if (0 != parse_packet_in_begin(packet_in, packet_in_size,
1387                                        engine->flags & ENG_SERVER, &ppstate))
1388    {
1389        LSQ_DEBUG("Cannot parse incoming packet's header");
1390        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
1391        errno = EINVAL;
1392        return -1;
1393    }
1394
1395    packet_in->pi_received = lsquic_time_now();
1396    eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in);
1397    return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer,
1398                                                                    peer_ctx);
1399}
1400
1401
1402#if __GNUC__ && !defined(NDEBUG)
1403__attribute__((weak))
1404#endif
1405unsigned
1406lsquic_engine_quic_versions (const lsquic_engine_t *engine)
1407{
1408    return engine->pub.enp_settings.es_versions;
1409}
1410
1411
1412int
1413lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
1414{
1415    const lsquic_time_t *next_time;
1416    lsquic_time_t now;
1417
1418    if (((engine->flags & ENG_PAST_DEADLINE)
1419                                    && lsquic_mh_count(&engine->conns_out))
1420        || lsquic_mh_count(&engine->conns_tickable))
1421    {
1422        *diff = 0;
1423        return 1;
1424    }
1425
1426    next_time = attq_next_time(engine->attq);
1427    if (!next_time)
1428        return 0;
1429
1430    now = lsquic_time_now();
1431    *diff = (int) ((int64_t) *next_time - (int64_t) now);
1432    return 1;
1433}
1434
1435
1436unsigned
1437lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now)
1438{
1439    lsquic_time_t now;
1440    now = lsquic_time_now();
1441    if (from_now < 0)
1442        now -= from_now;
1443    else
1444        now += from_now;
1445    return attq_count_before(engine->attq, now);
1446}
1447
1448
1449