cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

dlmrecovery.c (89063B)


      1// SPDX-License-Identifier: GPL-2.0-or-later
      2/*
      3 * dlmrecovery.c
      4 *
      5 * recovery stuff
      6 *
      7 * Copyright (C) 2004 Oracle.  All rights reserved.
      8 */
      9
     10
     11#include <linux/module.h>
     12#include <linux/fs.h>
     13#include <linux/types.h>
     14#include <linux/slab.h>
     15#include <linux/highmem.h>
     16#include <linux/init.h>
     17#include <linux/sysctl.h>
     18#include <linux/random.h>
     19#include <linux/blkdev.h>
     20#include <linux/socket.h>
     21#include <linux/inet.h>
     22#include <linux/timer.h>
     23#include <linux/kthread.h>
     24#include <linux/delay.h>
     25
     26
     27#include "../cluster/heartbeat.h"
     28#include "../cluster/nodemanager.h"
     29#include "../cluster/tcp.h"
     30
     31#include "dlmapi.h"
     32#include "dlmcommon.h"
     33#include "dlmdomain.h"
     34
     35#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_RECOVERY)
     36#include "../cluster/masklog.h"
     37
     38static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
     39
     40static int dlm_recovery_thread(void *data);
     41static int dlm_do_recovery(struct dlm_ctxt *dlm);
     42
     43static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
     44static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node);
     45static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node);
     46static int dlm_request_all_locks(struct dlm_ctxt *dlm,
     47				 u8 request_from, u8 dead_node);
     48static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm);
     49
     50static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res);
     51static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
     52					const char *lockname, int namelen,
     53					int total_locks, u64 cookie,
     54					u8 flags, u8 master);
     55static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
     56				    struct dlm_migratable_lockres *mres,
     57				    u8 send_to,
     58				    struct dlm_lock_resource *res,
     59				    int total_locks);
     60static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
     61				     struct dlm_lock_resource *res,
     62				     struct dlm_migratable_lockres *mres);
     63static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
     64static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
     65				 u8 dead_node, u8 send_to);
     66static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node);
     67static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
     68					struct list_head *list, u8 dead_node);
     69static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
     70					      u8 dead_node, u8 new_master);
     71static void dlm_reco_ast(void *astdata);
     72static void dlm_reco_bast(void *astdata, int blocked_type);
     73static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st);
     74static void dlm_request_all_locks_worker(struct dlm_work_item *item,
     75					 void *data);
     76static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data);
     77static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
     78				      struct dlm_lock_resource *res,
     79				      u8 *real_master);
     80
     81static u64 dlm_get_next_mig_cookie(void);
     82
     83static DEFINE_SPINLOCK(dlm_reco_state_lock);
     84static DEFINE_SPINLOCK(dlm_mig_cookie_lock);
     85static u64 dlm_mig_cookie = 1;
     86
     87static u64 dlm_get_next_mig_cookie(void)
     88{
     89	u64 c;
     90	spin_lock(&dlm_mig_cookie_lock);
     91	c = dlm_mig_cookie;
     92	if (dlm_mig_cookie == (~0ULL))
     93		dlm_mig_cookie = 1;
     94	else
     95		dlm_mig_cookie++;
     96	spin_unlock(&dlm_mig_cookie_lock);
     97	return c;
     98}
     99
    100static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
    101					  u8 dead_node)
    102{
    103	assert_spin_locked(&dlm->spinlock);
    104	if (dlm->reco.dead_node != dead_node)
    105		mlog(0, "%s: changing dead_node from %u to %u\n",
    106		     dlm->name, dlm->reco.dead_node, dead_node);
    107	dlm->reco.dead_node = dead_node;
    108}
    109
    110static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
    111				       u8 master)
    112{
    113	assert_spin_locked(&dlm->spinlock);
    114	mlog(0, "%s: changing new_master from %u to %u\n",
    115	     dlm->name, dlm->reco.new_master, master);
    116	dlm->reco.new_master = master;
    117}
    118
    119static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
    120{
    121	assert_spin_locked(&dlm->spinlock);
    122	clear_bit(dlm->reco.dead_node, dlm->recovery_map);
    123	dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
    124	dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
    125}
    126
    127/* Worker function used during recovery. */
    128void dlm_dispatch_work(struct work_struct *work)
    129{
    130	struct dlm_ctxt *dlm =
    131		container_of(work, struct dlm_ctxt, dispatched_work);
    132	LIST_HEAD(tmp_list);
    133	struct dlm_work_item *item, *next;
    134	dlm_workfunc_t *workfunc;
    135	int tot=0;
    136
    137	spin_lock(&dlm->work_lock);
    138	list_splice_init(&dlm->work_list, &tmp_list);
    139	spin_unlock(&dlm->work_lock);
    140
    141	list_for_each_entry(item, &tmp_list, list) {
    142		tot++;
    143	}
    144	mlog(0, "%s: work thread has %d work items\n", dlm->name, tot);
    145
    146	list_for_each_entry_safe(item, next, &tmp_list, list) {
    147		workfunc = item->func;
    148		list_del_init(&item->list);
    149
    150		/* already have ref on dlm to avoid having
    151		 * it disappear.  just double-check. */
    152		BUG_ON(item->dlm != dlm);
    153
    154		/* this is allowed to sleep and
    155		 * call network stuff */
    156		workfunc(item, item->data);
    157
    158		dlm_put(dlm);
    159		kfree(item);
    160	}
    161}
    162
    163/*
    164 * RECOVERY THREAD
    165 */
    166
    167void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
    168{
    169	/* wake the recovery thread
    170	 * this will wake the reco thread in one of three places
    171	 * 1) sleeping with no recovery happening
    172	 * 2) sleeping with recovery mastered elsewhere
    173	 * 3) recovery mastered here, waiting on reco data */
    174
    175	wake_up(&dlm->dlm_reco_thread_wq);
    176}
    177
    178/* Launch the recovery thread */
    179int dlm_launch_recovery_thread(struct dlm_ctxt *dlm)
    180{
    181	mlog(0, "starting dlm recovery thread...\n");
    182
    183	dlm->dlm_reco_thread_task = kthread_run(dlm_recovery_thread, dlm,
    184			"dlm_reco-%s", dlm->name);
    185	if (IS_ERR(dlm->dlm_reco_thread_task)) {
    186		mlog_errno(PTR_ERR(dlm->dlm_reco_thread_task));
    187		dlm->dlm_reco_thread_task = NULL;
    188		return -EINVAL;
    189	}
    190
    191	return 0;
    192}
    193
    194void dlm_complete_recovery_thread(struct dlm_ctxt *dlm)
    195{
    196	if (dlm->dlm_reco_thread_task) {
    197		mlog(0, "waiting for dlm recovery thread to exit\n");
    198		kthread_stop(dlm->dlm_reco_thread_task);
    199		dlm->dlm_reco_thread_task = NULL;
    200	}
    201}
    202
    203
    204
