lsquic_stream.c revision c7d81ce1
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_stream.c -- stream processing
4 *
5 * To clear up terminology, here are some of our stream states (in order).
6 * They are not codified, but they are referred to in both code and comments.
7 *
8 *  CLOSED      STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set.  At this
9 *                point, on_close() gets called.
10 *  FINISHED    FIN or RST has been sent to peer.  Stream is scheduled to be
11 *                finished (freed): it gets put onto the `service_streams'
12 *                list for connection to clean it up.
13 *  DESTROYED   All remaining memory associated with the stream is released.
14 *                If on_close() has not been called yet, it is called now.
15 *                The stream pointer is now invalid.
16 *
17 * When connection is aborted, a stream may go directly to DESTROYED state.
18 */
19
20#include <assert.h>
21#include <errno.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/queue.h>
27#include <stddef.h>
28
29#include "lsquic.h"
30
31#include "lsquic_int_types.h"
32#include "lsquic_packet_common.h"
33#include "lsquic_packet_in.h"
34#include "lsquic_malo.h"
35#include "lsquic_conn_flow.h"
36#include "lsquic_rtt.h"
37#include "lsquic_sfcw.h"
38#include "lsquic_stream.h"
39#include "lsquic_conn_public.h"
40#include "lsquic_util.h"
41#include "lsquic_mm.h"
42#include "lsquic_headers_stream.h"
43#include "lsquic_conn.h"
44#include "lsquic_data_in_if.h"
45#include "lsquic_parse.h"
46#include "lsquic_packet_out.h"
47#include "lsquic_engine_public.h"
48#include "lsquic_senhist.h"
49#include "lsquic_pacer.h"
50#include "lsquic_cubic.h"
51#include "lsquic_send_ctl.h"
52#include "lsquic_headers.h"
53#include "lsquic_ev_log.h"
54
55#define LSQUIC_LOGGER_MODULE LSQLM_STREAM
56#define LSQUIC_LOG_CONN_ID stream->conn_pub->lconn->cn_cid
57#define LSQUIC_LOG_STREAM_ID stream->id
58#include "lsquic_logger.h"
59
60#define SM_BUF_SIZE QUIC_MAX_PACKET_SZ
61
/* Forward declarations of file-local helpers that are referenced before
 * their definitions appear.
 */
static void
drop_frames_in (lsquic_stream_t *stream);

static void
maybe_schedule_call_on_close (lsquic_stream_t *stream);

static int
stream_wantread (lsquic_stream_t *stream, int is_want);

static int
stream_wantwrite (lsquic_stream_t *stream, int is_want);

static ssize_t
stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);

static ssize_t
save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);

static int
stream_flush (lsquic_stream_t *stream);

static int
stream_flush_nocheck (lsquic_stream_t *stream);

static void
maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag);
88
89
90#if LSQUIC_KEEP_STREAM_HISTORY
/* Event codes stored in the per-stream history buffer.
 *
 * These values are printable ASCII characters for ease of printing the
 * whole history in a single line of a log message.
 *
 * The list of events is not exhaustive: only the most interesting events
 * are recorded.
 */
enum stream_history_event
{
    SHE_EMPTY              =  '\0',     /* Special entry.  No init besides memset required */
    SHE_PLUS               =  '+',      /* Special entry: previous event occurred more than once */
    SHE_REACH_FIN          =  'a',
    SHE_BLOCKED_OUT        =  'b',
    SHE_CREATED            =  'C',
    SHE_FRAME_IN           =  'd',
    SHE_FRAME_OUT          =  'D',
    SHE_RESET              =  'e',
    SHE_WINDOW_UPDATE      =  'E',
    SHE_FIN_IN             =  'f',
    SHE_FINISHED           =  'F',
    SHE_GOAWAY_IN          =  'g',
    SHE_USER_WRITE_HEADER  =  'h',
    SHE_HEADERS_IN         =  'H',
    SHE_ONCLOSE_SCHED      =  'l',
    SHE_ONCLOSE_CALL       =  'L',
    SHE_ONNEW              =  'N',
    SHE_SET_PRIO           =  'p',
    SHE_USER_READ          =  'r',
    SHE_SHUTDOWN_READ      =  'R',
    SHE_RST_IN             =  's',
    SHE_RST_OUT            =  't',
    SHE_FLUSH              =  'u',
    SHE_USER_WRITE_DATA    =  'w',
    SHE_SHUTDOWN_WRITE     =  'W',
    SHE_CLOSE              =  'X',
    SHE_FORCE_FINISH       =  'Z',
};
127
128static void
129sm_history_append (lsquic_stream_t *stream, enum stream_history_event sh_event)
130{
131    enum stream_history_event prev_event;
132    sm_hist_idx_t idx;
133    int plus;
134
135    idx = (stream->sm_hist_idx - 1) & SM_HIST_IDX_MASK;
136    plus = SHE_PLUS == stream->sm_hist_buf[idx];
137    idx = (idx - plus) & SM_HIST_IDX_MASK;
138    prev_event = stream->sm_hist_buf[idx];
139
140    if (prev_event == sh_event && plus)
141        return;
142
143    if (prev_event == sh_event)
144        sh_event = SHE_PLUS;
145    stream->sm_hist_buf[ stream->sm_hist_idx++ & SM_HIST_IDX_MASK ] = sh_event;
146
147    if (0 == (stream->sm_hist_idx & SM_HIST_IDX_MASK))
148        LSQ_DEBUG("history: [%.*s]", (int) sizeof(stream->sm_hist_buf),
149                                                        stream->sm_hist_buf);
150}
151
152
/* History hooks used when LSQUIC_KEEP_STREAM_HISTORY is enabled.
 *
 * SM_HISTORY_APPEND records one event.  SM_HISTORY_DUMP_REMAINING logs the
 * events recorded since the history buffer last wrapped around, if there
 * are any.  The macro argument is parenthesized at every use so that any
 * pointer expression can be passed safely.
 */
#   define SM_HISTORY_APPEND(stream, event) sm_history_append(stream, event)
#   define SM_HISTORY_DUMP_REMAINING(stream) do {                           \
        if ((stream)->sm_hist_idx & SM_HIST_IDX_MASK)                       \
            LSQ_DEBUG("history: [%.*s]",                                    \
                (int) ((stream)->sm_hist_idx & SM_HIST_IDX_MASK),           \
                (stream)->sm_hist_buf);                                     \
    } while (0)
160#else
161#   define SM_HISTORY_APPEND(stream, event)
162#   define SM_HISTORY_DUMP_REMAINING(stream)
163#endif
164
165
166static int
167stream_inside_callback (const lsquic_stream_t *stream)
168{
169    return stream->conn_pub->enpub->enp_flags & ENPUB_PROC;
170}
171
172
173static void
174maybe_conn_to_tickable (lsquic_stream_t *stream)
175{
176    if (!stream_inside_callback(stream))
177        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
178                                           stream->conn_pub->lconn);
179}
180
181
182/* Here, "readable" means that the user is able to read from the stream. */
183static void
184maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
185{
186    if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
187    {
188        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
189                                           stream->conn_pub->lconn);
190    }
191}
192
193
194/* Here, "writeable" means that data can be put into packets to be
195 * scheduled to be sent out.
196 *
197 * If `check_can_send' is false, it means that we do not need to check
198 * whether packets can be sent.  This check was already performed when
199 * we packetized stream data.
200 */
201static void
202maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
203                                                    int check_can_send)
204{
205    if (!stream_inside_callback(stream) &&
206            (!check_can_send
207             || lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
208          ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
209    {
210        lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
211                                           stream->conn_pub->lconn);
212    }
213}
214
215
216static int
217stream_stalled (const lsquic_stream_t *stream)
218{
219    return 0 == (stream->stream_flags & (STREAM_WANT_WRITE|STREAM_WANT_READ)) &&
220           ((STREAM_U_READ_DONE|STREAM_U_WRITE_DONE) & stream->stream_flags)
221                                    != (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE);
222}
223
224
225/* TODO: The logic to figure out whether the stream is connection limited
226 * should be taken out of the constructor.  The caller should specify this
227 * via one of enum stream_ctor_flags.
228 */
229lsquic_stream_t *
230lsquic_stream_new_ext (uint32_t id, struct lsquic_conn_public *conn_pub,
231                       const struct lsquic_stream_if *stream_if,
232                       void *stream_if_ctx, unsigned initial_window,
233                       unsigned initial_send_off,
234                       enum stream_ctor_flags ctor_flags)
235{
236    lsquic_cfcw_t *cfcw;
237    lsquic_stream_t *stream;
238
239    stream = calloc(1, sizeof(*stream));
240    if (!stream)
241        return NULL;
242
243    stream->stream_if = stream_if;
244    stream->id        = id;
245    stream->conn_pub  = conn_pub;
246    stream->sm_onnew_arg = stream_if_ctx;
247    if (!initial_window)
248        initial_window = 16 * 1024;
249    if (LSQUIC_STREAM_HANDSHAKE == id ||
250        (conn_pub->hs && LSQUIC_STREAM_HEADERS == id))
251        cfcw = NULL;
252    else
253    {
254        cfcw = &conn_pub->cfcw;
255        stream->stream_flags |= STREAM_CONN_LIMITED;
256        if (conn_pub->hs)
257            stream->stream_flags |= STREAM_USE_HEADERS;
258        lsquic_stream_set_priority_internal(stream, LSQUIC_STREAM_DEFAULT_PRIO);
259    }
260    lsquic_sfcw_init(&stream->fc, initial_window, cfcw, conn_pub, id);
261    if (!initial_send_off)
262        initial_send_off = 16 * 1024;
263    stream->max_send_off = initial_send_off;
264    if (ctor_flags & SCF_USE_DI_HASH)
265        stream->data_in = data_in_hash_new(conn_pub, id, 0);
266    else
267        stream->data_in = data_in_nocopy_new(conn_pub, id);
268    LSQ_DEBUG("created stream %u @%p", id, stream);
269    SM_HISTORY_APPEND(stream, SHE_CREATED);
270    if (ctor_flags & SCF_DI_AUTOSWITCH)
271        stream->stream_flags |= STREAM_AUTOSWITCH;
272    if (ctor_flags & SCF_CALL_ON_NEW)
273        lsquic_stream_call_on_new(stream);
274    if (ctor_flags & SCF_DISP_RW_ONCE)
275        stream->stream_flags |= STREAM_RW_ONCE;
276    if (ctor_flags & SCF_CRITICAL)
277        stream->stream_flags |= STREAM_CRITICAL;
278    return stream;
279}
280
281
282void
283lsquic_stream_call_on_new (lsquic_stream_t *stream)
284{
285    assert(!(stream->stream_flags & STREAM_ONNEW_DONE));
286    if (!(stream->stream_flags & STREAM_ONNEW_DONE))
287    {
288        LSQ_DEBUG("calling on_new_stream");
289        SM_HISTORY_APPEND(stream, SHE_ONNEW);
290        stream->stream_flags |= STREAM_ONNEW_DONE;
291        stream->st_ctx = stream->stream_if->on_new_stream(stream->sm_onnew_arg,
292                                                          stream);
293    }
294}
295
296
297static void
298decr_conn_cap (struct lsquic_stream *stream, size_t incr)
299{
300    if (stream->stream_flags & STREAM_CONN_LIMITED)
301    {
302        assert(stream->conn_pub->conn_cap.cc_sent >= incr);
303        stream->conn_pub->conn_cap.cc_sent -= incr;
304    }
305}
306
307
308static void
309drop_buffered_data (struct lsquic_stream *stream)
310{
311    decr_conn_cap(stream, stream->sm_n_buffered);
312    stream->sm_n_buffered = 0;
313    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
314        maybe_remove_from_write_q(stream, STREAM_WRITE_Q_FLAGS);
315}
316
317
318static void
319destroy_uh (struct lsquic_stream *stream)
320{
321    if (stream->uh)
322    {
323        if (stream->uh->uh_hset)
324            stream->conn_pub->enpub->enp_hsi_if
325                            ->hsi_discard_header_set(stream->uh->uh_hset);
326        free(stream->uh);
327        stream->uh = NULL;
328    }
329}
330
331
332void
333lsquic_stream_destroy (lsquic_stream_t *stream)
334{
335    stream->stream_flags |= STREAM_U_WRITE_DONE|STREAM_U_READ_DONE;
336    if ((stream->stream_flags & (STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE)) ==
337                                                            STREAM_ONNEW_DONE)
338    {
339        stream->stream_flags |= STREAM_ONCLOSE_DONE;
340        stream->stream_if->on_close(stream, stream->st_ctx);
341    }
342    if (stream->stream_flags & STREAM_SENDING_FLAGS)
343        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
344    if (stream->stream_flags & STREAM_WANT_READ)
345        TAILQ_REMOVE(&stream->conn_pub->read_streams, stream, next_read_stream);
346    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
347        TAILQ_REMOVE(&stream->conn_pub->write_streams, stream, next_write_stream);
348    if (stream->stream_flags & STREAM_SERVICE_FLAGS)
349        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream, next_service_stream);
350    drop_buffered_data(stream);
351    lsquic_sfcw_consume_rem(&stream->fc);
352    drop_frames_in(stream);
353    if (stream->push_req)
354    {
355        if (stream->push_req->uh_hset)
356            stream->conn_pub->enpub->enp_hsi_if
357                            ->hsi_discard_header_set(stream->push_req->uh_hset);
358        free(stream->push_req);
359    }
360    destroy_uh(stream);
361    free(stream->sm_buf);
362    LSQ_DEBUG("destroyed stream %u @%p", stream->id, stream);
363    SM_HISTORY_DUMP_REMAINING(stream);
364    free(stream);
365}
366
367
368static int
369stream_is_finished (const lsquic_stream_t *stream)
370{
371    return lsquic_stream_is_closed(stream)
372           /* n_unacked checks that no outgoing packets that reference this
373            * stream are outstanding:
374            */
375        && 0 == stream->n_unacked
376           /* This checks that no packets that reference this stream will
377            * become outstanding:
378            */
379        && 0 == (stream->stream_flags & STREAM_SEND_RST)
380        && ((stream->stream_flags & STREAM_FORCE_FINISH)
381          || ((stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))
382           && (stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD))));
383}
384
385
386static void
387maybe_finish_stream (lsquic_stream_t *stream)
388{
389    if (0 == (stream->stream_flags & STREAM_FINISHED) &&
390                                                    stream_is_finished(stream))
391    {
392        LSQ_DEBUG("stream %u is now finished", stream->id);
393        SM_HISTORY_APPEND(stream, SHE_FINISHED);
394        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
395            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
396                                                    next_service_stream);
397        stream->stream_flags |= STREAM_FREE_STREAM|STREAM_FINISHED;
398    }
399}
400
401
402static void
403maybe_schedule_call_on_close (lsquic_stream_t *stream)
404{
405    if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
406                     STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE|STREAM_CALL_ONCLOSE))
407            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE))
408    {
409        if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
410            TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
411                                                    next_service_stream);
412        stream->stream_flags |= STREAM_CALL_ONCLOSE;
413        LSQ_DEBUG("scheduled calling on_close for stream %u", stream->id);
414        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_SCHED);
415    }
416}
417
418
419void
420lsquic_stream_call_on_close (lsquic_stream_t *stream)
421{
422    assert(stream->stream_flags & STREAM_ONNEW_DONE);
423    stream->stream_flags &= ~STREAM_CALL_ONCLOSE;
424    if (!(stream->stream_flags & STREAM_SERVICE_FLAGS))
425        TAILQ_REMOVE(&stream->conn_pub->service_streams, stream,
426                                                    next_service_stream);
427    if (0 == (stream->stream_flags & STREAM_ONCLOSE_DONE))
428    {
429        LSQ_DEBUG("calling on_close for stream %u", stream->id);
430        stream->stream_flags |= STREAM_ONCLOSE_DONE;
431        SM_HISTORY_APPEND(stream, SHE_ONCLOSE_CALL);
432        stream->stream_if->on_close(stream, stream->st_ctx);
433    }
434    else
435        assert(0);
436}
437
438
439int
440lsquic_stream_readable (const lsquic_stream_t *stream)
441{
442    /* A stream is readable if one of the following is true: */
443    return
444        /* - It is already finished: in that case, lsquic_stream_read() will
445         *   return 0.
446         */
447            (stream->stream_flags & STREAM_FIN_REACHED)
448        /* - The stream is reset, by either side.  In this case,
449         *   lsquic_stream_read() will return -1 (we want the user to be
450         *   able to collect the error).
451         */
452        ||  (stream->stream_flags & STREAM_RST_FLAGS)
453        /* - Either we are not in HTTP mode or the HTTP headers have been
454         *   received and the headers or data from the stream can be read.
455         */
456        ||  (!((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
457                                                        == STREAM_USE_HEADERS)
458            && (stream->uh != NULL
459                ||  stream->data_in->di_if->di_get_frame(stream->data_in,
460                                                        stream->read_offset)))
461    ;
462}
463
464
465size_t
466lsquic_stream_write_avail (const struct lsquic_stream *stream)
467{
468    uint64_t stream_avail, conn_avail;
469
470    stream_avail = stream->max_send_off - stream->tosend_off
471                                                - stream->sm_n_buffered;
472    if (stream->stream_flags & STREAM_CONN_LIMITED)
473    {
474        conn_avail = lsquic_conn_cap_avail(&stream->conn_pub->conn_cap);
475        if (conn_avail < stream_avail)
476            return conn_avail;
477    }
478
479    return stream_avail;
480}
481
482
483int
484lsquic_stream_update_sfcw (lsquic_stream_t *stream, uint64_t max_off)
485{
486    if (max_off > lsquic_sfcw_get_max_recv_off(&stream->fc) &&
487                    !lsquic_sfcw_set_max_recv_off(&stream->fc, max_off))
488    {
489        return -1;
490    }
491    if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
492    {
493        if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
494            TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
495                                                    next_send_stream);
496        stream->stream_flags |= STREAM_SEND_WUF;
497    }
498    return 0;
499}
500
501
502int
503lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
504{
505    uint64_t max_off;
506    int got_next_offset;
507    enum ins_frame ins_frame;
508
509    assert(frame->packet_in);
510
511    SM_HISTORY_APPEND(stream, SHE_FRAME_IN);
512    LSQ_DEBUG("received stream frame, stream %u, offset 0x%"PRIX64", len %u; "
513        "fin: %d", stream->id, frame->data_frame.df_offset, frame->data_frame.df_size, !!frame->data_frame.df_fin);
514
515    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN)) ==
516                                (STREAM_USE_HEADERS|STREAM_HEAD_IN_FIN))
517    {
518        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
519        lsquic_malo_put(frame);
520        return -1;
521    }
522
523    got_next_offset = frame->data_frame.df_offset == stream->read_offset;
524  insert_frame:
525    ins_frame = stream->data_in->di_if->di_insert_frame(stream->data_in, frame, stream->read_offset);
526    if (INS_FRAME_OK == ins_frame)
527    {
528        /* Update maximum offset in the flow controller and check for flow
529         * control violation:
530         */
531        max_off = frame->data_frame.df_offset + frame->data_frame.df_size;
532        if (0 != lsquic_stream_update_sfcw(stream, max_off))
533            return -1;
534        if (frame->data_frame.df_fin)
535        {
536            SM_HISTORY_APPEND(stream, SHE_FIN_IN);
537            stream->stream_flags |= STREAM_FIN_RECVD;
538            maybe_finish_stream(stream);
539        }
540        if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
541                (stream->data_in->di_flags & DI_SWITCH_IMPL))
542        {
543            stream->data_in = stream->data_in->di_if->di_switch_impl(
544                                        stream->data_in, stream->read_offset);
545            if (!stream->data_in)
546            {
547                stream->data_in = data_in_error_new();
548                return -1;
549            }
550        }
551        if (got_next_offset)
552            /* Checking the offset saves di_get_frame() call */
553            maybe_conn_to_tickable_if_readable(stream);
554        return 0;
555    }
556    else if (INS_FRAME_DUP == ins_frame)
557    {
558        return 0;
559    }
560    else if (INS_FRAME_OVERLAP == ins_frame)
561    {
562        LSQ_DEBUG("overlap: switching DATA IN implementation");
563        stream->data_in = stream->data_in->di_if->di_switch_impl(
564                                    stream->data_in, stream->read_offset);
565        if (stream->data_in)
566            goto insert_frame;
567        stream->data_in = data_in_error_new();
568        lsquic_packet_in_put(stream->conn_pub->mm, frame->packet_in);
569        lsquic_malo_put(frame);
570        return -1;
571    }
572    else
573    {
574        assert(INS_FRAME_ERR == ins_frame);
575        return -1;
576    }
577}
578
579
580static void
581drop_frames_in (lsquic_stream_t *stream)
582{
583    if (stream->data_in)
584    {
585        stream->data_in->di_if->di_destroy(stream->data_in);
586        /* To avoid checking whether `data_in` is set, just set to the error
587         * data-in stream.  It does the right thing after incoming data is
588         * dropped.
589         */
590        stream->data_in = data_in_error_new();
591    }
592}
593
594
595static void
596maybe_elide_stream_frames (struct lsquic_stream *stream)
597{
598    if (!(stream->stream_flags & STREAM_FRAMES_ELIDED))
599    {
600        if (stream->n_unacked)
601            lsquic_send_ctl_elide_stream_frames(stream->conn_pub->send_ctl,
602                                                stream->id);
603        stream->stream_flags |= STREAM_FRAMES_ELIDED;
604    }
605}
606
607
608int
609lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
610                      uint32_t error_code)
611{
612
613    if (stream->stream_flags & STREAM_RST_RECVD)
614    {
615        LSQ_DEBUG("ignore duplicate RST_STREAM frame");
616        return 0;
617    }
618
619    SM_HISTORY_APPEND(stream, SHE_RST_IN);
620    /* This flag must always be set, even if we are "ignoring" it: it is
621     * used by elision code.
622     */
623    stream->stream_flags |= STREAM_RST_RECVD;
624
625    if (lsquic_sfcw_get_max_recv_off(&stream->fc) > offset)
626    {
627        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64" is "
628            "smaller than that of byte following the last byte we have seen: "
629            "0x%"PRIX64, stream->id, offset,
630            lsquic_sfcw_get_max_recv_off(&stream->fc));
631        return -1;
632    }
633
634    if (!lsquic_sfcw_set_max_recv_off(&stream->fc, offset))
635    {
636        LSQ_INFO("stream %u: RST_STREAM invalid: its offset 0x%"PRIX64
637            " violates flow control", stream->id, offset);
638        return -1;
639    }
640
641    /* Let user collect error: */
642    maybe_conn_to_tickable_if_readable(stream);
643
644    lsquic_sfcw_consume_rem(&stream->fc);
645    drop_frames_in(stream);
646    drop_buffered_data(stream);
647    maybe_elide_stream_frames(stream);
648
649    if (!(stream->stream_flags &
650                        (STREAM_SEND_RST|STREAM_RST_SENT|STREAM_FIN_SENT)))
651        lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
652
653    stream->stream_flags |= STREAM_RST_RECVD;
654
655    maybe_finish_stream(stream);
656    maybe_schedule_call_on_close(stream);
657
658    return 0;
659}
660
661
662uint64_t
663lsquic_stream_fc_recv_off (lsquic_stream_t *stream)
664{
665    assert(stream->stream_flags & STREAM_SEND_WUF);
666    stream->stream_flags &= ~STREAM_SEND_WUF;
667    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
668        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
669    return lsquic_sfcw_get_fc_recv_off(&stream->fc);
670}
671
672
673void
674lsquic_stream_blocked_frame_sent (lsquic_stream_t *stream)
675{
676    assert(stream->stream_flags & STREAM_SEND_BLOCKED);
677    SM_HISTORY_APPEND(stream, SHE_BLOCKED_OUT);
678    stream->stream_flags &= ~STREAM_SEND_BLOCKED;
679    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
680        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
681}
682
683
684void
685lsquic_stream_rst_frame_sent (lsquic_stream_t *stream)
686{
687    assert(stream->stream_flags & STREAM_SEND_RST);
688    SM_HISTORY_APPEND(stream, SHE_RST_OUT);
689    stream->stream_flags &= ~STREAM_SEND_RST;
690    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
691        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
692    stream->stream_flags |= STREAM_RST_SENT;
693    maybe_finish_stream(stream);
694}
695
696
697static size_t
698read_uh (lsquic_stream_t *stream, unsigned char *dst, size_t len)
699{
700    struct http1x_headers *h1h = stream->uh->uh_hset;
701    size_t n_avail = h1h->h1h_size - h1h->h1h_off;
702    if (n_avail < len)
703        len = n_avail;
704    memcpy(dst, h1h->h1h_buf + h1h->h1h_off, len);
705    h1h->h1h_off += len;
706    if (h1h->h1h_off == h1h->h1h_size)
707    {
708        LSQ_DEBUG("read all uncompressed headers for stream %u", stream->id);
709        destroy_uh(stream);
710        if (stream->stream_flags & STREAM_HEAD_IN_FIN)
711        {
712            stream->stream_flags |= STREAM_FIN_REACHED;
713            SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
714        }
715    }
716    return len;
717}
718
719
720/* This function returns 0 when EOF is reached.
721 */
722ssize_t
723lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov,
724                     int iovcnt)
725{
726    size_t total_nread, nread;
727    int processed_frames, read_unc_headers, iovidx;
728    unsigned char *p, *end;
729
730    SM_HISTORY_APPEND(stream, SHE_USER_READ);
731
732#define NEXT_IOV() do {                                             \
733    ++iovidx;                                                       \
734    while (iovidx < iovcnt && 0 == iov[iovidx].iov_len)             \
735        ++iovidx;                                                   \
736    if (iovidx < iovcnt)                                            \
737    {                                                               \
738        p = iov[iovidx].iov_base;                                   \
739        end = p + iov[iovidx].iov_len;                              \
740    }                                                               \
741    else                                                            \
742        p = end = NULL;                                             \
743} while (0)
744
745#define AVAIL() (end - p)
746
747    if (stream->stream_flags & STREAM_RST_FLAGS)
748    {
749        errno = ECONNRESET;
750        return -1;
751    }
752    if (stream->stream_flags & STREAM_U_READ_DONE)
753    {
754        errno = EBADF;
755        return -1;
756    }
757    if (stream->stream_flags & STREAM_FIN_REACHED)
758        return 0;
759
760    total_nread = 0;
761    processed_frames = 0;
762
763    iovidx = -1;
764    NEXT_IOV();
765
766    if (stream->uh)
767    {
768        if (stream->uh->uh_flags & UH_H1H)
769        {
770            if (AVAIL())
771            {
772                read_unc_headers = 1;
773                do
774                {
775                    nread = read_uh(stream, p, AVAIL());
776                    p += nread;
777                    total_nread += nread;
778                    if (p == end)
779                        NEXT_IOV();
780                }
781                while (stream->uh && AVAIL());
782            }
783            else
784                read_unc_headers = 0;
785        }
786        else
787        {
788            LSQ_INFO("header set not claimed: cannot read from stream");
789            return -1;
790        }
791    }
792    else
793        read_unc_headers = 0;
794
795    struct data_frame *data_frame;
796    while (AVAIL() && (data_frame = stream->data_in->di_if->di_get_frame(stream->data_in, stream->read_offset)))
797    {
798        ++processed_frames;
799        size_t navail = data_frame->df_size - data_frame->df_read_off;
800        size_t ntowrite = AVAIL();
801        if (navail < ntowrite)
802            ntowrite = navail;
803        memcpy(p, data_frame->df_data + data_frame->df_read_off, ntowrite);
804        p += ntowrite;
805        data_frame->df_read_off += ntowrite;
806        stream->read_offset += ntowrite;
807        total_nread += ntowrite;
808        if (data_frame->df_read_off == data_frame->df_size)
809        {
810            const int fin = data_frame->df_fin;
811            stream->data_in->di_if->di_frame_done(stream->data_in, data_frame);
812            if ((stream->stream_flags & STREAM_AUTOSWITCH) &&
813                    (stream->data_in->di_flags & DI_SWITCH_IMPL))
814            {
815                stream->data_in = stream->data_in->di_if->di_switch_impl(
816                                            stream->data_in, stream->read_offset);
817                if (!stream->data_in)
818                {
819                    stream->data_in = data_in_error_new();
820                    return -1;
821                }
822            }
823            if (fin)
824            {
825                stream->stream_flags |= STREAM_FIN_REACHED;
826                break;
827            }
828        }
829        if (p == end)
830            NEXT_IOV();
831    }
832
833    LSQ_DEBUG("%s: read %zd bytes, read offset %"PRIu64, __func__,
834                                        total_nread, stream->read_offset);
835
836    if (processed_frames)
837    {
838        lsquic_sfcw_set_read_off(&stream->fc, stream->read_offset);
839        if (lsquic_sfcw_fc_offsets_changed(&stream->fc))
840        {
841            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
842                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
843            stream->stream_flags |= STREAM_SEND_WUF;
844            maybe_conn_to_tickable_if_writeable(stream, 1);
845        }
846    }
847
848    if (processed_frames || read_unc_headers)
849    {
850        return total_nread;
851    }
852    else
853    {
854        assert(0 == total_nread);
855        errno = EWOULDBLOCK;
856        return -1;
857    }
858}
859
860
861ssize_t
862lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
863{
864    struct iovec iov = { .iov_base = buf, .iov_len = len, };
865    return lsquic_stream_readv(stream, &iov, 1);
866}
867
868
869static void
870stream_shutdown_read (lsquic_stream_t *stream)
871{
872    if (!(stream->stream_flags & STREAM_U_READ_DONE))
873    {
874        SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
875        stream->stream_flags |= STREAM_U_READ_DONE;
876        stream_wantread(stream, 0);
877        maybe_finish_stream(stream);
878    }
879}
880
881
882static void
883stream_shutdown_write (lsquic_stream_t *stream)
884{
885    if (stream->stream_flags & STREAM_U_WRITE_DONE)
886        return;
887
888    SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_WRITE);
889    stream->stream_flags |= STREAM_U_WRITE_DONE;
890    stream_wantwrite(stream, 0);
891
892    /* Don't bother to check whether there is anything else to write if
893     * the flags indicate that nothing else should be written.
894     */
895    if (!(stream->stream_flags &
896                    (STREAM_FIN_SENT|STREAM_SEND_RST|STREAM_RST_SENT)))
897    {
898        if (stream->sm_n_buffered == 0)
899        {
900            if (0 == lsquic_send_ctl_turn_on_fin(stream->conn_pub->send_ctl,
901                                                 stream))
902            {
903                LSQ_DEBUG("turned on FIN flag in the yet-unsent STREAM frame");
904                stream->stream_flags |= STREAM_FIN_SENT;
905            }
906            else
907            {
908                LSQ_DEBUG("have to create a separate STREAM frame with FIN "
909                          "flag in it");
910                (void) stream_flush_nocheck(stream);
911            }
912        }
913        else
914            (void) stream_flush_nocheck(stream);
915    }
916}
917
918
919int
920lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
921{
922    LSQ_DEBUG("shutdown(stream: %u; how: %d)", stream->id, how);
923    if (lsquic_stream_is_closed(stream))
924    {
925        LSQ_INFO("Attempt to shut down a closed stream %u", stream->id);
926        errno = EBADF;
927        return -1;
928    }
929    /* 0: read, 1: write: 2: read and write
930     */
931    if (how < 0 || how > 2)
932    {
933        errno = EINVAL;
934        return -1;
935    }
936
937    if (how)
938        stream_shutdown_write(stream);
939    if (how != 1)
940        stream_shutdown_read(stream);
941
942    maybe_finish_stream(stream);
943    maybe_schedule_call_on_close(stream);
944    if (how)
945        maybe_conn_to_tickable_if_writeable(stream, 1);
946
947    return 0;
948}
949
950
951void
952lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
953{
954    LSQ_DEBUG("internal shutdown of stream %u", stream->id);
955    if (LSQUIC_STREAM_HANDSHAKE == stream->id
956        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
957                                LSQUIC_STREAM_HEADERS == stream->id))
958    {
959        LSQ_DEBUG("add flag to force-finish special stream %u", stream->id);
960        stream->stream_flags |= STREAM_FORCE_FINISH;
961        SM_HISTORY_APPEND(stream, SHE_FORCE_FINISH);
962    }
963    maybe_finish_stream(stream);
964    maybe_schedule_call_on_close(stream);
965}
966
967
968static void
969fake_reset_unused_stream (lsquic_stream_t *stream)
970{
971    stream->stream_flags |=
972        STREAM_RST_RECVD    /* User will pick this up on read or write */
973      | STREAM_RST_SENT     /* Don't send anything else on this stream */
974    ;
975
976    /* Cancel all writes to the network scheduled for this stream: */
977    if (stream->stream_flags & STREAM_SENDING_FLAGS)
978        TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream,
979                                                next_send_stream);
980    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
981
982    LSQ_DEBUG("fake-reset stream %u%s",
983                    stream->id, stream_stalled(stream) ? " (stalled)" : "");
984    maybe_finish_stream(stream);
985    maybe_schedule_call_on_close(stream);
986}
987
988
/* This function should only be called for locally-initiated streams whose ID
 * is larger than the stream ID received in the GOAWAY frame.  This may occur
 * when the peer has sent a GOAWAY frame, but we created the stream before
 * receiving it.  In this situation, we mark the stream as reset, so that the
 * user's on_read or on_write event callback picks up the error.  That, in
 * turn, should result in the stream being closed.
 *
 * If we have received any data frames on this stream, this probably indicates
 * a bug in peer code: it should not have sent a GOAWAY frame with a stream ID
 * lower than this.  However, we still try to handle it gracefully and perform
 * a shutdown, as if the stream was not reset.
 */
1001void
1002lsquic_stream_received_goaway (lsquic_stream_t *stream)
1003{
1004    SM_HISTORY_APPEND(stream, SHE_GOAWAY_IN);
1005    if (0 == stream->read_offset &&
1006                            stream->data_in->di_if->di_empty(stream->data_in))
1007        fake_reset_unused_stream(stream);       /* Normal condition */
1008    else
1009    {   /* This is odd, let's handle it the best we can: */
1010        LSQ_WARN("GOAWAY received but have incoming data: shut down instead");
1011        lsquic_stream_shutdown_internal(stream);
1012    }
1013}
1014
1015
1016uint64_t
1017lsquic_stream_read_offset (const lsquic_stream_t *stream)
1018{
1019    return stream->read_offset;
1020}
1021
1022
1023static int
1024stream_wantread (lsquic_stream_t *stream, int is_want)
1025{
1026    const int old_val = !!(stream->stream_flags & STREAM_WANT_READ);
1027    const int new_val = !!is_want;
1028    if (old_val != new_val)
1029    {
1030        if (new_val)
1031        {
1032            if (!old_val)
1033                TAILQ_INSERT_TAIL(&stream->conn_pub->read_streams, stream,
1034                                                            next_read_stream);
1035            stream->stream_flags |= STREAM_WANT_READ;
1036        }
1037        else
1038        {
1039            stream->stream_flags &= ~STREAM_WANT_READ;
1040            if (old_val)
1041                TAILQ_REMOVE(&stream->conn_pub->read_streams, stream,
1042                                                            next_read_stream);
1043        }
1044    }
1045    return old_val;
1046}
1047
1048
1049static void
1050maybe_put_onto_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1051{
1052    assert(STREAM_WRITE_Q_FLAGS & flag);
1053    if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1054        TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1055                                                        next_write_stream);
1056    stream->stream_flags |= flag;
1057}
1058
1059
1060static void
1061maybe_remove_from_write_q (lsquic_stream_t *stream, enum stream_flags flag)
1062{
1063    assert(STREAM_WRITE_Q_FLAGS & flag);
1064    if (stream->stream_flags & flag)
1065    {
1066        stream->stream_flags &= ~flag;
1067        if (!(stream->stream_flags & STREAM_WRITE_Q_FLAGS))
1068            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1069                                                        next_write_stream);
1070    }
1071}
1072
1073
1074static int
1075stream_wantwrite (lsquic_stream_t *stream, int is_want)
1076{
1077    const int old_val = !!(stream->stream_flags & STREAM_WANT_WRITE);
1078    const int new_val = !!is_want;
1079    if (old_val != new_val)
1080    {
1081        if (new_val)
1082            maybe_put_onto_write_q(stream, STREAM_WANT_WRITE);
1083        else
1084            maybe_remove_from_write_q(stream, STREAM_WANT_WRITE);
1085    }
1086    return old_val;
1087}
1088
1089
1090int
1091lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
1092{
1093    if (!(stream->stream_flags & STREAM_U_READ_DONE))
1094    {
1095        if (is_want)
1096            maybe_conn_to_tickable_if_readable(stream);
1097        return stream_wantread(stream, is_want);
1098    }
1099    else
1100    {
1101        errno = EBADF;
1102        return -1;
1103    }
1104}
1105
1106
1107int
1108lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
1109{
1110    if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE))
1111    {
1112        if (is_want)
1113            maybe_conn_to_tickable_if_writeable(stream, 1);
1114        return stream_wantwrite(stream, is_want);
1115    }
1116    else
1117    {
1118        errno = EBADF;
1119        return -1;
1120    }
1121}
1122
1123
/* Stream flags that the read and write dispatch loops snapshot before
 * invoking a user callback and compare afterwards: if none of them changed,
 * the callback is counted as having made no progress.
 */
