lsquic_stream.c revision 82f3bcef
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "lsquic.h"
30
31#include "lsquic_int_types.h"
32#include "lsquic_packet_common.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_malo.h"
35#include "lsquic_conn_flow.h"
36#include "lsquic_rtt.h"
37#include "lsquic_sfcw.h"
38#include "lsquic_stream.h"
39#include "lsquic_conn_public.h"
40#include "lsquic_util.h"
41#include "lsquic_mm.h"
42#include "lsquic_headers_stream.h"
43#include "lsquic_frame_reader.h"
44#include "lsquic_conn.h"
45#include "lsquic_data_in_if.h"
46#include "lsquic_parse.h"
47#include "lsquic_packet_out.h"
48#include "lsquic_engine_public.h"
49#include "lsquic_senhist.h"
50#include "lsquic_pacer.h"
51#include "lsquic_cubic.h"
52#include "lsquic_send_ctl.h"
53#include "lsquic_ev_log.h"
54
55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid
57#define LSQUIC_LOG_STREAM_ID stream->id
58#include "lsquic_logger.h"
59
60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ
61
62static void
63drop_frames_in (lsquic_stream_t *stream);
64
65static void
66maybe_schedule_call_on_close (lsquic_stream_t *stream);
67
68static int
69stream_wantread (lsquic_stream_t *stream, int is_want);
70
71static int
72stream_wantwrite (lsquic_stream_t *stream, int is_want);
73
74static ssize_t
75stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);
76
77static ssize_t
78save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);
79
80static int
81stream_flush (lsquic_stream_t *stream);
82
83static int
84stream_flush_nocheck (lsquic_stream_t *stream);
85
86static void
87maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag);
88
89
90#if LSQUIC_KEEP_STREAM_HISTORY
/* Events recorded in the per-stream history buffer.
 *
 * Apart from SHE_EMPTY, each value is a printable ASCII character so that
 * the whole history can be printed in a single line of a log message.
 *
 * The list of events is not exhaustive: only the most interesting events
 * are recorded.
 */
enum stream_history_event
{
    /* Special entries: */
    SHE_EMPTY = '\0',   /* No init besides memset required */
    SHE_PLUS = '+',     /* Previous event occurred more than once */

    /* Regular events, ordered by their character value ignoring case: */
    SHE_REACH_FIN = 'a',
    SHE_BLOCKED_OUT = 'b',
    SHE_CREATED = 'C',
    SHE_FRAME_IN = 'd',
    SHE_FRAME_OUT = 'D',
    SHE_RESET = 'e',
    SHE_WINDOW_UPDATE = 'E',
    SHE_FIN_IN = 'f',
    SHE_FINISHED = 'F',
    SHE_GOAWAY_IN = 'g',
    SHE_USER_WRITE_HEADER = 'h',
    SHE_HEADERS_IN = 'H',
    SHE_ONCLOSE_SCHED = 'l',
    SHE_ONCLOSE_CALL = 'L',
    SHE_ONNEW = 'N',
    SHE_SET_PRIO = 'p',
    SHE_USER_READ = 'r',
    SHE_SHUTDOWN_READ = 'R',
    SHE_RST_IN = 's',
    SHE_RST_OUT = 't',
    SHE_FLUSH = 'u',
    SHE_USER_WRITE_DATA = 'w',
    SHE_SHUTDOWN_WRITE = 'W',
    SHE_CLOSE = 'X',
    SHE_FORCE_FINISH = 'Z',
};
127
128static void
129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
130{
131    enum stream_history_event prev_event;
132    sm_hist_idx_t idx;
133    int plus;
134
135    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
136    plus = SHE_PLUS == stream->sm_hist_buf[idx];
137    idx = (idx - plus) & SM_HIST_IDX_MASK;
138    prev_event = stream->sm_hist_buf[idx];
139
140    if (prev_event == sh_event && plus)
141        return;
142
143    if (prev_event == sh_event)
144        sh_event = SHE_PLUS;
145    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
146
147    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
148        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
149                                                        stream->sm_hist_buf);
150}
151
152#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
153#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
154        if (stream->sm_hist_idx & SM_HIST_IDX_MASK)                         \
155            LSQ_DEBUG("history: [%.*s]",                                    \
156                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
157                (stream)->sm_hist_buf);                                     \
158    } while (0)
159#else
160#   define SM_HISTORY_APPEND(stream, event)
161#   define SM_HISTORY_DUMP_REMAINING(stream)
162#endif
163
164
165static int
166stream_inside_callback (const lsquic_stream_t *stream)
167{
168    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
169}
170
171
172static void
173maybe_conn_to_tickable (lsquic_stream_t *stream)
174{
175    if (!stream_inside_callback(stream))
176        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
177                                           stream->conn_pub->lconn);
178}
179
180
181/* Here, "readable" means that the user is able to read from the stream. */
182static void
183maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
184{
185    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
186    {
187        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
188                                           stream->conn_pub->lconn);
189    }
190}
191
192
193/* Here, "writeable" means that data can be put into packets to be
194 * scheduled to be sent out.
195 *
196 * If `check_can_send' is false, it means that we do not need to check
197 * whether packets can be sent.  This check was already performed when
198 * we packetized stream data.
199 */
200static void
201maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
202                                                    int check_can_send)
203{
204    if (!stream_inside_callback(stream) &&
205            (!check_can_send
206             || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
207          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
208    {
209        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
210                                           stream->conn_pub->lconn);
211    }
212}
213
214
215static int
216stream_stalled (const lsquic_stream_t *stream)
217{
218    return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) &&
219           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
220                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
221}
222
223
224/* TODO: The logic to figure out whether the stream is connection limited
225 * should be taken out of the constructor.  The caller should specify this
226 * via one of enum stream_ctor_flags.
227 */
228lsquic_stream_t *
229lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub,
230                       const struct lsquic_stream_if *stream_if,
231                       void *stream_if_ctx, unsigned initial_window,
232                       unsigned initial_send_off,
233                       enum stream_ctor_flags ctor_flags)
234{
235    lsquic_cfcw_t *cfcw;
236    lsquic_stream_t *stream;
237
238    stream = calloc(1, sizeof(*stream));
239    if (!stream)
240        return NULL;
241
242    stream->stream_if = stream_if;
243    stream->id        = id;
244    stream->conn_pub  = conn_pub;
245    stream->sm_onnew_arg = stream_if_ctx;
246    if (!initial_window)
247        initial_window = 16 * 1024;
248    if (LSQUIC_STREAM_HANDSHAKE == id ||
249        (conn_pub->hs && LSQUIC_STREAM_HEADERS == id))
250        cfcw = NULL;
251    else
252    {
253        cfcw = &conn_pub->cfcw;
254        stream->stream_flags |= STREAM_CONN_LIMITED;
255        if (conn_pub->hs)
256            stream->stream_flags |= STREAM_USE_HEADERS;
257        lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO);
258    }
259    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
260    if (!initial_send_off)
261        initial_send_off = 16 * 1024;
262    stream->max_send_off = initial_send_off;
263    if (ctor_flags & SCF_USE_DI_HASH)
264        stream->data_in = data_in_hash_new(conn_pub, id, 0);
265    else
266        stream->data_in = data_in_nocopy_new(conn_pub, id);
267    LSQ_DEBUG("created stream %u @%p", id, stream);
268    SM_HISTORY_APPEND(stream, SHE_CREATED);
269    if (ctor_flags & SCF_DI_AUTOSWITCH)
270        stream->stream_flags |= STREAM_AUTOSWITCH;
271    if (ctor_flags & SCF_CALL_ON_NEW)
272        lsquic_stream_call_on_new(stream);
273    if (ctor_flags & SCF_DISP_RW_ONCE)
274        stream->stream_flags |= STREAM_RW_ONCE;
275    return stream;
276}
277
278
279void
280lsquic_stream_call_on_new (lsquic_stream_t *stream)
281{
282    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
283    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
284    {
285        LSQ_DEBUG("calling on_new_stream");
286        SM_HISTORY_APPEND(stream, SHE_ONNEW);
287        stream->stream_flags |= STREAM_ONNEW_DONE;
288        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
289                                                          stream);
290    }
291}
292
293
294static void
295decr_conn_cap (struct lsquic_stream *stream, size_t incr)
296{
297    if (stream->stream_flags & STREAM_CONN_LIMITED)
298    {
299        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
300        stream->conn_pub->conn_cap.cc_sent -= incr;
301    }
302}
303
304
305static void
306drop_buffered_data (struct lsquic_stream *stream)
307{
308    decr_conn_cap(stream, stream->sm_n_buffered);
309    stream->sm_n_buffered = 0;
310    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
311        maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS);
312}
313
314
315void
316lsquic_stream_destroy (lsquic_stream_t *stream)
317{
318    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
319    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
320                                                            STREAM_ONNEW_DONE)
321    {
322        stream->stream_flags |= STREAM_ONCLOSE_DONE;
323        stream->stream_if->on_close(stream, stream->st_ctx);
324    }
325    if (stream->stream_flags & STREAM_SENDING_FLAGS)
326        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
327    if (stream->stream_flags & STREAM_WANT_READ)
328        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
329    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
330        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
331    if (stream->stream_flags & STREAM_SERVICE_FLAGS)
332        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
333    drop_buffered_data(stream);
334    lsquic_sfcw_consume_rem(&stream->fc);
335    drop_frames_in(stream);
336    free(stream->push_req);
337    free(stream->uh);
338    free(stream->sm_buf);
339    LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream);
340    SM_HISTORY_DUMP_REMAINING(stream);
341    free(stream);
342}
343
344
345static int
346stream_is_finished (const lsquic_stream_t *stream)
347{
348    return lsquic_stream_is_closed(stream)
349           /* n_unacked checks that no outgoing packets that reference this
350            * stream are outstanding:
351            */
352        && 0 == stream->n_unacked
353           /* This checks that no packets that reference this stream will
354            * become outstanding:
355            */
356        && 0 == (stream->stream_flags & STREAM_SEND_RST)
357        && ((stream->stream_flags & STREAM_FORCE_FINISH)
358          || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))
359           && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD))));
360}
361
362
363static void
364maybe_finish_stream (lsquic_stream_t *stream)
365{
366    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
367                                                    stream_is_finished(stream))
368    {
369        LSQ_DEBUG("stream %u is now finished", stream->id);
370        SM_HISTORY_APPEND(stream, SHE_FINISHED);
371        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
372            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
373                                                    next_service_stream);
374        stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED;
375    }
376}
377
378
379static void
380maybe_schedule_call_on_close (lsquic_stream_t *stream)
381{
382    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
383                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE))
384            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE))
385    {
386        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
387            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
388                                                    next_service_stream);
389        stream->stream_flags |= STREAM_CALL_ONCLOSE;
390        LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id);
391        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
392    }
393}
394
395
396void
397lsquic_stream_call_on_close (lsquic_stream_t *stream)
398{
399    assert(stream->stream_flags & STREAM_ONNEW_DONE);
400    stream->stream_flags &= ~STREAM_CALL_ONCLOSE;
401    if (!(stream->stream_flags & STREAM_SERVICE_FLAGS))
402        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
403                                                    next_service_stream);
404    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
405    {
406        LSQ_DEBUG("calling on_close for stream %u", stream->id);
407        stream->stream_flags |= STREAM_ONCLOSE_DONE;
408        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
409        stream->stream_if->on_close(stream, stream->st_ctx);
410    }
411    else
412        assert(0);
413}
414
415
416int
417lsquic_stream_readable (const lsquic_stream_t *stream)
418{
419    /* A stream is readable if one of the following is true: */
420    return
421        /* - It is already finished: in that case, lsquic_stream_read() will
422         *   return 0.
423         */
424            (stream->stream_flags & STREAM_FIN_REACHED)
425        /* - The stream is reset, by either side.  In this case,
426         *   lsquic_stream_read() will return -1 (we want the user to be
427         *   able to collect the error).
428         */
429        ||  (stream->stream_flags & STREAM_RST_FLAGS)
430        /* - Either we are not in HTTP mode or the HTTP headers have been
431         *   received and the headers or data from the stream can be read.
432         */
433        ||  (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
434                                                        == STREAM_USE_HEADERS)
435            && (stream->uh != NULL
436                ||  stream->data_in->di_if->di_get_frame(stream->data_in,
437                                                        stream->read_offset)))
438    ;
439}
440
441
442size_t
443lsquic_stream_write_avail (const struct lsquic_stream *stream)
444{
445    uint64_t stream_avail, conn_avail;
446
447    stream_avail = stream->max_send_off - stream->tosend_off
448                                                - stream->sm_n_buffered;
449    if (stream->stream_flags & STREAM_CONN_LIMITED)
450    {
451        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
452        if (conn_avail < stream_avail)
453            return conn_avail;
454    }
455
456    return stream_avail;
457}
458
459
460int
461lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
462{
463    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
464                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
465    {
466        return -1;
467    }
468    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
469    {
470        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
471            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
472                                                    next_send_stream);
473        stream->stream_flags |= STREAM_SEND_WUF;
474    }
475    return 0;
476}
477
478
479int
480lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
481{
482    uint64_t max_off;
483    int got_next_offset;
484    enum ins_frame ins_frame;
485
486    assert(frame->packet_in);
487
488    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
489    LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; "
490        "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
491
492    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) ==
493                                (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN))
494    {
495        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
496        lsquic_malo_put(frame);
497        return -1;
498    }
499
500    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
501    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
502    if (INS_FRAME_OK == ins_frame)
503    {
504        /* Update maximum offset in the flow controller and check for flow
505         * control violation:
506         */
507        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
508        if (0 != lsquic_stream_update_sfcw(stream, max_off))
509            return -1;
510        if (frame->data_frame.df_fin)
511        {
512            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
513            stream->stream_flags |= STREAM_FIN_RECVD;
514            maybe_finish_stream(stream);
515        }
516        if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
517                (stream->data_in->di_flags & DI_SWITCH_IMPL))
518        {
519            stream->data_in = stream->data_in->di_if->di_switch_impl(
520                                        stream->data_in, stream->read_offset);
521            if (!stream->data_in)
522            {
523                stream->data_in = data_in_error_new();
524                return -1;
525            }
526        }
527        if (got_next_offset)
528            /* Checking the offset saves di_get_frame() call */
529            maybe_conn_to_tickable_if_readable(stream);
530        return 0;
531    }
532    else if (INS_FRAME_DUP == ins_frame)
533    {
534        return 0;
535    }
536    else
537    {
538        assert(INS_FRAME_ERR == ins_frame);
539        return -1;
540    }
541}
542
543
544static void
545drop_frames_in (lsquic_stream_t *stream)
546{
547    if (stream->data_in)
548    {
549        stream->data_in->di_if->di_destroy(stream->data_in);
550        /* To avoid checking whether `data_in` is set, just set to the error
551         * data-in stream.  It does the right thing after incoming data is
552         * dropped.
553         */
554        stream->data_in = data_in_error_new();
555    }
556}
557
558
559static void
560maybe_elide_stream_frames (struct lsquic_stream *stream)
561{
562    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
563    {
564        if (stream->n_unacked)
565            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
566                                                stream->id);
567        stream->stream_flags |= STREAM_FRAMES_ELIDED;
568    }
569}
570
571
572int
573lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
574                      uint32_t error_code)
575{
576
577    if (stream->stream_flags & STREAM_RST_RECVD)
578    {
579        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
580        return 0;
581    }
582
583    SM_HISTORY_APPEND(stream, SHE_RST_IN);
584    /* This flag must always be set, even if we are "ignoring" it: it is
585     * used by elision code.
586     */
587    stream->stream_flags |= STREAM_RST_RECVD;
588
589    if ((stream->stream_flags & STREAM_FIN_RECVD) &&
590                    /* Pushed streams have fake STREAM_FIN_RECVD set, thus
591                     * we need a special check:
592                     */
593                                            !lsquic_stream_is_pushed(stream))
594    {
595        LSQ_DEBUG("ignore RST_STREAM frame after FIN is received");
596        return 0;
597    }
598
599    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
600    {
601        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is "
602            "smaller than that of byte following the last byte we have seen: "
603            "0x%"PRIX64, stream->id, offset,
604            lsquic_sfcw_get_max_recv_off(&stream->fc));
605        return -1;
606    }
607
608    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
609    {
610        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64
611            " violates flow control", stream->id, offset);
612        return -1;
613    }
614
615    /* Let user collect error: */
616    maybe_conn_to_tickable_if_readable(stream);
617
618    lsquic_sfcw_consume_rem(&stream->fc);
619    drop_frames_in(stream);
620    drop_buffered_data(stream);
621    maybe_elide_stream_frames(stream);
622
623    if (!(stream->stream_flags &
624                        (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT)))
625        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
626
627    stream->stream_flags |= STREAM_RST_RECVD;
628
629    maybe_finish_stream(stream);
630    maybe_schedule_call_on_close(stream);
631
632    return 0;
633}
634
635
636uint64_t
637lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
638{
639    assert(stream->stream_flags & STREAM_SEND_WUF);
640    stream->stream_flags &= ~STREAM_SEND_WUF;
641    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
642        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
643    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
644}
645
646
647void
648lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
649{
650    assert(stream->stream_flags & STREAM_SEND_BLOCKED);
651    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
652    stream->stream_flags &= ~STREAM_SEND_BLOCKED;
653    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
654        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
655}
656
657
658void
659lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
660{
661    assert(stream->stream_flags & STREAM_SEND_RST);
662    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
663    stream->stream_flags &= ~STREAM_SEND_RST;
664    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
665        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
666    stream->stream_flags |= STREAM_RST_SENT;
667    maybe_finish_stream(stream);
668}
669
670
671static size_t
672read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len)
673{
674    struct uncompressed_headers *uh = stream->uh;
675    size_t n_avail = uh->uh_size - uh->uh_off;
676    if (n_avail < len)
677        len = n_avail;
678    memcpy(dst, uh->uh_headers + uh->uh_off, len);
679    uh->uh_off += len;
680    if (uh->uh_off == uh->uh_size)
681    {
682        LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id);
683        free(uh);
684        stream->uh = NULL;
685        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
686        {
687            stream->stream_flags |= STREAM_FIN_REACHED;
688            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
689        }
690    }
691    return len;
692}
693
694
695/* This function returns 0 when EOF is reached.
696 */
697ssize_t
698lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov,
699                     int iovcnt)
700{
701    size_t total_nread, nread;
702    int processed_frames, read_unc_headers, iovidx;
703    unsigned char *p, *end;
704
705    SM_HISTORY_APPEND(stream, SHE_USER_READ);
706
707#define NEXT_IOV() do {                                             \
708    ++iovidx;                                                       \
709    while (iovidx < iovcnt && 0 == iov[iovidx].iov_len)             \
710        ++iovidx;                                                   \
711    if (iovidx < iovcnt)                                            \
712    {                                                               \
713        p = iov[iovidx].iov_base;                                   \
714        end = p + iov[iovidx].iov_len;                              \
715    }                                                               \
716    else                                                            \
717        p = end = NULL;                                             \
718} while (0)
719
720#define AVAIL() (end - p)
721
722    if (stream->stream_flags & STREAM_RST_FLAGS)
723    {
724        errno = ECONNRESET;
725        return -1;
726    }
727    if (stream->stream_flags & STREAM_U_READ_DONE)
728    {
729        errno = EBADF;
730        return -1;
731    }
732    if (stream->stream_flags & STREAM_FIN_REACHED)
733        return 0;
734
735    total_nread = 0;
736    processed_frames = 0;
737
738    iovidx = -1;
739    NEXT_IOV();
740
741    if (stream->uh && AVAIL())
742    {
743        read_unc_headers = 1;
744        do
745        {
746            nread = read_uh(stream, p, AVAIL());
747            p += nread;
748            total_nread += nread;
749            if (p == end)
750                NEXT_IOV();
751        }
752        while (stream->uh && AVAIL());
753    }
754    else
755        read_unc_headers = 0;
756
757    struct data_frame *data_frame;
758    while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset)))
759    {
760        ++processed_frames;
761        size_t navail = data_frame->df_size - data_frame->df_read_off;
762        size_t ntowrite = AVAIL();
763        if (navail < ntowrite)
764            ntowrite = navail;
765        memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite);
766        p += ntowrite;
767        data_frame->df_read_off += ntowrite;
768        stream->read_offset += ntowrite;
769        total_nread += ntowrite;
770        if (data_frame->df_read_off == data_frame->df_size)
771        {
772            const int fin = data_frame->df_fin;
773            stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
774            if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
775                    (stream->data_in->di_flags & DI_SWITCH_IMPL))
776            {
777                stream->data_in = stream->data_in->di_if->di_switch_impl(
778                                            stream->data_in, stream->read_offset);
779                if (!stream->data_in)
780                {
781                    stream->data_in = data_in_error_new();
782                    return -1;
783                }
784            }
785            if (fin)
786            {
787                stream->stream_flags |= STREAM_FIN_REACHED;
788                break;
789            }
790        }
791        if (p == end)
792            NEXT_IOV();
793    }
794
795    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
796                                        total_nread, stream->read_offset);
797
798    if (processed_frames)
799    {
800        lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
801        if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
802        {
803            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
804                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
805            stream->stream_flags |= STREAM_SEND_WUF;
806            maybe_conn_to_tickable_if_writeable(stream, 1);
807        }
808    }
809
810    if (processed_frames || read_unc_headers)
811    {
812        return total_nread;
813    }
814    else
815    {
816        assert(0 == total_nread);
817        errno = EWOULDBLOCK;
818        return -1;
819    }
820}
821
822
823ssize_t
824lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
825{
826    struct iovec iov = { .iov_base = buf, .iov_len = len, };
827    return lsquic_stream_readv(stream, &iov, 1);
828}
829
830
831static void
832stream_shutdown_read (lsquic_stream_t *stream)
833{
834    if (!(stream->stream_flags & STREAM_U_READ_DONE))
835    {
836        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
837        stream->stream_flags |= STREAM_U_READ_DONE;
838        stream_wantread(stream, 0);
839        maybe_finish_stream(stream);
840    }
841}
842
843
844static void
845stream_shutdown_write (lsquic_stream_t *stream)
846{
847    if (stream->stream_flags & STREAM_U_WRITE_DONE)
848        return;
849
850    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
851    stream->stream_flags |= STREAM_U_WRITE_DONE;
852    stream_wantwrite(stream, 0);
853
854    /* Don't bother to check whether there is anything else to write if
855     * the flags indicate that nothing else should be written.
856     */
857    if (!(stream->stream_flags &
858                    (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT)))
859    {
860        if (stream->sm_n_buffered == 0)
861        {
862            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
863                                                 stream))
864            {
865                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
866                stream->stream_flags |= STREAM_FIN_SENT;
867            }
868            else
869            {
870                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
871                          "flag in it");
872                (void) stream_flush_nocheck(stream);
873            }
874        }
875        else
876            (void) stream_flush_nocheck(stream);
877    }
878}
879
880
881int
882lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
883{
884    LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how);
885    if (lsquic_stream_is_closed(stream))
886    {
887        LSQ_INFO("Attempt to shut down a closed stream %u", stream->id);
888        errno = EBADF;
889        return -1;
890    }
891    /* 0: read, 1: write: 2: read and write
892     */
893    if (how < 0 || how > 2)
894    {
895        errno = EINVAL;
896        return -1;
897    }
898
899    if (how)
900        stream_shutdown_write(stream);
901    if (how != 1)
902        stream_shutdown_read(stream);
903
904    maybe_finish_stream(stream);
905    maybe_schedule_call_on_close(stream);
906    if (how)
907        maybe_conn_to_tickable_if_writeable(stream, 1);
908
909    return 0;
910}
911
912
913void
914lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
915{
916    LSQ_DEBUG("internal shutdown of stream %u", stream->id);
917    if (LSQUIC_STREAM_HANDSHAKE == stream->id
918        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
919                                LSQUIC_STREAM_HEADERS == stream->id))
920    {
921        LSQ_DEBUG("add flag to force-finish special stream %u", stream->id);
922        stream->stream_flags |= STREAM_FORCE_FINISH;
923        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
924    }
925    maybe_finish_stream(stream);
926    maybe_schedule_call_on_close(stream);
927}
928
929
930static void
931fake_reset_unused_stream (lsquic_stream_t *stream)
932{
933    stream->stream_flags |=
934        STREAM_RST_RECVD    /* User will pick this up on read or write */
935      | STREAM_RST_SENT     /* Don't send anything else on this stream */
936    ;
937
938    /* Cancel all writes to the network scheduled for this stream: */
939    if (stream->stream_flags & STREAM_SENDING_FLAGS)
940        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
941                                                next_send_stream);
942    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
943
944    LSQ_DEBUG("fake-reset stream %u%s",
945                    stream->id, stream_stalled(stream) ? " (stalled)" : "");
946    maybe_finish_stream(stream);
947    maybe_schedule_call_on_close(stream);
948}
949
950
951/* This function should only be called for locally-initiated streams whose ID
952 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
953 * frame sent by peer but we have not yet received it and created a stream.
954 * In this situation, we mark the stream as reset, so that user's on_read or
955 * on_write event callback picks up the error.  That, in turn, should result
956 * in stream being closed.
957 *
958 * If we have received any data frames on this stream, this probably indicates
959 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
960 * lower than this.  However, we still try to handle it gracefully and peform
961 * a shutdown, as if the stream was not reset.
962 */
963void
964lsquic_stream_received_goaway (lsquic_stream_t *stream)
965{
966    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
967    if (0 == stream->read_offset &&
968                            stream->data_in->di_if->di_empty(stream->data_in))
969        fake_reset_unused_stream(stream);       /* Normal condition */
970    else
971    {   /* This is odd, let's handle it the best we can: */
972        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
973        lsquic_stream_shutdown_internal(stream);
974    }
975}
976
977
978uint64_t
979lsquic_stream_read_offset (const lsquic_stream_t *stream)
980{
981    return stream->read_offset;
982}
983
984
985static int
986stream_wantread (lsquic_stream_t *stream, int is_want)
987{
988    const int old_val = !!(stream->stream_flags & STREAM_WANT_READ);
989    const int new_val = !!is_want;
990    if (old_val != new_val)
991    {
992        if (new_val)
993        {
994            if (!old_val)
995                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
996                                                            next_read_stream);
997            stream->stream_flags |= STREAM_WANT_READ;
998        }
999        else
1000        {
1001            stream->stream_flags &= ~STREAM_WANT_READ;
1002            if (old_val)
1003                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
1004                                                            next_read_stream);
1005        }
1006    }
1007    return old_val;
1008}
1009
1010
1011static void
1012maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1013{
1014    assert(STREAM_WRITE_Q_FLAGS & flag);
1015    if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1016        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1017                                                        next_write_stream);
1018    stream->stream_flags |= flag;
1019}
1020
1021
1022static void
1023maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1024{
1025    assert(STREAM_WRITE_Q_FLAGS & flag);
1026    if (stream->stream_flags & flag)
1027    {
1028        stream->stream_flags &= ~flag;
1029        if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1030            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1031                                                        next_write_stream);
1032    }
1033}
1034
1035
1036static int
1037stream_wantwrite (lsquic_stream_t *stream, int is_want)
1038{
1039    const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE);
1040    const int new_val = !!is_want;
1041    if (old_val != new_val)
1042    {
1043        if (new_val)
1044            maybe_put_onto_write_q(stream, STREAM_WANT_WRITE);
1045        else
1046            maybe_remove_from_write_q(stream, STREAM_WANT_WRITE);
1047    }
1048    return old_val;
1049}
1050
1051
1052int
1053lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1054{
1055    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1056    {
1057        if (is_want)
1058            maybe_conn_to_tickable_if_readable(stream);
1059        return stream_wantread(stream, is_want);
1060    }
1061    else
1062    {
1063        errno = EBADF;
1064        return -1;
1065    }
1066}
1067
1068
1069int
1070lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1071{
1072    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE))
1073    {
1074        if (is_want)
1075            maybe_conn_to_tickable_if_writeable(stream, 1);
1076        return stream_wantwrite(stream, is_want);
1077    }
1078    else
1079    {
1080        errno = EBADF;
1081        return -1;
1082    }
1083}
1084
1085
/* Stream flags that are compared before and after a user callback by the
 * event dispatch loops: a change in any of them counts as progress.
 */
