lsquic_stream.c revision bc520ef7
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "fiu-local.h"
30
31#include "lsquic.h"
32
33#include "lsquic_int_types.h"
34#include "lsquic_packet_common.h"
35#include "lsquic_packet_in.h"
36#include "lsquic_malo.h"
37#include "lsquic_conn_flow.h"
38#include "lsquic_rtt.h"
39#include "lsquic_sfcw.h"
40#include "lsquic_varint.h"
41#include "lsquic_hq.h"
42#include "lsquic_hash.h"
43#include "lsquic_stream.h"
44#include "lsquic_conn_public.h"
45#include "lsquic_util.h"
46#include "lsquic_mm.h"
47#include "lsquic_headers_stream.h"
48#include "lsquic_conn.h"
49#include "lsquic_data_in_if.h"
50#include "lsquic_parse.h"
51#include "lsquic_packet_out.h"
52#include "lsquic_engine_public.h"
53#include "lsquic_senhist.h"
54#include "lsquic_pacer.h"
55#include "lsquic_cubic.h"
56#include "lsquic_bw_sampler.h"
57#include "lsquic_minmax.h"
58#include "lsquic_bbr.h"
59#include "lsquic_send_ctl.h"
60#include "lsquic_headers.h"
61#include "lsquic_ev_log.h"
62#include "lsquic_enc_sess.h"
63#include "lsqpack.h"
64#include "lsquic_frab_list.h"
65#include "lsquic_http1x_if.h"
66#include "lsquic_qdec_hdl.h"
67#include "lsquic_qenc_hdl.h"
68#include "lsquic_byteswap.h"
69#include "lsquic_ietf.h"
70#include "lsquic_push_promise.h"
71
72#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
73#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(stream->conn_pub->lconn)
74#define LSQUIC_LOG_STREAM_ID stream->id
75#include "lsquic_logger.h"
76
/* Evaluates to the smaller of `a' and `b'.  Either argument may be evaluated
 * more than once, so do not pass expressions that have side effects.
 */
#define MIN(a, b) ((b) > (a) ? (a) : (b))
78
79static void
80drop_frames_in (lsquic_stream_t *stream);
81
82static void
83maybe_schedule_call_on_close (lsquic_stream_t *stream);
84
85static int
86stream_wantread (lsquic_stream_t *stream, int is_want);
87
88static int
89stream_wantwrite (lsquic_stream_t *stream, int is_want);
90
91static ssize_t
92stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);
93
94static ssize_t
95save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);
96
97static int
98stream_flush (lsquic_stream_t *stream);
99
100static int
101stream_flush_nocheck (lsquic_stream_t *stream);
102
103static void
104maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag);
105
106enum swtp_status { SWTP_OK, SWTP_STOP, SWTP_ERROR };
107
108static enum swtp_status
109stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size);
110
111static enum swtp_status
112stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size);
113
114static enum swtp_status
115stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size);
116
117static size_t
118stream_write_avail_no_frames (struct lsquic_stream *);
119
120static size_t
121stream_write_avail_with_frames (struct lsquic_stream *);
122
123static size_t
124stream_write_avail_with_headers (struct lsquic_stream *);
125
126static int
127hq_filter_readable (struct lsquic_stream *stream);
128
129static void
130hq_decr_left (struct lsquic_stream *stream, size_t);
131
132static size_t
133hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame);
134
135static int
136stream_readable_non_http (struct lsquic_stream *stream);
137
138static int
139stream_readable_http_gquic (struct lsquic_stream *stream);
140
141static int
142stream_readable_http_ietf (struct lsquic_stream *stream);
143
144static ssize_t
145stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz);
146
147static size_t
148active_hq_frame_sizes (const struct lsquic_stream *);
149
150static void
151on_write_pp_wrapper (struct lsquic_stream *, lsquic_stream_ctx_t *);
152
153static void
154stream_hq_frame_put (struct lsquic_stream *, struct stream_hq_frame *);
155
156static size_t
157stream_hq_frame_size (const struct stream_hq_frame *);
158
159const struct stream_filter_if hq_stream_filter_if =
160{
161    .sfi_readable   = hq_filter_readable,
162    .sfi_filter_df  = hq_filter_df,
163    .sfi_decr_left  = hq_decr_left,
164};
165
166
167#if LSQUIC_KEEP_STREAM_HISTORY
/* Stream history events.
 *
 * Every value is a printable ASCII character so that the whole history can
 * be printed as a string in a single log message.
 *
 * The list is not exhaustive: only the most interesting events are recorded.
 */
enum stream_history_event
{
    /* Special entries */
    SHE_EMPTY = '\0',           /* No init besides memset required */
    SHE_PLUS = '+',             /* Previous event occurred more than once */

    /* Regular events */
    SHE_REACH_FIN = 'a',
    SHE_BLOCKED_OUT = 'b',
    SHE_CREATED = 'C',
    SHE_FRAME_IN = 'd',
    SHE_FRAME_OUT = 'D',
    SHE_RESET = 'e',
    SHE_WINDOW_UPDATE = 'E',
    SHE_FIN_IN = 'f',
    SHE_FINISHED = 'F',
    SHE_GOAWAY_IN = 'g',
    SHE_USER_WRITE_HEADER = 'h',
    SHE_HEADERS_IN = 'H',
    SHE_IF_SWITCH = 'i',
    SHE_ONCLOSE_SCHED = 'l',
    SHE_ONCLOSE_CALL = 'L',
    SHE_ONNEW = 'N',
    SHE_SET_PRIO = 'p',
    SHE_USER_READ = 'r',
    SHE_SHUTDOWN_READ = 'R',
    SHE_RST_IN = 's',
    SHE_SS_IN = 'S',
    SHE_RST_OUT = 't',
    SHE_RST_ACKED = 'T',
    SHE_FLUSH = 'u',
    SHE_USER_WRITE_DATA = 'w',
    SHE_SHUTDOWN_WRITE = 'W',
    SHE_CLOSE = 'X',
    SHE_DELAY_SW = 'y',
    SHE_FORCE_FINISH = 'Z',

    /* Want-read/want-write toggles: "YES" must be one more than "NO" */
    SHE_WANTREAD_NO = '0',
    SHE_WANTREAD_YES = '1',
    SHE_WANTWRITE_NO = '2',
    SHE_WANTWRITE_YES = '3',
};
212
213static void
214sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
215{
216    enum stream_history_event prev_event;
217    sm_hist_idx_t idx;
218    int plus;
219
220    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
221    plus = SHE_PLUS == stream->sm_hist_buf[idx];
222    idx = (idx - plus) & SM_HIST_IDX_MASK;
223    prev_event = stream->sm_hist_buf[idx];
224
225    if (prev_event == sh_event && plus)
226        return;
227
228    if (prev_event == sh_event)
229        sh_event = SHE_PLUS;
230    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
231
232    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
233        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
234                                                        stream->sm_hist_buf);
235}
236
237
238#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
239#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
240        if (stream->sm_hist_idx & SM_HIST_IDX_MASK)                         \
241            LSQ_DEBUG("history: [%.*s]",                                    \
242                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
243                (stream)->sm_hist_buf);                                     \
244    } while (0)
245#else
246#   define SM_HISTORY_APPEND(stream, event)
247#   define SM_HISTORY_DUMP_REMAINING(stream)
248#endif
249
250
251static int
252stream_inside_callback (const lsquic_stream_t *stream)
253{
254    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
255}
256
257
258static void
259maybe_conn_to_tickable (lsquic_stream_t *stream)
260{
261    if (!stream_inside_callback(stream))
262        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
263                                           stream->conn_pub->lconn);
264}
265
266
267/* Here, "readable" means that the user is able to read from the stream. */
268static void
269maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
270{
271    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
272    {
273        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
274                                           stream->conn_pub->lconn);
275    }
276}
277
278
279/* Here, "writeable" means that data can be put into packets to be
280 * scheduled to be sent out.
281 *
282 * If `check_can_send' is false, it means that we do not need to check
283 * whether packets can be sent.  This check was already performed when
284 * we packetized stream data.
285 */
286static void
287maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
288                                                    int check_can_send)
289{
290    if (!stream_inside_callback(stream) &&
291            (!check_can_send
292             || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
293          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
294    {
295        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
296                                           stream->conn_pub->lconn);
297    }
298}
299
300
301static int
302stream_stalled (const lsquic_stream_t *stream)
303{
304    return 0 == (stream->sm_qflags & (SMQF_WANT_WRITE|SMQF_WANT_READ)) &&
305           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
306                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
307}
308
309
310static size_t
311stream_stream_frame_header_sz (const struct lsquic_stream *stream,
312                                                            unsigned data_sz)
313{
314    return stream->conn_pub->lconn->cn_pf->pf_calc_stream_frame_header_sz(
315                                    stream->id, stream->tosend_off, data_sz);
316}
317
318
319static size_t
320stream_crypto_frame_header_sz (const struct lsquic_stream *stream,
321                                                            unsigned data_sz)
322{
323    return stream->conn_pub->lconn->cn_pf
324         ->pf_calc_crypto_frame_header_sz(stream->tosend_off, data_sz);
325}
326
327
328/* GQUIC-only function */
329static int
330stream_is_hsk (const struct lsquic_stream *stream)
331{
332    if (stream->sm_bflags & SMBF_IETF)
333        return 0;
334    else
335        return lsquic_stream_is_crypto(stream);
336}
337
338
339static struct lsquic_stream *
340stream_new_common (lsquic_stream_id_t id, struct lsquic_conn_public *conn_pub,
341           const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
342           enum stream_ctor_flags ctor_flags)
343{
344    struct lsquic_stream *stream;
345
346    stream = calloc(1, sizeof(*stream));
347    if (!stream)
348        return NULL;
349
350    if (ctor_flags & SCF_USE_DI_HASH)
351        stream->data_in = data_in_hash_new(conn_pub, id, 0);
352    else
353        stream->data_in = data_in_nocopy_new(conn_pub, id);
354    if (!stream->data_in)
355    {
356        free(stream);
357        return NULL;
358    }
359
360    stream->id        = id;
361    stream->stream_if = stream_if;
362    stream->conn_pub  = conn_pub;
363    stream->sm_onnew_arg = stream_if_ctx;
364    stream->sm_write_avail = stream_write_avail_no_frames;
365
366    STAILQ_INIT(&stream->sm_hq_frames);
367
368    stream->sm_bflags |= ctor_flags & ((1 << N_SMBF_FLAGS) - 1);
369    if (conn_pub->lconn->cn_flags & LSCONN_SERVER)
370        stream->sm_bflags |= SMBF_SERVER;
371
372    return stream;
373}
374
375
376lsquic_stream_t *
377lsquic_stream_new (lsquic_stream_id_t id,
378        struct lsquic_conn_public *conn_pub,
379        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
380        unsigned initial_window, uint64_t initial_send_off,
381        enum stream_ctor_flags ctor_flags)
382{
383    lsquic_cfcw_t *cfcw;
384    lsquic_stream_t *stream;
385
386    stream = stream_new_common(id, conn_pub, stream_if, stream_if_ctx,
387                                                                ctor_flags);
388    if (!stream)
389        return NULL;
390
391    if (!initial_window)
392        initial_window = 16 * 1024;
393
394    if (ctor_flags & SCF_IETF)
395    {
396        cfcw = &conn_pub->cfcw;
397        stream->sm_bflags |= SMBF_CONN_LIMITED;
398        if (ctor_flags & SCF_HTTP)
399        {
400            stream->sm_write_avail = stream_write_avail_with_headers;
401            stream->sm_readable = stream_readable_http_ietf;
402            stream->sm_sfi = &hq_stream_filter_if;
403        }
404        else
405            stream->sm_readable = stream_readable_non_http;
406        lsquic_stream_set_priority_internal(stream,
407                                            LSQUIC_STREAM_DEFAULT_PRIO);
408        stream->sm_write_to_packet = stream_write_to_packet_std;
409        stream->sm_frame_header_sz = stream_stream_frame_header_sz;
410    }
411    else
412    {
413        if (ctor_flags & SCF_CRITICAL)
414            cfcw = NULL;
415        else
416        {
417            cfcw = &conn_pub->cfcw;
418            stream->sm_bflags |= SMBF_CONN_LIMITED;
419            lsquic_stream_set_priority_internal(stream,
420                                                LSQUIC_STREAM_DEFAULT_PRIO);
421        }
422        if (stream->sm_bflags & SMBF_USE_HEADERS)
423            stream->sm_readable = stream_readable_http_gquic;
424        else
425            stream->sm_readable = stream_readable_non_http;
426        if (ctor_flags & SCF_CRYPTO_FRAMES)
427        {
428            stream->sm_frame_header_sz = stream_crypto_frame_header_sz;
429            stream->sm_write_to_packet = stream_write_to_packet_crypto;
430        }
431        else
432        {
433            if (stream_is_hsk(stream))
434                stream->sm_write_to_packet = stream_write_to_packet_hsk;
435            else
436                stream->sm_write_to_packet = stream_write_to_packet_std;
437            stream->sm_frame_header_sz = stream_stream_frame_header_sz;
438        }
439    }
440
441    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
442    stream->max_send_off = initial_send_off;
443    LSQ_DEBUG("created stream");
444    SM_HISTORY_APPEND(stream, SHE_CREATED);
445    if (ctor_flags & SCF_CALL_ON_NEW)
446        lsquic_stream_call_on_new(stream);
447    return stream;
448}
449
450
451struct lsquic_stream *
452lsquic_stream_new_crypto (enum enc_level enc_level,
453        struct lsquic_conn_public *conn_pub,
454        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
455        enum stream_ctor_flags ctor_flags)
456{
457    struct lsquic_stream *stream;
458    lsquic_stream_id_t stream_id;
459
460    assert(ctor_flags & SCF_CRITICAL);
461
462    fiu_return_on("stream/new_crypto", NULL);
463
464    stream_id = ~0ULL - enc_level;
465    stream = stream_new_common(stream_id, conn_pub, stream_if,
466                                                stream_if_ctx, ctor_flags);
467    if (!stream)
468        return NULL;
469
470    stream->sm_bflags |= SMBF_CRYPTO|SMBF_IETF;
471    stream->sm_enc_level = enc_level;
472    /* TODO: why have limit in crypto stream?  Set it to UINT64_MAX? */
473    lsquic_sfcw_init(&stream->fc, 16 * 1024, NULL, conn_pub, stream_id);
474    stream->max_send_off = 16 * 1024;
475    LSQ_DEBUG("created crypto stream");
476    SM_HISTORY_APPEND(stream, SHE_CREATED);
477    stream->sm_frame_header_sz = stream_crypto_frame_header_sz;
478    stream->sm_write_to_packet = stream_write_to_packet_crypto;
479    stream->sm_readable = stream_readable_non_http;
480    if (ctor_flags & SCF_CALL_ON_NEW)
481        lsquic_stream_call_on_new(stream);
482    return stream;
483}
484
485
486void
487lsquic_stream_call_on_new (lsquic_stream_t *stream)
488{
489    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
490    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
491    {
492        LSQ_DEBUG("calling on_new_stream");
493        SM_HISTORY_APPEND(stream, SHE_ONNEW);
494        stream->stream_flags |= STREAM_ONNEW_DONE;
495        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
496                                                          stream);
497    }
498}
499
500
501static void
502decr_conn_cap (struct lsquic_stream *stream, size_t incr)
503{
504    if (stream->sm_bflags & SMBF_CONN_LIMITED)
505    {
506        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
507        stream->conn_pub->conn_cap.cc_sent -= incr;
508    }
509}
510
511
512static void
513maybe_resize_stream_buffer (struct lsquic_stream *stream)
514{
515    assert(0 == stream->sm_n_buffered);
516
517    if (stream->sm_n_allocated < stream->conn_pub->path->np_pack_size)
518    {
519        free(stream->sm_buf);
520        stream->sm_buf = NULL;
521        stream->sm_n_allocated = 0;
522    }
523    else if (stream->sm_n_allocated > stream->conn_pub->path->np_pack_size)
524        stream->sm_n_allocated = stream->conn_pub->path->np_pack_size;
525}
526
527
528static void
529drop_buffered_data (struct lsquic_stream *stream)
530{
531    decr_conn_cap(stream, stream->sm_n_buffered);
532    stream->sm_n_buffered = 0;
533    maybe_resize_stream_buffer(stream);
534    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
535        maybe_remove_from_write_q(stream, SMQF_WRITE_Q_FLAGS);
536}
537
538
539static void
540destroy_uh (struct lsquic_stream *stream)
541{
542    if (stream->uh)
543    {
544        if (stream->uh->uh_hset)
545            stream->conn_pub->enpub->enp_hsi_if
546                            ->hsi_discard_header_set(stream->uh->uh_hset);
547        free(stream->uh);
548        stream->uh = NULL;
549    }
550}
551
552
553void
554lsquic_stream_destroy (lsquic_stream_t *stream)
555{
556    struct push_promise *promise;
557    struct stream_hq_frame *shf;
558
559    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
560    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
561                                                            STREAM_ONNEW_DONE)
562    {
563        stream->stream_flags |= STREAM_ONCLOSE_DONE;
564        stream->stream_if->on_close(stream, stream->st_ctx);
565    }
566    if (stream->sm_qflags & SMQF_SENDING_FLAGS)
567        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
568    if (stream->sm_qflags & SMQF_WANT_READ)
569        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
570    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
571        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
572    if (stream->sm_qflags & SMQF_SERVICE_FLAGS)
573        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
574    if (stream->sm_qflags & SMQF_QPACK_DEC)
575        lsquic_qdh_unref_stream(stream->conn_pub->u.ietf.qdh, stream);
576    drop_buffered_data(stream);
577    lsquic_sfcw_consume_rem(&stream->fc);
578    drop_frames_in(stream);
579    if (stream->push_req)
580    {
581        if (stream->push_req->uh_hset)
582            stream->conn_pub->enpub->enp_hsi_if
583                            ->hsi_discard_header_set(stream->push_req->uh_hset);
584        free(stream->push_req);
585    }
586    while ((promise = SLIST_FIRST(&stream->sm_promises)))
587    {
588        SLIST_REMOVE_HEAD(&stream->sm_promises, pp_next);
589        lsquic_pp_put(promise, stream->conn_pub->u.ietf.promises);
590    }
591    if (stream->sm_promise)
592    {
593        assert(stream->sm_promise->pp_pushed_stream == stream);
594        stream->sm_promise->pp_pushed_stream = NULL;
595        lsquic_pp_put(stream->sm_promise, stream->conn_pub->u.ietf.promises);
596    }
597    while ((shf = STAILQ_FIRST(&stream->sm_hq_frames)))
598        stream_hq_frame_put(stream, shf);
599    destroy_uh(stream);
600    free(stream->sm_buf);
601    free(stream->sm_header_block);
602    LSQ_DEBUG("destroyed stream");
603    SM_HISTORY_DUMP_REMAINING(stream);
604    free(stream);
605}
606
607
608static int
609stream_is_finished (const lsquic_stream_t *stream)
610{
611    return lsquic_stream_is_closed(stream)
612           /* n_unacked checks that no outgoing packets that reference this
613            * stream are outstanding:
614            */
615        && 0 == stream->n_unacked
616           /* This checks that no packets that reference this stream will
617            * become outstanding:
618            */
619        && 0 == (stream->sm_qflags & SMQF_SEND_RST)
620        && ((stream->stream_flags & STREAM_FORCE_FINISH)
621          || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)));
622}
623
624
625/* This is an internal function */
626void
627lsquic_stream_force_finish (struct lsquic_stream *stream)
628{
629    LSQ_DEBUG("stream is now finished");
630    SM_HISTORY_APPEND(stream, SHE_FINISHED);
631    if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
632        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
633                                                next_service_stream);
634    stream->sm_qflags |= SMQF_FREE_STREAM;
635    stream->stream_flags |= STREAM_FINISHED;
636}
637
638
639static void
640maybe_finish_stream (lsquic_stream_t *stream)
641{
642    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
643                                                    stream_is_finished(stream))
644        lsquic_stream_force_finish(stream);
645}
646
647
648static void
649maybe_schedule_call_on_close (lsquic_stream_t *stream)
650{
651    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
652                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE))
653            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)
654            && !(stream->sm_qflags & SMQF_CALL_ONCLOSE))
655    {
656        if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
657            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
658                                                    next_service_stream);
659        stream->sm_qflags |= SMQF_CALL_ONCLOSE;
660        LSQ_DEBUG("scheduled calling on_close");
661        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
662    }
663}
664
665
666void
667lsquic_stream_call_on_close (lsquic_stream_t *stream)
668{
669    assert(stream->stream_flags & STREAM_ONNEW_DONE);
670    stream->sm_qflags &= ~SMQF_CALL_ONCLOSE;
671    if (!(stream->sm_qflags & SMQF_SERVICE_FLAGS))
672        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
673                                                    next_service_stream);
674    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
675    {
676        LSQ_DEBUG("calling on_close");
677        stream->stream_flags |= STREAM_ONCLOSE_DONE;
678        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
679        stream->stream_if->on_close(stream, stream->st_ctx);
680    }
681    else
682        assert(0);
683}
684
685
686static int
687stream_has_frame_at_read_offset (struct lsquic_stream *stream)
688{
689    if (!((stream->stream_flags & STREAM_CACHED_FRAME)
690                    && stream->read_offset == stream->sm_last_frame_off))
691    {
692        stream->sm_has_frame = stream->data_in->di_if->di_get_frame(
693                                stream->data_in, stream->read_offset) != NULL;
694        stream->sm_last_frame_off = stream->read_offset;
695        stream->stream_flags |= STREAM_CACHED_FRAME;
696    }
697    return stream->sm_has_frame;
698}
699
700
/* Readability check for non-HTTP streams: the stream is readable when there
 * is a frame at the read offset.
 */
