lsquic_engine.c revision 16a9b66a
1/* Copyright (c) 2017 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_engine.c - QUIC engine
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdint.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <sys/queue.h>
14#include <sys/time.h>
15#include <time.h>
16#include <netinet/in.h>
17#include <sys/types.h>
18#include <sys/stat.h>
19#include <fcntl.h>
20#include <unistd.h>
21#include <netdb.h>
22
23
24
25#include "lsquic.h"
26#include "lsquic_types.h"
27#include "lsquic_alarmset.h"
28#include "lsquic_parse.h"
29#include "lsquic_packet_in.h"
30#include "lsquic_packet_out.h"
31#include "lsquic_senhist.h"
32#include "lsquic_rtt.h"
33#include "lsquic_cubic.h"
34#include "lsquic_pacer.h"
35#include "lsquic_send_ctl.h"
36#include "lsquic_set.h"
37#include "lsquic_conn_flow.h"
38#include "lsquic_sfcw.h"
39#include "lsquic_stream.h"
40#include "lsquic_conn.h"
41#include "lsquic_full_conn.h"
42#include "lsquic_util.h"
43#include "lsquic_qtags.h"
44#include "lsquic_str.h"
45#include "lsquic_handshake.h"
46#include "lsquic_mm.h"
47#include "lsquic_conn_hash.h"
48#include "lsquic_engine_public.h"
49#include "lsquic_eng_hist.h"
50#include "lsquic_ev_log.h"
51#include "lsquic_version.h"
52#include "lsquic_hash.h"
53#include "lsquic_attq.h"
54
55#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
56#include "lsquic_logger.h"
57
58
/* The batch of outgoing packets grows and shrinks dynamically.  The engine
 * starts out with INITIAL_OUT_BATCH_SIZE; grow_batch_size() doubles the
 * value while it is below MAX_OUT_BATCH_SIZE and shrink_batch_size() halves
 * it while it is above MIN_OUT_BATCH_SIZE.
 */
#define MAX_OUT_BATCH_SIZE 1024
#define MIN_OUT_BATCH_SIZE 256
#define INITIAL_OUT_BATCH_SIZE 512
63
/* Connection iterator callback handed to process_connections().  The
 * iterators defined in this file return the next connection to process or
 * NULL when they have nothing (more) to return.
 */
typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);

/* Forward declarations of static functions that are used before they are
 * defined.
 */
static void
process_connections (struct lsquic_engine *engine, conn_iter_f iter);

/* Set a reference flag on the connection */
static void
engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);

/* Clear reference flag(s); returns NULL if the connection was destroyed
 * because no reference flags remained, otherwise returns the connection.
 */
static lsquic_conn_t *
engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
                                        enum lsquic_conn_flags flag);

/* Only meant to be called from the engine destructor */
static void
force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
78
/* Nested calls to LSQUIC are not supported.  ENGINE_IN() marks the engine
 * as being inside a processing call by setting ENPUB_PROC and asserts that
 * the flag was not already set.  ENGINE_OUT() asserts that the flag is set
 * and clears it.  The two must be used in matching pairs.
 */
#define ENGINE_IN(e) do {                               \
    assert(!((e)->pub.enp_flags & ENPUB_PROC));         \
    (e)->pub.enp_flags |= ENPUB_PROC;                   \
} while (0)

#define ENGINE_OUT(e) do {                              \
    assert((e)->pub.enp_flags & ENPUB_PROC);            \
    (e)->pub.enp_flags &= ~ENPUB_PROC;                  \
} while (0)
89
90/* A connection can be referenced from one of six places:
91 *
92 *   1. Connection hash: a connection starts its life in one of those.
93 *
94 *   2. Outgoing queue.
95 *
96 *   3. Incoming queue.
97 *
98 *   4. Pending RW Events queue.
99 *
100 *   5. Advisory Tick Time queue.
101 *
102 *   6. Closing connections queue.  This is a transient queue -- it only
103 *      exists for the duration of process_connections() function call.
104 *
105 * The idea is to destroy the connection when it is no longer referenced.
106 * For example, a connection tick may return TICK_SEND|TICK_CLOSE.  In
107 * that case, the connection is referenced from two places: (2) and (6).
108 * After its packets are sent, it is only referenced in (6), and at the
109 * end of the function call, when it is removed from (6), reference count
110 * goes to zero and the connection is destroyed.  If not all packets can
111 * be sent, at the end of the function call, the connection is referenced
112 * by (2) and will only be removed once all outgoing packets have been
113 * sent.
114 */
/* Mask of all connection flags that count as a reference.  When none of
 * these bits remain set, engine_decref_conn() destroys the connection.
 */
#define CONN_REF_FLAGS  (LSCONN_HASHED          \
                        |LSCONN_HAS_OUTGOING    \
                        |LSCONN_HAS_INCOMING    \
                        |LSCONN_RW_PENDING      \
                        |LSCONN_CLOSING         \
                        |LSCONN_ATTQ)
121
122
/* Element of the heap of connections that have outgoing packets */
struct out_heap_elem
{
    struct lsquic_conn  *ohe_conn;
    /* Copy of the connection's cn_last_sent taken when the connection is
     * inserted into the heap; this is the key the heap is ordered by.
     */
    lsquic_time_t        ohe_last_sent;
};
128
129
/* Binary min-heap of connections keyed on ohe_last_sent.  The backing
 * array is grown on demand by oh_insert().
 */
struct out_heap
{
    struct out_heap_elem    *oh_elems;      /* Backing array */
    unsigned                 oh_nalloc;     /* Number of elements allocated */
    unsigned                 oh_nelem;      /* Number of elements in use */
};
136
137
138
139
/* The engine object.  It owns the connection hash and the queues that hold
 * references to connections (see the discussion of reference flags above
 * CONN_REF_FLAGS).
 */
