lsquic_stream.c revision 292abba1
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 */
5
6#include <assert.h>
7#include <errno.h>
8#include <inttypes.h>
9#include <stdarg.h>
10#include <stdlib.h>
11#include <string.h>
12#include <sys/queue.h>
13#include <stddef.h>
14
15#ifdef WIN32
16#include <malloc.h>
17#endif
18
19#include "fiu-local.h"
20
21#include "lsquic.h"
22
23#include "lsquic_int_types.h"
24#include "lsquic_packet_common.h"
25#include "lsquic_packet_in.h"
26#include "lsquic_malo.h"
27#include "lsquic_conn_flow.h"
28#include "lsquic_rtt.h"
29#include "lsquic_sfcw.h"
30#include "lsquic_varint.h"
31#include "lsquic_hq.h"
32#include "lsquic_hash.h"
33#include "lsquic_stream.h"
34#include "lsquic_conn_public.h"
35#include "lsquic_util.h"
36#include "lsquic_mm.h"
37#include "lsquic_headers_stream.h"
38#include "lsquic_conn.h"
39#include "lsquic_data_in_if.h"
40#include "lsquic_parse.h"
41#include "lsquic_packet_in.h"
42#include "lsquic_packet_out.h"
43#include "lsquic_engine_public.h"
44#include "lsquic_senhist.h"
45#include "lsquic_pacer.h"
46#include "lsquic_cubic.h"
47#include "lsquic_bw_sampler.h"
48#include "lsquic_minmax.h"
49#include "lsquic_bbr.h"
50#include "lsquic_adaptive_cc.h"
51#include "lsquic_send_ctl.h"
52#include "lsquic_headers.h"
53#include "lsquic_ev_log.h"
54#include "lsquic_enc_sess.h"
55#include "lsqpack.h"
56#include "lsquic_frab_list.h"
57#include "lsquic_http1x_if.h"
58#include "lsquic_qdec_hdl.h"
59#include "lsquic_qenc_hdl.h"
60#include "lsquic_byteswap.h"
61#include "lsquic_ietf.h"
62#include "lsquic_push_promise.h"
63#include "lsquic_hcso_writer.h"
64
65#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
66#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(stream->conn_pub->lconn)
67#define LSQUIC_LOG_STREAM_ID stream->id
68#include "lsquic_logger.h"
69
70#define MIN(a, b) ((a) < (b) ? (a) : (b))
71
72static void
73drop_frames_in (lsquic_stream_t *stream);
74
75static void
76maybe_schedule_call_on_close (lsquic_stream_t *stream);
77
78static int
79stream_wantread (lsquic_stream_t *stream, int is_want);
80
81static int
82stream_wantwrite (lsquic_stream_t *stream, int is_want);
83
84enum stream_write_options
85{
86    SWO_BUFFER  = 1 << 0,       /* Allow buffering in sm_buf */
87};
88
89
90static ssize_t
91stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t,
92                                                    enum stream_write_options);
93
94static ssize_t
95save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);
96
97static int
98stream_flush (lsquic_stream_t *stream);
99
100static int
101stream_flush_nocheck (lsquic_stream_t *stream);
102
103static void
104maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag);
105
/* Status codes returned by the sm_write_to_packet implementations
 * declared below.
 */
enum swtp_status
{
    SWTP_OK,
    SWTP_STOP,
    SWTP_ERROR
};

static enum swtp_status stream_write_to_packet_std (
                        struct frame_gen_ctx *fg_ctx, const size_t size);
static enum swtp_status stream_write_to_packet_hsk (
                        struct frame_gen_ctx *fg_ctx, const size_t size);
static enum swtp_status stream_write_to_packet_crypto (
                        struct frame_gen_ctx *fg_ctx, const size_t size);
116
117static size_t
118stream_write_avail_no_frames (struct lsquic_stream *);
119
120static size_t
121stream_write_avail_with_frames (struct lsquic_stream *);
122
123static size_t
124stream_write_avail_with_headers (struct lsquic_stream *);
125
126static int
127hq_filter_readable (struct lsquic_stream *stream);
128
129static void
130hq_decr_left (struct lsquic_stream *stream, size_t);
131
132static size_t
133hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame);
134
135static int
136stream_readable_non_http (struct lsquic_stream *stream);
137
138static int
139stream_readable_http_gquic (struct lsquic_stream *stream);
140
141static int
142stream_readable_http_ietf (struct lsquic_stream *stream);
143
144static ssize_t
145stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz);
146
147static size_t
148active_hq_frame_sizes (const struct lsquic_stream *);
149
150static void
151on_write_pp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *);
152
153static void
154stream_hq_frame_put (struct lsquic_stream *, struct stream_hq_frame *);
155
156static size_t
157stream_hq_frame_size (const struct stream_hq_frame *);
158
159const struct stream_filter_if hq_stream_filter_if =
160{
161    .sfi_readable   = hq_filter_readable,
162    .sfi_filter_df  = hq_filter_df,
163    .sfi_decr_left  = hq_decr_left,
164};
165
166
167#if LSQUIC_KEEP_STREAM_HISTORY
/* Stream history events.  Each value is a printable ASCII character, which
 * lets the whole history buffer be printed in one line of a log message.
 *
 * Not every stream event has an entry here: only the most interesting ones
 * are recorded.
 */
enum stream_history_event
{
    /* Special entries */
    SHE_EMPTY = '\0',           /* No init besides memset required */
    SHE_PLUS = '+',             /* Previous event occurred more than once */

    /* Regular entries */
    SHE_REACH_FIN = 'a',
    SHE_EARLY_READ_STOP = 'A',
    SHE_BLOCKED_OUT = 'b',
    SHE_CREATED = 'C',
    SHE_FRAME_IN = 'd',
    SHE_FRAME_OUT = 'D',
    SHE_RESET = 'e',
    SHE_WINDOW_UPDATE = 'E',
    SHE_FIN_IN = 'f',
    SHE_FINISHED = 'F',
    SHE_GOAWAY_IN = 'g',
    SHE_USER_WRITE_HEADER = 'h',
    SHE_HEADERS_IN = 'H',
    SHE_IF_SWITCH = 'i',
    SHE_ONCLOSE_SCHED = 'l',
    SHE_ONCLOSE_CALL = 'L',
    SHE_ONNEW = 'N',
    SHE_SET_PRIO = 'p',
    SHE_SHORT_WRITE = 'q',
    SHE_USER_READ = 'r',
    SHE_SHUTDOWN_READ = 'R',
    SHE_RST_IN = 's',
    SHE_STOP_SENDIG_IN = 'S',
    SHE_RST_OUT = 't',
    SHE_RST_ACKED = 'T',
    SHE_FLUSH = 'u',
    SHE_STOP_SENDIG_OUT = 'U',
    SHE_USER_WRITE_DATA = 'w',
    SHE_SHUTDOWN_WRITE = 'W',
    SHE_CLOSE = 'X',
    SHE_DELAY_SW = 'y',
    SHE_FORCE_FINISH = 'Z',

    /* In each pair below, "YES" must be one more than "NO" */
    SHE_WANTREAD_NO = '0',
    SHE_WANTREAD_YES = '1',
    SHE_WANTWRITE_NO = '2',
    SHE_WANTWRITE_YES = '3',
};
215
216static void
217sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
218{
219    enum stream_history_event prev_event;
220    sm_hist_idx_t idx;
221    int plus;
222
223    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
224    plus = SHE_PLUS == stream->sm_hist_buf[idx];
225    idx = (idx - plus) & SM_HIST_IDX_MASK;
226    prev_event = stream->sm_hist_buf[idx];
227
228    if (prev_event == sh_event && plus)
229        return;
230
231    if (prev_event == sh_event)
232        sh_event = SHE_PLUS;
233    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
234
235    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
236        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
237                                                        stream->sm_hist_buf);
238}
239
240
/* Record `event' in the history buffer of `stream'. */
#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
/* Log the history entries written since the write index last wrapped
 * around, if there are any.  The `stream' argument is parenthesized at
 * every use so that any pointer-valued expression can be passed.
 */
#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
        if ((stream)->sm_hist_idx & SM_HIST_IDX_MASK)                       \
            LSQ_DEBUG("history: [%.*s]",                                    \
                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
                (stream)->sm_hist_buf);                                     \
    } while (0)
248#else
249#   define SM_HISTORY_APPEND(stream, event)
250#   define SM_HISTORY_DUMP_REMAINING(stream)
251#endif
252
253
254static int
255stream_inside_callback (const lsquic_stream_t *stream)
256{
257    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
258}
259
260
261/* This is an approximation.  If data is written or read outside of the
262 * event loop, last_prog will be somewhat out of date, but it's close
263 * enough for our purposes.
264 */
265static void
266maybe_update_last_progress (struct lsquic_stream *stream)
267{
268    if (stream->conn_pub && !lsquic_stream_is_critical(stream))
269    {
270        if (stream->conn_pub->last_prog != stream->conn_pub->last_tick)
271            LSQ_DEBUG("update last progress to %"PRIu64,
272                                            stream->conn_pub->last_tick);
273        stream->conn_pub->last_prog = stream->conn_pub->last_tick;
274#ifndef NDEBUG
275        stream->sm_last_prog = stream->conn_pub->last_tick;
276#endif
277    }
278}
279
280
281static void
282maybe_conn_to_tickable (lsquic_stream_t *stream)
283{
284    if (!stream_inside_callback(stream))
285        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
286                                           stream->conn_pub->lconn);
287}
288
289
290/* Here, "readable" means that the user is able to read from the stream. */
291static void
292maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
293{
294    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
295    {
296        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
297                                           stream->conn_pub->lconn);
298    }
299}
300
301
302/* Here, "writeable" means that data can be put into packets to be
303 * scheduled to be sent out.
304 *
305 * If `check_can_send' is false, it means that we do not need to check
306 * whether packets can be sent.  This check was already performed when
307 * we packetized stream data.
308 */
309static void
310maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
311                                                    int check_can_send)
312{
313    if (!stream_inside_callback(stream) &&
314            (!check_can_send
315             || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
316          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
317    {
318        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
319                                           stream->conn_pub->lconn);
320    }
321}
322
323
324static int
325stream_stalled (const lsquic_stream_t *stream)
326{
327    return 0 == (stream->sm_qflags & (SMQF_WANT_WRITE|SMQF_WANT_READ)) &&
328           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
329                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
330}
331
332
333static size_t
334stream_stream_frame_header_sz (const struct lsquic_stream *stream,
335                                                            unsigned data_sz)
336{
337    return stream->conn_pub->lconn->cn_pf->pf_calc_stream_frame_header_sz(
338                                    stream->id, stream->tosend_off, data_sz);
339}
340
341
342static size_t
343stream_crypto_frame_header_sz (const struct lsquic_stream *stream,
344                                                            unsigned data_sz)
345{
346    return stream->conn_pub->lconn->cn_pf
347         ->pf_calc_crypto_frame_header_sz(stream->tosend_off, data_sz);
348}
349
350
351/* GQUIC-only function */
352static int
353stream_is_hsk (const struct lsquic_stream *stream)
354{
355    if (stream->sm_bflags & SMBF_IETF)
356        return 0;
357    else
358        return lsquic_stream_is_crypto(stream);
359}
360
361
362/* This function's only job is to change the allocated packet's header
363 * type to HETY_0RTT when stream frames are written before handshake
364 * is complete.
365 */
366static struct lsquic_packet_out *
367stream_get_packet_for_stream_0rtt (struct lsquic_send_ctl *ctl,
368                unsigned need_at_least, const struct network_path *path,
369                const struct lsquic_stream *stream)
370{
371    struct lsquic_packet_out *packet_out;
372
373    if (stream->conn_pub->lconn->cn_flags & LSCONN_HANDSHAKE_DONE)
374    {
375        LSQ_DEBUG("switch to regular \"get packet for stream\" function");
376        /* Here we drop the "const" because this is a static function.
377         * Otherwise, we would not condone such sorcery.
378         */
379        ((struct lsquic_stream *) stream)->sm_get_packet_for_stream
380                                = lsquic_send_ctl_get_packet_for_stream;
381        return lsquic_send_ctl_get_packet_for_stream(ctl, need_at_least,
382                                                            path, stream);
383    }
384    else
385    {
386        packet_out = lsquic_send_ctl_get_packet_for_stream(ctl, need_at_least,
387                                                            path, stream);
388        if (packet_out)
389            packet_out->po_header_type = HETY_0RTT;
390        return packet_out;
391    }
392}
393
394
395static struct lsquic_stream *
396stream_new_common (lsquic_stream_id_t id, struct lsquic_conn_public *conn_pub,
397           const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
398           enum stream_ctor_flags ctor_flags)
399{
400    struct lsquic_stream *stream;
401
402    stream = calloc(1, sizeof(*stream));
403    if (!stream)
404        return NULL;
405
406    if (ctor_flags & SCF_USE_DI_HASH)
407        stream->data_in = lsquic_data_in_hash_new(conn_pub, id, 0);
408    else
409        stream->data_in = lsquic_data_in_nocopy_new(conn_pub, id);
410    if (!stream->data_in)
411    {
412        free(stream);
413        return NULL;
414    }
415
416    stream->id        = id;
417    stream->stream_if = stream_if;
418    stream->conn_pub  = conn_pub;
419    stream->sm_onnew_arg = stream_if_ctx;
420    stream->sm_write_avail = stream_write_avail_no_frames;
421
422    STAILQ_INIT(&stream->sm_hq_frames);
423
424    stream->sm_bflags |= ctor_flags & ((1 << N_SMBF_FLAGS) - 1);
425    if (conn_pub->lconn->cn_flags & LSCONN_SERVER)
426        stream->sm_bflags |= SMBF_SERVER;
427    stream->sm_get_packet_for_stream = lsquic_send_ctl_get_packet_for_stream;
428
429    return stream;
430}
431
432
433lsquic_stream_t *
434lsquic_stream_new (lsquic_stream_id_t id,
435        struct lsquic_conn_public *conn_pub,
436        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
437        unsigned initial_window, uint64_t initial_send_off,
438        enum stream_ctor_flags ctor_flags)
439{
440    lsquic_cfcw_t *cfcw;
441    lsquic_stream_t *stream;
442
443    stream = stream_new_common(id, conn_pub, stream_if, stream_if_ctx,
444                                                                ctor_flags);
445    if (!stream)
446        return NULL;
447
448    if (!initial_window)
449        initial_window = 16 * 1024;
450
451    if (ctor_flags & SCF_IETF)
452    {
453        cfcw = &conn_pub->cfcw;
454        stream->sm_bflags |= SMBF_CONN_LIMITED;
455        if (ctor_flags & SCF_HTTP)
456        {
457            stream->sm_write_avail = stream_write_avail_with_headers;
458            stream->sm_readable = stream_readable_http_ietf;
459            stream->sm_sfi = &hq_stream_filter_if;
460        }
461        else
462            stream->sm_readable = stream_readable_non_http;
463        if ((ctor_flags & (SCF_HTTP|SCF_HTTP_PRIO))
464                                                == (SCF_HTTP|SCF_HTTP_PRIO))
465            lsquic_stream_set_priority_internal(stream, LSQUIC_DEF_HTTP_URGENCY);
466        else
467            lsquic_stream_set_priority_internal(stream,
468                                            LSQUIC_STREAM_DEFAULT_PRIO);
469        stream->sm_write_to_packet = stream_write_to_packet_std;
470        stream->sm_frame_header_sz = stream_stream_frame_header_sz;
471    }
472    else
473    {
474        if (ctor_flags & SCF_CRITICAL)
475            cfcw = NULL;
476        else
477        {
478            cfcw = &conn_pub->cfcw;
479            stream->sm_bflags |= SMBF_CONN_LIMITED;
480            lsquic_stream_set_priority_internal(stream,
481                                                LSQUIC_STREAM_DEFAULT_PRIO);
482        }
483        if (stream->sm_bflags & SMBF_USE_HEADERS)
484            stream->sm_readable = stream_readable_http_gquic;
485        else
486            stream->sm_readable = stream_readable_non_http;
487        if (ctor_flags & SCF_CRYPTO_FRAMES)
488        {
489            stream->sm_frame_header_sz = stream_crypto_frame_header_sz;
490            stream->sm_write_to_packet = stream_write_to_packet_crypto;
491        }
492        else
493        {
494            if (stream_is_hsk(stream))
495                stream->sm_write_to_packet = stream_write_to_packet_hsk;
496            else
497                stream->sm_write_to_packet = stream_write_to_packet_std;
498            stream->sm_frame_header_sz = stream_stream_frame_header_sz;
499        }
500    }
501
502    if ((stream->sm_bflags & (SMBF_SERVER|SMBF_IETF)) == SMBF_IETF
503                    && !(conn_pub->lconn->cn_flags & LSCONN_HANDSHAKE_DONE))
504    {
505        LSQ_DEBUG("use wrapper \"get packet for stream\" function");
506        stream->sm_get_packet_for_stream = stream_get_packet_for_stream_0rtt;
507    }
508
509    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
510    stream->max_send_off = initial_send_off;
511    LSQ_DEBUG("created stream");
512    SM_HISTORY_APPEND(stream, SHE_CREATED);
513    if (ctor_flags & SCF_CALL_ON_NEW)
514        lsquic_stream_call_on_new(stream);
515    return stream;
516}
517
518
519struct lsquic_stream *
520lsquic_stream_new_crypto (enum enc_level enc_level,
521        struct lsquic_conn_public *conn_pub,
522        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
523        enum stream_ctor_flags ctor_flags)
524{
525    struct lsquic_stream *stream;
526    lsquic_stream_id_t stream_id;
527
528    assert(ctor_flags & SCF_CRITICAL);
529
530    fiu_return_on("stream/new_crypto", NULL);
531
532    stream_id = ~0ULL - enc_level;
533    stream = stream_new_common(stream_id, conn_pub, stream_if,
534                                                stream_if_ctx, ctor_flags);
535    if (!stream)
536        return NULL;
537
538    stream->sm_bflags |= SMBF_CRYPTO|SMBF_IETF;
539    stream->sm_enc_level = enc_level;
540    /* We allow buffering of up to 16 KB of CRYPTO data (I guess we could
541     * make this configurable?).  The window is opened (without sending
542     * MAX_STREAM_DATA) as CRYPTO data is consumed.  If too much comes in
543     * at a time, we abort with TEC_CRYPTO_BUFFER_EXCEEDED.
544     */
545    lsquic_sfcw_init(&stream->fc, 16 * 1024, NULL, conn_pub, stream_id);
546    /* Don't limit ourselves from sending CRYPTO data.  We assume that
547     * the underlying crypto library behaves in a sane manner.
548     */
549    stream->max_send_off = UINT64_MAX;
550    LSQ_DEBUG("created crypto stream");
551    SM_HISTORY_APPEND(stream, SHE_CREATED);
552    stream->sm_frame_header_sz = stream_crypto_frame_header_sz;
553    stream->sm_write_to_packet = stream_write_to_packet_crypto;
554    stream->sm_readable = stream_readable_non_http;
555    if (ctor_flags & SCF_CALL_ON_NEW)
556        lsquic_stream_call_on_new(stream);
557    return stream;
558}
559
560
561void
562lsquic_stream_call_on_new (lsquic_stream_t *stream)
563{
564    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
565    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
566    {
567        LSQ_DEBUG("calling on_new_stream");
568        SM_HISTORY_APPEND(stream, SHE_ONNEW);
569        stream->stream_flags |= STREAM_ONNEW_DONE;
570        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
571                                                          stream);
572    }
573}
574
575
576static void
577decr_conn_cap (struct lsquic_stream *stream, size_t incr)
578{
579    if (stream->sm_bflags & SMBF_CONN_LIMITED)
580    {
581        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
582        stream->conn_pub->conn_cap.cc_sent -= incr;
583    }
584}
585
586
587static void
588maybe_resize_stream_buffer (struct lsquic_stream *stream)
589{
590    assert(0 == stream->sm_n_buffered);
591
592    if (stream->sm_n_allocated < stream->conn_pub->path->np_pack_size)
593    {
594        free(stream->sm_buf);
595        stream->sm_buf = NULL;
596        stream->sm_n_allocated = 0;
597    }
598    else if (stream->sm_n_allocated > stream->conn_pub->path->np_pack_size)
599        stream->sm_n_allocated = stream->conn_pub->path->np_pack_size;
600}
601
602
603static void
604drop_buffered_data (struct lsquic_stream *stream)
605{
606    decr_conn_cap(stream, stream->sm_n_buffered);
607    stream->sm_n_buffered = 0;
608    maybe_resize_stream_buffer(stream);
609    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
610        maybe_remove_from_write_q(stream, SMQF_WRITE_Q_FLAGS);
611}
612
613
614void
615lsquic_stream_drop_hset_ref (struct lsquic_stream *stream)
616{
617    if (stream->uh)
618        stream->uh->uh_hset = NULL;
619}
620
621
622static void
623destroy_uh (struct lsquic_stream *stream)
624{
625    if (stream->uh)
626    {
627        if (stream->uh->uh_hset)
628            stream->conn_pub->enpub->enp_hsi_if
629                            ->hsi_discard_header_set(stream->uh->uh_hset);
630        free(stream->uh);
631        stream->uh = NULL;
632    }
633}
634
635
636void
637lsquic_stream_destroy (lsquic_stream_t *stream)
638{
639    struct push_promise *promise;
640    struct stream_hq_frame *shf;
641
642    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
643    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
644                                                            STREAM_ONNEW_DONE)
645    {
646        stream->stream_flags |= STREAM_ONCLOSE_DONE;
647        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
648        stream->stream_if->on_close(stream, stream->st_ctx);
649    }
650    if (stream->sm_qflags & SMQF_SENDING_FLAGS)
651        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
652    if (stream->sm_qflags & SMQF_WANT_READ)
653        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
654    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
655        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
656    if (stream->sm_qflags & SMQF_SERVICE_FLAGS)
657        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
658    if (stream->sm_qflags & SMQF_QPACK_DEC)
659        lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream);
660    else if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
661                                            == (SMBF_IETF|SMBF_USE_HEADERS)
662            && !(stream->stream_flags & STREAM_FIN_REACHED))
663        lsquic_qdh_cancel_stream_id(stream->conn_pub->u.ietf.qdh, stream->id);
664    drop_buffered_data(stream);
665    lsquic_sfcw_consume_rem(&stream->fc);
666    drop_frames_in(stream);
667    if (stream->push_req)
668    {
669        if (stream->push_req->uh_hset)
670            stream->conn_pub->enpub->enp_hsi_if
671                            ->hsi_discard_header_set(stream->push_req->uh_hset);
672        free(stream->push_req);
673    }
674    while ((promise = SLIST_FIRST(&stream->sm_promises)))
675    {
676        SLIST_REMOVE_HEAD(&stream->sm_promises, pp_next);
677        lsquic_pp_put(promise, stream->conn_pub->u.ietf.promises);
678    }
679    if (stream->sm_promise)
680    {
681        assert(stream->sm_promise->pp_pushed_stream == stream);
682        stream->sm_promise->pp_pushed_stream = NULL;
683        lsquic_pp_put(stream->sm_promise, stream->conn_pub->u.ietf.promises);
684    }
685    while ((shf = STAILQ_FIRST(&stream->sm_hq_frames)))
686        stream_hq_frame_put(stream, shf);
687    destroy_uh(stream);
688    free(stream->sm_buf);
689    free(stream->sm_header_block);
690    LSQ_DEBUG("destroyed stream");
691    SM_HISTORY_DUMP_REMAINING(stream);
692    free(stream);
693}
694
695
696static int
697stream_is_finished (struct lsquic_stream *stream)
698{
699    return lsquic_stream_is_closed(stream)
700        && (stream->sm_bflags & SMBF_DELAY_ONCLOSE ?
701           /* Need a stricter check when on_close() is delayed: */
702            !lsquic_stream_has_unacked_data(stream) :
703           /* n_unacked checks that no outgoing packets that reference this
704            * stream are outstanding:
705            */
706            0 == stream->n_unacked)
707        && 0 == (stream->sm_qflags & (
708           /* This checks that no packets that reference this stream will
709            * become outstanding:
710            */
711                    SMQF_SEND_RST
712           /* Can't finish stream until all "self" flags are unset: */
713                    | SMQF_SELF_FLAGS))
714        && ((stream->stream_flags & STREAM_FORCE_FINISH)
715          || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)));
716}
717
718
719/* This is an internal function */
720void
721lsquic_stream_force_finish (struct lsquic_stream *stream)
722{
723    LSQ_DEBUG("stream is now finished");
724    SM_HISTORY_APPEND(stream, SHE_FINISHED);
725    if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
726        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
727                                                next_service_stream);
728    stream->sm_qflags |= SMQF_FREE_STREAM;
729    stream->stream_flags |= STREAM_FINISHED;
730}
731
732
733static void
734maybe_finish_stream (lsquic_stream_t *stream)
735{
736    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
737                                                    stream_is_finished(stream))
738        lsquic_stream_force_finish(stream);
739}
740
741
742static void
743maybe_schedule_call_on_close (lsquic_stream_t *stream)
744{
745    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
746                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE))
747            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)
748            && (!(stream->sm_bflags & SMBF_DELAY_ONCLOSE)
749                                || !lsquic_stream_has_unacked_data(stream))
750            && !(stream->sm_qflags & SMQF_CALL_ONCLOSE))
751    {
752        if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
753            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
754                                                    next_service_stream);
755        stream->sm_qflags |= SMQF_CALL_ONCLOSE;
756        LSQ_DEBUG("scheduled calling on_close");
757        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
758    }
759}
760
761
762void
763lsquic_stream_call_on_close (lsquic_stream_t *stream)
764{
765    assert(stream->stream_flags & STREAM_ONNEW_DONE);
766    stream->sm_qflags &= ~SMQF_CALL_ONCLOSE;
767    if (!(stream->sm_qflags & SMQF_SERVICE_FLAGS))
768        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
769                                                    next_service_stream);
770    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
771    {
772        LSQ_DEBUG("calling on_close");
773        stream->stream_flags |= STREAM_ONCLOSE_DONE;
774        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
775        stream->stream_if->on_close(stream, stream->st_ctx);
776    }
777    else
778        assert(0);
779}
780
781
782static int
783stream_has_frame_at_read_offset (struct lsquic_stream *stream)
784{
785    if (!((stream->stream_flags & STREAM_CACHED_FRAME)
786                    && stream->read_offset == stream->sm_last_frame_off))
787    {
788        stream->sm_has_frame = stream->data_in->di_if->di_get_frame(
789                                stream->data_in, stream->read_offset) != NULL;
790        stream->sm_last_frame_off = stream->read_offset;
791        stream->stream_flags |= STREAM_CACHED_FRAME;
792    }
793    return stream->sm_has_frame;
794}
795
796
/* Readability check for non-HTTP streams: the stream is readable if there
 * is a frame at the read offset.
 */
