lsquic_stream.c revision 0adf085a
/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
/*
 * lsquic_stream.c -- stream processing
 *
 * To clear up terminology, here are some of our stream states (in order).
 * They are not codified, but they are referred to in both code and comments.
 *
 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
 *                point, on_close() gets called.
 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
 *                finished (freed): it gets put onto the `service_streams'
 *                list for connection to clean it up.
 *  DESTROYED   All remaining memory associated with the stream is released.
 *                If on_close() has not been called yet, it is called now.
 *                The stream pointer is now invalid.
 *
 * When connection is aborted, a stream may go directly to DESTROYED state.
 */

20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "lsquic.h"
30
31#include "lsquic_int_types.h"
32#include "lsquic_packet_common.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_malo.h"
35#include "lsquic_conn_flow.h"
36#include "lsquic_rtt.h"
37#include "lsquic_sfcw.h"
38#include "lsquic_varint.h"
39#include "lsquic_hq.h"
40#include "lsquic_hash.h"
41#include "lsquic_stream.h"
42#include "lsquic_conn_public.h"
43#include "lsquic_util.h"
44#include "lsquic_mm.h"
45#include "lsquic_headers_stream.h"
46#include "lsquic_conn.h"
47#include "lsquic_data_in_if.h"
48#include "lsquic_parse.h"
49#include "lsquic_packet_out.h"
50#include "lsquic_engine_public.h"
51#include "lsquic_senhist.h"
52#include "lsquic_pacer.h"
53#include "lsquic_cubic.h"
54#include "lsquic_bw_sampler.h"
55#include "lsquic_minmax.h"
56#include "lsquic_bbr.h"
57#include "lsquic_send_ctl.h"
58#include "lsquic_headers.h"
59#include "lsquic_ev_log.h"
60#include "lsquic_enc_sess.h"
61#include "lsqpack.h"
62#include "lsquic_frab_list.h"
63#include "lsquic_http1x_if.h"
64#include "lsquic_qdec_hdl.h"
65#include "lsquic_qenc_hdl.h"
66#include "lsquic_byteswap.h"
67#include "lsquic_ietf.h"
68#include "lsquic_push_promise.h"
69
70#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
71#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(stream->conn_pub->lconn)
72#define LSQUIC_LOG_STREAM_ID stream->id
73#include "lsquic_logger.h"
74
75#define MIN(a, b) ((a) < (b) ? (a) : (b))
76
77static void
78drop_frames_in (lsquic_stream_t *stream);
79
80static void
81maybe_schedule_call_on_close (lsquic_stream_t *stream);
82
83static int
84stream_wantread (lsquic_stream_t *stream, int is_want);
85
86static int
87stream_wantwrite (lsquic_stream_t *stream, int is_want);
88
89static ssize_t
90stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);
91
92static ssize_t
93save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);
94
95static int
96stream_flush (lsquic_stream_t *stream);
97
98static int
99stream_flush_nocheck (lsquic_stream_t *stream);
100
101static void
102maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag);
103
104enum swtp_status { SWTP_OK, SWTP_STOP, SWTP_ERROR };
105
106static enum swtp_status
107stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size);
108
109static enum swtp_status
110stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size);
111
112static enum swtp_status
113stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size);
114
115static size_t
116stream_write_avail_no_frames (struct lsquic_stream *);
117
118static size_t
119stream_write_avail_with_frames (struct lsquic_stream *);
120
121static size_t
122stream_write_avail_with_headers (struct lsquic_stream *);
123
124static int
125hq_filter_readable (struct lsquic_stream *stream);
126
127static void
128hq_decr_left (struct lsquic_stream *stream, size_t);
129
130static size_t
131hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame);
132
133static int
134stream_readable_non_http (struct lsquic_stream *stream);
135
136static int
137stream_readable_http_gquic (struct lsquic_stream *stream);
138
139static int
140stream_readable_http_ietf (struct lsquic_stream *stream);
141
142static ssize_t
143stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz);
144
145static size_t
146active_hq_frame_sizes (const struct lsquic_stream *);
147
148static void
149on_write_dp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *);
150
151static void
152on_write_pp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *);
153
154static void
155stream_hq_frame_put (struct lsquic_stream *, struct stream_hq_frame *);
156
157static size_t
158stream_hq_frame_size (const struct stream_hq_frame *);
159
160const struct stream_filter_if hq_stream_filter_if =
161{
162    .sfi_readable   = hq_filter_readable,
163    .sfi_filter_df  = hq_filter_df,
164    .sfi_decr_left  = hq_decr_left,
165};
166
167
168#if LSQUIC_KEEP_STREAM_HISTORY
/* Stream history events.  Every value is a printable ASCII character so
 * that the whole history buffer can be printed in a single log line.
 *
 * The list is not exhaustive: only the most interesting events are
 * recorded.
 */
enum stream_history_event
{
    /* Special entries */
    SHE_EMPTY = '\0',   /* No init besides memset required */
    SHE_PLUS = '+',     /* Previous event occurred more than once */

    SHE_REACH_FIN = 'a',
    SHE_BLOCKED_OUT = 'b',
    SHE_CREATED = 'C',
    SHE_FRAME_IN = 'd',
    SHE_FRAME_OUT = 'D',
    SHE_RESET = 'e',
    SHE_WINDOW_UPDATE = 'E',
    SHE_FIN_IN = 'f',
    SHE_FINISHED = 'F',
    SHE_GOAWAY_IN = 'g',
    SHE_USER_WRITE_HEADER = 'h',
    SHE_HEADERS_IN = 'H',
    SHE_IF_SWITCH = 'i',
    SHE_ONCLOSE_SCHED = 'l',
    SHE_ONCLOSE_CALL = 'L',
    SHE_ONNEW = 'N',
    SHE_SET_PRIO = 'p',
    SHE_USER_READ = 'r',
    SHE_SHUTDOWN_READ = 'R',
    SHE_RST_IN = 's',
    SHE_SS_IN = 'S',
    SHE_RST_OUT = 't',
    SHE_RST_ACKED = 'T',
    SHE_FLUSH = 'u',
    SHE_USER_WRITE_DATA = 'w',
    SHE_SHUTDOWN_WRITE = 'W',
    SHE_CLOSE = 'X',
    SHE_DELAY_SW = 'y',
    SHE_FORCE_FINISH = 'Z',

    /* In each pair below, "YES" must be one more than "NO" */
    SHE_WANTREAD_NO = '0',
    SHE_WANTREAD_YES = '1',
    SHE_WANTWRITE_NO = '2',
    SHE_WANTWRITE_YES = '3',
};
213
214static void
215sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
216{
217    enum stream_history_event prev_event;
218    sm_hist_idx_t idx;
219    int plus;
220
221    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
222    plus = SHE_PLUS == stream->sm_hist_buf[idx];
223    idx = (idx - plus) & SM_HIST_IDX_MASK;
224    prev_event = stream->sm_hist_buf[idx];
225
226    if (prev_event == sh_event && plus)
227        return;
228
229    if (prev_event == sh_event)
230        sh_event = SHE_PLUS;
231    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
232
233    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
234        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
235                                                        stream->sm_hist_buf);
236}
237
238
239#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
240#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
241        if (stream->sm_hist_idx & SM_HIST_IDX_MASK)                         \
242            LSQ_DEBUG("history: [%.*s]",                                    \
243                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
244                (stream)->sm_hist_buf);                                     \
245    } while (0)
246#else
247#   define SM_HISTORY_APPEND(stream, event)
248#   define SM_HISTORY_DUMP_REMAINING(stream)
249#endif
250
251
252static int
253stream_inside_callback (const lsquic_stream_t *stream)
254{
255    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
256}
257
258
259static void
260maybe_conn_to_tickable (lsquic_stream_t *stream)
261{
262    if (!stream_inside_callback(stream))
263        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
264                                           stream->conn_pub->lconn);
265}
266
267
268/* Here, "readable" means that the user is able to read from the stream. */
269static void
270maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
271{
272    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
273    {
274        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
275                                           stream->conn_pub->lconn);
276    }
277}
278
279
280/* Here, "writeable" means that data can be put into packets to be
281 * scheduled to be sent out.
282 *
283 * If `check_can_send' is false, it means that we do not need to check
284 * whether packets can be sent.  This check was already performed when
285 * we packetized stream data.
286 */
287static void
288maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
289                                                    int check_can_send)
290{
291    if (!stream_inside_callback(stream) &&
292            (!check_can_send
293             || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
294          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
295    {
296        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
297                                           stream->conn_pub->lconn);
298    }
299}
300
301
302static int
303stream_stalled (const lsquic_stream_t *stream)
304{
305    return 0 == (stream->sm_qflags & (SMQF_WANT_WRITE|SMQF_WANT_READ)) &&
306           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
307                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
308}
309
310
311static size_t
312stream_stream_frame_header_sz (const struct lsquic_stream *stream,
313                                                            unsigned data_sz)
314{
315    return stream->conn_pub->lconn->cn_pf->pf_calc_stream_frame_header_sz(
316                                    stream->id, stream->tosend_off, data_sz);
317}
318
319
320static size_t
321stream_crypto_frame_header_sz (const struct lsquic_stream *stream,
322                                                    unsigned data_sz_IGNORED)
323{
324    return stream->conn_pub->lconn->cn_pf
325         ->pf_calc_crypto_frame_header_sz(stream->tosend_off);
326}
327
328
329/* GQUIC-only function */
330static int
331stream_is_hsk (const struct lsquic_stream *stream)
332{
333    if (stream->sm_bflags & SMBF_IETF)
334        return 0;
335    else
336        return stream->id == LSQUIC_GQUIC_STREAM_HANDSHAKE;
337}
338
339
340static struct lsquic_stream *
341stream_new_common (lsquic_stream_id_t id, struct lsquic_conn_public *conn_pub,
342           const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
343           enum stream_ctor_flags ctor_flags)
344{
345    struct lsquic_stream *stream;
346
347    stream = calloc(1, sizeof(*stream));
348    if (!stream)
349        return NULL;
350
351    if (ctor_flags & SCF_USE_DI_HASH)
352        stream->data_in = data_in_hash_new(conn_pub, id, 0);
353    else
354        stream->data_in = data_in_nocopy_new(conn_pub, id);
355    if (!stream->data_in)
356    {
357        free(stream);
358        return NULL;
359    }
360
361    stream->id        = id;
362    stream->stream_if = stream_if;
363    stream->conn_pub  = conn_pub;
364    stream->sm_onnew_arg = stream_if_ctx;
365    stream->sm_write_avail = stream_write_avail_no_frames;
366
367    STAILQ_INIT(&stream->sm_hq_frames);
368
369    stream->sm_bflags |= ctor_flags & ((1 << (N_SMBF_FLAGS - 1)) - 1);
370    if (conn_pub->lconn->cn_flags & LSCONN_SERVER)
371        stream->sm_bflags |= SMBF_SERVER;
372
373    return stream;
374}
375
376
377/* TODO: The logic to figure out whether the stream is connection limited
378 * should be taken out of the constructor.  The caller should specify this
379 * via one of enum stream_ctor_flags.
380 */
381lsquic_stream_t *
382lsquic_stream_new (lsquic_stream_id_t id,
383        struct lsquic_conn_public *conn_pub,
384        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
385        unsigned initial_window, uint64_t initial_send_off,
386        enum stream_ctor_flags ctor_flags)
387{
388    lsquic_cfcw_t *cfcw;
389    lsquic_stream_t *stream;
390
391    stream = stream_new_common(id, conn_pub, stream_if, stream_if_ctx,
392                                                                ctor_flags);
393    if (!stream)
394        return NULL;
395
396    if (!initial_window)
397        initial_window = 16 * 1024;
398
399    if (ctor_flags & SCF_IETF)
400    {
401        cfcw = &conn_pub->cfcw;
402        stream->sm_bflags |= SMBF_CONN_LIMITED;
403        if (ctor_flags & SCF_HTTP)
404        {
405            stream->sm_write_avail = stream_write_avail_with_headers;
406            stream->sm_readable = stream_readable_http_ietf;
407            stream->sm_sfi = &hq_stream_filter_if;
408        }
409        else
410            stream->sm_readable = stream_readable_non_http;
411        lsquic_stream_set_priority_internal(stream,
412                                            LSQUIC_STREAM_DEFAULT_PRIO);
413        stream->sm_write_to_packet = stream_write_to_packet_std;
414    }
415    else
416    {
417        if (lsquic_stream_id_is_critical(ctor_flags & SCF_HTTP, id))
418            cfcw = NULL;
419        else
420        {
421            cfcw = &conn_pub->cfcw;
422            stream->sm_bflags |= SMBF_CONN_LIMITED;
423            lsquic_stream_set_priority_internal(stream,
424                                                LSQUIC_STREAM_DEFAULT_PRIO);
425        }
426        if (stream->sm_bflags & SMBF_USE_HEADERS)
427            stream->sm_readable = stream_readable_http_gquic;
428        else
429            stream->sm_readable = stream_readable_non_http;
430        if (stream_is_hsk(stream))
431            stream->sm_write_to_packet = stream_write_to_packet_hsk;
432        else
433            stream->sm_write_to_packet = stream_write_to_packet_std;
434    }
435
436    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
437    stream->max_send_off = initial_send_off;
438    LSQ_DEBUG("created stream");
439    SM_HISTORY_APPEND(stream, SHE_CREATED);
440    stream->sm_frame_header_sz = stream_stream_frame_header_sz;
441    if (ctor_flags & SCF_CALL_ON_NEW)
442        lsquic_stream_call_on_new(stream);
443    return stream;
444}
445
446
447struct lsquic_stream *
448lsquic_stream_new_crypto (enum enc_level enc_level,
449        struct lsquic_conn_public *conn_pub,
450        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
451        enum stream_ctor_flags ctor_flags)
452{
453    struct lsquic_stream *stream;
454    lsquic_stream_id_t stream_id;
455
456    assert(ctor_flags & SCF_CRITICAL);
457
458    stream_id = ~0ULL - enc_level;
459    stream = stream_new_common(stream_id, conn_pub, stream_if,
460                                                stream_if_ctx, ctor_flags);
461    if (!stream)
462        return NULL;
463
464    stream->sm_bflags |= SMBF_CRYPTO|SMBF_IETF;
465    stream->sm_enc_level = enc_level;
466    /* TODO: why have limit in crypto stream?  Set it to UINT64_MAX? */
467    lsquic_sfcw_init(&stream->fc, 16 * 1024, NULL, conn_pub, stream_id);
468    stream->max_send_off = 16 * 1024;
469    LSQ_DEBUG("created crypto stream");
470    SM_HISTORY_APPEND(stream, SHE_CREATED);
471    stream->sm_frame_header_sz = stream_crypto_frame_header_sz;
472    stream->sm_write_to_packet = stream_write_to_packet_crypto;
473    stream->sm_readable = stream_readable_non_http;
474    if (ctor_flags & SCF_CALL_ON_NEW)
475        lsquic_stream_call_on_new(stream);
476    return stream;
477}
478
479
480void
481lsquic_stream_call_on_new (lsquic_stream_t *stream)
482{
483    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
484    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
485    {
486        LSQ_DEBUG("calling on_new_stream");
487        SM_HISTORY_APPEND(stream, SHE_ONNEW);
488        stream->stream_flags |= STREAM_ONNEW_DONE;
489        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
490                                                          stream);
491    }
492}
493
494
495static void
496decr_conn_cap (struct lsquic_stream *stream, size_t incr)
497{
498    if (stream->sm_bflags & SMBF_CONN_LIMITED)
499    {
500        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
501        stream->conn_pub->conn_cap.cc_sent -= incr;
502    }
503}
504
505
506static void
507maybe_resize_stream_buffer (struct lsquic_stream *stream)
508{
509    assert(0 == stream->sm_n_buffered);
510
511    if (stream->sm_n_allocated < stream->conn_pub->path->np_pack_size)
512    {
513        free(stream->sm_buf);
514        stream->sm_buf = NULL;
515        stream->sm_n_allocated = 0;
516    }
517    else if (stream->sm_n_allocated > stream->conn_pub->path->np_pack_size)
518        stream->sm_n_allocated = stream->conn_pub->path->np_pack_size;
519}
520
521
522static void
523drop_buffered_data (struct lsquic_stream *stream)
524{
525    decr_conn_cap(stream, stream->sm_n_buffered);
526    stream->sm_n_buffered = 0;
527    maybe_resize_stream_buffer(stream);
528    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
529        maybe_remove_from_write_q(stream, SMQF_WRITE_Q_FLAGS);
530}
531
532
533static void
534destroy_uh (struct lsquic_stream *stream)
535{
536    if (stream->uh)
537    {
538        if (stream->uh->uh_hset)
539            stream->conn_pub->enpub->enp_hsi_if
540                            ->hsi_discard_header_set(stream->uh->uh_hset);
541        free(stream->uh);
542        stream->uh = NULL;
543    }
544}
545
546
547void
548lsquic_stream_destroy (lsquic_stream_t *stream)
549{
550    struct push_promise *promise;
551    struct stream_hq_frame *shf;
552
553    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
554    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
555                                                            STREAM_ONNEW_DONE)
556    {
557        stream->stream_flags |= STREAM_ONCLOSE_DONE;
558        stream->stream_if->on_close(stream, stream->st_ctx);
559    }
560    if (stream->sm_qflags & SMQF_SENDING_FLAGS)
561        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
562    if (stream->sm_qflags & SMQF_WANT_READ)
563        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
564    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
565        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
566    if (stream->sm_qflags & SMQF_SERVICE_FLAGS)
567        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
568    if (stream->sm_qflags & SMQF_QPACK_DEC)
569        lsquic_qdh_unref_stream(stream->conn_pub->u.ietf.qdh, stream);
570    drop_buffered_data(stream);
571    lsquic_sfcw_consume_rem(&stream->fc);
572    drop_frames_in(stream);
573    if (stream->push_req)
574    {
575        if (stream->push_req->uh_hset)
576            stream->conn_pub->enpub->enp_hsi_if
577                            ->hsi_discard_header_set(stream->push_req->uh_hset);
578        free(stream->push_req);
579    }
580    while ((promise = SLIST_FIRST(&stream->sm_promises)))
581    {
582        SLIST_REMOVE_HEAD(&stream->sm_promises, pp_next);
583        lsquic_pp_put(promise, stream->conn_pub->u.ietf.promises);
584    }
585    if (stream->sm_promise)
586    {
587        assert(stream->sm_promise->pp_pushed_stream == stream);
588        stream->sm_promise->pp_pushed_stream = NULL;
589        lsquic_pp_put(stream->sm_promise, stream->conn_pub->u.ietf.promises);
590    }
591    while ((shf = STAILQ_FIRST(&stream->sm_hq_frames)))
592        stream_hq_frame_put(stream, shf);
593    destroy_uh(stream);
594    free(stream->sm_buf);
595    free(stream->sm_header_block);
596    LSQ_DEBUG("destroyed stream");
597    SM_HISTORY_DUMP_REMAINING(stream);
598    free(stream);
599}
600
601
602static int
603stream_is_finished (const lsquic_stream_t *stream)
604{
605    return lsquic_stream_is_closed(stream)
606           /* n_unacked checks that no outgoing packets that reference this
607            * stream are outstanding:
608            */
609        && 0 == stream->n_unacked
610           /* This checks that no packets that reference this stream will
611            * become outstanding:
612            */
613        && 0 == (stream->sm_qflags & SMQF_SEND_RST)
614        && ((stream->stream_flags & STREAM_FORCE_FINISH)
615          || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)));
616}
617
618
619/* This is an internal function */
620void
621lsquic_stream_force_finish (struct lsquic_stream *stream)
622{
623    LSQ_DEBUG("stream is now finished");
624    SM_HISTORY_APPEND(stream, SHE_FINISHED);
625    if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
626        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
627                                                next_service_stream);
628    stream->sm_qflags |= SMQF_FREE_STREAM;
629    stream->stream_flags |= STREAM_FINISHED;
630}
631
632
633static void
634maybe_finish_stream (lsquic_stream_t *stream)
635{
636    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
637                                                    stream_is_finished(stream))
638        lsquic_stream_force_finish(stream);
639}
640
641
642static void
643maybe_schedule_call_on_close (lsquic_stream_t *stream)
644{
645    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
646                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE))
647            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)
648            && !(stream->sm_qflags & SMQF_CALL_ONCLOSE))
649    {
650        if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
651            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
652                                                    next_service_stream);
653        stream->sm_qflags |= SMQF_CALL_ONCLOSE;
654        LSQ_DEBUG("scheduled calling on_close");
655        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
656    }
657}
658
659
660void
661lsquic_stream_call_on_close (lsquic_stream_t *stream)
662{
663    assert(stream->stream_flags & STREAM_ONNEW_DONE);
664    stream->sm_qflags &= ~SMQF_CALL_ONCLOSE;
665    if (!(stream->sm_qflags & SMQF_SERVICE_FLAGS))
666        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
667                                                    next_service_stream);
668    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
669    {
670        LSQ_DEBUG("calling on_close");
671        stream->stream_flags |= STREAM_ONCLOSE_DONE;
672        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
673        stream->stream_if->on_close(stream, stream->st_ctx);
674    }
675    else
676        assert(0);
677}
678
679
680static int
681stream_readable_non_http (struct lsquic_stream *stream)
682{
683    return stream->data_in->di_if->di_get_frame(stream->data_in,
684                                                stream->read_offset) != NULL;
685}
686
687
688static int
689stream_readable_http_gquic (struct lsquic_stream *stream)
690{
691    return (stream->stream_flags & STREAM_HAVE_UH)
692        && (stream->uh
693            ||  stream->data_in->di_if->di_get_frame(stream->data_in,
694                                                    stream->read_offset));
695}
696
697
698static int
699stream_readable_http_ietf (struct lsquic_stream *stream)
700{
701    return
702        /* If we have read the header set and the header set has not yet
703         * been read, the stream is readable.
704         */
705        ((stream->stream_flags & STREAM_HAVE_UH) && stream->uh)
706        ||
707        /* Alternatively, run the filter and check for payload availability. */
708        (stream->sm_sfi->sfi_readable(stream)
709            && (/* Running the filter may result in hitting FIN: */
710                (stream->stream_flags & STREAM_FIN_REACHED)
711                || stream->data_in->di_if->di_get_frame(stream->data_in,
712                                                    stream->read_offset)));
713}
714
715
716int
717lsquic_stream_readable (struct lsquic_stream *stream)
718{
719    /* A stream is readable if one of the following is true: */
720    return
721        /* - It is already finished: in that case, lsquic_stream_read() will
722         *   return 0.
723         */
724            (stream->stream_flags & STREAM_FIN_REACHED)
725        /* - The stream is reset, by either side.  In this case,
726         *   lsquic_stream_read() will return -1 (we want the user to be
727         *   able to collect the error).
728         */
729        ||  lsquic_stream_is_reset(stream)
730        /* Type-dependent readability check: */
731        ||  stream->sm_readable(stream);
732    ;
733}
734
735
736static size_t
737stream_write_avail_no_frames (struct lsquic_stream *stream)
738{
739    uint64_t stream_avail, conn_avail;
740
741    stream_avail = stream->max_send_off - stream->tosend_off
742                                                - stream->sm_n_buffered;
743
744    if (stream->sm_bflags & SMBF_CONN_LIMITED)
745    {
746        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
747        if (conn_avail < stream_avail)
748            stream_avail = conn_avail;
749    }
750
751    return stream_avail;
752}
753
754
755static size_t
756stream_write_avail_with_frames (struct lsquic_stream *stream)
757{
758    uint64_t stream_avail, conn_avail;
759    const struct stream_hq_frame *shf;
760    size_t size;
761
762    stream_avail = stream->max_send_off - stream->tosend_off
763                                                - stream->sm_n_buffered;
764    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
765        if (!(shf->shf_flags & SHF_WRITTEN))
766        {
767            size = stream_hq_frame_size(shf);
768            assert(size <= stream_avail);
769            stream_avail -= size;
770        }
771
772    if (stream->sm_bflags & SMBF_CONN_LIMITED)
773    {
774        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
775        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
776            if (!(shf->shf_flags & SHF_CC_PAID))
777            {
778                size = stream_hq_frame_size(shf);
779                if (size < conn_avail)
780                    conn_avail -= size;
781                else
782                    return 0;
783            }
784        if (conn_avail < stream_avail)
785            stream_avail = conn_avail;
786    }
787
788    if (stream_avail >= 3 /* Smallest new frame */)
789        return stream_avail;
790    else
791        return 0;
792}
793
794
795static int
796stream_is_pushing_promise (const struct lsquic_stream *stream)
797{
798    return (stream->stream_flags & STREAM_PUSHING)
799        && SLIST_FIRST(&stream->sm_promises)
800        && (SLIST_FIRST(&stream->sm_promises))->pp_write_state != PPWS_DONE
801        ;
802}
803
804
805/* To prevent deadlocks, ensure that when headers are sent, the bytes
806 * sent on the encoder stream are written first.
807 *
808 * XXX If the encoder is set up in non-risking mode, it is perfectly
809 * fine to send the header block first.  TODO: update the logic to
810 * reflect this.  There should be two sending behaviors: risk and non-risk.
811 * For now, we assume risk for everything to be on the conservative side.
812 */
813static size_t
814stream_write_avail_with_headers (struct lsquic_stream *stream)
815{
816    if (stream->stream_flags & STREAM_PUSHING)
817        return stream_write_avail_with_frames(stream);
818
819    switch (stream->sm_send_headers_state)
820    {
821    case SSHS_BEGIN:
822        return lsquic_qeh_write_avail(stream->conn_pub->u.ietf.qeh);
823    case SSHS_ENC_SENDING:
824        if (stream->sm_hb_compl >
825                            lsquic_qeh_enc_off(stream->conn_pub->u.ietf.qeh))
826            return 0;
827        LSQ_DEBUG("encoder stream bytes have all been sent");
828        stream->sm_send_headers_state = SSHS_HBLOCK_SENDING;
829        /* fall-through */
830    default:
831        assert(SSHS_HBLOCK_SENDING == stream->sm_send_headers_state);
832        return stream_write_avail_with_frames(stream);
833    }
834}
835
836
837size_t
838lsquic_stream_write_avail (struct lsquic_stream *stream)
839{
840    return stream->sm_write_avail(stream);
841}
842
843
844int
845lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
846{
847    struct lsquic_conn *lconn;
848
849    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
850                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
851    {
852        if (stream->sm_bflags & SMBF_IETF)
853        {
854            lconn = stream->conn_pub->lconn;
855            lconn->cn_if->ci_abort_error(lconn, 0, TEC_FLOW_CONTROL_ERROR,
856                "flow control violation on stream %"PRIu64, stream->id);
857        }
858        return -1;
859    }
860    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
861    {
862        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
863            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
864                                                    next_send_stream);
865        stream->sm_qflags |= SMQF_SEND_WUF;
866    }
867    return 0;
868}
869
870
871int
872lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
873{
874    uint64_t max_off;
875    int got_next_offset, rv, free_frame;
876    enum ins_frame ins_frame;
877    struct lsquic_conn *lconn;
878
879    assert(frame->packet_in);
880
881    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
882    LSQ_DEBUG("received stream frame, offset 0x%"PRIX64", len %u; "
883        "fin: %d", frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
884
885    if ((stream->sm_bflags & SMBF_USE_HEADERS)
886                            && (stream->stream_flags & STREAM_HEAD_IN_FIN))
887    {
888        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
889        lsquic_malo_put(frame);
890        return -1;
891    }
892
893    if (frame->data_frame.df_fin && (stream->sm_bflags & SMBF_IETF)
894            && (stream->stream_flags & STREAM_FIN_RECVD)
895            && stream->sm_fin_off != DF_END(frame))
896    {
897        lconn = stream->conn_pub->lconn;
898        lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR,
899            "new final size %"PRIu64" from STREAM frame (id: %"PRIu64") does "
900            "not match previous final size %"PRIu64, DF_END(frame),
901            stream->id, stream->sm_fin_off);
902        return -1;
903    }
904
905    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
906  insert_frame:
907    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
908    if (INS_FRAME_OK == ins_frame)
909    {
910        /* Update maximum offset in the flow controller and check for flow
911         * control violation:
912         */
913        rv = -1;
914        free_frame = !stream->data_in->di_if->di_own_on_ok;
915        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
916        if (0 != lsquic_stream_update_sfcw(stream, max_off))
917            goto end_ok;
918        if (frame->data_frame.df_fin)
919        {
920            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
921            stream->stream_flags |= STREAM_FIN_RECVD;
922            stream->sm_fin_off = DF_END(frame);
923            maybe_finish_stream(stream);
924        }
925        if ((stream->sm_bflags & SMBF_AUTOSWITCH) &&
926                (stream->data_in->di_flags & DI_SWITCH_IMPL))
927        {
928            stream->data_in = stream->data_in->di_if->di_switch_impl(
929                                        stream->data_in, stream->read_offset);
930            if (!stream->data_in)
931            {
932                stream->data_in = data_in_error_new();
933                goto end_ok;
934            }
935        }
936        if (got_next_offset)
937            /* Checking the offset saves di_get_frame() call */
938            maybe_conn_to_tickable_if_readable(stream);
939        rv = 0;
940  end_ok:
941        if (free_frame)
942            lsquic_malo_put(frame);
943        return rv;
944    }
945    else if (INS_FRAME_DUP == ins_frame)
946    {
947        return 0;
948    }
949    else if (INS_FRAME_OVERLAP == ins_frame)
950    {
951        LSQ_DEBUG("overlap: switching DATA IN implementation");
952        stream->data_in = stream->data_in->di_if->di_switch_impl(
953                                    stream->data_in, stream->read_offset);
954        if (stream->data_in)
955            goto insert_frame;
956        stream->data_in = data_in_error_new();
957        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
958        lsquic_malo_put(frame);
959        return -1;
960    }
961    else
962    {
963        assert(INS_FRAME_ERR == ins_frame);
964        return -1;
965    }
966}
967
968
969static void
970drop_frames_in (lsquic_stream_t *stream)
971{
972    if (stream->data_in)
973    {
974        stream->data_in->di_if->di_destroy(stream->data_in);
975        /* To avoid checking whether `data_in` is set, just set to the error
976         * data-in stream.  It does the right thing after incoming data is
977         * dropped.
978         */
979        stream->data_in = data_in_error_new();
980    }
981}
982
983
984static void
985maybe_elide_stream_frames (struct lsquic_stream *stream)
986{
987    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
988    {
989        if (stream->n_unacked)
990            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
991                                                stream->id);
992        stream->stream_flags |= STREAM_FRAMES_ELIDED;
993    }
994}
995
996
997int
998lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
999                      uint64_t error_code)
1000{
1001    struct lsquic_conn *lconn;
1002
1003    if ((stream->sm_bflags & SMBF_IETF)
1004            && (stream->stream_flags & STREAM_FIN_RECVD)
1005            && stream->sm_fin_off != offset)
1006    {
1007        lconn = stream->conn_pub->lconn;
1008        lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR,
1009            "final size %"PRIu64" from RESET_STREAM frame (id: %"PRIu64") "
1010            "does not match previous final size %"PRIu64, offset,
1011            stream->id, stream->sm_fin_off);
1012        return -1;
1013    }
1014
1015    if (stream->stream_flags & STREAM_RST_RECVD)
1016    {
1017        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
1018        return 0;
1019    }
1020
1021    SM_HISTORY_APPEND(stream, SHE_RST_IN);
1022    /* This flag must always be set, even if we are "ignoring" it: it is
1023     * used by elision code.
1024     */
1025    stream->stream_flags |= STREAM_RST_RECVD;
1026
1027    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
1028    {
1029        LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64" is "
1030            "smaller than that of byte following the last byte we have seen: "
1031            "0x%"PRIX64, offset,
1032            lsquic_sfcw_get_max_recv_off(&stream->fc));
1033        return -1;
1034    }
1035
1036    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
1037    {
1038        LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64
1039            " violates flow control", offset);
1040        return -1;
1041    }
1042
1043    /* Let user collect error: */
1044    maybe_conn_to_tickable_if_readable(stream);
1045
1046    lsquic_sfcw_consume_rem(&stream->fc);
1047    drop_frames_in(stream);
1048    drop_buffered_data(stream);
1049    maybe_elide_stream_frames(stream);
1050
1051    if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT))
1052                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1053        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
1054
1055    stream->stream_flags |= STREAM_RST_RECVD;
1056
1057    maybe_finish_stream(stream);
1058    maybe_schedule_call_on_close(stream);
1059
1060    return 0;
1061}
1062
1063
1064void
1065lsquic_stream_stop_sending_in (struct lsquic_stream *stream,
1066                                                        uint64_t error_code)
1067{
1068    if (stream->stream_flags & STREAM_SS_RECVD)
1069    {
1070        LSQ_DEBUG("ignore duplicate STOP_SENDING frame");
1071        return;
1072    }
1073
1074    SM_HISTORY_APPEND(stream, SHE_SS_IN);
1075    stream->stream_flags |= STREAM_SS_RECVD;
1076
1077    /* Let user collect error: */
1078    maybe_conn_to_tickable_if_readable(stream);
1079
1080    lsquic_sfcw_consume_rem(&stream->fc);
1081    drop_frames_in(stream);
1082    drop_buffered_data(stream);
1083    maybe_elide_stream_frames(stream);
1084
1085    if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT))
1086                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1087        lsquic_stream_reset_ext(stream, error_code, 0);
1088
1089    maybe_finish_stream(stream);
1090    maybe_schedule_call_on_close(stream);
1091}
1092
1093
1094uint64_t
1095lsquic_stream_fc_recv_off_const (const struct lsquic_stream *stream)
1096{
1097    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
1098}
1099
1100
1101void
1102lsquic_stream_max_stream_data_sent (struct lsquic_stream *stream)
1103{
1104    assert(stream->sm_qflags & SMQF_SEND_MAX_STREAM_DATA);
1105    stream->sm_qflags &= ~SMQF_SEND_MAX_STREAM_DATA;
1106    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1107        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1108    stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc);
1109}
1110
1111
1112uint64_t
1113lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
1114{
1115    assert(stream->sm_qflags & SMQF_SEND_WUF);
1116    stream->sm_qflags &= ~SMQF_SEND_WUF;
1117    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1118        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1119    return stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc);
1120}
1121
1122
1123void
1124lsquic_stream_peer_blocked (struct lsquic_stream *stream, uint64_t peer_off)
1125{
1126    uint64_t last_off;
1127
1128    if (stream->sm_last_recv_off)
1129        last_off = stream->sm_last_recv_off;
1130    else
1131        /* This gets advertized in transport parameters */
1132        last_off = lsquic_sfcw_get_max_recv_off(&stream->fc);
1133
1134    LSQ_DEBUG("Peer blocked at %"PRIu64", while the last MAX_STREAM_DATA "
1135        "frame we sent advertized the limit of %"PRIu64, peer_off, last_off);
1136
1137    if (peer_off > last_off && !(stream->sm_qflags & SMQF_SEND_WUF))
1138    {
1139        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1140            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1141                                                    next_send_stream);
1142        stream->sm_qflags |= SMQF_SEND_WUF;
1143        LSQ_DEBUG("marked to send MAX_STREAM_DATA frame");
1144    }
1145    else if (stream->sm_qflags & SMQF_SEND_WUF)
1146        LSQ_DEBUG("MAX_STREAM_DATA frame is already scheduled");
1147    else if (stream->sm_last_recv_off)
1148        LSQ_DEBUG("MAX_STREAM_DATA(%"PRIu64") has already been either "
1149            "packetized or sent", stream->sm_last_recv_off);
1150    else
1151        LSQ_INFO("Peer should have receive transport param limit "
1152            "of %"PRIu64"; odd.", last_off);
1153}
1154
1155
1156/* GQUIC's BLOCKED frame does not have an offset */
1157void
1158lsquic_stream_peer_blocked_gquic (struct lsquic_stream *stream)
1159{
1160    LSQ_DEBUG("Peer blocked: schedule another WINDOW_UPDATE frame");
1161    if (!(stream->sm_qflags & SMQF_SEND_WUF))
1162    {
1163        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1164            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1165                                                    next_send_stream);
1166        stream->sm_qflags |= SMQF_SEND_WUF;
1167        LSQ_DEBUG("marked to send MAX_STREAM_DATA frame");
1168    }
1169    else
1170        LSQ_DEBUG("WINDOW_UPDATE frame is already scheduled");
1171}
1172
1173
1174void
1175lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
1176{
1177    assert(stream->sm_qflags & SMQF_SEND_BLOCKED);
1178    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
1179    stream->sm_qflags &= ~SMQF_SEND_BLOCKED;
1180    stream->stream_flags |= STREAM_BLOCKED_SENT;
1181    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1182        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1183}
1184
1185
1186void
1187lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
1188{
1189    assert(stream->sm_qflags & SMQF_SEND_RST);
1190    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
1191    stream->sm_qflags &= ~SMQF_SEND_RST;
1192    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1193        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1194    stream->stream_flags |= STREAM_RST_SENT;
1195    maybe_finish_stream(stream);
1196}
1197
1198
1199static size_t
1200read_uh (struct lsquic_stream *stream,
1201        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1202{
1203    struct http1x_headers *const h1h = stream->uh->uh_hset;
1204    size_t nread;
1205
1206    nread = readf(ctx, (unsigned char *) h1h->h1h_buf + h1h->h1h_off,
1207                  h1h->h1h_size - h1h->h1h_off,
1208                  (stream->stream_flags & STREAM_HEAD_IN_FIN) > 0);
1209    h1h->h1h_off += nread;
1210    if (h1h->h1h_off == h1h->h1h_size)
1211    {
1212        LSQ_DEBUG("read all uncompressed headers");
1213        destroy_uh(stream);
1214        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
1215        {
1216            stream->stream_flags |= STREAM_FIN_REACHED;
1217            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
1218        }
1219    }
1220    return nread;
1221}
1222
1223
1224static void
1225stream_consumed_bytes (struct lsquic_stream *stream)
1226{
1227    lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
1228    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
1229    {
1230        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1231            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1232                                                            next_send_stream);
1233        stream->sm_qflags |= SMQF_SEND_WUF;
1234        maybe_conn_to_tickable_if_writeable(stream, 1);
1235    }
1236}
1237
1238
1239struct read_frames_status
1240{
1241    int     error;
1242    int     processed_frames;
1243    size_t  total_nread;
1244};
1245
1246
1247static struct read_frames_status
1248read_data_frames (struct lsquic_stream *stream, int do_filtering,
1249        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1250{
1251    struct data_frame *data_frame;
1252    size_t nread, toread, total_nread;
1253    int short_read, processed_frames;
1254
1255    processed_frames = 0;
1256    total_nread = 0;
1257
1258    while ((data_frame = stream->data_in->di_if->di_get_frame(
1259                                        stream->data_in, stream->read_offset)))
1260    {
1261
1262        ++processed_frames;
1263
1264        do
1265        {
1266            if (do_filtering && stream->sm_sfi)
1267                toread = stream->sm_sfi->sfi_filter_df(stream, data_frame);
1268            else
1269                toread = data_frame->df_size - data_frame->df_read_off;
1270
1271            if (toread || data_frame->df_fin)
1272            {
1273                nread = readf(ctx, data_frame->df_data + data_frame->df_read_off,
1274                                                     toread, data_frame->df_fin);
1275                if (do_filtering && stream->sm_sfi)
1276                    stream->sm_sfi->sfi_decr_left(stream, nread);
1277                data_frame->df_read_off += nread;
1278                stream->read_offset += nread;
1279                total_nread += nread;
1280                short_read = nread < toread;
1281            }
1282            else
1283                short_read = 0;
1284
1285            if (data_frame->df_read_off == data_frame->df_size)
1286            {
1287                const int fin = data_frame->df_fin;
1288                stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
1289                data_frame = NULL;
1290                if ((stream->sm_bflags & SMBF_AUTOSWITCH) &&
1291                        (stream->data_in->di_flags & DI_SWITCH_IMPL))
1292                {
1293                    stream->data_in = stream->data_in->di_if->di_switch_impl(
1294                                                stream->data_in, stream->read_offset);
1295                    if (!stream->data_in)
1296                    {
1297                        stream->data_in = data_in_error_new();
1298                        return (struct read_frames_status) { .error = 1, };
1299                    }
1300                }
1301                if (fin)
1302                {
1303                    stream->stream_flags |= STREAM_FIN_REACHED;
1304                    goto end_while;
1305                }
1306            }
1307            else if (short_read)
1308                goto end_while;
1309        }
1310        while (data_frame);
1311    }
1312  end_while:
1313
1314    if (processed_frames)
1315        stream_consumed_bytes(stream);
1316
1317    return (struct read_frames_status) {
1318        .error = 0,
1319        .processed_frames = processed_frames,
1320        .total_nread = total_nread,
1321    };
1322}
1323
1324
1325static ssize_t
1326stream_readf (struct lsquic_stream *stream,
1327        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1328{
1329    size_t total_nread, nread;
1330    int read_unc_headers;
1331
1332    total_nread = 0;
1333
1334    if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF))
1335                                            == (SMBF_USE_HEADERS|SMBF_IETF)
1336            && !(stream->stream_flags & STREAM_HAVE_UH)
1337            && !stream->uh)
1338    {
1339        if (stream->sm_readable(stream))
1340        {
1341            if (stream->sm_hq_filter.hqfi_flags & HQFI_FLAG_ERROR)
1342            {
1343                LSQ_INFO("HQ filter hit an error: cannot read from stream");
1344                errno = EBADMSG;
1345                return -1;
1346            }
1347            assert(stream->uh);
1348        }
1349        else
1350        {
1351            errno = EWOULDBLOCK;
1352            return -1;
1353        }
1354    }
1355
1356    if (stream->uh)
1357    {
1358        if (stream->uh->uh_flags & UH_H1H)
1359        {
1360            nread = read_uh(stream, readf, ctx);
1361            read_unc_headers = nread > 0;
1362            total_nread += nread;
1363            if (stream->uh)
1364                return total_nread;
1365        }
1366        else
1367        {
1368            LSQ_INFO("header set not claimed: cannot read from stream");
1369            return -1;
1370        }
1371    }
1372    else if ((stream->sm_bflags & SMBF_USE_HEADERS)
1373                                && !(stream->stream_flags & STREAM_HAVE_UH))
1374    {
1375        LSQ_DEBUG("cannot read: headers not available");
1376        errno = EWOULDBLOCK;
1377        return -1;
1378    }
1379    else
1380        read_unc_headers = 0;
1381
1382    const struct read_frames_status rfs
1383                        = read_data_frames(stream, 1, readf, ctx);
1384    if (rfs.error)
1385        return -1;
1386    total_nread += rfs.total_nread;
1387
1388    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
1389                                        total_nread, stream->read_offset);
1390
1391    if (rfs.processed_frames || read_unc_headers)
1392    {
1393        return total_nread;
1394    }
1395    else
1396    {
1397        assert(0 == total_nread);
1398        errno = EWOULDBLOCK;
1399        return -1;
1400    }
1401}
1402
1403
1404/* This function returns 0 when EOF is reached.
1405 */
1406ssize_t
1407lsquic_stream_readf (struct lsquic_stream *stream,
1408        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1409{
1410    SM_HISTORY_APPEND(stream, SHE_USER_READ);
1411
1412    if (lsquic_stream_is_reset(stream))
1413    {
1414        if (stream->stream_flags & STREAM_RST_RECVD)
1415            stream->stream_flags |= STREAM_RST_READ;
1416        errno = ECONNRESET;
1417        return -1;
1418    }
1419    if (stream->stream_flags & STREAM_U_READ_DONE)
1420    {
1421        errno = EBADF;
1422        return -1;
1423    }
1424    if ((stream->stream_flags & STREAM_FIN_REACHED)
1425            && 0 == (!!(stream->stream_flags & STREAM_HAVE_UH)
1426                   ^ !!(stream->sm_bflags & SMBF_USE_HEADERS)))
1427        return 0;
1428
1429    return stream_readf(stream, readf, ctx);
1430}
1431
1432
/* State used by readv_f() to scatter data into an array of iovecs */
struct readv_ctx
{
    const struct iovec        *iov;     /* iovec being filled */
    const struct iovec *const  end;     /* One past the last iovec */
    unsigned char             *p;       /* Next byte to write in `iov' */
};


/* Copy up to `len' bytes from `buf' into the iovecs described by the context,
 * moving on to the next non-empty iovec whenever the current one fills up.
 * When the last iovec is filled, `p' is set to NULL.  `fin' is not used.
 *
 * Returns the number of bytes copied.
 */
static size_t
readv_f (void *ctx_p, const unsigned char *buf, size_t len, int fin)
{
    struct readv_ctx *const rctx = ctx_p;
    unsigned char *iov_end;
    size_t ncopied, room, chunk;

    ncopied = 0;
    while (rctx->iov < rctx->end && ncopied < len)
    {
        iov_end = (unsigned char *) rctx->iov->iov_base + rctx->iov->iov_len;
        room = iov_end - rctx->p;
        chunk = len - ncopied;
        if (chunk > room)
            chunk = room;
        memcpy(rctx->p, buf + ncopied, chunk);
        rctx->p += chunk;
        ncopied += chunk;

        if (rctx->p == iov_end)
        {
            /* Current iovec is full: skip to the next non-empty one */
            for (++rctx->iov;
                    rctx->iov < rctx->end && rctx->iov->iov_len == 0;
                                                            ++rctx->iov)
                ;
            rctx->p = rctx->iov < rctx->end ? rctx->iov->iov_base : NULL;
        }
    }

    return ncopied;
}
1471
1472
1473ssize_t
1474lsquic_stream_readv (struct lsquic_stream *stream, const struct iovec *iov,
1475                     int iovcnt)
1476{
1477    struct readv_ctx ctx = { iov, iov + iovcnt, iov->iov_base, };
1478    return lsquic_stream_readf(stream, readv_f, &ctx);
1479}
1480
1481
1482ssize_t
1483lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
1484{
1485    struct iovec iov = { .iov_base = buf, .iov_len = len, };
1486    return lsquic_stream_readv(stream, &iov, 1);
1487}
1488
1489
1490static void
1491stream_shutdown_read (lsquic_stream_t *stream)
1492{
1493    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1494    {
1495        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
1496        stream->stream_flags |= STREAM_U_READ_DONE;
1497        stream_wantread(stream, 0);
1498        maybe_finish_stream(stream);
1499    }
1500}
1501
1502
1503static int
1504stream_is_incoming_unidir (const struct lsquic_stream *stream)
1505{
1506    enum stream_id_type sit;
1507
1508    if (stream->sm_bflags & SMBF_IETF)
1509    {
1510        sit = stream->id & SIT_MASK;
1511        if (stream->sm_bflags & SMBF_SERVER)
1512            return sit == SIT_UNI_CLIENT;
1513        else
1514            return sit == SIT_UNI_SERVER;
1515    }
1516    else
1517        return 0;
1518}
1519
1520
1521static void
1522stream_shutdown_write (lsquic_stream_t *stream)
1523{
1524    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1525        return;
1526
1527    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
1528    stream->stream_flags |= STREAM_U_WRITE_DONE;
1529    stream_wantwrite(stream, 0);
1530
1531    /* Don't bother to check whether there is anything else to write if
1532     * the flags indicate that nothing else should be written.
1533     */
1534    if (!(stream->sm_bflags & SMBF_CRYPTO)
1535            && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT))
1536                && !stream_is_incoming_unidir(stream)
1537                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1538    {
1539        if ((stream->sm_bflags & SMBF_USE_HEADERS)
1540                && !(stream->stream_flags & STREAM_HEADERS_SENT))
1541        {
1542            LSQ_DEBUG("headers not sent, send a reset");
1543            lsquic_stream_reset(stream, 0);
1544        }
1545        else if (stream->sm_n_buffered == 0)
1546        {
1547            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
1548                                                 stream))
1549            {
1550                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
1551                stream->stream_flags |= STREAM_FIN_SENT;
1552            }
1553            else
1554            {
1555                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
1556                          "flag in it");
1557                (void) stream_flush_nocheck(stream);
1558            }
1559        }
1560        else
1561            (void) stream_flush_nocheck(stream);
1562    }
1563}
1564
1565
1566static void
1567maybe_stream_shutdown_write (struct lsquic_stream *stream)
1568{
1569    if (stream->sm_send_headers_state == SSHS_BEGIN)
1570        stream_shutdown_write(stream);
1571    else if (0 == (stream->stream_flags & STREAM_DELAYED_SW))
1572    {
1573        LSQ_DEBUG("shutdown delayed");
1574        SM_HISTORY_APPEND(stream, SHE_DELAY_SW);
1575        stream->stream_flags |= STREAM_DELAYED_SW;
1576    }
1577}
1578
1579
1580int
1581lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
1582{
1583    LSQ_DEBUG("shutdown; how: %d", how);
1584    if (lsquic_stream_is_closed(stream))
1585    {
1586        LSQ_INFO("Attempt to shut down a closed stream");
1587        errno = EBADF;
1588        return -1;
1589    }
1590    /* 0: read, 1: write: 2: read and write
1591     */
1592    if (how < 0 || how > 2)
1593    {
1594        errno = EINVAL;
1595        return -1;
1596    }
1597
1598    if (how)
1599        maybe_stream_shutdown_write(stream);
1600    if (how != 1)
1601        stream_shutdown_read(stream);
1602
1603    maybe_finish_stream(stream);
1604    maybe_schedule_call_on_close(stream);
1605    if (how && !(stream->stream_flags & STREAM_DELAYED_SW))
1606        maybe_conn_to_tickable_if_writeable(stream, 1);
1607
1608    return 0;
1609}
1610
1611
1612void
1613lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
1614{
1615    LSQ_DEBUG("internal shutdown");
1616    if (lsquic_stream_is_critical(stream))
1617    {
1618        LSQ_DEBUG("add flag to force-finish special stream");
1619        stream->stream_flags |= STREAM_FORCE_FINISH;
1620        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
1621    }
1622    maybe_finish_stream(stream);
1623    maybe_schedule_call_on_close(stream);
1624}
1625
1626
1627static void
1628fake_reset_unused_stream (lsquic_stream_t *stream)
1629{
1630    stream->stream_flags |=
1631        STREAM_RST_RECVD    /* User will pick this up on read or write */
1632      | STREAM_RST_SENT     /* Don't send anything else on this stream */
1633    ;
1634
1635    /* Cancel all writes to the network scheduled for this stream: */
1636    if (stream->sm_qflags & SMQF_SENDING_FLAGS)
1637        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
1638                                                next_send_stream);
1639    stream->sm_qflags &= ~SMQF_SENDING_FLAGS;
1640    drop_buffered_data(stream);
1641    LSQ_DEBUG("fake-reset stream%s",
1642                    stream_stalled(stream) ? " (stalled)" : "");
1643    maybe_finish_stream(stream);
1644    maybe_schedule_call_on_close(stream);
1645}
1646
1647
1648/* This function should only be called for locally-initiated streams whose ID
1649 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
1650 * frame sent by peer but we have not yet received it and created a stream.
1651 * In this situation, we mark the stream as reset, so that user's on_read or
1652 * on_write event callback picks up the error.  That, in turn, should result
1653 * in stream being closed.
1654 *
1655 * If we have received any data frames on this stream, this probably indicates
1656 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
1657 * lower than this.  However, we still try to handle it gracefully and peform
1658 * a shutdown, as if the stream was not reset.
1659 */
1660void
1661lsquic_stream_received_goaway (lsquic_stream_t *stream)
1662{
1663    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
1664    if (0 == stream->read_offset &&
1665                            stream->data_in->di_if->di_empty(stream->data_in))
1666        fake_reset_unused_stream(stream);       /* Normal condition */
1667    else
1668    {   /* This is odd, let's handle it the best we can: */
1669        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
1670        lsquic_stream_shutdown_internal(stream);
1671    }
1672}
1673
1674
1675uint64_t
1676lsquic_stream_read_offset (const lsquic_stream_t *stream)
1677{
1678    return stream->read_offset;
1679}
1680
1681
1682static int
1683stream_wantread (lsquic_stream_t *stream, int is_want)
1684{
1685    const int old_val = !!(stream->sm_qflags & SMQF_WANT_READ);
1686    const int new_val = !!is_want;
1687    if (old_val != new_val)
1688    {
1689        if (new_val)
1690        {
1691            if (!old_val)
1692                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
1693                                                            next_read_stream);
1694            stream->sm_qflags |= SMQF_WANT_READ;
1695        }
1696        else
1697        {
1698            stream->sm_qflags &= ~SMQF_WANT_READ;
1699            if (old_val)
1700                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
1701                                                            next_read_stream);
1702        }
1703    }
1704    return old_val;
1705}
1706
1707
1708static void
1709maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_q_flags flag)
1710{
1711    assert(SMQF_WRITE_Q_FLAGS & flag);
1712    if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS))
1713        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1714                                                        next_write_stream);
1715    stream->sm_qflags |= flag;
1716}
1717
1718
1719static void
1720maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag)
1721{
1722    assert(SMQF_WRITE_Q_FLAGS & flag);
1723    if (stream->sm_qflags & flag)
1724    {
1725        stream->sm_qflags &= ~flag;
1726        if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS))
1727            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1728                                                        next_write_stream);
1729    }
1730}
1731
1732
1733static int
1734stream_wantwrite (struct lsquic_stream *stream, int new_val)
1735{
1736    const int old_val = !!(stream->sm_qflags & SMQF_WANT_WRITE);
1737
1738    assert(0 == (new_val & ~1));    /* new_val is either 0 or 1 */
1739
1740    if (old_val != new_val)
1741    {
1742        if (new_val)
1743            maybe_put_onto_write_q(stream, SMQF_WANT_WRITE);
1744        else
1745            maybe_remove_from_write_q(stream, SMQF_WANT_WRITE);
1746    }
1747    return old_val;
1748}
1749
1750
1751int
1752lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1753{
1754    SM_HISTORY_APPEND(stream, SHE_WANTREAD_NO + !!is_want);
1755    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1756    {
1757        if (is_want)
1758            maybe_conn_to_tickable_if_readable(stream);
1759        return stream_wantread(stream, is_want);
1760    }
1761    else
1762    {
1763        errno = EBADF;
1764        return -1;
1765    }
1766}
1767
1768
1769int
1770lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1771{
1772    int old_val;
1773
1774    is_want = !!is_want;
1775
1776    SM_HISTORY_APPEND(stream, SHE_WANTWRITE_NO + is_want);
1777    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)
1778                            && SSHS_BEGIN == stream->sm_send_headers_state)
1779    {
1780        stream->sm_saved_want_write = is_want;
1781        if (is_want)
1782            maybe_conn_to_tickable_if_writeable(stream, 1);
1783        return stream_wantwrite(stream, is_want);
1784    }
1785    else if (SSHS_BEGIN != stream->sm_send_headers_state)
1786    {
1787        old_val = stream->sm_saved_want_write;
1788        stream->sm_saved_want_write = is_want;
1789        return old_val;
1790    }
1791    else
1792    {
1793        errno = EBADF;
1794        return -1;
1795    }
1796}
1797
1798
1799struct progress
1800{
1801    enum stream_flags   s_flags;
1802    enum stream_q_flags q_flags;
1803};
1804
1805
1806static struct progress
1807stream_progress (const struct lsquic_stream *stream)
1808{
1809    return (struct progress) {
1810        .s_flags = stream->stream_flags
1811          & (STREAM_U_WRITE_DONE|STREAM_U_READ_DONE),
1812        .q_flags = stream->sm_qflags
1813          & (SMQF_WANT_READ|SMQF_WANT_WRITE|SMQF_WANT_FLUSH|SMQF_SEND_RST),
1814    };
1815}
1816
1817
1818static int
1819progress_eq (struct progress a, struct progress b)
1820{
1821    return a.s_flags == b.s_flags && a.q_flags == b.q_flags;
1822}
1823
1824
1825static void
1826stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1827{
1828    unsigned no_progress_count, no_progress_limit;
1829    struct progress progress;
1830    uint64_t size;
1831
1832    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1833
1834    no_progress_count = 0;
1835    while ((stream->sm_qflags & SMQF_WANT_READ)
1836                                            && lsquic_stream_readable(stream))
1837    {
1838        progress = stream_progress(stream);
1839        size  = stream->read_offset;
1840
1841        stream->stream_if->on_read(stream, stream->st_ctx);
1842
1843        if (no_progress_limit && size == stream->read_offset &&
1844                                progress_eq(progress, stream_progress(stream)))
1845        {
1846            ++no_progress_count;
1847            if (no_progress_count >= no_progress_limit)
1848            {
1849                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1850                    "progress) in user code reading from stream",
1851                    no_progress_count,
1852                    no_progress_count == 1 ? "" : "s");
1853                break;
1854            }
1855        }
1856        else
1857            no_progress_count = 0;
1858    }
1859}
1860
1861
1862static void
1863stream_hblock_sent (struct lsquic_stream *stream)
1864{
1865    int want_write;
1866
1867    LSQ_DEBUG("header block has been sent: restore default behavior");
1868    stream->sm_send_headers_state = SSHS_BEGIN;
1869    stream->sm_write_avail = stream_write_avail_with_frames;
1870
1871    want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
1872    if (want_write != stream->sm_saved_want_write)
1873        (void) lsquic_stream_wantwrite(stream, stream->sm_saved_want_write);
1874
1875    if (stream->stream_flags & STREAM_DELAYED_SW)
1876    {
1877        LSQ_DEBUG("performing delayed shutdown write");
1878        stream->stream_flags &= ~STREAM_DELAYED_SW;
1879        stream_shutdown_write(stream);
1880        maybe_schedule_call_on_close(stream);
1881        maybe_finish_stream(stream);
1882        maybe_conn_to_tickable_if_writeable(stream, 1);
1883    }
1884}
1885
1886
1887static void
1888on_write_header_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
1889{
1890    ssize_t nw;
1891
1892    nw = stream_write_buf(stream,
1893                stream->sm_header_block + stream->sm_hblock_off,
1894                stream->sm_hblock_sz - stream->sm_hblock_off);
1895    if (nw > 0)
1896    {
1897        stream->sm_hblock_off += nw;
1898        if (stream->sm_hblock_off == stream->sm_hblock_sz)
1899        {
1900            stream->stream_flags |= STREAM_HEADERS_SENT;
1901            free(stream->sm_header_block);
1902            stream->sm_header_block = NULL;
1903            stream->sm_hblock_sz = 0;
1904            stream_hblock_sent(stream);
1905            LSQ_DEBUG("header block written out successfully");
1906            /* TODO: if there was eos, do something else */
1907            if (stream->sm_qflags & SMQF_WANT_WRITE)
1908                stream->stream_if->on_write(stream, h);
1909        }
1910        else
1911        {
1912            LSQ_DEBUG("wrote %zd bytes more of header block; not done yet",
1913                nw);
1914        }
1915    }
1916    else if (nw < 0)
1917    {
1918        /* XXX What should happen if we hit an error? TODO */
1919    }
1920}
1921
1922
1923static void
1924(*select_on_write (struct lsquic_stream *stream))(struct lsquic_stream *,
1925                                                        lsquic_stream_ctx_t *)
1926{
1927    if (0 == (stream->stream_flags & STREAM_PUSHING)
1928                    && SSHS_HBLOCK_SENDING != stream->sm_send_headers_state)
1929        /* Common case */
1930        return stream->stream_if->on_write;
1931    else if (SSHS_HBLOCK_SENDING == stream->sm_send_headers_state)
1932        return on_write_header_wrapper;
1933    else
1934    {
1935        assert(stream->stream_flags & STREAM_PUSHING);
1936        if (stream_is_pushing_promise(stream))
1937            return on_write_pp_wrapper;
1938        else if (stream->sm_dup_push_off < stream->sm_dup_push_len)
1939            return on_write_dp_wrapper;
1940        else
1941            return stream->stream_if->on_write;
1942    }
1943}
1944
1945
1946static void
1947stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1948{
1949    unsigned no_progress_count, no_progress_limit;
1950    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
1951    struct progress progress;
1952
1953    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1954
1955    no_progress_count = 0;
1956    stream->stream_flags |= STREAM_LAST_WRITE_OK;
1957    while ((stream->sm_qflags & SMQF_WANT_WRITE)
1958                && (stream->stream_flags & STREAM_LAST_WRITE_OK)
1959                       && lsquic_stream_write_avail(stream))
1960    {
1961        progress = stream_progress(stream);
1962
1963        on_write = select_on_write(stream);
1964        on_write(stream, stream->st_ctx);
1965
1966        if (no_progress_limit && progress_eq(progress, stream_progress(stream)))
1967        {
1968            ++no_progress_count;
1969            if (no_progress_count >= no_progress_limit)
1970            {
1971                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1972                    "progress) in user code writing to stream",
1973                    no_progress_count,
1974                    no_progress_count == 1 ? "" : "s");
1975                break;
1976            }
1977        }
1978        else
1979            no_progress_count = 0;
1980    }
1981}
1982
1983
1984static void
1985stream_dispatch_read_events_once (lsquic_stream_t *stream)
1986{
1987    if ((stream->sm_qflags & SMQF_WANT_READ) && lsquic_stream_readable(stream))
1988    {
1989        stream->stream_if->on_read(stream, stream->st_ctx);
1990    }
1991}
1992
1993
1994uint64_t
1995lsquic_stream_combined_send_off (const struct lsquic_stream *stream)
1996{
1997    size_t frames_sizes;
1998
1999    frames_sizes = active_hq_frame_sizes(stream);
2000    return stream->tosend_off + stream->sm_n_buffered + frames_sizes;
2001}
2002
2003
2004static void
2005maybe_mark_as_blocked (lsquic_stream_t *stream)
2006{
2007    struct lsquic_conn_cap *cc;
2008    uint64_t used;
2009
2010    used = lsquic_stream_combined_send_off(stream);
2011    if (stream->max_send_off == used)
2012    {
2013        if (stream->blocked_off < stream->max_send_off)
2014        {
2015            stream->blocked_off = used;
2016            if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
2017                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
2018                                                            next_send_stream);
2019            stream->sm_qflags |= SMQF_SEND_BLOCKED;
2020            LSQ_DEBUG("marked stream-blocked at stream offset "
2021                                            "%"PRIu64, stream->blocked_off);
2022        }
2023        else
2024            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
2025                " has been, or is about to be, sent", stream->blocked_off);
2026    }
2027
2028    if ((stream->sm_bflags & SMBF_CONN_LIMITED)
2029        && (cc = &stream->conn_pub->conn_cap,
2030                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
2031    {
2032        if (cc->cc_blocked < cc->cc_max)
2033        {
2034            cc->cc_blocked = cc->cc_max;
2035            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
2036            LSQ_DEBUG("marked connection-blocked at connection offset "
2037                                                    "%"PRIu64, cc->cc_max);
2038        }
2039        else
2040            LSQ_DEBUG("stream has already been marked connection-blocked "
2041                "at offset %"PRIu64, cc->cc_blocked);
2042    }
2043}
2044
2045
2046void
2047lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
2048{
2049    assert(stream->sm_qflags & SMQF_WANT_READ);
2050
2051    if (stream->sm_bflags & SMBF_RW_ONCE)
2052        stream_dispatch_read_events_once(stream);
2053    else
2054        stream_dispatch_read_events_loop(stream);
2055}
2056
2057
2058void
2059lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
2060{
2061    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
2062    int progress;
2063    uint64_t tosend_off;
2064    unsigned short n_buffered;
2065    enum stream_q_flags q_flags;
2066
2067    assert(stream->sm_qflags & SMQF_WRITE_Q_FLAGS);
2068    q_flags = stream->sm_qflags & SMQF_WRITE_Q_FLAGS;
2069    tosend_off = stream->tosend_off;
2070    n_buffered = stream->sm_n_buffered;
2071
2072    if (stream->sm_qflags & SMQF_WANT_FLUSH)
2073        (void) stream_flush(stream);
2074
2075    if (stream->sm_bflags & SMBF_RW_ONCE)
2076    {
2077        if ((stream->sm_qflags & SMQF_WANT_WRITE)
2078            && lsquic_stream_write_avail(stream))
2079        {
2080            on_write = select_on_write(stream);
2081            on_write(stream, stream->st_ctx);
2082        }
2083    }
2084    else
2085        stream_dispatch_write_events_loop(stream);
2086
2087    /* Progress means either flags or offsets changed: */
2088    progress = !((stream->sm_qflags & SMQF_WRITE_Q_FLAGS) == q_flags &&
2089                        stream->tosend_off == tosend_off &&
2090                            stream->sm_n_buffered == n_buffered);
2091
2092    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
2093    {
2094        if (progress)
2095        {   /* Move the stream to the end of the list to ensure fairness. */
2096            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
2097                                                            next_write_stream);
2098            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
2099                                                            next_write_stream);
2100        }
2101    }
2102}
2103
2104
/* Size callback of the empty reader: there is never anything to read. */
static size_t
inner_reader_empty_size (void *ctx)
{
    (void) ctx;     /* The empty reader carries no state */
    return (size_t) 0;
}
2110
2111
/* Read callback of the empty reader: copies nothing, reports zero bytes. */
static size_t
inner_reader_empty_read (void *ctx, void *buf, size_t count)
{
    (void) ctx;
    (void) buf;
    (void) count;
    return (size_t) 0;
}
2117
2118
2119static int
2120stream_flush (lsquic_stream_t *stream)
2121{
2122    struct lsquic_reader empty_reader;
2123    ssize_t nw;
2124
2125    assert(stream->sm_qflags & SMQF_WANT_FLUSH);
2126    assert(stream->sm_n_buffered > 0 ||
2127        /* Flushing is also used to packetize standalone FIN: */
2128        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
2129                                                    == STREAM_U_WRITE_DONE));
2130
2131    empty_reader.lsqr_size = inner_reader_empty_size;
2132    empty_reader.lsqr_read = inner_reader_empty_read;
2133    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
2134    nw = stream_write_to_packets(stream, &empty_reader, 0);
2135
2136    if (nw >= 0)
2137    {
2138        assert(nw == 0);    /* Empty reader: must have read zero bytes */
2139        return 0;
2140    }
2141    else
2142        return -1;
2143}
2144
2145
2146static int
2147stream_flush_nocheck (lsquic_stream_t *stream)
2148{
2149    size_t frames;
2150
2151    frames = active_hq_frame_sizes(stream);
2152    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered + frames;
2153    stream->sm_flush_to_payload = stream->sm_payload + stream->sm_n_buffered;
2154    maybe_put_onto_write_q(stream, SMQF_WANT_FLUSH);
2155    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
2156
2157    return stream_flush(stream);
2158}
2159
2160
2161int
2162lsquic_stream_flush (lsquic_stream_t *stream)
2163{
2164    if (stream->stream_flags & STREAM_U_WRITE_DONE)
2165    {
2166        LSQ_DEBUG("cannot flush closed stream");
2167        errno = EBADF;
2168        return -1;
2169    }
2170
2171    if (0 == stream->sm_n_buffered)
2172    {
2173        LSQ_DEBUG("flushing 0 bytes: noop");
2174        return 0;
2175    }
2176
2177    return stream_flush_nocheck(stream);
2178}
2179
2180
2181static size_t
2182stream_get_n_allowed (const struct lsquic_stream *stream)
2183{
2184    if (stream->sm_n_allocated)
2185        return stream->sm_n_allocated;
2186    else
2187        return stream->conn_pub->path->np_pack_size;
2188}
2189
2190
2191/* The flush threshold is the maximum size of stream data that can be sent
2192 * in a full packet.
2193 */
2194#ifdef NDEBUG
2195static
2196#endif
2197       size_t
2198lsquic_stream_flush_threshold (const struct lsquic_stream *stream,
2199                                                            unsigned data_sz)
2200{
2201    enum packet_out_flags flags;
2202    enum packno_bits bits;
2203    size_t packet_header_sz, stream_header_sz, tag_len;
2204    size_t threshold;
2205
2206    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
2207    flags = bits << POBIT_SHIFT;
2208    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
2209        flags |= PO_CONN_ID;
2210    if (stream_is_hsk(stream))
2211        flags |= PO_LONGHEAD;
2212
2213    packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags,
2214                                        stream->conn_pub->path->np_dcid.len);
2215    stream_header_sz = stream->sm_frame_header_sz(stream, data_sz);
2216    tag_len = stream->conn_pub->lconn->cn_esf_c->esf_tag_len;
2217
2218    threshold = stream_get_n_allowed(stream) - tag_len
2219              - packet_header_sz - stream_header_sz;
2220    return threshold;
2221}
2222
2223
/* Checks performed before writing to a stream.  The macro expects a
 * variable named `stream' to be in scope and may RETURN FROM THE ENCLOSING
 * FUNCTION:
 *
 *  - returns 0 if the stream uses headers, the headers have not been sent
 *    and the send-headers state is past SSHS_BEGIN (still sending);
 *  - returns -1 with errno set to EILSEQ if the stream uses headers and
 *    sending them has not begun;
 *  - returns -1 with errno set to ECONNRESET if the stream has been reset;
 *  - returns -1 with errno set to EBADF if the stream has been closed for
 *    writing or FIN has been sent.
 *
 * If none of the conditions holds, execution continues past the macro.
 */
