lsquic_stream.h revision 55613f44
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc.  See LICENSE. */
2#ifndef LSQUIC_STREAM_H
3#define LSQUIC_STREAM_H
4
5#define LSQUIC_STREAM_DEFAULT_PRIO 16   /* RFC 7540, Section 5.3.5 */
6
7
8struct lsquic_stream_if;
9struct lsquic_stream_ctx;
10struct lsquic_conn_public;
11struct stream_frame;
12struct uncompressed_headers;
13enum enc_level;
14enum swtp_status;
15struct frame_gen_ctx;
16struct data_frame;
17enum quic_frame_type;
18struct push_promise;
19union hblock_ctx;
20
21TAILQ_HEAD(lsquic_streams_tailq, lsquic_stream);
22
23
24#ifndef LSQUIC_KEEP_STREAM_HISTORY
25#   ifdef NDEBUG
26#       define LSQUIC_KEEP_STREAM_HISTORY 0
27#   else
28#       define LSQUIC_KEEP_STREAM_HISTORY 1
29#   endif
30#endif
31
32
33#if LSQUIC_KEEP_STREAM_HISTORY
34#define SM_HIST_BITS 6
35#define SM_HIST_IDX_MASK ((1 << SM_HIST_BITS) - 1)
36typedef unsigned char sm_hist_idx_t;
37#endif
38
39
40/*
41 *  +----------+----------------------------------+
42 *  | Low Bits | Stream Type                      |
43 *  +----------+----------------------------------+
44 *  | 0x0      | Client-Initiated, Bidirectional  |
45 *  |          |                                  |
46 *  | 0x1      | Server-Initiated, Bidirectional  |
47 *  |          |                                  |
48 *  | 0x2      | Client-Initiated, Unidirectional |
49 *  |          |                                  |
50 *  | 0x3      | Server-Initiated, Unidirectional |
51 *  +----------+----------------------------------+
52 */
53
54enum stream_id_type
55{
56    SIT_BIDI_CLIENT,
57    SIT_BIDI_SERVER,
58    SIT_UNI_CLIENT,
59    SIT_UNI_SERVER,
60    N_SITS
61};
62
63#define SIT_MASK (N_SITS - 1)
64
65#define SIT_SHIFT 2
66#define SD_SHIFT 1
67
68enum stream_dir { SD_BIDI, SD_UNI, N_SDS };
69
70
71struct stream_hq_frame
72{
73    STAILQ_ENTRY(stream_hq_frame)
74                        shf_next;
75    /* At which point in the stream (sm_payload) to insert the HQ frame. */
76    uint64_t            shf_off;
77    union {
78        /* Points to the frame if SHF_FIXED_SIZE is not set */
79        unsigned char  *frame_ptr;
80        /* If SHF_FIXED_SIZE is set, the size of the frame to follow.
81         * Non-fixed frame size gets calculated using sm_payload when they
82         * are closed.
83         */
84        size_t          frame_size;
85    }                   shf_u;
86#define shf_frame_ptr shf_u.frame_ptr
87#define shf_frame_size shf_u.frame_size
88    enum hq_frame_type  shf_frame_type:8;
89    enum shf_flags {
90        SHF_TWO_BYTES   = 1 << 0,   /* Use two byte to encode frame length */
91        SHF_FIXED_SIZE  = 1 << 1,   /* Payload size guaranteed */
92        SHF_ACTIVE      = 1 << 2,   /* On sm_hq_frames list */
93        SHF_WRITTEN     = 1 << 3,   /* Framing bytes have been packetized */
94        SHF_CC_PAID     = 1 << 4,   /* Paid connection cap */
95        SHF_PHANTOM     = 1 << 5,   /* Phantom frame headers are not written */
96    }                   shf_flags:8;
97};
98
99
100struct hq_filter
101{
102    struct varint_read2_state   hqfi_vint2_state;
103    /* No need to copy the values: use it directly */
104#define hqfi_left hqfi_vint2_state.vr2s_two
105#define hqfi_type hqfi_vint2_state.vr2s_one
106    struct varint_read_state    hqfi_vint1_state;
107#define hqfi_push_id hqfi_vint1_state.value
108    enum {
109        HQFI_FLAG_UNUSED_0      = 1 << 0,
110        HQFI_FLAG_ERROR         = 1 << 1,
111        HQFI_FLAG_BEGIN         = 1 << 2,
112        HQFI_FLAG_BLOCKED       = 1 << 3,
113    }                           hqfi_flags:8;
114    enum {
115        HQFI_STATE_FRAME_HEADER_BEGIN,
116        HQFI_STATE_FRAME_HEADER_CONTINUE,
117        HQFI_STATE_READING_PAYLOAD,
118        HQFI_STATE_PUSH_ID_BEGIN,
119        HQFI_STATE_PUSH_ID_CONTINUE,
120    }                           hqfi_state:8;
121    unsigned char               hqfi_hist_idx;
122#define MAX_HQFI_ENTRIES (sizeof(unsigned) * 8 / 3)
123    unsigned                    hqfi_hist_buf;
124};
125
126
127struct stream_filter_if
128{
129    int         (*sfi_readable)(struct lsquic_stream *);
130    size_t      (*sfi_filter_df)(struct lsquic_stream *, struct data_frame *);
131    void        (*sfi_decr_left)(struct lsquic_stream *, size_t);
132};
133
134
135/* These flags indicate which queues -- or other entities -- currently
136 * reference the stream.
137 */
138enum stream_q_flags
139{
140    /* read_streams: */
141    SMQF_WANT_READ    = 1 << 0,
142
143    /* write_streams: */
144#define SMQF_WRITE_Q_FLAGS (SMQF_WANT_FLUSH|SMQF_WANT_WRITE)
145    SMQF_WANT_WRITE   = 1 << 1,
146    SMQF_WANT_FLUSH   = 1 << 2,     /* Flush until sm_flush_to is hit */
147
148    /* There are more than one reason that a stream may be put onto
149     * connections's sending_streams queue.  Note that writing STREAM
150     * frames is done separately.
151     */
152#define SMQF_SENDING_FLAGS (SMQF_SEND_WUF|SMQF_SEND_RST|SMQF_SEND_BLOCKED)
153    /* sending_streams: */
154    SMQF_SEND_WUF     = 1 << 3,     /* WUF: Window Update Frame */
155    SMQF_SEND_BLOCKED = 1 << 4,
156    SMQF_SEND_RST     = 1 << 5,     /* Error: want to send RST_STREAM */
157
158    /* The equivalent of WINDOW_UPDATE frame for streams in IETF QUIC is
159     * the MAX_STREAM_DATA frame.  Define an alias for use in the IETF
160     * QUIC code:
161     */
162#define SMQF_SEND_MAX_STREAM_DATA SMQF_SEND_WUF
163
164#define SMQF_SERVICE_FLAGS (SMQF_CALL_ONCLOSE|SMQF_FREE_STREAM|SMQF_ABORT_CONN)
165    SMQF_CALL_ONCLOSE = 1 << 6,
166    SMQF_FREE_STREAM  = 1 << 7,
167    SMQF_ABORT_CONN   = 1 << 8,     /* Unrecoverable error occurred */
168
169    SMQF_QPACK_DEC    = 1 << 9,     /* QPACK decoder handler is holding a reference to this stream */
170};
171
172
173/* Stream behavior flags */
174enum stream_b_flags
175{
176    SMBF_SERVER       = 1 << 0,
177    SMBF_IETF         = 1 << 1,
178    SMBF_USE_HEADERS  = 1 << 2,
179    SMBF_CRYPTO       = 1 << 3,  /* Crypto stream: applies to both gQUIC and IETF QUIC */
180    SMBF_CRITICAL     = 1 << 4,  /* This is a critical stream */
181    SMBF_AUTOSWITCH   = 1 << 5,
182    SMBF_RW_ONCE      = 1 << 6,  /* When set, read/write events are dispatched once per call */
183    SMBF_CONN_LIMITED = 1 << 7,
184    SMBF_HEADERS      = 1 << 8,  /* Headers stream */
185    SMBF_VERIFY_CL    = 1 << 9,  /* Verify content-length (stored in sm_cont_len) */
186#define N_SMBF_FLAGS 10
187};
188
189
190enum stream_flags {
191    STREAM_FIN_RECVD    = 1 << 0,   /* Received STREAM frame with FIN bit set */
192    STREAM_RST_RECVD    = 1 << 1,   /* Received RST frame */
193    STREAM_LAST_WRITE_OK= 1 << 2,   /* Used to break out of write event dispatch loop */
194    STREAM_U_READ_DONE  = 1 << 3,   /* User is done reading (shutdown was called) */
195    STREAM_U_WRITE_DONE = 1 << 4,   /* User is done writing (shutdown was called) */
196    STREAM_FIN_SENT     = 1 << 5,   /* FIN was written to network */
197    STREAM_RST_SENT     = 1 << 6,   /* RST_STREAM was written to network */
198    STREAM_FIN_REACHED  = 1 << 7,   /* User read data up to FIN */
199    STREAM_FINISHED     = 1 << 8,   /* Stream is finished */
200    STREAM_ONCLOSE_DONE = 1 << 9,   /* on_close has been called */
201    STREAM_CACHED_FRAME = 1 << 10,  /* If set, sm_has_frame can be used */
202    STREAM_HEADERS_SENT = 1 << 11,
203    STREAM_HAVE_UH      = 1 << 12,  /* Have uncompressed headers */
204    STREAM_ENCODER_DEP  = 1 << 13,  /* Encoder dependency: flush (IETF only) */
205    STREAM_HEAD_IN_FIN  = 1 << 14,  /* Incoming headers has FIN bit set */
206    STREAM_FRAMES_ELIDED= 1 << 15,
207    STREAM_FORCE_FINISH = 1 << 16,  /* Replaces FIN sent and received */
208    STREAM_ONNEW_DONE   = 1 << 17,  /* on_new_stream has been called */
209    STREAM_PUSHING      = 1 << 18,
210    STREAM_NOPUSH       = 1 << 19,  /* Disallow further push promises */
211    STREAM_UNUSED20     = 1 << 20,  /* Unused */
212    STREAM_UNUSED21     = 1 << 21,  /* Unused */
213    STREAM_RST_ACKED    = 1 << 22,  /* Packet containing RST has been acked */
214    STREAM_BLOCKED_SENT = 1 << 23,  /* Stays set once a STREAM_BLOCKED frame is sent */
215    STREAM_RST_READ     = 1 << 24,  /* User code collected the error */
216    STREAM_DATA_RECVD   = 1 << 25,  /* Cache stream state calculation */
217    STREAM_UNUSED26     = 1 << 26,  /* Unused */
218    STREAM_HDRS_FLUSHED = 1 << 27,  /* Only used in buffered packets mode */
219    STREAM_SS_RECVD     = 1 << 28,  /* Received STOP_SENDING frame */
220    STREAM_DELAYED_SW   = 1 << 29,  /* Delayed shutdown_write call */
221};
222
223
224/* By keeping this number low, we make sure that the code to allocate HQ
225 * frames dynamically gets exercised whenever push promises are sent.
226 */
227#define NUM_ALLOCED_HQ_FRAMES 2
228
229
230struct lsquic_stream
231{
232    struct lsquic_hash_elem         sm_hash_el;
233    lsquic_stream_id_t              id;
234    enum stream_flags               stream_flags;
235    enum stream_b_flags             sm_bflags;
236    enum stream_q_flags             sm_qflags;
237    unsigned                        n_unacked;
238
239    const struct lsquic_stream_if  *stream_if;
240    struct lsquic_stream_ctx       *st_ctx;
241    struct lsquic_conn_public      *conn_pub;
242    TAILQ_ENTRY(lsquic_stream)      next_send_stream, next_read_stream,
243                                        next_write_stream, next_service_stream,
244                                        next_prio_stream;
245
246    uint64_t                        tosend_off;
247    uint64_t                        sm_payload;     /* Not counting HQ frames */
248    uint64_t                        max_send_off;
249    uint64_t                        sm_last_recv_off;
250    uint64_t                        error_code;
251
252    /* From the network, we get frames, which we keep on a list ordered
253     * by offset.
254     */
255    struct data_in                 *data_in;
256    uint64_t                        read_offset;
257    lsquic_sfcw_t                   fc;
258
259    /* List of active HQ frames */
260    STAILQ_HEAD(, stream_hq_frame)  sm_hq_frames;
261
262    /* For efficiency, several frames are allocated as part of the stream
263     * itself.  If more frames are needed, they are allocated.
264     */
265    struct stream_hq_frame          sm_hq_frame_arr[NUM_ALLOCED_HQ_FRAMES];
266
267    struct hq_filter                sm_hq_filter;
268
269    /* We can safely use sm_hq_filter */
270#define sm_uni_type_state sm_hq_filter.hqfi_vint2_state.vr2s_varint_state
271
272    /** If @ref SMQF_WANT_FLUSH is set, flush until this offset. */
273    uint64_t                        sm_flush_to;
274
275    /**
276     * If @ref SMQF_WANT_FLUSH is set, this indicates payload offset
277     * to flush to.  Used to adjust @ref sm_flush_to when H3 frame
278     * size grows.
279     */
280    uint64_t                        sm_flush_to_payload;
281
282    /* Last offset sent in BLOCKED frame */
283    uint64_t                        blocked_off;
284
285    struct uncompressed_headers    *uh,
286                                   *push_req;
287    union hblock_ctx               *sm_hblock_ctx;
288
289    unsigned char                  *sm_buf;
290    void                           *sm_onnew_arg;
291
292    unsigned char                  *sm_header_block;
293    uint64_t                        sm_hb_compl;
294
295    /* Valid if STREAM_FIN_RECVD is set: */
296    uint64_t                        sm_fin_off;
297
298    /* A stream may be generating STREAM or CRYPTO frames */
299    size_t                        (*sm_frame_header_sz)(
300                                        const struct lsquic_stream *, unsigned);
301    enum swtp_status              (*sm_write_to_packet)(struct frame_gen_ctx *,
302                                                const size_t);
303    size_t                        (*sm_write_avail)(struct lsquic_stream *);
304    int                           (*sm_readable)(struct lsquic_stream *);
305
306    /* This element is optional */
307    const struct stream_filter_if  *sm_sfi;
308
309    /* sm_promise and sm_promises are never used at the same time and can
310     * be combined into a union should space in this struct become tight.
311     */
312    /* Push promise that engendered this push stream */
313    struct push_promise            *sm_promise;
314
315    /* Push promises sent on this stream */
316    SLIST_HEAD(, push_promise)      sm_promises;
317
318    uint64_t                        sm_last_frame_off;
319
320    /* Content length specified in incoming `content-length' header field.
321     * Used to verify size of DATA frames.
322     */
323    unsigned long long              sm_cont_len;
324    /* Sum of bytes in all incoming DATA frames.  Used for verification. */
325    unsigned long long              sm_data_in;
326
327    /* How much data there is in sm_header_block and how much of it has been
328     * sent:
329     */
330    unsigned                        sm_hblock_sz,
331                                    sm_hblock_off;
332
333    unsigned short                  sm_n_buffered;  /* Amount of data in sm_buf */
334    unsigned short                  sm_n_allocated;  /* Size of sm_buf */
335
336    unsigned char                   sm_priority;  /* 0: high; 255: low */
337    unsigned char                   sm_enc_level;
338    enum {
339        SSHS_BEGIN,         /* Nothing has happened yet */
340        SSHS_ENC_SENDING,   /* Sending encoder stream data */
341        SSHS_HBLOCK_SENDING,/* Sending header block data */
342    }                               sm_send_headers_state:8;
343    signed char                     sm_saved_want_write;
344    signed char                     sm_has_frame;
345
346#if LSQUIC_KEEP_STREAM_HISTORY
347    sm_hist_idx_t                   sm_hist_idx;
348#endif
349
350#if LSQUIC_KEEP_STREAM_HISTORY
351    /* Stream history: see enum stream_history_event */
352    unsigned char                   sm_hist_buf[ 1 << SM_HIST_BITS ];
353#endif
354};
355
356
357enum stream_ctor_flags
358{
359    SCF_CALL_ON_NEW   = (1 << (N_SMBF_FLAGS + 0)), /* Call on_new_stream() immediately */
360    SCF_USE_DI_HASH   = (1 << (N_SMBF_FLAGS + 1)), /* Use hash-based data input.  If not set,
361                                   * the nocopy data input is used.
362                                   */
363    SCF_CRYPTO_FRAMES = (1 << (N_SMBF_FLAGS + 2)), /* Write CRYPTO frames */
364    SCF_DI_AUTOSWITCH = SMBF_AUTOSWITCH, /* Automatically switch between nocopy
365                                   * and hash-based to data input for optimal
366                                   * performance.
367                                   */
368    SCF_DISP_RW_ONCE  = SMBF_RW_ONCE,
369    SCF_CRITICAL      = SMBF_CRITICAL, /* This is a critical stream */
370    SCF_IETF          = SMBF_IETF,
371    SCF_HTTP          = SMBF_USE_HEADERS,
372    SCF_CRYPTO        = SMBF_CRYPTO,
373    SCF_HEADERS       = SMBF_HEADERS,
374};
375
376
377lsquic_stream_t *
378lsquic_stream_new (lsquic_stream_id_t id, struct lsquic_conn_public *,
379                   const struct lsquic_stream_if *, void *stream_if_ctx,
380                   unsigned initial_sfrw, uint64_t initial_send_off,
381                   enum stream_ctor_flags);
382
383struct lsquic_stream *
384lsquic_stream_new_crypto (enum enc_level,
385        struct lsquic_conn_public *conn_pub,
386        const struct lsquic_stream_if *stream_if, void *stream_if_ctx,
387        enum stream_ctor_flags ctor_flags);
388
389void
390lsquic_stream_call_on_new (lsquic_stream_t *);
391
392void
393lsquic_stream_destroy (lsquic_stream_t *);
394
395/* Any of these flags will cause user-facing read and write and
396 * shutdown calls to return an error.  They also make the stream
397 * both readable and writeable, as we want the user to collect
398 * the error.
399 */
400#define lsquic_stream_is_reset(stream) \
401    (((stream)->stream_flags & \
402                    (STREAM_RST_RECVD|STREAM_RST_SENT|STREAM_SS_RECVD)) \
403        || ((stream)->sm_qflags & SMQF_SEND_RST))
404
405/* Data that from the network gets inserted into the stream using
406 * lsquic_stream_frame_in() function.  Returns 0 on success, -1 on
407 * failure.  The latter may be caused by flow control violation or
408 * invalid stream frame data, e.g. overlapping segments.
409 *
410 * Note that the caller does gives up control of `frame' no matter
411 * what this function returns.
412 *
413 * This data is read by the user using lsquic_stream_read() function.
414 */
415int
416lsquic_stream_frame_in (lsquic_stream_t *, struct stream_frame *frame);
417
418/* Only one (at least for now) uncompressed header structure is allowed to be
419 * passed in, and only in HTTP mode.
420 */
421int
422lsquic_stream_uh_in (lsquic_stream_t *, struct uncompressed_headers *);
423
424void
425lsquic_stream_push_req (lsquic_stream_t *,
426                        struct uncompressed_headers *push_req);
427
428int
429lsquic_stream_rst_in (lsquic_stream_t *, uint64_t offset, uint64_t error_code);
430
431void
432lsquic_stream_stop_sending_in (struct lsquic_stream *, uint64_t error_code);
433
434ssize_t
435lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len);
436
437uint64_t
438lsquic_stream_read_offset (const lsquic_stream_t *stream);
439
440/* Return true if we sent all available data to the network and write
441 * end of the stream was closed.
442 */
443int
444lsquic_stream_tosend_fin (const lsquic_stream_t *stream);
445
446/* Data to be sent out to the network is written using lsquic_stream_write().
447 */
448ssize_t
449lsquic_stream_write (lsquic_stream_t *stream, const void *buf, size_t len);
450
451void
452lsquic_stream_window_update (lsquic_stream_t *stream, uint64_t offset);
453
454int
455lsquic_stream_set_max_send_off (lsquic_stream_t *stream, uint64_t offset);
456
457/* The caller should only call this function if SMQF_SEND_WUF is set and
458 * it must generate a window update frame using this value.
459 */
460uint64_t
461lsquic_stream_fc_recv_off (lsquic_stream_t *stream);
462
463void
464lsquic_stream_peer_blocked (struct lsquic_stream *, uint64_t);
465
466void
467lsquic_stream_peer_blocked_gquic (struct lsquic_stream *);
468
469void
470lsquic_stream_dispatch_read_events (lsquic_stream_t *);
471
472void
473lsquic_stream_dispatch_write_events (lsquic_stream_t *);
474
475void
476lsquic_stream_blocked_frame_sent (lsquic_stream_t *);
477
478void
479lsquic_stream_rst_frame_sent (lsquic_stream_t *);
480
481void
482lsquic_stream_stream_frame_sent (lsquic_stream_t *);
483
484void
485lsquic_stream_reset (lsquic_stream_t *, uint64_t error_code);
486
487void
488lsquic_stream_reset_ext (lsquic_stream_t *, uint64_t error_code, int close);
489
490void
491lsquic_stream_call_on_close (lsquic_stream_t *);
492
493void
494lsquic_stream_shutdown_internal (lsquic_stream_t *);
495
496void
497lsquic_stream_received_goaway (lsquic_stream_t *);
498
499void
500lsquic_stream_acked (struct lsquic_stream *, enum quic_frame_type);
501
502#define lsquic_stream_is_closed(s)                                          \
503    (((s)->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE))         \
504                            == (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE))
505int
506lsquic_stream_update_sfcw (lsquic_stream_t *, uint64_t max_off);
507
508int
509lsquic_stream_set_priority_internal (lsquic_stream_t *, unsigned priority);
510
511#define lsquic_stream_is_critical(s) ((s)->sm_bflags & SMBF_CRITICAL)
512
513#define lsquic_stream_is_crypto(s) ((s)->sm_bflags & SMBF_CRYPTO)
514
515size_t
516lsquic_stream_mem_used (const struct lsquic_stream *);
517
518const lsquic_cid_t *
519lsquic_stream_cid (const struct lsquic_stream *);
520
521#define lsquic_stream_has_data_to_flush(stream) ((stream)->sm_n_buffered > 0)
522
523int
524lsquic_stream_readable (struct lsquic_stream *);
525
526size_t
527lsquic_stream_write_avail (struct lsquic_stream *);
528
529void
530lsquic_stream_dump_state (const struct lsquic_stream *);
531
532#ifndef NDEBUG
533size_t
534lsquic_stream_flush_threshold (const struct lsquic_stream *, unsigned);
535#endif
536
537#define crypto_level(stream) (~0ULL - (stream)->id)
538
539void
540lsquic_stream_set_stream_if (struct lsquic_stream *,
541                   const struct lsquic_stream_if *, void *stream_if_ctx);
542
543uint64_t
544lsquic_stream_combined_send_off (const struct lsquic_stream *);
545
546/* [draft-ietf-quic-transport-16] Section 3.1 */
547enum stream_state_sending
548{
549    SSS_READY,
550    SSS_SEND,
551    SSS_DATA_SENT,
552    SSS_RESET_SENT,
553    SSS_DATA_RECVD,
554    SSS_RESET_RECVD,
555};
556
557extern const char *const lsquic_sss2str[];
558
559enum stream_state_sending
560lsquic_stream_sending_state (const struct lsquic_stream *);
561
562/* [draft-ietf-quic-transport-16] Section 3.2 */
563enum stream_state_receiving
564{
565    SSR_RECV,
566    SSR_SIZE_KNOWN,
567    SSR_DATA_RECVD,
568    SSR_RESET_RECVD,
569    SSR_DATA_READ,
570    SSR_RESET_READ,
571};
572
573extern const char *const lsquic_ssr2str[];
574
575enum stream_state_receiving
576lsquic_stream_receiving_state (struct lsquic_stream *);
577
578uint64_t
579lsquic_stream_fc_recv_off_const (const struct lsquic_stream *);
580
581void
582lsquic_stream_max_stream_data_sent (struct lsquic_stream *);
583
584void
585lsquic_stream_qdec_unblocked (struct lsquic_stream *);
586
587int
588lsquic_stream_can_push (const struct lsquic_stream *);
589
590int
591lsquic_stream_push_promise (struct lsquic_stream *, struct push_promise *);
592
593void
594lsquic_stream_force_finish (struct lsquic_stream *);
595
596int
597lsquic_stream_header_is_pp (const struct lsquic_stream *);
598
599int
600lsquic_stream_header_is_trailer (const struct lsquic_stream *);
601
602int
603lsquic_stream_verify_len (struct lsquic_stream *, unsigned long long);
604
605#define lsquic_stream_is_blocked(stream_) ((stream_)->blocked_off && \
606                        (stream_)->blocked_off == (stream_)->max_send_off)
607
608#endif
609