lsquic_engine.c revision cd7bc383
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_engine.c - QUIC engine
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdint.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <sys/queue.h>
14#include <time.h>
15#ifndef WIN32
16#include <sys/time.h>
17#include <netinet/in.h>
18#include <sys/types.h>
19#include <sys/stat.h>
20#include <fcntl.h>
21#include <unistd.h>
22#include <netdb.h>
23#endif
24
25
26
27#include "lsquic.h"
28#include "lsquic_types.h"
29#include "lsquic_alarmset.h"
30#include "lsquic_parse.h"
31#include "lsquic_packet_in.h"
32#include "lsquic_packet_out.h"
33#include "lsquic_senhist.h"
34#include "lsquic_rtt.h"
35#include "lsquic_cubic.h"
36#include "lsquic_pacer.h"
37#include "lsquic_send_ctl.h"
38#include "lsquic_set.h"
39#include "lsquic_conn_flow.h"
40#include "lsquic_sfcw.h"
41#include "lsquic_stream.h"
42#include "lsquic_conn.h"
43#include "lsquic_full_conn.h"
44#include "lsquic_util.h"
45#include "lsquic_qtags.h"
46#include "lsquic_str.h"
47#include "lsquic_handshake.h"
48#include "lsquic_mm.h"
49#include "lsquic_conn_hash.h"
50#include "lsquic_engine_public.h"
51#include "lsquic_eng_hist.h"
52#include "lsquic_ev_log.h"
53#include "lsquic_version.h"
54#include "lsquic_hash.h"
55#include "lsquic_attq.h"
56#include "lsquic_min_heap.h"
57
58#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
59#include "lsquic_logger.h"
60
61
/* The batch of outgoing packets grows and shrinks dynamically:
 * grow_batch_size() doubles engine->batch_size while it is below
 * MAX_OUT_BATCH_SIZE and shrink_batch_size() halves it while it is above
 * MIN_OUT_BATCH_SIZE.  A new engine starts at INITIAL_OUT_BATCH_SIZE.
 * MAX_OUT_BATCH_SIZE is also the capacity of the arrays in struct out_batch.
 */
#define MAX_OUT_BATCH_SIZE 1024
#define MIN_OUT_BATCH_SIZE 256
#define INITIAL_OUT_BATCH_SIZE 512
66
/* Storage for one batch of outgoing packets.  The arrays are parallel:
 * send_batch() uses conns[i] and packets[i] together for the same packet
 * and passes the `outs' array to the packets_out callback.
 */
struct out_batch
{
    lsquic_conn_t           *conns  [MAX_OUT_BATCH_SIZE];
    lsquic_packet_out_t     *packets[MAX_OUT_BATCH_SIZE];
    struct lsquic_out_spec   outs   [MAX_OUT_BATCH_SIZE];
};
73
/* Type of the connection iterator handed to process_connections().
 * conn_iter_next_tickable() is the iterator used by
 * lsquic_engine_process_conns().
 */
typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);

/* Forward declarations of helpers defined later in this file */

static void
process_connections (struct lsquic_engine *engine, conn_iter_f iter,
                     lsquic_time_t now);

/* Set reference flag `flag' on the connection */
static void
engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);

/* Clear reference flag `flag'; returns NULL if that destroyed the
 * connection, otherwise returns the connection.
 */
static lsquic_conn_t *
engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
                                        enum lsquic_conn_flags flag);

/* Only meant to be called from the engine destructor */
static void
force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
89
/* Nested calls to LSQUIC are not supported.
 *
 * ENGINE_IN() marks the engine as being inside a processing call by setting
 * ENPUB_PROC in pub.enp_flags; ENGINE_OUT() clears the flag again.  The
 * assertions catch nested or unbalanced use when assert() is enabled.
 */
#define ENGINE_IN(e) do {                               \
    assert(!((e)->pub.enp_flags & ENPUB_PROC));         \
    (e)->pub.enp_flags |= ENPUB_PROC;                   \
} while (0)

#define ENGINE_OUT(e) do {                              \
    assert((e)->pub.enp_flags & ENPUB_PROC);            \
    (e)->pub.enp_flags &= ~ENPUB_PROC;                  \
} while (0)
100
/* A connection can be referenced from one of six places:
 *
 *   1. Connection hash: a connection starts its life in one of those.
 *      (LSCONN_HASHED)
 *
 *   2. Outgoing queue.  (LSCONN_HAS_OUTGOING)
 *
 *   3. Tickable queue.  (LSCONN_TICKABLE)
 *
 *   4. Advisory Tick Time queue.  (LSCONN_ATTQ)
 *
 *   5. Closing connections queue.  This is a transient queue -- it only
 *      exists for the duration of process_connections() function call.
 *      (presumably LSCONN_CLOSING -- confirm in process_connections())
 *
 *   6. Ticked connections queue.  Another transient queue, similar to (5).
 *      (presumably LSCONN_TICKED -- confirm in process_connections())
 *
 * The idea is to destroy the connection when it is no longer referenced.
 * For example, a connection tick may return TICK_SEND|TICK_CLOSE.  In
 * that case, the connection is referenced from two places: (2) and (5).
 * After its packets are sent, it is only referenced in (5), and at the
 * end of the function call, when it is removed from (5), reference count
 * goes to zero and the connection is destroyed.  If not all packets can
 * be sent, at the end of the function call, the connection is referenced
 * by (2) and will only be removed once all outgoing packets have been
 * sent.
 *
 * The "reference count" is the set of CONN_REF_FLAGS bits in cn_flags:
 * engine_incref_conn() sets a bit, engine_decref_conn() clears it and
 * destroys the connection once none of these bits remains set.
 */
#define CONN_REF_FLAGS  (LSCONN_HASHED          \
                        |LSCONN_HAS_OUTGOING    \
                        |LSCONN_TICKABLE        \
                        |LSCONN_TICKED          \
                        |LSCONN_CLOSING         \
                        |LSCONN_ATTQ)
