lsquic_stream.c revision a6cdaedb
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "lsquic.h"
30
31#include "lsquic_int_types.h"
32#include "lsquic_packet_common.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_malo.h"
35#include "lsquic_conn_flow.h"
36#include "lsquic_rtt.h"
37#include "lsquic_sfcw.h"
38#include "lsquic_varint.h"
39#include "lsquic_hq.h"
40#include "lsquic_hash.h"
41#include "lsquic_stream.h"
42#include "lsquic_conn_public.h"
43#include "lsquic_util.h"
44#include "lsquic_mm.h"
45#include "lsquic_headers_stream.h"
46#include "lsquic_conn.h"
47#include "lsquic_data_in_if.h"
48#include "lsquic_parse.h"
49#include "lsquic_packet_out.h"
50#include "lsquic_engine_public.h"
51#include "lsquic_senhist.h"
52#include "lsquic_pacer.h"
53#include "lsquic_cubic.h"
54#include "lsquic_bw_sampler.h"
55#include "lsquic_minmax.h"
56#include "lsquic_bbr.h"
57#include "lsquic_send_ctl.h"
58#include "lsquic_headers.h"
59#include "lsquic_ev_log.h"
60#include "lsquic_enc_sess.h"
61#include "lsqpack.h"
62#include "lsquic_frab_list.h"
63#include "lsquic_http1x_if.h"
64#include "lsquic_qdec_hdl.h"
65#include "lsquic_qenc_hdl.h"
66#include "lsquic_byteswap.h"
67#include "lsquic_h3_prio.h"
68#include "lsquic_ietf.h"
69#include "lsquic_push_promise.h"
70
71#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
72#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(stream->conn_pub->lconn)
73#define LSQUIC_LOG_STREAM_ID stream->id
74#include "lsquic_logger.h"
75
/* Smaller of two values.  This is a function-like macro, so each argument
 * may be evaluated twice: do not pass expressions with side effects.
 */
#define MIN(a, b) (((b) > (a)) ? (a) : (b))
77
78static void
79drop_frames_in (lsquic_stream_t *stream);
80
81static void
82maybe_schedule_call_on_close (lsquic_stream_t *stream);
83
84static int
85stream_wantread (lsquic_stream_t *stream, int is_want);
86
87static int
88stream_wantwrite (lsquic_stream_t *stream, int is_want);
89
90static ssize_t
91stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);
92
93static ssize_t
94save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);
95
96static int
97stream_flush (lsquic_stream_t *stream);
98
99static int
100stream_flush_nocheck (lsquic_stream_t *stream);
101
102static void
103maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag);
104
105enum swtp_status { SWTP_OK, SWTP_STOP, SWTP_ERROR };
106
107static enum swtp_status
108stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size);
109
110static enum swtp_status
111stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size);
112
113static enum swtp_status
114stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size);
115
116static size_t
117stream_write_avail (struct lsquic_stream *);
118
119static size_t
120stream_write_avail_with_headers (struct lsquic_stream *);
121
122static int
123hq_filter_readable (struct lsquic_stream *stream);
124
125static void
126hq_decr_left (struct lsquic_stream *stream, size_t);
127
128static size_t
129hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame);
130
131static int
132stream_readable_non_http (struct lsquic_stream *stream);
133
134static int
135stream_readable_http_gquic (struct lsquic_stream *stream);
136
137static int
138stream_readable_http_ietf (struct lsquic_stream *stream);
139
140static ssize_t
141stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz);
142
143static size_t
144active_hq_frame_sizes (const struct lsquic_stream *);
145
146static void
147on_write_dp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *);
148
149static void
150on_write_pp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *);
151
152static void
153stream_hq_frame_put (struct lsquic_stream *, struct stream_hq_frame *);
154
155const struct stream_filter_if hq_stream_filter_if =
156{
157    .sfi_readable   = hq_filter_readable,
158    .sfi_filter_df  = hq_filter_df,
159    .sfi_decr_left  = hq_decr_left,
160};
161
162
163#if LSQUIC_KEEP_STREAM_HISTORY
/* Events recorded in the per-stream history buffer (see
 * sm_history_append()).
 *
 * These values are printable ASCII characters for ease of printing the
 * whole history in a single line of a log message.
 *
 * The list of events is not exhaustive: only the most interesting events
 * are recorded.
 */
enum stream_history_event
{
    SHE_EMPTY              =  '\0',     /* Special entry.  No init besides memset required */
    SHE_PLUS               =  '+',      /* Special entry: previous event occurred more than once */
    SHE_REACH_FIN          =  'a',
    SHE_BLOCKED_OUT        =  'b',
    SHE_CREATED            =  'C',
    SHE_FRAME_IN           =  'd',
    SHE_FRAME_OUT          =  'D',
    SHE_RESET              =  'e',
    SHE_WINDOW_UPDATE      =  'E',
    SHE_FIN_IN             =  'f',
    SHE_FINISHED           =  'F',
    SHE_GOAWAY_IN          =  'g',
    SHE_USER_WRITE_HEADER  =  'h',
    SHE_HEADERS_IN         =  'H',
    SHE_IF_SWITCH          =  'i',
    SHE_ONCLOSE_SCHED      =  'l',
    SHE_ONCLOSE_CALL       =  'L',
    SHE_ONNEW              =  'N',
    SHE_SET_PRIO           =  'p',
    SHE_USER_READ          =  'r',
    SHE_SHUTDOWN_READ      =  'R',
    SHE_RST_IN             =  's',
    SHE_SS_IN              =  'S',
    SHE_RST_OUT            =  't',
    SHE_RST_ACKED          =  'T',
    SHE_FLUSH              =  'u',
    SHE_USER_WRITE_DATA    =  'w',
    SHE_SHUTDOWN_WRITE     =  'W',
    SHE_CLOSE              =  'X',
    SHE_DELAY_SW           =  'y',
    SHE_FORCE_FINISH       =  'Z',
};
204
205static void
206sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
207{
208    enum stream_history_event prev_event;
209    sm_hist_idx_t idx;
210    int plus;
211
212    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
213    plus = SHE_PLUS == stream->sm_hist_buf[idx];
214    idx = (idx - plus) & SM_HIST_IDX_MASK;
215    prev_event = stream->sm_hist_buf[idx];
216
217    if (prev_event == sh_event && plus)
218        return;
219
220    if (prev_event == sh_event)
221        sh_event = SHE_PLUS;
222    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
223
224    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
225        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
226                                                        stream->sm_hist_buf);
227}
228
229
/* Record `event' in the history buffer of `stream'. */
#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
/* Log the part of the history buffer written since the write index last
 * wrapped around (sm_history_append() logs the full buffer on each wrap).
 * Nothing is logged if the index sits exactly on a wrap boundary.
 *
 * The macro argument is parenthesized at every use so that any pointer
 * expression can be passed, not only a plain identifier.
 */
#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
        if ((stream)->sm_hist_idx & SM_HIST_IDX_MASK)                       \
            LSQ_DEBUG("history: [%.*s]",                                    \
                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
                (stream)->sm_hist_buf);                                     \
    } while (0)
237#else
238#   define SM_HISTORY_APPEND(stream, event)
239#   define SM_HISTORY_DUMP_REMAINING(stream)
240#endif
241
242
243static int
244stream_inside_callback (const lsquic_stream_t *stream)
245{
246    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
247}
248
249
250static void
251maybe_conn_to_tickable (lsquic_stream_t *stream)
252{
253    if (!stream_inside_callback(stream))
254        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
255                                           stream->conn_pub->lconn);
256}
257
258
259/* Here, "readable" means that the user is able to read from the stream. */
260static void
261maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
262{
263    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
264    {
265        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
266                                           stream->conn_pub->lconn);
267    }
268}
269
270
271/* Here, "writeable" means that data can be put into packets to be
272 * scheduled to be sent out.
273 *
274 * If `check_can_send' is false, it means that we do not need to check
275 * whether packets can be sent.  This check was already performed when
276 * we packetized stream data.
277 */
278static void
279maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
280                                                    int check_can_send)
281{
282    if (!stream_inside_callback(stream) &&
283            (!check_can_send
284             || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
285          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
286    {
287        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
288                                           stream->conn_pub->lconn);
289    }
290}
291
292
293static int
294stream_stalled (const lsquic_stream_t *stream)
295{
296    return 0 == (stream->sm_qflags & (SMQF_WANT_WRITE|SMQF_WANT_READ)) &&
297           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
298                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
299}
300
301
302static size_t
303stream_stream_frame_header_sz (const struct lsquic_stream *stream,
304                                                            unsigned data_sz)
305{
306    return stream->conn_pub->lconn->cn_pf->pf_calc_stream_frame_header_sz(
307                                    stream->id, stream->tosend_off, data_sz);
308}
309
310
311static size_t
312stream_crypto_frame_header_sz (const struct lsquic_stream *stream,
313                                                    unsigned data_sz_IGNORED)
314{
315    return stream->conn_pub->lconn->cn_pf
316         ->pf_calc_crypto_frame_header_sz(stream->tosend_off);
317}
318
319
320/* GQUIC-only function */
321static int
322stream_is_hsk (const struct lsquic_stream *stream)
323{
324    if (stream->sm_bflags & SMBF_IETF)
325        return 0;
326    else
327        return stream->id == LSQUIC_GQUIC_STREAM_HANDSHAKE;
328}
329
330
331static struct lsquic_stream *
332stream_new_common (lsquic_stream_id_t id, struct lsquic_conn_public *conn_pub,
333           const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
334           enum stream_ctor_flags ctor_flags)
335{
336    struct lsquic_stream *stream;
337
338    stream = calloc(1, sizeof(*stream));
339    if (!stream)
340        return NULL;
341
342    if (ctor_flags & SCF_USE_DI_HASH)
343        stream->data_in = data_in_hash_new(conn_pub, id, 0);
344    else
345        stream->data_in = data_in_nocopy_new(conn_pub, id);
346    if (!stream->data_in)
347    {
348        free(stream);
349        return NULL;
350    }
351
352    stream->id        = id;
353    stream->stream_if = stream_if;
354    stream->conn_pub  = conn_pub;
355    stream->sm_onnew_arg = stream_if_ctx;
356    stream->sm_write_avail = stream_write_avail;
357
358    if ((ctor_flags & (SCF_IETF|SCF_CRITICAL)) == SCF_IETF)
359    {
360        if (0 == lsquic_prio_tree_add_stream(conn_pub->u.ietf.prio_tree,
361                                                    stream, H3ET_ROOT, 0, 0))
362            stream->sm_qflags |= SMQF_H3_PRIO;
363        else
364        {
365            stream->data_in->di_if->di_destroy(stream->data_in);
366            free(stream);
367            return NULL;
368        }
369    }
370
371    STAILQ_INIT(&stream->sm_hq_frames);
372
373    stream->sm_bflags |= ctor_flags & ((1 << (N_SMBF_FLAGS - 1)) - 1);
374    if (conn_pub->lconn->cn_flags & LSCONN_SERVER)
375        stream->sm_bflags |= SMBF_SERVER;
376
377    return stream;
378}
379
380
381/* TODO: The logic to figure out whether the stream is connection limited
382 * should be taken out of the constructor.  The caller should specify this
383 * via one of enum stream_ctor_flags.
384 */
385lsquic_stream_t *
386lsquic_stream_new (lsquic_stream_id_t id,
387        struct lsquic_conn_public *conn_pub,
388        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
389        unsigned initial_window, uint64_t initial_send_off,
390        enum stream_ctor_flags ctor_flags)
391{
392    lsquic_cfcw_t *cfcw;
393    lsquic_stream_t *stream;
394
395    stream = stream_new_common(id, conn_pub, stream_if, stream_if_ctx,
396                                                                ctor_flags);
397    if (!stream)
398        return NULL;
399
400    if (!initial_window)
401        initial_window = 16 * 1024;
402
403    if (ctor_flags & SCF_IETF)
404    {
405        cfcw = &conn_pub->cfcw;
406        stream->sm_bflags |= SMBF_CONN_LIMITED;
407        if (ctor_flags & SCF_HTTP)
408        {
409            stream->sm_write_avail = stream_write_avail_with_headers;
410            stream->sm_readable = stream_readable_http_ietf;
411            stream->sm_sfi = &hq_stream_filter_if;
412        }
413        else
414            stream->sm_readable = stream_readable_non_http;
415        lsquic_stream_set_priority_internal(stream,
416                                            LSQUIC_STREAM_DEFAULT_PRIO);
417        stream->sm_write_to_packet = stream_write_to_packet_std;
418    }
419    else
420    {
421        if (lsquic_stream_id_is_critical(ctor_flags & SCF_HTTP, id))
422            cfcw = NULL;
423        else
424        {
425            cfcw = &conn_pub->cfcw;
426            stream->sm_bflags |= SMBF_CONN_LIMITED;
427            lsquic_stream_set_priority_internal(stream,
428                                                LSQUIC_STREAM_DEFAULT_PRIO);
429        }
430        if (stream->sm_bflags & SMBF_USE_HEADERS)
431            stream->sm_readable = stream_readable_http_gquic;
432        else
433            stream->sm_readable = stream_readable_non_http;
434        if (stream_is_hsk(stream))
435            stream->sm_write_to_packet = stream_write_to_packet_hsk;
436        else
437            stream->sm_write_to_packet = stream_write_to_packet_std;
438    }
439
440    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
441    stream->max_send_off = initial_send_off;
442    LSQ_DEBUG("created stream");
443    SM_HISTORY_APPEND(stream, SHE_CREATED);
444    stream->sm_frame_header_sz = stream_stream_frame_header_sz;
445    if (ctor_flags & SCF_CALL_ON_NEW)
446        lsquic_stream_call_on_new(stream);
447    return stream;
448}
449
450
451struct lsquic_stream *
452lsquic_stream_new_crypto (enum enc_level enc_level,
453        struct lsquic_conn_public *conn_pub,
454        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
455        enum stream_ctor_flags ctor_flags)
456{
457    struct lsquic_stream *stream;
458    lsquic_stream_id_t stream_id;
459
460    assert(ctor_flags & SCF_CRITICAL);
461
462    stream_id = ~0ULL - enc_level;
463    stream = stream_new_common(stream_id, conn_pub, stream_if,
464                                                stream_if_ctx, ctor_flags);
465    if (!stream)
466        return NULL;
467
468    stream->sm_bflags |= SMBF_CRYPTO|SMBF_IETF;
469    stream->sm_enc_level = enc_level;
470    /* TODO: why have limit in crypto stream?  Set it to UINT64_MAX? */
471    lsquic_sfcw_init(&stream->fc, 16 * 1024, NULL, conn_pub, stream_id);
472    stream->max_send_off = 16 * 1024;
473    LSQ_DEBUG("created crypto stream");
474    SM_HISTORY_APPEND(stream, SHE_CREATED);
475    stream->sm_frame_header_sz = stream_crypto_frame_header_sz;
476    stream->sm_write_to_packet = stream_write_to_packet_crypto;
477    stream->sm_readable = stream_readable_non_http;
478    if (ctor_flags & SCF_CALL_ON_NEW)
479        lsquic_stream_call_on_new(stream);
480    return stream;
481}
482
483
484void
485lsquic_stream_call_on_new (lsquic_stream_t *stream)
486{
487    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
488    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
489    {
490        LSQ_DEBUG("calling on_new_stream");
491        SM_HISTORY_APPEND(stream, SHE_ONNEW);
492        stream->stream_flags |= STREAM_ONNEW_DONE;
493        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
494                                                          stream);
495    }
496}
497
498
499static void
500decr_conn_cap (struct lsquic_stream *stream, size_t incr)
501{
502    if (stream->sm_bflags & SMBF_CONN_LIMITED)
503    {
504        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
505        stream->conn_pub->conn_cap.cc_sent -= incr;
506    }
507}
508
509
510static void
511maybe_resize_stream_buffer (struct lsquic_stream *stream)
512{
513    assert(0 == stream->sm_n_buffered);
514
515    if (stream->sm_n_allocated < stream->conn_pub->path->np_pack_size)
516    {
517        free(stream->sm_buf);
518        stream->sm_buf = NULL;
519        stream->sm_n_allocated = 0;
520    }
521    else if (stream->sm_n_allocated > stream->conn_pub->path->np_pack_size)
522        stream->sm_n_allocated = stream->conn_pub->path->np_pack_size;
523}
524
525
526static void
527drop_buffered_data (struct lsquic_stream *stream)
528{
529    decr_conn_cap(stream, stream->sm_n_buffered);
530    stream->sm_n_buffered = 0;
531    maybe_resize_stream_buffer(stream);
532    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
533        maybe_remove_from_write_q(stream, SMQF_WRITE_Q_FLAGS);
534}
535
536
537static void
538destroy_uh (struct lsquic_stream *stream)
539{
540    if (stream->uh)
541    {
542        if (stream->uh->uh_hset)
543            stream->conn_pub->enpub->enp_hsi_if
544                            ->hsi_discard_header_set(stream->uh->uh_hset);
545        free(stream->uh);
546        stream->uh = NULL;
547    }
548}
549
550
551void
552lsquic_stream_destroy (lsquic_stream_t *stream)
553{
554    struct push_promise *promise;
555    struct stream_hq_frame *shf;
556
557    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
558    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
559                                                            STREAM_ONNEW_DONE)
560    {
561        stream->stream_flags |= STREAM_ONCLOSE_DONE;
562        stream->stream_if->on_close(stream, stream->st_ctx);
563    }
564    if (stream->sm_qflags & SMQF_SENDING_FLAGS)
565        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
566    if (stream->sm_qflags & SMQF_WANT_READ)
567        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
568    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
569        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
570    if (stream->sm_qflags & SMQF_SERVICE_FLAGS)
571        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
572    if (stream->sm_qflags & SMQF_QPACK_DEC)
573        lsquic_qdh_unref_stream(stream->conn_pub->u.ietf.qdh, stream);
574    if (stream->sm_qflags & SMQF_H3_PRIO)
575        lsquic_prio_tree_remove_stream(stream->conn_pub->u.ietf.prio_tree,
576            stream, lsquic_time_now() /* XXX: do we have to call this? */);
577    drop_buffered_data(stream);
578    lsquic_sfcw_consume_rem(&stream->fc);
579    drop_frames_in(stream);
580    if (stream->push_req)
581    {
582        if (stream->push_req->uh_hset)
583            stream->conn_pub->enpub->enp_hsi_if
584                            ->hsi_discard_header_set(stream->push_req->uh_hset);
585        free(stream->push_req);
586    }
587    while ((promise = SLIST_FIRST(&stream->sm_promises)))
588    {
589        SLIST_REMOVE_HEAD(&stream->sm_promises, pp_next);
590        lsquic_pp_put(promise, stream->conn_pub->u.ietf.promises);
591    }
592    if (stream->sm_promise)
593    {
594        assert(stream->sm_promise->pp_pushed_stream == stream);
595        stream->sm_promise->pp_pushed_stream = NULL;
596        lsquic_pp_put(stream->sm_promise, stream->conn_pub->u.ietf.promises);
597    }
598    while ((shf = STAILQ_FIRST(&stream->sm_hq_frames)))
599        stream_hq_frame_put(stream, shf);
600    destroy_uh(stream);
601    free(stream->sm_buf);
602    free(stream->sm_header_block);
603    LSQ_DEBUG("destroyed stream");
604    SM_HISTORY_DUMP_REMAINING(stream);
605    free(stream);
606}
607
608
609static int
610stream_is_finished (const lsquic_stream_t *stream)
611{
612    return lsquic_stream_is_closed(stream)
613           /* n_unacked checks that no outgoing packets that reference this
614            * stream are outstanding:
615            */
616        && 0 == stream->n_unacked
617           /* This checks that no packets that reference this stream will
618            * become outstanding:
619            */
620        && 0 == (stream->sm_qflags & SMQF_SEND_RST)
621        && ((stream->stream_flags & STREAM_FORCE_FINISH)
622          || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)));
623}
624
625
626static void
627maybe_finish_stream (lsquic_stream_t *stream)
628{
629    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
630                                                    stream_is_finished(stream))
631    {
632        LSQ_DEBUG("stream is now finished");
633        SM_HISTORY_APPEND(stream, SHE_FINISHED);
634        if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
635            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
636                                                    next_service_stream);
637        stream->sm_qflags |= SMQF_FREE_STREAM;
638        stream->stream_flags |= STREAM_FINISHED;
639    }
640}
641
642
643static void
644maybe_schedule_call_on_close (lsquic_stream_t *stream)
645{
646    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
647                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE))
648            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)
649            && !(stream->sm_qflags & SMQF_CALL_ONCLOSE))
650    {
651        if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
652            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
653                                                    next_service_stream);
654        stream->sm_qflags |= SMQF_CALL_ONCLOSE;
655        LSQ_DEBUG("scheduled calling on_close");
656        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
657    }
658}
659
660
661void
662lsquic_stream_call_on_close (lsquic_stream_t *stream)
663{
664    assert(stream->stream_flags & STREAM_ONNEW_DONE);
665    stream->sm_qflags &= ~SMQF_CALL_ONCLOSE;
666    if (!(stream->sm_qflags & SMQF_SERVICE_FLAGS))
667        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
668                                                    next_service_stream);
669    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
670    {
671        LSQ_DEBUG("calling on_close");
672        stream->stream_flags |= STREAM_ONCLOSE_DONE;
673        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
674        stream->stream_if->on_close(stream, stream->st_ctx);
675    }
676    else
677        assert(0);
678}
679
680
681static int
682stream_readable_non_http (struct lsquic_stream *stream)
683{
684    return stream->data_in->di_if->di_get_frame(stream->data_in,
685                                                stream->read_offset) != NULL;
686}
687
688
689static int
690stream_readable_http_gquic (struct lsquic_stream *stream)
691{
692    return (stream->stream_flags & STREAM_HAVE_UH)
693        && (stream->uh
694            ||  stream->data_in->di_if->di_get_frame(stream->data_in,
695                                                    stream->read_offset));
696}
697
698
699static int
700stream_readable_http_ietf (struct lsquic_stream *stream)
701{
702    return
703        /* If we have read the header set and the header set has not yet
704         * been read, the stream is readable.
705         */
706        ((stream->stream_flags & STREAM_HAVE_UH) && stream->uh)
707        ||
708        /* Alternatively, run the filter and check for payload availability. */
709        (stream->sm_sfi->sfi_readable(stream)
710            && (/* Running the filter may result in hitting FIN: */
711                (stream->stream_flags & STREAM_FIN_REACHED)
712                || stream->data_in->di_if->di_get_frame(stream->data_in,
713                                                    stream->read_offset)));
714}
715
716
717int
718lsquic_stream_readable (struct lsquic_stream *stream)
719{
720    /* A stream is readable if one of the following is true: */
721    return
722        /* - It is already finished: in that case, lsquic_stream_read() will
723         *   return 0.
724         */
725            (stream->stream_flags & STREAM_FIN_REACHED)
726        /* - The stream is reset, by either side.  In this case,
727         *   lsquic_stream_read() will return -1 (we want the user to be
728         *   able to collect the error).
729         */
730        ||  lsquic_stream_is_reset(stream)
731        /* Type-dependent readability check: */
732        ||  stream->sm_readable(stream);
733    ;
734}
735
736
737static size_t
738stream_write_avail (struct lsquic_stream *stream)
739{
740    uint64_t stream_avail, conn_avail;
741    size_t hq_frames_sz;
742
743    stream_avail = stream->max_send_off - stream->tosend_off
744                                                - stream->sm_n_buffered;
745
746    if (stream->sm_bflags & SMBF_CONN_LIMITED)
747    {
748        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
749        if (conn_avail < stream_avail)
750            stream_avail = conn_avail;
751    }
752
753    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
754                                        == (SMBF_IETF|SMBF_USE_HEADERS))
755    {
756        hq_frames_sz = active_hq_frame_sizes(stream);
757        if (hq_frames_sz == 0)
758            hq_frames_sz = 3;   /* Smallest new frame */
759
760        if (stream_avail > hq_frames_sz)
761            stream_avail -= hq_frames_sz;
762        else
763            stream_avail = 0;
764    }
765
766    return stream_avail;
767}
768
769
770static int
771stream_is_pushing_promise (const struct lsquic_stream *stream)
772{
773    return (stream->stream_flags & STREAM_PUSHING)
774        && SLIST_FIRST(&stream->sm_promises)
775        && (SLIST_FIRST(&stream->sm_promises))->pp_write_state != PPWS_DONE
776        ;
777}
778
779
780/* To prevent deadlocks, ensure that when headers are sent, the bytes
781 * sent on the encoder stream are written first.
782 *
783 * XXX If the encoder is set up in non-risking mode, it is perfectly
784 * fine to send the header block first.  TODO: update the logic to
785 * reflect this.  There should be two sending behaviors: risk and non-risk.
786 * For now, we assume risk for everything to be on the conservative side.
787 */
788static size_t
789stream_write_avail_with_headers (struct lsquic_stream *stream)
790{
791    if (stream->stream_flags & STREAM_PUSHING)
792        return stream_write_avail(stream);
793
794    switch (stream->sm_send_headers_state)
795    {
796    case SSHS_BEGIN:
797        return lsquic_qeh_write_avail(stream->conn_pub->u.ietf.qeh);
798    case SSHS_ENC_SENDING:
799        if (stream->sm_hb_compl >
800                            lsquic_qeh_enc_off(stream->conn_pub->u.ietf.qeh))
801            return 0;
802        LSQ_DEBUG("encoder stream bytes have all been sent");
803        stream->sm_send_headers_state = SSHS_HBLOCK_SENDING;
804        /* fall-through */
805    default:
806        assert(SSHS_HBLOCK_SENDING == stream->sm_send_headers_state);
807        return stream_write_avail(stream);
808    }
809}
810
811
812size_t
813lsquic_stream_write_avail (struct lsquic_stream *stream)
814{
815    return stream->sm_write_avail(stream);
816}
817
818
819int
820lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
821{
822    struct lsquic_conn *lconn;
823
824    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
825                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
826    {
827        if (stream->sm_bflags & SMBF_IETF)
828        {
829            lconn = stream->conn_pub->lconn;
830            lconn->cn_if->ci_abort_error(lconn, 0, TEC_FLOW_CONTROL_ERROR,
831                "flow control violation on stream %"PRIu64, stream->id);
832        }
833        return -1;
834    }
835    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
836    {
837        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
838            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
839                                                    next_send_stream);
840        stream->sm_qflags |= SMQF_SEND_WUF;
841    }
842    return 0;
843}
844
845
/* Process an incoming stream frame.
 *
 * The frame is handed to the stream's data-in object.  On successful
 * insertion, the flow controller is updated, FIN is recorded if the frame
 * carries it, the data-in implementation is switched if requested, and the
 * connection may be made tickable.
 *
 * Returns 0 if the frame was inserted or was a duplicate, -1 on error
 * (frame rejected, flow control violation, or failure to switch the
 * data-in implementation).
 */
int
lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
{
    uint64_t max_off;
    int got_next_offset, rv, free_frame;
    enum ins_frame ins_frame;

    assert(frame->packet_in);

    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
    LSQ_DEBUG("received stream frame, offset 0x%"PRIX64", len %u; "
        "fin: %d", frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);

    /* No stream data is accepted on a headers-using stream that has
     * STREAM_HEAD_IN_FIN set: release the frame and its packet and fail.
     */
    if ((stream->sm_bflags & SMBF_USE_HEADERS)
                            && (stream->stream_flags & STREAM_HEAD_IN_FIN))
    {
        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
        lsquic_malo_put(frame);
        return -1;
    }

    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
    /* We come back here after switching the data-in implementation because
     * of an overlap (see INS_FRAME_OVERLAP below).
     */
  insert_frame:
    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
    if (INS_FRAME_OK == ins_frame)
    {
        /* Update maximum offset in the flow controller and check for flow
         * control violation:
         */
        rv = -1;
        /* Whether we must free the frame depends on whether the data-in
         * implementation takes ownership of it on success.  This is read
         * before data_in may be replaced below.
         */
        free_frame = !stream->data_in->di_if->di_own_on_ok;
        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
        if (0 != lsquic_stream_update_sfcw(stream, max_off))
            goto end_ok;
        if (frame->data_frame.df_fin)
        {
            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
            stream->stream_flags |= STREAM_FIN_RECVD;
            stream->sm_fin_off = DF_END(frame);
            maybe_finish_stream(stream);
        }
        if ((stream->sm_bflags & SMBF_AUTOSWITCH) &&
                (stream->data_in->di_flags & DI_SWITCH_IMPL))
        {
            stream->data_in = stream->data_in->di_if->di_switch_impl(
                                        stream->data_in, stream->read_offset);
            if (!stream->data_in)
            {
                /* Switch failed: install the error data-in and report
                 * failure (rv is still -1).
                 */
                stream->data_in = data_in_error_new();
                goto end_ok;
            }
        }
        if (got_next_offset)
            /* Checking the offset saves di_get_frame() call */
            maybe_conn_to_tickable_if_readable(stream);
        rv = 0;
  end_ok:
        if (free_frame)
            lsquic_malo_put(frame);
        return rv;
    }
    else if (INS_FRAME_DUP == ins_frame)
    {
        /* NOTE(review): the frame is not released here -- presumably the
         * data-in implementation disposes of duplicates itself; confirm.
         */
        return 0;
    }
    else if (INS_FRAME_OVERLAP == ins_frame)
    {
        LSQ_DEBUG("overlap: switching DATA IN implementation");
        stream->data_in = stream->data_in->di_if->di_switch_impl(
                                    stream->data_in, stream->read_offset);
        if (stream->data_in)
            goto insert_frame;
        /* Switch failed: install the error data-in, release the frame and
         * its packet, and fail.
         */
        stream->data_in = data_in_error_new();
        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
        lsquic_malo_put(frame);
        return -1;
    }
    else
    {
        /* NOTE(review): as with duplicates, the frame is not released on
         * this path -- confirm that the data-in implementation owns it.
         */
        assert(INS_FRAME_ERR == ins_frame);
        return -1;
    }
}
929
930
931static void
932drop_frames_in (lsquic_stream_t *stream)
933{
934    if (stream->data_in)
935    {
936        stream->data_in->di_if->di_destroy(stream->data_in);
937        /* To avoid checking whether `data_in` is set, just set to the error
938         * data-in stream.  It does the right thing after incoming data is
939         * dropped.
940         */
941        stream->data_in = data_in_error_new();
942    }
943}
944
945
946static void
947maybe_elide_stream_frames (struct lsquic_stream *stream)
948{
949    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
950    {
951        if (stream->n_unacked)
952            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
953                                                stream->id);
954        stream->stream_flags |= STREAM_FRAMES_ELIDED;
955    }
956}
957
958
/* Process incoming RST_STREAM frame.  `offset' is the offset carried by the
 * frame; `error_code' is not used by this function.
 *
 * Returns 0 if the frame was processed or is a duplicate.  Returns -1 if
 * `offset' is smaller than the maximum receive offset recorded by the
 * stream's flow controller or if the flow controller refuses the new
 * maximum receive offset.
 */
int
lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
                      uint64_t error_code)
{

    if (stream->stream_flags & STREAM_RST_RECVD)
    {
        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
        return 0;
    }

    SM_HISTORY_APPEND(stream, SHE_RST_IN);
    /* This flag must always be set, even if we are "ignoring" it: it is
     * used by elision code.
     */
    stream->stream_flags |= STREAM_RST_RECVD;

    /* The reset offset may not be below what we have already seen: */
    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
    {
        LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64" is "
            "smaller than that of byte following the last byte we have seen: "
            "0x%"PRIX64, offset,
            lsquic_sfcw_get_max_recv_off(&stream->fc));
        return -1;
    }

    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
    {
        LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64
            " violates flow control", offset);
        return -1;
    }

    /* Let user collect error: */
    maybe_conn_to_tickable_if_readable(stream);

    /* Nothing else will be read or written: release flow control credit
     * and drop both incoming and buffered outgoing data.
     */
    lsquic_sfcw_consume_rem(&stream->fc);
    drop_frames_in(stream);
    drop_buffered_data(stream);
    maybe_elide_stream_frames(stream);

    /* Reset our side as well, unless a RST or FIN has already been sent or
     * a RST is already scheduled:
     */
    if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT))
                                    && !(stream->sm_qflags & SMQF_SEND_RST))
        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);

    /* NOTE(review): the flag has already been set above; this second
     * assignment looks redundant unless lsquic_stream_reset_ext() can
     * clear it -- confirm before removing.
     */
    stream->stream_flags |= STREAM_RST_RECVD;

    maybe_finish_stream(stream);
    maybe_schedule_call_on_close(stream);

    return 0;
}
1011
1012
1013void
1014lsquic_stream_stop_sending_in (struct lsquic_stream *stream,
1015                                                        uint64_t error_code)
1016{
1017    if (stream->stream_flags & STREAM_SS_RECVD)
1018    {
1019        LSQ_DEBUG("ignore duplicate STOP_SENDING frame");
1020        return;
1021    }
1022
1023    SM_HISTORY_APPEND(stream, SHE_SS_IN);
1024    stream->stream_flags |= STREAM_SS_RECVD;
1025
1026    /* Let user collect error: */
1027    maybe_conn_to_tickable_if_readable(stream);
1028
1029    lsquic_sfcw_consume_rem(&stream->fc);
1030    drop_frames_in(stream);
1031    drop_buffered_data(stream);
1032    maybe_elide_stream_frames(stream);
1033
1034    if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT))
1035                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1036        lsquic_stream_reset_ext(stream, error_code, 0);
1037
1038    maybe_finish_stream(stream);
1039    maybe_schedule_call_on_close(stream);
1040}
1041
1042
1043uint64_t
1044lsquic_stream_fc_recv_off_const (const struct lsquic_stream *stream)
1045{
1046    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
1047}
1048
1049
1050void
1051lsquic_stream_max_stream_data_sent (struct lsquic_stream *stream)
1052{
1053    assert(stream->sm_qflags & SMQF_SEND_MAX_STREAM_DATA);
1054    stream->sm_qflags &= ~SMQF_SEND_MAX_STREAM_DATA;
1055    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1056        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1057    stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc);
1058}
1059
1060
1061uint64_t
1062lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
1063{
1064    assert(stream->sm_qflags & SMQF_SEND_WUF);
1065    stream->sm_qflags &= ~SMQF_SEND_WUF;
1066    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1067        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1068    return stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc);
1069}
1070
1071
1072void
1073lsquic_stream_peer_blocked (struct lsquic_stream *stream, uint64_t peer_off)
1074{
1075    uint64_t last_off;
1076
1077    if (stream->sm_last_recv_off)
1078        last_off = stream->sm_last_recv_off;
1079    else
1080        /* This gets advertized in transport parameters */
1081        last_off = lsquic_sfcw_get_max_recv_off(&stream->fc);
1082
1083    LSQ_DEBUG("Peer blocked at %"PRIu64", while the last MAX_STREAM_DATA "
1084        "frame we sent advertized the limit of %"PRIu64, peer_off, last_off);
1085
1086    if (peer_off > last_off && !(stream->sm_qflags & SMQF_SEND_WUF))
1087    {
1088        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1089            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1090                                                    next_send_stream);
1091        stream->sm_qflags |= SMQF_SEND_WUF;
1092        LSQ_DEBUG("marked to send MAX_STREAM_DATA frame");
1093    }
1094    else if (stream->sm_qflags & SMQF_SEND_WUF)
1095        LSQ_DEBUG("MAX_STREAM_DATA frame is already scheduled");
1096    else if (stream->sm_last_recv_off)
1097        LSQ_DEBUG("MAX_STREAM_DATA(%"PRIu64") has already been either "
1098            "packetized or sent", stream->sm_last_recv_off);
1099    else
1100        LSQ_INFO("Peer should have receive transport param limit "
1101            "of %"PRIu64"; odd.", last_off);
1102}
1103
1104
1105/* GQUIC's BLOCKED frame does not have an offset */
1106void
1107lsquic_stream_peer_blocked_gquic (struct lsquic_stream *stream)
1108{
1109    LSQ_DEBUG("Peer blocked: schedule another WINDOW_UPDATE frame");
1110    if (!(stream->sm_qflags & SMQF_SEND_WUF))
1111    {
1112        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1113            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1114                                                    next_send_stream);
1115        stream->sm_qflags |= SMQF_SEND_WUF;
1116        LSQ_DEBUG("marked to send MAX_STREAM_DATA frame");
1117    }
1118    else
1119        LSQ_DEBUG("WINDOW_UPDATE frame is already scheduled");
1120}
1121
1122
1123void
1124lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
1125{
1126    assert(stream->sm_qflags & SMQF_SEND_BLOCKED);
1127    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
1128    stream->sm_qflags &= ~SMQF_SEND_BLOCKED;
1129    stream->stream_flags |= STREAM_BLOCKED_SENT;
1130    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1131        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1132}
1133
1134
1135void
1136lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
1137{
1138    assert(stream->sm_qflags & SMQF_SEND_RST);
1139    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
1140    stream->sm_qflags &= ~SMQF_SEND_RST;
1141    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1142        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1143    stream->stream_flags |= STREAM_RST_SENT;
1144    maybe_finish_stream(stream);
1145}
1146
1147
1148static size_t
1149read_uh (struct lsquic_stream *stream,
1150        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1151{
1152    struct http1x_headers *const h1h = stream->uh->uh_hset;
1153    size_t nread;
1154
1155    nread = readf(ctx, (unsigned char *) h1h->h1h_buf + h1h->h1h_off,
1156                  h1h->h1h_size - h1h->h1h_off,
1157                  (stream->stream_flags & STREAM_HEAD_IN_FIN) > 0);
1158    h1h->h1h_off += nread;
1159    if (h1h->h1h_off == h1h->h1h_size)
1160    {
1161        LSQ_DEBUG("read all uncompressed headers");
1162        destroy_uh(stream);
1163        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
1164        {
1165            stream->stream_flags |= STREAM_FIN_REACHED;
1166            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
1167        }
1168    }
1169    return nread;
1170}
1171
1172
1173static void
1174stream_consumed_bytes (struct lsquic_stream *stream)
1175{
1176    lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
1177    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
1178    {
1179        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1180            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1181                                                            next_send_stream);
1182        stream->sm_qflags |= SMQF_SEND_WUF;
1183        maybe_conn_to_tickable_if_writeable(stream, 1);
1184    }
1185}
1186
1187
/* Result of read_data_frames() */
struct read_frames_status
{
    /* Non-zero if reading failed; the other members are zero in that case */
    int     error;
    /* Number of data frames fetched from the data-in object */
    int     processed_frames;
    /* Sum of byte counts returned by the read callback */
    size_t  total_nread;
};
1194
1195
/* Feed incoming data frames, starting at the stream's read offset, to
 * `readf' until there are no more frames, `readf' consumes fewer bytes than
 * offered, or a frame with FIN is fully consumed.
 *
 * If `do_filtering' is set and the stream has a filter interface
 * (`sm_sfi'), the filter decides how many bytes of a frame to offer and is
 * told how many bytes were consumed.
 *
 * `readf' is called with `ctx', pointer to data, number of bytes offered,
 * and the FIN flag of the frame; it returns number of bytes it consumed.
 *
 * On success, `error' in the returned structure is zero and the other two
 * members carry the number of frames processed and bytes consumed.  If
 * switching data-in implementation fails, `error' is set, the stream's
 * data-in object is replaced with the error data-in object, and the byte
 * and frame counts are not reported.
 */
