lsquic_stream.c revision d539a752
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "lsquic.h"
30
31#include "lsquic_int_types.h"
32#include "lsquic_packet_common.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_malo.h"
35#include "lsquic_conn_flow.h"
36#include "lsquic_rtt.h"
37#include "lsquic_sfcw.h"
38#include "lsquic_stream.h"
39#include "lsquic_conn_public.h"
40#include "lsquic_util.h"
41#include "lsquic_mm.h"
42#include "lsquic_headers_stream.h"
43#include "lsquic_conn.h"
44#include "lsquic_data_in_if.h"
45#include "lsquic_parse.h"
46#include "lsquic_packet_out.h"
47#include "lsquic_engine_public.h"
48#include "lsquic_senhist.h"
49#include "lsquic_pacer.h"
50#include "lsquic_cubic.h"
51#include "lsquic_send_ctl.h"
52#include "lsquic_headers.h"
53#include "lsquic_ev_log.h"
54
55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid
57#define LSQUIC_LOG_STREAM_ID stream->id
58#include "lsquic_logger.h"
59
60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ
61
/* Forward declarations of file-local (static) helpers that are referenced
 * before the point at which they are defined.
 */
static void
drop_frames_in (lsquic_stream_t *stream);

static void
maybe_schedule_call_on_close (lsquic_stream_t *stream);

static int
stream_wantread (lsquic_stream_t *stream, int is_want);

static int
stream_wantwrite (lsquic_stream_t *stream, int is_want);

static ssize_t
stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);

static ssize_t
save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);

static int
stream_flush (lsquic_stream_t *stream);

static int
stream_flush_nocheck (lsquic_stream_t *stream);

static void
maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag);
88
89
90#if LSQUIC_KEEP_STREAM_HISTORY
/* These values are printable ASCII characters for ease of printing the
 * whole history in a single line of a log message.
 *
 * The list of events is not exhaustive: only the most interesting events
 * are recorded.
 */
enum stream_history_event
{
    SHE_EMPTY              =  '\0',     /* Special entry.  No init besides memset required */
    SHE_PLUS               =  '+',      /* Special entry: previous event occurred more than once */
    SHE_REACH_FIN          =  'a',
    SHE_BLOCKED_OUT        =  'b',
    SHE_CREATED            =  'C',
    SHE_FRAME_IN           =  'd',
    SHE_FRAME_OUT          =  'D',
    SHE_RESET              =  'e',
    SHE_WINDOW_UPDATE      =  'E',
    SHE_FIN_IN             =  'f',
    SHE_FINISHED           =  'F',
    SHE_GOAWAY_IN          =  'g',
    SHE_USER_WRITE_HEADER  =  'h',
    SHE_HEADERS_IN         =  'H',
    SHE_ONCLOSE_SCHED      =  'l',
    SHE_ONCLOSE_CALL       =  'L',
    SHE_ONNEW              =  'N',
    SHE_SET_PRIO           =  'p',
    SHE_USER_READ          =  'r',
    SHE_SHUTDOWN_READ      =  'R',
    SHE_RST_IN             =  's',
    SHE_RST_OUT            =  't',
    SHE_FLUSH              =  'u',
    SHE_USER_WRITE_DATA    =  'w',
    SHE_SHUTDOWN_WRITE     =  'W',
    SHE_CLOSE              =  'X',
    SHE_FORCE_FINISH       =  'Z',
};
127
128static void
129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
130{
131    enum stream_history_event prev_event;
132    sm_hist_idx_t idx;
133    int plus;
134
135    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
136    plus = SHE_PLUS == stream->sm_hist_buf[idx];
137    idx = (idx - plus) & SM_HIST_IDX_MASK;
138    prev_event = stream->sm_hist_buf[idx];
139
140    if (prev_event == sh_event && plus)
141        return;
142
143    if (prev_event == sh_event)
144        sh_event = SHE_PLUS;
145    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
146
147    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
148        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
149                                                        stream->sm_hist_buf);
150}
151
152
/* Record a history event for `stream'. */
#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
/* Log the part of the history buffer written since the last wrap-around,
 * if there is any.  Every use of the macro argument is parenthesized so
 * that the macro expands correctly for any pointer expression.
 */
#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
        if ((stream)->sm_hist_idx & SM_HIST_IDX_MASK)                       \
            LSQ_DEBUG("history: [%.*s]",                                    \
                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
                (stream)->sm_hist_buf);                                     \
    } while (0)
