cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

ptr_ring.h (16679B)


      1/* SPDX-License-Identifier: GPL-2.0-or-later */
      2/*
      3 *	Definitions for the 'struct ptr_ring' data structure.
      4 *
      5 *	Author:
      6 *		Michael S. Tsirkin <mst@redhat.com>
      7 *
      8 *	Copyright (C) 2016 Red Hat, Inc.
      9 *
     10 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
     11 *	one CPU producing entries and another consuming entries from the ring.
     12 *
     13 *	This implementation tries to minimize cache-contention when there is a
     14 *	single producer and a single consumer CPU.
     15 */
     16
     17#ifndef _LINUX_PTR_RING_H
     18#define _LINUX_PTR_RING_H 1
     19
     20#ifdef __KERNEL__
     21#include <linux/spinlock.h>
     22#include <linux/cache.h>
     23#include <linux/types.h>
     24#include <linux/compiler.h>
     25#include <linux/slab.h>
     26#include <linux/mm.h>
     27#include <asm/errno.h>
     28#endif
     29
     30struct ptr_ring {
     31	int producer ____cacheline_aligned_in_smp;
     32	spinlock_t producer_lock;
     33	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
     34	int consumer_tail; /* next entry to invalidate */
     35	spinlock_t consumer_lock;
     36	/* Shared consumer/producer data */
     37	/* Read-only by both the producer and the consumer */
     38	int size ____cacheline_aligned_in_smp; /* max entries in queue */
     39	int batch; /* number of entries to consume in a batch */
     40	void **queue;
     41};
     42
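/* Illustrative snapshot (added for this write-up, not part of the upstream header),
 * assuming a ring of size 8 with batch == 1: the producer has queued items A..E and
 * the consumer has already taken A and B, so their slots hold NULL again.
 *
 *   index:        0     1     2     3     4     5     6     7
 *   queue[]:      NULL  NULL  C     D     E     NULL  NULL  NULL
 *   producer      = 5  (next slot to fill; a non-NULL entry here would mean "full")
 *   consumer_head = 2  (next slot to consume; a NULL entry here would mean "empty")
 *   consumer_tail = 2  (next slot to invalidate; equals consumer_head, nothing pending)
 *
 * A slot is free if and only if it holds NULL, so no shared counter is needed.
 */
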
     43/* Note: callers invoking this in a loop must use a compiler barrier,
     44 * for example cpu_relax().
     45 *
     46 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
     47 * see e.g. ptr_ring_full.
     48 */
     49static inline bool __ptr_ring_full(struct ptr_ring *r)
     50{
     51	return r->queue[r->producer];
     52}
     53
     54static inline bool ptr_ring_full(struct ptr_ring *r)
     55{
     56	bool ret;
     57
     58	spin_lock(&r->producer_lock);
     59	ret = __ptr_ring_full(r);
     60	spin_unlock(&r->producer_lock);
     61
     62	return ret;
     63}
     64
     65static inline bool ptr_ring_full_irq(struct ptr_ring *r)
     66{
     67	bool ret;
     68
     69	spin_lock_irq(&r->producer_lock);
     70	ret = __ptr_ring_full(r);
     71	spin_unlock_irq(&r->producer_lock);
     72
     73	return ret;
     74}
     75
     76static inline bool ptr_ring_full_any(struct ptr_ring *r)
     77{
     78	unsigned long flags;
     79	bool ret;
     80
     81	spin_lock_irqsave(&r->producer_lock, flags);
     82	ret = __ptr_ring_full(r);
     83	spin_unlock_irqrestore(&r->producer_lock, flags);
     84
     85	return ret;
     86}
     87
     88static inline bool ptr_ring_full_bh(struct ptr_ring *r)
     89{
     90	bool ret;
     91
     92	spin_lock_bh(&r->producer_lock);
     93	ret = __ptr_ring_full(r);
     94	spin_unlock_bh(&r->producer_lock);
     95
     96	return ret;
     97}
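
/* Example (illustrative, not part of the upstream header): a producer that spins
 * until the single consumer has freed a slot. ptr_ring_full() takes producer_lock
 * itself; cpu_relax() provides the compiler barrier the note above asks for.
 */
static inline void example_wait_until_not_full(struct ptr_ring *r)
{
	while (ptr_ring_full(r))
		cpu_relax();
}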
     98
     99/* Note: callers invoking this in a loop must use a compiler barrier,
    100 * for example cpu_relax(). Callers must hold producer_lock.
    101 * Callers are responsible for making sure the pointer that is being queued
    102 * points to valid data.
    103 */
    104static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
    105{
    106	if (unlikely(!r->size) || r->queue[r->producer])
    107		return -ENOSPC;
    108
    109	/* Make sure the pointer we are storing points to valid data. */
    110	/* Pairs with the dependency ordering in __ptr_ring_consume. */
    111	smp_wmb();
    112
    113	WRITE_ONCE(r->queue[r->producer++], ptr);
    114	if (unlikely(r->producer >= r->size))
    115		r->producer = 0;
    116	return 0;
    117}
    118
    119/*
    120 * Note: resize (below) nests producer lock within consumer lock, so if you
    121 * consume in interrupt or BH context, you must disable interrupts/BH when
    122 * calling this.
    123 */
    124static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
    125{
    126	int ret;
    127
    128	spin_lock(&r->producer_lock);
    129	ret = __ptr_ring_produce(r, ptr);
    130	spin_unlock(&r->producer_lock);
    131
    132	return ret;
    133}
    134
    135static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
    136{
    137	int ret;
    138
    139	spin_lock_irq(&r->producer_lock);
    140	ret = __ptr_ring_produce(r, ptr);
    141	spin_unlock_irq(&r->producer_lock);
    142
    143	return ret;
    144}
    145
    146static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
    147{
    148	unsigned long flags;
    149	int ret;
    150
    151	spin_lock_irqsave(&r->producer_lock, flags);
    152	ret = __ptr_ring_produce(r, ptr);
    153	spin_unlock_irqrestore(&r->producer_lock, flags);
    154
    155	return ret;
    156}
    157
    158static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
    159{
    160	int ret;
    161
    162	spin_lock_bh(&r->producer_lock);
    163	ret = __ptr_ring_produce(r, ptr);
    164	spin_unlock_bh(&r->producer_lock);
    165
    166	return ret;
    167}
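
/* Example (illustrative): if the ring is consumed from BH context (say, a NAPI-style
 * poll), the producer must disable BH around producer_lock, per the note above, so
 * it uses the _bh variant. The caller decides how to handle -ENOSPC.
 */
static inline int example_enqueue(struct ptr_ring *r, void *item)
{
	return ptr_ring_produce_bh(r, item);	/* 0 on success, -ENOSPC if full */
}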
    168
    169static inline void *__ptr_ring_peek(struct ptr_ring *r)
    170{
    171	if (likely(r->size))
    172		return READ_ONCE(r->queue[r->consumer_head]);
    173	return NULL;
    174}
    175
    176/*
    177 * Test ring empty status without taking any locks.
    178 *
    179 * NB: This is only safe to call if the ring is never resized.
    180 *
    181 * However, if some other CPU consumes ring entries at the same time, the value
    182 * returned is not guaranteed to be correct.
    183 *
    184 * In this case - to avoid incorrectly detecting the ring
    185 * as empty - the CPU consuming the ring entries is responsible
    186 * for either consuming all ring entries until the ring is empty,
    187 * or synchronizing with some other CPU and causing it to
    188 * re-test __ptr_ring_empty and/or consume the ring entries
    189 * after the synchronization point.
    190 *
    191 * Note: callers invoking this in a loop must use a compiler barrier,
    192 * for example cpu_relax().
    193 */
    194static inline bool __ptr_ring_empty(struct ptr_ring *r)
    195{
    196	if (likely(r->size))
    197		return !r->queue[READ_ONCE(r->consumer_head)];
    198	return true;
    199}
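
/* Example (sketch under the comment's assumptions: a single consumer and a ring
 * that is never resized): the consumer may poll emptiness locklessly, e.g. to
 * decide whether another pass over the ring is needed.
 */
static inline bool example_more_work(struct ptr_ring *r)
{
	return !__ptr_ring_empty(r);
}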
    200
    201static inline bool ptr_ring_empty(struct ptr_ring *r)
    202{
    203	bool ret;
    204
    205	spin_lock(&r->consumer_lock);
    206	ret = __ptr_ring_empty(r);
    207	spin_unlock(&r->consumer_lock);
    208
    209	return ret;
    210}
    211
    212static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
    213{
    214	bool ret;
    215
    216	spin_lock_irq(&r->consumer_lock);
    217	ret = __ptr_ring_empty(r);
    218	spin_unlock_irq(&r->consumer_lock);
    219
    220	return ret;
    221}
    222
    223static inline bool ptr_ring_empty_any(struct ptr_ring *r)
    224{
    225	unsigned long flags;
    226	bool ret;
    227
    228	spin_lock_irqsave(&r->consumer_lock, flags);
    229	ret = __ptr_ring_empty(r);
    230	spin_unlock_irqrestore(&r->consumer_lock, flags);
    231
    232	return ret;
    233}
    234
    235static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
    236{
    237	bool ret;
    238
    239	spin_lock_bh(&r->consumer_lock);
    240	ret = __ptr_ring_empty(r);
    241	spin_unlock_bh(&r->consumer_lock);
    242
    243	return ret;
    244}
    245
    246/* Must only be called after __ptr_ring_peek returned !NULL */
    247static inline void __ptr_ring_discard_one(struct ptr_ring *r)
    248{
    249	/* Fundamentally, what we want to do is update consumer
    250	 * index and zero out the entry so producer can reuse it.
    251	 * Doing it naively at each consume would be as simple as:
    252	 *       consumer = r->consumer;
    253	 *       r->queue[consumer++] = NULL;
    254	 *       if (unlikely(consumer >= r->size))
    255	 *               consumer = 0;
    256	 *       r->consumer = consumer;
    257	 * but that is suboptimal when the ring is full as producer is writing
    258	 * out new entries in the same cache line.  Defer these updates until a
    259	 * batch of entries has been consumed.
    260	 */
    261	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
    262	 * to work correctly.
    263	 */
    264	int consumer_head = r->consumer_head;
    265	int head = consumer_head++;
    266
    267	/* Once we have processed enough entries, invalidate them in
    268	 * the ring all at once so the producer can reuse their space in the ring.
    269	 * We also do this when we reach the end of the ring - not mandatory,
    270	 * but it helps keep the implementation simple.
    271	 */
    272	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
    273		     consumer_head >= r->size)) {
    274		/* Zero out entries in reverse order: this way the cache line that
    275		 * the producer might currently be reading is the one we touch last;
    276		 * producer won't make progress and touch other cache lines
    277		 * besides the first one until we write out all entries.
    278		 */
    279		while (likely(head >= r->consumer_tail))
    280			r->queue[head--] = NULL;
    281		r->consumer_tail = consumer_head;
    282	}
    283	if (unlikely(consumer_head >= r->size)) {
    284		consumer_head = 0;
    285		r->consumer_tail = 0;
    286	}
    287	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
    288	WRITE_ONCE(r->consumer_head, consumer_head);
    289}
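
/* Worked example (illustrative): with r->size == 8 and r->batch == 4, the first
 * three consumes only advance consumer_head (to 1, 2, 3) and leave the slots
 * intact. On the fourth consume, consumer_head reaches 4, so consumer_head -
 * consumer_tail == 4 >= batch: slots 3, 2, 1, 0 are NULLed in reverse order and
 * consumer_tail becomes 4. The producer thus sees four slots freed at once instead
 * of hitting the consumer's cache line on every single consume.
 */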
    290
    291static inline void *__ptr_ring_consume(struct ptr_ring *r)
    292{
    293	void *ptr;
    294
    295	/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
    296	 * accessing data through the pointer is up to date. Pairs
    297	 * with smp_wmb in __ptr_ring_produce.
    298	 */
    299	ptr = __ptr_ring_peek(r);
    300	if (ptr)
    301		__ptr_ring_discard_one(r);
    302
    303	return ptr;
    304}
    305
    306static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
    307					     void **array, int n)
    308{
    309	void *ptr;
    310	int i;
    311
    312	for (i = 0; i < n; i++) {
    313		ptr = __ptr_ring_consume(r);
    314		if (!ptr)
    315			break;
    316		array[i] = ptr;
    317	}
    318
    319	return i;
    320}
    321
    322/*
    323 * Note: resize (below) nests producer lock within consumer lock, so if you
    324 * call this in interrupt or BH context, you must disable interrupts/BH when
    325 * producing.
    326 */
    327static inline void *ptr_ring_consume(struct ptr_ring *r)
    328{
    329	void *ptr;
    330
    331	spin_lock(&r->consumer_lock);
    332	ptr = __ptr_ring_consume(r);
    333	spin_unlock(&r->consumer_lock);
    334
    335	return ptr;
    336}
    337
    338static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
    339{
    340	void *ptr;
    341
    342	spin_lock_irq(&r->consumer_lock);
    343	ptr = __ptr_ring_consume(r);
    344	spin_unlock_irq(&r->consumer_lock);
    345
    346	return ptr;
    347}
    348
    349static inline void *ptr_ring_consume_any(struct ptr_ring *r)
    350{
    351	unsigned long flags;
    352	void *ptr;
    353
    354	spin_lock_irqsave(&r->consumer_lock, flags);
    355	ptr = __ptr_ring_consume(r);
    356	spin_unlock_irqrestore(&r->consumer_lock, flags);
    357
    358	return ptr;
    359}
    360
    361static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
    362{
    363	void *ptr;
    364
    365	spin_lock_bh(&r->consumer_lock);
    366	ptr = __ptr_ring_consume(r);
    367	spin_unlock_bh(&r->consumer_lock);
    368
    369	return ptr;
    370}
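
/* Example (illustrative): draining the ring from BH context, matching a producer
 * that uses ptr_ring_produce_bh(). Assumes the queued items were kmalloc()ed.
 */
static inline void example_drain(struct ptr_ring *r)
{
	void *item;

	while ((item = ptr_ring_consume_bh(r)))
		kfree(item);
}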
    371
    372static inline int ptr_ring_consume_batched(struct ptr_ring *r,
    373					   void **array, int n)
    374{
    375	int ret;
    376
    377	spin_lock(&r->consumer_lock);
    378	ret = __ptr_ring_consume_batched(r, array, n);
    379	spin_unlock(&r->consumer_lock);
    380
    381	return ret;
    382}
    383
    384static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
    385					       void **array, int n)
    386{
    387	int ret;
    388
    389	spin_lock_irq(&r->consumer_lock);
    390	ret = __ptr_ring_consume_batched(r, array, n);
    391	spin_unlock_irq(&r->consumer_lock);
    392
    393	return ret;
    394}
    395
    396static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
    397					       void **array, int n)
    398{
    399	unsigned long flags;
    400	int ret;
    401
    402	spin_lock_irqsave(&r->consumer_lock, flags);
    403	ret = __ptr_ring_consume_batched(r, array, n);
    404	spin_unlock_irqrestore(&r->consumer_lock, flags);
    405
    406	return ret;
    407}
    408
    409static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
    410					      void **array, int n)
    411{
    412	int ret;
    413
    414	spin_lock_bh(&r->consumer_lock);
    415	ret = __ptr_ring_consume_batched(r, array, n);
    416	spin_unlock_bh(&r->consumer_lock);
    417
    418	return ret;
    419}
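
/* Example (illustrative): pulling at most a small, fixed batch per call, e.g. from
 * a poll routine with a budget. example_process is a hypothetical per-item handler
 * supplied by the caller.
 */
static inline int example_consume_budget(struct ptr_ring *r, int budget,
					 void (*example_process)(void *))
{
	void *batch[16];
	int n, i;

	if (budget > 16)
		budget = 16;
	n = ptr_ring_consume_batched(r, batch, budget);
	for (i = 0; i < n; i++)
		example_process(batch[i]);
	return n;
}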
    420
    421/* Cast to structure type and call a function without discarding from FIFO.
    422 * Function must return a value.
    423 * Callers must take consumer_lock.
    424 */
    425#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))
    426
    427#define PTR_RING_PEEK_CALL(r, f) ({ \
    428	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
    429	\
    430	spin_lock(&(r)->consumer_lock); \
    431	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
    432	spin_unlock(&(r)->consumer_lock); \
    433	__PTR_RING_PEEK_CALL_v; \
    434})
    435
    436#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
    437	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
    438	\
    439	spin_lock_irq(&(r)->consumer_lock); \
    440	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
    441	spin_unlock_irq(&(r)->consumer_lock); \
    442	__PTR_RING_PEEK_CALL_v; \
    443})
    444
    445#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
    446	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
    447	\
    448	spin_lock_bh(&(r)->consumer_lock); \
    449	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
    450	spin_unlock_bh(&(r)->consumer_lock); \
    451	__PTR_RING_PEEK_CALL_v; \
    452})
    453
    454#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
    455	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
    456	unsigned long __PTR_RING_PEEK_CALL_f;\
    457	\
    458	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
    459	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
    460	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
    461	__PTR_RING_PEEK_CALL_v; \
    462})
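
/* Example (illustrative): inspecting the head entry without consuming it. Because
 * __ptr_ring_peek() returns NULL for an empty ring (note the typeof((f)(NULL))
 * above), the callback must accept and handle NULL. example_item_size() is a
 * hypothetical accessor for whatever structure the ring holds.
 */
static inline int example_item_size(void *ptr)
{
	return ptr ? 1 : 0;	/* stand-in: return a real size for non-NULL items */
}

static inline int example_peek_size(struct ptr_ring *r)
{
	return PTR_RING_PEEK_CALL(r, example_item_size);
}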
    463
    464/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See the
    465 * vmalloc documentation for which of them are legal.
    466 */
    467static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
    468{
    469	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
    470		return NULL;
    471	return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
    472}
    473
    474static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
    475{
    476	r->size = size;
    477	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
    478	/* We need to set batch to at least 1 for the logic
    479	 * in __ptr_ring_discard_one to work correctly.
    480	 * Batching too much (because the ring is small) would cause a lot of
    481	 * burstiness; this needs tuning, so for now disable batching in that case.
    482	 */
    483	if (r->batch > r->size / 2 || !r->batch)
    484		r->batch = 1;
    485}
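
/* Worked example (illustrative, typical x86-64 values): with SMP_CACHE_BYTES == 64
 * and sizeof(void *) == 8, batch = 64 * 2 / 8 = 16, i.e. two cache lines' worth of
 * slots are invalidated at a time. For a small ring of size 16, batch (16) exceeds
 * size / 2 (8), so batch falls back to 1 and every consume frees its slot
 * immediately.
 */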
    486
    487static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
    488{
    489	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
    490	if (!r->queue)
    491		return -ENOMEM;
    492
    493	__ptr_ring_set_size(r, size);
    494	r->producer = r->consumer_head = r->consumer_tail = 0;
    495	spin_lock_init(&r->producer_lock);
    496	spin_lock_init(&r->consumer_lock);
    497
    498	return 0;
    499}
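
/* Example (illustrative): full lifecycle with a single producer/consumer pair.
 * Items are assumed to be kmalloc()ed; error handling is kept minimal.
 */
static inline int example_lifecycle(void)
{
	struct ptr_ring ring;
	void *item;
	int err;

	err = ptr_ring_init(&ring, 64, GFP_KERNEL);
	if (err)
		return err;

	item = kmalloc(32, GFP_KERNEL);
	if (item && ptr_ring_produce(&ring, item))
		kfree(item);		/* -ENOSPC: ring full, drop the item */

	while ((item = ptr_ring_consume(&ring)))
		kfree(item);

	ptr_ring_cleanup(&ring, NULL);	/* already drained, no destructor needed */
	return 0;
}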
    500
    501/*
    502 * Return entries into ring. Destroy entries that don't fit.
    503 *
    504 * Note: this is expected to be a rare slow path operation.
    505 *
    506 * Note: producer lock is nested within consumer lock, so if you
    507 * resize you must make sure all uses nest correctly.
    508 * In particular if you consume ring in interrupt or BH context, you must
    509 * disable interrupts/BH when doing so.
    510 */
    511static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
    512				      void (*destroy)(void *))
    513{
    514	unsigned long flags;
    515	int head;
    516
    517	spin_lock_irqsave(&r->consumer_lock, flags);
    518	spin_lock(&r->producer_lock);
    519
    520	if (!r->size)
    521		goto done;
    522
    523	/*
    524	 * Clean out buffered entries (for simplicity). This way the following code
    525	 * can test entries for NULL and, if non-NULL, assume they are valid.
    526	 */
    527	head = r->consumer_head - 1;
    528	while (likely(head >= r->consumer_tail))
    529		r->queue[head--] = NULL;
    530	r->consumer_tail = r->consumer_head;
    531
    532	/*
    533	 * Go over entries in batch, start moving head back and copy entries.
    534	 * Stop when we run into previously unconsumed entries.
    535	 */
    536	while (n) {
    537		head = r->consumer_head - 1;
    538		if (head < 0)
    539			head = r->size - 1;
    540		if (r->queue[head]) {
    541			/* This batch entry will have to be destroyed. */
    542			goto done;
    543		}
    544		r->queue[head] = batch[--n];
    545		r->consumer_tail = head;
    546		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
    547		WRITE_ONCE(r->consumer_head, head);
    548	}
    549
    550done:
    551	/* Destroy all entries left in the batch. */
    552	while (n)
    553		destroy(batch[--n]);
    554	spin_unlock(&r->producer_lock);
    555	spin_unlock_irqrestore(&r->consumer_lock, flags);
    556}
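
/* Example (illustrative): a consumer pulled a batch with ptr_ring_consume_batched()
 * but cannot process it (e.g. the device went away), so it puts the entries back.
 * example_free_item() is a hypothetical destructor for entries that no longer fit.
 */
static inline void example_free_item(void *ptr)
{
	kfree(ptr);
}

static inline void example_requeue(struct ptr_ring *r, void **batch, int n)
{
	ptr_ring_unconsume(r, batch, n, example_free_item);
}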
    557
    558static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
    559					   int size, gfp_t gfp,
    560					   void (*destroy)(void *))
    561{
    562	int producer = 0;
    563	void **old;
    564	void *ptr;
    565
    566	while ((ptr = __ptr_ring_consume(r)))
    567		if (producer < size)
    568			queue[producer++] = ptr;
    569		else if (destroy)
    570			destroy(ptr);
    571
    572	if (producer >= size)
    573		producer = 0;
    574	__ptr_ring_set_size(r, size);
    575	r->producer = producer;
    576	r->consumer_head = 0;
    577	r->consumer_tail = 0;
    578	old = r->queue;
    579	r->queue = queue;
    580
    581	return old;
    582}
    583
    584/*
    585 * Note: producer lock is nested within consumer lock, so if you
    586 * resize you must make sure all uses nest correctly.
    587 * In particular if you consume ring in interrupt or BH context, you must
    588 * disable interrupts/BH when doing so.
    589 */
    590static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
    591				  void (*destroy)(void *))
    592{
    593	unsigned long flags;
    594	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
    595	void **old;
    596
    597	if (!queue)
    598		return -ENOMEM;
    599
    600	spin_lock_irqsave(&(r)->consumer_lock, flags);
    601	spin_lock(&(r)->producer_lock);
    602
    603	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);
    604
    605	spin_unlock(&(r)->producer_lock);
    606	spin_unlock_irqrestore(&(r)->consumer_lock, flags);
    607
    608	kvfree(old);
    609
    610	return 0;
    611}
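
/* Example (illustrative): growing a ring at runtime. Entries that do not fit into
 * the new queue are released through the destructor; this reuses the hypothetical
 * example_free_item() from the unconsume sketch above.
 */
static inline int example_grow(struct ptr_ring *r, int new_size)
{
	return ptr_ring_resize(r, new_size, GFP_KERNEL, example_free_item);
}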
    612
    613/*
    614 * Note: producer lock is nested within consumer lock, so if you
    615 * resize you must make sure all uses nest correctly.
    616 * In particular if you consume ring in interrupt or BH context, you must
    617 * disable interrupts/BH when doing so.
    618 */
    619static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
    620					   unsigned int nrings,
    621					   int size,
    622					   gfp_t gfp, void (*destroy)(void *))
    623{
    624	unsigned long flags;
    625	void ***queues;
    626	int i;
    627
    628	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
    629	if (!queues)
    630		goto noqueues;
    631
    632	for (i = 0; i < nrings; ++i) {
    633		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
    634		if (!queues[i])
    635			goto nomem;
    636	}
    637
    638	for (i = 0; i < nrings; ++i) {
    639		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
    640		spin_lock(&(rings[i])->producer_lock);
    641		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
    642						  size, gfp, destroy);
    643		spin_unlock(&(rings[i])->producer_lock);
    644		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
    645	}
    646
    647	for (i = 0; i < nrings; ++i)
    648		kvfree(queues[i]);
    649
    650	kfree(queues);
    651
    652	return 0;
    653
    654nomem:
    655	while (--i >= 0)
    656		kvfree(queues[i]);
    657
    658	kfree(queues);
    659
    660noqueues:
    661	return -ENOMEM;
    662}
    663
    664static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
    665{
    666	void *ptr;
    667
    668	if (destroy)
    669		while ((ptr = ptr_ring_consume(r)))
    670			destroy(ptr);
    671	kvfree(r->queue);
    672}
    673
    674#endif /* _LINUX_PTR_RING_H  */