lsquic_stream.c revision 229fce07
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "lsquic.h"
30
31#include "lsquic_int_types.h"
32#include "lsquic_packet_common.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_malo.h"
35#include "lsquic_conn_flow.h"
36#include "lsquic_rtt.h"
37#include "lsquic_sfcw.h"
38#include "lsquic_stream.h"
39#include "lsquic_conn_public.h"
40#include "lsquic_util.h"
41#include "lsquic_mm.h"
42#include "lsquic_headers_stream.h"
43#include "lsquic_conn.h"
44#include "lsquic_data_in_if.h"
45#include "lsquic_parse.h"
46#include "lsquic_packet_out.h"
47#include "lsquic_engine_public.h"
48#include "lsquic_senhist.h"
49#include "lsquic_pacer.h"
50#include "lsquic_cubic.h"
51#include "lsquic_send_ctl.h"
52#include "lsquic_headers.h"
53#include "lsquic_ev_log.h"
54
55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid
57#define LSQUIC_LOG_STREAM_ID stream->id
58#include "lsquic_logger.h"
59
60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ
61
/* Forward declarations of file-local helpers, so that they can be called
 * from functions that appear earlier in the file than their definitions.
 */
static void
drop_frames_in (lsquic_stream_t *stream);

static void
maybe_schedule_call_on_close (lsquic_stream_t *stream);

static int
stream_wantread (lsquic_stream_t *stream, int is_want);

static int
stream_wantwrite (lsquic_stream_t *stream, int is_want);

static ssize_t
stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);

static ssize_t
save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);

static int
stream_flush (lsquic_stream_t *stream);

static int
stream_flush_nocheck (lsquic_stream_t *stream);

static void
maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag);
88
89
90#if LSQUIC_KEEP_STREAM_HISTORY
/* These values are printable ASCII characters for ease of printing the
 * whole history in a single line of a log message.
 *
 * The list of events is not exhaustive: only the most interesting events
 * are recorded.
 */
enum stream_history_event
{
    SHE_EMPTY              =  '\0',     /* Special entry.  No init besides memset required */
    SHE_PLUS               =  '+',      /* Special entry: previous event occurred more than once */
    SHE_REACH_FIN          =  'a',
    SHE_BLOCKED_OUT        =  'b',
    SHE_CREATED            =  'C',
    SHE_FRAME_IN           =  'd',
    SHE_FRAME_OUT          =  'D',
    SHE_RESET              =  'e',
    SHE_WINDOW_UPDATE      =  'E',
    SHE_FIN_IN             =  'f',
    SHE_FINISHED           =  'F',
    SHE_GOAWAY_IN          =  'g',
    SHE_USER_WRITE_HEADER  =  'h',
    SHE_HEADERS_IN         =  'H',
    SHE_ONCLOSE_SCHED      =  'l',
    SHE_ONCLOSE_CALL       =  'L',
    SHE_ONNEW              =  'N',
    SHE_SET_PRIO           =  'p',
    SHE_USER_READ          =  'r',
    SHE_SHUTDOWN_READ      =  'R',
    SHE_RST_IN             =  's',
    SHE_RST_OUT            =  't',
    SHE_FLUSH              =  'u',
    SHE_USER_WRITE_DATA    =  'w',
    SHE_SHUTDOWN_WRITE     =  'W',
    SHE_CLOSE              =  'X',
    SHE_FORCE_FINISH       =  'Z',
};
127
128static void
129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
130{
131    enum stream_history_event prev_event;
132    sm_hist_idx_t idx;
133    int plus;
134
135    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
136    plus = SHE_PLUS == stream->sm_hist_buf[idx];
137    idx = (idx - plus) & SM_HIST_IDX_MASK;
138    prev_event = stream->sm_hist_buf[idx];
139
140    if (prev_event == sh_event && plus)
141        return;
142
143    if (prev_event == sh_event)
144        sh_event = SHE_PLUS;
145    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
146
147    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
148        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
149                                                        stream->sm_hist_buf);
150}
151
152
153#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
154#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
155        if (stream->sm_hist_idx & SM_HIST_IDX_MASK)                         \
156            LSQ_DEBUG("history: [%.*s]",                                    \
157                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
158                (stream)->sm_hist_buf);                                     \
159    } while (0)
160#else
161#   define SM_HISTORY_APPEND(stream, event)
162#   define SM_HISTORY_DUMP_REMAINING(stream)
163#endif
164
165
166static int
167stream_inside_callback (const lsquic_stream_t *stream)
168{
169    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
170}
171
172
173static void
174maybe_conn_to_tickable (lsquic_stream_t *stream)
175{
176    if (!stream_inside_callback(stream))
177        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
178                                           stream->conn_pub->lconn);
179}
180
181
182/* Here, "readable" means that the user is able to read from the stream. */
183static void
184maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
185{
186    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
187    {
188        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
189                                           stream->conn_pub->lconn);
190    }
191}
192
193
194/* Here, "writeable" means that data can be put into packets to be
195 * scheduled to be sent out.
196 *
197 * If `check_can_send' is false, it means that we do not need to check
198 * whether packets can be sent.  This check was already performed when
199 * we packetized stream data.
200 */
201static void
202maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
203                                                    int check_can_send)
204{
205    if (!stream_inside_callback(stream) &&
206            (!check_can_send
207             || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
208          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
209    {
210        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
211                                           stream->conn_pub->lconn);
212    }
213}
214
215
216static int
217stream_stalled (const lsquic_stream_t *stream)
218{
219    return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) &&
220           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
221                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
222}
223
224
225/* TODO: The logic to figure out whether the stream is connection limited
226 * should be taken out of the constructor.  The caller should specify this
227 * via one of enum stream_ctor_flags.
228 */
229lsquic_stream_t *
230lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub,
231                       const struct lsquic_stream_if *stream_if,
232                       void *stream_if_ctx, unsigned initial_window,
233                       unsigned initial_send_off,
234                       enum stream_ctor_flags ctor_flags)
235{
236    lsquic_cfcw_t *cfcw;
237    lsquic_stream_t *stream;
238
239    stream = calloc(1, sizeof(*stream));
240    if (!stream)
241        return NULL;
242
243    stream->stream_if = stream_if;
244    stream->id        = id;
245    stream->conn_pub  = conn_pub;
246    stream->sm_onnew_arg = stream_if_ctx;
247    if (!initial_window)
248        initial_window = 16 * 1024;
249    if (LSQUIC_STREAM_HANDSHAKE == id ||
250        (conn_pub->hs && LSQUIC_STREAM_HEADERS == id))
251        cfcw = NULL;
252    else
253    {
254        cfcw = &conn_pub->cfcw;
255        stream->stream_flags |= STREAM_CONN_LIMITED;
256        if (conn_pub->hs)
257            stream->stream_flags |= STREAM_USE_HEADERS;
258        lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO);
259    }
260    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
261    if (!initial_send_off)
262        initial_send_off = 16 * 1024;
263    stream->max_send_off = initial_send_off;
264    if (ctor_flags & SCF_USE_DI_HASH)
265        stream->data_in = data_in_hash_new(conn_pub, id, 0);
266    else
267        stream->data_in = data_in_nocopy_new(conn_pub, id);
268    LSQ_DEBUG("created stream %u @%p", id, stream);
269    SM_HISTORY_APPEND(stream, SHE_CREATED);
270    if (ctor_flags & SCF_DI_AUTOSWITCH)
271        stream->stream_flags |= STREAM_AUTOSWITCH;
272    if (ctor_flags & SCF_CALL_ON_NEW)
273        lsquic_stream_call_on_new(stream);
274    if (ctor_flags & SCF_DISP_RW_ONCE)
275        stream->stream_flags |= STREAM_RW_ONCE;
276    return stream;
277}
278
279
280void
281lsquic_stream_call_on_new (lsquic_stream_t *stream)
282{
283    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
284    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
285    {
286        LSQ_DEBUG("calling on_new_stream");
287        SM_HISTORY_APPEND(stream, SHE_ONNEW);
288        stream->stream_flags |= STREAM_ONNEW_DONE;
289        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
290                                                          stream);
291    }
292}
293
294
295static void
296decr_conn_cap (struct lsquic_stream *stream, size_t incr)
297{
298    if (stream->stream_flags & STREAM_CONN_LIMITED)
299    {
300        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
301        stream->conn_pub->conn_cap.cc_sent -= incr;
302    }
303}
304
305
306static void
307drop_buffered_data (struct lsquic_stream *stream)
308{
309    decr_conn_cap(stream, stream->sm_n_buffered);
310    stream->sm_n_buffered = 0;
311    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
312        maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS);
313}
314
315
316static void
317destroy_uh (struct lsquic_stream *stream)
318{
319    if (stream->uh)
320    {
321        if (stream->uh->uh_hset)
322            stream->conn_pub->enpub->enp_hsi_if
323                            ->hsi_discard_header_set(stream->uh->uh_hset);
324        free(stream->uh);
325        stream->uh = NULL;
326    }
327}
328
329
/* Destroy the stream and release all memory associated with it.  The
 * stream pointer is invalid after this call.
 *
 * If on_new_stream() has been called for this stream but on_close() has
 * not, on_close() is called now.  The stream is removed from every
 * connection queue it is on, buffered outgoing and incoming data is
 * dropped, and the push request, uncompressed headers, and stream buffer
 * are freed.
 */
