migrate_reuseport.c (14074B)
1// SPDX-License-Identifier: GPL-2.0 2/* 3 * Check if we can migrate child sockets. 4 * 5 * 1. call listen() for 4 server sockets. 6 * 2. call connect() for 25 client sockets. 7 * 3. call listen() for 1 server socket. (migration target) 8 * 4. update a map to migrate all child sockets 9 * to the last server socket (migrate_map[cookie] = 4) 10 * 5. call shutdown() for first 4 server sockets 11 * and migrate the requests in the accept queue 12 * to the last server socket. 13 * 6. call listen() for the second server socket. 14 * 7. call shutdown() for the last server 15 * and migrate the requests in the accept queue 16 * to the second server socket. 17 * 8. call listen() for the last server. 18 * 9. call shutdown() for the second server 19 * and migrate the requests in the accept queue 20 * to the last server socket. 21 * 10. call accept() for the last server socket. 22 * 23 * Author: Kuniyuki Iwashima <kuniyu@amazon.co.jp> 24 */ 25 26#include <bpf/bpf.h> 27#include <bpf/libbpf.h> 28 29#include "test_progs.h" 30#include "test_migrate_reuseport.skel.h" 31#include "network_helpers.h" 32 33#ifndef TCP_FASTOPEN_CONNECT 34#define TCP_FASTOPEN_CONNECT 30 35#endif 36 37#define IFINDEX_LO 1 38 39#define NR_SERVERS 5 40#define NR_CLIENTS (NR_SERVERS * 5) 41#define MIGRATED_TO (NR_SERVERS - 1) 42 43/* fastopenq->max_qlen and sk->sk_max_ack_backlog */ 44#define QLEN (NR_CLIENTS * 5) 45 46#define MSG "Hello World\0" 47#define MSGLEN 12 48 49static struct migrate_reuseport_test_case { 50 const char *name; 51 __s64 servers[NR_SERVERS]; 52 __s64 clients[NR_CLIENTS]; 53 struct sockaddr_storage addr; 54 socklen_t addrlen; 55 int family; 56 int state; 57 bool drop_ack; 58 bool expire_synack_timer; 59 bool fastopen; 60 struct bpf_link *link; 61} test_cases[] = { 62 { 63 .name = "IPv4 TCP_ESTABLISHED inet_csk_listen_stop", 64 .family = AF_INET, 65 .state = BPF_TCP_ESTABLISHED, 66 .drop_ack = false, 67 .expire_synack_timer = false, 68 .fastopen = false, 69 }, 70 { 71 .name = "IPv4 TCP_SYN_RECV inet_csk_listen_stop", 72 .family = AF_INET, 73 .state = BPF_TCP_SYN_RECV, 74 .drop_ack = true, 75 .expire_synack_timer = false, 76 .fastopen = true, 77 }, 78 { 79 .name = "IPv4 TCP_NEW_SYN_RECV reqsk_timer_handler", 80 .family = AF_INET, 81 .state = BPF_TCP_NEW_SYN_RECV, 82 .drop_ack = true, 83 .expire_synack_timer = true, 84 .fastopen = false, 85 }, 86 { 87 .name = "IPv4 TCP_NEW_SYN_RECV inet_csk_complete_hashdance", 88 .family = AF_INET, 89 .state = BPF_TCP_NEW_SYN_RECV, 90 .drop_ack = true, 91 .expire_synack_timer = false, 92 .fastopen = false, 93 }, 94 { 95 .name = "IPv6 TCP_ESTABLISHED inet_csk_listen_stop", 96 .family = AF_INET6, 97 .state = BPF_TCP_ESTABLISHED, 98 .drop_ack = false, 99 .expire_synack_timer = false, 100 .fastopen = false, 101 }, 102 { 103 .name = "IPv6 TCP_SYN_RECV inet_csk_listen_stop", 104 .family = AF_INET6, 105 .state = BPF_TCP_SYN_RECV, 106 .drop_ack = true, 107 .expire_synack_timer = false, 108 .fastopen = true, 109 }, 110 { 111 .name = "IPv6 TCP_NEW_SYN_RECV reqsk_timer_handler", 112 .family = AF_INET6, 113 .state = BPF_TCP_NEW_SYN_RECV, 114 .drop_ack = true, 115 .expire_synack_timer = true, 116 .fastopen = false, 117 }, 118 { 119 .name = "IPv6 TCP_NEW_SYN_RECV inet_csk_complete_hashdance", 120 .family = AF_INET6, 121 .state = BPF_TCP_NEW_SYN_RECV, 122 .drop_ack = true, 123 .expire_synack_timer = false, 124 .fastopen = false, 125 } 126}; 127 128static void init_fds(__s64 fds[], int len) 129{ 130 int i; 131 132 for (i = 0; i < len; i++) 133 fds[i] = -1; 134} 135 136static void close_fds(__s64 fds[], int len) 137{ 138 int i; 139 140 for (i = 0; i < len; i++) { 141 if (fds[i] != -1) { 142 close(fds[i]); 143 fds[i] = -1; 144 } 145 } 146} 147 148static int setup_fastopen(char *buf, int size, int *saved_len, bool restore) 149{ 150 int err = 0, fd, len; 151 152 fd = open("/proc/sys/net/ipv4/tcp_fastopen", O_RDWR); 153 if (!ASSERT_NEQ(fd, -1, "open")) 154 return -1; 155 156 if (restore) { 157 len = write(fd, buf, *saved_len); 158 if (!ASSERT_EQ(len, *saved_len, "write - restore")) 159 err = -1; 160 } else { 161 *saved_len = read(fd, buf, size); 162 if (!ASSERT_GE(*saved_len, 1, "read")) { 163 err = -1; 164 goto close; 165 } 166 167 err = lseek(fd, 0, SEEK_SET); 168 if (!ASSERT_OK(err, "lseek")) 169 goto close; 170 171 /* (TFO_CLIENT_ENABLE | TFO_SERVER_ENABLE | 172 * TFO_CLIENT_NO_COOKIE | TFO_SERVER_COOKIE_NOT_REQD) 173 */ 174 len = write(fd, "519", 3); 175 if (!ASSERT_EQ(len, 3, "write - setup")) 176 err = -1; 177 } 178 179close: 180 close(fd); 181 182 return err; 183} 184 185static int drop_ack(struct migrate_reuseport_test_case *test_case, 186 struct test_migrate_reuseport *skel) 187{ 188 if (test_case->family == AF_INET) 189 skel->bss->server_port = ((struct sockaddr_in *) 190 &test_case->addr)->sin_port; 191 else 192 skel->bss->server_port = ((struct sockaddr_in6 *) 193 &test_case->addr)->sin6_port; 194 195 test_case->link = bpf_program__attach_xdp(skel->progs.drop_ack, 196 IFINDEX_LO); 197 if (!ASSERT_OK_PTR(test_case->link, "bpf_program__attach_xdp")) 198 return -1; 199 200 return 0; 201} 202 203static int pass_ack(struct migrate_reuseport_test_case *test_case) 204{ 205 int err; 206 207 err = bpf_link__destroy(test_case->link); 208 if (!ASSERT_OK(err, "bpf_link__destroy")) 209 return -1; 210 211 test_case->link = NULL; 212 213 return 0; 214} 215 216static int start_servers(struct migrate_reuseport_test_case *test_case, 217 struct test_migrate_reuseport *skel) 218{ 219 int i, err, prog_fd, reuseport = 1, qlen = QLEN; 220 221 prog_fd = bpf_program__fd(skel->progs.migrate_reuseport); 222 223 make_sockaddr(test_case->family, 224 test_case->family == AF_INET ? "127.0.0.1" : "::1", 0, 225 &test_case->addr, &test_case->addrlen); 226 227 for (i = 0; i < NR_SERVERS; i++) { 228 test_case->servers[i] = socket(test_case->family, SOCK_STREAM, 229 IPPROTO_TCP); 230 if (!ASSERT_NEQ(test_case->servers[i], -1, "socket")) 231 return -1; 232 233 err = setsockopt(test_case->servers[i], SOL_SOCKET, 234 SO_REUSEPORT, &reuseport, sizeof(reuseport)); 235 if (!ASSERT_OK(err, "setsockopt - SO_REUSEPORT")) 236 return -1; 237 238 err = bind(test_case->servers[i], 239 (struct sockaddr *)&test_case->addr, 240 test_case->addrlen); 241 if (!ASSERT_OK(err, "bind")) 242 return -1; 243 244 if (i == 0) { 245 err = setsockopt(test_case->servers[i], SOL_SOCKET, 246 SO_ATTACH_REUSEPORT_EBPF, 247 &prog_fd, sizeof(prog_fd)); 248 if (!ASSERT_OK(err, 249 "setsockopt - SO_ATTACH_REUSEPORT_EBPF")) 250 return -1; 251 252 err = getsockname(test_case->servers[i], 253 (struct sockaddr *)&test_case->addr, 254 &test_case->addrlen); 255 if (!ASSERT_OK(err, "getsockname")) 256 return -1; 257 } 258 259 if (test_case->fastopen) { 260 err = setsockopt(test_case->servers[i], 261 SOL_TCP, TCP_FASTOPEN, 262 &qlen, sizeof(qlen)); 263 if (!ASSERT_OK(err, "setsockopt - TCP_FASTOPEN")) 264 return -1; 265 } 266 267 /* All requests will be tied to the first four listeners */ 268 if (i != MIGRATED_TO) { 269 err = listen(test_case->servers[i], qlen); 270 if (!ASSERT_OK(err, "listen")) 271 return -1; 272 } 273 } 274 275 return 0; 276} 277 278static int start_clients(struct migrate_reuseport_test_case *test_case) 279{ 280 char buf[MSGLEN] = MSG; 281 int i, err; 282 283 for (i = 0; i < NR_CLIENTS; i++) { 284 test_case->clients[i] = socket(test_case->family, SOCK_STREAM, 285 IPPROTO_TCP); 286 if (!ASSERT_NEQ(test_case->clients[i], -1, "socket")) 287 return -1; 288 289 /* The attached XDP program drops only the final ACK, so 290 * clients will transition to TCP_ESTABLISHED immediately. 291 */ 292 err = settimeo(test_case->clients[i], 100); 293 if (!ASSERT_OK(err, "settimeo")) 294 return -1; 295 296 if (test_case->fastopen) { 297 int fastopen = 1; 298 299 err = setsockopt(test_case->clients[i], IPPROTO_TCP, 300 TCP_FASTOPEN_CONNECT, &fastopen, 301 sizeof(fastopen)); 302 if (!ASSERT_OK(err, 303 "setsockopt - TCP_FASTOPEN_CONNECT")) 304 return -1; 305 } 306 307 err = connect(test_case->clients[i], 308 (struct sockaddr *)&test_case->addr, 309 test_case->addrlen); 310 if (!ASSERT_OK(err, "connect")) 311 return -1; 312 313 err = write(test_case->clients[i], buf, MSGLEN); 314 if (!ASSERT_EQ(err, MSGLEN, "write")) 315 return -1; 316 } 317 318 return 0; 319} 320 321static int update_maps(struct migrate_reuseport_test_case *test_case, 322 struct test_migrate_reuseport *skel) 323{ 324 int i, err, migrated_to = MIGRATED_TO; 325 int reuseport_map_fd, migrate_map_fd; 326 __u64 value; 327 328 reuseport_map_fd = bpf_map__fd(skel->maps.reuseport_map); 329 migrate_map_fd = bpf_map__fd(skel->maps.migrate_map); 330 331 for (i = 0; i < NR_SERVERS; i++) { 332 value = (__u64)test_case->servers[i]; 333 err = bpf_map_update_elem(reuseport_map_fd, &i, &value, 334 BPF_NOEXIST); 335 if (!ASSERT_OK(err, "bpf_map_update_elem - reuseport_map")) 336 return -1; 337 338 err = bpf_map_lookup_elem(reuseport_map_fd, &i, &value); 339 if (!ASSERT_OK(err, "bpf_map_lookup_elem - reuseport_map")) 340 return -1; 341 342 err = bpf_map_update_elem(migrate_map_fd, &value, &migrated_to, 343 BPF_NOEXIST); 344 if (!ASSERT_OK(err, "bpf_map_update_elem - migrate_map")) 345 return -1; 346 } 347 348 return 0; 349} 350 351static int migrate_dance(struct migrate_reuseport_test_case *test_case) 352{ 353 int i, err; 354 355 /* Migrate TCP_ESTABLISHED and TCP_SYN_RECV requests 356 * to the last listener based on eBPF. 357 */ 358 for (i = 0; i < MIGRATED_TO; i++) { 359 err = shutdown(test_case->servers[i], SHUT_RDWR); 360 if (!ASSERT_OK(err, "shutdown")) 361 return -1; 362 } 363 364 /* No dance for TCP_NEW_SYN_RECV to migrate based on eBPF */ 365 if (test_case->state == BPF_TCP_NEW_SYN_RECV) 366 return 0; 367 368 /* Note that we use the second listener instead of the 369 * first one here. 370 * 371 * The fist listener is bind()ed with port 0 and, 372 * SOCK_BINDPORT_LOCK is not set to sk_userlocks, so 373 * calling listen() again will bind() the first listener 374 * on a new ephemeral port and detach it from the existing 375 * reuseport group. (See: __inet_bind(), tcp_set_state()) 376 * 377 * OTOH, the second one is bind()ed with a specific port, 378 * and SOCK_BINDPORT_LOCK is set. Thus, re-listen() will 379 * resurrect the listener on the existing reuseport group. 380 */ 381 err = listen(test_case->servers[1], QLEN); 382 if (!ASSERT_OK(err, "listen")) 383 return -1; 384 385 /* Migrate from the last listener to the second one. 386 * 387 * All listeners were detached out of the reuseport_map, 388 * so migration will be done by kernel random pick from here. 389 */ 390 err = shutdown(test_case->servers[MIGRATED_TO], SHUT_RDWR); 391 if (!ASSERT_OK(err, "shutdown")) 392 return -1; 393 394 /* Back to the existing reuseport group */ 395 err = listen(test_case->servers[MIGRATED_TO], QLEN); 396 if (!ASSERT_OK(err, "listen")) 397 return -1; 398 399 /* Migrate back to the last one from the second one */ 400 err = shutdown(test_case->servers[1], SHUT_RDWR); 401 if (!ASSERT_OK(err, "shutdown")) 402 return -1; 403 404 return 0; 405} 406 407static void count_requests(struct migrate_reuseport_test_case *test_case, 408 struct test_migrate_reuseport *skel) 409{ 410 struct sockaddr_storage addr; 411 socklen_t len = sizeof(addr); 412 int err, cnt = 0, client; 413 char buf[MSGLEN]; 414 415 err = settimeo(test_case->servers[MIGRATED_TO], 4000); 416 if (!ASSERT_OK(err, "settimeo")) 417 goto out; 418 419 for (; cnt < NR_CLIENTS; cnt++) { 420 client = accept(test_case->servers[MIGRATED_TO], 421 (struct sockaddr *)&addr, &len); 422 if (!ASSERT_NEQ(client, -1, "accept")) 423 goto out; 424 425 memset(buf, 0, MSGLEN); 426 read(client, &buf, MSGLEN); 427 close(client); 428 429 if (!ASSERT_STREQ(buf, MSG, "read")) 430 goto out; 431 } 432 433out: 434 ASSERT_EQ(cnt, NR_CLIENTS, "count in userspace"); 435 436 switch (test_case->state) { 437 case BPF_TCP_ESTABLISHED: 438 cnt = skel->bss->migrated_at_close; 439 break; 440 case BPF_TCP_SYN_RECV: 441 cnt = skel->bss->migrated_at_close_fastopen; 442 break; 443 case BPF_TCP_NEW_SYN_RECV: 444 if (test_case->expire_synack_timer) 445 cnt = skel->bss->migrated_at_send_synack; 446 else 447 cnt = skel->bss->migrated_at_recv_ack; 448 break; 449 default: 450 cnt = 0; 451 } 452 453 ASSERT_EQ(cnt, NR_CLIENTS, "count in BPF prog"); 454} 455 456static void run_test(struct migrate_reuseport_test_case *test_case, 457 struct test_migrate_reuseport *skel) 458{ 459 int err, saved_len; 460 char buf[16]; 461 462 skel->bss->migrated_at_close = 0; 463 skel->bss->migrated_at_close_fastopen = 0; 464 skel->bss->migrated_at_send_synack = 0; 465 skel->bss->migrated_at_recv_ack = 0; 466 467 init_fds(test_case->servers, NR_SERVERS); 468 init_fds(test_case->clients, NR_CLIENTS); 469 470 if (test_case->fastopen) { 471 memset(buf, 0, sizeof(buf)); 472 473 err = setup_fastopen(buf, sizeof(buf), &saved_len, false); 474 if (!ASSERT_OK(err, "setup_fastopen - setup")) 475 return; 476 } 477 478 err = start_servers(test_case, skel); 479 if (!ASSERT_OK(err, "start_servers")) 480 goto close_servers; 481 482 if (test_case->drop_ack) { 483 /* Drop the final ACK of the 3-way handshake and stick the 484 * in-flight requests on TCP_SYN_RECV or TCP_NEW_SYN_RECV. 485 */ 486 err = drop_ack(test_case, skel); 487 if (!ASSERT_OK(err, "drop_ack")) 488 goto close_servers; 489 } 490 491 /* Tie requests to the first four listners */ 492 err = start_clients(test_case); 493 if (!ASSERT_OK(err, "start_clients")) 494 goto close_clients; 495 496 err = listen(test_case->servers[MIGRATED_TO], QLEN); 497 if (!ASSERT_OK(err, "listen")) 498 goto close_clients; 499 500 err = update_maps(test_case, skel); 501 if (!ASSERT_OK(err, "fill_maps")) 502 goto close_clients; 503 504 /* Migrate the requests in the accept queue only. 505 * TCP_NEW_SYN_RECV requests are not migrated at this point. 506 */ 507 err = migrate_dance(test_case); 508 if (!ASSERT_OK(err, "migrate_dance")) 509 goto close_clients; 510 511 if (test_case->expire_synack_timer) { 512 /* Wait for SYN+ACK timers to expire so that 513 * reqsk_timer_handler() migrates TCP_NEW_SYN_RECV requests. 514 */ 515 sleep(1); 516 } 517 518 if (test_case->link) { 519 /* Resume 3WHS and migrate TCP_NEW_SYN_RECV requests */ 520 err = pass_ack(test_case); 521 if (!ASSERT_OK(err, "pass_ack")) 522 goto close_clients; 523 } 524 525 count_requests(test_case, skel); 526 527close_clients: 528 close_fds(test_case->clients, NR_CLIENTS); 529 530 if (test_case->link) { 531 err = pass_ack(test_case); 532 ASSERT_OK(err, "pass_ack - clean up"); 533 } 534 535close_servers: 536 close_fds(test_case->servers, NR_SERVERS); 537 538 if (test_case->fastopen) { 539 err = setup_fastopen(buf, sizeof(buf), &saved_len, true); 540 ASSERT_OK(err, "setup_fastopen - restore"); 541 } 542} 543 544void serial_test_migrate_reuseport(void) 545{ 546 struct test_migrate_reuseport *skel; 547 int i; 548 549 skel = test_migrate_reuseport__open_and_load(); 550 if (!ASSERT_OK_PTR(skel, "open_and_load")) 551 return; 552 553 for (i = 0; i < ARRAY_SIZE(test_cases); i++) { 554 test__start_subtest(test_cases[i].name); 555 run_test(&test_cases[i], skel); 556 } 557 558 test_migrate_reuseport__destroy(skel); 559}