cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

xsk_queue.h (11775B)


      1/* SPDX-License-Identifier: GPL-2.0 */
      2/* XDP user-space ring structure
      3 * Copyright(c) 2018 Intel Corporation.
      4 */
      5
      6#ifndef _LINUX_XSK_QUEUE_H
      7#define _LINUX_XSK_QUEUE_H
      8
      9#include <linux/types.h>
     10#include <linux/if_xdp.h>
     11#include <net/xdp_sock.h>
     12#include <net/xsk_buff_pool.h>
     13
     14#include "xsk.h"
     15
/* Ring header: the producer and consumer indices plus a flags word. Each
 * index and each pad member carries ____cacheline_aligned_in_smp.
 */
struct xdp_ring {
	u32 producer ____cacheline_aligned_in_smp;
	/* Prevent the adjacent cache line prefetcher from prefetching the
	 * consumer pointer when the producer pointer is touched, and vice
	 * versa.
	 */
	u32 pad1 ____cacheline_aligned_in_smp;
	u32 consumer ____cacheline_aligned_in_smp;
	u32 pad2 ____cacheline_aligned_in_smp;
	u32 flags;
	u32 pad3 ____cacheline_aligned_in_smp;
};
     27
/* Used for the RX and TX queues for packets: the ring header followed by a
 * flexible array of struct xdp_desc entries.
 */
struct xdp_rxtx_ring {
	struct xdp_ring ptrs;
	struct xdp_desc desc[] ____cacheline_aligned_in_smp;
};
     33
/* Used for the fill and completion queues for buffers: the ring header
 * followed by a flexible array of u64 entries.
 */
struct xdp_umem_ring {
	struct xdp_ring ptrs;
	u64 desc[] ____cacheline_aligned_in_smp;
};
     39
/* Bookkeeping for one ring. The cached_* members are local copies of the
 * ring's producer/consumer indices; the helpers in this header decide when
 * they are synchronised with the shared values in @ring.
 */
