lsquic_engine.c revision 50aadb33
1/* Copyright (c) 2017 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_engine.c - QUIC engine
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdint.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <sys/queue.h>
14#include <sys/time.h>
15#include <time.h>
16#include <netinet/in.h>
17#include <sys/types.h>
18#include <sys/stat.h>
19#include <fcntl.h>
20#include <unistd.h>
21#include <netdb.h>
22
23#ifndef NDEBUG
24#include <sys/types.h>
25#include <regex.h>      /* For code that loses packets */
26#endif
27
28
29
30#include "lsquic.h"
31#include "lsquic_types.h"
32#include "lsquic_alarmset.h"
33#include "lsquic_parse.h"
34#include "lsquic_packet_in.h"
35#include "lsquic_packet_out.h"
36#include "lsquic_senhist.h"
37#include "lsquic_rtt.h"
38#include "lsquic_cubic.h"
39#include "lsquic_pacer.h"
40#include "lsquic_send_ctl.h"
41#include "lsquic_set.h"
42#include "lsquic_conn_flow.h"
43#include "lsquic_sfcw.h"
44#include "lsquic_stream.h"
45#include "lsquic_conn.h"
46#include "lsquic_full_conn.h"
47#include "lsquic_util.h"
48#include "lsquic_qtags.h"
49#include "lsquic_handshake.h"
50#include "lsquic_mm.h"
51#include "lsquic_conn_hash.h"
52#include "lsquic_engine_public.h"
53#include "lsquic_eng_hist.h"
54#include "lsquic_ev_log.h"
55#include "lsquic_version.h"
56#include "lsquic_attq.h"
57
58#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
59#include "lsquic_logger.h"
60
61
62/* The batch of outgoing packets grows and shrinks dynamically */
63#define MAX_OUT_BATCH_SIZE 1024
64#define MIN_OUT_BATCH_SIZE 256
65#define INITIAL_OUT_BATCH_SIZE 512
66
/* Connection iterator type.  An iterator is passed to process_connections()
 * and, given the engine, returns a connection.  The iterators defined in
 * this file return NULL when they have no connection to hand out.
 */
typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);

/* Forward declarations of static functions used before they are defined. */

/* Process the connections supplied by iterator `iter'. */
static void
process_connections (struct lsquic_engine *engine, conn_iter_f iter);

/* Set reference flag `flag' (one of CONN_REF_FLAGS) on the connection. */
static void
engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);

/* Clear the given reference flag(s).  Returns `conn', or NULL if no
 * reference flags remained and the connection was destroyed.
 */
static lsquic_conn_t *
engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
                                        enum lsquic_conn_flags flag);

/* Drop the connection's references during engine destruction only. */
static void
force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
81
/* Nested calls to LSQUIC are not supported.
 *
 * ENGINE_IN() asserts that the ENPUB_PROC flag is not yet set in the
 * engine's public flags and then sets it; ENGINE_OUT() asserts that the
 * flag is set and then clears it.  Together they bracket a processing
 * call so that, in debug builds, a nested call trips the assertion.
 */
#define ENGINE_IN(e) do {                               \
    assert(!((e)->pub.enp_flags & ENPUB_PROC));         \
    (e)->pub.enp_flags |= ENPUB_PROC;                   \
} while (0)

#define ENGINE_OUT(e) do {                              \
    assert((e)->pub.enp_flags & ENPUB_PROC);            \
    (e)->pub.enp_flags &= ~ENPUB_PROC;                  \
} while (0)
92
/* A connection can be referenced from one of six places:
 *
 *   1. Connection hash: a connection starts its life in one of those.
 *
 *   2. Outgoing queue.
 *
 *   3. Incoming queue.
 *
 *   4. Pending RW Events queue.
 *
 *   5. Advisory Tick Time queue.
 *
 *   6. Closing connections queue.  This is a transient queue -- it only
 *      exists for the duration of process_connections() function call.
 *
 * The idea is to destroy the connection when it is no longer referenced.
 * For example, a connection tick may return TICK_SEND|TICK_CLOSE.  In
 * that case, the connection is referenced from two places: (2) and (6).
 * After its packets are sent, it is only referenced in (6), and at the
 * end of the function call, when it is removed from (6), reference count
 * goes to zero and the connection is destroyed.  If not all packets can
 * be sent, at the end of the function call, the connection is referenced
 * by (2) and will only be removed once all outgoing packets have been
 * sent.
 *
 * Each place has its own flag in the connection's `cn_flags'.  There is no
 * separate counter: the "reference count" is zero exactly when none of the
 * flags in CONN_REF_FLAGS is set (see engine_decref_conn()).
 */
#define CONN_REF_FLAGS  (LSCONN_HASHED          \
                        |LSCONN_HAS_OUTGOING    \
                        |LSCONN_HAS_INCOMING    \
                        |LSCONN_RW_PENDING      \
                        |LSCONN_CLOSING         \
                        |LSCONN_ATTQ)
124
125
/* Element of the outgoing connections heap. */
struct out_heap_elem
{
    struct lsquic_conn  *ohe_conn;      /* Connection with outgoing packets */
    lsquic_time_t        ohe_last_sent; /* Heap key: copy of the connection's
                                         * `cn_last_sent' taken at the time
                                         * of insertion (see oh_insert()).
                                         */
};
131
132
/* Array-backed binary min-heap of connections ordered by `ohe_last_sent':
 * the element with the smallest value is at index 0.
 */
struct out_heap
{
    struct out_heap_elem    *oh_elems;  /* Heap storage; grown by oh_insert() */
    unsigned                 oh_nalloc, /* Number of elements allocated */
                             oh_nelem;  /* Number of elements in use */
};
139
140
/* Engine state.
 *
 * `pub' is the first member: code in this file casts a pointer to the
 * public part back to `lsquic_engine_t *' (see
 * lsquic_engine_add_conn_to_pend_rw()), so it must stay first.
 */
struct lsquic_engine
{
    struct lsquic_engine_public        pub;
    enum {
        ENG_SERVER      = LSENG_SERVER,
        ENG_HTTP        = LSENG_HTTP,
        ENG_COOLDOWN    = (1 <<  7),    /* Cooldown: no new connections */
        ENG_PAST_DEADLINE
                        = (1 <<  8),    /* Previous call to a processing
                                         * function went past time threshold.
                                         */
#ifndef NDEBUG
        ENG_DTOR        = (1 << 26),    /* Engine destructor */
#endif
    }                                  flags;
    /* The following four members are copied from the API structure in
     * lsquic_engine_new().
     */
    const struct lsquic_stream_if     *stream_if;
    void                              *stream_if_ctx;
    lsquic_packets_out_f               packets_out;
    void                              *packets_out_ctx;
    void                              *bad_handshake_ctx;
    /* Connection hash: reference place (1) */
    struct conn_hash                   full_conns;
    /* Incoming and Pending RW Events queues: reference places (3) and (4) */
    TAILQ_HEAD(, lsquic_conn)          conns_in, conns_pend_rw;
    /* Outgoing queue, kept as a min-heap: reference place (2) */
    struct out_heap                    conns_out;
    /* Use a union because only one iterator is being used at any one time */
    union {
        struct {
            /* This iterator does not have any state: it uses `conns_in' */
        }           conn_in;
        struct {
            /* This iterator does not have any state: it uses `conns_pend_rw' */
        }           rw_pend;
        struct {
            /* Iterator state to process connections in Advisory Tick Time
             * queue.
             */
            lsquic_time_t   cutoff;
        }           attq;
        struct {
            /* Iterator state to process all connections */
        }           all;
        struct {
            /* The single connection returned by conn_iter_next_one() */
            lsquic_conn_t  *conn;
        }           one;
    }                                  iter_state;
    struct eng_hist                    history;
    /* Starts at INITIAL_OUT_BATCH_SIZE; doubled by grow_batch_size() and
     * halved by shrink_batch_size().
     */
    unsigned                           batch_size;
    unsigned                           time_until_desired_tick;
    /* Advisory Tick Time queue: reference place (5) */
    struct attq                       *attq;
    /* Compared against a connection's advisory tick time in
     * conn_attq_expired().
     */
    lsquic_time_t                      proc_time;
    /* Track time last time a packet was sent to give new connections
     * priority lower than that of existing connections.
     */
    lsquic_time_t                      last_sent;
    lsquic_time_t                      deadline;
};
196
197
/* Index arithmetic for the array-backed binary heap: parent and children of
 * the element at index `i'.  The argument is parenthesized so that the
 * macros expand correctly when given an expression rather than a plain
 * identifier.  OHE_PARENT() must not be applied to index 0.
 */
#define OHE_PARENT(i) (((i) - 1) / 2)
#define OHE_LCHILD(i) (2 * (i) + 1)
#define OHE_RCHILD(i) (2 * (i) + 2)
201
202
203static void
204heapify_out_heap (struct out_heap *heap, unsigned i)
205{
206    struct out_heap_elem el;
207    unsigned smallest;
208
209    assert(i < heap->oh_nelem);
210
211    if (OHE_LCHILD(i) < heap->oh_nelem)
212    {
213        if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent <
214                                    heap->oh_elems[ i ].ohe_last_sent)
215            smallest = OHE_LCHILD(i);
216        else
217            smallest = i;
218        if (OHE_RCHILD(i) < heap->oh_nelem &&
219            heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent <
220                                    heap->oh_elems[ smallest ].ohe_last_sent)
221            smallest = OHE_RCHILD(i);
222    }
223    else
224        smallest = i;
225
226    if (smallest != i)
227    {
228        el = heap->oh_elems[ smallest ];
229        heap->oh_elems[ smallest ] = heap->oh_elems[ i ];
230        heap->oh_elems[ i ] = el;
231        heapify_out_heap(heap, smallest);
232    }
233}
234
235
236static void
237oh_insert (struct out_heap *heap, lsquic_conn_t *conn)
238{
239    struct out_heap_elem el;
240    unsigned nalloc, i;
241
242    if (heap->oh_nelem == heap->oh_nalloc)
243    {
244        if (0 == heap->oh_nalloc)
245            nalloc = 4;
246        else
247            nalloc = heap->oh_nalloc * 2;
248        heap->oh_elems = realloc(heap->oh_elems,
249                                    nalloc * sizeof(heap->oh_elems[0]));
250        if (!heap->oh_elems)
251        {   /* Not much we can do here */
252            LSQ_ERROR("realloc failed");
253            return;
254        }
255        heap->oh_nalloc = nalloc;
256    }
257
258    heap->oh_elems[ heap->oh_nelem ].ohe_conn      = conn;
259    heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent;
260    ++heap->oh_nelem;
261
262    i = heap->oh_nelem - 1;
263    while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent >
264                                    heap->oh_elems[ i ].ohe_last_sent)
265    {
266        el = heap->oh_elems[ OHE_PARENT(i) ];
267        heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ];
268        heap->oh_elems[ i ] = el;
269        i = OHE_PARENT(i);
270    }
271}
272
273
274static struct lsquic_conn *
275oh_pop (struct out_heap *heap)
276{
277    struct lsquic_conn *conn;
278
279    assert(heap->oh_nelem);
280
281    conn = heap->oh_elems[0].ohe_conn;
282    --heap->oh_nelem;
283    if (heap->oh_nelem > 0)
284    {
285        heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ];
286        heapify_out_heap(heap, 0);
287    }
288
289    return conn;
290}
291
292
293void
294lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
295                             unsigned flags)
296{
297    memset(settings, 0, sizeof(*settings));
298    settings->es_versions        = LSQUIC_DF_VERSIONS;
299    if (flags & ENG_SERVER)
300    {
301        settings->es_cfcw        = LSQUIC_DF_CFCW_SERVER;
302        settings->es_sfcw        = LSQUIC_DF_SFCW_SERVER;
303        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER;
304    }
305    else
306    {
307        settings->es_cfcw        = LSQUIC_DF_CFCW_CLIENT;
308        settings->es_sfcw        = LSQUIC_DF_SFCW_CLIENT;
309        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT;
310    }
311    settings->es_max_streams_in  = LSQUIC_DF_MAX_STREAMS_IN;
312    settings->es_idle_conn_to    = LSQUIC_DF_IDLE_CONN_TO;
313    settings->es_handshake_to    = LSQUIC_DF_HANDSHAKE_TO;
314    settings->es_silent_close    = LSQUIC_DF_SILENT_CLOSE;
315    settings->es_max_header_list_size
316                                 = LSQUIC_DF_MAX_HEADER_LIST_SIZE;
317    settings->es_ua              = LSQUIC_DF_UA;
318
319    settings->es_pdmd            = QTAG_X509;
320    settings->es_aead            = QTAG_AESG;
321    settings->es_kexs            = QTAG_C255;
322    settings->es_support_push    = LSQUIC_DF_SUPPORT_PUSH;
323    settings->es_support_tcid0   = LSQUIC_DF_SUPPORT_TCID0;
324    settings->es_support_nstp    = LSQUIC_DF_SUPPORT_NSTP;
325    settings->es_honor_prst      = LSQUIC_DF_HONOR_PRST;
326    settings->es_progress_check  = LSQUIC_DF_PROGRESS_CHECK;
327    settings->es_pendrw_check    = LSQUIC_DF_PENDRW_CHECK;
328    settings->es_rw_once         = LSQUIC_DF_RW_ONCE;
329    settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
330    settings->es_pace_packets    = LSQUIC_DF_PACE_PACKETS;
331}
332
333
334/* Note: if returning an error, err_buf must be valid if non-NULL */
335int
336lsquic_engine_check_settings (const struct lsquic_engine_settings *settings,
337                              unsigned flags,
338                              char *err_buf, size_t err_buf_sz)
339{
340    if (settings->es_cfcw < LSQUIC_MIN_FCW ||
341        settings->es_sfcw < LSQUIC_MIN_FCW)
342    {
343        if (err_buf)
344            snprintf(err_buf, err_buf_sz, "%s",
345                                            "flow control window set too low");
346        return -1;
347    }
348    if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS))
349    {
350        if (err_buf)
351            snprintf(err_buf, err_buf_sz, "%s",
352                        "No supported QUIC versions specified");
353        return -1;
354    }
355    if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS)
356    {
357        if (err_buf)
358            snprintf(err_buf, err_buf_sz, "%s",
359                        "one or more unsupported QUIC version is specified");
360        return -1;
361    }
362    return 0;
363}
364
365
/* Stock "release" callback of the packet-out memory interface: return a
 * buffer obtained from malloc_buf() to the heap.  `ctx' is not used.
 *
 * The buffer is taken as `void *' so that this function's type is exactly
 * `void (*)(void *, void *)', the type it is called through via `stock_pmi'.
 * Calling a function through a pointer to an incompatible function type is
 * undefined behavior.
 */
static void
free_packet (void *ctx, void *packet_data)
{
    (void) ctx;
    free(packet_data);
}
371
372
/* Stock "allocate" callback of the packet-out memory interface: obtain
 * `size' bytes from the heap.  Returns whatever malloc() returns; `ctx' is
 * not used.
 */
static void *
malloc_buf (void *ctx, size_t size)
{
    void *const buf = malloc(size);
    return buf;
}
378
379
/* Default packet-out memory interface.  lsquic_engine_new() installs it
 * when the API structure does not provide `ea_pmi'.  Buffers are allocated
 * with malloc_buf() and released with free_packet(); the latter is cast to
 * the generic `void (*)(void *, void *)' function pointer type.
 */
static const struct lsquic_packout_mem_if stock_pmi =
{
    malloc_buf, (void(*)(void *, void *)) free_packet,
};
384
385
386lsquic_engine_t *
387lsquic_engine_new (unsigned flags,
388                   const struct lsquic_engine_api *api)
389{
390    lsquic_engine_t *engine;
391    int tag_buf_len;
392    char err_buf[100];
393
394    if (!api->ea_packets_out)
395    {
396        LSQ_ERROR("packets_out callback is not specified");
397        return NULL;
398    }
399
400    if (api->ea_settings &&
401                0 != lsquic_engine_check_settings(api->ea_settings, flags,
402                                                    err_buf, sizeof(err_buf)))
403    {
404        LSQ_ERROR("cannot create engine: %s", err_buf);
405        return NULL;
406    }
407
408    engine = calloc(1, sizeof(*engine));
409    if (!engine)
410        return NULL;
411    if (0 != lsquic_mm_init(&engine->pub.enp_mm))
412    {
413        free(engine);
414        return NULL;
415    }
416    if (api->ea_settings)
417        engine->pub.enp_settings        = *api->ea_settings;
418    else
419        lsquic_engine_init_settings(&engine->pub.enp_settings, flags);
420    tag_buf_len = gen_ver_tags(engine->pub.enp_ver_tags_buf,
421                                    sizeof(engine->pub.enp_ver_tags_buf),
422                                    engine->pub.enp_settings.es_versions);
423    if (tag_buf_len <= 0)
424    {
425        LSQ_ERROR("cannot generate version tags buffer");
426        free(engine);
427        return NULL;
428    }
429    engine->pub.enp_ver_tags_len = tag_buf_len;
430
431    engine->flags           = flags;
432    engine->stream_if       = api->ea_stream_if;
433    engine->stream_if_ctx   = api->ea_stream_if_ctx;
434    engine->packets_out     = api->ea_packets_out;
435    engine->packets_out_ctx = api->ea_packets_out_ctx;
436    if (api->ea_pmi)
437    {
438        engine->pub.enp_pmi      = api->ea_pmi;
439        engine->pub.enp_pmi_ctx  = api->ea_pmi_ctx;
440    }
441    else
442    {
443        engine->pub.enp_pmi      = &stock_pmi;
444        engine->pub.enp_pmi_ctx  = NULL;
445    }
446    engine->pub.enp_engine = engine;
447    TAILQ_INIT(&engine->conns_in);
448    TAILQ_INIT(&engine->conns_pend_rw);
449    conn_hash_init(&engine->full_conns, ~0);
450    engine->attq = attq_create();
451    eng_hist_init(&engine->history);
452    engine->batch_size = INITIAL_OUT_BATCH_SIZE;
453
454
455    LSQ_INFO("instantiated engine");
456    return engine;
457}
458
459
460static void
461grow_batch_size (struct lsquic_engine *engine)
462{
463    engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE;
464}
465
466
467static void
468shrink_batch_size (struct lsquic_engine *engine)
469{
470    engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE;
471}
472
473
/* Wrapper to make sure LSCONN_NEVER_PEND_RW gets set.
 *
 * The flag is set before the connection's `ci_destroy' method is invoked:
 * with it set, lsquic_engine_add_conn_to_pend_rw() will not put the
 * connection onto the Pending RW Events queue.
 */
static void
destroy_conn (lsquic_conn_t *conn)
{
    conn->cn_flags |= LSCONN_NEVER_PEND_RW;
    conn->cn_if->ci_destroy(conn);
}
481
482
483static lsquic_conn_t *
484new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
485                      unsigned short max_packet_size)
486{
487    lsquic_conn_t *conn;
488    unsigned flags;
489    flags = engine->flags & (ENG_SERVER|ENG_HTTP);
490    conn = full_conn_client_new(&engine->pub, engine->stream_if,
491                    engine->stream_if_ctx, flags, hostname, max_packet_size);
492    if (!conn)
493        return NULL;
494    if (0 != conn_hash_add_new(&engine->full_conns, conn))
495    {
496        LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
497            conn->cn_cid);
498        destroy_conn(conn);
499        return NULL;
500    }
501    assert(!(conn->cn_flags &
502        (CONN_REF_FLAGS
503         & ~LSCONN_RW_PENDING /* This flag may be set as effect of user
504                                 callbacks */
505                             )));
506    conn->cn_flags |= LSCONN_HASHED;
507    return conn;
508}
509
510
511static lsquic_conn_t *
512find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
513         struct packin_parse_state *ppstate, const struct sockaddr *sa_peer,
514         void *peer_ctx)
515{
516    lsquic_conn_t *conn;
517
518    if (lsquic_packet_in_is_prst(packet_in)
519                                && !engine->pub.enp_settings.es_honor_prst)
520    {
521        LSQ_DEBUG("public reset packet: discarding");
522        return NULL;
523    }
524
525    if (!(packet_in->pi_flags & PI_CONN_ID))
526    {
527        LSQ_DEBUG("packet header does not have connection ID: discarding");
528        return NULL;
529    }
530
531    conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id, NULL);
532    if (conn)
533    {
534        conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
535        return conn;
536    }
537
538    return conn;
539}
540
541
542static void
543add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn,
544                                                        enum rw_reason reason)
545{
546    int hist_idx;
547
548    TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw);
549    engine_incref_conn(conn, LSCONN_RW_PENDING);
550
551    hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1);
552    conn->cn_rw_hist_buf[ hist_idx ] = reason;
553    ++conn->cn_rw_hist_idx;
554
555    if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx)
556        EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), "
557            "rw_hist: %.*s", (char) reason,
558            (int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf);
559    else
560        EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')",
561                                                                (char) reason);
562}
563
564
565#if !defined(NDEBUG) && __GNUC__
566__attribute__((weak))
567#endif
568void
569lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub,
570                                    lsquic_conn_t *conn, enum rw_reason reason)
571{
572    if (0 == (enpub->enp_flags & ENPUB_PROC) &&
573        0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW)))
574    {
575        lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
576        add_conn_to_pend_rw(engine, conn, reason);
577    }
578}
579
580
581void
582lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
583                                lsquic_conn_t *conn, lsquic_time_t tick_time)
584{
585    lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
586    /* Instead of performing an update, we simply remove the connection from
587     * the queue and add it back.  This should not happen in at the time of
588     * this writing.
589     */
590    if (conn->cn_flags & LSCONN_ATTQ)
591    {
592        attq_remove(engine->attq, conn);
593        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
594    }
595    if (conn && !(conn->cn_flags & LSCONN_ATTQ) &&
596                        0 == attq_maybe_add(engine->attq, conn, tick_time))
597        engine_incref_conn(conn, LSCONN_ATTQ);
598}
599
600
601static void
602update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn,
603                                                            int progress_made)
604{
605    rw_hist_idx_t hist_idx;
606    const unsigned char *empty;
607    const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check;
608
609    if (!pendrw_check)
610        return;
611
612    /* Convert previous entry to uppercase: */
613    hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1);
614    conn->cn_rw_hist_buf[ hist_idx ] -= 0x20;
615
616    LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made);
617    if (progress_made)
618    {
619        conn->cn_noprogress_count = 0;
620        return;
621    }
622
623    EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made "
624                                                                "no progress");
625    ++conn->cn_noprogress_count;
626    if (conn->cn_noprogress_count <= pendrw_check)
627        return;
628
629    conn->cn_flags |= LSCONN_NEVER_PEND_RW;
630    empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY,
631                                            sizeof(conn->cn_rw_hist_buf));
632    if (empty)
633        LSQ_WARN("conn %"PRIu64" noprogress count reached %u "
634            "(rw_hist: %.*s): will not put it onto Pend RW queue again",
635            conn->cn_cid, conn->cn_noprogress_count,
636            (int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf);
637    else
638    {
639        hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1);
640        LSQ_WARN("conn %"PRIu64" noprogress count reached %u "
641            "(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again",
642            conn->cn_cid, conn->cn_noprogress_count,
643            /* First part of history: */
644            (int) (sizeof(conn->cn_rw_hist_buf) - hist_idx),
645                                            conn->cn_rw_hist_buf + hist_idx,
646            /* Second part of history: */
647            hist_idx, conn->cn_rw_hist_buf);
648    }
649}
650
651
652/* Return 0 if packet is being processed by a connections, otherwise return 1 */
653static int
654process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
655       struct packin_parse_state *ppstate, const struct sockaddr *sa_local,
656       const struct sockaddr *sa_peer, void *peer_ctx)
657{
658    lsquic_conn_t *conn;
659
660    conn = find_or_create_conn(engine, packet_in, ppstate, sa_peer, peer_ctx);
661    if (!conn)
662    {
663        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
664        return 1;
665    }
666
667    if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) {
668        TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in);
669        engine_incref_conn(conn, LSCONN_HAS_INCOMING);
670    }
671    lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
672    lsquic_packet_in_upref(packet_in);
673    conn->cn_peer_ctx = peer_ctx;
674    conn->cn_if->ci_packet_in(conn, packet_in);
675    lsquic_packet_in_put(&engine->pub.enp_mm, packet_in);
676    return 0;
677}
678
679
680static int
681conn_attq_expired (const struct lsquic_engine *engine,
682                                                const lsquic_conn_t *conn)
683{
684    assert(conn->cn_attq_elem);
685    return lsquic_conn_adv_time(conn) < engine->proc_time;
686}
687
688
689/* Iterator for connections with incoming packets */
690static lsquic_conn_t *
691conn_iter_next_incoming (struct lsquic_engine *engine)
692{
693    enum lsquic_conn_flags addl_flags;
694    lsquic_conn_t *conn;
695    while ((conn = TAILQ_FIRST(&engine->conns_in)))
696    {
697        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
698        if (conn->cn_flags & LSCONN_RW_PENDING)
699        {
700            TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
701            EV_LOG_CONN_EVENT(conn->cn_cid,
702                "removed from pending RW queue (processing incoming)");
703        }
704        if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn))
705        {
706            addl_flags = LSCONN_ATTQ;
707            attq_remove(engine->attq, conn);
708        }
709        else
710            addl_flags = 0;
711        conn = engine_decref_conn(engine, conn,
712                        LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags);
713        if (conn)
714            break;
715    }
716    return conn;
717}
718
719
720/* Iterator for connections with that have pending read/write events */
721static lsquic_conn_t *
722conn_iter_next_rw_pend (struct lsquic_engine *engine)
723{
724    enum lsquic_conn_flags addl_flags;
725    lsquic_conn_t *conn;
726    while ((conn = TAILQ_FIRST(&engine->conns_pend_rw)))
727    {
728        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
729        EV_LOG_CONN_EVENT(conn->cn_cid,
730            "removed from pending RW queue (processing pending RW conns)");
731        if (conn->cn_flags & LSCONN_HAS_INCOMING)
732            TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
733        if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn))
734        {
735            addl_flags = LSCONN_ATTQ;
736            attq_remove(engine->attq, conn);
737        }
738        else
739            addl_flags = 0;
740        conn = engine_decref_conn(engine, conn,
741                        LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags);
742        if (conn)
743            break;
744    }
745    return conn;
746}
747
748
/* Process the connections on the Incoming queue.  The call is bracketed by
 * ENGINE_IN()/ENGINE_OUT(), which mark the engine as being inside a
 * processing call.  The Incoming queue is expected to be empty afterwards.
 */
void
lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine)
{
    LSQ_DEBUG("process connections with incoming packets");
    ENGINE_IN(engine);
    process_connections(engine, conn_iter_next_incoming);
    assert(TAILQ_EMPTY(&engine->conns_in));
    ENGINE_OUT(engine);
}
758
759
760int
761lsquic_engine_has_pend_rw (lsquic_engine_t *engine)
762{
763    return !(engine->flags & ENG_PAST_DEADLINE)
764        && !TAILQ_EMPTY(&engine->conns_pend_rw);
765}
766
767
/* Process the connections on the Pending RW Events queue.  The call is
 * bracketed by ENGINE_IN()/ENGINE_OUT(), which mark the engine as being
 * inside a processing call.
 */
void
lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine)
{
    LSQ_DEBUG("process connections with pending RW events");
    ENGINE_IN(engine);
    process_connections(engine, conn_iter_next_rw_pend);
    ENGINE_OUT(engine);
}
776
777
778void
779lsquic_engine_destroy (lsquic_engine_t *engine)
780{
781    lsquic_conn_t *conn;
782
783    LSQ_DEBUG("destroying engine");
784#ifndef NDEBUG
785    engine->flags |= ENG_DTOR;
786#endif
787
788    while (engine->conns_out.oh_nelem > 0)
789    {
790        --engine->conns_out.oh_nelem;
791        conn = engine->conns_out.oh_elems[
792                                engine->conns_out.oh_nelem ].ohe_conn;
793        assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
794        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
795    }
796
797    for (conn = conn_hash_first(&engine->full_conns); conn;
798                            conn = conn_hash_next(&engine->full_conns))
799        force_close_conn(engine, conn);
800    conn_hash_cleanup(&engine->full_conns);
801
802
803    attq_destroy(engine->attq);
804
805    assert(0 == engine->conns_out.oh_nelem);
806    assert(TAILQ_EMPTY(&engine->conns_pend_rw));
807    lsquic_mm_cleanup(&engine->pub.enp_mm);
808    free(engine->conns_out.oh_elems);
809    free(engine);
810}
811
812
813#if __GNUC__
814__attribute__((nonnull(3)))
815#endif
816static lsquic_conn_t *
817remove_from_inc_andor_pend_rw (lsquic_engine_t *engine,
818                                lsquic_conn_t *conn, const char *reason)
819{
820    assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING));
821    if (conn->cn_flags & LSCONN_HAS_INCOMING)
822        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
823    if (conn->cn_flags & LSCONN_RW_PENDING)
824    {
825        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
826        EV_LOG_CONN_EVENT(conn->cn_cid,
827                        "removed from pending RW queue (%s)", reason);
828    }
829    conn = engine_decref_conn(engine, conn,
830                        LSCONN_HAS_INCOMING|LSCONN_RW_PENDING);
831    assert(conn);
832    return conn;
833}
834
835
836static lsquic_conn_t *
837conn_iter_next_one (lsquic_engine_t *engine)
838{
839    lsquic_conn_t *conn = engine->iter_state.one.conn;
840    if (conn)
841    {
842        if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))
843            conn = remove_from_inc_andor_pend_rw(engine, conn, "connect");
844        if (conn && (conn->cn_flags & LSCONN_ATTQ) &&
845                                            conn_attq_expired(engine, conn))
846        {
847            attq_remove(engine->attq, conn);
848            conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
849        }
850        engine->iter_state.one.conn = NULL;
851    }
852    return conn;
853}
854
855
856int
857lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa,
858                       void *conn_ctx, const char *hostname,
859                       unsigned short max_packet_size)
860{
861    lsquic_conn_t *conn;
862
863    if (engine->flags & ENG_SERVER)
864    {
865        LSQ_ERROR("`%s' must only be called in client mode", __func__);
866        return -1;
867    }
868
869    if (0 == max_packet_size)
870    {
871        switch (peer_sa->sa_family)
872        {
873        case AF_INET:
874            max_packet_size = QUIC_MAX_IPv4_PACKET_SZ;
875            break;
876        default:
877            max_packet_size = QUIC_MAX_IPv6_PACKET_SZ;
878            break;
879        }
880    }
881
882    conn = new_full_conn_client(engine, hostname, max_packet_size);
883    if (!conn)
884        return -1;
885    lsquic_conn_record_peer_sa(conn, peer_sa);
886    conn->cn_peer_ctx = conn_ctx;
887    engine->iter_state.one.conn = conn;
888    process_connections(engine, conn_iter_next_one);
889    return 0;
890}
891
892
893static void
894remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
895{
896        conn_hash_remove(&engine->full_conns, conn);
897    (void) engine_decref_conn(engine, conn, LSCONN_HASHED);
898}
899
900
901static void
902refflags2str (enum lsquic_conn_flags flags, char s[7])
903{
904    *s = 'C'; s += !!(flags & LSCONN_CLOSING);
905    *s = 'H'; s += !!(flags & LSCONN_HASHED);
906    *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
907    *s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING);
908    *s = 'R'; s += !!(flags & LSCONN_RW_PENDING);
909    *s = 'A'; s += !!(flags & LSCONN_ATTQ);
910    *s = '\0';
911}
912
913
/* Take a reference to the connection by setting `flag' in its `cn_flags'.
 * `flag' must be one of CONN_REF_FLAGS and must not already be set on the
 * connection.  The resulting set of references is logged at debug level.
 */
static void
engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
{
    char str[7];    /* Sized for refflags2str(): six letters and a NUL */
    assert(flag & CONN_REF_FLAGS);
    assert(!(conn->cn_flags & flag));
    conn->cn_flags |= flag;
    LSQ_DEBUG("incref conn %"PRIu64", now '%s'", conn->cn_cid,
                            (refflags2str(conn->cn_flags, str), str));
}
924
925
926static lsquic_conn_t *
927engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
928                                        enum lsquic_conn_flags flags)
929{
930    char str[7];
931    assert(flags & CONN_REF_FLAGS);
932    assert(conn->cn_flags & flags);
933#ifndef NDEBUG
934    if (flags & LSCONN_CLOSING)
935        assert(0 == (conn->cn_flags & LSCONN_HASHED));
936#endif
937    conn->cn_flags &= ~flags;
938    LSQ_DEBUG("decref conn %"PRIu64", now '%s'", conn->cn_cid,
939                            (refflags2str(conn->cn_flags, str), str));
940    if (0 == (conn->cn_flags & CONN_REF_FLAGS))
941    {
942            eng_hist_inc(&engine->history, 0, sl_del_full_conns);
943        destroy_conn(conn);
944        return NULL;
945    }
946    else
947        return conn;
948}
949
950
951/* This is not a general-purpose function.  Only call from engine dtor. */
952static void
953force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
954{
955    assert(engine->flags & ENG_DTOR);
956    const enum lsquic_conn_flags flags = conn->cn_flags;
957    assert(conn->cn_flags & CONN_REF_FLAGS);
958    assert(!(flags & LSCONN_HAS_OUTGOING));  /* Should be removed already */
959    assert(!(flags & LSCONN_CLOSING));  /* It is in transient queue? */
960    if (flags & LSCONN_HAS_INCOMING)
961    {
962        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
963        (void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING);
964    }
965    if (flags & LSCONN_RW_PENDING)
966    {
967        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
968        EV_LOG_CONN_EVENT(conn->cn_cid,
969            "removed from pending RW queue (engine destruction)");
970        (void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING);
971    }
972    if (flags & LSCONN_ATTQ)
973        attq_remove(engine->attq, conn);
974    if (flags & LSCONN_HASHED)
975        remove_conn_from_hash(engine, conn);
976}
977
978
979/* Iterator for all connections.
980 * Returned connections are removed from the Incoming, Pending RW Event,
981 * and Advisory Tick Time queues if necessary.
982 */
983static lsquic_conn_t *
984conn_iter_next_all (struct lsquic_engine *engine)
985{
986    lsquic_conn_t *conn;
987
988    conn = conn_hash_next(&engine->full_conns);
989
990    if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)))
991        conn = remove_from_inc_andor_pend_rw(engine, conn, "process all");
992    if (conn && (conn->cn_flags & LSCONN_ATTQ)
993                                        && conn_attq_expired(engine, conn))
994    {
995        attq_remove(engine->attq, conn);
996        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
997    }
998
999    return conn;
1000}
1001
1002
1003static lsquic_conn_t *
1004conn_iter_next_attq (struct lsquic_engine *engine)
1005{
1006    lsquic_conn_t *conn;
1007
1008    conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff);
1009    if (conn)
1010    {
1011        assert(conn->cn_flags & LSCONN_ATTQ);
1012        if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))
1013            conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq");
1014        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
1015    }
1016
1017    return conn;
1018}
1019
1020
1021void
1022lsquic_engine_proc_all (lsquic_engine_t *engine)
1023{
1024    ENGINE_IN(engine);
1025    /* We poke each connection every time as initial implementation.  If it
1026     * proves to be too inefficient, we will need to figure out
1027     *          a) when to stop processing; and
1028     *          b) how to remember state between calls.
1029     */
1030    conn_hash_reset_iter(&engine->full_conns);
1031    process_connections(engine, conn_iter_next_all);
1032    ENGINE_OUT(engine);
1033}
1034
1035
1036void
1037lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine)
1038{
1039    lsquic_time_t prev_min, cutoff;
1040
1041    LSQ_DEBUG("process connections in attq");
1042    ENGINE_IN(engine);
1043    cutoff = lsquic_time_now();
1044    prev_min = attq_set_min(engine->attq, cutoff);  /* Prevent infinite loop */
1045    engine->iter_state.attq.cutoff = cutoff;
1046    process_connections(engine, conn_iter_next_attq);
1047    attq_set_min(engine->attq, prev_min);           /* Restore previos value */
1048    ENGINE_OUT(engine);
1049}
1050
1051
1052static int
1053generate_header (const lsquic_packet_out_t *packet_out,
1054                 const struct parse_funcs *pf, lsquic_cid_t cid,
1055                 unsigned char *buf, size_t bufsz)
1056{
1057    return pf->pf_gen_reg_pkt_header(buf, bufsz,
1058        packet_out->po_flags & PO_CONN_ID ? &cid                    : NULL,
1059        packet_out->po_flags & PO_VERSION ? &packet_out->po_ver_tag : NULL,
1060        packet_out->po_flags & PO_NONCE   ? packet_out->po_nonce    : NULL,
1061        packet_out->po_packno, lsquic_packet_out_packno_bits(packet_out));
1062}
1063
1064
1065static ssize_t
1066really_encrypt_packet (const lsquic_conn_t *conn,
1067                       const lsquic_packet_out_t *packet_out,
1068                       unsigned char *buf, size_t bufsz)
1069{
1070    int enc, header_sz, is_hello_packet;
1071    size_t packet_sz;
1072    unsigned char header_buf[QUIC_MAX_PUBHDR_SZ];
1073
1074    header_sz = generate_header(packet_out, conn->cn_pf, conn->cn_cid,
1075                                            header_buf, sizeof(header_buf));
1076    if (header_sz < 0)
1077        return -1;
1078
1079    is_hello_packet = !!(packet_out->po_flags & PO_HELLO);
1080    enc = lsquic_enc(conn->cn_enc_session, conn->cn_version, 0,
1081                packet_out->po_packno, header_buf, header_sz,
1082                packet_out->po_data, packet_out->po_data_sz,
1083                buf, bufsz, &packet_sz, is_hello_packet);
1084    if (0 == enc)
1085    {
1086        LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %u bytes, "
1087            "ciphertext is %zd bytes",
1088            packet_out->po_packno,
1089            lsquic_po_header_length(packet_out->po_flags) +
1090                                                packet_out->po_data_sz,
1091            packet_sz);
1092        return packet_sz;
1093    }
1094    else
1095        return -1;
1096}
1097
1098
1099static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, }
1100encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
1101                                            lsquic_packet_out_t *packet_out)
1102{
1103    ssize_t enc_sz;
1104    size_t bufsz;
1105    unsigned char *buf;
1106
1107    bufsz = lsquic_po_header_length(packet_out->po_flags) +
1108                                packet_out->po_data_sz + QUIC_PACKET_HASH_SZ;
1109    buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz);
1110    if (!buf)
1111    {
1112        LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd",
1113                                                                        bufsz);
1114        return ENCPA_NOMEM;
1115    }
1116
1117        enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz);
1118
1119    if (enc_sz < 0)
1120    {
1121        engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf);
1122        return ENCPA_BADCRYPT;
1123    }
1124
1125    packet_out->po_enc_data    = buf;
1126    packet_out->po_enc_data_sz = enc_sz;
1127    packet_out->po_flags |= PO_ENCRYPTED;
1128
1129    return ENCPA_OK;
1130}
1131
1132
1133struct out_batch
1134{
1135    lsquic_conn_t           *conns  [MAX_OUT_BATCH_SIZE];
1136    lsquic_packet_out_t     *packets[MAX_OUT_BATCH_SIZE];
1137    struct lsquic_out_spec   outs   [MAX_OUT_BATCH_SIZE];
1138};
1139
1140
/* Head of a singly-linked tail queue of connections that are about to be
 * closed; connections are linked through cn_next_closed_conn.
 */