160#else
161#   define SM_HISTORY_APPEND(stream, event)
162#   define SM_HISTORY_DUMP_REMAINING(stream)
163#endif
164
165
166static int
167stream_inside_callback (const lsquic_stream_t *stream)
168{
169    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
170}
171
172
173static void
174maybe_conn_to_tickable (lsquic_stream_t *stream)
175{
176    if (!stream_inside_callback(stream))
177        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
178                                           stream->conn_pub->lconn);
179}
180
181
182/* Here, "readable" means that the user is able to read from the stream. */
183static void
184maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
185{
186    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
187    {
188        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
189                                           stream->conn_pub->lconn);
190    }
191}
192
193
194/* Here, "writeable" means that data can be put into packets to be
195 * scheduled to be sent out.
196 *
197 * If `check_can_send' is false, it means that we do not need to check
198 * whether packets can be sent.  This check was already performed when
199 * we packetized stream data.
200 */
201static void
202maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
203                                                    int check_can_send)
204{
205    if (!stream_inside_callback(stream) &&
206            (!check_can_send
207             || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
208          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
209    {
210        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
211                                           stream->conn_pub->lconn);
212    }
213}
214
215
216static int
217stream_stalled (const lsquic_stream_t *stream)
218{
219    return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) &&
220           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
221                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
222}
223
224
225/* TODO: The logic to figure out whether the stream is connection limited
226 * should be taken out of the constructor.  The caller should specify this
227 * via one of enum stream_ctor_flags.
228 */
229lsquic_stream_t *
230lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub,
231                       const struct lsquic_stream_if *stream_if,
232                       void *stream_if_ctx, unsigned initial_window,
233                       unsigned initial_send_off,
234                       enum stream_ctor_flags ctor_flags)
235{
236    lsquic_cfcw_t *cfcw;
237    lsquic_stream_t *stream;
238
239    stream = calloc(1, sizeof(*stream));
240    if (!stream)
241        return NULL;
242
243    stream->stream_if = stream_if;
244    stream->id        = id;
245    stream->conn_pub  = conn_pub;
246    stream->sm_onnew_arg = stream_if_ctx;
247    if (!initial_window)
248        initial_window = 16 * 1024;
249    if (LSQUIC_STREAM_HANDSHAKE == id ||
250        (conn_pub->hs && LSQUIC_STREAM_HEADERS == id))
251        cfcw = NULL;
252    else
253    {
254        cfcw = &conn_pub->cfcw;
255        stream->stream_flags |= STREAM_CONN_LIMITED;
256        if (conn_pub->hs)
257            stream->stream_flags |= STREAM_USE_HEADERS;
258        lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO);
259    }
260    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
261    if (!initial_send_off)
262        initial_send_off = 16 * 1024;
263    stream->max_send_off = initial_send_off;
264    if (ctor_flags & SCF_USE_DI_HASH)
265        stream->data_in = data_in_hash_new(conn_pub, id, 0);
266    else
267        stream->data_in = data_in_nocopy_new(conn_pub, id);
268    LSQ_DEBUG("created stream %u @%p", id, stream);
269    SM_HISTORY_APPEND(stream, SHE_CREATED);
270    if (ctor_flags & SCF_DI_AUTOSWITCH)
271        stream->stream_flags |= STREAM_AUTOSWITCH;
272    if (ctor_flags & SCF_CALL_ON_NEW)
273        lsquic_stream_call_on_new(stream);
274    if (ctor_flags & SCF_DISP_RW_ONCE)
275        stream->stream_flags |= STREAM_RW_ONCE;
276    if (ctor_flags & SCF_CRITICAL)
277        stream->stream_flags |= STREAM_CRITICAL;
278    return stream;
279}
280
281
282void
283lsquic_stream_call_on_new (lsquic_stream_t *stream)
284{
285    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
286    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
287    {
288        LSQ_DEBUG("calling on_new_stream");
289        SM_HISTORY_APPEND(stream, SHE_ONNEW);
290        stream->stream_flags |= STREAM_ONNEW_DONE;
291        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
292                                                          stream);
293    }
294}
295
296
297static void
298decr_conn_cap (struct lsquic_stream *stream, size_t incr)
299{
300    if (stream->stream_flags & STREAM_CONN_LIMITED)
301    {
302        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
303        stream->conn_pub->conn_cap.cc_sent -= incr;
304    }
305}
306
307
308static void
309drop_buffered_data (struct lsquic_stream *stream)
310{
311    decr_conn_cap(stream, stream->sm_n_buffered);
312    stream->sm_n_buffered = 0;
313    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
314        maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS);
315}
316
317
318static void
319destroy_uh (struct lsquic_stream *stream)
320{
321    if (stream->uh)
322    {
323        if (stream->uh->uh_hset)
324            stream->conn_pub->enpub->enp_hsi_if
325                            ->hsi_discard_header_set(stream->uh->uh_hset);
326        free(stream->uh);
327        stream->uh = NULL;
328    }
329}
330
331
332void
333lsquic_stream_destroy (lsquic_stream_t *stream)
334{
335    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
336    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
337                                                            STREAM_ONNEW_DONE)
338    {
339        stream->stream_flags |= STREAM_ONCLOSE_DONE;
340        stream->stream_if->on_close(stream, stream->st_ctx);
341    }
342    if (stream->stream_flags & STREAM_SENDING_FLAGS)
343        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
344    if (stream->stream_flags & STREAM_WANT_READ)
345        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
346    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
347        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
348    if (stream->stream_flags & STREAM_SERVICE_FLAGS)
349        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
350    drop_buffered_data(stream);
351    lsquic_sfcw_consume_rem(&stream->fc);
352    drop_frames_in(stream);
353    if (stream->push_req)
354    {
355        if (stream->push_req->uh_hset)
356            stream->conn_pub->enpub->enp_hsi_if
357                            ->hsi_discard_header_set(stream->push_req->uh_hset);
358        free(stream->push_req);
359    }
360    destroy_uh(stream);
361    free(stream->sm_buf);
362    LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream);
363    SM_HISTORY_DUMP_REMAINING(stream);
364    free(stream);
365}
366
367
368static int
369stream_is_finished (const lsquic_stream_t *stream)
370{
371    return lsquic_stream_is_closed(stream)
372           /* n_unacked checks that no outgoing packets that reference this
373            * stream are outstanding:
374            */
375        && 0 == stream->n_unacked
376           /* This checks that no packets that reference this stream will
377            * become outstanding:
378            */
379        && 0 == (stream->stream_flags & STREAM_SEND_RST)
380        && ((stream->stream_flags & STREAM_FORCE_FINISH)
381          || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)));
382}
383
384
385static void
386maybe_finish_stream (lsquic_stream_t *stream)
387{
388    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
389                                                    stream_is_finished(stream))
390    {
391        LSQ_DEBUG("stream %u is now finished", stream->id);
392        SM_HISTORY_APPEND(stream, SHE_FINISHED);
393        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
394            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
395                                                    next_service_stream);
396        stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED;
397    }
398}
399
400
401static void
402maybe_schedule_call_on_close (lsquic_stream_t *stream)
403{
404    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
405                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE))
406            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE))
407    {
408        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
409            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
410                                                    next_service_stream);
411        stream->stream_flags |= STREAM_CALL_ONCLOSE;
412        LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id);
413        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
414    }
415}
416
417
418void
419lsquic_stream_call_on_close (lsquic_stream_t *stream)
420{
421    assert(stream->stream_flags & STREAM_ONNEW_DONE);
422    stream->stream_flags &= ~STREAM_CALL_ONCLOSE;
423    if (!(stream->stream_flags & STREAM_SERVICE_FLAGS))
424        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
425                                                    next_service_stream);
426    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
427    {
428        LSQ_DEBUG("calling on_close for stream %u", stream->id);
429        stream->stream_flags |= STREAM_ONCLOSE_DONE;
430        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
431        stream->stream_if->on_close(stream, stream->st_ctx);
432    }
433    else
434        assert(0);
435}
436
437
438int
439lsquic_stream_readable (const lsquic_stream_t *stream)
440{
441    /* A stream is readable if one of the following is true: */
442    return
443        /* - It is already finished: in that case, lsquic_stream_read() will
444         *   return 0.
445         */
446            (stream->stream_flags & STREAM_FIN_REACHED)
447        /* - The stream is reset, by either side.  In this case,
448         *   lsquic_stream_read() will return -1 (we want the user to be
449         *   able to collect the error).
450         */
451        ||  (stream->stream_flags & STREAM_RST_FLAGS)
452        /* - Either we are not in HTTP mode or the HTTP headers have been
453         *   received and the headers or data from the stream can be read.
454         */
455        ||  (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
456                                                        == STREAM_USE_HEADERS)
457            && (stream->uh != NULL
458                ||  stream->data_in->di_if->di_get_frame(stream->data_in,
459                                                        stream->read_offset)))
460    ;
461}
462
463
464size_t
465lsquic_stream_write_avail (const struct lsquic_stream *stream)
466{
467    uint64_t stream_avail, conn_avail;
468
469    stream_avail = stream->max_send_off - stream->tosend_off
470                                                - stream->sm_n_buffered;
471    if (stream->stream_flags & STREAM_CONN_LIMITED)
472    {
473        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
474        if (conn_avail < stream_avail)
475            return conn_avail;
476    }
477
478    return stream_avail;
479}
480
481
482int
483lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
484{
485    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
486                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
487    {
488        return -1;
489    }
490    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
491    {
492        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
493            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
494                                                    next_send_stream);
495        stream->stream_flags |= STREAM_SEND_WUF;
496    }
497    return 0;
498}
499
500
501int
502lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
503{
504    uint64_t max_off;
505    int got_next_offset;
506    enum ins_frame ins_frame;
507
508    assert(frame->packet_in);
509
510    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
511    LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; "
512        "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
513
514    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) ==
515                                (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN))
516    {
517        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
518        lsquic_malo_put(frame);
519        return -1;
520    }
521
522    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
523  insert_frame:
524    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
525    if (INS_FRAME_OK == ins_frame)
526    {
527        /* Update maximum offset in the flow controller and check for flow
528         * control violation:
529         */
530        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
531        if (0 != lsquic_stream_update_sfcw(stream, max_off))
532            return -1;
533        if (frame->data_frame.df_fin)
534        {
535            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
536            stream->stream_flags |= STREAM_FIN_RECVD;
537            maybe_finish_stream(stream);
538        }
539        if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
540                (stream->data_in->di_flags & DI_SWITCH_IMPL))
541        {
542            stream->data_in = stream->data_in->di_if->di_switch_impl(
543                                        stream->data_in, stream->read_offset);
544            if (!stream->data_in)
545            {
546                stream->data_in = data_in_error_new();
547                return -1;
548            }
549        }
550        if (got_next_offset)
551            /* Checking the offset saves di_get_frame() call */
552            maybe_conn_to_tickable_if_readable(stream);
553        return 0;
554    }
555    else if (INS_FRAME_DUP == ins_frame)
556    {
557        return 0;
558    }
559    else if (INS_FRAME_OVERLAP == ins_frame)
560    {
561        LSQ_DEBUG("overlap: switching DATA IN implementation");
562        stream->data_in = stream->data_in->di_if->di_switch_impl(
563                                    stream->data_in, stream->read_offset);
564        if (stream->data_in)
565            goto insert_frame;
566        stream->data_in = data_in_error_new();
567        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
568        lsquic_malo_put(frame);
569        return -1;
570    }
571    else
572    {
573        assert(INS_FRAME_ERR == ins_frame);
574        return -1;
575    }
576}
577
578
579static void
580drop_frames_in (lsquic_stream_t *stream)
581{
582    if (stream->data_in)
583    {
584        stream->data_in->di_if->di_destroy(stream->data_in);
585        /* To avoid checking whether `data_in` is set, just set to the error
586         * data-in stream.  It does the right thing after incoming data is
587         * dropped.
588         */
589        stream->data_in = data_in_error_new();
590    }
591}
592
593
594static void
595maybe_elide_stream_frames (struct lsquic_stream *stream)
596{
597    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
598    {
599        if (stream->n_unacked)
600            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
601                                                stream->id);
602        stream->stream_flags |= STREAM_FRAMES_ELIDED;
603    }
604}
605
606
607int
608lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
609                      uint32_t error_code)
610{
611
612    if (stream->stream_flags & STREAM_RST_RECVD)
613    {
614        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
615        return 0;
616    }
617
618    SM_HISTORY_APPEND(stream, SHE_RST_IN);
619    /* This flag must always be set, even if we are "ignoring" it: it is
620     * used by elision code.
621     */
622    stream->stream_flags |= STREAM_RST_RECVD;
623
624    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
625    {
626        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is "
627            "smaller than that of byte following the last byte we have seen: "
628            "0x%"PRIX64, stream->id, offset,
629            lsquic_sfcw_get_max_recv_off(&stream->fc));
630        return -1;
631    }
632
633    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
634    {
635        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64
636            " violates flow control", stream->id, offset);
637        return -1;
638    }
639
640    /* Let user collect error: */
641    maybe_conn_to_tickable_if_readable(stream);
642
643    lsquic_sfcw_consume_rem(&stream->fc);
644    drop_frames_in(stream);
645    drop_buffered_data(stream);
646    maybe_elide_stream_frames(stream);
647
648    if (!(stream->stream_flags &
649                        (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT)))
650        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
651
652    stream->stream_flags |= STREAM_RST_RECVD;
653
654    maybe_finish_stream(stream);
655    maybe_schedule_call_on_close(stream);
656
657    return 0;
658}
659
660
661uint64_t
662lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
663{
664    assert(stream->stream_flags & STREAM_SEND_WUF);
665    stream->stream_flags &= ~STREAM_SEND_WUF;
666    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
667        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
668    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
669}
670
671
672void
673lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
674{
675    assert(stream->stream_flags & STREAM_SEND_BLOCKED);
676    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
677    stream->stream_flags &= ~STREAM_SEND_BLOCKED;
678    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
679        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
680}
681
682
683void
684lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
685{
686    assert(stream->stream_flags & STREAM_SEND_RST);
687    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
688    stream->stream_flags &= ~STREAM_SEND_RST;
689    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
690        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
691    stream->stream_flags |= STREAM_RST_SENT;
692    maybe_finish_stream(stream);
693}
694
695
696static size_t
697read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len)
698{
699    struct http1x_headers *h1h = stream->uh->uh_hset;
700    size_t n_avail = h1h->h1h_size - h1h->h1h_off;
701    if (n_avail < len)
702        len = n_avail;
703    memcpy(dst, h1h->h1h_buf + h1h->h1h_off, len);
704    h1h->h1h_off += len;
705    if (h1h->h1h_off == h1h->h1h_size)
706    {
707        LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id);
708        destroy_uh(stream);
709        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
710        {
711            stream->stream_flags |= STREAM_FIN_REACHED;
712            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
713        }
714    }
715    return len;
716}
717
718
719/* This function returns 0 when EOF is reached.
720 */
721ssize_t
722lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov,
723                     int iovcnt)
724{
725    size_t total_nread, nread;
726    int processed_frames, read_unc_headers, iovidx;
727    unsigned char *p, *end;
728
729    SM_HISTORY_APPEND(stream, SHE_USER_READ);
730
731#define NEXT_IOV() do {                                             \
732    ++iovidx;                                                       \
733    while (iovidx < iovcnt && 0 == iov[iovidx].iov_len)             \
734        ++iovidx;                                                   \
735    if (iovidx < iovcnt)                                            \
736    {                                                               \
737        p = iov[iovidx].iov_base;                                   \
738        end = p + iov[iovidx].iov_len;                              \
739    }                                                               \
740    else                                                            \
741        p = end = NULL;                                             \
742} while (0)
743
744#define AVAIL() (end - p)
745
746    if (stream->stream_flags & STREAM_RST_FLAGS)
747    {
748        errno = ECONNRESET;
749        return -1;
750    }
751    if (stream->stream_flags & STREAM_U_READ_DONE)
752    {
753        errno = EBADF;
754        return -1;
755    }
756    if (stream->stream_flags & STREAM_FIN_REACHED)
757        return 0;
758
759    total_nread = 0;
760    processed_frames = 0;
761
762    iovidx = -1;
763    NEXT_IOV();
764
765    if (stream->uh)
766    {
767        if (stream->uh->uh_flags & UH_H1H)
768        {
769            if (AVAIL())
770            {
771                read_unc_headers = 1;
772                do
773                {
774                    nread = read_uh(stream, p, AVAIL());
775                    p += nread;
776                    total_nread += nread;
777                    if (p == end)
778                        NEXT_IOV();
779                }
780                while (stream->uh && AVAIL());
781            }
782            else
783                read_unc_headers = 0;
784        }
785        else
786        {
787            LSQ_INFO("header set not claimed: cannot read from stream");
788            return -1;
789        }
790    }
791    else
792        read_unc_headers = 0;
793
794    struct data_frame *data_frame;
795    while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset)))
796    {
797        ++processed_frames;
798        size_t navail = data_frame->df_size - data_frame->df_read_off;
799        size_t ntowrite = AVAIL();
800        if (navail < ntowrite)
801            ntowrite = navail;
802        memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite);
803        p += ntowrite;
804        data_frame->df_read_off += ntowrite;
805        stream->read_offset += ntowrite;
806        total_nread += ntowrite;
807        if (data_frame->df_read_off == data_frame->df_size)
808        {
809            const int fin = data_frame->df_fin;
810            stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
811            if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
812                    (stream->data_in->di_flags & DI_SWITCH_IMPL))
813            {
814                stream->data_in = stream->data_in->di_if->di_switch_impl(
815                                            stream->data_in, stream->read_offset);
816                if (!stream->data_in)
817                {
818                    stream->data_in = data_in_error_new();
819                    return -1;
820                }
821            }
822            if (fin)
823            {
824                stream->stream_flags |= STREAM_FIN_REACHED;
825                break;
826            }
827        }
828        if (p == end)
829            NEXT_IOV();
830    }
831
832    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
833                                        total_nread, stream->read_offset);
834
835    if (processed_frames)
836    {
837        lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
838        if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
839        {
840            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
841                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
842            stream->stream_flags |= STREAM_SEND_WUF;
843            maybe_conn_to_tickable_if_writeable(stream, 1);
844        }
845    }
846
847    if (processed_frames || read_unc_headers)
848    {
849        return total_nread;
850    }
851    else
852    {
853        assert(0 == total_nread);
854        errno = EWOULDBLOCK;
855        return -1;
856    }
857}
858
859
860ssize_t
861lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
862{
863    struct iovec iov = { .iov_base = buf, .iov_len = len, };
864    return lsquic_stream_readv(stream, &iov, 1);
865}
866
867
868static void
869stream_shutdown_read (lsquic_stream_t *stream)
870{
871    if (!(stream->stream_flags & STREAM_U_READ_DONE))
872    {
873        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
874        stream->stream_flags |= STREAM_U_READ_DONE;
875        stream_wantread(stream, 0);
876        maybe_finish_stream(stream);
877    }
878}
879
880
881static void
882stream_shutdown_write (lsquic_stream_t *stream)
883{
884    if (stream->stream_flags & STREAM_U_WRITE_DONE)
885        return;
886
887    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
888    stream->stream_flags |= STREAM_U_WRITE_DONE;
889    stream_wantwrite(stream, 0);
890
891    /* Don't bother to check whether there is anything else to write if
892     * the flags indicate that nothing else should be written.
893     */
894    if (!(stream->stream_flags &
895                    (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT)))
896    {
897        if (stream->sm_n_buffered == 0)
898        {
899            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
900                                                 stream))
901            {
902                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
903                stream->stream_flags |= STREAM_FIN_SENT;
904            }
905            else
906            {
907                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
908                          "flag in it");
909                (void) stream_flush_nocheck(stream);
910            }
911        }
912        else
913            (void) stream_flush_nocheck(stream);
914    }
915}
916
917
918int
919lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
920{
921    LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how);
922    if (lsquic_stream_is_closed(stream))
923    {
924        LSQ_INFO("Attempt to shut down a closed stream %u", stream->id);
925        errno = EBADF;
926        return -1;
927    }
928    /* 0: read, 1: write: 2: read and write
929     */
930    if (how < 0 || how > 2)
931    {
932        errno = EINVAL;
933        return -1;
934    }
935
936    if (how)
937        stream_shutdown_write(stream);
938    if (how != 1)
939        stream_shutdown_read(stream);
940
941    maybe_finish_stream(stream);
942    maybe_schedule_call_on_close(stream);
943    if (how)
944        maybe_conn_to_tickable_if_writeable(stream, 1);
945
946    return 0;
947}
948
949
950void
951lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
952{
953    LSQ_DEBUG("internal shutdown of stream %u", stream->id);
954    if (LSQUIC_STREAM_HANDSHAKE == stream->id
955        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
956                                LSQUIC_STREAM_HEADERS == stream->id))
957    {
958        LSQ_DEBUG("add flag to force-finish special stream %u", stream->id);
959        stream->stream_flags |= STREAM_FORCE_FINISH;
960        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
961    }
962    maybe_finish_stream(stream);
963    maybe_schedule_call_on_close(stream);
964}
965
966
967static void
968fake_reset_unused_stream (lsquic_stream_t *stream)
969{
970    stream->stream_flags |=
971        STREAM_RST_RECVD    /* User will pick this up on read or write */
972      | STREAM_RST_SENT     /* Don't send anything else on this stream */
973    ;
974
975    /* Cancel all writes to the network scheduled for this stream: */
976    if (stream->stream_flags & STREAM_SENDING_FLAGS)
977        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
978                                                next_send_stream);
979    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
980
981    LSQ_DEBUG("fake-reset stream %u%s",
982                    stream->id, stream_stalled(stream) ? " (stalled)" : "");
983    maybe_finish_stream(stream);
984    maybe_schedule_call_on_close(stream);
985}
986
987
988/* This function should only be called for locally-initiated streams whose ID
 * is larger than that received in GOAWAY frame.  This may occur when the peer
 * has sent a GOAWAY frame that we had not yet received when we created the
 * stream.
991 * In this situation, we mark the stream as reset, so that user's on_read or
992 * on_write event callback picks up the error.  That, in turn, should result
993 * in stream being closed.
994 *
995 * If we have received any data frames on this stream, this probably indicates
996 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
 * lower than this.  However, we still try to handle it gracefully and perform
 * a shutdown, as if the stream was not reset.
999 */
1000void
1001lsquic_stream_received_goaway (lsquic_stream_t *stream)
1002{
1003    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
1004    if (0 == stream->read_offset &&
1005                            stream->data_in->di_if->di_empty(stream->data_in))
1006        fake_reset_unused_stream(stream);       /* Normal condition */
1007    else
1008    {   /* This is odd, let's handle it the best we can: */
1009        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
1010        lsquic_stream_shutdown_internal(stream);
1011    }
1012}
1013
1014
1015uint64_t
1016lsquic_stream_read_offset (const lsquic_stream_t *stream)
1017{
1018    return stream->read_offset;
1019}
1020
1021
1022static int
1023stream_wantread (lsquic_stream_t *stream, int is_want)
1024{
1025    const int old_val = !!(stream->stream_flags & STREAM_WANT_READ);
1026    const int new_val = !!is_want;
1027    if (old_val != new_val)
1028    {
1029        if (new_val)
1030        {
1031            if (!old_val)
1032                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
1033                                                            next_read_stream);
1034            stream->stream_flags |= STREAM_WANT_READ;
1035        }
1036        else
1037        {
1038            stream->stream_flags &= ~STREAM_WANT_READ;
1039            if (old_val)
1040                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
1041                                                            next_read_stream);
1042        }
1043    }
1044    return old_val;
1045}
1046
1047
1048static void
1049maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1050{
1051    assert(STREAM_WRITE_Q_FLAGS & flag);
1052    if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1053        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1054                                                        next_write_stream);
1055    stream->stream_flags |= flag;
1056}
1057
1058
1059static void
1060maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1061{
1062    assert(STREAM_WRITE_Q_FLAGS & flag);
1063    if (stream->stream_flags & flag)
1064    {
1065        stream->stream_flags &= ~flag;
1066        if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1067            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1068                                                        next_write_stream);
1069    }
1070}
1071
1072
1073static int
1074stream_wantwrite (lsquic_stream_t *stream, int is_want)
1075{
1076    const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE);
1077    const int new_val = !!is_want;
1078    if (old_val != new_val)
1079    {
1080        if (new_val)
1081            maybe_put_onto_write_q(stream, STREAM_WANT_WRITE);
1082        else
1083            maybe_remove_from_write_q(stream, STREAM_WANT_WRITE);
1084    }
1085    return old_val;
1086}
1087
1088
1089int
1090lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1091{
1092    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1093    {
1094        if (is_want)
1095            maybe_conn_to_tickable_if_readable(stream);
1096        return stream_wantread(stream, is_want);
1097    }
1098    else
1099    {
1100        errno = EBADF;
1101        return -1;
1102    }
1103}
1104
1105
1106int
1107lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1108{
1109    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE))
1110    {
1111        if (is_want)
1112            maybe_conn_to_tickable_if_writeable(stream, 1);
1113        return stream_wantwrite(stream, is_want);
1114    }
1115    else
1116    {
1117        errno = EBADF;
1118        return -1;
1119    }
1120}
1121
1122
/* Stream flags that the read and write event dispatch loops sample before
 * and after each user callback: if none of these bits changed across the
 * callback, the flags show no progress for that callback.
 */