#define COMMON_WRITE_CHECKS() do {                                          \
    if ((stream->sm_bflags & SMBF_USE_HEADERS)                              \
            && !(stream->stream_flags & STREAM_HEADERS_SENT))               \
    {                                                                       \
        if (SSHS_BEGIN != stream->sm_send_headers_state)                    \
        {                                                                   \
            LSQ_DEBUG("still sending headers: no writing allowed");         \
            return 0;                                                       \
        }                                                                   \
        else                                                                \
        {                                                                   \
            LSQ_INFO("Attempt to write to stream before sending HTTP "      \
                                                                "headers"); \
            errno = EILSEQ;                                                 \
            return -1;                                                      \
        }                                                                   \
    }                                                                       \
    if (lsquic_stream_is_reset(stream))                                     \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it had been reset");     \
        errno = ECONNRESET;                                                 \
        return -1;                                                          \
    }                                                                       \
    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it was closed for "      \
                                                                "writing"); \
        errno = EBADF;                                                      \
        return -1;                                                          \
    }                                                                       \
} while (0)
2255
2256
/* Context handed to the frame generation callbacks (the *_gen_size,
 * *_gen_fin and *_gen_read functions).
 */
struct frame_gen_ctx
{
    /* Stream whose data is being packetized */
    lsquic_stream_t      *fgc_stream;
    /* Source of new data, queried via its lsqr_size and lsqr_read */
    struct lsquic_reader *fgc_reader;
    /* We keep our own count of how many bytes were read from reader because
     * some readers are external.  The external caller does not have to rely
     * on our count, but it can.
     */
    size_t                fgc_nread_from_reader;
    /* Size callback; presumably one of the *_gen_size functions -- it is
     * assigned elsewhere in the file.
     */
    size_t              (*fgc_size) (void *ctx);
    /* Called with the context to decide whether FIN is to be set */
    int                 (*fgc_fin) (void *ctx);
    /* Read callback passed on to the stream frame generator */
    gsf_read_f            fgc_read;
};
2270
2271
2272static size_t
2273frame_std_gen_size (void *ctx)
2274{
2275    struct frame_gen_ctx *fg_ctx = ctx;
2276    size_t available, remaining;
2277
2278    /* Make sure we are not writing past available size: */
2279    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2280    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
2281    if (available < remaining)
2282        remaining = available;
2283
2284    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
2285}
2286
2287
2288static size_t
2289stream_hq_frame_size (const struct stream_hq_frame *shf)
2290{
2291    if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
2292        return 1 + 1 + ((shf->shf_flags & SHF_TWO_BYTES) > 0);
2293    else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) == SHF_FIXED_SIZE)
2294        return 1 + (1 << vint_val2bits(shf->shf_frame_size));
2295    else
2296    {
2297        assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
2298                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
2299        return 0;
2300    }
2301}
2302
2303
2304static size_t
2305active_hq_frame_sizes (const struct lsquic_stream *stream)
2306{
2307    const struct stream_hq_frame *shf;
2308    size_t size;
2309
2310    size = 0;
2311    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2312                                        == (SMBF_IETF|SMBF_USE_HEADERS))
2313        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2314            if (!(shf->shf_flags & SHF_WRITTEN))
2315                size += stream_hq_frame_size(shf);
2316
2317    return size;
2318}
2319
2320
2321static uint64_t
2322stream_hq_frame_end (const struct stream_hq_frame *shf)
2323{
2324    if (shf->shf_flags & SHF_FIXED_SIZE)
2325        return shf->shf_off + shf->shf_frame_size;
2326    else if (shf->shf_flags & SHF_TWO_BYTES)
2327        return shf->shf_off + ((1 << 14) - 1);
2328    else
2329        return shf->shf_off + ((1 << 6) - 1);
2330}
2331
2332
2333static int
2334frame_in_stream (const struct lsquic_stream *stream,
2335                                            const struct stream_hq_frame *shf)
2336{
2337    return shf >= stream->sm_hq_frame_arr
2338        && shf < stream->sm_hq_frame_arr + sizeof(stream->sm_hq_frame_arr)
2339                                        / sizeof(stream->sm_hq_frame_arr[0])
2340        ;
2341}
2342
2343
2344static void
2345stream_hq_frame_put (struct lsquic_stream *stream,
2346                                                struct stream_hq_frame *shf)
2347{
2348    assert(STAILQ_FIRST(&stream->sm_hq_frames) == shf);
2349    STAILQ_REMOVE_HEAD(&stream->sm_hq_frames, shf_next);
2350    if (frame_in_stream(stream, shf))
2351        memset(shf, 0, sizeof(*shf));
2352    else
2353        lsquic_malo_put(shf);
2354}
2355
2356
2357static void
2358stream_hq_frame_close (struct lsquic_stream *stream,
2359                                                struct stream_hq_frame *shf)
2360{
2361    unsigned bits;
2362
2363    LSQ_DEBUG("close HQ frame of type 0x%X at payload offset %"PRIu64
2364        " (actual offset %"PRIu64")", shf->shf_frame_type,
2365        stream->sm_payload, stream->tosend_off);
2366    assert(shf->shf_flags & SHF_ACTIVE);
2367    if (!(shf->shf_flags & SHF_FIXED_SIZE))
2368    {
2369        shf->shf_frame_ptr[0] = shf->shf_frame_type;
2370        bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
2371        vint_write(shf->shf_frame_ptr + 1, stream->sm_payload - shf->shf_off,
2372                                                            bits, 1 << bits);
2373    }
2374    stream_hq_frame_put(stream, shf);
2375}
2376
2377
2378static size_t
2379frame_hq_gen_size (void *ctx)
2380{
2381    struct frame_gen_ctx *fg_ctx = ctx;
2382    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2383    size_t available, remaining, frames;
2384    const struct stream_hq_frame *shf;
2385
2386    frames = 0;
2387    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2388        if (shf->shf_off >= stream->sm_payload)
2389            frames += stream_hq_frame_size(shf);
2390
2391    /* Make sure we are not writing past available size: */
2392    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2393    available = lsquic_stream_write_avail(stream);
2394    if (available < remaining)
2395        remaining = available;
2396
2397    return remaining + stream->sm_n_buffered + frames;
2398}
2399
2400
2401static int
2402frame_std_gen_fin (void *ctx)
2403{
2404    struct frame_gen_ctx *fg_ctx = ctx;
2405    return !(fg_ctx->fgc_stream->sm_bflags & SMBF_CRYPTO)
2406        && (fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE)
2407        && 0 == fg_ctx->fgc_stream->sm_n_buffered
2408        /* Do not use frame_std_gen_size() as it may chop the real size: */
2409        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2410}
2411
2412
2413static void
2414incr_conn_cap (struct lsquic_stream *stream, size_t incr)
2415{
2416    if (stream->sm_bflags & SMBF_CONN_LIMITED)
2417    {
2418        stream->conn_pub->conn_cap.cc_sent += incr;
2419        assert(stream->conn_pub->conn_cap.cc_sent
2420                                    <= stream->conn_pub->conn_cap.cc_max);
2421    }
2422}
2423
2424
2425void
2426incr_sm_payload (struct lsquic_stream *stream, size_t incr)
2427{
2428    stream->sm_payload += incr;
2429    stream->tosend_off += incr;
2430    assert(stream->tosend_off <= stream->max_send_off);
2431}
2432
2433
/* Fill up to `len' bytes of `begin_buf' with stream data: first with what
 * is in the stream buffer (sm_buf), then with data pulled from the reader,
 * the latter capped by lsquic_stream_write_avail().
 *
 * `*fin' is set to the result of the context's fgc_fin callback.  The
 * stream's payload and send offsets are advanced by the number of bytes
 * produced, which is also the return value.
 */