#define USER_PROGRESS_FLAGS (STREAM_WANT_READ|STREAM_WANT_WRITE|            \
    STREAM_WANT_FLUSH|STREAM_U_WRITE_DONE|STREAM_U_READ_DONE|STREAM_SEND_RST)
1126
1127
1128static void
1129stream_dispatch_read_events_loop (lsquic_stream_t *stream)
1130{
1131    unsigned no_progress_count, no_progress_limit;
1132    enum stream_flags flags;
1133    uint64_t size;
1134
1135    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1136
1137    no_progress_count = 0;
1138    while ((stream->stream_flags & STREAM_WANT_READ)
1139                                            && lsquic_stream_readable(stream))
1140    {
1141        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1142        size  = stream->read_offset;
1143
1144        stream->stream_if->on_read(stream, stream->st_ctx);
1145
1146        if (no_progress_limit && size == stream->read_offset &&
1147                        flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1148        {
1149            ++no_progress_count;
1150            if (no_progress_count >= no_progress_limit)
1151            {
1152                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1153                    "progress) in user code reading from stream",
1154                    no_progress_count,
1155                    no_progress_count == 1 ? "" : "s");
1156                break;
1157            }
1158        }
1159        else
1160            no_progress_count = 0;
1161    }
1162}
1163
1164
1165static void
1166stream_dispatch_write_events_loop (lsquic_stream_t *stream)
1167{
1168    unsigned no_progress_count, no_progress_limit;
1169    enum stream_flags flags;
1170
1171    no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
1172
1173    no_progress_count = 0;
1174    stream->stream_flags |= STREAM_LAST_WRITE_OK;
1175    while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK))
1176                                == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)
1177           && lsquic_stream_write_avail(stream))
1178    {
1179        flags = stream->stream_flags & USER_PROGRESS_FLAGS;
1180
1181        stream->stream_if->on_write(stream, stream->st_ctx);
1182
1183        if (no_progress_limit &&
1184            flags == (stream->stream_flags & USER_PROGRESS_FLAGS))
1185        {
1186            ++no_progress_count;
1187            if (no_progress_count >= no_progress_limit)
1188            {
1189                LSQ_WARN("broke suspected infinite loop (%u callback%s without "
1190                    "progress) in user code writing to stream",
1191                    no_progress_count,
1192                    no_progress_count == 1 ? "" : "s");
1193                break;
1194            }
1195        }
1196        else
1197            no_progress_count = 0;
1198    }
1199}
1200
1201
1202static void
1203stream_dispatch_read_events_once (lsquic_stream_t *stream)
1204{
1205    if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream))
1206    {
1207        stream->stream_if->on_read(stream, stream->st_ctx);
1208    }
1209}
1210
1211
1212static void
1213maybe_mark_as_blocked (lsquic_stream_t *stream)
1214{
1215    struct lsquic_conn_cap *cc;
1216
1217    if (stream->max_send_off == stream->tosend_off + stream->sm_n_buffered)
1218    {
1219        if (stream->blocked_off < stream->max_send_off)
1220        {
1221            stream->blocked_off = stream->max_send_off + stream->sm_n_buffered;
1222            if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1223                TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1224                                                            next_send_stream);
1225            stream->stream_flags |= STREAM_SEND_BLOCKED;
1226            LSQ_DEBUG("marked stream-blocked at stream offset "
1227                                            "%"PRIu64, stream->blocked_off);
1228        }
1229        else
1230            LSQ_DEBUG("stream is blocked, but BLOCKED frame for offset %"PRIu64
1231                " has been, or is about to be, sent", stream->blocked_off);
1232    }
1233
1234    if ((stream->stream_flags & STREAM_CONN_LIMITED)
1235        && (cc = &stream->conn_pub->conn_cap,
1236                stream->sm_n_buffered == lsquic_conn_cap_avail(cc)))
1237    {
1238        if (cc->cc_blocked < cc->cc_max)
1239        {
1240            cc->cc_blocked = cc->cc_max;
1241            stream->conn_pub->lconn->cn_flags |= LSCONN_SEND_BLOCKED;
1242            LSQ_DEBUG("marked connection-blocked at connection offset "
1243                                                    "%"PRIu64, cc->cc_max);
1244        }
1245        else
1246            LSQ_DEBUG("stream has already been marked connection-blocked "
1247                "at offset %"PRIu64, cc->cc_blocked);
1248    }
1249}
1250
1251
1252void
1253lsquic_stream_dispatch_read_events (lsquic_stream_t *stream)
1254{
1255    assert(stream->stream_flags & STREAM_WANT_READ);
1256
1257    if (stream->stream_flags & STREAM_RW_ONCE)
1258        stream_dispatch_read_events_once(stream);
1259    else
1260        stream_dispatch_read_events_loop(stream);
1261}
1262
1263
1264void
1265lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
1266{
1267    int progress;
1268    uint64_t tosend_off;
1269    unsigned short n_buffered;
1270    enum stream_flags flags;
1271
1272    assert(stream->stream_flags & STREAM_WRITE_Q_FLAGS);
1273    flags = stream->stream_flags & STREAM_WRITE_Q_FLAGS;
1274    tosend_off = stream->tosend_off;
1275    n_buffered = stream->sm_n_buffered;
1276
1277    if (stream->stream_flags & STREAM_WANT_FLUSH)
1278        (void) stream_flush(stream);
1279
1280    if (stream->stream_flags & STREAM_RW_ONCE)
1281    {
1282        if ((stream->stream_flags & STREAM_WANT_WRITE)
1283            && lsquic_stream_write_avail(stream))
1284        {
1285            stream->stream_if->on_write(stream, stream->st_ctx);
1286        }
1287    }
1288    else
1289        stream_dispatch_write_events_loop(stream);
1290
1291    /* Progress means either flags or offsets changed: */
1292    progress = !((stream->stream_flags & STREAM_WRITE_Q_FLAGS) == flags &&
1293                        stream->tosend_off == tosend_off &&
1294                            stream->sm_n_buffered == n_buffered);
1295
1296    if (stream->stream_flags & STREAM_WRITE_Q_FLAGS)
1297    {
1298        if (progress)
1299        {   /* Move the stream to the end of the list to ensure fairness. */
1300            TAILQ_REMOVE(&stream->conn_pub->write_streams, stream,
1301                                                            next_write_stream);
1302            TAILQ_INSERT_TAIL(&stream->conn_pub->write_streams, stream,
1303                                                            next_write_stream);
1304        }
1305    }
1306}
1307
1308
/* Size callback of the empty reader: there is never anything to read. */
static size_t
inner_reader_empty_size (void *ctx)
{
    (void) ctx;     /* The empty reader carries no state */
    return 0;
}
1314
1315
/* Read callback of the empty reader: copies nothing and reports zero bytes
 * read, whatever the arguments.
 */
