lsquic_spi.c revision 03fb9352
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_spi.c - implementation of Stream Priority Iterator. 4 */ 5 6#include <assert.h> 7#include <inttypes.h> 8#include <stdint.h> 9#include <stdlib.h> 10#include <string.h> 11#include <sys/queue.h> 12#include <sys/types.h> 13#ifdef WIN32 14#include <vc_compat.h> 15#endif 16 17#include "lsquic_types.h" 18#include "lsquic_int_types.h" 19#include "lsquic_sfcw.h" 20#include "lsquic_stream.h" 21#include "lsquic_spi.h" 22 23#define LSQUIC_LOGGER_MODULE LSQLM_SPI 24#define LSQUIC_LOG_CONN_ID iter->spi_cid 25#include "lsquic_logger.h" 26 27#define SPI_DEBUG(fmt, ...) LSQ_DEBUG("%s: " fmt, iter->spi_name, __VA_ARGS__) 28 29#define NEXT_STREAM(stream, off) \ 30 (* (struct lsquic_stream **) ((unsigned char *) (stream) + (off))) 31 32 33static void 34add_stream_to_spi (struct stream_prio_iter *iter, lsquic_stream_t *stream) 35{ 36 unsigned set, bit; 37 set = stream->sm_priority >> 6; 38 bit = stream->sm_priority & 0x3F; 39 if (!(iter->spi_set[set] & (1ULL << bit))) 40 { 41 iter->spi_set[set] |= 1ULL << bit; 42 TAILQ_INIT(&iter->spi_streams[ stream->sm_priority ]); 43 } 44 TAILQ_INSERT_TAIL(&iter->spi_streams[ stream->sm_priority ], 45 stream, next_prio_stream); 46} 47 48 49void 50lsquic_spi_init (struct stream_prio_iter *iter, struct lsquic_stream *first, 51 struct lsquic_stream *last, uintptr_t next_ptr_offset, 52 enum stream_flags onlist_mask, lsquic_cid_t cid, const char *name, 53 int (*filter)(void *filter_ctx, struct lsquic_stream *), 54 void *filter_ctx) 55{ 56 struct lsquic_stream *stream; 57 unsigned count; 58 59 iter->spi_cid = cid; 60 iter->spi_name = name ? name : "UNSET"; 61 iter->spi_set[0] = 0; 62 iter->spi_set[1] = 0; 63 iter->spi_set[2] = 0; 64 iter->spi_set[3] = 0; 65 iter->spi_onlist_mask = onlist_mask; 66 iter->spi_cur_prio = 0; 67 iter->spi_prev_stream = NULL; 68 iter->spi_next_stream = NULL; 69 70 stream = first; 71 count = 0; 72 73 if (filter) 74 while (1) 75 { 76 if (filter(filter_ctx, stream)) 77 { 78 add_stream_to_spi(iter, stream); 79 ++count; 80 } 81 if (stream == last) 82 break; 83 stream = NEXT_STREAM(stream, next_ptr_offset); 84 } 85 else 86 while (1) 87 { 88 add_stream_to_spi(iter, stream); 89 ++count; 90 if (stream == last) 91 break; 92 stream = NEXT_STREAM(stream, next_ptr_offset); 93 } 94 95 if (count > 2) 96 SPI_DEBUG("initialized; # elems: %u; sets: [ %016"PRIX64", %016"PRIX64 97 ", %016"PRIX64", %016"PRIX64" ]", count, iter->spi_set[0], 98 iter->spi_set[1], iter->spi_set[2], iter->spi_set[3]); 99} 100 101 102static int 103find_and_set_lowest_priority (struct stream_prio_iter *iter) 104{ 105 unsigned set, prio; 106 uint64_t mask; 107 108 for (set = 0, prio = 0; set < 4; ++set, prio += 64) 109 if (iter->spi_set[ set ]) 110 break; 111 112 if (set == 4) 113 { 114 //SPI_DEBUG("%s: cannot find any", __func__); 115 return -1; 116 } 117 118 mask = iter->spi_set[set]; 119 if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; } 120 if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; } 121 if (!(mask & ((1ULL << 8) - 1))) { prio += 8; mask >>= 8; } 122 if (!(mask & ((1ULL << 4) - 1))) { prio += 4; mask >>= 4; } 123 if (!(mask & ((1ULL << 2) - 1))) { prio += 2; mask >>= 2; } 124 if (!(mask & ((1ULL << 1) - 1))) { prio += 1; } 125 126#ifndef NDEBUG 127 unsigned bit; 128 set = prio >> 6; 129 bit = prio & 0x3F; 130 assert(iter->spi_set[ set ] & (1ULL << bit)); 131#endif 132 133 SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio); 134 iter->spi_cur_prio = (unsigned char) prio; 135 return 0; 136} 137 138 139static int 140find_and_set_next_priority (struct stream_prio_iter *iter) 141{ 142 unsigned set, bit, prio; 143 uint64_t mask; 144 145 /* Examine values in the same set first */ 146 set = iter->spi_cur_prio >> 6; 147 bit = iter->spi_cur_prio & 0x3F; 148 prio = 64 * set; 149 150 if (bit < 63) 151 { 152 mask = iter->spi_set[set]; 153 mask &= ~((1ULL << (bit + 1)) - 1); 154 if (mask) 155 goto calc_priority; 156 } 157 158 ++set; 159 prio += 64; 160 for (; set < 4; ++set, prio += 64) 161 if (iter->spi_set[ set ]) 162 break; 163 164 if (set >= 4) 165 { 166 //SPI_DEBUG("%s: cannot find any", __func__); 167 return -1; 168 } 169 170 mask = iter->spi_set[set]; 171 172 calc_priority: 173 if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; } 174 if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; } 175 if (!(mask & ((1ULL << 8) - 1))) { prio += 8; mask >>= 8; } 176 if (!(mask & ((1ULL << 4) - 1))) { prio += 4; mask >>= 4; } 177 if (!(mask & ((1ULL << 2) - 1))) { prio += 2; mask >>= 2; } 178 if (!(mask & ((1ULL << 1) - 1))) { prio += 1; } 179 180#ifndef NDEBUG 181 set = prio >> 6; 182 bit = prio & 0x3F; 183 assert(iter->spi_set[ set ] & (1ULL << bit)); 184#endif 185 186 SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio); 187 iter->spi_cur_prio = (unsigned char) prio; 188 return 0; 189} 190 191 192/* Each stream returned by the iterator is processed in some fashion. If, 193 * as a result of this, the stream gets taken off the original list, we 194 * have to follow suit and remove it from the iterator's set of streams. 195 */ 196static void 197maybe_evict_prev (struct stream_prio_iter *iter) 198{ 199 unsigned set, bit; 200 201 if (0 == (iter->spi_prev_stream->stream_flags & iter->spi_onlist_mask)) 202 { 203 SPI_DEBUG("evict stream %u", iter->spi_prev_stream->id); 204 TAILQ_REMOVE(&iter->spi_streams[ iter->spi_prev_prio ], 205 iter->spi_prev_stream, next_prio_stream); 206 if (TAILQ_EMPTY(&iter->spi_streams[ iter->spi_prev_prio ])) 207 { 208 set = iter->spi_prev_prio >> 6; 209 bit = iter->spi_prev_prio & 0x3F; 210 iter->spi_set[ set ] &= ~(1ULL << bit); 211 SPI_DEBUG("priority %u now has no elements", iter->spi_prev_prio); 212 } 213 iter->spi_prev_stream = NULL; 214 } 215} 216 217 218lsquic_stream_t * 219lsquic_spi_first (struct stream_prio_iter *iter) 220{ 221 lsquic_stream_t *stream; 222 unsigned set, bit; 223 224 if (iter->spi_prev_stream) 225 maybe_evict_prev(iter); 226 227 iter->spi_cur_prio = 0; 228 set = iter->spi_cur_prio >> 6; 229 bit = iter->spi_cur_prio & 0x3F; 230 231 if (!(iter->spi_set[set] & (1ULL << bit))) 232 { 233 if (0 != find_and_set_lowest_priority(iter)) 234 { 235 SPI_DEBUG("%s: return NULL", __func__); 236 return NULL; 237 } 238 } 239 240 stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]); 241 iter->spi_prev_prio = iter->spi_cur_prio; 242 iter->spi_prev_stream = stream; 243 iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream); 244 if (stream->id != 1 && stream->id != 3) 245 SPI_DEBUG("%s: return stream %u, priority %u", __func__, stream->id, 246 iter->spi_cur_prio); 247 return stream; 248} 249 250 251lsquic_stream_t * 252lsquic_spi_next (struct stream_prio_iter *iter) 253{ 254 lsquic_stream_t *stream; 255 256 if (iter->spi_prev_stream) 257 maybe_evict_prev(iter); 258 259 stream = iter->spi_next_stream; 260 if (stream) 261 { 262 assert(iter->spi_prev_prio == iter->spi_cur_prio); 263 iter->spi_prev_stream = stream; 264 iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream); 265 if (stream->id != 1 && stream->id != 3) 266 SPI_DEBUG("%s: return stream %u, priority %u", __func__, stream->id, 267 iter->spi_cur_prio); 268 return stream; 269 } 270 271 if (0 != find_and_set_next_priority(iter)) 272 { 273 //SPI_DEBUG("%s: return NULL", __func__); 274 return NULL; 275 } 276 277 stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]); 278 iter->spi_prev_prio = iter->spi_cur_prio; 279 iter->spi_prev_stream = stream; 280 iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream); 281 282 if (!lsquic_stream_is_critical(stream)) 283 SPI_DEBUG("%s: return stream %u, priority %u", __func__, stream->id, 284 iter->spi_cur_prio); 285 return stream; 286} 287 288 289static int 290have_non_critical_streams (const struct stream_prio_iter *iter) 291{ 292 const struct lsquic_stream *stream; 293 TAILQ_FOREACH(stream, &iter->spi_streams[ iter->spi_cur_prio ], 294 next_prio_stream) 295 if (!lsquic_stream_is_critical(stream)) 296 return 1; 297 return 0; 298} 299 300 301static void 302spi_drop_high_or_non_high (struct stream_prio_iter *iter, int drop_high) 303{ 304 uint64_t new_set[ sizeof(iter->spi_set) / sizeof(iter->spi_set[0]) ]; 305 unsigned bit, set, n; 306 307 memset(new_set, 0, sizeof(new_set)); 308 309 find_and_set_lowest_priority(iter); 310 set = iter->spi_cur_prio >> 6; 311 bit = iter->spi_cur_prio & 0x3F; 312 new_set[set] |= 1ULL << bit; 313 314 if (!have_non_critical_streams(iter)) 315 { 316 ++iter->spi_cur_prio; 317 find_and_set_lowest_priority(iter); 318 set = iter->spi_cur_prio >> 6; 319 bit = iter->spi_cur_prio & 0x3F; 320 new_set[set] |= 1ULL << bit; 321 } 322 323 for (n = 0; n < sizeof(new_set) / sizeof(new_set[0]); ++n) 324 if (drop_high) 325 iter->spi_set[n] &= ~new_set[n]; 326 else 327 iter->spi_set[n] = new_set[n]; 328} 329 330 331void 332lsquic_spi_drop_high (struct stream_prio_iter *iter) 333{ 334 spi_drop_high_or_non_high(iter, 1); 335} 336 337 338void 339lsquic_spi_drop_non_high (struct stream_prio_iter *iter) 340{ 341 spi_drop_high_or_non_high(iter, 0); 342} 343