132
133
134
135
/* Engine state: settings, callbacks, and the queues connections live on. */
struct lsquic_engine
{
    /* Functions that receive a `struct lsquic_engine_public *' cast it back
     * to `lsquic_engine_t *' (see lsquic_engine_add_conn_to_tickable()),
     * which relies on `pub' being the first member.
     */
    struct lsquic_engine_public        pub;
    enum {
        ENG_SERVER      = LSENG_SERVER,
        ENG_HTTP        = LSENG_HTTP,
        ENG_COOLDOWN    = (1 <<  7),    /* Cooldown: no new connections */
        ENG_PAST_DEADLINE
                        = (1 <<  8),    /* Previous call to a processing
                                         * function went past time threshold.
                                         */
#ifndef NDEBUG
        ENG_DTOR        = (1 << 26),    /* Engine destructor */
#endif
    }                                  flags;
    /* Stream callbacks and their context, copied from the engine API and
     * passed on when a client connection is created.
     */
    const struct lsquic_stream_if     *stream_if;
    void                              *stream_if_ctx;
    /* Callback (and its context) used by send_batch() to send packets */
    lsquic_packets_out_f               packets_out;
    void                              *packets_out_ctx;
    void                              *bad_handshake_ctx;
    /* Connections looked up by local address or by connection ID */
    struct conn_hash                   conns_hash;
    /* Min-heaps of connections: tickable ones are keyed by cn_last_ticked,
     * those with outgoing packets by cn_last_sent.  Both heaps share one
     * allocation owned by conns_tickable (see maybe_grow_conn_heaps()).
     */
    struct min_heap                    conns_tickable;
    struct min_heap                    conns_out;
    struct eng_hist                    history;
    /* Current outgoing batch size; see grow/shrink_batch_size() */
    unsigned                           batch_size;
    /* Advisory Tick Time queue */
    struct attq                       *attq;
    /* Track time last time a packet was sent to give new connections
     * priority lower than that of existing connections.
     */
    lsquic_time_t                      last_sent;
    /* Number of live connections: incremented in new_full_conn_client(),
     * decremented in destroy_conn().
     */
    unsigned                           n_conns;
    lsquic_time_t                      deadline;
    struct out_batch                   out_batch;
};
170
171
172void
173lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
174                             unsigned flags)
175{
176    memset(settings, 0, sizeof(*settings));
177    settings->es_versions        = LSQUIC_DF_VERSIONS;
178    if (flags & ENG_SERVER)
179    {
180        settings->es_cfcw        = LSQUIC_DF_CFCW_SERVER;
181        settings->es_sfcw        = LSQUIC_DF_SFCW_SERVER;
182        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER;
183    }
184    else
185    {
186        settings->es_cfcw        = LSQUIC_DF_CFCW_CLIENT;
187        settings->es_sfcw        = LSQUIC_DF_SFCW_CLIENT;
188        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT;
189    }
190    settings->es_max_streams_in  = LSQUIC_DF_MAX_STREAMS_IN;
191    settings->es_idle_conn_to    = LSQUIC_DF_IDLE_CONN_TO;
192    settings->es_handshake_to    = LSQUIC_DF_HANDSHAKE_TO;
193    settings->es_silent_close    = LSQUIC_DF_SILENT_CLOSE;
194    settings->es_max_header_list_size
195                                 = LSQUIC_DF_MAX_HEADER_LIST_SIZE;
196    settings->es_ua              = LSQUIC_DF_UA;
197
198    settings->es_pdmd            = QTAG_X509;
199    settings->es_aead            = QTAG_AESG;
200    settings->es_kexs            = QTAG_C255;
201    settings->es_support_push    = LSQUIC_DF_SUPPORT_PUSH;
202    settings->es_support_tcid0   = LSQUIC_DF_SUPPORT_TCID0;
203    settings->es_support_nstp    = LSQUIC_DF_SUPPORT_NSTP;
204    settings->es_honor_prst      = LSQUIC_DF_HONOR_PRST;
205    settings->es_progress_check  = LSQUIC_DF_PROGRESS_CHECK;
206    settings->es_rw_once         = LSQUIC_DF_RW_ONCE;
207    settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
208    settings->es_pace_packets    = LSQUIC_DF_PACE_PACKETS;
209}
210
211
212/* Note: if returning an error, err_buf must be valid if non-NULL */
213int
214lsquic_engine_check_settings (const struct lsquic_engine_settings *settings,
215                              unsigned flags,
216                              char *err_buf, size_t err_buf_sz)
217{
218    if (settings->es_cfcw < LSQUIC_MIN_FCW ||
219        settings->es_sfcw < LSQUIC_MIN_FCW)
220    {
221        if (err_buf)
222            snprintf(err_buf, err_buf_sz, "%s",
223                                            "flow control window set too low");
224        return -1;
225    }
226    if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS))
227    {
228        if (err_buf)
229            snprintf(err_buf, err_buf_sz, "%s",
230                        "No supported QUIC versions specified");
231        return -1;
232    }
233    if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS)
234    {
235        if (err_buf)
236            snprintf(err_buf, err_buf_sz, "%s",
237                        "one or more unsupported QUIC version is specified");
238        return -1;
239    }
240    return 0;
241}
242
243
/* Release hook of the stock packet-out memory interface: frees a buffer
 * that was allocated by malloc_buf().  `ctx' is not used.
 *
 * The buffer is taken as `void *' so that this function's type is exactly
 * the `void (*)(void *, void *)' type it is stored and called under in
 * stock_pmi.  Calling a function through a pointer to an incompatible
 * function type is undefined behavior.
 */
static void
free_packet (void *ctx, void *packet_data)
{
    (void) ctx;
    free(packet_data);
}
249
250
/* Allocation hook of the stock packet-out memory interface: returns `size'
 * bytes from the heap, or NULL if malloc() fails.  `ctx' is not used.
 */