void
lsquic_stream_destroy (lsquic_stream_t *stream)
{
    /* Mark both directions as done by the user: */
    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
    /* Call on_close() only if on_new_stream() was called and on_close()
     * was not:
     */
    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
                                                            STREAM_ONNEW_DONE)
    {
        stream->stream_flags |= STREAM_ONCLOSE_DONE;
        stream->stream_if->on_close(stream, stream->st_ctx);
    }
    /* Queue membership is tracked by flags: take the stream off each
     * queue whose flags are set.
     */
    if (stream->stream_flags & STREAM_SENDING_FLAGS)
        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
    if (stream->stream_flags & STREAM_WANT_READ)
        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
    if (stream->stream_flags & STREAM_SERVICE_FLAGS)
        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
    /* NOTE(review): the write-queue flags are still set at this point, so
     * drop_buffered_data() will call maybe_remove_from_write_q() after the
     * TAILQ_REMOVE above -- confirm that it cannot remove the stream from
     * the write queue a second time.
     */
    drop_buffered_data(stream);
    lsquic_sfcw_consume_rem(&stream->fc);
    drop_frames_in(stream);
    if (stream->push_req)
    {
        if (stream->push_req->uh_hset)
            stream->conn_pub->enpub->enp_hsi_if
                            ->hsi_discard_header_set(stream->push_req->uh_hset);
        free(stream->push_req);
    }
    destroy_uh(stream);
    free(stream->sm_buf);
    LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream);
    SM_HISTORY_DUMP_REMAINING(stream);
    free(stream);
}
364
365
366static int
367stream_is_finished (const lsquic_stream_t *stream)
368{
369    return lsquic_stream_is_closed(stream)
370           /* n_unacked checks that no outgoing packets that reference this
371            * stream are outstanding:
372            */
373        && 0 == stream->n_unacked
374           /* This checks that no packets that reference this stream will
375            * become outstanding:
376            */
377        && 0 == (stream->stream_flags & STREAM_SEND_RST)
378        && ((stream->stream_flags & STREAM_FORCE_FINISH)
379          || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))
380           && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD))));
381}
382
383
384static void
385maybe_finish_stream (lsquic_stream_t *stream)
386{
387    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
388                                                    stream_is_finished(stream))
389    {
390        LSQ_DEBUG("stream %u is now finished", stream->id);
391        SM_HISTORY_APPEND(stream, SHE_FINISHED);
392        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
393            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
394                                                    next_service_stream);
395        stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED;
396    }
397}
398
399
400static void
401maybe_schedule_call_on_close (lsquic_stream_t *stream)
402{
403    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
404                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE))
405            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE))
406    {
407        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
408            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
409                                                    next_service_stream);
410        stream->stream_flags |= STREAM_CALL_ONCLOSE;
411        LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id);
412        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
413    }
414}
415
416
417void
418lsquic_stream_call_on_close (lsquic_stream_t *stream)
419{
420    assert(stream->stream_flags & STREAM_ONNEW_DONE);
421    stream->stream_flags &= ~STREAM_CALL_ONCLOSE;
422    if (!(stream->stream_flags & STREAM_SERVICE_FLAGS))
423        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
424                                                    next_service_stream);
425    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
426    {
427        LSQ_DEBUG("calling on_close for stream %u", stream->id);
428        stream->stream_flags |= STREAM_ONCLOSE_DONE;
429        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
430        stream->stream_if->on_close(stream, stream->st_ctx);
431    }
432    else
433        assert(0);
434}
435
436
437int
438lsquic_stream_readable (const lsquic_stream_t *stream)
439{
440    /* A stream is readable if one of the following is true: */
441    return
442        /* - It is already finished: in that case, lsquic_stream_read() will
443         *   return 0.
444         */
445            (stream->stream_flags & STREAM_FIN_REACHED)
446        /* - The stream is reset, by either side.  In this case,
447         *   lsquic_stream_read() will return -1 (we want the user to be
448         *   able to collect the error).
449         */
450        ||  (stream->stream_flags & STREAM_RST_FLAGS)
451        /* - Either we are not in HTTP mode or the HTTP headers have been
452         *   received and the headers or data from the stream can be read.
453         */
454        ||  (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
455                                                        == STREAM_USE_HEADERS)
456            && (stream->uh != NULL
457                ||  stream->data_in->di_if->di_get_frame(stream->data_in,
458                                                        stream->read_offset)))
459    ;
460}
461
462
463size_t
464lsquic_stream_write_avail (const struct lsquic_stream *stream)
465{
466    uint64_t stream_avail, conn_avail;
467
468    stream_avail = stream->max_send_off - stream->tosend_off
469                                                - stream->sm_n_buffered;
470    if (stream->stream_flags & STREAM_CONN_LIMITED)
471    {
472        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
473        if (conn_avail < stream_avail)
474            return conn_avail;
475    }
476
477    return stream_avail;
478}
479
480
481int
482lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
483{
484    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
485                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
486    {
487        return -1;
488    }
489    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
490    {
491        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
492            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
493                                                    next_send_stream);
494        stream->stream_flags |= STREAM_SEND_WUF;
495    }
496    return 0;
497}
498
499
/* Process an incoming STREAM frame.
 *
 * The frame is handed to the stream's data-in object.  On successful
 * insertion the flow controller is updated, FIN is recorded if the frame
 * carries it, the data-in implementation is switched if autoswitch is on
 * and the data-in object asks for it, and the connection may be made
 * tickable if the frame starts at the current read offset.
 *
 * Returns 0 if the frame was inserted or was a duplicate, -1 on error.
 */
int
lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
{
    uint64_t max_off;
    int got_next_offset;
    enum ins_frame ins_frame;

    assert(frame->packet_in);

    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
    LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; "
        "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);

    /* In HTTP mode, no STREAM frames are accepted once the headers have
     * carried FIN: release the frame and report an error.
     */
    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) ==
                                (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN))
    {
        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
        lsquic_malo_put(frame);
        return -1;
    }

    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
  insert_frame:
    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
    if (INS_FRAME_OK == ins_frame)
    {
        /* Update maximum offset in the flow controller and check for flow
         * control violation:
         */
        /* NOTE(review): `frame' is read after di_insert_frame() has
         * accepted it; this presumably relies on the data-in object
         * keeping the frame alive -- confirm.
         */
        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
        if (0 != lsquic_stream_update_sfcw(stream, max_off))
            return -1;
        if (frame->data_frame.df_fin)
        {
            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
            stream->stream_flags |= STREAM_FIN_RECVD;
            maybe_finish_stream(stream);
        }
        if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
                (stream->data_in->di_flags & DI_SWITCH_IMPL))
        {
            stream->data_in = stream->data_in->di_if->di_switch_impl(
                                        stream->data_in, stream->read_offset);
            if (!stream->data_in)
            {
                /* Never leave `data_in' NULL: fall back to the error
                 * data-in object.
                 */
                stream->data_in = data_in_error_new();
                return -1;
            }
        }
        if (got_next_offset)
            /* Checking the offset saves di_get_frame() call */
            maybe_conn_to_tickable_if_readable(stream);
        return 0;
    }
    else if (INS_FRAME_DUP == ins_frame)
    {
        return 0;
    }
    else if (INS_FRAME_OVERLAP == ins_frame)
    {
        /* The current data-in implementation cannot take this frame
         * because it overlaps: switch implementations and retry the
         * insertion.
         */
        LSQ_DEBUG("overlap: switching DATA IN implementation");
        stream->data_in = stream->data_in->di_if->di_switch_impl(
                                    stream->data_in, stream->read_offset);
        if (stream->data_in)
            goto insert_frame;
        stream->data_in = data_in_error_new();
        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
        lsquic_malo_put(frame);
        return -1;
    }
    else
    {
        assert(INS_FRAME_ERR == ins_frame);
        return -1;
    }
}
576
577
578static void
579drop_frames_in (lsquic_stream_t *stream)
580{
581    if (stream->data_in)
582    {
583        stream->data_in->di_if->di_destroy(stream->data_in);
584        /* To avoid checking whether `data_in` is set, just set to the error
585         * data-in stream.  It does the right thing after incoming data is
586         * dropped.
587         */
588        stream->data_in = data_in_error_new();
589    }
590}
591
592
593static void
594maybe_elide_stream_frames (struct lsquic_stream *stream)
595{
596    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
597    {
598        if (stream->n_unacked)
599            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
600                                                stream->id);
601        stream->stream_flags |= STREAM_FRAMES_ELIDED;
602    }
603}
604
605
/* Process an incoming RST_STREAM frame.
 *
 * `offset' is the final offset carried by the frame.  `error_code' is not
 * used by this function.
 *
 * A duplicate RST_STREAM is ignored.  Otherwise the offset is validated
 * against what has been received so far and against flow control; on
 * success, incoming and buffered outgoing data is dropped, outstanding
 * stream frames are elided, and a reset is sent in reply unless RST or
 * FIN has been sent or an RST is already scheduled.
 *
 * Returns 0 on success (including the duplicate case), -1 if the frame is
 * invalid.
 */
