lsquic_engine.c revision da710add
1/* Copyright (c) 2017 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_engine.c - QUIC engine
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdint.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <sys/queue.h>
14#include <time.h>
15#ifndef WIN32
16#include <sys/time.h>
17#include <netinet/in.h>
18#include <sys/types.h>
19#include <sys/stat.h>
20#include <fcntl.h>
21#include <unistd.h>
22#include <netdb.h>
23#endif
24
25
26
27#include "lsquic.h"
28#include "lsquic_types.h"
29#include "lsquic_alarmset.h"
30#include "lsquic_parse.h"
31#include "lsquic_packet_in.h"
32#include "lsquic_packet_out.h"
33#include "lsquic_senhist.h"
34#include "lsquic_rtt.h"
35#include "lsquic_cubic.h"
36#include "lsquic_pacer.h"
37#include "lsquic_send_ctl.h"
38#include "lsquic_set.h"
39#include "lsquic_conn_flow.h"
40#include "lsquic_sfcw.h"
41#include "lsquic_stream.h"
42#include "lsquic_conn.h"
43#include "lsquic_full_conn.h"
44#include "lsquic_util.h"
45#include "lsquic_qtags.h"
46#include "lsquic_str.h"
47#include "lsquic_handshake.h"
48#include "lsquic_mm.h"
49#include "lsquic_conn_hash.h"
50#include "lsquic_engine_public.h"
51#include "lsquic_eng_hist.h"
52#include "lsquic_ev_log.h"
53#include "lsquic_version.h"
54#include "lsquic_hash.h"
55#include "lsquic_attq.h"
56
57#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
58#include "lsquic_logger.h"
59
60
61/* The batch of outgoing packets grows and shrinks dynamically */
62#define MAX_OUT_BATCH_SIZE 1024
63#define MIN_OUT_BATCH_SIZE 256
64#define INITIAL_OUT_BATCH_SIZE 512
65
66struct out_batch
67{
68    lsquic_conn_t           *conns  [MAX_OUT_BATCH_SIZE];
69    lsquic_packet_out_t     *packets[MAX_OUT_BATCH_SIZE];
70    struct lsquic_out_spec   outs   [MAX_OUT_BATCH_SIZE];
71};
72
73typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);
74
75static void
76process_connections (struct lsquic_engine *engine, conn_iter_f iter);
77
78static void
79engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);
80
81static lsquic_conn_t *
82engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
83                                        enum lsquic_conn_flags flag);
84
85static void
86force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
87
88/* Nested calls to LSQUIC are not supported */
89#define ENGINE_IN(e) do {                               \
90    assert(!((e)->pub.enp_flags & ENPUB_PROC));         \
91    (e)->pub.enp_flags |= ENPUB_PROC;                   \
92} while (0)
93
94#define ENGINE_OUT(e) do {                              \
95    assert((e)->pub.enp_flags & ENPUB_PROC);            \
96    (e)->pub.enp_flags &= ~ENPUB_PROC;                  \
97} while (0)
98
99/* A connection can be referenced from one of six places:
100 *
101 *   1. Connection hash: a connection starts its life in one of those.
102 *
103 *   2. Outgoing queue.
104 *
105 *   3. Incoming queue.
106 *
107 *   4. Pending RW Events queue.
108 *
109 *   5. Advisory Tick Time queue.
110 *
111 *   6. Closing connections queue.  This is a transient queue -- it only
112 *      exists for the duration of process_connections() function call.
113 *
114 * The idea is to destroy the connection when it is no longer referenced.
115 * For example, a connection tick may return TICK_SEND|TICK_CLOSE.  In
116 * that case, the connection is referenced from two places: (2) and (6).
117 * After its packets are sent, it is only referenced in (6), and at the
118 * end of the function call, when it is removed from (6), reference count
119 * goes to zero and the connection is destroyed.  If not all packets can
120 * be sent, at the end of the function call, the connection is referenced
121 * by (2) and will only be removed once all outgoing packets have been
122 * sent.
123 */
124#define CONN_REF_FLAGS  (LSCONN_HASHED          \
125                        |LSCONN_HAS_OUTGOING    \
126                        |LSCONN_HAS_INCOMING    \
127                        |LSCONN_RW_PENDING      \
128                        |LSCONN_CLOSING         \
129                        |LSCONN_ATTQ)
130
131
132struct out_heap_elem
133{
134    struct lsquic_conn  *ohe_conn;
135    lsquic_time_t        ohe_last_sent;
136};
137
138
139struct out_heap
140{
141    struct out_heap_elem    *oh_elems;
142    unsigned                 oh_nalloc,
143                             oh_nelem;
144};
145
146
147
148
149struct lsquic_engine
150{
151    struct lsquic_engine_public        pub;
152    enum {
153        ENG_SERVER      = LSENG_SERVER,
154        ENG_HTTP        = LSENG_HTTP,
155        ENG_COOLDOWN    = (1 <<  7),    /* Cooldown: no new connections */
156        ENG_PAST_DEADLINE
157                        = (1 <<  8),    /* Previous call to a processing
158                                         * function went past time threshold.
159                                         */
160#ifndef NDEBUG
161        ENG_DTOR        = (1 << 26),    /* Engine destructor */
162#endif
163    }                                  flags;
164    const struct lsquic_stream_if     *stream_if;
165    void                              *stream_if_ctx;
166    lsquic_packets_out_f               packets_out;
167    void                              *packets_out_ctx;
168    void                              *bad_handshake_ctx;
169    struct conn_hash                   full_conns;
170    TAILQ_HEAD(, lsquic_conn)          conns_in, conns_pend_rw;
171    struct out_heap                    conns_out;
172    /* Use a union because only one iterator is being used at any one time */
173    union {
174        struct {
175            /* This iterator does not have any state: it uses `conns_in' */
176            int ignore;
177        }           conn_in;
178        struct {
179            /* This iterator does not have any state: it uses `conns_pend_rw' */
180            int ignore;
181        }           rw_pend;
182        struct {
183            /* Iterator state to process connections in Advisory Tick Time
184             * queue.
185             */
186            lsquic_time_t   cutoff;
187        }           attq;
188        struct {
189            /* Iterator state to process all connections */
190            int ignore;
191        }           all;
192        struct {
193            lsquic_conn_t  *conn;
194        }           one;
195    }                                  iter_state;
196    struct eng_hist                    history;
197    unsigned                           batch_size;
198    unsigned                           time_until_desired_tick;
199    struct attq                       *attq;
200    lsquic_time_t                      proc_time;
201    /* Track time last time a packet was sent to give new connections
202     * priority lower than that of existing connections.
203     */
204    lsquic_time_t                      last_sent;
205    lsquic_time_t                      deadline;
206    struct out_batch                   out_batch;
207};
208
209
210#define OHE_PARENT(i) ((i - 1) / 2)
211#define OHE_LCHILD(i) (2 * i + 1)
212#define OHE_RCHILD(i) (2 * i + 2)
213
214
215static void
216heapify_out_heap (struct out_heap *heap, unsigned i)
217{
218    struct out_heap_elem el;
219    unsigned smallest;
220
221    assert(i < heap->oh_nelem);
222
223    if (OHE_LCHILD(i) < heap->oh_nelem)
224    {
225        if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent <
226                                    heap->oh_elems[ i ].ohe_last_sent)
227            smallest = OHE_LCHILD(i);
228        else
229            smallest = i;
230        if (OHE_RCHILD(i) < heap->oh_nelem &&
231            heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent <
232                                    heap->oh_elems[ smallest ].ohe_last_sent)
233            smallest = OHE_RCHILD(i);
234    }
235    else
236        smallest = i;
237
238    if (smallest != i)
239    {
240        el = heap->oh_elems[ smallest ];
241        heap->oh_elems[ smallest ] = heap->oh_elems[ i ];
242        heap->oh_elems[ i ] = el;
243        heapify_out_heap(heap, smallest);
244    }
245}
246
247
248static void
249oh_insert (struct out_heap *heap, lsquic_conn_t *conn)
250{
251    struct out_heap_elem el;
252    unsigned nalloc, i;
253
254    if (heap->oh_nelem == heap->oh_nalloc)
255    {
256        if (0 == heap->oh_nalloc)
257            nalloc = 4;
258        else
259            nalloc = heap->oh_nalloc * 2;
260        heap->oh_elems = realloc(heap->oh_elems,
261                                    nalloc * sizeof(heap->oh_elems[0]));
262        if (!heap->oh_elems)
263        {   /* Not much we can do here */
264            LSQ_ERROR("realloc failed");
265            return;
266        }
267        heap->oh_nalloc = nalloc;
268    }
269
270    heap->oh_elems[ heap->oh_nelem ].ohe_conn      = conn;
271    heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent;
272    ++heap->oh_nelem;
273
274    i = heap->oh_nelem - 1;
275    while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent >
276                                    heap->oh_elems[ i ].ohe_last_sent)
277    {
278        el = heap->oh_elems[ OHE_PARENT(i) ];
279        heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ];
280        heap->oh_elems[ i ] = el;
281        i = OHE_PARENT(i);
282    }
283}
284
285
286static struct lsquic_conn *
287oh_pop (struct out_heap *heap)
288{
289    struct lsquic_conn *conn;
290
291    assert(heap->oh_nelem);
292
293    conn = heap->oh_elems[0].ohe_conn;
294    --heap->oh_nelem;
295    if (heap->oh_nelem > 0)
296    {
297        heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ];
298        heapify_out_heap(heap, 0);
299    }
300
301    return conn;
302}
303
304
305void
306lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
307                             unsigned flags)
308{
309    memset(settings, 0, sizeof(*settings));
310    settings->es_versions        = LSQUIC_DF_VERSIONS;
311    if (flags & ENG_SERVER)
312    {
313        settings->es_cfcw        = LSQUIC_DF_CFCW_SERVER;
314        settings->es_sfcw        = LSQUIC_DF_SFCW_SERVER;
315        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER;
316    }
317    else
318    {
319        settings->es_cfcw        = LSQUIC_DF_CFCW_CLIENT;
320        settings->es_sfcw        = LSQUIC_DF_SFCW_CLIENT;
321        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT;
322    }
323    settings->es_max_streams_in  = LSQUIC_DF_MAX_STREAMS_IN;
324    settings->es_idle_conn_to    = LSQUIC_DF_IDLE_CONN_TO;
325    settings->es_handshake_to    = LSQUIC_DF_HANDSHAKE_TO;
326    settings->es_silent_close    = LSQUIC_DF_SILENT_CLOSE;
327    settings->es_max_header_list_size
328                                 = LSQUIC_DF_MAX_HEADER_LIST_SIZE;
329    settings->es_ua              = LSQUIC_DF_UA;
330
331    settings->es_pdmd            = QTAG_X509;
332    settings->es_aead            = QTAG_AESG;
333    settings->es_kexs            = QTAG_C255;
334    settings->es_support_push    = LSQUIC_DF_SUPPORT_PUSH;
335    settings->es_support_tcid0   = LSQUIC_DF_SUPPORT_TCID0;
336    settings->es_support_nstp    = LSQUIC_DF_SUPPORT_NSTP;
337    settings->es_honor_prst      = LSQUIC_DF_HONOR_PRST;
338    settings->es_progress_check  = LSQUIC_DF_PROGRESS_CHECK;
339    settings->es_pendrw_check    = LSQUIC_DF_PENDRW_CHECK;
340    settings->es_rw_once         = LSQUIC_DF_RW_ONCE;
341    settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
342    settings->es_pace_packets    = LSQUIC_DF_PACE_PACKETS;
343}
344
345
346/* Note: if returning an error, err_buf must be valid if non-NULL */
347int
348lsquic_engine_check_settings (const struct lsquic_engine_settings *settings,
349                              unsigned flags,
350                              char *err_buf, size_t err_buf_sz)
351{
352    if (settings->es_cfcw < LSQUIC_MIN_FCW ||
353        settings->es_sfcw < LSQUIC_MIN_FCW)
354    {
355        if (err_buf)
356            snprintf(err_buf, err_buf_sz, "%s",
357                                            "flow control window set too low");
358        return -1;
359    }
360    if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS))
361    {
362        if (err_buf)
363            snprintf(err_buf, err_buf_sz, "%s",
364                        "No supported QUIC versions specified");
365        return -1;
366    }
367    if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS)
368    {
369        if (err_buf)
370            snprintf(err_buf, err_buf_sz, "%s",
371                        "one or more unsupported QUIC version is specified");
372        return -1;
373    }
374    return 0;
375}
376
377
378static void
379free_packet (void *ctx, unsigned char *packet_data)
380{
381    free(packet_data);
382}
383
384
385static void *
386malloc_buf (void *ctx, size_t size)
387{
388    return malloc(size);
389}
390
391
392static const struct lsquic_packout_mem_if stock_pmi =
393{
394    malloc_buf, (void(*)(void *, void *)) free_packet,
395};
396
397
398lsquic_engine_t *
399lsquic_engine_new (unsigned flags,
400                   const struct lsquic_engine_api *api)
401{
402    lsquic_engine_t *engine;
403    int tag_buf_len;
404    char err_buf[100];
405
406    if (!api->ea_packets_out)
407    {
408        LSQ_ERROR("packets_out callback is not specified");
409        return NULL;
410    }
411
412    if (api->ea_settings &&
413                0 != lsquic_engine_check_settings(api->ea_settings, flags,
414                                                    err_buf, sizeof(err_buf)))
415    {
416        LSQ_ERROR("cannot create engine: %s", err_buf);
417        return NULL;
418    }
419
420    engine = calloc(1, sizeof(*engine));
421    if (!engine)
422        return NULL;
423    if (0 != lsquic_mm_init(&engine->pub.enp_mm))
424    {
425        free(engine);
426        return NULL;
427    }
428    if (api->ea_settings)
429        engine->pub.enp_settings        = *api->ea_settings;
430    else
431        lsquic_engine_init_settings(&engine->pub.enp_settings, flags);
432    tag_buf_len = gen_ver_tags(engine->pub.enp_ver_tags_buf,
433                                    sizeof(engine->pub.enp_ver_tags_buf),
434                                    engine->pub.enp_settings.es_versions);
435    if (tag_buf_len <= 0)
436    {
437        LSQ_ERROR("cannot generate version tags buffer");
438        free(engine);
439        return NULL;
440    }
441    engine->pub.enp_ver_tags_len = tag_buf_len;
442
443    engine->flags           = flags;
444    engine->stream_if       = api->ea_stream_if;
445    engine->stream_if_ctx   = api->ea_stream_if_ctx;
446    engine->packets_out     = api->ea_packets_out;
447    engine->packets_out_ctx = api->ea_packets_out_ctx;
448    if (api->ea_pmi)
449    {
450        engine->pub.enp_pmi      = api->ea_pmi;
451        engine->pub.enp_pmi_ctx  = api->ea_pmi_ctx;
452    }
453    else
454    {
455        engine->pub.enp_pmi      = &stock_pmi;
456        engine->pub.enp_pmi_ctx  = NULL;
457    }
458    engine->pub.enp_engine = engine;
459    TAILQ_INIT(&engine->conns_in);
460    TAILQ_INIT(&engine->conns_pend_rw);
461    conn_hash_init(&engine->full_conns, ~0);
462    engine->attq = attq_create();
463    eng_hist_init(&engine->history);
464    engine->batch_size = INITIAL_OUT_BATCH_SIZE;
465
466
467    LSQ_INFO("instantiated engine");
468    return engine;
469}
470
471
472static void
473grow_batch_size (struct lsquic_engine *engine)
474{
475    engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE;
476}
477
478
479static void
480shrink_batch_size (struct lsquic_engine *engine)
481{
482    engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE;
483}
484
485
486/* Wrapper to make sure important things occur before the connection is
487 * really destroyed.
488 */
489static void
490destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn)
491{
492    conn->cn_flags |= LSCONN_NEVER_PEND_RW;
493    conn->cn_if->ci_destroy(conn);
494}
495
496
497static lsquic_conn_t *
498new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
499                      unsigned short max_packet_size)
500{
501    lsquic_conn_t *conn;
502    unsigned flags;
503    flags = engine->flags & (ENG_SERVER|ENG_HTTP);
504    conn = full_conn_client_new(&engine->pub, engine->stream_if,
505                    engine->stream_if_ctx, flags, hostname, max_packet_size);
506    if (!conn)
507        return NULL;
508    if (0 != conn_hash_add(&engine->full_conns, conn))
509    {
510        LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
511            conn->cn_cid);
512        destroy_conn(engine, conn);
513        return NULL;
514    }
515    assert(!(conn->cn_flags &
516        (CONN_REF_FLAGS
517         & ~LSCONN_RW_PENDING /* This flag may be set as effect of user
518                                 callbacks */
519                             )));
520    conn->cn_flags |= LSCONN_HASHED;
521    return conn;
522}
523
524
525static lsquic_conn_t *
526find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
527         struct packin_parse_state *ppstate, const struct sockaddr *sa_peer,
528         void *peer_ctx)
529{
530    lsquic_conn_t *conn;
531
532    if (lsquic_packet_in_is_prst(packet_in)
533                                && !engine->pub.enp_settings.es_honor_prst)
534    {
535        LSQ_DEBUG("public reset packet: discarding");
536        return NULL;
537    }
538
539    if (!(packet_in->pi_flags & PI_CONN_ID))
540    {
541        LSQ_DEBUG("packet header does not have connection ID: discarding");
542        return NULL;
543    }
544
545    conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id);
546    if (conn)
547    {
548        conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
549        return conn;
550    }
551
552    return conn;
553}
554
555
556static void
557add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn,
558                                                        enum rw_reason reason)
559{
560    int hist_idx;
561
562    TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw);
563    engine_incref_conn(conn, LSCONN_RW_PENDING);
564
565    hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1);
566    conn->cn_rw_hist_buf[ hist_idx ] = reason;
567    ++conn->cn_rw_hist_idx;
568
569    if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx)
570        EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), "
571            "rw_hist: %.*s", (char) reason,
572            (int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf);
573    else
574        EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')",
575                                                                (char) reason);
576}
577
578
579#if !defined(NDEBUG) && __GNUC__
580__attribute__((weak))
581#endif
582void
583lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub,
584                                    lsquic_conn_t *conn, enum rw_reason reason)
585{
586    if (0 == (enpub->enp_flags & ENPUB_PROC) &&
587        0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW)))
588    {
589        lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
590        add_conn_to_pend_rw(engine, conn, reason);
591    }
592}
593
594
595void
596lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
597                                lsquic_conn_t *conn, lsquic_time_t tick_time)
598{
599    lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
600    /* Instead of performing an update, we simply remove the connection from
601     * the queue and add it back.  This should not happen in at the time of
602     * this writing.
603     */
604    if (conn->cn_flags & LSCONN_ATTQ)
605    {
606        attq_remove(engine->attq, conn);
607        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
608    }
609    if (conn && !(conn->cn_flags & LSCONN_ATTQ) &&
610                        0 == attq_maybe_add(engine->attq, conn, tick_time))
611        engine_incref_conn(conn, LSCONN_ATTQ);
612}
613
614
615static void
616update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn,
617                                                            int progress_made)
618{
619    rw_hist_idx_t hist_idx;
620    const unsigned char *empty;
621    const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check;
622
623    if (!pendrw_check)
624        return;
625
626    /* Convert previous entry to uppercase: */
627    hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1);
628    conn->cn_rw_hist_buf[ hist_idx ] -= 0x20;
629
630    LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made);
631    if (progress_made)
632    {
633        conn->cn_noprogress_count = 0;
634        return;
635    }
636
637    EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made "
638                                                                "no progress");
639    ++conn->cn_noprogress_count;
640    if (conn->cn_noprogress_count <= pendrw_check)
641        return;
642
643    conn->cn_flags |= LSCONN_NEVER_PEND_RW;
644    empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY,
645                                            sizeof(conn->cn_rw_hist_buf));
646    if (empty)
647        LSQ_WARN("conn %"PRIu64" noprogress count reached %u "
648            "(rw_hist: %.*s): will not put it onto Pend RW queue again",
649            conn->cn_cid, conn->cn_noprogress_count,
650            (int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf);
651    else
652    {
653        hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1);
654        LSQ_WARN("conn %"PRIu64" noprogress count reached %u "
655            "(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again",
656            conn->cn_cid, conn->cn_noprogress_count,
657            /* First part of history: */
658            (int) (sizeof(conn->cn_rw_hist_buf) - hist_idx),
659                                            conn->cn_rw_hist_buf + hist_idx,
660            /* Second part of history: */
661            hist_idx, conn->cn_rw_hist_buf);
662    }
663}
664
665
666/* Return 0 if packet is being processed by a connections, otherwise return 1 */
667static int
668process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
669       struct packin_parse_state *ppstate, const struct sockaddr *sa_local,
670       const struct sockaddr *sa_peer, void *peer_ctx)
671{
672    lsquic_conn_t *conn;
673
674    conn = find_or_create_conn(engine, packet_in, ppstate, sa_peer, peer_ctx);
675    if (!conn)
676    {
677        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
678        return 1;
679    }
680
681    if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) {
682        TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in);
683        engine_incref_conn(conn, LSCONN_HAS_INCOMING);
684    }
685    lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
686    lsquic_packet_in_upref(packet_in);
687    conn->cn_peer_ctx = peer_ctx;
688    conn->cn_if->ci_packet_in(conn, packet_in);
689    lsquic_packet_in_put(&engine->pub.enp_mm, packet_in);
690    return 0;
691}
692
693
694static int
695conn_attq_expired (const struct lsquic_engine *engine,
696                                                const lsquic_conn_t *conn)
697{
698    assert(conn->cn_attq_elem);
699    return lsquic_conn_adv_time(conn) < engine->proc_time;
700}
701
702
703/* Iterator for connections with incoming packets */
704static lsquic_conn_t *
705conn_iter_next_incoming (struct lsquic_engine *engine)
706{
707    enum lsquic_conn_flags addl_flags;
708    lsquic_conn_t *conn;
709    while ((conn = TAILQ_FIRST(&engine->conns_in)))
710    {
711        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
712        if (conn->cn_flags & LSCONN_RW_PENDING)
713        {
714            TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
715            EV_LOG_CONN_EVENT(conn->cn_cid,
716                "removed from pending RW queue (processing incoming)");
717        }
718        if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn))
719        {
720            addl_flags = LSCONN_ATTQ;
721            attq_remove(engine->attq, conn);
722        }
723        else
724            addl_flags = 0;
725        conn = engine_decref_conn(engine, conn,
726                        LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags);
727        if (conn)
728            break;
729    }
730    return conn;
731}
732
733
734/* Iterator for connections with that have pending read/write events */
735static lsquic_conn_t *
736conn_iter_next_rw_pend (struct lsquic_engine *engine)
737{
738    enum lsquic_conn_flags addl_flags;
739    lsquic_conn_t *conn;
740    while ((conn = TAILQ_FIRST(&engine->conns_pend_rw)))
741    {
742        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
743        EV_LOG_CONN_EVENT(conn->cn_cid,
744            "removed from pending RW queue (processing pending RW conns)");
745        if (conn->cn_flags & LSCONN_HAS_INCOMING)
746            TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
747        if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn))
748        {
749            addl_flags = LSCONN_ATTQ;
750            attq_remove(engine->attq, conn);
751        }
752        else
753            addl_flags = 0;
754        conn = engine_decref_conn(engine, conn,
755                        LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags);
756        if (conn)
757            break;
758    }
759    return conn;
760}
761
762
763void
764lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine)
765{
766    LSQ_DEBUG("process connections with incoming packets");
767    ENGINE_IN(engine);
768    process_connections(engine, conn_iter_next_incoming);
769    assert(TAILQ_EMPTY(&engine->conns_in));
770    ENGINE_OUT(engine);
771}
772
773
774int
775lsquic_engine_has_pend_rw (lsquic_engine_t *engine)
776{
777    return !(engine->flags & ENG_PAST_DEADLINE)
778        && !TAILQ_EMPTY(&engine->conns_pend_rw);
779}
780
781
782void
783lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine)
784{
785    LSQ_DEBUG("process connections with pending RW events");
786    ENGINE_IN(engine);
787    process_connections(engine, conn_iter_next_rw_pend);
788    ENGINE_OUT(engine);
789}
790
791
792void
793lsquic_engine_destroy (lsquic_engine_t *engine)
794{
795    lsquic_conn_t *conn;
796
797    LSQ_DEBUG("destroying engine");
798#ifndef NDEBUG
799    engine->flags |= ENG_DTOR;
800#endif
801
802    while (engine->conns_out.oh_nelem > 0)
803    {
804        --engine->conns_out.oh_nelem;
805        conn = engine->conns_out.oh_elems[
806                                engine->conns_out.oh_nelem ].ohe_conn;
807        assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
808        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
809    }
810
811    for (conn = conn_hash_first(&engine->full_conns); conn;
812                            conn = conn_hash_next(&engine->full_conns))
813        force_close_conn(engine, conn);
814    conn_hash_cleanup(&engine->full_conns);
815
816
817    attq_destroy(engine->attq);
818
819    assert(0 == engine->conns_out.oh_nelem);
820    assert(TAILQ_EMPTY(&engine->conns_pend_rw));
821    lsquic_mm_cleanup(&engine->pub.enp_mm);
822    free(engine->conns_out.oh_elems);
823    free(engine);
824}
825
826
827#if __GNUC__
828__attribute__((nonnull(3)))
829#endif
830static lsquic_conn_t *
831remove_from_inc_andor_pend_rw (lsquic_engine_t *engine,
832                                lsquic_conn_t *conn, const char *reason)
833{
834    assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING));
835    if (conn->cn_flags & LSCONN_HAS_INCOMING)
836        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
837    if (conn->cn_flags & LSCONN_RW_PENDING)
838    {
839        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
840        EV_LOG_CONN_EVENT(conn->cn_cid,
841                        "removed from pending RW queue (%s)", reason);
842    }
843    conn = engine_decref_conn(engine, conn,
844                        LSCONN_HAS_INCOMING|LSCONN_RW_PENDING);
845    assert(conn);
846    return conn;
847}
848
849
850static lsquic_conn_t *
851conn_iter_next_one (lsquic_engine_t *engine)
852{
853    lsquic_conn_t *conn = engine->iter_state.one.conn;
854    if (conn)
855    {
856        if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))
857            conn = remove_from_inc_andor_pend_rw(engine, conn, "connect");
858        if (conn && (conn->cn_flags & LSCONN_ATTQ) &&
859                                            conn_attq_expired(engine, conn))
860        {
861            attq_remove(engine->attq, conn);
862            conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
863        }
864        engine->iter_state.one.conn = NULL;
865    }
866    return conn;
867}
868
869
870lsquic_conn_t *
871lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa,
872                       void *peer_ctx, lsquic_conn_ctx_t *conn_ctx,
873                       const char *hostname, unsigned short max_packet_size)
874{
875    lsquic_conn_t *conn;
876
877    if (engine->flags & ENG_SERVER)
878    {
879        LSQ_ERROR("`%s' must only be called in client mode", __func__);
880        return NULL;
881    }
882
883    if (0 == max_packet_size)
884    {
885        switch (peer_sa->sa_family)
886        {
887        case AF_INET:
888            max_packet_size = QUIC_MAX_IPv4_PACKET_SZ;
889            break;
890        default:
891            max_packet_size = QUIC_MAX_IPv6_PACKET_SZ;
892            break;
893        }
894    }
895
896    conn = new_full_conn_client(engine, hostname, max_packet_size);
897    if (!conn)
898        return NULL;
899    ENGINE_IN(engine);
900    lsquic_conn_record_peer_sa(conn, peer_sa);
901    conn->cn_peer_ctx = peer_ctx;
902    lsquic_conn_set_ctx(conn, conn_ctx);
903    engine->iter_state.one.conn = conn;
904    full_conn_client_call_on_new(conn);
905    process_connections(engine, conn_iter_next_one);
906    ENGINE_OUT(engine);
907    return conn;
908}
909
910
911static void
912remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
913{
914        conn_hash_remove(&engine->full_conns, conn);
915    (void) engine_decref_conn(engine, conn, LSCONN_HASHED);
916}
917
918
919static void
920refflags2str (enum lsquic_conn_flags flags, char s[7])
921{
922    *s = 'C'; s += !!(flags & LSCONN_CLOSING);
923    *s = 'H'; s += !!(flags & LSCONN_HASHED);
924    *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
925    *s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING);
926    *s = 'R'; s += !!(flags & LSCONN_RW_PENDING);
927    *s = 'A'; s += !!(flags & LSCONN_ATTQ);
928    *s = '\0';
929}
930
931
932static void
933engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
934{
935    char str[7];
936    assert(flag & CONN_REF_FLAGS);
937    assert(!(conn->cn_flags & flag));
938    conn->cn_flags |= flag;
939    LSQ_DEBUG("incref conn %"PRIu64", now '%s'", conn->cn_cid,
940                            (refflags2str(conn->cn_flags, str), str));
941}
942
943
944static lsquic_conn_t *
945engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
946                                        enum lsquic_conn_flags flags)
947{
948    char str[7];
949    assert(flags & CONN_REF_FLAGS);
950    assert(conn->cn_flags & flags);
951#ifndef NDEBUG
952    if (flags & LSCONN_CLOSING)
953        assert(0 == (conn->cn_flags & LSCONN_HASHED));
954#endif
955    conn->cn_flags &= ~flags;
956    LSQ_DEBUG("decref conn %"PRIu64", now '%s'", conn->cn_cid,
957                            (refflags2str(conn->cn_flags, str), str));
958    if (0 == (conn->cn_flags & CONN_REF_FLAGS))
959    {
960            eng_hist_inc(&engine->history, 0, sl_del_full_conns);
961        destroy_conn(engine, conn);
962        return NULL;
963    }
964    else
965        return conn;
966}
967
968
969/* This is not a general-purpose function.  Only call from engine dtor. */
970static void
971force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
972{
973    assert(engine->flags & ENG_DTOR);
974    const enum lsquic_conn_flags flags = conn->cn_flags;
975    assert(conn->cn_flags & CONN_REF_FLAGS);
976    assert(!(flags & LSCONN_HAS_OUTGOING));  /* Should be removed already */
977    assert(!(flags & LSCONN_CLOSING));  /* It is in transient queue? */
978    if (flags & LSCONN_HAS_INCOMING)
979    {
980        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
981        (void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING);
982    }
983    if (flags & LSCONN_RW_PENDING)
984    {
985        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
986        EV_LOG_CONN_EVENT(conn->cn_cid,
987            "removed from pending RW queue (engine destruction)");
988        (void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING);
989    }
990    if (flags & LSCONN_ATTQ)
991        attq_remove(engine->attq, conn);
992    if (flags & LSCONN_HASHED)
993        remove_conn_from_hash(engine, conn);
994}
995
996
997/* Iterator for all connections.
998 * Returned connections are removed from the Incoming, Pending RW Event,
999 * and Advisory Tick Time queues if necessary.
1000 */
1001static lsquic_conn_t *
1002conn_iter_next_all (struct lsquic_engine *engine)
1003{
1004    lsquic_conn_t *conn;
1005
1006    conn = conn_hash_next(&engine->full_conns);
1007
1008    if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)))
1009        conn = remove_from_inc_andor_pend_rw(engine, conn, "process all");
1010    if (conn && (conn->cn_flags & LSCONN_ATTQ)
1011                                        && conn_attq_expired(engine, conn))
1012    {
1013        attq_remove(engine->attq, conn);
1014        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
1015    }
1016
1017    return conn;
1018}
1019
1020
1021static lsquic_conn_t *
1022conn_iter_next_attq (struct lsquic_engine *engine)
1023{
1024    lsquic_conn_t *conn;
1025
1026    conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff);
1027    if (conn)
1028    {
1029        assert(conn->cn_flags & LSCONN_ATTQ);
1030        if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))
1031            conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq");
1032        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
1033    }
1034
1035    return conn;
1036}
1037
1038
1039void
1040lsquic_engine_proc_all (lsquic_engine_t *engine)
1041{
1042    ENGINE_IN(engine);
1043    /* We poke each connection every time as initial implementation.  If it
1044     * proves to be too inefficient, we will need to figure out
1045     *          a) when to stop processing; and
1046     *          b) how to remember state between calls.
1047     */
1048    conn_hash_reset_iter(&engine->full_conns);
1049    process_connections(engine, conn_iter_next_all);
1050    ENGINE_OUT(engine);
1051}
1052
1053
1054void
1055lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine)
1056{
1057    lsquic_time_t prev_min, now;
1058
1059    now = lsquic_time_now();
1060    if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG))
1061    {
1062        const lsquic_time_t *expected_time;
1063        int64_t diff;
1064        expected_time = attq_next_time(engine->attq);
1065        if (expected_time)
1066            diff = *expected_time - now;
1067        else
1068            diff = -1;
1069        LSQ_DEBUG("process connections in attq; time diff: %"PRIi64, diff);
1070    }
1071
1072    ENGINE_IN(engine);
1073    prev_min = attq_set_min(engine->attq, now);  /* Prevent infinite loop */
1074    engine->iter_state.attq.cutoff = now;
1075    process_connections(engine, conn_iter_next_attq);
1076    attq_set_min(engine->attq, prev_min);           /* Restore previos value */
1077    ENGINE_OUT(engine);
1078}
1079
1080
1081static int
1082generate_header (const lsquic_packet_out_t *packet_out,
1083                 const struct parse_funcs *pf, lsquic_cid_t cid,
1084                 unsigned char *buf, size_t bufsz)
1085{
1086    return pf->pf_gen_reg_pkt_header(buf, bufsz,
1087        packet_out->po_flags & PO_CONN_ID ? &cid                    : NULL,
1088        packet_out->po_flags & PO_VERSION ? &packet_out->po_ver_tag : NULL,
1089        packet_out->po_flags & PO_NONCE   ? packet_out->po_nonce    : NULL,
1090        packet_out->po_packno, lsquic_packet_out_packno_bits(packet_out));
1091}
1092
1093
1094static ssize_t
1095really_encrypt_packet (const lsquic_conn_t *conn,
1096                       const lsquic_packet_out_t *packet_out,
1097                       unsigned char *buf, size_t bufsz)
1098{
1099    int enc, header_sz, is_hello_packet;
1100    size_t packet_sz;
1101    unsigned char header_buf[QUIC_MAX_PUBHDR_SZ];
1102
1103    header_sz = generate_header(packet_out, conn->cn_pf, conn->cn_cid,
1104                                            header_buf, sizeof(header_buf));
1105    if (header_sz < 0)
1106        return -1;
1107
1108    is_hello_packet = !!(packet_out->po_flags & PO_HELLO);
1109    enc = conn->cn_esf->esf_encrypt(conn->cn_enc_session, conn->cn_version, 0,
1110                packet_out->po_packno, header_buf, header_sz,
1111                packet_out->po_data, packet_out->po_data_sz,
1112                buf, bufsz, &packet_sz, is_hello_packet);
1113    if (0 == enc)
1114    {
1115        LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %u bytes, "
1116            "ciphertext is %zd bytes",
1117            packet_out->po_packno,
1118            lsquic_po_header_length(packet_out->po_flags) +
1119                                                packet_out->po_data_sz,
1120            packet_sz);
1121        return packet_sz;
1122    }
1123    else
1124        return -1;
1125}
1126
1127
1128static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, }
1129encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
1130                                            lsquic_packet_out_t *packet_out)
1131{
1132    ssize_t enc_sz;
1133    size_t bufsz;
1134    unsigned sent_sz;
1135    unsigned char *buf;
1136
1137    bufsz = lsquic_po_header_length(packet_out->po_flags) +
1138                                packet_out->po_data_sz + QUIC_PACKET_HASH_SZ;
1139    buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz);
1140    if (!buf)
1141    {
1142        LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd",
1143                                                                        bufsz);
1144        return ENCPA_NOMEM;
1145    }
1146
1147    {
1148        enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz);
1149        sent_sz = enc_sz;
1150    }
1151
1152    if (enc_sz < 0)
1153    {
1154        engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf);
1155        return ENCPA_BADCRYPT;
1156    }
1157
1158    packet_out->po_enc_data    = buf;
1159    packet_out->po_enc_data_sz = enc_sz;
1160    packet_out->po_sent_sz     = sent_sz;
1161    packet_out->po_flags |= PO_ENCRYPTED|PO_SENT_SZ;
1162
1163    return ENCPA_OK;
1164}
1165
1166
1167STAILQ_HEAD(closed_conns, lsquic_conn);
1168
1169
1170struct conns_out_iter
1171{
1172    struct out_heap            *coi_heap;
1173    TAILQ_HEAD(, lsquic_conn)   coi_active_list,
1174                                coi_inactive_list;
1175    lsquic_conn_t              *coi_next;
1176#ifndef NDEBUG
1177    lsquic_time_t               coi_last_sent;
1178#endif
1179};
1180
1181
1182static void
1183coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine)
1184{
1185    iter->coi_heap = &engine->conns_out;
1186    iter->coi_next = NULL;
1187    TAILQ_INIT(&iter->coi_active_list);
1188    TAILQ_INIT(&iter->coi_inactive_list);
1189#ifndef NDEBUG
1190    iter->coi_last_sent = 0;
1191#endif
1192}
1193
1194
1195static lsquic_conn_t *
1196coi_next (struct conns_out_iter *iter)
1197{
1198    lsquic_conn_t *conn;
1199
1200    if (iter->coi_heap->oh_nelem > 0)
1201    {
1202        conn = oh_pop(iter->coi_heap);
1203        TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
1204        conn->cn_flags |= LSCONN_COI_ACTIVE;
1205#ifndef NDEBUG
1206        if (iter->coi_last_sent)
1207            assert(iter->coi_last_sent <= conn->cn_last_sent);
1208        iter->coi_last_sent = conn->cn_last_sent;
1209#endif
1210        return conn;
1211    }
1212    else if (!TAILQ_EMPTY(&iter->coi_active_list))
1213    {
1214        conn = iter->coi_next;
1215        if (!conn)
1216            conn = TAILQ_FIRST(&iter->coi_active_list);
1217        if (conn)
1218            iter->coi_next = TAILQ_NEXT(conn, cn_next_out);
1219        return conn;
1220    }
1221    else
1222        return NULL;
1223}
1224
1225
1226static void
1227coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
1228{
1229    if (!(conn->cn_flags & LSCONN_EVANESCENT))
1230    {
1231        assert(!TAILQ_EMPTY(&iter->coi_active_list));
1232        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1233        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1234        TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out);
1235        conn->cn_flags |= LSCONN_COI_INACTIVE;
1236    }
1237}
1238
1239
1240static void
1241coi_remove (struct conns_out_iter *iter, lsquic_conn_t *conn)
1242{
1243    assert(conn->cn_flags & LSCONN_COI_ACTIVE);
1244    if (conn->cn_flags & LSCONN_COI_ACTIVE)
1245    {
1246        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1247        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1248    }
1249}
1250
1251
1252static void
1253coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
1254{
1255    assert(conn->cn_flags & LSCONN_COI_INACTIVE);
1256    TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1257    conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1258    TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
1259    conn->cn_flags |= LSCONN_COI_ACTIVE;
1260}
1261
1262
1263static void
1264coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
1265{
1266    lsquic_conn_t *conn;
1267    while ((conn = TAILQ_FIRST(&iter->coi_active_list)))
1268    {
1269        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1270        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1271        oh_insert(iter->coi_heap, conn);
1272    }
1273    while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
1274    {
1275        TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1276        conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1277        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
1278    }
1279}
1280
1281
1282static unsigned
1283send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter,
1284                  struct out_batch *batch, unsigned n_to_send)
1285{
1286    int n_sent, i;
1287    lsquic_time_t now;
1288
1289    /* Set sent time before the write to avoid underestimating RTT */
1290    now = lsquic_time_now();
1291    for (i = 0; i < (int) n_to_send; ++i)
1292        batch->packets[i]->po_sent = now;
1293    n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs,
1294                                                                n_to_send);
1295    if (n_sent >= 0)
1296        LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send);
1297    else
1298    {
1299        LSQ_DEBUG("packets out returned an error: %s", strerror(errno));
1300        n_sent = 0;
1301    }
1302    if (n_sent > 0)
1303        engine->last_sent = now + n_sent;
1304    for (i = 0; i < n_sent; ++i)
1305    {
1306        eng_hist_inc(&engine->history, now, sl_packets_out);
1307        EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1308        batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i],
1309                                                    batch->packets[i]);
1310        /* `i' is added to maintain relative order */
1311        batch->conns[i]->cn_last_sent = now + i;
1312        /* Release packet out buffer as soon as the packet is sent
1313         * successfully.  If not successfully sent, we hold on to
1314         * this buffer until the packet sending is attempted again
1315         * or until it times out and regenerated.
1316         */
1317        if (batch->packets[i]->po_flags & PO_ENCRYPTED)
1318        {
1319            batch->packets[i]->po_flags &= ~PO_ENCRYPTED;
1320            engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx,
1321                                                batch->packets[i]->po_enc_data);
1322            batch->packets[i]->po_enc_data = NULL;  /* JIC */
1323        }
1324    }
1325    if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT))
1326        for ( ; i < (int) n_to_send; ++i)
1327            EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1328    /* Return packets to the connection in reverse order so that the packet
1329     * ordering is maintained.
1330     */
1331    for (i = (int) n_to_send - 1; i >= n_sent; --i)
1332    {
1333        batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i],
1334                                                    batch->packets[i]);
1335        if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT)))
1336            coi_reactivate(conns_iter, batch->conns[i]);
1337    }
1338    return n_sent;
1339}
1340
1341
1342/* Return 1 if went past deadline, 0 otherwise */
1343static int
1344check_deadline (lsquic_engine_t *engine)
1345{
1346    if (engine->pub.enp_settings.es_proc_time_thresh &&
1347                                lsquic_time_now() > engine->deadline)
1348    {
1349        LSQ_INFO("went past threshold of %u usec, stop sending",
1350                            engine->pub.enp_settings.es_proc_time_thresh);
1351        engine->flags |= ENG_PAST_DEADLINE;
1352        return 1;
1353    }
1354    else
1355        return 0;
1356}
1357
1358
1359static void
1360send_packets_out (struct lsquic_engine *engine,
1361                  struct closed_conns *closed_conns)
1362{
1363    unsigned n, w, n_sent, n_batches_sent;
1364    lsquic_packet_out_t *packet_out;
1365    lsquic_conn_t *conn;
1366    struct out_batch *const batch = &engine->out_batch;
1367    struct conns_out_iter conns_iter;
1368    int shrink, deadline_exceeded;
1369
1370    coi_init(&conns_iter, engine);
1371    n_batches_sent = 0;
1372    n_sent = 0, n = 0;
1373    shrink = 0;
1374    deadline_exceeded = 0;
1375
1376    while ((conn = coi_next(&conns_iter)))
1377    {
1378        packet_out = conn->cn_if->ci_next_packet_to_send(conn);
1379        if (!packet_out) {
1380            LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64,
1381                                                            conn->cn_cid);
1382            coi_deactivate(&conns_iter, conn);
1383            continue;
1384        }
1385        if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT)))
1386        {
1387            switch (encrypt_packet(engine, conn, packet_out))
1388            {
1389            case ENCPA_NOMEM:
1390                /* Send what we have and wait for a more opportune moment */
1391                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1392                goto end_for;
1393            case ENCPA_BADCRYPT:
1394                /* This is pretty bad: close connection immediately */
1395                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1396                LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid);
1397                if (!(conn->cn_flags & LSCONN_EVANESCENT))
1398                {
1399                    if (!(conn->cn_flags & LSCONN_CLOSING))
1400                    {
1401                        STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn);
1402                        engine_incref_conn(conn, LSCONN_CLOSING);
1403                        if (conn->cn_flags & LSCONN_HASHED)
1404                            remove_conn_from_hash(engine, conn);
1405                    }
1406                    coi_remove(&conns_iter, conn);
1407                }
1408                continue;
1409            case ENCPA_OK:
1410                break;
1411            }
1412        }
1413        LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64,
1414                                        packet_out->po_packno, conn->cn_cid);
1415        assert(conn->cn_flags & LSCONN_HAS_PEER_SA);
1416        if (packet_out->po_flags & PO_ENCRYPTED)
1417        {
1418            batch->outs[n].buf     = packet_out->po_enc_data;
1419            batch->outs[n].sz      = packet_out->po_enc_data_sz;
1420        }
1421        else
1422        {
1423            batch->outs[n].buf     = packet_out->po_data;
1424            batch->outs[n].sz      = packet_out->po_data_sz;
1425        }
1426        batch->outs   [n].peer_ctx = conn->cn_peer_ctx;
1427        batch->outs   [n].local_sa = (struct sockaddr *) conn->cn_local_addr;
1428        batch->outs   [n].dest_sa  = (struct sockaddr *) conn->cn_peer_addr;
1429        batch->conns  [n]          = conn;
1430        batch->packets[n]          = packet_out;
1431        ++n;
1432        if (n == engine->batch_size)
1433        {
1434            n = 0;
1435            w = send_batch(engine, &conns_iter, batch, engine->batch_size);
1436            ++n_batches_sent;
1437            n_sent += w;
1438            if (w < engine->batch_size)
1439            {
1440                shrink = 1;
1441                break;
1442            }
1443            deadline_exceeded = check_deadline(engine);
1444            if (deadline_exceeded)
1445                break;
1446            grow_batch_size(engine);
1447        }
1448    }
1449  end_for:
1450
1451    if (n > 0) {
1452        w = send_batch(engine, &conns_iter, batch, n);
1453        n_sent += w;
1454        shrink = w < n;
1455        ++n_batches_sent;
1456        deadline_exceeded = check_deadline(engine);
1457    }
1458
1459    if (shrink)
1460        shrink_batch_size(engine);
1461    else if (n_batches_sent > 1 && !deadline_exceeded)
1462        grow_batch_size(engine);
1463
1464    coi_reheap(&conns_iter, engine);
1465
1466    LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s");
1467}
1468
1469
1470int
1471lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
1472{
1473    return !(engine->flags & ENG_PAST_DEADLINE)
1474        && (    engine->conns_out.oh_nelem > 0
1475           )
1476    ;
1477}
1478
1479
1480static void
1481reset_deadline (lsquic_engine_t *engine, lsquic_time_t now)
1482{
1483    engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh;
1484    engine->flags &= ~ENG_PAST_DEADLINE;
1485}
1486
1487
1488/* TODO: this is a user-facing function, account for load */
1489void
1490lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
1491{
1492    lsquic_conn_t *conn;
1493    struct closed_conns closed_conns;
1494
1495    STAILQ_INIT(&closed_conns);
1496    reset_deadline(engine, lsquic_time_now());
1497
1498    send_packets_out(engine, &closed_conns);
1499
1500    while ((conn = STAILQ_FIRST(&closed_conns))) {
1501        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1502        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1503    }
1504
1505}
1506
1507
1508static void
1509process_connections (lsquic_engine_t *engine, conn_iter_f next_conn)
1510{
1511    lsquic_conn_t *conn;
1512    enum tick_st tick_st;
1513    lsquic_time_t now = lsquic_time_now();
1514    struct closed_conns closed_conns;
1515
1516    engine->proc_time = now;
1517    eng_hist_tick(&engine->history, now);
1518
1519    STAILQ_INIT(&closed_conns);
1520    reset_deadline(engine, now);
1521
1522    while ((conn = next_conn(engine)))
1523    {
1524        tick_st = conn->cn_if->ci_tick(conn, now);
1525        if (conn_iter_next_rw_pend == next_conn)
1526            update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS);
1527        if (tick_st & TICK_SEND)
1528        {
1529            if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
1530            {
1531                oh_insert(&engine->conns_out, conn);
1532                engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
1533            }
1534        }
1535        if (tick_st & TICK_CLOSE)
1536        {
1537            STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn);
1538            engine_incref_conn(conn, LSCONN_CLOSING);
1539            if (conn->cn_flags & LSCONN_HASHED)
1540                remove_conn_from_hash(engine, conn);
1541        }
1542    }
1543
1544    if (lsquic_engine_has_unsent_packets(engine))
1545        send_packets_out(engine, &closed_conns);
1546
1547    while ((conn = STAILQ_FIRST(&closed_conns))) {
1548        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1549        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1550    }
1551
1552}
1553
1554
1555/* Return 0 if packet is being processed by a real connection, 1 if the
1556 * packet was processed, but not by a connection, and -1 on error.
1557 */
1558int
1559lsquic_engine_packet_in (lsquic_engine_t *engine,
1560    const unsigned char *packet_in_data, size_t packet_in_size,
1561    const struct sockaddr *sa_local, const struct sockaddr *sa_peer,
1562    void *peer_ctx)
1563{
1564    struct packin_parse_state ppstate;
1565    lsquic_packet_in_t *packet_in;
1566
1567    if (packet_in_size > QUIC_MAX_PACKET_SZ)
1568    {
1569        LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming "
1570            "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ);
1571        errno = E2BIG;
1572        return -1;
1573    }
1574
1575    packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm);
1576    if (!packet_in)
1577        return -1;
1578
1579    /* Library does not modify packet_in_data, it is not referenced after
1580     * this function returns and subsequent release of pi_data is guarded
1581     * by PI_OWN_DATA flag.
1582     */
1583    packet_in->pi_data = (unsigned char *) packet_in_data;
1584    if (0 != parse_packet_in_begin(packet_in, packet_in_size,
1585                                        engine->flags & ENG_SERVER, &ppstate))
1586    {
1587        LSQ_DEBUG("Cannot parse incoming packet's header");
1588        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
1589        errno = EINVAL;
1590        return -1;
1591    }
1592
1593    packet_in->pi_received = lsquic_time_now();
1594    eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in);
1595    return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer,
1596                                                                    peer_ctx);
1597}
1598
1599
1600#if __GNUC__ && !defined(NDEBUG)
1601__attribute__((weak))
1602#endif
1603unsigned
1604lsquic_engine_quic_versions (const lsquic_engine_t *engine)
1605{
1606    return engine->pub.enp_settings.es_versions;
1607}
1608
1609
1610int
1611lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
1612{
1613    const lsquic_time_t *next_time;
1614    lsquic_time_t now;
1615
1616    next_time = attq_next_time(engine->attq);
1617    if (!next_time)
1618        return 0;
1619
1620    now = lsquic_time_now();
1621    *diff = (int) ((int64_t) *next_time - (int64_t) now);
1622    return 1;
1623}
1624
1625
1626unsigned
1627lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now)
1628{
1629    lsquic_time_t now;
1630    now = lsquic_time_now();
1631    if (from_now < 0)
1632        now -= from_now;
1633    else
1634        now += from_now;
1635    return attq_count_before(engine->attq, now);
1636}
1637
1638
1639