static struct read_frames_status
read_data_frames (struct lsquic_stream *stream, int do_filtering,
        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
{
    struct data_frame *data_frame;
    size_t nread, toread, total_nread;
    int short_read, processed_frames;

    processed_frames = 0;
    total_nread = 0;

    while ((data_frame = stream->data_in->di_if->di_get_frame(
                                        stream->data_in, stream->read_offset)))
    {

        ++processed_frames;

        /* The same frame may be offered to `readf' more than once: the loop
         * repeats until the frame is fully consumed (`data_frame' is set to
         * NULL) or a jump to `end_while' occurs.
         */
        do
        {
            if (do_filtering && stream->sm_sfi)
                toread = stream->sm_sfi->sfi_filter_df(stream, data_frame);
            else
                toread = data_frame->df_size - data_frame->df_read_off;

            /* `readf' is called with zero bytes if the frame has FIN set,
             * so that the callback gets to see the FIN.
             */
            if (toread || data_frame->df_fin)
            {
                nread = readf(ctx, data_frame->df_data + data_frame->df_read_off,
                                                     toread, data_frame->df_fin);
                if (do_filtering && stream->sm_sfi)
                    stream->sm_sfi->sfi_decr_left(stream, nread);
                data_frame->df_read_off += nread;
                stream->read_offset += nread;
                total_nread += nread;
                short_read = nread < toread;
            }
            else
                short_read = 0;

            if (data_frame->df_read_off == data_frame->df_size)
            {
                /* Save FIN flag: the frame is not touched after it is
                 * handed back to the data-in object.
                 */
                const int fin = data_frame->df_fin;
                stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
                data_frame = NULL;
                if ((stream->sm_bflags & SMBF_AUTOSWITCH) &&
                        (stream->data_in->di_flags & DI_SWITCH_IMPL))
                {
                    stream->data_in = stream->data_in->di_if->di_switch_impl(
                                                stream->data_in, stream->read_offset);
                    if (!stream->data_in)
                    {
                        stream->data_in = data_in_error_new();
                        /* Note that stream_consumed_bytes() is not called on
                         * this path.
                         */
                        return (struct read_frames_status) { .error = 1, };
                    }
                }
                if (fin)
                {
                    stream->stream_flags |= STREAM_FIN_REACHED;
                    goto end_while;
                }
            }
            else if (short_read)
                /* The callback could not take everything that was offered */
                goto end_while;
        }
        while (data_frame);
    }
  end_while:

    /* Update flow control only if at least one frame was looked at: */
    if (processed_frames)
        stream_consumed_bytes(stream);

    return (struct read_frames_status) {
        .error = 0,
        .processed_frames = processed_frames,
        .total_nread = total_nread,
    };
}
1272
1273
1274static ssize_t
1275stream_readf (struct lsquic_stream *stream,
1276        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1277{
1278    size_t total_nread, nread;
1279    int read_unc_headers;
1280
1281    total_nread = 0;
1282
1283    if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF))
1284                                            == (SMBF_USE_HEADERS|SMBF_IETF)
1285            && !(stream->stream_flags & STREAM_HAVE_UH)
1286            && !stream->uh)
1287    {
1288        if (stream->sm_readable(stream))
1289        {
1290            if (stream->sm_hq_filter.hqfi_flags & HQFI_FLAG_ERROR)
1291            {
1292                LSQ_INFO("HQ filter hit an error: cannot read from stream");
1293                errno = EBADMSG;
1294                return -1;
1295            }
1296            assert(stream->uh);
1297        }
1298        else
1299        {
1300            errno = EWOULDBLOCK;
1301            return -1;
1302        }
1303    }
1304
1305    if (stream->uh)
1306    {
1307        if (stream->uh->uh_flags & UH_H1H)
1308        {
1309            nread = read_uh(stream, readf, ctx);
1310            read_unc_headers = nread > 0;
1311            total_nread += nread;
1312            if (stream->uh)
1313                return total_nread;
1314        }
1315        else
1316        {
1317            LSQ_INFO("header set not claimed: cannot read from stream");
1318            return -1;
1319        }
1320    }
1321    else if ((stream->sm_bflags & SMBF_USE_HEADERS)
1322                                && !(stream->stream_flags & STREAM_HAVE_UH))
1323    {
1324        LSQ_DEBUG("cannot read: headers not available");
1325        errno = EWOULDBLOCK;
1326        return -1;
1327    }
1328    else
1329        read_unc_headers = 0;
1330
1331    const struct read_frames_status rfs
1332                        = read_data_frames(stream, 1, readf, ctx);
1333    if (rfs.error)
1334        return -1;
1335    total_nread += rfs.total_nread;
1336
1337    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
1338                                        total_nread, stream->read_offset);
1339
1340    if (rfs.processed_frames || read_unc_headers)
1341    {
1342        return total_nread;
1343    }
1344    else
1345    {
1346        assert(0 == total_nread);
1347        errno = EWOULDBLOCK;
1348        return -1;
1349    }
1350}
1351
1352
1353/* This function returns 0 when EOF is reached.
1354 */
1355ssize_t
1356lsquic_stream_readf (struct lsquic_stream *stream,
1357        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1358{
1359    SM_HISTORY_APPEND(stream, SHE_USER_READ);
1360
1361    if (lsquic_stream_is_reset(stream))
1362    {
1363        if (stream->stream_flags & STREAM_RST_RECVD)
1364            stream->stream_flags |= STREAM_RST_READ;
1365        errno = ECONNRESET;
1366        return -1;
1367    }
1368    if (stream->stream_flags & STREAM_U_READ_DONE)
1369    {
1370        errno = EBADF;
1371        return -1;
1372    }
1373    if ((stream->stream_flags & STREAM_FIN_REACHED)
1374            && 0 == (!!(stream->stream_flags & STREAM_HAVE_UH)
1375                   ^ !!(stream->sm_bflags & SMBF_USE_HEADERS)))
1376        return 0;
1377
1378    return stream_readf(stream, readf, ctx);
1379}
1380
1381
/* State of a scatter read into an array of I/O vectors */
struct readv_ctx
{
    const struct iovec        *iov;     /* Vector being filled */
    const struct iovec *const  end;     /* One past the last vector */
    unsigned char             *p;       /* Where next byte goes in `iov' */
};


/* Read callback that copies `len' bytes from `buf' into the I/O vectors
 * described by `ctx_p' (a struct readv_ctx), moving on to the next
 * non-empty vector whenever the current one fills up.  `fin' is not used.
 *
 * Returns number of bytes copied, which is smaller than `len' if the
 * vectors run out of room.
 */
static size_t
readv_f (void *ctx_p, const unsigned char *buf, size_t len, int fin)
{
    struct readv_ctx *const ctx = ctx_p;
    const unsigned char *src = buf;
    unsigned char *iov_end;
    size_t left, room, n;

    left = len;
    while (left > 0 && ctx->iov < ctx->end)
    {
        iov_end = (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len;
        room = iov_end - ctx->p;
        n = room < left ? room : left;
        memcpy(ctx->p, src, n);
        ctx->p += n;
        src += n;
        left -= n;

        if (ctx->p != iov_end)
            continue;

        /* Current vector is full: advance, skipping over empty vectors */
        for (++ctx->iov; ctx->iov < ctx->end && 0 == ctx->iov->iov_len;
                                                                ++ctx->iov)
            ;
        ctx->p = ctx->iov < ctx->end ? ctx->iov->iov_base : NULL;
    }

    return len - left;
}
1420
1421
1422ssize_t
1423lsquic_stream_readv (struct lsquic_stream *stream, const struct iovec *iov,
1424                     int iovcnt)
1425{
1426    struct readv_ctx ctx = { iov, iov + iovcnt, iov->iov_base, };
1427    return lsquic_stream_readf(stream, readv_f, &ctx);
1428}
1429
1430
1431ssize_t
1432lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
1433{
1434    struct iovec iov = { .iov_base = buf, .iov_len = len, };
1435    return lsquic_stream_readv(stream, &iov, 1);
1436}
1437
1438
1439static void
1440stream_shutdown_read (lsquic_stream_t *stream)
1441{
1442    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1443    {
1444        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
1445        stream->stream_flags |= STREAM_U_READ_DONE;
1446        stream_wantread(stream, 0);
1447        maybe_finish_stream(stream);
1448    }
1449}
1450
1451
1452static void
1453stream_shutdown_write (lsquic_stream_t *stream)
1454{
1455    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1456        return;
1457
1458    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
1459    stream->stream_flags |= STREAM_U_WRITE_DONE;
1460    stream_wantwrite(stream, 0);
1461
1462    /* Don't bother to check whether there is anything else to write if
1463     * the flags indicate that nothing else should be written.
1464     */
1465    if (!(stream->sm_bflags & SMBF_CRYPTO)
1466            && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT))
1467                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1468    {
1469        if (stream->sm_n_buffered == 0)
1470        {
1471            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
1472                                                 stream))
1473            {
1474                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
1475                stream->stream_flags |= STREAM_FIN_SENT;
1476            }
1477            else
1478            {
1479                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
1480                          "flag in it");
1481                (void) stream_flush_nocheck(stream);
1482            }
1483        }
1484        else
1485            (void) stream_flush_nocheck(stream);
1486    }
1487}
1488
1489
1490static void
1491maybe_stream_shutdown_write (struct lsquic_stream *stream)
1492{
1493    if (stream->sm_send_headers_state == SSHS_BEGIN)
1494        stream_shutdown_write(stream);
1495    else if (0 == (stream->stream_flags & STREAM_DELAYED_SW))
1496    {
1497        LSQ_DEBUG("shutdown delayed");
1498        SM_HISTORY_APPEND(stream, SHE_DELAY_SW);
1499        stream->stream_flags |= STREAM_DELAYED_SW;
1500    }
1501}
1502
1503
1504int
1505lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
1506{
1507    LSQ_DEBUG("shutdown; how: %d", how);
1508    if (lsquic_stream_is_closed(stream))
1509    {
1510        LSQ_INFO("Attempt to shut down a closed stream");
1511        errno = EBADF;
1512        return -1;
1513    }
1514    /* 0: read, 1: write: 2: read and write
1515     */
1516    if (how < 0 || how > 2)
1517    {
1518        errno = EINVAL;
1519        return -1;
1520    }
1521
1522    if (how)
1523        maybe_stream_shutdown_write(stream);
1524    if (how != 1)
1525        stream_shutdown_read(stream);
1526
1527    maybe_finish_stream(stream);
1528    maybe_schedule_call_on_close(stream);
1529    if (how && !(stream->stream_flags & STREAM_DELAYED_SW))
1530        maybe_conn_to_tickable_if_writeable(stream, 1);
1531
1532    return 0;
1533}
1534
1535
1536void
1537lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
1538{
1539    LSQ_DEBUG("internal shutdown");
1540    if (lsquic_stream_is_critical(stream))
1541    {
1542        LSQ_DEBUG("add flag to force-finish special stream");
1543        stream->stream_flags |= STREAM_FORCE_FINISH;
1544        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
1545    }
1546    maybe_finish_stream(stream);
1547    maybe_schedule_call_on_close(stream);
1548}
1549
1550
1551static void
1552fake_reset_unused_stream (lsquic_stream_t *stream)
1553{
1554    stream->stream_flags |=
1555        STREAM_RST_RECVD    /* User will pick this up on read or write */
1556      | STREAM_RST_SENT     /* Don't send anything else on this stream */
1557    ;
1558
1559    /* Cancel all writes to the network scheduled for this stream: */
1560    if (stream->sm_qflags & SMQF_SENDING_FLAGS)
1561        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
1562                                                next_send_stream);
1563    stream->sm_qflags &= ~SMQF_SENDING_FLAGS;
1564
1565    LSQ_DEBUG("fake-reset stream%s",
1566                    stream_stalled(stream) ? " (stalled)" : "");
1567    maybe_finish_stream(stream);
1568    maybe_schedule_call_on_close(stream);
1569}
1570
1571
1572/* This function should only be called for locally-initiated streams whose ID
1573 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
1574 * frame sent by peer but we have not yet received it and created a stream.
1575 * In this situation, we mark the stream as reset, so that user's on_read or
1576 * on_write event callback picks up the error.  That, in turn, should result
1577 * in stream being closed.
1578 *
1579 * If we have received any data frames on this stream, this probably indicates
1580 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
1581 * lower than this.  However, we still try to handle it gracefully and peform
1582 * a shutdown, as if the stream was not reset.
1583 */
1584void
1585lsquic_stream_received_goaway (lsquic_stream_t *stream)
1586{
1587    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
1588    if (0 == stream->read_offset &&
1589                            stream->data_in->di_if->di_empty(stream->data_in))
1590        fake_reset_unused_stream(stream);       /* Normal condition */
1591    else
1592    {   /* This is odd, let's handle it the best we can: */
1593        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
1594        lsquic_stream_shutdown_internal(stream);
1595    }
1596}
1597
1598
1599uint64_t
1600lsquic_stream_read_offset (const lsquic_stream_t *stream)
1601{
1602    return stream->read_offset;
1603}
1604
1605
1606static int
1607stream_wantread (lsquic_stream_t *stream, int is_want)
1608{
1609    const int old_val = !!(stream->sm_qflags & SMQF_WANT_READ);
1610    const int new_val = !!is_want;
1611    if (old_val != new_val)
1612    {
1613        if (new_val)
1614        {
1615            if (!old_val)
1616                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
1617                                                            next_read_stream);
1618            stream->sm_qflags |= SMQF_WANT_READ;
1619        }
1620        else
1621        {
1622            stream->sm_qflags &= ~SMQF_WANT_READ;
1623            if (old_val)
1624                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
1625                                                            next_read_stream);
1626        }
1627    }
1628    return old_val;
1629}
1630
1631
1632static void
1633maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_q_flags flag)
1634{
1635    assert(SMQF_WRITE_Q_FLAGS & flag);
1636    if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS))
1637        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1638                                                        next_write_stream);
1639    stream->sm_qflags |= flag;
1640}
1641
1642
1643static void
1644maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag)
1645{
1646    assert(SMQF_WRITE_Q_FLAGS & flag);
1647    if (stream->sm_qflags & flag)
1648    {
1649        stream->sm_qflags &= ~flag;
1650        if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS))
1651            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1652                                                        next_write_stream);
1653    }
1654}
1655
1656
1657static int
1658stream_wantwrite (struct lsquic_stream *stream, int new_val)
1659{
1660    const int old_val = !!(stream->sm_qflags & SMQF_WANT_WRITE);
1661
1662    assert(0 == (new_val & ~1));    /* new_val is either 0 or 1 */
1663
1664    if (old_val != new_val)
1665    {
1666        if (new_val)
1667            maybe_put_onto_write_q(stream, SMQF_WANT_WRITE);
1668        else
1669            maybe_remove_from_write_q(stream, SMQF_WANT_WRITE);
1670    }
1671    return old_val;
1672}
1673
1674
1675int
1676lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1677{
1678    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1679    {
1680        if (is_want)
1681            maybe_conn_to_tickable_if_readable(stream);
1682        return stream_wantread(stream, is_want);
1683    }
1684    else
1685    {
1686        errno = EBADF;
1687        return -1;
1688    }
1689}
1690
1691
1692int
1693lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1694{
1695    int old_val;
1696
1697    is_want = !!is_want;
1698
1699    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)
1700                            && SSHS_BEGIN == stream->sm_send_headers_state)
1701    {
1702        stream->sm_saved_want_write = is_want;
1703        if (is_want)
1704            maybe_conn_to_tickable_if_writeable(stream, 1);
1705        return stream_wantwrite(stream, is_want);
1706    }
1707    else if (SSHS_BEGIN != stream->sm_send_headers_state)
1708    {
1709        old_val = stream->sm_saved_want_write;
1710        stream->sm_saved_want_write = is_want;
1711        return old_val;
1712    }
1713    else
1714    {
1715        errno = EBADF;
1716        return -1;
1717    }
1718}
1719
1720
/* Snapshot of stream flags taken by stream_progress().  Two snapshots are
 * compared using progress_eq() to tell whether a user callback changed any
 * of the recorded flags.
 */