#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE|            \
    STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST)
1125
1126
1127static void
1128stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1129{
1130    unsigned no_progress_count, no_progress_limit;
1131    enum stream_flags flags;
1132    uint64_t size;
1133
1134    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1135
1136    no_progress_count = 0;
1137    while ((stream->stream_flags & STREAM_WANT_READ)
1138                                            && lsquic_stream_readable(stream))
1139    {
1140        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1141        size  = stream->read_offset;
1142
1143        stream->stream_if->on_read(stream, stream->st_ctx);
1144
1145        if (no_progress_limit && size == stream->read_offset &&
1146                        flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1147        {
1148            ++no_progress_count;
1149            if (no_progress_count >= no_progress_limit)
1150            {
1151                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1152                    "progress) in user code reading from stream",
1153                    no_progress_count,
1154                    no_progress_count == 1 ? "" : "s");
1155                break;
1156            }
1157        }
1158        else
1159            no_progress_count = 0;
1160    }
1161}
1162
1163
1164static void
1165stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1166{
1167    unsigned no_progress_count, no_progress_limit;
1168    enum stream_flags flags;
1169
1170    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1171
1172    no_progress_count = 0;
1173    stream->stream_flags |= STREAM_LAST_WRITE_OK;
1174    while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK))
1175                                == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)
1176           && lsquic_stream_write_avail(stream))
1177    {
1178        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1179
1180        stream->stream_if->on_write(stream, stream->st_ctx);
1181
1182        if (no_progress_limit &&
1183            flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1184        {
1185            ++no_progress_count;
1186            if (no_progress_count >= no_progress_limit)
1187            {
1188                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1189                    "progress) in user code writing to stream",
1190                    no_progress_count,
1191                    no_progress_count == 1 ? "" : "s");
1192                break;
1193            }
1194        }
1195        else
1196            no_progress_count = 0;
1197    }
1198}
1199
1200
1201static void
1202stream_dispatch_read_events_once (lsquic_stream_t *stream)
1203{
1204    if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream))
1205    {
1206        stream->stream_if->on_read(stream, stream->st_ctx);
1207    }
1208}
1209
1210
1211static void
1212maybe_mark_as_blocked (lsquic_stream_t *stream)
1213{
1214    struct lsquic_conn_cap *cc;
1215
1216    if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered)
1217    {
1218        if (stream->blocked_off < stream->max_send_off)
1219        {
1220            stream->blocked_off = stream->max_send_off + stream->sm_n_buffered;
1221            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1222                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1223                                                            next_send_stream);
1224            stream->stream_flags |= STREAM_SEND_BLOCKED;
1225            LSQ_DEBUG("marked stream-blocked at stream offset "
1226                                            "%"PRIu64, stream->blocked_off);
1227        }
1228        else
1229            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
1230                " has been, or is about to be, sent", stream->blocked_off);
1231    }
1232
1233    if ((stream->stream_flags & STREAM_CONN_LIMITED)
1234        && (cc = &stream->conn_pub->conn_cap,
1235                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
1236    {
1237        if (cc->cc_blocked < cc->cc_max)
1238        {
1239            cc->cc_blocked = cc->cc_max;
1240            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
1241            LSQ_DEBUG("marked connection-blocked at connection offset "
1242                                                    "%"PRIu64, cc->cc_max);
1243        }
1244        else
1245            LSQ_DEBUG("stream has already been marked connection-blocked "
1246                "at offset %"PRIu64, cc->cc_blocked);
1247    }
1248}
1249
1250
1251void
1252lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
1253{
1254    assert(stream->stream_flags & STREAM_WANT_READ);
1255
1256    if (stream->stream_flags & STREAM_RW_ONCE)
1257        stream_dispatch_read_events_once(stream);
1258    else
1259        stream_dispatch_read_events_loop(stream);
1260}
1261
1262
1263void
1264lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
1265{
1266    int progress;
1267    uint64_t tosend_off;
1268    unsigned short n_buffered;
1269    enum stream_flags flags;
1270
1271    assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS);
1272    flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS;
1273    tosend_off = stream->tosend_off;
1274    n_buffered = stream->sm_n_buffered;
1275
1276    if (stream->stream_flags & STREAM_WANT_FLUSH)
1277        (void) stream_flush(stream);
1278
1279    if (stream->stream_flags & STREAM_RW_ONCE)
1280    {
1281        if ((stream->stream_flags & STREAM_WANT_WRITE)
1282            && lsquic_stream_write_avail(stream))
1283        {
1284            stream->stream_if->on_write(stream, stream->st_ctx);
1285        }
1286    }
1287    else
1288        stream_dispatch_write_events_loop(stream);
1289
1290    /* Progress means either flags or offsets changed: */
1291    progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags &&
1292                        stream->tosend_off == tosend_off &&
1293                            stream->sm_n_buffered == n_buffered);
1294
1295    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
1296    {
1297        if (progress)
1298        {   /* Move the stream to the end of the list to ensure fairness. */
1299            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1300                                                            next_write_stream);
1301            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1302                                                            next_write_stream);
1303        }
1304    }
1305}
1306
1307
/* Size callback of the empty reader: there is never anything to read. */
static size_t
inner_reader_empty_size (void *ctx)
{
    (void) ctx;     /* The empty reader has no state */
    return 0;
}
1313
1314
/* Read callback of the empty reader: copies nothing and reports zero bytes
 * regardless of how many were asked for.
 */
static size_t
inner_reader_empty_read (void *ctx, void *buf, size_t count)
{
    (void) ctx;
    (void) buf;
    (void) count;
    return 0;
}
1320
1321
1322static int
1323stream_flush (lsquic_stream_t *stream)
1324{
1325    struct lsquic_reader empty_reader;
1326    ssize_t nw;
1327
1328    assert(stream->stream_flags & STREAM_WANT_FLUSH);
1329    assert(stream->sm_n_buffered > 0 ||
1330        /* Flushing is also used to packetize standalone FIN: */
1331        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
1332                                                    == STREAM_U_WRITE_DONE));
1333
1334    empty_reader.lsqr_size = inner_reader_empty_size;
1335    empty_reader.lsqr_read = inner_reader_empty_read;
1336    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
1337    nw = stream_write_to_packets(stream, &empty_reader, 0);
1338
1339    if (nw >= 0)
1340    {
1341        assert(nw == 0);    /* Empty reader: must have read zero bytes */
1342        return 0;
1343    }
1344    else
1345        return -1;
1346}
1347
1348
1349static int
1350stream_flush_nocheck (lsquic_stream_t *stream)
1351{
1352    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered;
1353    maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH);
1354    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
1355
1356    return stream_flush(stream);
1357}
1358
1359
1360int
1361lsquic_stream_flush (lsquic_stream_t *stream)
1362{
1363    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1364    {
1365        LSQ_DEBUG("cannot flush closed stream");
1366        errno = EBADF;
1367        return -1;
1368    }
1369
1370    if (0 == stream->sm_n_buffered)
1371    {
1372        LSQ_DEBUG("flushing 0 bytes: noop");
1373        return 0;
1374    }
1375
1376    return stream_flush_nocheck(stream);
1377}
1378
1379
1380/* The flush threshold is the maximum size of stream data that can be sent
1381 * in a full packet.
1382 */
1383#ifdef NDEBUG
1384static
1385#endif
1386       size_t
1387lsquic_stream_flush_threshold (const struct lsquic_stream *stream)
1388{
1389    enum packet_out_flags flags;
1390    enum packno_bits bits;
1391    unsigned packet_header_sz, stream_header_sz;
1392    size_t threshold;
1393
1394    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
1395    flags = bits << POBIT_SHIFT;
1396    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
1397        flags |= PO_CONN_ID;
1398    if (LSQUIC_STREAM_HANDSHAKE == stream->id)
1399        flags |= PO_LONGHEAD;
1400
1401    packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags);
1402    stream_header_sz = stream->conn_pub->lconn->cn_pf
1403            ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off);
1404
1405    threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ
1406              - packet_header_sz - stream_header_sz;
1407    return threshold;
1408}
1409
1410
/* Precondition checks for functions that write to a stream.  The macro
 * expects a variable named `stream' to be in scope and must be used inside a
 * function returning an integer type, because it returns -1 from the
 * enclosing function on failure.  The checks, in order, with the errno
 * value each one sets:
 *
 *  EILSEQ      Stream uses HTTP headers, but they have not been sent yet.
 *  ECONNRESET  One of STREAM_RST_FLAGS is set on the stream.
 *  EBADF       User has shut down the write side or FIN has been sent.
 */