#define USER_PROGRESS_FLAGS (                                               \
    STREAM_WANT_READ  | STREAM_U_READ_DONE  |                               \
    STREAM_WANT_WRITE | STREAM_U_WRITE_DONE |                               \
    STREAM_WANT_FLUSH | STREAM_SEND_RST)
1088
1089
1090static void
1091stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1092{
1093    unsigned no_progress_count, no_progress_limit;
1094    enum stream_flags flags;
1095    uint64_t size;
1096
1097    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1098
1099    no_progress_count = 0;
1100    while ((stream->stream_flags & STREAM_WANT_READ)
1101                                            && lsquic_stream_readable(stream))
1102    {
1103        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1104        size  = stream->read_offset;
1105
1106        stream->stream_if->on_read(stream, stream->st_ctx);
1107
1108        if (no_progress_limit && size == stream->read_offset &&
1109                        flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1110        {
1111            ++no_progress_count;
1112            if (no_progress_count >= no_progress_limit)
1113            {
1114                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1115                    "progress) in user code reading from stream",
1116                    no_progress_count,
1117                    no_progress_count == 1 ? "" : "s");
1118                break;
1119            }
1120        }
1121        else
1122            no_progress_count = 0;
1123    }
1124}
1125
1126
1127static void
1128stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1129{
1130    unsigned no_progress_count, no_progress_limit;
1131    enum stream_flags flags;
1132
1133    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1134
1135    no_progress_count = 0;
1136    stream->stream_flags |= STREAM_LAST_WRITE_OK;
1137    while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK))
1138                                == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)
1139           && lsquic_stream_write_avail(stream))
1140    {
1141        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1142
1143        stream->stream_if->on_write(stream, stream->st_ctx);
1144
1145        if (no_progress_limit &&
1146            flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1147        {
1148            ++no_progress_count;
1149            if (no_progress_count >= no_progress_limit)
1150            {
1151                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1152                    "progress) in user code writing to stream",
1153                    no_progress_count,
1154                    no_progress_count == 1 ? "" : "s");
1155                break;
1156            }
1157        }
1158        else
1159            no_progress_count = 0;
1160    }
1161}
1162
1163
1164static void
1165stream_dispatch_read_events_once (lsquic_stream_t *stream)
1166{
1167    if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream))
1168    {
1169        stream->stream_if->on_read(stream, stream->st_ctx);
1170    }
1171}
1172
1173
1174static void
1175maybe_mark_as_blocked (lsquic_stream_t *stream)
1176{
1177    struct lsquic_conn_cap *cc;
1178
1179    if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered)
1180    {
1181        if (stream->blocked_off < stream->max_send_off)
1182        {
1183            stream->blocked_off = stream->max_send_off + stream->sm_n_buffered;
1184            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1185                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1186                                                            next_send_stream);
1187            stream->stream_flags |= STREAM_SEND_BLOCKED;
1188            LSQ_DEBUG("marked stream-blocked at stream offset "
1189                                            "%"PRIu64, stream->blocked_off);
1190        }
1191        else
1192            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
1193                " has been, or is about to be, sent", stream->blocked_off);
1194    }
1195
1196    if ((stream->stream_flags & STREAM_CONN_LIMITED)
1197        && (cc = &stream->conn_pub->conn_cap,
1198                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
1199    {
1200        if (cc->cc_blocked < cc->cc_max)
1201        {
1202            cc->cc_blocked = cc->cc_max;
1203            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
1204            LSQ_DEBUG("marked connection-blocked at connection offset "
1205                                                    "%"PRIu64, cc->cc_max);
1206        }
1207        else
1208            LSQ_DEBUG("stream has already been marked connection-blocked "
1209                "at offset %"PRIu64, cc->cc_blocked);
1210    }
1211}
1212
1213
1214void
1215lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
1216{
1217    assert(stream->stream_flags & STREAM_WANT_READ);
1218
1219    if (stream->stream_flags & STREAM_RW_ONCE)
1220        stream_dispatch_read_events_once(stream);
1221    else
1222        stream_dispatch_read_events_loop(stream);
1223}
1224
1225
1226void
1227lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
1228{
1229    int progress;
1230    uint64_t tosend_off;
1231    unsigned short n_buffered;
1232    enum stream_flags flags;
1233
1234    assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS);
1235    flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS;
1236    tosend_off = stream->tosend_off;
1237    n_buffered = stream->sm_n_buffered;
1238
1239    if (stream->stream_flags & STREAM_WANT_FLUSH)
1240        (void) stream_flush(stream);
1241
1242    if (stream->stream_flags & STREAM_RW_ONCE)
1243    {
1244        if ((stream->stream_flags & STREAM_WANT_WRITE)
1245            && lsquic_stream_write_avail(stream))
1246        {
1247            stream->stream_if->on_write(stream, stream->st_ctx);
1248        }
1249    }
1250    else
1251        stream_dispatch_write_events_loop(stream);
1252
1253    /* Progress means either flags or offsets changed: */
1254    progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags &&
1255                        stream->tosend_off == tosend_off &&
1256                            stream->sm_n_buffered == n_buffered);
1257
1258    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
1259    {
1260        if (progress)
1261        {   /* Move the stream to the end of the list to ensure fairness. */
1262            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1263                                                            next_write_stream);
1264            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1265                                                            next_write_stream);
1266        }
1267    }
1268}
1269
1270
/* Size callback of the empty reader: there is never anything to read. */
static size_t
inner_reader_empty_size (void *ctx)
{
    (void) ctx;     /* Unused */
    return (size_t) 0;
}
1276
1277
/* Read callback of the empty reader: copies nothing and reports zero bytes
 * read, whatever the arguments are.
 */
