lsquic_engine.c revision 14e3680d
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_engine.c - QUIC engine
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdint.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <sys/queue.h>
14#include <time.h>
15#ifndef WIN32
16#include <sys/time.h>
17#include <netinet/in.h>
18#include <sys/types.h>
19#include <sys/stat.h>
20#include <fcntl.h>
21#include <unistd.h>
22#include <netdb.h>
23#endif
24
25
26
27#include "lsquic.h"
28#include "lsquic_types.h"
29#include "lsquic_alarmset.h"
30#include "lsquic_parse_common.h"
31#include "lsquic_parse.h"
32#include "lsquic_packet_in.h"
33#include "lsquic_packet_out.h"
34#include "lsquic_senhist.h"
35#include "lsquic_rtt.h"
36#include "lsquic_cubic.h"
37#include "lsquic_pacer.h"
38#include "lsquic_send_ctl.h"
39#include "lsquic_set.h"
40#include "lsquic_conn_flow.h"
41#include "lsquic_sfcw.h"
42#include "lsquic_stream.h"
43#include "lsquic_conn.h"
44#include "lsquic_full_conn.h"
45#include "lsquic_util.h"
46#include "lsquic_qtags.h"
47#include "lsquic_str.h"
48#include "lsquic_handshake.h"
49#include "lsquic_mm.h"
50#include "lsquic_conn_hash.h"
51#include "lsquic_engine_public.h"
52#include "lsquic_eng_hist.h"
53#include "lsquic_ev_log.h"
54#include "lsquic_version.h"
55#include "lsquic_hash.h"
56#include "lsquic_attq.h"
57#include "lsquic_min_heap.h"
58#include "lsquic_http1x_if.h"
59
60#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
61#include "lsquic_logger.h"
62
63
64/* The batch of outgoing packets grows and shrinks dynamically */
65#define MAX_OUT_BATCH_SIZE 1024
66#define MIN_OUT_BATCH_SIZE 256
67#define INITIAL_OUT_BATCH_SIZE 512
68
69struct out_batch
70{
71    lsquic_conn_t           *conns  [MAX_OUT_BATCH_SIZE];
72    lsquic_packet_out_t     *packets[MAX_OUT_BATCH_SIZE];
73    struct lsquic_out_spec   outs   [MAX_OUT_BATCH_SIZE];
74};
75
76typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);
77
78static void
79process_connections (struct lsquic_engine *engine, conn_iter_f iter,
80                     lsquic_time_t now);
81
82static void
83engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);
84
85static lsquic_conn_t *
86engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
87                                        enum lsquic_conn_flags flag);
88
89static void
90force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
91
92/* Nested calls to LSQUIC are not supported */
93#define ENGINE_IN(e) do {                               \
94    assert(!((e)->pub.enp_flags & ENPUB_PROC));         \
95    (e)->pub.enp_flags |= ENPUB_PROC;                   \
96} while (0)
97
98#define ENGINE_OUT(e) do {                              \
99    assert((e)->pub.enp_flags & ENPUB_PROC);            \
100    (e)->pub.enp_flags &= ~ENPUB_PROC;                  \
101} while (0)
102
103/* A connection can be referenced from one of six places:
104 *
105 *   1. Connection hash: a connection starts its life in one of those.
106 *
107 *   2. Outgoing queue.
108 *
109 *   3. Tickable queue
110 *
111 *   4. Advisory Tick Time queue.
112 *
113 *   5. Closing connections queue.  This is a transient queue -- it only
114 *      exists for the duration of process_connections() function call.
115 *
116 *   6. Ticked connections queue.  Another transient queue, similar to (5).
117 *
118 * The idea is to destroy the connection when it is no longer referenced.
119 * For example, a connection tick may return TICK_SEND|TICK_CLOSE.  In
120 * that case, the connection is referenced from two places: (2) and (5).
121 * After its packets are sent, it is only referenced in (5), and at the
122 * end of the function call, when it is removed from (5), reference count
123 * goes to zero and the connection is destroyed.  If not all packets can
124 * be sent, at the end of the function call, the connection is referenced
125 * by (2) and will only be removed once all outgoing packets have been
126 * sent.
127 */
128#define CONN_REF_FLAGS  (LSCONN_HASHED          \
129                        |LSCONN_HAS_OUTGOING    \
130                        |LSCONN_TICKABLE        \
131                        |LSCONN_TICKED          \
132                        |LSCONN_CLOSING         \
133                        |LSCONN_ATTQ)
134
135
136
137
138struct lsquic_engine
139{
140    struct lsquic_engine_public        pub;
141    enum {
142        ENG_SERVER      = LSENG_SERVER,
143        ENG_HTTP        = LSENG_HTTP,
144        ENG_COOLDOWN    = (1 <<  7),    /* Cooldown: no new connections */
145        ENG_PAST_DEADLINE
146                        = (1 <<  8),    /* Previous call to a processing
147                                         * function went past time threshold.
148                                         */
149#ifndef NDEBUG
150        ENG_DTOR        = (1 << 26),    /* Engine destructor */
151#endif
152    }                                  flags;
153    const struct lsquic_stream_if     *stream_if;
154    void                              *stream_if_ctx;
155    lsquic_packets_out_f               packets_out;
156    void                              *packets_out_ctx;
157    void                              *bad_handshake_ctx;
158    struct conn_hash                   conns_hash;
159    struct min_heap                    conns_tickable;
160    struct min_heap                    conns_out;
161    struct eng_hist                    history;
162    unsigned                           batch_size;
163    struct attq                       *attq;
164    /* Track time last time a packet was sent to give new connections
165     * priority lower than that of existing connections.
166     */
167    lsquic_time_t                      last_sent;
168    unsigned                           n_conns;
169    lsquic_time_t                      deadline;
170    struct out_batch                   out_batch;
171};
172
173
174void
175lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
176                             unsigned flags)
177{
178    memset(settings, 0, sizeof(*settings));
179    settings->es_versions        = LSQUIC_DF_VERSIONS;
180    if (flags & ENG_SERVER)
181    {
182        settings->es_cfcw        = LSQUIC_DF_CFCW_SERVER;
183        settings->es_sfcw        = LSQUIC_DF_SFCW_SERVER;
184        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER;
185    }
186    else
187    {
188        settings->es_cfcw        = LSQUIC_DF_CFCW_CLIENT;
189        settings->es_sfcw        = LSQUIC_DF_SFCW_CLIENT;
190        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT;
191    }
192    settings->es_max_streams_in  = LSQUIC_DF_MAX_STREAMS_IN;
193    settings->es_idle_conn_to    = LSQUIC_DF_IDLE_CONN_TO;
194    settings->es_handshake_to    = LSQUIC_DF_HANDSHAKE_TO;
195    settings->es_silent_close    = LSQUIC_DF_SILENT_CLOSE;
196    settings->es_max_header_list_size
197                                 = LSQUIC_DF_MAX_HEADER_LIST_SIZE;
198    settings->es_ua              = LSQUIC_DF_UA;
199
200    settings->es_pdmd            = QTAG_X509;
201    settings->es_aead            = QTAG_AESG;
202    settings->es_kexs            = QTAG_C255;
203    settings->es_support_push    = LSQUIC_DF_SUPPORT_PUSH;
204    settings->es_support_tcid0   = LSQUIC_DF_SUPPORT_TCID0;
205    settings->es_support_nstp    = LSQUIC_DF_SUPPORT_NSTP;
206    settings->es_honor_prst      = LSQUIC_DF_HONOR_PRST;
207    settings->es_progress_check  = LSQUIC_DF_PROGRESS_CHECK;
208    settings->es_rw_once         = LSQUIC_DF_RW_ONCE;
209    settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
210    settings->es_pace_packets    = LSQUIC_DF_PACE_PACKETS;
211}
212
213
214/* Note: if returning an error, err_buf must be valid if non-NULL */
215int
216lsquic_engine_check_settings (const struct lsquic_engine_settings *settings,
217                              unsigned flags,
218                              char *err_buf, size_t err_buf_sz)
219{
220    if (settings->es_cfcw < LSQUIC_MIN_FCW ||
221        settings->es_sfcw < LSQUIC_MIN_FCW)
222    {
223        if (err_buf)
224            snprintf(err_buf, err_buf_sz, "%s",
225                                            "flow control window set too low");
226        return -1;
227    }
228    if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS))
229    {
230        if (err_buf)
231            snprintf(err_buf, err_buf_sz, "%s",
232                        "No supported QUIC versions specified");
233        return -1;
234    }
235    if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS)
236    {
237        if (err_buf)
238            snprintf(err_buf, err_buf_sz, "%s",
239                        "one or more unsupported QUIC version is specified");
240        return -1;
241    }
242    return 0;
243}
244
245
246static void
247free_packet (void *ctx, unsigned char *packet_data)
248{
249    free(packet_data);
250}
251
252
253static void *
254malloc_buf (void *ctx, size_t size)
255{
256    return malloc(size);
257}
258
259
260static const struct lsquic_packout_mem_if stock_pmi =
261{
262    malloc_buf, (void(*)(void *, void *)) free_packet,
263};
264
265
266static int
267hash_conns_by_addr (const struct lsquic_engine *engine)
268{
269    if (engine->pub.enp_settings.es_versions & LSQUIC_FORCED_TCID0_VERSIONS)
270        return 1;
271    if ((engine->pub.enp_settings.es_versions & LSQUIC_GQUIC_HEADER_VERSIONS)
272                                && engine->pub.enp_settings.es_support_tcid0)
273        return 1;
274    return 0;
275}
276
277
278lsquic_engine_t *
279lsquic_engine_new (unsigned flags,
280                   const struct lsquic_engine_api *api)
281{
282    lsquic_engine_t *engine;
283    int tag_buf_len;
284    char err_buf[100];
285
286    if (!api->ea_packets_out)
287    {
288        LSQ_ERROR("packets_out callback is not specified");
289        return NULL;
290    }
291
292    if (api->ea_settings &&
293                0 != lsquic_engine_check_settings(api->ea_settings, flags,
294                                                    err_buf, sizeof(err_buf)))
295    {
296        LSQ_ERROR("cannot create engine: %s", err_buf);
297        return NULL;
298    }
299
300    engine = calloc(1, sizeof(*engine));
301    if (!engine)
302        return NULL;
303    if (0 != lsquic_mm_init(&engine->pub.enp_mm))
304    {
305        free(engine);
306        return NULL;
307    }
308    if (api->ea_settings)
309        engine->pub.enp_settings        = *api->ea_settings;
310    else
311        lsquic_engine_init_settings(&engine->pub.enp_settings, flags);
312    tag_buf_len = lsquic_gen_ver_tags(engine->pub.enp_ver_tags_buf,
313                                    sizeof(engine->pub.enp_ver_tags_buf),
314                                    engine->pub.enp_settings.es_versions);
315    if (tag_buf_len <= 0)
316    {
317        LSQ_ERROR("cannot generate version tags buffer");
318        free(engine);
319        return NULL;
320    }
321    engine->pub.enp_ver_tags_len = tag_buf_len;
322    engine->pub.enp_flags = ENPUB_CAN_SEND;
323
324    engine->flags           = flags;
325    engine->stream_if       = api->ea_stream_if;
326    engine->stream_if_ctx   = api->ea_stream_if_ctx;
327    engine->packets_out     = api->ea_packets_out;
328    engine->packets_out_ctx = api->ea_packets_out_ctx;
329    if (api->ea_hsi_if)
330    {
331        engine->pub.enp_hsi_if  = api->ea_hsi_if;
332        engine->pub.enp_hsi_ctx = api->ea_hsi_ctx;
333    }
334    else
335    {
336        engine->pub.enp_hsi_if  = lsquic_http1x_if;
337        engine->pub.enp_hsi_ctx = NULL;
338    }
339    if (api->ea_pmi)
340    {
341        engine->pub.enp_pmi      = api->ea_pmi;
342        engine->pub.enp_pmi_ctx  = api->ea_pmi_ctx;
343    }
344    else
345    {
346        engine->pub.enp_pmi      = &stock_pmi;
347        engine->pub.enp_pmi_ctx  = NULL;
348    }
349    engine->pub.enp_verify_cert  = api->ea_verify_cert;
350    engine->pub.enp_verify_ctx   = api->ea_verify_ctx;
351    engine->pub.enp_engine = engine;
352    conn_hash_init(&engine->conns_hash,
353                        hash_conns_by_addr(engine) ?  CHF_USE_ADDR : 0);
354    engine->attq = attq_create();
355    eng_hist_init(&engine->history);
356    engine->batch_size = INITIAL_OUT_BATCH_SIZE;
357
358
359    LSQ_INFO("instantiated engine");
360    return engine;
361}
362
363
364static void
365grow_batch_size (struct lsquic_engine *engine)
366{
367    engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE;
368}
369
370
371static void
372shrink_batch_size (struct lsquic_engine *engine)
373{
374    engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE;
375}
376
377
378/* Wrapper to make sure important things occur before the connection is
379 * really destroyed.
380 */
381static void
382destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn)
383{
384    --engine->n_conns;
385    conn->cn_flags |= LSCONN_NEVER_TICKABLE;
386    conn->cn_if->ci_destroy(conn);
387}
388
389
390static int
391maybe_grow_conn_heaps (struct lsquic_engine *engine)
392{
393    struct min_heap_elem *els;
394    unsigned count;
395
396    if (engine->n_conns < lsquic_mh_nalloc(&engine->conns_tickable))
397        return 0;   /* Nothing to do */
398
399    if (lsquic_mh_nalloc(&engine->conns_tickable))
400        count = lsquic_mh_nalloc(&engine->conns_tickable) * 2 * 2;
401    else
402        count = 8;
403
404    els = malloc(sizeof(els[0]) * count);
405    if (!els)
406    {
407        LSQ_ERROR("%s: malloc failed", __func__);
408        return -1;
409    }
410
411    LSQ_DEBUG("grew heaps to %u elements", count / 2);
412    memcpy(&els[0], engine->conns_tickable.mh_elems,
413                sizeof(els[0]) * lsquic_mh_count(&engine->conns_tickable));
414    memcpy(&els[count / 2], engine->conns_out.mh_elems,
415                sizeof(els[0]) * lsquic_mh_count(&engine->conns_out));
416    free(engine->conns_tickable.mh_elems);
417    engine->conns_tickable.mh_elems = els;
418    engine->conns_out.mh_elems = &els[count / 2];
419    engine->conns_tickable.mh_nalloc = count / 2;
420    engine->conns_out.mh_nalloc = count / 2;
421    return 0;
422}
423
424
425static lsquic_conn_t *
426new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
427                      unsigned short max_packet_size)
428{
429    lsquic_conn_t *conn;
430    unsigned flags;
431    if (0 != maybe_grow_conn_heaps(engine))
432        return NULL;
433    flags = engine->flags & (ENG_SERVER|ENG_HTTP);
434    conn = full_conn_client_new(&engine->pub, engine->stream_if,
435                    engine->stream_if_ctx, flags, hostname, max_packet_size);
436    if (!conn)
437        return NULL;
438    ++engine->n_conns;
439    return conn;
440}
441
442
443static lsquic_conn_t *
444find_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
445         struct packin_parse_state *ppstate, const struct sockaddr *sa_local)
446{
447    lsquic_conn_t *conn;
448
449    if (conn_hash_using_addr(&engine->conns_hash))
450        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
451    else if (packet_in->pi_flags & PI_CONN_ID)
452        conn = conn_hash_find_by_cid(&engine->conns_hash,
453                                                    packet_in->pi_conn_id);
454    else
455    {
456        LSQ_DEBUG("packet header does not have connection ID: discarding");
457        return NULL;
458    }
459
460    if (!conn)
461        return NULL;
462
463    conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
464    if ((packet_in->pi_flags & PI_CONN_ID)
465        && conn->cn_cid != packet_in->pi_conn_id)
466    {
467        LSQ_DEBUG("connection IDs do not match");
468        return NULL;
469    }
470
471    return conn;
472}
473
474
475#if !defined(NDEBUG) && __GNUC__
476__attribute__((weak))
477#endif
478void
479lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub,
480                                    lsquic_conn_t *conn)
481{
482    if (0 == (enpub->enp_flags & ENPUB_PROC) &&
483        0 == (conn->cn_flags & (LSCONN_TICKABLE|LSCONN_NEVER_TICKABLE)))
484    {
485        lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
486        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
487        engine_incref_conn(conn, LSCONN_TICKABLE);
488    }
489}
490
491
492void
493lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
494                                lsquic_conn_t *conn, lsquic_time_t tick_time)
495{
496    lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
497    if (conn->cn_flags & LSCONN_TICKABLE)
498    {
499        /* Optimization: no need to add the connection to the Advisory Tick
500         * Time Queue: it is about to be ticked, after which it its next tick
501         * time may be queried again.
502         */;
503    }
504    else if (conn->cn_flags & LSCONN_ATTQ)
505    {
506        if (lsquic_conn_adv_time(conn) != tick_time)
507        {
508            attq_remove(engine->attq, conn);
509            if (0 != attq_add(engine->attq, conn, tick_time))
510                engine_decref_conn(engine, conn, LSCONN_ATTQ);
511        }
512    }
513    else if (0 == attq_add(engine->attq, conn, tick_time))
514        engine_incref_conn(conn, LSCONN_ATTQ);
515}
516
517
518/* Return 0 if packet is being processed by a connections, otherwise return 1 */
519static int
520process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
521       struct packin_parse_state *ppstate, const struct sockaddr *sa_local,
522       const struct sockaddr *sa_peer, void *peer_ctx)
523{
524    lsquic_conn_t *conn;
525
526    if (lsquic_packet_in_is_gquic_prst(packet_in)
527                                && !engine->pub.enp_settings.es_honor_prst)
528    {
529        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
530        LSQ_DEBUG("public reset packet: discarding");
531        return 1;
532    }
533
534    conn = find_conn(engine, packet_in, ppstate, sa_local);
535
536    if (!conn)
537    {
538        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
539        return 1;
540    }
541
542    if (0 == (conn->cn_flags & LSCONN_TICKABLE))
543    {
544        lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
545        engine_incref_conn(conn, LSCONN_TICKABLE);
546    }
547    lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
548    lsquic_packet_in_upref(packet_in);
549    conn->cn_peer_ctx = peer_ctx;
550    conn->cn_if->ci_packet_in(conn, packet_in);
551    lsquic_packet_in_put(&engine->pub.enp_mm, packet_in);
552    return 0;
553}
554
555
556void
557lsquic_engine_destroy (lsquic_engine_t *engine)
558{
559    lsquic_conn_t *conn;
560
561    LSQ_DEBUG("destroying engine");
562#ifndef NDEBUG
563    engine->flags |= ENG_DTOR;
564#endif
565
566    while ((conn = lsquic_mh_pop(&engine->conns_out)))
567    {
568        assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
569        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
570    }
571
572    while ((conn = lsquic_mh_pop(&engine->conns_tickable)))
573    {
574        assert(conn->cn_flags & LSCONN_TICKABLE);
575        (void) engine_decref_conn(engine, conn, LSCONN_TICKABLE);
576    }
577
578    for (conn = conn_hash_first(&engine->conns_hash); conn;
579                            conn = conn_hash_next(&engine->conns_hash))
580        force_close_conn(engine, conn);
581    conn_hash_cleanup(&engine->conns_hash);
582
583    assert(0 == engine->n_conns);
584    attq_destroy(engine->attq);
585
586    assert(0 == lsquic_mh_count(&engine->conns_out));
587    assert(0 == lsquic_mh_count(&engine->conns_tickable));
588    lsquic_mm_cleanup(&engine->pub.enp_mm);
589    free(engine->conns_tickable.mh_elems);
590    free(engine);
591}
592
593
594lsquic_conn_t *
595lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *local_sa,
596                       const struct sockaddr *peer_sa,
597                       void *peer_ctx, lsquic_conn_ctx_t *conn_ctx,
598                       const char *hostname, unsigned short max_packet_size)
599{
600    lsquic_conn_t *conn;
601    ENGINE_IN(engine);
602
603    if (engine->flags & ENG_SERVER)
604    {
605        LSQ_ERROR("`%s' must only be called in client mode", __func__);
606        goto err;
607    }
608
609    if (conn_hash_using_addr(&engine->conns_hash)
610                && conn_hash_find_by_addr(&engine->conns_hash, local_sa))
611    {
612        LSQ_ERROR("cannot have more than one connection on the same port");
613        goto err;
614    }
615
616    if (0 == max_packet_size)
617    {
618        switch (peer_sa->sa_family)
619        {
620        case AF_INET:
621            max_packet_size = QUIC_MAX_IPv4_PACKET_SZ;
622            break;
623        default:
624            max_packet_size = QUIC_MAX_IPv6_PACKET_SZ;
625            break;
626        }
627    }
628
629    conn = new_full_conn_client(engine, hostname, max_packet_size);
630    if (!conn)
631        goto err;
632    lsquic_conn_record_sockaddr(conn, local_sa, peer_sa);
633    if (0 != conn_hash_add(&engine->conns_hash, conn))
634    {
635        LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
636            conn->cn_cid);
637        destroy_conn(engine, conn);
638        goto err;
639    }
640    assert(!(conn->cn_flags &
641        (CONN_REF_FLAGS
642         & ~LSCONN_TICKABLE /* This flag may be set as effect of user
643                                 callbacks */
644                             )));
645    conn->cn_flags |= LSCONN_HASHED;
646    lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
647    engine_incref_conn(conn, LSCONN_TICKABLE);
648    conn->cn_peer_ctx = peer_ctx;
649    lsquic_conn_set_ctx(conn, conn_ctx);
650    full_conn_client_call_on_new(conn);
651  end:
652    ENGINE_OUT(engine);
653    return conn;
654  err:
655    conn = NULL;
656    goto end;
657}
658
659
660static void
661remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
662{
663    conn_hash_remove(&engine->conns_hash, conn);
664    (void) engine_decref_conn(engine, conn, LSCONN_HASHED);
665}
666
667
668static void
669refflags2str (enum lsquic_conn_flags flags, char s[6])
670{
671    *s = 'C'; s += !!(flags & LSCONN_CLOSING);
672    *s = 'H'; s += !!(flags & LSCONN_HASHED);
673    *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
674    *s = 'T'; s += !!(flags & LSCONN_TICKABLE);
675    *s = 'A'; s += !!(flags & LSCONN_ATTQ);
676    *s = 'K'; s += !!(flags & LSCONN_TICKED);
677    *s = '\0';
678}
679
680
681static void
682engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
683{
684    char str[2][7];
685    assert(flag & CONN_REF_FLAGS);
686    assert(!(conn->cn_flags & flag));
687    conn->cn_flags |= flag;
688    LSQ_DEBUG("incref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
689                    (refflags2str(conn->cn_flags & ~flag, str[0]), str[0]),
690                    (refflags2str(conn->cn_flags, str[1]), str[1]));
691}
692
693
694static lsquic_conn_t *
695engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
696                                        enum lsquic_conn_flags flags)
697{
698    char str[2][7];
699    assert(flags & CONN_REF_FLAGS);
700    assert(conn->cn_flags & flags);
701#ifndef NDEBUG
702    if (flags & LSCONN_CLOSING)
703        assert(0 == (conn->cn_flags & LSCONN_HASHED));
704#endif
705    conn->cn_flags &= ~flags;
706    LSQ_DEBUG("decref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
707                    (refflags2str(conn->cn_flags | flags, str[0]), str[0]),
708                    (refflags2str(conn->cn_flags, str[1]), str[1]));
709    if (0 == (conn->cn_flags & CONN_REF_FLAGS))
710    {
711        eng_hist_inc(&engine->history, 0, sl_del_full_conns);
712        destroy_conn(engine, conn);
713        return NULL;
714    }
715    else
716        return conn;
717}
718
719
720/* This is not a general-purpose function.  Only call from engine dtor. */
721static void
722force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
723{
724    assert(engine->flags & ENG_DTOR);
725    const enum lsquic_conn_flags flags = conn->cn_flags;
726    assert(conn->cn_flags & CONN_REF_FLAGS);
727    assert(!(flags & LSCONN_HAS_OUTGOING));  /* Should be removed already */
728    assert(!(flags & LSCONN_TICKABLE));    /* Should be removed already */
729    assert(!(flags & LSCONN_CLOSING));  /* It is in transient queue? */
730    if (flags & LSCONN_ATTQ)
731    {
732        attq_remove(engine->attq, conn);
733        (void) engine_decref_conn(engine, conn, LSCONN_ATTQ);
734    }
735    if (flags & LSCONN_HASHED)
736        remove_conn_from_hash(engine, conn);
737}
738
739
740/* Iterator for tickable connections (those on the Tickable Queue).  Before
741 * a connection is returned, it is removed from the Advisory Tick Time queue
742 * if necessary.
743 */
744static lsquic_conn_t *
745conn_iter_next_tickable (struct lsquic_engine *engine)
746{
747    lsquic_conn_t *conn;
748
749    conn = lsquic_mh_pop(&engine->conns_tickable);
750
751    if (conn)
752        conn = engine_decref_conn(engine, conn, LSCONN_TICKABLE);
753    if (conn && (conn->cn_flags & LSCONN_ATTQ))
754    {
755        attq_remove(engine->attq, conn);
756        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
757    }
758
759    return conn;
760}
761
762
763void
764lsquic_engine_process_conns (lsquic_engine_t *engine)
765{
766    lsquic_conn_t *conn;
767    lsquic_time_t now;
768
769    ENGINE_IN(engine);
770
771    now = lsquic_time_now();
772    while ((conn = attq_pop(engine->attq, now)))
773    {
774        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
775        if (conn && !(conn->cn_flags & LSCONN_TICKABLE))
776        {
777            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
778            engine_incref_conn(conn, LSCONN_TICKABLE);
779        }
780    }
781
782    process_connections(engine, conn_iter_next_tickable, now);
783    ENGINE_OUT(engine);
784}
785
786
787static ssize_t
788really_encrypt_packet (const lsquic_conn_t *conn,
789                       struct lsquic_packet_out *packet_out,
790                       unsigned char *buf, size_t bufsz)
791{
792    int header_sz, is_hello_packet;
793    enum enc_level enc_level;
794    size_t packet_sz;
795    unsigned char header_buf[QUIC_MAX_PUBHDR_SZ];
796
797    header_sz = conn->cn_pf->pf_gen_reg_pkt_header(conn, packet_out,
798                                            header_buf, sizeof(header_buf));
799    if (header_sz < 0)
800        return -1;
801
802    is_hello_packet = !!(packet_out->po_flags & PO_HELLO);
803    enc_level = conn->cn_esf->esf_encrypt(conn->cn_enc_session,
804                conn->cn_version, 0,
805                packet_out->po_packno, header_buf, header_sz,
806                packet_out->po_data, packet_out->po_data_sz,
807                buf, bufsz, &packet_sz, is_hello_packet);
808    if ((int) enc_level >= 0)
809    {
810        lsquic_packet_out_set_enc_level(packet_out, enc_level);
811        LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %zu bytes, "
812            "ciphertext is %zd bytes",
813            packet_out->po_packno,
814            conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) +
815                                                packet_out->po_data_sz,
816            packet_sz);
817        return packet_sz;
818    }
819    else
820        return -1;
821}
822
823
824static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, }
825encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
826                                            lsquic_packet_out_t *packet_out)
827{
828    ssize_t enc_sz;
829    size_t bufsz;
830    unsigned sent_sz;
831    unsigned char *buf;
832
833    bufsz = conn->cn_pf->pf_packout_header_size(conn, packet_out->po_flags) +
834                                packet_out->po_data_sz + QUIC_PACKET_HASH_SZ;
835    buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz);
836    if (!buf)
837    {
838        LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd",
839                                                                        bufsz);
840        return ENCPA_NOMEM;
841    }
842
843    {
844        enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz);
845        sent_sz = enc_sz;
846    }
847
848    if (enc_sz < 0)
849    {
850        engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf);
851        return ENCPA_BADCRYPT;
852    }
853
854    packet_out->po_enc_data    = buf;
855    packet_out->po_enc_data_sz = enc_sz;
856    packet_out->po_sent_sz     = sent_sz;
857    packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ;
858
859    return ENCPA_OK;
860}
861
862
863STAILQ_HEAD(conns_stailq, lsquic_conn);
864TAILQ_HEAD(conns_tailq, lsquic_conn);
865
866
867struct conns_out_iter
868{
869    struct min_heap            *coi_heap;
870    TAILQ_HEAD(, lsquic_conn)   coi_active_list,
871                                coi_inactive_list;
872    lsquic_conn_t              *coi_next;
873#ifndef NDEBUG
874    lsquic_time_t               coi_last_sent;
875#endif
876};
877
878
879static void
880coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine)
881{
882    iter->coi_heap = &engine->conns_out;
883    iter->coi_next = NULL;
884    TAILQ_INIT(&iter->coi_active_list);
885    TAILQ_INIT(&iter->coi_inactive_list);
886#ifndef NDEBUG
887    iter->coi_last_sent = 0;
888#endif
889}
890
891
892static lsquic_conn_t *
893coi_next (struct conns_out_iter *iter)
894{
895    lsquic_conn_t *conn;
896
897    if (lsquic_mh_count(iter->coi_heap) > 0)
898    {
899        conn = lsquic_mh_pop(iter->coi_heap);
900        TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
901        conn->cn_flags |= LSCONN_COI_ACTIVE;
902#ifndef NDEBUG
903        if (iter->coi_last_sent)
904            assert(iter->coi_last_sent <= conn->cn_last_sent);
905        iter->coi_last_sent = conn->cn_last_sent;
906#endif
907        return conn;
908    }
909    else if (!TAILQ_EMPTY(&iter->coi_active_list))
910    {
911        conn = iter->coi_next;
912        if (!conn)
913            conn = TAILQ_FIRST(&iter->coi_active_list);
914        if (conn)
915            iter->coi_next = TAILQ_NEXT(conn, cn_next_out);
916        return conn;
917    }
918    else
919        return NULL;
920}
921
922
923static void
924coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
925{
926    if (!(conn->cn_flags & LSCONN_EVANESCENT))
927    {
928        assert(!TAILQ_EMPTY(&iter->coi_active_list));
929        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
930        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
931        TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out);
932        conn->cn_flags |= LSCONN_COI_INACTIVE;
933    }
934}
935
936
937static void
938coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
939{
940    assert(conn->cn_flags & LSCONN_COI_INACTIVE);
941    TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
942    conn->cn_flags &= ~LSCONN_COI_INACTIVE;
943    TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
944    conn->cn_flags |= LSCONN_COI_ACTIVE;
945}
946
947
948static void
949coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
950{
951    lsquic_conn_t *conn;
952    while ((conn = TAILQ_FIRST(&iter->coi_active_list)))
953    {
954        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
955        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
956        lsquic_mh_insert(iter->coi_heap, conn, conn->cn_last_sent);
957    }
958    while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
959    {
960        TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
961        conn->cn_flags &= ~LSCONN_COI_INACTIVE;
962        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
963    }
964}
965
966
967static unsigned
968send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter,
969                  struct out_batch *batch, unsigned n_to_send)
970{
971    int n_sent, i;
972    lsquic_time_t now;
973
974    /* Set sent time before the write to avoid underestimating RTT */
975    now = lsquic_time_now();
976    for (i = 0; i < (int) n_to_send; ++i)
977        batch->packets[i]->po_sent = now;
978    n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs,
979                                                                n_to_send);
980    if (n_sent < (int) n_to_send)
981    {
982        engine->pub.enp_flags &= ~ENPUB_CAN_SEND;
983        LSQ_DEBUG("cannot send packets");
984        EV_LOG_GENERIC_EVENT("cannot send packets");
985    }
986    if (n_sent >= 0)
987        LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send);
988    else
989    {
990        LSQ_DEBUG("packets out returned an error: %s", strerror(errno));
991        n_sent = 0;
992    }
993    if (n_sent > 0)
994        engine->last_sent = now + n_sent;
995    for (i = 0; i < n_sent; ++i)
996    {
997        eng_hist_inc(&engine->history, now, sl_packets_out);
998        EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
999        batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i],
1000                                                    batch->packets[i]);
1001        /* `i' is added to maintain relative order */
1002        batch->conns[i]->cn_last_sent = now + i;
1003        /* Release packet out buffer as soon as the packet is sent
1004         * successfully.  If not successfully sent, we hold on to
1005         * this buffer until the packet sending is attempted again
1006         * or until it times out and regenerated.
1007         */
1008        if (batch->packets[i]->po_flags & PO_ENCRYPTED)
1009        {
1010            batch->packets[i]->po_flags &= ~PO_ENCRYPTED;
1011            engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx,
1012                                                batch->packets[i]->po_enc_data);
1013            batch->packets[i]->po_enc_data = NULL;  /* JIC */
1014        }
1015    }
1016    if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT))
1017        for ( ; i < (int) n_to_send; ++i)
1018            EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1019    /* Return packets to the connection in reverse order so that the packet
1020     * ordering is maintained.
1021     */
1022    for (i = (int) n_to_send - 1; i >= n_sent; --i)
1023    {
1024        batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i],
1025                                                    batch->packets[i]);
1026        if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT)))
1027            coi_reactivate(conns_iter, batch->conns[i]);
1028    }
1029    return n_sent;
1030}
1031
1032
1033/* Return 1 if went past deadline, 0 otherwise */
1034static int
1035check_deadline (lsquic_engine_t *engine)
1036{
1037    if (engine->pub.enp_settings.es_proc_time_thresh &&
1038                                lsquic_time_now() > engine->deadline)
1039    {
1040        LSQ_INFO("went past threshold of %u usec, stop sending",
1041                            engine->pub.enp_settings.es_proc_time_thresh);
1042        engine->flags |= ENG_PAST_DEADLINE;
1043        return 1;
1044    }
1045    else
1046        return 0;
1047}
1048
1049
1050static void
1051send_packets_out (struct lsquic_engine *engine,
1052                  struct conns_tailq *ticked_conns,
1053                  struct conns_stailq *closed_conns)
1054{
1055    unsigned n, w, n_sent, n_batches_sent;
1056    lsquic_packet_out_t *packet_out;
1057    lsquic_conn_t *conn;
1058    struct out_batch *const batch = &engine->out_batch;
1059    struct conns_out_iter conns_iter;
1060    int shrink, deadline_exceeded;
1061
1062    coi_init(&conns_iter, engine);
1063    n_batches_sent = 0;
1064    n_sent = 0, n = 0;
1065    shrink = 0;
1066    deadline_exceeded = 0;
1067
1068    while ((conn = coi_next(&conns_iter)))
1069    {
1070        packet_out = conn->cn_if->ci_next_packet_to_send(conn);
1071        if (!packet_out) {
1072            LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64,
1073                                                            conn->cn_cid);
1074            coi_deactivate(&conns_iter, conn);
1075            continue;
1076        }
1077        if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT)))
1078        {
1079            switch (encrypt_packet(engine, conn, packet_out))
1080            {
1081            case ENCPA_NOMEM:
1082                /* Send what we have and wait for a more opportune moment */
1083                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1084                goto end_for;
1085            case ENCPA_BADCRYPT:
1086                /* This is pretty bad: close connection immediately */
1087                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1088                LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid);
1089                if (!(conn->cn_flags & LSCONN_EVANESCENT))
1090                {
1091                    if (!(conn->cn_flags & LSCONN_CLOSING))
1092                    {
1093                        STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn);
1094                        engine_incref_conn(conn, LSCONN_CLOSING);
1095                        if (conn->cn_flags & LSCONN_HASHED)
1096                            remove_conn_from_hash(engine, conn);
1097                    }
1098                    coi_deactivate(&conns_iter, conn);
1099                    if (conn->cn_flags & LSCONN_TICKED)
1100                    {
1101                        TAILQ_REMOVE(ticked_conns, conn, cn_next_ticked);
1102                        engine_decref_conn(engine, conn, LSCONN_TICKED);
1103                    }
1104                }
1105                continue;
1106            case ENCPA_OK:
1107                break;
1108            }
1109        }
1110        LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64,
1111                                        packet_out->po_packno, conn->cn_cid);
1112        assert(conn->cn_flags & LSCONN_HAS_PEER_SA);
1113        if (packet_out->po_flags & PO_ENCRYPTED)
1114        {
1115            batch->outs[n].buf     = packet_out->po_enc_data;
1116            batch->outs[n].sz      = packet_out->po_enc_data_sz;
1117        }
1118        else
1119        {
1120            batch->outs[n].buf     = packet_out->po_data;
1121            batch->outs[n].sz      = packet_out->po_data_sz;
1122        }
1123        batch->outs   [n].peer_ctx = conn->cn_peer_ctx;
1124        batch->outs   [n].local_sa = (struct sockaddr *) conn->cn_local_addr;
1125        batch->outs   [n].dest_sa  = (struct sockaddr *) conn->cn_peer_addr;
1126        batch->conns  [n]          = conn;
1127        batch->packets[n]          = packet_out;
1128        ++n;
1129        if (n == engine->batch_size)
1130        {
1131            n = 0;
1132            w = send_batch(engine, &conns_iter, batch, engine->batch_size);
1133            ++n_batches_sent;
1134            n_sent += w;
1135            if (w < engine->batch_size)
1136            {
1137                shrink = 1;
1138                break;
1139            }
1140            deadline_exceeded = check_deadline(engine);
1141            if (deadline_exceeded)
1142                break;
1143            grow_batch_size(engine);
1144        }
1145    }
1146  end_for:
1147
1148    if (n > 0) {
1149        w = send_batch(engine, &conns_iter, batch, n);
1150        n_sent += w;
1151        shrink = w < n;
1152        ++n_batches_sent;
1153        deadline_exceeded = check_deadline(engine);
1154    }
1155
1156    if (shrink)
1157        shrink_batch_size(engine);
1158    else if (n_batches_sent > 1 && !deadline_exceeded)
1159        grow_batch_size(engine);
1160
1161    coi_reheap(&conns_iter, engine);
1162
1163    LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s");
1164}
1165
1166
1167int
1168lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
1169{
1170    return lsquic_mh_count(&engine->conns_out) > 0
1171    ;
1172}
1173
1174
1175static void
1176reset_deadline (lsquic_engine_t *engine, lsquic_time_t now)
1177{
1178    engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh;
1179    engine->flags &= ~ENG_PAST_DEADLINE;
1180}
1181
1182
1183/* TODO: this is a user-facing function, account for load */
1184void
1185lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
1186{
1187    lsquic_conn_t *conn;
1188    struct conns_stailq closed_conns;
1189    struct conns_tailq ticked_conns = TAILQ_HEAD_INITIALIZER(ticked_conns);
1190
1191    STAILQ_INIT(&closed_conns);
1192    reset_deadline(engine, lsquic_time_now());
1193    if (!(engine->pub.enp_flags & ENPUB_CAN_SEND))
1194    {
1195        LSQ_DEBUG("can send again");
1196        EV_LOG_GENERIC_EVENT("can send again");
1197        engine->pub.enp_flags |= ENPUB_CAN_SEND;
1198    }
1199
1200    send_packets_out(engine, &ticked_conns, &closed_conns);
1201
1202    while ((conn = STAILQ_FIRST(&closed_conns))) {
1203        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1204        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1205    }
1206
1207}
1208
1209
1210static void
1211process_connections (lsquic_engine_t *engine, conn_iter_f next_conn,
1212                     lsquic_time_t now)
1213{
1214    lsquic_conn_t *conn;
1215    enum tick_st tick_st;
1216    unsigned i;
1217    lsquic_time_t next_tick_time;
1218    struct conns_stailq closed_conns;
1219    struct conns_tailq ticked_conns;
1220
1221    eng_hist_tick(&engine->history, now);
1222
1223    STAILQ_INIT(&closed_conns);
1224    TAILQ_INIT(&ticked_conns);
1225    reset_deadline(engine, now);
1226
1227    i = 0;
1228    while ((conn = next_conn(engine))
1229          )
1230    {
1231        tick_st = conn->cn_if->ci_tick(conn, now);
1232        conn->cn_last_ticked = now + i /* Maintain relative order */ ++;
1233        if (tick_st & TICK_SEND)
1234        {
1235            if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
1236            {
1237                lsquic_mh_insert(&engine->conns_out, conn, conn->cn_last_sent);
1238                engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
1239            }
1240        }
1241        if (tick_st & TICK_CLOSE)
1242        {
1243            STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn);
1244            engine_incref_conn(conn, LSCONN_CLOSING);
1245            if (conn->cn_flags & LSCONN_HASHED)
1246                remove_conn_from_hash(engine, conn);
1247        }
1248        else
1249        {
1250            TAILQ_INSERT_TAIL(&ticked_conns, conn, cn_next_ticked);
1251            engine_incref_conn(conn, LSCONN_TICKED);
1252        }
1253    }
1254
1255    if ((engine->pub.enp_flags & ENPUB_CAN_SEND)
1256                        && lsquic_engine_has_unsent_packets(engine))
1257        send_packets_out(engine, &ticked_conns, &closed_conns);
1258
1259    while ((conn = STAILQ_FIRST(&closed_conns))) {
1260        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1261        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1262    }
1263
1264    /* TODO Heapification can be optimized by switching to the Floyd method:
1265     * https://en.wikipedia.org/wiki/Binary_heap#Building_a_heap
1266     */
1267    while ((conn = TAILQ_FIRST(&ticked_conns)))
1268    {
1269        TAILQ_REMOVE(&ticked_conns, conn, cn_next_ticked);
1270        engine_decref_conn(engine, conn, LSCONN_TICKED);
1271        if (!(conn->cn_flags & LSCONN_TICKABLE)
1272            && conn->cn_if->ci_is_tickable(conn))
1273        {
1274            lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
1275            engine_incref_conn(conn, LSCONN_TICKABLE);
1276        }
1277        else if (!(conn->cn_flags & LSCONN_ATTQ))
1278        {
1279            next_tick_time = conn->cn_if->ci_next_tick_time(conn);
1280            if (next_tick_time)
1281            {
1282                if (0 == attq_add(engine->attq, conn, next_tick_time))
1283                    engine_incref_conn(conn, LSCONN_ATTQ);
1284            }
1285            else
1286                assert(0);
1287        }
1288    }
1289
1290}
1291
1292
1293/* Return 0 if packet is being processed by a real connection, 1 if the
1294 * packet was processed, but not by a connection, and -1 on error.
1295 */
1296int
1297lsquic_engine_packet_in (lsquic_engine_t *engine,
1298    const unsigned char *packet_in_data, size_t packet_in_size,
1299    const struct sockaddr *sa_local, const struct sockaddr *sa_peer,
1300    void *peer_ctx)
1301{
1302    struct packin_parse_state ppstate;
1303    lsquic_packet_in_t *packet_in;
1304    int (*parse_packet_in_begin) (struct lsquic_packet_in *, size_t length,
1305                                int is_server, struct packin_parse_state *);
1306
1307    if (packet_in_size > QUIC_MAX_PACKET_SZ)
1308    {
1309        LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming "
1310            "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ);
1311        errno = E2BIG;
1312        return -1;
1313    }
1314
1315    if (conn_hash_using_addr(&engine->conns_hash))
1316    {
1317        const struct lsquic_conn *conn;
1318        conn = conn_hash_find_by_addr(&engine->conns_hash, sa_local);
1319        if (!conn)
1320            return -1;
1321        if ((1 << conn->cn_version) & LSQUIC_GQUIC_HEADER_VERSIONS)
1322            parse_packet_in_begin = lsquic_gquic_parse_packet_in_begin;
1323        else
1324            parse_packet_in_begin = lsquic_iquic_parse_packet_in_begin;
1325    }
1326    else
1327        parse_packet_in_begin = lsquic_parse_packet_in_begin;
1328
1329    packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm);
1330    if (!packet_in)
1331        return -1;
1332
1333    /* Library does not modify packet_in_data, it is not referenced after
1334     * this function returns and subsequent release of pi_data is guarded
1335     * by PI_OWN_DATA flag.
1336     */
1337    packet_in->pi_data = (unsigned char *) packet_in_data;
1338    if (0 != parse_packet_in_begin(packet_in, packet_in_size,
1339                                        engine->flags & ENG_SERVER, &ppstate))
1340    {
1341        LSQ_DEBUG("Cannot parse incoming packet's header");
1342        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
1343        errno = EINVAL;
1344        return -1;
1345    }
1346
1347    packet_in->pi_received = lsquic_time_now();
1348    eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in);
1349    return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer,
1350                                                                    peer_ctx);
1351}
1352
1353
1354#if __GNUC__ && !defined(NDEBUG)
1355__attribute__((weak))
1356#endif
1357unsigned
1358lsquic_engine_quic_versions (const lsquic_engine_t *engine)
1359{
1360    return engine->pub.enp_settings.es_versions;
1361}
1362
1363
1364int
1365lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
1366{
1367    const lsquic_time_t *next_time;
1368    lsquic_time_t now;
1369
1370    if (((engine->flags & ENG_PAST_DEADLINE)
1371                                    && lsquic_mh_count(&engine->conns_out))
1372        || lsquic_mh_count(&engine->conns_tickable))
1373    {
1374        *diff = 0;
1375        return 1;
1376    }
1377
1378    next_time = attq_next_time(engine->attq);
1379    if (!next_time)
1380        return 0;
1381
1382    now = lsquic_time_now();
1383    *diff = (int) ((int64_t) *next_time - (int64_t) now);
1384    return 1;
1385}
1386
1387
1388unsigned
1389lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now)
1390{
1391    lsquic_time_t now;
1392    now = lsquic_time_now();
1393    if (from_now < 0)
1394        now -= from_now;
1395    else
1396        now += from_now;
1397    return attq_count_before(engine->attq, now);
1398}
1399