#define COMMON_WRITE_CHECKS() do {                                          \
    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))   \
                                                   == STREAM_USE_HEADERS)   \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream before sending HTTP headers"); \
        errno = EILSEQ;                                                     \
        return -1;                                                          \
    }                                                                       \
    if (stream->stream_flags & STREAM_RST_FLAGS)                            \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it had been reset");     \
        errno = ECONNRESET;                                                 \
        return -1;                                                          \
    }                                                                       \
    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it was closed for "      \
                                                                "writing"); \
        errno = EBADF;                                                      \
        return -1;                                                          \
    }                                                                       \
} while (0)
1433
1434
/* Context handed to the frame_gen_*() callbacks while STREAM frames are
 * being generated for a stream.
 */
struct frame_gen_ctx
{
    /* Stream whose data is being packetized */
    lsquic_stream_t      *fgc_stream;
    /* Source of new data, used after the stream's own buffer */
    struct lsquic_reader *fgc_reader;
    /* We keep our own count of how many bytes were read from reader because
     * some readers are external.  The external caller does not have to rely
     * on our count, but it can.
     */
    size_t                fgc_nread_from_reader;
};
1445
1446
1447static size_t
1448frame_gen_size (void *ctx)
1449{
1450    struct frame_gen_ctx *fg_ctx = ctx;
1451    size_t available, remaining;
1452
1453    /* Make sure we are not writing past available size: */
1454    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1455    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
1456    if (available < remaining)
1457        remaining = available;
1458
1459    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
1460}
1461
1462
1463static int
1464frame_gen_fin (void *ctx)
1465{
1466    struct frame_gen_ctx *fg_ctx = ctx;
1467    return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE
1468        && 0 == fg_ctx->fgc_stream->sm_n_buffered
1469        /* Do not use frame_gen_size() as it may chop the real size: */
1470        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1471}
1472
1473
1474static void
1475incr_conn_cap (struct lsquic_stream *stream, size_t incr)
1476{
1477    if (stream->stream_flags & STREAM_CONN_LIMITED)
1478    {
1479        stream->conn_pub->conn_cap.cc_sent += incr;
1480        assert(stream->conn_pub->conn_cap.cc_sent
1481                                    <= stream->conn_pub->conn_cap.cc_max);
1482    }
1483}
1484
1485
/* Read callback passed to the STREAM frame generator: fill `begin_buf' with
 * up to `len' bytes of stream payload.  Bytes already held in the stream
 * buffer go first; the rest comes from the reader, limited by
 * lsquic_stream_write_avail().  The stream's send offset advances by the
 * number of bytes produced, `*fin' is set to the result of frame_gen_fin(),
 * and the number of bytes placed into the buffer is returned.
 */
static size_t
frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
    struct frame_gen_ctx *fg_ctx = ctx;
    unsigned char *p = begin_buf;
    unsigned char *const end = p + len;
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    size_t n_written, available, n_to_write;

    if (stream->sm_n_buffered > 0)
    {
        if (len <= stream->sm_n_buffered)
        {
            /* The stream buffer alone satisfies the request: copy out `len'
             * bytes and shift the remainder to the front of the buffer.
             */
            memcpy(p, stream->sm_buf, len);
            memmove(stream->sm_buf, stream->sm_buf + len,
                                                stream->sm_n_buffered - len);
            stream->sm_n_buffered -= len;
            stream->tosend_off += len;
            *fin = frame_gen_fin(fg_ctx);
            return len;
        }
        /* Drain the whole stream buffer, then continue with the reader: */
        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
        p += stream->sm_n_buffered;
        stream->sm_n_buffered = 0;
    }

    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
    n_to_write = end - p;
    if (n_to_write > available)
        n_to_write = available;
    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
                                              n_to_write);
    p += n_written;
    fg_ctx->fgc_nread_from_reader += n_written;
    *fin = frame_gen_fin(fg_ctx);
    stream->tosend_off += p - (const unsigned char *) begin_buf;
    /* Only bytes taken from the reader are counted here: buffered bytes
     * were counted when save_to_buffer() stored them.
     */
    incr_conn_cap(stream, n_written);
    return p - (const unsigned char *) begin_buf;
}
1525
1526
1527static void
1528check_flush_threshold (lsquic_stream_t *stream)
1529{
1530    if ((stream->stream_flags & STREAM_WANT_FLUSH) &&
1531                            stream->tosend_off >= stream->sm_flush_to)
1532    {
1533        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
1534                                                    stream->sm_flush_to);
1535        maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH);
1536    }
1537}
1538
1539
/* Adapter that gives lsquic_send_ctl_new_packet_out() the same signature as
 * the other entry of the get_packet[] table.  The stream argument is not
 * used.
 */
static struct lsquic_packet_out *
get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least,
                      const struct lsquic_stream *stream)
{
    struct lsquic_packet_out *packet_out;

    (void) stream;
    packet_out = lsquic_send_ctl_new_packet_out(ctl, need_at_least);
    return packet_out;
}
1546
1547
/* Packet allocation functions, indexed by whether the stream is the
 * handshake stream: element 0 is used for all other streams and element 1
 * for the handshake stream.
 */
static struct lsquic_packet_out * (* const get_packet[])(
    struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) =
{
    lsquic_send_ctl_get_packet_for_stream,
    get_brand_new_packet,
};
1554
1555
/* Generate a single STREAM frame and place it into an outgoing packet.
 * `size' is the number of payload bytes available for the frame, as
 * reported by frame_gen_size().
 *
 * Returns:
 *  SWTP_OK     The frame was generated and recorded in the packet.
 *  SWTP_STOP   No packet could be obtained from the send controller.
 *  SWTP_ERROR  Frame generation failed or the frame could not be recorded
 *                in the packet.
 */
static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR }
stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size)
{
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
    unsigned stream_header_sz, need_at_least, off;
    lsquic_packet_out_t *packet_out;
    int len, s, hsk;

    /* Headers have been sent on this stream but are not yet known to have
     * been flushed: deal with the headers stream before writing data.
     */
    if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED))
                                                        == STREAM_HEADERS_SENT
        && lsquic_send_ctl_buffered_and_same_prio_as_headers(send_ctl, stream))
    {
        struct lsquic_stream *const headers_stream
                = lsquic_headers_stream_get_stream(stream->conn_pub->hs);
        if (lsquic_stream_has_data_to_flush(headers_stream))
        {
            LSQ_DEBUG("flushing headers stream before potential write to a "
                                                            "buffered packet");
            (void) lsquic_stream_flush(headers_stream);
        }
        else
            /* Some other stream must have flushed it: this means our headers
             * are flushed.
             */
            stream->stream_flags |= STREAM_HDRS_FLUSHED;
    }

    stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id,
                                                        stream->tosend_off);
    /* Room for the frame header plus one payload byte if there is payload */
    need_at_least = stream_header_sz + (size > 0);
    hsk = LSQUIC_STREAM_HANDSHAKE == stream->id;
  get_packet:
    packet_out = get_packet[hsk](send_ctl, need_at_least, stream);
    if (!packet_out)
        return SWTP_STOP;
    if (hsk)
        packet_out->po_header_type = stream->tosend_off == 0
                                            ? HETY_INITIAL : HETY_HANDSHAKE;

#if LSQUIC_CONN_STATS
    const uint64_t begin_off = stream->tosend_off;
#endif
    /* Remember where in the packet the frame begins: */
    off = packet_out->po_data_sz;
    len = pf->pf_gen_stream_frame(
                packet_out->po_data + packet_out->po_data_sz,
                lsquic_packet_out_avail(packet_out), stream->id,
                stream->tosend_off,
                frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx);
    if (len < 0)
    {
        /* A negative value whose magnitude exceeds our estimate is treated
         * as the amount of room needed: retry with a packet that has it.
         */
        if (-len > (int) need_at_least)
        {
            LSQ_DEBUG("need more room (%d bytes) than initially calculated "
                "%u bytes, will try again", -len, need_at_least);
            need_at_least = -len;
            goto get_packet;
        }
        else
        {
            LSQ_ERROR("could not generate stream frame");
            return SWTP_ERROR;
        }
    }

#if LSQUIC_CONN_STATS
    stream->conn_pub->conn_stats->out.stream_frames += 1;
    stream->conn_pub->conn_stats->out.stream_data_sz
                                            += stream->tosend_off - begin_off;
#endif
    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
                            packet_out->po_data + packet_out->po_data_sz, len);
    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
    if (0 == lsquic_packet_out_avail(packet_out))
        packet_out->po_flags |= PO_STREAM_END;
    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
                                     stream, QUIC_FRAME_STREAM, off, len);
    if (s != 0)
    {
        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
        return SWTP_ERROR;
    }

    check_flush_threshold(stream);

    /* XXX: I don't like it that this is here */
    if (hsk && !(packet_out->po_flags & PO_HELLO))
    {
        lsquic_packet_out_zero_pad(packet_out);
        packet_out->po_flags |= PO_HELLO;
        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
    }

    return SWTP_OK;
}
1653
1654
1655static void
1656abort_connection (struct lsquic_stream *stream)
1657{
1658    if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
1659        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
1660                                                next_service_stream);
1661    stream->stream_flags |= STREAM_ABORT_CONN;
1662    LSQ_WARN("connection will be aborted");
1663    maybe_conn_to_tickable(stream);
1664}
1665
1666
/* Packetize stream data: buffered bytes first, then bytes from `reader'.
 *
 * With a non-zero `thresh', a STREAM frame is generated only while at least
 * `thresh' bytes are available (or FIN is due); a smaller remainder taken
 * from the reader is saved in the stream buffer instead.  With a zero
 * `thresh', frames are generated for as long as any bytes are available.
 *
 * Returns the number of bytes consumed from the reader, or -1 on error.
 */
static ssize_t
stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
                         size_t thresh)
{
    size_t size;
    ssize_t nw;
    unsigned seen_ok;
    struct frame_gen_ctx fg_ctx = {
        .fgc_stream = stream,
        .fgc_reader = reader,
        .fgc_nread_from_reader = 0,
    };

    seen_ok = 0;
    /* `size' is recalculated on each iteration, before the test: */
    while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0)
           || frame_gen_fin(&fg_ctx))
    {
        switch (stream_write_to_packet(&fg_ctx, size))
        {
        case SWTP_OK:
            /* Done once, after the first successful write: */
            if (!seen_ok++)
                maybe_conn_to_tickable_if_writeable(stream, 0);
            if (frame_gen_fin(&fg_ctx))
            {
                stream->stream_flags |= STREAM_FIN_SENT;
                goto end;
            }
            else
                break;
        case SWTP_STOP:
            /* No packet to write to: the write dispatch loop checks this
             * flag before calling on_write() again.
             */
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            goto end;
        default:
            abort_connection(stream);
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            return -1;
        }
    }

    if (thresh)
    {
        assert(size < thresh);
        assert(size >= stream->sm_n_buffered);
        /* What remains after subtracting the buffered bytes is the reader's
         * share: save it in the stream buffer.
         */
        size -= stream->sm_n_buffered;
        if (size > 0)
        {
            nw = save_to_buffer(stream, reader, size);
            if (nw < 0)
                return -1;
            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
        }
    }
    else
    {
        /* We count flushed data towards both stream and connection limits,
         * so we should have been able to packetize all of it:
         */
        assert(0 == stream->sm_n_buffered);
        assert(size == 0);
    }

    maybe_mark_as_blocked(stream);

  end:
    return fg_ctx.fgc_nread_from_reader;
}
1733
1734
1735/* Perform an implicit flush when we hit connection limit while buffering
1736 * data.  This is to prevent a (theoretical) stall:
1737 *
1738 * Imagine a number of streams, all of which buffered some data.  The buffered
1739 * data is up to connection cap, which means no further writes are possible.
1740 * None of them flushes, which means that data is not sent and connection
1741 * WINDOW_UPDATE frame never arrives from peer.  Stall.
1742 */
1743static int
1744maybe_flush_stream (struct lsquic_stream *stream)
1745{
1746    if (stream->sm_n_buffered > 0
1747          && (stream->stream_flags & STREAM_CONN_LIMITED)
1748            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
1749        return stream_flush_nocheck(stream);
1750    else
1751        return 0;
1752}
1753
1754
1755static ssize_t
1756save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
1757                                                                size_t len)
1758{
1759    size_t avail, n_written;
1760
1761    assert(stream->sm_n_buffered + len <= SM_BUF_SIZE);
1762
1763    if (!stream->sm_buf)
1764    {
1765        stream->sm_buf = malloc(SM_BUF_SIZE);
1766        if (!stream->sm_buf)
1767            return -1;
1768    }
1769
1770    avail = lsquic_stream_write_avail(stream);
1771    if (avail < len)
1772        len = avail;
1773
1774    n_written = reader->lsqr_read(reader->lsqr_ctx,
1775                        stream->sm_buf + stream->sm_n_buffered, len);
1776    stream->sm_n_buffered += n_written;
1777    incr_conn_cap(stream, n_written);
1778    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
1779              n_written, stream->sm_n_buffered);
1780    if (0 != maybe_flush_stream(stream))
1781        return -1;
1782    return n_written;
1783}
1784
1785
1786static ssize_t
1787stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
1788{
1789    size_t thresh, len;
1790
1791    thresh = lsquic_stream_flush_threshold(stream);
1792    len = reader->lsqr_size(reader->lsqr_ctx);
1793    if (stream->sm_n_buffered + len <= SM_BUF_SIZE &&
1794                                    stream->sm_n_buffered + len < thresh)
1795        return save_to_buffer(stream, reader, len);
1796    else
1797        return stream_write_to_packets(stream, reader, thresh);
1798}
1799
1800
1801ssize_t
1802lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
1803{
1804    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
1805    return lsquic_stream_writev(stream, &iov, 1);
1806}
1807
1808
/* Reader context used to feed an array of iovecs through the lsquic_reader
 * interface.
 */
struct inner_reader_iovec {
    const struct iovec       *iov;              /* Current iovec */
    const struct iovec       *end;              /* One past the last iovec */
    size_t                    cur_iovec_off;    /* Bytes of the current iovec
                                                 * that have been consumed.
                                                 * Same type as iov_len so
                                                 * that large iovecs are not
                                                 * truncated.
                                                 */
};


/* Copy up to `count' bytes from the remaining iovecs into `buf', advancing
 * the reader position.  Empty iovecs are skipped.  Returns the number of
 * bytes copied, which is smaller than `count' only when the iovecs run out.
 */
static size_t
inner_reader_iovec_read (void *ctx, void *buf, size_t count)
{
    struct inner_reader_iovec *const iro = ctx;
    unsigned char *const begin = buf;
    unsigned char *const end = begin + count;
    unsigned char *p = begin;
    size_t n_tocopy;

    while (iro->iov < iro->end && p < end)
    {
        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
        if (n_tocopy > (size_t) (end - p))
            n_tocopy = end - p;
        /* Do not hand memcpy() the base pointer of an empty iovec: it is
         * allowed to be NULL.
         */
        if (n_tocopy > 0)
            memcpy(p, (unsigned char *) iro->iov->iov_base
                                        + iro->cur_iovec_off, n_tocopy);
        p += n_tocopy;
        iro->cur_iovec_off += n_tocopy;
        if (iro->iov->iov_len == iro->cur_iovec_off)
        {
            ++iro->iov;
            iro->cur_iovec_off = 0;
        }
    }

    /* Stay within the bounds of `buf' when computing the result */
    return p - begin;
}


/* Return the number of bytes that have not been read yet: the sum of the
 * lengths of the remaining iovecs minus the consumed part of the current
 * one.
 */
