lsquic_stream.c revision e8bd737d
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "lsquic.h"
30
31#include "lsquic_int_types.h"
32#include "lsquic_packet_common.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_malo.h"
35#include "lsquic_conn_flow.h"
36#include "lsquic_rtt.h"
37#include "lsquic_sfcw.h"
38#include "lsquic_stream.h"
39#include "lsquic_conn_public.h"
40#include "lsquic_util.h"
41#include "lsquic_mm.h"
42#include "lsquic_headers_stream.h"
43#include "lsquic_frame_reader.h"
44#include "lsquic_conn.h"
45#include "lsquic_data_in_if.h"
46#include "lsquic_parse.h"
47#include "lsquic_packet_out.h"
48#include "lsquic_engine_public.h"
49#include "lsquic_senhist.h"
50#include "lsquic_pacer.h"
51#include "lsquic_cubic.h"
52#include "lsquic_send_ctl.h"
53#include "lsquic_ev_log.h"
54
55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid
57#define LSQUIC_LOG_STREAM_ID stream->id
58#include "lsquic_logger.h"
59
60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ
61
62static void
63drop_frames_in (lsquic_stream_t *stream);
64
65static void
66maybe_schedule_call_on_close (lsquic_stream_t *stream);
67
68static int
69stream_wantread (lsquic_stream_t *stream, int is_want);
70
71static int
72stream_wantwrite (lsquic_stream_t *stream, int is_want);
73
74static ssize_t
75stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);
76
77static ssize_t
78save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);
79
80static int
81stream_flush (lsquic_stream_t *stream);
82
83static int
84stream_flush_nocheck (lsquic_stream_t *stream);
85
86static void
87maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag);
88
89
90#if LSQUIC_KEEP_STREAM_HISTORY
91/* These values are printable ASCII characters for ease of printing the
92 * whole history in a single line of a log message.
93 *
94 * The list of events is not exhaustive: only most interesting events
95 * are recorded.
96 */
97enum stream_history_event
98{
99    SHE_EMPTY              =  '\0',     /* Special entry.  No init besides memset required */
100    SHE_PLUS               =  '+',      /* Special entry: previous event occured more than once */
101    SHE_REACH_FIN          =  'a',
102    SHE_BLOCKED_OUT        =  'b',
103    SHE_CREATED            =  'C',
104    SHE_FRAME_IN           =  'd',
105    SHE_FRAME_OUT          =  'D',
106    SHE_RESET              =  'e',
107    SHE_WINDOW_UPDATE      =  'E',
108    SHE_FIN_IN             =  'f',
109    SHE_FINISHED           =  'F',
110    SHE_GOAWAY_IN          =  'g',
111    SHE_USER_WRITE_HEADER  =  'h',
112    SHE_HEADERS_IN         =  'H',
113    SHE_ONCLOSE_SCHED      =  'l',
114    SHE_ONCLOSE_CALL       =  'L',
115    SHE_ONNEW              =  'N',
116    SHE_SET_PRIO           =  'p',
117    SHE_USER_READ          =  'r',
118    SHE_SHUTDOWN_READ      =  'R',
119    SHE_RST_IN             =  's',
120    SHE_RST_OUT            =  't',
121    SHE_FLUSH              =  'u',
122    SHE_USER_WRITE_DATA    =  'w',
123    SHE_SHUTDOWN_WRITE     =  'W',
124    SHE_CLOSE              =  'X',
125    SHE_FORCE_FINISH       =  'Z',
126};
127
128static void
129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
130{
131    enum stream_history_event prev_event;
132    sm_hist_idx_t idx;
133    int plus;
134
135    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
136    plus = SHE_PLUS == stream->sm_hist_buf[idx];
137    idx = (idx - plus) & SM_HIST_IDX_MASK;
138    prev_event = stream->sm_hist_buf[idx];
139
140    if (prev_event == sh_event && plus)
141        return;
142
143    if (prev_event == sh_event)
144        sh_event = SHE_PLUS;
145    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
146
147    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
148        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
149                                                        stream->sm_hist_buf);
150}
151
152#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
153#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
154        if (stream->sm_hist_idx & SM_HIST_IDX_MASK)                         \
155            LSQ_DEBUG("history: [%.*s]",                                    \
156                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
157                (stream)->sm_hist_buf);                                     \
158    } while (0)
159#else
160#   define SM_HISTORY_APPEND(stream, event)
161#   define SM_HISTORY_DUMP_REMAINING(stream)
162#endif
163
164
165static int
166stream_inside_callback (const lsquic_stream_t *stream)
167{
168    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
169}
170
171
172/* Here, "readable" means that the user is able to read from the stream. */
173static void
174maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
175{
176    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
177    {
178        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
179                                           stream->conn_pub->lconn);
180    }
181}
182
183
184/* Here, "writeable" means that data can be put into packets to be
185 * scheduled to be sent out.
186 */
187static void
188maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream)
189{
190    if (!stream_inside_callback(stream) &&
191            lsquic_send_ctl_can_send(stream->conn_pub->send_ctl) &&
192          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
193    {
194        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
195                                           stream->conn_pub->lconn);
196    }
197}
198
199
200static int
201stream_stalled (const lsquic_stream_t *stream)
202{
203    return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) &&
204           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
205                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
206}
207
208
209/* TODO: The logic to figure out whether the stream is connection limited
210 * should be taken out of the constructor.  The caller should specify this
211 * via one of enum stream_ctor_flags.
212 */
213lsquic_stream_t *
214lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub,
215                       const struct lsquic_stream_if *stream_if,
216                       void *stream_if_ctx, unsigned initial_window,
217                       unsigned initial_send_off,
218                       enum stream_ctor_flags ctor_flags)
219{
220    lsquic_cfcw_t *cfcw;
221    lsquic_stream_t *stream;
222
223    stream = calloc(1, sizeof(*stream));
224    if (!stream)
225        return NULL;
226
227    stream->stream_if = stream_if;
228    stream->id        = id;
229    stream->conn_pub  = conn_pub;
230    stream->sm_onnew_arg = stream_if_ctx;
231    if (!initial_window)
232        initial_window = 16 * 1024;
233    if (LSQUIC_STREAM_HANDSHAKE == id ||
234        (conn_pub->hs && LSQUIC_STREAM_HEADERS == id))
235        cfcw = NULL;
236    else
237    {
238        cfcw = &conn_pub->cfcw;
239        stream->stream_flags |= STREAM_CONN_LIMITED;
240        if (conn_pub->hs)
241            stream->stream_flags |= STREAM_USE_HEADERS;
242        lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO);
243    }
244    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
245    if (!initial_send_off)
246        initial_send_off = 16 * 1024;
247    stream->max_send_off = initial_send_off;
248    if (ctor_flags & SCF_USE_DI_HASH)
249        stream->data_in = data_in_hash_new(conn_pub, id, 0);
250    else
251        stream->data_in = data_in_nocopy_new(conn_pub, id);
252    LSQ_DEBUG("created stream %u @%p", id, stream);
253    SM_HISTORY_APPEND(stream, SHE_CREATED);
254    if (ctor_flags & SCF_DI_AUTOSWITCH)
255        stream->stream_flags |= STREAM_AUTOSWITCH;
256    if (ctor_flags & SCF_CALL_ON_NEW)
257        lsquic_stream_call_on_new(stream);
258    if (ctor_flags & SCF_DISP_RW_ONCE)
259        stream->stream_flags |= STREAM_RW_ONCE;
260    return stream;
261}
262
263
264void
265lsquic_stream_call_on_new (lsquic_stream_t *stream)
266{
267    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
268    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
269    {
270        LSQ_DEBUG("calling on_new_stream");
271        SM_HISTORY_APPEND(stream, SHE_ONNEW);
272        stream->stream_flags |= STREAM_ONNEW_DONE;
273        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
274                                                          stream);
275    }
276}
277
278
279static void
280decr_conn_cap (struct lsquic_stream *stream, size_t incr)
281{
282    if (stream->stream_flags & STREAM_CONN_LIMITED)
283    {
284        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
285        stream->conn_pub->conn_cap.cc_sent -= incr;
286    }
287}
288
289
290static void
291drop_buffered_data (struct lsquic_stream *stream)
292{
293    decr_conn_cap(stream, stream->sm_n_buffered);
294    stream->sm_n_buffered = 0;
295    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
296        maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS);
297}
298
299
300void
301lsquic_stream_destroy (lsquic_stream_t *stream)
302{
303    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
304    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
305                                                            STREAM_ONNEW_DONE)
306    {
307        stream->stream_flags |= STREAM_ONCLOSE_DONE;
308        stream->stream_if->on_close(stream, stream->st_ctx);
309    }
310    if (stream->stream_flags & STREAM_SENDING_FLAGS)
311        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
312    if (stream->stream_flags & STREAM_WANT_READ)
313        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
314    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
315        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
316    if (stream->stream_flags & STREAM_SERVICE_FLAGS)
317        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
318    drop_buffered_data(stream);
319    lsquic_sfcw_consume_rem(&stream->fc);
320    drop_frames_in(stream);
321    free(stream->push_req);
322    free(stream->uh);
323    free(stream->sm_buf);
324    LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream);
325    SM_HISTORY_DUMP_REMAINING(stream);
326    free(stream);
327}
328
329
330static int
331stream_is_finished (const lsquic_stream_t *stream)
332{
333    return lsquic_stream_is_closed(stream)
334           /* n_unacked checks that no outgoing packets that reference this
335            * stream are outstanding:
336            */
337        && 0 == stream->n_unacked
338           /* This checks that no packets that reference this stream will
339            * become outstanding:
340            */
341        && 0 == (stream->stream_flags & STREAM_SEND_RST)
342        && ((stream->stream_flags & STREAM_FORCE_FINISH)
343          || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))
344           && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD))));
345}
346
347
348static void
349maybe_finish_stream (lsquic_stream_t *stream)
350{
351    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
352                                                    stream_is_finished(stream))
353    {
354        LSQ_DEBUG("stream %u is now finished", stream->id);
355        SM_HISTORY_APPEND(stream, SHE_FINISHED);
356        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
357            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
358                                                    next_service_stream);
359        stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED;
360    }
361}
362
363
364static void
365maybe_schedule_call_on_close (lsquic_stream_t *stream)
366{
367    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
368                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE))
369            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE))
370    {
371        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
372            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
373                                                    next_service_stream);
374        stream->stream_flags |= STREAM_CALL_ONCLOSE;
375        LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id);
376        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
377    }
378}
379
380
381void
382lsquic_stream_call_on_close (lsquic_stream_t *stream)
383{
384    assert(stream->stream_flags & STREAM_ONNEW_DONE);
385    stream->stream_flags &= ~STREAM_CALL_ONCLOSE;
386    if (!(stream->stream_flags & STREAM_SERVICE_FLAGS))
387        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
388                                                    next_service_stream);
389    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
390    {
391        LSQ_DEBUG("calling on_close for stream %u", stream->id);
392        stream->stream_flags |= STREAM_ONCLOSE_DONE;
393        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
394        stream->stream_if->on_close(stream, stream->st_ctx);
395    }
396    else
397        assert(0);
398}
399
400
401int
402lsquic_stream_readable (const lsquic_stream_t *stream)
403{
404    /* A stream is readable if one of the following is true: */
405    return
406        /* - It is already finished: in that case, lsquic_stream_read() will
407         *   return 0.
408         */
409            (stream->stream_flags & STREAM_FIN_REACHED)
410        /* - The stream is reset, by either side.  In this case,
411         *   lsquic_stream_read() will return -1 (we want the user to be
412         *   able to collect the error).
413         */
414        ||  (stream->stream_flags & STREAM_RST_FLAGS)
415        /* - Either we are not in HTTP mode or the HTTP headers have been
416         *   received and the headers or data from the stream can be read.
417         */
418        ||  (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
419                                                        == STREAM_USE_HEADERS)
420            && (stream->uh != NULL
421                ||  stream->data_in->di_if->di_get_frame(stream->data_in,
422                                                        stream->read_offset)))
423    ;
424}
425
426
427size_t
428lsquic_stream_write_avail (const struct lsquic_stream *stream)
429{
430    uint64_t stream_avail, conn_avail;
431
432    stream_avail = stream->max_send_off - stream->tosend_off
433                                                - stream->sm_n_buffered;
434    if (stream->stream_flags & STREAM_CONN_LIMITED)
435    {
436        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
437        if (conn_avail < stream_avail)
438            return conn_avail;
439    }
440
441    return stream_avail;
442}
443
444
445int
446lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
447{
448    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
449                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
450    {
451        return -1;
452    }
453    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
454    {
455        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
456            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
457                                                    next_send_stream);
458        stream->stream_flags |= STREAM_SEND_WUF;
459    }
460    return 0;
461}
462
463
464int
465lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
466{
467    uint64_t max_off;
468    int got_next_offset;
469    enum ins_frame ins_frame;
470
471    assert(frame->packet_in);
472
473    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
474    LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; "
475        "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
476
477    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) ==
478                                (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN))
479    {
480        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
481        lsquic_malo_put(frame);
482        return -1;
483    }
484
485    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
486    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
487    if (INS_FRAME_OK == ins_frame)
488    {
489        /* Update maximum offset in the flow controller and check for flow
490         * control violation:
491         */
492        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
493        if (0 != lsquic_stream_update_sfcw(stream, max_off))
494            return -1;
495        if (frame->data_frame.df_fin)
496        {
497            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
498            stream->stream_flags |= STREAM_FIN_RECVD;
499            maybe_finish_stream(stream);
500        }
501        if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
502                (stream->data_in->di_flags & DI_SWITCH_IMPL))
503        {
504            stream->data_in = stream->data_in->di_if->di_switch_impl(
505                                        stream->data_in, stream->read_offset);
506            if (!stream->data_in)
507            {
508                stream->data_in = data_in_error_new();
509                return -1;
510            }
511        }
512        if (got_next_offset)
513            /* Checking the offset saves di_get_frame() call */
514            maybe_conn_to_tickable_if_readable(stream);
515        return 0;
516    }
517    else if (INS_FRAME_DUP == ins_frame)
518    {
519        return 0;
520    }
521    else
522    {
523        assert(INS_FRAME_ERR == ins_frame);
524        return -1;
525    }
526}
527
528
529static void
530drop_frames_in (lsquic_stream_t *stream)
531{
532    if (stream->data_in)
533    {
534        stream->data_in->di_if->di_destroy(stream->data_in);
535        /* To avoid checking whether `data_in` is set, just set to the error
536         * data-in stream.  It does the right thing after incoming data is
537         * dropped.
538         */
539        stream->data_in = data_in_error_new();
540    }
541}
542
543
544static void
545maybe_elide_stream_frames (struct lsquic_stream *stream)
546{
547    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
548    {
549        if (stream->n_unacked)
550            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
551                                                stream->id);
552        stream->stream_flags |= STREAM_FRAMES_ELIDED;
553    }
554}
555
556
557int
558lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
559                      uint32_t error_code)
560{
561
562    if (stream->stream_flags & STREAM_RST_RECVD)
563    {
564        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
565        return 0;
566    }
567
568    SM_HISTORY_APPEND(stream, SHE_RST_IN);
569    /* This flag must always be set, even if we are "ignoring" it: it is
570     * used by elision code.
571     */
572    stream->stream_flags |= STREAM_RST_RECVD;
573
574    if ((stream->stream_flags & STREAM_FIN_RECVD) &&
575                    /* Pushed streams have fake STREAM_FIN_RECVD set, thus
576                     * we need a special check:
577                     */
578                                            !lsquic_stream_is_pushed(stream))
579    {
580        LSQ_DEBUG("ignore RST_STREAM frame after FIN is received");
581        return 0;
582    }
583
584    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
585    {
586        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is "
587            "smaller than that of byte following the last byte we have seen: "
588            "0x%"PRIX64, stream->id, offset,
589            lsquic_sfcw_get_max_recv_off(&stream->fc));
590        return -1;
591    }
592
593    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
594    {
595        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64
596            " violates flow control", stream->id, offset);
597        return -1;
598    }
599
600    /* Let user collect error: */
601    maybe_conn_to_tickable_if_readable(stream);
602
603    lsquic_sfcw_consume_rem(&stream->fc);
604    drop_frames_in(stream);
605    drop_buffered_data(stream);
606    maybe_elide_stream_frames(stream);
607
608    if (!(stream->stream_flags &
609                        (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT)))
610        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
611
612    stream->stream_flags |= STREAM_RST_RECVD;
613
614    maybe_finish_stream(stream);
615    maybe_schedule_call_on_close(stream);
616
617    return 0;
618}
619
620
621uint64_t
622lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
623{
624    assert(stream->stream_flags & STREAM_SEND_WUF);
625    stream->stream_flags &= ~STREAM_SEND_WUF;
626    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
627        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
628    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
629}
630
631
632void
633lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
634{
635    assert(stream->stream_flags & STREAM_SEND_BLOCKED);
636    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
637    stream->stream_flags &= ~STREAM_SEND_BLOCKED;
638    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
639        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
640}
641
642
643void
644lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
645{
646    assert(stream->stream_flags & STREAM_SEND_RST);
647    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
648    stream->stream_flags &= ~STREAM_SEND_RST;
649    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
650        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
651    stream->stream_flags |= STREAM_RST_SENT;
652    maybe_finish_stream(stream);
653}
654
655
656static size_t
657read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len)
658{
659    struct uncompressed_headers *uh = stream->uh;
660    size_t n_avail = uh->uh_size - uh->uh_off;
661    if (n_avail < len)
662        len = n_avail;
663    memcpy(dst, uh->uh_headers + uh->uh_off, len);
664    uh->uh_off += len;
665    if (uh->uh_off == uh->uh_size)
666    {
667        LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id);
668        free(uh);
669        stream->uh = NULL;
670        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
671        {
672            stream->stream_flags |= STREAM_FIN_REACHED;
673            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
674        }
675    }
676    return len;
677}
678
679
680/* This function returns 0 when EOF is reached.
681 */
682ssize_t
683lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov,
684                     int iovcnt)
685{
686    size_t total_nread, nread;
687    int processed_frames, read_unc_headers, iovidx;
688    unsigned char *p, *end;
689
690    SM_HISTORY_APPEND(stream, SHE_USER_READ);
691
692#define NEXT_IOV() do {                                             \
693    ++iovidx;                                                       \
694    while (iovidx < iovcnt && 0 == iov[iovidx].iov_len)             \
695        ++iovidx;                                                   \
696    if (iovidx < iovcnt)                                            \
697    {                                                               \
698        p = iov[iovidx].iov_base;                                   \
699        end = p + iov[iovidx].iov_len;                              \
700    }                                                               \
701    else                                                            \
702        p = end = NULL;                                             \
703} while (0)
704
705#define AVAIL() (end - p)
706
707    if (stream->stream_flags & STREAM_RST_FLAGS)
708    {
709        errno = ECONNRESET;
710        return -1;
711    }
712    if (stream->stream_flags & STREAM_U_READ_DONE)
713    {
714        errno = EBADF;
715        return -1;
716    }
717    if (stream->stream_flags & STREAM_FIN_REACHED)
718        return 0;
719
720    total_nread = 0;
721    processed_frames = 0;
722
723    iovidx = -1;
724    NEXT_IOV();
725
726    if (stream->uh && AVAIL())
727    {
728        read_unc_headers = 1;
729        do
730        {
731            nread = read_uh(stream, p, AVAIL());
732            p += nread;
733            total_nread += nread;
734            if (p == end)
735                NEXT_IOV();
736        }
737        while (stream->uh && AVAIL());
738    }
739    else
740        read_unc_headers = 0;
741
742    struct data_frame *data_frame;
743    while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset)))
744    {
745        ++processed_frames;
746        size_t navail = data_frame->df_size - data_frame->df_read_off;
747        size_t ntowrite = AVAIL();
748        if (navail < ntowrite)
749            ntowrite = navail;
750        memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite);
751        p += ntowrite;
752        data_frame->df_read_off += ntowrite;
753        stream->read_offset += ntowrite;
754        total_nread += ntowrite;
755        if (data_frame->df_read_off == data_frame->df_size)
756        {
757            const int fin = data_frame->df_fin;
758            stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
759            if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
760                    (stream->data_in->di_flags & DI_SWITCH_IMPL))
761            {
762                stream->data_in = stream->data_in->di_if->di_switch_impl(
763                                            stream->data_in, stream->read_offset);
764                if (!stream->data_in)
765                {
766                    stream->data_in = data_in_error_new();
767                    return -1;
768                }
769            }
770            if (fin)
771            {
772                stream->stream_flags |= STREAM_FIN_REACHED;
773                break;
774            }
775        }
776        if (p == end)
777            NEXT_IOV();
778    }
779
780    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
781                                        total_nread, stream->read_offset);
782
783    if (processed_frames)
784    {
785        lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
786        if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
787        {
788            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
789                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
790            stream->stream_flags |= STREAM_SEND_WUF;
791            maybe_conn_to_tickable_if_writeable(stream);
792        }
793    }
794
795    if (processed_frames || read_unc_headers)
796    {
797        return total_nread;
798    }
799    else
800    {
801        assert(0 == total_nread);
802        errno = EWOULDBLOCK;
803        return -1;
804    }
805}
806
807
808ssize_t
809lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
810{
811    struct iovec iov = { .iov_base = buf, .iov_len = len, };
812    return lsquic_stream_readv(stream, &iov, 1);
813}
814
815
816static void
817stream_shutdown_read (lsquic_stream_t *stream)
818{
819    if (!(stream->stream_flags & STREAM_U_READ_DONE))
820    {
821        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
822        stream->stream_flags |= STREAM_U_READ_DONE;
823        stream_wantread(stream, 0);
824        maybe_finish_stream(stream);
825    }
826}
827
828
829static void
830stream_shutdown_write (lsquic_stream_t *stream)
831{
832    if (stream->stream_flags & STREAM_U_WRITE_DONE)
833        return;
834
835    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
836    stream->stream_flags |= STREAM_U_WRITE_DONE;
837    stream_wantwrite(stream, 0);
838
839    /* Don't bother to check whether there is anything else to write if
840     * the flags indicate that nothing else should be written.
841     */
842    if (!(stream->stream_flags &
843                    (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT)))
844    {
845        if (stream->sm_n_buffered == 0)
846        {
847            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
848                                                 stream))
849            {
850                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
851                stream->stream_flags |= STREAM_FIN_SENT;
852            }
853            else
854            {
855                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
856                          "flag in it");
857                (void) stream_flush_nocheck(stream);
858            }
859        }
860        else
861            (void) stream_flush_nocheck(stream);
862    }
863}
864
865
866int
867lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
868{
869    LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how);
870    if (lsquic_stream_is_closed(stream))
871    {
872        LSQ_INFO("Attempt to shut down a closed stream %u", stream->id);
873        errno = EBADF;
874        return -1;
875    }
876    /* 0: read, 1: write: 2: read and write
877     */
878    if (how < 0 || how > 2)
879    {
880        errno = EINVAL;
881        return -1;
882    }
883
884    if (how)
885        stream_shutdown_write(stream);
886    if (how != 1)
887        stream_shutdown_read(stream);
888
889    maybe_finish_stream(stream);
890    maybe_schedule_call_on_close(stream);
891    if (how)
892        maybe_conn_to_tickable_if_writeable(stream);
893
894    return 0;
895}
896
897
898void
899lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
900{
901    LSQ_DEBUG("internal shutdown of stream %u", stream->id);
902    if (LSQUIC_STREAM_HANDSHAKE == stream->id
903        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
904                                LSQUIC_STREAM_HEADERS == stream->id))
905    {
906        LSQ_DEBUG("add flag to force-finish special stream %u", stream->id);
907        stream->stream_flags |= STREAM_FORCE_FINISH;
908        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
909    }
910    maybe_finish_stream(stream);
911    maybe_schedule_call_on_close(stream);
912}
913
914
915static void
916fake_reset_unused_stream (lsquic_stream_t *stream)
917{
918    stream->stream_flags |=
919        STREAM_RST_RECVD    /* User will pick this up on read or write */
920      | STREAM_RST_SENT     /* Don't send anything else on this stream */
921    ;
922
923    /* Cancel all writes to the network scheduled for this stream: */
924    if (stream->stream_flags & STREAM_SENDING_FLAGS)
925        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
926                                                next_send_stream);
927    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
928
929    LSQ_DEBUG("fake-reset stream %u%s",
930                    stream->id, stream_stalled(stream) ? " (stalled)" : "");
931    maybe_finish_stream(stream);
932    maybe_schedule_call_on_close(stream);
933}
934
935
936/* This function should only be called for locally-initiated streams whose ID
937 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
938 * frame sent by peer but we have not yet received it and created a stream.
939 * In this situation, we mark the stream as reset, so that user's on_read or
940 * on_write event callback picks up the error.  That, in turn, should result
941 * in stream being closed.
942 *
943 * If we have received any data frames on this stream, this probably indicates
944 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
945 * lower than this.  However, we still try to handle it gracefully and peform
946 * a shutdown, as if the stream was not reset.
947 */
948void
949lsquic_stream_received_goaway (lsquic_stream_t *stream)
950{
951    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
952    if (0 == stream->read_offset &&
953                            stream->data_in->di_if->di_empty(stream->data_in))
954        fake_reset_unused_stream(stream);       /* Normal condition */
955    else
956    {   /* This is odd, let's handle it the best we can: */
957        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
958        lsquic_stream_shutdown_internal(stream);
959    }
960}
961
962
963uint64_t
964lsquic_stream_read_offset (const lsquic_stream_t *stream)
965{
966    return stream->read_offset;
967}
968
969
970static int
971stream_wantread (lsquic_stream_t *stream, int is_want)
972{
973    const int old_val = !!(stream->stream_flags & STREAM_WANT_READ);
974    const int new_val = !!is_want;
975    if (old_val != new_val)
976    {
977        if (new_val)
978        {
979            if (!old_val)
980                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
981                                                            next_read_stream);
982            stream->stream_flags |= STREAM_WANT_READ;
983        }
984        else
985        {
986            stream->stream_flags &= ~STREAM_WANT_READ;
987            if (old_val)
988                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
989                                                            next_read_stream);
990        }
991    }
992    return old_val;
993}
994
995
996static void
997maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag)
998{
999    assert(STREAM_WRITE_Q_FLAGS & flag);
1000    if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1001        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1002                                                        next_write_stream);
1003    stream->stream_flags |= flag;
1004}
1005
1006
1007static void
1008maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1009{
1010    assert(STREAM_WRITE_Q_FLAGS & flag);
1011    if (stream->stream_flags & flag)
1012    {
1013        stream->stream_flags &= ~flag;
1014        if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1015            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1016                                                        next_write_stream);
1017    }
1018}
1019
1020
1021static int
1022stream_wantwrite (lsquic_stream_t *stream, int is_want)
1023{
1024    const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE);
1025    const int new_val = !!is_want;
1026    if (old_val != new_val)
1027    {
1028        if (new_val)
1029            maybe_put_onto_write_q(stream, STREAM_WANT_WRITE);
1030        else
1031            maybe_remove_from_write_q(stream, STREAM_WANT_WRITE);
1032    }
1033    return old_val;
1034}
1035
1036
1037int
1038lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1039{
1040    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1041    {
1042        if (is_want)
1043            maybe_conn_to_tickable_if_readable(stream);
1044        return stream_wantread(stream, is_want);
1045    }
1046    else
1047    {
1048        errno = EBADF;
1049        return -1;
1050    }
1051}
1052
1053
1054int
1055lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1056{
1057    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE))
1058        return stream_wantwrite(stream, is_want);
1059    else
1060    {
1061        errno = EBADF;
1062        return -1;
1063    }
1064}
1065
1066
1067#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE|            \
1068    STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST)
1069
1070
1071static void
1072stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1073{
1074    unsigned no_progress_count, no_progress_limit;
1075    enum stream_flags flags;
1076    uint64_t size;
1077
1078    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1079
1080    no_progress_count = 0;
1081    while ((stream->stream_flags & STREAM_WANT_READ)
1082                                            && lsquic_stream_readable(stream))
1083    {
1084        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1085        size  = stream->read_offset;
1086
1087        stream->stream_if->on_read(stream, stream->st_ctx);
1088
1089        if (no_progress_limit && size == stream->read_offset &&
1090                        flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1091        {
1092            ++no_progress_count;
1093            if (no_progress_count >= no_progress_limit)
1094            {
1095                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1096                    "progress) in user code reading from stream",
1097                    no_progress_count,
1098                    no_progress_count == 1 ? "" : "s");
1099                break;
1100            }
1101        }
1102        else
1103            no_progress_count = 0;
1104    }
1105}
1106
1107
1108static void
1109stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1110{
1111    unsigned no_progress_count, no_progress_limit;
1112    enum stream_flags flags;
1113
1114    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1115
1116    no_progress_count = 0;
1117    stream->stream_flags |= STREAM_LAST_WRITE_OK;
1118    while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK))
1119                                == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)
1120           && lsquic_stream_write_avail(stream))
1121    {
1122        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1123
1124        stream->stream_if->on_write(stream, stream->st_ctx);
1125
1126        if (no_progress_limit &&
1127            flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1128        {
1129            ++no_progress_count;
1130            if (no_progress_count >= no_progress_limit)
1131            {
1132                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1133                    "progress) in user code writing to stream",
1134                    no_progress_count,
1135                    no_progress_count == 1 ? "" : "s");
1136                break;
1137            }
1138        }
1139        else
1140            no_progress_count = 0;
1141    }
1142}
1143
1144
1145static void
1146stream_dispatch_read_events_once (lsquic_stream_t *stream)
1147{
1148    if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream))
1149    {
1150        stream->stream_if->on_read(stream, stream->st_ctx);
1151    }
1152}
1153
1154
1155static void
1156maybe_mark_as_blocked (lsquic_stream_t *stream)
1157{
1158    struct lsquic_conn_cap *cc;
1159
1160    if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered)
1161    {
1162        if (stream->blocked_off < stream->max_send_off)
1163        {
1164            stream->blocked_off = stream->max_send_off + stream->sm_n_buffered;
1165            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1166                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1167                                                            next_send_stream);
1168            stream->stream_flags |= STREAM_SEND_BLOCKED;
1169            LSQ_DEBUG("marked stream-blocked at stream offset "
1170                                            "%"PRIu64, stream->blocked_off);
1171        }
1172        else
1173            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
1174                " has been, or is about to be, sent", stream->blocked_off);
1175    }
1176
1177    if ((stream->stream_flags & STREAM_CONN_LIMITED)
1178        && (cc = &stream->conn_pub->conn_cap,
1179                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
1180    {
1181        if (cc->cc_blocked < cc->cc_max)
1182        {
1183            cc->cc_blocked = cc->cc_max;
1184            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
1185            LSQ_DEBUG("marked connection-blocked at connection offset "
1186                                                    "%"PRIu64, cc->cc_max);
1187        }
1188        else
1189            LSQ_DEBUG("stream has already been marked connection-blocked "
1190                "at offset %"PRIu64, cc->cc_blocked);
1191    }
1192}
1193
1194
1195void
1196lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
1197{
1198    assert(stream->stream_flags & STREAM_WANT_READ);
1199
1200    if (stream->stream_flags & STREAM_RW_ONCE)
1201        stream_dispatch_read_events_once(stream);
1202    else
1203        stream_dispatch_read_events_loop(stream);
1204}
1205
1206
1207void
1208lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
1209{
1210    int progress;
1211    uint64_t tosend_off;
1212    unsigned short n_buffered;
1213    enum stream_flags flags;
1214
1215    assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS);
1216    flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS;
1217    tosend_off = stream->tosend_off;
1218    n_buffered = stream->sm_n_buffered;
1219
1220    if (stream->stream_flags & STREAM_WANT_FLUSH)
1221        (void) stream_flush(stream);
1222
1223    if (stream->stream_flags & STREAM_RW_ONCE)
1224    {
1225        if ((stream->stream_flags & STREAM_WANT_WRITE)
1226            && lsquic_stream_write_avail(stream))
1227        {
1228            stream->stream_if->on_write(stream, stream->st_ctx);
1229        }
1230    }
1231    else
1232        stream_dispatch_write_events_loop(stream);
1233
1234    /* Progress means either flags or offsets changed: */
1235    progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags &&
1236                        stream->tosend_off == tosend_off &&
1237                            stream->sm_n_buffered == n_buffered);
1238
1239    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
1240    {
1241        if (progress)
1242        {   /* Move the stream to the end of the list to ensure fairness. */
1243            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1244                                                            next_write_stream);
1245            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1246                                                            next_write_stream);
1247        }
1248    }
1249}
1250
1251
1252static size_t
1253inner_reader_empty_size (void *ctx)
1254{
1255    return 0;
1256}
1257
1258
1259static size_t
1260inner_reader_empty_read (void *ctx, void *buf, size_t count)
1261{
1262    return 0;
1263}
1264
1265
1266static int
1267stream_flush (lsquic_stream_t *stream)
1268{
1269    struct lsquic_reader empty_reader;
1270    ssize_t nw;
1271
1272    assert(stream->stream_flags & STREAM_WANT_FLUSH);
1273    assert(stream->sm_n_buffered > 0 ||
1274        /* Flushing is also used to packetize standalone FIN: */
1275        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
1276                                                    == STREAM_U_WRITE_DONE));
1277
1278    empty_reader.lsqr_size = inner_reader_empty_size;
1279    empty_reader.lsqr_read = inner_reader_empty_read;
1280    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
1281    nw = stream_write_to_packets(stream, &empty_reader, 0);
1282
1283    if (nw >= 0)
1284    {
1285        assert(nw == 0);    /* Empty reader: must have read zero bytes */
1286        return 0;
1287    }
1288    else
1289        return -1;
1290}
1291
1292
1293static int
1294stream_flush_nocheck (lsquic_stream_t *stream)
1295{
1296    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered;
1297    maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH);
1298    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
1299
1300    return stream_flush(stream);
1301}
1302
1303
1304int
1305lsquic_stream_flush (lsquic_stream_t *stream)
1306{
1307    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1308    {
1309        LSQ_DEBUG("cannot flush closed stream");
1310        errno = EBADF;
1311        return -1;
1312    }
1313
1314    if (0 == stream->sm_n_buffered)
1315    {
1316        LSQ_DEBUG("flushing 0 bytes: noop");
1317        return 0;
1318    }
1319
1320    return stream_flush_nocheck(stream);
1321}
1322
1323
1324/* The flush threshold is the maximum size of stream data that can be sent
1325 * in a full packet.
1326 */
1327static size_t
1328flush_threshold (const lsquic_stream_t *stream)
1329{
1330    enum packet_out_flags flags;
1331    enum lsquic_packno_bits bits;
1332    unsigned packet_header_sz, stream_header_sz;
1333    size_t threshold;
1334
1335    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
1336    flags = bits << POBIT_SHIFT;
1337    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
1338        flags |= PO_CONN_ID;
1339
1340    packet_header_sz = lsquic_po_header_length(flags);
1341    stream_header_sz = stream->conn_pub->lconn->cn_pf
1342            ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off);
1343
1344    threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ
1345              - packet_header_sz - stream_header_sz;
1346    return threshold;
1347}
1348
1349
1350#define COMMON_WRITE_CHECKS() do {                                          \
1351    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))   \
1352                                                   == STREAM_USE_HEADERS)   \
1353    {                                                                       \
1354        LSQ_WARN("Attempt to write to stream before sending HTTP headers"); \
1355        errno = EILSEQ;                                                     \
1356        return -1;                                                          \
1357    }                                                                       \
1358    if (stream->stream_flags & STREAM_RST_FLAGS)                            \
1359    {                                                                       \
1360        LSQ_INFO("Attempt to write to stream after it had been reset");     \
1361        errno = ECONNRESET;                                                 \
1362        return -1;                                                          \
1363    }                                                                       \
1364    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
1365    {                                                                       \
1366        LSQ_WARN("Attempt to write to stream after it was closed for "      \
1367                                                                "writing"); \
1368        errno = EBADF;                                                      \
1369        return -1;                                                          \
1370    }                                                                       \
1371} while (0)
1372
1373
1374struct frame_gen_ctx
1375{
1376    lsquic_stream_t      *fgc_stream;
1377    struct lsquic_reader *fgc_reader;
1378    /* We keep our own count of how many bytes were read from reader because
1379     * some readers are external.  The external caller does not have to rely
1380     * on our count, but it can.
1381     */
1382    size_t                fgc_nread_from_reader;
1383};
1384
1385
1386static size_t
1387frame_gen_size (void *ctx)
1388{
1389    struct frame_gen_ctx *fg_ctx = ctx;
1390    size_t available, remaining;
1391
1392    /* Make sure we are not writing past available size: */
1393    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1394    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
1395    if (available < remaining)
1396        remaining = available;
1397
1398    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
1399}
1400
1401
1402static int
1403frame_gen_fin (void *ctx)
1404{
1405    struct frame_gen_ctx *fg_ctx = ctx;
1406    return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE
1407        && 0 == fg_ctx->fgc_stream->sm_n_buffered
1408        /* Do not use frame_gen_size() as it may chop the real size: */
1409        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1410}
1411
1412
1413static void
1414incr_conn_cap (struct lsquic_stream *stream, size_t incr)
1415{
1416    if (stream->stream_flags & STREAM_CONN_LIMITED)
1417    {
1418        stream->conn_pub->conn_cap.cc_sent += incr;
1419        assert(stream->conn_pub->conn_cap.cc_sent
1420                                    <= stream->conn_pub->conn_cap.cc_max);
1421    }
1422}
1423
1424
1425static size_t
1426frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
1427{
1428    struct frame_gen_ctx *fg_ctx = ctx;
1429    unsigned char *p = begin_buf;
1430    unsigned char *const end = p + len;
1431    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
1432    size_t n_written, available, n_to_write;
1433
1434    if (stream->sm_n_buffered > 0)
1435    {
1436        if (len <= stream->sm_n_buffered)
1437        {
1438            memcpy(p, stream->sm_buf, len);
1439            memmove(stream->sm_buf, stream->sm_buf + len,
1440                                                stream->sm_n_buffered - len);
1441            stream->sm_n_buffered -= len;
1442            stream->tosend_off += len;
1443            *fin = frame_gen_fin(fg_ctx);
1444            return len;
1445        }
1446        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
1447        p += stream->sm_n_buffered;
1448        stream->sm_n_buffered = 0;
1449    }
1450
1451    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
1452    n_to_write = end - p;
1453    if (n_to_write > available)
1454        n_to_write = available;
1455    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
1456                                              n_to_write);
1457    p += n_written;
1458    fg_ctx->fgc_nread_from_reader += n_written;
1459    *fin = frame_gen_fin(fg_ctx);
1460    stream->tosend_off += p - (const unsigned char *) begin_buf;
1461    incr_conn_cap(stream, n_written);
1462    return p - (const unsigned char *) begin_buf;
1463}
1464
1465
1466static void
1467check_flush_threshold (lsquic_stream_t *stream)
1468{
1469    if ((stream->stream_flags & STREAM_WANT_FLUSH) &&
1470                            stream->tosend_off >= stream->sm_flush_to)
1471    {
1472        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
1473                                                    stream->sm_flush_to);
1474        maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH);
1475    }
1476}
1477
1478
1479static struct lsquic_packet_out *
1480get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least,
1481                      const struct lsquic_stream *stream)
1482{
1483    return lsquic_send_ctl_new_packet_out(ctl, need_at_least);
1484}
1485
1486
1487static struct lsquic_packet_out * (* const get_packet[])(
1488    struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) =
1489{
1490    lsquic_send_ctl_get_packet_for_stream,
1491    get_brand_new_packet,
1492};
1493
1494
1495static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR }
1496stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size)
1497{
1498    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
1499    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
1500    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
1501    unsigned stream_header_sz, need_at_least, off;
1502    lsquic_packet_out_t *packet_out;
1503    int len, s, hsk;
1504
1505    stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id,
1506                                                        stream->tosend_off);
1507    need_at_least = stream_header_sz + (size > 0);
1508    hsk = LSQUIC_STREAM_HANDSHAKE == stream->id;
1509    packet_out = get_packet[hsk](send_ctl, need_at_least, stream);
1510    if (!packet_out)
1511        return SWTP_STOP;
1512
1513    off = packet_out->po_data_sz;
1514    len = pf->pf_gen_stream_frame(
1515                packet_out->po_data + packet_out->po_data_sz,
1516                lsquic_packet_out_avail(packet_out), stream->id,
1517                stream->tosend_off,
1518                frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx);
1519    if (len < 0)
1520    {
1521        LSQ_ERROR("could not generate stream frame");
1522        return SWTP_ERROR;
1523    }
1524
1525    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
1526                            packet_out->po_data + packet_out->po_data_sz, len);
1527    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
1528    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
1529    if (0 == lsquic_packet_out_avail(packet_out))
1530        packet_out->po_flags |= PO_STREAM_END;
1531    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
1532                                     stream, QUIC_FRAME_STREAM, off, len);
1533    if (s != 0)
1534    {
1535        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
1536        return SWTP_ERROR;
1537    }
1538
1539    check_flush_threshold(stream);
1540
1541    /* XXX: I don't like it that this is here */
1542    if (hsk && !(packet_out->po_flags & PO_HELLO))
1543    {
1544        lsquic_packet_out_zero_pad(packet_out);
1545        packet_out->po_flags |= PO_HELLO;
1546        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
1547    }
1548
1549    return SWTP_OK;
1550}
1551
1552
1553static void
1554abort_connection (struct lsquic_stream *stream)
1555{
1556    if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
1557        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
1558                                                next_service_stream);
1559    stream->stream_flags |= STREAM_ABORT_CONN;
1560    LSQ_WARN("connection will be aborted");
1561}
1562
1563
1564static ssize_t
1565stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
1566                         size_t thresh)
1567{
1568    size_t size;
1569    ssize_t nw;
1570    struct frame_gen_ctx fg_ctx = {
1571        .fgc_stream = stream,
1572        .fgc_reader = reader,
1573        .fgc_nread_from_reader = 0,
1574    };
1575
1576    while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0)
1577           || frame_gen_fin(&fg_ctx))
1578    {
1579        switch (stream_write_to_packet(&fg_ctx, size))
1580        {
1581        case SWTP_OK:
1582            if (frame_gen_fin(&fg_ctx))
1583            {
1584                stream->stream_flags |= STREAM_FIN_SENT;
1585                goto end;
1586            }
1587            else
1588                break;
1589        case SWTP_STOP:
1590            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
1591            goto end;
1592        default:
1593            abort_connection(stream);
1594            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
1595            return -1;
1596        }
1597    }
1598
1599    if (thresh)
1600    {
1601        assert(size < thresh);
1602        assert(size >= stream->sm_n_buffered);
1603        size -= stream->sm_n_buffered;
1604        if (size > 0)
1605        {
1606            nw = save_to_buffer(stream, reader, size);
1607            if (nw < 0)
1608                return -1;
1609            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
1610        }
1611    }
1612    else
1613    {
1614        /* We count flushed data towards both stream and connection limits,
1615         * so we should have been able to packetize all of it:
1616         */
1617        assert(0 == stream->sm_n_buffered);
1618        assert(size == 0);
1619    }
1620
1621    maybe_mark_as_blocked(stream);
1622
1623  end:
1624    return fg_ctx.fgc_nread_from_reader;
1625}
1626
1627
1628/* Perform an implicit flush when we hit connection limit while buffering
1629 * data.  This is to prevent a (theoretical) stall:
1630 *
1631 * Imagine a number of streams, all of which buffered some data.  The buffered
1632 * data is up to connection cap, which means no further writes are possible.
1633 * None of them flushes, which means that data is not sent and connection
1634 * WINDOW_UPDATE frame never arrives from peer.  Stall.
1635 */
1636static void
1637maybe_flush_stream (struct lsquic_stream *stream)
1638{
1639    if (stream->sm_n_buffered > 0
1640          && (stream->stream_flags & STREAM_CONN_LIMITED)
1641            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
1642        stream_flush_nocheck(stream);
1643}
1644
1645
1646static ssize_t
1647save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
1648                                                                size_t len)
1649{
1650    size_t avail, n_written;
1651
1652    assert(stream->sm_n_buffered + len <= SM_BUF_SIZE);
1653
1654    if (!stream->sm_buf)
1655    {
1656        stream->sm_buf = malloc(SM_BUF_SIZE);
1657        if (!stream->sm_buf)
1658            return -1;
1659    }
1660
1661    avail = lsquic_stream_write_avail(stream);
1662    if (avail < len)
1663        len = avail;
1664
1665    n_written = reader->lsqr_read(reader->lsqr_ctx,
1666                        stream->sm_buf + stream->sm_n_buffered, len);
1667    stream->sm_n_buffered += n_written;
1668    incr_conn_cap(stream, n_written);
1669    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
1670              n_written, stream->sm_n_buffered);
1671    maybe_flush_stream(stream);
1672    return n_written;
1673}
1674
1675
1676static ssize_t
1677stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
1678{
1679    size_t thresh, len;
1680
1681    thresh = flush_threshold(stream);
1682    len = reader->lsqr_size(reader->lsqr_ctx);
1683    if (stream->sm_n_buffered + len <= SM_BUF_SIZE &&
1684                                    stream->sm_n_buffered + len < thresh)
1685        return save_to_buffer(stream, reader, len);
1686    else
1687        return stream_write_to_packets(stream, reader, thresh);
1688}
1689
1690
1691ssize_t
1692lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
1693{
1694    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
1695    return lsquic_stream_writev(stream, &iov, 1);
1696}
1697
1698
1699struct inner_reader_iovec {
1700    const struct iovec       *iov;
1701    const struct iovec *end;
1702    unsigned                  cur_iovec_off;
1703};
1704
1705
1706static size_t
1707inner_reader_iovec_read (void *ctx, void *buf, size_t count)
1708{
1709    struct inner_reader_iovec *const iro = ctx;
1710    unsigned char *p = buf;
1711    unsigned char *const end = p + count;
1712    unsigned n_tocopy;
1713
1714    while (iro->iov < iro->end && p < end)
1715    {
1716        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
1717        if (n_tocopy > (unsigned) (end - p))
1718            n_tocopy = end - p;
1719        memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off,
1720                                                                    n_tocopy);
1721        p += n_tocopy;
1722        iro->cur_iovec_off += n_tocopy;
1723        if (iro->iov->iov_len == iro->cur_iovec_off)
1724        {
1725            ++iro->iov;
1726            iro->cur_iovec_off = 0;
1727        }
1728    }
1729
1730    return p + count - end;
1731}
1732
1733
1734static size_t
1735inner_reader_iovec_size (void *ctx)
1736{
1737    struct inner_reader_iovec *const iro = ctx;
1738    const struct iovec *iov;
1739    size_t size;
1740
1741    size = 0;
1742    for (iov = iro->iov; iov < iro->end; ++iov)
1743        size += iov->iov_len;
1744
1745    return size - iro->cur_iovec_off;
1746}
1747
1748
1749ssize_t
1750lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
1751                                                                    int iovcnt)
1752{
1753    COMMON_WRITE_CHECKS();
1754    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1755
1756    struct inner_reader_iovec iro = {
1757        .iov = iov,
1758        .end = iov + iovcnt,
1759        .cur_iovec_off = 0,
1760    };
1761    struct lsquic_reader reader = {
1762        .lsqr_read = inner_reader_iovec_read,
1763        .lsqr_size = inner_reader_iovec_size,
1764        .lsqr_ctx  = &iro,
1765    };
1766
1767    return stream_write(stream, &reader);
1768}
1769
1770
1771ssize_t
1772lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
1773{
1774    COMMON_WRITE_CHECKS();
1775    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1776    return stream_write(stream, reader);
1777}
1778
1779
1780int
1781lsquic_stream_send_headers (lsquic_stream_t *stream,
1782                            const lsquic_http_headers_t *headers, int eos)
1783{
1784    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT|
1785                                                     STREAM_U_WRITE_DONE))
1786                == STREAM_USE_HEADERS)
1787    {
1788        int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs,
1789                    stream->id, headers, eos, lsquic_stream_priority(stream));
1790        if (0 == s)
1791        {
1792            SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
1793            stream->stream_flags |= STREAM_HEADERS_SENT;
1794            if (eos)
1795                stream->stream_flags |= STREAM_FIN_SENT;
1796            LSQ_INFO("sent headers for stream %u", stream->id);
1797        }
1798        else
1799            LSQ_WARN("could not send headers: %s", strerror(errno));
1800        return s;
1801    }
1802    else
1803    {
1804        LSQ_WARN("cannot send headers for stream %u in this state", stream->id);
1805        errno = EBADMSG;
1806        return -1;
1807    }
1808}
1809
1810
1811void
1812lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
1813{
1814    if (offset > stream->max_send_off)
1815    {
1816        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
1817        LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to "
1818            "0x%"PRIX64, stream->id, stream->max_send_off, offset);
1819        stream->max_send_off = offset;
1820    }
1821    else
1822        LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old "
1823            "max send offset 0x%"PRIX64", ignoring", stream->id, offset,
1824            stream->max_send_off);
1825}
1826
1827
1828/* This function is used to update offsets after handshake completes and we
1829 * learn of peer's limits from the handshake values.
1830 */
1831int
1832lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset)
1833{
1834    LSQ_DEBUG("setting max_send_off to %u", offset);
1835    if (offset > stream->max_send_off)
1836    {
1837        lsquic_stream_window_update(stream, offset);
1838        return 0;
1839    }
1840    else if (offset < stream->tosend_off)
1841    {
1842        LSQ_INFO("new offset (%u bytes) is smaller than the amount of data "
1843            "already sent on this stream (%"PRIu64" bytes)", offset,
1844            stream->tosend_off);
1845        return -1;
1846    }
1847    else
1848    {
1849        stream->max_send_off = offset;
1850        return 0;
1851    }
1852}
1853
1854
1855void
1856lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code)
1857{
1858    lsquic_stream_reset_ext(stream, error_code, 1);
1859}
1860
1861
1862void
1863lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code,
1864                         int do_close)
1865{
1866    if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT))
1867    {
1868        LSQ_INFO("reset already sent");
1869        return;
1870    }
1871
1872    SM_HISTORY_APPEND(stream, SHE_RESET);
1873
1874    LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code);
1875    stream->error_code = error_code;
1876
1877    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1878        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1879                                                        next_send_stream);
1880    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
1881    stream->stream_flags |= STREAM_SEND_RST;
1882
1883    drop_buffered_data(stream);
1884    maybe_elide_stream_frames(stream);
1885    maybe_schedule_call_on_close(stream);
1886
1887    if (do_close)
1888        lsquic_stream_close(stream);
1889    else
1890        maybe_conn_to_tickable_if_writeable(stream);
1891}
1892
1893
1894unsigned
1895lsquic_stream_id (const lsquic_stream_t *stream)
1896{
1897    return stream->id;
1898}
1899
1900
1901struct lsquic_conn *
1902lsquic_stream_conn (const lsquic_stream_t *stream)
1903{
1904    return stream->conn_pub->lconn;
1905}
1906
1907
1908int
1909lsquic_stream_close (lsquic_stream_t *stream)
1910{
1911    LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id);
1912    SM_HISTORY_APPEND(stream, SHE_CLOSE);
1913    if (lsquic_stream_is_closed(stream))
1914    {
1915        LSQ_INFO("Attempt to close an already-closed stream %u", stream->id);
1916        errno = EBADF;
1917        return -1;
1918    }
1919    stream_shutdown_write(stream);
1920    stream_shutdown_read(stream);
1921    maybe_schedule_call_on_close(stream);
1922    maybe_finish_stream(stream);
1923    maybe_conn_to_tickable_if_writeable(stream);
1924    return 0;
1925}
1926
1927
1928#ifndef NDEBUG
1929#if __GNUC__
1930__attribute__((weak))
1931#endif
1932#endif
1933void
1934lsquic_stream_acked (lsquic_stream_t *stream)
1935{
1936    assert(stream->n_unacked);
1937    --stream->n_unacked;
1938    LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked);
1939    if (0 == stream->n_unacked)
1940        maybe_finish_stream(stream);
1941}
1942
1943
1944void
1945lsquic_stream_push_req (lsquic_stream_t *stream,
1946                        struct uncompressed_headers *push_req)
1947{
1948    assert(!stream->push_req);
1949    stream->push_req = push_req;
1950    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
1951}
1952
1953
1954int
1955lsquic_stream_is_pushed (const lsquic_stream_t *stream)
1956{
1957    return 1 & ~stream->id;
1958}
1959
1960
1961int
1962lsquic_stream_push_info (const lsquic_stream_t *stream,
1963        uint32_t *ref_stream_id, const char **headers, size_t *headers_sz)
1964{
1965    if (lsquic_stream_is_pushed(stream))
1966    {
1967        assert(stream->push_req);
1968        *ref_stream_id = stream->push_req->uh_stream_id;
1969        *headers       = stream->push_req->uh_headers;
1970        *headers_sz    = stream->push_req->uh_size;
1971        return 0;
1972    }
1973    else
1974        return -1;
1975}
1976
1977
1978int
1979lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
1980{
1981    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS)
1982    {
1983        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
1984        LSQ_DEBUG("received uncompressed headers for stream %u", stream->id);
1985        stream->stream_flags |= STREAM_HAVE_UH;
1986        if (uh->uh_flags & UH_FIN)
1987            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
1988        stream->uh = uh;
1989        if (uh->uh_oth_stream_id == 0)
1990        {
1991            if (uh->uh_weight)
1992                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
1993        }
1994        else
1995            LSQ_NOTICE("don't know how to depend on stream %u",
1996                                                        uh->uh_oth_stream_id);
1997        return 0;
1998    }
1999    else
2000    {
2001        LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id);
2002        return -1;
2003    }
2004}
2005
2006
2007unsigned
2008lsquic_stream_priority (const lsquic_stream_t *stream)
2009{
2010    return 256 - stream->sm_priority;
2011}
2012
2013
2014int
2015lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
2016{
2017    /* The user should never get a reference to the special streams,
2018     * but let's check just in case:
2019     */
2020    if (LSQUIC_STREAM_HANDSHAKE == stream->id
2021        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
2022                                LSQUIC_STREAM_HEADERS == stream->id))
2023        return -1;
2024    if (priority < 1 || priority > 256)
2025        return -1;
2026    stream->sm_priority = 256 - priority;
2027    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
2028    LSQ_DEBUG("set priority to %u", priority);
2029    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
2030    return 0;
2031}
2032
2033
2034int
2035lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
2036{
2037    if (0 == lsquic_stream_set_priority_internal(stream, priority))
2038    {
2039        if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) ==
2040                                       (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))
2041        {
2042            /* We need to send headers only if we are a) using HEADERS stream
2043             * and b) we already sent initial headers.  If initial headers
2044             * have not been sent yet, stream priority will be sent in the
2045             * HEADERS frame.
2046             */
2047            return lsquic_headers_stream_send_priority(stream->conn_pub->hs,
2048                                                    stream->id, 0, 0, priority);
2049        }
2050        else
2051            return 0;
2052    }
2053    else
2054        return -1;
2055}
2056
2057
2058lsquic_stream_ctx_t *
2059lsquic_stream_get_ctx (const lsquic_stream_t *stream)
2060{
2061    return stream->st_ctx;
2062}
2063
2064
2065int
2066lsquic_stream_refuse_push (lsquic_stream_t *stream)
2067{
2068    if (lsquic_stream_is_pushed(stream) &&
2069                !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST)))
2070    {
2071        LSQ_DEBUG("refusing pushed stream: send reset");
2072        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
2073        return 0;
2074    }
2075    else
2076        return -1;
2077}
2078
2079
2080size_t
2081lsquic_stream_mem_used (const struct lsquic_stream *stream)
2082{
2083    size_t size;
2084
2085    size = sizeof(stream);
2086    if (stream->sm_buf)
2087        size += SM_BUF_SIZE;
2088    if (stream->data_in)
2089        size += stream->data_in->di_if->di_mem_used(stream->data_in);
2090
2091    return size;
2092}
2093
2094
2095lsquic_cid_t
2096lsquic_stream_cid (const struct lsquic_stream *stream)
2097{
2098    return LSQUIC_LOG_CONN_ID;
2099}
2100