static void *
malloc_buf (void *ctx, size_t size)
{
    void *buf;

    (void) ctx;
    buf = malloc(size);
    return buf;
}
256
257
258static const struct lsquic_packout_mem_if stock_pmi =
259{
260    malloc_buf, (void(*)(void *, void *)) free_packet,
261};
262
263
264lsquic_engine_t *
265lsquic_engine_new (unsigned flags,
266                   const struct lsquic_engine_api *api)
267{
268    lsquic_engine_t *engine;
269    int tag_buf_len;
270    char err_buf[100];
271
272    if (!api->ea_packets_out)
273    {
274        LSQ_ERROR("packets_out callback is not specified");
275        return NULL;
276    }
277
278    if (api->ea_settings &&
279                0 != lsquic_engine_check_settings(api->ea_settings, flags,
280                                                    err_buf, sizeof(err_buf)))
281    {
282        LSQ_ERROR("cannot create engine: %s", err_buf);
283        return NULL;
284    }
285
286    engine = calloc(1, sizeof(*engine));
287    if (!engine)
288        return NULL;
289    if (0 != lsquic_mm_init(&engine->pub.enp_mm))
290    {
291        free(engine);
292        return NULL;
293    }
294    if (api->ea_settings)
295        engine->pub.enp_settings        = *api->ea_settings;
296    else
297        lsquic_engine_init_settings(&engine->pub.enp_settings, flags);
298    tag_buf_len = gen_ver_tags(engine->pub.enp_ver_tags_buf,
299                                    sizeof(engine->pub.enp_ver_tags_buf),
300                                    engine->pub.enp_settings.es_versions);
301    if (tag_buf_len <= 0)
302    {
303        LSQ_ERROR("cannot generate version tags buffer");
304        free(engine);
305        return NULL;
306    }
307    engine->pub.enp_ver_tags_len = tag_buf_len;
308    engine->pub.enp_flags = ENPUB_CAN_SEND;
309
310    engine->flags           = flags;
311    engine->stream_if       = api->ea_stream_if;
312    engine->stream_if_ctx   = api->ea_stream_if_ctx;
313    engine->packets_out     = api->ea_packets_out;
314    engine->packets_out_ctx = api->ea_packets_out_ctx;
315    if (api->ea_pmi)
316    {
317        engine->pub.enp_pmi      = api->ea_pmi;
318        engine->pub.enp_pmi_ctx  = api->ea_pmi_ctx;
319    }
320    else
321    {
322        engine->pub.enp_pmi      = &stock_pmi;
323        engine->pub.enp_pmi_ctx  = NULL;
324    }
325    engine->pub.enp_engine = engine;
326    conn_hash_init(&engine->conns_hash,
327        !(flags & ENG_SERVER) && engine->pub.enp_settings.es_support_tcid0 ?
328        CHF_USE_ADDR : 0);
329    engine->attq = attq_create();
330    eng_hist_init(&engine->history);
331    engine->batch_size = INITIAL_OUT_BATCH_SIZE;
332
333
334    LSQ_INFO("instantiated engine");
335    return engine;
336}
337
338
339static void
340grow_batch_size (struct lsquic_engine *engine)
341{
342    engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE;
343}
344
345
346static void
347shrink_batch_size (struct lsquic_engine *engine)
348{
349    engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE;
350}
351
352
353/* Wrapper to make sure important things occur before the connection is
354 * really destroyed.
355 */
356static void
357destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn)
358{
359    --engine->n_conns;
360    conn->cn_flags |= LSCONN_NEVER_TICKABLE;
361    conn->cn_if->ci_destroy(conn);
362}
363
364
365static int
366maybe_grow_conn_heaps (struct lsquic_engine *engine)
367{
368    struct min_heap_elem *els;
369    unsigned count;
370
371    if (engine->n_conns < lsquic_mh_nalloc(&engine->conns_tickable))
372        return 0;   /* Nothing to do */
373
374    if (lsquic_mh_nalloc(&engine->conns_tickable))
375        count = lsquic_mh_nalloc(&engine->conns_tickable) * 2 * 2;
376    else
377        count = 8;
378
379    els = malloc(sizeof(els[0]) * count);
380    if (!els)
381    {
382        LSQ_ERROR("%s: malloc failed", __func__);
383        return -1;
384    }
385
386    LSQ_DEBUG("grew heaps to %u elements", count / 2);
387    memcpy(&els[0], engine->conns_tickable.mh_elems,
388                sizeof(els[0]) * lsquic_mh_count(&engine->conns_tickable));
389    memcpy(&els[count / 2], engine->conns_out.mh_elems,
390                sizeof(els[0]) * lsquic_mh_count(&engine->conns_out));
391    free(engine->conns_tickable.mh_elems);
392    engine->conns_tickable.mh_elems = els;
393    engine->conns_out.mh_elems = &els[count / 2];
394    engine->conns_tickable.mh_nalloc = count / 2;
395    engine->conns_out.mh_nalloc = count / 2;
396    return 0;
397}
398
399
400static lsquic_conn_t *
401new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
402                      unsigned short max_packet_size)
403{
404    lsquic_conn_t *conn;
405    unsigned flags;
406    if (0 != maybe_grow_conn_heaps(engine))
407        return NULL;
408    flags = engine->flags & (ENG_SERVER|ENG_HTTP);
409    conn = full_conn_client_new(&engine->pub, engine->stream_if,
410                    engine->stream_if_ctx, flags, hostname, max_packet_size);
411    if (!conn)
412        return NULL;
413    ++engine->n_conns;
414    return conn;
415}
416
417
418static lsquic_conn_t *
419find_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
420         struct packin_parse_state *ppstate, const struct sockaddr *sa_local)
421{
422    lsquic_conn_t *conn;
423
424    if (conn_hash_using_addr(&engine->conns_hash))
425        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
426    else if (packet_in->pi_flags & PI_CONN_ID)
427        conn = conn_hash_find_by_cid(&engine->conns_hash,
428                                                    packet_in->pi_conn_id);
429    else
430    {
431        LSQ_DEBUG("packet header does not have connection ID: discarding");
432        return NULL;
433    }
434
435    if (!conn)
436        return NULL;
437
438    conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
439    if ((packet_in->pi_flags & PI_CONN_ID)
440        && conn->cn_cid != packet_in->pi_conn_id)
441    {
442        LSQ_DEBUG("connection IDs do not match");
443        return NULL;
444    }
445
446    return conn;
447}
448
449
450#if !defined(NDEBUG) && __GNUC__
451__attribute__((weak))
452#endif
453void
454lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub,
455                                    lsquic_conn_t *conn)
456{
457    if (0 == (enpub->enp_flags & ENPUB_PROC) &&
458        0 == (conn->cn_flags & (LSCONN_TICKABLE|LSCONN_NEVER_TICKABLE)))
459    {
460        lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
461        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
462        engine_incref_conn(conn, LSCONN_TICKABLE);
463    }
464}
465
466
467void
468lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
469                                lsquic_conn_t *conn, lsquic_time_t tick_time)
470{
471    lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
472    if (conn->cn_flags & LSCONN_TICKABLE)
473    {
474        /* Optimization: no need to add the connection to the Advisory Tick
475         * Time Queue: it is about to be ticked, after which it its next tick
476         * time may be queried again.
477         */;
478    }
479    else if (conn->cn_flags & LSCONN_ATTQ)
480    {
481        if (lsquic_conn_adv_time(conn) != tick_time)
482        {
483            attq_remove(engine->attq, conn);
484            if (0 != attq_add(engine->attq, conn, tick_time))
485                engine_decref_conn(engine, conn, LSCONN_ATTQ);
486        }
487    }
488    else if (0 == attq_add(engine->attq, conn, tick_time))
489        engine_incref_conn(conn, LSCONN_ATTQ);
490}
491
492
493/* Return 0 if packet is being processed by a connections, otherwise return 1 */
494static int
495process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
496       struct packin_parse_state *ppstate, const struct sockaddr *sa_local,
497       const struct sockaddr *sa_peer, void *peer_ctx)
498{
499    lsquic_conn_t *conn;
500
501    if (lsquic_packet_in_is_prst(packet_in)
502                                && !engine->pub.enp_settings.es_honor_prst)
503    {
504        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
505        LSQ_DEBUG("public reset packet: discarding");
506        return 1;
507    }
508
509    conn = find_conn(engine, packet_in, ppstate, sa_local);
510
511    if (!conn)
512    {
513        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
514        return 1;
515    }
516
517    if (0 == (conn->cn_flags & LSCONN_TICKABLE))
518    {
519        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
520        engine_incref_conn(conn, LSCONN_TICKABLE);
521    }
522    lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
523    lsquic_packet_in_upref(packet_in);
524    conn->cn_peer_ctx = peer_ctx;
525    conn->cn_if->ci_packet_in(conn, packet_in);
526    lsquic_packet_in_put(&engine->pub.enp_mm, packet_in);
527    return 0;
528}
529
530
531void
532lsquic_engine_destroy (lsquic_engine_t *engine)
533{
534    lsquic_conn_t *conn;
535
536    LSQ_DEBUG("destroying engine");
537#ifndef NDEBUG
538    engine->flags |= ENG_DTOR;
539#endif
540
541    while ((conn = lsquic_mh_pop(&engine->conns_out)))
542    {
543        assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
544        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
545    }
546
547    while ((conn = lsquic_mh_pop(&engine->conns_tickable)))
548    {
549        assert(conn->cn_flags & LSCONN_TICKABLE);
550        (void) engine_decref_conn(engine, conn, LSCONN_TICKABLE);
551    }
552
553    for (conn = conn_hash_first(&engine->conns_hash); conn;
554                            conn = conn_hash_next(&engine->conns_hash))
555        force_close_conn(engine, conn);
556    conn_hash_cleanup(&engine->conns_hash);
557
558    assert(0 == engine->n_conns);
559    attq_destroy(engine->attq);
560
561    assert(0 == lsquic_mh_count(&engine->conns_out));
562    assert(0 == lsquic_mh_count(&engine->conns_tickable));
563    free(engine->conns_tickable.mh_elems);
564    free(engine);
565}
566
567
568lsquic_conn_t *
569lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *local_sa,
570                       const struct sockaddr *peer_sa,
571                       void *peer_ctx, lsquic_conn_ctx_t *conn_ctx,
572                       const char *hostname, unsigned short max_packet_size)
573{
574    lsquic_conn_t *conn;
575    ENGINE_IN(engine);
576
577    if (engine->flags & ENG_SERVER)
578    {
579        LSQ_ERROR("`%s' must only be called in client mode", __func__);
580        goto err;
581    }
582
583    if (conn_hash_using_addr(&engine->conns_hash)
584                && conn_hash_find_by_addr(&engine->conns_hash, local_sa))
585    {
586        LSQ_ERROR("cannot have more than one connection on the same port");
587        goto err;
588    }
589
590    if (0 == max_packet_size)
591    {
592        switch (peer_sa->sa_family)
593        {
594        case AF_INET:
595            max_packet_size = QUIC_MAX_IPv4_PACKET_SZ;
596            break;
597        default:
598            max_packet_size = QUIC_MAX_IPv6_PACKET_SZ;
599            break;
600        }
601    }
602
603    conn = new_full_conn_client(engine, hostname, max_packet_size);
604    if (!conn)
605        goto err;
606    lsquic_conn_record_sockaddr(conn, local_sa, peer_sa);
607    if (0 != conn_hash_add(&engine->conns_hash, conn))
608    {
609        LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
610            conn->cn_cid);
611        destroy_conn(engine, conn);
612        goto err;
613    }
614    assert(!(conn->cn_flags &
615        (CONN_REF_FLAGS
616         & ~LSCONN_TICKABLE /* This flag may be set as effect of user
617                                 callbacks */
618                             )));
619    conn->cn_flags |= LSCONN_HASHED;
620    lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
621    engine_incref_conn(conn, LSCONN_TICKABLE);
622    conn->cn_peer_ctx = peer_ctx;
623    lsquic_conn_set_ctx(conn, conn_ctx);
624    full_conn_client_call_on_new(conn);
625  end:
626    ENGINE_OUT(engine);
627    return conn;
628  err:
629    conn = NULL;
630    goto end;
631}
632
633
634static void
635remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
636{
637    conn_hash_remove(&engine->conns_hash, conn);
638    (void) engine_decref_conn(engine, conn, LSCONN_HASHED);
639}
640
641
642static void
643refflags2str (enum lsquic_conn_flags flags, char s[6])
644{
645    *s = 'C'; s += !!(flags & LSCONN_CLOSING);
646    *s = 'H'; s += !!(flags & LSCONN_HASHED);
647    *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
648    *s = 'T'; s += !!(flags & LSCONN_TICKABLE);
649    *s = 'A'; s += !!(flags & LSCONN_ATTQ);
650    *s = 'K'; s += !!(flags & LSCONN_TICKED);
651    *s = '\0';
652}
653
654
655static void
656engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
657{
658    char str[2][7];
659    assert(flag & CONN_REF_FLAGS);
660    assert(!(conn->cn_flags & flag));
661    conn->cn_flags |= flag;
662    LSQ_DEBUG("incref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
663                    (refflags2str(conn->cn_flags & ~flag, str[0]), str[0]),
664                    (refflags2str(conn->cn_flags, str[1]), str[1]));
665}
666
667
668static lsquic_conn_t *
669engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
670                                        enum lsquic_conn_flags flags)
671{
672    char str[2][7];
673    assert(flags & CONN_REF_FLAGS);
674    assert(conn->cn_flags & flags);
675#ifndef NDEBUG
676    if (flags & LSCONN_CLOSING)
677        assert(0 == (conn->cn_flags & LSCONN_HASHED));
678#endif
679    conn->cn_flags &= ~flags;
680    LSQ_DEBUG("decref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
681                    (refflags2str(conn->cn_flags | flags, str[0]), str[0]),
682                    (refflags2str(conn->cn_flags, str[1]), str[1]));
683    if (0 == (conn->cn_flags & CONN_REF_FLAGS))
684    {
685        eng_hist_inc(&engine->history, 0, sl_del_full_conns);
686        destroy_conn(engine, conn);
687        return NULL;
688    }
689    else
690        return conn;
691}
692
693
694/* This is not a general-purpose function.  Only call from engine dtor. */
695static void
696force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
697{
698    assert(engine->flags & ENG_DTOR);
699    const enum lsquic_conn_flags flags = conn->cn_flags;
700    assert(conn->cn_flags & CONN_REF_FLAGS);
701    assert(!(flags & LSCONN_HAS_OUTGOING));  /* Should be removed already */
702    assert(!(flags & LSCONN_TICKABLE));    /* Should be removed already */
703    assert(!(flags & LSCONN_CLOSING));  /* It is in transient queue? */
704    if (flags & LSCONN_ATTQ)
705    {
706        attq_remove(engine->attq, conn);
707        (void) engine_decref_conn(engine, conn, LSCONN_ATTQ);
708    }
709    if (flags & LSCONN_HASHED)
710        remove_conn_from_hash(engine, conn);
711}
712
713
714/* Iterator for tickable connections (those on the Tickable Queue).  Before
715 * a connection is returned, it is removed from the Advisory Tick Time queue
716 * if necessary.
717 */
718static lsquic_conn_t *
719conn_iter_next_tickable (struct lsquic_engine *engine)
720{
721    lsquic_conn_t *conn;
722
723    conn = lsquic_mh_pop(&engine->conns_tickable);
724
725    if (conn)
726        conn = engine_decref_conn(engine, conn, LSCONN_TICKABLE);
727    if (conn && (conn->cn_flags & LSCONN_ATTQ))
728    {
729        attq_remove(engine->attq, conn);
730        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
731    }
732
733    return conn;
734}
735
736
737void
738lsquic_engine_process_conns (lsquic_engine_t *engine)
739{
740    lsquic_conn_t *conn;
741    lsquic_time_t now;
742
743    ENGINE_IN(engine);
744
745    now = lsquic_time_now();
746    while ((conn = attq_pop(engine->attq, now)))
747    {
748        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
749        if (conn && !(conn->cn_flags & LSCONN_TICKABLE))
750        {
751            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
752            engine_incref_conn(conn, LSCONN_TICKABLE);
753        }
754    }
755
756    process_connections(engine, conn_iter_next_tickable, now);
757    ENGINE_OUT(engine);
758}
759
760
761static int
762generate_header (const lsquic_packet_out_t *packet_out,
763                 const struct parse_funcs *pf, lsquic_cid_t cid,
764                 unsigned char *buf, size_t bufsz)
765{
766    return pf->pf_gen_reg_pkt_header(buf, bufsz,
767        packet_out->po_flags & PO_CONN_ID ? &cid                    : NULL,
768        packet_out->po_flags & PO_VERSION ? &packet_out->po_ver_tag : NULL,
769        packet_out->po_flags & PO_NONCE   ? packet_out->po_nonce    : NULL,
770        packet_out->po_packno, lsquic_packet_out_packno_bits(packet_out));
771}
772
773
774static ssize_t
775really_encrypt_packet (const lsquic_conn_t *conn,
776                       const lsquic_packet_out_t *packet_out,
777                       unsigned char *buf, size_t bufsz)
778{
779    int enc, header_sz, is_hello_packet;
780    size_t packet_sz;
781    unsigned char header_buf[QUIC_MAX_PUBHDR_SZ];
782
783    header_sz = generate_header(packet_out, conn->cn_pf, conn->cn_cid,
784                                            header_buf, sizeof(header_buf));
785    if (header_sz < 0)
786        return -1;
787
788    is_hello_packet = !!(packet_out->po_flags & PO_HELLO);
789    enc = conn->cn_esf->esf_encrypt(conn->cn_enc_session, conn->cn_version, 0,
790                packet_out->po_packno, header_buf, header_sz,
791                packet_out->po_data, packet_out->po_data_sz,
792                buf, bufsz, &packet_sz, is_hello_packet);
793    if (0 == enc)
794    {
795        LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %u bytes, "
796            "ciphertext is %zd bytes",
797            packet_out->po_packno,
798            lsquic_po_header_length(packet_out->po_flags) +
799                                                packet_out->po_data_sz,
800            packet_sz);
801        return packet_sz;
802    }
803    else
804        return -1;
805}
806
807
808static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, }
809encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
810                                            lsquic_packet_out_t *packet_out)
811{
812    ssize_t enc_sz;
813    size_t bufsz;
814    unsigned sent_sz;
815    unsigned char *buf;
816
817    bufsz = lsquic_po_header_length(packet_out->po_flags) +
818                                packet_out->po_data_sz + QUIC_PACKET_HASH_SZ;
819    buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz);
820    if (!buf)
821    {
822        LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd",
823                                                                        bufsz);
824        return ENCPA_NOMEM;
825    }
826
827    {
828        enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz);
829        sent_sz = enc_sz;
830    }
831
832    if (enc_sz < 0)
833    {
834        engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf);
835        return ENCPA_BADCRYPT;
836    }
837
838    packet_out->po_enc_data    = buf;
839    packet_out->po_enc_data_sz = enc_sz;
840    packet_out->po_sent_sz     = sent_sz;
841    packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ;
842
843    return ENCPA_OK;
844}
845
846
/* Queue head types for lists of connections (<sys/queue.h>): a singly-linked
 * tail queue and a doubly-linked tail queue.
 */