static size_t
inner_reader_empty_read (void *ctx, void *buf, size_t count)
{
    (void) ctx;
    (void) buf;
    (void) count;
    return 0;
}
1321
1322
1323static int
1324stream_flush (lsquic_stream_t *stream)
1325{
1326    struct lsquic_reader empty_reader;
1327    ssize_t nw;
1328
1329    assert(stream->stream_flags & STREAM_WANT_FLUSH);
1330    assert(stream->sm_n_buffered > 0 ||
1331        /* Flushing is also used to packetize standalone FIN: */
1332        ((stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))
1333                                                    == STREAM_U_WRITE_DONE));
1334
1335    empty_reader.lsqr_size = inner_reader_empty_size;
1336    empty_reader.lsqr_read = inner_reader_empty_read;
1337    empty_reader.lsqr_ctx  = NULL;  /* pro forma */
1338    nw = stream_write_to_packets(stream, &empty_reader, 0);
1339
1340    if (nw >= 0)
1341    {
1342        assert(nw == 0);    /* Empty reader: must have read zero bytes */
1343        return 0;
1344    }
1345    else
1346        return -1;
1347}
1348
1349
1350static int
1351stream_flush_nocheck (lsquic_stream_t *stream)
1352{
1353    stream->sm_flush_to = stream->tosend_off + stream->sm_n_buffered;
1354    maybe_put_onto_write_q(stream, STREAM_WANT_FLUSH);
1355    LSQ_DEBUG("will flush up to offset %"PRIu64, stream->sm_flush_to);
1356
1357    return stream_flush(stream);
1358}
1359
1360
1361int
1362lsquic_stream_flush (lsquic_stream_t *stream)
1363{
1364    if (stream->stream_flags & STREAM_U_WRITE_DONE)
1365    {
1366        LSQ_DEBUG("cannot flush closed stream");
1367        errno = EBADF;
1368        return -1;
1369    }
1370
1371    if (0 == stream->sm_n_buffered)
1372    {
1373        LSQ_DEBUG("flushing 0 bytes: noop");
1374        return 0;
1375    }
1376
1377    return stream_flush_nocheck(stream);
1378}
1379
1380
1381/* The flush threshold is the maximum size of stream data that can be sent
1382 * in a full packet.
1383 */
1384#ifdef NDEBUG
1385static
1386#endif
1387       size_t
1388lsquic_stream_flush_threshold (const struct lsquic_stream *stream)
1389{
1390    enum packet_out_flags flags;
1391    enum packno_bits bits;
1392    unsigned packet_header_sz, stream_header_sz;
1393    size_t threshold;
1394
1395    bits = lsquic_send_ctl_packno_bits(stream->conn_pub->send_ctl);
1396    flags = bits << POBIT_SHIFT;
1397    if (!(stream->conn_pub->lconn->cn_flags & LSCONN_TCID0))
1398        flags |= PO_CONN_ID;
1399    if (LSQUIC_STREAM_HANDSHAKE == stream->id)
1400        flags |= PO_LONGHEAD;
1401
1402    packet_header_sz = lsquic_po_header_length(stream->conn_pub->lconn, flags);
1403    stream_header_sz = stream->conn_pub->lconn->cn_pf
1404            ->pf_calc_stream_frame_header_sz(stream->id, stream->tosend_off);
1405
1406    threshold = stream->conn_pub->lconn->cn_pack_size - QUIC_PACKET_HASH_SZ
1407              - packet_header_sz - stream_header_sz;
1408    return threshold;
1409}
1410
1411
/* Preconditions shared by the user-facing write functions.  The macro expects
 * a variable named `stream' to be in scope and, when a check fails, returns
 * -1 from the enclosing function with errno set:
 *
 *  EILSEQ      The stream uses HTTP headers, but they have not been sent yet.
 *  ECONNRESET  One of STREAM_RST_FLAGS is set.
 *  EBADF       The write side was shut down by the user or FIN has been sent.
 */