/*
 * this is lame, but here's how recovery works...
 * 1) all recovery threads cluster-wide will work on recovering
 *    ONE node at a time
 * 2) negotiate who will take over all the locks for the dead node.
 *    that's right... ALL the locks.
 * 3) once a new master is chosen, everyone scans all locks
 *    and moves aside those mastered by the dead guy
 * 4) each of these locks should be locked until recovery is done
 * 5) the new master collects up all of the secondary lock queue info
 *    one lock at a time, forcing each node to communicate back
 *    before continuing
 * 6) each secondary lock queue responds with the full known lock info
 * 7) once the new master has run all its locks, it sends an ALLDONE!
 *    message to everyone
 * 8) upon receiving this message, the secondary queue node unlocks
 *    and responds to the ALLDONE
 * 9) once the new master gets responses from everyone, it unlocks
 *    everything and recovery for this dead node is done
 *10) go back to 2) while there are still dead nodes
 *
 */
    227
    228static void dlm_print_reco_node_status(struct dlm_ctxt *dlm)
    229{
    230	struct dlm_reco_node_data *ndata;
    231	struct dlm_lock_resource *res;
    232
    233	mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n",
    234	     dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
    235	     dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive",
    236	     dlm->reco.dead_node, dlm->reco.new_master);
    237
    238	list_for_each_entry(ndata, &dlm->reco.node_data, list) {
    239		char *st = "unknown";
    240		switch (ndata->state) {
    241			case DLM_RECO_NODE_DATA_INIT:
    242				st = "init";
    243				break;
    244			case DLM_RECO_NODE_DATA_REQUESTING:
    245				st = "requesting";
    246				break;
    247			case DLM_RECO_NODE_DATA_DEAD:
    248				st = "dead";
    249				break;
    250			case DLM_RECO_NODE_DATA_RECEIVING:
    251				st = "receiving";
    252				break;
    253			case DLM_RECO_NODE_DATA_REQUESTED:
    254				st = "requested";
    255				break;
    256			case DLM_RECO_NODE_DATA_DONE:
    257				st = "done";
    258				break;
    259			case DLM_RECO_NODE_DATA_FINALIZE_SENT:
    260				st = "finalize-sent";
    261				break;
    262			default:
    263				st = "bad";
    264				break;
    265		}
    266		mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n",
    267		     dlm->name, ndata->node_num, st);
    268	}
    269	list_for_each_entry(res, &dlm->reco.resources, recovering) {
    270		mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n",
    271		     dlm->name, res->lockname.len, res->lockname.name);
    272	}
    273}
    274
    275#define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000)
    276
    277static int dlm_recovery_thread(void *data)
    278{
    279	int status;
    280	struct dlm_ctxt *dlm = data;
    281	unsigned long timeout = msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS);
    282
    283	mlog(0, "dlm thread running for %s...\n", dlm->name);
    284
    285	while (!kthread_should_stop()) {
    286		if (dlm_domain_fully_joined(dlm)) {
    287			status = dlm_do_recovery(dlm);
    288			if (status == -EAGAIN) {
    289				/* do not sleep, recheck immediately. */
    290				continue;
    291			}
    292			if (status < 0)
    293				mlog_errno(status);
    294		}
    295
    296		wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq,
    297						 kthread_should_stop(),
    298						 timeout);
    299	}
    300
    301	mlog(0, "quitting DLM recovery thread\n");
    302	return 0;
    303}
    304
    305/* returns true when the recovery master has contacted us */
    306static int dlm_reco_master_ready(struct dlm_ctxt *dlm)
    307{
    308	int ready;
    309	spin_lock(&dlm->spinlock);
    310	ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM);
    311	spin_unlock(&dlm->spinlock);
    312	return ready;
    313}
    314
    315/* returns true if node is no longer in the domain
    316 * could be dead or just not joined */
    317int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
    318{
    319	int dead;
    320	spin_lock(&dlm->spinlock);
    321	dead = !test_bit(node, dlm->domain_map);
    322	spin_unlock(&dlm->spinlock);
    323	return dead;
    324}
    325
    326/* returns true if node is no longer in the domain
    327 * could be dead or just not joined */
    328static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
    329{
    330	int recovered;
    331	spin_lock(&dlm->spinlock);
    332	recovered = !test_bit(node, dlm->recovery_map);
    333	spin_unlock(&dlm->spinlock);
    334	return recovered;
    335}
    336
    337
    338void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
    339{
    340	if (dlm_is_node_dead(dlm, node))
    341		return;
    342
    343	printk(KERN_NOTICE "o2dlm: Waiting on the death of node %u in "
    344	       "domain %s\n", node, dlm->name);
    345
    346	if (timeout)
    347		wait_event_timeout(dlm->dlm_reco_thread_wq,
    348				   dlm_is_node_dead(dlm, node),
    349				   msecs_to_jiffies(timeout));
    350	else
    351		wait_event(dlm->dlm_reco_thread_wq,
    352			   dlm_is_node_dead(dlm, node));
    353}
    354
    355void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
    356{
    357	if (dlm_is_node_recovered(dlm, node))
    358		return;
    359
    360	printk(KERN_NOTICE "o2dlm: Waiting on the recovery of node %u in "
    361	       "domain %s\n", node, dlm->name);
    362
    363	if (timeout)
    364		wait_event_timeout(dlm->dlm_reco_thread_wq,
    365				   dlm_is_node_recovered(dlm, node),
    366				   msecs_to_jiffies(timeout));
    367	else
    368		wait_event(dlm->dlm_reco_thread_wq,
    369			   dlm_is_node_recovered(dlm, node));
    370}
    371
    372/* callers of the top-level api calls (dlmlock/dlmunlock) should
    373 * block on the dlm->reco.event when recovery is in progress.
    374 * the dlm recovery thread will set this state when it begins
    375 * recovering a dead node (as the new master or not) and clear
    376 * the state and wake as soon as all affected lock resources have
    377 * been marked with the RECOVERY flag */
    378static int dlm_in_recovery(struct dlm_ctxt *dlm)
    379{
    380	int in_recovery;
    381	spin_lock(&dlm->spinlock);
    382	in_recovery = !!(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
    383	spin_unlock(&dlm->spinlock);
    384	return in_recovery;
    385}
    386
    387
    388void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
    389{
    390	if (dlm_in_recovery(dlm)) {
    391		mlog(0, "%s: reco thread %d in recovery: "
    392		     "state=%d, master=%u, dead=%u\n",
    393		     dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
    394		     dlm->reco.state, dlm->reco.new_master,
    395		     dlm->reco.dead_node);
    396	}
    397	wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
    398}
    399
    400static void dlm_begin_recovery(struct dlm_ctxt *dlm)
    401{
    402	assert_spin_locked(&dlm->spinlock);
    403	BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
    404	printk(KERN_NOTICE "o2dlm: Begin recovery on domain %s for node %u\n",
    405	       dlm->name, dlm->reco.dead_node);
    406	dlm->reco.state |= DLM_RECO_STATE_ACTIVE;
    407}
    408
    409static void dlm_end_recovery(struct dlm_ctxt *dlm)
    410{
    411	spin_lock(&dlm->spinlock);
    412	BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE));
    413	dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE;
    414	spin_unlock(&dlm->spinlock);
    415	printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name);
    416	wake_up(&dlm->reco.event);
    417}
    418
    419static void dlm_print_recovery_master(struct dlm_ctxt *dlm)
    420{
    421	printk(KERN_NOTICE "o2dlm: Node %u (%s) is the Recovery Master for the "
    422	       "dead node %u in domain %s\n", dlm->reco.new_master,
    423	       (dlm->node_num == dlm->reco.new_master ? "me" : "he"),
    424	       dlm->reco.dead_node, dlm->name);
    425}
    426
    427static int dlm_do_recovery(struct dlm_ctxt *dlm)
    428{
    429	int status = 0;
    430	int ret;
    431
    432	spin_lock(&dlm->spinlock);
    433
    434	if (dlm->migrate_done) {
    435		mlog(0, "%s: no need do recovery after migrating all "
    436		     "lock resources\n", dlm->name);
    437		spin_unlock(&dlm->spinlock);
    438		return 0;
    439	}
    440
    441	/* check to see if the new master has died */
    442	if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM &&
    443	    test_bit(dlm->reco.new_master, dlm->recovery_map)) {
    444		mlog(0, "new master %u died while recovering %u!\n",
    445		     dlm->reco.new_master, dlm->reco.dead_node);
    446		/* unset the new_master, leave dead_node */
    447		dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
    448	}
    449
    450	/* select a target to recover */
    451	if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
    452		int bit;
    453
    454		bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES);
    455		if (bit >= O2NM_MAX_NODES || bit < 0)
    456			dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
    457		else
    458			dlm_set_reco_dead_node(dlm, bit);
    459	} else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
    460		/* BUG? */
    461		mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n",
    462		     dlm->reco.dead_node);
    463		dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
    464	}
    465
    466	if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
    467		// mlog(0, "nothing to recover!  sleeping now!\n");
    468		spin_unlock(&dlm->spinlock);
    469		/* return to main thread loop and sleep. */
    470		return 0;
    471	}
    472	mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n",
    473	     dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
    474	     dlm->reco.dead_node);
    475
    476	/* take write barrier */
    477	/* (stops the list reshuffling thread, proxy ast handling) */
    478	dlm_begin_recovery(dlm);
    479
    480	spin_unlock(&dlm->spinlock);
    481
    482	if (dlm->reco.new_master == dlm->node_num)
    483		goto master_here;
    484
    485	if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
    486		/* choose a new master, returns 0 if this node
    487		 * is the master, -EEXIST if it's another node.
    488		 * this does not return until a new master is chosen
    489		 * or recovery completes entirely. */
    490		ret = dlm_pick_recovery_master(dlm);
    491		if (!ret) {
    492			/* already notified everyone.  go. */
    493			goto master_here;
    494		}
    495		mlog(0, "another node will master this recovery session.\n");
    496	}
    497
    498	dlm_print_recovery_master(dlm);
    499
    500	/* it is safe to start everything back up here
    501	 * because all of the dead node's lock resources
    502	 * have been marked as in-recovery */
    503	dlm_end_recovery(dlm);
    504
    505	/* sleep out in main dlm_recovery_thread loop. */
    506	return 0;
    507
    508master_here:
    509	dlm_print_recovery_master(dlm);
    510
    511	status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
    512	if (status < 0) {
    513		/* we should never hit this anymore */
    514		mlog(ML_ERROR, "%s: Error %d remastering locks for node %u, "
    515		     "retrying.\n", dlm->name, status, dlm->reco.dead_node);
    516		/* yield a bit to allow any final network messages
    517		 * to get handled on remaining nodes */
    518		msleep(100);
    519	} else {
    520		/* success!  see if any other nodes need recovery */
    521		mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n",
    522		     dlm->name, dlm->reco.dead_node, dlm->node_num);
    523		spin_lock(&dlm->spinlock);
    524		__dlm_reset_recovery(dlm);
    525		dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
    526		spin_unlock(&dlm->spinlock);
    527	}
    528	dlm_end_recovery(dlm);
    529
    530	/* continue and look for another dead node */
    531	return -EAGAIN;
    532}
    533
    534static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
    535{
    536	int status = 0;
    537	struct dlm_reco_node_data *ndata;
    538	int all_nodes_done;
    539	int destroy = 0;
    540	int pass = 0;
    541
    542	do {
    543		/* we have become recovery master.  there is no escaping
    544		 * this, so just keep trying until we get it. */
    545		status = dlm_init_recovery_area(dlm, dead_node);
    546		if (status < 0) {
    547			mlog(ML_ERROR, "%s: failed to alloc recovery area, "
    548			     "retrying\n", dlm->name);
    549			msleep(1000);
    550		}
    551	} while (status != 0);
    552
    553	/* safe to access the node data list without a lock, since this
    554	 * process is the only one to change the list */
    555	list_for_each_entry(ndata, &dlm->reco.node_data, list) {
    556		BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT);
    557		ndata->state = DLM_RECO_NODE_DATA_REQUESTING;
    558
    559		mlog(0, "%s: Requesting lock info from node %u\n", dlm->name,
    560		     ndata->node_num);
    561
    562		if (ndata->node_num == dlm->node_num) {
    563			ndata->state = DLM_RECO_NODE_DATA_DONE;
    564			continue;
    565		}
    566
    567		do {
    568			status = dlm_request_all_locks(dlm, ndata->node_num,
    569						       dead_node);
    570			if (status < 0) {
    571				mlog_errno(status);
    572				if (dlm_is_host_down(status)) {
    573					/* node died, ignore it for recovery */
    574					status = 0;
    575					ndata->state = DLM_RECO_NODE_DATA_DEAD;
    576					/* wait for the domain map to catch up
    577					 * with the network state. */
    578					wait_event_timeout(dlm->dlm_reco_thread_wq,
    579							   dlm_is_node_dead(dlm,
    580								ndata->node_num),
    581							   msecs_to_jiffies(1000));
    582					mlog(0, "waited 1 sec for %u, "
    583					     "dead? %s\n", ndata->node_num,
    584					     dlm_is_node_dead(dlm, ndata->node_num) ?
    585					     "yes" : "no");
    586				} else {
    587					/* -ENOMEM on the other node */
    588					mlog(0, "%s: node %u returned "
    589					     "%d during recovery, retrying "
    590					     "after a short wait\n",
    591					     dlm->name, ndata->node_num,
    592					     status);
    593					msleep(100);
    594				}
    595			}
    596		} while (status != 0);
    597
    598		spin_lock(&dlm_reco_state_lock);
    599		switch (ndata->state) {
    600			case DLM_RECO_NODE_DATA_INIT:
    601			case DLM_RECO_NODE_DATA_FINALIZE_SENT:
    602			case DLM_RECO_NODE_DATA_REQUESTED:
    603				BUG();
    604				break;
    605			case DLM_RECO_NODE_DATA_DEAD:
    606				mlog(0, "node %u died after requesting "
    607				     "recovery info for node %u\n",
    608				     ndata->node_num, dead_node);
    609				/* fine.  don't need this node's info.
    610				 * continue without it. */
    611				break;
    612			case DLM_RECO_NODE_DATA_REQUESTING:
    613				ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
    614				mlog(0, "now receiving recovery data from "
    615				     "node %u for dead node %u\n",
    616				     ndata->node_num, dead_node);
    617				break;
    618			case DLM_RECO_NODE_DATA_RECEIVING:
    619				mlog(0, "already receiving recovery data from "
    620				     "node %u for dead node %u\n",
    621				     ndata->node_num, dead_node);
    622				break;
    623			case DLM_RECO_NODE_DATA_DONE:
    624				mlog(0, "already DONE receiving recovery data "
    625				     "from node %u for dead node %u\n",
    626				     ndata->node_num, dead_node);
    627				break;
    628		}
    629		spin_unlock(&dlm_reco_state_lock);
    630	}
    631
    632	mlog(0, "%s: Done requesting all lock info\n", dlm->name);
    633
    634	/* nodes should be sending reco data now
    635	 * just need to wait */
    636
    637	while (1) {
    638		/* check all the nodes now to see if we are
    639		 * done, or if anyone died */
    640		all_nodes_done = 1;
    641		spin_lock(&dlm_reco_state_lock);
    642		list_for_each_entry(ndata, &dlm->reco.node_data, list) {
    643			mlog(0, "checking recovery state of node %u\n",
    644			     ndata->node_num);
    645			switch (ndata->state) {
    646				case DLM_RECO_NODE_DATA_INIT:
    647				case DLM_RECO_NODE_DATA_REQUESTING:
    648					mlog(ML_ERROR, "bad ndata state for "
    649					     "node %u: state=%d\n",
    650					     ndata->node_num, ndata->state);
    651					BUG();
    652					break;
    653				case DLM_RECO_NODE_DATA_DEAD:
    654					mlog(0, "node %u died after "
    655					     "requesting recovery info for "
    656					     "node %u\n", ndata->node_num,
    657					     dead_node);
    658					break;
    659				case DLM_RECO_NODE_DATA_RECEIVING:
    660				case DLM_RECO_NODE_DATA_REQUESTED:
    661					mlog(0, "%s: node %u still in state %s\n",
    662					     dlm->name, ndata->node_num,
    663					     ndata->state==DLM_RECO_NODE_DATA_RECEIVING ?
    664					     "receiving" : "requested");
    665					all_nodes_done = 0;
    666					break;
    667				case DLM_RECO_NODE_DATA_DONE:
    668					mlog(0, "%s: node %u state is done\n",
    669					     dlm->name, ndata->node_num);
    670					break;
    671				case DLM_RECO_NODE_DATA_FINALIZE_SENT:
    672					mlog(0, "%s: node %u state is finalize\n",
    673					     dlm->name, ndata->node_num);
    674					break;
    675			}
    676		}
    677		spin_unlock(&dlm_reco_state_lock);
    678
    679		mlog(0, "pass #%d, all_nodes_done?: %s\n", ++pass,
    680		     all_nodes_done?"yes":"no");
    681		if (all_nodes_done) {
    682			int ret;
    683
    684			/* Set this flag on recovery master to avoid
    685			 * a new recovery for another dead node start
    686			 * before the recovery is not done. That may
    687			 * cause recovery hung.*/
    688			spin_lock(&dlm->spinlock);
    689			dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
    690			spin_unlock(&dlm->spinlock);
    691
    692			/* all nodes are now in DLM_RECO_NODE_DATA_DONE state
    693	 		 * just send a finalize message to everyone and
    694	 		 * clean up */
    695			mlog(0, "all nodes are done! send finalize\n");
    696			ret = dlm_send_finalize_reco_message(dlm);
    697			if (ret < 0)
    698				mlog_errno(ret);
    699
    700			spin_lock(&dlm->spinlock);
    701			dlm_finish_local_lockres_recovery(dlm, dead_node,
    702							  dlm->node_num);
    703			spin_unlock(&dlm->spinlock);
    704			mlog(0, "should be done with recovery!\n");
    705
    706			mlog(0, "finishing recovery of %s at %lu, "
    707			     "dead=%u, this=%u, new=%u\n", dlm->name,
    708			     jiffies, dlm->reco.dead_node,
    709			     dlm->node_num, dlm->reco.new_master);
    710			destroy = 1;
    711			status = 0;
    712			/* rescan everything marked dirty along the way */
    713			dlm_kick_thread(dlm, NULL);
    714			break;
    715		}
    716		/* wait to be signalled, with periodic timeout
    717		 * to check for node death */
    718		wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq,
    719					 kthread_should_stop(),
    720					 msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS));
    721
    722	}
    723
    724	if (destroy)
    725		dlm_destroy_recovery_area(dlm);
    726
    727	return status;
    728}
    729
    730static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
    731{
    732	int num=0;
    733	struct dlm_reco_node_data *ndata;
    734
    735	spin_lock(&dlm->spinlock);
    736	memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map));
    737	/* nodes can only be removed (by dying) after dropping
    738	 * this lock, and death will be trapped later, so this should do */
    739	spin_unlock(&dlm->spinlock);
    740
    741	while (1) {
    742		num = find_next_bit (dlm->reco.node_map, O2NM_MAX_NODES, num);
    743		if (num >= O2NM_MAX_NODES) {
    744			break;
    745		}
    746		BUG_ON(num == dead_node);
    747
    748		ndata = kzalloc(sizeof(*ndata), GFP_NOFS);
    749		if (!ndata) {
    750			dlm_destroy_recovery_area(dlm);
    751			return -ENOMEM;
    752		}
    753		ndata->node_num = num;
    754		ndata->state = DLM_RECO_NODE_DATA_INIT;
    755		spin_lock(&dlm_reco_state_lock);
    756		list_add_tail(&ndata->list, &dlm->reco.node_data);
    757		spin_unlock(&dlm_reco_state_lock);
    758		num++;
    759	}
    760
    761	return 0;
    762}
    763
    764static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm)
    765{
    766	struct dlm_reco_node_data *ndata, *next;
    767	LIST_HEAD(tmplist);
    768
    769	spin_lock(&dlm_reco_state_lock);
    770	list_splice_init(&dlm->reco.node_data, &tmplist);
    771	spin_unlock(&dlm_reco_state_lock);
    772
    773	list_for_each_entry_safe(ndata, next, &tmplist, list) {
    774		list_del_init(&ndata->list);
    775		kfree(ndata);
    776	}
    777}
    778
    779static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
    780				 u8 dead_node)
    781{
    782	struct dlm_lock_request lr;
    783	int ret;
    784	int status;
    785
    786	mlog(0, "\n");
    787
    788
    789	mlog(0, "dlm_request_all_locks: dead node is %u, sending request "
    790		  "to %u\n", dead_node, request_from);
    791
    792	memset(&lr, 0, sizeof(lr));
    793	lr.node_idx = dlm->node_num;
    794	lr.dead_node = dead_node;
    795
    796	// send message
    797	ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key,
    798				 &lr, sizeof(lr), request_from, &status);
    799
    800	/* negative status is handled by caller */
    801	if (ret < 0)
    802		mlog(ML_ERROR, "%s: Error %d send LOCK_REQUEST to node %u "
    803		     "to recover dead node %u\n", dlm->name, ret,
    804		     request_from, dead_node);
    805	else
    806		ret = status;
    807	// return from here, then
    808	// sleep until all received or error
    809	return ret;
    810
    811}
    812
    813int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
    814				  void **ret_data)
    815{
    816	struct dlm_ctxt *dlm = data;
    817	struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;
    818	char *buf = NULL;
    819	struct dlm_work_item *item = NULL;
    820
    821	if (!dlm_grab(dlm))
    822		return -EINVAL;
    823
    824	if (lr->dead_node != dlm->reco.dead_node) {
    825		mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "
    826		     "dead_node is %u\n", dlm->name, lr->node_idx,
    827		     lr->dead_node, dlm->reco.dead_node);
    828		dlm_print_reco_node_status(dlm);
    829		/* this is a hack */
    830		dlm_put(dlm);
    831		return -ENOMEM;
    832	}
    833	BUG_ON(lr->dead_node != dlm->reco.dead_node);
    834
    835	item = kzalloc(sizeof(*item), GFP_NOFS);
    836	if (!item) {
    837		dlm_put(dlm);
    838		return -ENOMEM;
    839	}
    840
    841	/* this will get freed by dlm_request_all_locks_worker */
    842	buf = (char *) __get_free_page(GFP_NOFS);
    843	if (!buf) {
    844		kfree(item);
    845		dlm_put(dlm);
    846		return -ENOMEM;
    847	}
    848
    849	/* queue up work for dlm_request_all_locks_worker */
    850	dlm_grab(dlm);  /* get an extra ref for the work item */
    851	dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf);
    852	item->u.ral.reco_master = lr->node_idx;
    853	item->u.ral.dead_node = lr->dead_node;
    854	spin_lock(&dlm->work_lock);
    855	list_add_tail(&item->list, &dlm->work_list);
    856	spin_unlock(&dlm->work_lock);
    857	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
    858
    859	dlm_put(dlm);
    860	return 0;
    861}
    862
    863static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
    864{
    865	struct dlm_migratable_lockres *mres;
    866	struct dlm_lock_resource *res;
    867	struct dlm_ctxt *dlm;
    868	LIST_HEAD(resources);
    869	int ret;
    870	u8 dead_node, reco_master;
    871	int skip_all_done = 0;
    872
    873	dlm = item->dlm;
    874	dead_node = item->u.ral.dead_node;
    875	reco_master = item->u.ral.reco_master;
    876	mres = (struct dlm_migratable_lockres *)data;
    877
    878	mlog(0, "%s: recovery worker started, dead=%u, master=%u\n",
    879	     dlm->name, dead_node, reco_master);
    880
    881	if (dead_node != dlm->reco.dead_node ||
    882	    reco_master != dlm->reco.new_master) {
    883		/* worker could have been created before the recovery master
    884		 * died.  if so, do not continue, but do not error. */
    885		if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
    886			mlog(ML_NOTICE, "%s: will not send recovery state, "
    887			     "recovery master %u died, thread=(dead=%u,mas=%u)"
    888			     " current=(dead=%u,mas=%u)\n", dlm->name,
    889			     reco_master, dead_node, reco_master,
    890			     dlm->reco.dead_node, dlm->reco.new_master);
    891		} else {
    892			mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, "
    893			     "master=%u), request(dead=%u, master=%u)\n",
    894			     dlm->name, dlm->reco.dead_node,
    895			     dlm->reco.new_master, dead_node, reco_master);
    896		}
    897		goto leave;
    898	}
    899
    900	/* lock resources should have already been moved to the
    901 	 * dlm->reco.resources list.  now move items from that list
    902 	 * to a temp list if the dead owner matches.  note that the
    903	 * whole cluster recovers only one node at a time, so we
    904	 * can safely move UNKNOWN lock resources for each recovery
    905	 * session. */
    906	dlm_move_reco_locks_to_list(dlm, &resources, dead_node);
    907
    908	/* now we can begin blasting lockreses without the dlm lock */
    909
    910	/* any errors returned will be due to the new_master dying,
    911	 * the dlm_reco_thread should detect this */
    912	list_for_each_entry(res, &resources, recovering) {
    913		ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
    914				   	DLM_MRES_RECOVERY);
    915		if (ret < 0) {
    916			mlog(ML_ERROR, "%s: node %u went down while sending "
    917			     "recovery state for dead node %u, ret=%d\n", dlm->name,
    918			     reco_master, dead_node, ret);
    919			skip_all_done = 1;
    920			break;
    921		}
    922	}
    923
    924	/* move the resources back to the list */
    925	spin_lock(&dlm->spinlock);
    926	list_splice_init(&resources, &dlm->reco.resources);
    927	spin_unlock(&dlm->spinlock);
    928
    929	if (!skip_all_done) {
    930		ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
    931		if (ret < 0) {
    932			mlog(ML_ERROR, "%s: node %u went down while sending "
    933			     "recovery all-done for dead node %u, ret=%d\n",
    934			     dlm->name, reco_master, dead_node, ret);
    935		}
    936	}
    937leave:
    938	free_page((unsigned long)data);
    939}
    940
    941
    942static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
    943{
    944	int ret, tmpret;
    945	struct dlm_reco_data_done done_msg;
    946
    947	memset(&done_msg, 0, sizeof(done_msg));
    948	done_msg.node_idx = dlm->node_num;
    949	done_msg.dead_node = dead_node;
    950	mlog(0, "sending DATA DONE message to %u, "
    951	     "my node=%u, dead node=%u\n", send_to, done_msg.node_idx,
    952	     done_msg.dead_node);
    953
    954	ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
    955				 sizeof(done_msg), send_to, &tmpret);
    956	if (ret < 0) {
    957		mlog(ML_ERROR, "%s: Error %d send RECO_DATA_DONE to node %u "
    958		     "to recover dead node %u\n", dlm->name, ret, send_to,
    959		     dead_node);
    960		if (!dlm_is_host_down(ret)) {
    961			BUG();
    962		}
    963	} else
    964		ret = tmpret;
    965	return ret;
    966}
    967
    968
    969int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
    970			       void **ret_data)
    971{
    972	struct dlm_ctxt *dlm = data;
    973	struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;
    974	struct dlm_reco_node_data *ndata = NULL;
    975	int ret = -EINVAL;
    976
    977	if (!dlm_grab(dlm))
    978		return -EINVAL;
    979
    980	mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
    981	     "node_idx=%u, this node=%u\n", done->dead_node,
    982	     dlm->reco.dead_node, done->node_idx, dlm->node_num);
    983
    984	mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node),
    985			"Got DATA DONE: dead_node=%u, reco.dead_node=%u, "
    986			"node_idx=%u, this node=%u\n", done->dead_node,
    987			dlm->reco.dead_node, done->node_idx, dlm->node_num);
    988
    989	spin_lock(&dlm_reco_state_lock);
    990	list_for_each_entry(ndata, &dlm->reco.node_data, list) {
    991		if (ndata->node_num != done->node_idx)
    992			continue;
    993
    994		switch (ndata->state) {
    995			/* should have moved beyond INIT but not to FINALIZE yet */
    996			case DLM_RECO_NODE_DATA_INIT:
    997			case DLM_RECO_NODE_DATA_DEAD:
    998			case DLM_RECO_NODE_DATA_FINALIZE_SENT:
    999				mlog(ML_ERROR, "bad ndata state for node %u:"
   1000				     " state=%d\n", ndata->node_num,
   1001				     ndata->state);
   1002				BUG();
   1003				break;
   1004			/* these states are possible at this point, anywhere along
   1005			 * the line of recovery */
   1006			case DLM_RECO_NODE_DATA_DONE:
   1007			case DLM_RECO_NODE_DATA_RECEIVING:
   1008			case DLM_RECO_NODE_DATA_REQUESTED:
   1009			case DLM_RECO_NODE_DATA_REQUESTING:
   1010				mlog(0, "node %u is DONE sending "
   1011					  "recovery data!\n",
   1012					  ndata->node_num);
   1013
   1014				ndata->state = DLM_RECO_NODE_DATA_DONE;
   1015				ret = 0;
   1016				break;
   1017		}
   1018	}
   1019	spin_unlock(&dlm_reco_state_lock);
   1020
   1021	/* wake the recovery thread, some node is done */
   1022	if (!ret)
   1023		dlm_kick_recovery_thread(dlm);
   1024
   1025	if (ret < 0)
   1026		mlog(ML_ERROR, "failed to find recovery node data for node "
   1027		     "%u\n", done->node_idx);
   1028	dlm_put(dlm);
   1029
   1030	mlog(0, "leaving reco data done handler, ret=%d\n", ret);
   1031	return ret;
   1032}
   1033
   1034static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
   1035					struct list_head *list,
   1036				       	u8 dead_node)
   1037{
   1038	struct dlm_lock_resource *res, *next;
   1039	struct dlm_lock *lock;
   1040
   1041	spin_lock(&dlm->spinlock);
   1042	list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
   1043		/* always prune any $RECOVERY entries for dead nodes,
   1044		 * otherwise hangs can occur during later recovery */
   1045		if (dlm_is_recovery_lock(res->lockname.name,
   1046					 res->lockname.len)) {
   1047			spin_lock(&res->spinlock);
   1048			list_for_each_entry(lock, &res->granted, list) {
   1049				if (lock->ml.node == dead_node) {
   1050					mlog(0, "AHA! there was "
   1051					     "a $RECOVERY lock for dead "
   1052					     "node %u (%s)!\n",
   1053					     dead_node, dlm->name);
   1054					list_del_init(&lock->list);
   1055					dlm_lock_put(lock);
   1056					/* Can't schedule DLM_UNLOCK_FREE_LOCK
   1057					 * - do manually */
   1058					dlm_lock_put(lock);
   1059					break;
   1060				}
   1061			}
   1062			spin_unlock(&res->spinlock);
   1063			continue;
   1064		}
   1065
   1066		if (res->owner == dead_node) {
   1067			mlog(0, "found lockres owned by dead node while "
   1068				  "doing recovery for node %u. sending it.\n",
   1069				  dead_node);
   1070			list_move_tail(&res->recovering, list);
   1071		} else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
   1072			mlog(0, "found UNKNOWN owner while doing recovery "
   1073				  "for node %u. sending it.\n", dead_node);
   1074			list_move_tail(&res->recovering, list);
   1075		}
   1076	}
   1077	spin_unlock(&dlm->spinlock);
   1078}
   1079
   1080static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res)
   1081{
   1082	int total_locks = 0;
   1083	struct list_head *iter, *queue = &res->granted;
   1084	int i;
   1085
   1086	for (i=0; i<3; i++) {
   1087		list_for_each(iter, queue)
   1088			total_locks++;
   1089		queue++;
   1090	}
   1091	return total_locks;
   1092}
   1093
   1094
   1095static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
   1096				      struct dlm_migratable_lockres *mres,
   1097				      u8 send_to,
   1098				      struct dlm_lock_resource *res,
   1099				      int total_locks)
   1100{
   1101	u64 mig_cookie = be64_to_cpu(mres->mig_cookie);
   1102	int mres_total_locks = be32_to_cpu(mres->total_locks);
   1103	int ret = 0, status = 0;
   1104	u8 orig_flags = mres->flags,
   1105	   orig_master = mres->master;
   1106
   1107	BUG_ON(mres->num_locks > DLM_MAX_MIGRATABLE_LOCKS);
   1108	if (!mres->num_locks)
   1109		return 0;
   1110
   1111	/* add an all-done flag if we reached the last lock */
   1112	orig_flags = mres->flags;
   1113	BUG_ON(total_locks > mres_total_locks);
   1114	if (total_locks == mres_total_locks)
   1115		mres->flags |= DLM_MRES_ALL_DONE;
   1116
   1117	mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
   1118	     dlm->name, res->lockname.len, res->lockname.name,
   1119	     orig_flags & DLM_MRES_MIGRATION ? "migration" : "recovery",
   1120	     send_to);
   1121
   1122	/* send it */
   1123	ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
   1124				 struct_size(mres, ml, mres->num_locks),
   1125				 send_to, &status);
   1126	if (ret < 0) {
   1127		/* XXX: negative status is not handled.
   1128		 * this will end up killing this node. */
   1129		mlog(ML_ERROR, "%s: res %.*s, Error %d send MIG_LOCKRES to "
   1130		     "node %u (%s)\n", dlm->name, mres->lockname_len,
   1131		     mres->lockname, ret, send_to,
   1132		     (orig_flags & DLM_MRES_MIGRATION ?
   1133		      "migration" : "recovery"));
   1134	} else {
   1135		/* might get an -ENOMEM back here */
   1136		ret = status;
   1137		if (ret < 0) {
   1138			mlog_errno(ret);
   1139
   1140			if (ret == -EFAULT) {
   1141				mlog(ML_ERROR, "node %u told me to kill "
   1142				     "myself!\n", send_to);
   1143				BUG();
   1144			}
   1145		}
   1146	}
   1147
   1148	/* zero and reinit the message buffer */
   1149	dlm_init_migratable_lockres(mres, res->lockname.name,
   1150				    res->lockname.len, mres_total_locks,
   1151				    mig_cookie, orig_flags, orig_master);
   1152	return ret;
   1153}
   1154
   1155static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
   1156					const char *lockname, int namelen,
   1157					int total_locks, u64 cookie,
   1158					u8 flags, u8 master)
   1159{
   1160	/* mres here is one full page */
   1161	clear_page(mres);
   1162	mres->lockname_len = namelen;
   1163	memcpy(mres->lockname, lockname, namelen);
   1164	mres->num_locks = 0;
   1165	mres->total_locks = cpu_to_be32(total_locks);
   1166	mres->mig_cookie = cpu_to_be64(cookie);
   1167	mres->flags = flags;
   1168	mres->master = master;
   1169}
   1170
   1171static void dlm_prepare_lvb_for_migration(struct dlm_lock *lock,
   1172					  struct dlm_migratable_lockres *mres,
   1173					  int queue)
   1174{
   1175	if (!lock->lksb)
   1176	       return;
   1177
   1178	/* Ignore lvb in all locks in the blocked list */
   1179	if (queue == DLM_BLOCKED_LIST)
   1180		return;
   1181
   1182	/* Only consider lvbs in locks with granted EX or PR lock levels */
   1183	if (lock->ml.type != LKM_EXMODE && lock->ml.type != LKM_PRMODE)
   1184		return;
   1185
   1186	if (dlm_lvb_is_empty(mres->lvb)) {
   1187		memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
   1188		return;
   1189	}
   1190
   1191	/* Ensure the lvb copied for migration matches in other valid locks */
   1192	if (!memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))
   1193		return;
   1194
   1195	mlog(ML_ERROR, "Mismatched lvb in lock cookie=%u:%llu, name=%.*s, "
   1196	     "node=%u\n",
   1197	     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
   1198	     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
   1199	     lock->lockres->lockname.len, lock->lockres->lockname.name,
   1200	     lock->ml.node);
   1201	dlm_print_one_lock_resource(lock->lockres);
   1202	BUG();
   1203}
   1204
   1205/* returns 1 if this lock fills the network structure,
   1206 * 0 otherwise */
   1207static int dlm_add_lock_to_array(struct dlm_lock *lock,
   1208				 struct dlm_migratable_lockres *mres, int queue)
   1209{
   1210	struct dlm_migratable_lock *ml;
   1211	int lock_num = mres->num_locks;
   1212
   1213	ml = &(mres->ml[lock_num]);
   1214	ml->cookie = lock->ml.cookie;
   1215	ml->type = lock->ml.type;
   1216	ml->convert_type = lock->ml.convert_type;
   1217	ml->highest_blocked = lock->ml.highest_blocked;
   1218	ml->list = queue;
   1219	if (lock->lksb) {
   1220		ml->flags = lock->lksb->flags;
   1221		dlm_prepare_lvb_for_migration(lock, mres, queue);
   1222	}
   1223	ml->node = lock->ml.node;
   1224	mres->num_locks++;
   1225	/* we reached the max, send this network message */
   1226	if (mres->num_locks == DLM_MAX_MIGRATABLE_LOCKS)
   1227		return 1;
   1228	return 0;
   1229}
   1230
   1231static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
   1232			       struct dlm_migratable_lockres *mres)
   1233{
   1234	struct dlm_lock dummy;
   1235	memset(&dummy, 0, sizeof(dummy));
   1236	dummy.ml.cookie = 0;
   1237	dummy.ml.type = LKM_IVMODE;
   1238	dummy.ml.convert_type = LKM_IVMODE;
   1239	dummy.ml.highest_blocked = LKM_IVMODE;
   1240	dummy.lksb = NULL;
   1241	dummy.ml.node = dlm->node_num;
   1242	dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
   1243}
   1244
   1245static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
   1246				    struct dlm_migratable_lock *ml,
   1247				    u8 *nodenum)
   1248{
   1249	if (unlikely(ml->cookie == 0 &&
   1250	    ml->type == LKM_IVMODE &&
   1251	    ml->convert_type == LKM_IVMODE &&
   1252	    ml->highest_blocked == LKM_IVMODE &&
   1253	    ml->list == DLM_BLOCKED_LIST)) {
   1254		*nodenum = ml->node;
   1255		return 1;
   1256	}
   1257	return 0;
   1258}
   1259
   1260int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
   1261			 struct dlm_migratable_lockres *mres,
   1262			 u8 send_to, u8 flags)
   1263{
   1264	struct list_head *queue;
   1265	int total_locks, i;
   1266	u64 mig_cookie = 0;
   1267	struct dlm_lock *lock;
   1268	int ret = 0;
   1269
   1270	BUG_ON(!(flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION)));
   1271
   1272	mlog(0, "sending to %u\n", send_to);
   1273
   1274	total_locks = dlm_num_locks_in_lockres(res);
   1275	if (total_locks > DLM_MAX_MIGRATABLE_LOCKS) {
   1276		/* rare, but possible */
   1277		mlog(0, "argh.  lockres has %d locks.  this will "
   1278			  "require more than one network packet to "
   1279			  "migrate\n", total_locks);
   1280		mig_cookie = dlm_get_next_mig_cookie();
   1281	}
   1282
   1283	dlm_init_migratable_lockres(mres, res->lockname.name,
   1284				    res->lockname.len, total_locks,
   1285				    mig_cookie, flags, res->owner);
   1286
   1287	total_locks = 0;
   1288	for (i=DLM_GRANTED_LIST; i<=DLM_BLOCKED_LIST; i++) {
   1289		queue = dlm_list_idx_to_ptr(res, i);
   1290		list_for_each_entry(lock, queue, list) {
   1291			/* add another lock. */
   1292			total_locks++;
   1293			if (!dlm_add_lock_to_array(lock, mres, i))
   1294				continue;
   1295
   1296			/* this filled the lock message,
   1297			 * we must send it immediately. */
   1298			ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
   1299						       res, total_locks);
   1300			if (ret < 0)
   1301				goto error;
   1302		}
   1303	}
   1304	if (total_locks == 0) {
   1305		/* send a dummy lock to indicate a mastery reference only */
   1306		mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
   1307		     dlm->name, res->lockname.len, res->lockname.name,
   1308		     send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
   1309		     "migration");
   1310		dlm_add_dummy_lock(dlm, mres);
   1311	}
   1312	/* flush any remaining locks */
   1313	ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
   1314	if (ret < 0)
   1315		goto error;
   1316	return ret;
   1317
   1318error:
   1319	mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",
   1320	     dlm->name, ret);
   1321	if (!dlm_is_host_down(ret))
   1322		BUG();
   1323	mlog(0, "%s: node %u went down while sending %s "
   1324	     "lockres %.*s\n", dlm->name, send_to,
   1325	     flags & DLM_MRES_RECOVERY ?  "recovery" : "migration",
   1326	     res->lockname.len, res->lockname.name);
   1327	return ret;
   1328}
   1329
   1330
   1331
   1332/*
   1333 * this message will contain no more than one page worth of
   1334 * recovery data, and it will work on only one lockres.
   1335 * there may be many locks in this page, and we may need to wait
   1336 * for additional packets to complete all the locks (rare, but
   1337 * possible).
   1338 */
   1339/*
   1340 * NOTE: the allocation error cases here are scary
   1341 * we really cannot afford to fail an alloc in recovery
   1342 * do we spin?  returning an error only delays the problem really
   1343 */
   1344
   1345int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
   1346			    void **ret_data)
   1347{
   1348	struct dlm_ctxt *dlm = data;
   1349	struct dlm_migratable_lockres *mres =
   1350		(struct dlm_migratable_lockres *)msg->buf;
   1351	int ret = 0;
   1352	u8 real_master;
   1353	u8 extra_refs = 0;
   1354	char *buf = NULL;
   1355	struct dlm_work_item *item = NULL;
   1356	struct dlm_lock_resource *res = NULL;
   1357	unsigned int hash;
   1358
   1359	if (!dlm_grab(dlm))
   1360		return -EINVAL;
   1361
   1362	if (!dlm_joined(dlm)) {
   1363		mlog(ML_ERROR, "Domain %s not joined! "
   1364			  "lockres %.*s, master %u\n",
   1365			  dlm->name, mres->lockname_len,
   1366			  mres->lockname, mres->master);
   1367		dlm_put(dlm);
   1368		return -EINVAL;
   1369	}
   1370
   1371	BUG_ON(!(mres->flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION)));
   1372
   1373	real_master = mres->master;
   1374	if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
   1375		/* cannot migrate a lockres with no master */
   1376		BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
   1377	}
   1378
   1379	mlog(0, "%s message received from node %u\n",
   1380		  (mres->flags & DLM_MRES_RECOVERY) ?
   1381		  "recovery" : "migration", mres->master);
   1382	if (mres->flags & DLM_MRES_ALL_DONE)
   1383		mlog(0, "all done flag.  all lockres data received!\n");
   1384
   1385	ret = -ENOMEM;
   1386	buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS);
   1387	item = kzalloc(sizeof(*item), GFP_NOFS);
   1388	if (!buf || !item)
   1389		goto leave;
   1390
   1391	/* lookup the lock to see if we have a secondary queue for this
   1392	 * already...  just add the locks in and this will have its owner
   1393	 * and RECOVERY flag changed when it completes. */
   1394	hash = dlm_lockid_hash(mres->lockname, mres->lockname_len);
   1395	spin_lock(&dlm->spinlock);
   1396	res = __dlm_lookup_lockres_full(dlm, mres->lockname, mres->lockname_len,
   1397			hash);
   1398	if (res) {
   1399	 	/* this will get a ref on res */
   1400		/* mark it as recovering/migrating and hash it */
   1401		spin_lock(&res->spinlock);
   1402		if (res->state & DLM_LOCK_RES_DROPPING_REF) {
   1403			mlog(0, "%s: node is attempting to migrate "
   1404				"lockres %.*s, but marked as dropping "
   1405				" ref!\n", dlm->name,
   1406				mres->lockname_len, mres->lockname);
   1407			ret = -EINVAL;
   1408			spin_unlock(&res->spinlock);
   1409			spin_unlock(&dlm->spinlock);
   1410			dlm_lockres_put(res);
   1411			goto leave;
   1412		}
   1413
   1414		if (mres->flags & DLM_MRES_RECOVERY) {
   1415			res->state |= DLM_LOCK_RES_RECOVERING;
   1416		} else {
   1417			if (res->state & DLM_LOCK_RES_MIGRATING) {
   1418				/* this is at least the second
   1419				 * lockres message */
   1420				mlog(0, "lock %.*s is already migrating\n",
   1421					  mres->lockname_len,
   1422					  mres->lockname);
   1423			} else if (res->state & DLM_LOCK_RES_RECOVERING) {
   1424				/* caller should BUG */
   1425				mlog(ML_ERROR, "node is attempting to migrate "
   1426				     "lock %.*s, but marked as recovering!\n",
   1427				     mres->lockname_len, mres->lockname);
   1428				ret = -EFAULT;
   1429				spin_unlock(&res->spinlock);
   1430				spin_unlock(&dlm->spinlock);
   1431				dlm_lockres_put(res);
   1432				goto leave;
   1433			}
   1434			res->state |= DLM_LOCK_RES_MIGRATING;
   1435		}
   1436		spin_unlock(&res->spinlock);
   1437		spin_unlock(&dlm->spinlock);
   1438	} else {
   1439		spin_unlock(&dlm->spinlock);
   1440		/* need to allocate, just like if it was
   1441		 * mastered here normally  */
   1442		res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len);
   1443		if (!res)
   1444			goto leave;
   1445
   1446		/* to match the ref that we would have gotten if
   1447		 * dlm_lookup_lockres had succeeded */
   1448		dlm_lockres_get(res);
   1449
   1450		/* mark it as recovering/migrating and hash it */
   1451		if (mres->flags & DLM_MRES_RECOVERY)
   1452			res->state |= DLM_LOCK_RES_RECOVERING;
   1453		else
   1454			res->state |= DLM_LOCK_RES_MIGRATING;
   1455
   1456		spin_lock(&dlm->spinlock);
   1457		__dlm_insert_lockres(dlm, res);
   1458		spin_unlock(&dlm->spinlock);
   1459
   1460		/* Add an extra ref for this lock-less lockres lest the
   1461		 * dlm_thread purges it before we get the chance to add
   1462		 * locks to it */
   1463		dlm_lockres_get(res);
   1464
   1465		/* There are three refs that need to be put.
   1466		 * 1. Taken above.
   1467		 * 2. kref_init in dlm_new_lockres()->dlm_init_lockres().
   1468		 * 3. dlm_lookup_lockres()
   1469		 * The first one is handled at the end of this function. The
   1470		 * other two are handled in the worker thread after locks have
   1471		 * been attached. Yes, we don't wait for purge time to match
   1472		 * kref_init. The lockres will still have atleast one ref
   1473		 * added because it is in the hash __dlm_insert_lockres() */
   1474		extra_refs++;
   1475
   1476		/* now that the new lockres is inserted,
   1477		 * make it usable by other processes */
   1478		spin_lock(&res->spinlock);
   1479		res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
   1480		spin_unlock(&res->spinlock);
   1481		wake_up(&res->wq);
   1482	}
   1483
   1484	/* at this point we have allocated everything we need,
   1485	 * and we have a hashed lockres with an extra ref and
   1486	 * the proper res->state flags. */
   1487	ret = 0;
   1488	spin_lock(&res->spinlock);
   1489	/* drop this either when master requery finds a different master
   1490	 * or when a lock is added by the recovery worker */
   1491	dlm_lockres_grab_inflight_ref(dlm, res);
   1492	if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
   1493		/* migration cannot have an unknown master */
   1494		BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
   1495		mlog(0, "recovery has passed me a lockres with an "
   1496			  "unknown owner.. will need to requery: "
   1497			  "%.*s\n", mres->lockname_len, mres->lockname);
   1498	} else {
   1499		/* take a reference now to pin the lockres, drop it
   1500		 * when locks are added in the worker */
   1501		dlm_change_lockres_owner(dlm, res, dlm->node_num);
   1502	}
   1503	spin_unlock(&res->spinlock);
   1504
   1505	/* queue up work for dlm_mig_lockres_worker */
   1506	dlm_grab(dlm);  /* get an extra ref for the work item */
   1507	memcpy(buf, msg->buf, be16_to_cpu(msg->data_len));  /* copy the whole message */
   1508	dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf);
   1509	item->u.ml.lockres = res; /* already have a ref */
   1510	item->u.ml.real_master = real_master;
   1511	item->u.ml.extra_ref = extra_refs;
   1512	spin_lock(&dlm->work_lock);
   1513	list_add_tail(&item->list, &dlm->work_list);
   1514	spin_unlock(&dlm->work_lock);
   1515	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
   1516
   1517leave:
   1518	/* One extra ref taken needs to be put here */
   1519	if (extra_refs)
   1520		dlm_lockres_put(res);
   1521
   1522	dlm_put(dlm);
   1523	if (ret < 0) {
   1524		kfree(buf);
   1525		kfree(item);
   1526		mlog_errno(ret);
   1527	}
   1528
   1529	return ret;
   1530}
   1531
   1532
   1533static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
   1534{
   1535	struct dlm_ctxt *dlm;
   1536	struct dlm_migratable_lockres *mres;
   1537	int ret = 0;
   1538	struct dlm_lock_resource *res;
   1539	u8 real_master;
   1540	u8 extra_ref;
   1541
   1542	dlm = item->dlm;
   1543	mres = (struct dlm_migratable_lockres *)data;
   1544
   1545	res = item->u.ml.lockres;
   1546	real_master = item->u.ml.real_master;
   1547	extra_ref = item->u.ml.extra_ref;
   1548
   1549	if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
   1550		/* this case is super-rare. only occurs if
   1551		 * node death happens during migration. */
   1552again:
   1553		ret = dlm_lockres_master_requery(dlm, res, &real_master);
   1554		if (ret < 0) {
   1555			mlog(0, "dlm_lockres_master_requery ret=%d\n",
   1556				  ret);
   1557			goto again;
   1558		}
   1559		if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
   1560			mlog(0, "lockres %.*s not claimed.  "
   1561				   "this node will take it.\n",
   1562				   res->lockname.len, res->lockname.name);
   1563		} else {
   1564			spin_lock(&res->spinlock);
   1565			dlm_lockres_drop_inflight_ref(dlm, res);
   1566			spin_unlock(&res->spinlock);
   1567			mlog(0, "master needs to respond to sender "
   1568				  "that node %u still owns %.*s\n",
   1569				  real_master, res->lockname.len,
   1570				  res->lockname.name);
   1571			/* cannot touch this lockres */
   1572			goto leave;
   1573		}
   1574	}
   1575
   1576	ret = dlm_process_recovery_data(dlm, res, mres);
   1577	if (ret < 0)
   1578		mlog(0, "dlm_process_recovery_data returned  %d\n", ret);
   1579	else
   1580		mlog(0, "dlm_process_recovery_data succeeded\n");
   1581
   1582	if ((mres->flags & (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) ==
   1583	                   (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) {
   1584		ret = dlm_finish_migration(dlm, res, mres->master);
   1585		if (ret < 0)
   1586			mlog_errno(ret);
   1587	}
   1588
   1589leave:
   1590	/* See comment in dlm_mig_lockres_handler() */
   1591	if (res) {
   1592		if (extra_ref)
   1593			dlm_lockres_put(res);
   1594		dlm_lockres_put(res);
   1595	}
   1596	kfree(data);
   1597}
   1598
   1599
   1600
   1601static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
   1602				      struct dlm_lock_resource *res,
   1603				      u8 *real_master)
   1604{
   1605	struct dlm_node_iter iter;
   1606	int nodenum;
   1607	int ret = 0;
   1608
   1609	*real_master = DLM_LOCK_RES_OWNER_UNKNOWN;
   1610
   1611	/* we only reach here if one of the two nodes in a
   1612	 * migration died while the migration was in progress.
   1613	 * at this point we need to requery the master.  we
   1614	 * know that the new_master got as far as creating
   1615	 * an mle on at least one node, but we do not know
   1616	 * if any nodes had actually cleared the mle and set
   1617	 * the master to the new_master.  the old master
   1618	 * is supposed to set the owner to UNKNOWN in the
   1619	 * event of a new_master death, so the only possible
   1620	 * responses that we can get from nodes here are
   1621	 * that the master is new_master, or that the master
   1622	 * is UNKNOWN.
   1623	 * if all nodes come back with UNKNOWN then we know
   1624	 * the lock needs remastering here.
   1625	 * if any node comes back with a valid master, check
   1626	 * to see if that master is the one that we are
   1627	 * recovering.  if so, then the new_master died and
   1628	 * we need to remaster this lock.  if not, then the
   1629	 * new_master survived and that node will respond to
   1630	 * other nodes about the owner.
   1631	 * if there is an owner, this node needs to dump this
   1632	 * lockres and alert the sender that this lockres
   1633	 * was rejected. */
   1634	spin_lock(&dlm->spinlock);
   1635	dlm_node_iter_init(dlm->domain_map, &iter);
   1636	spin_unlock(&dlm->spinlock);
   1637
   1638	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
   1639		/* do not send to self */
   1640		if (nodenum == dlm->node_num)
   1641			continue;
   1642		ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
   1643		if (ret < 0) {
   1644			mlog_errno(ret);
   1645			if (!dlm_is_host_down(ret))
   1646				BUG();
   1647			/* host is down, so answer for that node would be
   1648			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
   1649		}
   1650		if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
   1651			mlog(0, "lock master is %u\n", *real_master);
   1652			break;
   1653		}
   1654	}
   1655	return ret;
   1656}
   1657
   1658
   1659int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
   1660			  u8 nodenum, u8 *real_master)
   1661{
   1662	int ret;
   1663	struct dlm_master_requery req;
   1664	int status = DLM_LOCK_RES_OWNER_UNKNOWN;
   1665
   1666	memset(&req, 0, sizeof(req));
   1667	req.node_idx = dlm->node_num;
   1668	req.namelen = res->lockname.len;
   1669	memcpy(req.name, res->lockname.name, res->lockname.len);
   1670
   1671resend:
   1672	ret = o2net_send_message(DLM_MASTER_REQUERY_MSG, dlm->key,
   1673				 &req, sizeof(req), nodenum, &status);
   1674	if (ret < 0)
   1675		mlog(ML_ERROR, "Error %d when sending message %u (key "
   1676		     "0x%x) to node %u\n", ret, DLM_MASTER_REQUERY_MSG,
   1677		     dlm->key, nodenum);
   1678	else if (status == -ENOMEM) {
   1679		mlog_errno(status);
   1680		msleep(50);
   1681		goto resend;
   1682	} else {
   1683		BUG_ON(status < 0);
   1684		BUG_ON(status > DLM_LOCK_RES_OWNER_UNKNOWN);
   1685		*real_master = (u8) (status & 0xff);
   1686		mlog(0, "node %u responded to master requery with %u\n",
   1687			  nodenum, *real_master);
   1688		ret = 0;
   1689	}
   1690	return ret;
   1691}
   1692
   1693
   1694/* this function cannot error, so unless the sending
   1695 * or receiving of the message failed, the owner can
   1696 * be trusted */
   1697int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
   1698			       void **ret_data)
   1699{
   1700	struct dlm_ctxt *dlm = data;
   1701	struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
   1702	struct dlm_lock_resource *res = NULL;
   1703	unsigned int hash;
   1704	int master = DLM_LOCK_RES_OWNER_UNKNOWN;
   1705	u32 flags = DLM_ASSERT_MASTER_REQUERY;
   1706	int dispatched = 0;
   1707
   1708	if (!dlm_grab(dlm)) {
   1709		/* since the domain has gone away on this
   1710		 * node, the proper response is UNKNOWN */
   1711		return master;
   1712	}
   1713
   1714	hash = dlm_lockid_hash(req->name, req->namelen);
   1715
   1716	spin_lock(&dlm->spinlock);
   1717	res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash);
   1718	if (res) {
   1719		spin_lock(&res->spinlock);
   1720		master = res->owner;
   1721		if (master == dlm->node_num) {
   1722			int ret = dlm_dispatch_assert_master(dlm, res,
   1723							     0, 0, flags);
   1724			if (ret < 0) {
   1725				mlog_errno(ret);
   1726				spin_unlock(&res->spinlock);
   1727				dlm_lockres_put(res);
   1728				spin_unlock(&dlm->spinlock);
   1729				dlm_put(dlm);
   1730				/* sender will take care of this and retry */
   1731				return ret;
   1732			} else {
   1733				dispatched = 1;
   1734				__dlm_lockres_grab_inflight_worker(dlm, res);
   1735				spin_unlock(&res->spinlock);
   1736			}
   1737		} else {
   1738			/* put.. incase we are not the master */
   1739			spin_unlock(&res->spinlock);
   1740			dlm_lockres_put(res);
   1741		}
   1742	}
   1743	spin_unlock(&dlm->spinlock);
   1744
   1745	if (!dispatched)
   1746		dlm_put(dlm);
   1747	return master;
   1748}
   1749
   1750static inline struct list_head *
   1751dlm_list_num_to_pointer(struct dlm_lock_resource *res, int list_num)
   1752{
   1753	struct list_head *ret;
   1754	BUG_ON(list_num < 0);
   1755	BUG_ON(list_num > 2);
   1756	ret = &(res->granted);
   1757	ret += list_num;
   1758	return ret;
   1759}
   1760/* TODO: do ast flush business
   1761 * TODO: do MIGRATING and RECOVERING spinning
   1762 */
   1763
   1764/*
   1765* NOTE about in-flight requests during migration:
   1766*
   1767* Before attempting the migrate, the master has marked the lockres as
   1768* MIGRATING and then flushed all of its pending ASTS.  So any in-flight
   1769* requests either got queued before the MIGRATING flag got set, in which
   1770* case the lock data will reflect the change and a return message is on
   1771* the way, or the request failed to get in before MIGRATING got set.  In
   1772* this case, the caller will be told to spin and wait for the MIGRATING
   1773* flag to be dropped, then recheck the master.
   1774* This holds true for the convert, cancel and unlock cases, and since lvb
   1775* updates are tied to these same messages, it applies to lvb updates as
   1776* well.  For the lock case, there is no way a lock can be on the master
   1777* queue and not be on the secondary queue since the lock is always added
   1778* locally first.  This means that the new target node will never be sent
   1779* a lock that he doesn't already have on the list.
   1780* In total, this means that the local lock is correct and should not be
   1781* updated to match the one sent by the master.  Any messages sent back
   1782* from the master before the MIGRATING flag will bring the lock properly
   1783* up-to-date, and the change will be ordered properly for the waiter.
   1784* We will *not* attempt to modify the lock underneath the waiter.
   1785*/
   1786
   1787static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
   1788				     struct dlm_lock_resource *res,
   1789				     struct dlm_migratable_lockres *mres)
   1790{
   1791	struct dlm_migratable_lock *ml;
   1792	struct list_head *queue, *iter;
   1793	struct list_head *tmpq = NULL;
   1794	struct dlm_lock *newlock = NULL;
   1795	struct dlm_lockstatus *lksb = NULL;
   1796	int ret = 0;
   1797	int i, j, bad;
   1798	struct dlm_lock *lock;
   1799	u8 from = O2NM_MAX_NODES;
   1800	__be64 c;
   1801
   1802	mlog(0, "running %d locks for this lockres\n", mres->num_locks);
   1803	for (i=0; i<mres->num_locks; i++) {
   1804		ml = &(mres->ml[i]);
   1805
   1806		if (dlm_is_dummy_lock(dlm, ml, &from)) {
   1807			/* placeholder, just need to set the refmap bit */
   1808			BUG_ON(mres->num_locks != 1);
   1809			mlog(0, "%s:%.*s: dummy lock for %u\n",
   1810			     dlm->name, mres->lockname_len, mres->lockname,
   1811			     from);
   1812			spin_lock(&res->spinlock);
   1813			dlm_lockres_set_refmap_bit(dlm, res, from);
   1814			spin_unlock(&res->spinlock);
   1815			break;
   1816		}
   1817		BUG_ON(ml->highest_blocked != LKM_IVMODE);
   1818		newlock = NULL;
   1819		lksb = NULL;
   1820
   1821		queue = dlm_list_num_to_pointer(res, ml->list);
   1822		tmpq = NULL;
   1823
   1824		/* if the lock is for the local node it needs to
   1825		 * be moved to the proper location within the queue.
   1826		 * do not allocate a new lock structure. */
   1827		if (ml->node == dlm->node_num) {
   1828			/* MIGRATION ONLY! */
   1829			BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));
   1830
   1831			lock = NULL;
   1832			spin_lock(&res->spinlock);
   1833			for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) {
   1834				tmpq = dlm_list_idx_to_ptr(res, j);
   1835				list_for_each(iter, tmpq) {
   1836					lock = list_entry(iter,
   1837						  struct dlm_lock, list);
   1838					if (lock->ml.cookie == ml->cookie)
   1839						break;
   1840					lock = NULL;
   1841				}
   1842				if (lock)
   1843					break;
   1844			}
   1845
   1846			/* lock is always created locally first, and
   1847			 * destroyed locally last.  it must be on the list */
   1848			if (!lock) {
   1849				c = ml->cookie;
   1850				mlog(ML_ERROR, "Could not find local lock "
   1851					       "with cookie %u:%llu, node %u, "
   1852					       "list %u, flags 0x%x, type %d, "
   1853					       "conv %d, highest blocked %d\n",
   1854				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
   1855				     dlm_get_lock_cookie_seq(be64_to_cpu(c)),
   1856				     ml->node, ml->list, ml->flags, ml->type,
   1857				     ml->convert_type, ml->highest_blocked);
   1858				__dlm_print_one_lock_resource(res);
   1859				BUG();
   1860			}
   1861
   1862			if (lock->ml.node != ml->node) {
   1863				c = lock->ml.cookie;
   1864				mlog(ML_ERROR, "Mismatched node# in lock "
   1865				     "cookie %u:%llu, name %.*s, node %u\n",
   1866				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
   1867				     dlm_get_lock_cookie_seq(be64_to_cpu(c)),
   1868				     res->lockname.len, res->lockname.name,
   1869				     lock->ml.node);
   1870				c = ml->cookie;
   1871				mlog(ML_ERROR, "Migrate lock cookie %u:%llu, "
   1872				     "node %u, list %u, flags 0x%x, type %d, "
   1873				     "conv %d, highest blocked %d\n",
   1874				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
   1875				     dlm_get_lock_cookie_seq(be64_to_cpu(c)),
   1876				     ml->node, ml->list, ml->flags, ml->type,
   1877				     ml->convert_type, ml->highest_blocked);
   1878				__dlm_print_one_lock_resource(res);
   1879				BUG();
   1880			}
   1881
   1882			if (tmpq != queue) {
   1883				c = ml->cookie;
   1884				mlog(0, "Lock cookie %u:%llu was on list %u "
   1885				     "instead of list %u for %.*s\n",
   1886				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
   1887				     dlm_get_lock_cookie_seq(be64_to_cpu(c)),
   1888				     j, ml->list, res->lockname.len,
   1889				     res->lockname.name);
   1890				__dlm_print_one_lock_resource(res);
   1891				spin_unlock(&res->spinlock);
   1892				continue;
   1893			}
   1894
   1895			/* see NOTE above about why we do not update
   1896			 * to match the master here */
   1897
   1898			/* move the lock to its proper place */
   1899			/* do not alter lock refcount.  switching lists. */
   1900			list_move_tail(&lock->list, queue);
   1901			spin_unlock(&res->spinlock);
   1902
   1903			mlog(0, "just reordered a local lock!\n");
   1904			continue;
   1905		}
   1906
   1907		/* lock is for another node. */
   1908		newlock = dlm_new_lock(ml->type, ml->node,
   1909				       be64_to_cpu(ml->cookie), NULL);
   1910		if (!newlock) {
   1911			ret = -ENOMEM;
   1912			goto leave;
   1913		}
   1914		lksb = newlock->lksb;
   1915		dlm_lock_attach_lockres(newlock, res);
   1916
   1917		if (ml->convert_type != LKM_IVMODE) {
   1918			BUG_ON(queue != &res->converting);
   1919			newlock->ml.convert_type = ml->convert_type;
   1920		}
   1921		lksb->flags |= (ml->flags &
   1922				(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
   1923
   1924		if (ml->type == LKM_NLMODE)
   1925			goto skip_lvb;
   1926
   1927		/*
   1928		 * If the lock is in the blocked list it can't have a valid lvb,
   1929		 * so skip it
   1930		 */
   1931		if (ml->list == DLM_BLOCKED_LIST)
   1932			goto skip_lvb;
   1933
   1934		if (!dlm_lvb_is_empty(mres->lvb)) {
   1935			if (lksb->flags & DLM_LKSB_PUT_LVB) {
   1936				/* other node was trying to update
   1937				 * lvb when node died.  recreate the
   1938				 * lksb with the updated lvb. */
   1939				memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
   1940				/* the lock resource lvb update must happen
   1941				 * NOW, before the spinlock is dropped.
   1942				 * we no longer wait for the AST to update
   1943				 * the lvb. */
   1944				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
   1945			} else {
   1946				/* otherwise, the node is sending its
   1947				 * most recent valid lvb info */
   1948				BUG_ON(ml->type != LKM_EXMODE &&
   1949				       ml->type != LKM_PRMODE);
   1950				if (!dlm_lvb_is_empty(res->lvb) &&
   1951 				    (ml->type == LKM_EXMODE ||
   1952 				     memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
   1953 					int i;
   1954 					mlog(ML_ERROR, "%s:%.*s: received bad "
   1955 					     "lvb! type=%d\n", dlm->name,
   1956 					     res->lockname.len,
   1957 					     res->lockname.name, ml->type);
   1958 					printk("lockres lvb=[");
   1959 					for (i=0; i<DLM_LVB_LEN; i++)
   1960 						printk("%02x", res->lvb[i]);
   1961 					printk("]\nmigrated lvb=[");
   1962 					for (i=0; i<DLM_LVB_LEN; i++)
   1963 						printk("%02x", mres->lvb[i]);
   1964 					printk("]\n");
   1965 					dlm_print_one_lock_resource(res);
   1966 					BUG();
   1967				}
   1968				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
   1969			}
   1970		}
   1971skip_lvb:
   1972
   1973		/* NOTE:
   1974		 * wrt lock queue ordering and recovery:
   1975		 *    1. order of locks on granted queue is
   1976		 *       meaningless.
   1977		 *    2. order of locks on converting queue is
   1978		 *       LOST with the node death.  sorry charlie.
   1979		 *    3. order of locks on the blocked queue is
   1980		 *       also LOST.
   1981		 * order of locks does not affect integrity, it
   1982		 * just means that a lock request may get pushed
   1983		 * back in line as a result of the node death.
   1984		 * also note that for a given node the lock order
   1985		 * for its secondary queue locks is preserved
   1986		 * relative to each other, but clearly *not*
   1987		 * preserved relative to locks from other nodes.
   1988		 */
   1989		bad = 0;
   1990		spin_lock(&res->spinlock);
   1991		list_for_each_entry(lock, queue, list) {
   1992			if (lock->ml.cookie == ml->cookie) {
   1993				c = lock->ml.cookie;
   1994				mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
   1995				     "exists on this lockres!\n", dlm->name,
   1996				     res->lockname.len, res->lockname.name,
   1997				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
   1998				     dlm_get_lock_cookie_seq(be64_to_cpu(c)));
   1999
   2000				mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
   2001				     "node=%u, cookie=%u:%llu, queue=%d\n",
   2002	      			     ml->type, ml->convert_type, ml->node,
   2003				     dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)),
   2004				     dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)),
   2005				     ml->list);
   2006
   2007				__dlm_print_one_lock_resource(res);
   2008				bad = 1;
   2009				break;
   2010			}
   2011		}
   2012		if (!bad) {
   2013			dlm_lock_get(newlock);
   2014			if (mres->flags & DLM_MRES_RECOVERY &&
   2015					ml->list == DLM_CONVERTING_LIST &&
   2016					newlock->ml.type >
   2017					newlock->ml.convert_type) {
   2018				/* newlock is doing downconvert, add it to the
   2019				 * head of converting list */
   2020				list_add(&newlock->list, queue);
   2021			} else
   2022				list_add_tail(&newlock->list, queue);
   2023			mlog(0, "%s:%.*s: added lock for node %u, "
   2024			     "setting refmap bit\n", dlm->name,
   2025			     res->lockname.len, res->lockname.name, ml->node);
   2026			dlm_lockres_set_refmap_bit(dlm, res, ml->node);
   2027		}
   2028		spin_unlock(&res->spinlock);
   2029	}
   2030	mlog(0, "done running all the locks\n");
   2031
   2032leave:
   2033	/* balance the ref taken when the work was queued */
   2034	spin_lock(&res->spinlock);
   2035	dlm_lockres_drop_inflight_ref(dlm, res);
   2036	spin_unlock(&res->spinlock);
   2037
   2038	if (ret < 0)
   2039		mlog_errno(ret);
   2040
   2041	return ret;
   2042}
   2043
   2044void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
   2045				       struct dlm_lock_resource *res)
   2046{
   2047	int i;
   2048	struct list_head *queue;
   2049	struct dlm_lock *lock, *next;
   2050
   2051	assert_spin_locked(&dlm->spinlock);
   2052	assert_spin_locked(&res->spinlock);
   2053	res->state |= DLM_LOCK_RES_RECOVERING;
   2054	if (!list_empty(&res->recovering)) {
   2055		mlog(0,
   2056		     "Recovering res %s:%.*s, is already on recovery list!\n",
   2057		     dlm->name, res->lockname.len, res->lockname.name);
   2058		list_del_init(&res->recovering);
   2059		dlm_lockres_put(res);
   2060	}
   2061	/* We need to hold a reference while on the recovery list */
   2062	dlm_lockres_get(res);
   2063	list_add_tail(&res->recovering, &dlm->reco.resources);
   2064
   2065	/* find any pending locks and put them back on proper list */
   2066	for (i=DLM_BLOCKED_LIST; i>=DLM_GRANTED_LIST; i--) {
   2067		queue = dlm_list_idx_to_ptr(res, i);
   2068		list_for_each_entry_safe(lock, next, queue, list) {
   2069			dlm_lock_get(lock);
   2070			if (lock->convert_pending) {
   2071				/* move converting lock back to granted */
   2072				mlog(0, "node died with convert pending "
   2073				     "on %.*s. move back to granted list.\n",
   2074				     res->lockname.len, res->lockname.name);
   2075				dlm_revert_pending_convert(res, lock);
   2076				lock->convert_pending = 0;
   2077			} else if (lock->lock_pending) {
   2078				/* remove pending lock requests completely */
   2079				BUG_ON(i != DLM_BLOCKED_LIST);
   2080				mlog(0, "node died with lock pending "
   2081				     "on %.*s. remove from blocked list and skip.\n",
   2082				     res->lockname.len, res->lockname.name);
   2083				/* lock will be floating until ref in
   2084				 * dlmlock_remote is freed after the network
   2085				 * call returns.  ok for it to not be on any
   2086				 * list since no ast can be called
   2087				 * (the master is dead). */
   2088				dlm_revert_pending_lock(res, lock);
   2089				lock->lock_pending = 0;
   2090			} else if (lock->unlock_pending) {
   2091				/* if an unlock was in progress, treat as
   2092				 * if this had completed successfully
   2093				 * before sending this lock state to the
   2094				 * new master.  note that the dlm_unlock
   2095				 * call is still responsible for calling
   2096				 * the unlockast.  that will happen after
   2097				 * the network call times out.  for now,
   2098				 * just move lists to prepare the new
   2099				 * recovery master.  */
   2100				BUG_ON(i != DLM_GRANTED_LIST);
   2101				mlog(0, "node died with unlock pending "
   2102				     "on %.*s. remove from blocked list and skip.\n",
   2103				     res->lockname.len, res->lockname.name);
   2104				dlm_commit_pending_unlock(res, lock);
   2105				lock->unlock_pending = 0;
   2106			} else if (lock->cancel_pending) {
   2107				/* if a cancel was in progress, treat as
   2108				 * if this had completed successfully
   2109				 * before sending this lock state to the
   2110				 * new master */
   2111				BUG_ON(i != DLM_CONVERTING_LIST);
   2112				mlog(0, "node died with cancel pending "
   2113				     "on %.*s. move back to granted list.\n",
   2114				     res->lockname.len, res->lockname.name);
   2115				dlm_commit_pending_cancel(res, lock);
   2116				lock->cancel_pending = 0;
   2117			}
   2118			dlm_lock_put(lock);
   2119		}
   2120	}
   2121}
   2122
   2123
   2124
   2125/* removes all recovered locks from the recovery list.
   2126 * sets the res->owner to the new master.
   2127 * unsets the RECOVERY flag and wakes waiters. */
   2128static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
   2129					      u8 dead_node, u8 new_master)
   2130{
   2131	int i;
   2132	struct hlist_head *bucket;
   2133	struct dlm_lock_resource *res, *next;
   2134
   2135	assert_spin_locked(&dlm->spinlock);
   2136
   2137	list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
   2138		if (res->owner == dead_node) {
   2139			mlog(0, "%s: res %.*s, Changing owner from %u to %u\n",
   2140			     dlm->name, res->lockname.len, res->lockname.name,
   2141			     res->owner, new_master);
   2142			list_del_init(&res->recovering);
   2143			spin_lock(&res->spinlock);
   2144			/* new_master has our reference from
   2145			 * the lock state sent during recovery */
   2146			dlm_change_lockres_owner(dlm, res, new_master);
   2147			res->state &= ~DLM_LOCK_RES_RECOVERING;
   2148			if (__dlm_lockres_has_locks(res))
   2149				__dlm_dirty_lockres(dlm, res);
   2150			spin_unlock(&res->spinlock);
   2151			wake_up(&res->wq);
   2152			dlm_lockres_put(res);
   2153		}
   2154	}
   2155
   2156	/* this will become unnecessary eventually, but
   2157	 * for now we need to run the whole hash, clear
   2158	 * the RECOVERING state and set the owner
   2159	 * if necessary */
   2160	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
   2161		bucket = dlm_lockres_hash(dlm, i);
   2162		hlist_for_each_entry(res, bucket, hash_node) {
   2163			if (res->state & DLM_LOCK_RES_RECOVERY_WAITING) {
   2164				spin_lock(&res->spinlock);
   2165				res->state &= ~DLM_LOCK_RES_RECOVERY_WAITING;
   2166				spin_unlock(&res->spinlock);
   2167				wake_up(&res->wq);
   2168			}
   2169
   2170			if (!(res->state & DLM_LOCK_RES_RECOVERING))
   2171				continue;
   2172
   2173			if (res->owner != dead_node &&
   2174			    res->owner != dlm->node_num)
   2175				continue;
   2176
   2177			if (!list_empty(&res->recovering)) {
   2178				list_del_init(&res->recovering);
   2179				dlm_lockres_put(res);
   2180			}
   2181
   2182			/* new_master has our reference from
   2183			 * the lock state sent during recovery */
   2184			mlog(0, "%s: res %.*s, Changing owner from %u to %u\n",
   2185			     dlm->name, res->lockname.len, res->lockname.name,
   2186			     res->owner, new_master);
   2187			spin_lock(&res->spinlock);
   2188			dlm_change_lockres_owner(dlm, res, new_master);
   2189			res->state &= ~DLM_LOCK_RES_RECOVERING;
   2190			if (__dlm_lockres_has_locks(res))
   2191				__dlm_dirty_lockres(dlm, res);
   2192			spin_unlock(&res->spinlock);
   2193			wake_up(&res->wq);
   2194		}
   2195	}
   2196}
   2197
   2198static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local)
   2199{
   2200	if (local) {
   2201		if (lock->ml.type != LKM_EXMODE &&
   2202		    lock->ml.type != LKM_PRMODE)
   2203			return 1;
   2204	} else if (lock->ml.type == LKM_EXMODE)
   2205		return 1;
   2206	return 0;
   2207}
   2208
   2209static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
   2210			       struct dlm_lock_resource *res, u8 dead_node)
   2211{
   2212	struct list_head *queue;
   2213	struct dlm_lock *lock;
   2214	int blank_lvb = 0, local = 0;
   2215	int i;
   2216	u8 search_node;
   2217
   2218	assert_spin_locked(&dlm->spinlock);
   2219	assert_spin_locked(&res->spinlock);
   2220
   2221	if (res->owner == dlm->node_num)
   2222		/* if this node owned the lockres, and if the dead node
   2223		 * had an EX when he died, blank out the lvb */
   2224		search_node = dead_node;
   2225	else {
   2226		/* if this is a secondary lockres, and we had no EX or PR
   2227		 * locks granted, we can no longer trust the lvb */
   2228		search_node = dlm->node_num;
   2229		local = 1;  /* check local state for valid lvb */
   2230	}
   2231
   2232	for (i=DLM_GRANTED_LIST; i<=DLM_CONVERTING_LIST; i++) {
   2233		queue = dlm_list_idx_to_ptr(res, i);
   2234		list_for_each_entry(lock, queue, list) {
   2235			if (lock->ml.node == search_node) {
   2236				if (dlm_lvb_needs_invalidation(lock, local)) {
   2237					/* zero the lksb lvb and lockres lvb */
   2238					blank_lvb = 1;
   2239					memset(lock->lksb->lvb, 0, DLM_LVB_LEN);
   2240				}
   2241			}
   2242		}
   2243	}
   2244
   2245	if (blank_lvb) {
   2246		mlog(0, "clearing %.*s lvb, dead node %u had EX\n",
   2247		     res->lockname.len, res->lockname.name, dead_node);
   2248		memset(res->lvb, 0, DLM_LVB_LEN);
   2249	}
   2250}
   2251
   2252static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
   2253				struct dlm_lock_resource *res, u8 dead_node)
   2254{
   2255	struct dlm_lock *lock, *next;
   2256	unsigned int freed = 0;
   2257
   2258	/* this node is the lockres master:
   2259	 * 1) remove any stale locks for the dead node
   2260	 * 2) if the dead node had an EX when he died, blank out the lvb
   2261	 */
   2262	assert_spin_locked(&dlm->spinlock);
   2263	assert_spin_locked(&res->spinlock);
   2264
   2265	/* We do two dlm_lock_put(). One for removing from list and the other is
   2266	 * to force the DLM_UNLOCK_FREE_LOCK action so as to free the locks */
   2267
   2268	/* TODO: check pending_asts, pending_basts here */
   2269	list_for_each_entry_safe(lock, next, &res->granted, list) {
   2270		if (lock->ml.node == dead_node) {
   2271			list_del_init(&lock->list);
   2272			dlm_lock_put(lock);
   2273			/* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
   2274			dlm_lock_put(lock);
   2275			freed++;
   2276		}
   2277	}
   2278	list_for_each_entry_safe(lock, next, &res->converting, list) {
   2279		if (lock->ml.node == dead_node) {
   2280			list_del_init(&lock->list);
   2281			dlm_lock_put(lock);
   2282			/* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
   2283			dlm_lock_put(lock);
   2284			freed++;
   2285		}
   2286	}
   2287	list_for_each_entry_safe(lock, next, &res->blocked, list) {
   2288		if (lock->ml.node == dead_node) {
   2289			list_del_init(&lock->list);
   2290			dlm_lock_put(lock);
   2291			/* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
   2292			dlm_lock_put(lock);
   2293			freed++;
   2294		}
   2295	}
   2296
   2297	if (freed) {
   2298		mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
   2299		     "dropping ref from lockres\n", dlm->name,
   2300		     res->lockname.len, res->lockname.name, freed, dead_node);
   2301		if(!test_bit(dead_node, res->refmap)) {
   2302			mlog(ML_ERROR, "%s:%.*s: freed %u locks for dead node %u, "
   2303			     "but ref was not set\n", dlm->name,
   2304			     res->lockname.len, res->lockname.name, freed, dead_node);
   2305			__dlm_print_one_lock_resource(res);
   2306		}
   2307		res->state |= DLM_LOCK_RES_RECOVERY_WAITING;
   2308		dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
   2309	} else if (test_bit(dead_node, res->refmap)) {
   2310		mlog(0, "%s:%.*s: dead node %u had a ref, but had "
   2311		     "no locks and had not purged before dying\n", dlm->name,
   2312		     res->lockname.len, res->lockname.name, dead_node);
   2313		dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
   2314	}
   2315
   2316	/* do not kick thread yet */
   2317	__dlm_dirty_lockres(dlm, res);
   2318}
   2319
/*
 * dlm_do_local_recovery_cleanup - scrub local state after a node death
 * @dlm:       domain being cleaned up
 * @dead_node: node number that died
 *
 * Purges stale master list entries, then walks every lockres in the hash
 * and, under res->spinlock, prunes $RECOVERY locks held by the dead node,
 * revalidates lvbs, queues lockres owned by the dead node for recovery (or
 * purges them if this node was already dropping its ref), and frees the dead
 * node's locks on lockres mastered here.
 *
 * Several callees assert that dlm->spinlock is held, so the caller must
 * hold it.
 */
static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
{
	struct dlm_lock_resource *res;
	int i;
	struct hlist_head *bucket;
	struct hlist_node *tmp;
	struct dlm_lock *lock;


	/* purge any stale mles */
	dlm_clean_master_list(dlm, dead_node);

	/*
	 * now clean up all lock resources.  there are two rules:
	 *
	 * 1) if the dead node was the master, move the lockres
	 *    to the recovering list.  set the RECOVERING flag.
	 *    this lockres needs to be cleaned up before it can
	 *    be used further.
	 *
	 * 2) if this node was the master, remove all locks from
	 *    each of the lockres queues that were owned by the
	 *    dead node.  once recovery finishes, the dlm thread
	 *    can be kicked again to see if any ASTs or BASTs
	 *    need to be fired as a result.
	 */
	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
		bucket = dlm_lockres_hash(dlm, i);
		/* _safe variant: the purge paths below may unhash res */
		hlist_for_each_entry_safe(res, tmp, bucket, hash_node) {
			/* always prune any $RECOVERY entries for dead nodes,
			 * otherwise hangs can occur during later recovery */
			if (dlm_is_recovery_lock(res->lockname.name,
						 res->lockname.len)) {
				spin_lock(&res->spinlock);
				list_for_each_entry(lock, &res->granted, list) {
					if (lock->ml.node == dead_node) {
						mlog(0, "AHA! there was "
						     "a $RECOVERY lock for dead "
						     "node %u (%s)!\n",
						     dead_node, dlm->name);
						list_del_init(&lock->list);
						dlm_lock_put(lock);
						/* Can't schedule
						 * DLM_UNLOCK_FREE_LOCK
						 * - do manually */
						dlm_lock_put(lock);
						/* stop after the first match; the
						 * iterator is not safe after the
						 * entry has been unlinked */
						break;
					}
				}

				if ((res->owner == dead_node) &&
							(res->state & DLM_LOCK_RES_DROPPING_REF)) {
					/* hold a ref across the purge so res
					 * stays valid for unlock and wake_up */
					dlm_lockres_get(res);
					__dlm_do_purge_lockres(dlm, res);
					spin_unlock(&res->spinlock);
					wake_up(&res->wq);
					dlm_lockres_put(res);
					continue;
				} else if (res->owner == dlm->node_num)
					dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
				spin_unlock(&res->spinlock);
				continue;
			}
			spin_lock(&res->spinlock);
			/* zero the lvb if necessary */
			dlm_revalidate_lvb(dlm, res, dead_node);
			if (res->owner == dead_node) {
				if (res->state & DLM_LOCK_RES_DROPPING_REF) {
					mlog(0, "%s:%.*s: owned by "
						"dead node %u, this node was "
						"dropping its ref when master died. "
						"continue, purging the lockres.\n",
						dlm->name, res->lockname.len,
						res->lockname.name, dead_node);
					/* same ref-held purge sequence as for
					 * the $RECOVERY lockres above */
					dlm_lockres_get(res);
					__dlm_do_purge_lockres(dlm, res);
					spin_unlock(&res->spinlock);
					wake_up(&res->wq);
					dlm_lockres_put(res);
					continue;
				}
				dlm_move_lockres_to_recovery_list(dlm, res);
			} else if (res->owner == dlm->node_num) {
				dlm_free_dead_locks(dlm, res, dead_node);
				__dlm_lockres_calc_usage(dlm, res);
			} else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
				/* no master known yet: only the dead node's
				 * refmap bit needs clearing */
				if (test_bit(dead_node, res->refmap)) {
					mlog(0, "%s:%.*s: dead node %u had a ref, but had "
						"no locks and had not purged before dying\n",
						dlm->name, res->lockname.len,
						res->lockname.name, dead_node);
					dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
				}
			}
			spin_unlock(&res->spinlock);
		}
	}

}
   2419