static size_t
frame_std_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    unsigned char *p = begin_buf;
    unsigned char *const end = p + len;
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    size_t n_written, available, n_to_write;

    if (stream->sm_n_buffered > 0)
    {
        if (len <= stream->sm_n_buffered)
        {
            /* The request is satisfied from the stream buffer alone: copy
             * out the head and shift the remainder to the front.
             */
            memcpy(p, stream->sm_buf, len);
            memmove(stream->sm_buf, stream->sm_buf + len,
                                                stream->sm_n_buffered - len);
            stream->sm_n_buffered -= len;
            if (0 == stream->sm_n_buffered)
                maybe_resize_stream_buffer(stream);
            assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
            incr_sm_payload(stream, len);
            *fin = fg_ctx->fgc_fin(fg_ctx);
            return len;
        }
        /* Drain the whole stream buffer, then continue with the reader */
        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
        p += stream->sm_n_buffered;
        stream->sm_n_buffered = 0;
        maybe_resize_stream_buffer(stream);
    }

    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
    n_to_write = end - p;
    if (n_to_write > available)
        n_to_write = available;
    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
                                              n_to_write);
    p += n_written;
    fg_ctx->fgc_nread_from_reader += n_written;
    *fin = fg_ctx->fgc_fin(fg_ctx);
    incr_sm_payload(stream, p - (const unsigned char *) begin_buf);
    /* Only bytes that came from the reader are charged to the connection
     * cap here; NOTE(review): presumably buffered bytes were charged when
     * they were buffered -- confirm.
     */
    incr_conn_cap(stream, n_written);
    return p - (const unsigned char *) begin_buf;
}
2477
2478
2479static struct stream_hq_frame *
2480find_hq_frame (const struct lsquic_stream *stream, uint64_t off)
2481{
2482    struct stream_hq_frame *shf;
2483
2484    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2485        if (shf->shf_off <= off && stream_hq_frame_end(shf) > off)
2486            return shf;
2487
2488    return NULL;
2489}
2490
2491
2492static struct stream_hq_frame *
2493find_cur_hq_frame (const struct lsquic_stream *stream)
2494{
2495    return find_hq_frame(stream, stream->sm_payload);
2496}
2497
2498
2499static struct stream_hq_frame *
2500open_hq_frame (struct lsquic_stream *stream)
2501{
2502    struct stream_hq_frame *shf;
2503
2504    for (shf = stream->sm_hq_frame_arr; shf < stream->sm_hq_frame_arr
2505            + sizeof(stream->sm_hq_frame_arr)
2506                / sizeof(stream->sm_hq_frame_arr[0]); ++shf)
2507        if (!(shf->shf_flags & SHF_ACTIVE))
2508            goto found;
2509
2510    shf = lsquic_malo_get(stream->conn_pub->mm->malo.stream_hq_frame);
2511    if (!shf)
2512    {
2513        LSQ_WARN("cannot allocate HQ frame");
2514        return NULL;
2515    }
2516    memset(shf, 0, sizeof(*shf));
2517
2518  found:
2519    STAILQ_INSERT_TAIL(&stream->sm_hq_frames, shf, shf_next);
2520    shf->shf_flags = SHF_ACTIVE;
2521    return shf;
2522}
2523
2524
2525/* Returns index of the new frame */
2526static struct stream_hq_frame *
2527stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off,
2528            enum hq_frame_type frame_type, enum shf_flags flags, size_t size)
2529{
2530    struct stream_hq_frame *shf;
2531
2532    shf = open_hq_frame(stream);
2533    if (!shf)
2534    {
2535        LSQ_WARN("could not open HQ frame");
2536        return NULL;
2537    }
2538
2539    shf->shf_off        = off;
2540    shf->shf_flags     |= flags;
2541    shf->shf_frame_type = frame_type;
2542    if (shf->shf_flags & SHF_FIXED_SIZE)
2543    {
2544        shf->shf_frame_size = size;
2545        LSQ_DEBUG("activated fixed-size HQ frame of type 0x%X at offset "
2546            "%"PRIu64", size %zu", shf->shf_frame_type, shf->shf_off, size);
2547    }
2548    else
2549    {
2550        shf->shf_frame_ptr  = NULL;
2551        if (size >= (1 << 6))
2552            shf->shf_flags |= SHF_TWO_BYTES;
2553        LSQ_DEBUG("activated variable-size HQ frame of type 0x%X at offset "
2554            "%"PRIu64, shf->shf_frame_type, shf->shf_off);
2555    }
2556
2557    return shf;
2558}
2559
2560
/* Fill up to `len' bytes of `begin_buf' with stream data interleaved with
 * HQ frame headers.  Each loop iteration does one of two things for the
 * HQ frame that covers the current payload offset:
 *
 *  - if the frame begins at the current payload offset and its header has
 *    not been written yet, the header is written (or, for a variable-size
 *    frame, space for it is reserved to be filled in when the frame is
 *    closed);
 *  - otherwise, frame payload is copied using frame_std_gen_read(), up to
 *    the end of the frame, and the frame is closed once its end is reached.
 *
 * If no frame covers the current payload offset, a variable-size DATA
 * frame is activated, provided there is something to send.
 *
 * Returns the number of bytes placed into `begin_buf'.  `*fin' is only
 * set by the calls to frame_std_gen_read().
 */
