cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

tree_nocb.h (47902B)


      1/* SPDX-License-Identifier: GPL-2.0+ */
      2/*
      3 * Read-Copy Update mechanism for mutual exclusion (tree-based version)
      4 * Internal non-public definitions that provide either classic
      5 * or preemptible semantics.
      6 *
      7 * Copyright Red Hat, 2009
      8 * Copyright IBM Corporation, 2009
      9 * Copyright SUSE, 2021
     10 *
     11 * Author: Ingo Molnar <mingo@elte.hu>
     12 *	   Paul E. McKenney <paulmck@linux.ibm.com>
     13 *	   Frederic Weisbecker <frederic@kernel.org>
     14 */
     15
     16#ifdef CONFIG_RCU_NOCB_CPU
     17static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */
      18static bool __read_mostly rcu_nocb_poll;    /* Offload kthreads are to poll. */
     19static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
     20{
     21	return lockdep_is_held(&rdp->nocb_lock);
     22}
     23
     24static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
     25{
     26	/* Race on early boot between thread creation and assignment */
     27	if (!rdp->nocb_cb_kthread || !rdp->nocb_gp_kthread)
     28		return true;
     29
     30	if (current == rdp->nocb_cb_kthread || current == rdp->nocb_gp_kthread)
     31		if (in_task())
     32			return true;
     33	return false;
     34}
     35
     36/*
     37 * Offload callback processing from the boot-time-specified set of CPUs
     38 * specified by rcu_nocb_mask.  For the CPUs in the set, there are kthreads
     39 * created that pull the callbacks from the corresponding CPU, wait for
     40 * a grace period to elapse, and invoke the callbacks.  These kthreads
     41 * are organized into GP kthreads, which manage incoming callbacks, wait for
     42 * grace periods, and awaken CB kthreads, and the CB kthreads, which only
     43 * invoke callbacks.  Each GP kthread invokes its own CBs.  The no-CBs CPUs
     44 * do a wake_up() on their GP kthread when they insert a callback into any
     45 * empty list, unless the rcu_nocb_poll boot parameter has been specified,
     46 * in which case each kthread actively polls its CPU.  (Which isn't so great
     47 * for energy efficiency, but which does reduce RCU's overhead on that CPU.)
     48 *
     49 * This is intended to be used in conjunction with Frederic Weisbecker's
     50 * adaptive-idle work, which would seriously reduce OS jitter on CPUs
     51 * running CPU-bound user-mode computations.
     52 *
     53 * Offloading of callbacks can also be used as an energy-efficiency
     54 * measure because CPUs with no RCU callbacks queued are more aggressive
     55 * about entering dyntick-idle mode.
     56 */
     57
     58
     59/*
     60 * Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters.
     61 * If the list is invalid, a warning is emitted and all CPUs are offloaded.
     62 */
     63static int __init rcu_nocb_setup(char *str)
     64{
     65	alloc_bootmem_cpumask_var(&rcu_nocb_mask);
     66	if (*str == '=') {
     67		if (cpulist_parse(++str, rcu_nocb_mask)) {
     68			pr_warn("rcu_nocbs= bad CPU range, all CPUs set\n");
     69			cpumask_setall(rcu_nocb_mask);
     70		}
     71	}
     72	rcu_state.nocb_is_setup = true;
     73	return 1;
     74}
     75__setup("rcu_nocbs", rcu_nocb_setup);
     76
     77static int __init parse_rcu_nocb_poll(char *arg)
     78{
     79	rcu_nocb_poll = true;
     80	return 0;
     81}
     82early_param("rcu_nocb_poll", parse_rcu_nocb_poll);
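
/*
 * For example, booting with "rcu_nocbs=1-7 rcu_nocb_poll" makes
 * rcu_nocb_setup() feed the cpulist "1-7" to cpulist_parse(), so CPUs 1-7
 * get their callbacks offloaded, while parse_rcu_nocb_poll() switches the
 * GP kthreads to polling.  An unparsable list such as "rcu_nocbs=foo"
 * instead hits the pr_warn() above and offloads every CPU.
 */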
     83
     84/*
     85 * Don't bother bypassing ->cblist if the call_rcu() rate is low.
     86 * After all, the main point of bypassing is to avoid lock contention
     87 * on ->nocb_lock, which only can happen at high call_rcu() rates.
     88 */
     89static int nocb_nobypass_lim_per_jiffy = 16 * 1000 / HZ;
     90module_param(nocb_nobypass_lim_per_jiffy, int, 0);
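
/*
 * For example, with HZ=1000 this limit evaluates to 16 direct ->cblist
 * enqueues per jiffy, and with HZ=250 to 64 per jiffy; either way it
 * corresponds to roughly 16000 call_rcu() invocations per second on a
 * given CPU before rcu_nocb_try_bypass() starts diverting further
 * callbacks into ->nocb_bypass.
 */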
     91
     92/*
     93 * Acquire the specified rcu_data structure's ->nocb_bypass_lock.  If the
     94 * lock isn't immediately available, increment ->nocb_lock_contended to
     95 * flag the contention.
     96 */
     97static void rcu_nocb_bypass_lock(struct rcu_data *rdp)
     98	__acquires(&rdp->nocb_bypass_lock)
     99{
    100	lockdep_assert_irqs_disabled();
    101	if (raw_spin_trylock(&rdp->nocb_bypass_lock))
    102		return;
    103	atomic_inc(&rdp->nocb_lock_contended);
    104	WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
    105	smp_mb__after_atomic(); /* atomic_inc() before lock. */
    106	raw_spin_lock(&rdp->nocb_bypass_lock);
    107	smp_mb__before_atomic(); /* atomic_dec() after lock. */
    108	atomic_dec(&rdp->nocb_lock_contended);
    109}
    110
    111/*
    112 * Spinwait until the specified rcu_data structure's ->nocb_lock is
    113 * not contended.  Please note that this is extremely special-purpose,
    114 * relying on the fact that at most two kthreads and one CPU contend for
    115 * this lock, and also that the two kthreads are guaranteed to have frequent
    116 * grace-period-duration time intervals between successive acquisitions
    117 * of the lock.  This allows us to use an extremely simple throttling
    118 * mechanism, and further to apply it only to the CPU doing floods of
    119 * call_rcu() invocations.  Don't try this at home!
    120 */
    121static void rcu_nocb_wait_contended(struct rcu_data *rdp)
    122{
    123	WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
    124	while (WARN_ON_ONCE(atomic_read(&rdp->nocb_lock_contended)))
    125		cpu_relax();
    126}
    127
    128/*
    129 * Conditionally acquire the specified rcu_data structure's
    130 * ->nocb_bypass_lock.
    131 */
    132static bool rcu_nocb_bypass_trylock(struct rcu_data *rdp)
    133{
    134	lockdep_assert_irqs_disabled();
    135	return raw_spin_trylock(&rdp->nocb_bypass_lock);
    136}
    137
    138/*
    139 * Release the specified rcu_data structure's ->nocb_bypass_lock.
    140 */
    141static void rcu_nocb_bypass_unlock(struct rcu_data *rdp)
    142	__releases(&rdp->nocb_bypass_lock)
    143{
    144	lockdep_assert_irqs_disabled();
    145	raw_spin_unlock(&rdp->nocb_bypass_lock);
    146}
    147
    148/*
    149 * Acquire the specified rcu_data structure's ->nocb_lock, but only
    150 * if it corresponds to a no-CBs CPU.
    151 */
    152static void rcu_nocb_lock(struct rcu_data *rdp)
    153{
    154	lockdep_assert_irqs_disabled();
    155	if (!rcu_rdp_is_offloaded(rdp))
    156		return;
    157	raw_spin_lock(&rdp->nocb_lock);
    158}
    159
    160/*
    161 * Release the specified rcu_data structure's ->nocb_lock, but only
    162 * if it corresponds to a no-CBs CPU.
    163 */
    164static void rcu_nocb_unlock(struct rcu_data *rdp)
    165{
    166	if (rcu_rdp_is_offloaded(rdp)) {
    167		lockdep_assert_irqs_disabled();
    168		raw_spin_unlock(&rdp->nocb_lock);
    169	}
    170}
    171
    172/*
    173 * Release the specified rcu_data structure's ->nocb_lock and restore
    174 * interrupts, but only if it corresponds to a no-CBs CPU.
    175 */
    176static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
    177				       unsigned long flags)
    178{
    179	if (rcu_rdp_is_offloaded(rdp)) {
    180		lockdep_assert_irqs_disabled();
    181		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
    182	} else {
    183		local_irq_restore(flags);
    184	}
    185}
    186
    187/* Lockdep check that ->cblist may be safely accessed. */
    188static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
    189{
    190	lockdep_assert_irqs_disabled();
    191	if (rcu_rdp_is_offloaded(rdp))
    192		lockdep_assert_held(&rdp->nocb_lock);
    193}
    194
    195/*
    196 * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended
    197 * grace period.
    198 */
    199static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
    200{
    201	swake_up_all(sq);
    202}
    203
    204static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
    205{
    206	return &rnp->nocb_gp_wq[rcu_seq_ctr(rnp->gp_seq) & 0x1];
    207}
    208
    209static void rcu_init_one_nocb(struct rcu_node *rnp)
    210{
    211	init_swait_queue_head(&rnp->nocb_gp_wq[0]);
    212	init_swait_queue_head(&rnp->nocb_gp_wq[1]);
    213}
    214
    215static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
    216			   struct rcu_data *rdp,
    217			   bool force, unsigned long flags)
    218	__releases(rdp_gp->nocb_gp_lock)
    219{
    220	bool needwake = false;
    221
    222	if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) {
    223		raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
    224		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
    225				    TPS("AlreadyAwake"));
    226		return false;
    227	}
    228
    229	if (rdp_gp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
    230		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
    231		del_timer(&rdp_gp->nocb_timer);
    232	}
    233
    234	if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) {
    235		WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
    236		needwake = true;
    237	}
    238	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
    239	if (needwake) {
    240		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake"));
    241		wake_up_process(rdp_gp->nocb_gp_kthread);
    242	}
    243
    244	return needwake;
    245}
    246
    247/*
    248 * Kick the GP kthread for this NOCB group.
    249 */
    250static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
    251{
    252	unsigned long flags;
    253	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
    254
    255	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
    256	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
    257}
    258
    259/*
    260 * Arrange to wake the GP kthread for this NOCB group at some future
    261 * time when it is safe to do so.
    262 */
    263static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
    264			       const char *reason)
    265{
    266	unsigned long flags;
    267	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
    268
    269	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
    270
    271	/*
    272	 * Bypass wakeup overrides previous deferments. In case
    273	 * of callback storm, no need to wake up too early.
    274	 */
    275	if (waketype == RCU_NOCB_WAKE_BYPASS) {
    276		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
    277		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
    278	} else {
    279		if (rdp_gp->nocb_defer_wakeup < RCU_NOCB_WAKE)
    280			mod_timer(&rdp_gp->nocb_timer, jiffies + 1);
    281		if (rdp_gp->nocb_defer_wakeup < waketype)
    282			WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
    283	}
    284
    285	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
    286
    287	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
    288}
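
/*
 * Concretely: an RCU_NOCB_WAKE_BYPASS deferral pushes ->nocb_timer two
 * jiffies out and overrides whatever deferral was pending, while the other
 * waketypes arm the timer one jiffy out when no full wakeup is pending yet
 * and may only escalate the recorded ->nocb_defer_wakeup value, never
 * weaken it.
 */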
    289
    290/*
    291 * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
    292 * However, if there is a callback to be enqueued and if ->nocb_bypass
    293 * proves to be initially empty, just return false because the no-CB GP
    294 * kthread may need to be awakened in this case.
    295 *
    296 * Note that this function always returns true if rhp is NULL.
    297 */
    298static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
    299				     unsigned long j)
    300{
    301	struct rcu_cblist rcl;
    302
    303	WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp));
    304	rcu_lockdep_assert_cblist_protected(rdp);
    305	lockdep_assert_held(&rdp->nocb_bypass_lock);
    306	if (rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass)) {
    307		raw_spin_unlock(&rdp->nocb_bypass_lock);
    308		return false;
    309	}
    310	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
    311	if (rhp)
    312		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
    313	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
    314	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
    315	WRITE_ONCE(rdp->nocb_bypass_first, j);
    316	rcu_nocb_bypass_unlock(rdp);
    317	return true;
    318}
    319
    320/*
    321 * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
    322 * However, if there is a callback to be enqueued and if ->nocb_bypass
    323 * proves to be initially empty, just return false because the no-CB GP
    324 * kthread may need to be awakened in this case.
    325 *
    326 * Note that this function always returns true if rhp is NULL.
    327 */
    328static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
    329				  unsigned long j)
    330{
    331	if (!rcu_rdp_is_offloaded(rdp))
    332		return true;
    333	rcu_lockdep_assert_cblist_protected(rdp);
    334	rcu_nocb_bypass_lock(rdp);
    335	return rcu_nocb_do_flush_bypass(rdp, rhp, j);
    336}
    337
    338/*
    339 * If the ->nocb_bypass_lock is immediately available, flush the
    340 * ->nocb_bypass queue into ->cblist.
    341 */
    342static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
    343{
    344	rcu_lockdep_assert_cblist_protected(rdp);
    345	if (!rcu_rdp_is_offloaded(rdp) ||
    346	    !rcu_nocb_bypass_trylock(rdp))
    347		return;
    348	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
    349}
    350
    351/*
    352 * See whether it is appropriate to use the ->nocb_bypass list in order
    353 * to control contention on ->nocb_lock.  A limited number of direct
    354 * enqueues are permitted into ->cblist per jiffy.  If ->nocb_bypass
    355 * is non-empty, further callbacks must be placed into ->nocb_bypass,
    356 * otherwise rcu_barrier() breaks.  Use rcu_nocb_flush_bypass() to switch
    357 * back to direct use of ->cblist.  However, ->nocb_bypass should not be
    358 * used if ->cblist is empty, because otherwise callbacks can be stranded
    359 * on ->nocb_bypass because we cannot count on the current CPU ever again
    360 * invoking call_rcu().  The general rule is that if ->nocb_bypass is
    361 * non-empty, the corresponding no-CBs grace-period kthread must not be
    362 * in an indefinite sleep state.
    363 *
    364 * Finally, it is not permitted to use the bypass during early boot,
    365 * as doing so would confuse the auto-initialization code.  Besides
    366 * which, there is no point in worrying about lock contention while
    367 * there is only one CPU in operation.
    368 */
    369static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
    370				bool *was_alldone, unsigned long flags)
    371{
    372	unsigned long c;
    373	unsigned long cur_gp_seq;
    374	unsigned long j = jiffies;
    375	long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
    376
    377	lockdep_assert_irqs_disabled();
    378
    379	// Pure softirq/rcuc based processing: no bypassing, no
    380	// locking.
    381	if (!rcu_rdp_is_offloaded(rdp)) {
    382		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
    383		return false;
    384	}
    385
    386	// In the process of (de-)offloading: no bypassing, but
    387	// locking.
    388	if (!rcu_segcblist_completely_offloaded(&rdp->cblist)) {
    389		rcu_nocb_lock(rdp);
    390		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
    391		return false; /* Not offloaded, no bypassing. */
    392	}
    393
    394	// Don't use ->nocb_bypass during early boot.
    395	if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) {
    396		rcu_nocb_lock(rdp);
    397		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
    398		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
    399		return false;
    400	}
    401
    402	// If we have advanced to a new jiffy, reset counts to allow
    403	// moving back from ->nocb_bypass to ->cblist.
    404	if (j == rdp->nocb_nobypass_last) {
    405		c = rdp->nocb_nobypass_count + 1;
    406	} else {
    407		WRITE_ONCE(rdp->nocb_nobypass_last, j);
    408		c = rdp->nocb_nobypass_count - nocb_nobypass_lim_per_jiffy;
    409		if (ULONG_CMP_LT(rdp->nocb_nobypass_count,
    410				 nocb_nobypass_lim_per_jiffy))
    411			c = 0;
    412		else if (c > nocb_nobypass_lim_per_jiffy)
    413			c = nocb_nobypass_lim_per_jiffy;
    414	}
    415	WRITE_ONCE(rdp->nocb_nobypass_count, c);
    416
    417	// If there hasn't yet been all that many ->cblist enqueues
    418	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
    419	// ->nocb_bypass first.
    420	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
    421		rcu_nocb_lock(rdp);
    422		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
    423		if (*was_alldone)
    424			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
    425					    TPS("FirstQ"));
    426		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
    427		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
    428		return false; // Caller must enqueue the callback.
    429	}
    430
    431	// If ->nocb_bypass has been used too long or is too full,
    432	// flush ->nocb_bypass to ->cblist.
    433	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
    434	    ncbs >= qhimark) {
    435		rcu_nocb_lock(rdp);
    436		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
    437			*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
    438			if (*was_alldone)
    439				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
    440						    TPS("FirstQ"));
    441			WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
    442			return false; // Caller must enqueue the callback.
    443		}
    444		if (j != rdp->nocb_gp_adv_time &&
    445		    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
    446		    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
    447			rcu_advance_cbs_nowake(rdp->mynode, rdp);
    448			rdp->nocb_gp_adv_time = j;
    449		}
    450		rcu_nocb_unlock_irqrestore(rdp, flags);
    451		return true; // Callback already enqueued.
    452	}
    453
    454	// We need to use the bypass.
    455	rcu_nocb_wait_contended(rdp);
    456	rcu_nocb_bypass_lock(rdp);
    457	ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
    458	rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
    459	rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
    460	if (!ncbs) {
    461		WRITE_ONCE(rdp->nocb_bypass_first, j);
    462		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
    463	}
    464	rcu_nocb_bypass_unlock(rdp);
    465	smp_mb(); /* Order enqueue before wake. */
    466	if (ncbs) {
    467		local_irq_restore(flags);
    468	} else {
    469		// No-CBs GP kthread might be indefinitely asleep, if so, wake.
    470		rcu_nocb_lock(rdp); // Rare during call_rcu() flood.
    471		if (!rcu_segcblist_pend_cbs(&rdp->cblist)) {
    472			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
    473					    TPS("FirstBQwake"));
    474			__call_rcu_nocb_wake(rdp, true, flags);
    475		} else {
    476			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
    477					    TPS("FirstBQnoWake"));
    478			rcu_nocb_unlock_irqrestore(rdp, flags);
    479		}
    480	}
    481	return true; // Callback already enqueued.
    482}
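
/*
 * To summarize the calling convention of rcu_nocb_try_bypass(): a false
 * return means the callback was *not* queued here, so the caller must
 * enqueue it onto ->cblist itself, with *was_alldone telling it whether
 * ->cblist previously had no pending callbacks (and hence whether
 * __call_rcu_nocb_wake() should wake the GP kthread); a true return means
 * the callback already landed in ->nocb_bypass, or went into ->cblist via
 * the flush path above, with interrupts restored on the way out.
 */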
    483
    484/*
    485 * Awaken the no-CBs grace-period kthread if needed, either due to it
    486 * legitimately being asleep or due to overload conditions.
    487 *
     488 * If warranted, also wake up the kthread servicing this CPU's queues.
    489 */
    490static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
    491				 unsigned long flags)
    492				 __releases(rdp->nocb_lock)
    493{
    494	unsigned long cur_gp_seq;
    495	unsigned long j;
    496	long len;
    497	struct task_struct *t;
    498
    499	// If we are being polled or there is no kthread, just leave.
    500	t = READ_ONCE(rdp->nocb_gp_kthread);
    501	if (rcu_nocb_poll || !t) {
    502		rcu_nocb_unlock_irqrestore(rdp, flags);
    503		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
    504				    TPS("WakeNotPoll"));
    505		return;
    506	}
     507	// Need to actually do a wakeup.
    508	len = rcu_segcblist_n_cbs(&rdp->cblist);
    509	if (was_alldone) {
    510		rdp->qlen_last_fqs_check = len;
    511		if (!irqs_disabled_flags(flags)) {
    512			/* ... if queue was empty ... */
    513			rcu_nocb_unlock_irqrestore(rdp, flags);
    514			wake_nocb_gp(rdp, false);
    515			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
    516					    TPS("WakeEmpty"));
    517		} else {
    518			rcu_nocb_unlock_irqrestore(rdp, flags);
    519			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
    520					   TPS("WakeEmptyIsDeferred"));
    521		}
    522	} else if (len > rdp->qlen_last_fqs_check + qhimark) {
    523		/* ... or if many callbacks queued. */
    524		rdp->qlen_last_fqs_check = len;
    525		j = jiffies;
    526		if (j != rdp->nocb_gp_adv_time &&
    527		    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
    528		    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
    529			rcu_advance_cbs_nowake(rdp->mynode, rdp);
    530			rdp->nocb_gp_adv_time = j;
    531		}
    532		smp_mb(); /* Enqueue before timer_pending(). */
    533		if ((rdp->nocb_cb_sleep ||
    534		     !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
    535		    !timer_pending(&rdp->nocb_timer)) {
    536			rcu_nocb_unlock_irqrestore(rdp, flags);
    537			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
    538					   TPS("WakeOvfIsDeferred"));
    539		} else {
    540			rcu_nocb_unlock_irqrestore(rdp, flags);
    541			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
    542		}
    543	} else {
    544		rcu_nocb_unlock_irqrestore(rdp, flags);
    545		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
    546	}
    547}
    548
    549/*
     550 * Check whether we should ignore this rdp.
    551 *
    552 * We check that without holding the nocb lock but
    553 * we make sure not to miss a freshly offloaded rdp
    554 * with the current ordering:
    555 *
    556 *  rdp_offload_toggle()        nocb_gp_enabled_cb()
    557 * -------------------------   ----------------------------
    558 *    WRITE flags                 LOCK nocb_gp_lock
    559 *    LOCK nocb_gp_lock           READ/WRITE nocb_gp_sleep
    560 *    READ/WRITE nocb_gp_sleep    UNLOCK nocb_gp_lock
    561 *    UNLOCK nocb_gp_lock         READ flags
    562 */
    563static inline bool nocb_gp_enabled_cb(struct rcu_data *rdp)
    564{
    565	u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_GP;
    566
    567	return rcu_segcblist_test_flags(&rdp->cblist, flags);
    568}
    569
    570static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp,
    571						     bool *needwake_state)
    572{
    573	struct rcu_segcblist *cblist = &rdp->cblist;
    574
    575	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
    576		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
    577			rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
    578			if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
    579				*needwake_state = true;
    580		}
    581		return false;
    582	}
    583
    584	/*
    585	 * De-offloading. Clear our flag and notify the de-offload worker.
     586	 * We will ignore this rdp until it is re-offloaded, if ever.
    587	 */
    588	WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
    589	rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
    590	if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
    591		*needwake_state = true;
    592	return true;
    593}
    594
    595
    596/*
    597 * No-CBs GP kthreads come here to wait for additional callbacks to show up
    598 * or for grace periods to end.
    599 */
    600static void nocb_gp_wait(struct rcu_data *my_rdp)
    601{
    602	bool bypass = false;
    603	long bypass_ncbs;
    604	int __maybe_unused cpu = my_rdp->cpu;
    605	unsigned long cur_gp_seq;
    606	unsigned long flags;
    607	bool gotcbs = false;
    608	unsigned long j = jiffies;
    609	bool needwait_gp = false; // This prevents actual uninitialized use.
    610	bool needwake;
    611	bool needwake_gp;
    612	struct rcu_data *rdp;
    613	struct rcu_node *rnp;
    614	unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning.
    615	bool wasempty = false;
    616
    617	/*
    618	 * Each pass through the following loop checks for CBs and for the
    619	 * nearest grace period (if any) to wait for next.  The CB kthreads
    620	 * and the global grace-period kthread are awakened if needed.
    621	 */
    622	WARN_ON_ONCE(my_rdp->nocb_gp_rdp != my_rdp);
    623	/*
    624	 * An rcu_data structure is removed from the list after its
    625	 * CPU is de-offloaded and added to the list before that CPU is
    626	 * (re-)offloaded.  If the following loop happens to be referencing
    627	 * that rcu_data structure during the time that the corresponding
    628	 * CPU is de-offloaded and then immediately re-offloaded, this
    629	 * loop's rdp pointer will be carried to the end of the list by
    630	 * the resulting pair of list operations.  This can cause the loop
    631	 * to skip over some of the rcu_data structures that were supposed
    632	 * to have been scanned.  Fortunately a new iteration through the
    633	 * entire loop is forced after a given CPU's rcu_data structure
    634	 * is added to the list, so the skipped-over rcu_data structures
    635	 * won't be ignored for long.
    636	 */
    637	list_for_each_entry_rcu(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp, 1) {
    638		bool needwake_state = false;
    639
    640		if (!nocb_gp_enabled_cb(rdp))
    641			continue;
    642		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
    643		rcu_nocb_lock_irqsave(rdp, flags);
    644		if (nocb_gp_update_state_deoffloading(rdp, &needwake_state)) {
    645			rcu_nocb_unlock_irqrestore(rdp, flags);
    646			if (needwake_state)
    647				swake_up_one(&rdp->nocb_state_wq);
    648			continue;
    649		}
    650		bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
    651		if (bypass_ncbs &&
    652		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
    653		     bypass_ncbs > 2 * qhimark)) {
    654			// Bypass full or old, so flush it.
    655			(void)rcu_nocb_try_flush_bypass(rdp, j);
    656			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
    657		} else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
    658			rcu_nocb_unlock_irqrestore(rdp, flags);
    659			if (needwake_state)
    660				swake_up_one(&rdp->nocb_state_wq);
    661			continue; /* No callbacks here, try next. */
    662		}
    663		if (bypass_ncbs) {
    664			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
    665					    TPS("Bypass"));
    666			bypass = true;
    667		}
    668		rnp = rdp->mynode;
    669
    670		// Advance callbacks if helpful and low contention.
    671		needwake_gp = false;
    672		if (!rcu_segcblist_restempty(&rdp->cblist,
    673					     RCU_NEXT_READY_TAIL) ||
    674		    (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
    675		     rcu_seq_done(&rnp->gp_seq, cur_gp_seq))) {
    676			raw_spin_lock_rcu_node(rnp); /* irqs disabled. */
    677			needwake_gp = rcu_advance_cbs(rnp, rdp);
    678			wasempty = rcu_segcblist_restempty(&rdp->cblist,
    679							   RCU_NEXT_READY_TAIL);
    680			raw_spin_unlock_rcu_node(rnp); /* irqs disabled. */
    681		}
    682		// Need to wait on some grace period?
    683		WARN_ON_ONCE(wasempty &&
    684			     !rcu_segcblist_restempty(&rdp->cblist,
    685						      RCU_NEXT_READY_TAIL));
    686		if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) {
    687			if (!needwait_gp ||
    688			    ULONG_CMP_LT(cur_gp_seq, wait_gp_seq))
    689				wait_gp_seq = cur_gp_seq;
    690			needwait_gp = true;
    691			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
    692					    TPS("NeedWaitGP"));
    693		}
    694		if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
    695			needwake = rdp->nocb_cb_sleep;
    696			WRITE_ONCE(rdp->nocb_cb_sleep, false);
    697			smp_mb(); /* CB invocation -after- GP end. */
    698		} else {
    699			needwake = false;
    700		}
    701		rcu_nocb_unlock_irqrestore(rdp, flags);
    702		if (needwake) {
    703			swake_up_one(&rdp->nocb_cb_wq);
    704			gotcbs = true;
    705		}
    706		if (needwake_gp)
    707			rcu_gp_kthread_wake();
    708		if (needwake_state)
    709			swake_up_one(&rdp->nocb_state_wq);
    710	}
    711
    712	my_rdp->nocb_gp_bypass = bypass;
    713	my_rdp->nocb_gp_gp = needwait_gp;
    714	my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
    715
    716	if (bypass && !rcu_nocb_poll) {
    717		// At least one child with non-empty ->nocb_bypass, so set
    718		// timer in order to avoid stranding its callbacks.
    719		wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
    720				   TPS("WakeBypassIsDeferred"));
    721	}
    722	if (rcu_nocb_poll) {
    723		/* Polling, so trace if first poll in the series. */
    724		if (gotcbs)
    725			trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll"));
    726		schedule_timeout_idle(1);
    727	} else if (!needwait_gp) {
    728		/* Wait for callbacks to appear. */
    729		trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep"));
    730		swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq,
    731				!READ_ONCE(my_rdp->nocb_gp_sleep));
    732		trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep"));
    733	} else {
    734		rnp = my_rdp->mynode;
    735		trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait"));
    736		swait_event_interruptible_exclusive(
    737			rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1],
    738			rcu_seq_done(&rnp->gp_seq, wait_gp_seq) ||
    739			!READ_ONCE(my_rdp->nocb_gp_sleep));
    740		trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));
    741	}
    742	if (!rcu_nocb_poll) {
    743		raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
    744		if (my_rdp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
    745			WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
    746			del_timer(&my_rdp->nocb_timer);
    747		}
    748		WRITE_ONCE(my_rdp->nocb_gp_sleep, true);
    749		raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
    750	}
    751	my_rdp->nocb_gp_seq = -1;
    752	WARN_ON(signal_pending(current));
    753}
    754
    755/*
    756 * No-CBs grace-period-wait kthread.  There is one of these per group
    757 * of CPUs, but only once at least one CPU in that group has come online
    758 * at least once since boot.  This kthread checks for newly posted
    759 * callbacks from any of the CPUs it is responsible for, waits for a
    760 * grace period, then awakens all of the rcu_nocb_cb_kthread() instances
    761 * that then have callback-invocation work to do.
    762 */
    763static int rcu_nocb_gp_kthread(void *arg)
    764{
    765	struct rcu_data *rdp = arg;
    766
    767	for (;;) {
    768		WRITE_ONCE(rdp->nocb_gp_loops, rdp->nocb_gp_loops + 1);
    769		nocb_gp_wait(rdp);
    770		cond_resched_tasks_rcu_qs();
    771	}
    772	return 0;
    773}
    774
    775static inline bool nocb_cb_can_run(struct rcu_data *rdp)
    776{
    777	u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_CB;
    778
    779	return rcu_segcblist_test_flags(&rdp->cblist, flags);
    780}
    781
    782static inline bool nocb_cb_wait_cond(struct rcu_data *rdp)
    783{
    784	return nocb_cb_can_run(rdp) && !READ_ONCE(rdp->nocb_cb_sleep);
    785}
    786
    787/*
    788 * Invoke any ready callbacks from the corresponding no-CBs CPU,
    789 * then, if there are no more, wait for more to appear.
    790 */
    791static void nocb_cb_wait(struct rcu_data *rdp)
    792{
    793	struct rcu_segcblist *cblist = &rdp->cblist;
    794	unsigned long cur_gp_seq;
    795	unsigned long flags;
    796	bool needwake_state = false;
    797	bool needwake_gp = false;
    798	bool can_sleep = true;
    799	struct rcu_node *rnp = rdp->mynode;
    800
    801	do {
    802		swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
    803						    nocb_cb_wait_cond(rdp));
    804
    805		// VVV Ensure CB invocation follows _sleep test.
    806		if (smp_load_acquire(&rdp->nocb_cb_sleep)) { // ^^^
    807			WARN_ON(signal_pending(current));
    808			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
    809		}
    810	} while (!nocb_cb_can_run(rdp));
    811
    812
    813	local_irq_save(flags);
    814	rcu_momentary_dyntick_idle();
    815	local_irq_restore(flags);
    816	/*
    817	 * Disable BH to provide the expected environment.  Also, when
    818	 * transitioning to/from NOCB mode, a self-requeuing callback might
    819	 * be invoked from softirq.  A short grace period could cause both
     820	 * instances of this callback to execute concurrently.
    821	 */
    822	local_bh_disable();
    823	rcu_do_batch(rdp);
    824	local_bh_enable();
    825	lockdep_assert_irqs_enabled();
    826	rcu_nocb_lock_irqsave(rdp, flags);
    827	if (rcu_segcblist_nextgp(cblist, &cur_gp_seq) &&
    828	    rcu_seq_done(&rnp->gp_seq, cur_gp_seq) &&
    829	    raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */
    830		needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
    831		raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
    832	}
    833
    834	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
    835		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB)) {
    836			rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_CB);
    837			if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
    838				needwake_state = true;
    839		}
    840		if (rcu_segcblist_ready_cbs(cblist))
    841			can_sleep = false;
    842	} else {
    843		/*
    844		 * De-offloading. Clear our flag and notify the de-offload worker.
     845	 * We won't touch the callbacks and will keep sleeping until we are
     846	 * re-offloaded, if ever.
    847		 */
    848		WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB));
    849		rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_CB);
    850		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
    851			needwake_state = true;
    852	}
    853
    854	WRITE_ONCE(rdp->nocb_cb_sleep, can_sleep);
    855
    856	if (rdp->nocb_cb_sleep)
    857		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
    858
    859	rcu_nocb_unlock_irqrestore(rdp, flags);
    860	if (needwake_gp)
    861		rcu_gp_kthread_wake();
    862
    863	if (needwake_state)
    864		swake_up_one(&rdp->nocb_state_wq);
    865}
    866
    867/*
    868 * Per-rcu_data kthread, but only for no-CBs CPUs.  Repeatedly invoke
    869 * nocb_cb_wait() to do the dirty work.
    870 */
    871static int rcu_nocb_cb_kthread(void *arg)
    872{
    873	struct rcu_data *rdp = arg;
    874
    875	// Each pass through this loop does one callback batch, and,
    876	// if there are no more ready callbacks, waits for them.
    877	for (;;) {
    878		nocb_cb_wait(rdp);
    879		cond_resched_tasks_rcu_qs();
    880	}
    881	return 0;
    882}
    883
    884/* Is a deferred wakeup of rcu_nocb_kthread() required? */
    885static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
    886{
    887	return READ_ONCE(rdp->nocb_defer_wakeup) >= level;
    888}
    889
    890/* Do a deferred wakeup of rcu_nocb_kthread(). */
    891static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp_gp,
    892					   struct rcu_data *rdp, int level,
    893					   unsigned long flags)
    894	__releases(rdp_gp->nocb_gp_lock)
    895{
    896	int ndw;
    897	int ret;
    898
    899	if (!rcu_nocb_need_deferred_wakeup(rdp_gp, level)) {
    900		raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
    901		return false;
    902	}
    903
    904	ndw = rdp_gp->nocb_defer_wakeup;
    905	ret = __wake_nocb_gp(rdp_gp, rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
    906	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
    907
    908	return ret;
    909}
    910
    911/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
    912static void do_nocb_deferred_wakeup_timer(struct timer_list *t)
    913{
    914	unsigned long flags;
    915	struct rcu_data *rdp = from_timer(rdp, t, nocb_timer);
    916
    917	WARN_ON_ONCE(rdp->nocb_gp_rdp != rdp);
    918	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
    919
    920	raw_spin_lock_irqsave(&rdp->nocb_gp_lock, flags);
    921	smp_mb__after_spinlock(); /* Timer expire before wakeup. */
    922	do_nocb_deferred_wakeup_common(rdp, rdp, RCU_NOCB_WAKE_BYPASS, flags);
    923}
    924
    925/*
    926 * Do a deferred wakeup of rcu_nocb_kthread() from fastpath.
    927 * This means we do an inexact common-case check.  Note that if
    928 * we miss, ->nocb_timer will eventually clean things up.
    929 */
    930static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
    931{
    932	unsigned long flags;
    933	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
    934
    935	if (!rdp_gp || !rcu_nocb_need_deferred_wakeup(rdp_gp, RCU_NOCB_WAKE))
    936		return false;
    937
    938	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
    939	return do_nocb_deferred_wakeup_common(rdp_gp, rdp, RCU_NOCB_WAKE, flags);
    940}
    941
    942void rcu_nocb_flush_deferred_wakeup(void)
    943{
    944	do_nocb_deferred_wakeup(this_cpu_ptr(&rcu_data));
    945}
    946EXPORT_SYMBOL_GPL(rcu_nocb_flush_deferred_wakeup);
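
/*
 * This export lets code outside RCU (the idle-entry path, for instance)
 * issue any wakeup that was deferred via wake_nocb_gp_defer() right away
 * rather than waiting for ->nocb_timer to fire.  A minimal sketch of such
 * a call site, assuming nothing beyond the function defined above:
 *
 *	// About to stop doing useful work on this CPU; flush pending
 *	// deferred rcuog wakeups first.
 *	rcu_nocb_flush_deferred_wakeup();
 */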
    947
    948static int rdp_offload_toggle(struct rcu_data *rdp,
    949			       bool offload, unsigned long flags)
    950	__releases(rdp->nocb_lock)
    951{
    952	struct rcu_segcblist *cblist = &rdp->cblist;
    953	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
    954	bool wake_gp = false;
    955
    956	rcu_segcblist_offload(cblist, offload);
    957
    958	if (rdp->nocb_cb_sleep)
    959		rdp->nocb_cb_sleep = false;
    960	rcu_nocb_unlock_irqrestore(rdp, flags);
    961
    962	/*
    963	 * Ignore former value of nocb_cb_sleep and force wake up as it could
    964	 * have been spuriously set to false already.
    965	 */
    966	swake_up_one(&rdp->nocb_cb_wq);
    967
    968	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
    969	if (rdp_gp->nocb_gp_sleep) {
    970		rdp_gp->nocb_gp_sleep = false;
    971		wake_gp = true;
    972	}
    973	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
    974
    975	if (wake_gp)
    976		wake_up_process(rdp_gp->nocb_gp_kthread);
    977
    978	return 0;
    979}
    980
    981static long rcu_nocb_rdp_deoffload(void *arg)
    982{
    983	struct rcu_data *rdp = arg;
    984	struct rcu_segcblist *cblist = &rdp->cblist;
    985	unsigned long flags;
    986	int ret;
    987
    988	WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
    989
    990	pr_info("De-offloading %d\n", rdp->cpu);
    991
    992	rcu_nocb_lock_irqsave(rdp, flags);
    993	/*
    994	 * Flush once and for all now. This suffices because we are
    995	 * running on the target CPU holding ->nocb_lock (thus having
    996	 * interrupts disabled), and because rdp_offload_toggle()
    997	 * invokes rcu_segcblist_offload(), which clears SEGCBLIST_OFFLOADED.
    998	 * Thus future calls to rcu_segcblist_completely_offloaded() will
    999	 * return false, which means that future calls to rcu_nocb_try_bypass()
   1000	 * will refuse to put anything into the bypass.
   1001	 */
   1002	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
   1003	/*
   1004	 * Start with invoking rcu_core() early. This way if the current thread
   1005	 * happens to preempt an ongoing call to rcu_core() in the middle,
   1006	 * leaving some work dismissed because rcu_core() still thinks the rdp is
   1007	 * completely offloaded, we are guaranteed a nearby future instance of
   1008	 * rcu_core() to catch up.
   1009	 */
   1010	rcu_segcblist_set_flags(cblist, SEGCBLIST_RCU_CORE);
   1011	invoke_rcu_core();
   1012	ret = rdp_offload_toggle(rdp, false, flags);
   1013	swait_event_exclusive(rdp->nocb_state_wq,
   1014			      !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB |
   1015							SEGCBLIST_KTHREAD_GP));
   1016	/* Stop nocb_gp_wait() from iterating over this structure. */
   1017	list_del_rcu(&rdp->nocb_entry_rdp);
   1018	/*
   1019	 * Lock one last time to acquire latest callback updates from kthreads
   1020	 * so we can later handle callbacks locally without locking.
   1021	 */
   1022	rcu_nocb_lock_irqsave(rdp, flags);
   1023	/*
   1024	 * Theoretically we could clear SEGCBLIST_LOCKING after the nocb
   1025	 * lock is released but how about being paranoid for once?
   1026	 */
   1027	rcu_segcblist_clear_flags(cblist, SEGCBLIST_LOCKING);
   1028	/*
   1029	 * Without SEGCBLIST_LOCKING, we can't use
   1030	 * rcu_nocb_unlock_irqrestore() anymore.
   1031	 */
   1032	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
   1033
   1034	/* Sanity check */
   1035	WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
   1036
   1037
   1038	return ret;
   1039}
   1040
   1041int rcu_nocb_cpu_deoffload(int cpu)
   1042{
   1043	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
   1044	int ret = 0;
   1045
   1046	mutex_lock(&rcu_state.barrier_mutex);
   1047	cpus_read_lock();
   1048	if (rcu_rdp_is_offloaded(rdp)) {
   1049		if (cpu_online(cpu)) {
   1050			ret = work_on_cpu(cpu, rcu_nocb_rdp_deoffload, rdp);
   1051			if (!ret)
   1052				cpumask_clear_cpu(cpu, rcu_nocb_mask);
   1053		} else {
   1054			pr_info("NOCB: Can't CB-deoffload an offline CPU\n");
   1055			ret = -EINVAL;
   1056		}
   1057	}
   1058	cpus_read_unlock();
   1059	mutex_unlock(&rcu_state.barrier_mutex);
   1060
   1061	return ret;
   1062}
   1063EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload);
   1064
   1065static long rcu_nocb_rdp_offload(void *arg)
   1066{
   1067	struct rcu_data *rdp = arg;
   1068	struct rcu_segcblist *cblist = &rdp->cblist;
   1069	unsigned long flags;
   1070	int ret;
   1071
   1072	WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
   1073	/*
    1074	 * For now we only support re-offload, i.e., the rdp must have been
   1075	 * offloaded on boot first.
   1076	 */
   1077	if (!rdp->nocb_gp_rdp)
   1078		return -EINVAL;
   1079
   1080	pr_info("Offloading %d\n", rdp->cpu);
   1081
   1082	/*
   1083	 * Cause future nocb_gp_wait() invocations to iterate over
    1084	 * this structure, resetting ->nocb_gp_sleep and waking up the related
   1085	 * "rcuog".  Since nocb_gp_wait() in turn locks ->nocb_gp_lock
   1086	 * before setting ->nocb_gp_sleep again, we are guaranteed to
   1087	 * iterate this newly added structure before "rcuog" goes to
   1088	 * sleep again.
   1089	 */
   1090	list_add_tail_rcu(&rdp->nocb_entry_rdp, &rdp->nocb_gp_rdp->nocb_head_rdp);
   1091
   1092	/*
   1093	 * Can't use rcu_nocb_lock_irqsave() before SEGCBLIST_LOCKING
   1094	 * is set.
   1095	 */
   1096	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
   1097
   1098	/*
   1099	 * We didn't take the nocb lock while working on the
   1100	 * rdp->cblist with SEGCBLIST_LOCKING cleared (pure softirq/rcuc mode).
    1101	 * Every modification previously done on
    1102	 * rdp->cblist must be remotely visible to the nocb kthreads
    1103	 * upon wake-up, after they have read the cblist flags.
   1104	 *
   1105	 * The layout against nocb_lock enforces that ordering:
   1106	 *
   1107	 *  __rcu_nocb_rdp_offload()   nocb_cb_wait()/nocb_gp_wait()
   1108	 * -------------------------   ----------------------------
   1109	 *      WRITE callbacks           rcu_nocb_lock()
   1110	 *      rcu_nocb_lock()           READ flags
   1111	 *      WRITE flags               READ callbacks
   1112	 *      rcu_nocb_unlock()         rcu_nocb_unlock()
   1113	 */
   1114	ret = rdp_offload_toggle(rdp, true, flags);
   1115	swait_event_exclusive(rdp->nocb_state_wq,
   1116			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB) &&
   1117			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
   1118
   1119	/*
    1120	 * All kthreads are ready to work, so we can finally relieve rcu_core() and
   1121	 * enable nocb bypass.
   1122	 */
   1123	rcu_nocb_lock_irqsave(rdp, flags);
   1124	rcu_segcblist_clear_flags(cblist, SEGCBLIST_RCU_CORE);
   1125	rcu_nocb_unlock_irqrestore(rdp, flags);
   1126
   1127	return ret;
   1128}
   1129
   1130int rcu_nocb_cpu_offload(int cpu)
   1131{
   1132	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
   1133	int ret = 0;
   1134
   1135	mutex_lock(&rcu_state.barrier_mutex);
   1136	cpus_read_lock();
   1137	if (!rcu_rdp_is_offloaded(rdp)) {
   1138		if (cpu_online(cpu)) {
   1139			ret = work_on_cpu(cpu, rcu_nocb_rdp_offload, rdp);
   1140			if (!ret)
   1141				cpumask_set_cpu(cpu, rcu_nocb_mask);
   1142		} else {
   1143			pr_info("NOCB: Can't CB-offload an offline CPU\n");
   1144			ret = -EINVAL;
   1145		}
   1146	}
   1147	cpus_read_unlock();
   1148	mutex_unlock(&rcu_state.barrier_mutex);
   1149
   1150	return ret;
   1151}
   1152EXPORT_SYMBOL_GPL(rcu_nocb_cpu_offload);
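
/*
 * A minimal usage sketch for the pair above (hypothetical caller, for
 * illustration only), toggling CPU 3 out of and back into offloaded mode
 * from a context that may sleep:
 *
 *	int err = rcu_nocb_cpu_deoffload(3);	// back to softirq/rcuc processing
 *	if (!err)
 *		err = rcu_nocb_cpu_offload(3);	// hand callbacks back to rcuo kthreads
 *
 * Both functions serialize against rcu_barrier() and CPU hotplug via
 * rcu_state.barrier_mutex and cpus_read_lock(), and both return -EINVAL
 * if the CPU is offline.
 */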
   1153
   1154void __init rcu_init_nohz(void)
   1155{
   1156	int cpu;
   1157	bool need_rcu_nocb_mask = false;
   1158	struct rcu_data *rdp;
   1159
   1160#if defined(CONFIG_NO_HZ_FULL)
   1161	if (tick_nohz_full_running && !cpumask_empty(tick_nohz_full_mask))
   1162		need_rcu_nocb_mask = true;
   1163#endif /* #if defined(CONFIG_NO_HZ_FULL) */
   1164
   1165	if (need_rcu_nocb_mask) {
   1166		if (!cpumask_available(rcu_nocb_mask)) {
   1167			if (!zalloc_cpumask_var(&rcu_nocb_mask, GFP_KERNEL)) {
   1168				pr_info("rcu_nocb_mask allocation failed, callback offloading disabled.\n");
   1169				return;
   1170			}
   1171		}
   1172		rcu_state.nocb_is_setup = true;
   1173	}
   1174
   1175	if (!rcu_state.nocb_is_setup)
   1176		return;
   1177
   1178#if defined(CONFIG_NO_HZ_FULL)
   1179	if (tick_nohz_full_running)
   1180		cpumask_or(rcu_nocb_mask, rcu_nocb_mask, tick_nohz_full_mask);
   1181#endif /* #if defined(CONFIG_NO_HZ_FULL) */
   1182
   1183	if (!cpumask_subset(rcu_nocb_mask, cpu_possible_mask)) {
   1184		pr_info("\tNote: kernel parameter 'rcu_nocbs=', 'nohz_full', or 'isolcpus=' contains nonexistent CPUs.\n");
   1185		cpumask_and(rcu_nocb_mask, cpu_possible_mask,
   1186			    rcu_nocb_mask);
   1187	}
   1188	if (cpumask_empty(rcu_nocb_mask))
   1189		pr_info("\tOffload RCU callbacks from CPUs: (none).\n");
   1190	else
   1191		pr_info("\tOffload RCU callbacks from CPUs: %*pbl.\n",
   1192			cpumask_pr_args(rcu_nocb_mask));
   1193	if (rcu_nocb_poll)
   1194		pr_info("\tPoll for callbacks from no-CBs CPUs.\n");
   1195
   1196	for_each_cpu(cpu, rcu_nocb_mask) {
   1197		rdp = per_cpu_ptr(&rcu_data, cpu);
   1198		if (rcu_segcblist_empty(&rdp->cblist))
   1199			rcu_segcblist_init(&rdp->cblist);
   1200		rcu_segcblist_offload(&rdp->cblist, true);
   1201		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_CB | SEGCBLIST_KTHREAD_GP);
   1202		rcu_segcblist_clear_flags(&rdp->cblist, SEGCBLIST_RCU_CORE);
   1203	}
   1204	rcu_organize_nocb_kthreads();
   1205}
   1206
   1207/* Initialize per-rcu_data variables for no-CBs CPUs. */
   1208static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
   1209{
   1210	init_swait_queue_head(&rdp->nocb_cb_wq);
   1211	init_swait_queue_head(&rdp->nocb_gp_wq);
   1212	init_swait_queue_head(&rdp->nocb_state_wq);
   1213	raw_spin_lock_init(&rdp->nocb_lock);
   1214	raw_spin_lock_init(&rdp->nocb_bypass_lock);
   1215	raw_spin_lock_init(&rdp->nocb_gp_lock);
   1216	timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
   1217	rcu_cblist_init(&rdp->nocb_bypass);
   1218	mutex_init(&rdp->nocb_gp_kthread_mutex);
   1219}
   1220
   1221/*
   1222 * If the specified CPU is a no-CBs CPU that does not already have its
   1223 * rcuo CB kthread, spawn it.  Additionally, if the rcuo GP kthread
   1224 * for this CPU's group has not yet been created, spawn it as well.
   1225 */
   1226static void rcu_spawn_cpu_nocb_kthread(int cpu)
   1227{
   1228	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
   1229	struct rcu_data *rdp_gp;
   1230	struct task_struct *t;
   1231	struct sched_param sp;
   1232
   1233	if (!rcu_scheduler_fully_active || !rcu_state.nocb_is_setup)
   1234		return;
   1235
   1236	/* If there already is an rcuo kthread, then nothing to do. */
   1237	if (rdp->nocb_cb_kthread)
   1238		return;
   1239
   1240	/* If we didn't spawn the GP kthread first, reorganize! */
   1241	sp.sched_priority = kthread_prio;
   1242	rdp_gp = rdp->nocb_gp_rdp;
   1243	mutex_lock(&rdp_gp->nocb_gp_kthread_mutex);
   1244	if (!rdp_gp->nocb_gp_kthread) {
   1245		t = kthread_run(rcu_nocb_gp_kthread, rdp_gp,
   1246				"rcuog/%d", rdp_gp->cpu);
   1247		if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__)) {
   1248			mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex);
   1249			return;
   1250		}
   1251		WRITE_ONCE(rdp_gp->nocb_gp_kthread, t);
   1252		if (kthread_prio)
   1253			sched_setscheduler_nocheck(t, SCHED_FIFO, &sp);
   1254	}
   1255	mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex);
   1256
   1257	/* Spawn the kthread for this CPU. */
   1258	t = kthread_run(rcu_nocb_cb_kthread, rdp,
   1259			"rcuo%c/%d", rcu_state.abbr, cpu);
   1260	if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo CB kthread, OOM is now expected behavior\n", __func__))
   1261		return;
   1262
   1263	if (kthread_prio)
   1264		sched_setscheduler_nocheck(t, SCHED_FIFO, &sp);
   1265	WRITE_ONCE(rdp->nocb_cb_kthread, t);
   1266	WRITE_ONCE(rdp->nocb_gp_kthread, rdp_gp->nocb_gp_kthread);
   1267}
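
/*
 * The resulting kthreads are the ones visible in ps as "rcuog/<cpu>" (one
 * grace-period kthread per group, named after the group's first CPU) and
 * "rcuo<abbr>/<cpu>" (one callback kthread per offloaded CPU), where
 * rcu_state.abbr identifies the flavor, e.g. "rcuop/3" on preemptible
 * kernels.
 */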
   1268
   1269/* How many CB CPU IDs per GP kthread?  Default of -1 for sqrt(nr_cpu_ids). */
   1270static int rcu_nocb_gp_stride = -1;
   1271module_param(rcu_nocb_gp_stride, int, 0444);
   1272
   1273/*
   1274 * Initialize GP-CB relationships for all no-CBs CPU.
   1275 */
   1276static void __init rcu_organize_nocb_kthreads(void)
   1277{
   1278	int cpu;
   1279	bool firsttime = true;
   1280	bool gotnocbs = false;
   1281	bool gotnocbscbs = true;
   1282	int ls = rcu_nocb_gp_stride;
   1283	int nl = 0;  /* Next GP kthread. */
   1284	struct rcu_data *rdp;
   1285	struct rcu_data *rdp_gp = NULL;  /* Suppress misguided gcc warn. */
   1286
   1287	if (!cpumask_available(rcu_nocb_mask))
   1288		return;
   1289	if (ls == -1) {
   1290		ls = nr_cpu_ids / int_sqrt(nr_cpu_ids);
   1291		rcu_nocb_gp_stride = ls;
   1292	}
   1293
   1294	/*
   1295	 * Each pass through this loop sets up one rcu_data structure.
   1296	 * Should the corresponding CPU come online in the future, then
   1297	 * we will spawn the needed set of rcu_nocb_kthread() kthreads.
   1298	 */
   1299	for_each_possible_cpu(cpu) {
   1300		rdp = per_cpu_ptr(&rcu_data, cpu);
   1301		if (rdp->cpu >= nl) {
   1302			/* New GP kthread, set up for CBs & next GP. */
   1303			gotnocbs = true;
   1304			nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
   1305			rdp_gp = rdp;
   1306			INIT_LIST_HEAD(&rdp->nocb_head_rdp);
   1307			if (dump_tree) {
   1308				if (!firsttime)
   1309					pr_cont("%s\n", gotnocbscbs
   1310							? "" : " (self only)");
   1311				gotnocbscbs = false;
   1312				firsttime = false;
   1313				pr_alert("%s: No-CB GP kthread CPU %d:",
   1314					 __func__, cpu);
   1315			}
   1316		} else {
   1317			/* Another CB kthread, link to previous GP kthread. */
   1318			gotnocbscbs = true;
   1319			if (dump_tree)
   1320				pr_cont(" %d", cpu);
   1321		}
   1322		rdp->nocb_gp_rdp = rdp_gp;
   1323		if (cpumask_test_cpu(cpu, rcu_nocb_mask))
   1324			list_add_tail(&rdp->nocb_entry_rdp, &rdp_gp->nocb_head_rdp);
   1325	}
   1326	if (gotnocbs && dump_tree)
   1327		pr_cont("%s\n", gotnocbscbs ? "" : " (self only)");
   1328}
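
/*
 * Worked example of the grouping above: with nr_cpu_ids == 64 and the
 * default rcu_nocb_gp_stride of -1, ls becomes 64 / int_sqrt(64) = 8, so
 * CPUs 0-7 are grouped under the rcu_data structure of CPU 0 (whose GP
 * kthread will be "rcuog/0"), CPUs 8-15 under CPU 8, and so on; each CPU
 * that is actually in rcu_nocb_mask is then linked onto its group leader's
 * ->nocb_head_rdp list.
 */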
   1329
   1330/*
   1331 * Bind the current task to the offloaded CPUs.  If there are no offloaded
   1332 * CPUs, leave the task unbound.  Splat if the bind attempt fails.
   1333 */
   1334void rcu_bind_current_to_nocb(void)
   1335{
   1336	if (cpumask_available(rcu_nocb_mask) && !cpumask_empty(rcu_nocb_mask))
   1337		WARN_ON(sched_setaffinity(current->pid, rcu_nocb_mask));
   1338}
   1339EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb);
   1340
   1341// The ->on_cpu field is available only in CONFIG_SMP=y, so...
   1342#ifdef CONFIG_SMP
   1343static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
   1344{
   1345	return tsp && task_is_running(tsp) && !tsp->on_cpu ? "!" : "";
   1346}
   1347#else // #ifdef CONFIG_SMP
   1348static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
   1349{
   1350	return "";
   1351}
   1352#endif // #else #ifdef CONFIG_SMP
   1353
   1354/*
   1355 * Dump out nocb grace-period kthread state for the specified rcu_data
   1356 * structure.
   1357 */
   1358static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
   1359{
   1360	struct rcu_node *rnp = rdp->mynode;
   1361
   1362	pr_info("nocb GP %d %c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu %c CPU %d%s\n",
   1363		rdp->cpu,
   1364		"kK"[!!rdp->nocb_gp_kthread],
   1365		"lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)],
   1366		"dD"[!!rdp->nocb_defer_wakeup],
   1367		"tT"[timer_pending(&rdp->nocb_timer)],
   1368		"sS"[!!rdp->nocb_gp_sleep],
   1369		".W"[swait_active(&rdp->nocb_gp_wq)],
   1370		".W"[swait_active(&rnp->nocb_gp_wq[0])],
   1371		".W"[swait_active(&rnp->nocb_gp_wq[1])],
   1372		".B"[!!rdp->nocb_gp_bypass],
   1373		".G"[!!rdp->nocb_gp_gp],
   1374		(long)rdp->nocb_gp_seq,
   1375		rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops),
   1376		rdp->nocb_gp_kthread ? task_state_to_char(rdp->nocb_gp_kthread) : '.',
   1377		rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
   1378		show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
   1379}
   1380
   1381/* Dump out nocb kthread state for the specified rcu_data structure. */
   1382static void show_rcu_nocb_state(struct rcu_data *rdp)
   1383{
   1384	char bufw[20];
   1385	char bufr[20];
   1386	struct rcu_data *nocb_next_rdp;
   1387	struct rcu_segcblist *rsclp = &rdp->cblist;
   1388	bool waslocked;
   1389	bool wassleep;
   1390
   1391	if (rdp->nocb_gp_rdp == rdp)
   1392		show_rcu_nocb_gp_state(rdp);
   1393
   1394	nocb_next_rdp = list_next_or_null_rcu(&rdp->nocb_gp_rdp->nocb_head_rdp,
   1395					      &rdp->nocb_entry_rdp,
   1396					      typeof(*rdp),
   1397					      nocb_entry_rdp);
   1398
   1399	sprintf(bufw, "%ld", rsclp->gp_seq[RCU_WAIT_TAIL]);
   1400	sprintf(bufr, "%ld", rsclp->gp_seq[RCU_NEXT_READY_TAIL]);
   1401	pr_info("   CB %d^%d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%s%c%s%c%c q%ld %c CPU %d%s\n",
   1402		rdp->cpu, rdp->nocb_gp_rdp->cpu,
   1403		nocb_next_rdp ? nocb_next_rdp->cpu : -1,
   1404		"kK"[!!rdp->nocb_cb_kthread],
   1405		"bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)],
   1406		"cC"[!!atomic_read(&rdp->nocb_lock_contended)],
   1407		"lL"[raw_spin_is_locked(&rdp->nocb_lock)],
   1408		"sS"[!!rdp->nocb_cb_sleep],
   1409		".W"[swait_active(&rdp->nocb_cb_wq)],
   1410		jiffies - rdp->nocb_bypass_first,
   1411		jiffies - rdp->nocb_nobypass_last,
   1412		rdp->nocb_nobypass_count,
   1413		".D"[rcu_segcblist_ready_cbs(rsclp)],
   1414		".W"[!rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL)],
   1415		rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL) ? "" : bufw,
   1416		".R"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL)],
   1417		rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL) ? "" : bufr,
   1418		".N"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_TAIL)],
   1419		".B"[!!rcu_cblist_n_cbs(&rdp->nocb_bypass)],
   1420		rcu_segcblist_n_cbs(&rdp->cblist),
   1421		rdp->nocb_cb_kthread ? task_state_to_char(rdp->nocb_cb_kthread) : '.',
   1422		rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
   1423		show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
   1424
   1425	/* It is OK for GP kthreads to have GP state. */
   1426	if (rdp->nocb_gp_rdp == rdp)
   1427		return;
   1428
   1429	waslocked = raw_spin_is_locked(&rdp->nocb_gp_lock);
   1430	wassleep = swait_active(&rdp->nocb_gp_wq);
   1431	if (!rdp->nocb_gp_sleep && !waslocked && !wassleep)
   1432		return;  /* Nothing untoward. */
   1433
   1434	pr_info("   nocb GP activity on CB-only CPU!!! %c%c%c %c\n",
   1435		"lL"[waslocked],
   1436		"dD"[!!rdp->nocb_defer_wakeup],
   1437		"sS"[!!rdp->nocb_gp_sleep],
   1438		".W"[wassleep]);
   1439}
   1440
   1441#else /* #ifdef CONFIG_RCU_NOCB_CPU */
   1442
   1443static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
   1444{
   1445	return 0;
   1446}
   1447
   1448static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
   1449{
   1450	return false;
   1451}
   1452
   1453/* No ->nocb_lock to acquire.  */
   1454static void rcu_nocb_lock(struct rcu_data *rdp)
   1455{
   1456}
   1457
   1458/* No ->nocb_lock to release.  */
   1459static void rcu_nocb_unlock(struct rcu_data *rdp)
   1460{
   1461}
   1462
   1463/* No ->nocb_lock to release.  */
   1464static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
   1465				       unsigned long flags)
   1466{
   1467	local_irq_restore(flags);
   1468}
   1469
   1470/* Lockdep check that ->cblist may be safely accessed. */
   1471static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
   1472{
   1473	lockdep_assert_irqs_disabled();
   1474}
   1475
   1476static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
   1477{
   1478}
   1479
   1480static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
   1481{
   1482	return NULL;
   1483}
   1484
   1485static void rcu_init_one_nocb(struct rcu_node *rnp)
   1486{
   1487}
   1488
   1489static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
   1490				  unsigned long j)
   1491{
   1492	return true;
   1493}
   1494
   1495static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
   1496				bool *was_alldone, unsigned long flags)
   1497{
   1498	return false;
   1499}
   1500
   1501static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
   1502				 unsigned long flags)
   1503{
   1504	WARN_ON_ONCE(1);  /* Should be dead code! */
   1505}
   1506
   1507static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
   1508{
   1509}
   1510
   1511static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
   1512{
   1513	return false;
   1514}
   1515
   1516static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
   1517{
   1518	return false;
   1519}
   1520
   1521static void rcu_spawn_cpu_nocb_kthread(int cpu)
   1522{
   1523}
   1524
   1525static void show_rcu_nocb_state(struct rcu_data *rdp)
   1526{
   1527}
   1528
   1529#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */