lsquic_stream.c revision 483646eb
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc.  See LICENSE. */
/*
 * lsquic_stream.c -- stream processing
 *
 * To clear up terminology, here are some of our stream states (in order).
 * They are not codified, but they are referred to in both code and comments.
 *
 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
 *                point, on_close() gets called.
 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
 *                finished (freed): it gets put onto the `service_streams'
 *                list for connection to clean it up.
 *  DESTROYED   All remaining memory associated with the stream is released.
 *                If on_close() has not been called yet, it is called now.
 *                The stream pointer is now invalid.
 *
 * When connection is aborted, a stream may go directly to DESTROYED state.
 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "lsquic.h"
30
31#include "lsquic_int_types.h"
32#include "lsquic_packet_common.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_malo.h"
35#include "lsquic_conn_flow.h"
36#include "lsquic_rtt.h"
37#include "lsquic_sfcw.h"
38#include "lsquic_stream.h"
39#include "lsquic_conn_public.h"
40#include "lsquic_util.h"
41#include "lsquic_mm.h"
42#include "lsquic_headers_stream.h"
43#include "lsquic_frame_reader.h"
44#include "lsquic_conn.h"
45#include "lsquic_data_in_if.h"
46#include "lsquic_parse.h"
47#include "lsquic_packet_out.h"
48#include "lsquic_engine_public.h"
49#include "lsquic_senhist.h"
50#include "lsquic_pacer.h"
51#include "lsquic_cubic.h"
52#include "lsquic_send_ctl.h"
53#include "lsquic_ev_log.h"
54
55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid
57#define LSQUIC_LOG_STREAM_ID stream->id
58#include "lsquic_logger.h"
59
60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ
61
62static void
63drop_frames_in (lsquic_stream_t *stream);
64
65static void
66maybe_schedule_call_on_close (lsquic_stream_t *stream);
67
68static int
69stream_wantread (lsquic_stream_t *stream, int is_want);
70
71static int
72stream_wantwrite (lsquic_stream_t *stream, int is_want);
73
74static ssize_t
75stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);
76
77static ssize_t
78save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);
79
80static int
81stream_flush (lsquic_stream_t *stream);
82
83static int
84stream_flush_nocheck (lsquic_stream_t *stream);
85
86static void
87maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag);
88
89
90#if LSQUIC_KEEP_STREAM_HISTORY
/* Stream history events.  Each value is a printable ASCII character, which
 * makes it easy to print the whole history in a single line of a log
 * message.
 *
 * The list of events is not exhaustive: only the most interesting events
 * are recorded.
 */
enum stream_history_event
{
    /* Special entries: */
    SHE_EMPTY               = '\0', /* No init besides memset required */
    SHE_PLUS                = '+',  /* Previous event occurred more than once */

