/* lsquic_full_conn.c revision 83287402 */
/* Copyright (c) 2017 LiteSpeed Technologies Inc.  See LICENSE. */
/*
 * lsquic_full_conn.c -- A "full" connection object has full functionality
 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <netinet/in.h>
10#include <stdarg.h>
11#include <stdlib.h>
12#include <string.h>
13#include <sys/socket.h>
14#include <sys/queue.h>
15#include <sys/time.h>
16
17#include "lsquic_types.h"
18#include "lsquic.h"
19#include "lsquic_alarmset.h"
20#include "lsquic_packet_common.h"
21#include "lsquic_parse.h"
22#include "lsquic_packet_in.h"
23#include "lsquic_packet_out.h"
24#include "lsquic_rechist.h"
25#include "lsquic_util.h"
26#include "lsquic_conn_flow.h"
27#include "lsquic_sfcw.h"
28#include "lsquic_stream.h"
29#include "lsquic_senhist.h"
30#include "lsquic_rtt.h"
31#include "lsquic_cubic.h"
32#include "lsquic_pacer.h"
33#include "lsquic_send_ctl.h"
34#include "lsquic_set.h"
35#include "lsquic_malo.h"
36#include "lsquic_chsk_stream.h"
37#include "lsquic_str.h"
38#include "lsquic_qtags.h"
39#include "lsquic_handshake.h"
40#include "lsquic_headers_stream.h"
41#include "lsquic_frame_common.h"
42#include "lsquic_frame_reader.h"
43#include "lsquic_mm.h"
44#include "lsquic_engine_public.h"
45#include "lsquic_spi.h"
46#include "lsquic_ev_log.h"
47#include "lsquic_version.h"
48#include "lsquic_hash.h"
49
50#include "lsquic_conn.h"
51#include "lsquic_conn_public.h"
52#include "lsquic_ver_neg.h"
53#include "lsquic_full_conn.h"
54
55#define LSQUIC_LOGGER_MODULE LSQLM_CONN
56#define LSQUIC_LOG_CONN_ID conn->fc_conn.cn_cid
57#include "lsquic_logger.h"
58
/* Indexes into full_conn.fc_stream_ifs[]: one stream interface slot per
 * kind of stream.
 */
enum {
    STREAM_IF_STD,      /* Regular streams */
    STREAM_IF_HSK,      /* Handshake stream */
    STREAM_IF_HDR,      /* Headers stream */
    N_STREAM_IFS        /* Number of slots; keep last */
};
60
/* ACK is queued once this many packets of any kind have been received since
 * the last ACK (only checked for versions older than Q039):
 */
#define MAX_ANY_PACKETS_SINCE_LAST_ACK  20
/* ACK is queued once this many ackable packets have been received since the
 * last ACK:
 */
#define MAX_RETR_PACKETS_SINCE_LAST_ACK 2
/* Added to the current lsquic_time_t value when the ACK alarm is armed.
 * NOTE(review): all three time constants below are presumably in
 * microseconds -- confirm against lsquic_time_t.
 */
#define ACK_TIMEOUT                     25000
#define TIME_BETWEEN_PINGS              15000000
#define IDLE_TIMEOUT                    30000000
66
67/* IMPORTANT: Keep values of FC_SERVER and FC_HTTP same as LSENG_SERVER
68 * and LSENG_HTTP.
69 */
70enum full_conn_flags {
71    FC_SERVER         = LSENG_SERVER,   /* Server mode */
72    FC_HTTP           = LSENG_HTTP,     /* HTTP mode */
73    FC_TIMED_OUT      = (1 << 2),
74    FC_ERROR          = (1 << 3),
75    FC_ABORTED        = (1 << 4),
76    FC_CLOSING        = (1 << 5),   /* Closing */
77    FC_SEND_PING      = (1 << 6),   /* PING frame scheduled */
78    FC_NSTP           = (1 << 7),   /* NSTP mode */
79    FC_SEND_GOAWAY    = (1 << 8),
80    FC_SEND_WUF       = (1 << 9),
81    FC_SEND_STOP_WAITING
82                      = (1 <<10),
83    FC_ACK_QUEUED     = (1 <<11),
84    FC_ACK_HAD_MISS   = (1 <<12),   /* Last ACK frame had missing packets. */
85    FC_CREATED_OK     = (1 <<13),
86    FC_RECV_CLOSE     = (1 <<14),   /* Received CONNECTION_CLOSE frame */
87    FC_GOING_AWAY     = (1 <<15),   /* Do not accept or create new streams */
88    FC_GOAWAY_SENT    = (1 <<16),   /* Only send GOAWAY once */
89    FC_SUPPORT_PUSH   = (1 <<17),
90    FC_GOT_PRST       = (1 <<18),   /* Received public reset packet */
91    FC_FIRST_TICK     = (1 <<19),
92    FC_TICK_CLOSE     = (1 <<20),   /* We returned TICK_CLOSE */
93};
94
95#define FC_IMMEDIATE_CLOSE_FLAGS \
96            (FC_TIMED_OUT|FC_ERROR|FC_ABORTED)
97
#if LSQUIC_KEEP_STREAM_HISTORY
/* Set to non-zero to keep a rolling log of closed streams' histories. */
#define KEEP_CLOSED_STREAM_HISTORY 0
#endif

#if KEEP_CLOSED_STREAM_HISTORY
/* Snapshot of a stream's ID, flags and history buffer, taken by
 * save_stream_history().
 */
struct stream_history
{
    uint32_t            shist_stream_id;
    enum stream_flags   shist_stream_flags;
    unsigned char       shist_hist_buf[1 << SM_HIST_BITS];
};
/* The connection keeps (1 << SHIST_BITS) snapshots; SHIST_MASK wraps the
 * running slot index.
 */
#define SHIST_BITS 5
#define SHIST_MASK ((1 << SHIST_BITS) - 1)
#endif
112
113struct full_conn
114{
115    struct lsquic_conn           fc_conn;
116    struct lsquic_hash          *fc_streams;
117    struct lsquic_rechist        fc_rechist;
118    struct {
119        const struct lsquic_stream_if   *stream_if;
120        void                            *stream_if_ctx;
121    }                            fc_stream_ifs[N_STREAM_IFS];
122    lsquic_conn_ctx_t           *fc_conn_ctx;
123    struct lsquic_send_ctl       fc_send_ctl;
124    struct lsquic_conn_public    fc_pub;
125    lsquic_alarmset_t            fc_alset;
126    lsquic_set32_t               fc_closed_stream_ids[2];
127    const struct lsquic_engine_settings
128                                *fc_settings;
129    struct lsquic_engine_public *fc_enpub;
130    lsquic_packno_t              fc_max_ack_packno;
131    lsquic_packno_t              fc_max_swf_packno;
132    struct {
133        unsigned    max_streams_in;
134        unsigned    max_streams_out;
135        unsigned    max_conn_send;
136        unsigned    max_stream_send;
137    }                            fc_cfg;
138    enum full_conn_flags         fc_flags;
139    /* Number of packets received since last ACK sent: */
140    unsigned                     fc_n_slack_all;
141    /* Number ackable packets received since last ACK was sent: */
142    unsigned                     fc_n_slack_akbl;
143    unsigned                     fc_n_delayed_streams;
144    unsigned                     fc_n_cons_unretx;
145    uint32_t                     fc_last_stream_id;
146    uint32_t                     fc_max_peer_stream_id;
147    uint32_t                     fc_goaway_stream_id;
148    struct ver_neg               fc_ver_neg;
149    union {
150        struct client_hsk_ctx    client;
151    }                            fc_hsk_ctx;
152#if FULL_CONN_STATS
153    struct {
154        unsigned            n_all_packets_in,
155                            n_packets_out,
156                            n_undec_packets,
157                            n_dup_packets,
158                            n_err_packets;
159        unsigned long       stream_data_sz;
160    }                            fc_stats;
161#endif
162#if KEEP_CLOSED_STREAM_HISTORY
163    /* Rolling log of histories of closed streams.  Older entries are
164     * overwritten.
165     */
166    struct stream_history        fc_stream_histories[1 << SHIST_BITS];
167    unsigned                     fc_stream_hist_idx;
168#endif
169};
170
171
/* Set `flag' in the connection's flags and log the reason at error level.
 * The logging macro picks up the connection ID through a variable named
 * `conn' that must be in scope at the point of use.
 */
#define ABORT_WITH_FLAG(conn, flag, ...) do {                               \
    (conn)->fc_flags |= flag;                                               \
    LSQ_ERROR("Abort connection: " __VA_ARGS__);                             \
} while (0)

/* Abort the connection in scope (`conn') because of an error. */
#define ABORT_ERROR(...) ABORT_WITH_FLAG(conn, FC_ERROR, __VA_ARGS__)

/* Abort the connection in scope (`conn') because of a timeout. */
#define ABORT_TIMEOUT(...) ABORT_WITH_FLAG(conn, FC_TIMED_OUT, __VA_ARGS__)
180
181static void
182idle_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now);
183
184static void
185ping_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now);
186
187static void
188handshake_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now);
189
190static void
191ack_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now);
192
193static lsquic_stream_t *
194new_stream (struct full_conn *conn, uint32_t stream_id);
195
196static void
197reset_ack_state (struct full_conn *conn);
198
199static const struct headers_stream_callbacks headers_callbacks;
200
201#if KEEP_CLOSED_STREAM_HISTORY
202
203static void
204save_stream_history (struct full_conn *conn, const lsquic_stream_t *stream)
205{
206    sm_hist_idx_t idx;
207    struct stream_history *const shist =
208        &conn->fc_stream_histories[ conn->fc_stream_hist_idx++ & SHIST_MASK ];
209
210    shist->shist_stream_id    = stream->id;
211    shist->shist_stream_flags = stream->stream_flags;
212
213    idx = stream->sm_hist_idx & SM_HIST_IDX_MASK;
214    if ('\0' == stream->sm_hist_buf[ idx ])
215        memcpy(shist->shist_hist_buf, stream->sm_hist_buf, idx + 1);
216    else
217    {
218        memcpy(shist->shist_hist_buf,
219            stream->sm_hist_buf + idx, sizeof(stream->sm_hist_buf) - idx);
220        memcpy(shist->shist_hist_buf + sizeof(shist->shist_hist_buf) - idx,
221            stream->sm_hist_buf, idx);
222    }
223}
224
225
226static const struct stream_history *
227find_stream_history (const struct full_conn *conn, uint32_t stream_id)
228{
229    const struct stream_history *shist;
230    const struct stream_history *const shist_end =
231                        conn->fc_stream_histories + (1 << SHIST_BITS);
232    for (shist = conn->fc_stream_histories; shist < shist_end; ++shist)
233        if (shist->shist_stream_id == stream_id)
234            return shist;
235    return NULL;
236}
237
238
239#   define SAVE_STREAM_HISTORY(conn, stream) save_stream_history(conn, stream)
240#else
241#   define SAVE_STREAM_HISTORY(conn, stream)
242#endif
243
/* Return the zero-based index of the most significant set bit of `sz'
 * (that is, floor(log2(sz))), treating `sz' as a 32-bit value.
 *
 * When `sz' is zero there is no set bit: (unsigned) -1 is returned.  This
 * is the value the portable code path has always produced for zero; the
 * explicit check keeps the GNU path from calling __builtin_clz(0), whose
 * result is undefined.
 */
