lsquic_engine.c revision 1b97e4af
1/* Copyright (c) 2017 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_engine.c - QUIC engine
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdint.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <sys/queue.h>
14#include <sys/time.h>
15#include <time.h>
16#include <netinet/in.h>
17#include <sys/types.h>
18#include <sys/stat.h>
19#include <fcntl.h>
20#include <unistd.h>
21#include <netdb.h>
22
23
24
25#include "lsquic.h"
26#include "lsquic_types.h"
27#include "lsquic_alarmset.h"
28#include "lsquic_parse.h"
29#include "lsquic_packet_in.h"
30#include "lsquic_packet_out.h"
31#include "lsquic_senhist.h"
32#include "lsquic_rtt.h"
33#include "lsquic_cubic.h"
34#include "lsquic_pacer.h"
35#include "lsquic_send_ctl.h"
36#include "lsquic_set.h"
37#include "lsquic_conn_flow.h"
38#include "lsquic_sfcw.h"
39#include "lsquic_stream.h"
40#include "lsquic_conn.h"
41#include "lsquic_full_conn.h"
42#include "lsquic_util.h"
43#include "lsquic_qtags.h"
44#include "lsquic_handshake.h"
45#include "lsquic_mm.h"
46#include "lsquic_conn_hash.h"
47#include "lsquic_engine_public.h"
48#include "lsquic_eng_hist.h"
49#include "lsquic_ev_log.h"
50#include "lsquic_version.h"
51#include "lsquic_attq.h"
52
53#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
54#include "lsquic_logger.h"
55
56
/* The batch of outgoing packets grows and shrinks dynamically.  These are,
 * in ascending order, the smallest, the starting, and the largest size.
 */
#define MIN_OUT_BATCH_SIZE      256
#define INITIAL_OUT_BATCH_SIZE  512
#define MAX_OUT_BATCH_SIZE      1024
61
62typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);
63
64static void
65process_connections (struct lsquic_engine *engine, conn_iter_f iter);
66
67static void
68engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);
69
70static lsquic_conn_t *
71engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
72                                        enum lsquic_conn_flags flag);
73
74static void
75force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
76
/* Nested calls to LSQUIC are not supported.  ENGINE_IN() asserts that the
 * ENPUB_PROC flag is clear and then sets it; ENGINE_OUT() asserts that the
 * flag is set and then clears it.
 */
#define ENGINE_IN(e) do {                                       \
    assert(0 == ((e)->pub.enp_flags & ENPUB_PROC));             \
    (e)->pub.enp_flags |= ENPUB_PROC;                           \
} while (0)

#define ENGINE_OUT(e) do {                                      \
    assert(0 != ((e)->pub.enp_flags & ENPUB_PROC));             \
    (e)->pub.enp_flags &= ~ENPUB_PROC;                          \
} while (0)
87
/* Reference counting of connections is done with flags: each flag below
 * stands for one place that may hold a reference to a connection.  There
 * are six such places:
 *
 *   1. Connection hash (LSCONN_HASHED): a connection starts its life
 *      there.
 *
 *   2. Outgoing queue (LSCONN_HAS_OUTGOING).
 *
 *   3. Incoming queue (LSCONN_HAS_INCOMING).
 *
 *   4. Pending RW Events queue (LSCONN_RW_PENDING).
 *
 *   5. Advisory Tick Time queue (LSCONN_ATTQ).
 *
 *   6. Closing connections queue (LSCONN_CLOSING).  This queue is
 *      transient: it only exists for the duration of a call to
 *      process_connections().
 *
 * A connection is destroyed when none of these places references it any
 * longer.  Example: a connection tick returns TICK_SEND|TICK_CLOSE.  The
 * connection is then referenced from (2) and (6).  Once its packets are
 * sent, only (6) remains; when the connection is removed from (6) at the
 * end of the function call, the reference count drops to zero and the
 * connection is destroyed.  If not all packets could be sent, the
 * connection is still referenced by (2) at the end of the call and goes
 * away only after all of its outgoing packets have been sent.
 */
#define CONN_REF_FLAGS  (LSCONN_HASHED          \
                        |LSCONN_HAS_OUTGOING    \
                        |LSCONN_HAS_INCOMING    \
                        |LSCONN_RW_PENDING      \
                        |LSCONN_ATTQ            \
                        |LSCONN_CLOSING)
119
120
121struct out_heap_elem
122{
123    struct lsquic_conn  *ohe_conn;
124    lsquic_time_t        ohe_last_sent;
125};
126
127
/* Min-heap of connections keyed on ohe_last_sent, stored in a growable
 * array.
 */
struct out_heap
{
    struct out_heap_elem    *oh_elems;  /* Array of elements */
    unsigned                 oh_nalloc; /* Number of elements allocated */
    unsigned                 oh_nelem;  /* Number of elements in use */
};
134
135
136struct lsquic_engine
137{
138    struct lsquic_engine_public        pub;
139    enum {
140        ENG_SERVER      = LSENG_SERVER,
141        ENG_HTTP        = LSENG_HTTP,
142        ENG_COOLDOWN    = (1 <<  7),    /* Cooldown: no new connections */
143        ENG_PAST_DEADLINE
144                        = (1 <<  8),    /* Previous call to a processing
145                                         * function went past time threshold.
146                                         */
147#ifndef NDEBUG
148        ENG_DTOR        = (1 << 26),    /* Engine destructor */
149#endif
150    }                                  flags;
151    const struct lsquic_stream_if     *stream_if;
152    void                              *stream_if_ctx;
153    lsquic_packets_out_f               packets_out;
154    void                              *packets_out_ctx;
155    void                              *bad_handshake_ctx;
156    struct conn_hash                   full_conns;
157    TAILQ_HEAD(, lsquic_conn)          conns_in, conns_pend_rw;
158    struct out_heap                    conns_out;
159    /* Use a union because only one iterator is being used at any one time */
160    union {
161        struct {
162            /* This iterator does not have any state: it uses `conns_in' */
163        }           conn_in;
164        struct {
165            /* This iterator does not have any state: it uses `conns_pend_rw' */
166        }           rw_pend;
167        struct {
168            /* Iterator state to process connections in Advisory Tick Time
169             * queue.
170             */
171            lsquic_time_t   cutoff;
172        }           attq;
173        struct {
174            /* Iterator state to process all connections */
175        }           all;
176        struct {
177            lsquic_conn_t  *conn;
178        }           one;
179    }                                  iter_state;
180    struct eng_hist                    history;
181    unsigned                           batch_size;
182    unsigned                           time_until_desired_tick;
183    struct attq                       *attq;
184    lsquic_time_t                      proc_time;
185    /* Track time last time a packet was sent to give new connections
186     * priority lower than that of existing connections.
187     */
188    lsquic_time_t                      last_sent;
189    lsquic_time_t                      deadline;
190};
191
192
/* Index arithmetic for a binary heap stored in a zero-based array.  The
 * argument is parenthesized so that passing an expression (for example,
 * `a + b') yields the expected index.  OHE_PARENT() must not be given 0.
 */
