lsquic_spi.c revision a0e1aeee
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc.  See LICENSE. */
2/*
3 * lsquic_spi.c - implementation of Stream Priority Iterator.
4 */
5
6#include <assert.h>
7#include <inttypes.h>
8#include <stdint.h>
9#include <stdlib.h>
10#include <string.h>
11#include <sys/queue.h>
12#include <sys/types.h>
13#ifdef WIN32
14#include <vc_compat.h>
15#endif
16
17#include "lsquic_types.h"
18#include "lsquic_int_types.h"
19#include "lsquic_sfcw.h"
20#include "lsquic_varint.h"
21#include "lsquic_hq.h"
22#include "lsquic_hash.h"
23#include "lsquic_stream.h"
24#include "lsquic_spi.h"
25
26#define LSQUIC_LOGGER_MODULE LSQLM_SPI
27#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(iter->spi_conn)
28#include "lsquic_logger.h"
29
/* Debug-level logging helper: prefixes the message with the iterator's name.
 * Expands to a reference to a variable named `iter', which must be in scope
 * at the point of use.  At least one argument must follow the format string.
 */
#define SPI_DEBUG(format_, ...) \
    LSQ_DEBUG("%s: " format_, iter->spi_name, __VA_ARGS__)
31
/* Read the "next stream" pointer stored `offset_' bytes from the beginning
 * of the stream object pointed to by `stream_'.  The result is an lvalue of
 * type `struct lsquic_stream *'.
 */
#define NEXT_STREAM(stream_, offset_) \
    (*(struct lsquic_stream **) ((char *) (stream_) + (offset_)))