static int
stream_readable_non_http (struct lsquic_stream *stream)
{
    const int have_frame = stream_has_frame_at_read_offset(stream);

    return have_frame;
}
706
707
708static int
709stream_readable_http_gquic (struct lsquic_stream *stream)
710{
711    return (stream->stream_flags & STREAM_HAVE_UH)
712        && (stream->uh
713            || stream_has_frame_at_read_offset(stream));
714}
715
716
717static int
718stream_readable_http_ietf (struct lsquic_stream *stream)
719{
720    return
721        /* If we have read the header set and the header set has not yet
722         * been read, the stream is readable.
723         */
724        ((stream->stream_flags & STREAM_HAVE_UH) && stream->uh)
725        ||
726        /* Alternatively, run the filter and check for payload availability. */
727        (stream->sm_sfi->sfi_readable(stream)
728            && (/* Running the filter may result in hitting FIN: */
729                (stream->stream_flags & STREAM_FIN_REACHED)
730                || stream_has_frame_at_read_offset(stream)));
731}
732
733
734int
735lsquic_stream_readable (struct lsquic_stream *stream)
736{
737    /* A stream is readable if one of the following is true: */
738    return
739        /* - It is already finished: in that case, lsquic_stream_read() will
740         *   return 0.
741         */
742            (stream->stream_flags & STREAM_FIN_REACHED)
743        /* - The stream is reset, by either side.  In this case,
744         *   lsquic_stream_read() will return -1 (we want the user to be
745         *   able to collect the error).
746         */
747        ||  lsquic_stream_is_reset(stream)
748        /* Type-dependent readability check: */
749        ||  stream->sm_readable(stream);
750    ;
751}
752
753
754static size_t
755stream_write_avail_no_frames (struct lsquic_stream *stream)
756{
757    uint64_t stream_avail, conn_avail;
758
759    stream_avail = stream->max_send_off - stream->tosend_off
760                                                - stream->sm_n_buffered;
761
762    if (stream->sm_bflags & SMBF_CONN_LIMITED)
763    {
764        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
765        if (conn_avail < stream_avail)
766            stream_avail = conn_avail;
767    }
768
769    return stream_avail;
770}
771
772
773static size_t
774stream_write_avail_with_frames (struct lsquic_stream *stream)
775{
776    uint64_t stream_avail, conn_avail;
777    const struct stream_hq_frame *shf;
778    size_t size;
779
780    stream_avail = stream->max_send_off - stream->tosend_off
781                                                - stream->sm_n_buffered;
782    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
783        if (!(shf->shf_flags & SHF_WRITTEN))
784        {
785            size = stream_hq_frame_size(shf);
786            assert(size <= stream_avail);
787            stream_avail -= size;
788        }
789
790    if (stream->sm_bflags & SMBF_CONN_LIMITED)
791    {
792        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
793        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
794            if (!(shf->shf_flags & SHF_CC_PAID))
795            {
796                size = stream_hq_frame_size(shf);
797                if (size < conn_avail)
798                    conn_avail -= size;
799                else
800                    return 0;
801            }
802        if (conn_avail < stream_avail)
803            stream_avail = conn_avail;
804    }
805
806    if (stream_avail >= 3 /* Smallest new frame */)
807        return stream_avail;
808    else
809        return 0;
810}
811
812
813static int
814stream_is_pushing_promise (const struct lsquic_stream *stream)
815{
816    return (stream->stream_flags & STREAM_PUSHING)
817        && SLIST_FIRST(&stream->sm_promises)
818        && (SLIST_FIRST(&stream->sm_promises))->pp_write_state != PPWS_DONE
819        ;
820}
821
822
823/* To prevent deadlocks, ensure that when headers are sent, the bytes
824 * sent on the encoder stream are written first.
825 *
826 * XXX If the encoder is set up in non-risking mode, it is perfectly
827 * fine to send the header block first.  TODO: update the logic to
828 * reflect this.  There should be two sending behaviors: risk and non-risk.
829 * For now, we assume risk for everything to be on the conservative side.
830 */
831static size_t
832stream_write_avail_with_headers (struct lsquic_stream *stream)
833{
834    if (stream->stream_flags & STREAM_PUSHING)
835        return stream_write_avail_with_frames(stream);
836
837    switch (stream->sm_send_headers_state)
838    {
839    case SSHS_BEGIN:
840        return lsquic_qeh_write_avail(stream->conn_pub->u.ietf.qeh);
841    case SSHS_ENC_SENDING:
842        if (stream->sm_hb_compl >
843                            lsquic_qeh_enc_off(stream->conn_pub->u.ietf.qeh))
844            return 0;
845        LSQ_DEBUG("encoder stream bytes have all been sent");
846        stream->sm_send_headers_state = SSHS_HBLOCK_SENDING;
847        /* fall-through */
848    default:
849        assert(SSHS_HBLOCK_SENDING == stream->sm_send_headers_state);
850        return stream_write_avail_with_frames(stream);
851    }
852}
853
854
855size_t
856lsquic_stream_write_avail (struct lsquic_stream *stream)
857{
858    return stream->sm_write_avail(stream);
859}
860
861
862int
863lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
864{
865    struct lsquic_conn *lconn;
866
867    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
868                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
869    {
870        if (stream->sm_bflags & SMBF_IETF)
871        {
872            lconn = stream->conn_pub->lconn;
873            lconn->cn_if->ci_abort_error(lconn, 0, TEC_FLOW_CONTROL_ERROR,
874                "flow control violation on stream %"PRIu64, stream->id);
875        }
876        return -1;
877    }
878    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
879    {
880        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
881            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
882                                                    next_send_stream);
883        stream->sm_qflags |= SMQF_SEND_WUF;
884    }
885    return 0;
886}
887
888
/* Process an incoming STREAM frame addressed to `stream'.
 *
 * The frame is offered to the stream's data-in object via di_insert_frame().
 * When insertion succeeds, the stream flow-control state is updated through
 * lsquic_stream_update_sfcw(), FIN bookkeeping is performed and -- if the
 * frame begins at the current read offset -- maybe_conn_to_tickable_if_readable()
 * is called.
 *
 * Returns 0 on success (a duplicate frame also yields 0) and -1 on error.
 */
int
lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
{
    uint64_t max_off;
    int got_next_offset, rv, free_frame;
    enum ins_frame ins_frame;
    struct lsquic_conn *lconn;

    assert(frame->packet_in);

    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
    LSQ_DEBUG("received stream frame, offset 0x%"PRIX64", len %u; "
        "fin: %d", frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);

    /* A headers-using stream that already has STREAM_HEAD_IN_FIN set does
     * not accept more frames: release the packet and the frame and fail.
     */
    if ((stream->sm_bflags & SMBF_USE_HEADERS)
                            && (stream->stream_flags & STREAM_HEAD_IN_FIN))
    {
        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
        lsquic_malo_put(frame);
        return -1;
    }

    /* On IETF streams, a second FIN must end at the same offset as the one
     * recorded earlier in sm_fin_off; otherwise the connection is aborted.
     *
     * NOTE(review): unlike the early return above, this path releases
     * neither `frame' nor its packet_in -- confirm that this is intended.
     */
    if (frame->data_frame.df_fin && (stream->sm_bflags & SMBF_IETF)
            && (stream->stream_flags & STREAM_FIN_RECVD)
            && stream->sm_fin_off != DF_END(frame))
    {
        lconn = stream->conn_pub->lconn;
        lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR,
            "new final size %"PRIu64" from STREAM frame (id: %"PRIu64") does "
            "not match previous final size %"PRIu64, DF_END(frame),
            stream->id, stream->sm_fin_off);
        return -1;
    }

    /* Remember this before insertion: it is used below to decide whether
     * the readability check is worth making.
     */
    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
  insert_frame:
    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
    if (INS_FRAME_OK == ins_frame)
    {
        /* Update maximum offset in the flow controller and check for flow
         * control violation:
         */
        rv = -1;
        /* Sampled now, before `data_in' may be replaced by di_switch_impl()
         * further down.
         */
        free_frame = !stream->data_in->di_if->di_own_on_ok;
        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
        if (0 != lsquic_stream_update_sfcw(stream, max_off))
            goto end_ok;
        if (frame->data_frame.df_fin)
        {
            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
            stream->stream_flags |= STREAM_FIN_RECVD;
            stream->sm_fin_off = DF_END(frame);
            maybe_finish_stream(stream);
        }
        if ((stream->sm_bflags & SMBF_AUTOSWITCH) &&
                (stream->data_in->di_flags & DI_SWITCH_IMPL))
        {
            stream->data_in = stream->data_in->di_if->di_switch_impl(
                                        stream->data_in, stream->read_offset);
            if (!stream->data_in)
            {
                /* Switch failed: install the error data-in object and
                 * return -1 (rv is still -1 at this point).
                 */
                stream->data_in = data_in_error_new();
                goto end_ok;
            }
        }
        if (got_next_offset)
            /* Checking the offset saves di_get_frame() call */
            maybe_conn_to_tickable_if_readable(stream);
        rv = 0;
  end_ok:
        /* Common exit for the INS_FRAME_OK case, reached with rv == 0 on
         * success and rv == -1 on failure.
         */
        if (free_frame)
            lsquic_malo_put(frame);
        stream->stream_flags &= ~STREAM_CACHED_FRAME;
        return rv;
    }
    else if (INS_FRAME_DUP == ins_frame)
    {
        /* Duplicate data is not an error.
         * NOTE(review): nothing is released here; presumably
         * di_insert_frame() has disposed of the frame -- confirm.
         */
        return 0;
    }
    else if (INS_FRAME_OVERLAP == ins_frame)
    {
        /* The current data-in object reported an overlap: switch to another
         * implementation and try the insertion again.
         */
        LSQ_DEBUG("overlap: switching DATA IN implementation");
        stream->data_in = stream->data_in->di_if->di_switch_impl(
                                    stream->data_in, stream->read_offset);
        if (stream->data_in)
            goto insert_frame;
        stream->data_in = data_in_error_new();
        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
        lsquic_malo_put(frame);
        return -1;
    }
    else
    {
        assert(INS_FRAME_ERR == ins_frame);
        return -1;
    }
}
986
987
988static void
989drop_frames_in (lsquic_stream_t *stream)
990{
991    if (stream->data_in)
992    {
993        stream->data_in->di_if->di_destroy(stream->data_in);
994        /* To avoid checking whether `data_in` is set, just set to the error
995         * data-in stream.  It does the right thing after incoming data is
996         * dropped.
997         */
998        stream->data_in = data_in_error_new();
999        stream->stream_flags &= ~STREAM_CACHED_FRAME;
1000    }
1001}
1002
1003
1004static void
1005maybe_elide_stream_frames (struct lsquic_stream *stream)
1006{
1007    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
1008    {
1009        if (stream->n_unacked)
1010            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
1011                                                stream->id);
1012        stream->stream_flags |= STREAM_FRAMES_ELIDED;
1013    }
1014}
1015
1016
/* Process an incoming stream reset whose final offset is `offset'.
 *
 * Returns 0 when the reset is accepted or is a duplicate, and -1 when the
 * offset conflicts with a previously received final size, is smaller than
 * the maximum offset already seen, or is rejected by
 * lsquic_sfcw_set_max_recv_off().
 *
 * `error_code' is not used by this function.
 */
int
lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
                      uint64_t error_code)
{
    struct lsquic_conn *lconn;

    /* On IETF streams, the final size carried by the reset must agree with
     * the one recorded when FIN was received; otherwise abort the connection.
     */
    if ((stream->sm_bflags & SMBF_IETF)
            && (stream->stream_flags & STREAM_FIN_RECVD)
            && stream->sm_fin_off != offset)
    {
        lconn = stream->conn_pub->lconn;
        lconn->cn_if->ci_abort_error(lconn, 0, TEC_FINAL_SIZE_ERROR,
            "final size %"PRIu64" from RESET_STREAM frame (id: %"PRIu64") "
            "does not match previous final size %"PRIu64, offset,
            stream->id, stream->sm_fin_off);
        return -1;
    }

    if (stream->stream_flags & STREAM_RST_RECVD)
    {
        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
        return 0;
    }

    SM_HISTORY_APPEND(stream, SHE_RST_IN);
    /* This flag must always be set, even if we are "ignoring" it: it is
     * used by elision code.
     */
    stream->stream_flags |= STREAM_RST_RECVD;

    /* The reset may not claim a final offset below what has been received: */
    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
    {
        LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64" is "
            "smaller than that of byte following the last byte we have seen: "
            "0x%"PRIX64, offset,
            lsquic_sfcw_get_max_recv_off(&stream->fc));
        return -1;
    }

    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
    {
        LSQ_INFO("RST_STREAM invalid: its offset 0x%"PRIX64
            " violates flow control", offset);
        return -1;
    }

    /* Let user collect error: */
    maybe_conn_to_tickable_if_readable(stream);

    /* Discard everything still pending on this stream, in both directions: */
    lsquic_sfcw_consume_rem(&stream->fc);
    drop_frames_in(stream);
    drop_buffered_data(stream);
    maybe_elide_stream_frames(stream);

    /* Answer with our own reset unless FIN or RST has already been sent or
     * a reset is already queued.
     */
    if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT))
                                    && !(stream->sm_qflags & SMQF_SEND_RST))
        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);

    /* NOTE(review): STREAM_RST_RECVD was already set above; this second
     * assignment looks redundant unless one of the intervening calls can
     * clear the flag -- confirm before removing.
     */
    stream->stream_flags |= STREAM_RST_RECVD;

    maybe_finish_stream(stream);
    maybe_schedule_call_on_close(stream);

    return 0;
}
1082
1083
1084void
1085lsquic_stream_stop_sending_in (struct lsquic_stream *stream,
1086                                                        uint64_t error_code)
1087{
1088    if (stream->stream_flags & STREAM_SS_RECVD)
1089    {
1090        LSQ_DEBUG("ignore duplicate STOP_SENDING frame");
1091        return;
1092    }
1093
1094    SM_HISTORY_APPEND(stream, SHE_SS_IN);
1095    stream->stream_flags |= STREAM_SS_RECVD;
1096
1097    /* Let user collect error: */
1098    maybe_conn_to_tickable_if_readable(stream);
1099
1100    lsquic_sfcw_consume_rem(&stream->fc);
1101    drop_frames_in(stream);
1102    drop_buffered_data(stream);
1103    maybe_elide_stream_frames(stream);
1104
1105    if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT))
1106                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1107        lsquic_stream_reset_ext(stream, error_code, 0);
1108
1109    maybe_finish_stream(stream);
1110    maybe_schedule_call_on_close(stream);
1111}
1112
1113
1114uint64_t
1115lsquic_stream_fc_recv_off_const (const struct lsquic_stream *stream)
1116{
1117    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
1118}
1119
1120
1121void
1122lsquic_stream_max_stream_data_sent (struct lsquic_stream *stream)
1123{
1124    assert(stream->sm_qflags & SMQF_SEND_MAX_STREAM_DATA);
1125    stream->sm_qflags &= ~SMQF_SEND_MAX_STREAM_DATA;
1126    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1127        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1128    stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc);
1129}
1130
1131
1132uint64_t
1133lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
1134{
1135    assert(stream->sm_qflags & SMQF_SEND_WUF);
1136    stream->sm_qflags &= ~SMQF_SEND_WUF;
1137    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1138        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1139    return stream->sm_last_recv_off = lsquic_sfcw_get_fc_recv_off(&stream->fc);
1140}
1141
1142
1143void
1144lsquic_stream_peer_blocked (struct lsquic_stream *stream, uint64_t peer_off)
1145{
1146    uint64_t last_off;
1147
1148    if (stream->sm_last_recv_off)
1149        last_off = stream->sm_last_recv_off;
1150    else
1151        /* This gets advertized in transport parameters */
1152        last_off = lsquic_sfcw_get_max_recv_off(&stream->fc);
1153
1154    LSQ_DEBUG("Peer blocked at %"PRIu64", while the last MAX_STREAM_DATA "
1155        "frame we sent advertized the limit of %"PRIu64, peer_off, last_off);
1156
1157    if (peer_off > last_off && !(stream->sm_qflags & SMQF_SEND_WUF))
1158    {
1159        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1160            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1161                                                    next_send_stream);
1162        stream->sm_qflags |= SMQF_SEND_WUF;
1163        LSQ_DEBUG("marked to send MAX_STREAM_DATA frame");
1164    }
1165    else if (stream->sm_qflags & SMQF_SEND_WUF)
1166        LSQ_DEBUG("MAX_STREAM_DATA frame is already scheduled");
1167    else if (stream->sm_last_recv_off)
1168        LSQ_DEBUG("MAX_STREAM_DATA(%"PRIu64") has already been either "
1169            "packetized or sent", stream->sm_last_recv_off);
1170    else
1171        LSQ_INFO("Peer should have receive transport param limit "
1172            "of %"PRIu64"; odd.", last_off);
1173}
1174
1175
1176/* GQUIC's BLOCKED frame does not have an offset */
1177void
1178lsquic_stream_peer_blocked_gquic (struct lsquic_stream *stream)
1179{
1180    LSQ_DEBUG("Peer blocked: schedule another WINDOW_UPDATE frame");
1181    if (!(stream->sm_qflags & SMQF_SEND_WUF))
1182    {
1183        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1184            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1185                                                    next_send_stream);
1186        stream->sm_qflags |= SMQF_SEND_WUF;
1187        LSQ_DEBUG("marked to send MAX_STREAM_DATA frame");
1188    }
1189    else
1190        LSQ_DEBUG("WINDOW_UPDATE frame is already scheduled");
1191}
1192
1193
1194void
1195lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
1196{
1197    assert(stream->sm_qflags & SMQF_SEND_BLOCKED);
1198    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
1199    stream->sm_qflags &= ~SMQF_SEND_BLOCKED;
1200    stream->stream_flags |= STREAM_BLOCKED_SENT;
1201    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1202        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1203}
1204
1205
1206void
1207lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
1208{
1209    assert(stream->sm_qflags & SMQF_SEND_RST);
1210    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
1211    stream->sm_qflags &= ~SMQF_SEND_RST;
1212    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1213        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
1214    stream->stream_flags |= STREAM_RST_SENT;
1215    maybe_finish_stream(stream);
1216}
1217
1218
1219static size_t
1220read_uh (struct lsquic_stream *stream,
1221        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1222{
1223    struct http1x_headers *const h1h = stream->uh->uh_hset;
1224    size_t nread;
1225
1226    nread = readf(ctx, (unsigned char *) h1h->h1h_buf + h1h->h1h_off,
1227                  h1h->h1h_size - h1h->h1h_off,
1228                  (stream->stream_flags & STREAM_HEAD_IN_FIN) > 0);
1229    h1h->h1h_off += nread;
1230    if (h1h->h1h_off == h1h->h1h_size)
1231    {
1232        LSQ_DEBUG("read all uncompressed headers");
1233        destroy_uh(stream);
1234        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
1235        {
1236            stream->stream_flags |= STREAM_FIN_REACHED;
1237            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
1238        }
1239    }
1240    return nread;
1241}
1242
1243
1244static void
1245verify_cl_on_fin (struct lsquic_stream *stream)
1246{
1247    struct lsquic_conn *lconn;
1248
1249    /* The rules in RFC7230, Section 3.3.2 are a bit too intricate.  We take
1250     * a simple approach and verify content-length only when there was any
1251     * payload at all.
1252     */
1253    if (stream->sm_data_in != 0 && stream->sm_cont_len != stream->sm_data_in)
1254    {
1255        lconn = stream->conn_pub->lconn;
1256        lconn->cn_if->ci_abort_error(lconn, 1, HEC_GENERAL_PROTOCOL_ERROR,
1257            "number of bytes in DATA frames of stream %"PRIu64" is %llu, "
1258            "while content-length specified of %llu", stream->id,
1259            stream->sm_data_in, stream->sm_cont_len);
1260    }
1261}
1262
1263
1264static void
1265stream_consumed_bytes (struct lsquic_stream *stream)
1266{
1267    lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
1268    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
1269    {
1270        if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
1271            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1272                                                            next_send_stream);
1273        stream->sm_qflags |= SMQF_SEND_WUF;
1274        maybe_conn_to_tickable_if_writeable(stream, 1);
1275    }
1276}
1277
1278
/* Outcome of one read_data_frames() call */
struct read_frames_status
{
    /* Set to 1 when switching the data-in implementation failed; the other
     * two members are zero in that case.
     */
    int     error;
    /* Number of data frames obtained from di_get_frame() */
    int     processed_frames;
    /* Sum of the byte counts returned by the read callback */
    size_t  total_nread;
};
1285
1286
/* Pass readable stream data, frame by frame, to the `readf' callback.
 *
 * `readf' receives `ctx', a pointer to the data, the number of bytes on
 * offer and the frame's FIN flag, and returns the number of bytes it
 * consumed.  When `do_filtering' is set and the stream has a filter
 * (sm_sfi), the filter decides how many bytes of a frame are offered and
 * is told how many were consumed.
 *
 * Reading stops when no more frames are available at the read offset, when
 * the callback consumes less than it was offered, or when a frame with FIN
 * has been fully consumed.
 *
 * Returns a read_frames_status; see that structure for details.
 */