static unsigned
highest_bit_set (unsigned sz)
{
    if (0 == sz)
        return (unsigned) -1;
#if __GNUC__
    return 31 - (unsigned) __builtin_clz(sz);
#else
    /* Binary search for the highest set bit: `n' tracks the number of
     * leading zeroes still possible.
     */
    unsigned n, y;
    n = 32;
    y = sz >> 16;   if (y) { n -= 16; sz = y; }
    y = sz >>  8;   if (y) { n -=  8; sz = y; }
    y = sz >>  4;   if (y) { n -=  4; sz = y; }
    y = sz >>  2;   if (y) { n -=  2; sz = y; }
    y = sz >>  1;   if (y) return 31 - n + 2;
    return 31 - n + sz;
#endif
}
261
262
263static void
264set_versions (struct full_conn *conn, unsigned versions)
265{
266    conn->fc_ver_neg.vn_supp = versions;
267    conn->fc_ver_neg.vn_ver  = highest_bit_set(versions);
268    conn->fc_ver_neg.vn_buf  = lsquic_ver2tag(conn->fc_ver_neg.vn_ver);
269    conn->fc_conn.cn_version = conn->fc_ver_neg.vn_ver;
270    LSQ_DEBUG("negotiating version %s",
271                            lsquic_ver2str[conn->fc_ver_neg.vn_ver]);
272}
273
274
275static void
276init_ver_neg (struct full_conn *conn, unsigned versions)
277{
278    set_versions(conn, versions);
279    conn->fc_ver_neg.vn_tag   = &conn->fc_ver_neg.vn_buf;
280    conn->fc_ver_neg.vn_state = VN_START;
281}
282
283
284/* If peer supplies odd values, we abort the connection immediately rather
285 * that wait for it to finish "naturally" due to inability to send things.
286 */
287static void
288conn_on_peer_config (struct full_conn *conn, unsigned peer_cfcw,
289                     unsigned peer_sfcw, unsigned max_streams_out)
290{
291    lsquic_stream_t *stream;
292    struct lsquic_hash_elem *el;
293
294    LSQ_INFO("Applying peer config: cfcw: %u; sfcw: %u; # streams: %u",
295        peer_cfcw, peer_sfcw, max_streams_out);
296
297    if (peer_cfcw < conn->fc_pub.conn_cap.cc_sent)
298    {
299        ABORT_ERROR("peer specified CFCW=%u bytes, which is smaller than "
300            "the amount of data already sent on this connection (%"PRIu64
301            " bytes)", peer_cfcw, conn->fc_pub.conn_cap.cc_sent);
302        return;
303    }
304
305    conn->fc_cfg.max_streams_out = max_streams_out;
306    conn->fc_pub.conn_cap.cc_max = peer_cfcw;
307
308    for (el = lsquic_hash_first(conn->fc_streams); el;
309                                 el = lsquic_hash_next(conn->fc_streams))
310    {
311        stream = lsquic_hashelem_getdata(el);
312        if (0 != lsquic_stream_set_max_send_off(stream, peer_sfcw))
313        {
314            ABORT_ERROR("cannot set peer-supplied SFCW=%u on stream %u",
315                peer_sfcw, stream->id);
316            return;
317        }
318    }
319
320    conn->fc_cfg.max_stream_send = peer_sfcw;
321}
322
323
324static int
325send_smhl (const struct full_conn *conn)
326{
327    uint32_t smhl;
328    return conn->fc_conn.cn_enc_session
329        && (conn->fc_conn.cn_flags & LSCONN_HANDSHAKE_DONE)
330        && 0 == conn->fc_conn.cn_esf->esf_get_peer_setting(
331                            conn->fc_conn.cn_enc_session, QTAG_SMHL, &smhl)
332        && 1 == smhl;
333}
334
335
336/* Once handshake has been completed, send settings to peer if appropriate.
337 */
338static void
339maybe_send_settings (struct full_conn *conn)
340{
341    struct lsquic_http2_setting settings[2];
342    unsigned n_settings = 0;
343
344    if (conn->fc_settings->es_max_header_list_size && send_smhl(conn))
345    {
346        settings[n_settings].id    = SETTINGS_MAX_HEADER_LIST_SIZE;
347        settings[n_settings].value = conn->fc_settings->es_max_header_list_size;
348        LSQ_DEBUG("sending settings SETTINGS_MAX_HEADER_LIST_SIZE=%u",
349                                                settings[n_settings].value);
350        ++n_settings;
351    }
352    if (!(conn->fc_flags & FC_SERVER) && !conn->fc_settings->es_support_push)
353    {
354        settings[n_settings].id    = SETTINGS_ENABLE_PUSH;
355        settings[n_settings].value = 0;
356        LSQ_DEBUG("sending settings SETTINGS_ENABLE_PUSH=%u",
357                                                settings[n_settings].value);
358        ++n_settings;
359    }
360
361    if (n_settings)
362    {
363        if (0 != lsquic_headers_stream_send_settings(conn->fc_pub.hs,
364                                                        settings, n_settings))
365            ABORT_ERROR("could not send settings");
366    }
367    else
368        LSQ_DEBUG("not sending any settings");
369}
370
371
372static int
373apply_peer_settings (struct full_conn *conn)
374{
375    uint32_t cfcw, sfcw, mids;
376    unsigned n;
377    const struct {
378        uint32_t    tag;
379        uint32_t   *val;
380        const char *tag_str;
381    } tags[] = {
382        { QTAG_CFCW, &cfcw, "CFCW", },
383        { QTAG_SFCW, &sfcw, "SFCW", },
384        { QTAG_MIDS, &mids, "MIDS", },
385    };
386
387#ifndef NDEBUG
388    if (getenv("LSQUIC_TEST_ENGINE_DTOR"))
389        return 0;
390#endif
391
392        for (n = 0; n < sizeof(tags) / sizeof(tags[0]); ++n)
393            if (0 != conn->fc_conn.cn_esf->esf_get_peer_setting(
394                        conn->fc_conn.cn_enc_session, tags[n].tag, tags[n].val))
395            {
396                LSQ_INFO("peer did not supply value for %s", tags[n].tag_str);
397                return -1;
398            }
399
400    LSQ_DEBUG("peer settings: CFCW: %u; SFCW: %u; MIDS: %u",
401        cfcw, sfcw, mids);
402    conn_on_peer_config(conn, cfcw, sfcw, mids);
403    if (conn->fc_flags & FC_HTTP)
404        maybe_send_settings(conn);
405    return 0;
406}
407
408
409static const struct conn_iface full_conn_iface;
410
411static struct full_conn *
412new_conn_common (lsquic_cid_t cid, struct lsquic_engine_public *enpub,
413                 const struct lsquic_stream_if *stream_if,
414                 void *stream_if_ctx, unsigned flags,
415                 unsigned short max_packet_size)
416{
417    struct full_conn *conn;
418    lsquic_stream_t *headers_stream;
419    int saved_errno;
420
421    assert(0 == (flags & ~(FC_SERVER|FC_HTTP)));
422
423    conn = calloc(1, sizeof(*conn));
424    if (!conn)
425        return NULL;
426    headers_stream = NULL;
427    conn->fc_conn.cn_cid = cid;
428    conn->fc_conn.cn_pack_size = max_packet_size;
429    conn->fc_flags = flags;
430    conn->fc_enpub = enpub;
431    conn->fc_pub.enpub = enpub;
432    conn->fc_pub.mm = &enpub->enp_mm;
433    conn->fc_pub.lconn = &conn->fc_conn;
434    conn->fc_pub.send_ctl = &conn->fc_send_ctl;
435    conn->fc_pub.packet_out_malo =
436                        lsquic_malo_create(sizeof(struct lsquic_packet_out));
437    conn->fc_stream_ifs[STREAM_IF_STD].stream_if     = stream_if;
438    conn->fc_stream_ifs[STREAM_IF_STD].stream_if_ctx = stream_if_ctx;
439    conn->fc_settings = &enpub->enp_settings;
440    /* Calculate maximum number of incoming streams using the same mechanism
441     * and parameters as found in Chrome:
442     */
443    conn->fc_cfg.max_streams_in =
444        (unsigned) ((float) enpub->enp_settings.es_max_streams_in * 1.1f);
445    if (conn->fc_cfg.max_streams_in <
446                                enpub->enp_settings.es_max_streams_in + 10)
447        conn->fc_cfg.max_streams_in =
448                                enpub->enp_settings.es_max_streams_in + 10;
449    /* `max_streams_out' gets reset when handshake is complete and we
450     * learn of peer settings.  100 seems like a sane default value
451     * because it is what other implementations use.  In server mode,
452     * we do not open any streams until the handshake is complete; in
453     * client mode, we are limited to 98 outgoing requests alongside
454     * handshake and headers streams.
455     */
456    conn->fc_cfg.max_streams_out = 100;
457    TAILQ_INIT(&conn->fc_pub.sending_streams);
458    TAILQ_INIT(&conn->fc_pub.rw_streams);
459    TAILQ_INIT(&conn->fc_pub.service_streams);
460    lsquic_conn_cap_init(&conn->fc_pub.conn_cap, LSQUIC_MIN_FCW);
461    lsquic_alarmset_init(&conn->fc_alset, cid);
462    lsquic_alarmset_init_alarm(&conn->fc_alset, AL_IDLE, idle_alarm_expired, conn);
463    lsquic_alarmset_init_alarm(&conn->fc_alset, AL_ACK, ack_alarm_expired, conn);
464    lsquic_alarmset_init_alarm(&conn->fc_alset, AL_PING, ping_alarm_expired, conn);
465    lsquic_alarmset_init_alarm(&conn->fc_alset, AL_HANDSHAKE, handshake_alarm_expired, conn);
466    lsquic_set32_init(&conn->fc_closed_stream_ids[0]);
467    lsquic_set32_init(&conn->fc_closed_stream_ids[1]);
468    lsquic_cfcw_init(&conn->fc_pub.cfcw, &conn->fc_pub, conn->fc_settings->es_cfcw);
469    lsquic_send_ctl_init(&conn->fc_send_ctl, &conn->fc_alset, conn->fc_enpub,
470                 &conn->fc_ver_neg, &conn->fc_pub, conn->fc_conn.cn_pack_size);
471
472    conn->fc_streams = lsquic_hash_create();
473    if (!conn->fc_streams)
474        goto cleanup_on_error;
475    lsquic_rechist_init(&conn->fc_rechist, cid);
476    if (conn->fc_flags & FC_HTTP)
477    {
478        conn->fc_pub.hs = lsquic_headers_stream_new(
479            !!(conn->fc_flags & FC_SERVER), conn->fc_pub.mm, conn->fc_settings,
480                                                     &headers_callbacks, conn);
481        if (!conn->fc_pub.hs)
482            goto cleanup_on_error;
483        conn->fc_stream_ifs[STREAM_IF_HDR].stream_if     = lsquic_headers_stream_if;
484        conn->fc_stream_ifs[STREAM_IF_HDR].stream_if_ctx = conn->fc_pub.hs;
485        headers_stream = new_stream(conn, LSQUIC_STREAM_HEADERS);
486        if (!headers_stream)
487            goto cleanup_on_error;
488    }
489    else
490    {
491        conn->fc_stream_ifs[STREAM_IF_HDR].stream_if     = stream_if;
492        conn->fc_stream_ifs[STREAM_IF_HDR].stream_if_ctx = stream_if_ctx;
493    }
494    if (conn->fc_settings->es_support_push)
495        conn->fc_flags |= FC_SUPPORT_PUSH;
496    conn->fc_conn.cn_if = &full_conn_iface;
497    return conn;
498
499  cleanup_on_error:
500    saved_errno = errno;
501
502    if (conn->fc_streams)
503        lsquic_hash_destroy(conn->fc_streams);
504    lsquic_rechist_cleanup(&conn->fc_rechist);
505    if (conn->fc_flags & FC_HTTP)
506    {
507        if (conn->fc_pub.hs)
508            lsquic_headers_stream_destroy(conn->fc_pub.hs);
509        if (headers_stream)
510            lsquic_stream_destroy(headers_stream);
511    }
512    memset(conn, 0, sizeof(*conn));
513    free(conn);
514
515    errno = saved_errno;
516    return NULL;
517}
518
519
520struct lsquic_conn *
521full_conn_client_new (struct lsquic_engine_public *enpub,
522                      const struct lsquic_stream_if *stream_if,
523                      void *stream_if_ctx, unsigned flags,
524                      const char *hostname, unsigned short max_packet_size)
525{
526    struct full_conn *conn;
527    enum lsquic_version version;
528    lsquic_cid_t cid;
529    const struct enc_session_funcs *esf;
530
531    version = highest_bit_set(enpub->enp_settings.es_versions);
532    esf = select_esf_by_ver(version);
533    cid = esf->esf_generate_cid();
534    conn = new_conn_common(cid, enpub, stream_if, stream_if_ctx, flags,
535                                                            max_packet_size);
536    if (!conn)
537        return NULL;
538    conn->fc_conn.cn_esf = esf;
539    conn->fc_conn.cn_enc_session =
540        conn->fc_conn.cn_esf->esf_create_client(hostname, cid, conn->fc_enpub);
541    if (!conn->fc_conn.cn_enc_session)
542    {
543        LSQ_WARN("could not create enc session: %s", strerror(errno));
544        conn->fc_conn.cn_if->ci_destroy(&conn->fc_conn);
545        return NULL;
546    }
547
548    if (conn->fc_flags & FC_HTTP)
549        conn->fc_last_stream_id = LSQUIC_STREAM_HEADERS;   /* Client goes 5, 7, 9.... */
550    else
551        conn->fc_last_stream_id = LSQUIC_STREAM_HANDSHAKE;
552    conn->fc_hsk_ctx.client.lconn   = &conn->fc_conn;
553    conn->fc_hsk_ctx.client.mm      = &enpub->enp_mm;
554    conn->fc_hsk_ctx.client.ver_neg = &conn->fc_ver_neg;
555    conn->fc_stream_ifs[STREAM_IF_HSK]
556                .stream_if     = &lsquic_client_hsk_stream_if;
557    conn->fc_stream_ifs[STREAM_IF_HSK].stream_if_ctx = &conn->fc_hsk_ctx.client;
558    init_ver_neg(conn, conn->fc_settings->es_versions);
559    conn->fc_conn.cn_pf = select_pf_by_ver(conn->fc_ver_neg.vn_ver);
560    if (conn->fc_settings->es_handshake_to)
561        lsquic_alarmset_set(&conn->fc_alset, AL_HANDSHAKE,
562                    lsquic_time_now() + conn->fc_settings->es_handshake_to);
563    if (!new_stream(conn, LSQUIC_STREAM_HANDSHAKE))
564    {
565        LSQ_WARN("could not create handshake stream: %s", strerror(errno));
566        conn->fc_conn.cn_if->ci_destroy(&conn->fc_conn);
567        return NULL;
568    }
569    conn->fc_flags |= FC_CREATED_OK;
570    conn->fc_conn_ctx = stream_if->on_new_conn(stream_if_ctx, &conn->fc_conn);
571    LSQ_INFO("Created new client connection");
572    EV_LOG_CONN_EVENT(cid, "created full connection");
573    return &conn->fc_conn;
574}
575
576
577static int
578is_our_stream (const struct full_conn *conn, const lsquic_stream_t *stream)
579{
580    int is_server = !!(conn->fc_flags & FC_SERVER);
581    return (1 & stream->id) ^ is_server;
582}
583
584
585static unsigned
586count_streams (const struct full_conn *conn, int peer)
587{
588    const lsquic_stream_t *stream;
589    unsigned count;
590    int ours;
591    int is_server;
592    struct lsquic_hash_elem *el;
593
594    peer = !!peer;
595    is_server = !!(conn->fc_flags & FC_SERVER);
596    count = 0;
597
598    for (el = lsquic_hash_first(conn->fc_streams); el;
599                                 el = lsquic_hash_next(conn->fc_streams))
600    {
601        stream = lsquic_hashelem_getdata(el);
602        ours = (1 & stream->id) ^ is_server;
603        if (ours ^ peer)
604            count += !lsquic_stream_is_closed(stream);
605    }
606
607    return count;
608}
609
610
611static void
612full_conn_ci_destroy (lsquic_conn_t *lconn)
613{
614    struct full_conn *conn = (struct full_conn *) lconn;
615    struct lsquic_hash_elem *el;
616    struct lsquic_stream *stream;
617
618    LSQ_DEBUG("destroy connection");
619    conn->fc_flags |= FC_CLOSING;
620    lsquic_set32_cleanup(&conn->fc_closed_stream_ids[0]);
621    lsquic_set32_cleanup(&conn->fc_closed_stream_ids[1]);
622    for (el = lsquic_hash_first(conn->fc_streams); el;
623                                 el = lsquic_hash_next(conn->fc_streams))
624    {
625        stream = lsquic_hashelem_getdata(el);
626        lsquic_stream_destroy(stream);
627    }
628    lsquic_hash_destroy(conn->fc_streams);
629    if (conn->fc_flags & FC_CREATED_OK)
630        conn->fc_stream_ifs[STREAM_IF_STD].stream_if
631                    ->on_conn_closed(&conn->fc_conn);
632    if (conn->fc_pub.hs)
633        lsquic_headers_stream_destroy(conn->fc_pub.hs);
634
635    lsquic_send_ctl_cleanup(&conn->fc_send_ctl);
636    lsquic_rechist_cleanup(&conn->fc_rechist);
637    if (conn->fc_conn.cn_enc_session)
638        conn->fc_conn.cn_esf->esf_destroy(conn->fc_conn.cn_enc_session);
639    lsquic_malo_destroy(conn->fc_pub.packet_out_malo);
640#if FULL_CONN_STATS
641    LSQ_NOTICE("received %u packets, of which %u were not decryptable, %u were "
642        "dups and %u were errors; sent %u packets, avg stream data per outgoing"
643        " packet is %lu bytes",
644        conn->fc_stats.n_all_packets_in, conn->fc_stats.n_undec_packets,
645        conn->fc_stats.n_dup_packets, conn->fc_stats.n_err_packets,
646        conn->fc_stats.n_packets_out,
647        conn->fc_stats.stream_data_sz / conn->fc_stats.n_packets_out);
648#endif
649    EV_LOG_CONN_EVENT(LSQUIC_LOG_CONN_ID, "full connection destroyed");
650    free(conn);
651}
652
653
654static void
655conn_mark_stream_closed (struct full_conn *conn, uint32_t stream_id)
656{   /* Because stream IDs are distributed unevenly -- there is a set of odd
657     * stream IDs and a set of even stream IDs -- it is more efficient to
658     * maintain two sets of closed stream IDs.
659     */
660    int idx = stream_id & 1;
661    stream_id >>= 1;
662    if (0 != lsquic_set32_add(&conn->fc_closed_stream_ids[idx], stream_id))
663        ABORT_ERROR("could not add element to set: %s", strerror(errno));
664}
665
666
667static int
668conn_is_stream_closed (struct full_conn *conn, uint32_t stream_id)
669{
670    int idx = stream_id & 1;
671    stream_id >>= 1;
672    return lsquic_set32_has(&conn->fc_closed_stream_ids[idx], stream_id);
673}
674
675
676static void
677set_ack_timer (struct full_conn *conn, lsquic_time_t now)
678{
679    lsquic_alarmset_set(&conn->fc_alset, AL_ACK, now + ACK_TIMEOUT);
680    LSQ_DEBUG("ACK alarm set to %"PRIu64, now + ACK_TIMEOUT);
681}
682
683
684static void
685ack_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now)
686{
687    struct full_conn *conn = ctx;
688    LSQ_DEBUG("ACK timer expired (%"PRIu64" < %"PRIu64"): ACK queued",
689        expiry, now);
690    conn->fc_flags |= FC_ACK_QUEUED;
691}
692
693
694static void
695try_queueing_ack (struct full_conn *conn, int was_missing, lsquic_time_t now)
696{
697    if (conn->fc_n_slack_akbl >= MAX_RETR_PACKETS_SINCE_LAST_ACK ||
698        (conn->fc_conn.cn_version < LSQVER_039 /* Since Q039 do not ack ACKs */
699            && conn->fc_n_slack_all >= MAX_ANY_PACKETS_SINCE_LAST_ACK) ||
700        ((conn->fc_flags & FC_ACK_HAD_MISS) && was_missing)      ||
701        lsquic_send_ctl_n_stop_waiting(&conn->fc_send_ctl) > 1)
702    {
703        lsquic_alarmset_unset(&conn->fc_alset, AL_ACK);
704        lsquic_send_ctl_sanity_check(&conn->fc_send_ctl);
705        conn->fc_flags |= FC_ACK_QUEUED;
706        LSQ_DEBUG("ACK queued: ackable: %u; all: %u; had_miss: %d; "
707            "was_missing: %d; n_stop_waiting: %u",
708            conn->fc_n_slack_akbl, conn->fc_n_slack_all,
709            !!(conn->fc_flags & FC_ACK_HAD_MISS), was_missing,
710            lsquic_send_ctl_n_stop_waiting(&conn->fc_send_ctl));
711    }
712    else if (conn->fc_n_slack_akbl > 0)
713        set_ack_timer(conn, now);
714}
715
716
717static void
718reset_ack_state (struct full_conn *conn)
719{
720    conn->fc_n_slack_all  = 0;
721    conn->fc_n_slack_akbl = 0;
722    lsquic_send_ctl_n_stop_waiting_reset(&conn->fc_send_ctl);
723    conn->fc_flags &= ~FC_ACK_QUEUED;
724    lsquic_alarmset_unset(&conn->fc_alset, AL_ACK);
725    lsquic_send_ctl_sanity_check(&conn->fc_send_ctl);
726    LSQ_DEBUG("ACK state reset");
727}
728
729
730static lsquic_stream_t *
731new_stream_ext (struct full_conn *conn, uint32_t stream_id, int if_idx,
732                enum stream_ctor_flags stream_ctor_flags)
733{
734    lsquic_stream_t *stream = lsquic_stream_new_ext(stream_id, &conn->fc_pub,
735        conn->fc_stream_ifs[if_idx].stream_if,
736        conn->fc_stream_ifs[if_idx].stream_if_ctx, conn->fc_settings->es_sfcw,
737        conn->fc_cfg.max_stream_send, stream_ctor_flags);
738    if (stream)
739        lsquic_hash_insert(conn->fc_streams, &stream->id, sizeof(stream->id),
740                                                                        stream);
741    return stream;
742}
743
744
745static lsquic_stream_t *
746new_stream (struct full_conn *conn, uint32_t stream_id)
747{
748    enum stream_ctor_flags flags;
749    int idx;
750    switch (stream_id)
751    {
752    case LSQUIC_STREAM_HANDSHAKE:
753        idx = STREAM_IF_HSK;
754        flags = SCF_DI_AUTOSWITCH|SCF_CALL_ON_NEW;
755        break;
756    case LSQUIC_STREAM_HEADERS:
757        idx = STREAM_IF_HDR;
758        flags = SCF_DI_AUTOSWITCH|SCF_CALL_ON_NEW;
759        if (!(conn->fc_flags & FC_HTTP) &&
760                                    conn->fc_enpub->enp_settings.es_rw_once)
761            flags |= SCF_DISP_RW_ONCE;
762        break;
763    default:
764        idx = STREAM_IF_STD;
765        flags = SCF_DI_AUTOSWITCH|SCF_CALL_ON_NEW;
766        if (conn->fc_enpub->enp_settings.es_rw_once)
767            flags |= SCF_DISP_RW_ONCE;
768        break;
769    }
770    return new_stream_ext(conn, stream_id, idx, flags);
771}
772
773
774static uint32_t
775generate_stream_id (struct full_conn *conn)
776{
777    conn->fc_last_stream_id += 2;
778    return conn->fc_last_stream_id;
779}
780
781
782unsigned
783lsquic_conn_n_pending_streams (const lsquic_conn_t *lconn)
784{
785    struct full_conn *conn = (struct full_conn *) lconn;
786    return conn->fc_n_delayed_streams;
787}
788
789
790unsigned
791lsquic_conn_cancel_pending_streams (lsquic_conn_t *lconn, unsigned n)
792{
793    struct full_conn *conn = (struct full_conn *) lconn;
794    if (n > conn->fc_n_delayed_streams)
795        conn->fc_n_delayed_streams = 0;
796    else
797        conn->fc_n_delayed_streams -= n;
798    return conn->fc_n_delayed_streams;
799}
800
801
802static int
803either_side_going_away (const struct full_conn *conn)
804{
805    return (conn->fc_flags & FC_GOING_AWAY)
806        || (conn->fc_conn.cn_flags & LSCONN_PEER_GOING_AWAY);
807}
808
809
810void
811lsquic_conn_make_stream (lsquic_conn_t *lconn)
812{
813    struct full_conn *conn = (struct full_conn *) lconn;
814    unsigned stream_count = count_streams(conn, 0);
815    if (stream_count < conn->fc_cfg.max_streams_out)
816    {
817        if (!new_stream(conn, generate_stream_id(conn)))
818            ABORT_ERROR("could not create new stream: %s", strerror(errno));
819    }
820    else if (either_side_going_away(conn))
821        (void) conn->fc_stream_ifs[STREAM_IF_STD].stream_if->on_new_stream(
822            conn->fc_stream_ifs[STREAM_IF_STD].stream_if_ctx, NULL);
823    else
824    {
825        ++conn->fc_n_delayed_streams;
826        LSQ_DEBUG("delayed stream creation.  Backlog size: %u",
827                                                conn->fc_n_delayed_streams);
828    }
829}
830
831
832static lsquic_stream_t *
833find_stream_by_id (struct full_conn *conn, uint32_t stream_id)
834{
835    struct lsquic_hash_elem *el;
836    el = lsquic_hash_find(conn->fc_streams, &stream_id, sizeof(stream_id));
837    if (el)
838        return lsquic_hashelem_getdata(el);
839    else
840        return NULL;
841}
842
843
844lsquic_stream_t *
845lsquic_conn_get_stream_by_id (lsquic_conn_t *lconn, uint32_t stream_id)
846{
847    struct full_conn *conn = (struct full_conn *) lconn;
848    return find_stream_by_id(conn, stream_id);
849}
850
851
/* Return the number of consecutive zero bytes at the beginning of the
 * `len'-byte buffer `p'.
 */
static unsigned
count_zero_bytes (const unsigned char *p, size_t len)
{
    size_t n_zeroes = 0;

    while (n_zeroes < len && 0 == p[n_zeroes])
        ++n_zeroes;

    return n_zeroes;
}
860
861
862static unsigned
863process_padding_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
864                       const unsigned char *p, size_t len)
865{
866    if (conn->fc_conn.cn_version >= LSQVER_038)
867        return count_zero_bytes(p, len);
868    if (lsquic_is_zero(p, len))
869    {
870        EV_LOG_PADDING_FRAME_IN(LSQUIC_LOG_CONN_ID, len);
871        return len;
872    }
873    else
874        return 0;
875}
876
877
878static unsigned
879process_ping_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
880                    const unsigned char *p, size_t len)
881{   /* This frame causes ACK frame to be queued, but nothing to do here;
882     * return the length of this frame.
883     */
884    EV_LOG_PING_FRAME_IN(LSQUIC_LOG_CONN_ID);
885    LSQ_DEBUG("received PING");
886    return 1;
887}
888
889
890static int
891is_peer_initiated (const struct full_conn *conn, uint32_t stream_id)
892{
893    unsigned is_server = !!(conn->fc_flags & FC_SERVER);
894    int peer_initiated = (stream_id & 1) == is_server;
895    return peer_initiated;
896}
897
898
899static unsigned
900process_stream_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
901                      const unsigned char *p, size_t len)
902{
903    stream_frame_t *stream_frame;
904    lsquic_stream_t *stream;
905    int parsed_len;
906
907    stream_frame = lsquic_malo_get(conn->fc_pub.mm->malo.stream_frame);
908    if (!stream_frame)
909    {
910        LSQ_WARN("could not allocate stream frame: %s", strerror(errno));
911        return 0;
912    }
913
914    parsed_len = conn->fc_conn.cn_pf->pf_parse_stream_frame(p, len,
915                                                            stream_frame);
916    if (parsed_len < 0) {
917        lsquic_malo_put(stream_frame);
918        return 0;
919    }
920    EV_LOG_STREAM_FRAME_IN(LSQUIC_LOG_CONN_ID, stream_frame);
921    LSQ_DEBUG("Got stream frame for stream #%u", stream_frame->stream_id);
922    if (conn->fc_flags & FC_CLOSING)
923    {
924        LSQ_DEBUG("Connection closing: ignore frame");
925        lsquic_malo_put(stream_frame);
926        return parsed_len;
927    }
928
929    stream = find_stream_by_id(conn, stream_frame->stream_id);
930    if (!stream)
931    {
932        if (conn_is_stream_closed(conn, stream_frame->stream_id))
933        {
934            LSQ_DEBUG("drop frame for closed stream %u", stream_frame->stream_id);
935            lsquic_malo_put(stream_frame);
936            return parsed_len;
937        }
938        if (is_peer_initiated(conn, stream_frame->stream_id))
939        {
940            unsigned in_count = count_streams(conn, 1);
941            LSQ_DEBUG("number of peer-initiated streams: %u", in_count);
942            if (in_count >= conn->fc_cfg.max_streams_in)
943            {
944                ABORT_ERROR("incoming stream would exceed limit: %u",
945                                        conn->fc_cfg.max_streams_in);
946                lsquic_malo_put(stream_frame);
947                return 0;
948            }
949            if ((conn->fc_flags & FC_GOING_AWAY) &&
950                stream_frame->stream_id > conn->fc_max_peer_stream_id)
951            {
952                LSQ_DEBUG("going away: drop frame for new stream %u",
953                                                    stream_frame->stream_id);
954                lsquic_malo_put(stream_frame);
955                return parsed_len;
956            }
957        }
958        else
959        {
960            ABORT_ERROR("frame for never-initiated stream");
961            lsquic_malo_put(stream_frame);
962            return 0;
963        }
964        stream = new_stream(conn, stream_frame->stream_id);
965        if (!stream)
966        {
967            ABORT_ERROR("cannot create new stream: %s", strerror(errno));
968            lsquic_malo_put(stream_frame);
969            return 0;
970        }
971        if (stream_frame->stream_id > conn->fc_max_peer_stream_id)
972            conn->fc_max_peer_stream_id = stream_frame->stream_id;
973    }
974
975    stream_frame->packet_in = lsquic_packet_in_get(packet_in);
976    if (0 != lsquic_stream_frame_in(stream, stream_frame))
977    {
978        ABORT_ERROR("cannot insert stream frame");
979        return 0;
980    }
981
982    return parsed_len;
983}
984
985
986static unsigned
987process_invalid_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
988                                            const unsigned char *p, size_t len)
989{
990    ABORT_ERROR("invalid frame");
991    return 0;
992}
993
994
995/* Reset locally-initiated streams whose IDs is larger than the stream ID
996 * specified in received GOAWAY frame.
997 */
998static void
999reset_local_streams_over_goaway (struct full_conn *conn)
1000{
1001    const unsigned is_server = !!(conn->fc_flags & FC_SERVER);
1002    lsquic_stream_t *stream;
1003    struct lsquic_hash_elem *el;
1004
1005    for (el = lsquic_hash_first(conn->fc_streams); el;
1006                                 el = lsquic_hash_next(conn->fc_streams))
1007    {
1008        stream = lsquic_hashelem_getdata(el);
1009        if (stream->id > conn->fc_goaway_stream_id &&
1010            ((stream->id & 1) ^ is_server /* Locally initiated? */))
1011        {
1012            lsquic_stream_received_goaway(stream);
1013        }
1014    }
1015}
1016
1017
1018static unsigned
1019process_goaway_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
1020                                            const unsigned char *p, size_t len)
1021{
1022    uint32_t error_code, stream_id;
1023    uint16_t reason_length;
1024    const char *reason;
1025    const int parsed_len = conn->fc_conn.cn_pf->pf_parse_goaway_frame(p, len,
1026                            &error_code, &stream_id, &reason_length, &reason);
1027    if (parsed_len < 0)
1028        return 0;
1029    EV_LOG_GOAWAY_FRAME_IN(LSQUIC_LOG_CONN_ID, error_code, stream_id,
1030        reason_length, reason);
1031    LSQ_DEBUG("received GOAWAY frame, last good stream ID: %u, error code: 0x%X,"
1032        " reason: `%.*s'", stream_id, error_code, reason_length, reason);
1033    if (0 == (conn->fc_conn.cn_flags & LSCONN_PEER_GOING_AWAY))
1034    {
1035        conn->fc_conn.cn_flags |= LSCONN_PEER_GOING_AWAY;
1036        conn->fc_goaway_stream_id = stream_id;
1037        if (conn->fc_stream_ifs[STREAM_IF_STD].stream_if->on_goaway_received)
1038        {
1039            LSQ_DEBUG("calling on_goaway_received");
1040            conn->fc_stream_ifs[STREAM_IF_STD].stream_if->on_goaway_received(
1041                                            &conn->fc_conn);
1042        }
1043        else
1044            LSQ_DEBUG("on_goaway_received not registered");
1045        reset_local_streams_over_goaway(conn);
1046    }
1047    else
1048        LSQ_DEBUG("ignore duplicate GOAWAY frame");
1049    return parsed_len;
1050}
1051
1052
1053static void
1054log_invalid_ack_frame (struct full_conn *conn, const unsigned char *p,
1055                                int parsed_len, const struct ack_info *acki)
1056{
1057    char *buf;
1058    size_t sz;
1059
1060    buf = malloc(0x1000);
1061    if (buf)
1062    {
1063        lsquic_senhist_tostr(&conn->fc_send_ctl.sc_senhist, buf, 0x1000);
1064        LSQ_WARN("send history: %s", buf);
1065        hexdump(p, parsed_len, buf, 0x1000);
1066        LSQ_WARN("raw ACK frame:\n%s", buf);
1067        free(buf);
1068    }
1069    else
1070        LSQ_WARN("malloc failed");
1071
1072    buf = acki2str(acki, &sz);
1073    if (buf)
1074    {
1075        LSQ_WARN("parsed ACK frame: %.*s", (int) sz, buf);
1076        free(buf);
1077    }
1078    else
1079        LSQ_WARN("malloc failed");
1080}
1081
1082
1083static unsigned
1084process_ack_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
1085                                            const unsigned char *p, size_t len)
1086{
1087    const int parsed_len = conn->fc_conn.cn_pf->pf_parse_ack_frame(p, len,
1088                                                        conn->fc_pub.mm->acki);
1089    if (parsed_len < 0)
1090        return 0;
1091    if (packet_in->pi_packno > conn->fc_max_ack_packno)
1092    {
1093        EV_LOG_ACK_FRAME_IN(LSQUIC_LOG_CONN_ID, conn->fc_pub.mm->acki);
1094        if (0 == lsquic_send_ctl_got_ack(&conn->fc_send_ctl,
1095                             conn->fc_pub.mm->acki, packet_in->pi_received))
1096        {
1097            conn->fc_max_ack_packno = packet_in->pi_packno;
1098            if (lsquic_send_ctl_largest_ack2ed(&conn->fc_send_ctl))
1099                lsquic_rechist_stop_wait(&conn->fc_rechist,
1100                    lsquic_send_ctl_largest_ack2ed(&conn->fc_send_ctl) + 1);
1101        }
1102        else
1103        {
1104            log_invalid_ack_frame(conn, p, parsed_len, conn->fc_pub.mm->acki);
1105            ABORT_ERROR("Received invalid ACK");
1106        }
1107    }
1108    else
1109        LSQ_DEBUG("Ignore old ack (max %"PRIu64")", conn->fc_max_ack_packno);
1110    return parsed_len;
1111}
1112
1113
1114static unsigned
1115process_stop_waiting_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
1116                                            const unsigned char *p, size_t len)
1117{
1118    lsquic_packno_t least, cutoff;
1119    enum lsquic_packno_bits bits;
1120    int parsed_len;
1121
1122    bits = lsquic_packet_in_packno_bits(packet_in);
1123
1124    if (conn->fc_flags & FC_NSTP)
1125    {
1126        LSQ_DEBUG("NSTP on: ignore STOP_WAITING frame");
1127        parsed_len = conn->fc_conn.cn_pf->pf_skip_stop_waiting_frame(len, bits);
1128        if (parsed_len > 0)
1129            return (unsigned) parsed_len;
1130        else
1131            return 0;
1132    }
1133
1134    parsed_len = conn->fc_conn.cn_pf->pf_parse_stop_waiting_frame(p, len,
1135                                            packet_in->pi_packno, bits, &least);
1136    if (parsed_len < 0)
1137        return 0;
1138
1139    if (packet_in->pi_packno <= conn->fc_max_swf_packno)
1140    {
1141        LSQ_DEBUG("ignore old STOP_WAITING frame");
1142        return parsed_len;
1143    }
1144
1145    LSQ_DEBUG("Got STOP_WAITING frame, least unacked: %"PRIu64, least);
1146    EV_LOG_STOP_WAITING_FRAME_IN(LSQUIC_LOG_CONN_ID, least);
1147
1148    if (least > packet_in->pi_packno)
1149    {
1150        ABORT_ERROR("received invalid STOP_WAITING: %"PRIu64" is larger "
1151            "than the packet number%"PRIu64, least, packet_in->pi_packno);
1152        return 0;
1153    }
1154
1155    cutoff = lsquic_rechist_cutoff(&conn->fc_rechist);
1156    if (cutoff && least < cutoff)
1157    {
1158        ABORT_ERROR("received invalid STOP_WAITING: %"PRIu64" is smaller "
1159            "than the cutoff %"PRIu64, least, cutoff);
1160        return 0;
1161    }
1162
1163    conn->fc_max_swf_packno = packet_in->pi_packno;
1164    lsquic_rechist_stop_wait(&conn->fc_rechist, least);
1165    return parsed_len;
1166}
1167
1168
1169static unsigned
1170process_blocked_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
1171                                            const unsigned char *p, size_t len)
1172{
1173    uint32_t stream_id;
1174    const int parsed_len = conn->fc_conn.cn_pf->pf_parse_blocked_frame(p, len,
1175                                                                    &stream_id);
1176    if (parsed_len < 0)
1177        return 0;
1178    EV_LOG_BLOCKED_FRAME_IN(LSQUIC_LOG_CONN_ID, stream_id);
1179    LSQ_DEBUG("Peer reports stream %u as blocked", stream_id);
1180    return parsed_len;
1181}
1182
1183
1184static unsigned
1185process_connection_close_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
1186                                const unsigned char *p, size_t len)
1187{
1188    lsquic_stream_t *stream;
1189    struct lsquic_hash_elem *el;
1190    uint32_t error_code;
1191    uint16_t reason_len;
1192    uint8_t reason_off;
1193    int parsed_len;
1194
1195    parsed_len = conn->fc_conn.cn_pf->pf_parse_connect_close_frame(p, len,
1196                                        &error_code, &reason_len, &reason_off);
1197    if (parsed_len < 0)
1198        return 0;
1199    EV_LOG_CONNECTION_CLOSE_FRAME_IN(LSQUIC_LOG_CONN_ID, error_code,
1200                            (int) reason_len, (const char *) p + reason_off);
1201    LSQ_INFO("Received CONNECTION_CLOSE frame (code: %u; reason: %.*s)",
1202                error_code, (int) reason_len, (const char *) p + reason_off);
1203    conn->fc_flags |= FC_RECV_CLOSE;
1204    if (!(conn->fc_flags & FC_CLOSING))
1205    {
1206        for (el = lsquic_hash_first(conn->fc_streams); el;
1207                                     el = lsquic_hash_next(conn->fc_streams))
1208        {
1209            stream = lsquic_hashelem_getdata(el);
1210            lsquic_stream_shutdown_internal(stream);
1211        }
1212        conn->fc_flags |= FC_CLOSING;
1213    }
1214    return parsed_len;
1215}
1216
1217
1218static unsigned
1219process_rst_stream_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
1220                                            const unsigned char *p, size_t len)
1221{
1222    uint32_t stream_id, error_code;
1223    uint64_t offset;
1224    lsquic_stream_t *stream;
1225    const int parsed_len = conn->fc_conn.cn_pf->pf_parse_rst_frame(p, len,
1226                                            &stream_id, &offset, &error_code);
1227    if (parsed_len < 0)
1228        return 0;
1229
1230    EV_LOG_RST_STREAM_FRAME_IN(LSQUIC_LOG_CONN_ID, stream_id, offset,
1231                                                                error_code);
1232    LSQ_DEBUG("Got RST_STREAM; stream: %u; offset: 0x%"PRIX64, stream_id,
1233                                                                    offset);
1234    if (0 == stream_id)
1235    {   /* Follow reference implementation and ignore this apparently
1236         * invalid frame.
1237         */
1238        return parsed_len;
1239    }
1240
1241    if (LSQUIC_STREAM_HANDSHAKE == stream_id ||
1242        ((conn->fc_flags & FC_HTTP) && LSQUIC_STREAM_HEADERS == stream_id))
1243    {
1244        ABORT_ERROR("received reset on static stream %u", stream_id);
1245        return 0;
1246    }
1247
1248    stream = find_stream_by_id(conn, stream_id);
1249    if (!stream)
1250    {
1251        if (conn_is_stream_closed(conn, stream_id))
1252        {
1253            LSQ_DEBUG("got reset frame for closed stream %u", stream_id);
1254            return parsed_len;
1255        }
1256        if (!is_peer_initiated(conn, stream_id))
1257        {
1258            ABORT_ERROR("received reset for never-initiated stream %u",
1259                                                                    stream_id);
1260            return 0;
1261        }
1262        stream = new_stream(conn, stream_id);
1263        if (!stream)
1264        {
1265            ABORT_ERROR("cannot create new stream: %s", strerror(errno));
1266            return 0;
1267        }
1268        if (stream_id > conn->fc_max_peer_stream_id)
1269            conn->fc_max_peer_stream_id = stream_id;
1270    }
1271
1272    if (0 != lsquic_stream_rst_in(stream, offset, error_code))
1273    {
1274        ABORT_ERROR("received invalid RST_STREAM");
1275        return 0;
1276    }
1277    lsquic_send_ctl_reset_stream(&conn->fc_send_ctl, stream_id);
1278    return parsed_len;
1279}
1280
1281
1282static unsigned
1283process_window_update_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
1284                                             const unsigned char *p, size_t len)
1285{
1286    uint32_t stream_id;
1287    uint64_t offset;
1288    const int parsed_len =
1289                conn->fc_conn.cn_pf->pf_parse_window_update_frame(p, len,
1290                                                        &stream_id, &offset);
1291    if (parsed_len < 0)
1292        return 0;
1293    EV_LOG_WINDOW_UPDATE_FRAME_IN(LSQUIC_LOG_CONN_ID, stream_id, offset);
1294    if (stream_id)
1295    {
1296        lsquic_stream_t *stream = find_stream_by_id(conn, stream_id);
1297        if (stream)
1298        {
1299            LSQ_DEBUG("Got window update frame, stream: %u; offset: 0x%"PRIX64,
1300                                                            stream_id, offset);
1301            lsquic_stream_window_update(stream, offset);
1302        }
1303        else    /* Perhaps a result of lost packets? */
1304            LSQ_DEBUG("Got window update frame for non-existing stream %u "
1305                                 "(offset: 0x%"PRIX64")", stream_id, offset);
1306    }
1307    else if (offset > conn->fc_pub.conn_cap.cc_max)
1308    {
1309        conn->fc_pub.conn_cap.cc_max = offset;
1310        assert(conn->fc_pub.conn_cap.cc_max >= conn->fc_pub.conn_cap.cc_sent);
1311        LSQ_DEBUG("Connection WUF, new offset 0x%"PRIX64, offset);
1312    }
1313    else
1314        LSQ_DEBUG("Throw ouw duplicate connection WUF");
1315    return parsed_len;
1316}
1317
1318
1319typedef unsigned (*process_frame_f)(
1320    struct full_conn *, lsquic_packet_in_t *, const unsigned char *p, size_t);
1321
1322static process_frame_f const process_frames[N_QUIC_FRAMES] =
1323{
1324    [QUIC_FRAME_ACK]                  =  process_ack_frame,
1325    [QUIC_FRAME_BLOCKED]              =  process_blocked_frame,
1326    [QUIC_FRAME_CONNECTION_CLOSE]     =  process_connection_close_frame,
1327    [QUIC_FRAME_GOAWAY]               =  process_goaway_frame,
1328    [QUIC_FRAME_INVALID]              =  process_invalid_frame,
1329    [QUIC_FRAME_PADDING]              =  process_padding_frame,
1330    [QUIC_FRAME_PING]                 =  process_ping_frame,
1331    [QUIC_FRAME_RST_STREAM]           =  process_rst_stream_frame,
1332    [QUIC_FRAME_STOP_WAITING]         =  process_stop_waiting_frame,
1333    [QUIC_FRAME_STREAM]               =  process_stream_frame,
1334    [QUIC_FRAME_WINDOW_UPDATE]        =  process_window_update_frame,
1335};
1336
1337static unsigned
1338process_packet_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
1339                      const unsigned char *p, size_t len)
1340{
1341    enum QUIC_FRAME_TYPE type = conn->fc_conn.cn_pf->pf_parse_frame_type(p[0]);
1342    packet_in->pi_frame_types |= 1 << type;
1343    return process_frames[type](conn, packet_in, p, len);
1344}
1345
1346
1347static void
1348process_ver_neg_packet (struct full_conn *conn, lsquic_packet_in_t *packet_in)
1349{
1350    int s;
1351    struct ver_iter vi;
1352    lsquic_ver_tag_t ver_tag;
1353    enum lsquic_version version;
1354    unsigned versions = 0;
1355
1356    LSQ_DEBUG("Processing version-negotiation packet");
1357
1358    if (conn->fc_ver_neg.vn_state != VN_START)
1359    {
1360        LSQ_DEBUG("ignore a likely duplicate version negotiation packet");
1361        return;
1362    }
1363
1364    for (s = packet_in_ver_first(packet_in, &vi, &ver_tag); s;
1365                     s = packet_in_ver_next(&vi, &ver_tag))
1366    {
1367        version = lsquic_tag2ver(ver_tag);
1368        if (version < N_LSQVER)
1369        {
1370            versions |= 1 << version;
1371            LSQ_DEBUG("server supports version %s", lsquic_ver2str[version]);
1372        }
1373    }
1374
1375    if (versions & (1 << conn->fc_ver_neg.vn_ver))
1376    {
1377        ABORT_ERROR("server replied with version we support: %s",
1378                                    lsquic_ver2str[conn->fc_ver_neg.vn_ver]);
1379        return;
1380    }
1381
1382    versions &= conn->fc_ver_neg.vn_supp;
1383    if (0 == versions)
1384    {
1385        ABORT_ERROR("client does not support any of the server-specified "
1386                    "versions");
1387        return;
1388    }
1389
1390    set_versions(conn, versions);
1391    conn->fc_ver_neg.vn_state = VN_IN_PROGRESS;
1392    lsquic_send_ctl_expire_all(&conn->fc_send_ctl);
1393}
1394
1395
1396static void
1397reconstruct_packet_number (struct full_conn *conn, lsquic_packet_in_t *packet_in)
1398{
1399    lsquic_packno_t cur_packno, max_packno;
1400    enum lsquic_packno_bits bits;
1401
1402    cur_packno = packet_in->pi_packno;
1403    max_packno = lsquic_rechist_largest_packno(&conn->fc_rechist);
1404    bits = lsquic_packet_in_packno_bits(packet_in);
1405    packet_in->pi_packno = restore_packno(cur_packno, bits, max_packno);
1406    LSQ_DEBUG("reconstructed (bits: %u, packno: %"PRIu64", max: %"PRIu64") "
1407        "to %"PRIu64"", bits, cur_packno, max_packno, packet_in->pi_packno);
1408}
1409
1410
1411static int
1412conn_decrypt_packet (struct full_conn *conn, lsquic_packet_in_t *packet_in)
1413{
1414        return lsquic_conn_decrypt_packet(&conn->fc_conn, conn->fc_enpub,
1415                                                                packet_in);
1416}
1417
1418
1419static void
1420parse_regular_packet (struct full_conn *conn, lsquic_packet_in_t *packet_in)
1421{
1422    const unsigned char *p, *pend;
1423    unsigned len;
1424
1425    p = packet_in->pi_data + packet_in->pi_header_sz;
1426    pend = packet_in->pi_data + packet_in->pi_data_sz;
1427
1428    while (p < pend)
1429    {
1430        len = process_packet_frame(conn, packet_in, p, pend - p);
1431        if (len > 0)
1432            p += len;
1433        else
1434        {
1435            ABORT_ERROR("Error parsing frame");
1436            break;
1437        }
1438    }
1439}
1440
1441
1442static int
1443process_regular_packet (struct full_conn *conn, lsquic_packet_in_t *packet_in)
1444{
1445    enum received_st st;
1446    int was_missing;
1447
1448    reconstruct_packet_number(conn, packet_in);
1449    EV_LOG_PACKET_IN(LSQUIC_LOG_CONN_ID, packet_in);
1450
1451#if FULL_CONN_STATS
1452    ++conn->fc_stats.n_all_packets_in;
1453#endif
1454
1455    /* The packet is decrypted before receive history is updated.  This is
1456     * done to make sure that a bad packet won't occupy a slot in receive
1457     * history and subsequent good packet won't be marked as a duplicate.
1458     */
1459    if (0 == (packet_in->pi_flags & PI_DECRYPTED) &&
1460        0 != conn_decrypt_packet(conn, packet_in))
1461    {
1462        LSQ_INFO("could not decrypt packet");
1463#if FULL_CONN_STATS
1464        ++conn->fc_stats.n_undec_packets;
1465#endif
1466        return 0;
1467    }
1468
1469    st = lsquic_rechist_received(&conn->fc_rechist, packet_in->pi_packno,
1470                                                    packet_in->pi_received);
1471    switch (st) {
1472    case REC_ST_OK:
1473        parse_regular_packet(conn, packet_in);
1474        if (0 == (conn->fc_flags & FC_ACK_QUEUED))
1475        {
1476            was_missing = packet_in->pi_packno !=
1477                            lsquic_rechist_largest_packno(&conn->fc_rechist);
1478            conn->fc_n_slack_all  += 1;
1479            conn->fc_n_slack_akbl +=
1480                            !!(packet_in->pi_frame_types & QFRAME_ACKABLE_MASK);
1481            try_queueing_ack(conn, was_missing, packet_in->pi_received);
1482        }
1483        return 0;
1484    case REC_ST_DUP:
1485#if FULL_CONN_STATS
1486    ++conn->fc_stats.n_dup_packets;
1487#endif
1488        LSQ_INFO("packet %"PRIu64" is a duplicate", packet_in->pi_packno);
1489        return 0;
1490    default:
1491        assert(0);
1492        /* Fall through */
1493    case REC_ST_ERR:
1494#if FULL_CONN_STATS
1495    ++conn->fc_stats.n_err_packets;
1496#endif
1497        LSQ_INFO("error processing packet %"PRIu64, packet_in->pi_packno);
1498        return -1;
1499    }
1500}
1501
1502
1503static int
1504process_incoming_packet (struct full_conn *conn, lsquic_packet_in_t *packet_in)
1505{
1506    LSQ_DEBUG("Processing packet %"PRIu64, packet_in->pi_packno);
1507    /* See flowchart in Section 4.1 of [draft-ietf-quic-transport-00].  We test
1508     * for the common case first.
1509     */
1510    const unsigned flags = lsquic_packet_in_public_flags(packet_in);
1511    if (0 == (flags & (PACKET_PUBLIC_FLAGS_RST|PACKET_PUBLIC_FLAGS_VERSION)))
1512    {
1513        if (conn->fc_ver_neg.vn_tag)
1514        {
1515            assert(conn->fc_ver_neg.vn_state != VN_END);
1516            conn->fc_ver_neg.vn_state = VN_END;
1517            conn->fc_ver_neg.vn_tag = NULL;
1518            conn->fc_conn.cn_version = conn->fc_ver_neg.vn_ver;
1519            conn->fc_conn.cn_flags |= LSCONN_VER_SET;
1520            if (conn->fc_conn.cn_version >= LSQVER_037)
1521            {
1522                assert(!(conn->fc_flags & FC_NSTP)); /* This bit off at start */
1523                if (conn->fc_settings->es_support_nstp)
1524                {
1525                    conn->fc_flags |= FC_NSTP;
1526                    lsquic_send_ctl_turn_nstp_on(&conn->fc_send_ctl);
1527                }
1528            }
1529            LSQ_DEBUG("end of version negotiation: agreed upon %s",
1530                                    lsquic_ver2str[conn->fc_ver_neg.vn_ver]);
1531        }
1532        return process_regular_packet(conn, packet_in);
1533    }
1534    else if (flags & PACKET_PUBLIC_FLAGS_RST)
1535    {
1536        LSQ_INFO("received public reset packet: aborting connection");
1537        conn->fc_flags |= FC_GOT_PRST;
1538        return -1;
1539    }
1540    else
1541    {
1542        if (conn->fc_flags & FC_SERVER)
1543            return process_regular_packet(conn, packet_in);
1544        else if (conn->fc_ver_neg.vn_tag)
1545        {
1546            process_ver_neg_packet(conn, packet_in);
1547            return 0;
1548        }
1549        else
1550        {
1551            LSQ_DEBUG("unexpected version negotiation packet: ignore it");
1552            return 0;
1553        }
1554    }
1555}
1556
1557
1558static void
1559idle_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now)
1560{
1561    struct full_conn *conn = ctx;
1562    LSQ_DEBUG("connection timed out");
1563    conn->fc_flags |= FC_TIMED_OUT;
1564}
1565
1566
1567static void
1568handshake_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now)
1569{
1570    struct full_conn *conn = ctx;
1571    LSQ_DEBUG("connection timed out: handshake timed out");
1572    conn->fc_flags |= FC_TIMED_OUT;
1573}
1574
1575
1576static void
1577ping_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now)
1578{
1579    struct full_conn *conn = ctx;
1580    LSQ_DEBUG("Ping alarm rang: schedule PING frame to be generated");
1581    conn->fc_flags |= FC_SEND_PING;
1582}
1583
1584
1585static void
1586zero_pad_packet (lsquic_packet_out_t *packet_out)
1587{
1588    memset(packet_out->po_data + packet_out->po_data_sz, 0,
1589                        packet_out->po_n_alloc - packet_out->po_data_sz);
1590    packet_out->po_data_sz = packet_out->po_n_alloc;
1591    packet_out->po_frame_types |= 1 << QUIC_FRAME_PADDING;
1592}
1593
1594
/* Upper bound on the size of a STREAM frame header, in bytes.
 * NOTE(review): the terms presumably stand for the type byte, stream ID,
 * offset, and data length fields -- confirm against the frame generator.
 */
