/* lsquic_spi.c revision 5392f7a3 */
1/* Copyright (c) 2017 - 2019 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_spi.c - implementation of Stream Priority Iterator. 4 */ 5 6#include <assert.h> 7#include <inttypes.h> 8#include <stdint.h> 9#include <stdlib.h> 10#include <string.h> 11#include <sys/queue.h> 12#include <sys/types.h> 13#ifdef WIN32 14#include <vc_compat.h> 15#endif 16 17#include "lsquic_types.h" 18#include "lsquic_int_types.h" 19#include "lsquic_sfcw.h" 20#include "lsquic_varint.h" 21#include "lsquic_hq.h" 22#include "lsquic_hash.h" 23#include "lsquic_stream.h" 24#include "lsquic_spi.h" 25 26#define LSQUIC_LOGGER_MODULE LSQLM_SPI 27#define LSQUIC_LOG_CONN_ID lsquic_conn_log_cid(iter->spi_conn) 28#include "lsquic_logger.h" 29 30#define SPI_DEBUG(fmt, ...) LSQ_DEBUG("%s: " fmt, iter->spi_name, __VA_ARGS__) 31 32#define NEXT_STREAM(stream, off) \ 33 (* (struct lsquic_stream **) ((unsigned char *) (stream) + (off))) 34 35 36static void 37add_stream_to_spi (struct stream_prio_iter *iter, lsquic_stream_t *stream) 38{ 39 unsigned set, bit; 40 set = stream->sm_priority >> 6; 41 bit = stream->sm_priority & 0x3F; 42 if (!(iter->spi_set[set] & (1ULL << bit))) 43 { 44 iter->spi_set[set] |= 1ULL << bit; 45 TAILQ_INIT(&iter->spi_streams[ stream->sm_priority ]); 46 } 47 TAILQ_INSERT_TAIL(&iter->spi_streams[ stream->sm_priority ], 48 stream, next_prio_stream); 49} 50 51 52void 53lsquic_spi_init (struct stream_prio_iter *iter, struct lsquic_stream *first, 54 struct lsquic_stream *last, uintptr_t next_ptr_offset, 55 enum stream_q_flags onlist_mask, const struct lsquic_conn *conn, 56 const char *name, 57 int (*filter)(void *filter_ctx, struct lsquic_stream *), 58 void *filter_ctx) 59{ 60 struct lsquic_stream *stream; 61 unsigned count; 62 63 iter->spi_conn = conn; 64 iter->spi_name = name ? 
name : "UNSET"; 65 iter->spi_set[0] = 0; 66 iter->spi_set[1] = 0; 67 iter->spi_set[2] = 0; 68 iter->spi_set[3] = 0; 69 iter->spi_onlist_mask = onlist_mask; 70 iter->spi_cur_prio = 0; 71 iter->spi_prev_stream = NULL; 72 iter->spi_next_stream = NULL; 73 74 stream = first; 75 count = 0; 76 77 if (filter) 78 while (1) 79 { 80 if (filter(filter_ctx, stream)) 81 { 82 add_stream_to_spi(iter, stream); 83 ++count; 84 } 85 if (stream == last) 86 break; 87 stream = NEXT_STREAM(stream, next_ptr_offset); 88 } 89 else 90 while (1) 91 { 92 add_stream_to_spi(iter, stream); 93 ++count; 94 if (stream == last) 95 break; 96 stream = NEXT_STREAM(stream, next_ptr_offset); 97 } 98 99 if (count > 2) 100 SPI_DEBUG("initialized; # elems: %u; sets: [ %016"PRIX64", %016"PRIX64 101 ", %016"PRIX64", %016"PRIX64" ]", count, iter->spi_set[0], 102 iter->spi_set[1], iter->spi_set[2], iter->spi_set[3]); 103} 104 105 106static int 107find_and_set_lowest_priority (struct stream_prio_iter *iter) 108{ 109 unsigned set, prio; 110 uint64_t mask; 111 112 for (set = 0, prio = 0; set < 4; ++set, prio += 64) 113 if (iter->spi_set[ set ]) 114 break; 115 116 if (set >= 4) 117 { 118 //SPI_DEBUG("%s: cannot find any", __func__); 119 return -1; 120 } 121 122 mask = iter->spi_set[set]; 123 if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; } 124 if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; } 125 if (!(mask & ((1ULL << 8) - 1))) { prio += 8; mask >>= 8; } 126 if (!(mask & ((1ULL << 4) - 1))) { prio += 4; mask >>= 4; } 127 if (!(mask & ((1ULL << 2) - 1))) { prio += 2; mask >>= 2; } 128 if (!(mask & ((1ULL << 1) - 1))) { prio += 1; } 129 130#ifndef NDEBUG 131 unsigned bit; 132 set = prio >> 6; 133 bit = prio & 0x3F; 134 assert(iter->spi_set[ set ] & (1ULL << bit)); 135#endif 136 137 SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio); 138 iter->spi_cur_prio = (unsigned char) prio; 139 return 0; 140} 141 142 143static int 144find_and_set_next_priority (struct stream_prio_iter 
*iter) 145{ 146 unsigned set, bit, prio; 147 uint64_t mask; 148 149 /* Examine values in the same set first */ 150 set = iter->spi_cur_prio >> 6; 151 bit = iter->spi_cur_prio & 0x3F; 152 prio = 64 * set; 153 154 if (bit < 63) 155 { 156 mask = iter->spi_set[set]; 157 mask &= ~((1ULL << (bit + 1)) - 1); 158 if (mask) 159 goto calc_priority; 160 } 161 162 ++set; 163 prio += 64; 164 for (; set < 4; ++set, prio += 64) 165 if (iter->spi_set[ set ]) 166 break; 167 168 if (set >= 4) 169 { 170 //SPI_DEBUG("%s: cannot find any", __func__); 171 return -1; 172 } 173 174 mask = iter->spi_set[set]; 175 176 calc_priority: 177 if (!(mask & ((1ULL << 32) - 1))) { prio += 32; mask >>= 32; } 178 if (!(mask & ((1ULL << 16) - 1))) { prio += 16; mask >>= 16; } 179 if (!(mask & ((1ULL << 8) - 1))) { prio += 8; mask >>= 8; } 180 if (!(mask & ((1ULL << 4) - 1))) { prio += 4; mask >>= 4; } 181 if (!(mask & ((1ULL << 2) - 1))) { prio += 2; mask >>= 2; } 182 if (!(mask & ((1ULL << 1) - 1))) { prio += 1; } 183 184#ifndef NDEBUG 185 set = prio >> 6; 186 bit = prio & 0x3F; 187 assert(iter->spi_set[ set ] & (1ULL << bit)); 188#endif 189 190 SPI_DEBUG("%s: prio %u -> %u", __func__, iter->spi_cur_prio, prio); 191 iter->spi_cur_prio = (unsigned char) prio; 192 return 0; 193} 194 195 196/* Each stream returned by the iterator is processed in some fashion. If, 197 * as a result of this, the stream gets taken off the original list, we 198 * have to follow suit and remove it from the iterator's set of streams. 
199 */ 200static void 201maybe_evict_prev (struct stream_prio_iter *iter) 202{ 203 unsigned set, bit; 204 205 if (0 == (iter->spi_prev_stream->sm_qflags & iter->spi_onlist_mask)) 206 { 207 SPI_DEBUG("evict stream %"PRIu64, iter->spi_prev_stream->id); 208 TAILQ_REMOVE(&iter->spi_streams[ iter->spi_prev_prio ], 209 iter->spi_prev_stream, next_prio_stream); 210 if (TAILQ_EMPTY(&iter->spi_streams[ iter->spi_prev_prio ])) 211 { 212 set = iter->spi_prev_prio >> 6; 213 bit = iter->spi_prev_prio & 0x3F; 214 iter->spi_set[ set ] &= ~(1ULL << bit); 215 SPI_DEBUG("priority %u now has no elements", iter->spi_prev_prio); 216 } 217 iter->spi_prev_stream = NULL; 218 } 219} 220 221 222lsquic_stream_t * 223lsquic_spi_first (struct stream_prio_iter *iter) 224{ 225 lsquic_stream_t *stream; 226 unsigned set, bit; 227 228 if (iter->spi_prev_stream) 229 maybe_evict_prev(iter); 230 231 iter->spi_cur_prio = 0; 232 set = iter->spi_cur_prio >> 6; 233 bit = iter->spi_cur_prio & 0x3F; 234 235 if (!(iter->spi_set[set] & (1ULL << bit))) 236 { 237 if (0 != find_and_set_lowest_priority(iter)) 238 { 239 SPI_DEBUG("%s: return NULL", __func__); 240 return NULL; 241 } 242 } 243 244 stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]); 245 iter->spi_prev_prio = iter->spi_cur_prio; 246 iter->spi_prev_stream = stream; 247 iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream); 248 if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream)) 249 SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__, 250 stream->id, iter->spi_cur_prio); 251 return stream; 252} 253 254 255lsquic_stream_t * 256lsquic_spi_next (struct stream_prio_iter *iter) 257{ 258 lsquic_stream_t *stream; 259 260 if (iter->spi_prev_stream) 261 maybe_evict_prev(iter); 262 263 stream = iter->spi_next_stream; 264 if (stream) 265 { 266 assert(iter->spi_prev_prio == iter->spi_cur_prio); 267 iter->spi_prev_stream = stream; 268 iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream); 269 if 
(LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream)) 270 SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__, 271 stream->id, iter->spi_cur_prio); 272 return stream; 273 } 274 275 if (0 != find_and_set_next_priority(iter)) 276 { 277 //SPI_DEBUG("%s: return NULL", __func__); 278 return NULL; 279 } 280 281 stream = TAILQ_FIRST(&iter->spi_streams[ iter->spi_cur_prio ]); 282 iter->spi_prev_prio = iter->spi_cur_prio; 283 iter->spi_prev_stream = stream; 284 iter->spi_next_stream = TAILQ_NEXT(stream, next_prio_stream); 285 286 if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG) && !lsquic_stream_is_critical(stream)) 287 SPI_DEBUG("%s: return stream %"PRIu64", priority %u", __func__, 288 stream->id, iter->spi_cur_prio); 289 return stream; 290} 291 292 293static int 294have_non_critical_streams (const struct stream_prio_iter *iter) 295{ 296 const struct lsquic_stream *stream; 297 TAILQ_FOREACH(stream, &iter->spi_streams[ iter->spi_cur_prio ], 298 next_prio_stream) 299 if (!lsquic_stream_is_critical(stream)) 300 return 1; 301 return 0; 302} 303 304 305static void 306spi_drop_high_or_non_high (struct stream_prio_iter *iter, int drop_high) 307{ 308 uint64_t new_set[ sizeof(iter->spi_set) / sizeof(iter->spi_set[0]) ]; 309 unsigned bit, set, n; 310 311 memset(new_set, 0, sizeof(new_set)); 312 313 find_and_set_lowest_priority(iter); 314 set = iter->spi_cur_prio >> 6; 315 bit = iter->spi_cur_prio & 0x3F; 316 new_set[set] |= 1ULL << bit; 317 318 if (!have_non_critical_streams(iter)) 319 { 320 ++iter->spi_cur_prio; 321 find_and_set_lowest_priority(iter); 322 set = iter->spi_cur_prio >> 6; 323 bit = iter->spi_cur_prio & 0x3F; 324 new_set[set] |= 1ULL << bit; 325 } 326 327 for (n = 0; n < sizeof(new_set) / sizeof(new_set[0]); ++n) 328 if (drop_high) 329 iter->spi_set[n] &= ~new_set[n]; 330 else 331 iter->spi_set[n] = new_set[n]; 332} 333 334 335void 336lsquic_spi_drop_high (struct stream_prio_iter *iter) 337{ 338 spi_drop_high_or_non_high(iter, 1); 339} 340 341 342void 
343lsquic_spi_drop_non_high (struct stream_prio_iter *iter) 344{ 345 spi_drop_high_or_non_high(iter, 0); 346} 347