    /* Regular events: */
    SHE_REACH_FIN           = 'a',
    SHE_BLOCKED_OUT         = 'b',
    SHE_CREATED             = 'C',
    SHE_FRAME_IN            = 'd',
    SHE_FRAME_OUT           = 'D',
    SHE_RESET               = 'e',
    SHE_WINDOW_UPDATE       = 'E',
    SHE_FIN_IN              = 'f',
    SHE_FINISHED            = 'F',
    SHE_GOAWAY_IN           = 'g',
    SHE_USER_WRITE_HEADER   = 'h',
    SHE_HEADERS_IN          = 'H',
    SHE_ONCLOSE_SCHED       = 'l',
    SHE_ONCLOSE_CALL        = 'L',
    SHE_ONNEW               = 'N',
    SHE_SET_PRIO            = 'p',
    SHE_USER_READ           = 'r',
    SHE_SHUTDOWN_READ       = 'R',
    SHE_RST_IN              = 's',
    SHE_RST_OUT             = 't',
    SHE_FLUSH               = 'u',
    SHE_USER_WRITE_DATA     = 'w',
    SHE_SHUTDOWN_WRITE      = 'W',
    SHE_CLOSE               = 'X',
    SHE_FORCE_FINISH        = 'Z',
};
127
128static void
129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
130{
131    enum stream_history_event prev_event;
132    sm_hist_idx_t idx;
133    int plus;
134
135    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
136    plus = SHE_PLUS == stream->sm_hist_buf[idx];
137    idx = (idx - plus) & SM_HIST_IDX_MASK;
138    prev_event = stream->sm_hist_buf[idx];
139
140    if (prev_event == sh_event && plus)
141        return;
142
143    if (prev_event == sh_event)
144        sh_event = SHE_PLUS;
145    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
146
147    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
148        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
149                                                        stream->sm_hist_buf);
150}
151
152
153#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
154#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
155        if (stream->sm_hist_idx & SM_HIST_IDX_MASK)                         \
156            LSQ_DEBUG("history: [%.*s]",                                    \
157                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
158                (stream)->sm_hist_buf);                                     \
159    } while (0)
160#else
161#   define SM_HISTORY_APPEND(stream, event)
162#   define SM_HISTORY_DUMP_REMAINING(stream)
163#endif
164
165
166static int
167stream_inside_callback (const lsquic_stream_t *stream)
168{
169    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
170}
171
172
173static void
174maybe_conn_to_tickable (lsquic_stream_t *stream)
175{
176    if (!stream_inside_callback(stream))
177        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
178                                           stream->conn_pub->lconn);
179}
180
181
182/* Here, "readable" means that the user is able to read from the stream. */
183static void
184maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
185{
186    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
187    {
188        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
189                                           stream->conn_pub->lconn);
190    }
191}
192
193
194/* Here, "writeable" means that data can be put into packets to be
195 * scheduled to be sent out.
196 *
197 * If `check_can_send' is false, it means that we do not need to check
198 * whether packets can be sent.  This check was already performed when
199 * we packetized stream data.
200 */
201static void
202maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
203                                                    int check_can_send)
204{
205    if (!stream_inside_callback(stream) &&
206            (!check_can_send
207             || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
208          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
209    {
210        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
211                                           stream->conn_pub->lconn);
212    }
213}
214
215
216static int
217stream_stalled (const lsquic_stream_t *stream)
218{
219    return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) &&
220           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
221                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
222}
223
224
225/* TODO: The logic to figure out whether the stream is connection limited
226 * should be taken out of the constructor.  The caller should specify this
227 * via one of enum stream_ctor_flags.
228 */
229lsquic_stream_t *
230lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub,
231                       const struct lsquic_stream_if *stream_if,
232                       void *stream_if_ctx, unsigned initial_window,
233                       unsigned initial_send_off,
234                       enum stream_ctor_flags ctor_flags)
235{
236    lsquic_cfcw_t *cfcw;
237    lsquic_stream_t *stream;
238
239    stream = calloc(1, sizeof(*stream));
240    if (!stream)
241        return NULL;
242
243    stream->stream_if = stream_if;
244    stream->id        = id;
245    stream->conn_pub  = conn_pub;
246    stream->sm_onnew_arg = stream_if_ctx;
247    if (!initial_window)
248        initial_window = 16 * 1024;
249    if (LSQUIC_STREAM_HANDSHAKE == id ||
250        (conn_pub->hs && LSQUIC_STREAM_HEADERS == id))
251        cfcw = NULL;
252    else
253    {
254        cfcw = &conn_pub->cfcw;
255        stream->stream_flags |= STREAM_CONN_LIMITED;
256        if (conn_pub->hs)
257            stream->stream_flags |= STREAM_USE_HEADERS;
258        lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO);
259    }
260    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
261    if (!initial_send_off)
262        initial_send_off = 16 * 1024;
263    stream->max_send_off = initial_send_off;
264    if (ctor_flags & SCF_USE_DI_HASH)
265        stream->data_in = data_in_hash_new(conn_pub, id, 0);
266    else
267        stream->data_in = data_in_nocopy_new(conn_pub, id);
268    LSQ_DEBUG("created stream %u @%p", id, stream);
269    SM_HISTORY_APPEND(stream, SHE_CREATED);
270    if (ctor_flags & SCF_DI_AUTOSWITCH)
271        stream->stream_flags |= STREAM_AUTOSWITCH;
272    if (ctor_flags & SCF_CALL_ON_NEW)
273        lsquic_stream_call_on_new(stream);
274    if (ctor_flags & SCF_DISP_RW_ONCE)
275        stream->stream_flags |= STREAM_RW_ONCE;
276    if (ctor_flags & SCF_ALLOW_OVERLAP)
277        stream->stream_flags |= STREAM_ALLOW_OVERLAP;
278    return stream;
279}
280
281
282void
283lsquic_stream_call_on_new (lsquic_stream_t *stream)
284{
285    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
286    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
287    {
288        LSQ_DEBUG("calling on_new_stream");
289        SM_HISTORY_APPEND(stream, SHE_ONNEW);
290        stream->stream_flags |= STREAM_ONNEW_DONE;
291        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
292                                                          stream);
293    }
294}
295
296
297static void
298decr_conn_cap (struct lsquic_stream *stream, size_t incr)
299{
300    if (stream->stream_flags & STREAM_CONN_LIMITED)
301    {
302        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
303        stream->conn_pub->conn_cap.cc_sent -= incr;
304    }
305}
306
307
308static void
309drop_buffered_data (struct lsquic_stream *stream)
310{
311    decr_conn_cap(stream, stream->sm_n_buffered);
312    stream->sm_n_buffered = 0;
313    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
314        maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS);
315}
316
317
318void
319lsquic_stream_destroy (lsquic_stream_t *stream)
320{
321    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
322    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
323                                                            STREAM_ONNEW_DONE)
324    {
325        stream->stream_flags |= STREAM_ONCLOSE_DONE;
326        stream->stream_if->on_close(stream, stream->st_ctx);
327    }
328    if (stream->stream_flags & STREAM_SENDING_FLAGS)
329        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
330    if (stream->stream_flags & STREAM_WANT_READ)
331        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
332    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
333        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
334    if (stream->stream_flags & STREAM_SERVICE_FLAGS)
335        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
336    drop_buffered_data(stream);
337    lsquic_sfcw_consume_rem(&stream->fc);
338    drop_frames_in(stream);
339    free(stream->push_req);
340    free(stream->uh);
341    free(stream->sm_buf);
342    LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream);
343    SM_HISTORY_DUMP_REMAINING(stream);
344    free(stream);
345}
346
347
348static int
349stream_is_finished (const lsquic_stream_t *stream)
350{
351    return lsquic_stream_is_closed(stream)
352           /* n_unacked checks that no outgoing packets that reference this
353            * stream are outstanding:
354            */
355        && 0 == stream->n_unacked
356           /* This checks that no packets that reference this stream will
357            * become outstanding:
358            */
359        && 0 == (stream->stream_flags & STREAM_SEND_RST)
360        && ((stream->stream_flags & STREAM_FORCE_FINISH)
361          || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))
362           && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD))));
363}
364
365
366static void
367maybe_finish_stream (lsquic_stream_t *stream)
368{
369    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
370                                                    stream_is_finished(stream))
371    {
372        LSQ_DEBUG("stream %u is now finished", stream->id);
373        SM_HISTORY_APPEND(stream, SHE_FINISHED);
374        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
375            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
376                                                    next_service_stream);
377        stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED;
378    }
379}
380
381
382static void
383maybe_schedule_call_on_close (lsquic_stream_t *stream)
384{
385    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
386                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE))
387            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE))
388    {
389        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
390            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
391                                                    next_service_stream);
392        stream->stream_flags |= STREAM_CALL_ONCLOSE;
393        LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id);
394        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
395    }
396}
397
398
399void
400lsquic_stream_call_on_close (lsquic_stream_t *stream)
401{
402    assert(stream->stream_flags & STREAM_ONNEW_DONE);
403    stream->stream_flags &= ~STREAM_CALL_ONCLOSE;
404    if (!(stream->stream_flags & STREAM_SERVICE_FLAGS))
405        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
406                                                    next_service_stream);
407    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
408    {
409        LSQ_DEBUG("calling on_close for stream %u", stream->id);
410        stream->stream_flags |= STREAM_ONCLOSE_DONE;
411        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
412        stream->stream_if->on_close(stream, stream->st_ctx);
413    }
414    else
415        assert(0);
416}
417
418
419int
420lsquic_stream_readable (const lsquic_stream_t *stream)
421{
422    /* A stream is readable if one of the following is true: */
423    return
424        /* - It is already finished: in that case, lsquic_stream_read() will
425         *   return 0.
426         */
427            (stream->stream_flags & STREAM_FIN_REACHED)
428        /* - The stream is reset, by either side.  In this case,
429         *   lsquic_stream_read() will return -1 (we want the user to be
430         *   able to collect the error).
431         */
432        ||  (stream->stream_flags & STREAM_RST_FLAGS)
433        /* - Either we are not in HTTP mode or the HTTP headers have been
434         *   received and the headers or data from the stream can be read.
435         */
436        ||  (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
437                                                        == STREAM_USE_HEADERS)
438            && (stream->uh != NULL
439                ||  stream->data_in->di_if->di_get_frame(stream->data_in,
440                                                        stream->read_offset)))
441    ;
442}
443
444
445size_t
446lsquic_stream_write_avail (const struct lsquic_stream *stream)
447{
448    uint64_t stream_avail, conn_avail;
449
450    stream_avail = stream->max_send_off - stream->tosend_off
451                                                - stream->sm_n_buffered;
452    if (stream->stream_flags & STREAM_CONN_LIMITED)
453    {
454        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
455        if (conn_avail < stream_avail)
456            return conn_avail;
457    }
458
459    return stream_avail;
460}
461
462
463int
464lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
465{
466    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
467                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
468    {
469        return -1;
470    }
471    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
472    {
473        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
474            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
475                                                    next_send_stream);
476        stream->stream_flags |= STREAM_SEND_WUF;
477    }
478    return 0;
479}
480
481
482int
483lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
484{
485    uint64_t max_off;
486    int got_next_offset;
487    enum ins_frame ins_frame;
488
489    assert(frame->packet_in);
490
491    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
492    LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; "
493        "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
494
495    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) ==
496                                (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN))
497    {
498        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
499        lsquic_malo_put(frame);
500        return -1;
501    }
502
503    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
504  insert_frame:
505    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
506    if (INS_FRAME_OK == ins_frame)
507    {
508        /* Update maximum offset in the flow controller and check for flow
509         * control violation:
510         */
511        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
512        if (0 != lsquic_stream_update_sfcw(stream, max_off))
513            return -1;
514        if (frame->data_frame.df_fin)
515        {
516            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
517            stream->stream_flags |= STREAM_FIN_RECVD;
518            maybe_finish_stream(stream);
519        }
520        if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
521                (stream->data_in->di_flags & DI_SWITCH_IMPL))
522        {
523            stream->data_in = stream->data_in->di_if->di_switch_impl(
524                                        stream->data_in, stream->read_offset);
525            if (!stream->data_in)
526            {
527                stream->data_in = data_in_error_new();
528                return -1;
529            }
530        }
531        if (got_next_offset)
532            /* Checking the offset saves di_get_frame() call */
533            maybe_conn_to_tickable_if_readable(stream);
534        return 0;
535    }
536    else if (INS_FRAME_DUP == ins_frame)
537    {
538        return 0;
539    }
540    else if (INS_FRAME_OVERLAP == ins_frame)
541    {
542        if (stream->stream_flags & STREAM_ALLOW_OVERLAP)
543        {
544            LSQ_DEBUG("overlap: switching DATA IN implementation");
545            stream->data_in = stream->data_in->di_if->di_switch_impl(
546                                        stream->data_in, stream->read_offset);
547            if (stream->data_in)
548                goto insert_frame;
549            stream->data_in = data_in_error_new();
550        }
551        else
552            LSQ_DEBUG("overlap not supported");
553        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
554        lsquic_malo_put(frame);
555        return -1;
556    }
557    else
558    {
559        assert(INS_FRAME_ERR == ins_frame);
560        return -1;
561    }
562}
563
564
565static void
566drop_frames_in (lsquic_stream_t *stream)
567{
568    if (stream->data_in)
569    {
570        stream->data_in->di_if->di_destroy(stream->data_in);
571        /* To avoid checking whether `data_in` is set, just set to the error
572         * data-in stream.  It does the right thing after incoming data is
573         * dropped.
574         */
575        stream->data_in = data_in_error_new();
576    }
577}
578
579
580static void
581maybe_elide_stream_frames (struct lsquic_stream *stream)
582{
583    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
584    {
585        if (stream->n_unacked)
586            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
587                                                stream->id);
588        stream->stream_flags |= STREAM_FRAMES_ELIDED;
589    }
590}
591
592
593int
594lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
595                      uint32_t error_code)
596{
597
598    if (stream->stream_flags & STREAM_RST_RECVD)
599    {
600        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
601        return 0;
602    }
603
604    SM_HISTORY_APPEND(stream, SHE_RST_IN);
605    /* This flag must always be set, even if we are "ignoring" it: it is
606     * used by elision code.
607     */
608    stream->stream_flags |= STREAM_RST_RECVD;
609
610    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
611    {
612        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is "
613            "smaller than that of byte following the last byte we have seen: "
614            "0x%"PRIX64, stream->id, offset,
615            lsquic_sfcw_get_max_recv_off(&stream->fc));
616        return -1;
617    }
618
619    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
620    {
621        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64
622            " violates flow control", stream->id, offset);
623        return -1;
624    }
625
626    /* Let user collect error: */
627    maybe_conn_to_tickable_if_readable(stream);
628
629    lsquic_sfcw_consume_rem(&stream->fc);
630    drop_frames_in(stream);
631    drop_buffered_data(stream);
632    maybe_elide_stream_frames(stream);
633
634    if (!(stream->stream_flags &
635                        (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT)))
636        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
637
638    stream->stream_flags |= STREAM_RST_RECVD;
639
640    maybe_finish_stream(stream);
641    maybe_schedule_call_on_close(stream);
642
643    return 0;
644}
645
646
647uint64_t
648lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
649{
650    assert(stream->stream_flags & STREAM_SEND_WUF);
651    stream->stream_flags &= ~STREAM_SEND_WUF;
652    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
653        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
654    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
655}
656
657
658void
659lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
660{
661    assert(stream->stream_flags & STREAM_SEND_BLOCKED);
662    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
663    stream->stream_flags &= ~STREAM_SEND_BLOCKED;
664    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
665        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
666}
667
668
669void
670lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
671{
672    assert(stream->stream_flags & STREAM_SEND_RST);
673    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
674    stream->stream_flags &= ~STREAM_SEND_RST;
675    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
676        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
677    stream->stream_flags |= STREAM_RST_SENT;
678    maybe_finish_stream(stream);
679}
680
681
682static size_t
683read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len)
684{
685    struct uncompressed_headers *uh = stream->uh;
686    size_t n_avail = uh->uh_size - uh->uh_off;
687    if (n_avail < len)
688        len = n_avail;
689    memcpy(dst, uh->uh_headers + uh->uh_off, len);
690    uh->uh_off += len;
691    if (uh->uh_off == uh->uh_size)
692    {
693        LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id);
694        free(uh);
695        stream->uh = NULL;
696        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
697        {
698            stream->stream_flags |= STREAM_FIN_REACHED;
699            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
700        }
701    }
702    return len;
703}
704
705
706/* This function returns 0 when EOF is reached.
707 */
708ssize_t
709lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov,
710                     int iovcnt)
711{
712    size_t total_nread, nread;
713    int processed_frames, read_unc_headers, iovidx;
714    unsigned char *p, *end;
715
716    SM_HISTORY_APPEND(stream, SHE_USER_READ);
717
718#define NEXT_IOV() do {                                             \
719    ++iovidx;                                                       \
720    while (iovidx < iovcnt && 0 == iov[iovidx].iov_len)             \
721        ++iovidx;                                                   \
722    if (iovidx < iovcnt)                                            \
723    {                                                               \
724        p = iov[iovidx].iov_base;                                   \
725        end = p + iov[iovidx].iov_len;                              \
726    }                                                               \
727    else                                                            \
728        p = end = NULL;                                             \
729} while (0)
730
731#define AVAIL() (end - p)
732
733    if (stream->stream_flags & STREAM_RST_FLAGS)
734    {
735        errno = ECONNRESET;
736        return -1;
737    }
738    if (stream->stream_flags & STREAM_U_READ_DONE)
739    {
740        errno = EBADF;
741        return -1;
742    }
743    if (stream->stream_flags & STREAM_FIN_REACHED)
744        return 0;
745
746    total_nread = 0;
747    processed_frames = 0;
748
749    iovidx = -1;
750    NEXT_IOV();
751
752    if (stream->uh && AVAIL())
753    {
754        read_unc_headers = 1;
755        do
756        {
757            nread = read_uh(stream, p, AVAIL());
758            p += nread;
759            total_nread += nread;
760            if (p == end)
761                NEXT_IOV();
762        }
763        while (stream->uh && AVAIL());
764    }
765    else
766        read_unc_headers = 0;
767
768    struct data_frame *data_frame;
769    while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset)))
770    {
771        ++processed_frames;
772        size_t navail = data_frame->df_size - data_frame->df_read_off;
773        size_t ntowrite = AVAIL();
774        if (navail < ntowrite)
775            ntowrite = navail;
776        memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite);
777        p += ntowrite;
778        data_frame->df_read_off += ntowrite;
779        stream->read_offset += ntowrite;
780        total_nread += ntowrite;
781        if (data_frame->df_read_off == data_frame->df_size)
782        {
783            const int fin = data_frame->df_fin;
784            stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
785            if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
786                    (stream->data_in->di_flags & DI_SWITCH_IMPL))
787            {
788                stream->data_in = stream->data_in->di_if->di_switch_impl(
789                                            stream->data_in, stream->read_offset);
790                if (!stream->data_in)
791                {
792                    stream->data_in = data_in_error_new();
793                    return -1;
794                }
795            }
796            if (fin)
797            {
798                stream->stream_flags |= STREAM_FIN_REACHED;
799                break;
800            }
801        }
802        if (p == end)
803            NEXT_IOV();
804    }
805
806    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
807                                        total_nread, stream->read_offset);
808
809    if (processed_frames)
810    {
811        lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
812        if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
813        {
814            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
815                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
816            stream->stream_flags |= STREAM_SEND_WUF;
817            maybe_conn_to_tickable_if_writeable(stream, 1);
818        }
819    }
820
821    if (processed_frames || read_unc_headers)
822    {
823        return total_nread;
824    }
825    else
826    {
827        assert(0 == total_nread);
828        errno = EWOULDBLOCK;
829        return -1;
830    }
831}
832
833
834ssize_t
835lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
836{
837    struct iovec iov = { .iov_base = buf, .iov_len = len, };
838    return lsquic_stream_readv(stream, &iov, 1);
839}
840
841
842static void
843stream_shutdown_read (lsquic_stream_t *stream)
844{
845    if (!(stream->stream_flags & STREAM_U_READ_DONE))
846    {
847        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
848        stream->stream_flags |= STREAM_U_READ_DONE;
849        stream_wantread(stream, 0);
850        maybe_finish_stream(stream);
851    }
852}
853
854
855static void
856stream_shutdown_write (lsquic_stream_t *stream)
857{
858    if (stream->stream_flags & STREAM_U_WRITE_DONE)
859        return;
860
861    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
862    stream->stream_flags |= STREAM_U_WRITE_DONE;
863    stream_wantwrite(stream, 0);
864
865    /* Don't bother to check whether there is anything else to write if
866     * the flags indicate that nothing else should be written.
867     */
868    if (!(stream->stream_flags &
869                    (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT)))
870    {
871        if (stream->sm_n_buffered == 0)
872        {
873            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
874                                                 stream))
875            {
876                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
877                stream->stream_flags |= STREAM_FIN_SENT;
878            }
879            else
880            {
881                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
882                          "flag in it");
883                (void) stream_flush_nocheck(stream);
884            }
885        }
886        else
887            (void) stream_flush_nocheck(stream);
888    }
889}
890
891
892int
893lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
894{
895    LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how);
896    if (lsquic_stream_is_closed(stream))
897    {
898        LSQ_INFO("Attempt to shut down a closed stream %u", stream->id);
899        errno = EBADF;
900        return -1;
901    }
902    /* 0: read, 1: write: 2: read and write
903     */
904    if (how < 0 || how > 2)
905    {
906        errno = EINVAL;
907        return -1;
908    }
909
910    if (how)
911        stream_shutdown_write(stream);
912    if (how != 1)
913        stream_shutdown_read(stream);
914
915    maybe_finish_stream(stream);
916    maybe_schedule_call_on_close(stream);
917    if (how)
918        maybe_conn_to_tickable_if_writeable(stream, 1);
919
920    return 0;
921}
922
923
924void
925lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
926{
927    LSQ_DEBUG("internal shutdown of stream %u", stream->id);
928    if (LSQUIC_STREAM_HANDSHAKE == stream->id
929        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
930                                LSQUIC_STREAM_HEADERS == stream->id))
931    {
932        LSQ_DEBUG("add flag to force-finish special stream %u", stream->id);
933        stream->stream_flags |= STREAM_FORCE_FINISH;
934        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
935    }
936    maybe_finish_stream(stream);
937    maybe_schedule_call_on_close(stream);
938}
939
940
941static void
942fake_reset_unused_stream (lsquic_stream_t *stream)
943{
944    stream->stream_flags |=
945        STREAM_RST_RECVD    /* User will pick this up on read or write */
946      | STREAM_RST_SENT     /* Don't send anything else on this stream */
947    ;
948
949    /* Cancel all writes to the network scheduled for this stream: */
950    if (stream->stream_flags & STREAM_SENDING_FLAGS)
951        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
952                                                next_send_stream);
953    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
954
955    LSQ_DEBUG("fake-reset stream %u%s",
956                    stream->id, stream_stalled(stream) ? " (stalled)" : "");
957    maybe_finish_stream(stream);
958    maybe_schedule_call_on_close(stream);
959}
960
961
/* This function should only be called for locally-initiated streams whose ID
 * is larger than that received in GOAWAY frame.  This may occur when a GOAWAY
 * frame has been sent by peer, but we created the stream before receiving it.
 * In this situation, we mark the stream as reset, so that user's on_read or
 * on_write event callback picks up the error.  That, in turn, should result
 * in stream being closed.
 *
 * If we have received any data frames on this stream, this probably indicates
 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
 * lower than this.  However, we still try to handle it gracefully and perform
 * a shutdown, as if the stream was not reset.
 */
974void
975lsquic_stream_received_goaway (lsquic_stream_t *stream)
976{
977    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
978    if (0 == stream->read_offset &&
979                            stream->data_in->di_if->di_empty(stream->data_in))
980        fake_reset_unused_stream(stream);       /* Normal condition */
981    else
982    {   /* This is odd, let's handle it the best we can: */
983        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
984        lsquic_stream_shutdown_internal(stream);
985    }
986}
987
988
989uint64_t
990lsquic_stream_read_offset (const lsquic_stream_t *stream)
991{
992    return stream->read_offset;
993}
994
995
996static int
997stream_wantread (lsquic_stream_t *stream, int is_want)
998{
999    const int old_val = !!(stream->stream_flags & STREAM_WANT_READ);
1000    const int new_val = !!is_want;
1001    if (old_val != new_val)
1002    {
1003        if (new_val)
1004        {
1005            if (!old_val)
1006                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
1007                                                            next_read_stream);
1008            stream->stream_flags |= STREAM_WANT_READ;
1009        }
1010        else
1011        {
1012            stream->stream_flags &= ~STREAM_WANT_READ;
1013            if (old_val)
1014                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
1015                                                            next_read_stream);
1016        }
1017    }
1018    return old_val;
1019}
1020
1021
1022static void
1023maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1024{
1025    assert(STREAM_WRITE_Q_FLAGS & flag);
1026    if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1027        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1028                                                        next_write_stream);
1029    stream->stream_flags |= flag;
1030}
1031
1032
1033static void
1034maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1035{
1036    assert(STREAM_WRITE_Q_FLAGS & flag);
1037    if (stream->stream_flags & flag)
1038    {
1039        stream->stream_flags &= ~flag;
1040        if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1041            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1042                                                        next_write_stream);
1043    }
1044}
1045
1046
1047static int
1048stream_wantwrite (lsquic_stream_t *stream, int is_want)
1049{
1050    const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE);
1051    const int new_val = !!is_want;
1052    if (old_val != new_val)
1053    {
1054        if (new_val)
1055            maybe_put_onto_write_q(stream, STREAM_WANT_WRITE);
1056        else
1057            maybe_remove_from_write_q(stream, STREAM_WANT_WRITE);
1058    }
1059    return old_val;
1060}
1061
1062
1063int
1064lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1065{
1066    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1067    {
1068        if (is_want)
1069            maybe_conn_to_tickable_if_readable(stream);
1070        return stream_wantread(stream, is_want);
1071    }
1072    else
1073    {
1074        errno = EBADF;
1075        return -1;
1076    }
1077}
1078
1079
1080int
1081lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1082{
1083    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE))
1084    {
1085        if (is_want)
1086            maybe_conn_to_tickable_if_writeable(stream, 1);
1087        return stream_wantwrite(stream, is_want);
1088    }
1089    else
1090    {
1091        errno = EBADF;
1092        return -1;
1093    }
1094}
1095
1096
/* Stream flags that the event dispatch loops snapshot before invoking a user
 * callback and compare afterwards: a change in any of them counts as
 * progress made by the callback.
 */
#define USER_PROGRESS_FLAGS (                                               \
    STREAM_WANT_READ | STREAM_WANT_WRITE | STREAM_WANT_FLUSH |              \
    STREAM_U_WRITE_DONE | STREAM_U_READ_DONE | STREAM_SEND_RST)
1099
1100
1101static void
1102stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1103{
1104    unsigned no_progress_count, no_progress_limit;
1105    enum stream_flags flags;
1106    uint64_t size;
1107
1108    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1109
1110    no_progress_count = 0;
1111    while ((stream->stream_flags & STREAM_WANT_READ)
1112                                            && lsquic_stream_readable(stream))
1113    {
1114        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1115        size  = stream->read_offset;
1116
1117        stream->stream_if->on_read(stream, stream->st_ctx);
1118
1119        if (no_progress_limit && size == stream->read_offset &&
1120                        flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1121        {
1122            ++no_progress_count;
1123            if (no_progress_count >= no_progress_limit)
1124            {
1125                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1126                    "progress) in user code reading from stream",
1127                    no_progress_count,
1128                    no_progress_count == 1 ? "" : "s");
1129                break;
1130            }
1131        }
1132        else
1133            no_progress_count = 0;
1134    }
1135}
1136
1137
1138static void
1139stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1140{
1141    unsigned no_progress_count, no_progress_limit;
1142    enum stream_flags flags;
1143
1144    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1145
1146    no_progress_count = 0;
1147    stream->stream_flags |= STREAM_LAST_WRITE_OK;
1148    while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK))
1149                                == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)
1150           && lsquic_stream_write_avail(stream))
1151    {
1152        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1153
1154        stream->stream_if->on_write(stream, stream->st_ctx);
1155
1156        if (no_progress_limit &&
1157            flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1158        {
1159            ++no_progress_count;
1160            if (no_progress_count >= no_progress_limit)
1161            {
1162                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1163                    "progress) in user code writing to stream",
1164                    no_progress_count,
1165                    no_progress_count == 1 ? "" : "s");
1166                break;
1167            }
1168        }
1169        else
1170            no_progress_count = 0;
1171    }
1172}
1173
1174
1175static void
1176stream_dispatch_read_events_once (lsquic_stream_t *stream)
1177{
1178    if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream))
1179    {
1180        stream->stream_if->on_read(stream, stream->st_ctx);
1181    }
1182}
1183
1184
1185static void
1186maybe_mark_as_blocked (lsquic_stream_t *stream)
1187{
1188    struct lsquic_conn_cap *cc;
1189
1190    if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered)
1191    {
1192        if (stream->blocked_off < stream->max_send_off)
1193        {
1194            stream->blocked_off = stream->max_send_off + stream->sm_n_buffered;
1195            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1196                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1197                                                            next_send_stream);
1198            stream->stream_flags |= STREAM_SEND_BLOCKED;
1199            LSQ_DEBUG("marked stream-blocked at stream offset "
1200                                            "%"PRIu64, stream->blocked_off);
1201        }
1202        else
1203            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
1204                " has been, or is about to be, sent", stream->blocked_off);
1205    }
1206
1207    if ((stream->stream_flags & STREAM_CONN_LIMITED)
1208        && (cc = &stream->conn_pub->conn_cap,
1209                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
1210    {
1211        if (cc->cc_blocked < cc->cc_max)
1212        {
1213            cc->cc_blocked = cc->cc_max;
1214            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
1215            LSQ_DEBUG("marked connection-blocked at connection offset "
1216                                                    "%"PRIu64, cc->cc_max);
1217        }
1218        else
1219            LSQ_DEBUG("stream has already been marked connection-blocked "
1220                "at offset %"PRIu64, cc->cc_blocked);
1221    }
1222}
1223
1224
1225void
1226lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
1227{
1228    assert(stream->stream_flags & STREAM_WANT_READ);
1229
1230    if (stream->stream_flags & STREAM_RW_ONCE)
1231        stream_dispatch_read_events_once(stream);
1232    else
1233        stream_dispatch_read_events_loop(stream);
1234}
1235
1236
1237void
1238lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
1239{
1240    int progress;
1241    uint64_t tosend_off;
1242    unsigned short n_buffered;
1243    enum stream_flags flags;
1244
1245    assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS);
1246    flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS;
1247    tosend_off = stream->tosend_off;
1248    n_buffered = stream->sm_n_buffered;
1249
1250    if (stream->stream_flags & STREAM_WANT_FLUSH)
1251        (void) stream_flush(stream);
1252
1253    if (stream->stream_flags & STREAM_RW_ONCE)
1254    {
1255        if ((stream->stream_flags & STREAM_WANT_WRITE)
1256            && lsquic_stream_write_avail(stream))
1257        {
1258            stream->stream_if->on_write(stream, stream->st_ctx);
1259        }
1260    }
1261    else
1262        stream_dispatch_write_events_loop(stream);
1263
1264    /* Progress means either flags or offsets changed: */
1265    progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags &&
1266                        stream->tosend_off == tosend_off &&
1267                            stream->sm_n_buffered == n_buffered);
1268
1269    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
1270    {
1271        if (progress)
1272        {   /* Move the stream to the end of the list to ensure fairness. */
1273            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1274                                                            next_write_stream);
1275            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1276                                                            next_write_stream);
1277        }
1278    }
1279}
1280
1281
/* Size callback of the empty reader: there is never anything to read. */
static size_t
inner_reader_empty_size (void *ctx)
{
    (void) ctx;
    return (size_t) 0;
}
1287
1288
/* Read callback of the empty reader: copies nothing and reports zero bytes
 * read, whatever `count' is.
 */
static size_t
inner_reader_empty_read (void *ctx, void *buf, size_t count)
{
    (void) ctx;
    (void) buf;
    (void) count;
    return (size_t) 0;
}
1294
1295
1296static int
1297stream_flush (lsquic_stream_t *stream)
1298{
1299    struct lsquic_reader empty_reader;
1300    ssize_t nw;
1301
1302    assert(stream->stream_flags & STREAM_WANT_FLUSH);
1303    assert(stream->sm_n_buffered > 0 ||
1304        /* Flushing is also used to packetize standalone FIN: */
1305        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
1306                                                    == STREAM_U_WRITE_DONE));
1307
1308    empty_reader.lsqr_size = inner_reader_empty_size;
1309    empty_reader.lsqr_read = inner_reader_empty_read;
1310    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
1311    nw = stream_write_to_packets(stream, &empty_reader, 0);
1312
1313    if (nw >= 0)
1314    {
1315        assert(nw == 0);    /* Empty reader: must have read zero bytes */
1316        return 0;
1317    }
1318    else
1319        return -1;
1320}
1321
1322
1323static int
1324stream_flush_nocheck (lsquic_stream_t *stream)
1325{
1326    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered;
1327    maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH);
1328    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
1329
1330    return stream_flush(stream);
1331}
1332
1333
1334int
1335lsquic_stream_flush (lsquic_stream_t *stream)
1336{
1337    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1338    {
1339        LSQ_DEBUG("cannot flush closed stream");
1340        errno = EBADF;
1341        return -1;
1342    }
1343
1344    if (0 == stream->sm_n_buffered)
1345    {
1346        LSQ_DEBUG("flushing 0 bytes: noop");
1347        return 0;
1348    }
1349
1350    return stream_flush_nocheck(stream);
1351}
1352
1353
1354/* The flush threshold is the maximum size of stream data that can be sent
1355 * in a full packet.
1356 */
1357#ifdef NDEBUG
1358static
1359#endif
1360       size_t
1361lsquic_stream_flush_threshold (const struct lsquic_stream *stream)
1362{
1363    enum packet_out_flags flags;
1364    enum lsquic_packno_bits bits;
1365    unsigned packet_header_sz, stream_header_sz;
1366    size_t threshold;
1367
1368    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
1369    flags = bits << POBIT_SHIFT;
1370    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
1371        flags |= PO_CONN_ID;
1372    if (LSQUIC_STREAM_HANDSHAKE == stream->id)
1373        flags |= PO_LONGHEAD;
1374
1375    packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags);
1376    stream_header_sz = stream->conn_pub->lconn->cn_pf
1377            ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off);
1378
1379    threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ
1380              - packet_header_sz - stream_header_sz;
1381    return threshold;
1382}
1383
1384
/* Checks shared by the public write functions.  Expects a variable named
 * `stream' in scope and makes the enclosing function return -1 with errno
 * set when writing is not allowed:
 *
 *  EILSEQ      Stream uses HTTP headers, but they have not been sent yet.
 *  ECONNRESET  Stream has been reset.
 *  EBADF       Write side is closed or FIN has already been sent.
 */