int
lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
                      uint32_t error_code)
{

    if (stream->stream_flags & STREAM_RST_RECVD)
    {
        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
        return 0;
    }

    SM_HISTORY_APPEND(stream, SHE_RST_IN);
    /* This flag must always be set, even if we are "ignoring" it: it is
     * used by elision code.
     */
    stream->stream_flags |= STREAM_RST_RECVD;

    /* The final offset cannot be smaller than what we have already seen: */
    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
    {
        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is "
            "smaller than that of byte following the last byte we have seen: "
            "0x%"PRIX64, stream->id, offset,
            lsquic_sfcw_get_max_recv_off(&stream->fc));
        return -1;
    }

    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
    {
        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64
            " violates flow control", stream->id, offset);
        return -1;
    }

    /* Let user collect error: */
    maybe_conn_to_tickable_if_readable(stream);

    lsquic_sfcw_consume_rem(&stream->fc);
    drop_frames_in(stream);
    drop_buffered_data(stream);
    maybe_elide_stream_frames(stream);

    /* Reply with our own reset unless RST or FIN has already been sent or
     * an RST is already scheduled:
     */
    if (!(stream->stream_flags &
                        (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT)))
        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);

    /* The flag was already set above; it is set again here, presumably in
     * case the calls in between changed it -- TODO confirm.
     */
    stream->stream_flags |= STREAM_RST_RECVD;

    maybe_finish_stream(stream);
    maybe_schedule_call_on_close(stream);

    return 0;
}
658
659
660uint64_t
661lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
662{
663    assert(stream->stream_flags & STREAM_SEND_WUF);
664    stream->stream_flags &= ~STREAM_SEND_WUF;
665    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
666        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
667    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
668}
669
670
671void
672lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
673{
674    assert(stream->stream_flags & STREAM_SEND_BLOCKED);
675    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
676    stream->stream_flags &= ~STREAM_SEND_BLOCKED;
677    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
678        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
679}
680
681
682void
683lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
684{
685    assert(stream->stream_flags & STREAM_SEND_RST);
686    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
687    stream->stream_flags &= ~STREAM_SEND_RST;
688    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
689        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
690    stream->stream_flags |= STREAM_RST_SENT;
691    maybe_finish_stream(stream);
692}
693
694
695static size_t
696read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len)
697{
698    struct http1x_headers *h1h = stream->uh->uh_hset;
699    size_t n_avail = h1h->h1h_size - h1h->h1h_off;
700    if (n_avail < len)
701        len = n_avail;
702    memcpy(dst, h1h->h1h_buf + h1h->h1h_off, len);
703    h1h->h1h_off += len;
704    if (h1h->h1h_off == h1h->h1h_size)
705    {
706        LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id);
707        destroy_uh(stream);
708        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
709        {
710            stream->stream_flags |= STREAM_FIN_REACHED;
711            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
712        }
713    }
714    return len;
715}
716
717
718/* This function returns 0 when EOF is reached.
719 */
720ssize_t
721lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov,
722                     int iovcnt)
723{
724    size_t total_nread, nread;
725    int processed_frames, read_unc_headers, iovidx;
726    unsigned char *p, *end;
727
728    SM_HISTORY_APPEND(stream, SHE_USER_READ);
729
730#define NEXT_IOV() do {                                             \
731    ++iovidx;                                                       \
732    while (iovidx < iovcnt && 0 == iov[iovidx].iov_len)             \
733        ++iovidx;                                                   \
734    if (iovidx < iovcnt)                                            \
735    {                                                               \
736        p = iov[iovidx].iov_base;                                   \
737        end = p + iov[iovidx].iov_len;                              \
738    }                                                               \
739    else                                                            \
740        p = end = NULL;                                             \
741} while (0)
742
743#define AVAIL() (end - p)
744
745    if (stream->stream_flags & STREAM_RST_FLAGS)
746    {
747        errno = ECONNRESET;
748        return -1;
749    }
750    if (stream->stream_flags & STREAM_U_READ_DONE)
751    {
752        errno = EBADF;
753        return -1;
754    }
755    if (stream->stream_flags & STREAM_FIN_REACHED)
756        return 0;
757
758    total_nread = 0;
759    processed_frames = 0;
760
761    iovidx = -1;
762    NEXT_IOV();
763
764    if (stream->uh)
765    {
766        if (stream->uh->uh_flags & UH_H1H)
767        {
768            if (AVAIL())
769            {
770                read_unc_headers = 1;
771                do
772                {
773                    nread = read_uh(stream, p, AVAIL());
774                    p += nread;
775                    total_nread += nread;
776                    if (p == end)
777                        NEXT_IOV();
778                }
779                while (stream->uh && AVAIL());
780            }
781            else
782                read_unc_headers = 0;
783        }
784        else
785        {
786            LSQ_INFO("header set not claimed: cannot read from stream");
787            return -1;
788        }
789    }
790    else
791        read_unc_headers = 0;
792
793    struct data_frame *data_frame;
794    while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset)))
795    {
796        ++processed_frames;
797        size_t navail = data_frame->df_size - data_frame->df_read_off;
798        size_t ntowrite = AVAIL();
799        if (navail < ntowrite)
800            ntowrite = navail;
801        memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite);
802        p += ntowrite;
803        data_frame->df_read_off += ntowrite;
804        stream->read_offset += ntowrite;
805        total_nread += ntowrite;
806        if (data_frame->df_read_off == data_frame->df_size)
807        {
808            const int fin = data_frame->df_fin;
809            stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
810            if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
811                    (stream->data_in->di_flags & DI_SWITCH_IMPL))
812            {
813                stream->data_in = stream->data_in->di_if->di_switch_impl(
814                                            stream->data_in, stream->read_offset);
815                if (!stream->data_in)
816                {
817                    stream->data_in = data_in_error_new();
818                    return -1;
819                }
820            }
821            if (fin)
822            {
823                stream->stream_flags |= STREAM_FIN_REACHED;
824                break;
825            }
826        }
827        if (p == end)
828            NEXT_IOV();
829    }
830
831    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
832                                        total_nread, stream->read_offset);
833
834    if (processed_frames)
835    {
836        lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
837        if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
838        {
839            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
840                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
841            stream->stream_flags |= STREAM_SEND_WUF;
842            maybe_conn_to_tickable_if_writeable(stream, 1);
843        }
844    }
845
846    if (processed_frames || read_unc_headers)
847    {
848        return total_nread;
849    }
850    else
851    {
852        assert(0 == total_nread);
853        errno = EWOULDBLOCK;
854        return -1;
855    }
856}
857
858
859ssize_t
860lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
861{
862    struct iovec iov = { .iov_base = buf, .iov_len = len, };
863    return lsquic_stream_readv(stream, &iov, 1);
864}
865
866
867static void
868stream_shutdown_read (lsquic_stream_t *stream)
869{
870    if (!(stream->stream_flags & STREAM_U_READ_DONE))
871    {
872        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
873        stream->stream_flags |= STREAM_U_READ_DONE;
874        stream_wantread(stream, 0);
875        maybe_finish_stream(stream);
876    }
877}
878
879
880static void
881stream_shutdown_write (lsquic_stream_t *stream)
882{
883    if (stream->stream_flags & STREAM_U_WRITE_DONE)
884        return;
885
886    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
887    stream->stream_flags |= STREAM_U_WRITE_DONE;
888    stream_wantwrite(stream, 0);
889
890    /* Don't bother to check whether there is anything else to write if
891     * the flags indicate that nothing else should be written.
892     */
893    if (!(stream->stream_flags &
894                    (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT)))
895    {
896        if (stream->sm_n_buffered == 0)
897        {
898            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
899                                                 stream))
900            {
901                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
902                stream->stream_flags |= STREAM_FIN_SENT;
903            }
904            else
905            {
906                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
907                          "flag in it");
908                (void) stream_flush_nocheck(stream);
909            }
910        }
911        else
912            (void) stream_flush_nocheck(stream);
913    }
914}
915
916
917int
918lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
919{
920    LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how);
921    if (lsquic_stream_is_closed(stream))
922    {
923        LSQ_INFO("Attempt to shut down a closed stream %u", stream->id);
924        errno = EBADF;
925        return -1;
926    }
927    /* 0: read, 1: write: 2: read and write
928     */
929    if (how < 0 || how > 2)
930    {
931        errno = EINVAL;
932        return -1;
933    }
934
935    if (how)
936        stream_shutdown_write(stream);
937    if (how != 1)
938        stream_shutdown_read(stream);
939
940    maybe_finish_stream(stream);
941    maybe_schedule_call_on_close(stream);
942    if (how)
943        maybe_conn_to_tickable_if_writeable(stream, 1);
944
945    return 0;
946}
947
948
949void
950lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
951{
952    LSQ_DEBUG("internal shutdown of stream %u", stream->id);
953    if (LSQUIC_STREAM_HANDSHAKE == stream->id
954        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
955                                LSQUIC_STREAM_HEADERS == stream->id))
956    {
957        LSQ_DEBUG("add flag to force-finish special stream %u", stream->id);
958        stream->stream_flags |= STREAM_FORCE_FINISH;
959        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
960    }
961    maybe_finish_stream(stream);
962    maybe_schedule_call_on_close(stream);
963}
964
965
966static void
967fake_reset_unused_stream (lsquic_stream_t *stream)
968{
969    stream->stream_flags |=
970        STREAM_RST_RECVD    /* User will pick this up on read or write */
971      | STREAM_RST_SENT     /* Don't send anything else on this stream */
972    ;
973
974    /* Cancel all writes to the network scheduled for this stream: */
975    if (stream->stream_flags & STREAM_SENDING_FLAGS)
976        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
977                                                next_send_stream);
978    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
979
980    LSQ_DEBUG("fake-reset stream %u%s",
981                    stream->id, stream_stalled(stream) ? " (stalled)" : "");
982    maybe_finish_stream(stream);
983    maybe_schedule_call_on_close(stream);
984}
985
986
987/* This function should only be called for locally-initiated streams whose ID
988 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
989 * frame sent by peer but we have not yet received it and created a stream.
990 * In this situation, we mark the stream as reset, so that user's on_read or
991 * on_write event callback picks up the error.  That, in turn, should result
992 * in stream being closed.
993 *
994 * If we have received any data frames on this stream, this probably indicates
995 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
996 * lower than this.  However, we still try to handle it gracefully and peform
997 * a shutdown, as if the stream was not reset.
998 */
999void
1000lsquic_stream_received_goaway (lsquic_stream_t *stream)
1001{
1002    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
1003    if (0 == stream->read_offset &&
1004                            stream->data_in->di_if->di_empty(stream->data_in))
1005        fake_reset_unused_stream(stream);       /* Normal condition */
1006    else
1007    {   /* This is odd, let's handle it the best we can: */
1008        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
1009        lsquic_stream_shutdown_internal(stream);
1010    }
1011}
1012
1013
1014uint64_t
1015lsquic_stream_read_offset (const lsquic_stream_t *stream)
1016{
1017    return stream->read_offset;
1018}
1019
1020
1021static int
1022stream_wantread (lsquic_stream_t *stream, int is_want)
1023{
1024    const int old_val = !!(stream->stream_flags & STREAM_WANT_READ);
1025    const int new_val = !!is_want;
1026    if (old_val != new_val)
1027    {
1028        if (new_val)
1029        {
1030            if (!old_val)
1031                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
1032                                                            next_read_stream);
1033            stream->stream_flags |= STREAM_WANT_READ;
1034        }
1035        else
1036        {
1037            stream->stream_flags &= ~STREAM_WANT_READ;
1038            if (old_val)
1039                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
1040                                                            next_read_stream);
1041        }
1042    }
1043    return old_val;
1044}
1045
1046
1047static void
1048maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1049{
1050    assert(STREAM_WRITE_Q_FLAGS & flag);
1051    if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1052        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1053                                                        next_write_stream);
1054    stream->stream_flags |= flag;
1055}
1056
1057
1058static void
1059maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1060{
1061    assert(STREAM_WRITE_Q_FLAGS & flag);
1062    if (stream->stream_flags & flag)
1063    {
1064        stream->stream_flags &= ~flag;
1065        if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1066            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1067                                                        next_write_stream);
1068    }
1069}
1070
1071
1072static int
1073stream_wantwrite (lsquic_stream_t *stream, int is_want)
1074{
1075    const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE);
1076    const int new_val = !!is_want;
1077    if (old_val != new_val)
1078    {
1079        if (new_val)
1080            maybe_put_onto_write_q(stream, STREAM_WANT_WRITE);
1081        else
1082            maybe_remove_from_write_q(stream, STREAM_WANT_WRITE);
1083    }
1084    return old_val;
1085}
1086
1087
1088int
1089lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1090{
1091    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1092    {
1093        if (is_want)
1094            maybe_conn_to_tickable_if_readable(stream);
1095        return stream_wantread(stream, is_want);
1096    }
1097    else
1098    {
1099        errno = EBADF;
1100        return -1;
1101    }
1102}
1103
1104
1105int
1106lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1107{
1108    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE))
1109    {
1110        if (is_want)
1111            maybe_conn_to_tickable_if_writeable(stream, 1);
1112        return stream_wantwrite(stream, is_want);
1113    }
1114    else
1115    {
1116        errno = EBADF;
1117        return -1;
1118    }
1119}
1120
1121
/* Stream flags that the read and write dispatch loops snapshot before calling
 * the user's on_read/on_write callback.  A change in any of them across the
 * callback is treated as progress by the loops' no-progress check.
 */