static size_t
inner_reader_iovec_size (void *ctx)
{
    struct inner_reader_iovec *const iro = ctx;
    const struct iovec *iov;
    size_t size;

    size = 0;
    for (iov = iro->iov; iov < iro->end; ++iov)
        size += iov->iov_len;

    return size - iro->cur_iovec_off;
}
1857
1858
1859ssize_t
1860lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
1861                                                                    int iovcnt)
1862{
1863    COMMON_WRITE_CHECKS();
1864    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1865
1866    struct inner_reader_iovec iro = {
1867        .iov = iov,
1868        .end = iov + iovcnt,
1869        .cur_iovec_off = 0,
1870    };
1871    struct lsquic_reader reader = {
1872        .lsqr_read = inner_reader_iovec_read,
1873        .lsqr_size = inner_reader_iovec_size,
1874        .lsqr_ctx  = &iro,
1875    };
1876
1877    return stream_write(stream, &reader);
1878}
1879
1880
1881ssize_t
1882lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
1883{
1884    COMMON_WRITE_CHECKS();
1885    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1886    return stream_write(stream, reader);
1887}
1888
1889
1890int
1891lsquic_stream_send_headers (lsquic_stream_t *stream,
1892                            const lsquic_http_headers_t *headers, int eos)
1893{
1894    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT|
1895                                                     STREAM_U_WRITE_DONE))
1896                == STREAM_USE_HEADERS)
1897    {
1898        int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs,
1899                    stream->id, headers, eos, lsquic_stream_priority(stream));
1900        if (0 == s)
1901        {
1902            SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
1903            stream->stream_flags |= STREAM_HEADERS_SENT;
1904            if (eos)
1905                stream->stream_flags |= STREAM_FIN_SENT;
1906            LSQ_INFO("sent headers for stream %u", stream->id);
1907        }
1908        else
1909            LSQ_WARN("could not send headers: %s", strerror(errno));
1910        return s;
1911    }
1912    else
1913    {
1914        LSQ_INFO("cannot send headers for stream %u in this state", stream->id);
1915        errno = EBADMSG;
1916        return -1;
1917    }
1918}
1919
1920
1921void
1922lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
1923{
1924    if (offset > stream->max_send_off)
1925    {
1926        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
1927        LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to "
1928            "0x%"PRIX64, stream->id, stream->max_send_off, offset);
1929        stream->max_send_off = offset;
1930    }
1931    else
1932        LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old "
1933            "max send offset 0x%"PRIX64", ignoring", stream->id, offset,
1934            stream->max_send_off);
1935}
1936
1937
1938/* This function is used to update offsets after handshake completes and we
1939 * learn of peer's limits from the handshake values.
1940 */
1941int
1942lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset)
1943{
1944    LSQ_DEBUG("setting max_send_off to %u", offset);
1945    if (offset > stream->max_send_off)
1946    {
1947        lsquic_stream_window_update(stream, offset);
1948        return 0;
1949    }
1950    else if (offset < stream->tosend_off)
1951    {
1952        LSQ_INFO("new offset (%u bytes) is smaller than the amount of data "
1953            "already sent on this stream (%"PRIu64" bytes)", offset,
1954            stream->tosend_off);
1955        return -1;
1956    }
1957    else
1958    {
1959        stream->max_send_off = offset;
1960        return 0;
1961    }
1962}
1963
1964
1965void
1966lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code)
1967{
1968    lsquic_stream_reset_ext(stream, error_code, 1);
1969}
1970
1971
1972void
1973lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code,
1974                         int do_close)
1975{
1976    if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT))
1977    {
1978        LSQ_INFO("reset already sent");
1979        return;
1980    }
1981
1982    SM_HISTORY_APPEND(stream, SHE_RESET);
1983
1984    LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code);
1985    stream->error_code = error_code;
1986
1987    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1988        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1989                                                        next_send_stream);
1990    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
1991    stream->stream_flags |= STREAM_SEND_RST;
1992
1993    drop_buffered_data(stream);
1994    maybe_elide_stream_frames(stream);
1995    maybe_schedule_call_on_close(stream);
1996
1997    if (do_close)
1998        lsquic_stream_close(stream);
1999    else
2000        maybe_conn_to_tickable_if_writeable(stream, 1);
2001}
2002
2003
2004unsigned
2005lsquic_stream_id (const lsquic_stream_t *stream)
2006{
2007    return stream->id;
2008}
2009
2010
2011struct lsquic_conn *
2012lsquic_stream_conn (const lsquic_stream_t *stream)
2013{
2014    return stream->conn_pub->lconn;
2015}
2016
2017
2018int
2019lsquic_stream_close (lsquic_stream_t *stream)
2020{
2021    LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id);
2022    SM_HISTORY_APPEND(stream, SHE_CLOSE);
2023    if (lsquic_stream_is_closed(stream))
2024    {
2025        LSQ_INFO("Attempt to close an already-closed stream %u", stream->id);
2026        errno = EBADF;
2027        return -1;
2028    }
2029    stream_shutdown_write(stream);
2030    stream_shutdown_read(stream);
2031    maybe_schedule_call_on_close(stream);
2032    maybe_finish_stream(stream);
2033    maybe_conn_to_tickable_if_writeable(stream, 1);
2034    return 0;
2035}
2036
2037
2038#ifndef NDEBUG
2039#if __GNUC__
2040__attribute__((weak))
2041#endif
2042#endif
2043void
2044lsquic_stream_acked (lsquic_stream_t *stream)
2045{
2046    assert(stream->n_unacked);
2047    --stream->n_unacked;
2048    LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked);
2049    if (0 == stream->n_unacked)
2050        maybe_finish_stream(stream);
2051}
2052
2053
2054void
2055lsquic_stream_push_req (lsquic_stream_t *stream,
2056                        struct uncompressed_headers *push_req)
2057{
2058    assert(!stream->push_req);
2059    stream->push_req = push_req;
2060    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
2061}
2062
2063
2064int
2065lsquic_stream_is_pushed (const lsquic_stream_t *stream)
2066{
2067    return 1 & ~stream->id;
2068}
2069
2070
2071int
2072lsquic_stream_push_info (const lsquic_stream_t *stream,
2073                                        uint32_t *ref_stream_id, void **hset)
2074{
2075    if (lsquic_stream_is_pushed(stream))
2076    {
2077        assert(stream->push_req);
2078        *ref_stream_id = stream->push_req->uh_stream_id;
2079        *hset          = stream->push_req->uh_hset;
2080        return 0;
2081    }
2082    else
2083        return -1;
2084}
2085
2086
2087int
2088lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
2089{
2090    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS)
2091    {
2092        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
2093        LSQ_DEBUG("received uncompressed headers for stream %u", stream->id);
2094        stream->stream_flags |= STREAM_HAVE_UH;
2095        if (uh->uh_flags & UH_FIN)
2096            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
2097        stream->uh = uh;
2098        if (uh->uh_oth_stream_id == 0)
2099        {
2100            if (uh->uh_weight)
2101                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
2102        }
2103        else
2104            LSQ_NOTICE("don't know how to depend on stream %u",
2105                                                        uh->uh_oth_stream_id);
2106        return 0;
2107    }
2108    else
2109    {
2110        LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id);
2111        return -1;
2112    }
2113}
2114
2115
2116unsigned
2117lsquic_stream_priority (const lsquic_stream_t *stream)
2118{
2119    return 256 - stream->sm_priority;
2120}
2121
2122
2123int
2124lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
2125{
2126    /* The user should never get a reference to the special streams,
2127     * but let's check just in case:
2128     */
2129    if (LSQUIC_STREAM_HANDSHAKE == stream->id
2130        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
2131                                LSQUIC_STREAM_HEADERS == stream->id))
2132        return -1;
2133    if (priority < 1 || priority > 256)
2134        return -1;
2135    stream->sm_priority = 256 - priority;
2136    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
2137    LSQ_DEBUG("set priority to %u", priority);
2138    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
2139    return 0;
2140}
2141
2142
2143int
2144lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
2145{
2146    if (0 == lsquic_stream_set_priority_internal(stream, priority))
2147    {
2148        if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) ==
2149                                       (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))
2150        {
2151            /* We need to send headers only if we are a) using HEADERS stream
2152             * and b) we already sent initial headers.  If initial headers
2153             * have not been sent yet, stream priority will be sent in the
2154             * HEADERS frame.
2155             */
2156            return lsquic_headers_stream_send_priority(stream->conn_pub->hs,
2157                                                    stream->id, 0, 0, priority);
2158        }
2159        else
2160            return 0;
2161    }
2162    else
2163        return -1;
2164}
2165
2166
2167lsquic_stream_ctx_t *
2168lsquic_stream_get_ctx (const lsquic_stream_t *stream)
2169{
2170    return stream->st_ctx;
2171}
2172
2173
2174int
2175lsquic_stream_refuse_push (lsquic_stream_t *stream)
2176{
2177    if (lsquic_stream_is_pushed(stream) &&
2178                !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST)))
2179    {
2180        LSQ_DEBUG("refusing pushed stream: send reset");
2181        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
2182        return 0;
2183    }
2184    else
2185        return -1;
2186}
2187
2188
2189size_t
2190lsquic_stream_mem_used (const struct lsquic_stream *stream)
2191{
2192    size_t size;
2193
2194    size = sizeof(stream);
2195    if (stream->sm_buf)
2196        size += SM_BUF_SIZE;
2197    if (stream->data_in)
2198        size += stream->data_in->di_if->di_mem_used(stream->data_in);
2199
2200    return size;
2201}
2202
2203
2204lsquic_cid_t
2205lsquic_stream_cid (const struct lsquic_stream *stream)
2206{
2207    return LSQUIC_LOG_CONN_ID;
2208}
2209
2210
2211void *
2212lsquic_stream_get_hset (struct lsquic_stream *stream)
2213{
2214    void *hset;
2215
2216    if (stream->stream_flags & STREAM_RST_FLAGS)
2217    {
2218        LSQ_INFO("%s: stream is reset, no headers returned", __func__);
2219        errno = ECONNRESET;
2220        return NULL;
2221    }
2222
2223    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
2224                                        != (STREAM_USE_HEADERS|STREAM_HAVE_UH))
2225    {
2226        LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__,
2227                                                        stream->stream_flags);
2228        return NULL;
2229    }
2230
2231    if (!stream->uh)
2232    {
2233        LSQ_INFO("%s: headers unavailable (already fetched?)", __func__);
2234        return NULL;
2235    }
2236
2237    if (stream->uh->uh_flags & UH_H1H)
2238    {
2239        LSQ_INFO("%s: uncompressed headers have internal format", __func__);
2240        return NULL;
2241    }
2242
2243    hset = stream->uh->uh_hset;
2244    stream->uh->uh_hset = NULL;
2245    destroy_uh(stream);
2246    if (stream->stream_flags & STREAM_HEAD_IN_FIN)
2247    {
2248        stream->stream_flags |= STREAM_FIN_REACHED;
2249        SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
2250    }
2251    LSQ_DEBUG("return header set");
2252    return hset;
2253}
2254