cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

xsk_fwd.c (25770B)


      1// SPDX-License-Identifier: GPL-2.0
      2/* Copyright(c) 2020 Intel Corporation. */
      3
      4#define _GNU_SOURCE
      5#include <poll.h>
      6#include <pthread.h>
      7#include <signal.h>
      8#include <sched.h>
      9#include <stdio.h>
     10#include <stdlib.h>
     11#include <string.h>
     12#include <sys/mman.h>
     13#include <sys/socket.h>
     14#include <sys/types.h>
     15#include <time.h>
     16#include <unistd.h>
     17#include <getopt.h>
     18#include <netinet/ether.h>
     19#include <net/if.h>
     20
     21#include <linux/bpf.h>
     22#include <linux/if_link.h>
     23#include <linux/if_xdp.h>
     24
     25#include <bpf/libbpf.h>
     26#include <bpf/xsk.h>
     27#include <bpf/bpf.h>
     28
     29/* libbpf APIs for AF_XDP are deprecated starting from v0.7 */
     30#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
     31
     32#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
     33
     34typedef __u64 u64;
     35typedef __u32 u32;
     36typedef __u16 u16;
     37typedef __u8  u8;
     38
     39/* This program illustrates the packet forwarding between multiple AF_XDP
     40 * sockets in multi-threaded environment. All threads are sharing a common
     41 * buffer pool, with each socket having its own private buffer cache.
     42 *
     43 * Example 1: Single thread handling two sockets. The packets received by socket
     44 * A (interface IFA, queue QA) are forwarded to socket B (interface IFB, queue
     45 * QB), while the packets received by socket B are forwarded to socket A. The
     46 * thread is running on CPU core X:
     47 *
     48 *         ./xsk_fwd -i IFA -q QA -i IFB -q QB -c X
     49 *
     50 * Example 2: Two threads, each handling two sockets. The thread running on CPU
     51 * core X forwards all the packets received by socket A to socket B, and all the
     52 * packets received by socket B to socket A. The thread running on CPU core Y is
     53 * performing the same packet forwarding between sockets C and D:
     54 *
     55 *         ./xsk_fwd -i IFA -q QA -i IFB -q QB -i IFC -q QC -i IFD -q QD
     56 *         -c CX -c CY
     57 */
     58
     59/*
     60 * Buffer pool and buffer cache
     61 *
     62 * For packet forwarding, the packet buffers are typically allocated from the
     63 * pool for packet reception and freed back to the pool for further reuse once
     64 * the packet transmission is completed.
     65 *
     66 * The buffer pool is shared between multiple threads. In order to minimize the
     67 * access latency to the shared buffer pool, each thread creates one (or
     68 * several) buffer caches, which, unlike the buffer pool, are private to the
     69 * thread that creates them and therefore cannot be shared with other threads.
     70 * The access to the shared pool is only needed either (A) when the cache gets
     71 * empty due to repeated buffer allocations and it needs to be replenished from
     72 * the pool, or (B) when the cache gets full due to repeated buffer free and it
     73 * needs to be flushed back to the pool.
     74 *
     75 * In a packet forwarding system, a packet received on any input port can
     76 * potentially be transmitted on any output port, depending on the forwarding
     77 * configuration. For AF_XDP sockets, for this to work with zero-copy of the
     78 * packet buffers, it is required that the buffer pool memory fits into the
     79 * UMEM area shared by all the sockets.
     80 */
     81
     82struct bpool_params {
     83	u32 n_buffers;
     84	u32 buffer_size;
     85	int mmap_flags;
     86
     87	u32 n_users_max;
     88	u32 n_buffers_per_slab;
     89};
     90
     91/* This buffer pool implementation organizes the buffers into equally sized
     92 * slabs of *n_buffers_per_slab*. Initially, there are *n_slabs* slabs in the
     93 * pool that are completely filled with buffer pointers (full slabs).
     94 *
     95 * Each buffer cache has a slab for buffer allocation and a slab for buffer
     96 * free, with both of these slabs initially empty. When the cache's allocation
     97 * slab goes empty, it is swapped with one of the available full slabs from the
     98 * pool, if any is available. When the cache's free slab goes full, it is
     99 * swapped for one of the empty slabs from the pool, which is guaranteed to
    100 * succeed.
    101 *
    102 * Partially filled slabs never get traded between the cache and the pool
    103 * (except when the cache itself is destroyed), which enables fast operation
    104 * through pointer swapping.
    105 */
    106struct bpool {
    107	struct bpool_params params;
    108	pthread_mutex_t lock;
    109	void *addr;
    110
    111	u64 **slabs;
    112	u64 **slabs_reserved;
    113	u64 *buffers;
    114	u64 *buffers_reserved;
    115
    116	u64 n_slabs;
    117	u64 n_slabs_reserved;
    118	u64 n_buffers;
    119
    120	u64 n_slabs_available;
    121	u64 n_slabs_reserved_available;
    122
    123	struct xsk_umem_config umem_cfg;
    124	struct xsk_ring_prod umem_fq;
    125	struct xsk_ring_cons umem_cq;
    126	struct xsk_umem *umem;
    127};
    128
    129static struct bpool *
    130bpool_init(struct bpool_params *params,
    131	   struct xsk_umem_config *umem_cfg)
    132{
    133	u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved;
    134	u64 slabs_size, slabs_reserved_size;
    135	u64 buffers_size, buffers_reserved_size;
    136	u64 total_size, i;
    137	struct bpool *bp;
    138	u8 *p;
    139	int status;
    140
    141	/* Use libbpf 1.0 API mode */
    142	libbpf_set_strict_mode(LIBBPF_STRICT_ALL);
    143
    144	/* bpool internals dimensioning. */
    145	n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) /
    146		params->n_buffers_per_slab;
    147	n_slabs_reserved = params->n_users_max * 2;
    148	n_buffers = n_slabs * params->n_buffers_per_slab;
    149	n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab;
    150
    151	slabs_size = n_slabs * sizeof(u64 *);
    152	slabs_reserved_size = n_slabs_reserved * sizeof(u64 *);
    153	buffers_size = n_buffers * sizeof(u64);
    154	buffers_reserved_size = n_buffers_reserved * sizeof(u64);
    155
    156	total_size = sizeof(struct bpool) +
    157		slabs_size + slabs_reserved_size +
    158		buffers_size + buffers_reserved_size;
    159
    160	/* bpool memory allocation. */
    161	p = calloc(total_size, sizeof(u8));
    162	if (!p)
    163		return NULL;
    164
    165	/* bpool memory initialization. */
    166	bp = (struct bpool *)p;
    167	memcpy(&bp->params, params, sizeof(*params));
    168	bp->params.n_buffers = n_buffers;
    169
    170	bp->slabs = (u64 **)&p[sizeof(struct bpool)];
    171	bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) +
    172		slabs_size];
    173	bp->buffers = (u64 *)&p[sizeof(struct bpool) +
    174		slabs_size + slabs_reserved_size];
    175	bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) +
    176		slabs_size + slabs_reserved_size + buffers_size];
    177
    178	bp->n_slabs = n_slabs;
    179	bp->n_slabs_reserved = n_slabs_reserved;
    180	bp->n_buffers = n_buffers;
    181
    182	for (i = 0; i < n_slabs; i++)
    183		bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab];
    184	bp->n_slabs_available = n_slabs;
    185
    186	for (i = 0; i < n_slabs_reserved; i++)
    187		bp->slabs_reserved[i] = &bp->buffers_reserved[i *
    188			params->n_buffers_per_slab];
    189	bp->n_slabs_reserved_available = n_slabs_reserved;
    190
    191	for (i = 0; i < n_buffers; i++)
    192		bp->buffers[i] = i * params->buffer_size;
    193
    194	/* lock. */
    195	status = pthread_mutex_init(&bp->lock, NULL);
    196	if (status) {
    197		free(p);
    198		return NULL;
    199	}
    200
    201	/* mmap. */
    202	bp->addr = mmap(NULL,
    203			n_buffers * params->buffer_size,
    204			PROT_READ | PROT_WRITE,
    205			MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags,
    206			-1,
    207			0);
    208	if (bp->addr == MAP_FAILED) {
    209		pthread_mutex_destroy(&bp->lock);
    210		free(p);
    211		return NULL;
    212	}
    213
    214	/* umem. */
    215	status = xsk_umem__create(&bp->umem,
    216				  bp->addr,
    217				  bp->params.n_buffers * bp->params.buffer_size,
    218				  &bp->umem_fq,
    219				  &bp->umem_cq,
    220				  umem_cfg);
    221	if (status) {
    222		munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
    223		pthread_mutex_destroy(&bp->lock);
    224		free(p);
    225		return NULL;
    226	}
    227	memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg));
    228
    229	return bp;
    230}
    231
    232static void
    233bpool_free(struct bpool *bp)
    234{
    235	if (!bp)
    236		return;
    237
    238	xsk_umem__delete(bp->umem);
    239	munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
    240	pthread_mutex_destroy(&bp->lock);
    241	free(bp);
    242}
    243
    244struct bcache {
    245	struct bpool *bp;
    246
    247	u64 *slab_cons;
    248	u64 *slab_prod;
    249
    250	u64 n_buffers_cons;
    251	u64 n_buffers_prod;
    252};
    253
    254static u32
    255bcache_slab_size(struct bcache *bc)
    256{
    257	struct bpool *bp = bc->bp;
    258
    259	return bp->params.n_buffers_per_slab;
    260}
    261
    262static struct bcache *
    263bcache_init(struct bpool *bp)
    264{
    265	struct bcache *bc;
    266
    267	bc = calloc(1, sizeof(struct bcache));
    268	if (!bc)
    269		return NULL;
    270
    271	bc->bp = bp;
    272	bc->n_buffers_cons = 0;
    273	bc->n_buffers_prod = 0;
    274
    275	pthread_mutex_lock(&bp->lock);
    276	if (bp->n_slabs_reserved_available == 0) {
    277		pthread_mutex_unlock(&bp->lock);
    278		free(bc);
    279		return NULL;
    280	}
    281
    282	bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1];
    283	bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2];
    284	bp->n_slabs_reserved_available -= 2;
    285	pthread_mutex_unlock(&bp->lock);
    286
    287	return bc;
    288}
    289
    290static void
    291bcache_free(struct bcache *bc)
    292{
    293	struct bpool *bp;
    294
    295	if (!bc)
    296		return;
    297
    298	/* In order to keep this example simple, the case of freeing any
    299	 * existing buffers from the cache back to the pool is ignored.
    300	 */
    301
    302	bp = bc->bp;
    303	pthread_mutex_lock(&bp->lock);
    304	bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod;
    305	bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons;
    306	bp->n_slabs_reserved_available += 2;
    307	pthread_mutex_unlock(&bp->lock);
    308
    309	free(bc);
    310}
    311
    312/* To work correctly, the implementation requires that the *n_buffers* input
    313 * argument is never greater than the buffer pool's *n_buffers_per_slab*. This
    314 * is typically the case, with one exception taking place when large number of
    315 * buffers are allocated at init time (e.g. for the UMEM fill queue setup).
    316 */
    317static inline u32
    318bcache_cons_check(struct bcache *bc, u32 n_buffers)
    319{
    320	struct bpool *bp = bc->bp;
    321	u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
    322	u64 n_buffers_cons = bc->n_buffers_cons;
    323	u64 n_slabs_available;
    324	u64 *slab_full;
    325
    326	/*
    327	 * Consumer slab is not empty: Use what's available locally. Do not
    328	 * look for more buffers from the pool when the ask can only be
    329	 * partially satisfied.
    330	 */
    331	if (n_buffers_cons)
    332		return (n_buffers_cons < n_buffers) ?
    333			n_buffers_cons :
    334			n_buffers;
    335
    336	/*
    337	 * Consumer slab is empty: look to trade the current consumer slab
    338	 * (full) for a full slab from the pool, if any is available.
    339	 */
    340	pthread_mutex_lock(&bp->lock);
    341	n_slabs_available = bp->n_slabs_available;
    342	if (!n_slabs_available) {
    343		pthread_mutex_unlock(&bp->lock);
    344		return 0;
    345	}
    346
    347	n_slabs_available--;
    348	slab_full = bp->slabs[n_slabs_available];
    349	bp->slabs[n_slabs_available] = bc->slab_cons;
    350	bp->n_slabs_available = n_slabs_available;
    351	pthread_mutex_unlock(&bp->lock);
    352
    353	bc->slab_cons = slab_full;
    354	bc->n_buffers_cons = n_buffers_per_slab;
    355	return n_buffers;
    356}
    357
    358static inline u64
    359bcache_cons(struct bcache *bc)
    360{
    361	u64 n_buffers_cons = bc->n_buffers_cons - 1;
    362	u64 buffer;
    363
    364	buffer = bc->slab_cons[n_buffers_cons];
    365	bc->n_buffers_cons = n_buffers_cons;
    366	return buffer;
    367}
    368
    369static inline void
    370bcache_prod(struct bcache *bc, u64 buffer)
    371{
    372	struct bpool *bp = bc->bp;
    373	u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
    374	u64 n_buffers_prod = bc->n_buffers_prod;
    375	u64 n_slabs_available;
    376	u64 *slab_empty;
    377
    378	/*
    379	 * Producer slab is not yet full: store the current buffer to it.
    380	 */
    381	if (n_buffers_prod < n_buffers_per_slab) {
    382		bc->slab_prod[n_buffers_prod] = buffer;
    383		bc->n_buffers_prod = n_buffers_prod + 1;
    384		return;
    385	}
    386
    387	/*
    388	 * Producer slab is full: trade the cache's current producer slab
    389	 * (full) for an empty slab from the pool, then store the current
    390	 * buffer to the new producer slab. As one full slab exists in the
    391	 * cache, it is guaranteed that there is at least one empty slab
    392	 * available in the pool.
    393	 */
    394	pthread_mutex_lock(&bp->lock);
    395	n_slabs_available = bp->n_slabs_available;
    396	slab_empty = bp->slabs[n_slabs_available];
    397	bp->slabs[n_slabs_available] = bc->slab_prod;
    398	bp->n_slabs_available = n_slabs_available + 1;
    399	pthread_mutex_unlock(&bp->lock);
    400
    401	slab_empty[0] = buffer;
    402	bc->slab_prod = slab_empty;
    403	bc->n_buffers_prod = 1;
    404}
    405
    406/*
    407 * Port
    408 *
    409 * Each of the forwarding ports sits on top of an AF_XDP socket. In order for
    410 * packet forwarding to happen with no packet buffer copy, all the sockets need
    411 * to share the same UMEM area, which is used as the buffer pool memory.
    412 */
    413#ifndef MAX_BURST_RX
    414#define MAX_BURST_RX 64
    415#endif
    416
    417#ifndef MAX_BURST_TX
    418#define MAX_BURST_TX 64
    419#endif
    420
    421struct burst_rx {
    422	u64 addr[MAX_BURST_RX];
    423	u32 len[MAX_BURST_RX];
    424};
    425
    426struct burst_tx {
    427	u64 addr[MAX_BURST_TX];
    428	u32 len[MAX_BURST_TX];
    429	u32 n_pkts;
    430};
    431
    432struct port_params {
    433	struct xsk_socket_config xsk_cfg;
    434	struct bpool *bp;
    435	const char *iface;
    436	u32 iface_queue;
    437};
    438
    439struct port {
    440	struct port_params params;
    441
    442	struct bcache *bc;
    443
    444	struct xsk_ring_cons rxq;
    445	struct xsk_ring_prod txq;
    446	struct xsk_ring_prod umem_fq;
    447	struct xsk_ring_cons umem_cq;
    448	struct xsk_socket *xsk;
    449	int umem_fq_initialized;
    450
    451	u64 n_pkts_rx;
    452	u64 n_pkts_tx;
    453};
    454
    455static void
    456port_free(struct port *p)
    457{
    458	if (!p)
    459		return;
    460
    461	/* To keep this example simple, the code to free the buffers from the
    462	 * socket's receive and transmit queues, as well as from the UMEM fill
    463	 * and completion queues, is not included.
    464	 */
    465
    466	if (p->xsk)
    467		xsk_socket__delete(p->xsk);
    468
    469	bcache_free(p->bc);
    470
    471	free(p);
    472}
    473
    474static struct port *
    475port_init(struct port_params *params)
    476{
    477	struct port *p;
    478	u32 umem_fq_size, pos = 0;
    479	int status, i;
    480
    481	/* Memory allocation and initialization. */
    482	p = calloc(sizeof(struct port), 1);
    483	if (!p)
    484		return NULL;
    485
    486	memcpy(&p->params, params, sizeof(p->params));
    487	umem_fq_size = params->bp->umem_cfg.fill_size;
    488
    489	/* bcache. */
    490	p->bc = bcache_init(params->bp);
    491	if (!p->bc ||
    492	    (bcache_slab_size(p->bc) < umem_fq_size) ||
    493	    (bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size)) {
    494		port_free(p);
    495		return NULL;
    496	}
    497
    498	/* xsk socket. */
    499	status = xsk_socket__create_shared(&p->xsk,
    500					   params->iface,
    501					   params->iface_queue,
    502					   params->bp->umem,
    503					   &p->rxq,
    504					   &p->txq,
    505					   &p->umem_fq,
    506					   &p->umem_cq,
    507					   &params->xsk_cfg);
    508	if (status) {
    509		port_free(p);
    510		return NULL;
    511	}
    512
    513	/* umem fq. */
    514	xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos);
    515
    516	for (i = 0; i < umem_fq_size; i++)
    517		*xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
    518			bcache_cons(p->bc);
    519
    520	xsk_ring_prod__submit(&p->umem_fq, umem_fq_size);
    521	p->umem_fq_initialized = 1;
    522
    523	return p;
    524}
    525
    526static inline u32
    527port_rx_burst(struct port *p, struct burst_rx *b)
    528{
    529	u32 n_pkts, pos, i;
    530
    531	/* Free buffers for FQ replenish. */
    532	n_pkts = ARRAY_SIZE(b->addr);
    533
    534	n_pkts = bcache_cons_check(p->bc, n_pkts);
    535	if (!n_pkts)
    536		return 0;
    537
    538	/* RXQ. */
    539	n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos);
    540	if (!n_pkts) {
    541		if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
    542			struct pollfd pollfd = {
    543				.fd = xsk_socket__fd(p->xsk),
    544				.events = POLLIN,
    545			};
    546
    547			poll(&pollfd, 1, 0);
    548		}
    549		return 0;
    550	}
    551
    552	for (i = 0; i < n_pkts; i++) {
    553		b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr;
    554		b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len;
    555	}
    556
    557	xsk_ring_cons__release(&p->rxq, n_pkts);
    558	p->n_pkts_rx += n_pkts;
    559
    560	/* UMEM FQ. */
    561	for ( ; ; ) {
    562		int status;
    563
    564		status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos);
    565		if (status == n_pkts)
    566			break;
    567
    568		if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
    569			struct pollfd pollfd = {
    570				.fd = xsk_socket__fd(p->xsk),
    571				.events = POLLIN,
    572			};
    573
    574			poll(&pollfd, 1, 0);
    575		}
    576	}
    577
    578	for (i = 0; i < n_pkts; i++)
    579		*xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
    580			bcache_cons(p->bc);
    581
    582	xsk_ring_prod__submit(&p->umem_fq, n_pkts);
    583
    584	return n_pkts;
    585}
    586
    587static inline void
    588port_tx_burst(struct port *p, struct burst_tx *b)
    589{
    590	u32 n_pkts, pos, i;
    591	int status;
    592
    593	/* UMEM CQ. */
    594	n_pkts = p->params.bp->umem_cfg.comp_size;
    595
    596	n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos);
    597
    598	for (i = 0; i < n_pkts; i++) {
    599		u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i);
    600
    601		bcache_prod(p->bc, addr);
    602	}
    603
    604	xsk_ring_cons__release(&p->umem_cq, n_pkts);
    605
    606	/* TXQ. */
    607	n_pkts = b->n_pkts;
    608
    609	for ( ; ; ) {
    610		status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos);
    611		if (status == n_pkts)
    612			break;
    613
    614		if (xsk_ring_prod__needs_wakeup(&p->txq))
    615			sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT,
    616			       NULL, 0);
    617	}
    618
    619	for (i = 0; i < n_pkts; i++) {
    620		xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i];
    621		xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i];
    622	}
    623
    624	xsk_ring_prod__submit(&p->txq, n_pkts);
    625	if (xsk_ring_prod__needs_wakeup(&p->txq))
    626		sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
    627	p->n_pkts_tx += n_pkts;
    628}
    629
    630/*
    631 * Thread
    632 *
    633 * Packet forwarding threads.
    634 */
    635#ifndef MAX_PORTS_PER_THREAD
    636#define MAX_PORTS_PER_THREAD 16
    637#endif
    638
    639struct thread_data {
    640	struct port *ports_rx[MAX_PORTS_PER_THREAD];
    641	struct port *ports_tx[MAX_PORTS_PER_THREAD];
    642	u32 n_ports_rx;
    643	struct burst_rx burst_rx;
    644	struct burst_tx burst_tx[MAX_PORTS_PER_THREAD];
    645	u32 cpu_core_id;
    646	int quit;
    647};
    648
    649static void swap_mac_addresses(void *data)
    650{
    651	struct ether_header *eth = (struct ether_header *)data;
    652	struct ether_addr *src_addr = (struct ether_addr *)&eth->ether_shost;
    653	struct ether_addr *dst_addr = (struct ether_addr *)&eth->ether_dhost;
    654	struct ether_addr tmp;
    655
    656	tmp = *src_addr;
    657	*src_addr = *dst_addr;
    658	*dst_addr = tmp;
    659}
    660
    661static void *
    662thread_func(void *arg)
    663{
    664	struct thread_data *t = arg;
    665	cpu_set_t cpu_cores;
    666	u32 i;
    667
    668	CPU_ZERO(&cpu_cores);
    669	CPU_SET(t->cpu_core_id, &cpu_cores);
    670	pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores);
    671
    672	for (i = 0; !t->quit; i = (i + 1) & (t->n_ports_rx - 1)) {
    673		struct port *port_rx = t->ports_rx[i];
    674		struct port *port_tx = t->ports_tx[i];
    675		struct burst_rx *brx = &t->burst_rx;
    676		struct burst_tx *btx = &t->burst_tx[i];
    677		u32 n_pkts, j;
    678
    679		/* RX. */
    680		n_pkts = port_rx_burst(port_rx, brx);
    681		if (!n_pkts)
    682			continue;
    683
    684		/* Process & TX. */
    685		for (j = 0; j < n_pkts; j++) {
    686			u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]);
    687			u8 *pkt = xsk_umem__get_data(port_rx->params.bp->addr,
    688						     addr);
    689
    690			swap_mac_addresses(pkt);
    691
    692			btx->addr[btx->n_pkts] = brx->addr[j];
    693			btx->len[btx->n_pkts] = brx->len[j];
    694			btx->n_pkts++;
    695
    696			if (btx->n_pkts == MAX_BURST_TX) {
    697				port_tx_burst(port_tx, btx);
    698				btx->n_pkts = 0;
    699			}
    700		}
    701	}
    702
    703	return NULL;
    704}
    705
    706/*
    707 * Process
    708 */
    709static const struct bpool_params bpool_params_default = {
    710	.n_buffers = 64 * 1024,
    711	.buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
    712	.mmap_flags = 0,
    713
    714	.n_users_max = 16,
    715	.n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
    716};
    717
    718static const struct xsk_umem_config umem_cfg_default = {
    719	.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
    720	.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
    721	.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
    722	.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
    723	.flags = 0,
    724};
    725
    726static const struct port_params port_params_default = {
    727	.xsk_cfg = {
    728		.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
    729		.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
    730		.libbpf_flags = 0,
    731		.xdp_flags = XDP_FLAGS_DRV_MODE,
    732		.bind_flags = XDP_USE_NEED_WAKEUP | XDP_ZEROCOPY,
    733	},
    734
    735	.bp = NULL,
    736	.iface = NULL,
    737	.iface_queue = 0,
    738};
    739
    740#ifndef MAX_PORTS
    741#define MAX_PORTS 64
    742#endif
    743
    744#ifndef MAX_THREADS
    745#define MAX_THREADS 64
    746#endif
    747
    748static struct bpool_params bpool_params;
    749static struct xsk_umem_config umem_cfg;
    750static struct bpool *bp;
    751
    752static struct port_params port_params[MAX_PORTS];
    753static struct port *ports[MAX_PORTS];
    754static u64 n_pkts_rx[MAX_PORTS];
    755static u64 n_pkts_tx[MAX_PORTS];
    756static int n_ports;
    757
    758static pthread_t threads[MAX_THREADS];
    759static struct thread_data thread_data[MAX_THREADS];
    760static int n_threads;
    761
    762static void
    763print_usage(char *prog_name)
    764{
    765	const char *usage =
    766		"Usage:\n"
    767		"\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n"
    768		"\n"
    769		"-c CORE        CPU core to run a packet forwarding thread\n"
    770		"               on. May be invoked multiple times.\n"
    771		"\n"
    772		"-b SIZE        Number of buffers in the buffer pool shared\n"
    773		"               by all the forwarding threads. Default: %u.\n"
    774		"\n"
    775		"-i INTERFACE   Network interface. Each (INTERFACE, QUEUE)\n"
    776		"               pair specifies one forwarding port. May be\n"
    777		"               invoked multiple times.\n"
    778		"\n"
    779		"-q QUEUE       Network interface queue for RX and TX. Each\n"
    780		"               (INTERFACE, QUEUE) pair specified one\n"
    781		"               forwarding port. Default: %u. May be invoked\n"
    782		"               multiple times.\n"
    783		"\n";
    784	printf(usage,
    785	       prog_name,
    786	       bpool_params_default.n_buffers,
    787	       port_params_default.iface_queue);
    788}
    789
    790static int
    791parse_args(int argc, char **argv)
    792{
    793	struct option lgopts[] = {
    794		{ NULL,  0, 0, 0 }
    795	};
    796	int opt, option_index;
    797
    798	/* Parse the input arguments. */
    799	for ( ; ;) {
    800		opt = getopt_long(argc, argv, "c:i:q:", lgopts, &option_index);
    801		if (opt == EOF)
    802			break;
    803
    804		switch (opt) {
    805		case 'b':
    806			bpool_params.n_buffers = atoi(optarg);
    807			break;
    808
    809		case 'c':
    810			if (n_threads == MAX_THREADS) {
    811				printf("Max number of threads (%d) reached.\n",
    812				       MAX_THREADS);
    813				return -1;
    814			}
    815
    816			thread_data[n_threads].cpu_core_id = atoi(optarg);
    817			n_threads++;
    818			break;
    819
    820		case 'i':
    821			if (n_ports == MAX_PORTS) {
    822				printf("Max number of ports (%d) reached.\n",
    823				       MAX_PORTS);
    824				return -1;
    825			}
    826
    827			port_params[n_ports].iface = optarg;
    828			port_params[n_ports].iface_queue = 0;
    829			n_ports++;
    830			break;
    831
    832		case 'q':
    833			if (n_ports == 0) {
    834				printf("No port specified for queue.\n");
    835				return -1;
    836			}
    837			port_params[n_ports - 1].iface_queue = atoi(optarg);
    838			break;
    839
    840		default:
    841			printf("Illegal argument.\n");
    842			return -1;
    843		}
    844	}
    845
    846	optind = 1; /* reset getopt lib */
    847
    848	/* Check the input arguments. */
    849	if (!n_ports) {
    850		printf("No ports specified.\n");
    851		return -1;
    852	}
    853
    854	if (!n_threads) {
    855		printf("No threads specified.\n");
    856		return -1;
    857	}
    858
    859	if (n_ports % n_threads) {
    860		printf("Ports cannot be evenly distributed to threads.\n");
    861		return -1;
    862	}
    863
    864	return 0;
    865}
    866
    867static void
    868print_port(u32 port_id)
    869{
    870	struct port *port = ports[port_id];
    871
    872	printf("Port %u: interface = %s, queue = %u\n",
    873	       port_id, port->params.iface, port->params.iface_queue);
    874}
    875
    876static void
    877print_thread(u32 thread_id)
    878{
    879	struct thread_data *t = &thread_data[thread_id];
    880	u32 i;
    881
    882	printf("Thread %u (CPU core %u): ",
    883	       thread_id, t->cpu_core_id);
    884
    885	for (i = 0; i < t->n_ports_rx; i++) {
    886		struct port *port_rx = t->ports_rx[i];
    887		struct port *port_tx = t->ports_tx[i];
    888
    889		printf("(%s, %u) -> (%s, %u), ",
    890		       port_rx->params.iface,
    891		       port_rx->params.iface_queue,
    892		       port_tx->params.iface,
    893		       port_tx->params.iface_queue);
    894	}
    895
    896	printf("\n");
    897}
    898
    899static void
    900print_port_stats_separator(void)
    901{
    902	printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n",
    903	       "----",
    904	       "------------",
    905	       "-------------",
    906	       "------------",
    907	       "-------------");
    908}
    909
    910static void
    911print_port_stats_header(void)
    912{
    913	print_port_stats_separator();
    914	printf("| %4s | %12s | %13s | %12s | %13s |\n",
    915	       "Port",
    916	       "RX packets",
    917	       "RX rate (pps)",
    918	       "TX packets",
    919	       "TX_rate (pps)");
    920	print_port_stats_separator();
    921}
    922
    923static void
    924print_port_stats_trailer(void)
    925{
    926	print_port_stats_separator();
    927	printf("\n");
    928}
    929
    930static void
    931print_port_stats(int port_id, u64 ns_diff)
    932{
    933	struct port *p = ports[port_id];
    934	double rx_pps, tx_pps;
    935
    936	rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff;
    937	tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. / ns_diff;
    938
    939	printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n",
    940	       port_id,
    941	       p->n_pkts_rx,
    942	       rx_pps,
    943	       p->n_pkts_tx,
    944	       tx_pps);
    945
    946	n_pkts_rx[port_id] = p->n_pkts_rx;
    947	n_pkts_tx[port_id] = p->n_pkts_tx;
    948}
    949
    950static void
    951print_port_stats_all(u64 ns_diff)
    952{
    953	int i;
    954
    955	print_port_stats_header();
    956	for (i = 0; i < n_ports; i++)
    957		print_port_stats(i, ns_diff);
    958	print_port_stats_trailer();
    959}
    960
    961static int quit;
    962
    963static void
    964signal_handler(int sig)
    965{
    966	quit = 1;
    967}
    968
    969static void remove_xdp_program(void)
    970{
    971	int i;
    972
    973	for (i = 0 ; i < n_ports; i++)
    974		bpf_xdp_detach(if_nametoindex(port_params[i].iface),
    975			       port_params[i].xsk_cfg.xdp_flags, NULL);
    976}
    977
    978int main(int argc, char **argv)
    979{
    980	struct timespec time;
    981	u64 ns0;
    982	int i;
    983
    984	/* Parse args. */
    985	memcpy(&bpool_params, &bpool_params_default,
    986	       sizeof(struct bpool_params));
    987	memcpy(&umem_cfg, &umem_cfg_default,
    988	       sizeof(struct xsk_umem_config));
    989	for (i = 0; i < MAX_PORTS; i++)
    990		memcpy(&port_params[i], &port_params_default,
    991		       sizeof(struct port_params));
    992
    993	if (parse_args(argc, argv)) {
    994		print_usage(argv[0]);
    995		return -1;
    996	}
    997
    998	/* Buffer pool initialization. */
    999	bp = bpool_init(&bpool_params, &umem_cfg);
   1000	if (!bp) {
   1001		printf("Buffer pool initialization failed.\n");
   1002		return -1;
   1003	}
   1004	printf("Buffer pool created successfully.\n");
   1005
   1006	/* Ports initialization. */
   1007	for (i = 0; i < MAX_PORTS; i++)
   1008		port_params[i].bp = bp;
   1009
   1010	for (i = 0; i < n_ports; i++) {
   1011		ports[i] = port_init(&port_params[i]);
   1012		if (!ports[i]) {
   1013			printf("Port %d initialization failed.\n", i);
   1014			return -1;
   1015		}
   1016		print_port(i);
   1017	}
   1018	printf("All ports created successfully.\n");
   1019
   1020	/* Threads. */
   1021	for (i = 0; i < n_threads; i++) {
   1022		struct thread_data *t = &thread_data[i];
   1023		u32 n_ports_per_thread = n_ports / n_threads, j;
   1024
   1025		for (j = 0; j < n_ports_per_thread; j++) {
   1026			t->ports_rx[j] = ports[i * n_ports_per_thread + j];
   1027			t->ports_tx[j] = ports[i * n_ports_per_thread +
   1028				(j + 1) % n_ports_per_thread];
   1029		}
   1030
   1031		t->n_ports_rx = n_ports_per_thread;
   1032
   1033		print_thread(i);
   1034	}
   1035
   1036	for (i = 0; i < n_threads; i++) {
   1037		int status;
   1038
   1039		status = pthread_create(&threads[i],
   1040					NULL,
   1041					thread_func,
   1042					&thread_data[i]);
   1043		if (status) {
   1044			printf("Thread %d creation failed.\n", i);
   1045			return -1;
   1046		}
   1047	}
   1048	printf("All threads created successfully.\n");
   1049
   1050	/* Print statistics. */
   1051	signal(SIGINT, signal_handler);
   1052	signal(SIGTERM, signal_handler);
   1053	signal(SIGABRT, signal_handler);
   1054
   1055	clock_gettime(CLOCK_MONOTONIC, &time);
   1056	ns0 = time.tv_sec * 1000000000UL + time.tv_nsec;
   1057	for ( ; !quit; ) {
   1058		u64 ns1, ns_diff;
   1059
   1060		sleep(1);
   1061		clock_gettime(CLOCK_MONOTONIC, &time);
   1062		ns1 = time.tv_sec * 1000000000UL + time.tv_nsec;
   1063		ns_diff = ns1 - ns0;
   1064		ns0 = ns1;
   1065
   1066		print_port_stats_all(ns_diff);
   1067	}
   1068
   1069	/* Threads completion. */
   1070	printf("Quit.\n");
   1071	for (i = 0; i < n_threads; i++)
   1072		thread_data[i].quit = 1;
   1073
   1074	for (i = 0; i < n_threads; i++)
   1075		pthread_join(threads[i], NULL);
   1076
   1077	for (i = 0; i < n_ports; i++)
   1078		port_free(ports[i]);
   1079
   1080	bpool_free(bp);
   1081
   1082	remove_xdp_program();
   1083
   1084	return 0;
   1085}