struct progress
{
    enum stream_flags   s_flags;    /* Selected bits of `stream_flags' */
    enum stream_q_flags q_flags;    /* Selected bits of `sm_qflags' */
};
1726
1727
1728static struct progress
1729stream_progress (const struct lsquic_stream *stream)
1730{
1731    return (struct progress) {
1732        .s_flags = stream->stream_flags
1733          & (STREAM_U_WRITE_DONE|STREAM_U_READ_DONE),
1734        .q_flags = stream->sm_qflags
1735          & (SMQF_WANT_READ|SMQF_WANT_WRITE|SMQF_WANT_FLUSH|SMQF_SEND_RST),
1736    };
1737}
1738
1739
1740static int
1741progress_eq (struct progress a, struct progress b)
1742{
1743    return a.s_flags == b.s_flags && a.q_flags == b.q_flags;
1744}
1745
1746
1747static void
1748stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1749{
1750    unsigned no_progress_count, no_progress_limit;
1751    struct progress progress;
1752    uint64_t size;
1753
1754    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1755
1756    no_progress_count = 0;
1757    while ((stream->sm_qflags & SMQF_WANT_READ)
1758                                            && lsquic_stream_readable(stream))
1759    {
1760        progress = stream_progress(stream);
1761        size  = stream->read_offset;
1762
1763        stream->stream_if->on_read(stream, stream->st_ctx);
1764
1765        if (no_progress_limit && size == stream->read_offset &&
1766                                progress_eq(progress, stream_progress(stream)))
1767        {
1768            ++no_progress_count;
1769            if (no_progress_count >= no_progress_limit)
1770            {
1771                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1772                    "progress) in user code reading from stream",
1773                    no_progress_count,
1774                    no_progress_count == 1 ? "" : "s");
1775                break;
1776            }
1777        }
1778        else
1779            no_progress_count = 0;
1780    }
1781}
1782
1783
1784static void
1785stream_hblock_sent (struct lsquic_stream *stream)
1786{
1787    int want_write;
1788
1789    LSQ_DEBUG("header block has been sent: restore default behavior");
1790    stream->sm_send_headers_state = SSHS_BEGIN;
1791    stream->sm_write_avail = stream_write_avail;
1792
1793    want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
1794    if (want_write != stream->sm_saved_want_write)
1795        (void) lsquic_stream_wantwrite(stream, stream->sm_saved_want_write);
1796
1797    if (stream->stream_flags & STREAM_DELAYED_SW)
1798    {
1799        LSQ_DEBUG("performing delayed shutdown write");
1800        stream->stream_flags &= ~STREAM_DELAYED_SW;
1801        stream_shutdown_write(stream);
1802        maybe_schedule_call_on_close(stream);
1803        maybe_finish_stream(stream);
1804        maybe_conn_to_tickable_if_writeable(stream, 1);
1805    }
1806}
1807
1808
1809static void
1810on_write_header_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
1811{
1812    ssize_t nw;
1813
1814    nw = stream_write_buf(stream,
1815                stream->sm_header_block + stream->sm_hblock_off,
1816                stream->sm_hblock_sz - stream->sm_hblock_off);
1817    if (nw > 0)
1818    {
1819        stream->sm_hblock_off += nw;
1820        if (stream->sm_hblock_off == stream->sm_hblock_sz)
1821        {
1822            stream->stream_flags |= STREAM_HEADERS_SENT;
1823            free(stream->sm_header_block);
1824            stream->sm_header_block = NULL;
1825            stream->sm_hblock_sz = 0;
1826            stream_hblock_sent(stream);
1827            LSQ_DEBUG("header block written out successfully");
1828            /* TODO: if there was eos, do something else */
1829            if (stream->sm_qflags & SMQF_WANT_WRITE)
1830                stream->stream_if->on_write(stream, h);
1831        }
1832        else
1833        {
1834            LSQ_DEBUG("wrote %zd bytes more of header block; not done yet",
1835                nw);
1836        }
1837    }
1838    else if (nw < 0)
1839    {
1840        /* XXX What should happen if we hit an error? TODO */
1841    }
1842}
1843
1844
1845static void
1846(*select_on_write (struct lsquic_stream *stream))(struct lsquic_stream *,
1847                                                        lsquic_stream_ctx_t *)
1848{
1849    if (0 == (stream->stream_flags & STREAM_PUSHING)
1850                    && SSHS_HBLOCK_SENDING != stream->sm_send_headers_state)
1851        /* Common case */
1852        return stream->stream_if->on_write;
1853    else if (SSHS_HBLOCK_SENDING == stream->sm_send_headers_state)
1854        return on_write_header_wrapper;
1855    else
1856    {
1857        assert(stream->stream_flags & STREAM_PUSHING);
1858        if (stream_is_pushing_promise(stream))
1859            return on_write_pp_wrapper;
1860        else if (stream->sm_dup_push_off < stream->sm_dup_push_len)
1861            return on_write_dp_wrapper;
1862        else
1863            return stream->stream_if->on_write;
1864    }
1865}
1866
1867
1868static void
1869stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1870{
1871    unsigned no_progress_count, no_progress_limit;
1872    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
1873    struct progress progress;
1874
1875    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1876
1877    no_progress_count = 0;
1878    stream->stream_flags |= STREAM_LAST_WRITE_OK;
1879    while ((stream->sm_qflags & SMQF_WANT_WRITE)
1880                && (stream->stream_flags & STREAM_LAST_WRITE_OK)
1881                       && lsquic_stream_write_avail(stream))
1882    {
1883        progress = stream_progress(stream);
1884
1885        on_write = select_on_write(stream);
1886        on_write(stream, stream->st_ctx);
1887
1888        if (no_progress_limit && progress_eq(progress, stream_progress(stream)))
1889        {
1890            ++no_progress_count;
1891            if (no_progress_count >= no_progress_limit)
1892            {
1893                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1894                    "progress) in user code writing to stream",
1895                    no_progress_count,
1896                    no_progress_count == 1 ? "" : "s");
1897                break;
1898            }
1899        }
1900        else
1901            no_progress_count = 0;
1902    }
1903}
1904
1905
1906static void
1907stream_dispatch_read_events_once (lsquic_stream_t *stream)
1908{
1909    if ((stream->sm_qflags & SMQF_WANT_READ) && lsquic_stream_readable(stream))
1910    {
1911        stream->stream_if->on_read(stream, stream->st_ctx);
1912    }
1913}
1914
1915
1916uint64_t
1917lsquic_stream_combined_send_off (const struct lsquic_stream *stream)
1918{
1919    size_t frames_sizes;
1920
1921    frames_sizes = active_hq_frame_sizes(stream);
1922    return stream->tosend_off + stream->sm_n_buffered + frames_sizes;
1923}
1924
1925
1926static void
1927maybe_mark_as_blocked (lsquic_stream_t *stream)
1928{
1929    struct lsquic_conn_cap *cc;
1930    uint64_t used;
1931
1932    used = lsquic_stream_combined_send_off(stream);
1933    if (stream->max_send_off == used)
1934    {
1935        if (stream->blocked_off < stream->max_send_off)
1936        {
1937            stream->blocked_off = used;
1938            if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1939                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1940                                                            next_send_stream);
1941            stream->sm_qflags |= SMQF_SEND_BLOCKED;
1942            LSQ_DEBUG("marked stream-blocked at stream offset "
1943                                            "%"PRIu64, stream->blocked_off);
1944        }
1945        else
1946            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
1947                " has been, or is about to be, sent", stream->blocked_off);
1948    }
1949
1950    if ((stream->sm_bflags & SMBF_CONN_LIMITED)
1951        && (cc = &stream->conn_pub->conn_cap,
1952                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
1953    {
1954        if (cc->cc_blocked < cc->cc_max)
1955        {
1956            cc->cc_blocked = cc->cc_max;
1957            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
1958            LSQ_DEBUG("marked connection-blocked at connection offset "
1959                                                    "%"PRIu64, cc->cc_max);
1960        }
1961        else
1962            LSQ_DEBUG("stream has already been marked connection-blocked "
1963                "at offset %"PRIu64, cc->cc_blocked);
1964    }
1965}
1966
1967
1968void
1969lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
1970{
1971    assert(stream->sm_qflags & SMQF_WANT_READ);
1972
1973    if (stream->sm_bflags & SMBF_RW_ONCE)
1974        stream_dispatch_read_events_once(stream);
1975    else
1976        stream_dispatch_read_events_loop(stream);
1977}
1978
1979
1980void
1981lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
1982{
1983    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
1984    int progress;
1985    uint64_t tosend_off;
1986    unsigned short n_buffered;
1987    enum stream_q_flags q_flags;
1988
1989    assert(stream->sm_qflags & SMQF_WRITE_Q_FLAGS);
1990    q_flags = stream->sm_qflags & SMQF_WRITE_Q_FLAGS;
1991    tosend_off = stream->tosend_off;
1992    n_buffered = stream->sm_n_buffered;
1993
1994    if (stream->sm_qflags & SMQF_WANT_FLUSH)
1995        (void) stream_flush(stream);
1996
1997    if (stream->sm_bflags & SMBF_RW_ONCE)
1998    {
1999        if ((stream->sm_qflags & SMQF_WANT_WRITE)
2000            && lsquic_stream_write_avail(stream))
2001        {
2002            on_write = select_on_write(stream);
2003            on_write(stream, stream->st_ctx);
2004        }
2005    }
2006    else
2007        stream_dispatch_write_events_loop(stream);
2008
2009    /* Progress means either flags or offsets changed: */
2010    progress = !((stream->sm_qflags & SMQF_WRITE_Q_FLAGS) == q_flags &&
2011                        stream->tosend_off == tosend_off &&
2012                            stream->sm_n_buffered == n_buffered);
2013
2014    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
2015    {
2016        if (progress)
2017        {   /* Move the stream to the end of the list to ensure fairness. */
2018            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
2019                                                            next_write_stream);
2020            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
2021                                                            next_write_stream);
2022        }
2023    }
2024}
2025
2026
/* Size callback of the empty reader: there is never anything to read.
 * The context argument is ignored.
 */
static size_t
inner_reader_empty_size (void *ctx)
{
    (void) ctx;
    return (size_t) 0;
}
2032
2033
/* Read callback of the empty reader: copies nothing into `buf' and reports
 * zero bytes read.  All arguments are ignored.
 */
static size_t
inner_reader_empty_read (void *ctx, void *buf, size_t count)
{
    (void) ctx;
    (void) buf;
    (void) count;
    return (size_t) 0;
}
2039
2040
2041static int
2042stream_flush (lsquic_stream_t *stream)
2043{
2044    struct lsquic_reader empty_reader;
2045    ssize_t nw;
2046
2047    assert(stream->sm_qflags & SMQF_WANT_FLUSH);
2048    assert(stream->sm_n_buffered > 0 ||
2049        /* Flushing is also used to packetize standalone FIN: */
2050        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
2051                                                    == STREAM_U_WRITE_DONE));
2052
2053    empty_reader.lsqr_size = inner_reader_empty_size;
2054    empty_reader.lsqr_read = inner_reader_empty_read;
2055    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
2056    nw = stream_write_to_packets(stream, &empty_reader, 0);
2057
2058    if (nw >= 0)
2059    {
2060        assert(nw == 0);    /* Empty reader: must have read zero bytes */
2061        return 0;
2062    }
2063    else
2064        return -1;
2065}
2066
2067
2068static int
2069stream_flush_nocheck (lsquic_stream_t *stream)
2070{
2071    size_t frames;
2072
2073    frames = active_hq_frame_sizes(stream);
2074    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered + frames;
2075    stream->sm_flush_to_payload = stream->sm_payload + stream->sm_n_buffered;
2076    maybe_put_onto_write_q(stream, SMQF_WANT_FLUSH);
2077    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
2078
2079    return stream_flush(stream);
2080}
2081
2082
2083int
2084lsquic_stream_flush (lsquic_stream_t *stream)
2085{
2086    if (stream->stream_flags & STREAM_U_WRITE_DONE)
2087    {
2088        LSQ_DEBUG("cannot flush closed stream");
2089        errno = EBADF;
2090        return -1;
2091    }
2092
2093    if (0 == stream->sm_n_buffered)
2094    {
2095        LSQ_DEBUG("flushing 0 bytes: noop");
2096        return 0;
2097    }
2098
2099    return stream_flush_nocheck(stream);
2100}
2101
2102
2103static size_t
2104stream_get_n_allowed (const struct lsquic_stream *stream)
2105{
2106    if (stream->sm_n_allocated)
2107        return stream->sm_n_allocated;
2108    else
2109        return stream->conn_pub->path->np_pack_size;
2110}
2111
2112
2113/* The flush threshold is the maximum size of stream data that can be sent
2114 * in a full packet.
2115 */
2116#ifdef NDEBUG
2117static
2118#endif
2119       size_t
2120lsquic_stream_flush_threshold (const struct lsquic_stream *stream,
2121                                                            unsigned data_sz)
2122{
2123    enum packet_out_flags flags;
2124    enum packno_bits bits;
2125    size_t packet_header_sz, stream_header_sz, tag_len;
2126    size_t threshold;
2127
2128    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
2129    flags = bits << POBIT_SHIFT;
2130    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
2131        flags |= PO_CONN_ID;
2132    if (stream_is_hsk(stream))
2133        flags |= PO_LONGHEAD;
2134
2135    packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags,
2136                                        stream->conn_pub->path->np_dcid.len);
2137    stream_header_sz = stream->sm_frame_header_sz(stream, data_sz);
2138    tag_len = stream->conn_pub->lconn->cn_esf_c->esf_tag_len;
2139
2140    threshold = stream_get_n_allowed(stream) - tag_len
2141              - packet_header_sz - stream_header_sz;
2142    return threshold;
2143}
2144
2145
/* Checks shared by the stream write entry points.  The macro expects a
 * variable named `stream' to be in scope and returns from the enclosing
 * function when writing is not permitted:
 *
 *   - headers are required but not yet sent: returns 0 if sending them is
 *     already under way, or -1 with errno EILSEQ if it has not begun;
 *   - the stream has been reset: returns -1 with errno ECONNRESET;
 *   - the stream is closed for writing or FIN has been sent: returns -1
 *     with errno EBADF.
 */
#define COMMON_WRITE_CHECKS() do {                                          \
    if (!(stream->stream_flags & STREAM_HEADERS_SENT)                       \
            && (stream->sm_bflags & SMBF_USE_HEADERS))                      \
    {                                                                       \
        if (SSHS_BEGIN == stream->sm_send_headers_state)                    \
        {                                                                   \
            LSQ_INFO("Attempt to write to stream before sending HTTP "      \
                                                                "headers"); \
            errno = EILSEQ;                                                 \
            return -1;                                                      \
        }                                                                   \
        LSQ_DEBUG("still sending headers: no writing allowed");             \
        return 0;                                                           \
    }                                                                       \
    if (lsquic_stream_is_reset(stream))                                     \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it had been reset");     \
        errno = ECONNRESET;                                                 \
        return -1;                                                          \
    }                                                                       \
    if (0 != (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)))\
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it was closed for "      \
                                                                "writing"); \
        errno = EBADF;                                                      \
        return -1;                                                          \
    }                                                                       \
} while (0)
2177
2178
/* Context passed as the opaque `ctx' argument to the frame generation
 * callbacks in this file (frame_std_gen_size(), frame_std_gen_fin(),
 * frame_std_gen_read() and the HQ variants).
 */