static int
stream_readable_non_http (struct lsquic_stream *stream)
{
    const int has_frame = stream_has_frame_at_read_offset(stream);

    return has_frame;
}
802
803
804static int
805stream_readable_http_gquic (struct lsquic_stream *stream)
806{
807    return (stream->stream_flags & STREAM_HAVE_UH)
808        && (stream->uh
809            || stream_has_frame_at_read_offset(stream));
810}
811
812
813static int
814stream_readable_http_ietf (struct lsquic_stream *stream)
815{
816    return
817        /* If we have read the header set and the header set has not yet
818         * been read, the stream is readable.
819         */
820        ((stream->stream_flags & STREAM_HAVE_UH) && stream->uh)
821        ||
822        /* Alternatively, run the filter and check for payload availability. */
823        (stream->sm_sfi->sfi_readable(stream)
824            && (/* Running the filter may result in hitting FIN: */
825                (stream->stream_flags & STREAM_FIN_REACHED)
826                || stream_has_frame_at_read_offset(stream)));
827}
828
829
830static int
831maybe_switch_data_in (struct lsquic_stream *stream)
832{
833    if ((stream->sm_bflags & SMBF_AUTOSWITCH) &&
834            (stream->data_in->di_flags & DI_SWITCH_IMPL))
835    {
836        stream->data_in = stream->data_in->di_if->di_switch_impl(
837                                    stream->data_in, stream->read_offset);
838        if (!stream->data_in)
839        {
840            stream->data_in = lsquic_data_in_error_new();
841            return -1;
842        }
843    }
844
845    return 0;
846}
847
848
849/* Drain and discard any incoming data */
850static int
851stream_readable_discard (struct lsquic_stream *stream)
852{
853    struct data_frame *data_frame;
854    uint64_t toread;
855    int fin;
856
857    while ((data_frame = stream->data_in->di_if->di_get_frame(
858                                    stream->data_in, stream->read_offset)))
859    {
860        fin = data_frame->df_fin;
861        toread = data_frame->df_size - data_frame->df_read_off;
862        stream->read_offset += toread;
863        data_frame->df_read_off = data_frame->df_size;
864        stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
865        if (fin)
866            break;
867    }
868
869    (void) maybe_switch_data_in(stream);
870
871    return 0;   /* Never readable */
872}
873
874
875static int
876stream_is_read_reset (const struct lsquic_stream *stream)
877{
878    if (stream->sm_bflags & SMBF_IETF)
879        return stream->stream_flags & STREAM_RST_RECVD;
880    else
881        return (stream->stream_flags & (STREAM_RST_RECVD|STREAM_RST_SENT))
882            || (stream->sm_qflags & SMQF_SEND_RST);
883}
884
885
886int
887lsquic_stream_readable (struct lsquic_stream *stream)
888{
889    /* A stream is readable if one of the following is true: */
890    return
891        /* - It is already finished: in that case, lsquic_stream_read() will
892         *   return 0.
893         */
894            (stream->stream_flags & STREAM_FIN_REACHED)
895        /* - The stream is reset, by either side.  In this case,
896         *   lsquic_stream_read() will return -1 (we want the user to be
897         *   able to collect the error).
898         */
899        ||  stream_is_read_reset(stream)
900        /* Type-dependent readability check: */
901        ||  stream->sm_readable(stream);
902    ;
903}
904
905
906/* Return true if write end of the stream has been reset.
907 * Note that the logic for gQUIC is the same for write and read resets.
908 */
909int
910lsquic_stream_is_write_reset (const struct lsquic_stream *stream)
911{
912    if (stream->sm_bflags & SMBF_IETF)
913        return stream->stream_flags & STREAM_SS_RECVD;
914    else
915        return (stream->stream_flags & (STREAM_RST_RECVD|STREAM_RST_SENT))
916            || (stream->sm_qflags & SMQF_SEND_RST);
917}
918
919
/* Return 1 if the stream is writeable and 0 otherwise. */
static int
stream_writeable (struct lsquic_stream *stream)
{
    /* The stream is reset, by either side.  lsquic_stream_write() will
     * return -1 in this case: we want the user to be able to collect the
     * error.
     */
    if (lsquic_stream_is_write_reset(stream))
        return 1;

    /* Otherwise, the stream is writeable if data can be written to it: */
    return 0 != lsquic_stream_write_avail(stream);
}
934
935
936static size_t
937stream_write_avail_no_frames (struct lsquic_stream *stream)
938{
939    uint64_t stream_avail, conn_avail;
940
941    stream_avail = stream->max_send_off - stream->tosend_off
942                                                - stream->sm_n_buffered;
943
944    if (stream->sm_bflags & SMBF_CONN_LIMITED)
945    {
946        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
947        if (conn_avail < stream_avail)
948            stream_avail = conn_avail;
949    }
950
951    return stream_avail;
952}
953
954
955static size_t
956stream_write_avail_with_frames (struct lsquic_stream *stream)
957{
958    uint64_t stream_avail, conn_avail;
959    const struct stream_hq_frame *shf;
960    size_t size;
961
962    stream_avail = stream->max_send_off - stream->tosend_off
963                                                - stream->sm_n_buffered;
964    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
965        if (!(shf->shf_flags & SHF_WRITTEN))
966        {
967            size = stream_hq_frame_size(shf);
968            assert(size <= stream_avail);
969            stream_avail -= size;
970        }
971
972    if (stream->sm_bflags & SMBF_CONN_LIMITED)
973    {
974        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
975        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
976            if (!(shf->shf_flags & SHF_CC_PAID))
977            {
978                size = stream_hq_frame_size(shf);
979                if (size < conn_avail)
980                    conn_avail -= size;
981                else
982                    return 0;
983            }
984        if (conn_avail < stream_avail)
985            stream_avail = conn_avail;
986    }
987
988    if (stream_avail >= 3 /* Smallest new frame */)
989        return stream_avail;
990    else
991        return 0;
992}
993
994
995static int
996stream_is_pushing_promise (const struct lsquic_stream *stream)
997{
998    return (stream->stream_flags & STREAM_PUSHING)
999        && SLIST_FIRST(&stream->sm_promises)
1000        && (SLIST_FIRST(&stream->sm_promises))->pp_write_state != PPWS_DONE
1001        ;
1002}
1003
1004
1005/* To prevent deadlocks, ensure that when headers are sent, the bytes
1006 * sent on the encoder stream are written first.
1007 *
1008 * XXX If the encoder is set up in non-risking mode, it is perfectly
1009 * fine to send the header block first.  TODO: update the logic to
1010 * reflect this.  There should be two sending behaviors: risk and non-risk.
1011 * For now, we assume risk for everything to be on the conservative side.
1012 */
1013static size_t
1014stream_write_avail_with_headers (struct lsquic_stream *stream)
1015{
1016    if (stream->stream_flags & STREAM_PUSHING)
1017        return stream_write_avail_with_frames(stream);
1018
1019    switch (stream->sm_send_headers_state)
1020    {
1021    case SSHS_BEGIN:
1022        return lsquic_qeh_write_avail(stream->conn_pub->u.ietf.qeh);
1023    case SSHS_ENC_SENDING:
1024        if (stream->sm_hb_compl >
1025                            lsquic_qeh_enc_off(stream->conn_pub->u.ietf.qeh))
1026            return 0;
1027        LSQ_DEBUG("encoder stream bytes have all been sent");
1028        stream->sm_send_headers_state = SSHS_HBLOCK_SENDING;
1029        /* fall-through */
1030    default:
1031        assert(SSHS_HBLOCK_SENDING == stream->sm_send_headers_state);
1032        return stream_write_avail_with_frames(stream);
1033    }
1034}
1035
1036
1037size_t
1038lsquic_stream_write_avail (struct lsquic_stream *stream)
1039{
1040    return stream->sm_write_avail(stream);
1041}
1042
1043
1044int
1045lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
1046{
1047    struct lsquic_conn *lconn;
1048
1049    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
1050                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
1051    {
1052        if (stream->sm_bflags & SMBF_IETF)
1053        {
1054            lconn = stream->conn_pub->lconn;
1055            if (lsquic_stream_is_crypto(stream))
1056                lconn->cn_if->ci_abort_error(lconn, 0,
1057                    TEC_CRYPTO_BUFFER_EXCEEDED,
1058                    "crypto buffer exceeded on in crypto level %"PRIu64,
1059                    crypto_level(stream));
1060            else
1061                lconn->cn_if->ci_abort_error(lconn, 0, TEC_FLOW_CONTROL_ERROR,
1062                    "flow control violation on stream %"PRIu64, stream->id);
1063        }
1064        return -1;
1065    }
1066    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
1067    {
1068        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1069            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1070                                                    next_send_stream);
1071        stream->sm_qflags |= SMQF_SEND_WUF;
1072    }
1073    return 0;
1074}
1075
1076
1077int
1078lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
1079{
1080    uint64_t max_off;
1081    int got_next_offset, rv, free_frame;
1082    enum ins_frame ins_frame;
1083    struct lsquic_conn *lconn;
1084
1085    assert(frame->packet_in);
1086
1087    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
1088    LSQ_DEBUG("received stream frame, offset %"PRIu64", len %u; "
1089        "fin: %d", frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
1090
1091    if ((stream->sm_bflags & SMBF_USE_HEADERS)
1092                            && (stream->stream_flags & STREAM_HEAD_IN_FIN))
1093    {
1094        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
1095        lsquic_malo_put(frame);
1096        return -1;
1097    }
1098
1099    if (frame->data_frame.df_fin && (stream->sm_bflags & SMBF_IETF)
1100            && (stream->stream_flags & STREAM_FIN_RECVD)
1101            && stream->sm_fin_off != DF_END(frame))
1102    {
1103        lconn = stream->conn_pub->lconn;
1104        lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR,
1105            "new final size %"PRIu64" from STREAM frame (id: %"PRIu64") does "
1106            "not match previous final size %"PRIu64, DF_END(frame),
1107            stream->id, stream->sm_fin_off);
1108        return -1;
1109    }
1110
1111    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
1112  insert_frame:
1113    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
1114    if (INS_FRAME_OK == ins_frame)
1115    {
1116        /* Update maximum offset in the flow controller and check for flow
1117         * control violation:
1118         */
1119        rv = -1;
1120        free_frame = !stream->data_in->di_if->di_own_on_ok;
1121        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
1122        if (0 != lsquic_stream_update_sfcw(stream, max_off))
1123            goto end_ok;
1124        if (frame->data_frame.df_fin)
1125        {
1126            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
1127            stream->stream_flags |= STREAM_FIN_RECVD;
1128            stream->sm_qflags &= ~SMQF_WAIT_FIN_OFF;
1129            stream->sm_fin_off = DF_END(frame);
1130            maybe_finish_stream(stream);
1131        }
1132        if (0 != maybe_switch_data_in(stream))
1133            goto end_ok;
1134        if (got_next_offset)
1135            /* Checking the offset saves di_get_frame() call */
1136            maybe_conn_to_tickable_if_readable(stream);
1137        rv = 0;
1138  end_ok:
1139        if (free_frame)
1140            lsquic_malo_put(frame);
1141        stream->stream_flags &= ~STREAM_CACHED_FRAME;
1142        return rv;
1143    }
1144    else if (INS_FRAME_DUP == ins_frame)
1145    {
1146        return 0;
1147    }
1148    else if (INS_FRAME_OVERLAP == ins_frame)
1149    {
1150        LSQ_DEBUG("overlap: switching DATA IN implementation");
1151        stream->data_in = stream->data_in->di_if->di_switch_impl(
1152                                    stream->data_in, stream->read_offset);
1153        if (stream->data_in)
1154            goto insert_frame;
1155        stream->data_in = lsquic_data_in_error_new();
1156        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
1157        lsquic_malo_put(frame);
1158        return -1;
1159    }
1160    else
1161    {
1162        assert(INS_FRAME_ERR == ins_frame);
1163        return -1;
1164    }
1165}
1166
1167
1168static void
1169drop_frames_in (lsquic_stream_t *stream)
1170{
1171    if (stream->data_in)
1172    {
1173        stream->data_in->di_if->di_destroy(stream->data_in);
1174        /* To avoid checking whether `data_in` is set, just set to the error
1175         * data-in stream.  It does the right thing after incoming data is
1176         * dropped.
1177         */
1178        stream->data_in = lsquic_data_in_error_new();
1179        stream->stream_flags &= ~STREAM_CACHED_FRAME;
1180    }
1181}
1182
1183
1184static void
1185maybe_elide_stream_frames (struct lsquic_stream *stream)
1186{
1187    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
1188    {
1189        if (stream->n_unacked)
1190            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
1191                                                stream->id);
1192        stream->stream_flags |= STREAM_FRAMES_ELIDED;
1193    }
1194}
1195
1196
1197int
1198lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
1199                      uint64_t error_code)
1200{
1201    struct lsquic_conn *lconn;
1202
1203    if ((stream->sm_bflags & SMBF_IETF)
1204            && (stream->stream_flags & STREAM_FIN_RECVD)
1205            && stream->sm_fin_off != offset)
1206    {
1207        lconn = stream->conn_pub->lconn;
1208        lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR,
1209            "final size %"PRIu64" from RESET_STREAM frame (id: %"PRIu64") "
1210            "does not match previous final size %"PRIu64, offset,
1211            stream->id, stream->sm_fin_off);
1212        return -1;
1213    }
1214
1215    if (stream->stream_flags & STREAM_RST_RECVD)
1216    {
1217        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
1218        return 0;
1219    }
1220
1221    SM_HISTORY_APPEND(stream, SHE_RST_IN);
1222    /* This flag must always be set, even if we are "ignoring" it: it is
1223     * used by elision code.
1224     */
1225    stream->stream_flags |= STREAM_RST_RECVD;
1226
1227    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
1228    {
1229        LSQ_INFO("RST_STREAM invalid: its offset %"PRIu64" is "
1230            "smaller than that of byte following the last byte we have seen: "
1231            "%"PRIu64, offset,
1232            lsquic_sfcw_get_max_recv_off(&stream->fc));
1233        return -1;
1234    }
1235
1236    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
1237    {
1238        LSQ_INFO("RST_STREAM invalid: its offset %"PRIu64
1239            " violates flow control", offset);
1240        return -1;
1241    }
1242
1243    if (stream->stream_if->on_reset
1244                            && !(stream->stream_flags & STREAM_ONCLOSE_DONE))
1245    {
1246        if (stream->sm_bflags & SMBF_IETF)
1247        {
1248            if (!(stream->sm_dflags & SMDF_ONRESET0))
1249            {
1250                stream->stream_if->on_reset(stream, stream->st_ctx, 0);
1251                stream->sm_dflags |= SMDF_ONRESET0;
1252            }
1253        }
1254        else
1255        {
1256            if ((stream->sm_dflags & (SMDF_ONRESET0|SMDF_ONRESET1))
1257                                    != (SMDF_ONRESET0|SMDF_ONRESET1))
1258            {
1259                stream->stream_if->on_reset(stream, stream->st_ctx, 2);
1260                stream->sm_dflags |= SMDF_ONRESET0|SMDF_ONRESET1;
1261            }
1262        }
1263    }
1264
1265    /* Let user collect error: */
1266    maybe_conn_to_tickable_if_readable(stream);
1267
1268    lsquic_sfcw_consume_rem(&stream->fc);
1269    drop_frames_in(stream);
1270
1271    if (!(stream->sm_bflags & SMBF_IETF))
1272    {
1273        drop_buffered_data(stream);
1274        maybe_elide_stream_frames(stream);
1275    }
1276
1277    if (stream->sm_qflags & SMQF_WAIT_FIN_OFF)
1278    {
1279        stream->sm_qflags &= ~SMQF_WAIT_FIN_OFF;
1280        LSQ_DEBUG("final offset is now known: %"PRIu64, offset);
1281    }
1282
1283    if (!(stream->stream_flags &
1284                        (STREAM_RST_SENT|STREAM_SS_SENT|STREAM_FIN_SENT))
1285                            && !(stream->sm_bflags & SMBF_IETF)
1286                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1287        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
1288
1289    stream->stream_flags |= STREAM_RST_RECVD;
1290
1291    maybe_finish_stream(stream);
1292    maybe_schedule_call_on_close(stream);
1293
1294    return 0;
1295}
1296
1297
1298void
1299lsquic_stream_stop_sending_in (struct lsquic_stream *stream,
1300                                                        uint64_t error_code)
1301{
1302    if (stream->stream_flags & STREAM_SS_RECVD)
1303    {
1304        LSQ_DEBUG("ignore duplicate STOP_SENDING frame");
1305        return;
1306    }
1307
1308    SM_HISTORY_APPEND(stream, SHE_STOP_SENDIG_IN);
1309    stream->stream_flags |= STREAM_SS_RECVD;
1310
1311    if (stream->stream_if->on_reset && !(stream->sm_dflags & SMDF_ONRESET1)
1312                            && !(stream->stream_flags & STREAM_ONCLOSE_DONE))
1313    {
1314        stream->stream_if->on_reset(stream, stream->st_ctx, 1);
1315        stream->sm_dflags |= SMDF_ONRESET1;
1316    }
1317
1318    /* Let user collect error: */
1319    maybe_conn_to_tickable_if_writeable(stream, 0);
1320
1321    lsquic_sfcw_consume_rem(&stream->fc);
1322    drop_buffered_data(stream);
1323    maybe_elide_stream_frames(stream);
1324
1325    if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT))
1326                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1327        lsquic_stream_reset_ext(stream, 0, 0);
1328
1329    maybe_finish_stream(stream);
1330    maybe_schedule_call_on_close(stream);
1331}
1332
1333
1334uint64_t
1335lsquic_stream_fc_recv_off_const (const struct lsquic_stream *stream)
1336{
1337    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
1338}
1339
1340
1341void
1342lsquic_stream_max_stream_data_sent (struct lsquic_stream *stream)
1343{
1344    assert(stream->sm_qflags & SMQF_SEND_MAX_STREAM_DATA);
1345    stream->sm_qflags &= ~SMQF_SEND_MAX_STREAM_DATA;
1346    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1347        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1348    stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc);
1349}
1350
1351
1352uint64_t
1353lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
1354{
1355    assert(stream->sm_qflags & SMQF_SEND_WUF);
1356    stream->sm_qflags &= ~SMQF_SEND_WUF;
1357    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1358        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1359    return stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc);
1360}
1361
1362
1363void
1364lsquic_stream_peer_blocked (struct lsquic_stream *stream, uint64_t peer_off)
1365{
1366    uint64_t last_off;
1367
1368    if (stream->sm_last_recv_off)
1369        last_off = stream->sm_last_recv_off;
1370    else
1371        /* This gets advertized in transport parameters */
1372        last_off = lsquic_sfcw_get_max_recv_off(&stream->fc);
1373
1374    LSQ_DEBUG("Peer blocked at %"PRIu64", while the last MAX_STREAM_DATA "
1375        "frame we sent advertized the limit of %"PRIu64, peer_off, last_off);
1376
1377    if (peer_off > last_off && !(stream->sm_qflags & SMQF_SEND_WUF))
1378    {
1379        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1380            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1381                                                    next_send_stream);
1382        stream->sm_qflags |= SMQF_SEND_WUF;
1383        LSQ_DEBUG("marked to send MAX_STREAM_DATA frame");
1384    }
1385    else if (stream->sm_qflags & SMQF_SEND_WUF)
1386        LSQ_DEBUG("MAX_STREAM_DATA frame is already scheduled");
1387    else if (stream->sm_last_recv_off)
1388        LSQ_DEBUG("MAX_STREAM_DATA(%"PRIu64") has already been either "
1389            "packetized or sent", stream->sm_last_recv_off);
1390    else
1391        LSQ_INFO("Peer should have receive transport param limit "
1392            "of %"PRIu64"; odd.", last_off);
1393}
1394
1395
1396/* GQUIC's BLOCKED frame does not have an offset */
1397void
1398lsquic_stream_peer_blocked_gquic (struct lsquic_stream *stream)
1399{
1400    LSQ_DEBUG("Peer blocked: schedule another WINDOW_UPDATE frame");
1401    if (!(stream->sm_qflags & SMQF_SEND_WUF))
1402    {
1403        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1404            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1405                                                    next_send_stream);
1406        stream->sm_qflags |= SMQF_SEND_WUF;
1407        LSQ_DEBUG("marked to send MAX_STREAM_DATA frame");
1408    }
1409    else
1410        LSQ_DEBUG("WINDOW_UPDATE frame is already scheduled");
1411}
1412
1413
1414void
1415lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
1416{
1417    assert(stream->sm_qflags & SMQF_SEND_BLOCKED);
1418    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
1419    stream->sm_qflags &= ~SMQF_SEND_BLOCKED;
1420    stream->stream_flags |= STREAM_BLOCKED_SENT;
1421    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1422        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1423}
1424
1425
1426void
1427lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
1428{
1429    assert(stream->sm_qflags & SMQF_SEND_RST);
1430    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
1431    stream->sm_qflags &= ~SMQF_SEND_RST;
1432    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1433        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1434    stream->stream_flags |= STREAM_RST_SENT;
1435    maybe_finish_stream(stream);
1436}
1437
1438
1439static size_t
1440read_uh (struct lsquic_stream *stream,
1441        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1442{
1443    struct http1x_headers *const h1h = stream->uh->uh_hset;
1444    size_t nread;
1445
1446    nread = readf(ctx, (unsigned char *) h1h->h1h_buf + h1h->h1h_off,
1447                  h1h->h1h_size - h1h->h1h_off,
1448                  (stream->stream_flags & STREAM_HEAD_IN_FIN) > 0);
1449    h1h->h1h_off += nread;
1450    if (h1h->h1h_off == h1h->h1h_size)
1451    {
1452        LSQ_DEBUG("read all uncompressed headers");
1453        destroy_uh(stream);
1454        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
1455        {
1456            stream->stream_flags |= STREAM_FIN_REACHED;
1457            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
1458        }
1459    }
1460    return nread;
1461}
1462
1463
1464static void
1465verify_cl_on_fin (struct lsquic_stream *stream)
1466{
1467    struct lsquic_conn *lconn;
1468
1469    /* The rules in RFC7230, Section 3.3.2 are a bit too intricate.  We take
1470     * a simple approach and verify content-length only when there was any
1471     * payload at all.
1472     */
1473    if (stream->sm_data_in != 0 && stream->sm_cont_len != stream->sm_data_in)
1474    {
1475        lconn = stream->conn_pub->lconn;
1476        lconn->cn_if->ci_abort_error(lconn, 1, HEC_GENERAL_PROTOCOL_ERROR,
1477            "number of bytes in DATA frames of stream %"PRIu64" is %llu, "
1478            "while content-length specified of %llu", stream->id,
1479            stream->sm_data_in, stream->sm_cont_len);
1480    }
1481}
1482
1483
1484static void
1485stream_consumed_bytes (struct lsquic_stream *stream)
1486{
1487    lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
1488    if (lsquic_sfcw_fc_offsets_changed(&stream->fc)
1489            /* We advance crypto streams' offsets (to control amount of
1490             * buffering we allow), but do not send MAX_STREAM_DATA frames.
1491             */
1492            && !((stream->sm_bflags & (SMBF_IETF|SMBF_CRYPTO))
1493                                                == (SMBF_IETF|SMBF_CRYPTO)))
1494    {
1495        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1496            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1497                                                            next_send_stream);
1498        stream->sm_qflags |= SMQF_SEND_WUF;
1499        maybe_conn_to_tickable_if_writeable(stream, 1);
1500    }
1501}
1502
1503
1504static ssize_t
1505read_data_frames (struct lsquic_stream *stream, int do_filtering,
1506        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1507{
1508    struct data_frame *data_frame;
1509    size_t nread, toread, total_nread;
1510    int short_read, processed_frames;
1511
1512    processed_frames = 0;
1513    total_nread = 0;
1514
1515    while ((data_frame = stream->data_in->di_if->di_get_frame(
1516                                        stream->data_in, stream->read_offset)))
1517    {
1518
1519        ++processed_frames;
1520
1521        do
1522        {
1523            if (do_filtering && stream->sm_sfi)
1524                toread = stream->sm_sfi->sfi_filter_df(stream, data_frame);
1525            else
1526                toread = data_frame->df_size - data_frame->df_read_off;
1527
1528            if (toread || data_frame->df_fin)
1529            {
1530                nread = readf(ctx, data_frame->df_data + data_frame->df_read_off,
1531                                                     toread, data_frame->df_fin);
1532                if (do_filtering && stream->sm_sfi)
1533                    stream->sm_sfi->sfi_decr_left(stream, nread);
1534                data_frame->df_read_off += nread;
1535                stream->read_offset += nread;
1536                total_nread += nread;
1537                short_read = nread < toread;
1538            }
1539            else
1540                short_read = 0;
1541
1542            if (data_frame->df_read_off == data_frame->df_size)
1543            {
1544                const int fin = data_frame->df_fin;
1545                stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
1546                data_frame = NULL;
1547                if (0 != maybe_switch_data_in(stream))
1548                    return -1;
1549                if (fin)
1550                {
1551                    stream->stream_flags |= STREAM_FIN_REACHED;
1552                    if (stream->sm_bflags & SMBF_VERIFY_CL)
1553                        verify_cl_on_fin(stream);
1554                    goto end_while;
1555                }
1556            }
1557            else if (short_read)
1558                goto end_while;
1559        }
1560        while (data_frame);
1561    }
1562  end_while:
1563
1564    if (processed_frames)
1565        stream_consumed_bytes(stream);
1566
1567    return total_nread;
1568}
1569
1570
1571static ssize_t
1572stream_readf (struct lsquic_stream *stream,
1573        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1574{
1575    size_t total_nread;
1576    ssize_t nread;
1577
1578    total_nread = 0;
1579
1580    if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF))
1581                                            == (SMBF_USE_HEADERS|SMBF_IETF)
1582            && !(stream->stream_flags & STREAM_HAVE_UH)
1583            && !stream->uh)
1584    {
1585        if (stream->sm_readable(stream))
1586        {
1587            if (stream->sm_hq_filter.hqfi_flags & HQFI_FLAG_ERROR)
1588            {
1589                LSQ_INFO("HQ filter hit an error: cannot read from stream");
1590                errno = EBADMSG;
1591                return -1;
1592            }
1593            assert(stream->uh);
1594        }
1595        else
1596        {
1597            errno = EWOULDBLOCK;
1598            return -1;
1599        }
1600    }
1601
1602    if (stream->uh)
1603    {
1604        if (stream->uh->uh_flags & UH_H1H)
1605        {
1606            total_nread += read_uh(stream, readf, ctx);
1607            if (stream->uh)
1608                return total_nread;
1609        }
1610        else
1611        {
1612            LSQ_INFO("header set not claimed: cannot read from stream");
1613            return -1;
1614        }
1615    }
1616    else if ((stream->sm_bflags & SMBF_USE_HEADERS)
1617                                && !(stream->stream_flags & STREAM_HAVE_UH))
1618    {
1619        LSQ_DEBUG("cannot read: headers not available");
1620        errno = EWOULDBLOCK;
1621        return -1;
1622    }
1623
1624    nread = read_data_frames(stream, 1, readf, ctx);
1625    if (nread < 0)
1626        return nread;
1627    total_nread += (size_t) nread;
1628
1629    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64", reached fin: %d",
1630        __func__, total_nread, stream->read_offset,
1631        !!(stream->stream_flags & STREAM_FIN_REACHED));
1632
1633    if (total_nread)
1634        return total_nread;
1635    else if (stream->stream_flags & STREAM_FIN_REACHED)
1636        return 0;
1637    else
1638    {
1639        errno = EWOULDBLOCK;
1640        return -1;
1641    }
1642}
1643
1644
1645/* This function returns 0 when EOF is reached.
1646 */
1647ssize_t
1648lsquic_stream_readf (struct lsquic_stream *stream,
1649        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1650{
1651    ssize_t nread;
1652
1653    SM_HISTORY_APPEND(stream, SHE_USER_READ);
1654
1655    if (stream_is_read_reset(stream))
1656    {
1657        if (stream->stream_flags & STREAM_RST_RECVD)
1658            stream->stream_flags |= STREAM_RST_READ;
1659        errno = ECONNRESET;
1660        return -1;
1661    }
1662    if (stream->stream_flags & STREAM_U_READ_DONE)
1663    {
1664        errno = EBADF;
1665        return -1;
1666    }
1667    if (stream->stream_flags & STREAM_FIN_REACHED)
1668    {
1669       if (stream->sm_bflags & SMBF_USE_HEADERS)
1670       {
1671            if ((stream->stream_flags & STREAM_HAVE_UH) && !stream->uh)
1672                return 0;
1673       }
1674       else
1675           return 0;
1676    }
1677
1678    nread = stream_readf(stream, readf, ctx);
1679    if (nread >= 0)
1680        maybe_update_last_progress(stream);
1681
1682    return nread;
1683}
1684
1685
/* Cursor over the caller's iovec array, used by readv_f() */
struct readv_ctx
{
    const struct iovec        *iov;     /* iovec currently being filled */
    const struct iovec *const  end;     /* one past the last iovec */
    unsigned char             *p;       /* next byte to write in *iov */
};


/* Read callback that scatters `len' bytes from `buf' into the iovec array
 * described by the struct readv_ctx in `ctx_p'.  The cursor is advanced
 * so that a subsequent call continues where this one stopped; zero-length
 * iovecs are skipped when moving to the next one.  `fin' is not used.
 *
 * Returns the number of bytes copied, which is less than `len' when the
 * iovec array runs out of room.
 */
static size_t
readv_f (void *ctx_p, const unsigned char *buf, size_t len, int fin)
{
    struct readv_ctx *const rctx = ctx_p;
    const unsigned char *src = buf;
    size_t remaining = len;
    unsigned char *iov_end;
    size_t room, chunk;

    while (rctx->iov < rctx->end && remaining > 0)
    {
        iov_end = (unsigned char *) rctx->iov->iov_base + rctx->iov->iov_len;
        room = iov_end - rctx->p;
        chunk = room > remaining ? remaining : room;
        memcpy(rctx->p, src, chunk);
        rctx->p += chunk;
        src += chunk;
        remaining -= chunk;

        if (rctx->p != iov_end)
            continue;

        /* Current iovec is full: move on to the next non-empty one */
        do
            ++rctx->iov;
        while (rctx->iov < rctx->end && rctx->iov->iov_len == 0);

        if (!(rctx->iov < rctx->end))
            break;
        rctx->p = rctx->iov->iov_base;
    }

    return len - remaining;
}
1724
1725
1726ssize_t
1727lsquic_stream_readv (struct lsquic_stream *stream, const struct iovec *iov,
1728                     int iovcnt)
1729{
1730    struct readv_ctx ctx = { iov, iov + iovcnt, iov->iov_base, };
1731    return lsquic_stream_readf(stream, readv_f, &ctx);
1732}
1733
1734
1735ssize_t
1736lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
1737{
1738    struct iovec iov = { .iov_base = buf, .iov_len = len, };
1739    return lsquic_stream_readv(stream, &iov, 1);
1740}
1741
1742
1743void
1744lsquic_stream_ss_frame_sent (struct lsquic_stream *stream)
1745{
1746    assert(stream->sm_qflags & SMQF_SEND_STOP_SENDING);
1747    SM_HISTORY_APPEND(stream, SHE_STOP_SENDIG_OUT);
1748    stream->sm_qflags &= ~SMQF_SEND_STOP_SENDING;
1749    stream->stream_flags |= STREAM_SS_SENT;
1750    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1751        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1752}
1753
1754
1755static void
1756handle_early_read_shutdown_ietf (struct lsquic_stream *stream)
1757{
1758    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1759        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1760                                                    next_send_stream);
1761    stream->sm_qflags |= SMQF_SEND_STOP_SENDING|SMQF_WAIT_FIN_OFF;
1762}
1763
1764
1765static void
1766handle_early_read_shutdown_gquic (struct lsquic_stream *stream)
1767{
1768    if (!(stream->stream_flags & STREAM_RST_SENT))
1769    {
1770        lsquic_stream_reset_ext(stream, 7 /* QUIC_STREAM_CANCELLED */, 0);
1771        stream->sm_qflags |= SMQF_WAIT_FIN_OFF;
1772    }
1773}
1774
1775
1776static void
1777handle_early_read_shutdown (struct lsquic_stream *stream)
1778{
1779    if (stream->sm_bflags & SMBF_IETF)
1780        handle_early_read_shutdown_ietf(stream);
1781    else
1782        handle_early_read_shutdown_gquic(stream);
1783}
1784
1785
1786static void
1787stream_shutdown_read (lsquic_stream_t *stream)
1788{
1789    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1790    {
1791        if (!(stream->stream_flags & STREAM_FIN_REACHED))
1792        {
1793            LSQ_DEBUG("read shut down before reading FIN.  (FIN received: %d)",
1794                !!(stream->stream_flags & STREAM_FIN_RECVD));
1795            SM_HISTORY_APPEND(stream, SHE_EARLY_READ_STOP);
1796            if (!(stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD)))
1797                handle_early_read_shutdown(stream);
1798        }
1799        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
1800        stream->stream_flags |= STREAM_U_READ_DONE;
1801        stream->sm_readable = stream_readable_discard;
1802        stream_wantread(stream, 0);
1803        maybe_finish_stream(stream);
1804    }
1805}
1806
1807
1808static int
1809stream_is_incoming_unidir (const struct lsquic_stream *stream)
1810{
1811    enum stream_id_type sit;
1812
1813    if (stream->sm_bflags & SMBF_IETF)
1814    {
1815        sit = stream->id & SIT_MASK;
1816        if (stream->sm_bflags & SMBF_SERVER)
1817            return sit == SIT_UNI_CLIENT;
1818        else
1819            return sit == SIT_UNI_SERVER;
1820    }
1821    else
1822        return 0;
1823}
1824
1825
1826static void
1827stream_shutdown_write (lsquic_stream_t *stream)
1828{
1829    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1830        return;
1831
1832    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
1833    stream->stream_flags |= STREAM_U_WRITE_DONE;
1834    stream_wantwrite(stream, 0);
1835
1836    /* Don't bother to check whether there is anything else to write if
1837     * the flags indicate that nothing else should be written.
1838     */
1839    if (!(stream->sm_bflags & SMBF_CRYPTO)
1840            && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT))
1841                && !stream_is_incoming_unidir(stream)
1842                        /* In gQUIC, receiving a RESET means "stop sending" */
1843                    && !(!(stream->sm_bflags & SMBF_IETF)
1844                                && (stream->stream_flags & STREAM_RST_RECVD)))
1845    {
1846        if ((stream->sm_bflags & SMBF_USE_HEADERS)
1847                && !(stream->stream_flags & STREAM_HEADERS_SENT))
1848        {
1849            LSQ_DEBUG("headers not sent, send a reset");
1850            lsquic_stream_reset(stream, 0);
1851        }
1852        else if (stream->sm_n_buffered == 0)
1853        {
1854            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
1855                                                 stream))
1856            {
1857                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
1858                stream->stream_flags |= STREAM_FIN_SENT;
1859            }
1860            else
1861            {
1862                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
1863                          "flag in it");
1864                (void) stream_flush_nocheck(stream);
1865            }
1866        }
1867        else
1868            (void) stream_flush_nocheck(stream);
1869    }
1870}
1871
1872
1873static void
1874maybe_stream_shutdown_write (struct lsquic_stream *stream)
1875{
1876    if (stream->sm_send_headers_state == SSHS_BEGIN)
1877        stream_shutdown_write(stream);
1878    else if (0 == (stream->stream_flags & STREAM_DELAYED_SW))
1879    {
1880        LSQ_DEBUG("shutdown delayed");
1881        SM_HISTORY_APPEND(stream, SHE_DELAY_SW);
1882        stream->stream_flags |= STREAM_DELAYED_SW;
1883    }
1884}
1885
1886
1887int
1888lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
1889{
1890    LSQ_DEBUG("shutdown; how: %d", how);
1891    if (lsquic_stream_is_closed(stream))
1892    {
1893        LSQ_INFO("Attempt to shut down a closed stream");
1894        errno = EBADF;
1895        return -1;
1896    }
1897    /* 0: read, 1: write: 2: read and write
1898     */
1899    if (how < 0 || how > 2)
1900    {
1901        errno = EINVAL;
1902        return -1;
1903    }
1904
1905    if (how)
1906        maybe_stream_shutdown_write(stream);
1907    if (how != 1)
1908        stream_shutdown_read(stream);
1909
1910    maybe_finish_stream(stream);
1911    maybe_schedule_call_on_close(stream);
1912    if (how && !(stream->stream_flags & STREAM_DELAYED_SW))
1913        maybe_conn_to_tickable_if_writeable(stream, 1);
1914
1915    return 0;
1916}
1917
1918
1919void
1920lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
1921{
1922    LSQ_DEBUG("internal shutdown");
1923    stream->stream_flags |= STREAM_U_READ_DONE|STREAM_U_WRITE_DONE;
1924    stream_wantwrite(stream, 0);
1925    stream_wantread(stream, 0);
1926    if (lsquic_stream_is_critical(stream))
1927    {
1928        LSQ_DEBUG("add flag to force-finish special stream");
1929        stream->stream_flags |= STREAM_FORCE_FINISH;
1930        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
1931    }
1932    maybe_finish_stream(stream);
1933    maybe_schedule_call_on_close(stream);
1934}
1935
1936
1937static void
1938fake_reset_unused_stream (lsquic_stream_t *stream)
1939{
1940    stream->stream_flags |=
1941        STREAM_RST_RECVD    /* User will pick this up on read or write */
1942      | STREAM_RST_SENT     /* Don't send anything else on this stream */
1943    ;
1944
1945    /* Cancel all writes to the network scheduled for this stream: */
1946    if (stream->sm_qflags & SMQF_SENDING_FLAGS)
1947        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
1948                                                next_send_stream);
1949    stream->sm_qflags &= ~SMQF_SENDING_FLAGS;
1950    drop_buffered_data(stream);
1951    LSQ_DEBUG("fake-reset stream%s",
1952                    stream_stalled(stream) ? " (stalled)" : "");
1953    maybe_finish_stream(stream);
1954    maybe_schedule_call_on_close(stream);
1955}
1956
1957
1958/* This function should only be called for locally-initiated streams whose ID
1959 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
1960 * frame sent by peer but we have not yet received it and created a stream.
1961 * In this situation, we mark the stream as reset, so that user's on_read or
1962 * on_write event callback picks up the error.  That, in turn, should result
1963 * in stream being closed.
1964 *
1965 * If we have received any data frames on this stream, this probably indicates
1966 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
1967 * lower than this.  However, we still try to handle it gracefully and peform
1968 * a shutdown, as if the stream was not reset.
1969 */
1970void
1971lsquic_stream_received_goaway (lsquic_stream_t *stream)
1972{
1973    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
1974
1975    if (stream->stream_flags & STREAM_GOAWAY_IN)
1976    {
1977        LSQ_DEBUG("ignore duplicate GOAWAY");
1978        return;
1979    }
1980    stream->stream_flags |= STREAM_GOAWAY_IN;
1981
1982    if (0 == stream->read_offset &&
1983                            stream->data_in->di_if->di_empty(stream->data_in))
1984        fake_reset_unused_stream(stream);       /* Normal condition */
1985    else
1986    {   /* This is odd, let's handle it the best we can: */
1987        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
1988        lsquic_stream_shutdown_internal(stream);
1989    }
1990}
1991
1992
1993uint64_t
1994lsquic_stream_read_offset (const lsquic_stream_t *stream)
1995{
1996    return stream->read_offset;
1997}
1998
1999
2000static int
2001stream_wantread (lsquic_stream_t *stream, int is_want)
2002{
2003    const int old_val = !!(stream->sm_qflags & SMQF_WANT_READ);
2004    const int new_val = !!is_want;
2005    if (old_val != new_val)
2006    {
2007        if (new_val)
2008        {
2009            if (!old_val)
2010                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
2011                                                            next_read_stream);
2012            stream->sm_qflags |= SMQF_WANT_READ;
2013        }
2014        else
2015        {
2016            stream->sm_qflags &= ~SMQF_WANT_READ;
2017            if (old_val)
2018                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
2019                                                            next_read_stream);
2020        }
2021    }
2022    return old_val;
2023}
2024
2025
2026static void
2027maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_q_flags flag)
2028{
2029    assert(SMQF_WRITE_Q_FLAGS & flag);
2030    if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS))
2031        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
2032                                                        next_write_stream);
2033    stream->sm_qflags |= flag;
2034}
2035
2036
2037static void
2038maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag)
2039{
2040    assert(SMQF_WRITE_Q_FLAGS & flag);
2041    if (stream->sm_qflags & flag)
2042    {
2043        stream->sm_qflags &= ~flag;
2044        if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS))
2045            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
2046                                                        next_write_stream);
2047    }
2048}
2049
2050
2051static int
2052stream_wantwrite (struct lsquic_stream *stream, int new_val)
2053{
2054    const int old_val = !!(stream->sm_qflags & SMQF_WANT_WRITE);
2055
2056    assert(0 == (new_val & ~1));    /* new_val is either 0 or 1 */
2057
2058    if (old_val != new_val)
2059    {
2060        if (new_val)
2061            maybe_put_onto_write_q(stream, SMQF_WANT_WRITE);
2062        else
2063            maybe_remove_from_write_q(stream, SMQF_WANT_WRITE);
2064    }
2065    return old_val;
2066}
2067
2068
2069int
2070lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
2071{
2072    SM_HISTORY_APPEND(stream, SHE_WANTREAD_NO + !!is_want);
2073    if (!(stream->stream_flags & STREAM_U_READ_DONE))
2074    {
2075        if (is_want)
2076            maybe_conn_to_tickable_if_readable(stream);
2077        return stream_wantread(stream, is_want);
2078    }
2079    else
2080    {
2081        errno = EBADF;
2082        return -1;
2083    }
2084}
2085
2086
2087int
2088lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
2089{
2090    int old_val;
2091
2092    is_want = !!is_want;
2093
2094    SM_HISTORY_APPEND(stream, SHE_WANTWRITE_NO + is_want);
2095    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)
2096                            && SSHS_BEGIN == stream->sm_send_headers_state)
2097    {
2098        stream->sm_saved_want_write = is_want;
2099        if (is_want)
2100            maybe_conn_to_tickable_if_writeable(stream, 1);
2101        return stream_wantwrite(stream, is_want);
2102    }
2103    else if (SSHS_BEGIN != stream->sm_send_headers_state)
2104    {
2105        old_val = stream->sm_saved_want_write;
2106        stream->sm_saved_want_write = is_want;
2107        return old_val;
2108    }
2109    else
2110    {
2111        errno = EBADF;
2112        return -1;
2113    }
2114}
2115
2116
2117struct progress
2118{
2119    enum stream_flags   s_flags;
2120    enum stream_q_flags q_flags;
2121};
2122
2123
2124static struct progress
2125stream_progress (const struct lsquic_stream *stream)
2126{
2127    return (struct progress) {
2128        .s_flags = stream->stream_flags
2129          & (STREAM_U_WRITE_DONE|STREAM_U_READ_DONE),
2130        .q_flags = stream->sm_qflags
2131          & (SMQF_WANT_READ|SMQF_WANT_WRITE|SMQF_WANT_FLUSH|SMQF_SEND_RST),
2132    };
2133}
2134
2135
2136static int
2137progress_eq (struct progress a, struct progress b)
2138{
2139    return a.s_flags == b.s_flags && a.q_flags == b.q_flags;
2140}
2141
2142
2143static void
2144stream_dispatch_read_events_loop (lsquic_stream_t *stream)
2145{
2146    unsigned no_progress_count, no_progress_limit;
2147    struct progress progress;
2148    uint64_t size;
2149
2150    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
2151
2152    no_progress_count = 0;
2153    while ((stream->sm_qflags & SMQF_WANT_READ)
2154                                            && lsquic_stream_readable(stream))
2155    {
2156        progress = stream_progress(stream);
2157        size  = stream->read_offset;
2158
2159        stream->stream_if->on_read(stream, stream->st_ctx);
2160
2161        if (no_progress_limit && size == stream->read_offset &&
2162                                progress_eq(progress, stream_progress(stream)))
2163        {
2164            ++no_progress_count;
2165            if (no_progress_count >= no_progress_limit)
2166            {
2167                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
2168                    "progress) in user code reading from stream",
2169                    no_progress_count,
2170                    no_progress_count == 1 ? "" : "s");
2171                break;
2172            }
2173        }
2174        else
2175            no_progress_count = 0;
2176    }
2177}
2178
2179
2180static void
2181stream_hblock_sent (struct lsquic_stream *stream)
2182{
2183    int want_write;
2184
2185    LSQ_DEBUG("header block has been sent: restore default behavior");
2186    stream->sm_send_headers_state = SSHS_BEGIN;
2187    stream->sm_write_avail = stream_write_avail_with_frames;
2188
2189    want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
2190    if (want_write != stream->sm_saved_want_write)
2191        (void) lsquic_stream_wantwrite(stream, stream->sm_saved_want_write);
2192
2193    if (stream->stream_flags & STREAM_DELAYED_SW)
2194    {
2195        LSQ_DEBUG("performing delayed shutdown write");
2196        stream->stream_flags &= ~STREAM_DELAYED_SW;
2197        stream_shutdown_write(stream);
2198        maybe_schedule_call_on_close(stream);
2199        maybe_finish_stream(stream);
2200        maybe_conn_to_tickable_if_writeable(stream, 1);
2201    }
2202}
2203
2204
2205static void
2206on_write_header_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
2207{
2208    ssize_t nw;
2209
2210    nw = stream_write_buf(stream,
2211                stream->sm_header_block + stream->sm_hblock_off,
2212                stream->sm_hblock_sz - stream->sm_hblock_off);
2213    if (nw > 0)
2214    {
2215        stream->sm_hblock_off += nw;
2216        if (stream->sm_hblock_off == stream->sm_hblock_sz)
2217        {
2218            stream->stream_flags |= STREAM_HEADERS_SENT;
2219            free(stream->sm_header_block);
2220            stream->sm_header_block = NULL;
2221            stream->sm_hblock_sz = 0;
2222            stream_hblock_sent(stream);
2223            LSQ_DEBUG("header block written out successfully");
2224            /* TODO: if there was eos, do something else */
2225            if (stream->sm_qflags & SMQF_WANT_WRITE)
2226                stream->stream_if->on_write(stream, h);
2227        }
2228        else
2229        {
2230            LSQ_DEBUG("wrote %zd bytes more of header block; not done yet",
2231                nw);
2232        }
2233    }
2234    else if (nw < 0)
2235    {
2236        /* XXX What should happen if we hit an error? TODO */
2237    }
2238}
2239
2240
2241static void
2242(*select_on_write (struct lsquic_stream *stream))(struct lsquic_stream *,
2243                                                        lsquic_stream_ctx_t *)
2244{
2245    if (0 == (stream->stream_flags & STREAM_PUSHING)
2246                    && SSHS_HBLOCK_SENDING != stream->sm_send_headers_state)
2247        /* Common case */
2248        return stream->stream_if->on_write;
2249    else if (SSHS_HBLOCK_SENDING == stream->sm_send_headers_state)
2250        return on_write_header_wrapper;
2251    else
2252    {
2253        assert(stream->stream_flags & STREAM_PUSHING);
2254        if (stream_is_pushing_promise(stream))
2255            return on_write_pp_wrapper;
2256        else
2257            return stream->stream_if->on_write;
2258    }
2259}
2260
2261
2262static void
2263stream_dispatch_write_events_loop (lsquic_stream_t *stream)
2264{
2265    unsigned no_progress_count, no_progress_limit;
2266    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
2267    struct progress progress;
2268
2269    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
2270
2271    no_progress_count = 0;
2272    stream->stream_flags |= STREAM_LAST_WRITE_OK;
2273    while ((stream->sm_qflags & SMQF_WANT_WRITE)
2274                && (stream->stream_flags & STREAM_LAST_WRITE_OK)
2275                       && stream_writeable(stream))
2276    {
2277        progress = stream_progress(stream);
2278
2279        on_write = select_on_write(stream);
2280        on_write(stream, stream->st_ctx);
2281
2282        if (no_progress_limit && progress_eq(progress, stream_progress(stream)))
2283        {
2284            ++no_progress_count;
2285            if (no_progress_count >= no_progress_limit)
2286            {
2287                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
2288                    "progress) in user code writing to stream",
2289                    no_progress_count,
2290                    no_progress_count == 1 ? "" : "s");
2291                break;
2292            }
2293        }
2294        else
2295            no_progress_count = 0;
2296    }
2297}
2298
2299
2300static void
2301stream_dispatch_read_events_once (lsquic_stream_t *stream)
2302{
2303    if ((stream->sm_qflags & SMQF_WANT_READ) && lsquic_stream_readable(stream))
2304    {
2305        stream->stream_if->on_read(stream, stream->st_ctx);
2306    }
2307}
2308
2309
2310uint64_t
2311lsquic_stream_combined_send_off (const struct lsquic_stream *stream)
2312{
2313    size_t frames_sizes;
2314
2315    frames_sizes = active_hq_frame_sizes(stream);
2316    return stream->tosend_off + stream->sm_n_buffered + frames_sizes;
2317}
2318
2319
2320static void
2321maybe_mark_as_blocked (lsquic_stream_t *stream)
2322{
2323    struct lsquic_conn_cap *cc;
2324    uint64_t used;
2325
2326    used = lsquic_stream_combined_send_off(stream);
2327    if (stream->max_send_off == used)
2328    {
2329        if (stream->blocked_off < stream->max_send_off)
2330        {
2331            stream->blocked_off = used;
2332            if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
2333                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
2334                                                            next_send_stream);
2335            stream->sm_qflags |= SMQF_SEND_BLOCKED;
2336            LSQ_DEBUG("marked stream-blocked at stream offset "
2337                                            "%"PRIu64, stream->blocked_off);
2338        }
2339        else
2340            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
2341                " has been, or is about to be, sent", stream->blocked_off);
2342    }
2343
2344    if ((stream->sm_bflags & SMBF_CONN_LIMITED)
2345        && (cc = &stream->conn_pub->conn_cap,
2346                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
2347    {
2348        if (cc->cc_blocked < cc->cc_max)
2349        {
2350            cc->cc_blocked = cc->cc_max;
2351            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
2352            LSQ_DEBUG("marked connection-blocked at connection offset "
2353                                                    "%"PRIu64, cc->cc_max);
2354        }
2355        else
2356            LSQ_DEBUG("stream has already been marked connection-blocked "
2357                "at offset %"PRIu64, cc->cc_blocked);
2358    }
2359}
2360
2361
2362void
2363lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
2364{
2365    assert(stream->sm_qflags & SMQF_WANT_READ);
2366
2367    if (stream->sm_bflags & SMBF_RW_ONCE)
2368        stream_dispatch_read_events_once(stream);
2369    else
2370        stream_dispatch_read_events_loop(stream);
2371}
2372
2373
2374void
2375lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
2376{
2377    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
2378    int progress;
2379    uint64_t tosend_off;
2380    unsigned short n_buffered;
2381    enum stream_q_flags q_flags;
2382
2383    assert(stream->sm_qflags & SMQF_WRITE_Q_FLAGS);
2384    q_flags = stream->sm_qflags & SMQF_WRITE_Q_FLAGS;
2385    tosend_off = stream->tosend_off;
2386    n_buffered = stream->sm_n_buffered;
2387
2388    if (stream->sm_qflags & SMQF_WANT_FLUSH)
2389        (void) stream_flush(stream);
2390
2391    if (stream->sm_bflags & SMBF_RW_ONCE)
2392    {
2393        if ((stream->sm_qflags & SMQF_WANT_WRITE)
2394            && stream_writeable(stream))
2395        {
2396            on_write = select_on_write(stream);
2397            on_write(stream, stream->st_ctx);
2398        }
2399    }
2400    else
2401        stream_dispatch_write_events_loop(stream);
2402
2403    /* Progress means either flags or offsets changed: */
2404    progress = !((stream->sm_qflags & SMQF_WRITE_Q_FLAGS) == q_flags &&
2405                        stream->tosend_off == tosend_off &&
2406                            stream->sm_n_buffered == n_buffered);
2407
2408    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
2409    {
2410        if (progress)
2411        {   /* Move the stream to the end of the list to ensure fairness. */
2412            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
2413                                                            next_write_stream);
2414            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
2415                                                            next_write_stream);
2416        }
2417    }
2418}
2419
2420
/* Size callback of the empty reader: there is never anything to read.  The
 * context argument is ignored.
 */