#define MAX_STREAM_FRAME_HEADER  (1 + 4 + 8 + 2)

/* Smallest STREAM frame payload considered worth writing ("some sane
 * value").
 */
#define MIN_STREAM_FRAME_PAYLOAD 10

/* Smaller of the two values.  This is a macro: each argument may be
 * evaluated more than once, so do not pass expressions with side effects.
 */
#define MIN(a, b) ((a) < (b) ? (a) : (b))
1599
1600
1601static lsquic_packet_out_t *
1602get_writeable_packet (struct full_conn *conn, unsigned need_at_least)
1603{
1604    lsquic_packet_out_t *packet_out;
1605    int is_err;
1606
1607    assert(need_at_least <= QUIC_MAX_PAYLOAD_SZ);
1608    packet_out = lsquic_send_ctl_get_writeable_packet(&conn->fc_send_ctl,
1609                                                    need_at_least, &is_err);
1610    if (!packet_out && is_err)
1611        ABORT_ERROR("cannot allocate packet: %s", strerror(errno));
1612    return packet_out;
1613}
1614
1615
1616static int
1617generate_wuf_stream (struct full_conn *conn, lsquic_stream_t *stream)
1618{
1619    lsquic_packet_out_t *packet_out = get_writeable_packet(conn, QUIC_WUF_SZ);
1620    if (!packet_out)
1621        return 0;
1622    const uint64_t recv_off = lsquic_stream_fc_recv_off(stream);
1623    int sz = conn->fc_conn.cn_pf->pf_gen_window_update_frame(
1624                packet_out->po_data + packet_out->po_data_sz,
1625                     lsquic_packet_out_avail(packet_out), stream->id, recv_off);
1626    if (sz < 0) {
1627        ABORT_ERROR("gen_window_update_frame failed");
1628        return 0;
1629    }
1630    packet_out->po_data_sz += sz;
1631    packet_out->po_frame_types |= 1 << QUIC_FRAME_WINDOW_UPDATE;
1632    LSQ_DEBUG("wrote WUF: stream %u; offset 0x%"PRIX64, stream->id, recv_off);
1633    return 1;
1634}
1635
1636
1637static void
1638generate_wuf_conn (struct full_conn *conn)
1639{
1640    assert(conn->fc_flags & FC_SEND_WUF);
1641    lsquic_packet_out_t *packet_out = get_writeable_packet(conn, QUIC_WUF_SZ);
1642    if (!packet_out)
1643        return;
1644    const uint64_t recv_off = lsquic_cfcw_get_fc_recv_off(&conn->fc_pub.cfcw);
1645    int sz = conn->fc_conn.cn_pf->pf_gen_window_update_frame(
1646                     packet_out->po_data + packet_out->po_data_sz,
1647                     lsquic_packet_out_avail(packet_out), 0, recv_off);
1648    if (sz < 0) {
1649        ABORT_ERROR("gen_window_update_frame failed");
1650        return;
1651    }
1652    packet_out->po_data_sz += sz;
1653    packet_out->po_frame_types |= 1 << QUIC_FRAME_WINDOW_UPDATE;
1654    conn->fc_flags &= ~FC_SEND_WUF;
1655    LSQ_DEBUG("wrote connection WUF: offset 0x%"PRIX64, recv_off);
1656}
1657
1658
1659static void
1660generate_goaway_frame (struct full_conn *conn)
1661{
1662    int reason_len = 0;
1663    lsquic_packet_out_t *packet_out =
1664        get_writeable_packet(conn, QUIC_GOAWAY_FRAME_SZ + reason_len);
1665    if (!packet_out)
1666        return;
1667    int sz = conn->fc_conn.cn_pf->pf_gen_goaway_frame(
1668                 packet_out->po_data + packet_out->po_data_sz,
1669                 lsquic_packet_out_avail(packet_out), 0, conn->fc_max_peer_stream_id,
1670                 NULL, reason_len);
1671    if (sz < 0) {
1672        ABORT_ERROR("gen_goaway_frame failed");
1673        return;
1674    }
1675    packet_out->po_data_sz += sz;
1676    packet_out->po_frame_types |= 1 << QUIC_FRAME_GOAWAY;
1677    conn->fc_flags &= ~FC_SEND_GOAWAY;
1678    conn->fc_flags |=  FC_GOAWAY_SENT;
1679    LSQ_DEBUG("wrote GOAWAY frame: stream id: %u", conn->fc_max_peer_stream_id);
1680}
1681
1682
1683static void
1684generate_connection_close_packet (struct full_conn *conn)
1685{
1686    lsquic_packet_out_t *packet_out;
1687
1688    packet_out = lsquic_send_ctl_new_packet_out(&conn->fc_send_ctl, 0);
1689    if (!packet_out)
1690    {
1691        ABORT_ERROR("cannot allocate packet: %s", strerror(errno));
1692        return;
1693    }
1694
1695    lsquic_send_ctl_scheduled_one(&conn->fc_send_ctl, packet_out);
1696    int sz = conn->fc_conn.cn_pf->pf_gen_connect_close_frame(packet_out->po_data + packet_out->po_data_sz,
1697                     lsquic_packet_out_avail(packet_out), 16 /* PEER_GOING_AWAY */,
1698                     NULL, 0);
1699    if (sz < 0) {
1700        ABORT_ERROR("generate_connection_close_packet failed");
1701        return;
1702    }
1703    packet_out->po_data_sz += sz;
1704    packet_out->po_frame_types |= 1 << QUIC_FRAME_CONNECTION_CLOSE;
1705    LSQ_DEBUG("generated CONNECTION_CLOSE frame in its own packet");
1706}
1707
1708
1709static int
1710generate_blocked_frame (struct full_conn *conn, uint32_t stream_id)
1711{
1712    lsquic_packet_out_t *packet_out =
1713                            get_writeable_packet(conn, QUIC_BLOCKED_FRAME_SZ);
1714    if (!packet_out)
1715        return 0;
1716    int sz = conn->fc_conn.cn_pf->pf_gen_blocked_frame(
1717                                 packet_out->po_data + packet_out->po_data_sz,
1718                                 lsquic_packet_out_avail(packet_out), stream_id);
1719    if (sz < 0) {
1720        ABORT_ERROR("gen_blocked_frame failed");
1721        return 0;
1722    }
1723    packet_out->po_data_sz += sz;
1724    packet_out->po_frame_types |= 1 << QUIC_FRAME_BLOCKED;
1725    LSQ_DEBUG("wrote blocked frame: stream %u", stream_id);
1726    return 1;
1727}
1728
1729
1730static int
1731generate_stream_blocked_frame (struct full_conn *conn, lsquic_stream_t *stream)
1732{
1733    if (generate_blocked_frame(conn, stream->id))
1734    {
1735        lsquic_stream_blocked_frame_sent(stream);
1736        return 1;
1737    }
1738    else
1739        return 0;
1740}
1741
1742
1743static int
1744generate_rst_stream_frame (struct full_conn *conn, lsquic_stream_t *stream)
1745{
1746    lsquic_packet_out_t *packet_out;
1747    int sz, s;
1748
1749    packet_out = get_writeable_packet(conn, QUIC_RST_STREAM_SZ);
1750    if (!packet_out)
1751        return 0;
1752    sz = conn->fc_conn.cn_pf->pf_gen_rst_frame(
1753                     packet_out->po_data + packet_out->po_data_sz,
1754                     lsquic_packet_out_avail(packet_out), stream->id,
1755                     stream->tosend_off, stream->error_code);
1756    if (sz < 0) {
1757        ABORT_ERROR("gen_rst_frame failed");
1758        return 0;
1759    }
1760    packet_out->po_data_sz += sz;
1761    packet_out->po_frame_types |= 1 << QUIC_FRAME_RST_STREAM;
1762    s = lsquic_packet_out_add_stream(packet_out, conn->fc_pub.mm, stream,
1763                                                    QUIC_FRAME_RST_STREAM, 0);
1764    if (s != 0)
1765    {
1766        ABORT_ERROR("adding stream to packet failed: %s", strerror(errno));
1767        return 0;
1768    }
1769    lsquic_stream_rst_frame_sent(stream);
1770    LSQ_DEBUG("wrote RST: stream %u; offset 0x%"PRIX64"; error code 0x%X",
1771                        stream->id, stream->tosend_off, stream->error_code);
1772    return 1;
1773}
1774
1775
1776static void
1777generate_ping_frame (struct full_conn *conn)
1778{
1779    lsquic_packet_out_t *packet_out = get_writeable_packet(conn, 1);
1780    if (!packet_out)
1781    {
1782        LSQ_DEBUG("cannot get writeable packet for PING frame");
1783        return;
1784    }
1785    int sz = conn->fc_conn.cn_pf->pf_gen_ping_frame(
1786                            packet_out->po_data + packet_out->po_data_sz,
1787                            lsquic_packet_out_avail(packet_out));
1788    if (sz < 0) {
1789        ABORT_ERROR("gen_blocked_frame failed");
1790        return;
1791    }
1792    packet_out->po_data_sz += sz;
1793    packet_out->po_frame_types |= 1 << QUIC_FRAME_PING;
1794    LSQ_DEBUG("wrote PING frame");
1795}
1796
1797
1798static void
1799generate_stop_waiting_frame (struct full_conn *conn)
1800{
1801    assert(conn->fc_flags & FC_SEND_STOP_WAITING);
1802
1803    int sz;
1804    unsigned packnum_len;
1805    lsquic_packno_t least_unacked;
1806    lsquic_packet_out_t *packet_out;
1807
1808    /* Get packet that has room for the minimum size STOP_WAITING frame: */
1809    packet_out = get_writeable_packet(conn, 1 + packno_bits2len(PACKNO_LEN_1));
1810    if (!packet_out)
1811        return;
1812
1813    /* Now calculate number of bytes we really need.  If there is not enough
1814     * room in the current packet, get a new one.
1815     */
1816    packnum_len = packno_bits2len(lsquic_packet_out_packno_bits(packet_out));
1817    if ((unsigned) lsquic_packet_out_avail(packet_out) < 1 + packnum_len)
1818    {
1819        packet_out = get_writeable_packet(conn, 1 + packnum_len);
1820        if (!packet_out)
1821            return;
1822        /* Here, a new packet has been allocated, The number of bytes needed
1823         * to represent packet number in the STOP_WAITING frame may have
1824         * increased.  However, this does not matter, because the newly
1825         * allocated packet must have room for a STOP_WAITING frame of any
1826         * size.
1827         */
1828    }
1829
1830    least_unacked = lsquic_send_ctl_smallest_unacked(&conn->fc_send_ctl);
1831    sz = conn->fc_conn.cn_pf->pf_gen_stop_waiting_frame(
1832                    packet_out->po_data + packet_out->po_data_sz,
1833                    lsquic_packet_out_avail(packet_out), packet_out->po_packno,
1834                    lsquic_packet_out_packno_bits(packet_out), least_unacked);
1835    if (sz < 0) {
1836        ABORT_ERROR("gen_stop_waiting_frame failed");
1837        return;
1838    }
1839    packet_out->po_data_sz += sz;
1840    packet_out->po_regen_sz += sz;
1841    packet_out->po_frame_types |= 1 << QUIC_FRAME_STOP_WAITING;
1842    conn->fc_flags &= ~FC_SEND_STOP_WAITING;
1843    LSQ_DEBUG("wrote STOP_WAITING frame: least unacked: %"PRIu64,
1844                                                            least_unacked);
1845    EV_LOG_GENERATED_STOP_WAITING_FRAME(LSQUIC_LOG_CONN_ID, least_unacked);
1846}
1847
1848
1849static int
1850generate_stream_frame (struct full_conn *conn, lsquic_stream_t *stream)
1851{
1852    lsquic_packet_out_t *packet_out;
1853    size_t n_to_send, need_at_least;
1854    int len, s;
1855    unsigned short off;
1856
1857    /* Send one packet's worth */
1858    n_to_send = lsquic_stream_tosend_sz(stream);
1859    assert(n_to_send > 0 || lsquic_stream_tosend_fin(stream));  /* Otherwise, why are we called? */
1860    need_at_least =
1861        MAX_STREAM_FRAME_HEADER + MIN(MIN_STREAM_FRAME_PAYLOAD, n_to_send);
1862    packet_out = get_writeable_packet(conn, need_at_least);
1863    if (!packet_out)
1864        return 0;
1865    off = packet_out->po_data_sz;
1866    len = conn->fc_conn.cn_pf->pf_gen_stream_frame(
1867                packet_out->po_data + packet_out->po_data_sz,
1868                lsquic_packet_out_avail(packet_out), stream->id,
1869                lsquic_stream_tosend_offset(stream),
1870                (gsf_fin_f) lsquic_stream_tosend_fin,
1871                (gsf_size_f) lsquic_stream_tosend_sz,
1872                (gsf_read_f) lsquic_stream_tosend_read, stream);
1873    if (len < 0)
1874    {
1875        ABORT_ERROR("gen_stream_frame failed");
1876        return 0;
1877    }
1878#if FULL_CONN_STATS
1879    conn->fc_stats.stream_data_sz += len -
1880        conn->fc_conn.cn_pf->pf_parse_stream_frame_header_sz(
1881                                packet_out->po_data[packet_out->po_data_sz]);
1882#endif
1883    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, conn->fc_conn.cn_pf,
1884                            packet_out->po_data + packet_out->po_data_sz, len);
1885    packet_out->po_data_sz += len;
1886    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
1887    s = lsquic_packet_out_add_stream(packet_out, conn->fc_pub.mm, stream,
1888                                                        QUIC_FRAME_STREAM, off);
1889    if (s != 0)
1890    {
1891        ABORT_ERROR("adding stream to packet failed: %s", strerror(errno));
1892        return 0;
1893    }
1894    lsquic_stream_stream_frame_sent(stream);
1895    LSQ_DEBUG("Put %d bytes of data from stream %u into packet on outgoing "
1896        "queue", len, stream->id);
1897    return 1;
1898}
1899
1900
1901/* TODO I think we can separate this function into two functions: one that
1902 *  processes STREAM_SEND_WUF, STREAM_SEND_BLOCKED, and STREAM_SEND_RST
1903 *  queues, and one that processes STREAM_SEND_DATA queue, as the four flags
1904 *  are not dependent upon each other.  Double-check this and implement.
1905 */
1906static int
1907process_stream_ready_to_send (struct full_conn *conn, lsquic_stream_t *stream,
1908                              int send_data)
1909{
1910    int r = 1, written;
1911    if (stream->stream_flags & STREAM_SEND_WUF)
1912        r &= generate_wuf_stream(conn, stream);
1913    if (send_data && (stream->stream_flags & STREAM_SEND_DATA))
1914    {
1915        if (LSQUIC_STREAM_HANDSHAKE == stream->id)
1916        {
1917            /* Handshake messages are sent in brand-new packets.  If handshake
1918             * is not complete, the packet is zero-padded.
1919             */
1920            lsquic_packet_out_t *packet_out = get_writeable_packet(conn, 0);
1921            written = generate_stream_frame(conn, stream);
1922            r &= written;
1923            if (written)
1924            {
1925                packet_out->po_flags |= PO_HELLO;
1926                LSQ_DEBUG("packet %"PRIu64" is carrying CHLO data",
1927                                                    packet_out->po_packno);
1928                if (!(conn->fc_conn.cn_flags & LSCONN_HANDSHAKE_DONE))
1929                {
1930                    zero_pad_packet(packet_out);
1931                    LSQ_DEBUG("zero-padded packet %"PRIu64,
1932                                                    packet_out->po_packno);
1933                }
1934            }
1935            assert(packet_out ==
1936                        lsquic_send_ctl_last_scheduled(&conn->fc_send_ctl));
1937        }
1938        else
1939            r &= generate_stream_frame(conn, stream);
1940    }
1941    if (stream->stream_flags & STREAM_SEND_BLOCKED)
1942        r &= generate_stream_blocked_frame(conn, stream);
1943    if (stream->stream_flags & STREAM_SEND_RST)
1944        r &= generate_rst_stream_frame(conn, stream);
1945    return r;
1946}
1947
1948
1949/* Return 1 if any STREAM frames were packetized, 0 otherwise. */
1950static int
1951process_streams_ready_to_send (struct full_conn *conn)
1952{
1953    lsquic_stream_t *stream;
1954    struct stream_prio_iter spi;
1955    int stream_frames_packetized;
1956
1957    assert(!TAILQ_EMPTY(&conn->fc_pub.sending_streams));
1958
1959    lsquic_spi_init(&spi, TAILQ_FIRST(&conn->fc_pub.sending_streams),
1960        TAILQ_LAST(&conn->fc_pub.sending_streams, lsquic_streams_tailq),
1961        (uintptr_t) &TAILQ_NEXT((lsquic_stream_t *) NULL, next_send_stream),
1962        STREAM_SENDING_FLAGS, conn->fc_conn.cn_cid, "send");
1963
1964    /* The first iteration takes care of all WINDOW_UPDATE, BLOCKED, and
1965     * RST_STREAM frames.  After that, we turn on priority level exhaustion
1966     * mechanism in the iterator and send data.
1967     */
1968
1969    for (stream = lsquic_spi_first(&spi); stream;
1970                                            stream = lsquic_spi_next(&spi))
1971        if (!process_stream_ready_to_send(conn, stream, 0))
1972            return 0;
1973
1974    lsquic_spi_exhaust_on(&spi);
1975
1976    stream_frames_packetized = 0;
1977    for (stream = lsquic_spi_first(&spi); stream;
1978                                            stream = lsquic_spi_next(&spi))
1979        if (process_stream_ready_to_send(conn, stream, 1))
1980            ++stream_frames_packetized;
1981        else
1982            break;
1983
1984    return stream_frames_packetized > 0;
1985}
1986
1987
1988static void
1989service_streams (struct full_conn *conn)
1990{
1991    struct lsquic_hash_elem *el;
1992    lsquic_stream_t *stream, *next;
1993    int n_our_destroyed = 0;
1994
1995    for (stream = TAILQ_FIRST(&conn->fc_pub.service_streams); stream; stream = next)
1996    {
1997        next = TAILQ_NEXT(stream, next_service_stream);
1998        if (stream->stream_flags & STREAM_CALL_ONCLOSE)
1999            lsquic_stream_call_on_close(stream);
2000        if (stream->stream_flags & STREAM_FREE_STREAM)
2001        {
2002            n_our_destroyed += is_our_stream(conn, stream);
2003            TAILQ_REMOVE(&conn->fc_pub.service_streams, stream, next_service_stream);
2004            el = lsquic_hash_find(conn->fc_streams, &stream->id, sizeof(stream->id));
2005            if (el)
2006                lsquic_hash_erase(conn->fc_streams, el);
2007            conn_mark_stream_closed(conn, stream->id);
2008            SAVE_STREAM_HISTORY(conn, stream);
2009            lsquic_stream_destroy(stream);
2010        }
2011    }
2012
2013    if (either_side_going_away(conn))
2014        while (conn->fc_n_delayed_streams)
2015        {
2016            --conn->fc_n_delayed_streams;
2017            LSQ_DEBUG("goaway mode: delayed stream results in null ctor");
2018            (void) conn->fc_stream_ifs[STREAM_IF_STD].stream_if->on_new_stream(
2019                conn->fc_stream_ifs[STREAM_IF_STD].stream_if_ctx, NULL);
2020        }
2021    else
2022        while (n_our_destroyed && conn->fc_n_delayed_streams)
2023        {
2024            --n_our_destroyed;
2025            --conn->fc_n_delayed_streams;
2026            LSQ_DEBUG("creating delayed stream");
2027            if (!new_stream(conn, generate_stream_id(conn)))
2028            {
2029                ABORT_ERROR("%s: cannot create new stream: %s", __func__,
2030                                                            strerror(errno));
2031                break;
2032            }
2033            assert(count_streams(conn, 0) <= conn->fc_cfg.max_streams_out);
2034        }
2035}
2036
2037
/* Context handed to filter_out_old_streams(): the connection and the two
 * stream ID values that stream IDs are compared against.
 */
struct filter_stream_ctx
{
    struct full_conn    *conn;
    uint32_t             last_stream_id;
    uint32_t             max_peer_stream_id;
};
2044
2045
2046static int
2047filter_out_old_streams (void *ctx, lsquic_stream_t *stream)
2048{
2049    struct filter_stream_ctx *const fctx = ctx;
2050    return ((!((stream->id ^ fctx->last_stream_id)     & 1) &&
2051                                   stream->id > fctx->last_stream_id)
2052           ||
2053            (!((stream->id ^ fctx->max_peer_stream_id) & 1) &&
2054                                   stream->id > fctx->max_peer_stream_id));
2055}
2056
2057
2058static int
2059dispatch_stream_rw_events (struct full_conn *conn, lsquic_stream_t *stream)
2060{
2061    struct stream_rw_prog_status saved_status;
2062    int is_reset, progress_made;
2063
2064    is_reset = lsquic_stream_is_reset(stream);
2065    lsquic_stream_get_rw_prog_status(stream, &saved_status);
2066    lsquic_stream_dispatch_rw_events(stream);
2067    progress_made = lsquic_stream_progress_was_made(stream, &saved_status);
2068    if (!is_reset && lsquic_stream_is_reset(stream))
2069        lsquic_send_ctl_reset_stream(&conn->fc_send_ctl, stream->id);
2070
2071    return progress_made;
2072}
2073
2074
2075/* Return 1 if progress was made, 0 otherwise */
2076static int
2077process_streams_rw_events (struct full_conn *conn)
2078{
2079    lsquic_stream_t *stream;
2080    struct filter_stream_ctx fctx;
2081    struct stream_prio_iter spi;
2082    int progress_count;
2083
2084    if (TAILQ_EMPTY(&conn->fc_pub.rw_streams))
2085        return 0;
2086
2087    progress_count = 0;
2088
2089    fctx.last_stream_id     = conn->fc_last_stream_id;
2090    fctx.max_peer_stream_id = conn->fc_max_peer_stream_id;
2091
2092    lsquic_spi_init(&spi, TAILQ_FIRST(&conn->fc_pub.rw_streams),
2093        TAILQ_LAST(&conn->fc_pub.rw_streams, lsquic_streams_tailq),
2094        (uintptr_t) &TAILQ_NEXT((lsquic_stream_t *) NULL, next_rw_stream),
2095        STREAM_RW_EVENT_FLAGS, conn->fc_conn.cn_cid, "rw-1");
2096
2097    for (stream = lsquic_spi_first(&spi); stream;
2098                                            stream = lsquic_spi_next(&spi))
2099        progress_count +=
2100            dispatch_stream_rw_events(conn, stream);
2101
2102    /* If new streams were created as result of the RW dispatching above,
2103     * process these new streams.
2104     */
2105    if (fctx.last_stream_id     < conn->fc_last_stream_id ||
2106        fctx.max_peer_stream_id < conn->fc_max_peer_stream_id)
2107    {
2108        fctx.conn = conn;
2109        lsquic_spi_init_ext(&spi, TAILQ_FIRST(&conn->fc_pub.rw_streams),
2110            TAILQ_LAST(&conn->fc_pub.rw_streams, lsquic_streams_tailq),
2111            (uintptr_t) &TAILQ_NEXT((lsquic_stream_t *) NULL, next_rw_stream),
2112            STREAM_RW_EVENT_FLAGS, filter_out_old_streams, &fctx,
2113            conn->fc_conn.cn_cid, "rw-2");
2114        for (stream = lsquic_spi_first(&spi); stream;
2115                                                stream = lsquic_spi_next(&spi))
2116            progress_count +=
2117                dispatch_stream_rw_events(conn, stream);
2118    }
2119
2120    return progress_count > 0;
2121}
2122
2123
2124/* Return 1 if progress was made, 0 otherwise. */
2125static int
2126process_hsk_stream_rw_events (struct full_conn *conn)
2127{
2128    lsquic_stream_t *stream;
2129    TAILQ_FOREACH(stream, &conn->fc_pub.rw_streams, next_rw_stream)
2130        if (LSQUIC_STREAM_HANDSHAKE == stream->id)
2131            return dispatch_stream_rw_events(conn, stream);
2132    return 0;
2133}
2134
2135
/* ACK frame verification is compiled out by default: with `#if 1' the
 * macro below expands to nothing.  Change the condition to enable the
 * debugging function in the #else branch.
 */
#if 1
#   define verify_ack_frame(a, b, c)
#else
/* Parse the ACK frame in `buf' back and assert that its ranges match the
 * receive history of the connection one for one.  When debug logging is
 * enabled, the ranges are also logged.
 */
static void
verify_ack_frame (struct full_conn *conn, const unsigned char *buf, int bufsz)
{
    unsigned i;
    int parsed_len;
    struct ack_info *ack_info;
    const struct lsquic_packno_range *range;
    char ack_buf[512];
    size_t buf_off = 0;
    int nw;

    /* Keep the buffer a valid string even if nothing gets appended to it */
    ack_buf[0] = '\0';

    ack_info = conn->fc_pub.mm->acki;
    parsed_len = parse_ack_frame(buf, bufsz, ack_info);
    assert(parsed_len == bufsz);

    for (range = lsquic_rechist_first(&conn->fc_rechist), i = 0; range;
            range = lsquic_rechist_next(&conn->fc_rechist), ++i)
    {
        assert(i < ack_info->n_ranges);
        assert(range->high == ack_info->ranges[i].high);
        assert(range->low == ack_info->ranges[i].low);
        if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && buf_off < sizeof(ack_buf) - 1)
        {
            nw = snprintf(ack_buf + buf_off, sizeof(ack_buf) - buf_off,
                            "[%"PRIu64"-%"PRIu64"]", range->high, range->low);
            assert(nw >= 0);
            /* snprintf returns the untruncated length: never let the offset
             * run past the buffer, or the size argument above would wrap.
             */
            if (nw < 0 || (size_t) nw >= sizeof(ack_buf) - buf_off)
                buf_off = sizeof(ack_buf) - 1;
            else
                buf_off += (size_t) nw;
        }
    }
    assert(i == ack_info->n_ranges);
    LSQ_DEBUG("Sent ACK frame %s", ack_buf);
}


#endif
2174
2175
2176static void
2177generate_ack_frame (struct full_conn *conn)
2178{
2179    lsquic_packet_out_t *packet_out;
2180    lsquic_time_t now;
2181    int has_missing, w;
2182
2183    packet_out = lsquic_send_ctl_new_packet_out(&conn->fc_send_ctl, 0);
2184    if (!packet_out)
2185    {
2186        ABORT_ERROR("cannot allocate packet: %s", strerror(errno));
2187        return;
2188    }
2189
2190    lsquic_send_ctl_scheduled_one(&conn->fc_send_ctl, packet_out);
2191    now = lsquic_time_now();
2192    w = conn->fc_conn.cn_pf->pf_gen_ack_frame(
2193            packet_out->po_data + packet_out->po_data_sz,
2194            lsquic_packet_out_avail(packet_out),
2195            (gaf_rechist_first_f)        lsquic_rechist_first,
2196            (gaf_rechist_next_f)         lsquic_rechist_next,
2197            (gaf_rechist_largest_recv_f) lsquic_rechist_largest_recv,
2198            &conn->fc_rechist, now, &has_missing);
2199    if (w < 0) {
2200        ABORT_ERROR("generating ACK frame failed: %d", errno);
2201        return;
2202    }
2203    EV_LOG_GENERATED_ACK_FRAME(LSQUIC_LOG_CONN_ID, conn->fc_conn.cn_pf,
2204                        packet_out->po_data + packet_out->po_data_sz, w);
2205    verify_ack_frame(conn, packet_out->po_data + packet_out->po_data_sz, w);
2206    lsquic_send_ctl_scheduled_ack(&conn->fc_send_ctl);
2207    packet_out->po_frame_types |= 1 << QUIC_FRAME_ACK;
2208    packet_out->po_data_sz += w;
2209    packet_out->po_regen_sz += w;
2210    if (has_missing)
2211        conn->fc_flags |= FC_ACK_HAD_MISS;
2212    else
2213        conn->fc_flags &= ~FC_ACK_HAD_MISS;
2214    LSQ_DEBUG("Put %d bytes of ACK frame into packet on outgoing queue", w);
2215    if (conn->fc_conn.cn_version >= LSQVER_039 &&
2216            conn->fc_n_cons_unretx >= 20 &&
2217                !lsquic_send_ctl_have_outgoing_retx_frames(&conn->fc_send_ctl))
2218    {
2219        LSQ_DEBUG("schedule WINDOW_UPDATE frame after %u non-retx "
2220                                    "packets sent", conn->fc_n_cons_unretx);
2221        conn->fc_flags |= FC_SEND_WUF;
2222    }
2223}
2224
2225
2226static int
2227conn_ok_to_close (const struct full_conn *conn)
2228{
2229    assert(conn->fc_flags & FC_CLOSING);
2230    return !(conn->fc_flags & FC_SERVER)
2231        || (conn->fc_flags & FC_RECV_CLOSE)
2232        || (
2233               !lsquic_send_ctl_have_outgoing_stream_frames(&conn->fc_send_ctl)
2234            && lsquic_hash_count(conn->fc_streams) == 0
2235            && lsquic_send_ctl_have_unacked_stream_frames(&conn->fc_send_ctl) == 0);
2236}
2237
2238
2239static enum tick_st
2240immediate_close (struct full_conn *conn)
2241{
2242    lsquic_packet_out_t *packet_out;
2243    const char *error_reason;
2244    unsigned error_code;
2245    int sz;
2246
2247    if (conn->fc_flags & (FC_TICK_CLOSE|FC_GOT_PRST))
2248        return TICK_CLOSE;
2249
2250    conn->fc_flags |= FC_TICK_CLOSE;
2251
2252    /* No reason to send anything that's been scheduled if connection is
2253     * being closed immedately.  This also ensures that packet numbers
2254     * sequence is always increasing.
2255     */
2256    lsquic_send_ctl_drop_scheduled(&conn->fc_send_ctl);
2257
2258    if ((conn->fc_flags & FC_TIMED_OUT) && conn->fc_settings->es_silent_close)
2259        return TICK_CLOSE;
2260
2261    packet_out = lsquic_send_ctl_new_packet_out(&conn->fc_send_ctl, 0);
2262    if (!packet_out)
2263    {
2264        LSQ_WARN("cannot allocate packet: %s", strerror(errno));
2265        return TICK_CLOSE;
2266    }
2267
2268    assert(conn->fc_flags & (FC_ERROR|FC_ABORTED|FC_TIMED_OUT));
2269    if (conn->fc_flags & FC_ERROR)
2270    {
2271        error_code = 0x01; /* QUIC_INTERNAL_ERROR */
2272        error_reason = "connection error";
2273    }
2274    else if (conn->fc_flags & FC_ABORTED)
2275    {
2276        error_code = 0x10; /* QUIC_PEER_GOING_AWAY */
2277        error_reason = "user aborted connection";
2278    }
2279    else if (conn->fc_flags & FC_TIMED_OUT)
2280    {
2281        error_code = 0x19; /* QUIC_NETWORK_IDLE_TIMEOUT */
2282        error_reason = "connection timed out";
2283    }
2284    else
2285    {
2286        error_code = 0x10; /* QUIC_PEER_GOING_AWAY */
2287        error_reason = NULL;
2288    }
2289
2290    lsquic_send_ctl_scheduled_one(&conn->fc_send_ctl, packet_out);
2291    sz = conn->fc_conn.cn_pf->pf_gen_connect_close_frame(
2292                     packet_out->po_data + packet_out->po_data_sz,
2293                     lsquic_packet_out_avail(packet_out), error_code,
2294                     error_reason, error_reason ? strlen(error_reason) : 0);
2295    if (sz < 0) {
2296        LSQ_WARN("%s failed", __func__);
2297        return TICK_CLOSE;
2298    }
2299    packet_out->po_data_sz += sz;
2300    packet_out->po_frame_types |= 1 << QUIC_FRAME_CONNECTION_CLOSE;
2301    LSQ_DEBUG("generated CONNECTION_CLOSE frame in its own packet");
2302    return TICK_SEND|TICK_CLOSE;
2303}
2304
2305
2306static enum tick_st
2307full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
2308{
2309    struct full_conn *conn = (struct full_conn *) lconn;
2310    lsquic_packet_out_t *last_packet_out;
2311    int have_delayed_packets;
2312    unsigned n;
2313    int progress_made;
2314    enum tick_st progress_tick = 0;
2315
2316#define CLOSE_IF_NECESSARY() do {                       \
2317    if (conn->fc_flags & FC_IMMEDIATE_CLOSE_FLAGS)     \
2318        return progress_tick | immediate_close(conn);                   \
2319} while (0)
2320
2321#define RETURN_IF_OUT_OF_PACKETS() do {                                 \
2322    if (!lsquic_send_ctl_can_send(&conn->fc_send_ctl))                  \
2323    {                                                                   \
2324        if (0 == lsquic_send_ctl_n_scheduled(&conn->fc_send_ctl))       \
2325        {                                                               \
2326            LSQ_DEBUG("used up packet allowance, quiet now (line %d)",  \
2327                __LINE__);                                              \
2328            return progress_tick | TICK_QUIET;                          \
2329        }                                                               \
2330        else                                                            \
2331        {                                                               \
2332            LSQ_DEBUG("used up packet allowance, sending now (line %d)",\
2333                __LINE__);                                              \
2334            return progress_tick | TICK_SEND;                           \
2335        }                                                               \
2336    }                                                                   \
2337} while (0)
2338
2339    assert(!(conn->fc_conn.cn_flags & LSCONN_RW_PENDING));
2340
2341    lsquic_send_ctl_tick(&conn->fc_send_ctl, now);
2342    CLOSE_IF_NECESSARY();
2343
2344    if (!(conn->fc_flags & FC_SERVER))
2345    {
2346        lsquic_alarmset_unset(&conn->fc_alset, AL_PING);
2347        lsquic_send_ctl_sanity_check(&conn->fc_send_ctl);
2348    }
2349
2350    lsquic_alarmset_ring_expired(&conn->fc_alset, now);
2351    CLOSE_IF_NECESSARY();
2352
2353    /* To make things simple, only stream 1 is active until the handshake
2354     * has been completed.  This will be adjusted in the future: the client
2355     * does not want to wait if it has the server information.
2356     */
2357    if (conn->fc_conn.cn_flags & LSCONN_HANDSHAKE_DONE)
2358        progress_made = process_streams_rw_events(conn);
2359    else
2360        progress_made = process_hsk_stream_rw_events(conn);
2361    progress_tick |= progress_made << TICK_BIT_PROGRESS;
2362    CLOSE_IF_NECESSARY();
2363
2364    if (conn->fc_flags & FC_FIRST_TICK)
2365    {
2366        conn->fc_flags &= ~FC_FIRST_TICK;
2367        have_delayed_packets = 0;
2368    }
2369    else
2370        /* If there are any scheduled packets at this point, it means that
2371         * they were not sent during previous tick; in other words, they
2372         * are delayed.  When there are delayed packets, the only packet
2373         * we sometimes add is a packet with an ACK frame, and we add it
2374         * to the *front* of the queue.
2375         */
2376        have_delayed_packets = lsquic_send_ctl_squeeze_sched(
2377                                                    &conn->fc_send_ctl);
2378
2379    if ((conn->fc_flags & FC_ACK_QUEUED) ||
2380                            lsquic_send_ctl_lost_ack(&conn->fc_send_ctl))
2381    {
2382        if (have_delayed_packets)
2383            lsquic_send_ctl_reset_packnos(&conn->fc_send_ctl);
2384
2385        /* ACK frame generation fails with an error if it does not fit into
2386         * a single packet (it always should fit).
2387         */
2388        generate_ack_frame(conn);
2389        CLOSE_IF_NECESSARY();
2390        reset_ack_state(conn);
2391
2392        /* Try to send STOP_WAITING frame at the same time we send an ACK
2393         * This follows reference implementation.
2394         */
2395        if (!(conn->fc_flags & FC_NSTP))
2396            conn->fc_flags |= FC_SEND_STOP_WAITING;
2397
2398        if (have_delayed_packets)
2399        {
2400            if (conn->fc_flags & FC_SEND_STOP_WAITING)
2401            {
2402                /* TODO: ensure that STOP_WAITING frame is in the same packet
2403                 * as the ACK frame in delayed packet mode.
2404                 */
2405                generate_stop_waiting_frame(conn);
2406                CLOSE_IF_NECESSARY();
2407            }
2408            lsquic_send_ctl_ack_to_front(&conn->fc_send_ctl);
2409        }
2410    }
2411
2412    if (have_delayed_packets)
2413        /* The reason for not adding STOP_WAITING and other frames below
2414         * to the packet carrying ACK frame generated when there are delayed
2415         * packets is so that if the ACK packet itself is delayed, it can be
2416         * dropped and replaced by new ACK packet.  This way, we are never
2417         * more than 1 packet over CWND.
2418         */
2419        return progress_tick | TICK_SEND;
2420
2421    RETURN_IF_OUT_OF_PACKETS();
2422
2423    /* Try to fit any of the following three frames -- STOP_WAITING,
2424     * WINDOW_UPDATE, and GOAWAY -- before checking if we have run
2425     * out of packets.  If either of them does not fit, it will be
2426     * tried next time around.
2427     */
2428    if (conn->fc_flags & FC_SEND_STOP_WAITING)
2429    {
2430        generate_stop_waiting_frame(conn);
2431        CLOSE_IF_NECESSARY();
2432    }
2433
2434    if (lsquic_cfcw_fc_offsets_changed(&conn->fc_pub.cfcw) ||
2435                                (conn->fc_flags & FC_SEND_WUF))
2436    {
2437        conn->fc_flags |= FC_SEND_WUF;
2438        generate_wuf_conn(conn);
2439        CLOSE_IF_NECESSARY();
2440    }
2441
2442    if (conn->fc_flags & FC_SEND_GOAWAY)
2443    {
2444        generate_goaway_frame(conn);
2445        CLOSE_IF_NECESSARY();
2446    }
2447
2448    n = lsquic_send_ctl_reschedule_packets(&conn->fc_send_ctl);
2449    if (n > 0)
2450        CLOSE_IF_NECESSARY();
2451
2452    RETURN_IF_OUT_OF_PACKETS();
2453
2454    if (conn->fc_conn.cn_flags & LSCONN_SEND_BLOCKED)
2455    {
2456        if (generate_blocked_frame(conn, 0))
2457            conn->fc_conn.cn_flags &= ~LSCONN_SEND_BLOCKED;
2458        else
2459            RETURN_IF_OUT_OF_PACKETS();
2460    }
2461
2462    if (!TAILQ_EMPTY(&conn->fc_pub.sending_streams))
2463    {
2464        progress_made = process_streams_ready_to_send(conn);
2465        progress_tick |= progress_made << TICK_BIT_PROGRESS;
2466        /* If last packet on the queue contains a stream-related frame, mark it
2467         * unwriteable to force a new packet allocation on the next tick.  This
2468         * is to prevent more than one STREAM or RST_STREAM frame from the same
2469         * stream to be written to the same packet.
2470         */
2471        last_packet_out = lsquic_send_ctl_last_scheduled(&conn->fc_send_ctl);
2472        if (last_packet_out &&
2473                (last_packet_out->po_frame_types &
2474                    ((1 << QUIC_FRAME_STREAM)|(1 << QUIC_FRAME_RST_STREAM))))
2475            last_packet_out->po_flags &= ~PO_WRITEABLE;
2476    }
2477    CLOSE_IF_NECESSARY();
2478
2479    service_streams(conn);
2480
2481    RETURN_IF_OUT_OF_PACKETS();
2482
2483    if ((conn->fc_flags & FC_CLOSING) && conn_ok_to_close(conn))
2484    {
2485        LSQ_DEBUG("connection is OK to close");
2486        /* This is normal termination sequence.
2487         *
2488         * Generate CONNECTION_CLOSE frame if we are responding to one, have
2489         * packets scheduled to send, or silent close flag is not set.
2490         */
2491        conn->fc_flags |= FC_TICK_CLOSE;
2492        if ((conn->fc_flags & FC_RECV_CLOSE) ||
2493                0 != lsquic_send_ctl_n_scheduled(&conn->fc_send_ctl) ||
2494                                        !conn->fc_settings->es_silent_close)
2495        {
2496            generate_connection_close_packet(conn);
2497            return progress_tick | TICK_SEND|TICK_CLOSE;
2498        }
2499        else
2500            return progress_tick | TICK_CLOSE;
2501    }
2502
2503    if (0 == lsquic_send_ctl_n_scheduled(&conn->fc_send_ctl))
2504    {
2505        if (conn->fc_flags & FC_SEND_PING)
2506        {
2507            conn->fc_flags &= ~FC_SEND_PING;
2508            generate_ping_frame(conn);
2509            CLOSE_IF_NECESSARY();
2510            assert(lsquic_send_ctl_n_scheduled(&conn->fc_send_ctl) != 0);
2511        }
2512        else
2513            return progress_tick | TICK_QUIET;
2514    }
2515    else if (!(conn->fc_flags & FC_SERVER))
2516    {
2517        lsquic_alarmset_unset(&conn->fc_alset, AL_PING);
2518        lsquic_send_ctl_sanity_check(&conn->fc_send_ctl);
2519        conn->fc_flags &= ~FC_SEND_PING;   /* It may have rung */
2520    }
2521
2522    now = lsquic_time_now();
2523    lsquic_alarmset_set(&conn->fc_alset, AL_IDLE,
2524                                now + conn->fc_settings->es_idle_conn_to);
2525
2526    /* From the spec:
2527     *  " The PING frame should be used to keep a connection alive when
2528     *  " a stream is open.
2529     */
2530    if (0 == (conn->fc_flags & FC_SERVER) &&
2531                                        lsquic_hash_count(conn->fc_streams) > 0)
2532        lsquic_alarmset_set(&conn->fc_alset, AL_PING, now + TIME_BETWEEN_PINGS);
2533
2534    return progress_tick | TICK_SEND;
2535}
2536
2537
2538static void
2539full_conn_ci_packet_in (lsquic_conn_t *lconn, lsquic_packet_in_t *packet_in)
2540{
2541    struct full_conn *conn = (struct full_conn *) lconn;
2542
2543    lsquic_alarmset_set(&conn->fc_alset, AL_IDLE,
2544                packet_in->pi_received + conn->fc_settings->es_idle_conn_to);
2545    if (0 == (conn->fc_flags & FC_ERROR))
2546        if (0 != process_incoming_packet(conn, packet_in))
2547            conn->fc_flags |= FC_ERROR;
2548}
2549
2550
2551static lsquic_packet_out_t *
2552full_conn_ci_next_packet_to_send (lsquic_conn_t *lconn)
2553{
2554    struct full_conn *conn = (struct full_conn *) lconn;
2555    return lsquic_send_ctl_next_packet_to_send(&conn->fc_send_ctl);
2556}
2557
2558
2559static void
2560full_conn_ci_packet_sent (lsquic_conn_t *lconn, lsquic_packet_out_t *packet_out)
2561{
2562    struct full_conn *conn = (struct full_conn *) lconn;
2563    int s;
2564
2565    if (packet_out->po_frame_types & QFRAME_RETRANSMITTABLE_MASK)
2566    {
2567        conn->fc_n_cons_unretx = 0;
2568        lsquic_alarmset_set(&conn->fc_alset, AL_IDLE,
2569                    packet_out->po_sent + conn->fc_settings->es_idle_conn_to);
2570    }
2571    else
2572        ++conn->fc_n_cons_unretx;
2573    s = lsquic_send_ctl_sent_packet(&conn->fc_send_ctl, packet_out);
2574    if (s != 0)
2575        ABORT_ERROR("sent packet failed: %s", strerror(errno));
2576#if FULL_CONN_STATS
2577    ++conn->fc_stats.n_packets_out;
2578#endif
2579}
2580
2581
2582static void
2583full_conn_ci_packet_not_sent (lsquic_conn_t *lconn, lsquic_packet_out_t *packet_out)
2584{
2585    struct full_conn *conn = (struct full_conn *) lconn;
2586    lsquic_send_ctl_delayed_one(&conn->fc_send_ctl, packet_out);
2587}
2588
2589
2590static void
2591full_conn_ci_handshake_done (lsquic_conn_t *lconn)
2592{
2593    struct full_conn *conn = (struct full_conn *) lconn;
2594    LSQ_DEBUG("handshake reportedly done");
2595    lsquic_alarmset_unset(&conn->fc_alset, AL_HANDSHAKE);
2596    if (0 == apply_peer_settings(conn))
2597        lconn->cn_flags |= LSCONN_HANDSHAKE_DONE;
2598    else
2599        conn->fc_flags |= FC_ERROR;
2600}
2601
2602
2603static int
2604full_conn_ci_user_wants_read (lsquic_conn_t *lconn)
2605{
2606    struct full_conn *conn = (struct full_conn *) lconn;
2607    const lsquic_stream_t *stream;
2608
2609    TAILQ_FOREACH(stream, &conn->fc_pub.rw_streams, next_rw_stream)
2610        if (stream->stream_flags & STREAM_WANT_READ)
2611            return 1;
2612
2613    return 0;
2614}
2615
2616
2617void
2618lsquic_conn_abort (lsquic_conn_t *lconn)
2619{
2620    struct full_conn *conn = (struct full_conn *) lconn;
2621    LSQ_INFO("User aborted connection");
2622    conn->fc_flags |= FC_ABORTED;
2623}
2624
2625
2626void
2627full_conn_close_internal (lsquic_conn_t *lconn, int is_user)
2628{
2629    struct full_conn *conn = (struct full_conn *) lconn;
2630    lsquic_stream_t *stream;
2631    struct lsquic_hash_elem *el;
2632
2633    if (!(conn->fc_flags & FC_CLOSING))
2634    {
2635        if (is_user)
2636            LSQ_INFO("User closed connection");
2637        for (el = lsquic_hash_first(conn->fc_streams); el;
2638                                     el = lsquic_hash_next(conn->fc_streams))
2639        {
2640            stream = lsquic_hashelem_getdata(el);
2641            lsquic_stream_shutdown_internal(stream);
2642        }
2643        conn->fc_flags |= FC_CLOSING;
2644        if (!(conn->fc_flags & FC_GOAWAY_SENT))
2645            conn->fc_flags |= FC_SEND_GOAWAY;
2646    }
2647}
2648
2649
2650void
2651lsquic_conn_close (lsquic_conn_t *lconn)
2652{
2653    full_conn_close_internal(lconn, 1);
2654}
2655
2656
2657void
2658lsquic_conn_going_away (lsquic_conn_t *lconn)
2659{
2660    struct full_conn *conn = (struct full_conn *) lconn;
2661    if (!(conn->fc_flags & (FC_CLOSING|FC_GOING_AWAY)))
2662    {
2663        LSQ_INFO("connection marked as going away");
2664        assert(!(conn->fc_flags & FC_SEND_GOAWAY));
2665        conn->fc_flags |= FC_GOING_AWAY;
2666        if (!(conn->fc_flags & FC_GOAWAY_SENT))
2667            conn->fc_flags |= FC_SEND_GOAWAY;
2668    }
2669}
2670
2671
2672/* Find stream when stream ID is read in from HEADERS stream.  If stream
2673 * cannot be found or created, the connection is aborted.
2674 */
2675#if __GNUC__
2676__attribute__((nonnull(3)))
2677#endif
2678static lsquic_stream_t *
2679find_stream_on_headers (struct full_conn *conn, uint32_t stream_id,
2680                                                            const char *what)
2681{
2682    lsquic_stream_t *stream = find_stream_by_id(conn, stream_id);
2683    if (!stream)
2684    {
2685        if (conn_is_stream_closed(conn, stream_id))
2686        {
2687            LSQ_DEBUG("drop incoming %s for closed stream %u", what, stream_id);
2688            return NULL;
2689        }
2690        if (is_peer_initiated(conn, stream_id))
2691        {
2692            unsigned in_count = count_streams(conn, 1);
2693            LSQ_DEBUG("number of peer-initiated streams: %u", in_count);
2694            if (in_count >= conn->fc_cfg.max_streams_in)
2695            {
2696                ABORT_ERROR("incoming %s for stream %u would exceed "
2697                    "limit: %u", what, stream_id, conn->fc_cfg.max_streams_in);
2698                return NULL;
2699            }
2700            if ((conn->fc_flags & FC_GOING_AWAY) &&
2701                stream_id > conn->fc_max_peer_stream_id)
2702            {
2703                LSQ_DEBUG("going away: drop headers for new stream %u",
2704                                                    stream_id);
2705                return NULL;
2706            }
2707        }
2708        else
2709        {
2710            ABORT_ERROR("frame for never-initiated stream (push promise?)");
2711            return NULL;
2712        }
2713        stream = new_stream(conn, stream_id);
2714        if (!stream)
2715        {
2716            ABORT_ERROR("cannot create new stream: %s", strerror(errno));
2717            return NULL;
2718        }
2719        if (stream_id > conn->fc_max_peer_stream_id)
2720            conn->fc_max_peer_stream_id = stream_id;
2721    }
2722    return stream;
2723}
2724
2725
/* HEADERS stream callback: a connection-level error was reported; abort the
 * connection.  `ctx' is the full connection object.
 */