static struct read_frames_status
read_data_frames (struct lsquic_stream *stream, int do_filtering,
        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
{
    struct data_frame *data_frame;
    size_t nread, toread, total_nread;
    int short_read, processed_frames;

    processed_frames = 0;
    total_nread = 0;

    while ((data_frame = stream->data_in->di_if->di_get_frame(
                                        stream->data_in, stream->read_offset)))
    {

        ++processed_frames;

        /* The same frame is revisited until it is fully consumed (which
         * sets data_frame to NULL) or one of the exit conditions is hit.
         */
        do
        {
            if (do_filtering && stream->sm_sfi)
                toread = stream->sm_sfi->sfi_filter_df(stream, data_frame);
            else
                toread = data_frame->df_size - data_frame->df_read_off;

            /* The callback is invoked even with zero bytes when the frame
             * carries FIN, so that it gets to see the FIN flag.
             */
            if (toread || data_frame->df_fin)
            {
                nread = readf(ctx, data_frame->df_data + data_frame->df_read_off,
                                                     toread, data_frame->df_fin);
                if (do_filtering && stream->sm_sfi)
                    stream->sm_sfi->sfi_decr_left(stream, nread);
                data_frame->df_read_off += nread;
                stream->read_offset += nread;
                total_nread += nread;
                short_read = nread < toread;
            }
            else
                short_read = 0;

            if (data_frame->df_read_off == data_frame->df_size)
            {
                /* Save FIN before the frame is handed back */
                const int fin = data_frame->df_fin;
                stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
                data_frame = NULL;
                if ((stream->sm_bflags & SMBF_AUTOSWITCH) &&
                        (stream->data_in->di_flags & DI_SWITCH_IMPL))
                {
                    stream->data_in = stream->data_in->di_if->di_switch_impl(
                                                stream->data_in, stream->read_offset);
                    if (!stream->data_in)
                    {
                        /* Note that this path returns without calling
                         * stream_consumed_bytes().
                         */
                        stream->data_in = data_in_error_new();
                        return (struct read_frames_status) { .error = 1, };
                    }
                }
                if (fin)
                {
                    stream->stream_flags |= STREAM_FIN_REACHED;
                    if (stream->sm_bflags & SMBF_VERIFY_CL)
                        verify_cl_on_fin(stream);
                    goto end_while;
                }
            }
            else if (short_read)
                /* The callback took less than offered: stop reading */
                goto end_while;
        }
        while (data_frame);
    }
  end_while:

    if (processed_frames)
        stream_consumed_bytes(stream);

    return (struct read_frames_status) {
        .error = 0,
        .processed_frames = processed_frames,
        .total_nread = total_nread,
    };
}
1365
1366
1367static ssize_t
1368stream_readf (struct lsquic_stream *stream,
1369        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1370{
1371    size_t total_nread, nread;
1372    int read_unc_headers;
1373
1374    total_nread = 0;
1375
1376    if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF))
1377                                            == (SMBF_USE_HEADERS|SMBF_IETF)
1378            && !(stream->stream_flags & STREAM_HAVE_UH)
1379            && !stream->uh)
1380    {
1381        if (stream->sm_readable(stream))
1382        {
1383            if (stream->sm_hq_filter.hqfi_flags & HQFI_FLAG_ERROR)
1384            {
1385                LSQ_INFO("HQ filter hit an error: cannot read from stream");
1386                errno = EBADMSG;
1387                return -1;
1388            }
1389            assert(stream->uh);
1390        }
1391        else
1392        {
1393            errno = EWOULDBLOCK;
1394            return -1;
1395        }
1396    }
1397
1398    if (stream->uh)
1399    {
1400        if (stream->uh->uh_flags & UH_H1H)
1401        {
1402            nread = read_uh(stream, readf, ctx);
1403            read_unc_headers = nread > 0;
1404            total_nread += nread;
1405            if (stream->uh)
1406                return total_nread;
1407        }
1408        else
1409        {
1410            LSQ_INFO("header set not claimed: cannot read from stream");
1411            return -1;
1412        }
1413    }
1414    else if ((stream->sm_bflags & SMBF_USE_HEADERS)
1415                                && !(stream->stream_flags & STREAM_HAVE_UH))
1416    {
1417        LSQ_DEBUG("cannot read: headers not available");
1418        errno = EWOULDBLOCK;
1419        return -1;
1420    }
1421    else
1422        read_unc_headers = 0;
1423
1424    const struct read_frames_status rfs
1425                        = read_data_frames(stream, 1, readf, ctx);
1426    if (rfs.error)
1427        return -1;
1428    total_nread += rfs.total_nread;
1429
1430    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
1431                                        total_nread, stream->read_offset);
1432
1433    if (rfs.processed_frames || read_unc_headers)
1434    {
1435        return total_nread;
1436    }
1437    else
1438    {
1439        assert(0 == total_nread);
1440        errno = EWOULDBLOCK;
1441        return -1;
1442    }
1443}
1444
1445
1446/* This function returns 0 when EOF is reached.
1447 */
1448ssize_t
1449lsquic_stream_readf (struct lsquic_stream *stream,
1450        size_t (*readf)(void *, const unsigned char *, size_t, int), void *ctx)
1451{
1452    SM_HISTORY_APPEND(stream, SHE_USER_READ);
1453
1454    if (lsquic_stream_is_reset(stream))
1455    {
1456        if (stream->stream_flags & STREAM_RST_RECVD)
1457            stream->stream_flags |= STREAM_RST_READ;
1458        errno = ECONNRESET;
1459        return -1;
1460    }
1461    if (stream->stream_flags & STREAM_U_READ_DONE)
1462    {
1463        errno = EBADF;
1464        return -1;
1465    }
1466    if (stream->stream_flags & STREAM_FIN_REACHED)
1467    {
1468       if (stream->sm_bflags & SMBF_USE_HEADERS)
1469       {
1470            if ((stream->stream_flags & STREAM_HAVE_UH) && !stream->uh)
1471                return 0;
1472       }
1473       else
1474           return 0;
1475    }
1476
1477    return stream_readf(stream, readf, ctx);
1478}
1479
1480
/* Cursor over an array of I/O vectors that is being filled by readv_f() */
struct readv_ctx
{
    const struct iovec        *iov;     /* Vector currently being filled */
    const struct iovec *const  end;     /* One past the last vector */
    unsigned char             *p;       /* Next byte to write inside *iov */
};


/* Read callback that scatters `len' bytes from `buf' into the I/O vectors
 * described by the readv_ctx pointed to by `ctx_p', advancing the cursor as
 * vectors fill up.  Zero-length vectors are skipped when advancing.  `fin'
 * is ignored.
 *
 * Returns the number of bytes copied, which is smaller than `len' when the
 * vectors run out of room.
 */
static size_t
readv_f (void *ctx_p, const unsigned char *buf, size_t len, int fin)
{
    struct readv_ctx *const ctx = ctx_p;
    unsigned char *iov_end;
    size_t n_left, room, chunk;

    n_left = len;
    while (ctx->iov < ctx->end && n_left > 0)
    {
        iov_end = (unsigned char *) ctx->iov->iov_base + ctx->iov->iov_len;
        room = iov_end - ctx->p;
        chunk = room < n_left ? room : n_left;
        memcpy(ctx->p, buf, chunk);
        ctx->p += chunk;
        buf += chunk;
        n_left -= chunk;

        if (ctx->p != iov_end)
            continue;

        /* Current vector is full: move on to the next non-empty one */
        for (++ctx->iov; ctx->iov < ctx->end && ctx->iov->iov_len == 0;
                                                                ++ctx->iov)
            ;
        if (ctx->iov >= ctx->end)
            break;
        ctx->p = ctx->iov->iov_base;
    }

    return len - n_left;
}
1519
1520
1521ssize_t
1522lsquic_stream_readv (struct lsquic_stream *stream, const struct iovec *iov,
1523                     int iovcnt)
1524{
1525    struct readv_ctx ctx = { iov, iov + iovcnt, iov->iov_base, };
1526    return lsquic_stream_readf(stream, readv_f, &ctx);
1527}
1528
1529
1530ssize_t
1531lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
1532{
1533    struct iovec iov = { .iov_base = buf, .iov_len = len, };
1534    return lsquic_stream_readv(stream, &iov, 1);
1535}
1536
1537
1538static void
1539stream_shutdown_read (lsquic_stream_t *stream)
1540{
1541    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1542    {
1543        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
1544        stream->stream_flags |= STREAM_U_READ_DONE;
1545        stream_wantread(stream, 0);
1546        maybe_finish_stream(stream);
1547    }
1548}
1549
1550
1551static int
1552stream_is_incoming_unidir (const struct lsquic_stream *stream)
1553{
1554    enum stream_id_type sit;
1555
1556    if (stream->sm_bflags & SMBF_IETF)
1557    {
1558        sit = stream->id & SIT_MASK;
1559        if (stream->sm_bflags & SMBF_SERVER)
1560            return sit == SIT_UNI_CLIENT;
1561        else
1562            return sit == SIT_UNI_SERVER;
1563    }
1564    else
1565        return 0;
1566}
1567
1568
1569static void
1570stream_shutdown_write (lsquic_stream_t *stream)
1571{
1572    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1573        return;
1574
1575    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
1576    stream->stream_flags |= STREAM_U_WRITE_DONE;
1577    stream_wantwrite(stream, 0);
1578
1579    /* Don't bother to check whether there is anything else to write if
1580     * the flags indicate that nothing else should be written.
1581     */
1582    if (!(stream->sm_bflags & SMBF_CRYPTO)
1583            && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT))
1584                && !stream_is_incoming_unidir(stream)
1585                                    && !(stream->sm_qflags & SMQF_SEND_RST))
1586    {
1587        if ((stream->sm_bflags & SMBF_USE_HEADERS)
1588                && !(stream->stream_flags & STREAM_HEADERS_SENT))
1589        {
1590            LSQ_DEBUG("headers not sent, send a reset");
1591            lsquic_stream_reset(stream, 0);
1592        }
1593        else if (stream->sm_n_buffered == 0)
1594        {
1595            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
1596                                                 stream))
1597            {
1598                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
1599                stream->stream_flags |= STREAM_FIN_SENT;
1600            }
1601            else
1602            {
1603                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
1604                          "flag in it");
1605                (void) stream_flush_nocheck(stream);
1606            }
1607        }
1608        else
1609            (void) stream_flush_nocheck(stream);
1610    }
1611}
1612
1613
1614static void
1615maybe_stream_shutdown_write (struct lsquic_stream *stream)
1616{
1617    if (stream->sm_send_headers_state == SSHS_BEGIN)
1618        stream_shutdown_write(stream);
1619    else if (0 == (stream->stream_flags & STREAM_DELAYED_SW))
1620    {
1621        LSQ_DEBUG("shutdown delayed");
1622        SM_HISTORY_APPEND(stream, SHE_DELAY_SW);
1623        stream->stream_flags |= STREAM_DELAYED_SW;
1624    }
1625}
1626
1627
1628int
1629lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
1630{
1631    LSQ_DEBUG("shutdown; how: %d", how);
1632    if (lsquic_stream_is_closed(stream))
1633    {
1634        LSQ_INFO("Attempt to shut down a closed stream");
1635        errno = EBADF;
1636        return -1;
1637    }
1638    /* 0: read, 1: write: 2: read and write
1639     */
1640    if (how < 0 || how > 2)
1641    {
1642        errno = EINVAL;
1643        return -1;
1644    }
1645
1646    if (how)
1647        maybe_stream_shutdown_write(stream);
1648    if (how != 1)
1649        stream_shutdown_read(stream);
1650
1651    maybe_finish_stream(stream);
1652    maybe_schedule_call_on_close(stream);
1653    if (how && !(stream->stream_flags & STREAM_DELAYED_SW))
1654        maybe_conn_to_tickable_if_writeable(stream, 1);
1655
1656    return 0;
1657}
1658
1659
1660void
1661lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
1662{
1663    LSQ_DEBUG("internal shutdown");
1664    if (lsquic_stream_is_critical(stream))
1665    {
1666        LSQ_DEBUG("add flag to force-finish special stream");
1667        stream->stream_flags |= STREAM_FORCE_FINISH;
1668        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
1669    }
1670    maybe_finish_stream(stream);
1671    maybe_schedule_call_on_close(stream);
1672}
1673
1674
1675static void
1676fake_reset_unused_stream (lsquic_stream_t *stream)
1677{
1678    stream->stream_flags |=
1679        STREAM_RST_RECVD    /* User will pick this up on read or write */
1680      | STREAM_RST_SENT     /* Don't send anything else on this stream */
1681    ;
1682
1683    /* Cancel all writes to the network scheduled for this stream: */
1684    if (stream->sm_qflags & SMQF_SENDING_FLAGS)
1685        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
1686                                                next_send_stream);
1687    stream->sm_qflags &= ~SMQF_SENDING_FLAGS;
1688    drop_buffered_data(stream);
1689    LSQ_DEBUG("fake-reset stream%s",
1690                    stream_stalled(stream) ? " (stalled)" : "");
1691    maybe_finish_stream(stream);
1692    maybe_schedule_call_on_close(stream);
1693}
1694
1695
1696/* This function should only be called for locally-initiated streams whose ID
1697 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
1698 * frame sent by peer but we have not yet received it and created a stream.
1699 * In this situation, we mark the stream as reset, so that user's on_read or
1700 * on_write event callback picks up the error.  That, in turn, should result
1701 * in stream being closed.
1702 *
1703 * If we have received any data frames on this stream, this probably indicates
1704 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
1705 * lower than this.  However, we still try to handle it gracefully and peform
1706 * a shutdown, as if the stream was not reset.
1707 */
1708void
1709lsquic_stream_received_goaway (lsquic_stream_t *stream)
1710{
1711    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
1712    if (0 == stream->read_offset &&
1713                            stream->data_in->di_if->di_empty(stream->data_in))
1714        fake_reset_unused_stream(stream);       /* Normal condition */
1715    else
1716    {   /* This is odd, let's handle it the best we can: */
1717        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
1718        lsquic_stream_shutdown_internal(stream);
1719    }
1720}
1721
1722
1723uint64_t
1724lsquic_stream_read_offset (const lsquic_stream_t *stream)
1725{
1726    return stream->read_offset;
1727}
1728
1729
1730static int
1731stream_wantread (lsquic_stream_t *stream, int is_want)
1732{
1733    const int old_val = !!(stream->sm_qflags & SMQF_WANT_READ);
1734    const int new_val = !!is_want;
1735    if (old_val != new_val)
1736    {
1737        if (new_val)
1738        {
1739            if (!old_val)
1740                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
1741                                                            next_read_stream);
1742            stream->sm_qflags |= SMQF_WANT_READ;
1743        }
1744        else
1745        {
1746            stream->sm_qflags &= ~SMQF_WANT_READ;
1747            if (old_val)
1748                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
1749                                                            next_read_stream);
1750        }
1751    }
1752    return old_val;
1753}
1754
1755
1756static void
1757maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_q_flags flag)
1758{
1759    assert(SMQF_WRITE_Q_FLAGS & flag);
1760    if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS))
1761        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1762                                                        next_write_stream);
1763    stream->sm_qflags |= flag;
1764}
1765
1766
1767static void
1768maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_q_flags flag)
1769{
1770    assert(SMQF_WRITE_Q_FLAGS & flag);
1771    if (stream->sm_qflags & flag)
1772    {
1773        stream->sm_qflags &= ~flag;
1774        if (!(stream->sm_qflags & SMQF_WRITE_Q_FLAGS))
1775            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1776                                                        next_write_stream);
1777    }
1778}
1779
1780
1781static int
1782stream_wantwrite (struct lsquic_stream *stream, int new_val)
1783{
1784    const int old_val = !!(stream->sm_qflags & SMQF_WANT_WRITE);
1785
1786    assert(0 == (new_val & ~1));    /* new_val is either 0 or 1 */
1787
1788    if (old_val != new_val)
1789    {
1790        if (new_val)
1791            maybe_put_onto_write_q(stream, SMQF_WANT_WRITE);
1792        else
1793            maybe_remove_from_write_q(stream, SMQF_WANT_WRITE);
1794    }
1795    return old_val;
1796}
1797
1798
1799int
1800lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1801{
1802    SM_HISTORY_APPEND(stream, SHE_WANTREAD_NO + !!is_want);
1803    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1804    {
1805        if (is_want)
1806            maybe_conn_to_tickable_if_readable(stream);
1807        return stream_wantread(stream, is_want);
1808    }
1809    else
1810    {
1811        errno = EBADF;
1812        return -1;
1813    }
1814}
1815
1816
1817int
1818lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1819{
1820    int old_val;
1821
1822    is_want = !!is_want;
1823
1824    SM_HISTORY_APPEND(stream, SHE_WANTWRITE_NO + is_want);
1825    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE)
1826                            && SSHS_BEGIN == stream->sm_send_headers_state)
1827    {
1828        stream->sm_saved_want_write = is_want;
1829        if (is_want)
1830            maybe_conn_to_tickable_if_writeable(stream, 1);
1831        return stream_wantwrite(stream, is_want);
1832    }
1833    else if (SSHS_BEGIN != stream->sm_send_headers_state)
1834    {
1835        old_val = stream->sm_saved_want_write;
1836        stream->sm_saved_want_write = is_want;
1837        return old_val;
1838    }
1839    else
1840    {
1841        errno = EBADF;
1842        return -1;
1843    }
1844}
1845
1846
/* Snapshot of the stream state used to tell whether a user callback made
 * any progress; see stream_progress() for the flags that are captured.
 */