struct lsquic_engine
{
    /* Code in this file casts `struct lsquic_engine_public *' back to
     * `lsquic_engine_t *', so `pub' must stay the first member.
     */
    struct lsquic_engine_public        pub;
    enum {
        ENG_SERVER      = LSENG_SERVER,
        ENG_HTTP        = LSENG_HTTP,
        ENG_COOLDOWN    = (1 <<  7),    /* Cooldown: no new connections */
        ENG_PAST_DEADLINE
                        = (1 <<  8),    /* Previous call to a processing
                                         * function went past time threshold.
                                         */
#ifndef NDEBUG
        ENG_DTOR        = (1 << 26),    /* Engine destructor */
#endif
    }                                  flags;
    /* The next four members are copied from the API structure passed to
     * lsquic_engine_new().
     */
    const struct lsquic_stream_if     *stream_if;
    void                              *stream_if_ctx;
    lsquic_packets_out_f               packets_out;
    void                              *packets_out_ctx;
    void                              *bad_handshake_ctx;
    /* Hash of connections: corresponds to LSCONN_HASHED */
    struct conn_hash                   full_conns;
    /* Incoming and Pending RW Events queues: correspond to
     * LSCONN_HAS_INCOMING and LSCONN_RW_PENDING, respectively.
     */
    TAILQ_HEAD(, lsquic_conn)          conns_in, conns_pend_rw;
    /* Min-heap of connections with outgoing packets (LSCONN_HAS_OUTGOING) */
    struct out_heap                    conns_out;
    /* Use a union because only one iterator is being used at any one time */
    union {
        struct {
            /* This iterator does not have any state: it uses `conns_in' */
        }           conn_in;
        struct {
            /* This iterator does not have any state: it uses `conns_pend_rw' */
        }           rw_pend;
        struct {
            /* Iterator state to process connections in Advisory Tick Time
             * queue.
             */
            lsquic_time_t   cutoff;
        }           attq;
        struct {
            /* Iterator state to process all connections */
        }           all;
        struct {
            /* The single connection returned by conn_iter_next_one() */
            lsquic_conn_t  *conn;
        }           one;
    }                                  iter_state;
    struct eng_hist                    history;
    /* Starts at INITIAL_OUT_BATCH_SIZE; see grow_batch_size() and
     * shrink_batch_size().
     */
    unsigned                           batch_size;
    unsigned                           time_until_desired_tick;
    /* Advisory Tick Time queue: corresponds to LSCONN_ATTQ */
    struct attq                       *attq;
    /* conn_attq_expired() compares a connection's advisory tick time
     * against this value.
     */
    lsquic_time_t                      proc_time;
    /* Track time last time a packet was sent to give new connections
     * priority lower than that of existing connections.
     */
    lsquic_time_t                      last_sent;
    lsquic_time_t                      deadline;
};
195
196
/* Index arithmetic for the array-backed binary heap.  The argument is
 * parenthesized so that an expression such as `a + b' can be passed
 * safely.  OHE_PARENT() must not be used on the root (index 0).
 */
#define OHE_PARENT(i) (((i) - 1) / 2)
#define OHE_LCHILD(i) (2 * (i) + 1)
#define OHE_RCHILD(i) (2 * (i) + 2)
200
201
202static void
203heapify_out_heap (struct out_heap *heap, unsigned i)
204{
205    struct out_heap_elem el;
206    unsigned smallest;
207
208    assert(i < heap->oh_nelem);
209
210    if (OHE_LCHILD(i) < heap->oh_nelem)
211    {
212        if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent <
213                                    heap->oh_elems[ i ].ohe_last_sent)
214            smallest = OHE_LCHILD(i);
215        else
216            smallest = i;
217        if (OHE_RCHILD(i) < heap->oh_nelem &&
218            heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent <
219                                    heap->oh_elems[ smallest ].ohe_last_sent)
220            smallest = OHE_RCHILD(i);
221    }
222    else
223        smallest = i;
224
225    if (smallest != i)
226    {
227        el = heap->oh_elems[ smallest ];
228        heap->oh_elems[ smallest ] = heap->oh_elems[ i ];
229        heap->oh_elems[ i ] = el;
230        heapify_out_heap(heap, smallest);
231    }
232}
233
234
235static void
236oh_insert (struct out_heap *heap, lsquic_conn_t *conn)
237{
238    struct out_heap_elem el;
239    unsigned nalloc, i;
240
241    if (heap->oh_nelem == heap->oh_nalloc)
242    {
243        if (0 == heap->oh_nalloc)
244            nalloc = 4;
245        else
246            nalloc = heap->oh_nalloc * 2;
247        heap->oh_elems = realloc(heap->oh_elems,
248                                    nalloc * sizeof(heap->oh_elems[0]));
249        if (!heap->oh_elems)
250        {   /* Not much we can do here */
251            LSQ_ERROR("realloc failed");
252            return;
253        }
254        heap->oh_nalloc = nalloc;
255    }
256
257    heap->oh_elems[ heap->oh_nelem ].ohe_conn      = conn;
258    heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent;
259    ++heap->oh_nelem;
260
261    i = heap->oh_nelem - 1;
262    while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent >
263                                    heap->oh_elems[ i ].ohe_last_sent)
264    {
265        el = heap->oh_elems[ OHE_PARENT(i) ];
266        heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ];
267        heap->oh_elems[ i ] = el;
268        i = OHE_PARENT(i);
269    }
270}
271
272
273static struct lsquic_conn *
274oh_pop (struct out_heap *heap)
275{
276    struct lsquic_conn *conn;
277
278    assert(heap->oh_nelem);
279
280    conn = heap->oh_elems[0].ohe_conn;
281    --heap->oh_nelem;
282    if (heap->oh_nelem > 0)
283    {
284        heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ];
285        heapify_out_heap(heap, 0);
286    }
287
288    return conn;
289}
290
291
292void
293lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
294                             unsigned flags)
295{
296    memset(settings, 0, sizeof(*settings));
297    settings->es_versions        = LSQUIC_DF_VERSIONS;
298    if (flags & ENG_SERVER)
299    {
300        settings->es_cfcw        = LSQUIC_DF_CFCW_SERVER;
301        settings->es_sfcw        = LSQUIC_DF_SFCW_SERVER;
302        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER;
303    }
304    else
305    {
306        settings->es_cfcw        = LSQUIC_DF_CFCW_CLIENT;
307        settings->es_sfcw        = LSQUIC_DF_SFCW_CLIENT;
308        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT;
309    }
310    settings->es_max_streams_in  = LSQUIC_DF_MAX_STREAMS_IN;
311    settings->es_idle_conn_to    = LSQUIC_DF_IDLE_CONN_TO;
312    settings->es_handshake_to    = LSQUIC_DF_HANDSHAKE_TO;
313    settings->es_silent_close    = LSQUIC_DF_SILENT_CLOSE;
314    settings->es_max_header_list_size
315                                 = LSQUIC_DF_MAX_HEADER_LIST_SIZE;
316    settings->es_ua              = LSQUIC_DF_UA;
317
318    settings->es_pdmd            = QTAG_X509;
319    settings->es_aead            = QTAG_AESG;
320    settings->es_kexs            = QTAG_C255;
321    settings->es_support_push    = LSQUIC_DF_SUPPORT_PUSH;
322    settings->es_support_tcid0   = LSQUIC_DF_SUPPORT_TCID0;
323    settings->es_support_nstp    = LSQUIC_DF_SUPPORT_NSTP;
324    settings->es_honor_prst      = LSQUIC_DF_HONOR_PRST;
325    settings->es_progress_check  = LSQUIC_DF_PROGRESS_CHECK;
326    settings->es_pendrw_check    = LSQUIC_DF_PENDRW_CHECK;
327    settings->es_rw_once         = LSQUIC_DF_RW_ONCE;
328    settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
329    settings->es_pace_packets    = LSQUIC_DF_PACE_PACKETS;
330}
331
332
333/* Note: if returning an error, err_buf must be valid if non-NULL */
334int
335lsquic_engine_check_settings (const struct lsquic_engine_settings *settings,
336                              unsigned flags,
337                              char *err_buf, size_t err_buf_sz)
338{
339    if (settings->es_cfcw < LSQUIC_MIN_FCW ||
340        settings->es_sfcw < LSQUIC_MIN_FCW)
341    {
342        if (err_buf)
343            snprintf(err_buf, err_buf_sz, "%s",
344                                            "flow control window set too low");
345        return -1;
346    }
347    if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS))
348    {
349        if (err_buf)
350            snprintf(err_buf, err_buf_sz, "%s",
351                        "No supported QUIC versions specified");
352        return -1;
353    }
354    if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS)
355    {
356        if (err_buf)
357            snprintf(err_buf, err_buf_sz, "%s",
358                        "one or more unsupported QUIC version is specified");
359        return -1;
360    }
361    return 0;
362}
363
364
365static void
366free_packet (void *ctx, unsigned char *packet_data)
367{
368    free(packet_data);
369}
370
371
372static void *
373malloc_buf (void *ctx, size_t size)
374{
375    return malloc(size);
376}
377
378
379static const struct lsquic_packout_mem_if stock_pmi =
380{
381    malloc_buf, (void(*)(void *, void *)) free_packet,
382};
383
384
385lsquic_engine_t *
386lsquic_engine_new (unsigned flags,
387                   const struct lsquic_engine_api *api)
388{
389    lsquic_engine_t *engine;
390    int tag_buf_len;
391    char err_buf[100];
392
393    if (!api->ea_packets_out)
394    {
395        LSQ_ERROR("packets_out callback is not specified");
396        return NULL;
397    }
398
399    if (api->ea_settings &&
400                0 != lsquic_engine_check_settings(api->ea_settings, flags,
401                                                    err_buf, sizeof(err_buf)))
402    {
403        LSQ_ERROR("cannot create engine: %s", err_buf);
404        return NULL;
405    }
406
407    engine = calloc(1, sizeof(*engine));
408    if (!engine)
409        return NULL;
410    if (0 != lsquic_mm_init(&engine->pub.enp_mm))
411    {
412        free(engine);
413        return NULL;
414    }
415    if (api->ea_settings)
416        engine->pub.enp_settings        = *api->ea_settings;
417    else
418        lsquic_engine_init_settings(&engine->pub.enp_settings, flags);
419    tag_buf_len = gen_ver_tags(engine->pub.enp_ver_tags_buf,
420                                    sizeof(engine->pub.enp_ver_tags_buf),
421                                    engine->pub.enp_settings.es_versions);
422    if (tag_buf_len <= 0)
423    {
424        LSQ_ERROR("cannot generate version tags buffer");
425        free(engine);
426        return NULL;
427    }
428    engine->pub.enp_ver_tags_len = tag_buf_len;
429
430    engine->flags           = flags;
431    engine->stream_if       = api->ea_stream_if;
432    engine->stream_if_ctx   = api->ea_stream_if_ctx;
433    engine->packets_out     = api->ea_packets_out;
434    engine->packets_out_ctx = api->ea_packets_out_ctx;
435    if (api->ea_pmi)
436    {
437        engine->pub.enp_pmi      = api->ea_pmi;
438        engine->pub.enp_pmi_ctx  = api->ea_pmi_ctx;
439    }
440    else
441    {
442        engine->pub.enp_pmi      = &stock_pmi;
443        engine->pub.enp_pmi_ctx  = NULL;
444    }
445    engine->pub.enp_engine = engine;
446    TAILQ_INIT(&engine->conns_in);
447    TAILQ_INIT(&engine->conns_pend_rw);
448    conn_hash_init(&engine->full_conns, ~0);
449    engine->attq = attq_create();
450    eng_hist_init(&engine->history);
451    engine->batch_size = INITIAL_OUT_BATCH_SIZE;
452
453
454    LSQ_INFO("instantiated engine");
455    return engine;
456}
457
458
459static void
460grow_batch_size (struct lsquic_engine *engine)
461{
462    engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE;
463}
464
465
466static void
467shrink_batch_size (struct lsquic_engine *engine)
468{
469    engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE;
470}
471
472
/* Wrapper to make sure important things occur before the connection is
 * really destroyed.
 *
 * LSCONN_NEVER_PEND_RW is set before the connection's ci_destroy() method
 * is invoked: lsquic_engine_add_conn_to_pend_rw() ignores connections that
 * have this flag set.  `engine' is currently unused.
 */
static void
destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn)
{
    conn->cn_flags |= LSCONN_NEVER_PEND_RW;
    conn->cn_if->ci_destroy(conn);
}
482
483
484static lsquic_conn_t *
485new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
486                      unsigned short max_packet_size)
487{
488    lsquic_conn_t *conn;
489    unsigned flags;
490    flags = engine->flags & (ENG_SERVER|ENG_HTTP);
491    conn = full_conn_client_new(&engine->pub, engine->stream_if,
492                    engine->stream_if_ctx, flags, hostname, max_packet_size);
493    if (!conn)
494        return NULL;
495    if (0 != conn_hash_add(&engine->full_conns, conn))
496    {
497        LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
498            conn->cn_cid);
499        destroy_conn(engine, conn);
500        return NULL;
501    }
502    assert(!(conn->cn_flags &
503        (CONN_REF_FLAGS
504         & ~LSCONN_RW_PENDING /* This flag may be set as effect of user
505                                 callbacks */
506                             )));
507    conn->cn_flags |= LSCONN_HASHED;
508    return conn;
509}
510
511
512static lsquic_conn_t *
513find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
514         struct packin_parse_state *ppstate, const struct sockaddr *sa_peer,
515         void *peer_ctx)
516{
517    lsquic_conn_t *conn;
518
519    if (lsquic_packet_in_is_prst(packet_in)
520                                && !engine->pub.enp_settings.es_honor_prst)
521    {
522        LSQ_DEBUG("public reset packet: discarding");
523        return NULL;
524    }
525
526    if (!(packet_in->pi_flags & PI_CONN_ID))
527    {
528        LSQ_DEBUG("packet header does not have connection ID: discarding");
529        return NULL;
530    }
531
532    conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id);
533    if (conn)
534    {
535        conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
536        return conn;
537    }
538
539    return conn;
540}
541
542
543static void
544add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn,
545                                                        enum rw_reason reason)
546{
547    int hist_idx;
548
549    TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw);
550    engine_incref_conn(conn, LSCONN_RW_PENDING);
551
552    hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1);
553    conn->cn_rw_hist_buf[ hist_idx ] = reason;
554    ++conn->cn_rw_hist_idx;
555
556    if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx)
557        EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), "
558            "rw_hist: %.*s", (char) reason,
559            (int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf);
560    else
561        EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')",
562                                                                (char) reason);
563}
564
565
566#if !defined(NDEBUG) && __GNUC__
567__attribute__((weak))
568#endif
569void
570lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub,
571                                    lsquic_conn_t *conn, enum rw_reason reason)
572{
573    if (0 == (enpub->enp_flags & ENPUB_PROC) &&
574        0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW)))
575    {
576        lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
577        add_conn_to_pend_rw(engine, conn, reason);
578    }
579}
580
581
582void
583lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
584                                lsquic_conn_t *conn, lsquic_time_t tick_time)
585{
586    lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
587    /* Instead of performing an update, we simply remove the connection from
588     * the queue and add it back.  This should not happen in at the time of
589     * this writing.
590     */
591    if (conn->cn_flags & LSCONN_ATTQ)
592    {
593        attq_remove(engine->attq, conn);
594        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
595    }
596    if (conn && !(conn->cn_flags & LSCONN_ATTQ) &&
597                        0 == attq_maybe_add(engine->attq, conn, tick_time))
598        engine_incref_conn(conn, LSCONN_ATTQ);
599}
600
601
602static void
603update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn,
604                                                            int progress_made)
605{
606    rw_hist_idx_t hist_idx;
607    const unsigned char *empty;
608    const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check;
609
610    if (!pendrw_check)
611        return;
612
613    /* Convert previous entry to uppercase: */
614    hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1);
615    conn->cn_rw_hist_buf[ hist_idx ] -= 0x20;
616
617    LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made);
618    if (progress_made)
619    {
620        conn->cn_noprogress_count = 0;
621        return;
622    }
623
624    EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made "
625                                                                "no progress");
626    ++conn->cn_noprogress_count;
627    if (conn->cn_noprogress_count <= pendrw_check)
628        return;
629
630    conn->cn_flags |= LSCONN_NEVER_PEND_RW;
631    empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY,
632                                            sizeof(conn->cn_rw_hist_buf));
633    if (empty)
634        LSQ_WARN("conn %"PRIu64" noprogress count reached %u "
635            "(rw_hist: %.*s): will not put it onto Pend RW queue again",
636            conn->cn_cid, conn->cn_noprogress_count,
637            (int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf);
638    else
639    {
640        hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1);
641        LSQ_WARN("conn %"PRIu64" noprogress count reached %u "
642            "(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again",
643            conn->cn_cid, conn->cn_noprogress_count,
644            /* First part of history: */
645            (int) (sizeof(conn->cn_rw_hist_buf) - hist_idx),
646                                            conn->cn_rw_hist_buf + hist_idx,
647            /* Second part of history: */
648            hist_idx, conn->cn_rw_hist_buf);
649    }
650}
651
652
653/* Return 0 if packet is being processed by a connections, otherwise return 1 */
654static int
655process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
656       struct packin_parse_state *ppstate, const struct sockaddr *sa_local,
657       const struct sockaddr *sa_peer, void *peer_ctx)
658{
659    lsquic_conn_t *conn;
660
661    conn = find_or_create_conn(engine, packet_in, ppstate, sa_peer, peer_ctx);
662    if (!conn)
663    {
664        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
665        return 1;
666    }
667
668    if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) {
669        TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in);
670        engine_incref_conn(conn, LSCONN_HAS_INCOMING);
671    }
672    lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
673    lsquic_packet_in_upref(packet_in);
674    conn->cn_peer_ctx = peer_ctx;
675    conn->cn_if->ci_packet_in(conn, packet_in);
676    lsquic_packet_in_put(&engine->pub.enp_mm, packet_in);
677    return 0;
678}
679
680
681static int
682conn_attq_expired (const struct lsquic_engine *engine,
683                                                const lsquic_conn_t *conn)
684{
685    assert(conn->cn_attq_elem);
686    return lsquic_conn_adv_time(conn) < engine->proc_time;
687}
688
689
690/* Iterator for connections with incoming packets */
691static lsquic_conn_t *
692conn_iter_next_incoming (struct lsquic_engine *engine)
693{
694    enum lsquic_conn_flags addl_flags;
695    lsquic_conn_t *conn;
696    while ((conn = TAILQ_FIRST(&engine->conns_in)))
697    {
698        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
699        if (conn->cn_flags & LSCONN_RW_PENDING)
700        {
701            TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
702            EV_LOG_CONN_EVENT(conn->cn_cid,
703                "removed from pending RW queue (processing incoming)");
704        }
705        if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn))
706        {
707            addl_flags = LSCONN_ATTQ;
708            attq_remove(engine->attq, conn);
709        }
710        else
711            addl_flags = 0;
712        conn = engine_decref_conn(engine, conn,
713                        LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags);
714        if (conn)
715            break;
716    }
717    return conn;
718}
719
720
721/* Iterator for connections with that have pending read/write events */
722static lsquic_conn_t *
723conn_iter_next_rw_pend (struct lsquic_engine *engine)
724{
725    enum lsquic_conn_flags addl_flags;
726    lsquic_conn_t *conn;
727    while ((conn = TAILQ_FIRST(&engine->conns_pend_rw)))
728    {
729        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
730        EV_LOG_CONN_EVENT(conn->cn_cid,
731            "removed from pending RW queue (processing pending RW conns)");
732        if (conn->cn_flags & LSCONN_HAS_INCOMING)
733            TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
734        if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn))
735        {
736            addl_flags = LSCONN_ATTQ;
737            attq_remove(engine->attq, conn);
738        }
739        else
740            addl_flags = 0;
741        conn = engine_decref_conn(engine, conn,
742                        LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags);
743        if (conn)
744            break;
745    }
746    return conn;
747}
748
749
/* Process all connections that are on the Incoming queue.  The queue is
 * expected to be empty afterwards.  Must not be called while another
 * processing call is in progress (see ENGINE_IN()).
 */