#define COMMON_WRITE_CHECKS() do {                                          \
    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))   \
                                                   == STREAM_USE_HEADERS)   \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream before sending HTTP headers"); \
        errno = EILSEQ;                                                     \
        return -1;                                                          \
    }                                                                       \
    if (stream->stream_flags & STREAM_RST_FLAGS)                            \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it had been reset");     \
        errno = ECONNRESET;                                                 \
        return -1;                                                          \
    }                                                                       \
    if (stream->stream_flags & (STREAM_U_WRITE_DONE|STREAM_FIN_SENT))       \
    {                                                                       \
        LSQ_INFO("Attempt to write to stream after it was closed for "      \
                                                                "writing"); \
        errno = EBADF;                                                      \
        return -1;                                                          \
    }                                                                       \
} while (0)
1434
1435
/* Context passed to the frame_gen_*() callbacks while STREAM frames are
 * being generated for a stream.
 */
struct frame_gen_ctx
{
    /* Stream whose data is being packetized */
    lsquic_stream_t      *fgc_stream;
    /* Source of new data, consulted after the stream's own buffer */
    struct lsquic_reader *fgc_reader;
    /* We keep our own count of how many bytes were read from reader because
     * some readers are external.  The external caller does not have to rely
     * on our count, but it can.
     */
    size_t                fgc_nread_from_reader;
};
1446
1447
1448static size_t
1449frame_gen_size (void *ctx)
1450{
1451    struct frame_gen_ctx *fg_ctx = ctx;
1452    size_t available, remaining;
1453
1454    /* Make sure we are not writing past available size: */
1455    remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1456    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
1457    if (available < remaining)
1458        remaining = available;
1459
1460    return remaining + fg_ctx->fgc_stream->sm_n_buffered;
1461}
1462
1463
1464static int
1465frame_gen_fin (void *ctx)
1466{
1467    struct frame_gen_ctx *fg_ctx = ctx;
1468    return fg_ctx->fgc_stream->stream_flags & STREAM_U_WRITE_DONE
1469        && 0 == fg_ctx->fgc_stream->sm_n_buffered
1470        /* Do not use frame_gen_size() as it may chop the real size: */
1471        && 0 == fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
1472}
1473
1474
1475static void
1476incr_conn_cap (struct lsquic_stream *stream, size_t incr)
1477{
1478    if (stream->stream_flags & STREAM_CONN_LIMITED)
1479    {
1480        stream->conn_pub->conn_cap.cc_sent += incr;
1481        assert(stream->conn_pub->conn_cap.cc_sent
1482                                    <= stream->conn_pub->conn_cap.cc_max);
1483    }
1484}
1485
1486
1487static size_t
1488frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
1489{
1490    struct frame_gen_ctx *fg_ctx = ctx;
1491    unsigned char *p = begin_buf;
1492    unsigned char *const end = p + len;
1493    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
1494    size_t n_written, available, n_to_write;
1495
1496    if (stream->sm_n_buffered > 0)
1497    {
1498        if (len <= stream->sm_n_buffered)
1499        {
1500            memcpy(p, stream->sm_buf, len);
1501            memmove(stream->sm_buf, stream->sm_buf + len,
1502                                                stream->sm_n_buffered - len);
1503            stream->sm_n_buffered -= len;
1504            stream->tosend_off += len;
1505            *fin = frame_gen_fin(fg_ctx);
1506            return len;
1507        }
1508        memcpy(p, stream->sm_buf, stream->sm_n_buffered);
1509        p += stream->sm_n_buffered;
1510        stream->sm_n_buffered = 0;
1511    }
1512
1513    available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
1514    n_to_write = end - p;
1515    if (n_to_write > available)
1516        n_to_write = available;
1517    n_written = fg_ctx->fgc_reader->lsqr_read(fg_ctx->fgc_reader->lsqr_ctx, p,
1518                                              n_to_write);
1519    p += n_written;
1520    fg_ctx->fgc_nread_from_reader += n_written;
1521    *fin = frame_gen_fin(fg_ctx);
1522    stream->tosend_off += p - (const unsigned char *) begin_buf;
1523    incr_conn_cap(stream, n_written);
1524    return p - (const unsigned char *) begin_buf;
1525}
1526
1527
1528static void
1529check_flush_threshold (lsquic_stream_t *stream)
1530{
1531    if ((stream->stream_flags & STREAM_WANT_FLUSH) &&
1532                            stream->tosend_off >= stream->sm_flush_to)
1533    {
1534        LSQ_DEBUG("flushed to or past required offset %"PRIu64,
1535                                                    stream->sm_flush_to);
1536        maybe_remove_from_write_q(stream, STREAM_WANT_FLUSH);
1537    }
1538}
1539
1540
/* Adapter giving lsquic_send_ctl_new_packet_out() the same signature as the
 * other entry in the `get_packet' table; `stream' is not used.
 */