static size_t
inner_reader_empty_size (void *ctx)
{
    (void) ctx;
    return (size_t) 0;
}
2426
2427
/* Read callback of the empty reader: nothing is copied into the buffer and
 * zero is returned.  All arguments are ignored.
 */
static size_t
inner_reader_empty_read (void *ctx, void *buf, size_t count)
{
    (void) ctx;
    (void) buf;
    (void) count;
    return (size_t) 0;
}
2433
2434
2435static int
2436stream_flush (lsquic_stream_t *stream)
2437{
2438    struct lsquic_reader empty_reader;
2439    ssize_t nw;
2440
2441    assert(stream->sm_qflags & SMQF_WANT_FLUSH);
2442    assert(stream->sm_n_buffered > 0 ||
2443        /* Flushing is also used to packetize standalone FIN: */
2444        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
2445                                                    == STREAM_U_WRITE_DONE));
2446
2447    empty_reader.lsqr_size = inner_reader_empty_size;
2448    empty_reader.lsqr_read = inner_reader_empty_read;
2449    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
2450    nw = stream_write_to_packets(stream, &empty_reader, 0, SWO_BUFFER);
2451
2452    if (nw >= 0)
2453    {
2454        assert(nw == 0);    /* Empty reader: must have read zero bytes */
2455        return 0;
2456    }
2457    else
2458        return -1;
2459}
2460
2461
2462static int
2463stream_flush_nocheck (lsquic_stream_t *stream)
2464{
2465    size_t frames;
2466
2467    frames = active_hq_frame_sizes(stream);
2468    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered + frames;
2469    stream->sm_flush_to_payload = stream->sm_payload + stream->sm_n_buffered;
2470    maybe_put_onto_write_q(stream, SMQF_WANT_FLUSH);
2471    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
2472
2473    return stream_flush(stream);
2474}
2475
2476
2477int
2478lsquic_stream_flush (lsquic_stream_t *stream)
2479{
2480    if (stream->stream_flags & STREAM_U_WRITE_DONE)
2481    {
2482        LSQ_DEBUG("cannot flush closed stream");
2483        errno = EBADF;
2484        return -1;
2485    }
2486
2487    if (0 == stream->sm_n_buffered)
2488    {
2489        LSQ_DEBUG("flushing 0 bytes: noop");
2490        return 0;
2491    }
2492
2493    return stream_flush_nocheck(stream);
2494}
2495
2496
2497static size_t
2498stream_get_n_allowed (const struct lsquic_stream *stream)
2499{
2500    if (stream->sm_n_allocated)
2501        return stream->sm_n_allocated;
2502    else
2503        return stream->conn_pub->path->np_pack_size;
2504}
2505
2506
2507/* The flush threshold is the maximum size of stream data that can be sent
2508 * in a full packet.
2509 */
2510#ifdef NDEBUG
2511static
2512#endif
2513       size_t
2514lsquic_stream_flush_threshold (const struct lsquic_stream *stream,
2515                                                            unsigned data_sz)
2516{
2517    enum packet_out_flags flags;
2518    enum packno_bits bits;
2519    size_t packet_header_sz, stream_header_sz, tag_len;
2520    size_t threshold;
2521
2522    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl, PNS_APP);
2523    flags = bits << POBIT_SHIFT;
2524    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
2525        flags |= PO_CONN_ID;
2526    if (stream_is_hsk(stream))
2527        flags |= PO_LONGHEAD;
2528
2529    packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags,
2530                            stream->conn_pub->path->np_dcid.len, HETY_NOT_SET);
2531    stream_header_sz = stream->sm_frame_header_sz(stream, data_sz);
2532    tag_len = stream->conn_pub->lconn->cn_esf_c->esf_tag_len;
2533
2534    threshold = stream_get_n_allowed(stream) - tag_len
2535              - packet_header_sz - stream_header_sz;
2536    return threshold;
2537}
2538
2539
/* Checks performed before writing to a stream.  The macro expects a
 * variable named `stream' to be in scope and returns from the enclosing
 * function if writing is not allowed:
 *   - 0 if headers are still being sent;
 *   - -1 with errno set to EILSEQ if headers are required but sending them
 *     has not begun;
 *   - -1 with errno set to ECONNRESET if the stream has been reset for
 *     writing;
 *   - -1 with errno set to EBADF if the write side has been shut down or
 *     FIN has been sent.
 */
