lsquic_stream.c revision bfc7bfd8
1/* Copyright (c) 2017 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "lsquic.h"
30
31#include "lsquic_int_types.h"
32#include "lsquic_packet_common.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_malo.h"
35#include "lsquic_conn_flow.h"
36#include "lsquic_rtt.h"
37#include "lsquic_sfcw.h"
38#include "lsquic_stream.h"
39#include "lsquic_conn_public.h"
40#include "lsquic_util.h"
41#include "lsquic_mm.h"
42#include "lsquic_headers_stream.h"
43#include "lsquic_frame_reader.h"
44#include "lsquic_conn.h"
45#include "lsquic_data_in_if.h"
46#include "lsquic_parse.h"
47#include "lsquic_packet_out.h"
48#include "lsquic_engine_public.h"
49#include "lsquic_senhist.h"
50#include "lsquic_pacer.h"
51#include "lsquic_cubic.h"
52#include "lsquic_send_ctl.h"
53#include "lsquic_ev_log.h"
54
55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid
57#define LSQUIC_LOG_STREAM_ID stream->id
58#include "lsquic_logger.h"
59
60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ
61
62static void
63drop_frames_in (lsquic_stream_t *stream);
64
65static void
66maybe_schedule_call_on_close (lsquic_stream_t *stream);
67
68static int
69stream_wantread (lsquic_stream_t *stream, int is_want);
70
71static int
72stream_wantwrite (lsquic_stream_t *stream, int is_want);
73
74static int
75stream_readable (const lsquic_stream_t *stream);
76
77static ssize_t
78stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);
79
80static ssize_t
81save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);
82
83static int
84stream_flush (lsquic_stream_t *stream);
85
86static int
87stream_flush_nocheck (lsquic_stream_t *stream);
88
89static void
90maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag);
91
92
93#if LSQUIC_KEEP_STREAM_HISTORY
94/* These values are printable ASCII characters for ease of printing the
95 * whole history in a single line of a log message.
96 *
97 * The list of events is not exhaustive: only most interesting events
98 * are recorded.
99 */
100enum stream_history_event
101{
102    SHE_EMPTY              =  '\0',     /* Special entry.  No init besides memset required */
103    SHE_PLUS               =  '+',      /* Special entry: previous event occured more than once */
104    SHE_REACH_FIN          =  'a',
105    SHE_BLOCKED_OUT        =  'b',
106    SHE_CREATED            =  'C',
107    SHE_FRAME_IN           =  'd',
108    SHE_FRAME_OUT          =  'D',
109    SHE_RESET              =  'e',
110    SHE_WINDOW_UPDATE      =  'E',
111    SHE_FIN_IN             =  'f',
112    SHE_FINISHED           =  'F',
113    SHE_GOAWAY_IN          =  'g',
114    SHE_USER_WRITE_HEADER  =  'h',
115    SHE_HEADERS_IN         =  'H',
116    SHE_ONCLOSE_SCHED      =  'l',
117    SHE_ONCLOSE_CALL       =  'L',
118    SHE_ONNEW              =  'N',
119    SHE_SET_PRIO           =  'p',
120    SHE_USER_READ          =  'r',
121    SHE_SHUTDOWN_READ      =  'R',
122    SHE_RST_IN             =  's',
123    SHE_RST_OUT            =  't',
124    SHE_FLUSH              =  'u',
125    SHE_USER_WRITE_DATA    =  'w',
126    SHE_SHUTDOWN_WRITE     =  'W',
127    SHE_CLOSE              =  'X',
128    SHE_FORCE_FINISH       =  'Z',
129};
130
131static void
132sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
133{
134    enum stream_history_event prev_event;
135    sm_hist_idx_t idx;
136    int plus;
137
138    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
139    plus = SHE_PLUS == stream->sm_hist_buf[idx];
140    idx = (idx - plus) & SM_HIST_IDX_MASK;
141    prev_event = stream->sm_hist_buf[idx];
142
143    if (prev_event == sh_event && plus)
144        return;
145
146    if (prev_event == sh_event)
147        sh_event = SHE_PLUS;
148    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
149
150    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
151        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
152                                                        stream->sm_hist_buf);
153}
154
155#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
156#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
157        if (stream->sm_hist_idx & SM_HIST_IDX_MASK)                         \
158            LSQ_DEBUG("history: [%.*s]",                                    \
159                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
160                (stream)->sm_hist_buf);                                     \
161    } while (0)
162#else
163#   define SM_HISTORY_APPEND(stream, event)
164#   define SM_HISTORY_DUMP_REMAINING(stream)
165#endif
166
167
168static int
169stream_inside_callback (const lsquic_stream_t *stream)
170{
171    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
172}
173
174
175/* Here, "readable" means that the user is able to read from the stream. */
176static void
177maybe_conn_to_pendrw_if_readable (lsquic_stream_t *stream,
178                                                        enum rw_reason reason)
179{
180    if (!stream_inside_callback(stream) && stream_readable(stream))
181    {
182        lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub,
183                                            stream->conn_pub->lconn, reason);
184    }
185}
186
187
188/* Here, "writeable" means that data can be put into packets to be
189 * scheduled to be sent out.
190 */
191static void
192maybe_conn_to_pendrw_if_writeable (lsquic_stream_t *stream,
193                                                        enum rw_reason reason)
194{
195    if (!stream_inside_callback(stream) &&
196            lsquic_send_ctl_can_send(stream->conn_pub->send_ctl) &&
197          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
198    {
199        lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub,
200                                            stream->conn_pub->lconn, reason);
201    }
202}
203
204
205static int
206stream_stalled (const lsquic_stream_t *stream)
207{
208    return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) &&
209           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
210                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
211}
212
213
214/* TODO: The logic to figure out whether the stream is connection limited
215 * should be taken out of the constructor.  The caller should specify this
216 * via one of enum stream_ctor_flags.
217 */
218lsquic_stream_t *
219lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub,
220                       const struct lsquic_stream_if *stream_if,
221                       void *stream_if_ctx, unsigned initial_window,
222                       unsigned initial_send_off,
223                       enum stream_ctor_flags ctor_flags)
224{
225    lsquic_cfcw_t *cfcw;
226    lsquic_stream_t *stream;
227
228    stream = calloc(1, sizeof(*stream));
229    if (!stream)
230        return NULL;
231
232    stream->stream_if = stream_if;
233    stream->id        = id;
234    stream->conn_pub  = conn_pub;
235    stream->sm_onnew_arg = stream_if_ctx;
236    if (!initial_window)
237        initial_window = 16 * 1024;
238    if (LSQUIC_STREAM_HANDSHAKE == id ||
239        (conn_pub->hs && LSQUIC_STREAM_HEADERS == id))
240        cfcw = NULL;
241    else
242    {
243        cfcw = &conn_pub->cfcw;
244        stream->stream_flags |= STREAM_CONN_LIMITED;
245        if (conn_pub->hs)
246            stream->stream_flags |= STREAM_USE_HEADERS;
247        lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO);
248    }
249    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
250    if (!initial_send_off)
251        initial_send_off = 16 * 1024;
252    stream->max_send_off = initial_send_off;
253    if (ctor_flags & SCF_USE_DI_HASH)
254        stream->data_in = data_in_hash_new(conn_pub, id, 0);
255    else
256        stream->data_in = data_in_nocopy_new(conn_pub, id);
257    LSQ_DEBUG("created stream %u @%p", id, stream);
258    SM_HISTORY_APPEND(stream, SHE_CREATED);
259    if (ctor_flags & SCF_DI_AUTOSWITCH)
260        stream->stream_flags |= STREAM_AUTOSWITCH;
261    if (ctor_flags & SCF_CALL_ON_NEW)
262        lsquic_stream_call_on_new(stream);
263    if (ctor_flags & SCF_DISP_RW_ONCE)
264        stream->stream_flags |= STREAM_RW_ONCE;
265    return stream;
266}
267
268
269void
270lsquic_stream_call_on_new (lsquic_stream_t *stream)
271{
272    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
273    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
274    {
275        LSQ_DEBUG("calling on_new_stream");
276        SM_HISTORY_APPEND(stream, SHE_ONNEW);
277        stream->stream_flags |= STREAM_ONNEW_DONE;
278        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
279                                                          stream);
280    }
281}
282
283
284static void
285decr_conn_cap (struct lsquic_stream *stream, size_t incr)
286{
287    if (stream->stream_flags & STREAM_CONN_LIMITED)
288    {
289        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
290        stream->conn_pub->conn_cap.cc_sent -= incr;
291    }
292}
293
294
295static void
296drop_buffered_data (struct lsquic_stream *stream)
297{
298    decr_conn_cap(stream, stream->sm_n_buffered);
299    stream->sm_n_buffered = 0;
300    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
301        maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS);
302}
303
304
305void
306lsquic_stream_destroy (lsquic_stream_t *stream)
307{
308    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
309    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
310                                                            STREAM_ONNEW_DONE)
311    {
312        stream->stream_flags |= STREAM_ONCLOSE_DONE;
313        stream->stream_if->on_close(stream, stream->st_ctx);
314    }
315    if (stream->stream_flags & STREAM_SENDING_FLAGS)
316        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
317    if (stream->stream_flags & STREAM_WANT_READ)
318        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
319    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
320        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
321    if (stream->stream_flags & STREAM_SERVICE_FLAGS)
322        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
323    drop_buffered_data(stream);
324    lsquic_sfcw_consume_rem(&stream->fc);
325    drop_frames_in(stream);
326    free(stream->push_req);
327    free(stream->uh);
328    free(stream->sm_buf);
329    LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream);
330    SM_HISTORY_DUMP_REMAINING(stream);
331    free(stream);
332}
333
334
335static int
336stream_is_finished (const lsquic_stream_t *stream)
337{
338    return lsquic_stream_is_closed(stream)
339           /* n_unacked checks that no outgoing packets that reference this
340            * stream are outstanding:
341            */
342        && 0 == stream->n_unacked
343           /* This checks that no packets that reference this stream will
344            * become outstanding:
345            */
346        && 0 == (stream->stream_flags & STREAM_SEND_RST)
347        && ((stream->stream_flags & STREAM_FORCE_FINISH)
348          || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))
349           && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD))));
350}
351
352
353static void
354maybe_finish_stream (lsquic_stream_t *stream)
355{
356    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
357                                                    stream_is_finished(stream))
358    {
359        LSQ_DEBUG("stream %u is now finished", stream->id);
360        SM_HISTORY_APPEND(stream, SHE_FINISHED);
361        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
362            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
363                                                    next_service_stream);
364        stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED;
365    }
366}
367
368
369static void
370maybe_schedule_call_on_close (lsquic_stream_t *stream)
371{
372    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
373                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE))
374            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE))
375    {
376        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
377            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
378                                                    next_service_stream);
379        stream->stream_flags |= STREAM_CALL_ONCLOSE;
380        LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id);
381        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
382    }
383}
384
385
386void
387lsquic_stream_call_on_close (lsquic_stream_t *stream)
388{
389    assert(stream->stream_flags & STREAM_ONNEW_DONE);
390    stream->stream_flags &= ~STREAM_CALL_ONCLOSE;
391    if (!(stream->stream_flags & STREAM_SERVICE_FLAGS))
392        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
393                                                    next_service_stream);
394    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
395    {
396        LSQ_DEBUG("calling on_close for stream %u", stream->id);
397        stream->stream_flags |= STREAM_ONCLOSE_DONE;
398        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
399        stream->stream_if->on_close(stream, stream->st_ctx);
400    }
401    else
402        assert(0);
403}
404
405
406static int
407stream_readable (const lsquic_stream_t *stream)
408{
409    /* A stream is readable if one of the following is true: */
410    return
411        /* - It is already finished: in that case, lsquic_stream_read() will
412         *   return 0.
413         */
414            (stream->stream_flags & STREAM_FIN_REACHED)
415        /* - The stream is reset, by either side.  In this case,
416         *   lsquic_stream_read() will return -1 (we want the user to be
417         *   able to collect the error).
418         */
419        ||  (stream->stream_flags & STREAM_RST_FLAGS)
420        /* - Either we are not in HTTP mode or the HTTP headers have been
421         *   received and the headers or data from the stream can be read.
422         */
423        ||  (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
424                                                        == STREAM_USE_HEADERS)
425            && (stream->uh != NULL
426                ||  stream->data_in->di_if->di_get_frame(stream->data_in,
427                                                        stream->read_offset)))
428    ;
429}
430
431
432static size_t
433stream_write_avail (const struct lsquic_stream *stream)
434{
435    uint64_t stream_avail, conn_avail;
436
437    stream_avail = stream->max_send_off - stream->tosend_off
438                                                - stream->sm_n_buffered;
439    if (stream->stream_flags & STREAM_CONN_LIMITED)
440    {
441        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
442        if (conn_avail < stream_avail)
443            return conn_avail;
444    }
445
446    return stream_avail;
447}
448
449
450int
451lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
452{
453    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
454                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
455    {
456        return -1;
457    }
458    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
459    {
460        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
461            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
462                                                    next_send_stream);
463        stream->stream_flags |= STREAM_SEND_WUF;
464    }
465    return 0;
466}
467
468
469int
470lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
471{
472    uint64_t max_off;
473    int got_next_offset;
474    enum ins_frame ins_frame;
475
476    assert(frame->packet_in);
477
478    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
479    LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; "
480        "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
481
482    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) ==
483                                (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN))
484    {
485        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
486        lsquic_malo_put(frame);
487        return -1;
488    }
489
490    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
491    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
492    if (INS_FRAME_OK == ins_frame)
493    {
494        /* Update maximum offset in the flow controller and check for flow
495         * control violation:
496         */
497        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
498        if (0 != lsquic_stream_update_sfcw(stream, max_off))
499            return -1;
500        if (frame->data_frame.df_fin)
501        {
502            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
503            stream->stream_flags |= STREAM_FIN_RECVD;
504            maybe_finish_stream(stream);
505        }
506        if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
507                (stream->data_in->di_flags & DI_SWITCH_IMPL))
508        {
509            stream->data_in = stream->data_in->di_if->di_switch_impl(
510                                        stream->data_in, stream->read_offset);
511            if (!stream->data_in)
512            {
513                stream->data_in = data_in_error_new();
514                return -1;
515            }
516        }
517        if (got_next_offset)
518            /* Checking the offset saves di_get_frame() call */
519            maybe_conn_to_pendrw_if_readable(stream, RW_REASON_STREAM_IN);
520        return 0;
521    }
522    else if (INS_FRAME_DUP == ins_frame)
523    {
524        return 0;
525    }
526    else
527    {
528        assert(INS_FRAME_ERR == ins_frame);
529        return -1;
530    }
531}
532
533
534static void
535drop_frames_in (lsquic_stream_t *stream)
536{
537    if (stream->data_in)
538    {
539        stream->data_in->di_if->di_destroy(stream->data_in);
540        /* To avoid checking whether `data_in` is set, just set to the error
541         * data-in stream.  It does the right thing after incoming data is
542         * dropped.
543         */
544        stream->data_in = data_in_error_new();
545    }
546}
547
548
549static void
550maybe_elide_stream_frames (struct lsquic_stream *stream)
551{
552    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
553    {
554        if (stream->n_unacked)
555            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
556                                                stream->id);
557        stream->stream_flags |= STREAM_FRAMES_ELIDED;
558    }
559}
560
561
562int
563lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
564                      uint32_t error_code)
565{
566
567    if (stream->stream_flags & STREAM_RST_RECVD)
568    {
569        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
570        return 0;
571    }
572
573    SM_HISTORY_APPEND(stream, SHE_RST_IN);
574    /* This flag must always be set, even if we are "ignoring" it: it is
575     * used by elision code.
576     */
577    stream->stream_flags |= STREAM_RST_RECVD;
578
579    if ((stream->stream_flags & STREAM_FIN_RECVD) &&
580                    /* Pushed streams have fake STREAM_FIN_RECVD set, thus
581                     * we need a special check:
582                     */
583                                            !lsquic_stream_is_pushed(stream))
584    {
585        LSQ_DEBUG("ignore RST_STREAM frame after FIN is received");
586        return 0;
587    }
588
589    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
590    {
591        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is "
592            "smaller than that of byte following the last byte we have seen: "
593            "0x%"PRIX64, stream->id, offset,
594            lsquic_sfcw_get_max_recv_off(&stream->fc));
595        return -1;
596    }
597
598    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
599    {
600        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64
601            " violates flow control", stream->id, offset);
602        return -1;
603    }
604
605    /* Let user collect error: */
606    maybe_conn_to_pendrw_if_readable(stream, RW_REASON_RST_IN);
607
608    lsquic_sfcw_consume_rem(&stream->fc);
609    drop_frames_in(stream);
610    drop_buffered_data(stream);
611    maybe_elide_stream_frames(stream);
612
613    if (!(stream->stream_flags &
614                        (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT)))
615        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
616
617    stream->stream_flags |= STREAM_RST_RECVD;
618
619    maybe_finish_stream(stream);
620    maybe_schedule_call_on_close(stream);
621
622    return 0;
623}
624
625
626uint64_t
627lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
628{
629    assert(stream->stream_flags & STREAM_SEND_WUF);
630    stream->stream_flags &= ~STREAM_SEND_WUF;
631    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
632        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
633    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
634}
635
636
637void
638lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
639{
640    assert(stream->stream_flags & STREAM_SEND_BLOCKED);
641    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
642    stream->stream_flags &= ~STREAM_SEND_BLOCKED;
643    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
644        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
645}
646
647
648void
649lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
650{
651    assert(stream->stream_flags & STREAM_SEND_RST);
652    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
653    stream->stream_flags &= ~STREAM_SEND_RST;
654    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
655        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
656    stream->stream_flags |= STREAM_RST_SENT;
657    maybe_finish_stream(stream);
658}
659
660
661static size_t
662read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len)
663{
664    struct uncompressed_headers *uh = stream->uh;
665    size_t n_avail = uh->uh_size - uh->uh_off;
666    if (n_avail < len)
667        len = n_avail;
668    memcpy(dst, uh->uh_headers + uh->uh_off, len);
669    uh->uh_off += len;
670    if (uh->uh_off == uh->uh_size)
671    {
672        LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id);
673        free(uh);
674        stream->uh = NULL;
675        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
676        {
677            stream->stream_flags |= STREAM_FIN_REACHED;
678            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
679        }
680    }
681    return len;
682}
683
684
685/* This function returns 0 when EOF is reached.
686 */
687ssize_t
688lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov,
689                     int iovcnt)
690{
691    size_t total_nread, nread;
692    int processed_frames, read_unc_headers, iovidx;
693    unsigned char *p, *end;
694
695    SM_HISTORY_APPEND(stream, SHE_USER_READ);
696
697#define NEXT_IOV() do {                                             \
698    ++iovidx;                                                       \
699    while (iovidx < iovcnt && 0 == iov[iovidx].iov_len)             \
700        ++iovidx;                                                   \
701    if (iovidx < iovcnt)                                            \
702    {                                                               \
703        p = iov[iovidx].iov_base;                                   \
704        end = p + iov[iovidx].iov_len;                              \
705    }                                                               \
706    else                                                            \
707        p = end = NULL;                                             \
708} while (0)
709
710#define AVAIL() (end - p)
711
712    if (stream->stream_flags & STREAM_RST_FLAGS)
713    {
714        errno = ECONNRESET;
715        return -1;
716    }
717    if (stream->stream_flags & STREAM_U_READ_DONE)
718    {
719        errno = EBADF;
720        return -1;
721    }
722    if (stream->stream_flags & STREAM_FIN_REACHED)
723        return 0;
724
725    total_nread = 0;
726    processed_frames = 0;
727
728    iovidx = -1;
729    NEXT_IOV();
730
731    if (stream->uh && AVAIL())
732    {
733        read_unc_headers = 1;
734        do
735        {
736            nread = read_uh(stream, p, AVAIL());
737            p += nread;
738            total_nread += nread;
739            if (p == end)
740                NEXT_IOV();
741        }
742        while (stream->uh && AVAIL());
743    }
744    else
745        read_unc_headers = 0;
746
747    struct data_frame *data_frame;
748    while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset)))
749    {
750        ++processed_frames;
751        size_t navail = data_frame->df_size - data_frame->df_read_off;
752        size_t ntowrite = AVAIL();
753        if (navail < ntowrite)
754            ntowrite = navail;
755        memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite);
756        p += ntowrite;
757        data_frame->df_read_off += ntowrite;
758        stream->read_offset += ntowrite;
759        total_nread += ntowrite;
760        if (data_frame->df_read_off == data_frame->df_size)
761        {
762            const int fin = data_frame->df_fin;
763            stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
764            if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
765                    (stream->data_in->di_flags & DI_SWITCH_IMPL))
766            {
767                stream->data_in = stream->data_in->di_if->di_switch_impl(
768                                            stream->data_in, stream->read_offset);
769                if (!stream->data_in)
770                {
771                    stream->data_in = data_in_error_new();
772                    return -1;
773                }
774            }
775            if (fin)
776            {
777                stream->stream_flags |= STREAM_FIN_REACHED;
778                break;
779            }
780        }
781        if (p == end)
782            NEXT_IOV();
783    }
784
785    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
786                                        total_nread, stream->read_offset);
787
788    if (processed_frames)
789    {
790        lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
791        if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
792        {
793            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
794                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
795            stream->stream_flags |= STREAM_SEND_WUF;
796            maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_READ);
797        }
798    }
799
800    if (processed_frames || read_unc_headers)
801    {
802        return total_nread;
803    }
804    else
805    {
806        assert(0 == total_nread);
807        errno = EWOULDBLOCK;
808        return -1;
809    }
810}
811
812
813ssize_t
814lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
815{
816    struct iovec iov = { .iov_base = buf, .iov_len = len, };
817    return lsquic_stream_readv(stream, &iov, 1);
818}
819
820
821static void
822stream_shutdown_read (lsquic_stream_t *stream)
823{
824    if (!(stream->stream_flags & STREAM_U_READ_DONE))
825    {
826        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
827        stream->stream_flags |= STREAM_U_READ_DONE;
828        stream_wantread(stream, 0);
829        maybe_finish_stream(stream);
830    }
831}
832
833
834static void
835stream_shutdown_write (lsquic_stream_t *stream)
836{
837    if (stream->stream_flags & STREAM_U_WRITE_DONE)
838        return;
839
840    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
841    stream->stream_flags |= STREAM_U_WRITE_DONE;
842    stream_wantwrite(stream, 0);
843
844    /* Don't bother to check whether there is anything else to write if
845     * the flags indicate that nothing else should be written.
846     */
847    if (!(stream->stream_flags &
848                    (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT)))
849    {
850        if (stream->sm_n_buffered == 0)
851        {
852            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
853                                                 stream))
854            {
855                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
856                stream->stream_flags |= STREAM_FIN_SENT;
857            }
858            else
859            {
860                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
861                          "flag in it");
862                (void) stream_flush_nocheck(stream);
863            }
864        }
865        else
866            (void) stream_flush_nocheck(stream);
867    }
868}
869
870
871int
872lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
873{
874    LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how);
875    if (lsquic_stream_is_closed(stream))
876    {
877        LSQ_INFO("Attempt to shut down a closed stream %u", stream->id);
878        errno = EBADF;
879        return -1;
880    }
881    /* 0: read, 1: write: 2: read and write
882     */
883    if (how < 0 || how > 2)
884    {
885        errno = EINVAL;
886        return -1;
887    }
888
889    if (how)
890        stream_shutdown_write(stream);
891    if (how != 1)
892        stream_shutdown_read(stream);
893
894    maybe_finish_stream(stream);
895    maybe_schedule_call_on_close(stream);
896    if (how)
897        maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_SHUTDOWN);
898
899    return 0;
900}
901
902
903void
904lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
905{
906    LSQ_DEBUG("internal shutdown of stream %u", stream->id);
907    if (LSQUIC_STREAM_HANDSHAKE == stream->id
908        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
909                                LSQUIC_STREAM_HEADERS == stream->id))
910    {
911        LSQ_DEBUG("add flag to force-finish special stream %u", stream->id);
912        stream->stream_flags |= STREAM_FORCE_FINISH;
913        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
914    }
915    maybe_finish_stream(stream);
916    maybe_schedule_call_on_close(stream);
917}
918
919
920static void
921fake_reset_unused_stream (lsquic_stream_t *stream)
922{
923    stream->stream_flags |=
924        STREAM_RST_RECVD    /* User will pick this up on read or write */
925      | STREAM_RST_SENT     /* Don't send anything else on this stream */
926    ;
927
928    /* Cancel all writes to the network scheduled for this stream: */
929    if (stream->stream_flags & STREAM_SENDING_FLAGS)
930        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
931                                                next_send_stream);
932    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
933
934    LSQ_DEBUG("fake-reset stream %u%s",
935                    stream->id, stream_stalled(stream) ? " (stalled)" : "");
936    maybe_finish_stream(stream);
937    maybe_schedule_call_on_close(stream);
938}
939
940
941/* This function should only be called for locally-initiated streams whose ID
942 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
943 * frame sent by peer but we have not yet received it and created a stream.
944 * In this situation, we mark the stream as reset, so that user's on_read or
945 * on_write event callback picks up the error.  That, in turn, should result
946 * in stream being closed.
947 *
948 * If we have received any data frames on this stream, this probably indicates
949 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
950 * lower than this.  However, we still try to handle it gracefully and peform
951 * a shutdown, as if the stream was not reset.
952 */
953void
954lsquic_stream_received_goaway (lsquic_stream_t *stream)
955{
956    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
957    if (0 == stream->read_offset &&
958                            stream->data_in->di_if->di_empty(stream->data_in))
959        fake_reset_unused_stream(stream);       /* Normal condition */
960    else
961    {   /* This is odd, let's handle it the best we can: */
962        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
963        lsquic_stream_shutdown_internal(stream);
964    }
965}
966
967
968uint64_t
969lsquic_stream_read_offset (const lsquic_stream_t *stream)
970{
971    return stream->read_offset;
972}
973
974
975static int
976stream_wantread (lsquic_stream_t *stream, int is_want)
977{
978    const int old_val = !!(stream->stream_flags & STREAM_WANT_READ);
979    const int new_val = !!is_want;
980    if (old_val != new_val)
981    {
982        if (new_val)
983        {
984            if (!old_val)
985                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
986                                                            next_read_stream);
987            stream->stream_flags |= STREAM_WANT_READ;
988        }
989        else
990        {
991            stream->stream_flags &= ~STREAM_WANT_READ;
992            if (old_val)
993                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
994                                                            next_read_stream);
995        }
996    }
997    return old_val;
998}
999
1000
1001static void
1002maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1003{
1004    assert(STREAM_WRITE_Q_FLAGS & flag);
1005    if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1006        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1007                                                        next_write_stream);
1008    stream->stream_flags |= flag;
1009}
1010
1011
1012static void
1013maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1014{
1015    assert(STREAM_WRITE_Q_FLAGS & flag);
1016    if (stream->stream_flags & flag)
1017    {
1018        stream->stream_flags &= ~flag;
1019        if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1020            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1021                                                        next_write_stream);
1022    }
1023}
1024
1025
1026static int
1027stream_wantwrite (lsquic_stream_t *stream, int is_want)
1028{
1029    const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE);
1030    const int new_val = !!is_want;
1031    if (old_val != new_val)
1032    {
1033        if (new_val)
1034            maybe_put_onto_write_q(stream, STREAM_WANT_WRITE);
1035        else
1036            maybe_remove_from_write_q(stream, STREAM_WANT_WRITE);
1037    }
1038    return old_val;
1039}
1040
1041
1042int
1043lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1044{
1045    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1046    {
1047        if (is_want)
1048            maybe_conn_to_pendrw_if_readable(stream, RW_REASON_WANTREAD);
1049        return stream_wantread(stream, is_want);
1050    }
1051    else
1052    {
1053        errno = EBADF;
1054        return -1;
1055    }
1056}
1057
1058
1059int
1060lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1061{
1062    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE))
1063        return stream_wantwrite(stream, is_want);
1064    else
1065    {
1066        errno = EBADF;
1067        return -1;
1068    }
1069}
1070
1071
1072#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE|            \
1073    STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST)
1074
1075
1076static void
1077stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1078{
1079    unsigned no_progress_count, no_progress_limit;
1080    enum stream_flags flags;
1081    uint64_t size;
1082
1083    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1084
1085    no_progress_count = 0;
1086    while ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream))
1087    {
1088        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1089        size  = stream->read_offset;
1090
1091        stream->stream_if->on_read(stream, stream->st_ctx);
1092
1093        if (no_progress_limit && size == stream->read_offset &&
1094                        flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1095        {
1096            ++no_progress_count;
1097            if (no_progress_count >= no_progress_limit)
1098            {
1099                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1100                    "progress) in user code reading from stream",
1101                    no_progress_count,
1102                    no_progress_count == 1 ? "" : "s");
1103                break;
1104            }
1105        }
1106        else
1107            no_progress_count = 0;
1108    }
1109}
1110
1111
1112static void
1113stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1114{
1115    unsigned no_progress_count, no_progress_limit;
1116    enum stream_flags flags;
1117
1118    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1119
1120    no_progress_count = 0;
1121    stream->stream_flags |= STREAM_LAST_WRITE_OK;
1122    while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK))
1123                                == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)
1124           && stream_write_avail(stream))
1125    {
1126        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1127
1128        stream->stream_if->on_write(stream, stream->st_ctx);
1129
1130        if (no_progress_limit &&
1131            flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1132        {
1133            ++no_progress_count;
1134            if (no_progress_count >= no_progress_limit)
1135            {
1136                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1137                    "progress) in user code writing to stream",
1138                    no_progress_count,
1139                    no_progress_count == 1 ? "" : "s");
1140                break;
1141            }
1142        }
1143        else
1144            no_progress_count = 0;
1145    }
1146}
1147
1148
1149static void
1150stream_dispatch_read_events_once (lsquic_stream_t *stream)
1151{
1152    if ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream))
1153    {
1154        stream->stream_if->on_read(stream, stream->st_ctx);
1155    }
1156}
1157
1158
1159static void
1160maybe_mark_as_blocked (lsquic_stream_t *stream)
1161{
1162    struct lsquic_conn_cap *cc;
1163
1164    if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered)
1165    {
1166        if (stream->blocked_off < stream->max_send_off)
1167        {
1168            stream->blocked_off = stream->max_send_off + stream->sm_n_buffered;
1169            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1170                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1171                                                            next_send_stream);
1172            stream->stream_flags |= STREAM_SEND_BLOCKED;
1173            LSQ_DEBUG("marked stream-blocked at stream offset "
1174                                            "%"PRIu64, stream->blocked_off);
1175        }
1176        else
1177            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
1178                " has been, or is about to be, sent", stream->blocked_off);
1179    }
1180
1181    if ((stream->stream_flags & STREAM_CONN_LIMITED)
1182        && (cc = &stream->conn_pub->conn_cap,
1183                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
1184    {
1185        if (cc->cc_blocked < cc->cc_max)
1186        {
1187            cc->cc_blocked = cc->cc_max;
1188            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
1189            LSQ_DEBUG("marked connection-blocked at connection offset "
1190                                                    "%"PRIu64, cc->cc_max);
1191        }
1192        else
1193            LSQ_DEBUG("stream has already been marked connection-blocked "
1194                "at offset %"PRIu64, cc->cc_blocked);
1195    }
1196}
1197
1198
1199void
1200lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
1201{
1202    assert(stream->stream_flags & STREAM_WANT_READ);
1203
1204    if (stream->stream_flags & STREAM_RW_ONCE)
1205        stream_dispatch_read_events_once(stream);
1206    else
1207        stream_dispatch_read_events_loop(stream);
1208}
1209
1210
1211void
1212lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
1213{
1214    int progress;
1215    uint64_t tosend_off;
1216    unsigned short n_buffered;
1217    enum stream_flags flags;
1218
1219    assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS);
1220    flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS;
1221    tosend_off = stream->tosend_off;
1222    n_buffered = stream->sm_n_buffered;
1223
1224    if (stream->stream_flags & STREAM_WANT_FLUSH)
1225        (void) stream_flush(stream);
1226
1227    if (stream->stream_flags & STREAM_RW_ONCE)
1228    {
1229        if ((stream->stream_flags & STREAM_WANT_WRITE)
1230            && stream_write_avail(stream))
1231        {
1232            stream->stream_if->on_write(stream, stream->st_ctx);
1233        }
1234    }
1235    else
1236        stream_dispatch_write_events_loop(stream);
1237
1238    /* Progress means either flags or offsets changed: */
1239    progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags &&
1240                        stream->tosend_off == tosend_off &&
1241                            stream->sm_n_buffered == n_buffered);
1242
1243    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
1244    {
1245        if (progress)
1246        {   /* Move the stream to the end of the list to ensure fairness. */
1247            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1248                                                            next_write_stream);
1249            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1250                                                            next_write_stream);
1251        }
1252    }
1253}
1254
1255
1256static size_t
1257inner_reader_empty_size (void *ctx)
1258{
1259    return 0;
1260}
1261
1262
1263static size_t
1264inner_reader_empty_read (void *ctx, void *buf, size_t count)
1265{
1266    return 0;
1267}
1268
1269
1270static int
1271stream_flush (lsquic_stream_t *stream)
1272{
1273    struct lsquic_reader empty_reader;
1274    ssize_t nw;
1275
1276    assert(stream->stream_flags & STREAM_WANT_FLUSH);
1277    assert(stream->sm_n_buffered > 0 ||
1278        /* Flushing is also used to packetize standalone FIN: */
1279        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
1280                                                    == STREAM_U_WRITE_DONE));
1281
1282    empty_reader.lsqr_size = inner_reader_empty_size;
1283    empty_reader.lsqr_read = inner_reader_empty_read;
1284    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
1285    nw = stream_write_to_packets(stream, &empty_reader, 0);
1286
1287    if (nw >= 0)
1288    {
1289        assert(nw == 0);    /* Empty reader: must have read zero bytes */
1290        return 0;
1291    }
1292    else
1293        return -1;
1294}
1295
1296
1297static int
1298stream_flush_nocheck (lsquic_stream_t *stream)
1299{
1300    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered;
1301    maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH);
1302    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
1303
1304    return stream_flush(stream);
1305}
1306
1307
1308int
1309lsquic_stream_flush (lsquic_stream_t *stream)
1310{
1311    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1312    {
1313        LSQ_DEBUG("cannot flush closed stream");
1314        errno = EBADF;
1315        return -1;
1316    }
1317
1318    if (0 == stream->sm_n_buffered)
1319    {
1320        LSQ_DEBUG("flushing 0 bytes: noop");
1321        return 0;
1322    }
1323
1324    return stream_flush_nocheck(stream);
1325}
1326
1327
1328/* The flush threshold is the maximum size of stream data that can be sent
1329 * in a full packet.
1330 */
1331static size_t
1332flush_threshold (const lsquic_stream_t *stream)
1333{
1334    enum packet_out_flags flags;
1335    enum lsquic_packno_bits bits;
1336    unsigned packet_header_sz, stream_header_sz;
1337    size_t threshold;
1338
1339    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
1340    flags = bits << POBIT_SHIFT;
1341    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
1342        flags |= PO_CONN_ID;
1343
1344    packet_header_sz = lsquic_po_header_length(flags);
1345    stream_header_sz = stream->conn_pub->lconn->cn_pf
1346            ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off);
1347
1348    threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ
1349              - packet_header_sz - stream_header_sz;
1350    return threshold;
1351}
1352
1353
1354#define COMMON_WRITE_CHECKS() do {                                          \
1355    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))   \
1356                                                   == STREAM_USE_HEADERS)   \
1357    {                                                                       \
1358        LSQ_WARN("Attempt to write to stream before sending HTTP headers"); \
1359        errno = EILSEQ;                                                     \
1360        return -1;                                                          \
1361    }                                                                       \
1362    if (stream->stream_flags & STREAM_RST_FLAGS)                            \
1363    {                                                                       \
1364        LSQ_INFO("Attempt to write to stream after it had been reset");     \
1365        errno = ECONNRESET;                                                 \
1366        return -1;                                                          \
1367    }                                                                       \
1368    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
1369    {                                                                       \
1370        LSQ_WARN("Attempt to write to stream after it was closed for "      \
1371                                                                "writing"); \
1372        errno = EBADF;                                                      \
1373        return -1;                                                          \
1374    }                                                                       \
1375} while (0)
1376
1377
1378struct frame_gen_ctx
1379{
1380    lsquic_stream_t      *fgc_stream;
1381    struct lsquic_reader *fgc_reader;
1382    /* We keep our own count of how many bytes were read from reader because
1383     * some readers are external.  The external caller does not have to rely
1384     * on our count, but it can.
1385     */
1386    size_t                fgc_nread_from_reader;
1387};
1388
1389
1390static size_t
1391frame_gen_size (void *ctx)
1392{
1393    struct frame_gen_ctx *fg_ctx = ctx;
1394    size_t available, remaining;
1395
1396    /* Make sure we are not writing past available size: */
1397    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1398    available = stream_write_avail(fg_ctx->fgc_stream);
1399    if (available < remaining)
1400        remaining = available;
1401
1402    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
1403}
1404
1405
1406static int
1407frame_gen_fin (void *ctx)
1408{
1409    struct frame_gen_ctx *fg_ctx = ctx;
1410    return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE
1411        && 0 == fg_ctx->fgc_stream->sm_n_buffered
1412        /* Do not use frame_gen_size() as it may chop the real size: */
1413        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1414}
1415
1416
1417static void
1418incr_conn_cap (struct lsquic_stream *stream, size_t incr)
1419{
1420    if (stream->stream_flags & STREAM_CONN_LIMITED)
1421    {
1422        stream->conn_pub->conn_cap.cc_sent += incr;
1423        assert(stream->conn_pub->conn_cap.cc_sent
1424                                    <= stream->conn_pub->conn_cap.cc_max);
1425    }
1426}
1427
1428
1429static size_t
1430frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
1431{
1432    struct frame_gen_ctx *fg_ctx = ctx;
1433    unsigned char *p = begin_buf;
1434    unsigned char *const end = p + len;
1435    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
1436    size_t n_written, available, n_to_write;
1437
1438    if (stream->sm_n_buffered > 0)
1439    {
1440        if (len <= stream->sm_n_buffered)
1441        {
1442            memcpy(p, stream->sm_buf, len);
1443            memmove(stream->sm_buf, stream->sm_buf + len,
1444                                                stream->sm_n_buffered - len);
1445            stream->sm_n_buffered -= len;
1446            stream->tosend_off += len;
1447            *fin = frame_gen_fin(fg_ctx);
1448            return len;
1449        }
1450        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
1451        p += stream->sm_n_buffered;
1452        stream->sm_n_buffered = 0;
1453    }
1454
1455    available = stream_write_avail(fg_ctx->fgc_stream);
1456    n_to_write = end - p;
1457    if (n_to_write > available)
1458        n_to_write = available;
1459    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
1460                                              n_to_write);
1461    p += n_written;
1462    fg_ctx->fgc_nread_from_reader += n_written;
1463    *fin = frame_gen_fin(fg_ctx);
1464    stream->tosend_off += p - (const unsigned char *) begin_buf;
1465    incr_conn_cap(stream, n_written);
1466    return p - (const unsigned char *) begin_buf;
1467}
1468
1469
1470static void
1471check_flush_threshold (lsquic_stream_t *stream)
1472{
1473    if ((stream->stream_flags & STREAM_WANT_FLUSH) &&
1474                            stream->tosend_off >= stream->sm_flush_to)
1475    {
1476        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
1477                                                    stream->sm_flush_to);
1478        maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH);
1479    }
1480}
1481
1482
1483static struct lsquic_packet_out *
1484get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least,
1485                      const struct lsquic_stream *stream)
1486{
1487    return lsquic_send_ctl_new_packet_out(ctl, need_at_least);
1488}
1489
1490
1491static struct lsquic_packet_out * (* const get_packet[])(
1492    struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) =
1493{
1494    lsquic_send_ctl_get_packet_for_stream,
1495    get_brand_new_packet,
1496};
1497
1498
1499static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR }
1500stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size)
1501{
1502    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
1503    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
1504    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
1505    unsigned stream_header_sz, need_at_least, off;
1506    lsquic_packet_out_t *packet_out;
1507    int len, s, hsk;
1508
1509    stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id,
1510                                                        stream->tosend_off);
1511    need_at_least = stream_header_sz + (size > 0);
1512    hsk = LSQUIC_STREAM_HANDSHAKE == stream->id;
1513    packet_out = get_packet[hsk](send_ctl, need_at_least, stream);
1514    if (!packet_out)
1515        return SWTP_STOP;
1516
1517    off = packet_out->po_data_sz;
1518    len = pf->pf_gen_stream_frame(
1519                packet_out->po_data + packet_out->po_data_sz,
1520                lsquic_packet_out_avail(packet_out), stream->id,
1521                stream->tosend_off,
1522                frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx);
1523    if (len < 0)
1524    {
1525        LSQ_ERROR("could not generate stream frame");
1526        return SWTP_ERROR;
1527    }
1528
1529    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
1530                            packet_out->po_data + packet_out->po_data_sz, len);
1531    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
1532    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
1533    if (0 == lsquic_packet_out_avail(packet_out))
1534        packet_out->po_flags |= PO_STREAM_END;
1535    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
1536                                     stream, QUIC_FRAME_STREAM, off, len);
1537    if (s != 0)
1538    {
1539        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
1540        return SWTP_ERROR;
1541    }
1542
1543    check_flush_threshold(stream);
1544
1545    /* XXX: I don't like it that this is here */
1546    if (hsk && !(packet_out->po_flags & PO_HELLO))
1547    {
1548        lsquic_packet_out_zero_pad(packet_out);
1549        packet_out->po_flags |= PO_HELLO;
1550        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
1551    }
1552
1553    return SWTP_OK;
1554}
1555
1556
1557static void
1558abort_connection (struct lsquic_stream *stream)
1559{
1560    if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
1561        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
1562                                                next_service_stream);
1563    stream->stream_flags |= STREAM_ABORT_CONN;
1564    LSQ_WARN("connection will be aborted");
1565}
1566
1567
1568static ssize_t
1569stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
1570                         size_t thresh)
1571{
1572    size_t size;
1573    ssize_t nw;
1574    struct frame_gen_ctx fg_ctx = {
1575        .fgc_stream = stream,
1576        .fgc_reader = reader,
1577        .fgc_nread_from_reader = 0,
1578    };
1579
1580    while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0)
1581           || frame_gen_fin(&fg_ctx))
1582    {
1583        switch (stream_write_to_packet(&fg_ctx, size))
1584        {
1585        case SWTP_OK:
1586            if (frame_gen_fin(&fg_ctx))
1587            {
1588                stream->stream_flags |= STREAM_FIN_SENT;
1589                goto end;
1590            }
1591            else
1592                break;
1593        case SWTP_STOP:
1594            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
1595            goto end;
1596        default:
1597            abort_connection(stream);
1598            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
1599            return -1;
1600        }
1601    }
1602
1603    if (thresh)
1604    {
1605        assert(size < thresh);
1606        assert(size >= stream->sm_n_buffered);
1607        size -= stream->sm_n_buffered;
1608        if (size > 0)
1609        {
1610            nw = save_to_buffer(stream, reader, size);
1611            if (nw < 0)
1612                return -1;
1613            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
1614        }
1615    }
1616    else
1617    {
1618        /* We count flushed data towards both stream and connection limits,
1619         * so we should have been able to packetize all of it:
1620         */
1621        assert(0 == stream->sm_n_buffered);
1622        assert(size == 0);
1623    }
1624
1625    maybe_mark_as_blocked(stream);
1626
1627  end:
1628    return fg_ctx.fgc_nread_from_reader;
1629}
1630
1631
1632/* Perform an implicit flush when we hit connection limit while buffering
1633 * data.  This is to prevent a (theoretical) stall:
1634 *
1635 * Imagine a number of streams, all of which buffered some data.  The buffered
1636 * data is up to connection cap, which means no further writes are possible.
1637 * None of them flushes, which means that data is not sent and connection
1638 * WINDOW_UPDATE frame never arrives from peer.  Stall.
1639 */
1640static void
1641maybe_flush_stream (struct lsquic_stream *stream)
1642{
1643    if (stream->sm_n_buffered > 0
1644          && (stream->stream_flags & STREAM_CONN_LIMITED)
1645            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
1646        stream_flush_nocheck(stream);
1647}
1648
1649
1650static ssize_t
1651save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
1652                                                                size_t len)
1653{
1654    size_t avail, n_written;
1655
1656    assert(stream->sm_n_buffered + len <= SM_BUF_SIZE);
1657
1658    if (!stream->sm_buf)
1659    {
1660        stream->sm_buf = malloc(SM_BUF_SIZE);
1661        if (!stream->sm_buf)
1662            return -1;
1663    }
1664
1665    avail = stream_write_avail(stream);
1666    if (avail < len)
1667        len = avail;
1668
1669    n_written = reader->lsqr_read(reader->lsqr_ctx,
1670                        stream->sm_buf + stream->sm_n_buffered, len);
1671    stream->sm_n_buffered += n_written;
1672    incr_conn_cap(stream, n_written);
1673    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
1674              n_written, stream->sm_n_buffered);
1675    maybe_flush_stream(stream);
1676    return n_written;
1677}
1678
1679
1680static ssize_t
1681stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
1682{
1683    size_t thresh, len;
1684
1685    thresh = flush_threshold(stream);
1686    len = reader->lsqr_size(reader->lsqr_ctx);
1687    if (stream->sm_n_buffered + len <= SM_BUF_SIZE &&
1688                                    stream->sm_n_buffered + len < thresh)
1689        return save_to_buffer(stream, reader, len);
1690    else
1691        return stream_write_to_packets(stream, reader, thresh);
1692}
1693
1694
1695ssize_t
1696lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
1697{
1698    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
1699    return lsquic_stream_writev(stream, &iov, 1);
1700}
1701
1702
1703struct inner_reader_iovec {
1704    const struct iovec       *iov;
1705    const struct iovec *const end;
1706    unsigned                  cur_iovec_off;
1707};
1708
1709
1710static size_t
1711inner_reader_iovec_read (void *ctx, void *buf, size_t count)
1712{
1713    struct inner_reader_iovec *const iro = ctx;
1714    unsigned char *p = buf;
1715    unsigned char *const end = p + count;
1716    unsigned n_tocopy;
1717
1718    while (iro->iov < iro->end && p < end)
1719    {
1720        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
1721        if (n_tocopy > (unsigned) (end - p))
1722            n_tocopy = end - p;
1723        memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off,
1724                                                                    n_tocopy);
1725        p += n_tocopy;
1726        iro->cur_iovec_off += n_tocopy;
1727        if (iro->iov->iov_len == iro->cur_iovec_off)
1728        {
1729            ++iro->iov;
1730            iro->cur_iovec_off = 0;
1731        }
1732    }
1733
1734    return p + count - end;
1735}
1736
1737
1738static size_t
1739inner_reader_iovec_size (void *ctx)
1740{
1741    struct inner_reader_iovec *const iro = ctx;
1742    const struct iovec *iov;
1743    size_t size;
1744
1745    size = 0;
1746    for (iov = iro->iov; iov < iro->end; ++iov)
1747        size += iov->iov_len;
1748
1749    return size - iro->cur_iovec_off;
1750}
1751
1752
1753ssize_t
1754lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
1755                                                                    int iovcnt)
1756{
1757    COMMON_WRITE_CHECKS();
1758    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1759
1760    struct inner_reader_iovec iro = {
1761        .iov = iov,
1762        .end = iov + iovcnt,
1763        .cur_iovec_off = 0,
1764    };
1765    struct lsquic_reader reader = {
1766        .lsqr_read = inner_reader_iovec_read,
1767        .lsqr_size = inner_reader_iovec_size,
1768        .lsqr_ctx  = &iro,
1769    };
1770
1771    return stream_write(stream, &reader);
1772}
1773
1774
1775ssize_t
1776lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
1777{
1778    COMMON_WRITE_CHECKS();
1779    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1780    return stream_write(stream, reader);
1781}
1782
1783
1784int
1785lsquic_stream_send_headers (lsquic_stream_t *stream,
1786                            const lsquic_http_headers_t *headers, int eos)
1787{
1788    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT|
1789                                                     STREAM_U_WRITE_DONE))
1790                == STREAM_USE_HEADERS)
1791    {
1792        int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs,
1793                    stream->id, headers, eos, lsquic_stream_priority(stream));
1794        if (0 == s)
1795        {
1796            SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
1797            stream->stream_flags |= STREAM_HEADERS_SENT;
1798            if (eos)
1799                stream->stream_flags |= STREAM_FIN_SENT;
1800            LSQ_INFO("sent headers for stream %u", stream->id);
1801        }
1802        else
1803            LSQ_WARN("could not send headers: %s", strerror(errno));
1804        return s;
1805    }
1806    else
1807    {
1808        LSQ_WARN("cannot send headers for stream %u in this state", stream->id);
1809        errno = EBADMSG;
1810        return -1;
1811    }
1812}
1813
1814
1815void
1816lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
1817{
1818    if (offset > stream->max_send_off)
1819    {
1820        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
1821        LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to "
1822            "0x%"PRIX64, stream->id, stream->max_send_off, offset);
1823        stream->max_send_off = offset;
1824    }
1825    else
1826        LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old "
1827            "max send offset 0x%"PRIX64", ignoring", stream->id, offset,
1828            stream->max_send_off);
1829}
1830
1831
1832/* This function is used to update offsets after handshake completes and we
1833 * learn of peer's limits from the handshake values.
1834 */
1835int
1836lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset)
1837{
1838    LSQ_DEBUG("setting max_send_off to %u", offset);
1839    if (offset > stream->max_send_off)
1840    {
1841        lsquic_stream_window_update(stream, offset);
1842        return 0;
1843    }
1844    else if (offset < stream->tosend_off)
1845    {
1846        LSQ_INFO("new offset (%u bytes) is smaller than the amount of data "
1847            "already sent on this stream (%"PRIu64" bytes)", offset,
1848            stream->tosend_off);
1849        return -1;
1850    }
1851    else
1852    {
1853        stream->max_send_off = offset;
1854        return 0;
1855    }
1856}
1857
1858
1859void
1860lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code)
1861{
1862    lsquic_stream_reset_ext(stream, error_code, 1);
1863}
1864
1865
1866void
1867lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code,
1868                         int do_close)
1869{
1870    if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT))
1871    {
1872        LSQ_INFO("reset already sent");
1873        return;
1874    }
1875
1876    SM_HISTORY_APPEND(stream, SHE_RESET);
1877
1878    LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code);
1879    stream->error_code = error_code;
1880
1881    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1882        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1883                                                        next_send_stream);
1884    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
1885    stream->stream_flags |= STREAM_SEND_RST;
1886
1887    drop_buffered_data(stream);
1888    maybe_elide_stream_frames(stream);
1889    maybe_schedule_call_on_close(stream);
1890
1891    if (do_close)
1892        lsquic_stream_close(stream);
1893    else
1894        maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_RESET_EXT);
1895}
1896
1897
1898unsigned
1899lsquic_stream_id (const lsquic_stream_t *stream)
1900{
1901    return stream->id;
1902}
1903
1904
1905struct lsquic_conn *
1906lsquic_stream_conn (const lsquic_stream_t *stream)
1907{
1908    return stream->conn_pub->lconn;
1909}
1910
1911
1912int
1913lsquic_stream_close (lsquic_stream_t *stream)
1914{
1915    LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id);
1916    SM_HISTORY_APPEND(stream, SHE_CLOSE);
1917    if (lsquic_stream_is_closed(stream))
1918    {
1919        LSQ_INFO("Attempt to close an already-closed stream %u", stream->id);
1920        errno = EBADF;
1921        return -1;
1922    }
1923    stream_shutdown_write(stream);
1924    stream_shutdown_read(stream);
1925    maybe_schedule_call_on_close(stream);
1926    maybe_finish_stream(stream);
1927    maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_STREAM_CLOSE);
1928    return 0;
1929}
1930
1931
1932#ifndef NDEBUG
1933#if __GNUC__
1934__attribute__((weak))
1935#endif
1936#endif
1937void
1938lsquic_stream_acked (lsquic_stream_t *stream)
1939{
1940    assert(stream->n_unacked);
1941    --stream->n_unacked;
1942    LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked);
1943    if (0 == stream->n_unacked)
1944        maybe_finish_stream(stream);
1945}
1946
1947
1948void
1949lsquic_stream_push_req (lsquic_stream_t *stream,
1950                        struct uncompressed_headers *push_req)
1951{
1952    assert(!stream->push_req);
1953    stream->push_req = push_req;
1954    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
1955}
1956
1957
1958int
1959lsquic_stream_is_pushed (const lsquic_stream_t *stream)
1960{
1961    return 1 & ~stream->id;
1962}
1963
1964
1965int
1966lsquic_stream_push_info (const lsquic_stream_t *stream,
1967        uint32_t *ref_stream_id, const char **headers, size_t *headers_sz)
1968{
1969    if (lsquic_stream_is_pushed(stream))
1970    {
1971        assert(stream->push_req);
1972        *ref_stream_id = stream->push_req->uh_stream_id;
1973        *headers       = stream->push_req->uh_headers;
1974        *headers_sz    = stream->push_req->uh_size;
1975        return 0;
1976    }
1977    else
1978        return -1;
1979}
1980
1981
1982int
1983lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
1984{
1985    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS)
1986    {
1987        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
1988        LSQ_DEBUG("received uncompressed headers for stream %u", stream->id);
1989        stream->stream_flags |= STREAM_HAVE_UH;
1990        if (uh->uh_flags & UH_FIN)
1991            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
1992        stream->uh = uh;
1993        if (uh->uh_oth_stream_id == 0)
1994        {
1995            if (uh->uh_weight)
1996                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
1997        }
1998        else
1999            LSQ_NOTICE("don't know how to depend on stream %u",
2000                                                        uh->uh_oth_stream_id);
2001        return 0;
2002    }
2003    else
2004    {
2005        LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id);
2006        return -1;
2007    }
2008}
2009
2010
2011unsigned
2012lsquic_stream_priority (const lsquic_stream_t *stream)
2013{
2014    return 256 - stream->sm_priority;
2015}
2016
2017
2018int
2019lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
2020{
2021    /* The user should never get a reference to the special streams,
2022     * but let's check just in case:
2023     */
2024    if (LSQUIC_STREAM_HANDSHAKE == stream->id
2025        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
2026                                LSQUIC_STREAM_HEADERS == stream->id))
2027        return -1;
2028    if (priority < 1 || priority > 256)
2029        return -1;
2030    stream->sm_priority = 256 - priority;
2031    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
2032    LSQ_DEBUG("set priority to %u", priority);
2033    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
2034    return 0;
2035}
2036
2037
2038int
2039lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
2040{
2041    if (0 == lsquic_stream_set_priority_internal(stream, priority))
2042    {
2043        if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) ==
2044                                       (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))
2045        {
2046            /* We need to send headers only if we are a) using HEADERS stream
2047             * and b) we already sent initial headers.  If initial headers
2048             * have not been sent yet, stream priority will be sent in the
2049             * HEADERS frame.
2050             */
2051            return lsquic_headers_stream_send_priority(stream->conn_pub->hs,
2052                                                    stream->id, 0, 0, priority);
2053        }
2054        else
2055            return 0;
2056    }
2057    else
2058        return -1;
2059}
2060
2061
2062lsquic_stream_ctx_t *
2063lsquic_stream_get_ctx (const lsquic_stream_t *stream)
2064{
2065    return stream->st_ctx;
2066}
2067
2068
2069int
2070lsquic_stream_refuse_push (lsquic_stream_t *stream)
2071{
2072    if (lsquic_stream_is_pushed(stream) &&
2073                !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST)))
2074    {
2075        LSQ_DEBUG("refusing pushed stream: send reset");
2076        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
2077        return 0;
2078    }
2079    else
2080        return -1;
2081}
2082
2083
2084size_t
2085lsquic_stream_mem_used (const struct lsquic_stream *stream)
2086{
2087    size_t size;
2088
2089    size = sizeof(stream);
2090    if (stream->sm_buf)
2091        size += SM_BUF_SIZE;
2092    if (stream->data_in)
2093        size += stream->data_in->di_if->di_mem_used(stream->data_in);
2094
2095    return size;
2096}
2097
2098
2099lsquic_cid_t
2100lsquic_stream_cid (const struct lsquic_stream *stream)
2101{
2102    return LSQUIC_LOG_CONN_ID;
2103}
2104