static struct lsquic_packet_out *
get_brand_new_packet (struct lsquic_send_ctl *ctl, unsigned need_at_least,
                      const struct lsquic_stream *stream)
{
    (void) stream;
    return lsquic_send_ctl_new_packet_out(ctl, need_at_least);
}
1547
1548
/* Packet getters, indexed by whether the stream is the handshake stream:
 * [0] is used for all other streams, [1] is used for the handshake stream
 * and always asks the send controller for a new packet.
 */
static struct lsquic_packet_out * (* const get_packet[])(
    struct lsquic_send_ctl *, unsigned, const struct lsquic_stream *) =
{
    lsquic_send_ctl_get_packet_for_stream,
    get_brand_new_packet,
};
1555
1556
/* Generate a single STREAM frame carrying up to `size' bytes of stream data
 * and place it into a packet obtained from the send controller.
 *
 * Returns:
 *  SWTP_OK     The frame was generated and recorded in the packet.
 *  SWTP_STOP   No packet could be obtained.
 *  SWTP_ERROR  Frame generation failed or the stream could not be added to
 *                the packet.
 */
static enum { SWTP_OK, SWTP_STOP, SWTP_ERROR }
stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size)
{
    lsquic_stream_t *const stream = fg_ctx->fgc_stream;
    const struct parse_funcs *const pf = stream->conn_pub->lconn->cn_pf;
    struct lsquic_send_ctl *const send_ctl = stream->conn_pub->send_ctl;
    unsigned stream_header_sz, need_at_least, off;
    lsquic_packet_out_t *packet_out;
    int len, s, hsk;

    /* Headers have been sent, but are not yet known to be flushed: */
    if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED))
                                                        == STREAM_HEADERS_SENT
        && lsquic_send_ctl_buffered_and_same_prio_as_headers(send_ctl, stream))
    {
        struct lsquic_stream *const headers_stream
                = lsquic_headers_stream_get_stream(stream->conn_pub->hs);
        if (lsquic_stream_has_data_to_flush(headers_stream))
        {
            LSQ_DEBUG("flushing headers stream before potential write to a "
                                                            "buffered packet");
            (void) lsquic_stream_flush(headers_stream);
        }
        else
            /* Some other stream must have flushed it: this means our headers
             * are flushed.
             */
            stream->stream_flags |= STREAM_HDRS_FLUSHED;
    }

    stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id,
                                                        stream->tosend_off);
    /* Room for the frame header plus, if there is data, at least one byte: */
    need_at_least = stream_header_sz + (size > 0);
    hsk = LSQUIC_STREAM_HANDSHAKE == stream->id;
  get_packet:
    packet_out = get_packet[hsk](send_ctl, need_at_least, stream);
    if (!packet_out)
        return SWTP_STOP;
    if (hsk)
        packet_out->po_header_type = stream->tosend_off == 0
                                            ? HETY_INITIAL : HETY_HANDSHAKE;

#if LSQUIC_CONN_STATS
    const uint64_t begin_off = stream->tosend_off;
#endif
    /* Remember where in the packet the frame begins: */
    off = packet_out->po_data_sz;
    len = pf->pf_gen_stream_frame(
                packet_out->po_data + packet_out->po_data_sz,
                lsquic_packet_out_avail(packet_out), stream->id,
                stream->tosend_off,
                frame_gen_fin(fg_ctx), size, frame_gen_read, fg_ctx);
    if (len < 0)
    {
        /* A negative value whose magnitude exceeds our estimate is treated
         * as the amount of room required: retry with a packet that has it.
         */
        if (-len > (int) need_at_least)
        {
            LSQ_DEBUG("need more room (%d bytes) than initially calculated "
                "%u bytes, will try again", -len, need_at_least);
            need_at_least = -len;
            goto get_packet;
        }
        else
        {
            LSQ_ERROR("could not generate stream frame");
            return SWTP_ERROR;
        }
    }

#if LSQUIC_CONN_STATS
    stream->conn_pub->conn_stats->out.stream_frames += 1;
    stream->conn_pub->conn_stats->out.stream_data_sz
                                            += stream->tosend_off - begin_off;
#endif
    /* The frame is logged before the packet size is incremented, so
     * po_data + po_data_sz still points at the beginning of the frame:
     */
    EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, pf,
                            packet_out->po_data + packet_out->po_data_sz, len);
    lsquic_send_ctl_incr_pack_sz(send_ctl, packet_out, len);
    packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM;
    if (0 == lsquic_packet_out_avail(packet_out))
        packet_out->po_flags |= PO_STREAM_END;
    s = lsquic_packet_out_add_stream(packet_out, stream->conn_pub->mm,
                                     stream, QUIC_FRAME_STREAM, off, len);
    if (s != 0)
    {
        LSQ_ERROR("adding stream to packet failed: %s", strerror(errno));
        return SWTP_ERROR;
    }

    check_flush_threshold(stream);

    /* XXX: I don't like it that this is here */
    /* Handshake packets not yet marked PO_HELLO are zero-padded, marked,
     * and scheduled right away:
     */
    if (hsk && !(packet_out->po_flags & PO_HELLO))
    {
        lsquic_packet_out_zero_pad(packet_out);
        packet_out->po_flags |= PO_HELLO;
        lsquic_send_ctl_scheduled_one(send_ctl, packet_out);
    }

    return SWTP_OK;
}
1654
1655
1656static void
1657abort_connection (struct lsquic_stream *stream)
1658{
1659    if (0 == (stream->stream_flags & STREAM_SERVICE_FLAGS))
1660        TAILQ_INSERT_TAIL(&stream->conn_pub->service_streams, stream,
1661                                                next_service_stream);
1662    stream->stream_flags |= STREAM_ABORT_CONN;
1663    LSQ_WARN("connection will be aborted");
1664    maybe_conn_to_tickable(stream);
1665}
1666
1667
/* Packetize stream data: bytes buffered in the stream followed by bytes from
 * `reader'.
 *
 * When `thresh' is non-zero, frames are generated only while at least
 * `thresh' bytes are available; the remainder is saved to the stream's
 * buffer.  When `thresh' is zero, frames are generated until no data is
 * left.  In either case, a frame is also generated when FIN needs to be
 * sent.
 *
 * Returns the number of bytes consumed from `reader', or -1 on error.  If
 * frame generation fails, the connection is flagged to be aborted.
 */