STAILQ_HEAD(conns_stailq, lsquic_conn);
TAILQ_HEAD(conns_tailq, lsquic_conn);
849
850
/* Iterator over connections that have packets to send.
 *
 * Connections are first drained from the heap onto the active list; once
 * the heap is empty, coi_next() cycles through the active list.
 * coi_deactivate() moves a connection to the inactive list,
 * coi_reactivate() moves it back, and coi_reheap() returns active
 * connections to the heap and drops the outgoing reference of inactive ones.
 */
struct conns_out_iter
{
    struct min_heap            *coi_heap;       /* engine->conns_out */
    TAILQ_HEAD(, lsquic_conn)   coi_active_list,
                                coi_inactive_list;
    /* Next connection to return when cycling through the active list; NULL
     * means start from the head of the list.
     */
    lsquic_conn_t              *coi_next;
#ifndef NDEBUG
    /* cn_last_sent of the connection most recently popped from the heap;
     * used to assert that the heap yields connections in order.
     */
    lsquic_time_t               coi_last_sent;
#endif
};
861
862
863static void
864coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine)
865{
866    iter->coi_heap = &engine->conns_out;
867    iter->coi_next = NULL;
868    TAILQ_INIT(&iter->coi_active_list);
869    TAILQ_INIT(&iter->coi_inactive_list);
870#ifndef NDEBUG
871    iter->coi_last_sent = 0;
872#endif
873}
874
875
876static lsquic_conn_t *
877coi_next (struct conns_out_iter *iter)
878{
879    lsquic_conn_t *conn;
880
881    if (lsquic_mh_count(iter->coi_heap) > 0)
882    {
883        conn = lsquic_mh_pop(iter->coi_heap);
884        TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
885        conn->cn_flags |= LSCONN_COI_ACTIVE;
886#ifndef NDEBUG
887        if (iter->coi_last_sent)
888            assert(iter->coi_last_sent <= conn->cn_last_sent);
889        iter->coi_last_sent = conn->cn_last_sent;
890#endif
891        return conn;
892    }
893    else if (!TAILQ_EMPTY(&iter->coi_active_list))
894    {
895        conn = iter->coi_next;
896        if (!conn)
897            conn = TAILQ_FIRST(&iter->coi_active_list);
898        if (conn)
899            iter->coi_next = TAILQ_NEXT(conn, cn_next_out);
900        return conn;
901    }
902    else
903        return NULL;
904}
905
906
907static void
908coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
909{
910    if (!(conn->cn_flags & LSCONN_EVANESCENT))
911    {
912        assert(!TAILQ_EMPTY(&iter->coi_active_list));
913        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
914        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
915        TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out);
916        conn->cn_flags |= LSCONN_COI_INACTIVE;
917    }
918}
919
920
921static void
922coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
923{
924    assert(conn->cn_flags & LSCONN_COI_INACTIVE);
925    TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
926    conn->cn_flags &= ~LSCONN_COI_INACTIVE;
927    TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
928    conn->cn_flags |= LSCONN_COI_ACTIVE;
929}
930
931
932static void
933coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
934{
935    lsquic_conn_t *conn;
936    while ((conn = TAILQ_FIRST(&iter->coi_active_list)))
937    {
938        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
939        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
940        lsquic_mh_insert(iter->coi_heap, conn, conn->cn_last_sent);
941    }
942    while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
943    {
944        TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
945        conn->cn_flags &= ~LSCONN_COI_INACTIVE;
946        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
947    }
948}
949
950
951static unsigned
952send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter,
953                  struct out_batch *batch, unsigned n_to_send)
954{
955    int n_sent, i;
956    lsquic_time_t now;
957
958    /* Set sent time before the write to avoid underestimating RTT */
959    now = lsquic_time_now();
960    for (i = 0; i < (int) n_to_send; ++i)
961        batch->packets[i]->po_sent = now;
962    n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs,
963                                                                n_to_send);
964    if (n_sent >= 0)
965        LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send);
966    else
967    {
968        engine->pub.enp_flags &= ~ENPUB_CAN_SEND;
969        LSQ_DEBUG("packets out returned an error: %s", strerror(errno));
970        EV_LOG_GENERIC_EVENT("cannot send packets");
971        n_sent = 0;
972    }
973    if (n_sent > 0)
974        engine->last_sent = now + n_sent;
975    for (i = 0; i < n_sent; ++i)
976    {
977        eng_hist_inc(&engine->history, now, sl_packets_out);
978        EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
979        batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i],
980                                                    batch->packets[i]);
981        /* `i' is added to maintain relative order */
982        batch->conns[i]->cn_last_sent = now + i;
983        /* Release packet out buffer as soon as the packet is sent
984         * successfully.  If not successfully sent, we hold on to
985         * this buffer until the packet sending is attempted again
986         * or until it times out and regenerated.
987         */
988        if (batch->packets[i]->po_flags & PO_ENCRYPTED)
989        {
990            batch->packets[i]->po_flags &= ~PO_ENCRYPTED;
991            engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx,
992                                                batch->packets[i]->po_enc_data);
993            batch->packets[i]->po_enc_data = NULL;  /* JIC */
994        }
995    }
996    if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT))
997        for ( ; i < (int) n_to_send; ++i)
998            EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
999    /* Return packets to the connection in reverse order so that the packet
1000     * ordering is maintained.
1001     */
1002    for (i = (int) n_to_send - 1; i >= n_sent; --i)
1003    {
1004        batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i],
1005                                                    batch->packets[i]);
1006        if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT)))
1007            coi_reactivate(conns_iter, batch->conns[i]);
1008    }
1009    return n_sent;
1010}
1011
1012
1013/* Return 1 if went past deadline, 0 otherwise */
1014static int
1015check_deadline (lsquic_engine_t *engine)
1016{
1017    if (engine->pub.enp_settings.es_proc_time_thresh &&
1018                                lsquic_time_now() > engine->deadline)
1019    {
1020        LSQ_INFO("went past threshold of %u usec, stop sending",
1021                            engine->pub.enp_settings.es_proc_time_thresh);
1022        engine->flags |= ENG_PAST_DEADLINE;
1023        return 1;
1024    }
1025    else
1026        return 0;
1027}
1028
1029
1030static void
1031send_packets_out (struct lsquic_engine *engine,
1032                  struct conns_tailq *ticked_conns,
1033                  struct conns_stailq *closed_conns)
1034{
1035    unsigned n, w, n_sent, n_batches_sent;
1036    lsquic_packet_out_t *packet_out;
1037    lsquic_conn_t *conn;
1038    struct out_batch *const batch = &engine->out_batch;
1039    struct conns_out_iter conns_iter;
1040    int shrink, deadline_exceeded;
1041
1042    coi_init(&conns_iter, engine);
1043    n_batches_sent = 0;
1044    n_sent = 0, n = 0;
1045    shrink = 0;
1046    deadline_exceeded = 0;
1047
1048    while ((conn = coi_next(&conns_iter)))
1049    {
1050        packet_out = conn->cn_if->ci_next_packet_to_send(conn);
1051        if (!packet_out) {
1052            LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64,
1053                                                            conn->cn_cid);
1054            coi_deactivate(&conns_iter, conn);
1055            continue;
1056        }
1057        if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT)))
1058        {
1059            switch (encrypt_packet(engine, conn, packet_out))
1060            {
1061            case ENCPA_NOMEM:
1062                /* Send what we have and wait for a more opportune moment */
1063                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1064                goto end_for;
1065            case ENCPA_BADCRYPT:
1066                /* This is pretty bad: close connection immediately */
1067                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1068                LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid);
1069                if (!(conn->cn_flags & LSCONN_EVANESCENT))
1070                {
1071                    if (!(conn->cn_flags & LSCONN_CLOSING))
1072                    {
1073                        STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn);
1074                        engine_incref_conn(conn, LSCONN_CLOSING);
1075                        if (conn->cn_flags & LSCONN_HASHED)
1076                            remove_conn_from_hash(engine, conn);
1077                    }
1078                    coi_deactivate(&conns_iter, conn);
1079                    if (conn->cn_flags & LSCONN_TICKED)
1080                    {
1081                        TAILQ_REMOVE(ticked_conns, conn, cn_next_ticked);
1082                        engine_decref_conn(engine, conn, LSCONN_TICKED);
1083                    }
1084                }
1085                continue;
1086            case ENCPA_OK:
1087                break;
1088            }
1089        }
1090        LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64,
1091                                        packet_out->po_packno, conn->cn_cid);
1092        assert(conn->cn_flags & LSCONN_HAS_PEER_SA);
1093        if (packet_out->po_flags & PO_ENCRYPTED)
1094        {
1095            batch->outs[n].buf     = packet_out->po_enc_data;
1096            batch->outs[n].sz      = packet_out->po_enc_data_sz;
1097        }
1098        else
1099        {
1100            batch->outs[n].buf     = packet_out->po_data;
1101            batch->outs[n].sz      = packet_out->po_data_sz;
1102        }
1103        batch->outs   [n].peer_ctx = conn->cn_peer_ctx;
1104        batch->outs   [n].local_sa = (struct sockaddr *) conn->cn_local_addr;
1105        batch->outs   [n].dest_sa  = (struct sockaddr *) conn->cn_peer_addr;
1106        batch->conns  [n]          = conn;
1107        batch->packets[n]          = packet_out;
1108        ++n;
1109        if (n == engine->batch_size)
1110        {
1111            n = 0;
1112            w = send_batch(engine, &conns_iter, batch, engine->batch_size);
1113            ++n_batches_sent;
1114            n_sent += w;
1115            if (w < engine->batch_size)
1116            {
1117                shrink = 1;
1118                break;
1119            }
1120            deadline_exceeded = check_deadline(engine);
1121            if (deadline_exceeded)
1122                break;
1123            grow_batch_size(engine);
1124        }
1125    }
1126  end_for:
1127
1128    if (n > 0) {
1129        w = send_batch(engine, &conns_iter, batch, n);
1130        n_sent += w;
1131        shrink = w < n;
1132        ++n_batches_sent;
1133        deadline_exceeded = check_deadline(engine);
1134    }
1135
1136    if (shrink)
1137        shrink_batch_size(engine);
1138    else if (n_batches_sent > 1 && !deadline_exceeded)
1139        grow_batch_size(engine);
1140
1141    coi_reheap(&conns_iter, engine);
1142
1143    LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s");
1144}
1145
1146
1147int
1148lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
1149{
1150    return lsquic_mh_count(&engine->conns_out) > 0
1151    ;
1152}
1153
1154
1155static void
1156reset_deadline (lsquic_engine_t *engine, lsquic_time_t now)
1157{
1158    engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh;
1159    engine->flags &= ~ENG_PAST_DEADLINE;
1160}
1161
1162
1163/* TODO: this is a user-facing function, account for load */
1164void
1165lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
1166{
1167    lsquic_conn_t *conn;
1168    struct conns_stailq closed_conns;
1169    struct conns_tailq ticked_conns = TAILQ_HEAD_INITIALIZER(ticked_conns);
1170
1171    STAILQ_INIT(&closed_conns);
1172    reset_deadline(engine, lsquic_time_now());
1173    if (!(engine->pub.enp_flags & ENPUB_CAN_SEND))
1174    {
1175        LSQ_DEBUG("can send again");
1176        EV_LOG_GENERIC_EVENT("can send again");
1177        engine->pub.enp_flags |= ENPUB_CAN_SEND;
1178    }
1179
1180    send_packets_out(engine, &ticked_conns, &closed_conns);
1181
1182    while ((conn = STAILQ_FIRST(&closed_conns))) {
1183        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1184        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1185    }
1186
1187}
1188
1189
1190static void
1191process_connections (lsquic_engine_t *engine, conn_iter_f next_conn,
1192                     lsquic_time_t now)
1193{
1194    lsquic_conn_t *conn;
1195    enum tick_st tick_st;
1196    unsigned i;
1197    lsquic_time_t next_tick_time;
1198    struct conns_stailq closed_conns;
1199    struct conns_tailq ticked_conns;
1200
1201    eng_hist_tick(&engine->history, now);
1202
1203    STAILQ_INIT(&closed_conns);
1204    TAILQ_INIT(&ticked_conns);
1205    reset_deadline(engine, now);
1206
1207    i = 0;
1208    while ((conn = next_conn(engine))
1209          )
1210    {
1211        tick_st = conn->cn_if->ci_tick(conn, now);
1212        conn->cn_last_ticked = now + i /* Maintain relative order */ ++;
1213        if (tick_st & TICK_SEND)
1214        {
1215            if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
1216            {
1217                lsquic_mh_insert(&engine->conns_out, conn, conn->cn_last_sent);
1218                engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
1219            }
1220        }
1221        if (tick_st & TICK_CLOSE)
1222        {
1223            STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn);
1224            engine_incref_conn(conn, LSCONN_CLOSING);
1225            if (conn->cn_flags & LSCONN_HASHED)
1226                remove_conn_from_hash(engine, conn);
1227        }
1228        else
1229        {
1230            TAILQ_INSERT_TAIL(&ticked_conns, conn, cn_next_ticked);
1231            engine_incref_conn(conn, LSCONN_TICKED);
1232        }
1233    }
1234
1235    if ((engine->pub.enp_flags & ENPUB_CAN_SEND)
1236                        && lsquic_engine_has_unsent_packets(engine))
1237        send_packets_out(engine, &ticked_conns, &closed_conns);
1238
1239    while ((conn = STAILQ_FIRST(&closed_conns))) {
1240        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1241        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1242    }
1243
1244    /* TODO Heapification can be optimized by switching to the Floyd method:
1245     * https://en.wikipedia.org/wiki/Binary_heap#Building_a_heap
1246     */
1247    while ((conn = TAILQ_FIRST(&ticked_conns)))
1248    {
1249        TAILQ_REMOVE(&ticked_conns, conn, cn_next_ticked);
1250        engine_decref_conn(engine, conn, LSCONN_TICKED);
1251        if (!(conn->cn_flags & LSCONN_TICKABLE)
1252            && conn->cn_if->ci_is_tickable(conn))
1253        {
1254            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
1255            engine_incref_conn(conn, LSCONN_TICKABLE);
1256        }
1257        else if (!(conn->cn_flags & LSCONN_ATTQ))
1258        {
1259            next_tick_time = conn->cn_if->ci_next_tick_time(conn);
1260            if (next_tick_time)
1261            {
1262                if (0 == attq_add(engine->attq, conn, next_tick_time))
1263                    engine_incref_conn(conn, LSCONN_ATTQ);
1264            }
1265            else
1266                assert(0);
1267        }
1268    }
1269
1270}
1271
1272
1273/* Return 0 if packet is being processed by a real connection, 1 if the
1274 * packet was processed, but not by a connection, and -1 on error.
1275 */
1276int
1277lsquic_engine_packet_in (lsquic_engine_t *engine,
1278    const unsigned char *packet_in_data, size_t packet_in_size,
1279    const struct sockaddr *sa_local, const struct sockaddr *sa_peer,
1280    void *peer_ctx)
1281{
1282    struct packin_parse_state ppstate;
1283    lsquic_packet_in_t *packet_in;
1284
1285    if (packet_in_size > QUIC_MAX_PACKET_SZ)
1286    {
1287        LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming "
1288            "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ);
1289        errno = E2BIG;
1290        return -1;
1291    }
1292
1293    packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm);
1294    if (!packet_in)
1295        return -1;
1296
1297    /* Library does not modify packet_in_data, it is not referenced after
1298     * this function returns and subsequent release of pi_data is guarded
1299     * by PI_OWN_DATA flag.
1300     */
1301    packet_in->pi_data = (unsigned char *) packet_in_data;
1302    if (0 != parse_packet_in_begin(packet_in, packet_in_size,
1303                                        engine->flags & ENG_SERVER, &ppstate))
1304    {
1305        LSQ_DEBUG("Cannot parse incoming packet's header");
1306        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
1307        errno = EINVAL;
1308        return -1;
1309    }
1310
1311    packet_in->pi_received = lsquic_time_now();
1312    eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in);
1313    return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer,
1314                                                                    peer_ctx);
1315}
1316
1317
1318#if __GNUC__ && !defined(NDEBUG)
1319__attribute__((weak))
1320#endif
1321unsigned
1322lsquic_engine_quic_versions (const lsquic_engine_t *engine)
1323{
1324    return engine->pub.enp_settings.es_versions;
1325}
1326
1327
1328int
1329lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
1330{
1331    const lsquic_time_t *next_time;
1332    lsquic_time_t now;
1333
1334    if (((engine->flags & ENG_PAST_DEADLINE)
1335                                    && lsquic_mh_count(&engine->conns_out))
1336        || lsquic_mh_count(&engine->conns_tickable))
1337    {
1338        *diff = 0;
1339        return 1;
1340    }
1341
1342    next_time = attq_next_time(engine->attq);
1343    if (!next_time)
1344        return 0;
1345
1346    now = lsquic_time_now();
1347    *diff = (int) ((int64_t) *next_time - (int64_t) now);
1348    return 1;
1349}
1350
1351
1352unsigned
1353lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now)
1354{
1355    lsquic_time_t now;
1356    now = lsquic_time_now();
1357    if (from_now < 0)
1358        now -= from_now;
1359    else
1360        now += from_now;
1361    return attq_count_before(engine->attq, now);
1362}
1363