struct progress
{
    enum stream_flags   s_flags;    /* Subset of stream->stream_flags */
    enum stream_q_flags q_flags;    /* Subset of stream->sm_qflags */
};
1852
1853
1854static struct progress
1855stream_progress (const struct lsquic_stream *stream)
1856{
1857    return (struct progress) {
1858        .s_flags = stream->stream_flags
1859          & (STREAM_U_WRITE_DONE|STREAM_U_READ_DONE),
1860        .q_flags = stream->sm_qflags
1861          & (SMQF_WANT_READ|SMQF_WANT_WRITE|SMQF_WANT_FLUSH|SMQF_SEND_RST),
1862    };
1863}
1864
1865
1866static int
1867progress_eq (struct progress a, struct progress b)
1868{
1869    return a.s_flags == b.s_flags && a.q_flags == b.q_flags;
1870}
1871
1872
1873static void
1874stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1875{
1876    unsigned no_progress_count, no_progress_limit;
1877    struct progress progress;
1878    uint64_t size;
1879
1880    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1881
1882    no_progress_count = 0;
1883    while ((stream->sm_qflags & SMQF_WANT_READ)
1884                                            && lsquic_stream_readable(stream))
1885    {
1886        progress = stream_progress(stream);
1887        size  = stream->read_offset;
1888
1889        stream->stream_if->on_read(stream, stream->st_ctx);
1890
1891        if (no_progress_limit && size == stream->read_offset &&
1892                                progress_eq(progress, stream_progress(stream)))
1893        {
1894            ++no_progress_count;
1895            if (no_progress_count >= no_progress_limit)
1896            {
1897                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1898                    "progress) in user code reading from stream",
1899                    no_progress_count,
1900                    no_progress_count == 1 ? "" : "s");
1901                break;
1902            }
1903        }
1904        else
1905            no_progress_count = 0;
1906    }
1907}
1908
1909
1910static void
1911stream_hblock_sent (struct lsquic_stream *stream)
1912{
1913    int want_write;
1914
1915    LSQ_DEBUG("header block has been sent: restore default behavior");
1916    stream->sm_send_headers_state = SSHS_BEGIN;
1917    stream->sm_write_avail = stream_write_avail_with_frames;
1918
1919    want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
1920    if (want_write != stream->sm_saved_want_write)
1921        (void) lsquic_stream_wantwrite(stream, stream->sm_saved_want_write);
1922
1923    if (stream->stream_flags & STREAM_DELAYED_SW)
1924    {
1925        LSQ_DEBUG("performing delayed shutdown write");
1926        stream->stream_flags &= ~STREAM_DELAYED_SW;
1927        stream_shutdown_write(stream);
1928        maybe_schedule_call_on_close(stream);
1929        maybe_finish_stream(stream);
1930        maybe_conn_to_tickable_if_writeable(stream, 1);
1931    }
1932}
1933
1934
1935static void
1936on_write_header_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
1937{
1938    ssize_t nw;
1939
1940    nw = stream_write_buf(stream,
1941                stream->sm_header_block + stream->sm_hblock_off,
1942                stream->sm_hblock_sz - stream->sm_hblock_off);
1943    if (nw > 0)
1944    {
1945        stream->sm_hblock_off += nw;
1946        if (stream->sm_hblock_off == stream->sm_hblock_sz)
1947        {
1948            stream->stream_flags |= STREAM_HEADERS_SENT;
1949            free(stream->sm_header_block);
1950            stream->sm_header_block = NULL;
1951            stream->sm_hblock_sz = 0;
1952            stream_hblock_sent(stream);
1953            LSQ_DEBUG("header block written out successfully");
1954            /* TODO: if there was eos, do something else */
1955            if (stream->sm_qflags & SMQF_WANT_WRITE)
1956                stream->stream_if->on_write(stream, h);
1957        }
1958        else
1959        {
1960            LSQ_DEBUG("wrote %zd bytes more of header block; not done yet",
1961                nw);
1962        }
1963    }
1964    else if (nw < 0)
1965    {
1966        /* XXX What should happen if we hit an error? TODO */
1967    }
1968}
1969
1970
1971static void
1972(*select_on_write (struct lsquic_stream *stream))(struct lsquic_stream *,
1973                                                        lsquic_stream_ctx_t *)
1974{
1975    if (0 == (stream->stream_flags & STREAM_PUSHING)
1976                    && SSHS_HBLOCK_SENDING != stream->sm_send_headers_state)
1977        /* Common case */
1978        return stream->stream_if->on_write;
1979    else if (SSHS_HBLOCK_SENDING == stream->sm_send_headers_state)
1980        return on_write_header_wrapper;
1981    else
1982    {
1983        assert(stream->stream_flags & STREAM_PUSHING);
1984        if (stream_is_pushing_promise(stream))
1985            return on_write_pp_wrapper;
1986        else
1987            return stream->stream_if->on_write;
1988    }
1989}
1990
1991
1992static void
1993stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1994{
1995    unsigned no_progress_count, no_progress_limit;
1996    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
1997    struct progress progress;
1998
1999    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
2000
2001    no_progress_count = 0;
2002    stream->stream_flags |= STREAM_LAST_WRITE_OK;
2003    while ((stream->sm_qflags & SMQF_WANT_WRITE)
2004                && (stream->stream_flags & STREAM_LAST_WRITE_OK)
2005                       && lsquic_stream_write_avail(stream))
2006    {
2007        progress = stream_progress(stream);
2008
2009        on_write = select_on_write(stream);
2010        on_write(stream, stream->st_ctx);
2011
2012        if (no_progress_limit && progress_eq(progress, stream_progress(stream)))
2013        {
2014            ++no_progress_count;
2015            if (no_progress_count >= no_progress_limit)
2016            {
2017                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
2018                    "progress) in user code writing to stream",
2019                    no_progress_count,
2020                    no_progress_count == 1 ? "" : "s");
2021                break;
2022            }
2023        }
2024        else
2025            no_progress_count = 0;
2026    }
2027}
2028
2029
2030static void
2031stream_dispatch_read_events_once (lsquic_stream_t *stream)
2032{
2033    if ((stream->sm_qflags & SMQF_WANT_READ) && lsquic_stream_readable(stream))
2034    {
2035        stream->stream_if->on_read(stream, stream->st_ctx);
2036    }
2037}
2038
2039
2040uint64_t
2041lsquic_stream_combined_send_off (const struct lsquic_stream *stream)
2042{
2043    size_t frames_sizes;
2044
2045    frames_sizes = active_hq_frame_sizes(stream);
2046    return stream->tosend_off + stream->sm_n_buffered + frames_sizes;
2047}
2048
2049
2050static void
2051maybe_mark_as_blocked (lsquic_stream_t *stream)
2052{
2053    struct lsquic_conn_cap *cc;
2054    uint64_t used;
2055
2056    used = lsquic_stream_combined_send_off(stream);
2057    if (stream->max_send_off == used)
2058    {
2059        if (stream->blocked_off < stream->max_send_off)
2060        {
2061            stream->blocked_off = used;
2062            if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
2063                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
2064                                                            next_send_stream);
2065            stream->sm_qflags |= SMQF_SEND_BLOCKED;
2066            LSQ_DEBUG("marked stream-blocked at stream offset "
2067                                            "%"PRIu64, stream->blocked_off);
2068        }
2069        else
2070            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
2071                " has been, or is about to be, sent", stream->blocked_off);
2072    }
2073
2074    if ((stream->sm_bflags & SMBF_CONN_LIMITED)
2075        && (cc = &stream->conn_pub->conn_cap,
2076                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
2077    {
2078        if (cc->cc_blocked < cc->cc_max)
2079        {
2080            cc->cc_blocked = cc->cc_max;
2081            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
2082            LSQ_DEBUG("marked connection-blocked at connection offset "
2083                                                    "%"PRIu64, cc->cc_max);
2084        }
2085        else
2086            LSQ_DEBUG("stream has already been marked connection-blocked "
2087                "at offset %"PRIu64, cc->cc_blocked);
2088    }
2089}
2090
2091
2092void
2093lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
2094{
2095    assert(stream->sm_qflags & SMQF_WANT_READ);
2096
2097    if (stream->sm_bflags & SMBF_RW_ONCE)
2098        stream_dispatch_read_events_once(stream);
2099    else
2100        stream_dispatch_read_events_loop(stream);
2101}
2102
2103
2104void
2105lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
2106{
2107    void (*on_write) (struct lsquic_stream *, lsquic_stream_ctx_t *);
2108    int progress;
2109    uint64_t tosend_off;
2110    unsigned short n_buffered;
2111    enum stream_q_flags q_flags;
2112
2113    assert(stream->sm_qflags & SMQF_WRITE_Q_FLAGS);
2114    q_flags = stream->sm_qflags & SMQF_WRITE_Q_FLAGS;
2115    tosend_off = stream->tosend_off;
2116    n_buffered = stream->sm_n_buffered;
2117
2118    if (stream->sm_qflags & SMQF_WANT_FLUSH)
2119        (void) stream_flush(stream);
2120
2121    if (stream->sm_bflags & SMBF_RW_ONCE)
2122    {
2123        if ((stream->sm_qflags & SMQF_WANT_WRITE)
2124            && lsquic_stream_write_avail(stream))
2125        {
2126            on_write = select_on_write(stream);
2127            on_write(stream, stream->st_ctx);
2128        }
2129    }
2130    else
2131        stream_dispatch_write_events_loop(stream);
2132
2133    /* Progress means either flags or offsets changed: */
2134    progress = !((stream->sm_qflags & SMQF_WRITE_Q_FLAGS) == q_flags &&
2135                        stream->tosend_off == tosend_off &&
2136                            stream->sm_n_buffered == n_buffered);
2137
2138    if (stream->sm_qflags & SMQF_WRITE_Q_FLAGS)
2139    {
2140        if (progress)
2141        {   /* Move the stream to the end of the list to ensure fairness. */
2142            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
2143                                                            next_write_stream);
2144            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
2145                                                            next_write_stream);
2146        }
2147    }
2148}
2149
2150
/* Size callback of the empty reader used for flushing: there is never
 * anything left to read.
 */
static size_t
inner_reader_empty_size (void *ctx)
{
    (void) ctx;     /* The empty reader has no state */
    return 0;
}
2156
2157
/* Read callback of the empty reader used for flushing: copies nothing and
 * reports zero bytes read regardless of `count'.
 */
static size_t
inner_reader_empty_read (void *ctx, void *buf, size_t count)
{
    (void) ctx;
    (void) buf;
    (void) count;
    return 0;
}
2163
2164
2165static int
2166stream_flush (lsquic_stream_t *stream)
2167{
2168    struct lsquic_reader empty_reader;
2169    ssize_t nw;
2170
2171    assert(stream->sm_qflags & SMQF_WANT_FLUSH);
2172    assert(stream->sm_n_buffered > 0 ||
2173        /* Flushing is also used to packetize standalone FIN: */
2174        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
2175                                                    == STREAM_U_WRITE_DONE));
2176
2177    empty_reader.lsqr_size = inner_reader_empty_size;
2178    empty_reader.lsqr_read = inner_reader_empty_read;
2179    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
2180    nw = stream_write_to_packets(stream, &empty_reader, 0);
2181
2182    if (nw >= 0)
2183    {
2184        assert(nw == 0);    /* Empty reader: must have read zero bytes */
2185        return 0;
2186    }
2187    else
2188        return -1;
2189}
2190
2191
2192static int
2193stream_flush_nocheck (lsquic_stream_t *stream)
2194{
2195    size_t frames;
2196
2197    frames = active_hq_frame_sizes(stream);
2198    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered + frames;
2199    stream->sm_flush_to_payload = stream->sm_payload + stream->sm_n_buffered;
2200    maybe_put_onto_write_q(stream, SMQF_WANT_FLUSH);
2201    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
2202
2203    return stream_flush(stream);
2204}
2205
2206
2207int
2208lsquic_stream_flush (lsquic_stream_t *stream)
2209{
2210    if (stream->stream_flags & STREAM_U_WRITE_DONE)
2211    {
2212        LSQ_DEBUG("cannot flush closed stream");
2213        errno = EBADF;
2214        return -1;
2215    }
2216
2217    if (0 == stream->sm_n_buffered)
2218    {
2219        LSQ_DEBUG("flushing 0 bytes: noop");
2220        return 0;
2221    }
2222
2223    return stream_flush_nocheck(stream);
2224}
2225
2226
2227static size_t
2228stream_get_n_allowed (const struct lsquic_stream *stream)
2229{
2230    if (stream->sm_n_allocated)
2231        return stream->sm_n_allocated;
2232    else
2233        return stream->conn_pub->path->np_pack_size;
2234}
2235
2236
2237/* The flush threshold is the maximum size of stream data that can be sent
2238 * in a full packet.
2239 */
2240#ifdef NDEBUG
2241static
2242#endif
2243       size_t
2244lsquic_stream_flush_threshold (const struct lsquic_stream *stream,
2245                                                            unsigned data_sz)
2246{
2247    enum packet_out_flags flags;
2248    enum packno_bits bits;
2249    size_t packet_header_sz, stream_header_sz, tag_len;
2250    size_t threshold;
2251
2252    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
2253    flags = bits << POBIT_SHIFT;
2254    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
2255        flags |= PO_CONN_ID;
2256    if (stream_is_hsk(stream))
2257        flags |= PO_LONGHEAD;
2258
2259    packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags,
2260                                        stream->conn_pub->path->np_dcid.len);
2261    stream_header_sz = stream->sm_frame_header_sz(stream, data_sz);
2262    tag_len = stream->conn_pub->lconn->cn_esf_c->esf_tag_len;
2263
2264    threshold = stream_get_n_allowed(stream) - tag_len
2265              - packet_header_sz - stream_header_sz;
2266    return threshold;
2267}
2268
2269
/* Checks shared by functions that write to a stream.  The macro expects a
 * variable named `stream' to be in scope and RETURNS FROM THE ENCLOSING
 * FUNCTION when a check fails:
 *
 *  - headers are required but not sent yet, and sending is under way:
 *      returns 0;
 *  - headers are required but sending has not begun:
 *      returns -1, errno is EILSEQ;
 *  - the stream has been reset:
 *      returns -1, errno is ECONNRESET;
 *  - the stream is closed for writing or FIN has been sent:
 *      returns -1, errno is EBADF.
 */
#define COMMON_WRITE_CHECKS() do {                                          \
    if ((stream->sm_bflags & SMBF_USE_HEADERS)                              \
            && !(stream->stream_flags & STREAM_HEADERS_SENT))               \
    {                                                                       \
        if (SSHS_BEGIN != stream->sm_send_headers_state)                    \
        {                                                                   \
            LSQ_DEBUG("still sending headers: no writing allowed");         \
            return 0;                                                       \
        }                                                                   \
        else                                                                \
        {                                                                   \
            LSQ_INFO("Attempt to write to stream before sending HTTP "      \
                                                                "headers"); \
            errno = EILSEQ;                                                 \
            return -1;                                                      \
        }                                                                   \
    }                                                                       \
    if (lsquic_stream_is_reset(stream))                                     \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it had been reset");     \
        errno = ECONNRESET;                                                 \
        return -1;                                                          \
    }                                                                       \
    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it was closed for "      \
                                                                "writing"); \
        errno = EBADF;                                                      \
        return -1;                                                          \
    }                                                                       \
} while (0)
2301
2302
/* Context handed to the frame generation callbacks (the `ctx' argument of
 * the frame_*_gen_* functions).
 */
struct frame_gen_ctx
{
    /* Stream whose data is being put into frames */
    lsquic_stream_t      *fgc_stream;
    /* Source of new bytes, consumed after the stream's own buffer */
    struct lsquic_reader *fgc_reader;
    /* We keep our own count of how many bytes were read from reader because
     * some readers are external.  The external caller does not have to rely
     * on our count, but it can.
     */
    size_t                fgc_nread_from_reader;
    /* Size callback.  NOTE(review): presumably frame_std_gen_size() or
     * frame_hq_gen_size(); it is assigned outside of this part of the
     * file -- confirm.
     */
    size_t              (*fgc_size) (void *ctx);
    /* Returns non-zero if FIN is to be set; see frame_std_gen_fin() */
    int                 (*fgc_fin) (void *ctx);
    /* Read callback given to the parser's pf_gen_stream_frame() */
    gsf_read_f            fgc_read;
};
2316
2317
2318static size_t
2319frame_std_gen_size (void *ctx)
2320{
2321    struct frame_gen_ctx *fg_ctx = ctx;
2322    size_t available, remaining;
2323
2324    /* Make sure we are not writing past available size: */
2325    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2326    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
2327    if (available < remaining)
2328        remaining = available;
2329
2330    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
2331}
2332
2333
2334static size_t
2335stream_hq_frame_size (const struct stream_hq_frame *shf)
2336{
2337    if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
2338        return 1 + 1 + ((shf->shf_flags & SHF_TWO_BYTES) > 0);
2339    else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)) == SHF_FIXED_SIZE)
2340        return 1 + (1 << vint_val2bits(shf->shf_frame_size));
2341    else
2342    {
2343        assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
2344                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
2345        return 0;
2346    }
2347}
2348
2349
2350static size_t
2351active_hq_frame_sizes (const struct lsquic_stream *stream)
2352{
2353    const struct stream_hq_frame *shf;
2354    size_t size;
2355
2356    size = 0;
2357    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2358                                        == (SMBF_IETF|SMBF_USE_HEADERS))
2359        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2360            if (!(shf->shf_flags & SHF_WRITTEN))
2361                size += stream_hq_frame_size(shf);
2362
2363    return size;
2364}
2365
2366
2367static uint64_t
2368stream_hq_frame_end (const struct stream_hq_frame *shf)
2369{
2370    if (shf->shf_flags & SHF_FIXED_SIZE)
2371        return shf->shf_off + shf->shf_frame_size;
2372    else if (shf->shf_flags & SHF_TWO_BYTES)
2373        return shf->shf_off + ((1 << 14) - 1);
2374    else
2375        return shf->shf_off + ((1 << 6) - 1);
2376}
2377
2378
2379static int
2380frame_in_stream (const struct lsquic_stream *stream,
2381                                            const struct stream_hq_frame *shf)
2382{
2383    return shf >= stream->sm_hq_frame_arr
2384        && shf < stream->sm_hq_frame_arr + sizeof(stream->sm_hq_frame_arr)
2385                                        / sizeof(stream->sm_hq_frame_arr[0])
2386        ;
2387}
2388
2389
2390static void
2391stream_hq_frame_put (struct lsquic_stream *stream,
2392                                                struct stream_hq_frame *shf)
2393{
2394    assert(STAILQ_FIRST(&stream->sm_hq_frames) == shf);
2395    STAILQ_REMOVE_HEAD(&stream->sm_hq_frames, shf_next);
2396    if (frame_in_stream(stream, shf))
2397        memset(shf, 0, sizeof(*shf));
2398    else
2399        lsquic_malo_put(shf);
2400}
2401
2402
2403static void
2404stream_hq_frame_close (struct lsquic_stream *stream,
2405                                                struct stream_hq_frame *shf)
2406{
2407    unsigned bits;
2408
2409    LSQ_DEBUG("close HQ frame of type 0x%X at payload offset %"PRIu64
2410        " (actual offset %"PRIu64")", shf->shf_frame_type,
2411        stream->sm_payload, stream->tosend_off);
2412    assert(shf->shf_flags & SHF_ACTIVE);
2413    if (!(shf->shf_flags & SHF_FIXED_SIZE))
2414    {
2415        shf->shf_frame_ptr[0] = shf->shf_frame_type;
2416        bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
2417        vint_write(shf->shf_frame_ptr + 1, stream->sm_payload - shf->shf_off,
2418                                                            bits, 1 << bits);
2419    }
2420    stream_hq_frame_put(stream, shf);
2421}
2422
2423
2424static size_t
2425frame_hq_gen_size (void *ctx)
2426{
2427    struct frame_gen_ctx *fg_ctx = ctx;
2428    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2429    size_t available, remaining, frames;
2430    const struct stream_hq_frame *shf;
2431
2432    frames = 0;
2433    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2434        if (shf->shf_off >= stream->sm_payload)
2435            frames += stream_hq_frame_size(shf);
2436
2437    /* Make sure we are not writing past available size: */
2438    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2439    available = lsquic_stream_write_avail(stream);
2440    if (available < remaining)
2441        remaining = available;
2442
2443    return remaining + stream->sm_n_buffered + frames;
2444}
2445
2446
2447static int
2448frame_std_gen_fin (void *ctx)
2449{
2450    struct frame_gen_ctx *fg_ctx = ctx;
2451    return !(fg_ctx->fgc_stream->sm_bflags & SMBF_CRYPTO)
2452        && (fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE)
2453        && 0 == fg_ctx->fgc_stream->sm_n_buffered
2454        /* Do not use frame_std_gen_size() as it may chop the real size: */
2455        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
2456}
2457
2458
2459static void
2460incr_conn_cap (struct lsquic_stream *stream, size_t incr)
2461{
2462    if (stream->sm_bflags & SMBF_CONN_LIMITED)
2463    {
2464        stream->conn_pub->conn_cap.cc_sent += incr;
2465        assert(stream->conn_pub->conn_cap.cc_sent
2466                                    <= stream->conn_pub->conn_cap.cc_max);
2467    }
2468}
2469
2470
2471void
2472incr_sm_payload (struct lsquic_stream *stream, size_t incr)
2473{
2474    stream->sm_payload += incr;
2475    stream->tosend_off += incr;
2476    assert(stream->tosend_off <= stream->max_send_off);
2477}
2478
2479
/* Read callback for regular stream frames: fill `begin_buf' with up to
 * `len' bytes, taking them first from the stream's own buffer (sm_buf) and
 * then from the reader.
 *
 * `ctx' is a struct frame_gen_ctx.  `*fin' is set to the result of the
 * context's fgc_fin() callback.  Returns the number of bytes placed into
 * `begin_buf'.  The send and payload offsets are advanced by that number;
 * the connection cap is charged only for bytes that came from the reader.
 */