static ssize_t
stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
                         size_t thresh)
{
    size_t size;
    ssize_t nw;
    unsigned seen_ok;
    struct frame_gen_ctx fg_ctx = {
        .fgc_stream = stream,
        .fgc_reader = reader,
        .fgc_nread_from_reader = 0,
    };

    seen_ok = 0;
    /* `size' is recalculated on every iteration: */
    while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0)
           || frame_gen_fin(&fg_ctx))
    {
        switch (stream_write_to_packet(&fg_ctx, size))
        {
        case SWTP_OK:
            /* Only done after the first successful write: */
            if (!seen_ok++)
                maybe_conn_to_tickable_if_writeable(stream, 0);
            if (frame_gen_fin(&fg_ctx))
            {
                stream->stream_flags |= STREAM_FIN_SENT;
                goto end;
            }
            else
                break;
        case SWTP_STOP:
            /* Could not get a packet: tell the write events loop to stop */
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            goto end;
        default:
            abort_connection(stream);
            stream->stream_flags &= ~STREAM_LAST_WRITE_OK;
            return -1;
        }
    }

    if (thresh)
    {
        /* Less than a threshold's worth is left: buffer what comes from the
         * reader (`size' also includes the bytes that are already buffered).
         */
        assert(size < thresh);
        assert(size >= stream->sm_n_buffered);
        size -= stream->sm_n_buffered;
        if (size > 0)
        {
            nw = save_to_buffer(stream, reader, size);
            if (nw < 0)
                return -1;
            fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
        }
    }
    else
    {
        /* We count flushed data towards both stream and connection limits,
         * so we should have been able to packetize all of it:
         */
        assert(0 == stream->sm_n_buffered);
        assert(size == 0);
    }

    maybe_mark_as_blocked(stream);

  end:
    return fg_ctx.fgc_nread_from_reader;
}
1734
1735
1736/* Perform an implicit flush when we hit connection limit while buffering
1737 * data.  This is to prevent a (theoretical) stall:
1738 *
1739 * Imagine a number of streams, all of which buffered some data.  The buffered
1740 * data is up to connection cap, which means no further writes are possible.
1741 * None of them flushes, which means that data is not sent and connection
1742 * WINDOW_UPDATE frame never arrives from peer.  Stall.
1743 */
1744static int
1745maybe_flush_stream (struct lsquic_stream *stream)
1746{
1747    if (stream->sm_n_buffered > 0
1748          && (stream->stream_flags & STREAM_CONN_LIMITED)
1749            && lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
1750        return stream_flush_nocheck(stream);
1751    else
1752        return 0;
1753}
1754
1755
1756static ssize_t
1757save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
1758                                                                size_t len)
1759{
1760    size_t avail, n_written;
1761
1762    assert(stream->sm_n_buffered + len <= SM_BUF_SIZE);
1763
1764    if (!stream->sm_buf)
1765    {
1766        stream->sm_buf = malloc(SM_BUF_SIZE);
1767        if (!stream->sm_buf)
1768            return -1;
1769    }
1770
1771    avail = lsquic_stream_write_avail(stream);
1772    if (avail < len)
1773        len = avail;
1774
1775    n_written = reader->lsqr_read(reader->lsqr_ctx,
1776                        stream->sm_buf + stream->sm_n_buffered, len);
1777    stream->sm_n_buffered += n_written;
1778    incr_conn_cap(stream, n_written);
1779    LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
1780              n_written, stream->sm_n_buffered);
1781    if (0 != maybe_flush_stream(stream))
1782        return -1;
1783    return n_written;
1784}
1785
1786
1787static ssize_t
1788stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
1789{
1790    size_t thresh, len;
1791
1792    thresh = lsquic_stream_flush_threshold(stream);
1793    len = reader->lsqr_size(reader->lsqr_ctx);
1794    if (stream->sm_n_buffered + len <= SM_BUF_SIZE &&
1795                                    stream->sm_n_buffered + len < thresh)
1796        return save_to_buffer(stream, reader, len);
1797    else
1798        return stream_write_to_packets(stream, reader, thresh);
1799}
1800
1801
1802ssize_t
1803lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len)
1804{
1805    struct iovec iov = { .iov_base = (void *) buf, .iov_len = len, };
1806    return lsquic_stream_writev(stream, &iov, 1);
1807}
1808
1809
/* Read cursor over an array of iovecs: `iov' is the element currently
 * being consumed, `end' is one past the last element, and `cur_iovec_off'
 * is the number of bytes already consumed from `*iov'.  The offset is a
 * size_t so that it can represent any position within an iov_len.
 */
struct inner_reader_iovec {
    const struct iovec       *iov;
    const struct iovec       *end;
    size_t                    cur_iovec_off;
};


/* lsqr_read callback: copy up to `count' bytes from the iovec array
 * described by `ctx' (a struct inner_reader_iovec) into `buf', advancing
 * the cursor.  Returns the number of bytes copied, which is smaller than
 * `count' only when the iovecs run out.
 */
static size_t
inner_reader_iovec_read (void *ctx, void *buf, size_t count)
{
    struct inner_reader_iovec *const iro = ctx;
    unsigned char *p = buf;
    unsigned char *const end = p + count;
    size_t n_tocopy, n_room;

    while (iro->iov < iro->end && p < end)
    {
        /* All arithmetic is done in size_t: narrowing to unsigned would
         * truncate lengths of 4 GiB and more, which could make this loop
         * spin forever copying zero bytes.
         */
        n_tocopy = iro->iov->iov_len - iro->cur_iovec_off;
        n_room = (size_t) (end - p);
        if (n_tocopy > n_room)
            n_tocopy = n_room;
        /* Skip memcpy() for empty elements: their iov_base may be NULL */
        if (n_tocopy > 0)
        {
            memcpy(p, (const unsigned char *) iro->iov->iov_base
                                        + iro->cur_iovec_off, n_tocopy);
            p += n_tocopy;
            iro->cur_iovec_off += n_tocopy;
        }
        if (iro->iov->iov_len == iro->cur_iovec_off)
        {
            /* Current element is exhausted: move on to the next one */
            ++iro->iov;
            iro->cur_iovec_off = 0;
        }
    }

    return (size_t) (p - (unsigned char *) buf);
}


/* lsqr_size callback: return the number of bytes that remain to be read
 * from the iovec array described by `ctx' (a struct inner_reader_iovec).
 */
