lsquic_stream.c revision 0ae3fccd
1/* Copyright (c) 2017 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <sys/types.h>
28#include <sys/stat.h>
29#include <fcntl.h>
30#include <unistd.h>
31#include <stddef.h>
32
33#include "lsquic.h"
34
35#include "lsquic_int_types.h"
36#include "lsquic_packet_common.h"
37#include "lsquic_packet_in.h"
38#include "lsquic_malo.h"
39#include "lsquic_conn_flow.h"
40#include "lsquic_rtt.h"
41#include "lsquic_sfcw.h"
42#include "lsquic_stream.h"
43#include "lsquic_conn_public.h"
44#include "lsquic_util.h"
45#include "lsquic_mm.h"
46#include "lsquic_headers_stream.h"
47#include "lsquic_frame_reader.h"
48#include "lsquic_conn.h"
49#include "lsquic_data_in_if.h"
50#include "lsquic_parse.h"
51#include "lsquic_packet_out.h"
52#include "lsquic_engine_public.h"
53#include "lsquic_senhist.h"
54#include "lsquic_pacer.h"
55#include "lsquic_cubic.h"
56#include "lsquic_send_ctl.h"
57#include "lsquic_ev_log.h"
58
59#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
60#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid
61#define LSQUIC_LOG_STREAM_ID stream->id
62#include "lsquic_logger.h"
63
64enum sbt_type {
65    SBT_BUF,
66    SBT_FILE,
67};
68
69struct stream_buf_tosend
70{
71    TAILQ_ENTRY(stream_buf_tosend)  next_sbt;
72    enum sbt_type                   sbt_type;
73    /* On 64-bit platform, here is a four-byte hole */
74    union {
75        struct {
76            size_t                  sbt_sz;
77            size_t                  sbt_off;
78            unsigned char           sbt_data[
79                0x1000 - sizeof(enum sbt_type) - sizeof(long) + 4 - sizeof(size_t) * 2
80                                - sizeof(TAILQ_ENTRY(stream_buf_tosend))
81            ];
82        }                           buf;
83        struct {
84            struct lsquic_stream   *sbt_stream;
85            off_t                   sbt_sz;
86            off_t                   sbt_off;
87            int                     sbt_fd;
88            signed char             sbt_last;
89        }                           file;
90    }                               u;
91};
92
93typedef char _sbt_is_4K[(sizeof(struct stream_buf_tosend) == 0x1000) - 1];
94
95static size_t
96sum_sbts (const lsquic_stream_t *stream);
97
98static void
99drop_sbts (lsquic_stream_t *stream);
100
101static void
102drop_frames_in (lsquic_stream_t *stream);
103
104static void
105maybe_schedule_call_on_close (lsquic_stream_t *stream);
106
107static void
108stream_file_on_write (lsquic_stream_t *, struct lsquic_stream_ctx *);
109
110static void
111stream_flush_on_write (lsquic_stream_t *, struct lsquic_stream_ctx *);
112
113static int
114stream_wantread (lsquic_stream_t *stream, int is_want);
115
116static int
117stream_wantwrite (lsquic_stream_t *stream, int is_want);
118
119static void
120stop_reading_from_file (lsquic_stream_t *stream);
121
122static int
123stream_readable (const lsquic_stream_t *stream);
124
125static int
126stream_writeable (const lsquic_stream_t *stream);
127
128static size_t
129stream_flush_internal (lsquic_stream_t *stream, size_t size);
130
131static void
132incr_tosend_sz (lsquic_stream_t *stream, uint64_t incr);
133
134static void
135maybe_put_on_sending_streams (lsquic_stream_t *stream);
136
137
138#if LSQUIC_KEEP_STREAM_HISTORY
139/* These values are printable ASCII characters for ease of printing the
140 * whole history in a single line of a log message.
141 *
142 * The list of events is not exhaustive: only most interesting events
143 * are recorded.
144 */
145enum stream_history_event
146{
147    SHE_EMPTY              =  '\0',     /* Special entry.  No init besides memset required */
148    SHE_PLUS               =  '+',      /* Special entry: previous event occured more than once */
149    SHE_REACH_FIN          =  'a',
150    SHE_BLOCKED_OUT        =  'b',
151    SHE_CREATED            =  'C',
152    SHE_FRAME_IN           =  'd',
153    SHE_FRAME_OUT          =  'D',
154    SHE_RESET              =  'e',
155    SHE_WINDOW_UPDATE      =  'E',
156    SHE_FIN_IN             =  'f',
157    SHE_FINISHED           =  'F',
158    SHE_GOAWAY_IN          =  'g',
159    SHE_USER_WRITE_HEADER  =  'h',
160    SHE_HEADERS_IN         =  'H',
161    SHE_ONCLOSE_SCHED      =  'l',
162    SHE_ONCLOSE_CALL       =  'L',
163    SHE_ONNEW              =  'N',
164    SHE_SET_PRIO           =  'p',
165    SHE_USER_READ          =  'r',
166    SHE_SHUTDOWN_READ      =  'R',
167    SHE_RST_IN             =  's',
168    SHE_RST_OUT            =  't',
169    SHE_FLUSH              =  'u',
170    SHE_USER_WRITE_DATA    =  'w',
171    SHE_SHUTDOWN_WRITE     =  'W',
172    SHE_CLOSE              =  'X',
173    SHE_FORCE_FINISH       =  'Z',
174};
175
176static void
177sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
178{
179    enum stream_history_event prev_event;
180    sm_hist_idx_t idx;
181    int plus;
182
183    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
184    plus = SHE_PLUS == stream->sm_hist_buf[idx];
185    idx = (idx - plus) & SM_HIST_IDX_MASK;
186    prev_event = stream->sm_hist_buf[idx];
187
188    if (prev_event == sh_event && plus)
189        return;
190
191    if (prev_event == sh_event)
192        sh_event = SHE_PLUS;
193    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
194
195    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
196        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
197                                                        stream->sm_hist_buf);
198}
199
200#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
201#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
202        if (stream->sm_hist_idx & SM_HIST_IDX_MASK)                         \
203            LSQ_DEBUG("history: [%.*s]",                                    \
204                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
205                (stream)->sm_hist_buf);                                     \
206    } while (0)
207#else
208#   define SM_HISTORY_APPEND(stream, event)
209#   define SM_HISTORY_DUMP_REMAINING(stream)
210#endif
211
212
213/* Here, "readable" means that the user is able to read from the stream. */
214static void
215maybe_conn_to_pendrw_if_readable (lsquic_stream_t *stream,
216                                                        enum rw_reason reason)
217{
218    if (!(stream->conn_pub->enpub->enp_flags & ENPUB_PROC) &&
219                                                stream_readable(stream))
220    {
221        lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub,
222                                            stream->conn_pub->lconn, reason);
223    }
224}
225
226
227/* Here, "writeable" means that data can be put into packets to be
228 * scheduled to be sent out.
229 */
230static void
231maybe_conn_to_pendrw_if_writeable (lsquic_stream_t *stream,
232                                                        enum rw_reason reason)
233{
234    if (!(stream->conn_pub->enpub->enp_flags & ENPUB_PROC) &&
235            lsquic_send_ctl_can_send(stream->conn_pub->send_ctl) &&
236          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
237    {
238        lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub,
239                                            stream->conn_pub->lconn, reason);
240    }
241}
242
243
244static int
245stream_stalled (const lsquic_stream_t *stream)
246{
247    return 0 == (stream->stream_flags & STREAM_RW_EVENT_FLAGS) &&
248           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
249                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
250}
251
252
253static void
254use_user_on_write (lsquic_stream_t *stream)
255{
256    LSQ_DEBUG("stream %u: use user-supplied on-write callback", stream->id);
257    stream->on_write_cb  = stream->stream_if->on_write;
258}
259
260
261static void
262use_internal_on_write_file (lsquic_stream_t *stream)
263{
264    LSQ_DEBUG("use internal on-write callback (file)");
265    stream->on_write_cb  = stream_file_on_write;
266}
267
268
269static void
270use_internal_on_write_flush (lsquic_stream_t *stream)
271{
272    LSQ_DEBUG("use internal on-write callback (flush)");
273    stream->on_write_cb  = stream_flush_on_write;
274}
275
276
277#define writing_file(stream) ((stream)->file_fd >= 0)
278
279
280/* TODO: The logic to figure out whether the stream is connection limited
281 * should be taken out of the constructor.  The caller should specify this
282 * via one of enum stream_ctor_flags.
283 */
284lsquic_stream_t *
285lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub,
286                       const struct lsquic_stream_if *stream_if,
287                       void *stream_if_ctx, unsigned initial_window,
288                       unsigned initial_send_off,
289                       enum stream_ctor_flags ctor_flags)
290{
291    lsquic_cfcw_t *cfcw;
292    lsquic_stream_t *stream;
293
294    stream = calloc(1, sizeof(*stream));
295    if (!stream)
296        return NULL;
297
298    stream->stream_if = stream_if;
299    stream->id        = id;
300    stream->file_fd   = -1;
301    stream->conn_pub  = conn_pub;
302    if (!initial_window)
303        initial_window = 16 * 1024;
304    if (LSQUIC_STREAM_HANDSHAKE == id ||
305        (conn_pub->hs && LSQUIC_STREAM_HEADERS == id))
306        cfcw = NULL;
307    else
308    {
309        cfcw = &conn_pub->cfcw;
310        stream->stream_flags |= STREAM_CONN_LIMITED;
311        if (conn_pub->hs)
312            stream->stream_flags |= STREAM_USE_HEADERS;
313        lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO);
314    }
315    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
316    if (!initial_send_off)
317        initial_send_off = 16 * 1024;
318    stream->max_send_off = initial_send_off;
319    TAILQ_INIT(&stream->bufs_tosend);
320    if (ctor_flags & SCF_USE_DI_HASH)
321        stream->data_in = data_in_hash_new(conn_pub, id, 0);
322    else
323        stream->data_in = data_in_nocopy_new(conn_pub, id);
324    LSQ_DEBUG("created stream %u", id);
325    SM_HISTORY_APPEND(stream, SHE_CREATED);
326    if (ctor_flags & SCF_DI_AUTOSWITCH)
327        stream->stream_flags |= STREAM_AUTOSWITCH;
328    if (ctor_flags & SCF_CALL_ON_NEW)
329        lsquic_stream_call_on_new(stream, stream_if_ctx);
330    if (ctor_flags & SCF_DISP_RW_ONCE)
331        stream->stream_flags |= STREAM_RW_ONCE;
332    use_user_on_write(stream);
333    return stream;
334}
335
336
337void
338lsquic_stream_call_on_new (lsquic_stream_t *stream, void *stream_if_ctx)
339{
340    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
341    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
342    {
343        LSQ_DEBUG("calling on_new_stream");
344        SM_HISTORY_APPEND(stream, SHE_ONNEW);
345        stream->stream_flags |= STREAM_ONNEW_DONE;
346        stream->st_ctx = stream->stream_if->on_new_stream(stream_if_ctx, stream);
347    }
348}
349
350
351void
352lsquic_stream_destroy (lsquic_stream_t *stream)
353{
354    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
355                                                            STREAM_ONNEW_DONE)
356    {
357        stream->stream_flags |= STREAM_ONCLOSE_DONE;
358        stream->stream_if->on_close(stream, stream->st_ctx);
359    }
360    if (stream->file_fd >= 0)
361        (void) close(stream->file_fd);
362    if (stream->stream_flags & STREAM_SENDING_FLAGS)
363        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
364    if (stream->stream_flags & STREAM_RW_EVENT_FLAGS)
365        TAILQ_REMOVE(&stream->conn_pub->rw_streams, stream, next_rw_stream);
366    if (stream->stream_flags & STREAM_SERVICE_FLAGS)
367        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
368    lsquic_sfcw_consume_rem(&stream->fc);
369    drop_frames_in(stream);
370    drop_sbts(stream);
371    free(stream->push_req);
372    free(stream->uh);
373    LSQ_DEBUG("destroyed stream %u", stream->id);
374    SM_HISTORY_DUMP_REMAINING(stream);
375    free(stream);
376}
377
378
379static int
380stream_is_finished (const lsquic_stream_t *stream)
381{
382    return lsquic_stream_is_closed(stream)
383           /* n_unacked checks that no outgoing packets that reference this
384            * stream are outstanding:
385            */
386        && 0 == stream->n_unacked
387           /* This checks that no packets that reference this stream will
388            * become outstanding:
389            */
390        && 0 == (stream->stream_flags & (STREAM_SEND_DATA|STREAM_SEND_RST))
391        && ((stream->stream_flags & STREAM_FORCE_FINISH)
392          || (((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))
393                || lsquic_stream_is_pushed(stream))
394           && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD))));
395}
396
397
398static void
399maybe_finish_stream (lsquic_stream_t *stream)
400{
401    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
402                                                    stream_is_finished(stream))
403    {
404        LSQ_DEBUG("stream %u is now finished", stream->id);
405        SM_HISTORY_APPEND(stream, SHE_FINISHED);
406        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
407            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
408                                                    next_service_stream);
409        stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED;
410    }
411}
412
413
414static void
415maybe_schedule_call_on_close (lsquic_stream_t *stream)
416{
417    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
418                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE))
419            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE))
420    {
421        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
422            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
423                                                    next_service_stream);
424        stream->stream_flags |= STREAM_CALL_ONCLOSE;
425        LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id);
426        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
427    }
428}
429
430
431void
432lsquic_stream_call_on_close (lsquic_stream_t *stream)
433{
434    assert(stream->stream_flags & STREAM_ONNEW_DONE);
435    stream->stream_flags &= ~STREAM_CALL_ONCLOSE;
436    if (!(stream->stream_flags & STREAM_SERVICE_FLAGS))
437        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
438                                                    next_service_stream);
439    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
440    {
441        LSQ_DEBUG("calling on_close for stream %u", stream->id);
442        stream->stream_flags |= STREAM_ONCLOSE_DONE;
443        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
444        stream->stream_if->on_close(stream, stream->st_ctx);
445    }
446    else
447        assert(0);
448}
449
450
451static int
452stream_readable (const lsquic_stream_t *stream)
453{
454    /* A stream is readable if one of the following is true: */
455    return
456        /* - It is already finished: in that case, lsquic_stream_read() will
457         *   return 0.
458         */
459            (stream->stream_flags & STREAM_FIN_REACHED)
460        /* - The stream is reset, by either side.  In this case,
461         *   lsquic_stream_read() will return -1 (we want the user to be
462         *   able to collect the error).
463         */
464        ||  (stream->stream_flags & STREAM_RST_FLAGS)
465        /* - Either we are not in HTTP mode or the HTTP headers have been
466         *   received and the headers or data from the stream can be read.
467         */
468        ||  (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
469                                                        == STREAM_USE_HEADERS)
470            && (stream->uh != NULL
471                ||  stream->data_in->di_if->di_get_frame(stream->data_in,
472                                                        stream->read_offset)))
473    ;
474}
475
476
477size_t
478lsquic_stream_write_avail (const lsquic_stream_t *stream)
479{
480    uint64_t stream_avail, conn_avail;
481    size_t unflushed_sum;
482
483    assert(stream->tosend_off + stream->tosend_sz <= stream->max_send_off);
484    stream_avail = stream->max_send_off - stream->tosend_off
485                                                - stream->tosend_sz;
486    if (0 == stream->tosend_sz)
487    {
488        unflushed_sum = sum_sbts(stream);
489        if (unflushed_sum > stream_avail)
490            stream_avail = 0;
491        else
492            stream_avail -= unflushed_sum;
493    }
494
495    if (stream->stream_flags & STREAM_CONN_LIMITED)
496    {
497        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
498        if (conn_avail < stream_avail)
499        {
500            LSQ_DEBUG("stream %u write buffer is limited by connection: "
501                "%"PRIu64, stream->id, conn_avail);
502            return conn_avail;
503        }
504    }
505
506    LSQ_DEBUG("stream %u write buffer is limited by stream: %"PRIu64,
507        stream->id, stream_avail);
508    return stream_avail;
509}
510
511
512static int
513stream_writeable (const lsquic_stream_t *stream)
514{
515    /* A stream is writeable if one of the following is true: */
516    return
517        /* - The stream is reset, by either side.  In this case,
518         *   lsquic_stream_write() will return -1 (we want the user to be
519         *   able to collect the error).
520         */
521            (stream->stream_flags & STREAM_RST_FLAGS)
522        /* - There is room to write to the stream.
523         */
524        ||  lsquic_stream_write_avail(stream) > 0
525    ;
526}
527
528
529int
530lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
531{
532    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
533                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
534    {
535        return -1;
536    }
537    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
538    {
539        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
540            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
541                                                    next_send_stream);
542        stream->stream_flags |= STREAM_SEND_WUF;
543    }
544    return 0;
545}
546
547
548int
549lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
550{
551    uint64_t max_off;
552    int got_next_offset;
553    enum ins_frame ins_frame;
554
555    assert(frame->packet_in);
556
557    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
558    LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; "
559        "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
560
561    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) ==
562                                (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN))
563    {
564        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
565        lsquic_malo_put(frame);
566        return -1;
567    }
568
569    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
570    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
571    if (INS_FRAME_OK == ins_frame)
572    {
573        /* Update maximum offset in the flow controller and check for flow
574         * control violation:
575         */
576        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
577        if (0 != lsquic_stream_update_sfcw(stream, max_off))
578            return -1;
579        if (frame->data_frame.df_fin)
580        {
581            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
582            stream->stream_flags |= STREAM_FIN_RECVD;
583            maybe_finish_stream(stream);
584        }
585        if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
586                (stream->data_in->di_flags & DI_SWITCH_IMPL))
587        {
588            stream->data_in = stream->data_in->di_if->di_switch_impl(
589                                        stream->data_in, stream->read_offset);
590            if (!stream->data_in)
591            {
592                stream->data_in = data_in_error_new();
593                return -1;
594            }
595        }
596        if (got_next_offset)
597            /* Checking the offset saves di_get_frame() call */
598            maybe_conn_to_pendrw_if_readable(stream, RW_REASON_STREAM_IN);
599        return 0;
600    }
601    else if (INS_FRAME_DUP == ins_frame)
602    {
603        return 0;
604    }
605    else
606    {
607        assert(INS_FRAME_ERR == ins_frame);
608        return -1;
609    }
610}
611
612
613static void
614drop_frames_in (lsquic_stream_t *stream)
615{
616    if (stream->data_in)
617    {
618        stream->data_in->di_if->di_destroy(stream->data_in);
619        /* To avoid checking whether `data_in` is set, just set to the error
620         * data-in stream.  It does the right thing after incoming data is
621         * dropped.
622         */
623        stream->data_in = data_in_error_new();
624    }
625}
626
627
628int
629lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
630                      uint32_t error_code)
631{
632
633    if (stream->stream_flags & STREAM_RST_RECVD)
634    {
635        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
636        return 0;
637    }
638
639    SM_HISTORY_APPEND(stream, SHE_RST_IN);
640    /* This flag must always be set, even if we are "ignoring" it: it is
641     * used by elision code.
642     */
643    stream->stream_flags |= STREAM_RST_RECVD;
644
645    if ((stream->stream_flags & STREAM_FIN_RECVD) &&
646                    /* Pushed streams have fake STREAM_FIN_RECVD set, thus
647                     * we need a special check:
648                     */
649                                            !lsquic_stream_is_pushed(stream))
650    {
651        LSQ_DEBUG("ignore RST_STREAM frame after FIN is received");
652        return 0;
653    }
654
655    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
656    {
657        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is "
658            "smaller than that of byte following the last byte we have seen: "
659            "0x%"PRIX64, stream->id, offset,
660            lsquic_sfcw_get_max_recv_off(&stream->fc));
661        return -1;
662    }
663
664    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
665    {
666        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64
667            " violates flow control", stream->id, offset);
668        return -1;
669    }
670
671    /* Let user collect error: */
672    maybe_conn_to_pendrw_if_readable(stream, RW_REASON_RST_IN);
673
674    lsquic_sfcw_consume_rem(&stream->fc);
675    drop_frames_in(stream);
676
677    if (!(stream->stream_flags &
678                        (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT)))
679        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
680
681    stream->stream_flags |= STREAM_RST_RECVD;
682
683    maybe_finish_stream(stream);
684    maybe_schedule_call_on_close(stream);
685
686    return 0;
687}
688
689
690uint64_t
691lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
692{
693    assert(stream->stream_flags & STREAM_SEND_WUF);
694    stream->stream_flags &= ~STREAM_SEND_WUF;
695    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
696        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
697    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
698}
699
700
701void
702lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
703{
704    assert(stream->stream_flags & STREAM_SEND_BLOCKED);
705    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
706    stream->stream_flags &= ~STREAM_SEND_BLOCKED;
707    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
708        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
709}
710
711
712void
713lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
714{
715    assert(stream->stream_flags & STREAM_SEND_RST);
716    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
717    stream->stream_flags &= ~STREAM_SEND_RST;
718    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
719        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
720    stream->stream_flags |= STREAM_RST_SENT;
721    maybe_finish_stream(stream);
722}
723
724
725static size_t
726read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len)
727{
728    struct uncompressed_headers *uh = stream->uh;
729    size_t n_avail = uh->uh_size - uh->uh_off;
730    if (n_avail < len)
731        len = n_avail;
732    memcpy(dst, uh->uh_headers + uh->uh_off, len);
733    uh->uh_off += len;
734    if (uh->uh_off == uh->uh_size)
735    {
736        LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id);
737        free(uh);
738        stream->uh = NULL;
739        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
740        {
741            stream->stream_flags |= STREAM_FIN_REACHED;
742            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
743        }
744    }
745    return len;
746}
747
748
749/* This function returns 0 when EOF is reached.
750 */
751ssize_t
752lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov,
753                     int iovcnt)
754{
755    size_t total_nread, nread;
756    int processed_frames, read_unc_headers, iovidx;
757    unsigned char *p, *end;
758
759    SM_HISTORY_APPEND(stream, SHE_USER_READ);
760
761#define NEXT_IOV() do {                                             \
762    ++iovidx;                                                       \
763    while (iovidx < iovcnt && 0 == iov[iovidx].iov_len)             \
764        ++iovidx;                                                   \
765    if (iovidx < iovcnt)                                            \
766    {                                                               \
767        p = iov[iovidx].iov_base;                                   \
768        end = p + iov[iovidx].iov_len;                              \
769    }                                                               \
770    else                                                            \
771        p = end = NULL;                                             \
772} while (0)
773
774#define AVAIL() (end - p)
775
776    if (stream->stream_flags & STREAM_RST_FLAGS)
777    {
778        errno = ECONNRESET;
779        return -1;
780    }
781    if (stream->stream_flags & STREAM_U_READ_DONE)
782    {
783        errno = EBADF;
784        return -1;
785    }
786    if (stream->stream_flags & STREAM_FIN_REACHED)
787        return 0;
788
789    total_nread = 0;
790    processed_frames = 0;
791
792    iovidx = -1;
793    NEXT_IOV();
794
795    if (stream->uh && AVAIL())
796    {
797        read_unc_headers = 1;
798        do
799        {
800            nread = read_uh(stream, p, AVAIL());
801            p += nread;
802            total_nread += nread;
803            if (p == end)
804                NEXT_IOV();
805        }
806        while (stream->uh && AVAIL());
807    }
808    else
809        read_unc_headers = 0;
810
811    struct data_frame *data_frame;
812    while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset)))
813    {
814        ++processed_frames;
815        size_t navail = data_frame->df_size - data_frame->df_read_off;
816        size_t ntowrite = AVAIL();
817        if (navail < ntowrite)
818            ntowrite = navail;
819        memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite);
820        p += ntowrite;
821        data_frame->df_read_off += ntowrite;
822        stream->read_offset += ntowrite;
823        total_nread += ntowrite;
824        if (data_frame->df_read_off == data_frame->df_size)
825        {
826            const int fin = data_frame->df_fin;
827            stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
828            if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
829                    (stream->data_in->di_flags & DI_SWITCH_IMPL))
830            {
831                stream->data_in = stream->data_in->di_if->di_switch_impl(
832                                            stream->data_in, stream->read_offset);
833                if (!stream->data_in)
834                {
835                    stream->data_in = data_in_error_new();
836                    return -1;
837                }
838            }
839            if (fin)
840            {
841                stream->stream_flags |= STREAM_FIN_REACHED;
842                break;
843            }
844        }
845        if (p == end)
846            NEXT_IOV();
847    }
848
849    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
850                                        total_nread, stream->read_offset);
851
852    if (processed_frames)
853    {
854        lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
855        if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
856        {
857            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
858                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
859            stream->stream_flags |= STREAM_SEND_WUF;
860            maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_READ);
861        }
862    }
863
864    if (processed_frames || read_unc_headers)
865    {
866        return total_nread;
867    }
868    else
869    {
870        assert(0 == total_nread);
871        errno = EWOULDBLOCK;
872        return -1;
873    }
874}
875
876
877ssize_t
878lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
879{
880    struct iovec iov = { .iov_base = buf, .iov_len = len, };
881    return lsquic_stream_readv(stream, &iov, 1);
882}
883
884
885#ifndef NDEBUG
886/* Use weak linkage so that tests can override this function */
887int
888lsquic_stream_tosend_fin (const lsquic_stream_t *stream)
889    __attribute__((weak))
890    ;
891#endif
892int
893lsquic_stream_tosend_fin (const lsquic_stream_t *stream)
894{
895    return (stream->stream_flags & STREAM_U_WRITE_DONE)
896        && !writing_file(stream)
897        && 0 == stream->tosend_sz
898        && 0 == sum_sbts(stream);
899}
900
901
902static void
903stream_shutdown_read (lsquic_stream_t *stream)
904{
905    if (!(stream->stream_flags & STREAM_U_READ_DONE))
906    {
907        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
908        stream->stream_flags |= STREAM_U_READ_DONE;
909        stream_wantread(stream, 0);
910        maybe_finish_stream(stream);
911    }
912}
913
914
915static void
916stream_shutdown_write (lsquic_stream_t *stream)
917{
918    int flushed_all, data_send_ok;
919
920    if (stream->stream_flags & STREAM_U_WRITE_DONE)
921        return;
922
923    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
924    stream->stream_flags |= STREAM_U_WRITE_DONE;
925
926    data_send_ok = !(stream->stream_flags &
927                    (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT));
928    if (!data_send_ok)
929    {
930        stream_wantwrite(stream, 0);
931        return;
932    }
933
934    lsquic_stream_flush(stream);
935    flushed_all = stream->tosend_sz == sum_sbts(stream);
936    if (flushed_all)
937        stream_wantwrite(stream, 0);
938    else
939    {
940        stream_wantwrite(stream, 1);
941        use_internal_on_write_flush(stream);
942    }
943
944    if (flushed_all || stream->tosend_sz > 0)
945    {
946        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
947            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
948        stream->stream_flags |= STREAM_SEND_DATA;
949    }
950}
951
952
953int
954lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
955{
956    LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how);
957    if (lsquic_stream_is_closed(stream))
958    {
959        LSQ_INFO("Attempt to shut down a closed stream %u", stream->id);
960        errno = EBADF;
961        return -1;
962    }
963    /* 0: read, 1: write: 2: read and write
964     */
965    if (how < 0 || how > 2)
966    {
967        errno = EINVAL;
968        return -1;
969    }
970
971    if (how)
972        stream_shutdown_write(stream);
973    if (how != 1)
974        stream_shutdown_read(stream);
975
976    maybe_finish_stream(stream);
977    maybe_schedule_call_on_close(stream);
978    if (how)
979        maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_SHUTDOWN);
980
981    return 0;
982}
983
984
985void
986lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
987{
988    LSQ_DEBUG("internal shutdown of stream %u", stream->id);
989    if (LSQUIC_STREAM_HANDSHAKE == stream->id
990        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
991                                LSQUIC_STREAM_HEADERS == stream->id))
992    {
993        LSQ_DEBUG("add flag to force-finish special stream %u", stream->id);
994        stream->stream_flags |= STREAM_FORCE_FINISH;
995        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
996    }
997    maybe_finish_stream(stream);
998    maybe_schedule_call_on_close(stream);
999}
1000
1001
1002static void
1003fake_reset_unused_stream (lsquic_stream_t *stream)
1004{
1005    stream->stream_flags |=
1006        STREAM_RST_RECVD    /* User will pick this up on read or write */
1007      | STREAM_RST_SENT     /* Don't send anything else on this stream */
1008    ;
1009
1010    /* Cancel all writes to the network scheduled for this stream: */
1011    if (stream->stream_flags & STREAM_SENDING_FLAGS)
1012        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
1013                                                next_send_stream);
1014    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
1015
1016    if (writing_file(stream))
1017        stop_reading_from_file(stream);
1018
1019    LSQ_DEBUG("fake-reset stream %u%s",
1020                    stream->id, stream_stalled(stream) ? " (stalled)" : "");
1021    maybe_finish_stream(stream);
1022    maybe_schedule_call_on_close(stream);
1023}
1024
1025
1026/* This function should only be called for locally-initiated streams whose ID
1027 * is larger than that received in GOAWAY frame.  This may occur when GOAWAY
1028 * frame sent by peer but we have not yet received it and created a stream.
1029 * In this situation, we mark the stream as reset, so that user's on_read or
1030 * on_write event callback picks up the error.  That, in turn, should result
1031 * in stream being closed.
1032 *
1033 * If we have received any data frames on this stream, this probably indicates
1034 * a bug in peer code: it should not have sent GOAWAY frame with stream ID
1035 * lower than this.  However, we still try to handle it gracefully and peform
1036 * a shutdown, as if the stream was not reset.
1037 */
1038void
1039lsquic_stream_received_goaway (lsquic_stream_t *stream)
1040{
1041    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
1042    if (0 == stream->read_offset &&
1043                            stream->data_in->di_if->di_empty(stream->data_in))
1044        fake_reset_unused_stream(stream);       /* Normal condition */
1045    else
1046    {   /* This is odd, let's handle it the best we can: */
1047        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
1048        lsquic_stream_shutdown_internal(stream);
1049    }
1050}
1051
1052
1053uint64_t
1054lsquic_stream_read_offset (const lsquic_stream_t *stream)
1055{
1056    return stream->read_offset;
1057}
1058
1059
1060static int
1061stream_want_read_or_write (lsquic_stream_t *stream, int is_want, int flag)
1062{
1063    const int old_val = !!(stream->stream_flags & flag);
1064    if (old_val != is_want)
1065    {
1066        if (is_want)
1067        {
1068            if (!(stream->stream_flags & STREAM_RW_EVENT_FLAGS))
1069                TAILQ_INSERT_TAIL(&stream->conn_pub->rw_streams, stream, next_rw_stream);
1070            stream->stream_flags |= flag;
1071        }
1072        else
1073        {
1074            stream->stream_flags &= ~flag;
1075            if (!(stream->stream_flags & STREAM_RW_EVENT_FLAGS))
1076                TAILQ_REMOVE(&stream->conn_pub->rw_streams, stream, next_rw_stream);
1077        }
1078    }
1079    return old_val;
1080}
1081
1082
1083static int
1084stream_wantread (lsquic_stream_t *stream, int is_want)
1085{
1086    return stream_want_read_or_write(stream, is_want, STREAM_WANT_READ);
1087}
1088
1089
1090int
1091lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1092{
1093    if (0 == (stream->stream_flags & STREAM_U_READ_DONE))
1094    {
1095        if (is_want)
1096            maybe_conn_to_pendrw_if_readable(stream, RW_REASON_WANTREAD);
1097        return stream_wantread(stream, is_want);
1098    }
1099    else
1100    {
1101        errno = EBADF;
1102        return -1;
1103    }
1104}
1105
1106
1107static int
1108stream_wantwrite (lsquic_stream_t *stream, int is_want)
1109{
1110    if (writing_file(stream))
1111    {
1112        int old_val = stream->stream_flags & STREAM_SAVED_WANTWR;
1113        stream->stream_flags |= old_val & (0 - !!is_want);
1114        return !!old_val;
1115    }
1116    else
1117        return stream_want_read_or_write(stream, is_want, STREAM_WANT_WRITE);
1118}
1119
1120
1121int
1122lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1123{
1124    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE))
1125    {
1126        return stream_wantwrite(stream, is_want);
1127    }
1128    else
1129    {
1130        errno = EBADF;
1131        return -1;
1132    }
1133}
1134
1135
1136static void
1137stream_flush_on_write (lsquic_stream_t *stream,
1138                                    struct lsquic_stream_ctx *stream_ctx)
1139{
1140    size_t sum, n_flushed;
1141
1142    assert(stream->stream_flags & STREAM_U_WRITE_DONE);
1143
1144    sum = sum_sbts(stream) - stream->tosend_sz;
1145    if (sum == 0)
1146    {   /* This can occur if the stream has been written to and closed by
1147         * the user, but a RST_STREAM comes in that drops all data.
1148         */
1149        LSQ_DEBUG("%s: no more data to send", __func__);
1150        stream_wantwrite(stream, 0);
1151        return;
1152    }
1153
1154    n_flushed = stream_flush_internal(stream, sum);
1155    if (n_flushed == sum)
1156    {
1157        LSQ_DEBUG("Flushed all remaining data (%zd bytes)", n_flushed);
1158        stream_wantwrite(stream, 0);
1159    }
1160    else
1161        LSQ_DEBUG("Flushed %zd out of %zd remaining data", n_flushed, sum);
1162}
1163
1164
1165#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE|            \
1166                    STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST)
1167
1168
1169static void
1170stream_dispatch_rw_events_loop (lsquic_stream_t *stream, int *processed)
1171{
1172    unsigned no_progress_count, no_progress_limit;
1173    enum stream_flags flags;
1174    uint64_t size;
1175    size_t sbt_size;
1176
1177    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1178    *processed = 0;
1179
1180    no_progress_count = 0;
1181    while ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream))
1182    {
1183        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1184        size  = stream->read_offset;
1185
1186        stream->stream_if->on_read(stream, stream->st_ctx);
1187        *processed = 1;
1188
1189        if (no_progress_limit && size == stream->read_offset &&
1190                        flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1191        {
1192            ++no_progress_count;
1193            if (no_progress_count >= no_progress_limit)
1194            {
1195                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1196                    "progress) in user code reading from stream",
1197                    no_progress_count,
1198                    no_progress_count == 1 ? "" : "s");
1199                break;
1200            }
1201        }
1202        else
1203            no_progress_count = 0;
1204    }
1205
1206    no_progress_count = 0;
1207    while ((stream->stream_flags & STREAM_WANT_WRITE) && stream_writeable(stream))
1208    {
1209        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1210        size  = stream->tosend_sz;
1211        if (0 == size)
1212            sbt_size = sum_sbts(stream);
1213
1214        stream->on_write_cb(stream, stream->st_ctx);
1215        *processed = 1;
1216
1217        if (no_progress_limit &&
1218            flags == (stream->stream_flags & USER_PROGRESS_FLAGS) &&
1219            (0 == size ? sbt_size == sum_sbts(stream) :
1220                                                size == stream->tosend_sz))
1221        {
1222            ++no_progress_count;
1223            if (no_progress_count >= no_progress_limit)
1224            {
1225                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1226                    "progress) in user code writing to stream",
1227                    no_progress_count,
1228                    no_progress_count == 1 ? "" : "s");
1229                break;
1230            }
1231        }
1232        else
1233            no_progress_count = 0;
1234    }
1235}
1236
1237
1238static void
1239stream_dispatch_rw_events_once (lsquic_stream_t *stream, int *processed)
1240{
1241    *processed = 0;
1242
1243    if ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream))
1244    {
1245        stream->stream_if->on_read(stream, stream->st_ctx);
1246        *processed = 1;
1247    }
1248
1249    if ((stream->stream_flags & STREAM_WANT_WRITE) && stream_writeable(stream))
1250    {
1251        stream->on_write_cb(stream, stream->st_ctx);
1252        *processed = 1;
1253    }
1254}
1255
1256
1257static void
1258maybe_mark_as_blocked (lsquic_stream_t *stream)
1259{
1260    uint64_t off, stream_data_sz;
1261
1262    if (stream->tosend_sz)
1263        stream_data_sz = stream->tosend_sz;
1264    else
1265        stream_data_sz = sum_sbts(stream);
1266
1267    off = stream->tosend_off + stream_data_sz;
1268    if (off >= stream->max_send_off)
1269    {
1270        assert(off == stream->max_send_off);
1271        if (stream->blocked_off < stream->max_send_off)
1272        {
1273            stream->blocked_off = stream->max_send_off;
1274            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1275                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1276                                                            next_send_stream);
1277            stream->stream_flags |= STREAM_SEND_BLOCKED;
1278            LSQ_DEBUG("marked stream-blocked at stream offset "
1279                                            "%"PRIu64, stream->blocked_off);
1280            return;
1281        }
1282    }
1283
1284    if (stream->stream_flags & STREAM_CONN_LIMITED)
1285    {
1286        struct lsquic_conn_cap *const cc = &stream->conn_pub->conn_cap;
1287        off = cc->cc_sent + cc->cc_tosend;
1288        if (off >= cc->cc_max)
1289        {
1290            assert(off == cc->cc_max);
1291            if (cc->cc_blocked < cc->cc_max)
1292            {
1293                cc->cc_blocked = cc->cc_max;
1294                stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
1295                LSQ_DEBUG("marked connection-blocked at connection offset "
1296                                                        "%"PRIu64, cc->cc_max);
1297            }
1298        }
1299    }
1300}
1301
1302void
1303lsquic_stream_dispatch_rw_events (lsquic_stream_t *stream)
1304{
1305    int processed;
1306    uint64_t tosend_off;
1307
1308    assert(stream->stream_flags & STREAM_RW_EVENT_FLAGS);
1309    tosend_off = stream->tosend_off;
1310
1311    if (stream->stream_flags & STREAM_RW_ONCE)
1312        stream_dispatch_rw_events_once(stream, &processed);
1313    else
1314        stream_dispatch_rw_events_loop(stream, &processed);
1315
1316    /* User wants to write, but no progress has been made: either stream
1317     * or connection is blocked.
1318     */
1319    if ((stream->stream_flags & STREAM_WANT_WRITE) &&
1320                        stream->tosend_off == tosend_off &&
1321                            (stream->tosend_sz == 0 && sum_sbts(stream) == 0))
1322        maybe_mark_as_blocked(stream);
1323
1324    if (stream->stream_flags & STREAM_RW_EVENT_FLAGS)
1325    {
1326        if (processed)
1327        {   /* Move the stream to the end of the list to ensure fairness. */
1328            TAILQ_REMOVE(&stream->conn_pub->rw_streams, stream, next_rw_stream);
1329            TAILQ_INSERT_TAIL(&stream->conn_pub->rw_streams, stream, next_rw_stream);
1330        }
1331    }
1332    else if (((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
1333                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE))
1334        LSQ_DEBUG("stream %u stalled", stream->id);
1335}
1336
1337
1338static struct stream_buf_tosend *
1339get_sbt_buf (lsquic_stream_t *stream)
1340{
1341    struct stream_buf_tosend *sbt;
1342
1343    sbt = TAILQ_LAST(&stream->bufs_tosend, sbts_tailq);
1344    if (!(sbt && SBT_BUF == sbt->sbt_type && (sizeof(sbt->u.buf.sbt_data) - sbt->u.buf.sbt_sz) > 0))
1345    {
1346        sbt = lsquic_mm_get_4k(stream->conn_pub->mm);
1347        if (!sbt)
1348            return NULL;
1349        sbt->sbt_type = SBT_BUF;
1350        sbt->u.buf.sbt_sz  = 0;
1351        sbt->u.buf.sbt_off = 0;
1352        TAILQ_INSERT_TAIL(&stream->bufs_tosend, sbt, next_sbt);
1353    }
1354
1355    return sbt;
1356}
1357
1358
1359static size_t
1360sbt_write (struct stream_buf_tosend *sbt, const void *buf, size_t len)
1361{
1362    assert(SBT_BUF == sbt->sbt_type);
1363    size_t ntowrite = sizeof(sbt->u.buf.sbt_data) - sbt->u.buf.sbt_sz;
1364    if (len < ntowrite)
1365        ntowrite = len;
1366    memcpy(sbt->u.buf.sbt_data + sbt->u.buf.sbt_sz, buf, ntowrite);
1367    sbt->u.buf.sbt_sz += ntowrite;
1368    return ntowrite;
1369}
1370
1371static size_t
1372sbt_read_buf (struct stream_buf_tosend *sbt, void *buf, size_t len)
1373{
1374    size_t navail = sbt->u.buf.sbt_sz - sbt->u.buf.sbt_off;
1375    if (len > navail)
1376        len = navail;
1377    memcpy(buf, sbt->u.buf.sbt_data + sbt->u.buf.sbt_off, len);
1378    sbt->u.buf.sbt_off += len;
1379    return len;
1380}
1381
1382
1383static void
1384incr_tosend_sz (lsquic_stream_t *stream, uint64_t incr)
1385{
1386    stream->tosend_sz                    += incr;
1387    if (stream->stream_flags & STREAM_CONN_LIMITED)
1388    {
1389        assert(stream->conn_pub->conn_cap.cc_tosend +
1390                stream->conn_pub->conn_cap.cc_sent + incr <=
1391                    stream->conn_pub->conn_cap.cc_max);
1392        stream->conn_pub->conn_cap.cc_tosend += incr;
1393    }
1394}
1395
1396
1397static void
1398decr_tosend_sz (lsquic_stream_t *stream, uint64_t decr)
1399{
1400    assert(decr <= stream->tosend_sz);
1401    stream->tosend_sz                    -= decr;
1402    if (stream->stream_flags & STREAM_CONN_LIMITED)
1403    {
1404        assert(decr <= stream->conn_pub->conn_cap.cc_tosend);
1405        stream->conn_pub->conn_cap.cc_tosend -= decr;
1406    }
1407}
1408
1409
1410static void
1411sbt_truncated_file (struct stream_buf_tosend *sbt)
1412{
1413    off_t delta = sbt->u.file.sbt_sz - sbt->u.file.sbt_off;
1414    decr_tosend_sz(sbt->u.file.sbt_stream, delta);
1415    sbt->u.file.sbt_sz = sbt->u.file.sbt_off;
1416    sbt->u.file.sbt_last = 1;
1417}
1418
1419
1420static void
1421stop_reading_from_file (lsquic_stream_t *stream)
1422{
1423    assert(stream->file_fd >= 0);
1424    if (stream->stream_flags & STREAM_CLOSE_FILE)
1425        (void) close(stream->file_fd);
1426    stream->file_fd = -1;
1427    stream_wantwrite(stream, !!(stream->stream_flags & STREAM_SAVED_WANTWR));
1428    use_user_on_write(stream);
1429}
1430
1431
1432static size_t
1433sbt_read_file (struct stream_buf_tosend *sbt, void *pbuf, size_t len)
1434{
1435    const lsquic_stream_t *const stream = sbt->u.file.sbt_stream;
1436    size_t navail;
1437    ssize_t nread;
1438    unsigned char *buf = pbuf;
1439
1440    navail = sbt->u.file.sbt_sz - sbt->u.file.sbt_off;
1441    if (len > navail)
1442        len = navail;
1443
1444    assert(len > 0);
1445
1446    *buf++ = sbt->u.file.sbt_stream->file_byte;
1447    sbt->u.file.sbt_off += 1;
1448    len -= 1;
1449
1450    while (len > 0)
1451    {
1452        nread = read(sbt->u.file.sbt_fd, buf, len);
1453        if (-1 == nread)
1454        {
1455            LSQ_WARN("error reading: %s", strerror(errno));
1456            LSQ_WARN("could only send %jd bytes instead of intended %jd",
1457                (intmax_t) sbt->u.file.sbt_off,
1458                (intmax_t) sbt->u.file.sbt_sz);
1459            sbt_truncated_file(sbt);
1460            break;
1461        }
1462        else if (0 == nread)
1463        {
1464            LSQ_WARN("could only send %jd bytes instead of intended %jd",
1465                (intmax_t) sbt->u.file.sbt_off,
1466                (intmax_t) sbt->u.file.sbt_sz);
1467            sbt_truncated_file(sbt);
1468            break;
1469        }
1470        buf += nread;
1471        len -= nread;
1472        sbt->u.file.sbt_off += nread;
1473    }
1474
1475    len = buf - (unsigned char *) pbuf;
1476
1477    if (sbt->u.file.sbt_off < sbt->u.file.sbt_sz || !sbt->u.file.sbt_last)
1478    {
1479        nread = read(sbt->u.file.sbt_fd, &sbt->u.file.sbt_stream->file_byte, 1);
1480        if (-1 == nread)
1481        {
1482            LSQ_WARN("error reading: %s", strerror(errno));
1483            LSQ_WARN("could only send %jd bytes instead of intended %jd",
1484                (intmax_t) sbt->u.file.sbt_off,
1485                (intmax_t) sbt->u.file.sbt_sz);
1486            sbt_truncated_file(sbt);
1487        }
1488        else if (0 == nread)
1489        {
1490            LSQ_WARN("could only send %jd bytes instead of intended %jd",
1491                (intmax_t) sbt->u.file.sbt_off,
1492                (intmax_t) sbt->u.file.sbt_sz);
1493            sbt_truncated_file(sbt);
1494        }
1495    }
1496
1497    if (sbt->u.file.sbt_last && sbt->u.file.sbt_off == sbt->u.file.sbt_sz)
1498        stop_reading_from_file(sbt->u.file.sbt_stream);
1499
1500    return len;
1501}
1502
1503
1504static size_t
1505sbt_read (struct stream_buf_tosend *sbt, void *buf, size_t len)
1506{
1507    switch (sbt->sbt_type)
1508    {
1509    case SBT_BUF:
1510        return sbt_read_buf(sbt, buf, len);
1511    default:
1512        assert(SBT_FILE == sbt->sbt_type);
1513        return sbt_read_file(sbt, buf, len);
1514    }
1515}
1516
1517
1518static int
1519sbt_done (const struct stream_buf_tosend *sbt)
1520{
1521    switch (sbt->sbt_type)
1522    {
1523    case SBT_BUF:
1524        return sbt->u.buf.sbt_off == sbt->u.buf.sbt_sz;
1525    default:
1526        assert(SBT_FILE == sbt->sbt_type);
1527        return sbt->u.file.sbt_off == sbt->u.file.sbt_sz;
1528    }
1529}
1530
1531
1532static void
1533sbt_destroy (lsquic_stream_t *stream, struct stream_buf_tosend *sbt)
1534{
1535    switch (sbt->sbt_type)
1536    {
1537    case SBT_BUF:
1538        lsquic_mm_put_4k(stream->conn_pub->mm, sbt);
1539        break;
1540    default:
1541        assert(SBT_FILE == sbt->sbt_type);
1542        free(sbt);
1543    }
1544}
1545
1546
1547static size_t
1548sbt_size (const struct stream_buf_tosend *sbt)
1549{
1550    switch (sbt->sbt_type)
1551    {
1552    case SBT_BUF:
1553        return sbt->u.buf.sbt_sz - sbt->u.buf.sbt_off;
1554    default:
1555        return sbt->u.file.sbt_sz - sbt->u.file.sbt_off;
1556    }
1557}
1558
1559
1560static size_t
1561sum_sbts (const lsquic_stream_t *stream)
1562{
1563    size_t sum;
1564    struct stream_buf_tosend *sbt;
1565
1566    sum = 0;
1567    TAILQ_FOREACH(sbt, &stream->bufs_tosend, next_sbt)
1568        sum += sbt_size(sbt);
1569
1570    return sum;
1571}
1572
1573
1574static void
1575maybe_put_on_sending_streams (lsquic_stream_t *stream)
1576{
1577    if (lsquic_stream_tosend_sz(stream))
1578    {
1579        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1580            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
1581        stream->stream_flags |= STREAM_SEND_DATA;
1582    }
1583}
1584
1585
1586static size_t
1587stream_flush_internal (lsquic_stream_t *stream, size_t size)
1588{
1589    uint64_t conn_avail;
1590
1591    assert(0 == stream->tosend_sz ||
1592            (stream->stream_flags & STREAM_U_WRITE_DONE));
1593    if (stream->stream_flags & STREAM_CONN_LIMITED)
1594    {
1595        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
1596        if (size > conn_avail)
1597        {
1598            LSQ_DEBUG("connection-limited: flushing only %"PRIu64
1599                " out of %zd bytes", conn_avail, size);
1600            size = conn_avail;
1601        }
1602    }
1603    LSQ_DEBUG("flushed %zd bytes of stream %u", size, stream->id);
1604    SM_HISTORY_APPEND(stream, SHE_FLUSH);
1605    incr_tosend_sz(stream, size);
1606    maybe_put_on_sending_streams(stream);
1607    return size;
1608}
1609
1610
1611/* When stream->tosend_sz is zero and we have anything in SBT list, this
1612 * means that we have unflushed data.
1613 */
1614int
1615lsquic_stream_flush (lsquic_stream_t *stream)
1616{
1617    size_t sum;
1618    if (0 == stream->tosend_sz && !TAILQ_EMPTY(&stream->bufs_tosend))
1619    {
1620        sum = sum_sbts(stream);
1621        if (stream_flush_internal(stream, sum) > 0)
1622            maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_FLUSH);
1623    }
1624    return 0;
1625}
1626
1627
1628/* The flush threshold is the maximum size of stream data that can be sent
1629 * in a full packet.
1630 */
1631static size_t
1632flush_threshold (const lsquic_stream_t *stream)
1633{
1634    enum packet_out_flags flags;
1635    unsigned packet_header_sz, stream_header_sz;
1636    size_t threshold;
1637
1638    /* We are guessing the number of bytes that will be used to encode
1639     * packet number, because we do not have this information at this
1640     * point in time.
1641     */
1642    flags = PACKNO_LEN_2 << POBIT_SHIFT;
1643    if (stream->conn_pub->lconn->cn_flags & LSCONN_TCID0)
1644        flags |= PO_CONN_ID;
1645
1646    packet_header_sz = lsquic_po_header_length(flags);
1647    stream_header_sz = stream->conn_pub->lconn->cn_pf
1648            ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off);
1649
1650    threshold = stream->conn_pub->lconn->cn_pack_size - packet_header_sz
1651                                                         - stream_header_sz;
1652    return threshold;
1653}
1654
1655
1656static size_t
1657flush_or_check_flags (lsquic_stream_t *stream, size_t sz)
1658{
1659    size_t sum;
1660    if (0 == stream->tosend_sz)
1661    {
1662        sum = sum_sbts(stream);
1663        if (sum >= flush_threshold(stream))
1664            return stream_flush_internal(stream, sum);
1665        else
1666            return 0;
1667    }
1668    else
1669    {
1670        incr_tosend_sz(stream, sz);
1671        maybe_put_on_sending_streams(stream);
1672        return sz;
1673    }
1674}
1675
1676
1677static void
1678stream_file_on_write (lsquic_stream_t *stream, struct lsquic_stream_ctx *st_ctx)
1679{
1680    struct stream_buf_tosend *sbt;
1681    ssize_t nr;
1682    size_t size, left;
1683    int last;
1684
1685    if (stream->stream_flags & STREAM_RST_FLAGS)
1686    {
1687        LSQ_INFO("stream was reset: stopping sending the file at offset %jd",
1688                                                (intmax_t) stream->file_off);
1689        stop_reading_from_file(stream);
1690        return;
1691    }
1692
1693    /* Write as much as we can */
1694    size = lsquic_stream_write_avail(stream);
1695    left = stream->file_size - stream->file_off;
1696    if (left < size)
1697        size = left;
1698
1699    if (0 == stream->file_off)
1700    {
1701        /* Try to read in 1 byte to check for truncation.  Having a byte in
1702         * store guarantees that we can generate a frame even if the file is
1703         * truncated later.  This function only does it once, when the first
1704         * SBT is queued.  Subsequent SBT use the byte read in by previous
1705         * SBT in sbt_read_file().
1706         */
1707        nr = read(stream->file_fd, &stream->file_byte, 1);
1708        if (nr != 1)
1709        {
1710            if (nr < 0)
1711                LSQ_WARN("cannot read from file: %s", strerror(errno));
1712            LSQ_INFO("stopping sending the file at offset %jd",
1713                                                (intmax_t) stream->file_off);
1714            stop_reading_from_file(stream);
1715            return;
1716        }
1717    }
1718
1719    last = stream->file_off + (off_t) size == stream->file_size;
1720
1721    sbt = malloc(offsetof(struct stream_buf_tosend, u.file.sbt_last) +
1722                    sizeof(((struct stream_buf_tosend *)0)->u.file.sbt_last));
1723    if (!sbt)
1724    {
1725        LSQ_WARN("malloc failed: %s", strerror(errno));
1726        LSQ_INFO("stopping sending the file at offset %jd",
1727                                                (intmax_t) stream->file_off);
1728        stop_reading_from_file(stream);
1729        return;
1730    }
1731
1732    sbt->sbt_type          = SBT_FILE;
1733    sbt->u.file.sbt_stream = stream;
1734    sbt->u.file.sbt_fd     = stream->file_fd;
1735    sbt->u.file.sbt_sz     = size;
1736    sbt->u.file.sbt_off    = 0;
1737    sbt->u.file.sbt_last   = last;
1738    TAILQ_INSERT_TAIL(&stream->bufs_tosend, sbt, next_sbt);
1739
1740    LSQ_DEBUG("inserted %zd-byte sbt at offset %jd, last: %d", size,
1741                                        (intmax_t) stream->file_off, last);
1742
1743    stream->file_off += size;
1744
1745    incr_tosend_sz(stream, size);
1746    maybe_put_on_sending_streams(stream);
1747
1748    if (last)
1749        stream_want_read_or_write(stream, 0, STREAM_WANT_WRITE);
1750}
1751
1752
1753static void
1754stream_sendfile (lsquic_stream_t *stream, int fd, off_t off, size_t size,
1755                 int close)
1756{
1757    int want_write;
1758
1759    /* STREAM_WANT_WRITE is not guaranteed to be set: the user may have
1760     * already unset it.
1761     */
1762    want_write = !!(stream->stream_flags & STREAM_WANT_WRITE);
1763    stream_wantwrite(stream, 1);
1764
1765    stream->file_fd   = fd;
1766    stream->file_off  = off;
1767    stream->file_size = size;
1768
1769    stream_wantwrite(stream, want_write);
1770
1771    if (close)
1772        stream->stream_flags |= STREAM_CLOSE_FILE;
1773    else
1774        stream->stream_flags &= ~STREAM_CLOSE_FILE;
1775
1776    use_internal_on_write_file(stream);
1777    stream->on_write_cb(stream, stream->st_ctx);
1778}
1779
1780
1781#define COMMON_WRITE_CHECKS() do {                                          \
1782    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))   \
1783                                                   == STREAM_USE_HEADERS)   \
1784    {                                                                       \
1785        LSQ_WARN("Attempt to write to stream before sending HTTP headers"); \
1786        errno = EILSEQ;                                                     \
1787        return -1;                                                          \
1788    }                                                                       \
1789    if (stream->stream_flags & STREAM_RST_FLAGS)                            \
1790    {                                                                       \
1791        LSQ_INFO("Attempt to write to stream after it had been reset");     \
1792        errno = ECONNRESET;                                                 \
1793        return -1;                                                          \
1794    }                                                                       \
1795    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
1796    {                                                                       \
1797        LSQ_WARN("Attempt to write to stream after it was closed for "      \
1798                                                                "writing"); \
1799        errno = EBADF;                                                      \
1800        return -1;                                                          \
1801    }                                                                       \
1802} while (0)
1803
1804
1805int
1806lsquic_stream_write_file (lsquic_stream_t *stream, const char *filename)
1807{
1808    int fd, saved_errno;
1809    struct stat st;
1810
1811    COMMON_WRITE_CHECKS();
1812
1813    fd = open(filename, O_RDONLY);
1814    if (fd < 0)
1815    {
1816        LSQ_WARN("could not open `%s' for reading: %s", filename, strerror(errno));
1817        return -1;
1818    }
1819
1820    if (fstat(fd, &st) < 0)
1821    {
1822        LSQ_WARN("fstat64(%s) failed: %s", filename, strerror(errno));
1823        saved_errno = errno;
1824        (void) close(fd);
1825        errno = saved_errno;
1826        return -1;
1827    }
1828
1829    if (0 == st.st_size)
1830    {
1831        LSQ_INFO("Writing zero-sized file `%s' is a no-op", filename);
1832        (void) close(fd);
1833        return 0;
1834    }
1835
1836    LSQ_DEBUG("Inserted `%s' into SBT queue; size: %jd", filename,
1837                                                    (intmax_t) st.st_size);
1838
1839    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1840    stream_sendfile(stream, fd, 0, st.st_size, 1);
1841    maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_WRITEFILE);
1842    return 0;
1843}
1844
1845
1846int
1847lsquic_stream_sendfile (lsquic_stream_t *stream, int fd, off_t off,
1848                        size_t size)
1849{
1850    COMMON_WRITE_CHECKS();
1851    if ((off_t) -1 == lseek(fd, off, SEEK_SET))
1852    {
1853        LSQ_INFO("lseek failed: %s", strerror(errno));
1854        return -1;
1855    }
1856    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1857    stream_sendfile(stream, fd, off, size, 0);
1858    maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_SENDFILE);
1859    return 0;
1860}
1861
1862
1863ssize_t
1864lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
1865{
1866    struct stream_buf_tosend *sbt;
1867    size_t nw, stream_avail, n_flushed;
1868    const unsigned char *p = buf;
1869
1870    COMMON_WRITE_CHECKS();
1871    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1872
1873    stream_avail = lsquic_stream_write_avail(stream);
1874    if (stream_avail < len)
1875    {
1876        LSQ_DEBUG("cap length from %zd to %zd bytes", len, stream_avail);
1877        len = stream_avail;
1878    }
1879
1880    while (len > 0)
1881    {
1882        sbt = get_sbt_buf(stream);
1883        if (!sbt)
1884        {
1885            LSQ_WARN("could not allocate SBT buffer: %s", strerror(errno));
1886            break;
1887        }
1888        nw = sbt_write(sbt, p, len);
1889        len -= nw;
1890        p += nw;
1891    }
1892
1893    const size_t n_written = p - (unsigned char *) buf;
1894    LSQ_DEBUG("wrote %"PRIiPTR" bytes to stream %u", n_written, stream->id);
1895
1896    n_flushed = flush_or_check_flags(stream, n_written);
1897    if (n_flushed)
1898        maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_WRITE);
1899
1900    return n_written;
1901}
1902
1903
1904ssize_t
1905lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
1906                                                                    int iovcnt)
1907{
1908    int i;
1909    const unsigned char *p;
1910    struct stream_buf_tosend *sbt;
1911    size_t nw, stream_avail, len, n_flushed;
1912    ssize_t nw_total;
1913
1914    COMMON_WRITE_CHECKS();
1915    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1916
1917    nw_total = 0;
1918    stream_avail = lsquic_stream_write_avail(stream);
1919
1920    for (i = 0; i < iovcnt && stream_avail > 0; ++i)
1921    {
1922        len = iov[i].iov_len;
1923        p   = iov[i].iov_base;
1924        if (len > stream_avail)
1925        {
1926            LSQ_DEBUG("cap length from %zd to %zd bytes", nw_total + len,
1927                                                    nw_total + stream_avail);
1928            len = stream_avail;
1929        }
1930        nw_total += len;
1931        stream_avail -= len;
1932        while (len > 0)
1933        {
1934            sbt = get_sbt_buf(stream);
1935            if (!sbt)
1936            {
1937                LSQ_WARN("could not allocate SBT buffer: %s", strerror(errno));
1938                break;
1939            }
1940            nw = sbt_write(sbt, p, len);
1941            len -= nw;
1942            p += nw;
1943        }
1944    }
1945
1946    LSQ_DEBUG("wrote %zd bytes to stream", nw_total);
1947
1948    n_flushed = flush_or_check_flags(stream, nw_total);
1949    if (n_flushed)
1950        maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_WRITEV);
1951
1952    return nw_total;
1953}
1954
1955
1956int
1957lsquic_stream_send_headers (lsquic_stream_t *stream,
1958                            const lsquic_http_headers_t *headers, int eos)
1959{
1960    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT|
1961                                                     STREAM_U_WRITE_DONE))
1962                == STREAM_USE_HEADERS)
1963    {
1964        int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs,
1965                    stream->id, headers, eos, lsquic_stream_priority(stream));
1966        if (0 == s)
1967        {
1968            SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
1969            stream->stream_flags |= STREAM_HEADERS_SENT;
1970            if (eos)
1971                stream->stream_flags |= STREAM_FIN_SENT;
1972            LSQ_INFO("sent headers for stream %u", stream->id);
1973        }
1974        else
1975            LSQ_WARN("could not send headers: %s", strerror(errno));
1976        return s;
1977    }
1978    else
1979    {
1980        LSQ_WARN("cannot send headers for stream %u in this state", stream->id);
1981        errno = EBADMSG;
1982        return -1;
1983    }
1984}
1985
1986
1987void
1988lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
1989{
1990    if (offset > stream->max_send_off)
1991    {
1992        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
1993        LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to "
1994            "0x%"PRIX64, stream->id, stream->max_send_off, offset);
1995        stream->max_send_off = offset;
1996        if (lsquic_stream_tosend_sz(stream))
1997        {
1998            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1999            {
2000                LSQ_DEBUG("stream %u unblocked, schedule sending again",
2001                    stream->id);
2002                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
2003                                                            next_send_stream);
2004            }
2005            stream->stream_flags |= STREAM_SEND_DATA;
2006        }
2007    }
2008    else
2009        LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old "
2010            "max send offset 0x%"PRIX64", ignoring", stream->id, offset,
2011            stream->max_send_off);
2012}
2013
2014
2015/* This function is used to update offsets after handshake completes and we
2016 * learn of peer's limits from the handshake values.
2017 */
2018int
2019lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset)
2020{
2021    LSQ_DEBUG("setting max_send_off to %u", offset);
2022    if (offset > stream->max_send_off)
2023    {
2024        lsquic_stream_window_update(stream, offset);
2025        return 0;
2026    }
2027    else if (offset < stream->tosend_off)
2028    {
2029        LSQ_INFO("new offset (%u bytes) is smaller than the amount of data "
2030            "already sent on this stream (%"PRIu64" bytes)", offset,
2031            stream->tosend_off);
2032        return -1;
2033    }
2034    else
2035    {
2036        stream->max_send_off = offset;
2037        return 0;
2038    }
2039}
2040
2041
2042#ifndef NDEBUG
2043/* Use weak linkage so that tests can override this function */
2044size_t
2045lsquic_stream_tosend_sz (const lsquic_stream_t *stream)
2046    __attribute__((weak))
2047    ;
2048#endif
2049size_t
2050lsquic_stream_tosend_sz (const lsquic_stream_t *stream)
2051{
2052    assert(stream->tosend_off + stream->tosend_sz <= stream->max_send_off);
2053    return stream->tosend_sz;
2054}
2055
2056
2057#ifndef NDEBUG
2058/* Use weak linkage so that tests can override this function */
2059size_t
2060lsquic_stream_tosend_read (lsquic_stream_t *stream, void *buf, size_t len,
2061                           int *reached_fin)
2062    __attribute__((weak))
2063    ;
2064#endif
2065size_t
2066lsquic_stream_tosend_read (lsquic_stream_t *stream, void *buf, size_t len,
2067                           int *reached_fin)
2068{
2069    assert(stream->tosend_sz > 0);
2070    assert(stream->stream_flags & STREAM_SEND_DATA);
2071    const size_t tosend_sz = lsquic_stream_tosend_sz(stream);
2072    if (tosend_sz < len)
2073        len = tosend_sz;
2074    struct stream_buf_tosend *sbt;
2075    unsigned char *p = buf;
2076    unsigned char *const end = p + len;
2077    while (p < end && (sbt = TAILQ_FIRST(&stream->bufs_tosend)))
2078    {
2079        size_t nread = sbt_read(sbt, p, len);
2080        p += nread;
2081        len -= nread;
2082        if (sbt_done(sbt))
2083        {
2084            TAILQ_REMOVE(&stream->bufs_tosend, sbt, next_sbt);
2085            sbt_destroy(stream, sbt);
2086            LSQ_DEBUG("destroyed SBT");
2087        }
2088        else
2089            break;
2090    }
2091    const size_t n_read = p - (unsigned char *) buf;
2092    decr_tosend_sz(stream, n_read);
2093    stream->tosend_off += n_read;
2094    if (stream->stream_flags & STREAM_CONN_LIMITED)
2095    {
2096        stream->conn_pub->conn_cap.cc_sent += n_read;
2097        assert(stream->conn_pub->conn_cap.cc_sent <=
2098                                        stream->conn_pub->conn_cap.cc_max);
2099    }
2100    *reached_fin = lsquic_stream_tosend_fin(stream);
2101    return n_read;
2102}
2103
2104
2105void
2106lsquic_stream_stream_frame_sent (lsquic_stream_t *stream)
2107{
2108    assert(stream->stream_flags & STREAM_SEND_DATA);
2109    SM_HISTORY_APPEND(stream, SHE_FRAME_OUT);
2110    if (0 == lsquic_stream_tosend_sz(stream))
2111    {
2112        /* Mark the stream as having no sendable data independent of reason
2113         * why there is no data to send.
2114         */
2115        if (0 == stream->tosend_sz)
2116        {
2117            LSQ_DEBUG("all stream %u data has been scheduled for sending, "
2118                "now at offset 0x%"PRIX64, stream->id, stream->tosend_off);
2119            stream->stream_flags &= ~STREAM_SEND_DATA;
2120            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
2121                TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
2122                                                        next_send_stream);
2123
2124            if ((stream->stream_flags & STREAM_U_WRITE_DONE) && !writing_file(stream))
2125            {
2126                stream->stream_flags |= STREAM_FIN_SENT;
2127                maybe_finish_stream(stream);
2128            }
2129        }
2130        else
2131        {
2132            LSQ_DEBUG("stream %u blocked from sending", stream->id);
2133            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
2134                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
2135                                                        next_send_stream);
2136            stream->stream_flags &= ~STREAM_SEND_DATA;
2137            stream->stream_flags |= STREAM_SEND_BLOCKED;
2138        }
2139    }
2140}
2141
2142
2143#ifndef NDEBUG
2144/* Use weak linkage so that tests can override this function */
2145uint64_t
2146lsquic_stream_tosend_offset (const lsquic_stream_t *stream)
2147    __attribute__((weak))
2148    ;
2149#endif
2150uint64_t
2151lsquic_stream_tosend_offset (const lsquic_stream_t *stream)
2152{
2153    return stream->tosend_off;
2154}
2155
2156
2157static void
2158drop_sbts (lsquic_stream_t *stream)
2159{
2160    struct stream_buf_tosend *sbt;
2161
2162    while ((sbt = TAILQ_FIRST(&stream->bufs_tosend)))
2163    {
2164        TAILQ_REMOVE(&stream->bufs_tosend, sbt, next_sbt);
2165        sbt_destroy(stream, sbt);
2166    }
2167
2168    decr_tosend_sz(stream, stream->tosend_sz);
2169    stream->tosend_sz = 0;
2170}
2171
2172
2173void
2174lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code)
2175{
2176    lsquic_stream_reset_ext(stream, error_code, 1);
2177}
2178
2179
2180void
2181lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code,
2182                         int do_close)
2183{
2184    if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT))
2185    {
2186        LSQ_INFO("reset already sent");
2187        return;
2188    }
2189
2190    SM_HISTORY_APPEND(stream, SHE_RESET);
2191
2192    LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code);
2193    stream->error_code = error_code;
2194
2195    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
2196        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
2197                                                        next_send_stream);
2198    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
2199    stream->stream_flags |= STREAM_SEND_RST;
2200
2201    drop_sbts(stream);
2202    maybe_schedule_call_on_close(stream);
2203
2204    if (do_close)
2205        lsquic_stream_close(stream);
2206    else
2207        maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_RESET_EXT);
2208}
2209
2210
2211unsigned
2212lsquic_stream_id (const lsquic_stream_t *stream)
2213{
2214    return stream->id;
2215}
2216
2217
2218struct lsquic_conn *
2219lsquic_stream_conn (const lsquic_stream_t *stream)
2220{
2221    return stream->conn_pub->lconn;
2222}
2223
2224
2225int
2226lsquic_stream_close (lsquic_stream_t *stream)
2227{
2228    LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id);
2229    SM_HISTORY_APPEND(stream, SHE_CLOSE);
2230    if (lsquic_stream_is_closed(stream))
2231    {
2232        LSQ_INFO("Attempt to close an already-closed stream %u", stream->id);
2233        errno = EBADF;
2234        return -1;
2235    }
2236    stream_shutdown_write(stream);
2237    stream_shutdown_read(stream);
2238    maybe_schedule_call_on_close(stream);
2239    maybe_finish_stream(stream);
2240    maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_STREAM_CLOSE);
2241    return 0;
2242}
2243
2244
2245void
2246lsquic_stream_acked (lsquic_stream_t *stream)
2247{
2248    assert(stream->n_unacked);
2249    --stream->n_unacked;
2250    LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked);
2251    if (0 == stream->n_unacked)
2252        maybe_finish_stream(stream);
2253}
2254
2255
2256void
2257lsquic_stream_push_req (lsquic_stream_t *stream,
2258                        struct uncompressed_headers *push_req)
2259{
2260    assert(!stream->push_req);
2261    stream->push_req = push_req;
2262    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
2263}
2264
2265
2266int
2267lsquic_stream_is_pushed (const lsquic_stream_t *stream)
2268{
2269    return 1 & ~stream->id;
2270}
2271
2272
2273int
2274lsquic_stream_push_info (const lsquic_stream_t *stream,
2275        uint32_t *ref_stream_id, const char **headers, size_t *headers_sz)
2276{
2277    if (lsquic_stream_is_pushed(stream))
2278    {
2279        assert(stream->push_req);
2280        *ref_stream_id = stream->push_req->uh_stream_id;
2281        *headers       = stream->push_req->uh_headers;
2282        *headers_sz    = stream->push_req->uh_size;
2283        return 0;
2284    }
2285    else
2286        return -1;
2287}
2288
2289
2290int
2291lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
2292{
2293    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS)
2294    {
2295        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
2296        LSQ_DEBUG("received uncompressed headers for stream %u", stream->id);
2297        stream->stream_flags |= STREAM_HAVE_UH;
2298        if (uh->uh_flags & UH_FIN)
2299            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
2300        stream->uh = uh;
2301        if (uh->uh_oth_stream_id == 0)
2302        {
2303            if (uh->uh_weight)
2304                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
2305        }
2306        else
2307            LSQ_NOTICE("don't know how to depend on stream %u",
2308                                                        uh->uh_oth_stream_id);
2309        return 0;
2310    }
2311    else
2312    {
2313        LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id);
2314        return -1;
2315    }
2316}
2317
2318
2319unsigned
2320lsquic_stream_priority (const lsquic_stream_t *stream)
2321{
2322    return 256 - stream->sm_priority;
2323}
2324
2325
2326int
2327lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
2328{
2329    /* The user should never get a reference to the special streams,
2330     * but let's check just in case:
2331     */
2332    if (LSQUIC_STREAM_HANDSHAKE == stream->id
2333        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
2334                                LSQUIC_STREAM_HEADERS == stream->id))
2335        return -1;
2336    if (priority < 1 || priority > 256)
2337        return -1;
2338    stream->sm_priority = 256 - priority;
2339    LSQ_DEBUG("set priority to %u", priority);
2340    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
2341    return 0;
2342}
2343
2344
2345int
2346lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
2347{
2348    if (0 == lsquic_stream_set_priority_internal(stream, priority))
2349    {
2350        if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) ==
2351                                       (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))
2352        {
2353            /* We need to send headers only if we are a) using HEADERS stream
2354             * and b) we already sent initial headers.  If initial headers
2355             * have not been sent yet, stream priority will be sent in the
2356             * HEADERS frame.
2357             */
2358            return lsquic_headers_stream_send_priority(stream->conn_pub->hs,
2359                                                    stream->id, 0, 0, priority);
2360        }
2361        else
2362            return 0;
2363    }
2364    else
2365        return -1;
2366}
2367
2368
2369lsquic_stream_ctx_t *
2370lsquic_stream_get_ctx (const lsquic_stream_t *stream)
2371{
2372    return stream->st_ctx;
2373}
2374
2375
2376int
2377lsquic_stream_refuse_push (lsquic_stream_t *stream)
2378{
2379    if (lsquic_stream_is_pushed(stream) &&
2380                !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST)))
2381    {
2382        LSQ_DEBUG("refusing pushed stream: send reset");
2383        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
2384        return 0;
2385    }
2386    else
2387        return -1;
2388}
2389