lsquic_engine.c revision 6f126d80
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_engine.c - QUIC engine
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdint.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <sys/queue.h>
14#include <time.h>
15#ifndef WIN32
16#include <sys/time.h>
17#include <netinet/in.h>
18#include <sys/types.h>
19#include <sys/stat.h>
20#include <fcntl.h>
21#include <unistd.h>
22#include <netdb.h>
23#endif
24
25
26
27#include "lsquic.h"
28#include "lsquic_types.h"
29#include "lsquic_alarmset.h"
30#include "lsquic_parse_common.h"
31#include "lsquic_parse.h"
32#include "lsquic_packet_in.h"
33#include "lsquic_packet_out.h"
34#include "lsquic_senhist.h"
35#include "lsquic_rtt.h"
36#include "lsquic_cubic.h"
37#include "lsquic_pacer.h"
38#include "lsquic_send_ctl.h"
39#include "lsquic_set.h"
40#include "lsquic_conn_flow.h"
41#include "lsquic_sfcw.h"
42#include "lsquic_stream.h"
43#include "lsquic_conn.h"
44#include "lsquic_full_conn.h"
45#include "lsquic_util.h"
46#include "lsquic_qtags.h"
47#include "lsquic_str.h"
48#include "lsquic_handshake.h"
49#include "lsquic_mm.h"
50#include "lsquic_conn_hash.h"
51#include "lsquic_engine_public.h"
52#include "lsquic_eng_hist.h"
53#include "lsquic_ev_log.h"
54#include "lsquic_version.h"
55#include "lsquic_hash.h"
56#include "lsquic_attq.h"
57#include "lsquic_min_heap.h"
58
59#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
60#include "lsquic_logger.h"
61
62
/* The batch of outgoing packets grows and shrinks dynamically */
/* Upper bound on the batch size; also the capacity of the arrays in
 * struct out_batch.
 */
#define MAX_OUT_BATCH_SIZE 1024
/* Lower bound: shrink_batch_size() does not go below this value. */
#define MIN_OUT_BATCH_SIZE 256
/* Batch size a newly created engine starts with. */
#define INITIAL_OUT_BATCH_SIZE 512

/* Storage for one batch of outgoing packets.  The three arrays are used in
 * parallel: elements at the same index refer to the same outgoing packet
 * (owning connection, packet object, and the spec handed to the
 * packets_out callback).
 */
struct out_batch
{
    lsquic_conn_t           *conns  [MAX_OUT_BATCH_SIZE];
    lsquic_packet_out_t     *packets[MAX_OUT_BATCH_SIZE];
    struct lsquic_out_spec   outs   [MAX_OUT_BATCH_SIZE];
};
74
/* Signature of a connection iterator, such as conn_iter_next_tickable():
 * given the engine, return a connection or NULL.
 */
typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);

/* Prototypes for static functions that are used before they are defined. */

static void
process_connections (struct lsquic_engine *engine, conn_iter_f iter,
                     lsquic_time_t now);

/* Set reference flag `flag' on the connection. */
static void
engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);

/* Clear reference flag(s); returns NULL if the connection was destroyed as
 * a result, otherwise returns the connection.
 */
static lsquic_conn_t *
engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
                                        enum lsquic_conn_flags flag);

/* Only to be called from the engine destructor. */
static void
force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
90
/* Nested calls to LSQUIC are not supported */
/* ENGINE_IN() marks the engine as being inside a processing call by setting
 * ENPUB_PROC; the assertion catches re-entry while the flag is already set.
 */
#define ENGINE_IN(e) do {                               \
    assert(!((e)->pub.enp_flags & ENPUB_PROC));         \
    (e)->pub.enp_flags |= ENPUB_PROC;                   \
} while (0)

/* ENGINE_OUT() is the counterpart of ENGINE_IN(): it clears ENPUB_PROC and
 * asserts that the flag was set.
 */
#define ENGINE_OUT(e) do {                              \
    assert((e)->pub.enp_flags & ENPUB_PROC);            \
    (e)->pub.enp_flags &= ~ENPUB_PROC;                  \
} while (0)
101
/* A connection can be referenced from one of six places:
 *
 *   1. Connection hash: a connection starts its life in one of those.
 *
 *   2. Outgoing queue.
 *
 *   3. Tickable queue
 *
 *   4. Advisory Tick Time queue.
 *
 *   5. Closing connections queue.  This is a transient queue -- it only
 *      exists for the duration of process_connections() function call.
 *
 *   6. Ticked connections queue.  Another transient queue, similar to (5).
 *
 * The idea is to destroy the connection when it is no longer referenced.
 * For example, a connection tick may return TICK_SEND|TICK_CLOSE.  In
 * that case, the connection is referenced from two places: (2) and (5).
 * After its packets are sent, it is only referenced in (5), and at the
 * end of the function call, when it is removed from (5), reference count
 * goes to zero and the connection is destroyed.  If not all packets can
 * be sent, at the end of the function call, the connection is referenced
 * by (2) and will only be removed once all outgoing packets have been
 * sent.
 *
 * Each bit in CONN_REF_FLAGS stands for one such reference:
 * engine_incref_conn() sets a bit, while engine_decref_conn() clears it
 * and destroys the connection once none of these bits remain set.
 */
#define CONN_REF_FLAGS  (LSCONN_HASHED          \
                        |LSCONN_HAS_OUTGOING    \
                        |LSCONN_TICKABLE        \
                        |LSCONN_TICKED          \
                        |LSCONN_CLOSING         \
                        |LSCONN_ATTQ)
133
134
135
136
/* Engine state.  One instance is allocated by lsquic_engine_new() and
 * released by lsquic_engine_destroy().
 */