/*
 * __dlm_hb_node_down - record that node @idx has died
 * @dlm: domain to update
 * @idx: node number that went down
 *
 * Caller must hold dlm->spinlock.
 *
 * Resets recovery state if the dead node was a recovery master that had
 * reached the finalize stage, clears join state if it was the joining node,
 * and returns early if the node is already marked dead or is not in the
 * domain map.  Otherwise the node is removed from the live, domain and
 * exit-domain maps, local cleanup is run (unless the node is already in the
 * recovery map), heartbeat listeners and migration waiters are notified,
 * and the node is added to the recovery map.
 */
static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
{
	assert_spin_locked(&dlm->spinlock);

	if (dlm->reco.new_master == idx) {
		mlog(0, "%s: recovery master %d just died\n",
		     dlm->name, idx);
		if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
			/* finalize1 was reached, so it is safe to clear
			 * the new_master and dead_node.  that recovery
			 * is complete. */
			mlog(0, "%s: dead master %d had reached "
			     "finalize1 state, clearing\n", dlm->name, idx);
			dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
			__dlm_reset_recovery(dlm);
		}
	}

	/* Clean up join state on node death. */
	if (dlm->joining_node == idx) {
		mlog(0, "Clearing join state for node %u\n", idx);
		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
	}

	/* check to see if the node is already considered dead */
	if (!test_bit(idx, dlm->live_nodes_map)) {
		mlog(0, "for domain %s, node %d is already dead. "
		     "another node likely did recovery already.\n",
		     dlm->name, idx);
		return;
	}

	/* check to see if we do not care about this node */
	if (!test_bit(idx, dlm->domain_map)) {
		/* This also catches the case that we get a node down
		 * but haven't joined the domain yet. */
		mlog(0, "node %u already removed from domain!\n", idx);
		return;
	}

	clear_bit(idx, dlm->live_nodes_map);

	/* make sure local cleanup occurs before the heartbeat events */
	if (!test_bit(idx, dlm->recovery_map))
		dlm_do_local_recovery_cleanup(dlm, idx);

	/* notify anything attached to the heartbeat events */
	dlm_hb_event_notify_attached(dlm, idx, 0);

	mlog(0, "node %u being removed from domain map!\n", idx);
	clear_bit(idx, dlm->domain_map);
	clear_bit(idx, dlm->exit_domain_map);
	/* wake up migration waiters if a node goes down.
	 * perhaps later we can genericize this for other waiters. */
	wake_up(&dlm->migration_wq);

	/* set last, after the cleanup test above has run for this death */
	set_bit(idx, dlm->recovery_map);
}
   2478
   2479void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data)
   2480{
   2481	struct dlm_ctxt *dlm = data;
   2482
   2483	if (!dlm_grab(dlm))
   2484		return;
   2485
   2486	/*
   2487	 * This will notify any dlm users that a node in our domain
   2488	 * went away without notifying us first.
   2489	 */
   2490	if (test_bit(idx, dlm->domain_map))
   2491		dlm_fire_domain_eviction_callbacks(dlm, idx);
   2492
   2493	spin_lock(&dlm->spinlock);
   2494	__dlm_hb_node_down(dlm, idx);
   2495	spin_unlock(&dlm->spinlock);
   2496
   2497	dlm_put(dlm);
   2498}
   2499
   2500void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data)
   2501{
   2502	struct dlm_ctxt *dlm = data;
   2503
   2504	if (!dlm_grab(dlm))
   2505		return;
   2506
   2507	spin_lock(&dlm->spinlock);
   2508	set_bit(idx, dlm->live_nodes_map);
   2509	/* do NOT notify mle attached to the heartbeat events.
   2510	 * new nodes are not interesting in mastery until joined. */
   2511	spin_unlock(&dlm->spinlock);
   2512
   2513	dlm_put(dlm);
   2514}
   2515
   2516static void dlm_reco_ast(void *astdata)
   2517{
   2518	struct dlm_ctxt *dlm = astdata;
   2519	mlog(0, "ast for recovery lock fired!, this=%u, dlm=%s\n",
   2520	     dlm->node_num, dlm->name);
   2521}
   2522static void dlm_reco_bast(void *astdata, int blocked_type)
   2523{
   2524	struct dlm_ctxt *dlm = astdata;
   2525	mlog(0, "bast for recovery lock fired!, this=%u, dlm=%s\n",
   2526	     dlm->node_num, dlm->name);
   2527}