static size_t
inner_reader_empty_read (void *ctx, void *buf, size_t count)
{
    (void) ctx;     /* Unused */
    (void) buf;     /* Never written to */
    (void) count;
    return (size_t) 0;
}
1283
1284
1285static int
1286stream_flush (lsquic_stream_t *stream)
1287{
1288    struct lsquic_reader empty_reader;
1289    ssize_t nw;
1290
1291    assert(stream->stream_flags & STREAM_WANT_FLUSH);
1292    assert(stream->sm_n_buffered > 0 ||
1293        /* Flushing is also used to packetize standalone FIN: */
1294        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
1295                                                    == STREAM_U_WRITE_DONE));
1296
1297    empty_reader.lsqr_size = inner_reader_empty_size;
1298    empty_reader.lsqr_read = inner_reader_empty_read;
1299    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
1300    nw = stream_write_to_packets(stream, &empty_reader, 0);
1301
1302    if (nw >= 0)
1303    {
1304        assert(nw == 0);    /* Empty reader: must have read zero bytes */
1305        return 0;
1306    }
1307    else
1308        return -1;
1309}
1310
1311
1312static int
1313stream_flush_nocheck (lsquic_stream_t *stream)
1314{
1315    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered;
1316    maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH);
1317    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
1318
1319    return stream_flush(stream);
1320}
1321
1322
1323int
1324lsquic_stream_flush (lsquic_stream_t *stream)
1325{
1326    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1327    {
1328        LSQ_DEBUG("cannot flush closed stream");
1329        errno = EBADF;
1330        return -1;
1331    }
1332
1333    if (0 == stream->sm_n_buffered)
1334    {
1335        LSQ_DEBUG("flushing 0 bytes: noop");
1336        return 0;
1337    }
1338
1339    return stream_flush_nocheck(stream);
1340}
1341
1342
1343/* The flush threshold is the maximum size of stream data that can be sent
1344 * in a full packet.
1345 */
1346static size_t
1347flush_threshold (const lsquic_stream_t *stream)
1348{
1349    enum packet_out_flags flags;
1350    enum lsquic_packno_bits bits;
1351    unsigned packet_header_sz, stream_header_sz;
1352    size_t threshold;
1353
1354    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
1355    flags = bits << POBIT_SHIFT;
1356    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
1357        flags |= PO_CONN_ID;
1358
1359    packet_header_sz = lsquic_po_header_length(flags);
1360    stream_header_sz = stream->conn_pub->lconn->cn_pf
1361            ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off);
1362
1363    threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ
1364              - packet_header_sz - stream_header_sz;
1365    return threshold;
1366}
1367
1368
/* Guard shared by the user-facing write functions.  It expects a variable
 * named `stream' to be in scope and makes the *calling* function return -1,
 * with errno set, when the stream cannot accept data.  The checks are made
 * in this order:
 *
 *   EILSEQ      STREAM_USE_HEADERS is set, but headers have not been sent;
 *   ECONNRESET  one of STREAM_RST_FLAGS is set;
 *   EBADF       the user is done writing or FIN has been sent.
 */