#define COMMON_WRITE_CHECKS() do {                                          \
    if (STREAM_USE_HEADERS ==                                               \
        (stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)))  \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream before sending HTTP headers"); \
        errno = EILSEQ;                                                     \
        return -1;                                                          \
    }                                                                       \
    if (0 != (stream->stream_flags & STREAM_RST_FLAGS))                     \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it had been reset");     \
        errno = ECONNRESET;                                                 \
        return -1;                                                          \
    }                                                                       \
    if (0 != (stream->stream_flags &                                        \
                                (STREAM_U_WRITE_DONE|STREAM_FIN_SENT)))     \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it was closed for "      \
                                                                "writing"); \
        errno = EBADF;                                                      \
        return -1;                                                          \
    }                                                                       \
} while (0)
1407
1408
1409struct frame_gen_ctx
1410{
1411    lsquic_stream_t      *fgc_stream;
1412    struct lsquic_reader *fgc_reader;
1413    /* We keep our own count of how many bytes were read from reader because
1414     * some readers are external.  The external caller does not have to rely
1415     * on our count, but it can.
1416     */
1417    size_t                fgc_nread_from_reader;
1418};
1419
1420
1421static size_t
1422frame_gen_size (void *ctx)
1423{
1424    struct frame_gen_ctx *fg_ctx = ctx;
1425    size_t available, remaining;
1426
1427    /* Make sure we are not writing past available size: */
1428    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1429    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
1430    if (available < remaining)
1431        remaining = available;
1432
1433    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
1434}
1435
1436
1437static int
1438frame_gen_fin (void *ctx)
1439{
1440    struct frame_gen_ctx *fg_ctx = ctx;
1441    return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE
1442        && 0 == fg_ctx->fgc_stream->sm_n_buffered
1443        /* Do not use frame_gen_size() as it may chop the real size: */
1444        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1445}
1446
1447
1448static void
1449incr_conn_cap (struct lsquic_stream *stream, size_t incr)
1450{
1451    if (stream->stream_flags & STREAM_CONN_LIMITED)
1452    {
1453        stream->conn_pub->conn_cap.cc_sent += incr;
1454        assert(stream->conn_pub->conn_cap.cc_sent
1455                                    <= stream->conn_pub->conn_cap.cc_max);
1456    }
1457}
1458
1459
1460static size_t
1461frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
1462{
1463    struct frame_gen_ctx *fg_ctx = ctx;
1464    unsigned char *p = begin_buf;
1465    unsigned char *const end = p + len;
1466    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
1467    size_t n_written, available, n_to_write;
1468
1469    if (stream->sm_n_buffered > 0)
1470    {
1471        if (len <= stream->sm_n_buffered)
1472        {
1473            memcpy(p, stream->sm_buf, len);
1474            memmove(stream->sm_buf, stream->sm_buf + len,
1475                                                stream->sm_n_buffered - len);
1476            stream->sm_n_buffered -= len;
1477            stream->tosend_off += len;
1478            *fin = frame_gen_fin(fg_ctx);
1479            return len;
1480        }
1481        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
1482        p += stream->sm_n_buffered;
1483        stream->sm_n_buffered = 0;
1484    }
1485
1486    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
1487    n_to_write = end - p;
1488    if (n_to_write > available)
1489        n_to_write = available;
1490    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
1491                                              n_to_write);
1492    p += n_written;
1493    fg_ctx->fgc_nread_from_reader += n_written;
1494    *fin = frame_gen_fin(fg_ctx);
1495    stream->tosend_off += p - (const unsigned char *) begin_buf;
1496    incr_conn_cap(stream, n_written);
1497    return p - (const unsigned char *) begin_buf;
1498}
1499
1500
1501static void
1502check_flush_threshold (lsquic_stream_t *stream)
1503{
1504    if ((stream->stream_flags & STREAM_WANT_FLUSH) &&
1505                            stream->tosend_off >= stream->sm_flush_to)
1506    {
1507        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
1508                                                    stream->sm_flush_to);
1509        maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH);
1510    }
1511}
1512
1513
/* Adapter that gives lsquic_send_ctl_new_packet_out() the signature used by
 * the get_packet[] table.  The `stream' argument is not used.
 */