34
35
36static void
37add_stream_to_spi (struct stream_prio_iter *iter, lsquic_stream_t *stream)
38{
39    unsigned set, bit;
40    set = stream->sm_priority >> 6;
41    bit = stream->sm_priority & 0x3F;
42    if (!(iter->spi_set[set] & (1ULL << bit)))
43    {
44        iter->spi_set[set] |= 1ULL << bit;
45        TAILQ_INIT(&iter->spi_streams[ stream->sm_priority ]);
46    }
47    TAILQ_INSERT_TAIL(&iter->spi_streams[ stream->sm_priority ],
48                                                stream, next_prio_stream);
49    ++iter->spi_n_added;
50}
51
52
53void
54lsquic_spi_init (struct stream_prio_iter *iter, struct lsquic_stream *first,
55         struct lsquic_stream *last, uintptr_t next_ptr_offset,
56         enum stream_q_flags onlist_mask, const struct lsquic_conn *conn,
57         const char *name,
58         int (*filter)(void *filter_ctx, struct lsquic_stream *),
59         void *filter_ctx)
60{
61    struct lsquic_stream *stream;
62    unsigned count;
63
64    iter->spi_conn          = conn;
65    iter->spi_name          = name ? name : "UNSET";
66    iter->spi_set[0]        = 0;
67    iter->spi_set[1]        = 0;
68    iter->spi_set[2]        = 0;
69    iter->spi_set[3]        = 0;
70    iter->spi_onlist_mask   = onlist_mask;
71    iter->spi_cur_prio      = 0;
72    iter->spi_prev_stream   = NULL;
73    iter->spi_next_stream   = NULL;
74    iter->spi_n_added       = 0;
75
76    stream = first;
77    count = 0;
78
79    if (filter)
80        while (1)
81        {
82            if (filter(filter_ctx, stream))
83            {
84                add_stream_to_spi(iter, stream);
85                ++count;
86            }
87            if (stream == last)
88                break;
89            stream = NEXT_STREAM(stream, next_ptr_offset);
90        }
91    else
92        while (1)
93        {
94            add_stream_to_spi(iter, stream);
95            ++count;
96            if (stream == last)
97                break;
98            stream = NEXT_STREAM(stream, next_ptr_offset);
99        }
100
101    if (count > 2)
102        SPI_DEBUG("initialized; # elems: %u; sets: [ %016"PRIX64", %016"PRIX64
103            ", %016"PRIX64", %016"PRIX64" ]", count, iter->spi_set[0],
104            iter->spi_set[1], iter->spi_set[2], iter->spi_set[3]);
105}
106
107
108static int
109find_and_set_lowest_priority (struct stream_prio_iter *iter)
110{
111    unsigned set, prio;
112    uint64_t mask;
113
114    for (set = 0, prio = 0; set < 4; ++set, prio += 64)
115        if (iter->spi_set[ set ])
116            break;
117
118    if (set >= 4)
119    {
120        //SPI_DEBUG("%s: cannot find any", __func__);
121        return -1;
122    }
123
124    mask = iter->spi_set[set];
125    if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; }
126    if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; }
127    if (!(mask & ((1ULL <<  8) - 1))) { prio +=  8; mask >>=  8; }
128    if (!(mask & ((1ULL <<  4) - 1))) { prio +=  4; mask >>=  4; }
129    if (!(mask & ((1ULL <<  2) - 1))) { prio +=  2; mask >>=  2; }
130    if (!(mask & ((1ULL <<  1) - 1))) { prio +=  1;              }
131
132#ifndef NDEBUG
133    unsigned bit;
134    set = prio >> 6;
135    bit = prio & 0x3F;
136    assert(iter->spi_set[ set ] & (1ULL << bit));
137#endif
138
139    SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio);
140    iter->spi_cur_prio = (unsigned char) prio;
141    return 0;
142}
143
144
145static int
146find_and_set_next_priority (struct stream_prio_iter *iter)
147{
148    unsigned set, bit, prio;
149    uint64_t mask;
150
151    /* Examine values in the same set first */
152    set = iter->spi_cur_prio >> 6;
153    bit = iter->spi_cur_prio & 0x3F;
154    prio = 64 * set;
155
156    if (bit < 63)
157    {
158        mask = iter->spi_set[set];
159        mask &= ~((1ULL << (bit + 1)) - 1);
160        if (mask)
161            goto calc_priority;
162    }
163
164    ++set;
165    prio += 64;
166    for (; set < 4; ++set, prio += 64)
167        if (iter->spi_set[ set ])
168            break;
169
170    if (set >= 4)
171    {
172        //SPI_DEBUG("%s: cannot find any", __func__);
173        return -1;
174    }
175
176    mask = iter->spi_set[set];
177
178  calc_priority:
179    if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; }
180    if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; }
181    if (!(mask & ((1ULL <<  8) - 1))) { prio +=  8; mask >>=  8; }
182    if (!(mask & ((1ULL <<  4) - 1))) { prio +=  4; mask >>=  4; }
183    if (!(mask & ((1ULL <<  2) - 1))) { prio +=  2; mask >>=  2; }
184    if (!(mask & ((1ULL <<  1) - 1))) { prio +=  1;              }
185
186#ifndef NDEBUG
187    set = prio >> 6;
188    bit = prio & 0x3F;
189    assert(iter->spi_set[ set ] & (1ULL << bit));
190#endif
191
192    SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio);
193    iter->spi_cur_prio = (unsigned char) prio;
194    return 0;
195}
196
197
198/* Each stream returned by the iterator is processed in some fashion.  If,
199 * as a result of this, the stream gets taken off the original list, we
200 * have to follow suit and remove it from the iterator's set of streams.
201 */
202static void
203maybe_evict_prev (struct stream_prio_iter *iter)
204{
205    unsigned set, bit;
206
207    if (0 == (iter->spi_prev_stream->sm_qflags & iter->spi_onlist_mask))
208    {
209        SPI_DEBUG("evict stream %"PRIu64, iter->spi_prev_stream->id);
210        TAILQ_REMOVE(&iter->spi_streams[ iter->spi_prev_prio ],
211                                    iter->spi_prev_stream, next_prio_stream);
212        if (TAILQ_EMPTY(&iter->spi_streams[ iter->spi_prev_prio ]))
213        {
214            set = iter->spi_prev_prio >> 6;
215            bit = iter->spi_prev_prio & 0x3F;
216            iter->spi_set[ set ] &= ~(1ULL << bit);
217            SPI_DEBUG("priority %u now has no elements", iter->spi_prev_prio);
218        }
219        iter->spi_prev_stream = NULL;
220    }
221}
222
223
224lsquic_stream_t *
225lsquic_spi_first (struct stream_prio_iter *iter)
226{
227    lsquic_stream_t *stream;
228    unsigned set, bit;
229
230    if (iter->spi_prev_stream)
231        maybe_evict_prev(iter);
232
233    iter->spi_cur_prio = 0;
234    set = iter->spi_cur_prio >> 6;
235    bit = iter->spi_cur_prio & 0x3F;
236
237    if (!(iter->spi_set[set] & (1ULL << bit)))
238    {
239        if (0 != find_and_set_lowest_priority(iter))
240        {
241            SPI_DEBUG("%s: return NULL", __func__);
242            return NULL;
243        }
244    }
245
246    stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]);
247    iter->spi_prev_prio   = iter->spi_cur_prio;
248    iter->spi_prev_stream = stream;
249    iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream);
250    if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream))
251        SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__,
252                                            stream->id, iter->spi_cur_prio);
253    return stream;
254}
255
256
257lsquic_stream_t *
258lsquic_spi_next (struct stream_prio_iter *iter)
259{
260    lsquic_stream_t *stream;
261
262    if (iter->spi_prev_stream)
263        maybe_evict_prev(iter);
264
265    stream = iter->spi_next_stream;
266    if (stream)
267    {
268        assert(iter->spi_prev_prio == iter->spi_cur_prio);
269        iter->spi_prev_stream = stream;
270        iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream);
271        if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream))
272            SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__,
273                                            stream->id, iter->spi_cur_prio);
274        return stream;
275    }
276
277    if (0 != find_and_set_next_priority(iter))
278    {
279        //SPI_DEBUG("%s: return NULL", __func__);
280        return NULL;
281    }
282
283    stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]);
284    iter->spi_prev_prio   = iter->spi_cur_prio;
285    iter->spi_prev_stream = stream;
286    iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream);
287
288    if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream))
289        SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__,
290                                            stream->id, iter->spi_cur_prio);
291    return stream;
292}
293
294
295static int
296have_non_critical_streams (const struct stream_prio_iter *iter)
297{
298    const struct lsquic_stream *stream;
299    TAILQ_FOREACH(stream, &iter->spi_streams[ iter->spi_cur_prio ],
300                                                        next_prio_stream)
301        if (!lsquic_stream_is_critical(stream))
302            return 1;
303    return 0;
304}
305
306
/* popcount(v): number of bits set in the 64-bit value `v'.  The compiler
 * builtin is used when available; otherwise, a portable fallback is
 * provided.
 */
#if __GNUC__
#   define popcount __builtin_popcountll
#else
static int
popcount (unsigned long long v)
{
    int count;

    /* `v & (v - 1)' clears the lowest set bit, so the loop body runs once
     * per set bit.  This avoids testing bits with a shifted `int' constant,
     * which is undefined for shift counts of 31 and above and would
     * miscount bits in the upper half of the value.
     */
    for (count = 0; v != 0; v &= v - 1)
        ++count;
    return count;
}
#endif
322
323
324static int
325spi_has_more_than_one_queue (const struct stream_prio_iter *iter)
326{
327    unsigned i;
328    int count;
329
330    if (iter->spi_n_added < 2)
331        return 0;
332
333    count = 0;
334    for (i = 0; i < sizeof(iter->spi_set) / sizeof(iter->spi_set[0]); ++i)
335    {
336        count += popcount(iter->spi_set[i]);
337        if (count > 1)
338            return 1;
339    }
340
341    return 0;
342}
343
344
345static void
346spi_drop_high_or_non_high (struct stream_prio_iter *iter, int drop_high)
347{
348    uint64_t new_set[ sizeof(iter->spi_set) / sizeof(iter->spi_set[0]) ];
349    unsigned bit, set, n;
350
351    if (!spi_has_more_than_one_queue(iter))
352        return;
353
354    memset(new_set, 0, sizeof(new_set));
355
356    find_and_set_lowest_priority(iter);
357    set = iter->spi_cur_prio >> 6;
358    bit = iter->spi_cur_prio & 0x3F;
359    new_set[set] |= 1ULL << bit;
360
361    if (!have_non_critical_streams(iter))
362    {
363        ++iter->spi_cur_prio;
364        find_and_set_lowest_priority(iter);
365        set = iter->spi_cur_prio >> 6;
366        bit = iter->spi_cur_prio & 0x3F;
367        new_set[set] |= 1ULL << bit;
368    }
369
370    for (n = 0; n < sizeof(new_set) / sizeof(new_set[0]); ++n)
371        if (drop_high)
372            iter->spi_set[n] &= ~new_set[n];
373        else
374            iter->spi_set[n] = new_set[n];
375}
376
377
/* Clear from the iterator's priority bitmap the priorities that
 * spi_drop_high_or_non_high() classifies as high, leaving the rest.
 * This is a no-op unless the iterator has streams at two or more
 * priorities.
 */
void
lsquic_spi_drop_high (struct stream_prio_iter *iter)
{
    const int drop_high = 1;

    spi_drop_high_or_non_high(iter, drop_high);
}
383
384
/* Reduce the iterator's priority bitmap to just the priorities that
 * spi_drop_high_or_non_high() classifies as high, dropping the rest.
 * This is a no-op unless the iterator has streams at two or more
 * priorities.
 */
void
lsquic_spi_drop_non_high (struct stream_prio_iter *iter)
{
    const int drop_high = 0;

    spi_drop_high_or_non_high(iter, drop_high);
}
390