#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE|            \
    STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST)
1124
1125
1126static void
1127stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1128{
1129    unsigned no_progress_count, no_progress_limit;
1130    enum stream_flags flags;
1131    uint64_t size;
1132
1133    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1134
1135    no_progress_count = 0;
1136    while ((stream->stream_flags & STREAM_WANT_READ)
1137                                            && lsquic_stream_readable(stream))
1138    {
1139        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1140        size  = stream->read_offset;
1141
1142        stream->stream_if->on_read(stream, stream->st_ctx);
1143
1144        if (no_progress_limit && size == stream->read_offset &&
1145                        flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1146        {
1147            ++no_progress_count;
1148            if (no_progress_count >= no_progress_limit)
1149            {
1150                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1151                    "progress) in user code reading from stream",
1152                    no_progress_count,
1153                    no_progress_count == 1 ? "" : "s");
1154                break;
1155            }
1156        }
1157        else
1158            no_progress_count = 0;
1159    }
1160}
1161
1162
1163static void
1164stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1165{
1166    unsigned no_progress_count, no_progress_limit;
1167    enum stream_flags flags;
1168
1169    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1170
1171    no_progress_count = 0;
1172    stream->stream_flags |= STREAM_LAST_WRITE_OK;
1173    while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK))
1174                                == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)
1175           && lsquic_stream_write_avail(stream))
1176    {
1177        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1178
1179        stream->stream_if->on_write(stream, stream->st_ctx);
1180
1181        if (no_progress_limit &&
1182            flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1183        {
1184            ++no_progress_count;
1185            if (no_progress_count >= no_progress_limit)
1186            {
1187                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1188                    "progress) in user code writing to stream",
1189                    no_progress_count,
1190                    no_progress_count == 1 ? "" : "s");
1191                break;
1192            }
1193        }
1194        else
1195            no_progress_count = 0;
1196    }
1197}
1198
1199
1200static void
1201stream_dispatch_read_events_once (lsquic_stream_t *stream)
1202{
1203    if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream))
1204    {
1205        stream->stream_if->on_read(stream, stream->st_ctx);
1206    }
1207}
1208
1209
1210static void
1211maybe_mark_as_blocked (lsquic_stream_t *stream)
1212{
1213    struct lsquic_conn_cap *cc;
1214
1215    if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered)
1216    {
1217        if (stream->blocked_off < stream->max_send_off)
1218        {
1219            stream->blocked_off = stream->max_send_off + stream->sm_n_buffered;
1220            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1221                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1222                                                            next_send_stream);
1223            stream->stream_flags |= STREAM_SEND_BLOCKED;
1224            LSQ_DEBUG("marked stream-blocked at stream offset "
1225                                            "%"PRIu64, stream->blocked_off);
1226        }
1227        else
1228            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
1229                " has been, or is about to be, sent", stream->blocked_off);
1230    }
1231
1232    if ((stream->stream_flags & STREAM_CONN_LIMITED)
1233        && (cc = &stream->conn_pub->conn_cap,
1234                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
1235    {
1236        if (cc->cc_blocked < cc->cc_max)
1237        {
1238            cc->cc_blocked = cc->cc_max;
1239            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
1240            LSQ_DEBUG("marked connection-blocked at connection offset "
1241                                                    "%"PRIu64, cc->cc_max);
1242        }
1243        else
1244            LSQ_DEBUG("stream has already been marked connection-blocked "
1245                "at offset %"PRIu64, cc->cc_blocked);
1246    }
1247}
1248
1249
1250void
1251lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
1252{
1253    assert(stream->stream_flags & STREAM_WANT_READ);
1254
1255    if (stream->stream_flags & STREAM_RW_ONCE)
1256        stream_dispatch_read_events_once(stream);
1257    else
1258        stream_dispatch_read_events_loop(stream);
1259}
1260
1261
1262void
1263lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
1264{
1265    int progress;
1266    uint64_t tosend_off;
1267    unsigned short n_buffered;
1268    enum stream_flags flags;
1269
1270    assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS);
1271    flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS;
1272    tosend_off = stream->tosend_off;
1273    n_buffered = stream->sm_n_buffered;
1274
1275    if (stream->stream_flags & STREAM_WANT_FLUSH)
1276        (void) stream_flush(stream);
1277
1278    if (stream->stream_flags & STREAM_RW_ONCE)
1279    {
1280        if ((stream->stream_flags & STREAM_WANT_WRITE)
1281            && lsquic_stream_write_avail(stream))
1282        {
1283            stream->stream_if->on_write(stream, stream->st_ctx);
1284        }
1285    }
1286    else
1287        stream_dispatch_write_events_loop(stream);
1288
1289    /* Progress means either flags or offsets changed: */
1290    progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags &&
1291                        stream->tosend_off == tosend_off &&
1292                            stream->sm_n_buffered == n_buffered);
1293
1294    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
1295    {
1296        if (progress)
1297        {   /* Move the stream to the end of the list to ensure fairness. */
1298            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1299                                                            next_write_stream);
1300            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1301                                                            next_write_stream);
1302        }
1303    }
1304}
1305
1306
/* Size callback of the empty reader: there is never anything to read, so
 * the reported size is always zero.  `ctx' is ignored.
 */
static size_t
inner_reader_empty_size (void *ctx)
{
    (void) ctx;
    return (size_t) 0;
}
1312
1313
/* Read callback of the empty reader: nothing is copied into `buf' and zero
 * is returned regardless of `count'.  All arguments are ignored.
 */
static size_t
inner_reader_empty_read (void *ctx, void *buf, size_t count)
{
    (void) ctx;
    (void) buf;
    (void) count;
    return (size_t) 0;
}
1319
1320
1321static int
1322stream_flush (lsquic_stream_t *stream)
1323{
1324    struct lsquic_reader empty_reader;
1325    ssize_t nw;
1326
1327    assert(stream->stream_flags & STREAM_WANT_FLUSH);
1328    assert(stream->sm_n_buffered > 0 ||
1329        /* Flushing is also used to packetize standalone FIN: */
1330        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
1331                                                    == STREAM_U_WRITE_DONE));
1332
1333    empty_reader.lsqr_size = inner_reader_empty_size;
1334    empty_reader.lsqr_read = inner_reader_empty_read;
1335    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
1336    nw = stream_write_to_packets(stream, &empty_reader, 0);
1337
1338    if (nw >= 0)
1339    {
1340        assert(nw == 0);    /* Empty reader: must have read zero bytes */
1341        return 0;
1342    }
1343    else
1344        return -1;
1345}
1346
1347
1348static int
1349stream_flush_nocheck (lsquic_stream_t *stream)
1350{
1351    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered;
1352    maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH);
1353    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
1354
1355    return stream_flush(stream);
1356}
1357
1358
1359int
1360lsquic_stream_flush (lsquic_stream_t *stream)
1361{
1362    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1363    {
1364        LSQ_DEBUG("cannot flush closed stream");
1365        errno = EBADF;
1366        return -1;
1367    }
1368
1369    if (0 == stream->sm_n_buffered)
1370    {
1371        LSQ_DEBUG("flushing 0 bytes: noop");
1372        return 0;
1373    }
1374
1375    return stream_flush_nocheck(stream);
1376}
1377
1378
1379/* The flush threshold is the maximum size of stream data that can be sent
1380 * in a full packet.
1381 */
1382#ifdef NDEBUG
1383static
1384#endif
1385       size_t
1386lsquic_stream_flush_threshold (const struct lsquic_stream *stream)
1387{
1388    enum packet_out_flags flags;
1389    enum lsquic_packno_bits bits;
1390    unsigned packet_header_sz, stream_header_sz;
1391    size_t threshold;
1392
1393    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
1394    flags = bits << POBIT_SHIFT;
1395    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
1396        flags |= PO_CONN_ID;
1397    if (LSQUIC_STREAM_HANDSHAKE == stream->id)
1398        flags |= PO_LONGHEAD;
1399
1400    packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags);
1401    stream_header_sz = stream->conn_pub->lconn->cn_pf
1402            ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off);
1403
1404    threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ
1405              - packet_header_sz - stream_header_sz;
1406    return threshold;
1407}
1408
1409
/* Precondition checks for user-facing write functions.  The macro expects a
 * variable named `stream' to be in scope and may return from the enclosing
 * function: on any failed check it sets errno and returns -1.
 *
 *  EILSEQ      STREAM_USE_HEADERS is set, but STREAM_HEADERS_SENT is not.
 *  ECONNRESET  Any of STREAM_RST_FLAGS is set.
 *  EBADF       STREAM_U_WRITE_DONE or STREAM_FIN_SENT is set.
 */
#define COMMON_WRITE_CHECKS() do {                                          \
    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))   \
                                                   == STREAM_USE_HEADERS)   \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream before sending HTTP headers"); \
        errno = EILSEQ;                                                     \
        return -1;                                                          \
    }                                                                       \
    if (stream->stream_flags & STREAM_RST_FLAGS)                            \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it had been reset");     \
        errno = ECONNRESET;                                                 \
        return -1;                                                          \
    }                                                                       \
    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it was closed for "      \
                                                                "writing"); \
        errno = EBADF;                                                      \
        return -1;                                                          \
    }                                                                       \
} while (0)
1432
1433
/* Context handed to the frame_gen_*() callbacks while STREAM frames are
 * being generated.
 */