static size_t
frame_std_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    unsigned char *p = begin_buf;
    unsigned char *const end = p + len;
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    size_t n_written, available, n_to_write;

    if (stream->sm_n_buffered > 0)
    {
        if (len <= stream->sm_n_buffered)
        {
            /* The request is satisfied from the buffer alone: copy out the
             * head of the buffer and shift the rest to the front.
             */
            memcpy(p, stream->sm_buf, len);
            memmove(stream->sm_buf, stream->sm_buf + len,
                                                stream->sm_n_buffered - len);
            stream->sm_n_buffered -= len;
            if (0 == stream->sm_n_buffered)
                maybe_resize_stream_buffer(stream);
            assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
            incr_sm_payload(stream, len);
            *fin = fg_ctx->fgc_fin(fg_ctx);
            return len;
        }
        /* Drain the whole buffer; the reader supplies the remainder below */
        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
        p += stream->sm_n_buffered;
        stream->sm_n_buffered = 0;
        maybe_resize_stream_buffer(stream);
    }

    /* Do not read more from the reader than the stream is allowed to write */
    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
    n_to_write = end - p;
    if (n_to_write > available)
        n_to_write = available;
    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
                                              n_to_write);
    p += n_written;
    fg_ctx->fgc_nread_from_reader += n_written;
    *fin = fg_ctx->fgc_fin(fg_ctx);
    /* Offsets advance by everything copied (buffered and new bytes)... */
    incr_sm_payload(stream, p - (const unsigned char *) begin_buf);
    /* ...while the connection cap is charged for the new bytes only */
    incr_conn_cap(stream, n_written);
    return p - (const unsigned char *) begin_buf;
}
2523
2524
2525static struct stream_hq_frame *
2526find_hq_frame (const struct lsquic_stream *stream, uint64_t off)
2527{
2528    struct stream_hq_frame *shf;
2529
2530    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
2531        if (shf->shf_off <= off && stream_hq_frame_end(shf) > off)
2532            return shf;
2533
2534    return NULL;
2535}
2536
2537
2538static struct stream_hq_frame *
2539find_cur_hq_frame (const struct lsquic_stream *stream)
2540{
2541    return find_hq_frame(stream, stream->sm_payload);
2542}
2543
2544
2545static struct stream_hq_frame *
2546open_hq_frame (struct lsquic_stream *stream)
2547{
2548    struct stream_hq_frame *shf;
2549
2550    for (shf = stream->sm_hq_frame_arr; shf < stream->sm_hq_frame_arr
2551            + sizeof(stream->sm_hq_frame_arr)
2552                / sizeof(stream->sm_hq_frame_arr[0]); ++shf)
2553        if (!(shf->shf_flags & SHF_ACTIVE))
2554            goto found;
2555
2556    shf = lsquic_malo_get(stream->conn_pub->mm->malo.stream_hq_frame);
2557    if (!shf)
2558    {
2559        LSQ_WARN("cannot allocate HQ frame");
2560        return NULL;
2561    }
2562    memset(shf, 0, sizeof(*shf));
2563
2564  found:
2565    STAILQ_INSERT_TAIL(&stream->sm_hq_frames, shf, shf_next);
2566    shf->shf_flags = SHF_ACTIVE;
2567    return shf;
2568}
2569
2570
2571static struct stream_hq_frame *
2572stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off,
2573            enum hq_frame_type frame_type, enum shf_flags flags, size_t size)
2574{
2575    struct stream_hq_frame *shf;
2576
2577    shf = open_hq_frame(stream);
2578    if (!shf)
2579    {
2580        LSQ_WARN("could not open HQ frame");
2581        return NULL;
2582    }
2583
2584    shf->shf_off        = off;
2585    shf->shf_flags     |= flags;
2586    shf->shf_frame_type = frame_type;
2587    if (shf->shf_flags & SHF_FIXED_SIZE)
2588    {
2589        shf->shf_frame_size = size;
2590        LSQ_DEBUG("activated fixed-size HQ frame of type 0x%X at offset "
2591            "%"PRIu64", size %zu", shf->shf_frame_type, shf->shf_off, size);
2592    }
2593    else
2594    {
2595        shf->shf_frame_ptr  = NULL;
2596        if (size >= (1 << 6))
2597            shf->shf_flags |= SHF_TWO_BYTES;
2598        LSQ_DEBUG("activated variable-size HQ frame of type 0x%X at offset "
2599            "%"PRIu64, shf->shf_frame_type, shf->shf_off);
2600    }
2601
2602    return shf;
2603}
2604
2605
/* Read callback for streams that carry HQ frames: fill `begin_buf' with up
 * to `len' bytes consisting of HQ frame headers interleaved with payload.
 *
 * Each iteration of the loop looks up the HQ frame that covers the current
 * payload offset.  If there is none and there is data to send, a new
 * variable-size DATA frame is activated.  Then either the frame's header is
 * emitted (if the frame starts at the current offset and its header has not
 * been written yet) or payload is copied via frame_std_gen_read().
 *
 * `ctx' is a struct frame_gen_ctx.  `*fin' is only set by calls to
 * frame_std_gen_read().  Returns the number of bytes placed into
 * `begin_buf'.
 */
static size_t
frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    unsigned char *p = begin_buf;
    unsigned char *const end = p + len;
    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
    struct stream_hq_frame *shf;
    size_t nw, frame_sz, avail, rem;
    unsigned bits;

    while (p < end)
    {
        shf = find_cur_hq_frame(stream);
        if (shf)
            LSQ_DEBUG("found current HQ frame of type 0x%X at offset %"PRIu64,
                                            shf->shf_frame_type, shf->shf_off);
        else
        {
            /* No frame covers the current offset: open a DATA frame if
             * there is anything to send.
             */
            rem = frame_std_gen_size(ctx);
            if (rem)
            {
                /* Largest size a two-byte length field can express */
                if (rem > ((1 << 14) - 1))
                    rem = (1 << 14) - 1;
                shf = stream_activate_hq_frame(stream,
                                    stream->sm_payload, HQFT_DATA, 0, rem);
                if (shf)
                    goto insert;
                else
                {
                    /* TODO: abort connection?  Handle failure somehow */
                    break;
                }
            }
            else
                break;
        }
        if (shf->shf_off == stream->sm_payload
                                        && !(shf->shf_flags & SHF_WRITTEN))
        {
  insert:
            frame_sz = stream_hq_frame_size(shf);
            if (frame_sz > (uintptr_t) (end - p))
            {
                /* The header does not fit: release the frame and stop */
                stream_hq_frame_put(stream, shf);
                break;
            }
            LSQ_DEBUG("insert %zu-byte HQ frame of type 0x%X at payload "
                "offset %"PRIu64" (actual offset %"PRIu64")", frame_sz,
                shf->shf_frame_type, stream->sm_payload, stream->tosend_off);
            if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
            {
                /* Variable-size frame: reserve zeroed room for the header
                 * and remember where it is; stream_hq_frame_close() fills
                 * it in once the length is known.
                 */
                shf->shf_frame_ptr = p;
                memset(p, 0, frame_sz);
                p += frame_sz;
            }
            else if ((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
                                                            == SHF_FIXED_SIZE)
            {
                /* Fixed-size frame: the complete header is written now */
                *p++ = shf->shf_frame_type;
                bits = vint_val2bits(shf->shf_frame_size);
                vint_write(p, shf->shf_frame_size, bits, 1 << bits);
                p += 1 << bits;
            }
            else
                /* Phantom frame: nothing is written (frame_sz is zero) */
                assert((shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))
                                            == (SHF_FIXED_SIZE|SHF_PHANTOM));
            /* Charge the header to the connection cap once only */
            if (!(shf->shf_flags & SHF_CC_PAID))
            {
                incr_conn_cap(stream, frame_sz);
                shf->shf_flags |= SHF_CC_PAID;
            }
            shf->shf_flags |= SHF_WRITTEN;
            /* Header bytes advance the send offset, not the payload offset */
            stream->tosend_off += frame_sz;
            assert(stream->tosend_off <= stream->max_send_off);
        }
        else
        {
            /* Copy payload.  Note that `len' is reused here: it becomes the
             * number of bytes left in the current frame, capped by the
             * room left in the output buffer and by what can be written.
             */
            avail = stream->sm_n_buffered + stream->sm_write_avail(stream);
            len = stream_hq_frame_end(shf) - stream->sm_payload;
            assert(len);
            if (len > (unsigned) (end - p))
                len = end - p;
            if (len > avail)
                len = avail;
            if (!len)
                break;
            nw = frame_std_gen_read(ctx, p, len, fin);
            p += nw;
            if (nw < len)
                break;
            /* The frame is complete when the payload reaches its end */
            if (stream_hq_frame_end(shf) == stream->sm_payload)
                stream_hq_frame_close(stream, shf);
        }
    }

    return p - (unsigned char *) begin_buf;
}
2704
2705
/* Read callback without a FIN argument: forwards to frame_std_gen_read()
 * and throws away the FIN indication.
 */
static size_t
crypto_frame_gen_read (void *ctx, void *buf, size_t len)
{
    int unused_fin;
    size_t n_read;

    n_read = frame_std_gen_read(ctx, buf, len, &unused_fin);
    return n_read;
}
2713
2714
2715static void
2716check_flush_threshold (lsquic_stream_t *stream)
2717{
2718    if ((stream->sm_qflags & SMQF_WANT_FLUSH) &&
2719                            stream->tosend_off >= stream->sm_flush_to)
2720    {
2721        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
2722                                                    stream->sm_flush_to);
2723        maybe_remove_from_write_q(stream, SMQF_WANT_FLUSH);
2724    }
2725}
2726
2727
#if LSQUIC_EXTRA_CHECKS
/* Consistency check: the bytes buffered in all connection-limited streams
 * plus the bytes accounted in stream_frame_bytes must equal cc_sent of the
 * connection cap.  Nothing is checked if the connection has no all_streams
 * hash.
 */
static void
verify_conn_cap (const struct lsquic_conn_public *conn_pub)
{
    const struct lsquic_stream *stream;
    struct lsquic_hash_elem *el;
    unsigned total_buffered = 0;

    if (!conn_pub->all_streams)
        /* TODO: enable this check for unit tests as well */
        return;

    el = lsquic_hash_first(conn_pub->all_streams);
    while (el)
    {
        stream = lsquic_hashelem_getdata(el);
        if (stream->sm_bflags & SMBF_CONN_LIMITED)
            total_buffered += stream->sm_n_buffered;
        el = lsquic_hash_next(conn_pub->all_streams);
    }

    assert(total_buffered + conn_pub->stream_frame_bytes
                                            == conn_pub->conn_cap.cc_sent);
}


#endif
2755
2756
/* Generate one STREAM frame into `packet_out' and record it there.
 *
 * The frame is produced by the parser's pf_gen_stream_frame(), which pulls
 * the data through fg_ctx->fgc_read; `size' is passed on to it.  On success
 * the packet size is increased, the frame is registered with the packet and
 * a pending flush is cleared if its target offset has been reached.
 *
 * Returns the length of the generated frame, the negative value returned by
 * pf_gen_stream_frame() if the frame could not be generated, or -1 if the
 * stream could not be added to the packet.
 */
static int
write_stream_frame (struct frame_gen_ctx *fg_ctx, const size_t size,
                                        struct lsquic_packet_out *packet_out)
{
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
    unsigned off;
    int len, s;

#if LSQUIC_CONN_STATS || LSQUIC_EXTRA_CHECKS
    /* Used to find out how far the send offset moved during generation */
    const uint64_t begin_off = stream->tosend_off;
#endif
    /* Position of the new frame inside the packet */
    off = packet_out->po_data_sz;
    len = pf->pf_gen_stream_frame(
                packet_out->po_data + packet_out->po_data_sz,
                lsquic_packet_out_avail(packet_out), stream->id,
                stream->tosend_off,
                fg_ctx->fgc_fin(fg_ctx), size, fg_ctx->fgc_read, fg_ctx);
    if (len < 0)
        return len;

#if LSQUIC_CONN_STATS
    stream->conn_pub->conn_stats->out.stream_frames += 1;
    stream->conn_pub->conn_stats->out.stream_data_sz
                                            += stream->tosend_off - begin_off;
#endif
    /* The frame is logged before the packet size is increased, while
     * po_data_sz is still used to locate the start of the frame:
     */
    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
                            packet_out->po_data + packet_out->po_data_sz, len);
    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
    /* Flag a packet that the frame filled completely */
    if (0 == lsquic_packet_out_avail(packet_out))
        packet_out->po_flags |= PO_STREAM_END;
    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
                                     stream, QUIC_FRAME_STREAM, off, len);
    if (s != 0)
    {
        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
        return -1;
    }
#if LSQUIC_EXTRA_CHECKS
    if (stream->sm_bflags & SMBF_CONN_LIMITED)
    {
        stream->conn_pub->stream_frame_bytes += stream->tosend_off - begin_off;
        verify_conn_cap(stream->conn_pub);
    }