void
lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine)
{
    LSQ_DEBUG("process connections with incoming packets");
    ENGINE_IN(engine);
    process_connections(engine, conn_iter_next_incoming);
    assert(TAILQ_EMPTY(&engine->conns_in));
    ENGINE_OUT(engine);
}
759
760
761int
762lsquic_engine_has_pend_rw (lsquic_engine_t *engine)
763{
764    return !(engine->flags & ENG_PAST_DEADLINE)
765        && !TAILQ_EMPTY(&engine->conns_pend_rw);
766}
767
768
/* Process connections that are on the Pending RW Events queue.  Must not
 * be called while another processing call is in progress (see
 * ENGINE_IN()).
 */
void
lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine)
{
    LSQ_DEBUG("process connections with pending RW events");
    ENGINE_IN(engine);
    process_connections(engine, conn_iter_next_rw_pend);
    ENGINE_OUT(engine);
}
777
778
779void
780lsquic_engine_destroy (lsquic_engine_t *engine)
781{
782    lsquic_conn_t *conn;
783
784    LSQ_DEBUG("destroying engine");
785#ifndef NDEBUG
786    engine->flags |= ENG_DTOR;
787#endif
788
789    while (engine->conns_out.oh_nelem > 0)
790    {
791        --engine->conns_out.oh_nelem;
792        conn = engine->conns_out.oh_elems[
793                                engine->conns_out.oh_nelem ].ohe_conn;
794        assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
795        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
796    }
797
798    for (conn = conn_hash_first(&engine->full_conns); conn;
799                            conn = conn_hash_next(&engine->full_conns))
800        force_close_conn(engine, conn);
801    conn_hash_cleanup(&engine->full_conns);
802
803
804    attq_destroy(engine->attq);
805
806    assert(0 == engine->conns_out.oh_nelem);
807    assert(TAILQ_EMPTY(&engine->conns_pend_rw));
808    lsquic_mm_cleanup(&engine->pub.enp_mm);
809    free(engine->conns_out.oh_elems);
810    free(engine);
811}
812
813
814#if __GNUC__
815__attribute__((nonnull(3)))
816#endif
817static lsquic_conn_t *
818remove_from_inc_andor_pend_rw (lsquic_engine_t *engine,
819                                lsquic_conn_t *conn, const char *reason)
820{
821    assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING));
822    if (conn->cn_flags & LSCONN_HAS_INCOMING)
823        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
824    if (conn->cn_flags & LSCONN_RW_PENDING)
825    {
826        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
827        EV_LOG_CONN_EVENT(conn->cn_cid,
828                        "removed from pending RW queue (%s)", reason);
829    }
830    conn = engine_decref_conn(engine, conn,
831                        LSCONN_HAS_INCOMING|LSCONN_RW_PENDING);
832    assert(conn);
833    return conn;
834}
835
836
837static lsquic_conn_t *
838conn_iter_next_one (lsquic_engine_t *engine)
839{
840    lsquic_conn_t *conn = engine->iter_state.one.conn;
841    if (conn)
842    {
843        if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))
844            conn = remove_from_inc_andor_pend_rw(engine, conn, "connect");
845        if (conn && (conn->cn_flags & LSCONN_ATTQ) &&
846                                            conn_attq_expired(engine, conn))
847        {
848            attq_remove(engine->attq, conn);
849            conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
850        }
851        engine->iter_state.one.conn = NULL;
852    }
853    return conn;
854}
855
856
857lsquic_conn_t *
858lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa,
859                       void *peer_ctx, lsquic_conn_ctx_t *conn_ctx,
860                       const char *hostname, unsigned short max_packet_size)
861{
862    lsquic_conn_t *conn;
863
864    if (engine->flags & ENG_SERVER)
865    {
866        LSQ_ERROR("`%s' must only be called in client mode", __func__);
867        return NULL;
868    }
869
870    if (0 == max_packet_size)
871    {
872        switch (peer_sa->sa_family)
873        {
874        case AF_INET:
875            max_packet_size = QUIC_MAX_IPv4_PACKET_SZ;
876            break;
877        default:
878            max_packet_size = QUIC_MAX_IPv6_PACKET_SZ;
879            break;
880        }
881    }
882
883    conn = new_full_conn_client(engine, hostname, max_packet_size);
884    if (!conn)
885        return NULL;
886    ENGINE_IN(engine);
887    lsquic_conn_record_peer_sa(conn, peer_sa);
888    conn->cn_peer_ctx = peer_ctx;
889    lsquic_conn_set_ctx(conn, conn_ctx);
890    engine->iter_state.one.conn = conn;
891    full_conn_client_call_on_new(conn);
892    process_connections(engine, conn_iter_next_one);
893    ENGINE_OUT(engine);
894    return conn;
895}
896
897
898static void
899remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
900{
901        conn_hash_remove(&engine->full_conns, conn);
902    (void) engine_decref_conn(engine, conn, LSCONN_HASHED);
903}
904
905
906static void
907refflags2str (enum lsquic_conn_flags flags, char s[7])
908{
909    *s = 'C'; s += !!(flags & LSCONN_CLOSING);
910    *s = 'H'; s += !!(flags & LSCONN_HASHED);
911    *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
912    *s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING);
913    *s = 'R'; s += !!(flags & LSCONN_RW_PENDING);
914    *s = 'A'; s += !!(flags & LSCONN_ATTQ);
915    *s = '\0';
916}
917
918
919static void
920engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
921{
922    char str[7];
923    assert(flag & CONN_REF_FLAGS);
924    assert(!(conn->cn_flags & flag));
925    conn->cn_flags |= flag;
926    LSQ_DEBUG("incref conn %"PRIu64", now '%s'", conn->cn_cid,
927                            (refflags2str(conn->cn_flags, str), str));
928}
929
930
931static lsquic_conn_t *
932engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
933                                        enum lsquic_conn_flags flags)
934{
935    char str[7];
936    assert(flags & CONN_REF_FLAGS);
937    assert(conn->cn_flags & flags);
938#ifndef NDEBUG
939    if (flags & LSCONN_CLOSING)
940        assert(0 == (conn->cn_flags & LSCONN_HASHED));
941#endif
942    conn->cn_flags &= ~flags;
943    LSQ_DEBUG("decref conn %"PRIu64", now '%s'", conn->cn_cid,
944                            (refflags2str(conn->cn_flags, str), str));
945    if (0 == (conn->cn_flags & CONN_REF_FLAGS))
946    {
947            eng_hist_inc(&engine->history, 0, sl_del_full_conns);
948        destroy_conn(engine, conn);
949        return NULL;
950    }
951    else
952        return conn;
953}
954
955
956/* This is not a general-purpose function.  Only call from engine dtor. */
957static void
958force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
959{
960    assert(engine->flags & ENG_DTOR);
961    const enum lsquic_conn_flags flags = conn->cn_flags;
962    assert(conn->cn_flags & CONN_REF_FLAGS);
963    assert(!(flags & LSCONN_HAS_OUTGOING));  /* Should be removed already */
964    assert(!(flags & LSCONN_CLOSING));  /* It is in transient queue? */
965    if (flags & LSCONN_HAS_INCOMING)
966    {
967        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
968        (void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING);
969    }
970    if (flags & LSCONN_RW_PENDING)
971    {
972        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
973        EV_LOG_CONN_EVENT(conn->cn_cid,
974            "removed from pending RW queue (engine destruction)");
975        (void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING);
976    }
977    if (flags & LSCONN_ATTQ)
978        attq_remove(engine->attq, conn);
979    if (flags & LSCONN_HASHED)
980        remove_conn_from_hash(engine, conn);
981}
982
983
984/* Iterator for all connections.
985 * Returned connections are removed from the Incoming, Pending RW Event,
986 * and Advisory Tick Time queues if necessary.
987 */
988static lsquic_conn_t *
989conn_iter_next_all (struct lsquic_engine *engine)
990{
991    lsquic_conn_t *conn;
992
993    conn = conn_hash_next(&engine->full_conns);
994
995    if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)))
996        conn = remove_from_inc_andor_pend_rw(engine, conn, "process all");
997    if (conn && (conn->cn_flags & LSCONN_ATTQ)
998                                        && conn_attq_expired(engine, conn))
999    {
1000        attq_remove(engine->attq, conn);
1001        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
1002    }
1003
1004    return conn;
1005}
1006
1007
1008static lsquic_conn_t *
1009conn_iter_next_attq (struct lsquic_engine *engine)
1010{
1011    lsquic_conn_t *conn;
1012
1013    conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff);
1014    if (conn)
1015    {
1016        assert(conn->cn_flags & LSCONN_ATTQ);
1017        if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))
1018            conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq");
1019        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
1020    }
1021
1022    return conn;
1023}
1024
1025
1026void
1027lsquic_engine_proc_all (lsquic_engine_t *engine)
1028{
1029    ENGINE_IN(engine);
1030    /* We poke each connection every time as initial implementation.  If it
1031     * proves to be too inefficient, we will need to figure out
1032     *          a) when to stop processing; and
1033     *          b) how to remember state between calls.
1034     */
1035    conn_hash_reset_iter(&engine->full_conns);
1036    process_connections(engine, conn_iter_next_all);
1037    ENGINE_OUT(engine);
1038}
1039
1040
1041void
1042lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine)
1043{
1044    lsquic_time_t prev_min, now;
1045
1046    now = lsquic_time_now();
1047    if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG))
1048    {
1049        const lsquic_time_t *expected_time;
1050        int64_t diff;
1051        expected_time = attq_next_time(engine->attq);
1052        if (expected_time)
1053            diff = *expected_time - now;
1054        else
1055            diff = -1;
1056        LSQ_DEBUG("process connections in attq; time diff: %"PRIi64, diff);
1057    }
1058
1059    ENGINE_IN(engine);
1060    prev_min = attq_set_min(engine->attq, now);  /* Prevent infinite loop */
1061    engine->iter_state.attq.cutoff = now;
1062    process_connections(engine, conn_iter_next_attq);
1063    attq_set_min(engine->attq, prev_min);           /* Restore previos value */
1064    ENGINE_OUT(engine);
1065}
1066
1067
1068static int
1069generate_header (const lsquic_packet_out_t *packet_out,
1070                 const struct parse_funcs *pf, lsquic_cid_t cid,
1071                 unsigned char *buf, size_t bufsz)
1072{
1073    return pf->pf_gen_reg_pkt_header(buf, bufsz,
1074        packet_out->po_flags & PO_CONN_ID ? &cid                    : NULL,
1075        packet_out->po_flags & PO_VERSION ? &packet_out->po_ver_tag : NULL,
1076        packet_out->po_flags & PO_NONCE   ? packet_out->po_nonce    : NULL,
1077        packet_out->po_packno, lsquic_packet_out_packno_bits(packet_out));
1078}
1079
1080
1081static ssize_t
1082really_encrypt_packet (const lsquic_conn_t *conn,
1083                       const lsquic_packet_out_t *packet_out,
1084                       unsigned char *buf, size_t bufsz)
1085{
1086    int enc, header_sz, is_hello_packet;
1087    size_t packet_sz;
1088    unsigned char header_buf[QUIC_MAX_PUBHDR_SZ];
1089
1090    header_sz = generate_header(packet_out, conn->cn_pf, conn->cn_cid,
1091                                            header_buf, sizeof(header_buf));
1092    if (header_sz < 0)
1093        return -1;
1094
1095    is_hello_packet = !!(packet_out->po_flags & PO_HELLO);
1096    enc = conn->cn_esf->esf_encrypt(conn->cn_enc_session, conn->cn_version, 0,
1097                packet_out->po_packno, header_buf, header_sz,
1098                packet_out->po_data, packet_out->po_data_sz,
1099                buf, bufsz, &packet_sz, is_hello_packet);
1100    if (0 == enc)
1101    {
1102        LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %u bytes, "
1103            "ciphertext is %zd bytes",
1104            packet_out->po_packno,
1105            lsquic_po_header_length(packet_out->po_flags) +
1106                                                packet_out->po_data_sz,
1107            packet_sz);
1108        return packet_sz;
1109    }
1110    else
1111        return -1;
1112}
1113
1114
1115static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, }
1116encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
1117                                            lsquic_packet_out_t *packet_out)
1118{
1119    ssize_t enc_sz;
1120    size_t bufsz;
1121    unsigned sent_sz;
1122    unsigned char *buf;
1123
1124    bufsz = lsquic_po_header_length(packet_out->po_flags) +
1125                                packet_out->po_data_sz + QUIC_PACKET_HASH_SZ;
1126    buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz);
1127    if (!buf)
1128    {
1129        LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd",
1130                                                                        bufsz);
1131        return ENCPA_NOMEM;
1132    }
1133
1134    {
1135        enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz);
1136        sent_sz = enc_sz;
1137    }
1138
1139    if (enc_sz < 0)
1140    {
1141        engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf);
1142        return ENCPA_BADCRYPT;
1143    }
1144
1145    packet_out->po_enc_data    = buf;
1146    packet_out->po_enc_data_sz = enc_sz;
1147    packet_out->po_sent_sz     = sent_sz;
1148    packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ;
1149
1150    return ENCPA_OK;
1151}
1152
1153
1154struct out_batch
1155{
1156    lsquic_conn_t           *conns  [MAX_OUT_BATCH_SIZE];
1157    lsquic_packet_out_t     *packets[MAX_OUT_BATCH_SIZE];
1158    struct lsquic_out_spec   outs   [MAX_OUT_BATCH_SIZE];
1159};
1160
1161
/* struct closed_conns: head of a singly-linked tail queue of connections.
 * Connections are linked onto it through their cn_next_closed_conn member.
 */