#define OHE_PARENT(i) (((i) - 1) / 2)
#define OHE_LCHILD(i) (2 * (i) + 1)
#define OHE_RCHILD(i) (2 * (i) + 2)
196
197
198static void
199heapify_out_heap (struct out_heap *heap, unsigned i)
200{
201    struct out_heap_elem el;
202    unsigned smallest;
203
204    assert(i < heap->oh_nelem);
205
206    if (OHE_LCHILD(i) < heap->oh_nelem)
207    {
208        if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent <
209                                    heap->oh_elems[ i ].ohe_last_sent)
210            smallest = OHE_LCHILD(i);
211        else
212            smallest = i;
213        if (OHE_RCHILD(i) < heap->oh_nelem &&
214            heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent <
215                                    heap->oh_elems[ smallest ].ohe_last_sent)
216            smallest = OHE_RCHILD(i);
217    }
218    else
219        smallest = i;
220
221    if (smallest != i)
222    {
223        el = heap->oh_elems[ smallest ];
224        heap->oh_elems[ smallest ] = heap->oh_elems[ i ];
225        heap->oh_elems[ i ] = el;
226        heapify_out_heap(heap, smallest);
227    }
228}
229
230
231static void
232oh_insert (struct out_heap *heap, lsquic_conn_t *conn)
233{
234    struct out_heap_elem el;
235    unsigned nalloc, i;
236
237    if (heap->oh_nelem == heap->oh_nalloc)
238    {
239        if (0 == heap->oh_nalloc)
240            nalloc = 4;
241        else
242            nalloc = heap->oh_nalloc * 2;
243        heap->oh_elems = realloc(heap->oh_elems,
244                                    nalloc * sizeof(heap->oh_elems[0]));
245        if (!heap->oh_elems)
246        {   /* Not much we can do here */
247            LSQ_ERROR("realloc failed");
248            return;
249        }
250        heap->oh_nalloc = nalloc;
251    }
252
253    heap->oh_elems[ heap->oh_nelem ].ohe_conn      = conn;
254    heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent;
255    ++heap->oh_nelem;
256
257    i = heap->oh_nelem - 1;
258    while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent >
259                                    heap->oh_elems[ i ].ohe_last_sent)
260    {
261        el = heap->oh_elems[ OHE_PARENT(i) ];
262        heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ];
263        heap->oh_elems[ i ] = el;
264        i = OHE_PARENT(i);
265    }
266}
267
268
269static struct lsquic_conn *
270oh_pop (struct out_heap *heap)
271{
272    struct lsquic_conn *conn;
273
274    assert(heap->oh_nelem);
275
276    conn = heap->oh_elems[0].ohe_conn;
277    --heap->oh_nelem;
278    if (heap->oh_nelem > 0)
279    {
280        heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ];
281        heapify_out_heap(heap, 0);
282    }
283
284    return conn;
285}
286
287
288void
289lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
290                             unsigned flags)
291{
292    memset(settings, 0, sizeof(*settings));
293    settings->es_versions        = LSQUIC_DF_VERSIONS;
294    if (flags & ENG_SERVER)
295    {
296        settings->es_cfcw        = LSQUIC_DF_CFCW_SERVER;
297        settings->es_sfcw        = LSQUIC_DF_SFCW_SERVER;
298        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_SERVER;
299    }
300    else
301    {
302        settings->es_cfcw        = LSQUIC_DF_CFCW_CLIENT;
303        settings->es_sfcw        = LSQUIC_DF_SFCW_CLIENT;
304        settings->es_support_srej= LSQUIC_DF_SUPPORT_SREJ_CLIENT;
305    }
306    settings->es_max_streams_in  = LSQUIC_DF_MAX_STREAMS_IN;
307    settings->es_idle_conn_to    = LSQUIC_DF_IDLE_CONN_TO;
308    settings->es_handshake_to    = LSQUIC_DF_HANDSHAKE_TO;
309    settings->es_silent_close    = LSQUIC_DF_SILENT_CLOSE;
310    settings->es_max_header_list_size
311                                 = LSQUIC_DF_MAX_HEADER_LIST_SIZE;
312    settings->es_ua              = LSQUIC_DF_UA;
313
314    settings->es_pdmd            = QTAG_X509;
315    settings->es_aead            = QTAG_AESG;
316    settings->es_kexs            = QTAG_C255;
317    settings->es_support_push    = LSQUIC_DF_SUPPORT_PUSH;
318    settings->es_support_tcid0   = LSQUIC_DF_SUPPORT_TCID0;
319    settings->es_support_nstp    = LSQUIC_DF_SUPPORT_NSTP;
320    settings->es_honor_prst      = LSQUIC_DF_HONOR_PRST;
321    settings->es_progress_check  = LSQUIC_DF_PROGRESS_CHECK;
322    settings->es_pendrw_check    = LSQUIC_DF_PENDRW_CHECK;
323    settings->es_rw_once         = LSQUIC_DF_RW_ONCE;
324    settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
325    settings->es_pace_packets    = LSQUIC_DF_PACE_PACKETS;
326}
327
328
329/* Note: if returning an error, err_buf must be valid if non-NULL */
330int
331lsquic_engine_check_settings (const struct lsquic_engine_settings *settings,
332                              unsigned flags,
333                              char *err_buf, size_t err_buf_sz)
334{
335    if (settings->es_cfcw < LSQUIC_MIN_FCW ||
336        settings->es_sfcw < LSQUIC_MIN_FCW)
337    {
338        if (err_buf)
339            snprintf(err_buf, err_buf_sz, "%s",
340                                            "flow control window set too low");
341        return -1;
342    }
343    if (0 == (settings->es_versions & LSQUIC_SUPPORTED_VERSIONS))
344    {
345        if (err_buf)
346            snprintf(err_buf, err_buf_sz, "%s",
347                        "No supported QUIC versions specified");
348        return -1;
349    }
350    if (settings->es_versions & ~LSQUIC_SUPPORTED_VERSIONS)
351    {
352        if (err_buf)
353            snprintf(err_buf, err_buf_sz, "%s",
354                        "one or more unsupported QUIC version is specified");
355        return -1;
356    }
357    return 0;
358}
359
360
361static void
362free_packet (void *ctx, unsigned char *packet_data)
363{
364    free(packet_data);
365}
366
367
368static void *
369malloc_buf (void *ctx, size_t size)
370{
371    return malloc(size);
372}
373
374
375static const struct lsquic_packout_mem_if stock_pmi =
376{
377    malloc_buf, (void(*)(void *, void *)) free_packet,
378};
379
380
381lsquic_engine_t *
382lsquic_engine_new (unsigned flags,
383                   const struct lsquic_engine_api *api)
384{
385    lsquic_engine_t *engine;
386    int tag_buf_len;
387    char err_buf[100];
388
389    if (!api->ea_packets_out)
390    {
391        LSQ_ERROR("packets_out callback is not specified");
392        return NULL;
393    }
394
395    if (api->ea_settings &&
396                0 != lsquic_engine_check_settings(api->ea_settings, flags,
397                                                    err_buf, sizeof(err_buf)))
398    {
399        LSQ_ERROR("cannot create engine: %s", err_buf);
400        return NULL;
401    }
402
403    engine = calloc(1, sizeof(*engine));
404    if (!engine)
405        return NULL;
406    if (0 != lsquic_mm_init(&engine->pub.enp_mm))
407    {
408        free(engine);
409        return NULL;
410    }
411    if (api->ea_settings)
412        engine->pub.enp_settings        = *api->ea_settings;
413    else
414        lsquic_engine_init_settings(&engine->pub.enp_settings, flags);
415    tag_buf_len = gen_ver_tags(engine->pub.enp_ver_tags_buf,
416                                    sizeof(engine->pub.enp_ver_tags_buf),
417                                    engine->pub.enp_settings.es_versions);
418    if (tag_buf_len <= 0)
419    {
420        LSQ_ERROR("cannot generate version tags buffer");
421        free(engine);
422        return NULL;
423    }
424    engine->pub.enp_ver_tags_len = tag_buf_len;
425
426    engine->flags           = flags;
427    engine->stream_if       = api->ea_stream_if;
428    engine->stream_if_ctx   = api->ea_stream_if_ctx;
429    engine->packets_out     = api->ea_packets_out;
430    engine->packets_out_ctx = api->ea_packets_out_ctx;
431    if (api->ea_pmi)
432    {
433        engine->pub.enp_pmi      = api->ea_pmi;
434        engine->pub.enp_pmi_ctx  = api->ea_pmi_ctx;
435    }
436    else
437    {
438        engine->pub.enp_pmi      = &stock_pmi;
439        engine->pub.enp_pmi_ctx  = NULL;
440    }
441    engine->pub.enp_engine = engine;
442    TAILQ_INIT(&engine->conns_in);
443    TAILQ_INIT(&engine->conns_pend_rw);
444    conn_hash_init(&engine->full_conns, ~0);
445    engine->attq = attq_create();
446    eng_hist_init(&engine->history);
447    engine->batch_size = INITIAL_OUT_BATCH_SIZE;
448
449
450    LSQ_INFO("instantiated engine");
451    return engine;
452}
453
454
455static void
456grow_batch_size (struct lsquic_engine *engine)
457{
458    engine->batch_size <<= engine->batch_size < MAX_OUT_BATCH_SIZE;
459}
460
461
462static void
463shrink_batch_size (struct lsquic_engine *engine)
464{
465    engine->batch_size >>= engine->batch_size > MIN_OUT_BATCH_SIZE;
466}
467
468
469/* Wrapper to make sure LSCONN_NEVER_PEND_RW gets set */
470static void
471destroy_conn (lsquic_conn_t *conn)
472{
473    conn->cn_flags |= LSCONN_NEVER_PEND_RW;
474    conn->cn_if->ci_destroy(conn);
475}
476
477
478static lsquic_conn_t *
479new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
480                      unsigned short max_packet_size)
481{
482    lsquic_conn_t *conn;
483    unsigned flags;
484    flags = engine->flags & (ENG_SERVER|ENG_HTTP);
485    conn = full_conn_client_new(&engine->pub, engine->stream_if,
486                    engine->stream_if_ctx, flags, hostname, max_packet_size);
487    if (!conn)
488        return NULL;
489    if (0 != conn_hash_add_new(&engine->full_conns, conn))
490    {
491        LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
492            conn->cn_cid);
493        destroy_conn(conn);
494        return NULL;
495    }
496    assert(!(conn->cn_flags &
497        (CONN_REF_FLAGS
498         & ~LSCONN_RW_PENDING /* This flag may be set as effect of user
499                                 callbacks */
500                             )));
501    conn->cn_flags |= LSCONN_HASHED;
502    return conn;
503}
504
505
506static lsquic_conn_t *
507find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
508         struct packin_parse_state *ppstate, const struct sockaddr *sa_peer,
509         void *peer_ctx)
510{
511    lsquic_conn_t *conn;
512
513    if (lsquic_packet_in_is_prst(packet_in)
514                                && !engine->pub.enp_settings.es_honor_prst)
515    {
516        LSQ_DEBUG("public reset packet: discarding");
517        return NULL;
518    }
519
520    if (!(packet_in->pi_flags & PI_CONN_ID))
521    {
522        LSQ_DEBUG("packet header does not have connection ID: discarding");
523        return NULL;
524    }
525
526    conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id, NULL);
527    if (conn)
528    {
529        conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
530        return conn;
531    }
532
533    return conn;
534}
535
536
537static void
538add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn,
539                                                        enum rw_reason reason)
540{
541    int hist_idx;
542
543    TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw);
544    engine_incref_conn(conn, LSCONN_RW_PENDING);
545
546    hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1);
547    conn->cn_rw_hist_buf[ hist_idx ] = reason;
548    ++conn->cn_rw_hist_idx;
549
550    if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx)
551        EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), "
552            "rw_hist: %.*s", (char) reason,
553            (int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf);
554    else
555        EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')",
556                                                                (char) reason);
557}
558
559
560#if !defined(NDEBUG) && __GNUC__
561__attribute__((weak))
562#endif
563void
564lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub,
565                                    lsquic_conn_t *conn, enum rw_reason reason)
566{
567    if (0 == (enpub->enp_flags & ENPUB_PROC) &&
568        0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW)))
569    {
570        lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
571        add_conn_to_pend_rw(engine, conn, reason);
572    }
573}
574
575
576void
577lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
578                                lsquic_conn_t *conn, lsquic_time_t tick_time)
579{
580    lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
581    /* Instead of performing an update, we simply remove the connection from
582     * the queue and add it back.  This should not happen in at the time of
583     * this writing.
584     */
585    if (conn->cn_flags & LSCONN_ATTQ)
586    {
587        attq_remove(engine->attq, conn);
588        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
589    }
590    if (conn && !(conn->cn_flags & LSCONN_ATTQ) &&
591                        0 == attq_maybe_add(engine->attq, conn, tick_time))
592        engine_incref_conn(conn, LSCONN_ATTQ);
593}
594
595
596static void
597update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn,
598                                                            int progress_made)
599{
600    rw_hist_idx_t hist_idx;
601    const unsigned char *empty;
602    const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check;
603
604    if (!pendrw_check)
605        return;
606
607    /* Convert previous entry to uppercase: */
608    hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1);
609    conn->cn_rw_hist_buf[ hist_idx ] -= 0x20;
610
611    LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made);
612    if (progress_made)
613    {
614        conn->cn_noprogress_count = 0;
615        return;
616    }
617
618    EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made "
619                                                                "no progress");
620    ++conn->cn_noprogress_count;
621    if (conn->cn_noprogress_count <= pendrw_check)
622        return;
623
624    conn->cn_flags |= LSCONN_NEVER_PEND_RW;
625    empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY,
626                                            sizeof(conn->cn_rw_hist_buf));
627    if (empty)
628        LSQ_WARN("conn %"PRIu64" noprogress count reached %u "
629            "(rw_hist: %.*s): will not put it onto Pend RW queue again",
630            conn->cn_cid, conn->cn_noprogress_count,
631            (int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf);
632    else
633    {
634        hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1);
635        LSQ_WARN("conn %"PRIu64" noprogress count reached %u "
636            "(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again",
637            conn->cn_cid, conn->cn_noprogress_count,
638            /* First part of history: */
639            (int) (sizeof(conn->cn_rw_hist_buf) - hist_idx),
640                                            conn->cn_rw_hist_buf + hist_idx,
641            /* Second part of history: */
642            hist_idx, conn->cn_rw_hist_buf);
643    }
644}
645
646
647/* Return 0 if packet is being processed by a connections, otherwise return 1 */
648static int
649process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
650       struct packin_parse_state *ppstate, const struct sockaddr *sa_local,
651       const struct sockaddr *sa_peer, void *peer_ctx)
652{
653    lsquic_conn_t *conn;
654
655    conn = find_or_create_conn(engine, packet_in, ppstate, sa_peer, peer_ctx);
656    if (!conn)
657    {
658        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
659        return 1;
660    }
661
662    if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) {
663        TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in);
664        engine_incref_conn(conn, LSCONN_HAS_INCOMING);
665    }
666    lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
667    lsquic_packet_in_upref(packet_in);
668    conn->cn_peer_ctx = peer_ctx;
669    conn->cn_if->ci_packet_in(conn, packet_in);
670    lsquic_packet_in_put(&engine->pub.enp_mm, packet_in);
671    return 0;
672}
673
674
675static int
676conn_attq_expired (const struct lsquic_engine *engine,
677                                                const lsquic_conn_t *conn)
678{
679    assert(conn->cn_attq_elem);
680    return lsquic_conn_adv_time(conn) < engine->proc_time;
681}
682
683
684/* Iterator for connections with incoming packets */
685static lsquic_conn_t *
686conn_iter_next_incoming (struct lsquic_engine *engine)
687{
688    enum lsquic_conn_flags addl_flags;
689    lsquic_conn_t *conn;
690    while ((conn = TAILQ_FIRST(&engine->conns_in)))
691    {
692        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
693        if (conn->cn_flags & LSCONN_RW_PENDING)
694        {
695            TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
696            EV_LOG_CONN_EVENT(conn->cn_cid,
697                "removed from pending RW queue (processing incoming)");
698        }
699        if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn))
700        {
701            addl_flags = LSCONN_ATTQ;
702            attq_remove(engine->attq, conn);
703        }
704        else
705            addl_flags = 0;
706        conn = engine_decref_conn(engine, conn,
707                        LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags);
708        if (conn)
709            break;
710    }
711    return conn;
712}
713
714
715/* Iterator for connections with that have pending read/write events */
716static lsquic_conn_t *
717conn_iter_next_rw_pend (struct lsquic_engine *engine)
718{
719    enum lsquic_conn_flags addl_flags;
720    lsquic_conn_t *conn;
721    while ((conn = TAILQ_FIRST(&engine->conns_pend_rw)))
722    {
723        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
724        EV_LOG_CONN_EVENT(conn->cn_cid,
725            "removed from pending RW queue (processing pending RW conns)");
726        if (conn->cn_flags & LSCONN_HAS_INCOMING)
727            TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
728        if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn))
729        {
730            addl_flags = LSCONN_ATTQ;
731            attq_remove(engine->attq, conn);
732        }
733        else
734            addl_flags = 0;
735        conn = engine_decref_conn(engine, conn,
736                        LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags);
737        if (conn)
738            break;
739    }
740    return conn;
741}
742
743
744void
745lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine)
746{
747    LSQ_DEBUG("process connections with incoming packets");
748    ENGINE_IN(engine);
749    process_connections(engine, conn_iter_next_incoming);
750    assert(TAILQ_EMPTY(&engine->conns_in));
751    ENGINE_OUT(engine);
752}
753
754
755int
756lsquic_engine_has_pend_rw (lsquic_engine_t *engine)
757{
758    return !(engine->flags & ENG_PAST_DEADLINE)
759        && !TAILQ_EMPTY(&engine->conns_pend_rw);
760}
761
762
763void
764lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine)
765{
766    LSQ_DEBUG("process connections with pending RW events");
767    ENGINE_IN(engine);
768    process_connections(engine, conn_iter_next_rw_pend);
769    ENGINE_OUT(engine);
770}
771
772
773void
774lsquic_engine_destroy (lsquic_engine_t *engine)
775{
776    lsquic_conn_t *conn;
777
778    LSQ_DEBUG("destroying engine");
779#ifndef NDEBUG
780    engine->flags |= ENG_DTOR;
781#endif
782
783    while (engine->conns_out.oh_nelem > 0)
784    {
785        --engine->conns_out.oh_nelem;
786        conn = engine->conns_out.oh_elems[
787                                engine->conns_out.oh_nelem ].ohe_conn;
788        assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
789        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
790    }
791
792    for (conn = conn_hash_first(&engine->full_conns); conn;
793                            conn = conn_hash_next(&engine->full_conns))
794        force_close_conn(engine, conn);
795    conn_hash_cleanup(&engine->full_conns);
796
797
798    attq_destroy(engine->attq);
799
800    assert(0 == engine->conns_out.oh_nelem);
801    assert(TAILQ_EMPTY(&engine->conns_pend_rw));
802    lsquic_mm_cleanup(&engine->pub.enp_mm);
803    free(engine->conns_out.oh_elems);
804    free(engine);
805}
806
807
808#if __GNUC__
809__attribute__((nonnull(3)))
810#endif
811static lsquic_conn_t *
812remove_from_inc_andor_pend_rw (lsquic_engine_t *engine,
813                                lsquic_conn_t *conn, const char *reason)
814{
815    assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING));
816    if (conn->cn_flags & LSCONN_HAS_INCOMING)
817        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
818    if (conn->cn_flags & LSCONN_RW_PENDING)
819    {
820        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
821        EV_LOG_CONN_EVENT(conn->cn_cid,
822                        "removed from pending RW queue (%s)", reason);
823    }
824    conn = engine_decref_conn(engine, conn,
825                        LSCONN_HAS_INCOMING|LSCONN_RW_PENDING);
826    assert(conn);
827    return conn;
828}
829
830
831static lsquic_conn_t *
832conn_iter_next_one (lsquic_engine_t *engine)
833{
834    lsquic_conn_t *conn = engine->iter_state.one.conn;
835    if (conn)
836    {
837        if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))
838            conn = remove_from_inc_andor_pend_rw(engine, conn, "connect");
839        if (conn && (conn->cn_flags & LSCONN_ATTQ) &&
840                                            conn_attq_expired(engine, conn))
841        {
842            attq_remove(engine->attq, conn);
843            conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
844        }
845        engine->iter_state.one.conn = NULL;
846    }
847    return conn;
848}
849
850
851int
852lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa,
853                       void *conn_ctx, const char *hostname,
854                       unsigned short max_packet_size)
855{
856    lsquic_conn_t *conn;
857
858    if (engine->flags & ENG_SERVER)
859    {
860        LSQ_ERROR("`%s' must only be called in client mode", __func__);
861        return -1;
862    }
863
864    if (0 == max_packet_size)
865    {
866        switch (peer_sa->sa_family)
867        {
868        case AF_INET:
869            max_packet_size = QUIC_MAX_IPv4_PACKET_SZ;
870            break;
871        default:
872            max_packet_size = QUIC_MAX_IPv6_PACKET_SZ;
873            break;
874        }
875    }
876
877    conn = new_full_conn_client(engine, hostname, max_packet_size);
878    if (!conn)
879        return -1;
880    lsquic_conn_record_peer_sa(conn, peer_sa);
881    conn->cn_peer_ctx = conn_ctx;
882    engine->iter_state.one.conn = conn;
883    process_connections(engine, conn_iter_next_one);
884    return 0;
885}
886
887
888static void
889remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
890{
891        conn_hash_remove(&engine->full_conns, conn);
892    (void) engine_decref_conn(engine, conn, LSCONN_HASHED);
893}
894
895
896static void
897refflags2str (enum lsquic_conn_flags flags, char s[7])
898{
899    *s = 'C'; s += !!(flags & LSCONN_CLOSING);
900    *s = 'H'; s += !!(flags & LSCONN_HASHED);
901    *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
902    *s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING);
903    *s = 'R'; s += !!(flags & LSCONN_RW_PENDING);
904    *s = 'A'; s += !!(flags & LSCONN_ATTQ);
905    *s = '\0';
906}
907
908
909static void
910engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
911{
912    char str[7];
913    assert(flag & CONN_REF_FLAGS);
914    assert(!(conn->cn_flags & flag));
915    conn->cn_flags |= flag;
916    LSQ_DEBUG("incref conn %"PRIu64", now '%s'", conn->cn_cid,
917                            (refflags2str(conn->cn_flags, str), str));
918}
919
920
921static lsquic_conn_t *
922engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
923                                        enum lsquic_conn_flags flags)
924{
925    char str[7];
926    assert(flags & CONN_REF_FLAGS);
927    assert(conn->cn_flags & flags);
928#ifndef NDEBUG
929    if (flags & LSCONN_CLOSING)
930        assert(0 == (conn->cn_flags & LSCONN_HASHED));
931#endif
932    conn->cn_flags &= ~flags;
933    LSQ_DEBUG("decref conn %"PRIu64", now '%s'", conn->cn_cid,
934                            (refflags2str(conn->cn_flags, str), str));
935    if (0 == (conn->cn_flags & CONN_REF_FLAGS))
936    {
937            eng_hist_inc(&engine->history, 0, sl_del_full_conns);
938        destroy_conn(conn);
939        return NULL;
940    }
941    else
942        return conn;
943}
944
945
946/* This is not a general-purpose function.  Only call from engine dtor. */
947static void
948force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
949{
950    assert(engine->flags & ENG_DTOR);
951    const enum lsquic_conn_flags flags = conn->cn_flags;
952    assert(conn->cn_flags & CONN_REF_FLAGS);
953    assert(!(flags & LSCONN_HAS_OUTGOING));  /* Should be removed already */
954    assert(!(flags & LSCONN_CLOSING));  /* It is in transient queue? */
955    if (flags & LSCONN_HAS_INCOMING)
956    {
957        TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
958        (void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING);
959    }
960    if (flags & LSCONN_RW_PENDING)
961    {
962        TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
963        EV_LOG_CONN_EVENT(conn->cn_cid,
964            "removed from pending RW queue (engine destruction)");
965        (void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING);
966    }
967    if (flags & LSCONN_ATTQ)
968        attq_remove(engine->attq, conn);
969    if (flags & LSCONN_HASHED)
970        remove_conn_from_hash(engine, conn);
971}
972
973
974/* Iterator for all connections.
975 * Returned connections are removed from the Incoming, Pending RW Event,
976 * and Advisory Tick Time queues if necessary.
977 */
978static lsquic_conn_t *
979conn_iter_next_all (struct lsquic_engine *engine)
980{
981    lsquic_conn_t *conn;
982
983    conn = conn_hash_next(&engine->full_conns);
984
985    if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)))
986        conn = remove_from_inc_andor_pend_rw(engine, conn, "process all");
987    if (conn && (conn->cn_flags & LSCONN_ATTQ)
988                                        && conn_attq_expired(engine, conn))
989    {
990        attq_remove(engine->attq, conn);
991        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
992    }
993
994    return conn;
995}
996
997
998static lsquic_conn_t *
999conn_iter_next_attq (struct lsquic_engine *engine)
1000{
1001    lsquic_conn_t *conn;
1002
1003    conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff);
1004    if (conn)
1005    {
1006        assert(conn->cn_flags & LSCONN_ATTQ);
1007        if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))
1008            conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq");
1009        conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
1010    }
1011
1012    return conn;
1013}
1014
1015
1016void
1017lsquic_engine_proc_all (lsquic_engine_t *engine)
1018{
1019    ENGINE_IN(engine);
1020    /* We poke each connection every time as initial implementation.  If it
1021     * proves to be too inefficient, we will need to figure out
1022     *          a) when to stop processing; and
1023     *          b) how to remember state between calls.
1024     */
1025    conn_hash_reset_iter(&engine->full_conns);
1026    process_connections(engine, conn_iter_next_all);
1027    ENGINE_OUT(engine);
1028}
1029
1030
1031void
1032lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine)
1033{
1034    lsquic_time_t prev_min, cutoff;
1035
1036    LSQ_DEBUG("process connections in attq");
1037    ENGINE_IN(engine);
1038    cutoff = lsquic_time_now();
1039    prev_min = attq_set_min(engine->attq, cutoff);  /* Prevent infinite loop */
1040    engine->iter_state.attq.cutoff = cutoff;
1041    process_connections(engine, conn_iter_next_attq);
1042    attq_set_min(engine->attq, prev_min);           /* Restore previos value */
1043    ENGINE_OUT(engine);
1044}
1045
1046
1047static int
1048generate_header (const lsquic_packet_out_t *packet_out,
1049                 const struct parse_funcs *pf, lsquic_cid_t cid,
1050                 unsigned char *buf, size_t bufsz)
1051{
1052    return pf->pf_gen_reg_pkt_header(buf, bufsz,
1053        packet_out->po_flags & PO_CONN_ID ? &cid                    : NULL,
1054        packet_out->po_flags & PO_VERSION ? &packet_out->po_ver_tag : NULL,
1055        packet_out->po_flags & PO_NONCE   ? packet_out->po_nonce    : NULL,
1056        packet_out->po_packno, lsquic_packet_out_packno_bits(packet_out));
1057}
1058
1059
1060static ssize_t
1061really_encrypt_packet (const lsquic_conn_t *conn,
1062                       const lsquic_packet_out_t *packet_out,
1063                       unsigned char *buf, size_t bufsz)
1064{
1065    int enc, header_sz, is_hello_packet;
1066    size_t packet_sz;
1067    unsigned char header_buf[QUIC_MAX_PUBHDR_SZ];
1068
1069    header_sz = generate_header(packet_out, conn->cn_pf, conn->cn_cid,
1070                                            header_buf, sizeof(header_buf));
1071    if (header_sz < 0)
1072        return -1;
1073
1074    is_hello_packet = !!(packet_out->po_flags & PO_HELLO);
1075    enc = lsquic_enc(conn->cn_enc_session, conn->cn_version, 0,
1076                packet_out->po_packno, header_buf, header_sz,
1077                packet_out->po_data, packet_out->po_data_sz,
1078                buf, bufsz, &packet_sz, is_hello_packet);
1079    if (0 == enc)
1080    {
1081        LSQ_DEBUG("encrypted packet %"PRIu64"; plaintext is %u bytes, "
1082            "ciphertext is %zd bytes",
1083            packet_out->po_packno,
1084            lsquic_po_header_length(packet_out->po_flags) +
1085                                                packet_out->po_data_sz,
1086            packet_sz);
1087        return packet_sz;
1088    }
1089    else
1090        return -1;
1091}
1092
1093
1094static enum { ENCPA_OK, ENCPA_NOMEM, ENCPA_BADCRYPT, }
1095encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
1096                                            lsquic_packet_out_t *packet_out)
1097{
1098    ssize_t enc_sz;
1099    size_t bufsz;
1100    unsigned char *buf;
1101
1102    bufsz = lsquic_po_header_length(packet_out->po_flags) +
1103                                packet_out->po_data_sz + QUIC_PACKET_HASH_SZ;
1104    buf = engine->pub.enp_pmi->pmi_allocate(engine->pub.enp_pmi_ctx, bufsz);
1105    if (!buf)
1106    {
1107        LSQ_DEBUG("could not allocate memory for outgoing packet of size %zd",
1108                                                                        bufsz);
1109        return ENCPA_NOMEM;
1110    }
1111
1112        enc_sz = really_encrypt_packet(conn, packet_out, buf, bufsz);
1113
1114    if (enc_sz < 0)
1115    {
1116        engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx, buf);
1117        return ENCPA_BADCRYPT;
1118    }
1119
1120    packet_out->po_enc_data    = buf;
1121    packet_out->po_enc_data_sz = enc_sz;
1122    packet_out->po_flags |= PO_ENCRYPTED;
1123
1124    return ENCPA_OK;
1125}
1126
1127
1128struct out_batch
1129{
1130    lsquic_conn_t           *conns  [MAX_OUT_BATCH_SIZE];
1131    lsquic_packet_out_t     *packets[MAX_OUT_BATCH_SIZE];
1132    struct lsquic_out_spec   outs   [MAX_OUT_BATCH_SIZE];
1133};
1134
1135
1136STAILQ_HEAD(closed_conns, lsquic_conn);
1137
1138
1139struct conns_out_iter
1140{
1141    struct out_heap            *coi_heap;
1142    TAILQ_HEAD(, lsquic_conn)   coi_active_list,
1143                                coi_inactive_list;
1144    lsquic_conn_t              *coi_next;
1145#ifndef NDEBUG
1146    lsquic_time_t               coi_last_sent;
1147#endif
1148};
1149
1150
1151static void
1152coi_init (struct conns_out_iter *iter, struct lsquic_engine *engine)
1153{
1154    iter->coi_heap = &engine->conns_out;
1155    iter->coi_next = NULL;
1156    TAILQ_INIT(&iter->coi_active_list);
1157    TAILQ_INIT(&iter->coi_inactive_list);
1158#ifndef NDEBUG
1159    iter->coi_last_sent = 0;
1160#endif
1161}
1162
1163
1164static lsquic_conn_t *
1165coi_next (struct conns_out_iter *iter)
1166{
1167    lsquic_conn_t *conn;
1168
1169    if (iter->coi_heap->oh_nelem > 0)
1170    {
1171        conn = oh_pop(iter->coi_heap);
1172        TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
1173        conn->cn_flags |= LSCONN_COI_ACTIVE;
1174#ifndef NDEBUG
1175        if (iter->coi_last_sent)
1176            assert(iter->coi_last_sent <= conn->cn_last_sent);
1177        iter->coi_last_sent = conn->cn_last_sent;
1178#endif
1179        return conn;
1180    }
1181    else if (!TAILQ_EMPTY(&iter->coi_active_list))
1182    {
1183        conn = iter->coi_next;
1184        if (!conn)
1185            conn = TAILQ_FIRST(&iter->coi_active_list);
1186        if (conn)
1187            iter->coi_next = TAILQ_NEXT(conn, cn_next_out);
1188        return conn;
1189    }
1190    else
1191        return NULL;
1192}
1193
1194
1195static void
1196coi_deactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
1197{
1198    if (!(conn->cn_flags & LSCONN_EVANESCENT))
1199    {
1200        assert(!TAILQ_EMPTY(&iter->coi_active_list));
1201        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1202        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1203        TAILQ_INSERT_TAIL(&iter->coi_inactive_list, conn, cn_next_out);
1204        conn->cn_flags |= LSCONN_COI_INACTIVE;
1205    }
1206}
1207
1208
1209static void
1210coi_remove (struct conns_out_iter *iter, lsquic_conn_t *conn)
1211{
1212    assert(conn->cn_flags & LSCONN_COI_ACTIVE);
1213    if (conn->cn_flags & LSCONN_COI_ACTIVE)
1214    {
1215        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1216        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1217    }
1218}
1219
1220
1221static void
1222coi_reactivate (struct conns_out_iter *iter, lsquic_conn_t *conn)
1223{
1224    assert(conn->cn_flags & LSCONN_COI_INACTIVE);
1225    TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1226    conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1227    TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
1228    conn->cn_flags |= LSCONN_COI_ACTIVE;
1229}
1230
1231
1232static void
1233coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
1234{
1235    lsquic_conn_t *conn;
1236    while ((conn = TAILQ_FIRST(&iter->coi_active_list)))
1237    {
1238        TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
1239        conn->cn_flags &= ~LSCONN_COI_ACTIVE;
1240        oh_insert(iter->coi_heap, conn);
1241    }
1242    while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
1243    {
1244        TAILQ_REMOVE(&iter->coi_inactive_list, conn, cn_next_out);
1245        conn->cn_flags &= ~LSCONN_COI_INACTIVE;
1246        (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
1247    }
1248}
1249
1250
1251static unsigned
1252send_batch (lsquic_engine_t *engine, struct conns_out_iter *conns_iter,
1253                  struct out_batch *batch, unsigned n_to_send)
1254{
1255    int n_sent, i;
1256    lsquic_time_t now;
1257
1258    /* Set sent time before the write to avoid underestimating RTT */
1259    now = lsquic_time_now();
1260    for (i = 0; i < (int) n_to_send; ++i)
1261        batch->packets[i]->po_sent = now;
1262    n_sent = engine->packets_out(engine->packets_out_ctx, batch->outs,
1263                                                                n_to_send);
1264    if (n_sent >= 0)
1265        LSQ_DEBUG("packets out returned %d (out of %u)", n_sent, n_to_send);
1266    else
1267    {
1268        LSQ_DEBUG("packets out returned an error: %s", strerror(errno));
1269        n_sent = 0;
1270    }
1271    if (n_sent > 0)
1272        engine->last_sent = now + n_sent;
1273    for (i = 0; i < n_sent; ++i)
1274    {
1275        eng_hist_inc(&engine->history, now, sl_packets_out);
1276        EV_LOG_PACKET_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1277        batch->conns[i]->cn_if->ci_packet_sent(batch->conns[i],
1278                                                    batch->packets[i]);
1279        /* `i' is added to maintain relative order */
1280        batch->conns[i]->cn_last_sent = now + i;
1281        /* Release packet out buffer as soon as the packet is sent
1282         * successfully.  If not successfully sent, we hold on to
1283         * this buffer until the packet sending is attempted again
1284         * or until it times out and regenerated.
1285         */
1286        if (batch->packets[i]->po_flags & PO_ENCRYPTED)
1287        {
1288            batch->packets[i]->po_flags &= ~PO_ENCRYPTED;
1289            engine->pub.enp_pmi->pmi_release(engine->pub.enp_pmi_ctx,
1290                                                batch->packets[i]->po_enc_data);
1291            batch->packets[i]->po_enc_data = NULL;  /* JIC */
1292        }
1293    }
1294    if (LSQ_LOG_ENABLED_EXT(LSQ_LOG_DEBUG, LSQLM_EVENT))
1295        for ( ; i < (int) n_to_send; ++i)
1296            EV_LOG_PACKET_NOT_SENT(batch->conns[i]->cn_cid, batch->packets[i]);
1297    /* Return packets to the connection in reverse order so that the packet
1298     * ordering is maintained.
1299     */
1300    for (i = (int) n_to_send - 1; i >= n_sent; --i)
1301    {
1302        batch->conns[i]->cn_if->ci_packet_not_sent(batch->conns[i],
1303                                                    batch->packets[i]);
1304        if (!(batch->conns[i]->cn_flags & (LSCONN_COI_ACTIVE|LSCONN_EVANESCENT)))
1305            coi_reactivate(conns_iter, batch->conns[i]);
1306    }
1307    return n_sent;
1308}
1309
1310
1311/* Return 1 if went past deadline, 0 otherwise */
1312static int
1313check_deadline (lsquic_engine_t *engine)
1314{
1315    if (engine->pub.enp_settings.es_proc_time_thresh &&
1316                                lsquic_time_now() > engine->deadline)
1317    {
1318        LSQ_INFO("went past threshold of %u usec, stop sending",
1319                            engine->pub.enp_settings.es_proc_time_thresh);
1320        engine->flags |= ENG_PAST_DEADLINE;
1321        return 1;
1322    }
1323    else
1324        return 0;
1325}
1326
1327
1328static void
1329send_packets_out (struct lsquic_engine *engine,
1330                  struct closed_conns *closed_conns)
1331{
1332    unsigned n, w, n_sent, n_batches_sent;
1333    lsquic_packet_out_t *packet_out;
1334    lsquic_conn_t *conn;
1335    struct out_batch batch;
1336    struct conns_out_iter conns_iter;
1337    int shrink, deadline_exceeded;
1338
1339    coi_init(&conns_iter, engine);
1340    n_batches_sent = 0;
1341    n_sent = 0, n = 0;
1342    shrink = 0;
1343    deadline_exceeded = 0;
1344
1345    while ((conn = coi_next(&conns_iter)))
1346    {
1347        packet_out = conn->cn_if->ci_next_packet_to_send(conn);
1348        if (!packet_out) {
1349            LSQ_DEBUG("batched all outgoing packets for conn %"PRIu64,
1350                                                            conn->cn_cid);
1351            coi_deactivate(&conns_iter, conn);
1352            continue;
1353        }
1354        if (!(packet_out->po_flags & (PO_ENCRYPTED|PO_NOENCRYPT)))
1355        {
1356            switch (encrypt_packet(engine, conn, packet_out))
1357            {
1358            case ENCPA_NOMEM:
1359                /* Send what we have and wait for a more opportune moment */
1360                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1361                goto end_for;
1362            case ENCPA_BADCRYPT:
1363                /* This is pretty bad: close connection immediately */
1364                conn->cn_if->ci_packet_not_sent(conn, packet_out);
1365                LSQ_INFO("conn %"PRIu64" has unsendable packets", conn->cn_cid);
1366                if (!(conn->cn_flags & LSCONN_EVANESCENT))
1367                {
1368                    if (!(conn->cn_flags & LSCONN_CLOSING))
1369                    {
1370                        STAILQ_INSERT_TAIL(closed_conns, conn, cn_next_closed_conn);
1371                        engine_incref_conn(conn, LSCONN_CLOSING);
1372                        if (conn->cn_flags & LSCONN_HASHED)
1373                            remove_conn_from_hash(engine, conn);
1374                    }
1375                    coi_remove(&conns_iter, conn);
1376                }
1377                continue;
1378            case ENCPA_OK:
1379                break;
1380            }
1381        }
1382        LSQ_DEBUG("batched packet %"PRIu64" for connection %"PRIu64,
1383                                        packet_out->po_packno, conn->cn_cid);
1384        assert(conn->cn_flags & LSCONN_HAS_PEER_SA);
1385        if (packet_out->po_flags & PO_ENCRYPTED)
1386        {
1387            batch.outs[n].buf     = packet_out->po_enc_data;
1388            batch.outs[n].sz      = packet_out->po_enc_data_sz;
1389        }
1390        else
1391        {
1392            batch.outs[n].buf     = packet_out->po_data;
1393            batch.outs[n].sz      = packet_out->po_data_sz;
1394        }
1395        batch.outs   [n].peer_ctx = conn->cn_peer_ctx;
1396        batch.outs   [n].local_sa = (struct sockaddr *) conn->cn_local_addr;
1397        batch.outs   [n].dest_sa  = (struct sockaddr *) conn->cn_peer_addr;
1398        batch.conns  [n]          = conn;
1399        batch.packets[n]          = packet_out;
1400        ++n;
1401        if (n == engine->batch_size)
1402        {
1403            n = 0;
1404            w = send_batch(engine, &conns_iter, &batch, engine->batch_size);
1405            ++n_batches_sent;
1406            n_sent += w;
1407            if (w < engine->batch_size)
1408            {
1409                shrink = 1;
1410                break;
1411            }
1412            deadline_exceeded = check_deadline(engine);
1413            if (deadline_exceeded)
1414                break;
1415            grow_batch_size(engine);
1416        }
1417    }
1418  end_for:
1419
1420    if (n > 0) {
1421        w = send_batch(engine, &conns_iter, &batch, n);
1422        n_sent += w;
1423        shrink = w < n;
1424        ++n_batches_sent;
1425        deadline_exceeded = check_deadline(engine);
1426    }
1427
1428    if (shrink)
1429        shrink_batch_size(engine);
1430    else if (n_batches_sent > 1 && !deadline_exceeded)
1431        grow_batch_size(engine);
1432
1433    coi_reheap(&conns_iter, engine);
1434
1435    LSQ_DEBUG("%s: sent %u packet%.*s", __func__, n_sent, n_sent != 1, "s");
1436}
1437
1438
1439int
1440lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
1441{
1442    return !(engine->flags & ENG_PAST_DEADLINE)
1443        && (    engine->conns_out.oh_nelem > 0
1444           )
1445    ;
1446}
1447
1448
1449static void
1450reset_deadline (lsquic_engine_t *engine, lsquic_time_t now)
1451{
1452    engine->deadline = now + engine->pub.enp_settings.es_proc_time_thresh;
1453    engine->flags &= ~ENG_PAST_DEADLINE;
1454}
1455
1456
1457/* TODO: this is a user-facing function, account for load */
1458void
1459lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
1460{
1461    lsquic_conn_t *conn;
1462    struct closed_conns closed_conns;
1463
1464    STAILQ_INIT(&closed_conns);
1465    reset_deadline(engine, lsquic_time_now());
1466
1467    send_packets_out(engine, &closed_conns);
1468
1469    while ((conn = STAILQ_FIRST(&closed_conns))) {
1470        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1471        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1472    }
1473
1474}
1475
1476
1477static void
1478process_connections (lsquic_engine_t *engine, conn_iter_f next_conn)
1479{
1480    lsquic_conn_t *conn;
1481    enum tick_st tick_st;
1482    lsquic_time_t now = lsquic_time_now();
1483    struct closed_conns closed_conns;
1484
1485    engine->proc_time = now;
1486    eng_hist_tick(&engine->history, now);
1487
1488    STAILQ_INIT(&closed_conns);
1489    reset_deadline(engine, now);
1490
1491    while ((conn = next_conn(engine)))
1492    {
1493        tick_st = conn->cn_if->ci_tick(conn, now);
1494        if (conn_iter_next_rw_pend == next_conn)
1495            update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS);
1496        if (tick_st & TICK_SEND)
1497        {
1498            if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
1499            {
1500                oh_insert(&engine->conns_out, conn);
1501                engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
1502            }
1503        }
1504        if (tick_st & TICK_CLOSE)
1505        {
1506            STAILQ_INSERT_TAIL(&closed_conns, conn, cn_next_closed_conn);
1507            engine_incref_conn(conn, LSCONN_CLOSING);
1508            if (conn->cn_flags & LSCONN_HASHED)
1509                remove_conn_from_hash(engine, conn);
1510        }
1511    }
1512
1513    if (lsquic_engine_has_unsent_packets(engine))
1514        send_packets_out(engine, &closed_conns);
1515
1516    while ((conn = STAILQ_FIRST(&closed_conns))) {
1517        STAILQ_REMOVE_HEAD(&closed_conns, cn_next_closed_conn);
1518        (void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
1519    }
1520
1521}
1522
1523
1524/* Return 0 if packet is being processed by a real connection, 1 if the
1525 * packet was processed, but not by a connection, and -1 on error.
1526 */
1527int
1528lsquic_engine_packet_in (lsquic_engine_t *engine,
1529    const unsigned char *packet_in_data, size_t packet_in_size,
1530    const struct sockaddr *sa_local, const struct sockaddr *sa_peer,
1531    void *peer_ctx)
1532{
1533    struct packin_parse_state ppstate;
1534    lsquic_packet_in_t *packet_in;
1535
1536    if (packet_in_size > QUIC_MAX_PACKET_SZ)
1537    {
1538        LSQ_DEBUG("Cannot handle packet_in_size(%zd) > %d packet incoming "
1539            "packet's header", packet_in_size, QUIC_MAX_PACKET_SZ);
1540        errno = E2BIG;
1541        return -1;
1542    }
1543
1544    packet_in = lsquic_mm_get_packet_in(&engine->pub.enp_mm);
1545    if (!packet_in)
1546        return -1;
1547
1548    /* Library does not modify packet_in_data, it is not referenced after
1549     * this function returns and subsequent release of pi_data is guarded
1550     * by PI_OWN_DATA flag.
1551     */
1552    packet_in->pi_data = (unsigned char *) packet_in_data;
1553    if (0 != parse_packet_in_begin(packet_in, packet_in_size,
1554                                        engine->flags & ENG_SERVER, &ppstate))
1555    {
1556        LSQ_DEBUG("Cannot parse incoming packet's header");
1557        lsquic_mm_put_packet_in(&engine->pub.enp_mm, packet_in);
1558        errno = EINVAL;
1559        return -1;
1560    }
1561
1562    packet_in->pi_received = lsquic_time_now();
1563    eng_hist_inc(&engine->history, packet_in->pi_received, sl_packets_in);
1564    return process_packet_in(engine, packet_in, &ppstate, sa_local, sa_peer,
1565                                                                    peer_ctx);
1566}
1567
1568
1569#if __GNUC__ && !defined(NDEBUG)
1570__attribute__((weak))
1571#endif
1572unsigned
1573lsquic_engine_quic_versions (const lsquic_engine_t *engine)
1574{
1575    return engine->pub.enp_settings.es_versions;
1576}
1577
1578
1579int
1580lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
1581{
1582    const lsquic_time_t *next_time;
1583    lsquic_time_t now;
1584
1585    next_time = attq_next_time(engine->attq);
1586    if (!next_time)
1587        return 0;
1588
1589    now = lsquic_time_now();
1590    *diff = (int) ((int64_t) *next_time - (int64_t) now);
1591    return 1;
1592}
1593
1594
1595unsigned
1596lsquic_engine_count_attq (lsquic_engine_t *engine, int from_now)
1597{
1598    lsquic_time_t now;
1599    now = lsquic_time_now();
1600    if (from_now < 0)
1601        now -= from_now;
1602    else
1603        now += from_now;
1604    return attq_count_before(engine->attq, now);
1605}
1606
1607
1608