lsquic_full_conn.c revision 83287402
1/* Copyright (c) 2017 LiteSpeed Technologies Inc. See LICENSE. */ 2/* 3 * lsquic_full_conn.c -- A "full" connection object has full functionality 4 */ 5 6#include <assert.h> 7#include <errno.h> 8#include <inttypes.h> 9#include <netinet/in.h> 10#include <stdarg.h> 11#include <stdlib.h> 12#include <string.h> 13#include <sys/socket.h> 14#include <sys/queue.h> 15#include <sys/time.h> 16 17#include "lsquic_types.h" 18#include "lsquic.h" 19#include "lsquic_alarmset.h" 20#include "lsquic_packet_common.h" 21#include "lsquic_parse.h" 22#include "lsquic_packet_in.h" 23#include "lsquic_packet_out.h" 24#include "lsquic_rechist.h" 25#include "lsquic_util.h" 26#include "lsquic_conn_flow.h" 27#include "lsquic_sfcw.h" 28#include "lsquic_stream.h" 29#include "lsquic_senhist.h" 30#include "lsquic_rtt.h" 31#include "lsquic_cubic.h" 32#include "lsquic_pacer.h" 33#include "lsquic_send_ctl.h" 34#include "lsquic_set.h" 35#include "lsquic_malo.h" 36#include "lsquic_chsk_stream.h" 37#include "lsquic_str.h" 38#include "lsquic_qtags.h" 39#include "lsquic_handshake.h" 40#include "lsquic_headers_stream.h" 41#include "lsquic_frame_common.h" 42#include "lsquic_frame_reader.h" 43#include "lsquic_mm.h" 44#include "lsquic_engine_public.h" 45#include "lsquic_spi.h" 46#include "lsquic_ev_log.h" 47#include "lsquic_version.h" 48#include "lsquic_hash.h" 49 50#include "lsquic_conn.h" 51#include "lsquic_conn_public.h" 52#include "lsquic_ver_neg.h" 53#include "lsquic_full_conn.h" 54 55#define LSQUIC_LOGGER_MODULE LSQLM_CONN 56#define LSQUIC_LOG_CONN_ID conn->fc_conn.cn_cid 57#include "lsquic_logger.h" 58 59enum { STREAM_IF_STD, STREAM_IF_HSK, STREAM_IF_HDR, N_STREAM_IFS }; 60 61#define MAX_ANY_PACKETS_SINCE_LAST_ACK 20 62#define MAX_RETR_PACKETS_SINCE_LAST_ACK 2 63#define ACK_TIMEOUT 25000 64#define TIME_BETWEEN_PINGS 15000000 65#define IDLE_TIMEOUT 30000000 66 67/* IMPORTANT: Keep values of FC_SERVER and FC_HTTP same as LSENG_SERVER 68 * and LSENG_HTTP. 
69 */ 70enum full_conn_flags { 71 FC_SERVER = LSENG_SERVER, /* Server mode */ 72 FC_HTTP = LSENG_HTTP, /* HTTP mode */ 73 FC_TIMED_OUT = (1 << 2), 74 FC_ERROR = (1 << 3), 75 FC_ABORTED = (1 << 4), 76 FC_CLOSING = (1 << 5), /* Closing */ 77 FC_SEND_PING = (1 << 6), /* PING frame scheduled */ 78 FC_NSTP = (1 << 7), /* NSTP mode */ 79 FC_SEND_GOAWAY = (1 << 8), 80 FC_SEND_WUF = (1 << 9), 81 FC_SEND_STOP_WAITING 82 = (1 <<10), 83 FC_ACK_QUEUED = (1 <<11), 84 FC_ACK_HAD_MISS = (1 <<12), /* Last ACK frame had missing packets. */ 85 FC_CREATED_OK = (1 <<13), 86 FC_RECV_CLOSE = (1 <<14), /* Received CONNECTION_CLOSE frame */ 87 FC_GOING_AWAY = (1 <<15), /* Do not accept or create new streams */ 88 FC_GOAWAY_SENT = (1 <<16), /* Only send GOAWAY once */ 89 FC_SUPPORT_PUSH = (1 <<17), 90 FC_GOT_PRST = (1 <<18), /* Received public reset packet */ 91 FC_FIRST_TICK = (1 <<19), 92 FC_TICK_CLOSE = (1 <<20), /* We returned TICK_CLOSE */ 93}; 94 95#define FC_IMMEDIATE_CLOSE_FLAGS \ 96 (FC_TIMED_OUT|FC_ERROR|FC_ABORTED) 97 98#if LSQUIC_KEEP_STREAM_HISTORY 99#define KEEP_CLOSED_STREAM_HISTORY 0 100#endif 101 102#if KEEP_CLOSED_STREAM_HISTORY 103struct stream_history 104{ 105 uint32_t shist_stream_id; 106 enum stream_flags shist_stream_flags; 107 unsigned char shist_hist_buf[1 << SM_HIST_BITS]; 108}; 109#define SHIST_BITS 5 110#define SHIST_MASK ((1 << SHIST_BITS) - 1) 111#endif 112 113struct full_conn 114{ 115 struct lsquic_conn fc_conn; 116 struct lsquic_hash *fc_streams; 117 struct lsquic_rechist fc_rechist; 118 struct { 119 const struct lsquic_stream_if *stream_if; 120 void *stream_if_ctx; 121 } fc_stream_ifs[N_STREAM_IFS]; 122 lsquic_conn_ctx_t *fc_conn_ctx; 123 struct lsquic_send_ctl fc_send_ctl; 124 struct lsquic_conn_public fc_pub; 125 lsquic_alarmset_t fc_alset; 126 lsquic_set32_t fc_closed_stream_ids[2]; 127 const struct lsquic_engine_settings 128 *fc_settings; 129 struct lsquic_engine_public *fc_enpub; 130 lsquic_packno_t fc_max_ack_packno; 131 lsquic_packno_t 
fc_max_swf_packno; 132 struct { 133 unsigned max_streams_in; 134 unsigned max_streams_out; 135 unsigned max_conn_send; 136 unsigned max_stream_send; 137 } fc_cfg; 138 enum full_conn_flags fc_flags; 139 /* Number of packets received since last ACK sent: */ 140 unsigned fc_n_slack_all; 141 /* Number ackable packets received since last ACK was sent: */ 142 unsigned fc_n_slack_akbl; 143 unsigned fc_n_delayed_streams; 144 unsigned fc_n_cons_unretx; 145 uint32_t fc_last_stream_id; 146 uint32_t fc_max_peer_stream_id; 147 uint32_t fc_goaway_stream_id; 148 struct ver_neg fc_ver_neg; 149 union { 150 struct client_hsk_ctx client; 151 } fc_hsk_ctx; 152#if FULL_CONN_STATS 153 struct { 154 unsigned n_all_packets_in, 155 n_packets_out, 156 n_undec_packets, 157 n_dup_packets, 158 n_err_packets; 159 unsigned long stream_data_sz; 160 } fc_stats; 161#endif 162#if KEEP_CLOSED_STREAM_HISTORY 163 /* Rolling log of histories of closed streams. Older entries are 164 * overwritten. 165 */ 166 struct stream_history fc_stream_histories[1 << SHIST_BITS]; 167 unsigned fc_stream_hist_idx; 168#endif 169}; 170 171 172#define ABORT_WITH_FLAG(conn, flag, errmsg...) do { \ 173 (conn)->fc_flags |= flag; \ 174 LSQ_ERROR("Abort connection: " errmsg); \ 175} while (0) 176 177#define ABORT_ERROR(errmsg...) ABORT_WITH_FLAG(conn, FC_ERROR, errmsg) 178 179#define ABORT_TIMEOUT(errmsg...) 
ABORT_WITH_FLAG(conn, FC_TIMED_OUT, errmsg) 180 181static void 182idle_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now); 183 184static void 185ping_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now); 186 187static void 188handshake_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now); 189 190static void 191ack_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now); 192 193static lsquic_stream_t * 194new_stream (struct full_conn *conn, uint32_t stream_id); 195 196static void 197reset_ack_state (struct full_conn *conn); 198 199static const struct headers_stream_callbacks headers_callbacks; 200 201#if KEEP_CLOSED_STREAM_HISTORY 202 203static void 204save_stream_history (struct full_conn *conn, const lsquic_stream_t *stream) 205{ 206 sm_hist_idx_t idx; 207 struct stream_history *const shist = 208 &conn->fc_stream_histories[ conn->fc_stream_hist_idx++ & SHIST_MASK ]; 209 210 shist->shist_stream_id = stream->id; 211 shist->shist_stream_flags = stream->stream_flags; 212 213 idx = stream->sm_hist_idx & SM_HIST_IDX_MASK; 214 if ('\0' == stream->sm_hist_buf[ idx ]) 215 memcpy(shist->shist_hist_buf, stream->sm_hist_buf, idx + 1); 216 else 217 { 218 memcpy(shist->shist_hist_buf, 219 stream->sm_hist_buf + idx, sizeof(stream->sm_hist_buf) - idx); 220 memcpy(shist->shist_hist_buf + sizeof(shist->shist_hist_buf) - idx, 221 stream->sm_hist_buf, idx); 222 } 223} 224 225 226static const struct stream_history * 227find_stream_history (const struct full_conn *conn, uint32_t stream_id) 228{ 229 const struct stream_history *shist; 230 const struct stream_history *const shist_end = 231 conn->fc_stream_histories + (1 << SHIST_BITS); 232 for (shist = conn->fc_stream_histories; shist < shist_end; ++shist) 233 if (shist->shist_stream_id == stream_id) 234 return shist; 235 return NULL; 236} 237 238 239# define SAVE_STREAM_HISTORY(conn, stream) save_stream_history(conn, stream) 240#else 241# define SAVE_STREAM_HISTORY(conn, stream) 
242#endif 243 244static unsigned 245highest_bit_set (unsigned sz) 246{ 247#if __GNUC__ 248 unsigned clz = __builtin_clz(sz); 249 return 31 - clz; 250#else 251 unsigned n, y; 252 n = 32; 253 y = sz >> 16; if (y) { n -= 16; sz = y; } 254 y = sz >> 8; if (y) { n -= 8; sz = y; } 255 y = sz >> 4; if (y) { n -= 4; sz = y; } 256 y = sz >> 2; if (y) { n -= 2; sz = y; } 257 y = sz >> 1; if (y) return 31 - n + 2; 258 return 31 - n + sz; 259#endif 260} 261 262 263static void 264set_versions (struct full_conn *conn, unsigned versions) 265{ 266 conn->fc_ver_neg.vn_supp = versions; 267 conn->fc_ver_neg.vn_ver = highest_bit_set(versions); 268 conn->fc_ver_neg.vn_buf = lsquic_ver2tag(conn->fc_ver_neg.vn_ver); 269 conn->fc_conn.cn_version = conn->fc_ver_neg.vn_ver; 270 LSQ_DEBUG("negotiating version %s", 271 lsquic_ver2str[conn->fc_ver_neg.vn_ver]); 272} 273 274 275static void 276init_ver_neg (struct full_conn *conn, unsigned versions) 277{ 278 set_versions(conn, versions); 279 conn->fc_ver_neg.vn_tag = &conn->fc_ver_neg.vn_buf; 280 conn->fc_ver_neg.vn_state = VN_START; 281} 282 283 284/* If peer supplies odd values, we abort the connection immediately rather 285 * that wait for it to finish "naturally" due to inability to send things. 
286 */ 287static void 288conn_on_peer_config (struct full_conn *conn, unsigned peer_cfcw, 289 unsigned peer_sfcw, unsigned max_streams_out) 290{ 291 lsquic_stream_t *stream; 292 struct lsquic_hash_elem *el; 293 294 LSQ_INFO("Applying peer config: cfcw: %u; sfcw: %u; # streams: %u", 295 peer_cfcw, peer_sfcw, max_streams_out); 296 297 if (peer_cfcw < conn->fc_pub.conn_cap.cc_sent) 298 { 299 ABORT_ERROR("peer specified CFCW=%u bytes, which is smaller than " 300 "the amount of data already sent on this connection (%"PRIu64 301 " bytes)", peer_cfcw, conn->fc_pub.conn_cap.cc_sent); 302 return; 303 } 304 305 conn->fc_cfg.max_streams_out = max_streams_out; 306 conn->fc_pub.conn_cap.cc_max = peer_cfcw; 307 308 for (el = lsquic_hash_first(conn->fc_streams); el; 309 el = lsquic_hash_next(conn->fc_streams)) 310 { 311 stream = lsquic_hashelem_getdata(el); 312 if (0 != lsquic_stream_set_max_send_off(stream, peer_sfcw)) 313 { 314 ABORT_ERROR("cannot set peer-supplied SFCW=%u on stream %u", 315 peer_sfcw, stream->id); 316 return; 317 } 318 } 319 320 conn->fc_cfg.max_stream_send = peer_sfcw; 321} 322 323 324static int 325send_smhl (const struct full_conn *conn) 326{ 327 uint32_t smhl; 328 return conn->fc_conn.cn_enc_session 329 && (conn->fc_conn.cn_flags & LSCONN_HANDSHAKE_DONE) 330 && 0 == conn->fc_conn.cn_esf->esf_get_peer_setting( 331 conn->fc_conn.cn_enc_session, QTAG_SMHL, &smhl) 332 && 1 == smhl; 333} 334 335 336/* Once handshake has been completed, send settings to peer if appropriate. 
337 */ 338static void 339maybe_send_settings (struct full_conn *conn) 340{ 341 struct lsquic_http2_setting settings[2]; 342 unsigned n_settings = 0; 343 344 if (conn->fc_settings->es_max_header_list_size && send_smhl(conn)) 345 { 346 settings[n_settings].id = SETTINGS_MAX_HEADER_LIST_SIZE; 347 settings[n_settings].value = conn->fc_settings->es_max_header_list_size; 348 LSQ_DEBUG("sending settings SETTINGS_MAX_HEADER_LIST_SIZE=%u", 349 settings[n_settings].value); 350 ++n_settings; 351 } 352 if (!(conn->fc_flags & FC_SERVER) && !conn->fc_settings->es_support_push) 353 { 354 settings[n_settings].id = SETTINGS_ENABLE_PUSH; 355 settings[n_settings].value = 0; 356 LSQ_DEBUG("sending settings SETTINGS_ENABLE_PUSH=%u", 357 settings[n_settings].value); 358 ++n_settings; 359 } 360 361 if (n_settings) 362 { 363 if (0 != lsquic_headers_stream_send_settings(conn->fc_pub.hs, 364 settings, n_settings)) 365 ABORT_ERROR("could not send settings"); 366 } 367 else 368 LSQ_DEBUG("not sending any settings"); 369} 370 371 372static int 373apply_peer_settings (struct full_conn *conn) 374{ 375 uint32_t cfcw, sfcw, mids; 376 unsigned n; 377 const struct { 378 uint32_t tag; 379 uint32_t *val; 380 const char *tag_str; 381 } tags[] = { 382 { QTAG_CFCW, &cfcw, "CFCW", }, 383 { QTAG_SFCW, &sfcw, "SFCW", }, 384 { QTAG_MIDS, &mids, "MIDS", }, 385 }; 386 387#ifndef NDEBUG 388 if (getenv("LSQUIC_TEST_ENGINE_DTOR")) 389 return 0; 390#endif 391 392 for (n = 0; n < sizeof(tags) / sizeof(tags[0]); ++n) 393 if (0 != conn->fc_conn.cn_esf->esf_get_peer_setting( 394 conn->fc_conn.cn_enc_session, tags[n].tag, tags[n].val)) 395 { 396 LSQ_INFO("peer did not supply value for %s", tags[n].tag_str); 397 return -1; 398 } 399 400 LSQ_DEBUG("peer settings: CFCW: %u; SFCW: %u; MIDS: %u", 401 cfcw, sfcw, mids); 402 conn_on_peer_config(conn, cfcw, sfcw, mids); 403 if (conn->fc_flags & FC_HTTP) 404 maybe_send_settings(conn); 405 return 0; 406} 407 408 409static const struct conn_iface full_conn_iface; 410 411static 
struct full_conn * 412new_conn_common (lsquic_cid_t cid, struct lsquic_engine_public *enpub, 413 const struct lsquic_stream_if *stream_if, 414 void *stream_if_ctx, unsigned flags, 415 unsigned short max_packet_size) 416{ 417 struct full_conn *conn; 418 lsquic_stream_t *headers_stream; 419 int saved_errno; 420 421 assert(0 == (flags & ~(FC_SERVER|FC_HTTP))); 422 423 conn = calloc(1, sizeof(*conn)); 424 if (!conn) 425 return NULL; 426 headers_stream = NULL; 427 conn->fc_conn.cn_cid = cid; 428 conn->fc_conn.cn_pack_size = max_packet_size; 429 conn->fc_flags = flags; 430 conn->fc_enpub = enpub; 431 conn->fc_pub.enpub = enpub; 432 conn->fc_pub.mm = &enpub->enp_mm; 433 conn->fc_pub.lconn = &conn->fc_conn; 434 conn->fc_pub.send_ctl = &conn->fc_send_ctl; 435 conn->fc_pub.packet_out_malo = 436 lsquic_malo_create(sizeof(struct lsquic_packet_out)); 437 conn->fc_stream_ifs[STREAM_IF_STD].stream_if = stream_if; 438 conn->fc_stream_ifs[STREAM_IF_STD].stream_if_ctx = stream_if_ctx; 439 conn->fc_settings = &enpub->enp_settings; 440 /* Calculate maximum number of incoming streams using the same mechanism 441 * and parameters as found in Chrome: 442 */ 443 conn->fc_cfg.max_streams_in = 444 (unsigned) ((float) enpub->enp_settings.es_max_streams_in * 1.1f); 445 if (conn->fc_cfg.max_streams_in < 446 enpub->enp_settings.es_max_streams_in + 10) 447 conn->fc_cfg.max_streams_in = 448 enpub->enp_settings.es_max_streams_in + 10; 449 /* `max_streams_out' gets reset when handshake is complete and we 450 * learn of peer settings. 100 seems like a sane default value 451 * because it is what other implementations use. In server mode, 452 * we do not open any streams until the handshake is complete; in 453 * client mode, we are limited to 98 outgoing requests alongside 454 * handshake and headers streams. 
455 */ 456 conn->fc_cfg.max_streams_out = 100; 457 TAILQ_INIT(&conn->fc_pub.sending_streams); 458 TAILQ_INIT(&conn->fc_pub.rw_streams); 459 TAILQ_INIT(&conn->fc_pub.service_streams); 460 lsquic_conn_cap_init(&conn->fc_pub.conn_cap, LSQUIC_MIN_FCW); 461 lsquic_alarmset_init(&conn->fc_alset, cid); 462 lsquic_alarmset_init_alarm(&conn->fc_alset, AL_IDLE, idle_alarm_expired, conn); 463 lsquic_alarmset_init_alarm(&conn->fc_alset, AL_ACK, ack_alarm_expired, conn); 464 lsquic_alarmset_init_alarm(&conn->fc_alset, AL_PING, ping_alarm_expired, conn); 465 lsquic_alarmset_init_alarm(&conn->fc_alset, AL_HANDSHAKE, handshake_alarm_expired, conn); 466 lsquic_set32_init(&conn->fc_closed_stream_ids[0]); 467 lsquic_set32_init(&conn->fc_closed_stream_ids[1]); 468 lsquic_cfcw_init(&conn->fc_pub.cfcw, &conn->fc_pub, conn->fc_settings->es_cfcw); 469 lsquic_send_ctl_init(&conn->fc_send_ctl, &conn->fc_alset, conn->fc_enpub, 470 &conn->fc_ver_neg, &conn->fc_pub, conn->fc_conn.cn_pack_size); 471 472 conn->fc_streams = lsquic_hash_create(); 473 if (!conn->fc_streams) 474 goto cleanup_on_error; 475 lsquic_rechist_init(&conn->fc_rechist, cid); 476 if (conn->fc_flags & FC_HTTP) 477 { 478 conn->fc_pub.hs = lsquic_headers_stream_new( 479 !!(conn->fc_flags & FC_SERVER), conn->fc_pub.mm, conn->fc_settings, 480 &headers_callbacks, conn); 481 if (!conn->fc_pub.hs) 482 goto cleanup_on_error; 483 conn->fc_stream_ifs[STREAM_IF_HDR].stream_if = lsquic_headers_stream_if; 484 conn->fc_stream_ifs[STREAM_IF_HDR].stream_if_ctx = conn->fc_pub.hs; 485 headers_stream = new_stream(conn, LSQUIC_STREAM_HEADERS); 486 if (!headers_stream) 487 goto cleanup_on_error; 488 } 489 else 490 { 491 conn->fc_stream_ifs[STREAM_IF_HDR].stream_if = stream_if; 492 conn->fc_stream_ifs[STREAM_IF_HDR].stream_if_ctx = stream_if_ctx; 493 } 494 if (conn->fc_settings->es_support_push) 495 conn->fc_flags |= FC_SUPPORT_PUSH; 496 conn->fc_conn.cn_if = &full_conn_iface; 497 return conn; 498 499 cleanup_on_error: 500 saved_errno = errno; 501 
502 if (conn->fc_streams) 503 lsquic_hash_destroy(conn->fc_streams); 504 lsquic_rechist_cleanup(&conn->fc_rechist); 505 if (conn->fc_flags & FC_HTTP) 506 { 507 if (conn->fc_pub.hs) 508 lsquic_headers_stream_destroy(conn->fc_pub.hs); 509 if (headers_stream) 510 lsquic_stream_destroy(headers_stream); 511 } 512 memset(conn, 0, sizeof(*conn)); 513 free(conn); 514 515 errno = saved_errno; 516 return NULL; 517} 518 519 520struct lsquic_conn * 521full_conn_client_new (struct lsquic_engine_public *enpub, 522 const struct lsquic_stream_if *stream_if, 523 void *stream_if_ctx, unsigned flags, 524 const char *hostname, unsigned short max_packet_size) 525{ 526 struct full_conn *conn; 527 enum lsquic_version version; 528 lsquic_cid_t cid; 529 const struct enc_session_funcs *esf; 530 531 version = highest_bit_set(enpub->enp_settings.es_versions); 532 esf = select_esf_by_ver(version); 533 cid = esf->esf_generate_cid(); 534 conn = new_conn_common(cid, enpub, stream_if, stream_if_ctx, flags, 535 max_packet_size); 536 if (!conn) 537 return NULL; 538 conn->fc_conn.cn_esf = esf; 539 conn->fc_conn.cn_enc_session = 540 conn->fc_conn.cn_esf->esf_create_client(hostname, cid, conn->fc_enpub); 541 if (!conn->fc_conn.cn_enc_session) 542 { 543 LSQ_WARN("could not create enc session: %s", strerror(errno)); 544 conn->fc_conn.cn_if->ci_destroy(&conn->fc_conn); 545 return NULL; 546 } 547 548 if (conn->fc_flags & FC_HTTP) 549 conn->fc_last_stream_id = LSQUIC_STREAM_HEADERS; /* Client goes 5, 7, 9.... 
*/ 550 else 551 conn->fc_last_stream_id = LSQUIC_STREAM_HANDSHAKE; 552 conn->fc_hsk_ctx.client.lconn = &conn->fc_conn; 553 conn->fc_hsk_ctx.client.mm = &enpub->enp_mm; 554 conn->fc_hsk_ctx.client.ver_neg = &conn->fc_ver_neg; 555 conn->fc_stream_ifs[STREAM_IF_HSK] 556 .stream_if = &lsquic_client_hsk_stream_if; 557 conn->fc_stream_ifs[STREAM_IF_HSK].stream_if_ctx = &conn->fc_hsk_ctx.client; 558 init_ver_neg(conn, conn->fc_settings->es_versions); 559 conn->fc_conn.cn_pf = select_pf_by_ver(conn->fc_ver_neg.vn_ver); 560 if (conn->fc_settings->es_handshake_to) 561 lsquic_alarmset_set(&conn->fc_alset, AL_HANDSHAKE, 562 lsquic_time_now() + conn->fc_settings->es_handshake_to); 563 if (!new_stream(conn, LSQUIC_STREAM_HANDSHAKE)) 564 { 565 LSQ_WARN("could not create handshake stream: %s", strerror(errno)); 566 conn->fc_conn.cn_if->ci_destroy(&conn->fc_conn); 567 return NULL; 568 } 569 conn->fc_flags |= FC_CREATED_OK; 570 conn->fc_conn_ctx = stream_if->on_new_conn(stream_if_ctx, &conn->fc_conn); 571 LSQ_INFO("Created new client connection"); 572 EV_LOG_CONN_EVENT(cid, "created full connection"); 573 return &conn->fc_conn; 574} 575 576 577static int 578is_our_stream (const struct full_conn *conn, const lsquic_stream_t *stream) 579{ 580 int is_server = !!(conn->fc_flags & FC_SERVER); 581 return (1 & stream->id) ^ is_server; 582} 583 584 585static unsigned 586count_streams (const struct full_conn *conn, int peer) 587{ 588 const lsquic_stream_t *stream; 589 unsigned count; 590 int ours; 591 int is_server; 592 struct lsquic_hash_elem *el; 593 594 peer = !!peer; 595 is_server = !!(conn->fc_flags & FC_SERVER); 596 count = 0; 597 598 for (el = lsquic_hash_first(conn->fc_streams); el; 599 el = lsquic_hash_next(conn->fc_streams)) 600 { 601 stream = lsquic_hashelem_getdata(el); 602 ours = (1 & stream->id) ^ is_server; 603 if (ours ^ peer) 604 count += !lsquic_stream_is_closed(stream); 605 } 606 607 return count; 608} 609 610 611static void 612full_conn_ci_destroy (lsquic_conn_t *lconn) 
613{ 614 struct full_conn *conn = (struct full_conn *) lconn; 615 struct lsquic_hash_elem *el; 616 struct lsquic_stream *stream; 617 618 LSQ_DEBUG("destroy connection"); 619 conn->fc_flags |= FC_CLOSING; 620 lsquic_set32_cleanup(&conn->fc_closed_stream_ids[0]); 621 lsquic_set32_cleanup(&conn->fc_closed_stream_ids[1]); 622 for (el = lsquic_hash_first(conn->fc_streams); el; 623 el = lsquic_hash_next(conn->fc_streams)) 624 { 625 stream = lsquic_hashelem_getdata(el); 626 lsquic_stream_destroy(stream); 627 } 628 lsquic_hash_destroy(conn->fc_streams); 629 if (conn->fc_flags & FC_CREATED_OK) 630 conn->fc_stream_ifs[STREAM_IF_STD].stream_if 631 ->on_conn_closed(&conn->fc_conn); 632 if (conn->fc_pub.hs) 633 lsquic_headers_stream_destroy(conn->fc_pub.hs); 634 635 lsquic_send_ctl_cleanup(&conn->fc_send_ctl); 636 lsquic_rechist_cleanup(&conn->fc_rechist); 637 if (conn->fc_conn.cn_enc_session) 638 conn->fc_conn.cn_esf->esf_destroy(conn->fc_conn.cn_enc_session); 639 lsquic_malo_destroy(conn->fc_pub.packet_out_malo); 640#if FULL_CONN_STATS 641 LSQ_NOTICE("received %u packets, of which %u were not decryptable, %u were " 642 "dups and %u were errors; sent %u packets, avg stream data per outgoing" 643 " packet is %lu bytes", 644 conn->fc_stats.n_all_packets_in, conn->fc_stats.n_undec_packets, 645 conn->fc_stats.n_dup_packets, conn->fc_stats.n_err_packets, 646 conn->fc_stats.n_packets_out, 647 conn->fc_stats.stream_data_sz / conn->fc_stats.n_packets_out); 648#endif 649 EV_LOG_CONN_EVENT(LSQUIC_LOG_CONN_ID, "full connection destroyed"); 650 free(conn); 651} 652 653 654static void 655conn_mark_stream_closed (struct full_conn *conn, uint32_t stream_id) 656{ /* Because stream IDs are distributed unevenly -- there is a set of odd 657 * stream IDs and a set of even stream IDs -- it is more efficient to 658 * maintain two sets of closed stream IDs. 
659 */ 660 int idx = stream_id & 1; 661 stream_id >>= 1; 662 if (0 != lsquic_set32_add(&conn->fc_closed_stream_ids[idx], stream_id)) 663 ABORT_ERROR("could not add element to set: %s", strerror(errno)); 664} 665 666 667static int 668conn_is_stream_closed (struct full_conn *conn, uint32_t stream_id) 669{ 670 int idx = stream_id & 1; 671 stream_id >>= 1; 672 return lsquic_set32_has(&conn->fc_closed_stream_ids[idx], stream_id); 673} 674 675 676static void 677set_ack_timer (struct full_conn *conn, lsquic_time_t now) 678{ 679 lsquic_alarmset_set(&conn->fc_alset, AL_ACK, now + ACK_TIMEOUT); 680 LSQ_DEBUG("ACK alarm set to %"PRIu64, now + ACK_TIMEOUT); 681} 682 683 684static void 685ack_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now) 686{ 687 struct full_conn *conn = ctx; 688 LSQ_DEBUG("ACK timer expired (%"PRIu64" < %"PRIu64"): ACK queued", 689 expiry, now); 690 conn->fc_flags |= FC_ACK_QUEUED; 691} 692 693 694static void 695try_queueing_ack (struct full_conn *conn, int was_missing, lsquic_time_t now) 696{ 697 if (conn->fc_n_slack_akbl >= MAX_RETR_PACKETS_SINCE_LAST_ACK || 698 (conn->fc_conn.cn_version < LSQVER_039 /* Since Q039 do not ack ACKs */ 699 && conn->fc_n_slack_all >= MAX_ANY_PACKETS_SINCE_LAST_ACK) || 700 ((conn->fc_flags & FC_ACK_HAD_MISS) && was_missing) || 701 lsquic_send_ctl_n_stop_waiting(&conn->fc_send_ctl) > 1) 702 { 703 lsquic_alarmset_unset(&conn->fc_alset, AL_ACK); 704 lsquic_send_ctl_sanity_check(&conn->fc_send_ctl); 705 conn->fc_flags |= FC_ACK_QUEUED; 706 LSQ_DEBUG("ACK queued: ackable: %u; all: %u; had_miss: %d; " 707 "was_missing: %d; n_stop_waiting: %u", 708 conn->fc_n_slack_akbl, conn->fc_n_slack_all, 709 !!(conn->fc_flags & FC_ACK_HAD_MISS), was_missing, 710 lsquic_send_ctl_n_stop_waiting(&conn->fc_send_ctl)); 711 } 712 else if (conn->fc_n_slack_akbl > 0) 713 set_ack_timer(conn, now); 714} 715 716 717static void 718reset_ack_state (struct full_conn *conn) 719{ 720 conn->fc_n_slack_all = 0; 721 conn->fc_n_slack_akbl = 0; 722 
lsquic_send_ctl_n_stop_waiting_reset(&conn->fc_send_ctl); 723 conn->fc_flags &= ~FC_ACK_QUEUED; 724 lsquic_alarmset_unset(&conn->fc_alset, AL_ACK); 725 lsquic_send_ctl_sanity_check(&conn->fc_send_ctl); 726 LSQ_DEBUG("ACK state reset"); 727} 728 729 730static lsquic_stream_t * 731new_stream_ext (struct full_conn *conn, uint32_t stream_id, int if_idx, 732 enum stream_ctor_flags stream_ctor_flags) 733{ 734 lsquic_stream_t *stream = lsquic_stream_new_ext(stream_id, &conn->fc_pub, 735 conn->fc_stream_ifs[if_idx].stream_if, 736 conn->fc_stream_ifs[if_idx].stream_if_ctx, conn->fc_settings->es_sfcw, 737 conn->fc_cfg.max_stream_send, stream_ctor_flags); 738 if (stream) 739 lsquic_hash_insert(conn->fc_streams, &stream->id, sizeof(stream->id), 740 stream); 741 return stream; 742} 743 744 745static lsquic_stream_t * 746new_stream (struct full_conn *conn, uint32_t stream_id) 747{ 748 enum stream_ctor_flags flags; 749 int idx; 750 switch (stream_id) 751 { 752 case LSQUIC_STREAM_HANDSHAKE: 753 idx = STREAM_IF_HSK; 754 flags = SCF_DI_AUTOSWITCH|SCF_CALL_ON_NEW; 755 break; 756 case LSQUIC_STREAM_HEADERS: 757 idx = STREAM_IF_HDR; 758 flags = SCF_DI_AUTOSWITCH|SCF_CALL_ON_NEW; 759 if (!(conn->fc_flags & FC_HTTP) && 760 conn->fc_enpub->enp_settings.es_rw_once) 761 flags |= SCF_DISP_RW_ONCE; 762 break; 763 default: 764 idx = STREAM_IF_STD; 765 flags = SCF_DI_AUTOSWITCH|SCF_CALL_ON_NEW; 766 if (conn->fc_enpub->enp_settings.es_rw_once) 767 flags |= SCF_DISP_RW_ONCE; 768 break; 769 } 770 return new_stream_ext(conn, stream_id, idx, flags); 771} 772 773 774static uint32_t 775generate_stream_id (struct full_conn *conn) 776{ 777 conn->fc_last_stream_id += 2; 778 return conn->fc_last_stream_id; 779} 780 781 782unsigned 783lsquic_conn_n_pending_streams (const lsquic_conn_t *lconn) 784{ 785 struct full_conn *conn = (struct full_conn *) lconn; 786 return conn->fc_n_delayed_streams; 787} 788 789 790unsigned 791lsquic_conn_cancel_pending_streams (lsquic_conn_t *lconn, unsigned n) 792{ 793 struct 
full_conn *conn = (struct full_conn *) lconn; 794 if (n > conn->fc_n_delayed_streams) 795 conn->fc_n_delayed_streams = 0; 796 else 797 conn->fc_n_delayed_streams -= n; 798 return conn->fc_n_delayed_streams; 799} 800 801 802static int 803either_side_going_away (const struct full_conn *conn) 804{ 805 return (conn->fc_flags & FC_GOING_AWAY) 806 || (conn->fc_conn.cn_flags & LSCONN_PEER_GOING_AWAY); 807} 808 809 810void 811lsquic_conn_make_stream (lsquic_conn_t *lconn) 812{ 813 struct full_conn *conn = (struct full_conn *) lconn; 814 unsigned stream_count = count_streams(conn, 0); 815 if (stream_count < conn->fc_cfg.max_streams_out) 816 { 817 if (!new_stream(conn, generate_stream_id(conn))) 818 ABORT_ERROR("could not create new stream: %s", strerror(errno)); 819 } 820 else if (either_side_going_away(conn)) 821 (void) conn->fc_stream_ifs[STREAM_IF_STD].stream_if->on_new_stream( 822 conn->fc_stream_ifs[STREAM_IF_STD].stream_if_ctx, NULL); 823 else 824 { 825 ++conn->fc_n_delayed_streams; 826 LSQ_DEBUG("delayed stream creation. 
Backlog size: %u", 827 conn->fc_n_delayed_streams); 828 } 829} 830 831 832static lsquic_stream_t * 833find_stream_by_id (struct full_conn *conn, uint32_t stream_id) 834{ 835 struct lsquic_hash_elem *el; 836 el = lsquic_hash_find(conn->fc_streams, &stream_id, sizeof(stream_id)); 837 if (el) 838 return lsquic_hashelem_getdata(el); 839 else 840 return NULL; 841} 842 843 844lsquic_stream_t * 845lsquic_conn_get_stream_by_id (lsquic_conn_t *lconn, uint32_t stream_id) 846{ 847 struct full_conn *conn = (struct full_conn *) lconn; 848 return find_stream_by_id(conn, stream_id); 849} 850 851 852static unsigned 853count_zero_bytes (const unsigned char *p, size_t len) 854{ 855 const unsigned char *const end = p + len; 856 while (p < end && 0 == *p) 857 ++p; 858 return len - (end - p); 859} 860 861 862static unsigned 863process_padding_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, 864 const unsigned char *p, size_t len) 865{ 866 if (conn->fc_conn.cn_version >= LSQVER_038) 867 return count_zero_bytes(p, len); 868 if (lsquic_is_zero(p, len)) 869 { 870 EV_LOG_PADDING_FRAME_IN(LSQUIC_LOG_CONN_ID, len); 871 return len; 872 } 873 else 874 return 0; 875} 876 877 878static unsigned 879process_ping_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, 880 const unsigned char *p, size_t len) 881{ /* This frame causes ACK frame to be queued, but nothing to do here; 882 * return the length of this frame. 
883 */ 884 EV_LOG_PING_FRAME_IN(LSQUIC_LOG_CONN_ID); 885 LSQ_DEBUG("received PING"); 886 return 1; 887} 888 889 890static int 891is_peer_initiated (const struct full_conn *conn, uint32_t stream_id) 892{ 893 unsigned is_server = !!(conn->fc_flags & FC_SERVER); 894 int peer_initiated = (stream_id & 1) == is_server; 895 return peer_initiated; 896} 897 898 899static unsigned 900process_stream_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, 901 const unsigned char *p, size_t len) 902{ 903 stream_frame_t *stream_frame; 904 lsquic_stream_t *stream; 905 int parsed_len; 906 907 stream_frame = lsquic_malo_get(conn->fc_pub.mm->malo.stream_frame); 908 if (!stream_frame) 909 { 910 LSQ_WARN("could not allocate stream frame: %s", strerror(errno)); 911 return 0; 912 } 913 914 parsed_len = conn->fc_conn.cn_pf->pf_parse_stream_frame(p, len, 915 stream_frame); 916 if (parsed_len < 0) { 917 lsquic_malo_put(stream_frame); 918 return 0; 919 } 920 EV_LOG_STREAM_FRAME_IN(LSQUIC_LOG_CONN_ID, stream_frame); 921 LSQ_DEBUG("Got stream frame for stream #%u", stream_frame->stream_id); 922 if (conn->fc_flags & FC_CLOSING) 923 { 924 LSQ_DEBUG("Connection closing: ignore frame"); 925 lsquic_malo_put(stream_frame); 926 return parsed_len; 927 } 928 929 stream = find_stream_by_id(conn, stream_frame->stream_id); 930 if (!stream) 931 { 932 if (conn_is_stream_closed(conn, stream_frame->stream_id)) 933 { 934 LSQ_DEBUG("drop frame for closed stream %u", stream_frame->stream_id); 935 lsquic_malo_put(stream_frame); 936 return parsed_len; 937 } 938 if (is_peer_initiated(conn, stream_frame->stream_id)) 939 { 940 unsigned in_count = count_streams(conn, 1); 941 LSQ_DEBUG("number of peer-initiated streams: %u", in_count); 942 if (in_count >= conn->fc_cfg.max_streams_in) 943 { 944 ABORT_ERROR("incoming stream would exceed limit: %u", 945 conn->fc_cfg.max_streams_in); 946 lsquic_malo_put(stream_frame); 947 return 0; 948 } 949 if ((conn->fc_flags & FC_GOING_AWAY) && 950 stream_frame->stream_id > 
conn->fc_max_peer_stream_id) 951 { 952 LSQ_DEBUG("going away: drop frame for new stream %u", 953 stream_frame->stream_id); 954 lsquic_malo_put(stream_frame); 955 return parsed_len; 956 } 957 } 958 else 959 { 960 ABORT_ERROR("frame for never-initiated stream"); 961 lsquic_malo_put(stream_frame); 962 return 0; 963 } 964 stream = new_stream(conn, stream_frame->stream_id); 965 if (!stream) 966 { 967 ABORT_ERROR("cannot create new stream: %s", strerror(errno)); 968 lsquic_malo_put(stream_frame); 969 return 0; 970 } 971 if (stream_frame->stream_id > conn->fc_max_peer_stream_id) 972 conn->fc_max_peer_stream_id = stream_frame->stream_id; 973 } 974 975 stream_frame->packet_in = lsquic_packet_in_get(packet_in); 976 if (0 != lsquic_stream_frame_in(stream, stream_frame)) 977 { 978 ABORT_ERROR("cannot insert stream frame"); 979 return 0; 980 } 981 982 return parsed_len; 983} 984 985 986static unsigned 987process_invalid_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, 988 const unsigned char *p, size_t len) 989{ 990 ABORT_ERROR("invalid frame"); 991 return 0; 992} 993 994 995/* Reset locally-initiated streams whose IDs is larger than the stream ID 996 * specified in received GOAWAY frame. 997 */ 998static void 999reset_local_streams_over_goaway (struct full_conn *conn) 1000{ 1001 const unsigned is_server = !!(conn->fc_flags & FC_SERVER); 1002 lsquic_stream_t *stream; 1003 struct lsquic_hash_elem *el; 1004 1005 for (el = lsquic_hash_first(conn->fc_streams); el; 1006 el = lsquic_hash_next(conn->fc_streams)) 1007 { 1008 stream = lsquic_hashelem_getdata(el); 1009 if (stream->id > conn->fc_goaway_stream_id && 1010 ((stream->id & 1) ^ is_server /* Locally initiated? 
*/)) 1011 { 1012 lsquic_stream_received_goaway(stream); 1013 } 1014 } 1015} 1016 1017 1018static unsigned 1019process_goaway_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, 1020 const unsigned char *p, size_t len) 1021{ 1022 uint32_t error_code, stream_id; 1023 uint16_t reason_length; 1024 const char *reason; 1025 const int parsed_len = conn->fc_conn.cn_pf->pf_parse_goaway_frame(p, len, 1026 &error_code, &stream_id, &reason_length, &reason); 1027 if (parsed_len < 0) 1028 return 0; 1029 EV_LOG_GOAWAY_FRAME_IN(LSQUIC_LOG_CONN_ID, error_code, stream_id, 1030 reason_length, reason); 1031 LSQ_DEBUG("received GOAWAY frame, last good stream ID: %u, error code: 0x%X," 1032 " reason: `%.*s'", stream_id, error_code, reason_length, reason); 1033 if (0 == (conn->fc_conn.cn_flags & LSCONN_PEER_GOING_AWAY)) 1034 { 1035 conn->fc_conn.cn_flags |= LSCONN_PEER_GOING_AWAY; 1036 conn->fc_goaway_stream_id = stream_id; 1037 if (conn->fc_stream_ifs[STREAM_IF_STD].stream_if->on_goaway_received) 1038 { 1039 LSQ_DEBUG("calling on_goaway_received"); 1040 conn->fc_stream_ifs[STREAM_IF_STD].stream_if->on_goaway_received( 1041 &conn->fc_conn); 1042 } 1043 else 1044 LSQ_DEBUG("on_goaway_received not registered"); 1045 reset_local_streams_over_goaway(conn); 1046 } 1047 else 1048 LSQ_DEBUG("ignore duplicate GOAWAY frame"); 1049 return parsed_len; 1050} 1051 1052 1053static void 1054log_invalid_ack_frame (struct full_conn *conn, const unsigned char *p, 1055 int parsed_len, const struct ack_info *acki) 1056{ 1057 char *buf; 1058 size_t sz; 1059 1060 buf = malloc(0x1000); 1061 if (buf) 1062 { 1063 lsquic_senhist_tostr(&conn->fc_send_ctl.sc_senhist, buf, 0x1000); 1064 LSQ_WARN("send history: %s", buf); 1065 hexdump(p, parsed_len, buf, 0x1000); 1066 LSQ_WARN("raw ACK frame:\n%s", buf); 1067 free(buf); 1068 } 1069 else 1070 LSQ_WARN("malloc failed"); 1071 1072 buf = acki2str(acki, &sz); 1073 if (buf) 1074 { 1075 LSQ_WARN("parsed ACK frame: %.*s", (int) sz, buf); 1076 free(buf); 1077 } 1078 
else 1079 LSQ_WARN("malloc failed"); 1080} 1081 1082 1083static unsigned 1084process_ack_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, 1085 const unsigned char *p, size_t len) 1086{ 1087 const int parsed_len = conn->fc_conn.cn_pf->pf_parse_ack_frame(p, len, 1088 conn->fc_pub.mm->acki); 1089 if (parsed_len < 0) 1090 return 0; 1091 if (packet_in->pi_packno > conn->fc_max_ack_packno) 1092 { 1093 EV_LOG_ACK_FRAME_IN(LSQUIC_LOG_CONN_ID, conn->fc_pub.mm->acki); 1094 if (0 == lsquic_send_ctl_got_ack(&conn->fc_send_ctl, 1095 conn->fc_pub.mm->acki, packet_in->pi_received)) 1096 { 1097 conn->fc_max_ack_packno = packet_in->pi_packno; 1098 if (lsquic_send_ctl_largest_ack2ed(&conn->fc_send_ctl)) 1099 lsquic_rechist_stop_wait(&conn->fc_rechist, 1100 lsquic_send_ctl_largest_ack2ed(&conn->fc_send_ctl) + 1); 1101 } 1102 else 1103 { 1104 log_invalid_ack_frame(conn, p, parsed_len, conn->fc_pub.mm->acki); 1105 ABORT_ERROR("Received invalid ACK"); 1106 } 1107 } 1108 else 1109 LSQ_DEBUG("Ignore old ack (max %"PRIu64")", conn->fc_max_ack_packno); 1110 return parsed_len; 1111} 1112 1113 1114static unsigned 1115process_stop_waiting_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, 1116 const unsigned char *p, size_t len) 1117{ 1118 lsquic_packno_t least, cutoff; 1119 enum lsquic_packno_bits bits; 1120 int parsed_len; 1121 1122 bits = lsquic_packet_in_packno_bits(packet_in); 1123 1124 if (conn->fc_flags & FC_NSTP) 1125 { 1126 LSQ_DEBUG("NSTP on: ignore STOP_WAITING frame"); 1127 parsed_len = conn->fc_conn.cn_pf->pf_skip_stop_waiting_frame(len, bits); 1128 if (parsed_len > 0) 1129 return (unsigned) parsed_len; 1130 else 1131 return 0; 1132 } 1133 1134 parsed_len = conn->fc_conn.cn_pf->pf_parse_stop_waiting_frame(p, len, 1135 packet_in->pi_packno, bits, &least); 1136 if (parsed_len < 0) 1137 return 0; 1138 1139 if (packet_in->pi_packno <= conn->fc_max_swf_packno) 1140 { 1141 LSQ_DEBUG("ignore old STOP_WAITING frame"); 1142 return parsed_len; 1143 } 1144 1145 
LSQ_DEBUG("Got STOP_WAITING frame, least unacked: %"PRIu64, least); 1146 EV_LOG_STOP_WAITING_FRAME_IN(LSQUIC_LOG_CONN_ID, least); 1147 1148 if (least > packet_in->pi_packno) 1149 { 1150 ABORT_ERROR("received invalid STOP_WAITING: %"PRIu64" is larger " 1151 "than the packet number%"PRIu64, least, packet_in->pi_packno); 1152 return 0; 1153 } 1154 1155 cutoff = lsquic_rechist_cutoff(&conn->fc_rechist); 1156 if (cutoff && least < cutoff) 1157 { 1158 ABORT_ERROR("received invalid STOP_WAITING: %"PRIu64" is smaller " 1159 "than the cutoff %"PRIu64, least, cutoff); 1160 return 0; 1161 } 1162 1163 conn->fc_max_swf_packno = packet_in->pi_packno; 1164 lsquic_rechist_stop_wait(&conn->fc_rechist, least); 1165 return parsed_len; 1166} 1167 1168 1169static unsigned 1170process_blocked_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, 1171 const unsigned char *p, size_t len) 1172{ 1173 uint32_t stream_id; 1174 const int parsed_len = conn->fc_conn.cn_pf->pf_parse_blocked_frame(p, len, 1175 &stream_id); 1176 if (parsed_len < 0) 1177 return 0; 1178 EV_LOG_BLOCKED_FRAME_IN(LSQUIC_LOG_CONN_ID, stream_id); 1179 LSQ_DEBUG("Peer reports stream %u as blocked", stream_id); 1180 return parsed_len; 1181} 1182 1183 1184static unsigned 1185process_connection_close_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, 1186 const unsigned char *p, size_t len) 1187{ 1188 lsquic_stream_t *stream; 1189 struct lsquic_hash_elem *el; 1190 uint32_t error_code; 1191 uint16_t reason_len; 1192 uint8_t reason_off; 1193 int parsed_len; 1194 1195 parsed_len = conn->fc_conn.cn_pf->pf_parse_connect_close_frame(p, len, 1196 &error_code, &reason_len, &reason_off); 1197 if (parsed_len < 0) 1198 return 0; 1199 EV_LOG_CONNECTION_CLOSE_FRAME_IN(LSQUIC_LOG_CONN_ID, error_code, 1200 (int) reason_len, (const char *) p + reason_off); 1201 LSQ_INFO("Received CONNECTION_CLOSE frame (code: %u; reason: %.*s)", 1202 error_code, (int) reason_len, (const char *) p + reason_off); 1203 conn->fc_flags |= 
FC_RECV_CLOSE; 1204 if (!(conn->fc_flags & FC_CLOSING)) 1205 { 1206 for (el = lsquic_hash_first(conn->fc_streams); el; 1207 el = lsquic_hash_next(conn->fc_streams)) 1208 { 1209 stream = lsquic_hashelem_getdata(el); 1210 lsquic_stream_shutdown_internal(stream); 1211 } 1212 conn->fc_flags |= FC_CLOSING; 1213 } 1214 return parsed_len; 1215} 1216 1217 1218static unsigned 1219process_rst_stream_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, 1220 const unsigned char *p, size_t len) 1221{ 1222 uint32_t stream_id, error_code; 1223 uint64_t offset; 1224 lsquic_stream_t *stream; 1225 const int parsed_len = conn->fc_conn.cn_pf->pf_parse_rst_frame(p, len, 1226 &stream_id, &offset, &error_code); 1227 if (parsed_len < 0) 1228 return 0; 1229 1230 EV_LOG_RST_STREAM_FRAME_IN(LSQUIC_LOG_CONN_ID, stream_id, offset, 1231 error_code); 1232 LSQ_DEBUG("Got RST_STREAM; stream: %u; offset: 0x%"PRIX64, stream_id, 1233 offset); 1234 if (0 == stream_id) 1235 { /* Follow reference implementation and ignore this apparently 1236 * invalid frame. 
1237 */ 1238 return parsed_len; 1239 } 1240 1241 if (LSQUIC_STREAM_HANDSHAKE == stream_id || 1242 ((conn->fc_flags & FC_HTTP) && LSQUIC_STREAM_HEADERS == stream_id)) 1243 { 1244 ABORT_ERROR("received reset on static stream %u", stream_id); 1245 return 0; 1246 } 1247 1248 stream = find_stream_by_id(conn, stream_id); 1249 if (!stream) 1250 { 1251 if (conn_is_stream_closed(conn, stream_id)) 1252 { 1253 LSQ_DEBUG("got reset frame for closed stream %u", stream_id); 1254 return parsed_len; 1255 } 1256 if (!is_peer_initiated(conn, stream_id)) 1257 { 1258 ABORT_ERROR("received reset for never-initiated stream %u", 1259 stream_id); 1260 return 0; 1261 } 1262 stream = new_stream(conn, stream_id); 1263 if (!stream) 1264 { 1265 ABORT_ERROR("cannot create new stream: %s", strerror(errno)); 1266 return 0; 1267 } 1268 if (stream_id > conn->fc_max_peer_stream_id) 1269 conn->fc_max_peer_stream_id = stream_id; 1270 } 1271 1272 if (0 != lsquic_stream_rst_in(stream, offset, error_code)) 1273 { 1274 ABORT_ERROR("received invalid RST_STREAM"); 1275 return 0; 1276 } 1277 lsquic_send_ctl_reset_stream(&conn->fc_send_ctl, stream_id); 1278 return parsed_len; 1279} 1280 1281 1282static unsigned 1283process_window_update_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, 1284 const unsigned char *p, size_t len) 1285{ 1286 uint32_t stream_id; 1287 uint64_t offset; 1288 const int parsed_len = 1289 conn->fc_conn.cn_pf->pf_parse_window_update_frame(p, len, 1290 &stream_id, &offset); 1291 if (parsed_len < 0) 1292 return 0; 1293 EV_LOG_WINDOW_UPDATE_FRAME_IN(LSQUIC_LOG_CONN_ID, stream_id, offset); 1294 if (stream_id) 1295 { 1296 lsquic_stream_t *stream = find_stream_by_id(conn, stream_id); 1297 if (stream) 1298 { 1299 LSQ_DEBUG("Got window update frame, stream: %u; offset: 0x%"PRIX64, 1300 stream_id, offset); 1301 lsquic_stream_window_update(stream, offset); 1302 } 1303 else /* Perhaps a result of lost packets? 
*/ 1304 LSQ_DEBUG("Got window update frame for non-existing stream %u " 1305 "(offset: 0x%"PRIX64")", stream_id, offset); 1306 } 1307 else if (offset > conn->fc_pub.conn_cap.cc_max) 1308 { 1309 conn->fc_pub.conn_cap.cc_max = offset; 1310 assert(conn->fc_pub.conn_cap.cc_max >= conn->fc_pub.conn_cap.cc_sent); 1311 LSQ_DEBUG("Connection WUF, new offset 0x%"PRIX64, offset); 1312 } 1313 else 1314 LSQ_DEBUG("Throw ouw duplicate connection WUF"); 1315 return parsed_len; 1316} 1317 1318 1319typedef unsigned (*process_frame_f)( 1320 struct full_conn *, lsquic_packet_in_t *, const unsigned char *p, size_t); 1321 1322static process_frame_f const process_frames[N_QUIC_FRAMES] = 1323{ 1324 [QUIC_FRAME_ACK] = process_ack_frame, 1325 [QUIC_FRAME_BLOCKED] = process_blocked_frame, 1326 [QUIC_FRAME_CONNECTION_CLOSE] = process_connection_close_frame, 1327 [QUIC_FRAME_GOAWAY] = process_goaway_frame, 1328 [QUIC_FRAME_INVALID] = process_invalid_frame, 1329 [QUIC_FRAME_PADDING] = process_padding_frame, 1330 [QUIC_FRAME_PING] = process_ping_frame, 1331 [QUIC_FRAME_RST_STREAM] = process_rst_stream_frame, 1332 [QUIC_FRAME_STOP_WAITING] = process_stop_waiting_frame, 1333 [QUIC_FRAME_STREAM] = process_stream_frame, 1334 [QUIC_FRAME_WINDOW_UPDATE] = process_window_update_frame, 1335}; 1336 1337static unsigned 1338process_packet_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, 1339 const unsigned char *p, size_t len) 1340{ 1341 enum QUIC_FRAME_TYPE type = conn->fc_conn.cn_pf->pf_parse_frame_type(p[0]); 1342 packet_in->pi_frame_types |= 1 << type; 1343 return process_frames[type](conn, packet_in, p, len); 1344} 1345 1346 1347static void 1348process_ver_neg_packet (struct full_conn *conn, lsquic_packet_in_t *packet_in) 1349{ 1350 int s; 1351 struct ver_iter vi; 1352 lsquic_ver_tag_t ver_tag; 1353 enum lsquic_version version; 1354 unsigned versions = 0; 1355 1356 LSQ_DEBUG("Processing version-negotiation packet"); 1357 1358 if (conn->fc_ver_neg.vn_state != VN_START) 1359 { 1360 
LSQ_DEBUG("ignore a likely duplicate version negotiation packet"); 1361 return; 1362 } 1363 1364 for (s = packet_in_ver_first(packet_in, &vi, &ver_tag); s; 1365 s = packet_in_ver_next(&vi, &ver_tag)) 1366 { 1367 version = lsquic_tag2ver(ver_tag); 1368 if (version < N_LSQVER) 1369 { 1370 versions |= 1 << version; 1371 LSQ_DEBUG("server supports version %s", lsquic_ver2str[version]); 1372 } 1373 } 1374 1375 if (versions & (1 << conn->fc_ver_neg.vn_ver)) 1376 { 1377 ABORT_ERROR("server replied with version we support: %s", 1378 lsquic_ver2str[conn->fc_ver_neg.vn_ver]); 1379 return; 1380 } 1381 1382 versions &= conn->fc_ver_neg.vn_supp; 1383 if (0 == versions) 1384 { 1385 ABORT_ERROR("client does not support any of the server-specified " 1386 "versions"); 1387 return; 1388 } 1389 1390 set_versions(conn, versions); 1391 conn->fc_ver_neg.vn_state = VN_IN_PROGRESS; 1392 lsquic_send_ctl_expire_all(&conn->fc_send_ctl); 1393} 1394 1395 1396static void 1397reconstruct_packet_number (struct full_conn *conn, lsquic_packet_in_t *packet_in) 1398{ 1399 lsquic_packno_t cur_packno, max_packno; 1400 enum lsquic_packno_bits bits; 1401 1402 cur_packno = packet_in->pi_packno; 1403 max_packno = lsquic_rechist_largest_packno(&conn->fc_rechist); 1404 bits = lsquic_packet_in_packno_bits(packet_in); 1405 packet_in->pi_packno = restore_packno(cur_packno, bits, max_packno); 1406 LSQ_DEBUG("reconstructed (bits: %u, packno: %"PRIu64", max: %"PRIu64") " 1407 "to %"PRIu64"", bits, cur_packno, max_packno, packet_in->pi_packno); 1408} 1409 1410 1411static int 1412conn_decrypt_packet (struct full_conn *conn, lsquic_packet_in_t *packet_in) 1413{ 1414 return lsquic_conn_decrypt_packet(&conn->fc_conn, conn->fc_enpub, 1415 packet_in); 1416} 1417 1418 1419static void 1420parse_regular_packet (struct full_conn *conn, lsquic_packet_in_t *packet_in) 1421{ 1422 const unsigned char *p, *pend; 1423 unsigned len; 1424 1425 p = packet_in->pi_data + packet_in->pi_header_sz; 1426 pend = packet_in->pi_data + 
packet_in->pi_data_sz; 1427 1428 while (p < pend) 1429 { 1430 len = process_packet_frame(conn, packet_in, p, pend - p); 1431 if (len > 0) 1432 p += len; 1433 else 1434 { 1435 ABORT_ERROR("Error parsing frame"); 1436 break; 1437 } 1438 } 1439} 1440 1441 1442static int 1443process_regular_packet (struct full_conn *conn, lsquic_packet_in_t *packet_in) 1444{ 1445 enum received_st st; 1446 int was_missing; 1447 1448 reconstruct_packet_number(conn, packet_in); 1449 EV_LOG_PACKET_IN(LSQUIC_LOG_CONN_ID, packet_in); 1450 1451#if FULL_CONN_STATS 1452 ++conn->fc_stats.n_all_packets_in; 1453#endif 1454 1455 /* The packet is decrypted before receive history is updated. This is 1456 * done to make sure that a bad packet won't occupy a slot in receive 1457 * history and subsequent good packet won't be marked as a duplicate. 1458 */ 1459 if (0 == (packet_in->pi_flags & PI_DECRYPTED) && 1460 0 != conn_decrypt_packet(conn, packet_in)) 1461 { 1462 LSQ_INFO("could not decrypt packet"); 1463#if FULL_CONN_STATS 1464 ++conn->fc_stats.n_undec_packets; 1465#endif 1466 return 0; 1467 } 1468 1469 st = lsquic_rechist_received(&conn->fc_rechist, packet_in->pi_packno, 1470 packet_in->pi_received); 1471 switch (st) { 1472 case REC_ST_OK: 1473 parse_regular_packet(conn, packet_in); 1474 if (0 == (conn->fc_flags & FC_ACK_QUEUED)) 1475 { 1476 was_missing = packet_in->pi_packno != 1477 lsquic_rechist_largest_packno(&conn->fc_rechist); 1478 conn->fc_n_slack_all += 1; 1479 conn->fc_n_slack_akbl += 1480 !!(packet_in->pi_frame_types & QFRAME_ACKABLE_MASK); 1481 try_queueing_ack(conn, was_missing, packet_in->pi_received); 1482 } 1483 return 0; 1484 case REC_ST_DUP: 1485#if FULL_CONN_STATS 1486 ++conn->fc_stats.n_dup_packets; 1487#endif 1488 LSQ_INFO("packet %"PRIu64" is a duplicate", packet_in->pi_packno); 1489 return 0; 1490 default: 1491 assert(0); 1492 /* Fall through */ 1493 case REC_ST_ERR: 1494#if FULL_CONN_STATS 1495 ++conn->fc_stats.n_err_packets; 1496#endif 1497 LSQ_INFO("error processing packet 
%"PRIu64, packet_in->pi_packno); 1498 return -1; 1499 } 1500} 1501 1502 1503static int 1504process_incoming_packet (struct full_conn *conn, lsquic_packet_in_t *packet_in) 1505{ 1506 LSQ_DEBUG("Processing packet %"PRIu64, packet_in->pi_packno); 1507 /* See flowchart in Section 4.1 of [draft-ietf-quic-transport-00]. We test 1508 * for the common case first. 1509 */ 1510 const unsigned flags = lsquic_packet_in_public_flags(packet_in); 1511 if (0 == (flags & (PACKET_PUBLIC_FLAGS_RST|PACKET_PUBLIC_FLAGS_VERSION))) 1512 { 1513 if (conn->fc_ver_neg.vn_tag) 1514 { 1515 assert(conn->fc_ver_neg.vn_state != VN_END); 1516 conn->fc_ver_neg.vn_state = VN_END; 1517 conn->fc_ver_neg.vn_tag = NULL; 1518 conn->fc_conn.cn_version = conn->fc_ver_neg.vn_ver; 1519 conn->fc_conn.cn_flags |= LSCONN_VER_SET; 1520 if (conn->fc_conn.cn_version >= LSQVER_037) 1521 { 1522 assert(!(conn->fc_flags & FC_NSTP)); /* This bit off at start */ 1523 if (conn->fc_settings->es_support_nstp) 1524 { 1525 conn->fc_flags |= FC_NSTP; 1526 lsquic_send_ctl_turn_nstp_on(&conn->fc_send_ctl); 1527 } 1528 } 1529 LSQ_DEBUG("end of version negotiation: agreed upon %s", 1530 lsquic_ver2str[conn->fc_ver_neg.vn_ver]); 1531 } 1532 return process_regular_packet(conn, packet_in); 1533 } 1534 else if (flags & PACKET_PUBLIC_FLAGS_RST) 1535 { 1536 LSQ_INFO("received public reset packet: aborting connection"); 1537 conn->fc_flags |= FC_GOT_PRST; 1538 return -1; 1539 } 1540 else 1541 { 1542 if (conn->fc_flags & FC_SERVER) 1543 return process_regular_packet(conn, packet_in); 1544 else if (conn->fc_ver_neg.vn_tag) 1545 { 1546 process_ver_neg_packet(conn, packet_in); 1547 return 0; 1548 } 1549 else 1550 { 1551 LSQ_DEBUG("unexpected version negotiation packet: ignore it"); 1552 return 0; 1553 } 1554 } 1555} 1556 1557 1558static void 1559idle_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now) 1560{ 1561 struct full_conn *conn = ctx; 1562 LSQ_DEBUG("connection timed out"); 1563 conn->fc_flags |= FC_TIMED_OUT; 1564} 
1565 1566 1567static void 1568handshake_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now) 1569{ 1570 struct full_conn *conn = ctx; 1571 LSQ_DEBUG("connection timed out: handshake timed out"); 1572 conn->fc_flags |= FC_TIMED_OUT; 1573} 1574 1575 1576static void 1577ping_alarm_expired (void *ctx, lsquic_time_t expiry, lsquic_time_t now) 1578{ 1579 struct full_conn *conn = ctx; 1580 LSQ_DEBUG("Ping alarm rang: schedule PING frame to be generated"); 1581 conn->fc_flags |= FC_SEND_PING; 1582} 1583 1584 1585static void 1586zero_pad_packet (lsquic_packet_out_t *packet_out) 1587{ 1588 memset(packet_out->po_data + packet_out->po_data_sz, 0, 1589 packet_out->po_n_alloc - packet_out->po_data_sz); 1590 packet_out->po_data_sz = packet_out->po_n_alloc; 1591 packet_out->po_frame_types |= 1 << QUIC_FRAME_PADDING; 1592} 1593 1594 1595#define MAX_STREAM_FRAME_HEADER (1 + 4 + 8 + 2) 1596#define MIN_STREAM_FRAME_PAYLOAD 10 /* Some sane value */ 1597 1598#define MIN(a, b) ((a) < (b) ? (a) : (b)) 1599 1600 1601static lsquic_packet_out_t * 1602get_writeable_packet (struct full_conn *conn, unsigned need_at_least) 1603{ 1604 lsquic_packet_out_t *packet_out; 1605 int is_err; 1606 1607 assert(need_at_least <= QUIC_MAX_PAYLOAD_SZ); 1608 packet_out = lsquic_send_ctl_get_writeable_packet(&conn->fc_send_ctl, 1609 need_at_least, &is_err); 1610 if (!packet_out && is_err) 1611 ABORT_ERROR("cannot allocate packet: %s", strerror(errno)); 1612 return packet_out; 1613} 1614 1615 1616static int 1617generate_wuf_stream (struct full_conn *conn, lsquic_stream_t *stream) 1618{ 1619 lsquic_packet_out_t *packet_out = get_writeable_packet(conn, QUIC_WUF_SZ); 1620 if (!packet_out) 1621 return 0; 1622 const uint64_t recv_off = lsquic_stream_fc_recv_off(stream); 1623 int sz = conn->fc_conn.cn_pf->pf_gen_window_update_frame( 1624 packet_out->po_data + packet_out->po_data_sz, 1625 lsquic_packet_out_avail(packet_out), stream->id, recv_off); 1626 if (sz < 0) { 1627 
ABORT_ERROR("gen_window_update_frame failed"); 1628 return 0; 1629 } 1630 packet_out->po_data_sz += sz; 1631 packet_out->po_frame_types |= 1 << QUIC_FRAME_WINDOW_UPDATE; 1632 LSQ_DEBUG("wrote WUF: stream %u; offset 0x%"PRIX64, stream->id, recv_off); 1633 return 1; 1634} 1635 1636 1637static void 1638generate_wuf_conn (struct full_conn *conn) 1639{ 1640 assert(conn->fc_flags & FC_SEND_WUF); 1641 lsquic_packet_out_t *packet_out = get_writeable_packet(conn, QUIC_WUF_SZ); 1642 if (!packet_out) 1643 return; 1644 const uint64_t recv_off = lsquic_cfcw_get_fc_recv_off(&conn->fc_pub.cfcw); 1645 int sz = conn->fc_conn.cn_pf->pf_gen_window_update_frame( 1646 packet_out->po_data + packet_out->po_data_sz, 1647 lsquic_packet_out_avail(packet_out), 0, recv_off); 1648 if (sz < 0) { 1649 ABORT_ERROR("gen_window_update_frame failed"); 1650 return; 1651 } 1652 packet_out->po_data_sz += sz; 1653 packet_out->po_frame_types |= 1 << QUIC_FRAME_WINDOW_UPDATE; 1654 conn->fc_flags &= ~FC_SEND_WUF; 1655 LSQ_DEBUG("wrote connection WUF: offset 0x%"PRIX64, recv_off); 1656} 1657 1658 1659static void 1660generate_goaway_frame (struct full_conn *conn) 1661{ 1662 int reason_len = 0; 1663 lsquic_packet_out_t *packet_out = 1664 get_writeable_packet(conn, QUIC_GOAWAY_FRAME_SZ + reason_len); 1665 if (!packet_out) 1666 return; 1667 int sz = conn->fc_conn.cn_pf->pf_gen_goaway_frame( 1668 packet_out->po_data + packet_out->po_data_sz, 1669 lsquic_packet_out_avail(packet_out), 0, conn->fc_max_peer_stream_id, 1670 NULL, reason_len); 1671 if (sz < 0) { 1672 ABORT_ERROR("gen_goaway_frame failed"); 1673 return; 1674 } 1675 packet_out->po_data_sz += sz; 1676 packet_out->po_frame_types |= 1 << QUIC_FRAME_GOAWAY; 1677 conn->fc_flags &= ~FC_SEND_GOAWAY; 1678 conn->fc_flags |= FC_GOAWAY_SENT; 1679 LSQ_DEBUG("wrote GOAWAY frame: stream id: %u", conn->fc_max_peer_stream_id); 1680} 1681 1682 1683static void 1684generate_connection_close_packet (struct full_conn *conn) 1685{ 1686 lsquic_packet_out_t *packet_out; 1687 
1688 packet_out = lsquic_send_ctl_new_packet_out(&conn->fc_send_ctl, 0); 1689 if (!packet_out) 1690 { 1691 ABORT_ERROR("cannot allocate packet: %s", strerror(errno)); 1692 return; 1693 } 1694 1695 lsquic_send_ctl_scheduled_one(&conn->fc_send_ctl, packet_out); 1696 int sz = conn->fc_conn.cn_pf->pf_gen_connect_close_frame(packet_out->po_data + packet_out->po_data_sz, 1697 lsquic_packet_out_avail(packet_out), 16 /* PEER_GOING_AWAY */, 1698 NULL, 0); 1699 if (sz < 0) { 1700 ABORT_ERROR("generate_connection_close_packet failed"); 1701 return; 1702 } 1703 packet_out->po_data_sz += sz; 1704 packet_out->po_frame_types |= 1 << QUIC_FRAME_CONNECTION_CLOSE; 1705 LSQ_DEBUG("generated CONNECTION_CLOSE frame in its own packet"); 1706} 1707 1708 1709static int 1710generate_blocked_frame (struct full_conn *conn, uint32_t stream_id) 1711{ 1712 lsquic_packet_out_t *packet_out = 1713 get_writeable_packet(conn, QUIC_BLOCKED_FRAME_SZ); 1714 if (!packet_out) 1715 return 0; 1716 int sz = conn->fc_conn.cn_pf->pf_gen_blocked_frame( 1717 packet_out->po_data + packet_out->po_data_sz, 1718 lsquic_packet_out_avail(packet_out), stream_id); 1719 if (sz < 0) { 1720 ABORT_ERROR("gen_blocked_frame failed"); 1721 return 0; 1722 } 1723 packet_out->po_data_sz += sz; 1724 packet_out->po_frame_types |= 1 << QUIC_FRAME_BLOCKED; 1725 LSQ_DEBUG("wrote blocked frame: stream %u", stream_id); 1726 return 1; 1727} 1728 1729 1730static int 1731generate_stream_blocked_frame (struct full_conn *conn, lsquic_stream_t *stream) 1732{ 1733 if (generate_blocked_frame(conn, stream->id)) 1734 { 1735 lsquic_stream_blocked_frame_sent(stream); 1736 return 1; 1737 } 1738 else 1739 return 0; 1740} 1741 1742 1743static int 1744generate_rst_stream_frame (struct full_conn *conn, lsquic_stream_t *stream) 1745{ 1746 lsquic_packet_out_t *packet_out; 1747 int sz, s; 1748 1749 packet_out = get_writeable_packet(conn, QUIC_RST_STREAM_SZ); 1750 if (!packet_out) 1751 return 0; 1752 sz = conn->fc_conn.cn_pf->pf_gen_rst_frame( 1753 
packet_out->po_data + packet_out->po_data_sz, 1754 lsquic_packet_out_avail(packet_out), stream->id, 1755 stream->tosend_off, stream->error_code); 1756 if (sz < 0) { 1757 ABORT_ERROR("gen_rst_frame failed"); 1758 return 0; 1759 } 1760 packet_out->po_data_sz += sz; 1761 packet_out->po_frame_types |= 1 << QUIC_FRAME_RST_STREAM; 1762 s = lsquic_packet_out_add_stream(packet_out, conn->fc_pub.mm, stream, 1763 QUIC_FRAME_RST_STREAM, 0); 1764 if (s != 0) 1765 { 1766 ABORT_ERROR("adding stream to packet failed: %s", strerror(errno)); 1767 return 0; 1768 } 1769 lsquic_stream_rst_frame_sent(stream); 1770 LSQ_DEBUG("wrote RST: stream %u; offset 0x%"PRIX64"; error code 0x%X", 1771 stream->id, stream->tosend_off, stream->error_code); 1772 return 1; 1773} 1774 1775 1776static void 1777generate_ping_frame (struct full_conn *conn) 1778{ 1779 lsquic_packet_out_t *packet_out = get_writeable_packet(conn, 1); 1780 if (!packet_out) 1781 { 1782 LSQ_DEBUG("cannot get writeable packet for PING frame"); 1783 return; 1784 } 1785 int sz = conn->fc_conn.cn_pf->pf_gen_ping_frame( 1786 packet_out->po_data + packet_out->po_data_sz, 1787 lsquic_packet_out_avail(packet_out)); 1788 if (sz < 0) { 1789 ABORT_ERROR("gen_blocked_frame failed"); 1790 return; 1791 } 1792 packet_out->po_data_sz += sz; 1793 packet_out->po_frame_types |= 1 << QUIC_FRAME_PING; 1794 LSQ_DEBUG("wrote PING frame"); 1795} 1796 1797 1798static void 1799generate_stop_waiting_frame (struct full_conn *conn) 1800{ 1801 assert(conn->fc_flags & FC_SEND_STOP_WAITING); 1802 1803 int sz; 1804 unsigned packnum_len; 1805 lsquic_packno_t least_unacked; 1806 lsquic_packet_out_t *packet_out; 1807 1808 /* Get packet that has room for the minimum size STOP_WAITING frame: */ 1809 packet_out = get_writeable_packet(conn, 1 + packno_bits2len(PACKNO_LEN_1)); 1810 if (!packet_out) 1811 return; 1812 1813 /* Now calculate number of bytes we really need. If there is not enough 1814 * room in the current packet, get a new one. 
1815 */ 1816 packnum_len = packno_bits2len(lsquic_packet_out_packno_bits(packet_out)); 1817 if ((unsigned) lsquic_packet_out_avail(packet_out) < 1 + packnum_len) 1818 { 1819 packet_out = get_writeable_packet(conn, 1 + packnum_len); 1820 if (!packet_out) 1821 return; 1822 /* Here, a new packet has been allocated, The number of bytes needed 1823 * to represent packet number in the STOP_WAITING frame may have 1824 * increased. However, this does not matter, because the newly 1825 * allocated packet must have room for a STOP_WAITING frame of any 1826 * size. 1827 */ 1828 } 1829 1830 least_unacked = lsquic_send_ctl_smallest_unacked(&conn->fc_send_ctl); 1831 sz = conn->fc_conn.cn_pf->pf_gen_stop_waiting_frame( 1832 packet_out->po_data + packet_out->po_data_sz, 1833 lsquic_packet_out_avail(packet_out), packet_out->po_packno, 1834 lsquic_packet_out_packno_bits(packet_out), least_unacked); 1835 if (sz < 0) { 1836 ABORT_ERROR("gen_stop_waiting_frame failed"); 1837 return; 1838 } 1839 packet_out->po_data_sz += sz; 1840 packet_out->po_regen_sz += sz; 1841 packet_out->po_frame_types |= 1 << QUIC_FRAME_STOP_WAITING; 1842 conn->fc_flags &= ~FC_SEND_STOP_WAITING; 1843 LSQ_DEBUG("wrote STOP_WAITING frame: least unacked: %"PRIu64, 1844 least_unacked); 1845 EV_LOG_GENERATED_STOP_WAITING_FRAME(LSQUIC_LOG_CONN_ID, least_unacked); 1846} 1847 1848 1849static int 1850generate_stream_frame (struct full_conn *conn, lsquic_stream_t *stream) 1851{ 1852 lsquic_packet_out_t *packet_out; 1853 size_t n_to_send, need_at_least; 1854 int len, s; 1855 unsigned short off; 1856 1857 /* Send one packet's worth */ 1858 n_to_send = lsquic_stream_tosend_sz(stream); 1859 assert(n_to_send > 0 || lsquic_stream_tosend_fin(stream)); /* Otherwise, why are we called? 
*/ 1860 need_at_least = 1861 MAX_STREAM_FRAME_HEADER + MIN(MIN_STREAM_FRAME_PAYLOAD, n_to_send); 1862 packet_out = get_writeable_packet(conn, need_at_least); 1863 if (!packet_out) 1864 return 0; 1865 off = packet_out->po_data_sz; 1866 len = conn->fc_conn.cn_pf->pf_gen_stream_frame( 1867 packet_out->po_data + packet_out->po_data_sz, 1868 lsquic_packet_out_avail(packet_out), stream->id, 1869 lsquic_stream_tosend_offset(stream), 1870 (gsf_fin_f) lsquic_stream_tosend_fin, 1871 (gsf_size_f) lsquic_stream_tosend_sz, 1872 (gsf_read_f) lsquic_stream_tosend_read, stream); 1873 if (len < 0) 1874 { 1875 ABORT_ERROR("gen_stream_frame failed"); 1876 return 0; 1877 } 1878#if FULL_CONN_STATS 1879 conn->fc_stats.stream_data_sz += len - 1880 conn->fc_conn.cn_pf->pf_parse_stream_frame_header_sz( 1881 packet_out->po_data[packet_out->po_data_sz]); 1882#endif 1883 EV_LOG_GENERATED_STREAM_FRAME(LSQUIC_LOG_CONN_ID, conn->fc_conn.cn_pf, 1884 packet_out->po_data + packet_out->po_data_sz, len); 1885 packet_out->po_data_sz += len; 1886 packet_out->po_frame_types |= 1 << QUIC_FRAME_STREAM; 1887 s = lsquic_packet_out_add_stream(packet_out, conn->fc_pub.mm, stream, 1888 QUIC_FRAME_STREAM, off); 1889 if (s != 0) 1890 { 1891 ABORT_ERROR("adding stream to packet failed: %s", strerror(errno)); 1892 return 0; 1893 } 1894 lsquic_stream_stream_frame_sent(stream); 1895 LSQ_DEBUG("Put %d bytes of data from stream %u into packet on outgoing " 1896 "queue", len, stream->id); 1897 return 1; 1898} 1899 1900 1901/* TODO I think we can separate this function into two functions: one that 1902 * processes STREAM_SEND_WUF, STREAM_SEND_BLOCKED, and STREAM_SEND_RST 1903 * queues, and one that processes STREAM_SEND_DATA queue, as the four flags 1904 * are not dependent upon each other. Double-check this and implement. 
1905 */ 1906static int 1907process_stream_ready_to_send (struct full_conn *conn, lsquic_stream_t *stream, 1908 int send_data) 1909{ 1910 int r = 1, written; 1911 if (stream->stream_flags & STREAM_SEND_WUF) 1912 r &= generate_wuf_stream(conn, stream); 1913 if (send_data && (stream->stream_flags & STREAM_SEND_DATA)) 1914 { 1915 if (LSQUIC_STREAM_HANDSHAKE == stream->id) 1916 { 1917 /* Handshake messages are sent in brand-new packets. If handshake 1918 * is not complete, the packet is zero-padded. 1919 */ 1920 lsquic_packet_out_t *packet_out = get_writeable_packet(conn, 0); 1921 written = generate_stream_frame(conn, stream); 1922 r &= written; 1923 if (written) 1924 { 1925 packet_out->po_flags |= PO_HELLO; 1926 LSQ_DEBUG("packet %"PRIu64" is carrying CHLO data", 1927 packet_out->po_packno); 1928 if (!(conn->fc_conn.cn_flags & LSCONN_HANDSHAKE_DONE)) 1929 { 1930 zero_pad_packet(packet_out); 1931 LSQ_DEBUG("zero-padded packet %"PRIu64, 1932 packet_out->po_packno); 1933 } 1934 } 1935 assert(packet_out == 1936 lsquic_send_ctl_last_scheduled(&conn->fc_send_ctl)); 1937 } 1938 else 1939 r &= generate_stream_frame(conn, stream); 1940 } 1941 if (stream->stream_flags & STREAM_SEND_BLOCKED) 1942 r &= generate_stream_blocked_frame(conn, stream); 1943 if (stream->stream_flags & STREAM_SEND_RST) 1944 r &= generate_rst_stream_frame(conn, stream); 1945 return r; 1946} 1947 1948 1949/* Return 1 if any STREAM frames were packetized, 0 otherwise. 
*/ 1950static int 1951process_streams_ready_to_send (struct full_conn *conn) 1952{ 1953 lsquic_stream_t *stream; 1954 struct stream_prio_iter spi; 1955 int stream_frames_packetized; 1956 1957 assert(!TAILQ_EMPTY(&conn->fc_pub.sending_streams)); 1958 1959 lsquic_spi_init(&spi, TAILQ_FIRST(&conn->fc_pub.sending_streams), 1960 TAILQ_LAST(&conn->fc_pub.sending_streams, lsquic_streams_tailq), 1961 (uintptr_t) &TAILQ_NEXT((lsquic_stream_t *) NULL, next_send_stream), 1962 STREAM_SENDING_FLAGS, conn->fc_conn.cn_cid, "send"); 1963 1964 /* The first iteration takes care of all WINDOW_UPDATE, BLOCKED, and 1965 * RST_STREAM frames. After that, we turn on priority level exhaustion 1966 * mechanism in the iterator and send data. 1967 */ 1968 1969 for (stream = lsquic_spi_first(&spi); stream; 1970 stream = lsquic_spi_next(&spi)) 1971 if (!process_stream_ready_to_send(conn, stream, 0)) 1972 return 0; 1973 1974 lsquic_spi_exhaust_on(&spi); 1975 1976 stream_frames_packetized = 0; 1977 for (stream = lsquic_spi_first(&spi); stream; 1978 stream = lsquic_spi_next(&spi)) 1979 if (process_stream_ready_to_send(conn, stream, 1)) 1980 ++stream_frames_packetized; 1981 else 1982 break; 1983 1984 return stream_frames_packetized > 0; 1985} 1986 1987 1988static void 1989service_streams (struct full_conn *conn) 1990{ 1991 struct lsquic_hash_elem *el; 1992 lsquic_stream_t *stream, *next; 1993 int n_our_destroyed = 0; 1994 1995 for (stream = TAILQ_FIRST(&conn->fc_pub.service_streams); stream; stream = next) 1996 { 1997 next = TAILQ_NEXT(stream, next_service_stream); 1998 if (stream->stream_flags & STREAM_CALL_ONCLOSE) 1999 lsquic_stream_call_on_close(stream); 2000 if (stream->stream_flags & STREAM_FREE_STREAM) 2001 { 2002 n_our_destroyed += is_our_stream(conn, stream); 2003 TAILQ_REMOVE(&conn->fc_pub.service_streams, stream, next_service_stream); 2004 el = lsquic_hash_find(conn->fc_streams, &stream->id, sizeof(stream->id)); 2005 if (el) 2006 lsquic_hash_erase(conn->fc_streams, el); 2007 
conn_mark_stream_closed(conn, stream->id); 2008 SAVE_STREAM_HISTORY(conn, stream); 2009 lsquic_stream_destroy(stream); 2010 } 2011 } 2012 2013 if (either_side_going_away(conn)) 2014 while (conn->fc_n_delayed_streams) 2015 { 2016 --conn->fc_n_delayed_streams; 2017 LSQ_DEBUG("goaway mode: delayed stream results in null ctor"); 2018 (void) conn->fc_stream_ifs[STREAM_IF_STD].stream_if->on_new_stream( 2019 conn->fc_stream_ifs[STREAM_IF_STD].stream_if_ctx, NULL); 2020 } 2021 else 2022 while (n_our_destroyed && conn->fc_n_delayed_streams) 2023 { 2024 --n_our_destroyed; 2025 --conn->fc_n_delayed_streams; 2026 LSQ_DEBUG("creating delayed stream"); 2027 if (!new_stream(conn, generate_stream_id(conn))) 2028 { 2029 ABORT_ERROR("%s: cannot create new stream: %s", __func__, 2030 strerror(errno)); 2031 break; 2032 } 2033 assert(count_streams(conn, 0) <= conn->fc_cfg.max_streams_out); 2034 } 2035} 2036 2037 2038struct filter_stream_ctx 2039{ 2040 struct full_conn *conn; 2041 uint32_t last_stream_id, 2042 max_peer_stream_id; 2043}; 2044 2045 2046static int 2047filter_out_old_streams (void *ctx, lsquic_stream_t *stream) 2048{ 2049 struct filter_stream_ctx *const fctx = ctx; 2050 return ((!((stream->id ^ fctx->last_stream_id) & 1) && 2051 stream->id > fctx->last_stream_id) 2052 || 2053 (!((stream->id ^ fctx->max_peer_stream_id) & 1) && 2054 stream->id > fctx->max_peer_stream_id)); 2055} 2056 2057 2058static int 2059dispatch_stream_rw_events (struct full_conn *conn, lsquic_stream_t *stream) 2060{ 2061 struct stream_rw_prog_status saved_status; 2062 int is_reset, progress_made; 2063 2064 is_reset = lsquic_stream_is_reset(stream); 2065 lsquic_stream_get_rw_prog_status(stream, &saved_status); 2066 lsquic_stream_dispatch_rw_events(stream); 2067 progress_made = lsquic_stream_progress_was_made(stream, &saved_status); 2068 if (!is_reset && lsquic_stream_is_reset(stream)) 2069 lsquic_send_ctl_reset_stream(&conn->fc_send_ctl, stream->id); 2070 2071 return progress_made; 2072} 2073 2074 2075/* 
Return 1 if progress was made, 0 otherwise */ 2076static int 2077process_streams_rw_events (struct full_conn *conn) 2078{ 2079 lsquic_stream_t *stream; 2080 struct filter_stream_ctx fctx; 2081 struct stream_prio_iter spi; 2082 int progress_count; 2083 2084 if (TAILQ_EMPTY(&conn->fc_pub.rw_streams)) 2085 return 0; 2086 2087 progress_count = 0; 2088 2089 fctx.last_stream_id = conn->fc_last_stream_id; 2090 fctx.max_peer_stream_id = conn->fc_max_peer_stream_id; 2091 2092 lsquic_spi_init(&spi, TAILQ_FIRST(&conn->fc_pub.rw_streams), 2093 TAILQ_LAST(&conn->fc_pub.rw_streams, lsquic_streams_tailq), 2094 (uintptr_t) &TAILQ_NEXT((lsquic_stream_t *) NULL, next_rw_stream), 2095 STREAM_RW_EVENT_FLAGS, conn->fc_conn.cn_cid, "rw-1"); 2096 2097 for (stream = lsquic_spi_first(&spi); stream; 2098 stream = lsquic_spi_next(&spi)) 2099 progress_count += 2100 dispatch_stream_rw_events(conn, stream); 2101 2102 /* If new streams were created as result of the RW dispatching above, 2103 * process these new streams. 2104 */ 2105 if (fctx.last_stream_id < conn->fc_last_stream_id || 2106 fctx.max_peer_stream_id < conn->fc_max_peer_stream_id) 2107 { 2108 fctx.conn = conn; 2109 lsquic_spi_init_ext(&spi, TAILQ_FIRST(&conn->fc_pub.rw_streams), 2110 TAILQ_LAST(&conn->fc_pub.rw_streams, lsquic_streams_tailq), 2111 (uintptr_t) &TAILQ_NEXT((lsquic_stream_t *) NULL, next_rw_stream), 2112 STREAM_RW_EVENT_FLAGS, filter_out_old_streams, &fctx, 2113 conn->fc_conn.cn_cid, "rw-2"); 2114 for (stream = lsquic_spi_first(&spi); stream; 2115 stream = lsquic_spi_next(&spi)) 2116 progress_count += 2117 dispatch_stream_rw_events(conn, stream); 2118 } 2119 2120 return progress_count > 0; 2121} 2122 2123 2124/* Return 1 if progress was made, 0 otherwise. 
*/ 2125static int 2126process_hsk_stream_rw_events (struct full_conn *conn) 2127{ 2128 lsquic_stream_t *stream; 2129 TAILQ_FOREACH(stream, &conn->fc_pub.rw_streams, next_rw_stream) 2130 if (LSQUIC_STREAM_HANDSHAKE == stream->id) 2131 return dispatch_stream_rw_events(conn, stream); 2132 return 0; 2133} 2134 2135 2136#if 1 2137# define verify_ack_frame(a, b, c) 2138#else 2139static void 2140verify_ack_frame (struct full_conn *conn, const unsigned char *buf, int bufsz) 2141{ 2142 unsigned i; 2143 int parsed_len; 2144 struct ack_info *ack_info; 2145 const struct lsquic_packno_range *range; 2146 char ack_buf[512]; 2147 unsigned buf_off = 0; 2148 int nw; 2149 2150 ack_info = conn->fc_pub.mm->acki; 2151 parsed_len = parse_ack_frame(buf, bufsz, ack_info); 2152 assert(parsed_len == bufsz); 2153 2154 for (range = lsquic_rechist_first(&conn->fc_rechist), i = 0; range; 2155 range = lsquic_rechist_next(&conn->fc_rechist), ++i) 2156 { 2157 assert(i < ack_info->n_ranges); 2158 assert(range->high == ack_info->ranges[i].high); 2159 assert(range->low == ack_info->ranges[i].low); 2160 if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG)) 2161 { 2162 nw = snprintf(ack_buf + buf_off, sizeof(ack_buf) - buf_off, 2163 "[%"PRIu64"-%"PRIu64"]", range->high, range->low); 2164 assert(nw >= 0); 2165 buf_off += nw; 2166 } 2167 } 2168 assert(i == ack_info->n_ranges); 2169 LSQ_DEBUG("Sent ACK frame %s", ack_buf); 2170} 2171 2172 2173#endif 2174 2175 2176static void 2177generate_ack_frame (struct full_conn *conn) 2178{ 2179 lsquic_packet_out_t *packet_out; 2180 lsquic_time_t now; 2181 int has_missing, w; 2182 2183 packet_out = lsquic_send_ctl_new_packet_out(&conn->fc_send_ctl, 0); 2184 if (!packet_out) 2185 { 2186 ABORT_ERROR("cannot allocate packet: %s", strerror(errno)); 2187 return; 2188 } 2189 2190 lsquic_send_ctl_scheduled_one(&conn->fc_send_ctl, packet_out); 2191 now = lsquic_time_now(); 2192 w = conn->fc_conn.cn_pf->pf_gen_ack_frame( 2193 packet_out->po_data + packet_out->po_data_sz, 2194 
lsquic_packet_out_avail(packet_out), 2195 (gaf_rechist_first_f) lsquic_rechist_first, 2196 (gaf_rechist_next_f) lsquic_rechist_next, 2197 (gaf_rechist_largest_recv_f) lsquic_rechist_largest_recv, 2198 &conn->fc_rechist, now, &has_missing); 2199 if (w < 0) { 2200 ABORT_ERROR("generating ACK frame failed: %d", errno); 2201 return; 2202 } 2203 EV_LOG_GENERATED_ACK_FRAME(LSQUIC_LOG_CONN_ID, conn->fc_conn.cn_pf, 2204 packet_out->po_data + packet_out->po_data_sz, w); 2205 verify_ack_frame(conn, packet_out->po_data + packet_out->po_data_sz, w); 2206 lsquic_send_ctl_scheduled_ack(&conn->fc_send_ctl); 2207 packet_out->po_frame_types |= 1 << QUIC_FRAME_ACK; 2208 packet_out->po_data_sz += w; 2209 packet_out->po_regen_sz += w; 2210 if (has_missing) 2211 conn->fc_flags |= FC_ACK_HAD_MISS; 2212 else 2213 conn->fc_flags &= ~FC_ACK_HAD_MISS; 2214 LSQ_DEBUG("Put %d bytes of ACK frame into packet on outgoing queue", w); 2215 if (conn->fc_conn.cn_version >= LSQVER_039 && 2216 conn->fc_n_cons_unretx >= 20 && 2217 !lsquic_send_ctl_have_outgoing_retx_frames(&conn->fc_send_ctl)) 2218 { 2219 LSQ_DEBUG("schedule WINDOW_UPDATE frame after %u non-retx " 2220 "packets sent", conn->fc_n_cons_unretx); 2221 conn->fc_flags |= FC_SEND_WUF; 2222 } 2223} 2224 2225 2226static int 2227conn_ok_to_close (const struct full_conn *conn) 2228{ 2229 assert(conn->fc_flags & FC_CLOSING); 2230 return !(conn->fc_flags & FC_SERVER) 2231 || (conn->fc_flags & FC_RECV_CLOSE) 2232 || ( 2233 !lsquic_send_ctl_have_outgoing_stream_frames(&conn->fc_send_ctl) 2234 && lsquic_hash_count(conn->fc_streams) == 0 2235 && lsquic_send_ctl_have_unacked_stream_frames(&conn->fc_send_ctl) == 0); 2236} 2237 2238 2239static enum tick_st 2240immediate_close (struct full_conn *conn) 2241{ 2242 lsquic_packet_out_t *packet_out; 2243 const char *error_reason; 2244 unsigned error_code; 2245 int sz; 2246 2247 if (conn->fc_flags & (FC_TICK_CLOSE|FC_GOT_PRST)) 2248 return TICK_CLOSE; 2249 2250 conn->fc_flags |= FC_TICK_CLOSE; 2251 2252 /* No 
reason to send anything that's been scheduled if connection is 2253 * being closed immedately. This also ensures that packet numbers 2254 * sequence is always increasing. 2255 */ 2256 lsquic_send_ctl_drop_scheduled(&conn->fc_send_ctl); 2257 2258 if ((conn->fc_flags & FC_TIMED_OUT) && conn->fc_settings->es_silent_close) 2259 return TICK_CLOSE; 2260 2261 packet_out = lsquic_send_ctl_new_packet_out(&conn->fc_send_ctl, 0); 2262 if (!packet_out) 2263 { 2264 LSQ_WARN("cannot allocate packet: %s", strerror(errno)); 2265 return TICK_CLOSE; 2266 } 2267 2268 assert(conn->fc_flags & (FC_ERROR|FC_ABORTED|FC_TIMED_OUT)); 2269 if (conn->fc_flags & FC_ERROR) 2270 { 2271 error_code = 0x01; /* QUIC_INTERNAL_ERROR */ 2272 error_reason = "connection error"; 2273 } 2274 else if (conn->fc_flags & FC_ABORTED) 2275 { 2276 error_code = 0x10; /* QUIC_PEER_GOING_AWAY */ 2277 error_reason = "user aborted connection"; 2278 } 2279 else if (conn->fc_flags & FC_TIMED_OUT) 2280 { 2281 error_code = 0x19; /* QUIC_NETWORK_IDLE_TIMEOUT */ 2282 error_reason = "connection timed out"; 2283 } 2284 else 2285 { 2286 error_code = 0x10; /* QUIC_PEER_GOING_AWAY */ 2287 error_reason = NULL; 2288 } 2289 2290 lsquic_send_ctl_scheduled_one(&conn->fc_send_ctl, packet_out); 2291 sz = conn->fc_conn.cn_pf->pf_gen_connect_close_frame( 2292 packet_out->po_data + packet_out->po_data_sz, 2293 lsquic_packet_out_avail(packet_out), error_code, 2294 error_reason, error_reason ? 
/* Connection interface: run one tick of the connection.
 *
 * In order, the tick: lets the send controller tick, rings expired alarms,
 * dispatches stream read/write events (only the handshake stream until
 * LSCONN_HANDSHAKE_DONE is set), generates an ACK if one is queued or was
 * lost, generates pending control frames (STOP_WAITING, WINDOW_UPDATE,
 * GOAWAY, BLOCKED), reschedules packets, lets streams that want to send do
 * so, services streams, and finally handles orderly close, PING and the
 * idle alarm.
 *
 * The return value is a combination of TICK_SEND, TICK_QUIET and TICK_CLOSE,
 * OR-ed with the progress bit (TICK_BIT_PROGRESS) when stream processing
 * reported progress.
 *
 * Statement order matters throughout; both helper macros below contain
 * `return' statements.
 */
static enum tick_st
full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
{
    struct full_conn *conn = (struct full_conn *) lconn;
    lsquic_packet_out_t *last_packet_out;
    int have_delayed_packets;
    unsigned n;
    int progress_made;
    enum tick_st progress_tick = 0;

/* Leave the tick through immediate_close() as soon as any of the
 * FC_TIMED_OUT, FC_ERROR or FC_ABORTED flags is set.
 */
#define CLOSE_IF_NECESSARY() do {                                       \
    if (conn->fc_flags & FC_IMMEDIATE_CLOSE_FLAGS)                      \
        return progress_tick | immediate_close(conn);                   \
} while (0)

/* Leave the tick when the send controller cannot send any more: report
 * TICK_QUIET if nothing is scheduled, TICK_SEND otherwise.
 */
#define RETURN_IF_OUT_OF_PACKETS() do {                                 \
    if (!lsquic_send_ctl_can_send(&conn->fc_send_ctl))                  \
    {                                                                   \
        if (0 == lsquic_send_ctl_n_scheduled(&conn->fc_send_ctl))       \
        {                                                               \
            LSQ_DEBUG("used up packet allowance, quiet now (line %d)",  \
                __LINE__);                                              \
            return progress_tick | TICK_QUIET;                          \
        }                                                               \
        else                                                            \
        {                                                               \
            LSQ_DEBUG("used up packet allowance, sending now (line %d)",\
                __LINE__);                                              \
            return progress_tick | TICK_SEND;                           \
        }                                                               \
    }                                                                   \
} while (0)

    assert(!(conn->fc_conn.cn_flags & LSCONN_RW_PENDING));

    lsquic_send_ctl_tick(&conn->fc_send_ctl, now);
    CLOSE_IF_NECESSARY();

    /* Client only: cancel the PING alarm before expired alarms are rung */
    if (!(conn->fc_flags & FC_SERVER))
    {
        lsquic_alarmset_unset(&conn->fc_alset, AL_PING);
        lsquic_send_ctl_sanity_check(&conn->fc_send_ctl);
    }

    lsquic_alarmset_ring_expired(&conn->fc_alset, now);
    CLOSE_IF_NECESSARY();

    /* To make things simple, only stream 1 is active until the handshake
     * has been completed.  This will be adjusted in the future: the client
     * does not want to wait if it has the server information.
     */
    if (conn->fc_conn.cn_flags & LSCONN_HANDSHAKE_DONE)
        progress_made = process_streams_rw_events(conn);
    else
        progress_made = process_hsk_stream_rw_events(conn);
    progress_tick |= progress_made << TICK_BIT_PROGRESS;
    CLOSE_IF_NECESSARY();

    if (conn->fc_flags & FC_FIRST_TICK)
    {
        conn->fc_flags &= ~FC_FIRST_TICK;
        have_delayed_packets = 0;
    }
    else
        /* If there are any scheduled packets at this point, it means that
         * they were not sent during previous tick; in other words, they
         * are delayed.  When there are delayed packets, the only packet
         * we sometimes add is a packet with an ACK frame, and we add it
         * to the *front* of the queue.
         */
        have_delayed_packets = lsquic_send_ctl_squeeze_sched(
                                                    &conn->fc_send_ctl);

    if ((conn->fc_flags & FC_ACK_QUEUED) ||
                            lsquic_send_ctl_lost_ack(&conn->fc_send_ctl))
    {
        if (have_delayed_packets)
            lsquic_send_ctl_reset_packnos(&conn->fc_send_ctl);

        /* ACK frame generation fails with an error if it does not fit into
         * a single packet (it always should fit).
         */
        generate_ack_frame(conn);
        CLOSE_IF_NECESSARY();
        reset_ack_state(conn);

        /* Try to send STOP_WAITING frame at the same time we send an ACK
         * This follows reference implementation.
         */
        if (!(conn->fc_flags & FC_NSTP))
            conn->fc_flags |= FC_SEND_STOP_WAITING;

        if (have_delayed_packets)
        {
            if (conn->fc_flags & FC_SEND_STOP_WAITING)
            {
                /* TODO: ensure that STOP_WAITING frame is in the same packet
                 * as the ACK frame in delayed packet mode.
                 */
                generate_stop_waiting_frame(conn);
                CLOSE_IF_NECESSARY();
            }
            lsquic_send_ctl_ack_to_front(&conn->fc_send_ctl);
        }
    }

    if (have_delayed_packets)
        /* The reason for not adding STOP_WAITING and other frames below
         * to the packet carrying ACK frame generated when there are delayed
         * packets is so that if the ACK packet itself is delayed, it can be
         * dropped and replaced by new ACK packet.  This way, we are never
         * more than 1 packet over CWND.
         */
        return progress_tick | TICK_SEND;

    RETURN_IF_OUT_OF_PACKETS();

    /* Try to fit any of the following three frames -- STOP_WAITING,
     * WINDOW_UPDATE, and GOAWAY -- before checking if we have run
     * out of packets.  If any of them does not fit, it will be
     * tried next time around.
     */
    if (conn->fc_flags & FC_SEND_STOP_WAITING)
    {
        generate_stop_waiting_frame(conn);
        CLOSE_IF_NECESSARY();
    }

    if (lsquic_cfcw_fc_offsets_changed(&conn->fc_pub.cfcw) ||
                                (conn->fc_flags & FC_SEND_WUF))
    {
        conn->fc_flags |= FC_SEND_WUF;
        generate_wuf_conn(conn);
        CLOSE_IF_NECESSARY();
    }

    if (conn->fc_flags & FC_SEND_GOAWAY)
    {
        generate_goaway_frame(conn);
        CLOSE_IF_NECESSARY();
    }

    n = lsquic_send_ctl_reschedule_packets(&conn->fc_send_ctl);
    if (n > 0)
        CLOSE_IF_NECESSARY();

    RETURN_IF_OUT_OF_PACKETS();

    if (conn->fc_conn.cn_flags & LSCONN_SEND_BLOCKED)
    {
        /* Clear the flag only if the BLOCKED frame was generated */
        if (generate_blocked_frame(conn, 0))
            conn->fc_conn.cn_flags &= ~LSCONN_SEND_BLOCKED;
        else
            RETURN_IF_OUT_OF_PACKETS();
    }

    if (!TAILQ_EMPTY(&conn->fc_pub.sending_streams))
    {
        progress_made = process_streams_ready_to_send(conn);
        progress_tick |= progress_made << TICK_BIT_PROGRESS;
        /* If last packet on the queue contains a stream-related frame, mark it
         * unwriteable to force a new packet allocation on the next tick.  This
         * is to prevent more than one STREAM or RST_STREAM frame from the same
         * stream to be written to the same packet.
         */
        last_packet_out = lsquic_send_ctl_last_scheduled(&conn->fc_send_ctl);
        if (last_packet_out &&
            (last_packet_out->po_frame_types &
                ((1 << QUIC_FRAME_STREAM)|(1 << QUIC_FRAME_RST_STREAM))))
            last_packet_out->po_flags &= ~PO_WRITEABLE;
    }
    CLOSE_IF_NECESSARY();

    service_streams(conn);

    RETURN_IF_OUT_OF_PACKETS();

    if ((conn->fc_flags & FC_CLOSING) && conn_ok_to_close(conn))
    {
        LSQ_DEBUG("connection is OK to close");
        /* This is normal termination sequence.
         *
         * Generate CONNECTION_CLOSE frame if we are responding to one, have
         * packets scheduled to send, or silent close flag is not set.
         */
        conn->fc_flags |= FC_TICK_CLOSE;
        if ((conn->fc_flags & FC_RECV_CLOSE) ||
                0 != lsquic_send_ctl_n_scheduled(&conn->fc_send_ctl) ||
                                        !conn->fc_settings->es_silent_close)
        {
            generate_connection_close_packet(conn);
            return progress_tick | TICK_SEND|TICK_CLOSE;
        }
        else
            return progress_tick | TICK_CLOSE;
    }

    if (0 == lsquic_send_ctl_n_scheduled(&conn->fc_send_ctl))
    {
        /* Nothing is scheduled: send a PING if one is due, otherwise
         * report a quiet tick.
         */
        if (conn->fc_flags & FC_SEND_PING)
        {
            conn->fc_flags &= ~FC_SEND_PING;
            generate_ping_frame(conn);
            CLOSE_IF_NECESSARY();
            assert(lsquic_send_ctl_n_scheduled(&conn->fc_send_ctl) != 0);
        }
        else
            return progress_tick | TICK_QUIET;
    }
    else if (!(conn->fc_flags & FC_SERVER))
    {
        /* Client with packets scheduled: cancel the PING alarm and drop
         * the PING request.
         */
        lsquic_alarmset_unset(&conn->fc_alset, AL_PING);
        lsquic_send_ctl_sanity_check(&conn->fc_send_ctl);
        conn->fc_flags &= ~FC_SEND_PING;   /* It may have rung */
    }

    /* Packets are about to go out: push the idle alarm forward */
    now = lsquic_time_now();
    lsquic_alarmset_set(&conn->fc_alset, AL_IDLE,
                                now + conn->fc_settings->es_idle_conn_to);

    /* From the spec:
     *  " The PING frame should be used to keep a connection alive when
     *  " a stream is open.
     */
    if (0 == (conn->fc_flags & FC_SERVER) &&
                                        lsquic_hash_count(conn->fc_streams) > 0)
        lsquic_alarmset_set(&conn->fc_alset, AL_PING, now + TIME_BETWEEN_PINGS);

    return progress_tick | TICK_SEND;
}


/* Connection interface: handle an incoming packet.  The idle alarm is
 * re-armed relative to the packet's receipt time.  Unless FC_ERROR is
 * already set, the packet is processed; a non-zero result from
 * process_incoming_packet() sets FC_ERROR.
 */
static void
full_conn_ci_packet_in (lsquic_conn_t *lconn, lsquic_packet_in_t *packet_in)
{
    struct full_conn *conn = (struct full_conn *) lconn;

    lsquic_alarmset_set(&conn->fc_alset, AL_IDLE,
                packet_in->pi_received + conn->fc_settings->es_idle_conn_to);
    if (0 == (conn->fc_flags & FC_ERROR))
        if (0 != process_incoming_packet(conn, packet_in))
            conn->fc_flags |= FC_ERROR;
}


/* Connection interface: return whatever the send controller offers as the
 * next packet to send.
 */
static lsquic_packet_out_t *
full_conn_ci_next_packet_to_send (lsquic_conn_t *lconn)
{
    struct full_conn *conn = (struct full_conn *) lconn;
    return lsquic_send_ctl_next_packet_to_send(&conn->fc_send_ctl);
}


/* Connection interface: record that a packet was sent.
 *
 * A packet carrying at least one retransmittable frame resets the count of
 * consecutive non-retransmittable packets and re-arms the idle alarm
 * relative to the packet's send time; any other packet increments the
 * count.  The packet is then handed to the send controller; a non-zero
 * result aborts the connection.
 */
static void
full_conn_ci_packet_sent (lsquic_conn_t *lconn, lsquic_packet_out_t *packet_out)
{
    struct full_conn *conn = (struct full_conn *) lconn;
    int s;

    if (packet_out->po_frame_types & QFRAME_RETRANSMITTABLE_MASK)
    {
        conn->fc_n_cons_unretx = 0;
        lsquic_alarmset_set(&conn->fc_alset, AL_IDLE,
                    packet_out->po_sent + conn->fc_settings->es_idle_conn_to);
    }
    else
        ++conn->fc_n_cons_unretx;
    s = lsquic_send_ctl_sent_packet(&conn->fc_send_ctl, packet_out);
    if (s != 0)
        ABORT_ERROR("sent packet failed: %s", strerror(errno));
#if FULL_CONN_STATS
    ++conn->fc_stats.n_packets_out;
#endif
}


/* Connection interface: a packet handed out by ci_next_packet_to_send could
 * not be sent; report it to the send controller as delayed.
 */
static void
full_conn_ci_packet_not_sent (lsquic_conn_t *lconn, lsquic_packet_out_t *packet_out)
{
    struct full_conn *conn = (struct full_conn *) lconn;
    lsquic_send_ctl_delayed_one(&conn->fc_send_ctl, packet_out);
}
lconn; 2594 LSQ_DEBUG("handshake reportedly done"); 2595 lsquic_alarmset_unset(&conn->fc_alset, AL_HANDSHAKE); 2596 if (0 == apply_peer_settings(conn)) 2597 lconn->cn_flags |= LSCONN_HANDSHAKE_DONE; 2598 else 2599 conn->fc_flags |= FC_ERROR; 2600} 2601 2602 2603static int 2604full_conn_ci_user_wants_read (lsquic_conn_t *lconn) 2605{ 2606 struct full_conn *conn = (struct full_conn *) lconn; 2607 const lsquic_stream_t *stream; 2608 2609 TAILQ_FOREACH(stream, &conn->fc_pub.rw_streams, next_rw_stream) 2610 if (stream->stream_flags & STREAM_WANT_READ) 2611 return 1; 2612 2613 return 0; 2614} 2615 2616 2617void 2618lsquic_conn_abort (lsquic_conn_t *lconn) 2619{ 2620 struct full_conn *conn = (struct full_conn *) lconn; 2621 LSQ_INFO("User aborted connection"); 2622 conn->fc_flags |= FC_ABORTED; 2623} 2624 2625 2626void 2627full_conn_close_internal (lsquic_conn_t *lconn, int is_user) 2628{ 2629 struct full_conn *conn = (struct full_conn *) lconn; 2630 lsquic_stream_t *stream; 2631 struct lsquic_hash_elem *el; 2632 2633 if (!(conn->fc_flags & FC_CLOSING)) 2634 { 2635 if (is_user) 2636 LSQ_INFO("User closed connection"); 2637 for (el = lsquic_hash_first(conn->fc_streams); el; 2638 el = lsquic_hash_next(conn->fc_streams)) 2639 { 2640 stream = lsquic_hashelem_getdata(el); 2641 lsquic_stream_shutdown_internal(stream); 2642 } 2643 conn->fc_flags |= FC_CLOSING; 2644 if (!(conn->fc_flags & FC_GOAWAY_SENT)) 2645 conn->fc_flags |= FC_SEND_GOAWAY; 2646 } 2647} 2648 2649 2650void 2651lsquic_conn_close (lsquic_conn_t *lconn) 2652{ 2653 full_conn_close_internal(lconn, 1); 2654} 2655 2656 2657void 2658lsquic_conn_going_away (lsquic_conn_t *lconn) 2659{ 2660 struct full_conn *conn = (struct full_conn *) lconn; 2661 if (!(conn->fc_flags & (FC_CLOSING|FC_GOING_AWAY))) 2662 { 2663 LSQ_INFO("connection marked as going away"); 2664 assert(!(conn->fc_flags & FC_SEND_GOAWAY)); 2665 conn->fc_flags |= FC_GOING_AWAY; 2666 if (!(conn->fc_flags & FC_GOAWAY_SENT)) 2667 conn->fc_flags |= 
FC_SEND_GOAWAY; 2668 } 2669} 2670 2671 2672/* Find stream when stream ID is read in from HEADERS stream. If stream 2673 * cannot be found or created, the connection is aborted. 2674 */ 2675#if __GNUC__ 2676__attribute__((nonnull(3))) 2677#endif 2678static lsquic_stream_t * 2679find_stream_on_headers (struct full_conn *conn, uint32_t stream_id, 2680 const char *what) 2681{ 2682 lsquic_stream_t *stream = find_stream_by_id(conn, stream_id); 2683 if (!stream) 2684 { 2685 if (conn_is_stream_closed(conn, stream_id)) 2686 { 2687 LSQ_DEBUG("drop incoming %s for closed stream %u", what, stream_id); 2688 return NULL; 2689 } 2690 if (is_peer_initiated(conn, stream_id)) 2691 { 2692 unsigned in_count = count_streams(conn, 1); 2693 LSQ_DEBUG("number of peer-initiated streams: %u", in_count); 2694 if (in_count >= conn->fc_cfg.max_streams_in) 2695 { 2696 ABORT_ERROR("incoming %s for stream %u would exceed " 2697 "limit: %u", what, stream_id, conn->fc_cfg.max_streams_in); 2698 return NULL; 2699 } 2700 if ((conn->fc_flags & FC_GOING_AWAY) && 2701 stream_id > conn->fc_max_peer_stream_id) 2702 { 2703 LSQ_DEBUG("going away: drop headers for new stream %u", 2704 stream_id); 2705 return NULL; 2706 } 2707 } 2708 else 2709 { 2710 ABORT_ERROR("frame for never-initiated stream (push promise?)"); 2711 return NULL; 2712 } 2713 stream = new_stream(conn, stream_id); 2714 if (!stream) 2715 { 2716 ABORT_ERROR("cannot create new stream: %s", strerror(errno)); 2717 return NULL; 2718 } 2719 if (stream_id > conn->fc_max_peer_stream_id) 2720 conn->fc_max_peer_stream_id = stream_id; 2721 } 2722 return stream; 2723} 2724 2725 2726static void 2727headers_stream_on_conn_error (void *ctx) 2728{ 2729 struct full_conn *conn = ctx; 2730 ABORT_ERROR("connection error reported by HEADERS stream"); 2731} 2732 2733 2734static void 2735headers_stream_on_stream_error (void *ctx, uint32_t stream_id) 2736{ 2737 struct full_conn *conn = ctx; 2738 lsquic_stream_t *stream = find_stream_on_headers(conn, stream_id, 
"error"); 2739 if (stream) 2740 { 2741 LSQ_DEBUG("resetting stream %u due to error", stream_id); 2742 /* We use code 1, which is QUIC_INTERNAL_ERROR (see 2743 * [draft-hamilton-quic-transport-protocol-01], Section 10), for all 2744 * errors. There does not seem to be a good reason to figure out 2745 * and send more specific error codes. 2746 */ 2747 lsquic_stream_reset_ext(stream, 1, 0); 2748 } 2749} 2750 2751 2752static void 2753headers_stream_on_enable_push (void *ctx, int enable_push) 2754{ 2755 struct full_conn *conn = ctx; 2756 if (0 == enable_push) 2757 { 2758 LSQ_DEBUG("server push %d -> 0", !!(conn->fc_flags & FC_SUPPORT_PUSH)); 2759 conn->fc_flags &= ~FC_SUPPORT_PUSH; 2760 } 2761 else if (conn->fc_settings->es_support_push) 2762 { 2763 LSQ_DEBUG("server push %d -> 1", !!(conn->fc_flags & FC_SUPPORT_PUSH)); 2764 conn->fc_flags |= FC_SUPPORT_PUSH; 2765 } 2766 else 2767 LSQ_INFO("not enabling server push that's disabled in engine settings"); 2768} 2769 2770 2771static void 2772headers_stream_on_incoming_headers (void *ctx, struct uncompressed_headers *uh) 2773{ 2774 struct full_conn *conn = ctx; 2775 lsquic_stream_t *stream; 2776 2777 LSQ_DEBUG("incoming headers for stream %u", uh->uh_stream_id); 2778 2779 stream = find_stream_on_headers(conn, uh->uh_stream_id, "headers"); 2780 if (!stream) 2781 { 2782 free(uh); 2783 return; 2784 } 2785 2786 if (0 != lsquic_stream_uh_in(stream, uh)) 2787 { 2788 ABORT_ERROR("stream %u refused incoming headers", uh->uh_stream_id); 2789 free(uh); 2790 } 2791} 2792 2793 2794static void 2795headers_stream_on_push_promise (void *ctx, struct uncompressed_headers *uh) 2796{ 2797 struct full_conn *conn = ctx; 2798 lsquic_stream_t *stream; 2799 2800 assert(!(conn->fc_flags & FC_SERVER)); 2801 2802 LSQ_DEBUG("push promise for stream %u in response to %u", 2803 uh->uh_oth_stream_id, uh->uh_stream_id); 2804 2805 if (0 == (uh->uh_stream_id & 1) || 2806 0 != (uh->uh_oth_stream_id & 1)) 2807 { 2808 ABORT_ERROR("invalid push promise stream 
IDs: %u, %u", 2809 uh->uh_oth_stream_id, uh->uh_stream_id); 2810 free(uh); 2811 return; 2812 } 2813 2814 if (!(conn_is_stream_closed(conn, uh->uh_stream_id) || 2815 find_stream_by_id(conn, uh->uh_stream_id))) 2816 { 2817 ABORT_ERROR("invalid push promise original stream ID %u never " 2818 "initiated", uh->uh_stream_id); 2819 free(uh); 2820 return; 2821 } 2822 2823 if (conn_is_stream_closed(conn, uh->uh_oth_stream_id) || 2824 find_stream_by_id(conn, uh->uh_oth_stream_id)) 2825 { 2826 ABORT_ERROR("invalid promised stream ID %u already used", 2827 uh->uh_oth_stream_id); 2828 free(uh); 2829 return; 2830 } 2831 2832 stream = new_stream_ext(conn, uh->uh_oth_stream_id, STREAM_IF_STD, 2833 SCF_DI_AUTOSWITCH|(conn->fc_enpub->enp_settings.es_rw_once ? 2834 SCF_DISP_RW_ONCE : 0)); 2835 if (!stream) 2836 { 2837 ABORT_ERROR("cannot create stream: %s", strerror(errno)); 2838 free(uh); 2839 return; 2840 } 2841 lsquic_stream_push_req(stream, uh); 2842 lsquic_stream_call_on_new(stream, 2843 conn->fc_stream_ifs[STREAM_IF_STD].stream_if_ctx); 2844 return; 2845} 2846 2847 2848static void 2849headers_stream_on_priority (void *ctx, uint32_t stream_id, int exclusive, 2850 uint32_t dep_stream_id, unsigned weight) 2851{ 2852 struct full_conn *conn = ctx; 2853 lsquic_stream_t *stream; 2854 LSQ_DEBUG("got priority frame for stream %u: (ex: %d; dep stream: %u; " 2855 "weight: %u)", stream_id, exclusive, dep_stream_id, weight); 2856 stream = find_stream_on_headers(conn, stream_id, "priority"); 2857 if (stream) 2858 lsquic_stream_set_priority_internal(stream, weight); 2859} 2860 2861 2862int lsquic_conn_is_push_enabled(lsquic_conn_t *c) 2863{ 2864 return ((struct full_conn *)c)->fc_flags & FC_SUPPORT_PUSH; 2865} 2866 2867 2868lsquic_conn_ctx_t * 2869lsquic_conn_get_ctx (const lsquic_conn_t *lconn) 2870{ 2871 struct full_conn *const conn = (struct full_conn *) lconn; 2872 return conn->fc_conn_ctx; 2873} 2874 2875 2876static const struct headers_stream_callbacks headers_callbacks = 2877{ 2878 
/* Connection interface table for full connections: it maps each ci_*
 * member of struct conn_iface to the corresponding full_conn_ci_* function
 * defined in this file.
 */
static const struct conn_iface full_conn_iface = {
    .ci_destroy              =  full_conn_ci_destroy,
    .ci_handshake_done       =  full_conn_ci_handshake_done,
    .ci_next_packet_to_send  =  full_conn_ci_next_packet_to_send,
    .ci_packet_in            =  full_conn_ci_packet_in,
    .ci_packet_not_sent      =  full_conn_ci_packet_not_sent,
    .ci_packet_sent          =  full_conn_ci_packet_sent,
    .ci_tick                 =  full_conn_ci_tick,
    .ci_user_wants_read      =  full_conn_ci_user_wants_read,
};