struct frame_gen_ctx
{
    /* Stream whose data is being turned into frames */
    lsquic_stream_t      *fgc_stream;
    /* Supplies new data beyond what is already buffered in the stream */
    struct lsquic_reader *fgc_reader;
    /* We keep our own count of how many bytes were read from reader because
     * some readers are external.  The external caller does not have to rely
     * on our count, but it can.
     */
    size_t                fgc_nread_from_reader;
    /* Size callback; presumably one of the *_gen_size() functions -- the
     * assignment is not visible in this part of the file.
     */
    size_t              (*fgc_size) (void *ctx);
    /* Returns non-zero when FIN should be set; called with this context */
    int                 (*fgc_fin) (void *ctx);
    /* Read callback handed to the stream frame generator */
    gsf_read_f            fgc_read;
};
2192
2193
2194static size_t
2195frame_std_gen_size (void *ctx)
2196{
2197    struct frame_gen_ctx *fg_ctx = ctx;
2198    size_t available, remaining;
2199
2200    /* Make sure we are not writing past available size: */
2201    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2202    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
2203    if (available < remaining)
2204        remaining = available;
2205
2206    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
2207}
2208
2209
2210static size_t
2211stream_hq_frame_size (const struct stream_hq_frame *shf)
2212{
2213    if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
2214        return 1 + 1 + ((shf->shf_flags & SHF_TWO_BYTES) > 0);
2215    else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) == SHF_FIXED_SIZE)
2216        return 1 + (1 << vint_val2bits(shf->shf_frame_size));
2217    else
2218    {
2219        assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
2220                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
2221        return 0;
2222    }
2223}
2224
2225
2226static size_t
2227active_hq_frame_sizes (const struct lsquic_stream *stream)
2228{
2229    const struct stream_hq_frame *shf;
2230    size_t size;
2231
2232    size = 0;
2233    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2234                                        == (SMBF_IETF|SMBF_USE_HEADERS))
2235        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2236            if (!(shf->shf_flags & SHF_WRITTEN))
2237                size += stream_hq_frame_size(shf);
2238
2239    return size;
2240}
2241
2242
2243static uint64_t
2244stream_hq_frame_end (const struct stream_hq_frame *shf)
2245{
2246    if (shf->shf_flags & SHF_FIXED_SIZE)
2247        return shf->shf_off + shf->shf_frame_size;
2248    else if (shf->shf_flags & SHF_TWO_BYTES)
2249        return shf->shf_off + ((1 << 14) - 1);
2250    else
2251        return shf->shf_off + ((1 << 6) - 1);
2252}
2253
2254
2255static int
2256frame_in_stream (const struct lsquic_stream *stream,
2257                                            const struct stream_hq_frame *shf)
2258{
2259    return shf >= stream->sm_hq_frame_arr
2260        && shf < stream->sm_hq_frame_arr + sizeof(stream->sm_hq_frame_arr)
2261                                        / sizeof(stream->sm_hq_frame_arr[0])
2262        ;
2263}
2264
2265
2266static void
2267stream_hq_frame_put (struct lsquic_stream *stream,
2268                                                struct stream_hq_frame *shf)
2269{
2270    assert(STAILQ_FIRST(&stream->sm_hq_frames) == shf);
2271    STAILQ_REMOVE_HEAD(&stream->sm_hq_frames, shf_next);
2272    if (frame_in_stream(stream, shf))
2273        memset(shf, 0, sizeof(*shf));
2274    else
2275        lsquic_malo_put(shf);
2276}
2277
2278
2279static void
2280stream_hq_frame_close (struct lsquic_stream *stream,
2281                                                struct stream_hq_frame *shf)
2282{
2283    unsigned bits;
2284
2285    LSQ_DEBUG("close HQ frame of type 0x%X at payload offset %"PRIu64
2286        " (actual offset %"PRIu64")", shf->shf_frame_type,
2287        stream->sm_payload, stream->tosend_off);
2288    assert(shf->shf_flags & SHF_ACTIVE);
2289    if (!(shf->shf_flags & SHF_FIXED_SIZE))
2290    {
2291        shf->shf_frame_ptr[0] = shf->shf_frame_type;
2292        bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
2293        vint_write(shf->shf_frame_ptr + 1, stream->sm_payload - shf->shf_off,
2294                                                            bits, 1 << bits);
2295    }
2296    stream_hq_frame_put(stream, shf);
2297}
2298
2299
2300static size_t
2301frame_hq_gen_size (void *ctx)
2302{
2303    struct frame_gen_ctx *fg_ctx = ctx;
2304    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2305    size_t available, remaining, frames;
2306    const struct stream_hq_frame *shf;
2307
2308    frames = 0;
2309    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2310        if (shf->shf_off >= stream->sm_payload)
2311            frames += stream_hq_frame_size(shf);
2312
2313    /* Make sure we are not writing past available size: */
2314    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2315    available = lsquic_stream_write_avail(stream);
2316    if (available < remaining)
2317        remaining = available;
2318
2319    return remaining + stream->sm_n_buffered + frames;
2320}
2321
2322
2323static int
2324frame_std_gen_fin (void *ctx)
2325{
2326    struct frame_gen_ctx *fg_ctx = ctx;
2327    return !(fg_ctx->fgc_stream->sm_bflags & SMBF_CRYPTO)
2328        && (fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE)
2329        && 0 == fg_ctx->fgc_stream->sm_n_buffered
2330        /* Do not use frame_std_gen_size() as it may chop the real size: */
2331        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2332}
2333
2334
2335static void
2336incr_conn_cap (struct lsquic_stream *stream, size_t incr)
2337{
2338    if (stream->sm_bflags & SMBF_CONN_LIMITED)
2339    {
2340        stream->conn_pub->conn_cap.cc_sent += incr;
2341        assert(stream->conn_pub->conn_cap.cc_sent
2342                                    <= stream->conn_pub->conn_cap.cc_max);
2343    }
2344}
2345
2346
2347void
2348incr_sm_payload (struct lsquic_stream *stream, size_t incr)
2349{
2350    stream->sm_payload += incr;
2351    stream->tosend_off += incr;
2352    assert(stream->tosend_off <= stream->max_send_off);
2353}
2354
2355
2356static size_t
2357frame_std_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
2358{
2359    struct frame_gen_ctx *fg_ctx = ctx;
2360    unsigned char *p = begin_buf;
2361    unsigned char *const end = p + len;
2362    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
2363    size_t n_written, available, n_to_write;
2364
2365    if (stream->sm_n_buffered > 0)
2366    {
2367        if (len <= stream->sm_n_buffered)
2368        {
2369            memcpy(p, stream->sm_buf, len);
2370            memmove(stream->sm_buf, stream->sm_buf + len,
2371                                                stream->sm_n_buffered - len);
2372            stream->sm_n_buffered -= len;
2373            if (0 == stream->sm_n_buffered)
2374                maybe_resize_stream_buffer(stream);
2375            assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
2376            incr_sm_payload(stream, len);
2377            *fin = fg_ctx->fgc_fin(fg_ctx);
2378            return len;
2379        }
2380        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
2381        p += stream->sm_n_buffered;
2382        stream->sm_n_buffered = 0;
2383        maybe_resize_stream_buffer(stream);
2384    }
2385
2386    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
2387    n_to_write = end - p;
2388    if (n_to_write > available)
2389        n_to_write = available;
2390    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
2391                                              n_to_write);
2392    p += n_written;
2393    fg_ctx->fgc_nread_from_reader += n_written;
2394    *fin = fg_ctx->fgc_fin(fg_ctx);
2395    incr_sm_payload(stream, p - (const unsigned char *) begin_buf);
2396    incr_conn_cap(stream, n_written);
2397    return p - (const unsigned char *) begin_buf;
2398}
2399
2400
2401static struct stream_hq_frame *
2402find_hq_frame (const struct lsquic_stream *stream, uint64_t off)
2403{
2404    struct stream_hq_frame *shf;
2405
2406    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2407        if (shf->shf_off <= off && stream_hq_frame_end(shf) > off)
2408            return shf;
2409
2410    return NULL;
2411}
2412
2413
2414static struct stream_hq_frame *
2415find_cur_hq_frame (const struct lsquic_stream *stream)
2416{
2417    return find_hq_frame(stream, stream->sm_payload);
2418}
2419
2420
2421static struct stream_hq_frame *
2422open_hq_frame (struct lsquic_stream *stream)
2423{
2424    struct stream_hq_frame *shf;
2425
2426    for (shf = stream->sm_hq_frame_arr; shf < stream->sm_hq_frame_arr
2427            + sizeof(stream->sm_hq_frame_arr)
2428                / sizeof(stream->sm_hq_frame_arr[0]); ++shf)
2429        if (!(shf->shf_flags & SHF_ACTIVE))
2430            goto found;
2431
2432    shf = lsquic_malo_get(stream->conn_pub->mm->malo.stream_hq_frame);
2433    if (!shf)
2434    {
2435        LSQ_WARN("cannot allocate HQ frame");
2436        return NULL;
2437    }
2438    memset(shf, 0, sizeof(*shf));
2439
2440  found:
2441    STAILQ_INSERT_TAIL(&stream->sm_hq_frames, shf, shf_next);
2442    shf->shf_flags = SHF_ACTIVE;
2443    return shf;
2444}
2445
2446
2447/* Returns index of the new frame */
2448static struct stream_hq_frame *
2449stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off,
2450            enum hq_frame_type frame_type, enum shf_flags flags, size_t size)
2451{
2452    struct stream_hq_frame *shf;
2453
2454    shf = open_hq_frame(stream);
2455    if (!shf)
2456    {
2457        LSQ_WARN("could not open HQ frame");
2458        return NULL;
2459    }
2460
2461    shf->shf_off        = off;
2462    shf->shf_flags     |= flags;
2463    shf->shf_frame_type = frame_type;
2464    if (shf->shf_flags & SHF_FIXED_SIZE)
2465        shf->shf_frame_size = size;
2466    else
2467    {
2468        shf->shf_frame_ptr  = NULL;
2469        if (size >= (1 << 6))
2470            shf->shf_flags |= SHF_TWO_BYTES;
2471    }
2472
2473    return shf;
2474}
2475
2476
2477static size_t
2478frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
2479{
2480    struct frame_gen_ctx *fg_ctx = ctx;
2481    unsigned char *p = begin_buf;
2482    unsigned char *const end = p + len;
2483    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2484    struct stream_hq_frame *shf;
2485    size_t nw, frame_sz, avail, rem;
2486    unsigned bits;
2487
2488    while (p < end)
2489    {
2490        shf = find_cur_hq_frame(stream);
2491        if (!shf)
2492        {
2493            rem = frame_std_gen_size(ctx);
2494            if (rem)
2495            {
2496                if (rem > ((1 << 14) - 1))
2497                    rem = (1 << 14) - 1;
2498                shf = stream_activate_hq_frame(stream,
2499                                    stream->sm_payload, HQFT_DATA, 0, rem);
2500                /* XXX malloc can fail */
2501            }
2502            else
2503                break;
2504        }
2505        avail = stream->sm_n_buffered + stream->sm_write_avail(stream);
2506        if (shf->shf_off == stream->sm_payload
2507                                        && !(shf->shf_flags & SHF_WRITTEN))
2508        {
2509            frame_sz = stream_hq_frame_size(shf);
2510            if (frame_sz > (uintptr_t) (end - p))
2511                break;
2512            LSQ_DEBUG("insert %zu-byte HQ frame of type 0x%X at payload "
2513                "offset %"PRIu64" (actual offset %"PRIu64")", frame_sz,
2514                shf->shf_frame_type, stream->sm_payload, stream->tosend_off);
2515            if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
2516            {
2517                shf->shf_frame_ptr = p;
2518                memset(p, 0, frame_sz);
2519                p += frame_sz;
2520            }
2521            else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
2522                                                            == SHF_FIXED_SIZE)
2523            {
2524                *p++ = shf->shf_frame_type;
2525                bits = vint_val2bits(shf->shf_frame_size);
2526                vint_write(p, shf->shf_frame_size, bits, 1 << bits);
2527                p += 1 << bits;
2528            }
2529            else
2530                assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
2531                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
2532            if (!(shf->shf_flags & SHF_CC_PAID))
2533            {
2534                incr_conn_cap(stream, frame_sz);
2535                shf->shf_flags |= SHF_CC_PAID;
2536            }
2537            shf->shf_flags |= SHF_WRITTEN;
2538            stream->tosend_off += frame_sz;
2539            assert(stream->tosend_off <= stream->max_send_off);
2540        }
2541        else
2542        {
2543            len = stream_hq_frame_end(shf) - stream->sm_payload;
2544            assert(len);
2545            if (len > (unsigned) (end - p))
2546                len = end - p;
2547            if (len > avail)
2548                len = avail;
2549            if (!len)
2550                break;
2551            nw = frame_std_gen_read(ctx, p, len, fin);
2552            p += nw;
2553            if (nw < len)
2554                break;
2555            if (stream_hq_frame_end(shf) == stream->sm_payload)
2556                stream_hq_frame_close(stream, shf);
2557        }
2558    }
2559
2560    return p - (unsigned char *) begin_buf;
2561}
2562
2563
/* Read callback used for CRYPTO frames: same as frame_std_gen_read(), but
 * the FIN indicator is discarded.
 */
static size_t
crypto_frame_gen_read (void *ctx, void *buf, size_t len)
{
    int unused_fin;
    size_t nread;

    nread = frame_std_gen_read(ctx, buf, len, &unused_fin);
    return nread;
}
2571
2572
2573static void
2574check_flush_threshold (lsquic_stream_t *stream)
2575{
2576    if ((stream->sm_qflags & SMQF_WANT_FLUSH) &&
2577                            stream->tosend_off >= stream->sm_flush_to)
2578    {
2579        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
2580                                                    stream->sm_flush_to);
2581        maybe_remove_from_write_q(stream, SMQF_WANT_FLUSH);
2582    }
2583}
2584
2585
2586static int
2587write_stream_frame (struct frame_gen_ctx *fg_ctx, const size_t size,
2588                                        struct lsquic_packet_out *packet_out)
2589{
2590    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
2591    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
2592    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2593    unsigned off;
2594    int len, s;
2595
2596#if LSQUIC_CONN_STATS
2597    const uint64_t begin_off = stream->tosend_off;
2598#endif
2599    off = packet_out->po_data_sz;
2600    len = pf->pf_gen_stream_frame(
2601                packet_out->po_data + packet_out->po_data_sz,
2602                lsquic_packet_out_avail(packet_out), stream->id,
2603                stream->tosend_off,
2604                fg_ctx->fgc_fin(fg_ctx), size, fg_ctx->fgc_read, fg_ctx);
2605    if (len < 0)
2606        return len;
2607
2608#if LSQUIC_CONN_STATS
2609    stream->conn_pub->conn_stats->out.stream_frames += 1;
2610    stream->conn_pub->conn_stats->out.stream_data_sz
2611                                            += stream->tosend_off - begin_off;
2612#endif
2613    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
2614                            packet_out->po_data + packet_out->po_data_sz, len);
2615    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
2616    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
2617    if (0 == lsquic_packet_out_avail(packet_out))
2618        packet_out->po_flags |= PO_STREAM_END;
2619    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
2620                                     stream, QUIC_FRAME_STREAM, off, len);
2621    if (s != 0)
2622    {
2623        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
2624        return -1;
2625    }
2626
2627    check_flush_threshold(stream);
2628    return len;
2629}
2630
2631
2632static enum swtp_status
2633stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size)
2634{
2635    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2636    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2637    struct lsquic_packet_out *packet_out;
2638    int len;
2639
2640    packet_out = lsquic_send_ctl_new_packet_out(send_ctl, 0, PNS_APP,
2641                                                    stream->conn_pub->path);
2642    if (!packet_out)
2643        return SWTP_STOP;
2644    packet_out->po_header_type = stream->tosend_off == 0
2645                                        ? HETY_INITIAL : HETY_HANDSHAKE;
2646
2647    len = write_stream_frame(fg_ctx, size, packet_out);
2648
2649    if (len > 0)
2650    {
2651        packet_out->po_flags |= PO_HELLO;
2652        lsquic_packet_out_zero_pad(packet_out);
2653        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
2654        return SWTP_OK;
2655    }
2656    else
2657        return SWTP_ERROR;
2658}
2659
2660
2661static enum swtp_status
2662stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size)
2663{
2664    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2665    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2666    unsigned stream_header_sz, need_at_least;
2667    struct lsquic_packet_out *packet_out;
2668    int len;
2669
2670    if (!(stream->sm_bflags & SMBF_IETF)
2671            && (stream->stream_flags &
2672                    (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED))
2673                                                        == STREAM_HEADERS_SENT
2674        && lsquic_send_ctl_buffered_and_same_prio_as_headers(send_ctl, stream))
2675    {
2676        /* TODO: make this logic work for IETF streams as well XXX */
2677        struct lsquic_stream *const headers_stream
2678                = lsquic_headers_stream_get_stream(stream->conn_pub->u.gquic.hs);
2679        if (lsquic_stream_has_data_to_flush(headers_stream))
2680        {
2681            LSQ_DEBUG("flushing headers stream before potential write to a "
2682                                                            "buffered packet");
2683            (void) lsquic_stream_flush(headers_stream);
2684        }
2685        else
2686            /* Some other stream must have flushed it: this means our headers
2687             * are flushed.
2688             */
2689            stream->stream_flags |= STREAM_HDRS_FLUSHED;
2690    }
2691
2692    stream_header_sz = stream->sm_frame_header_sz(stream, size);
2693    need_at_least = stream_header_sz + (size > 0);
2694  get_packet:
2695    packet_out = lsquic_send_ctl_get_packet_for_stream(send_ctl,
2696                                need_at_least, stream->conn_pub->path, stream);
2697    if (packet_out)
2698    {
2699        len = write_stream_frame(fg_ctx, size, packet_out);
2700        if (len > 0)
2701            return SWTP_OK;
2702        assert(len < 0);
2703        if (-len > (int) need_at_least)
2704        {
2705            LSQ_DEBUG("need more room (%d bytes) than initially calculated "
2706                "%u bytes, will try again", -len, need_at_least);
2707            need_at_least = -len;
2708            goto get_packet;
2709        }
2710        return SWTP_ERROR;
2711    }
2712    else
2713        return SWTP_STOP;
2714}
2715
2716
2717static enum swtp_status
2718stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size)
2719{
2720    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2721    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2722    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
2723    unsigned crypto_header_sz, need_at_least;
2724    struct lsquic_packet_out *packet_out;
2725    unsigned short off;
2726    const enum packnum_space pns = lsquic_enclev2pns[ crypto_level(stream) ];
2727    int len, s;
2728
2729    assert(size > 0);
2730    crypto_header_sz = stream->sm_frame_header_sz(stream, size);
2731    need_at_least = crypto_header_sz + 1;
2732
2733    packet_out = lsquic_send_ctl_get_packet_for_crypto(send_ctl,
2734                                    need_at_least, pns, stream->conn_pub->path);
2735    if (!packet_out)
2736        return SWTP_STOP;
2737
2738    off = packet_out->po_data_sz;
2739    len = pf->pf_gen_crypto_frame(packet_out->po_data + packet_out->po_data_sz,
2740                lsquic_packet_out_avail(packet_out), stream->tosend_off,
2741                size, crypto_frame_gen_read, fg_ctx);
2742    if (len < 0)
2743        return len;
2744
2745    EV_LOG_GENERATED_CRYPTO_FRAME(LSQUIC_LOG_CONN_ID, pf,
2746                            packet_out->po_data + packet_out->po_data_sz, len);
2747    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
2748    packet_out->po_frame_types |= 1 << QUIC_FRAME_CRYPTO;
2749    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
2750                                     stream, QUIC_FRAME_CRYPTO, off, len);
2751    if (s != 0)
2752    {
2753        LSQ_WARN("adding crypto stream to packet failed: %s", strerror(errno));
2754        return -1;
2755    }
2756
2757    packet_out->po_flags |= PO_HELLO;
2758
2759    check_flush_threshold(stream);
2760    return SWTP_OK;
2761}
2762
2763
2764static void
2765abort_connection (struct lsquic_stream *stream)
2766{
2767    if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
2768        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
2769                                                next_service_stream);
2770    stream->sm_qflags |= SMQF_ABORT_CONN;
2771    LSQ_WARN("connection will be aborted");
2772    maybe_conn_to_tickable(stream);
2773}
2774
2775
/* Finalize the current HQ frame, if there is one.
 *
 * A frame already marked SHF_FIXED_SIZE is simply released.  For a
 * variable-size frame, the payload size accumulated so far (sent payload plus
 * buffered bytes, counted from the frame's offset) is written into the frame
 * header that was reserved earlier, provided it fits the reserved length
 * field.
 */
static void
maybe_close_varsize_hq_frame (struct lsquic_stream *stream)
{
    struct stream_hq_frame *shf;
    uint64_t size;
    unsigned bits;

    shf = find_cur_hq_frame(stream);
    if (!shf)
        return;

    if (shf->shf_flags & SHF_FIXED_SIZE)
    {
        stream_hq_frame_put(stream, shf);
        return;
    }

    /* bits is 1 when SHF_TWO_BYTES is set, 0 otherwise; the length field
     * written below is (1 << bits) bytes long.
     */
    bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
    size = stream->sm_payload + stream->sm_n_buffered - shf->shf_off;
    if (size && size <= VINT_MAX_B(bits))
    {
        if (0 == stream->sm_n_buffered)
            LSQ_DEBUG("close HQ frame type 0x%X of size %"PRIu64,
                                                shf->shf_frame_type, size);
        else
            LSQ_DEBUG("convert HQ frame type 0x%X of to fixed %"PRIu64,
                                                shf->shf_frame_type, size);
        /* Fill in the frame header: type byte followed by the length */
        shf->shf_frame_ptr[0] = shf->shf_frame_type;
        vint_write(shf->shf_frame_ptr + 1, size, bits, 1 << bits);
        if (0 == stream->sm_n_buffered)
            stream_hq_frame_put(stream, shf);
        else
        {
            /* Buffered bytes remain: keep the frame around, but from now on
             * treat it as a fixed-size frame of the size just written.
             */
            shf->shf_frame_size = size;
            shf->shf_flags |= SHF_FIXED_SIZE;
        }
    }
    else if (!size)
    {
        assert(!shf->shf_frame_ptr);
        LSQ_WARN("discard zero-sized HQ frame type 0x%X (off: %"PRIu64")",
                                        shf->shf_frame_type, shf->shf_off);
        stream_hq_frame_put(stream, shf);
    }
    else
    {
        /* Size does not fit into the reserved length field */
        assert(stream->sm_n_buffered);
        LSQ_ERROR("cannot close frame of size %"PRIu64" -- too large", size);
        /* TODO: abort connection */
        stream_hq_frame_put(stream, shf);
    }
}
2828
2829
/* Packetize data from `reader' (and from the stream buffer, as accounted for
 * by the frame generation callbacks) by repeatedly calling the stream's
 * sm_write_to_packet() callback.
 *
 * If `thresh' is non-zero, frames are generated only while at least `thresh'
 * bytes are available; whatever is left below the threshold is saved to the
 * stream buffer.  If `thresh' is zero, frames are generated until nothing is
 * left.  A frame is also generated whenever the FIN callback reports true.
 *
 * Returns the number of bytes consumed from `reader', or -1 on error.
 */
