cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

ring_buffer.c (165037B)


      1// SPDX-License-Identifier: GPL-2.0
      2/*
      3 * Generic ring buffer
      4 *
      5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
      6 */
      7#include <linux/trace_recursion.h>
      8#include <linux/trace_events.h>
      9#include <linux/ring_buffer.h>
     10#include <linux/trace_clock.h>
     11#include <linux/sched/clock.h>
     12#include <linux/trace_seq.h>
     13#include <linux/spinlock.h>
     14#include <linux/irq_work.h>
     15#include <linux/security.h>
     16#include <linux/uaccess.h>
     17#include <linux/hardirq.h>
     18#include <linux/kthread.h>	/* for self test */
     19#include <linux/module.h>
     20#include <linux/percpu.h>
     21#include <linux/mutex.h>
     22#include <linux/delay.h>
     23#include <linux/slab.h>
     24#include <linux/init.h>
     25#include <linux/hash.h>
     26#include <linux/list.h>
     27#include <linux/cpu.h>
     28#include <linux/oom.h>
     29
     30#include <asm/local.h>
     31
     32/*
     33 * The "absolute" timestamp in the buffer is only 59 bits.
     34 * If a clock has the 5 MSBs set, it needs to be saved and
     35 * reinserted.
     36 */
     37#define TS_MSB		(0xf8ULL << 56)
     38#define ABS_TS_MASK	(~TS_MSB)
     39
     40static void update_pages_handler(struct work_struct *work);
     41
     42/*
     43 * The ring buffer header is special. We must manually up keep it.
     44 */
     45int ring_buffer_print_entry_header(struct trace_seq *s)
     46{
     47	trace_seq_puts(s, "# compressed entry header\n");
     48	trace_seq_puts(s, "\ttype_len    :    5 bits\n");
     49	trace_seq_puts(s, "\ttime_delta  :   27 bits\n");
     50	trace_seq_puts(s, "\tarray       :   32 bits\n");
     51	trace_seq_putc(s, '\n');
     52	trace_seq_printf(s, "\tpadding     : type == %d\n",
     53			 RINGBUF_TYPE_PADDING);
     54	trace_seq_printf(s, "\ttime_extend : type == %d\n",
     55			 RINGBUF_TYPE_TIME_EXTEND);
     56	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
     57			 RINGBUF_TYPE_TIME_STAMP);
     58	trace_seq_printf(s, "\tdata max type_len  == %d\n",
     59			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
     60
     61	return !trace_seq_has_overflowed(s);
     62}
     63
     64/*
     65 * The ring buffer is made up of a list of pages. A separate list of pages is
     66 * allocated for each CPU. A writer may only write to a buffer that is
     67 * associated with the CPU it is currently executing on.  A reader may read
     68 * from any per cpu buffer.
     69 *
     70 * The reader is special. For each per cpu buffer, the reader has its own
     71 * reader page. When a reader has read the entire reader page, this reader
     72 * page is swapped with another page in the ring buffer.
     73 *
     74 * Now, as long as the writer is off the reader page, the reader can do what
     75 * ever it wants with that page. The writer will never write to that page
     76 * again (as long as it is out of the ring buffer).
     77 *
     78 * Here's some silly ASCII art.
     79 *
     80 *   +------+
     81 *   |reader|          RING BUFFER
     82 *   |page  |
     83 *   +------+        +---+   +---+   +---+
     84 *                   |   |-->|   |-->|   |
     85 *                   +---+   +---+   +---+
     86 *                     ^               |
     87 *                     |               |
     88 *                     +---------------+
     89 *
     90 *
     91 *   +------+
     92 *   |reader|          RING BUFFER
     93 *   |page  |------------------v
     94 *   +------+        +---+   +---+   +---+
     95 *                   |   |-->|   |-->|   |
     96 *                   +---+   +---+   +---+
     97 *                     ^               |
     98 *                     |               |
     99 *                     +---------------+
    100 *
    101 *
    102 *   +------+
    103 *   |reader|          RING BUFFER
    104 *   |page  |------------------v
    105 *   +------+        +---+   +---+   +---+
    106 *      ^            |   |-->|   |-->|   |
    107 *      |            +---+   +---+   +---+
    108 *      |                              |
    109 *      |                              |
    110 *      +------------------------------+
    111 *
    112 *
    113 *   +------+
    114 *   |buffer|          RING BUFFER
    115 *   |page  |------------------v
    116 *   +------+        +---+   +---+   +---+
    117 *      ^            |   |   |   |-->|   |
    118 *      |   New      +---+   +---+   +---+
    119 *      |  Reader------^               |
    120 *      |   page                       |
    121 *      +------------------------------+
    122 *
    123 *
    124 * After we make this swap, the reader can hand this page off to the splice
    125 * code and be done with it. It can even allocate a new page if it needs to
    126 * and swap that into the ring buffer.
    127 *
    128 * We will be using cmpxchg soon to make all this lockless.
    129 *
    130 */
    131
    132/* Used for individual buffers (after the counter) */
    133#define RB_BUFFER_OFF		(1 << 20)
    134
    135#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
    136
    137#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
    138#define RB_ALIGNMENT		4U
    139#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
    140#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */
    141
    142#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
    143# define RB_FORCE_8BYTE_ALIGNMENT	0
    144# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
    145#else
    146# define RB_FORCE_8BYTE_ALIGNMENT	1
    147# define RB_ARCH_ALIGNMENT		8U
    148#endif
    149
    150#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)
    151
    152/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
    153#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
    154
    155enum {
    156	RB_LEN_TIME_EXTEND = 8,
    157	RB_LEN_TIME_STAMP =  8,
    158};
    159
    160#define skip_time_extend(event) \
    161	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
    162
    163#define extended_time(event) \
    164	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)
    165
    166static inline int rb_null_event(struct ring_buffer_event *event)
    167{
    168	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
    169}
    170
    171static void rb_event_set_padding(struct ring_buffer_event *event)
    172{
    173	/* padding has a NULL time_delta */
    174	event->type_len = RINGBUF_TYPE_PADDING;
    175	event->time_delta = 0;
    176}
    177
    178static unsigned
    179rb_event_data_length(struct ring_buffer_event *event)
    180{
    181	unsigned length;
    182
    183	if (event->type_len)
    184		length = event->type_len * RB_ALIGNMENT;
    185	else
    186		length = event->array[0];
    187	return length + RB_EVNT_HDR_SIZE;
    188}
    189
    190/*
    191 * Return the length of the given event. Will return
    192 * the length of the time extend if the event is a
    193 * time extend.
    194 */
    195static inline unsigned
    196rb_event_length(struct ring_buffer_event *event)
    197{
    198	switch (event->type_len) {
    199	case RINGBUF_TYPE_PADDING:
    200		if (rb_null_event(event))
    201			/* undefined */
    202			return -1;
    203		return  event->array[0] + RB_EVNT_HDR_SIZE;
    204
    205	case RINGBUF_TYPE_TIME_EXTEND:
    206		return RB_LEN_TIME_EXTEND;
    207
    208	case RINGBUF_TYPE_TIME_STAMP:
    209		return RB_LEN_TIME_STAMP;
    210
    211	case RINGBUF_TYPE_DATA:
    212		return rb_event_data_length(event);
    213	default:
    214		WARN_ON_ONCE(1);
    215	}
    216	/* not hit */
    217	return 0;
    218}
    219
    220/*
    221 * Return total length of time extend and data,
    222 *   or just the event length for all other events.
    223 */
    224static inline unsigned
    225rb_event_ts_length(struct ring_buffer_event *event)
    226{
    227	unsigned len = 0;
    228
    229	if (extended_time(event)) {
    230		/* time extends include the data event after it */
    231		len = RB_LEN_TIME_EXTEND;
    232		event = skip_time_extend(event);
    233	}
    234	return len + rb_event_length(event);
    235}
    236
    237/**
    238 * ring_buffer_event_length - return the length of the event
    239 * @event: the event to get the length of
    240 *
    241 * Returns the size of the data load of a data event.
    242 * If the event is something other than a data event, it
    243 * returns the size of the event itself. With the exception
    244 * of a TIME EXTEND, where it still returns the size of the
    245 * data load of the data event after it.
    246 */
    247unsigned ring_buffer_event_length(struct ring_buffer_event *event)
    248{
    249	unsigned length;
    250
    251	if (extended_time(event))
    252		event = skip_time_extend(event);
    253
    254	length = rb_event_length(event);
    255	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
    256		return length;
    257	length -= RB_EVNT_HDR_SIZE;
    258	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
    259                length -= sizeof(event->array[0]);
    260	return length;
    261}
    262EXPORT_SYMBOL_GPL(ring_buffer_event_length);
    263
    264/* inline for ring buffer fast paths */
    265static __always_inline void *
    266rb_event_data(struct ring_buffer_event *event)
    267{
    268	if (extended_time(event))
    269		event = skip_time_extend(event);
    270	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
    271	/* If length is in len field, then array[0] has the data */
    272	if (event->type_len)
    273		return (void *)&event->array[0];
    274	/* Otherwise length is in array[0] and array[1] has the data */
    275	return (void *)&event->array[1];
    276}
    277
    278/**
    279 * ring_buffer_event_data - return the data of the event
    280 * @event: the event to get the data from
    281 */
    282void *ring_buffer_event_data(struct ring_buffer_event *event)
    283{
    284	return rb_event_data(event);
    285}
    286EXPORT_SYMBOL_GPL(ring_buffer_event_data);
    287
    288#define for_each_buffer_cpu(buffer, cpu)		\
    289	for_each_cpu(cpu, buffer->cpumask)
    290
    291#define for_each_online_buffer_cpu(buffer, cpu)		\
    292	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)
    293
    294#define TS_SHIFT	27
    295#define TS_MASK		((1ULL << TS_SHIFT) - 1)
    296#define TS_DELTA_TEST	(~TS_MASK)
    297
    298static u64 rb_event_time_stamp(struct ring_buffer_event *event)
    299{
    300	u64 ts;
    301
    302	ts = event->array[0];
    303	ts <<= TS_SHIFT;
    304	ts += event->time_delta;
    305
    306	return ts;
    307}
    308
    309/* Flag when events were overwritten */
    310#define RB_MISSED_EVENTS	(1 << 31)
    311/* Missed count stored at end */
    312#define RB_MISSED_STORED	(1 << 30)
    313
    314struct buffer_data_page {
    315	u64		 time_stamp;	/* page time stamp */
    316	local_t		 commit;	/* write committed index */
    317	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */
    318};
    319
    320/*
    321 * Note, the buffer_page list must be first. The buffer pages
    322 * are allocated in cache lines, which means that each buffer
    323 * page will be at the beginning of a cache line, and thus
    324 * the least significant bits will be zero. We use this to
    325 * add flags in the list struct pointers, to make the ring buffer
    326 * lockless.
    327 */
    328struct buffer_page {
    329	struct list_head list;		/* list of buffer pages */
    330	local_t		 write;		/* index for next write */
    331	unsigned	 read;		/* index for next read */
    332	local_t		 entries;	/* entries on this page */
    333	unsigned long	 real_end;	/* real end of data */
    334	struct buffer_data_page *page;	/* Actual data page */
    335};
    336
    337/*
    338 * The buffer page counters, write and entries, must be reset
    339 * atomically when crossing page boundaries. To synchronize this
    340 * update, two counters are inserted into the number. One is
    341 * the actual counter for the write position or count on the page.
    342 *
    343 * The other is a counter of updaters. Before an update happens
    344 * the update partition of the counter is incremented. This will
    345 * allow the updater to update the counter atomically.
    346 *
    347 * The counter is 20 bits, and the state data is 12.
    348 */
    349#define RB_WRITE_MASK		0xfffff
    350#define RB_WRITE_INTCNT		(1 << 20)
    351
    352static void rb_init_page(struct buffer_data_page *bpage)
    353{
    354	local_set(&bpage->commit, 0);
    355}
    356
    357/*
    358 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
    359 * this issue out.
    360 */
    361static void free_buffer_page(struct buffer_page *bpage)
    362{
    363	free_page((unsigned long)bpage->page);
    364	kfree(bpage);
    365}
    366
    367/*
    368 * We need to fit the time_stamp delta into 27 bits.
    369 */
    370static inline int test_time_stamp(u64 delta)
    371{
    372	if (delta & TS_DELTA_TEST)
    373		return 1;
    374	return 0;
    375}
    376
    377#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)
    378
    379/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
    380#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
    381
    382int ring_buffer_print_page_header(struct trace_seq *s)
    383{
    384	struct buffer_data_page field;
    385
    386	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
    387			 "offset:0;\tsize:%u;\tsigned:%u;\n",
    388			 (unsigned int)sizeof(field.time_stamp),
    389			 (unsigned int)is_signed_type(u64));
    390
    391	trace_seq_printf(s, "\tfield: local_t commit;\t"
    392			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
    393			 (unsigned int)offsetof(typeof(field), commit),
    394			 (unsigned int)sizeof(field.commit),
    395			 (unsigned int)is_signed_type(long));
    396
    397	trace_seq_printf(s, "\tfield: int overwrite;\t"
    398			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
    399			 (unsigned int)offsetof(typeof(field), commit),
    400			 1,
    401			 (unsigned int)is_signed_type(long));
    402
    403	trace_seq_printf(s, "\tfield: char data;\t"
    404			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
    405			 (unsigned int)offsetof(typeof(field), data),
    406			 (unsigned int)BUF_PAGE_SIZE,
    407			 (unsigned int)is_signed_type(char));
    408
    409	return !trace_seq_has_overflowed(s);
    410}
    411
    412struct rb_irq_work {
    413	struct irq_work			work;
    414	wait_queue_head_t		waiters;
    415	wait_queue_head_t		full_waiters;
    416	bool				waiters_pending;
    417	bool				full_waiters_pending;
    418	bool				wakeup_full;
    419};
    420
    421/*
    422 * Structure to hold event state and handle nested events.
    423 */
    424struct rb_event_info {
    425	u64			ts;
    426	u64			delta;
    427	u64			before;
    428	u64			after;
    429	unsigned long		length;
    430	struct buffer_page	*tail_page;
    431	int			add_timestamp;
    432};
    433
    434/*
    435 * Used for the add_timestamp
    436 *  NONE
    437 *  EXTEND - wants a time extend
    438 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
    439 *  FORCE - force a full time stamp.
    440 */
    441enum {
    442	RB_ADD_STAMP_NONE		= 0,
    443	RB_ADD_STAMP_EXTEND		= BIT(1),
    444	RB_ADD_STAMP_ABSOLUTE		= BIT(2),
    445	RB_ADD_STAMP_FORCE		= BIT(3)
    446};
    447/*
    448 * Used for which event context the event is in.
    449 *  TRANSITION = 0
    450 *  NMI     = 1
    451 *  IRQ     = 2
    452 *  SOFTIRQ = 3
    453 *  NORMAL  = 4
    454 *
    455 * See trace_recursive_lock() comment below for more details.
    456 */
    457enum {
    458	RB_CTX_TRANSITION,
    459	RB_CTX_NMI,
    460	RB_CTX_IRQ,
    461	RB_CTX_SOFTIRQ,
    462	RB_CTX_NORMAL,
    463	RB_CTX_MAX
    464};
    465
    466#if BITS_PER_LONG == 32
    467#define RB_TIME_32
    468#endif
    469
    470/* To test on 64 bit machines */
    471//#define RB_TIME_32
    472
    473#ifdef RB_TIME_32
    474
    475struct rb_time_struct {
    476	local_t		cnt;
    477	local_t		top;
    478	local_t		bottom;
    479	local_t		msb;
    480};
    481#else
    482#include <asm/local64.h>
    483struct rb_time_struct {
    484	local64_t	time;
    485};
    486#endif
    487typedef struct rb_time_struct rb_time_t;
    488
    489#define MAX_NEST	5
    490
    491/*
    492 * head_page == tail_page && head == tail then buffer is empty.
    493 */
    494struct ring_buffer_per_cpu {
    495	int				cpu;
    496	atomic_t			record_disabled;
    497	atomic_t			resize_disabled;
    498	struct trace_buffer	*buffer;
    499	raw_spinlock_t			reader_lock;	/* serialize readers */
    500	arch_spinlock_t			lock;
    501	struct lock_class_key		lock_key;
    502	struct buffer_data_page		*free_page;
    503	unsigned long			nr_pages;
    504	unsigned int			current_context;
    505	struct list_head		*pages;
    506	struct buffer_page		*head_page;	/* read from head */
    507	struct buffer_page		*tail_page;	/* write to tail */
    508	struct buffer_page		*commit_page;	/* committed pages */
    509	struct buffer_page		*reader_page;
    510	unsigned long			lost_events;
    511	unsigned long			last_overrun;
    512	unsigned long			nest;
    513	local_t				entries_bytes;
    514	local_t				entries;
    515	local_t				overrun;
    516	local_t				commit_overrun;
    517	local_t				dropped_events;
    518	local_t				committing;
    519	local_t				commits;
    520	local_t				pages_touched;
    521	local_t				pages_read;
    522	long				last_pages_touch;
    523	size_t				shortest_full;
    524	unsigned long			read;
    525	unsigned long			read_bytes;
    526	rb_time_t			write_stamp;
    527	rb_time_t			before_stamp;
    528	u64				event_stamp[MAX_NEST];
    529	u64				read_stamp;
    530	/* ring buffer pages to update, > 0 to add, < 0 to remove */
    531	long				nr_pages_to_update;
    532	struct list_head		new_pages; /* new pages to add */
    533	struct work_struct		update_pages_work;
    534	struct completion		update_done;
    535
    536	struct rb_irq_work		irq_work;
    537};
    538
    539struct trace_buffer {
    540	unsigned			flags;
    541	int				cpus;
    542	atomic_t			record_disabled;
    543	cpumask_var_t			cpumask;
    544
    545	struct lock_class_key		*reader_lock_key;
    546
    547	struct mutex			mutex;
    548
    549	struct ring_buffer_per_cpu	**buffers;
    550
    551	struct hlist_node		node;
    552	u64				(*clock)(void);
    553
    554	struct rb_irq_work		irq_work;
    555	bool				time_stamp_abs;
    556};
    557
    558struct ring_buffer_iter {
    559	struct ring_buffer_per_cpu	*cpu_buffer;
    560	unsigned long			head;
    561	unsigned long			next_event;
    562	struct buffer_page		*head_page;
    563	struct buffer_page		*cache_reader_page;
    564	unsigned long			cache_read;
    565	u64				read_stamp;
    566	u64				page_stamp;
    567	struct ring_buffer_event	*event;
    568	int				missed_events;
    569};
    570
    571#ifdef RB_TIME_32
    572
    573/*
    574 * On 32 bit machines, local64_t is very expensive. As the ring
    575 * buffer doesn't need all the features of a true 64 bit atomic,
    576 * on 32 bit, it uses these functions (64 still uses local64_t).
    577 *
    578 * For the ring buffer, 64 bit required operations for the time is
    579 * the following:
    580 *
    581 *  - Reads may fail if it interrupted a modification of the time stamp.
    582 *      It will succeed if it did not interrupt another write even if
    583 *      the read itself is interrupted by a write.
    584 *      It returns whether it was successful or not.
    585 *
    586 *  - Writes always succeed and will overwrite other writes and writes
    587 *      that were done by events interrupting the current write.
    588 *
    589 *  - A write followed by a read of the same time stamp will always succeed,
    590 *      but may not contain the same value.
    591 *
    592 *  - A cmpxchg will fail if it interrupted another write or cmpxchg.
    593 *      Other than that, it acts like a normal cmpxchg.
    594 *
    595 * The 60 bit time stamp is broken up by 30 bits in a top and bottom half
    596 *  (bottom being the least significant 30 bits of the 60 bit time stamp).
    597 *
    598 * The two most significant bits of each half holds a 2 bit counter (0-3).
    599 * Each update will increment this counter by one.
    600 * When reading the top and bottom, if the two counter bits match then the
    601 *  top and bottom together make a valid 60 bit number.
    602 */
    603#define RB_TIME_SHIFT	30
    604#define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1)
    605#define RB_TIME_MSB_SHIFT	 60
    606
    607static inline int rb_time_cnt(unsigned long val)
    608{
    609	return (val >> RB_TIME_SHIFT) & 3;
    610}
    611
    612static inline u64 rb_time_val(unsigned long top, unsigned long bottom)
    613{
    614	u64 val;
    615
    616	val = top & RB_TIME_VAL_MASK;
    617	val <<= RB_TIME_SHIFT;
    618	val |= bottom & RB_TIME_VAL_MASK;
    619
    620	return val;
    621}
    622
    623static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt)
    624{
    625	unsigned long top, bottom, msb;
    626	unsigned long c;
    627
    628	/*
    629	 * If the read is interrupted by a write, then the cnt will
    630	 * be different. Loop until both top and bottom have been read
    631	 * without interruption.
    632	 */
    633	do {
    634		c = local_read(&t->cnt);
    635		top = local_read(&t->top);
    636		bottom = local_read(&t->bottom);
    637		msb = local_read(&t->msb);
    638	} while (c != local_read(&t->cnt));
    639
    640	*cnt = rb_time_cnt(top);
    641
    642	/* If top and bottom counts don't match, this interrupted a write */
    643	if (*cnt != rb_time_cnt(bottom))
    644		return false;
    645
    646	/* The shift to msb will lose its cnt bits */
    647	*ret = rb_time_val(top, bottom) | ((u64)msb << RB_TIME_MSB_SHIFT);
    648	return true;
    649}
    650
    651static bool rb_time_read(rb_time_t *t, u64 *ret)
    652{
    653	unsigned long cnt;
    654
    655	return __rb_time_read(t, ret, &cnt);
    656}
    657
    658static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt)
    659{
    660	return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT);
    661}
    662
    663static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom,
    664				 unsigned long *msb)
    665{
    666	*top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK);
    667	*bottom = (unsigned long)(val & RB_TIME_VAL_MASK);
    668	*msb = (unsigned long)(val >> RB_TIME_MSB_SHIFT);
    669}
    670
    671static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt)
    672{
    673	val = rb_time_val_cnt(val, cnt);
    674	local_set(t, val);
    675}
    676
    677static void rb_time_set(rb_time_t *t, u64 val)
    678{
    679	unsigned long cnt, top, bottom, msb;
    680
    681	rb_time_split(val, &top, &bottom, &msb);
    682
    683	/* Writes always succeed with a valid number even if it gets interrupted. */
    684	do {
    685		cnt = local_inc_return(&t->cnt);
    686		rb_time_val_set(&t->top, top, cnt);
    687		rb_time_val_set(&t->bottom, bottom, cnt);
    688		rb_time_val_set(&t->msb, val >> RB_TIME_MSB_SHIFT, cnt);
    689	} while (cnt != local_read(&t->cnt));
    690}
    691
    692static inline bool
    693rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set)
    694{
    695	unsigned long ret;
    696
    697	ret = local_cmpxchg(l, expect, set);
    698	return ret == expect;
    699}
    700
    701static int rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
    702{
    703	unsigned long cnt, top, bottom, msb;
    704	unsigned long cnt2, top2, bottom2, msb2;
    705	u64 val;
    706
    707	/* The cmpxchg always fails if it interrupted an update */
    708	 if (!__rb_time_read(t, &val, &cnt2))
    709		 return false;
    710
    711	 if (val != expect)
    712		 return false;
    713
    714	 cnt = local_read(&t->cnt);
    715	 if ((cnt & 3) != cnt2)
    716		 return false;
    717
    718	 cnt2 = cnt + 1;
    719
    720	 rb_time_split(val, &top, &bottom, &msb);
    721	 top = rb_time_val_cnt(top, cnt);
    722	 bottom = rb_time_val_cnt(bottom, cnt);
    723
    724	 rb_time_split(set, &top2, &bottom2, &msb2);
    725	 top2 = rb_time_val_cnt(top2, cnt2);
    726	 bottom2 = rb_time_val_cnt(bottom2, cnt2);
    727
    728	if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2))
    729		return false;
    730	if (!rb_time_read_cmpxchg(&t->msb, msb, msb2))
    731		return false;
    732	if (!rb_time_read_cmpxchg(&t->top, top, top2))
    733		return false;
    734	if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2))
    735		return false;
    736	return true;
    737}
    738
    739#else /* 64 bits */
    740
    741/* local64_t always succeeds */
    742
    743static inline bool rb_time_read(rb_time_t *t, u64 *ret)
    744{
    745	*ret = local64_read(&t->time);
    746	return true;
    747}
    748static void rb_time_set(rb_time_t *t, u64 val)
    749{
    750	local64_set(&t->time, val);
    751}
    752
    753static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
    754{
    755	u64 val;
    756	val = local64_cmpxchg(&t->time, expect, set);
    757	return val == expect;
    758}
    759#endif
    760
    761/*
    762 * Enable this to make sure that the event passed to
    763 * ring_buffer_event_time_stamp() is not committed and also
    764 * is on the buffer that it passed in.
    765 */
    766//#define RB_VERIFY_EVENT
    767#ifdef RB_VERIFY_EVENT
    768static struct list_head *rb_list_head(struct list_head *list);
    769static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
    770			 void *event)
    771{
    772	struct buffer_page *page = cpu_buffer->commit_page;
    773	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
    774	struct list_head *next;
    775	long commit, write;
    776	unsigned long addr = (unsigned long)event;
    777	bool done = false;
    778	int stop = 0;
    779
    780	/* Make sure the event exists and is not committed yet */
    781	do {
    782		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
    783			done = true;
    784		commit = local_read(&page->page->commit);
    785		write = local_read(&page->write);
    786		if (addr >= (unsigned long)&page->page->data[commit] &&
    787		    addr < (unsigned long)&page->page->data[write])
    788			return;
    789
    790		next = rb_list_head(page->list.next);
    791		page = list_entry(next, struct buffer_page, list);
    792	} while (!done);
    793	WARN_ON_ONCE(1);
    794}
    795#else
    796static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
    797			 void *event)
    798{
    799}
    800#endif
    801
    802/*
    803 * The absolute time stamp drops the 5 MSBs and some clocks may
    804 * require them. The rb_fix_abs_ts() will take a previous full
    805 * time stamp, and add the 5 MSB of that time stamp on to the
    806 * saved absolute time stamp. Then they are compared in case of
    807 * the unlikely event that the latest time stamp incremented
    808 * the 5 MSB.
    809 */
    810static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
    811{
    812	if (save_ts & TS_MSB) {
    813		abs |= save_ts & TS_MSB;
    814		/* Check for overflow */
    815		if (unlikely(abs < save_ts))
    816			abs += 1ULL << 59;
    817	}
    818	return abs;
    819}
    820
    821static inline u64 rb_time_stamp(struct trace_buffer *buffer);
    822
    823/**
    824 * ring_buffer_event_time_stamp - return the event's current time stamp
    825 * @buffer: The buffer that the event is on
    826 * @event: the event to get the time stamp of
    827 *
    828 * Note, this must be called after @event is reserved, and before it is
    829 * committed to the ring buffer. And must be called from the same
    830 * context where the event was reserved (normal, softirq, irq, etc).
    831 *
    832 * Returns the time stamp associated with the current event.
    833 * If the event has an extended time stamp, then that is used as
    834 * the time stamp to return.
    835 * In the highly unlikely case that the event was nested more than
    836 * the max nesting, then the write_stamp of the buffer is returned,
    837 * otherwise  current time is returned, but that really neither of
    838 * the last two cases should ever happen.
    839 */
    840u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
    841				 struct ring_buffer_event *event)
    842{
    843	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
    844	unsigned int nest;
    845	u64 ts;
    846
    847	/* If the event includes an absolute time, then just use that */
    848	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
    849		ts = rb_event_time_stamp(event);
    850		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
    851	}
    852
    853	nest = local_read(&cpu_buffer->committing);
    854	verify_event(cpu_buffer, event);
    855	if (WARN_ON_ONCE(!nest))
    856		goto fail;
    857
    858	/* Read the current saved nesting level time stamp */
    859	if (likely(--nest < MAX_NEST))
    860		return cpu_buffer->event_stamp[nest];
    861
    862	/* Shouldn't happen, warn if it does */
    863	WARN_ONCE(1, "nest (%d) greater than max", nest);
    864
    865 fail:
    866	/* Can only fail on 32 bit */
    867	if (!rb_time_read(&cpu_buffer->write_stamp, &ts))
    868		/* Screw it, just read the current time */
    869		ts = rb_time_stamp(cpu_buffer->buffer);
    870
    871	return ts;
    872}
    873
    874/**
    875 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
    876 * @buffer: The ring_buffer to get the number of pages from
    877 * @cpu: The cpu of the ring_buffer to get the number of pages from
    878 *
    879 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
    880 */
    881size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu)
    882{
    883	return buffer->buffers[cpu]->nr_pages;
    884}
    885
    886/**
    887 * ring_buffer_nr_pages_dirty - get the number of used pages in the ring buffer
    888 * @buffer: The ring_buffer to get the number of pages from
    889 * @cpu: The cpu of the ring_buffer to get the number of pages from
    890 *
    891 * Returns the number of pages that have content in the ring buffer.
    892 */
    893size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
    894{
    895	size_t read;
    896	size_t cnt;
    897
    898	read = local_read(&buffer->buffers[cpu]->pages_read);
    899	cnt = local_read(&buffer->buffers[cpu]->pages_touched);
    900	/* The reader can read an empty page, but not more than that */
    901	if (cnt < read) {
    902		WARN_ON_ONCE(read > cnt + 1);
    903		return 0;
    904	}
    905
    906	return cnt - read;
    907}
    908
    909/*
    910 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
    911 *
    912 * Schedules a delayed work to wake up any task that is blocked on the
    913 * ring buffer waiters queue.
    914 */
    915static void rb_wake_up_waiters(struct irq_work *work)
    916{
    917	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
    918
    919	wake_up_all(&rbwork->waiters);
    920	if (rbwork->wakeup_full) {
    921		rbwork->wakeup_full = false;
    922		wake_up_all(&rbwork->full_waiters);
    923	}
    924}
    925
    926/**
    927 * ring_buffer_wait - wait for input to the ring buffer
    928 * @buffer: buffer to wait on
    929 * @cpu: the cpu buffer to wait on
    930 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
    931 *
    932 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
    933 * as data is added to any of the @buffer's cpu buffers. Otherwise
    934 * it will wait for data to be added to a specific cpu buffer.
    935 */
    936int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full)
    937{
    938	struct ring_buffer_per_cpu *cpu_buffer;
    939	DEFINE_WAIT(wait);
    940	struct rb_irq_work *work;
    941	int ret = 0;
    942
    943	/*
    944	 * Depending on what the caller is waiting for, either any
    945	 * data in any cpu buffer, or a specific buffer, put the
    946	 * caller on the appropriate wait queue.
    947	 */
    948	if (cpu == RING_BUFFER_ALL_CPUS) {
    949		work = &buffer->irq_work;
    950		/* Full only makes sense on per cpu reads */
    951		full = 0;
    952	} else {
    953		if (!cpumask_test_cpu(cpu, buffer->cpumask))
    954			return -ENODEV;
    955		cpu_buffer = buffer->buffers[cpu];
    956		work = &cpu_buffer->irq_work;
    957	}
    958
    959
    960	while (true) {
    961		if (full)
    962			prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
    963		else
    964			prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
    965
    966		/*
    967		 * The events can happen in critical sections where
    968		 * checking a work queue can cause deadlocks.
    969		 * After adding a task to the queue, this flag is set
    970		 * only to notify events to try to wake up the queue
    971		 * using irq_work.
    972		 *
    973		 * We don't clear it even if the buffer is no longer
    974		 * empty. The flag only causes the next event to run
    975		 * irq_work to do the work queue wake up. The worse
    976		 * that can happen if we race with !trace_empty() is that
    977		 * an event will cause an irq_work to try to wake up
    978		 * an empty queue.
    979		 *
    980		 * There's no reason to protect this flag either, as
    981		 * the work queue and irq_work logic will do the necessary
    982		 * synchronization for the wake ups. The only thing
    983		 * that is necessary is that the wake up happens after
    984		 * a task has been queued. It's OK for spurious wake ups.
    985		 */
    986		if (full)
    987			work->full_waiters_pending = true;
    988		else
    989			work->waiters_pending = true;
    990
    991		if (signal_pending(current)) {
    992			ret = -EINTR;
    993			break;
    994		}
    995
    996		if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
    997			break;
    998
    999		if (cpu != RING_BUFFER_ALL_CPUS &&
   1000		    !ring_buffer_empty_cpu(buffer, cpu)) {
   1001			unsigned long flags;
   1002			bool pagebusy;
   1003			size_t nr_pages;
   1004			size_t dirty;
   1005
   1006			if (!full)
   1007				break;
   1008
   1009			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
   1010			pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
   1011			nr_pages = cpu_buffer->nr_pages;
   1012			dirty = ring_buffer_nr_dirty_pages(buffer, cpu);
   1013			if (!cpu_buffer->shortest_full ||
   1014			    cpu_buffer->shortest_full < full)
   1015				cpu_buffer->shortest_full = full;
   1016			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
   1017			if (!pagebusy &&
   1018			    (!nr_pages || (dirty * 100) > full * nr_pages))
   1019				break;
   1020		}
   1021
   1022		schedule();
   1023	}
   1024
   1025	if (full)
   1026		finish_wait(&work->full_waiters, &wait);
   1027	else
   1028		finish_wait(&work->waiters, &wait);
   1029
   1030	return ret;
   1031}
   1032
   1033/**
   1034 * ring_buffer_poll_wait - poll on buffer input
   1035 * @buffer: buffer to wait on
   1036 * @cpu: the cpu buffer to wait on
   1037 * @filp: the file descriptor
   1038 * @poll_table: The poll descriptor
   1039 *
   1040 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
   1041 * as data is added to any of the @buffer's cpu buffers. Otherwise
   1042 * it will wait for data to be added to a specific cpu buffer.
   1043 *
   1044 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
   1045 * zero otherwise.
   1046 */
   1047__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
   1048			  struct file *filp, poll_table *poll_table)
   1049{
   1050	struct ring_buffer_per_cpu *cpu_buffer;
   1051	struct rb_irq_work *work;
   1052
   1053	if (cpu == RING_BUFFER_ALL_CPUS)
   1054		work = &buffer->irq_work;
   1055	else {
   1056		if (!cpumask_test_cpu(cpu, buffer->cpumask))
   1057			return -EINVAL;
   1058
   1059		cpu_buffer = buffer->buffers[cpu];
   1060		work = &cpu_buffer->irq_work;
   1061	}
   1062
   1063	poll_wait(filp, &work->waiters, poll_table);
   1064	work->waiters_pending = true;
   1065	/*
   1066	 * There's a tight race between setting the waiters_pending and
   1067	 * checking if the ring buffer is empty.  Once the waiters_pending bit
   1068	 * is set, the next event will wake the task up, but we can get stuck
   1069	 * if there's only a single event in.
   1070	 *
   1071	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
   1072	 * but adding a memory barrier to all events will cause too much of a
   1073	 * performance hit in the fast path.  We only need a memory barrier when
   1074	 * the buffer goes from empty to having content.  But as this race is
   1075	 * extremely small, and it's not a problem if another event comes in, we
   1076	 * will fix it later.
   1077	 */
   1078	smp_mb();
   1079
   1080	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
   1081	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
   1082		return EPOLLIN | EPOLLRDNORM;
   1083	return 0;
   1084}
   1085
   1086/* buffer may be either ring_buffer or ring_buffer_per_cpu */
   1087#define RB_WARN_ON(b, cond)						\
   1088	({								\
   1089		int _____ret = unlikely(cond);				\
   1090		if (_____ret) {						\
   1091			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
   1092				struct ring_buffer_per_cpu *__b =	\
   1093					(void *)b;			\
   1094				atomic_inc(&__b->buffer->record_disabled); \
   1095			} else						\
   1096				atomic_inc(&b->record_disabled);	\
   1097			WARN_ON(1);					\
   1098		}							\
   1099		_____ret;						\
   1100	})
   1101
   1102/* Up this if you want to test the TIME_EXTENTS and normalization */
   1103#define DEBUG_SHIFT 0
   1104
   1105static inline u64 rb_time_stamp(struct trace_buffer *buffer)
   1106{
   1107	u64 ts;
   1108
   1109	/* Skip retpolines :-( */
   1110	if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local))
   1111		ts = trace_clock_local();
   1112	else
   1113		ts = buffer->clock();
   1114
   1115	/* shift to debug/test normalization and TIME_EXTENTS */
   1116	return ts << DEBUG_SHIFT;
   1117}
   1118
   1119u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
   1120{
   1121	u64 time;
   1122
   1123	preempt_disable_notrace();
   1124	time = rb_time_stamp(buffer);
   1125	preempt_enable_notrace();
   1126
   1127	return time;
   1128}
   1129EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
   1130
   1131void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
   1132				      int cpu, u64 *ts)
   1133{
   1134	/* Just stupid testing the normalize function and deltas */
   1135	*ts >>= DEBUG_SHIFT;
   1136}
   1137EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
   1138
   1139/*
   1140 * Making the ring buffer lockless makes things tricky.
   1141 * Although writes only happen on the CPU that they are on,
   1142 * and they only need to worry about interrupts. Reads can
   1143 * happen on any CPU.
   1144 *
   1145 * The reader page is always off the ring buffer, but when the
   1146 * reader finishes with a page, it needs to swap its page with
   1147 * a new one from the buffer. The reader needs to take from
   1148 * the head (writes go to the tail). But if a writer is in overwrite
   1149 * mode and wraps, it must push the head page forward.
   1150 *
   1151 * Here lies the problem.
   1152 *
   1153 * The reader must be careful to replace only the head page, and
   1154 * not another one. As described at the top of the file in the
   1155 * ASCII art, the reader sets its old page to point to the next
   1156 * page after head. It then sets the page after head to point to
   1157 * the old reader page. But if the writer moves the head page
   1158 * during this operation, the reader could end up with the tail.
   1159 *
   1160 * We use cmpxchg to help prevent this race. We also do something
   1161 * special with the page before head. We set the LSB to 1.
   1162 *
   1163 * When the writer must push the page forward, it will clear the
   1164 * bit that points to the head page, move the head, and then set
   1165 * the bit that points to the new head page.
   1166 *
   1167 * We also don't want an interrupt coming in and moving the head
   1168 * page on another writer. Thus we use the second LSB to catch
   1169 * that too. Thus:
   1170 *
   1171 * head->list->prev->next        bit 1          bit 0
   1172 *                              -------        -------
   1173 * Normal page                     0              0
   1174 * Points to head page             0              1
   1175 * New head page                   1              0
   1176 *
   1177 * Note we can not trust the prev pointer of the head page, because:
   1178 *
   1179 * +----+       +-----+        +-----+
   1180 * |    |------>|  T  |---X--->|  N  |
   1181 * |    |<------|     |        |     |
   1182 * +----+       +-----+        +-----+
   1183 *   ^                           ^ |
   1184 *   |          +-----+          | |
   1185 *   +----------|  R  |----------+ |
   1186 *              |     |<-----------+
   1187 *              +-----+
   1188 *
   1189 * Key:  ---X-->  HEAD flag set in pointer
   1190 *         T      Tail page
   1191 *         R      Reader page
   1192 *         N      Next page
   1193 *
   1194 * (see __rb_reserve_next() to see where this happens)
   1195 *
   1196 *  What the above shows is that the reader just swapped out
   1197 *  the reader page with a page in the buffer, but before it
   1198 *  could make the new header point back to the new page added
   1199 *  it was preempted by a writer. The writer moved forward onto
   1200 *  the new page added by the reader and is about to move forward
   1201 *  again.
   1202 *
   1203 *  You can see, it is legitimate for the previous pointer of
   1204 *  the head (or any page) not to point back to itself. But only
   1205 *  temporarily.
   1206 */
   1207
   1208#define RB_PAGE_NORMAL		0UL
   1209#define RB_PAGE_HEAD		1UL
   1210#define RB_PAGE_UPDATE		2UL
   1211
   1212
   1213#define RB_FLAG_MASK		3UL
   1214
   1215/* PAGE_MOVED is not part of the mask */
   1216#define RB_PAGE_MOVED		4UL
   1217
   1218/*
   1219 * rb_list_head - remove any bit
   1220 */
   1221static struct list_head *rb_list_head(struct list_head *list)
   1222{
   1223	unsigned long val = (unsigned long)list;
   1224
   1225	return (struct list_head *)(val & ~RB_FLAG_MASK);
   1226}
   1227
   1228/*
   1229 * rb_is_head_page - test if the given page is the head page
   1230 *
   1231 * Because the reader may move the head_page pointer, we can
   1232 * not trust what the head page is (it may be pointing to
   1233 * the reader page). But if the next page is a header page,
   1234 * its flags will be non zero.
   1235 */
   1236static inline int
   1237rb_is_head_page(struct buffer_page *page, struct list_head *list)
   1238{
   1239	unsigned long val;
   1240
   1241	val = (unsigned long)list->next;
   1242
   1243	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
   1244		return RB_PAGE_MOVED;
   1245
   1246	return val & RB_FLAG_MASK;
   1247}
   1248
   1249/*
   1250 * rb_is_reader_page
   1251 *
   1252 * The unique thing about the reader page, is that, if the
   1253 * writer is ever on it, the previous pointer never points
   1254 * back to the reader page.
   1255 */
   1256static bool rb_is_reader_page(struct buffer_page *page)
   1257{
   1258	struct list_head *list = page->list.prev;
   1259
   1260	return rb_list_head(list->next) != &page->list;
   1261}
   1262
   1263/*
   1264 * rb_set_list_to_head - set a list_head to be pointing to head.
   1265 */
   1266static void rb_set_list_to_head(struct list_head *list)
   1267{
   1268	unsigned long *ptr;
   1269
   1270	ptr = (unsigned long *)&list->next;
   1271	*ptr |= RB_PAGE_HEAD;
   1272	*ptr &= ~RB_PAGE_UPDATE;
   1273}
   1274
   1275/*
   1276 * rb_head_page_activate - sets up head page
   1277 */
   1278static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
   1279{
   1280	struct buffer_page *head;
   1281
   1282	head = cpu_buffer->head_page;
   1283	if (!head)
   1284		return;
   1285
   1286	/*
   1287	 * Set the previous list pointer to have the HEAD flag.
   1288	 */
   1289	rb_set_list_to_head(head->list.prev);
   1290}
   1291
   1292static void rb_list_head_clear(struct list_head *list)
   1293{
   1294	unsigned long *ptr = (unsigned long *)&list->next;
   1295
   1296	*ptr &= ~RB_FLAG_MASK;
   1297}
   1298
   1299/*
   1300 * rb_head_page_deactivate - clears head page ptr (for free list)
   1301 */
   1302static void
   1303rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
   1304{
   1305	struct list_head *hd;
   1306
   1307	/* Go through the whole list and clear any pointers found. */
   1308	rb_list_head_clear(cpu_buffer->pages);
   1309
   1310	list_for_each(hd, cpu_buffer->pages)
   1311		rb_list_head_clear(hd);
   1312}
   1313
   1314static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
   1315			    struct buffer_page *head,
   1316			    struct buffer_page *prev,
   1317			    int old_flag, int new_flag)
   1318{
   1319	struct list_head *list;
   1320	unsigned long val = (unsigned long)&head->list;
   1321	unsigned long ret;
   1322
   1323	list = &prev->list;
   1324
   1325	val &= ~RB_FLAG_MASK;
   1326
   1327	ret = cmpxchg((unsigned long *)&list->next,
   1328		      val | old_flag, val | new_flag);
   1329
   1330	/* check if the reader took the page */
   1331	if ((ret & ~RB_FLAG_MASK) != val)
   1332		return RB_PAGE_MOVED;
   1333
   1334	return ret & RB_FLAG_MASK;
   1335}
   1336
   1337static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
   1338				   struct buffer_page *head,
   1339				   struct buffer_page *prev,
   1340				   int old_flag)
   1341{
   1342	return rb_head_page_set(cpu_buffer, head, prev,
   1343				old_flag, RB_PAGE_UPDATE);
   1344}
   1345
   1346static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
   1347				 struct buffer_page *head,
   1348				 struct buffer_page *prev,
   1349				 int old_flag)
   1350{
   1351	return rb_head_page_set(cpu_buffer, head, prev,
   1352				old_flag, RB_PAGE_HEAD);
   1353}
   1354
   1355static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
   1356				   struct buffer_page *head,
   1357				   struct buffer_page *prev,
   1358				   int old_flag)
   1359{
   1360	return rb_head_page_set(cpu_buffer, head, prev,
   1361				old_flag, RB_PAGE_NORMAL);
   1362}
   1363
   1364static inline void rb_inc_page(struct buffer_page **bpage)
   1365{
   1366	struct list_head *p = rb_list_head((*bpage)->list.next);
   1367
   1368	*bpage = list_entry(p, struct buffer_page, list);
   1369}
   1370
   1371static struct buffer_page *
   1372rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
   1373{
   1374	struct buffer_page *head;
   1375	struct buffer_page *page;
   1376	struct list_head *list;
   1377	int i;
   1378
   1379	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
   1380		return NULL;
   1381
   1382	/* sanity check */
   1383	list = cpu_buffer->pages;
   1384	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
   1385		return NULL;
   1386
   1387	page = head = cpu_buffer->head_page;
   1388	/*
   1389	 * It is possible that the writer moves the header behind
   1390	 * where we started, and we miss in one loop.
   1391	 * A second loop should grab the header, but we'll do
   1392	 * three loops just because I'm paranoid.
   1393	 */
   1394	for (i = 0; i < 3; i++) {
   1395		do {
   1396			if (rb_is_head_page(page, page->list.prev)) {
   1397				cpu_buffer->head_page = page;
   1398				return page;
   1399			}
   1400			rb_inc_page(&page);
   1401		} while (page != head);
   1402	}
   1403
   1404	RB_WARN_ON(cpu_buffer, 1);
   1405
   1406	return NULL;
   1407}
   1408
   1409static int rb_head_page_replace(struct buffer_page *old,
   1410				struct buffer_page *new)
   1411{
   1412	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
   1413	unsigned long val;
   1414	unsigned long ret;
   1415
   1416	val = *ptr & ~RB_FLAG_MASK;
   1417	val |= RB_PAGE_HEAD;
   1418
   1419	ret = cmpxchg(ptr, val, (unsigned long)&new->list);
   1420
   1421	return ret == val;
   1422}
   1423
   1424/*
   1425 * rb_tail_page_update - move the tail page forward
   1426 */
   1427static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
   1428			       struct buffer_page *tail_page,
   1429			       struct buffer_page *next_page)
   1430{
   1431	unsigned long old_entries;
   1432	unsigned long old_write;
   1433
   1434	/*
   1435	 * The tail page now needs to be moved forward.
   1436	 *
   1437	 * We need to reset the tail page, but without messing
   1438	 * with possible erasing of data brought in by interrupts
   1439	 * that have moved the tail page and are currently on it.
   1440	 *
   1441	 * We add a counter to the write field to denote this.
   1442	 */
   1443	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
   1444	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
   1445
   1446	local_inc(&cpu_buffer->pages_touched);
   1447	/*
   1448	 * Just make sure we have seen our old_write and synchronize
   1449	 * with any interrupts that come in.
   1450	 */
   1451	barrier();
   1452
   1453	/*
   1454	 * If the tail page is still the same as what we think
   1455	 * it is, then it is up to us to update the tail
   1456	 * pointer.
   1457	 */
   1458	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
   1459		/* Zero the write counter */
   1460		unsigned long val = old_write & ~RB_WRITE_MASK;
   1461		unsigned long eval = old_entries & ~RB_WRITE_MASK;
   1462
   1463		/*
   1464		 * This will only succeed if an interrupt did
   1465		 * not come in and change it. In which case, we
   1466		 * do not want to modify it.
   1467		 *
   1468		 * We add (void) to let the compiler know that we do not care
   1469		 * about the return value of these functions. We use the
   1470		 * cmpxchg to only update if an interrupt did not already
   1471		 * do it for us. If the cmpxchg fails, we don't care.
   1472		 */
   1473		(void)local_cmpxchg(&next_page->write, old_write, val);
   1474		(void)local_cmpxchg(&next_page->entries, old_entries, eval);
   1475
   1476		/*
   1477		 * No need to worry about races with clearing out the commit.
   1478		 * it only can increment when a commit takes place. But that
   1479		 * only happens in the outer most nested commit.
   1480		 */
   1481		local_set(&next_page->page->commit, 0);
   1482
   1483		/* Again, either we update tail_page or an interrupt does */
   1484		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
   1485	}
   1486}
   1487
   1488static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
   1489			  struct buffer_page *bpage)
   1490{
   1491	unsigned long val = (unsigned long)bpage;
   1492
   1493	if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
   1494		return 1;
   1495
   1496	return 0;
   1497}
   1498
   1499/**
   1500 * rb_check_list - make sure a pointer to a list has the last bits zero
   1501 */
   1502static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
   1503			 struct list_head *list)
   1504{
   1505	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
   1506		return 1;
   1507	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
   1508		return 1;
   1509	return 0;
   1510}
   1511
   1512/**
   1513 * rb_check_pages - integrity check of buffer pages
   1514 * @cpu_buffer: CPU buffer with pages to test
   1515 *
   1516 * As a safety measure we check to make sure the data pages have not
   1517 * been corrupted.
   1518 */
   1519static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
   1520{
   1521	struct list_head *head = cpu_buffer->pages;
   1522	struct buffer_page *bpage, *tmp;
   1523
   1524	/* Reset the head page if it exists */
   1525	if (cpu_buffer->head_page)
   1526		rb_set_head_page(cpu_buffer);
   1527
   1528	rb_head_page_deactivate(cpu_buffer);
   1529
   1530	if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
   1531		return -1;
   1532	if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
   1533		return -1;
   1534
   1535	if (rb_check_list(cpu_buffer, head))
   1536		return -1;
   1537
   1538	list_for_each_entry_safe(bpage, tmp, head, list) {
   1539		if (RB_WARN_ON(cpu_buffer,
   1540			       bpage->list.next->prev != &bpage->list))
   1541			return -1;
   1542		if (RB_WARN_ON(cpu_buffer,
   1543			       bpage->list.prev->next != &bpage->list))
   1544			return -1;
   1545		if (rb_check_list(cpu_buffer, &bpage->list))
   1546			return -1;
   1547	}
   1548
   1549	rb_head_page_activate(cpu_buffer);
   1550
   1551	return 0;
   1552}
   1553
   1554static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
   1555		long nr_pages, struct list_head *pages)
   1556{
   1557	struct buffer_page *bpage, *tmp;
   1558	bool user_thread = current->mm != NULL;
   1559	gfp_t mflags;
   1560	long i;
   1561
   1562	/*
   1563	 * Check if the available memory is there first.
   1564	 * Note, si_mem_available() only gives us a rough estimate of available
   1565	 * memory. It may not be accurate. But we don't care, we just want
   1566	 * to prevent doing any allocation when it is obvious that it is
   1567	 * not going to succeed.
   1568	 */
   1569	i = si_mem_available();
   1570	if (i < nr_pages)
   1571		return -ENOMEM;
   1572
   1573	/*
   1574	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
   1575	 * gracefully without invoking oom-killer and the system is not
   1576	 * destabilized.
   1577	 */
   1578	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;
   1579
   1580	/*
   1581	 * If a user thread allocates too much, and si_mem_available()
   1582	 * reports there's enough memory, even though there is not.
   1583	 * Make sure the OOM killer kills this thread. This can happen
   1584	 * even with RETRY_MAYFAIL because another task may be doing
   1585	 * an allocation after this task has taken all memory.
   1586	 * This is the task the OOM killer needs to take out during this
   1587	 * loop, even if it was triggered by an allocation somewhere else.
   1588	 */
   1589	if (user_thread)
   1590		set_current_oom_origin();
   1591	for (i = 0; i < nr_pages; i++) {
   1592		struct page *page;
   1593
   1594		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
   1595				    mflags, cpu_to_node(cpu_buffer->cpu));
   1596		if (!bpage)
   1597			goto free_pages;
   1598
   1599		rb_check_bpage(cpu_buffer, bpage);
   1600
   1601		list_add(&bpage->list, pages);
   1602
   1603		page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), mflags, 0);
   1604		if (!page)
   1605			goto free_pages;
   1606		bpage->page = page_address(page);
   1607		rb_init_page(bpage->page);
   1608
   1609		if (user_thread && fatal_signal_pending(current))
   1610			goto free_pages;
   1611	}
   1612	if (user_thread)
   1613		clear_current_oom_origin();
   1614
   1615	return 0;
   1616
   1617free_pages:
   1618	list_for_each_entry_safe(bpage, tmp, pages, list) {
   1619		list_del_init(&bpage->list);
   1620		free_buffer_page(bpage);
   1621	}
   1622	if (user_thread)
   1623		clear_current_oom_origin();
   1624
   1625	return -ENOMEM;
   1626}
   1627
   1628static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
   1629			     unsigned long nr_pages)
   1630{
   1631	LIST_HEAD(pages);
   1632
   1633	WARN_ON(!nr_pages);
   1634
   1635	if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages))
   1636		return -ENOMEM;
   1637
   1638	/*
   1639	 * The ring buffer page list is a circular list that does not
   1640	 * start and end with a list head. All page list items point to
   1641	 * other pages.
   1642	 */
   1643	cpu_buffer->pages = pages.next;
   1644	list_del(&pages);
   1645
   1646	cpu_buffer->nr_pages = nr_pages;
   1647
   1648	rb_check_pages(cpu_buffer);
   1649
   1650	return 0;
   1651}
   1652
   1653static struct ring_buffer_per_cpu *
   1654rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
   1655{
   1656	struct ring_buffer_per_cpu *cpu_buffer;
   1657	struct buffer_page *bpage;
   1658	struct page *page;
   1659	int ret;
   1660
   1661	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
   1662				  GFP_KERNEL, cpu_to_node(cpu));
   1663	if (!cpu_buffer)
   1664		return NULL;
   1665
   1666	cpu_buffer->cpu = cpu;
   1667	cpu_buffer->buffer = buffer;
   1668	raw_spin_lock_init(&cpu_buffer->reader_lock);
   1669	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
   1670	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
   1671	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
   1672	init_completion(&cpu_buffer->update_done);
   1673	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
   1674	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
   1675	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
   1676
   1677	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
   1678			    GFP_KERNEL, cpu_to_node(cpu));
   1679	if (!bpage)
   1680		goto fail_free_buffer;
   1681
   1682	rb_check_bpage(cpu_buffer, bpage);
   1683
   1684	cpu_buffer->reader_page = bpage;
   1685	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
   1686	if (!page)
   1687		goto fail_free_reader;
   1688	bpage->page = page_address(page);
   1689	rb_init_page(bpage->page);
   1690
   1691	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
   1692	INIT_LIST_HEAD(&cpu_buffer->new_pages);
   1693
   1694	ret = rb_allocate_pages(cpu_buffer, nr_pages);
   1695	if (ret < 0)
   1696		goto fail_free_reader;
   1697
   1698	cpu_buffer->head_page
   1699		= list_entry(cpu_buffer->pages, struct buffer_page, list);
   1700	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
   1701
   1702	rb_head_page_activate(cpu_buffer);
   1703
   1704	return cpu_buffer;
   1705
   1706 fail_free_reader:
   1707	free_buffer_page(cpu_buffer->reader_page);
   1708
   1709 fail_free_buffer:
   1710	kfree(cpu_buffer);
   1711	return NULL;
   1712}
   1713
   1714static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
   1715{
   1716	struct list_head *head = cpu_buffer->pages;
   1717	struct buffer_page *bpage, *tmp;
   1718
   1719	free_buffer_page(cpu_buffer->reader_page);
   1720
   1721	rb_head_page_deactivate(cpu_buffer);
   1722
   1723	if (head) {
   1724		list_for_each_entry_safe(bpage, tmp, head, list) {
   1725			list_del_init(&bpage->list);
   1726			free_buffer_page(bpage);
   1727		}
   1728		bpage = list_entry(head, struct buffer_page, list);
   1729		free_buffer_page(bpage);
   1730	}
   1731
   1732	kfree(cpu_buffer);
   1733}
   1734
   1735/**
   1736 * __ring_buffer_alloc - allocate a new ring_buffer
   1737 * @size: the size in bytes per cpu that is needed.
   1738 * @flags: attributes to set for the ring buffer.
   1739 * @key: ring buffer reader_lock_key.
   1740 *
   1741 * Currently the only flag that is available is the RB_FL_OVERWRITE
   1742 * flag. This flag means that the buffer will overwrite old data
   1743 * when the buffer wraps. If this flag is not set, the buffer will
   1744 * drop data when the tail hits the head.
   1745 */
   1746struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
   1747					struct lock_class_key *key)
   1748{
   1749	struct trace_buffer *buffer;
   1750	long nr_pages;
   1751	int bsize;
   1752	int cpu;
   1753	int ret;
   1754
   1755	/* keep it in its own cache line */
   1756	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
   1757			 GFP_KERNEL);
   1758	if (!buffer)
   1759		return NULL;
   1760
   1761	if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
   1762		goto fail_free_buffer;
   1763
   1764	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
   1765	buffer->flags = flags;
   1766	buffer->clock = trace_clock_local;
   1767	buffer->reader_lock_key = key;
   1768
   1769	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
   1770	init_waitqueue_head(&buffer->irq_work.waiters);
   1771
   1772	/* need at least two pages */
   1773	if (nr_pages < 2)
   1774		nr_pages = 2;
   1775
   1776	buffer->cpus = nr_cpu_ids;
   1777
   1778	bsize = sizeof(void *) * nr_cpu_ids;
   1779	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
   1780				  GFP_KERNEL);
   1781	if (!buffer->buffers)
   1782		goto fail_free_cpumask;
   1783
   1784	cpu = raw_smp_processor_id();
   1785	cpumask_set_cpu(cpu, buffer->cpumask);
   1786	buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
   1787	if (!buffer->buffers[cpu])
   1788		goto fail_free_buffers;
   1789
   1790	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
   1791	if (ret < 0)
   1792		goto fail_free_buffers;
   1793
   1794	mutex_init(&buffer->mutex);
   1795
   1796	return buffer;
   1797
   1798 fail_free_buffers:
   1799	for_each_buffer_cpu(buffer, cpu) {
   1800		if (buffer->buffers[cpu])
   1801			rb_free_cpu_buffer(buffer->buffers[cpu]);
   1802	}
   1803	kfree(buffer->buffers);
   1804
   1805 fail_free_cpumask:
   1806	free_cpumask_var(buffer->cpumask);
   1807
   1808 fail_free_buffer:
   1809	kfree(buffer);
   1810	return NULL;
   1811}
   1812EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
   1813
   1814/**
   1815 * ring_buffer_free - free a ring buffer.
   1816 * @buffer: the buffer to free.
   1817 */
   1818void
   1819ring_buffer_free(struct trace_buffer *buffer)
   1820{
   1821	int cpu;
   1822
   1823	cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
   1824
   1825	for_each_buffer_cpu(buffer, cpu)
   1826		rb_free_cpu_buffer(buffer->buffers[cpu]);
   1827
   1828	kfree(buffer->buffers);
   1829	free_cpumask_var(buffer->cpumask);
   1830
   1831	kfree(buffer);
   1832}
   1833EXPORT_SYMBOL_GPL(ring_buffer_free);
   1834
   1835void ring_buffer_set_clock(struct trace_buffer *buffer,
   1836			   u64 (*clock)(void))
   1837{
   1838	buffer->clock = clock;
   1839}
   1840
   1841void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
   1842{
   1843	buffer->time_stamp_abs = abs;
   1844}
   1845
   1846bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
   1847{
   1848	return buffer->time_stamp_abs;
   1849}
   1850
   1851static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
   1852
   1853static inline unsigned long rb_page_entries(struct buffer_page *bpage)
   1854{
   1855	return local_read(&bpage->entries) & RB_WRITE_MASK;
   1856}
   1857
   1858static inline unsigned long rb_page_write(struct buffer_page *bpage)
   1859{
   1860	return local_read(&bpage->write) & RB_WRITE_MASK;
   1861}
   1862
   1863static int
   1864rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
   1865{
   1866	struct list_head *tail_page, *to_remove, *next_page;
   1867	struct buffer_page *to_remove_page, *tmp_iter_page;
   1868	struct buffer_page *last_page, *first_page;
   1869	unsigned long nr_removed;
   1870	unsigned long head_bit;
   1871	int page_entries;
   1872
   1873	head_bit = 0;
   1874
   1875	raw_spin_lock_irq(&cpu_buffer->reader_lock);
   1876	atomic_inc(&cpu_buffer->record_disabled);
   1877	/*
   1878	 * We don't race with the readers since we have acquired the reader
   1879	 * lock. We also don't race with writers after disabling recording.
   1880	 * This makes it easy to figure out the first and the last page to be
   1881	 * removed from the list. We unlink all the pages in between including
   1882	 * the first and last pages. This is done in a busy loop so that we
   1883	 * lose the least number of traces.
   1884	 * The pages are freed after we restart recording and unlock readers.
   1885	 */
   1886	tail_page = &cpu_buffer->tail_page->list;
   1887
   1888	/*
   1889	 * tail page might be on reader page, we remove the next page
   1890	 * from the ring buffer
   1891	 */
   1892	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
   1893		tail_page = rb_list_head(tail_page->next);
   1894	to_remove = tail_page;
   1895
   1896	/* start of pages to remove */
   1897	first_page = list_entry(rb_list_head(to_remove->next),
   1898				struct buffer_page, list);
   1899
   1900	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
   1901		to_remove = rb_list_head(to_remove)->next;
   1902		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
   1903	}
   1904
   1905	next_page = rb_list_head(to_remove)->next;
   1906
   1907	/*
   1908	 * Now we remove all pages between tail_page and next_page.
   1909	 * Make sure that we have head_bit value preserved for the
   1910	 * next page
   1911	 */
   1912	tail_page->next = (struct list_head *)((unsigned long)next_page |
   1913						head_bit);
   1914	next_page = rb_list_head(next_page);
   1915	next_page->prev = tail_page;
   1916
   1917	/* make sure pages points to a valid page in the ring buffer */
   1918	cpu_buffer->pages = next_page;
   1919
   1920	/* update head page */
   1921	if (head_bit)
   1922		cpu_buffer->head_page = list_entry(next_page,
   1923						struct buffer_page, list);
   1924
   1925	/*
   1926	 * change read pointer to make sure any read iterators reset
   1927	 * themselves
   1928	 */
   1929	cpu_buffer->read = 0;
   1930
   1931	/* pages are removed, resume tracing and then free the pages */
   1932	atomic_dec(&cpu_buffer->record_disabled);
   1933	raw_spin_unlock_irq(&cpu_buffer->reader_lock);
   1934
   1935	RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
   1936
   1937	/* last buffer page to remove */
   1938	last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
   1939				list);
   1940	tmp_iter_page = first_page;
   1941
   1942	do {
   1943		cond_resched();
   1944
   1945		to_remove_page = tmp_iter_page;
   1946		rb_inc_page(&tmp_iter_page);
   1947
   1948		/* update the counters */
   1949		page_entries = rb_page_entries(to_remove_page);
   1950		if (page_entries) {
   1951			/*
   1952			 * If something was added to this page, it was full
   1953			 * since it is not the tail page. So we deduct the
   1954			 * bytes consumed in ring buffer from here.
   1955			 * Increment overrun to account for the lost events.
   1956			 */
   1957			local_add(page_entries, &cpu_buffer->overrun);
   1958			local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
   1959		}
   1960
   1961		/*
   1962		 * We have already removed references to this list item, just
   1963		 * free up the buffer_page and its page
   1964		 */
   1965		free_buffer_page(to_remove_page);
   1966		nr_removed--;
   1967
   1968	} while (to_remove_page != last_page);
   1969
   1970	RB_WARN_ON(cpu_buffer, nr_removed);
   1971
   1972	return nr_removed == 0;
   1973}
   1974
   1975static int
   1976rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
   1977{
   1978	struct list_head *pages = &cpu_buffer->new_pages;
   1979	int retries, success;
   1980
   1981	raw_spin_lock_irq(&cpu_buffer->reader_lock);
   1982	/*
   1983	 * We are holding the reader lock, so the reader page won't be swapped
   1984	 * in the ring buffer. Now we are racing with the writer trying to
   1985	 * move head page and the tail page.
   1986	 * We are going to adapt the reader page update process where:
   1987	 * 1. We first splice the start and end of list of new pages between
   1988	 *    the head page and its previous page.
   1989	 * 2. We cmpxchg the prev_page->next to point from head page to the
   1990	 *    start of new pages list.
   1991	 * 3. Finally, we update the head->prev to the end of new list.
   1992	 *
   1993	 * We will try this process 10 times, to make sure that we don't keep
   1994	 * spinning.
   1995	 */
   1996	retries = 10;
   1997	success = 0;
   1998	while (retries--) {
   1999		struct list_head *head_page, *prev_page, *r;
   2000		struct list_head *last_page, *first_page;
   2001		struct list_head *head_page_with_bit;
   2002
   2003		head_page = &rb_set_head_page(cpu_buffer)->list;
   2004		if (!head_page)
   2005			break;
   2006		prev_page = head_page->prev;
   2007
   2008		first_page = pages->next;
   2009		last_page  = pages->prev;
   2010
   2011		head_page_with_bit = (struct list_head *)
   2012				     ((unsigned long)head_page | RB_PAGE_HEAD);
   2013
   2014		last_page->next = head_page_with_bit;
   2015		first_page->prev = prev_page;
   2016
   2017		r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
   2018
   2019		if (r == head_page_with_bit) {
   2020			/*
   2021			 * yay, we replaced the page pointer to our new list,
   2022			 * now, we just have to update to head page's prev
   2023			 * pointer to point to end of list
   2024			 */
   2025			head_page->prev = last_page;
   2026			success = 1;
   2027			break;
   2028		}
   2029	}
   2030
   2031	if (success)
   2032		INIT_LIST_HEAD(pages);
   2033	/*
   2034	 * If we weren't successful in adding in new pages, warn and stop
   2035	 * tracing
   2036	 */
   2037	RB_WARN_ON(cpu_buffer, !success);
   2038	raw_spin_unlock_irq(&cpu_buffer->reader_lock);
   2039
   2040	/* free pages if they weren't inserted */
   2041	if (!success) {
   2042		struct buffer_page *bpage, *tmp;
   2043		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
   2044					 list) {
   2045			list_del_init(&bpage->list);
   2046			free_buffer_page(bpage);
   2047		}
   2048	}
   2049	return success;
   2050}
   2051
   2052static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
   2053{
   2054	int success;
   2055
   2056	if (cpu_buffer->nr_pages_to_update > 0)
   2057		success = rb_insert_pages(cpu_buffer);
   2058	else
   2059		success = rb_remove_pages(cpu_buffer,
   2060					-cpu_buffer->nr_pages_to_update);
   2061
   2062	if (success)
   2063		cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
   2064}
   2065
   2066static void update_pages_handler(struct work_struct *work)
   2067{
   2068	struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
   2069			struct ring_buffer_per_cpu, update_pages_work);
   2070	rb_update_pages(cpu_buffer);
   2071	complete(&cpu_buffer->update_done);
   2072}
   2073
   2074/**
   2075 * ring_buffer_resize - resize the ring buffer
   2076 * @buffer: the buffer to resize.
   2077 * @size: the new size.
   2078 * @cpu_id: the cpu buffer to resize
   2079 *
   2080 * Minimum size is 2 * BUF_PAGE_SIZE.
   2081 *
   2082 * Returns 0 on success and < 0 on failure.
   2083 */
   2084int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
   2085			int cpu_id)
   2086{
   2087	struct ring_buffer_per_cpu *cpu_buffer;
   2088	unsigned long nr_pages;
   2089	int cpu, err;
   2090
   2091	/*
   2092	 * Always succeed at resizing a non-existent buffer:
   2093	 */
   2094	if (!buffer)
   2095		return 0;
   2096
   2097	/* Make sure the requested buffer exists */
   2098	if (cpu_id != RING_BUFFER_ALL_CPUS &&
   2099	    !cpumask_test_cpu(cpu_id, buffer->cpumask))
   2100		return 0;
   2101
   2102	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
   2103
   2104	/* we need a minimum of two pages */
   2105	if (nr_pages < 2)
   2106		nr_pages = 2;
   2107
   2108	/* prevent another thread from changing buffer sizes */
   2109	mutex_lock(&buffer->mutex);
   2110
   2111
   2112	if (cpu_id == RING_BUFFER_ALL_CPUS) {
   2113		/*
   2114		 * Don't succeed if resizing is disabled, as a reader might be
   2115		 * manipulating the ring buffer and is expecting a sane state while
   2116		 * this is true.
   2117		 */
   2118		for_each_buffer_cpu(buffer, cpu) {
   2119			cpu_buffer = buffer->buffers[cpu];
   2120			if (atomic_read(&cpu_buffer->resize_disabled)) {
   2121				err = -EBUSY;
   2122				goto out_err_unlock;
   2123			}
   2124		}
   2125
   2126		/* calculate the pages to update */
   2127		for_each_buffer_cpu(buffer, cpu) {
   2128			cpu_buffer = buffer->buffers[cpu];
   2129
   2130			cpu_buffer->nr_pages_to_update = nr_pages -
   2131							cpu_buffer->nr_pages;
   2132			/*
   2133			 * nothing more to do for removing pages or no update
   2134			 */
   2135			if (cpu_buffer->nr_pages_to_update <= 0)
   2136				continue;
   2137			/*
   2138			 * to add pages, make sure all new pages can be
   2139			 * allocated without receiving ENOMEM
   2140			 */
   2141			INIT_LIST_HEAD(&cpu_buffer->new_pages);
   2142			if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
   2143						&cpu_buffer->new_pages)) {
   2144				/* not enough memory for new pages */
   2145				err = -ENOMEM;
   2146				goto out_err;
   2147			}
   2148		}
   2149
   2150		cpus_read_lock();
   2151		/*
   2152		 * Fire off all the required work handlers
   2153		 * We can't schedule on offline CPUs, but it's not necessary
   2154		 * since we can change their buffer sizes without any race.
   2155		 */
   2156		for_each_buffer_cpu(buffer, cpu) {
   2157			cpu_buffer = buffer->buffers[cpu];
   2158			if (!cpu_buffer->nr_pages_to_update)
   2159				continue;
   2160
   2161			/* Can't run something on an offline CPU. */
   2162			if (!cpu_online(cpu)) {
   2163				rb_update_pages(cpu_buffer);
   2164				cpu_buffer->nr_pages_to_update = 0;
   2165			} else {
   2166				schedule_work_on(cpu,
   2167						&cpu_buffer->update_pages_work);
   2168			}
   2169		}
   2170
   2171		/* wait for all the updates to complete */
   2172		for_each_buffer_cpu(buffer, cpu) {
   2173			cpu_buffer = buffer->buffers[cpu];
   2174			if (!cpu_buffer->nr_pages_to_update)
   2175				continue;
   2176
   2177			if (cpu_online(cpu))
   2178				wait_for_completion(&cpu_buffer->update_done);
   2179			cpu_buffer->nr_pages_to_update = 0;
   2180		}
   2181
   2182		cpus_read_unlock();
   2183	} else {
   2184		cpu_buffer = buffer->buffers[cpu_id];
   2185
   2186		if (nr_pages == cpu_buffer->nr_pages)
   2187			goto out;
   2188
   2189		/*
   2190		 * Don't succeed if resizing is disabled, as a reader might be
   2191		 * manipulating the ring buffer and is expecting a sane state while
   2192		 * this is true.
   2193		 */
   2194		if (atomic_read(&cpu_buffer->resize_disabled)) {
   2195			err = -EBUSY;
   2196			goto out_err_unlock;
   2197		}
   2198
   2199		cpu_buffer->nr_pages_to_update = nr_pages -
   2200						cpu_buffer->nr_pages;
   2201
   2202		INIT_LIST_HEAD(&cpu_buffer->new_pages);
   2203		if (cpu_buffer->nr_pages_to_update > 0 &&
   2204			__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
   2205					    &cpu_buffer->new_pages)) {
   2206			err = -ENOMEM;
   2207			goto out_err;
   2208		}
   2209
   2210		cpus_read_lock();
   2211
   2212		/* Can't run something on an offline CPU. */
   2213		if (!cpu_online(cpu_id))
   2214			rb_update_pages(cpu_buffer);
   2215		else {
   2216			schedule_work_on(cpu_id,
   2217					 &cpu_buffer->update_pages_work);
   2218			wait_for_completion(&cpu_buffer->update_done);
   2219		}
   2220
   2221		cpu_buffer->nr_pages_to_update = 0;
   2222		cpus_read_unlock();
   2223	}
   2224
   2225 out:
   2226	/*
   2227	 * The ring buffer resize can happen with the ring buffer
   2228	 * enabled, so that the update disturbs the tracing as little
   2229	 * as possible. But if the buffer is disabled, we do not need
   2230	 * to worry about that, and we can take the time to verify
   2231	 * that the buffer is not corrupt.
   2232	 */
   2233	if (atomic_read(&buffer->record_disabled)) {
   2234		atomic_inc(&buffer->record_disabled);
   2235		/*
   2236		 * Even though the buffer was disabled, we must make sure
   2237		 * that it is truly disabled before calling rb_check_pages.
   2238		 * There could have been a race between checking
   2239		 * record_disable and incrementing it.
   2240		 */
   2241		synchronize_rcu();
   2242		for_each_buffer_cpu(buffer, cpu) {
   2243			cpu_buffer = buffer->buffers[cpu];
   2244			rb_check_pages(cpu_buffer);
   2245		}
   2246		atomic_dec(&buffer->record_disabled);
   2247	}
   2248
   2249	mutex_unlock(&buffer->mutex);
   2250	return 0;
   2251
   2252 out_err:
   2253	for_each_buffer_cpu(buffer, cpu) {
   2254		struct buffer_page *bpage, *tmp;
   2255
   2256		cpu_buffer = buffer->buffers[cpu];
   2257		cpu_buffer->nr_pages_to_update = 0;
   2258
   2259		if (list_empty(&cpu_buffer->new_pages))
   2260			continue;
   2261
   2262		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
   2263					list) {
   2264			list_del_init(&bpage->list);
   2265			free_buffer_page(bpage);
   2266		}
   2267	}
   2268 out_err_unlock:
   2269	mutex_unlock(&buffer->mutex);
   2270	return err;
   2271}
   2272EXPORT_SYMBOL_GPL(ring_buffer_resize);
   2273
   2274void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val)
   2275{
   2276	mutex_lock(&buffer->mutex);
   2277	if (val)
   2278		buffer->flags |= RB_FL_OVERWRITE;
   2279	else
   2280		buffer->flags &= ~RB_FL_OVERWRITE;
   2281	mutex_unlock(&buffer->mutex);
   2282}
   2283EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
   2284
   2285static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
   2286{
   2287	return bpage->page->data + index;
   2288}
   2289
   2290static __always_inline struct ring_buffer_event *
   2291rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
   2292{
   2293	return __rb_page_index(cpu_buffer->reader_page,
   2294			       cpu_buffer->reader_page->read);
   2295}
   2296
   2297static __always_inline unsigned rb_page_commit(struct buffer_page *bpage)
   2298{
   2299	return local_read(&bpage->page->commit);
   2300}
   2301
   2302static struct ring_buffer_event *
   2303rb_iter_head_event(struct ring_buffer_iter *iter)
   2304{
   2305	struct ring_buffer_event *event;
   2306	struct buffer_page *iter_head_page = iter->head_page;
   2307	unsigned long commit;
   2308	unsigned length;
   2309
   2310	if (iter->head != iter->next_event)
   2311		return iter->event;
   2312
   2313	/*
   2314	 * When the writer goes across pages, it issues a cmpxchg which
   2315	 * is a mb(), which will synchronize with the rmb here.
   2316	 * (see rb_tail_page_update() and __rb_reserve_next())
   2317	 */
   2318	commit = rb_page_commit(iter_head_page);
   2319	smp_rmb();
   2320	event = __rb_page_index(iter_head_page, iter->head);
   2321	length = rb_event_length(event);
   2322
   2323	/*
   2324	 * READ_ONCE() doesn't work on functions and we don't want the
   2325	 * compiler doing any crazy optimizations with length.
   2326	 */
   2327	barrier();
   2328
   2329	if ((iter->head + length) > commit || length > BUF_MAX_DATA_SIZE)
   2330		/* Writer corrupted the read? */
   2331		goto reset;
   2332
   2333	memcpy(iter->event, event, length);
   2334	/*
   2335	 * If the page stamp is still the same after this rmb() then the
   2336	 * event was safely copied without the writer entering the page.
   2337	 */
   2338	smp_rmb();
   2339
   2340	/* Make sure the page didn't change since we read this */
   2341	if (iter->page_stamp != iter_head_page->page->time_stamp ||
   2342	    commit > rb_page_commit(iter_head_page))
   2343		goto reset;
   2344
   2345	iter->next_event = iter->head + length;
   2346	return iter->event;
   2347 reset:
   2348	/* Reset to the beginning */
   2349	iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
   2350	iter->head = 0;
   2351	iter->next_event = 0;
   2352	iter->missed_events = 1;
   2353	return NULL;
   2354}
   2355
   2356/* Size is determined by what has been committed */
   2357static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
   2358{
   2359	return rb_page_commit(bpage);
   2360}
   2361
   2362static __always_inline unsigned
   2363rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
   2364{
   2365	return rb_page_commit(cpu_buffer->commit_page);
   2366}
   2367
   2368static __always_inline unsigned
   2369rb_event_index(struct ring_buffer_event *event)
   2370{
   2371	unsigned long addr = (unsigned long)event;
   2372
   2373	return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
   2374}
   2375
   2376static void rb_inc_iter(struct ring_buffer_iter *iter)
   2377{
   2378	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
   2379
   2380	/*
   2381	 * The iterator could be on the reader page (it starts there).
   2382	 * But the head could have moved, since the reader was
   2383	 * found. Check for this case and assign the iterator
   2384	 * to the head page instead of next.
   2385	 */
   2386	if (iter->head_page == cpu_buffer->reader_page)
   2387		iter->head_page = rb_set_head_page(cpu_buffer);
   2388	else
   2389		rb_inc_page(&iter->head_page);
   2390
   2391	iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
   2392	iter->head = 0;
   2393	iter->next_event = 0;
   2394}
   2395
   2396/*
   2397 * rb_handle_head_page - writer hit the head page
   2398 *
   2399 * Returns: +1 to retry page
   2400 *           0 to continue
   2401 *          -1 on error
   2402 */
   2403static int
   2404rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
   2405		    struct buffer_page *tail_page,
   2406		    struct buffer_page *next_page)
   2407{
   2408	struct buffer_page *new_head;
   2409	int entries;
   2410	int type;
   2411	int ret;
   2412
   2413	entries = rb_page_entries(next_page);
   2414
   2415	/*
   2416	 * The hard part is here. We need to move the head
   2417	 * forward, and protect against both readers on
   2418	 * other CPUs and writers coming in via interrupts.
   2419	 */
   2420	type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
   2421				       RB_PAGE_HEAD);
   2422
   2423	/*
   2424	 * type can be one of four:
   2425	 *  NORMAL - an interrupt already moved it for us
   2426	 *  HEAD   - we are the first to get here.
   2427	 *  UPDATE - we are the interrupt interrupting
   2428	 *           a current move.
   2429	 *  MOVED  - a reader on another CPU moved the next
   2430	 *           pointer to its reader page. Give up
   2431	 *           and try again.
   2432	 */
   2433
   2434	switch (type) {
   2435	case RB_PAGE_HEAD:
   2436		/*
   2437		 * We changed the head to UPDATE, thus
   2438		 * it is our responsibility to update
   2439		 * the counters.
   2440		 */
   2441		local_add(entries, &cpu_buffer->overrun);
   2442		local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
   2443
   2444		/*
   2445		 * The entries will be zeroed out when we move the
   2446		 * tail page.
   2447		 */
   2448
   2449		/* still more to do */
   2450		break;
   2451
   2452	case RB_PAGE_UPDATE:
   2453		/*
   2454		 * This is an interrupt that interrupt the
   2455		 * previous update. Still more to do.
   2456		 */
   2457		break;
   2458	case RB_PAGE_NORMAL:
   2459		/*
   2460		 * An interrupt came in before the update
   2461		 * and processed this for us.
   2462		 * Nothing left to do.
   2463		 */
   2464		return 1;
   2465	case RB_PAGE_MOVED:
   2466		/*
   2467		 * The reader is on another CPU and just did
   2468		 * a swap with our next_page.
   2469		 * Try again.
   2470		 */
   2471		return 1;
   2472	default:
   2473		RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
   2474		return -1;
   2475	}
   2476
   2477	/*
   2478	 * Now that we are here, the old head pointer is
   2479	 * set to UPDATE. This will keep the reader from
   2480	 * swapping the head page with the reader page.
   2481	 * The reader (on another CPU) will spin till
   2482	 * we are finished.
   2483	 *
   2484	 * We just need to protect against interrupts
   2485	 * doing the job. We will set the next pointer
   2486	 * to HEAD. After that, we set the old pointer
   2487	 * to NORMAL, but only if it was HEAD before.
   2488	 * otherwise we are an interrupt, and only
   2489	 * want the outer most commit to reset it.
   2490	 */
   2491	new_head = next_page;
   2492	rb_inc_page(&new_head);
   2493
   2494	ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
   2495				    RB_PAGE_NORMAL);
   2496
   2497	/*
   2498	 * Valid returns are:
   2499	 *  HEAD   - an interrupt came in and already set it.
   2500	 *  NORMAL - One of two things:
   2501	 *            1) We really set it.
   2502	 *            2) A bunch of interrupts came in and moved
   2503	 *               the page forward again.
   2504	 */
   2505	switch (ret) {
   2506	case RB_PAGE_HEAD:
   2507	case RB_PAGE_NORMAL:
   2508		/* OK */
   2509		break;
   2510	default:
   2511		RB_WARN_ON(cpu_buffer, 1);
   2512		return -1;
   2513	}
   2514
   2515	/*
   2516	 * It is possible that an interrupt came in,
   2517	 * set the head up, then more interrupts came in
   2518	 * and moved it again. When we get back here,
   2519	 * the page would have been set to NORMAL but we
   2520	 * just set it back to HEAD.
   2521	 *
   2522	 * How do you detect this? Well, if that happened
   2523	 * the tail page would have moved.
   2524	 */
   2525	if (ret == RB_PAGE_NORMAL) {
   2526		struct buffer_page *buffer_tail_page;
   2527
   2528		buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
   2529		/*
   2530		 * If the tail had moved passed next, then we need
   2531		 * to reset the pointer.
   2532		 */
   2533		if (buffer_tail_page != tail_page &&
   2534		    buffer_tail_page != next_page)
   2535			rb_head_page_set_normal(cpu_buffer, new_head,
   2536						next_page,
   2537						RB_PAGE_HEAD);
   2538	}
   2539
   2540	/*
   2541	 * If this was the outer most commit (the one that
   2542	 * changed the original pointer from HEAD to UPDATE),
   2543	 * then it is up to us to reset it to NORMAL.
   2544	 */
   2545	if (type == RB_PAGE_HEAD) {
   2546		ret = rb_head_page_set_normal(cpu_buffer, next_page,
   2547					      tail_page,
   2548					      RB_PAGE_UPDATE);
   2549		if (RB_WARN_ON(cpu_buffer,
   2550			       ret != RB_PAGE_UPDATE))
   2551			return -1;
   2552	}
   2553
   2554	return 0;
   2555}
   2556
   2557static inline void
   2558rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
   2559	      unsigned long tail, struct rb_event_info *info)
   2560{
   2561	struct buffer_page *tail_page = info->tail_page;
   2562	struct ring_buffer_event *event;
   2563	unsigned long length = info->length;
   2564
   2565	/*
   2566	 * Only the event that crossed the page boundary
   2567	 * must fill the old tail_page with padding.
   2568	 */
   2569	if (tail >= BUF_PAGE_SIZE) {
   2570		/*
   2571		 * If the page was filled, then we still need
   2572		 * to update the real_end. Reset it to zero
   2573		 * and the reader will ignore it.
   2574		 */
   2575		if (tail == BUF_PAGE_SIZE)
   2576			tail_page->real_end = 0;
   2577
   2578		local_sub(length, &tail_page->write);
   2579		return;
   2580	}
   2581
   2582	event = __rb_page_index(tail_page, tail);
   2583
   2584	/* account for padding bytes */
   2585	local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);
   2586
   2587	/*
   2588	 * Save the original length to the meta data.
   2589	 * This will be used by the reader to add lost event
   2590	 * counter.
   2591	 */
   2592	tail_page->real_end = tail;
   2593
   2594	/*
   2595	 * If this event is bigger than the minimum size, then
   2596	 * we need to be careful that we don't subtract the
   2597	 * write counter enough to allow another writer to slip
   2598	 * in on this page.
   2599	 * We put in a discarded commit instead, to make sure
   2600	 * that this space is not used again.
   2601	 *
   2602	 * If we are less than the minimum size, we don't need to
   2603	 * worry about it.
   2604	 */
   2605	if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
   2606		/* No room for any events */
   2607
   2608		/* Mark the rest of the page with padding */
   2609		rb_event_set_padding(event);
   2610
   2611		/* Set the write back to the previous setting */
   2612		local_sub(length, &tail_page->write);
   2613		return;
   2614	}
   2615
   2616	/* Put in a discarded event */
   2617	event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
   2618	event->type_len = RINGBUF_TYPE_PADDING;
   2619	/* time delta must be non zero */
   2620	event->time_delta = 1;
   2621
   2622	/* Set write to end of buffer */
   2623	length = (tail + length) - BUF_PAGE_SIZE;
   2624	local_sub(length, &tail_page->write);
   2625}
   2626
   2627static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
   2628
   2629/*
   2630 * This is the slow path, force gcc not to inline it.
   2631 */
   2632static noinline struct ring_buffer_event *
   2633rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
   2634	     unsigned long tail, struct rb_event_info *info)
   2635{
   2636	struct buffer_page *tail_page = info->tail_page;
   2637	struct buffer_page *commit_page = cpu_buffer->commit_page;
   2638	struct trace_buffer *buffer = cpu_buffer->buffer;
   2639	struct buffer_page *next_page;
   2640	int ret;
   2641
   2642	next_page = tail_page;
   2643
   2644	rb_inc_page(&next_page);
   2645
   2646	/*
   2647	 * If for some reason, we had an interrupt storm that made
   2648	 * it all the way around the buffer, bail, and warn
   2649	 * about it.
   2650	 */
   2651	if (unlikely(next_page == commit_page)) {
   2652		local_inc(&cpu_buffer->commit_overrun);
   2653		goto out_reset;
   2654	}
   2655
   2656	/*
   2657	 * This is where the fun begins!
   2658	 *
   2659	 * We are fighting against races between a reader that
   2660	 * could be on another CPU trying to swap its reader
   2661	 * page with the buffer head.
   2662	 *
   2663	 * We are also fighting against interrupts coming in and
   2664	 * moving the head or tail on us as well.
   2665	 *
   2666	 * If the next page is the head page then we have filled
   2667	 * the buffer, unless the commit page is still on the
   2668	 * reader page.
   2669	 */
   2670	if (rb_is_head_page(next_page, &tail_page->list)) {
   2671
   2672		/*
   2673		 * If the commit is not on the reader page, then
   2674		 * move the header page.
   2675		 */
   2676		if (!rb_is_reader_page(cpu_buffer->commit_page)) {
   2677			/*
   2678			 * If we are not in overwrite mode,
   2679			 * this is easy, just stop here.
   2680			 */
   2681			if (!(buffer->flags & RB_FL_OVERWRITE)) {
   2682				local_inc(&cpu_buffer->dropped_events);
   2683				goto out_reset;
   2684			}
   2685
   2686			ret = rb_handle_head_page(cpu_buffer,
   2687						  tail_page,
   2688						  next_page);
   2689			if (ret < 0)
   2690				goto out_reset;
   2691			if (ret)
   2692				goto out_again;
   2693		} else {
   2694			/*
   2695			 * We need to be careful here too. The
   2696			 * commit page could still be on the reader
   2697			 * page. We could have a small buffer, and
   2698			 * have filled up the buffer with events
   2699			 * from interrupts and such, and wrapped.
   2700			 *
   2701			 * Note, if the tail page is also on the
   2702			 * reader_page, we let it move out.
   2703			 */
   2704			if (unlikely((cpu_buffer->commit_page !=
   2705				      cpu_buffer->tail_page) &&
   2706				     (cpu_buffer->commit_page ==
   2707				      cpu_buffer->reader_page))) {
   2708				local_inc(&cpu_buffer->commit_overrun);
   2709				goto out_reset;
   2710			}
   2711		}
   2712	}
   2713
   2714	rb_tail_page_update(cpu_buffer, tail_page, next_page);
   2715
   2716 out_again:
   2717
   2718	rb_reset_tail(cpu_buffer, tail, info);
   2719
   2720	/* Commit what we have for now. */
   2721	rb_end_commit(cpu_buffer);
   2722	/* rb_end_commit() decs committing */
   2723	local_inc(&cpu_buffer->committing);
   2724
   2725	/* fail and let the caller try again */
   2726	return ERR_PTR(-EAGAIN);
   2727
   2728 out_reset:
   2729	/* reset write */
   2730	rb_reset_tail(cpu_buffer, tail, info);
   2731
   2732	return NULL;
   2733}
   2734
   2735/* Slow path */
   2736static struct ring_buffer_event *
   2737rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs)
   2738{
   2739	if (abs)
   2740		event->type_len = RINGBUF_TYPE_TIME_STAMP;
   2741	else
   2742		event->type_len = RINGBUF_TYPE_TIME_EXTEND;
   2743
   2744	/* Not the first event on the page, or not delta? */
   2745	if (abs || rb_event_index(event)) {
   2746		event->time_delta = delta & TS_MASK;
   2747		event->array[0] = delta >> TS_SHIFT;
   2748	} else {
   2749		/* nope, just zero it */
   2750		event->time_delta = 0;
   2751		event->array[0] = 0;
   2752	}
   2753
   2754	return skip_time_extend(event);
   2755}
   2756
   2757#ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
   2758static inline bool sched_clock_stable(void)
   2759{
   2760	return true;
   2761}
   2762#endif
   2763
   2764static void
   2765rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
   2766		   struct rb_event_info *info)
   2767{
   2768	u64 write_stamp;
   2769
   2770	WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s",
   2771		  (unsigned long long)info->delta,
   2772		  (unsigned long long)info->ts,
   2773		  (unsigned long long)info->before,
   2774		  (unsigned long long)info->after,
   2775		  (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0),
   2776		  sched_clock_stable() ? "" :
   2777		  "If you just came from a suspend/resume,\n"
   2778		  "please switch to the trace global clock:\n"
   2779		  "  echo global > /sys/kernel/debug/tracing/trace_clock\n"
   2780		  "or add trace_clock=global to the kernel command line\n");
   2781}
   2782
   2783static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
   2784				      struct ring_buffer_event **event,
   2785				      struct rb_event_info *info,
   2786				      u64 *delta,
   2787				      unsigned int *length)
   2788{
   2789	bool abs = info->add_timestamp &
   2790		(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);
   2791
   2792	if (unlikely(info->delta > (1ULL << 59))) {
   2793		/*
   2794		 * Some timers can use more than 59 bits, and when a timestamp
   2795		 * is added to the buffer, it will lose those bits.
   2796		 */
   2797		if (abs && (info->ts & TS_MSB)) {
   2798			info->delta &= ABS_TS_MASK;
   2799
   2800		/* did the clock go backwards */
   2801		} else if (info->before == info->after && info->before > info->ts) {
   2802			/* not interrupted */
   2803			static int once;
   2804
   2805			/*
   2806			 * This is possible with a recalibrating of the TSC.
   2807			 * Do not produce a call stack, but just report it.
   2808			 */
   2809			if (!once) {
   2810				once++;
   2811				pr_warn("Ring buffer clock went backwards: %llu -> %llu\n",
   2812					info->before, info->ts);
   2813			}
   2814		} else
   2815			rb_check_timestamp(cpu_buffer, info);
   2816		if (!abs)
   2817			info->delta = 0;
   2818	}
   2819	*event = rb_add_time_stamp(*event, info->delta, abs);
   2820	*length -= RB_LEN_TIME_EXTEND;
   2821	*delta = 0;
   2822}
   2823
   2824/**
   2825 * rb_update_event - update event type and data
   2826 * @cpu_buffer: The per cpu buffer of the @event
   2827 * @event: the event to update
   2828 * @info: The info to update the @event with (contains length and delta)
   2829 *
   2830 * Update the type and data fields of the @event. The length
   2831 * is the actual size that is written to the ring buffer,
   2832 * and with this, we can determine what to place into the
   2833 * data field.
   2834 */
   2835static void
   2836rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
   2837		struct ring_buffer_event *event,
   2838		struct rb_event_info *info)
   2839{
   2840	unsigned length = info->length;
   2841	u64 delta = info->delta;
   2842	unsigned int nest = local_read(&cpu_buffer->committing) - 1;
   2843
   2844	if (!WARN_ON_ONCE(nest >= MAX_NEST))
   2845		cpu_buffer->event_stamp[nest] = info->ts;
   2846
   2847	/*
   2848	 * If we need to add a timestamp, then we
   2849	 * add it to the start of the reserved space.
   2850	 */
   2851	if (unlikely(info->add_timestamp))
   2852		rb_add_timestamp(cpu_buffer, &event, info, &delta, &length);
   2853
   2854	event->time_delta = delta;
   2855	length -= RB_EVNT_HDR_SIZE;
   2856	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
   2857		event->type_len = 0;
   2858		event->array[0] = length;
   2859	} else
   2860		event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
   2861}
   2862
   2863static unsigned rb_calculate_event_length(unsigned length)
   2864{
   2865	struct ring_buffer_event event; /* Used only for sizeof array */
   2866
   2867	/* zero length can cause confusions */
   2868	if (!length)
   2869		length++;
   2870
   2871	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
   2872		length += sizeof(event.array[0]);
   2873
   2874	length += RB_EVNT_HDR_SIZE;
   2875	length = ALIGN(length, RB_ARCH_ALIGNMENT);
   2876
   2877	/*
   2878	 * In case the time delta is larger than the 27 bits for it
   2879	 * in the header, we need to add a timestamp. If another
   2880	 * event comes in when trying to discard this one to increase
   2881	 * the length, then the timestamp will be added in the allocated
   2882	 * space of this event. If length is bigger than the size needed
   2883	 * for the TIME_EXTEND, then padding has to be used. The events
   2884	 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
   2885	 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
   2886	 * As length is a multiple of 4, we only need to worry if it
   2887	 * is 12 (RB_LEN_TIME_EXTEND + 4).
   2888	 */
   2889	if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
   2890		length += RB_ALIGNMENT;
   2891
   2892	return length;
   2893}
   2894
   2895static u64 rb_time_delta(struct ring_buffer_event *event)
   2896{
   2897	switch (event->type_len) {
   2898	case RINGBUF_TYPE_PADDING:
   2899		return 0;
   2900
   2901	case RINGBUF_TYPE_TIME_EXTEND:
   2902		return rb_event_time_stamp(event);
   2903
   2904	case RINGBUF_TYPE_TIME_STAMP:
   2905		return 0;
   2906
   2907	case RINGBUF_TYPE_DATA:
   2908		return event->time_delta;
   2909	default:
   2910		return 0;
   2911	}
   2912}
   2913
   2914static inline int
   2915rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
   2916		  struct ring_buffer_event *event)
   2917{
   2918	unsigned long new_index, old_index;
   2919	struct buffer_page *bpage;
   2920	unsigned long index;
   2921	unsigned long addr;
   2922	u64 write_stamp;
   2923	u64 delta;
   2924
   2925	new_index = rb_event_index(event);
   2926	old_index = new_index + rb_event_ts_length(event);
   2927	addr = (unsigned long)event;
   2928	addr &= PAGE_MASK;
   2929
   2930	bpage = READ_ONCE(cpu_buffer->tail_page);
   2931
   2932	delta = rb_time_delta(event);
   2933
   2934	if (!rb_time_read(&cpu_buffer->write_stamp, &write_stamp))
   2935		return 0;
   2936
   2937	/* Make sure the write stamp is read before testing the location */
   2938	barrier();
   2939
   2940	if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
   2941		unsigned long write_mask =
   2942			local_read(&bpage->write) & ~RB_WRITE_MASK;
   2943		unsigned long event_length = rb_event_length(event);
   2944
   2945		/* Something came in, can't discard */
   2946		if (!rb_time_cmpxchg(&cpu_buffer->write_stamp,
   2947				       write_stamp, write_stamp - delta))
   2948			return 0;
   2949
   2950		/*
   2951		 * It's possible that the event time delta is zero
   2952		 * (has the same time stamp as the previous event)
   2953		 * in which case write_stamp and before_stamp could
   2954		 * be the same. In such a case, force before_stamp
   2955		 * to be different than write_stamp. It doesn't
   2956		 * matter what it is, as long as its different.
   2957		 */
   2958		if (!delta)
   2959			rb_time_set(&cpu_buffer->before_stamp, 0);
   2960
   2961		/*
   2962		 * If an event were to come in now, it would see that the
   2963		 * write_stamp and the before_stamp are different, and assume
   2964		 * that this event just added itself before updating
   2965		 * the write stamp. The interrupting event will fix the
   2966		 * write stamp for us, and use the before stamp as its delta.
   2967		 */
   2968
   2969		/*
   2970		 * This is on the tail page. It is possible that
   2971		 * a write could come in and move the tail page
   2972		 * and write to the next page. That is fine
   2973		 * because we just shorten what is on this page.
   2974		 */
   2975		old_index += write_mask;
   2976		new_index += write_mask;
   2977		index = local_cmpxchg(&bpage->write, old_index, new_index);
   2978		if (index == old_index) {
   2979			/* update counters */
   2980			local_sub(event_length, &cpu_buffer->entries_bytes);
   2981			return 1;
   2982		}
   2983	}
   2984
   2985	/* could not discard */
   2986	return 0;
   2987}
   2988
   2989static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
   2990{
   2991	local_inc(&cpu_buffer->committing);
   2992	local_inc(&cpu_buffer->commits);
   2993}
   2994
   2995static __always_inline void
   2996rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
   2997{
   2998	unsigned long max_count;
   2999
   3000	/*
   3001	 * We only race with interrupts and NMIs on this CPU.
   3002	 * If we own the commit event, then we can commit
   3003	 * all others that interrupted us, since the interruptions
   3004	 * are in stack format (they finish before they come
   3005	 * back to us). This allows us to do a simple loop to
   3006	 * assign the commit to the tail.
   3007	 */
   3008 again:
   3009	max_count = cpu_buffer->nr_pages * 100;
   3010
   3011	while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
   3012		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
   3013			return;
   3014		if (RB_WARN_ON(cpu_buffer,
   3015			       rb_is_reader_page(cpu_buffer->tail_page)))
   3016			return;
   3017		local_set(&cpu_buffer->commit_page->page->commit,
   3018			  rb_page_write(cpu_buffer->commit_page));
   3019		rb_inc_page(&cpu_buffer->commit_page);
   3020		/* add barrier to keep gcc from optimizing too much */
   3021		barrier();
   3022	}
   3023	while (rb_commit_index(cpu_buffer) !=
   3024	       rb_page_write(cpu_buffer->commit_page)) {
   3025
   3026		local_set(&cpu_buffer->commit_page->page->commit,
   3027			  rb_page_write(cpu_buffer->commit_page));
   3028		RB_WARN_ON(cpu_buffer,
   3029			   local_read(&cpu_buffer->commit_page->page->commit) &
   3030			   ~RB_WRITE_MASK);
   3031		barrier();
   3032	}
   3033
   3034	/* again, keep gcc from optimizing */
   3035	barrier();
   3036
   3037	/*
   3038	 * If an interrupt came in just after the first while loop
   3039	 * and pushed the tail page forward, we will be left with
   3040	 * a dangling commit that will never go forward.
   3041	 */
   3042	if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
   3043		goto again;
   3044}
   3045
   3046static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
   3047{
   3048	unsigned long commits;
   3049
   3050	if (RB_WARN_ON(cpu_buffer,
   3051		       !local_read(&cpu_buffer->committing)))
   3052		return;
   3053
   3054 again:
   3055	commits = local_read(&cpu_buffer->commits);
   3056	/* synchronize with interrupts */
   3057	barrier();
   3058	if (local_read(&cpu_buffer->committing) == 1)
   3059		rb_set_commit_to_write(cpu_buffer);
   3060
   3061	local_dec(&cpu_buffer->committing);
   3062
   3063	/* synchronize with interrupts */
   3064	barrier();
   3065
   3066	/*
   3067	 * Need to account for interrupts coming in between the
   3068	 * updating of the commit page and the clearing of the
   3069	 * committing counter.
   3070	 */
   3071	if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
   3072	    !local_read(&cpu_buffer->committing)) {
   3073		local_inc(&cpu_buffer->committing);
   3074		goto again;
   3075	}
   3076}
   3077
   3078static inline void rb_event_discard(struct ring_buffer_event *event)
   3079{
   3080	if (extended_time(event))
   3081		event = skip_time_extend(event);
   3082
   3083	/* array[0] holds the actual length for the discarded event */
   3084	event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
   3085	event->type_len = RINGBUF_TYPE_PADDING;
   3086	/* time delta must be non zero */
   3087	if (!event->time_delta)
   3088		event->time_delta = 1;
   3089}
   3090
   3091static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
   3092		      struct ring_buffer_event *event)
   3093{
   3094	local_inc(&cpu_buffer->entries);
   3095	rb_end_commit(cpu_buffer);
   3096}
   3097
   3098static __always_inline void
   3099rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
   3100{
   3101	size_t nr_pages;
   3102	size_t dirty;
   3103	size_t full;
   3104
   3105	if (buffer->irq_work.waiters_pending) {
   3106		buffer->irq_work.waiters_pending = false;
   3107		/* irq_work_queue() supplies it's own memory barriers */
   3108		irq_work_queue(&buffer->irq_work.work);
   3109	}
   3110
   3111	if (cpu_buffer->irq_work.waiters_pending) {
   3112		cpu_buffer->irq_work.waiters_pending = false;
   3113		/* irq_work_queue() supplies it's own memory barriers */
   3114		irq_work_queue(&cpu_buffer->irq_work.work);
   3115	}
   3116
   3117	if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
   3118		return;
   3119
   3120	if (cpu_buffer->reader_page == cpu_buffer->commit_page)
   3121		return;
   3122
   3123	if (!cpu_buffer->irq_work.full_waiters_pending)
   3124		return;
   3125
   3126	cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
   3127
   3128	full = cpu_buffer->shortest_full;
   3129	nr_pages = cpu_buffer->nr_pages;
   3130	dirty = ring_buffer_nr_dirty_pages(buffer, cpu_buffer->cpu);
   3131	if (full && nr_pages && (dirty * 100) <= full * nr_pages)
   3132		return;
   3133
   3134	cpu_buffer->irq_work.wakeup_full = true;
   3135	cpu_buffer->irq_work.full_waiters_pending = false;
   3136	/* irq_work_queue() supplies it's own memory barriers */
   3137	irq_work_queue(&cpu_buffer->irq_work.work);
   3138}
   3139
   3140#ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
   3141# define do_ring_buffer_record_recursion()	\
   3142	do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
   3143#else
   3144# define do_ring_buffer_record_recursion() do { } while (0)
   3145#endif
   3146
   3147/*
   3148 * The lock and unlock are done within a preempt disable section.
   3149 * The current_context per_cpu variable can only be modified
   3150 * by the current task between lock and unlock. But it can
   3151 * be modified more than once via an interrupt. To pass this
   3152 * information from the lock to the unlock without having to
   3153 * access the 'in_interrupt()' functions again (which do show
   3154 * a bit of overhead in something as critical as function tracing,
   3155 * we use a bitmask trick.
   3156 *
   3157 *  bit 1 =  NMI context
   3158 *  bit 2 =  IRQ context
   3159 *  bit 3 =  SoftIRQ context
   3160 *  bit 4 =  normal context.
   3161 *
   3162 * This works because this is the order of contexts that can
   3163 * preempt other contexts. A SoftIRQ never preempts an IRQ
   3164 * context.
   3165 *
   3166 * When the context is determined, the corresponding bit is
   3167 * checked and set (if it was set, then a recursion of that context
   3168 * happened).
   3169 *
   3170 * On unlock, we need to clear this bit. To do so, just subtract
   3171 * 1 from the current_context and AND it to itself.
   3172 *
   3173 * (binary)
   3174 *  101 - 1 = 100
   3175 *  101 & 100 = 100 (clearing bit zero)
   3176 *
   3177 *  1010 - 1 = 1001
   3178 *  1010 & 1001 = 1000 (clearing bit 1)
   3179 *
   3180 * The least significant bit can be cleared this way, and it
   3181 * just so happens that it is the same bit corresponding to
   3182 * the current context.
   3183 *
   3184 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
   3185 * is set when a recursion is detected at the current context, and if
   3186 * the TRANSITION bit is already set, it will fail the recursion.
   3187 * This is needed because there's a lag between the changing of
   3188 * interrupt context and updating the preempt count. In this case,
   3189 * a false positive will be found. To handle this, one extra recursion
   3190 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
   3191 * bit is already set, then it is considered a recursion and the function
   3192 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
   3193 *
   3194 * On the trace_recursive_unlock(), the TRANSITION bit will be the first
   3195 * to be cleared. Even if it wasn't the context that set it. That is,
   3196 * if an interrupt comes in while NORMAL bit is set and the ring buffer
   3197 * is called before preempt_count() is updated, since the check will
   3198 * be on the NORMAL bit, the TRANSITION bit will then be set. If an
   3199 * NMI then comes in, it will set the NMI bit, but when the NMI code
   3200 * does the trace_recursive_unlock() it will clear the TRANSITION bit
   3201 * and leave the NMI bit set. But this is fine, because the interrupt
   3202 * code that set the TRANSITION bit will then clear the NMI bit when it
   3203 * calls trace_recursive_unlock(). If another NMI comes in, it will
   3204 * set the TRANSITION bit and continue.
   3205 *
   3206 * Note: The TRANSITION bit only handles a single transition between context.
   3207 */
   3208
   3209static __always_inline int
   3210trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
   3211{
   3212	unsigned int val = cpu_buffer->current_context;
   3213	int bit = interrupt_context_level();
   3214
   3215	bit = RB_CTX_NORMAL - bit;
   3216
   3217	if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) {
   3218		/*
   3219		 * It is possible that this was called by transitioning
   3220		 * between interrupt context, and preempt_count() has not
   3221		 * been updated yet. In this case, use the TRANSITION bit.
   3222		 */
   3223		bit = RB_CTX_TRANSITION;
   3224		if (val & (1 << (bit + cpu_buffer->nest))) {
   3225			do_ring_buffer_record_recursion();
   3226			return 1;
   3227		}
   3228	}
   3229
   3230	val |= (1 << (bit + cpu_buffer->nest));
   3231	cpu_buffer->current_context = val;
   3232
   3233	return 0;
   3234}
   3235
   3236static __always_inline void
   3237trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
   3238{
   3239	cpu_buffer->current_context &=
   3240		cpu_buffer->current_context - (1 << cpu_buffer->nest);
   3241}
   3242
   3243/* The recursive locking above uses 5 bits */
   3244#define NESTED_BITS 5
   3245
   3246/**
   3247 * ring_buffer_nest_start - Allow to trace while nested
   3248 * @buffer: The ring buffer to modify
   3249 *
   3250 * The ring buffer has a safety mechanism to prevent recursion.
   3251 * But there may be a case where a trace needs to be done while
   3252 * tracing something else. In this case, calling this function
   3253 * will allow this function to nest within a currently active
   3254 * ring_buffer_lock_reserve().
   3255 *
   3256 * Call this function before calling another ring_buffer_lock_reserve() and
   3257 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
   3258 */
   3259void ring_buffer_nest_start(struct trace_buffer *buffer)
   3260{
   3261	struct ring_buffer_per_cpu *cpu_buffer;
   3262	int cpu;
   3263
   3264	/* Enabled by ring_buffer_nest_end() */
   3265	preempt_disable_notrace();
   3266	cpu = raw_smp_processor_id();
   3267	cpu_buffer = buffer->buffers[cpu];
   3268	/* This is the shift value for the above recursive locking */
   3269	cpu_buffer->nest += NESTED_BITS;
   3270}
   3271
   3272/**
   3273 * ring_buffer_nest_end - Allow to trace while nested
   3274 * @buffer: The ring buffer to modify
   3275 *
   3276 * Must be called after ring_buffer_nest_start() and after the
   3277 * ring_buffer_unlock_commit().
   3278 */
   3279void ring_buffer_nest_end(struct trace_buffer *buffer)
   3280{
   3281	struct ring_buffer_per_cpu *cpu_buffer;
   3282	int cpu;
   3283
   3284	/* disabled by ring_buffer_nest_start() */
   3285	cpu = raw_smp_processor_id();
   3286	cpu_buffer = buffer->buffers[cpu];
   3287	/* This is the shift value for the above recursive locking */
   3288	cpu_buffer->nest -= NESTED_BITS;
   3289	preempt_enable_notrace();
   3290}
   3291
   3292/**
   3293 * ring_buffer_unlock_commit - commit a reserved
   3294 * @buffer: The buffer to commit to
   3295 * @event: The event pointer to commit.
   3296 *
   3297 * This commits the data to the ring buffer, and releases any locks held.
   3298 *
   3299 * Must be paired with ring_buffer_lock_reserve.
   3300 */
   3301int ring_buffer_unlock_commit(struct trace_buffer *buffer,
   3302			      struct ring_buffer_event *event)
   3303{
   3304	struct ring_buffer_per_cpu *cpu_buffer;
   3305	int cpu = raw_smp_processor_id();
   3306
   3307	cpu_buffer = buffer->buffers[cpu];
   3308
   3309	rb_commit(cpu_buffer, event);
   3310
   3311	rb_wakeups(buffer, cpu_buffer);
   3312
   3313	trace_recursive_unlock(cpu_buffer);
   3314
   3315	preempt_enable_notrace();
   3316
   3317	return 0;
   3318}
   3319EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
   3320
   3321/* Special value to validate all deltas on a page. */
   3322#define CHECK_FULL_PAGE		1L
   3323
   3324#ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS
   3325static void dump_buffer_page(struct buffer_data_page *bpage,
   3326			     struct rb_event_info *info,
   3327			     unsigned long tail)
   3328{
   3329	struct ring_buffer_event *event;
   3330	u64 ts, delta;
   3331	int e;
   3332
   3333	ts = bpage->time_stamp;
   3334	pr_warn("  [%lld] PAGE TIME STAMP\n", ts);
   3335
   3336	for (e = 0; e < tail; e += rb_event_length(event)) {
   3337
   3338		event = (struct ring_buffer_event *)(bpage->data + e);
   3339
   3340		switch (event->type_len) {
   3341
   3342		case RINGBUF_TYPE_TIME_EXTEND:
   3343			delta = rb_event_time_stamp(event);
   3344			ts += delta;
   3345			pr_warn("  [%lld] delta:%lld TIME EXTEND\n", ts, delta);
   3346			break;
   3347
   3348		case RINGBUF_TYPE_TIME_STAMP:
   3349			delta = rb_event_time_stamp(event);
   3350			ts = rb_fix_abs_ts(delta, ts);
   3351			pr_warn("  [%lld] absolute:%lld TIME STAMP\n", ts, delta);
   3352			break;
   3353
   3354		case RINGBUF_TYPE_PADDING:
   3355			ts += event->time_delta;
   3356			pr_warn("  [%lld] delta:%d PADDING\n", ts, event->time_delta);
   3357			break;
   3358
   3359		case RINGBUF_TYPE_DATA:
   3360			ts += event->time_delta;
   3361			pr_warn("  [%lld] delta:%d\n", ts, event->time_delta);
   3362			break;
   3363
   3364		default:
   3365			break;
   3366		}
   3367	}
   3368}
   3369
   3370static DEFINE_PER_CPU(atomic_t, checking);
   3371static atomic_t ts_dump;
   3372
   3373/*
   3374 * Check if the current event time stamp matches the deltas on
   3375 * the buffer page.
   3376 */
   3377static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
   3378			 struct rb_event_info *info,
   3379			 unsigned long tail)
   3380{
   3381	struct ring_buffer_event *event;
   3382	struct buffer_data_page *bpage;
   3383	u64 ts, delta;
   3384	bool full = false;
   3385	int e;
   3386
   3387	bpage = info->tail_page->page;
   3388
   3389	if (tail == CHECK_FULL_PAGE) {
   3390		full = true;
   3391		tail = local_read(&bpage->commit);
   3392	} else if (info->add_timestamp &
   3393		   (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) {
   3394		/* Ignore events with absolute time stamps */
   3395		return;
   3396	}
   3397
   3398	/*
   3399	 * Do not check the first event (skip possible extends too).
   3400	 * Also do not check if previous events have not been committed.
   3401	 */
   3402	if (tail <= 8 || tail > local_read(&bpage->commit))
   3403		return;
   3404
   3405	/*
   3406	 * If this interrupted another event, 
   3407	 */
   3408	if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
   3409		goto out;
   3410
   3411	ts = bpage->time_stamp;
   3412
   3413	for (e = 0; e < tail; e += rb_event_length(event)) {
   3414
   3415		event = (struct ring_buffer_event *)(bpage->data + e);
   3416
   3417		switch (event->type_len) {
   3418
   3419		case RINGBUF_TYPE_TIME_EXTEND:
   3420			delta = rb_event_time_stamp(event);
   3421			ts += delta;
   3422			break;
   3423
   3424		case RINGBUF_TYPE_TIME_STAMP:
   3425			delta = rb_event_time_stamp(event);
   3426			ts = rb_fix_abs_ts(delta, ts);
   3427			break;
   3428
   3429		case RINGBUF_TYPE_PADDING:
   3430			if (event->time_delta == 1)
   3431				break;
   3432			fallthrough;
   3433		case RINGBUF_TYPE_DATA:
   3434			ts += event->time_delta;
   3435			break;
   3436
   3437		default:
   3438			RB_WARN_ON(cpu_buffer, 1);
   3439		}
   3440	}
   3441	if ((full && ts > info->ts) ||
   3442	    (!full && ts + info->delta != info->ts)) {
   3443		/* If another report is happening, ignore this one */
   3444		if (atomic_inc_return(&ts_dump) != 1) {
   3445			atomic_dec(&ts_dump);
   3446			goto out;
   3447		}
   3448		atomic_inc(&cpu_buffer->record_disabled);
   3449		/* There's some cases in boot up that this can happen */
   3450		WARN_ON_ONCE(system_state != SYSTEM_BOOTING);
   3451		pr_warn("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s\n",
   3452			cpu_buffer->cpu,
   3453			ts + info->delta, info->ts, info->delta,
   3454			info->before, info->after,
   3455			full ? " (full)" : "");
   3456		dump_buffer_page(bpage, info, tail);
   3457		atomic_dec(&ts_dump);
   3458		/* Do not re-enable checking */
   3459		return;
   3460	}
   3461out:
   3462	atomic_dec(this_cpu_ptr(&checking));
   3463}
   3464#else
   3465static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
   3466			 struct rb_event_info *info,
   3467			 unsigned long tail)
   3468{
   3469}
   3470#endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */
   3471
   3472static struct ring_buffer_event *
   3473__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
   3474		  struct rb_event_info *info)
   3475{
   3476	struct ring_buffer_event *event;
   3477	struct buffer_page *tail_page;
   3478	unsigned long tail, write, w;
   3479	bool a_ok;
   3480	bool b_ok;
   3481
   3482	/* Don't let the compiler play games with cpu_buffer->tail_page */
   3483	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
   3484
   3485 /*A*/	w = local_read(&tail_page->write) & RB_WRITE_MASK;
   3486	barrier();
   3487	b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before);
   3488	a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
   3489	barrier();
   3490	info->ts = rb_time_stamp(cpu_buffer->buffer);
   3491
   3492	if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
   3493		info->delta = info->ts;
   3494	} else {
   3495		/*
   3496		 * If interrupting an event time update, we may need an
   3497		 * absolute timestamp.
   3498		 * Don't bother if this is the start of a new page (w == 0).
   3499		 */
   3500		if (unlikely(!a_ok || !b_ok || (info->before != info->after && w))) {
   3501			info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
   3502			info->length += RB_LEN_TIME_EXTEND;
   3503		} else {
   3504			info->delta = info->ts - info->after;
   3505			if (unlikely(test_time_stamp(info->delta))) {
   3506				info->add_timestamp |= RB_ADD_STAMP_EXTEND;
   3507				info->length += RB_LEN_TIME_EXTEND;
   3508			}
   3509		}
   3510	}
   3511
   3512 /*B*/	rb_time_set(&cpu_buffer->before_stamp, info->ts);
   3513
   3514 /*C*/	write = local_add_return(info->length, &tail_page->write);
   3515
   3516	/* set write to only the index of the write */
   3517	write &= RB_WRITE_MASK;
   3518
   3519	tail = write - info->length;
   3520
   3521	/* See if we shot pass the end of this buffer page */
   3522	if (unlikely(write > BUF_PAGE_SIZE)) {
   3523		/* before and after may now different, fix it up*/
   3524		b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before);
   3525		a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
   3526		if (a_ok && b_ok && info->before != info->after)
   3527			(void)rb_time_cmpxchg(&cpu_buffer->before_stamp,
   3528					      info->before, info->after);
   3529		if (a_ok && b_ok)
   3530			check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
   3531		return rb_move_tail(cpu_buffer, tail, info);
   3532	}
   3533
   3534	if (likely(tail == w)) {
   3535		u64 save_before;
   3536		bool s_ok;
   3537
   3538		/* Nothing interrupted us between A and C */
   3539 /*D*/		rb_time_set(&cpu_buffer->write_stamp, info->ts);
   3540		barrier();
   3541 /*E*/		s_ok = rb_time_read(&cpu_buffer->before_stamp, &save_before);
   3542		RB_WARN_ON(cpu_buffer, !s_ok);
   3543		if (likely(!(info->add_timestamp &
   3544			     (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
   3545			/* This did not interrupt any time update */
   3546			info->delta = info->ts - info->after;
   3547		else
   3548			/* Just use full timestamp for interrupting event */
   3549			info->delta = info->ts;
   3550		barrier();
   3551		check_buffer(cpu_buffer, info, tail);
   3552		if (unlikely(info->ts != save_before)) {
   3553			/* SLOW PATH - Interrupted between C and E */
   3554
   3555			a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
   3556			RB_WARN_ON(cpu_buffer, !a_ok);
   3557
   3558			/* Write stamp must only go forward */
   3559			if (save_before > info->after) {
   3560				/*
   3561				 * We do not care about the result, only that
   3562				 * it gets updated atomically.
   3563				 */
   3564				(void)rb_time_cmpxchg(&cpu_buffer->write_stamp,
   3565						      info->after, save_before);
   3566			}
   3567		}
   3568	} else {
   3569		u64 ts;
   3570		/* SLOW PATH - Interrupted between A and C */
   3571		a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
   3572		/* Was interrupted before here, write_stamp must be valid */
   3573		RB_WARN_ON(cpu_buffer, !a_ok);
   3574		ts = rb_time_stamp(cpu_buffer->buffer);
   3575		barrier();
   3576 /*E*/		if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
   3577		    info->after < ts &&
   3578		    rb_time_cmpxchg(&cpu_buffer->write_stamp,
   3579				    info->after, ts)) {
   3580			/* Nothing came after this event between C and E */
   3581			info->delta = ts - info->after;
   3582		} else {
   3583			/*
   3584			 * Interrupted between C and E:
   3585			 * Lost the previous events time stamp. Just set the
   3586			 * delta to zero, and this will be the same time as
   3587			 * the event this event interrupted. And the events that
   3588			 * came after this will still be correct (as they would
   3589			 * have built their delta on the previous event.
   3590			 */
   3591			info->delta = 0;
   3592		}
   3593		info->ts = ts;
   3594		info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
   3595	}
   3596
   3597	/*
   3598	 * If this is the first commit on the page, then it has the same
   3599	 * timestamp as the page itself.
   3600	 */
   3601	if (unlikely(!tail && !(info->add_timestamp &
   3602				(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
   3603		info->delta = 0;
   3604
   3605	/* We reserved something on the buffer */
   3606
   3607	event = __rb_page_index(tail_page, tail);
   3608	rb_update_event(cpu_buffer, event, info);
   3609
   3610	local_inc(&tail_page->entries);
   3611
   3612	/*
   3613	 * If this is the first commit on the page, then update
   3614	 * its timestamp.
   3615	 */
   3616	if (unlikely(!tail))
   3617		tail_page->page->time_stamp = info->ts;
   3618
   3619	/* account for these added bytes */
   3620	local_add(info->length, &cpu_buffer->entries_bytes);
   3621
   3622	return event;
   3623}
   3624
   3625static __always_inline struct ring_buffer_event *
   3626rb_reserve_next_event(struct trace_buffer *buffer,
   3627		      struct ring_buffer_per_cpu *cpu_buffer,
   3628		      unsigned long length)
   3629{
   3630	struct ring_buffer_event *event;
   3631	struct rb_event_info info;
   3632	int nr_loops = 0;
   3633	int add_ts_default;
   3634
   3635	rb_start_commit(cpu_buffer);
   3636	/* The commit page can not change after this */
   3637
   3638#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
   3639	/*
   3640	 * Due to the ability to swap a cpu buffer from a buffer
   3641	 * it is possible it was swapped before we committed.
   3642	 * (committing stops a swap). We check for it here and
   3643	 * if it happened, we have to fail the write.
   3644	 */
   3645	barrier();
   3646	if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
   3647		local_dec(&cpu_buffer->committing);
   3648		local_dec(&cpu_buffer->commits);
   3649		return NULL;
   3650	}
   3651#endif
   3652
   3653	info.length = rb_calculate_event_length(length);
   3654
   3655	if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
   3656		add_ts_default = RB_ADD_STAMP_ABSOLUTE;
   3657		info.length += RB_LEN_TIME_EXTEND;
   3658	} else {
   3659		add_ts_default = RB_ADD_STAMP_NONE;
   3660	}
   3661
   3662 again:
   3663	info.add_timestamp = add_ts_default;
   3664	info.delta = 0;
   3665
   3666	/*
   3667	 * We allow for interrupts to reenter here and do a trace.
   3668	 * If one does, it will cause this original code to loop
   3669	 * back here. Even with heavy interrupts happening, this
   3670	 * should only happen a few times in a row. If this happens
   3671	 * 1000 times in a row, there must be either an interrupt
   3672	 * storm or we have something buggy.
   3673	 * Bail!
   3674	 */
   3675	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
   3676		goto out_fail;
   3677
   3678	event = __rb_reserve_next(cpu_buffer, &info);
   3679
   3680	if (unlikely(PTR_ERR(event) == -EAGAIN)) {
   3681		if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND))
   3682			info.length -= RB_LEN_TIME_EXTEND;
   3683		goto again;
   3684	}
   3685
   3686	if (likely(event))
   3687		return event;
   3688 out_fail:
   3689	rb_end_commit(cpu_buffer);
   3690	return NULL;
   3691}
   3692
   3693/**
   3694 * ring_buffer_lock_reserve - reserve a part of the buffer
   3695 * @buffer: the ring buffer to reserve from
   3696 * @length: the length of the data to reserve (excluding event header)
   3697 *
   3698 * Returns a reserved event on the ring buffer to copy directly to.
   3699 * The user of this interface will need to get the body to write into
   3700 * and can use the ring_buffer_event_data() interface.
   3701 *
   3702 * The length is the length of the data needed, not the event length
   3703 * which also includes the event header.
   3704 *
   3705 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
   3706 * If NULL is returned, then nothing has been allocated or locked.
   3707 */
   3708struct ring_buffer_event *
   3709ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length)
   3710{
   3711	struct ring_buffer_per_cpu *cpu_buffer;
   3712	struct ring_buffer_event *event;
   3713	int cpu;
   3714
   3715	/* If we are tracing schedule, we don't want to recurse */
   3716	preempt_disable_notrace();
   3717
   3718	if (unlikely(atomic_read(&buffer->record_disabled)))
   3719		goto out;
   3720
   3721	cpu = raw_smp_processor_id();
   3722
   3723	if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
   3724		goto out;
   3725
   3726	cpu_buffer = buffer->buffers[cpu];
   3727
   3728	if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
   3729		goto out;
   3730
   3731	if (unlikely(length > BUF_MAX_DATA_SIZE))
   3732		goto out;
   3733
   3734	if (unlikely(trace_recursive_lock(cpu_buffer)))
   3735		goto out;
   3736
   3737	event = rb_reserve_next_event(buffer, cpu_buffer, length);
   3738	if (!event)
   3739		goto out_unlock;
   3740
   3741	return event;
   3742
   3743 out_unlock:
   3744	trace_recursive_unlock(cpu_buffer);
   3745 out:
   3746	preempt_enable_notrace();
   3747	return NULL;
   3748}
   3749EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
   3750
   3751/*
   3752 * Decrement the entries to the page that an event is on.
   3753 * The event does not even need to exist, only the pointer
   3754 * to the page it is on. This may only be called before the commit
   3755 * takes place.
   3756 */
   3757static inline void
   3758rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
   3759		   struct ring_buffer_event *event)
   3760{
   3761	unsigned long addr = (unsigned long)event;
   3762	struct buffer_page *bpage = cpu_buffer->commit_page;
   3763	struct buffer_page *start;
   3764
   3765	addr &= PAGE_MASK;
   3766
   3767	/* Do the likely case first */
   3768	if (likely(bpage->page == (void *)addr)) {
   3769		local_dec(&bpage->entries);
   3770		return;
   3771	}
   3772
   3773	/*
   3774	 * Because the commit page may be on the reader page we
   3775	 * start with the next page and check the end loop there.
   3776	 */
   3777	rb_inc_page(&bpage);
   3778	start = bpage;
   3779	do {
   3780		if (bpage->page == (void *)addr) {
   3781			local_dec(&bpage->entries);
   3782			return;
   3783		}
   3784		rb_inc_page(&bpage);
   3785	} while (bpage != start);
   3786
   3787	/* commit not part of this buffer?? */
   3788	RB_WARN_ON(cpu_buffer, 1);
   3789}
   3790
   3791/**
   3792 * ring_buffer_discard_commit - discard an event that has not been committed
   3793 * @buffer: the ring buffer
   3794 * @event: non committed event to discard
   3795 *
   3796 * Sometimes an event that is in the ring buffer needs to be ignored.
   3797 * This function lets the user discard an event in the ring buffer
   3798 * and then that event will not be read later.
   3799 *
   3800 * This function only works if it is called before the item has been
   3801 * committed. It will try to free the event from the ring buffer
   3802 * if another event has not been added behind it.
   3803 *
   3804 * If another event has been added behind it, it will set the event
   3805 * up as discarded, and perform the commit.
   3806 *
   3807 * If this function is called, do not call ring_buffer_unlock_commit on
   3808 * the event.
   3809 */
   3810void ring_buffer_discard_commit(struct trace_buffer *buffer,
   3811				struct ring_buffer_event *event)
   3812{
   3813	struct ring_buffer_per_cpu *cpu_buffer;
   3814	int cpu;
   3815
   3816	/* The event is discarded regardless */
   3817	rb_event_discard(event);
   3818
   3819	cpu = smp_processor_id();
   3820	cpu_buffer = buffer->buffers[cpu];
   3821
   3822	/*
   3823	 * This must only be called if the event has not been
   3824	 * committed yet. Thus we can assume that preemption
   3825	 * is still disabled.
   3826	 */
   3827	RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
   3828
   3829	rb_decrement_entry(cpu_buffer, event);
   3830	if (rb_try_to_discard(cpu_buffer, event))
   3831		goto out;
   3832
   3833 out:
   3834	rb_end_commit(cpu_buffer);
   3835
   3836	trace_recursive_unlock(cpu_buffer);
   3837
   3838	preempt_enable_notrace();
   3839
   3840}
   3841EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
   3842
   3843/**
   3844 * ring_buffer_write - write data to the buffer without reserving
   3845 * @buffer: The ring buffer to write to.
   3846 * @length: The length of the data being written (excluding the event header)
   3847 * @data: The data to write to the buffer.
   3848 *
   3849 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
   3850 * one function. If you already have the data to write to the buffer, it
   3851 * may be easier to simply call this function.
   3852 *
   3853 * Note, like ring_buffer_lock_reserve, the length is the length of the data
   3854 * and not the length of the event which would hold the header.
   3855 */
   3856int ring_buffer_write(struct trace_buffer *buffer,
   3857		      unsigned long length,
   3858		      void *data)
   3859{
   3860	struct ring_buffer_per_cpu *cpu_buffer;
   3861	struct ring_buffer_event *event;
   3862	void *body;
   3863	int ret = -EBUSY;
   3864	int cpu;
   3865
   3866	preempt_disable_notrace();
   3867
   3868	if (atomic_read(&buffer->record_disabled))
   3869		goto out;
   3870
   3871	cpu = raw_smp_processor_id();
   3872
   3873	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   3874		goto out;
   3875
   3876	cpu_buffer = buffer->buffers[cpu];
   3877
   3878	if (atomic_read(&cpu_buffer->record_disabled))
   3879		goto out;
   3880
   3881	if (length > BUF_MAX_DATA_SIZE)
   3882		goto out;
   3883
   3884	if (unlikely(trace_recursive_lock(cpu_buffer)))
   3885		goto out;
   3886
   3887	event = rb_reserve_next_event(buffer, cpu_buffer, length);
   3888	if (!event)
   3889		goto out_unlock;
   3890
   3891	body = rb_event_data(event);
   3892
   3893	memcpy(body, data, length);
   3894
   3895	rb_commit(cpu_buffer, event);
   3896
   3897	rb_wakeups(buffer, cpu_buffer);
   3898
   3899	ret = 0;
   3900
   3901 out_unlock:
   3902	trace_recursive_unlock(cpu_buffer);
   3903
   3904 out:
   3905	preempt_enable_notrace();
   3906
   3907	return ret;
   3908}
   3909EXPORT_SYMBOL_GPL(ring_buffer_write);
   3910
   3911static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
   3912{
   3913	struct buffer_page *reader = cpu_buffer->reader_page;
   3914	struct buffer_page *head = rb_set_head_page(cpu_buffer);
   3915	struct buffer_page *commit = cpu_buffer->commit_page;
   3916
   3917	/* In case of error, head will be NULL */
   3918	if (unlikely(!head))
   3919		return true;
   3920
   3921	/* Reader should exhaust content in reader page */
   3922	if (reader->read != rb_page_commit(reader))
   3923		return false;
   3924
   3925	/*
   3926	 * If writers are committing on the reader page, knowing all
   3927	 * committed content has been read, the ring buffer is empty.
   3928	 */
   3929	if (commit == reader)
   3930		return true;
   3931
   3932	/*
   3933	 * If writers are committing on a page other than reader page
   3934	 * and head page, there should always be content to read.
   3935	 */
   3936	if (commit != head)
   3937		return false;
   3938
   3939	/*
   3940	 * Writers are committing on the head page, we just need
   3941	 * to care about there're committed data, and the reader will
   3942	 * swap reader page with head page when it is to read data.
   3943	 */
   3944	return rb_page_commit(commit) == 0;
   3945}
   3946
   3947/**
   3948 * ring_buffer_record_disable - stop all writes into the buffer
   3949 * @buffer: The ring buffer to stop writes to.
   3950 *
   3951 * This prevents all writes to the buffer. Any attempt to write
   3952 * to the buffer after this will fail and return NULL.
   3953 *
   3954 * The caller should call synchronize_rcu() after this.
   3955 */
   3956void ring_buffer_record_disable(struct trace_buffer *buffer)
   3957{
   3958	atomic_inc(&buffer->record_disabled);
   3959}
   3960EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
   3961
   3962/**
   3963 * ring_buffer_record_enable - enable writes to the buffer
   3964 * @buffer: The ring buffer to enable writes
   3965 *
   3966 * Note, multiple disables will need the same number of enables
   3967 * to truly enable the writing (much like preempt_disable).
   3968 */
   3969void ring_buffer_record_enable(struct trace_buffer *buffer)
   3970{
   3971	atomic_dec(&buffer->record_disabled);
   3972}
   3973EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
   3974
   3975/**
   3976 * ring_buffer_record_off - stop all writes into the buffer
   3977 * @buffer: The ring buffer to stop writes to.
   3978 *
   3979 * This prevents all writes to the buffer. Any attempt to write
   3980 * to the buffer after this will fail and return NULL.
   3981 *
   3982 * This is different than ring_buffer_record_disable() as
   3983 * it works like an on/off switch, where as the disable() version
   3984 * must be paired with a enable().
   3985 */
   3986void ring_buffer_record_off(struct trace_buffer *buffer)
   3987{
   3988	unsigned int rd;
   3989	unsigned int new_rd;
   3990
   3991	do {
   3992		rd = atomic_read(&buffer->record_disabled);
   3993		new_rd = rd | RB_BUFFER_OFF;
   3994	} while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
   3995}
   3996EXPORT_SYMBOL_GPL(ring_buffer_record_off);
   3997
   3998/**
   3999 * ring_buffer_record_on - restart writes into the buffer
   4000 * @buffer: The ring buffer to start writes to.
   4001 *
   4002 * This enables all writes to the buffer that was disabled by
   4003 * ring_buffer_record_off().
   4004 *
   4005 * This is different than ring_buffer_record_enable() as
   4006 * it works like an on/off switch, where as the enable() version
   4007 * must be paired with a disable().
   4008 */
   4009void ring_buffer_record_on(struct trace_buffer *buffer)
   4010{
   4011	unsigned int rd;
   4012	unsigned int new_rd;
   4013
   4014	do {
   4015		rd = atomic_read(&buffer->record_disabled);
   4016		new_rd = rd & ~RB_BUFFER_OFF;
   4017	} while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
   4018}
   4019EXPORT_SYMBOL_GPL(ring_buffer_record_on);
   4020
   4021/**
   4022 * ring_buffer_record_is_on - return true if the ring buffer can write
   4023 * @buffer: The ring buffer to see if write is enabled
   4024 *
   4025 * Returns true if the ring buffer is in a state that it accepts writes.
   4026 */
   4027bool ring_buffer_record_is_on(struct trace_buffer *buffer)
   4028{
   4029	return !atomic_read(&buffer->record_disabled);
   4030}
   4031
   4032/**
   4033 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
   4034 * @buffer: The ring buffer to see if write is set enabled
   4035 *
   4036 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
   4037 * Note that this does NOT mean it is in a writable state.
   4038 *
   4039 * It may return true when the ring buffer has been disabled by
   4040 * ring_buffer_record_disable(), as that is a temporary disabling of
   4041 * the ring buffer.
   4042 */
   4043bool ring_buffer_record_is_set_on(struct trace_buffer *buffer)
   4044{
   4045	return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
   4046}
   4047
   4048/**
   4049 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
   4050 * @buffer: The ring buffer to stop writes to.
   4051 * @cpu: The CPU buffer to stop
   4052 *
   4053 * This prevents all writes to the buffer. Any attempt to write
   4054 * to the buffer after this will fail and return NULL.
   4055 *
   4056 * The caller should call synchronize_rcu() after this.
   4057 */
   4058void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu)
   4059{
   4060	struct ring_buffer_per_cpu *cpu_buffer;
   4061
   4062	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   4063		return;
   4064
   4065	cpu_buffer = buffer->buffers[cpu];
   4066	atomic_inc(&cpu_buffer->record_disabled);
   4067}
   4068EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
   4069
   4070/**
   4071 * ring_buffer_record_enable_cpu - enable writes to the buffer
   4072 * @buffer: The ring buffer to enable writes
   4073 * @cpu: The CPU to enable.
   4074 *
   4075 * Note, multiple disables will need the same number of enables
   4076 * to truly enable the writing (much like preempt_disable).
   4077 */
   4078void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu)
   4079{
   4080	struct ring_buffer_per_cpu *cpu_buffer;
   4081
   4082	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   4083		return;
   4084
   4085	cpu_buffer = buffer->buffers[cpu];
   4086	atomic_dec(&cpu_buffer->record_disabled);
   4087}
   4088EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
   4089
   4090/*
   4091 * The total entries in the ring buffer is the running counter
   4092 * of entries entered into the ring buffer, minus the sum of
   4093 * the entries read from the ring buffer and the number of
   4094 * entries that were overwritten.
   4095 */
   4096static inline unsigned long
   4097rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
   4098{
   4099	return local_read(&cpu_buffer->entries) -
   4100		(local_read(&cpu_buffer->overrun) + cpu_buffer->read);
   4101}
   4102
   4103/**
   4104 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
   4105 * @buffer: The ring buffer
   4106 * @cpu: The per CPU buffer to read from.
   4107 */
   4108u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu)
   4109{
   4110	unsigned long flags;
   4111	struct ring_buffer_per_cpu *cpu_buffer;
   4112	struct buffer_page *bpage;
   4113	u64 ret = 0;
   4114
   4115	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   4116		return 0;
   4117
   4118	cpu_buffer = buffer->buffers[cpu];
   4119	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
   4120	/*
   4121	 * if the tail is on reader_page, oldest time stamp is on the reader
   4122	 * page
   4123	 */
   4124	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
   4125		bpage = cpu_buffer->reader_page;
   4126	else
   4127		bpage = rb_set_head_page(cpu_buffer);
   4128	if (bpage)
   4129		ret = bpage->page->time_stamp;
   4130	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
   4131
   4132	return ret;
   4133}
   4134EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
   4135
   4136/**
   4137 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer
   4138 * @buffer: The ring buffer
   4139 * @cpu: The per CPU buffer to read from.
   4140 */
   4141unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu)
   4142{
   4143	struct ring_buffer_per_cpu *cpu_buffer;
   4144	unsigned long ret;
   4145
   4146	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   4147		return 0;
   4148
   4149	cpu_buffer = buffer->buffers[cpu];
   4150	ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
   4151
   4152	return ret;
   4153}
   4154EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
   4155
   4156/**
   4157 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
   4158 * @buffer: The ring buffer
   4159 * @cpu: The per CPU buffer to get the entries from.
   4160 */
   4161unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu)
   4162{
   4163	struct ring_buffer_per_cpu *cpu_buffer;
   4164
   4165	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   4166		return 0;
   4167
   4168	cpu_buffer = buffer->buffers[cpu];
   4169
   4170	return rb_num_of_entries(cpu_buffer);
   4171}
   4172EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
   4173
   4174/**
   4175 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring
   4176 * buffer wrapping around (only if RB_FL_OVERWRITE is on).
   4177 * @buffer: The ring buffer
   4178 * @cpu: The per CPU buffer to get the number of overruns from
   4179 */
   4180unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu)
   4181{
   4182	struct ring_buffer_per_cpu *cpu_buffer;
   4183	unsigned long ret;
   4184
   4185	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   4186		return 0;
   4187
   4188	cpu_buffer = buffer->buffers[cpu];
   4189	ret = local_read(&cpu_buffer->overrun);
   4190
   4191	return ret;
   4192}
   4193EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
   4194
   4195/**
   4196 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by
   4197 * commits failing due to the buffer wrapping around while there are uncommitted
   4198 * events, such as during an interrupt storm.
   4199 * @buffer: The ring buffer
   4200 * @cpu: The per CPU buffer to get the number of overruns from
   4201 */
   4202unsigned long
   4203ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu)
   4204{
   4205	struct ring_buffer_per_cpu *cpu_buffer;
   4206	unsigned long ret;
   4207
   4208	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   4209		return 0;
   4210
   4211	cpu_buffer = buffer->buffers[cpu];
   4212	ret = local_read(&cpu_buffer->commit_overrun);
   4213
   4214	return ret;
   4215}
   4216EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
   4217
   4218/**
   4219 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by
   4220 * the ring buffer filling up (only if RB_FL_OVERWRITE is off).
   4221 * @buffer: The ring buffer
   4222 * @cpu: The per CPU buffer to get the number of overruns from
   4223 */
   4224unsigned long
   4225ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu)
   4226{
   4227	struct ring_buffer_per_cpu *cpu_buffer;
   4228	unsigned long ret;
   4229
   4230	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   4231		return 0;
   4232
   4233	cpu_buffer = buffer->buffers[cpu];
   4234	ret = local_read(&cpu_buffer->dropped_events);
   4235
   4236	return ret;
   4237}
   4238EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
   4239
   4240/**
   4241 * ring_buffer_read_events_cpu - get the number of events successfully read
   4242 * @buffer: The ring buffer
   4243 * @cpu: The per CPU buffer to get the number of events read
   4244 */
   4245unsigned long
   4246ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu)
   4247{
   4248	struct ring_buffer_per_cpu *cpu_buffer;
   4249
   4250	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   4251		return 0;
   4252
   4253	cpu_buffer = buffer->buffers[cpu];
   4254	return cpu_buffer->read;
   4255}
   4256EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
   4257
   4258/**
   4259 * ring_buffer_entries - get the number of entries in a buffer
   4260 * @buffer: The ring buffer
   4261 *
   4262 * Returns the total number of entries in the ring buffer
   4263 * (all CPU entries)
   4264 */
   4265unsigned long ring_buffer_entries(struct trace_buffer *buffer)
   4266{
   4267	struct ring_buffer_per_cpu *cpu_buffer;
   4268	unsigned long entries = 0;
   4269	int cpu;
   4270
   4271	/* if you care about this being correct, lock the buffer */
   4272	for_each_buffer_cpu(buffer, cpu) {
   4273		cpu_buffer = buffer->buffers[cpu];
   4274		entries += rb_num_of_entries(cpu_buffer);
   4275	}
   4276
   4277	return entries;
   4278}
   4279EXPORT_SYMBOL_GPL(ring_buffer_entries);
   4280
   4281/**
   4282 * ring_buffer_overruns - get the number of overruns in buffer
   4283 * @buffer: The ring buffer
   4284 *
   4285 * Returns the total number of overruns in the ring buffer
   4286 * (all CPU entries)
   4287 */
   4288unsigned long ring_buffer_overruns(struct trace_buffer *buffer)
   4289{
   4290	struct ring_buffer_per_cpu *cpu_buffer;
   4291	unsigned long overruns = 0;
   4292	int cpu;
   4293
   4294	/* if you care about this being correct, lock the buffer */
   4295	for_each_buffer_cpu(buffer, cpu) {
   4296		cpu_buffer = buffer->buffers[cpu];
   4297		overruns += local_read(&cpu_buffer->overrun);
   4298	}
   4299
   4300	return overruns;
   4301}
   4302EXPORT_SYMBOL_GPL(ring_buffer_overruns);
   4303
   4304static void rb_iter_reset(struct ring_buffer_iter *iter)
   4305{
   4306	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
   4307
   4308	/* Iterator usage is expected to have record disabled */
   4309	iter->head_page = cpu_buffer->reader_page;
   4310	iter->head = cpu_buffer->reader_page->read;
   4311	iter->next_event = iter->head;
   4312
   4313	iter->cache_reader_page = iter->head_page;
   4314	iter->cache_read = cpu_buffer->read;
   4315
   4316	if (iter->head) {
   4317		iter->read_stamp = cpu_buffer->read_stamp;
   4318		iter->page_stamp = cpu_buffer->reader_page->page->time_stamp;
   4319	} else {
   4320		iter->read_stamp = iter->head_page->page->time_stamp;
   4321		iter->page_stamp = iter->read_stamp;
   4322	}
   4323}
   4324
   4325/**
   4326 * ring_buffer_iter_reset - reset an iterator
   4327 * @iter: The iterator to reset
   4328 *
   4329 * Resets the iterator, so that it will start from the beginning
   4330 * again.
   4331 */
   4332void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
   4333{
   4334	struct ring_buffer_per_cpu *cpu_buffer;
   4335	unsigned long flags;
   4336
   4337	if (!iter)
   4338		return;
   4339
   4340	cpu_buffer = iter->cpu_buffer;
   4341
   4342	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
   4343	rb_iter_reset(iter);
   4344	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
   4345}
   4346EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
   4347
   4348/**
   4349 * ring_buffer_iter_empty - check if an iterator has no more to read
   4350 * @iter: The iterator to check
   4351 */
   4352int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
   4353{
   4354	struct ring_buffer_per_cpu *cpu_buffer;
   4355	struct buffer_page *reader;
   4356	struct buffer_page *head_page;
   4357	struct buffer_page *commit_page;
   4358	struct buffer_page *curr_commit_page;
   4359	unsigned commit;
   4360	u64 curr_commit_ts;
   4361	u64 commit_ts;
   4362
   4363	cpu_buffer = iter->cpu_buffer;
   4364	reader = cpu_buffer->reader_page;
   4365	head_page = cpu_buffer->head_page;
   4366	commit_page = cpu_buffer->commit_page;
   4367	commit_ts = commit_page->page->time_stamp;
   4368
   4369	/*
   4370	 * When the writer goes across pages, it issues a cmpxchg which
   4371	 * is a mb(), which will synchronize with the rmb here.
   4372	 * (see rb_tail_page_update())
   4373	 */
   4374	smp_rmb();
   4375	commit = rb_page_commit(commit_page);
   4376	/* We want to make sure that the commit page doesn't change */
   4377	smp_rmb();
   4378
   4379	/* Make sure commit page didn't change */
   4380	curr_commit_page = READ_ONCE(cpu_buffer->commit_page);
   4381	curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp);
   4382
   4383	/* If the commit page changed, then there's more data */
   4384	if (curr_commit_page != commit_page ||
   4385	    curr_commit_ts != commit_ts)
   4386		return 0;
   4387
   4388	/* Still racy, as it may return a false positive, but that's OK */
   4389	return ((iter->head_page == commit_page && iter->head >= commit) ||
   4390		(iter->head_page == reader && commit_page == head_page &&
   4391		 head_page->read == commit &&
   4392		 iter->head == rb_page_commit(cpu_buffer->reader_page)));
   4393}
   4394EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
   4395
   4396static void
   4397rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
   4398		     struct ring_buffer_event *event)
   4399{
   4400	u64 delta;
   4401
   4402	switch (event->type_len) {
   4403	case RINGBUF_TYPE_PADDING:
   4404		return;
   4405
   4406	case RINGBUF_TYPE_TIME_EXTEND:
   4407		delta = rb_event_time_stamp(event);
   4408		cpu_buffer->read_stamp += delta;
   4409		return;
   4410
   4411	case RINGBUF_TYPE_TIME_STAMP:
   4412		delta = rb_event_time_stamp(event);
   4413		delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp);
   4414		cpu_buffer->read_stamp = delta;
   4415		return;
   4416
   4417	case RINGBUF_TYPE_DATA:
   4418		cpu_buffer->read_stamp += event->time_delta;
   4419		return;
   4420
   4421	default:
   4422		RB_WARN_ON(cpu_buffer, 1);
   4423	}
   4424	return;
   4425}
   4426
   4427static void
   4428rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
   4429			  struct ring_buffer_event *event)
   4430{
   4431	u64 delta;
   4432
   4433	switch (event->type_len) {
   4434	case RINGBUF_TYPE_PADDING:
   4435		return;
   4436
   4437	case RINGBUF_TYPE_TIME_EXTEND:
   4438		delta = rb_event_time_stamp(event);
   4439		iter->read_stamp += delta;
   4440		return;
   4441
   4442	case RINGBUF_TYPE_TIME_STAMP:
   4443		delta = rb_event_time_stamp(event);
   4444		delta = rb_fix_abs_ts(delta, iter->read_stamp);
   4445		iter->read_stamp = delta;
   4446		return;
   4447
   4448	case RINGBUF_TYPE_DATA:
   4449		iter->read_stamp += event->time_delta;
   4450		return;
   4451
   4452	default:
   4453		RB_WARN_ON(iter->cpu_buffer, 1);
   4454	}
   4455	return;
   4456}
   4457
   4458static struct buffer_page *
   4459rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
   4460{
   4461	struct buffer_page *reader = NULL;
   4462	unsigned long overwrite;
   4463	unsigned long flags;
   4464	int nr_loops = 0;
   4465	int ret;
   4466
   4467	local_irq_save(flags);
   4468	arch_spin_lock(&cpu_buffer->lock);
   4469
   4470 again:
   4471	/*
   4472	 * This should normally only loop twice. But because the
   4473	 * start of the reader inserts an empty page, it causes
   4474	 * a case where we will loop three times. There should be no
   4475	 * reason to loop four times (that I know of).
   4476	 */
   4477	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
   4478		reader = NULL;
   4479		goto out;
   4480	}
   4481
   4482	reader = cpu_buffer->reader_page;
   4483
   4484	/* If there's more to read, return this page */
   4485	if (cpu_buffer->reader_page->read < rb_page_size(reader))
   4486		goto out;
   4487
   4488	/* Never should we have an index greater than the size */
   4489	if (RB_WARN_ON(cpu_buffer,
   4490		       cpu_buffer->reader_page->read > rb_page_size(reader)))
   4491		goto out;
   4492
   4493	/* check if we caught up to the tail */
   4494	reader = NULL;
   4495	if (cpu_buffer->commit_page == cpu_buffer->reader_page)
   4496		goto out;
   4497
   4498	/* Don't bother swapping if the ring buffer is empty */
   4499	if (rb_num_of_entries(cpu_buffer) == 0)
   4500		goto out;
   4501
   4502	/*
   4503	 * Reset the reader page to size zero.
   4504	 */
   4505	local_set(&cpu_buffer->reader_page->write, 0);
   4506	local_set(&cpu_buffer->reader_page->entries, 0);
   4507	local_set(&cpu_buffer->reader_page->page->commit, 0);
   4508	cpu_buffer->reader_page->real_end = 0;
   4509
   4510 spin:
   4511	/*
   4512	 * Splice the empty reader page into the list around the head.
   4513	 */
   4514	reader = rb_set_head_page(cpu_buffer);
   4515	if (!reader)
   4516		goto out;
   4517	cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
   4518	cpu_buffer->reader_page->list.prev = reader->list.prev;
   4519
   4520	/*
   4521	 * cpu_buffer->pages just needs to point to the buffer, it
   4522	 *  has no specific buffer page to point to. Lets move it out
   4523	 *  of our way so we don't accidentally swap it.
   4524	 */
   4525	cpu_buffer->pages = reader->list.prev;
   4526
   4527	/* The reader page will be pointing to the new head */
   4528	rb_set_list_to_head(&cpu_buffer->reader_page->list);
   4529
   4530	/*
   4531	 * We want to make sure we read the overruns after we set up our
   4532	 * pointers to the next object. The writer side does a
   4533	 * cmpxchg to cross pages which acts as the mb on the writer
   4534	 * side. Note, the reader will constantly fail the swap
   4535	 * while the writer is updating the pointers, so this
   4536	 * guarantees that the overwrite recorded here is the one we
   4537	 * want to compare with the last_overrun.
   4538	 */
   4539	smp_mb();
   4540	overwrite = local_read(&(cpu_buffer->overrun));
   4541
   4542	/*
   4543	 * Here's the tricky part.
   4544	 *
   4545	 * We need to move the pointer past the header page.
   4546	 * But we can only do that if a writer is not currently
   4547	 * moving it. The page before the header page has the
   4548	 * flag bit '1' set if it is pointing to the page we want.
   4549	 * but if the writer is in the process of moving it
   4550	 * than it will be '2' or already moved '0'.
   4551	 */
   4552
   4553	ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
   4554
   4555	/*
   4556	 * If we did not convert it, then we must try again.
   4557	 */
   4558	if (!ret)
   4559		goto spin;
   4560
   4561	/*
   4562	 * Yay! We succeeded in replacing the page.
   4563	 *
   4564	 * Now make the new head point back to the reader page.
   4565	 */
   4566	rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
   4567	rb_inc_page(&cpu_buffer->head_page);
   4568
   4569	local_inc(&cpu_buffer->pages_read);
   4570
   4571	/* Finally update the reader page to the new head */
   4572	cpu_buffer->reader_page = reader;
   4573	cpu_buffer->reader_page->read = 0;
   4574
   4575	if (overwrite != cpu_buffer->last_overrun) {
   4576		cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
   4577		cpu_buffer->last_overrun = overwrite;
   4578	}
   4579
   4580	goto again;
   4581
   4582 out:
   4583	/* Update the read_stamp on the first event */
   4584	if (reader && reader->read == 0)
   4585		cpu_buffer->read_stamp = reader->page->time_stamp;
   4586
   4587	arch_spin_unlock(&cpu_buffer->lock);
   4588	local_irq_restore(flags);
   4589
   4590	return reader;
   4591}
   4592
   4593static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
   4594{
   4595	struct ring_buffer_event *event;
   4596	struct buffer_page *reader;
   4597	unsigned length;
   4598
   4599	reader = rb_get_reader_page(cpu_buffer);
   4600
   4601	/* This function should not be called when buffer is empty */
   4602	if (RB_WARN_ON(cpu_buffer, !reader))
   4603		return;
   4604
   4605	event = rb_reader_event(cpu_buffer);
   4606
   4607	if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
   4608		cpu_buffer->read++;
   4609
   4610	rb_update_read_stamp(cpu_buffer, event);
   4611
   4612	length = rb_event_length(event);
   4613	cpu_buffer->reader_page->read += length;
   4614}
   4615
   4616static void rb_advance_iter(struct ring_buffer_iter *iter)
   4617{
   4618	struct ring_buffer_per_cpu *cpu_buffer;
   4619
   4620	cpu_buffer = iter->cpu_buffer;
   4621
   4622	/* If head == next_event then we need to jump to the next event */
   4623	if (iter->head == iter->next_event) {
   4624		/* If the event gets overwritten again, there's nothing to do */
   4625		if (rb_iter_head_event(iter) == NULL)
   4626			return;
   4627	}
   4628
   4629	iter->head = iter->next_event;
   4630
   4631	/*
   4632	 * Check if we are at the end of the buffer.
   4633	 */
   4634	if (iter->next_event >= rb_page_size(iter->head_page)) {
   4635		/* discarded commits can make the page empty */
   4636		if (iter->head_page == cpu_buffer->commit_page)
   4637			return;
   4638		rb_inc_iter(iter);
   4639		return;
   4640	}
   4641
   4642	rb_update_iter_read_stamp(iter, iter->event);
   4643}
   4644
   4645static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
   4646{
   4647	return cpu_buffer->lost_events;
   4648}
   4649
   4650static struct ring_buffer_event *
   4651rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
   4652	       unsigned long *lost_events)
   4653{
   4654	struct ring_buffer_event *event;
   4655	struct buffer_page *reader;
   4656	int nr_loops = 0;
   4657
   4658	if (ts)
   4659		*ts = 0;
   4660 again:
   4661	/*
   4662	 * We repeat when a time extend is encountered.
   4663	 * Since the time extend is always attached to a data event,
   4664	 * we should never loop more than once.
   4665	 * (We never hit the following condition more than twice).
   4666	 */
   4667	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
   4668		return NULL;
   4669
   4670	reader = rb_get_reader_page(cpu_buffer);
   4671	if (!reader)
   4672		return NULL;
   4673
   4674	event = rb_reader_event(cpu_buffer);
   4675
   4676	switch (event->type_len) {
   4677	case RINGBUF_TYPE_PADDING:
   4678		if (rb_null_event(event))
   4679			RB_WARN_ON(cpu_buffer, 1);
   4680		/*
   4681		 * Because the writer could be discarding every
   4682		 * event it creates (which would probably be bad)
   4683		 * if we were to go back to "again" then we may never
   4684		 * catch up, and will trigger the warn on, or lock
   4685		 * the box. Return the padding, and we will release
   4686		 * the current locks, and try again.
   4687		 */
   4688		return event;
   4689
   4690	case RINGBUF_TYPE_TIME_EXTEND:
   4691		/* Internal data, OK to advance */
   4692		rb_advance_reader(cpu_buffer);
   4693		goto again;
   4694
   4695	case RINGBUF_TYPE_TIME_STAMP:
   4696		if (ts) {
   4697			*ts = rb_event_time_stamp(event);
   4698			*ts = rb_fix_abs_ts(*ts, reader->page->time_stamp);
   4699			ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
   4700							 cpu_buffer->cpu, ts);
   4701		}
   4702		/* Internal data, OK to advance */
   4703		rb_advance_reader(cpu_buffer);
   4704		goto again;
   4705
   4706	case RINGBUF_TYPE_DATA:
   4707		if (ts && !(*ts)) {
   4708			*ts = cpu_buffer->read_stamp + event->time_delta;
   4709			ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
   4710							 cpu_buffer->cpu, ts);
   4711		}
   4712		if (lost_events)
   4713			*lost_events = rb_lost_events(cpu_buffer);
   4714		return event;
   4715
   4716	default:
   4717		RB_WARN_ON(cpu_buffer, 1);
   4718	}
   4719
   4720	return NULL;
   4721}
   4722EXPORT_SYMBOL_GPL(ring_buffer_peek);
   4723
   4724static struct ring_buffer_event *
   4725rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
   4726{
   4727	struct trace_buffer *buffer;
   4728	struct ring_buffer_per_cpu *cpu_buffer;
   4729	struct ring_buffer_event *event;
   4730	int nr_loops = 0;
   4731
   4732	if (ts)
   4733		*ts = 0;
   4734
   4735	cpu_buffer = iter->cpu_buffer;
   4736	buffer = cpu_buffer->buffer;
   4737
   4738	/*
   4739	 * Check if someone performed a consuming read to
   4740	 * the buffer. A consuming read invalidates the iterator
   4741	 * and we need to reset the iterator in this case.
   4742	 */
   4743	if (unlikely(iter->cache_read != cpu_buffer->read ||
   4744		     iter->cache_reader_page != cpu_buffer->reader_page))
   4745		rb_iter_reset(iter);
   4746
   4747 again:
   4748	if (ring_buffer_iter_empty(iter))
   4749		return NULL;
   4750
   4751	/*
   4752	 * As the writer can mess with what the iterator is trying
   4753	 * to read, just give up if we fail to get an event after
   4754	 * three tries. The iterator is not as reliable when reading
   4755	 * the ring buffer with an active write as the consumer is.
   4756	 * Do not warn if the three failures is reached.
   4757	 */
   4758	if (++nr_loops > 3)
   4759		return NULL;
   4760
   4761	if (rb_per_cpu_empty(cpu_buffer))
   4762		return NULL;
   4763
   4764	if (iter->head >= rb_page_size(iter->head_page)) {
   4765		rb_inc_iter(iter);
   4766		goto again;
   4767	}
   4768
   4769	event = rb_iter_head_event(iter);
   4770	if (!event)
   4771		goto again;
   4772
   4773	switch (event->type_len) {
   4774	case RINGBUF_TYPE_PADDING:
   4775		if (rb_null_event(event)) {
   4776			rb_inc_iter(iter);
   4777			goto again;
   4778		}
   4779		rb_advance_iter(iter);
   4780		return event;
   4781
   4782	case RINGBUF_TYPE_TIME_EXTEND:
   4783		/* Internal data, OK to advance */
   4784		rb_advance_iter(iter);
   4785		goto again;
   4786
   4787	case RINGBUF_TYPE_TIME_STAMP:
   4788		if (ts) {
   4789			*ts = rb_event_time_stamp(event);
   4790			*ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp);
   4791			ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
   4792							 cpu_buffer->cpu, ts);
   4793		}
   4794		/* Internal data, OK to advance */
   4795		rb_advance_iter(iter);
   4796		goto again;
   4797
   4798	case RINGBUF_TYPE_DATA:
   4799		if (ts && !(*ts)) {
   4800			*ts = iter->read_stamp + event->time_delta;
   4801			ring_buffer_normalize_time_stamp(buffer,
   4802							 cpu_buffer->cpu, ts);
   4803		}
   4804		return event;
   4805
   4806	default:
   4807		RB_WARN_ON(cpu_buffer, 1);
   4808	}
   4809
   4810	return NULL;
   4811}
   4812EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
   4813
   4814static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
   4815{
   4816	if (likely(!in_nmi())) {
   4817		raw_spin_lock(&cpu_buffer->reader_lock);
   4818		return true;
   4819	}
   4820
   4821	/*
   4822	 * If an NMI die dumps out the content of the ring buffer
   4823	 * trylock must be used to prevent a deadlock if the NMI
   4824	 * preempted a task that holds the ring buffer locks. If
   4825	 * we get the lock then all is fine, if not, then continue
   4826	 * to do the read, but this can corrupt the ring buffer,
   4827	 * so it must be permanently disabled from future writes.
   4828	 * Reading from NMI is a oneshot deal.
   4829	 */
   4830	if (raw_spin_trylock(&cpu_buffer->reader_lock))
   4831		return true;
   4832
   4833	/* Continue without locking, but disable the ring buffer */
   4834	atomic_inc(&cpu_buffer->record_disabled);
   4835	return false;
   4836}
   4837
   4838static inline void
   4839rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
   4840{
   4841	if (likely(locked))
   4842		raw_spin_unlock(&cpu_buffer->reader_lock);
   4843	return;
   4844}
   4845
   4846/**
   4847 * ring_buffer_peek - peek at the next event to be read
   4848 * @buffer: The ring buffer to read
   4849 * @cpu: The cpu to peak at
   4850 * @ts: The timestamp counter of this event.
   4851 * @lost_events: a variable to store if events were lost (may be NULL)
   4852 *
   4853 * This will return the event that will be read next, but does
   4854 * not consume the data.
   4855 */
   4856struct ring_buffer_event *
   4857ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts,
   4858		 unsigned long *lost_events)
   4859{
   4860	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
   4861	struct ring_buffer_event *event;
   4862	unsigned long flags;
   4863	bool dolock;
   4864
   4865	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   4866		return NULL;
   4867
   4868 again:
   4869	local_irq_save(flags);
   4870	dolock = rb_reader_lock(cpu_buffer);
   4871	event = rb_buffer_peek(cpu_buffer, ts, lost_events);
   4872	if (event && event->type_len == RINGBUF_TYPE_PADDING)
   4873		rb_advance_reader(cpu_buffer);
   4874	rb_reader_unlock(cpu_buffer, dolock);
   4875	local_irq_restore(flags);
   4876
   4877	if (event && event->type_len == RINGBUF_TYPE_PADDING)
   4878		goto again;
   4879
   4880	return event;
   4881}
   4882
   4883/** ring_buffer_iter_dropped - report if there are dropped events
   4884 * @iter: The ring buffer iterator
   4885 *
   4886 * Returns true if there was dropped events since the last peek.
   4887 */
   4888bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter)
   4889{
   4890	bool ret = iter->missed_events != 0;
   4891
   4892	iter->missed_events = 0;
   4893	return ret;
   4894}
   4895EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped);
   4896
   4897/**
   4898 * ring_buffer_iter_peek - peek at the next event to be read
   4899 * @iter: The ring buffer iterator
   4900 * @ts: The timestamp counter of this event.
   4901 *
   4902 * This will return the event that will be read next, but does
   4903 * not increment the iterator.
   4904 */
   4905struct ring_buffer_event *
   4906ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
   4907{
   4908	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
   4909	struct ring_buffer_event *event;
   4910	unsigned long flags;
   4911
   4912 again:
   4913	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
   4914	event = rb_iter_peek(iter, ts);
   4915	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
   4916
   4917	if (event && event->type_len == RINGBUF_TYPE_PADDING)
   4918		goto again;
   4919
   4920	return event;
   4921}
   4922
   4923/**
   4924 * ring_buffer_consume - return an event and consume it
   4925 * @buffer: The ring buffer to get the next event from
   4926 * @cpu: the cpu to read the buffer from
   4927 * @ts: a variable to store the timestamp (may be NULL)
   4928 * @lost_events: a variable to store if events were lost (may be NULL)
   4929 *
   4930 * Returns the next event in the ring buffer, and that event is consumed.
   4931 * Meaning, that sequential reads will keep returning a different event,
   4932 * and eventually empty the ring buffer if the producer is slower.
   4933 */
   4934struct ring_buffer_event *
   4935ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts,
   4936		    unsigned long *lost_events)
   4937{
   4938	struct ring_buffer_per_cpu *cpu_buffer;
   4939	struct ring_buffer_event *event = NULL;
   4940	unsigned long flags;
   4941	bool dolock;
   4942
   4943 again:
   4944	/* might be called in atomic */
   4945	preempt_disable();
   4946
   4947	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   4948		goto out;
   4949
   4950	cpu_buffer = buffer->buffers[cpu];
   4951	local_irq_save(flags);
   4952	dolock = rb_reader_lock(cpu_buffer);
   4953
   4954	event = rb_buffer_peek(cpu_buffer, ts, lost_events);
   4955	if (event) {
   4956		cpu_buffer->lost_events = 0;
   4957		rb_advance_reader(cpu_buffer);
   4958	}
   4959
   4960	rb_reader_unlock(cpu_buffer, dolock);
   4961	local_irq_restore(flags);
   4962
   4963 out:
   4964	preempt_enable();
   4965
   4966	if (event && event->type_len == RINGBUF_TYPE_PADDING)
   4967		goto again;
   4968
   4969	return event;
   4970}
   4971EXPORT_SYMBOL_GPL(ring_buffer_consume);
   4972
   4973/**
   4974 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
   4975 * @buffer: The ring buffer to read from
   4976 * @cpu: The cpu buffer to iterate over
   4977 * @flags: gfp flags to use for memory allocation
   4978 *
   4979 * This performs the initial preparations necessary to iterate
   4980 * through the buffer.  Memory is allocated, buffer recording
   4981 * is disabled, and the iterator pointer is returned to the caller.
   4982 *
   4983 * Disabling buffer recording prevents the reading from being
   4984 * corrupted. This is not a consuming read, so a producer is not
   4985 * expected.
   4986 *
   4987 * After a sequence of ring_buffer_read_prepare calls, the user is
   4988 * expected to make at least one call to ring_buffer_read_prepare_sync.
   4989 * Afterwards, ring_buffer_read_start is invoked to get things going
   4990 * for real.
   4991 *
   4992 * This overall must be paired with ring_buffer_read_finish.
   4993 */
   4994struct ring_buffer_iter *
   4995ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
   4996{
   4997	struct ring_buffer_per_cpu *cpu_buffer;
   4998	struct ring_buffer_iter *iter;
   4999
   5000	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   5001		return NULL;
   5002
   5003	iter = kzalloc(sizeof(*iter), flags);
   5004	if (!iter)
   5005		return NULL;
   5006
   5007	iter->event = kmalloc(BUF_MAX_DATA_SIZE, flags);
   5008	if (!iter->event) {
   5009		kfree(iter);
   5010		return NULL;
   5011	}
   5012
   5013	cpu_buffer = buffer->buffers[cpu];
   5014
   5015	iter->cpu_buffer = cpu_buffer;
   5016
   5017	atomic_inc(&cpu_buffer->resize_disabled);
   5018
   5019	return iter;
   5020}
   5021EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
   5022
   5023/**
   5024 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
   5025 *
   5026 * All previously invoked ring_buffer_read_prepare calls to prepare
   5027 * iterators will be synchronized.  Afterwards, read_buffer_read_start
   5028 * calls on those iterators are allowed.
   5029 */
   5030void
   5031ring_buffer_read_prepare_sync(void)
   5032{
   5033	synchronize_rcu();
   5034}
   5035EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
   5036
   5037/**
   5038 * ring_buffer_read_start - start a non consuming read of the buffer
   5039 * @iter: The iterator returned by ring_buffer_read_prepare
   5040 *
   5041 * This finalizes the startup of an iteration through the buffer.
   5042 * The iterator comes from a call to ring_buffer_read_prepare and
   5043 * an intervening ring_buffer_read_prepare_sync must have been
   5044 * performed.
   5045 *
   5046 * Must be paired with ring_buffer_read_finish.
   5047 */
   5048void
   5049ring_buffer_read_start(struct ring_buffer_iter *iter)
   5050{
   5051	struct ring_buffer_per_cpu *cpu_buffer;
   5052	unsigned long flags;
   5053
   5054	if (!iter)
   5055		return;
   5056
   5057	cpu_buffer = iter->cpu_buffer;
   5058
   5059	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
   5060	arch_spin_lock(&cpu_buffer->lock);
   5061	rb_iter_reset(iter);
   5062	arch_spin_unlock(&cpu_buffer->lock);
   5063	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
   5064}
   5065EXPORT_SYMBOL_GPL(ring_buffer_read_start);
   5066
   5067/**
   5068 * ring_buffer_read_finish - finish reading the iterator of the buffer
   5069 * @iter: The iterator retrieved by ring_buffer_start
   5070 *
   5071 * This re-enables the recording to the buffer, and frees the
   5072 * iterator.
   5073 */
   5074void
   5075ring_buffer_read_finish(struct ring_buffer_iter *iter)
   5076{
   5077	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
   5078	unsigned long flags;
   5079
   5080	/*
   5081	 * Ring buffer is disabled from recording, here's a good place
   5082	 * to check the integrity of the ring buffer.
   5083	 * Must prevent readers from trying to read, as the check
   5084	 * clears the HEAD page and readers require it.
   5085	 */
   5086	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
   5087	rb_check_pages(cpu_buffer);
   5088	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
   5089
   5090	atomic_dec(&cpu_buffer->resize_disabled);
   5091	kfree(iter->event);
   5092	kfree(iter);
   5093}
   5094EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
   5095
   5096/**
   5097 * ring_buffer_iter_advance - advance the iterator to the next location
   5098 * @iter: The ring buffer iterator
   5099 *
   5100 * Move the location of the iterator such that the next read will
   5101 * be the next location of the iterator.
   5102 */
   5103void ring_buffer_iter_advance(struct ring_buffer_iter *iter)
   5104{
   5105	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
   5106	unsigned long flags;
   5107
   5108	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
   5109
   5110	rb_advance_iter(iter);
   5111
   5112	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
   5113}
   5114EXPORT_SYMBOL_GPL(ring_buffer_iter_advance);
   5115
   5116/**
   5117 * ring_buffer_size - return the size of the ring buffer (in bytes)
   5118 * @buffer: The ring buffer.
   5119 * @cpu: The CPU to get ring buffer size from.
   5120 */
   5121unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu)
   5122{
   5123	/*
   5124	 * Earlier, this method returned
   5125	 *	BUF_PAGE_SIZE * buffer->nr_pages
   5126	 * Since the nr_pages field is now removed, we have converted this to
   5127	 * return the per cpu buffer value.
   5128	 */
   5129	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   5130		return 0;
   5131
   5132	return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages;
   5133}
   5134EXPORT_SYMBOL_GPL(ring_buffer_size);
   5135
   5136static void
   5137rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
   5138{
   5139	rb_head_page_deactivate(cpu_buffer);
   5140
   5141	cpu_buffer->head_page
   5142		= list_entry(cpu_buffer->pages, struct buffer_page, list);
   5143	local_set(&cpu_buffer->head_page->write, 0);
   5144	local_set(&cpu_buffer->head_page->entries, 0);
   5145	local_set(&cpu_buffer->head_page->page->commit, 0);
   5146
   5147	cpu_buffer->head_page->read = 0;
   5148
   5149	cpu_buffer->tail_page = cpu_buffer->head_page;
   5150	cpu_buffer->commit_page = cpu_buffer->head_page;
   5151
   5152	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
   5153	INIT_LIST_HEAD(&cpu_buffer->new_pages);
   5154	local_set(&cpu_buffer->reader_page->write, 0);
   5155	local_set(&cpu_buffer->reader_page->entries, 0);
   5156	local_set(&cpu_buffer->reader_page->page->commit, 0);
   5157	cpu_buffer->reader_page->read = 0;
   5158
   5159	local_set(&cpu_buffer->entries_bytes, 0);
   5160	local_set(&cpu_buffer->overrun, 0);
   5161	local_set(&cpu_buffer->commit_overrun, 0);
   5162	local_set(&cpu_buffer->dropped_events, 0);
   5163	local_set(&cpu_buffer->entries, 0);
   5164	local_set(&cpu_buffer->committing, 0);
   5165	local_set(&cpu_buffer->commits, 0);
   5166	local_set(&cpu_buffer->pages_touched, 0);
   5167	local_set(&cpu_buffer->pages_read, 0);
   5168	cpu_buffer->last_pages_touch = 0;
   5169	cpu_buffer->shortest_full = 0;
   5170	cpu_buffer->read = 0;
   5171	cpu_buffer->read_bytes = 0;
   5172
   5173	rb_time_set(&cpu_buffer->write_stamp, 0);
   5174	rb_time_set(&cpu_buffer->before_stamp, 0);
   5175
   5176	memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp));
   5177
   5178	cpu_buffer->lost_events = 0;
   5179	cpu_buffer->last_overrun = 0;
   5180
   5181	rb_head_page_activate(cpu_buffer);
   5182}
   5183
   5184/* Must have disabled the cpu buffer then done a synchronize_rcu */
   5185static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
   5186{
   5187	unsigned long flags;
   5188
   5189	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
   5190
   5191	if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
   5192		goto out;
   5193
   5194	arch_spin_lock(&cpu_buffer->lock);
   5195
   5196	rb_reset_cpu(cpu_buffer);
   5197
   5198	arch_spin_unlock(&cpu_buffer->lock);
   5199
   5200 out:
   5201	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
   5202}
   5203
   5204/**
   5205 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
   5206 * @buffer: The ring buffer to reset a per cpu buffer of
   5207 * @cpu: The CPU buffer to be reset
   5208 */
   5209void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu)
   5210{
   5211	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
   5212
   5213	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   5214		return;
   5215
   5216	/* prevent another thread from changing buffer sizes */
   5217	mutex_lock(&buffer->mutex);
   5218
   5219	atomic_inc(&cpu_buffer->resize_disabled);
   5220	atomic_inc(&cpu_buffer->record_disabled);
   5221
   5222	/* Make sure all commits have finished */
   5223	synchronize_rcu();
   5224
   5225	reset_disabled_cpu_buffer(cpu_buffer);
   5226
   5227	atomic_dec(&cpu_buffer->record_disabled);
   5228	atomic_dec(&cpu_buffer->resize_disabled);
   5229
   5230	mutex_unlock(&buffer->mutex);
   5231}
   5232EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
   5233
   5234/**
   5235 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
   5236 * @buffer: The ring buffer to reset a per cpu buffer of
   5237 * @cpu: The CPU buffer to be reset
   5238 */
   5239void ring_buffer_reset_online_cpus(struct trace_buffer *buffer)
   5240{
   5241	struct ring_buffer_per_cpu *cpu_buffer;
   5242	int cpu;
   5243
   5244	/* prevent another thread from changing buffer sizes */
   5245	mutex_lock(&buffer->mutex);
   5246
   5247	for_each_online_buffer_cpu(buffer, cpu) {
   5248		cpu_buffer = buffer->buffers[cpu];
   5249
   5250		atomic_inc(&cpu_buffer->resize_disabled);
   5251		atomic_inc(&cpu_buffer->record_disabled);
   5252	}
   5253
   5254	/* Make sure all commits have finished */
   5255	synchronize_rcu();
   5256
   5257	for_each_online_buffer_cpu(buffer, cpu) {
   5258		cpu_buffer = buffer->buffers[cpu];
   5259
   5260		reset_disabled_cpu_buffer(cpu_buffer);
   5261
   5262		atomic_dec(&cpu_buffer->record_disabled);
   5263		atomic_dec(&cpu_buffer->resize_disabled);
   5264	}
   5265
   5266	mutex_unlock(&buffer->mutex);
   5267}
   5268
   5269/**
   5270 * ring_buffer_reset - reset a ring buffer
   5271 * @buffer: The ring buffer to reset all cpu buffers
   5272 */
   5273void ring_buffer_reset(struct trace_buffer *buffer)
   5274{
   5275	struct ring_buffer_per_cpu *cpu_buffer;
   5276	int cpu;
   5277
   5278	/* prevent another thread from changing buffer sizes */
   5279	mutex_lock(&buffer->mutex);
   5280
   5281	for_each_buffer_cpu(buffer, cpu) {
   5282		cpu_buffer = buffer->buffers[cpu];
   5283
   5284		atomic_inc(&cpu_buffer->resize_disabled);
   5285		atomic_inc(&cpu_buffer->record_disabled);
   5286	}
   5287
   5288	/* Make sure all commits have finished */
   5289	synchronize_rcu();
   5290
   5291	for_each_buffer_cpu(buffer, cpu) {
   5292		cpu_buffer = buffer->buffers[cpu];
   5293
   5294		reset_disabled_cpu_buffer(cpu_buffer);
   5295
   5296		atomic_dec(&cpu_buffer->record_disabled);
   5297		atomic_dec(&cpu_buffer->resize_disabled);
   5298	}
   5299
   5300	mutex_unlock(&buffer->mutex);
   5301}
   5302EXPORT_SYMBOL_GPL(ring_buffer_reset);
   5303
   5304/**
   5305 * rind_buffer_empty - is the ring buffer empty?
   5306 * @buffer: The ring buffer to test
   5307 */
   5308bool ring_buffer_empty(struct trace_buffer *buffer)
   5309{
   5310	struct ring_buffer_per_cpu *cpu_buffer;
   5311	unsigned long flags;
   5312	bool dolock;
   5313	int cpu;
   5314	int ret;
   5315
   5316	/* yes this is racy, but if you don't like the race, lock the buffer */
   5317	for_each_buffer_cpu(buffer, cpu) {
   5318		cpu_buffer = buffer->buffers[cpu];
   5319		local_irq_save(flags);
   5320		dolock = rb_reader_lock(cpu_buffer);
   5321		ret = rb_per_cpu_empty(cpu_buffer);
   5322		rb_reader_unlock(cpu_buffer, dolock);
   5323		local_irq_restore(flags);
   5324
   5325		if (!ret)
   5326			return false;
   5327	}
   5328
   5329	return true;
   5330}
   5331EXPORT_SYMBOL_GPL(ring_buffer_empty);
   5332
   5333/**
   5334 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
   5335 * @buffer: The ring buffer
   5336 * @cpu: The CPU buffer to test
   5337 */
   5338bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu)
   5339{
   5340	struct ring_buffer_per_cpu *cpu_buffer;
   5341	unsigned long flags;
   5342	bool dolock;
   5343	int ret;
   5344
   5345	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   5346		return true;
   5347
   5348	cpu_buffer = buffer->buffers[cpu];
   5349	local_irq_save(flags);
   5350	dolock = rb_reader_lock(cpu_buffer);
   5351	ret = rb_per_cpu_empty(cpu_buffer);
   5352	rb_reader_unlock(cpu_buffer, dolock);
   5353	local_irq_restore(flags);
   5354
   5355	return ret;
   5356}
   5357EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
   5358
   5359#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
   5360/**
   5361 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
   5362 * @buffer_a: One buffer to swap with
   5363 * @buffer_b: The other buffer to swap with
   5364 * @cpu: the CPU of the buffers to swap
   5365 *
   5366 * This function is useful for tracers that want to take a "snapshot"
   5367 * of a CPU buffer and has another back up buffer lying around.
   5368 * it is expected that the tracer handles the cpu buffer not being
   5369 * used at the moment.
   5370 */
   5371int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
   5372			 struct trace_buffer *buffer_b, int cpu)
   5373{
   5374	struct ring_buffer_per_cpu *cpu_buffer_a;
   5375	struct ring_buffer_per_cpu *cpu_buffer_b;
   5376	int ret = -EINVAL;
   5377
   5378	if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
   5379	    !cpumask_test_cpu(cpu, buffer_b->cpumask))
   5380		goto out;
   5381
   5382	cpu_buffer_a = buffer_a->buffers[cpu];
   5383	cpu_buffer_b = buffer_b->buffers[cpu];
   5384
   5385	/* At least make sure the two buffers are somewhat the same */
   5386	if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
   5387		goto out;
   5388
   5389	ret = -EAGAIN;
   5390
   5391	if (atomic_read(&buffer_a->record_disabled))
   5392		goto out;
   5393
   5394	if (atomic_read(&buffer_b->record_disabled))
   5395		goto out;
   5396
   5397	if (atomic_read(&cpu_buffer_a->record_disabled))
   5398		goto out;
   5399
   5400	if (atomic_read(&cpu_buffer_b->record_disabled))
   5401		goto out;
   5402
   5403	/*
   5404	 * We can't do a synchronize_rcu here because this
   5405	 * function can be called in atomic context.
   5406	 * Normally this will be called from the same CPU as cpu.
   5407	 * If not it's up to the caller to protect this.
   5408	 */
   5409	atomic_inc(&cpu_buffer_a->record_disabled);
   5410	atomic_inc(&cpu_buffer_b->record_disabled);
   5411
   5412	ret = -EBUSY;
   5413	if (local_read(&cpu_buffer_a->committing))
   5414		goto out_dec;
   5415	if (local_read(&cpu_buffer_b->committing))
   5416		goto out_dec;
   5417
   5418	buffer_a->buffers[cpu] = cpu_buffer_b;
   5419	buffer_b->buffers[cpu] = cpu_buffer_a;
   5420
   5421	cpu_buffer_b->buffer = buffer_a;
   5422	cpu_buffer_a->buffer = buffer_b;
   5423
   5424	ret = 0;
   5425
   5426out_dec:
   5427	atomic_dec(&cpu_buffer_a->record_disabled);
   5428	atomic_dec(&cpu_buffer_b->record_disabled);
   5429out:
   5430	return ret;
   5431}
   5432EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
   5433#endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
   5434
   5435/**
   5436 * ring_buffer_alloc_read_page - allocate a page to read from buffer
   5437 * @buffer: the buffer to allocate for.
   5438 * @cpu: the cpu buffer to allocate.
   5439 *
   5440 * This function is used in conjunction with ring_buffer_read_page.
   5441 * When reading a full page from the ring buffer, these functions
   5442 * can be used to speed up the process. The calling function should
   5443 * allocate a few pages first with this function. Then when it
   5444 * needs to get pages from the ring buffer, it passes the result
   5445 * of this function into ring_buffer_read_page, which will swap
   5446 * the page that was allocated, with the read page of the buffer.
   5447 *
   5448 * Returns:
   5449 *  The page allocated, or ERR_PTR
   5450 */
   5451void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu)
   5452{
   5453	struct ring_buffer_per_cpu *cpu_buffer;
   5454	struct buffer_data_page *bpage = NULL;
   5455	unsigned long flags;
   5456	struct page *page;
   5457
   5458	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   5459		return ERR_PTR(-ENODEV);
   5460
   5461	cpu_buffer = buffer->buffers[cpu];
   5462	local_irq_save(flags);
   5463	arch_spin_lock(&cpu_buffer->lock);
   5464
   5465	if (cpu_buffer->free_page) {
   5466		bpage = cpu_buffer->free_page;
   5467		cpu_buffer->free_page = NULL;
   5468	}
   5469
   5470	arch_spin_unlock(&cpu_buffer->lock);
   5471	local_irq_restore(flags);
   5472
   5473	if (bpage)
   5474		goto out;
   5475
   5476	page = alloc_pages_node(cpu_to_node(cpu),
   5477				GFP_KERNEL | __GFP_NORETRY, 0);
   5478	if (!page)
   5479		return ERR_PTR(-ENOMEM);
   5480
   5481	bpage = page_address(page);
   5482
   5483 out:
   5484	rb_init_page(bpage);
   5485
   5486	return bpage;
   5487}
   5488EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
   5489
   5490/**
   5491 * ring_buffer_free_read_page - free an allocated read page
   5492 * @buffer: the buffer the page was allocate for
   5493 * @cpu: the cpu buffer the page came from
   5494 * @data: the page to free
   5495 *
   5496 * Free a page allocated from ring_buffer_alloc_read_page.
   5497 */
   5498void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, void *data)
   5499{
   5500	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
   5501	struct buffer_data_page *bpage = data;
   5502	struct page *page = virt_to_page(bpage);
   5503	unsigned long flags;
   5504
   5505	/* If the page is still in use someplace else, we can't reuse it */
   5506	if (page_ref_count(page) > 1)
   5507		goto out;
   5508
   5509	local_irq_save(flags);
   5510	arch_spin_lock(&cpu_buffer->lock);
   5511
   5512	if (!cpu_buffer->free_page) {
   5513		cpu_buffer->free_page = bpage;
   5514		bpage = NULL;
   5515	}
   5516
   5517	arch_spin_unlock(&cpu_buffer->lock);
   5518	local_irq_restore(flags);
   5519
   5520 out:
   5521	free_page((unsigned long)bpage);
   5522}
   5523EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
   5524
   5525/**
   5526 * ring_buffer_read_page - extract a page from the ring buffer
   5527 * @buffer: buffer to extract from
   5528 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
   5529 * @len: amount to extract
   5530 * @cpu: the cpu of the buffer to extract
   5531 * @full: should the extraction only happen when the page is full.
   5532 *
   5533 * This function will pull out a page from the ring buffer and consume it.
   5534 * @data_page must be the address of the variable that was returned
   5535 * from ring_buffer_alloc_read_page. This is because the page might be used
   5536 * to swap with a page in the ring buffer.
   5537 *
   5538 * for example:
   5539 *	rpage = ring_buffer_alloc_read_page(buffer, cpu);
   5540 *	if (IS_ERR(rpage))
   5541 *		return PTR_ERR(rpage);
   5542 *	ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
   5543 *	if (ret >= 0)
   5544 *		process_page(rpage, ret);
   5545 *
   5546 * When @full is set, the function will not return true unless
   5547 * the writer is off the reader page.
   5548 *
   5549 * Note: it is up to the calling functions to handle sleeps and wakeups.
   5550 *  The ring buffer can be used anywhere in the kernel and can not
   5551 *  blindly call wake_up. The layer that uses the ring buffer must be
   5552 *  responsible for that.
   5553 *
   5554 * Returns:
   5555 *  >=0 if data has been transferred, returns the offset of consumed data.
   5556 *  <0 if no data has been transferred.
   5557 */
   5558int ring_buffer_read_page(struct trace_buffer *buffer,
   5559			  void **data_page, size_t len, int cpu, int full)
   5560{
   5561	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
   5562	struct ring_buffer_event *event;
   5563	struct buffer_data_page *bpage;
   5564	struct buffer_page *reader;
   5565	unsigned long missed_events;
   5566	unsigned long flags;
   5567	unsigned int commit;
   5568	unsigned int read;
   5569	u64 save_timestamp;
   5570	int ret = -1;
   5571
   5572	if (!cpumask_test_cpu(cpu, buffer->cpumask))
   5573		goto out;
   5574
   5575	/*
   5576	 * If len is not big enough to hold the page header, then
   5577	 * we can not copy anything.
   5578	 */
   5579	if (len <= BUF_PAGE_HDR_SIZE)
   5580		goto out;
   5581
   5582	len -= BUF_PAGE_HDR_SIZE;
   5583
   5584	if (!data_page)
   5585		goto out;
   5586
   5587	bpage = *data_page;
   5588	if (!bpage)
   5589		goto out;
   5590
   5591	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
   5592
   5593	reader = rb_get_reader_page(cpu_buffer);
   5594	if (!reader)
   5595		goto out_unlock;
   5596
   5597	event = rb_reader_event(cpu_buffer);
   5598
   5599	read = reader->read;
   5600	commit = rb_page_commit(reader);
   5601
   5602	/* Check if any events were dropped */
   5603	missed_events = cpu_buffer->lost_events;
   5604
   5605	/*
   5606	 * If this page has been partially read or
   5607	 * if len is not big enough to read the rest of the page or
   5608	 * a writer is still on the page, then
   5609	 * we must copy the data from the page to the buffer.
   5610	 * Otherwise, we can simply swap the page with the one passed in.
   5611	 */
   5612	if (read || (len < (commit - read)) ||
   5613	    cpu_buffer->reader_page == cpu_buffer->commit_page) {
   5614		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
   5615		unsigned int rpos = read;
   5616		unsigned int pos = 0;
   5617		unsigned int size;
   5618
   5619		if (full)
   5620			goto out_unlock;
   5621
   5622		if (len > (commit - read))
   5623			len = (commit - read);
   5624
   5625		/* Always keep the time extend and data together */
   5626		size = rb_event_ts_length(event);
   5627
   5628		if (len < size)
   5629			goto out_unlock;
   5630
   5631		/* save the current timestamp, since the user will need it */
   5632		save_timestamp = cpu_buffer->read_stamp;
   5633
   5634		/* Need to copy one event at a time */
   5635		do {
   5636			/* We need the size of one event, because
   5637			 * rb_advance_reader only advances by one event,
   5638			 * whereas rb_event_ts_length may include the size of
   5639			 * one or two events.
   5640			 * We have already ensured there's enough space if this
   5641			 * is a time extend. */
   5642			size = rb_event_length(event);
   5643			memcpy(bpage->data + pos, rpage->data + rpos, size);
   5644
   5645			len -= size;
   5646
   5647			rb_advance_reader(cpu_buffer);
   5648			rpos = reader->read;
   5649			pos += size;
   5650
   5651			if (rpos >= commit)
   5652				break;
   5653
   5654			event = rb_reader_event(cpu_buffer);
   5655			/* Always keep the time extend and data together */
   5656			size = rb_event_ts_length(event);
   5657		} while (len >= size);
   5658
   5659		/* update bpage */
   5660		local_set(&bpage->commit, pos);
   5661		bpage->time_stamp = save_timestamp;
   5662
   5663		/* we copied everything to the beginning */
   5664		read = 0;
   5665	} else {
   5666		/* update the entry counter */
   5667		cpu_buffer->read += rb_page_entries(reader);
   5668		cpu_buffer->read_bytes += BUF_PAGE_SIZE;
   5669
   5670		/* swap the pages */
   5671		rb_init_page(bpage);
   5672		bpage = reader->page;
   5673		reader->page = *data_page;
   5674		local_set(&reader->write, 0);
   5675		local_set(&reader->entries, 0);
   5676		reader->read = 0;
   5677		*data_page = bpage;
   5678
   5679		/*
   5680		 * Use the real_end for the data size,
   5681		 * This gives us a chance to store the lost events
   5682		 * on the page.
   5683		 */
   5684		if (reader->real_end)
   5685			local_set(&bpage->commit, reader->real_end);
   5686	}
   5687	ret = read;
   5688
   5689	cpu_buffer->lost_events = 0;
   5690
   5691	commit = local_read(&bpage->commit);
   5692	/*
   5693	 * Set a flag in the commit field if we lost events
   5694	 */
   5695	if (missed_events) {
   5696		/* If there is room at the end of the page to save the
   5697		 * missed events, then record it there.
   5698		 */
   5699		if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
   5700			memcpy(&bpage->data[commit], &missed_events,
   5701			       sizeof(missed_events));
   5702			local_add(RB_MISSED_STORED, &bpage->commit);
   5703			commit += sizeof(missed_events);
   5704		}
   5705		local_add(RB_MISSED_EVENTS, &bpage->commit);
   5706	}
   5707
   5708	/*
   5709	 * This page may be off to user land. Zero it out here.
   5710	 */
   5711	if (commit < BUF_PAGE_SIZE)
   5712		memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);
   5713
   5714 out_unlock:
   5715	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
   5716
   5717 out:
   5718	return ret;
   5719}
   5720EXPORT_SYMBOL_GPL(ring_buffer_read_page);
   5721
   5722/*
   5723 * We only allocate new buffers, never free them if the CPU goes down.
   5724 * If we were to free the buffer, then the user would lose any trace that was in
   5725 * the buffer.
   5726 */
   5727int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
   5728{
   5729	struct trace_buffer *buffer;
   5730	long nr_pages_same;
   5731	int cpu_i;
   5732	unsigned long nr_pages;
   5733
   5734	buffer = container_of(node, struct trace_buffer, node);
   5735	if (cpumask_test_cpu(cpu, buffer->cpumask))
   5736		return 0;
   5737
   5738	nr_pages = 0;
   5739	nr_pages_same = 1;
   5740	/* check if all cpu sizes are same */
   5741	for_each_buffer_cpu(buffer, cpu_i) {
   5742		/* fill in the size from first enabled cpu */
   5743		if (nr_pages == 0)
   5744			nr_pages = buffer->buffers[cpu_i]->nr_pages;
   5745		if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
   5746			nr_pages_same = 0;
   5747			break;
   5748		}
   5749	}
   5750	/* allocate minimum pages, user can later expand it */
   5751	if (!nr_pages_same)
   5752		nr_pages = 2;
   5753	buffer->buffers[cpu] =
   5754		rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
   5755	if (!buffer->buffers[cpu]) {
   5756		WARN(1, "failed to allocate ring buffer on CPU %u\n",
   5757		     cpu);
   5758		return -ENOMEM;
   5759	}
   5760	smp_wmb();
   5761	cpumask_set_cpu(cpu, buffer->cpumask);
   5762	return 0;
   5763}
   5764
   5765#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
   5766/*
   5767 * This is a basic integrity check of the ring buffer.
   5768 * Late in the boot cycle this test will run when configured in.
   5769 * It will kick off a thread per CPU that will go into a loop
   5770 * writing to the per cpu ring buffer various sizes of data.
   5771 * Some of the data will be large items, some small.
   5772 *
   5773 * Another thread is created that goes into a spin, sending out
   5774 * IPIs to the other CPUs to also write into the ring buffer.
   5775 * this is to test the nesting ability of the buffer.
   5776 *
   5777 * Basic stats are recorded and reported. If something in the
   5778 * ring buffer should happen that's not expected, a big warning
   5779 * is displayed and all ring buffers are disabled.
   5780 */
   5781static struct task_struct *rb_threads[NR_CPUS] __initdata;
   5782
   5783struct rb_test_data {
   5784	struct trace_buffer *buffer;
   5785	unsigned long		events;
   5786	unsigned long		bytes_written;
   5787	unsigned long		bytes_alloc;
   5788	unsigned long		bytes_dropped;
   5789	unsigned long		events_nested;
   5790	unsigned long		bytes_written_nested;
   5791	unsigned long		bytes_alloc_nested;
   5792	unsigned long		bytes_dropped_nested;
   5793	int			min_size_nested;
   5794	int			max_size_nested;
   5795	int			max_size;
   5796	int			min_size;
   5797	int			cpu;
   5798	int			cnt;
   5799};
   5800
   5801static struct rb_test_data rb_data[NR_CPUS] __initdata;
   5802
   5803/* 1 meg per cpu */
   5804#define RB_TEST_BUFFER_SIZE	1048576
   5805
   5806static char rb_string[] __initdata =
   5807	"abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
   5808	"?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
   5809	"!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
   5810
   5811static bool rb_test_started __initdata;
   5812
   5813struct rb_item {
   5814	int size;
   5815	char str[];
   5816};
   5817
   5818static __init int rb_write_something(struct rb_test_data *data, bool nested)
   5819{
   5820	struct ring_buffer_event *event;
   5821	struct rb_item *item;
   5822	bool started;
   5823	int event_len;
   5824	int size;
   5825	int len;
   5826	int cnt;
   5827
   5828	/* Have nested writes different that what is written */
   5829	cnt = data->cnt + (nested ? 27 : 0);
   5830
   5831	/* Multiply cnt by ~e, to make some unique increment */
   5832	size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);
   5833
   5834	len = size + sizeof(struct rb_item);
   5835
   5836	started = rb_test_started;
   5837	/* read rb_test_started before checking buffer enabled */
   5838	smp_rmb();
   5839
   5840	event = ring_buffer_lock_reserve(data->buffer, len);
   5841	if (!event) {
   5842		/* Ignore dropped events before test starts. */
   5843		if (started) {
   5844			if (nested)
   5845				data->bytes_dropped += len;
   5846			else
   5847				data->bytes_dropped_nested += len;
   5848		}
   5849		return len;
   5850	}
   5851
   5852	event_len = ring_buffer_event_length(event);
   5853
   5854	if (RB_WARN_ON(data->buffer, event_len < len))
   5855		goto out;
   5856
   5857	item = ring_buffer_event_data(event);
   5858	item->size = size;
   5859	memcpy(item->str, rb_string, size);
   5860
   5861	if (nested) {
   5862		data->bytes_alloc_nested += event_len;
   5863		data->bytes_written_nested += len;
   5864		data->events_nested++;
   5865		if (!data->min_size_nested || len < data->min_size_nested)
   5866			data->min_size_nested = len;
   5867		if (len > data->max_size_nested)
   5868			data->max_size_nested = len;
   5869	} else {
   5870		data->bytes_alloc += event_len;
   5871		data->bytes_written += len;
   5872		data->events++;
   5873		if (!data->min_size || len < data->min_size)
   5874			data->max_size = len;
   5875		if (len > data->max_size)
   5876			data->max_size = len;
   5877	}
   5878
   5879 out:
   5880	ring_buffer_unlock_commit(data->buffer, event);
   5881
   5882	return 0;
   5883}
   5884
   5885static __init int rb_test(void *arg)
   5886{
   5887	struct rb_test_data *data = arg;
   5888
   5889	while (!kthread_should_stop()) {
   5890		rb_write_something(data, false);
   5891		data->cnt++;
   5892
   5893		set_current_state(TASK_INTERRUPTIBLE);
   5894		/* Now sleep between a min of 100-300us and a max of 1ms */
   5895		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
   5896	}
   5897
   5898	return 0;
   5899}
   5900
   5901static __init void rb_ipi(void *ignore)
   5902{
   5903	struct rb_test_data *data;
   5904	int cpu = smp_processor_id();
   5905
   5906	data = &rb_data[cpu];
   5907	rb_write_something(data, true);
   5908}
   5909
   5910static __init int rb_hammer_test(void *arg)
   5911{
   5912	while (!kthread_should_stop()) {
   5913
   5914		/* Send an IPI to all cpus to write data! */
   5915		smp_call_function(rb_ipi, NULL, 1);
   5916		/* No sleep, but for non preempt, let others run */
   5917		schedule();
   5918	}
   5919
   5920	return 0;
   5921}
   5922
   5923static __init int test_ringbuffer(void)
   5924{
   5925	struct task_struct *rb_hammer;
   5926	struct trace_buffer *buffer;
   5927	int cpu;
   5928	int ret = 0;
   5929
   5930	if (security_locked_down(LOCKDOWN_TRACEFS)) {
   5931		pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
   5932		return 0;
   5933	}
   5934
   5935	pr_info("Running ring buffer tests...\n");
   5936
   5937	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
   5938	if (WARN_ON(!buffer))
   5939		return 0;
   5940
   5941	/* Disable buffer so that threads can't write to it yet */
   5942	ring_buffer_record_off(buffer);
   5943
   5944	for_each_online_cpu(cpu) {
   5945		rb_data[cpu].buffer = buffer;
   5946		rb_data[cpu].cpu = cpu;
   5947		rb_data[cpu].cnt = cpu;
   5948		rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
   5949						     cpu, "rbtester/%u");
   5950		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
   5951			pr_cont("FAILED\n");
   5952			ret = PTR_ERR(rb_threads[cpu]);
   5953			goto out_free;
   5954		}
   5955	}
   5956
   5957	/* Now create the rb hammer! */
   5958	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
   5959	if (WARN_ON(IS_ERR(rb_hammer))) {
   5960		pr_cont("FAILED\n");
   5961		ret = PTR_ERR(rb_hammer);
   5962		goto out_free;
   5963	}
   5964
   5965	ring_buffer_record_on(buffer);
   5966	/*
   5967	 * Show buffer is enabled before setting rb_test_started.
   5968	 * Yes there's a small race window where events could be
   5969	 * dropped and the thread wont catch it. But when a ring
   5970	 * buffer gets enabled, there will always be some kind of
   5971	 * delay before other CPUs see it. Thus, we don't care about
   5972	 * those dropped events. We care about events dropped after
   5973	 * the threads see that the buffer is active.
   5974	 */
   5975	smp_wmb();
   5976	rb_test_started = true;
   5977
   5978	set_current_state(TASK_INTERRUPTIBLE);
   5979	/* Just run for 10 seconds */;
   5980	schedule_timeout(10 * HZ);
   5981
   5982	kthread_stop(rb_hammer);
   5983
   5984 out_free:
   5985	for_each_online_cpu(cpu) {
   5986		if (!rb_threads[cpu])
   5987			break;
   5988		kthread_stop(rb_threads[cpu]);
   5989	}
   5990	if (ret) {
   5991		ring_buffer_free(buffer);
   5992		return ret;
   5993	}
   5994
   5995	/* Report! */
   5996	pr_info("finished\n");
   5997	for_each_online_cpu(cpu) {
   5998		struct ring_buffer_event *event;
   5999		struct rb_test_data *data = &rb_data[cpu];
   6000		struct rb_item *item;
   6001		unsigned long total_events;
   6002		unsigned long total_dropped;
   6003		unsigned long total_written;
   6004		unsigned long total_alloc;
   6005		unsigned long total_read = 0;
   6006		unsigned long total_size = 0;
   6007		unsigned long total_len = 0;
   6008		unsigned long total_lost = 0;
   6009		unsigned long lost;
   6010		int big_event_size;
   6011		int small_event_size;
   6012
   6013		ret = -1;
   6014
   6015		total_events = data->events + data->events_nested;
   6016		total_written = data->bytes_written + data->bytes_written_nested;
   6017		total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
   6018		total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
   6019
   6020		big_event_size = data->max_size + data->max_size_nested;
   6021		small_event_size = data->min_size + data->min_size_nested;
   6022
   6023		pr_info("CPU %d:\n", cpu);
   6024		pr_info("              events:    %ld\n", total_events);
   6025		pr_info("       dropped bytes:    %ld\n", total_dropped);
   6026		pr_info("       alloced bytes:    %ld\n", total_alloc);
   6027		pr_info("       written bytes:    %ld\n", total_written);
   6028		pr_info("       biggest event:    %d\n", big_event_size);
   6029		pr_info("      smallest event:    %d\n", small_event_size);
   6030
   6031		if (RB_WARN_ON(buffer, total_dropped))
   6032			break;
   6033
   6034		ret = 0;
   6035
   6036		while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
   6037			total_lost += lost;
   6038			item = ring_buffer_event_data(event);
   6039			total_len += ring_buffer_event_length(event);
   6040			total_size += item->size + sizeof(struct rb_item);
   6041			if (memcmp(&item->str[0], rb_string, item->size) != 0) {
   6042				pr_info("FAILED!\n");
   6043				pr_info("buffer had: %.*s\n", item->size, item->str);
   6044				pr_info("expected:   %.*s\n", item->size, rb_string);
   6045				RB_WARN_ON(buffer, 1);
   6046				ret = -1;
   6047				break;
   6048			}
   6049			total_read++;
   6050		}
   6051		if (ret)
   6052			break;
   6053
   6054		ret = -1;
   6055
   6056		pr_info("         read events:   %ld\n", total_read);
   6057		pr_info("         lost events:   %ld\n", total_lost);
   6058		pr_info("        total events:   %ld\n", total_lost + total_read);
   6059		pr_info("  recorded len bytes:   %ld\n", total_len);
   6060		pr_info(" recorded size bytes:   %ld\n", total_size);
   6061		if (total_lost) {
   6062			pr_info(" With dropped events, record len and size may not match\n"
   6063				" alloced and written from above\n");
   6064		} else {
   6065			if (RB_WARN_ON(buffer, total_len != total_alloc ||
   6066				       total_size != total_written))
   6067				break;
   6068		}
   6069		if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
   6070			break;
   6071
   6072		ret = 0;
   6073	}
   6074	if (!ret)
   6075		pr_info("Ring buffer PASSED!\n");
   6076
   6077	ring_buffer_free(buffer);
   6078	return 0;
   6079}
   6080
   6081late_initcall(test_ringbuffer);
   6082#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */