lsquic_stream.c revision be4cfad0
1/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "lsquic.h"
30
31#include "lsquic_int_types.h"
32#include "lsquic_packet_common.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_malo.h"
35#include "lsquic_conn_flow.h"
36#include "lsquic_rtt.h"
37#include "lsquic_sfcw.h"
38#include "lsquic_stream.h"
39#include "lsquic_conn_public.h"
40#include "lsquic_util.h"
41#include "lsquic_mm.h"
42#include "lsquic_headers_stream.h"
43#include "lsquic_frame_reader.h"
44#include "lsquic_conn.h"
45#include "lsquic_data_in_if.h"
46#include "lsquic_parse.h"
47#include "lsquic_packet_out.h"
48#include "lsquic_engine_public.h"
49#include "lsquic_senhist.h"
50#include "lsquic_pacer.h"
51#include "lsquic_cubic.h"
52#include "lsquic_send_ctl.h"
53#include "lsquic_ev_log.h"
54
55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid
57#define LSQUIC_LOG_STREAM_ID stream->id
58#include "lsquic_logger.h"
59
60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ
61
62static void
63drop_frames_in (lsquic_stream_t *stream);
64
65static void
66maybe_schedule_call_on_close (lsquic_stream_t *stream);
67
68static int
69stream_wantread (lsquic_stream_t *stream, int is_want);
70
71static int
72stream_wantwrite (lsquic_stream_t *stream, int is_want);
73
74static ssize_t
75stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);
76
77static ssize_t
78save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);
79
80static int
81stream_flush (lsquic_stream_t *stream);
82
83static int
84stream_flush_nocheck (lsquic_stream_t *stream);
85
86static void
87maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag);
88
89
90#if LSQUIC_KEEP_STREAM_HISTORY
91/* These values are printable ASCII characters for ease of printing the
92 * whole history in a single line of a log message.
93 *
94 * The list of events is not exhaustive: only most interesting events
95 * are recorded.
96 */
97enum stream_history_event
98{
99    SHE_EMPTY              =  '\0',     /* Special entry.  No init besides memset required */
100    SHE_PLUS               =  '+',      /* Special entry: previous event occured more than once */
101    SHE_REACH_FIN          =  'a',
102    SHE_BLOCKED_OUT        =  'b',
103    SHE_CREATED            =  'C',
104    SHE_FRAME_IN           =  'd',
105    SHE_FRAME_OUT          =  'D',
106    SHE_RESET              =  'e',
107    SHE_WINDOW_UPDATE      =  'E',
108    SHE_FIN_IN             =  'f',
109    SHE_FINISHED           =  'F',
110    SHE_GOAWAY_IN          =  'g',
111    SHE_USER_WRITE_HEADER  =  'h',
112    SHE_HEADERS_IN         =  'H',
113    SHE_ONCLOSE_SCHED      =  'l',
114    SHE_ONCLOSE_CALL       =  'L',
115    SHE_ONNEW              =  'N',
116    SHE_SET_PRIO           =  'p',
117    SHE_USER_READ          =  'r',
118    SHE_SHUTDOWN_READ      =  'R',
119    SHE_RST_IN             =  's',
120    SHE_RST_OUT            =  't',
121    SHE_FLUSH              =  'u',
122    SHE_USER_WRITE_DATA    =  'w',
123    SHE_SHUTDOWN_WRITE     =  'W',
124    SHE_CLOSE              =  'X',
125    SHE_FORCE_FINISH       =  'Z',
126};
127
128static void
129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
130{
131    enum stream_history_event prev_event;
132    sm_hist_idx_t idx;
133    int plus;
134
135    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
136    plus = SHE_PLUS == stream->sm_hist_buf[idx];
137    idx = (idx - plus) & SM_HIST_IDX_MASK;
138    prev_event = stream->sm_hist_buf[idx];
139
140    if (prev_event == sh_event && plus)
141        return;
142
143    if (prev_event == sh_event)
144        sh_event = SHE_PLUS;
145    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
146
147    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
148        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
149                                                        stream->sm_hist_buf);
150}
151
152#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
153#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
154        if (stream->sm_hist_idx & SM_HIST_IDX_MASK)                         \
155            LSQ_DEBUG("history: [%.*s]",                                    \
156                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
157                (stream)->sm_hist_buf);                                     \
158    } while (0)
159#else
160#   define SM_HISTORY_APPEND(stream, event)
161#   define SM_HISTORY_DUMP_REMAINING(stream)
162#endif
163
164
165static int
166stream_inside_callback (const lsquic_stream_t *stream)
167{
168    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
169}
170
171
172static void
173maybe_conn_to_tickable (lsquic_stream_t *stream)
174{
175    if (!stream_inside_callback(stream))
176        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
177                                           stream->conn_pub->lconn);
178}
179
180
181/* Here, "readable" means that the user is able to read from the stream. */
182static void
183maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
184{
185    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
186    {
187        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
188                                           stream->conn_pub->lconn);
189    }
190}
191
192
193/* Here, "writeable" means that data can be put into packets to be
194 * scheduled to be sent out.
195 *
196 * If `check_can_send' is false, it means that we do not need to check
197 * whether packets can be sent.  This check was already performed when
198 * we packetized stream data.
199 */
200static void
201maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
202                                                    int check_can_send)
203{
204    if (!stream_inside_callback(stream) &&
205            (!check_can_send
206             || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
207          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
208    {
209        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
210                                           stream->conn_pub->lconn);
211    }
212}
213
214
215static int
216stream_stalled (const lsquic_stream_t *stream)
217{
218    return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) &&
219           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
220                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
221}
222
223
224/* TODO: The logic to figure out whether the stream is connection limited
225 * should be taken out of the constructor.  The caller should specify this
226 * via one of enum stream_ctor_flags.
227 */
228lsquic_stream_t *
229lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub,
230                       const struct lsquic_stream_if *stream_if,
231                       void *stream_if_ctx, unsigned initial_window,
232                       unsigned initial_send_off,
233                       enum stream_ctor_flags ctor_flags)
234{
235    lsquic_cfcw_t *cfcw;
236    lsquic_stream_t *stream;
237
238    stream = calloc(1, sizeof(*stream));
239    if (!stream)
240        return NULL;
241
242    stream->stream_if = stream_if;
243    stream->id        = id;
244    stream->conn_pub  = conn_pub;
245    stream->sm_onnew_arg = stream_if_ctx;
246    if (!initial_window)
247        initial_window = 16 * 1024;
248    if (LSQUIC_STREAM_HANDSHAKE == id ||
249        (conn_pub->hs && LSQUIC_STREAM_HEADERS == id))
250        cfcw = NULL;
251    else
252    {
253        cfcw = &conn_pub->cfcw;
254        stream->stream_flags |= STREAM_CONN_LIMITED;
255        if (conn_pub->hs)
256            stream->stream_flags |= STREAM_USE_HEADERS;
257        lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO);
258    }
259    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
260    if (!initial_send_off)
261        initial_send_off = 16 * 1024;
262    stream->max_send_off = initial_send_off;
263    if (ctor_flags & SCF_USE_DI_HASH)
264        stream->data_in = data_in_hash_new(conn_pub, id, 0);
265    else
266        stream->data_in = data_in_nocopy_new(conn_pub, id);
267    LSQ_DEBUG("created stream %u @%p", id, stream);
268    SM_HISTORY_APPEND(stream, SHE_CREATED);
269    if (ctor_flags & SCF_DI_AUTOSWITCH)
270        stream->stream_flags |= STREAM_AUTOSWITCH;
271    if (ctor_flags & SCF_CALL_ON_NEW)
272        lsquic_stream_call_on_new(stream);
273    if (ctor_flags & SCF_DISP_RW_ONCE)
274        stream->stream_flags |= STREAM_RW_ONCE;
275    if (ctor_flags & SCF_ALLOW_OVERLAP)
276        stream->stream_flags |= STREAM_ALLOW_OVERLAP;
277    return stream;
278}
279
280
281void
282lsquic_stream_call_on_new (lsquic_stream_t *stream)
283{
284    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
285    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
286    {
287        LSQ_DEBUG("calling on_new_stream");
288        SM_HISTORY_APPEND(stream, SHE_ONNEW);
289        stream->stream_flags |= STREAM_ONNEW_DONE;
290        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
291                                                          stream);
292    }
293}
294
295
296static void
297decr_conn_cap (struct lsquic_stream *stream, size_t incr)
298{
299    if (stream->stream_flags & STREAM_CONN_LIMITED)
300    {
301        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
302        stream->conn_pub->conn_cap.cc_sent -= incr;
303    }
304}
305
306
307static void
308drop_buffered_data (struct lsquic_stream *stream)
309{
310    decr_conn_cap(stream, stream->sm_n_buffered);
311    stream->sm_n_buffered = 0;
312    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
313        maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS);
314}
315
316
317void
318lsquic_stream_destroy (lsquic_stream_t *stream)
319{
320    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
321    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
322                                                            STREAM_ONNEW_DONE)
323    {
324        stream->stream_flags |= STREAM_ONCLOSE_DONE;
325        stream->stream_if->on_close(stream, stream->st_ctx);
326    }
327    if (stream->stream_flags & STREAM_SENDING_FLAGS)
328        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
329    if (stream->stream_flags & STREAM_WANT_READ)
330        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
331    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
332        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
333    if (stream->stream_flags & STREAM_SERVICE_FLAGS)
334        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
335    drop_buffered_data(stream);
336    lsquic_sfcw_consume_rem(&stream->fc);
337    drop_frames_in(stream);
338    free(stream->push_req);
339    free(stream->uh);
340    free(stream->sm_buf);
341    LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream);
342    SM_HISTORY_DUMP_REMAINING(stream);
343    free(stream);
344}
345
346
347static int
348stream_is_finished (const lsquic_stream_t *stream)
349{
350    return lsquic_stream_is_closed(stream)
351           /* n_unacked checks that no outgoing packets that reference this
352            * stream are outstanding:
353            */
354        && 0 == stream->n_unacked
355           /* This checks that no packets that reference this stream will
356            * become outstanding:
357            */
358        && 0 == (stream->stream_flags & STREAM_SEND_RST)
359        && ((stream->stream_flags & STREAM_FORCE_FINISH)
360          || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))
361           && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD))));
362}
363
364
365static void
366maybe_finish_stream (lsquic_stream_t *stream)
367{
368    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
369                                                    stream_is_finished(stream))
370    {
371        LSQ_DEBUG("stream %u is now finished", stream->id);
372        SM_HISTORY_APPEND(stream, SHE_FINISHED);
373        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
374            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
375                                                    next_service_stream);
376        stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED;
377    }
378}
379
380
381static void
382maybe_schedule_call_on_close (lsquic_stream_t *stream)
383{
384    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
385                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE))
386            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE))
387    {
388        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
389            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
390                                                    next_service_stream);
391        stream->stream_flags |= STREAM_CALL_ONCLOSE;
392        LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id);
393        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
394    }
395}
396
397
398void
399lsquic_stream_call_on_close (lsquic_stream_t *stream)
400{
401    assert(stream->stream_flags & STREAM_ONNEW_DONE);
402    stream->stream_flags &= ~STREAM_CALL_ONCLOSE;
403    if (!(stream->stream_flags & STREAM_SERVICE_FLAGS))
404        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
405                                                    next_service_stream);
406    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
407    {
408        LSQ_DEBUG("calling on_close for stream %u", stream->id);
409        stream->stream_flags |= STREAM_ONCLOSE_DONE;
410        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
411        stream->stream_if->on_close(stream, stream->st_ctx);
412    }
413    else
414        assert(0);
415}
416
417
418int
419lsquic_stream_readable (const lsquic_stream_t *stream)
420{
421    /* A stream is readable if one of the following is true: */
422    return
423        /* - It is already finished: in that case, lsquic_stream_read() will
424         *   return 0.
425         */
426            (stream->stream_flags & STREAM_FIN_REACHED)
427        /* - The stream is reset, by either side.  In this case,
428         *   lsquic_stream_read() will return -1 (we want the user to be
429         *   able to collect the error).
430         */
431        ||  (stream->stream_flags & STREAM_RST_FLAGS)
432        /* - Either we are not in HTTP mode or the HTTP headers have been
433         *   received and the headers or data from the stream can be read.
434         */
435        ||  (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
436                                                        == STREAM_USE_HEADERS)
437            && (stream->uh != NULL
438                ||  stream->data_in->di_if->di_get_frame(stream->data_in,
439                                                        stream->read_offset)))
440    ;
441}
442
443
444size_t
445lsquic_stream_write_avail (const struct lsquic_stream *stream)
446{
447    uint64_t stream_avail, conn_avail;
448
449    stream_avail = stream->max_send_off - stream->tosend_off
450                                                - stream->sm_n_buffered;
451    if (stream->stream_flags & STREAM_CONN_LIMITED)
452    {
453        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
454        if (conn_avail < stream_avail)
455            return conn_avail;
456    }
457
458    return stream_avail;
459}
460
461
462int
463lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
464{
465    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
466                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
467    {
468        return -1;
469    }
470    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
471    {
472        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
473            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
474                                                    next_send_stream);
475        stream->stream_flags |= STREAM_SEND_WUF;
476    }
477    return 0;
478}
479
480
481int
482lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
483{
484    uint64_t max_off;
485    int got_next_offset;
486    enum ins_frame ins_frame;
487
488    assert(frame->packet_in);
489
490    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
491    LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; "
492        "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
493
494    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) ==
495                                (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN))
496    {
497        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
498        lsquic_malo_put(frame);
499        return -1;
500    }
501
502    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
503  insert_frame:
504    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
505    if (INS_FRAME_OK == ins_frame)
506    {
507        /* Update maximum offset in the flow controller and check for flow
508         * control violation:
509         */
510        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
511        if (0 != lsquic_stream_update_sfcw(stream, max_off))
512            return -1;
513        if (frame->data_frame.df_fin)
514        {
515            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
516            stream->stream_flags |= STREAM_FIN_RECVD;
517            maybe_finish_stream(stream);
518        }
519        if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
520                (stream->data_in->di_flags & DI_SWITCH_IMPL))
521        {
522            stream->data_in = stream->data_in->di_if->di_switch_impl(
523                                        stream->data_in, stream->read_offset);
524            if (!stream->data_in)
525            {
526                stream->data_in = data_in_error_new();
527                return -1;
528            }
529        }
530        if (got_next_offset)
531            /* Checking the offset saves di_get_frame() call */
532            maybe_conn_to_tickable_if_readable(stream);
533        return 0;
534    }
535    else if (INS_FRAME_DUP == ins_frame)
536    {
537        return 0;
538    }
539    else if (INS_FRAME_OVERLAP == ins_frame)
540    {
541        if (stream->stream_flags & STREAM_ALLOW_OVERLAP)
542        {
543            LSQ_DEBUG("overlap: switching DATA IN implementation");
544            stream->data_in = stream->data_in->di_if->di_switch_impl(
545                                        stream->data_in, stream->read_offset);
546            if (stream->data_in)
547                goto insert_frame;
548            stream->data_in = data_in_error_new();
549        }
550        else
551            LSQ_DEBUG("overlap not supported");
552        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
553        lsquic_malo_put(frame);
554        return -1;
555    }
556    else
557    {
558        assert(INS_FRAME_ERR == ins_frame);
559        return -1;
560    }
561}
562
563
564static void
565drop_frames_in (lsquic_stream_t *stream)
566{
567    if (stream->data_in)
568    {
569        stream->data_in->di_if->di_destroy(stream->data_in);
570        /* To avoid checking whether `data_in` is set, just set to the error
571         * data-in stream.  It does the right thing after incoming data is
572         * dropped.
573         */
574        stream->data_in = data_in_error_new();
575    }
576}
577
578
579static void
580maybe_elide_stream_frames (struct lsquic_stream *stream)
581{
582    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
583    {
584        if (stream->n_unacked)
585            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
586                                                stream->id);
587        stream->stream_flags |= STREAM_FRAMES_ELIDED;
588    }
589}
590
591
592int
593lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
594                      uint32_t error_code)
595{
596
597    if (stream->stream_flags & STREAM_RST_RECVD)
598    {
599        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
600        return 0;
601    }
602
603    SM_HISTORY_APPEND(stream, SHE_RST_IN);
604    /* This flag must always be set, even if we are "ignoring" it: it is
605     * used by elision code.
606     */
607    stream->stream_flags |= STREAM_RST_RECVD;
608
609    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
610    {
611        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is "
612            "smaller than that of byte following the last byte we have seen: "
613            "0x%"PRIX64, stream->id, offset,
614            lsquic_sfcw_get_max_recv_off(&stream->fc));
615        return -1;
616    }
617
618    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
619    {
620        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64
621            " violates flow control", stream->id, offset);
622        return -1;
623    }
624
625    /* Let user collect error: */
626    maybe_conn_to_tickable_if_readable(stream);
627
628    lsquic_sfcw_consume_rem(&stream->fc);
629    drop_frames_in(stream);
630    drop_buffered_data(stream);
631    maybe_elide_stream_frames(stream);
632
633    if (!(stream->stream_flags &
634                        (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT)))
635        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
636
637    stream->stream_flags |= STREAM_RST_RECVD;
638
639    maybe_finish_stream(stream);
640    maybe_schedule_call_on_close(stream);
641
642    return 0;
643}
644
645
646uint64_t
647lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
648{
649    assert(stream->stream_flags & STREAM_SEND_WUF);
650    stream->stream_flags &= ~STREAM_SEND_WUF;
651    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
652        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
653    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
654}
655
656
657void
658lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
659{
660    assert(stream->stream_flags & STREAM_SEND_BLOCKED);
661    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
662    stream->stream_flags &= ~STREAM_SEND_BLOCKED;
663    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
664        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
665}
666
667
668void
669lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
670{
671    assert(stream->stream_flags & STREAM_SEND_RST);
672    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
673    stream->stream_flags &= ~STREAM_SEND_RST;
674    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
675        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
676    stream->stream_flags |= STREAM_RST_SENT;
677    maybe_finish_stream(stream);
678}
679
680
681static size_t
682read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len)
683{
684    struct uncompressed_headers *uh = stream->uh;
685    size_t n_avail = uh->uh_size - uh->uh_off;
686    if (n_avail < len)
687        len = n_avail;
688    memcpy(dst, uh->uh_headers + uh->uh_off, len);
689    uh->uh_off += len;
690    if (uh->uh_off == uh->uh_size)
691    {
692        LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id);
693        free(uh);
694        stream->uh = NULL;
695        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
696        {
697            stream->stream_flags |= STREAM_FIN_REACHED;
698            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
699        }
700    }
701    return len;
702}
703
704
705/* This function returns 0 when EOF is reached.
706 */
707ssize_t
708lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov,
709                     int iovcnt)
710{
711    size_t total_nread, nread;
712    int processed_frames, read_unc_headers, iovidx;
713    unsigned char *p, *end;
714
715    SM_HISTORY_APPEND(stream, SHE_USER_READ);
716
717#define NEXT_IOV() do {                                             \
718    ++iovidx;                                                       \
719    while (iovidx < iovcnt && 0 == iov[iovidx].iov_len)             \
720        ++iovidx;                                                   \
721    if (iovidx < iovcnt)                                            \
722    {                                                               \
723        p = iov[iovidx].iov_base;                                   \
724        end = p + iov[iovidx].iov_len;                              \
725    }                                                               \
726    else                                                            \
727        p = end = NULL;                                             \
728} while (0)
729
730#define AVAIL() (end - p)
731
732    if (stream->stream_flags & STREAM_RST_FLAGS)
733    {
734        errno = ECONNRESET;
735        return -1;
736    }
737    if (stream->stream_flags & STREAM_U_READ_DONE)
738    {
739        errno = EBADF;
740        return -1;
741    }
742    if (stream->stream_flags & STREAM_FIN_REACHED)
743        return 0;
744
745    total_nread = 0;
746    processed_frames = 0;
747
748    iovidx = -1;
749    NEXT_IOV();
750
751    if (stream->uh && AVAIL())
752    {
753        read_unc_headers = 1;
754        do
755        {
756            nread = read_uh(stream, p, AVAIL());
757            p += nread;
758            total_nread += nread;
759            if (p == end)
760                NEXT_IOV();
761        }
762        while (stream->uh && AVAIL());
763    }
764    else
765        read_unc_headers = 0;
766
767    struct data_frame *data_frame;
768    while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset)))
769    {
770        ++processed_frames;
771        size_t navail = data_frame->df_size - data_frame->df_read_off;
772        size_t ntowrite = AVAIL();
773        if (navail < ntowrite)
774            ntowrite = navail;
775        memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite);
776        p += ntowrite;
777        data_frame->df_read_off += ntowrite;
778        stream->read_offset += ntowrite;
779        total_nread += ntowrite;
780        if (data_frame->df_read_off == data_frame->df_size)
781        {
782            const int fin = data_frame->df_fin;
783            stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
784            if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
785                    (stream->data_in->di_flags & DI_SWITCH_IMPL))
786            {
787                stream->data_in = stream->data_in->di_if->di_switch_impl(
788                                            stream->data_in, stream->read_offset);
789                if (!stream->data_in)
790                {
791                    stream->data_in = data_in_error_new();
792                    return -1;
793                }
794            }
795            if (fin)
796            {
797                stream->stream_flags |= STREAM_FIN_REACHED;
798                break;
799            }
800        }
801        if (p == end)
802            NEXT_IOV();
803    }
804
805    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
806                                        total_nread, stream->read_offset);
807
808    if (processed_frames)
809    {
810        lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
811        if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
812        {
813            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
814                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
815            stream->stream_flags |= STREAM_SEND_WUF;
816            maybe_conn_to_tickable_if_writeable(stream, 1);
817        }
818    }
819
820    if (processed_frames || read_unc_headers)
821    {
822        return total_nread;
823    }
824    else
825    {
826        assert(0 == total_nread);
827        errno = EWOULDBLOCK;
828        return -1;
829    }
830}
831
832
833ssize_t
834lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
835{
836    struct iovec iov = { .iov_base = buf, .iov_len = len, };
837    return lsquic_stream_readv(stream, &iov, 1);
838}
839
840
841static void
842stream_shutdown_read (lsquic_stream_t *stream)
843{
844    if (!(stream->stream_flags & STREAM_U_READ_DONE))
845    {
846        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
847        stream->stream_flags |= STREAM_U_READ_DONE;
848        stream_wantread(stream, 0);
849        maybe_finish_stream(stream);
850    }
851}
852
853
854static void
855stream_shutdown_write (lsquic_stream_t *stream)
856{
857    if (stream->stream_flags & STREAM_U_WRITE_DONE)
858        return;
859
860    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
861    stream->stream_flags |= STREAM_U_WRITE_DONE;
862    stream_wantwrite(stream, 0);
863
864    /* Don't bother to check whether there is anything else to write if
865     * the flags indicate that nothing else should be written.
866     */
867    if (!(stream->stream_flags &
868                    (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT)))
869    {
870        if (stream->sm_n_buffered == 0)
871        {
872            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
873                                                 stream))
874            {
875                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
876                stream->stream_flags |= STREAM_FIN_SENT;
877            }
878            else
879            {
880                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
881                          "flag in it");
882                (void) stream_flush_nocheck(stream);
883            }
884        }
885        else
886            (void) stream_flush_nocheck(stream);
887    }
888}
889
890
891int
892lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
893{
894    LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how);
895    if (lsquic_stream_is_closed(stream))
896    {
897        LSQ_INFO("Attempt to shut down a closed stream %u", stream->id);
898        errno = EBADF;
899        return -1;
900    }
901    /* 0: read, 1: write: 2: read and write
902     */
903    if (how < 0 || how > 2)
904    {
905        errno = EINVAL;
906        return -1;
907    }
908
909    if (how)
910        stream_shutdown_write(stream);
911    if (how != 1)
912        stream_shutdown_read(stream);
913
914    maybe_finish_stream(stream);
915    maybe_schedule_call_on_close(stream);
916    if (how)
917        maybe_conn_to_tickable_if_writeable(stream, 1);
918
919    return 0;
920}
921
922
923void
924lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
925{
926    LSQ_DEBUG("internal shutdown of stream %u", stream->id);
927    if (LSQUIC_STREAM_HANDSHAKE == stream->id
928        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
929                                LSQUIC_STREAM_HEADERS == stream->id))
930    {
931        LSQ_DEBUG("add flag to force-finish special stream %u", stream->id);
932        stream->stream_flags |= STREAM_FORCE_FINISH;
933        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
934    }
935    maybe_finish_stream(stream);
936    maybe_schedule_call_on_close(stream);
937}
938
939
940static void
941fake_reset_unused_stream (lsquic_stream_t *stream)
942{
943    stream->stream_flags |=
944        STREAM_RST_RECVD    /* User will pick this up on read or write */
945      | STREAM_RST_SENT     /* Don't send anything else on this stream */
946    ;
947
948    /* Cancel all writes to the network scheduled for this stream: */
949    if (stream->stream_flags & STREAM_SENDING_FLAGS)
950        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
951                                                next_send_stream);
952    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
953
954    LSQ_DEBUG("fake-reset stream %u%s",
955                    stream->id, stream_stalled(stream) ? " (stalled)" : "");
956    maybe_finish_stream(stream);
957    maybe_schedule_call_on_close(stream);
958}
959
960
961/* This function should only be called for locally-initiated streams whose ID
962 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
963 * frame sent by peer but we have not yet received it and created a stream.
964 * In this situation, we mark the stream as reset, so that user's on_read or
965 * on_write event callback picks up the error.  That, in turn, should result
966 * in stream being closed.
967 *
968 * If we have received any data frames on this stream, this probably indicates
969 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
970 * lower than this.  However, we still try to handle it gracefully and peform
971 * a shutdown, as if the stream was not reset.
972 */
973void
974lsquic_stream_received_goaway (lsquic_stream_t *stream)
975{
976    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
977    if (0 == stream->read_offset &&
978                            stream->data_in->di_if->di_empty(stream->data_in))
979        fake_reset_unused_stream(stream);       /* Normal condition */
980    else
981    {   /* This is odd, let's handle it the best we can: */
982        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
983        lsquic_stream_shutdown_internal(stream);
984    }
985}
986
987
988uint64_t
989lsquic_stream_read_offset (const lsquic_stream_t *stream)
990{
991    return stream->read_offset;
992}
993
994
995static int
996stream_wantread (lsquic_stream_t *stream, int is_want)
997{
998    const int old_val = !!(stream->stream_flags & STREAM_WANT_READ);
999    const int new_val = !!is_want;
1000    if (old_val != new_val)
1001    {
1002        if (new_val)
1003        {
1004            if (!old_val)
1005                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
1006                                                            next_read_stream);
1007            stream->stream_flags |= STREAM_WANT_READ;
1008        }
1009        else
1010        {
1011            stream->stream_flags &= ~STREAM_WANT_READ;
1012            if (old_val)
1013                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
1014                                                            next_read_stream);
1015        }
1016    }
1017    return old_val;
1018}
1019
1020
1021static void
1022maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1023{
1024    assert(STREAM_WRITE_Q_FLAGS & flag);
1025    if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1026        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1027                                                        next_write_stream);
1028    stream->stream_flags |= flag;
1029}
1030
1031
1032static void
1033maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1034{
1035    assert(STREAM_WRITE_Q_FLAGS & flag);
1036    if (stream->stream_flags & flag)
1037    {
1038        stream->stream_flags &= ~flag;
1039        if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1040            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1041                                                        next_write_stream);
1042    }
1043}
1044
1045
1046static int
1047stream_wantwrite (lsquic_stream_t *stream, int is_want)
1048{
1049    const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE);
1050    const int new_val = !!is_want;
1051    if (old_val != new_val)
1052    {
1053        if (new_val)
1054            maybe_put_onto_write_q(stream, STREAM_WANT_WRITE);
1055        else
1056            maybe_remove_from_write_q(stream, STREAM_WANT_WRITE);
1057    }
1058    return old_val;
1059}
1060
1061
1062int
1063lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1064{
1065    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1066    {
1067        if (is_want)
1068            maybe_conn_to_tickable_if_readable(stream);
1069        return stream_wantread(stream, is_want);
1070    }
1071    else
1072    {
1073        errno = EBADF;
1074        return -1;
1075    }
1076}
1077
1078
1079int
1080lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1081{
1082    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE))
1083    {
1084        if (is_want)
1085            maybe_conn_to_tickable_if_writeable(stream, 1);
1086        return stream_wantwrite(stream, is_want);
1087    }
1088    else
1089    {
1090        errno = EBADF;
1091        return -1;
1092    }
1093}
1094
1095
1096#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE|            \
1097    STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST)
1098
1099
1100static void
1101stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1102{
1103    unsigned no_progress_count, no_progress_limit;
1104    enum stream_flags flags;
1105    uint64_t size;
1106
1107    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1108
1109    no_progress_count = 0;
1110    while ((stream->stream_flags & STREAM_WANT_READ)
1111                                            && lsquic_stream_readable(stream))
1112    {
1113        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1114        size  = stream->read_offset;
1115
1116        stream->stream_if->on_read(stream, stream->st_ctx);
1117
1118        if (no_progress_limit && size == stream->read_offset &&
1119                        flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1120        {
1121            ++no_progress_count;
1122            if (no_progress_count >= no_progress_limit)
1123            {
1124                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1125                    "progress) in user code reading from stream",
1126                    no_progress_count,
1127                    no_progress_count == 1 ? "" : "s");
1128                break;
1129            }
1130        }
1131        else
1132            no_progress_count = 0;
1133    }
1134}
1135
1136
1137static void
1138stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1139{
1140    unsigned no_progress_count, no_progress_limit;
1141    enum stream_flags flags;
1142
1143    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1144
1145    no_progress_count = 0;
1146    stream->stream_flags |= STREAM_LAST_WRITE_OK;
1147    while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK))
1148                                == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)
1149           && lsquic_stream_write_avail(stream))
1150    {
1151        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1152
1153        stream->stream_if->on_write(stream, stream->st_ctx);
1154
1155        if (no_progress_limit &&
1156            flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1157        {
1158            ++no_progress_count;
1159            if (no_progress_count >= no_progress_limit)
1160            {
1161                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1162                    "progress) in user code writing to stream",
1163                    no_progress_count,
1164                    no_progress_count == 1 ? "" : "s");
1165                break;
1166            }
1167        }
1168        else
1169            no_progress_count = 0;
1170    }
1171}
1172
1173
1174static void
1175stream_dispatch_read_events_once (lsquic_stream_t *stream)
1176{
1177    if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream))
1178    {
1179        stream->stream_if->on_read(stream, stream->st_ctx);
1180    }
1181}
1182
1183
1184static void
1185maybe_mark_as_blocked (lsquic_stream_t *stream)
1186{
1187    struct lsquic_conn_cap *cc;
1188
1189    if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered)
1190    {
1191        if (stream->blocked_off < stream->max_send_off)
1192        {
1193            stream->blocked_off = stream->max_send_off + stream->sm_n_buffered;
1194            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1195                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1196                                                            next_send_stream);
1197            stream->stream_flags |= STREAM_SEND_BLOCKED;
1198            LSQ_DEBUG("marked stream-blocked at stream offset "
1199                                            "%"PRIu64, stream->blocked_off);
1200        }
1201        else
1202            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
1203                " has been, or is about to be, sent", stream->blocked_off);
1204    }
1205
1206    if ((stream->stream_flags & STREAM_CONN_LIMITED)
1207        && (cc = &stream->conn_pub->conn_cap,
1208                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
1209    {
1210        if (cc->cc_blocked < cc->cc_max)
1211        {
1212            cc->cc_blocked = cc->cc_max;
1213            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
1214            LSQ_DEBUG("marked connection-blocked at connection offset "
1215                                                    "%"PRIu64, cc->cc_max);
1216        }
1217        else
1218            LSQ_DEBUG("stream has already been marked connection-blocked "
1219                "at offset %"PRIu64, cc->cc_blocked);
1220    }
1221}
1222
1223
1224void
1225lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
1226{
1227    assert(stream->stream_flags & STREAM_WANT_READ);
1228
1229    if (stream->stream_flags & STREAM_RW_ONCE)
1230        stream_dispatch_read_events_once(stream);
1231    else
1232        stream_dispatch_read_events_loop(stream);
1233}
1234
1235
1236void
1237lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
1238{
1239    int progress;
1240    uint64_t tosend_off;
1241    unsigned short n_buffered;
1242    enum stream_flags flags;
1243
1244    assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS);
1245    flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS;
1246    tosend_off = stream->tosend_off;
1247    n_buffered = stream->sm_n_buffered;
1248
1249    if (stream->stream_flags & STREAM_WANT_FLUSH)
1250        (void) stream_flush(stream);
1251
1252    if (stream->stream_flags & STREAM_RW_ONCE)
1253    {
1254        if ((stream->stream_flags & STREAM_WANT_WRITE)
1255            && lsquic_stream_write_avail(stream))
1256        {
1257            stream->stream_if->on_write(stream, stream->st_ctx);
1258        }
1259    }
1260    else
1261        stream_dispatch_write_events_loop(stream);
1262
1263    /* Progress means either flags or offsets changed: */
1264    progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags &&
1265                        stream->tosend_off == tosend_off &&
1266                            stream->sm_n_buffered == n_buffered);
1267
1268    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
1269    {
1270        if (progress)
1271        {   /* Move the stream to the end of the list to ensure fairness. */
1272            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1273                                                            next_write_stream);
1274            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1275                                                            next_write_stream);
1276        }
1277    }
1278}
1279
1280
1281static size_t
1282inner_reader_empty_size (void *ctx)
1283{
1284    return 0;
1285}
1286
1287
1288static size_t
1289inner_reader_empty_read (void *ctx, void *buf, size_t count)
1290{
1291    return 0;
1292}
1293
1294
1295static int
1296stream_flush (lsquic_stream_t *stream)
1297{
1298    struct lsquic_reader empty_reader;
1299    ssize_t nw;
1300
1301    assert(stream->stream_flags & STREAM_WANT_FLUSH);
1302    assert(stream->sm_n_buffered > 0 ||
1303        /* Flushing is also used to packetize standalone FIN: */
1304        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
1305                                                    == STREAM_U_WRITE_DONE));
1306
1307    empty_reader.lsqr_size = inner_reader_empty_size;
1308    empty_reader.lsqr_read = inner_reader_empty_read;
1309    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
1310    nw = stream_write_to_packets(stream, &empty_reader, 0);
1311
1312    if (nw >= 0)
1313    {
1314        assert(nw == 0);    /* Empty reader: must have read zero bytes */
1315        return 0;
1316    }
1317    else
1318        return -1;
1319}
1320
1321
1322static int
1323stream_flush_nocheck (lsquic_stream_t *stream)
1324{
1325    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered;
1326    maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH);
1327    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
1328
1329    return stream_flush(stream);
1330}
1331
1332
1333int
1334lsquic_stream_flush (lsquic_stream_t *stream)
1335{
1336    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1337    {
1338        LSQ_DEBUG("cannot flush closed stream");
1339        errno = EBADF;
1340        return -1;
1341    }
1342
1343    if (0 == stream->sm_n_buffered)
1344    {
1345        LSQ_DEBUG("flushing 0 bytes: noop");
1346        return 0;
1347    }
1348
1349    return stream_flush_nocheck(stream);
1350}
1351
1352
1353/* The flush threshold is the maximum size of stream data that can be sent
1354 * in a full packet.
1355 */
1356static size_t
1357flush_threshold (const lsquic_stream_t *stream)
1358{
1359    enum packet_out_flags flags;
1360    enum lsquic_packno_bits bits;
1361    unsigned packet_header_sz, stream_header_sz;
1362    size_t threshold;
1363
1364    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
1365    flags = bits << POBIT_SHIFT;
1366    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
1367        flags |= PO_CONN_ID;
1368
1369    packet_header_sz = lsquic_po_header_length(flags);
1370    stream_header_sz = stream->conn_pub->lconn->cn_pf
1371            ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off);
1372
1373    threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ
1374              - packet_header_sz - stream_header_sz;
1375    return threshold;
1376}
1377
1378
1379#define COMMON_WRITE_CHECKS() do {                                          \
1380    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))   \
1381                                                   == STREAM_USE_HEADERS)   \
1382    {                                                                       \
1383        LSQ_WARN("Attempt to write to stream before sending HTTP headers"); \
1384        errno = EILSEQ;                                                     \
1385        return -1;                                                          \
1386    }                                                                       \
1387    if (stream->stream_flags & STREAM_RST_FLAGS)                            \
1388    {                                                                       \
1389        LSQ_INFO("Attempt to write to stream after it had been reset");     \
1390        errno = ECONNRESET;                                                 \
1391        return -1;                                                          \
1392    }                                                                       \
1393    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
1394    {                                                                       \
1395        LSQ_WARN("Attempt to write to stream after it was closed for "      \
1396                                                                "writing"); \
1397        errno = EBADF;                                                      \
1398        return -1;                                                          \
1399    }                                                                       \
1400} while (0)
1401
1402
1403struct frame_gen_ctx
1404{
1405    lsquic_stream_t      *fgc_stream;
1406    struct lsquic_reader *fgc_reader;
1407    /* We keep our own count of how many bytes were read from reader because
1408     * some readers are external.  The external caller does not have to rely
1409     * on our count, but it can.
1410     */
1411    size_t                fgc_nread_from_reader;
1412};
1413
1414
1415static size_t
1416frame_gen_size (void *ctx)
1417{
1418    struct frame_gen_ctx *fg_ctx = ctx;
1419    size_t available, remaining;
1420
1421    /* Make sure we are not writing past available size: */
1422    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1423    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
1424    if (available < remaining)
1425        remaining = available;
1426
1427    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
1428}
1429
1430
1431static int
1432frame_gen_fin (void *ctx)
1433{
1434    struct frame_gen_ctx *fg_ctx = ctx;
1435    return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE
1436        && 0 == fg_ctx->fgc_stream->sm_n_buffered
1437        /* Do not use frame_gen_size() as it may chop the real size: */
1438        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1439}
1440
1441
1442static void
1443incr_conn_cap (struct lsquic_stream *stream, size_t incr)
1444{
1445    if (stream->stream_flags & STREAM_CONN_LIMITED)
1446    {
1447        stream->conn_pub->conn_cap.cc_sent += incr;
1448        assert(stream->conn_pub->conn_cap.cc_sent
1449                                    <= stream->conn_pub->conn_cap.cc_max);
1450    }
1451}
1452
1453
1454static size_t
1455frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
1456{
1457    struct frame_gen_ctx *fg_ctx = ctx;
1458    unsigned char *p = begin_buf;
1459    unsigned char *const end = p + len;
1460    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
1461    size_t n_written, available, n_to_write;
1462
1463    if (stream->sm_n_buffered > 0)
1464    {
1465        if (len <= stream->sm_n_buffered)
1466        {
1467            memcpy(p, stream->sm_buf, len);
1468            memmove(stream->sm_buf, stream->sm_buf + len,
1469                                                stream->sm_n_buffered - len);
1470            stream->sm_n_buffered -= len;
1471            stream->tosend_off += len;
1472            *fin = frame_gen_fin(fg_ctx);
1473            return len;
1474        }
1475        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
1476        p += stream->sm_n_buffered;
1477        stream->sm_n_buffered = 0;
1478    }
1479
1480    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
1481    n_to_write = end - p;
1482    if (n_to_write > available)
1483        n_to_write = available;
1484    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
1485                                              n_to_write);
1486    p += n_written;
1487    fg_ctx->fgc_nread_from_reader += n_written;
1488    *fin = frame_gen_fin(fg_ctx);
1489    stream->tosend_off += p - (const unsigned char *) begin_buf;
1490    incr_conn_cap(stream, n_written);
1491    return p - (const unsigned char *) begin_buf;
1492}
1493
1494
1495static void
1496check_flush_threshold (lsquic_stream_t *stream)
1497{
1498    if ((stream->stream_flags & STREAM_WANT_FLUSH) &&
1499                            stream->tosend_off >= stream->sm_flush_to)
1500    {
1501        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
1502                                                    stream->sm_flush_to);
1503        maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH);
1504    }
1505}
1506
1507
1508static struct lsquic_packet_out *
1509get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least,
1510                      const struct lsquic_stream *stream)
1511{
1512    return lsquic_send_ctl_new_packet_out(ctl, need_at_least);
1513}
1514
1515
1516static struct lsquic_packet_out * (* const get_packet[])(
1517    struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) =
1518{
1519    lsquic_send_ctl_get_packet_for_stream,
1520    get_brand_new_packet,
1521};
1522
1523
1524static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR }
1525stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size)
1526{
1527    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
1528    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
1529    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
1530    unsigned stream_header_sz, need_at_least, off;
1531    lsquic_packet_out_t *packet_out;
1532    int len, s, hsk;
1533
1534    stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id,
1535                                                        stream->tosend_off);
1536    need_at_least = stream_header_sz + (size > 0);
1537    hsk = LSQUIC_STREAM_HANDSHAKE == stream->id;
1538    packet_out = get_packet[hsk](send_ctl, need_at_least, stream);
1539    if (!packet_out)
1540        return SWTP_STOP;
1541
1542    off = packet_out->po_data_sz;
1543    len = pf->pf_gen_stream_frame(
1544                packet_out->po_data + packet_out->po_data_sz,
1545                lsquic_packet_out_avail(packet_out), stream->id,
1546                stream->tosend_off,
1547                frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx);
1548    if (len < 0)
1549    {
1550        LSQ_ERROR("could not generate stream frame");
1551        return SWTP_ERROR;
1552    }
1553
1554    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
1555                            packet_out->po_data + packet_out->po_data_sz, len);
1556    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
1557    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
1558    if (0 == lsquic_packet_out_avail(packet_out))
1559        packet_out->po_flags |= PO_STREAM_END;
1560    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
1561                                     stream, QUIC_FRAME_STREAM, off, len);
1562    if (s != 0)
1563    {
1564        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
1565        return SWTP_ERROR;
1566    }
1567
1568    check_flush_threshold(stream);
1569
1570    /* XXX: I don't like it that this is here */
1571    if (hsk && !(packet_out->po_flags & PO_HELLO))
1572    {
1573        lsquic_packet_out_zero_pad(packet_out);
1574        packet_out->po_flags |= PO_HELLO;
1575        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
1576    }
1577
1578    return SWTP_OK;
1579}
1580
1581
1582static void
1583abort_connection (struct lsquic_stream *stream)
1584{
1585    if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
1586        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
1587                                                next_service_stream);
1588    stream->stream_flags |= STREAM_ABORT_CONN;
1589    LSQ_WARN("connection will be aborted");
1590    maybe_conn_to_tickable(stream);
1591}
1592
1593
1594static ssize_t
1595stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
1596                         size_t thresh)
1597{
1598    size_t size;
1599    ssize_t nw;
1600    unsigned seen_ok;
1601    struct frame_gen_ctx fg_ctx = {
1602        .fgc_stream = stream,
1603        .fgc_reader = reader,
1604        .fgc_nread_from_reader = 0,
1605    };
1606
1607    seen_ok = 0;
1608    while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0)
1609           || frame_gen_fin(&fg_ctx))
1610    {
1611        switch (stream_write_to_packet(&fg_ctx, size))
1612        {
1613        case SWTP_OK:
1614            if (!seen_ok++)
1615                maybe_conn_to_tickable_if_writeable(stream, 0);
1616            if (frame_gen_fin(&fg_ctx))
1617            {
1618                stream->stream_flags |= STREAM_FIN_SENT;
1619                goto end;
1620            }
1621            else
1622                break;
1623        case SWTP_STOP:
1624            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
1625            goto end;
1626        default:
1627            abort_connection(stream);
1628            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
1629            return -1;
1630        }
1631    }
1632
1633    if (thresh)
1634    {
1635        assert(size < thresh);
1636        assert(size >= stream->sm_n_buffered);
1637        size -= stream->sm_n_buffered;
1638        if (size > 0)
1639        {
1640            nw = save_to_buffer(stream, reader, size);
1641            if (nw < 0)
1642                return -1;
1643            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
1644        }
1645    }
1646    else
1647    {
1648        /* We count flushed data towards both stream and connection limits,
1649         * so we should have been able to packetize all of it:
1650         */
1651        assert(0 == stream->sm_n_buffered);
1652        assert(size == 0);
1653    }
1654
1655    maybe_mark_as_blocked(stream);
1656
1657  end:
1658    return fg_ctx.fgc_nread_from_reader;
1659}
1660
1661
1662/* Perform an implicit flush when we hit connection limit while buffering
1663 * data.  This is to prevent a (theoretical) stall:
1664 *
1665 * Imagine a number of streams, all of which buffered some data.  The buffered
1666 * data is up to connection cap, which means no further writes are possible.
1667 * None of them flushes, which means that data is not sent and connection
1668 * WINDOW_UPDATE frame never arrives from peer.  Stall.
1669 */
1670static int
1671maybe_flush_stream (struct lsquic_stream *stream)
1672{
1673    if (stream->sm_n_buffered > 0
1674          && (stream->stream_flags & STREAM_CONN_LIMITED)
1675            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
1676        return stream_flush_nocheck(stream);
1677    else
1678        return 0;
1679}
1680
1681
1682static ssize_t
1683save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
1684                                                                size_t len)
1685{
1686    size_t avail, n_written;
1687
1688    assert(stream->sm_n_buffered + len <= SM_BUF_SIZE);
1689
1690    if (!stream->sm_buf)
1691    {
1692        stream->sm_buf = malloc(SM_BUF_SIZE);
1693        if (!stream->sm_buf)
1694            return -1;
1695    }
1696
1697    avail = lsquic_stream_write_avail(stream);
1698    if (avail < len)
1699        len = avail;
1700
1701    n_written = reader->lsqr_read(reader->lsqr_ctx,
1702                        stream->sm_buf + stream->sm_n_buffered, len);
1703    stream->sm_n_buffered += n_written;
1704    incr_conn_cap(stream, n_written);
1705    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
1706              n_written, stream->sm_n_buffered);
1707    if (0 != maybe_flush_stream(stream))
1708        return -1;
1709    return n_written;
1710}
1711
1712
1713static ssize_t
1714stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
1715{
1716    size_t thresh, len;
1717
1718    thresh = flush_threshold(stream);
1719    len = reader->lsqr_size(reader->lsqr_ctx);
1720    if (stream->sm_n_buffered + len <= SM_BUF_SIZE &&
1721                                    stream->sm_n_buffered + len < thresh)
1722        return save_to_buffer(stream, reader, len);
1723    else
1724        return stream_write_to_packets(stream, reader, thresh);
1725}
1726
1727
1728ssize_t
1729lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
1730{
1731    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
1732    return lsquic_stream_writev(stream, &iov, 1);
1733}
1734
1735
1736struct inner_reader_iovec {
1737    const struct iovec       *iov;
1738    const struct iovec *end;
1739    unsigned                  cur_iovec_off;
1740};
1741
1742
1743static size_t
1744inner_reader_iovec_read (void *ctx, void *buf, size_t count)
1745{
1746    struct inner_reader_iovec *const iro = ctx;
1747    unsigned char *p = buf;
1748    unsigned char *const end = p + count;
1749    unsigned n_tocopy;
1750
1751    while (iro->iov < iro->end && p < end)
1752    {
1753        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
1754        if (n_tocopy > (unsigned) (end - p))
1755            n_tocopy = end - p;
1756        memcpy(p, (unsigned char *) iro->iov->iov_base + iro->cur_iovec_off,
1757                                                                    n_tocopy);
1758        p += n_tocopy;
1759        iro->cur_iovec_off += n_tocopy;
1760        if (iro->iov->iov_len == iro->cur_iovec_off)
1761        {
1762            ++iro->iov;
1763            iro->cur_iovec_off = 0;
1764        }
1765    }
1766
1767    return p + count - end;
1768}
1769
1770
1771static size_t
1772inner_reader_iovec_size (void *ctx)
1773{
1774    struct inner_reader_iovec *const iro = ctx;
1775    const struct iovec *iov;
1776    size_t size;
1777
1778    size = 0;
1779    for (iov = iro->iov; iov < iro->end; ++iov)
1780        size += iov->iov_len;
1781
1782    return size - iro->cur_iovec_off;
1783}
1784
1785
1786ssize_t
1787lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
1788                                                                    int iovcnt)
1789{
1790    COMMON_WRITE_CHECKS();
1791    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1792
1793    struct inner_reader_iovec iro = {
1794        .iov = iov,
1795        .end = iov + iovcnt,
1796        .cur_iovec_off = 0,
1797    };
1798    struct lsquic_reader reader = {
1799        .lsqr_read = inner_reader_iovec_read,
1800        .lsqr_size = inner_reader_iovec_size,
1801        .lsqr_ctx  = &iro,
1802    };
1803
1804    return stream_write(stream, &reader);
1805}
1806
1807
1808ssize_t
1809lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
1810{
1811    COMMON_WRITE_CHECKS();
1812    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1813    return stream_write(stream, reader);
1814}
1815
1816
1817int
1818lsquic_stream_send_headers (lsquic_stream_t *stream,
1819                            const lsquic_http_headers_t *headers, int eos)
1820{
1821    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT|
1822                                                     STREAM_U_WRITE_DONE))
1823                == STREAM_USE_HEADERS)
1824    {
1825        int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs,
1826                    stream->id, headers, eos, lsquic_stream_priority(stream));
1827        if (0 == s)
1828        {
1829            SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
1830            stream->stream_flags |= STREAM_HEADERS_SENT;
1831            if (eos)
1832                stream->stream_flags |= STREAM_FIN_SENT;
1833            LSQ_INFO("sent headers for stream %u", stream->id);
1834        }
1835        else
1836            LSQ_WARN("could not send headers: %s", strerror(errno));
1837        return s;
1838    }
1839    else
1840    {
1841        LSQ_WARN("cannot send headers for stream %u in this state", stream->id);
1842        errno = EBADMSG;
1843        return -1;
1844    }
1845}
1846
1847
1848void
1849lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
1850{
1851    if (offset > stream->max_send_off)
1852    {
1853        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
1854        LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to "
1855            "0x%"PRIX64, stream->id, stream->max_send_off, offset);
1856        stream->max_send_off = offset;
1857    }
1858    else
1859        LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old "
1860            "max send offset 0x%"PRIX64", ignoring", stream->id, offset,
1861            stream->max_send_off);
1862}
1863
1864
1865/* This function is used to update offsets after handshake completes and we
1866 * learn of peer's limits from the handshake values.
1867 */
1868int
1869lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset)
1870{
1871    LSQ_DEBUG("setting max_send_off to %u", offset);
1872    if (offset > stream->max_send_off)
1873    {
1874        lsquic_stream_window_update(stream, offset);
1875        return 0;
1876    }
1877    else if (offset < stream->tosend_off)
1878    {
1879        LSQ_INFO("new offset (%u bytes) is smaller than the amount of data "
1880            "already sent on this stream (%"PRIu64" bytes)", offset,
1881            stream->tosend_off);
1882        return -1;
1883    }
1884    else
1885    {
1886        stream->max_send_off = offset;
1887        return 0;
1888    }
1889}
1890
1891
1892void
1893lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code)
1894{
1895    lsquic_stream_reset_ext(stream, error_code, 1);
1896}
1897
1898
1899void
1900lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code,
1901                         int do_close)
1902{
1903    if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT))
1904    {
1905        LSQ_INFO("reset already sent");
1906        return;
1907    }
1908
1909    SM_HISTORY_APPEND(stream, SHE_RESET);
1910
1911    LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code);
1912    stream->error_code = error_code;
1913
1914    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1915        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1916                                                        next_send_stream);
1917    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
1918    stream->stream_flags |= STREAM_SEND_RST;
1919
1920    drop_buffered_data(stream);
1921    maybe_elide_stream_frames(stream);
1922    maybe_schedule_call_on_close(stream);
1923
1924    if (do_close)
1925        lsquic_stream_close(stream);
1926    else
1927        maybe_conn_to_tickable_if_writeable(stream, 1);
1928}
1929
1930
1931unsigned
1932lsquic_stream_id (const lsquic_stream_t *stream)
1933{
1934    return stream->id;
1935}
1936
1937
1938struct lsquic_conn *
1939lsquic_stream_conn (const lsquic_stream_t *stream)
1940{
1941    return stream->conn_pub->lconn;
1942}
1943
1944
1945int
1946lsquic_stream_close (lsquic_stream_t *stream)
1947{
1948    LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id);
1949    SM_HISTORY_APPEND(stream, SHE_CLOSE);
1950    if (lsquic_stream_is_closed(stream))
1951    {
1952        LSQ_INFO("Attempt to close an already-closed stream %u", stream->id);
1953        errno = EBADF;
1954        return -1;
1955    }
1956    stream_shutdown_write(stream);
1957    stream_shutdown_read(stream);
1958    maybe_schedule_call_on_close(stream);
1959    maybe_finish_stream(stream);
1960    maybe_conn_to_tickable_if_writeable(stream, 1);
1961    return 0;
1962}
1963
1964
1965#ifndef NDEBUG
1966#if __GNUC__
1967__attribute__((weak))
1968#endif
1969#endif
1970void
1971lsquic_stream_acked (lsquic_stream_t *stream)
1972{
1973    assert(stream->n_unacked);
1974    --stream->n_unacked;
1975    LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked);
1976    if (0 == stream->n_unacked)
1977        maybe_finish_stream(stream);
1978}
1979
1980
1981void
1982lsquic_stream_push_req (lsquic_stream_t *stream,
1983                        struct uncompressed_headers *push_req)
1984{
1985    assert(!stream->push_req);
1986    stream->push_req = push_req;
1987    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
1988}
1989
1990
1991int
1992lsquic_stream_is_pushed (const lsquic_stream_t *stream)
1993{
1994    return 1 & ~stream->id;
1995}
1996
1997
1998int
1999lsquic_stream_push_info (const lsquic_stream_t *stream,
2000        uint32_t *ref_stream_id, const char **headers, size_t *headers_sz)
2001{
2002    if (lsquic_stream_is_pushed(stream))
2003    {
2004        assert(stream->push_req);
2005        *ref_stream_id = stream->push_req->uh_stream_id;
2006        *headers       = stream->push_req->uh_headers;
2007        *headers_sz    = stream->push_req->uh_size;
2008        return 0;
2009    }
2010    else
2011        return -1;
2012}
2013
2014
2015int
2016lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
2017{
2018    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS)
2019    {
2020        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
2021        LSQ_DEBUG("received uncompressed headers for stream %u", stream->id);
2022        stream->stream_flags |= STREAM_HAVE_UH;
2023        if (uh->uh_flags & UH_FIN)
2024            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
2025        stream->uh = uh;
2026        if (uh->uh_oth_stream_id == 0)
2027        {
2028            if (uh->uh_weight)
2029                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
2030        }
2031        else
2032            LSQ_NOTICE("don't know how to depend on stream %u",
2033                                                        uh->uh_oth_stream_id);
2034        return 0;
2035    }
2036    else
2037    {
2038        LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id);
2039        return -1;
2040    }
2041}
2042
2043
2044unsigned
2045lsquic_stream_priority (const lsquic_stream_t *stream)
2046{
2047    return 256 - stream->sm_priority;
2048}
2049
2050
2051int
2052lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
2053{
2054    /* The user should never get a reference to the special streams,
2055     * but let's check just in case:
2056     */
2057    if (LSQUIC_STREAM_HANDSHAKE == stream->id
2058        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
2059                                LSQUIC_STREAM_HEADERS == stream->id))
2060        return -1;
2061    if (priority < 1 || priority > 256)
2062        return -1;
2063    stream->sm_priority = 256 - priority;
2064    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
2065    LSQ_DEBUG("set priority to %u", priority);
2066    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
2067    return 0;
2068}
2069
2070
2071int
2072lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
2073{
2074    if (0 == lsquic_stream_set_priority_internal(stream, priority))
2075    {
2076        if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) ==
2077                                       (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))
2078        {
2079            /* We need to send headers only if we are a) using HEADERS stream
2080             * and b) we already sent initial headers.  If initial headers
2081             * have not been sent yet, stream priority will be sent in the
2082             * HEADERS frame.
2083             */
2084            return lsquic_headers_stream_send_priority(stream->conn_pub->hs,
2085                                                    stream->id, 0, 0, priority);
2086        }
2087        else
2088            return 0;
2089    }
2090    else
2091        return -1;
2092}
2093
2094
2095lsquic_stream_ctx_t *
2096lsquic_stream_get_ctx (const lsquic_stream_t *stream)
2097{
2098    return stream->st_ctx;
2099}
2100
2101
2102int
2103lsquic_stream_refuse_push (lsquic_stream_t *stream)
2104{
2105    if (lsquic_stream_is_pushed(stream) &&
2106                !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST)))
2107    {
2108        LSQ_DEBUG("refusing pushed stream: send reset");
2109        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
2110        return 0;
2111    }
2112    else
2113        return -1;
2114}
2115
2116
2117size_t
2118lsquic_stream_mem_used (const struct lsquic_stream *stream)
2119{
2120    size_t size;
2121
2122    size = sizeof(stream);
2123    if (stream->sm_buf)
2124        size += SM_BUF_SIZE;
2125    if (stream->data_in)
2126        size += stream->data_in->di_if->di_mem_used(stream->data_in);
2127
2128    return size;
2129}
2130
2131
2132lsquic_cid_t
2133lsquic_stream_cid (const struct lsquic_stream *stream)
2134{
2135    return LSQUIC_LOG_CONN_ID;
2136}
2137