struct lsquic_engine
{
    /* Must stay the first member: pointers to `pub' are cast back to
     * lsquic_engine_t * elsewhere in this file.
     */
    struct lsquic_engine_public        pub;
    enum {
        ENG_SERVER      = LSENG_SERVER,
        ENG_HTTP        = LSENG_HTTP,
        ENG_COOLDOWN    = (1 <<  7),    /* Cooldown: no new connections */
        ENG_PAST_DEADLINE
                        = (1 <<  8),    /* Previous call to a processing
                                         * function went past time threshold.
                                         */
#ifndef NDEBUG
        ENG_DTOR        = (1 << 26),    /* Engine destructor */
#endif
    }                                  flags;
    /* Stream callbacks and their context, as supplied in the engine API;
     * handed to each new client connection.
     */
    const struct lsquic_stream_if     *stream_if;
    void                              *stream_if_ctx;
    /* Callback used to send batches of packets, and its context. */
    lsquic_packets_out_f               packets_out;
    void                              *packets_out_ctx;
    void                              *bad_handshake_ctx;
    struct conn_hash                   conns_hash;
    /* Both heaps share a single allocation owned by conns_tickable.mh_elems;
     * see maybe_grow_conn_heaps().  conns_tickable is keyed by the time a
     * connection was last ticked, conns_out by the time it last sent.
     */
    struct min_heap                    conns_tickable;
    struct min_heap                    conns_out;
    struct eng_hist                    history;
    /* Current batch size; starts at INITIAL_OUT_BATCH_SIZE and is adjusted
     * by grow_batch_size() and shrink_batch_size().
     */
    unsigned                           batch_size;
    /* Advisory Tick Time queue */
    struct attq                       *attq;
    /* Track time last time a packet was sent to give new connections
     * priority lower than that of existing connections.
     */
    lsquic_time_t                      last_sent;
    /* Number of live connections: incremented when a connection is created,
     * decremented in destroy_conn().
     */
    unsigned                           n_conns;
    lsquic_time_t                      deadline;
    /* NOTE(review): presumably the storage used when batching outgoing
     * packets -- confirm against the sending code.
     */
    struct out_batch                   out_batch;
};
171
172
173void
174lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
175                             unsigned flags)
176{
177    memset(settings, 0, sizeof(*settings));
178    settings->es_versions        = LSQUIC_DF_VERSIONS;
179    if (flags & ENG_SERVER)
180    {
181        settings->es_cfcw        = LSQUIC_DF_CFCW_SERVER;
182        settings->es_sfcw        = LSQUIC_DF_SFCW_SERVER;
183        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER;
184    }
185    else
186    {
187        settings->es_cfcw        = LSQUIC_DF_CFCW_CLIENT;
188        settings->es_sfcw        = LSQUIC_DF_SFCW_CLIENT;
189        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT;
190    }
191    settings->es_max_streams_in  = LSQUIC_DF_MAX_STREAMS_IN;
192    settings->es_idle_conn_to    = LSQUIC_DF_IDLE_CONN_TO;
193    settings->es_handshake_to    = LSQUIC_DF_HANDSHAKE_TO;
194    settings->es_silent_close    = LSQUIC_DF_SILENT_CLOSE;
195    settings->es_max_header_list_size
196                                 = LSQUIC_DF_MAX_HEADER_LIST_SIZE;
197    settings->es_ua              = LSQUIC_DF_UA;
198
199    settings->es_pdmd            = QTAG_X509;
200    settings->es_aead            = QTAG_AESG;
201    settings->es_kexs            = QTAG_C255;
202    settings->es_support_push    = LSQUIC_DF_SUPPORT_PUSH;
203    settings->es_support_tcid0   = LSQUIC_DF_SUPPORT_TCID0;
204    settings->es_support_nstp    = LSQUIC_DF_SUPPORT_NSTP;
205    settings->es_honor_prst      = LSQUIC_DF_HONOR_PRST;
206    settings->es_progress_check  = LSQUIC_DF_PROGRESS_CHECK;
207    settings->es_rw_once         = LSQUIC_DF_RW_ONCE;
208    settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
209    settings->es_pace_packets    = LSQUIC_DF_PACE_PACKETS;
210}
211
212
213/* Note: if returning an error, err_buf must be valid if non-NULL */
214int
215lsquic_engine_check_settings (const struct lsquic_engine_settings *settings,
216                              unsigned flags,
217                              char *err_buf, size_t err_buf_sz)
218{
219    if (settings->es_cfcw < LSQUIC_MIN_FCW ||
220        settings->es_sfcw < LSQUIC_MIN_FCW)
221    {
222        if (err_buf)
223            snprintf(err_buf, err_buf_sz, "%s",
224                                            "flow control window set too low");
225        return -1;
226    }
227    if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS))
228    {
229        if (err_buf)
230            snprintf(err_buf, err_buf_sz, "%s",
231                        "No supported QUIC versions specified");
232        return -1;
233    }
234    if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS)
235    {
236        if (err_buf)
237            snprintf(err_buf, err_buf_sz, "%s",
238                        "one or more unsupported QUIC version is specified");
239        return -1;
240    }
241    return 0;
242}
243
244
245static void
246free_packet (void *ctx, unsigned char *packet_data)
247{
248    free(packet_data);
249}
250
251
252static void *
253malloc_buf (void *ctx, size_t size)
254{
255    return malloc(size);
256}
257
258
259static const struct lsquic_packout_mem_if stock_pmi =
260{
261    malloc_buf, (void(*)(void *, void *)) free_packet,
262};
263
264
265static int
266hash_conns_by_addr (const struct lsquic_engine *engine)
267{
268    if (engine->pub.enp_settings.es_versions & LSQUIC_FORCED_TCID0_VERSIONS)
269        return 1;
270    if ((engine->pub.enp_settings.es_versions & LSQUIC_GQUIC_HEADER_VERSIONS)
271                                && engine->pub.enp_settings.es_support_tcid0)
272        return 1;
273    return 0;
274}
275
276
277lsquic_engine_t *
278lsquic_engine_new (unsigned flags,
279                   const struct lsquic_engine_api *api)
280{
281    lsquic_engine_t *engine;
282    int tag_buf_len;
283    char err_buf[100];
284
285    if (!api->ea_packets_out)
286    {
287        LSQ_ERROR("packets_out callback is not specified");
288        return NULL;
289    }
290
291    if (api->ea_settings &&
292                0 != lsquic_engine_check_settings(api->ea_settings, flags,
293                                                    err_buf, sizeof(err_buf)))
294    {
295        LSQ_ERROR("cannot create engine: %s", err_buf);
296        return NULL;
297    }
298
299    engine = calloc(1, sizeof(*engine));
300    if (!engine)
301        return NULL;
302    if (0 != lsquic_mm_init(&engine->pub.enp_mm))
303    {
304        free(engine);
305        return NULL;
306    }
307    if (api->ea_settings)
308        engine->pub.enp_settings        = *api->ea_settings;
309    else
310        lsquic_engine_init_settings(&engine->pub.enp_settings, flags);
311    tag_buf_len = lsquic_gen_ver_tags(engine->pub.enp_ver_tags_buf,
312                                    sizeof(engine->pub.enp_ver_tags_buf),
313                                    engine->pub.enp_settings.es_versions);
314    if (tag_buf_len <= 0)
315    {
316        LSQ_ERROR("cannot generate version tags buffer");
317        free(engine);
318        return NULL;
319    }
320    engine->pub.enp_ver_tags_len = tag_buf_len;
321    engine->pub.enp_flags = ENPUB_CAN_SEND;
322
323    engine->flags           = flags;
324    engine->stream_if       = api->ea_stream_if;
325    engine->stream_if_ctx   = api->ea_stream_if_ctx;
326    engine->packets_out     = api->ea_packets_out;
327    engine->packets_out_ctx = api->ea_packets_out_ctx;
328    if (api->ea_pmi)
329    {
330        engine->pub.enp_pmi      = api->ea_pmi;
331        engine->pub.enp_pmi_ctx  = api->ea_pmi_ctx;
332    }
333    else
334    {
335        engine->pub.enp_pmi      = &stock_pmi;
336        engine->pub.enp_pmi_ctx  = NULL;
337    }
338    engine->pub.enp_verify_cert  = api->ea_verify_cert;
339    engine->pub.enp_verify_ctx   = api->ea_verify_ctx;
340    engine->pub.enp_engine = engine;
341    conn_hash_init(&engine->conns_hash,
342                        hash_conns_by_addr(engine) ?  CHF_USE_ADDR : 0);
343    engine->attq = attq_create();
344    eng_hist_init(&engine->history);
345    engine->batch_size = INITIAL_OUT_BATCH_SIZE;
346
347
348    LSQ_INFO("instantiated engine");
349    return engine;
350}
351
352
353static void
354grow_batch_size (struct lsquic_engine *engine)
355{
356    engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE;
357}
358
359
360static void
361shrink_batch_size (struct lsquic_engine *engine)
362{
363    engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE;
364}
365
366
367/* Wrapper to make sure important things occur before the connection is
368 * really destroyed.
369 */
370static void
371destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn)
372{
373    --engine->n_conns;
374    conn->cn_flags |= LSCONN_NEVER_TICKABLE;
375    conn->cn_if->ci_destroy(conn);
376}
377
378
379static int
380maybe_grow_conn_heaps (struct lsquic_engine *engine)
381{
382    struct min_heap_elem *els;
383    unsigned count;
384
385    if (engine->n_conns < lsquic_mh_nalloc(&engine->conns_tickable))
386        return 0;   /* Nothing to do */
387
388    if (lsquic_mh_nalloc(&engine->conns_tickable))
389        count = lsquic_mh_nalloc(&engine->conns_tickable) * 2 * 2;
390    else
391        count = 8;
392
393    els = malloc(sizeof(els[0]) * count);
394    if (!els)
395    {
396        LSQ_ERROR("%s: malloc failed", __func__);
397        return -1;
398    }
399
400    LSQ_DEBUG("grew heaps to %u elements", count / 2);
401    memcpy(&els[0], engine->conns_tickable.mh_elems,
402                sizeof(els[0]) * lsquic_mh_count(&engine->conns_tickable));
403    memcpy(&els[count / 2], engine->conns_out.mh_elems,
404                sizeof(els[0]) * lsquic_mh_count(&engine->conns_out));
405    free(engine->conns_tickable.mh_elems);
406    engine->conns_tickable.mh_elems = els;
407    engine->conns_out.mh_elems = &els[count / 2];
408    engine->conns_tickable.mh_nalloc = count / 2;
409    engine->conns_out.mh_nalloc = count / 2;
410    return 0;
411}
412
413
414static lsquic_conn_t *
415new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
416                      unsigned short max_packet_size)
417{
418    lsquic_conn_t *conn;
419    unsigned flags;
420    if (0 != maybe_grow_conn_heaps(engine))
421        return NULL;
422    flags = engine->flags & (ENG_SERVER|ENG_HTTP);
423    conn = full_conn_client_new(&engine->pub, engine->stream_if,
424                    engine->stream_if_ctx, flags, hostname, max_packet_size);
425    if (!conn)
426        return NULL;
427    ++engine->n_conns;
428    return conn;
429}
430
431
432static lsquic_conn_t *
433find_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
434         struct packin_parse_state *ppstate, const struct sockaddr *sa_local)
435{
436    lsquic_conn_t *conn;
437
438    if (conn_hash_using_addr(&engine->conns_hash))
439        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
440    else if (packet_in->pi_flags & PI_CONN_ID)
441        conn = conn_hash_find_by_cid(&engine->conns_hash,
442                                                    packet_in->pi_conn_id);
443    else
444    {
445        LSQ_DEBUG("packet header does not have connection ID: discarding");
446        return NULL;
447    }
448
449    if (!conn)
450        return NULL;
451
452    conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
453    if ((packet_in->pi_flags & PI_CONN_ID)
454        && conn->cn_cid != packet_in->pi_conn_id)
455    {
456        LSQ_DEBUG("connection IDs do not match");
457        return NULL;
458    }
459
460    return conn;
461}
462
463
464#if !defined(NDEBUG) && __GNUC__
465__attribute__((weak))
466#endif
467void
468lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub,
469                                    lsquic_conn_t *conn)
470{
471    if (0 == (enpub->enp_flags & ENPUB_PROC) &&
472        0 == (conn->cn_flags & (LSCONN_TICKABLE|LSCONN_NEVER_TICKABLE)))
473    {
474        lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
475        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
476        engine_incref_conn(conn, LSCONN_TICKABLE);
477    }
478}
479
480
481void
482lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
483                                lsquic_conn_t *conn, lsquic_time_t tick_time)
484{
485    lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
486    if (conn->cn_flags & LSCONN_TICKABLE)
487    {
488        /* Optimization: no need to add the connection to the Advisory Tick
489         * Time Queue: it is about to be ticked, after which it its next tick
490         * time may be queried again.
491         */;
492    }
493    else if (conn->cn_flags & LSCONN_ATTQ)
494    {
495        if (lsquic_conn_adv_time(conn) != tick_time)
496        {
497            attq_remove(engine->attq, conn);
498            if (0 != attq_add(engine->attq, conn, tick_time))
499                engine_decref_conn(engine, conn, LSCONN_ATTQ);
500        }
501    }
502    else if (0 == attq_add(engine->attq, conn, tick_time))
503        engine_incref_conn(conn, LSCONN_ATTQ);
504}
505
506
507/* Return 0 if packet is being processed by a connections, otherwise return 1 */
508static int
509process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
510       struct packin_parse_state *ppstate, const struct sockaddr *sa_local,
511       const struct sockaddr *sa_peer, void *peer_ctx)
512{
513    lsquic_conn_t *conn;
514
515    if (lsquic_packet_in_is_gquic_prst(packet_in)
516                                && !engine->pub.enp_settings.es_honor_prst)
517    {
518        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
519        LSQ_DEBUG("public reset packet: discarding");
520        return 1;
521    }
522
523    conn = find_conn(engine, packet_in, ppstate, sa_local);
524
525    if (!conn)
526    {
527        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
528        return 1;
529    }
530
531    if (0 == (conn->cn_flags & LSCONN_TICKABLE))
532    {
533        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
534        engine_incref_conn(conn, LSCONN_TICKABLE);
535    }
536    lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
537    lsquic_packet_in_upref(packet_in);
538    conn->cn_peer_ctx = peer_ctx;
539    conn->cn_if->ci_packet_in(conn, packet_in);
540    lsquic_packet_in_put(&engine->pub.enp_mm, packet_in);
541    return 0;
542}
543
544
545void
546lsquic_engine_destroy (lsquic_engine_t *engine)
547{
548    lsquic_conn_t *conn;
549
550    LSQ_DEBUG("destroying engine");
551#ifndef NDEBUG
552    engine->flags |= ENG_DTOR;
553#endif
554
555    while ((conn = lsquic_mh_pop(&engine->conns_out)))
556    {
557        assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
558        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
559    }
560
561    while ((conn = lsquic_mh_pop(&engine->conns_tickable)))
562    {
563        assert(conn->cn_flags & LSCONN_TICKABLE);
564        (void) engine_decref_conn(engine, conn, LSCONN_TICKABLE);
565    }
566
567    for (conn = conn_hash_first(&engine->conns_hash); conn;
568                            conn = conn_hash_next(&engine->conns_hash))
569        force_close_conn(engine, conn);
570    conn_hash_cleanup(&engine->conns_hash);
571
572    assert(0 == engine->n_conns);
573    attq_destroy(engine->attq);
574
575    assert(0 == lsquic_mh_count(&engine->conns_out));
576    assert(0 == lsquic_mh_count(&engine->conns_tickable));
577    lsquic_mm_cleanup(&engine->pub.enp_mm);
578    free(engine->conns_tickable.mh_elems);
579    free(engine);
580}
581
582
583lsquic_conn_t *
584lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *local_sa,
585                       const struct sockaddr *peer_sa,
586                       void *peer_ctx, lsquic_conn_ctx_t *conn_ctx,
587                       const char *hostname, unsigned short max_packet_size)
588{
589    lsquic_conn_t *conn;
590    ENGINE_IN(engine);
591
592    if (engine->flags & ENG_SERVER)
593    {
594        LSQ_ERROR("`%s' must only be called in client mode", __func__);
595        goto err;
596    }
597
598    if (conn_hash_using_addr(&engine->conns_hash)
599                && conn_hash_find_by_addr(&engine->conns_hash, local_sa))
600    {
601        LSQ_ERROR("cannot have more than one connection on the same port");
602        goto err;
603    }
604
605    if (0 == max_packet_size)
606    {
607        switch (peer_sa->sa_family)
608        {
609        case AF_INET:
610            max_packet_size = QUIC_MAX_IPv4_PACKET_SZ;
611            break;
612        default:
613            max_packet_size = QUIC_MAX_IPv6_PACKET_SZ;
614            break;
615        }
616    }
617
618    conn = new_full_conn_client(engine, hostname, max_packet_size);
619    if (!conn)
620        goto err;
621    lsquic_conn_record_sockaddr(conn, local_sa, peer_sa);
622    if (0 != conn_hash_add(&engine->conns_hash, conn))
623    {
624        LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
625            conn->cn_cid);
626        destroy_conn(engine, conn);
627        goto err;
628    }
629    assert(!(conn->cn_flags &
630        (CONN_REF_FLAGS
631         & ~LSCONN_TICKABLE /* This flag may be set as effect of user
632                                 callbacks */
633                             )));
634    conn->cn_flags |= LSCONN_HASHED;
635    lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
636    engine_incref_conn(conn, LSCONN_TICKABLE);
637    conn->cn_peer_ctx = peer_ctx;
638    lsquic_conn_set_ctx(conn, conn_ctx);
639    full_conn_client_call_on_new(conn);
640  end:
641    ENGINE_OUT(engine);
642    return conn;
643  err:
644    conn = NULL;
645    goto end;
646}
647
648
649static void
650remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
651{
652    conn_hash_remove(&engine->conns_hash, conn);
653    (void) engine_decref_conn(engine, conn, LSCONN_HASHED);
654}
655
656
657static void
658refflags2str (enum lsquic_conn_flags flags, char s[6])
659{
660    *s = 'C'; s += !!(flags & LSCONN_CLOSING);
661    *s = 'H'; s += !!(flags & LSCONN_HASHED);
662    *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
663    *s = 'T'; s += !!(flags & LSCONN_TICKABLE);
664    *s = 'A'; s += !!(flags & LSCONN_ATTQ);
665    *s = 'K'; s += !!(flags & LSCONN_TICKED);
666    *s = '\0';
667}
668
669
670static void
671engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
672{
673    char str[2][7];
674    assert(flag & CONN_REF_FLAGS);
675    assert(!(conn->cn_flags & flag));
676    conn->cn_flags |= flag;
677    LSQ_DEBUG("incref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
678                    (refflags2str(conn->cn_flags & ~flag, str[0]), str[0]),
679                    (refflags2str(conn->cn_flags, str[1]), str[1]));
680}
681
682
683static lsquic_conn_t *
684engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
685                                        enum lsquic_conn_flags flags)
686{
687    char str[2][7];
688    assert(flags & CONN_REF_FLAGS);
689    assert(conn->cn_flags & flags);
690#ifndef NDEBUG
691    if (flags & LSCONN_CLOSING)
692        assert(0 == (conn->cn_flags & LSCONN_HASHED));
693#endif
694    conn->cn_flags &= ~flags;
695    LSQ_DEBUG("decref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
696                    (refflags2str(conn->cn_flags | flags, str[0]), str[0]),
697                    (refflags2str(conn->cn_flags, str[1]), str[1]));
698    if (0 == (conn->cn_flags & CONN_REF_FLAGS))
699    {
700        eng_hist_inc(&engine->history, 0, sl_del_full_conns);
701        destroy_conn(engine, conn);
702        return NULL;
703    }
704    else
705        return conn;
706}
707
708
709/* This is not a general-purpose function.  Only call from engine dtor. */
710static void
711force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
712{
713    assert(engine->flags & ENG_DTOR);
714    const enum lsquic_conn_flags flags = conn->cn_flags;
715    assert(conn->cn_flags & CONN_REF_FLAGS);
716    assert(!(flags & LSCONN_HAS_OUTGOING));  /* Should be removed already */
717    assert(!(flags & LSCONN_TICKABLE));    /* Should be removed already */
718    assert(!(flags & LSCONN_CLOSING));  /* It is in transient queue? */
719    if (flags & LSCONN_ATTQ)
720    {
721        attq_remove(engine->attq, conn);
722        (void) engine_decref_conn(engine, conn, LSCONN_ATTQ);
723    }
724    if (flags & LSCONN_HASHED)
725        remove_conn_from_hash(engine, conn);
726}
727
728
729/* Iterator for tickable connections (those on the Tickable Queue).  Before
730 * a connection is returned, it is removed from the Advisory Tick Time queue
731 * if necessary.
732 */
733static lsquic_conn_t *
734conn_iter_next_tickable (struct lsquic_engine *engine)
735{
736    lsquic_conn_t *conn;
737
738    conn = lsquic_mh_pop(&engine->conns_tickable);
739
740    if (conn)
741        conn = engine_decref_conn(engine, conn, LSCONN_TICKABLE);
742    if (conn && (conn->cn_flags & LSCONN_ATTQ))
743    {
744        attq_remove(engine->attq, conn);
745        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
746    }
747
748    return conn;
749}
750
751
752void
753lsquic_engine_process_conns (lsquic_engine_t *engine)
754{
755    lsquic_conn_t *conn;
756    lsquic_time_t now;
757
758    ENGINE_IN(engine);
759
760    now = lsquic_time_now();
761    while ((conn = attq_pop(engine->attq, now)))
762    {
763        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
764        if (conn && !(conn->cn_flags & LSCONN_TICKABLE))
765        {
766            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
767            engine_incref_conn(conn, LSCONN_TICKABLE);
768        }
769    }
770
771    process_connections(engine, conn_iter_next_tickable, now);
772    ENGINE_OUT(engine);
773}
774
775
776static ssize_t
777really_encrypt_packet (const lsquic_conn_t *conn,
778                       struct lsquic_packet_out *packet_out,
779                       unsigned char *buf, size_t bufsz)
780{
781    int header_sz, is_hello_packet;
782    enum enc_level enc_level;
783    size_t packet_sz;
784    unsigned char header_buf[QUIC_MAX_PUBHDR_SZ];
785
786    header_sz = conn->cn_pf->pf_gen_reg_pkt_header(conn, packet_out,
787                                            header_buf, sizeof(header_buf));
788    if (header_sz < 0)
789        return -1;
790
791    is_hello_packet = !!(packet_out->po_flags & PO_HELLO);
792    enc_level = conn->cn_esf->esf_encrypt(conn->cn_enc_session,
793                conn->cn_version, 0,
794                packet_out->po_packno, header_buf, header_sz,
795                packet_out->po_data, packet_out->po_data_sz,
796                buf, bufsz, &packet_sz, is_hello_packet);
797    if ((int) enc_level >= 0)
798    {
799        lsquic_packet_out_set_enc_level(packet_out, enc_level);
800        LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %zu bytes, "
801            "ciphertext is %zd bytes",
802            packet_out->po_packno,
803            conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) +
804                                                packet_out->po_data_sz,
805            packet_sz);
806        return packet_sz;
807    }
808    else
809        return -1;
810}
811
812
813static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, }
814encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
815                                            lsquic_packet_out_t *packet_out)
816{
817    ssize_t enc_sz;
818    size_t bufsz;
819    unsigned sent_sz;
820    unsigned char *buf;
821
822    bufsz = conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) +
823                                packet_out->po_data_sz + QUIC_PACKET_HASH_SZ;
824    buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz);
825    if (!buf)
826    {
827        LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd",
828                                                                        bufsz);
829        return ENCPA_NOMEM;
830    }
831
832    {
833        enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz);
834        sent_sz = enc_sz;
835    }
836
837    if (enc_sz < 0)
838    {
839        engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf);
840        return ENCPA_BADCRYPT;
841    }
842
843    packet_out->po_enc_data    = buf;
844    packet_out->po_enc_data_sz = enc_sz;
845    packet_out->po_sent_sz     = sent_sz;
846    packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ;
847
848    return ENCPA_OK;
849}
850
851
/* Named list head types for lists of connections */
STAILQ_HEAD(conns_stailq, lsquic_conn);
TAILQ_HEAD(conns_tailq, lsquic_conn);