#endif

    check_flush_threshold(stream);
    return len;
}
2808
2809
2810static enum swtp_status
2811stream_write_to_packet_hsk (struct frame_gen_ctx *fg_ctx, const size_t size)
2812{
2813    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2814    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2815    struct lsquic_packet_out *packet_out;
2816    int len;
2817
2818    packet_out = lsquic_send_ctl_new_packet_out(send_ctl, 0, PNS_APP,
2819                                                    stream->conn_pub->path);
2820    if (!packet_out)
2821        return SWTP_STOP;
2822    packet_out->po_header_type = stream->tosend_off == 0
2823                                        ? HETY_INITIAL : HETY_HANDSHAKE;
2824
2825    len = write_stream_frame(fg_ctx, size, packet_out);
2826
2827    if (len > 0)
2828    {
2829        packet_out->po_flags |= PO_HELLO;
2830        lsquic_packet_out_zero_pad(packet_out);
2831        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
2832        return SWTP_OK;
2833    }
2834    else
2835        return SWTP_ERROR;
2836}
2837
2838
2839static enum swtp_status
2840stream_write_to_packet_std (struct frame_gen_ctx *fg_ctx, const size_t size)
2841{
2842    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2843    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2844    unsigned stream_header_sz, need_at_least;
2845    struct lsquic_packet_out *packet_out;
2846    struct lsquic_stream *headers_stream;
2847    int len;
2848
2849    if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED))
2850                                                        == STREAM_HEADERS_SENT)
2851    {
2852        if (stream->sm_bflags & SMBF_IETF)
2853        {
2854            if (stream->stream_flags & STREAM_ENCODER_DEP)
2855                headers_stream = stream->conn_pub->u.ietf.qeh->qeh_enc_sm_out;
2856            else
2857                headers_stream = NULL;
2858        }
2859        else
2860            headers_stream =
2861                lsquic_headers_stream_get_stream(stream->conn_pub->u.gquic.hs);
2862        if (headers_stream && lsquic_stream_has_data_to_flush(headers_stream))
2863        {
2864            LSQ_DEBUG("flushing headers stream before packetizing stream data");
2865            (void) lsquic_stream_flush(headers_stream);
2866        }
2867        /* If there is nothing to flush, some other stream must have flushed it:
2868         * this means our headers are flushed.  Either way, only do this once.
2869         */
2870        stream->stream_flags |= STREAM_HDRS_FLUSHED;
2871    }
2872
2873    stream_header_sz = stream->sm_frame_header_sz(stream, size);
2874    need_at_least = stream_header_sz;
2875    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
2876                                       == (SMBF_IETF|SMBF_USE_HEADERS))
2877    {
2878        if (size > 0)
2879            need_at_least += 3;     /* Enough room for HTTP/3 frame */
2880    }
2881    else
2882        need_at_least += size > 0;
2883  get_packet:
2884    packet_out = lsquic_send_ctl_get_packet_for_stream(send_ctl,
2885                                need_at_least, stream->conn_pub->path, stream);
2886    if (packet_out)
2887    {
2888        len = write_stream_frame(fg_ctx, size, packet_out);
2889        if (len > 0)
2890            return SWTP_OK;
2891        assert(len < 0);
2892        if (-len > (int) need_at_least)
2893        {
2894            LSQ_DEBUG("need more room (%d bytes) than initially calculated "
2895                "%u bytes, will try again", -len, need_at_least);
2896            need_at_least = -len;
2897            goto get_packet;
2898        }
2899        return SWTP_ERROR;
2900    }
2901    else
2902        return SWTP_STOP;
2903}
2904
2905
2906/* Use for IETF crypto streams and gQUIC crypto stream for versions >= Q050. */
2907static enum swtp_status
2908stream_write_to_packet_crypto (struct frame_gen_ctx *fg_ctx, const size_t size)
2909{
2910    struct lsquic_stream *const stream = fg_ctx->fgc_stream;
2911    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
2912    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
2913    unsigned crypto_header_sz, need_at_least;
2914    struct lsquic_packet_out *packet_out;
2915    unsigned short off;
2916    enum packnum_space pns;
2917    int len, s;
2918
2919    if (stream->sm_bflags & SMBF_IETF)
2920        pns = lsquic_enclev2pns[ crypto_level(stream) ];
2921    else
2922        pns = PNS_APP;
2923
2924    assert(size > 0);
2925    crypto_header_sz = stream->sm_frame_header_sz(stream, size);
2926    need_at_least = crypto_header_sz + 1;
2927
2928    packet_out = lsquic_send_ctl_get_packet_for_crypto(send_ctl,
2929                                    need_at_least, pns, stream->conn_pub->path);
2930    if (!packet_out)
2931        return SWTP_STOP;
2932
2933    off = packet_out->po_data_sz;
2934    len = pf->pf_gen_crypto_frame(packet_out->po_data + packet_out->po_data_sz,
2935                lsquic_packet_out_avail(packet_out), stream->tosend_off,
2936                size, crypto_frame_gen_read, fg_ctx);
2937    if (len < 0)
2938        return len;
2939
2940    EV_LOG_GENERATED_CRYPTO_FRAME(LSQUIC_LOG_CONN_ID, pf,
2941                            packet_out->po_data + packet_out->po_data_sz, len);
2942    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
2943    packet_out->po_frame_types |= 1 << QUIC_FRAME_CRYPTO;
2944    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
2945                                     stream, QUIC_FRAME_CRYPTO, off, len);
2946    if (s != 0)
2947    {
2948        LSQ_WARN("adding crypto stream to packet failed: %s", strerror(errno));
2949        return -1;
2950    }
2951
2952    packet_out->po_flags |= PO_HELLO;
2953
2954    if (!(stream->sm_bflags & SMBF_IETF))
2955    {
2956        const unsigned short before = packet_out->po_data_sz;
2957        lsquic_packet_out_zero_pad(packet_out);
2958        /* XXX: too hacky */
2959        if (before < packet_out->po_data_sz)
2960            send_ctl->sc_bytes_scheduled += packet_out->po_data_sz - before;
2961    }
2962
2963    check_flush_threshold(stream);
2964    return SWTP_OK;
2965}
2966
2967
2968static void
2969abort_connection (struct lsquic_stream *stream)
2970{
2971    if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
2972        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
2973                                                next_service_stream);
2974    stream->sm_qflags |= SMQF_ABORT_CONN;
2975    LSQ_WARN("connection will be aborted");
2976    maybe_conn_to_tickable(stream);
2977}
2978
2979
2980static void
2981maybe_close_varsize_hq_frame (struct lsquic_stream *stream)
2982{
2983    struct stream_hq_frame *shf;
2984    uint64_t size;
2985    unsigned bits;
2986
2987    shf = find_cur_hq_frame(stream);
2988    if (!shf)
2989        return;
2990
2991    if (shf->shf_flags & SHF_FIXED_SIZE)
2992    {
2993        if (shf->shf_off + shf->shf_frame_size <= stream->sm_payload)
2994            stream_hq_frame_put(stream, shf);
2995        return;
2996    }
2997
2998    bits = (shf->shf_flags & SHF_TWO_BYTES) > 0;
2999    size = stream->sm_payload + stream->sm_n_buffered - shf->shf_off;
3000    if (size <= VINT_MAX_B(bits) && shf->shf_frame_ptr)
3001    {
3002        if (0 == stream->sm_n_buffered)
3003            LSQ_DEBUG("close HQ frame type 0x%X of size %"PRIu64,
3004                                                shf->shf_frame_type, size);
3005        else
3006            LSQ_DEBUG("convert HQ frame type 0x%X of to fixed %"PRIu64,
3007                                                shf->shf_frame_type, size);
3008        shf->shf_frame_ptr[0] = shf->shf_frame_type;
3009        vint_write(shf->shf_frame_ptr + 1, size, bits, 1 << bits);
3010        if (0 == stream->sm_n_buffered)
3011            stream_hq_frame_put(stream, shf);
3012        else
3013        {
3014            shf->shf_frame_size = size;
3015            shf->shf_flags |= SHF_FIXED_SIZE;
3016        }
3017    }
3018    else if (!shf->shf_frame_ptr)
3019        LSQ_DEBUG("HQ frame of type 0x%X has not yet been written, not "
3020            "closing", shf->shf_frame_type);
3021    else
3022    {
3023        assert(stream->sm_n_buffered);
3024        LSQ_ERROR("cannot close frame of size %"PRIu64" on stream %"PRIu64
3025            " -- too large", size, stream->id);
3026        stream->conn_pub->lconn->cn_if->ci_internal_error(
3027            stream->conn_pub->lconn, "HTTP/3 frame too large");
3028        stream_hq_frame_put(stream, shf);
3029    }
3030}
3031
3032
3033static ssize_t
3034stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
3035                         size_t thresh)
3036{
3037    size_t size;
3038    ssize_t nw;
3039    unsigned seen_ok;
3040    int use_framing;
3041    struct frame_gen_ctx fg_ctx = {
3042        .fgc_stream = stream,
3043        .fgc_reader = reader,
3044        .fgc_nread_from_reader = 0,
3045    };
3046
3047    use_framing = (stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3048                                       == (SMBF_IETF|SMBF_USE_HEADERS);
3049    if (use_framing)
3050    {
3051        fg_ctx.fgc_size = frame_hq_gen_size;
3052        fg_ctx.fgc_read = frame_hq_gen_read;
3053        fg_ctx.fgc_fin = frame_std_gen_fin; /* This seems to work for either? XXX */
3054    }
3055    else
3056    {
3057        fg_ctx.fgc_size = frame_std_gen_size;
3058        fg_ctx.fgc_read = frame_std_gen_read;
3059        fg_ctx.fgc_fin = frame_std_gen_fin;
3060    }
3061
3062    seen_ok = 0;
3063    while ((size = fg_ctx.fgc_size(&fg_ctx), thresh ? size >= thresh : size > 0)
3064           || fg_ctx.fgc_fin(&fg_ctx))
3065    {
3066        switch (stream->sm_write_to_packet(&fg_ctx, size))
3067        {
3068        case SWTP_OK:
3069            if (!seen_ok++)
3070                maybe_conn_to_tickable_if_writeable(stream, 0);
3071            if (fg_ctx.fgc_fin(&fg_ctx))
3072            {
3073                if (use_framing && seen_ok)
3074                    maybe_close_varsize_hq_frame(stream);
3075                stream->stream_flags |= STREAM_FIN_SENT;
3076                goto end;
3077            }
3078            else
3079                break;
3080        case SWTP_STOP:
3081            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
3082            if (use_framing && seen_ok)
3083                maybe_close_varsize_hq_frame(stream);
3084            goto end;
3085        default:
3086            abort_connection(stream);
3087            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
3088            return -1;
3089        }
3090    }
3091
3092    if (use_framing && seen_ok)
3093        maybe_close_varsize_hq_frame(stream);
3094
3095    if (thresh)
3096    {
3097        assert(size < thresh);
3098        assert(size >= stream->sm_n_buffered);
3099        size -= stream->sm_n_buffered;
3100        if (size > 0)
3101        {
3102            nw = save_to_buffer(stream, reader, size);
3103            if (nw < 0)
3104                return -1;
3105            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
3106        }
3107    }
3108    else
3109    {
3110        /* We count flushed data towards both stream and connection limits,
3111         * so we should have been able to packetize all of it:
3112         */
3113        assert(0 == stream->sm_n_buffered);
3114        assert(size == 0);
3115    }
3116
3117    maybe_mark_as_blocked(stream);
3118
3119  end:
3120    return fg_ctx.fgc_nread_from_reader;
3121}
3122
3123
3124/* Perform an implicit flush when we hit connection limit while buffering
3125 * data.  This is to prevent a (theoretical) stall:
3126 *
3127 * Imagine a number of streams, all of which buffered some data.  The buffered
3128 * data is up to connection cap, which means no further writes are possible.
3129 * None of them flushes, which means that data is not sent and connection
3130 * WINDOW_UPDATE frame never arrives from peer.  Stall.
3131 */
3132static int
3133maybe_flush_stream (struct lsquic_stream *stream)
3134{
3135    if (stream->sm_n_buffered > 0
3136          && (stream->sm_bflags & SMBF_CONN_LIMITED)
3137            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
3138        return stream_flush_nocheck(stream);
3139    else
3140        return 0;
3141}
3142
3143
3144static int
3145stream_hq_frame_extendable (const struct stream_hq_frame *shf, uint64_t cur_off,
3146                                                                    unsigned len)
3147{
3148    return (shf->shf_flags & (SHF_TWO_BYTES|SHF_FIXED_SIZE)) == 0
3149        && cur_off - shf->shf_off < (1 << 6)
3150        && cur_off - shf->shf_off + len >= (1 << 6)
3151        ;
3152}
3153
3154
3155/* Update currently buffered HQ frame or create a new one, if possible.
3156 * Return update length to be buffered.  If a HQ frame cannot be
3157 * buffered due to size, 0 is returned, thereby preventing both HQ frame
3158 * creation and buffering.
3159 */
3160static size_t
3161update_buffered_hq_frames (struct lsquic_stream *stream, size_t len,
3162                                                                size_t avail)
3163{
3164    struct stream_hq_frame *shf;
3165    uint64_t cur_off, end;
3166    size_t frame_sz;
3167    unsigned extendable;
3168
3169    cur_off = stream->sm_payload + stream->sm_n_buffered;
3170    STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
3171        if (shf->shf_off <= cur_off)
3172        {
3173            end = stream_hq_frame_end(shf);
3174            extendable = stream_hq_frame_extendable(shf, cur_off, len);
3175            if (cur_off < end + extendable)
3176                break;
3177        }
3178
3179    if (shf)
3180    {
3181        if (len > end + extendable - cur_off)
3182            len = end + extendable - cur_off;
3183        frame_sz = stream_hq_frame_size(shf);
3184    }
3185    else
3186    {
3187        assert(avail >= 3);
3188        shf = stream_activate_hq_frame(stream, cur_off, HQFT_DATA, 0, len);
3189        if (!shf)
3190            return 0;
3191        if (len > stream_hq_frame_end(shf) - cur_off)
3192            len = stream_hq_frame_end(shf) - cur_off;
3193        extendable = 0;
3194        frame_sz = stream_hq_frame_size(shf);
3195        if (avail < frame_sz)
3196            return 0;
3197        avail -= frame_sz;
3198    }
3199
3200    if (!(shf->shf_flags & SHF_CC_PAID))
3201    {
3202        incr_conn_cap(stream, frame_sz);
3203        shf->shf_flags |= SHF_CC_PAID;
3204    }
3205    if (extendable)
3206    {
3207        shf->shf_flags |= SHF_TWO_BYTES;
3208        incr_conn_cap(stream, 1);
3209        avail -= 1;
3210        if ((stream->sm_qflags & SMQF_WANT_FLUSH)
3211                && shf->shf_off <= stream->sm_payload
3212                && stream_hq_frame_end(shf) >= stream->sm_flush_to_payload)
3213            stream->sm_flush_to += 1;
3214    }
3215
3216    if (len <= avail)
3217        return len;
3218    else
3219        return avail;
3220}
3221
3222
3223static ssize_t
3224save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
3225                                                                size_t len)
3226{
3227    size_t avail, n_written, n_allowed;
3228
3229    avail = lsquic_stream_write_avail(stream);
3230    if (avail < len)
3231        len = avail;
3232    if (len == 0)
3233    {
3234        LSQ_DEBUG("zero-byte write (avail: %zu)", avail);
3235        return 0;
3236    }
3237
3238    n_allowed = stream_get_n_allowed(stream);
3239    assert(stream->sm_n_buffered + len <= n_allowed);
3240
3241    if (!stream->sm_buf)
3242    {
3243        stream->sm_buf = malloc(n_allowed);
3244        if (!stream->sm_buf)
3245            return -1;
3246        stream->sm_n_allocated = n_allowed;
3247    }
3248
3249    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3250                                            == (SMBF_IETF|SMBF_USE_HEADERS))
3251        len = update_buffered_hq_frames(stream, len, avail);
3252
3253    n_written = reader->lsqr_read(reader->lsqr_ctx,
3254                        stream->sm_buf + stream->sm_n_buffered, len);
3255    stream->sm_n_buffered += n_written;
3256    assert(stream->max_send_off >= stream->tosend_off + stream->sm_n_buffered);
3257    incr_conn_cap(stream, n_written);
3258    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
3259              n_written, stream->sm_n_buffered);
3260    if (0 != maybe_flush_stream(stream))
3261        return -1;
3262    return n_written;
3263}
3264
3265
3266static ssize_t
3267stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
3268{
3269    const struct stream_hq_frame *shf;
3270    size_t thresh, len, frames, total_len, n_allowed, nwritten;
3271    ssize_t nw;
3272
3273    len = reader->lsqr_size(reader->lsqr_ctx);
3274    if (len == 0)
3275        return 0;
3276
3277    frames = 0;
3278    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
3279                                        == (SMBF_IETF|SMBF_USE_HEADERS))
3280        STAILQ_FOREACH(shf, &stream->sm_hq_frames, shf_next)
3281            if (shf->shf_off >= stream->sm_payload)
3282                frames += stream_hq_frame_size(shf);
3283    total_len = len + frames + stream->sm_n_buffered;
3284    thresh = lsquic_stream_flush_threshold(stream, total_len);
3285    n_allowed = stream_get_n_allowed(stream);
3286    if (total_len <= n_allowed && total_len < thresh)
3287    {
3288        nwritten = 0;
3289        do
3290        {
3291            nw = save_to_buffer(stream, reader, len - nwritten);
3292            if (nw > 0)
3293                nwritten += (size_t) nw;
3294            else if (nw == 0)
3295                break;
3296            else
3297                return nw;
3298        }
3299        while (nwritten < len
3300                        && stream->sm_n_buffered < stream->sm_n_allocated);
3301        return nwritten;
3302    }
3303    else
3304        return stream_write_to_packets(stream, reader, thresh);
3305}
3306
3307
3308ssize_t
3309lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
3310{
3311    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
3312    return lsquic_stream_writev(stream, &iov, 1);
3313}
3314
3315
/* Read cursor over an array of iovecs.  It serves as the reader context for
 * inner_reader_iovec_read() and inner_reader_iovec_size().
 */
struct inner_reader_iovec {
    const struct iovec       *iov;              /* Current iovec */
    const struct iovec       *end;              /* One past the last iovec */
    size_t                    cur_iovec_off;    /* Bytes consumed from *iov */
};


/* Copy up to `count' bytes from the remaining iovecs into `buf', advancing
 * the cursor.  Returns the number of bytes copied.
 *
 * All offsets and lengths are kept in size_t, so that iovecs and reads of
 * 4 GB or more are neither truncated nor able to stall the loop.
 */
static size_t
inner_reader_iovec_read (void *ctx, void *buf, size_t count)
{
    struct inner_reader_iovec *const iro = ctx;
    unsigned char *p = buf;
    unsigned char *const end = p + count;
    size_t n_tocopy;

    while (iro->iov < iro->end && p < end)
    {
        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
        if (n_tocopy > (size_t) (end - p))
            n_tocopy = (size_t) (end - p);
        /* Skip the copy for empty iovecs, whose base pointer may be NULL */
        if (n_tocopy > 0)
        {
            memcpy(p, (unsigned char *) iro->iov->iov_base
                                        + iro->cur_iovec_off, n_tocopy);
            p += n_tocopy;
            iro->cur_iovec_off += n_tocopy;
        }
        if (iro->iov->iov_len == iro->cur_iovec_off)
        {
            ++iro->iov;
            iro->cur_iovec_off = 0;
        }
    }

    /* Measure from the start of the buffer: this never forms a pointer
     * beyond one past the end of `buf'.
     */
    return (size_t) (p - (unsigned char *) buf);
}