static size_t
inner_reader_iovec_size (void *ctx)
{
    struct inner_reader_iovec *const iro = ctx;
    const struct iovec *iov;
    size_t size;

    size = 0;
    for (iov = iro->iov; iov < iro->end; ++iov)
        size += iov->iov_len;

    /* The first remaining element may have been partially consumed */
    return size - iro->cur_iovec_off;
}
1858
1859
1860ssize_t
1861lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
1862                                                                    int iovcnt)
1863{
1864    COMMON_WRITE_CHECKS();
1865    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1866
1867    struct inner_reader_iovec iro = {
1868        .iov = iov,
1869        .end = iov + iovcnt,
1870        .cur_iovec_off = 0,
1871    };
1872    struct lsquic_reader reader = {
1873        .lsqr_read = inner_reader_iovec_read,
1874        .lsqr_size = inner_reader_iovec_size,
1875        .lsqr_ctx  = &iro,
1876    };
1877
1878    return stream_write(stream, &reader);
1879}
1880
1881
1882ssize_t
1883lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
1884{
1885    COMMON_WRITE_CHECKS();
1886    SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
1887    return stream_write(stream, reader);
1888}
1889
1890
1891int
1892lsquic_stream_send_headers (lsquic_stream_t *stream,
1893                            const lsquic_http_headers_t *headers, int eos)
1894{
1895    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT|
1896                                                     STREAM_U_WRITE_DONE))
1897                == STREAM_USE_HEADERS)
1898    {
1899        int s = lsquic_headers_stream_send_headers(stream->conn_pub->hs,
1900                    stream->id, headers, eos, lsquic_stream_priority(stream));
1901        if (0 == s)
1902        {
1903            SM_HISTORY_APPEND(stream, SHE_USER_WRITE_HEADER);
1904            stream->stream_flags |= STREAM_HEADERS_SENT;
1905            if (eos)
1906                stream->stream_flags |= STREAM_FIN_SENT;
1907            LSQ_INFO("sent headers for stream %u", stream->id);
1908        }
1909        else
1910            LSQ_WARN("could not send headers: %s", strerror(errno));
1911        return s;
1912    }
1913    else
1914    {
1915        LSQ_INFO("cannot send headers for stream %u in this state", stream->id);
1916        errno = EBADMSG;
1917        return -1;
1918    }
1919}
1920
1921
1922void
1923lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset)
1924{
1925    if (offset > stream->max_send_off)
1926    {
1927        SM_HISTORY_APPEND(stream, SHE_WINDOW_UPDATE);
1928        LSQ_DEBUG("stream %u: update max send offset from 0x%"PRIX64" to "
1929            "0x%"PRIX64, stream->id, stream->max_send_off, offset);
1930        stream->max_send_off = offset;
1931    }
1932    else
1933        LSQ_DEBUG("stream %u: new offset 0x%"PRIX64" is not larger than old "
1934            "max send offset 0x%"PRIX64", ignoring", stream->id, offset,
1935            stream->max_send_off);
1936}
1937
1938
1939/* This function is used to update offsets after handshake completes and we
1940 * learn of peer's limits from the handshake values.
1941 */
1942int
1943lsquic_stream_set_max_send_off (lsquic_stream_t *stream, unsigned offset)
1944{
1945    LSQ_DEBUG("setting max_send_off to %u", offset);
1946    if (offset > stream->max_send_off)
1947    {
1948        lsquic_stream_window_update(stream, offset);
1949        return 0;
1950    }
1951    else if (offset < stream->tosend_off)
1952    {
1953        LSQ_INFO("new offset (%u bytes) is smaller than the amount of data "
1954            "already sent on this stream (%"PRIu64" bytes)", offset,
1955            stream->tosend_off);
1956        return -1;
1957    }
1958    else
1959    {
1960        stream->max_send_off = offset;
1961        return 0;
1962    }
1963}
1964
1965
1966void
1967lsquic_stream_reset (lsquic_stream_t *stream, uint32_t error_code)
1968{
1969    lsquic_stream_reset_ext(stream, error_code, 1);
1970}
1971
1972
1973void
1974lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code,
1975                         int do_close)
1976{
1977    if (stream->stream_flags & (STREAM_SEND_RST|STREAM_RST_SENT))
1978    {
1979        LSQ_INFO("reset already sent");
1980        return;
1981    }
1982
1983    SM_HISTORY_APPEND(stream, SHE_RESET);
1984
1985    LSQ_INFO("reset stream %u, error code 0x%X", stream->id, error_code);
1986    stream->error_code = error_code;
1987
1988    if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
1989        TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
1990                                                        next_send_stream);
1991    stream->stream_flags &= ~STREAM_SENDING_FLAGS;
1992    stream->stream_flags |= STREAM_SEND_RST;
1993
1994    drop_buffered_data(stream);
1995    maybe_elide_stream_frames(stream);
1996    maybe_schedule_call_on_close(stream);
1997
1998    if (do_close)
1999        lsquic_stream_close(stream);
2000    else
2001        maybe_conn_to_tickable_if_writeable(stream, 1);
2002}
2003
2004
2005unsigned
2006lsquic_stream_id (const lsquic_stream_t *stream)
2007{
2008    return stream->id;
2009}
2010
2011
2012struct lsquic_conn *
2013lsquic_stream_conn (const lsquic_stream_t *stream)
2014{
2015    return stream->conn_pub->lconn;
2016}
2017
2018
2019int
2020lsquic_stream_close (lsquic_stream_t *stream)
2021{
2022    LSQ_DEBUG("lsquic_stream_close(stream %u) called", stream->id);
2023    SM_HISTORY_APPEND(stream, SHE_CLOSE);
2024    if (lsquic_stream_is_closed(stream))
2025    {
2026        LSQ_INFO("Attempt to close an already-closed stream %u", stream->id);
2027        errno = EBADF;
2028        return -1;
2029    }
2030    stream_shutdown_write(stream);
2031    stream_shutdown_read(stream);
2032    maybe_schedule_call_on_close(stream);
2033    maybe_finish_stream(stream);
2034    maybe_conn_to_tickable_if_writeable(stream, 1);
2035    return 0;
2036}
2037
2038
2039#ifndef NDEBUG
2040#if __GNUC__
2041__attribute__((weak))
2042#endif
2043#endif
2044void
2045lsquic_stream_acked (lsquic_stream_t *stream)
2046{
2047    assert(stream->n_unacked);
2048    --stream->n_unacked;
2049    LSQ_DEBUG("stream %u ACKed; n_unacked: %u", stream->id, stream->n_unacked);
2050    if (0 == stream->n_unacked)
2051        maybe_finish_stream(stream);
2052}
2053
2054
2055void
2056lsquic_stream_push_req (lsquic_stream_t *stream,
2057                        struct uncompressed_headers *push_req)
2058{
2059    assert(!stream->push_req);
2060    stream->push_req = push_req;
2061    stream->stream_flags |= STREAM_U_WRITE_DONE;    /* Writing not allowed */
2062}
2063
2064
2065int
2066lsquic_stream_is_pushed (const lsquic_stream_t *stream)
2067{
2068    return 1 & ~stream->id;
2069}
2070
2071
2072int
2073lsquic_stream_push_info (const lsquic_stream_t *stream,
2074                                        uint32_t *ref_stream_id, void **hset)
2075{
2076    if (lsquic_stream_is_pushed(stream))
2077    {
2078        assert(stream->push_req);
2079        *ref_stream_id = stream->push_req->uh_stream_id;
2080        *hset          = stream->push_req->uh_hset;
2081        return 0;
2082    }
2083    else
2084        return -1;
2085}
2086
2087
2088int
2089lsquic_stream_uh_in (lsquic_stream_t *stream, struct uncompressed_headers *uh)
2090{
2091    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH)) == STREAM_USE_HEADERS)
2092    {
2093        SM_HISTORY_APPEND(stream, SHE_HEADERS_IN);
2094        LSQ_DEBUG("received uncompressed headers for stream %u", stream->id);
2095        stream->stream_flags |= STREAM_HAVE_UH;
2096        if (uh->uh_flags & UH_FIN)
2097            stream->stream_flags |= STREAM_FIN_RECVD|STREAM_HEAD_IN_FIN;
2098        stream->uh = uh;
2099        if (uh->uh_oth_stream_id == 0)
2100        {
2101            if (uh->uh_weight)
2102                lsquic_stream_set_priority_internal(stream, uh->uh_weight);
2103        }
2104        else
2105            LSQ_NOTICE("don't know how to depend on stream %u",
2106                                                        uh->uh_oth_stream_id);
2107        return 0;
2108    }
2109    else
2110    {
2111        LSQ_ERROR("received unexpected uncompressed headers for stream %u", stream->id);
2112        return -1;
2113    }
2114}
2115
2116
2117unsigned
2118lsquic_stream_priority (const lsquic_stream_t *stream)
2119{
2120    return 256 - stream->sm_priority;
2121}
2122
2123
2124int
2125lsquic_stream_set_priority_internal (lsquic_stream_t *stream, unsigned priority)
2126{
2127    /* The user should never get a reference to the special streams,
2128     * but let's check just in case:
2129     */
2130    if (LSQUIC_STREAM_HANDSHAKE == stream->id
2131        || ((stream->stream_flags & STREAM_USE_HEADERS) &&
2132                                LSQUIC_STREAM_HEADERS == stream->id))
2133        return -1;
2134    if (priority < 1 || priority > 256)
2135        return -1;
2136    stream->sm_priority = 256 - priority;
2137    lsquic_send_ctl_invalidate_bpt_cache(stream->conn_pub->send_ctl);
2138    LSQ_DEBUG("set priority to %u", priority);
2139    SM_HISTORY_APPEND(stream, SHE_SET_PRIO);
2140    return 0;
2141}
2142
2143
2144int
2145lsquic_stream_set_priority (lsquic_stream_t *stream, unsigned priority)
2146{
2147    if (0 == lsquic_stream_set_priority_internal(stream, priority))
2148    {
2149        if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HEADERS_SENT)) ==
2150                                       (STREAM_USE_HEADERS|STREAM_HEADERS_SENT))
2151        {
2152            /* We need to send headers only if we are a) using HEADERS stream
2153             * and b) we already sent initial headers.  If initial headers
2154             * have not been sent yet, stream priority will be sent in the
2155             * HEADERS frame.
2156             */
2157            return lsquic_headers_stream_send_priority(stream->conn_pub->hs,
2158                                                    stream->id, 0, 0, priority);
2159        }
2160        else
2161            return 0;
2162    }
2163    else
2164        return -1;
2165}
2166
2167
2168lsquic_stream_ctx_t *
2169lsquic_stream_get_ctx (const lsquic_stream_t *stream)
2170{
2171    return stream->st_ctx;
2172}
2173
2174
2175int
2176lsquic_stream_refuse_push (lsquic_stream_t *stream)
2177{
2178    if (lsquic_stream_is_pushed(stream) &&
2179                !(stream->stream_flags & (STREAM_RST_SENT|STREAM_SEND_RST)))
2180    {
2181        LSQ_DEBUG("refusing pushed stream: send reset");
2182        lsquic_stream_reset_ext(stream, 8 /* QUIC_REFUSED_STREAM */, 1);
2183        return 0;
2184    }
2185    else
2186        return -1;
2187}
2188
2189
2190size_t
2191lsquic_stream_mem_used (const struct lsquic_stream *stream)
2192{
2193    size_t size;
2194
2195    size = sizeof(stream);
2196    if (stream->sm_buf)
2197        size += SM_BUF_SIZE;
2198    if (stream->data_in)
2199        size += stream->data_in->di_if->di_mem_used(stream->data_in);
2200
2201    return size;
2202}
2203
2204
2205lsquic_cid_t
2206lsquic_stream_cid (const struct lsquic_stream *stream)
2207{
2208    return LSQUIC_LOG_CONN_ID;
2209}
2210
2211
2212void *
2213lsquic_stream_get_hset (struct lsquic_stream *stream)
2214{
2215    void *hset;
2216
2217    if (stream->stream_flags & STREAM_RST_FLAGS)
2218    {
2219        LSQ_INFO("%s: stream is reset, no headers returned", __func__);
2220        errno = ECONNRESET;
2221        return NULL;
2222    }
2223
2224    if ((stream->stream_flags & (STREAM_USE_HEADERS|STREAM_HAVE_UH))
2225                                        != (STREAM_USE_HEADERS|STREAM_HAVE_UH))
2226    {
2227        LSQ_INFO("%s: unexpected call, flags: 0x%X", __func__,
2228                                                        stream->stream_flags);
2229        return NULL;
2230    }
2231
2232    if (!stream->uh)
2233    {
2234        LSQ_INFO("%s: headers unavailable (already fetched?)", __func__);
2235        return NULL;
2236    }
2237
2238    if (stream->uh->uh_flags & UH_H1H)
2239    {
2240        LSQ_INFO("%s: uncompressed headers have internal format", __func__);
2241        return NULL;
2242    }
2243
2244    hset = stream->uh->uh_hset;
2245    stream->uh->uh_hset = NULL;
2246    destroy_uh(stream);
2247    if (stream->stream_flags & STREAM_HEAD_IN_FIN)
2248    {
2249        stream->stream_flags |= STREAM_FIN_REACHED;
2250        SM_HISTORY_APPEND(stream, SHE_REACH_FIN);
2251    }
2252    LSQ_DEBUG("return header set");
2253    return hset;
2254}
2255