cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

recover.c (23390B)


      1// SPDX-License-Identifier: GPL-2.0-only
      2/******************************************************************************
      3*******************************************************************************
      4**
      5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
      6**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
      7**
      8**
      9*******************************************************************************
     10******************************************************************************/
     11
     12#include "dlm_internal.h"
     13#include "lockspace.h"
     14#include "dir.h"
     15#include "config.h"
     16#include "ast.h"
     17#include "memory.h"
     18#include "rcom.h"
     19#include "lock.h"
     20#include "lowcomms.h"
     21#include "member.h"
     22#include "recover.h"
     23
     24
     25/*
     26 * Recovery waiting routines: these functions wait for a particular reply from
     27 * a remote node, or for the remote node to report a certain status.  They need
     28 * to abort if the lockspace is stopped indicating a node has failed (perhaps
     29 * the one being waited for).
     30 */
     31
     32/*
     33 * Wait until given function returns non-zero or lockspace is stopped
     34 * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes).  When another
     35 * function thinks it could have completed the waited-on task, they should wake
     36 * up ls_wait_general to get an immediate response rather than waiting for the
     37 * timeout.  This uses a timeout so it can check periodically if the wait
     38 * should abort due to node failure (which doesn't cause a wake_up).
     39 * This should only be called by the dlm_recoverd thread.
     40 */
     41
     42int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
     43{
     44	int error = 0;
     45	int rv;
     46
     47	while (1) {
     48		rv = wait_event_timeout(ls->ls_wait_general,
     49					testfn(ls) || dlm_recovery_stopped(ls),
     50					dlm_config.ci_recover_timer * HZ);
     51		if (rv)
     52			break;
     53		if (test_bit(LSFL_RCOM_WAIT, &ls->ls_flags)) {
     54			log_debug(ls, "dlm_wait_function timed out");
     55			return -ETIMEDOUT;
     56		}
     57	}
     58
     59	if (dlm_recovery_stopped(ls)) {
     60		log_debug(ls, "dlm_wait_function aborted");
     61		error = -EINTR;
     62	}
     63	return error;
     64}
     65
     66/*
     67 * An efficient way for all nodes to wait for all others to have a certain
     68 * status.  The node with the lowest nodeid polls all the others for their
     69 * status (wait_status_all) and all the others poll the node with the low id
     70 * for its accumulated result (wait_status_low).  When all nodes have set
     71 * status flag X, then status flag X_ALL will be set on the low nodeid.
     72 */
     73
     74uint32_t dlm_recover_status(struct dlm_ls *ls)
     75{
     76	uint32_t status;
     77	spin_lock(&ls->ls_recover_lock);
     78	status = ls->ls_recover_status;
     79	spin_unlock(&ls->ls_recover_lock);
     80	return status;
     81}
     82
     83static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
     84{
     85	ls->ls_recover_status |= status;
     86}
     87
     88void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
     89{
     90	spin_lock(&ls->ls_recover_lock);
     91	_set_recover_status(ls, status);
     92	spin_unlock(&ls->ls_recover_lock);
     93}
     94
     95static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
     96			   int save_slots)
     97{
     98	struct dlm_rcom *rc = ls->ls_recover_buf;
     99	struct dlm_member *memb;
    100	int error = 0, delay;
    101
    102	list_for_each_entry(memb, &ls->ls_nodes, list) {
    103		delay = 0;
    104		for (;;) {
    105			if (dlm_recovery_stopped(ls)) {
    106				error = -EINTR;
    107				goto out;
    108			}
    109
    110			error = dlm_rcom_status(ls, memb->nodeid, 0);
    111			if (error)
    112				goto out;
    113
    114			if (save_slots)
    115				dlm_slot_save(ls, rc, memb);
    116
    117			if (le32_to_cpu(rc->rc_result) & wait_status)
    118				break;
    119			if (delay < 1000)
    120				delay += 20;
    121			msleep(delay);
    122		}
    123	}
    124 out:
    125	return error;
    126}
    127
    128static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
    129			   uint32_t status_flags)
    130{
    131	struct dlm_rcom *rc = ls->ls_recover_buf;
    132	int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
    133
    134	for (;;) {
    135		if (dlm_recovery_stopped(ls)) {
    136			error = -EINTR;
    137			goto out;
    138		}
    139
    140		error = dlm_rcom_status(ls, nodeid, status_flags);
    141		if (error)
    142			break;
    143
    144		if (le32_to_cpu(rc->rc_result) & wait_status)
    145			break;
    146		if (delay < 1000)
    147			delay += 20;
    148		msleep(delay);
    149	}
    150 out:
    151	return error;
    152}
    153
    154static int wait_status(struct dlm_ls *ls, uint32_t status)
    155{
    156	uint32_t status_all = status << 1;
    157	int error;
    158
    159	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
    160		error = wait_status_all(ls, status, 0);
    161		if (!error)
    162			dlm_set_recover_status(ls, status_all);
    163	} else
    164		error = wait_status_low(ls, status_all, 0);
    165
    166	return error;
    167}
    168
    169int dlm_recover_members_wait(struct dlm_ls *ls)
    170{
    171	struct dlm_member *memb;
    172	struct dlm_slot *slots;
    173	int num_slots, slots_size;
    174	int error, rv;
    175	uint32_t gen;
    176
    177	list_for_each_entry(memb, &ls->ls_nodes, list) {
    178		memb->slot = -1;
    179		memb->generation = 0;
    180	}
    181
    182	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
    183		error = wait_status_all(ls, DLM_RS_NODES, 1);
    184		if (error)
    185			goto out;
    186
    187		/* slots array is sparse, slots_size may be > num_slots */
    188
    189		rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
    190		if (!rv) {
    191			spin_lock(&ls->ls_recover_lock);
    192			_set_recover_status(ls, DLM_RS_NODES_ALL);
    193			ls->ls_num_slots = num_slots;
    194			ls->ls_slots_size = slots_size;
    195			ls->ls_slots = slots;
    196			ls->ls_generation = gen;
    197			spin_unlock(&ls->ls_recover_lock);
    198		} else {
    199			dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
    200		}
    201	} else {
    202		error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
    203		if (error)
    204			goto out;
    205
    206		dlm_slots_copy_in(ls);
    207	}
    208 out:
    209	return error;
    210}
    211
    212int dlm_recover_directory_wait(struct dlm_ls *ls)
    213{
    214	return wait_status(ls, DLM_RS_DIR);
    215}
    216
    217int dlm_recover_locks_wait(struct dlm_ls *ls)
    218{
    219	return wait_status(ls, DLM_RS_LOCKS);
    220}
    221
    222int dlm_recover_done_wait(struct dlm_ls *ls)
    223{
    224	return wait_status(ls, DLM_RS_DONE);
    225}
    226
    227/*
    228 * The recover_list contains all the rsb's for which we've requested the new
    229 * master nodeid.  As replies are returned from the resource directories the
    230 * rsb's are removed from the list.  When the list is empty we're done.
    231 *
    232 * The recover_list is later similarly used for all rsb's for which we've sent
    233 * new lkb's and need to receive new corresponding lkid's.
    234 *
    235 * We use the address of the rsb struct as a simple local identifier for the
    236 * rsb so we can match an rcom reply with the rsb it was sent for.
    237 */
    238
    239static int recover_list_empty(struct dlm_ls *ls)
    240{
    241	int empty;
    242
    243	spin_lock(&ls->ls_recover_list_lock);
    244	empty = list_empty(&ls->ls_recover_list);
    245	spin_unlock(&ls->ls_recover_list_lock);
    246
    247	return empty;
    248}
    249
    250static void recover_list_add(struct dlm_rsb *r)
    251{
    252	struct dlm_ls *ls = r->res_ls;
    253
    254	spin_lock(&ls->ls_recover_list_lock);
    255	if (list_empty(&r->res_recover_list)) {
    256		list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
    257		ls->ls_recover_list_count++;
    258		dlm_hold_rsb(r);
    259	}
    260	spin_unlock(&ls->ls_recover_list_lock);
    261}
    262
    263static void recover_list_del(struct dlm_rsb *r)
    264{
    265	struct dlm_ls *ls = r->res_ls;
    266
    267	spin_lock(&ls->ls_recover_list_lock);
    268	list_del_init(&r->res_recover_list);
    269	ls->ls_recover_list_count--;
    270	spin_unlock(&ls->ls_recover_list_lock);
    271
    272	dlm_put_rsb(r);
    273}
    274
    275static void recover_list_clear(struct dlm_ls *ls)
    276{
    277	struct dlm_rsb *r, *s;
    278
    279	spin_lock(&ls->ls_recover_list_lock);
    280	list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
    281		list_del_init(&r->res_recover_list);
    282		r->res_recover_locks_count = 0;
    283		dlm_put_rsb(r);
    284		ls->ls_recover_list_count--;
    285	}
    286
    287	if (ls->ls_recover_list_count != 0) {
    288		log_error(ls, "warning: recover_list_count %d",
    289			  ls->ls_recover_list_count);
    290		ls->ls_recover_list_count = 0;
    291	}
    292	spin_unlock(&ls->ls_recover_list_lock);
    293}
    294
    295static int recover_idr_empty(struct dlm_ls *ls)
    296{
    297	int empty = 1;
    298
    299	spin_lock(&ls->ls_recover_idr_lock);
    300	if (ls->ls_recover_list_count)
    301		empty = 0;
    302	spin_unlock(&ls->ls_recover_idr_lock);
    303
    304	return empty;
    305}
    306
    307static int recover_idr_add(struct dlm_rsb *r)
    308{
    309	struct dlm_ls *ls = r->res_ls;
    310	int rv;
    311
    312	idr_preload(GFP_NOFS);
    313	spin_lock(&ls->ls_recover_idr_lock);
    314	if (r->res_id) {
    315		rv = -1;
    316		goto out_unlock;
    317	}
    318	rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT);
    319	if (rv < 0)
    320		goto out_unlock;
    321
    322	r->res_id = rv;
    323	ls->ls_recover_list_count++;
    324	dlm_hold_rsb(r);
    325	rv = 0;
    326out_unlock:
    327	spin_unlock(&ls->ls_recover_idr_lock);
    328	idr_preload_end();
    329	return rv;
    330}
    331
    332static void recover_idr_del(struct dlm_rsb *r)
    333{
    334	struct dlm_ls *ls = r->res_ls;
    335
    336	spin_lock(&ls->ls_recover_idr_lock);
    337	idr_remove(&ls->ls_recover_idr, r->res_id);
    338	r->res_id = 0;
    339	ls->ls_recover_list_count--;
    340	spin_unlock(&ls->ls_recover_idr_lock);
    341
    342	dlm_put_rsb(r);
    343}
    344
    345static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
    346{
    347	struct dlm_rsb *r;
    348
    349	spin_lock(&ls->ls_recover_idr_lock);
    350	r = idr_find(&ls->ls_recover_idr, (int)id);
    351	spin_unlock(&ls->ls_recover_idr_lock);
    352	return r;
    353}
    354
    355static void recover_idr_clear(struct dlm_ls *ls)
    356{
    357	struct dlm_rsb *r;
    358	int id;
    359
    360	spin_lock(&ls->ls_recover_idr_lock);
    361
    362	idr_for_each_entry(&ls->ls_recover_idr, r, id) {
    363		idr_remove(&ls->ls_recover_idr, id);
    364		r->res_id = 0;
    365		r->res_recover_locks_count = 0;
    366		ls->ls_recover_list_count--;
    367
    368		dlm_put_rsb(r);
    369	}
    370
    371	if (ls->ls_recover_list_count != 0) {
    372		log_error(ls, "warning: recover_list_count %d",
    373			  ls->ls_recover_list_count);
    374		ls->ls_recover_list_count = 0;
    375	}
    376	spin_unlock(&ls->ls_recover_idr_lock);
    377}
    378
    379
    380/* Master recovery: find new master node for rsb's that were
    381   mastered on nodes that have been removed.
    382
    383   dlm_recover_masters
    384   recover_master
    385   dlm_send_rcom_lookup            ->  receive_rcom_lookup
    386                                       dlm_dir_lookup
    387   receive_rcom_lookup_reply       <-
    388   dlm_recover_master_reply
    389   set_new_master
    390   set_master_lkbs
    391   set_lock_master
    392*/
    393
    394/*
    395 * Set the lock master for all LKBs in a lock queue
    396 * If we are the new master of the rsb, we may have received new
    397 * MSTCPY locks from other nodes already which we need to ignore
    398 * when setting the new nodeid.
    399 */
    400
    401static void set_lock_master(struct list_head *queue, int nodeid)
    402{
    403	struct dlm_lkb *lkb;
    404
    405	list_for_each_entry(lkb, queue, lkb_statequeue) {
    406		if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
    407			lkb->lkb_nodeid = nodeid;
    408			lkb->lkb_remid = 0;
    409		}
    410	}
    411}
    412
    413static void set_master_lkbs(struct dlm_rsb *r)
    414{
    415	set_lock_master(&r->res_grantqueue, r->res_nodeid);
    416	set_lock_master(&r->res_convertqueue, r->res_nodeid);
    417	set_lock_master(&r->res_waitqueue, r->res_nodeid);
    418}
    419
    420/*
    421 * Propagate the new master nodeid to locks
    422 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
    423 * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
    424 * rsb's to consider.
    425 */
    426
    427static void set_new_master(struct dlm_rsb *r)
    428{
    429	set_master_lkbs(r);
    430	rsb_set_flag(r, RSB_NEW_MASTER);
    431	rsb_set_flag(r, RSB_NEW_MASTER2);
    432}
    433
    434/*
    435 * We do async lookups on rsb's that need new masters.  The rsb's
    436 * waiting for a lookup reply are kept on the recover_list.
    437 *
    438 * Another node recovering the master may have sent us a rcom lookup,
    439 * and our dlm_master_lookup() set it as the new master, along with
    440 * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
    441 * equals our_nodeid below).
    442 */
    443
    444static int recover_master(struct dlm_rsb *r, unsigned int *count)
    445{
    446	struct dlm_ls *ls = r->res_ls;
    447	int our_nodeid, dir_nodeid;
    448	int is_removed = 0;
    449	int error;
    450
    451	if (is_master(r))
    452		return 0;
    453
    454	is_removed = dlm_is_removed(ls, r->res_nodeid);
    455
    456	if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
    457		return 0;
    458
    459	our_nodeid = dlm_our_nodeid();
    460	dir_nodeid = dlm_dir_nodeid(r);
    461
    462	if (dir_nodeid == our_nodeid) {
    463		if (is_removed) {
    464			r->res_master_nodeid = our_nodeid;
    465			r->res_nodeid = 0;
    466		}
    467
    468		/* set master of lkbs to ourself when is_removed, or to
    469		   another new master which we set along with NEW_MASTER
    470		   in dlm_master_lookup */
    471		set_new_master(r);
    472		error = 0;
    473	} else {
    474		recover_idr_add(r);
    475		error = dlm_send_rcom_lookup(r, dir_nodeid);
    476	}
    477
    478	(*count)++;
    479	return error;
    480}
    481
    482/*
    483 * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
    484 * This is necessary because recovery can be started, aborted and restarted,
    485 * causing the master nodeid to briefly change during the aborted recovery, and
    486 * change back to the original value in the second recovery.  The MSTCPY locks
    487 * may or may not have been purged during the aborted recovery.  Another node
    488 * with an outstanding request in waiters list and a request reply saved in the
    489 * requestqueue, cannot know whether it should ignore the reply and resend the
    490 * request, or accept the reply and complete the request.  It must do the
    491 * former if the remote node purged MSTCPY locks, and it must do the later if
    492 * the remote node did not.  This is solved by always purging MSTCPY locks, in
    493 * which case, the request reply would always be ignored and the request
    494 * resent.
    495 */
    496
    497static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
    498{
    499	int dir_nodeid = dlm_dir_nodeid(r);
    500	int new_master = dir_nodeid;
    501
    502	if (dir_nodeid == dlm_our_nodeid())
    503		new_master = 0;
    504
    505	dlm_purge_mstcpy_locks(r);
    506	r->res_master_nodeid = dir_nodeid;
    507	r->res_nodeid = new_master;
    508	set_new_master(r);
    509	(*count)++;
    510	return 0;
    511}
    512
    513/*
    514 * Go through local root resources and for each rsb which has a master which
    515 * has departed, get the new master nodeid from the directory.  The dir will
    516 * assign mastery to the first node to look up the new master.  That means
    517 * we'll discover in this lookup if we're the new master of any rsb's.
    518 *
    519 * We fire off all the dir lookup requests individually and asynchronously to
    520 * the correct dir node.
    521 */
    522
    523int dlm_recover_masters(struct dlm_ls *ls)
    524{
    525	struct dlm_rsb *r;
    526	unsigned int total = 0;
    527	unsigned int count = 0;
    528	int nodir = dlm_no_directory(ls);
    529	int error;
    530
    531	log_rinfo(ls, "dlm_recover_masters");
    532
    533	down_read(&ls->ls_root_sem);
    534	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
    535		if (dlm_recovery_stopped(ls)) {
    536			up_read(&ls->ls_root_sem);
    537			error = -EINTR;
    538			goto out;
    539		}
    540
    541		lock_rsb(r);
    542		if (nodir)
    543			error = recover_master_static(r, &count);
    544		else
    545			error = recover_master(r, &count);
    546		unlock_rsb(r);
    547		cond_resched();
    548		total++;
    549
    550		if (error) {
    551			up_read(&ls->ls_root_sem);
    552			goto out;
    553		}
    554	}
    555	up_read(&ls->ls_root_sem);
    556
    557	log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);
    558
    559	error = dlm_wait_function(ls, &recover_idr_empty);
    560 out:
    561	if (error)
    562		recover_idr_clear(ls);
    563	return error;
    564}
    565
    566int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
    567{
    568	struct dlm_rsb *r;
    569	int ret_nodeid, new_master;
    570
    571	r = recover_idr_find(ls, le64_to_cpu(rc->rc_id));
    572	if (!r) {
    573		log_error(ls, "dlm_recover_master_reply no id %llx",
    574			  (unsigned long long)le64_to_cpu(rc->rc_id));
    575		goto out;
    576	}
    577
    578	ret_nodeid = le32_to_cpu(rc->rc_result);
    579
    580	if (ret_nodeid == dlm_our_nodeid())
    581		new_master = 0;
    582	else
    583		new_master = ret_nodeid;
    584
    585	lock_rsb(r);
    586	r->res_master_nodeid = ret_nodeid;
    587	r->res_nodeid = new_master;
    588	set_new_master(r);
    589	unlock_rsb(r);
    590	recover_idr_del(r);
    591
    592	if (recover_idr_empty(ls))
    593		wake_up(&ls->ls_wait_general);
    594 out:
    595	return 0;
    596}
    597
    598
    599/* Lock recovery: rebuild the process-copy locks we hold on a
    600   remastered rsb on the new rsb master.
    601
    602   dlm_recover_locks
    603   recover_locks
    604   recover_locks_queue
    605   dlm_send_rcom_lock              ->  receive_rcom_lock
    606                                       dlm_recover_master_copy
    607   receive_rcom_lock_reply         <-
    608   dlm_recover_process_copy
    609*/
    610
    611
    612/*
    613 * keep a count of the number of lkb's we send to the new master; when we get
    614 * an equal number of replies then recovery for the rsb is done
    615 */
    616
    617static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
    618{
    619	struct dlm_lkb *lkb;
    620	int error = 0;
    621
    622	list_for_each_entry(lkb, head, lkb_statequeue) {
    623	   	error = dlm_send_rcom_lock(r, lkb);
    624		if (error)
    625			break;
    626		r->res_recover_locks_count++;
    627	}
    628
    629	return error;
    630}
    631
    632static int recover_locks(struct dlm_rsb *r)
    633{
    634	int error = 0;
    635
    636	lock_rsb(r);
    637
    638	DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););
    639
    640	error = recover_locks_queue(r, &r->res_grantqueue);
    641	if (error)
    642		goto out;
    643	error = recover_locks_queue(r, &r->res_convertqueue);
    644	if (error)
    645		goto out;
    646	error = recover_locks_queue(r, &r->res_waitqueue);
    647	if (error)
    648		goto out;
    649
    650	if (r->res_recover_locks_count)
    651		recover_list_add(r);
    652	else
    653		rsb_clear_flag(r, RSB_NEW_MASTER);
    654 out:
    655	unlock_rsb(r);
    656	return error;
    657}
    658
    659int dlm_recover_locks(struct dlm_ls *ls)
    660{
    661	struct dlm_rsb *r;
    662	int error, count = 0;
    663
    664	down_read(&ls->ls_root_sem);
    665	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
    666		if (is_master(r)) {
    667			rsb_clear_flag(r, RSB_NEW_MASTER);
    668			continue;
    669		}
    670
    671		if (!rsb_flag(r, RSB_NEW_MASTER))
    672			continue;
    673
    674		if (dlm_recovery_stopped(ls)) {
    675			error = -EINTR;
    676			up_read(&ls->ls_root_sem);
    677			goto out;
    678		}
    679
    680		error = recover_locks(r);
    681		if (error) {
    682			up_read(&ls->ls_root_sem);
    683			goto out;
    684		}
    685
    686		count += r->res_recover_locks_count;
    687	}
    688	up_read(&ls->ls_root_sem);
    689
    690	log_rinfo(ls, "dlm_recover_locks %d out", count);
    691
    692	error = dlm_wait_function(ls, &recover_list_empty);
    693 out:
    694	if (error)
    695		recover_list_clear(ls);
    696	return error;
    697}
    698
    699void dlm_recovered_lock(struct dlm_rsb *r)
    700{
    701	DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r););
    702
    703	r->res_recover_locks_count--;
    704	if (!r->res_recover_locks_count) {
    705		rsb_clear_flag(r, RSB_NEW_MASTER);
    706		recover_list_del(r);
    707	}
    708
    709	if (recover_list_empty(r->res_ls))
    710		wake_up(&r->res_ls->ls_wait_general);
    711}
    712
    713/*
    714 * The lvb needs to be recovered on all master rsb's.  This includes setting
    715 * the VALNOTVALID flag if necessary, and determining the correct lvb contents
    716 * based on the lvb's of the locks held on the rsb.
    717 *
    718 * RSB_VALNOTVALID is set in two cases:
    719 *
    720 * 1. we are master, but not new, and we purged an EX/PW lock held by a
    721 * failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL)
    722 *
    723 * 2. we are a new master, and there are only NL/CR locks left.
    724 * (We could probably improve this by only invaliding in this way when
    725 * the previous master left uncleanly.  VMS docs mention that.)
    726 *
    727 * The LVB contents are only considered for changing when this is a new master
    728 * of the rsb (NEW_MASTER2).  Then, the rsb's lvb is taken from any lkb with
    729 * mode > CR.  If no lkb's exist with mode above CR, the lvb contents are taken
    730 * from the lkb with the largest lvb sequence number.
    731 */
    732
    733static void recover_lvb(struct dlm_rsb *r)
    734{
    735	struct dlm_lkb *big_lkb = NULL, *iter, *high_lkb = NULL;
    736	uint32_t high_seq = 0;
    737	int lock_lvb_exists = 0;
    738	int lvblen = r->res_ls->ls_lvblen;
    739
    740	if (!rsb_flag(r, RSB_NEW_MASTER2) &&
    741	    rsb_flag(r, RSB_RECOVER_LVB_INVAL)) {
    742		/* case 1 above */
    743		rsb_set_flag(r, RSB_VALNOTVALID);
    744		return;
    745	}
    746
    747	if (!rsb_flag(r, RSB_NEW_MASTER2))
    748		return;
    749
    750	/* we are the new master, so figure out if VALNOTVALID should
    751	   be set, and set the rsb lvb from the best lkb available. */
    752
    753	list_for_each_entry(iter, &r->res_grantqueue, lkb_statequeue) {
    754		if (!(iter->lkb_exflags & DLM_LKF_VALBLK))
    755			continue;
    756
    757		lock_lvb_exists = 1;
    758
    759		if (iter->lkb_grmode > DLM_LOCK_CR) {
    760			big_lkb = iter;
    761			goto setflag;
    762		}
    763
    764		if (((int)iter->lkb_lvbseq - (int)high_seq) >= 0) {
    765			high_lkb = iter;
    766			high_seq = iter->lkb_lvbseq;
    767		}
    768	}
    769
    770	list_for_each_entry(iter, &r->res_convertqueue, lkb_statequeue) {
    771		if (!(iter->lkb_exflags & DLM_LKF_VALBLK))
    772			continue;
    773
    774		lock_lvb_exists = 1;
    775
    776		if (iter->lkb_grmode > DLM_LOCK_CR) {
    777			big_lkb = iter;
    778			goto setflag;
    779		}
    780
    781		if (((int)iter->lkb_lvbseq - (int)high_seq) >= 0) {
    782			high_lkb = iter;
    783			high_seq = iter->lkb_lvbseq;
    784		}
    785	}
    786
    787 setflag:
    788	if (!lock_lvb_exists)
    789		goto out;
    790
    791	/* lvb is invalidated if only NL/CR locks remain */
    792	if (!big_lkb)
    793		rsb_set_flag(r, RSB_VALNOTVALID);
    794
    795	if (!r->res_lvbptr) {
    796		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
    797		if (!r->res_lvbptr)
    798			goto out;
    799	}
    800
    801	if (big_lkb) {
    802		r->res_lvbseq = big_lkb->lkb_lvbseq;
    803		memcpy(r->res_lvbptr, big_lkb->lkb_lvbptr, lvblen);
    804	} else if (high_lkb) {
    805		r->res_lvbseq = high_lkb->lkb_lvbseq;
    806		memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen);
    807	} else {
    808		r->res_lvbseq = 0;
    809		memset(r->res_lvbptr, 0, lvblen);
    810	}
    811 out:
    812	return;
    813}
    814
    815/* All master rsb's flagged RECOVER_CONVERT need to be looked at.  The locks
    816   converting PR->CW or CW->PR need to have their lkb_grmode set. */
    817
    818static void recover_conversion(struct dlm_rsb *r)
    819{
    820	struct dlm_ls *ls = r->res_ls;
    821	struct dlm_lkb *lkb;
    822	int grmode = -1;
    823
    824	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
    825		if (lkb->lkb_grmode == DLM_LOCK_PR ||
    826		    lkb->lkb_grmode == DLM_LOCK_CW) {
    827			grmode = lkb->lkb_grmode;
    828			break;
    829		}
    830	}
    831
    832	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
    833		if (lkb->lkb_grmode != DLM_LOCK_IV)
    834			continue;
    835		if (grmode == -1) {
    836			log_debug(ls, "recover_conversion %x set gr to rq %d",
    837				  lkb->lkb_id, lkb->lkb_rqmode);
    838			lkb->lkb_grmode = lkb->lkb_rqmode;
    839		} else {
    840			log_debug(ls, "recover_conversion %x set gr %d",
    841				  lkb->lkb_id, grmode);
    842			lkb->lkb_grmode = grmode;
    843		}
    844	}
    845}
    846
    847/* We've become the new master for this rsb and waiting/converting locks may
    848   need to be granted in dlm_recover_grant() due to locks that may have
    849   existed from a removed node. */
    850
    851static void recover_grant(struct dlm_rsb *r)
    852{
    853	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
    854		rsb_set_flag(r, RSB_RECOVER_GRANT);
    855}
    856
    857void dlm_recover_rsbs(struct dlm_ls *ls)
    858{
    859	struct dlm_rsb *r;
    860	unsigned int count = 0;
    861
    862	down_read(&ls->ls_root_sem);
    863	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
    864		lock_rsb(r);
    865		if (is_master(r)) {
    866			if (rsb_flag(r, RSB_RECOVER_CONVERT))
    867				recover_conversion(r);
    868
    869			/* recover lvb before granting locks so the updated
    870			   lvb/VALNOTVALID is presented in the completion */
    871			recover_lvb(r);
    872
    873			if (rsb_flag(r, RSB_NEW_MASTER2))
    874				recover_grant(r);
    875			count++;
    876		} else {
    877			rsb_clear_flag(r, RSB_VALNOTVALID);
    878		}
    879		rsb_clear_flag(r, RSB_RECOVER_CONVERT);
    880		rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL);
    881		rsb_clear_flag(r, RSB_NEW_MASTER2);
    882		unlock_rsb(r);
    883	}
    884	up_read(&ls->ls_root_sem);
    885
    886	if (count)
    887		log_rinfo(ls, "dlm_recover_rsbs %d done", count);
    888}
    889
    890/* Create a single list of all root rsb's to be used during recovery */
    891
    892int dlm_create_root_list(struct dlm_ls *ls)
    893{
    894	struct rb_node *n;
    895	struct dlm_rsb *r;
    896	int i, error = 0;
    897
    898	down_write(&ls->ls_root_sem);
    899	if (!list_empty(&ls->ls_root_list)) {
    900		log_error(ls, "root list not empty");
    901		error = -EINVAL;
    902		goto out;
    903	}
    904
    905	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
    906		spin_lock(&ls->ls_rsbtbl[i].lock);
    907		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
    908			r = rb_entry(n, struct dlm_rsb, res_hashnode);
    909			list_add(&r->res_root_list, &ls->ls_root_list);
    910			dlm_hold_rsb(r);
    911		}
    912
    913		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
    914			log_error(ls, "dlm_create_root_list toss not empty");
    915		spin_unlock(&ls->ls_rsbtbl[i].lock);
    916	}
    917 out:
    918	up_write(&ls->ls_root_sem);
    919	return error;
    920}
    921
    922void dlm_release_root_list(struct dlm_ls *ls)
    923{
    924	struct dlm_rsb *r, *safe;
    925
    926	down_write(&ls->ls_root_sem);
    927	list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
    928		list_del_init(&r->res_root_list);
    929		dlm_put_rsb(r);
    930	}
    931	up_write(&ls->ls_root_sem);
    932}
    933
    934void dlm_clear_toss(struct dlm_ls *ls)
    935{
    936	struct rb_node *n, *next;
    937	struct dlm_rsb *r;
    938	unsigned int count = 0;
    939	int i;
    940
    941	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
    942		spin_lock(&ls->ls_rsbtbl[i].lock);
    943		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
    944			next = rb_next(n);
    945			r = rb_entry(n, struct dlm_rsb, res_hashnode);
    946			rb_erase(n, &ls->ls_rsbtbl[i].toss);
    947			dlm_free_rsb(r);
    948			count++;
    949		}
    950		spin_unlock(&ls->ls_rsbtbl[i].lock);
    951	}
    952
    953	if (count)
    954		log_rinfo(ls, "dlm_clear_toss %u done", count);
    955}
    956