#define COMMON_WRITE_CHECKS() do {                                          \
    if (STREAM_USE_HEADERS == (stream->stream_flags &                       \
                            (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)))      \
    {                                                                       \
        LSQ_WARN("Attempt to write to stream before sending HTTP headers"); \
        errno = EILSEQ;                                                     \
        return -1;                                                          \
    }                                                                       \
    if (0 != (stream->stream_flags & STREAM_RST_FLAGS))                     \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it had been reset");     \
        errno = ECONNRESET;                                                 \
        return -1;                                                          \
    }                                                                       \
    if (0 != (stream->stream_flags &                                        \
                            (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)))         \
    {                                                                       \
        LSQ_WARN("Attempt to write to stream after it was closed for "      \
                                                                "writing"); \
        errno = EBADF;                                                      \
        return -1;                                                          \
    }                                                                       \
} while (0)
1391
1392
/* Context handed to the STREAM frame generator callbacks */
struct frame_gen_ctx
{
    /* Stream whose data is being packetized */
    struct lsquic_stream *fgc_stream;
    /* Source of new data */
    struct lsquic_reader *fgc_reader;
    /* Running count of bytes pulled out of `fgc_reader'.  We keep our own
     * count because some readers are external; an external caller may use
     * this count, but does not have to.
     */
    size_t                fgc_nread_from_reader;
};
1403
1404
1405static size_t
1406frame_gen_size (void *ctx)
1407{
1408    struct frame_gen_ctx *fg_ctx = ctx;
1409    size_t available, remaining;
1410
1411    /* Make sure we are not writing past available size: */
1412    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1413    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
1414    if (available < remaining)
1415        remaining = available;
1416
1417    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
1418}
1419
1420
1421static int
1422frame_gen_fin (void *ctx)
1423{
1424    struct frame_gen_ctx *fg_ctx = ctx;
1425    return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE
1426        && 0 == fg_ctx->fgc_stream->sm_n_buffered
1427        /* Do not use frame_gen_size() as it may chop the real size: */
1428        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1429}
1430
1431
1432static void
1433incr_conn_cap (struct lsquic_stream *stream, size_t incr)
1434{
1435    if (stream->stream_flags & STREAM_CONN_LIMITED)
1436    {
1437        stream->conn_pub->conn_cap.cc_sent += incr;
1438        assert(stream->conn_pub->conn_cap.cc_sent
1439                                    <= stream->conn_pub->conn_cap.cc_max);
1440    }
1441}
1442
1443
1444static size_t
1445frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
1446{
1447    struct frame_gen_ctx *fg_ctx = ctx;
1448    unsigned char *p = begin_buf;
1449    unsigned char *const end = p + len;
1450    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
1451    size_t n_written, available, n_to_write;
1452
1453    if (stream->sm_n_buffered > 0)
1454    {
1455        if (len <= stream->sm_n_buffered)
1456        {
1457            memcpy(p, stream->sm_buf, len);
1458            memmove(stream->sm_buf, stream->sm_buf + len,
1459                                                stream->sm_n_buffered - len);
1460            stream->sm_n_buffered -= len;
1461            stream->tosend_off += len;
1462            *fin = frame_gen_fin(fg_ctx);
1463            return len;
1464        }
1465        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
1466        p += stream->sm_n_buffered;
1467        stream->sm_n_buffered = 0;
1468    }
1469
1470    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
1471    n_to_write = end - p;
1472    if (n_to_write > available)
1473        n_to_write = available;
1474    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
1475                                              n_to_write);
1476    p += n_written;
1477    fg_ctx->fgc_nread_from_reader += n_written;
1478    *fin = frame_gen_fin(fg_ctx);
1479    stream->tosend_off += p - (const unsigned char *) begin_buf;
1480    incr_conn_cap(stream, n_written);
1481    return p - (const unsigned char *) begin_buf;
1482}
1483
1484
1485static void
1486check_flush_threshold (lsquic_stream_t *stream)
1487{
1488    if ((stream->stream_flags & STREAM_WANT_FLUSH) &&
1489                            stream->tosend_off >= stream->sm_flush_to)
1490    {
1491        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
1492                                                    stream->sm_flush_to);
1493        maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH);
1494    }
1495}
1496
1497
/* Adapter that gives lsquic_send_ctl_new_packet_out() the signature used
 * by the `get_packet' table.  The stream argument is not used.
 */