static struct lsquic_packet_out *
get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least,
                      const struct lsquic_stream *stream)
{
    (void) stream;
    return lsquic_send_ctl_new_packet_out(ctl, need_at_least);
}
1520
1521
1522static struct lsquic_packet_out * (* const get_packet[])(
1523    struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) =
1524{
1525    lsquic_send_ctl_get_packet_for_stream,
1526    get_brand_new_packet,
1527};
1528
1529
1530static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR }
1531stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size)
1532{
1533    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
1534    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
1535    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
1536    unsigned stream_header_sz, need_at_least, off;
1537    lsquic_packet_out_t *packet_out;
1538    int len, s, hsk;
1539
1540    stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id,
1541                                                        stream->tosend_off);
1542    need_at_least = stream_header_sz + (size > 0);
1543    hsk = LSQUIC_STREAM_HANDSHAKE == stream->id;
1544    packet_out = get_packet[hsk](send_ctl, need_at_least, stream);
1545    if (!packet_out)
1546        return SWTP_STOP;
1547    if (hsk)
1548        packet_out->po_header_type = stream->tosend_off == 0
1549                                            ? HETY_INITIAL : HETY_HANDSHAKE;
1550
1551    off = packet_out->po_data_sz;
1552    len = pf->pf_gen_stream_frame(
1553                packet_out->po_data + packet_out->po_data_sz,
1554                lsquic_packet_out_avail(packet_out), stream->id,
1555                stream->tosend_off,
1556                frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx);
1557    if (len < 0)
1558    {
1559        LSQ_ERROR("could not generate stream frame");
1560        return SWTP_ERROR;
1561    }
1562
1563    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
1564                            packet_out->po_data + packet_out->po_data_sz, len);
1565    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
1566    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
1567    if (0 == lsquic_packet_out_avail(packet_out))
1568        packet_out->po_flags |= PO_STREAM_END;
1569    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
1570                                     stream, QUIC_FRAME_STREAM, off, len);
1571    if (s != 0)
1572    {
1573        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
1574        return SWTP_ERROR;
1575    }
1576
1577    check_flush_threshold(stream);
1578
1579    /* XXX: I don't like it that this is here */
1580    if (hsk && !(packet_out->po_flags & PO_HELLO))
1581    {
1582        lsquic_packet_out_zero_pad(packet_out);
1583        packet_out->po_flags |= PO_HELLO;
1584        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
1585    }
1586
1587    return SWTP_OK;
1588}
1589
1590
1591static void
1592abort_connection (struct lsquic_stream *stream)
1593{
1594    if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
1595        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
1596                                                next_service_stream);
1597    stream->stream_flags |= STREAM_ABORT_CONN;
1598    LSQ_WARN("connection will be aborted");
1599    maybe_conn_to_tickable(stream);
1600}
1601
1602
1603static ssize_t
1604stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
1605                         size_t thresh)
1606{
1607    size_t size;
1608    ssize_t nw;
1609    unsigned seen_ok;
1610    struct frame_gen_ctx fg_ctx = {
1611        .fgc_stream = stream,
1612        .fgc_reader = reader,
1613        .fgc_nread_from_reader = 0,
1614    };
1615
1616    seen_ok = 0;
1617    while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0)
1618           || frame_gen_fin(&fg_ctx))
1619    {
1620        switch (stream_write_to_packet(&fg_ctx, size))
1621        {
1622        case SWTP_OK:
1623            if (!seen_ok++)
1624                maybe_conn_to_tickable_if_writeable(stream, 0);
1625            if (frame_gen_fin(&fg_ctx))
1626            {
1627                stream->stream_flags |= STREAM_FIN_SENT;
1628                goto end;
1629            }
1630            else
1631                break;
1632        case SWTP_STOP:
1633            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
1634            goto end;
1635        default:
1636            abort_connection(stream);
1637            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
1638            return -1;
1639        }
1640    }
1641
1642    if (thresh)
1643    {
1644        assert(size < thresh);
1645        assert(size >= stream->sm_n_buffered);
1646        size -= stream->sm_n_buffered;
1647        if (size > 0)
1648        {
1649            nw = save_to_buffer(stream, reader, size);
1650            if (nw < 0)
1651                return -1;
1652            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
1653        }
1654    }
1655    else
1656    {
1657        /* We count flushed data towards both stream and connection limits,
1658         * so we should have been able to packetize all of it:
1659         */
1660        assert(0 == stream->sm_n_buffered);
1661        assert(size == 0);
1662    }
1663
1664    maybe_mark_as_blocked(stream);
1665
1666  end:
1667    return fg_ctx.fgc_nread_from_reader;
1668}
1669
1670
1671/* Perform an implicit flush when we hit connection limit while buffering
1672 * data.  This is to prevent a (theoretical) stall:
1673 *
1674 * Imagine a number of streams, all of which buffered some data.  The buffered
1675 * data is up to connection cap, which means no further writes are possible.
1676 * None of them flushes, which means that data is not sent and connection
1677 * WINDOW_UPDATE frame never arrives from peer.  Stall.
1678 */
1679static int
1680maybe_flush_stream (struct lsquic_stream *stream)
1681{
1682    if (stream->sm_n_buffered > 0
1683          && (stream->stream_flags & STREAM_CONN_LIMITED)
1684            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
1685        return stream_flush_nocheck(stream);
1686    else
1687        return 0;
1688}
1689
1690
1691static ssize_t
1692save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
1693                                                                size_t len)
1694{
1695    size_t avail, n_written;
1696
1697    assert(stream->sm_n_buffered + len <= SM_BUF_SIZE);
1698
1699    if (!stream->sm_buf)
1700    {
1701        stream->sm_buf = malloc(SM_BUF_SIZE);
1702        if (!stream->sm_buf)
1703            return -1;
1704    }
1705
1706    avail = lsquic_stream_write_avail(stream);
1707    if (avail < len)
1708        len = avail;
1709
1710    n_written = reader->lsqr_read(reader->lsqr_ctx,
1711                        stream->sm_buf + stream->sm_n_buffered, len);
1712    stream->sm_n_buffered += n_written;
1713    incr_conn_cap(stream, n_written);
1714    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
1715              n_written, stream->sm_n_buffered);
1716    if (0 != maybe_flush_stream(stream))
1717        return -1;
1718    return n_written;
1719}
1720
1721
1722static ssize_t
1723stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
1724{
1725    size_t thresh, len;
1726
1727    thresh = lsquic_stream_flush_threshold(stream);
1728    len = reader->lsqr_size(reader->lsqr_ctx);
1729    if (stream->sm_n_buffered + len <= SM_BUF_SIZE &&
1730                                    stream->sm_n_buffered + len < thresh)
1731        return save_to_buffer(stream, reader, len);
1732    else
1733        return stream_write_to_packets(stream, reader, thresh);
1734}
1735
1736
1737ssize_t
1738lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
1739{
1740    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
1741    return lsquic_stream_writev(stream, &iov, 1);
1742}
1743
1744
/* Reader state for consuming an array of iovecs. */
struct inner_reader_iovec {
    const struct iovec *iov;            /* Element currently being read */
    const struct iovec *end;            /* One past the last element */
    unsigned            cur_iovec_off;  /* Bytes already taken from *iov */
};
1750
1751
1752static size_t
1753inner_reader_iovec_read (void *ctx, void *buf, size_t count)
1754{
1755    struct inner_reader_iovec *const iro = ctx;
1756    unsigned char *p = buf;
1757    unsigned char *const end = p + count;
1758    unsigned n_tocopy;
1759
1760    while (iro->iov < iro->end && p < end)
1761    {
1762        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
1763        if (n_tocopy > (unsigned) (end - p))
1764            n_tocopy = end - p;
1765        memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off,
1766                                                                    n_tocopy);
1767        p += n_tocopy;
1768        iro->cur_iovec_off += n_tocopy;
1769        if (iro->iov->iov_len == iro->cur_iovec_off)
1770        {
1771            ++iro->iov;
1772            iro->cur_iovec_off = 0;
1773        }
1774    }
1775
1776    return p + count - end;
1777}
1778
1779
1780static size_t
1781inner_reader_iovec_size (void *ctx)
1782{
1783    struct inner_reader_iovec *const iro = ctx;
1784    const struct iovec *iov;
1785    size_t size;
1786
1787    size = 0;
1788    for (iov = iro->iov; iov < iro->end; ++iov)
1789        size += iov->iov_len;
1790
1791    return size - iro->cur_iovec_off;
1792}
1793
1794
1795ssize_t
1796lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
1797                                                                    int iovcnt)
1798{
1799    COMMON_WRITE_CHECKS();
1800    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1801
1802    struct inner_reader_iovec iro = {
1803        .iov = iov,
1804        .end = iov + iovcnt,
1805        .cur_iovec_off = 0,
1806    };
1807    struct lsquic_reader reader = {
1808        .lsqr_read = inner_reader_iovec_read,
1809        .lsqr_size = inner_reader_iovec_size,
1810        .lsqr_ctx  = &iro,
1811    };
1812
1813    return stream_write(stream, &reader);
1814}
1815
1816
1817ssize_t
1818lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
1819{
1820    COMMON_WRITE_CHECKS();
1821    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1822    return stream_write(stream, reader);
1823}
1824
1825
1826int
1827lsquic_stream_send_headers (lsquic_stream_t *stream,
1828                            const lsquic_http_headers_t *headers, int eos)
1829{
1830    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT|
1831                                                     STREAM_U_WRITE_DONE))
1832                == STREAM_USE_HEADERS)
1833    {
1834        int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs,
1835                    stream->id, headers, eos, lsquic_stream_priority(stream));
1836        if (0 == s)
1837        {
1838            SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
1839            stream->stream_flags |= STREAM_HEADERS_SENT;
1840            if (eos)
1841                stream->stream_flags |= STREAM_FIN_SENT;
1842            LSQ_INFO("sent headers for stream %u", stream->id);
1843        }
1844        else
1845            LSQ_WARN("could not send headers: %s", strerror(errno));
1846        return s;
1847    }
1848    else
1849    {
1850        LSQ_INFO("cannot send headers for stream %u in this state", stream->id);
1851        errno = EBADMSG;
1852        return -1;
1853    }
1854}
1855
1856
1857void
1858lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
1859{
1860    if (offset > stream->max_send_off)
1861    {
1862        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
1863        LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to "
1864            "0x%"PRIX64, stream->id, stream->max_send_off, offset);
1865        stream->max_send_off = offset;
1866    }
1867    else
1868        LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old "
1869            "max send offset 0x%"PRIX64", ignoring", stream->id, offset,
1870            stream->max_send_off);
1871}
1872
1873
1874/* This function is used to update offsets after handshake completes and we
1875 * learn of peer's limits from the handshake values.
1876 */
1877int
1878lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset)
1879{
1880    LSQ_DEBUG("setting max_send_off to %u", offset);
1881    if (offset > stream->max_send_off)
1882    {
1883        lsquic_stream_window_update(stream, offset);
1884        return 0;
1885    }
1886    else if (offset < stream->tosend_off)
1887    {
1888        LSQ_INFO("new offset (%u bytes) is smaller than the amount of data "
1889            "already sent on this stream (%"PRIu64" bytes)", offset,
1890            stream->tosend_off);
1891        return -1;
1892    }
1893    else
1894    {
1895        stream->max_send_off = offset;
1896        return 0;
1897    }
1898}
1899
1900
1901void
1902lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code)
1903{
1904    lsquic_stream_reset_ext(stream, error_code, 1);
1905}
1906
1907
1908void
1909lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code,
1910                         int do_close)
1911{
1912    if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT))
1913    {
1914        LSQ_INFO("reset already sent");
1915        return;
1916    }
1917
1918    SM_HISTORY_APPEND(stream, SHE_RESET);
1919
1920    LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code);
1921    stream->error_code = error_code;
1922
1923    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1924        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1925                                                        next_send_stream);
1926    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
1927    stream->stream_flags |= STREAM_SEND_RST;
1928
1929    drop_buffered_data(stream);
1930    maybe_elide_stream_frames(stream);
1931    maybe_schedule_call_on_close(stream);
1932
1933    if (do_close)
1934        lsquic_stream_close(stream);
1935    else
1936        maybe_conn_to_tickable_if_writeable(stream, 1);
1937}
1938
1939
1940unsigned
1941lsquic_stream_id (const lsquic_stream_t *stream)
1942{
1943    return stream->id;
1944}
1945
1946
1947struct lsquic_conn *
1948lsquic_stream_conn (const lsquic_stream_t *stream)
1949{
1950    return stream->conn_pub->lconn;
1951}
1952
1953
1954int
1955lsquic_stream_close (lsquic_stream_t *stream)
1956{
1957    LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id);
1958    SM_HISTORY_APPEND(stream, SHE_CLOSE);
1959    if (lsquic_stream_is_closed(stream))
1960    {
1961        LSQ_INFO("Attempt to close an already-closed stream %u", stream->id);
1962        errno = EBADF;
1963        return -1;
1964    }
1965    stream_shutdown_write(stream);
1966    stream_shutdown_read(stream);
1967    maybe_schedule_call_on_close(stream);
1968    maybe_finish_stream(stream);
1969    maybe_conn_to_tickable_if_writeable(stream, 1);
1970    return 0;
1971}
1972
1973
1974#ifndef NDEBUG
1975#if __GNUC__
1976__attribute__((weak))
1977#endif
1978#endif
1979void
1980lsquic_stream_acked (lsquic_stream_t *stream)
1981{
1982    assert(stream->n_unacked);
1983    --stream->n_unacked;
1984    LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked);
1985    if (0 == stream->n_unacked)
1986        maybe_finish_stream(stream);
1987}
1988
1989
1990void
1991lsquic_stream_push_req (lsquic_stream_t *stream,
1992                        struct uncompressed_headers *push_req)
1993{
1994    assert(!stream->push_req);
1995    stream->push_req = push_req;
1996    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
1997}
1998
1999
2000int
2001lsquic_stream_is_pushed (const lsquic_stream_t *stream)
2002{
2003    return 1 & ~stream->id;
2004}
2005
2006
2007int
2008lsquic_stream_push_info (const lsquic_stream_t *stream,
2009        uint32_t *ref_stream_id, const char **headers, size_t *headers_sz)
2010{
2011    if (lsquic_stream_is_pushed(stream))
2012    {
2013        assert(stream->push_req);
2014        *ref_stream_id = stream->push_req->uh_stream_id;
2015        *headers       = stream->push_req->uh_headers;
2016        *headers_sz    = stream->push_req->uh_size;
2017        return 0;
2018    }
2019    else
2020        return -1;
2021}
2022
2023
2024int
2025lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
2026{
2027    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS)
2028    {
2029        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
2030        LSQ_DEBUG("received uncompressed headers for stream %u", stream->id);
2031        stream->stream_flags |= STREAM_HAVE_UH;
2032        if (uh->uh_flags & UH_FIN)
2033            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
2034        stream->uh = uh;
2035        if (uh->uh_oth_stream_id == 0)
2036        {
2037            if (uh->uh_weight)
2038                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
2039        }
2040        else
2041            LSQ_NOTICE("don't know how to depend on stream %u",
2042                                                        uh->uh_oth_stream_id);
2043        return 0;
2044    }
2045    else
2046    {
2047        LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id);
2048        return -1;
2049    }
2050}
2051
2052
2053unsigned
2054lsquic_stream_priority (const lsquic_stream_t *stream)
2055{
2056    return 256 - stream->sm_priority;
2057}
2058
2059
2060int
2061lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
2062{
2063    /* The user should never get a reference to the special streams,
2064     * but let's check just in case:
2065     */
2066    if (LSQUIC_STREAM_HANDSHAKE == stream->id
2067        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
2068                                LSQUIC_STREAM_HEADERS == stream->id))
2069        return -1;
2070    if (priority < 1 || priority > 256)
2071        return -1;
2072    stream->sm_priority = 256 - priority;
2073    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
2074    LSQ_DEBUG("set priority to %u", priority);
2075    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
2076    return 0;
2077}
2078
2079
2080int
2081lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
2082{
2083    if (0 == lsquic_stream_set_priority_internal(stream, priority))
2084    {
2085        if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) ==
2086                                       (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))
2087        {
2088            /* We need to send headers only if we are a) using HEADERS stream
2089             * and b) we already sent initial headers.  If initial headers
2090             * have not been sent yet, stream priority will be sent in the
2091             * HEADERS frame.
2092             */
2093            return lsquic_headers_stream_send_priority(stream->conn_pub->hs,
2094                                                    stream->id, 0, 0, priority);
2095        }
2096        else
2097            return 0;
2098    }
2099    else
2100        return -1;
2101}
2102
2103
2104lsquic_stream_ctx_t *
2105lsquic_stream_get_ctx (const lsquic_stream_t *stream)
2106{
2107    return stream->st_ctx;
2108}
2109
2110
2111int
2112lsquic_stream_refuse_push (lsquic_stream_t *stream)
2113{
2114    if (lsquic_stream_is_pushed(stream) &&
2115                !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST)))
2116    {
2117        LSQ_DEBUG("refusing pushed stream: send reset");
2118        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
2119        return 0;
2120    }
2121    else
2122        return -1;
2123}
2124
2125
2126size_t
2127lsquic_stream_mem_used (const struct lsquic_stream *stream)
2128{
2129    size_t size;
2130
2131    size = sizeof(stream);
2132    if (stream->sm_buf)
2133        size += SM_BUF_SIZE;
2134    if (stream->data_in)
2135        size += stream->data_in->di_if->di_mem_used(stream->data_in);
2136
2137    return size;
2138}
2139
2140
2141lsquic_cid_t
2142lsquic_stream_cid (const struct lsquic_stream *stream)
2143{
2144    return LSQUIC_LOG_CONN_ID;
2145}
2146
2147
2148