/* lsquic_spi.c revision a74702c6 */
1/* Copyright (c) 2017 - 2022 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_spi.c - implementation of Stream Priority Iterator. 4 */ 5 6#include <assert.h> 7#include <inttypes.h> 8#include <stdint.h> 9#include <stdlib.h> 10#include <string.h> 11#include <sys/queue.h> 12#include <sys/types.h> 13#ifdef WIN32 14#include <vc_compat.h> 15#endif 16 17#include "lsquic_types.h" 18#include "lsquic_int_types.h" 19#include "lsquic_sfcw.h" 20#include "lsquic_varint.h" 21#include "lsquic_hq.h" 22#include "lsquic_hash.h" 23#include "lsquic_stream.h" 24#include "lsquic_conn_flow.h" 25#include "lsquic_rtt.h" 26#include "lsquic_conn_public.h" 27#include "lsquic_spi.h" 28 29#define LSQUIC_LOGGER_MODULE LSQLM_SPI 30#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(iter->spi_conn) 31#include "lsquic_logger.h" 32 33#define SPI_DEBUG(fmt, ...) LSQ_DEBUG("%s: " fmt, iter->spi_name, __VA_ARGS__) 34 35#define NEXT_STREAM(stream, off) \ 36 (* (struct lsquic_stream **) ((unsigned char *) (stream) + (off))) 37 38 39static void 40add_stream_to_spi (struct stream_prio_iter *iter, lsquic_stream_t *stream) 41{ 42 unsigned set, bit; 43 set = stream->sm_priority >> 6; 44 bit = stream->sm_priority & 0x3F; 45 if (!(iter->spi_set[set] & (1ULL << bit))) 46 { 47 iter->spi_set[set] |= 1ULL << bit; 48 TAILQ_INIT(&iter->spi_streams[ stream->sm_priority ]); 49 } 50 TAILQ_INSERT_TAIL(&iter->spi_streams[ stream->sm_priority ], 51 stream, next_prio_stream); 52 ++iter->spi_n_added; 53} 54 55 56void 57lsquic_spi_init (void *iter_p, struct lsquic_stream *first, 58 struct lsquic_stream *last, uintptr_t next_ptr_offset, 59 struct lsquic_conn_public *conn_pub, 60 const char *name, 61 int (*filter)(void *filter_ctx, struct lsquic_stream *), 62 void *filter_ctx) 63{ 64 struct stream_prio_iter *const iter = iter_p; 65 struct lsquic_stream *stream; 66 unsigned count; 67 68 iter->spi_conn = conn_pub->lconn; 69 iter->spi_name = name ? 
name : "UNSET"; 70 iter->spi_set[0] = 0; 71 iter->spi_set[1] = 0; 72 iter->spi_set[2] = 0; 73 iter->spi_set[3] = 0; 74 iter->spi_cur_prio = 0; 75 iter->spi_next_stream = NULL; 76 iter->spi_n_added = 0; 77 78 stream = first; 79 count = 0; 80 81 if (filter) 82 while (1) 83 { 84 if (filter(filter_ctx, stream)) 85 { 86 add_stream_to_spi(iter, stream); 87 ++count; 88 } 89 if (stream == last) 90 break; 91 stream = NEXT_STREAM(stream, next_ptr_offset); 92 } 93 else 94 while (1) 95 { 96 add_stream_to_spi(iter, stream); 97 ++count; 98 if (stream == last) 99 break; 100 stream = NEXT_STREAM(stream, next_ptr_offset); 101 } 102 103 if (count > 2) 104 SPI_DEBUG("initialized; # elems: %u; sets: [ %016"PRIX64", %016"PRIX64 105 ", %016"PRIX64", %016"PRIX64" ]", count, iter->spi_set[0], 106 iter->spi_set[1], iter->spi_set[2], iter->spi_set[3]); 107} 108 109 110static int 111find_and_set_lowest_priority (struct stream_prio_iter *iter) 112{ 113 unsigned set, prio; 114 uint64_t mask; 115 116 for (set = 0, prio = 0; set < 4; ++set, prio += 64) 117 if (iter->spi_set[ set ]) 118 break; 119 120 if (set >= 4) 121 { 122 //SPI_DEBUG("%s: cannot find any", __func__); 123 return -1; 124 } 125 126 mask = iter->spi_set[set]; 127 if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; } 128 if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; } 129 if (!(mask & ((1ULL << 8) - 1))) { prio += 8; mask >>= 8; } 130 if (!(mask & ((1ULL << 4) - 1))) { prio += 4; mask >>= 4; } 131 if (!(mask & ((1ULL << 2) - 1))) { prio += 2; mask >>= 2; } 132 if (!(mask & ((1ULL << 1) - 1))) { prio += 1; } 133 134#ifndef NDEBUG 135 unsigned bit; 136 set = prio >> 6; 137 bit = prio & 0x3F; 138 assert(iter->spi_set[ set ] & (1ULL << bit)); 139#endif 140 141 SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio); 142 iter->spi_cur_prio = (unsigned char) prio; 143 return 0; 144} 145 146 147static int 148find_and_set_next_priority (struct stream_prio_iter *iter) 149{ 150 unsigned set, bit, prio; 151 
uint64_t mask; 152 153 /* Examine values in the same set first */ 154 set = iter->spi_cur_prio >> 6; 155 bit = iter->spi_cur_prio & 0x3F; 156 prio = 64 * set; 157 158 if (bit < 63) 159 { 160 mask = iter->spi_set[set]; 161 mask &= ~((1ULL << (bit + 1)) - 1); 162 if (mask) 163 goto calc_priority; 164 } 165 166 ++set; 167 prio += 64; 168 for (; set < 4; ++set, prio += 64) 169 if (iter->spi_set[ set ]) 170 break; 171 172 if (set >= 4) 173 { 174 //SPI_DEBUG("%s: cannot find any", __func__); 175 return -1; 176 } 177 178 mask = iter->spi_set[set]; 179 180 calc_priority: 181 if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; } 182 if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; } 183 if (!(mask & ((1ULL << 8) - 1))) { prio += 8; mask >>= 8; } 184 if (!(mask & ((1ULL << 4) - 1))) { prio += 4; mask >>= 4; } 185 if (!(mask & ((1ULL << 2) - 1))) { prio += 2; mask >>= 2; } 186 if (!(mask & ((1ULL << 1) - 1))) { prio += 1; } 187 188#ifndef NDEBUG 189 set = prio >> 6; 190 bit = prio & 0x3F; 191 assert(iter->spi_set[ set ] & (1ULL << bit)); 192#endif 193 194 SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio); 195 iter->spi_cur_prio = (unsigned char) prio; 196 return 0; 197} 198 199 200lsquic_stream_t * 201lsquic_spi_first (void *iter_p) 202{ 203 struct stream_prio_iter *const iter = iter_p; 204 lsquic_stream_t *stream; 205 unsigned set, bit; 206 207 iter->spi_cur_prio = 0; 208 set = iter->spi_cur_prio >> 6; 209 bit = iter->spi_cur_prio & 0x3F; 210 211 if (!(iter->spi_set[set] & (1ULL << bit))) 212 { 213 if (0 != find_and_set_lowest_priority(iter)) 214 { 215 SPI_DEBUG("%s: return NULL", __func__); 216 return NULL; 217 } 218 } 219 220 stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]); 221 iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream); 222 if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream)) 223 SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__, 224 stream->id, iter->spi_cur_prio); 
225 return stream; 226} 227 228 229lsquic_stream_t * 230lsquic_spi_next (void *iter_p) 231{ 232 struct stream_prio_iter *const iter = iter_p; 233 lsquic_stream_t *stream; 234 235 stream = iter->spi_next_stream; 236 if (stream) 237 { 238 iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream); 239 if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream)) 240 SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__, 241 stream->id, iter->spi_cur_prio); 242 return stream; 243 } 244 245 if (0 != find_and_set_next_priority(iter)) 246 { 247 //SPI_DEBUG("%s: return NULL", __func__); 248 return NULL; 249 } 250 251 stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]); 252 iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream); 253 254 if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream)) 255 SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__, 256 stream->id, iter->spi_cur_prio); 257 return stream; 258} 259 260 261static int 262have_non_critical_streams (const struct stream_prio_iter *iter) 263{ 264 const struct lsquic_stream *stream; 265 TAILQ_FOREACH(stream, &iter->spi_streams[ iter->spi_cur_prio ], 266 next_prio_stream) 267 if (!lsquic_stream_is_critical(stream)) 268 return 1; 269 return 0; 270} 271 272 273#if __GNUC__ 274# define popcount __builtin_popcountll 275#else 276static int 277popcount (unsigned long long v) 278{ 279 int count, i; 280 for (i = 0, count = 0; i < sizeof(v) * 8; ++i) 281 if (v & (1 << i)) 282 ++count; 283 return count; 284} 285 286 287#endif 288 289 290static int 291spi_has_more_than_one_queue (const struct stream_prio_iter *iter) 292{ 293 unsigned i; 294 int count; 295 296 if (iter->spi_n_added < 2) 297 return 0; 298 299 count = 0; 300 for (i = 0; i < sizeof(iter->spi_set) / sizeof(iter->spi_set[0]); ++i) 301 { 302 count += popcount(iter->spi_set[i]); 303 if (count > 1) 304 return 1; 305 } 306 307 return 0; 308} 309 310 311static void 312spi_drop_high_or_non_high (void 
*iter_p, int drop_high) 313{ 314 struct stream_prio_iter *const iter = iter_p; 315 uint64_t new_set[ sizeof(iter->spi_set) / sizeof(iter->spi_set[0]) ]; 316 unsigned bit, set, n; 317 318 if (!spi_has_more_than_one_queue(iter)) 319 return; 320 321 memset(new_set, 0, sizeof(new_set)); 322 323 find_and_set_lowest_priority(iter); 324 set = iter->spi_cur_prio >> 6; 325 bit = iter->spi_cur_prio & 0x3F; 326 new_set[set] |= 1ULL << bit; 327 328 if (!have_non_critical_streams(iter)) 329 { 330 ++iter->spi_cur_prio; 331 find_and_set_lowest_priority(iter); 332 set = iter->spi_cur_prio >> 6; 333 bit = iter->spi_cur_prio & 0x3F; 334 new_set[set] |= 1ULL << bit; 335 } 336 337 for (n = 0; n < sizeof(new_set) / sizeof(new_set[0]); ++n) 338 if (drop_high) 339 iter->spi_set[n] &= ~new_set[n]; 340 else 341 iter->spi_set[n] = new_set[n]; 342} 343 344 345void 346lsquic_spi_drop_high (void *iter_p) 347{ 348 struct stream_prio_iter *const iter = iter_p; 349 spi_drop_high_or_non_high(iter, 1); 350} 351 352 353void 354lsquic_spi_drop_non_high (void *iter_p) 355{ 356 struct stream_prio_iter *const iter = iter_p; 357 spi_drop_high_or_non_high(iter, 0); 358} 359 360 361void 362lsquic_spi_cleanup (void *iter_p) 363{ 364} 365