lsquic_engine.c revision c51ce338
1/* Copyright (c) 2017 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_engine.c - QUIC engine
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdint.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <sys/queue.h>
14#include <sys/time.h>
15#include <time.h>
16#include <netinet/in.h>
17#include <sys/types.h>
18#include <sys/stat.h>
19#include <fcntl.h>
20#include <unistd.h>
21#include <netdb.h>
22
23
24
25#include "lsquic.h"
26#include "lsquic_types.h"
27#include "lsquic_alarmset.h"
28#include "lsquic_parse.h"
29#include "lsquic_packet_in.h"
30#include "lsquic_packet_out.h"
31#include "lsquic_senhist.h"
32#include "lsquic_rtt.h"
33#include "lsquic_cubic.h"
34#include "lsquic_pacer.h"
35#include "lsquic_send_ctl.h"
36#include "lsquic_set.h"
37#include "lsquic_conn_flow.h"
38#include "lsquic_sfcw.h"
39#include "lsquic_stream.h"
40#include "lsquic_conn.h"
41#include "lsquic_full_conn.h"
42#include "lsquic_util.h"
43#include "lsquic_qtags.h"
44#include "lsquic_str.h"
45#include "lsquic_handshake.h"
46#include "lsquic_mm.h"
47#include "lsquic_conn_hash.h"
48#include "lsquic_engine_public.h"
49#include "lsquic_eng_hist.h"
50#include "lsquic_ev_log.h"
51#include "lsquic_version.h"
52#include "lsquic_attq.h"
53
54#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
55#include "lsquic_logger.h"
56
57
/* The batch of outgoing packets grows and shrinks dynamically: the engine's
 * batch_size starts at INITIAL_OUT_BATCH_SIZE; grow_batch_size() doubles it
 * while it is below MAX_OUT_BATCH_SIZE and shrink_batch_size() halves it
 * while it is above MIN_OUT_BATCH_SIZE.
 */
#define MAX_OUT_BATCH_SIZE 1024
#define MIN_OUT_BATCH_SIZE 256
#define INITIAL_OUT_BATCH_SIZE 512
62
/* Connection iterator: each call returns the next connection to process or
 * NULL when the iterator is exhausted.  An iterator is handed to
 * process_connections().
 */
typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);

/* Forward declarations of static functions defined further down */

static void
process_connections (struct lsquic_engine *engine, conn_iter_f iter);

/* Set reference flag `flag' on the connection */
static void
engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);

/* Clear reference flag(s); returns NULL if the connection was destroyed
 * because no reference flags remain, otherwise returns `conn'.
 */
static lsquic_conn_t *
engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
                                        enum lsquic_conn_flags flag);

/* Only to be called from the engine destructor */
static void
force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
77
/* Nested calls to LSQUIC are not supported.  ENGINE_IN() marks the engine
 * as being inside a processing call by setting ENPUB_PROC; ENGINE_OUT()
 * clears the mark.  The assertions catch unbalanced or nested use in debug
 * builds.
 */
#define ENGINE_IN(e) do {                               \
    assert(!((e)->pub.enp_flags & ENPUB_PROC));         \
    (e)->pub.enp_flags |= ENPUB_PROC;                   \
} while (0)

#define ENGINE_OUT(e) do {                              \
    assert((e)->pub.enp_flags & ENPUB_PROC);            \
    (e)->pub.enp_flags &= ~ENPUB_PROC;                  \
} while (0)
88
/* A connection can be referenced from one of six places:
 *
 *   1. Connection hash: a connection starts its life in one of those.
 *
 *   2. Outgoing queue.
 *
 *   3. Incoming queue.
 *
 *   4. Pending RW Events queue.
 *
 *   5. Advisory Tick Time queue.
 *
 *   6. Closing connections queue.  This is a transient queue -- it only
 *      exists for the duration of process_connections() function call.
 *
 * The idea is to destroy the connection when it is no longer referenced.
 * For example, a connection tick may return TICK_SEND|TICK_CLOSE.  In
 * that case, the connection is referenced from two places: (2) and (6).
 * After its packets are sent, it is only referenced in (6), and at the
 * end of the function call, when it is removed from (6), reference count
 * goes to zero and the connection is destroyed.  If not all packets can
 * be sent, at the end of the function call, the connection is referenced
 * by (2) and will only be removed once all outgoing packets have been
 * sent.
 *
 * Each reference is recorded as one bit in cn_flags.  CONN_REF_FLAGS is
 * the mask of all such bits: engine_decref_conn() destroys the connection
 * once none of them remains set.
 */
#define CONN_REF_FLAGS  (LSCONN_HASHED          \
                        |LSCONN_HAS_OUTGOING    \
                        |LSCONN_HAS_INCOMING    \
                        |LSCONN_RW_PENDING      \
                        |LSCONN_CLOSING         \
                        |LSCONN_ATTQ)
120
121
/* Element of the outgoing connections heap */
struct out_heap_elem
{
    struct lsquic_conn  *ohe_conn;      /* Connection with outgoing packets */
    lsquic_time_t        ohe_last_sent; /* Heap key: copy of the connection's
                                         * cn_last_sent taken at insertion.
                                         */
};
127
128
/* Array-backed binary min-heap of connections ordered by ohe_last_sent:
 * the element with the smallest value is at index 0.
 */
struct out_heap
{
    struct out_heap_elem    *oh_elems;  /* Grown with realloc() on demand */
    unsigned                 oh_nalloc, /* Number of elements allocated */
                             oh_nelem;  /* Number of elements in use */
};
135
136
/* Engine state.  `pub' must remain the first member: code in this file
 * casts a `struct lsquic_engine_public *' back to `lsquic_engine_t *'.
 */