static ssize_t
stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
                         size_t thresh)
{
    size_t size;
    ssize_t nw;
    unsigned seen_ok;
    int use_framing;
    struct frame_gen_ctx fg_ctx = {
        .fgc_stream = stream,
        .fgc_reader = reader,
        .fgc_nread_from_reader = 0,
    };

    /* HQ framing is used only when both IETF and "use headers" are on */
    use_framing = (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
                                       == (SMBF_IETF|SMBF_USE_HEADERS);
    if (use_framing)
    {
        fg_ctx.fgc_size = frame_hq_gen_size;
        fg_ctx.fgc_read = frame_hq_gen_read;
        fg_ctx.fgc_fin = frame_std_gen_fin; /* This seems to work for either? XXX */
    }
    else
    {
        fg_ctx.fgc_size = frame_std_gen_size;
        fg_ctx.fgc_read = frame_std_gen_read;
        fg_ctx.fgc_fin = frame_std_gen_fin;
    }

    /* Counts SWTP_OK results; non-zero means at least one frame was written */
    seen_ok = 0;
    while ((size = fg_ctx.fgc_size(&fg_ctx), thresh ? size >= thresh : size > 0)
           || fg_ctx.fgc_fin(&fg_ctx))
    {
        switch (stream->sm_write_to_packet(&fg_ctx, size))
        {
        case SWTP_OK:
            /* Only the first successful write triggers the tickable check */
            if (!seen_ok++)
                maybe_conn_to_tickable_if_writeable(stream, 0);
            if (fg_ctx.fgc_fin(&fg_ctx))
            {
                if (use_framing && seen_ok)
                    maybe_close_varsize_hq_frame(stream);
                stream->stream_flags |= STREAM_FIN_SENT;
                goto end;
            }
            else
                break;
        case SWTP_STOP:
            /* No packet available: stop, but close the HQ frame if any
             * frame was written during this call.
             */
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            if (use_framing && seen_ok)
                maybe_close_varsize_hq_frame(stream);
            goto end;
        default:
            /* Any other status is treated as an error */
            abort_connection(stream);
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            return -1;
        }
    }

    if (use_framing && seen_ok)
        maybe_close_varsize_hq_frame(stream);

    if (thresh)
    {
        /* The loop ended with less than `thresh' bytes remaining: buffer
         * the part that is not in the stream buffer already.
         */
        assert(size < thresh);
        assert(size >= stream->sm_n_buffered);
        size -= stream->sm_n_buffered;
        if (size > 0)
        {
            nw = save_to_buffer(stream, reader, size);
            if (nw < 0)
                return -1;
            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
        }
    }
    else
    {
        /* We count flushed data towards both stream and connection limits,
         * so we should have been able to packetize all of it:
         */
        assert(0 == stream->sm_n_buffered);
        assert(size == 0);
    }

    maybe_mark_as_blocked(stream);

    /* The early exits above jump here, bypassing maybe_mark_as_blocked() */
  end:
    return fg_ctx.fgc_nread_from_reader;
}
2919
2920
2921/* Perform an implicit flush when we hit connection limit while buffering
2922 * data.  This is to prevent a (theoretical) stall:
2923 *
2924 * Imagine a number of streams, all of which buffered some data.  The buffered
2925 * data is up to connection cap, which means no further writes are possible.
2926 * None of them flushes, which means that data is not sent and connection
2927 * WINDOW_UPDATE frame never arrives from peer.  Stall.
2928 */
2929static int
2930maybe_flush_stream (struct lsquic_stream *stream)
2931{
2932    if (stream->sm_n_buffered > 0
2933          && (stream->sm_bflags & SMBF_CONN_LIMITED)
2934            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
2935        return stream_flush_nocheck(stream);
2936    else
2937        return 0;
2938}
2939
2940
2941static int
2942stream_hq_frame_extendable (const struct stream_hq_frame *shf, uint64_t cur_off,
2943                                                                    unsigned len)
2944{
2945    return (shf->shf_flags & (SHF_TWO_BYTES|SHF_FIXED_SIZE)) == 0
2946        && cur_off - shf->shf_off < (1 << 6)
2947        && cur_off - shf->shf_off + len >= (1 << 6)
2948        ;
2949}
2950
2951
2952/* Update currently buffered HQ frame or create a new one, if possible.
2953 * Return update length to be buffered.  If a HQ frame cannot be
2954 * buffered due to size, 0 is returned, thereby preventing both HQ frame
2955 * creation and buffering.
2956 */
2957static size_t
2958update_buffered_hq_frames (struct lsquic_stream *stream, size_t len,
2959                                                                size_t avail)
2960{
2961    struct stream_hq_frame *shf;
2962    uint64_t cur_off, end;
2963    size_t frame_sz;
2964    int extendable;
2965
2966    cur_off = stream->sm_payload + stream->sm_n_buffered;
2967    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2968        if (shf->shf_off <= cur_off)
2969        {
2970            end = stream_hq_frame_end(shf);
2971            extendable = stream_hq_frame_extendable(shf, cur_off, len);
2972            if (cur_off < end + extendable)
2973                break;
2974        }
2975
2976    if (shf)
2977    {
2978        if (len > end - cur_off)
2979            len = end - cur_off;
2980        frame_sz = stream_hq_frame_size(shf);
2981    }
2982    else if (avail >= 3)
2983    {
2984        shf = stream_activate_hq_frame(stream, cur_off, HQFT_DATA, 0, len);
2985        /* XXX malloc can fail */
2986        if (len > stream_hq_frame_end(shf) - cur_off)
2987            len = stream_hq_frame_end(shf) - cur_off;
2988        extendable = 0;
2989        frame_sz = stream_hq_frame_size(shf);
2990        if (avail < frame_sz)
2991            return 0;
2992        avail -= frame_sz;
2993    }
2994    else
2995        return 0;
2996
2997    if (!(shf->shf_flags & SHF_CC_PAID))
2998    {
2999        incr_conn_cap(stream, frame_sz);
3000        shf->shf_flags |= SHF_CC_PAID;
3001    }
3002    if (extendable)
3003    {
3004        shf->shf_flags |= SHF_TWO_BYTES;
3005        incr_conn_cap(stream, 1);
3006        avail -= 1;
3007        if ((stream->sm_qflags & SMQF_WANT_FLUSH)
3008                && shf->shf_off <= stream->sm_payload
3009                && stream_hq_frame_end(shf) >= stream->sm_flush_to_payload)
3010            stream->sm_flush_to += 1;
3011    }
3012
3013    if (len <= avail)
3014        return len;
3015    else
3016        return avail;
3017}
3018
3019
3020static ssize_t
3021save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
3022                                                                size_t len)
3023{
3024    size_t avail, n_written, n_allowed;
3025
3026    n_allowed = stream_get_n_allowed(stream);
3027    assert(stream->sm_n_buffered + len <= n_allowed);
3028
3029    if (!stream->sm_buf)
3030    {
3031        stream->sm_buf = malloc(n_allowed);
3032        if (!stream->sm_buf)
3033            return -1;
3034        stream->sm_n_allocated = n_allowed;
3035    }
3036
3037    avail = lsquic_stream_write_avail(stream);
3038    if (avail < len)
3039        len = avail;
3040
3041    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3042                                            == (SMBF_IETF|SMBF_USE_HEADERS))
3043        len = update_buffered_hq_frames(stream, len, avail);
3044
3045    n_written = reader->lsqr_read(reader->lsqr_ctx,
3046                        stream->sm_buf + stream->sm_n_buffered, len);
3047    stream->sm_n_buffered += n_written;
3048    assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
3049    incr_conn_cap(stream, n_written);
3050    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
3051              n_written, stream->sm_n_buffered);
3052    if (0 != maybe_flush_stream(stream))
3053        return -1;
3054    return n_written;
3055}
3056
3057
3058static ssize_t
3059stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
3060{
3061    const struct stream_hq_frame *shf;
3062    size_t thresh, len, frames, total_len, n_allowed, nwritten;
3063    ssize_t nw;
3064
3065    len = reader->lsqr_size(reader->lsqr_ctx);
3066    if (len == 0)
3067        return 0;
3068
3069    frames = 0;
3070    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3071                                        == (SMBF_IETF|SMBF_USE_HEADERS))
3072        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
3073            if (shf->shf_off >= stream->sm_payload)
3074                frames += stream_hq_frame_size(shf);
3075    total_len = len + frames + stream->sm_n_buffered;
3076    thresh = lsquic_stream_flush_threshold(stream, total_len);
3077    n_allowed = stream_get_n_allowed(stream);
3078    if (total_len <= n_allowed && total_len < thresh)
3079    {
3080        nwritten = 0;
3081        do
3082        {
3083            nw = save_to_buffer(stream, reader, len - nwritten);
3084            if (nw > 0)
3085                nwritten += (size_t) nw;
3086            else if (nw == 0)
3087                break;
3088            else
3089                return nw;
3090        }
3091        while (nwritten < len
3092                        && stream->sm_n_buffered < stream->sm_n_allocated);
3093        return nwritten;
3094    }
3095    else
3096        return stream_write_to_packets(stream, reader, thresh);
3097}
3098
3099
3100ssize_t
3101lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
3102{
3103    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
3104    return lsquic_stream_writev(stream, &iov, 1);
3105}
3106
3107
/* Cursor over an array of iovecs, used as reader context */
struct inner_reader_iovec {
    const struct iovec       *iov;              /* Current element */
    const struct iovec       *end;              /* One past the last element */
    size_t                    cur_iovec_off;    /* Bytes consumed from *iov */
};


/* Copy up to `count' bytes from the iovec array into `buf', advancing the
 * cursor in `ctx'.  Returns the number of bytes copied.
 *
 * Sizes are tracked as size_t: a narrower type truncates large iov_len
 * values, and a length that is a multiple of 2^32 would truncate to zero and
 * keep the loop from ever making progress.
 */
static size_t
inner_reader_iovec_read (void *ctx, void *buf, size_t count)
{
    struct inner_reader_iovec *const iro = ctx;
    unsigned char *const begin = buf;
    unsigned char *const end = begin + count;
    unsigned char *p = begin;
    size_t n_tocopy;

    while (iro->iov < iro->end && p < end)
    {
        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
        if (n_tocopy > (size_t) (end - p))
            n_tocopy = (size_t) (end - p);
        /* An empty iovec may carry a NULL base, which memcpy must not see */
        if (n_tocopy > 0)
            memcpy(p, (const unsigned char *) iro->iov->iov_base
                                        + iro->cur_iovec_off, n_tocopy);
        p += n_tocopy;
        iro->cur_iovec_off += n_tocopy;
        if (iro->iov->iov_len == iro->cur_iovec_off)
        {
            ++iro->iov;
            iro->cur_iovec_off = 0;
        }
    }

    return (size_t) (p - begin);
}


/* Return the number of bytes that remain to be read from the iovec array */
static size_t
inner_reader_iovec_size (void *ctx)
{
    struct inner_reader_iovec *const iro = ctx;
    const struct iovec *iov;
    size_t size;

    size = 0;
    for (iov = iro->iov; iov < iro->end; ++iov)
        size += iov->iov_len;

    /* Part of the first element may have been consumed already */
    return size - iro->cur_iovec_off;
}
3156
3157
3158ssize_t
3159lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
3160                                                                    int iovcnt)
3161{
3162    COMMON_WRITE_CHECKS();
3163    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3164
3165    struct inner_reader_iovec iro = {
3166        .iov = iov,
3167        .end = iov + iovcnt,
3168        .cur_iovec_off = 0,
3169    };
3170    struct lsquic_reader reader = {
3171        .lsqr_read = inner_reader_iovec_read,
3172        .lsqr_size = inner_reader_iovec_size,
3173        .lsqr_ctx  = &iro,
3174    };
3175
3176    return stream_write(stream, &reader);
3177}
3178
3179
3180ssize_t
3181lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
3182{
3183    COMMON_WRITE_CHECKS();
3184    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3185    return stream_write(stream, reader);
3186}
3187
3188
3189/* This bypasses COMMON_WRITE_CHECKS */
3190static ssize_t
3191stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz)
3192{
3193    struct iovec iov = { (void *) buf, sz, };
3194    struct inner_reader_iovec iro = {
3195        .iov = &iov,
3196        .end = &iov + 1,
3197        .cur_iovec_off = 0,
3198    };
3199    struct lsquic_reader reader = {
3200        .lsqr_read = inner_reader_iovec_read,
3201        .lsqr_size = inner_reader_iovec_size,
3202        .lsqr_ctx  = &iro,
3203    };
3204    return stream_write(stream, &reader);
3205}
3206
3207
3208/* XXX Move this define elsewhere? */
3209#define MAX_HEADERS_SIZE (64 * 1024)
3210
3211static int
3212send_headers_ietf (struct lsquic_stream *stream,
3213                            const struct lsquic_http_headers *headers, int eos)
3214{
3215    enum qwh_status qwh;
3216    const size_t max_prefix_size =
3217                    lsquic_qeh_max_prefix_size(stream->conn_pub->u.ietf.qeh);
3218    const size_t max_push_size = 1 /* Stream type */ + 8 /* Push ID */;
3219    size_t prefix_sz, headers_sz, hblock_sz, push_sz;
3220    unsigned bits;
3221    ssize_t nw;
3222    unsigned char *header_block;
3223    unsigned char buf[max_push_size + max_prefix_size + MAX_HEADERS_SIZE];
3224
3225    stream->stream_flags &= ~STREAM_PUSHING;
3226    stream->stream_flags |= STREAM_NOPUSH;
3227
3228    /* TODO: Optimize for the common case: write directly to sm_buf and fall
3229     * back to a larger buffer if that fails.
3230     */
3231    prefix_sz = max_prefix_size;
3232    headers_sz = sizeof(buf) - max_prefix_size - max_push_size;
3233    qwh = lsquic_qeh_write_headers(stream->conn_pub->u.ietf.qeh, stream->id, 0,
3234                headers, buf + max_push_size + max_prefix_size, &prefix_sz,
3235                &headers_sz, &stream->sm_hb_compl);
3236
3237    if (!(qwh == QWH_FULL || qwh == QWH_PARTIAL))
3238    {
3239        if (qwh == QWH_ENOBUF)
3240            LSQ_INFO("not enough room for header block");
3241        else
3242            LSQ_WARN("internal error encoding and sending HTTP headers");
3243        return -1;
3244    }
3245
3246    if (stream->sm_promise)
3247    {
3248        assert(lsquic_stream_is_pushed(stream));
3249        bits = vint_val2bits(stream->sm_promise->pp_id);
3250        push_sz = 1 + (1 << bits);
3251        if (!stream_activate_hq_frame(stream,
3252                stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PREAMBLE,
3253                SHF_FIXED_SIZE|SHF_PHANTOM, push_sz))
3254            return -1;
3255        buf[max_push_size + max_prefix_size - prefix_sz - push_sz] = HQUST_PUSH;
3256        vint_write(buf + max_push_size + max_prefix_size - prefix_sz
3257                    - push_sz + 1,stream->sm_promise->pp_id, bits, 1 << bits);
3258    }
3259    else
3260        push_sz = 0;
3261
3262    /* Construct contiguous header block buffer including HQ framing */
3263    header_block = buf + max_push_size + max_prefix_size - prefix_sz - push_sz;
3264    hblock_sz = push_sz + prefix_sz + headers_sz;
3265    if (!stream_activate_hq_frame(stream,
3266                stream->sm_payload + stream->sm_n_buffered + push_sz,
3267                HQFT_HEADERS, SHF_FIXED_SIZE, hblock_sz - push_sz))
3268        return -1;
3269
3270    if (qwh == QWH_FULL)
3271    {
3272        stream->sm_send_headers_state = SSHS_HBLOCK_SENDING;
3273        if (lsquic_stream_write_avail(stream))
3274        {
3275            nw = stream_write_buf(stream, header_block, hblock_sz);
3276            if (nw < 0)
3277            {
3278                LSQ_WARN("cannot write to stream: %s", strerror(errno));
3279                return -1;
3280            }
3281            if ((size_t) nw == hblock_sz)
3282            {
3283                stream->stream_flags |= STREAM_HEADERS_SENT;
3284                stream_hblock_sent(stream);
3285                LSQ_DEBUG("wrote all %zu bytes of header block", hblock_sz);
3286                return 0;
3287            }
3288            LSQ_DEBUG("wrote only %zd bytes of header block, stash", nw);
3289        }
3290        else
3291        {
3292            LSQ_DEBUG("cannot write to stream, stash all %zu bytes of "
3293                                        "header block", hblock_sz);
3294            nw = 0;
3295        }
3296    }
3297    else
3298    {
3299        stream->sm_send_headers_state = SSHS_ENC_SENDING;
3300        nw = 0;
3301    }
3302
3303    stream->sm_saved_want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
3304    stream_wantwrite(stream, 1);
3305
3306    stream->sm_header_block = malloc(hblock_sz - (size_t) nw);
3307    if (!stream->sm_header_block)
3308    {
3309        LSQ_WARN("cannot allocate %zd bytes to stash %s header block",
3310            hblock_sz - (size_t) nw, qwh == QWH_FULL ? "full" : "partial");
3311        return -1;
3312    }
3313    memcpy(stream->sm_header_block, header_block + (size_t) nw,
3314                                                hblock_sz - (size_t) nw);
3315    stream->sm_hblock_sz = hblock_sz - (size_t) nw;
3316    stream->sm_hblock_off = 0;
3317    LSQ_DEBUG("stashed %u bytes of header block", stream->sm_hblock_sz);
3318    return 0;
3319}
3320
3321
3322static int
3323send_headers_gquic (struct lsquic_stream *stream,
3324                            const struct lsquic_http_headers *headers, int eos)
3325{
3326    int s = lsquic_headers_stream_send_headers(stream->conn_pub->u.gquic.hs,
3327                stream->id, headers, eos, lsquic_stream_priority(stream));
3328    if (0 == s)
3329    {
3330        SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
3331        stream->stream_flags |= STREAM_HEADERS_SENT;
3332        if (eos)
3333            stream->stream_flags |= STREAM_FIN_SENT;
3334        LSQ_INFO("sent headers");
3335    }
3336    else
3337        LSQ_WARN("could not send headers: %s", strerror(errno));
3338    return s;
3339}
3340
3341
3342int
3343lsquic_stream_send_headers (lsquic_stream_t *stream,
3344                            const lsquic_http_headers_t *headers, int eos)
3345{
3346    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3347            && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_U_WRITE_DONE)))
3348    {
3349        if (stream->sm_bflags & SMBF_IETF)
3350            return send_headers_ietf(stream, headers, eos);
3351        else
3352            return send_headers_gquic(stream, headers, eos);
3353    }
3354    else
3355    {
3356        LSQ_INFO("cannot send headers in this state");
3357        errno = EBADMSG;
3358        return -1;
3359    }
3360}
3361
3362
3363void
3364lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
3365{
3366    if (offset > stream->max_send_off)
3367    {
3368        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
3369        LSQ_DEBUG("update max send offset from 0x%"PRIX64" to "
3370            "0x%"PRIX64, stream->max_send_off, offset);
3371        stream->max_send_off = offset;
3372    }
3373    else
3374        LSQ_DEBUG("new offset 0x%"PRIX64" is not larger than old "
3375            "max send offset 0x%"PRIX64", ignoring", offset,
3376            stream->max_send_off);
3377}
3378
3379
3380/* This function is used to update offsets after handshake completes and we
3381 * learn of peer's limits from the handshake values.
3382 */
3383int
3384lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset)
3385{
3386    LSQ_DEBUG("setting max_send_off to %"PRIu64, offset);
3387    if (offset > stream->max_send_off)
3388    {
3389        lsquic_stream_window_update(stream, offset);
3390        return 0;
3391    }
3392    else if (offset < stream->tosend_off)
3393    {
3394        LSQ_INFO("new offset (%"PRIu64" bytes) is smaller than the amount of "
3395            "data already sent on this stream (%"PRIu64" bytes)", offset,
3396            stream->tosend_off);
3397        return -1;
3398    }
3399    else
3400    {
3401        stream->max_send_off = offset;
3402        return 0;
3403    }
3404}
3405
3406
3407void
3408lsquic_stream_reset (lsquic_stream_t *stream, uint64_t error_code)
3409{
3410    lsquic_stream_reset_ext(stream, error_code, 1);
3411}
3412
3413
3414void
3415lsquic_stream_reset_ext (lsquic_stream_t *stream, uint64_t error_code,
3416                         int do_close)
3417{
3418    if ((stream->stream_flags & STREAM_RST_SENT)
3419                                    || (stream->sm_qflags & SMQF_SEND_RST))
3420    {
3421        LSQ_INFO("reset already sent");
3422        return;
3423    }
3424
3425    SM_HISTORY_APPEND(stream, SHE_RESET);
3426
3427    LSQ_INFO("reset, error code %"PRIu64, error_code);
3428    stream->error_code = error_code;
3429
3430    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
3431        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
3432                                                        next_send_stream);
3433    stream->sm_qflags &= ~SMQF_SENDING_FLAGS;
3434    stream->sm_qflags |= SMQF_SEND_RST;
3435
3436    if (stream->sm_qflags & SMQF_QPACK_DEC)
3437    {
3438        lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream);
3439        stream->sm_qflags |= ~SMQF_QPACK_DEC;
3440    }
3441
3442    drop_buffered_data(stream);
3443    maybe_elide_stream_frames(stream);
3444    maybe_schedule_call_on_close(stream);
3445
3446    if (do_close)
3447        lsquic_stream_close(stream);
3448    else
3449        maybe_conn_to_tickable_if_writeable(stream, 1);
3450}
3451
3452
3453lsquic_stream_id_t
3454lsquic_stream_id (const lsquic_stream_t *stream)
3455{
3456    return stream->id;
3457}
3458
3459
3460#if !defined(NDEBUG) && __GNUC__
3461__attribute__((weak))
3462#endif
3463struct lsquic_conn *
3464lsquic_stream_conn (const lsquic_stream_t *stream)
3465{
3466    return stream->conn_pub->lconn;
3467}
3468
3469
3470int
3471lsquic_stream_close (lsquic_stream_t *stream)
3472{
3473    LSQ_DEBUG("lsquic_stream_close() called");
3474    SM_HISTORY_APPEND(stream, SHE_CLOSE);
3475    if (lsquic_stream_is_closed(stream))
3476    {
3477        LSQ_INFO("Attempt to close an already-closed stream");
3478        errno = EBADF;
3479        return -1;
3480    }
3481    maybe_stream_shutdown_write(stream);
3482    stream_shutdown_read(stream);
3483    maybe_schedule_call_on_close(stream);
3484    maybe_finish_stream(stream);
3485    if (!(stream->stream_flags & STREAM_DELAYED_SW))
3486        maybe_conn_to_tickable_if_writeable(stream, 1);
3487    return 0;
3488}
3489
3490
3491#ifndef NDEBUG
3492#if __GNUC__
3493__attribute__((weak))
3494#endif
3495#endif
3496void
3497lsquic_stream_acked (struct lsquic_stream *stream,
3498                                            enum quic_frame_type frame_type)
3499{
3500    assert(stream->n_unacked);
3501    --stream->n_unacked;
3502    LSQ_DEBUG("ACKed; n_unacked: %u", stream->n_unacked);
3503    if (frame_type == QUIC_FRAME_RST_STREAM)
3504    {
3505        SM_HISTORY_APPEND(stream, SHE_RST_ACKED);
3506        LSQ_DEBUG("RESET that we sent has been acked by peer");
3507        stream->stream_flags |= STREAM_RST_ACKED;
3508    }
3509    if (0 == stream->n_unacked)
3510        maybe_finish_stream(stream);
3511}
3512
3513
3514void
3515lsquic_stream_push_req (lsquic_stream_t *stream,
3516                        struct uncompressed_headers *push_req)
3517{
3518    assert(!stream->push_req);
3519    stream->push_req = push_req;
3520    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
3521}
3522
3523
3524int
3525lsquic_stream_is_pushed (const lsquic_stream_t *stream)
3526{
3527    enum stream_id_type sit;
3528
3529    if (stream->sm_bflags & SMBF_IETF)
3530    {
3531        sit = stream->id & SIT_MASK;
3532        return sit == SIT_UNI_SERVER;
3533    }
3534    else
3535        return 1 & ~stream->id;
3536}
3537
3538
3539int
3540lsquic_stream_push_info (const lsquic_stream_t *stream,
3541                          lsquic_stream_id_t *ref_stream_id, void **hset)
3542{
3543    if (lsquic_stream_is_pushed(stream))
3544    {
3545        assert(stream->push_req);
3546        *ref_stream_id = stream->push_req->uh_stream_id;
3547        *hset          = stream->push_req->uh_hset;
3548        return 0;
3549    }
3550    else
3551        return -1;
3552}
3553
3554
3555int
3556lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
3557{
3558    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3559                                    && !(stream->stream_flags & STREAM_HAVE_UH))
3560    {
3561        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
3562        LSQ_DEBUG("received uncompressed headers");
3563        stream->stream_flags |= STREAM_HAVE_UH;
3564        if (uh->uh_flags & UH_FIN)
3565        {
3566            /* IETF QUIC only sets UH_FIN for a pushed stream on the server to
3567             * mark request as done:
3568             */
3569            if (stream->sm_bflags & SMBF_IETF)
3570                assert((stream->sm_bflags & SMBF_SERVER)
3571                                            && lsquic_stream_is_pushed(stream));
3572            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
3573        }
3574        stream->uh = uh;
3575        if (uh->uh_oth_stream_id == 0)
3576        {
3577            if (uh->uh_weight)
3578                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
3579        }
3580        else
3581            LSQ_NOTICE("don't know how to depend on stream %"PRIu64,
3582                                                        uh->uh_oth_stream_id);
3583        return 0;
3584    }
3585    else
3586    {
3587        LSQ_ERROR("received unexpected uncompressed headers");
3588        return -1;
3589    }
3590}
3591
3592
3593unsigned
3594lsquic_stream_priority (const lsquic_stream_t *stream)
3595{
3596    return 256 - stream->sm_priority;
3597}
3598
3599
3600int
3601lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
3602{
3603    /* The user should never get a reference to the special streams,
3604     * but let's check just in case:
3605     */
3606    if (lsquic_stream_is_critical(stream))
3607        return -1;
3608    if (priority < 1 || priority > 256)
3609        return -1;
3610    stream->sm_priority = 256 - priority;
3611    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
3612    LSQ_DEBUG("set priority to %u", priority);
3613    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
3614    return 0;
3615}
3616
3617
3618static int
3619maybe_send_priority_gquic (struct lsquic_stream *stream, unsigned priority)
3620{
3621    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3622                            && (stream->stream_flags & STREAM_HEADERS_SENT))
3623    {
3624        /* We need to send headers only if we are a) using HEADERS stream
3625         * and b) we already sent initial headers.  If initial headers
3626         * have not been sent yet, stream priority will be sent in the
3627         * HEADERS frame.
3628         */
3629        return lsquic_headers_stream_send_priority(stream->conn_pub->u.gquic.hs,
3630                                                stream->id, 0, 0, priority);
3631    }
3632    else
3633        return 0;
3634}
3635
3636
/* Sending priority is not implemented for IETF streams yet: a warning is
 * logged and -1 is returned.
 */