static void
headers_stream_on_conn_error (void *ctx)
{
    struct full_conn *const conn = ctx;

    ABORT_ERROR("connection error reported by HEADERS stream");
}
2732
2733
2734static void
2735headers_stream_on_stream_error (void *ctx, uint32_t stream_id)
2736{
2737    struct full_conn *conn = ctx;
2738    lsquic_stream_t *stream = find_stream_on_headers(conn, stream_id, "error");
2739    if (stream)
2740    {
2741        LSQ_DEBUG("resetting stream %u due to error", stream_id);
2742        /* We use code 1, which is QUIC_INTERNAL_ERROR (see
2743         * [draft-hamilton-quic-transport-protocol-01], Section 10), for all
2744         * errors.  There does not seem to be a good reason to figure out
2745         * and send more specific error codes.
2746         */
2747        lsquic_stream_reset_ext(stream, 1, 0);
2748    }
2749}
2750
2751
2752static void
2753headers_stream_on_enable_push (void *ctx, int enable_push)
2754{
2755    struct full_conn *conn = ctx;
2756    if (0 == enable_push)
2757    {
2758        LSQ_DEBUG("server push %d -> 0", !!(conn->fc_flags & FC_SUPPORT_PUSH));
2759        conn->fc_flags &= ~FC_SUPPORT_PUSH;
2760    }
2761    else if (conn->fc_settings->es_support_push)
2762    {
2763        LSQ_DEBUG("server push %d -> 1", !!(conn->fc_flags & FC_SUPPORT_PUSH));
2764        conn->fc_flags |= FC_SUPPORT_PUSH;
2765    }
2766    else
2767        LSQ_INFO("not enabling server push that's disabled in engine settings");
2768}
2769
2770
2771static void
2772headers_stream_on_incoming_headers (void *ctx, struct uncompressed_headers *uh)
2773{
2774    struct full_conn *conn = ctx;
2775    lsquic_stream_t *stream;
2776
2777    LSQ_DEBUG("incoming headers for stream %u", uh->uh_stream_id);
2778
2779    stream = find_stream_on_headers(conn, uh->uh_stream_id, "headers");
2780    if (!stream)
2781    {
2782        free(uh);
2783        return;
2784    }
2785
2786    if (0 != lsquic_stream_uh_in(stream, uh))
2787    {
2788        ABORT_ERROR("stream %u refused incoming headers", uh->uh_stream_id);
2789        free(uh);
2790    }
2791}
2792
2793
2794static void
2795headers_stream_on_push_promise (void *ctx, struct uncompressed_headers *uh)
2796{
2797    struct full_conn *conn = ctx;
2798    lsquic_stream_t *stream;
2799
2800    assert(!(conn->fc_flags & FC_SERVER));
2801
2802    LSQ_DEBUG("push promise for stream %u in response to %u",
2803                                    uh->uh_oth_stream_id, uh->uh_stream_id);
2804
2805    if (0 == (uh->uh_stream_id & 1)     ||
2806        0 != (uh->uh_oth_stream_id & 1))
2807    {
2808        ABORT_ERROR("invalid push promise stream IDs: %u, %u",
2809                                    uh->uh_oth_stream_id, uh->uh_stream_id);
2810        free(uh);
2811        return;
2812    }
2813
2814    if (!(conn_is_stream_closed(conn, uh->uh_stream_id) ||
2815          find_stream_by_id(conn, uh->uh_stream_id)))
2816    {
2817        ABORT_ERROR("invalid push promise original stream ID %u never "
2818                    "initiated", uh->uh_stream_id);
2819        free(uh);
2820        return;
2821    }
2822
2823    if (conn_is_stream_closed(conn, uh->uh_oth_stream_id) ||
2824        find_stream_by_id(conn, uh->uh_oth_stream_id))
2825    {
2826        ABORT_ERROR("invalid promised stream ID %u already used",
2827                                                        uh->uh_oth_stream_id);
2828        free(uh);
2829        return;
2830    }
2831
2832    stream = new_stream_ext(conn, uh->uh_oth_stream_id, STREAM_IF_STD,
2833                SCF_DI_AUTOSWITCH|(conn->fc_enpub->enp_settings.es_rw_once ?
2834                                                        SCF_DISP_RW_ONCE : 0));
2835    if (!stream)
2836    {
2837        ABORT_ERROR("cannot create stream: %s", strerror(errno));
2838        free(uh);
2839        return;
2840    }
2841    lsquic_stream_push_req(stream, uh);
2842    lsquic_stream_call_on_new(stream,
2843                            conn->fc_stream_ifs[STREAM_IF_STD].stream_if_ctx);
2844    return;
2845}
2846
2847
2848static void
2849headers_stream_on_priority (void *ctx, uint32_t stream_id, int exclusive,
2850                            uint32_t dep_stream_id, unsigned weight)
2851{
2852    struct full_conn *conn = ctx;
2853    lsquic_stream_t *stream;
2854    LSQ_DEBUG("got priority frame for stream %u: (ex: %d; dep stream: %u; "
2855                  "weight: %u)", stream_id, exclusive, dep_stream_id, weight);
2856    stream = find_stream_on_headers(conn, stream_id, "priority");
2857    if (stream)
2858        lsquic_stream_set_priority_internal(stream, weight);
2859}
2860
2861
2862int lsquic_conn_is_push_enabled(lsquic_conn_t *c)
2863{
2864    return ((struct full_conn *)c)->fc_flags & FC_SUPPORT_PUSH;
2865}
2866
2867
2868lsquic_conn_ctx_t *
2869lsquic_conn_get_ctx (const lsquic_conn_t *lconn)
2870{
2871    struct full_conn *const conn = (struct full_conn *) lconn;
2872    return conn->fc_conn_ctx;
2873}
2874
2875
2876static const struct headers_stream_callbacks headers_callbacks =
2877{
2878    .hsc_on_headers      = headers_stream_on_incoming_headers,
2879    .hsc_on_push_promise = headers_stream_on_push_promise,
2880    .hsc_on_priority     = headers_stream_on_priority,
2881    .hsc_on_stream_error = headers_stream_on_stream_error,
2882    .hsc_on_conn_error   = headers_stream_on_conn_error,
2883    .hsc_on_enable_push  = headers_stream_on_enable_push,
2884};
2885
2886
2887
2888static const struct conn_iface full_conn_iface = {
2889    .ci_destroy              =  full_conn_ci_destroy,
2890    .ci_handshake_done       =  full_conn_ci_handshake_done,
2891    .ci_next_packet_to_send  =  full_conn_ci_next_packet_to_send,
2892    .ci_packet_in            =  full_conn_ci_packet_in,
2893    .ci_packet_not_sent      =  full_conn_ci_packet_not_sent,
2894    .ci_packet_sent          =  full_conn_ci_packet_sent,
2895    .ci_tick                 =  full_conn_ci_tick,
2896    .ci_user_wants_read      =  full_conn_ci_user_wants_read,
2897};
2898