static size_t
frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    unsigned char *p = begin_buf;
    unsigned char *const end = p + len;
    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
    struct stream_hq_frame *shf;
    size_t nw, frame_sz, avail, rem;
    unsigned bits;

    while (p < end)
    {
        shf = find_cur_hq_frame(stream);
        if (shf)
            LSQ_DEBUG("found current HQ frame of type 0x%X at offset %"PRIu64,
                                            shf->shf_frame_type, shf->shf_off);
        else
        {
            rem = frame_std_gen_size(ctx);
            if (rem)
            {
                /* Cap at the largest length a two-byte varint can hold */
                if (rem > ((1 << 14) - 1))
                    rem = (1 << 14) - 1;
                shf = stream_activate_hq_frame(stream,
                                    stream->sm_payload, HQFT_DATA, 0, rem);
                if (!shf)
                {
                    /* TODO: abort connection?  Handle failure somehow */
                    break;
                }
            }
            else
                break;
        }
        avail = stream->sm_n_buffered + stream->sm_write_avail(stream);
        if (shf->shf_off == stream->sm_payload
                                        && !(shf->shf_flags & SHF_WRITTEN))
        {
            /* At the beginning of the frame: its header goes in first */
            frame_sz = stream_hq_frame_size(shf);
            if (frame_sz > (uintptr_t) (end - p))
            {
                /* No room for the header: release the frame and stop */
                stream_hq_frame_put(stream, shf);
                break;
            }
            LSQ_DEBUG("insert %zu-byte HQ frame of type 0x%X at payload "
                "offset %"PRIu64" (actual offset %"PRIu64")", frame_sz,
                shf->shf_frame_type, stream->sm_payload, stream->tosend_off);
            if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
            {
                /* Variable-size frame: reserve zeroed header space and
                 * remember where it is; stream_hq_frame_close() fills it in.
                 */
                shf->shf_frame_ptr = p;
                memset(p, 0, frame_sz);
                p += frame_sz;
            }
            else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
                                                            == SHF_FIXED_SIZE)
            {
                /* Fixed-size frame: the size is known, write header now */
                *p++ = shf->shf_frame_type;
                bits = vint_val2bits(shf->shf_frame_size);
                vint_write(p, shf->shf_frame_size, bits, 1 << bits);
                p += 1 << bits;
            }
            else
                /* Phantom frame: nothing is written (frame_sz is zero) */
                assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
            /* Header bytes are charged to the connection cap only once */
            if (!(shf->shf_flags & SHF_CC_PAID))
            {
                incr_conn_cap(stream, frame_sz);
                shf->shf_flags |= SHF_CC_PAID;
            }
            shf->shf_flags |= SHF_WRITTEN;
            /* Header bytes advance the send offset, but not sm_payload */
            stream->tosend_off += frame_sz;
            assert(stream->tosend_off <= stream->max_send_off);
        }
        else
        {
            /* Inside the frame: copy payload.  The amount is limited by
             * the end of the frame, the room left in the output buffer and
             * `avail'.  Note that the `len' parameter is reused here.
             */
            len = stream_hq_frame_end(shf) - stream->sm_payload;
            assert(len);
            if (len > (unsigned) (end - p))
                len = end - p;
            if (len > avail)
                len = avail;
            if (!len)
                break;
            nw = frame_std_gen_read(ctx, p, len, fin);
            p += nw;
            if (nw < len)
                break;
            if (stream_hq_frame_end(shf) == stream->sm_payload)
                stream_hq_frame_close(stream, shf);
        }
    }

    return p - (unsigned char *) begin_buf;
}
2656
2657
/* Three-argument variant of frame_std_gen_read(): the FIN indication is
 * discarded.
 */
static size_t
crypto_frame_gen_read (void *ctx, void *buf, size_t len)
{
    int unused_fin;
    size_t nread;

    nread = frame_std_gen_read(ctx, buf, len, &unused_fin);
    return nread;
}
2665
2666
2667static void
2668check_flush_threshold (lsquic_stream_t *stream)
2669{
2670    if ((stream->sm_qflags & SMQF_WANT_FLUSH) &&
2671                            stream->tosend_off >= stream->sm_flush_to)
2672    {
2673        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
2674                                                    stream->sm_flush_to);
2675        maybe_remove_from_write_q(stream, SMQF_WANT_FLUSH);
2676    }
2677}
2678
2679
/* Generate one STREAM frame into `packet_out' at its current data offset
 * using the connection's pf_gen_stream_frame function; frame contents are
 * supplied by the fgc_fin and fgc_read callbacks of `fg_ctx'.
 *
 * On success, the packet size is increased by the frame length, the STREAM
 * frame type is recorded in the packet, the stream is added to the packet
 * and the flush threshold is checked.
 *
 * Returns the length of the generated frame.  If the generator returns a
 * negative value, that value is returned as is; -1 is returned if the
 * stream could not be added to the packet.
 */
static int
write_stream_frame (struct frame_gen_ctx *fg_ctx, const size_t size,
                                        struct lsquic_packet_out *packet_out)
{
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
    unsigned off;
    int len, s;

#if LSQUIC_CONN_STATS
    const uint64_t begin_off = stream->tosend_off;
#endif
    /* Remember where the frame begins inside the packet */
    off = packet_out->po_data_sz;
    len = pf->pf_gen_stream_frame(
                packet_out->po_data + packet_out->po_data_sz,
                lsquic_packet_out_avail(packet_out), stream->id,
                stream->tosend_off,
                fg_ctx->fgc_fin(fg_ctx), size, fg_ctx->fgc_read, fg_ctx);
    if (len < 0)
        return len;

#if LSQUIC_CONN_STATS
    stream->conn_pub->conn_stats->out.stream_frames += 1;
    stream->conn_pub->conn_stats->out.stream_data_sz
                                            += stream->tosend_off - begin_off;
#endif
    /* The frame is logged before the packet size is increased */
    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
                            packet_out->po_data + packet_out->po_data_sz, len);
    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
    /* Packet is full: flag that it ends with a STREAM frame */
    if (0 == lsquic_packet_out_avail(packet_out))
        packet_out->po_flags |= PO_STREAM_END;
    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
                                     stream, QUIC_FRAME_STREAM, off, len);
    if (s != 0)
    {
        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
        return -1;
    }

    check_flush_threshold(stream);
    return len;
}
2724
2725
2726static enum swtp_status
2727stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size)
2728{
2729    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2730    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2731    struct lsquic_packet_out *packet_out;
2732    int len;
2733
2734    packet_out = lsquic_send_ctl_new_packet_out(send_ctl, 0, PNS_APP,
2735                                                    stream->conn_pub->path);
2736    if (!packet_out)
2737        return SWTP_STOP;
2738    packet_out->po_header_type = stream->tosend_off == 0
2739                                        ? HETY_INITIAL : HETY_HANDSHAKE;
2740
2741    len = write_stream_frame(fg_ctx, size, packet_out);
2742
2743    if (len > 0)
2744    {
2745        packet_out->po_flags |= PO_HELLO;
2746        lsquic_packet_out_zero_pad(packet_out);
2747        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
2748        return SWTP_OK;
2749    }
2750    else
2751        return SWTP_ERROR;
2752}
2753
2754
2755static enum swtp_status
2756stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size)
2757{
2758    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2759    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2760    unsigned stream_header_sz, need_at_least;
2761    struct lsquic_packet_out *packet_out;
2762    struct lsquic_stream *headers_stream;
2763    int len;
2764
2765    if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED))
2766                                                        == STREAM_HEADERS_SENT)
2767    {
2768        if (stream->sm_bflags & SMBF_IETF)
2769        {
2770            if (stream->stream_flags & STREAM_ENCODER_DEP)
2771                headers_stream = stream->conn_pub->u.ietf.qeh->qeh_enc_sm_out;
2772            else
2773                headers_stream = NULL;
2774        }
2775        else
2776            headers_stream =
2777                lsquic_headers_stream_get_stream(stream->conn_pub->u.gquic.hs);
2778        if (headers_stream && lsquic_stream_has_data_to_flush(headers_stream))
2779        {
2780            LSQ_DEBUG("flushing headers stream before packetizing stream data");
2781            (void) lsquic_stream_flush(headers_stream);
2782        }
2783        /* If there is nothing to flush, some other stream must have flushed it:
2784         * this means our headers are flushed.  Either way, only do this once.
2785         */
2786        stream->stream_flags |= STREAM_HDRS_FLUSHED;
2787    }
2788
2789    stream_header_sz = stream->sm_frame_header_sz(stream, size);
2790    need_at_least = stream_header_sz;
2791    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2792                                       == (SMBF_IETF|SMBF_USE_HEADERS))
2793    {
2794        if (size > 0)
2795            need_at_least += 3;     /* Enough room for HTTP/3 frame */
2796    }
2797    else
2798        need_at_least += size > 0;
2799  get_packet:
2800    packet_out = lsquic_send_ctl_get_packet_for_stream(send_ctl,
2801                                need_at_least, stream->conn_pub->path, stream);
2802    if (packet_out)
2803    {
2804        len = write_stream_frame(fg_ctx, size, packet_out);
2805        if (len > 0)
2806            return SWTP_OK;
2807        assert(len < 0);
2808        if (-len > (int) need_at_least)
2809        {
2810            LSQ_DEBUG("need more room (%d bytes) than initially calculated "
2811                "%u bytes, will try again", -len, need_at_least);
2812            need_at_least = -len;
2813            goto get_packet;
2814        }
2815        return SWTP_ERROR;
2816    }
2817    else
2818        return SWTP_STOP;
2819}
2820
2821
2822static enum swtp_status
2823stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size)
2824{
2825    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2826    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2827    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
2828    unsigned crypto_header_sz, need_at_least;
2829    struct lsquic_packet_out *packet_out;
2830    unsigned short off;
2831    const enum packnum_space pns = lsquic_enclev2pns[ crypto_level(stream) ];
2832    int len, s;
2833
2834    assert(size > 0);
2835    crypto_header_sz = stream->sm_frame_header_sz(stream, size);
2836    need_at_least = crypto_header_sz + 1;
2837
2838    packet_out = lsquic_send_ctl_get_packet_for_crypto(send_ctl,
2839                                    need_at_least, pns, stream->conn_pub->path);
2840    if (!packet_out)
2841        return SWTP_STOP;
2842
2843    off = packet_out->po_data_sz;
2844    len = pf->pf_gen_crypto_frame(packet_out->po_data + packet_out->po_data_sz,
2845                lsquic_packet_out_avail(packet_out), stream->tosend_off,
2846                size, crypto_frame_gen_read, fg_ctx);
2847    if (len < 0)
2848        return len;
2849
2850    EV_LOG_GENERATED_CRYPTO_FRAME(LSQUIC_LOG_CONN_ID, pf,
2851                            packet_out->po_data + packet_out->po_data_sz, len);
2852    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
2853    packet_out->po_frame_types |= 1 << QUIC_FRAME_CRYPTO;
2854    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
2855                                     stream, QUIC_FRAME_CRYPTO, off, len);
2856    if (s != 0)
2857    {
2858        LSQ_WARN("adding crypto stream to packet failed: %s", strerror(errno));
2859        return -1;
2860    }
2861
2862    packet_out->po_flags |= PO_HELLO;
2863
2864    check_flush_threshold(stream);
2865    return SWTP_OK;
2866}
2867
2868
2869static void
2870abort_connection (struct lsquic_stream *stream)
2871{
2872    if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
2873        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
2874                                                next_service_stream);
2875    stream->sm_qflags |= SMQF_ABORT_CONN;
2876    LSQ_WARN("connection will be aborted");
2877    maybe_conn_to_tickable(stream);
2878}
2879
2880
2881static void
2882maybe_close_varsize_hq_frame (struct lsquic_stream *stream)
2883{
2884    struct stream_hq_frame *shf;
2885    uint64_t size;
2886    unsigned bits;
2887
2888    shf = find_cur_hq_frame(stream);
2889    if (!shf)
2890        return;
2891
2892    if (shf->shf_flags & SHF_FIXED_SIZE)
2893    {
2894        if (shf->shf_off + shf->shf_frame_size <= stream->sm_payload)
2895            stream_hq_frame_put(stream, shf);
2896        return;
2897    }
2898
2899    bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
2900    size = stream->sm_payload + stream->sm_n_buffered - shf->shf_off;
2901    if (size && size <= VINT_MAX_B(bits) && shf->shf_frame_ptr)
2902    {
2903        if (0 == stream->sm_n_buffered)
2904            LSQ_DEBUG("close HQ frame type 0x%X of size %"PRIu64,
2905                                                shf->shf_frame_type, size);
2906        else
2907            LSQ_DEBUG("convert HQ frame type 0x%X of to fixed %"PRIu64,
2908                                                shf->shf_frame_type, size);
2909        shf->shf_frame_ptr[0] = shf->shf_frame_type;
2910        vint_write(shf->shf_frame_ptr + 1, size, bits, 1 << bits);
2911        if (0 == stream->sm_n_buffered)
2912            stream_hq_frame_put(stream, shf);
2913        else
2914        {
2915            shf->shf_frame_size = size;
2916            shf->shf_flags |= SHF_FIXED_SIZE;
2917        }
2918    }
2919    else if (!shf->shf_frame_ptr)
2920    {
2921        LSQ_WARN("dangling HTTP/3 frame");
2922        stream->conn_pub->lconn->cn_if->ci_internal_error(
2923            stream->conn_pub->lconn, "dangling HTTP/3 frame on stream %"PRIu64,
2924                stream->id);
2925        stream_hq_frame_put(stream, shf);
2926    }
2927    else if (!size)
2928    {
2929        assert(!shf->shf_frame_ptr);
2930        LSQ_WARN("discard zero-sized HQ frame type 0x%X (off: %"PRIu64")",
2931                                        shf->shf_frame_type, shf->shf_off);
2932        stream_hq_frame_put(stream, shf);
2933    }
2934    else
2935    {
2936        assert(stream->sm_n_buffered);
2937        LSQ_ERROR("cannot close frame of size %"PRIu64" -- too large", size);
2938        /* TODO: abort connection */
2939        stream_hq_frame_put(stream, shf);
2940    }
2941}
2942
2943
2944static ssize_t
2945stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
2946                         size_t thresh)
2947{
2948    size_t size;
2949    ssize_t nw;
2950    unsigned seen_ok;
2951    int use_framing;
2952    struct frame_gen_ctx fg_ctx = {
2953        .fgc_stream = stream,
2954        .fgc_reader = reader,
2955        .fgc_nread_from_reader = 0,
2956    };
2957
2958    use_framing = (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2959                                       == (SMBF_IETF|SMBF_USE_HEADERS);
2960    if (use_framing)
2961    {
2962        fg_ctx.fgc_size = frame_hq_gen_size;
2963        fg_ctx.fgc_read = frame_hq_gen_read;
2964        fg_ctx.fgc_fin = frame_std_gen_fin; /* This seems to work for either? XXX */
2965    }
2966    else
2967    {
2968        fg_ctx.fgc_size = frame_std_gen_size;
2969        fg_ctx.fgc_read = frame_std_gen_read;
2970        fg_ctx.fgc_fin = frame_std_gen_fin;
2971    }
2972
2973    seen_ok = 0;
2974    while ((size = fg_ctx.fgc_size(&fg_ctx), thresh ? size >= thresh : size > 0)
2975           || fg_ctx.fgc_fin(&fg_ctx))
2976    {
2977        switch (stream->sm_write_to_packet(&fg_ctx, size))
2978        {
2979        case SWTP_OK:
2980            if (!seen_ok++)
2981                maybe_conn_to_tickable_if_writeable(stream, 0);
2982            if (fg_ctx.fgc_fin(&fg_ctx))
2983            {
2984                if (use_framing && seen_ok)
2985                    maybe_close_varsize_hq_frame(stream);
2986                stream->stream_flags |= STREAM_FIN_SENT;
2987                goto end;
2988            }
2989            else
2990                break;
2991        case SWTP_STOP:
2992            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
2993            if (use_framing && seen_ok)
2994                maybe_close_varsize_hq_frame(stream);
2995            goto end;
2996        default:
2997            abort_connection(stream);
2998            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
2999            return -1;
3000        }
3001    }
3002
3003    if (use_framing && seen_ok)
3004        maybe_close_varsize_hq_frame(stream);
3005
3006    if (thresh)
3007    {
3008        assert(size < thresh);
3009        assert(size >= stream->sm_n_buffered);
3010        size -= stream->sm_n_buffered;
3011        if (size > 0)
3012        {
3013            nw = save_to_buffer(stream, reader, size);
3014            if (nw < 0)
3015                return -1;
3016            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
3017        }
3018    }
3019    else
3020    {
3021        /* We count flushed data towards both stream and connection limits,
3022         * so we should have been able to packetize all of it:
3023         */
3024        assert(0 == stream->sm_n_buffered);
3025        assert(size == 0);
3026    }
3027
3028    maybe_mark_as_blocked(stream);
3029
3030  end:
3031    return fg_ctx.fgc_nread_from_reader;
3032}
3033
3034
3035/* Perform an implicit flush when we hit connection limit while buffering
3036 * data.  This is to prevent a (theoretical) stall:
3037 *
3038 * Imagine a number of streams, all of which buffered some data.  The buffered
3039 * data is up to connection cap, which means no further writes are possible.
3040 * None of them flushes, which means that data is not sent and connection
3041 * WINDOW_UPDATE frame never arrives from peer.  Stall.
3042 */
3043static int
3044maybe_flush_stream (struct lsquic_stream *stream)
3045{
3046    if (stream->sm_n_buffered > 0
3047          && (stream->sm_bflags & SMBF_CONN_LIMITED)
3048            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
3049        return stream_flush_nocheck(stream);
3050    else
3051        return 0;
3052}
3053
3054
3055static int
3056stream_hq_frame_extendable (const struct stream_hq_frame *shf, uint64_t cur_off,
3057                                                                    unsigned len)
3058{
3059    return (shf->shf_flags & (SHF_TWO_BYTES|SHF_FIXED_SIZE)) == 0
3060        && cur_off - shf->shf_off < (1 << 6)
3061        && cur_off - shf->shf_off + len >= (1 << 6)
3062        ;
3063}
3064
3065
3066/* Update currently buffered HQ frame or create a new one, if possible.
3067 * Return update length to be buffered.  If a HQ frame cannot be
3068 * buffered due to size, 0 is returned, thereby preventing both HQ frame
3069 * creation and buffering.
3070 */
3071static size_t
3072update_buffered_hq_frames (struct lsquic_stream *stream, size_t len,
3073                                                                size_t avail)
3074{
3075    struct stream_hq_frame *shf;
3076    uint64_t cur_off, end;
3077    size_t frame_sz;
3078    unsigned extendable;
3079
3080    cur_off = stream->sm_payload + stream->sm_n_buffered;
3081    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
3082        if (shf->shf_off <= cur_off)
3083        {
3084            end = stream_hq_frame_end(shf);
3085            extendable = stream_hq_frame_extendable(shf, cur_off, len);
3086            if (cur_off < end + extendable)
3087                break;
3088        }
3089
3090    if (shf)
3091    {
3092        if (len > end + extendable - cur_off)
3093            len = end + extendable - cur_off;
3094        frame_sz = stream_hq_frame_size(shf);
3095    }
3096    else
3097    {
3098        assert(avail >= 3);
3099        shf = stream_activate_hq_frame(stream, cur_off, HQFT_DATA, 0, len);
3100        /* XXX malloc can fail */
3101        if (len > stream_hq_frame_end(shf) - cur_off)
3102            len = stream_hq_frame_end(shf) - cur_off;
3103        extendable = 0;
3104        frame_sz = stream_hq_frame_size(shf);
3105        if (avail < frame_sz)
3106            return 0;
3107        avail -= frame_sz;
3108    }
3109
3110    if (!(shf->shf_flags & SHF_CC_PAID))
3111    {
3112        incr_conn_cap(stream, frame_sz);
3113        shf->shf_flags |= SHF_CC_PAID;
3114    }
3115    if (extendable)
3116    {
3117        shf->shf_flags |= SHF_TWO_BYTES;
3118        incr_conn_cap(stream, 1);
3119        avail -= 1;
3120        if ((stream->sm_qflags & SMQF_WANT_FLUSH)
3121                && shf->shf_off <= stream->sm_payload
3122                && stream_hq_frame_end(shf) >= stream->sm_flush_to_payload)
3123            stream->sm_flush_to += 1;
3124    }
3125
3126    if (len <= avail)
3127        return len;
3128    else
3129        return avail;
3130}
3131
3132
3133static ssize_t
3134save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
3135                                                                size_t len)
3136{
3137    size_t avail, n_written, n_allowed;
3138
3139    avail = lsquic_stream_write_avail(stream);
3140    if (avail < len)
3141        len = avail;
3142    if (len == 0)
3143    {
3144        LSQ_DEBUG("zero-byte write (avail: %zu)", avail);
3145        return 0;
3146    }
3147
3148    n_allowed = stream_get_n_allowed(stream);
3149    assert(stream->sm_n_buffered + len <= n_allowed);
3150
3151    if (!stream->sm_buf)
3152    {
3153        stream->sm_buf = malloc(n_allowed);
3154        if (!stream->sm_buf)
3155            return -1;
3156        stream->sm_n_allocated = n_allowed;
3157    }
3158
3159    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3160                                            == (SMBF_IETF|SMBF_USE_HEADERS))
3161        len = update_buffered_hq_frames(stream, len, avail);
3162
3163    n_written = reader->lsqr_read(reader->lsqr_ctx,
3164                        stream->sm_buf + stream->sm_n_buffered, len);
3165    stream->sm_n_buffered += n_written;
3166    assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
3167    incr_conn_cap(stream, n_written);
3168    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
3169              n_written, stream->sm_n_buffered);
3170    if (0 != maybe_flush_stream(stream))
3171        return -1;
3172    return n_written;
3173}
3174
3175
3176static ssize_t
3177stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
3178{
3179    const struct stream_hq_frame *shf;
3180    size_t thresh, len, frames, total_len, n_allowed, nwritten;
3181    ssize_t nw;
3182
3183    len = reader->lsqr_size(reader->lsqr_ctx);
3184    if (len == 0)
3185        return 0;
3186
3187    frames = 0;
3188    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3189                                        == (SMBF_IETF|SMBF_USE_HEADERS))
3190        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
3191            if (shf->shf_off >= stream->sm_payload)
3192                frames += stream_hq_frame_size(shf);
3193    total_len = len + frames + stream->sm_n_buffered;
3194    thresh = lsquic_stream_flush_threshold(stream, total_len);
3195    n_allowed = stream_get_n_allowed(stream);
3196    if (total_len <= n_allowed && total_len < thresh)
3197    {
3198        nwritten = 0;
3199        do
3200        {
3201            nw = save_to_buffer(stream, reader, len - nwritten);
3202            if (nw > 0)
3203                nwritten += (size_t) nw;
3204            else if (nw == 0)
3205                break;
3206            else
3207                return nw;
3208        }
3209        while (nwritten < len
3210                        && stream->sm_n_buffered < stream->sm_n_allocated);
3211        return nwritten;
3212    }
3213    else
3214        return stream_write_to_packets(stream, reader, thresh);
3215}
3216
3217
3218ssize_t
3219lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
3220{
3221    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
3222    return lsquic_stream_writev(stream, &iov, 1);
3223}
3224
3225
/* Read cursor over an array of iovecs.  All sizes and offsets are size_t so
 * that iovecs whose length does not fit into `unsigned' are handled
 * correctly.
 */