struct xsk_queue {
	u32 ring_mask;		/* ANDed with an index to select a slot in the desc[] array */
	u32 nentries;		/* entry count used by the free-space computation */
	u32 cached_prod;	/* local copy of the producer index */
	u32 cached_cons;	/* local copy of the consumer index */
	struct xdp_ring *ring;	/* ring header; cast to xdp_rxtx_ring or xdp_umem_ring by the helpers */
	u64 invalid_descs;	/* incremented for every descriptor that fails validation */
	u64 queue_empty_descs;	/* counter that is only read, never modified, in this header */
};
     49
     50/* The structure of the shared state of the rings is a simple
     51 * circular buffer, as outlined in
     52 * Documentation/core-api/circular-buffers.rst. For the Rx and
     53 * completion ring, the kernel is the producer and user space is the
     54 * consumer. For the Tx and fill rings, the kernel is the consumer and
     55 * user space is the producer.
     56 *
     57 * producer                         consumer
     58 *
     59 * if (LOAD ->consumer) {  (A)      LOAD.acq ->producer  (C)
     60 *    STORE $data                   LOAD $data
     61 *    STORE.rel ->producer (B)      STORE.rel ->consumer (D)
     62 * }
     63 *
     64 * (A) pairs with (D), and (B) pairs with (C).
     65 *
     66 * Starting with (B), it protects the data from being written after
     67 * the producer pointer. If this barrier was missing, the consumer
     68 * could observe the producer pointer being set and thus load the data
     69 * before the producer has written the new data. The consumer would in
     70 * this case load the old data.
     71 *
     72 * (C) protects the consumer from speculatively loading the data before
     73 * the producer pointer actually has been read. If we do not have this
     74 * barrier, some architectures could load old data as speculative loads
     75 * are not discarded as the CPU does not know there is a dependency
     76 * between ->producer and data.
     77 *
     78 * (A) is a control dependency that separates the load of ->consumer
     79 * from the stores of $data. In case ->consumer indicates there is no
     80 * room in the buffer to store $data we do not. The dependency will
     81 * order both of the stores after the loads. So no barrier is needed.
     82 *
     83 * (D) prevents the load of the data from being observed to happen after
     84 * the store of the consumer pointer. If we did not have this memory
     85 * barrier, the producer could observe the consumer pointer being set
     86 * and overwrite the data with a new value before the consumer got the
     87 * chance to read the old value. The consumer would thus miss reading
     88 * the old entry and very likely read the new entry twice, once right
     89 * now and again after circling through the ring.
     90 */
     91
     92/* The operations on the rings are the following:
     93 *
     94 * producer                           consumer
     95 *
     96 * RESERVE entries                    PEEK in the ring for entries
     97 * WRITE data into the ring           READ data from the ring
     98 * SUBMIT entries                     RELEASE entries
     99 *
    100 * The producer reserves one or more entries in the ring. It can then
    101 * fill in these entries and finally submit them so that they can be
    102 * seen and read by the consumer.
    103 *
    104 * The consumer peeks into the ring to see if the producer has written
    105 * any new entries. If so, the consumer can then read these entries
    106 * and when it is done reading them release them back to the producer
    107 * so that the producer can use these slots to fill in new entries.
    108 *
    109 * The function names below reflect these operations.
    110 */
    111
    112/* Functions that read and validate content from consumer rings. */
    113
    114static inline void __xskq_cons_read_addr_unchecked(struct xsk_queue *q, u32 cached_cons, u64 *addr)
    115{
    116	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
    117	u32 idx = cached_cons & q->ring_mask;
    118
    119	*addr = ring->desc[idx];
    120}
    121
    122static inline bool xskq_cons_read_addr_unchecked(struct xsk_queue *q, u64 *addr)
    123{
    124	if (q->cached_cons != q->cached_prod) {
    125		__xskq_cons_read_addr_unchecked(q, q->cached_cons, addr);
    126		return true;
    127	}
    128
    129	return false;
    130}
    131
    132static inline bool xp_aligned_validate_desc(struct xsk_buff_pool *pool,
    133					    struct xdp_desc *desc)
    134{
    135	u64 chunk, chunk_end;
    136
    137	chunk = xp_aligned_extract_addr(pool, desc->addr);
    138	if (likely(desc->len)) {
    139		chunk_end = xp_aligned_extract_addr(pool, desc->addr + desc->len - 1);
    140		if (chunk != chunk_end)
    141			return false;
    142	}
    143
    144	if (chunk >= pool->addrs_cnt)
    145		return false;
    146
    147	if (desc->options)
    148		return false;
    149	return true;
    150}
    151
    152static inline bool xp_unaligned_validate_desc(struct xsk_buff_pool *pool,
    153					      struct xdp_desc *desc)
    154{
    155	u64 addr, base_addr;
    156
    157	base_addr = xp_unaligned_extract_addr(desc->addr);
    158	addr = xp_unaligned_add_offset_to_addr(desc->addr);
    159
    160	if (desc->len > pool->chunk_size)
    161		return false;
    162
    163	if (base_addr >= pool->addrs_cnt || addr >= pool->addrs_cnt ||
    164	    xp_desc_crosses_non_contig_pg(pool, addr, desc->len))
    165		return false;
    166
    167	if (desc->options)
    168		return false;
    169	return true;
    170}
    171
    172static inline bool xp_validate_desc(struct xsk_buff_pool *pool,
    173				    struct xdp_desc *desc)
    174{
    175	return pool->unaligned ? xp_unaligned_validate_desc(pool, desc) :
    176		xp_aligned_validate_desc(pool, desc);
    177}
    178
    179static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
    180					   struct xdp_desc *d,
    181					   struct xsk_buff_pool *pool)
    182{
    183	if (!xp_validate_desc(pool, d)) {
    184		q->invalid_descs++;
    185		return false;
    186	}
    187	return true;
    188}
    189
    190static inline bool xskq_cons_read_desc(struct xsk_queue *q,
    191				       struct xdp_desc *desc,
    192				       struct xsk_buff_pool *pool)
    193{
    194	while (q->cached_cons != q->cached_prod) {
    195		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
    196		u32 idx = q->cached_cons & q->ring_mask;
    197
    198		*desc = ring->desc[idx];
    199		if (xskq_cons_is_valid_desc(q, desc, pool))
    200			return true;
    201
    202		q->cached_cons++;
    203	}
    204
    205	return false;
    206}
    207
    208static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
    209					    u32 max)
    210{
    211	u32 cached_cons = q->cached_cons, nb_entries = 0;
    212	struct xdp_desc *descs = pool->tx_descs;
    213
    214	while (cached_cons != q->cached_prod && nb_entries < max) {
    215		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
    216		u32 idx = cached_cons & q->ring_mask;
    217
    218		descs[nb_entries] = ring->desc[idx];
    219		if (unlikely(!xskq_cons_is_valid_desc(q, &descs[nb_entries], pool))) {
    220			/* Skip the entry */
    221			cached_cons++;
    222			continue;
    223		}
    224
    225		nb_entries++;
    226		cached_cons++;
    227	}
    228
    229	return nb_entries;
    230}
    231
    232/* Functions for consumers */
    233
    234static inline void __xskq_cons_release(struct xsk_queue *q)
    235{
    236	smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matchees A */
    237}
    238
    239static inline void __xskq_cons_peek(struct xsk_queue *q)
    240{
    241	/* Refresh the local pointer */
    242	q->cached_prod = smp_load_acquire(&q->ring->producer);  /* C, matches B */
    243}
    244
/* Publish the locally cached consumer index to the ring, then refresh the
 * locally cached producer index from the ring.
 */