static struct lsquic_packet_out *
get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least,
                      const struct lsquic_stream *stream)
{
    (void) stream;
    return lsquic_send_ctl_new_packet_out(ctl, need_at_least);
}
1504
1505
1506static struct lsquic_packet_out * (* const get_packet[])(
1507    struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) =
1508{
1509    lsquic_send_ctl_get_packet_for_stream,
1510    get_brand_new_packet,
1511};
1512
1513
1514static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR }
1515stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size)
1516{
1517    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
1518    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
1519    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
1520    unsigned stream_header_sz, need_at_least, off;
1521    lsquic_packet_out_t *packet_out;
1522    int len, s, hsk;
1523
1524    stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id,
1525                                                        stream->tosend_off);
1526    need_at_least = stream_header_sz + (size > 0);
1527    hsk = LSQUIC_STREAM_HANDSHAKE == stream->id;
1528    packet_out = get_packet[hsk](send_ctl, need_at_least, stream);
1529    if (!packet_out)
1530        return SWTP_STOP;
1531
1532    off = packet_out->po_data_sz;
1533    len = pf->pf_gen_stream_frame(
1534                packet_out->po_data + packet_out->po_data_sz,
1535                lsquic_packet_out_avail(packet_out), stream->id,
1536                stream->tosend_off,
1537                frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx);
1538    if (len < 0)
1539    {
1540        LSQ_ERROR("could not generate stream frame");
1541        return SWTP_ERROR;
1542    }
1543
1544    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
1545                            packet_out->po_data + packet_out->po_data_sz, len);
1546    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
1547    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
1548    if (0 == lsquic_packet_out_avail(packet_out))
1549        packet_out->po_flags |= PO_STREAM_END;
1550    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
1551                                     stream, QUIC_FRAME_STREAM, off, len);
1552    if (s != 0)
1553    {
1554        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
1555        return SWTP_ERROR;
1556    }
1557
1558    check_flush_threshold(stream);
1559
1560    /* XXX: I don't like it that this is here */
1561    if (hsk && !(packet_out->po_flags & PO_HELLO))
1562    {
1563        lsquic_packet_out_zero_pad(packet_out);
1564        packet_out->po_flags |= PO_HELLO;
1565        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
1566    }
1567
1568    return SWTP_OK;
1569}
1570
1571
1572static void
1573abort_connection (struct lsquic_stream *stream)
1574{
1575    if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
1576        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
1577                                                next_service_stream);
1578    stream->stream_flags |= STREAM_ABORT_CONN;
1579    LSQ_WARN("connection will be aborted");
1580    maybe_conn_to_tickable(stream);
1581}
1582
1583
1584static ssize_t
1585stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
1586                         size_t thresh)
1587{
1588    size_t size;
1589    ssize_t nw;
1590    unsigned seen_ok;
1591    struct frame_gen_ctx fg_ctx = {
1592        .fgc_stream = stream,
1593        .fgc_reader = reader,
1594        .fgc_nread_from_reader = 0,
1595    };
1596
1597    seen_ok = 0;
1598    while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0)
1599           || frame_gen_fin(&fg_ctx))
1600    {
1601        switch (stream_write_to_packet(&fg_ctx, size))
1602        {
1603        case SWTP_OK:
1604            if (!seen_ok++)
1605                maybe_conn_to_tickable_if_writeable(stream, 0);
1606            if (frame_gen_fin(&fg_ctx))
1607            {
1608                stream->stream_flags |= STREAM_FIN_SENT;
1609                goto end;
1610            }
1611            else
1612                break;
1613        case SWTP_STOP:
1614            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
1615            goto end;
1616        default:
1617            abort_connection(stream);
1618            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
1619            return -1;
1620        }
1621    }
1622
1623    if (thresh)
1624    {
1625        assert(size < thresh);
1626        assert(size >= stream->sm_n_buffered);
1627        size -= stream->sm_n_buffered;
1628        if (size > 0)
1629        {
1630            nw = save_to_buffer(stream, reader, size);
1631            if (nw < 0)
1632                return -1;
1633            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
1634        }
1635    }
1636    else
1637    {
1638        /* We count flushed data towards both stream and connection limits,
1639         * so we should have been able to packetize all of it:
1640         */
1641        assert(0 == stream->sm_n_buffered);
1642        assert(size == 0);
1643    }
1644
1645    maybe_mark_as_blocked(stream);
1646
1647  end:
1648    return fg_ctx.fgc_nread_from_reader;
1649}
1650
1651
1652/* Perform an implicit flush when we hit connection limit while buffering
1653 * data.  This is to prevent a (theoretical) stall:
1654 *
1655 * Imagine a number of streams, all of which buffered some data.  The buffered
1656 * data is up to connection cap, which means no further writes are possible.
1657 * None of them flushes, which means that data is not sent and connection
1658 * WINDOW_UPDATE frame never arrives from peer.  Stall.
1659 */
1660static int
1661maybe_flush_stream (struct lsquic_stream *stream)
1662{
1663    if (stream->sm_n_buffered > 0
1664          && (stream->stream_flags & STREAM_CONN_LIMITED)
1665            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
1666        return stream_flush_nocheck(stream);
1667    else
1668        return 0;
1669}
1670
1671
1672static ssize_t
1673save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
1674                                                                size_t len)
1675{
1676    size_t avail, n_written;
1677
1678    assert(stream->sm_n_buffered + len <= SM_BUF_SIZE);
1679
1680    if (!stream->sm_buf)
1681    {
1682        stream->sm_buf = malloc(SM_BUF_SIZE);
1683        if (!stream->sm_buf)
1684            return -1;
1685    }
1686
1687    avail = lsquic_stream_write_avail(stream);
1688    if (avail < len)
1689        len = avail;
1690
1691    n_written = reader->lsqr_read(reader->lsqr_ctx,
1692                        stream->sm_buf + stream->sm_n_buffered, len);
1693    stream->sm_n_buffered += n_written;
1694    incr_conn_cap(stream, n_written);
1695    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
1696              n_written, stream->sm_n_buffered);
1697    if (0 != maybe_flush_stream(stream))
1698        return -1;
1699    return n_written;
1700}
1701
1702
1703static ssize_t
1704stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
1705{
1706    size_t thresh, len;
1707
1708    thresh = flush_threshold(stream);
1709    len = reader->lsqr_size(reader->lsqr_ctx);
1710    if (stream->sm_n_buffered + len <= SM_BUF_SIZE &&
1711                                    stream->sm_n_buffered + len < thresh)
1712        return save_to_buffer(stream, reader, len);
1713    else
1714        return stream_write_to_packets(stream, reader, thresh);
1715}
1716
1717
1718ssize_t
1719lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
1720{
1721    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
1722    return lsquic_stream_writev(stream, &iov, 1);
1723}
1724
1725
/* State of a reader that presents an array of iovecs as one byte source */
struct inner_reader_iovec {
    const struct iovec       *iov;              /* Element being consumed */
    const struct iovec       *end;              /* One past the last element */
    /* Number of bytes of `*iov' consumed so far.  This is a size_t so that
     * it can represent any offset into an element: `iov_len' is a size_t.
     */
    size_t                    cur_iovec_off;
};