struct inner_reader_iovec {
    const struct iovec       *iov;              /* Current element */
    const struct iovec       *end;              /* One past last element */
    size_t                    cur_iovec_off;    /* Bytes consumed from *iov */
};


/* lsqr_read callback: copy up to `count' bytes from the iovec array into
 * `buf', advancing the cursor.  Returns the number of bytes copied.
 */
static size_t
inner_reader_iovec_read (void *ctx, void *buf, size_t count)
{
    struct inner_reader_iovec *const iro = ctx;
    unsigned char *p = buf;
    unsigned char *const end = p + count;
    size_t n_tocopy;

    while (iro->iov < iro->end && p < end)
    {
        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
        if (n_tocopy > (size_t) (end - p))
            n_tocopy = (size_t) (end - p);
        /* Skip the copy for empty iovecs, whose base may be NULL */
        if (n_tocopy > 0)
            memcpy(p, (unsigned char *) iro->iov->iov_base
                                        + iro->cur_iovec_off, n_tocopy);
        p += n_tocopy;
        iro->cur_iovec_off += n_tocopy;
        if (iro->iov->iov_len == iro->cur_iovec_off)
        {
            /* Current element is exhausted: move on to the next one */
            ++iro->iov;
            iro->cur_iovec_off = 0;
        }
    }

    return count - (size_t) (end - p);
}


/* lsqr_size callback: return the number of bytes that have not been read
 * from the iovec array yet.
 */
static size_t
inner_reader_iovec_size (void *ctx)
{
    struct inner_reader_iovec *const iro = ctx;
    const struct iovec *iov;
    size_t size;

    size = 0;
    for (iov = iro->iov; iov < iro->end; ++iov)
        size += iov->iov_len;

    /* The consumed part of the current element has been counted above */
    return size - iro->cur_iovec_off;
}
3274
3275
3276ssize_t
3277lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
3278                                                                    int iovcnt)
3279{
3280    COMMON_WRITE_CHECKS();
3281    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3282
3283    struct inner_reader_iovec iro = {
3284        .iov = iov,
3285        .end = iov + iovcnt,
3286        .cur_iovec_off = 0,
3287    };
3288    struct lsquic_reader reader = {
3289        .lsqr_read = inner_reader_iovec_read,
3290        .lsqr_size = inner_reader_iovec_size,
3291        .lsqr_ctx  = &iro,
3292    };
3293
3294    return stream_write(stream, &reader);
3295}
3296
3297
3298ssize_t
3299lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
3300{
3301    COMMON_WRITE_CHECKS();
3302    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3303    return stream_write(stream, reader);
3304}
3305
3306
3307/* This bypasses COMMON_WRITE_CHECKS */
3308static ssize_t
3309stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz)
3310{
3311    struct iovec iov = { (void *) buf, sz, };
3312    struct inner_reader_iovec iro = {
3313        .iov = &iov,
3314        .end = &iov + 1,
3315        .cur_iovec_off = 0,
3316    };
3317    struct lsquic_reader reader = {
3318        .lsqr_read = inner_reader_iovec_read,
3319        .lsqr_size = inner_reader_iovec_size,
3320        .lsqr_ctx  = &iro,
3321    };
3322    return stream_write(stream, &reader);
3323}
3324
3325
3326/* XXX Move this define elsewhere? */
3327#define MAX_HEADERS_SIZE (64 * 1024)
3328
3329static int
3330send_headers_ietf (struct lsquic_stream *stream,
3331                            const struct lsquic_http_headers *headers, int eos)
3332{
3333    enum qwh_status qwh;
3334    const size_t max_prefix_size =
3335                    lsquic_qeh_max_prefix_size(stream->conn_pub->u.ietf.qeh);
3336    const size_t max_push_size = 1 /* Stream type */ + 8 /* Push ID */;
3337    size_t prefix_sz, headers_sz, hblock_sz, push_sz;
3338    unsigned bits;
3339    ssize_t nw;
3340    unsigned char *header_block;
3341    enum lsqpack_enc_header_flags hflags;
3342    unsigned char buf[max_push_size + max_prefix_size + MAX_HEADERS_SIZE];
3343
3344    stream->stream_flags &= ~STREAM_PUSHING;
3345    stream->stream_flags |= STREAM_NOPUSH;
3346
3347    /* TODO: Optimize for the common case: write directly to sm_buf and fall
3348     * back to a larger buffer if that fails.
3349     */
3350    prefix_sz = max_prefix_size;
3351    headers_sz = sizeof(buf) - max_prefix_size - max_push_size;
3352    qwh = lsquic_qeh_write_headers(stream->conn_pub->u.ietf.qeh, stream->id, 0,
3353                headers, buf + max_push_size + max_prefix_size, &prefix_sz,
3354                &headers_sz, &stream->sm_hb_compl, &hflags);
3355
3356    if (!(qwh == QWH_FULL || qwh == QWH_PARTIAL))
3357    {
3358        if (qwh == QWH_ENOBUF)
3359            LSQ_INFO("not enough room for header block");
3360        else
3361            LSQ_WARN("internal error encoding and sending HTTP headers");
3362        return -1;
3363    }
3364
3365    if (hflags & LSQECH_REF_NEW_ENTRIES)
3366        stream->stream_flags |= STREAM_ENCODER_DEP;
3367
3368    if (stream->sm_promise)
3369    {
3370        assert(lsquic_stream_is_pushed(stream));
3371        bits = vint_val2bits(stream->sm_promise->pp_id);
3372        push_sz = 1 + (1 << bits);
3373        if (!stream_activate_hq_frame(stream,
3374                stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PREAMBLE,
3375                SHF_FIXED_SIZE|SHF_PHANTOM, push_sz))
3376            return -1;
3377        buf[max_push_size + max_prefix_size - prefix_sz - push_sz] = HQUST_PUSH;
3378        vint_write(buf + max_push_size + max_prefix_size - prefix_sz
3379                    - push_sz + 1,stream->sm_promise->pp_id, bits, 1 << bits);
3380    }
3381    else
3382        push_sz = 0;
3383
3384    /* Construct contiguous header block buffer including HQ framing */
3385    header_block = buf + max_push_size + max_prefix_size - prefix_sz - push_sz;
3386    hblock_sz = push_sz + prefix_sz + headers_sz;
3387    if (!stream_activate_hq_frame(stream,
3388                stream->sm_payload + stream->sm_n_buffered + push_sz,
3389                HQFT_HEADERS, SHF_FIXED_SIZE, hblock_sz - push_sz))
3390        return -1;
3391
3392    if (qwh == QWH_FULL)
3393    {
3394        stream->sm_send_headers_state = SSHS_HBLOCK_SENDING;
3395        if (lsquic_stream_write_avail(stream))
3396        {
3397            nw = stream_write_buf(stream, header_block, hblock_sz);
3398            if (nw < 0)
3399            {
3400                LSQ_WARN("cannot write to stream: %s", strerror(errno));
3401                return -1;
3402            }
3403            if ((size_t) nw == hblock_sz)
3404            {
3405                stream->stream_flags |= STREAM_HEADERS_SENT;
3406                stream_hblock_sent(stream);
3407                LSQ_DEBUG("wrote all %zu bytes of header block", hblock_sz);
3408                return 0;
3409            }
3410            LSQ_DEBUG("wrote only %zd bytes of header block, stash", nw);
3411        }
3412        else
3413        {
3414            LSQ_DEBUG("cannot write to stream, stash all %zu bytes of "
3415                                        "header block", hblock_sz);
3416            nw = 0;
3417        }
3418    }
3419    else
3420    {
3421        stream->sm_send_headers_state = SSHS_ENC_SENDING;
3422        nw = 0;
3423    }
3424
3425    stream->sm_saved_want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
3426    stream_wantwrite(stream, 1);
3427
3428    stream->sm_header_block = malloc(hblock_sz - (size_t) nw);
3429    if (!stream->sm_header_block)
3430    {
3431        LSQ_WARN("cannot allocate %zd bytes to stash %s header block",
3432            hblock_sz - (size_t) nw, qwh == QWH_FULL ? "full" : "partial");
3433        return -1;
3434    }
3435    memcpy(stream->sm_header_block, header_block + (size_t) nw,
3436                                                hblock_sz - (size_t) nw);
3437    stream->sm_hblock_sz = hblock_sz - (size_t) nw;
3438    stream->sm_hblock_off = 0;
3439    LSQ_DEBUG("stashed %u bytes of header block", stream->sm_hblock_sz);
3440    return 0;
3441}
3442
3443
3444static int
3445send_headers_gquic (struct lsquic_stream *stream,
3446                            const struct lsquic_http_headers *headers, int eos)
3447{
3448    int s = lsquic_headers_stream_send_headers(stream->conn_pub->u.gquic.hs,
3449                stream->id, headers, eos, lsquic_stream_priority(stream));
3450    if (0 == s)
3451    {
3452        SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
3453        stream->stream_flags |= STREAM_HEADERS_SENT;
3454        if (eos)
3455            stream->stream_flags |= STREAM_FIN_SENT;
3456        LSQ_INFO("sent headers");
3457    }
3458    else
3459        LSQ_WARN("could not send headers: %s", strerror(errno));
3460    return s;
3461}
3462
3463
3464int
3465lsquic_stream_send_headers (lsquic_stream_t *stream,
3466                            const lsquic_http_headers_t *headers, int eos)
3467{
3468    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3469            && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_U_WRITE_DONE)))
3470    {
3471        if (stream->sm_bflags & SMBF_IETF)
3472            return send_headers_ietf(stream, headers, eos);
3473        else
3474            return send_headers_gquic(stream, headers, eos);
3475    }
3476    else
3477    {
3478        LSQ_INFO("cannot send headers in this state");
3479        errno = EBADMSG;
3480        return -1;
3481    }
3482}
3483
3484
3485void
3486lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
3487{
3488    if (offset > stream->max_send_off)
3489    {
3490        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
3491        LSQ_DEBUG("update max send offset from 0x%"PRIX64" to "
3492            "0x%"PRIX64, stream->max_send_off, offset);
3493        stream->max_send_off = offset;
3494    }
3495    else
3496        LSQ_DEBUG("new offset 0x%"PRIX64" is not larger than old "
3497            "max send offset 0x%"PRIX64", ignoring", offset,
3498            stream->max_send_off);
3499}
3500
3501
3502/* This function is used to update offsets after handshake completes and we
3503 * learn of peer's limits from the handshake values.
3504 */
3505int
3506lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset)
3507{
3508    LSQ_DEBUG("setting max_send_off to %"PRIu64, offset);
3509    if (offset > stream->max_send_off)
3510    {
3511        lsquic_stream_window_update(stream, offset);
3512        return 0;
3513    }
3514    else if (offset < stream->tosend_off)
3515    {
3516        LSQ_INFO("new offset (%"PRIu64" bytes) is smaller than the amount of "
3517            "data already sent on this stream (%"PRIu64" bytes)", offset,
3518            stream->tosend_off);
3519        return -1;
3520    }
3521    else
3522    {
3523        stream->max_send_off = offset;
3524        return 0;
3525    }
3526}
3527
3528
3529void
3530lsquic_stream_reset (lsquic_stream_t *stream, uint64_t error_code)
3531{
3532    lsquic_stream_reset_ext(stream, error_code, 1);
3533}
3534
3535
3536void
3537lsquic_stream_reset_ext (lsquic_stream_t *stream, uint64_t error_code,
3538                         int do_close)
3539{
3540    if ((stream->stream_flags & STREAM_RST_SENT)
3541                                    || (stream->sm_qflags & SMQF_SEND_RST))
3542    {
3543        LSQ_INFO("reset already sent");
3544        return;
3545    }
3546
3547    SM_HISTORY_APPEND(stream, SHE_RESET);
3548
3549    LSQ_INFO("reset, error code %"PRIu64, error_code);
3550    stream->error_code = error_code;
3551
3552    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
3553        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
3554                                                        next_send_stream);
3555    stream->sm_qflags &= ~SMQF_SENDING_FLAGS;
3556    stream->sm_qflags |= SMQF_SEND_RST;
3557
3558    if (stream->sm_qflags & SMQF_QPACK_DEC)
3559    {
3560        lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream);
3561        stream->sm_qflags |= ~SMQF_QPACK_DEC;
3562    }
3563
3564    drop_buffered_data(stream);
3565    maybe_elide_stream_frames(stream);
3566    maybe_schedule_call_on_close(stream);
3567
3568    if (do_close)
3569        lsquic_stream_close(stream);
3570    else
3571        maybe_conn_to_tickable_if_writeable(stream, 1);
3572}
3573
3574
3575lsquic_stream_id_t
3576lsquic_stream_id (const lsquic_stream_t *stream)
3577{
3578    return stream->id;
3579}
3580
3581
3582#if !defined(NDEBUG) && __GNUC__
3583__attribute__((weak))
3584#endif
3585struct lsquic_conn *
3586lsquic_stream_conn (const lsquic_stream_t *stream)
3587{
3588    return stream->conn_pub->lconn;
3589}
3590
3591
3592int
3593lsquic_stream_close (lsquic_stream_t *stream)
3594{
3595    LSQ_DEBUG("lsquic_stream_close() called");
3596    SM_HISTORY_APPEND(stream, SHE_CLOSE);
3597    if (lsquic_stream_is_closed(stream))
3598    {
3599        LSQ_INFO("Attempt to close an already-closed stream");
3600        errno = EBADF;
3601        return -1;
3602    }
3603    maybe_stream_shutdown_write(stream);
3604    stream_shutdown_read(stream);
3605    maybe_schedule_call_on_close(stream);
3606    maybe_finish_stream(stream);
3607    if (!(stream->stream_flags & STREAM_DELAYED_SW))
3608        maybe_conn_to_tickable_if_writeable(stream, 1);
3609    return 0;
3610}
3611
3612
3613#ifndef NDEBUG
3614#if __GNUC__
3615__attribute__((weak))
3616#endif
3617#endif
3618void
3619lsquic_stream_acked (struct lsquic_stream *stream,
3620                                            enum quic_frame_type frame_type)
3621{
3622    assert(stream->n_unacked);
3623    --stream->n_unacked;
3624    LSQ_DEBUG("ACKed; n_unacked: %u", stream->n_unacked);
3625    if (frame_type == QUIC_FRAME_RST_STREAM)
3626    {
3627        SM_HISTORY_APPEND(stream, SHE_RST_ACKED);
3628        LSQ_DEBUG("RESET that we sent has been acked by peer");
3629        stream->stream_flags |= STREAM_RST_ACKED;
3630    }
3631    if (0 == stream->n_unacked)
3632        maybe_finish_stream(stream);
3633}
3634
3635
3636void
3637lsquic_stream_push_req (lsquic_stream_t *stream,
3638                        struct uncompressed_headers *push_req)
3639{
3640    assert(!stream->push_req);
3641    stream->push_req = push_req;
3642    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
3643}
3644
3645
3646int
3647lsquic_stream_is_pushed (const lsquic_stream_t *stream)
3648{
3649    enum stream_id_type sit;
3650
3651    if (stream->sm_bflags & SMBF_IETF)
3652    {
3653        sit = stream->id & SIT_MASK;
3654        return sit == SIT_UNI_SERVER;
3655    }
3656    else
3657        return 1 & ~stream->id;
3658}
3659
3660
3661int
3662lsquic_stream_push_info (const lsquic_stream_t *stream,
3663                          lsquic_stream_id_t *ref_stream_id, void **hset)
3664{
3665    if (lsquic_stream_is_pushed(stream))
3666    {
3667        assert(stream->push_req);
3668        *ref_stream_id = stream->push_req->uh_stream_id;
3669        *hset          = stream->push_req->uh_hset;
3670        return 0;
3671    }
3672    else
3673        return -1;
3674}
3675
3676
3677int
3678lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
3679{
3680    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3681                                    && !(stream->stream_flags & STREAM_HAVE_UH))
3682    {
3683        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
3684        LSQ_DEBUG("received uncompressed headers");
3685        stream->stream_flags |= STREAM_HAVE_UH;
3686        if (uh->uh_flags & UH_FIN)
3687        {
3688            /* IETF QUIC only sets UH_FIN for a pushed stream on the server to
3689             * mark request as done:
3690             */
3691            if (stream->sm_bflags & SMBF_IETF)
3692                assert((stream->sm_bflags & SMBF_SERVER)
3693                                            && lsquic_stream_is_pushed(stream));
3694            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
3695        }
3696        stream->uh = uh;
3697        if (uh->uh_oth_stream_id == 0)
3698        {
3699            if (uh->uh_weight)
3700                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
3701        }
3702        else
3703            LSQ_NOTICE("don't know how to depend on stream %"PRIu64,
3704                                                        uh->uh_oth_stream_id);
3705        return 0;
3706    }
3707    else
3708    {
3709        LSQ_ERROR("received unexpected uncompressed headers");
3710        return -1;
3711    }
3712}
3713
3714
3715unsigned
3716lsquic_stream_priority (const lsquic_stream_t *stream)
3717{
3718    return 256 - stream->sm_priority;
3719}
3720
3721
3722int
3723lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
3724{
3725    /* The user should never get a reference to the special streams,
3726     * but let's check just in case:
3727     */
3728    if (lsquic_stream_is_critical(stream))
3729        return -1;
3730    if (priority < 1 || priority > 256)
3731        return -1;
3732    stream->sm_priority = 256 - priority;
3733    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
3734    LSQ_DEBUG("set priority to %u", priority);
3735    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
3736    return 0;
3737}
3738
3739
3740static int
3741maybe_send_priority_gquic (struct lsquic_stream *stream, unsigned priority)
3742{
3743    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3744                            && (stream->stream_flags & STREAM_HEADERS_SENT))
3745    {
3746        /* We need to send headers only if we are a) using HEADERS stream
3747         * and b) we already sent initial headers.  If initial headers
3748         * have not been sent yet, stream priority will be sent in the
3749         * HEADERS frame.
3750         */
3751        return lsquic_headers_stream_send_priority(stream->conn_pub->u.gquic.hs,
3752                                                stream->id, 0, 0, priority);
3753    }
3754    else
3755        return 0;
3756}
3757
3758
/* Placeholder: sending priority on an IETF QUIC stream is not implemented.
 * A warning is logged and failure (-1) is always returned.
 */