static inline void xskq_cons_get_entries(struct xsk_queue *q)
{
	__xskq_cons_release(q);
	__xskq_cons_peek(q);
}
    250
    251static inline u32 xskq_cons_nb_entries(struct xsk_queue *q, u32 max)
    252{
    253	u32 entries = q->cached_prod - q->cached_cons;
    254
    255	if (entries >= max)
    256		return max;
    257
    258	__xskq_cons_peek(q);
    259	entries = q->cached_prod - q->cached_cons;
    260
    261	return entries >= max ? max : entries;
    262}
    263
    264static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
    265{
    266	return xskq_cons_nb_entries(q, cnt) >= cnt;
    267}
    268
    269static inline bool xskq_cons_peek_addr_unchecked(struct xsk_queue *q, u64 *addr)
    270{
    271	if (q->cached_prod == q->cached_cons)
    272		xskq_cons_get_entries(q);
    273	return xskq_cons_read_addr_unchecked(q, addr);
    274}
    275
    276static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
    277				       struct xdp_desc *desc,
    278				       struct xsk_buff_pool *pool)
    279{
    280	if (q->cached_prod == q->cached_cons)
    281		xskq_cons_get_entries(q);
    282	return xskq_cons_read_desc(q, desc, pool);
    283}
    284
    285/* To improve performance in the xskq_cons_release functions, only update local state here.
    286 * Reflect this to global state when we get new entries from the ring in
    287 * xskq_cons_get_entries() and whenever Rx or Tx processing are completed in the NAPI loop.
    288 */
    289static inline void xskq_cons_release(struct xsk_queue *q)
    290{
    291	q->cached_cons++;
    292}
    293
    294static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
    295{
    296	q->cached_cons += cnt;
    297}
    298
/* Return the difference between the ring's shared producer and consumer
 * indices. Both are read directly from the ring with READ_ONCE() rather than
 * from the cached copies in @q.
 */
static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */
	return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer);
}
    304
    305/* Functions for producers */
    306
    307static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
    308{
    309	u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
    310
    311	if (free_entries >= max)
    312		return max;
    313
    314	/* Refresh the local tail pointer */
    315	q->cached_cons = READ_ONCE(q->ring->consumer);
    316	free_entries = q->nentries - (q->cached_prod - q->cached_cons);
    317
    318	return free_entries >= max ? max : free_entries;
    319}
    320
    321static inline bool xskq_prod_is_full(struct xsk_queue *q)
    322{
    323	return xskq_prod_nb_free(q, 1) ? false : true;
    324}
    325
    326static inline void xskq_prod_cancel(struct xsk_queue *q)
    327{
    328	q->cached_prod--;
    329}
    330
    331static inline int xskq_prod_reserve(struct xsk_queue *q)
    332{
    333	if (xskq_prod_is_full(q))
    334		return -ENOSPC;
    335
    336	/* A, matches D */
    337	q->cached_prod++;
    338	return 0;
    339}
    340
    341static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
    342{
    343	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
    344
    345	if (xskq_prod_is_full(q))
    346		return -ENOSPC;
    347
    348	/* A, matches D */
    349	ring->desc[q->cached_prod++ & q->ring_mask] = addr;
    350	return 0;
    351}
    352
    353static inline u32 xskq_prod_reserve_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
    354					       u32 max)
    355{
    356	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
    357	u32 nb_entries, i, cached_prod;
    358
    359	nb_entries = xskq_prod_nb_free(q, max);
    360
    361	/* A, matches D */
    362	cached_prod = q->cached_prod;
    363	for (i = 0; i < nb_entries; i++)
    364		ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr;
    365	q->cached_prod = cached_prod;
    366
    367	return nb_entries;
    368}
    369
    370static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
    371					 u64 addr, u32 len)
    372{
    373	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
    374	u32 idx;
    375
    376	if (xskq_prod_is_full(q))
    377		return -ENOBUFS;
    378
    379	/* A, matches D */
    380	idx = q->cached_prod++ & q->ring_mask;
    381	ring->desc[idx].addr = addr;
    382	ring->desc[idx].len = len;
    383
    384	return 0;
    385}
    386
    387static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
    388{
    389	smp_store_release(&q->ring->producer, idx); /* B, matches C */
    390}
    391
    392static inline void xskq_prod_submit(struct xsk_queue *q)
    393{
    394	__xskq_prod_submit(q, q->cached_prod);
    395}
    396
    397static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
    398{
    399	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
    400	u32 idx = q->ring->producer;
    401
    402	ring->desc[idx++ & q->ring_mask] = addr;
    403
    404	__xskq_prod_submit(q, idx);
    405}
    406
    407static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
    408{
    409	__xskq_prod_submit(q, q->ring->producer + nb_entries);
    410}
    411
/* Return true when the ring's shared consumer and producer indices are
 * equal. Both are read directly from the ring with READ_ONCE().
 */
static inline bool xskq_prod_is_empty(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */
	return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
}
    417
    418/* For both producers and consumers */
    419
    420static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
    421{
    422	return q ? q->invalid_descs : 0;
    423}
    424
    425static inline u64 xskq_nb_queue_empty_descs(struct xsk_queue *q)
    426{
    427	return q ? q->queue_empty_descs : 0;
    428}
    429
/* Queue constructor and destructor; both are defined outside this header.
 * NOTE(review): presumably @nentries is the ring size and @umem_queue selects
 * the xdp_umem_ring layout over xdp_rxtx_ring - confirm against the
 * definitions.
 */
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);
    432
    433#endif /* _LINUX_XSK_QUEUE_H */