xsk_fwd.c (25770B)
// SPDX-License-Identifier: GPL-2.0
/* Copyright(c) 2020 Intel Corporation. */

#define _GNU_SOURCE
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <getopt.h>
#include <netinet/ether.h>
#include <net/if.h>

#include <linux/bpf.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>

#include <bpf/libbpf.h>
#include <bpf/xsk.h>
#include <bpf/bpf.h>

/* libbpf APIs for AF_XDP are deprecated starting from v0.7 */
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"

#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))

typedef __u64 u64;
typedef __u32 u32;
typedef __u16 u16;
typedef __u8 u8;

/* This program illustrates packet forwarding between multiple AF_XDP sockets
 * in a multi-threaded environment. All threads share a common buffer pool,
 * with each socket having its own private buffer cache.
 *
 * Example 1: Single thread handling two sockets. The packets received by socket
 * A (interface IFA, queue QA) are forwarded to socket B (interface IFB, queue
 * QB), while the packets received by socket B are forwarded to socket A. The
 * thread is running on CPU core X:
 *
 *	./xsk_fwd -i IFA -q QA -i IFB -q QB -c X
 *
 * Example 2: Two threads, each handling two sockets. The thread running on CPU
 * core CX forwards all the packets received by socket A to socket B, and all
 * the packets received by socket B to socket A. The thread running on CPU core
 * CY performs the same packet forwarding between sockets C and D:
 *
 *	./xsk_fwd -i IFA -q QA -i IFB -q QB -i IFC -q QC -i IFD -q QD
 *	-c CX -c CY
 */

/*
 * Buffer pool and buffer cache
 *
 * For packet forwarding, the packet buffers are typically allocated from the
 * pool for packet reception and freed back to the pool for further reuse once
 * the packet transmission is completed.
 *
 * The buffer pool is shared between multiple threads. In order to minimize the
 * access latency to the shared buffer pool, each thread creates one (or
 * several) buffer caches, which, unlike the buffer pool, are private to the
 * thread that creates them and therefore cannot be shared with other threads.
 * Access to the shared pool is only needed either (A) when the cache gets
 * empty due to repeated buffer allocations and it needs to be replenished from
 * the pool, or (B) when the cache gets full due to repeated buffer frees and it
 * needs to be flushed back to the pool.
 *
 * In a packet forwarding system, a packet received on any input port can
 * potentially be transmitted on any output port, depending on the forwarding
 * configuration. For AF_XDP sockets, for this to work with zero-copy of the
 * packet buffers, the buffer pool memory must fit into the single UMEM area
 * shared by all the sockets.
 */

struct bpool_params {
	u32 n_buffers;
	u32 buffer_size;
	int mmap_flags;

	u32 n_users_max;
	u32 n_buffers_per_slab;
};

/* This buffer pool implementation organizes the buffers into equally sized
 * slabs of *n_buffers_per_slab*. Initially, there are *n_slabs* slabs in the
 * pool that are completely filled with buffer pointers (full slabs).
 *
 * Each buffer cache has a slab for buffer allocation and a slab for buffer
 * free, with both of these slabs initially empty. When the cache's allocation
 * slab goes empty, it is swapped with one of the available full slabs from the
 * pool, if any is available. When the cache's free slab goes full, it is
 * swapped for one of the empty slabs from the pool, which is guaranteed to
 * succeed.
 *
 * Partially filled slabs never get traded between the cache and the pool
 * (except when the cache itself is destroyed), which enables fast operation
 * through pointer swapping.
 */
struct bpool {
	struct bpool_params params;
	pthread_mutex_t lock;
	void *addr;

	u64 **slabs;
	u64 **slabs_reserved;
	u64 *buffers;
	u64 *buffers_reserved;

	u64 n_slabs;
	u64 n_slabs_reserved;
	u64 n_buffers;

	u64 n_slabs_available;
	u64 n_slabs_reserved_available;

	struct xsk_umem_config umem_cfg;
	struct xsk_ring_prod umem_fq;
	struct xsk_ring_cons umem_cq;
	struct xsk_umem *umem;
};

static struct bpool *
bpool_init(struct bpool_params *params,
	   struct xsk_umem_config *umem_cfg)
{
	u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved;
	u64 slabs_size, slabs_reserved_size;
	u64 buffers_size, buffers_reserved_size;
	u64 total_size, i;
	struct bpool *bp;
	u8 *p;
	int status;

	/* Use libbpf 1.0 API mode */
	libbpf_set_strict_mode(LIBBPF_STRICT_ALL);

	/* bpool internals dimensioning. */
	n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) /
		  params->n_buffers_per_slab;
	n_slabs_reserved = params->n_users_max * 2;
	n_buffers = n_slabs * params->n_buffers_per_slab;
	n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab;

	slabs_size = n_slabs * sizeof(u64 *);
	slabs_reserved_size = n_slabs_reserved * sizeof(u64 *);
	buffers_size = n_buffers * sizeof(u64);
	buffers_reserved_size = n_buffers_reserved * sizeof(u64);

	total_size = sizeof(struct bpool) +
		     slabs_size + slabs_reserved_size +
		     buffers_size + buffers_reserved_size;

	/* bpool memory allocation. */
	p = calloc(total_size, sizeof(u8));
	if (!p)
		return NULL;

	/* bpool memory initialization. */
	bp = (struct bpool *)p;
	memcpy(&bp->params, params, sizeof(*params));
	bp->params.n_buffers = n_buffers;

	bp->slabs = (u64 **)&p[sizeof(struct bpool)];
	bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) +
					slabs_size];
	bp->buffers = (u64 *)&p[sizeof(struct bpool) +
				slabs_size + slabs_reserved_size];
	bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) +
					 slabs_size + slabs_reserved_size + buffers_size];

	bp->n_slabs = n_slabs;
	bp->n_slabs_reserved = n_slabs_reserved;
	bp->n_buffers = n_buffers;

	for (i = 0; i < n_slabs; i++)
		bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab];
	bp->n_slabs_available = n_slabs;

	for (i = 0; i < n_slabs_reserved; i++)
		bp->slabs_reserved[i] = &bp->buffers_reserved[i *
			params->n_buffers_per_slab];
	bp->n_slabs_reserved_available = n_slabs_reserved;

	for (i = 0; i < n_buffers; i++)
		bp->buffers[i] = i * params->buffer_size;

	/* lock. */
	status = pthread_mutex_init(&bp->lock, NULL);
	if (status) {
		free(p);
		return NULL;
	}

	/* mmap. */
	bp->addr = mmap(NULL,
			n_buffers * params->buffer_size,
			PROT_READ | PROT_WRITE,
			MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags,
			-1,
			0);
	if (bp->addr == MAP_FAILED) {
		pthread_mutex_destroy(&bp->lock);
		free(p);
		return NULL;
	}

	/* umem. */
	status = xsk_umem__create(&bp->umem,
				  bp->addr,
				  bp->params.n_buffers * bp->params.buffer_size,
				  &bp->umem_fq,
				  &bp->umem_cq,
				  umem_cfg);
	if (status) {
		munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
		pthread_mutex_destroy(&bp->lock);
		free(p);
		return NULL;
	}
	memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg));

	return bp;
}
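
/*
 * Worked example of the dimensioning above, assuming the default parameters
 * defined later in this file and libbpf's usual defaults (ring size 2048
 * descriptors, frame size 4096 bytes): with n_buffers = 64 * 1024,
 * n_buffers_per_slab = 2048 * 2 = 4096 and n_users_max = 16, bpool_init()
 * computes n_slabs = 65536 / 4096 = 16 full slabs, n_slabs_reserved =
 * 16 * 2 = 32 reserved slabs and n_buffers_reserved = 32 * 4096 = 131072
 * reserved buffer slots. With buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
 * the mmap()'ed UMEM area is 65536 * 4096 bytes = 256 MiB.
 */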

static void
bpool_free(struct bpool *bp)
{
	if (!bp)
		return;

	xsk_umem__delete(bp->umem);
	munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
	pthread_mutex_destroy(&bp->lock);
	free(bp);
}

struct bcache {
	struct bpool *bp;

	u64 *slab_cons;
	u64 *slab_prod;

	u64 n_buffers_cons;
	u64 n_buffers_prod;
};

static u32
bcache_slab_size(struct bcache *bc)
{
	struct bpool *bp = bc->bp;

	return bp->params.n_buffers_per_slab;
}

static struct bcache *
bcache_init(struct bpool *bp)
{
	struct bcache *bc;

	bc = calloc(1, sizeof(struct bcache));
	if (!bc)
		return NULL;

	bc->bp = bp;
	bc->n_buffers_cons = 0;
	bc->n_buffers_prod = 0;

	pthread_mutex_lock(&bp->lock);
	if (bp->n_slabs_reserved_available == 0) {
		pthread_mutex_unlock(&bp->lock);
		free(bc);
		return NULL;
	}

	bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1];
	bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2];
	bp->n_slabs_reserved_available -= 2;
	pthread_mutex_unlock(&bp->lock);

	return bc;
}
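
/*
 * Note: each buffer cache permanently holds two reserved slabs (one for
 * allocation, one for free), which is why bpool_init() sets aside
 * n_users_max * 2 reserved slabs. The reserved slabs are always taken and
 * returned in pairs, so their available count stays even and the check for
 * zero in bcache_init() is sufficient.
 */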

static void
bcache_free(struct bcache *bc)
{
	struct bpool *bp;

	if (!bc)
		return;

	/* In order to keep this example simple, the case of freeing any
	 * existing buffers from the cache back to the pool is ignored.
	 */

	bp = bc->bp;
	pthread_mutex_lock(&bp->lock);
	bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod;
	bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons;
	bp->n_slabs_reserved_available += 2;
	pthread_mutex_unlock(&bp->lock);

	free(bc);
}

/* To work correctly, the implementation requires that the *n_buffers* input
 * argument is never greater than the buffer pool's *n_buffers_per_slab*. This
 * is typically the case, with one exception taking place when a large number
 * of buffers is allocated at init time (e.g. for the UMEM fill queue setup).
 */
static inline u32
bcache_cons_check(struct bcache *bc, u32 n_buffers)
{
	struct bpool *bp = bc->bp;
	u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
	u64 n_buffers_cons = bc->n_buffers_cons;
	u64 n_slabs_available;
	u64 *slab_full;

	/*
	 * Consumer slab is not empty: Use what's available locally. Do not
	 * look for more buffers from the pool when the ask can only be
	 * partially satisfied.
	 */
	if (n_buffers_cons)
		return (n_buffers_cons < n_buffers) ?
			n_buffers_cons :
			n_buffers;

	/*
	 * Consumer slab is empty: look to trade the current consumer slab
	 * (empty) for a full slab from the pool, if any is available.
	 */
	pthread_mutex_lock(&bp->lock);
	n_slabs_available = bp->n_slabs_available;
	if (!n_slabs_available) {
		pthread_mutex_unlock(&bp->lock);
		return 0;
	}

	n_slabs_available--;
	slab_full = bp->slabs[n_slabs_available];
	bp->slabs[n_slabs_available] = bc->slab_cons;
	bp->n_slabs_available = n_slabs_available;
	pthread_mutex_unlock(&bp->lock);

	bc->slab_cons = slab_full;
	bc->n_buffers_cons = n_buffers_per_slab;
	return n_buffers;
}

static inline u64
bcache_cons(struct bcache *bc)
{
	u64 n_buffers_cons = bc->n_buffers_cons - 1;
	u64 buffer;

	buffer = bc->slab_cons[n_buffers_cons];
	bc->n_buffers_cons = n_buffers_cons;
	return buffer;
}

static inline void
bcache_prod(struct bcache *bc, u64 buffer)
{
	struct bpool *bp = bc->bp;
	u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
	u64 n_buffers_prod = bc->n_buffers_prod;
	u64 n_slabs_available;
	u64 *slab_empty;

	/*
	 * Producer slab is not yet full: store the current buffer to it.
	 */
	if (n_buffers_prod < n_buffers_per_slab) {
		bc->slab_prod[n_buffers_prod] = buffer;
		bc->n_buffers_prod = n_buffers_prod + 1;
		return;
	}

	/*
	 * Producer slab is full: trade the cache's current producer slab
	 * (full) for an empty slab from the pool, then store the current
	 * buffer to the new producer slab. As one full slab exists in the
	 * cache, it is guaranteed that there is at least one empty slab
	 * available in the pool.
	 */
	pthread_mutex_lock(&bp->lock);
	n_slabs_available = bp->n_slabs_available;
	slab_empty = bp->slabs[n_slabs_available];
	bp->slabs[n_slabs_available] = bc->slab_prod;
	bp->n_slabs_available = n_slabs_available + 1;
	pthread_mutex_unlock(&bp->lock);

	slab_empty[0] = buffer;
	bc->slab_prod = slab_empty;
	bc->n_buffers_prod = 1;
}

/*
 * Port
 *
 * Each of the forwarding ports sits on top of an AF_XDP socket. In order for
 * packet forwarding to happen with no packet buffer copy, all the sockets need
 * to share the same UMEM area, which is used as the buffer pool memory.
 */
#ifndef MAX_BURST_RX
#define MAX_BURST_RX 64
#endif

#ifndef MAX_BURST_TX
#define MAX_BURST_TX 64
#endif

struct burst_rx {
	u64 addr[MAX_BURST_RX];
	u32 len[MAX_BURST_RX];
};

struct burst_tx {
	u64 addr[MAX_BURST_TX];
	u32 len[MAX_BURST_TX];
	u32 n_pkts;
};

struct port_params {
	struct xsk_socket_config xsk_cfg;
	struct bpool *bp;
	const char *iface;
	u32 iface_queue;
};

struct port {
	struct port_params params;

	struct bcache *bc;

	struct xsk_ring_cons rxq;
	struct xsk_ring_prod txq;
	struct xsk_ring_prod umem_fq;
	struct xsk_ring_cons umem_cq;
	struct xsk_socket *xsk;
	int umem_fq_initialized;

	u64 n_pkts_rx;
	u64 n_pkts_tx;
};

static void
port_free(struct port *p)
{
	if (!p)
		return;

	/* To keep this example simple, the code to free the buffers from the
	 * socket's receive and transmit queues, as well as from the UMEM fill
	 * and completion queues, is not included.
	 */

	if (p->xsk)
		xsk_socket__delete(p->xsk);

	bcache_free(p->bc);

	free(p);
}

static struct port *
port_init(struct port_params *params)
{
	struct port *p;
	u32 umem_fq_size, pos = 0;
	int status, i;

	/* Memory allocation and initialization. */
	p = calloc(sizeof(struct port), 1);
	if (!p)
		return NULL;

	memcpy(&p->params, params, sizeof(p->params));
	umem_fq_size = params->bp->umem_cfg.fill_size;

	/* bcache. */
	p->bc = bcache_init(params->bp);
	if (!p->bc ||
	    (bcache_slab_size(p->bc) < umem_fq_size) ||
	    (bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size)) {
		port_free(p);
		return NULL;
	}

	/* xsk socket. */
	status = xsk_socket__create_shared(&p->xsk,
					   params->iface,
					   params->iface_queue,
					   params->bp->umem,
					   &p->rxq,
					   &p->txq,
					   &p->umem_fq,
					   &p->umem_cq,
					   &params->xsk_cfg);
	if (status) {
		port_free(p);
		return NULL;
	}

	/* umem fq. */
	xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos);

	for (i = 0; i < umem_fq_size; i++)
		*xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
			bcache_cons(p->bc);

	xsk_ring_prod__submit(&p->umem_fq, umem_fq_size);
	p->umem_fq_initialized = 1;

	return p;
}

static inline u32
port_rx_burst(struct port *p, struct burst_rx *b)
{
	u32 n_pkts, pos, i;

	/* Free buffers for FQ replenish. */
	n_pkts = ARRAY_SIZE(b->addr);

	n_pkts = bcache_cons_check(p->bc, n_pkts);
	if (!n_pkts)
		return 0;

	/* RXQ. */
	n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos);
	if (!n_pkts) {
		if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
			struct pollfd pollfd = {
				.fd = xsk_socket__fd(p->xsk),
				.events = POLLIN,
			};

			poll(&pollfd, 1, 0);
		}
		return 0;
	}

	for (i = 0; i < n_pkts; i++) {
		b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr;
		b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len;
	}

	xsk_ring_cons__release(&p->rxq, n_pkts);
	p->n_pkts_rx += n_pkts;

	/* UMEM FQ. */
	for ( ; ; ) {
		int status;

		status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos);
		if (status == n_pkts)
			break;

		if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
			struct pollfd pollfd = {
				.fd = xsk_socket__fd(p->xsk),
				.events = POLLIN,
			};

			poll(&pollfd, 1, 0);
		}
	}

	for (i = 0; i < n_pkts; i++)
		*xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
			bcache_cons(p->bc);

	xsk_ring_prod__submit(&p->umem_fq, n_pkts);

	return n_pkts;
}

static inline void
port_tx_burst(struct port *p, struct burst_tx *b)
{
	u32 n_pkts, pos, i;
	int status;

	/* UMEM CQ. */
	n_pkts = p->params.bp->umem_cfg.comp_size;

	n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos);

	for (i = 0; i < n_pkts; i++) {
		u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i);

		bcache_prod(p->bc, addr);
	}

	xsk_ring_cons__release(&p->umem_cq, n_pkts);

	/* TXQ. */
	n_pkts = b->n_pkts;

	for ( ; ; ) {
		status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos);
		if (status == n_pkts)
			break;

		if (xsk_ring_prod__needs_wakeup(&p->txq))
			sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT,
			       NULL, 0);
	}

	for (i = 0; i < n_pkts; i++) {
		xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i];
		xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i];
	}

	xsk_ring_prod__submit(&p->txq, n_pkts);
	if (xsk_ring_prod__needs_wakeup(&p->txq))
		sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
	p->n_pkts_tx += n_pkts;
}
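
/*
 * Buffer life cycle in the two functions above: port_rx_burst() consumes RX
 * descriptors and immediately refills the UMEM fill queue with fresh buffers
 * taken from the port's cache, while port_tx_burst() drains the UMEM
 * completion queue back into the (possibly different) TX port's cache before
 * posting the TX descriptors. The received buffers themselves are handed to
 * the TX port by the forwarding thread, so every buffer keeps circulating:
 * cache -> fill queue -> RX -> TX -> completion queue -> cache, with the
 * shared pool balancing the caches.
 */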

/*
 * Thread
 *
 * Packet forwarding threads.
 */
#ifndef MAX_PORTS_PER_THREAD
#define MAX_PORTS_PER_THREAD 16
#endif

struct thread_data {
	struct port *ports_rx[MAX_PORTS_PER_THREAD];
	struct port *ports_tx[MAX_PORTS_PER_THREAD];
	u32 n_ports_rx;
	struct burst_rx burst_rx;
	struct burst_tx burst_tx[MAX_PORTS_PER_THREAD];
	u32 cpu_core_id;
	int quit;
};

static void swap_mac_addresses(void *data)
{
	struct ether_header *eth = (struct ether_header *)data;
	struct ether_addr *src_addr = (struct ether_addr *)&eth->ether_shost;
	struct ether_addr *dst_addr = (struct ether_addr *)&eth->ether_dhost;
	struct ether_addr tmp;

	tmp = *src_addr;
	*src_addr = *dst_addr;
	*dst_addr = tmp;
}

static void *
thread_func(void *arg)
{
	struct thread_data *t = arg;
	cpu_set_t cpu_cores;
	u32 i;

	CPU_ZERO(&cpu_cores);
	CPU_SET(t->cpu_core_id, &cpu_cores);
	pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores);

	for (i = 0; !t->quit; i = (i + 1) & (t->n_ports_rx - 1)) {
		struct port *port_rx = t->ports_rx[i];
		struct port *port_tx = t->ports_tx[i];
		struct burst_rx *brx = &t->burst_rx;
		struct burst_tx *btx = &t->burst_tx[i];
		u32 n_pkts, j;

		/* RX. */
		n_pkts = port_rx_burst(port_rx, brx);
		if (!n_pkts)
			continue;

		/* Process & TX. */
		for (j = 0; j < n_pkts; j++) {
			u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]);
			u8 *pkt = xsk_umem__get_data(port_rx->params.bp->addr,
						     addr);

			swap_mac_addresses(pkt);

			btx->addr[btx->n_pkts] = brx->addr[j];
			btx->len[btx->n_pkts] = brx->len[j];
			btx->n_pkts++;

			if (btx->n_pkts == MAX_BURST_TX) {
				port_tx_burst(port_tx, btx);
				btx->n_pkts = 0;
			}
		}
	}

	return NULL;
}

/*
 * Process
 */
static const struct bpool_params bpool_params_default = {
	.n_buffers = 64 * 1024,
	.buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
	.mmap_flags = 0,

	.n_users_max = 16,
	.n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
};

static const struct xsk_umem_config umem_cfg_default = {
	.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
	.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
	.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
	.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
	.flags = 0,
};

static const struct port_params port_params_default = {
	.xsk_cfg = {
		.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
		.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
		.libbpf_flags = 0,
		.xdp_flags = XDP_FLAGS_DRV_MODE,
		.bind_flags = XDP_USE_NEED_WAKEUP | XDP_ZEROCOPY,
	},

	.bp = NULL,
	.iface = NULL,
	.iface_queue = 0,
};

#ifndef MAX_PORTS
#define MAX_PORTS 64
#endif

#ifndef MAX_THREADS
#define MAX_THREADS 64
#endif

static struct bpool_params bpool_params;
static struct xsk_umem_config umem_cfg;
static struct bpool *bp;

static struct port_params port_params[MAX_PORTS];
static struct port *ports[MAX_PORTS];
static u64 n_pkts_rx[MAX_PORTS];
static u64 n_pkts_tx[MAX_PORTS];
static int n_ports;

static pthread_t threads[MAX_THREADS];
static struct thread_data thread_data[MAX_THREADS];
static int n_threads;

static void
print_usage(char *prog_name)
{
	const char *usage =
		"Usage:\n"
		"\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n"
		"\n"
		"-c CORE        CPU core to run a packet forwarding thread\n"
		"               on. May be invoked multiple times.\n"
		"\n"
		"-b SIZE        Number of buffers in the buffer pool shared\n"
		"               by all the forwarding threads. Default: %u.\n"
		"\n"
		"-i INTERFACE   Network interface. Each (INTERFACE, QUEUE)\n"
		"               pair specifies one forwarding port. May be\n"
		"               invoked multiple times.\n"
		"\n"
		"-q QUEUE       Network interface queue for RX and TX. Each\n"
		"               (INTERFACE, QUEUE) pair specifies one\n"
		"               forwarding port. Default: %u. May be invoked\n"
		"               multiple times.\n"
		"\n";
	printf(usage,
	       prog_name,
	       bpool_params_default.n_buffers,
	       port_params_default.iface_queue);
}

static int
parse_args(int argc, char **argv)
{
	struct option lgopts[] = {
		{ NULL, 0, 0, 0 }
	};
	int opt, option_index;

	/* Parse the input arguments. */
	for ( ; ; ) {
		opt = getopt_long(argc, argv, "b:c:i:q:", lgopts, &option_index);
		if (opt == EOF)
			break;

		switch (opt) {
		case 'b':
			bpool_params.n_buffers = atoi(optarg);
			break;

		case 'c':
			if (n_threads == MAX_THREADS) {
				printf("Max number of threads (%d) reached.\n",
				       MAX_THREADS);
				return -1;
			}

			thread_data[n_threads].cpu_core_id = atoi(optarg);
			n_threads++;
			break;

		case 'i':
			if (n_ports == MAX_PORTS) {
				printf("Max number of ports (%d) reached.\n",
				       MAX_PORTS);
				return -1;
			}

			port_params[n_ports].iface = optarg;
			port_params[n_ports].iface_queue = 0;
			n_ports++;
			break;

		case 'q':
			if (n_ports == 0) {
				printf("No port specified for queue.\n");
				return -1;
			}
			port_params[n_ports - 1].iface_queue = atoi(optarg);
			break;

		default:
			printf("Illegal argument.\n");
			return -1;
		}
	}

	optind = 1; /* reset getopt lib */

	/* Check the input arguments. */
	if (!n_ports) {
		printf("No ports specified.\n");
		return -1;
	}

	if (!n_threads) {
		printf("No threads specified.\n");
		return -1;
	}

	if (n_ports % n_threads) {
		printf("Ports cannot be evenly distributed to threads.\n");
		return -1;
	}

	return 0;
}

static void
print_port(u32 port_id)
{
	struct port *port = ports[port_id];

	printf("Port %u: interface = %s, queue = %u\n",
	       port_id, port->params.iface, port->params.iface_queue);
}

static void
print_thread(u32 thread_id)
{
	struct thread_data *t = &thread_data[thread_id];
	u32 i;

	printf("Thread %u (CPU core %u): ",
	       thread_id, t->cpu_core_id);

	for (i = 0; i < t->n_ports_rx; i++) {
		struct port *port_rx = t->ports_rx[i];
		struct port *port_tx = t->ports_tx[i];

		printf("(%s, %u) -> (%s, %u), ",
		       port_rx->params.iface,
		       port_rx->params.iface_queue,
		       port_tx->params.iface,
		       port_tx->params.iface_queue);
	}

	printf("\n");
}

static void
print_port_stats_separator(void)
{
	printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n",
	       "----",
	       "------------",
	       "-------------",
	       "------------",
	       "-------------");
}

static void
print_port_stats_header(void)
{
	print_port_stats_separator();
	printf("| %4s | %12s | %13s | %12s | %13s |\n",
	       "Port",
	       "RX packets",
	       "RX rate (pps)",
	       "TX packets",
	       "TX rate (pps)");
	print_port_stats_separator();
}

static void
print_port_stats_trailer(void)
{
	print_port_stats_separator();
	printf("\n");
}

static void
print_port_stats(int port_id, u64 ns_diff)
{
	struct port *p = ports[port_id];
	double rx_pps, tx_pps;

	rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff;
	tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. / ns_diff;

	printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n",
	       port_id,
	       p->n_pkts_rx,
	       rx_pps,
	       p->n_pkts_tx,
	       tx_pps);

	n_pkts_rx[port_id] = p->n_pkts_rx;
	n_pkts_tx[port_id] = p->n_pkts_tx;
}

static void
print_port_stats_all(u64 ns_diff)
{
	int i;

	print_port_stats_header();
	for (i = 0; i < n_ports; i++)
		print_port_stats(i, ns_diff);
	print_port_stats_trailer();
}

static int quit;

static void
signal_handler(int sig)
{
	quit = 1;
}

static void remove_xdp_program(void)
{
	int i;

	for (i = 0 ; i < n_ports; i++)
		bpf_xdp_detach(if_nametoindex(port_params[i].iface),
			       port_params[i].xsk_cfg.xdp_flags, NULL);
}

int main(int argc, char **argv)
{
	struct timespec time;
	u64 ns0;
	int i;

	/* Parse args. */
	memcpy(&bpool_params, &bpool_params_default,
	       sizeof(struct bpool_params));
	memcpy(&umem_cfg, &umem_cfg_default,
	       sizeof(struct xsk_umem_config));
	for (i = 0; i < MAX_PORTS; i++)
		memcpy(&port_params[i], &port_params_default,
		       sizeof(struct port_params));

	if (parse_args(argc, argv)) {
		print_usage(argv[0]);
		return -1;
	}

	/* Buffer pool initialization. */
	bp = bpool_init(&bpool_params, &umem_cfg);
	if (!bp) {
		printf("Buffer pool initialization failed.\n");
		return -1;
	}
	printf("Buffer pool created successfully.\n");

	/* Ports initialization. */
	for (i = 0; i < MAX_PORTS; i++)
		port_params[i].bp = bp;

	for (i = 0; i < n_ports; i++) {
		ports[i] = port_init(&port_params[i]);
		if (!ports[i]) {
			printf("Port %d initialization failed.\n", i);
			return -1;
		}
		print_port(i);
	}
	printf("All ports created successfully.\n");

	/* Threads. */
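	/*
	 * Each thread gets a contiguous group of n_ports / n_threads ports.
	 * Within a group, packets received on port j are transmitted on port
	 * (j + 1) modulo the group size, so with two ports per thread the
	 * traffic is simply forwarded A -> B and B -> A, as in the examples
	 * at the top of this file.
	 */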
	for (i = 0; i < n_threads; i++) {
		struct thread_data *t = &thread_data[i];
		u32 n_ports_per_thread = n_ports / n_threads, j;

		for (j = 0; j < n_ports_per_thread; j++) {
			t->ports_rx[j] = ports[i * n_ports_per_thread + j];
			t->ports_tx[j] = ports[i * n_ports_per_thread +
				(j + 1) % n_ports_per_thread];
		}

		t->n_ports_rx = n_ports_per_thread;

		print_thread(i);
	}

	for (i = 0; i < n_threads; i++) {
		int status;

		status = pthread_create(&threads[i],
					NULL,
					thread_func,
					&thread_data[i]);
		if (status) {
			printf("Thread %d creation failed.\n", i);
			return -1;
		}
	}
	printf("All threads created successfully.\n");

	/* Print statistics. */
	signal(SIGINT, signal_handler);
	signal(SIGTERM, signal_handler);
	signal(SIGABRT, signal_handler);

	clock_gettime(CLOCK_MONOTONIC, &time);
	ns0 = time.tv_sec * 1000000000UL + time.tv_nsec;
	for ( ; !quit; ) {
		u64 ns1, ns_diff;

		sleep(1);
		clock_gettime(CLOCK_MONOTONIC, &time);
		ns1 = time.tv_sec * 1000000000UL + time.tv_nsec;
		ns_diff = ns1 - ns0;
		ns0 = ns1;

		print_port_stats_all(ns_diff);
	}

	/* Threads completion. */
	printf("Quit.\n");
	for (i = 0; i < n_threads; i++)
		thread_data[i].quit = 1;

	for (i = 0; i < n_threads; i++)
		pthread_join(threads[i], NULL);

	for (i = 0; i < n_ports; i++)
		port_free(ports[i]);

	bpool_free(bp);

	remove_xdp_program();

	return 0;
}