/* Unlock AST callback for the recovery lock.  Both arguments are ignored;
 * the callback only logs that it fired. */
static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st)
{
	mlog(0, "unlockast for recovery lock fired!\n");
}
   2532
   2533/*
   2534 * dlm_pick_recovery_master will continually attempt to use
   2535 * dlmlock() on the special "$RECOVERY" lockres with the
   2536 * LKM_NOQUEUE flag to get an EX.  every thread that enters
   2537 * this function on each node racing to become the recovery
   2538 * master will not stop attempting this until either:
   2539 * a) this node gets the EX (and becomes the recovery master),
   2540 * or b) dlm->reco.new_master gets set to some nodenum
   2541 * != O2NM_INVALID_NODE_NUM (another node will do the reco).
   2542 * so each time a recovery master is needed, the entire cluster
   2543 * will sync at this point.  if the new master dies, that will
   2544 * be detected in dlm_do_recovery */
/*
 * Return values:
 *   0        - this node obtained the $RECOVERY EX lock, sent the begin_reco
 *              messages and recorded itself as the recovery master.
 *   -EEXIST  - another node is the recovery master
 *              (dlm_reco_master_ready() reported true).
 *   -EINVAL  - the EX lock was obtained, but reco.dead_node was already
 *              O2NM_INVALID_NODE_NUM, i.e. the node got recovered already.
 * Any dlmlock() result other than DLM_NORMAL, DLM_NOTQUEUED or
 * DLM_RECOVERING ends in BUG().
 */