struct lsquic_engine
{
    struct lsquic_engine_public        pub;
    enum {
        ENG_SERVER      = LSENG_SERVER,
        ENG_HTTP        = LSENG_HTTP,
        ENG_COOLDOWN    = (1 <<  7),    /* Cooldown: no new connections */
        ENG_PAST_DEADLINE
                        = (1 <<  8),    /* Previous call to a processing
                                         * function went past time threshold.
                                         */
#ifndef NDEBUG
        ENG_DTOR        = (1 << 26),    /* Engine destructor */
#endif
    }                                  flags;
    /* Callbacks and their contexts, copied from struct lsquic_engine_api
     * when the engine is created:
     */
    const struct lsquic_stream_if     *stream_if;
    void                              *stream_if_ctx;
    lsquic_packets_out_f               packets_out;
    void                              *packets_out_ctx;
    void                              *bad_handshake_ctx;
    /* Hash of all full connections (reference flag LSCONN_HASHED) */
    struct conn_hash                   full_conns;
    /* Incoming queue (LSCONN_HAS_INCOMING) and Pending RW Events queue
     * (LSCONN_RW_PENDING):
     */
    TAILQ_HEAD(, lsquic_conn)          conns_in, conns_pend_rw;
    /* Outgoing queue (LSCONN_HAS_OUTGOING), a min-heap */
    struct out_heap                    conns_out;
    /* Use a union because only one iterator is being used at any one time */
    union {
        struct {
            /* This iterator does not have any state: it uses `conns_in' */
        }           conn_in;
        struct {
            /* This iterator does not have any state: it uses `conns_pend_rw' */
        }           rw_pend;
        struct {
            /* Iterator state to process connections in Advisory Tick Time
             * queue.
             */
            lsquic_time_t   cutoff;
        }           attq;
        struct {
            /* Iterator state to process all connections */
        }           all;
        struct {
            /* The single connection to return; reset to NULL once returned */
            lsquic_conn_t  *conn;
        }           one;
    }                                  iter_state;
    struct eng_hist                    history;
    /* Current outgoing batch size; see MIN/MAX/INITIAL_OUT_BATCH_SIZE */
    unsigned                           batch_size;
    unsigned                           time_until_desired_tick;
    /* Advisory Tick Time queue (LSCONN_ATTQ) */
    struct attq                       *attq;
    /* Connections whose advisory tick time is earlier than this value are
     * considered expired by conn_attq_expired().
     */
    lsquic_time_t                      proc_time;
    /* Track time last time a packet was sent to give new connections
     * priority lower than that of existing connections.
     */
    lsquic_time_t                      last_sent;
    lsquic_time_t                      deadline;
};
192
193
/* Index arithmetic for the array-backed binary heap.  The argument is
 * parenthesized so that expressions such as OHE_LCHILD(a + b) expand
 * correctly.  OHE_PARENT() must not be used with index 0.
 */
#define OHE_PARENT(i) (((i) - 1) / 2)
#define OHE_LCHILD(i) (2 * (i) + 1)
#define OHE_RCHILD(i) (2 * (i) + 2)
197
198
199static void
200heapify_out_heap (struct out_heap *heap, unsigned i)
201{
202    struct out_heap_elem el;
203    unsigned smallest;
204
205    assert(i < heap->oh_nelem);
206
207    if (OHE_LCHILD(i) < heap->oh_nelem)
208    {
209        if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent <
210                                    heap->oh_elems[ i ].ohe_last_sent)
211            smallest = OHE_LCHILD(i);
212        else
213            smallest = i;
214        if (OHE_RCHILD(i) < heap->oh_nelem &&
215            heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent <
216                                    heap->oh_elems[ smallest ].ohe_last_sent)
217            smallest = OHE_RCHILD(i);
218    }
219    else
220        smallest = i;
221
222    if (smallest != i)
223    {
224        el = heap->oh_elems[ smallest ];
225        heap->oh_elems[ smallest ] = heap->oh_elems[ i ];
226        heap->oh_elems[ i ] = el;
227        heapify_out_heap(heap, smallest);
228    }
229}
230
231
232static void
233oh_insert (struct out_heap *heap, lsquic_conn_t *conn)
234{
235    struct out_heap_elem el;
236    unsigned nalloc, i;
237
238    if (heap->oh_nelem == heap->oh_nalloc)
239    {
240        if (0 == heap->oh_nalloc)
241            nalloc = 4;
242        else
243            nalloc = heap->oh_nalloc * 2;
244        heap->oh_elems = realloc(heap->oh_elems,
245                                    nalloc * sizeof(heap->oh_elems[0]));
246        if (!heap->oh_elems)
247        {   /* Not much we can do here */
248            LSQ_ERROR("realloc failed");
249            return;
250        }
251        heap->oh_nalloc = nalloc;
252    }
253
254    heap->oh_elems[ heap->oh_nelem ].ohe_conn      = conn;
255    heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent;
256    ++heap->oh_nelem;
257
258    i = heap->oh_nelem - 1;
259    while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent >
260                                    heap->oh_elems[ i ].ohe_last_sent)
261    {
262        el = heap->oh_elems[ OHE_PARENT(i) ];
263        heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ];
264        heap->oh_elems[ i ] = el;
265        i = OHE_PARENT(i);
266    }
267}
268
269
270static struct lsquic_conn *
271oh_pop (struct out_heap *heap)
272{
273    struct lsquic_conn *conn;
274
275    assert(heap->oh_nelem);
276
277    conn = heap->oh_elems[0].ohe_conn;
278    --heap->oh_nelem;
279    if (heap->oh_nelem > 0)
280    {
281        heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ];
282        heapify_out_heap(heap, 0);
283    }
284
285    return conn;
286}
287
288
289void
290lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
291                             unsigned flags)
292{
293    memset(settings, 0, sizeof(*settings));
294    settings->es_versions        = LSQUIC_DF_VERSIONS;
295    if (flags & ENG_SERVER)
296    {
297        settings->es_cfcw        = LSQUIC_DF_CFCW_SERVER;
298        settings->es_sfcw        = LSQUIC_DF_SFCW_SERVER;
299        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER;
300    }
301    else
302    {
303        settings->es_cfcw        = LSQUIC_DF_CFCW_CLIENT;
304        settings->es_sfcw        = LSQUIC_DF_SFCW_CLIENT;
305        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT;
306    }
307    settings->es_max_streams_in  = LSQUIC_DF_MAX_STREAMS_IN;
308    settings->es_idle_conn_to    = LSQUIC_DF_IDLE_CONN_TO;
309    settings->es_handshake_to    = LSQUIC_DF_HANDSHAKE_TO;
310    settings->es_silent_close    = LSQUIC_DF_SILENT_CLOSE;
311    settings->es_max_header_list_size
312                                 = LSQUIC_DF_MAX_HEADER_LIST_SIZE;
313    settings->es_ua              = LSQUIC_DF_UA;
314
315    settings->es_pdmd            = QTAG_X509;
316    settings->es_aead            = QTAG_AESG;
317    settings->es_kexs            = QTAG_C255;
318    settings->es_support_push    = LSQUIC_DF_SUPPORT_PUSH;
319    settings->es_support_tcid0   = LSQUIC_DF_SUPPORT_TCID0;
320    settings->es_support_nstp    = LSQUIC_DF_SUPPORT_NSTP;
321    settings->es_honor_prst      = LSQUIC_DF_HONOR_PRST;
322    settings->es_progress_check  = LSQUIC_DF_PROGRESS_CHECK;
323    settings->es_pendrw_check    = LSQUIC_DF_PENDRW_CHECK;
324    settings->es_rw_once         = LSQUIC_DF_RW_ONCE;
325    settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
326    settings->es_pace_packets    = LSQUIC_DF_PACE_PACKETS;
327}
328
329
330/* Note: if returning an error, err_buf must be valid if non-NULL */
331int
332lsquic_engine_check_settings (const struct lsquic_engine_settings *settings,
333                              unsigned flags,
334                              char *err_buf, size_t err_buf_sz)
335{
336    if (settings->es_cfcw < LSQUIC_MIN_FCW ||
337        settings->es_sfcw < LSQUIC_MIN_FCW)
338    {
339        if (err_buf)
340            snprintf(err_buf, err_buf_sz, "%s",
341                                            "flow control window set too low");
342        return -1;
343    }
344    if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS))
345    {
346        if (err_buf)
347            snprintf(err_buf, err_buf_sz, "%s",
348                        "No supported QUIC versions specified");
349        return -1;
350    }
351    if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS)
352    {
353        if (err_buf)
354            snprintf(err_buf, err_buf_sz, "%s",
355                        "one or more unsupported QUIC version is specified");
356        return -1;
357    }
358    return 0;
359}
360
361
/* Stock "release" callback of the packet memory interface: frees a buffer
 * obtained from malloc_buf().  `ctx' is unused.
 *
 * The buffer parameter is `void *' so that the function's type is exactly
 * `void (*)(void *, void *)', the type it is installed as in stock_pmi:
 * calling a function through a pointer to an incompatible function type
 * is undefined behavior.
 */
static void
free_packet (void *ctx, void *packet_data)
{
    (void) ctx;
    free(packet_data);
}
367
368
/* Stock "allocate" callback of the packet memory interface: returns
 * `size' bytes obtained from malloc(), or NULL on failure.  `ctx' is
 * unused.
 */
static void *
malloc_buf (void *ctx, size_t size)
{
    void *const buf = malloc(size);
    return buf;
}
374
375
/* Default packet memory interface, used when the engine API does not
 * supply one: buffers are allocated with malloc() and released with
 * free().
 */
static const struct lsquic_packout_mem_if stock_pmi =
{
    malloc_buf, (void(*)(void *, void *)) free_packet,
};
380
381
382lsquic_engine_t *
383lsquic_engine_new (unsigned flags,
384                   const struct lsquic_engine_api *api)
385{
386    lsquic_engine_t *engine;
387    int tag_buf_len;
388    char err_buf[100];
389
390    if (!api->ea_packets_out)
391    {
392        LSQ_ERROR("packets_out callback is not specified");
393        return NULL;
394    }
395
396    if (api->ea_settings &&
397                0 != lsquic_engine_check_settings(api->ea_settings, flags,
398                                                    err_buf, sizeof(err_buf)))
399    {
400        LSQ_ERROR("cannot create engine: %s", err_buf);
401        return NULL;
402    }
403
404    engine = calloc(1, sizeof(*engine));
405    if (!engine)
406        return NULL;
407    if (0 != lsquic_mm_init(&engine->pub.enp_mm))
408    {
409        free(engine);
410        return NULL;
411    }
412    if (api->ea_settings)
413        engine->pub.enp_settings        = *api->ea_settings;
414    else
415        lsquic_engine_init_settings(&engine->pub.enp_settings, flags);
416    tag_buf_len = gen_ver_tags(engine->pub.enp_ver_tags_buf,
417                                    sizeof(engine->pub.enp_ver_tags_buf),
418                                    engine->pub.enp_settings.es_versions);
419    if (tag_buf_len <= 0)
420    {
421        LSQ_ERROR("cannot generate version tags buffer");
422        free(engine);
423        return NULL;
424    }
425    engine->pub.enp_ver_tags_len = tag_buf_len;
426
427    engine->flags           = flags;
428    engine->stream_if       = api->ea_stream_if;
429    engine->stream_if_ctx   = api->ea_stream_if_ctx;
430    engine->packets_out     = api->ea_packets_out;
431    engine->packets_out_ctx = api->ea_packets_out_ctx;
432    if (api->ea_pmi)
433    {
434        engine->pub.enp_pmi      = api->ea_pmi;
435        engine->pub.enp_pmi_ctx  = api->ea_pmi_ctx;
436    }
437    else
438    {
439        engine->pub.enp_pmi      = &stock_pmi;
440        engine->pub.enp_pmi_ctx  = NULL;
441    }
442    engine->pub.enp_engine = engine;
443    TAILQ_INIT(&engine->conns_in);
444    TAILQ_INIT(&engine->conns_pend_rw);
445    conn_hash_init(&engine->full_conns, ~0);
446    engine->attq = attq_create();
447    eng_hist_init(&engine->history);
448    engine->batch_size = INITIAL_OUT_BATCH_SIZE;
449
450
451    LSQ_INFO("instantiated engine");
452    return engine;
453}
454
455
456static void
457grow_batch_size (struct lsquic_engine *engine)
458{
459    engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE;
460}
461
462
463static void
464shrink_batch_size (struct lsquic_engine *engine)
465{
466    engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE;
467}
468
469
470/* Wrapper to make sure LSCONN_NEVER_PEND_RW gets set */
471static void
472destroy_conn (lsquic_conn_t *conn)
473{
474    conn->cn_flags |= LSCONN_NEVER_PEND_RW;
475    conn->cn_if->ci_destroy(conn);
476}
477
478
479static lsquic_conn_t *
480new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
481                      unsigned short max_packet_size)
482{
483    lsquic_conn_t *conn;
484    unsigned flags;
485    flags = engine->flags & (ENG_SERVER|ENG_HTTP);
486    conn = full_conn_client_new(&engine->pub, engine->stream_if,
487                    engine->stream_if_ctx, flags, hostname, max_packet_size);
488    if (!conn)
489        return NULL;
490    if (0 != conn_hash_add_new(&engine->full_conns, conn))
491    {
492        LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
493            conn->cn_cid);
494        destroy_conn(conn);
495        return NULL;
496    }
497    assert(!(conn->cn_flags &
498        (CONN_REF_FLAGS
499         & ~LSCONN_RW_PENDING /* This flag may be set as effect of user
500                                 callbacks */
501                             )));
502    conn->cn_flags |= LSCONN_HASHED;
503    return conn;
504}
505
506
507static lsquic_conn_t *
508find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
509         struct packin_parse_state *ppstate, const struct sockaddr *sa_peer,
510         void *peer_ctx)
511{
512    lsquic_conn_t *conn;
513
514    if (lsquic_packet_in_is_prst(packet_in)
515                                && !engine->pub.enp_settings.es_honor_prst)
516    {
517        LSQ_DEBUG("public reset packet: discarding");
518        return NULL;
519    }
520
521    if (!(packet_in->pi_flags & PI_CONN_ID))
522    {
523        LSQ_DEBUG("packet header does not have connection ID: discarding");
524        return NULL;
525    }
526
527    conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id, NULL);
528    if (conn)
529    {
530        conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
531        return conn;
532    }
533
534    return conn;
535}
536
537
538static void
539add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn,
540                                                        enum rw_reason reason)
541{
542    int hist_idx;
543
544    TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw);
545    engine_incref_conn(conn, LSCONN_RW_PENDING);
546
547    hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1);
548    conn->cn_rw_hist_buf[ hist_idx ] = reason;
549    ++conn->cn_rw_hist_idx;
550
551    if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx)
552        EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), "
553            "rw_hist: %.*s", (char) reason,
554            (int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf);
555    else
556        EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')",
557                                                                (char) reason);
558}
559
560
561#if !defined(NDEBUG) && __GNUC__
562__attribute__((weak))
563#endif
564void
565lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub,
566                                    lsquic_conn_t *conn, enum rw_reason reason)
567{
568    if (0 == (enpub->enp_flags & ENPUB_PROC) &&
569        0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW)))
570    {
571        lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
572        add_conn_to_pend_rw(engine, conn, reason);
573    }
574}
575
576
577void
578lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
579                                lsquic_conn_t *conn, lsquic_time_t tick_time)
580{
581    lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
582    /* Instead of performing an update, we simply remove the connection from
583     * the queue and add it back.  This should not happen in at the time of
584     * this writing.
585     */
586    if (conn->cn_flags & LSCONN_ATTQ)
587    {
588        attq_remove(engine->attq, conn);
589        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
590    }
591    if (conn && !(conn->cn_flags & LSCONN_ATTQ) &&
592                        0 == attq_maybe_add(engine->attq, conn, tick_time))
593        engine_incref_conn(conn, LSCONN_ATTQ);
594}
595
596
597static void
598update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn,
599                                                            int progress_made)
600{
601    rw_hist_idx_t hist_idx;
602    const unsigned char *empty;
603    const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check;
604
605    if (!pendrw_check)
606        return;
607
608    /* Convert previous entry to uppercase: */
609    hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1);
610    conn->cn_rw_hist_buf[ hist_idx ] -= 0x20;
611
612    LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made);
613    if (progress_made)
614    {
615        conn->cn_noprogress_count = 0;
616        return;
617    }
618
619    EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made "
620                                                                "no progress");
621    ++conn->cn_noprogress_count;
622    if (conn->cn_noprogress_count <= pendrw_check)
623        return;
624
625    conn->cn_flags |= LSCONN_NEVER_PEND_RW;
626    empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY,
627                                            sizeof(conn->cn_rw_hist_buf));
628    if (empty)
629        LSQ_WARN("conn %"PRIu64" noprogress count reached %u "
630            "(rw_hist: %.*s): will not put it onto Pend RW queue again",
631            conn->cn_cid, conn->cn_noprogress_count,
632            (int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf);
633    else
634    {
635        hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1);
636        LSQ_WARN("conn %"PRIu64" noprogress count reached %u "
637            "(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again",
638            conn->cn_cid, conn->cn_noprogress_count,
639            /* First part of history: */
640            (int) (sizeof(conn->cn_rw_hist_buf) - hist_idx),
641                                            conn->cn_rw_hist_buf + hist_idx,
642            /* Second part of history: */
643            hist_idx, conn->cn_rw_hist_buf);
644    }
645}
646
647
648/* Return 0 if packet is being processed by a connections, otherwise return 1 */
649static int
650process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
651       struct packin_parse_state *ppstate, const struct sockaddr *sa_local,
652       const struct sockaddr *sa_peer, void *peer_ctx)
653{
654    lsquic_conn_t *conn;
655
656    conn = find_or_create_conn(engine, packet_in, ppstate, sa_peer, peer_ctx);
657    if (!conn)
658    {
659        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
660        return 1;
661    }
662
663    if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) {
664        TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in);
665        engine_incref_conn(conn, LSCONN_HAS_INCOMING);
666    }
667    lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
668    lsquic_packet_in_upref(packet_in);
669    conn->cn_peer_ctx = peer_ctx;
670    conn->cn_if->ci_packet_in(conn, packet_in);
671    lsquic_packet_in_put(&engine->pub.enp_mm, packet_in);
672    return 0;
673}
674
675
676static int
677conn_attq_expired (const struct lsquic_engine *engine,
678                                                const lsquic_conn_t *conn)
679{
680    assert(conn->cn_attq_elem);
681    return lsquic_conn_adv_time(conn) < engine->proc_time;
682}
683
684
685/* Iterator for connections with incoming packets */
686static lsquic_conn_t *
687conn_iter_next_incoming (struct lsquic_engine *engine)
688{
689    enum lsquic_conn_flags addl_flags;
690    lsquic_conn_t *conn;
691    while ((conn = TAILQ_FIRST(&engine->conns_in)))
692    {
693        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
694        if (conn->cn_flags & LSCONN_RW_PENDING)
695        {
696            TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
697            EV_LOG_CONN_EVENT(conn->cn_cid,
698                "removed from pending RW queue (processing incoming)");
699        }
700        if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn))
701        {
702            addl_flags = LSCONN_ATTQ;
703            attq_remove(engine->attq, conn);
704        }
705        else
706            addl_flags = 0;
707        conn = engine_decref_conn(engine, conn,
708                        LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags);
709        if (conn)
710            break;
711    }
712    return conn;
713}
714
715
716/* Iterator for connections with that have pending read/write events */
717static lsquic_conn_t *
718conn_iter_next_rw_pend (struct lsquic_engine *engine)
719{
720    enum lsquic_conn_flags addl_flags;
721    lsquic_conn_t *conn;
722    while ((conn = TAILQ_FIRST(&engine->conns_pend_rw)))
723    {
724        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
725        EV_LOG_CONN_EVENT(conn->cn_cid,
726            "removed from pending RW queue (processing pending RW conns)");
727        if (conn->cn_flags & LSCONN_HAS_INCOMING)
728            TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
729        if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn))
730        {
731            addl_flags = LSCONN_ATTQ;
732            attq_remove(engine->attq, conn);
733        }
734        else
735            addl_flags = 0;
736        conn = engine_decref_conn(engine, conn,
737                        LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags);
738        if (conn)
739            break;
740    }
741    return conn;
742}
743
744
/* Process all connections that are on the Incoming queue.  The queue is
 * expected to be empty afterwards.  Must not be called from within another
 * engine processing call (see ENGINE_IN).
 */
void
lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine)
{
    LSQ_DEBUG("process connections with incoming packets");
    ENGINE_IN(engine);
    process_connections(engine, conn_iter_next_incoming);
    assert(TAILQ_EMPTY(&engine->conns_in));
    ENGINE_OUT(engine);
}
754
755
756int
757lsquic_engine_has_pend_rw (lsquic_engine_t *engine)
758{
759    return !(engine->flags & ENG_PAST_DEADLINE)
760        && !TAILQ_EMPTY(&engine->conns_pend_rw);
761}
762
763
/* Process connections that are on the Pending RW Events queue.  Must not
 * be called from within another engine processing call (see ENGINE_IN).
 */
void
lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine)
{
    LSQ_DEBUG("process connections with pending RW events");
    ENGINE_IN(engine);
    process_connections(engine, conn_iter_next_rw_pend);
    ENGINE_OUT(engine);
}
772
773
774void
775lsquic_engine_destroy (lsquic_engine_t *engine)
776{
777    lsquic_conn_t *conn;
778
779    LSQ_DEBUG("destroying engine");
780#ifndef NDEBUG
781    engine->flags |= ENG_DTOR;
782#endif
783
784    while (engine->conns_out.oh_nelem > 0)
785    {
786        --engine->conns_out.oh_nelem;
787        conn = engine->conns_out.oh_elems[
788                                engine->conns_out.oh_nelem ].ohe_conn;
789        assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
790        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
791    }
792
793    for (conn = conn_hash_first(&engine->full_conns); conn;
794                            conn = conn_hash_next(&engine->full_conns))
795        force_close_conn(engine, conn);
796    conn_hash_cleanup(&engine->full_conns);
797
798
799    attq_destroy(engine->attq);
800
801    assert(0 == engine->conns_out.oh_nelem);
802    assert(TAILQ_EMPTY(&engine->conns_pend_rw));
803    lsquic_mm_cleanup(&engine->pub.enp_mm);
804    free(engine->conns_out.oh_elems);
805    free(engine);
806}
807
808
809#if __GNUC__
810__attribute__((nonnull(3)))
811#endif
812static lsquic_conn_t *
813remove_from_inc_andor_pend_rw (lsquic_engine_t *engine,
814                                lsquic_conn_t *conn, const char *reason)
815{
816    assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING));
817    if (conn->cn_flags & LSCONN_HAS_INCOMING)
818        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
819    if (conn->cn_flags & LSCONN_RW_PENDING)
820    {
821        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
822        EV_LOG_CONN_EVENT(conn->cn_cid,
823                        "removed from pending RW queue (%s)", reason);
824    }
825    conn = engine_decref_conn(engine, conn,
826                        LSCONN_HAS_INCOMING|LSCONN_RW_PENDING);
827    assert(conn);
828    return conn;
829}
830
831
832static lsquic_conn_t *
833conn_iter_next_one (lsquic_engine_t *engine)
834{
835    lsquic_conn_t *conn = engine->iter_state.one.conn;
836    if (conn)
837    {
838        if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))
839            conn = remove_from_inc_andor_pend_rw(engine, conn, "connect");
840        if (conn && (conn->cn_flags & LSCONN_ATTQ) &&
841                                            conn_attq_expired(engine, conn))
842        {
843            attq_remove(engine->attq, conn);
844            conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
845        }
846        engine->iter_state.one.conn = NULL;
847    }
848    return conn;
849}
850
851
852int
853lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa,
854                       void *conn_ctx, const char *hostname,
855                       unsigned short max_packet_size)
856{
857    lsquic_conn_t *conn;
858
859    if (engine->flags & ENG_SERVER)
860    {
861        LSQ_ERROR("`%s' must only be called in client mode", __func__);
862        return -1;
863    }
864
865    if (0 == max_packet_size)
866    {
867        switch (peer_sa->sa_family)
868        {
869        case AF_INET:
870            max_packet_size = QUIC_MAX_IPv4_PACKET_SZ;
871            break;
872        default:
873            max_packet_size = QUIC_MAX_IPv6_PACKET_SZ;
874            break;
875        }
876    }
877
878    conn = new_full_conn_client(engine, hostname, max_packet_size);
879    if (!conn)
880        return -1;
881    ENGINE_IN(engine);
882    lsquic_conn_record_peer_sa(conn, peer_sa);
883    conn->cn_peer_ctx = conn_ctx;
884    engine->iter_state.one.conn = conn;
885    process_connections(engine, conn_iter_next_one);
886    ENGINE_OUT(engine);
887    return 0;
888}
889
890
/* Remove connection from the connection hash and drop the LSCONN_HASHED
 * reference, which destroys the connection if that was its last reference.
 */
static void
remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
{
    conn_hash_remove(&engine->full_conns, conn);
    (void) engine_decref_conn(engine, conn, LSCONN_HASHED);
}
897
898
899static void
900refflags2str (enum lsquic_conn_flags flags, char s[7])
901{
902    *s = 'C'; s += !!(flags & LSCONN_CLOSING);
903    *s = 'H'; s += !!(flags & LSCONN_HASHED);
904    *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
905    *s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING);
906    *s = 'R'; s += !!(flags & LSCONN_RW_PENDING);
907    *s = 'A'; s += !!(flags & LSCONN_ATTQ);
908    *s = '\0';
909}
910
911
912static void
913engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
914{
915    char str[7];
916    assert(flag & CONN_REF_FLAGS);
917    assert(!(conn->cn_flags & flag));
918    conn->cn_flags |= flag;
919    LSQ_DEBUG("incref conn %"PRIu64", now '%s'", conn->cn_cid,
920                            (refflags2str(conn->cn_flags, str), str));
921}
922
923
924static lsquic_conn_t *
925engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
926                                        enum lsquic_conn_flags flags)
927{
928    char str[7];
929    assert(flags & CONN_REF_FLAGS);
930    assert(conn->cn_flags & flags);
931#ifndef NDEBUG
932    if (flags & LSCONN_CLOSING)
933        assert(0 == (conn->cn_flags & LSCONN_HASHED));
934#endif
935    conn->cn_flags &= ~flags;
936    LSQ_DEBUG("decref conn %"PRIu64", now '%s'", conn->cn_cid,
937                            (refflags2str(conn->cn_flags, str), str));
938    if (0 == (conn->cn_flags & CONN_REF_FLAGS))
939    {
940            eng_hist_inc(&engine->history, 0, sl_del_full_conns);
941        destroy_conn(conn);
942        return NULL;
943    }
944    else
945        return conn;
946}
947
948
949/* This is not a general-purpose function.  Only call from engine dtor. */
950static void
951force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
952{
953    assert(engine->flags & ENG_DTOR);
954    const enum lsquic_conn_flags flags = conn->cn_flags;
955    assert(conn->cn_flags & CONN_REF_FLAGS);
956    assert(!(flags & LSCONN_HAS_OUTGOING));  /* Should be removed already */
957    assert(!(flags & LSCONN_CLOSING));  /* It is in transient queue? */
958    if (flags & LSCONN_HAS_INCOMING)
959    {
960        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
961        (void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING);
962    }
963    if (flags & LSCONN_RW_PENDING)
964    {
965        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
966        EV_LOG_CONN_EVENT(conn->cn_cid,
967            "removed from pending RW queue (engine destruction)");
968        (void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING);
969    }
970    if (flags & LSCONN_ATTQ)
971        attq_remove(engine->attq, conn);
972    if (flags & LSCONN_HASHED)
973        remove_conn_from_hash(engine, conn);
974}
975
976
977/* Iterator for all connections.
978 * Returned connections are removed from the Incoming, Pending RW Event,
979 * and Advisory Tick Time queues if necessary.
980 */
981static lsquic_conn_t *
982conn_iter_next_all (struct lsquic_engine *engine)
983{
984    lsquic_conn_t *conn;
985
986    conn = conn_hash_next(&engine->full_conns);
987
988    if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)))
989        conn = remove_from_inc_andor_pend_rw(engine, conn, "process all");
990    if (conn && (conn->cn_flags & LSCONN_ATTQ)
991                                        && conn_attq_expired(engine, conn))
992    {
993        attq_remove(engine->attq, conn);
994        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
995    }
996
997    return conn;
998}
999
1000
1001static lsquic_conn_t *
1002conn_iter_next_attq (struct lsquic_engine *engine)
1003{
1004    lsquic_conn_t *conn;
1005
1006    conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff);
1007    if (conn)
1008    {
1009        assert(conn->cn_flags & LSCONN_ATTQ);
1010        if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))
1011            conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq");
1012        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
1013    }
1014
1015    return conn;
1016}
1017
1018
/* Process every connection known to the engine: rewind the connection hash
 * iterator and run process_connections() over all of its entries.
 */
void
lsquic_engine_proc_all (lsquic_engine_t *engine)
{
    ENGINE_IN(engine);
    /* We poke each connection every time as initial implementation.  If it
     * proves to be too inefficient, we will need to figure out
     *          a) when to stop processing; and
     *          b) how to remember state between calls.
     */
    conn_hash_reset_iter(&engine->full_conns);
    process_connections(engine, conn_iter_next_all);
    ENGINE_OUT(engine);
}
1032
1033
1034void
1035lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine)
1036{
1037    lsquic_time_t prev_min, cutoff;
1038
1039    LSQ_DEBUG("process connections in attq");
1040    ENGINE_IN(engine);
1041    cutoff = lsquic_time_now();
1042    prev_min = attq_set_min(engine->attq, cutoff);  /* Prevent infinite loop */
1043    engine->iter_state.attq.cutoff = cutoff;
1044    process_connections(engine, conn_iter_next_attq);
1045    attq_set_min(engine->attq, prev_min);           /* Restore previos value */
1046    ENGINE_OUT(engine);
1047}
1048
1049
1050static int
1051generate_header (const lsquic_packet_out_t *packet_out,
1052                 const struct parse_funcs *pf, lsquic_cid_t cid,
1053                 unsigned char *buf, size_t bufsz)
1054{
1055    return pf->pf_gen_reg_pkt_header(buf, bufsz,
1056        packet_out->po_flags & PO_CONN_ID ? &cid                    : NULL,
1057        packet_out->po_flags & PO_VERSION ? &packet_out->po_ver_tag : NULL,
1058        packet_out->po_flags & PO_NONCE   ? packet_out->po_nonce    : NULL,
1059        packet_out->po_packno, lsquic_packet_out_packno_bits(packet_out));
1060}
1061
1062
1063static ssize_t
1064really_encrypt_packet (const lsquic_conn_t *conn,
1065                       const lsquic_packet_out_t *packet_out,
1066                       unsigned char *buf, size_t bufsz)
1067{
1068    int enc, header_sz, is_hello_packet;
1069    size_t packet_sz;
1070    unsigned char header_buf[QUIC_MAX_PUBHDR_SZ];
1071
1072    header_sz = generate_header(packet_out, conn->cn_pf, conn->cn_cid,
1073                                            header_buf, sizeof(header_buf));
1074    if (header_sz < 0)
1075        return -1;
1076
1077    is_hello_packet = !!(packet_out->po_flags & PO_HELLO);
1078    enc = conn->cn_esf->esf_encrypt(conn->cn_enc_session, conn->cn_version, 0,
1079                packet_out->po_packno, header_buf, header_sz,
1080                packet_out->po_data, packet_out->po_data_sz,
1081                buf, bufsz, &packet_sz, is_hello_packet);
1082    if (0 == enc)
1083    {
1084        LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %u bytes, "
1085            "ciphertext is %zd bytes",
1086            packet_out->po_packno,
1087            lsquic_po_header_length(packet_out->po_flags) +
1088                                                packet_out->po_data_sz,
1089            packet_sz);
1090        return packet_sz;
1091    }
1092    else
1093        return -1;
1094}
1095
1096
1097static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, }
1098encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
1099                                            lsquic_packet_out_t *packet_out)
1100{
1101    ssize_t enc_sz;
1102    size_t bufsz;
1103    unsigned char *buf;
1104
1105    bufsz = lsquic_po_header_length(packet_out->po_flags) +
1106                                packet_out->po_data_sz + QUIC_PACKET_HASH_SZ;
1107    buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz);
1108    if (!buf)
1109    {
1110        LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd",
1111                                                                        bufsz);
1112        return ENCPA_NOMEM;
1113    }
1114
1115        enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz);
1116
1117    if (enc_sz < 0)
1118    {
1119        engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf);
1120        return ENCPA_BADCRYPT;
1121    }
1122
1123    packet_out->po_enc_data    = buf;
1124    packet_out->po_enc_data_sz = enc_sz;
1125    packet_out->po_flags |= PO_ENCRYPTED;
1126
1127    return ENCPA_OK;
1128}
1129
1130
/* A batch of outgoing packets, kept as three parallel arrays: slot `i'
 * holds a packet, the connection it belongs to, and the output spec
 * (buffer, size, addresses, peer context) handed to the packets_out
 * callback.  The arrays are sized for the largest batch; how many slots
 * are filled before a batch is sent is governed by engine->batch_size.
 */
struct out_batch
{
    lsquic_conn_t           *conns  [MAX_OUT_BATCH_SIZE];
    lsquic_packet_out_t     *packets[MAX_OUT_BATCH_SIZE];
    struct lsquic_out_spec   outs   [MAX_OUT_BATCH_SIZE];
};
1137
1138
/* List of connections that are being closed.  Members are linked through
 * cn_next_closed_conn and hold the LSCONN_CLOSING reference until the list
 * is drained.
 */
STAILQ_HEAD(closed_conns, lsquic_conn);
1140
1141
/* Iterator over connections that have packets to send.  Connections are
 * first popped off the outgoing heap; once the heap is empty, the iterator
 * keeps cycling through the active list.
 */