STAILQ_HEAD(closed_conns, lsquic_conn);
1142
1143
1144struct conns_out_iter
1145{
1146    struct out_heap            *coi_heap;
1147    TAILQ_HEAD(, lsquic_conn)   coi_active_list,
1148                                coi_inactive_list;
1149    lsquic_conn_t              *coi_next;
1150#ifndef NDEBUG
1151    lsquic_time_t               coi_last_sent;
1152#endif
1153};
1154
1155
1156static void
1157coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine)
1158{
1159    iter->coi_heap = &engine->conns_out;
1160    iter->coi_next = NULL;
1161    TAILQ_INIT(&iter->coi_active_list);
1162    TAILQ_INIT(&iter->coi_inactive_list);
1163#ifndef NDEBUG
1164    iter->coi_last_sent = 0;
1165#endif
1166}
1167
1168
1169static lsquic_conn_t *
1170coi_next (struct conns_out_iter *iter)
1171{
1172    lsquic_conn_t *conn;
1173
1174    if (iter->coi_heap->oh_nelem > 0)
1175    {
1176        conn = oh_pop(iter->coi_heap);
1177        TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
1178        conn->cn_flags |= LSCONN_COI_ACTIVE;
1179#ifndef NDEBUG
1180        if (iter->coi_last_sent)
1181            assert(iter->coi_last_sent <= conn->cn_last_sent);
1182        iter->coi_last_sent = conn->cn_last_sent;
1183#endif
1184        return conn;
1185    }
1186    else if (!TAILQ_EMPTY(&iter->coi_active_list))
1187    {
1188        conn = iter->coi_next;
1189        if (!conn)
1190            conn = TAILQ_FIRST(&iter->coi_active_list);
1191        if (conn)
1192            iter->coi_next = TAILQ_NEXT(conn, cn_next_out);
1193        return conn;
1194    }
1195    else
1196        return NULL;
1197}
1198
1199
1200static void
1201coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
1202{
1203    if (!(conn->cn_flags & LSCONN_EVANESCENT))
1204    {
1205        assert(!TAILQ_EMPTY(&iter->coi_active_list));
1206        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1207        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1208        TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out);
1209        conn->cn_flags |= LSCONN_COI_INACTIVE;
1210    }
1211}
1212
1213
1214static void
1215coi_remove (struct conns_out_iter *iter, lsquic_conn_t *conn)
1216{
1217    assert(conn->cn_flags & LSCONN_COI_ACTIVE);
1218    if (conn->cn_flags & LSCONN_COI_ACTIVE)
1219    {
1220        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1221        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1222    }
1223}
1224
1225
1226static void
1227coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
1228{
1229    assert(conn->cn_flags & LSCONN_COI_INACTIVE);
1230    TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1231    conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1232    TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
1233    conn->cn_flags |= LSCONN_COI_ACTIVE;
1234}
1235
1236
1237static void
1238coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
1239{
1240    lsquic_conn_t *conn;
1241    while ((conn = TAILQ_FIRST(&iter->coi_active_list)))
1242    {
1243        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1244        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1245        oh_insert(iter->coi_heap, conn);
1246    }
1247    while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
1248    {
1249        TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1250        conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1251        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
1252    }
1253}
1254
1255
1256static unsigned
1257send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter,
1258                  struct out_batch *batch, unsigned n_to_send)
1259{
1260    int n_sent, i;
1261    lsquic_time_t now;
1262
1263    /* Set sent time before the write to avoid underestimating RTT */
1264    now = lsquic_time_now();
1265    for (i = 0; i < (int) n_to_send; ++i)
1266        batch->packets[i]->po_sent = now;
1267    n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs,
1268                                                                n_to_send);
1269    if (n_sent >= 0)
1270        LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send);
1271    else
1272    {
1273        LSQ_DEBUG("packets out returned an error: %s", strerror(errno));
1274        n_sent = 0;
1275    }
1276    if (n_sent > 0)
1277        engine->last_sent = now + n_sent;
1278    for (i = 0; i < n_sent; ++i)
1279    {
1280        eng_hist_inc(&engine->history, now, sl_packets_out);
1281        EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1282        batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i],
1283                                                    batch->packets[i]);
1284        /* `i' is added to maintain relative order */
1285        batch->conns[i]->cn_last_sent = now + i;
1286        /* Release packet out buffer as soon as the packet is sent
1287         * successfully.  If not successfully sent, we hold on to
1288         * this buffer until the packet sending is attempted again
1289         * or until it times out and regenerated.
1290         */
1291        if (batch->packets[i]->po_flags & PO_ENCRYPTED)
1292        {
1293            batch->packets[i]->po_flags &= ~PO_ENCRYPTED;
1294            engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx,
1295                                                batch->packets[i]->po_enc_data);
1296            batch->packets[i]->po_enc_data = NULL;  /* JIC */
1297        }
1298    }
1299    if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT))
1300        for ( ; i < (int) n_to_send; ++i)
1301            EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1302    /* Return packets to the connection in reverse order so that the packet
1303     * ordering is maintained.
1304     */
1305    for (i = (int) n_to_send - 1; i >= n_sent; --i)
1306    {
1307        batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i],
1308                                                    batch->packets[i]);
1309        if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT)))
1310            coi_reactivate(conns_iter, batch->conns[i]);
1311    }
1312    return n_sent;
1313}
1314
1315
1316/* Return 1 if went past deadline, 0 otherwise */
1317static int
1318check_deadline (lsquic_engine_t *engine)
1319{
1320    if (engine->pub.enp_settings.es_proc_time_thresh &&
1321                                lsquic_time_now() > engine->deadline)
1322    {
1323        LSQ_INFO("went past threshold of %u usec, stop sending",
1324                            engine->pub.enp_settings.es_proc_time_thresh);
1325        engine->flags |= ENG_PAST_DEADLINE;
1326        return 1;
1327    }
1328    else
1329        return 0;
1330}
1331
1332
1333static void
1334send_packets_out (struct lsquic_engine *engine,
1335                  struct closed_conns *closed_conns)
1336{
1337    unsigned n, w, n_sent, n_batches_sent;
1338    lsquic_packet_out_t *packet_out;
1339    lsquic_conn_t *conn;
1340    struct out_batch batch;
1341    struct conns_out_iter conns_iter;
1342    int shrink, deadline_exceeded;
1343
1344    coi_init(&conns_iter, engine);
1345    n_batches_sent = 0;
1346    n_sent = 0, n = 0;
1347    shrink = 0;
1348    deadline_exceeded = 0;
1349
1350    while ((conn = coi_next(&conns_iter)))
1351    {
1352        packet_out = conn->cn_if->ci_next_packet_to_send(conn);
1353        if (!packet_out) {
1354            LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64,
1355                                                            conn->cn_cid);
1356            coi_deactivate(&conns_iter, conn);
1357            continue;
1358        }
1359        if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT)))
1360        {
1361            switch (encrypt_packet(engine, conn, packet_out))
1362            {
1363            case ENCPA_NOMEM:
1364                /* Send what we have and wait for a more opportune moment */
1365                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1366                goto end_for;
1367            case ENCPA_BADCRYPT:
1368                /* This is pretty bad: close connection immediately */
1369                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1370                LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid);
1371                if (!(conn->cn_flags & LSCONN_EVANESCENT))
1372                {
1373                    if (!(conn->cn_flags & LSCONN_CLOSING))
1374                    {
1375                        STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn);
1376                        engine_incref_conn(conn, LSCONN_CLOSING);
1377                        if (conn->cn_flags & LSCONN_HASHED)
1378                            remove_conn_from_hash(engine, conn);
1379                    }
1380                    coi_remove(&conns_iter, conn);
1381                }
1382                continue;
1383            case ENCPA_OK:
1384                break;
1385            }
1386        }
1387        LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64,
1388                                        packet_out->po_packno, conn->cn_cid);
1389        assert(conn->cn_flags & LSCONN_HAS_PEER_SA);
1390        if (packet_out->po_flags & PO_ENCRYPTED)
1391        {
1392            batch.outs[n].buf     = packet_out->po_enc_data;
1393            batch.outs[n].sz      = packet_out->po_enc_data_sz;
1394        }
1395        else
1396        {
1397            batch.outs[n].buf     = packet_out->po_data;
1398            batch.outs[n].sz      = packet_out->po_data_sz;
1399        }
1400        batch.outs   [n].peer_ctx = conn->cn_peer_ctx;
1401        batch.outs   [n].local_sa = (struct sockaddr *) conn->cn_local_addr;
1402        batch.outs   [n].dest_sa  = (struct sockaddr *) conn->cn_peer_addr;
1403        batch.conns  [n]          = conn;
1404        batch.packets[n]          = packet_out;
1405        ++n;
1406        if (n == engine->batch_size)
1407        {
1408            n = 0;
1409            w = send_batch(engine, &conns_iter, &batch, engine->batch_size);
1410            ++n_batches_sent;
1411            n_sent += w;
1412            if (w < engine->batch_size)
1413            {
1414                shrink = 1;
1415                break;
1416            }
1417            deadline_exceeded = check_deadline(engine);
1418            if (deadline_exceeded)
1419                break;
1420            grow_batch_size(engine);
1421        }
1422    }
1423  end_for:
1424
1425    if (n > 0) {
1426        w = send_batch(engine, &conns_iter, &batch, n);
1427        n_sent += w;
1428        shrink = w < n;
1429        ++n_batches_sent;
1430        deadline_exceeded = check_deadline(engine);
1431    }
1432
1433    if (shrink)
1434        shrink_batch_size(engine);
1435    else if (n_batches_sent > 1 && !deadline_exceeded)
1436        grow_batch_size(engine);
1437
1438    coi_reheap(&conns_iter, engine);
1439
1440    LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s");
1441}
1442
1443
1444int
1445lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
1446{
1447    return !(engine->flags & ENG_PAST_DEADLINE)
1448        && (    engine->conns_out.oh_nelem > 0
1449           )
1450    ;
1451}
1452
1453
1454static void
1455reset_deadline (lsquic_engine_t *engine, lsquic_time_t now)
1456{
1457    engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh;
1458    engine->flags &= ~ENG_PAST_DEADLINE;
1459}
1460
1461
1462/* TODO: this is a user-facing function, account for load */
1463void
1464lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
1465{
1466    lsquic_conn_t *conn;
1467    struct closed_conns closed_conns;
1468
1469    STAILQ_INIT(&closed_conns);
1470    reset_deadline(engine, lsquic_time_now());
1471
1472    send_packets_out(engine, &closed_conns);
1473
1474    while ((conn = STAILQ_FIRST(&closed_conns))) {
1475        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1476        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1477    }
1478
1479}
1480
1481
1482static void
1483process_connections (lsquic_engine_t *engine, conn_iter_f next_conn)
1484{
1485    lsquic_conn_t *conn;
1486    enum tick_st tick_st;
1487    lsquic_time_t now = lsquic_time_now();
1488    struct closed_conns closed_conns;
1489
1490    engine->proc_time = now;
1491    eng_hist_tick(&engine->history, now);
1492
1493    STAILQ_INIT(&closed_conns);
1494    reset_deadline(engine, now);
1495
1496    while ((conn = next_conn(engine)))
1497    {
1498        tick_st = conn->cn_if->ci_tick(conn, now);
1499        if (conn_iter_next_rw_pend == next_conn)
1500            update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS);
1501        if (tick_st & TICK_SEND)
1502        {
1503            if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
1504            {
1505                oh_insert(&engine->conns_out, conn);
1506                engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
1507            }
1508        }
1509        if (tick_st & TICK_CLOSE)
1510        {
1511            STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn);
1512            engine_incref_conn(conn, LSCONN_CLOSING);
1513            if (conn->cn_flags & LSCONN_HASHED)
1514                remove_conn_from_hash(engine, conn);
1515        }
1516    }
1517
1518    if (lsquic_engine_has_unsent_packets(engine))
1519        send_packets_out(engine, &closed_conns);
1520
1521    while ((conn = STAILQ_FIRST(&closed_conns))) {
1522        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1523        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1524    }
1525
1526}
1527
1528
1529/* Return 0 if packet is being processed by a real connection, 1 if the
1530 * packet was processed, but not by a connection, and -1 on error.
1531 */
1532int
1533lsquic_engine_packet_in (lsquic_engine_t *engine,
1534    const unsigned char *packet_in_data, size_t packet_in_size,
1535    const struct sockaddr *sa_local, const struct sockaddr *sa_peer,
1536    void *peer_ctx)
1537{
1538    struct packin_parse_state ppstate;
1539    lsquic_packet_in_t *packet_in;
1540
1541    if (packet_in_size > QUIC_MAX_PACKET_SZ)
1542    {
1543        LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming "
1544            "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ);
1545        errno = E2BIG;
1546        return -1;
1547    }
1548
1549    packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm);
1550    if (!packet_in)
1551        return -1;
1552
1553    /* Library does not modify packet_in_data, it is not referenced after
1554     * this function returns and subsequent release of pi_data is guarded
1555     * by PI_OWN_DATA flag.
1556     */
1557    packet_in->pi_data = (unsigned char *) packet_in_data;
1558    if (0 != parse_packet_in_begin(packet_in, packet_in_size,
1559                                        engine->flags & ENG_SERVER, &ppstate))
1560    {
1561        LSQ_DEBUG("Cannot parse incoming packet's header");
1562        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
1563        errno = EINVAL;
1564        return -1;
1565    }
1566
1567    packet_in->pi_received = lsquic_time_now();
1568    eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in);
1569    return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer,
1570                                                                    peer_ctx);
1571}
1572
1573
1574#if __GNUC__ && !defined(NDEBUG)
1575__attribute__((weak))
1576#endif
1577unsigned
1578lsquic_engine_quic_versions (const lsquic_engine_t *engine)
1579{
1580    return engine->pub.enp_settings.es_versions;
1581}
1582
1583
1584int
1585lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
1586{
1587    const lsquic_time_t *next_time;
1588    lsquic_time_t now;
1589
1590    next_time = attq_next_time(engine->attq);
1591    if (!next_time)
1592        return 0;
1593
1594    now = lsquic_time_now();
1595    *diff = (int) ((int64_t) *next_time - (int64_t) now);
1596    return 1;
1597}
1598
1599
1600unsigned
1601lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now)
1602{
1603    lsquic_time_t now;
1604    now = lsquic_time_now();
1605    if (from_now < 0)
1606        now -= from_now;
1607    else
1608        now += from_now;
1609    return attq_count_before(engine->attq, now);
1610}
1611
1612
1613