/* Reader callback: copy up to `count' bytes from the iovec array into `buf',
 * advancing the reader state.  Elements that have been consumed fully --
 * zero-length elements included -- are stepped over.
 *
 * Returns the number of bytes copied, which is smaller than `count' only if
 * the array has been exhausted.
 */
static size_t
inner_reader_iovec_read (void *ctx, void *buf, size_t count)
{
    struct inner_reader_iovec *const iro = ctx;
    unsigned char *p = buf;
    unsigned char *const end = p + count;
    size_t n_tocopy;

    while (iro->iov < iro->end && p < end)
    {
        /* Use size_t throughout: a narrower type would truncate large
         * lengths, and a length truncated to zero would make this loop
         * spin forever without making progress.
         */
        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
        if (n_tocopy > (size_t) (end - p))
            n_tocopy = end - p;
        if (n_tocopy > 0)   /* Empty element may have NULL base: skip copy */
        {
            memcpy(p, (const unsigned char *) iro->iov->iov_base
                                        + iro->cur_iovec_off, n_tocopy);
            p += n_tocopy;
            iro->cur_iovec_off += n_tocopy;
        }
        if (iro->iov->iov_len == iro->cur_iovec_off)
        {
            ++iro->iov;
            iro->cur_iovec_off = 0;
        }
    }

    /* Subtract from the start of the buffer: `p + count' could point past
     * the end of it, and forming such a pointer is undefined.
     */
    return p - (unsigned char *) buf;
}


/* Reader callback: number of bytes that remain to be read, i.e. the sum of
 * the lengths of the current and all following elements minus the part of
 * the current element that has already been consumed.
 */