STAILQ_HEAD(closed_conns, lsquic_conn);
1163
1164
1165struct conns_out_iter
1166{
1167    struct out_heap            *coi_heap;
1168    TAILQ_HEAD(, lsquic_conn)   coi_active_list,
1169                                coi_inactive_list;
1170    lsquic_conn_t              *coi_next;
1171#ifndef NDEBUG
1172    lsquic_time_t               coi_last_sent;
1173#endif
1174};
1175
1176
1177static void
1178coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine)
1179{
1180    iter->coi_heap = &engine->conns_out;
1181    iter->coi_next = NULL;
1182    TAILQ_INIT(&iter->coi_active_list);
1183    TAILQ_INIT(&iter->coi_inactive_list);
1184#ifndef NDEBUG
1185    iter->coi_last_sent = 0;
1186#endif
1187}
1188
1189
1190static lsquic_conn_t *
1191coi_next (struct conns_out_iter *iter)
1192{
1193    lsquic_conn_t *conn;
1194
1195    if (iter->coi_heap->oh_nelem > 0)
1196    {
1197        conn = oh_pop(iter->coi_heap);
1198        TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
1199        conn->cn_flags |= LSCONN_COI_ACTIVE;
1200#ifndef NDEBUG
1201        if (iter->coi_last_sent)
1202            assert(iter->coi_last_sent <= conn->cn_last_sent);
1203        iter->coi_last_sent = conn->cn_last_sent;
1204#endif
1205        return conn;
1206    }
1207    else if (!TAILQ_EMPTY(&iter->coi_active_list))
1208    {
1209        conn = iter->coi_next;
1210        if (!conn)
1211            conn = TAILQ_FIRST(&iter->coi_active_list);
1212        if (conn)
1213            iter->coi_next = TAILQ_NEXT(conn, cn_next_out);
1214        return conn;
1215    }
1216    else
1217        return NULL;
1218}
1219
1220
1221static void
1222coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
1223{
1224    if (!(conn->cn_flags & LSCONN_EVANESCENT))
1225    {
1226        assert(!TAILQ_EMPTY(&iter->coi_active_list));
1227        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1228        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1229        TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out);
1230        conn->cn_flags |= LSCONN_COI_INACTIVE;
1231    }
1232}
1233
1234
1235static void
1236coi_remove (struct conns_out_iter *iter, lsquic_conn_t *conn)
1237{
1238    assert(conn->cn_flags & LSCONN_COI_ACTIVE);
1239    if (conn->cn_flags & LSCONN_COI_ACTIVE)
1240    {
1241        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1242        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1243    }
1244}
1245
1246
1247static void
1248coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
1249{
1250    assert(conn->cn_flags & LSCONN_COI_INACTIVE);
1251    TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1252    conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1253    TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
1254    conn->cn_flags |= LSCONN_COI_ACTIVE;
1255}
1256
1257
1258static void
1259coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
1260{
1261    lsquic_conn_t *conn;
1262    while ((conn = TAILQ_FIRST(&iter->coi_active_list)))
1263    {
1264        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1265        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1266        oh_insert(iter->coi_heap, conn);
1267    }
1268    while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
1269    {
1270        TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1271        conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1272        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
1273    }
1274}
1275
1276
1277static unsigned
1278send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter,
1279                  struct out_batch *batch, unsigned n_to_send)
1280{
1281    int n_sent, i;
1282    lsquic_time_t now;
1283
1284    /* Set sent time before the write to avoid underestimating RTT */
1285    now = lsquic_time_now();
1286    for (i = 0; i < (int) n_to_send; ++i)
1287        batch->packets[i]->po_sent = now;
1288    n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs,
1289                                                                n_to_send);
1290    if (n_sent >= 0)
1291        LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send);
1292    else
1293    {
1294        LSQ_DEBUG("packets out returned an error: %s", strerror(errno));
1295        n_sent = 0;
1296    }
1297    if (n_sent > 0)
1298        engine->last_sent = now + n_sent;
1299    for (i = 0; i < n_sent; ++i)
1300    {
1301        eng_hist_inc(&engine->history, now, sl_packets_out);
1302        EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1303        batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i],
1304                                                    batch->packets[i]);
1305        /* `i' is added to maintain relative order */
1306        batch->conns[i]->cn_last_sent = now + i;
1307        /* Release packet out buffer as soon as the packet is sent
1308         * successfully.  If not successfully sent, we hold on to
1309         * this buffer until the packet sending is attempted again
1310         * or until it times out and regenerated.
1311         */
1312        if (batch->packets[i]->po_flags & PO_ENCRYPTED)
1313        {
1314            batch->packets[i]->po_flags &= ~PO_ENCRYPTED;
1315            engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx,
1316                                                batch->packets[i]->po_enc_data);
1317            batch->packets[i]->po_enc_data = NULL;  /* JIC */
1318        }
1319    }
1320    if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT))
1321        for ( ; i < (int) n_to_send; ++i)
1322            EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1323    /* Return packets to the connection in reverse order so that the packet
1324     * ordering is maintained.
1325     */
1326    for (i = (int) n_to_send - 1; i >= n_sent; --i)
1327    {
1328        batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i],
1329                                                    batch->packets[i]);
1330        if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT)))
1331            coi_reactivate(conns_iter, batch->conns[i]);
1332    }
1333    return n_sent;
1334}
1335
1336
1337/* Return 1 if went past deadline, 0 otherwise */
1338static int
1339check_deadline (lsquic_engine_t *engine)
1340{
1341    if (engine->pub.enp_settings.es_proc_time_thresh &&
1342                                lsquic_time_now() > engine->deadline)
1343    {
1344        LSQ_INFO("went past threshold of %u usec, stop sending",
1345                            engine->pub.enp_settings.es_proc_time_thresh);
1346        engine->flags |= ENG_PAST_DEADLINE;
1347        return 1;
1348    }
1349    else
1350        return 0;
1351}
1352
1353
1354static void
1355send_packets_out (struct lsquic_engine *engine,
1356                  struct closed_conns *closed_conns)
1357{
1358    unsigned n, w, n_sent, n_batches_sent;
1359    lsquic_packet_out_t *packet_out;
1360    lsquic_conn_t *conn;
1361    struct out_batch batch;
1362    struct conns_out_iter conns_iter;
1363    int shrink, deadline_exceeded;
1364
1365    coi_init(&conns_iter, engine);
1366    n_batches_sent = 0;
1367    n_sent = 0, n = 0;
1368    shrink = 0;
1369    deadline_exceeded = 0;
1370
1371    while ((conn = coi_next(&conns_iter)))
1372    {
1373        packet_out = conn->cn_if->ci_next_packet_to_send(conn);
1374        if (!packet_out) {
1375            LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64,
1376                                                            conn->cn_cid);
1377            coi_deactivate(&conns_iter, conn);
1378            continue;
1379        }
1380        if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT)))
1381        {
1382            switch (encrypt_packet(engine, conn, packet_out))
1383            {
1384            case ENCPA_NOMEM:
1385                /* Send what we have and wait for a more opportune moment */
1386                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1387                goto end_for;
1388            case ENCPA_BADCRYPT:
1389                /* This is pretty bad: close connection immediately */
1390                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1391                LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid);
1392                if (!(conn->cn_flags & LSCONN_EVANESCENT))
1393                {
1394                    if (!(conn->cn_flags & LSCONN_CLOSING))
1395                    {
1396                        STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn);
1397                        engine_incref_conn(conn, LSCONN_CLOSING);
1398                        if (conn->cn_flags & LSCONN_HASHED)
1399                            remove_conn_from_hash(engine, conn);
1400                    }
1401                    coi_remove(&conns_iter, conn);
1402                }
1403                continue;
1404            case ENCPA_OK:
1405                break;
1406            }
1407        }
1408        LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64,
1409                                        packet_out->po_packno, conn->cn_cid);
1410        assert(conn->cn_flags & LSCONN_HAS_PEER_SA);
1411        if (packet_out->po_flags & PO_ENCRYPTED)
1412        {
1413            batch.outs[n].buf     = packet_out->po_enc_data;
1414            batch.outs[n].sz      = packet_out->po_enc_data_sz;
1415        }
1416        else
1417        {
1418            batch.outs[n].buf     = packet_out->po_data;
1419            batch.outs[n].sz      = packet_out->po_data_sz;
1420        }
1421        batch.outs   [n].peer_ctx = conn->cn_peer_ctx;
1422        batch.outs   [n].local_sa = (struct sockaddr *) conn->cn_local_addr;
1423        batch.outs   [n].dest_sa  = (struct sockaddr *) conn->cn_peer_addr;
1424        batch.conns  [n]          = conn;
1425        batch.packets[n]          = packet_out;
1426        ++n;
1427        if (n == engine->batch_size)
1428        {
1429            n = 0;
1430            w = send_batch(engine, &conns_iter, &batch, engine->batch_size);
1431            ++n_batches_sent;
1432            n_sent += w;
1433            if (w < engine->batch_size)
1434            {
1435                shrink = 1;
1436                break;
1437            }
1438            deadline_exceeded = check_deadline(engine);
1439            if (deadline_exceeded)
1440                break;
1441            grow_batch_size(engine);
1442        }
1443    }
1444  end_for:
1445
1446    if (n > 0) {
1447        w = send_batch(engine, &conns_iter, &batch, n);
1448        n_sent += w;
1449        shrink = w < n;
1450        ++n_batches_sent;
1451        deadline_exceeded = check_deadline(engine);
1452    }
1453
1454    if (shrink)
1455        shrink_batch_size(engine);
1456    else if (n_batches_sent > 1 && !deadline_exceeded)
1457        grow_batch_size(engine);
1458
1459    coi_reheap(&conns_iter, engine);
1460
1461    LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s");
1462}
1463
1464
1465int
1466lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
1467{
1468    return !(engine->flags & ENG_PAST_DEADLINE)
1469        && (    engine->conns_out.oh_nelem > 0
1470           )
1471    ;
1472}
1473
1474
1475static void
1476reset_deadline (lsquic_engine_t *engine, lsquic_time_t now)
1477{
1478    engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh;
1479    engine->flags &= ~ENG_PAST_DEADLINE;
1480}
1481
1482
1483/* TODO: this is a user-facing function, account for load */
1484void
1485lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
1486{
1487    lsquic_conn_t *conn;
1488    struct closed_conns closed_conns;
1489
1490    STAILQ_INIT(&closed_conns);
1491    reset_deadline(engine, lsquic_time_now());
1492
1493    send_packets_out(engine, &closed_conns);
1494
1495    while ((conn = STAILQ_FIRST(&closed_conns))) {
1496        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1497        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1498    }
1499
1500}
1501
1502
1503static void
1504process_connections (lsquic_engine_t *engine, conn_iter_f next_conn)
1505{
1506    lsquic_conn_t *conn;
1507    enum tick_st tick_st;
1508    lsquic_time_t now = lsquic_time_now();
1509    struct closed_conns closed_conns;
1510
1511    engine->proc_time = now;
1512    eng_hist_tick(&engine->history, now);
1513
1514    STAILQ_INIT(&closed_conns);
1515    reset_deadline(engine, now);
1516
1517    while ((conn = next_conn(engine)))
1518    {
1519        tick_st = conn->cn_if->ci_tick(conn, now);
1520        if (conn_iter_next_rw_pend == next_conn)
1521            update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS);
1522        if (tick_st & TICK_SEND)
1523        {
1524            if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
1525            {
1526                oh_insert(&engine->conns_out, conn);
1527                engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
1528            }
1529        }
1530        if (tick_st & TICK_CLOSE)
1531        {
1532            STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn);
1533            engine_incref_conn(conn, LSCONN_CLOSING);
1534            if (conn->cn_flags & LSCONN_HASHED)
1535                remove_conn_from_hash(engine, conn);
1536        }
1537    }
1538
1539    if (lsquic_engine_has_unsent_packets(engine))
1540        send_packets_out(engine, &closed_conns);
1541
1542    while ((conn = STAILQ_FIRST(&closed_conns))) {
1543        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1544        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1545    }
1546
1547}
1548
1549
1550/* Return 0 if packet is being processed by a real connection, 1 if the
1551 * packet was processed, but not by a connection, and -1 on error.
1552 */
1553int
1554lsquic_engine_packet_in (lsquic_engine_t *engine,
1555    const unsigned char *packet_in_data, size_t packet_in_size,
1556    const struct sockaddr *sa_local, const struct sockaddr *sa_peer,
1557    void *peer_ctx)
1558{
1559    struct packin_parse_state ppstate;
1560    lsquic_packet_in_t *packet_in;
1561
1562    if (packet_in_size > QUIC_MAX_PACKET_SZ)
1563    {
1564        LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming "
1565            "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ);
1566        errno = E2BIG;
1567        return -1;
1568    }
1569
1570    packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm);
1571    if (!packet_in)
1572        return -1;
1573
1574    /* Library does not modify packet_in_data, it is not referenced after
1575     * this function returns and subsequent release of pi_data is guarded
1576     * by PI_OWN_DATA flag.
1577     */
1578    packet_in->pi_data = (unsigned char *) packet_in_data;
1579    if (0 != parse_packet_in_begin(packet_in, packet_in_size,
1580                                        engine->flags & ENG_SERVER, &ppstate))
1581    {
1582        LSQ_DEBUG("Cannot parse incoming packet's header");
1583        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
1584        errno = EINVAL;
1585        return -1;
1586    }
1587
1588    packet_in->pi_received = lsquic_time_now();
1589    eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in);
1590    return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer,
1591                                                                    peer_ctx);
1592}
1593
1594
1595#if __GNUC__ && !defined(NDEBUG)
1596__attribute__((weak))
1597#endif
1598unsigned
1599lsquic_engine_quic_versions (const lsquic_engine_t *engine)
1600{
1601    return engine->pub.enp_settings.es_versions;
1602}
1603
1604
1605int
1606lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
1607{
1608    const lsquic_time_t *next_time;
1609    lsquic_time_t now;
1610
1611    next_time = attq_next_time(engine->attq);
1612    if (!next_time)
1613        return 0;
1614
1615    now = lsquic_time_now();
1616    *diff = (int) ((int64_t) *next_time - (int64_t) now);
1617    return 1;
1618}
1619
1620
1621unsigned
1622lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now)
1623{
1624    lsquic_time_t now;
1625    now = lsquic_time_now();
1626    if (from_now < 0)
1627        now -= from_now;
1628    else
1629        now += from_now;
1630    return attq_count_before(engine->attq, now);
1631}
1632
1633
1634