#define COMMON_WRITE_CHECKS() do {                                          \
    if ((stream->sm_bflags & SMBF_USE_HEADERS)                              \
            && !(stream->stream_flags & STREAM_HEADERS_SENT))               \
    {                                                                       \
        if (SSHS_BEGIN == stream->sm_send_headers_state)                    \
        {                                                                   \
            LSQ_INFO("Attempt to write to stream before sending HTTP "      \
                                                                "headers"); \
            errno = EILSEQ;                                                 \
            return -1;                                                      \
        }                                                                   \
        LSQ_DEBUG("still sending headers: no writing allowed");             \
        return 0;                                                           \
    }                                                                       \
    if (lsquic_stream_is_write_reset(stream))                               \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it had been reset");     \
        errno = ECONNRESET;                                                 \
        return -1;                                                          \
    }                                                                       \
    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it was closed for "      \
                                                                "writing"); \
        errno = EBADF;                                                      \
        return -1;                                                          \
    }                                                                       \
} while (0)
2571
2572
2573struct frame_gen_ctx
2574{
2575    lsquic_stream_t      *fgc_stream;
2576    struct lsquic_reader *fgc_reader;
2577    /* We keep our own count of how many bytes were read from reader because
2578     * some readers are external.  The external caller does not have to rely
2579     * on our count, but it can.
2580     */
2581    size_t                fgc_nread_from_reader;
2582    size_t              (*fgc_size) (void *ctx);
2583    int                 (*fgc_fin) (void *ctx);
2584    gsf_read_f            fgc_read;
2585    size_t                fgc_thresh;
2586};
2587
2588
2589static size_t
2590frame_std_gen_size (void *ctx)
2591{
2592    struct frame_gen_ctx *fg_ctx = ctx;
2593    size_t available, remaining;
2594
2595    /* Make sure we are not writing past available size: */
2596    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2597    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
2598    if (available < remaining)
2599        remaining = available;
2600
2601    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
2602}
2603
2604
2605static size_t
2606stream_hq_frame_size (const struct stream_hq_frame *shf)
2607{
2608    if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
2609        return 1 + 1 + ((shf->shf_flags & SHF_TWO_BYTES) > 0);
2610    else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) == SHF_FIXED_SIZE)
2611        return 1 + (1 << vint_val2bits(shf->shf_frame_size));
2612    else
2613    {
2614        assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
2615                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
2616        return 0;
2617    }
2618}
2619
2620
2621static size_t
2622active_hq_frame_sizes (const struct lsquic_stream *stream)
2623{
2624    const struct stream_hq_frame *shf;
2625    size_t size;
2626
2627    size = 0;
2628    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2629                                        == (SMBF_IETF|SMBF_USE_HEADERS))
2630        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2631            if (!(shf->shf_flags & SHF_WRITTEN))
2632                size += stream_hq_frame_size(shf);
2633
2634    return size;
2635}
2636
2637
2638static uint64_t
2639stream_hq_frame_end (const struct stream_hq_frame *shf)
2640{
2641    if (shf->shf_flags & SHF_FIXED_SIZE)
2642        return shf->shf_off + shf->shf_frame_size;
2643    else if (shf->shf_flags & SHF_TWO_BYTES)
2644        return shf->shf_off + ((1 << 14) - 1);
2645    else
2646        return shf->shf_off + ((1 << 6) - 1);
2647}
2648
2649
2650static int
2651frame_in_stream (const struct lsquic_stream *stream,
2652                                            const struct stream_hq_frame *shf)
2653{
2654    return shf >= stream->sm_hq_frame_arr
2655        && shf < stream->sm_hq_frame_arr + sizeof(stream->sm_hq_frame_arr)
2656                                        / sizeof(stream->sm_hq_frame_arr[0])
2657        ;
2658}
2659
2660
2661static void
2662stream_hq_frame_put (struct lsquic_stream *stream,
2663                                                struct stream_hq_frame *shf)
2664{
2665    assert(STAILQ_FIRST(&stream->sm_hq_frames) == shf);
2666    STAILQ_REMOVE_HEAD(&stream->sm_hq_frames, shf_next);
2667    if (frame_in_stream(stream, shf))
2668        memset(shf, 0, sizeof(*shf));
2669    else
2670        lsquic_malo_put(shf);
2671}
2672
2673
2674static void
2675stream_hq_frame_close (struct lsquic_stream *stream,
2676                                                struct stream_hq_frame *shf)
2677{
2678    unsigned bits;
2679
2680    LSQ_DEBUG("close HQ frame of type 0x%X at payload offset %"PRIu64
2681        " (actual offset %"PRIu64")", shf->shf_frame_type,
2682        stream->sm_payload, stream->tosend_off);
2683    assert(shf->shf_flags & SHF_ACTIVE);
2684    if (!(shf->shf_flags & SHF_FIXED_SIZE))
2685    {
2686        shf->shf_frame_ptr[0] = shf->shf_frame_type;
2687        bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
2688        vint_write(shf->shf_frame_ptr + 1, stream->sm_payload - shf->shf_off,
2689                                                            bits, 1 << bits);
2690    }
2691    stream_hq_frame_put(stream, shf);
2692}
2693
2694
2695static size_t
2696frame_hq_gen_size (void *ctx)
2697{
2698    struct frame_gen_ctx *fg_ctx = ctx;
2699    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2700    size_t available, remaining, frames;
2701    const struct stream_hq_frame *shf;
2702
2703    frames = 0;
2704    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2705        if (shf->shf_off >= stream->sm_payload)
2706            frames += stream_hq_frame_size(shf);
2707
2708    /* Make sure we are not writing past available size: */
2709    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2710    available = lsquic_stream_write_avail(stream);
2711    if (available < remaining)
2712        remaining = available;
2713
2714    return remaining + stream->sm_n_buffered + frames;
2715}
2716
2717
2718static int
2719frame_std_gen_fin (void *ctx)
2720{
2721    struct frame_gen_ctx *fg_ctx = ctx;
2722    return !(fg_ctx->fgc_stream->sm_bflags & SMBF_CRYPTO)
2723        && (fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE)
2724        && 0 == fg_ctx->fgc_stream->sm_n_buffered
2725        /* Do not use frame_std_gen_size() as it may chop the real size: */
2726        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2727}
2728
2729
2730static void
2731incr_conn_cap (struct lsquic_stream *stream, size_t incr)
2732{
2733    if (stream->sm_bflags & SMBF_CONN_LIMITED)
2734    {
2735        stream->conn_pub->conn_cap.cc_sent += incr;
2736        assert(stream->conn_pub->conn_cap.cc_sent
2737                                    <= stream->conn_pub->conn_cap.cc_max);
2738    }
2739}
2740
2741
2742static void
2743incr_sm_payload (struct lsquic_stream *stream, size_t incr)
2744{
2745    stream->sm_payload += incr;
2746    stream->tosend_off += incr;
2747    assert(stream->tosend_off <= stream->max_send_off);
2748}
2749
2750
2751static void
2752maybe_resize_threshold (struct frame_gen_ctx *fg_ctx)
2753{
2754    struct lsquic_stream *stream = fg_ctx->fgc_stream;
2755    size_t old;
2756
2757    if (fg_ctx->fgc_thresh)
2758    {
2759        old = fg_ctx->fgc_thresh;
2760        fg_ctx->fgc_thresh
2761            = lsquic_stream_flush_threshold(stream, fg_ctx->fgc_size(fg_ctx));
2762        LSQ_DEBUG("changed threshold from %zd to %zd", old, fg_ctx->fgc_thresh);
2763    }
2764}
2765
2766
2767static size_t
2768frame_std_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
2769{
2770    struct frame_gen_ctx *fg_ctx = ctx;
2771    unsigned char *p = begin_buf;
2772    unsigned char *const end = p + len;
2773    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
2774    size_t n_written, available, n_to_write;
2775
2776    if (stream->sm_n_buffered > 0)
2777    {
2778        if (len <= stream->sm_n_buffered)
2779        {
2780            memcpy(p, stream->sm_buf, len);
2781            memmove(stream->sm_buf, stream->sm_buf + len,
2782                                                stream->sm_n_buffered - len);
2783            stream->sm_n_buffered -= len;
2784            if (0 == stream->sm_n_buffered)
2785            {
2786                maybe_resize_stream_buffer(stream);
2787                maybe_resize_threshold(fg_ctx);
2788            }
2789            assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
2790            incr_sm_payload(stream, len);
2791            *fin = fg_ctx->fgc_fin(fg_ctx);
2792            return len;
2793        }
2794        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
2795        p += stream->sm_n_buffered;
2796        stream->sm_n_buffered = 0;
2797        maybe_resize_stream_buffer(stream);
2798        maybe_resize_threshold(fg_ctx);
2799    }
2800
2801    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
2802    n_to_write = end - p;
2803    if (n_to_write > available)
2804        n_to_write = available;
2805    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
2806                                              n_to_write);
2807    p += n_written;
2808    fg_ctx->fgc_nread_from_reader += n_written;
2809    *fin = fg_ctx->fgc_fin(fg_ctx);
2810    incr_sm_payload(stream, p - (const unsigned char *) begin_buf);
2811    incr_conn_cap(stream, n_written);
2812    return p - (const unsigned char *) begin_buf;
2813}
2814
2815
2816static struct stream_hq_frame *
2817find_hq_frame (const struct lsquic_stream *stream, uint64_t off)
2818{
2819    struct stream_hq_frame *shf;
2820
2821    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2822        if (shf->shf_off <= off && stream_hq_frame_end(shf) > off)
2823            return shf;
2824
2825    return NULL;
2826}
2827
2828
2829static struct stream_hq_frame *
2830find_cur_hq_frame (const struct lsquic_stream *stream)
2831{
2832    return find_hq_frame(stream, stream->sm_payload);
2833}
2834
2835
2836static struct stream_hq_frame *
2837open_hq_frame (struct lsquic_stream *stream)
2838{
2839    struct stream_hq_frame *shf;
2840
2841    for (shf = stream->sm_hq_frame_arr; shf < stream->sm_hq_frame_arr
2842            + sizeof(stream->sm_hq_frame_arr)
2843                / sizeof(stream->sm_hq_frame_arr[0]); ++shf)
2844        if (!(shf->shf_flags & SHF_ACTIVE))
2845            goto found;
2846
2847    shf = lsquic_malo_get(stream->conn_pub->mm->malo.stream_hq_frame);
2848    if (!shf)
2849    {
2850        LSQ_WARN("cannot allocate HQ frame");
2851        return NULL;
2852    }
2853    memset(shf, 0, sizeof(*shf));
2854
2855  found:
2856    STAILQ_INSERT_TAIL(&stream->sm_hq_frames, shf, shf_next);
2857    shf->shf_flags = SHF_ACTIVE;
2858    return shf;
2859}
2860
2861
2862static struct stream_hq_frame *
2863stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off,
2864            enum hq_frame_type frame_type, enum shf_flags flags, size_t size)
2865{
2866    struct stream_hq_frame *shf;
2867
2868    shf = open_hq_frame(stream);
2869    if (!shf)
2870    {
2871        LSQ_WARN("could not open HQ frame");
2872        return NULL;
2873    }
2874
2875    shf->shf_off        = off;
2876    shf->shf_flags     |= flags;
2877    shf->shf_frame_type = frame_type;
2878    if (shf->shf_flags & SHF_FIXED_SIZE)
2879    {
2880        shf->shf_frame_size = size;
2881        LSQ_DEBUG("activated fixed-size HQ frame of type 0x%X at offset "
2882            "%"PRIu64", size %zu", shf->shf_frame_type, shf->shf_off, size);
2883    }
2884    else
2885    {
2886        shf->shf_frame_ptr  = NULL;
2887        if (size >= (1 << 6))
2888            shf->shf_flags |= SHF_TWO_BYTES;
2889        LSQ_DEBUG("activated variable-size HQ frame of type 0x%X at offset "
2890            "%"PRIu64, shf->shf_frame_type, shf->shf_off);
2891    }
2892
2893    return shf;
2894}
2895
2896
/* Bounded array of pointers: `p' has room for `max' elements, of which the
 * first `count' are in use.
 */
struct hq_arr
{
    unsigned char     **p;
    unsigned            count;
    unsigned            max;
};


/* Append `p' to the array.  Returns 0 on success and -1 if the array is
 * full, in which case the array is left unchanged.
 */
static int
save_hq_ptr (struct hq_arr *hq_arr, void *p)
{
    if (hq_arr->count >= hq_arr->max)
        return -1;

    hq_arr->p[hq_arr->count] = p;
    ++hq_arr->count;
    return 0;
}
2916
2917
/* Read callback used when HTTP/3 framing is in effect.
 *
 * Fills [begin_buf, begin_buf + len) with HQ frame headers interleaved with
 * payload obtained via frame_std_gen_read().  Each loop iteration either
 * emits the header of the frame that starts at the current payload offset
 * or copies payload that belongs to the current frame.  `*fin' is only
 * touched via frame_std_gen_read().
 *
 * Returns the number of bytes placed into the buffer.
 */
static size_t
frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    unsigned char *p = begin_buf;
    unsigned char *const end = p + len;
    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
    struct stream_hq_frame *shf;
    size_t nw, frame_sz, avail, rem;
    unsigned bits;
    int new;    /* Set if the frame was activated during this iteration */

    while (p < end)
    {
        shf = find_cur_hq_frame(stream);
        if (shf)
        {
            new = 0;
            LSQ_DEBUG("found current HQ frame of type 0x%X at offset %"PRIu64,
                                            shf->shf_frame_type, shf->shf_off);
        }
        else
        {
            /* No frame covers the current payload offset: if there is
             * anything to send, open a new DATA frame.  The size passed to
             * the activation function is capped at 2^14 - 1.
             */
            rem = frame_std_gen_size(ctx);
            if (rem)
            {
                if (rem > ((1 << 14) - 1))
                    rem = (1 << 14) - 1;
                shf = stream_activate_hq_frame(stream,
                                    stream->sm_payload, HQFT_DATA, 0, rem);
                if (shf)
                {
                    new = 1;
                    goto insert;
                }
                else
                {
                    /* TODO: abort connection?  Handle failure somehow */
                    break;
                }
            }
            else
                break;
        }
        if (shf->shf_off == stream->sm_payload
                                        && !(shf->shf_flags & SHF_WRITTEN))
        {
            /* We are at the very beginning of a frame whose header has not
             * been written yet.
             */
  insert:
            frame_sz = stream_hq_frame_size(shf);
            if (frame_sz > (uintptr_t) (end - p))
            {
                /* The header does not fit.  A frame activated just now is
                 * given back; a pre-existing one stays on the list.
                 */
                if (new)
                    stream_hq_frame_put(stream, shf);
                break;
            }
            LSQ_DEBUG("insert %zu-byte HQ frame of type 0x%X at payload "
                "offset %"PRIu64" (actual offset %"PRIu64")", frame_sz,
                shf->shf_frame_type, stream->sm_payload, stream->tosend_off);
            if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
            {
                /* Variable-size frame: remember where the header goes and
                 * reserve zeroed room for it.  The header bytes themselves
                 * are filled in by maybe_close_varsize_hq_frame().
                 */
                shf->shf_frame_ptr = p;
                if (stream->sm_hq_arr && 0 != save_hq_ptr(stream->sm_hq_arr, p))
                {
                    stream_hq_frame_put(stream, shf);
                    break;
                }
                memset(p, 0, frame_sz);
                p += frame_sz;
            }
            else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
                                                            == SHF_FIXED_SIZE)
            {
                /* Fixed-size frame: the size is known, so the header is
                 * written out right away: one type byte followed by the
                 * varint-encoded size.
                 */
                *p++ = shf->shf_frame_type;
                bits = vint_val2bits(shf->shf_frame_size);
                vint_write(p, shf->shf_frame_size, bits, 1 << bits);
                p += 1 << bits;
            }
            else
                /* Phantom frame: nothing is written to the buffer */
                assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
            /* The header is charged to the connection cap only once */
            if (!(shf->shf_flags & SHF_CC_PAID))
            {
                incr_conn_cap(stream, frame_sz);
                shf->shf_flags |= SHF_CC_PAID;
            }
            shf->shf_flags |= SHF_WRITTEN;
            /* Header bytes advance the send offset, but not sm_payload */
            stream->tosend_off += frame_sz;
            assert(stream->tosend_off <= stream->max_send_off);
        }
        else
        {
            /* Copy payload.  Note that `len' is reused here: it becomes the
             * number of bytes to read, limited by the end of the current
             * frame, the room left in the output buffer, and the amount of
             * data that may be sent.
             */
            avail = stream->sm_n_buffered + stream->sm_write_avail(stream);
            len = stream_hq_frame_end(shf) - stream->sm_payload;
            assert(len);
            if (len > (unsigned) (end - p))
                len = end - p;
            if (len > avail)
                len = avail;
            if (!len)
                break;
            nw = frame_std_gen_read(ctx, p, len, fin);
            p += nw;
            if (nw < len)
                break;
            /* Payload offset reached the end of the frame: it is done */
            if (stream_hq_frame_end(shf) == stream->sm_payload)
                stream_hq_frame_close(stream, shf);
        }
    }

    return p - (unsigned char *) begin_buf;
}
3029
3030
3031static void
3032check_flush_threshold (lsquic_stream_t *stream)
3033{
3034    if ((stream->sm_qflags & SMQF_WANT_FLUSH) &&
3035                            stream->tosend_off >= stream->sm_flush_to)
3036    {
3037        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
3038                                                    stream->sm_flush_to);
3039        maybe_remove_from_write_q(stream, SMQF_WANT_FLUSH);
3040    }
3041}
3042
3043
#if LSQUIC_EXTRA_CHECKS
/* Consistency check: the number of bytes buffered in all connection-limited
 * streams plus the connection's stream frame byte count must be equal to the
 * connection cap's sent counter.
 *
 * The check is skipped when wtp_level is greater than one and when the
 * connection has no stream hash.
 */
static void
verify_conn_cap (const struct lsquic_conn_public *conn_pub)
{
    const struct lsquic_stream *stream;
    struct lsquic_hash_elem *el;
    unsigned n_buffered;

    /* TODO: enable this check for unit tests as well (the all_streams
     * condition).
     */
    if (conn_pub->wtp_level > 1 || !conn_pub->all_streams)
        return;

    n_buffered = 0;
    el = lsquic_hash_first(conn_pub->all_streams);
    while (el)
    {
        stream = lsquic_hashelem_getdata(el);
        if (stream->sm_bflags & SMBF_CONN_LIMITED)
            n_buffered += stream->sm_n_buffered;
        el = lsquic_hash_next(conn_pub->all_streams);
    }

    assert(conn_pub->conn_cap.cc_sent
                        == n_buffered + conn_pub->stream_frame_bytes);
    LSQ_DEBUG("%s: cc_sent: %"PRIu64, __func__, conn_pub->conn_cap.cc_sent);
}


#endif
3075
3076
/* Generate one STREAM frame into `packet_out' and do the bookkeeping that
 * goes with it.
 *
 * `size' is passed through to the frame generation function, which pulls
 * data using the read callback in `fg_ctx'.
 *
 * Return values:
 *   - the length of the generated frame on success;
 *   - the negative value returned by the frame generation function if it
 *     fails (stream_write_to_packet_std() interprets its magnitude as the
 *     number of bytes needed);
 *   - -1 if the frame could not be recorded in the packet.
 */
static int
write_stream_frame (struct frame_gen_ctx *fg_ctx, const size_t size,
                                        struct lsquic_packet_out *packet_out)
{
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
    unsigned off;
    int len, s;

#if LSQUIC_CONN_STATS || LSQUIC_EXTRA_CHECKS
    /* Used below to find out how far the read callbacks moved tosend_off */
    const uint64_t begin_off = stream->tosend_off;
#endif
    /* Remember where in the packet the frame begins */
    off = packet_out->po_data_sz;
    len = pf->pf_gen_stream_frame(
                packet_out->po_data + packet_out->po_data_sz,
                lsquic_packet_out_avail(packet_out), stream->id,
                stream->tosend_off,
                fg_ctx->fgc_fin(fg_ctx), size, fg_ctx->fgc_read, fg_ctx);
    if (len < 0)
        return len;

#if LSQUIC_CONN_STATS
    stream->conn_pub->conn_stats->out.stream_frames += 1;
    stream->conn_pub->conn_stats->out.stream_data_sz
                                            += stream->tosend_off - begin_off;
#endif
    /* The frame is logged before the packet size is adjusted */
    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
                            packet_out->po_data + packet_out->po_data_sz, len);
    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
    /* Mark packets that the STREAM frame fills completely */
    if (0 == lsquic_packet_out_avail(packet_out))
        packet_out->po_flags |= PO_STREAM_END;
    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
                                     stream, QUIC_FRAME_STREAM, off, len);
    if (s != 0)
    {
        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
        return -1;
    }
#if LSQUIC_EXTRA_CHECKS
    if (stream->sm_bflags & SMBF_CONN_LIMITED)
    {
        stream->conn_pub->stream_frame_bytes += stream->tosend_off - begin_off;
        verify_conn_cap(stream->conn_pub);
    }
#endif

    check_flush_threshold(stream);
    return len;
}
3128
3129
3130static enum swtp_status
3131stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size)
3132{
3133    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
3134    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
3135    struct lsquic_packet_out *packet_out;
3136    int len;
3137
3138    packet_out = lsquic_send_ctl_new_packet_out(send_ctl, 0, PNS_APP,
3139                                                    stream->conn_pub->path);
3140    if (!packet_out)
3141        return SWTP_STOP;
3142    packet_out->po_header_type = stream->tosend_off == 0
3143                                        ? HETY_INITIAL : HETY_HANDSHAKE;
3144
3145    len = write_stream_frame(fg_ctx, size, packet_out);
3146
3147    if (len > 0)
3148    {
3149        packet_out->po_flags |= PO_HELLO;
3150        lsquic_packet_out_zero_pad(packet_out);
3151        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
3152        return SWTP_OK;
3153    }
3154    else
3155        return SWTP_ERROR;
3156}
3157
3158
3159static enum swtp_status
3160stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size)
3161{
3162    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
3163    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
3164    unsigned stream_header_sz, need_at_least;
3165    struct lsquic_packet_out *packet_out;
3166    struct lsquic_stream *headers_stream;
3167    int len;
3168
3169    if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED))
3170                                                        == STREAM_HEADERS_SENT)
3171    {
3172        if (stream->sm_bflags & SMBF_IETF)
3173        {
3174            if (stream->stream_flags & STREAM_ENCODER_DEP)
3175                headers_stream = stream->conn_pub->u.ietf.qeh->qeh_enc_sm_out;
3176            else
3177                headers_stream = NULL;
3178        }
3179        else
3180            headers_stream =
3181                lsquic_headers_stream_get_stream(stream->conn_pub->u.gquic.hs);
3182        if (headers_stream && lsquic_stream_has_data_to_flush(headers_stream))
3183        {
3184            LSQ_DEBUG("flushing headers stream before packetizing stream data");
3185            (void) lsquic_stream_flush(headers_stream);
3186        }
3187        /* If there is nothing to flush, some other stream must have flushed it:
3188         * this means our headers are flushed.  Either way, only do this once.
3189         */
3190        stream->stream_flags |= STREAM_HDRS_FLUSHED;
3191    }
3192
3193    stream_header_sz = stream->sm_frame_header_sz(stream, size);
3194    need_at_least = stream_header_sz;
3195    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3196                                       == (SMBF_IETF|SMBF_USE_HEADERS))
3197    {
3198        if (size > 0)
3199            need_at_least += 3;     /* Enough room for HTTP/3 frame */
3200    }
3201    else
3202        need_at_least += size > 0;
3203  get_packet:
3204    packet_out = stream->sm_get_packet_for_stream(send_ctl,
3205                                need_at_least, stream->conn_pub->path, stream);
3206    if (packet_out)
3207    {
3208        len = write_stream_frame(fg_ctx, size, packet_out);
3209        if (len > 0)
3210            return SWTP_OK;
3211        assert(len < 0);
3212        if (-len > (int) need_at_least)
3213        {
3214            LSQ_DEBUG("need more room (%d bytes) than initially calculated "
3215                "%u bytes, will try again", -len, need_at_least);
3216            need_at_least = -len;
3217            goto get_packet;
3218        }
3219        return SWTP_ERROR;
3220    }
3221    else
3222        return SWTP_STOP;
3223}
3224
3225
3226/* Use for IETF crypto streams and gQUIC crypto stream for versions >= Q050. */
3227static enum swtp_status
3228stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size)
3229{
3230    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
3231    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
3232    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
3233    unsigned crypto_header_sz, need_at_least;
3234    struct lsquic_packet_out *packet_out;
3235    unsigned short off;
3236    enum packnum_space pns;
3237    int len, s;
3238
3239    if (stream->sm_bflags & SMBF_IETF)
3240        pns = lsquic_enclev2pns[ crypto_level(stream) ];
3241    else
3242        pns = PNS_APP;
3243
3244    assert(size > 0);
3245    crypto_header_sz = stream->sm_frame_header_sz(stream, size);
3246    need_at_least = crypto_header_sz + 1;
3247
3248    packet_out = lsquic_send_ctl_get_packet_for_crypto(send_ctl,
3249                                    need_at_least, pns, stream->conn_pub->path);
3250    if (!packet_out)
3251        return SWTP_STOP;
3252
3253    off = packet_out->po_data_sz;
3254    len = pf->pf_gen_crypto_frame(packet_out->po_data + packet_out->po_data_sz,
3255                lsquic_packet_out_avail(packet_out), 0, stream->tosend_off, 0,
3256                size, frame_std_gen_read, fg_ctx);
3257    if (len < 0)
3258        return len;
3259
3260    EV_LOG_GENERATED_CRYPTO_FRAME(LSQUIC_LOG_CONN_ID, pf,
3261                            packet_out->po_data + packet_out->po_data_sz, len);
3262    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
3263    packet_out->po_frame_types |= 1 << QUIC_FRAME_CRYPTO;
3264    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
3265                                     stream, QUIC_FRAME_CRYPTO, off, len);
3266    if (s != 0)
3267    {
3268        LSQ_WARN("adding crypto stream to packet failed: %s", strerror(errno));
3269        return -1;
3270    }
3271
3272    packet_out->po_flags |= PO_HELLO;
3273
3274    if (!(stream->sm_bflags & SMBF_IETF))
3275    {
3276        const unsigned short before = packet_out->po_data_sz;
3277        lsquic_packet_out_zero_pad(packet_out);
3278        /* XXX: too hacky */
3279        if (before < packet_out->po_data_sz)
3280            send_ctl->sc_bytes_scheduled += packet_out->po_data_sz - before;
3281    }
3282
3283    check_flush_threshold(stream);
3284    return SWTP_OK;
3285}
3286
3287
3288static void
3289abort_connection (struct lsquic_stream *stream)
3290{
3291    if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
3292        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
3293                                                next_service_stream);
3294    stream->sm_qflags |= SMQF_ABORT_CONN;
3295    LSQ_WARN("connection will be aborted");
3296    maybe_conn_to_tickable(stream);
3297}
3298
3299
/* Finalize the HQ frame that contains the current payload offset, if any.
 *
 * A fixed-size frame is released once the payload offset reaches its end.
 * For a variable-size frame whose header location is known, the header is
 * written into the previously reserved location.  If data remains in the
 * stream buffer, the frame is converted to a fixed-size frame that also
 * covers the buffered bytes; otherwise the frame is released.
 *
 * If the size does not fit into the reserved header, an internal error is
 * reported to the connection.
 */
static void
maybe_close_varsize_hq_frame (struct lsquic_stream *stream)
{
    struct stream_hq_frame *shf;
    uint64_t size;
    unsigned bits;

    shf = find_cur_hq_frame(stream);
    if (!shf)
        return;

    if (shf->shf_flags & SHF_FIXED_SIZE)
    {
        if (shf->shf_off + shf->shf_frame_size <= stream->sm_payload)
            stream_hq_frame_put(stream, shf);
        return;
    }

    /* `bits' is 1 if SHF_TWO_BYTES is set and 0 otherwise; it selects the
     * width of the length field: 1 << bits bytes.
     */
    bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
    /* Frame size includes data that is still buffered */
    size = stream->sm_payload + stream->sm_n_buffered - shf->shf_off;
    if (size <= VINT_MAX_B(bits) && shf->shf_frame_ptr)
    {
        if (0 == stream->sm_n_buffered)
            LSQ_DEBUG("close HQ frame type 0x%X of size %"PRIu64,
                                                shf->shf_frame_type, size);
        else
            LSQ_DEBUG("convert HQ frame type 0x%X of to fixed %"PRIu64,
                                                shf->shf_frame_type, size);
        /* Fill in the header: type byte followed by the length */
        shf->shf_frame_ptr[0] = shf->shf_frame_type;
        vint_write(shf->shf_frame_ptr + 1, size, bits, 1 << bits);
        if (0 == stream->sm_n_buffered)
            stream_hq_frame_put(stream, shf);
        else
        {
            /* The header now promises `size' bytes, so the frame must stay
             * around until the buffered data has been sent.
             */
            shf->shf_frame_size = size;
            shf->shf_flags |= SHF_FIXED_SIZE;
        }
    }
    else if (!shf->shf_frame_ptr)
        LSQ_DEBUG("HQ frame of type 0x%X has not yet been written, not "
            "closing", shf->shf_frame_type);
    else
    {
        /* Header location is known, but the size does not fit into it */
        assert(stream->sm_n_buffered);
        LSQ_ERROR("cannot close frame of size %"PRIu64" on stream %"PRIu64
            " -- too large", size, stream->id);
        stream->conn_pub->lconn->cn_if->ci_internal_error(
            stream->conn_pub->lconn, "HTTP/3 frame too large");
        stream_hq_frame_put(stream, shf);
    }
}
3351
3352
/* Packetize data from the stream buffer and from `reader'.
 *
 * Frames are written for as long as the amount of data available is at or
 * above the threshold (if `thresh' is zero, for as long as there is any
 * data) or a FIN needs to be sent.  When a threshold is in effect and
 * SWO_BUFFER is set in `swo', the remainder that falls below the threshold
 * is saved to the stream buffer instead.
 *
 * Returns the number of bytes consumed from the reader or -1 on error.
 */