struct frame_gen_ctx
{
    /* Stream whose buffered data and send offset the callbacks use: */
    lsquic_stream_t      *fgc_stream;
    /* Source of new data, consumed after the stream's buffered bytes: */
    struct lsquic_reader *fgc_reader;
    /* We keep our own count of how many bytes were read from reader because
     * some readers are external.  The external caller does not have to rely
     * on our count, but it can.
     */
    size_t                fgc_nread_from_reader;
};
1444
1445
1446static size_t
1447frame_gen_size (void *ctx)
1448{
1449    struct frame_gen_ctx *fg_ctx = ctx;
1450    size_t available, remaining;
1451
1452    /* Make sure we are not writing past available size: */
1453    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1454    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
1455    if (available < remaining)
1456        remaining = available;
1457
1458    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
1459}
1460
1461
1462static int
1463frame_gen_fin (void *ctx)
1464{
1465    struct frame_gen_ctx *fg_ctx = ctx;
1466    return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE
1467        && 0 == fg_ctx->fgc_stream->sm_n_buffered
1468        /* Do not use frame_gen_size() as it may chop the real size: */
1469        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1470}
1471
1472
1473static void
1474incr_conn_cap (struct lsquic_stream *stream, size_t incr)
1475{
1476    if (stream->stream_flags & STREAM_CONN_LIMITED)
1477    {
1478        stream->conn_pub->conn_cap.cc_sent += incr;
1479        assert(stream->conn_pub->conn_cap.cc_sent
1480                                    <= stream->conn_pub->conn_cap.cc_max);
1481    }
1482}
1483
1484
/* Data callback passed to pf_gen_stream_frame(): fill `begin_buf' with up to
 * `len' bytes of stream payload.  Bytes buffered in the stream (sm_buf) are
 * consumed first; the remainder comes from the reader, capped by what the
 * stream is allowed to write.
 *
 * The stream's send offset is advanced by the number of bytes produced, and
 * `*fin' is set to the FIN condition as it stands after the data has been
 * consumed.  Returns the number of bytes placed into `begin_buf'.
 */
static size_t
frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    unsigned char *p = begin_buf;
    unsigned char *const end = p + len;
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    size_t n_written, available, n_to_write;

    if (stream->sm_n_buffered > 0)
    {
        if (len <= stream->sm_n_buffered)
        {
            /* The buffer alone satisfies the request: copy out the first
             * `len' bytes and shift the rest to the front of the buffer.
             * The reader is not touched.  The connection cap is not charged
             * here: save_to_buffer() charged it when the bytes were buffered.
             */
            memcpy(p, stream->sm_buf, len);
            memmove(stream->sm_buf, stream->sm_buf + len,
                                                stream->sm_n_buffered - len);
            stream->sm_n_buffered -= len;
            stream->tosend_off += len;
            *fin = frame_gen_fin(fg_ctx);
            return len;
        }
        /* Drain the whole buffer, then top up from the reader below: */
        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
        p += stream->sm_n_buffered;
        stream->sm_n_buffered = 0;
    }

    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
    n_to_write = end - p;
    if (n_to_write > available)
        n_to_write = available;
    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
                                              n_to_write);
    p += n_written;
    fg_ctx->fgc_nread_from_reader += n_written;
    *fin = frame_gen_fin(fg_ctx);
    /* The send offset advances by everything produced (drained buffer plus
     * newly read bytes), but only the newly read bytes are charged to the
     * connection cap.
     */
    stream->tosend_off += p - (const unsigned char *) begin_buf;
    incr_conn_cap(stream, n_written);
    return p - (const unsigned char *) begin_buf;
}
1524
1525
1526static void
1527check_flush_threshold (lsquic_stream_t *stream)
1528{
1529    if ((stream->stream_flags & STREAM_WANT_FLUSH) &&
1530                            stream->tosend_off >= stream->sm_flush_to)
1531    {
1532        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
1533                                                    stream->sm_flush_to);
1534        maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH);
1535    }
1536}
1537
1538
/* Adapter that gives lsquic_send_ctl_new_packet_out() the same signature as
 * the other entry in the get_packet[] table.  `stream' is not used.
 */
static struct lsquic_packet_out *
get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least,
                      const struct lsquic_stream *stream)
{
    (void) stream;
    return lsquic_send_ctl_new_packet_out(ctl, need_at_least);
}
1545
1546
/* Packet getters, indexed by whether the stream is the handshake stream:
 * index 0 is used for regular streams and index 1 for the handshake stream
 * (see the `hsk' variable in stream_write_to_packet()).
 */
static struct lsquic_packet_out * (* const get_packet[])(
    struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) =
{
    lsquic_send_ctl_get_packet_for_stream,
    get_brand_new_packet,
};
1553
1554
/* Generate a single STREAM frame into one outgoing packet.  `size' is the
 * number of payload bytes available, as reported by frame_gen_size().
 *
 * Returns
 *  SWTP_OK     Frame was generated and recorded in the packet.
 *  SWTP_STOP   No packet could be obtained.
 *  SWTP_ERROR  Frame generation or bookkeeping failed.
 */
static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR }
stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size)
{
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
    unsigned stream_header_sz, need_at_least, off;
    lsquic_packet_out_t *packet_out;
    int len, s, hsk;

    stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id,
                                                        stream->tosend_off);
    /* Room for the frame header plus, if there is payload, at least one byte
     * of it:
     */
    need_at_least = stream_header_sz + (size > 0);
    hsk = LSQUIC_STREAM_HANDSHAKE == stream->id;
  get_packet:
    /* The handshake stream (hsk == 1) always gets a brand new packet: */
    packet_out = get_packet[hsk](send_ctl, need_at_least, stream);
    if (!packet_out)
        return SWTP_STOP;
    if (hsk)
        packet_out->po_header_type = stream->tosend_off == 0
                                            ? HETY_INITIAL : HETY_HANDSHAKE;

    /* Remember where in the packet the frame begins: */
    off = packet_out->po_data_sz;
    len = pf->pf_gen_stream_frame(
                packet_out->po_data + packet_out->po_data_sz,
                lsquic_packet_out_avail(packet_out), stream->id,
                stream->tosend_off,
                frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx);
    if (len < 0)
    {
        /* A negative return value is treated as the amount of room needed.
         * Retry with a packet that has that much room, but only if that is
         * more than what we asked for the last time around -- otherwise we
         * would retry forever.
         */
        if (-len > (int) need_at_least)
        {
            LSQ_DEBUG("need more room (%d bytes) than initially calculated "
                "%u bytes, will try again", -len, need_at_least);
            need_at_least = -len;
            goto get_packet;
        }
        else
        {
            LSQ_ERROR("could not generate stream frame");
            return SWTP_ERROR;
        }
    }

    /* The frame is logged using the pre-increment po_data_sz.
     * NOTE(review): this presumably must precede
     * lsquic_send_ctl_incr_pack_sz(), assuming that call advances
     * po_data_sz -- confirm.
     */
    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
                            packet_out->po_data + packet_out->po_data_sz, len);
    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
    if (0 == lsquic_packet_out_avail(packet_out))
        packet_out->po_flags |= PO_STREAM_END;
    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
                                     stream, QUIC_FRAME_STREAM, off, len);
    if (s != 0)
    {
        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
        return SWTP_ERROR;
    }

    check_flush_threshold(stream);

    /* XXX: I don't like it that this is here */
    if (hsk && !(packet_out->po_flags & PO_HELLO))
    {
        lsquic_packet_out_zero_pad(packet_out);
        packet_out->po_flags |= PO_HELLO;
        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
    }

    return SWTP_OK;
}
1625
1626
1627static void
1628abort_connection (struct lsquic_stream *stream)
1629{
1630    if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
1631        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
1632                                                next_service_stream);
1633    stream->stream_flags |= STREAM_ABORT_CONN;
1634    LSQ_WARN("connection will be aborted");
1635    maybe_conn_to_tickable(stream);
1636}
1637
1638
/* Packetize buffered stream data together with data from `reader'.
 *
 * `thresh' selects the mode of operation.  When it is non-zero, STREAM
 * frames are generated only while at least `thresh' bytes are available,
 * and the remainder is saved to the stream buffer.  When it is zero (flush),
 * frames are generated until there is nothing left.  In either mode, a
 * frame is also generated when only FIN remains to be sent.
 *
 * Returns the number of bytes consumed from `reader', or -1 on error.  On
 * frame generation error, the connection is scheduled to be aborted.
 */
static ssize_t
stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
                         size_t thresh)
{
    size_t size;
    ssize_t nw;
    unsigned seen_ok;
    struct frame_gen_ctx fg_ctx = {
        .fgc_stream = stream,
        .fgc_reader = reader,
        .fgc_nread_from_reader = 0,
    };

    seen_ok = 0;
    /* `size' is recalculated on every iteration and is used after the loop */
    while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0)
           || frame_gen_fin(&fg_ctx))
    {
        switch (stream_write_to_packet(&fg_ctx, size))
        {
        case SWTP_OK:
            /* Only do this after the first successfully written frame: */
            if (!seen_ok++)
                maybe_conn_to_tickable_if_writeable(stream, 0);
            if (frame_gen_fin(&fg_ctx))
            {
                stream->stream_flags |= STREAM_FIN_SENT;
                goto end;
            }
            else
                break;  /* Out of the switch: go around the loop again */
        case SWTP_STOP:
            /* Could not get a packet: tell the write events loop to stop */
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            goto end;
        default:
            abort_connection(stream);
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            return -1;
        }
    }

    if (thresh)
    {
        assert(size < thresh);
        assert(size >= stream->sm_n_buffered);
        /* What is left over after subtracting already-buffered bytes is new
         * data from the reader that has to be buffered:
         */
        size -= stream->sm_n_buffered;
        if (size > 0)
        {
            nw = save_to_buffer(stream, reader, size);
            if (nw < 0)
                return -1;
            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
        }
    }
    else
    {
        /* We count flushed data towards both stream and connection limits,
         * so we should have been able to packetize all of it:
         */
        assert(0 == stream->sm_n_buffered);
        assert(size == 0);
    }

    /* Skipped when jumping to `end' (FIN sent or no packet available): */
    maybe_mark_as_blocked(stream);

  end:
    return fg_ctx.fgc_nread_from_reader;
}
1705
1706
1707/* Perform an implicit flush when we hit connection limit while buffering
1708 * data.  This is to prevent a (theoretical) stall:
1709 *
1710 * Imagine a number of streams, all of which buffered some data.  The buffered
1711 * data is up to connection cap, which means no further writes are possible.
1712 * None of them flushes, which means that data is not sent and connection
1713 * WINDOW_UPDATE frame never arrives from peer.  Stall.
1714 */
1715static int
1716maybe_flush_stream (struct lsquic_stream *stream)
1717{
1718    if (stream->sm_n_buffered > 0
1719          && (stream->stream_flags & STREAM_CONN_LIMITED)
1720            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
1721        return stream_flush_nocheck(stream);
1722    else
1723        return 0;
1724}
1725
1726
1727static ssize_t
1728save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
1729                                                                size_t len)
1730{
1731    size_t avail, n_written;
1732
1733    assert(stream->sm_n_buffered + len <= SM_BUF_SIZE);
1734
1735    if (!stream->sm_buf)
1736    {
1737        stream->sm_buf = malloc(SM_BUF_SIZE);
1738        if (!stream->sm_buf)
1739            return -1;
1740    }
1741
1742    avail = lsquic_stream_write_avail(stream);
1743    if (avail < len)
1744        len = avail;
1745
1746    n_written = reader->lsqr_read(reader->lsqr_ctx,
1747                        stream->sm_buf + stream->sm_n_buffered, len);
1748    stream->sm_n_buffered += n_written;
1749    incr_conn_cap(stream, n_written);
1750    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
1751              n_written, stream->sm_n_buffered);
1752    if (0 != maybe_flush_stream(stream))
1753        return -1;
1754    return n_written;
1755}
1756
1757
1758static ssize_t
1759stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
1760{
1761    size_t thresh, len;
1762
1763    thresh = lsquic_stream_flush_threshold(stream);
1764    len = reader->lsqr_size(reader->lsqr_ctx);
1765    if (stream->sm_n_buffered + len <= SM_BUF_SIZE &&
1766                                    stream->sm_n_buffered + len < thresh)
1767        return save_to_buffer(stream, reader, len);
1768    else
1769        return stream_write_to_packets(stream, reader, thresh);
1770}
1771
1772
1773ssize_t
1774lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
1775{
1776    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
1777    return lsquic_stream_writev(stream, &iov, 1);
1778}
1779
1780
/* Reader state for reading out of an array of iovecs.  The offset is a
 * size_t, same as iov_len, so that iovecs of any length can be handled.
 */
struct inner_reader_iovec {
    const struct iovec *iov;            /* Next iovec to read from */
    const struct iovec *end;            /* One past the last iovec */
    size_t              cur_iovec_off;  /* Bytes already consumed from *iov */
};


/* Read callback: copy up to `count' bytes from the iovec array into `buf',
 * advancing the reader state past the bytes that were copied.  Empty iovecs
 * are skipped.
 *
 * Returns the number of bytes copied.  This is smaller than `count' only
 * when the iovecs have run out.
 */
static size_t
inner_reader_iovec_read (void *ctx, void *buf, size_t count)
{
    struct inner_reader_iovec *const iro = ctx;
    unsigned char *p = buf;
    size_t n_left = count;
    size_t n_tocopy;

    while (iro->iov < iro->end && n_left > 0)
    {
        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
        if (n_tocopy > n_left)
            n_tocopy = n_left;
        /* Empty iovecs may have NULL base, which must not go to memcpy(): */
        if (n_tocopy > 0)
        {
            memcpy(p, (const unsigned char *) iro->iov->iov_base
                                        + iro->cur_iovec_off, n_tocopy);
            p += n_tocopy;
            n_left -= n_tocopy;
            iro->cur_iovec_off += n_tocopy;
        }
        if (iro->iov->iov_len == iro->cur_iovec_off)
        {
            /* Current iovec is used up: move on to the next one */
            ++iro->iov;
            iro->cur_iovec_off = 0;
        }
    }

    return count - n_left;
}


/* Size callback: return the number of bytes that remain to be read, which
 * is the sum of lengths of the remaining iovecs minus the part of the
 * current iovec that has already been consumed.
 */
