lsquic_stream.c revision ad08470c
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "lsquic.h"
30
31#include "lsquic_int_types.h"
32#include "lsquic_packet_common.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_malo.h"
35#include "lsquic_conn_flow.h"
36#include "lsquic_rtt.h"
37#include "lsquic_sfcw.h"
38#include "lsquic_varint.h"
39#include "lsquic_hq.h"
40#include "lsquic_hash.h"
41#include "lsquic_stream.h"
42#include "lsquic_conn_public.h"
43#include "lsquic_util.h"
44#include "lsquic_mm.h"
45#include "lsquic_headers_stream.h"
46#include "lsquic_conn.h"
47#include "lsquic_data_in_if.h"
48#include "lsquic_parse.h"
49#include "lsquic_packet_out.h"
50#include "lsquic_engine_public.h"
51#include "lsquic_senhist.h"
52#include "lsquic_pacer.h"
53#include "lsquic_cubic.h"
54#include "lsquic_bw_sampler.h"
55#include "lsquic_minmax.h"
56#include "lsquic_bbr.h"
57#include "lsquic_send_ctl.h"
58#include "lsquic_headers.h"
59#include "lsquic_ev_log.h"
60#include "lsquic_enc_sess.h"
61#include "lsqpack.h"
62#include "lsquic_frab_list.h"
63#include "lsquic_http1x_if.h"
64#include "lsquic_qdec_hdl.h"
65#include "lsquic_qenc_hdl.h"
66#include "lsquic_byteswap.h"
67#include "lsquic_ietf.h"
68#include "lsquic_push_promise.h"
69
70#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
71#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(stream->conn_pub->lconn)
72#define LSQUIC_LOG_STREAM_ID stream->id
73#include "lsquic_logger.h"
74
75#define MIN(a, b) ((a) < (b) ? (a) : (b))
76
77static void
78drop_frames_in (lsquic_stream_t *stream);
79
80static void
81maybe_schedule_call_on_close (lsquic_stream_t *stream);
82
83static int
84stream_wantread (lsquic_stream_t *stream, int is_want);
85
86static int
87stream_wantwrite (lsquic_stream_t *stream, int is_want);
88
89static ssize_t
90stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);
91
92static ssize_t
93save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);
94
95static int
96stream_flush (lsquic_stream_t *stream);
97
98static int
99stream_flush_nocheck (lsquic_stream_t *stream);
100
101static void
102maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag);
103
104enum swtp_status { SWTP_OK, SWTP_STOP, SWTP_ERROR };
105
106static enum swtp_status
107stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size);
108
109static enum swtp_status
110stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size);
111
112static enum swtp_status
113stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size);
114
115static size_t
116stream_write_avail_no_frames (struct lsquic_stream *);
117
118static size_t
119stream_write_avail_with_frames (struct lsquic_stream *);
120
121static size_t
122stream_write_avail_with_headers (struct lsquic_stream *);
123
124static int
125hq_filter_readable (struct lsquic_stream *stream);
126
127static void
128hq_decr_left (struct lsquic_stream *stream, size_t);
129
130static size_t
131hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame);
132
133static int
134stream_readable_non_http (struct lsquic_stream *stream);
135
136static int
137stream_readable_http_gquic (struct lsquic_stream *stream);
138
139static int
140stream_readable_http_ietf (struct lsquic_stream *stream);
141
142static ssize_t
143stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz);
144
145static size_t
146active_hq_frame_sizes (const struct lsquic_stream *);
147
148static void
149on_write_dp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *);
150
151static void
152on_write_pp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *);
153
154static void
155stream_hq_frame_put (struct lsquic_stream *, struct stream_hq_frame *);
156
157static size_t
158stream_hq_frame_size (const struct stream_hq_frame *);
159
160const struct stream_filter_if hq_stream_filter_if =
161{
162    .sfi_readable   = hq_filter_readable,
163    .sfi_filter_df  = hq_filter_df,
164    .sfi_decr_left  = hq_decr_left,
165};
166
167
168#if LSQUIC_KEEP_STREAM_HISTORY
169/* These values are printable ASCII characters for ease of printing the
170 * whole history in a single line of a log message.
171 *
172 * The list of events is not exhaustive: only most interesting events
173 * are recorded.
174 */
175enum stream_history_event
176{
177    SHE_EMPTY              =  '\0',     /* Special entry.  No init besides memset required */
178    SHE_PLUS               =  '+',      /* Special entry: previous event occured more than once */
179    SHE_REACH_FIN          =  'a',
180    SHE_BLOCKED_OUT        =  'b',
181    SHE_CREATED            =  'C',
182    SHE_FRAME_IN           =  'd',
183    SHE_FRAME_OUT          =  'D',
184    SHE_RESET              =  'e',
185    SHE_WINDOW_UPDATE      =  'E',
186    SHE_FIN_IN             =  'f',
187    SHE_FINISHED           =  'F',
188    SHE_GOAWAY_IN          =  'g',
189    SHE_USER_WRITE_HEADER  =  'h',
190    SHE_HEADERS_IN         =  'H',
191    SHE_IF_SWITCH          =  'i',
192    SHE_ONCLOSE_SCHED      =  'l',
193    SHE_ONCLOSE_CALL       =  'L',
194    SHE_ONNEW              =  'N',
195    SHE_SET_PRIO           =  'p',
196    SHE_USER_READ          =  'r',
197    SHE_SHUTDOWN_READ      =  'R',
198    SHE_RST_IN             =  's',
199    SHE_SS_IN              =  'S',
200    SHE_RST_OUT            =  't',
201    SHE_RST_ACKED          =  'T',
202    SHE_FLUSH              =  'u',
203    SHE_USER_WRITE_DATA    =  'w',
204    SHE_SHUTDOWN_WRITE     =  'W',
205    SHE_CLOSE              =  'X',
206    SHE_DELAY_SW           =  'y',
207    SHE_FORCE_FINISH       =  'Z',
208    SHE_WANTREAD_NO        =  '0',  /* "YES" must be one more than "NO" */
209    SHE_WANTREAD_YES       =  '1',
210    SHE_WANTWRITE_NO       =  '2',
211    SHE_WANTWRITE_YES      =  '3',
212};
213
214static void
215sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
216{
217    enum stream_history_event prev_event;
218    sm_hist_idx_t idx;
219    int plus;
220
221    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
222    plus = SHE_PLUS == stream->sm_hist_buf[idx];
223    idx = (idx - plus) & SM_HIST_IDX_MASK;
224    prev_event = stream->sm_hist_buf[idx];
225
226    if (prev_event == sh_event && plus)
227        return;
228
229    if (prev_event == sh_event)
230        sh_event = SHE_PLUS;
231    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
232
233    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
234        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
235                                                        stream->sm_hist_buf);
236}
237
238
239#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
240#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
241        if (stream->sm_hist_idx & SM_HIST_IDX_MASK)                         \
242            LSQ_DEBUG("history: [%.*s]",                                    \
243                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
244                (stream)->sm_hist_buf);                                     \
245    } while (0)
246#else
247#   define SM_HISTORY_APPEND(stream, event)
248#   define SM_HISTORY_DUMP_REMAINING(stream)
249#endif
250
251
252static int
253stream_inside_callback (const lsquic_stream_t *stream)
254{
255    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
256}
257
258
259static void
260maybe_conn_to_tickable (lsquic_stream_t *stream)
261{
262    if (!stream_inside_callback(stream))
263        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
264                                           stream->conn_pub->lconn);
265}
266
267
268/* Here, "readable" means that the user is able to read from the stream. */
269static void
270maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
271{
272    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
273    {
274        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
275                                           stream->conn_pub->lconn);
276    }
277}
278
279
280/* Here, "writeable" means that data can be put into packets to be
281 * scheduled to be sent out.
282 *
283 * If `check_can_send' is false, it means that we do not need to check
284 * whether packets can be sent.  This check was already performed when
285 * we packetized stream data.
286 */
287static void
288maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
289                                                    int check_can_send)
290{
291    if (!stream_inside_callback(stream) &&
292            (!check_can_send
293             || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
294          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
295    {
296        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
297                                           stream->conn_pub->lconn);
298    }
299}
300
301
302static int
303stream_stalled (const lsquic_stream_t *stream)
304{
305    return 0 == (stream->sm_qflags & (SMQF_WANT_WRITE|SMQF_WANT_READ)) &&
306           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
307                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
308}
309
310
311static size_t
312stream_stream_frame_header_sz (const struct lsquic_stream *stream,
313                                                            unsigned data_sz)
314{
315    return stream->conn_pub->lconn->cn_pf->pf_calc_stream_frame_header_sz(
316                                    stream->id, stream->tosend_off, data_sz);
317}
318
319
320static size_t
321stream_crypto_frame_header_sz (const struct lsquic_stream *stream,
322                                                    unsigned data_sz_IGNORED)
323{
324    return stream->conn_pub->lconn->cn_pf
325         ->pf_calc_crypto_frame_header_sz(stream->tosend_off);
326}
327
328
329/* GQUIC-only function */
330static int
331stream_is_hsk (const struct lsquic_stream *stream)
332{
333    if (stream->sm_bflags & SMBF_IETF)
334        return 0;
335    else
336        return stream->id == LSQUIC_GQUIC_STREAM_HANDSHAKE;
337}
338
339
340static struct lsquic_stream *
341stream_new_common (lsquic_stream_id_t id, struct lsquic_conn_public *conn_pub,
342           const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
343           enum stream_ctor_flags ctor_flags)
344{
345    struct lsquic_stream *stream;
346
347    stream = calloc(1, sizeof(*stream));
348    if (!stream)
349        return NULL;
350
351    if (ctor_flags & SCF_USE_DI_HASH)
352        stream->data_in = data_in_hash_new(conn_pub, id, 0);
353    else
354        stream->data_in = data_in_nocopy_new(conn_pub, id);
355    if (!stream->data_in)
356    {
357        free(stream);
358        return NULL;
359    }
360
361    stream->id        = id;
362    stream->stream_if = stream_if;
363    stream->conn_pub  = conn_pub;
364    stream->sm_onnew_arg = stream_if_ctx;
365    stream->sm_write_avail = stream_write_avail_no_frames;
366
367    STAILQ_INIT(&stream->sm_hq_frames);
368
369    stream->sm_bflags |= ctor_flags & ((1 << (N_SMBF_FLAGS - 1)) - 1);
370    if (conn_pub->lconn->cn_flags & LSCONN_SERVER)
371        stream->sm_bflags |= SMBF_SERVER;
372
373    return stream;
374}
375
376
377/* TODO: The logic to figure out whether the stream is connection limited
378 * should be taken out of the constructor.  The caller should specify this
379 * via one of enum stream_ctor_flags.
380 */
381lsquic_stream_t *
382lsquic_stream_new (lsquic_stream_id_t id,
383        struct lsquic_conn_public *conn_pub,
384        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
385        unsigned initial_window, uint64_t initial_send_off,
386        enum stream_ctor_flags ctor_flags)
387{
388    lsquic_cfcw_t *cfcw;
389    lsquic_stream_t *stream;
390
391    stream = stream_new_common(id, conn_pub, stream_if, stream_if_ctx,
392                                                                ctor_flags);
393    if (!stream)
394        return NULL;
395
396    if (!initial_window)
397        initial_window = 16 * 1024;
398
399    if (ctor_flags & SCF_IETF)
400    {
401        cfcw = &conn_pub->cfcw;
402        stream->sm_bflags |= SMBF_CONN_LIMITED;
403        if (ctor_flags & SCF_HTTP)
404        {
405            stream->sm_write_avail = stream_write_avail_with_headers;
406            stream->sm_readable = stream_readable_http_ietf;
407            stream->sm_sfi = &hq_stream_filter_if;
408        }
409        else
410            stream->sm_readable = stream_readable_non_http;
411        lsquic_stream_set_priority_internal(stream,
412                                            LSQUIC_STREAM_DEFAULT_PRIO);
413        stream->sm_write_to_packet = stream_write_to_packet_std;
414    }
415    else
416    {
417        if (lsquic_stream_id_is_critical(ctor_flags & SCF_HTTP, id))
418            cfcw = NULL;
419        else
420        {
421            cfcw = &conn_pub->cfcw;
422            stream->sm_bflags |= SMBF_CONN_LIMITED;
423            lsquic_stream_set_priority_internal(stream,
424                                                LSQUIC_STREAM_DEFAULT_PRIO);
425        }
426        if (stream->sm_bflags & SMBF_USE_HEADERS)
427            stream->sm_readable = stream_readable_http_gquic;
428        else
429            stream->sm_readable = stream_readable_non_http;
430        if (stream_is_hsk(stream))
431            stream->sm_write_to_packet = stream_write_to_packet_hsk;
432        else
433            stream->sm_write_to_packet = stream_write_to_packet_std;
434    }
435
436    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
437    stream->max_send_off = initial_send_off;
438    LSQ_DEBUG("created stream");
439    SM_HISTORY_APPEND(stream, SHE_CREATED);
440    stream->sm_frame_header_sz = stream_stream_frame_header_sz;
441    if (ctor_flags & SCF_CALL_ON_NEW)
442        lsquic_stream_call_on_new(stream);
443    return stream;
444}
445
446
447struct lsquic_stream *
448lsquic_stream_new_crypto (enum enc_level enc_level,
449        struct lsquic_conn_public *conn_pub,
450        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
451        enum stream_ctor_flags ctor_flags)
452{
453    struct lsquic_stream *stream;
454    lsquic_stream_id_t stream_id;
455
456    assert(ctor_flags & SCF_CRITICAL);
457
458    stream_id = ~0ULL - enc_level;
459    stream = stream_new_common(stream_id, conn_pub, stream_if,
460                                                stream_if_ctx, ctor_flags);
461    if (!stream)
462        return NULL;
463
464    stream->sm_bflags |= SMBF_CRYPTO|SMBF_IETF;
465    stream->sm_enc_level = enc_level;
466    /* TODO: why have limit in crypto stream?  Set it to UINT64_MAX? */
467    lsquic_sfcw_init(&stream->fc, 16 * 1024, NULL, conn_pub, stream_id);
468    stream->max_send_off = 16 * 1024;
469    LSQ_DEBUG("created crypto stream");
470    SM_HISTORY_APPEND(stream, SHE_CREATED);
471    stream->sm_frame_header_sz = stream_crypto_frame_header_sz;
472    stream->sm_write_to_packet = stream_write_to_packet_crypto;
473    stream->sm_readable = stream_readable_non_http;
474    if (ctor_flags & SCF_CALL_ON_NEW)
475        lsquic_stream_call_on_new(stream);
476    return stream;
477}
478
479
480void
481lsquic_stream_call_on_new (lsquic_stream_t *stream)
482{
483    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
484    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
485    {
486        LSQ_DEBUG("calling on_new_stream");
487        SM_HISTORY_APPEND(stream, SHE_ONNEW);
488        stream->stream_flags |= STREAM_ONNEW_DONE;
489        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
490                                                          stream);
491    }
492}
493
494
495static void
496decr_conn_cap (struct lsquic_stream *stream, size_t incr)
497{
498    if (stream->sm_bflags & SMBF_CONN_LIMITED)
499    {
500        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
501        stream->conn_pub->conn_cap.cc_sent -= incr;
502    }
503}
504
505
506static void
507maybe_resize_stream_buffer (struct lsquic_stream *stream)
508{
509    assert(0 == stream->sm_n_buffered);
510
511    if (stream->sm_n_allocated < stream->conn_pub->path->np_pack_size)
512    {
513        free(stream->sm_buf);
514        stream->sm_buf = NULL;
515        stream->sm_n_allocated = 0;
516    }
517    else if (stream->sm_n_allocated > stream->conn_pub->path->np_pack_size)
518        stream->sm_n_allocated = stream->conn_pub->path->np_pack_size;
519}
520
521
522static void
523drop_buffered_data (struct lsquic_stream *stream)
524{
525    decr_conn_cap(stream, stream->sm_n_buffered);
526    stream->sm_n_buffered = 0;
527    maybe_resize_stream_buffer(stream);
528    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
529        maybe_remove_from_write_q(stream, SMQF_WRITE_Q_FLAGS);
530}
531
532
533static void
534destroy_uh (struct lsquic_stream *stream)
535{
536    if (stream->uh)
537    {
538        if (stream->uh->uh_hset)
539            stream->conn_pub->enpub->enp_hsi_if
540                            ->hsi_discard_header_set(stream->uh->uh_hset);
541        free(stream->uh);
542        stream->uh = NULL;
543    }
544}
545
546
547void
548lsquic_stream_destroy (lsquic_stream_t *stream)
549{
550    struct push_promise *promise;
551    struct stream_hq_frame *shf;
552
553    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
554    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
555                                                            STREAM_ONNEW_DONE)
556    {
557        stream->stream_flags |= STREAM_ONCLOSE_DONE;
558        stream->stream_if->on_close(stream, stream->st_ctx);
559    }
560    if (stream->sm_qflags & SMQF_SENDING_FLAGS)
561        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
562    if (stream->sm_qflags & SMQF_WANT_READ)
563        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
564    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
565        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
566    if (stream->sm_qflags & SMQF_SERVICE_FLAGS)
567        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
568    if (stream->sm_qflags & SMQF_QPACK_DEC)
569        lsquic_qdh_unref_stream(stream->conn_pub->u.ietf.qdh, stream);
570    drop_buffered_data(stream);
571    lsquic_sfcw_consume_rem(&stream->fc);
572    drop_frames_in(stream);
573    if (stream->push_req)
574    {
575        if (stream->push_req->uh_hset)
576            stream->conn_pub->enpub->enp_hsi_if
577                            ->hsi_discard_header_set(stream->push_req->uh_hset);
578        free(stream->push_req);
579    }
580    while ((promise = SLIST_FIRST(&stream->sm_promises)))
581    {
582        SLIST_REMOVE_HEAD(&stream->sm_promises, pp_next);
583        lsquic_pp_put(promise, stream->conn_pub->u.ietf.promises);
584    }
585    if (stream->sm_promise)
586    {
587        assert(stream->sm_promise->pp_pushed_stream == stream);
588        stream->sm_promise->pp_pushed_stream = NULL;
589        lsquic_pp_put(stream->sm_promise, stream->conn_pub->u.ietf.promises);
590    }
591    while ((shf = STAILQ_FIRST(&stream->sm_hq_frames)))
592        stream_hq_frame_put(stream, shf);
593    destroy_uh(stream);
594    free(stream->sm_buf);
595    free(stream->sm_header_block);
596    LSQ_DEBUG("destroyed stream");
597    SM_HISTORY_DUMP_REMAINING(stream);
598    free(stream);
599}
600
601
602static int
603stream_is_finished (const lsquic_stream_t *stream)
604{
605    return lsquic_stream_is_closed(stream)
606           /* n_unacked checks that no outgoing packets that reference this
607            * stream are outstanding:
608            */
609        && 0 == stream->n_unacked
610           /* This checks that no packets that reference this stream will
611            * become outstanding:
612            */
613        && 0 == (stream->sm_qflags & SMQF_SEND_RST)
614        && ((stream->stream_flags & STREAM_FORCE_FINISH)
615          || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)));
616}
617
618
619/* This is an internal function */
620void
621lsquic_stream_force_finish (struct lsquic_stream *stream)
622{
623    LSQ_DEBUG("stream is now finished");
624    SM_HISTORY_APPEND(stream, SHE_FINISHED);
625    if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
626        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
627                                                next_service_stream);
628    stream->sm_qflags |= SMQF_FREE_STREAM;
629    stream->stream_flags |= STREAM_FINISHED;
630}
631
632
633static void
634maybe_finish_stream (lsquic_stream_t *stream)
635{
636    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
637                                                    stream_is_finished(stream))
638        lsquic_stream_force_finish(stream);
639}
640
641
642static void
643maybe_schedule_call_on_close (lsquic_stream_t *stream)
644{
645    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
646                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE))
647            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)
648            && !(stream->sm_qflags & SMQF_CALL_ONCLOSE))
649    {
650        if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
651            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
652                                                    next_service_stream);
653        stream->sm_qflags |= SMQF_CALL_ONCLOSE;
654        LSQ_DEBUG("scheduled calling on_close");
655        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
656    }
657}
658
659
660void
661lsquic_stream_call_on_close (lsquic_stream_t *stream)
662{
663    assert(stream->stream_flags & STREAM_ONNEW_DONE);
664    stream->sm_qflags &= ~SMQF_CALL_ONCLOSE;
665    if (!(stream->sm_qflags & SMQF_SERVICE_FLAGS))
666        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
667                                                    next_service_stream);
668    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
669    {
670        LSQ_DEBUG("calling on_close");
671        stream->stream_flags |= STREAM_ONCLOSE_DONE;
672        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
673        stream->stream_if->on_close(stream, stream->st_ctx);
674    }
675    else
676        assert(0);
677}
678
679
680static int
681stream_readable_non_http (struct lsquic_stream *stream)
682{
683    return stream->data_in->di_if->di_get_frame(stream->data_in,
684                                                stream->read_offset) != NULL;
685}
686
687
688static int
689stream_readable_http_gquic (struct lsquic_stream *stream)
690{
691    return (stream->stream_flags & STREAM_HAVE_UH)
692        && (stream->uh
693            ||  stream->data_in->di_if->di_get_frame(stream->data_in,
694                                                    stream->read_offset));
695}
696
697
698static int
699stream_readable_http_ietf (struct lsquic_stream *stream)
700{
701    return
702        /* If we have read the header set and the header set has not yet
703         * been read, the stream is readable.
704         */
705        ((stream->stream_flags & STREAM_HAVE_UH) && stream->uh)
706        ||
707        /* Alternatively, run the filter and check for payload availability. */
708        (stream->sm_sfi->sfi_readable(stream)
709            && (/* Running the filter may result in hitting FIN: */
710                (stream->stream_flags & STREAM_FIN_REACHED)
711                || stream->data_in->di_if->di_get_frame(stream->data_in,
712                                                    stream->read_offset)));
713}
714
715
716int
717lsquic_stream_readable (struct lsquic_stream *stream)
718{
719    /* A stream is readable if one of the following is true: */
720    return
721        /* - It is already finished: in that case, lsquic_stream_read() will
722         *   return 0.
723         */
724            (stream->stream_flags & STREAM_FIN_REACHED)
725        /* - The stream is reset, by either side.  In this case,
726         *   lsquic_stream_read() will return -1 (we want the user to be
727         *   able to collect the error).
728         */
729        ||  lsquic_stream_is_reset(stream)
730        /* Type-dependent readability check: */
731        ||  stream->sm_readable(stream);
732    ;
733}
734
735
736static size_t
737stream_write_avail_no_frames (struct lsquic_stream *stream)
738{
739    uint64_t stream_avail, conn_avail;
740
741    stream_avail = stream->max_send_off - stream->tosend_off
742                                                - stream->sm_n_buffered;
743
744    if (stream->sm_bflags & SMBF_CONN_LIMITED)
745    {
746        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
747        if (conn_avail < stream_avail)
748            stream_avail = conn_avail;
749    }
750
751    return stream_avail;
752}
753
754
755static size_t
756stream_write_avail_with_frames (struct lsquic_stream *stream)
757{
758    uint64_t stream_avail, conn_avail;
759    const struct stream_hq_frame *shf;
760    size_t size;
761
762    stream_avail = stream->max_send_off - stream->tosend_off
763                                                - stream->sm_n_buffered;
764    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
765        if (!(shf->shf_flags & SHF_WRITTEN))
766        {
767            size = stream_hq_frame_size(shf);
768            assert(size <= stream_avail);
769            stream_avail -= size;
770        }
771
772    if (stream->sm_bflags & SMBF_CONN_LIMITED)
773    {
774        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
775        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
776            if (!(shf->shf_flags & SHF_CC_PAID))
777            {
778                size = stream_hq_frame_size(shf);
779                if (size < conn_avail)
780                    conn_avail -= size;
781                else
782                    return 0;
783            }
784        if (conn_avail < stream_avail)
785            stream_avail = conn_avail;
786    }
787
788    if (stream_avail >= 3 /* Smallest new frame */)
789        return stream_avail;
790    else
791        return 0;
792}
793
794
795static int
796stream_is_pushing_promise (const struct lsquic_stream *stream)
797{
798    return (stream->stream_flags & STREAM_PUSHING)
799        && SLIST_FIRST(&stream->sm_promises)
800        && (SLIST_FIRST(&stream->sm_promises))->pp_write_state != PPWS_DONE
801        ;
802}
803
804
805/* To prevent deadlocks, ensure that when headers are sent, the bytes
806 * sent on the encoder stream are written first.
807 *
808 * XXX If the encoder is set up in non-risking mode, it is perfectly
809 * fine to send the header block first.  TODO: update the logic to
810 * reflect this.  There should be two sending behaviors: risk and non-risk.
811 * For now, we assume risk for everything to be on the conservative side.
812 */
813static size_t
814stream_write_avail_with_headers (struct lsquic_stream *stream)
815{
816    if (stream->stream_flags & STREAM_PUSHING)
817        return stream_write_avail_with_frames(stream);
818
819    switch (stream->sm_send_headers_state)
820    {
821    case SSHS_BEGIN:
822        return lsquic_qeh_write_avail(stream->conn_pub->u.ietf.qeh);
823    case SSHS_ENC_SENDING:
824        if (stream->sm_hb_compl >
825                            lsquic_qeh_enc_off(stream->conn_pub->u.ietf.qeh))
826            return 0;
827        LSQ_DEBUG("encoder stream bytes have all been sent");
828        stream->sm_send_headers_state = SSHS_HBLOCK_SENDING;
829        /* fall-through */
830    default:
831        assert(SSHS_HBLOCK_SENDING == stream->sm_send_headers_state);
832        return stream_write_avail_with_frames(stream);
833    }
834}
835
836
837size_t
838lsquic_stream_write_avail (struct lsquic_stream *stream)
839{
840    return stream->sm_write_avail(stream);
841}
842
843
844int
845lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
846{
847    struct lsquic_conn *lconn;
848
849    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
850                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
851    {
852        if (stream->sm_bflags & SMBF_IETF)
853        {
854            lconn = stream->conn_pub->lconn;
855            lconn->cn_if->ci_abort_error(lconn, 0, TEC_FLOW_CONTROL_ERROR,
856                "flow control violation on stream %"PRIu64, stream->id);
857        }
858        return -1;
859    }
860    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
861    {
862        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
863            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
864                                                    next_send_stream);
865        stream->sm_qflags |= SMQF_SEND_WUF;
866    }
867    return 0;
868}
869
870
871int
872lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
873{
874    uint64_t max_off;
875    int got_next_offset, rv, free_frame;
876    enum ins_frame ins_frame;
877    struct lsquic_conn *lconn;
878
879    assert(frame->packet_in);
880
881    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
882    LSQ_DEBUG("received stream frame, offset 0x%"PRIX64", len %u; "
883        "fin: %d", frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
884
885    if ((stream->sm_bflags & SMBF_USE_HEADERS)
886                            && (stream->stream_flags & STREAM_HEAD_IN_FIN))
887    {
888        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
889        lsquic_malo_put(frame);
890        return -1;
891    }
892
893    if (frame->data_frame.df_fin && (stream->sm_bflags & SMBF_IETF)
894            && (stream->stream_flags & STREAM_FIN_RECVD)
895            && stream->sm_fin_off != DF_END(frame))
896    {
897        lconn = stream->conn_pub->lconn;
898        lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR,
899            "new final size %"PRIu64" from STREAM frame (id: %"PRIu64") does "
900            "not match previous final size %"PRIu64, DF_END(frame),
901            stream->id, stream->sm_fin_off);
902        return -1;
903    }
904
905    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
906  insert_frame:
907    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
908    if (INS_FRAME_OK == ins_frame)
909    {
910        /* Update maximum offset in the flow controller and check for flow
911         * control violation:
912         */
913        rv = -1;
914        free_frame = !stream->data_in->di_if->di_own_on_ok;
915        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
916        if (0 != lsquic_stream_update_sfcw(stream, max_off))
917            goto end_ok;
918        if (frame->data_frame.df_fin)
919        {
920            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
921            stream->stream_flags |= STREAM_FIN_RECVD;
922            stream->sm_fin_off = DF_END(frame);
923            maybe_finish_stream(stream);
924        }
925        if ((stream->sm_bflags & SMBF_AUTOSWITCH) &&
926                (stream->data_in->di_flags & DI_SWITCH_IMPL))
927        {
928            stream->data_in = stream->data_in->di_if->di_switch_impl(
929                                        stream->data_in, stream->read_offset);
930            if (!stream->data_in)
931            {
932                stream->data_in = data_in_error_new();
933                goto end_ok;
934            }
935        }
936        if (got_next_offset)
937            /* Checking the offset saves di_get_frame() call */
938            maybe_conn_to_tickable_if_readable(stream);
939        rv = 0;
940  end_ok:
941        if (free_frame)
942            lsquic_malo_put(frame);
943        return rv;
944    }
945    else if (INS_FRAME_DUP == ins_frame)
946    {
947        return 0;
948    }
949    else if (INS_FRAME_OVERLAP == ins_frame)
950    {
951        LSQ_DEBUG("overlap: switching DATA IN implementation");
952        stream->data_in = stream->data_in->di_if->di_switch_impl(
953                                    stream->data_in, stream->read_offset);
954        if (stream->data_in)
955            goto insert_frame;
956        stream->data_in = data_in_error_new();
957        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
958        lsquic_malo_put(frame);
959        return -1;
960    }
961    else
962    {
963        assert(INS_FRAME_ERR == ins_frame);
964        return -1;
965    }
966}
967
968
969static void
970drop_frames_in (lsquic_stream_t *stream)
971{
972    if (stream->data_in)
973    {
974        stream->data_in->di_if->di_destroy(stream->data_in);
975        /* To avoid checking whether `data_in` is set, just set to the error
976         * data-in stream.  It does the right thing after incoming data is
977         * dropped.
978         */
979        stream->data_in = data_in_error_new();
980    }
981}
982
983
984static void
985maybe_elide_stream_frames (struct lsquic_stream *stream)
986{
987    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
988    {
989        if (stream->n_unacked)
990            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
991                                                stream->id);
992        stream->stream_flags |= STREAM_FRAMES_ELIDED;
993    }
994}
995
996
997int
998lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
999                      uint64_t error_code)
1000{
1001    struct lsquic_conn *lconn;
1002
1003    if ((stream->sm_bflags & SMBF_IETF)
1004            && (stream->stream_flags & STREAM_FIN_RECVD)
1005            && stream->sm_fin_off != offset)
1006    {
1007        lconn = stream->conn_pub->lconn;
1008        lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR,
1009            "final size %"PRIu64" from RESET_STREAM frame (id: %"PRIu64") "
1010            "does not match previous final size %"PRIu64, offset,
1011            stream->id, stream->sm_fin_off);
1012        return -1;
1013    }
1014
1015    if (stream->stream_flags & STREAM_RST_RECVD)
1016    {
1017        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
1018        return 0;
1019    }
1020
1021    SM_HISTORY_APPEND(stream, SHE_RST_IN);
1022    /* This flag must always be set, even if we are "ignoring" it: it is
1023     * used by elision code.
1024     */
1025    stream->stream_flags |= STREAM_RST_RECVD;
1026
1027    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
1028    {
1029        LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64" is "
1030            "smaller than that of byte following the last byte we have seen: "
1031            "0x%"PRIX64, offset,
1032            lsquic_sfcw_get_max_recv_off(&stream->fc));
1033        return -1;
1034    }
1035
1036    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
1037    {
1038        LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64
1039            " violates flow control", offset);
1040        return -1;
1041    }
1042
1043    /* Let user collect error: */
1044    maybe_conn_to_tickable_if_readable(stream);
1045
1046    lsquic_sfcw_consume_rem(&stream->fc);
1047    drop_frames_in(stream);
1048    drop_buffered_data(stream);
1049    maybe_elide_stream_frames(stream);
1050
1051    if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT))
1052                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1053        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
1054
1055    stream->stream_flags |= STREAM_RST_RECVD;
1056
1057    maybe_finish_stream(stream);
1058    maybe_schedule_call_on_close(stream);
1059
1060    return 0;
1061}
1062
1063
1064void
1065lsquic_stream_stop_sending_in (struct lsquic_stream *stream,
1066                                                        uint64_t error_code)
1067{
1068    if (stream->stream_flags & STREAM_SS_RECVD)
1069    {
1070        LSQ_DEBUG("ignore duplicate STOP_SENDING frame");
1071        return;
1072    }
1073
1074    SM_HISTORY_APPEND(stream, SHE_SS_IN);
1075    stream->stream_flags |= STREAM_SS_RECVD;
1076
1077    /* Let user collect error: */
1078    maybe_conn_to_tickable_if_readable(stream);
1079
1080    lsquic_sfcw_consume_rem(&stream->fc);
1081    drop_frames_in(stream);
1082    drop_buffered_data(stream);
1083    maybe_elide_stream_frames(stream);
1084
1085    if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT))
1086                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1087        lsquic_stream_reset_ext(stream, error_code, 0);
1088
1089    maybe_finish_stream(stream);
1090    maybe_schedule_call_on_close(stream);
1091}
1092
1093
1094uint64_t
1095lsquic_stream_fc_recv_off_const (const struct lsquic_stream *stream)
1096{
1097    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
1098}
1099
1100
1101void
1102lsquic_stream_max_stream_data_sent (struct lsquic_stream *stream)
1103{
1104    assert(stream->sm_qflags & SMQF_SEND_MAX_STREAM_DATA);
1105    stream->sm_qflags &= ~SMQF_SEND_MAX_STREAM_DATA;
1106    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1107        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1108    stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc);
1109}
1110
1111
1112uint64_t
1113lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
1114{
1115    assert(stream->sm_qflags & SMQF_SEND_WUF);
1116    stream->sm_qflags &= ~SMQF_SEND_WUF;
1117    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1118        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1119    return stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc);
1120}
1121
1122
1123void
1124lsquic_stream_peer_blocked (struct lsquic_stream *stream, uint64_t peer_off)
1125{
1126    uint64_t last_off;
1127
1128    if (stream->sm_last_recv_off)
1129        last_off = stream->sm_last_recv_off;
1130    else
1131        /* This gets advertized in transport parameters */
1132        last_off = lsquic_sfcw_get_max_recv_off(&stream->fc);
1133
1134    LSQ_DEBUG("Peer blocked at %"PRIu64", while the last MAX_STREAM_DATA "
1135        "frame we sent advertized the limit of %"PRIu64, peer_off, last_off);
1136
1137    if (peer_off > last_off && !(stream->sm_qflags & SMQF_SEND_WUF))
1138    {
1139        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1140            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1141                                                    next_send_stream);
1142        stream->sm_qflags |= SMQF_SEND_WUF;
1143        LSQ_DEBUG("marked to send MAX_STREAM_DATA frame");
1144    }
1145    else if (stream->sm_qflags & SMQF_SEND_WUF)
1146        LSQ_DEBUG("MAX_STREAM_DATA frame is already scheduled");
1147    else if (stream->sm_last_recv_off)
1148        LSQ_DEBUG("MAX_STREAM_DATA(%"PRIu64") has already been either "
1149            "packetized or sent", stream->sm_last_recv_off);
1150    else
1151        LSQ_INFO("Peer should have receive transport param limit "
1152            "of %"PRIu64"; odd.", last_off);
1153}
1154
1155
1156/* GQUIC's BLOCKED frame does not have an offset */
1157void
1158lsquic_stream_peer_blocked_gquic (struct lsquic_stream *stream)
1159{
1160    LSQ_DEBUG("Peer blocked: schedule another WINDOW_UPDATE frame");
1161    if (!(stream->sm_qflags & SMQF_SEND_WUF))
1162    {
1163        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1164            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1165                                                    next_send_stream);
1166        stream->sm_qflags |= SMQF_SEND_WUF;
1167        LSQ_DEBUG("marked to send MAX_STREAM_DATA frame");
1168    }
1169    else
1170        LSQ_DEBUG("WINDOW_UPDATE frame is already scheduled");
1171}
1172
1173
1174void
1175lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
1176{
1177    assert(stream->sm_qflags & SMQF_SEND_BLOCKED);
1178    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
1179    stream->sm_qflags &= ~SMQF_SEND_BLOCKED;
1180    stream->stream_flags |= STREAM_BLOCKED_SENT;
1181    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1182        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1183}
1184
1185
1186void
1187lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
1188{
1189    assert(stream->sm_qflags & SMQF_SEND_RST);
1190    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
1191    stream->sm_qflags &= ~SMQF_SEND_RST;
1192    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1193        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1194    stream->stream_flags |= STREAM_RST_SENT;
1195    maybe_finish_stream(stream);
1196}
1197
1198
1199static size_t
1200read_uh (struct lsquic_stream *stream,
1201        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1202{
1203    struct http1x_headers *const h1h = stream->uh->uh_hset;
1204    size_t nread;
1205
1206    nread = readf(ctx, (unsigned char *) h1h->h1h_buf + h1h->h1h_off,
1207                  h1h->h1h_size - h1h->h1h_off,
1208                  (stream->stream_flags & STREAM_HEAD_IN_FIN) > 0);
1209    h1h->h1h_off += nread;
1210    if (h1h->h1h_off == h1h->h1h_size)
1211    {
1212        LSQ_DEBUG("read all uncompressed headers");
1213        destroy_uh(stream);
1214        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
1215        {
1216            stream->stream_flags |= STREAM_FIN_REACHED;
1217            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
1218        }
1219    }
1220    return nread;
1221}
1222
1223
1224static void
1225stream_consumed_bytes (struct lsquic_stream *stream)
1226{
1227    lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
1228    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
1229    {
1230        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1231            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1232                                                            next_send_stream);
1233        stream->sm_qflags |= SMQF_SEND_WUF;
1234        maybe_conn_to_tickable_if_writeable(stream, 1);
1235    }
1236}
1237
1238
1239struct read_frames_status
1240{
1241    int     error;
1242    int     processed_frames;
1243    size_t  total_nread;
1244};
1245
1246
1247static struct read_frames_status
1248read_data_frames (struct lsquic_stream *stream, int do_filtering,
1249        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1250{
1251    struct data_frame *data_frame;
1252    size_t nread, toread, total_nread;
1253    int short_read, processed_frames;
1254
1255    processed_frames = 0;
1256    total_nread = 0;
1257
1258    while ((data_frame = stream->data_in->di_if->di_get_frame(
1259                                        stream->data_in, stream->read_offset)))
1260    {
1261
1262        ++processed_frames;
1263
1264        do
1265        {
1266            if (do_filtering && stream->sm_sfi)
1267                toread = stream->sm_sfi->sfi_filter_df(stream, data_frame);
1268            else
1269                toread = data_frame->df_size - data_frame->df_read_off;
1270
1271            if (toread || data_frame->df_fin)
1272            {
1273                nread = readf(ctx, data_frame->df_data + data_frame->df_read_off,
1274                                                     toread, data_frame->df_fin);
1275                if (do_filtering && stream->sm_sfi)
1276                    stream->sm_sfi->sfi_decr_left(stream, nread);
1277                data_frame->df_read_off += nread;
1278                stream->read_offset += nread;
1279                total_nread += nread;
1280                short_read = nread < toread;
1281            }
1282            else
1283                short_read = 0;
1284
1285            if (data_frame->df_read_off == data_frame->df_size)
1286            {
1287                const int fin = data_frame->df_fin;
1288                stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
1289                data_frame = NULL;
1290                if ((stream->sm_bflags & SMBF_AUTOSWITCH) &&
1291                        (stream->data_in->di_flags & DI_SWITCH_IMPL))
1292                {
1293                    stream->data_in = stream->data_in->di_if->di_switch_impl(
1294                                                stream->data_in, stream->read_offset);
1295                    if (!stream->data_in)
1296                    {
1297                        stream->data_in = data_in_error_new();
1298                        return (struct read_frames_status) { .error = 1, };
1299                    }
1300                }
1301                if (fin)
1302                {
1303                    stream->stream_flags |= STREAM_FIN_REACHED;
1304                    goto end_while;
1305                }
1306            }
1307            else if (short_read)
1308                goto end_while;
1309        }
1310        while (data_frame);
1311    }
1312  end_while:
1313
1314    if (processed_frames)
1315        stream_consumed_bytes(stream);
1316
1317    return (struct read_frames_status) {
1318        .error = 0,
1319        .processed_frames = processed_frames,
1320        .total_nread = total_nread,
1321    };
1322}
1323
1324
1325static ssize_t
1326stream_readf (struct lsquic_stream *stream,
1327        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1328{
1329    size_t total_nread, nread;
1330    int read_unc_headers;
1331
1332    total_nread = 0;
1333
1334    if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF))
1335                                            == (SMBF_USE_HEADERS|SMBF_IETF)
1336            && !(stream->stream_flags & STREAM_HAVE_UH)
1337            && !stream->uh)
1338    {
1339        if (stream->sm_readable(stream))
1340        {
1341            if (stream->sm_hq_filter.hqfi_flags & HQFI_FLAG_ERROR)
1342            {
1343                LSQ_INFO("HQ filter hit an error: cannot read from stream");
1344                errno = EBADMSG;
1345                return -1;
1346            }
1347            assert(stream->uh);
1348        }
1349        else
1350        {
1351            errno = EWOULDBLOCK;
1352            return -1;
1353        }
1354    }
1355
1356    if (stream->uh)
1357    {
1358        if (stream->uh->uh_flags & UH_H1H)
1359        {
1360            nread = read_uh(stream, readf, ctx);
1361            read_unc_headers = nread > 0;
1362            total_nread += nread;
1363            if (stream->uh)
1364                return total_nread;
1365        }
1366        else
1367        {
1368            LSQ_INFO("header set not claimed: cannot read from stream");
1369            return -1;
1370        }
1371    }
1372    else if ((stream->sm_bflags & SMBF_USE_HEADERS)
1373                                && !(stream->stream_flags & STREAM_HAVE_UH))
1374    {
1375        LSQ_DEBUG("cannot read: headers not available");
1376        errno = EWOULDBLOCK;
1377        return -1;
1378    }
1379    else
1380        read_unc_headers = 0;
1381
1382    const struct read_frames_status rfs
1383                        = read_data_frames(stream, 1, readf, ctx);
1384    if (rfs.error)
1385        return -1;
1386    total_nread += rfs.total_nread;
1387
1388    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
1389                                        total_nread, stream->read_offset);
1390
1391    if (rfs.processed_frames || read_unc_headers)
1392    {
1393        return total_nread;
1394    }
1395    else
1396    {
1397        assert(0 == total_nread);
1398        errno = EWOULDBLOCK;
1399        return -1;
1400    }
1401}
1402
1403
1404/* This function returns 0 when EOF is reached.
1405 */
1406ssize_t
1407lsquic_stream_readf (struct lsquic_stream *stream,
1408        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1409{
1410    SM_HISTORY_APPEND(stream, SHE_USER_READ);
1411
1412    if (lsquic_stream_is_reset(stream))
1413    {
1414        if (stream->stream_flags & STREAM_RST_RECVD)
1415            stream->stream_flags |= STREAM_RST_READ;
1416        errno = ECONNRESET;
1417        return -1;
1418    }
1419    if (stream->stream_flags & STREAM_U_READ_DONE)
1420    {
1421        errno = EBADF;
1422        return -1;
1423    }
1424    if ((stream->stream_flags & STREAM_FIN_REACHED)
1425            && 0 == (!!(stream->stream_flags & STREAM_HAVE_UH)
1426                   ^ !!(stream->sm_bflags & SMBF_USE_HEADERS)))
1427        return 0;
1428
1429    return stream_readf(stream, readf, ctx);
1430}
1431
1432
1433struct readv_ctx
1434{
1435    const struct iovec        *iov;
1436    const struct iovec *const  end;
1437    unsigned char             *p;
1438};
1439
1440
1441static size_t
1442readv_f (void *ctx_p, const unsigned char *buf, size_t len, int fin)
1443{
1444    struct readv_ctx *const ctx = ctx_p;
1445    const unsigned char *const end = buf + len;
1446    size_t ntocopy;
1447
1448    while (ctx->iov < ctx->end && buf < end)
1449    {
1450        ntocopy = (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len
1451                                                                    - ctx->p;
1452        if (ntocopy > (size_t) (end - buf))
1453            ntocopy = end - buf;
1454        memcpy(ctx->p, buf, ntocopy);
1455        ctx->p += ntocopy;
1456        buf += ntocopy;
1457        if (ctx->p == (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len)
1458        {
1459            do
1460                ++ctx->iov;
1461            while (ctx->iov < ctx->end && ctx->iov->iov_len == 0);
1462            if (ctx->iov < ctx->end)
1463                ctx->p = ctx->iov->iov_base;
1464            else
1465                ctx->p = NULL;
1466        }
1467    }
1468
1469    return len - (end - buf);
1470}
1471
1472
1473ssize_t
1474lsquic_stream_readv (struct lsquic_stream *stream, const struct iovec *iov,
1475                     int iovcnt)
1476{
1477    struct readv_ctx ctx = { iov, iov + iovcnt, iov->iov_base, };
1478    return lsquic_stream_readf(stream, readv_f, &ctx);
1479}
1480
1481
1482ssize_t
1483lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
1484{
1485    struct iovec iov = { .iov_base = buf, .iov_len = len, };
1486    return lsquic_stream_readv(stream, &iov, 1);
1487}
1488
1489
1490static void
1491stream_shutdown_read (lsquic_stream_t *stream)
1492{
1493    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1494    {
1495        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
1496        stream->stream_flags |= STREAM_U_READ_DONE;
1497        stream_wantread(stream, 0);
1498        maybe_finish_stream(stream);
1499    }
1500}
1501
1502
1503static int
1504stream_is_incoming_unidir (const struct lsquic_stream *stream)
1505{
1506    enum stream_id_type sit;
1507
1508    if (stream->sm_bflags & SMBF_IETF)
1509    {
1510        sit = stream->id & SIT_MASK;
1511        if (stream->sm_bflags & SMBF_SERVER)
1512            return sit == SIT_UNI_CLIENT;
1513        else
1514            return sit == SIT_UNI_SERVER;
1515    }
1516    else
1517        return 0;
1518}
1519
1520
1521static void
1522stream_shutdown_write (lsquic_stream_t *stream)
1523{
1524    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1525        return;
1526
1527    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
1528    stream->stream_flags |= STREAM_U_WRITE_DONE;
1529    stream_wantwrite(stream, 0);
1530
1531    /* Don't bother to check whether there is anything else to write if
1532     * the flags indicate that nothing else should be written.
1533     */
1534    if (!(stream->sm_bflags & SMBF_CRYPTO)
1535            && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT))
1536                && !stream_is_incoming_unidir(stream)
1537                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1538    {
1539        if (stream->sm_n_buffered == 0)
1540        {
1541            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
1542                                                 stream))
1543            {
1544                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
1545                stream->stream_flags |= STREAM_FIN_SENT;
1546            }
1547            else
1548            {
1549                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
1550                          "flag in it");
1551                (void) stream_flush_nocheck(stream);
1552            }
1553        }
1554        else
1555            (void) stream_flush_nocheck(stream);
1556    }
1557}
1558
1559
1560static void
1561maybe_stream_shutdown_write (struct lsquic_stream *stream)
1562{
1563    if (stream->sm_send_headers_state == SSHS_BEGIN)
1564        stream_shutdown_write(stream);
1565    else if (0 == (stream->stream_flags & STREAM_DELAYED_SW))
1566    {
1567        LSQ_DEBUG("shutdown delayed");
1568        SM_HISTORY_APPEND(stream, SHE_DELAY_SW);
1569        stream->stream_flags |= STREAM_DELAYED_SW;
1570    }
1571}
1572
1573
1574int
1575lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
1576{
1577    LSQ_DEBUG("shutdown; how: %d", how);
1578    if (lsquic_stream_is_closed(stream))
1579    {
1580        LSQ_INFO("Attempt to shut down a closed stream");
1581        errno = EBADF;
1582        return -1;
1583    }
1584    /* 0: read, 1: write: 2: read and write
1585     */
1586    if (how < 0 || how > 2)
1587    {
1588        errno = EINVAL;
1589        return -1;
1590    }
1591
1592    if (how)
1593        maybe_stream_shutdown_write(stream);
1594    if (how != 1)
1595        stream_shutdown_read(stream);
1596
1597    maybe_finish_stream(stream);
1598    maybe_schedule_call_on_close(stream);
1599    if (how && !(stream->stream_flags & STREAM_DELAYED_SW))
1600        maybe_conn_to_tickable_if_writeable(stream, 1);
1601
1602    return 0;
1603}
1604
1605
1606void
1607lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
1608{
1609    LSQ_DEBUG("internal shutdown");
1610    if (lsquic_stream_is_critical(stream))
1611    {
1612        LSQ_DEBUG("add flag to force-finish special stream");
1613        stream->stream_flags |= STREAM_FORCE_FINISH;
1614        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
1615    }
1616    maybe_finish_stream(stream);
1617    maybe_schedule_call_on_close(stream);
1618}
1619
1620
1621static void
1622fake_reset_unused_stream (lsquic_stream_t *stream)
1623{
1624    stream->stream_flags |=
1625        STREAM_RST_RECVD    /* User will pick this up on read or write */
1626      | STREAM_RST_SENT     /* Don't send anything else on this stream */
1627    ;
1628
1629    /* Cancel all writes to the network scheduled for this stream: */
1630    if (stream->sm_qflags & SMQF_SENDING_FLAGS)
1631        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
1632                                                next_send_stream);
1633    stream->sm_qflags &= ~SMQF_SENDING_FLAGS;
1634    drop_buffered_data(stream);
1635    LSQ_DEBUG("fake-reset stream%s",
1636                    stream_stalled(stream) ? " (stalled)" : "");
1637    maybe_finish_stream(stream);
1638    maybe_schedule_call_on_close(stream);
1639}
1640
1641
1642/* This function should only be called for locally-initiated streams whose ID
1643 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
1644 * frame sent by peer but we have not yet received it and created a stream.
1645 * In this situation, we mark the stream as reset, so that user's on_read or
1646 * on_write event callback picks up the error.  That, in turn, should result
1647 * in stream being closed.
1648 *
1649 * If we have received any data frames on this stream, this probably indicates
1650 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
1651 * lower than this.  However, we still try to handle it gracefully and peform
1652 * a shutdown, as if the stream was not reset.
1653 */
1654void
1655lsquic_stream_received_goaway (lsquic_stream_t *stream)
1656{
1657    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
1658    if (0 == stream->read_offset &&
1659                            stream->data_in->di_if->di_empty(stream->data_in))
1660        fake_reset_unused_stream(stream);       /* Normal condition */
1661    else
1662    {   /* This is odd, let's handle it the best we can: */
1663        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
1664        lsquic_stream_shutdown_internal(stream);
1665    }
1666}
1667
1668
1669uint64_t
1670lsquic_stream_read_offset (const lsquic_stream_t *stream)
1671{
1672    return stream->read_offset;
1673}
1674
1675
1676static int
1677stream_wantread (lsquic_stream_t *stream, int is_want)
1678{
1679    const int old_val = !!(stream->sm_qflags & SMQF_WANT_READ);
1680    const int new_val = !!is_want;
1681    if (old_val != new_val)
1682    {
1683        if (new_val)
1684        {
1685            if (!old_val)
1686                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
1687                                                            next_read_stream);
1688            stream->sm_qflags |= SMQF_WANT_READ;
1689        }
1690        else
1691        {
1692            stream->sm_qflags &= ~SMQF_WANT_READ;
1693            if (old_val)
1694                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
1695                                                            next_read_stream);
1696        }
1697    }
1698    return old_val;
1699}
1700
1701
1702static void
1703maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_q_flags flag)
1704{
1705    assert(SMQF_WRITE_Q_FLAGS & flag);
1706    if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS))
1707        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1708                                                        next_write_stream);
1709    stream->sm_qflags |= flag;
1710}
1711
1712
1713static void
1714maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag)
1715{
1716    assert(SMQF_WRITE_Q_FLAGS & flag);
1717    if (stream->sm_qflags & flag)
1718    {
1719        stream->sm_qflags &= ~flag;
1720        if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS))
1721            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1722                                                        next_write_stream);
1723    }
1724}
1725
1726
1727static int
1728stream_wantwrite (struct lsquic_stream *stream, int new_val)
1729{
1730    const int old_val = !!(stream->sm_qflags & SMQF_WANT_WRITE);
1731
1732    assert(0 == (new_val & ~1));    /* new_val is either 0 or 1 */
1733
1734    if (old_val != new_val)
1735    {
1736        if (new_val)
1737            maybe_put_onto_write_q(stream, SMQF_WANT_WRITE);
1738        else
1739            maybe_remove_from_write_q(stream, SMQF_WANT_WRITE);
1740    }
1741    return old_val;
1742}
1743
1744
1745int
1746lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1747{
1748    SM_HISTORY_APPEND(stream, SHE_WANTREAD_NO + !!is_want);
1749    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1750    {
1751        if (is_want)
1752            maybe_conn_to_tickable_if_readable(stream);
1753        return stream_wantread(stream, is_want);
1754    }
1755    else
1756    {
1757        errno = EBADF;
1758        return -1;
1759    }
1760}
1761
1762
1763int
1764lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1765{
1766    int old_val;
1767
1768    is_want = !!is_want;
1769
1770    SM_HISTORY_APPEND(stream, SHE_WANTWRITE_NO + is_want);
1771    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)
1772                            && SSHS_BEGIN == stream->sm_send_headers_state)
1773    {
1774        stream->sm_saved_want_write = is_want;
1775        if (is_want)
1776            maybe_conn_to_tickable_if_writeable(stream, 1);
1777        return stream_wantwrite(stream, is_want);
1778    }
1779    else if (SSHS_BEGIN != stream->sm_send_headers_state)
1780    {
1781        old_val = stream->sm_saved_want_write;
1782        stream->sm_saved_want_write = is_want;
1783        return old_val;
1784    }
1785    else
1786    {
1787        errno = EBADF;
1788        return -1;
1789    }
1790}
1791
1792
1793struct progress
1794{
1795    enum stream_flags   s_flags;
1796    enum stream_q_flags q_flags;
1797};
1798
1799
1800static struct progress
1801stream_progress (const struct lsquic_stream *stream)
1802{
1803    return (struct progress) {
1804        .s_flags = stream->stream_flags
1805          & (STREAM_U_WRITE_DONE|STREAM_U_READ_DONE),
1806        .q_flags = stream->sm_qflags
1807          & (SMQF_WANT_READ|SMQF_WANT_WRITE|SMQF_WANT_FLUSH|SMQF_SEND_RST),
1808    };
1809}
1810
1811
1812static int
1813progress_eq (struct progress a, struct progress b)
1814{
1815    return a.s_flags == b.s_flags && a.q_flags == b.q_flags;
1816}
1817
1818
1819static void
1820stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1821{
1822    unsigned no_progress_count, no_progress_limit;
1823    struct progress progress;
1824    uint64_t size;
1825
1826    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1827
1828    no_progress_count = 0;
1829    while ((stream->sm_qflags & SMQF_WANT_READ)
1830                                            && lsquic_stream_readable(stream))
1831    {
1832        progress = stream_progress(stream);
1833        size  = stream->read_offset;
1834
1835        stream->stream_if->on_read(stream, stream->st_ctx);
1836
1837        if (no_progress_limit && size == stream->read_offset &&
1838                                progress_eq(progress, stream_progress(stream)))
1839        {
1840            ++no_progress_count;
1841            if (no_progress_count >= no_progress_limit)
1842            {
1843                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1844                    "progress) in user code reading from stream",
1845                    no_progress_count,
1846                    no_progress_count == 1 ? "" : "s");
1847                break;
1848            }
1849        }
1850        else
1851            no_progress_count = 0;
1852    }
1853}
1854
1855
1856static void
1857stream_hblock_sent (struct lsquic_stream *stream)
1858{
1859    int want_write;
1860
1861    LSQ_DEBUG("header block has been sent: restore default behavior");
1862    stream->sm_send_headers_state = SSHS_BEGIN;
1863    stream->sm_write_avail = stream_write_avail_with_frames;
1864
1865    want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
1866    if (want_write != stream->sm_saved_want_write)
1867        (void) lsquic_stream_wantwrite(stream, stream->sm_saved_want_write);
1868
1869    if (stream->stream_flags & STREAM_DELAYED_SW)
1870    {
1871        LSQ_DEBUG("performing delayed shutdown write");
1872        stream->stream_flags &= ~STREAM_DELAYED_SW;
1873        stream_shutdown_write(stream);
1874        maybe_schedule_call_on_close(stream);
1875        maybe_finish_stream(stream);
1876        maybe_conn_to_tickable_if_writeable(stream, 1);
1877    }
1878}
1879
1880
1881static void
1882on_write_header_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
1883{
1884    ssize_t nw;
1885
1886    nw = stream_write_buf(stream,
1887                stream->sm_header_block + stream->sm_hblock_off,
1888                stream->sm_hblock_sz - stream->sm_hblock_off);
1889    if (nw > 0)
1890    {
1891        stream->sm_hblock_off += nw;
1892        if (stream->sm_hblock_off == stream->sm_hblock_sz)
1893        {
1894            stream->stream_flags |= STREAM_HEADERS_SENT;
1895            free(stream->sm_header_block);
1896            stream->sm_header_block = NULL;
1897            stream->sm_hblock_sz = 0;
1898            stream_hblock_sent(stream);
1899            LSQ_DEBUG("header block written out successfully");
1900            /* TODO: if there was eos, do something else */
1901            if (stream->sm_qflags & SMQF_WANT_WRITE)
1902                stream->stream_if->on_write(stream, h);
1903        }
1904        else
1905        {
1906            LSQ_DEBUG("wrote %zd bytes more of header block; not done yet",
1907                nw);
1908        }
1909    }
1910    else if (nw < 0)
1911    {
1912        /* XXX What should happen if we hit an error? TODO */
1913    }
1914}
1915
1916
1917static void
1918(*select_on_write (struct lsquic_stream *stream))(struct lsquic_stream *,
1919                                                        lsquic_stream_ctx_t *)
1920{
1921    if (0 == (stream->stream_flags & STREAM_PUSHING)
1922                    && SSHS_HBLOCK_SENDING != stream->sm_send_headers_state)
1923        /* Common case */
1924        return stream->stream_if->on_write;
1925    else if (SSHS_HBLOCK_SENDING == stream->sm_send_headers_state)
1926        return on_write_header_wrapper;
1927    else
1928    {
1929        assert(stream->stream_flags & STREAM_PUSHING);
1930        if (stream_is_pushing_promise(stream))
1931            return on_write_pp_wrapper;
1932        else if (stream->sm_dup_push_off < stream->sm_dup_push_len)
1933            return on_write_dp_wrapper;
1934        else
1935            return stream->stream_if->on_write;
1936    }
1937}
1938
1939
1940static void
1941stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1942{
1943    unsigned no_progress_count, no_progress_limit;
1944    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
1945    struct progress progress;
1946
1947    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1948
1949    no_progress_count = 0;
1950    stream->stream_flags |= STREAM_LAST_WRITE_OK;
1951    while ((stream->sm_qflags & SMQF_WANT_WRITE)
1952                && (stream->stream_flags & STREAM_LAST_WRITE_OK)
1953                       && lsquic_stream_write_avail(stream))
1954    {
1955        progress = stream_progress(stream);
1956
1957        on_write = select_on_write(stream);
1958        on_write(stream, stream->st_ctx);
1959
1960        if (no_progress_limit && progress_eq(progress, stream_progress(stream)))
1961        {
1962            ++no_progress_count;
1963            if (no_progress_count >= no_progress_limit)
1964            {
1965                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1966                    "progress) in user code writing to stream",
1967                    no_progress_count,
1968                    no_progress_count == 1 ? "" : "s");
1969                break;
1970            }
1971        }
1972        else
1973            no_progress_count = 0;
1974    }
1975}
1976
1977
1978static void
1979stream_dispatch_read_events_once (lsquic_stream_t *stream)
1980{
1981    if ((stream->sm_qflags & SMQF_WANT_READ) && lsquic_stream_readable(stream))
1982    {
1983        stream->stream_if->on_read(stream, stream->st_ctx);
1984    }
1985}
1986
1987
1988uint64_t
1989lsquic_stream_combined_send_off (const struct lsquic_stream *stream)
1990{
1991    size_t frames_sizes;
1992
1993    frames_sizes = active_hq_frame_sizes(stream);
1994    return stream->tosend_off + stream->sm_n_buffered + frames_sizes;
1995}
1996
1997
1998static void
1999maybe_mark_as_blocked (lsquic_stream_t *stream)
2000{
2001    struct lsquic_conn_cap *cc;
2002    uint64_t used;
2003
2004    used = lsquic_stream_combined_send_off(stream);
2005    if (stream->max_send_off == used)
2006    {
2007        if (stream->blocked_off < stream->max_send_off)
2008        {
2009            stream->blocked_off = used;
2010            if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
2011                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
2012                                                            next_send_stream);
2013            stream->sm_qflags |= SMQF_SEND_BLOCKED;
2014            LSQ_DEBUG("marked stream-blocked at stream offset "
2015                                            "%"PRIu64, stream->blocked_off);
2016        }
2017        else
2018            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
2019                " has been, or is about to be, sent", stream->blocked_off);
2020    }
2021
2022    if ((stream->sm_bflags & SMBF_CONN_LIMITED)
2023        && (cc = &stream->conn_pub->conn_cap,
2024                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
2025    {
2026        if (cc->cc_blocked < cc->cc_max)
2027        {
2028            cc->cc_blocked = cc->cc_max;
2029            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
2030            LSQ_DEBUG("marked connection-blocked at connection offset "
2031                                                    "%"PRIu64, cc->cc_max);
2032        }
2033        else
2034            LSQ_DEBUG("stream has already been marked connection-blocked "
2035                "at offset %"PRIu64, cc->cc_blocked);
2036    }
2037}
2038
2039
2040void
2041lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
2042{
2043    assert(stream->sm_qflags & SMQF_WANT_READ);
2044
2045    if (stream->sm_bflags & SMBF_RW_ONCE)
2046        stream_dispatch_read_events_once(stream);
2047    else
2048        stream_dispatch_read_events_loop(stream);
2049}
2050
2051
2052void
2053lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
2054{
2055    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
2056    int progress;
2057    uint64_t tosend_off;
2058    unsigned short n_buffered;
2059    enum stream_q_flags q_flags;
2060
2061    assert(stream->sm_qflags & SMQF_WRITE_Q_FLAGS);
2062    q_flags = stream->sm_qflags & SMQF_WRITE_Q_FLAGS;
2063    tosend_off = stream->tosend_off;
2064    n_buffered = stream->sm_n_buffered;
2065
2066    if (stream->sm_qflags & SMQF_WANT_FLUSH)
2067        (void) stream_flush(stream);
2068
2069    if (stream->sm_bflags & SMBF_RW_ONCE)
2070    {
2071        if ((stream->sm_qflags & SMQF_WANT_WRITE)
2072            && lsquic_stream_write_avail(stream))
2073        {
2074            on_write = select_on_write(stream);
2075            on_write(stream, stream->st_ctx);
2076        }
2077    }
2078    else
2079        stream_dispatch_write_events_loop(stream);
2080
2081    /* Progress means either flags or offsets changed: */
2082    progress = !((stream->sm_qflags & SMQF_WRITE_Q_FLAGS) == q_flags &&
2083                        stream->tosend_off == tosend_off &&
2084                            stream->sm_n_buffered == n_buffered);
2085
2086    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
2087    {
2088        if (progress)
2089        {   /* Move the stream to the end of the list to ensure fairness. */
2090            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
2091                                                            next_write_stream);
2092            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
2093                                                            next_write_stream);
2094        }
2095    }
2096}
2097
2098
2099static size_t
2100inner_reader_empty_size (void *ctx)
2101{
2102    return 0;
2103}
2104
2105
2106static size_t
2107inner_reader_empty_read (void *ctx, void *buf, size_t count)
2108{
2109    return 0;
2110}
2111
2112
2113static int
2114stream_flush (lsquic_stream_t *stream)
2115{
2116    struct lsquic_reader empty_reader;
2117    ssize_t nw;
2118
2119    assert(stream->sm_qflags & SMQF_WANT_FLUSH);
2120    assert(stream->sm_n_buffered > 0 ||
2121        /* Flushing is also used to packetize standalone FIN: */
2122        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
2123                                                    == STREAM_U_WRITE_DONE));
2124
2125    empty_reader.lsqr_size = inner_reader_empty_size;
2126    empty_reader.lsqr_read = inner_reader_empty_read;
2127    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
2128    nw = stream_write_to_packets(stream, &empty_reader, 0);
2129
2130    if (nw >= 0)
2131    {
2132        assert(nw == 0);    /* Empty reader: must have read zero bytes */
2133        return 0;
2134    }
2135    else
2136        return -1;
2137}
2138
2139
2140static int
2141stream_flush_nocheck (lsquic_stream_t *stream)
2142{
2143    size_t frames;
2144
2145    frames = active_hq_frame_sizes(stream);
2146    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered + frames;
2147    stream->sm_flush_to_payload = stream->sm_payload + stream->sm_n_buffered;
2148    maybe_put_onto_write_q(stream, SMQF_WANT_FLUSH);
2149    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
2150
2151    return stream_flush(stream);
2152}
2153
2154
2155int
2156lsquic_stream_flush (lsquic_stream_t *stream)
2157{
2158    if (stream->stream_flags & STREAM_U_WRITE_DONE)
2159    {
2160        LSQ_DEBUG("cannot flush closed stream");
2161        errno = EBADF;
2162        return -1;
2163    }
2164
2165    if (0 == stream->sm_n_buffered)
2166    {
2167        LSQ_DEBUG("flushing 0 bytes: noop");
2168        return 0;
2169    }
2170
2171    return stream_flush_nocheck(stream);
2172}
2173
2174
2175static size_t
2176stream_get_n_allowed (const struct lsquic_stream *stream)
2177{
2178    if (stream->sm_n_allocated)
2179        return stream->sm_n_allocated;
2180    else
2181        return stream->conn_pub->path->np_pack_size;
2182}
2183
2184
2185/* The flush threshold is the maximum size of stream data that can be sent
2186 * in a full packet.
2187 */
2188#ifdef NDEBUG
2189static
2190#endif
2191       size_t
2192lsquic_stream_flush_threshold (const struct lsquic_stream *stream,
2193                                                            unsigned data_sz)
2194{
2195    enum packet_out_flags flags;
2196    enum packno_bits bits;
2197    size_t packet_header_sz, stream_header_sz, tag_len;
2198    size_t threshold;
2199
2200    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
2201    flags = bits << POBIT_SHIFT;
2202    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
2203        flags |= PO_CONN_ID;
2204    if (stream_is_hsk(stream))
2205        flags |= PO_LONGHEAD;
2206
2207    packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags,
2208                                        stream->conn_pub->path->np_dcid.len);
2209    stream_header_sz = stream->sm_frame_header_sz(stream, data_sz);
2210    tag_len = stream->conn_pub->lconn->cn_esf_c->esf_tag_len;
2211
2212    threshold = stream_get_n_allowed(stream) - tag_len
2213              - packet_header_sz - stream_header_sz;
2214    return threshold;
2215}
2216
2217
2218#define COMMON_WRITE_CHECKS() do {                                          \
2219    if ((stream->sm_bflags & SMBF_USE_HEADERS)                              \
2220            && !(stream->stream_flags & STREAM_HEADERS_SENT))               \
2221    {                                                                       \
2222        if (SSHS_BEGIN != stream->sm_send_headers_state)                    \
2223        {                                                                   \
2224            LSQ_DEBUG("still sending headers: no writing allowed");         \
2225            return 0;                                                       \
2226        }                                                                   \
2227        else                                                                \
2228        {                                                                   \
2229            LSQ_INFO("Attempt to write to stream before sending HTTP "      \
2230                                                                "headers"); \
2231            errno = EILSEQ;                                                 \
2232            return -1;                                                      \
2233        }                                                                   \
2234    }                                                                       \
2235    if (lsquic_stream_is_reset(stream))                                     \
2236    {                                                                       \
2237        LSQ_INFO("Attempt to write to stream after it had been reset");     \
2238        errno = ECONNRESET;                                                 \
2239        return -1;                                                          \
2240    }                                                                       \
2241    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
2242    {                                                                       \
2243        LSQ_INFO("Attempt to write to stream after it was closed for "      \
2244                                                                "writing"); \
2245        errno = EBADF;                                                      \
2246        return -1;                                                          \
2247    }                                                                       \
2248} while (0)
2249
2250
2251struct frame_gen_ctx
2252{
2253    lsquic_stream_t      *fgc_stream;
2254    struct lsquic_reader *fgc_reader;
2255    /* We keep our own count of how many bytes were read from reader because
2256     * some readers are external.  The external caller does not have to rely
2257     * on our count, but it can.
2258     */
2259    size_t                fgc_nread_from_reader;
2260    size_t              (*fgc_size) (void *ctx);
2261    int                 (*fgc_fin) (void *ctx);
2262    gsf_read_f            fgc_read;
2263};
2264
2265
2266static size_t
2267frame_std_gen_size (void *ctx)
2268{
2269    struct frame_gen_ctx *fg_ctx = ctx;
2270    size_t available, remaining;
2271
2272    /* Make sure we are not writing past available size: */
2273    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2274    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
2275    if (available < remaining)
2276        remaining = available;
2277
2278    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
2279}
2280
2281
2282static size_t
2283stream_hq_frame_size (const struct stream_hq_frame *shf)
2284{
2285    if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
2286        return 1 + 1 + ((shf->shf_flags & SHF_TWO_BYTES) > 0);
2287    else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) == SHF_FIXED_SIZE)
2288        return 1 + (1 << vint_val2bits(shf->shf_frame_size));
2289    else
2290    {
2291        assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
2292                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
2293        return 0;
2294    }
2295}
2296
2297
2298static size_t
2299active_hq_frame_sizes (const struct lsquic_stream *stream)
2300{
2301    const struct stream_hq_frame *shf;
2302    size_t size;
2303
2304    size = 0;
2305    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2306                                        == (SMBF_IETF|SMBF_USE_HEADERS))
2307        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2308            if (!(shf->shf_flags & SHF_WRITTEN))
2309                size += stream_hq_frame_size(shf);
2310
2311    return size;
2312}
2313
2314
2315static uint64_t
2316stream_hq_frame_end (const struct stream_hq_frame *shf)
2317{
2318    if (shf->shf_flags & SHF_FIXED_SIZE)
2319        return shf->shf_off + shf->shf_frame_size;
2320    else if (shf->shf_flags & SHF_TWO_BYTES)
2321        return shf->shf_off + ((1 << 14) - 1);
2322    else
2323        return shf->shf_off + ((1 << 6) - 1);
2324}
2325
2326
2327static int
2328frame_in_stream (const struct lsquic_stream *stream,
2329                                            const struct stream_hq_frame *shf)
2330{
2331    return shf >= stream->sm_hq_frame_arr
2332        && shf < stream->sm_hq_frame_arr + sizeof(stream->sm_hq_frame_arr)
2333                                        / sizeof(stream->sm_hq_frame_arr[0])
2334        ;
2335}
2336
2337
2338static void
2339stream_hq_frame_put (struct lsquic_stream *stream,
2340                                                struct stream_hq_frame *shf)
2341{
2342    assert(STAILQ_FIRST(&stream->sm_hq_frames) == shf);
2343    STAILQ_REMOVE_HEAD(&stream->sm_hq_frames, shf_next);
2344    if (frame_in_stream(stream, shf))
2345        memset(shf, 0, sizeof(*shf));
2346    else
2347        lsquic_malo_put(shf);
2348}
2349
2350
2351static void
2352stream_hq_frame_close (struct lsquic_stream *stream,
2353                                                struct stream_hq_frame *shf)
2354{
2355    unsigned bits;
2356
2357    LSQ_DEBUG("close HQ frame of type 0x%X at payload offset %"PRIu64
2358        " (actual offset %"PRIu64")", shf->shf_frame_type,
2359        stream->sm_payload, stream->tosend_off);
2360    assert(shf->shf_flags & SHF_ACTIVE);
2361    if (!(shf->shf_flags & SHF_FIXED_SIZE))
2362    {
2363        shf->shf_frame_ptr[0] = shf->shf_frame_type;
2364        bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
2365        vint_write(shf->shf_frame_ptr + 1, stream->sm_payload - shf->shf_off,
2366                                                            bits, 1 << bits);
2367    }
2368    stream_hq_frame_put(stream, shf);
2369}
2370
2371
2372static size_t
2373frame_hq_gen_size (void *ctx)
2374{
2375    struct frame_gen_ctx *fg_ctx = ctx;
2376    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2377    size_t available, remaining, frames;
2378    const struct stream_hq_frame *shf;
2379
2380    frames = 0;
2381    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2382        if (shf->shf_off >= stream->sm_payload)
2383            frames += stream_hq_frame_size(shf);
2384
2385    /* Make sure we are not writing past available size: */
2386    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2387    available = lsquic_stream_write_avail(stream);
2388    if (available < remaining)
2389        remaining = available;
2390
2391    return remaining + stream->sm_n_buffered + frames;
2392}
2393
2394
2395static int
2396frame_std_gen_fin (void *ctx)
2397{
2398    struct frame_gen_ctx *fg_ctx = ctx;
2399    return !(fg_ctx->fgc_stream->sm_bflags & SMBF_CRYPTO)
2400        && (fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE)
2401        && 0 == fg_ctx->fgc_stream->sm_n_buffered
2402        /* Do not use frame_std_gen_size() as it may chop the real size: */
2403        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2404}
2405
2406
2407static void
2408incr_conn_cap (struct lsquic_stream *stream, size_t incr)
2409{
2410    if (stream->sm_bflags & SMBF_CONN_LIMITED)
2411    {
2412        stream->conn_pub->conn_cap.cc_sent += incr;
2413        assert(stream->conn_pub->conn_cap.cc_sent
2414                                    <= stream->conn_pub->conn_cap.cc_max);
2415    }
2416}
2417
2418
2419void
2420incr_sm_payload (struct lsquic_stream *stream, size_t incr)
2421{
2422    stream->sm_payload += incr;
2423    stream->tosend_off += incr;
2424    assert(stream->tosend_off <= stream->max_send_off);
2425}
2426
2427
2428static size_t
2429frame_std_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
2430{
2431    struct frame_gen_ctx *fg_ctx = ctx;
2432    unsigned char *p = begin_buf;
2433    unsigned char *const end = p + len;
2434    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
2435    size_t n_written, available, n_to_write;
2436
2437    if (stream->sm_n_buffered > 0)
2438    {
2439        if (len <= stream->sm_n_buffered)
2440        {
2441            memcpy(p, stream->sm_buf, len);
2442            memmove(stream->sm_buf, stream->sm_buf + len,
2443                                                stream->sm_n_buffered - len);
2444            stream->sm_n_buffered -= len;
2445            if (0 == stream->sm_n_buffered)
2446                maybe_resize_stream_buffer(stream);
2447            assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
2448            incr_sm_payload(stream, len);
2449            *fin = fg_ctx->fgc_fin(fg_ctx);
2450            return len;
2451        }
2452        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
2453        p += stream->sm_n_buffered;
2454        stream->sm_n_buffered = 0;
2455        maybe_resize_stream_buffer(stream);
2456    }
2457
2458    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
2459    n_to_write = end - p;
2460    if (n_to_write > available)
2461        n_to_write = available;
2462    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
2463                                              n_to_write);
2464    p += n_written;
2465    fg_ctx->fgc_nread_from_reader += n_written;
2466    *fin = fg_ctx->fgc_fin(fg_ctx);
2467    incr_sm_payload(stream, p - (const unsigned char *) begin_buf);
2468    incr_conn_cap(stream, n_written);
2469    return p - (const unsigned char *) begin_buf;
2470}
2471
2472
2473static struct stream_hq_frame *
2474find_hq_frame (const struct lsquic_stream *stream, uint64_t off)
2475{
2476    struct stream_hq_frame *shf;
2477
2478    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2479        if (shf->shf_off <= off && stream_hq_frame_end(shf) > off)
2480            return shf;
2481
2482    return NULL;
2483}
2484
2485
2486static struct stream_hq_frame *
2487find_cur_hq_frame (const struct lsquic_stream *stream)
2488{
2489    return find_hq_frame(stream, stream->sm_payload);
2490}
2491
2492
2493static struct stream_hq_frame *
2494open_hq_frame (struct lsquic_stream *stream)
2495{
2496    struct stream_hq_frame *shf;
2497
2498    for (shf = stream->sm_hq_frame_arr; shf < stream->sm_hq_frame_arr
2499            + sizeof(stream->sm_hq_frame_arr)
2500                / sizeof(stream->sm_hq_frame_arr[0]); ++shf)
2501        if (!(shf->shf_flags & SHF_ACTIVE))
2502            goto found;
2503
2504    shf = lsquic_malo_get(stream->conn_pub->mm->malo.stream_hq_frame);
2505    if (!shf)
2506    {
2507        LSQ_WARN("cannot allocate HQ frame");
2508        return NULL;
2509    }
2510    memset(shf, 0, sizeof(*shf));
2511
2512  found:
2513    STAILQ_INSERT_TAIL(&stream->sm_hq_frames, shf, shf_next);
2514    shf->shf_flags = SHF_ACTIVE;
2515    return shf;
2516}
2517
2518
2519/* Returns index of the new frame */
2520static struct stream_hq_frame *
2521stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off,
2522            enum hq_frame_type frame_type, enum shf_flags flags, size_t size)
2523{
2524    struct stream_hq_frame *shf;
2525
2526    shf = open_hq_frame(stream);
2527    if (!shf)
2528    {
2529        LSQ_WARN("could not open HQ frame");
2530        return NULL;
2531    }
2532
2533    shf->shf_off        = off;
2534    shf->shf_flags     |= flags;
2535    shf->shf_frame_type = frame_type;
2536    if (shf->shf_flags & SHF_FIXED_SIZE)
2537    {
2538        shf->shf_frame_size = size;
2539        LSQ_DEBUG("activated fixed-size HQ frame of type 0x%X at offset "
2540            "%"PRIu64", size %zu", shf->shf_frame_type, shf->shf_off, size);
2541    }
2542    else
2543    {
2544        shf->shf_frame_ptr  = NULL;
2545        if (size >= (1 << 6))
2546            shf->shf_flags |= SHF_TWO_BYTES;
2547        LSQ_DEBUG("activated variable-size HQ frame of type 0x%X at offset "
2548            "%"PRIu64, shf->shf_frame_type, shf->shf_off);
2549    }
2550
2551    return shf;
2552}
2553
2554
2555static size_t
2556frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
2557{
2558    struct frame_gen_ctx *fg_ctx = ctx;
2559    unsigned char *p = begin_buf;
2560    unsigned char *const end = p + len;
2561    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2562    struct stream_hq_frame *shf;
2563    size_t nw, frame_sz, avail, rem;
2564    unsigned bits;
2565
2566    while (p < end)
2567    {
2568        shf = find_cur_hq_frame(stream);
2569        if (shf)
2570            LSQ_DEBUG("found current HQ frame of type 0x%X at offset %"PRIu64,
2571                                            shf->shf_frame_type, shf->shf_off);
2572        else
2573        {
2574            rem = frame_std_gen_size(ctx);
2575            if (rem)
2576            {
2577                if (rem > ((1 << 14) - 1))
2578                    rem = (1 << 14) - 1;
2579                shf = stream_activate_hq_frame(stream,
2580                                    stream->sm_payload, HQFT_DATA, 0, rem);
2581                if (!shf)
2582                {
2583                    /* TODO: abort connection?  Handle failure somehow */
2584                    break;
2585                }
2586            }
2587            else
2588                break;
2589        }
2590        avail = stream->sm_n_buffered + stream->sm_write_avail(stream);
2591        if (shf->shf_off == stream->sm_payload
2592                                        && !(shf->shf_flags & SHF_WRITTEN))
2593        {
2594            frame_sz = stream_hq_frame_size(shf);
2595            if (frame_sz > (uintptr_t) (end - p))
2596            {
2597                stream_hq_frame_put(stream, shf);
2598                break;
2599            }
2600            LSQ_DEBUG("insert %zu-byte HQ frame of type 0x%X at payload "
2601                "offset %"PRIu64" (actual offset %"PRIu64")", frame_sz,
2602                shf->shf_frame_type, stream->sm_payload, stream->tosend_off);
2603            if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
2604            {
2605                shf->shf_frame_ptr = p;
2606                memset(p, 0, frame_sz);
2607                p += frame_sz;
2608            }
2609            else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
2610                                                            == SHF_FIXED_SIZE)
2611            {
2612                *p++ = shf->shf_frame_type;
2613                bits = vint_val2bits(shf->shf_frame_size);
2614                vint_write(p, shf->shf_frame_size, bits, 1 << bits);
2615                p += 1 << bits;
2616            }
2617            else
2618                assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
2619                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
2620            if (!(shf->shf_flags & SHF_CC_PAID))
2621            {
2622                incr_conn_cap(stream, frame_sz);
2623                shf->shf_flags |= SHF_CC_PAID;
2624            }
2625            shf->shf_flags |= SHF_WRITTEN;
2626            stream->tosend_off += frame_sz;
2627            assert(stream->tosend_off <= stream->max_send_off);
2628        }
2629        else
2630        {
2631            len = stream_hq_frame_end(shf) - stream->sm_payload;
2632            assert(len);
2633            if (len > (unsigned) (end - p))
2634                len = end - p;
2635            if (len > avail)
2636                len = avail;
2637            if (!len)
2638                break;
2639            nw = frame_std_gen_read(ctx, p, len, fin);
2640            p += nw;
2641            if (nw < len)
2642                break;
2643            if (stream_hq_frame_end(shf) == stream->sm_payload)
2644                stream_hq_frame_close(stream, shf);
2645        }
2646    }
2647
2648    return p - (unsigned char *) begin_buf;
2649}
2650
2651
2652static size_t
2653crypto_frame_gen_read (void *ctx, void *buf, size_t len)
2654{
2655    int fin_ignored;
2656
2657    return frame_std_gen_read(ctx, buf, len, &fin_ignored);
2658}
2659
2660
2661static void
2662check_flush_threshold (lsquic_stream_t *stream)
2663{
2664    if ((stream->sm_qflags & SMQF_WANT_FLUSH) &&
2665                            stream->tosend_off >= stream->sm_flush_to)
2666    {
2667        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
2668                                                    stream->sm_flush_to);
2669        maybe_remove_from_write_q(stream, SMQF_WANT_FLUSH);
2670    }
2671}
2672
2673
2674static int
2675write_stream_frame (struct frame_gen_ctx *fg_ctx, const size_t size,
2676                                        struct lsquic_packet_out *packet_out)
2677{
2678    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
2679    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
2680    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2681    unsigned off;
2682    int len, s;
2683
2684#if LSQUIC_CONN_STATS
2685    const uint64_t begin_off = stream->tosend_off;
2686#endif
2687    off = packet_out->po_data_sz;
2688    len = pf->pf_gen_stream_frame(
2689                packet_out->po_data + packet_out->po_data_sz,
2690                lsquic_packet_out_avail(packet_out), stream->id,
2691                stream->tosend_off,
2692                fg_ctx->fgc_fin(fg_ctx), size, fg_ctx->fgc_read, fg_ctx);
2693    if (len < 0)
2694        return len;
2695
2696#if LSQUIC_CONN_STATS
2697    stream->conn_pub->conn_stats->out.stream_frames += 1;
2698    stream->conn_pub->conn_stats->out.stream_data_sz
2699                                            += stream->tosend_off - begin_off;
2700#endif
2701    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
2702                            packet_out->po_data + packet_out->po_data_sz, len);
2703    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
2704    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
2705    if (0 == lsquic_packet_out_avail(packet_out))
2706        packet_out->po_flags |= PO_STREAM_END;
2707    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
2708                                     stream, QUIC_FRAME_STREAM, off, len);
2709    if (s != 0)
2710    {
2711        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
2712        return -1;
2713    }
2714
2715    check_flush_threshold(stream);
2716    return len;
2717}
2718
2719
2720static enum swtp_status
2721stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size)
2722{
2723    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2724    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2725    struct lsquic_packet_out *packet_out;
2726    int len;
2727
2728    packet_out = lsquic_send_ctl_new_packet_out(send_ctl, 0, PNS_APP,
2729                                                    stream->conn_pub->path);
2730    if (!packet_out)
2731        return SWTP_STOP;
2732    packet_out->po_header_type = stream->tosend_off == 0
2733                                        ? HETY_INITIAL : HETY_HANDSHAKE;
2734
2735    len = write_stream_frame(fg_ctx, size, packet_out);
2736
2737    if (len > 0)
2738    {
2739        packet_out->po_flags |= PO_HELLO;
2740        lsquic_packet_out_zero_pad(packet_out);
2741        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
2742        return SWTP_OK;
2743    }
2744    else
2745        return SWTP_ERROR;
2746}
2747
2748
2749static enum swtp_status
2750stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size)
2751{
2752    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2753    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2754    unsigned stream_header_sz, need_at_least;
2755    struct lsquic_packet_out *packet_out;
2756    struct lsquic_stream *headers_stream;
2757    int len;
2758
2759    if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED))
2760                                                        == STREAM_HEADERS_SENT)
2761    {
2762        if (stream->sm_bflags & SMBF_IETF)
2763        {
2764            if (stream->stream_flags & STREAM_ENCODER_DEP)
2765                headers_stream = stream->conn_pub->u.ietf.qeh->qeh_enc_sm_out;
2766            else
2767                headers_stream = NULL;
2768        }
2769        else
2770            headers_stream =
2771                lsquic_headers_stream_get_stream(stream->conn_pub->u.gquic.hs);
2772        if (headers_stream && lsquic_stream_has_data_to_flush(headers_stream))
2773        {
2774            LSQ_DEBUG("flushing headers stream before packetizing stream data");
2775            (void) lsquic_stream_flush(headers_stream);
2776        }
2777        /* If there is nothing to flush, some other stream must have flushed it:
2778         * this means our headers are flushed.  Either way, only do this once.
2779         */
2780        stream->stream_flags |= STREAM_HDRS_FLUSHED;
2781    }
2782
2783    stream_header_sz = stream->sm_frame_header_sz(stream, size);
2784    need_at_least = stream_header_sz;
2785    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2786                                       == (SMBF_IETF|SMBF_USE_HEADERS))
2787    {
2788        if (size > 0)
2789            need_at_least += 3;     /* Enough room for HTTP/3 frame */
2790    }
2791    else
2792        need_at_least += size > 0;
2793  get_packet:
2794    packet_out = lsquic_send_ctl_get_packet_for_stream(send_ctl,
2795                                need_at_least, stream->conn_pub->path, stream);
2796    if (packet_out)
2797    {
2798        len = write_stream_frame(fg_ctx, size, packet_out);
2799        if (len > 0)
2800            return SWTP_OK;
2801        assert(len < 0);
2802        if (-len > (int) need_at_least)
2803        {
2804            LSQ_DEBUG("need more room (%d bytes) than initially calculated "
2805                "%u bytes, will try again", -len, need_at_least);
2806            need_at_least = -len;
2807            goto get_packet;
2808        }
2809        return SWTP_ERROR;
2810    }
2811    else
2812        return SWTP_STOP;
2813}
2814
2815
2816static enum swtp_status
2817stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size)
2818{
2819    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2820    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2821    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
2822    unsigned crypto_header_sz, need_at_least;
2823    struct lsquic_packet_out *packet_out;
2824    unsigned short off;
2825    const enum packnum_space pns = lsquic_enclev2pns[ crypto_level(stream) ];
2826    int len, s;
2827
2828    assert(size > 0);
2829    crypto_header_sz = stream->sm_frame_header_sz(stream, size);
2830    need_at_least = crypto_header_sz + 1;
2831
2832    packet_out = lsquic_send_ctl_get_packet_for_crypto(send_ctl,
2833                                    need_at_least, pns, stream->conn_pub->path);
2834    if (!packet_out)
2835        return SWTP_STOP;
2836
2837    off = packet_out->po_data_sz;
2838    len = pf->pf_gen_crypto_frame(packet_out->po_data + packet_out->po_data_sz,
2839                lsquic_packet_out_avail(packet_out), stream->tosend_off,
2840                size, crypto_frame_gen_read, fg_ctx);
2841    if (len < 0)
2842        return len;
2843
2844    EV_LOG_GENERATED_CRYPTO_FRAME(LSQUIC_LOG_CONN_ID, pf,
2845                            packet_out->po_data + packet_out->po_data_sz, len);
2846    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
2847    packet_out->po_frame_types |= 1 << QUIC_FRAME_CRYPTO;
2848    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
2849                                     stream, QUIC_FRAME_CRYPTO, off, len);
2850    if (s != 0)
2851    {
2852        LSQ_WARN("adding crypto stream to packet failed: %s", strerror(errno));
2853        return -1;
2854    }
2855
2856    packet_out->po_flags |= PO_HELLO;
2857
2858    check_flush_threshold(stream);
2859    return SWTP_OK;
2860}
2861
2862
2863static void
2864abort_connection (struct lsquic_stream *stream)
2865{
2866    if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
2867        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
2868                                                next_service_stream);
2869    stream->sm_qflags |= SMQF_ABORT_CONN;
2870    LSQ_WARN("connection will be aborted");
2871    maybe_conn_to_tickable(stream);
2872}
2873
2874
2875static void
2876maybe_close_varsize_hq_frame (struct lsquic_stream *stream)
2877{
2878    struct stream_hq_frame *shf;
2879    uint64_t size;
2880    unsigned bits;
2881
2882    shf = find_cur_hq_frame(stream);
2883    if (!shf)
2884        return;
2885
2886    if (shf->shf_flags & SHF_FIXED_SIZE)
2887    {
2888        if (shf->shf_off + shf->shf_frame_size <= stream->sm_payload)
2889            stream_hq_frame_put(stream, shf);
2890        return;
2891    }
2892
2893    bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
2894    size = stream->sm_payload + stream->sm_n_buffered - shf->shf_off;
2895    if (size && size <= VINT_MAX_B(bits) && shf->shf_frame_ptr)
2896    {
2897        if (0 == stream->sm_n_buffered)
2898            LSQ_DEBUG("close HQ frame type 0x%X of size %"PRIu64,
2899                                                shf->shf_frame_type, size);
2900        else
2901            LSQ_DEBUG("convert HQ frame type 0x%X of to fixed %"PRIu64,
2902                                                shf->shf_frame_type, size);
2903        shf->shf_frame_ptr[0] = shf->shf_frame_type;
2904        vint_write(shf->shf_frame_ptr + 1, size, bits, 1 << bits);
2905        if (0 == stream->sm_n_buffered)
2906            stream_hq_frame_put(stream, shf);
2907        else
2908        {
2909            shf->shf_frame_size = size;
2910            shf->shf_flags |= SHF_FIXED_SIZE;
2911        }
2912    }
2913    else if (!shf->shf_frame_ptr)
2914    {
2915        LSQ_WARN("dangling HTTP/3 frame");
2916        stream->conn_pub->lconn->cn_if->ci_internal_error(
2917            stream->conn_pub->lconn, "dangling HTTP/3 frame on stream %"PRIu64,
2918                stream->id);
2919        stream_hq_frame_put(stream, shf);
2920    }
2921    else if (!size)
2922    {
2923        assert(!shf->shf_frame_ptr);
2924        LSQ_WARN("discard zero-sized HQ frame type 0x%X (off: %"PRIu64")",
2925                                        shf->shf_frame_type, shf->shf_off);
2926        stream_hq_frame_put(stream, shf);
2927    }
2928    else
2929    {
2930        assert(stream->sm_n_buffered);
2931        LSQ_ERROR("cannot close frame of size %"PRIu64" -- too large", size);
2932        /* TODO: abort connection */
2933        stream_hq_frame_put(stream, shf);
2934    }
2935}
2936
2937
2938static ssize_t
2939stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
2940                         size_t thresh)
2941{
2942    size_t size;
2943    ssize_t nw;
2944    unsigned seen_ok;
2945    int use_framing;
2946    struct frame_gen_ctx fg_ctx = {
2947        .fgc_stream = stream,
2948        .fgc_reader = reader,
2949        .fgc_nread_from_reader = 0,
2950    };
2951
2952    use_framing = (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2953                                       == (SMBF_IETF|SMBF_USE_HEADERS);
2954    if (use_framing)
2955    {
2956        fg_ctx.fgc_size = frame_hq_gen_size;
2957        fg_ctx.fgc_read = frame_hq_gen_read;
2958        fg_ctx.fgc_fin = frame_std_gen_fin; /* This seems to work for either? XXX */
2959    }
2960    else
2961    {
2962        fg_ctx.fgc_size = frame_std_gen_size;
2963        fg_ctx.fgc_read = frame_std_gen_read;
2964        fg_ctx.fgc_fin = frame_std_gen_fin;
2965    }
2966
2967    seen_ok = 0;
2968    while ((size = fg_ctx.fgc_size(&fg_ctx), thresh ? size >= thresh : size > 0)
2969           || fg_ctx.fgc_fin(&fg_ctx))
2970    {
2971        switch (stream->sm_write_to_packet(&fg_ctx, size))
2972        {
2973        case SWTP_OK:
2974            if (!seen_ok++)
2975                maybe_conn_to_tickable_if_writeable(stream, 0);
2976            if (fg_ctx.fgc_fin(&fg_ctx))
2977            {
2978                if (use_framing && seen_ok)
2979                    maybe_close_varsize_hq_frame(stream);
2980                stream->stream_flags |= STREAM_FIN_SENT;
2981                goto end;
2982            }
2983            else
2984                break;
2985        case SWTP_STOP:
2986            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
2987            if (use_framing && seen_ok)
2988                maybe_close_varsize_hq_frame(stream);
2989            goto end;
2990        default:
2991            abort_connection(stream);
2992            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
2993            return -1;
2994        }
2995    }
2996
2997    if (use_framing && seen_ok)
2998        maybe_close_varsize_hq_frame(stream);
2999
3000    if (thresh)
3001    {
3002        assert(size < thresh);
3003        assert(size >= stream->sm_n_buffered);
3004        size -= stream->sm_n_buffered;
3005        if (size > 0)
3006        {
3007            nw = save_to_buffer(stream, reader, size);
3008            if (nw < 0)
3009                return -1;
3010            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
3011        }
3012    }
3013    else
3014    {
3015        /* We count flushed data towards both stream and connection limits,
3016         * so we should have been able to packetize all of it:
3017         */
3018        assert(0 == stream->sm_n_buffered);
3019        assert(size == 0);
3020    }
3021
3022    maybe_mark_as_blocked(stream);
3023
3024  end:
3025    return fg_ctx.fgc_nread_from_reader;
3026}
3027
3028
3029/* Perform an implicit flush when we hit connection limit while buffering
3030 * data.  This is to prevent a (theoretical) stall:
3031 *
3032 * Imagine a number of streams, all of which buffered some data.  The buffered
3033 * data is up to connection cap, which means no further writes are possible.
3034 * None of them flushes, which means that data is not sent and connection
3035 * WINDOW_UPDATE frame never arrives from peer.  Stall.
3036 */
3037static int
3038maybe_flush_stream (struct lsquic_stream *stream)
3039{
3040    if (stream->sm_n_buffered > 0
3041          && (stream->sm_bflags & SMBF_CONN_LIMITED)
3042            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
3043        return stream_flush_nocheck(stream);
3044    else
3045        return 0;
3046}
3047
3048
3049static int
3050stream_hq_frame_extendable (const struct stream_hq_frame *shf, uint64_t cur_off,
3051                                                                    unsigned len)
3052{
3053    return (shf->shf_flags & (SHF_TWO_BYTES|SHF_FIXED_SIZE)) == 0
3054        && cur_off - shf->shf_off < (1 << 6)
3055        && cur_off - shf->shf_off + len >= (1 << 6)
3056        ;
3057}
3058
3059
3060/* Update currently buffered HQ frame or create a new one, if possible.
3061 * Return update length to be buffered.  If a HQ frame cannot be
3062 * buffered due to size, 0 is returned, thereby preventing both HQ frame
3063 * creation and buffering.
3064 */
3065static size_t
3066update_buffered_hq_frames (struct lsquic_stream *stream, size_t len,
3067                                                                size_t avail)
3068{
3069    struct stream_hq_frame *shf;
3070    uint64_t cur_off, end;
3071    size_t frame_sz;
3072    unsigned extendable;
3073
3074    cur_off = stream->sm_payload + stream->sm_n_buffered;
3075    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
3076        if (shf->shf_off <= cur_off)
3077        {
3078            end = stream_hq_frame_end(shf);
3079            extendable = stream_hq_frame_extendable(shf, cur_off, len);
3080            if (cur_off < end + extendable)
3081                break;
3082        }
3083
3084    if (shf)
3085    {
3086        if (len > end + extendable - cur_off)
3087            len = end + extendable - cur_off;
3088        frame_sz = stream_hq_frame_size(shf);
3089    }
3090    else
3091    {
3092        assert(avail >= 3);
3093        shf = stream_activate_hq_frame(stream, cur_off, HQFT_DATA, 0, len);
3094        /* XXX malloc can fail */
3095        if (len > stream_hq_frame_end(shf) - cur_off)
3096            len = stream_hq_frame_end(shf) - cur_off;
3097        extendable = 0;
3098        frame_sz = stream_hq_frame_size(shf);
3099        if (avail < frame_sz)
3100            return 0;
3101        avail -= frame_sz;
3102    }
3103
3104    if (!(shf->shf_flags & SHF_CC_PAID))
3105    {
3106        incr_conn_cap(stream, frame_sz);
3107        shf->shf_flags |= SHF_CC_PAID;
3108    }
3109    if (extendable)
3110    {
3111        shf->shf_flags |= SHF_TWO_BYTES;
3112        incr_conn_cap(stream, 1);
3113        avail -= 1;
3114        if ((stream->sm_qflags & SMQF_WANT_FLUSH)
3115                && shf->shf_off <= stream->sm_payload
3116                && stream_hq_frame_end(shf) >= stream->sm_flush_to_payload)
3117            stream->sm_flush_to += 1;
3118    }
3119
3120    if (len <= avail)
3121        return len;
3122    else
3123        return avail;
3124}
3125
3126
3127static ssize_t
3128save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
3129                                                                size_t len)
3130{
3131    size_t avail, n_written, n_allowed;
3132
3133    avail = lsquic_stream_write_avail(stream);
3134    if (avail < len)
3135        len = avail;
3136    if (len == 0)
3137    {
3138        LSQ_DEBUG("zero-byte write (avail: %zu)", avail);
3139        return 0;
3140    }
3141
3142    n_allowed = stream_get_n_allowed(stream);
3143    assert(stream->sm_n_buffered + len <= n_allowed);
3144
3145    if (!stream->sm_buf)
3146    {
3147        stream->sm_buf = malloc(n_allowed);
3148        if (!stream->sm_buf)
3149            return -1;
3150        stream->sm_n_allocated = n_allowed;
3151    }
3152
3153    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3154                                            == (SMBF_IETF|SMBF_USE_HEADERS))
3155        len = update_buffered_hq_frames(stream, len, avail);
3156
3157    n_written = reader->lsqr_read(reader->lsqr_ctx,
3158                        stream->sm_buf + stream->sm_n_buffered, len);
3159    stream->sm_n_buffered += n_written;
3160    assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
3161    incr_conn_cap(stream, n_written);
3162    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
3163              n_written, stream->sm_n_buffered);
3164    if (0 != maybe_flush_stream(stream))
3165        return -1;
3166    return n_written;
3167}
3168
3169
3170static ssize_t
3171stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
3172{
3173    const struct stream_hq_frame *shf;
3174    size_t thresh, len, frames, total_len, n_allowed, nwritten;
3175    ssize_t nw;
3176
3177    len = reader->lsqr_size(reader->lsqr_ctx);
3178    if (len == 0)
3179        return 0;
3180
3181    frames = 0;
3182    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3183                                        == (SMBF_IETF|SMBF_USE_HEADERS))
3184        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
3185            if (shf->shf_off >= stream->sm_payload)
3186                frames += stream_hq_frame_size(shf);
3187    total_len = len + frames + stream->sm_n_buffered;
3188    thresh = lsquic_stream_flush_threshold(stream, total_len);
3189    n_allowed = stream_get_n_allowed(stream);
3190    if (total_len <= n_allowed && total_len < thresh)
3191    {
3192        nwritten = 0;
3193        do
3194        {
3195            nw = save_to_buffer(stream, reader, len - nwritten);
3196            if (nw > 0)
3197                nwritten += (size_t) nw;
3198            else if (nw == 0)
3199                break;
3200            else
3201                return nw;
3202        }
3203        while (nwritten < len
3204                        && stream->sm_n_buffered < stream->sm_n_allocated);
3205        return nwritten;
3206    }
3207    else
3208        return stream_write_to_packets(stream, reader, thresh);
3209}
3210
3211
3212ssize_t
3213lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
3214{
3215    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
3216    return lsquic_stream_writev(stream, &iov, 1);
3217}
3218
3219
3220struct inner_reader_iovec {
3221    const struct iovec       *iov;
3222    const struct iovec *end;
3223    unsigned                  cur_iovec_off;
3224};
3225
3226
3227static size_t
3228inner_reader_iovec_read (void *ctx, void *buf, size_t count)
3229{
3230    struct inner_reader_iovec *const iro = ctx;
3231    unsigned char *p = buf;
3232    unsigned char *const end = p + count;
3233    unsigned n_tocopy;
3234
3235    while (iro->iov < iro->end && p < end)
3236    {
3237        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
3238        if (n_tocopy > (unsigned) (end - p))
3239            n_tocopy = end - p;
3240        memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off,
3241                                                                    n_tocopy);
3242        p += n_tocopy;
3243        iro->cur_iovec_off += n_tocopy;
3244        if (iro->iov->iov_len == iro->cur_iovec_off)
3245        {
3246            ++iro->iov;
3247            iro->cur_iovec_off = 0;
3248        }
3249    }
3250
3251    return p + count - end;
3252}
3253
3254
3255static size_t
3256inner_reader_iovec_size (void *ctx)
3257{
3258    struct inner_reader_iovec *const iro = ctx;
3259    const struct iovec *iov;
3260    size_t size;
3261
3262    size = 0;
3263    for (iov = iro->iov; iov < iro->end; ++iov)
3264        size += iov->iov_len;
3265
3266    return size - iro->cur_iovec_off;
3267}
3268
3269
3270ssize_t
3271lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
3272                                                                    int iovcnt)
3273{
3274    COMMON_WRITE_CHECKS();
3275    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3276
3277    struct inner_reader_iovec iro = {
3278        .iov = iov,
3279        .end = iov + iovcnt,
3280        .cur_iovec_off = 0,
3281    };
3282    struct lsquic_reader reader = {
3283        .lsqr_read = inner_reader_iovec_read,
3284        .lsqr_size = inner_reader_iovec_size,
3285        .lsqr_ctx  = &iro,
3286    };
3287
3288    return stream_write(stream, &reader);
3289}
3290
3291
3292ssize_t
3293lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
3294{
3295    COMMON_WRITE_CHECKS();
3296    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3297    return stream_write(stream, reader);
3298}
3299
3300
3301/* This bypasses COMMON_WRITE_CHECKS */
3302static ssize_t
3303stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz)
3304{
3305    struct iovec iov = { (void *) buf, sz, };
3306    struct inner_reader_iovec iro = {
3307        .iov = &iov,
3308        .end = &iov + 1,
3309        .cur_iovec_off = 0,
3310    };
3311    struct lsquic_reader reader = {
3312        .lsqr_read = inner_reader_iovec_read,
3313        .lsqr_size = inner_reader_iovec_size,
3314        .lsqr_ctx  = &iro,
3315    };
3316    return stream_write(stream, &reader);
3317}
3318
3319
3320/* XXX Move this define elsewhere? */
3321#define MAX_HEADERS_SIZE (64 * 1024)
3322
3323static int
3324send_headers_ietf (struct lsquic_stream *stream,
3325                            const struct lsquic_http_headers *headers, int eos)
3326{
3327    enum qwh_status qwh;
3328    const size_t max_prefix_size =
3329                    lsquic_qeh_max_prefix_size(stream->conn_pub->u.ietf.qeh);
3330    const size_t max_push_size = 1 /* Stream type */ + 8 /* Push ID */;
3331    size_t prefix_sz, headers_sz, hblock_sz, push_sz;
3332    unsigned bits;
3333    ssize_t nw;
3334    unsigned char *header_block;
3335    enum lsqpack_enc_header_flags hflags;
3336    unsigned char buf[max_push_size + max_prefix_size + MAX_HEADERS_SIZE];
3337
3338    stream->stream_flags &= ~STREAM_PUSHING;
3339    stream->stream_flags |= STREAM_NOPUSH;
3340
3341    /* TODO: Optimize for the common case: write directly to sm_buf and fall
3342     * back to a larger buffer if that fails.
3343     */
3344    prefix_sz = max_prefix_size;
3345    headers_sz = sizeof(buf) - max_prefix_size - max_push_size;
3346    qwh = lsquic_qeh_write_headers(stream->conn_pub->u.ietf.qeh, stream->id, 0,
3347                headers, buf + max_push_size + max_prefix_size, &prefix_sz,
3348                &headers_sz, &stream->sm_hb_compl, &hflags);
3349
3350    if (!(qwh == QWH_FULL || qwh == QWH_PARTIAL))
3351    {
3352        if (qwh == QWH_ENOBUF)
3353            LSQ_INFO("not enough room for header block");
3354        else
3355            LSQ_WARN("internal error encoding and sending HTTP headers");
3356        return -1;
3357    }
3358
3359    if (hflags & LSQECH_REF_NEW_ENTRIES)
3360        stream->stream_flags |= STREAM_ENCODER_DEP;
3361
3362    if (stream->sm_promise)
3363    {
3364        assert(lsquic_stream_is_pushed(stream));
3365        bits = vint_val2bits(stream->sm_promise->pp_id);
3366        push_sz = 1 + (1 << bits);
3367        if (!stream_activate_hq_frame(stream,
3368                stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PREAMBLE,
3369                SHF_FIXED_SIZE|SHF_PHANTOM, push_sz))
3370            return -1;
3371        buf[max_push_size + max_prefix_size - prefix_sz - push_sz] = HQUST_PUSH;
3372        vint_write(buf + max_push_size + max_prefix_size - prefix_sz
3373                    - push_sz + 1,stream->sm_promise->pp_id, bits, 1 << bits);
3374    }
3375    else
3376        push_sz = 0;
3377
3378    /* Construct contiguous header block buffer including HQ framing */
3379    header_block = buf + max_push_size + max_prefix_size - prefix_sz - push_sz;
3380    hblock_sz = push_sz + prefix_sz + headers_sz;
3381    if (!stream_activate_hq_frame(stream,
3382                stream->sm_payload + stream->sm_n_buffered + push_sz,
3383                HQFT_HEADERS, SHF_FIXED_SIZE, hblock_sz - push_sz))
3384        return -1;
3385
3386    if (qwh == QWH_FULL)
3387    {
3388        stream->sm_send_headers_state = SSHS_HBLOCK_SENDING;
3389        if (lsquic_stream_write_avail(stream))
3390        {
3391            nw = stream_write_buf(stream, header_block, hblock_sz);
3392            if (nw < 0)
3393            {
3394                LSQ_WARN("cannot write to stream: %s", strerror(errno));
3395                return -1;
3396            }
3397            if ((size_t) nw == hblock_sz)
3398            {
3399                stream->stream_flags |= STREAM_HEADERS_SENT;
3400                stream_hblock_sent(stream);
3401                LSQ_DEBUG("wrote all %zu bytes of header block", hblock_sz);
3402                return 0;
3403            }
3404            LSQ_DEBUG("wrote only %zd bytes of header block, stash", nw);
3405        }
3406        else
3407        {
3408            LSQ_DEBUG("cannot write to stream, stash all %zu bytes of "
3409                                        "header block", hblock_sz);
3410            nw = 0;
3411        }
3412    }
3413    else
3414    {
3415        stream->sm_send_headers_state = SSHS_ENC_SENDING;
3416        nw = 0;
3417    }
3418
3419    stream->sm_saved_want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
3420    stream_wantwrite(stream, 1);
3421
3422    stream->sm_header_block = malloc(hblock_sz - (size_t) nw);
3423    if (!stream->sm_header_block)
3424    {
3425        LSQ_WARN("cannot allocate %zd bytes to stash %s header block",
3426            hblock_sz - (size_t) nw, qwh == QWH_FULL ? "full" : "partial");
3427        return -1;
3428    }
3429    memcpy(stream->sm_header_block, header_block + (size_t) nw,
3430                                                hblock_sz - (size_t) nw);
3431    stream->sm_hblock_sz = hblock_sz - (size_t) nw;
3432    stream->sm_hblock_off = 0;
3433    LSQ_DEBUG("stashed %u bytes of header block", stream->sm_hblock_sz);
3434    return 0;
3435}
3436
3437
3438static int
3439send_headers_gquic (struct lsquic_stream *stream,
3440                            const struct lsquic_http_headers *headers, int eos)
3441{
3442    int s = lsquic_headers_stream_send_headers(stream->conn_pub->u.gquic.hs,
3443                stream->id, headers, eos, lsquic_stream_priority(stream));
3444    if (0 == s)
3445    {
3446        SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
3447        stream->stream_flags |= STREAM_HEADERS_SENT;
3448        if (eos)
3449            stream->stream_flags |= STREAM_FIN_SENT;
3450        LSQ_INFO("sent headers");
3451    }
3452    else
3453        LSQ_WARN("could not send headers: %s", strerror(errno));
3454    return s;
3455}
3456
3457
3458int
3459lsquic_stream_send_headers (lsquic_stream_t *stream,
3460                            const lsquic_http_headers_t *headers, int eos)
3461{
3462    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3463            && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_U_WRITE_DONE)))
3464    {
3465        if (stream->sm_bflags & SMBF_IETF)
3466            return send_headers_ietf(stream, headers, eos);
3467        else
3468            return send_headers_gquic(stream, headers, eos);
3469    }
3470    else
3471    {
3472        LSQ_INFO("cannot send headers in this state");
3473        errno = EBADMSG;
3474        return -1;
3475    }
3476}
3477
3478
3479void
3480lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
3481{
3482    if (offset > stream->max_send_off)
3483    {
3484        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
3485        LSQ_DEBUG("update max send offset from 0x%"PRIX64" to "
3486            "0x%"PRIX64, stream->max_send_off, offset);
3487        stream->max_send_off = offset;
3488    }
3489    else
3490        LSQ_DEBUG("new offset 0x%"PRIX64" is not larger than old "
3491            "max send offset 0x%"PRIX64", ignoring", offset,
3492            stream->max_send_off);
3493}
3494
3495
3496/* This function is used to update offsets after handshake completes and we
3497 * learn of peer's limits from the handshake values.
3498 */
3499int
3500lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset)
3501{
3502    LSQ_DEBUG("setting max_send_off to %"PRIu64, offset);
3503    if (offset > stream->max_send_off)
3504    {
3505        lsquic_stream_window_update(stream, offset);
3506        return 0;
3507    }
3508    else if (offset < stream->tosend_off)
3509    {
3510        LSQ_INFO("new offset (%"PRIu64" bytes) is smaller than the amount of "
3511            "data already sent on this stream (%"PRIu64" bytes)", offset,
3512            stream->tosend_off);
3513        return -1;
3514    }
3515    else
3516    {
3517        stream->max_send_off = offset;
3518        return 0;
3519    }
3520}
3521
3522
3523void
3524lsquic_stream_reset (lsquic_stream_t *stream, uint64_t error_code)
3525{
3526    lsquic_stream_reset_ext(stream, error_code, 1);
3527}
3528
3529
3530void
3531lsquic_stream_reset_ext (lsquic_stream_t *stream, uint64_t error_code,
3532                         int do_close)
3533{
3534    if ((stream->stream_flags & STREAM_RST_SENT)
3535                                    || (stream->sm_qflags & SMQF_SEND_RST))
3536    {
3537        LSQ_INFO("reset already sent");
3538        return;
3539    }
3540
3541    SM_HISTORY_APPEND(stream, SHE_RESET);
3542
3543    LSQ_INFO("reset, error code %"PRIu64, error_code);
3544    stream->error_code = error_code;
3545
3546    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
3547        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
3548                                                        next_send_stream);
3549    stream->sm_qflags &= ~SMQF_SENDING_FLAGS;
3550    stream->sm_qflags |= SMQF_SEND_RST;
3551
3552    if (stream->sm_qflags & SMQF_QPACK_DEC)
3553    {
3554        lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream);
3555        stream->sm_qflags |= ~SMQF_QPACK_DEC;
3556    }
3557
3558    drop_buffered_data(stream);
3559    maybe_elide_stream_frames(stream);
3560    maybe_schedule_call_on_close(stream);
3561
3562    if (do_close)
3563        lsquic_stream_close(stream);
3564    else
3565        maybe_conn_to_tickable_if_writeable(stream, 1);
3566}
3567
3568
3569lsquic_stream_id_t
3570lsquic_stream_id (const lsquic_stream_t *stream)
3571{
3572    return stream->id;
3573}
3574
3575
3576#if !defined(NDEBUG) && __GNUC__
3577__attribute__((weak))
3578#endif
3579struct lsquic_conn *
3580lsquic_stream_conn (const lsquic_stream_t *stream)
3581{
3582    return stream->conn_pub->lconn;
3583}
3584
3585
3586int
3587lsquic_stream_close (lsquic_stream_t *stream)
3588{
3589    LSQ_DEBUG("lsquic_stream_close() called");
3590    SM_HISTORY_APPEND(stream, SHE_CLOSE);
3591    if (lsquic_stream_is_closed(stream))
3592    {
3593        LSQ_INFO("Attempt to close an already-closed stream");
3594        errno = EBADF;
3595        return -1;
3596    }
3597    maybe_stream_shutdown_write(stream);
3598    stream_shutdown_read(stream);
3599    maybe_schedule_call_on_close(stream);
3600    maybe_finish_stream(stream);
3601    if (!(stream->stream_flags & STREAM_DELAYED_SW))
3602        maybe_conn_to_tickable_if_writeable(stream, 1);
3603    return 0;
3604}
3605
3606
3607#ifndef NDEBUG
3608#if __GNUC__
3609__attribute__((weak))
3610#endif
3611#endif
3612void
3613lsquic_stream_acked (struct lsquic_stream *stream,
3614                                            enum quic_frame_type frame_type)
3615{
3616    assert(stream->n_unacked);
3617    --stream->n_unacked;
3618    LSQ_DEBUG("ACKed; n_unacked: %u", stream->n_unacked);
3619    if (frame_type == QUIC_FRAME_RST_STREAM)
3620    {
3621        SM_HISTORY_APPEND(stream, SHE_RST_ACKED);
3622        LSQ_DEBUG("RESET that we sent has been acked by peer");
3623        stream->stream_flags |= STREAM_RST_ACKED;
3624    }
3625    if (0 == stream->n_unacked)
3626        maybe_finish_stream(stream);
3627}
3628
3629
3630void
3631lsquic_stream_push_req (lsquic_stream_t *stream,
3632                        struct uncompressed_headers *push_req)
3633{
3634    assert(!stream->push_req);
3635    stream->push_req = push_req;
3636    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
3637}
3638
3639
3640int
3641lsquic_stream_is_pushed (const lsquic_stream_t *stream)
3642{
3643    enum stream_id_type sit;
3644
3645    if (stream->sm_bflags & SMBF_IETF)
3646    {
3647        sit = stream->id & SIT_MASK;
3648        return sit == SIT_UNI_SERVER;
3649    }
3650    else
3651        return 1 & ~stream->id;
3652}
3653
3654
3655int
3656lsquic_stream_push_info (const lsquic_stream_t *stream,
3657                          lsquic_stream_id_t *ref_stream_id, void **hset)
3658{
3659    if (lsquic_stream_is_pushed(stream))
3660    {
3661        assert(stream->push_req);
3662        *ref_stream_id = stream->push_req->uh_stream_id;
3663        *hset          = stream->push_req->uh_hset;
3664        return 0;
3665    }
3666    else
3667        return -1;
3668}
3669
3670
3671int
3672lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
3673{
3674    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3675                                    && !(stream->stream_flags & STREAM_HAVE_UH))
3676    {
3677        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
3678        LSQ_DEBUG("received uncompressed headers");
3679        stream->stream_flags |= STREAM_HAVE_UH;
3680        if (uh->uh_flags & UH_FIN)
3681        {
3682            /* IETF QUIC only sets UH_FIN for a pushed stream on the server to
3683             * mark request as done:
3684             */
3685            if (stream->sm_bflags & SMBF_IETF)
3686                assert((stream->sm_bflags & SMBF_SERVER)
3687                                            && lsquic_stream_is_pushed(stream));
3688            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
3689        }
3690        stream->uh = uh;
3691        if (uh->uh_oth_stream_id == 0)
3692        {
3693            if (uh->uh_weight)
3694                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
3695        }
3696        else
3697            LSQ_NOTICE("don't know how to depend on stream %"PRIu64,
3698                                                        uh->uh_oth_stream_id);
3699        return 0;
3700    }
3701    else
3702    {
3703        LSQ_ERROR("received unexpected uncompressed headers");
3704        return -1;
3705    }
3706}
3707
3708
3709unsigned
3710lsquic_stream_priority (const lsquic_stream_t *stream)
3711{
3712    return 256 - stream->sm_priority;
3713}
3714
3715
3716int
3717lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
3718{
3719    /* The user should never get a reference to the special streams,
3720     * but let's check just in case:
3721     */
3722    if (lsquic_stream_is_critical(stream))
3723        return -1;
3724    if (priority < 1 || priority > 256)
3725        return -1;
3726    stream->sm_priority = 256 - priority;
3727    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
3728    LSQ_DEBUG("set priority to %u", priority);
3729    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
3730    return 0;
3731}
3732
3733
3734static int
3735maybe_send_priority_gquic (struct lsquic_stream *stream, unsigned priority)
3736{
3737    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3738                            && (stream->stream_flags & STREAM_HEADERS_SENT))
3739    {
3740        /* We need to send headers only if we are a) using HEADERS stream
3741         * and b) we already sent initial headers.  If initial headers
3742         * have not been sent yet, stream priority will be sent in the
3743         * HEADERS frame.
3744         */
3745        return lsquic_headers_stream_send_priority(stream->conn_pub->u.gquic.hs,
3746                                                stream->id, 0, 0, priority);
3747    }
3748    else
3749        return 0;
3750}
3751
3752
3753static int
3754send_priority_ietf (struct lsquic_stream *stream, unsigned priority)
3755{
3756    LSQ_WARN("%s: TODO", __func__);     /* TODO */
3757    return -1;
3758}
3759
3760
3761int
3762lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
3763{
3764    if (0 == lsquic_stream_set_priority_internal(stream, priority))
3765    {
3766        if (stream->sm_bflags & SMBF_IETF)
3767            return send_priority_ietf(stream, priority);
3768        else
3769            return maybe_send_priority_gquic(stream, priority);
3770    }
3771    else
3772        return -1;
3773}
3774
3775
3776lsquic_stream_ctx_t *
3777lsquic_stream_get_ctx (const lsquic_stream_t *stream)
3778{
3779    return stream->st_ctx;
3780}
3781
3782
3783int
3784lsquic_stream_refuse_push (lsquic_stream_t *stream)
3785{
3786    if (lsquic_stream_is_pushed(stream)
3787            && !(stream->sm_qflags & SMQF_SEND_RST)
3788            && !(stream->stream_flags & STREAM_RST_SENT))
3789    {
3790        LSQ_DEBUG("refusing pushed stream: send reset");
3791        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
3792        return 0;
3793    }
3794    else
3795        return -1;
3796}
3797
3798
3799size_t
3800lsquic_stream_mem_used (const struct lsquic_stream *stream)
3801{
3802    size_t size;
3803
3804    size = sizeof(stream);
3805    if (stream->sm_buf)
3806        size += stream->sm_n_allocated;
3807    if (stream->data_in)
3808        size += stream->data_in->di_if->di_mem_used(stream->data_in);
3809
3810    return size;
3811}
3812
3813
3814const lsquic_cid_t *
3815lsquic_stream_cid (const struct lsquic_stream *stream)
3816{
3817    return LSQUIC_LOG_CONN_ID;
3818}
3819
3820
3821void
3822lsquic_stream_dump_state (const struct lsquic_stream *stream)
3823{
3824    LSQ_DEBUG("flags: %X; read off: %"PRIu64, stream->stream_flags,
3825                                                    stream->read_offset);
3826    stream->data_in->di_if->di_dump_state(stream->data_in);
3827}
3828
3829
3830void *
3831lsquic_stream_get_hset (struct lsquic_stream *stream)
3832{
3833    void *hset;
3834
3835    if (lsquic_stream_is_reset(stream))
3836    {
3837        LSQ_INFO("%s: stream is reset, no headers returned", __func__);
3838        errno = ECONNRESET;
3839        return NULL;
3840    }
3841
3842    if (!((stream->sm_bflags & SMBF_USE_HEADERS)
3843                                && (stream->stream_flags & STREAM_HAVE_UH)))
3844    {
3845        LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__,
3846                                                        stream->stream_flags);
3847        return NULL;
3848    }
3849
3850    if (!stream->uh)
3851    {
3852        LSQ_INFO("%s: headers unavailable (already fetched?)", __func__);
3853        return NULL;
3854    }
3855
3856    if (stream->uh->uh_flags & UH_H1H)
3857    {
3858        LSQ_INFO("%s: uncompressed headers have internal format", __func__);
3859        return NULL;
3860    }
3861
3862    hset = stream->uh->uh_hset;
3863    stream->uh->uh_hset = NULL;
3864    destroy_uh(stream);
3865    if (stream->stream_flags & STREAM_HEAD_IN_FIN)
3866    {
3867        stream->stream_flags |= STREAM_FIN_REACHED;
3868        SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
3869    }
3870    LSQ_DEBUG("return header set");
3871    return hset;
3872}
3873
3874
3875/* GQUIC-only function */
3876int
3877lsquic_stream_id_is_critical (int use_http, lsquic_stream_id_t stream_id)
3878{
3879    return stream_id == LSQUIC_GQUIC_STREAM_HANDSHAKE
3880        || (stream_id == LSQUIC_GQUIC_STREAM_HEADERS && use_http);
3881}
3882
3883
3884int
3885lsquic_stream_is_critical (const struct lsquic_stream *stream)
3886{
3887    return stream->sm_bflags & SMBF_CRITICAL;
3888}
3889
3890
3891void
3892lsquic_stream_set_stream_if (struct lsquic_stream *stream,
3893           const struct lsquic_stream_if *stream_if, void *stream_if_ctx)
3894{
3895    SM_HISTORY_APPEND(stream, SHE_IF_SWITCH);
3896    stream->stream_if    = stream_if;
3897    stream->sm_onnew_arg = stream_if_ctx;
3898    LSQ_DEBUG("switched interface");
3899    assert(stream->stream_flags & STREAM_ONNEW_DONE);
3900    stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
3901                                                      stream);
3902}
3903
3904
3905static int
3906update_type_hist_and_check (struct hq_filter *filter)
3907{
3908    /* 3-bit codes: */
3909    enum {
3910        CODE_UNSET,
3911        CODE_HEADER,    /* H    Header  */
3912        CODE_DATA,      /* D    Data    */
3913        CODE_PLUS,      /* +    Plus: meaning previous frame repeats */
3914    };
3915    static const unsigned valid_seqs[] = {
3916        /* Ordered by expected frequency */
3917        0123,   /* HD+  */
3918        012,    /* HD   */
3919        01,     /* H    */
3920        01231,  /* HD+H */
3921        0121,   /* HDH  */
3922    };
3923    unsigned code, i;
3924
3925    switch (filter->hqfi_type)
3926    {
3927    case HQFT_HEADERS:
3928        code = CODE_HEADER;
3929        break;
3930    case HQFT_DATA:
3931        code = CODE_DATA;
3932        break;
3933    default:
3934        /* Ignore unknown frames */
3935        return 0;
3936    }
3937
3938    if (filter->hqfi_hist_idx >= MAX_HQFI_ENTRIES)
3939        return -1;
3940
3941    if (filter->hqfi_hist_idx && (filter->hqfi_hist_buf & 7) == code)
3942    {
3943        filter->hqfi_hist_buf <<= 3;
3944        filter->hqfi_hist_buf |= CODE_PLUS;
3945        filter->hqfi_hist_idx++;
3946    }
3947    else if (filter->hqfi_hist_idx > 1
3948            && ((filter->hqfi_hist_buf >> 3) & 7) == code
3949            && (filter->hqfi_hist_buf & 7) == CODE_PLUS)
3950        /* Keep it at plus, do nothing */;
3951    else
3952    {
3953        filter->hqfi_hist_buf <<= 3;
3954        filter->hqfi_hist_buf |= code;
3955        filter->hqfi_hist_idx++;
3956    }
3957
3958    for (i = 0; i < sizeof(valid_seqs) / sizeof(valid_seqs[0]); ++i)
3959        if (filter->hqfi_hist_buf == valid_seqs[i])
3960            return 0;
3961
3962    return -1;
3963}
3964
3965
3966static size_t
3967hq_read (void *ctx, const unsigned char *buf, size_t sz, int fin)
3968{
3969    struct lsquic_stream *const stream = ctx;
3970    struct hq_filter *const filter = &stream->sm_hq_filter;
3971    const unsigned char *p = buf, *prev;
3972    const unsigned char *const end = buf + sz;
3973    struct lsquic_conn *lconn;
3974    enum lsqpack_read_header_status rhs;
3975    int s;
3976
3977    while (p < end)
3978    {
3979        switch (filter->hqfi_state)
3980        {
3981        case HQFI_STATE_FRAME_HEADER_BEGIN:
3982            filter->hqfi_vint_state.vr2s_state = 0;
3983            filter->hqfi_state = HQFI_STATE_FRAME_HEADER_CONTINUE;
3984            /* fall-through */
3985        case HQFI_STATE_FRAME_HEADER_CONTINUE:
3986            s = lsquic_varint_read_two(&p, end, &filter->hqfi_vint_state);
3987            if (s < 0)
3988                break;
3989            filter->hqfi_flags |= HQFI_FLAG_BEGIN;
3990            filter->hqfi_state = HQFI_STATE_READING_PAYLOAD;
3991            LSQ_DEBUG("HQ frame type 0x%"PRIX64" at offset %"PRIu64", size %"PRIu64,
3992                filter->hqfi_type, stream->read_offset + (unsigned) (p - buf),
3993                filter->hqfi_left);
3994            if (0 != update_type_hist_and_check(filter))
3995            {
3996                lconn = stream->conn_pub->lconn;
3997                filter->hqfi_flags |= HQFI_FLAG_ERROR;
3998                LSQ_INFO("unexpected HTTP/3 frame sequence: %o",
3999                    filter->hqfi_hist_buf);
4000                lconn->cn_if->ci_abort_error(lconn, 1, HEC_FRAME_UNEXPECTED,
4001                    "unexpected HTTP/3 frame sequence on stream %"PRIu64,
4002                    stream->id);
4003                goto end;
4004            }
4005            if (filter->hqfi_type == HQFT_HEADERS)
4006            {
4007                if (0 == (filter->hqfi_flags & HQFI_FLAG_GOT_HEADERS))
4008                    filter->hqfi_flags |= HQFI_FLAG_GOT_HEADERS;
4009                else
4010                {
4011                    filter->hqfi_type = (1ull << 62) - 1;
4012                    LSQ_DEBUG("Ignoring HEADERS frame");
4013                }
4014            }
4015            if (filter->hqfi_left > 0)
4016            {
4017                if (filter->hqfi_type == HQFT_DATA)
4018                    goto end;
4019                else if (filter->hqfi_type == HQFT_PUSH_PROMISE)
4020                {
4021                    lconn = stream->conn_pub->lconn;
4022                    if (stream->sm_bflags & SMBF_SERVER)
4023                        lconn->cn_if->ci_abort_error(lconn, 1,
4024                            HEC_FRAME_UNEXPECTED, "Received PUSH_PROMISE frame "
4025                            "on stream %"PRIu64" (clients are not supposed to "
4026                            "send those)", stream->id);
4027                    else
4028                        /* Our client implementation does not support server
4029                         * push.
4030                         */
4031                        lconn->cn_if->ci_abort_error(lconn, 1,
4032                            HEC_FRAME_UNEXPECTED,
4033                            "Received PUSH_PROMISE frame (not supported)"
4034                            "on stream %"PRIu64, stream->id);
4035                    goto end;
4036                }
4037            }
4038            else
4039            {
4040                switch (filter->hqfi_type)
4041                {
4042                case HQFT_CANCEL_PUSH:
4043                case HQFT_GOAWAY:
4044                case HQFT_HEADERS:
4045                case HQFT_MAX_PUSH_ID:
4046                case HQFT_PUSH_PROMISE:
4047                case HQFT_SETTINGS:
4048                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
4049                    LSQ_INFO("HQ frame of type %"PRIu64" cannot be size 0",
4050                                                            filter->hqfi_type);
4051                    abort_connection(stream);   /* XXX Overkill? */
4052                    goto end;
4053                default:
4054                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
4055                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4056                    break;
4057                }
4058            }
4059            break;
4060        case HQFI_STATE_READING_PAYLOAD:
4061            if (filter->hqfi_type == HQFT_DATA)
4062                goto end;
4063            sz = filter->hqfi_left;
4064            if (sz > (uintptr_t) (end - p))
4065                sz = (uintptr_t) (end - p);
4066            switch (filter->hqfi_type)
4067            {
4068            case HQFT_HEADERS:
4069                prev = p;
4070                if (filter->hqfi_flags & HQFI_FLAG_BEGIN)
4071                {
4072                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
4073                    rhs = lsquic_qdh_header_in_begin(
4074                                stream->conn_pub->u.ietf.qdh,
4075                                stream, filter->hqfi_left, &p, sz);
4076                }
4077                else
4078                    rhs = lsquic_qdh_header_in_continue(
4079                                stream->conn_pub->u.ietf.qdh, stream, &p, sz);
4080                assert(p > prev || LQRHS_ERROR == rhs);
4081                filter->hqfi_left -= p - prev;
4082                if (filter->hqfi_left == 0)
4083                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4084                switch (rhs)
4085                {
4086                case LQRHS_DONE:
4087                    assert(filter->hqfi_left == 0);
4088                    stream->sm_qflags &= ~SMQF_QPACK_DEC;
4089                    break;
4090                case LQRHS_NEED:
4091                    stream->sm_qflags |= SMQF_QPACK_DEC;
4092                    break;
4093                case LQRHS_BLOCKED:
4094                    stream->sm_qflags |= SMQF_QPACK_DEC;
4095                    filter->hqfi_flags |= HQFI_FLAG_BLOCKED;
4096                    goto end;
4097                default:
4098                    assert(LQRHS_ERROR == rhs);
4099                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
4100                    LSQ_INFO("error processing header block");
4101                    abort_connection(stream);   /* XXX Overkill? */
4102                    goto end;
4103                }
4104                break;
4105            default:
4106                /* Simply skip unknown frame type payload for now */
4107                filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
4108                p += sz;
4109                filter->hqfi_left -= sz;
4110                if (filter->hqfi_left == 0)
4111                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4112                break;
4113            }
4114            break;
4115        default:
4116            assert(0);
4117            goto end;
4118        }
4119    }
4120
4121  end:
4122    if (fin && p == end && filter->hqfi_state != HQFI_STATE_FRAME_HEADER_BEGIN)
4123    {
4124        LSQ_INFO("FIN at unexpected place in filter; state: %u",
4125                                                        filter->hqfi_state);
4126        filter->hqfi_flags |= HQFI_FLAG_ERROR;
4127/* From [draft-ietf-quic-http-16] Section 3.1:
4128 *               When a stream terminates cleanly, if the last frame on
4129 * the stream was truncated, this MUST be treated as a connection error
4130 * (see HTTP_MALFORMED_FRAME in Section 8.1).
4131 */
4132        abort_connection(stream);
4133    }
4134
4135    return p - buf;
4136}
4137
4138
4139static int
4140hq_filter_readable_now (const struct lsquic_stream *stream)
4141{
4142    const struct hq_filter *const filter = &stream->sm_hq_filter;
4143
4144    return (filter->hqfi_type == HQFT_DATA
4145                    && filter->hqfi_state == HQFI_STATE_READING_PAYLOAD)
4146        || (filter->hqfi_flags & HQFI_FLAG_ERROR)
4147        || stream->uh
4148        || (stream->stream_flags & STREAM_FIN_REACHED)
4149    ;
4150}
4151
4152
4153static int
4154hq_filter_readable (struct lsquic_stream *stream)
4155{
4156    struct hq_filter *const filter = &stream->sm_hq_filter;
4157    struct read_frames_status rfs;
4158
4159    if (filter->hqfi_flags & HQFI_FLAG_BLOCKED)
4160        return 0;
4161
4162    if (!hq_filter_readable_now(stream))
4163    {
4164        rfs = read_data_frames(stream, 0, hq_read, stream);
4165        if (rfs.total_nread == 0)
4166        {
4167            if (rfs.error)
4168            {
4169                filter->hqfi_flags |= HQFI_FLAG_ERROR;
4170                abort_connection(stream);   /* XXX Overkill? */
4171                return 1;   /* Collect error */
4172            }
4173            return 0;
4174        }
4175    }
4176
4177    return hq_filter_readable_now(stream);
4178}
4179
4180
4181static size_t
4182hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame)
4183{
4184    struct hq_filter *const filter = &stream->sm_hq_filter;
4185    size_t nr;
4186
4187    if (!(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4188                                            && filter->hqfi_type == HQFT_DATA))
4189    {
4190        nr = hq_read(stream, data_frame->df_data + data_frame->df_read_off,
4191                            data_frame->df_size - data_frame->df_read_off,
4192                            data_frame->df_fin);
4193        if (nr)
4194        {
4195            stream->read_offset += nr;
4196            stream_consumed_bytes(stream);
4197        }
4198    }
4199    else
4200        nr = 0;
4201
4202    if (0 == (filter->hqfi_flags & HQFI_FLAG_ERROR))
4203    {
4204        data_frame->df_read_off += nr;
4205        if (filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4206                                        && filter->hqfi_type == HQFT_DATA)
4207            return MIN(filter->hqfi_left,
4208                    (unsigned) data_frame->df_size - data_frame->df_read_off);
4209        else
4210        {
4211            assert(data_frame->df_read_off == data_frame->df_size);
4212            return 0;
4213        }
4214    }
4215    else
4216    {
4217        data_frame->df_read_off = data_frame->df_size;
4218        return 0;
4219    }
4220}
4221
4222
4223static void
4224hq_decr_left (struct lsquic_stream *stream, size_t read)
4225{
4226    struct hq_filter *const filter = &stream->sm_hq_filter;
4227
4228    if (read)
4229    {
4230        assert(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4231                                            && filter->hqfi_type == HQFT_DATA);
4232        assert(read <= filter->hqfi_left);
4233    }
4234
4235    filter->hqfi_left -= read;
4236    if (0 == filter->hqfi_left)
4237        filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4238}
4239
4240
4241struct qpack_dec_hdl *
4242lsquic_stream_get_qdh (const struct lsquic_stream *stream)
4243{
4244    return stream->conn_pub->u.ietf.qdh;
4245}
4246
4247
4248/* These are IETF QUIC states */
4249enum stream_state_sending
4250lsquic_stream_sending_state (const struct lsquic_stream *stream)
4251{
4252    if (0 == (stream->stream_flags & STREAM_RST_SENT))
4253    {
4254        if (stream->stream_flags & STREAM_FIN_SENT)
4255        {
4256            if (stream->n_unacked)
4257                return SSS_DATA_SENT;
4258            else
4259                return SSS_DATA_RECVD;
4260        }
4261        else
4262        {
4263            if (stream->tosend_off
4264                            || (stream->stream_flags & STREAM_BLOCKED_SENT))
4265                return SSS_SEND;
4266            else
4267                return SSS_READY;
4268        }
4269    }
4270    else if (stream->stream_flags & STREAM_RST_ACKED)
4271        return SSS_RESET_RECVD;
4272    else
4273        return SSS_RESET_SENT;
4274}
4275
4276
4277const char *const lsquic_sss2str[] =
4278{
4279    [SSS_READY]        =  "Ready",
4280    [SSS_SEND]         =  "Send",
4281    [SSS_DATA_SENT]    =  "Data Sent",
4282    [SSS_RESET_SENT]   =  "Reset Sent",
4283    [SSS_DATA_RECVD]   =  "Data Recvd",
4284    [SSS_RESET_RECVD]  =  "Reset Recvd",
4285};
4286
4287
4288const char *const lsquic_ssr2str[] =
4289{
4290    [SSR_RECV]         =  "Recv",
4291    [SSR_SIZE_KNOWN]   =  "Size Known",
4292    [SSR_DATA_RECVD]   =  "Data Recvd",
4293    [SSR_RESET_RECVD]  =  "Reset Recvd",
4294    [SSR_DATA_READ]    =  "Data Read",
4295    [SSR_RESET_READ]   =  "Reset Read",
4296};
4297
4298
4299/* These are IETF QUIC states */
4300enum stream_state_receiving
4301lsquic_stream_receiving_state (struct lsquic_stream *stream)
4302{
4303    uint64_t n_bytes;
4304
4305    if (0 == (stream->stream_flags & STREAM_RST_RECVD))
4306    {
4307        if (0 == (stream->stream_flags & STREAM_FIN_RECVD))
4308            return SSR_RECV;
4309        if (stream->stream_flags & STREAM_FIN_REACHED)
4310            return SSR_DATA_READ;
4311        if (0 == (stream->stream_flags & STREAM_DATA_RECVD))
4312        {
4313            n_bytes = stream->data_in->di_if->di_readable_bytes(
4314                                    stream->data_in, stream->read_offset);
4315            if (stream->read_offset + n_bytes == stream->sm_fin_off)
4316            {
4317                stream->stream_flags |= STREAM_DATA_RECVD;
4318                return SSR_DATA_RECVD;
4319            }
4320            else
4321                return SSR_SIZE_KNOWN;
4322        }
4323        else
4324            return SSR_DATA_RECVD;
4325    }
4326    else if (stream->stream_flags & STREAM_RST_READ)
4327        return SSR_RESET_READ;
4328    else
4329        return SSR_RESET_RECVD;
4330}
4331
4332
4333void
4334lsquic_stream_qdec_unblocked (struct lsquic_stream *stream)
4335{
4336    struct hq_filter *const filter = &stream->sm_hq_filter;
4337
4338    assert(stream->sm_qflags & SMQF_QPACK_DEC);
4339    assert(filter->hqfi_flags & HQFI_FLAG_BLOCKED);
4340
4341    filter->hqfi_flags &= ~HQFI_FLAG_BLOCKED;
4342    stream->conn_pub->cp_flags |= CP_STREAM_UNBLOCKED;
4343    LSQ_DEBUG("QPACK decoder unblocked");
4344}
4345
4346
4347int
4348lsquic_stream_is_rejected (const struct lsquic_stream *stream)
4349{
4350    return stream->stream_flags & STREAM_SS_RECVD;
4351}
4352
4353
4354int
4355lsquic_stream_can_push (const struct lsquic_stream *stream)
4356{
4357    if (lsquic_stream_is_pushed(stream))
4358        return 0;
4359    else if (stream->sm_bflags & SMBF_IETF)
4360        return (stream->sm_bflags & SMBF_USE_HEADERS)
4361            && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_NOPUSH))
4362            && stream->sm_send_headers_state == SSHS_BEGIN
4363            ;
4364    else
4365        return 1;
4366}
4367
4368
4369static size_t
4370dp_reader_read (void *lsqr_ctx, void *buf, size_t count)
4371{
4372    struct lsquic_stream *const stream = lsqr_ctx;
4373    unsigned char *dst = buf;
4374    unsigned char *const end = buf + count;
4375    size_t len;
4376
4377    len = MIN((size_t) (stream->sm_dup_push_len - stream->sm_dup_push_off),
4378                                                        (size_t) (end - dst));
4379    memcpy(dst, stream->sm_dup_push_buf + stream->sm_dup_push_off, len);
4380    stream->sm_dup_push_off += len;
4381
4382    if (stream->sm_dup_push_len == stream->sm_dup_push_off)
4383        LSQ_DEBUG("finish writing duplicate push");
4384
4385    return len;
4386}
4387
4388
4389static size_t
4390dp_reader_size (void *lsqr_ctx)
4391{
4392    struct lsquic_stream *const stream = lsqr_ctx;
4393
4394    return stream->sm_dup_push_len - stream->sm_dup_push_off;
4395}
4396
4397
4398static void
4399init_dp_reader (struct lsquic_stream *stream, struct lsquic_reader *reader)
4400{
4401    reader->lsqr_read = dp_reader_read;
4402    reader->lsqr_size = dp_reader_size;
4403    reader->lsqr_ctx = stream;
4404}
4405
4406
4407static void
4408on_write_dp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
4409{
4410    struct lsquic_reader dp_reader;
4411    ssize_t nw;
4412    int want_write;
4413
4414    assert(stream->sm_dup_push_off < stream->sm_dup_push_len);
4415
4416    init_dp_reader(stream, &dp_reader);
4417    nw = stream_write(stream, &dp_reader);
4418    if (nw > 0)
4419    {
4420        LSQ_DEBUG("wrote %zd bytes more of duplicate push (%s)",
4421            nw, stream->sm_dup_push_off == stream->sm_dup_push_len ?
4422            "done" : "not done");
4423        if (stream->sm_dup_push_off == stream->sm_dup_push_len)
4424        {
4425            /* Restore want_write flag */
4426            want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
4427            if (want_write != stream->sm_saved_want_write)
4428                (void) lsquic_stream_wantwrite(stream,
4429                                                stream->sm_saved_want_write);
4430        }
4431    }
4432    else if (nw < 0)
4433    {
4434        LSQ_WARN("could not write duplicate push (wrapper)");
4435        /* XXX What should happen if we hit an error? TODO */
4436    }
4437}
4438
4439
4440int
4441lsquic_stream_duplicate_push (struct lsquic_stream *stream, uint64_t push_id)
4442{
4443    struct lsquic_reader dp_reader;
4444    unsigned bits, len;
4445    ssize_t nw;
4446
4447    assert(stream->sm_bflags & SMBF_IETF);
4448    assert(lsquic_stream_can_push(stream));
4449
4450    bits = vint_val2bits(push_id);
4451    len = 1 << bits;
4452
4453    if (!stream_activate_hq_frame(stream,
4454            stream->sm_payload + stream->sm_n_buffered, HQFT_DUPLICATE_PUSH,
4455            SHF_FIXED_SIZE, len))
4456        return -1;
4457
4458    stream->stream_flags |= STREAM_PUSHING;
4459
4460    stream->sm_dup_push_len = len;
4461    stream->sm_dup_push_off = 0;
4462    vint_write(stream->sm_dup_push_buf, push_id, bits, 1 << bits);
4463
4464    init_dp_reader(stream, &dp_reader);
4465    nw = stream_write(stream, &dp_reader);
4466    if (nw > 0)
4467    {
4468        if (stream->sm_dup_push_off == stream->sm_dup_push_len)
4469            LSQ_DEBUG("fully wrote DUPLICATE_PUSH %"PRIu64, push_id);
4470        else
4471        {
4472            LSQ_DEBUG("partially wrote DUPLICATE_PUSH %"PRIu64, push_id);
4473            stream->stream_flags |= STREAM_NOPUSH;
4474            stream->sm_saved_want_write =
4475                                    !!(stream->sm_qflags & SMQF_WANT_WRITE);
4476            stream_wantwrite(stream, 1);
4477        }
4478        return 0;
4479    }
4480    else
4481    {
4482        if (nw < 0)
4483            LSQ_WARN("failure writing DUPLICATE_PUSH");
4484        stream->stream_flags |= STREAM_NOPUSH;
4485        stream->stream_flags &= ~STREAM_PUSHING;
4486        return -1;
4487    }
4488}
4489
4490
4491static size_t
4492pp_reader_read (void *lsqr_ctx, void *buf, size_t count)
4493{
4494    struct push_promise *const promise = lsqr_ctx;
4495    unsigned char *dst = buf;
4496    unsigned char *const end = buf + count;
4497    size_t len;
4498
4499    while (dst < end)
4500    {
4501        switch (promise->pp_write_state)
4502        {
4503        case PPWS_ID0:
4504        case PPWS_ID1:
4505        case PPWS_ID2:
4506        case PPWS_ID3:
4507        case PPWS_ID4:
4508        case PPWS_ID5:
4509        case PPWS_ID6:
4510        case PPWS_ID7:
4511            *dst++ = promise->pp_encoded_push_id[promise->pp_write_state];
4512            ++promise->pp_write_state;
4513            break;
4514        case PPWS_PFX0:
4515            *dst++ = 0;
4516            ++promise->pp_write_state;
4517            break;
4518        case PPWS_PFX1:
4519            *dst++ = 0;
4520            ++promise->pp_write_state;
4521            break;
4522        case PPWS_HBLOCK:
4523            len = MIN(promise->pp_content_len - promise->pp_write_off,
4524                        (size_t) (end - dst));
4525            memcpy(dst, promise->pp_content_buf + promise->pp_write_off,
4526                                                                        len);
4527            promise->pp_write_off += len;
4528            dst += len;
4529            if (promise->pp_content_len == promise->pp_write_off)
4530            {
4531                LSQ_LOG1(LSQ_LOG_DEBUG, "finish writing push promise %"PRIu64
4532                    ": reset push state", promise->pp_id);
4533                promise->pp_write_state = PPWS_DONE;
4534            }
4535            goto end;
4536        default:
4537            goto end;
4538        }
4539    }
4540
4541  end:
4542    return dst - (unsigned char *) buf;
4543}
4544
4545
4546static size_t
4547pp_reader_size (void *lsqr_ctx)
4548{
4549    struct push_promise *const promise = lsqr_ctx;
4550    size_t size;
4551
4552    size = 0;
4553    switch (promise->pp_write_state)
4554    {
4555    case PPWS_ID0:
4556    case PPWS_ID1:
4557    case PPWS_ID2:
4558    case PPWS_ID3:
4559    case PPWS_ID4:
4560    case PPWS_ID5:
4561    case PPWS_ID6:
4562    case PPWS_ID7:
4563        size += 8 - promise->pp_write_state;
4564    case PPWS_PFX0:
4565        ++size;
4566        /* fall-through */
4567    case PPWS_PFX1:
4568        ++size;
4569        /* fall-through */
4570    case PPWS_HBLOCK:
4571        size += promise->pp_content_len - promise->pp_write_off;
4572        break;
4573    default:
4574        break;
4575    }
4576
4577    return size;
4578}
4579
4580
4581static void
4582init_pp_reader (struct push_promise *promise, struct lsquic_reader *reader)
4583{
4584    reader->lsqr_read = pp_reader_read;
4585    reader->lsqr_size = pp_reader_size;
4586    reader->lsqr_ctx = promise;
4587}
4588
4589
4590static void
4591on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
4592{
4593    struct lsquic_reader pp_reader;
4594    struct push_promise *promise;
4595    ssize_t nw;
4596    int want_write;
4597
4598    assert(stream_is_pushing_promise(stream));
4599
4600    promise = SLIST_FIRST(&stream->sm_promises);
4601    init_pp_reader(promise, &pp_reader);
4602    nw = stream_write(stream, &pp_reader);
4603    if (nw > 0)
4604    {
4605        LSQ_DEBUG("wrote %zd bytes more of push promise (%s)",
4606            nw, promise->pp_write_state == PPWS_DONE ? "done" : "not done");
4607        if (promise->pp_write_state == PPWS_DONE)
4608        {
4609            /* Restore want_write flag */
4610            want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
4611            if (want_write != stream->sm_saved_want_write)
4612                (void) lsquic_stream_wantwrite(stream,
4613                                                stream->sm_saved_want_write);
4614        }
4615    }
4616    else if (nw < 0)
4617    {
4618        LSQ_WARN("could not write push promise (wrapper)");
4619        /* XXX What should happen if we hit an error? TODO */
4620    }
4621}
4622
4623
4624/* Success means that the push promise has been placed on sm_promises list and
4625 * the stream now owns it.  Failure means that the push promise should be
4626 * destroyed by the caller.
4627 *
4628 * A push promise is written immediately.  If it cannot be written to packets
4629 * or buffered whole, the stream is marked as unable to push further promises.
4630 */
4631int
4632lsquic_stream_push_promise (struct lsquic_stream *stream,
4633                                                struct push_promise *promise)
4634{
4635    struct lsquic_reader pp_reader;
4636    unsigned bits, len;
4637    ssize_t nw;
4638
4639    assert(stream->sm_bflags & SMBF_IETF);
4640    assert(lsquic_stream_can_push(stream));
4641
4642    bits = vint_val2bits(promise->pp_id);
4643    len = 1 << bits;
4644    promise->pp_write_state = 8 - len;
4645    vint_write(promise->pp_encoded_push_id + 8 - len, promise->pp_id,
4646                                                            bits, 1 << bits);
4647
4648    if (!stream_activate_hq_frame(stream,
4649                stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PROMISE,
4650                SHF_FIXED_SIZE, pp_reader_size(promise)))
4651        return -1;
4652
4653    stream->stream_flags |= STREAM_PUSHING;
4654
4655    init_pp_reader(promise, &pp_reader);
4656    nw = stream_write(stream, &pp_reader);
4657    if (nw > 0)
4658    {
4659        SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next);
4660        ++promise->pp_refcnt;
4661        if (promise->pp_write_state == PPWS_DONE)
4662            LSQ_DEBUG("fully wrote promise %"PRIu64, promise->pp_id);
4663        else
4664        {
4665            LSQ_DEBUG("partially wrote promise %"PRIu64" (state: %d, off: %u)"
4666                ", disable further pushing", promise->pp_id,
4667                promise->pp_write_state, promise->pp_write_off);
4668            stream->stream_flags |= STREAM_NOPUSH;
4669            stream->sm_saved_want_write =
4670                                    !!(stream->sm_qflags & SMQF_WANT_WRITE);
4671            stream_wantwrite(stream, 1);
4672        }
4673        return 0;
4674    }
4675    else
4676    {
4677        if (nw < 0)
4678            LSQ_WARN("failure writing push promise");
4679        stream->stream_flags |= STREAM_NOPUSH;
4680        stream->stream_flags &= ~STREAM_PUSHING;
4681        return -1;
4682    }
4683}
4684