static size_t
inner_reader_iovec_size (void *ctx)
{
    struct inner_reader_iovec *const iro = ctx;
    const struct iovec *iov;
    size_t size;

    size = 0;
    for (iov = iro->iov; iov < iro->end; ++iov)
        size += iov->iov_len;

    return size - iro->cur_iovec_off;
}
1774
1775
1776ssize_t
1777lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
1778                                                                    int iovcnt)
1779{
1780    COMMON_WRITE_CHECKS();
1781    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1782
1783    struct inner_reader_iovec iro = {
1784        .iov = iov,
1785        .end = iov + iovcnt,
1786        .cur_iovec_off = 0,
1787    };
1788    struct lsquic_reader reader = {
1789        .lsqr_read = inner_reader_iovec_read,
1790        .lsqr_size = inner_reader_iovec_size,
1791        .lsqr_ctx  = &iro,
1792    };
1793
1794    return stream_write(stream, &reader);
1795}
1796
1797
1798ssize_t
1799lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
1800{
1801    COMMON_WRITE_CHECKS();
1802    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1803    return stream_write(stream, reader);
1804}
1805
1806
1807int
1808lsquic_stream_send_headers (lsquic_stream_t *stream,
1809                            const lsquic_http_headers_t *headers, int eos)
1810{
1811    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT|
1812                                                     STREAM_U_WRITE_DONE))
1813                == STREAM_USE_HEADERS)
1814    {
1815        int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs,
1816                    stream->id, headers, eos, lsquic_stream_priority(stream));
1817        if (0 == s)
1818        {
1819            SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
1820            stream->stream_flags |= STREAM_HEADERS_SENT;
1821            if (eos)
1822                stream->stream_flags |= STREAM_FIN_SENT;
1823            LSQ_INFO("sent headers for stream %u", stream->id);
1824        }
1825        else
1826            LSQ_WARN("could not send headers: %s", strerror(errno));
1827        return s;
1828    }
1829    else
1830    {
1831        LSQ_WARN("cannot send headers for stream %u in this state", stream->id);
1832        errno = EBADMSG;
1833        return -1;
1834    }
1835}
1836
1837
1838void
1839lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
1840{
1841    if (offset > stream->max_send_off)
1842    {
1843        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
1844        LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to "
1845            "0x%"PRIX64, stream->id, stream->max_send_off, offset);
1846        stream->max_send_off = offset;
1847    }
1848    else
1849        LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old "
1850            "max send offset 0x%"PRIX64", ignoring", stream->id, offset,
1851            stream->max_send_off);
1852}
1853
1854
1855/* This function is used to update offsets after handshake completes and we
1856 * learn of peer's limits from the handshake values.
1857 */
1858int
1859lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset)
1860{
1861    LSQ_DEBUG("setting max_send_off to %u", offset);
1862    if (offset > stream->max_send_off)
1863    {
1864        lsquic_stream_window_update(stream, offset);
1865        return 0;
1866    }
1867    else if (offset < stream->tosend_off)
1868    {
1869        LSQ_INFO("new offset (%u bytes) is smaller than the amount of data "
1870            "already sent on this stream (%"PRIu64" bytes)", offset,
1871            stream->tosend_off);
1872        return -1;
1873    }
1874    else
1875    {
1876        stream->max_send_off = offset;
1877        return 0;
1878    }
1879}
1880
1881
1882void
1883lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code)
1884{
1885    lsquic_stream_reset_ext(stream, error_code, 1);
1886}
1887
1888
1889void
1890lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code,
1891                         int do_close)
1892{
1893    if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT))
1894    {
1895        LSQ_INFO("reset already sent");
1896        return;
1897    }
1898
1899    SM_HISTORY_APPEND(stream, SHE_RESET);
1900
1901    LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code);
1902    stream->error_code = error_code;
1903
1904    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1905        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1906                                                        next_send_stream);
1907    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
1908    stream->stream_flags |= STREAM_SEND_RST;
1909
1910    drop_buffered_data(stream);
1911    maybe_elide_stream_frames(stream);
1912    maybe_schedule_call_on_close(stream);
1913
1914    if (do_close)
1915        lsquic_stream_close(stream);
1916    else
1917        maybe_conn_to_tickable_if_writeable(stream, 1);
1918}
1919
1920
1921unsigned
1922lsquic_stream_id (const lsquic_stream_t *stream)
1923{
1924    return stream->id;
1925}
1926
1927
1928struct lsquic_conn *
1929lsquic_stream_conn (const lsquic_stream_t *stream)
1930{
1931    return stream->conn_pub->lconn;
1932}
1933
1934
1935int
1936lsquic_stream_close (lsquic_stream_t *stream)
1937{
1938    LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id);
1939    SM_HISTORY_APPEND(stream, SHE_CLOSE);
1940    if (lsquic_stream_is_closed(stream))
1941    {
1942        LSQ_INFO("Attempt to close an already-closed stream %u", stream->id);
1943        errno = EBADF;
1944        return -1;
1945    }
1946    stream_shutdown_write(stream);
1947    stream_shutdown_read(stream);
1948    maybe_schedule_call_on_close(stream);
1949    maybe_finish_stream(stream);
1950    maybe_conn_to_tickable_if_writeable(stream, 1);
1951    return 0;
1952}
1953
1954
1955#ifndef NDEBUG
1956#if __GNUC__
1957__attribute__((weak))
1958#endif
1959#endif
1960void
1961lsquic_stream_acked (lsquic_stream_t *stream)
1962{
1963    assert(stream->n_unacked);
1964    --stream->n_unacked;
1965    LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked);
1966    if (0 == stream->n_unacked)
1967        maybe_finish_stream(stream);
1968}
1969
1970
1971void
1972lsquic_stream_push_req (lsquic_stream_t *stream,
1973                        struct uncompressed_headers *push_req)
1974{
1975    assert(!stream->push_req);
1976    stream->push_req = push_req;
1977    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
1978}
1979
1980
1981int
1982lsquic_stream_is_pushed (const lsquic_stream_t *stream)
1983{
1984    return 1 & ~stream->id;
1985}
1986
1987
1988int
1989lsquic_stream_push_info (const lsquic_stream_t *stream,
1990        uint32_t *ref_stream_id, const char **headers, size_t *headers_sz)
1991{
1992    if (lsquic_stream_is_pushed(stream))
1993    {
1994        assert(stream->push_req);
1995        *ref_stream_id = stream->push_req->uh_stream_id;
1996        *headers       = stream->push_req->uh_headers;
1997        *headers_sz    = stream->push_req->uh_size;
1998        return 0;
1999    }
2000    else
2001        return -1;
2002}
2003
2004
2005int
2006lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
2007{
2008    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS)
2009    {
2010        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
2011        LSQ_DEBUG("received uncompressed headers for stream %u", stream->id);
2012        stream->stream_flags |= STREAM_HAVE_UH;
2013        if (uh->uh_flags & UH_FIN)
2014            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
2015        stream->uh = uh;
2016        if (uh->uh_oth_stream_id == 0)
2017        {
2018            if (uh->uh_weight)
2019                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
2020        }
2021        else
2022            LSQ_NOTICE("don't know how to depend on stream %u",
2023                                                        uh->uh_oth_stream_id);
2024        return 0;
2025    }
2026    else
2027    {
2028        LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id);
2029        return -1;
2030    }
2031}
2032
2033
2034unsigned
2035lsquic_stream_priority (const lsquic_stream_t *stream)
2036{
2037    return 256 - stream->sm_priority;
2038}
2039
2040
2041int
2042lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
2043{
2044    /* The user should never get a reference to the special streams,
2045     * but let's check just in case:
2046     */
2047    if (LSQUIC_STREAM_HANDSHAKE == stream->id
2048        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
2049                                LSQUIC_STREAM_HEADERS == stream->id))
2050        return -1;
2051    if (priority < 1 || priority > 256)
2052        return -1;
2053    stream->sm_priority = 256 - priority;
2054    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
2055    LSQ_DEBUG("set priority to %u", priority);
2056    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
2057    return 0;
2058}
2059
2060
2061int
2062lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
2063{
2064    if (0 == lsquic_stream_set_priority_internal(stream, priority))
2065    {
2066        if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) ==
2067                                       (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))
2068        {
2069            /* We need to send headers only if we are a) using HEADERS stream
2070             * and b) we already sent initial headers.  If initial headers
2071             * have not been sent yet, stream priority will be sent in the
2072             * HEADERS frame.
2073             */
2074            return lsquic_headers_stream_send_priority(stream->conn_pub->hs,
2075                                                    stream->id, 0, 0, priority);
2076        }
2077        else
2078            return 0;
2079    }
2080    else
2081        return -1;
2082}
2083
2084
2085lsquic_stream_ctx_t *
2086lsquic_stream_get_ctx (const lsquic_stream_t *stream)
2087{
2088    return stream->st_ctx;
2089}
2090
2091
2092int
2093lsquic_stream_refuse_push (lsquic_stream_t *stream)
2094{
2095    if (lsquic_stream_is_pushed(stream) &&
2096                !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST)))
2097    {
2098        LSQ_DEBUG("refusing pushed stream: send reset");
2099        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
2100        return 0;
2101    }
2102    else
2103        return -1;
2104}
2105
2106
2107size_t
2108lsquic_stream_mem_used (const struct lsquic_stream *stream)
2109{
2110    size_t size;
2111
2112    size = sizeof(stream);
2113    if (stream->sm_buf)
2114        size += SM_BUF_SIZE;
2115    if (stream->data_in)
2116        size += stream->data_in->di_if->di_mem_used(stream->data_in);
2117
2118    return size;
2119}
2120
2121
2122lsquic_cid_t
2123lsquic_stream_cid (const struct lsquic_stream *stream)
2124{
2125    return LSQUIC_LOG_CONN_ID;
2126}
2127