static size_t
inner_reader_iovec_size (void *ctx)
{
    const struct inner_reader_iovec *const iro = ctx;
    const struct iovec *iov;
    size_t size = 0;

    for (iov = iro->iov; iov < iro->end; ++iov)
        size += iov->iov_len;

    return size - iro->cur_iovec_off;
}
1829
1830
1831ssize_t
1832lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
1833                                                                    int iovcnt)
1834{
1835    COMMON_WRITE_CHECKS();
1836    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1837
1838    struct inner_reader_iovec iro = {
1839        .iov = iov,
1840        .end = iov + iovcnt,
1841        .cur_iovec_off = 0,
1842    };
1843    struct lsquic_reader reader = {
1844        .lsqr_read = inner_reader_iovec_read,
1845        .lsqr_size = inner_reader_iovec_size,
1846        .lsqr_ctx  = &iro,
1847    };
1848
1849    return stream_write(stream, &reader);
1850}
1851
1852
1853ssize_t
1854lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
1855{
1856    COMMON_WRITE_CHECKS();
1857    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1858    return stream_write(stream, reader);
1859}
1860
1861
1862int
1863lsquic_stream_send_headers (lsquic_stream_t *stream,
1864                            const lsquic_http_headers_t *headers, int eos)
1865{
1866    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT|
1867                                                     STREAM_U_WRITE_DONE))
1868                == STREAM_USE_HEADERS)
1869    {
1870        int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs,
1871                    stream->id, headers, eos, lsquic_stream_priority(stream));
1872        if (0 == s)
1873        {
1874            SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
1875            stream->stream_flags |= STREAM_HEADERS_SENT;
1876            if (eos)
1877                stream->stream_flags |= STREAM_FIN_SENT;
1878            LSQ_INFO("sent headers for stream %u", stream->id);
1879        }
1880        else
1881            LSQ_WARN("could not send headers: %s", strerror(errno));
1882        return s;
1883    }
1884    else
1885    {
1886        LSQ_INFO("cannot send headers for stream %u in this state", stream->id);
1887        errno = EBADMSG;
1888        return -1;
1889    }
1890}
1891
1892
1893void
1894lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
1895{
1896    if (offset > stream->max_send_off)
1897    {
1898        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
1899        LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to "
1900            "0x%"PRIX64, stream->id, stream->max_send_off, offset);
1901        stream->max_send_off = offset;
1902    }
1903    else
1904        LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old "
1905            "max send offset 0x%"PRIX64", ignoring", stream->id, offset,
1906            stream->max_send_off);
1907}
1908
1909
1910/* This function is used to update offsets after handshake completes and we
1911 * learn of peer's limits from the handshake values.
1912 */
1913int
1914lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset)
1915{
1916    LSQ_DEBUG("setting max_send_off to %u", offset);
1917    if (offset > stream->max_send_off)
1918    {
1919        lsquic_stream_window_update(stream, offset);
1920        return 0;
1921    }
1922    else if (offset < stream->tosend_off)
1923    {
1924        LSQ_INFO("new offset (%u bytes) is smaller than the amount of data "
1925            "already sent on this stream (%"PRIu64" bytes)", offset,
1926            stream->tosend_off);
1927        return -1;
1928    }
1929    else
1930    {
1931        stream->max_send_off = offset;
1932        return 0;
1933    }
1934}
1935
1936
1937void
1938lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code)
1939{
1940    lsquic_stream_reset_ext(stream, error_code, 1);
1941}
1942
1943
1944void
1945lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code,
1946                         int do_close)
1947{
1948    if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT))
1949    {
1950        LSQ_INFO("reset already sent");
1951        return;
1952    }
1953
1954    SM_HISTORY_APPEND(stream, SHE_RESET);
1955
1956    LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code);
1957    stream->error_code = error_code;
1958
1959    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1960        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1961                                                        next_send_stream);
1962    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
1963    stream->stream_flags |= STREAM_SEND_RST;
1964
1965    drop_buffered_data(stream);
1966    maybe_elide_stream_frames(stream);
1967    maybe_schedule_call_on_close(stream);
1968
1969    if (do_close)
1970        lsquic_stream_close(stream);
1971    else
1972        maybe_conn_to_tickable_if_writeable(stream, 1);
1973}
1974
1975
1976unsigned
1977lsquic_stream_id (const lsquic_stream_t *stream)
1978{
1979    return stream->id;
1980}
1981
1982
1983struct lsquic_conn *
1984lsquic_stream_conn (const lsquic_stream_t *stream)
1985{
1986    return stream->conn_pub->lconn;
1987}
1988
1989
1990int
1991lsquic_stream_close (lsquic_stream_t *stream)
1992{
1993    LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id);
1994    SM_HISTORY_APPEND(stream, SHE_CLOSE);
1995    if (lsquic_stream_is_closed(stream))
1996    {
1997        LSQ_INFO("Attempt to close an already-closed stream %u", stream->id);
1998        errno = EBADF;
1999        return -1;
2000    }
2001    stream_shutdown_write(stream);
2002    stream_shutdown_read(stream);
2003    maybe_schedule_call_on_close(stream);
2004    maybe_finish_stream(stream);
2005    maybe_conn_to_tickable_if_writeable(stream, 1);
2006    return 0;
2007}
2008
2009
2010#ifndef NDEBUG
2011#if __GNUC__
2012__attribute__((weak))
2013#endif
2014#endif
2015void
2016lsquic_stream_acked (lsquic_stream_t *stream)
2017{
2018    assert(stream->n_unacked);
2019    --stream->n_unacked;
2020    LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked);
2021    if (0 == stream->n_unacked)
2022        maybe_finish_stream(stream);
2023}
2024
2025
2026void
2027lsquic_stream_push_req (lsquic_stream_t *stream,
2028                        struct uncompressed_headers *push_req)
2029{
2030    assert(!stream->push_req);
2031    stream->push_req = push_req;
2032    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
2033}
2034
2035
2036int
2037lsquic_stream_is_pushed (const lsquic_stream_t *stream)
2038{
2039    return 1 & ~stream->id;
2040}
2041
2042
2043int
2044lsquic_stream_push_info (const lsquic_stream_t *stream,
2045                                        uint32_t *ref_stream_id, void **hset)
2046{
2047    if (lsquic_stream_is_pushed(stream))
2048    {
2049        assert(stream->push_req);
2050        *ref_stream_id = stream->push_req->uh_stream_id;
2051        *hset          = stream->push_req->uh_hset;
2052        return 0;
2053    }
2054    else
2055        return -1;
2056}
2057
2058
2059int
2060lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
2061{
2062    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS)
2063    {
2064        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
2065        LSQ_DEBUG("received uncompressed headers for stream %u", stream->id);
2066        stream->stream_flags |= STREAM_HAVE_UH;
2067        if (uh->uh_flags & UH_FIN)
2068            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
2069        stream->uh = uh;
2070        if (uh->uh_oth_stream_id == 0)
2071        {
2072            if (uh->uh_weight)
2073                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
2074        }
2075        else
2076            LSQ_NOTICE("don't know how to depend on stream %u",
2077                                                        uh->uh_oth_stream_id);
2078        return 0;
2079    }
2080    else
2081    {
2082        LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id);
2083        return -1;
2084    }
2085}
2086
2087
2088unsigned
2089lsquic_stream_priority (const lsquic_stream_t *stream)
2090{
2091    return 256 - stream->sm_priority;
2092}
2093
2094
2095int
2096lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
2097{
2098    /* The user should never get a reference to the special streams,
2099     * but let's check just in case:
2100     */
2101    if (LSQUIC_STREAM_HANDSHAKE == stream->id
2102        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
2103                                LSQUIC_STREAM_HEADERS == stream->id))
2104        return -1;
2105    if (priority < 1 || priority > 256)
2106        return -1;
2107    stream->sm_priority = 256 - priority;
2108    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
2109    LSQ_DEBUG("set priority to %u", priority);
2110    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
2111    return 0;
2112}
2113
2114
2115int
2116lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
2117{
2118    if (0 == lsquic_stream_set_priority_internal(stream, priority))
2119    {
2120        if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) ==
2121                                       (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))
2122        {
2123            /* We need to send headers only if we are a) using HEADERS stream
2124             * and b) we already sent initial headers.  If initial headers
2125             * have not been sent yet, stream priority will be sent in the
2126             * HEADERS frame.
2127             */
2128            return lsquic_headers_stream_send_priority(stream->conn_pub->hs,
2129                                                    stream->id, 0, 0, priority);
2130        }
2131        else
2132            return 0;
2133    }
2134    else
2135        return -1;
2136}
2137
2138
2139lsquic_stream_ctx_t *
2140lsquic_stream_get_ctx (const lsquic_stream_t *stream)
2141{
2142    return stream->st_ctx;
2143}
2144
2145
2146int
2147lsquic_stream_refuse_push (lsquic_stream_t *stream)
2148{
2149    if (lsquic_stream_is_pushed(stream) &&
2150                !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST)))
2151    {
2152        LSQ_DEBUG("refusing pushed stream: send reset");
2153        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
2154        return 0;
2155    }
2156    else
2157        return -1;
2158}
2159
2160
2161size_t
2162lsquic_stream_mem_used (const struct lsquic_stream *stream)
2163{
2164    size_t size;
2165
2166    size = sizeof(stream);
2167    if (stream->sm_buf)
2168        size += SM_BUF_SIZE;
2169    if (stream->data_in)
2170        size += stream->data_in->di_if->di_mem_used(stream->data_in);
2171
2172    return size;
2173}
2174
2175
2176lsquic_cid_t
2177lsquic_stream_cid (const struct lsquic_stream *stream)
2178{
2179    return LSQUIC_LOG_CONN_ID;
2180}
2181
2182
2183void *
2184lsquic_stream_get_hset (struct lsquic_stream *stream)
2185{
2186    void *hset;
2187
2188    if (stream->stream_flags & STREAM_RST_FLAGS)
2189    {
2190        LSQ_INFO("%s: stream is reset, no headers returned", __func__);
2191        errno = ECONNRESET;
2192        return NULL;
2193    }
2194
2195    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
2196                                        != (STREAM_USE_HEADERS|STREAM_HAVE_UH))
2197    {
2198        LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__,
2199                                                        stream->stream_flags);
2200        return NULL;
2201    }
2202
2203    if (!stream->uh)
2204    {
2205        LSQ_INFO("%s: headers unavailable (already fetched?)", __func__);
2206        return NULL;
2207    }
2208
2209    if (stream->uh->uh_flags & UH_H1H)
2210    {
2211        LSQ_INFO("%s: uncompressed headers have internal format", __func__);
2212        return NULL;
2213    }
2214
2215    hset = stream->uh->uh_hset;
2216    stream->uh->uh_hset = NULL;
2217    destroy_uh(stream);
2218    if (stream->stream_flags & STREAM_HEAD_IN_FIN)
2219    {
2220        stream->stream_flags |= STREAM_FIN_REACHED;
2221        SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
2222    }
2223    LSQ_DEBUG("return header set");
2224    return hset;
2225}
2226