lsquic_stream.c revision 50aadb33
1/* Copyright (c) 2017 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <sys/types.h>
28#include <sys/stat.h>
29#include <fcntl.h>
30#include <unistd.h>
31#include <stddef.h>
32
33#include "lsquic.h"
34
35#include "lsquic_int_types.h"
36#include "lsquic_packet_common.h"
37#include "lsquic_packet_in.h"
38#include "lsquic_malo.h"
39#include "lsquic_conn_flow.h"
40#include "lsquic_rtt.h"
41#include "lsquic_sfcw.h"
42#include "lsquic_stream.h"
43#include "lsquic_conn_public.h"
44#include "lsquic_util.h"
45#include "lsquic_mm.h"
46#include "lsquic_headers_stream.h"
47#include "lsquic_frame_reader.h"
48#include "lsquic_conn.h"
49#include "lsquic_data_in_if.h"
50#include "lsquic_parse.h"
51#include "lsquic_packet_out.h"
52#include "lsquic_engine_public.h"
53#include "lsquic_senhist.h"
54#include "lsquic_pacer.h"
55#include "lsquic_cubic.h"
56#include "lsquic_send_ctl.h"
57#include "lsquic_ev_log.h"
58
59#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
60#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid
61#define LSQUIC_LOG_STREAM_ID stream->id
62#include "lsquic_logger.h"
63
64enum sbt_type {
65    SBT_BUF,
66    SBT_FILE,
67};
68
69struct stream_buf_tosend
70{
71    TAILQ_ENTRY(stream_buf_tosend)  next_sbt;
72    enum sbt_type                   sbt_type;
73    /* On 64-bit platform, here is a four-byte hole */
74    union {
75        struct {
76            size_t                  sbt_sz;
77            size_t                  sbt_off;
78            unsigned char           sbt_data[
79                0x1000 - sizeof(enum sbt_type) - sizeof(long) + 4 - sizeof(size_t) * 2
80                                - sizeof(TAILQ_ENTRY(stream_buf_tosend))
81            ];
82        }                           buf;
83        struct {
84            struct lsquic_stream   *sbt_stream;
85            off_t                   sbt_sz;
86            off_t                   sbt_off;
87            int                     sbt_fd;
88            signed char             sbt_last;
89        }                           file;
90    }                               u;
91};
92
93typedef char _sbt_is_4K[(sizeof(struct stream_buf_tosend) == 0x1000) - 1];
94
95static size_t
96sum_sbts (const lsquic_stream_t *stream);
97
98static void
99drop_sbts (lsquic_stream_t *stream);
100
101static void
102drop_frames_in (lsquic_stream_t *stream);
103
104static void
105maybe_schedule_call_on_close (lsquic_stream_t *stream);
106
107static void
108stream_file_on_write (lsquic_stream_t *, struct lsquic_stream_ctx *);
109
110static void
111stream_flush_on_write (lsquic_stream_t *, struct lsquic_stream_ctx *);
112
113static int
114stream_wantread (lsquic_stream_t *stream, int is_want);
115
116static int
117stream_wantwrite (lsquic_stream_t *stream, int is_want);
118
119static void
120stop_reading_from_file (lsquic_stream_t *stream);
121
122static int
123stream_readable (const lsquic_stream_t *stream);
124
125static int
126stream_writeable (const lsquic_stream_t *stream);
127
128static size_t
129stream_flush_internal (lsquic_stream_t *stream, size_t size);
130
131static void
132incr_tosend_sz (lsquic_stream_t *stream, uint64_t incr);
133
134static void
135maybe_put_on_sending_streams (lsquic_stream_t *stream);
136
137
138#if LSQUIC_KEEP_STREAM_HISTORY
139/* These values are printable ASCII characters for ease of printing the
140 * whole history in a single line of a log message.
141 *
142 * The list of events is not exhaustive: only most interesting events
143 * are recorded.
144 */
145enum stream_history_event
146{
147    SHE_EMPTY              =  '\0',     /* Special entry.  No init besides memset required */
148    SHE_PLUS               =  '+',      /* Special entry: previous event occured more than once */
149    SHE_REACH_FIN          =  'a',
150    SHE_BLOCKED_OUT        =  'b',
151    SHE_CREATED            =  'C',
152    SHE_FRAME_IN           =  'd',
153    SHE_FRAME_OUT          =  'D',
154    SHE_RESET              =  'e',
155    SHE_WINDOW_UPDATE      =  'E',
156    SHE_FIN_IN             =  'f',
157    SHE_FINISHED           =  'F',
158    SHE_GOAWAY_IN          =  'g',
159    SHE_USER_WRITE_HEADER  =  'h',
160    SHE_HEADERS_IN         =  'H',
161    SHE_ONCLOSE_SCHED      =  'l',
162    SHE_ONCLOSE_CALL       =  'L',
163    SHE_ONNEW              =  'N',
164    SHE_SET_PRIO           =  'p',
165    SHE_USER_READ          =  'r',
166    SHE_SHUTDOWN_READ      =  'R',
167    SHE_RST_IN             =  's',
168    SHE_RST_OUT            =  't',
169    SHE_FLUSH              =  'u',
170    SHE_USER_WRITE_DATA    =  'w',
171    SHE_SHUTDOWN_WRITE     =  'W',
172    SHE_CLOSE              =  'X',
173    SHE_FORCE_FINISH       =  'Z',
174};
175
176static void
177sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
178{
179    enum stream_history_event prev_event;
180    sm_hist_idx_t idx;
181    int plus;
182
183    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
184    plus = SHE_PLUS == stream->sm_hist_buf[idx];
185    idx = (idx - plus) & SM_HIST_IDX_MASK;
186    prev_event = stream->sm_hist_buf[idx];
187
188    if (prev_event == sh_event && plus)
189        return;
190
191    if (prev_event == sh_event)
192        sh_event = SHE_PLUS;
193    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
194
195    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
196        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
197                                                        stream->sm_hist_buf);
198}
199
200
201#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
202#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
203        if (stream->sm_hist_idx & SM_HIST_IDX_MASK)                         \
204            LSQ_DEBUG("history: [%.*s]",                                    \
205                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
206                (stream)->sm_hist_buf);                                     \
207    } while (0)
208#else
209#   define SM_HISTORY_APPEND(stream, event)
210#   define SM_HISTORY_DUMP_REMAINING(stream)
211#endif
212
213
214/* Here, "readable" means that the user is able to read from the stream. */
215static void
216maybe_conn_to_pendrw_if_readable (lsquic_stream_t *stream,
217                                                        enum rw_reason reason)
218{
219    if (!(stream->conn_pub->enpub->enp_flags & ENPUB_PROC) &&
220                                                stream_readable(stream))
221    {
222        lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub,
223                                            stream->conn_pub->lconn, reason);
224    }
225}
226
227
228/* Here, "writeable" means that data can be put into packets to be
229 * scheduled to be sent out.
230 */
231static void
232maybe_conn_to_pendrw_if_writeable (lsquic_stream_t *stream,
233                                                        enum rw_reason reason)
234{
235    if (!(stream->conn_pub->enpub->enp_flags & ENPUB_PROC) &&
236            lsquic_send_ctl_can_send(stream->conn_pub->send_ctl) &&
237          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
238    {
239        lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub,
240                                            stream->conn_pub->lconn, reason);
241    }
242}
243
244
245static int
246stream_stalled (const lsquic_stream_t *stream)
247{
248    return 0 == (stream->stream_flags & STREAM_RW_EVENT_FLAGS) &&
249           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
250                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
251}
252
253
254static void
255use_user_on_write (lsquic_stream_t *stream)
256{
257    LSQ_DEBUG("stream %u: use user-supplied on-write callback", stream->id);
258    stream->on_write_cb  = stream->stream_if->on_write;
259}
260
261
262static void
263use_internal_on_write_file (lsquic_stream_t *stream)
264{
265    LSQ_DEBUG("use internal on-write callback (file)");
266    stream->on_write_cb  = stream_file_on_write;
267}
268
269
270static void
271use_internal_on_write_flush (lsquic_stream_t *stream)
272{
273    LSQ_DEBUG("use internal on-write callback (flush)");
274    stream->on_write_cb  = stream_flush_on_write;
275}
276
277
278#define writing_file(stream) ((stream)->file_fd >= 0)
279
280
281/* TODO: The logic to figure out whether the stream is connection limited
282 * should be taken out of the constructor.  The caller should specify this
283 * via one of enum stream_ctor_flags.
284 */
285lsquic_stream_t *
286lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub,
287                       const struct lsquic_stream_if *stream_if,
288                       void *stream_if_ctx, unsigned initial_window,
289                       unsigned initial_send_off,
290                       enum stream_ctor_flags ctor_flags)
291{
292    lsquic_cfcw_t *cfcw;
293    lsquic_stream_t *stream;
294
295    stream = calloc(1, sizeof(*stream));
296    if (!stream)
297        return NULL;
298
299    stream->stream_if = stream_if;
300    stream->id        = id;
301    stream->file_fd   = -1;
302    stream->conn_pub  = conn_pub;
303    if (!initial_window)
304        initial_window = 16 * 1024;
305    if (LSQUIC_STREAM_HANDSHAKE == id ||
306        (conn_pub->hs && LSQUIC_STREAM_HEADERS == id))
307        cfcw = NULL;
308    else
309    {
310        cfcw = &conn_pub->cfcw;
311        stream->stream_flags |= STREAM_CONN_LIMITED;
312        if (conn_pub->hs)
313            stream->stream_flags |= STREAM_USE_HEADERS;
314        lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO);
315    }
316    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
317    if (!initial_send_off)
318        initial_send_off = 16 * 1024;
319    stream->max_send_off = initial_send_off;
320    TAILQ_INIT(&stream->bufs_tosend);
321    if (ctor_flags & SCF_USE_DI_HASH)
322        stream->data_in = data_in_hash_new(conn_pub, id, 0);
323    else
324        stream->data_in = data_in_nocopy_new(conn_pub, id);
325    LSQ_DEBUG("created stream %u", id);
326    SM_HISTORY_APPEND(stream, SHE_CREATED);
327    if (ctor_flags & SCF_DI_AUTOSWITCH)
328        stream->stream_flags |= STREAM_AUTOSWITCH;
329    if (ctor_flags & SCF_CALL_ON_NEW)
330        lsquic_stream_call_on_new(stream, stream_if_ctx);
331    if (ctor_flags & SCF_DISP_RW_ONCE)
332        stream->stream_flags |= STREAM_RW_ONCE;
333    use_user_on_write(stream);
334    return stream;
335}
336
337
338void
339lsquic_stream_call_on_new (lsquic_stream_t *stream, void *stream_if_ctx)
340{
341    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
342    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
343    {
344        LSQ_DEBUG("calling on_new_stream");
345        SM_HISTORY_APPEND(stream, SHE_ONNEW);
346        stream->stream_flags |= STREAM_ONNEW_DONE;
347        stream->st_ctx = stream->stream_if->on_new_stream(stream_if_ctx, stream);
348    }
349}
350
351
352void
353lsquic_stream_destroy (lsquic_stream_t *stream)
354{
355    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
356                                                            STREAM_ONNEW_DONE)
357    {
358        stream->stream_flags |= STREAM_ONCLOSE_DONE;
359        stream->stream_if->on_close(stream, stream->st_ctx);
360    }
361    if (stream->file_fd >= 0)
362        (void) close(stream->file_fd);
363    if (stream->stream_flags & STREAM_SENDING_FLAGS)
364        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
365    if (stream->stream_flags & STREAM_RW_EVENT_FLAGS)
366        TAILQ_REMOVE(&stream->conn_pub->rw_streams, stream, next_rw_stream);
367    if (stream->stream_flags & STREAM_SERVICE_FLAGS)
368        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
369    lsquic_sfcw_consume_rem(&stream->fc);
370    drop_frames_in(stream);
371    drop_sbts(stream);
372    free(stream->push_req);
373    free(stream->uh);
374    LSQ_DEBUG("destroyed stream %u", stream->id);
375    SM_HISTORY_DUMP_REMAINING(stream);
376    free(stream);
377}
378
379
380static int
381stream_is_finished (const lsquic_stream_t *stream)
382{
383    return lsquic_stream_is_closed(stream)
384           /* n_unacked checks that no outgoing packets that reference this
385            * stream are outstanding:
386            */
387        && 0 == stream->n_unacked
388           /* This checks that no packets that reference this stream will
389            * become outstanding:
390            */
391        && 0 == (stream->stream_flags & (STREAM_SEND_DATA|STREAM_SEND_RST))
392        && ((stream->stream_flags & STREAM_FORCE_FINISH)
393          || (((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))
394                || lsquic_stream_is_pushed(stream))
395           && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD))));
396}
397
398
399static void
400maybe_finish_stream (lsquic_stream_t *stream)
401{
402    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
403                                                    stream_is_finished(stream))
404    {
405        LSQ_DEBUG("stream %u is now finished", stream->id);
406        SM_HISTORY_APPEND(stream, SHE_FINISHED);
407        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
408            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
409                                                    next_service_stream);
410        stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED;
411    }
412}
413
414
415static void
416maybe_schedule_call_on_close (lsquic_stream_t *stream)
417{
418    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
419                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE))
420            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE))
421    {
422        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
423            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
424                                                    next_service_stream);
425        stream->stream_flags |= STREAM_CALL_ONCLOSE;
426        LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id);
427        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
428    }
429}
430
431
432void
433lsquic_stream_call_on_close (lsquic_stream_t *stream)
434{
435    assert(stream->stream_flags & STREAM_ONNEW_DONE);
436    stream->stream_flags &= ~STREAM_CALL_ONCLOSE;
437    if (!(stream->stream_flags & STREAM_SERVICE_FLAGS))
438        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
439                                                    next_service_stream);
440    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
441    {
442        LSQ_DEBUG("calling on_close for stream %u", stream->id);
443        stream->stream_flags |= STREAM_ONCLOSE_DONE;
444        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
445        stream->stream_if->on_close(stream, stream->st_ctx);
446    }
447    else
448        assert(0);
449}
450
451
452static int
453stream_readable (const lsquic_stream_t *stream)
454{
455    /* A stream is readable if one of the following is true: */
456    return
457        /* - It is already finished: in that case, lsquic_stream_read() will
458         *   return 0.
459         */
460            (stream->stream_flags & STREAM_FIN_REACHED)
461        /* - The stream is reset, by either side.  In this case,
462         *   lsquic_stream_read() will return -1 (we want the user to be
463         *   able to collect the error).
464         */
465        ||  (stream->stream_flags & STREAM_RST_FLAGS)
466        /* - Either we are not in HTTP mode or the HTTP headers have been
467         *   received and the headers or data from the stream can be read.
468         */
469        ||  (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
470                                                        == STREAM_USE_HEADERS)
471            && (stream->uh != NULL
472                ||  stream->data_in->di_if->di_get_frame(stream->data_in,
473                                                        stream->read_offset)))
474    ;
475}
476
477
478size_t
479lsquic_stream_write_avail (const lsquic_stream_t *stream)
480{
481    uint64_t stream_avail, conn_avail;
482    size_t unflushed_sum;
483
484    assert(stream->tosend_off + stream->tosend_sz <= stream->max_send_off);
485    stream_avail = stream->max_send_off - stream->tosend_off
486                                                - stream->tosend_sz;
487    if (0 == stream->tosend_sz)
488    {
489        unflushed_sum = sum_sbts(stream);
490        if (unflushed_sum > stream_avail)
491            stream_avail = 0;
492        else
493            stream_avail -= unflushed_sum;
494    }
495
496    if (stream->stream_flags & STREAM_CONN_LIMITED)
497    {
498        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
499        if (conn_avail < stream_avail)
500        {
501            LSQ_DEBUG("stream %u write buffer is limited by connection: %jd",
502                stream->id, conn_avail);
503            return conn_avail;
504        }
505    }
506
507    LSQ_DEBUG("stream %u write buffer is limited by stream: %jd",
508        stream->id, stream_avail);
509    return stream_avail;
510}
511
512
513static int
514stream_writeable (const lsquic_stream_t *stream)
515{
516    /* A stream is writeable if one of the following is true: */
517    return
518        /* - The stream is reset, by either side.  In this case,
519         *   lsquic_stream_write() will return -1 (we want the user to be
520         *   able to collect the error).
521         */
522            (stream->stream_flags & STREAM_RST_FLAGS)
523        /* - There is room to write to the stream.
524         */
525        ||  lsquic_stream_write_avail(stream) > 0
526    ;
527}
528
529
530int
531lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
532{
533    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
534                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
535    {
536        return -1;
537    }
538    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
539    {
540        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
541            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
542                                                    next_send_stream);
543        stream->stream_flags |= STREAM_SEND_WUF;
544    }
545    return 0;
546}
547
548
549int
550lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
551{
552    uint64_t max_off;
553    int got_next_offset;
554    enum ins_frame ins_frame;
555
556    assert(frame->packet_in);
557
558    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
559    LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; "
560        "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
561
562    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) ==
563                                (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN))
564    {
565        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
566        lsquic_malo_put(frame);
567        return -1;
568    }
569
570    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
571    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
572    if (INS_FRAME_OK == ins_frame)
573    {
574        /* Update maximum offset in the flow controller and check for flow
575         * control violation:
576         */
577        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
578        if (0 != lsquic_stream_update_sfcw(stream, max_off))
579            return -1;
580        if ((stream->stream_flags & STREAM_U_READ_DONE))
581            lsquic_stream_reset_ext(stream, 1, 0);
582        if (frame->data_frame.df_fin)
583        {
584            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
585            stream->stream_flags |= STREAM_FIN_RECVD;
586            maybe_finish_stream(stream);
587        }
588        if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
589                (stream->data_in->di_flags & DI_SWITCH_IMPL))
590        {
591            stream->data_in = stream->data_in->di_if->di_switch_impl(
592                                        stream->data_in, stream->read_offset);
593            if (!stream->data_in)
594            {
595                stream->data_in = data_in_error_new();
596                return -1;
597            }
598        }
599        if (got_next_offset)
600            /* Checking the offset saves di_get_frame() call */
601            maybe_conn_to_pendrw_if_readable(stream, RW_REASON_STREAM_IN);
602        return 0;
603    }
604    else if (INS_FRAME_DUP == ins_frame)
605    {
606        return 0;
607    }
608    else
609    {
610        assert(INS_FRAME_ERR == ins_frame);
611        return -1;
612    }
613}
614
615
616static void
617drop_frames_in (lsquic_stream_t *stream)
618{
619    if (stream->data_in)
620    {
621        stream->data_in->di_if->di_destroy(stream->data_in);
622        /* To avoid checking whether `data_in` is set, just set to the error
623         * data-in stream.  It does the right thing after incoming data is
624         * dropped.
625         */
626        stream->data_in = data_in_error_new();
627    }
628}
629
630
631int
632lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
633                      uint32_t error_code)
634{
635
636    if (stream->stream_flags & STREAM_RST_RECVD)
637    {
638        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
639        return 0;
640    }
641
642    SM_HISTORY_APPEND(stream, SHE_RST_IN);
643    /* This flag must always be set, even if we are "ignoring" it: it is
644     * used by elision code.
645     */
646    stream->stream_flags |= STREAM_RST_RECVD;
647
648    if ((stream->stream_flags & STREAM_FIN_RECVD) &&
649                    /* Pushed streams have fake STREAM_FIN_RECVD set, thus
650                     * we need a special check:
651                     */
652                                            !lsquic_stream_is_pushed(stream))
653    {
654        LSQ_DEBUG("ignore RST_STREAM frame after FIN is received");
655        return 0;
656    }
657
658    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
659    {
660        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is "
661            "smaller than that of byte following the last byte we have seen: "
662            "0x%"PRIX64, stream->id, offset,
663            lsquic_sfcw_get_max_recv_off(&stream->fc));
664        return -1;
665    }
666
667    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
668    {
669        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64
670            " violates flow control", stream->id, offset);
671        return -1;
672    }
673
674    /* Let user collect error: */
675    maybe_conn_to_pendrw_if_readable(stream, RW_REASON_RST_IN);
676
677    lsquic_sfcw_consume_rem(&stream->fc);
678    drop_frames_in(stream);
679
680    if (!(stream->stream_flags &
681                        (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT)))
682        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
683
684    stream->stream_flags |= STREAM_RST_RECVD;
685
686    maybe_finish_stream(stream);
687    maybe_schedule_call_on_close(stream);
688
689    return 0;
690}
691
692
693uint64_t
694lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
695{
696    assert(stream->stream_flags & STREAM_SEND_WUF);
697    stream->stream_flags &= ~STREAM_SEND_WUF;
698    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
699        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
700    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
701}
702
703
704void
705lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
706{
707    assert(stream->stream_flags & STREAM_SEND_BLOCKED);
708    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
709    stream->stream_flags &= ~STREAM_SEND_BLOCKED;
710    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
711        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
712}
713
714
715void
716lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
717{
718    assert(stream->stream_flags & STREAM_SEND_RST);
719    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
720    stream->stream_flags &= ~STREAM_SEND_RST;
721    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
722        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
723    stream->stream_flags |= STREAM_RST_SENT;
724    maybe_finish_stream(stream);
725}
726
727
728static size_t
729read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len)
730{
731    struct uncompressed_headers *uh = stream->uh;
732    size_t n_avail = uh->uh_size - uh->uh_off;
733    if (n_avail < len)
734        len = n_avail;
735    memcpy(dst, uh->uh_headers + uh->uh_off, len);
736    uh->uh_off += len;
737    if (uh->uh_off == uh->uh_size)
738    {
739        LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id);
740        free(uh);
741        stream->uh = NULL;
742        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
743        {
744            stream->stream_flags |= STREAM_FIN_REACHED;
745            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
746        }
747    }
748    return len;
749}
750
751
752/* This function returns 0 when EOF is reached.
753 */
754ssize_t
755lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov,
756                     int iovcnt)
757{
758    size_t total_nread, nread;
759    int processed_frames, read_unc_headers, iovidx;
760    unsigned char *p, *end;
761
762    SM_HISTORY_APPEND(stream, SHE_USER_READ);
763
764#define NEXT_IOV() do {                                             \
765    ++iovidx;                                                       \
766    while (iovidx < iovcnt && 0 == iov[iovidx].iov_len)             \
767        ++iovidx;                                                   \
768    if (iovidx < iovcnt)                                            \
769    {                                                               \
770        p = iov[iovidx].iov_base;                                   \
771        end = p + iov[iovidx].iov_len;                              \
772    }                                                               \
773    else                                                            \
774        p = end = NULL;                                             \
775} while (0)
776
777#define AVAIL() (end - p)
778
779    if (stream->stream_flags & STREAM_RST_FLAGS)
780    {
781        errno = ECONNRESET;
782        return -1;
783    }
784    if (stream->stream_flags & STREAM_U_READ_DONE)
785    {
786        errno = EBADF;
787        return -1;
788    }
789    if (stream->stream_flags & STREAM_FIN_REACHED)
790        return 0;
791
792    total_nread = 0;
793    processed_frames = 0;
794
795    iovidx = -1;
796    NEXT_IOV();
797
798    if (stream->uh && AVAIL())
799    {
800        read_unc_headers = 1;
801        do
802        {
803            nread = read_uh(stream, p, AVAIL());
804            p += nread;
805            total_nread += nread;
806            if (p == end)
807                NEXT_IOV();
808        }
809        while (stream->uh && AVAIL());
810    }
811    else
812        read_unc_headers = 0;
813
814    struct data_frame *data_frame;
815    while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset)))
816    {
817        ++processed_frames;
818        size_t navail = data_frame->df_size - data_frame->df_read_off;
819        size_t ntowrite = AVAIL();
820        if (navail < ntowrite)
821            ntowrite = navail;
822        memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite);
823        p += ntowrite;
824        data_frame->df_read_off += ntowrite;
825        stream->read_offset += ntowrite;
826        total_nread += ntowrite;
827        if (data_frame->df_read_off == data_frame->df_size)
828        {
829            const int fin = data_frame->df_fin;
830            stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
831            if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
832                    (stream->data_in->di_flags & DI_SWITCH_IMPL))
833            {
834                stream->data_in = stream->data_in->di_if->di_switch_impl(
835                                            stream->data_in, stream->read_offset);
836                if (!stream->data_in)
837                {
838                    stream->data_in = data_in_error_new();
839                    return -1;
840                }
841            }
842            if (fin)
843            {
844                stream->stream_flags |= STREAM_FIN_REACHED;
845                break;
846            }
847        }
848        if (p == end)
849            NEXT_IOV();
850    }
851
852    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
853                                        total_nread, stream->read_offset);
854
855    if (processed_frames)
856    {
857        lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
858        if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
859        {
860            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
861                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
862            stream->stream_flags |= STREAM_SEND_WUF;
863            maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_READ);
864        }
865    }
866
867    if (processed_frames || read_unc_headers)
868    {
869        return total_nread;
870    }
871    else
872    {
873        assert(0 == total_nread);
874        errno = EWOULDBLOCK;
875        return -1;
876    }
877}
878
879
880ssize_t
881lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
882{
883    struct iovec iov = { .iov_base = buf, .iov_len = len, };
884    return lsquic_stream_readv(stream, &iov, 1);
885}
886
887
888#ifndef NDEBUG
889/* Use weak linkage so that tests can override this function */
890int
891lsquic_stream_tosend_fin (const lsquic_stream_t *stream)
892    __attribute__((weak))
893    ;
894#endif
895int
896lsquic_stream_tosend_fin (const lsquic_stream_t *stream)
897{
898    return (stream->stream_flags & STREAM_U_WRITE_DONE)
899        && !writing_file(stream)
900        && 0 == stream->tosend_sz
901        && 0 == sum_sbts(stream);
902}
903
904
905static int
906readable_data_frame_remains (lsquic_stream_t *stream)
907{
908        return !stream->data_in->di_if->di_empty(stream->data_in);
909}
910
911
912static void
913stream_shutdown_read (lsquic_stream_t *stream)
914{
915    if (!(stream->stream_flags & STREAM_U_READ_DONE))
916    {
917        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
918        if (stream->uh || readable_data_frame_remains(stream))
919        {
920            LSQ_INFO("read shut down, but there is still data to be read");
921            lsquic_stream_reset_ext(stream, 1, 1);
922        }
923        stream->stream_flags |= STREAM_U_READ_DONE;
924        stream_wantread(stream, 0);
925        maybe_finish_stream(stream);
926    }
927}
928
929
930static void
931stream_shutdown_write (lsquic_stream_t *stream)
932{
933    int flushed_all, data_send_ok;
934
935    if (stream->stream_flags & STREAM_U_WRITE_DONE)
936        return;
937
938    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
939    stream->stream_flags |= STREAM_U_WRITE_DONE;
940
941    data_send_ok = !(stream->stream_flags &
942                    (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT));
943    if (!data_send_ok)
944    {
945        stream_wantwrite(stream, 0);
946        return;
947    }
948
949    lsquic_stream_flush(stream);
950    flushed_all = stream->tosend_sz == sum_sbts(stream);
951    if (flushed_all)
952        stream_wantwrite(stream, 0);
953    else
954    {
955        stream_wantwrite(stream, 1);
956        use_internal_on_write_flush(stream);
957    }
958
959    if (flushed_all || stream->tosend_sz > 0)
960    {
961        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
962            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
963        stream->stream_flags |= STREAM_SEND_DATA;
964    }
965}
966
967
968int
969lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
970{
971    LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how);
972    if (lsquic_stream_is_closed(stream))
973    {
974        LSQ_INFO("Attempt to shut down a closed stream %u", stream->id);
975        errno = EBADF;
976        return -1;
977    }
978    /* 0: read, 1: write: 2: read and write
979     */
980    if (how < 0 || how > 2)
981    {
982        errno = EINVAL;
983        return -1;
984    }
985
986    if (how)
987        stream_shutdown_write(stream);
988    if (how != 1)
989        stream_shutdown_read(stream);
990
991    maybe_finish_stream(stream);
992    maybe_schedule_call_on_close(stream);
993    if (how)
994        maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_SHUTDOWN);
995
996    return 0;
997}
998
999
1000void
1001lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
1002{
1003    LSQ_DEBUG("internal shutdown of stream %u", stream->id);
1004    if (LSQUIC_STREAM_HANDSHAKE == stream->id
1005        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
1006                                LSQUIC_STREAM_HEADERS == stream->id))
1007    {
1008        LSQ_DEBUG("add flag to force-finish special stream %u", stream->id);
1009        stream->stream_flags |= STREAM_FORCE_FINISH;
1010        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
1011    }
1012    maybe_finish_stream(stream);
1013    maybe_schedule_call_on_close(stream);
1014}
1015
1016
1017static void
1018fake_reset_unused_stream (lsquic_stream_t *stream)
1019{
1020    stream->stream_flags |=
1021        STREAM_RST_RECVD    /* User will pick this up on read or write */
1022      | STREAM_RST_SENT     /* Don't send anything else on this stream */
1023    ;
1024
1025    /* Cancel all writes to the network scheduled for this stream: */
1026    if (stream->stream_flags & STREAM_SENDING_FLAGS)
1027        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
1028                                                next_send_stream);
1029    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
1030
1031    if (writing_file(stream))
1032        stop_reading_from_file(stream);
1033
1034    LSQ_DEBUG("fake-reset stream %u%s",
1035                    stream->id, stream_stalled(stream) ? " (stalled)" : "");
1036    maybe_finish_stream(stream);
1037    maybe_schedule_call_on_close(stream);
1038}
1039
1040
1041/* This function should only be called for locally-initiated streams whose ID
1042 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
1043 * frame sent by peer but we have not yet received it and created a stream.
1044 * In this situation, we mark the stream as reset, so that user's on_read or
1045 * on_write event callback picks up the error.  That, in turn, should result
1046 * in stream being closed.
1047 *
1048 * If we have received any data frames on this stream, this probably indicates
1049 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
1050 * lower than this.  However, we still try to handle it gracefully and peform
1051 * a shutdown, as if the stream was not reset.
1052 */
1053void
1054lsquic_stream_received_goaway (lsquic_stream_t *stream)
1055{
1056    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
1057    if (0 == stream->read_offset &&
1058                            stream->data_in->di_if->di_empty(stream->data_in))
1059        fake_reset_unused_stream(stream);       /* Normal condition */
1060    else
1061    {   /* This is odd, let's handle it the best we can: */
1062        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
1063        lsquic_stream_shutdown_internal(stream);
1064    }
1065}
1066
1067
1068uint64_t
1069lsquic_stream_read_offset (const lsquic_stream_t *stream)
1070{
1071    return stream->read_offset;
1072}
1073
1074
1075static int
1076stream_want_read_or_write (lsquic_stream_t *stream, int is_want, int flag)
1077{
1078    const int old_val = !!(stream->stream_flags & flag);
1079    if (old_val != is_want)
1080    {
1081        if (is_want)
1082        {
1083            if (!(stream->stream_flags & STREAM_RW_EVENT_FLAGS))
1084                TAILQ_INSERT_TAIL(&stream->conn_pub->rw_streams, stream, next_rw_stream);
1085            stream->stream_flags |= flag;
1086        }
1087        else
1088        {
1089            stream->stream_flags &= ~flag;
1090            if (!(stream->stream_flags & STREAM_RW_EVENT_FLAGS))
1091                TAILQ_REMOVE(&stream->conn_pub->rw_streams, stream, next_rw_stream);
1092        }
1093    }
1094    return old_val;
1095}
1096
1097
1098static int
1099stream_wantread (lsquic_stream_t *stream, int is_want)
1100{
1101    return stream_want_read_or_write(stream, is_want, STREAM_WANT_READ);
1102}
1103
1104
1105int
1106lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1107{
1108    if (0 == (stream->stream_flags & STREAM_U_READ_DONE))
1109    {
1110        if (is_want)
1111            maybe_conn_to_pendrw_if_readable(stream, RW_REASON_WANTREAD);
1112        return stream_wantread(stream, is_want);
1113    }
1114    else
1115    {
1116        errno = EBADF;
1117        return -1;
1118    }
1119}
1120
1121
1122static int
1123stream_wantwrite (lsquic_stream_t *stream, int is_want)
1124{
1125    if (writing_file(stream))
1126    {
1127        int old_val = stream->stream_flags & STREAM_SAVED_WANTWR;
1128        stream->stream_flags |= old_val & (0 - !!is_want);
1129        return !!old_val;
1130    }
1131    else
1132        return stream_want_read_or_write(stream, is_want, STREAM_WANT_WRITE);
1133}
1134
1135
1136int
1137lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1138{
1139    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE))
1140    {
1141        return stream_wantwrite(stream, is_want);
1142    }
1143    else
1144    {
1145        errno = EBADF;
1146        return -1;
1147    }
1148}
1149
1150
1151static void
1152stream_flush_on_write (lsquic_stream_t *stream,
1153                                    struct lsquic_stream_ctx *stream_ctx)
1154{
1155    size_t sum, n_flushed;
1156
1157    assert(stream->stream_flags & STREAM_U_WRITE_DONE);
1158
1159    sum = sum_sbts(stream) - stream->tosend_sz;
1160    if (sum == 0)
1161    {   /* This can occur if the stream has been written to and closed by
1162         * the user, but a RST_STREAM comes in that drops all data.
1163         */
1164        LSQ_DEBUG("%s: no more data to send", __func__);
1165        stream_wantwrite(stream, 0);
1166        return;
1167    }
1168
1169    n_flushed = stream_flush_internal(stream, sum);
1170    if (n_flushed == sum)
1171    {
1172        LSQ_DEBUG("Flushed all remaining data (%zd bytes)", n_flushed);
1173        stream_wantwrite(stream, 0);
1174    }
1175    else
1176        LSQ_DEBUG("Flushed %zd out of %zd remaining data", n_flushed, sum);
1177}
1178
1179
1180#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE|            \
1181                    STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST)
1182
1183
1184static void
1185stream_dispatch_rw_events_loop (lsquic_stream_t *stream, int *processed)
1186{
1187    unsigned no_progress_count, no_progress_limit;
1188    enum stream_flags flags;
1189    uint64_t size;
1190    size_t sbt_size;
1191
1192    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1193    *processed = 0;
1194
1195    no_progress_count = 0;
1196    while ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream))
1197    {
1198        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1199        size  = stream->read_offset;
1200
1201        stream->stream_if->on_read(stream, stream->st_ctx);
1202        *processed = 1;
1203
1204        if (no_progress_limit && size == stream->read_offset &&
1205                        flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1206        {
1207            ++no_progress_count;
1208            if (no_progress_count >= no_progress_limit)
1209            {
1210                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1211                    "progress) in user code reading from stream",
1212                    no_progress_count,
1213                    no_progress_count == 1 ? "" : "s");
1214                break;
1215            }
1216        }
1217        else
1218            no_progress_count = 0;
1219    }
1220
1221    no_progress_count = 0;
1222    while ((stream->stream_flags & STREAM_WANT_WRITE) && stream_writeable(stream))
1223    {
1224        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1225        size  = stream->tosend_sz;
1226        if (0 == size)
1227            sbt_size = sum_sbts(stream);
1228
1229        stream->on_write_cb(stream, stream->st_ctx);
1230        *processed = 1;
1231
1232        if (no_progress_limit &&
1233            flags == (stream->stream_flags & USER_PROGRESS_FLAGS) &&
1234            (0 == size ? sbt_size == sum_sbts(stream) :
1235                                                size == stream->tosend_sz))
1236        {
1237            ++no_progress_count;
1238            if (no_progress_count >= no_progress_limit)
1239            {
1240                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1241                    "progress) in user code writing to stream",
1242                    no_progress_count,
1243                    no_progress_count == 1 ? "" : "s");
1244                break;
1245            }
1246        }
1247        else
1248            no_progress_count = 0;
1249    }
1250}
1251
1252
1253static void
1254stream_dispatch_rw_events_once (lsquic_stream_t *stream, int *processed)
1255{
1256    *processed = 0;
1257
1258    if ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream))
1259    {
1260        stream->stream_if->on_read(stream, stream->st_ctx);
1261        *processed = 1;
1262    }
1263
1264    if ((stream->stream_flags & STREAM_WANT_WRITE) && stream_writeable(stream))
1265    {
1266        stream->on_write_cb(stream, stream->st_ctx);
1267        *processed = 1;
1268    }
1269}
1270
1271
1272static void
1273maybe_mark_as_blocked (lsquic_stream_t *stream)
1274{
1275    uint64_t off, stream_data_sz;
1276
1277    if (stream->tosend_sz)
1278        stream_data_sz = stream->tosend_sz;
1279    else
1280        stream_data_sz = sum_sbts(stream);
1281
1282    off = stream->tosend_off + stream_data_sz;
1283    if (off >= stream->max_send_off)
1284    {
1285        assert(off == stream->max_send_off);
1286        if (stream->blocked_off < stream->max_send_off)
1287        {
1288            stream->blocked_off = stream->max_send_off;
1289            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1290                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1291                                                            next_send_stream);
1292            stream->stream_flags |= STREAM_SEND_BLOCKED;
1293            LSQ_DEBUG("marked stream-blocked at stream offset "
1294                                            "%"PRIu64, stream->blocked_off);
1295            return;
1296        }
1297    }
1298
1299    if (stream->stream_flags & STREAM_CONN_LIMITED)
1300    {
1301        struct lsquic_conn_cap *const cc = &stream->conn_pub->conn_cap;
1302        off = cc->cc_sent + cc->cc_tosend;
1303        if (off >= cc->cc_max)
1304        {
1305            assert(off == cc->cc_max);
1306            if (cc->cc_blocked < cc->cc_max)
1307            {
1308                cc->cc_blocked = cc->cc_max;
1309                stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
1310                LSQ_DEBUG("marked connection-blocked at connection offset "
1311                                                        "%"PRIu64, cc->cc_max);
1312            }
1313        }
1314    }
1315}
1316
1317
1318void
1319lsquic_stream_dispatch_rw_events (lsquic_stream_t *stream)
1320{
1321    int processed;
1322    uint64_t tosend_off;
1323
1324    assert(stream->stream_flags & STREAM_RW_EVENT_FLAGS);
1325    tosend_off = stream->tosend_off;
1326
1327    if (stream->stream_flags & STREAM_RW_ONCE)
1328        stream_dispatch_rw_events_once(stream, &processed);
1329    else
1330        stream_dispatch_rw_events_loop(stream, &processed);
1331
1332    /* User wants to write, but no progress has been made: either stream
1333     * or connection is blocked.
1334     */
1335    if ((stream->stream_flags & STREAM_WANT_WRITE) &&
1336                        stream->tosend_off == tosend_off &&
1337                            (stream->tosend_sz == 0 && sum_sbts(stream) == 0))
1338        maybe_mark_as_blocked(stream);
1339
1340    if (stream->stream_flags & STREAM_RW_EVENT_FLAGS)
1341    {
1342        if (processed)
1343        {   /* Move the stream to the end of the list to ensure fairness. */
1344            TAILQ_REMOVE(&stream->conn_pub->rw_streams, stream, next_rw_stream);
1345            TAILQ_INSERT_TAIL(&stream->conn_pub->rw_streams, stream, next_rw_stream);
1346        }
1347    }
1348    else if (((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
1349                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE))
1350        LSQ_DEBUG("stream %u stalled", stream->id);
1351}
1352
1353
1354static struct stream_buf_tosend *
1355get_sbt_buf (lsquic_stream_t *stream)
1356{
1357    struct stream_buf_tosend *sbt;
1358
1359    sbt = TAILQ_LAST(&stream->bufs_tosend, sbts_tailq);
1360    if (!(sbt && SBT_BUF == sbt->sbt_type && (sizeof(sbt->u.buf.sbt_data) - sbt->u.buf.sbt_sz) > 0))
1361    {
1362        sbt = lsquic_mm_get_4k(stream->conn_pub->mm);
1363        if (!sbt)
1364            return NULL;
1365        sbt->sbt_type = SBT_BUF;
1366        sbt->u.buf.sbt_sz  = 0;
1367        sbt->u.buf.sbt_off = 0;
1368        TAILQ_INSERT_TAIL(&stream->bufs_tosend, sbt, next_sbt);
1369    }
1370
1371    return sbt;
1372}
1373
1374
1375static size_t
1376sbt_write (struct stream_buf_tosend *sbt, const void *buf, size_t len)
1377{
1378    assert(SBT_BUF == sbt->sbt_type);
1379    size_t ntowrite = sizeof(sbt->u.buf.sbt_data) - sbt->u.buf.sbt_sz;
1380    if (len < ntowrite)
1381        ntowrite = len;
1382    memcpy(sbt->u.buf.sbt_data + sbt->u.buf.sbt_sz, buf, ntowrite);
1383    sbt->u.buf.sbt_sz += ntowrite;
1384    return ntowrite;
1385}
1386
1387
1388static size_t
1389sbt_read_buf (struct stream_buf_tosend *sbt, void *buf, size_t len)
1390{
1391    size_t navail = sbt->u.buf.sbt_sz - sbt->u.buf.sbt_off;
1392    if (len > navail)
1393        len = navail;
1394    memcpy(buf, sbt->u.buf.sbt_data + sbt->u.buf.sbt_off, len);
1395    sbt->u.buf.sbt_off += len;
1396    return len;
1397}
1398
1399
1400static void
1401incr_tosend_sz (lsquic_stream_t *stream, uint64_t incr)
1402{
1403    stream->tosend_sz                    += incr;
1404    if (stream->stream_flags & STREAM_CONN_LIMITED)
1405    {
1406        assert(stream->conn_pub->conn_cap.cc_tosend +
1407                stream->conn_pub->conn_cap.cc_sent + incr <=
1408                    stream->conn_pub->conn_cap.cc_max);
1409        stream->conn_pub->conn_cap.cc_tosend += incr;
1410    }
1411}
1412
1413
1414static void
1415decr_tosend_sz (lsquic_stream_t *stream, uint64_t decr)
1416{
1417    assert(decr <= stream->tosend_sz);
1418    stream->tosend_sz                    -= decr;
1419    if (stream->stream_flags & STREAM_CONN_LIMITED)
1420    {
1421        assert(decr <= stream->conn_pub->conn_cap.cc_tosend);
1422        stream->conn_pub->conn_cap.cc_tosend -= decr;
1423    }
1424}
1425
1426
1427static void
1428sbt_truncated_file (struct stream_buf_tosend *sbt)
1429{
1430    off_t delta = sbt->u.file.sbt_sz - sbt->u.file.sbt_off;
1431    decr_tosend_sz(sbt->u.file.sbt_stream, delta);
1432    sbt->u.file.sbt_sz = sbt->u.file.sbt_off;
1433    sbt->u.file.sbt_last = 1;
1434}
1435
1436
1437static void
1438stop_reading_from_file (lsquic_stream_t *stream)
1439{
1440    assert(stream->file_fd >= 0);
1441    if (stream->stream_flags & STREAM_CLOSE_FILE)
1442        (void) close(stream->file_fd);
1443    stream->file_fd = -1;
1444    stream_wantwrite(stream, !!(stream->stream_flags & STREAM_SAVED_WANTWR));
1445    use_user_on_write(stream);
1446}
1447
1448
1449static size_t
1450sbt_read_file (struct stream_buf_tosend *sbt, void *pbuf, size_t len)
1451{
1452    const lsquic_stream_t *const stream = sbt->u.file.sbt_stream;
1453    size_t navail;
1454    ssize_t nread;
1455    unsigned char *buf = pbuf;
1456
1457    navail = sbt->u.file.sbt_sz - sbt->u.file.sbt_off;
1458    if (len > navail)
1459        len = navail;
1460
1461    assert(len > 0);
1462
1463    *buf++ = sbt->u.file.sbt_stream->file_byte;
1464    sbt->u.file.sbt_off += 1;
1465    len -= 1;
1466
1467    while (len > 0)
1468    {
1469        nread = read(sbt->u.file.sbt_fd, buf, len);
1470        if (-1 == nread)
1471        {
1472            LSQ_WARN("error reading: %s", strerror(errno));
1473            LSQ_WARN("could only send %jd bytes instead of intended %jd",
1474                (intmax_t) sbt->u.file.sbt_off,
1475                (intmax_t) sbt->u.file.sbt_sz);
1476            sbt_truncated_file(sbt);
1477            break;
1478        }
1479        else if (0 == nread)
1480        {
1481            LSQ_WARN("could only send %jd bytes instead of intended %jd",
1482                (intmax_t) sbt->u.file.sbt_off,
1483                (intmax_t) sbt->u.file.sbt_sz);
1484            sbt_truncated_file(sbt);
1485            break;
1486        }
1487        buf += nread;
1488        len -= nread;
1489        sbt->u.file.sbt_off += nread;
1490    }
1491
1492    len = buf - (unsigned char *) pbuf;
1493
1494    if (sbt->u.file.sbt_off < sbt->u.file.sbt_sz || !sbt->u.file.sbt_last)
1495    {
1496        nread = read(sbt->u.file.sbt_fd, &sbt->u.file.sbt_stream->file_byte, 1);
1497        if (-1 == nread)
1498        {
1499            LSQ_WARN("error reading: %s", strerror(errno));
1500            LSQ_WARN("could only send %jd bytes instead of intended %jd",
1501                (intmax_t) sbt->u.file.sbt_off,
1502                (intmax_t) sbt->u.file.sbt_sz);
1503            sbt_truncated_file(sbt);
1504        }
1505        else if (0 == nread)
1506        {
1507            LSQ_WARN("could only send %jd bytes instead of intended %jd",
1508                (intmax_t) sbt->u.file.sbt_off,
1509                (intmax_t) sbt->u.file.sbt_sz);
1510            sbt_truncated_file(sbt);
1511        }
1512    }
1513
1514    if (sbt->u.file.sbt_last && sbt->u.file.sbt_off == sbt->u.file.sbt_sz)
1515        stop_reading_from_file(sbt->u.file.sbt_stream);
1516
1517    return len;
1518}
1519
1520
1521static size_t
1522sbt_read (struct stream_buf_tosend *sbt, void *buf, size_t len)
1523{
1524    switch (sbt->sbt_type)
1525    {
1526    case SBT_BUF:
1527        return sbt_read_buf(sbt, buf, len);
1528    default:
1529        assert(SBT_FILE == sbt->sbt_type);
1530        return sbt_read_file(sbt, buf, len);
1531    }
1532}
1533
1534
1535static int
1536sbt_done (const struct stream_buf_tosend *sbt)
1537{
1538    switch (sbt->sbt_type)
1539    {
1540    case SBT_BUF:
1541        return sbt->u.buf.sbt_off == sbt->u.buf.sbt_sz;
1542    default:
1543        assert(SBT_FILE == sbt->sbt_type);
1544        return sbt->u.file.sbt_off == sbt->u.file.sbt_sz;
1545    }
1546}
1547
1548
1549static void
1550sbt_destroy (lsquic_stream_t *stream, struct stream_buf_tosend *sbt)
1551{
1552    switch (sbt->sbt_type)
1553    {
1554    case SBT_BUF:
1555        lsquic_mm_put_4k(stream->conn_pub->mm, sbt);
1556        break;
1557    default:
1558        assert(SBT_FILE == sbt->sbt_type);
1559        free(sbt);
1560    }
1561}
1562
1563
1564static size_t
1565sbt_size (const struct stream_buf_tosend *sbt)
1566{
1567    switch (sbt->sbt_type)
1568    {
1569    case SBT_BUF:
1570        return sbt->u.buf.sbt_sz - sbt->u.buf.sbt_off;
1571    default:
1572        return sbt->u.file.sbt_sz - sbt->u.file.sbt_off;
1573    }
1574}
1575
1576
1577static size_t
1578sum_sbts (const lsquic_stream_t *stream)
1579{
1580    size_t sum;
1581    struct stream_buf_tosend *sbt;
1582
1583    sum = 0;
1584    TAILQ_FOREACH(sbt, &stream->bufs_tosend, next_sbt)
1585        sum += sbt_size(sbt);
1586
1587    return sum;
1588}
1589
1590
1591static void
1592maybe_put_on_sending_streams (lsquic_stream_t *stream)
1593{
1594    if (lsquic_stream_tosend_sz(stream))
1595    {
1596        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1597            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
1598        stream->stream_flags |= STREAM_SEND_DATA;
1599    }
1600}
1601
1602
1603static size_t
1604stream_flush_internal (lsquic_stream_t *stream, size_t size)
1605{
1606    uint64_t conn_avail;
1607
1608    assert(0 == stream->tosend_sz ||
1609            (stream->stream_flags & STREAM_U_WRITE_DONE));
1610    if (stream->stream_flags & STREAM_CONN_LIMITED)
1611    {
1612        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
1613        if (size > conn_avail)
1614        {
1615            LSQ_DEBUG("connection-limited: flushing only %"PRIu64
1616                " out of %zd bytes", conn_avail, size);
1617            size = conn_avail;
1618        }
1619    }
1620    LSQ_DEBUG("flushed %zd bytes of stream %u", size, stream->id);
1621    SM_HISTORY_APPEND(stream, SHE_FLUSH);
1622    incr_tosend_sz(stream, size);
1623    maybe_put_on_sending_streams(stream);
1624    return size;
1625}
1626
1627
1628/* When stream->tosend_sz is zero and we have anything in SBT list, this
1629 * means that we have unflushed data.
1630 */
1631int
1632lsquic_stream_flush (lsquic_stream_t *stream)
1633{
1634    size_t sum;
1635    if (0 == stream->tosend_sz && !TAILQ_EMPTY(&stream->bufs_tosend))
1636    {
1637        sum = sum_sbts(stream);
1638        if (stream_flush_internal(stream, sum) > 0)
1639            maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_FLUSH);
1640    }
1641    return 0;
1642}
1643
1644
1645/* The flush threshold is the maximum size of stream data that can be sent
1646 * in a full packet.
1647 */
1648static size_t
1649flush_threshold (const lsquic_stream_t *stream)
1650{
1651    enum packet_out_flags flags;
1652    unsigned packet_header_sz, stream_header_sz;
1653    size_t threshold;
1654
1655    /* We are guessing the number of bytes that will be used to encode
1656     * packet number, because we do not have this information at this
1657     * point in time.
1658     */
1659    flags = PACKNO_LEN_2 << POBIT_SHIFT;
1660    if (stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)
1661        flags |= PO_CONN_ID;
1662
1663    packet_header_sz = lsquic_po_header_length(flags);
1664    stream_header_sz = stream->conn_pub->lconn->cn_pf
1665            ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off);
1666
1667    threshold = stream->conn_pub->lconn->cn_pack_size - packet_header_sz
1668                                                         - stream_header_sz;
1669    return threshold;
1670}
1671
1672
1673static size_t
1674flush_or_check_flags (lsquic_stream_t *stream, size_t sz)
1675{
1676    size_t sum;
1677    if (0 == stream->tosend_sz)
1678    {
1679        sum = sum_sbts(stream);
1680        if (sum >= flush_threshold(stream))
1681            return stream_flush_internal(stream, sum);
1682        else
1683            return 0;
1684    }
1685    else
1686    {
1687        incr_tosend_sz(stream, sz);
1688        maybe_put_on_sending_streams(stream);
1689        return sz;
1690    }
1691}
1692
1693
1694static void
1695stream_file_on_write (lsquic_stream_t *stream, struct lsquic_stream_ctx *st_ctx)
1696{
1697    struct stream_buf_tosend *sbt;
1698    ssize_t nr;
1699    size_t size, left;
1700    int last;
1701
1702    if (stream->stream_flags & STREAM_RST_FLAGS)
1703    {
1704        LSQ_INFO("stream was reset: stopping sending the file at offset %jd",
1705                                                (intmax_t) stream->file_off);
1706        stop_reading_from_file(stream);
1707        return;
1708    }
1709
1710    /* Write as much as we can */
1711    size = lsquic_stream_write_avail(stream);
1712    left = stream->file_size - stream->file_off;
1713    if (left < size)
1714        size = left;
1715
1716    if (0 == stream->file_off)
1717    {
1718        /* Try to read in 1 byte to check for truncation.  Having a byte in
1719         * store guarantees that we can generate a frame even if the file is
1720         * truncated later.  This function only does it once, when the first
1721         * SBT is queued.  Subsequent SBT use the byte read in by previous
1722         * SBT in sbt_read_file().
1723         */
1724        nr = read(stream->file_fd, &stream->file_byte, 1);
1725        if (nr != 1)
1726        {
1727            if (nr < 0)
1728                LSQ_WARN("cannot read from file: %s", strerror(errno));
1729            LSQ_INFO("stopping sending the file at offset %jd",
1730                                                (intmax_t) stream->file_off);
1731            stop_reading_from_file(stream);
1732            return;
1733        }
1734    }
1735
1736    last = stream->file_off + (off_t) size == stream->file_size;
1737
1738    sbt = malloc(offsetof(struct stream_buf_tosend, u.file.sbt_last) +
1739                    sizeof(((struct stream_buf_tosend *)0)->u.file.sbt_last));
1740    if (!sbt)
1741    {
1742        LSQ_WARN("malloc failed: %s", strerror(errno));
1743        LSQ_INFO("stopping sending the file at offset %jd",
1744                                                (intmax_t) stream->file_off);
1745        stop_reading_from_file(stream);
1746        return;
1747    }
1748
1749    sbt->sbt_type          = SBT_FILE;
1750    sbt->u.file.sbt_stream = stream;
1751    sbt->u.file.sbt_fd     = stream->file_fd;
1752    sbt->u.file.sbt_sz     = size;
1753    sbt->u.file.sbt_off    = 0;
1754    sbt->u.file.sbt_last   = last;
1755    TAILQ_INSERT_TAIL(&stream->bufs_tosend, sbt, next_sbt);
1756
1757    LSQ_DEBUG("inserted %zd-byte sbt at offset %jd, last: %d", size,
1758                                        (intmax_t) stream->file_off, last);
1759
1760    stream->file_off += size;
1761
1762    incr_tosend_sz(stream, size);
1763    maybe_put_on_sending_streams(stream);
1764
1765    if (last)
1766        stream_want_read_or_write(stream, 0, STREAM_WANT_WRITE);
1767}
1768
1769
1770static void
1771stream_sendfile (lsquic_stream_t *stream, int fd, off_t off, size_t size,
1772                 int close)
1773{
1774    int want_write;
1775
1776    /* STREAM_WANT_WRITE is not guaranteed to be set: the user may have
1777     * already unset it.
1778     */
1779    want_write = !!(stream->stream_flags & STREAM_WANT_WRITE);
1780    stream_wantwrite(stream, 1);
1781
1782    stream->file_fd   = fd;
1783    stream->file_off  = off;
1784    stream->file_size = size;
1785
1786    stream_wantwrite(stream, want_write);
1787
1788    if (close)
1789        stream->stream_flags |= STREAM_CLOSE_FILE;
1790    else
1791        stream->stream_flags &= ~STREAM_CLOSE_FILE;
1792
1793    use_internal_on_write_file(stream);
1794    stream->on_write_cb(stream, stream->st_ctx);
1795}
1796
1797
1798#define COMMON_WRITE_CHECKS() do {                                          \
1799    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))   \
1800                                                   == STREAM_USE_HEADERS)   \
1801    {                                                                       \
1802        LSQ_WARN("Attempt to write to stream before sending HTTP headers"); \
1803        errno = EILSEQ;                                                     \
1804        return -1;                                                          \
1805    }                                                                       \
1806    if (stream->stream_flags & STREAM_RST_FLAGS)                            \
1807    {                                                                       \
1808        LSQ_INFO("Attempt to write to stream after it had been reset");     \
1809        errno = ECONNRESET;                                                 \
1810        return -1;                                                          \
1811    }                                                                       \
1812    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
1813    {                                                                       \
1814        LSQ_WARN("Attempt to write to stream after it was closed for "      \
1815                                                                "writing"); \
1816        errno = EBADF;                                                      \
1817        return -1;                                                          \
1818    }                                                                       \
1819} while (0)
1820
1821
1822int
1823lsquic_stream_write_file (lsquic_stream_t *stream, const char *filename)
1824{
1825    int fd, saved_errno;
1826    struct stat st;
1827
1828    COMMON_WRITE_CHECKS();
1829
1830    fd = open(filename, O_RDONLY);
1831    if (fd < 0)
1832    {
1833        LSQ_WARN("could not open `%s' for reading: %s", filename, strerror(errno));
1834        return -1;
1835    }
1836
1837    if (fstat(fd, &st) < 0)
1838    {
1839        LSQ_WARN("fstat64(%s) failed: %s", filename, strerror(errno));
1840        saved_errno = errno;
1841        (void) close(fd);
1842        errno = saved_errno;
1843        return -1;
1844    }
1845
1846    if (0 == st.st_size)
1847    {
1848        LSQ_INFO("Writing zero-sized file `%s' is a no-op", filename);
1849        (void) close(fd);
1850        return 0;
1851    }
1852
1853    LSQ_DEBUG("Inserted `%s' into SBT queue; size: %jd", filename,
1854                                                    (intmax_t) st.st_size);
1855
1856    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1857    stream_sendfile(stream, fd, 0, st.st_size, 1);
1858    maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_WRITEFILE);
1859    return 0;
1860}
1861
1862
1863int
1864lsquic_stream_sendfile (lsquic_stream_t *stream, int fd, off_t off,
1865                        size_t size)
1866{
1867    COMMON_WRITE_CHECKS();
1868    if ((off_t) -1 == lseek(fd, off, SEEK_SET))
1869    {
1870        LSQ_INFO("lseek failed: %s", strerror(errno));
1871        return -1;
1872    }
1873    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1874    stream_sendfile(stream, fd, off, size, 0);
1875    maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_SENDFILE);
1876    return 0;
1877}
1878
1879
1880ssize_t
1881lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
1882{
1883    struct stream_buf_tosend *sbt;
1884    size_t nw, stream_avail, n_flushed;
1885    const unsigned char *p = buf;
1886
1887    COMMON_WRITE_CHECKS();
1888    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1889
1890    stream_avail = lsquic_stream_write_avail(stream);
1891    if (stream_avail < len)
1892    {
1893        LSQ_DEBUG("cap length from %zd to %zd bytes", len, stream_avail);
1894        len = stream_avail;
1895    }
1896
1897    while (len > 0)
1898    {
1899        sbt = get_sbt_buf(stream);
1900        if (!sbt)
1901        {
1902            LSQ_WARN("could not allocate SBT buffer: %s", strerror(errno));
1903            break;
1904        }
1905        nw = sbt_write(sbt, p, len);
1906        len -= nw;
1907        p += nw;
1908    }
1909
1910    const size_t n_written = p - (unsigned char *) buf;
1911    LSQ_DEBUG("wrote %"PRIiPTR" bytes to stream %u", n_written, stream->id);
1912
1913    n_flushed = flush_or_check_flags(stream, n_written);
1914    if (n_flushed)
1915        maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_WRITE);
1916
1917    return n_written;
1918}
1919
1920
1921ssize_t
1922lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
1923                                                                    int iovcnt)
1924{
1925    int i;
1926    const unsigned char *p;
1927    struct stream_buf_tosend *sbt;
1928    size_t nw, stream_avail, len, n_flushed;
1929    ssize_t nw_total;
1930
1931    COMMON_WRITE_CHECKS();
1932    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1933
1934    nw_total = 0;
1935    stream_avail = lsquic_stream_write_avail(stream);
1936
1937    for (i = 0; i < iovcnt && stream_avail > 0; ++i)
1938    {
1939        len = iov[i].iov_len;
1940        p   = iov[i].iov_base;
1941        if (len > stream_avail)
1942        {
1943            LSQ_DEBUG("cap length from %zd to %zd bytes", nw_total + len,
1944                                                    nw_total + stream_avail);
1945            len = stream_avail;
1946        }
1947        nw_total += len;
1948        stream_avail -= len;
1949        while (len > 0)
1950        {
1951            sbt = get_sbt_buf(stream);
1952            if (!sbt)
1953            {
1954                LSQ_WARN("could not allocate SBT buffer: %s", strerror(errno));
1955                break;
1956            }
1957            nw = sbt_write(sbt, p, len);
1958            len -= nw;
1959            p += nw;
1960        }
1961    }
1962
1963    LSQ_DEBUG("wrote %zd bytes to stream", nw_total);
1964
1965    n_flushed = flush_or_check_flags(stream, nw_total);
1966    if (n_flushed)
1967        maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_WRITEV);
1968
1969    return nw_total;
1970}
1971
1972
1973int
1974lsquic_stream_send_headers (lsquic_stream_t *stream,
1975                            const lsquic_http_headers_t *headers, int eos)
1976{
1977    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT|
1978                                                     STREAM_U_WRITE_DONE))
1979                == STREAM_USE_HEADERS)
1980    {
1981        int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs,
1982                    stream->id, headers, eos, lsquic_stream_priority(stream));
1983        if (0 == s)
1984        {
1985            SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
1986            stream->stream_flags |= STREAM_HEADERS_SENT;
1987            if (eos)
1988                stream->stream_flags |= STREAM_FIN_SENT;
1989            LSQ_INFO("sent headers for stream %u", stream->id);
1990        }
1991        else
1992            LSQ_WARN("could not send headers: %s", strerror(errno));
1993        return s;
1994    }
1995    else
1996    {
1997        LSQ_WARN("cannot send headers for stream %u in this state", stream->id);
1998        errno = EBADMSG;
1999        return -1;
2000    }
2001}
2002
2003
2004void
2005lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
2006{
2007    if (offset > stream->max_send_off)
2008    {
2009        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
2010        LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to "
2011            "0x%"PRIX64, stream->id, stream->max_send_off, offset);
2012        stream->max_send_off = offset;
2013        if (lsquic_stream_tosend_sz(stream))
2014        {
2015            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
2016            {
2017                LSQ_DEBUG("stream %u unblocked, schedule sending again",
2018                    stream->id);
2019                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
2020                                                            next_send_stream);
2021            }
2022            stream->stream_flags |= STREAM_SEND_DATA;
2023        }
2024    }
2025    else
2026        LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old "
2027            "max send offset 0x%"PRIX64", ignoring", stream->id, offset,
2028            stream->max_send_off);
2029}
2030
2031
2032/* This function is used to update offsets after handshake completes and we
2033 * learn of peer's limits from the handshake values.
2034 */
2035int
2036lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset)
2037{
2038    LSQ_DEBUG("setting max_send_off to %u", offset);
2039    if (offset > stream->max_send_off)
2040    {
2041        lsquic_stream_window_update(stream, offset);
2042        return 0;
2043    }
2044    else if (offset < stream->tosend_off)
2045    {
2046        LSQ_INFO("new offset (%u bytes) is smaller than the amount of data "
2047            "already sent on this stream (%"PRIu64" bytes)", offset,
2048            stream->tosend_off);
2049        return -1;
2050    }
2051    else
2052    {
2053        stream->max_send_off = offset;
2054        return 0;
2055    }
2056}
2057
2058
2059#ifndef NDEBUG
2060/* Use weak linkage so that tests can override this function */
2061size_t
2062lsquic_stream_tosend_sz (const lsquic_stream_t *stream)
2063    __attribute__((weak))
2064    ;
2065#endif
2066size_t
2067lsquic_stream_tosend_sz (const lsquic_stream_t *stream)
2068{
2069    assert(stream->tosend_off + stream->tosend_sz <= stream->max_send_off);
2070    return stream->tosend_sz;
2071}
2072
2073
2074#ifndef NDEBUG
2075/* Use weak linkage so that tests can override this function */
2076size_t
2077lsquic_stream_tosend_read (lsquic_stream_t *stream, void *buf, size_t len,
2078                           int *reached_fin)
2079    __attribute__((weak))
2080    ;
2081#endif
2082size_t
2083lsquic_stream_tosend_read (lsquic_stream_t *stream, void *buf, size_t len,
2084                           int *reached_fin)
2085{
2086    assert(stream->tosend_sz > 0);
2087    assert(stream->stream_flags & STREAM_SEND_DATA);
2088    const size_t tosend_sz = lsquic_stream_tosend_sz(stream);
2089    if (tosend_sz < len)
2090        len = tosend_sz;
2091    struct stream_buf_tosend *sbt;
2092    unsigned char *p = buf;
2093    unsigned char *const end = p + len;
2094    while (p < end && (sbt = TAILQ_FIRST(&stream->bufs_tosend)))
2095    {
2096        size_t nread = sbt_read(sbt, p, len);
2097        p += nread;
2098        len -= nread;
2099        if (sbt_done(sbt))
2100        {
2101            TAILQ_REMOVE(&stream->bufs_tosend, sbt, next_sbt);
2102            sbt_destroy(stream, sbt);
2103            LSQ_DEBUG("destroyed SBT");
2104        }
2105        else
2106            break;
2107    }
2108    const size_t n_read = p - (unsigned char *) buf;
2109    decr_tosend_sz(stream, n_read);
2110    stream->tosend_off += n_read;
2111    if (stream->stream_flags & STREAM_CONN_LIMITED)
2112    {
2113        stream->conn_pub->conn_cap.cc_sent += n_read;
2114        assert(stream->conn_pub->conn_cap.cc_sent <=
2115                                        stream->conn_pub->conn_cap.cc_max);
2116    }
2117    *reached_fin = lsquic_stream_tosend_fin(stream);
2118    return n_read;
2119}
2120
2121
2122void
2123lsquic_stream_stream_frame_sent (lsquic_stream_t *stream)
2124{
2125    assert(stream->stream_flags & STREAM_SEND_DATA);
2126    SM_HISTORY_APPEND(stream, SHE_FRAME_OUT);
2127    if (0 == lsquic_stream_tosend_sz(stream))
2128    {
2129        /* Mark the stream as having no sendable data independent of reason
2130         * why there is no data to send.
2131         */
2132        if (0 == stream->tosend_sz)
2133        {
2134            LSQ_DEBUG("all stream %u data has been scheduled for sending, "
2135                "now at offset 0x%"PRIX64, stream->id, stream->tosend_off);
2136            stream->stream_flags &= ~STREAM_SEND_DATA;
2137            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
2138                TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
2139                                                        next_send_stream);
2140
2141            if ((stream->stream_flags & STREAM_U_WRITE_DONE) && !writing_file(stream))
2142            {
2143                stream->stream_flags |= STREAM_FIN_SENT;
2144                maybe_finish_stream(stream);
2145            }
2146        }
2147        else
2148        {
2149            LSQ_DEBUG("stream %u blocked from sending", stream->id);
2150            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
2151                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
2152                                                        next_send_stream);
2153            stream->stream_flags &= ~STREAM_SEND_DATA;
2154            stream->stream_flags |= STREAM_SEND_BLOCKED;
2155        }
2156    }
2157}
2158
2159
2160#ifndef NDEBUG
2161/* Use weak linkage so that tests can override this function */
2162uint64_t
2163lsquic_stream_tosend_offset (const lsquic_stream_t *stream)
2164    __attribute__((weak))
2165    ;
2166#endif
2167uint64_t
2168lsquic_stream_tosend_offset (const lsquic_stream_t *stream)
2169{
2170    return stream->tosend_off;
2171}
2172
2173
2174static void
2175drop_sbts (lsquic_stream_t *stream)
2176{
2177    struct stream_buf_tosend *sbt;
2178
2179    while ((sbt = TAILQ_FIRST(&stream->bufs_tosend)))
2180    {
2181        TAILQ_REMOVE(&stream->bufs_tosend, sbt, next_sbt);
2182        sbt_destroy(stream, sbt);
2183    }
2184
2185    decr_tosend_sz(stream, stream->tosend_sz);
2186    stream->tosend_sz = 0;
2187}
2188
2189
2190void
2191lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code)
2192{
2193    lsquic_stream_reset_ext(stream, error_code, 1);
2194}
2195
2196
2197void
2198lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code,
2199                         int do_close)
2200{
2201    if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT))
2202    {
2203        LSQ_INFO("reset already sent");
2204        return;
2205    }
2206
2207    SM_HISTORY_APPEND(stream, SHE_RESET);
2208
2209    LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code);
2210    stream->error_code = error_code;
2211
2212    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
2213        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
2214                                                        next_send_stream);
2215    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
2216    stream->stream_flags |= STREAM_SEND_RST;
2217
2218    drop_sbts(stream);
2219    maybe_schedule_call_on_close(stream);
2220
2221    if (do_close)
2222        lsquic_stream_close(stream);
2223    else
2224        maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_RESET_EXT);
2225}
2226
2227
2228unsigned
2229lsquic_stream_id (const lsquic_stream_t *stream)
2230{
2231    return stream->id;
2232}
2233
2234
2235struct lsquic_conn *
2236lsquic_stream_conn (const lsquic_stream_t *stream)
2237{
2238    return stream->conn_pub->lconn;
2239}
2240
2241
2242int
2243lsquic_stream_close (lsquic_stream_t *stream)
2244{
2245    LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id);
2246    SM_HISTORY_APPEND(stream, SHE_CLOSE);
2247    if (lsquic_stream_is_closed(stream))
2248    {
2249        LSQ_INFO("Attempt to close an already-closed stream %u", stream->id);
2250        errno = EBADF;
2251        return -1;
2252    }
2253    stream_shutdown_write(stream);
2254    stream_shutdown_read(stream);
2255    maybe_schedule_call_on_close(stream);
2256    maybe_finish_stream(stream);
2257    maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_STREAM_CLOSE);
2258    return 0;
2259}
2260
2261
2262void
2263lsquic_stream_acked (lsquic_stream_t *stream)
2264{
2265    assert(stream->n_unacked);
2266    --stream->n_unacked;
2267    LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked);
2268    if (0 == stream->n_unacked)
2269        maybe_finish_stream(stream);
2270}
2271
2272
2273void
2274lsquic_stream_push_req (lsquic_stream_t *stream,
2275                        struct uncompressed_headers *push_req)
2276{
2277    assert(!stream->push_req);
2278    stream->push_req = push_req;
2279    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
2280}
2281
2282
2283int
2284lsquic_stream_is_pushed (const lsquic_stream_t *stream)
2285{
2286    return 1 & ~stream->id;
2287}
2288
2289
2290int
2291lsquic_stream_push_info (const lsquic_stream_t *stream,
2292        uint32_t *ref_stream_id, const char **headers, size_t *headers_sz)
2293{
2294    if (lsquic_stream_is_pushed(stream))
2295    {
2296        assert(stream->push_req);
2297        *ref_stream_id = stream->push_req->uh_stream_id;
2298        *headers       = stream->push_req->uh_headers;
2299        *headers_sz    = stream->push_req->uh_size;
2300        return 0;
2301    }
2302    else
2303        return -1;
2304}
2305
2306
2307int
2308lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
2309{
2310    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS)
2311    {
2312        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
2313        LSQ_DEBUG("received uncompressed headers for stream %u", stream->id);
2314        stream->stream_flags |= STREAM_HAVE_UH;
2315        if (uh->uh_flags & UH_FIN)
2316            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
2317        stream->uh = uh;
2318        if (uh->uh_oth_stream_id == 0)
2319        {
2320            if (uh->uh_weight)
2321                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
2322        }
2323        else
2324            LSQ_NOTICE("don't know how to depend on stream %u",
2325                                                        uh->uh_oth_stream_id);
2326        return 0;
2327    }
2328    else
2329    {
2330        LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id);
2331        return -1;
2332    }
2333}
2334
2335
2336unsigned
2337lsquic_stream_priority (const lsquic_stream_t *stream)
2338{
2339    return 256 - stream->sm_priority;
2340}
2341
2342
2343int
2344lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
2345{
2346    /* The user should never get a reference to the special streams,
2347     * but let's check just in case:
2348     */
2349    if (LSQUIC_STREAM_HANDSHAKE == stream->id
2350        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
2351                                LSQUIC_STREAM_HEADERS == stream->id))
2352        return -1;
2353    if (priority < 1 || priority > 256)
2354        return -1;
2355    stream->sm_priority = 256 - priority;
2356    LSQ_DEBUG("set priority to %u", priority);
2357    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
2358    return 0;
2359}
2360
2361
2362int
2363lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
2364{
2365    if (0 == lsquic_stream_set_priority_internal(stream, priority))
2366    {
2367        if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) ==
2368                                       (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))
2369        {
2370            /* We need to send headers only if we are a) using HEADERS stream
2371             * and b) we already sent initial headers.  If initial headers
2372             * have not been sent yet, stream priority will be sent in the
2373             * HEADERS frame.
2374             */
2375            return lsquic_headers_stream_send_priority(stream->conn_pub->hs,
2376                                                    stream->id, 0, 0, priority);
2377        }
2378        else
2379            return 0;
2380    }
2381    else
2382        return -1;
2383}
2384
2385
2386lsquic_stream_ctx_t *
2387lsquic_stream_get_ctx (const lsquic_stream_t *stream)
2388{
2389    return stream->st_ctx;
2390}
2391
2392
2393int
2394lsquic_stream_refuse_push (lsquic_stream_t *stream)
2395{
2396    if (lsquic_stream_is_pushed(stream) &&
2397                !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST)))
2398    {
2399        LSQ_DEBUG("refusing pushed stream: send reset");
2400        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
2401        return 0;
2402    }
2403    else
2404        return -1;
2405}
2406
2407
2408