struct conns_out_iter
{
    /* The engine's heap of connections with outgoing packets */
    struct out_heap            *coi_heap;
    /* Active: popped from the heap and still being asked for packets.
     * Inactive: had no more packets to send when last asked.
     */
    TAILQ_HEAD(, lsquic_conn)   coi_active_list,
                                coi_inactive_list;
    /* Next connection to return when cycling through the active list;
     * NULL means start from the head of the list.
     */
    lsquic_conn_t              *coi_next;
#ifndef NDEBUG
    /* cn_last_sent of the connection most recently popped from the heap;
     * used to assert that the heap yields connections in order.
     */
    lsquic_time_t               coi_last_sent;
#endif
};
1152
1153
1154static void
1155coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine)
1156{
1157    iter->coi_heap = &engine->conns_out;
1158    iter->coi_next = NULL;
1159    TAILQ_INIT(&iter->coi_active_list);
1160    TAILQ_INIT(&iter->coi_inactive_list);
1161#ifndef NDEBUG
1162    iter->coi_last_sent = 0;
1163#endif
1164}
1165
1166
1167static lsquic_conn_t *
1168coi_next (struct conns_out_iter *iter)
1169{
1170    lsquic_conn_t *conn;
1171
1172    if (iter->coi_heap->oh_nelem > 0)
1173    {
1174        conn = oh_pop(iter->coi_heap);
1175        TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
1176        conn->cn_flags |= LSCONN_COI_ACTIVE;
1177#ifndef NDEBUG
1178        if (iter->coi_last_sent)
1179            assert(iter->coi_last_sent <= conn->cn_last_sent);
1180        iter->coi_last_sent = conn->cn_last_sent;
1181#endif
1182        return conn;
1183    }
1184    else if (!TAILQ_EMPTY(&iter->coi_active_list))
1185    {
1186        conn = iter->coi_next;
1187        if (!conn)
1188            conn = TAILQ_FIRST(&iter->coi_active_list);
1189        if (conn)
1190            iter->coi_next = TAILQ_NEXT(conn, cn_next_out);
1191        return conn;
1192    }
1193    else
1194        return NULL;
1195}
1196
1197
1198static void
1199coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
1200{
1201    if (!(conn->cn_flags & LSCONN_EVANESCENT))
1202    {
1203        assert(!TAILQ_EMPTY(&iter->coi_active_list));
1204        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1205        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1206        TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out);
1207        conn->cn_flags |= LSCONN_COI_INACTIVE;
1208    }
1209}
1210
1211
1212static void
1213coi_remove (struct conns_out_iter *iter, lsquic_conn_t *conn)
1214{
1215    assert(conn->cn_flags & LSCONN_COI_ACTIVE);
1216    if (conn->cn_flags & LSCONN_COI_ACTIVE)
1217    {
1218        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1219        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1220    }
1221}
1222
1223
1224static void
1225coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
1226{
1227    assert(conn->cn_flags & LSCONN_COI_INACTIVE);
1228    TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1229    conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1230    TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
1231    conn->cn_flags |= LSCONN_COI_ACTIVE;
1232}
1233
1234
1235static void
1236coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
1237{
1238    lsquic_conn_t *conn;
1239    while ((conn = TAILQ_FIRST(&iter->coi_active_list)))
1240    {
1241        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1242        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1243        oh_insert(iter->coi_heap, conn);
1244    }
1245    while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
1246    {
1247        TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1248        conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1249        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
1250    }
1251}
1252
1253
1254static unsigned
1255send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter,
1256                  struct out_batch *batch, unsigned n_to_send)
1257{
1258    int n_sent, i;
1259    lsquic_time_t now;
1260
1261    /* Set sent time before the write to avoid underestimating RTT */
1262    now = lsquic_time_now();
1263    for (i = 0; i < (int) n_to_send; ++i)
1264        batch->packets[i]->po_sent = now;
1265    n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs,
1266                                                                n_to_send);
1267    if (n_sent >= 0)
1268        LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send);
1269    else
1270    {
1271        LSQ_DEBUG("packets out returned an error: %s", strerror(errno));
1272        n_sent = 0;
1273    }
1274    if (n_sent > 0)
1275        engine->last_sent = now + n_sent;
1276    for (i = 0; i < n_sent; ++i)
1277    {
1278        eng_hist_inc(&engine->history, now, sl_packets_out);
1279        EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1280        batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i],
1281                                                    batch->packets[i]);
1282        /* `i' is added to maintain relative order */
1283        batch->conns[i]->cn_last_sent = now + i;
1284        /* Release packet out buffer as soon as the packet is sent
1285         * successfully.  If not successfully sent, we hold on to
1286         * this buffer until the packet sending is attempted again
1287         * or until it times out and regenerated.
1288         */
1289        if (batch->packets[i]->po_flags & PO_ENCRYPTED)
1290        {
1291            batch->packets[i]->po_flags &= ~PO_ENCRYPTED;
1292            engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx,
1293                                                batch->packets[i]->po_enc_data);
1294            batch->packets[i]->po_enc_data = NULL;  /* JIC */
1295        }
1296    }
1297    if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT))
1298        for ( ; i < (int) n_to_send; ++i)
1299            EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1300    /* Return packets to the connection in reverse order so that the packet
1301     * ordering is maintained.
1302     */
1303    for (i = (int) n_to_send - 1; i >= n_sent; --i)
1304    {
1305        batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i],
1306                                                    batch->packets[i]);
1307        if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT)))
1308            coi_reactivate(conns_iter, batch->conns[i]);
1309    }
1310    return n_sent;
1311}
1312
1313
1314/* Return 1 if went past deadline, 0 otherwise */
1315static int
1316check_deadline (lsquic_engine_t *engine)
1317{
1318    if (engine->pub.enp_settings.es_proc_time_thresh &&
1319                                lsquic_time_now() > engine->deadline)
1320    {
1321        LSQ_INFO("went past threshold of %u usec, stop sending",
1322                            engine->pub.enp_settings.es_proc_time_thresh);
1323        engine->flags |= ENG_PAST_DEADLINE;
1324        return 1;
1325    }
1326    else
1327        return 0;
1328}
1329
1330
1331static void
1332send_packets_out (struct lsquic_engine *engine,
1333                  struct closed_conns *closed_conns)
1334{
1335    unsigned n, w, n_sent, n_batches_sent;
1336    lsquic_packet_out_t *packet_out;
1337    lsquic_conn_t *conn;
1338    struct out_batch batch;
1339    struct conns_out_iter conns_iter;
1340    int shrink, deadline_exceeded;
1341
1342    coi_init(&conns_iter, engine);
1343    n_batches_sent = 0;
1344    n_sent = 0, n = 0;
1345    shrink = 0;
1346    deadline_exceeded = 0;
1347
1348    while ((conn = coi_next(&conns_iter)))
1349    {
1350        packet_out = conn->cn_if->ci_next_packet_to_send(conn);
1351        if (!packet_out) {
1352            LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64,
1353                                                            conn->cn_cid);
1354            coi_deactivate(&conns_iter, conn);
1355            continue;
1356        }
1357        if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT)))
1358        {
1359            switch (encrypt_packet(engine, conn, packet_out))
1360            {
1361            case ENCPA_NOMEM:
1362                /* Send what we have and wait for a more opportune moment */
1363                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1364                goto end_for;
1365            case ENCPA_BADCRYPT:
1366                /* This is pretty bad: close connection immediately */
1367                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1368                LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid);
1369                if (!(conn->cn_flags & LSCONN_EVANESCENT))
1370                {
1371                    if (!(conn->cn_flags & LSCONN_CLOSING))
1372                    {
1373                        STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn);
1374                        engine_incref_conn(conn, LSCONN_CLOSING);
1375                        if (conn->cn_flags & LSCONN_HASHED)
1376                            remove_conn_from_hash(engine, conn);
1377                    }
1378                    coi_remove(&conns_iter, conn);
1379                }
1380                continue;
1381            case ENCPA_OK:
1382                break;
1383            }
1384        }
1385        LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64,
1386                                        packet_out->po_packno, conn->cn_cid);
1387        assert(conn->cn_flags & LSCONN_HAS_PEER_SA);
1388        if (packet_out->po_flags & PO_ENCRYPTED)
1389        {
1390            batch.outs[n].buf     = packet_out->po_enc_data;
1391            batch.outs[n].sz      = packet_out->po_enc_data_sz;
1392        }
1393        else
1394        {
1395            batch.outs[n].buf     = packet_out->po_data;
1396            batch.outs[n].sz      = packet_out->po_data_sz;
1397        }
1398        batch.outs   [n].peer_ctx = conn->cn_peer_ctx;
1399        batch.outs   [n].local_sa = (struct sockaddr *) conn->cn_local_addr;
1400        batch.outs   [n].dest_sa  = (struct sockaddr *) conn->cn_peer_addr;
1401        batch.conns  [n]          = conn;
1402        batch.packets[n]          = packet_out;
1403        ++n;
1404        if (n == engine->batch_size)
1405        {
1406            n = 0;
1407            w = send_batch(engine, &conns_iter, &batch, engine->batch_size);
1408            ++n_batches_sent;
1409            n_sent += w;
1410            if (w < engine->batch_size)
1411            {
1412                shrink = 1;
1413                break;
1414            }
1415            deadline_exceeded = check_deadline(engine);
1416            if (deadline_exceeded)
1417                break;
1418            grow_batch_size(engine);
1419        }
1420    }
1421  end_for:
1422
1423    if (n > 0) {
1424        w = send_batch(engine, &conns_iter, &batch, n);
1425        n_sent += w;
1426        shrink = w < n;
1427        ++n_batches_sent;
1428        deadline_exceeded = check_deadline(engine);
1429    }
1430
1431    if (shrink)
1432        shrink_batch_size(engine);
1433    else if (n_batches_sent > 1 && !deadline_exceeded)
1434        grow_batch_size(engine);
1435
1436    coi_reheap(&conns_iter, engine);
1437
1438    LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s");
1439}
1440
1441
1442int
1443lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
1444{
1445    return !(engine->flags & ENG_PAST_DEADLINE)
1446        && (    engine->conns_out.oh_nelem > 0
1447           )
1448    ;
1449}
1450
1451
1452static void
1453reset_deadline (lsquic_engine_t *engine, lsquic_time_t now)
1454{
1455    engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh;
1456    engine->flags &= ~ENG_PAST_DEADLINE;
1457}
1458
1459
1460/* TODO: this is a user-facing function, account for load */
1461void
1462lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
1463{
1464    lsquic_conn_t *conn;
1465    struct closed_conns closed_conns;
1466
1467    STAILQ_INIT(&closed_conns);
1468    reset_deadline(engine, lsquic_time_now());
1469
1470    send_packets_out(engine, &closed_conns);
1471
1472    while ((conn = STAILQ_FIRST(&closed_conns))) {
1473        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1474        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1475    }
1476
1477}
1478
1479
1480static void
1481process_connections (lsquic_engine_t *engine, conn_iter_f next_conn)
1482{
1483    lsquic_conn_t *conn;
1484    enum tick_st tick_st;
1485    lsquic_time_t now = lsquic_time_now();
1486    struct closed_conns closed_conns;
1487
1488    engine->proc_time = now;
1489    eng_hist_tick(&engine->history, now);
1490
1491    STAILQ_INIT(&closed_conns);
1492    reset_deadline(engine, now);
1493
1494    while ((conn = next_conn(engine)))
1495    {
1496        tick_st = conn->cn_if->ci_tick(conn, now);
1497        if (conn_iter_next_rw_pend == next_conn)
1498            update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS);
1499        if (tick_st & TICK_SEND)
1500        {
1501            if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
1502            {
1503                oh_insert(&engine->conns_out, conn);
1504                engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
1505            }
1506        }
1507        if (tick_st & TICK_CLOSE)
1508        {
1509            STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn);
1510            engine_incref_conn(conn, LSCONN_CLOSING);
1511            if (conn->cn_flags & LSCONN_HASHED)
1512                remove_conn_from_hash(engine, conn);
1513        }
1514    }
1515
1516    if (lsquic_engine_has_unsent_packets(engine))
1517        send_packets_out(engine, &closed_conns);
1518
1519    while ((conn = STAILQ_FIRST(&closed_conns))) {
1520        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1521        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1522    }
1523
1524}
1525
1526
1527/* Return 0 if packet is being processed by a real connection, 1 if the
1528 * packet was processed, but not by a connection, and -1 on error.
1529 */
1530int
1531lsquic_engine_packet_in (lsquic_engine_t *engine,
1532    const unsigned char *packet_in_data, size_t packet_in_size,
1533    const struct sockaddr *sa_local, const struct sockaddr *sa_peer,
1534    void *peer_ctx)
1535{
1536    struct packin_parse_state ppstate;
1537    lsquic_packet_in_t *packet_in;
1538
1539    if (packet_in_size > QUIC_MAX_PACKET_SZ)
1540    {
1541        LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming "
1542            "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ);
1543        errno = E2BIG;
1544        return -1;
1545    }
1546
1547    packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm);
1548    if (!packet_in)
1549        return -1;
1550
1551    /* Library does not modify packet_in_data, it is not referenced after
1552     * this function returns and subsequent release of pi_data is guarded
1553     * by PI_OWN_DATA flag.
1554     */
1555    packet_in->pi_data = (unsigned char *) packet_in_data;
1556    if (0 != parse_packet_in_begin(packet_in, packet_in_size,
1557                                        engine->flags & ENG_SERVER, &ppstate))
1558    {
1559        LSQ_DEBUG("Cannot parse incoming packet's header");
1560        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
1561        errno = EINVAL;
1562        return -1;
1563    }
1564
1565    packet_in->pi_received = lsquic_time_now();
1566    eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in);
1567    return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer,
1568                                                                    peer_ctx);
1569}
1570
1571
1572#if __GNUC__ && !defined(NDEBUG)
1573__attribute__((weak))
1574#endif
1575unsigned
1576lsquic_engine_quic_versions (const lsquic_engine_t *engine)
1577{
1578    return engine->pub.enp_settings.es_versions;
1579}
1580
1581
1582int
1583lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
1584{
1585    const lsquic_time_t *next_time;
1586    lsquic_time_t now;
1587
1588    next_time = attq_next_time(engine->attq);
1589    if (!next_time)
1590        return 0;
1591
1592    now = lsquic_time_now();
1593    *diff = (int) ((int64_t) *next_time - (int64_t) now);
1594    return 1;
1595}
1596
1597
1598unsigned
1599lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now)
1600{
1601    lsquic_time_t now;
1602    now = lsquic_time_now();
1603    if (from_now < 0)
1604        now -= from_now;
1605    else
1606        now += from_now;
1607    return attq_count_before(engine->attq, now);
1608}
1609
1610
1611