lsquic_spi.c revision 5392f7a3
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_spi.c - implementation of Stream Priority Iterator.
4 */
5
6#include <assert.h>
7#include <inttypes.h>
8#include <stdint.h>
9#include <stdlib.h>
10#include <string.h>
11#include <sys/queue.h>
12#include <sys/types.h>
13#ifdef WIN32
14#include <vc_compat.h>
15#endif
16
17#include "lsquic_types.h"
18#include "lsquic_int_types.h"
19#include "lsquic_sfcw.h"
20#include "lsquic_varint.h"
21#include "lsquic_hq.h"
22#include "lsquic_hash.h"
23#include "lsquic_stream.h"
24#include "lsquic_spi.h"
25
26#define LSQUIC_LOGGER_MODULE LSQLM_SPI
27#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(iter->spi_conn)
28#include "lsquic_logger.h"
29
30#define SPI_DEBUG(fmt, ...) LSQ_DEBUG("%s: " fmt, iter->spi_name, __VA_ARGS__)
31
32#define NEXT_STREAM(stream, off) \
33    (* (struct lsquic_stream **) ((unsigned char *) (stream) + (off)))
34
35
36static void
37add_stream_to_spi (struct stream_prio_iter *iter, lsquic_stream_t *stream)
38{
39    unsigned set, bit;
40    set = stream->sm_priority >> 6;
41    bit = stream->sm_priority & 0x3F;
42    if (!(iter->spi_set[set] & (1ULL << bit)))
43    {
44        iter->spi_set[set] |= 1ULL << bit;
45        TAILQ_INIT(&iter->spi_streams[ stream->sm_priority ]);
46    }
47    TAILQ_INSERT_TAIL(&iter->spi_streams[ stream->sm_priority ],
48                                                stream, next_prio_stream);
49}
50
51
52void
53lsquic_spi_init (struct stream_prio_iter *iter, struct lsquic_stream *first,
54         struct lsquic_stream *last, uintptr_t next_ptr_offset,
55         enum stream_q_flags onlist_mask, const struct lsquic_conn *conn,
56         const char *name,
57         int (*filter)(void *filter_ctx, struct lsquic_stream *),
58         void *filter_ctx)
59{
60    struct lsquic_stream *stream;
61    unsigned count;
62
63    iter->spi_conn          = conn;
64    iter->spi_name          = name ? name : "UNSET";
65    iter->spi_set[0]        = 0;
66    iter->spi_set[1]        = 0;
67    iter->spi_set[2]        = 0;
68    iter->spi_set[3]        = 0;
69    iter->spi_onlist_mask   = onlist_mask;
70    iter->spi_cur_prio      = 0;
71    iter->spi_prev_stream   = NULL;
72    iter->spi_next_stream   = NULL;
73
74    stream = first;
75    count = 0;
76
77    if (filter)
78        while (1)
79        {
80            if (filter(filter_ctx, stream))
81            {
82                add_stream_to_spi(iter, stream);
83                ++count;
84            }
85            if (stream == last)
86                break;
87            stream = NEXT_STREAM(stream, next_ptr_offset);
88        }
89    else
90        while (1)
91        {
92            add_stream_to_spi(iter, stream);
93            ++count;
94            if (stream == last)
95                break;
96            stream = NEXT_STREAM(stream, next_ptr_offset);
97        }
98
99    if (count > 2)
100        SPI_DEBUG("initialized; # elems: %u; sets: [ %016"PRIX64", %016"PRIX64
101            ", %016"PRIX64", %016"PRIX64" ]", count, iter->spi_set[0],
102            iter->spi_set[1], iter->spi_set[2], iter->spi_set[3]);
103}
104
105
106static int
107find_and_set_lowest_priority (struct stream_prio_iter *iter)
108{
109    unsigned set, prio;
110    uint64_t mask;
111
112    for (set = 0, prio = 0; set < 4; ++set, prio += 64)
113        if (iter->spi_set[ set ])
114            break;
115
116    if (set >= 4)
117    {
118        //SPI_DEBUG("%s: cannot find any", __func__);
119        return -1;
120    }
121
122    mask = iter->spi_set[set];
123    if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; }
124    if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; }
125    if (!(mask & ((1ULL <<  8) - 1))) { prio +=  8; mask >>=  8; }
126    if (!(mask & ((1ULL <<  4) - 1))) { prio +=  4; mask >>=  4; }
127    if (!(mask & ((1ULL <<  2) - 1))) { prio +=  2; mask >>=  2; }
128    if (!(mask & ((1ULL <<  1) - 1))) { prio +=  1;              }
129
130#ifndef NDEBUG
131    unsigned bit;
132    set = prio >> 6;
133    bit = prio & 0x3F;
134    assert(iter->spi_set[ set ] & (1ULL << bit));
135#endif
136
137    SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio);
138    iter->spi_cur_prio = (unsigned char) prio;
139    return 0;
140}
141
142
143static int
144find_and_set_next_priority (struct stream_prio_iter *iter)
145{
146    unsigned set, bit, prio;
147    uint64_t mask;
148
149    /* Examine values in the same set first */
150    set = iter->spi_cur_prio >> 6;
151    bit = iter->spi_cur_prio & 0x3F;
152    prio = 64 * set;
153
154    if (bit < 63)
155    {
156        mask = iter->spi_set[set];
157        mask &= ~((1ULL << (bit + 1)) - 1);
158        if (mask)
159            goto calc_priority;
160    }
161
162    ++set;
163    prio += 64;
164    for (; set < 4; ++set, prio += 64)
165        if (iter->spi_set[ set ])
166            break;
167
168    if (set >= 4)
169    {
170        //SPI_DEBUG("%s: cannot find any", __func__);
171        return -1;
172    }
173
174    mask = iter->spi_set[set];
175
176  calc_priority:
177    if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; }
178    if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; }
179    if (!(mask & ((1ULL <<  8) - 1))) { prio +=  8; mask >>=  8; }
180    if (!(mask & ((1ULL <<  4) - 1))) { prio +=  4; mask >>=  4; }
181    if (!(mask & ((1ULL <<  2) - 1))) { prio +=  2; mask >>=  2; }
182    if (!(mask & ((1ULL <<  1) - 1))) { prio +=  1;              }
183
184#ifndef NDEBUG
185    set = prio >> 6;
186    bit = prio & 0x3F;
187    assert(iter->spi_set[ set ] & (1ULL << bit));
188#endif
189
190    SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio);
191    iter->spi_cur_prio = (unsigned char) prio;
192    return 0;
193}
194
195
196/* Each stream returned by the iterator is processed in some fashion.  If,
197 * as a result of this, the stream gets taken off the original list, we
198 * have to follow suit and remove it from the iterator's set of streams.
199 */
200static void
201maybe_evict_prev (struct stream_prio_iter *iter)
202{
203    unsigned set, bit;
204
205    if (0 == (iter->spi_prev_stream->sm_qflags & iter->spi_onlist_mask))
206    {
207        SPI_DEBUG("evict stream %"PRIu64, iter->spi_prev_stream->id);
208        TAILQ_REMOVE(&iter->spi_streams[ iter->spi_prev_prio ],
209                                    iter->spi_prev_stream, next_prio_stream);
210        if (TAILQ_EMPTY(&iter->spi_streams[ iter->spi_prev_prio ]))
211        {
212            set = iter->spi_prev_prio >> 6;
213            bit = iter->spi_prev_prio & 0x3F;
214            iter->spi_set[ set ] &= ~(1ULL << bit);
215            SPI_DEBUG("priority %u now has no elements", iter->spi_prev_prio);
216        }
217        iter->spi_prev_stream = NULL;
218    }
219}
220
221
222lsquic_stream_t *
223lsquic_spi_first (struct stream_prio_iter *iter)
224{
225    lsquic_stream_t *stream;
226    unsigned set, bit;
227
228    if (iter->spi_prev_stream)
229        maybe_evict_prev(iter);
230
231    iter->spi_cur_prio = 0;
232    set = iter->spi_cur_prio >> 6;
233    bit = iter->spi_cur_prio & 0x3F;
234
235    if (!(iter->spi_set[set] & (1ULL << bit)))
236    {
237        if (0 != find_and_set_lowest_priority(iter))
238        {
239            SPI_DEBUG("%s: return NULL", __func__);
240            return NULL;
241        }
242    }
243
244    stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]);
245    iter->spi_prev_prio   = iter->spi_cur_prio;
246    iter->spi_prev_stream = stream;
247    iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream);
248    if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream))
249        SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__,
250                                            stream->id, iter->spi_cur_prio);
251    return stream;
252}
253
254
255lsquic_stream_t *
256lsquic_spi_next (struct stream_prio_iter *iter)
257{
258    lsquic_stream_t *stream;
259
260    if (iter->spi_prev_stream)
261        maybe_evict_prev(iter);
262
263    stream = iter->spi_next_stream;
264    if (stream)
265    {
266        assert(iter->spi_prev_prio == iter->spi_cur_prio);
267        iter->spi_prev_stream = stream;
268        iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream);
269        if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream))
270            SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__,
271                                            stream->id, iter->spi_cur_prio);
272        return stream;
273    }
274
275    if (0 != find_and_set_next_priority(iter))
276    {
277        //SPI_DEBUG("%s: return NULL", __func__);
278        return NULL;
279    }
280
281    stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]);
282    iter->spi_prev_prio   = iter->spi_cur_prio;
283    iter->spi_prev_stream = stream;
284    iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream);
285
286    if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream))
287        SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__,
288                                            stream->id, iter->spi_cur_prio);
289    return stream;
290}
291
292
293static int
294have_non_critical_streams (const struct stream_prio_iter *iter)
295{
296    const struct lsquic_stream *stream;
297    TAILQ_FOREACH(stream, &iter->spi_streams[ iter->spi_cur_prio ],
298                                                        next_prio_stream)
299        if (!lsquic_stream_is_critical(stream))
300            return 1;
301    return 0;
302}
303
304
305static void
306spi_drop_high_or_non_high (struct stream_prio_iter *iter, int drop_high)
307{
308    uint64_t new_set[ sizeof(iter->spi_set) / sizeof(iter->spi_set[0]) ];
309    unsigned bit, set, n;
310
311    memset(new_set, 0, sizeof(new_set));
312
313    find_and_set_lowest_priority(iter);
314    set = iter->spi_cur_prio >> 6;
315    bit = iter->spi_cur_prio & 0x3F;
316    new_set[set] |= 1ULL << bit;
317
318    if (!have_non_critical_streams(iter))
319    {
320        ++iter->spi_cur_prio;
321        find_and_set_lowest_priority(iter);
322        set = iter->spi_cur_prio >> 6;
323        bit = iter->spi_cur_prio & 0x3F;
324        new_set[set] |= 1ULL << bit;
325    }
326
327    for (n = 0; n < sizeof(new_set) / sizeof(new_set[0]); ++n)
328        if (drop_high)
329            iter->spi_set[n] &= ~new_set[n];
330        else
331            iter->spi_set[n] = new_set[n];
332}
333
334
335void
336lsquic_spi_drop_high (struct stream_prio_iter *iter)
337{
338    spi_drop_high_or_non_high(iter, 1);
339}
340
341
342void
343lsquic_spi_drop_non_high (struct stream_prio_iter *iter)
344{
345    spi_drop_high_or_non_high(iter, 0);
346}
347