lsquic_stream.c revision a0e1aeee
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "lsquic.h"
30
31#include "lsquic_int_types.h"
32#include "lsquic_packet_common.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_malo.h"
35#include "lsquic_conn_flow.h"
36#include "lsquic_rtt.h"
37#include "lsquic_sfcw.h"
38#include "lsquic_varint.h"
39#include "lsquic_hq.h"
40#include "lsquic_hash.h"
41#include "lsquic_stream.h"
42#include "lsquic_conn_public.h"
43#include "lsquic_util.h"
44#include "lsquic_mm.h"
45#include "lsquic_headers_stream.h"
46#include "lsquic_conn.h"
47#include "lsquic_data_in_if.h"
48#include "lsquic_parse.h"
49#include "lsquic_packet_out.h"
50#include "lsquic_engine_public.h"
51#include "lsquic_senhist.h"
52#include "lsquic_pacer.h"
53#include "lsquic_cubic.h"
54#include "lsquic_bw_sampler.h"
55#include "lsquic_minmax.h"
56#include "lsquic_bbr.h"
57#include "lsquic_send_ctl.h"
58#include "lsquic_headers.h"
59#include "lsquic_ev_log.h"
60#include "lsquic_enc_sess.h"
61#include "lsqpack.h"
62#include "lsquic_frab_list.h"
63#include "lsquic_http1x_if.h"
64#include "lsquic_qdec_hdl.h"
65#include "lsquic_qenc_hdl.h"
66#include "lsquic_byteswap.h"
67#include "lsquic_ietf.h"
68#include "lsquic_push_promise.h"
69
70#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
71#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(stream->conn_pub->lconn)
72#define LSQUIC_LOG_STREAM_ID stream->id
73#include "lsquic_logger.h"
74
75#define MIN(a, b) ((a) < (b) ? (a) : (b))
76
77static void
78drop_frames_in (lsquic_stream_t *stream);
79
80static void
81maybe_schedule_call_on_close (lsquic_stream_t *stream);
82
83static int
84stream_wantread (lsquic_stream_t *stream, int is_want);
85
86static int
87stream_wantwrite (lsquic_stream_t *stream, int is_want);
88
89static ssize_t
90stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);
91
92static ssize_t
93save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);
94
95static int
96stream_flush (lsquic_stream_t *stream);
97
98static int
99stream_flush_nocheck (lsquic_stream_t *stream);
100
101static void
102maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag);
103
104enum swtp_status { SWTP_OK, SWTP_STOP, SWTP_ERROR };
105
106static enum swtp_status
107stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size);
108
109static enum swtp_status
110stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size);
111
112static enum swtp_status
113stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size);
114
115static size_t
116stream_write_avail_no_frames (struct lsquic_stream *);
117
118static size_t
119stream_write_avail_with_frames (struct lsquic_stream *);
120
121static size_t
122stream_write_avail_with_headers (struct lsquic_stream *);
123
124static int
125hq_filter_readable (struct lsquic_stream *stream);
126
127static void
128hq_decr_left (struct lsquic_stream *stream, size_t);
129
130static size_t
131hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame);
132
133static int
134stream_readable_non_http (struct lsquic_stream *stream);
135
136static int
137stream_readable_http_gquic (struct lsquic_stream *stream);
138
139static int
140stream_readable_http_ietf (struct lsquic_stream *stream);
141
142static ssize_t
143stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz);
144
145static size_t
146active_hq_frame_sizes (const struct lsquic_stream *);
147
148static void
149on_write_dp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *);
150
151static void
152on_write_pp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *);
153
154static void
155stream_hq_frame_put (struct lsquic_stream *, struct stream_hq_frame *);
156
157static size_t
158stream_hq_frame_size (const struct stream_hq_frame *);
159
160const struct stream_filter_if hq_stream_filter_if =
161{
162    .sfi_readable   = hq_filter_readable,
163    .sfi_filter_df  = hq_filter_df,
164    .sfi_decr_left  = hq_decr_left,
165};
166
167
168#if LSQUIC_KEEP_STREAM_HISTORY
169/* These values are printable ASCII characters for ease of printing the
170 * whole history in a single line of a log message.
171 *
172 * The list of events is not exhaustive: only most interesting events
173 * are recorded.
174 */
175enum stream_history_event
176{
177    SHE_EMPTY              =  '\0',     /* Special entry.  No init besides memset required */
178    SHE_PLUS               =  '+',      /* Special entry: previous event occured more than once */
179    SHE_REACH_FIN          =  'a',
180    SHE_BLOCKED_OUT        =  'b',
181    SHE_CREATED            =  'C',
182    SHE_FRAME_IN           =  'd',
183    SHE_FRAME_OUT          =  'D',
184    SHE_RESET              =  'e',
185    SHE_WINDOW_UPDATE      =  'E',
186    SHE_FIN_IN             =  'f',
187    SHE_FINISHED           =  'F',
188    SHE_GOAWAY_IN          =  'g',
189    SHE_USER_WRITE_HEADER  =  'h',
190    SHE_HEADERS_IN         =  'H',
191    SHE_IF_SWITCH          =  'i',
192    SHE_ONCLOSE_SCHED      =  'l',
193    SHE_ONCLOSE_CALL       =  'L',
194    SHE_ONNEW              =  'N',
195    SHE_SET_PRIO           =  'p',
196    SHE_USER_READ          =  'r',
197    SHE_SHUTDOWN_READ      =  'R',
198    SHE_RST_IN             =  's',
199    SHE_SS_IN              =  'S',
200    SHE_RST_OUT            =  't',
201    SHE_RST_ACKED          =  'T',
202    SHE_FLUSH              =  'u',
203    SHE_USER_WRITE_DATA    =  'w',
204    SHE_SHUTDOWN_WRITE     =  'W',
205    SHE_CLOSE              =  'X',
206    SHE_DELAY_SW           =  'y',
207    SHE_FORCE_FINISH       =  'Z',
208    SHE_WANTREAD_NO        =  '0',  /* "YES" must be one more than "NO" */
209    SHE_WANTREAD_YES       =  '1',
210    SHE_WANTWRITE_NO       =  '2',
211    SHE_WANTWRITE_YES      =  '3',
212};
213
214static void
215sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
216{
217    enum stream_history_event prev_event;
218    sm_hist_idx_t idx;
219    int plus;
220
221    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
222    plus = SHE_PLUS == stream->sm_hist_buf[idx];
223    idx = (idx - plus) & SM_HIST_IDX_MASK;
224    prev_event = stream->sm_hist_buf[idx];
225
226    if (prev_event == sh_event && plus)
227        return;
228
229    if (prev_event == sh_event)
230        sh_event = SHE_PLUS;
231    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
232
233    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
234        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
235                                                        stream->sm_hist_buf);
236}
237
238
239#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
240#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
241        if (stream->sm_hist_idx & SM_HIST_IDX_MASK)                         \
242            LSQ_DEBUG("history: [%.*s]",                                    \
243                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
244                (stream)->sm_hist_buf);                                     \
245    } while (0)
246#else
247#   define SM_HISTORY_APPEND(stream, event)
248#   define SM_HISTORY_DUMP_REMAINING(stream)
249#endif
250
251
252static int
253stream_inside_callback (const lsquic_stream_t *stream)
254{
255    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
256}
257
258
259static void
260maybe_conn_to_tickable (lsquic_stream_t *stream)
261{
262    if (!stream_inside_callback(stream))
263        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
264                                           stream->conn_pub->lconn);
265}
266
267
268/* Here, "readable" means that the user is able to read from the stream. */
269static void
270maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
271{
272    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
273    {
274        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
275                                           stream->conn_pub->lconn);
276    }
277}
278
279
280/* Here, "writeable" means that data can be put into packets to be
281 * scheduled to be sent out.
282 *
283 * If `check_can_send' is false, it means that we do not need to check
284 * whether packets can be sent.  This check was already performed when
285 * we packetized stream data.
286 */
287static void
288maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
289                                                    int check_can_send)
290{
291    if (!stream_inside_callback(stream) &&
292            (!check_can_send
293             || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
294          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
295    {
296        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
297                                           stream->conn_pub->lconn);
298    }
299}
300
301
302static int
303stream_stalled (const lsquic_stream_t *stream)
304{
305    return 0 == (stream->sm_qflags & (SMQF_WANT_WRITE|SMQF_WANT_READ)) &&
306           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
307                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
308}
309
310
311static size_t
312stream_stream_frame_header_sz (const struct lsquic_stream *stream,
313                                                            unsigned data_sz)
314{
315    return stream->conn_pub->lconn->cn_pf->pf_calc_stream_frame_header_sz(
316                                    stream->id, stream->tosend_off, data_sz);
317}
318
319
320static size_t
321stream_crypto_frame_header_sz (const struct lsquic_stream *stream,
322                                                    unsigned data_sz_IGNORED)
323{
324    return stream->conn_pub->lconn->cn_pf
325         ->pf_calc_crypto_frame_header_sz(stream->tosend_off);
326}
327
328
329/* GQUIC-only function */
330static int
331stream_is_hsk (const struct lsquic_stream *stream)
332{
333    if (stream->sm_bflags & SMBF_IETF)
334        return 0;
335    else
336        return stream->id == LSQUIC_GQUIC_STREAM_HANDSHAKE;
337}
338
339
340static struct lsquic_stream *
341stream_new_common (lsquic_stream_id_t id, struct lsquic_conn_public *conn_pub,
342           const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
343           enum stream_ctor_flags ctor_flags)
344{
345    struct lsquic_stream *stream;
346
347    stream = calloc(1, sizeof(*stream));
348    if (!stream)
349        return NULL;
350
351    if (ctor_flags & SCF_USE_DI_HASH)
352        stream->data_in = data_in_hash_new(conn_pub, id, 0);
353    else
354        stream->data_in = data_in_nocopy_new(conn_pub, id);
355    if (!stream->data_in)
356    {
357        free(stream);
358        return NULL;
359    }
360
361    stream->id        = id;
362    stream->stream_if = stream_if;
363    stream->conn_pub  = conn_pub;
364    stream->sm_onnew_arg = stream_if_ctx;
365    stream->sm_write_avail = stream_write_avail_no_frames;
366
367    STAILQ_INIT(&stream->sm_hq_frames);
368
369    stream->sm_bflags |= ctor_flags & ((1 << (N_SMBF_FLAGS - 1)) - 1);
370    if (conn_pub->lconn->cn_flags & LSCONN_SERVER)
371        stream->sm_bflags |= SMBF_SERVER;
372
373    return stream;
374}
375
376
377/* TODO: The logic to figure out whether the stream is connection limited
378 * should be taken out of the constructor.  The caller should specify this
379 * via one of enum stream_ctor_flags.
380 */
381lsquic_stream_t *
382lsquic_stream_new (lsquic_stream_id_t id,
383        struct lsquic_conn_public *conn_pub,
384        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
385        unsigned initial_window, uint64_t initial_send_off,
386        enum stream_ctor_flags ctor_flags)
387{
388    lsquic_cfcw_t *cfcw;
389    lsquic_stream_t *stream;
390
391    stream = stream_new_common(id, conn_pub, stream_if, stream_if_ctx,
392                                                                ctor_flags);
393    if (!stream)
394        return NULL;
395
396    if (!initial_window)
397        initial_window = 16 * 1024;
398
399    if (ctor_flags & SCF_IETF)
400    {
401        cfcw = &conn_pub->cfcw;
402        stream->sm_bflags |= SMBF_CONN_LIMITED;
403        if (ctor_flags & SCF_HTTP)
404        {
405            stream->sm_write_avail = stream_write_avail_with_headers;
406            stream->sm_readable = stream_readable_http_ietf;
407            stream->sm_sfi = &hq_stream_filter_if;
408        }
409        else
410            stream->sm_readable = stream_readable_non_http;
411        lsquic_stream_set_priority_internal(stream,
412                                            LSQUIC_STREAM_DEFAULT_PRIO);
413        stream->sm_write_to_packet = stream_write_to_packet_std;
414    }
415    else
416    {
417        if (lsquic_stream_id_is_critical(ctor_flags & SCF_HTTP, id))
418            cfcw = NULL;
419        else
420        {
421            cfcw = &conn_pub->cfcw;
422            stream->sm_bflags |= SMBF_CONN_LIMITED;
423            lsquic_stream_set_priority_internal(stream,
424                                                LSQUIC_STREAM_DEFAULT_PRIO);
425        }
426        if (stream->sm_bflags & SMBF_USE_HEADERS)
427            stream->sm_readable = stream_readable_http_gquic;
428        else
429            stream->sm_readable = stream_readable_non_http;
430        if (stream_is_hsk(stream))
431            stream->sm_write_to_packet = stream_write_to_packet_hsk;
432        else
433            stream->sm_write_to_packet = stream_write_to_packet_std;
434    }
435
436    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
437    stream->max_send_off = initial_send_off;
438    LSQ_DEBUG("created stream");
439    SM_HISTORY_APPEND(stream, SHE_CREATED);
440    stream->sm_frame_header_sz = stream_stream_frame_header_sz;
441    if (ctor_flags & SCF_CALL_ON_NEW)
442        lsquic_stream_call_on_new(stream);
443    return stream;
444}
445
446
447struct lsquic_stream *
448lsquic_stream_new_crypto (enum enc_level enc_level,
449        struct lsquic_conn_public *conn_pub,
450        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
451        enum stream_ctor_flags ctor_flags)
452{
453    struct lsquic_stream *stream;
454    lsquic_stream_id_t stream_id;
455
456    assert(ctor_flags & SCF_CRITICAL);
457
458    stream_id = ~0ULL - enc_level;
459    stream = stream_new_common(stream_id, conn_pub, stream_if,
460                                                stream_if_ctx, ctor_flags);
461    if (!stream)
462        return NULL;
463
464    stream->sm_bflags |= SMBF_CRYPTO|SMBF_IETF;
465    stream->sm_enc_level = enc_level;
466    /* TODO: why have limit in crypto stream?  Set it to UINT64_MAX? */
467    lsquic_sfcw_init(&stream->fc, 16 * 1024, NULL, conn_pub, stream_id);
468    stream->max_send_off = 16 * 1024;
469    LSQ_DEBUG("created crypto stream");
470    SM_HISTORY_APPEND(stream, SHE_CREATED);
471    stream->sm_frame_header_sz = stream_crypto_frame_header_sz;
472    stream->sm_write_to_packet = stream_write_to_packet_crypto;
473    stream->sm_readable = stream_readable_non_http;
474    if (ctor_flags & SCF_CALL_ON_NEW)
475        lsquic_stream_call_on_new(stream);
476    return stream;
477}
478
479
480void
481lsquic_stream_call_on_new (lsquic_stream_t *stream)
482{
483    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
484    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
485    {
486        LSQ_DEBUG("calling on_new_stream");
487        SM_HISTORY_APPEND(stream, SHE_ONNEW);
488        stream->stream_flags |= STREAM_ONNEW_DONE;
489        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
490                                                          stream);
491    }
492}
493
494
495static void
496decr_conn_cap (struct lsquic_stream *stream, size_t incr)
497{
498    if (stream->sm_bflags & SMBF_CONN_LIMITED)
499    {
500        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
501        stream->conn_pub->conn_cap.cc_sent -= incr;
502    }
503}
504
505
506static void
507maybe_resize_stream_buffer (struct lsquic_stream *stream)
508{
509    assert(0 == stream->sm_n_buffered);
510
511    if (stream->sm_n_allocated < stream->conn_pub->path->np_pack_size)
512    {
513        free(stream->sm_buf);
514        stream->sm_buf = NULL;
515        stream->sm_n_allocated = 0;
516    }
517    else if (stream->sm_n_allocated > stream->conn_pub->path->np_pack_size)
518        stream->sm_n_allocated = stream->conn_pub->path->np_pack_size;
519}
520
521
522static void
523drop_buffered_data (struct lsquic_stream *stream)
524{
525    decr_conn_cap(stream, stream->sm_n_buffered);
526    stream->sm_n_buffered = 0;
527    maybe_resize_stream_buffer(stream);
528    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
529        maybe_remove_from_write_q(stream, SMQF_WRITE_Q_FLAGS);
530}
531
532
533static void
534destroy_uh (struct lsquic_stream *stream)
535{
536    if (stream->uh)
537    {
538        if (stream->uh->uh_hset)
539            stream->conn_pub->enpub->enp_hsi_if
540                            ->hsi_discard_header_set(stream->uh->uh_hset);
541        free(stream->uh);
542        stream->uh = NULL;
543    }
544}
545
546
547void
548lsquic_stream_destroy (lsquic_stream_t *stream)
549{
550    struct push_promise *promise;
551    struct stream_hq_frame *shf;
552
553    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
554    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
555                                                            STREAM_ONNEW_DONE)
556    {
557        stream->stream_flags |= STREAM_ONCLOSE_DONE;
558        stream->stream_if->on_close(stream, stream->st_ctx);
559    }
560    if (stream->sm_qflags & SMQF_SENDING_FLAGS)
561        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
562    if (stream->sm_qflags & SMQF_WANT_READ)
563        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
564    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
565        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
566    if (stream->sm_qflags & SMQF_SERVICE_FLAGS)
567        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
568    if (stream->sm_qflags & SMQF_QPACK_DEC)
569        lsquic_qdh_unref_stream(stream->conn_pub->u.ietf.qdh, stream);
570    drop_buffered_data(stream);
571    lsquic_sfcw_consume_rem(&stream->fc);
572    drop_frames_in(stream);
573    if (stream->push_req)
574    {
575        if (stream->push_req->uh_hset)
576            stream->conn_pub->enpub->enp_hsi_if
577                            ->hsi_discard_header_set(stream->push_req->uh_hset);
578        free(stream->push_req);
579    }
580    while ((promise = SLIST_FIRST(&stream->sm_promises)))
581    {
582        SLIST_REMOVE_HEAD(&stream->sm_promises, pp_next);
583        lsquic_pp_put(promise, stream->conn_pub->u.ietf.promises);
584    }
585    if (stream->sm_promise)
586    {
587        assert(stream->sm_promise->pp_pushed_stream == stream);
588        stream->sm_promise->pp_pushed_stream = NULL;
589        lsquic_pp_put(stream->sm_promise, stream->conn_pub->u.ietf.promises);
590    }
591    while ((shf = STAILQ_FIRST(&stream->sm_hq_frames)))
592        stream_hq_frame_put(stream, shf);
593    destroy_uh(stream);
594    free(stream->sm_buf);
595    free(stream->sm_header_block);
596    LSQ_DEBUG("destroyed stream");
597    SM_HISTORY_DUMP_REMAINING(stream);
598    free(stream);
599}
600
601
602static int
603stream_is_finished (const lsquic_stream_t *stream)
604{
605    return lsquic_stream_is_closed(stream)
606           /* n_unacked checks that no outgoing packets that reference this
607            * stream are outstanding:
608            */
609        && 0 == stream->n_unacked
610           /* This checks that no packets that reference this stream will
611            * become outstanding:
612            */
613        && 0 == (stream->sm_qflags & SMQF_SEND_RST)
614        && ((stream->stream_flags & STREAM_FORCE_FINISH)
615          || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)));
616}
617
618
619/* This is an internal function */
620void
621lsquic_stream_force_finish (struct lsquic_stream *stream)
622{
623    LSQ_DEBUG("stream is now finished");
624    SM_HISTORY_APPEND(stream, SHE_FINISHED);
625    if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
626        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
627                                                next_service_stream);
628    stream->sm_qflags |= SMQF_FREE_STREAM;
629    stream->stream_flags |= STREAM_FINISHED;
630}
631
632
633static void
634maybe_finish_stream (lsquic_stream_t *stream)
635{
636    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
637                                                    stream_is_finished(stream))
638        lsquic_stream_force_finish(stream);
639}
640
641
642static void
643maybe_schedule_call_on_close (lsquic_stream_t *stream)
644{
645    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
646                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE))
647            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)
648            && !(stream->sm_qflags & SMQF_CALL_ONCLOSE))
649    {
650        if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
651            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
652                                                    next_service_stream);
653        stream->sm_qflags |= SMQF_CALL_ONCLOSE;
654        LSQ_DEBUG("scheduled calling on_close");
655        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
656    }
657}
658
659
660void
661lsquic_stream_call_on_close (lsquic_stream_t *stream)
662{
663    assert(stream->stream_flags & STREAM_ONNEW_DONE);
664    stream->sm_qflags &= ~SMQF_CALL_ONCLOSE;
665    if (!(stream->sm_qflags & SMQF_SERVICE_FLAGS))
666        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
667                                                    next_service_stream);
668    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
669    {
670        LSQ_DEBUG("calling on_close");
671        stream->stream_flags |= STREAM_ONCLOSE_DONE;
672        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
673        stream->stream_if->on_close(stream, stream->st_ctx);
674    }
675    else
676        assert(0);
677}
678
679
680static int
681stream_has_frame_at_read_offset (struct lsquic_stream *stream)
682{
683    if (!((stream->stream_flags & STREAM_CACHED_FRAME)
684                    && stream->read_offset == stream->sm_last_frame_off))
685    {
686        stream->sm_has_frame = stream->data_in->di_if->di_get_frame(
687                                stream->data_in, stream->read_offset) != NULL;
688        stream->sm_last_frame_off = stream->read_offset;
689        stream->stream_flags |= STREAM_CACHED_FRAME;
690    }
691    return stream->sm_has_frame;
692}
693
694
695static int
696stream_readable_non_http (struct lsquic_stream *stream)
697{
698    return stream_has_frame_at_read_offset(stream);
699}
700
701
702static int
703stream_readable_http_gquic (struct lsquic_stream *stream)
704{
705    return (stream->stream_flags & STREAM_HAVE_UH)
706        && (stream->uh
707            || stream_has_frame_at_read_offset(stream));
708}
709
710
711static int
712stream_readable_http_ietf (struct lsquic_stream *stream)
713{
714    return
715        /* If we have read the header set and the header set has not yet
716         * been read, the stream is readable.
717         */
718        ((stream->stream_flags & STREAM_HAVE_UH) && stream->uh)
719        ||
720        /* Alternatively, run the filter and check for payload availability. */
721        (stream->sm_sfi->sfi_readable(stream)
722            && (/* Running the filter may result in hitting FIN: */
723                (stream->stream_flags & STREAM_FIN_REACHED)
724                || stream_has_frame_at_read_offset(stream)));
725}
726
727
728int
729lsquic_stream_readable (struct lsquic_stream *stream)
730{
731    /* A stream is readable if one of the following is true: */
732    return
733        /* - It is already finished: in that case, lsquic_stream_read() will
734         *   return 0.
735         */
736            (stream->stream_flags & STREAM_FIN_REACHED)
737        /* - The stream is reset, by either side.  In this case,
738         *   lsquic_stream_read() will return -1 (we want the user to be
739         *   able to collect the error).
740         */
741        ||  lsquic_stream_is_reset(stream)
742        /* Type-dependent readability check: */
743        ||  stream->sm_readable(stream);
744    ;
745}
746
747
748static size_t
749stream_write_avail_no_frames (struct lsquic_stream *stream)
750{
751    uint64_t stream_avail, conn_avail;
752
753    stream_avail = stream->max_send_off - stream->tosend_off
754                                                - stream->sm_n_buffered;
755
756    if (stream->sm_bflags & SMBF_CONN_LIMITED)
757    {
758        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
759        if (conn_avail < stream_avail)
760            stream_avail = conn_avail;
761    }
762
763    return stream_avail;
764}
765
766
767static size_t
768stream_write_avail_with_frames (struct lsquic_stream *stream)
769{
770    uint64_t stream_avail, conn_avail;
771    const struct stream_hq_frame *shf;
772    size_t size;
773
774    stream_avail = stream->max_send_off - stream->tosend_off
775                                                - stream->sm_n_buffered;
776    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
777        if (!(shf->shf_flags & SHF_WRITTEN))
778        {
779            size = stream_hq_frame_size(shf);
780            assert(size <= stream_avail);
781            stream_avail -= size;
782        }
783
784    if (stream->sm_bflags & SMBF_CONN_LIMITED)
785    {
786        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
787        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
788            if (!(shf->shf_flags & SHF_CC_PAID))
789            {
790                size = stream_hq_frame_size(shf);
791                if (size < conn_avail)
792                    conn_avail -= size;
793                else
794                    return 0;
795            }
796        if (conn_avail < stream_avail)
797            stream_avail = conn_avail;
798    }
799
800    if (stream_avail >= 3 /* Smallest new frame */)
801        return stream_avail;
802    else
803        return 0;
804}
805
806
807static int
808stream_is_pushing_promise (const struct lsquic_stream *stream)
809{
810    return (stream->stream_flags & STREAM_PUSHING)
811        && SLIST_FIRST(&stream->sm_promises)
812        && (SLIST_FIRST(&stream->sm_promises))->pp_write_state != PPWS_DONE
813        ;
814}
815
816
817/* To prevent deadlocks, ensure that when headers are sent, the bytes
818 * sent on the encoder stream are written first.
819 *
820 * XXX If the encoder is set up in non-risking mode, it is perfectly
821 * fine to send the header block first.  TODO: update the logic to
822 * reflect this.  There should be two sending behaviors: risk and non-risk.
823 * For now, we assume risk for everything to be on the conservative side.
824 */
825static size_t
826stream_write_avail_with_headers (struct lsquic_stream *stream)
827{
828    if (stream->stream_flags & STREAM_PUSHING)
829        return stream_write_avail_with_frames(stream);
830
831    switch (stream->sm_send_headers_state)
832    {
833    case SSHS_BEGIN:
834        return lsquic_qeh_write_avail(stream->conn_pub->u.ietf.qeh);
835    case SSHS_ENC_SENDING:
836        if (stream->sm_hb_compl >
837                            lsquic_qeh_enc_off(stream->conn_pub->u.ietf.qeh))
838            return 0;
839        LSQ_DEBUG("encoder stream bytes have all been sent");
840        stream->sm_send_headers_state = SSHS_HBLOCK_SENDING;
841        /* fall-through */
842    default:
843        assert(SSHS_HBLOCK_SENDING == stream->sm_send_headers_state);
844        return stream_write_avail_with_frames(stream);
845    }
846}
847
848
849size_t
850lsquic_stream_write_avail (struct lsquic_stream *stream)
851{
852    return stream->sm_write_avail(stream);
853}
854
855
856int
857lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
858{
859    struct lsquic_conn *lconn;
860
861    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
862                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
863    {
864        if (stream->sm_bflags & SMBF_IETF)
865        {
866            lconn = stream->conn_pub->lconn;
867            lconn->cn_if->ci_abort_error(lconn, 0, TEC_FLOW_CONTROL_ERROR,
868                "flow control violation on stream %"PRIu64, stream->id);
869        }
870        return -1;
871    }
872    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
873    {
874        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
875            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
876                                                    next_send_stream);
877        stream->sm_qflags |= SMQF_SEND_WUF;
878    }
879    return 0;
880}
881
882
883int
884lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
885{
886    uint64_t max_off;
887    int got_next_offset, rv, free_frame;
888    enum ins_frame ins_frame;
889    struct lsquic_conn *lconn;
890
891    assert(frame->packet_in);
892
893    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
894    LSQ_DEBUG("received stream frame, offset 0x%"PRIX64", len %u; "
895        "fin: %d", frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
896
897    if ((stream->sm_bflags & SMBF_USE_HEADERS)
898                            && (stream->stream_flags & STREAM_HEAD_IN_FIN))
899    {
900        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
901        lsquic_malo_put(frame);
902        return -1;
903    }
904
905    if (frame->data_frame.df_fin && (stream->sm_bflags & SMBF_IETF)
906            && (stream->stream_flags & STREAM_FIN_RECVD)
907            && stream->sm_fin_off != DF_END(frame))
908    {
909        lconn = stream->conn_pub->lconn;
910        lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR,
911            "new final size %"PRIu64" from STREAM frame (id: %"PRIu64") does "
912            "not match previous final size %"PRIu64, DF_END(frame),
913            stream->id, stream->sm_fin_off);
914        return -1;
915    }
916
917    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
918  insert_frame:
919    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
920    if (INS_FRAME_OK == ins_frame)
921    {
922        /* Update maximum offset in the flow controller and check for flow
923         * control violation:
924         */
925        rv = -1;
926        free_frame = !stream->data_in->di_if->di_own_on_ok;
927        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
928        if (0 != lsquic_stream_update_sfcw(stream, max_off))
929            goto end_ok;
930        if (frame->data_frame.df_fin)
931        {
932            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
933            stream->stream_flags |= STREAM_FIN_RECVD;
934            stream->sm_fin_off = DF_END(frame);
935            maybe_finish_stream(stream);
936        }
937        if ((stream->sm_bflags & SMBF_AUTOSWITCH) &&
938                (stream->data_in->di_flags & DI_SWITCH_IMPL))
939        {
940            stream->data_in = stream->data_in->di_if->di_switch_impl(
941                                        stream->data_in, stream->read_offset);
942            if (!stream->data_in)
943            {
944                stream->data_in = data_in_error_new();
945                goto end_ok;
946            }
947        }
948        if (got_next_offset)
949            /* Checking the offset saves di_get_frame() call */
950            maybe_conn_to_tickable_if_readable(stream);
951        rv = 0;
952  end_ok:
953        if (free_frame)
954            lsquic_malo_put(frame);
955        stream->stream_flags &= ~STREAM_CACHED_FRAME;
956        return rv;
957    }
958    else if (INS_FRAME_DUP == ins_frame)
959    {
960        return 0;
961    }
962    else if (INS_FRAME_OVERLAP == ins_frame)
963    {
964        LSQ_DEBUG("overlap: switching DATA IN implementation");
965        stream->data_in = stream->data_in->di_if->di_switch_impl(
966                                    stream->data_in, stream->read_offset);
967        if (stream->data_in)
968            goto insert_frame;
969        stream->data_in = data_in_error_new();
970        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
971        lsquic_malo_put(frame);
972        return -1;
973    }
974    else
975    {
976        assert(INS_FRAME_ERR == ins_frame);
977        return -1;
978    }
979}
980
981
982static void
983drop_frames_in (lsquic_stream_t *stream)
984{
985    if (stream->data_in)
986    {
987        stream->data_in->di_if->di_destroy(stream->data_in);
988        /* To avoid checking whether `data_in` is set, just set to the error
989         * data-in stream.  It does the right thing after incoming data is
990         * dropped.
991         */
992        stream->data_in = data_in_error_new();
993        stream->stream_flags &= ~STREAM_CACHED_FRAME;
994    }
995}
996
997
998static void
999maybe_elide_stream_frames (struct lsquic_stream *stream)
1000{
1001    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
1002    {
1003        if (stream->n_unacked)
1004            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
1005                                                stream->id);
1006        stream->stream_flags |= STREAM_FRAMES_ELIDED;
1007    }
1008}
1009
1010
1011int
1012lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
1013                      uint64_t error_code)
1014{
1015    struct lsquic_conn *lconn;
1016
1017    if ((stream->sm_bflags & SMBF_IETF)
1018            && (stream->stream_flags & STREAM_FIN_RECVD)
1019            && stream->sm_fin_off != offset)
1020    {
1021        lconn = stream->conn_pub->lconn;
1022        lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR,
1023            "final size %"PRIu64" from RESET_STREAM frame (id: %"PRIu64") "
1024            "does not match previous final size %"PRIu64, offset,
1025            stream->id, stream->sm_fin_off);
1026        return -1;
1027    }
1028
1029    if (stream->stream_flags & STREAM_RST_RECVD)
1030    {
1031        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
1032        return 0;
1033    }
1034
1035    SM_HISTORY_APPEND(stream, SHE_RST_IN);
1036    /* This flag must always be set, even if we are "ignoring" it: it is
1037     * used by elision code.
1038     */
1039    stream->stream_flags |= STREAM_RST_RECVD;
1040
1041    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
1042    {
1043        LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64" is "
1044            "smaller than that of byte following the last byte we have seen: "
1045            "0x%"PRIX64, offset,
1046            lsquic_sfcw_get_max_recv_off(&stream->fc));
1047        return -1;
1048    }
1049
1050    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
1051    {
1052        LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64
1053            " violates flow control", offset);
1054        return -1;
1055    }
1056
1057    /* Let user collect error: */
1058    maybe_conn_to_tickable_if_readable(stream);
1059
1060    lsquic_sfcw_consume_rem(&stream->fc);
1061    drop_frames_in(stream);
1062    drop_buffered_data(stream);
1063    maybe_elide_stream_frames(stream);
1064
1065    if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT))
1066                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1067        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
1068
1069    stream->stream_flags |= STREAM_RST_RECVD;
1070
1071    maybe_finish_stream(stream);
1072    maybe_schedule_call_on_close(stream);
1073
1074    return 0;
1075}
1076
1077
1078void
1079lsquic_stream_stop_sending_in (struct lsquic_stream *stream,
1080                                                        uint64_t error_code)
1081{
1082    if (stream->stream_flags & STREAM_SS_RECVD)
1083    {
1084        LSQ_DEBUG("ignore duplicate STOP_SENDING frame");
1085        return;
1086    }
1087
1088    SM_HISTORY_APPEND(stream, SHE_SS_IN);
1089    stream->stream_flags |= STREAM_SS_RECVD;
1090
1091    /* Let user collect error: */
1092    maybe_conn_to_tickable_if_readable(stream);
1093
1094    lsquic_sfcw_consume_rem(&stream->fc);
1095    drop_frames_in(stream);
1096    drop_buffered_data(stream);
1097    maybe_elide_stream_frames(stream);
1098
1099    if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT))
1100                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1101        lsquic_stream_reset_ext(stream, error_code, 0);
1102
1103    maybe_finish_stream(stream);
1104    maybe_schedule_call_on_close(stream);
1105}
1106
1107
1108uint64_t
1109lsquic_stream_fc_recv_off_const (const struct lsquic_stream *stream)
1110{
1111    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
1112}
1113
1114
1115void
1116lsquic_stream_max_stream_data_sent (struct lsquic_stream *stream)
1117{
1118    assert(stream->sm_qflags & SMQF_SEND_MAX_STREAM_DATA);
1119    stream->sm_qflags &= ~SMQF_SEND_MAX_STREAM_DATA;
1120    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1121        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1122    stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc);
1123}
1124
1125
1126uint64_t
1127lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
1128{
1129    assert(stream->sm_qflags & SMQF_SEND_WUF);
1130    stream->sm_qflags &= ~SMQF_SEND_WUF;
1131    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1132        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1133    return stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc);
1134}
1135
1136
1137void
1138lsquic_stream_peer_blocked (struct lsquic_stream *stream, uint64_t peer_off)
1139{
1140    uint64_t last_off;
1141
1142    if (stream->sm_last_recv_off)
1143        last_off = stream->sm_last_recv_off;
1144    else
1145        /* This gets advertized in transport parameters */
1146        last_off = lsquic_sfcw_get_max_recv_off(&stream->fc);
1147
1148    LSQ_DEBUG("Peer blocked at %"PRIu64", while the last MAX_STREAM_DATA "
1149        "frame we sent advertized the limit of %"PRIu64, peer_off, last_off);
1150
1151    if (peer_off > last_off && !(stream->sm_qflags & SMQF_SEND_WUF))
1152    {
1153        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1154            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1155                                                    next_send_stream);
1156        stream->sm_qflags |= SMQF_SEND_WUF;
1157        LSQ_DEBUG("marked to send MAX_STREAM_DATA frame");
1158    }
1159    else if (stream->sm_qflags & SMQF_SEND_WUF)
1160        LSQ_DEBUG("MAX_STREAM_DATA frame is already scheduled");
1161    else if (stream->sm_last_recv_off)
1162        LSQ_DEBUG("MAX_STREAM_DATA(%"PRIu64") has already been either "
1163            "packetized or sent", stream->sm_last_recv_off);
1164    else
1165        LSQ_INFO("Peer should have receive transport param limit "
1166            "of %"PRIu64"; odd.", last_off);
1167}
1168
1169
1170/* GQUIC's BLOCKED frame does not have an offset */
1171void
1172lsquic_stream_peer_blocked_gquic (struct lsquic_stream *stream)
1173{
1174    LSQ_DEBUG("Peer blocked: schedule another WINDOW_UPDATE frame");
1175    if (!(stream->sm_qflags & SMQF_SEND_WUF))
1176    {
1177        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1178            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1179                                                    next_send_stream);
1180        stream->sm_qflags |= SMQF_SEND_WUF;
1181        LSQ_DEBUG("marked to send MAX_STREAM_DATA frame");
1182    }
1183    else
1184        LSQ_DEBUG("WINDOW_UPDATE frame is already scheduled");
1185}
1186
1187
1188void
1189lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
1190{
1191    assert(stream->sm_qflags & SMQF_SEND_BLOCKED);
1192    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
1193    stream->sm_qflags &= ~SMQF_SEND_BLOCKED;
1194    stream->stream_flags |= STREAM_BLOCKED_SENT;
1195    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1196        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1197}
1198
1199
1200void
1201lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
1202{
1203    assert(stream->sm_qflags & SMQF_SEND_RST);
1204    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
1205    stream->sm_qflags &= ~SMQF_SEND_RST;
1206    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1207        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1208    stream->stream_flags |= STREAM_RST_SENT;
1209    maybe_finish_stream(stream);
1210}
1211
1212
1213static size_t
1214read_uh (struct lsquic_stream *stream,
1215        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1216{
1217    struct http1x_headers *const h1h = stream->uh->uh_hset;
1218    size_t nread;
1219
1220    nread = readf(ctx, (unsigned char *) h1h->h1h_buf + h1h->h1h_off,
1221                  h1h->h1h_size - h1h->h1h_off,
1222                  (stream->stream_flags & STREAM_HEAD_IN_FIN) > 0);
1223    h1h->h1h_off += nread;
1224    if (h1h->h1h_off == h1h->h1h_size)
1225    {
1226        LSQ_DEBUG("read all uncompressed headers");
1227        destroy_uh(stream);
1228        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
1229        {
1230            stream->stream_flags |= STREAM_FIN_REACHED;
1231            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
1232        }
1233    }
1234    return nread;
1235}
1236
1237
1238static void
1239stream_consumed_bytes (struct lsquic_stream *stream)
1240{
1241    lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
1242    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
1243    {
1244        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1245            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1246                                                            next_send_stream);
1247        stream->sm_qflags |= SMQF_SEND_WUF;
1248        maybe_conn_to_tickable_if_writeable(stream, 1);
1249    }
1250}
1251
1252
1253struct read_frames_status
1254{
1255    int     error;
1256    int     processed_frames;
1257    size_t  total_nread;
1258};
1259
1260
1261static struct read_frames_status
1262read_data_frames (struct lsquic_stream *stream, int do_filtering,
1263        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1264{
1265    struct data_frame *data_frame;
1266    size_t nread, toread, total_nread;
1267    int short_read, processed_frames;
1268
1269    processed_frames = 0;
1270    total_nread = 0;
1271
1272    while ((data_frame = stream->data_in->di_if->di_get_frame(
1273                                        stream->data_in, stream->read_offset)))
1274    {
1275
1276        ++processed_frames;
1277
1278        do
1279        {
1280            if (do_filtering && stream->sm_sfi)
1281                toread = stream->sm_sfi->sfi_filter_df(stream, data_frame);
1282            else
1283                toread = data_frame->df_size - data_frame->df_read_off;
1284
1285            if (toread || data_frame->df_fin)
1286            {
1287                nread = readf(ctx, data_frame->df_data + data_frame->df_read_off,
1288                                                     toread, data_frame->df_fin);
1289                if (do_filtering && stream->sm_sfi)
1290                    stream->sm_sfi->sfi_decr_left(stream, nread);
1291                data_frame->df_read_off += nread;
1292                stream->read_offset += nread;
1293                total_nread += nread;
1294                short_read = nread < toread;
1295            }
1296            else
1297                short_read = 0;
1298
1299            if (data_frame->df_read_off == data_frame->df_size)
1300            {
1301                const int fin = data_frame->df_fin;
1302                stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
1303                data_frame = NULL;
1304                if ((stream->sm_bflags & SMBF_AUTOSWITCH) &&
1305                        (stream->data_in->di_flags & DI_SWITCH_IMPL))
1306                {
1307                    stream->data_in = stream->data_in->di_if->di_switch_impl(
1308                                                stream->data_in, stream->read_offset);
1309                    if (!stream->data_in)
1310                    {
1311                        stream->data_in = data_in_error_new();
1312                        return (struct read_frames_status) { .error = 1, };
1313                    }
1314                }
1315                if (fin)
1316                {
1317                    stream->stream_flags |= STREAM_FIN_REACHED;
1318                    goto end_while;
1319                }
1320            }
1321            else if (short_read)
1322                goto end_while;
1323        }
1324        while (data_frame);
1325    }
1326  end_while:
1327
1328    if (processed_frames)
1329        stream_consumed_bytes(stream);
1330
1331    return (struct read_frames_status) {
1332        .error = 0,
1333        .processed_frames = processed_frames,
1334        .total_nread = total_nread,
1335    };
1336}
1337
1338
1339static ssize_t
1340stream_readf (struct lsquic_stream *stream,
1341        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1342{
1343    size_t total_nread, nread;
1344    int read_unc_headers;
1345
1346    total_nread = 0;
1347
1348    if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF))
1349                                            == (SMBF_USE_HEADERS|SMBF_IETF)
1350            && !(stream->stream_flags & STREAM_HAVE_UH)
1351            && !stream->uh)
1352    {
1353        if (stream->sm_readable(stream))
1354        {
1355            if (stream->sm_hq_filter.hqfi_flags & HQFI_FLAG_ERROR)
1356            {
1357                LSQ_INFO("HQ filter hit an error: cannot read from stream");
1358                errno = EBADMSG;
1359                return -1;
1360            }
1361            assert(stream->uh);
1362        }
1363        else
1364        {
1365            errno = EWOULDBLOCK;
1366            return -1;
1367        }
1368    }
1369
1370    if (stream->uh)
1371    {
1372        if (stream->uh->uh_flags & UH_H1H)
1373        {
1374            nread = read_uh(stream, readf, ctx);
1375            read_unc_headers = nread > 0;
1376            total_nread += nread;
1377            if (stream->uh)
1378                return total_nread;
1379        }
1380        else
1381        {
1382            LSQ_INFO("header set not claimed: cannot read from stream");
1383            return -1;
1384        }
1385    }
1386    else if ((stream->sm_bflags & SMBF_USE_HEADERS)
1387                                && !(stream->stream_flags & STREAM_HAVE_UH))
1388    {
1389        LSQ_DEBUG("cannot read: headers not available");
1390        errno = EWOULDBLOCK;
1391        return -1;
1392    }
1393    else
1394        read_unc_headers = 0;
1395
1396    const struct read_frames_status rfs
1397                        = read_data_frames(stream, 1, readf, ctx);
1398    if (rfs.error)
1399        return -1;
1400    total_nread += rfs.total_nread;
1401
1402    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
1403                                        total_nread, stream->read_offset);
1404
1405    if (rfs.processed_frames || read_unc_headers)
1406    {
1407        return total_nread;
1408    }
1409    else
1410    {
1411        assert(0 == total_nread);
1412        errno = EWOULDBLOCK;
1413        return -1;
1414    }
1415}
1416
1417
1418/* This function returns 0 when EOF is reached.
1419 */
1420ssize_t
1421lsquic_stream_readf (struct lsquic_stream *stream,
1422        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1423{
1424    SM_HISTORY_APPEND(stream, SHE_USER_READ);
1425
1426    if (lsquic_stream_is_reset(stream))
1427    {
1428        if (stream->stream_flags & STREAM_RST_RECVD)
1429            stream->stream_flags |= STREAM_RST_READ;
1430        errno = ECONNRESET;
1431        return -1;
1432    }
1433    if (stream->stream_flags & STREAM_U_READ_DONE)
1434    {
1435        errno = EBADF;
1436        return -1;
1437    }
1438    if ((stream->stream_flags & STREAM_FIN_REACHED)
1439            && 0 == (!!(stream->stream_flags & STREAM_HAVE_UH)
1440                   ^ !!(stream->sm_bflags & SMBF_USE_HEADERS)))
1441        return 0;
1442
1443    return stream_readf(stream, readf, ctx);
1444}
1445
1446
1447struct readv_ctx
1448{
1449    const struct iovec        *iov;
1450    const struct iovec *const  end;
1451    unsigned char             *p;
1452};
1453
1454
1455static size_t
1456readv_f (void *ctx_p, const unsigned char *buf, size_t len, int fin)
1457{
1458    struct readv_ctx *const ctx = ctx_p;
1459    const unsigned char *const end = buf + len;
1460    size_t ntocopy;
1461
1462    while (ctx->iov < ctx->end && buf < end)
1463    {
1464        ntocopy = (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len
1465                                                                    - ctx->p;
1466        if (ntocopy > (size_t) (end - buf))
1467            ntocopy = end - buf;
1468        memcpy(ctx->p, buf, ntocopy);
1469        ctx->p += ntocopy;
1470        buf += ntocopy;
1471        if (ctx->p == (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len)
1472        {
1473            do
1474                ++ctx->iov;
1475            while (ctx->iov < ctx->end && ctx->iov->iov_len == 0);
1476            if (ctx->iov < ctx->end)
1477                ctx->p = ctx->iov->iov_base;
1478            else
1479                break;
1480        }
1481    }
1482
1483    return len - (end - buf);
1484}
1485
1486
1487ssize_t
1488lsquic_stream_readv (struct lsquic_stream *stream, const struct iovec *iov,
1489                     int iovcnt)
1490{
1491    struct readv_ctx ctx = { iov, iov + iovcnt, iov->iov_base, };
1492    return lsquic_stream_readf(stream, readv_f, &ctx);
1493}
1494
1495
1496ssize_t
1497lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
1498{
1499    struct iovec iov = { .iov_base = buf, .iov_len = len, };
1500    return lsquic_stream_readv(stream, &iov, 1);
1501}
1502
1503
1504static void
1505stream_shutdown_read (lsquic_stream_t *stream)
1506{
1507    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1508    {
1509        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
1510        stream->stream_flags |= STREAM_U_READ_DONE;
1511        stream_wantread(stream, 0);
1512        maybe_finish_stream(stream);
1513    }
1514}
1515
1516
1517static int
1518stream_is_incoming_unidir (const struct lsquic_stream *stream)
1519{
1520    enum stream_id_type sit;
1521
1522    if (stream->sm_bflags & SMBF_IETF)
1523    {
1524        sit = stream->id & SIT_MASK;
1525        if (stream->sm_bflags & SMBF_SERVER)
1526            return sit == SIT_UNI_CLIENT;
1527        else
1528            return sit == SIT_UNI_SERVER;
1529    }
1530    else
1531        return 0;
1532}
1533
1534
1535static void
1536stream_shutdown_write (lsquic_stream_t *stream)
1537{
1538    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1539        return;
1540
1541    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
1542    stream->stream_flags |= STREAM_U_WRITE_DONE;
1543    stream_wantwrite(stream, 0);
1544
1545    /* Don't bother to check whether there is anything else to write if
1546     * the flags indicate that nothing else should be written.
1547     */
1548    if (!(stream->sm_bflags & SMBF_CRYPTO)
1549            && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT))
1550                && !stream_is_incoming_unidir(stream)
1551                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1552    {
1553        if ((stream->sm_bflags & SMBF_USE_HEADERS)
1554                && !(stream->stream_flags & STREAM_HEADERS_SENT))
1555        {
1556            LSQ_DEBUG("headers not sent, send a reset");
1557            lsquic_stream_reset(stream, 0);
1558        }
1559        else if (stream->sm_n_buffered == 0)
1560        {
1561            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
1562                                                 stream))
1563            {
1564                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
1565                stream->stream_flags |= STREAM_FIN_SENT;
1566            }
1567            else
1568            {
1569                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
1570                          "flag in it");
1571                (void) stream_flush_nocheck(stream);
1572            }
1573        }
1574        else
1575            (void) stream_flush_nocheck(stream);
1576    }
1577}
1578
1579
1580static void
1581maybe_stream_shutdown_write (struct lsquic_stream *stream)
1582{
1583    if (stream->sm_send_headers_state == SSHS_BEGIN)
1584        stream_shutdown_write(stream);
1585    else if (0 == (stream->stream_flags & STREAM_DELAYED_SW))
1586    {
1587        LSQ_DEBUG("shutdown delayed");
1588        SM_HISTORY_APPEND(stream, SHE_DELAY_SW);
1589        stream->stream_flags |= STREAM_DELAYED_SW;
1590    }
1591}
1592
1593
1594int
1595lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
1596{
1597    LSQ_DEBUG("shutdown; how: %d", how);
1598    if (lsquic_stream_is_closed(stream))
1599    {
1600        LSQ_INFO("Attempt to shut down a closed stream");
1601        errno = EBADF;
1602        return -1;
1603    }
1604    /* 0: read, 1: write: 2: read and write
1605     */
1606    if (how < 0 || how > 2)
1607    {
1608        errno = EINVAL;
1609        return -1;
1610    }
1611
1612    if (how)
1613        maybe_stream_shutdown_write(stream);
1614    if (how != 1)
1615        stream_shutdown_read(stream);
1616
1617    maybe_finish_stream(stream);
1618    maybe_schedule_call_on_close(stream);
1619    if (how && !(stream->stream_flags & STREAM_DELAYED_SW))
1620        maybe_conn_to_tickable_if_writeable(stream, 1);
1621
1622    return 0;
1623}
1624
1625
1626void
1627lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
1628{
1629    LSQ_DEBUG("internal shutdown");
1630    if (lsquic_stream_is_critical(stream))
1631    {
1632        LSQ_DEBUG("add flag to force-finish special stream");
1633        stream->stream_flags |= STREAM_FORCE_FINISH;
1634        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
1635    }
1636    maybe_finish_stream(stream);
1637    maybe_schedule_call_on_close(stream);
1638}
1639
1640
1641static void
1642fake_reset_unused_stream (lsquic_stream_t *stream)
1643{
1644    stream->stream_flags |=
1645        STREAM_RST_RECVD    /* User will pick this up on read or write */
1646      | STREAM_RST_SENT     /* Don't send anything else on this stream */
1647    ;
1648
1649    /* Cancel all writes to the network scheduled for this stream: */
1650    if (stream->sm_qflags & SMQF_SENDING_FLAGS)
1651        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
1652                                                next_send_stream);
1653    stream->sm_qflags &= ~SMQF_SENDING_FLAGS;
1654    drop_buffered_data(stream);
1655    LSQ_DEBUG("fake-reset stream%s",
1656                    stream_stalled(stream) ? " (stalled)" : "");
1657    maybe_finish_stream(stream);
1658    maybe_schedule_call_on_close(stream);
1659}
1660
1661
1662/* This function should only be called for locally-initiated streams whose ID
1663 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
1664 * frame sent by peer but we have not yet received it and created a stream.
1665 * In this situation, we mark the stream as reset, so that user's on_read or
1666 * on_write event callback picks up the error.  That, in turn, should result
1667 * in stream being closed.
1668 *
1669 * If we have received any data frames on this stream, this probably indicates
1670 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
1671 * lower than this.  However, we still try to handle it gracefully and peform
1672 * a shutdown, as if the stream was not reset.
1673 */
1674void
1675lsquic_stream_received_goaway (lsquic_stream_t *stream)
1676{
1677    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
1678    if (0 == stream->read_offset &&
1679                            stream->data_in->di_if->di_empty(stream->data_in))
1680        fake_reset_unused_stream(stream);       /* Normal condition */
1681    else
1682    {   /* This is odd, let's handle it the best we can: */
1683        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
1684        lsquic_stream_shutdown_internal(stream);
1685    }
1686}
1687
1688
1689uint64_t
1690lsquic_stream_read_offset (const lsquic_stream_t *stream)
1691{
1692    return stream->read_offset;
1693}
1694
1695
1696static int
1697stream_wantread (lsquic_stream_t *stream, int is_want)
1698{
1699    const int old_val = !!(stream->sm_qflags & SMQF_WANT_READ);
1700    const int new_val = !!is_want;
1701    if (old_val != new_val)
1702    {
1703        if (new_val)
1704        {
1705            if (!old_val)
1706                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
1707                                                            next_read_stream);
1708            stream->sm_qflags |= SMQF_WANT_READ;
1709        }
1710        else
1711        {
1712            stream->sm_qflags &= ~SMQF_WANT_READ;
1713            if (old_val)
1714                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
1715                                                            next_read_stream);
1716        }
1717    }
1718    return old_val;
1719}
1720
1721
1722static void
1723maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_q_flags flag)
1724{
1725    assert(SMQF_WRITE_Q_FLAGS & flag);
1726    if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS))
1727        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1728                                                        next_write_stream);
1729    stream->sm_qflags |= flag;
1730}
1731
1732
1733static void
1734maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag)
1735{
1736    assert(SMQF_WRITE_Q_FLAGS & flag);
1737    if (stream->sm_qflags & flag)
1738    {
1739        stream->sm_qflags &= ~flag;
1740        if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS))
1741            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1742                                                        next_write_stream);
1743    }
1744}
1745
1746
1747static int
1748stream_wantwrite (struct lsquic_stream *stream, int new_val)
1749{
1750    const int old_val = !!(stream->sm_qflags & SMQF_WANT_WRITE);
1751
1752    assert(0 == (new_val & ~1));    /* new_val is either 0 or 1 */
1753
1754    if (old_val != new_val)
1755    {
1756        if (new_val)
1757            maybe_put_onto_write_q(stream, SMQF_WANT_WRITE);
1758        else
1759            maybe_remove_from_write_q(stream, SMQF_WANT_WRITE);
1760    }
1761    return old_val;
1762}
1763
1764
1765int
1766lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1767{
1768    SM_HISTORY_APPEND(stream, SHE_WANTREAD_NO + !!is_want);
1769    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1770    {
1771        if (is_want)
1772            maybe_conn_to_tickable_if_readable(stream);
1773        return stream_wantread(stream, is_want);
1774    }
1775    else
1776    {
1777        errno = EBADF;
1778        return -1;
1779    }
1780}
1781
1782
1783int
1784lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1785{
1786    int old_val;
1787
1788    is_want = !!is_want;
1789
1790    SM_HISTORY_APPEND(stream, SHE_WANTWRITE_NO + is_want);
1791    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)
1792                            && SSHS_BEGIN == stream->sm_send_headers_state)
1793    {
1794        stream->sm_saved_want_write = is_want;
1795        if (is_want)
1796            maybe_conn_to_tickable_if_writeable(stream, 1);
1797        return stream_wantwrite(stream, is_want);
1798    }
1799    else if (SSHS_BEGIN != stream->sm_send_headers_state)
1800    {
1801        old_val = stream->sm_saved_want_write;
1802        stream->sm_saved_want_write = is_want;
1803        return old_val;
1804    }
1805    else
1806    {
1807        errno = EBADF;
1808        return -1;
1809    }
1810}
1811
1812
1813struct progress
1814{
1815    enum stream_flags   s_flags;
1816    enum stream_q_flags q_flags;
1817};
1818
1819
1820static struct progress
1821stream_progress (const struct lsquic_stream *stream)
1822{
1823    return (struct progress) {
1824        .s_flags = stream->stream_flags
1825          & (STREAM_U_WRITE_DONE|STREAM_U_READ_DONE),
1826        .q_flags = stream->sm_qflags
1827          & (SMQF_WANT_READ|SMQF_WANT_WRITE|SMQF_WANT_FLUSH|SMQF_SEND_RST),
1828    };
1829}
1830
1831
1832static int
1833progress_eq (struct progress a, struct progress b)
1834{
1835    return a.s_flags == b.s_flags && a.q_flags == b.q_flags;
1836}
1837
1838
1839static void
1840stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1841{
1842    unsigned no_progress_count, no_progress_limit;
1843    struct progress progress;
1844    uint64_t size;
1845
1846    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1847
1848    no_progress_count = 0;
1849    while ((stream->sm_qflags & SMQF_WANT_READ)
1850                                            && lsquic_stream_readable(stream))
1851    {
1852        progress = stream_progress(stream);
1853        size  = stream->read_offset;
1854
1855        stream->stream_if->on_read(stream, stream->st_ctx);
1856
1857        if (no_progress_limit && size == stream->read_offset &&
1858                                progress_eq(progress, stream_progress(stream)))
1859        {
1860            ++no_progress_count;
1861            if (no_progress_count >= no_progress_limit)
1862            {
1863                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1864                    "progress) in user code reading from stream",
1865                    no_progress_count,
1866                    no_progress_count == 1 ? "" : "s");
1867                break;
1868            }
1869        }
1870        else
1871            no_progress_count = 0;
1872    }
1873}
1874
1875
1876static void
1877stream_hblock_sent (struct lsquic_stream *stream)
1878{
1879    int want_write;
1880
1881    LSQ_DEBUG("header block has been sent: restore default behavior");
1882    stream->sm_send_headers_state = SSHS_BEGIN;
1883    stream->sm_write_avail = stream_write_avail_with_frames;
1884
1885    want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
1886    if (want_write != stream->sm_saved_want_write)
1887        (void) lsquic_stream_wantwrite(stream, stream->sm_saved_want_write);
1888
1889    if (stream->stream_flags & STREAM_DELAYED_SW)
1890    {
1891        LSQ_DEBUG("performing delayed shutdown write");
1892        stream->stream_flags &= ~STREAM_DELAYED_SW;
1893        stream_shutdown_write(stream);
1894        maybe_schedule_call_on_close(stream);
1895        maybe_finish_stream(stream);
1896        maybe_conn_to_tickable_if_writeable(stream, 1);
1897    }
1898}
1899
1900
1901static void
1902on_write_header_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
1903{
1904    ssize_t nw;
1905
1906    nw = stream_write_buf(stream,
1907                stream->sm_header_block + stream->sm_hblock_off,
1908                stream->sm_hblock_sz - stream->sm_hblock_off);
1909    if (nw > 0)
1910    {
1911        stream->sm_hblock_off += nw;
1912        if (stream->sm_hblock_off == stream->sm_hblock_sz)
1913        {
1914            stream->stream_flags |= STREAM_HEADERS_SENT;
1915            free(stream->sm_header_block);
1916            stream->sm_header_block = NULL;
1917            stream->sm_hblock_sz = 0;
1918            stream_hblock_sent(stream);
1919            LSQ_DEBUG("header block written out successfully");
1920            /* TODO: if there was eos, do something else */
1921            if (stream->sm_qflags & SMQF_WANT_WRITE)
1922                stream->stream_if->on_write(stream, h);
1923        }
1924        else
1925        {
1926            LSQ_DEBUG("wrote %zd bytes more of header block; not done yet",
1927                nw);
1928        }
1929    }
1930    else if (nw < 0)
1931    {
1932        /* XXX What should happen if we hit an error? TODO */
1933    }
1934}
1935
1936
1937static void
1938(*select_on_write (struct lsquic_stream *stream))(struct lsquic_stream *,
1939                                                        lsquic_stream_ctx_t *)
1940{
1941    if (0 == (stream->stream_flags & STREAM_PUSHING)
1942                    && SSHS_HBLOCK_SENDING != stream->sm_send_headers_state)
1943        /* Common case */
1944        return stream->stream_if->on_write;
1945    else if (SSHS_HBLOCK_SENDING == stream->sm_send_headers_state)
1946        return on_write_header_wrapper;
1947    else
1948    {
1949        assert(stream->stream_flags & STREAM_PUSHING);
1950        if (stream_is_pushing_promise(stream))
1951            return on_write_pp_wrapper;
1952        else if (stream->sm_dup_push_off < stream->sm_dup_push_len)
1953            return on_write_dp_wrapper;
1954        else
1955            return stream->stream_if->on_write;
1956    }
1957}
1958
1959
1960static void
1961stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1962{
1963    unsigned no_progress_count, no_progress_limit;
1964    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
1965    struct progress progress;
1966
1967    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1968
1969    no_progress_count = 0;
1970    stream->stream_flags |= STREAM_LAST_WRITE_OK;
1971    while ((stream->sm_qflags & SMQF_WANT_WRITE)
1972                && (stream->stream_flags & STREAM_LAST_WRITE_OK)
1973                       && lsquic_stream_write_avail(stream))
1974    {
1975        progress = stream_progress(stream);
1976
1977        on_write = select_on_write(stream);
1978        on_write(stream, stream->st_ctx);
1979
1980        if (no_progress_limit && progress_eq(progress, stream_progress(stream)))
1981        {
1982            ++no_progress_count;
1983            if (no_progress_count >= no_progress_limit)
1984            {
1985                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1986                    "progress) in user code writing to stream",
1987                    no_progress_count,
1988                    no_progress_count == 1 ? "" : "s");
1989                break;
1990            }
1991        }
1992        else
1993            no_progress_count = 0;
1994    }
1995}
1996
1997
1998static void
1999stream_dispatch_read_events_once (lsquic_stream_t *stream)
2000{
2001    if ((stream->sm_qflags & SMQF_WANT_READ) && lsquic_stream_readable(stream))
2002    {
2003        stream->stream_if->on_read(stream, stream->st_ctx);
2004    }
2005}
2006
2007
2008uint64_t
2009lsquic_stream_combined_send_off (const struct lsquic_stream *stream)
2010{
2011    size_t frames_sizes;
2012
2013    frames_sizes = active_hq_frame_sizes(stream);
2014    return stream->tosend_off + stream->sm_n_buffered + frames_sizes;
2015}
2016
2017
2018static void
2019maybe_mark_as_blocked (lsquic_stream_t *stream)
2020{
2021    struct lsquic_conn_cap *cc;
2022    uint64_t used;
2023
2024    used = lsquic_stream_combined_send_off(stream);
2025    if (stream->max_send_off == used)
2026    {
2027        if (stream->blocked_off < stream->max_send_off)
2028        {
2029            stream->blocked_off = used;
2030            if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
2031                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
2032                                                            next_send_stream);
2033            stream->sm_qflags |= SMQF_SEND_BLOCKED;
2034            LSQ_DEBUG("marked stream-blocked at stream offset "
2035                                            "%"PRIu64, stream->blocked_off);
2036        }
2037        else
2038            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
2039                " has been, or is about to be, sent", stream->blocked_off);
2040    }
2041
2042    if ((stream->sm_bflags & SMBF_CONN_LIMITED)
2043        && (cc = &stream->conn_pub->conn_cap,
2044                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
2045    {
2046        if (cc->cc_blocked < cc->cc_max)
2047        {
2048            cc->cc_blocked = cc->cc_max;
2049            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
2050            LSQ_DEBUG("marked connection-blocked at connection offset "
2051                                                    "%"PRIu64, cc->cc_max);
2052        }
2053        else
2054            LSQ_DEBUG("stream has already been marked connection-blocked "
2055                "at offset %"PRIu64, cc->cc_blocked);
2056    }
2057}
2058
2059
2060void
2061lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
2062{
2063    assert(stream->sm_qflags & SMQF_WANT_READ);
2064
2065    if (stream->sm_bflags & SMBF_RW_ONCE)
2066        stream_dispatch_read_events_once(stream);
2067    else
2068        stream_dispatch_read_events_loop(stream);
2069}
2070
2071
2072void
2073lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
2074{
2075    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
2076    int progress;
2077    uint64_t tosend_off;
2078    unsigned short n_buffered;
2079    enum stream_q_flags q_flags;
2080
2081    assert(stream->sm_qflags & SMQF_WRITE_Q_FLAGS);
2082    q_flags = stream->sm_qflags & SMQF_WRITE_Q_FLAGS;
2083    tosend_off = stream->tosend_off;
2084    n_buffered = stream->sm_n_buffered;
2085
2086    if (stream->sm_qflags & SMQF_WANT_FLUSH)
2087        (void) stream_flush(stream);
2088
2089    if (stream->sm_bflags & SMBF_RW_ONCE)
2090    {
2091        if ((stream->sm_qflags & SMQF_WANT_WRITE)
2092            && lsquic_stream_write_avail(stream))
2093        {
2094            on_write = select_on_write(stream);
2095            on_write(stream, stream->st_ctx);
2096        }
2097    }
2098    else
2099        stream_dispatch_write_events_loop(stream);
2100
2101    /* Progress means either flags or offsets changed: */
2102    progress = !((stream->sm_qflags & SMQF_WRITE_Q_FLAGS) == q_flags &&
2103                        stream->tosend_off == tosend_off &&
2104                            stream->sm_n_buffered == n_buffered);
2105
2106    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
2107    {
2108        if (progress)
2109        {   /* Move the stream to the end of the list to ensure fairness. */
2110            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
2111                                                            next_write_stream);
2112            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
2113                                                            next_write_stream);
2114        }
2115    }
2116}
2117
2118
2119static size_t
2120inner_reader_empty_size (void *ctx)
2121{
2122    return 0;
2123}
2124
2125
2126static size_t
2127inner_reader_empty_read (void *ctx, void *buf, size_t count)
2128{
2129    return 0;
2130}
2131
2132
2133static int
2134stream_flush (lsquic_stream_t *stream)
2135{
2136    struct lsquic_reader empty_reader;
2137    ssize_t nw;
2138
2139    assert(stream->sm_qflags & SMQF_WANT_FLUSH);
2140    assert(stream->sm_n_buffered > 0 ||
2141        /* Flushing is also used to packetize standalone FIN: */
2142        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
2143                                                    == STREAM_U_WRITE_DONE));
2144
2145    empty_reader.lsqr_size = inner_reader_empty_size;
2146    empty_reader.lsqr_read = inner_reader_empty_read;
2147    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
2148    nw = stream_write_to_packets(stream, &empty_reader, 0);
2149
2150    if (nw >= 0)
2151    {
2152        assert(nw == 0);    /* Empty reader: must have read zero bytes */
2153        return 0;
2154    }
2155    else
2156        return -1;
2157}
2158
2159
2160static int
2161stream_flush_nocheck (lsquic_stream_t *stream)
2162{
2163    size_t frames;
2164
2165    frames = active_hq_frame_sizes(stream);
2166    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered + frames;
2167    stream->sm_flush_to_payload = stream->sm_payload + stream->sm_n_buffered;
2168    maybe_put_onto_write_q(stream, SMQF_WANT_FLUSH);
2169    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
2170
2171    return stream_flush(stream);
2172}
2173
2174
2175int
2176lsquic_stream_flush (lsquic_stream_t *stream)
2177{
2178    if (stream->stream_flags & STREAM_U_WRITE_DONE)
2179    {
2180        LSQ_DEBUG("cannot flush closed stream");
2181        errno = EBADF;
2182        return -1;
2183    }
2184
2185    if (0 == stream->sm_n_buffered)
2186    {
2187        LSQ_DEBUG("flushing 0 bytes: noop");
2188        return 0;
2189    }
2190
2191    return stream_flush_nocheck(stream);
2192}
2193
2194
2195static size_t
2196stream_get_n_allowed (const struct lsquic_stream *stream)
2197{
2198    if (stream->sm_n_allocated)
2199        return stream->sm_n_allocated;
2200    else
2201        return stream->conn_pub->path->np_pack_size;
2202}
2203
2204
2205/* The flush threshold is the maximum size of stream data that can be sent
2206 * in a full packet.
2207 */
2208#ifdef NDEBUG
2209static
2210#endif
2211       size_t
2212lsquic_stream_flush_threshold (const struct lsquic_stream *stream,
2213                                                            unsigned data_sz)
2214{
2215    enum packet_out_flags flags;
2216    enum packno_bits bits;
2217    size_t packet_header_sz, stream_header_sz, tag_len;
2218    size_t threshold;
2219
2220    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
2221    flags = bits << POBIT_SHIFT;
2222    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
2223        flags |= PO_CONN_ID;
2224    if (stream_is_hsk(stream))
2225        flags |= PO_LONGHEAD;
2226
2227    packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags,
2228                                        stream->conn_pub->path->np_dcid.len);
2229    stream_header_sz = stream->sm_frame_header_sz(stream, data_sz);
2230    tag_len = stream->conn_pub->lconn->cn_esf_c->esf_tag_len;
2231
2232    threshold = stream_get_n_allowed(stream) - tag_len
2233              - packet_header_sz - stream_header_sz;
2234    return threshold;
2235}
2236
2237
2238#define COMMON_WRITE_CHECKS() do {                                          \
2239    if ((stream->sm_bflags & SMBF_USE_HEADERS)                              \
2240            && !(stream->stream_flags & STREAM_HEADERS_SENT))               \
2241    {                                                                       \
2242        if (SSHS_BEGIN != stream->sm_send_headers_state)                    \
2243        {                                                                   \
2244            LSQ_DEBUG("still sending headers: no writing allowed");         \
2245            return 0;                                                       \
2246        }                                                                   \
2247        else                                                                \
2248        {                                                                   \
2249            LSQ_INFO("Attempt to write to stream before sending HTTP "      \
2250                                                                "headers"); \
2251            errno = EILSEQ;                                                 \
2252            return -1;                                                      \
2253        }                                                                   \
2254    }                                                                       \
2255    if (lsquic_stream_is_reset(stream))                                     \
2256    {                                                                       \
2257        LSQ_INFO("Attempt to write to stream after it had been reset");     \
2258        errno = ECONNRESET;                                                 \
2259        return -1;                                                          \
2260    }                                                                       \
2261    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
2262    {                                                                       \
2263        LSQ_INFO("Attempt to write to stream after it was closed for "      \
2264                                                                "writing"); \
2265        errno = EBADF;                                                      \
2266        return -1;                                                          \
2267    }                                                                       \
2268} while (0)
2269
2270
2271struct frame_gen_ctx
2272{
2273    lsquic_stream_t      *fgc_stream;
2274    struct lsquic_reader *fgc_reader;
2275    /* We keep our own count of how many bytes were read from reader because
2276     * some readers are external.  The external caller does not have to rely
2277     * on our count, but it can.
2278     */
2279    size_t                fgc_nread_from_reader;
2280    size_t              (*fgc_size) (void *ctx);
2281    int                 (*fgc_fin) (void *ctx);
2282    gsf_read_f            fgc_read;
2283};
2284
2285
2286static size_t
2287frame_std_gen_size (void *ctx)
2288{
2289    struct frame_gen_ctx *fg_ctx = ctx;
2290    size_t available, remaining;
2291
2292    /* Make sure we are not writing past available size: */
2293    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2294    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
2295    if (available < remaining)
2296        remaining = available;
2297
2298    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
2299}
2300
2301
2302static size_t
2303stream_hq_frame_size (const struct stream_hq_frame *shf)
2304{
2305    if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
2306        return 1 + 1 + ((shf->shf_flags & SHF_TWO_BYTES) > 0);
2307    else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) == SHF_FIXED_SIZE)
2308        return 1 + (1 << vint_val2bits(shf->shf_frame_size));
2309    else
2310    {
2311        assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
2312                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
2313        return 0;
2314    }
2315}
2316
2317
2318static size_t
2319active_hq_frame_sizes (const struct lsquic_stream *stream)
2320{
2321    const struct stream_hq_frame *shf;
2322    size_t size;
2323
2324    size = 0;
2325    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2326                                        == (SMBF_IETF|SMBF_USE_HEADERS))
2327        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2328            if (!(shf->shf_flags & SHF_WRITTEN))
2329                size += stream_hq_frame_size(shf);
2330
2331    return size;
2332}
2333
2334
2335static uint64_t
2336stream_hq_frame_end (const struct stream_hq_frame *shf)
2337{
2338    if (shf->shf_flags & SHF_FIXED_SIZE)
2339        return shf->shf_off + shf->shf_frame_size;
2340    else if (shf->shf_flags & SHF_TWO_BYTES)
2341        return shf->shf_off + ((1 << 14) - 1);
2342    else
2343        return shf->shf_off + ((1 << 6) - 1);
2344}
2345
2346
2347static int
2348frame_in_stream (const struct lsquic_stream *stream,
2349                                            const struct stream_hq_frame *shf)
2350{
2351    return shf >= stream->sm_hq_frame_arr
2352        && shf < stream->sm_hq_frame_arr + sizeof(stream->sm_hq_frame_arr)
2353                                        / sizeof(stream->sm_hq_frame_arr[0])
2354        ;
2355}
2356
2357
2358static void
2359stream_hq_frame_put (struct lsquic_stream *stream,
2360                                                struct stream_hq_frame *shf)
2361{
2362    assert(STAILQ_FIRST(&stream->sm_hq_frames) == shf);
2363    STAILQ_REMOVE_HEAD(&stream->sm_hq_frames, shf_next);
2364    if (frame_in_stream(stream, shf))
2365        memset(shf, 0, sizeof(*shf));
2366    else
2367        lsquic_malo_put(shf);
2368}
2369
2370
2371static void
2372stream_hq_frame_close (struct lsquic_stream *stream,
2373                                                struct stream_hq_frame *shf)
2374{
2375    unsigned bits;
2376
2377    LSQ_DEBUG("close HQ frame of type 0x%X at payload offset %"PRIu64
2378        " (actual offset %"PRIu64")", shf->shf_frame_type,
2379        stream->sm_payload, stream->tosend_off);
2380    assert(shf->shf_flags & SHF_ACTIVE);
2381    if (!(shf->shf_flags & SHF_FIXED_SIZE))
2382    {
2383        shf->shf_frame_ptr[0] = shf->shf_frame_type;
2384        bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
2385        vint_write(shf->shf_frame_ptr + 1, stream->sm_payload - shf->shf_off,
2386                                                            bits, 1 << bits);
2387    }
2388    stream_hq_frame_put(stream, shf);
2389}
2390
2391
2392static size_t
2393frame_hq_gen_size (void *ctx)
2394{
2395    struct frame_gen_ctx *fg_ctx = ctx;
2396    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2397    size_t available, remaining, frames;
2398    const struct stream_hq_frame *shf;
2399
2400    frames = 0;
2401    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2402        if (shf->shf_off >= stream->sm_payload)
2403            frames += stream_hq_frame_size(shf);
2404
2405    /* Make sure we are not writing past available size: */
2406    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2407    available = lsquic_stream_write_avail(stream);
2408    if (available < remaining)
2409        remaining = available;
2410
2411    return remaining + stream->sm_n_buffered + frames;
2412}
2413
2414
2415static int
2416frame_std_gen_fin (void *ctx)
2417{
2418    struct frame_gen_ctx *fg_ctx = ctx;
2419    return !(fg_ctx->fgc_stream->sm_bflags & SMBF_CRYPTO)
2420        && (fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE)
2421        && 0 == fg_ctx->fgc_stream->sm_n_buffered
2422        /* Do not use frame_std_gen_size() as it may chop the real size: */
2423        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2424}
2425
2426
2427static void
2428incr_conn_cap (struct lsquic_stream *stream, size_t incr)
2429{
2430    if (stream->sm_bflags & SMBF_CONN_LIMITED)
2431    {
2432        stream->conn_pub->conn_cap.cc_sent += incr;
2433        assert(stream->conn_pub->conn_cap.cc_sent
2434                                    <= stream->conn_pub->conn_cap.cc_max);
2435    }
2436}
2437
2438
2439void
2440incr_sm_payload (struct lsquic_stream *stream, size_t incr)
2441{
2442    stream->sm_payload += incr;
2443    stream->tosend_off += incr;
2444    assert(stream->tosend_off <= stream->max_send_off);
2445}
2446
2447
2448static size_t
2449frame_std_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
2450{
2451    struct frame_gen_ctx *fg_ctx = ctx;
2452    unsigned char *p = begin_buf;
2453    unsigned char *const end = p + len;
2454    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
2455    size_t n_written, available, n_to_write;
2456
2457    if (stream->sm_n_buffered > 0)
2458    {
2459        if (len <= stream->sm_n_buffered)
2460        {
2461            memcpy(p, stream->sm_buf, len);
2462            memmove(stream->sm_buf, stream->sm_buf + len,
2463                                                stream->sm_n_buffered - len);
2464            stream->sm_n_buffered -= len;
2465            if (0 == stream->sm_n_buffered)
2466                maybe_resize_stream_buffer(stream);
2467            assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
2468            incr_sm_payload(stream, len);
2469            *fin = fg_ctx->fgc_fin(fg_ctx);
2470            return len;
2471        }
2472        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
2473        p += stream->sm_n_buffered;
2474        stream->sm_n_buffered = 0;
2475        maybe_resize_stream_buffer(stream);
2476    }
2477
2478    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
2479    n_to_write = end - p;
2480    if (n_to_write > available)
2481        n_to_write = available;
2482    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
2483                                              n_to_write);
2484    p += n_written;
2485    fg_ctx->fgc_nread_from_reader += n_written;
2486    *fin = fg_ctx->fgc_fin(fg_ctx);
2487    incr_sm_payload(stream, p - (const unsigned char *) begin_buf);
2488    incr_conn_cap(stream, n_written);
2489    return p - (const unsigned char *) begin_buf;
2490}
2491
2492
2493static struct stream_hq_frame *
2494find_hq_frame (const struct lsquic_stream *stream, uint64_t off)
2495{
2496    struct stream_hq_frame *shf;
2497
2498    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2499        if (shf->shf_off <= off && stream_hq_frame_end(shf) > off)
2500            return shf;
2501
2502    return NULL;
2503}
2504
2505
2506static struct stream_hq_frame *
2507find_cur_hq_frame (const struct lsquic_stream *stream)
2508{
2509    return find_hq_frame(stream, stream->sm_payload);
2510}
2511
2512
2513static struct stream_hq_frame *
2514open_hq_frame (struct lsquic_stream *stream)
2515{
2516    struct stream_hq_frame *shf;
2517
2518    for (shf = stream->sm_hq_frame_arr; shf < stream->sm_hq_frame_arr
2519            + sizeof(stream->sm_hq_frame_arr)
2520                / sizeof(stream->sm_hq_frame_arr[0]); ++shf)
2521        if (!(shf->shf_flags & SHF_ACTIVE))
2522            goto found;
2523
2524    shf = lsquic_malo_get(stream->conn_pub->mm->malo.stream_hq_frame);
2525    if (!shf)
2526    {
2527        LSQ_WARN("cannot allocate HQ frame");
2528        return NULL;
2529    }
2530    memset(shf, 0, sizeof(*shf));
2531
2532  found:
2533    STAILQ_INSERT_TAIL(&stream->sm_hq_frames, shf, shf_next);
2534    shf->shf_flags = SHF_ACTIVE;
2535    return shf;
2536}
2537
2538
2539/* Returns index of the new frame */
2540static struct stream_hq_frame *
2541stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off,
2542            enum hq_frame_type frame_type, enum shf_flags flags, size_t size)
2543{
2544    struct stream_hq_frame *shf;
2545
2546    shf = open_hq_frame(stream);
2547    if (!shf)
2548    {
2549        LSQ_WARN("could not open HQ frame");
2550        return NULL;
2551    }
2552
2553    shf->shf_off        = off;
2554    shf->shf_flags     |= flags;
2555    shf->shf_frame_type = frame_type;
2556    if (shf->shf_flags & SHF_FIXED_SIZE)
2557    {
2558        shf->shf_frame_size = size;
2559        LSQ_DEBUG("activated fixed-size HQ frame of type 0x%X at offset "
2560            "%"PRIu64", size %zu", shf->shf_frame_type, shf->shf_off, size);
2561    }
2562    else
2563    {
2564        shf->shf_frame_ptr  = NULL;
2565        if (size >= (1 << 6))
2566            shf->shf_flags |= SHF_TWO_BYTES;
2567        LSQ_DEBUG("activated variable-size HQ frame of type 0x%X at offset "
2568            "%"PRIu64, shf->shf_frame_type, shf->shf_off);
2569    }
2570
2571    return shf;
2572}
2573
2574
2575static size_t
2576frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
2577{
2578    struct frame_gen_ctx *fg_ctx = ctx;
2579    unsigned char *p = begin_buf;
2580    unsigned char *const end = p + len;
2581    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2582    struct stream_hq_frame *shf;
2583    size_t nw, frame_sz, avail, rem;
2584    unsigned bits;
2585
2586    while (p < end)
2587    {
2588        shf = find_cur_hq_frame(stream);
2589        if (shf)
2590            LSQ_DEBUG("found current HQ frame of type 0x%X at offset %"PRIu64,
2591                                            shf->shf_frame_type, shf->shf_off);
2592        else
2593        {
2594            rem = frame_std_gen_size(ctx);
2595            if (rem)
2596            {
2597                if (rem > ((1 << 14) - 1))
2598                    rem = (1 << 14) - 1;
2599                shf = stream_activate_hq_frame(stream,
2600                                    stream->sm_payload, HQFT_DATA, 0, rem);
2601                if (shf)
2602                    goto insert;
2603                else
2604                {
2605                    /* TODO: abort connection?  Handle failure somehow */
2606                    break;
2607                }
2608            }
2609            else
2610                break;
2611        }
2612        if (shf->shf_off == stream->sm_payload
2613                                        && !(shf->shf_flags & SHF_WRITTEN))
2614        {
2615  insert:
2616            frame_sz = stream_hq_frame_size(shf);
2617            if (frame_sz > (uintptr_t) (end - p))
2618            {
2619                stream_hq_frame_put(stream, shf);
2620                break;
2621            }
2622            LSQ_DEBUG("insert %zu-byte HQ frame of type 0x%X at payload "
2623                "offset %"PRIu64" (actual offset %"PRIu64")", frame_sz,
2624                shf->shf_frame_type, stream->sm_payload, stream->tosend_off);
2625            if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
2626            {
2627                shf->shf_frame_ptr = p;
2628                memset(p, 0, frame_sz);
2629                p += frame_sz;
2630            }
2631            else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
2632                                                            == SHF_FIXED_SIZE)
2633            {
2634                *p++ = shf->shf_frame_type;
2635                bits = vint_val2bits(shf->shf_frame_size);
2636                vint_write(p, shf->shf_frame_size, bits, 1 << bits);
2637                p += 1 << bits;
2638            }
2639            else
2640                assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
2641                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
2642            if (!(shf->shf_flags & SHF_CC_PAID))
2643            {
2644                incr_conn_cap(stream, frame_sz);
2645                shf->shf_flags |= SHF_CC_PAID;
2646            }
2647            shf->shf_flags |= SHF_WRITTEN;
2648            stream->tosend_off += frame_sz;
2649            assert(stream->tosend_off <= stream->max_send_off);
2650        }
2651        else
2652        {
2653            avail = stream->sm_n_buffered + stream->sm_write_avail(stream);
2654            len = stream_hq_frame_end(shf) - stream->sm_payload;
2655            assert(len);
2656            if (len > (unsigned) (end - p))
2657                len = end - p;
2658            if (len > avail)
2659                len = avail;
2660            if (!len)
2661                break;
2662            nw = frame_std_gen_read(ctx, p, len, fin);
2663            p += nw;
2664            if (nw < len)
2665                break;
2666            if (stream_hq_frame_end(shf) == stream->sm_payload)
2667                stream_hq_frame_close(stream, shf);
2668        }
2669    }
2670
2671    return p - (unsigned char *) begin_buf;
2672}
2673
2674
2675static size_t
2676crypto_frame_gen_read (void *ctx, void *buf, size_t len)
2677{
2678    int fin_ignored;
2679
2680    return frame_std_gen_read(ctx, buf, len, &fin_ignored);
2681}
2682
2683
2684static void
2685check_flush_threshold (lsquic_stream_t *stream)
2686{
2687    if ((stream->sm_qflags & SMQF_WANT_FLUSH) &&
2688                            stream->tosend_off >= stream->sm_flush_to)
2689    {
2690        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
2691                                                    stream->sm_flush_to);
2692        maybe_remove_from_write_q(stream, SMQF_WANT_FLUSH);
2693    }
2694}
2695
2696
2697static int
2698write_stream_frame (struct frame_gen_ctx *fg_ctx, const size_t size,
2699                                        struct lsquic_packet_out *packet_out)
2700{
2701    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
2702    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
2703    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2704    unsigned off;
2705    int len, s;
2706
2707#if LSQUIC_CONN_STATS
2708    const uint64_t begin_off = stream->tosend_off;
2709#endif
2710    off = packet_out->po_data_sz;
2711    len = pf->pf_gen_stream_frame(
2712                packet_out->po_data + packet_out->po_data_sz,
2713                lsquic_packet_out_avail(packet_out), stream->id,
2714                stream->tosend_off,
2715                fg_ctx->fgc_fin(fg_ctx), size, fg_ctx->fgc_read, fg_ctx);
2716    if (len < 0)
2717        return len;
2718
2719#if LSQUIC_CONN_STATS
2720    stream->conn_pub->conn_stats->out.stream_frames += 1;
2721    stream->conn_pub->conn_stats->out.stream_data_sz
2722                                            += stream->tosend_off - begin_off;
2723#endif
2724    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
2725                            packet_out->po_data + packet_out->po_data_sz, len);
2726    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
2727    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
2728    if (0 == lsquic_packet_out_avail(packet_out))
2729        packet_out->po_flags |= PO_STREAM_END;
2730    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
2731                                     stream, QUIC_FRAME_STREAM, off, len);
2732    if (s != 0)
2733    {
2734        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
2735        return -1;
2736    }
2737
2738    check_flush_threshold(stream);
2739    return len;
2740}
2741
2742
2743static enum swtp_status
2744stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size)
2745{
2746    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2747    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2748    struct lsquic_packet_out *packet_out;
2749    int len;
2750
2751    packet_out = lsquic_send_ctl_new_packet_out(send_ctl, 0, PNS_APP,
2752                                                    stream->conn_pub->path);
2753    if (!packet_out)
2754        return SWTP_STOP;
2755    packet_out->po_header_type = stream->tosend_off == 0
2756                                        ? HETY_INITIAL : HETY_HANDSHAKE;
2757
2758    len = write_stream_frame(fg_ctx, size, packet_out);
2759
2760    if (len > 0)
2761    {
2762        packet_out->po_flags |= PO_HELLO;
2763        lsquic_packet_out_zero_pad(packet_out);
2764        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
2765        return SWTP_OK;
2766    }
2767    else
2768        return SWTP_ERROR;
2769}
2770
2771
2772static enum swtp_status
2773stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size)
2774{
2775    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2776    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2777    unsigned stream_header_sz, need_at_least;
2778    struct lsquic_packet_out *packet_out;
2779    struct lsquic_stream *headers_stream;
2780    int len;
2781
2782    if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED))
2783                                                        == STREAM_HEADERS_SENT)
2784    {
2785        if (stream->sm_bflags & SMBF_IETF)
2786        {
2787            if (stream->stream_flags & STREAM_ENCODER_DEP)
2788                headers_stream = stream->conn_pub->u.ietf.qeh->qeh_enc_sm_out;
2789            else
2790                headers_stream = NULL;
2791        }
2792        else
2793            headers_stream =
2794                lsquic_headers_stream_get_stream(stream->conn_pub->u.gquic.hs);
2795        if (headers_stream && lsquic_stream_has_data_to_flush(headers_stream))
2796        {
2797            LSQ_DEBUG("flushing headers stream before packetizing stream data");
2798            (void) lsquic_stream_flush(headers_stream);
2799        }
2800        /* If there is nothing to flush, some other stream must have flushed it:
2801         * this means our headers are flushed.  Either way, only do this once.
2802         */
2803        stream->stream_flags |= STREAM_HDRS_FLUSHED;
2804    }
2805
2806    stream_header_sz = stream->sm_frame_header_sz(stream, size);
2807    need_at_least = stream_header_sz;
2808    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2809                                       == (SMBF_IETF|SMBF_USE_HEADERS))
2810    {
2811        if (size > 0)
2812            need_at_least += 3;     /* Enough room for HTTP/3 frame */
2813    }
2814    else
2815        need_at_least += size > 0;
2816  get_packet:
2817    packet_out = lsquic_send_ctl_get_packet_for_stream(send_ctl,
2818                                need_at_least, stream->conn_pub->path, stream);
2819    if (packet_out)
2820    {
2821        len = write_stream_frame(fg_ctx, size, packet_out);
2822        if (len > 0)
2823            return SWTP_OK;
2824        assert(len < 0);
2825        if (-len > (int) need_at_least)
2826        {
2827            LSQ_DEBUG("need more room (%d bytes) than initially calculated "
2828                "%u bytes, will try again", -len, need_at_least);
2829            need_at_least = -len;
2830            goto get_packet;
2831        }
2832        return SWTP_ERROR;
2833    }
2834    else
2835        return SWTP_STOP;
2836}
2837
2838
2839static enum swtp_status
2840stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size)
2841{
2842    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2843    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2844    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
2845    unsigned crypto_header_sz, need_at_least;
2846    struct lsquic_packet_out *packet_out;
2847    unsigned short off;
2848    const enum packnum_space pns = lsquic_enclev2pns[ crypto_level(stream) ];
2849    int len, s;
2850
2851    assert(size > 0);
2852    crypto_header_sz = stream->sm_frame_header_sz(stream, size);
2853    need_at_least = crypto_header_sz + 1;
2854
2855    packet_out = lsquic_send_ctl_get_packet_for_crypto(send_ctl,
2856                                    need_at_least, pns, stream->conn_pub->path);
2857    if (!packet_out)
2858        return SWTP_STOP;
2859
2860    off = packet_out->po_data_sz;
2861    len = pf->pf_gen_crypto_frame(packet_out->po_data + packet_out->po_data_sz,
2862                lsquic_packet_out_avail(packet_out), stream->tosend_off,
2863                size, crypto_frame_gen_read, fg_ctx);
2864    if (len < 0)
2865        return len;
2866
2867    EV_LOG_GENERATED_CRYPTO_FRAME(LSQUIC_LOG_CONN_ID, pf,
2868                            packet_out->po_data + packet_out->po_data_sz, len);
2869    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
2870    packet_out->po_frame_types |= 1 << QUIC_FRAME_CRYPTO;
2871    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
2872                                     stream, QUIC_FRAME_CRYPTO, off, len);
2873    if (s != 0)
2874    {
2875        LSQ_WARN("adding crypto stream to packet failed: %s", strerror(errno));
2876        return -1;
2877    }
2878
2879    packet_out->po_flags |= PO_HELLO;
2880
2881    check_flush_threshold(stream);
2882    return SWTP_OK;
2883}
2884
2885
2886static void
2887abort_connection (struct lsquic_stream *stream)
2888{
2889    if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
2890        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
2891                                                next_service_stream);
2892    stream->sm_qflags |= SMQF_ABORT_CONN;
2893    LSQ_WARN("connection will be aborted");
2894    maybe_conn_to_tickable(stream);
2895}
2896
2897
2898static void
2899maybe_close_varsize_hq_frame (struct lsquic_stream *stream)
2900{
2901    struct stream_hq_frame *shf;
2902    uint64_t size;
2903    unsigned bits;
2904
2905    shf = find_cur_hq_frame(stream);
2906    if (!shf)
2907        return;
2908
2909    if (shf->shf_flags & SHF_FIXED_SIZE)
2910    {
2911        if (shf->shf_off + shf->shf_frame_size <= stream->sm_payload)
2912            stream_hq_frame_put(stream, shf);
2913        return;
2914    }
2915
2916    bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
2917    size = stream->sm_payload + stream->sm_n_buffered - shf->shf_off;
2918    if (size && size <= VINT_MAX_B(bits) && shf->shf_frame_ptr)
2919    {
2920        if (0 == stream->sm_n_buffered)
2921            LSQ_DEBUG("close HQ frame type 0x%X of size %"PRIu64,
2922                                                shf->shf_frame_type, size);
2923        else
2924            LSQ_DEBUG("convert HQ frame type 0x%X of to fixed %"PRIu64,
2925                                                shf->shf_frame_type, size);
2926        shf->shf_frame_ptr[0] = shf->shf_frame_type;
2927        vint_write(shf->shf_frame_ptr + 1, size, bits, 1 << bits);
2928        if (0 == stream->sm_n_buffered)
2929            stream_hq_frame_put(stream, shf);
2930        else
2931        {
2932            shf->shf_frame_size = size;
2933            shf->shf_flags |= SHF_FIXED_SIZE;
2934        }
2935    }
2936    else if (!shf->shf_frame_ptr)
2937    {
2938        LSQ_WARN("dangling HTTP/3 frame");
2939        stream->conn_pub->lconn->cn_if->ci_internal_error(
2940            stream->conn_pub->lconn, "dangling HTTP/3 frame on stream %"PRIu64,
2941                stream->id);
2942        stream_hq_frame_put(stream, shf);
2943    }
2944    else if (!size)
2945    {
2946        assert(!shf->shf_frame_ptr);
2947        LSQ_WARN("discard zero-sized HQ frame type 0x%X (off: %"PRIu64")",
2948                                        shf->shf_frame_type, shf->shf_off);
2949        stream_hq_frame_put(stream, shf);
2950    }
2951    else
2952    {
2953        assert(stream->sm_n_buffered);
2954        LSQ_ERROR("cannot close frame of size %"PRIu64" -- too large", size);
2955        /* TODO: abort connection */
2956        stream_hq_frame_put(stream, shf);
2957    }
2958}
2959
2960
2961static ssize_t
2962stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
2963                         size_t thresh)
2964{
2965    size_t size;
2966    ssize_t nw;
2967    unsigned seen_ok;
2968    int use_framing;
2969    struct frame_gen_ctx fg_ctx = {
2970        .fgc_stream = stream,
2971        .fgc_reader = reader,
2972        .fgc_nread_from_reader = 0,
2973    };
2974
2975    use_framing = (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2976                                       == (SMBF_IETF|SMBF_USE_HEADERS);
2977    if (use_framing)
2978    {
2979        fg_ctx.fgc_size = frame_hq_gen_size;
2980        fg_ctx.fgc_read = frame_hq_gen_read;
2981        fg_ctx.fgc_fin = frame_std_gen_fin; /* This seems to work for either? XXX */
2982    }
2983    else
2984    {
2985        fg_ctx.fgc_size = frame_std_gen_size;
2986        fg_ctx.fgc_read = frame_std_gen_read;
2987        fg_ctx.fgc_fin = frame_std_gen_fin;
2988    }
2989
2990    seen_ok = 0;
2991    while ((size = fg_ctx.fgc_size(&fg_ctx), thresh ? size >= thresh : size > 0)
2992           || fg_ctx.fgc_fin(&fg_ctx))
2993    {
2994        switch (stream->sm_write_to_packet(&fg_ctx, size))
2995        {
2996        case SWTP_OK:
2997            if (!seen_ok++)
2998                maybe_conn_to_tickable_if_writeable(stream, 0);
2999            if (fg_ctx.fgc_fin(&fg_ctx))
3000            {
3001                if (use_framing && seen_ok)
3002                    maybe_close_varsize_hq_frame(stream);
3003                stream->stream_flags |= STREAM_FIN_SENT;
3004                goto end;
3005            }
3006            else
3007                break;
3008        case SWTP_STOP:
3009            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
3010            if (use_framing && seen_ok)
3011                maybe_close_varsize_hq_frame(stream);
3012            goto end;
3013        default:
3014            abort_connection(stream);
3015            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
3016            return -1;
3017        }
3018    }
3019
3020    if (use_framing && seen_ok)
3021        maybe_close_varsize_hq_frame(stream);
3022
3023    if (thresh)
3024    {
3025        assert(size < thresh);
3026        assert(size >= stream->sm_n_buffered);
3027        size -= stream->sm_n_buffered;
3028        if (size > 0)
3029        {
3030            nw = save_to_buffer(stream, reader, size);
3031            if (nw < 0)
3032                return -1;
3033            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
3034        }
3035    }
3036    else
3037    {
3038        /* We count flushed data towards both stream and connection limits,
3039         * so we should have been able to packetize all of it:
3040         */
3041        assert(0 == stream->sm_n_buffered);
3042        assert(size == 0);
3043    }
3044
3045    maybe_mark_as_blocked(stream);
3046
3047  end:
3048    return fg_ctx.fgc_nread_from_reader;
3049}
3050
3051
3052/* Perform an implicit flush when we hit connection limit while buffering
3053 * data.  This is to prevent a (theoretical) stall:
3054 *
3055 * Imagine a number of streams, all of which buffered some data.  The buffered
3056 * data is up to connection cap, which means no further writes are possible.
3057 * None of them flushes, which means that data is not sent and connection
3058 * WINDOW_UPDATE frame never arrives from peer.  Stall.
3059 */
3060static int
3061maybe_flush_stream (struct lsquic_stream *stream)
3062{
3063    if (stream->sm_n_buffered > 0
3064          && (stream->sm_bflags & SMBF_CONN_LIMITED)
3065            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
3066        return stream_flush_nocheck(stream);
3067    else
3068        return 0;
3069}
3070
3071
3072static int
3073stream_hq_frame_extendable (const struct stream_hq_frame *shf, uint64_t cur_off,
3074                                                                    unsigned len)
3075{
3076    return (shf->shf_flags & (SHF_TWO_BYTES|SHF_FIXED_SIZE)) == 0
3077        && cur_off - shf->shf_off < (1 << 6)
3078        && cur_off - shf->shf_off + len >= (1 << 6)
3079        ;
3080}
3081
3082
3083/* Update currently buffered HQ frame or create a new one, if possible.
3084 * Return update length to be buffered.  If a HQ frame cannot be
3085 * buffered due to size, 0 is returned, thereby preventing both HQ frame
3086 * creation and buffering.
3087 */
3088static size_t
3089update_buffered_hq_frames (struct lsquic_stream *stream, size_t len,
3090                                                                size_t avail)
3091{
3092    struct stream_hq_frame *shf;
3093    uint64_t cur_off, end;
3094    size_t frame_sz;
3095    unsigned extendable;
3096
3097    cur_off = stream->sm_payload + stream->sm_n_buffered;
3098    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
3099        if (shf->shf_off <= cur_off)
3100        {
3101            end = stream_hq_frame_end(shf);
3102            extendable = stream_hq_frame_extendable(shf, cur_off, len);
3103            if (cur_off < end + extendable)
3104                break;
3105        }
3106
3107    if (shf)
3108    {
3109        if (len > end + extendable - cur_off)
3110            len = end + extendable - cur_off;
3111        frame_sz = stream_hq_frame_size(shf);
3112    }
3113    else
3114    {
3115        assert(avail >= 3);
3116        shf = stream_activate_hq_frame(stream, cur_off, HQFT_DATA, 0, len);
3117        /* XXX malloc can fail */
3118        if (len > stream_hq_frame_end(shf) - cur_off)
3119            len = stream_hq_frame_end(shf) - cur_off;
3120        extendable = 0;
3121        frame_sz = stream_hq_frame_size(shf);
3122        if (avail < frame_sz)
3123            return 0;
3124        avail -= frame_sz;
3125    }
3126
3127    if (!(shf->shf_flags & SHF_CC_PAID))
3128    {
3129        incr_conn_cap(stream, frame_sz);
3130        shf->shf_flags |= SHF_CC_PAID;
3131    }
3132    if (extendable)
3133    {
3134        shf->shf_flags |= SHF_TWO_BYTES;
3135        incr_conn_cap(stream, 1);
3136        avail -= 1;
3137        if ((stream->sm_qflags & SMQF_WANT_FLUSH)
3138                && shf->shf_off <= stream->sm_payload
3139                && stream_hq_frame_end(shf) >= stream->sm_flush_to_payload)
3140            stream->sm_flush_to += 1;
3141    }
3142
3143    if (len <= avail)
3144        return len;
3145    else
3146        return avail;
3147}
3148
3149
3150static ssize_t
3151save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
3152                                                                size_t len)
3153{
3154    size_t avail, n_written, n_allowed;
3155
3156    avail = lsquic_stream_write_avail(stream);
3157    if (avail < len)
3158        len = avail;
3159    if (len == 0)
3160    {
3161        LSQ_DEBUG("zero-byte write (avail: %zu)", avail);
3162        return 0;
3163    }
3164
3165    n_allowed = stream_get_n_allowed(stream);
3166    assert(stream->sm_n_buffered + len <= n_allowed);
3167
3168    if (!stream->sm_buf)
3169    {
3170        stream->sm_buf = malloc(n_allowed);
3171        if (!stream->sm_buf)
3172            return -1;
3173        stream->sm_n_allocated = n_allowed;
3174    }
3175
3176    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3177                                            == (SMBF_IETF|SMBF_USE_HEADERS))
3178        len = update_buffered_hq_frames(stream, len, avail);
3179
3180    n_written = reader->lsqr_read(reader->lsqr_ctx,
3181                        stream->sm_buf + stream->sm_n_buffered, len);
3182    stream->sm_n_buffered += n_written;
3183    assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
3184    incr_conn_cap(stream, n_written);
3185    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
3186              n_written, stream->sm_n_buffered);
3187    if (0 != maybe_flush_stream(stream))
3188        return -1;
3189    return n_written;
3190}
3191
3192
3193static ssize_t
3194stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
3195{
3196    const struct stream_hq_frame *shf;
3197    size_t thresh, len, frames, total_len, n_allowed, nwritten;
3198    ssize_t nw;
3199
3200    len = reader->lsqr_size(reader->lsqr_ctx);
3201    if (len == 0)
3202        return 0;
3203
3204    frames = 0;
3205    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3206                                        == (SMBF_IETF|SMBF_USE_HEADERS))
3207        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
3208            if (shf->shf_off >= stream->sm_payload)
3209                frames += stream_hq_frame_size(shf);
3210    total_len = len + frames + stream->sm_n_buffered;
3211    thresh = lsquic_stream_flush_threshold(stream, total_len);
3212    n_allowed = stream_get_n_allowed(stream);
3213    if (total_len <= n_allowed && total_len < thresh)
3214    {
3215        nwritten = 0;
3216        do
3217        {
3218            nw = save_to_buffer(stream, reader, len - nwritten);
3219            if (nw > 0)
3220                nwritten += (size_t) nw;
3221            else if (nw == 0)
3222                break;
3223            else
3224                return nw;
3225        }
3226        while (nwritten < len
3227                        && stream->sm_n_buffered < stream->sm_n_allocated);
3228        return nwritten;
3229    }
3230    else
3231        return stream_write_to_packets(stream, reader, thresh);
3232}
3233
3234
3235ssize_t
3236lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
3237{
3238    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
3239    return lsquic_stream_writev(stream, &iov, 1);
3240}
3241
3242
3243struct inner_reader_iovec {
3244    const struct iovec       *iov;
3245    const struct iovec *end;
3246    unsigned                  cur_iovec_off;
3247};
3248
3249
3250static size_t
3251inner_reader_iovec_read (void *ctx, void *buf, size_t count)
3252{
3253    struct inner_reader_iovec *const iro = ctx;
3254    unsigned char *p = buf;
3255    unsigned char *const end = p + count;
3256    unsigned n_tocopy;
3257
3258    while (iro->iov < iro->end && p < end)
3259    {
3260        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
3261        if (n_tocopy > (unsigned) (end - p))
3262            n_tocopy = end - p;
3263        memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off,
3264                                                                    n_tocopy);
3265        p += n_tocopy;
3266        iro->cur_iovec_off += n_tocopy;
3267        if (iro->iov->iov_len == iro->cur_iovec_off)
3268        {
3269            ++iro->iov;
3270            iro->cur_iovec_off = 0;
3271        }
3272    }
3273
3274    return p + count - end;
3275}
3276
3277
3278static size_t
3279inner_reader_iovec_size (void *ctx)
3280{
3281    struct inner_reader_iovec *const iro = ctx;
3282    const struct iovec *iov;
3283    size_t size;
3284
3285    size = 0;
3286    for (iov = iro->iov; iov < iro->end; ++iov)
3287        size += iov->iov_len;
3288
3289    return size - iro->cur_iovec_off;
3290}
3291
3292
3293ssize_t
3294lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
3295                                                                    int iovcnt)
3296{
3297    COMMON_WRITE_CHECKS();
3298    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3299
3300    struct inner_reader_iovec iro = {
3301        .iov = iov,
3302        .end = iov + iovcnt,
3303        .cur_iovec_off = 0,
3304    };
3305    struct lsquic_reader reader = {
3306        .lsqr_read = inner_reader_iovec_read,
3307        .lsqr_size = inner_reader_iovec_size,
3308        .lsqr_ctx  = &iro,
3309    };
3310
3311    return stream_write(stream, &reader);
3312}
3313
3314
3315ssize_t
3316lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
3317{
3318    COMMON_WRITE_CHECKS();
3319    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3320    return stream_write(stream, reader);
3321}
3322
3323
3324/* This bypasses COMMON_WRITE_CHECKS */
3325static ssize_t
3326stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz)
3327{
3328    struct iovec iov = { (void *) buf, sz, };
3329    struct inner_reader_iovec iro = {
3330        .iov = &iov,
3331        .end = &iov + 1,
3332        .cur_iovec_off = 0,
3333    };
3334    struct lsquic_reader reader = {
3335        .lsqr_read = inner_reader_iovec_read,
3336        .lsqr_size = inner_reader_iovec_size,
3337        .lsqr_ctx  = &iro,
3338    };
3339    return stream_write(stream, &reader);
3340}
3341
3342
3343/* XXX Move this define elsewhere? */
3344#define MAX_HEADERS_SIZE (64 * 1024)
3345
3346static int
3347send_headers_ietf (struct lsquic_stream *stream,
3348                            const struct lsquic_http_headers *headers, int eos)
3349{
3350    enum qwh_status qwh;
3351    const size_t max_prefix_size =
3352                    lsquic_qeh_max_prefix_size(stream->conn_pub->u.ietf.qeh);
3353    const size_t max_push_size = 1 /* Stream type */ + 8 /* Push ID */;
3354    size_t prefix_sz, headers_sz, hblock_sz, push_sz;
3355    unsigned bits;
3356    ssize_t nw;
3357    unsigned char *header_block;
3358    enum lsqpack_enc_header_flags hflags;
3359    unsigned char buf[max_push_size + max_prefix_size + MAX_HEADERS_SIZE];
3360
3361    stream->stream_flags &= ~STREAM_PUSHING;
3362    stream->stream_flags |= STREAM_NOPUSH;
3363
3364    /* TODO: Optimize for the common case: write directly to sm_buf and fall
3365     * back to a larger buffer if that fails.
3366     */
3367    prefix_sz = max_prefix_size;
3368    headers_sz = sizeof(buf) - max_prefix_size - max_push_size;
3369    qwh = lsquic_qeh_write_headers(stream->conn_pub->u.ietf.qeh, stream->id, 0,
3370                headers, buf + max_push_size + max_prefix_size, &prefix_sz,
3371                &headers_sz, &stream->sm_hb_compl, &hflags);
3372
3373    if (!(qwh == QWH_FULL || qwh == QWH_PARTIAL))
3374    {
3375        if (qwh == QWH_ENOBUF)
3376            LSQ_INFO("not enough room for header block");
3377        else
3378            LSQ_WARN("internal error encoding and sending HTTP headers");
3379        return -1;
3380    }
3381
3382    if (hflags & LSQECH_REF_NEW_ENTRIES)
3383        stream->stream_flags |= STREAM_ENCODER_DEP;
3384
3385    if (stream->sm_promise)
3386    {
3387        assert(lsquic_stream_is_pushed(stream));
3388        bits = vint_val2bits(stream->sm_promise->pp_id);
3389        push_sz = 1 + (1 << bits);
3390        if (!stream_activate_hq_frame(stream,
3391                stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PREAMBLE,
3392                SHF_FIXED_SIZE|SHF_PHANTOM, push_sz))
3393            return -1;
3394        buf[max_push_size + max_prefix_size - prefix_sz - push_sz] = HQUST_PUSH;
3395        vint_write(buf + max_push_size + max_prefix_size - prefix_sz
3396                    - push_sz + 1,stream->sm_promise->pp_id, bits, 1 << bits);
3397    }
3398    else
3399        push_sz = 0;
3400
3401    /* Construct contiguous header block buffer including HQ framing */
3402    header_block = buf + max_push_size + max_prefix_size - prefix_sz - push_sz;
3403    hblock_sz = push_sz + prefix_sz + headers_sz;
3404    if (!stream_activate_hq_frame(stream,
3405                stream->sm_payload + stream->sm_n_buffered + push_sz,
3406                HQFT_HEADERS, SHF_FIXED_SIZE, hblock_sz - push_sz))
3407        return -1;
3408
3409    if (qwh == QWH_FULL)
3410    {
3411        stream->sm_send_headers_state = SSHS_HBLOCK_SENDING;
3412        if (lsquic_stream_write_avail(stream))
3413        {
3414            nw = stream_write_buf(stream, header_block, hblock_sz);
3415            if (nw < 0)
3416            {
3417                LSQ_WARN("cannot write to stream: %s", strerror(errno));
3418                return -1;
3419            }
3420            if ((size_t) nw == hblock_sz)
3421            {
3422                stream->stream_flags |= STREAM_HEADERS_SENT;
3423                stream_hblock_sent(stream);
3424                LSQ_DEBUG("wrote all %zu bytes of header block", hblock_sz);
3425                return 0;
3426            }
3427            LSQ_DEBUG("wrote only %zd bytes of header block, stash", nw);
3428        }
3429        else
3430        {
3431            LSQ_DEBUG("cannot write to stream, stash all %zu bytes of "
3432                                        "header block", hblock_sz);
3433            nw = 0;
3434        }
3435    }
3436    else
3437    {
3438        stream->sm_send_headers_state = SSHS_ENC_SENDING;
3439        nw = 0;
3440    }
3441
3442    stream->sm_saved_want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
3443    stream_wantwrite(stream, 1);
3444
3445    stream->sm_header_block = malloc(hblock_sz - (size_t) nw);
3446    if (!stream->sm_header_block)
3447    {
3448        LSQ_WARN("cannot allocate %zd bytes to stash %s header block",
3449            hblock_sz - (size_t) nw, qwh == QWH_FULL ? "full" : "partial");
3450        return -1;
3451    }
3452    memcpy(stream->sm_header_block, header_block + (size_t) nw,
3453                                                hblock_sz - (size_t) nw);
3454    stream->sm_hblock_sz = hblock_sz - (size_t) nw;
3455    stream->sm_hblock_off = 0;
3456    LSQ_DEBUG("stashed %u bytes of header block", stream->sm_hblock_sz);
3457    return 0;
3458}
3459
3460
3461static int
3462send_headers_gquic (struct lsquic_stream *stream,
3463                            const struct lsquic_http_headers *headers, int eos)
3464{
3465    int s = lsquic_headers_stream_send_headers(stream->conn_pub->u.gquic.hs,
3466                stream->id, headers, eos, lsquic_stream_priority(stream));
3467    if (0 == s)
3468    {
3469        SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
3470        stream->stream_flags |= STREAM_HEADERS_SENT;
3471        if (eos)
3472            stream->stream_flags |= STREAM_FIN_SENT;
3473        LSQ_INFO("sent headers");
3474    }
3475    else
3476        LSQ_WARN("could not send headers: %s", strerror(errno));
3477    return s;
3478}
3479
3480
3481int
3482lsquic_stream_send_headers (lsquic_stream_t *stream,
3483                            const lsquic_http_headers_t *headers, int eos)
3484{
3485    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3486            && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_U_WRITE_DONE)))
3487    {
3488        if (stream->sm_bflags & SMBF_IETF)
3489            return send_headers_ietf(stream, headers, eos);
3490        else
3491            return send_headers_gquic(stream, headers, eos);
3492    }
3493    else
3494    {
3495        LSQ_INFO("cannot send headers in this state");
3496        errno = EBADMSG;
3497        return -1;
3498    }
3499}
3500
3501
3502void
3503lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
3504{
3505    if (offset > stream->max_send_off)
3506    {
3507        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
3508        LSQ_DEBUG("update max send offset from 0x%"PRIX64" to "
3509            "0x%"PRIX64, stream->max_send_off, offset);
3510        stream->max_send_off = offset;
3511    }
3512    else
3513        LSQ_DEBUG("new offset 0x%"PRIX64" is not larger than old "
3514            "max send offset 0x%"PRIX64", ignoring", offset,
3515            stream->max_send_off);
3516}
3517
3518
3519/* This function is used to update offsets after handshake completes and we
3520 * learn of peer's limits from the handshake values.
3521 */
3522int
3523lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset)
3524{
3525    LSQ_DEBUG("setting max_send_off to %"PRIu64, offset);
3526    if (offset > stream->max_send_off)
3527    {
3528        lsquic_stream_window_update(stream, offset);
3529        return 0;
3530    }
3531    else if (offset < stream->tosend_off)
3532    {
3533        LSQ_INFO("new offset (%"PRIu64" bytes) is smaller than the amount of "
3534            "data already sent on this stream (%"PRIu64" bytes)", offset,
3535            stream->tosend_off);
3536        return -1;
3537    }
3538    else
3539    {
3540        stream->max_send_off = offset;
3541        return 0;
3542    }
3543}
3544
3545
3546void
3547lsquic_stream_reset (lsquic_stream_t *stream, uint64_t error_code)
3548{
3549    lsquic_stream_reset_ext(stream, error_code, 1);
3550}
3551
3552
3553void
3554lsquic_stream_reset_ext (lsquic_stream_t *stream, uint64_t error_code,
3555                         int do_close)
3556{
3557    if ((stream->stream_flags & STREAM_RST_SENT)
3558                                    || (stream->sm_qflags & SMQF_SEND_RST))
3559    {
3560        LSQ_INFO("reset already sent");
3561        return;
3562    }
3563
3564    SM_HISTORY_APPEND(stream, SHE_RESET);
3565
3566    LSQ_INFO("reset, error code %"PRIu64, error_code);
3567    stream->error_code = error_code;
3568
3569    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
3570        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
3571                                                        next_send_stream);
3572    stream->sm_qflags &= ~SMQF_SENDING_FLAGS;
3573    stream->sm_qflags |= SMQF_SEND_RST;
3574
3575    if (stream->sm_qflags & SMQF_QPACK_DEC)
3576    {
3577        lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream);
3578        stream->sm_qflags |= ~SMQF_QPACK_DEC;
3579    }
3580
3581    drop_buffered_data(stream);
3582    maybe_elide_stream_frames(stream);
3583    maybe_schedule_call_on_close(stream);
3584
3585    if (do_close)
3586        lsquic_stream_close(stream);
3587    else
3588        maybe_conn_to_tickable_if_writeable(stream, 1);
3589}
3590
3591
3592lsquic_stream_id_t
3593lsquic_stream_id (const lsquic_stream_t *stream)
3594{
3595    return stream->id;
3596}
3597
3598
3599#if !defined(NDEBUG) && __GNUC__
3600__attribute__((weak))
3601#endif
3602struct lsquic_conn *
3603lsquic_stream_conn (const lsquic_stream_t *stream)
3604{
3605    return stream->conn_pub->lconn;
3606}
3607
3608
3609int
3610lsquic_stream_close (lsquic_stream_t *stream)
3611{
3612    LSQ_DEBUG("lsquic_stream_close() called");
3613    SM_HISTORY_APPEND(stream, SHE_CLOSE);
3614    if (lsquic_stream_is_closed(stream))
3615    {
3616        LSQ_INFO("Attempt to close an already-closed stream");
3617        errno = EBADF;
3618        return -1;
3619    }
3620    maybe_stream_shutdown_write(stream);
3621    stream_shutdown_read(stream);
3622    maybe_schedule_call_on_close(stream);
3623    maybe_finish_stream(stream);
3624    if (!(stream->stream_flags & STREAM_DELAYED_SW))
3625        maybe_conn_to_tickable_if_writeable(stream, 1);
3626    return 0;
3627}
3628
3629
3630#ifndef NDEBUG
3631#if __GNUC__
3632__attribute__((weak))
3633#endif
3634#endif
3635void
3636lsquic_stream_acked (struct lsquic_stream *stream,
3637                                            enum quic_frame_type frame_type)
3638{
3639    assert(stream->n_unacked);
3640    --stream->n_unacked;
3641    LSQ_DEBUG("ACKed; n_unacked: %u", stream->n_unacked);
3642    if (frame_type == QUIC_FRAME_RST_STREAM)
3643    {
3644        SM_HISTORY_APPEND(stream, SHE_RST_ACKED);
3645        LSQ_DEBUG("RESET that we sent has been acked by peer");
3646        stream->stream_flags |= STREAM_RST_ACKED;
3647    }
3648    if (0 == stream->n_unacked)
3649        maybe_finish_stream(stream);
3650}
3651
3652
3653void
3654lsquic_stream_push_req (lsquic_stream_t *stream,
3655                        struct uncompressed_headers *push_req)
3656{
3657    assert(!stream->push_req);
3658    stream->push_req = push_req;
3659    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
3660}
3661
3662
3663int
3664lsquic_stream_is_pushed (const lsquic_stream_t *stream)
3665{
3666    enum stream_id_type sit;
3667
3668    if (stream->sm_bflags & SMBF_IETF)
3669    {
3670        sit = stream->id & SIT_MASK;
3671        return sit == SIT_UNI_SERVER;
3672    }
3673    else
3674        return 1 & ~stream->id;
3675}
3676
3677
3678int
3679lsquic_stream_push_info (const lsquic_stream_t *stream,
3680                          lsquic_stream_id_t *ref_stream_id, void **hset)
3681{
3682    if (lsquic_stream_is_pushed(stream))
3683    {
3684        assert(stream->push_req);
3685        *ref_stream_id = stream->push_req->uh_stream_id;
3686        *hset          = stream->push_req->uh_hset;
3687        return 0;
3688    }
3689    else
3690        return -1;
3691}
3692
3693
3694int
3695lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
3696{
3697    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3698                                    && !(stream->stream_flags & STREAM_HAVE_UH))
3699    {
3700        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
3701        LSQ_DEBUG("received uncompressed headers");
3702        stream->stream_flags |= STREAM_HAVE_UH;
3703        if (uh->uh_flags & UH_FIN)
3704        {
3705            /* IETF QUIC only sets UH_FIN for a pushed stream on the server to
3706             * mark request as done:
3707             */
3708            if (stream->sm_bflags & SMBF_IETF)
3709                assert((stream->sm_bflags & SMBF_SERVER)
3710                                            && lsquic_stream_is_pushed(stream));
3711            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
3712        }
3713        stream->uh = uh;
3714        if (uh->uh_oth_stream_id == 0)
3715        {
3716            if (uh->uh_weight)
3717                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
3718        }
3719        else
3720            LSQ_NOTICE("don't know how to depend on stream %"PRIu64,
3721                                                        uh->uh_oth_stream_id);
3722        return 0;
3723    }
3724    else
3725    {
3726        LSQ_ERROR("received unexpected uncompressed headers");
3727        return -1;
3728    }
3729}
3730
3731
3732unsigned
3733lsquic_stream_priority (const lsquic_stream_t *stream)
3734{
3735    return 256 - stream->sm_priority;
3736}
3737
3738
3739int
3740lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
3741{
3742    /* The user should never get a reference to the special streams,
3743     * but let's check just in case:
3744     */
3745    if (lsquic_stream_is_critical(stream))
3746        return -1;
3747    if (priority < 1 || priority > 256)
3748        return -1;
3749    stream->sm_priority = 256 - priority;
3750    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
3751    LSQ_DEBUG("set priority to %u", priority);
3752    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
3753    return 0;
3754}
3755
3756
3757static int
3758maybe_send_priority_gquic (struct lsquic_stream *stream, unsigned priority)
3759{
3760    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3761                            && (stream->stream_flags & STREAM_HEADERS_SENT))
3762    {
3763        /* We need to send headers only if we are a) using HEADERS stream
3764         * and b) we already sent initial headers.  If initial headers
3765         * have not been sent yet, stream priority will be sent in the
3766         * HEADERS frame.
3767         */
3768        return lsquic_headers_stream_send_priority(stream->conn_pub->u.gquic.hs,
3769                                                stream->id, 0, 0, priority);
3770    }
3771    else
3772        return 0;
3773}
3774
3775
3776static int
3777send_priority_ietf (struct lsquic_stream *stream, unsigned priority)
3778{
3779    LSQ_WARN("%s: TODO", __func__);     /* TODO */
3780    return -1;
3781}
3782
3783
3784int
3785lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
3786{
3787    if (0 == lsquic_stream_set_priority_internal(stream, priority))
3788    {
3789        if (stream->sm_bflags & SMBF_IETF)
3790            return send_priority_ietf(stream, priority);
3791        else
3792            return maybe_send_priority_gquic(stream, priority);
3793    }
3794    else
3795        return -1;
3796}
3797
3798
3799lsquic_stream_ctx_t *
3800lsquic_stream_get_ctx (const lsquic_stream_t *stream)
3801{
3802    return stream->st_ctx;
3803}
3804
3805
3806int
3807lsquic_stream_refuse_push (lsquic_stream_t *stream)
3808{
3809    if (lsquic_stream_is_pushed(stream)
3810            && !(stream->sm_qflags & SMQF_SEND_RST)
3811            && !(stream->stream_flags & STREAM_RST_SENT))
3812    {
3813        LSQ_DEBUG("refusing pushed stream: send reset");
3814        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
3815        return 0;
3816    }
3817    else
3818        return -1;
3819}
3820
3821
3822size_t
3823lsquic_stream_mem_used (const struct lsquic_stream *stream)
3824{
3825    size_t size;
3826
3827    size = sizeof(stream);
3828    if (stream->sm_buf)
3829        size += stream->sm_n_allocated;
3830    if (stream->data_in)
3831        size += stream->data_in->di_if->di_mem_used(stream->data_in);
3832
3833    return size;
3834}
3835
3836
3837const lsquic_cid_t *
3838lsquic_stream_cid (const struct lsquic_stream *stream)
3839{
3840    return LSQUIC_LOG_CONN_ID;
3841}
3842
3843
3844void
3845lsquic_stream_dump_state (const struct lsquic_stream *stream)
3846{
3847    LSQ_DEBUG("flags: %X; read off: %"PRIu64, stream->stream_flags,
3848                                                    stream->read_offset);
3849    stream->data_in->di_if->di_dump_state(stream->data_in);
3850}
3851
3852
3853void *
3854lsquic_stream_get_hset (struct lsquic_stream *stream)
3855{
3856    void *hset;
3857
3858    if (lsquic_stream_is_reset(stream))
3859    {
3860        LSQ_INFO("%s: stream is reset, no headers returned", __func__);
3861        errno = ECONNRESET;
3862        return NULL;
3863    }
3864
3865    if (!((stream->sm_bflags & SMBF_USE_HEADERS)
3866                                && (stream->stream_flags & STREAM_HAVE_UH)))
3867    {
3868        LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__,
3869                                                        stream->stream_flags);
3870        return NULL;
3871    }
3872
3873    if (!stream->uh)
3874    {
3875        LSQ_INFO("%s: headers unavailable (already fetched?)", __func__);
3876        return NULL;
3877    }
3878
3879    if (stream->uh->uh_flags & UH_H1H)
3880    {
3881        LSQ_INFO("%s: uncompressed headers have internal format", __func__);
3882        return NULL;
3883    }
3884
3885    hset = stream->uh->uh_hset;
3886    stream->uh->uh_hset = NULL;
3887    destroy_uh(stream);
3888    if (stream->stream_flags & STREAM_HEAD_IN_FIN)
3889    {
3890        stream->stream_flags |= STREAM_FIN_REACHED;
3891        SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
3892    }
3893    LSQ_DEBUG("return header set");
3894    return hset;
3895}
3896
3897
3898/* GQUIC-only function */
3899int
3900lsquic_stream_id_is_critical (int use_http, lsquic_stream_id_t stream_id)
3901{
3902    return stream_id == LSQUIC_GQUIC_STREAM_HANDSHAKE
3903        || (stream_id == LSQUIC_GQUIC_STREAM_HEADERS && use_http);
3904}
3905
3906
3907int
3908lsquic_stream_is_critical (const struct lsquic_stream *stream)
3909{
3910    return stream->sm_bflags & SMBF_CRITICAL;
3911}
3912
3913
3914void
3915lsquic_stream_set_stream_if (struct lsquic_stream *stream,
3916           const struct lsquic_stream_if *stream_if, void *stream_if_ctx)
3917{
3918    SM_HISTORY_APPEND(stream, SHE_IF_SWITCH);
3919    stream->stream_if    = stream_if;
3920    stream->sm_onnew_arg = stream_if_ctx;
3921    LSQ_DEBUG("switched interface");
3922    assert(stream->stream_flags & STREAM_ONNEW_DONE);
3923    stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
3924                                                      stream);
3925}
3926
3927
3928static int
3929update_type_hist_and_check (struct hq_filter *filter)
3930{
3931    /* 3-bit codes: */
3932    enum {
3933        CODE_UNSET,
3934        CODE_HEADER,    /* H    Header  */
3935        CODE_DATA,      /* D    Data    */
3936        CODE_PLUS,      /* +    Plus: meaning previous frame repeats */
3937    };
3938    static const unsigned valid_seqs[] = {
3939        /* Ordered by expected frequency */
3940        0123,   /* HD+  */
3941        012,    /* HD   */
3942        01,     /* H    */
3943        01231,  /* HD+H */
3944        0121,   /* HDH  */
3945    };
3946    unsigned code, i;
3947
3948    switch (filter->hqfi_type)
3949    {
3950    case HQFT_HEADERS:
3951        code = CODE_HEADER;
3952        break;
3953    case HQFT_DATA:
3954        code = CODE_DATA;
3955        break;
3956    default:
3957        /* Ignore unknown frames */
3958        return 0;
3959    }
3960
3961    if (filter->hqfi_hist_idx >= MAX_HQFI_ENTRIES)
3962        return -1;
3963
3964    if (filter->hqfi_hist_idx && (filter->hqfi_hist_buf & 7) == code)
3965    {
3966        filter->hqfi_hist_buf <<= 3;
3967        filter->hqfi_hist_buf |= CODE_PLUS;
3968        filter->hqfi_hist_idx++;
3969    }
3970    else if (filter->hqfi_hist_idx > 1
3971            && ((filter->hqfi_hist_buf >> 3) & 7) == code
3972            && (filter->hqfi_hist_buf & 7) == CODE_PLUS)
3973        /* Keep it at plus, do nothing */;
3974    else
3975    {
3976        filter->hqfi_hist_buf <<= 3;
3977        filter->hqfi_hist_buf |= code;
3978        filter->hqfi_hist_idx++;
3979    }
3980
3981    for (i = 0; i < sizeof(valid_seqs) / sizeof(valid_seqs[0]); ++i)
3982        if (filter->hqfi_hist_buf == valid_seqs[i])
3983            return 0;
3984
3985    return -1;
3986}
3987
3988
3989static size_t
3990hq_read (void *ctx, const unsigned char *buf, size_t sz, int fin)
3991{
3992    struct lsquic_stream *const stream = ctx;
3993    struct hq_filter *const filter = &stream->sm_hq_filter;
3994    const unsigned char *p = buf, *prev;
3995    const unsigned char *const end = buf + sz;
3996    struct lsquic_conn *lconn;
3997    enum lsqpack_read_header_status rhs;
3998    int s;
3999
4000    while (p < end)
4001    {
4002        switch (filter->hqfi_state)
4003        {
4004        case HQFI_STATE_FRAME_HEADER_BEGIN:
4005            filter->hqfi_vint_state.vr2s_state = 0;
4006            filter->hqfi_state = HQFI_STATE_FRAME_HEADER_CONTINUE;
4007            /* fall-through */
4008        case HQFI_STATE_FRAME_HEADER_CONTINUE:
4009            s = lsquic_varint_read_two(&p, end, &filter->hqfi_vint_state);
4010            if (s < 0)
4011                break;
4012            filter->hqfi_flags |= HQFI_FLAG_BEGIN;
4013            filter->hqfi_state = HQFI_STATE_READING_PAYLOAD;
4014            LSQ_DEBUG("HQ frame type 0x%"PRIX64" at offset %"PRIu64", size %"PRIu64,
4015                filter->hqfi_type, stream->read_offset + (unsigned) (p - buf),
4016                filter->hqfi_left);
4017            if (0 != update_type_hist_and_check(filter))
4018            {
4019                lconn = stream->conn_pub->lconn;
4020                filter->hqfi_flags |= HQFI_FLAG_ERROR;
4021                LSQ_INFO("unexpected HTTP/3 frame sequence: %o",
4022                    filter->hqfi_hist_buf);
4023                lconn->cn_if->ci_abort_error(lconn, 1, HEC_FRAME_UNEXPECTED,
4024                    "unexpected HTTP/3 frame sequence on stream %"PRIu64,
4025                    stream->id);
4026                goto end;
4027            }
4028            if (filter->hqfi_type == HQFT_HEADERS)
4029            {
4030                if (0 == (filter->hqfi_flags & HQFI_FLAG_GOT_HEADERS))
4031                    filter->hqfi_flags |= HQFI_FLAG_GOT_HEADERS;
4032                else
4033                {
4034                    filter->hqfi_type = (1ull << 62) - 1;
4035                    LSQ_DEBUG("Ignoring HEADERS frame");
4036                }
4037            }
4038            if (filter->hqfi_left > 0)
4039            {
4040                if (filter->hqfi_type == HQFT_DATA)
4041                    goto end;
4042                else if (filter->hqfi_type == HQFT_PUSH_PROMISE)
4043                {
4044                    lconn = stream->conn_pub->lconn;
4045                    if (stream->sm_bflags & SMBF_SERVER)
4046                        lconn->cn_if->ci_abort_error(lconn, 1,
4047                            HEC_FRAME_UNEXPECTED, "Received PUSH_PROMISE frame "
4048                            "on stream %"PRIu64" (clients are not supposed to "
4049                            "send those)", stream->id);
4050                    else
4051                        /* Our client implementation does not support server
4052                         * push.
4053                         */
4054                        lconn->cn_if->ci_abort_error(lconn, 1,
4055                            HEC_FRAME_UNEXPECTED,
4056                            "Received PUSH_PROMISE frame (not supported)"
4057                            "on stream %"PRIu64, stream->id);
4058                    goto end;
4059                }
4060            }
4061            else
4062            {
4063                switch (filter->hqfi_type)
4064                {
4065                case HQFT_CANCEL_PUSH:
4066                case HQFT_GOAWAY:
4067                case HQFT_HEADERS:
4068                case HQFT_MAX_PUSH_ID:
4069                case HQFT_PUSH_PROMISE:
4070                case HQFT_SETTINGS:
4071                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
4072                    LSQ_INFO("HQ frame of type %"PRIu64" cannot be size 0",
4073                                                            filter->hqfi_type);
4074                    abort_connection(stream);   /* XXX Overkill? */
4075                    goto end;
4076                default:
4077                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
4078                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4079                    break;
4080                }
4081            }
4082            break;
4083        case HQFI_STATE_READING_PAYLOAD:
4084            if (filter->hqfi_type == HQFT_DATA)
4085                goto end;
4086            sz = filter->hqfi_left;
4087            if (sz > (uintptr_t) (end - p))
4088                sz = (uintptr_t) (end - p);
4089            switch (filter->hqfi_type)
4090            {
4091            case HQFT_HEADERS:
4092                prev = p;
4093                if (filter->hqfi_flags & HQFI_FLAG_BEGIN)
4094                {
4095                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
4096                    rhs = lsquic_qdh_header_in_begin(
4097                                stream->conn_pub->u.ietf.qdh,
4098                                stream, filter->hqfi_left, &p, sz);
4099                }
4100                else
4101                    rhs = lsquic_qdh_header_in_continue(
4102                                stream->conn_pub->u.ietf.qdh, stream, &p, sz);
4103                assert(p > prev || LQRHS_ERROR == rhs);
4104                filter->hqfi_left -= p - prev;
4105                if (filter->hqfi_left == 0)
4106                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4107                switch (rhs)
4108                {
4109                case LQRHS_DONE:
4110                    assert(filter->hqfi_left == 0);
4111                    stream->sm_qflags &= ~SMQF_QPACK_DEC;
4112                    break;
4113                case LQRHS_NEED:
4114                    stream->sm_qflags |= SMQF_QPACK_DEC;
4115                    break;
4116                case LQRHS_BLOCKED:
4117                    stream->sm_qflags |= SMQF_QPACK_DEC;
4118                    filter->hqfi_flags |= HQFI_FLAG_BLOCKED;
4119                    goto end;
4120                default:
4121                    assert(LQRHS_ERROR == rhs);
4122                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
4123                    LSQ_INFO("error processing header block");
4124                    abort_connection(stream);   /* XXX Overkill? */
4125                    goto end;
4126                }
4127                break;
4128            default:
4129                /* Simply skip unknown frame type payload for now */
4130                filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
4131                p += sz;
4132                filter->hqfi_left -= sz;
4133                if (filter->hqfi_left == 0)
4134                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4135                break;
4136            }
4137            break;
4138        default:
4139            assert(0);
4140            goto end;
4141        }
4142    }
4143
4144  end:
4145    if (fin && p == end && filter->hqfi_state != HQFI_STATE_FRAME_HEADER_BEGIN)
4146    {
4147        LSQ_INFO("FIN at unexpected place in filter; state: %u",
4148                                                        filter->hqfi_state);
4149        filter->hqfi_flags |= HQFI_FLAG_ERROR;
4150/* From [draft-ietf-quic-http-16] Section 3.1:
4151 *               When a stream terminates cleanly, if the last frame on
4152 * the stream was truncated, this MUST be treated as a connection error
4153 * (see HTTP_MALFORMED_FRAME in Section 8.1).
4154 */
4155        abort_connection(stream);
4156    }
4157
4158    return p - buf;
4159}
4160
4161
4162static int
4163hq_filter_readable_now (const struct lsquic_stream *stream)
4164{
4165    const struct hq_filter *const filter = &stream->sm_hq_filter;
4166
4167    return (filter->hqfi_type == HQFT_DATA
4168                    && filter->hqfi_state == HQFI_STATE_READING_PAYLOAD)
4169        || (filter->hqfi_flags & HQFI_FLAG_ERROR)
4170        || stream->uh
4171        || (stream->stream_flags & STREAM_FIN_REACHED)
4172    ;
4173}
4174
4175
4176static int
4177hq_filter_readable (struct lsquic_stream *stream)
4178{
4179    struct hq_filter *const filter = &stream->sm_hq_filter;
4180    struct read_frames_status rfs;
4181
4182    if (filter->hqfi_flags & HQFI_FLAG_BLOCKED)
4183        return 0;
4184
4185    if (!hq_filter_readable_now(stream))
4186    {
4187        rfs = read_data_frames(stream, 0, hq_read, stream);
4188        if (rfs.total_nread == 0)
4189        {
4190            if (rfs.error)
4191            {
4192                filter->hqfi_flags |= HQFI_FLAG_ERROR;
4193                abort_connection(stream);   /* XXX Overkill? */
4194                return 1;   /* Collect error */
4195            }
4196            return 0;
4197        }
4198    }
4199
4200    return hq_filter_readable_now(stream);
4201}
4202
4203
4204static size_t
4205hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame)
4206{
4207    struct hq_filter *const filter = &stream->sm_hq_filter;
4208    size_t nr;
4209
4210    if (!(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4211                                            && filter->hqfi_type == HQFT_DATA))
4212    {
4213        nr = hq_read(stream, data_frame->df_data + data_frame->df_read_off,
4214                            data_frame->df_size - data_frame->df_read_off,
4215                            data_frame->df_fin);
4216        if (nr)
4217        {
4218            stream->read_offset += nr;
4219            stream_consumed_bytes(stream);
4220        }
4221    }
4222    else
4223        nr = 0;
4224
4225    if (0 == (filter->hqfi_flags & HQFI_FLAG_ERROR))
4226    {
4227        data_frame->df_read_off += nr;
4228        if (filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4229                                        && filter->hqfi_type == HQFT_DATA)
4230            return MIN(filter->hqfi_left,
4231                    (unsigned) data_frame->df_size - data_frame->df_read_off);
4232        else
4233        {
4234            assert(data_frame->df_read_off == data_frame->df_size);
4235            return 0;
4236        }
4237    }
4238    else
4239    {
4240        data_frame->df_read_off = data_frame->df_size;
4241        return 0;
4242    }
4243}
4244
4245
4246static void
4247hq_decr_left (struct lsquic_stream *stream, size_t read)
4248{
4249    struct hq_filter *const filter = &stream->sm_hq_filter;
4250
4251    if (read)
4252    {
4253        assert(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4254                                            && filter->hqfi_type == HQFT_DATA);
4255        assert(read <= filter->hqfi_left);
4256    }
4257
4258    filter->hqfi_left -= read;
4259    if (0 == filter->hqfi_left)
4260        filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4261}
4262
4263
4264struct qpack_dec_hdl *
4265lsquic_stream_get_qdh (const struct lsquic_stream *stream)
4266{
4267    return stream->conn_pub->u.ietf.qdh;
4268}
4269
4270
4271/* These are IETF QUIC states */
4272enum stream_state_sending
4273lsquic_stream_sending_state (const struct lsquic_stream *stream)
4274{
4275    if (0 == (stream->stream_flags & STREAM_RST_SENT))
4276    {
4277        if (stream->stream_flags & STREAM_FIN_SENT)
4278        {
4279            if (stream->n_unacked)
4280                return SSS_DATA_SENT;
4281            else
4282                return SSS_DATA_RECVD;
4283        }
4284        else
4285        {
4286            if (stream->tosend_off
4287                            || (stream->stream_flags & STREAM_BLOCKED_SENT))
4288                return SSS_SEND;
4289            else
4290                return SSS_READY;
4291        }
4292    }
4293    else if (stream->stream_flags & STREAM_RST_ACKED)
4294        return SSS_RESET_RECVD;
4295    else
4296        return SSS_RESET_SENT;
4297}
4298
4299
4300const char *const lsquic_sss2str[] =
4301{
4302    [SSS_READY]        =  "Ready",
4303    [SSS_SEND]         =  "Send",
4304    [SSS_DATA_SENT]    =  "Data Sent",
4305    [SSS_RESET_SENT]   =  "Reset Sent",
4306    [SSS_DATA_RECVD]   =  "Data Recvd",
4307    [SSS_RESET_RECVD]  =  "Reset Recvd",
4308};
4309
4310
4311const char *const lsquic_ssr2str[] =
4312{
4313    [SSR_RECV]         =  "Recv",
4314    [SSR_SIZE_KNOWN]   =  "Size Known",
4315    [SSR_DATA_RECVD]   =  "Data Recvd",
4316    [SSR_RESET_RECVD]  =  "Reset Recvd",
4317    [SSR_DATA_READ]    =  "Data Read",
4318    [SSR_RESET_READ]   =  "Reset Read",
4319};
4320
4321
4322/* These are IETF QUIC states */
4323enum stream_state_receiving
4324lsquic_stream_receiving_state (struct lsquic_stream *stream)
4325{
4326    uint64_t n_bytes;
4327
4328    if (0 == (stream->stream_flags & STREAM_RST_RECVD))
4329    {
4330        if (0 == (stream->stream_flags & STREAM_FIN_RECVD))
4331            return SSR_RECV;
4332        if (stream->stream_flags & STREAM_FIN_REACHED)
4333            return SSR_DATA_READ;
4334        if (0 == (stream->stream_flags & STREAM_DATA_RECVD))
4335        {
4336            n_bytes = stream->data_in->di_if->di_readable_bytes(
4337                                    stream->data_in, stream->read_offset);
4338            if (stream->read_offset + n_bytes == stream->sm_fin_off)
4339            {
4340                stream->stream_flags |= STREAM_DATA_RECVD;
4341                return SSR_DATA_RECVD;
4342            }
4343            else
4344                return SSR_SIZE_KNOWN;
4345        }
4346        else
4347            return SSR_DATA_RECVD;
4348    }
4349    else if (stream->stream_flags & STREAM_RST_READ)
4350        return SSR_RESET_READ;
4351    else
4352        return SSR_RESET_RECVD;
4353}
4354
4355
4356void
4357lsquic_stream_qdec_unblocked (struct lsquic_stream *stream)
4358{
4359    struct hq_filter *const filter = &stream->sm_hq_filter;
4360
4361    assert(stream->sm_qflags & SMQF_QPACK_DEC);
4362    assert(filter->hqfi_flags & HQFI_FLAG_BLOCKED);
4363
4364    filter->hqfi_flags &= ~HQFI_FLAG_BLOCKED;
4365    stream->conn_pub->cp_flags |= CP_STREAM_UNBLOCKED;
4366    LSQ_DEBUG("QPACK decoder unblocked");
4367}
4368
4369
4370int
4371lsquic_stream_is_rejected (const struct lsquic_stream *stream)
4372{
4373    return stream->stream_flags & STREAM_SS_RECVD;
4374}
4375
4376
4377int
4378lsquic_stream_can_push (const struct lsquic_stream *stream)
4379{
4380    if (lsquic_stream_is_pushed(stream))
4381        return 0;
4382    else if (stream->sm_bflags & SMBF_IETF)
4383        return (stream->sm_bflags & SMBF_USE_HEADERS)
4384            && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_NOPUSH))
4385            && stream->sm_send_headers_state == SSHS_BEGIN
4386            ;
4387    else
4388        return 1;
4389}
4390
4391
4392static size_t
4393dp_reader_read (void *lsqr_ctx, void *buf, size_t count)
4394{
4395    struct lsquic_stream *const stream = lsqr_ctx;
4396    unsigned char *dst = buf;
4397    unsigned char *const end = buf + count;
4398    size_t len;
4399
4400    len = MIN((size_t) (stream->sm_dup_push_len - stream->sm_dup_push_off),
4401                                                        (size_t) (end - dst));
4402    memcpy(dst, stream->sm_dup_push_buf + stream->sm_dup_push_off, len);
4403    stream->sm_dup_push_off += len;
4404
4405    if (stream->sm_dup_push_len == stream->sm_dup_push_off)
4406        LSQ_DEBUG("finish writing duplicate push");
4407
4408    return len;
4409}
4410
4411
4412static size_t
4413dp_reader_size (void *lsqr_ctx)
4414{
4415    struct lsquic_stream *const stream = lsqr_ctx;
4416
4417    return stream->sm_dup_push_len - stream->sm_dup_push_off;
4418}
4419
4420
4421static void
4422init_dp_reader (struct lsquic_stream *stream, struct lsquic_reader *reader)
4423{
4424    reader->lsqr_read = dp_reader_read;
4425    reader->lsqr_size = dp_reader_size;
4426    reader->lsqr_ctx = stream;
4427}
4428
4429
4430static void
4431on_write_dp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
4432{
4433    struct lsquic_reader dp_reader;
4434    ssize_t nw;
4435    int want_write;
4436
4437    assert(stream->sm_dup_push_off < stream->sm_dup_push_len);
4438
4439    init_dp_reader(stream, &dp_reader);
4440    nw = stream_write(stream, &dp_reader);
4441    if (nw > 0)
4442    {
4443        LSQ_DEBUG("wrote %zd bytes more of duplicate push (%s)",
4444            nw, stream->sm_dup_push_off == stream->sm_dup_push_len ?
4445            "done" : "not done");
4446        if (stream->sm_dup_push_off == stream->sm_dup_push_len)
4447        {
4448            /* Restore want_write flag */
4449            want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
4450            if (want_write != stream->sm_saved_want_write)
4451                (void) lsquic_stream_wantwrite(stream,
4452                                                stream->sm_saved_want_write);
4453        }
4454    }
4455    else if (nw < 0)
4456    {
4457        LSQ_WARN("could not write duplicate push (wrapper)");
4458        /* XXX What should happen if we hit an error? TODO */
4459    }
4460}
4461
4462
4463int
4464lsquic_stream_duplicate_push (struct lsquic_stream *stream, uint64_t push_id)
4465{
4466    struct lsquic_reader dp_reader;
4467    unsigned bits, len;
4468    ssize_t nw;
4469
4470    assert(stream->sm_bflags & SMBF_IETF);
4471    assert(lsquic_stream_can_push(stream));
4472
4473    bits = vint_val2bits(push_id);
4474    len = 1 << bits;
4475
4476    if (!stream_activate_hq_frame(stream,
4477            stream->sm_payload + stream->sm_n_buffered, HQFT_DUPLICATE_PUSH,
4478            SHF_FIXED_SIZE, len))
4479        return -1;
4480
4481    stream->stream_flags |= STREAM_PUSHING;
4482
4483    stream->sm_dup_push_len = len;
4484    stream->sm_dup_push_off = 0;
4485    vint_write(stream->sm_dup_push_buf, push_id, bits, 1 << bits);
4486
4487    init_dp_reader(stream, &dp_reader);
4488    nw = stream_write(stream, &dp_reader);
4489    if (nw > 0)
4490    {
4491        if (stream->sm_dup_push_off == stream->sm_dup_push_len)
4492            LSQ_DEBUG("fully wrote DUPLICATE_PUSH %"PRIu64, push_id);
4493        else
4494        {
4495            LSQ_DEBUG("partially wrote DUPLICATE_PUSH %"PRIu64, push_id);
4496            stream->stream_flags |= STREAM_NOPUSH;
4497            stream->sm_saved_want_write =
4498                                    !!(stream->sm_qflags & SMQF_WANT_WRITE);
4499            stream_wantwrite(stream, 1);
4500        }
4501        return 0;
4502    }
4503    else
4504    {
4505        if (nw < 0)
4506            LSQ_WARN("failure writing DUPLICATE_PUSH");
4507        stream->stream_flags |= STREAM_NOPUSH;
4508        stream->stream_flags &= ~STREAM_PUSHING;
4509        return -1;
4510    }
4511}
4512
4513
4514static size_t
4515pp_reader_read (void *lsqr_ctx, void *buf, size_t count)
4516{
4517    struct push_promise *const promise = lsqr_ctx;
4518    unsigned char *dst = buf;
4519    unsigned char *const end = buf + count;
4520    size_t len;
4521
4522    while (dst < end)
4523    {
4524        switch (promise->pp_write_state)
4525        {
4526        case PPWS_ID0:
4527        case PPWS_ID1:
4528        case PPWS_ID2:
4529        case PPWS_ID3:
4530        case PPWS_ID4:
4531        case PPWS_ID5:
4532        case PPWS_ID6:
4533        case PPWS_ID7:
4534            *dst++ = promise->pp_encoded_push_id[promise->pp_write_state];
4535            ++promise->pp_write_state;
4536            break;
4537        case PPWS_PFX0:
4538            *dst++ = 0;
4539            ++promise->pp_write_state;
4540            break;
4541        case PPWS_PFX1:
4542            *dst++ = 0;
4543            ++promise->pp_write_state;
4544            break;
4545        case PPWS_HBLOCK:
4546            len = MIN(promise->pp_content_len - promise->pp_write_off,
4547                        (size_t) (end - dst));
4548            memcpy(dst, promise->pp_content_buf + promise->pp_write_off,
4549                                                                        len);
4550            promise->pp_write_off += len;
4551            dst += len;
4552            if (promise->pp_content_len == promise->pp_write_off)
4553            {
4554                LSQ_LOG1(LSQ_LOG_DEBUG, "finish writing push promise %"PRIu64
4555                    ": reset push state", promise->pp_id);
4556                promise->pp_write_state = PPWS_DONE;
4557            }
4558            goto end;
4559        default:
4560            goto end;
4561        }
4562    }
4563
4564  end:
4565    return dst - (unsigned char *) buf;
4566}
4567
4568
4569static size_t
4570pp_reader_size (void *lsqr_ctx)
4571{
4572    struct push_promise *const promise = lsqr_ctx;
4573    size_t size;
4574
4575    size = 0;
4576    switch (promise->pp_write_state)
4577    {
4578    case PPWS_ID0:
4579    case PPWS_ID1:
4580    case PPWS_ID2:
4581    case PPWS_ID3:
4582    case PPWS_ID4:
4583    case PPWS_ID5:
4584    case PPWS_ID6:
4585    case PPWS_ID7:
4586        size += 8 - promise->pp_write_state;
4587    case PPWS_PFX0:
4588        ++size;
4589        /* fall-through */
4590    case PPWS_PFX1:
4591        ++size;
4592        /* fall-through */
4593    case PPWS_HBLOCK:
4594        size += promise->pp_content_len - promise->pp_write_off;
4595        break;
4596    default:
4597        break;
4598    }
4599
4600    return size;
4601}
4602
4603
4604static void
4605init_pp_reader (struct push_promise *promise, struct lsquic_reader *reader)
4606{
4607    reader->lsqr_read = pp_reader_read;
4608    reader->lsqr_size = pp_reader_size;
4609    reader->lsqr_ctx = promise;
4610}
4611
4612
4613static void
4614on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
4615{
4616    struct lsquic_reader pp_reader;
4617    struct push_promise *promise;
4618    ssize_t nw;
4619    int want_write;
4620
4621    assert(stream_is_pushing_promise(stream));
4622
4623    promise = SLIST_FIRST(&stream->sm_promises);
4624    init_pp_reader(promise, &pp_reader);
4625    nw = stream_write(stream, &pp_reader);
4626    if (nw > 0)
4627    {
4628        LSQ_DEBUG("wrote %zd bytes more of push promise (%s)",
4629            nw, promise->pp_write_state == PPWS_DONE ? "done" : "not done");
4630        if (promise->pp_write_state == PPWS_DONE)
4631        {
4632            /* Restore want_write flag */
4633            want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
4634            if (want_write != stream->sm_saved_want_write)
4635                (void) lsquic_stream_wantwrite(stream,
4636                                                stream->sm_saved_want_write);
4637        }
4638    }
4639    else if (nw < 0)
4640    {
4641        LSQ_WARN("could not write push promise (wrapper)");
4642        /* XXX What should happen if we hit an error? TODO */
4643    }
4644}
4645
4646
4647/* Success means that the push promise has been placed on sm_promises list and
4648 * the stream now owns it.  Failure means that the push promise should be
4649 * destroyed by the caller.
4650 *
4651 * A push promise is written immediately.  If it cannot be written to packets
4652 * or buffered whole, the stream is marked as unable to push further promises.
4653 */
4654int
4655lsquic_stream_push_promise (struct lsquic_stream *stream,
4656                                                struct push_promise *promise)
4657{
4658    struct lsquic_reader pp_reader;
4659    unsigned bits, len;
4660    ssize_t nw;
4661
4662    assert(stream->sm_bflags & SMBF_IETF);
4663    assert(lsquic_stream_can_push(stream));
4664
4665    bits = vint_val2bits(promise->pp_id);
4666    len = 1 << bits;
4667    promise->pp_write_state = 8 - len;
4668    vint_write(promise->pp_encoded_push_id + 8 - len, promise->pp_id,
4669                                                            bits, 1 << bits);
4670
4671    if (!stream_activate_hq_frame(stream,
4672                stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PROMISE,
4673                SHF_FIXED_SIZE, pp_reader_size(promise)))
4674        return -1;
4675
4676    stream->stream_flags |= STREAM_PUSHING;
4677
4678    init_pp_reader(promise, &pp_reader);
4679    nw = stream_write(stream, &pp_reader);
4680    if (nw > 0)
4681    {
4682        SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next);
4683        ++promise->pp_refcnt;
4684        if (promise->pp_write_state == PPWS_DONE)
4685            LSQ_DEBUG("fully wrote promise %"PRIu64, promise->pp_id);
4686        else
4687        {
4688            LSQ_DEBUG("partially wrote promise %"PRIu64" (state: %d, off: %u)"
4689                ", disable further pushing", promise->pp_id,
4690                promise->pp_write_state, promise->pp_write_off);
4691            stream->stream_flags |= STREAM_NOPUSH;
4692            stream->sm_saved_want_write =
4693                                    !!(stream->sm_qflags & SMQF_WANT_WRITE);
4694            stream_wantwrite(stream, 1);
4695        }
4696        return 0;
4697    }
4698    else
4699    {
4700        if (nw < 0)
4701            LSQ_WARN("failure writing push promise");
4702        stream->stream_flags |= STREAM_NOPUSH;
4703        stream->stream_flags &= ~STREAM_PUSHING;
4704        return -1;
4705    }
4706}
4707