/* Iterator over connections that have packets to send.  It is initialized
 * with coi_init() and advanced with coi_next(); coi_reheap() puts things
 * back in order when iteration is over.
 */
struct conns_out_iter
{
    /* The engine's heap of connections with outgoing packets */
    struct min_heap            *coi_heap;
    /* Connections popped off the heap are placed on the active list;
     * coi_deactivate() moves a connection to the inactive list and
     * coi_reactivate() moves it back.  Both lists are linked via the
     * connection's cn_next_out member.
     */
    TAILQ_HEAD(, lsquic_conn)   coi_active_list,
                                coi_inactive_list;
    /* Once the heap is empty, coi_next() cycles through the active list;
     * this is the element to return next, NULL meaning start from the head.
     */
    lsquic_conn_t              *coi_next;
#ifndef NDEBUG
    /* cn_last_sent of the connection most recently popped off the heap;
     * used to assert that connections come off the heap in order.
     */
    lsquic_time_t               coi_last_sent;
#endif
};
866
867
868static void
869coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine)
870{
871    iter->coi_heap = &engine->conns_out;
872    iter->coi_next = NULL;
873    TAILQ_INIT(&iter->coi_active_list);
874    TAILQ_INIT(&iter->coi_inactive_list);
875#ifndef NDEBUG
876    iter->coi_last_sent = 0;
877#endif
878}
879
880
881static lsquic_conn_t *
882coi_next (struct conns_out_iter *iter)
883{
884    lsquic_conn_t *conn;
885
886    if (lsquic_mh_count(iter->coi_heap) > 0)
887    {
888        conn = lsquic_mh_pop(iter->coi_heap);
889        TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
890        conn->cn_flags |= LSCONN_COI_ACTIVE;
891#ifndef NDEBUG
892        if (iter->coi_last_sent)
893            assert(iter->coi_last_sent <= conn->cn_last_sent);
894        iter->coi_last_sent = conn->cn_last_sent;
895#endif
896        return conn;
897    }
898    else if (!TAILQ_EMPTY(&iter->coi_active_list))
899    {
900        conn = iter->coi_next;
901        if (!conn)
902            conn = TAILQ_FIRST(&iter->coi_active_list);
903        if (conn)
904            iter->coi_next = TAILQ_NEXT(conn, cn_next_out);
905        return conn;
906    }
907    else
908        return NULL;
909}
910
911
912static void
913coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
914{
915    if (!(conn->cn_flags & LSCONN_EVANESCENT))
916    {
917        assert(!TAILQ_EMPTY(&iter->coi_active_list));
918        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
919        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
920        TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out);
921        conn->cn_flags |= LSCONN_COI_INACTIVE;
922    }
923}
924
925
926static void
927coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
928{
929    assert(conn->cn_flags & LSCONN_COI_INACTIVE);
930    TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
931    conn->cn_flags &= ~LSCONN_COI_INACTIVE;
932    TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
933    conn->cn_flags |= LSCONN_COI_ACTIVE;
934}
935
936
937static void
938coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
939{
940    lsquic_conn_t *conn;
941    while ((conn = TAILQ_FIRST(&iter->coi_active_list)))
942    {
943        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
944        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
945        lsquic_mh_insert(iter->coi_heap, conn, conn->cn_last_sent);
946    }
947    while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
948    {
949        TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
950        conn->cn_flags &= ~LSCONN_COI_INACTIVE;
951        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
952    }
953}
954
955
956static unsigned
957send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter,
958                  struct out_batch *batch, unsigned n_to_send)
959{
960    int n_sent, i;
961    lsquic_time_t now;
962
963    /* Set sent time before the write to avoid underestimating RTT */
964    now = lsquic_time_now();
965    for (i = 0; i < (int) n_to_send; ++i)
966        batch->packets[i]->po_sent = now;
967    n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs,
968                                                                n_to_send);
969    if (n_sent >= 0)
970        LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send);
971    else
972    {
973        engine->pub.enp_flags &= ~ENPUB_CAN_SEND;
974        LSQ_DEBUG("packets out returned an error: %s", strerror(errno));
975        EV_LOG_GENERIC_EVENT("cannot send packets");
976        n_sent = 0;
977    }
978    if (n_sent > 0)
979        engine->last_sent = now + n_sent;
980    for (i = 0; i < n_sent; ++i)
981    {
982        eng_hist_inc(&engine->history, now, sl_packets_out);
983        EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
984        batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i],
985                                                    batch->packets[i]);
986        /* `i' is added to maintain relative order */
987        batch->conns[i]->cn_last_sent = now + i;
988        /* Release packet out buffer as soon as the packet is sent
989         * successfully.  If not successfully sent, we hold on to
990         * this buffer until the packet sending is attempted again
991         * or until it times out and regenerated.
992         */
993        if (batch->packets[i]->po_flags & PO_ENCRYPTED)
994        {
995            batch->packets[i]->po_flags &= ~PO_ENCRYPTED;
996            engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx,
997                                                batch->packets[i]->po_enc_data);
998            batch->packets[i]->po_enc_data = NULL;  /* JIC */
999        }
1000    }
1001    if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT))
1002        for ( ; i < (int) n_to_send; ++i)
1003            EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1004    /* Return packets to the connection in reverse order so that the packet
1005     * ordering is maintained.
1006     */
1007    for (i = (int) n_to_send - 1; i >= n_sent; --i)
1008    {
1009        batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i],
1010                                                    batch->packets[i]);
1011        if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT)))
1012            coi_reactivate(conns_iter, batch->conns[i]);
1013    }
1014    return n_sent;
1015}
1016
1017
1018/* Return 1 if went past deadline, 0 otherwise */
1019static int
1020check_deadline (lsquic_engine_t *engine)
1021{
1022    if (engine->pub.enp_settings.es_proc_time_thresh &&
1023                                lsquic_time_now() > engine->deadline)
1024    {
1025        LSQ_INFO("went past threshold of %u usec, stop sending",
1026                            engine->pub.enp_settings.es_proc_time_thresh);
1027        engine->flags |= ENG_PAST_DEADLINE;
1028        return 1;
1029    }
1030    else
1031        return 0;
1032}
1033
1034
1035static void
1036send_packets_out (struct lsquic_engine *engine,
1037                  struct conns_tailq *ticked_conns,
1038                  struct conns_stailq *closed_conns)
1039{
1040    unsigned n, w, n_sent, n_batches_sent;
1041    lsquic_packet_out_t *packet_out;
1042    lsquic_conn_t *conn;
1043    struct out_batch *const batch = &engine->out_batch;
1044    struct conns_out_iter conns_iter;
1045    int shrink, deadline_exceeded;
1046
1047    coi_init(&conns_iter, engine);
1048    n_batches_sent = 0;
1049    n_sent = 0, n = 0;
1050    shrink = 0;
1051    deadline_exceeded = 0;
1052
1053    while ((conn = coi_next(&conns_iter)))
1054    {
1055        packet_out = conn->cn_if->ci_next_packet_to_send(conn);
1056        if (!packet_out) {
1057            LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64,
1058                                                            conn->cn_cid);
1059            coi_deactivate(&conns_iter, conn);
1060            continue;
1061        }
1062        if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT)))
1063        {
1064            switch (encrypt_packet(engine, conn, packet_out))
1065            {
1066            case ENCPA_NOMEM:
1067                /* Send what we have and wait for a more opportune moment */
1068                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1069                goto end_for;
1070            case ENCPA_BADCRYPT:
1071                /* This is pretty bad: close connection immediately */
1072                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1073                LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid);
1074                if (!(conn->cn_flags & LSCONN_EVANESCENT))
1075                {
1076                    if (!(conn->cn_flags & LSCONN_CLOSING))
1077                    {
1078                        STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn);
1079                        engine_incref_conn(conn, LSCONN_CLOSING);
1080                        if (conn->cn_flags & LSCONN_HASHED)
1081                            remove_conn_from_hash(engine, conn);
1082                    }
1083                    coi_deactivate(&conns_iter, conn);
1084                    if (conn->cn_flags & LSCONN_TICKED)
1085                    {
1086                        TAILQ_REMOVE(ticked_conns, conn, cn_next_ticked);
1087                        engine_decref_conn(engine, conn, LSCONN_TICKED);
1088                    }
1089                }
1090                continue;
1091            case ENCPA_OK:
1092                break;
1093            }
1094        }
1095        LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64,
1096                                        packet_out->po_packno, conn->cn_cid);
1097        assert(conn->cn_flags & LSCONN_HAS_PEER_SA);
1098        if (packet_out->po_flags & PO_ENCRYPTED)
1099        {
1100            batch->outs[n].buf     = packet_out->po_enc_data;
1101            batch->outs[n].sz      = packet_out->po_enc_data_sz;
1102        }
1103        else
1104        {
1105            batch->outs[n].buf     = packet_out->po_data;
1106            batch->outs[n].sz      = packet_out->po_data_sz;
1107        }
1108        batch->outs   [n].peer_ctx = conn->cn_peer_ctx;
1109        batch->outs   [n].local_sa = (struct sockaddr *) conn->cn_local_addr;
1110        batch->outs   [n].dest_sa  = (struct sockaddr *) conn->cn_peer_addr;
1111        batch->conns  [n]          = conn;
1112        batch->packets[n]          = packet_out;
1113        ++n;
1114        if (n == engine->batch_size)
1115        {
1116            n = 0;
1117            w = send_batch(engine, &conns_iter, batch, engine->batch_size);
1118            ++n_batches_sent;
1119            n_sent += w;
1120            if (w < engine->batch_size)
1121            {
1122                shrink = 1;
1123                break;
1124            }
1125            deadline_exceeded = check_deadline(engine);
1126            if (deadline_exceeded)
1127                break;
1128            grow_batch_size(engine);
1129        }
1130    }
1131  end_for:
1132
1133    if (n > 0) {
1134        w = send_batch(engine, &conns_iter, batch, n);
1135        n_sent += w;
1136        shrink = w < n;
1137        ++n_batches_sent;
1138        deadline_exceeded = check_deadline(engine);
1139    }
1140
1141    if (shrink)
1142        shrink_batch_size(engine);
1143    else if (n_batches_sent > 1 && !deadline_exceeded)
1144        grow_batch_size(engine);
1145
1146    coi_reheap(&conns_iter, engine);
1147
1148    LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s");
1149}
1150
1151
1152int
1153lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
1154{
1155    return lsquic_mh_count(&engine->conns_out) > 0
1156    ;
1157}
1158
1159
1160static void
1161reset_deadline (lsquic_engine_t *engine, lsquic_time_t now)
1162{
1163    engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh;
1164    engine->flags &= ~ENG_PAST_DEADLINE;
1165}
1166
1167
1168/* TODO: this is a user-facing function, account for load */
1169void
1170lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
1171{
1172    lsquic_conn_t *conn;
1173    struct conns_stailq closed_conns;
1174    struct conns_tailq ticked_conns = TAILQ_HEAD_INITIALIZER(ticked_conns);
1175
1176    STAILQ_INIT(&closed_conns);
1177    reset_deadline(engine, lsquic_time_now());
1178    if (!(engine->pub.enp_flags & ENPUB_CAN_SEND))
1179    {
1180        LSQ_DEBUG("can send again");
1181        EV_LOG_GENERIC_EVENT("can send again");
1182        engine->pub.enp_flags |= ENPUB_CAN_SEND;
1183    }
1184
1185    send_packets_out(engine, &ticked_conns, &closed_conns);
1186
1187    while ((conn = STAILQ_FIRST(&closed_conns))) {
1188        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1189        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1190    }
1191
1192}
1193
1194
1195static void
1196process_connections (lsquic_engine_t *engine, conn_iter_f next_conn,
1197                     lsquic_time_t now)
1198{
1199    lsquic_conn_t *conn;
1200    enum tick_st tick_st;
1201    unsigned i;
1202    lsquic_time_t next_tick_time;
1203    struct conns_stailq closed_conns;
1204    struct conns_tailq ticked_conns;
1205
1206    eng_hist_tick(&engine->history, now);
1207
1208    STAILQ_INIT(&closed_conns);
1209    TAILQ_INIT(&ticked_conns);
1210    reset_deadline(engine, now);
1211
1212    i = 0;
1213    while ((conn = next_conn(engine))
1214          )
1215    {
1216        tick_st = conn->cn_if->ci_tick(conn, now);
1217        conn->cn_last_ticked = now + i /* Maintain relative order */ ++;
1218        if (tick_st & TICK_SEND)
1219        {
1220            if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
1221            {
1222                lsquic_mh_insert(&engine->conns_out, conn, conn->cn_last_sent);
1223                engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
1224            }
1225        }
1226        if (tick_st & TICK_CLOSE)
1227        {
1228            STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn);
1229            engine_incref_conn(conn, LSCONN_CLOSING);
1230            if (conn->cn_flags & LSCONN_HASHED)
1231                remove_conn_from_hash(engine, conn);
1232        }
1233        else
1234        {
1235            TAILQ_INSERT_TAIL(&ticked_conns, conn, cn_next_ticked);
1236            engine_incref_conn(conn, LSCONN_TICKED);
1237        }
1238    }
1239
1240    if ((engine->pub.enp_flags & ENPUB_CAN_SEND)
1241                        && lsquic_engine_has_unsent_packets(engine))
1242        send_packets_out(engine, &ticked_conns, &closed_conns);
1243
1244    while ((conn = STAILQ_FIRST(&closed_conns))) {
1245        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1246        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1247    }
1248
1249    /* TODO Heapification can be optimized by switching to the Floyd method:
1250     * https://en.wikipedia.org/wiki/Binary_heap#Building_a_heap
1251     */
1252    while ((conn = TAILQ_FIRST(&ticked_conns)))
1253    {
1254        TAILQ_REMOVE(&ticked_conns, conn, cn_next_ticked);
1255        engine_decref_conn(engine, conn, LSCONN_TICKED);
1256        if (!(conn->cn_flags & LSCONN_TICKABLE)
1257            && conn->cn_if->ci_is_tickable(conn))
1258        {
1259            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
1260            engine_incref_conn(conn, LSCONN_TICKABLE);
1261        }
1262        else if (!(conn->cn_flags & LSCONN_ATTQ))
1263        {
1264            next_tick_time = conn->cn_if->ci_next_tick_time(conn);
1265            if (next_tick_time)
1266            {
1267                if (0 == attq_add(engine->attq, conn, next_tick_time))
1268                    engine_incref_conn(conn, LSCONN_ATTQ);
1269            }
1270            else
1271                assert(0);
1272        }
1273    }
1274
1275}
1276
1277
1278/* Return 0 if packet is being processed by a real connection, 1 if the
1279 * packet was processed, but not by a connection, and -1 on error.
1280 */
1281int
1282lsquic_engine_packet_in (lsquic_engine_t *engine,
1283    const unsigned char *packet_in_data, size_t packet_in_size,
1284    const struct sockaddr *sa_local, const struct sockaddr *sa_peer,
1285    void *peer_ctx)
1286{
1287    struct packin_parse_state ppstate;
1288    lsquic_packet_in_t *packet_in;
1289    int (*parse_packet_in_begin) (struct lsquic_packet_in *, size_t length,
1290                                int is_server, struct packin_parse_state *);
1291
1292    if (packet_in_size > QUIC_MAX_PACKET_SZ)
1293    {
1294        LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming "
1295            "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ);
1296        errno = E2BIG;
1297        return -1;
1298    }
1299
1300    if (conn_hash_using_addr(&engine->conns_hash))
1301    {
1302        const struct lsquic_conn *conn;
1303        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
1304        if (!conn)
1305            return -1;
1306        if ((1 << conn->cn_version) & LSQUIC_GQUIC_HEADER_VERSIONS)
1307            parse_packet_in_begin = lsquic_gquic_parse_packet_in_begin;
1308        else
1309            parse_packet_in_begin = lsquic_iquic_parse_packet_in_begin;
1310    }
1311    else
1312        parse_packet_in_begin = lsquic_parse_packet_in_begin;
1313
1314    packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm);
1315    if (!packet_in)
1316        return -1;
1317
1318    /* Library does not modify packet_in_data, it is not referenced after
1319     * this function returns and subsequent release of pi_data is guarded
1320     * by PI_OWN_DATA flag.
1321     */
1322    packet_in->pi_data = (unsigned char *) packet_in_data;
1323    if (0 != parse_packet_in_begin(packet_in, packet_in_size,
1324                                        engine->flags & ENG_SERVER, &ppstate))
1325    {
1326        LSQ_DEBUG("Cannot parse incoming packet's header");
1327        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
1328        errno = EINVAL;
1329        return -1;
1330    }
1331
1332    packet_in->pi_received = lsquic_time_now();
1333    eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in);
1334    return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer,
1335                                                                    peer_ctx);
1336}
1337
1338
1339#if __GNUC__ && !defined(NDEBUG)
1340__attribute__((weak))
1341#endif
1342unsigned
1343lsquic_engine_quic_versions (const lsquic_engine_t *engine)
1344{
1345    return engine->pub.enp_settings.es_versions;
1346}
1347
1348
1349int
1350lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
1351{
1352    const lsquic_time_t *next_time;
1353    lsquic_time_t now;
1354
1355    if (((engine->flags & ENG_PAST_DEADLINE)
1356                                    && lsquic_mh_count(&engine->conns_out))
1357        || lsquic_mh_count(&engine->conns_tickable))
1358    {
1359        *diff = 0;
1360        return 1;
1361    }
1362
1363    next_time = attq_next_time(engine->attq);
1364    if (!next_time)
1365        return 0;
1366
1367    now = lsquic_time_now();
1368    *diff = (int) ((int64_t) *next_time - (int64_t) now);
1369    return 1;
1370}
1371
1372
1373unsigned
1374lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now)
1375{
1376    lsquic_time_t now;
1377    now = lsquic_time_now();
1378    if (from_now < 0)
1379        now -= from_now;
1380    else
1381        now += from_now;
1382    return attq_count_before(engine->attq, now);
1383}
1384