static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
{
	enum dlm_status ret;
	struct dlm_lockstatus lksb;
	int status = -EINVAL;

	mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
	     dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
again:
	/* retry point: every dlmlock() attempt starts from a zeroed lksb */
	memset(&lksb, 0, sizeof(lksb));

	ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
		      DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN,
		      dlm_reco_ast, dlm, dlm_reco_bast);

	mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
	     dlm->name, ret, lksb.status);

	if (ret == DLM_NORMAL) {
		mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
		     dlm->name, dlm->node_num);

		/* got the EX lock.  check to see if another node
		 * just became the reco master */
		if (dlm_reco_master_ready(dlm)) {
			mlog(0, "%s: got reco EX lock, but %u will "
			     "do the recovery\n", dlm->name,
			     dlm->reco.new_master);
			status = -EEXIST;
		} else {
			status = 0;

			/* see if recovery was already finished elsewhere */
			spin_lock(&dlm->spinlock);
			if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
				status = -EINVAL;
				mlog(0, "%s: got reco EX lock, but "
				     "node got recovered already\n", dlm->name);
				/* a master without a dead node is an
				 * inconsistent recovery state */
				if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
					mlog(ML_ERROR, "%s: new master is %u "
					     "but no dead node!\n",
					     dlm->name, dlm->reco.new_master);
					BUG();
				}
			}
			spin_unlock(&dlm->spinlock);
		}

		/* if this node has actually become the recovery master,
		 * set the master and send the messages to begin recovery */
		if (!status) {
			mlog(0, "%s: dead=%u, this=%u, sending "
			     "begin_reco now\n", dlm->name,
			     dlm->reco.dead_node, dlm->node_num);
			status = dlm_send_begin_reco_message(dlm,
				      dlm->reco.dead_node);
			/* this always succeeds */
			BUG_ON(status);

			/* set the new_master to this node */
			spin_lock(&dlm->spinlock);
			dlm_set_reco_master(dlm, dlm->node_num);
			spin_unlock(&dlm->spinlock);
		}

		/* recovery lock is a special case.  ast will not get fired,
		 * so just go ahead and unlock it.  this happens on every
		 * DLM_NORMAL path, whether or not this node became master. */
		ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm);
		if (ret == DLM_DENIED) {
			mlog(0, "got DLM_DENIED, trying LKM_CANCEL\n");
			ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm);
		}
		if (ret != DLM_NORMAL) {
			/* this would really suck. this could only happen
			 * if there was a network error during the unlock
			 * because of node death.  this means the unlock
			 * is actually "done" and the lock structure is
			 * even freed.  we can continue, but only
			 * because this specific lock name is special. */
			mlog(ML_ERROR, "dlmunlock returned %d\n", ret);
		}
	} else if (ret == DLM_NOTQUEUED) {
		mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
		     dlm->name, dlm->node_num);
		/* another node is master. wait on
		 * reco.new_master != O2NM_INVALID_NODE_NUM
		 * for at most one second */
		wait_event_timeout(dlm->dlm_reco_thread_wq,
					 dlm_reco_master_ready(dlm),
					 msecs_to_jiffies(1000));
		/* the wait result is ignored; the condition is re-checked
		 * and the whole lock attempt is repeated if still unset */
		if (!dlm_reco_master_ready(dlm)) {
			mlog(0, "%s: reco master taking awhile\n",
			     dlm->name);
			goto again;
		}
		/* another node has informed this one that it is reco master */
		mlog(0, "%s: reco master %u is ready to recover %u\n",
		     dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
		status = -EEXIST;
	} else if (ret == DLM_RECOVERING) {
		/* retried immediately, with no delay between attempts */
		mlog(0, "dlm=%s dlmlock says master node died (this=%u)\n",
		     dlm->name, dlm->node_num);
		goto again;
	} else {
		struct dlm_lock_resource *res;

		/* dlmlock returned something other than NORMAL, NOTQUEUED
		 * or RECOVERING: dump the recovery lockres if it can be
		 * found, then crash */
		mlog(ML_ERROR, "%s: got %s from dlmlock($RECOVERY), "
		     "lksb.status=%s\n", dlm->name, dlm_errname(ret),
		     dlm_errname(lksb.status));
		res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
					 DLM_RECOVERY_LOCK_NAME_LEN);
		if (res) {
			dlm_print_one_lock_resource(res);
			dlm_lockres_put(res);
		} else {
			mlog(ML_ERROR, "recovery lock not found\n");
		}
		BUG();
	}

	return status;
}
   2668