static int
send_priority_ietf (struct lsquic_stream *stream, unsigned priority)
{
    (void) priority;

    LSQ_WARN("%s: TODO", __func__);     /* TODO */
    return -1;
}
3643
3644
3645int
3646lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
3647{
3648    if (0 == lsquic_stream_set_priority_internal(stream, priority))
3649    {
3650        if (stream->sm_bflags & SMBF_IETF)
3651            return send_priority_ietf(stream, priority);
3652        else
3653            return maybe_send_priority_gquic(stream, priority);
3654    }
3655    else
3656        return -1;
3657}
3658
3659
3660lsquic_stream_ctx_t *
3661lsquic_stream_get_ctx (const lsquic_stream_t *stream)
3662{
3663    return stream->st_ctx;
3664}
3665
3666
3667int
3668lsquic_stream_refuse_push (lsquic_stream_t *stream)
3669{
3670    if (lsquic_stream_is_pushed(stream)
3671            && !(stream->sm_qflags & SMQF_SEND_RST)
3672            && !(stream->stream_flags & STREAM_RST_SENT))
3673    {
3674        LSQ_DEBUG("refusing pushed stream: send reset");
3675        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
3676        return 0;
3677    }
3678    else
3679        return -1;
3680}
3681
3682
3683size_t
3684lsquic_stream_mem_used (const struct lsquic_stream *stream)
3685{
3686    size_t size;
3687
3688    size = sizeof(stream);
3689    if (stream->sm_buf)
3690        size += stream->sm_n_allocated;
3691    if (stream->data_in)
3692        size += stream->data_in->di_if->di_mem_used(stream->data_in);
3693
3694    return size;
3695}
3696
3697
3698const lsquic_cid_t *
3699lsquic_stream_cid (const struct lsquic_stream *stream)
3700{
3701    return LSQUIC_LOG_CONN_ID;
3702}
3703
3704
3705void
3706lsquic_stream_dump_state (const struct lsquic_stream *stream)
3707{
3708    LSQ_DEBUG("flags: %X; read off: %"PRIu64, stream->stream_flags,
3709                                                    stream->read_offset);
3710    stream->data_in->di_if->di_dump_state(stream->data_in);
3711}
3712
3713
3714void *
3715lsquic_stream_get_hset (struct lsquic_stream *stream)
3716{
3717    void *hset;
3718
3719    if (lsquic_stream_is_reset(stream))
3720    {
3721        LSQ_INFO("%s: stream is reset, no headers returned", __func__);
3722        errno = ECONNRESET;
3723        return NULL;
3724    }
3725
3726    if (!((stream->sm_bflags & SMBF_USE_HEADERS)
3727                                && (stream->stream_flags & STREAM_HAVE_UH)))
3728    {
3729        LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__,
3730                                                        stream->stream_flags);
3731        return NULL;
3732    }
3733
3734    if (!stream->uh)
3735    {
3736        LSQ_INFO("%s: headers unavailable (already fetched?)", __func__);
3737        return NULL;
3738    }
3739
3740    if (stream->uh->uh_flags & UH_H1H)
3741    {
3742        LSQ_INFO("%s: uncompressed headers have internal format", __func__);
3743        return NULL;
3744    }
3745
3746    hset = stream->uh->uh_hset;
3747    stream->uh->uh_hset = NULL;
3748    destroy_uh(stream);
3749    if (stream->stream_flags & STREAM_HEAD_IN_FIN)
3750    {
3751        stream->stream_flags |= STREAM_FIN_REACHED;
3752        SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
3753    }
3754    LSQ_DEBUG("return header set");
3755    return hset;
3756}
3757
3758
3759/* GQUIC-only function */
3760int
3761lsquic_stream_id_is_critical (int use_http, lsquic_stream_id_t stream_id)
3762{
3763    return stream_id == LSQUIC_GQUIC_STREAM_HANDSHAKE
3764        || (stream_id == LSQUIC_GQUIC_STREAM_HEADERS && use_http);
3765}
3766
3767
3768int
3769lsquic_stream_is_critical (const struct lsquic_stream *stream)
3770{
3771    return stream->sm_bflags & SMBF_CRITICAL;
3772}
3773
3774
3775void
3776lsquic_stream_set_stream_if (struct lsquic_stream *stream,
3777           const struct lsquic_stream_if *stream_if, void *stream_if_ctx)
3778{
3779    SM_HISTORY_APPEND(stream, SHE_IF_SWITCH);
3780    stream->stream_if    = stream_if;
3781    stream->sm_onnew_arg = stream_if_ctx;
3782    LSQ_DEBUG("switched interface");
3783    assert(stream->stream_flags & STREAM_ONNEW_DONE);
3784    stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
3785                                                      stream);
3786}
3787
3788
3789static int
3790update_type_hist_and_check (struct hq_filter *filter)
3791{
3792    /* 3-bit codes: */
3793    enum {
3794        CODE_UNSET,
3795        CODE_HEADER,    /* H    Header  */
3796        CODE_DATA,      /* D    Data    */
3797        CODE_PLUS,      /* +    Plus: meaning previous frame repeats */
3798    };
3799    static const unsigned valid_seqs[] = {
3800        /* Ordered by expected frequency */
3801        0123,   /* HD+  */
3802        012,    /* HD   */
3803        01,     /* H    */
3804        01231,  /* HD+H */
3805        0121,   /* HDH  */
3806    };
3807    unsigned code, i;
3808
3809    switch (filter->hqfi_type)
3810    {
3811    case HQFT_HEADERS:
3812        code = CODE_HEADER;
3813        break;
3814    case HQFT_DATA:
3815        code = CODE_DATA;
3816        break;
3817    default:
3818        /* Ignore unknown frames */
3819        return 0;
3820    }
3821
3822    if (filter->hqfi_hist_idx >= MAX_HQFI_ENTRIES)
3823        return -1;
3824
3825    if (filter->hqfi_hist_idx && (filter->hqfi_hist_buf & 7) == code)
3826    {
3827        filter->hqfi_hist_buf <<= 3;
3828        filter->hqfi_hist_buf |= CODE_PLUS;
3829        filter->hqfi_hist_idx++;
3830    }
3831    else if (filter->hqfi_hist_idx > 1
3832            && ((filter->hqfi_hist_buf >> 3) & 7) == code
3833            && (filter->hqfi_hist_buf & 7) == CODE_PLUS)
3834        /* Keep it at plus, do nothing */;
3835    else
3836    {
3837        filter->hqfi_hist_buf <<= 3;
3838        filter->hqfi_hist_buf |= code;
3839        filter->hqfi_hist_idx++;
3840    }
3841
3842    for (i = 0; i < sizeof(valid_seqs) / sizeof(valid_seqs[0]); ++i)
3843        if (filter->hqfi_hist_buf == valid_seqs[i])
3844            return 0;
3845
3846    return -1;
3847}
3848
3849
3850static size_t
3851hq_read (void *ctx, const unsigned char *buf, size_t sz, int fin)
3852{
3853    struct lsquic_stream *const stream = ctx;
3854    struct hq_filter *const filter = &stream->sm_hq_filter;
3855    const unsigned char *p = buf, *prev;
3856    const unsigned char *const end = buf + sz;
3857    struct lsquic_conn *lconn;
3858    enum lsqpack_read_header_status rhs;
3859    int s;
3860
3861    while (p < end)
3862    {
3863        switch (filter->hqfi_state)
3864        {
3865        case HQFI_STATE_FRAME_HEADER_BEGIN:
3866            filter->hqfi_vint_state.vr2s_state = 0;
3867            filter->hqfi_state = HQFI_STATE_FRAME_HEADER_CONTINUE;
3868            /* fall-through */
3869        case HQFI_STATE_FRAME_HEADER_CONTINUE:
3870            s = lsquic_varint_read_two(&p, end, &filter->hqfi_vint_state);
3871            if (s < 0)
3872                break;
3873            filter->hqfi_flags |= HQFI_FLAG_BEGIN;
3874            filter->hqfi_state = HQFI_STATE_READING_PAYLOAD;
3875            LSQ_DEBUG("HQ frame type 0x%"PRIX64" at offset %"PRIu64", size %"PRIu64,
3876                filter->hqfi_type, stream->read_offset + (unsigned) (p - buf),
3877                filter->hqfi_left);
3878            if (0 != update_type_hist_and_check(filter))
3879            {
3880                lconn = stream->conn_pub->lconn;
3881                filter->hqfi_flags |= HQFI_FLAG_ERROR;
3882                LSQ_INFO("unexpected HTTP/3 frame sequence: %o",
3883                    filter->hqfi_hist_buf);
3884                lconn->cn_if->ci_abort_error(lconn, 1, HEC_UNEXPECTED_FRAME,
3885                    "unexpected HTTP/3 frame sequence on stream %"PRIu64,
3886                    stream->id);
3887                goto end;
3888            }
3889            if (filter->hqfi_type == HQFT_HEADERS)
3890            {
3891                if (0 == (filter->hqfi_flags & HQFI_FLAG_GOT_HEADERS))
3892                    filter->hqfi_flags |= HQFI_FLAG_GOT_HEADERS;
3893                else
3894                {
3895                    filter->hqfi_type = (1ull << 62) - 1;
3896                    LSQ_DEBUG("Ignoring HEADERS frame");
3897                }
3898            }
3899            if (filter->hqfi_left > 0)
3900            {
3901                if (filter->hqfi_type == HQFT_DATA)
3902                    goto end;
3903                else if (filter->hqfi_type == HQFT_PUSH_PROMISE)
3904                {
3905                    lconn = stream->conn_pub->lconn;
3906                    if (stream->sm_bflags & SMBF_SERVER)
3907                        lconn->cn_if->ci_abort_error(lconn, 1,
3908                            HEC_UNEXPECTED_FRAME, "Received PUSH_PROMISE frame "
3909                            "on stream %"PRIu64" (clients are not supposed to "
3910                            "send those)", stream->id);
3911                    else
3912                        /* Our client implementation does not support server
3913                         * push.
3914                         */
3915                        lconn->cn_if->ci_abort_error(lconn, 1,
3916                            HEC_UNEXPECTED_FRAME, /* TODO: in ID-21 ID_ERROR? */
3917                            "Received PUSH_PROMISE frame (not supported)"
3918                            "on stream %"PRIu64, stream->id);
3919                    goto end;
3920                }
3921            }
3922            else
3923            {
3924                switch (filter->hqfi_type)
3925                {
3926                case HQFT_CANCEL_PUSH:
3927                case HQFT_GOAWAY:
3928                case HQFT_HEADERS:
3929                case HQFT_MAX_PUSH_ID:
3930                case HQFT_PRIORITY:
3931                case HQFT_PUSH_PROMISE:
3932                case HQFT_SETTINGS:
3933                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
3934                    LSQ_INFO("HQ frame of type %"PRIu64" cannot be size 0",
3935                                                            filter->hqfi_type);
3936                    abort_connection(stream);   /* XXX Overkill? */
3937                    goto end;
3938                default:
3939                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
3940                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
3941                    break;
3942                }
3943            }
3944            break;
3945        case HQFI_STATE_READING_PAYLOAD:
3946            if (filter->hqfi_type == HQFT_DATA)
3947                goto end;
3948            sz = filter->hqfi_left;
3949            if (sz > (uintptr_t) (end - p))
3950                sz = (uintptr_t) (end - p);
3951            switch (filter->hqfi_type)
3952            {
3953            case HQFT_HEADERS:
3954                prev = p;
3955                if (filter->hqfi_flags & HQFI_FLAG_BEGIN)
3956                {
3957                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
3958                    rhs = lsquic_qdh_header_in_begin(
3959                                stream->conn_pub->u.ietf.qdh,
3960                                stream, filter->hqfi_left, &p, sz);
3961                }
3962                else
3963                    rhs = lsquic_qdh_header_in_continue(
3964                                stream->conn_pub->u.ietf.qdh, stream, &p, sz);
3965                assert(p > prev || LQRHS_ERROR == rhs);
3966                filter->hqfi_left -= p - prev;
3967                if (filter->hqfi_left == 0)
3968                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
3969                switch (rhs)
3970                {
3971                case LQRHS_DONE:
3972                    assert(filter->hqfi_left == 0);
3973                    stream->sm_qflags &= ~SMQF_QPACK_DEC;
3974                    break;
3975                case LQRHS_NEED:
3976                    stream->sm_qflags |= SMQF_QPACK_DEC;
3977                    break;
3978                case LQRHS_BLOCKED:
3979                    stream->sm_qflags |= SMQF_QPACK_DEC;
3980                    filter->hqfi_flags |= HQFI_FLAG_BLOCKED;
3981                    goto end;
3982                default:
3983                    assert(LQRHS_ERROR == rhs);
3984                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
3985                    LSQ_INFO("error processing header block");
3986                    abort_connection(stream);   /* XXX Overkill? */
3987                    goto end;
3988                }
3989                break;
3990            default:
3991                /* Simply skip unknown frame type payload for now */
3992                filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
3993                p += sz;
3994                filter->hqfi_left -= sz;
3995                if (filter->hqfi_left == 0)
3996                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
3997                break;
3998            }
3999            break;
4000        default:
4001            assert(0);
4002            goto end;
4003        }
4004    }
4005
4006  end:
4007    if (fin && p == end && filter->hqfi_state != HQFI_STATE_FRAME_HEADER_BEGIN)
4008    {
4009        LSQ_INFO("FIN at unexpected place in filter; state: %u",
4010                                                        filter->hqfi_state);
4011        filter->hqfi_flags |= HQFI_FLAG_ERROR;
4012/* From [draft-ietf-quic-http-16] Section 3.1:
4013 *               When a stream terminates cleanly, if the last frame on
4014 * the stream was truncated, this MUST be treated as a connection error
4015 * (see HTTP_MALFORMED_FRAME in Section 8.1).
4016 */
4017        abort_connection(stream);
4018    }
4019
4020    return p - buf;
4021}
4022
4023
4024static int
4025hq_filter_readable_now (const struct lsquic_stream *stream)
4026{
4027    const struct hq_filter *const filter = &stream->sm_hq_filter;
4028
4029    return (filter->hqfi_type == HQFT_DATA
4030                    && filter->hqfi_state == HQFI_STATE_READING_PAYLOAD)
4031        || (filter->hqfi_flags & HQFI_FLAG_ERROR)
4032        || stream->uh
4033        || (stream->stream_flags & STREAM_FIN_REACHED)
4034    ;
4035}
4036
4037
4038static int
4039hq_filter_readable (struct lsquic_stream *stream)
4040{
4041    struct hq_filter *const filter = &stream->sm_hq_filter;
4042    struct read_frames_status rfs;
4043
4044    if (filter->hqfi_flags & HQFI_FLAG_BLOCKED)
4045        return 0;
4046
4047    if (!hq_filter_readable_now(stream))
4048    {
4049        rfs = read_data_frames(stream, 0, hq_read, stream);
4050        if (rfs.total_nread == 0)
4051        {
4052            if (rfs.error)
4053            {
4054                filter->hqfi_flags |= HQFI_FLAG_ERROR;
4055                abort_connection(stream);   /* XXX Overkill? */
4056                return 1;   /* Collect error */
4057            }
4058            return 0;
4059        }
4060    }
4061
4062    return hq_filter_readable_now(stream);
4063}
4064
4065
4066static size_t
4067hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame)
4068{
4069    struct hq_filter *const filter = &stream->sm_hq_filter;
4070    size_t nr;
4071
4072    if (!(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4073                                            && filter->hqfi_type == HQFT_DATA))
4074    {
4075        nr = hq_read(stream, data_frame->df_data + data_frame->df_read_off,
4076                            data_frame->df_size - data_frame->df_read_off,
4077                            data_frame->df_fin);
4078        if (nr)
4079        {
4080            stream->read_offset += nr;
4081            stream_consumed_bytes(stream);
4082        }
4083    }
4084    else
4085        nr = 0;
4086
4087    if (0 == (filter->hqfi_flags & HQFI_FLAG_ERROR))
4088    {
4089        data_frame->df_read_off += nr;
4090        if (filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4091                                        && filter->hqfi_type == HQFT_DATA)
4092            return MIN(filter->hqfi_left,
4093                    (unsigned) data_frame->df_size - data_frame->df_read_off);
4094        else
4095        {
4096            assert(data_frame->df_read_off == data_frame->df_size);
4097            return 0;
4098        }
4099    }
4100    else
4101    {
4102        data_frame->df_read_off = data_frame->df_size;
4103        return 0;
4104    }
4105}
4106
4107
4108static void
4109hq_decr_left (struct lsquic_stream *stream, size_t read)
4110{
4111    struct hq_filter *const filter = &stream->sm_hq_filter;
4112
4113    if (read)
4114    {
4115        assert(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4116                                            && filter->hqfi_type == HQFT_DATA);
4117        assert(read <= filter->hqfi_left);
4118    }
4119
4120    filter->hqfi_left -= read;
4121    if (0 == filter->hqfi_left)
4122        filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4123}
4124
4125
4126struct qpack_dec_hdl *
4127lsquic_stream_get_qdh (const struct lsquic_stream *stream)
4128{
4129    return stream->conn_pub->u.ietf.qdh;
4130}
4131
4132
4133/* These are IETF QUIC states */
4134enum stream_state_sending
4135lsquic_stream_sending_state (const struct lsquic_stream *stream)
4136{
4137    if (0 == (stream->stream_flags & STREAM_RST_SENT))
4138    {
4139        if (stream->stream_flags & STREAM_FIN_SENT)
4140        {
4141            if (stream->n_unacked)
4142                return SSS_DATA_SENT;
4143            else
4144                return SSS_DATA_RECVD;
4145        }
4146        else
4147        {
4148            if (stream->tosend_off
4149                            || (stream->stream_flags & STREAM_BLOCKED_SENT))
4150                return SSS_SEND;
4151            else
4152                return SSS_READY;
4153        }
4154    }
4155    else if (stream->stream_flags & STREAM_RST_ACKED)
4156        return SSS_RESET_RECVD;
4157    else
4158        return SSS_RESET_SENT;
4159}
4160
4161
4162const char *const lsquic_sss2str[] =
4163{
4164    [SSS_READY]        =  "Ready",
4165    [SSS_SEND]         =  "Send",
4166    [SSS_DATA_SENT]    =  "Data Sent",
4167    [SSS_RESET_SENT]   =  "Reset Sent",
4168    [SSS_DATA_RECVD]   =  "Data Recvd",
4169    [SSS_RESET_RECVD]  =  "Reset Recvd",
4170};
4171
4172
4173const char *const lsquic_ssr2str[] =
4174{
4175    [SSR_RECV]         =  "Recv",
4176    [SSR_SIZE_KNOWN]   =  "Size Known",
4177    [SSR_DATA_RECVD]   =  "Data Recvd",
4178    [SSR_RESET_RECVD]  =  "Reset Recvd",
4179    [SSR_DATA_READ]    =  "Data Read",
4180    [SSR_RESET_READ]   =  "Reset Read",
4181};
4182
4183
4184/* These are IETF QUIC states */
4185enum stream_state_receiving
4186lsquic_stream_receiving_state (struct lsquic_stream *stream)
4187{
4188    uint64_t n_bytes;
4189
4190    if (0 == (stream->stream_flags & STREAM_RST_RECVD))
4191    {
4192        if (0 == (stream->stream_flags & STREAM_FIN_RECVD))
4193            return SSR_RECV;
4194        if (stream->stream_flags & STREAM_FIN_REACHED)
4195            return SSR_DATA_READ;
4196        if (0 == (stream->stream_flags & STREAM_DATA_RECVD))
4197        {
4198            n_bytes = stream->data_in->di_if->di_readable_bytes(
4199                                    stream->data_in, stream->read_offset);
4200            if (stream->read_offset + n_bytes == stream->sm_fin_off)
4201            {
4202                stream->stream_flags |= STREAM_DATA_RECVD;
4203                return SSR_DATA_RECVD;
4204            }
4205            else
4206                return SSR_SIZE_KNOWN;
4207        }
4208        else
4209            return SSR_DATA_RECVD;
4210    }
4211    else if (stream->stream_flags & STREAM_RST_READ)
4212        return SSR_RESET_READ;
4213    else
4214        return SSR_RESET_RECVD;
4215}
4216
4217
4218void
4219lsquic_stream_qdec_unblocked (struct lsquic_stream *stream)
4220{
4221    struct hq_filter *const filter = &stream->sm_hq_filter;
4222
4223    assert(stream->sm_qflags & SMQF_QPACK_DEC);
4224    assert(filter->hqfi_flags & HQFI_FLAG_BLOCKED);
4225
4226    filter->hqfi_flags &= ~HQFI_FLAG_BLOCKED;
4227    stream->conn_pub->cp_flags |= CP_STREAM_UNBLOCKED;
4228    LSQ_DEBUG("QPACK decoder unblocked");
4229}
4230
4231
4232int
4233lsquic_stream_is_rejected (const struct lsquic_stream *stream)
4234{
4235    return stream->stream_flags & STREAM_SS_RECVD;
4236}
4237
4238
4239int
4240lsquic_stream_can_push (const struct lsquic_stream *stream)
4241{
4242    if (lsquic_stream_is_pushed(stream))
4243        return 0;
4244    else if (stream->sm_bflags & SMBF_IETF)
4245        return (stream->sm_bflags & SMBF_USE_HEADERS)
4246            && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_NOPUSH))
4247            && stream->sm_send_headers_state == SSHS_BEGIN
4248            ;
4249    else
4250        return 1;
4251}
4252
4253
4254static size_t
4255dp_reader_read (void *lsqr_ctx, void *buf, size_t count)
4256{
4257    struct lsquic_stream *const stream = lsqr_ctx;
4258    unsigned char *dst = buf;
4259    unsigned char *const end = buf + count;
4260    size_t len;
4261
4262    len = MIN((size_t) (stream->sm_dup_push_len - stream->sm_dup_push_off),
4263                                                        (size_t) (end - dst));
4264    memcpy(dst, stream->sm_dup_push_buf + stream->sm_dup_push_off, len);
4265    stream->sm_dup_push_off += len;
4266
4267    if (stream->sm_dup_push_len == stream->sm_dup_push_off)
4268        LSQ_DEBUG("finish writing duplicate push");
4269
4270    return len;
4271}
4272
4273
4274static size_t
4275dp_reader_size (void *lsqr_ctx)
4276{
4277    struct lsquic_stream *const stream = lsqr_ctx;
4278
4279    return stream->sm_dup_push_len - stream->sm_dup_push_off;
4280}
4281
4282
4283static void
4284init_dp_reader (struct lsquic_stream *stream, struct lsquic_reader *reader)
4285{
4286    reader->lsqr_read = dp_reader_read;
4287    reader->lsqr_size = dp_reader_size;
4288    reader->lsqr_ctx = stream;
4289}
4290
4291
4292static void
4293on_write_dp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
4294{
4295    struct lsquic_reader dp_reader;
4296    ssize_t nw;
4297    int want_write;
4298
4299    assert(stream->sm_dup_push_off < stream->sm_dup_push_len);
4300
4301    init_dp_reader(stream, &dp_reader);
4302    nw = stream_write(stream, &dp_reader);
4303    if (nw > 0)
4304    {
4305        LSQ_DEBUG("wrote %zd bytes more of duplicate push (%s)",
4306            nw, stream->sm_dup_push_off == stream->sm_dup_push_len ?
4307            "done" : "not done");
4308        if (stream->sm_dup_push_off == stream->sm_dup_push_len)
4309        {
4310            /* Restore want_write flag */
4311            want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
4312            if (want_write != stream->sm_saved_want_write)
4313                (void) lsquic_stream_wantwrite(stream,
4314                                                stream->sm_saved_want_write);
4315        }
4316    }
4317    else if (nw < 0)
4318    {
4319        LSQ_WARN("could not write duplicate push (wrapper)");
4320        /* XXX What should happen if we hit an error? TODO */
4321    }
4322}
4323
4324
4325int
4326lsquic_stream_duplicate_push (struct lsquic_stream *stream, uint64_t push_id)
4327{
4328    struct lsquic_reader dp_reader;
4329    unsigned bits, len;
4330    ssize_t nw;
4331
4332    assert(stream->sm_bflags & SMBF_IETF);
4333    assert(lsquic_stream_can_push(stream));
4334
4335    bits = vint_val2bits(push_id);
4336    len = 1 << bits;
4337
4338    if (!stream_activate_hq_frame(stream,
4339            stream->sm_payload + stream->sm_n_buffered, HQFT_DUPLICATE_PUSH,
4340            SHF_FIXED_SIZE, len))
4341        return -1;
4342
4343    stream->stream_flags |= STREAM_PUSHING;
4344
4345    stream->sm_dup_push_len = len;
4346    stream->sm_dup_push_off = 0;
4347    vint_write(stream->sm_dup_push_buf, push_id, bits, 1 << bits);
4348
4349    init_dp_reader(stream, &dp_reader);
4350    nw = stream_write(stream, &dp_reader);
4351    if (nw > 0)
4352    {
4353        if (stream->sm_dup_push_off == stream->sm_dup_push_len)
4354            LSQ_DEBUG("fully wrote DUPLICATE_PUSH %"PRIu64, push_id);
4355        else
4356        {
4357            LSQ_DEBUG("partially wrote DUPLICATE_PUSH %"PRIu64, push_id);
4358            stream->stream_flags |= STREAM_NOPUSH;
4359            stream->sm_saved_want_write =
4360                                    !!(stream->sm_qflags & SMQF_WANT_WRITE);
4361            stream_wantwrite(stream, 1);
4362        }
4363        return 0;
4364    }
4365    else
4366    {
4367        if (nw < 0)
4368            LSQ_WARN("failure writing DUPLICATE_PUSH");
4369        stream->stream_flags |= STREAM_NOPUSH;
4370        stream->stream_flags &= ~STREAM_PUSHING;
4371        return -1;
4372    }
4373}
4374
4375
4376static size_t
4377pp_reader_read (void *lsqr_ctx, void *buf, size_t count)
4378{
4379    struct push_promise *const promise = lsqr_ctx;
4380    unsigned char *dst = buf;
4381    unsigned char *const end = buf + count;
4382    size_t len;
4383
4384    while (dst < end)
4385    {
4386        switch (promise->pp_write_state)
4387        {
4388        case PPWS_ID0:
4389        case PPWS_ID1:
4390        case PPWS_ID2:
4391        case PPWS_ID3:
4392        case PPWS_ID4:
4393        case PPWS_ID5:
4394        case PPWS_ID6:
4395        case PPWS_ID7:
4396            *dst++ = promise->pp_encoded_push_id[promise->pp_write_state];
4397            ++promise->pp_write_state;
4398            break;
4399        case PPWS_PFX0:
4400            *dst++ = 0;
4401            ++promise->pp_write_state;
4402            break;
4403        case PPWS_PFX1:
4404            *dst++ = 0;
4405            ++promise->pp_write_state;
4406            break;
4407        case PPWS_HBLOCK:
4408            len = MIN(promise->pp_content_len - promise->pp_write_off,
4409                        (size_t) (end - dst));
4410            memcpy(dst, promise->pp_content_buf + promise->pp_write_off,
4411                                                                        len);
4412            promise->pp_write_off += len;
4413            dst += len;
4414            if (promise->pp_content_len == promise->pp_write_off)
4415            {
4416                LSQ_LOG1(LSQ_LOG_DEBUG, "finish writing push promise %"PRIu64
4417                    ": reset push state", promise->pp_id);
4418                promise->pp_write_state = PPWS_DONE;
4419            }
4420            goto end;
4421        default:
4422            goto end;
4423        }
4424    }
4425
4426  end:
4427    return dst - (unsigned char *) buf;
4428}
4429
4430
4431static size_t
4432pp_reader_size (void *lsqr_ctx)
4433{
4434    struct push_promise *const promise = lsqr_ctx;
4435    size_t size;
4436
4437    size = 0;
4438    switch (promise->pp_write_state)
4439    {
4440    case PPWS_ID0:
4441    case PPWS_ID1:
4442    case PPWS_ID2:
4443    case PPWS_ID3:
4444    case PPWS_ID4:
4445    case PPWS_ID5:
4446    case PPWS_ID6:
4447    case PPWS_ID7:
4448        size += 8 - promise->pp_write_state;
4449    case PPWS_PFX0:
4450        ++size;
4451        /* fall-through */
4452    case PPWS_PFX1:
4453        ++size;
4454        /* fall-through */
4455    case PPWS_HBLOCK:
4456        size += promise->pp_content_len - promise->pp_write_off;
4457        break;
4458    default:
4459        break;
4460    }
4461
4462    return size;
4463}
4464
4465
4466static void
4467init_pp_reader (struct push_promise *promise, struct lsquic_reader *reader)
4468{
4469    reader->lsqr_read = pp_reader_read;
4470    reader->lsqr_size = pp_reader_size;
4471    reader->lsqr_ctx = promise;
4472}
4473
4474
4475static void
4476on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
4477{
4478    struct lsquic_reader pp_reader;
4479    struct push_promise *promise;
4480    ssize_t nw;
4481    int want_write;
4482
4483    assert(stream_is_pushing_promise(stream));
4484
4485    promise = SLIST_FIRST(&stream->sm_promises);
4486    init_pp_reader(promise, &pp_reader);
4487    nw = stream_write(stream, &pp_reader);
4488    if (nw > 0)
4489    {
4490        LSQ_DEBUG("wrote %zd bytes more of push promise (%s)",
4491            nw, promise->pp_write_state == PPWS_DONE ? "done" : "not done");
4492        if (promise->pp_write_state == PPWS_DONE)
4493        {
4494            /* Restore want_write flag */
4495            want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
4496            if (want_write != stream->sm_saved_want_write)
4497                (void) lsquic_stream_wantwrite(stream,
4498                                                stream->sm_saved_want_write);
4499        }
4500    }
4501    else if (nw < 0)
4502    {
4503        LSQ_WARN("could not write push promise (wrapper)");
4504        /* XXX What should happen if we hit an error? TODO */
4505    }
4506}
4507
4508
4509/* Success means that the push promise has been placed on sm_promises list and
4510 * the stream now owns it.  Failure means that the push promise should be
4511 * destroyed by the caller.
4512 *
4513 * A push promise is written immediately.  If it cannot be written to packets
4514 * or buffered whole, the stream is marked as unable to push further promises.
4515 */
4516int
4517lsquic_stream_push_promise (struct lsquic_stream *stream,
4518                                                struct push_promise *promise)
4519{
4520    struct lsquic_reader pp_reader;
4521    unsigned bits, len;
4522    ssize_t nw;
4523
4524    assert(stream->sm_bflags & SMBF_IETF);
4525    assert(lsquic_stream_can_push(stream));
4526
4527    bits = vint_val2bits(promise->pp_id);
4528    len = 1 << bits;
4529    promise->pp_write_state = 8 - len;
4530    vint_write(promise->pp_encoded_push_id + 8 - len, promise->pp_id,
4531                                                            bits, 1 << bits);
4532
4533    if (!stream_activate_hq_frame(stream,
4534                stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PROMISE,
4535                SHF_FIXED_SIZE, pp_reader_size(promise)))
4536        return -1;
4537
4538    stream->stream_flags |= STREAM_PUSHING;
4539
4540    init_pp_reader(promise, &pp_reader);
4541    nw = stream_write(stream, &pp_reader);
4542    if (nw > 0)
4543    {
4544        SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next);
4545        ++promise->pp_refcnt;
4546        if (promise->pp_write_state == PPWS_DONE)
4547            LSQ_DEBUG("fully wrote promise %"PRIu64, promise->pp_id);
4548        else
4549        {
4550            LSQ_DEBUG("partially wrote promise %"PRIu64" (state: %d, off: %u)"
4551                ", disable further pushing", promise->pp_id,
4552                promise->pp_write_state, promise->pp_write_off);
4553            stream->stream_flags |= STREAM_NOPUSH;
4554            stream->sm_saved_want_write =
4555                                    !!(stream->sm_qflags & SMQF_WANT_WRITE);
4556            stream_wantwrite(stream, 1);
4557        }
4558        return 0;
4559    }
4560    else
4561    {
4562        if (nw < 0)
4563            LSQ_WARN("failure writing push promise");
4564        stream->stream_flags |= STREAM_NOPUSH;
4565        stream->stream_flags &= ~STREAM_PUSHING;
4566        return -1;
4567    }
4568}
4569