static ssize_t
stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
                         size_t thresh, enum stream_write_options swo)
{
    size_t size;
    ssize_t nw;
    unsigned seen_ok;       /* Number of successful writes so far */
    int use_framing;
    struct frame_gen_ctx fg_ctx = {
        .fgc_stream = stream,
        .fgc_reader = reader,
        .fgc_nread_from_reader = 0,
        .fgc_thresh = thresh,
    };

#if LSQUIC_EXTRA_CHECKS
    if (stream->conn_pub)
        ++stream->conn_pub->wtp_level;
#endif
    /* HTTP/3 framing is used when both of these flags are set */
    use_framing = (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
                                       == (SMBF_IETF|SMBF_USE_HEADERS);
    if (use_framing)
    {
        fg_ctx.fgc_size = frame_hq_gen_size;
        fg_ctx.fgc_read = frame_hq_gen_read;
        fg_ctx.fgc_fin = frame_std_gen_fin; /* This seems to work for either? XXX */
    }
    else
    {
        fg_ctx.fgc_size = frame_std_gen_size;
        fg_ctx.fgc_read = frame_std_gen_read;
        fg_ctx.fgc_fin = frame_std_gen_fin;
    }

    seen_ok = 0;
    /* `size' is recalculated on every iteration.  Note that the threshold
     * may change while writing: see maybe_resize_threshold().
     */
    while ((size = fg_ctx.fgc_size(&fg_ctx),
                            fg_ctx.fgc_thresh
                          ? size >= fg_ctx.fgc_thresh : size > 0)
           || fg_ctx.fgc_fin(&fg_ctx))
    {
        switch (stream->sm_write_to_packet(&fg_ctx, size))
        {
        case SWTP_OK:
            /* Things done only after the first successful write */
            if (!seen_ok++)
            {
                maybe_conn_to_tickable_if_writeable(stream, 0);
                maybe_update_last_progress(stream);
            }
            if (fg_ctx.fgc_fin(&fg_ctx))
            {
                if (use_framing && seen_ok)
                    maybe_close_varsize_hq_frame(stream);
                stream->stream_flags |= STREAM_FIN_SENT;
                goto end;
            }
            else
                break;
        case SWTP_STOP:
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            if (use_framing && seen_ok)
                maybe_close_varsize_hq_frame(stream);
            goto end;
        default:
            /* Any other status is treated as an error */
            abort_connection(stream);
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            goto err;
        }
    }

    if (use_framing && seen_ok)
        maybe_close_varsize_hq_frame(stream);

    if (fg_ctx.fgc_thresh && (swo & SWO_BUFFER))
    {
        /* Buffer what is left over: `size' includes bytes that are already
         * in the buffer, which is why they are subtracted.
         */
        assert(size < fg_ctx.fgc_thresh);
        assert(size >= stream->sm_n_buffered);
        size -= stream->sm_n_buffered;
        if (size > 0)
        {
            nw = save_to_buffer(stream, reader, size);
            if (nw < 0)
                goto err;
            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
        }
    }
#ifndef NDEBUG
    else if (swo & SWO_BUFFER)
    {
        /* We count flushed data towards both stream and connection limits,
         * so we should have been able to packetize all of it:
         */
        assert(0 == stream->sm_n_buffered);
        assert(size == 0);
    }
#endif

    maybe_mark_as_blocked(stream);

  end:
#if LSQUIC_EXTRA_CHECKS
    if (stream->conn_pub)
        --stream->conn_pub->wtp_level;
#endif
    return fg_ctx.fgc_nread_from_reader;

  err:
#if LSQUIC_EXTRA_CHECKS
    if (stream->conn_pub)
        --stream->conn_pub->wtp_level;
#endif
    return -1;
}
3465
3466
3467/* Perform an implicit flush when we hit connection limit while buffering
3468 * data.  This is to prevent a (theoretical) stall:
3469 *
3470 * Imagine a number of streams, all of which buffered some data.  The buffered
3471 * data is up to connection cap, which means no further writes are possible.
3472 * None of them flushes, which means that data is not sent and connection
3473 * WINDOW_UPDATE frame never arrives from peer.  Stall.
3474 */
3475static int
3476maybe_flush_stream (struct lsquic_stream *stream)
3477{
3478    if (stream->sm_n_buffered > 0
3479          && (stream->sm_bflags & SMBF_CONN_LIMITED)
3480            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
3481        return stream_flush_nocheck(stream);
3482    else
3483        return 0;
3484}
3485
3486
3487static int
3488stream_hq_frame_extendable (const struct stream_hq_frame *shf, uint64_t cur_off,
3489                                                                    unsigned len)
3490{
3491    return (shf->shf_flags & (SHF_TWO_BYTES|SHF_FIXED_SIZE)) == 0
3492        && cur_off - shf->shf_off < (1 << 6)
3493        && cur_off - shf->shf_off + len >= (1 << 6)
3494        ;
3495}
3496
3497
/* Update currently buffered HQ frame or create a new one, if possible.
 * Return update length to be buffered.  If a HQ frame cannot be
 * buffered due to size, 0 is returned, thereby preventing both HQ frame
 * creation and buffering.
 *
 * `len' is the number of bytes the caller wants to buffer and `avail' is
 * the number of bytes that may be written to the stream.  HQ frame header
 * bytes accounted for here are charged to the connection cap and deducted
 * from `avail'.
 */
static size_t
update_buffered_hq_frames (struct lsquic_stream *stream, size_t len,
                                                                size_t avail)
{
    struct stream_hq_frame *shf;
    uint64_t cur_off, end;
    size_t frame_sz;
    unsigned extendable;
#if _MSC_VER
    /* NOTE(review): presumably this is to quiet an "uninitialized variable"
     * warning: these are only read after being assigned in the loop below.
     */
    end = 0;
    extendable = 0;
#endif

    /* Offset at which new data is to be appended: just past buffered data */
    cur_off = stream->sm_payload + stream->sm_n_buffered;
    /* Look for a frame that either already covers `cur_off' or can be
     * extended by one byte to cover it.  If the loop runs to completion,
     * `shf' is NULL.
     */
    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
        if (shf->shf_off <= cur_off)
        {
            end = stream_hq_frame_end(shf);
            extendable = stream_hq_frame_extendable(shf, cur_off, len);
            if (cur_off < end + extendable)
                break;
        }

    if (shf)
    {
        /* Do not go past the end of the existing frame */
        if (len > end + extendable - cur_off)
            len = end + extendable - cur_off;
        frame_sz = stream_hq_frame_size(shf);
    }
    else
    {
        /* No suitable frame: open a new DATA frame at `cur_off' */
        assert(avail >= 3);
        shf = stream_activate_hq_frame(stream, cur_off, HQFT_DATA, 0, len);
        if (!shf)
            return 0;
        if (len > stream_hq_frame_end(shf) - cur_off)
            len = stream_hq_frame_end(shf) - cur_off;
        extendable = 0;
        frame_sz = stream_hq_frame_size(shf);
        if (avail < frame_sz)
            return 0;
        avail -= frame_sz;
    }

    /* The frame header is charged to the connection cap only once */
    if (!(shf->shf_flags & SHF_CC_PAID))
    {
        incr_conn_cap(stream, frame_sz);
        shf->shf_flags |= SHF_CC_PAID;
    }
    if (extendable)
    {
        /* The frame is marked SHF_TWO_BYTES and the one extra byte is
         * accounted for.  If a flush is pending and this frame covers the
         * range being flushed, the flush target moves by one byte as well.
         */
        shf->shf_flags |= SHF_TWO_BYTES;
        incr_conn_cap(stream, 1);
        avail -= 1;
        if ((stream->sm_qflags & SMQF_WANT_FLUSH)
                && shf->shf_off <= stream->sm_payload
                && stream_hq_frame_end(shf) >= stream->sm_flush_to_payload)
            stream->sm_flush_to += 1;
    }

    if (len <= avail)
        return len;
    else
        return avail;
}
3568
3569
3570static ssize_t
3571save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
3572                                                                size_t len)
3573{
3574    size_t avail, n_written, n_allowed;
3575
3576    avail = lsquic_stream_write_avail(stream);
3577    if (avail < len)
3578        len = avail;
3579    if (len == 0)
3580    {
3581        LSQ_DEBUG("zero-byte write (avail: %zu)", avail);
3582        return 0;
3583    }
3584
3585    n_allowed = stream_get_n_allowed(stream);
3586    assert(stream->sm_n_buffered + len <= n_allowed);
3587
3588    if (!stream->sm_buf)
3589    {
3590        stream->sm_buf = malloc(n_allowed);
3591        if (!stream->sm_buf)
3592            return -1;
3593        stream->sm_n_allocated = n_allowed;
3594    }
3595
3596    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3597                                            == (SMBF_IETF|SMBF_USE_HEADERS))
3598        len = update_buffered_hq_frames(stream, len, avail);
3599
3600    n_written = reader->lsqr_read(reader->lsqr_ctx,
3601                        stream->sm_buf + stream->sm_n_buffered, len);
3602    stream->sm_n_buffered += n_written;
3603    assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
3604    incr_conn_cap(stream, n_written);
3605    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
3606              n_written, stream->sm_n_buffered);
3607    if (0 != maybe_flush_stream(stream))
3608        return -1;
3609    return n_written;
3610}
3611
3612
3613static ssize_t
3614stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader,
3615                                                enum stream_write_options swo)
3616{
3617    const struct stream_hq_frame *shf;
3618    size_t thresh, len, frames, total_len, n_allowed, nwritten;
3619    ssize_t nw;
3620
3621    len = reader->lsqr_size(reader->lsqr_ctx);
3622    if (len == 0)
3623        return 0;
3624
3625    frames = 0;
3626    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3627                                        == (SMBF_IETF|SMBF_USE_HEADERS))
3628        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
3629            if (shf->shf_off >= stream->sm_payload)
3630                frames += stream_hq_frame_size(shf);
3631    total_len = len + frames + stream->sm_n_buffered;
3632    thresh = lsquic_stream_flush_threshold(stream, total_len);
3633    n_allowed = stream_get_n_allowed(stream);
3634    if (total_len <= n_allowed && total_len < thresh)
3635    {
3636        if (!(swo & SWO_BUFFER))
3637            return 0;
3638        nwritten = 0;
3639        do
3640        {
3641            nw = save_to_buffer(stream, reader, len - nwritten);
3642            if (nw > 0)
3643                nwritten += (size_t) nw;
3644            else if (nw == 0)
3645                break;
3646            else
3647                return nw;
3648        }
3649        while (nwritten < len
3650                        && stream->sm_n_buffered < stream->sm_n_allocated);
3651        return nwritten;
3652    }
3653    else
3654        return stream_write_to_packets(stream, reader, thresh, swo);
3655}
3656
3657
3658ssize_t
3659lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
3660{
3661    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
3662    return lsquic_stream_writev(stream, &iov, 1);
3663}
3664
3665
/* Cursor over an array of iovecs, used as the context of the reader
 * callbacks inner_reader_iovec_read() and inner_reader_iovec_size().
 */
struct inner_reader_iovec {
    const struct iovec       *iov;              /* Current iovec */
    const struct iovec       *end;              /* One past the last iovec */
    /* Number of bytes of the current iovec that have been consumed.  This is
     * size_t so that it can represent any offset into an iov_len-sized
     * buffer.
     */
    size_t                    cur_iovec_off;
};


/* Copy up to `count' bytes from the iovec array into `buf', advancing the
 * cursor.  Returns the number of bytes copied, which is smaller than `count'
 * only when the iovecs run out.
 */
static size_t
inner_reader_iovec_read (void *ctx, void *buf, size_t count)
{
    struct inner_reader_iovec *const iro = ctx;
    unsigned char *p = buf;
    unsigned char *const end = p + count;
    size_t n_tocopy;    /* size_t: iov_len may not fit into unsigned */

    while (iro->iov < iro->end && p < end)
    {
        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
        if (n_tocopy > (size_t) (end - p))
            n_tocopy = (size_t) (end - p);
        /* An empty iovec may carry a NULL base: do not hand it to memcpy */
        if (n_tocopy > 0)
            memcpy(p, (const unsigned char *) iro->iov->iov_base
                                        + iro->cur_iovec_off, n_tocopy);
        p += n_tocopy;
        iro->cur_iovec_off += n_tocopy;
        if (iro->iov->iov_len == iro->cur_iovec_off)
        {
            /* Current iovec is exhausted: move on to the next one */
            ++iro->iov;
            iro->cur_iovec_off = 0;
        }
    }

    return (size_t) (p - (unsigned char *) buf);
}


/* Return the number of bytes that remain to be read: the sum of lengths of
 * the current and all following iovecs minus the consumed part of the
 * current one.
 */
static size_t
inner_reader_iovec_size (void *ctx)
{
    struct inner_reader_iovec *const iro = ctx;
    const struct iovec *iov;
    size_t size;

    size = 0;
    for (iov = iro->iov; iov < iro->end; ++iov)
        size += iov->iov_len;

    return size - iro->cur_iovec_off;
}
3714
3715
3716ssize_t
3717lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
3718                                                                    int iovcnt)
3719{
3720    COMMON_WRITE_CHECKS();
3721    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3722
3723    struct inner_reader_iovec iro = {
3724        .iov = iov,
3725        .end = iov + iovcnt,
3726        .cur_iovec_off = 0,
3727    };
3728    struct lsquic_reader reader = {
3729        .lsqr_read = inner_reader_iovec_read,
3730        .lsqr_size = inner_reader_iovec_size,
3731        .lsqr_ctx  = &iro,
3732    };
3733
3734    return stream_write(stream, &reader, SWO_BUFFER);
3735}
3736
3737
3738ssize_t
3739lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
3740{
3741    COMMON_WRITE_CHECKS();
3742    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3743    return stream_write(stream, reader, SWO_BUFFER);
3744}
3745
3746
3747/* Configuration for lsquic_stream_pwritev: */
3748#ifndef LSQUIC_PWRITEV_DEF_IOVECS
3749#define LSQUIC_PWRITEV_DEF_IOVECS  16
3750#endif
/* This is overkill: the limit should only be reached during testing. */
3752#ifndef LSQUIC_PWRITEV_DEF_FRAMES
3753#define LSQUIC_PWRITEV_DEF_FRAMES (LSQUIC_PWRITEV_DEF_IOVECS * 2)
3754#endif
3755
3756#ifdef NDEBUG
3757#define PWRITEV_IOVECS  LSQUIC_PWRITEV_DEF_IOVECS
3758#define PWRITEV_FRAMES  LSQUIC_PWRITEV_DEF_FRAMES
3759#else
3760#if _MSC_VER
3761#define MALLOC_PWRITEV 1
3762#else
3763#define MALLOC_PWRITEV 0
3764#endif
3765static unsigned
3766    PWRITEV_IOVECS  = LSQUIC_PWRITEV_DEF_IOVECS,
3767    PWRITEV_FRAMES  = LSQUIC_PWRITEV_DEF_FRAMES;
3768
3769void
3770lsquic_stream_set_pwritev_params (unsigned iovecs, unsigned frames)
3771{
3772    PWRITEV_IOVECS  = iovecs;
3773    PWRITEV_FRAMES  = frames;
3774}
3775
3776
3777#endif
3778
/* Reader context used by lsquic_stream_pwritev(): instead of copying data,
 * the reader callbacks record where the data should go.
 */
struct pwritev_ctx
{
    /* Array in which pwritev_read() records destination buffers */
    struct iovec           *iov;
    /* HQ frame array; pwritev_size() reports zero once it is full */
    const struct hq_arr    *hq_arr;
    /* Sum of the sizes recorded in iov[] so far */
    size_t                  total_bytes;
    /* Number of bytes the caller asked to write */
    size_t                  n_to_write;
    /* Number of used elements in iov[] and its capacity, respectively */
    unsigned                n_iovecs,
                            max_iovecs;
};
3787
3788
3789static size_t
3790pwritev_size (void *lsqr_ctx)
3791{
3792    struct pwritev_ctx *const ctx = lsqr_ctx;
3793
3794    if (ctx->n_iovecs < ctx->max_iovecs
3795                                    && ctx->hq_arr->count < ctx->hq_arr->max)
3796        return ctx->n_to_write - ctx->total_bytes;
3797    else
3798        return 0;
3799}
3800
3801
3802static size_t
3803pwritev_read (void *lsqr_ctx, void *buf, size_t count)
3804{
3805    struct pwritev_ctx *const ctx = lsqr_ctx;
3806
3807    assert(ctx->n_iovecs < ctx->max_iovecs);
3808    ctx->iov[ctx->n_iovecs].iov_base = buf;
3809    ctx->iov[ctx->n_iovecs].iov_len = count;
3810    ++ctx->n_iovecs;
3811    ctx->total_bytes += count;
3812    return count;
3813}
3814
3815
3816/* pwritev works as follows: allocate packets via lsquic_stream_writef() call
3817 * and record pointers and sizes into an iovec array.  Then issue a single call
3818 * to user-supplied preadv() to populate all packets in one shot.
3819 *
3820 * Unwinding state changes due to a short write is by far the most complicated
3821 * part of the machinery that follows.  We optimize the normal path: it should
3822 * be cheap to be prepared for the unwinding; unwinding itself can be more
3823 * expensive, as we do not expect it to happen often.
3824 */
3825ssize_t
3826lsquic_stream_pwritev (struct lsquic_stream *stream,
3827    ssize_t (*preadv)(void *user_data, const struct iovec *iov, int iovcnt),
3828    void *user_data, size_t n_to_write)
3829{
3830    struct lsquic_send_ctl *const ctl = stream->conn_pub->send_ctl;
3831#if MALLOC_PWRITEV
3832    struct iovec *iovecs;
3833    unsigned char **hq_frames;
3834#else
3835    struct iovec iovecs[PWRITEV_IOVECS];
3836    unsigned char *hq_frames[PWRITEV_FRAMES];
3837#endif
3838    struct iovec *last_iov;
3839    struct pwritev_ctx ctx;
3840    struct lsquic_reader reader;
3841    struct send_ctl_state ctl_state;
3842    struct hq_arr hq_arr;
3843    ssize_t nw;
3844    size_t n_allocated, sum;
3845#ifndef NDEBUG
3846    const unsigned short n_buffered = stream->sm_n_buffered;
3847#endif
3848
3849    COMMON_WRITE_CHECKS();
3850    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3851
3852#if MALLOC_PWRITEV
3853    iovecs = malloc(sizeof(iovecs[0]) * PWRITEV_IOVECS);
3854    hq_frames = malloc(sizeof(hq_frames[0]) * PWRITEV_FRAMES);
3855    if (!(iovecs && hq_frames))
3856    {
3857        free(iovecs);
3858        free(hq_frames);
3859        return -1;
3860    }
3861#endif
3862
3863    lsquic_send_ctl_snapshot(ctl, &ctl_state);
3864
3865    ctx.total_bytes = 0;
3866    ctx.n_to_write = n_to_write;
3867    ctx.n_iovecs = 0;
3868    ctx.max_iovecs = PWRITEV_IOVECS;
3869    ctx.iov = iovecs;
3870    ctx.hq_arr = &hq_arr;
3871
3872    hq_arr.p = hq_frames;
3873    hq_arr.count = 0;
3874    hq_arr.max = PWRITEV_FRAMES;
3875    stream->sm_hq_arr = &hq_arr;
3876
3877    reader.lsqr_ctx = &ctx;
3878    reader.lsqr_size = pwritev_size;
3879    reader.lsqr_read = pwritev_read;
3880
3881    nw = stream_write(stream, &reader, 0);
3882    LSQ_DEBUG("pwritev: stream_write returned %zd, n_iovecs: %d", nw,
3883                                                                ctx.n_iovecs);
3884    if (nw > 0)
3885    {
3886        /* Amount of buffered data shouldn't have increased */
3887        assert(n_buffered >= stream->sm_n_buffered);
3888        n_allocated = (size_t) nw;
3889        nw = preadv(user_data, ctx.iov, ctx.n_iovecs);
3890        LSQ_DEBUG("pwritev: preadv returned %zd", nw);
3891        if (nw >= 0 && (size_t) nw < n_allocated)
3892            goto unwind_short_write;
3893    }
3894
3895  cleanup:
3896    stream->sm_hq_arr = NULL;
3897#if MALLOC_PWRITEV
3898    free(iovecs);
3899    free(hq_frames);
3900#endif
3901    return nw;
3902
3903  unwind_short_write:
3904    /* What follows is not the most efficient process.  The emphasis here is
3905     * on being simple instead.  We expect short writes to be rare, so being
3906     * slower than possible is a good tradeoff for being correct.
3907     */
3908    LSQ_DEBUG("short write occurred, unwind");
3909    SM_HISTORY_APPEND(stream, SHE_SHORT_WRITE);
3910
3911    /* First, adjust connection cap and stream offsets, and HTTP/3 framing,
3912     * if necessary.
3913     */
3914    if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF))
3915                                            == (SMBF_USE_HEADERS|SMBF_IETF))
3916    {
3917        size_t shortfall, payload_sz, decr;
3918        unsigned char *p;
3919        unsigned bits;
3920
3921        assert(hq_arr.count > 0);
3922        shortfall = n_allocated - (size_t) nw;
3923        do
3924        {
3925            const unsigned count = hq_arr.count;
3926            (void) count;
3927            p = hq_frames[--hq_arr.count];
3928            assert(p[0] == HQFT_DATA);
3929            assert(!(p[1] & 0x80));     /* Only one- and two-byte frame sizes */
3930            if (p[1] & 0x40)
3931            {
3932                payload_sz = (p[1] & 0x3F) << 8;
3933                payload_sz |= p[2];
3934            }
3935            else
3936                payload_sz = p[1];
3937            if (payload_sz > shortfall)
3938            {
3939                bits = p[1] >> 6;
3940                vint_write(p + 1, payload_sz - shortfall, bits, 1 << bits);
3941                decr = shortfall;
3942                if (stream->sm_bflags & SMBF_CONN_LIMITED)
3943                    stream->conn_pub->conn_cap.cc_sent -= decr;
3944                stream->sm_payload -= decr;
3945                stream->tosend_off -= decr;
3946                shortfall = 0;
3947            }
3948            else
3949            {
3950                decr = payload_sz + 2 + (p[1] >> 6);
3951                if (stream->sm_bflags & SMBF_CONN_LIMITED)
3952                    stream->conn_pub->conn_cap.cc_sent -= decr;
3953                stream->sm_payload -= payload_sz;
3954                stream->tosend_off -= decr;
3955                shortfall -= payload_sz;
3956            }
3957        }
3958        while (hq_arr.count);
3959        assert(shortfall == 0);
3960    }
3961    else
3962    {
3963        const size_t shortfall = n_allocated - (size_t) nw;
3964        if (stream->sm_bflags & SMBF_CONN_LIMITED)
3965            stream->conn_pub->conn_cap.cc_sent -= shortfall;
3966        stream->sm_payload -= shortfall;
3967        stream->tosend_off -= shortfall;
3968    }
3969
3970    /* Find last iovec: */
3971    sum = 0;
3972    for (last_iov = iovecs; last_iov < iovecs + PWRITEV_IOVECS; ++last_iov)
3973    {
3974        sum += last_iov->iov_len;
3975        if ((last_iov == iovecs || (size_t) nw > sum - last_iov->iov_len)
3976                                                        && (size_t) nw <= sum)
3977            break;
3978    }
3979    assert(last_iov < iovecs + PWRITEV_IOVECS);
3980    lsquic_send_ctl_rollback(ctl, &ctl_state, last_iov, sum - nw);
3981
3982    goto cleanup;
3983}
3984
3985
3986/* This bypasses COMMON_WRITE_CHECKS */
3987static ssize_t
3988stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz)
3989{
3990    const struct iovec iov[1] = {{ (void *) buf, sz, }};
3991    struct inner_reader_iovec iro = {
3992        .iov = iov,
3993        .end = iov + 1,
3994        .cur_iovec_off = 0,
3995    };
3996    struct lsquic_reader reader = {
3997        .lsqr_read = inner_reader_iovec_read,
3998        .lsqr_size = inner_reader_iovec_size,
3999        .lsqr_ctx  = &iro,
4000    };
4001    return stream_write(stream, &reader, SWO_BUFFER);
4002}
4003
4004
4005/* This limits the cumulative size of the compressed header fields */
4006#define MAX_HEADERS_SIZE (64 * 1024)
4007
4008static int
4009send_headers_ietf (struct lsquic_stream *stream,
4010                            const struct lsquic_http_headers *headers, int eos)
4011{
4012    enum qwh_status qwh;
4013    const size_t max_prefix_size =
4014                    lsquic_qeh_max_prefix_size(stream->conn_pub->u.ietf.qeh);
4015    const size_t max_push_size = 1 /* Stream type */ + 8 /* Push ID */;
4016    size_t prefix_sz, headers_sz, hblock_sz, push_sz;
4017    unsigned bits;
4018    ssize_t nw;
4019    unsigned char *header_block;
4020    enum lsqpack_enc_header_flags hflags;
4021    int rv;
4022    const size_t buf_sz = max_push_size + max_prefix_size + MAX_HEADERS_SIZE;
4023#ifndef WIN32
4024    unsigned char buf[buf_sz];
4025#else
4026    unsigned char *buf = _malloca(buf_sz);
4027    if (!buf)
4028        return -1;
4029#endif
4030
4031    stream->stream_flags &= ~STREAM_PUSHING;
4032    stream->stream_flags |= STREAM_NOPUSH;
4033
4034    /* TODO: Optimize for the common case: write directly to sm_buf and fall
4035     * back to a larger buffer if that fails.
4036     */
4037    prefix_sz = max_prefix_size;
4038    headers_sz = buf_sz - max_prefix_size - max_push_size;
4039    qwh = lsquic_qeh_write_headers(stream->conn_pub->u.ietf.qeh, stream->id, 0,
4040                headers, buf + max_push_size + max_prefix_size, &prefix_sz,
4041                &headers_sz, &stream->sm_hb_compl, &hflags);
4042
4043    if (!(qwh == QWH_FULL || qwh == QWH_PARTIAL))
4044    {
4045        if (qwh == QWH_ENOBUF)
4046            LSQ_INFO("not enough room for header block");
4047        else
4048            LSQ_WARN("internal error encoding and sending HTTP headers");
4049        goto err;
4050    }
4051
4052    if (hflags & LSQECH_REF_NEW_ENTRIES)
4053        stream->stream_flags |= STREAM_ENCODER_DEP;
4054
4055    if (stream->sm_promise)
4056    {
4057        assert(lsquic_stream_is_pushed(stream));
4058        bits = vint_val2bits(stream->sm_promise->pp_id);
4059        push_sz = 1 + (1 << bits);
4060        if (!stream_activate_hq_frame(stream,
4061                stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PREAMBLE,
4062                SHF_FIXED_SIZE|SHF_PHANTOM, push_sz))
4063            goto err;
4064        buf[max_push_size + max_prefix_size - prefix_sz - push_sz] = HQUST_PUSH;
4065        vint_write(buf + max_push_size + max_prefix_size - prefix_sz
4066                    - push_sz + 1,stream->sm_promise->pp_id, bits, 1 << bits);
4067    }
4068    else
4069        push_sz = 0;
4070
4071    /* Construct contiguous header block buffer including HQ framing */
4072    header_block = buf + max_push_size + max_prefix_size - prefix_sz - push_sz;
4073    hblock_sz = push_sz + prefix_sz + headers_sz;
4074    if (!stream_activate_hq_frame(stream,
4075                stream->sm_payload + stream->sm_n_buffered + push_sz,
4076                HQFT_HEADERS, SHF_FIXED_SIZE, hblock_sz - push_sz))
4077        goto err;
4078
4079    if (qwh == QWH_FULL)
4080    {
4081        stream->sm_send_headers_state = SSHS_HBLOCK_SENDING;
4082        if (lsquic_stream_write_avail(stream))
4083        {
4084            nw = stream_write_buf(stream, header_block, hblock_sz);
4085            if (nw < 0)
4086            {
4087                LSQ_WARN("cannot write to stream: %s", strerror(errno));
4088                goto err;
4089            }
4090            if ((size_t) nw == hblock_sz)
4091            {
4092                stream->stream_flags |= STREAM_HEADERS_SENT;
4093                stream_hblock_sent(stream);
4094                LSQ_DEBUG("wrote all %zu bytes of header block", hblock_sz);
4095                goto end;
4096            }
4097            LSQ_DEBUG("wrote only %zd bytes of header block, stash", nw);
4098        }
4099        else
4100        {
4101            LSQ_DEBUG("cannot write to stream, stash all %zu bytes of "
4102                                        "header block", hblock_sz);
4103            nw = 0;
4104        }
4105    }
4106    else
4107    {
4108        stream->sm_send_headers_state = SSHS_ENC_SENDING;
4109        nw = 0;
4110    }
4111
4112    stream->sm_saved_want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
4113    stream_wantwrite(stream, 1);
4114
4115    stream->sm_header_block = malloc(hblock_sz - (size_t) nw);
4116    if (!stream->sm_header_block)
4117    {
4118        LSQ_WARN("cannot allocate %zd bytes to stash %s header block",
4119            hblock_sz - (size_t) nw, qwh == QWH_FULL ? "full" : "partial");
4120        goto err;
4121    }
4122    memcpy(stream->sm_header_block, header_block + (size_t) nw,
4123                                                hblock_sz - (size_t) nw);
4124    stream->sm_hblock_sz = hblock_sz - (size_t) nw;
4125    stream->sm_hblock_off = 0;
4126    LSQ_DEBUG("stashed %u bytes of header block", stream->sm_hblock_sz);
4127
4128  end:
4129    rv = 0;
4130  clean:
4131#ifdef WIN32
4132    _freea(buf);
4133#endif
4134    return rv;
4135
4136  err:
4137    rv = -1;
4138    goto clean;
4139}
4140
4141
4142static int
4143send_headers_gquic (struct lsquic_stream *stream,
4144                            const struct lsquic_http_headers *headers, int eos)
4145{
4146    int s = lsquic_headers_stream_send_headers(stream->conn_pub->u.gquic.hs,
4147                stream->id, headers, eos, lsquic_stream_priority(stream));
4148    if (0 == s)
4149    {
4150        SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
4151        stream->stream_flags |= STREAM_HEADERS_SENT;
4152        if (eos)
4153            stream->stream_flags |= STREAM_FIN_SENT;
4154        LSQ_INFO("sent headers");
4155    }
4156    else
4157        LSQ_WARN("could not send headers: %s", strerror(errno));
4158    return s;
4159}
4160
4161
4162int
4163lsquic_stream_send_headers (lsquic_stream_t *stream,
4164                            const lsquic_http_headers_t *headers, int eos)
4165{
4166    if ((stream->sm_bflags & SMBF_USE_HEADERS)
4167            && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_U_WRITE_DONE)))
4168    {
4169        if (stream->sm_bflags & SMBF_IETF)
4170            return send_headers_ietf(stream, headers, eos);
4171        else
4172            return send_headers_gquic(stream, headers, eos);
4173    }
4174    else
4175    {
4176        LSQ_INFO("cannot send headers in this state");
4177        errno = EBADMSG;
4178        return -1;
4179    }
4180}
4181
4182
4183void
4184lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
4185{
4186    if (offset > stream->max_send_off)
4187    {
4188        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
4189        LSQ_DEBUG("update max send offset from %"PRIu64" to "
4190            "%"PRIu64, stream->max_send_off, offset);
4191        stream->max_send_off = offset;
4192    }
4193    else
4194        LSQ_DEBUG("new offset %"PRIu64" is not larger than old "
4195            "max send offset %"PRIu64", ignoring", offset,
4196            stream->max_send_off);
4197}
4198
4199
4200/* This function is used to update offsets after handshake completes and we
4201 * learn of peer's limits from the handshake values.
4202 */
4203int
4204lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset)
4205{
4206    LSQ_DEBUG("setting max_send_off to %"PRIu64, offset);
4207    if (offset > stream->max_send_off)
4208    {
4209        lsquic_stream_window_update(stream, offset);
4210        return 0;
4211    }
4212    else if (offset < stream->tosend_off)
4213    {
4214        LSQ_INFO("new offset (%"PRIu64" bytes) is smaller than the amount of "
4215            "data already sent on this stream (%"PRIu64" bytes)", offset,
4216            stream->tosend_off);
4217        return -1;
4218    }
4219    else
4220    {
4221        stream->max_send_off = offset;
4222        return 0;
4223    }
4224}
4225
4226
4227void
4228lsquic_stream_reset (lsquic_stream_t *stream, uint64_t error_code)
4229{
4230    lsquic_stream_reset_ext(stream, error_code, 1);
4231}
4232
4233
4234void
4235lsquic_stream_reset_ext (lsquic_stream_t *stream, uint64_t error_code,
4236                         int do_close)
4237{
4238    if ((stream->stream_flags & STREAM_RST_SENT)
4239                                    || (stream->sm_qflags & SMQF_SEND_RST))
4240    {
4241        LSQ_INFO("reset already sent");
4242        return;
4243    }
4244
4245    SM_HISTORY_APPEND(stream, SHE_RESET);
4246
4247    LSQ_INFO("reset, error code %"PRIu64, error_code);
4248    stream->error_code = error_code;
4249
4250    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
4251        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
4252                                                        next_send_stream);
4253    stream->sm_qflags &= ~SMQF_SENDING_FLAGS;
4254    stream->sm_qflags |= SMQF_SEND_RST;
4255
4256    if (stream->sm_qflags & SMQF_QPACK_DEC)
4257    {
4258        lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream);
4259        stream->sm_qflags &= ~SMQF_QPACK_DEC;
4260    }
4261
4262    drop_buffered_data(stream);
4263    maybe_elide_stream_frames(stream);
4264    maybe_schedule_call_on_close(stream);
4265
4266    if (do_close)
4267        lsquic_stream_close(stream);
4268    else
4269        maybe_conn_to_tickable_if_writeable(stream, 1);
4270}
4271
4272
4273lsquic_stream_id_t
4274lsquic_stream_id (const lsquic_stream_t *stream)
4275{
4276    return stream->id;
4277}
4278
4279
4280#if !defined(NDEBUG) && __GNUC__
4281__attribute__((weak))
4282#endif
4283struct lsquic_conn *
4284lsquic_stream_conn (const lsquic_stream_t *stream)
4285{
4286    return stream->conn_pub->lconn;
4287}
4288
4289
4290int
4291lsquic_stream_close (lsquic_stream_t *stream)
4292{
4293    LSQ_DEBUG("lsquic_stream_close() called");
4294    SM_HISTORY_APPEND(stream, SHE_CLOSE);
4295    if (lsquic_stream_is_closed(stream))
4296    {
4297        LSQ_INFO("Attempt to close an already-closed stream");
4298        errno = EBADF;
4299        return -1;
4300    }
4301    maybe_stream_shutdown_write(stream);
4302    stream_shutdown_read(stream);
4303    maybe_schedule_call_on_close(stream);
4304    maybe_finish_stream(stream);
4305    if (!(stream->stream_flags & STREAM_DELAYED_SW))
4306        maybe_conn_to_tickable_if_writeable(stream, 1);
4307    return 0;
4308}
4309
4310
4311#ifndef NDEBUG
4312#if __GNUC__
4313__attribute__((weak))
4314#endif
4315#endif
4316void
4317lsquic_stream_acked (struct lsquic_stream *stream,
4318                                            enum quic_frame_type frame_type)
4319{
4320    assert(stream->n_unacked);
4321    --stream->n_unacked;
4322    LSQ_DEBUG("ACKed; n_unacked: %u", stream->n_unacked);
4323    if (frame_type == QUIC_FRAME_RST_STREAM)
4324    {
4325        SM_HISTORY_APPEND(stream, SHE_RST_ACKED);
4326        LSQ_DEBUG("RESET that we sent has been acked by peer");
4327        stream->stream_flags |= STREAM_RST_ACKED;
4328    }
4329    if (0 == stream->n_unacked)
4330    {
4331        maybe_schedule_call_on_close(stream);
4332        maybe_finish_stream(stream);
4333    }
4334}
4335
4336
4337void
4338lsquic_stream_push_req (lsquic_stream_t *stream,
4339                        struct uncompressed_headers *push_req)
4340{
4341    assert(!stream->push_req);
4342    stream->push_req = push_req;
4343    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
4344}
4345
4346
4347int
4348lsquic_stream_is_pushed (const lsquic_stream_t *stream)
4349{
4350    enum stream_id_type sit;
4351
4352    switch (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
4353    {
4354    case SMBF_IETF|SMBF_USE_HEADERS:
4355        sit = stream->id & SIT_MASK;
4356        return sit == SIT_UNI_SERVER;
4357    case SMBF_USE_HEADERS:
4358        return 1 & ~stream->id;
4359    default:
4360        return 0;
4361    }
4362}
4363
4364
4365int
4366lsquic_stream_push_info (const lsquic_stream_t *stream,
4367                          lsquic_stream_id_t *ref_stream_id, void **hset)
4368{
4369    if (lsquic_stream_is_pushed(stream))
4370    {
4371        assert(stream->push_req);
4372        *ref_stream_id = stream->push_req->uh_stream_id;
4373        *hset          = stream->push_req->uh_hset;
4374        return 0;
4375    }
4376    else
4377        return -1;
4378}
4379
4380
4381static int
4382stream_uh_in_gquic (struct lsquic_stream *stream,
4383                                            struct uncompressed_headers *uh)
4384{
4385    if ((stream->sm_bflags & SMBF_USE_HEADERS)
4386                                    && !(stream->stream_flags & STREAM_HAVE_UH))
4387    {
4388        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
4389        LSQ_DEBUG("received uncompressed headers");
4390        stream->stream_flags |= STREAM_HAVE_UH;
4391        if (uh->uh_flags & UH_FIN)
4392            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
4393        stream->uh = uh;
4394        if (uh->uh_oth_stream_id == 0)
4395        {
4396            if (uh->uh_weight)
4397                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
4398        }
4399        else
4400            LSQ_NOTICE("don't know how to depend on stream %"PRIu64,
4401                                                        uh->uh_oth_stream_id);
4402        return 0;
4403    }
4404    else
4405    {
4406        LSQ_ERROR("received unexpected uncompressed headers");
4407        return -1;
4408    }
4409}
4410
4411
4412static int
4413stream_uh_in_ietf (struct lsquic_stream *stream,
4414                                            struct uncompressed_headers *uh)
4415{
4416    int push_promise;
4417
4418    push_promise = lsquic_stream_header_is_pp(stream);
4419    if (!(stream->stream_flags & STREAM_HAVE_UH) && !push_promise)
4420    {
4421        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
4422        LSQ_DEBUG("received uncompressed headers");
4423        stream->stream_flags |= STREAM_HAVE_UH;
4424        if (uh->uh_flags & UH_FIN)
4425        {
4426            /* IETF QUIC only sets UH_FIN for a pushed stream on the server to
4427             * mark request as done:
4428             */
4429            if (stream->sm_bflags & SMBF_IETF)
4430                assert((stream->sm_bflags & SMBF_SERVER)
4431                                            && lsquic_stream_is_pushed(stream));
4432            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
4433        }
4434        stream->uh = uh;
4435        if (uh->uh_oth_stream_id == 0)
4436        {
4437            if (uh->uh_weight)
4438                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
4439        }
4440        else
4441            LSQ_NOTICE("don't know how to depend on stream %"PRIu64,
4442                                                        uh->uh_oth_stream_id);
4443    }
4444    else
4445    {
4446        /* Trailer should never make here, as we discard it in qdh */
4447        LSQ_DEBUG("discard %s header set",
4448                                    push_promise ? "push promise" : "trailer");
4449        if (uh->uh_hset)
4450            stream->conn_pub->enpub->enp_hsi_if
4451                            ->hsi_discard_header_set(uh->uh_hset);
4452        free(uh);
4453    }
4454
4455    return 0;
4456}
4457
4458
4459int
4460lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
4461{
4462    if (stream->sm_bflags & SMBF_USE_HEADERS)
4463    {
4464        if (stream->sm_bflags & SMBF_IETF)
4465            return stream_uh_in_ietf(stream, uh);
4466        else
4467            return stream_uh_in_gquic(stream, uh);
4468    }
4469    else
4470        return -1;
4471}
4472
4473
4474unsigned
4475lsquic_stream_priority (const lsquic_stream_t *stream)
4476{
4477    if (stream->sm_bflags & SMBF_HTTP_PRIO)
4478        return stream->sm_priority;
4479    else
4480        return 256 - stream->sm_priority;
4481}
4482
4483
4484int
4485lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
4486{
4487    /* The user should never get a reference to the special streams,
4488     * but let's check just in case:
4489     */
4490    if (lsquic_stream_is_critical(stream))
4491        return -1;
4492
4493    if (stream->sm_bflags & SMBF_HTTP_PRIO)
4494    {
4495        if (priority > LSQUIC_MAX_HTTP_URGENCY)
4496            return -1;
4497        stream->sm_priority = priority;
4498    }
4499    else
4500    {
4501        if (priority < 1 || priority > 256)
4502            return -1;
4503        stream->sm_priority = 256 - priority;
4504    }
4505
4506    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
4507    LSQ_DEBUG("set priority to %u", priority);
4508    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
4509    return 0;
4510}
4511
4512
4513static int
4514maybe_send_priority_gquic (struct lsquic_stream *stream, unsigned priority)
4515{
4516    if ((stream->sm_bflags & SMBF_USE_HEADERS)
4517                            && (stream->stream_flags & STREAM_HEADERS_SENT))
4518    {
4519        /* We need to send headers only if we are a) using HEADERS stream
4520         * and b) we already sent initial headers.  If initial headers
4521         * have not been sent yet, stream priority will be sent in the
4522         * HEADERS frame.
4523         */
4524        return lsquic_headers_stream_send_priority(stream->conn_pub->u.gquic.hs,
4525                                                stream->id, 0, 0, priority);
4526    }
4527    else
4528        return 0;
4529}
4530
4531
4532static int
4533send_priority_ietf (struct lsquic_stream *stream)
4534{
4535    struct lsquic_ext_http_prio ehp;
4536
4537    if (0 == lsquic_stream_get_http_prio(stream, &ehp)
4538            && 0 == lsquic_hcso_write_priority_update(
4539                            stream->conn_pub->u.ietf.hcso,
4540                            HQFT_PRIORITY_UPDATE_STREAM, stream->id, &ehp))
4541        return 0;
4542    else
4543        return -1;
4544}
4545
4546
4547int
4548lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
4549{
4550    if (0 == lsquic_stream_set_priority_internal(stream, priority))
4551    {
4552        if (stream->sm_bflags & SMBF_IETF)
4553        {
4554            if (stream->sm_bflags & SMBF_HTTP_PRIO)
4555                return send_priority_ietf(stream);
4556            else
4557                return 0;
4558        }
4559        else
4560            return maybe_send_priority_gquic(stream, priority);
4561    }
4562    else
4563        return -1;
4564}
4565
4566
4567lsquic_stream_ctx_t *
4568lsquic_stream_get_ctx (const lsquic_stream_t *stream)
4569{
4570    fiu_return_on("stream/get_ctx", NULL);
4571    return stream->st_ctx;
4572}
4573
4574
4575int
4576lsquic_stream_refuse_push (lsquic_stream_t *stream)
4577{
4578    if (lsquic_stream_is_pushed(stream)
4579            && !(stream->sm_qflags & SMQF_SEND_RST)
4580            && !(stream->stream_flags & STREAM_RST_SENT))
4581    {
4582        LSQ_DEBUG("refusing pushed stream: send reset");
4583        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
4584        return 0;
4585    }
4586    else
4587        return -1;
4588}
4589
4590
4591size_t
4592lsquic_stream_mem_used (const struct lsquic_stream *stream)
4593{
4594    size_t size;
4595
4596    size = sizeof(stream);
4597    if (stream->sm_buf)
4598        size += stream->sm_n_allocated;
4599    if (stream->data_in)
4600        size += stream->data_in->di_if->di_mem_used(stream->data_in);
4601
4602    return size;
4603}
4604
4605
4606const lsquic_cid_t *
4607lsquic_stream_cid (const struct lsquic_stream *stream)
4608{
4609    return LSQUIC_LOG_CONN_ID;
4610}
4611
4612
4613void
4614lsquic_stream_dump_state (const struct lsquic_stream *stream)
4615{
4616    LSQ_DEBUG("flags: %X; read off: %"PRIu64, stream->stream_flags,
4617                                                    stream->read_offset);
4618    stream->data_in->di_if->di_dump_state(stream->data_in);
4619}
4620
4621
4622void *
4623lsquic_stream_get_hset (struct lsquic_stream *stream)
4624{
4625    void *hset;
4626
4627    if (stream_is_read_reset(stream))
4628    {
4629        LSQ_INFO("%s: stream is reset, no headers returned", __func__);
4630        errno = ECONNRESET;
4631        return NULL;
4632    }
4633
4634    if (!((stream->sm_bflags & SMBF_USE_HEADERS)
4635                                && (stream->stream_flags & STREAM_HAVE_UH)))
4636    {
4637        LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__,
4638                                                        stream->stream_flags);
4639        return NULL;
4640    }
4641
4642    if (!stream->uh)
4643    {
4644        LSQ_INFO("%s: headers unavailable (already fetched?)", __func__);
4645        return NULL;
4646    }
4647
4648    hset = stream->uh->uh_hset;
4649    stream->uh->uh_hset = NULL;
4650    destroy_uh(stream);
4651    if (stream->stream_flags & STREAM_HEAD_IN_FIN)
4652    {
4653        stream->stream_flags |= STREAM_FIN_REACHED;
4654        SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
4655    }
4656    maybe_update_last_progress(stream);
4657    LSQ_DEBUG("return header set");
4658    return hset;
4659}
4660
4661
4662void
4663lsquic_stream_set_stream_if (struct lsquic_stream *stream,
4664           const struct lsquic_stream_if *stream_if, void *stream_if_ctx)
4665{
4666    SM_HISTORY_APPEND(stream, SHE_IF_SWITCH);
4667    stream->stream_if    = stream_if;
4668    stream->sm_onnew_arg = stream_if_ctx;
4669    LSQ_DEBUG("switched interface");
4670    assert(stream->stream_flags & STREAM_ONNEW_DONE);
4671    stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
4672                                                      stream);
4673}
4674
4675
4676static int
4677update_type_hist_and_check (const struct lsquic_stream *stream,
4678                                                    struct hq_filter *filter)
4679{
4680    /* 3-bit codes: */
4681    enum {
4682        CODE_UNSET,
4683        CODE_HEADER,    /* H    Header  */
4684        CODE_DATA,      /* D    Data    */
4685        CODE_PLUS,      /* +    Plus: meaning previous frame repeats */
4686    };
4687    static const unsigned valid_seqs[] = {
4688        /* Ordered by expected frequency */
4689        0123,   /* HD+  */
4690        012,    /* HD   */
4691        01,     /* H    */
4692        013,    /* H+   */ /* Really HH, but we don't record it like this */
4693        01231,  /* HD+H */
4694        0121,   /* HDH  */
4695    };
4696    unsigned code, i;
4697
4698    switch (filter->hqfi_type)
4699    {
4700    case HQFT_HEADERS:
4701        code = CODE_HEADER;
4702        break;
4703    case HQFT_DATA:
4704        code = CODE_DATA;
4705        break;
4706    case HQFT_PUSH_PROMISE:
4707        /* [draft-ietf-quic-http-24], Section 7 */
4708        if ((stream->id & SIT_MASK) == SIT_BIDI_CLIENT
4709                                    && !(stream->sm_bflags & SMBF_SERVER))
4710            return 0;
4711        else
4712            return -1;
4713    case HQFT_CANCEL_PUSH:
4714    case HQFT_SETTINGS:
4715    case HQFT_GOAWAY:
4716    case HQFT_MAX_PUSH_ID:
4717        /* [draft-ietf-quic-http-24], Section 7 */
4718        return -1;
4719    case 2: /* HTTP/2 PRIORITY */
4720    case 6: /* HTTP/2 PING */
4721    case 8: /* HTTP/2 WINDOW_UPDATE */
4722    case 9: /* HTTP/2 CONTINUATION */
4723        /* [draft-ietf-quic-http-30], Section 7.2.8 */
4724        return -1;
4725    case HQFT_PRIORITY_UPDATE_STREAM:
4726    case HQFT_PRIORITY_UPDATE_PUSH:
4727        if (stream->sm_bflags & SMBF_HTTP_PRIO)
4728            /* If we know about Extensible HTTP Priorities, we should check
4729             * that they do not arrive on any but the control stream:
4730             */
4731            return -1;
4732        else
4733            /* On the other hand, if we do not support Priorities, treat it
4734             * as an unknown frame:
4735             */
4736            return 0;
4737    default:
4738        /* Ignore unknown frames */
4739        return 0;
4740    }
4741
4742    if (filter->hqfi_hist_idx >= MAX_HQFI_ENTRIES)
4743        return -1;
4744
4745    if (filter->hqfi_hist_idx && (filter->hqfi_hist_buf & 7) == code)
4746    {
4747        filter->hqfi_hist_buf <<= 3;
4748        filter->hqfi_hist_buf |= CODE_PLUS;
4749        filter->hqfi_hist_idx++;
4750    }
4751    else if (filter->hqfi_hist_idx > 1
4752            && ((filter->hqfi_hist_buf >> 3) & 7) == code
4753            && (filter->hqfi_hist_buf & 7) == CODE_PLUS)
4754        /* Keep it at plus, do nothing */;
4755    else
4756    {
4757        filter->hqfi_hist_buf <<= 3;
4758        filter->hqfi_hist_buf |= code;
4759        filter->hqfi_hist_idx++;
4760    }
4761
4762    for (i = 0; i < sizeof(valid_seqs) / sizeof(valid_seqs[0]); ++i)
4763        if (filter->hqfi_hist_buf == valid_seqs[i])
4764            return 0;
4765
4766    return -1;
4767}
4768
4769
4770int
4771lsquic_stream_header_is_pp (const struct lsquic_stream *stream)
4772{
4773    return stream->sm_hq_filter.hqfi_type == HQFT_PUSH_PROMISE;
4774}
4775
4776
4777int
4778lsquic_stream_header_is_trailer (const struct lsquic_stream *stream)
4779{
4780    return (stream->stream_flags & STREAM_HAVE_UH)
4781        && stream->sm_hq_filter.hqfi_type == HQFT_HEADERS;
4782}
4783
4784
4785static void
4786verify_cl_on_new_data_frame (struct lsquic_stream *stream,
4787                                                    struct hq_filter *filter)
4788{
4789    struct lsquic_conn *lconn;
4790
4791    stream->sm_data_in += filter->hqfi_left;
4792    if (stream->sm_data_in > stream->sm_cont_len)
4793    {
4794        lconn = stream->conn_pub->lconn;
4795        lconn->cn_if->ci_abort_error(lconn, 1, HEC_GENERAL_PROTOCOL_ERROR,
4796            "number of bytes in DATA frames of stream %"PRIu64" exceeds "
4797            "content-length limit of %llu", stream->id, stream->sm_cont_len);
4798    }
4799}
4800
4801
4802static size_t
4803hq_read (void *ctx, const unsigned char *buf, size_t sz, int fin)
4804{
4805    struct lsquic_stream *const stream = ctx;
4806    struct hq_filter *const filter = &stream->sm_hq_filter;
4807    const unsigned char *p = buf, *prev;
4808    const unsigned char *const end = buf + sz;
4809    struct lsquic_conn *lconn;
4810    enum lsqpack_read_header_status rhs;
4811    int s;
4812
4813    while (p < end)
4814    {
4815        switch (filter->hqfi_state)
4816        {
4817        case HQFI_STATE_FRAME_HEADER_BEGIN:
4818            filter->hqfi_vint2_state.vr2s_state = 0;
4819            filter->hqfi_state = HQFI_STATE_FRAME_HEADER_CONTINUE;
4820            /* fall-through */
4821        case HQFI_STATE_FRAME_HEADER_CONTINUE:
4822            s = lsquic_varint_read_two(&p, end, &filter->hqfi_vint2_state);
4823            if (s < 0)
4824                break;
4825            filter->hqfi_flags |= HQFI_FLAG_BEGIN;
4826            filter->hqfi_state = HQFI_STATE_READING_PAYLOAD;
4827            LSQ_DEBUG("HQ frame type 0x%"PRIX64" at offset %"PRIu64", size %"PRIu64,
4828                filter->hqfi_type, stream->read_offset + (unsigned) (p - buf),
4829                filter->hqfi_left);
4830            if (0 != update_type_hist_and_check(stream, filter))
4831            {
4832                lconn = stream->conn_pub->lconn;
4833                filter->hqfi_flags |= HQFI_FLAG_ERROR;
4834                LSQ_INFO("unexpected HTTP/3 frame sequence: %o",
4835                    filter->hqfi_hist_buf);
4836                lconn->cn_if->ci_abort_error(lconn, 1, HEC_FRAME_UNEXPECTED,
4837                    "unexpected HTTP/3 frame sequence on stream %"PRIu64,
4838                    stream->id);
4839                goto end;
4840            }
4841            if (filter->hqfi_left > 0)
4842            {
4843                if (filter->hqfi_type == HQFT_DATA)
4844                {
4845                    if (stream->sm_bflags & SMBF_VERIFY_CL)
4846                        verify_cl_on_new_data_frame(stream, filter);
4847                    goto end;
4848                }
4849                else if (filter->hqfi_type == HQFT_PUSH_PROMISE)
4850                {
4851                    if (stream->sm_bflags & SMBF_SERVER)
4852                    {
4853                        lconn = stream->conn_pub->lconn;
4854                        lconn->cn_if->ci_abort_error(lconn, 1,
4855                            HEC_FRAME_UNEXPECTED, "Received PUSH_PROMISE frame "
4856                            "on stream %"PRIu64" (clients are not supposed to "
4857                            "send those)", stream->id);
4858                        goto end;
4859                    }
4860                    else
4861                        filter->hqfi_state = HQFI_STATE_PUSH_ID_BEGIN;
4862                }
4863            }
4864            else
4865            {
4866                switch (filter->hqfi_type)
4867                {
4868                case HQFT_CANCEL_PUSH:
4869                case HQFT_GOAWAY:
4870                case HQFT_HEADERS:
4871                case HQFT_MAX_PUSH_ID:
4872                case HQFT_PUSH_PROMISE:
4873                case HQFT_SETTINGS:
4874                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
4875                    LSQ_INFO("HQ frame of type %"PRIu64" cannot be size 0",
4876                                                            filter->hqfi_type);
4877                    abort_connection(stream);   /* XXX Overkill? */
4878                    goto end;
4879                default:
4880                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
4881                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4882                    break;
4883                }
4884            }
4885            break;
4886        case HQFI_STATE_PUSH_ID_BEGIN:
4887            filter->hqfi_vint1_state.pos = 0;
4888            filter->hqfi_state = HQFI_STATE_PUSH_ID_CONTINUE;
4889            /* Fall-through */
4890        case HQFI_STATE_PUSH_ID_CONTINUE:
4891            prev = p;
4892            s = lsquic_varint_read_nb(&p, end, &filter->hqfi_vint1_state);
4893            filter->hqfi_left -= p - prev;
4894            if (s == 0)
4895                filter->hqfi_state = HQFI_STATE_READING_PAYLOAD;
4896                /* A bit of a white lie here */
4897            break;
4898        case HQFI_STATE_READING_PAYLOAD:
4899            if (filter->hqfi_type == HQFT_DATA)
4900                goto end;
4901            sz = filter->hqfi_left;
4902            if (sz > (uintptr_t) (end - p))
4903                sz = (uintptr_t) (end - p);
4904            switch (filter->hqfi_type)
4905            {
4906            case HQFT_HEADERS:
4907            case HQFT_PUSH_PROMISE:
4908                prev = p;
4909                if (filter->hqfi_flags & HQFI_FLAG_BEGIN)
4910                {
4911                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
4912                    rhs = lsquic_qdh_header_in_begin(
4913                                stream->conn_pub->u.ietf.qdh,
4914                                stream, filter->hqfi_left, &p, sz);
4915                }
4916                else
4917                    rhs = lsquic_qdh_header_in_continue(
4918                                stream->conn_pub->u.ietf.qdh, stream, &p, sz);
4919                assert(p > prev || LQRHS_ERROR == rhs);
4920                filter->hqfi_left -= p - prev;
4921                if (filter->hqfi_left == 0)
4922                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4923                switch (rhs)
4924                {
4925                case LQRHS_DONE:
4926                    assert(filter->hqfi_left == 0);
4927                    stream->sm_qflags &= ~SMQF_QPACK_DEC;
4928                    break;
4929                case LQRHS_NEED:
4930                    stream->sm_qflags |= SMQF_QPACK_DEC;
4931                    break;
4932                case LQRHS_BLOCKED:
4933                    stream->sm_qflags |= SMQF_QPACK_DEC;
4934                    filter->hqfi_flags |= HQFI_FLAG_BLOCKED;
4935                    goto end;
4936                default:
4937                    assert(LQRHS_ERROR == rhs);
4938                    stream->sm_qflags &= ~SMQF_QPACK_DEC;
4939                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
4940                    LSQ_INFO("error processing header block");
4941                    abort_connection(stream);   /* XXX Overkill? */
4942                    goto end;
4943                }
4944                break;
4945            default:
4946                /* Simply skip unknown frame type payload for now */
4947                filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
4948                p += sz;
4949                filter->hqfi_left -= sz;
4950                if (filter->hqfi_left == 0)
4951                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4952                break;
4953            }
4954            break;
4955        default:
4956            assert(0);
4957            goto end;
4958        }
4959    }
4960
4961  end:
4962    if (fin && p == end && filter->hqfi_state != HQFI_STATE_FRAME_HEADER_BEGIN)
4963    {
4964        LSQ_INFO("FIN at unexpected place in filter; state: %u",
4965                                                        filter->hqfi_state);
4966        filter->hqfi_flags |= HQFI_FLAG_ERROR;
4967/* From [draft-ietf-quic-http-28] Section 7.1:
4968 " When a stream terminates cleanly, if the last frame on the stream was
4969 " truncated, this MUST be treated as a connection error (Section 8) of
4970 " type H3_FRAME_ERROR.  Streams which terminate abruptly may be reset
4971 " at any point in a frame.
4972 */
4973        lconn = stream->conn_pub->lconn;
4974        lconn->cn_if->ci_abort_error(lconn, 1, HEC_FRAME_ERROR,
4975            "last HTTP/3 frame on stream %"PRIu64" was truncated", stream->id);
4976    }
4977
4978    return p - buf;
4979}
4980
4981
4982static int
4983hq_filter_readable_now (const struct lsquic_stream *stream)
4984{
4985    const struct hq_filter *const filter = &stream->sm_hq_filter;
4986
4987    return (filter->hqfi_type == HQFT_DATA
4988                    && filter->hqfi_state == HQFI_STATE_READING_PAYLOAD)
4989        || (filter->hqfi_flags & HQFI_FLAG_ERROR)
4990        || stream->uh
4991        || (stream->stream_flags & STREAM_FIN_REACHED)
4992    ;
4993}
4994
4995
4996static int
4997hq_filter_readable (struct lsquic_stream *stream)
4998{
4999    struct hq_filter *const filter = &stream->sm_hq_filter;
5000    ssize_t nread;
5001
5002    if (filter->hqfi_flags & HQFI_FLAG_BLOCKED)
5003        return 0;
5004
5005    if (!hq_filter_readable_now(stream))
5006    {
5007        nread = read_data_frames(stream, 0, hq_read, stream);
5008        if (nread <= 0)
5009        {
5010            if (nread < 0)
5011            {
5012                filter->hqfi_flags |= HQFI_FLAG_ERROR;
5013                abort_connection(stream);   /* XXX Overkill? */
5014                return 1;   /* Collect error */
5015            }
5016            return 0;
5017        }
5018    }
5019
5020    return hq_filter_readable_now(stream);
5021}
5022
5023
5024static size_t
5025hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame)
5026{
5027    struct hq_filter *const filter = &stream->sm_hq_filter;
5028    size_t nr;
5029
5030    if (!(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
5031                                            && filter->hqfi_type == HQFT_DATA))
5032    {
5033        nr = hq_read(stream, data_frame->df_data + data_frame->df_read_off,
5034                            data_frame->df_size - data_frame->df_read_off,
5035                            data_frame->df_fin);
5036        if (nr)
5037        {
5038            stream->read_offset += nr;
5039            stream_consumed_bytes(stream);
5040        }
5041    }
5042    else
5043        nr = 0;
5044
5045    if (0 == (filter->hqfi_flags & HQFI_FLAG_ERROR))
5046    {
5047        data_frame->df_read_off += nr;
5048        if (filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
5049                                        && filter->hqfi_type == HQFT_DATA)
5050            return MIN(filter->hqfi_left,
5051                    (unsigned) data_frame->df_size - data_frame->df_read_off);
5052        else
5053        {
5054            assert(data_frame->df_read_off == data_frame->df_size);
5055            return 0;
5056        }
5057    }
5058    else
5059    {
5060        data_frame->df_read_off = data_frame->df_size;
5061        return 0;
5062    }
5063}
5064
5065
5066static void
5067hq_decr_left (struct lsquic_stream *stream, size_t read)
5068{
5069    struct hq_filter *const filter = &stream->sm_hq_filter;
5070
5071    if (read)
5072    {
5073        assert(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
5074                                            && filter->hqfi_type == HQFT_DATA);
5075        assert(read <= filter->hqfi_left);
5076    }
5077
5078    filter->hqfi_left -= read;
5079    if (0 == filter->hqfi_left)
5080        filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
5081}
5082
5083
5084/* These are IETF QUIC states */
5085enum stream_state_sending
5086lsquic_stream_sending_state (const struct lsquic_stream *stream)
5087{
5088    if (0 == (stream->stream_flags & STREAM_RST_SENT))
5089    {
5090        if (stream->stream_flags & STREAM_FIN_SENT)
5091        {
5092            if (stream->n_unacked)
5093                return SSS_DATA_SENT;
5094            else
5095                return SSS_DATA_RECVD;
5096        }
5097        else
5098        {
5099            if (stream->tosend_off
5100                            || (stream->stream_flags & STREAM_BLOCKED_SENT))
5101                return SSS_SEND;
5102            else
5103                return SSS_READY;
5104        }
5105    }
5106    else if (stream->stream_flags & STREAM_RST_ACKED)
5107        return SSS_RESET_RECVD;
5108    else
5109        return SSS_RESET_SENT;
5110}
5111
5112
/* Printable names of the stream sending states, indexed by the SSS_*
 * constants (the values returned by lsquic_stream_sending_state()).
 */
const char *const lsquic_sss2str[] =
{
    [SSS_READY]        =  "Ready",
    [SSS_SEND]         =  "Send",
    [SSS_DATA_SENT]    =  "Data Sent",
    [SSS_RESET_SENT]   =  "Reset Sent",
    [SSS_DATA_RECVD]   =  "Data Recvd",
    [SSS_RESET_RECVD]  =  "Reset Recvd",
};
5122
5123
/* Printable names of the stream receiving states, indexed by the SSR_*
 * constants (the values returned by lsquic_stream_receiving_state()).
 */
const char *const lsquic_ssr2str[] =
{
    [SSR_RECV]         =  "Recv",
    [SSR_SIZE_KNOWN]   =  "Size Known",
    [SSR_DATA_RECVD]   =  "Data Recvd",
    [SSR_RESET_RECVD]  =  "Reset Recvd",
    [SSR_DATA_READ]    =  "Data Read",
    [SSR_RESET_READ]   =  "Reset Read",
};
5133
5134
5135/* These are IETF QUIC states */
5136enum stream_state_receiving
5137lsquic_stream_receiving_state (struct lsquic_stream *stream)
5138{
5139    uint64_t n_bytes;
5140
5141    if (0 == (stream->stream_flags & STREAM_RST_RECVD))
5142    {
5143        if (0 == (stream->stream_flags & STREAM_FIN_RECVD))
5144            return SSR_RECV;
5145        if (stream->stream_flags & STREAM_FIN_REACHED)
5146            return SSR_DATA_READ;
5147        if (0 == (stream->stream_flags & STREAM_DATA_RECVD))
5148        {
5149            n_bytes = stream->data_in->di_if->di_readable_bytes(
5150                                    stream->data_in, stream->read_offset);
5151            if (stream->read_offset + n_bytes == stream->sm_fin_off)
5152            {
5153                stream->stream_flags |= STREAM_DATA_RECVD;
5154                return SSR_DATA_RECVD;
5155            }
5156            else
5157                return SSR_SIZE_KNOWN;
5158        }
5159        else
5160            return SSR_DATA_RECVD;
5161    }
5162    else if (stream->stream_flags & STREAM_RST_READ)
5163        return SSR_RESET_READ;
5164    else
5165        return SSR_RESET_RECVD;
5166}
5167
5168
5169void
5170lsquic_stream_qdec_unblocked (struct lsquic_stream *stream)
5171{
5172    struct hq_filter *const filter = &stream->sm_hq_filter;
5173
5174    assert(stream->sm_qflags & SMQF_QPACK_DEC);
5175    assert(filter->hqfi_flags & HQFI_FLAG_BLOCKED);
5176
5177    filter->hqfi_flags &= ~HQFI_FLAG_BLOCKED;
5178    stream->conn_pub->cp_flags |= CP_STREAM_UNBLOCKED;
5179    LSQ_DEBUG("QPACK decoder unblocked");
5180}
5181
5182
5183int
5184lsquic_stream_is_rejected (const struct lsquic_stream *stream)
5185{
5186    return stream->stream_flags & STREAM_SS_RECVD;
5187}
5188
5189
5190int
5191lsquic_stream_can_push (const struct lsquic_stream *stream)
5192{
5193    if (lsquic_stream_is_pushed(stream))
5194        return 0;
5195    else if (stream->sm_bflags & SMBF_IETF)
5196        return (stream->sm_bflags & SMBF_USE_HEADERS)
5197            && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_NOPUSH))
5198            && stream->sm_send_headers_state == SSHS_BEGIN
5199            ;
5200    else
5201        return 1;
5202}
5203
5204
5205static size_t
5206pp_reader_read (void *lsqr_ctx, void *buf, size_t count)
5207{
5208    struct push_promise *const promise = lsqr_ctx;
5209    unsigned char *dst = buf;
5210    unsigned char *const end = dst + count;
5211    size_t len;
5212
5213    while (dst < end)
5214    {
5215        switch (promise->pp_write_state)
5216        {
5217        case PPWS_ID0:
5218        case PPWS_ID1:
5219        case PPWS_ID2:
5220        case PPWS_ID3:
5221        case PPWS_ID4:
5222        case PPWS_ID5:
5223        case PPWS_ID6:
5224        case PPWS_ID7:
5225            *dst++ = promise->pp_encoded_push_id[promise->pp_write_state];
5226            ++promise->pp_write_state;
5227            break;
5228        case PPWS_PFX0:
5229            *dst++ = 0;
5230            ++promise->pp_write_state;
5231            break;
5232        case PPWS_PFX1:
5233            *dst++ = 0;
5234            ++promise->pp_write_state;
5235            break;
5236        case PPWS_HBLOCK:
5237            len = MIN(promise->pp_content_len - promise->pp_write_off,
5238                        (size_t) (end - dst));
5239            memcpy(dst, promise->pp_content_buf + promise->pp_write_off,
5240                                                                        len);
5241            promise->pp_write_off += len;
5242            dst += len;
5243            if (promise->pp_content_len == promise->pp_write_off)
5244            {
5245                LSQ_LOG1(LSQ_LOG_DEBUG, "finish writing push promise %"PRIu64
5246                    ": reset push state", promise->pp_id);
5247                promise->pp_write_state = PPWS_DONE;
5248            }
5249            goto end;
5250        default:
5251            goto end;
5252        }
5253    }
5254
5255  end:
5256    return dst - (unsigned char *) buf;
5257}
5258
5259
5260static size_t
5261pp_reader_size (void *lsqr_ctx)
5262{
5263    struct push_promise *const promise = lsqr_ctx;
5264    size_t size;
5265
5266    size = 0;
5267    switch (promise->pp_write_state)
5268    {
5269    case PPWS_ID0:
5270    case PPWS_ID1:
5271    case PPWS_ID2:
5272    case PPWS_ID3:
5273    case PPWS_ID4:
5274    case PPWS_ID5:
5275    case PPWS_ID6:
5276    case PPWS_ID7:
5277        size += 8 - promise->pp_write_state;
5278        /* fall-through */
5279    case PPWS_PFX0:
5280        ++size;
5281        /* fall-through */
5282    case PPWS_PFX1:
5283        ++size;
5284        /* fall-through */
5285    case PPWS_HBLOCK:
5286        size += promise->pp_content_len - promise->pp_write_off;
5287        break;
5288    default:
5289        break;
5290    }
5291
5292    return size;
5293}
5294
5295
5296static void
5297init_pp_reader (struct push_promise *promise, struct lsquic_reader *reader)
5298{
5299    reader->lsqr_read = pp_reader_read;
5300    reader->lsqr_size = pp_reader_size;
5301    reader->lsqr_ctx = promise;
5302}
5303
5304
5305static void
5306on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
5307{
5308    struct lsquic_reader pp_reader;
5309    struct push_promise *promise;
5310    ssize_t nw;
5311    int want_write;
5312
5313    assert(stream_is_pushing_promise(stream));
5314
5315    promise = SLIST_FIRST(&stream->sm_promises);
5316    init_pp_reader(promise, &pp_reader);
5317    nw = stream_write(stream, &pp_reader, SWO_BUFFER);
5318    if (nw > 0)
5319    {
5320        LSQ_DEBUG("wrote %zd bytes more of push promise (%s)",
5321            nw, promise->pp_write_state == PPWS_DONE ? "done" : "not done");
5322        if (promise->pp_write_state == PPWS_DONE)
5323        {
5324            /* Restore want_write flag */
5325            want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
5326            if (want_write != stream->sm_saved_want_write)
5327                (void) lsquic_stream_wantwrite(stream,
5328                                                stream->sm_saved_want_write);
5329        }
5330    }
5331    else if (nw < 0)
5332    {
5333        LSQ_WARN("could not write push promise (wrapper)");
5334        /* XXX What should happen if we hit an error? TODO */
5335    }
5336}
5337
5338
5339/* Success means that the push promise has been placed on sm_promises list and
5340 * the stream now owns it.  Failure means that the push promise should be
5341 * destroyed by the caller.
5342 *
5343 * A push promise is written immediately.  If it cannot be written to packets
5344 * or buffered whole, the stream is marked as unable to push further promises.
5345 */
5346int
5347lsquic_stream_push_promise (struct lsquic_stream *stream,
5348                                                struct push_promise *promise)
5349{
5350    struct lsquic_reader pp_reader;
5351    unsigned bits, len;
5352    ssize_t nw;
5353
5354    assert(stream->sm_bflags & SMBF_IETF);
5355    assert(lsquic_stream_can_push(stream));
5356
5357    bits = vint_val2bits(promise->pp_id);
5358    len = 1 << bits;
5359    promise->pp_write_state = 8 - len;
5360    vint_write(promise->pp_encoded_push_id + 8 - len, promise->pp_id,
5361                                                            bits, 1 << bits);
5362
5363    if (!stream_activate_hq_frame(stream,
5364                stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PROMISE,
5365                SHF_FIXED_SIZE, pp_reader_size(promise)))
5366        return -1;
5367
5368    stream->stream_flags |= STREAM_PUSHING;
5369
5370    init_pp_reader(promise, &pp_reader);
5371    nw = stream_write(stream, &pp_reader, SWO_BUFFER);
5372    if (nw > 0)
5373    {
5374        SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next);
5375        ++promise->pp_refcnt;
5376        if (promise->pp_write_state == PPWS_DONE)
5377            LSQ_DEBUG("fully wrote promise %"PRIu64, promise->pp_id);
5378        else
5379        {
5380            LSQ_DEBUG("partially wrote promise %"PRIu64" (state: %d, off: %u)"
5381                ", disable further pushing", promise->pp_id,
5382                promise->pp_write_state, promise->pp_write_off);
5383            stream->stream_flags |= STREAM_NOPUSH;
5384            stream->sm_saved_want_write =
5385                                    !!(stream->sm_qflags & SMQF_WANT_WRITE);
5386            stream_wantwrite(stream, 1);
5387        }
5388        return 0;
5389    }
5390    else
5391    {
5392        if (nw < 0)
5393            LSQ_WARN("failure writing push promise");
5394        stream->stream_flags |= STREAM_NOPUSH;
5395        stream->stream_flags &= ~STREAM_PUSHING;
5396        return -1;
5397    }
5398}
5399
5400
5401int
5402lsquic_stream_verify_len (struct lsquic_stream *stream,
5403                                                unsigned long long cont_len)
5404{
5405    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
5406                                            == (SMBF_IETF|SMBF_USE_HEADERS))
5407    {
5408        stream->sm_cont_len = cont_len;
5409        stream->sm_bflags |= SMBF_VERIFY_CL;
5410        LSQ_DEBUG("will verify that incoming DATA frames have %llu bytes",
5411            cont_len);
5412        return 0;
5413    }
5414    else
5415        return -1;
5416}
5417
5418
5419int
5420lsquic_stream_get_http_prio (struct lsquic_stream *stream,
5421                                        struct lsquic_ext_http_prio *ehp)
5422{
5423    if (stream->sm_bflags & SMBF_HTTP_PRIO)
5424    {
5425        ehp->urgency = MIN(stream->sm_priority, LSQUIC_MAX_HTTP_URGENCY);
5426        ehp->incremental = !!(stream->sm_bflags & SMBF_INCREMENTAL);
5427        return 0;
5428    }
5429    else
5430        return -1;
5431}
5432
5433
5434int
5435lsquic_stream_set_http_prio (struct lsquic_stream *stream,
5436                                        const struct lsquic_ext_http_prio *ehp)
5437{
5438    if (stream->sm_bflags & SMBF_HTTP_PRIO)
5439    {
5440        if (ehp->urgency > LSQUIC_MAX_HTTP_URGENCY)
5441        {
5442            LSQ_INFO("%s: invalid urgency: %hhu", __func__, ehp->urgency);
5443            return -1;
5444        }
5445        stream->sm_priority = ehp->urgency;
5446        if (ehp->incremental)
5447            stream->sm_bflags |= SMBF_INCREMENTAL;
5448        else
5449            stream->sm_bflags &= ~SMBF_INCREMENTAL;
5450        stream->sm_bflags |= SMBF_HPRIO_SET;
5451        LSQ_DEBUG("set urgency to %hhu, incremental to %hhd", ehp->urgency,
5452                                                            ehp->incremental);
5453        if (!(stream->sm_bflags & SMBF_SERVER))
5454            return send_priority_ietf(stream);
5455        else
5456            return 0;
5457    }
5458    else
5459        return -1;
5460}
5461
5462
5463int
5464lsquic_stream_has_unacked_data (struct lsquic_stream *stream)
5465{
5466    return stream->n_unacked > 0 || stream->sm_n_buffered > 0;
5467}
5468