/*
 * Send DLM_BEGIN_RECO_MSG, naming this node as sender and dead_node as the
 * node to recover, to each node taken from dlm->domain_map except dead_node
 * and this node itself.
 *
 * A send to a node that is down is logged and counted as success.  An
 * -EAGAIN/EAGAIN reply, or any other negative result, is retried against
 * the same node after a 100ms sleep, with no retry limit.
 *
 * Returns the result recorded for the last node iterated; negative values
 * never leave the loop because they are always retried.
 */
static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
{
	struct dlm_begin_reco br;
	int ret = 0;
	struct dlm_node_iter iter;
	int nodenum;
	int status;

	mlog(0, "%s: dead node is %u\n", dlm->name, dead_node);

	/* the iterator is set up from domain_map under the spinlock; the
	 * send loop below runs without the lock held */
	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	spin_unlock(&dlm->spinlock);

	/* drop the dead node from the iterator's own map */
	clear_bit(dead_node, iter.node_map);

	memset(&br, 0, sizeof(br));
	br.node_idx = dlm->node_num;
	br.dead_node = dead_node;

	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		/* reset per node so that a skipped node leaves ret at 0 */
		ret = 0;
		if (nodenum == dead_node) {
			mlog(0, "not sending begin reco to dead node "
				  "%u\n", dead_node);
			continue;
		}
		if (nodenum == dlm->node_num) {
			mlog(0, "not sending begin reco to self\n");
			continue;
		}
retry:
		mlog(0, "attempting to send begin reco msg to %d\n",
			  nodenum);
		ret = o2net_send_message(DLM_BEGIN_RECO_MSG, dlm->key,
					 &br, sizeof(br), nodenum, &status);
		/* negative status is handled ok by caller here */
		/* status is only consulted when the send itself did not
		 * fail; from here on ret holds the remote handler's reply */
		if (ret >= 0)
			ret = status;
		if (dlm_is_host_down(ret)) {
			/* node is down.  not involved in recovery
			 * so just keep going */
			mlog(ML_NOTICE, "%s: node %u was down when sending "
			     "begin reco msg (%d)\n", dlm->name, nodenum, ret);
			ret = 0;
		}

		/*
		 * Prior to commit aad1b15310b9bcd59fa81ab8f2b1513b59553ea8,
		 * dlm_begin_reco_handler() returned EAGAIN and not -EAGAIN.
		 * We are handling both for compatibility reasons.
		 */
		if (ret == -EAGAIN || ret == EAGAIN) {
			mlog(0, "%s: trying to start recovery of node "
			     "%u, but node %u is waiting for last recovery "
			     "to complete, backoff for a bit\n", dlm->name,
			     dead_node, nodenum);
			msleep(100);
			goto retry;
		}
		if (ret < 0) {
			struct dlm_lock_resource *res;

			/* this is now a serious problem, possibly ENOMEM
			 * in the network stack.  must retry */
			mlog_errno(ret);
			mlog(ML_ERROR, "begin reco of dlm %s to node %u "
			     "returned %d\n", dlm->name, nodenum, ret);
			res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
						 DLM_RECOVERY_LOCK_NAME_LEN);
			if (res) {
				dlm_print_one_lock_resource(res);
				dlm_lockres_put(res);
			} else {
				mlog(ML_ERROR, "recovery lock not found\n");
			}
			/* sleep for a bit in hopes that we can avoid
			 * another ENOMEM */
			msleep(100);
			goto retry;
		}
		/* NOTE(review): a positive reply other than EAGAIN is neither
		 * logged nor retried here; it would be returned only if it
		 * came from the last node iterated - confirm that this is
		 * intended. */
	}

	return ret;
}
   2754
   2755int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
   2756			   void **ret_data)
   2757{
   2758	struct dlm_ctxt *dlm = data;
   2759	struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf;
   2760
   2761	/* ok to return 0, domain has gone away */
   2762	if (!dlm_grab(dlm))
   2763		return 0;
   2764
   2765	spin_lock(&dlm->spinlock);
   2766	if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
   2767		mlog(0, "%s: node %u wants to recover node %u (%u:%u) "
   2768		     "but this node is in finalize state, waiting on finalize2\n",
   2769		     dlm->name, br->node_idx, br->dead_node,
   2770		     dlm->reco.dead_node, dlm->reco.new_master);
   2771		spin_unlock(&dlm->spinlock);
   2772		dlm_put(dlm);
   2773		return -EAGAIN;
   2774	}
   2775	spin_unlock(&dlm->spinlock);
   2776
   2777	mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n",
   2778	     dlm->name, br->node_idx, br->dead_node,
   2779	     dlm->reco.dead_node, dlm->reco.new_master);
   2780
   2781	dlm_fire_domain_eviction_callbacks(dlm, br->dead_node);
   2782
   2783	spin_lock(&dlm->spinlock);
   2784	if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
   2785		if (test_bit(dlm->reco.new_master, dlm->recovery_map)) {
   2786			mlog(0, "%s: new_master %u died, changing "
   2787			     "to %u\n", dlm->name, dlm->reco.new_master,
   2788			     br->node_idx);
   2789		} else {
   2790			mlog(0, "%s: new_master %u NOT DEAD, changing "
   2791			     "to %u\n", dlm->name, dlm->reco.new_master,
   2792			     br->node_idx);
   2793			/* may not have seen the new master as dead yet */
   2794		}
   2795	}
   2796	if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) {
   2797		mlog(ML_NOTICE, "%s: dead_node previously set to %u, "
   2798		     "node %u changing it to %u\n", dlm->name,
   2799		     dlm->reco.dead_node, br->node_idx, br->dead_node);
   2800	}
   2801	dlm_set_reco_master(dlm, br->node_idx);
   2802	dlm_set_reco_dead_node(dlm, br->dead_node);
   2803	if (!test_bit(br->dead_node, dlm->recovery_map)) {
   2804		mlog(0, "recovery master %u sees %u as dead, but this "
   2805		     "node has not yet.  marking %u as dead\n",
   2806		     br->node_idx, br->dead_node, br->dead_node);
   2807		if (!test_bit(br->dead_node, dlm->domain_map) ||
   2808		    !test_bit(br->dead_node, dlm->live_nodes_map))
   2809			mlog(0, "%u not in domain/live_nodes map "
   2810			     "so setting it in reco map manually\n",
   2811			     br->dead_node);
   2812		/* force the recovery cleanup in __dlm_hb_node_down
   2813		 * both of these will be cleared in a moment */
   2814		set_bit(br->dead_node, dlm->domain_map);
   2815		set_bit(br->dead_node, dlm->live_nodes_map);
   2816		__dlm_hb_node_down(dlm, br->dead_node);
   2817	}
   2818	spin_unlock(&dlm->spinlock);
   2819
   2820	dlm_kick_recovery_thread(dlm);
   2821
   2822	mlog(0, "%s: recovery started by node %u, for %u (%u:%u)\n",
   2823	     dlm->name, br->node_idx, br->dead_node,
   2824	     dlm->reco.dead_node, dlm->reco.new_master);
   2825
   2826	dlm_put(dlm);
   2827	return 0;
   2828}
   2829
   2830#define DLM_FINALIZE_STAGE2  0x01
   2831static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
   2832{
   2833	int ret = 0;
   2834	struct dlm_finalize_reco fr;
   2835	struct dlm_node_iter iter;
   2836	int nodenum;
   2837	int status;
   2838	int stage = 1;
   2839
   2840	mlog(0, "finishing recovery for node %s:%u, "
   2841	     "stage %d\n", dlm->name, dlm->reco.dead_node, stage);
   2842
   2843	spin_lock(&dlm->spinlock);
   2844	dlm_node_iter_init(dlm->domain_map, &iter);
   2845	spin_unlock(&dlm->spinlock);
   2846
   2847stage2:
   2848	memset(&fr, 0, sizeof(fr));
   2849	fr.node_idx = dlm->node_num;
   2850	fr.dead_node = dlm->reco.dead_node;
   2851	if (stage == 2)
   2852		fr.flags |= DLM_FINALIZE_STAGE2;
   2853
   2854	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
   2855		if (nodenum == dlm->node_num)
   2856			continue;
   2857		ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key,
   2858					 &fr, sizeof(fr), nodenum, &status);
   2859		if (ret >= 0)
   2860			ret = status;
   2861		if (ret < 0) {
   2862			mlog(ML_ERROR, "Error %d when sending message %u (key "
   2863			     "0x%x) to node %u\n", ret, DLM_FINALIZE_RECO_MSG,
   2864			     dlm->key, nodenum);
   2865			if (dlm_is_host_down(ret)) {
   2866				/* this has no effect on this recovery
   2867				 * session, so set the status to zero to
   2868				 * finish out the last recovery */
   2869				mlog(ML_ERROR, "node %u went down after this "
   2870				     "node finished recovery.\n", nodenum);
   2871				ret = 0;
   2872				continue;
   2873			}
   2874			break;
   2875		}
   2876	}
   2877	if (stage == 1) {
   2878		/* reset the node_iter back to the top and send finalize2 */
   2879		iter.curnode = -1;
   2880		stage = 2;
   2881		goto stage2;
   2882	}
   2883
   2884	return ret;
   2885}
   2886
   2887int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
   2888			      void **ret_data)
   2889{
   2890	struct dlm_ctxt *dlm = data;
   2891	struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
   2892	int stage = 1;
   2893
   2894	/* ok to return 0, domain has gone away */
   2895	if (!dlm_grab(dlm))
   2896		return 0;
   2897
   2898	if (fr->flags & DLM_FINALIZE_STAGE2)
   2899		stage = 2;
   2900
   2901	mlog(0, "%s: node %u finalizing recovery stage%d of "
   2902	     "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
   2903	     fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
   2904
   2905	spin_lock(&dlm->spinlock);
   2906
   2907	if (dlm->reco.new_master != fr->node_idx) {
   2908		mlog(ML_ERROR, "node %u sent recovery finalize msg, but node "
   2909		     "%u is supposed to be the new master, dead=%u\n",
   2910		     fr->node_idx, dlm->reco.new_master, fr->dead_node);
   2911		BUG();
   2912	}
   2913	if (dlm->reco.dead_node != fr->dead_node) {
   2914		mlog(ML_ERROR, "node %u sent recovery finalize msg for dead "
   2915		     "node %u, but node %u is supposed to be dead\n",
   2916		     fr->node_idx, fr->dead_node, dlm->reco.dead_node);
   2917		BUG();
   2918	}
   2919
   2920	switch (stage) {
   2921		case 1:
   2922			dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
   2923			if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
   2924				mlog(ML_ERROR, "%s: received finalize1 from "
   2925				     "new master %u for dead node %u, but "
   2926				     "this node has already received it!\n",
   2927				     dlm->name, fr->node_idx, fr->dead_node);
   2928				dlm_print_reco_node_status(dlm);
   2929				BUG();
   2930			}
   2931			dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
   2932			spin_unlock(&dlm->spinlock);
   2933			break;
   2934		case 2:
   2935			if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) {
   2936				mlog(ML_ERROR, "%s: received finalize2 from "
   2937				     "new master %u for dead node %u, but "
   2938				     "this node did not have finalize1!\n",
   2939				     dlm->name, fr->node_idx, fr->dead_node);
   2940				dlm_print_reco_node_status(dlm);
   2941				BUG();
   2942			}
   2943			dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
   2944			__dlm_reset_recovery(dlm);
   2945			spin_unlock(&dlm->spinlock);
   2946			dlm_kick_recovery_thread(dlm);
   2947			break;
   2948	}
   2949
   2950	mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n",
   2951	     dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
   2952
   2953	dlm_put(dlm);
   2954	return 0;
   2955}