/* Return the number of bytes that have not yet been read */
static size_t
inner_reader_iovec_size (void *ctx)
{
    struct inner_reader_iovec *const iro = ctx;
    const struct iovec *iov;
    size_t size;

    size = 0;
    for (iov = iro->iov; iov < iro->end; ++iov)
        size += iov->iov_len;

    /* The current iovec may have been consumed partially */
    return size - iro->cur_iovec_off;
}
3364
3365
3366ssize_t
3367lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
3368                                                                    int iovcnt)
3369{
3370    COMMON_WRITE_CHECKS();
3371    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3372
3373    struct inner_reader_iovec iro = {
3374        .iov = iov,
3375        .end = iov + iovcnt,
3376        .cur_iovec_off = 0,
3377    };
3378    struct lsquic_reader reader = {
3379        .lsqr_read = inner_reader_iovec_read,
3380        .lsqr_size = inner_reader_iovec_size,
3381        .lsqr_ctx  = &iro,
3382    };
3383
3384    return stream_write(stream, &reader);
3385}
3386
3387
3388ssize_t
3389lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
3390{
3391    COMMON_WRITE_CHECKS();
3392    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
3393    return stream_write(stream, reader);
3394}
3395
3396
3397/* This bypasses COMMON_WRITE_CHECKS */
3398static ssize_t
3399stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz)
3400{
3401    const struct iovec iov[1] = {{ (void *) buf, sz, }};
3402    struct inner_reader_iovec iro = {
3403        .iov = iov,
3404        .end = iov + 1,
3405        .cur_iovec_off = 0,
3406    };
3407    struct lsquic_reader reader = {
3408        .lsqr_read = inner_reader_iovec_read,
3409        .lsqr_size = inner_reader_iovec_size,
3410        .lsqr_ctx  = &iro,
3411    };
3412    return stream_write(stream, &reader);
3413}
3414
3415
3416/* This limits the cumulative size of the compressed header fields */
3417#define MAX_HEADERS_SIZE (64 * 1024)
3418
3419static int
3420send_headers_ietf (struct lsquic_stream *stream,
3421                            const struct lsquic_http_headers *headers, int eos)
3422{
3423    enum qwh_status qwh;
3424    const size_t max_prefix_size =
3425                    lsquic_qeh_max_prefix_size(stream->conn_pub->u.ietf.qeh);
3426    const size_t max_push_size = 1 /* Stream type */ + 8 /* Push ID */;
3427    size_t prefix_sz, headers_sz, hblock_sz, push_sz;
3428    unsigned bits;
3429    ssize_t nw;
3430    unsigned char *header_block;
3431    enum lsqpack_enc_header_flags hflags;
3432    unsigned char buf[max_push_size + max_prefix_size + MAX_HEADERS_SIZE];
3433
3434    stream->stream_flags &= ~STREAM_PUSHING;
3435    stream->stream_flags |= STREAM_NOPUSH;
3436
3437    /* TODO: Optimize for the common case: write directly to sm_buf and fall
3438     * back to a larger buffer if that fails.
3439     */
3440    prefix_sz = max_prefix_size;
3441    headers_sz = sizeof(buf) - max_prefix_size - max_push_size;
3442    qwh = lsquic_qeh_write_headers(stream->conn_pub->u.ietf.qeh, stream->id, 0,
3443                headers, buf + max_push_size + max_prefix_size, &prefix_sz,
3444                &headers_sz, &stream->sm_hb_compl, &hflags);
3445
3446    if (!(qwh == QWH_FULL || qwh == QWH_PARTIAL))
3447    {
3448        if (qwh == QWH_ENOBUF)
3449            LSQ_INFO("not enough room for header block");
3450        else
3451            LSQ_WARN("internal error encoding and sending HTTP headers");
3452        return -1;
3453    }
3454
3455    if (hflags & LSQECH_REF_NEW_ENTRIES)
3456        stream->stream_flags |= STREAM_ENCODER_DEP;
3457
3458    if (stream->sm_promise)
3459    {
3460        assert(lsquic_stream_is_pushed(stream));
3461        bits = vint_val2bits(stream->sm_promise->pp_id);
3462        push_sz = 1 + (1 << bits);
3463        if (!stream_activate_hq_frame(stream,
3464                stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PREAMBLE,
3465                SHF_FIXED_SIZE|SHF_PHANTOM, push_sz))
3466            return -1;
3467        buf[max_push_size + max_prefix_size - prefix_sz - push_sz] = HQUST_PUSH;
3468        vint_write(buf + max_push_size + max_prefix_size - prefix_sz
3469                    - push_sz + 1,stream->sm_promise->pp_id, bits, 1 << bits);
3470    }
3471    else
3472        push_sz = 0;
3473
3474    /* Construct contiguous header block buffer including HQ framing */
3475    header_block = buf + max_push_size + max_prefix_size - prefix_sz - push_sz;
3476    hblock_sz = push_sz + prefix_sz + headers_sz;
3477    if (!stream_activate_hq_frame(stream,
3478                stream->sm_payload + stream->sm_n_buffered + push_sz,
3479                HQFT_HEADERS, SHF_FIXED_SIZE, hblock_sz - push_sz))
3480        return -1;
3481
3482    if (qwh == QWH_FULL)
3483    {
3484        stream->sm_send_headers_state = SSHS_HBLOCK_SENDING;
3485        if (lsquic_stream_write_avail(stream))
3486        {
3487            nw = stream_write_buf(stream, header_block, hblock_sz);
3488            if (nw < 0)
3489            {
3490                LSQ_WARN("cannot write to stream: %s", strerror(errno));
3491                return -1;
3492            }
3493            if ((size_t) nw == hblock_sz)
3494            {
3495                stream->stream_flags |= STREAM_HEADERS_SENT;
3496                stream_hblock_sent(stream);
3497                LSQ_DEBUG("wrote all %zu bytes of header block", hblock_sz);
3498                return 0;
3499            }
3500            LSQ_DEBUG("wrote only %zd bytes of header block, stash", nw);
3501        }
3502        else
3503        {
3504            LSQ_DEBUG("cannot write to stream, stash all %zu bytes of "
3505                                        "header block", hblock_sz);
3506            nw = 0;
3507        }
3508    }
3509    else
3510    {
3511        stream->sm_send_headers_state = SSHS_ENC_SENDING;
3512        nw = 0;
3513    }
3514
3515    stream->sm_saved_want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
3516    stream_wantwrite(stream, 1);
3517
3518    stream->sm_header_block = malloc(hblock_sz - (size_t) nw);
3519    if (!stream->sm_header_block)
3520    {
3521        LSQ_WARN("cannot allocate %zd bytes to stash %s header block",
3522            hblock_sz - (size_t) nw, qwh == QWH_FULL ? "full" : "partial");
3523        return -1;
3524    }
3525    memcpy(stream->sm_header_block, header_block + (size_t) nw,
3526                                                hblock_sz - (size_t) nw);
3527    stream->sm_hblock_sz = hblock_sz - (size_t) nw;
3528    stream->sm_hblock_off = 0;
3529    LSQ_DEBUG("stashed %u bytes of header block", stream->sm_hblock_sz);
3530    return 0;
3531}
3532
3533
3534static int
3535send_headers_gquic (struct lsquic_stream *stream,
3536                            const struct lsquic_http_headers *headers, int eos)
3537{
3538    int s = lsquic_headers_stream_send_headers(stream->conn_pub->u.gquic.hs,
3539                stream->id, headers, eos, lsquic_stream_priority(stream));
3540    if (0 == s)
3541    {
3542        SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
3543        stream->stream_flags |= STREAM_HEADERS_SENT;
3544        if (eos)
3545            stream->stream_flags |= STREAM_FIN_SENT;
3546        LSQ_INFO("sent headers");
3547    }
3548    else
3549        LSQ_WARN("could not send headers: %s", strerror(errno));
3550    return s;
3551}
3552
3553
3554int
3555lsquic_stream_send_headers (lsquic_stream_t *stream,
3556                            const lsquic_http_headers_t *headers, int eos)
3557{
3558    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3559            && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_U_WRITE_DONE)))
3560    {
3561        if (stream->sm_bflags & SMBF_IETF)
3562            return send_headers_ietf(stream, headers, eos);
3563        else
3564            return send_headers_gquic(stream, headers, eos);
3565    }
3566    else
3567    {
3568        LSQ_INFO("cannot send headers in this state");
3569        errno = EBADMSG;
3570        return -1;
3571    }
3572}
3573
3574
3575void
3576lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
3577{
3578    if (offset > stream->max_send_off)
3579    {
3580        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
3581        LSQ_DEBUG("update max send offset from 0x%"PRIX64" to "
3582            "0x%"PRIX64, stream->max_send_off, offset);
3583        stream->max_send_off = offset;
3584    }
3585    else
3586        LSQ_DEBUG("new offset 0x%"PRIX64" is not larger than old "
3587            "max send offset 0x%"PRIX64", ignoring", offset,
3588            stream->max_send_off);
3589}
3590
3591
3592/* This function is used to update offsets after handshake completes and we
3593 * learn of peer's limits from the handshake values.
3594 */
3595int
3596lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset)
3597{
3598    LSQ_DEBUG("setting max_send_off to %"PRIu64, offset);
3599    if (offset > stream->max_send_off)
3600    {
3601        lsquic_stream_window_update(stream, offset);
3602        return 0;
3603    }
3604    else if (offset < stream->tosend_off)
3605    {
3606        LSQ_INFO("new offset (%"PRIu64" bytes) is smaller than the amount of "
3607            "data already sent on this stream (%"PRIu64" bytes)", offset,
3608            stream->tosend_off);
3609        return -1;
3610    }
3611    else
3612    {
3613        stream->max_send_off = offset;
3614        return 0;
3615    }
3616}
3617
3618
3619void
3620lsquic_stream_reset (lsquic_stream_t *stream, uint64_t error_code)
3621{
3622    lsquic_stream_reset_ext(stream, error_code, 1);
3623}
3624
3625
3626void
3627lsquic_stream_reset_ext (lsquic_stream_t *stream, uint64_t error_code,
3628                         int do_close)
3629{
3630    if ((stream->stream_flags & STREAM_RST_SENT)
3631                                    || (stream->sm_qflags & SMQF_SEND_RST))
3632    {
3633        LSQ_INFO("reset already sent");
3634        return;
3635    }
3636
3637    SM_HISTORY_APPEND(stream, SHE_RESET);
3638
3639    LSQ_INFO("reset, error code %"PRIu64, error_code);
3640    stream->error_code = error_code;
3641
3642    if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
3643        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
3644                                                        next_send_stream);
3645    stream->sm_qflags &= ~SMQF_SENDING_FLAGS;
3646    stream->sm_qflags |= SMQF_SEND_RST;
3647
3648    if (stream->sm_qflags & SMQF_QPACK_DEC)
3649    {
3650        lsquic_qdh_cancel_stream(stream->conn_pub->u.ietf.qdh, stream);
3651        stream->sm_qflags &= ~SMQF_QPACK_DEC;
3652    }
3653
3654    drop_buffered_data(stream);
3655    maybe_elide_stream_frames(stream);
3656    maybe_schedule_call_on_close(stream);
3657
3658    if (do_close)
3659        lsquic_stream_close(stream);
3660    else
3661        maybe_conn_to_tickable_if_writeable(stream, 1);
3662}
3663
3664
3665lsquic_stream_id_t
3666lsquic_stream_id (const lsquic_stream_t *stream)
3667{
3668    return stream->id;
3669}
3670
3671
3672#if !defined(NDEBUG) && __GNUC__
3673__attribute__((weak))
3674#endif
3675struct lsquic_conn *
3676lsquic_stream_conn (const lsquic_stream_t *stream)
3677{
3678    return stream->conn_pub->lconn;
3679}
3680
3681
3682int
3683lsquic_stream_close (lsquic_stream_t *stream)
3684{
3685    LSQ_DEBUG("lsquic_stream_close() called");
3686    SM_HISTORY_APPEND(stream, SHE_CLOSE);
3687    if (lsquic_stream_is_closed(stream))
3688    {
3689        LSQ_INFO("Attempt to close an already-closed stream");
3690        errno = EBADF;
3691        return -1;
3692    }
3693    maybe_stream_shutdown_write(stream);
3694    stream_shutdown_read(stream);
3695    maybe_schedule_call_on_close(stream);
3696    maybe_finish_stream(stream);
3697    if (!(stream->stream_flags & STREAM_DELAYED_SW))
3698        maybe_conn_to_tickable_if_writeable(stream, 1);
3699    return 0;
3700}
3701
3702
3703#ifndef NDEBUG
3704#if __GNUC__
3705__attribute__((weak))
3706#endif
3707#endif
3708void
3709lsquic_stream_acked (struct lsquic_stream *stream,
3710                                            enum quic_frame_type frame_type)
3711{
3712    assert(stream->n_unacked);
3713    --stream->n_unacked;
3714    LSQ_DEBUG("ACKed; n_unacked: %u", stream->n_unacked);
3715    if (frame_type == QUIC_FRAME_RST_STREAM)
3716    {
3717        SM_HISTORY_APPEND(stream, SHE_RST_ACKED);
3718        LSQ_DEBUG("RESET that we sent has been acked by peer");
3719        stream->stream_flags |= STREAM_RST_ACKED;
3720    }
3721    if (0 == stream->n_unacked)
3722        maybe_finish_stream(stream);
3723}
3724
3725
3726void
3727lsquic_stream_push_req (lsquic_stream_t *stream,
3728                        struct uncompressed_headers *push_req)
3729{
3730    assert(!stream->push_req);
3731    stream->push_req = push_req;
3732    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
3733}
3734
3735
3736int
3737lsquic_stream_is_pushed (const lsquic_stream_t *stream)
3738{
3739    enum stream_id_type sit;
3740
3741    if (stream->sm_bflags & SMBF_IETF)
3742    {
3743        sit = stream->id & SIT_MASK;
3744        return sit == SIT_UNI_SERVER;
3745    }
3746    else
3747        return 1 & ~stream->id;
3748}
3749
3750
3751int
3752lsquic_stream_push_info (const lsquic_stream_t *stream,
3753                          lsquic_stream_id_t *ref_stream_id, void **hset)
3754{
3755    if (lsquic_stream_is_pushed(stream))
3756    {
3757        assert(stream->push_req);
3758        *ref_stream_id = stream->push_req->uh_stream_id;
3759        *hset          = stream->push_req->uh_hset;
3760        return 0;
3761    }
3762    else
3763        return -1;
3764}
3765
3766
3767static int
3768stream_uh_in_gquic (struct lsquic_stream *stream,
3769                                            struct uncompressed_headers *uh)
3770{
3771    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3772                                    && !(stream->stream_flags & STREAM_HAVE_UH))
3773    {
3774        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
3775        LSQ_DEBUG("received uncompressed headers");
3776        stream->stream_flags |= STREAM_HAVE_UH;
3777        if (uh->uh_flags & UH_FIN)
3778        {
3779            /* IETF QUIC only sets UH_FIN for a pushed stream on the server to
3780             * mark request as done:
3781             */
3782            if (stream->sm_bflags & SMBF_IETF)
3783                assert((stream->sm_bflags & SMBF_SERVER)
3784                                            && lsquic_stream_is_pushed(stream));
3785            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
3786        }
3787        stream->uh = uh;
3788        if (uh->uh_oth_stream_id == 0)
3789        {
3790            if (uh->uh_weight)
3791                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
3792        }
3793        else
3794            LSQ_NOTICE("don't know how to depend on stream %"PRIu64,
3795                                                        uh->uh_oth_stream_id);
3796        return 0;
3797    }
3798    else
3799    {
3800        LSQ_ERROR("received unexpected uncompressed headers");
3801        return -1;
3802    }
3803}
3804
3805
3806static int
3807stream_uh_in_ietf (struct lsquic_stream *stream,
3808                                            struct uncompressed_headers *uh)
3809{
3810    int push_promise;
3811
3812    push_promise = lsquic_stream_header_is_pp(stream);
3813    if (!(stream->stream_flags & STREAM_HAVE_UH) && !push_promise)
3814    {
3815        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
3816        LSQ_DEBUG("received uncompressed headers");
3817        stream->stream_flags |= STREAM_HAVE_UH;
3818        if (uh->uh_flags & UH_FIN)
3819        {
3820            /* IETF QUIC only sets UH_FIN for a pushed stream on the server to
3821             * mark request as done:
3822             */
3823            if (stream->sm_bflags & SMBF_IETF)
3824                assert((stream->sm_bflags & SMBF_SERVER)
3825                                            && lsquic_stream_is_pushed(stream));
3826            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
3827        }
3828        stream->uh = uh;
3829        if (uh->uh_oth_stream_id == 0)
3830        {
3831            if (uh->uh_weight)
3832                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
3833        }
3834        else
3835            LSQ_NOTICE("don't know how to depend on stream %"PRIu64,
3836                                                        uh->uh_oth_stream_id);
3837    }
3838    else
3839    {
3840        /* Trailer should never make here, as we discard it in qdh */
3841        LSQ_DEBUG("discard %s header set",
3842                                    push_promise ? "push promise" : "trailer");
3843        if (uh->uh_hset)
3844            stream->conn_pub->enpub->enp_hsi_if
3845                            ->hsi_discard_header_set(uh->uh_hset);
3846        free(uh);
3847    }
3848
3849    return 0;
3850}
3851
3852
3853int
3854lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
3855{
3856    if (stream->sm_bflags & SMBF_USE_HEADERS)
3857    {
3858        if (stream->sm_bflags & SMBF_IETF)
3859            return stream_uh_in_ietf(stream, uh);
3860        else
3861            return stream_uh_in_gquic(stream, uh);
3862    }
3863    else
3864        return -1;
3865}
3866
3867
3868unsigned
3869lsquic_stream_priority (const lsquic_stream_t *stream)
3870{
3871    return 256 - stream->sm_priority;
3872}
3873
3874
3875int
3876lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
3877{
3878    /* The user should never get a reference to the special streams,
3879     * but let's check just in case:
3880     */
3881    if (lsquic_stream_is_critical(stream))
3882        return -1;
3883    if (priority < 1 || priority > 256)
3884        return -1;
3885    stream->sm_priority = 256 - priority;
3886    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
3887    LSQ_DEBUG("set priority to %u", priority);
3888    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
3889    return 0;
3890}
3891
3892
3893static int
3894maybe_send_priority_gquic (struct lsquic_stream *stream, unsigned priority)
3895{
3896    if ((stream->sm_bflags & SMBF_USE_HEADERS)
3897                            && (stream->stream_flags & STREAM_HEADERS_SENT))
3898    {
3899        /* We need to send headers only if we are a) using HEADERS stream
3900         * and b) we already sent initial headers.  If initial headers
3901         * have not been sent yet, stream priority will be sent in the
3902         * HEADERS frame.
3903         */
3904        return lsquic_headers_stream_send_priority(stream->conn_pub->u.gquic.hs,
3905                                                stream->id, 0, 0, priority);
3906    }
3907    else
3908        return 0;
3909}
3910
3911
/* Sending priority on IETF QUIC streams is not implemented: a warning is
 * logged and -1 is returned.  `priority' is not used.
 */
static int
send_priority_ietf (struct lsquic_stream *stream, unsigned priority)
{
    (void) priority;
    LSQ_WARN("%s: TODO", __func__);     /* TODO */
    return -1;
}
3918
3919
3920int
3921lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
3922{
3923    if (0 == lsquic_stream_set_priority_internal(stream, priority))
3924    {
3925        if (stream->sm_bflags & SMBF_IETF)
3926            return send_priority_ietf(stream, priority);
3927        else
3928            return maybe_send_priority_gquic(stream, priority);
3929    }
3930    else
3931        return -1;
3932}
3933
3934
3935lsquic_stream_ctx_t *
3936lsquic_stream_get_ctx (const lsquic_stream_t *stream)
3937{
3938    fiu_return_on("stream/get_ctx", NULL);
3939    return stream->st_ctx;
3940}
3941
3942
3943int
3944lsquic_stream_refuse_push (lsquic_stream_t *stream)
3945{
3946    if (lsquic_stream_is_pushed(stream)
3947            && !(stream->sm_qflags & SMQF_SEND_RST)
3948            && !(stream->stream_flags & STREAM_RST_SENT))
3949    {
3950        LSQ_DEBUG("refusing pushed stream: send reset");
3951        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
3952        return 0;
3953    }
3954    else
3955        return -1;
3956}
3957
3958
3959size_t
3960lsquic_stream_mem_used (const struct lsquic_stream *stream)
3961{
3962    size_t size;
3963
3964    size = sizeof(stream);
3965    if (stream->sm_buf)
3966        size += stream->sm_n_allocated;
3967    if (stream->data_in)
3968        size += stream->data_in->di_if->di_mem_used(stream->data_in);
3969
3970    return size;
3971}
3972
3973
3974const lsquic_cid_t *
3975lsquic_stream_cid (const struct lsquic_stream *stream)
3976{
3977    return LSQUIC_LOG_CONN_ID;
3978}
3979
3980
3981void
3982lsquic_stream_dump_state (const struct lsquic_stream *stream)
3983{
3984    LSQ_DEBUG("flags: %X; read off: %"PRIu64, stream->stream_flags,
3985                                                    stream->read_offset);
3986    stream->data_in->di_if->di_dump_state(stream->data_in);
3987}
3988
3989
3990void *
3991lsquic_stream_get_hset (struct lsquic_stream *stream)
3992{
3993    void *hset;
3994
3995    if (lsquic_stream_is_reset(stream))
3996    {
3997        LSQ_INFO("%s: stream is reset, no headers returned", __func__);
3998        errno = ECONNRESET;
3999        return NULL;
4000    }
4001
4002    if (!((stream->sm_bflags & SMBF_USE_HEADERS)
4003                                && (stream->stream_flags & STREAM_HAVE_UH)))
4004    {
4005        LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__,
4006                                                        stream->stream_flags);
4007        return NULL;
4008    }
4009
4010    if (!stream->uh)
4011    {
4012        LSQ_INFO("%s: headers unavailable (already fetched?)", __func__);
4013        return NULL;
4014    }
4015
4016    if (stream->uh->uh_flags & UH_H1H)
4017    {
4018        LSQ_INFO("%s: uncompressed headers have internal format", __func__);
4019        return NULL;
4020    }
4021
4022    hset = stream->uh->uh_hset;
4023    stream->uh->uh_hset = NULL;
4024    destroy_uh(stream);
4025    if (stream->stream_flags & STREAM_HEAD_IN_FIN)
4026    {
4027        stream->stream_flags |= STREAM_FIN_REACHED;
4028        SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
4029    }
4030    LSQ_DEBUG("return header set");
4031    return hset;
4032}
4033
4034
4035void
4036lsquic_stream_set_stream_if (struct lsquic_stream *stream,
4037           const struct lsquic_stream_if *stream_if, void *stream_if_ctx)
4038{
4039    SM_HISTORY_APPEND(stream, SHE_IF_SWITCH);
4040    stream->stream_if    = stream_if;
4041    stream->sm_onnew_arg = stream_if_ctx;
4042    LSQ_DEBUG("switched interface");
4043    assert(stream->stream_flags & STREAM_ONNEW_DONE);
4044    stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
4045                                                      stream);
4046}
4047
4048
4049static int
4050update_type_hist_and_check (const struct lsquic_stream *stream,
4051                                                    struct hq_filter *filter)
4052{
4053    /* 3-bit codes: */
4054    enum {
4055        CODE_UNSET,
4056        CODE_HEADER,    /* H    Header  */
4057        CODE_DATA,      /* D    Data    */
4058        CODE_PLUS,      /* +    Plus: meaning previous frame repeats */
4059    };
4060    static const unsigned valid_seqs[] = {
4061        /* Ordered by expected frequency */
4062        0123,   /* HD+  */
4063        012,    /* HD   */
4064        01,     /* H    */
4065        01231,  /* HD+H */
4066        0121,   /* HDH  */
4067    };
4068    unsigned code, i;
4069
4070    switch (filter->hqfi_type)
4071    {
4072    case HQFT_HEADERS:
4073        code = CODE_HEADER;
4074        break;
4075    case HQFT_DATA:
4076        code = CODE_DATA;
4077        break;
4078    case HQFT_PUSH_PROMISE:
4079        /* [draft-ietf-quic-http-24], Section 7 */
4080        if ((stream->id & SIT_MASK) == SIT_BIDI_CLIENT
4081                                    && !(stream->sm_bflags & SMBF_SERVER))
4082            return 0;
4083        else
4084            return -1;
4085    case HQFT_CANCEL_PUSH:
4086    case HQFT_SETTINGS:
4087    case HQFT_GOAWAY:
4088    case HQFT_MAX_PUSH_ID:
4089        /* [draft-ietf-quic-http-24], Section 7 */
4090        return -1;
4091    default:
4092        /* Ignore unknown frames */
4093        return 0;
4094    }
4095
4096    if (filter->hqfi_hist_idx >= MAX_HQFI_ENTRIES)
4097        return -1;
4098
4099    if (filter->hqfi_hist_idx && (filter->hqfi_hist_buf & 7) == code)
4100    {
4101        filter->hqfi_hist_buf <<= 3;
4102        filter->hqfi_hist_buf |= CODE_PLUS;
4103        filter->hqfi_hist_idx++;
4104    }
4105    else if (filter->hqfi_hist_idx > 1
4106            && ((filter->hqfi_hist_buf >> 3) & 7) == code
4107            && (filter->hqfi_hist_buf & 7) == CODE_PLUS)
4108        /* Keep it at plus, do nothing */;
4109    else
4110    {
4111        filter->hqfi_hist_buf <<= 3;
4112        filter->hqfi_hist_buf |= code;
4113        filter->hqfi_hist_idx++;
4114    }
4115
4116    for (i = 0; i < sizeof(valid_seqs) / sizeof(valid_seqs[0]); ++i)
4117        if (filter->hqfi_hist_buf == valid_seqs[i])
4118            return 0;
4119
4120    return -1;
4121}
4122
4123
4124int
4125lsquic_stream_header_is_pp (const struct lsquic_stream *stream)
4126{
4127    return stream->sm_hq_filter.hqfi_type == HQFT_PUSH_PROMISE;
4128}
4129
4130
4131int
4132lsquic_stream_header_is_trailer (const struct lsquic_stream *stream)
4133{
4134    return (stream->stream_flags & STREAM_HAVE_UH)
4135        && stream->sm_hq_filter.hqfi_type == HQFT_HEADERS;
4136}
4137
4138
4139static void
4140verify_cl_on_new_data_frame (struct lsquic_stream *stream,
4141                                                    struct hq_filter *filter)
4142{
4143    struct lsquic_conn *lconn;
4144
4145    stream->sm_data_in += filter->hqfi_left;
4146    if (stream->sm_data_in > stream->sm_cont_len)
4147    {
4148        lconn = stream->conn_pub->lconn;
4149        lconn->cn_if->ci_abort_error(lconn, 1, HEC_GENERAL_PROTOCOL_ERROR,
4150            "number of bytes in DATA frames of stream %"PRIu64" exceeds "
4151            "content-length limit of %llu", stream->id, stream->sm_cont_len);
4152    }
4153}
4154
4155
/* Run the HTTP/3 frame filter over a chunk of stream data.
 *
 *  ctx     The stream whose `sm_hq_filter' is used.
 *  buf     Beginning of the data.
 *  sz      Number of bytes in `buf'.
 *  fin     True if FIN follows the last byte of `buf'.
 *
 * Frame headers, header blocks, push IDs and payload of unknown frames
 * are consumed here.  Payload of DATA frames is never consumed: processing
 * stops as soon as the filter is positioned on DATA payload.
 *
 * On error, HQFI_FLAG_ERROR is set in the filter and the connection is
 * aborted.
 *
 * Returns the number of bytes of `buf' that have been consumed.
 */
