lsquic_spi.c revision cd35ff02
1/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_spi.c - implementation of Stream Priority Iterator.
4 */
5
6#include <assert.h>
7#include <inttypes.h>
8#include <stdint.h>
9#include <stdlib.h>
10#include <string.h>
11#include <sys/queue.h>
12#include <sys/types.h>
13#ifdef WIN32
14#include <vc_compat.h>
15#endif
16
17#include "lsquic_types.h"
18#include "lsquic_int_types.h"
19#include "lsquic_sfcw.h"
20#include "lsquic_varint.h"
21#include "lsquic_hq.h"
22#include "lsquic_hash.h"
23#include "lsquic_stream.h"
24#include "lsquic_spi.h"
25
26#define LSQUIC_LOGGER_MODULE LSQLM_SPI
27#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(iter->spi_conn)
28#include "lsquic_logger.h"
29
30#define SPI_DEBUG(fmt, ...) LSQ_DEBUG("%s: " fmt, iter->spi_name, __VA_ARGS__)
31
32#define NEXT_STREAM(stream, off) \
33    (* (struct lsquic_stream **) ((unsigned char *) (stream) + (off)))
34
35
36static void
37add_stream_to_spi (struct stream_prio_iter *iter, lsquic_stream_t *stream)
38{
39    unsigned set, bit;
40    set = stream->sm_priority >> 6;
41    bit = stream->sm_priority & 0x3F;
42    if (!(iter->spi_set[set] & (1ULL << bit)))
43    {
44        iter->spi_set[set] |= 1ULL << bit;
45        TAILQ_INIT(&iter->spi_streams[ stream->sm_priority ]);
46    }
47    TAILQ_INSERT_TAIL(&iter->spi_streams[ stream->sm_priority ],
48                                                stream, next_prio_stream);
49    ++iter->spi_n_added;
50}
51
52
53void
54lsquic_spi_init (struct stream_prio_iter *iter, struct lsquic_stream *first,
55         struct lsquic_stream *last, uintptr_t next_ptr_offset,
56         const struct lsquic_conn *conn,
57         const char *name,
58         int (*filter)(void *filter_ctx, struct lsquic_stream *),
59         void *filter_ctx)
60{
61    struct lsquic_stream *stream;
62    unsigned count;
63
64    iter->spi_conn          = conn;
65    iter->spi_name          = name ? name : "UNSET";
66    iter->spi_set[0]        = 0;
67    iter->spi_set[1]        = 0;
68    iter->spi_set[2]        = 0;
69    iter->spi_set[3]        = 0;
70    iter->spi_cur_prio      = 0;
71    iter->spi_next_stream   = NULL;
72    iter->spi_n_added       = 0;
73
74    stream = first;
75    count = 0;
76
77    if (filter)
78        while (1)
79        {
80            if (filter(filter_ctx, stream))
81            {
82                add_stream_to_spi(iter, stream);
83                ++count;
84            }
85            if (stream == last)
86                break;
87            stream = NEXT_STREAM(stream, next_ptr_offset);
88        }
89    else
90        while (1)
91        {
92            add_stream_to_spi(iter, stream);
93            ++count;
94            if (stream == last)
95                break;
96            stream = NEXT_STREAM(stream, next_ptr_offset);
97        }
98
99    if (count > 2)
100        SPI_DEBUG("initialized; # elems: %u; sets: [ %016"PRIX64", %016"PRIX64
101            ", %016"PRIX64", %016"PRIX64" ]", count, iter->spi_set[0],
102            iter->spi_set[1], iter->spi_set[2], iter->spi_set[3]);
103}
104
105
106static int
107find_and_set_lowest_priority (struct stream_prio_iter *iter)
108{
109    unsigned set, prio;
110    uint64_t mask;
111
112    for (set = 0, prio = 0; set < 4; ++set, prio += 64)
113        if (iter->spi_set[ set ])
114            break;
115
116    if (set >= 4)
117    {
118        //SPI_DEBUG("%s: cannot find any", __func__);
119        return -1;
120    }
121
122    mask = iter->spi_set[set];
123    if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; }
124    if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; }
125    if (!(mask & ((1ULL <<  8) - 1))) { prio +=  8; mask >>=  8; }
126    if (!(mask & ((1ULL <<  4) - 1))) { prio +=  4; mask >>=  4; }
127    if (!(mask & ((1ULL <<  2) - 1))) { prio +=  2; mask >>=  2; }
128    if (!(mask & ((1ULL <<  1) - 1))) { prio +=  1;              }
129
130#ifndef NDEBUG
131    unsigned bit;
132    set = prio >> 6;
133    bit = prio & 0x3F;
134    assert(iter->spi_set[ set ] & (1ULL << bit));
135#endif
136
137    SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio);
138    iter->spi_cur_prio = (unsigned char) prio;
139    return 0;
140}
141
142
143static int
144find_and_set_next_priority (struct stream_prio_iter *iter)
145{
146    unsigned set, bit, prio;
147    uint64_t mask;
148
149    /* Examine values in the same set first */
150    set = iter->spi_cur_prio >> 6;
151    bit = iter->spi_cur_prio & 0x3F;
152    prio = 64 * set;
153
154    if (bit < 63)
155    {
156        mask = iter->spi_set[set];
157        mask &= ~((1ULL << (bit + 1)) - 1);
158        if (mask)
159            goto calc_priority;
160    }
161
162    ++set;
163    prio += 64;
164    for (; set < 4; ++set, prio += 64)
165        if (iter->spi_set[ set ])
166            break;
167
168    if (set >= 4)
169    {
170        //SPI_DEBUG("%s: cannot find any", __func__);
171        return -1;
172    }
173
174    mask = iter->spi_set[set];
175
176  calc_priority:
177    if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; }
178    if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; }
179    if (!(mask & ((1ULL <<  8) - 1))) { prio +=  8; mask >>=  8; }
180    if (!(mask & ((1ULL <<  4) - 1))) { prio +=  4; mask >>=  4; }
181    if (!(mask & ((1ULL <<  2) - 1))) { prio +=  2; mask >>=  2; }
182    if (!(mask & ((1ULL <<  1) - 1))) { prio +=  1;              }
183
184#ifndef NDEBUG
185    set = prio >> 6;
186    bit = prio & 0x3F;
187    assert(iter->spi_set[ set ] & (1ULL << bit));
188#endif
189
190    SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio);
191    iter->spi_cur_prio = (unsigned char) prio;
192    return 0;
193}
194
195
196lsquic_stream_t *
197lsquic_spi_first (struct stream_prio_iter *iter)
198{
199    lsquic_stream_t *stream;
200    unsigned set, bit;
201
202    iter->spi_cur_prio = 0;
203    set = iter->spi_cur_prio >> 6;
204    bit = iter->spi_cur_prio & 0x3F;
205
206    if (!(iter->spi_set[set] & (1ULL << bit)))
207    {
208        if (0 != find_and_set_lowest_priority(iter))
209        {
210            SPI_DEBUG("%s: return NULL", __func__);
211            return NULL;
212        }
213    }
214
215    stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]);
216    iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream);
217    if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream))
218        SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__,
219                                            stream->id, iter->spi_cur_prio);
220    return stream;
221}
222
223
224lsquic_stream_t *
225lsquic_spi_next (struct stream_prio_iter *iter)
226{
227    lsquic_stream_t *stream;
228
229    stream = iter->spi_next_stream;
230    if (stream)
231    {
232        iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream);
233        if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream))
234            SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__,
235                                            stream->id, iter->spi_cur_prio);
236        return stream;
237    }
238
239    if (0 != find_and_set_next_priority(iter))
240    {
241        //SPI_DEBUG("%s: return NULL", __func__);
242        return NULL;
243    }
244
245    stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]);
246    iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream);
247
248    if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream))
249        SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__,
250                                            stream->id, iter->spi_cur_prio);
251    return stream;
252}
253
254
255static int
256have_non_critical_streams (const struct stream_prio_iter *iter)
257{
258    const struct lsquic_stream *stream;
259    TAILQ_FOREACH(stream, &iter->spi_streams[ iter->spi_cur_prio ],
260                                                        next_prio_stream)
261        if (!lsquic_stream_is_critical(stream))
262            return 1;
263    return 0;
264}
265
266
267#if __GNUC__
268#   define popcount __builtin_popcountll
269#else
270static int
271popcount (unsigned long long v)
272{
273    int count, i;
274    for (i = 0, count = 0; i < sizeof(v) * 8; ++i)
275        if (v & (1 << i))
276            ++count;
277    return count;
278}
279
280
281#endif
282
283
284static int
285spi_has_more_than_one_queue (const struct stream_prio_iter *iter)
286{
287    unsigned i;
288    int count;
289
290    if (iter->spi_n_added < 2)
291        return 0;
292
293    count = 0;
294    for (i = 0; i < sizeof(iter->spi_set) / sizeof(iter->spi_set[0]); ++i)
295    {
296        count += popcount(iter->spi_set[i]);
297        if (count > 1)
298            return 1;
299    }
300
301    return 0;
302}
303
304
305static void
306spi_drop_high_or_non_high (struct stream_prio_iter *iter, int drop_high)
307{
308    uint64_t new_set[ sizeof(iter->spi_set) / sizeof(iter->spi_set[0]) ];
309    unsigned bit, set, n;
310
311    if (!spi_has_more_than_one_queue(iter))
312        return;
313
314    memset(new_set, 0, sizeof(new_set));
315
316    find_and_set_lowest_priority(iter);
317    set = iter->spi_cur_prio >> 6;
318    bit = iter->spi_cur_prio & 0x3F;
319    new_set[set] |= 1ULL << bit;
320
321    if (!have_non_critical_streams(iter))
322    {
323        ++iter->spi_cur_prio;
324        find_and_set_lowest_priority(iter);
325        set = iter->spi_cur_prio >> 6;
326        bit = iter->spi_cur_prio & 0x3F;
327        new_set[set] |= 1ULL << bit;
328    }
329
330    for (n = 0; n < sizeof(new_set) / sizeof(new_set[0]); ++n)
331        if (drop_high)
332            iter->spi_set[n] &= ~new_set[n];
333        else
334            iter->spi_set[n] = new_set[n];
335}
336
337
338void
339lsquic_spi_drop_high (struct stream_prio_iter *iter)
340{
341    spi_drop_high_or_non_high(iter, 1);
342}
343
344
345void
346lsquic_spi_drop_non_high (struct stream_prio_iter *iter)
347{
348    spi_drop_high_or_non_high(iter, 0);
349}
350