static int
send_priority_ietf (struct lsquic_stream *stream, unsigned priority)
{
    const int not_implemented = -1;

    LSQ_WARN("%s: TODO", __func__);     /* TODO */
    return not_implemented;
}
3765
3766
3767int
3768lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
3769{
3770    if (0 == lsquic_stream_set_priority_internal(stream, priority))
3771    {
3772        if (stream->sm_bflags & SMBF_IETF)
3773            return send_priority_ietf(stream, priority);
3774        else
3775            return maybe_send_priority_gquic(stream, priority);
3776    }
3777    else
3778        return -1;
3779}
3780
3781
3782lsquic_stream_ctx_t *
3783lsquic_stream_get_ctx (const lsquic_stream_t *stream)
3784{
3785    return stream->st_ctx;
3786}
3787
3788
3789int
3790lsquic_stream_refuse_push (lsquic_stream_t *stream)
3791{
3792    if (lsquic_stream_is_pushed(stream)
3793            && !(stream->sm_qflags & SMQF_SEND_RST)
3794            && !(stream->stream_flags & STREAM_RST_SENT))
3795    {
3796        LSQ_DEBUG("refusing pushed stream: send reset");
3797        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
3798        return 0;
3799    }
3800    else
3801        return -1;
3802}
3803
3804
3805size_t
3806lsquic_stream_mem_used (const struct lsquic_stream *stream)
3807{
3808    size_t size;
3809
3810    size = sizeof(stream);
3811    if (stream->sm_buf)
3812        size += stream->sm_n_allocated;
3813    if (stream->data_in)
3814        size += stream->data_in->di_if->di_mem_used(stream->data_in);
3815
3816    return size;
3817}
3818
3819
3820const lsquic_cid_t *
3821lsquic_stream_cid (const struct lsquic_stream *stream)
3822{
3823    return LSQUIC_LOG_CONN_ID;
3824}
3825
3826
3827void
3828lsquic_stream_dump_state (const struct lsquic_stream *stream)
3829{
3830    LSQ_DEBUG("flags: %X; read off: %"PRIu64, stream->stream_flags,
3831                                                    stream->read_offset);
3832    stream->data_in->di_if->di_dump_state(stream->data_in);
3833}
3834
3835
3836void *
3837lsquic_stream_get_hset (struct lsquic_stream *stream)
3838{
3839    void *hset;
3840
3841    if (lsquic_stream_is_reset(stream))
3842    {
3843        LSQ_INFO("%s: stream is reset, no headers returned", __func__);
3844        errno = ECONNRESET;
3845        return NULL;
3846    }
3847
3848    if (!((stream->sm_bflags & SMBF_USE_HEADERS)
3849                                && (stream->stream_flags & STREAM_HAVE_UH)))
3850    {
3851        LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__,
3852                                                        stream->stream_flags);
3853        return NULL;
3854    }
3855
3856    if (!stream->uh)
3857    {
3858        LSQ_INFO("%s: headers unavailable (already fetched?)", __func__);
3859        return NULL;
3860    }
3861
3862    if (stream->uh->uh_flags & UH_H1H)
3863    {
3864        LSQ_INFO("%s: uncompressed headers have internal format", __func__);
3865        return NULL;
3866    }
3867
3868    hset = stream->uh->uh_hset;
3869    stream->uh->uh_hset = NULL;
3870    destroy_uh(stream);
3871    if (stream->stream_flags & STREAM_HEAD_IN_FIN)
3872    {
3873        stream->stream_flags |= STREAM_FIN_REACHED;
3874        SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
3875    }
3876    LSQ_DEBUG("return header set");
3877    return hset;
3878}
3879
3880
3881/* GQUIC-only function */
3882int
3883lsquic_stream_id_is_critical (int use_http, lsquic_stream_id_t stream_id)
3884{
3885    return stream_id == LSQUIC_GQUIC_STREAM_HANDSHAKE
3886        || (stream_id == LSQUIC_GQUIC_STREAM_HEADERS && use_http);
3887}
3888
3889
3890int
3891lsquic_stream_is_critical (const struct lsquic_stream *stream)
3892{
3893    return stream->sm_bflags & SMBF_CRITICAL;
3894}
3895
3896
3897void
3898lsquic_stream_set_stream_if (struct lsquic_stream *stream,
3899           const struct lsquic_stream_if *stream_if, void *stream_if_ctx)
3900{
3901    SM_HISTORY_APPEND(stream, SHE_IF_SWITCH);
3902    stream->stream_if    = stream_if;
3903    stream->sm_onnew_arg = stream_if_ctx;
3904    LSQ_DEBUG("switched interface");
3905    assert(stream->stream_flags & STREAM_ONNEW_DONE);
3906    stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
3907                                                      stream);
3908}
3909
3910
3911static int
3912update_type_hist_and_check (struct hq_filter *filter)
3913{
3914    /* 3-bit codes: */
3915    enum {
3916        CODE_UNSET,
3917        CODE_HEADER,    /* H    Header  */
3918        CODE_DATA,      /* D    Data    */
3919        CODE_PLUS,      /* +    Plus: meaning previous frame repeats */
3920    };
3921    static const unsigned valid_seqs[] = {
3922        /* Ordered by expected frequency */
3923        0123,   /* HD+  */
3924        012,    /* HD   */
3925        01,     /* H    */
3926        01231,  /* HD+H */
3927        0121,   /* HDH  */
3928    };
3929    unsigned code, i;
3930
3931    switch (filter->hqfi_type)
3932    {
3933    case HQFT_HEADERS:
3934        code = CODE_HEADER;
3935        break;
3936    case HQFT_DATA:
3937        code = CODE_DATA;
3938        break;
3939    default:
3940        /* Ignore unknown frames */
3941        return 0;
3942    }
3943
3944    if (filter->hqfi_hist_idx >= MAX_HQFI_ENTRIES)
3945        return -1;
3946
3947    if (filter->hqfi_hist_idx && (filter->hqfi_hist_buf & 7) == code)
3948    {
3949        filter->hqfi_hist_buf <<= 3;
3950        filter->hqfi_hist_buf |= CODE_PLUS;
3951        filter->hqfi_hist_idx++;
3952    }
3953    else if (filter->hqfi_hist_idx > 1
3954            && ((filter->hqfi_hist_buf >> 3) & 7) == code
3955            && (filter->hqfi_hist_buf & 7) == CODE_PLUS)
3956        /* Keep it at plus, do nothing */;
3957    else
3958    {
3959        filter->hqfi_hist_buf <<= 3;
3960        filter->hqfi_hist_buf |= code;
3961        filter->hqfi_hist_idx++;
3962    }
3963
3964    for (i = 0; i < sizeof(valid_seqs) / sizeof(valid_seqs[0]); ++i)
3965        if (filter->hqfi_hist_buf == valid_seqs[i])
3966            return 0;
3967
3968    return -1;
3969}
3970
3971
3972static size_t
3973hq_read (void *ctx, const unsigned char *buf, size_t sz, int fin)
3974{
3975    struct lsquic_stream *const stream = ctx;
3976    struct hq_filter *const filter = &stream->sm_hq_filter;
3977    const unsigned char *p = buf, *prev;
3978    const unsigned char *const end = buf + sz;
3979    struct lsquic_conn *lconn;
3980    enum lsqpack_read_header_status rhs;
3981    int s;
3982
3983    while (p < end)
3984    {
3985        switch (filter->hqfi_state)
3986        {
3987        case HQFI_STATE_FRAME_HEADER_BEGIN:
3988            filter->hqfi_vint_state.vr2s_state = 0;
3989            filter->hqfi_state = HQFI_STATE_FRAME_HEADER_CONTINUE;
3990            /* fall-through */
3991        case HQFI_STATE_FRAME_HEADER_CONTINUE:
3992            s = lsquic_varint_read_two(&p, end, &filter->hqfi_vint_state);
3993            if (s < 0)
3994                break;
3995            filter->hqfi_flags |= HQFI_FLAG_BEGIN;
3996            filter->hqfi_state = HQFI_STATE_READING_PAYLOAD;
3997            LSQ_DEBUG("HQ frame type 0x%"PRIX64" at offset %"PRIu64", size %"PRIu64,
3998                filter->hqfi_type, stream->read_offset + (unsigned) (p - buf),
3999                filter->hqfi_left);
4000            if (0 != update_type_hist_and_check(filter))
4001            {
4002                lconn = stream->conn_pub->lconn;
4003                filter->hqfi_flags |= HQFI_FLAG_ERROR;
4004                LSQ_INFO("unexpected HTTP/3 frame sequence: %o",
4005                    filter->hqfi_hist_buf);
4006                lconn->cn_if->ci_abort_error(lconn, 1, HEC_FRAME_UNEXPECTED,
4007                    "unexpected HTTP/3 frame sequence on stream %"PRIu64,
4008                    stream->id);
4009                goto end;
4010            }
4011            if (filter->hqfi_type == HQFT_HEADERS)
4012            {
4013                if (0 == (filter->hqfi_flags & HQFI_FLAG_GOT_HEADERS))
4014                    filter->hqfi_flags |= HQFI_FLAG_GOT_HEADERS;
4015                else
4016                {
4017                    filter->hqfi_type = (1ull << 62) - 1;
4018                    LSQ_DEBUG("Ignoring HEADERS frame");
4019                }
4020            }
4021            if (filter->hqfi_left > 0)
4022            {
4023                if (filter->hqfi_type == HQFT_DATA)
4024                    goto end;
4025                else if (filter->hqfi_type == HQFT_PUSH_PROMISE)
4026                {
4027                    lconn = stream->conn_pub->lconn;
4028                    if (stream->sm_bflags & SMBF_SERVER)
4029                        lconn->cn_if->ci_abort_error(lconn, 1,
4030                            HEC_FRAME_UNEXPECTED, "Received PUSH_PROMISE frame "
4031                            "on stream %"PRIu64" (clients are not supposed to "
4032                            "send those)", stream->id);
4033                    else
4034                        /* Our client implementation does not support server
4035                         * push.
4036                         */
4037                        lconn->cn_if->ci_abort_error(lconn, 1,
4038                            HEC_FRAME_UNEXPECTED,
4039                            "Received PUSH_PROMISE frame (not supported)"
4040                            "on stream %"PRIu64, stream->id);
4041                    goto end;
4042                }
4043            }
4044            else
4045            {
4046                switch (filter->hqfi_type)
4047                {
4048                case HQFT_CANCEL_PUSH:
4049                case HQFT_GOAWAY:
4050                case HQFT_HEADERS:
4051                case HQFT_MAX_PUSH_ID:
4052                case HQFT_PUSH_PROMISE:
4053                case HQFT_SETTINGS:
4054                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
4055                    LSQ_INFO("HQ frame of type %"PRIu64" cannot be size 0",
4056                                                            filter->hqfi_type);
4057                    abort_connection(stream);   /* XXX Overkill? */
4058                    goto end;
4059                default:
4060                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
4061                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4062                    break;
4063                }
4064            }
4065            break;
4066        case HQFI_STATE_READING_PAYLOAD:
4067            if (filter->hqfi_type == HQFT_DATA)
4068                goto end;
4069            sz = filter->hqfi_left;
4070            if (sz > (uintptr_t) (end - p))
4071                sz = (uintptr_t) (end - p);
4072            switch (filter->hqfi_type)
4073            {
4074            case HQFT_HEADERS:
4075                prev = p;
4076                if (filter->hqfi_flags & HQFI_FLAG_BEGIN)
4077                {
4078                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
4079                    rhs = lsquic_qdh_header_in_begin(
4080                                stream->conn_pub->u.ietf.qdh,
4081                                stream, filter->hqfi_left, &p, sz);
4082                }
4083                else
4084                    rhs = lsquic_qdh_header_in_continue(
4085                                stream->conn_pub->u.ietf.qdh, stream, &p, sz);
4086                assert(p > prev || LQRHS_ERROR == rhs);
4087                filter->hqfi_left -= p - prev;
4088                if (filter->hqfi_left == 0)
4089                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4090                switch (rhs)
4091                {
4092                case LQRHS_DONE:
4093                    assert(filter->hqfi_left == 0);
4094                    stream->sm_qflags &= ~SMQF_QPACK_DEC;
4095                    break;
4096                case LQRHS_NEED:
4097                    stream->sm_qflags |= SMQF_QPACK_DEC;
4098                    break;
4099                case LQRHS_BLOCKED:
4100                    stream->sm_qflags |= SMQF_QPACK_DEC;
4101                    filter->hqfi_flags |= HQFI_FLAG_BLOCKED;
4102                    goto end;
4103                default:
4104                    assert(LQRHS_ERROR == rhs);
4105                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
4106                    LSQ_INFO("error processing header block");
4107                    abort_connection(stream);   /* XXX Overkill? */
4108                    goto end;
4109                }
4110                break;
4111            default:
4112                /* Simply skip unknown frame type payload for now */
4113                filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
4114                p += sz;
4115                filter->hqfi_left -= sz;
4116                if (filter->hqfi_left == 0)
4117                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4118                break;
4119            }
4120            break;
4121        default:
4122            assert(0);
4123            goto end;
4124        }
4125    }
4126
4127  end:
4128    if (fin && p == end && filter->hqfi_state != HQFI_STATE_FRAME_HEADER_BEGIN)
4129    {
4130        LSQ_INFO("FIN at unexpected place in filter; state: %u",
4131                                                        filter->hqfi_state);
4132        filter->hqfi_flags |= HQFI_FLAG_ERROR;
4133/* From [draft-ietf-quic-http-16] Section 3.1:
4134 *               When a stream terminates cleanly, if the last frame on
4135 * the stream was truncated, this MUST be treated as a connection error
4136 * (see HTTP_MALFORMED_FRAME in Section 8.1).
4137 */
4138        abort_connection(stream);
4139    }
4140
4141    return p - buf;
4142}
4143
4144
4145static int
4146hq_filter_readable_now (const struct lsquic_stream *stream)
4147{
4148    const struct hq_filter *const filter = &stream->sm_hq_filter;
4149
4150    return (filter->hqfi_type == HQFT_DATA
4151                    && filter->hqfi_state == HQFI_STATE_READING_PAYLOAD)
4152        || (filter->hqfi_flags & HQFI_FLAG_ERROR)
4153        || stream->uh
4154        || (stream->stream_flags & STREAM_FIN_REACHED)
4155    ;
4156}
4157
4158
4159static int
4160hq_filter_readable (struct lsquic_stream *stream)
4161{
4162    struct hq_filter *const filter = &stream->sm_hq_filter;
4163    struct read_frames_status rfs;
4164
4165    if (filter->hqfi_flags & HQFI_FLAG_BLOCKED)
4166        return 0;
4167
4168    if (!hq_filter_readable_now(stream))
4169    {
4170        rfs = read_data_frames(stream, 0, hq_read, stream);
4171        if (rfs.total_nread == 0)
4172        {
4173            if (rfs.error)
4174            {
4175                filter->hqfi_flags |= HQFI_FLAG_ERROR;
4176                abort_connection(stream);   /* XXX Overkill? */
4177                return 1;   /* Collect error */
4178            }
4179            return 0;
4180        }
4181    }
4182
4183    return hq_filter_readable_now(stream);
4184}
4185
4186
4187static size_t
4188hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame)
4189{
4190    struct hq_filter *const filter = &stream->sm_hq_filter;
4191    size_t nr;
4192
4193    if (!(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4194                                            && filter->hqfi_type == HQFT_DATA))
4195    {
4196        nr = hq_read(stream, data_frame->df_data + data_frame->df_read_off,
4197                            data_frame->df_size - data_frame->df_read_off,
4198                            data_frame->df_fin);
4199        if (nr)
4200        {
4201            stream->read_offset += nr;
4202            stream_consumed_bytes(stream);
4203        }
4204    }
4205    else
4206        nr = 0;
4207
4208    if (0 == (filter->hqfi_flags & HQFI_FLAG_ERROR))
4209    {
4210        data_frame->df_read_off += nr;
4211        if (filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4212                                        && filter->hqfi_type == HQFT_DATA)
4213            return MIN(filter->hqfi_left,
4214                    (unsigned) data_frame->df_size - data_frame->df_read_off);
4215        else
4216        {
4217            assert(data_frame->df_read_off == data_frame->df_size);
4218            return 0;
4219        }
4220    }
4221    else
4222    {
4223        data_frame->df_read_off = data_frame->df_size;
4224        return 0;
4225    }
4226}
4227
4228
4229static void
4230hq_decr_left (struct lsquic_stream *stream, size_t read)
4231{
4232    struct hq_filter *const filter = &stream->sm_hq_filter;
4233
4234    if (read)
4235    {
4236        assert(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4237                                            && filter->hqfi_type == HQFT_DATA);
4238        assert(read <= filter->hqfi_left);
4239    }
4240
4241    filter->hqfi_left -= read;
4242    if (0 == filter->hqfi_left)
4243        filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4244}
4245
4246
4247struct qpack_dec_hdl *
4248lsquic_stream_get_qdh (const struct lsquic_stream *stream)
4249{
4250    return stream->conn_pub->u.ietf.qdh;
4251}
4252
4253
4254/* These are IETF QUIC states */
4255enum stream_state_sending
4256lsquic_stream_sending_state (const struct lsquic_stream *stream)
4257{
4258    if (0 == (stream->stream_flags & STREAM_RST_SENT))
4259    {
4260        if (stream->stream_flags & STREAM_FIN_SENT)
4261        {
4262            if (stream->n_unacked)
4263                return SSS_DATA_SENT;
4264            else
4265                return SSS_DATA_RECVD;
4266        }
4267        else
4268        {
4269            if (stream->tosend_off
4270                            || (stream->stream_flags & STREAM_BLOCKED_SENT))
4271                return SSS_SEND;
4272            else
4273                return SSS_READY;
4274        }
4275    }
4276    else if (stream->stream_flags & STREAM_RST_ACKED)
4277        return SSS_RESET_RECVD;
4278    else
4279        return SSS_RESET_SENT;
4280}
4281
4282
4283const char *const lsquic_sss2str[] =
4284{
4285    [SSS_READY]        =  "Ready",
4286    [SSS_SEND]         =  "Send",
4287    [SSS_DATA_SENT]    =  "Data Sent",
4288    [SSS_RESET_SENT]   =  "Reset Sent",
4289    [SSS_DATA_RECVD]   =  "Data Recvd",
4290    [SSS_RESET_RECVD]  =  "Reset Recvd",
4291};
4292
4293
4294const char *const lsquic_ssr2str[] =
4295{
4296    [SSR_RECV]         =  "Recv",
4297    [SSR_SIZE_KNOWN]   =  "Size Known",
4298    [SSR_DATA_RECVD]   =  "Data Recvd",
4299    [SSR_RESET_RECVD]  =  "Reset Recvd",
4300    [SSR_DATA_READ]    =  "Data Read",
4301    [SSR_RESET_READ]   =  "Reset Read",
4302};
4303
4304
4305/* These are IETF QUIC states */
4306enum stream_state_receiving
4307lsquic_stream_receiving_state (struct lsquic_stream *stream)
4308{
4309    uint64_t n_bytes;
4310
4311    if (0 == (stream->stream_flags & STREAM_RST_RECVD))
4312    {
4313        if (0 == (stream->stream_flags & STREAM_FIN_RECVD))
4314            return SSR_RECV;
4315        if (stream->stream_flags & STREAM_FIN_REACHED)
4316            return SSR_DATA_READ;
4317        if (0 == (stream->stream_flags & STREAM_DATA_RECVD))
4318        {
4319            n_bytes = stream->data_in->di_if->di_readable_bytes(
4320                                    stream->data_in, stream->read_offset);
4321            if (stream->read_offset + n_bytes == stream->sm_fin_off)
4322            {
4323                stream->stream_flags |= STREAM_DATA_RECVD;
4324                return SSR_DATA_RECVD;
4325            }
4326            else
4327                return SSR_SIZE_KNOWN;
4328        }
4329        else
4330            return SSR_DATA_RECVD;
4331    }
4332    else if (stream->stream_flags & STREAM_RST_READ)
4333        return SSR_RESET_READ;
4334    else
4335        return SSR_RESET_RECVD;
4336}
4337
4338
4339void
4340lsquic_stream_qdec_unblocked (struct lsquic_stream *stream)
4341{
4342    struct hq_filter *const filter = &stream->sm_hq_filter;
4343
4344    assert(stream->sm_qflags & SMQF_QPACK_DEC);
4345    assert(filter->hqfi_flags & HQFI_FLAG_BLOCKED);
4346
4347    filter->hqfi_flags &= ~HQFI_FLAG_BLOCKED;
4348    stream->conn_pub->cp_flags |= CP_STREAM_UNBLOCKED;
4349    LSQ_DEBUG("QPACK decoder unblocked");
4350}
4351
4352
4353int
4354lsquic_stream_is_rejected (const struct lsquic_stream *stream)
4355{
4356    return stream->stream_flags & STREAM_SS_RECVD;
4357}
4358
4359
4360int
4361lsquic_stream_can_push (const struct lsquic_stream *stream)
4362{
4363    if (lsquic_stream_is_pushed(stream))
4364        return 0;
4365    else if (stream->sm_bflags & SMBF_IETF)
4366        return (stream->sm_bflags & SMBF_USE_HEADERS)
4367            && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_NOPUSH))
4368            && stream->sm_send_headers_state == SSHS_BEGIN
4369            ;
4370    else
4371        return 1;
4372}
4373
4374
4375static size_t
4376dp_reader_read (void *lsqr_ctx, void *buf, size_t count)
4377{
4378    struct lsquic_stream *const stream = lsqr_ctx;
4379    unsigned char *dst = buf;
4380    unsigned char *const end = buf + count;
4381    size_t len;
4382
4383    len = MIN((size_t) (stream->sm_dup_push_len - stream->sm_dup_push_off),
4384                                                        (size_t) (end - dst));
4385    memcpy(dst, stream->sm_dup_push_buf + stream->sm_dup_push_off, len);
4386    stream->sm_dup_push_off += len;
4387
4388    if (stream->sm_dup_push_len == stream->sm_dup_push_off)
4389        LSQ_DEBUG("finish writing duplicate push");
4390
4391    return len;
4392}
4393
4394
4395static size_t
4396dp_reader_size (void *lsqr_ctx)
4397{
4398    struct lsquic_stream *const stream = lsqr_ctx;
4399
4400    return stream->sm_dup_push_len - stream->sm_dup_push_off;
4401}
4402
4403
4404static void
4405init_dp_reader (struct lsquic_stream *stream, struct lsquic_reader *reader)
4406{
4407    reader->lsqr_read = dp_reader_read;
4408    reader->lsqr_size = dp_reader_size;
4409    reader->lsqr_ctx = stream;
4410}
4411
4412
4413static void
4414on_write_dp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
4415{
4416    struct lsquic_reader dp_reader;
4417    ssize_t nw;
4418    int want_write;
4419
4420    assert(stream->sm_dup_push_off < stream->sm_dup_push_len);
4421
4422    init_dp_reader(stream, &dp_reader);
4423    nw = stream_write(stream, &dp_reader);
4424    if (nw > 0)
4425    {
4426        LSQ_DEBUG("wrote %zd bytes more of duplicate push (%s)",
4427            nw, stream->sm_dup_push_off == stream->sm_dup_push_len ?
4428            "done" : "not done");
4429        if (stream->sm_dup_push_off == stream->sm_dup_push_len)
4430        {
4431            /* Restore want_write flag */
4432            want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
4433            if (want_write != stream->sm_saved_want_write)
4434                (void) lsquic_stream_wantwrite(stream,
4435                                                stream->sm_saved_want_write);
4436        }
4437    }
4438    else if (nw < 0)
4439    {
4440        LSQ_WARN("could not write duplicate push (wrapper)");
4441        /* XXX What should happen if we hit an error? TODO */
4442    }
4443}
4444
4445
4446int
4447lsquic_stream_duplicate_push (struct lsquic_stream *stream, uint64_t push_id)
4448{
4449    struct lsquic_reader dp_reader;
4450    unsigned bits, len;
4451    ssize_t nw;
4452
4453    assert(stream->sm_bflags & SMBF_IETF);
4454    assert(lsquic_stream_can_push(stream));
4455
4456    bits = vint_val2bits(push_id);
4457    len = 1 << bits;
4458
4459    if (!stream_activate_hq_frame(stream,
4460            stream->sm_payload + stream->sm_n_buffered, HQFT_DUPLICATE_PUSH,
4461            SHF_FIXED_SIZE, len))
4462        return -1;
4463
4464    stream->stream_flags |= STREAM_PUSHING;
4465
4466    stream->sm_dup_push_len = len;
4467    stream->sm_dup_push_off = 0;
4468    vint_write(stream->sm_dup_push_buf, push_id, bits, 1 << bits);
4469
4470    init_dp_reader(stream, &dp_reader);
4471    nw = stream_write(stream, &dp_reader);
4472    if (nw > 0)
4473    {
4474        if (stream->sm_dup_push_off == stream->sm_dup_push_len)
4475            LSQ_DEBUG("fully wrote DUPLICATE_PUSH %"PRIu64, push_id);
4476        else
4477        {
4478            LSQ_DEBUG("partially wrote DUPLICATE_PUSH %"PRIu64, push_id);
4479            stream->stream_flags |= STREAM_NOPUSH;
4480            stream->sm_saved_want_write =
4481                                    !!(stream->sm_qflags & SMQF_WANT_WRITE);
4482            stream_wantwrite(stream, 1);
4483        }
4484        return 0;
4485    }
4486    else
4487    {
4488        if (nw < 0)
4489            LSQ_WARN("failure writing DUPLICATE_PUSH");
4490        stream->stream_flags |= STREAM_NOPUSH;
4491        stream->stream_flags &= ~STREAM_PUSHING;
4492        return -1;
4493    }
4494}
4495
4496
4497static size_t
4498pp_reader_read (void *lsqr_ctx, void *buf, size_t count)
4499{
4500    struct push_promise *const promise = lsqr_ctx;
4501    unsigned char *dst = buf;
4502    unsigned char *const end = buf + count;
4503    size_t len;
4504
4505    while (dst < end)
4506    {
4507        switch (promise->pp_write_state)
4508        {
4509        case PPWS_ID0:
4510        case PPWS_ID1:
4511        case PPWS_ID2:
4512        case PPWS_ID3:
4513        case PPWS_ID4:
4514        case PPWS_ID5:
4515        case PPWS_ID6:
4516        case PPWS_ID7:
4517            *dst++ = promise->pp_encoded_push_id[promise->pp_write_state];
4518            ++promise->pp_write_state;
4519            break;
4520        case PPWS_PFX0:
4521            *dst++ = 0;
4522            ++promise->pp_write_state;
4523            break;
4524        case PPWS_PFX1:
4525            *dst++ = 0;
4526            ++promise->pp_write_state;
4527            break;
4528        case PPWS_HBLOCK:
4529            len = MIN(promise->pp_content_len - promise->pp_write_off,
4530                        (size_t) (end - dst));
4531            memcpy(dst, promise->pp_content_buf + promise->pp_write_off,
4532                                                                        len);
4533            promise->pp_write_off += len;
4534            dst += len;
4535            if (promise->pp_content_len == promise->pp_write_off)
4536            {
4537                LSQ_LOG1(LSQ_LOG_DEBUG, "finish writing push promise %"PRIu64
4538                    ": reset push state", promise->pp_id);
4539                promise->pp_write_state = PPWS_DONE;
4540            }
4541            goto end;
4542        default:
4543            goto end;
4544        }
4545    }
4546
4547  end:
4548    return dst - (unsigned char *) buf;
4549}
4550
4551
4552static size_t
4553pp_reader_size (void *lsqr_ctx)
4554{
4555    struct push_promise *const promise = lsqr_ctx;
4556    size_t size;
4557
4558    size = 0;
4559    switch (promise->pp_write_state)
4560    {
4561    case PPWS_ID0:
4562    case PPWS_ID1:
4563    case PPWS_ID2:
4564    case PPWS_ID3:
4565    case PPWS_ID4:
4566    case PPWS_ID5:
4567    case PPWS_ID6:
4568    case PPWS_ID7:
4569        size += 8 - promise->pp_write_state;
4570    case PPWS_PFX0:
4571        ++size;
4572        /* fall-through */
4573    case PPWS_PFX1:
4574        ++size;
4575        /* fall-through */
4576    case PPWS_HBLOCK:
4577        size += promise->pp_content_len - promise->pp_write_off;
4578        break;
4579    default:
4580        break;
4581    }
4582
4583    return size;
4584}
4585
4586
4587static void
4588init_pp_reader (struct push_promise *promise, struct lsquic_reader *reader)
4589{
4590    reader->lsqr_read = pp_reader_read;
4591    reader->lsqr_size = pp_reader_size;
4592    reader->lsqr_ctx = promise;
4593}
4594
4595
4596static void
4597on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
4598{
4599    struct lsquic_reader pp_reader;
4600    struct push_promise *promise;
4601    ssize_t nw;
4602    int want_write;
4603
4604    assert(stream_is_pushing_promise(stream));
4605
4606    promise = SLIST_FIRST(&stream->sm_promises);
4607    init_pp_reader(promise, &pp_reader);
4608    nw = stream_write(stream, &pp_reader);
4609    if (nw > 0)
4610    {
4611        LSQ_DEBUG("wrote %zd bytes more of push promise (%s)",
4612            nw, promise->pp_write_state == PPWS_DONE ? "done" : "not done");
4613        if (promise->pp_write_state == PPWS_DONE)
4614        {
4615            /* Restore want_write flag */
4616            want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
4617            if (want_write != stream->sm_saved_want_write)
4618                (void) lsquic_stream_wantwrite(stream,
4619                                                stream->sm_saved_want_write);
4620        }
4621    }
4622    else if (nw < 0)
4623    {
4624        LSQ_WARN("could not write push promise (wrapper)");
4625        /* XXX What should happen if we hit an error? TODO */
4626    }
4627}
4628
4629
4630/* Success means that the push promise has been placed on sm_promises list and
4631 * the stream now owns it.  Failure means that the push promise should be
4632 * destroyed by the caller.
4633 *
4634 * A push promise is written immediately.  If it cannot be written to packets
4635 * or buffered whole, the stream is marked as unable to push further promises.
4636 */
4637int
4638lsquic_stream_push_promise (struct lsquic_stream *stream,
4639                                                struct push_promise *promise)
4640{
4641    struct lsquic_reader pp_reader;
4642    unsigned bits, len;
4643    ssize_t nw;
4644
4645    assert(stream->sm_bflags & SMBF_IETF);
4646    assert(lsquic_stream_can_push(stream));
4647
4648    bits = vint_val2bits(promise->pp_id);
4649    len = 1 << bits;
4650    promise->pp_write_state = 8 - len;
4651    vint_write(promise->pp_encoded_push_id + 8 - len, promise->pp_id,
4652                                                            bits, 1 << bits);
4653
4654    if (!stream_activate_hq_frame(stream,
4655                stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PROMISE,
4656                SHF_FIXED_SIZE, pp_reader_size(promise)))
4657        return -1;
4658
4659    stream->stream_flags |= STREAM_PUSHING;
4660
4661    init_pp_reader(promise, &pp_reader);
4662    nw = stream_write(stream, &pp_reader);
4663    if (nw > 0)
4664    {
4665        SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next);
4666        ++promise->pp_refcnt;
4667        if (promise->pp_write_state == PPWS_DONE)
4668            LSQ_DEBUG("fully wrote promise %"PRIu64, promise->pp_id);
4669        else
4670        {
4671            LSQ_DEBUG("partially wrote promise %"PRIu64" (state: %d, off: %u)"
4672                ", disable further pushing", promise->pp_id,
4673                promise->pp_write_state, promise->pp_write_off);
4674            stream->stream_flags |= STREAM_NOPUSH;
4675            stream->sm_saved_want_write =
4676                                    !!(stream->sm_qflags & SMQF_WANT_WRITE);
4677            stream_wantwrite(stream, 1);
4678        }
4679        return 0;
4680    }
4681    else
4682    {
4683        if (nw < 0)
4684            LSQ_WARN("failure writing push promise");
4685        stream->stream_flags |= STREAM_NOPUSH;
4686        stream->stream_flags &= ~STREAM_PUSHING;
4687        return -1;
4688    }
4689}
4690