lsquic_spi.c revision a0e1aeee
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_spi.c - implementation of Stream Priority Iterator. 4 */ 5 6#include <assert.h> 7#include <inttypes.h> 8#include <stdint.h> 9#include <stdlib.h> 10#include <string.h> 11#include <sys/queue.h> 12#include <sys/types.h> 13#ifdef WIN32 14#include <vc_compat.h> 15#endif 16 17#include "lsquic_types.h" 18#include "lsquic_int_types.h" 19#include "lsquic_sfcw.h" 20#include "lsquic_varint.h" 21#include "lsquic_hq.h" 22#include "lsquic_hash.h" 23#include "lsquic_stream.h" 24#include "lsquic_spi.h" 25 26#define LSQUIC_LOGGER_MODULE LSQLM_SPI 27#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(iter->spi_conn) 28#include "lsquic_logger.h" 29 30#define SPI_DEBUG(fmt, ...) LSQ_DEBUG("%s: " fmt, iter->spi_name, __VA_ARGS__) 31 32#define NEXT_STREAM(stream, off) \ 33 (* (struct lsquic_stream **) ((unsigned char *) (stream) + (off))) 34 35 36static void 37add_stream_to_spi (struct stream_prio_iter *iter, lsquic_stream_t *stream) 38{ 39 unsigned set, bit; 40 set = stream->sm_priority >> 6; 41 bit = stream->sm_priority & 0x3F; 42 if (!(iter->spi_set[set] & (1ULL << bit))) 43 { 44 iter->spi_set[set] |= 1ULL << bit; 45 TAILQ_INIT(&iter->spi_streams[ stream->sm_priority ]); 46 } 47 TAILQ_INSERT_TAIL(&iter->spi_streams[ stream->sm_priority ], 48 stream, next_prio_stream); 49 ++iter->spi_n_added; 50} 51 52 53void 54lsquic_spi_init (struct stream_prio_iter *iter, struct lsquic_stream *first, 55 struct lsquic_stream *last, uintptr_t next_ptr_offset, 56 enum stream_q_flags onlist_mask, const struct lsquic_conn *conn, 57 const char *name, 58 int (*filter)(void *filter_ctx, struct lsquic_stream *), 59 void *filter_ctx) 60{ 61 struct lsquic_stream *stream; 62 unsigned count; 63 64 iter->spi_conn = conn; 65 iter->spi_name = name ? name : "UNSET"; 66 iter->spi_set[0] = 0; 67 iter->spi_set[1] = 0; 68 iter->spi_set[2] = 0; 69 iter->spi_set[3] = 0; 70 iter->spi_onlist_mask = onlist_mask; 71 iter->spi_cur_prio = 0; 72 iter->spi_prev_stream = NULL; 73 iter->spi_next_stream = NULL; 74 iter->spi_n_added = 0; 75 76 stream = first; 77 count = 0; 78 79 if (filter) 80 while (1) 81 { 82 if (filter(filter_ctx, stream)) 83 { 84 add_stream_to_spi(iter, stream); 85 ++count; 86 } 87 if (stream == last) 88 break; 89 stream = NEXT_STREAM(stream, next_ptr_offset); 90 } 91 else 92 while (1) 93 { 94 add_stream_to_spi(iter, stream); 95 ++count; 96 if (stream == last) 97 break; 98 stream = NEXT_STREAM(stream, next_ptr_offset); 99 } 100 101 if (count > 2) 102 SPI_DEBUG("initialized; # elems: %u; sets: [ %016"PRIX64", %016"PRIX64 103 ", %016"PRIX64", %016"PRIX64" ]", count, iter->spi_set[0], 104 iter->spi_set[1], iter->spi_set[2], iter->spi_set[3]); 105} 106 107 108static int 109find_and_set_lowest_priority (struct stream_prio_iter *iter) 110{ 111 unsigned set, prio; 112 uint64_t mask; 113 114 for (set = 0, prio = 0; set < 4; ++set, prio += 64) 115 if (iter->spi_set[ set ]) 116 break; 117 118 if (set >= 4) 119 { 120 //SPI_DEBUG("%s: cannot find any", __func__); 121 return -1; 122 } 123 124 mask = iter->spi_set[set]; 125 if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; } 126 if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; } 127 if (!(mask & ((1ULL << 8) - 1))) { prio += 8; mask >>= 8; } 128 if (!(mask & ((1ULL << 4) - 1))) { prio += 4; mask >>= 4; } 129 if (!(mask & ((1ULL << 2) - 1))) { prio += 2; mask >>= 2; } 130 if (!(mask & ((1ULL << 1) - 1))) { prio += 1; } 131 132#ifndef NDEBUG 133 unsigned bit; 134 set = prio >> 6; 135 bit = prio & 0x3F; 136 assert(iter->spi_set[ set ] & (1ULL << bit)); 137#endif 138 139 SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio); 140 iter->spi_cur_prio = (unsigned char) prio; 141 return 0; 142} 143 144 145static int 146find_and_set_next_priority (struct stream_prio_iter *iter) 147{ 148 unsigned set, bit, prio; 149 uint64_t mask; 150 151 /* Examine values in the same set first */ 152 set = iter->spi_cur_prio >> 6; 153 bit = iter->spi_cur_prio & 0x3F; 154 prio = 64 * set; 155 156 if (bit < 63) 157 { 158 mask = iter->spi_set[set]; 159 mask &= ~((1ULL << (bit + 1)) - 1); 160 if (mask) 161 goto calc_priority; 162 } 163 164 ++set; 165 prio += 64; 166 for (; set < 4; ++set, prio += 64) 167 if (iter->spi_set[ set ]) 168 break; 169 170 if (set >= 4) 171 { 172 //SPI_DEBUG("%s: cannot find any", __func__); 173 return -1; 174 } 175 176 mask = iter->spi_set[set]; 177 178 calc_priority: 179 if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; } 180 if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; } 181 if (!(mask & ((1ULL << 8) - 1))) { prio += 8; mask >>= 8; } 182 if (!(mask & ((1ULL << 4) - 1))) { prio += 4; mask >>= 4; } 183 if (!(mask & ((1ULL << 2) - 1))) { prio += 2; mask >>= 2; } 184 if (!(mask & ((1ULL << 1) - 1))) { prio += 1; } 185 186#ifndef NDEBUG 187 set = prio >> 6; 188 bit = prio & 0x3F; 189 assert(iter->spi_set[ set ] & (1ULL << bit)); 190#endif 191 192 SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio); 193 iter->spi_cur_prio = (unsigned char) prio; 194 return 0; 195} 196 197 198/* Each stream returned by the iterator is processed in some fashion. If, 199 * as a result of this, the stream gets taken off the original list, we 200 * have to follow suit and remove it from the iterator's set of streams. 201 */ 202static void 203maybe_evict_prev (struct stream_prio_iter *iter) 204{ 205 unsigned set, bit; 206 207 if (0 == (iter->spi_prev_stream->sm_qflags & iter->spi_onlist_mask)) 208 { 209 SPI_DEBUG("evict stream %"PRIu64, iter->spi_prev_stream->id); 210 TAILQ_REMOVE(&iter->spi_streams[ iter->spi_prev_prio ], 211 iter->spi_prev_stream, next_prio_stream); 212 if (TAILQ_EMPTY(&iter->spi_streams[ iter->spi_prev_prio ])) 213 { 214 set = iter->spi_prev_prio >> 6; 215 bit = iter->spi_prev_prio & 0x3F; 216 iter->spi_set[ set ] &= ~(1ULL << bit); 217 SPI_DEBUG("priority %u now has no elements", iter->spi_prev_prio); 218 } 219 iter->spi_prev_stream = NULL; 220 } 221} 222 223 224lsquic_stream_t * 225lsquic_spi_first (struct stream_prio_iter *iter) 226{ 227 lsquic_stream_t *stream; 228 unsigned set, bit; 229 230 if (iter->spi_prev_stream) 231 maybe_evict_prev(iter); 232 233 iter->spi_cur_prio = 0; 234 set = iter->spi_cur_prio >> 6; 235 bit = iter->spi_cur_prio & 0x3F; 236 237 if (!(iter->spi_set[set] & (1ULL << bit))) 238 { 239 if (0 != find_and_set_lowest_priority(iter)) 240 { 241 SPI_DEBUG("%s: return NULL", __func__); 242 return NULL; 243 } 244 } 245 246 stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]); 247 iter->spi_prev_prio = iter->spi_cur_prio; 248 iter->spi_prev_stream = stream; 249 iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream); 250 if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream)) 251 SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__, 252 stream->id, iter->spi_cur_prio); 253 return stream; 254} 255 256 257lsquic_stream_t * 258lsquic_spi_next (struct stream_prio_iter *iter) 259{ 260 lsquic_stream_t *stream; 261 262 if (iter->spi_prev_stream) 263 maybe_evict_prev(iter); 264 265 stream = iter->spi_next_stream; 266 if (stream) 267 { 268 assert(iter->spi_prev_prio == iter->spi_cur_prio); 269 iter->spi_prev_stream = stream; 270 iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream); 271 if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream)) 272 SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__, 273 stream->id, iter->spi_cur_prio); 274 return stream; 275 } 276 277 if (0 != find_and_set_next_priority(iter)) 278 { 279 //SPI_DEBUG("%s: return NULL", __func__); 280 return NULL; 281 } 282 283 stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]); 284 iter->spi_prev_prio = iter->spi_cur_prio; 285 iter->spi_prev_stream = stream; 286 iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream); 287 288 if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream)) 289 SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__, 290 stream->id, iter->spi_cur_prio); 291 return stream; 292} 293 294 295static int 296have_non_critical_streams (const struct stream_prio_iter *iter) 297{ 298 const struct lsquic_stream *stream; 299 TAILQ_FOREACH(stream, &iter->spi_streams[ iter->spi_cur_prio ], 300 next_prio_stream) 301 if (!lsquic_stream_is_critical(stream)) 302 return 1; 303 return 0; 304} 305 306 307#if __GNUC__ 308# define popcount __builtin_popcountll 309#else 310static int 311popcount (unsigned long long v) 312{ 313 int count, i; 314 for (i = 0, count = 0; i < sizeof(v) * 8; ++i) 315 if (v & (1 << i)) 316 ++count; 317 return count; 318} 319 320 321#endif 322 323 324static int 325spi_has_more_than_one_queue (const struct stream_prio_iter *iter) 326{ 327 unsigned i; 328 int count; 329 330 if (iter->spi_n_added < 2) 331 return 0; 332 333 count = 0; 334 for (i = 0; i < sizeof(iter->spi_set) / sizeof(iter->spi_set[0]); ++i) 335 { 336 count += popcount(iter->spi_set[i]); 337 if (count > 1) 338 return 1; 339 } 340 341 return 0; 342} 343 344 345static void 346spi_drop_high_or_non_high (struct stream_prio_iter *iter, int drop_high) 347{ 348 uint64_t new_set[ sizeof(iter->spi_set) / sizeof(iter->spi_set[0]) ]; 349 unsigned bit, set, n; 350 351 if (!spi_has_more_than_one_queue(iter)) 352 return; 353 354 memset(new_set, 0, sizeof(new_set)); 355 356 find_and_set_lowest_priority(iter); 357 set = iter->spi_cur_prio >> 6; 358 bit = iter->spi_cur_prio & 0x3F; 359 new_set[set] |= 1ULL << bit; 360 361 if (!have_non_critical_streams(iter)) 362 { 363 ++iter->spi_cur_prio; 364 find_and_set_lowest_priority(iter); 365 set = iter->spi_cur_prio >> 6; 366 bit = iter->spi_cur_prio & 0x3F; 367 new_set[set] |= 1ULL << bit; 368 } 369 370 for (n = 0; n < sizeof(new_set) / sizeof(new_set[0]); ++n) 371 if (drop_high) 372 iter->spi_set[n] &= ~new_set[n]; 373 else 374 iter->spi_set[n] = new_set[n]; 375} 376 377 378void 379lsquic_spi_drop_high (struct stream_prio_iter *iter) 380{ 381 spi_drop_high_or_non_high(iter, 1); 382} 383 384 385void 386lsquic_spi_drop_non_high (struct stream_prio_iter *iter) 387{ 388 spi_drop_high_or_non_high(iter, 0); 389} 390