static size_t
hq_read (void *ctx, const unsigned char *buf, size_t sz, int fin)
{
    struct lsquic_stream *const stream = ctx;
    struct hq_filter *const filter = &stream->sm_hq_filter;
    const unsigned char *p = buf, *prev;
    const unsigned char *const end = buf + sz;
    struct lsquic_conn *lconn;
    enum lsqpack_read_header_status rhs;
    int s;

    while (p < end)
    {
        switch (filter->hqfi_state)
        {
        case HQFI_STATE_FRAME_HEADER_BEGIN:
            /* Reset the two-varint reader before a new frame header */
            filter->hqfi_vint2_state.vr2s_state = 0;
            filter->hqfi_state = HQFI_STATE_FRAME_HEADER_CONTINUE;
            /* fall-through */
        case HQFI_STATE_FRAME_HEADER_CONTINUE:
            s = lsquic_varint_read_two(&p, end, &filter->hqfi_vint2_state);
            if (s < 0)
                /* Frame header is incomplete (presumably all input has
                 * been consumed): stay in this state.
                 */
                break;
            /* Frame type and size are now known */
            filter->hqfi_flags |= HQFI_FLAG_BEGIN;
            filter->hqfi_state = HQFI_STATE_READING_PAYLOAD;
            LSQ_DEBUG("HQ frame type 0x%"PRIX64" at offset %"PRIu64", size %"PRIu64,
                filter->hqfi_type, stream->read_offset + (unsigned) (p - buf),
                filter->hqfi_left);
            if (0 != update_type_hist_and_check(stream, filter))
            {
                lconn = stream->conn_pub->lconn;
                filter->hqfi_flags |= HQFI_FLAG_ERROR;
                LSQ_INFO("unexpected HTTP/3 frame sequence: %o",
                    filter->hqfi_hist_buf);
                lconn->cn_if->ci_abort_error(lconn, 1, HEC_FRAME_UNEXPECTED,
                    "unexpected HTTP/3 frame sequence on stream %"PRIu64,
                    stream->id);
                goto end;
            }
            if (filter->hqfi_left > 0)
            {
                if (filter->hqfi_type == HQFT_DATA)
                {
                    if (stream->sm_bflags & SMBF_VERIFY_CL)
                        verify_cl_on_new_data_frame(stream, filter);
                    /* DATA payload is not consumed by this function */
                    goto end;
                }
                else if (filter->hqfi_type == HQFT_PUSH_PROMISE)
                {
                    if (stream->sm_bflags & SMBF_SERVER)
                    {
                        lconn = stream->conn_pub->lconn;
                        lconn->cn_if->ci_abort_error(lconn, 1,
                            HEC_FRAME_UNEXPECTED, "Received PUSH_PROMISE frame "
                            "on stream %"PRIu64" (clients are not supposed to "
                            "send those)", stream->id);
                        goto end;
                    }
                    else
                        /* Push ID precedes the header block */
                        filter->hqfi_state = HQFI_STATE_PUSH_ID_BEGIN;
                }
            }
            else
            {
                /* Zero-sized frame: an error for the known frame types
                 * listed below; for any other type, move on to the next
                 * frame header.
                 */
                switch (filter->hqfi_type)
                {
                case HQFT_CANCEL_PUSH:
                case HQFT_GOAWAY:
                case HQFT_HEADERS:
                case HQFT_MAX_PUSH_ID:
                case HQFT_PUSH_PROMISE:
                case HQFT_SETTINGS:
                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
                    LSQ_INFO("HQ frame of type %"PRIu64" cannot be size 0",
                                                            filter->hqfi_type);
                    abort_connection(stream);   /* XXX Overkill? */
                    goto end;
                default:
                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
                    break;
                }
            }
            break;
        case HQFI_STATE_PUSH_ID_BEGIN:
            /* Reset the single-varint reader before reading the push ID */
            filter->hqfi_vint1_state.pos = 0;
            filter->hqfi_state = HQFI_STATE_PUSH_ID_CONTINUE;
            /* Fall-through */
        case HQFI_STATE_PUSH_ID_CONTINUE:
            prev = p;
            s = lsquic_varint_read_nb(&p, end, &filter->hqfi_vint1_state);
            /* Push ID bytes count against the size of the frame */
            filter->hqfi_left -= p - prev;
            if (s == 0)
                filter->hqfi_state = HQFI_STATE_READING_PAYLOAD;
                /* A bit of a white lie here */
            break;
        case HQFI_STATE_READING_PAYLOAD:
            if (filter->hqfi_type == HQFT_DATA)
                /* DATA payload is not consumed by this function */
                goto end;
            /* `sz' is reused: it is now the number of payload bytes that
             * are both left in the frame and available in the input.
             */
            sz = filter->hqfi_left;
            if (sz > (uintptr_t) (end - p))
                sz = (uintptr_t) (end - p);
            switch (filter->hqfi_type)
            {
            case HQFT_HEADERS:
            case HQFT_PUSH_PROMISE:
                /* Feed the header block to the QPACK decoder handler */
                prev = p;
                if (filter->hqfi_flags & HQFI_FLAG_BEGIN)
                {
                    filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
                    rhs = lsquic_qdh_header_in_begin(
                                stream->conn_pub->u.ietf.qdh,
                                stream, filter->hqfi_left, &p, sz);
                }
                else
                    rhs = lsquic_qdh_header_in_continue(
                                stream->conn_pub->u.ietf.qdh, stream, &p, sz);
                assert(p > prev || LQRHS_ERROR == rhs);
                filter->hqfi_left -= p - prev;
                if (filter->hqfi_left == 0)
                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
                switch (rhs)
                {
                case LQRHS_DONE:
                    assert(filter->hqfi_left == 0);
                    stream->sm_qflags &= ~SMQF_QPACK_DEC;
                    break;
                case LQRHS_NEED:
                    stream->sm_qflags |= SMQF_QPACK_DEC;
                    break;
                case LQRHS_BLOCKED:
                    /* Stop processing; HQFI_FLAG_BLOCKED is cleared in
                     * lsquic_stream_qdec_unblocked().
                     */
                    stream->sm_qflags |= SMQF_QPACK_DEC;
                    filter->hqfi_flags |= HQFI_FLAG_BLOCKED;
                    goto end;
                default:
                    assert(LQRHS_ERROR == rhs);
                    stream->sm_qflags &= ~SMQF_QPACK_DEC;
                    filter->hqfi_flags |= HQFI_FLAG_ERROR;
                    LSQ_INFO("error processing header block");
                    abort_connection(stream);   /* XXX Overkill? */
                    goto end;
                }
                break;
            default:
                /* Simply skip unknown frame type payload for now */
                filter->hqfi_flags &= ~HQFI_FLAG_BEGIN;
                p += sz;
                filter->hqfi_left -= sz;
                if (filter->hqfi_left == 0)
                    filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
                break;
            }
            break;
        default:
            assert(0);
            goto end;
        }
    }

  end:
    /* All input has been consumed and FIN follows, yet the filter is not
     * at a frame boundary:
     */
    if (fin && p == end && filter->hqfi_state != HQFI_STATE_FRAME_HEADER_BEGIN)
    {
        LSQ_INFO("FIN at unexpected place in filter; state: %u",
                                                        filter->hqfi_state);
        filter->hqfi_flags |= HQFI_FLAG_ERROR;
/* From [draft-ietf-quic-http-16] Section 3.1:
 *               When a stream terminates cleanly, if the last frame on
 * the stream was truncated, this MUST be treated as a connection error
 * (see HTTP_MALFORMED_FRAME in Section 8.1).
 */
        abort_connection(stream);
    }

    return p - buf;
}
4331
4332
4333static int
4334hq_filter_readable_now (const struct lsquic_stream *stream)
4335{
4336    const struct hq_filter *const filter = &stream->sm_hq_filter;
4337
4338    return (filter->hqfi_type == HQFT_DATA
4339                    && filter->hqfi_state == HQFI_STATE_READING_PAYLOAD)
4340        || (filter->hqfi_flags & HQFI_FLAG_ERROR)
4341        || stream->uh
4342        || (stream->stream_flags & STREAM_FIN_REACHED)
4343    ;
4344}
4345
4346
4347static int
4348hq_filter_readable (struct lsquic_stream *stream)
4349{
4350    struct hq_filter *const filter = &stream->sm_hq_filter;
4351    struct read_frames_status rfs;
4352
4353    if (filter->hqfi_flags & HQFI_FLAG_BLOCKED)
4354        return 0;
4355
4356    if (!hq_filter_readable_now(stream))
4357    {
4358        rfs = read_data_frames(stream, 0, hq_read, stream);
4359        if (rfs.total_nread == 0)
4360        {
4361            if (rfs.error)
4362            {
4363                filter->hqfi_flags |= HQFI_FLAG_ERROR;
4364                abort_connection(stream);   /* XXX Overkill? */
4365                return 1;   /* Collect error */
4366            }
4367            return 0;
4368        }
4369    }
4370
4371    return hq_filter_readable_now(stream);
4372}
4373
4374
4375static size_t
4376hq_filter_df (struct lsquic_stream *stream, struct data_frame *data_frame)
4377{
4378    struct hq_filter *const filter = &stream->sm_hq_filter;
4379    size_t nr;
4380
4381    if (!(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4382                                            && filter->hqfi_type == HQFT_DATA))
4383    {
4384        nr = hq_read(stream, data_frame->df_data + data_frame->df_read_off,
4385                            data_frame->df_size - data_frame->df_read_off,
4386                            data_frame->df_fin);
4387        if (nr)
4388        {
4389            stream->read_offset += nr;
4390            stream_consumed_bytes(stream);
4391        }
4392    }
4393    else
4394        nr = 0;
4395
4396    if (0 == (filter->hqfi_flags & HQFI_FLAG_ERROR))
4397    {
4398        data_frame->df_read_off += nr;
4399        if (filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4400                                        && filter->hqfi_type == HQFT_DATA)
4401            return MIN(filter->hqfi_left,
4402                    (unsigned) data_frame->df_size - data_frame->df_read_off);
4403        else
4404        {
4405            assert(data_frame->df_read_off == data_frame->df_size);
4406            return 0;
4407        }
4408    }
4409    else
4410    {
4411        data_frame->df_read_off = data_frame->df_size;
4412        return 0;
4413    }
4414}
4415
4416
4417static void
4418hq_decr_left (struct lsquic_stream *stream, size_t read)
4419{
4420    struct hq_filter *const filter = &stream->sm_hq_filter;
4421
4422    if (read)
4423    {
4424        assert(filter->hqfi_state == HQFI_STATE_READING_PAYLOAD
4425                                            && filter->hqfi_type == HQFT_DATA);
4426        assert(read <= filter->hqfi_left);
4427    }
4428
4429    filter->hqfi_left -= read;
4430    if (0 == filter->hqfi_left)
4431        filter->hqfi_state = HQFI_STATE_FRAME_HEADER_BEGIN;
4432}
4433
4434
4435struct qpack_dec_hdl *
4436lsquic_stream_get_qdh (const struct lsquic_stream *stream)
4437{
4438    return stream->conn_pub->u.ietf.qdh;
4439}
4440
4441
4442/* These are IETF QUIC states */
4443enum stream_state_sending
4444lsquic_stream_sending_state (const struct lsquic_stream *stream)
4445{
4446    if (0 == (stream->stream_flags & STREAM_RST_SENT))
4447    {
4448        if (stream->stream_flags & STREAM_FIN_SENT)
4449        {
4450            if (stream->n_unacked)
4451                return SSS_DATA_SENT;
4452            else
4453                return SSS_DATA_RECVD;
4454        }
4455        else
4456        {
4457            if (stream->tosend_off
4458                            || (stream->stream_flags & STREAM_BLOCKED_SENT))
4459                return SSS_SEND;
4460            else
4461                return SSS_READY;
4462        }
4463    }
4464    else if (stream->stream_flags & STREAM_RST_ACKED)
4465        return SSS_RESET_RECVD;
4466    else
4467        return SSS_RESET_SENT;
4468}
4469
4470
/* Printable names of the sending states, indexed by the SSS_* values
 * returned by lsquic_stream_sending_state().
 */
const char *const lsquic_sss2str[] =
{
    [SSS_READY]        =  "Ready",
    [SSS_SEND]         =  "Send",
    [SSS_DATA_SENT]    =  "Data Sent",
    [SSS_RESET_SENT]   =  "Reset Sent",
    [SSS_DATA_RECVD]   =  "Data Recvd",
    [SSS_RESET_RECVD]  =  "Reset Recvd",
};
4480
4481
/* Printable names of the receiving states, indexed by the SSR_* values
 * returned by lsquic_stream_receiving_state().
 */
const char *const lsquic_ssr2str[] =
{
    [SSR_RECV]         =  "Recv",
    [SSR_SIZE_KNOWN]   =  "Size Known",
    [SSR_DATA_RECVD]   =  "Data Recvd",
    [SSR_RESET_RECVD]  =  "Reset Recvd",
    [SSR_DATA_READ]    =  "Data Read",
    [SSR_RESET_READ]   =  "Reset Read",
};
4491
4492
4493/* These are IETF QUIC states */
4494enum stream_state_receiving
4495lsquic_stream_receiving_state (struct lsquic_stream *stream)
4496{
4497    uint64_t n_bytes;
4498
4499    if (0 == (stream->stream_flags & STREAM_RST_RECVD))
4500    {
4501        if (0 == (stream->stream_flags & STREAM_FIN_RECVD))
4502            return SSR_RECV;
4503        if (stream->stream_flags & STREAM_FIN_REACHED)
4504            return SSR_DATA_READ;
4505        if (0 == (stream->stream_flags & STREAM_DATA_RECVD))
4506        {
4507            n_bytes = stream->data_in->di_if->di_readable_bytes(
4508                                    stream->data_in, stream->read_offset);
4509            if (stream->read_offset + n_bytes == stream->sm_fin_off)
4510            {
4511                stream->stream_flags |= STREAM_DATA_RECVD;
4512                return SSR_DATA_RECVD;
4513            }
4514            else
4515                return SSR_SIZE_KNOWN;
4516        }
4517        else
4518            return SSR_DATA_RECVD;
4519    }
4520    else if (stream->stream_flags & STREAM_RST_READ)
4521        return SSR_RESET_READ;
4522    else
4523        return SSR_RESET_RECVD;
4524}
4525
4526
4527void
4528lsquic_stream_qdec_unblocked (struct lsquic_stream *stream)
4529{
4530    struct hq_filter *const filter = &stream->sm_hq_filter;
4531
4532    assert(stream->sm_qflags & SMQF_QPACK_DEC);
4533    assert(filter->hqfi_flags & HQFI_FLAG_BLOCKED);
4534
4535    filter->hqfi_flags &= ~HQFI_FLAG_BLOCKED;
4536    stream->conn_pub->cp_flags |= CP_STREAM_UNBLOCKED;
4537    LSQ_DEBUG("QPACK decoder unblocked");
4538}
4539
4540
4541int
4542lsquic_stream_is_rejected (const struct lsquic_stream *stream)
4543{
4544    return stream->stream_flags & STREAM_SS_RECVD;
4545}
4546
4547
4548int
4549lsquic_stream_can_push (const struct lsquic_stream *stream)
4550{
4551    if (lsquic_stream_is_pushed(stream))
4552        return 0;
4553    else if (stream->sm_bflags & SMBF_IETF)
4554        return (stream->sm_bflags & SMBF_USE_HEADERS)
4555            && !(stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_NOPUSH))
4556            && stream->sm_send_headers_state == SSHS_BEGIN
4557            ;
4558    else
4559        return 1;
4560}
4561
4562
4563static size_t
4564pp_reader_read (void *lsqr_ctx, void *buf, size_t count)
4565{
4566    struct push_promise *const promise = lsqr_ctx;
4567    unsigned char *dst = buf;
4568    unsigned char *const end = buf + count;
4569    size_t len;
4570
4571    while (dst < end)
4572    {
4573        switch (promise->pp_write_state)
4574        {
4575        case PPWS_ID0:
4576        case PPWS_ID1:
4577        case PPWS_ID2:
4578        case PPWS_ID3:
4579        case PPWS_ID4:
4580        case PPWS_ID5:
4581        case PPWS_ID6:
4582        case PPWS_ID7:
4583            *dst++ = promise->pp_encoded_push_id[promise->pp_write_state];
4584            ++promise->pp_write_state;
4585            break;
4586        case PPWS_PFX0:
4587            *dst++ = 0;
4588            ++promise->pp_write_state;
4589            break;
4590        case PPWS_PFX1:
4591            *dst++ = 0;
4592            ++promise->pp_write_state;
4593            break;
4594        case PPWS_HBLOCK:
4595            len = MIN(promise->pp_content_len - promise->pp_write_off,
4596                        (size_t) (end - dst));
4597            memcpy(dst, promise->pp_content_buf + promise->pp_write_off,
4598                                                                        len);
4599            promise->pp_write_off += len;
4600            dst += len;
4601            if (promise->pp_content_len == promise->pp_write_off)
4602            {
4603                LSQ_LOG1(LSQ_LOG_DEBUG, "finish writing push promise %"PRIu64
4604                    ": reset push state", promise->pp_id);
4605                promise->pp_write_state = PPWS_DONE;
4606            }
4607            goto end;
4608        default:
4609            goto end;
4610        }
4611    }
4612
4613  end:
4614    return dst - (unsigned char *) buf;
4615}
4616
4617
4618static size_t
4619pp_reader_size (void *lsqr_ctx)
4620{
4621    struct push_promise *const promise = lsqr_ctx;
4622    size_t size;
4623
4624    size = 0;
4625    switch (promise->pp_write_state)
4626    {
4627    case PPWS_ID0:
4628    case PPWS_ID1:
4629    case PPWS_ID2:
4630    case PPWS_ID3:
4631    case PPWS_ID4:
4632    case PPWS_ID5:
4633    case PPWS_ID6:
4634    case PPWS_ID7:
4635        size += 8 - promise->pp_write_state;
4636        /* fall-through */
4637    case PPWS_PFX0:
4638        ++size;
4639        /* fall-through */
4640    case PPWS_PFX1:
4641        ++size;
4642        /* fall-through */
4643    case PPWS_HBLOCK:
4644        size += promise->pp_content_len - promise->pp_write_off;
4645        break;
4646    default:
4647        break;
4648    }
4649
4650    return size;
4651}
4652
4653
4654static void
4655init_pp_reader (struct push_promise *promise, struct lsquic_reader *reader)
4656{
4657    reader->lsqr_read = pp_reader_read;
4658    reader->lsqr_size = pp_reader_size;
4659    reader->lsqr_ctx = promise;
4660}
4661
4662
4663static void
4664on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
4665{
4666    struct lsquic_reader pp_reader;
4667    struct push_promise *promise;
4668    ssize_t nw;
4669    int want_write;
4670
4671    assert(stream_is_pushing_promise(stream));
4672
4673    promise = SLIST_FIRST(&stream->sm_promises);
4674    init_pp_reader(promise, &pp_reader);
4675    nw = stream_write(stream, &pp_reader);
4676    if (nw > 0)
4677    {
4678        LSQ_DEBUG("wrote %zd bytes more of push promise (%s)",
4679            nw, promise->pp_write_state == PPWS_DONE ? "done" : "not done");
4680        if (promise->pp_write_state == PPWS_DONE)
4681        {
4682            /* Restore want_write flag */
4683            want_write = !!(stream->sm_qflags & SMQF_WANT_WRITE);
4684            if (want_write != stream->sm_saved_want_write)
4685                (void) lsquic_stream_wantwrite(stream,
4686                                                stream->sm_saved_want_write);
4687        }
4688    }
4689    else if (nw < 0)
4690    {
4691        LSQ_WARN("could not write push promise (wrapper)");
4692        /* XXX What should happen if we hit an error? TODO */
4693    }
4694}
4695
4696
4697/* Success means that the push promise has been placed on sm_promises list and
4698 * the stream now owns it.  Failure means that the push promise should be
4699 * destroyed by the caller.
4700 *
4701 * A push promise is written immediately.  If it cannot be written to packets
4702 * or buffered whole, the stream is marked as unable to push further promises.
4703 */
4704int
4705lsquic_stream_push_promise (struct lsquic_stream *stream,
4706                                                struct push_promise *promise)
4707{
4708    struct lsquic_reader pp_reader;
4709    unsigned bits, len;
4710    ssize_t nw;
4711
4712    assert(stream->sm_bflags & SMBF_IETF);
4713    assert(lsquic_stream_can_push(stream));
4714
4715    bits = vint_val2bits(promise->pp_id);
4716    len = 1 << bits;
4717    promise->pp_write_state = 8 - len;
4718    vint_write(promise->pp_encoded_push_id + 8 - len, promise->pp_id,
4719                                                            bits, 1 << bits);
4720
4721    if (!stream_activate_hq_frame(stream,
4722                stream->sm_payload + stream->sm_n_buffered, HQFT_PUSH_PROMISE,
4723                SHF_FIXED_SIZE, pp_reader_size(promise)))
4724        return -1;
4725
4726    stream->stream_flags |= STREAM_PUSHING;
4727
4728    init_pp_reader(promise, &pp_reader);
4729    nw = stream_write(stream, &pp_reader);
4730    if (nw > 0)
4731    {
4732        SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next);
4733        ++promise->pp_refcnt;
4734        if (promise->pp_write_state == PPWS_DONE)
4735            LSQ_DEBUG("fully wrote promise %"PRIu64, promise->pp_id);
4736        else
4737        {
4738            LSQ_DEBUG("partially wrote promise %"PRIu64" (state: %d, off: %u)"
4739                ", disable further pushing", promise->pp_id,
4740                promise->pp_write_state, promise->pp_write_off);
4741            stream->stream_flags |= STREAM_NOPUSH;
4742            stream->sm_saved_want_write =
4743                                    !!(stream->sm_qflags & SMQF_WANT_WRITE);
4744            stream_wantwrite(stream, 1);
4745        }
4746        return 0;
4747    }
4748    else
4749    {
4750        if (nw < 0)
4751            LSQ_WARN("failure writing push promise");
4752        stream->stream_flags |= STREAM_NOPUSH;
4753        stream->stream_flags &= ~STREAM_PUSHING;
4754        return -1;
4755    }
4756}
4757
4758
4759int
4760lsquic_stream_verify_len (struct lsquic_stream *stream,
4761                                                unsigned long long cont_len)
4762{
4763    if ((stream->sm_bflags & (SMBF_IETF|SMBF_USE_HEADERS))
4764                                            == (SMBF_IETF|SMBF_USE_HEADERS))
4765    {
4766        stream->sm_cont_len = cont_len;
4767        stream->sm_bflags |= SMBF_VERIFY_CL;
4768        LSQ_DEBUG("will verify that incoming DATA frames have %llu bytes",
4769            cont_len);
4770        return 0;
4771    }
4772    else
4773        return -1;
4774}
4775