ptr_ring.h
/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 * Definitions for the 'struct ptr_ring' data structure.
 *
 * Author:
 *      Michael S. Tsirkin <mst@redhat.com>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another consuming them.
 *
 * This implementation tries to minimize cache contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <linux/mm.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
        int producer ____cacheline_aligned_in_smp;
        spinlock_t producer_lock;
        int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
        int consumer_tail; /* next entry to invalidate */
        spinlock_t consumer_lock;
        /* Shared consumer/producer data */
        /* Read-only by both the producer and the consumer */
        int size ____cacheline_aligned_in_smp; /* max entries in queue */
        int batch; /* number of entries to consume in a batch */
        void **queue;
};
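
/*
 * Example (illustrative sketch, not part of this header): typical
 * single-producer/single-consumer usage of the API defined below.
 * The 'struct example_item' type, the ring size of 128 and the helper
 * names are hypothetical.
 */
#if 0
struct example_item {                   /* hypothetical payload type */
        int id;
};

static struct ptr_ring example_ring;

static int example_setup(void)
{
        /* Allocate a 128-entry ring; all entries start out NULL (empty). */
        return ptr_ring_init(&example_ring, 128, GFP_KERNEL);
}

/* Producer side: a single CPU, or callers serializing on producer_lock. */
static int example_send(struct example_item *item)
{
        /* Returns -ENOSPC if the ring is full; the entry is not queued. */
        return ptr_ring_produce(&example_ring, item);
}

/* Consumer side: a single CPU, or callers serializing on consumer_lock. */
static struct example_item *example_receive(void)
{
        /* Returns NULL once the ring is empty. */
        return ptr_ring_consume(&example_ring);
}

static void example_item_destroy(void *ptr)
{
        kfree(ptr);
}

static void example_teardown(void)
{
        /* Destroys any entries still queued, then frees the ring itself. */
        ptr_ring_cleanup(&example_ring, example_item_destroy);
}
#endif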

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
        return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_full(r);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        if (unlikely(!r->size) || r->queue[r->producer])
                return -ENOSPC;

        /* Make sure the pointer we are storing points to valid data. */
        /* Pairs with the dependency ordering in __ptr_ring_consume. */
        smp_wmb();

        WRITE_ONCE(r->queue[r->producer++], ptr);
        if (unlikely(r->producer >= r->size))
                r->producer = 0;
        return 0;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}
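
/*
 * Example (illustrative sketch, not part of this header): a producer that
 * drops on overflow instead of spinning. A caller that does busy-wait on
 * __ptr_ring_full() or on -ENOSPC must put a compiler barrier such as
 * cpu_relax() in the loop, as noted above. The helper names and the drop
 * policy are hypothetical.
 */
#if 0
/* Drop-on-overflow policy, e.g. for objects queued from BH context. */
static int example_enqueue_or_drop(struct ptr_ring *r, void *obj,
                                   void (*free_obj)(void *))
{
        int err;

        /* The _bh variant is used on the assumption that the consumer may
         * also run in BH context; resize nests the producer lock inside
         * the consumer lock (see the note above).
         */
        err = ptr_ring_produce_bh(r, obj);
        if (unlikely(err)) {
                /* -ENOSPC: the ring is full and obj was not queued. */
                free_obj(obj);
                return err;
        }
        return 0;
}
#endif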

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
        if (likely(r->size))
                return READ_ONCE(r->queue[r->consumer_head]);
        return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case, to avoid incorrectly detecting the ring as empty, the CPU
 * consuming the ring entries is responsible for either consuming all ring
 * entries until the ring is empty, or synchronizing with some other CPU and
 * causing it to re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
        if (likely(r->size))
                return !r->queue[READ_ONCE(r->consumer_head)];
        return true;
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_empty(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}
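
/*
 * Example (illustrative sketch, not part of this header): a budget-limited
 * consumer, e.g. driven from a NAPI-style poll loop, that uses the lockless
 * __ptr_ring_empty() only to decide whether it should be polled again.
 * There is a single consumer here, so the test does not race with another
 * CPU consuming entries; the helper names and the budget handling are
 * hypothetical.
 */
#if 0
/* Returns true if more entries are (probably) still queued. */
static bool example_poll(struct ptr_ring *r, int budget,
                         void (*process)(void *))
{
        void *obj;
        int done = 0;

        while (done < budget && (obj = ptr_ring_consume(r))) {
                process(obj);
                done++;
        }

        /* Only a hint: a racing producer may make the ring non-empty
         * immediately after this test, so callers still need their own
         * wakeup/rescheduling mechanism.
         */
        return !__ptr_ring_empty(r);
}
#endif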

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
        /* Fundamentally, what we want to do is update consumer
         * index and zero out the entry so producer can reuse it.
         * Doing it naively at each consume would be as simple as:
         *       consumer = r->consumer;
         *       r->queue[consumer++] = NULL;
         *       if (unlikely(consumer >= r->size))
         *               consumer = 0;
         *       r->consumer = consumer;
         * but that is suboptimal when the ring is full as producer is writing
         * out new entries in the same cache line. Defer these updates until a
         * batch of entries has been consumed.
         */
        /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
         * to work correctly.
         */
        int consumer_head = r->consumer_head;
        int head = consumer_head++;

        /* Once we have processed enough entries, invalidate them in
         * the ring all at once so the producer can reuse their space.
         * We also do this when we reach the end of the ring - not mandatory
         * but it helps keep the implementation simple.
         */
        if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
                     consumer_head >= r->size)) {
                /* Zero out entries in reverse order: this way the cache line
                 * that the producer might currently be reading is touched
                 * last; the producer won't make progress and touch other
                 * cache lines besides the first one until we write out all
                 * entries.
                 */
                while (likely(head >= r->consumer_tail))
                        r->queue[head--] = NULL;
                r->consumer_tail = consumer_head;
        }
        if (unlikely(consumer_head >= r->size)) {
                consumer_head = 0;
                r->consumer_tail = 0;
        }
        /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
        WRITE_ONCE(r->consumer_head, consumer_head);
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
         * accessing data through the pointer is up to date. Pairs
         * with smp_wmb in __ptr_ring_produce.
         */
        ptr = __ptr_ring_peek(r);
        if (ptr)
                __ptr_ring_discard_one(r);

        return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
                                             void **array, int n)
{
        void *ptr;
        int i;

        for (i = 0; i < n; i++) {
                ptr = __ptr_ring_consume(r);
                if (!ptr)
                        break;
                array[i] = ptr;
        }

        return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        spin_lock(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_irq(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irq(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
        unsigned long flags;
        void *ptr;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_bh(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_bh(&r->consumer_lock);

        return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
                                           void **array, int n)
{
        int ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
                                               void **array, int n)
{
        int ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
                                               void **array, int n)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
                                              void **array, int n)
{
        int ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}
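
/*
 * Example (illustrative sketch, not part of this header): draining the ring
 * in batches to amortize the locking cost. The batch size of 16 and the
 * helper names are hypothetical; the plain (non-_irq/_bh) variant assumes
 * the consumer lock is only ever taken from process context.
 */
#if 0
#define EXAMPLE_BATCH 16

static void example_drain(struct ptr_ring *r, void (*process)(void *))
{
        void *batch[EXAMPLE_BATCH];
        int n, i;

        /* ptr_ring_consume_batched() returns how many entries it pulled,
         * 0 once the ring is empty.
         */
        while ((n = ptr_ring_consume_batched(r, batch, EXAMPLE_BATCH))) {
                for (i = 0; i < n; i++)
                        process(batch[i]);
        }
}
#endif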

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers of __PTR_RING_PEEK_CALL must take consumer_lock; the locked
 * variants below take it themselves.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        unsigned long __PTR_RING_PEEK_CALL_f; \
        \
        spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v; \
})
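
/*
 * Example (illustrative sketch, not part of this header): inspecting the
 * head entry without dequeueing it, e.g. to report how large the next
 * queued object is. The 'struct example_obj' type and the helpers are
 * hypothetical.
 */
#if 0
struct example_obj {                    /* hypothetical queued object */
        size_t len;
};

/* Called on the head entry by PTR_RING_PEEK_CALL below; it must cope with
 * NULL, which is what __ptr_ring_peek() yields for an empty ring.
 */
static size_t example_obj_len(void *ptr)
{
        struct example_obj *obj = ptr;

        return obj ? obj->len : 0;
}

/* Report the size of the next object without consuming it. */
static size_t example_peek_len(struct ptr_ring *r)
{
        return PTR_RING_PEEK_CALL(r, example_obj_len);
}
#endif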

/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * documentation for vmalloc for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
        if (size > KMALLOC_MAX_SIZE / sizeof(void *))
                return NULL;
        return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
        r->size = size;
        r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
        /* We need to set batch at least to 1 to make logic
         * in __ptr_ring_discard_one work correctly.
         * Batching too much (because the ring is small) would cause a lot of
         * burstiness. Needs tuning, for now disable batching.
         */
        if (r->batch > r->size / 2 || !r->batch)
                r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
        r->queue = __ptr_ring_init_queue_alloc(size, gfp);
        if (!r->queue)
                return -ENOMEM;

        __ptr_ring_set_size(r, size);
        r->producer = r->consumer_head = r->consumer_tail = 0;
        spin_lock_init(&r->producer_lock);
        spin_lock_init(&r->consumer_lock);

        return 0;
}

/*
 * Return entries into the ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
                                      void (*destroy)(void *))
{
        unsigned long flags;
        int head;

        spin_lock_irqsave(&r->consumer_lock, flags);
        spin_lock(&r->producer_lock);

        if (!r->size)
                goto done;

        /*
         * Clean out buffered entries (for simplicity). This way the following
         * code can test entries for NULL and, if non-NULL, assume they are
         * valid.
         */
        head = r->consumer_head - 1;
        while (likely(head >= r->consumer_tail))
                r->queue[head--] = NULL;
        r->consumer_tail = r->consumer_head;

        /*
         * Go over entries in batch, start moving head back and copy entries.
         * Stop when we run into previously unconsumed entries.
         */
        while (n) {
                head = r->consumer_head - 1;
                if (head < 0)
                        head = r->size - 1;
                if (r->queue[head]) {
                        /* This batch entry will have to be destroyed. */
                        goto done;
                }
                r->queue[head] = batch[--n];
                r->consumer_tail = head;
                /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
                WRITE_ONCE(r->consumer_head, head);
        }

done:
        /* Destroy all entries left in the batch. */
        while (n)
                destroy(batch[--n]);
        spin_unlock(&r->producer_lock);
        spin_unlock_irqrestore(&r->consumer_lock, flags);
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
                                           int size, gfp_t gfp,
                                           void (*destroy)(void *))
{
        int producer = 0;
        void **old;
        void *ptr;

        while ((ptr = __ptr_ring_consume(r)))
                if (producer < size)
                        queue[producer++] = ptr;
                else if (destroy)
                        destroy(ptr);

        if (producer >= size)
                producer = 0;
        __ptr_ring_set_size(r, size);
        r->producer = producer;
        r->consumer_head = 0;
        r->consumer_tail = 0;
        old = r->queue;
        r->queue = queue;

        return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
                                  void (*destroy)(void *))
{
        unsigned long flags;
        void **queue = __ptr_ring_init_queue_alloc(size, gfp);
        void **old;

        if (!queue)
                return -ENOMEM;

        spin_lock_irqsave(&(r)->consumer_lock, flags);
        spin_lock(&(r)->producer_lock);

        old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

        spin_unlock(&(r)->producer_lock);
        spin_unlock_irqrestore(&(r)->consumer_lock, flags);

        kvfree(old);

        return 0;
}
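
/*
 * Example (illustrative sketch, not part of this header): growing a ring at
 * run time. The destroy callback is only invoked for entries that do not
 * fit into the new ring, which can only happen when shrinking. The helper
 * names and the doubling policy are hypothetical. Note also that
 * __ptr_ring_set_size() above recomputes the consumer batch: assuming
 * 64-byte cache lines and 8-byte pointers the batch is 16 entries, and it
 * falls back to 1 for rings smaller than 32 entries.
 */
#if 0
static void example_obj_destroy(void *ptr)
{
        kfree(ptr);
}

/* Double the capacity of a ring, e.g. after seeing -ENOSPC too often. */
static int example_grow(struct ptr_ring *r, int old_size)
{
        return ptr_ring_resize(r, old_size * 2, GFP_KERNEL,
                               example_obj_destroy);
}
#endif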

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
                                           unsigned int nrings,
                                           int size,
                                           gfp_t gfp, void (*destroy)(void *))
{
        unsigned long flags;
        void ***queues;
        int i;

        queues = kmalloc_array(nrings, sizeof(*queues), gfp);
        if (!queues)
                goto noqueues;

        for (i = 0; i < nrings; ++i) {
                queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
                if (!queues[i])
                        goto nomem;
        }

        for (i = 0; i < nrings; ++i) {
                spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
                spin_lock(&(rings[i])->producer_lock);
                queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
                                                  size, gfp, destroy);
                spin_unlock(&(rings[i])->producer_lock);
                spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
        }

        for (i = 0; i < nrings; ++i)
                kvfree(queues[i]);

        kfree(queues);

        return 0;

nomem:
        while (--i >= 0)
                kvfree(queues[i]);

        kfree(queues);

noqueues:
        return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
        void *ptr;

        if (destroy)
                while ((ptr = ptr_ring_consume(r)))
                        destroy(ptr);
        kvfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */