cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

dlmmaster.c (99592B)


      1// SPDX-License-Identifier: GPL-2.0-or-later
      2/*
      3 * dlmmod.c
      4 *
      5 * standalone DLM module
      6 *
      7 * Copyright (C) 2004 Oracle.  All rights reserved.
      8 */
      9
     10
     11#include <linux/module.h>
     12#include <linux/fs.h>
     13#include <linux/types.h>
     14#include <linux/slab.h>
     15#include <linux/highmem.h>
     16#include <linux/init.h>
     17#include <linux/sysctl.h>
     18#include <linux/random.h>
     19#include <linux/blkdev.h>
     20#include <linux/socket.h>
     21#include <linux/inet.h>
     22#include <linux/spinlock.h>
     23#include <linux/delay.h>
     24
     25
     26#include "../cluster/heartbeat.h"
     27#include "../cluster/nodemanager.h"
     28#include "../cluster/tcp.h"
     29
     30#include "dlmapi.h"
     31#include "dlmcommon.h"
     32#include "dlmdomain.h"
     33#include "dlmdebug.h"
     34
     35#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
     36#include "../cluster/masklog.h"
     37
     38static void dlm_mle_node_down(struct dlm_ctxt *dlm,
     39			      struct dlm_master_list_entry *mle,
     40			      struct o2nm_node *node,
     41			      int idx);
     42static void dlm_mle_node_up(struct dlm_ctxt *dlm,
     43			    struct dlm_master_list_entry *mle,
     44			    struct o2nm_node *node,
     45			    int idx);
     46
     47static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
     48static int dlm_do_assert_master(struct dlm_ctxt *dlm,
     49				struct dlm_lock_resource *res,
     50				void *nodemap, u32 flags);
     51static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
     52
     53static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
     54				struct dlm_master_list_entry *mle,
     55				const char *name,
     56				unsigned int namelen)
     57{
     58	if (dlm != mle->dlm)
     59		return 0;
     60
     61	if (namelen != mle->mnamelen ||
     62	    memcmp(name, mle->mname, namelen) != 0)
     63		return 0;
     64
     65	return 1;
     66}
     67
     68static struct kmem_cache *dlm_lockres_cache;
     69static struct kmem_cache *dlm_lockname_cache;
     70static struct kmem_cache *dlm_mle_cache;
     71
     72static void dlm_mle_release(struct kref *kref);
     73static void dlm_init_mle(struct dlm_master_list_entry *mle,
     74			enum dlm_mle_type type,
     75			struct dlm_ctxt *dlm,
     76			struct dlm_lock_resource *res,
     77			const char *name,
     78			unsigned int namelen);
     79static void dlm_put_mle(struct dlm_master_list_entry *mle);
     80static void __dlm_put_mle(struct dlm_master_list_entry *mle);
     81static int dlm_find_mle(struct dlm_ctxt *dlm,
     82			struct dlm_master_list_entry **mle,
     83			char *name, unsigned int namelen);
     84
     85static int dlm_do_master_request(struct dlm_lock_resource *res,
     86				 struct dlm_master_list_entry *mle, int to);
     87
     88
     89static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
     90				     struct dlm_lock_resource *res,
     91				     struct dlm_master_list_entry *mle,
     92				     int *blocked);
     93static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
     94				    struct dlm_lock_resource *res,
     95				    struct dlm_master_list_entry *mle,
     96				    int blocked);
     97static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
     98				 struct dlm_lock_resource *res,
     99				 struct dlm_master_list_entry *mle,
    100				 struct dlm_master_list_entry **oldmle,
    101				 const char *name, unsigned int namelen,
    102				 u8 new_master, u8 master);
    103
    104static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
    105				    struct dlm_lock_resource *res);
    106static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
    107				      struct dlm_lock_resource *res);
    108static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
    109				       struct dlm_lock_resource *res,
    110				       u8 target);
    111static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
    112				       struct dlm_lock_resource *res);
    113
    114
    115int dlm_is_host_down(int errno)
    116{
    117	switch (errno) {
    118		case -EBADF:
    119		case -ECONNREFUSED:
    120		case -ENOTCONN:
    121		case -ECONNRESET:
    122		case -EPIPE:
    123		case -EHOSTDOWN:
    124		case -EHOSTUNREACH:
    125		case -ETIMEDOUT:
    126		case -ECONNABORTED:
    127		case -ENETDOWN:
    128		case -ENETUNREACH:
    129		case -ENETRESET:
    130		case -ESHUTDOWN:
    131		case -ENOPROTOOPT:
    132		case -EINVAL:   /* if returned from our tcp code,
    133				   this means there is no socket */
    134			return 1;
    135	}
    136	return 0;
    137}
    138
    139
    140/*
    141 * MASTER LIST FUNCTIONS
    142 */
    143
    144
    145/*
    146 * regarding master list entries and heartbeat callbacks:
    147 *
    148 * in order to avoid sleeping and allocation that occurs in
    149 * heartbeat, master list entries are simply attached to the
    150 * dlm's established heartbeat callbacks.  the mle is attached
    151 * when it is created, and since the dlm->spinlock is held at
    152 * that time, any heartbeat event will be properly discovered
    153 * by the mle.  the mle needs to be detached from the
    154 * dlm->mle_hb_events list as soon as heartbeat events are no
    155 * longer useful to the mle, and before the mle is freed.
    156 *
    157 * as a general rule, heartbeat events are no longer needed by
    158 * the mle once an "answer" regarding the lock master has been
    159 * received.
    160 */
    161static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
    162					      struct dlm_master_list_entry *mle)
    163{
    164	assert_spin_locked(&dlm->spinlock);
    165
    166	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
    167}
    168
    169
    170static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
    171					      struct dlm_master_list_entry *mle)
    172{
    173	if (!list_empty(&mle->hb_events))
    174		list_del_init(&mle->hb_events);
    175}
    176
    177
    178static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
    179					    struct dlm_master_list_entry *mle)
    180{
    181	spin_lock(&dlm->spinlock);
    182	__dlm_mle_detach_hb_events(dlm, mle);
    183	spin_unlock(&dlm->spinlock);
    184}
    185
    186static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
    187{
    188	struct dlm_ctxt *dlm;
    189	dlm = mle->dlm;
    190
    191	assert_spin_locked(&dlm->spinlock);
    192	assert_spin_locked(&dlm->master_lock);
    193	mle->inuse++;
    194	kref_get(&mle->mle_refs);
    195}
    196
    197static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
    198{
    199	struct dlm_ctxt *dlm;
    200	dlm = mle->dlm;
    201
    202	spin_lock(&dlm->spinlock);
    203	spin_lock(&dlm->master_lock);
    204	mle->inuse--;
    205	__dlm_put_mle(mle);
    206	spin_unlock(&dlm->master_lock);
    207	spin_unlock(&dlm->spinlock);
    208
    209}
    210
    211/* remove from list and free */
    212static void __dlm_put_mle(struct dlm_master_list_entry *mle)
    213{
    214	struct dlm_ctxt *dlm;
    215	dlm = mle->dlm;
    216
    217	assert_spin_locked(&dlm->spinlock);
    218	assert_spin_locked(&dlm->master_lock);
    219	if (!kref_read(&mle->mle_refs)) {
    220		/* this may or may not crash, but who cares.
    221		 * it's a BUG. */
    222		mlog(ML_ERROR, "bad mle: %p\n", mle);
    223		dlm_print_one_mle(mle);
    224		BUG();
    225	} else
    226		kref_put(&mle->mle_refs, dlm_mle_release);
    227}
    228
    229
    230/* must not have any spinlocks coming in */
    231static void dlm_put_mle(struct dlm_master_list_entry *mle)
    232{
    233	struct dlm_ctxt *dlm;
    234	dlm = mle->dlm;
    235
    236	spin_lock(&dlm->spinlock);
    237	spin_lock(&dlm->master_lock);
    238	__dlm_put_mle(mle);
    239	spin_unlock(&dlm->master_lock);
    240	spin_unlock(&dlm->spinlock);
    241}
    242
    243static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
    244{
    245	kref_get(&mle->mle_refs);
    246}
    247
    248static void dlm_init_mle(struct dlm_master_list_entry *mle,
    249			enum dlm_mle_type type,
    250			struct dlm_ctxt *dlm,
    251			struct dlm_lock_resource *res,
    252			const char *name,
    253			unsigned int namelen)
    254{
    255	assert_spin_locked(&dlm->spinlock);
    256
    257	mle->dlm = dlm;
    258	mle->type = type;
    259	INIT_HLIST_NODE(&mle->master_hash_node);
    260	INIT_LIST_HEAD(&mle->hb_events);
    261	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
    262	spin_lock_init(&mle->spinlock);
    263	init_waitqueue_head(&mle->wq);
    264	atomic_set(&mle->woken, 0);
    265	kref_init(&mle->mle_refs);
    266	memset(mle->response_map, 0, sizeof(mle->response_map));
    267	mle->master = O2NM_MAX_NODES;
    268	mle->new_master = O2NM_MAX_NODES;
    269	mle->inuse = 0;
    270
    271	BUG_ON(mle->type != DLM_MLE_BLOCK &&
    272	       mle->type != DLM_MLE_MASTER &&
    273	       mle->type != DLM_MLE_MIGRATION);
    274
    275	if (mle->type == DLM_MLE_MASTER) {
    276		BUG_ON(!res);
    277		mle->mleres = res;
    278		memcpy(mle->mname, res->lockname.name, res->lockname.len);
    279		mle->mnamelen = res->lockname.len;
    280		mle->mnamehash = res->lockname.hash;
    281	} else {
    282		BUG_ON(!name);
    283		mle->mleres = NULL;
    284		memcpy(mle->mname, name, namelen);
    285		mle->mnamelen = namelen;
    286		mle->mnamehash = dlm_lockid_hash(name, namelen);
    287	}
    288
    289	atomic_inc(&dlm->mle_tot_count[mle->type]);
    290	atomic_inc(&dlm->mle_cur_count[mle->type]);
    291
    292	/* copy off the node_map and register hb callbacks on our copy */
    293	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
    294	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
    295	clear_bit(dlm->node_num, mle->vote_map);
    296	clear_bit(dlm->node_num, mle->node_map);
    297
    298	/* attach the mle to the domain node up/down events */
    299	__dlm_mle_attach_hb_events(dlm, mle);
    300}
    301
    302void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
    303{
    304	assert_spin_locked(&dlm->spinlock);
    305	assert_spin_locked(&dlm->master_lock);
    306
    307	if (!hlist_unhashed(&mle->master_hash_node))
    308		hlist_del_init(&mle->master_hash_node);
    309}
    310
    311void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
    312{
    313	struct hlist_head *bucket;
    314
    315	assert_spin_locked(&dlm->master_lock);
    316
    317	bucket = dlm_master_hash(dlm, mle->mnamehash);
    318	hlist_add_head(&mle->master_hash_node, bucket);
    319}
    320
    321/* returns 1 if found, 0 if not */
    322static int dlm_find_mle(struct dlm_ctxt *dlm,
    323			struct dlm_master_list_entry **mle,
    324			char *name, unsigned int namelen)
    325{
    326	struct dlm_master_list_entry *tmpmle;
    327	struct hlist_head *bucket;
    328	unsigned int hash;
    329
    330	assert_spin_locked(&dlm->master_lock);
    331
    332	hash = dlm_lockid_hash(name, namelen);
    333	bucket = dlm_master_hash(dlm, hash);
    334	hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
    335		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
    336			continue;
    337		dlm_get_mle(tmpmle);
    338		*mle = tmpmle;
    339		return 1;
    340	}
    341	return 0;
    342}
    343
    344void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
    345{
    346	struct dlm_master_list_entry *mle;
    347
    348	assert_spin_locked(&dlm->spinlock);
    349
    350	list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
    351		if (node_up)
    352			dlm_mle_node_up(dlm, mle, NULL, idx);
    353		else
    354			dlm_mle_node_down(dlm, mle, NULL, idx);
    355	}
    356}
    357
    358static void dlm_mle_node_down(struct dlm_ctxt *dlm,
    359			      struct dlm_master_list_entry *mle,
    360			      struct o2nm_node *node, int idx)
    361{
    362	spin_lock(&mle->spinlock);
    363
    364	if (!test_bit(idx, mle->node_map))
    365		mlog(0, "node %u already removed from nodemap!\n", idx);
    366	else
    367		clear_bit(idx, mle->node_map);
    368
    369	spin_unlock(&mle->spinlock);
    370}
    371
    372static void dlm_mle_node_up(struct dlm_ctxt *dlm,
    373			    struct dlm_master_list_entry *mle,
    374			    struct o2nm_node *node, int idx)
    375{
    376	spin_lock(&mle->spinlock);
    377
    378	if (test_bit(idx, mle->node_map))
    379		mlog(0, "node %u already in node map!\n", idx);
    380	else
    381		set_bit(idx, mle->node_map);
    382
    383	spin_unlock(&mle->spinlock);
    384}
    385
    386
    387int dlm_init_mle_cache(void)
    388{
    389	dlm_mle_cache = kmem_cache_create("o2dlm_mle",
    390					  sizeof(struct dlm_master_list_entry),
    391					  0, SLAB_HWCACHE_ALIGN,
    392					  NULL);
    393	if (dlm_mle_cache == NULL)
    394		return -ENOMEM;
    395	return 0;
    396}
    397
    398void dlm_destroy_mle_cache(void)
    399{
    400	kmem_cache_destroy(dlm_mle_cache);
    401}
    402
    403static void dlm_mle_release(struct kref *kref)
    404{
    405	struct dlm_master_list_entry *mle;
    406	struct dlm_ctxt *dlm;
    407
    408	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
    409	dlm = mle->dlm;
    410
    411	assert_spin_locked(&dlm->spinlock);
    412	assert_spin_locked(&dlm->master_lock);
    413
    414	mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
    415	     mle->type);
    416
    417	/* remove from list if not already */
    418	__dlm_unlink_mle(dlm, mle);
    419
    420	/* detach the mle from the domain node up/down events */
    421	__dlm_mle_detach_hb_events(dlm, mle);
    422
    423	atomic_dec(&dlm->mle_cur_count[mle->type]);
    424
    425	/* NOTE: kfree under spinlock here.
    426	 * if this is bad, we can move this to a freelist. */
    427	kmem_cache_free(dlm_mle_cache, mle);
    428}
    429
    430
    431/*
    432 * LOCK RESOURCE FUNCTIONS
    433 */
    434
    435int dlm_init_master_caches(void)
    436{
    437	dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
    438					      sizeof(struct dlm_lock_resource),
    439					      0, SLAB_HWCACHE_ALIGN, NULL);
    440	if (!dlm_lockres_cache)
    441		goto bail;
    442
    443	dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
    444					       DLM_LOCKID_NAME_MAX, 0,
    445					       SLAB_HWCACHE_ALIGN, NULL);
    446	if (!dlm_lockname_cache)
    447		goto bail;
    448
    449	return 0;
    450bail:
    451	dlm_destroy_master_caches();
    452	return -ENOMEM;
    453}
    454
    455void dlm_destroy_master_caches(void)
    456{
    457	kmem_cache_destroy(dlm_lockname_cache);
    458	dlm_lockname_cache = NULL;
    459
    460	kmem_cache_destroy(dlm_lockres_cache);
    461	dlm_lockres_cache = NULL;
    462}
    463
    464static void dlm_lockres_release(struct kref *kref)
    465{
    466	struct dlm_lock_resource *res;
    467	struct dlm_ctxt *dlm;
    468
    469	res = container_of(kref, struct dlm_lock_resource, refs);
    470	dlm = res->dlm;
    471
    472	/* This should not happen -- all lockres' have a name
    473	 * associated with them at init time. */
    474	BUG_ON(!res->lockname.name);
    475
    476	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
    477	     res->lockname.name);
    478
    479	atomic_dec(&dlm->res_cur_count);
    480
    481	if (!hlist_unhashed(&res->hash_node) ||
    482	    !list_empty(&res->granted) ||
    483	    !list_empty(&res->converting) ||
    484	    !list_empty(&res->blocked) ||
    485	    !list_empty(&res->dirty) ||
    486	    !list_empty(&res->recovering) ||
    487	    !list_empty(&res->purge)) {
    488		mlog(ML_ERROR,
    489		     "Going to BUG for resource %.*s."
    490		     "  We're on a list! [%c%c%c%c%c%c%c]\n",
    491		     res->lockname.len, res->lockname.name,
    492		     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
    493		     !list_empty(&res->granted) ? 'G' : ' ',
    494		     !list_empty(&res->converting) ? 'C' : ' ',
    495		     !list_empty(&res->blocked) ? 'B' : ' ',
    496		     !list_empty(&res->dirty) ? 'D' : ' ',
    497		     !list_empty(&res->recovering) ? 'R' : ' ',
    498		     !list_empty(&res->purge) ? 'P' : ' ');
    499
    500		dlm_print_one_lock_resource(res);
    501	}
    502
    503	/* By the time we're ready to blow this guy away, we shouldn't
    504	 * be on any lists. */
    505	BUG_ON(!hlist_unhashed(&res->hash_node));
    506	BUG_ON(!list_empty(&res->granted));
    507	BUG_ON(!list_empty(&res->converting));
    508	BUG_ON(!list_empty(&res->blocked));
    509	BUG_ON(!list_empty(&res->dirty));
    510	BUG_ON(!list_empty(&res->recovering));
    511	BUG_ON(!list_empty(&res->purge));
    512
    513	kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
    514
    515	kmem_cache_free(dlm_lockres_cache, res);
    516}
    517
    518void dlm_lockres_put(struct dlm_lock_resource *res)
    519{
    520	kref_put(&res->refs, dlm_lockres_release);
    521}
    522
    523static void dlm_init_lockres(struct dlm_ctxt *dlm,
    524			     struct dlm_lock_resource *res,
    525			     const char *name, unsigned int namelen)
    526{
    527	char *qname;
    528
    529	/* If we memset here, we lose our reference to the kmalloc'd
    530	 * res->lockname.name, so be sure to init every field
    531	 * correctly! */
    532
    533	qname = (char *) res->lockname.name;
    534	memcpy(qname, name, namelen);
    535
    536	res->lockname.len = namelen;
    537	res->lockname.hash = dlm_lockid_hash(name, namelen);
    538
    539	init_waitqueue_head(&res->wq);
    540	spin_lock_init(&res->spinlock);
    541	INIT_HLIST_NODE(&res->hash_node);
    542	INIT_LIST_HEAD(&res->granted);
    543	INIT_LIST_HEAD(&res->converting);
    544	INIT_LIST_HEAD(&res->blocked);
    545	INIT_LIST_HEAD(&res->dirty);
    546	INIT_LIST_HEAD(&res->recovering);
    547	INIT_LIST_HEAD(&res->purge);
    548	INIT_LIST_HEAD(&res->tracking);
    549	atomic_set(&res->asts_reserved, 0);
    550	res->migration_pending = 0;
    551	res->inflight_locks = 0;
    552	res->inflight_assert_workers = 0;
    553
    554	res->dlm = dlm;
    555
    556	kref_init(&res->refs);
    557
    558	atomic_inc(&dlm->res_tot_count);
    559	atomic_inc(&dlm->res_cur_count);
    560
    561	/* just for consistency */
    562	spin_lock(&res->spinlock);
    563	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
    564	spin_unlock(&res->spinlock);
    565
    566	res->state = DLM_LOCK_RES_IN_PROGRESS;
    567
    568	res->last_used = 0;
    569
    570	spin_lock(&dlm->track_lock);
    571	list_add_tail(&res->tracking, &dlm->tracking_list);
    572	spin_unlock(&dlm->track_lock);
    573
    574	memset(res->lvb, 0, DLM_LVB_LEN);
    575	memset(res->refmap, 0, sizeof(res->refmap));
    576}
    577
    578struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
    579				   const char *name,
    580				   unsigned int namelen)
    581{
    582	struct dlm_lock_resource *res = NULL;
    583
    584	res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
    585	if (!res)
    586		goto error;
    587
    588	res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
    589	if (!res->lockname.name)
    590		goto error;
    591
    592	dlm_init_lockres(dlm, res, name, namelen);
    593	return res;
    594
    595error:
    596	if (res)
    597		kmem_cache_free(dlm_lockres_cache, res);
    598	return NULL;
    599}
    600
    601void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
    602				struct dlm_lock_resource *res, int bit)
    603{
    604	assert_spin_locked(&res->spinlock);
    605
    606	mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
    607	     res->lockname.name, bit, __builtin_return_address(0));
    608
    609	set_bit(bit, res->refmap);
    610}
    611
    612void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
    613				  struct dlm_lock_resource *res, int bit)
    614{
    615	assert_spin_locked(&res->spinlock);
    616
    617	mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
    618	     res->lockname.name, bit, __builtin_return_address(0));
    619
    620	clear_bit(bit, res->refmap);
    621}
    622
    623static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
    624				   struct dlm_lock_resource *res)
    625{
    626	res->inflight_locks++;
    627
    628	mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
    629	     res->lockname.len, res->lockname.name, res->inflight_locks,
    630	     __builtin_return_address(0));
    631}
    632
    633void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
    634				   struct dlm_lock_resource *res)
    635{
    636	assert_spin_locked(&res->spinlock);
    637	__dlm_lockres_grab_inflight_ref(dlm, res);
    638}
    639
    640void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
    641				   struct dlm_lock_resource *res)
    642{
    643	assert_spin_locked(&res->spinlock);
    644
    645	BUG_ON(res->inflight_locks == 0);
    646
    647	res->inflight_locks--;
    648
    649	mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
    650	     res->lockname.len, res->lockname.name, res->inflight_locks,
    651	     __builtin_return_address(0));
    652
    653	wake_up(&res->wq);
    654}
    655
    656void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
    657		struct dlm_lock_resource *res)
    658{
    659	assert_spin_locked(&res->spinlock);
    660	res->inflight_assert_workers++;
    661	mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
    662			dlm->name, res->lockname.len, res->lockname.name,
    663			res->inflight_assert_workers);
    664}
    665
    666static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
    667		struct dlm_lock_resource *res)
    668{
    669	assert_spin_locked(&res->spinlock);
    670	BUG_ON(res->inflight_assert_workers == 0);
    671	res->inflight_assert_workers--;
    672	mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
    673			dlm->name, res->lockname.len, res->lockname.name,
    674			res->inflight_assert_workers);
    675}
    676
    677static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
    678		struct dlm_lock_resource *res)
    679{
    680	spin_lock(&res->spinlock);
    681	__dlm_lockres_drop_inflight_worker(dlm, res);
    682	spin_unlock(&res->spinlock);
    683}
    684
    685/*
    686 * lookup a lock resource by name.
    687 * may already exist in the hashtable.
    688 * lockid is null terminated
    689 *
    690 * if not, allocate enough for the lockres and for
    691 * the temporary structure used in doing the mastering.
    692 *
    693 * also, do a lookup in the dlm->master_list to see
    694 * if another node has begun mastering the same lock.
    695 * if so, there should be a block entry in there
    696 * for this name, and we should *not* attempt to master
    697 * the lock here.   need to wait around for that node
    698 * to assert_master (or die).
    699 *
    700 */
    701struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
    702					  const char *lockid,
    703					  int namelen,
    704					  int flags)
    705{
    706	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
    707	struct dlm_master_list_entry *mle = NULL;
    708	struct dlm_master_list_entry *alloc_mle = NULL;
    709	int blocked = 0;
    710	int ret, nodenum;
    711	struct dlm_node_iter iter;
    712	unsigned int hash;
    713	int tries = 0;
    714	int bit, wait_on_recovery = 0;
    715
    716	BUG_ON(!lockid);
    717
    718	hash = dlm_lockid_hash(lockid, namelen);
    719
    720	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
    721
    722lookup:
    723	spin_lock(&dlm->spinlock);
    724	tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
    725	if (tmpres) {
    726		spin_unlock(&dlm->spinlock);
    727		spin_lock(&tmpres->spinlock);
    728
    729		/*
    730		 * Right after dlm spinlock was released, dlm_thread could have
    731		 * purged the lockres. Check if lockres got unhashed. If so
    732		 * start over.
    733		 */
    734		if (hlist_unhashed(&tmpres->hash_node)) {
    735			spin_unlock(&tmpres->spinlock);
    736			dlm_lockres_put(tmpres);
    737			tmpres = NULL;
    738			goto lookup;
    739		}
    740
    741		/* Wait on the thread that is mastering the resource */
    742		if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
    743			__dlm_wait_on_lockres(tmpres);
    744			BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
    745			spin_unlock(&tmpres->spinlock);
    746			dlm_lockres_put(tmpres);
    747			tmpres = NULL;
    748			goto lookup;
    749		}
    750
    751		/* Wait on the resource purge to complete before continuing */
    752		if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
    753			BUG_ON(tmpres->owner == dlm->node_num);
    754			__dlm_wait_on_lockres_flags(tmpres,
    755						    DLM_LOCK_RES_DROPPING_REF);
    756			spin_unlock(&tmpres->spinlock);
    757			dlm_lockres_put(tmpres);
    758			tmpres = NULL;
    759			goto lookup;
    760		}
    761
    762		/* Grab inflight ref to pin the resource */
    763		dlm_lockres_grab_inflight_ref(dlm, tmpres);
    764
    765		spin_unlock(&tmpres->spinlock);
    766		if (res) {
    767			spin_lock(&dlm->track_lock);
    768			if (!list_empty(&res->tracking))
    769				list_del_init(&res->tracking);
    770			else
    771				mlog(ML_ERROR, "Resource %.*s not "
    772						"on the Tracking list\n",
    773						res->lockname.len,
    774						res->lockname.name);
    775			spin_unlock(&dlm->track_lock);
    776			dlm_lockres_put(res);
    777		}
    778		res = tmpres;
    779		goto leave;
    780	}
    781
    782	if (!res) {
    783		spin_unlock(&dlm->spinlock);
    784		mlog(0, "allocating a new resource\n");
    785		/* nothing found and we need to allocate one. */
    786		alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
    787		if (!alloc_mle)
    788			goto leave;
    789		res = dlm_new_lockres(dlm, lockid, namelen);
    790		if (!res)
    791			goto leave;
    792		goto lookup;
    793	}
    794
    795	mlog(0, "no lockres found, allocated our own: %p\n", res);
    796
    797	if (flags & LKM_LOCAL) {
    798		/* caller knows it's safe to assume it's not mastered elsewhere
    799		 * DONE!  return right away */
    800		spin_lock(&res->spinlock);
    801		dlm_change_lockres_owner(dlm, res, dlm->node_num);
    802		__dlm_insert_lockres(dlm, res);
    803		dlm_lockres_grab_inflight_ref(dlm, res);
    804		spin_unlock(&res->spinlock);
    805		spin_unlock(&dlm->spinlock);
    806		/* lockres still marked IN_PROGRESS */
    807		goto wake_waiters;
    808	}
    809
    810	/* check master list to see if another node has started mastering it */
    811	spin_lock(&dlm->master_lock);
    812
    813	/* if we found a block, wait for lock to be mastered by another node */
    814	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
    815	if (blocked) {
    816		int mig;
    817		if (mle->type == DLM_MLE_MASTER) {
    818			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
    819			BUG();
    820		}
    821		mig = (mle->type == DLM_MLE_MIGRATION);
    822		/* if there is a migration in progress, let the migration
    823		 * finish before continuing.  we can wait for the absence
    824		 * of the MIGRATION mle: either the migrate finished or
    825		 * one of the nodes died and the mle was cleaned up.
    826		 * if there is a BLOCK here, but it already has a master
    827		 * set, we are too late.  the master does not have a ref
    828		 * for us in the refmap.  detach the mle and drop it.
    829		 * either way, go back to the top and start over. */
    830		if (mig || mle->master != O2NM_MAX_NODES) {
    831			BUG_ON(mig && mle->master == dlm->node_num);
    832			/* we arrived too late.  the master does not
    833			 * have a ref for us. retry. */
    834			mlog(0, "%s:%.*s: late on %s\n",
    835			     dlm->name, namelen, lockid,
    836			     mig ?  "MIGRATION" : "BLOCK");
    837			spin_unlock(&dlm->master_lock);
    838			spin_unlock(&dlm->spinlock);
    839
    840			/* master is known, detach */
    841			if (!mig)
    842				dlm_mle_detach_hb_events(dlm, mle);
    843			dlm_put_mle(mle);
    844			mle = NULL;
    845			/* this is lame, but we can't wait on either
    846			 * the mle or lockres waitqueue here */
    847			if (mig)
    848				msleep(100);
    849			goto lookup;
    850		}
    851	} else {
    852		/* go ahead and try to master lock on this node */
    853		mle = alloc_mle;
    854		/* make sure this does not get freed below */
    855		alloc_mle = NULL;
    856		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
    857		set_bit(dlm->node_num, mle->maybe_map);
    858		__dlm_insert_mle(dlm, mle);
    859
    860		/* still holding the dlm spinlock, check the recovery map
    861		 * to see if there are any nodes that still need to be
    862		 * considered.  these will not appear in the mle nodemap
    863		 * but they might own this lockres.  wait on them. */
    864		bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES);
    865		if (bit < O2NM_MAX_NODES) {
    866			mlog(0, "%s: res %.*s, At least one node (%d) "
    867			     "to recover before lock mastery can begin\n",
    868			     dlm->name, namelen, (char *)lockid, bit);
    869			wait_on_recovery = 1;
    870		}
    871	}
    872
    873	/* at this point there is either a DLM_MLE_BLOCK or a
    874	 * DLM_MLE_MASTER on the master list, so it's safe to add the
    875	 * lockres to the hashtable.  anyone who finds the lock will
    876	 * still have to wait on the IN_PROGRESS. */
    877
    878	/* finally add the lockres to its hash bucket */
    879	__dlm_insert_lockres(dlm, res);
    880
    881	/* since this lockres is new it doesn't not require the spinlock */
    882	__dlm_lockres_grab_inflight_ref(dlm, res);
    883
    884	/* get an extra ref on the mle in case this is a BLOCK
    885	 * if so, the creator of the BLOCK may try to put the last
    886	 * ref at this time in the assert master handler, so we
    887	 * need an extra one to keep from a bad ptr deref. */
    888	dlm_get_mle_inuse(mle);
    889	spin_unlock(&dlm->master_lock);
    890	spin_unlock(&dlm->spinlock);
    891
    892redo_request:
    893	while (wait_on_recovery) {
    894		/* any cluster changes that occurred after dropping the
    895		 * dlm spinlock would be detectable be a change on the mle,
    896		 * so we only need to clear out the recovery map once. */
    897		if (dlm_is_recovery_lock(lockid, namelen)) {
    898			mlog(0, "%s: Recovery map is not empty, but must "
    899			     "master $RECOVERY lock now\n", dlm->name);
    900			if (!dlm_pre_master_reco_lockres(dlm, res))
    901				wait_on_recovery = 0;
    902			else {
    903				mlog(0, "%s: waiting 500ms for heartbeat state "
    904				    "change\n", dlm->name);
    905				msleep(500);
    906			}
    907			continue;
    908		}
    909
    910		dlm_kick_recovery_thread(dlm);
    911		msleep(1000);
    912		dlm_wait_for_recovery(dlm);
    913
    914		spin_lock(&dlm->spinlock);
    915		bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES);
    916		if (bit < O2NM_MAX_NODES) {
    917			mlog(0, "%s: res %.*s, At least one node (%d) "
    918			     "to recover before lock mastery can begin\n",
    919			     dlm->name, namelen, (char *)lockid, bit);
    920			wait_on_recovery = 1;
    921		} else
    922			wait_on_recovery = 0;
    923		spin_unlock(&dlm->spinlock);
    924
    925		if (wait_on_recovery)
    926			dlm_wait_for_node_recovery(dlm, bit, 10000);
    927	}
    928
    929	/* must wait for lock to be mastered elsewhere */
    930	if (blocked)
    931		goto wait;
    932
    933	ret = -EINVAL;
    934	dlm_node_iter_init(mle->vote_map, &iter);
    935	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
    936		ret = dlm_do_master_request(res, mle, nodenum);
    937		if (ret < 0)
    938			mlog_errno(ret);
    939		if (mle->master != O2NM_MAX_NODES) {
    940			/* found a master ! */
    941			if (mle->master <= nodenum)
    942				break;
    943			/* if our master request has not reached the master
    944			 * yet, keep going until it does.  this is how the
    945			 * master will know that asserts are needed back to
    946			 * the lower nodes. */
    947			mlog(0, "%s: res %.*s, Requests only up to %u but "
    948			     "master is %u, keep going\n", dlm->name, namelen,
    949			     lockid, nodenum, mle->master);
    950		}
    951	}
    952
    953wait:
    954	/* keep going until the response map includes all nodes */
    955	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
    956	if (ret < 0) {
    957		wait_on_recovery = 1;
    958		mlog(0, "%s: res %.*s, Node map changed, redo the master "
    959		     "request now, blocked=%d\n", dlm->name, res->lockname.len,
    960		     res->lockname.name, blocked);
    961		if (++tries > 20) {
    962			mlog(ML_ERROR, "%s: res %.*s, Spinning on "
    963			     "dlm_wait_for_lock_mastery, blocked = %d\n",
    964			     dlm->name, res->lockname.len,
    965			     res->lockname.name, blocked);
    966			dlm_print_one_lock_resource(res);
    967			dlm_print_one_mle(mle);
    968			tries = 0;
    969		}
    970		goto redo_request;
    971	}
    972
    973	mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
    974	     res->lockname.name, res->owner);
    975	/* make sure we never continue without this */
    976	BUG_ON(res->owner == O2NM_MAX_NODES);
    977
    978	/* master is known, detach if not already detached */
    979	dlm_mle_detach_hb_events(dlm, mle);
    980	dlm_put_mle(mle);
    981	/* put the extra ref */
    982	dlm_put_mle_inuse(mle);
    983
    984wake_waiters:
    985	spin_lock(&res->spinlock);
    986	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
    987	spin_unlock(&res->spinlock);
    988	wake_up(&res->wq);
    989
    990leave:
    991	/* need to free the unused mle */
    992	if (alloc_mle)
    993		kmem_cache_free(dlm_mle_cache, alloc_mle);
    994
    995	return res;
    996}
    997
    998
    999#define DLM_MASTERY_TIMEOUT_MS   5000
   1000
   1001static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
   1002				     struct dlm_lock_resource *res,
   1003				     struct dlm_master_list_entry *mle,
   1004				     int *blocked)
   1005{
   1006	u8 m;
   1007	int ret, bit;
   1008	int map_changed, voting_done;
   1009	int assert, sleep;
   1010
   1011recheck:
   1012	ret = 0;
   1013	assert = 0;
   1014
   1015	/* check if another node has already become the owner */
   1016	spin_lock(&res->spinlock);
   1017	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
   1018		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
   1019		     res->lockname.len, res->lockname.name, res->owner);
   1020		spin_unlock(&res->spinlock);
   1021		/* this will cause the master to re-assert across
   1022		 * the whole cluster, freeing up mles */
   1023		if (res->owner != dlm->node_num) {
   1024			ret = dlm_do_master_request(res, mle, res->owner);
   1025			if (ret < 0) {
   1026				/* give recovery a chance to run */
   1027				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
   1028				msleep(500);
   1029				goto recheck;
   1030			}
   1031		}
   1032		ret = 0;
   1033		goto leave;
   1034	}
   1035	spin_unlock(&res->spinlock);
   1036
   1037	spin_lock(&mle->spinlock);
   1038	m = mle->master;
   1039	map_changed = (memcmp(mle->vote_map, mle->node_map,
   1040			      sizeof(mle->vote_map)) != 0);
   1041	voting_done = (memcmp(mle->vote_map, mle->response_map,
   1042			     sizeof(mle->vote_map)) == 0);
   1043
   1044	/* restart if we hit any errors */
   1045	if (map_changed) {
   1046		int b;
   1047		mlog(0, "%s: %.*s: node map changed, restarting\n",
   1048		     dlm->name, res->lockname.len, res->lockname.name);
   1049		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
   1050		b = (mle->type == DLM_MLE_BLOCK);
   1051		if ((*blocked && !b) || (!*blocked && b)) {
   1052			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
   1053			     dlm->name, res->lockname.len, res->lockname.name,
   1054			     *blocked, b);
   1055			*blocked = b;
   1056		}
   1057		spin_unlock(&mle->spinlock);
   1058		if (ret < 0) {
   1059			mlog_errno(ret);
   1060			goto leave;
   1061		}
   1062		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
   1063		     "rechecking now\n", dlm->name, res->lockname.len,
   1064		     res->lockname.name);
   1065		goto recheck;
   1066	} else {
   1067		if (!voting_done) {
   1068			mlog(0, "map not changed and voting not done "
   1069			     "for %s:%.*s\n", dlm->name, res->lockname.len,
   1070			     res->lockname.name);
   1071		}
   1072	}
   1073
   1074	if (m != O2NM_MAX_NODES) {
   1075		/* another node has done an assert!
   1076		 * all done! */
   1077		sleep = 0;
   1078	} else {
   1079		sleep = 1;
   1080		/* have all nodes responded? */
   1081		if (voting_done && !*blocked) {
   1082			bit = find_first_bit(mle->maybe_map, O2NM_MAX_NODES);
   1083			if (dlm->node_num <= bit) {
   1084				/* my node number is lowest.
   1085			 	 * now tell other nodes that I am
   1086				 * mastering this. */
   1087				mle->master = dlm->node_num;
   1088				/* ref was grabbed in get_lock_resource
   1089				 * will be dropped in dlmlock_master */
   1090				assert = 1;
   1091				sleep = 0;
   1092			}
   1093			/* if voting is done, but we have not received
   1094			 * an assert master yet, we must sleep */
   1095		}
   1096	}
   1097
   1098	spin_unlock(&mle->spinlock);
   1099
   1100	/* sleep if we haven't finished voting yet */
   1101	if (sleep) {
   1102		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
   1103		atomic_set(&mle->woken, 0);
   1104		(void)wait_event_timeout(mle->wq,
   1105					 (atomic_read(&mle->woken) == 1),
   1106					 timeo);
   1107		if (res->owner == O2NM_MAX_NODES) {
   1108			mlog(0, "%s:%.*s: waiting again\n", dlm->name,
   1109			     res->lockname.len, res->lockname.name);
   1110			goto recheck;
   1111		}
   1112		mlog(0, "done waiting, master is %u\n", res->owner);
   1113		ret = 0;
   1114		goto leave;
   1115	}
   1116
   1117	ret = 0;   /* done */
   1118	if (assert) {
   1119		m = dlm->node_num;
   1120		mlog(0, "about to master %.*s here, this=%u\n",
   1121		     res->lockname.len, res->lockname.name, m);
   1122		ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
   1123		if (ret) {
   1124			/* This is a failure in the network path,
   1125			 * not in the response to the assert_master
   1126			 * (any nonzero response is a BUG on this node).
   1127			 * Most likely a socket just got disconnected
   1128			 * due to node death. */
   1129			mlog_errno(ret);
   1130		}
   1131		/* no longer need to restart lock mastery.
   1132		 * all living nodes have been contacted. */
   1133		ret = 0;
   1134	}
   1135
   1136	/* set the lockres owner */
   1137	spin_lock(&res->spinlock);
   1138	/* mastery reference obtained either during
   1139	 * assert_master_handler or in get_lock_resource */
   1140	dlm_change_lockres_owner(dlm, res, m);
   1141	spin_unlock(&res->spinlock);
   1142
   1143leave:
   1144	return ret;
   1145}
   1146
   1147struct dlm_bitmap_diff_iter
   1148{
   1149	int curnode;
   1150	unsigned long *orig_bm;
   1151	unsigned long *cur_bm;
   1152	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
   1153};
   1154
   1155enum dlm_node_state_change
   1156{
   1157	NODE_DOWN = -1,
   1158	NODE_NO_CHANGE = 0,
   1159	NODE_UP
   1160};
   1161
   1162static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
   1163				      unsigned long *orig_bm,
   1164				      unsigned long *cur_bm)
   1165{
   1166	unsigned long p1, p2;
   1167	int i;
   1168
   1169	iter->curnode = -1;
   1170	iter->orig_bm = orig_bm;
   1171	iter->cur_bm = cur_bm;
   1172
   1173	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
   1174       		p1 = *(iter->orig_bm + i);
   1175	       	p2 = *(iter->cur_bm + i);
   1176		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
   1177	}
   1178}
   1179
   1180static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
   1181				     enum dlm_node_state_change *state)
   1182{
   1183	int bit;
   1184
   1185	if (iter->curnode >= O2NM_MAX_NODES)
   1186		return -ENOENT;
   1187
   1188	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
   1189			    iter->curnode+1);
   1190	if (bit >= O2NM_MAX_NODES) {
   1191		iter->curnode = O2NM_MAX_NODES;
   1192		return -ENOENT;
   1193	}
   1194
   1195	/* if it was there in the original then this node died */
   1196	if (test_bit(bit, iter->orig_bm))
   1197		*state = NODE_DOWN;
   1198	else
   1199		*state = NODE_UP;
   1200
   1201	iter->curnode = bit;
   1202	return bit;
   1203}
   1204
   1205
   1206static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
   1207				    struct dlm_lock_resource *res,
   1208				    struct dlm_master_list_entry *mle,
   1209				    int blocked)
   1210{
   1211	struct dlm_bitmap_diff_iter bdi;
   1212	enum dlm_node_state_change sc;
   1213	int node;
   1214	int ret = 0;
   1215
   1216	mlog(0, "something happened such that the "
   1217	     "master process may need to be restarted!\n");
   1218
   1219	assert_spin_locked(&mle->spinlock);
   1220
   1221	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
   1222	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
   1223	while (node >= 0) {
   1224		if (sc == NODE_UP) {
   1225			/* a node came up.  clear any old vote from
   1226			 * the response map and set it in the vote map
   1227			 * then restart the mastery. */
   1228			mlog(ML_NOTICE, "node %d up while restarting\n", node);
   1229
   1230			/* redo the master request, but only for the new node */
   1231			mlog(0, "sending request to new node\n");
   1232			clear_bit(node, mle->response_map);
   1233			set_bit(node, mle->vote_map);
   1234		} else {
   1235			mlog(ML_ERROR, "node down! %d\n", node);
   1236			if (blocked) {
   1237				int lowest = find_first_bit(mle->maybe_map,
   1238						       O2NM_MAX_NODES);
   1239
   1240				/* act like it was never there */
   1241				clear_bit(node, mle->maybe_map);
   1242
   1243			       	if (node == lowest) {
   1244					mlog(0, "expected master %u died"
   1245					    " while this node was blocked "
   1246					    "waiting on it!\n", node);
   1247					lowest = find_next_bit(mle->maybe_map,
   1248						       	O2NM_MAX_NODES,
   1249						       	lowest+1);
   1250					if (lowest < O2NM_MAX_NODES) {
   1251						mlog(0, "%s:%.*s:still "
   1252						     "blocked. waiting on %u "
   1253						     "now\n", dlm->name,
   1254						     res->lockname.len,
   1255						     res->lockname.name,
   1256						     lowest);
   1257					} else {
   1258						/* mle is an MLE_BLOCK, but
   1259						 * there is now nothing left to
   1260						 * block on.  we need to return
   1261						 * all the way back out and try
   1262						 * again with an MLE_MASTER.
   1263						 * dlm_do_local_recovery_cleanup
   1264						 * has already run, so the mle
   1265						 * refcount is ok */
   1266						mlog(0, "%s:%.*s: no "
   1267						     "longer blocking. try to "
   1268						     "master this here\n",
   1269						     dlm->name,
   1270						     res->lockname.len,
   1271						     res->lockname.name);
   1272						mle->type = DLM_MLE_MASTER;
   1273						mle->mleres = res;
   1274					}
   1275				}
   1276			}
   1277
   1278			/* now blank out everything, as if we had never
   1279			 * contacted anyone */
   1280			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
   1281			memset(mle->response_map, 0, sizeof(mle->response_map));
   1282			/* reset the vote_map to the current node_map */
   1283			memcpy(mle->vote_map, mle->node_map,
   1284			       sizeof(mle->node_map));
   1285			/* put myself into the maybe map */
   1286			if (mle->type != DLM_MLE_BLOCK)
   1287				set_bit(dlm->node_num, mle->maybe_map);
   1288		}
   1289		ret = -EAGAIN;
   1290		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
   1291	}
   1292	return ret;
   1293}
   1294
   1295
   1296/*
   1297 * DLM_MASTER_REQUEST_MSG
   1298 *
   1299 * returns: 0 on success,
   1300 *          -errno on a network error
   1301 *
   1302 * on error, the caller should assume the target node is "dead"
   1303 *
   1304 */
   1305
   1306static int dlm_do_master_request(struct dlm_lock_resource *res,
   1307				 struct dlm_master_list_entry *mle, int to)
   1308{
   1309	struct dlm_ctxt *dlm = mle->dlm;
   1310	struct dlm_master_request request;
   1311	int ret, response=0, resend;
   1312
   1313	memset(&request, 0, sizeof(request));
   1314	request.node_idx = dlm->node_num;
   1315
   1316	BUG_ON(mle->type == DLM_MLE_MIGRATION);
   1317
   1318	request.namelen = (u8)mle->mnamelen;
   1319	memcpy(request.name, mle->mname, request.namelen);
   1320
   1321again:
   1322	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
   1323				 sizeof(request), to, &response);
   1324	if (ret < 0)  {
   1325		if (ret == -ESRCH) {
   1326			/* should never happen */
   1327			mlog(ML_ERROR, "TCP stack not ready!\n");
   1328			BUG();
   1329		} else if (ret == -EINVAL) {
   1330			mlog(ML_ERROR, "bad args passed to o2net!\n");
   1331			BUG();
   1332		} else if (ret == -ENOMEM) {
   1333			mlog(ML_ERROR, "out of memory while trying to send "
   1334			     "network message!  retrying\n");
   1335			/* this is totally crude */
   1336			msleep(50);
   1337			goto again;
   1338		} else if (!dlm_is_host_down(ret)) {
   1339			/* not a network error. bad. */
   1340			mlog_errno(ret);
   1341			mlog(ML_ERROR, "unhandled error!");
   1342			BUG();
   1343		}
   1344		/* all other errors should be network errors,
   1345		 * and likely indicate node death */
   1346		mlog(ML_ERROR, "link to %d went down!\n", to);
   1347		goto out;
   1348	}
   1349
   1350	ret = 0;
   1351	resend = 0;
   1352	spin_lock(&mle->spinlock);
   1353	switch (response) {
   1354		case DLM_MASTER_RESP_YES:
   1355			set_bit(to, mle->response_map);
   1356			mlog(0, "node %u is the master, response=YES\n", to);
   1357			mlog(0, "%s:%.*s: master node %u now knows I have a "
   1358			     "reference\n", dlm->name, res->lockname.len,
   1359			     res->lockname.name, to);
   1360			mle->master = to;
   1361			break;
   1362		case DLM_MASTER_RESP_NO:
   1363			mlog(0, "node %u not master, response=NO\n", to);
   1364			set_bit(to, mle->response_map);
   1365			break;
   1366		case DLM_MASTER_RESP_MAYBE:
   1367			mlog(0, "node %u not master, response=MAYBE\n", to);
   1368			set_bit(to, mle->response_map);
   1369			set_bit(to, mle->maybe_map);
   1370			break;
   1371		case DLM_MASTER_RESP_ERROR:
   1372			mlog(0, "node %u hit an error, resending\n", to);
   1373			resend = 1;
   1374			response = 0;
   1375			break;
   1376		default:
   1377			mlog(ML_ERROR, "bad response! %u\n", response);
   1378			BUG();
   1379	}
   1380	spin_unlock(&mle->spinlock);
   1381	if (resend) {
   1382		/* this is also totally crude */
   1383		msleep(50);
   1384		goto again;
   1385	}
   1386
   1387out:
   1388	return ret;
   1389}
   1390
   1391/*
   1392 * locks that can be taken here:
   1393 * dlm->spinlock
   1394 * res->spinlock
   1395 * mle->spinlock
   1396 * dlm->master_list
   1397 *
   1398 * if possible, TRIM THIS DOWN!!!
   1399 */
   1400int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
   1401			       void **ret_data)
   1402{
   1403	u8 response = DLM_MASTER_RESP_MAYBE;
   1404	struct dlm_ctxt *dlm = data;
   1405	struct dlm_lock_resource *res = NULL;
   1406	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
   1407	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
   1408	char *name;
   1409	unsigned int namelen, hash;
   1410	int found, ret;
   1411	int set_maybe;
   1412	int dispatch_assert = 0;
   1413	int dispatched = 0;
   1414
   1415	if (!dlm_grab(dlm))
   1416		return DLM_MASTER_RESP_NO;
   1417
   1418	if (!dlm_domain_fully_joined(dlm)) {
   1419		response = DLM_MASTER_RESP_NO;
   1420		goto send_response;
   1421	}
   1422
   1423	name = request->name;
   1424	namelen = request->namelen;
   1425	hash = dlm_lockid_hash(name, namelen);
   1426
   1427	if (namelen > DLM_LOCKID_NAME_MAX) {
   1428		response = DLM_IVBUFLEN;
   1429		goto send_response;
   1430	}
   1431
   1432way_up_top:
   1433	spin_lock(&dlm->spinlock);
   1434	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
   1435	if (res) {
   1436		spin_unlock(&dlm->spinlock);
   1437
   1438		/* take care of the easy cases up front */
   1439		spin_lock(&res->spinlock);
   1440
   1441		/*
   1442		 * Right after dlm spinlock was released, dlm_thread could have
   1443		 * purged the lockres. Check if lockres got unhashed. If so
   1444		 * start over.
   1445		 */
   1446		if (hlist_unhashed(&res->hash_node)) {
   1447			spin_unlock(&res->spinlock);
   1448			dlm_lockres_put(res);
   1449			goto way_up_top;
   1450		}
   1451
   1452		if (res->state & (DLM_LOCK_RES_RECOVERING|
   1453				  DLM_LOCK_RES_MIGRATING)) {
   1454			spin_unlock(&res->spinlock);
   1455			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
   1456			     "being recovered/migrated\n");
   1457			response = DLM_MASTER_RESP_ERROR;
   1458			if (mle)
   1459				kmem_cache_free(dlm_mle_cache, mle);
   1460			goto send_response;
   1461		}
   1462
   1463		if (res->owner == dlm->node_num) {
   1464			dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
   1465			spin_unlock(&res->spinlock);
   1466			response = DLM_MASTER_RESP_YES;
   1467			if (mle)
   1468				kmem_cache_free(dlm_mle_cache, mle);
   1469
   1470			/* this node is the owner.
   1471			 * there is some extra work that needs to
   1472			 * happen now.  the requesting node has
   1473			 * caused all nodes up to this one to
   1474			 * create mles.  this node now needs to
   1475			 * go back and clean those up. */
   1476			dispatch_assert = 1;
   1477			goto send_response;
   1478		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
   1479			spin_unlock(&res->spinlock);
   1480			// mlog(0, "node %u is the master\n", res->owner);
   1481			response = DLM_MASTER_RESP_NO;
   1482			if (mle)
   1483				kmem_cache_free(dlm_mle_cache, mle);
   1484			goto send_response;
   1485		}
   1486
   1487		/* ok, there is no owner.  either this node is
   1488		 * being blocked, or it is actively trying to
   1489		 * master this lock. */
   1490		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
   1491			mlog(ML_ERROR, "lock with no owner should be "
   1492			     "in-progress!\n");
   1493			BUG();
   1494		}
   1495
   1496		// mlog(0, "lockres is in progress...\n");
   1497		spin_lock(&dlm->master_lock);
   1498		found = dlm_find_mle(dlm, &tmpmle, name, namelen);
   1499		if (!found) {
   1500			mlog(ML_ERROR, "no mle found for this lock!\n");
   1501			BUG();
   1502		}
   1503		set_maybe = 1;
   1504		spin_lock(&tmpmle->spinlock);
   1505		if (tmpmle->type == DLM_MLE_BLOCK) {
   1506			// mlog(0, "this node is waiting for "
   1507			// "lockres to be mastered\n");
   1508			response = DLM_MASTER_RESP_NO;
   1509		} else if (tmpmle->type == DLM_MLE_MIGRATION) {
   1510			mlog(0, "node %u is master, but trying to migrate to "
   1511			     "node %u.\n", tmpmle->master, tmpmle->new_master);
   1512			if (tmpmle->master == dlm->node_num) {
   1513				mlog(ML_ERROR, "no owner on lockres, but this "
   1514				     "node is trying to migrate it to %u?!\n",
   1515				     tmpmle->new_master);
   1516				BUG();
   1517			} else {
   1518				/* the real master can respond on its own */
   1519				response = DLM_MASTER_RESP_NO;
   1520			}
   1521		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
   1522			set_maybe = 0;
   1523			if (tmpmle->master == dlm->node_num) {
   1524				response = DLM_MASTER_RESP_YES;
   1525				/* this node will be the owner.
   1526				 * go back and clean the mles on any
   1527				 * other nodes */
   1528				dispatch_assert = 1;
   1529				dlm_lockres_set_refmap_bit(dlm, res,
   1530							   request->node_idx);
   1531			} else
   1532				response = DLM_MASTER_RESP_NO;
   1533		} else {
   1534			// mlog(0, "this node is attempting to "
   1535			// "master lockres\n");
   1536			response = DLM_MASTER_RESP_MAYBE;
   1537		}
   1538		if (set_maybe)
   1539			set_bit(request->node_idx, tmpmle->maybe_map);
   1540		spin_unlock(&tmpmle->spinlock);
   1541
   1542		spin_unlock(&dlm->master_lock);
   1543		spin_unlock(&res->spinlock);
   1544
   1545		/* keep the mle attached to heartbeat events */
   1546		dlm_put_mle(tmpmle);
   1547		if (mle)
   1548			kmem_cache_free(dlm_mle_cache, mle);
   1549		goto send_response;
   1550	}
   1551
   1552	/*
   1553	 * lockres doesn't exist on this node
   1554	 * if there is an MLE_BLOCK, return NO
   1555	 * if there is an MLE_MASTER, return MAYBE
   1556	 * otherwise, add an MLE_BLOCK, return NO
   1557	 */
   1558	spin_lock(&dlm->master_lock);
   1559	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
   1560	if (!found) {
   1561		/* this lockid has never been seen on this node yet */
   1562		// mlog(0, "no mle found\n");
   1563		if (!mle) {
   1564			spin_unlock(&dlm->master_lock);
   1565			spin_unlock(&dlm->spinlock);
   1566
   1567			mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
   1568			if (!mle) {
   1569				response = DLM_MASTER_RESP_ERROR;
   1570				mlog_errno(-ENOMEM);
   1571				goto send_response;
   1572			}
   1573			goto way_up_top;
   1574		}
   1575
   1576		// mlog(0, "this is second time thru, already allocated, "
   1577		// "add the block.\n");
   1578		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
   1579		set_bit(request->node_idx, mle->maybe_map);
   1580		__dlm_insert_mle(dlm, mle);
   1581		response = DLM_MASTER_RESP_NO;
   1582	} else {
   1583		spin_lock(&tmpmle->spinlock);
   1584		if (tmpmle->master == dlm->node_num) {
   1585			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
   1586			BUG();
   1587		}
   1588		if (tmpmle->type == DLM_MLE_BLOCK)
   1589			response = DLM_MASTER_RESP_NO;
   1590		else if (tmpmle->type == DLM_MLE_MIGRATION) {
   1591			mlog(0, "migration mle was found (%u->%u)\n",
   1592			     tmpmle->master, tmpmle->new_master);
   1593			/* real master can respond on its own */
   1594			response = DLM_MASTER_RESP_NO;
   1595		} else
   1596			response = DLM_MASTER_RESP_MAYBE;
   1597		set_bit(request->node_idx, tmpmle->maybe_map);
   1598		spin_unlock(&tmpmle->spinlock);
   1599	}
   1600	spin_unlock(&dlm->master_lock);
   1601	spin_unlock(&dlm->spinlock);
   1602
   1603	if (found) {
   1604		/* keep the mle attached to heartbeat events */
   1605		dlm_put_mle(tmpmle);
   1606	}
   1607send_response:
   1608	/*
   1609	 * __dlm_lookup_lockres() grabbed a reference to this lockres.
   1610	 * The reference is released by dlm_assert_master_worker() under
   1611	 * the call to dlm_dispatch_assert_master().  If
   1612	 * dlm_assert_master_worker() isn't called, we drop it here.
   1613	 */
   1614	if (dispatch_assert) {
   1615		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
   1616			     dlm->node_num, res->lockname.len, res->lockname.name);
   1617		spin_lock(&res->spinlock);
   1618		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
   1619						 DLM_ASSERT_MASTER_MLE_CLEANUP);
   1620		if (ret < 0) {
   1621			mlog(ML_ERROR, "failed to dispatch assert master work\n");
   1622			response = DLM_MASTER_RESP_ERROR;
   1623			spin_unlock(&res->spinlock);
   1624			dlm_lockres_put(res);
   1625		} else {
   1626			dispatched = 1;
   1627			__dlm_lockres_grab_inflight_worker(dlm, res);
   1628			spin_unlock(&res->spinlock);
   1629		}
   1630	} else {
   1631		if (res)
   1632			dlm_lockres_put(res);
   1633	}
   1634
   1635	if (!dispatched)
   1636		dlm_put(dlm);
   1637	return response;
   1638}
   1639
   1640/*
   1641 * DLM_ASSERT_MASTER_MSG
   1642 */
   1643
   1644
   1645/*
   1646 * NOTE: this can be used for debugging
   1647 * can periodically run all locks owned by this node
   1648 * and re-assert across the cluster...
   1649 */
   1650static int dlm_do_assert_master(struct dlm_ctxt *dlm,
   1651				struct dlm_lock_resource *res,
   1652				void *nodemap, u32 flags)
   1653{
   1654	struct dlm_assert_master assert;
   1655	int to, tmpret;
   1656	struct dlm_node_iter iter;
   1657	int ret = 0;
   1658	int reassert;
   1659	const char *lockname = res->lockname.name;
   1660	unsigned int namelen = res->lockname.len;
   1661
   1662	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
   1663
   1664	spin_lock(&res->spinlock);
   1665	res->state |= DLM_LOCK_RES_SETREF_INPROG;
   1666	spin_unlock(&res->spinlock);
   1667
   1668again:
   1669	reassert = 0;
   1670
   1671	/* note that if this nodemap is empty, it returns 0 */
   1672	dlm_node_iter_init(nodemap, &iter);
   1673	while ((to = dlm_node_iter_next(&iter)) >= 0) {
   1674		int r = 0;
   1675		struct dlm_master_list_entry *mle = NULL;
   1676
   1677		mlog(0, "sending assert master to %d (%.*s)\n", to,
   1678		     namelen, lockname);
   1679		memset(&assert, 0, sizeof(assert));
   1680		assert.node_idx = dlm->node_num;
   1681		assert.namelen = namelen;
   1682		memcpy(assert.name, lockname, namelen);
   1683		assert.flags = cpu_to_be32(flags);
   1684
   1685		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
   1686					    &assert, sizeof(assert), to, &r);
   1687		if (tmpret < 0) {
   1688			mlog(ML_ERROR, "Error %d when sending message %u (key "
   1689			     "0x%x) to node %u\n", tmpret,
   1690			     DLM_ASSERT_MASTER_MSG, dlm->key, to);
   1691			if (!dlm_is_host_down(tmpret)) {
   1692				mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
   1693				BUG();
   1694			}
   1695			/* a node died.  finish out the rest of the nodes. */
   1696			mlog(0, "link to %d went down!\n", to);
   1697			/* any nonzero status return will do */
   1698			ret = tmpret;
   1699			r = 0;
   1700		} else if (r < 0) {
   1701			/* ok, something horribly messed.  kill thyself. */
   1702			mlog(ML_ERROR,"during assert master of %.*s to %u, "
   1703			     "got %d.\n", namelen, lockname, to, r);
   1704			spin_lock(&dlm->spinlock);
   1705			spin_lock(&dlm->master_lock);
   1706			if (dlm_find_mle(dlm, &mle, (char *)lockname,
   1707					 namelen)) {
   1708				dlm_print_one_mle(mle);
   1709				__dlm_put_mle(mle);
   1710			}
   1711			spin_unlock(&dlm->master_lock);
   1712			spin_unlock(&dlm->spinlock);
   1713			BUG();
   1714		}
   1715
   1716		if (r & DLM_ASSERT_RESPONSE_REASSERT &&
   1717		    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
   1718				mlog(ML_ERROR, "%.*s: very strange, "
   1719				     "master MLE but no lockres on %u\n",
   1720				     namelen, lockname, to);
   1721		}
   1722
   1723		if (r & DLM_ASSERT_RESPONSE_REASSERT) {
   1724			mlog(0, "%.*s: node %u create mles on other "
   1725			     "nodes and requests a re-assert\n",
   1726			     namelen, lockname, to);
   1727			reassert = 1;
   1728		}
   1729		if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
   1730			mlog(0, "%.*s: node %u has a reference to this "
   1731			     "lockres, set the bit in the refmap\n",
   1732			     namelen, lockname, to);
   1733			spin_lock(&res->spinlock);
   1734			dlm_lockres_set_refmap_bit(dlm, res, to);
   1735			spin_unlock(&res->spinlock);
   1736		}
   1737	}
   1738
   1739	if (reassert)
   1740		goto again;
   1741
   1742	spin_lock(&res->spinlock);
   1743	res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
   1744	spin_unlock(&res->spinlock);
   1745	wake_up(&res->wq);
   1746
   1747	return ret;
   1748}
   1749
   1750/*
   1751 * locks that can be taken here:
   1752 * dlm->spinlock
   1753 * res->spinlock
   1754 * mle->spinlock
   1755 * dlm->master_list
   1756 *
   1757 * if possible, TRIM THIS DOWN!!!
   1758 */
   1759int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
   1760			      void **ret_data)
   1761{
   1762	struct dlm_ctxt *dlm = data;
   1763	struct dlm_master_list_entry *mle = NULL;
   1764	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
   1765	struct dlm_lock_resource *res = NULL;
   1766	char *name;
   1767	unsigned int namelen, hash;
   1768	u32 flags;
   1769	int master_request = 0, have_lockres_ref = 0;
   1770	int ret = 0;
   1771
   1772	if (!dlm_grab(dlm))
   1773		return 0;
   1774
   1775	name = assert->name;
   1776	namelen = assert->namelen;
   1777	hash = dlm_lockid_hash(name, namelen);
   1778	flags = be32_to_cpu(assert->flags);
   1779
   1780	if (namelen > DLM_LOCKID_NAME_MAX) {
   1781		mlog(ML_ERROR, "Invalid name length!");
   1782		goto done;
   1783	}
   1784
   1785	spin_lock(&dlm->spinlock);
   1786
   1787	if (flags)
   1788		mlog(0, "assert_master with flags: %u\n", flags);
   1789
   1790	/* find the MLE */
   1791	spin_lock(&dlm->master_lock);
   1792	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
   1793		/* not an error, could be master just re-asserting */
   1794		mlog(0, "just got an assert_master from %u, but no "
   1795		     "MLE for it! (%.*s)\n", assert->node_idx,
   1796		     namelen, name);
   1797	} else {
   1798		int bit = find_first_bit(mle->maybe_map, O2NM_MAX_NODES);
   1799		if (bit >= O2NM_MAX_NODES) {
   1800			/* not necessarily an error, though less likely.
   1801			 * could be master just re-asserting. */
   1802			mlog(0, "no bits set in the maybe_map, but %u "
   1803			     "is asserting! (%.*s)\n", assert->node_idx,
   1804			     namelen, name);
   1805		} else if (bit != assert->node_idx) {
   1806			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
   1807				mlog(0, "master %u was found, %u should "
   1808				     "back off\n", assert->node_idx, bit);
   1809			} else {
   1810				/* with the fix for bug 569, a higher node
   1811				 * number winning the mastery will respond
   1812				 * YES to mastery requests, but this node
   1813				 * had no way of knowing.  let it pass. */
   1814				mlog(0, "%u is the lowest node, "
   1815				     "%u is asserting. (%.*s)  %u must "
   1816				     "have begun after %u won.\n", bit,
   1817				     assert->node_idx, namelen, name, bit,
   1818				     assert->node_idx);
   1819			}
   1820		}
   1821		if (mle->type == DLM_MLE_MIGRATION) {
   1822			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
   1823				mlog(0, "%s:%.*s: got cleanup assert"
   1824				     " from %u for migration\n",
   1825				     dlm->name, namelen, name,
   1826				     assert->node_idx);
   1827			} else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
   1828				mlog(0, "%s:%.*s: got unrelated assert"
   1829				     " from %u for migration, ignoring\n",
   1830				     dlm->name, namelen, name,
   1831				     assert->node_idx);
   1832				__dlm_put_mle(mle);
   1833				spin_unlock(&dlm->master_lock);
   1834				spin_unlock(&dlm->spinlock);
   1835				goto done;
   1836			}
   1837		}
   1838	}
   1839	spin_unlock(&dlm->master_lock);
   1840
   1841	/* ok everything checks out with the MLE
   1842	 * now check to see if there is a lockres */
   1843	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
   1844	if (res) {
   1845		spin_lock(&res->spinlock);
   1846		if (res->state & DLM_LOCK_RES_RECOVERING)  {
   1847			mlog(ML_ERROR, "%u asserting but %.*s is "
   1848			     "RECOVERING!\n", assert->node_idx, namelen, name);
   1849			goto kill;
   1850		}
   1851		if (!mle) {
   1852			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
   1853			    res->owner != assert->node_idx) {
   1854				mlog(ML_ERROR, "DIE! Mastery assert from %u, "
   1855				     "but current owner is %u! (%.*s)\n",
   1856				     assert->node_idx, res->owner, namelen,
   1857				     name);
   1858				__dlm_print_one_lock_resource(res);
   1859				BUG();
   1860			}
   1861		} else if (mle->type != DLM_MLE_MIGRATION) {
   1862			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
   1863				/* owner is just re-asserting */
   1864				if (res->owner == assert->node_idx) {
   1865					mlog(0, "owner %u re-asserting on "
   1866					     "lock %.*s\n", assert->node_idx,
   1867					     namelen, name);
   1868					goto ok;
   1869				}
   1870				mlog(ML_ERROR, "got assert_master from "
   1871				     "node %u, but %u is the owner! "
   1872				     "(%.*s)\n", assert->node_idx,
   1873				     res->owner, namelen, name);
   1874				goto kill;
   1875			}
   1876			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
   1877				mlog(ML_ERROR, "got assert from %u, but lock "
   1878				     "with no owner should be "
   1879				     "in-progress! (%.*s)\n",
   1880				     assert->node_idx,
   1881				     namelen, name);
   1882				goto kill;
   1883			}
   1884		} else /* mle->type == DLM_MLE_MIGRATION */ {
   1885			/* should only be getting an assert from new master */
   1886			if (assert->node_idx != mle->new_master) {
   1887				mlog(ML_ERROR, "got assert from %u, but "
   1888				     "new master is %u, and old master "
   1889				     "was %u (%.*s)\n",
   1890				     assert->node_idx, mle->new_master,
   1891				     mle->master, namelen, name);
   1892				goto kill;
   1893			}
   1894
   1895		}
   1896ok:
   1897		spin_unlock(&res->spinlock);
   1898	}
   1899
   1900	// mlog(0, "woo!  got an assert_master from node %u!\n",
   1901	// 	     assert->node_idx);
   1902	if (mle) {
   1903		int extra_ref = 0;
   1904		int nn = -1;
   1905		int rr, err = 0;
   1906
   1907		spin_lock(&mle->spinlock);
   1908		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
   1909			extra_ref = 1;
   1910		else {
   1911			/* MASTER mle: if any bits set in the response map
   1912			 * then the calling node needs to re-assert to clear
   1913			 * up nodes that this node contacted */
   1914			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
   1915						    nn+1)) < O2NM_MAX_NODES) {
   1916				if (nn != dlm->node_num && nn != assert->node_idx) {
   1917					master_request = 1;
   1918					break;
   1919				}
   1920			}
   1921		}
   1922		mle->master = assert->node_idx;
   1923		atomic_set(&mle->woken, 1);
   1924		wake_up(&mle->wq);
   1925		spin_unlock(&mle->spinlock);
   1926
   1927		if (res) {
   1928			int wake = 0;
   1929			spin_lock(&res->spinlock);
   1930			if (mle->type == DLM_MLE_MIGRATION) {
   1931				mlog(0, "finishing off migration of lockres %.*s, "
   1932			     		"from %u to %u\n",
   1933			       		res->lockname.len, res->lockname.name,
   1934			       		dlm->node_num, mle->new_master);
   1935				res->state &= ~DLM_LOCK_RES_MIGRATING;
   1936				wake = 1;
   1937				dlm_change_lockres_owner(dlm, res, mle->new_master);
   1938				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
   1939			} else {
   1940				dlm_change_lockres_owner(dlm, res, mle->master);
   1941			}
   1942			spin_unlock(&res->spinlock);
   1943			have_lockres_ref = 1;
   1944			if (wake)
   1945				wake_up(&res->wq);
   1946		}
   1947
   1948		/* master is known, detach if not already detached.
   1949		 * ensures that only one assert_master call will happen
   1950		 * on this mle. */
   1951		spin_lock(&dlm->master_lock);
   1952
   1953		rr = kref_read(&mle->mle_refs);
   1954		if (mle->inuse > 0) {
   1955			if (extra_ref && rr < 3)
   1956				err = 1;
   1957			else if (!extra_ref && rr < 2)
   1958				err = 1;
   1959		} else {
   1960			if (extra_ref && rr < 2)
   1961				err = 1;
   1962			else if (!extra_ref && rr < 1)
   1963				err = 1;
   1964		}
   1965		if (err) {
   1966			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
   1967			     "that will mess up this node, refs=%d, extra=%d, "
   1968			     "inuse=%d\n", dlm->name, namelen, name,
   1969			     assert->node_idx, rr, extra_ref, mle->inuse);
   1970			dlm_print_one_mle(mle);
   1971		}
   1972		__dlm_unlink_mle(dlm, mle);
   1973		__dlm_mle_detach_hb_events(dlm, mle);
   1974		__dlm_put_mle(mle);
   1975		if (extra_ref) {
   1976			/* the assert master message now balances the extra
   1977		 	 * ref given by the master / migration request message.
   1978		 	 * if this is the last put, it will be removed
   1979		 	 * from the list. */
   1980			__dlm_put_mle(mle);
   1981		}
   1982		spin_unlock(&dlm->master_lock);
   1983	} else if (res) {
   1984		if (res->owner != assert->node_idx) {
   1985			mlog(0, "assert_master from %u, but current "
   1986			     "owner is %u (%.*s), no mle\n", assert->node_idx,
   1987			     res->owner, namelen, name);
   1988		}
   1989	}
   1990	spin_unlock(&dlm->spinlock);
   1991
   1992done:
   1993	ret = 0;
   1994	if (res) {
   1995		spin_lock(&res->spinlock);
   1996		res->state |= DLM_LOCK_RES_SETREF_INPROG;
   1997		spin_unlock(&res->spinlock);
   1998		*ret_data = (void *)res;
   1999	}
   2000	dlm_put(dlm);
   2001	if (master_request) {
   2002		mlog(0, "need to tell master to reassert\n");
   2003		/* positive. negative would shoot down the node. */
   2004		ret |= DLM_ASSERT_RESPONSE_REASSERT;
   2005		if (!have_lockres_ref) {
   2006			mlog(ML_ERROR, "strange, got assert from %u, MASTER "
   2007			     "mle present here for %s:%.*s, but no lockres!\n",
   2008			     assert->node_idx, dlm->name, namelen, name);
   2009		}
   2010	}
   2011	if (have_lockres_ref) {
   2012		/* let the master know we have a reference to the lockres */
   2013		ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
   2014		mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
   2015		     dlm->name, namelen, name, assert->node_idx);
   2016	}
   2017	return ret;
   2018
   2019kill:
   2020	/* kill the caller! */
   2021	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
   2022	     "and killing the other node now!  This node is OK and can continue.\n");
   2023	__dlm_print_one_lock_resource(res);
   2024	spin_unlock(&res->spinlock);
   2025	spin_lock(&dlm->master_lock);
   2026	if (mle)
   2027		__dlm_put_mle(mle);
   2028	spin_unlock(&dlm->master_lock);
   2029	spin_unlock(&dlm->spinlock);
   2030	*ret_data = (void *)res;
   2031	dlm_put(dlm);
   2032	return -EINVAL;
   2033}
   2034
   2035void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
   2036{
   2037	struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
   2038
   2039	if (ret_data) {
   2040		spin_lock(&res->spinlock);
   2041		res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
   2042		spin_unlock(&res->spinlock);
   2043		wake_up(&res->wq);
   2044		dlm_lockres_put(res);
   2045	}
   2046	return;
   2047}
   2048
   2049int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
   2050			       struct dlm_lock_resource *res,
   2051			       int ignore_higher, u8 request_from, u32 flags)
   2052{
   2053	struct dlm_work_item *item;
   2054	item = kzalloc(sizeof(*item), GFP_ATOMIC);
   2055	if (!item)
   2056		return -ENOMEM;
   2057
   2058
   2059	/* queue up work for dlm_assert_master_worker */
   2060	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
   2061	item->u.am.lockres = res; /* already have a ref */
   2062	/* can optionally ignore node numbers higher than this node */
   2063	item->u.am.ignore_higher = ignore_higher;
   2064	item->u.am.request_from = request_from;
   2065	item->u.am.flags = flags;
   2066
   2067	if (ignore_higher)
   2068		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
   2069		     res->lockname.name);
   2070
   2071	spin_lock(&dlm->work_lock);
   2072	list_add_tail(&item->list, &dlm->work_list);
   2073	spin_unlock(&dlm->work_lock);
   2074
   2075	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
   2076	return 0;
   2077}
   2078
   2079static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
   2080{
   2081	struct dlm_ctxt *dlm = data;
   2082	int ret = 0;
   2083	struct dlm_lock_resource *res;
   2084	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
   2085	int ignore_higher;
   2086	int bit;
   2087	u8 request_from;
   2088	u32 flags;
   2089
   2090	dlm = item->dlm;
   2091	res = item->u.am.lockres;
   2092	ignore_higher = item->u.am.ignore_higher;
   2093	request_from = item->u.am.request_from;
   2094	flags = item->u.am.flags;
   2095
   2096	spin_lock(&dlm->spinlock);
   2097	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
   2098	spin_unlock(&dlm->spinlock);
   2099
   2100	clear_bit(dlm->node_num, nodemap);
   2101	if (ignore_higher) {
   2102		/* if is this just to clear up mles for nodes below
   2103		 * this node, do not send the message to the original
   2104		 * caller or any node number higher than this */
   2105		clear_bit(request_from, nodemap);
   2106		bit = dlm->node_num;
   2107		while (1) {
   2108			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
   2109					    bit+1);
   2110		       	if (bit >= O2NM_MAX_NODES)
   2111				break;
   2112			clear_bit(bit, nodemap);
   2113		}
   2114	}
   2115
   2116	/*
   2117	 * If we're migrating this lock to someone else, we are no
   2118	 * longer allowed to assert out own mastery.  OTOH, we need to
   2119	 * prevent migration from starting while we're still asserting
   2120	 * our dominance.  The reserved ast delays migration.
   2121	 */
   2122	spin_lock(&res->spinlock);
   2123	if (res->state & DLM_LOCK_RES_MIGRATING) {
   2124		mlog(0, "Someone asked us to assert mastery, but we're "
   2125		     "in the middle of migration.  Skipping assert, "
   2126		     "the new master will handle that.\n");
   2127		spin_unlock(&res->spinlock);
   2128		goto put;
   2129	} else
   2130		__dlm_lockres_reserve_ast(res);
   2131	spin_unlock(&res->spinlock);
   2132
   2133	/* this call now finishes out the nodemap
   2134	 * even if one or more nodes die */
   2135	mlog(0, "worker about to master %.*s here, this=%u\n",
   2136		     res->lockname.len, res->lockname.name, dlm->node_num);
   2137	ret = dlm_do_assert_master(dlm, res, nodemap, flags);
   2138	if (ret < 0) {
   2139		/* no need to restart, we are done */
   2140		if (!dlm_is_host_down(ret))
   2141			mlog_errno(ret);
   2142	}
   2143
   2144	/* Ok, we've asserted ourselves.  Let's let migration start. */
   2145	dlm_lockres_release_ast(dlm, res);
   2146
   2147put:
   2148	dlm_lockres_drop_inflight_worker(dlm, res);
   2149
   2150	dlm_lockres_put(res);
   2151
   2152	mlog(0, "finished with dlm_assert_master_worker\n");
   2153}
   2154
   2155/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
   2156 * We cannot wait for node recovery to complete to begin mastering this
   2157 * lockres because this lockres is used to kick off recovery! ;-)
   2158 * So, do a pre-check on all living nodes to see if any of those nodes
   2159 * think that $RECOVERY is currently mastered by a dead node.  If so,
   2160 * we wait a short time to allow that node to get notified by its own
   2161 * heartbeat stack, then check again.  All $RECOVERY lock resources
   2162 * mastered by dead nodes are purged when the heartbeat callback is
   2163 * fired, so we can know for sure that it is safe to continue once
   2164 * the node returns a live node or no node.  */
   2165static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
   2166				       struct dlm_lock_resource *res)
   2167{
   2168	struct dlm_node_iter iter;
   2169	int nodenum;
   2170	int ret = 0;
   2171	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
   2172
   2173	spin_lock(&dlm->spinlock);
   2174	dlm_node_iter_init(dlm->domain_map, &iter);
   2175	spin_unlock(&dlm->spinlock);
   2176
   2177	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
   2178		/* do not send to self */
   2179		if (nodenum == dlm->node_num)
   2180			continue;
   2181		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
   2182		if (ret < 0) {
   2183			mlog_errno(ret);
   2184			if (!dlm_is_host_down(ret))
   2185				BUG();
   2186			/* host is down, so answer for that node would be
   2187			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
   2188			ret = 0;
   2189		}
   2190
   2191		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
   2192			/* check to see if this master is in the recovery map */
   2193			spin_lock(&dlm->spinlock);
   2194			if (test_bit(master, dlm->recovery_map)) {
   2195				mlog(ML_NOTICE, "%s: node %u has not seen "
   2196				     "node %u go down yet, and thinks the "
   2197				     "dead node is mastering the recovery "
   2198				     "lock.  must wait.\n", dlm->name,
   2199				     nodenum, master);
   2200				ret = -EAGAIN;
   2201			}
   2202			spin_unlock(&dlm->spinlock);
   2203			mlog(0, "%s: reco lock master is %u\n", dlm->name,
   2204			     master);
   2205			break;
   2206		}
   2207	}
   2208	return ret;
   2209}
   2210
   2211/*
   2212 * DLM_DEREF_LOCKRES_MSG
   2213 */
   2214
   2215int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
   2216{
   2217	struct dlm_deref_lockres deref;
   2218	int ret = 0, r;
   2219	const char *lockname;
   2220	unsigned int namelen;
   2221
   2222	lockname = res->lockname.name;
   2223	namelen = res->lockname.len;
   2224	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
   2225
   2226	memset(&deref, 0, sizeof(deref));
   2227	deref.node_idx = dlm->node_num;
   2228	deref.namelen = namelen;
   2229	memcpy(deref.name, lockname, namelen);
   2230
   2231	ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
   2232				 &deref, sizeof(deref), res->owner, &r);
   2233	if (ret < 0)
   2234		mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
   2235		     dlm->name, namelen, lockname, ret, res->owner);
   2236	else if (r < 0) {
   2237		/* BAD.  other node says I did not have a ref. */
   2238		mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
   2239		     dlm->name, namelen, lockname, res->owner, r);
   2240		dlm_print_one_lock_resource(res);
   2241		if (r == -ENOMEM)
   2242			BUG();
   2243	} else
   2244		ret = r;
   2245
   2246	return ret;
   2247}
   2248
   2249int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
   2250			      void **ret_data)
   2251{
   2252	struct dlm_ctxt *dlm = data;
   2253	struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
   2254	struct dlm_lock_resource *res = NULL;
   2255	char *name;
   2256	unsigned int namelen;
   2257	int ret = -EINVAL;
   2258	u8 node;
   2259	unsigned int hash;
   2260	struct dlm_work_item *item;
   2261	int cleared = 0;
   2262	int dispatch = 0;
   2263
   2264	if (!dlm_grab(dlm))
   2265		return 0;
   2266
   2267	name = deref->name;
   2268	namelen = deref->namelen;
   2269	node = deref->node_idx;
   2270
   2271	if (namelen > DLM_LOCKID_NAME_MAX) {
   2272		mlog(ML_ERROR, "Invalid name length!");
   2273		goto done;
   2274	}
   2275	if (deref->node_idx >= O2NM_MAX_NODES) {
   2276		mlog(ML_ERROR, "Invalid node number: %u\n", node);
   2277		goto done;
   2278	}
   2279
   2280	hash = dlm_lockid_hash(name, namelen);
   2281
   2282	spin_lock(&dlm->spinlock);
   2283	res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
   2284	if (!res) {
   2285		spin_unlock(&dlm->spinlock);
   2286		mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
   2287		     dlm->name, namelen, name);
   2288		goto done;
   2289	}
   2290	spin_unlock(&dlm->spinlock);
   2291
   2292	spin_lock(&res->spinlock);
   2293	if (res->state & DLM_LOCK_RES_SETREF_INPROG)
   2294		dispatch = 1;
   2295	else {
   2296		BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
   2297		if (test_bit(node, res->refmap)) {
   2298			dlm_lockres_clear_refmap_bit(dlm, res, node);
   2299			cleared = 1;
   2300		}
   2301	}
   2302	spin_unlock(&res->spinlock);
   2303
   2304	if (!dispatch) {
   2305		if (cleared)
   2306			dlm_lockres_calc_usage(dlm, res);
   2307		else {
   2308			mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
   2309		     	"but it is already dropped!\n", dlm->name,
   2310		     	res->lockname.len, res->lockname.name, node);
   2311			dlm_print_one_lock_resource(res);
   2312		}
   2313		ret = DLM_DEREF_RESPONSE_DONE;
   2314		goto done;
   2315	}
   2316
   2317	item = kzalloc(sizeof(*item), GFP_NOFS);
   2318	if (!item) {
   2319		ret = -ENOMEM;
   2320		mlog_errno(ret);
   2321		goto done;
   2322	}
   2323
   2324	dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
   2325	item->u.dl.deref_res = res;
   2326	item->u.dl.deref_node = node;
   2327
   2328	spin_lock(&dlm->work_lock);
   2329	list_add_tail(&item->list, &dlm->work_list);
   2330	spin_unlock(&dlm->work_lock);
   2331
   2332	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
   2333	return DLM_DEREF_RESPONSE_INPROG;
   2334
   2335done:
   2336	if (res)
   2337		dlm_lockres_put(res);
   2338	dlm_put(dlm);
   2339
   2340	return ret;
   2341}
   2342
   2343int dlm_deref_lockres_done_handler(struct o2net_msg *msg, u32 len, void *data,
   2344			      void **ret_data)
   2345{
   2346	struct dlm_ctxt *dlm = data;
   2347	struct dlm_deref_lockres_done *deref
   2348			= (struct dlm_deref_lockres_done *)msg->buf;
   2349	struct dlm_lock_resource *res = NULL;
   2350	char *name;
   2351	unsigned int namelen;
   2352	int ret = -EINVAL;
   2353	u8 node;
   2354	unsigned int hash;
   2355
   2356	if (!dlm_grab(dlm))
   2357		return 0;
   2358
   2359	name = deref->name;
   2360	namelen = deref->namelen;
   2361	node = deref->node_idx;
   2362
   2363	if (namelen > DLM_LOCKID_NAME_MAX) {
   2364		mlog(ML_ERROR, "Invalid name length!");
   2365		goto done;
   2366	}
   2367	if (deref->node_idx >= O2NM_MAX_NODES) {
   2368		mlog(ML_ERROR, "Invalid node number: %u\n", node);
   2369		goto done;
   2370	}
   2371
   2372	hash = dlm_lockid_hash(name, namelen);
   2373
   2374	spin_lock(&dlm->spinlock);
   2375	res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
   2376	if (!res) {
   2377		spin_unlock(&dlm->spinlock);
   2378		mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
   2379		     dlm->name, namelen, name);
   2380		goto done;
   2381	}
   2382
   2383	spin_lock(&res->spinlock);
   2384	if (!(res->state & DLM_LOCK_RES_DROPPING_REF)) {
   2385		spin_unlock(&res->spinlock);
   2386		spin_unlock(&dlm->spinlock);
   2387		mlog(ML_NOTICE, "%s:%.*s: node %u sends deref done "
   2388			"but it is already derefed!\n", dlm->name,
   2389			res->lockname.len, res->lockname.name, node);
   2390		ret = 0;
   2391		goto done;
   2392	}
   2393
   2394	__dlm_do_purge_lockres(dlm, res);
   2395	spin_unlock(&res->spinlock);
   2396	wake_up(&res->wq);
   2397
   2398	spin_unlock(&dlm->spinlock);
   2399
   2400	ret = 0;
   2401done:
   2402	if (res)
   2403		dlm_lockres_put(res);
   2404	dlm_put(dlm);
   2405	return ret;
   2406}
   2407
   2408static void dlm_drop_lockres_ref_done(struct dlm_ctxt *dlm,
   2409		struct dlm_lock_resource *res, u8 node)
   2410{
   2411	struct dlm_deref_lockres_done deref;
   2412	int ret = 0, r;
   2413	const char *lockname;
   2414	unsigned int namelen;
   2415
   2416	lockname = res->lockname.name;
   2417	namelen = res->lockname.len;
   2418	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
   2419
   2420	memset(&deref, 0, sizeof(deref));
   2421	deref.node_idx = dlm->node_num;
   2422	deref.namelen = namelen;
   2423	memcpy(deref.name, lockname, namelen);
   2424
   2425	ret = o2net_send_message(DLM_DEREF_LOCKRES_DONE, dlm->key,
   2426				 &deref, sizeof(deref), node, &r);
   2427	if (ret < 0) {
   2428		mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF DONE "
   2429				" to node %u\n", dlm->name, namelen,
   2430				lockname, ret, node);
   2431	} else if (r < 0) {
   2432		/* ignore the error */
   2433		mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
   2434		     dlm->name, namelen, lockname, node, r);
   2435		dlm_print_one_lock_resource(res);
   2436	}
   2437}
   2438
   2439static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
   2440{
   2441	struct dlm_ctxt *dlm;
   2442	struct dlm_lock_resource *res;
   2443	u8 node;
   2444	u8 cleared = 0;
   2445
   2446	dlm = item->dlm;
   2447	res = item->u.dl.deref_res;
   2448	node = item->u.dl.deref_node;
   2449
   2450	spin_lock(&res->spinlock);
   2451	BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
   2452	__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
   2453	if (test_bit(node, res->refmap)) {
   2454		dlm_lockres_clear_refmap_bit(dlm, res, node);
   2455		cleared = 1;
   2456	}
   2457	spin_unlock(&res->spinlock);
   2458
   2459	dlm_drop_lockres_ref_done(dlm, res, node);
   2460
   2461	if (cleared) {
   2462		mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
   2463		     dlm->name, res->lockname.len, res->lockname.name, node);
   2464		dlm_lockres_calc_usage(dlm, res);
   2465	} else {
   2466		mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
   2467		     "but it is already dropped!\n", dlm->name,
   2468		     res->lockname.len, res->lockname.name, node);
   2469		dlm_print_one_lock_resource(res);
   2470	}
   2471
   2472	dlm_lockres_put(res);
   2473}
   2474
   2475/*
   2476 * A migratable resource is one that is :
   2477 * 1. locally mastered, and,
   2478 * 2. zero local locks, and,
   2479 * 3. one or more non-local locks, or, one or more references
   2480 * Returns 1 if yes, 0 if not.
   2481 */
   2482static int dlm_is_lockres_migratable(struct dlm_ctxt *dlm,
   2483				      struct dlm_lock_resource *res)
   2484{
   2485	enum dlm_lockres_list idx;
   2486	int nonlocal = 0, node_ref;
   2487	struct list_head *queue;
   2488	struct dlm_lock *lock;
   2489	u64 cookie;
   2490
   2491	assert_spin_locked(&res->spinlock);
   2492
   2493	/* delay migration when the lockres is in MIGRATING state */
   2494	if (res->state & DLM_LOCK_RES_MIGRATING)
   2495		return 0;
   2496
   2497	/* delay migration when the lockres is in RECOCERING state */
   2498	if (res->state & (DLM_LOCK_RES_RECOVERING|
   2499			DLM_LOCK_RES_RECOVERY_WAITING))
   2500		return 0;
   2501
   2502	if (res->owner != dlm->node_num)
   2503		return 0;
   2504
   2505        for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
   2506		queue = dlm_list_idx_to_ptr(res, idx);
   2507		list_for_each_entry(lock, queue, list) {
   2508			if (lock->ml.node != dlm->node_num) {
   2509				nonlocal++;
   2510				continue;
   2511			}
   2512			cookie = be64_to_cpu(lock->ml.cookie);
   2513			mlog(0, "%s: Not migratable res %.*s, lock %u:%llu on "
   2514			     "%s list\n", dlm->name, res->lockname.len,
   2515			     res->lockname.name,
   2516			     dlm_get_lock_cookie_node(cookie),
   2517			     dlm_get_lock_cookie_seq(cookie),
   2518			     dlm_list_in_text(idx));
   2519			return 0;
   2520		}
   2521	}
   2522
   2523	if (!nonlocal) {
   2524		node_ref = find_first_bit(res->refmap, O2NM_MAX_NODES);
   2525		if (node_ref >= O2NM_MAX_NODES)
   2526			return 0;
   2527	}
   2528
   2529	mlog(0, "%s: res %.*s, Migratable\n", dlm->name, res->lockname.len,
   2530	     res->lockname.name);
   2531
   2532	return 1;
   2533}
   2534
   2535/*
   2536 * DLM_MIGRATE_LOCKRES
   2537 */
   2538
   2539
   2540static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
   2541			       struct dlm_lock_resource *res, u8 target)
   2542{
   2543	struct dlm_master_list_entry *mle = NULL;
   2544	struct dlm_master_list_entry *oldmle = NULL;
   2545 	struct dlm_migratable_lockres *mres = NULL;
   2546	int ret = 0;
   2547	const char *name;
   2548	unsigned int namelen;
   2549	int mle_added = 0;
   2550	int wake = 0;
   2551
   2552	if (!dlm_grab(dlm))
   2553		return -EINVAL;
   2554
   2555	name = res->lockname.name;
   2556	namelen = res->lockname.len;
   2557
   2558	mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
   2559	     target);
   2560
   2561	/* preallocate up front. if this fails, abort */
   2562	ret = -ENOMEM;
   2563	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
   2564	if (!mres) {
   2565		mlog_errno(ret);
   2566		goto leave;
   2567	}
   2568
   2569	mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
   2570	if (!mle) {
   2571		mlog_errno(ret);
   2572		goto leave;
   2573	}
   2574	ret = 0;
   2575
   2576	/*
   2577	 * clear any existing master requests and
   2578	 * add the migration mle to the list
   2579	 */
   2580	spin_lock(&dlm->spinlock);
   2581	spin_lock(&dlm->master_lock);
   2582	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
   2583				    namelen, target, dlm->node_num);
   2584	/* get an extra reference on the mle.
   2585	 * otherwise the assert_master from the new
   2586	 * master will destroy this.
   2587	 */
   2588	if (ret != -EEXIST)
   2589		dlm_get_mle_inuse(mle);
   2590
   2591	spin_unlock(&dlm->master_lock);
   2592	spin_unlock(&dlm->spinlock);
   2593
   2594	if (ret == -EEXIST) {
   2595		mlog(0, "another process is already migrating it\n");
   2596		goto fail;
   2597	}
   2598	mle_added = 1;
   2599
   2600	/*
   2601	 * set the MIGRATING flag and flush asts
   2602	 * if we fail after this we need to re-dirty the lockres
   2603	 */
   2604	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
   2605		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
   2606		     "the target went down.\n", res->lockname.len,
   2607		     res->lockname.name, target);
   2608		spin_lock(&res->spinlock);
   2609		res->state &= ~DLM_LOCK_RES_MIGRATING;
   2610		wake = 1;
   2611		spin_unlock(&res->spinlock);
   2612		ret = -EINVAL;
   2613	}
   2614
   2615fail:
   2616	if (ret != -EEXIST && oldmle) {
   2617		/* master is known, detach if not already detached */
   2618		dlm_mle_detach_hb_events(dlm, oldmle);
   2619		dlm_put_mle(oldmle);
   2620	}
   2621
   2622	if (ret < 0) {
   2623		if (mle_added) {
   2624			dlm_mle_detach_hb_events(dlm, mle);
   2625			dlm_put_mle(mle);
   2626			dlm_put_mle_inuse(mle);
   2627		} else if (mle) {
   2628			kmem_cache_free(dlm_mle_cache, mle);
   2629			mle = NULL;
   2630		}
   2631		goto leave;
   2632	}
   2633
   2634	/*
   2635	 * at this point, we have a migration target, an mle
   2636	 * in the master list, and the MIGRATING flag set on
   2637	 * the lockres
   2638	 */
   2639
   2640	/* now that remote nodes are spinning on the MIGRATING flag,
   2641	 * ensure that all assert_master work is flushed. */
   2642	flush_workqueue(dlm->dlm_worker);
   2643
   2644	/* notify new node and send all lock state */
   2645	/* call send_one_lockres with migration flag.
   2646	 * this serves as notice to the target node that a
   2647	 * migration is starting. */
   2648	ret = dlm_send_one_lockres(dlm, res, mres, target,
   2649				   DLM_MRES_MIGRATION);
   2650
   2651	if (ret < 0) {
   2652		mlog(0, "migration to node %u failed with %d\n",
   2653		     target, ret);
   2654		/* migration failed, detach and clean up mle */
   2655		dlm_mle_detach_hb_events(dlm, mle);
   2656		dlm_put_mle(mle);
   2657		dlm_put_mle_inuse(mle);
   2658		spin_lock(&res->spinlock);
   2659		res->state &= ~DLM_LOCK_RES_MIGRATING;
   2660		wake = 1;
   2661		spin_unlock(&res->spinlock);
   2662		if (dlm_is_host_down(ret))
   2663			dlm_wait_for_node_death(dlm, target,
   2664						DLM_NODE_DEATH_WAIT_MAX);
   2665		goto leave;
   2666	}
   2667
   2668	/* at this point, the target sends a message to all nodes,
   2669	 * (using dlm_do_migrate_request).  this node is skipped since
   2670	 * we had to put an mle in the list to begin the process.  this
   2671	 * node now waits for target to do an assert master.  this node
   2672	 * will be the last one notified, ensuring that the migration
   2673	 * is complete everywhere.  if the target dies while this is
   2674	 * going on, some nodes could potentially see the target as the
   2675	 * master, so it is important that my recovery finds the migration
   2676	 * mle and sets the master to UNKNOWN. */
   2677
   2678
   2679	/* wait for new node to assert master */
   2680	while (1) {
   2681		ret = wait_event_interruptible_timeout(mle->wq,
   2682					(atomic_read(&mle->woken) == 1),
   2683					msecs_to_jiffies(5000));
   2684
   2685		if (ret >= 0) {
   2686		       	if (atomic_read(&mle->woken) == 1 ||
   2687			    res->owner == target)
   2688				break;
   2689
   2690			mlog(0, "%s:%.*s: timed out during migration\n",
   2691			     dlm->name, res->lockname.len, res->lockname.name);
   2692			/* avoid hang during shutdown when migrating lockres
   2693			 * to a node which also goes down */
   2694			if (dlm_is_node_dead(dlm, target)) {
   2695				mlog(0, "%s:%.*s: expected migration "
   2696				     "target %u is no longer up, restarting\n",
   2697				     dlm->name, res->lockname.len,
   2698				     res->lockname.name, target);
   2699				ret = -EINVAL;
   2700				/* migration failed, detach and clean up mle */
   2701				dlm_mle_detach_hb_events(dlm, mle);
   2702				dlm_put_mle(mle);
   2703				dlm_put_mle_inuse(mle);
   2704				spin_lock(&res->spinlock);
   2705				res->state &= ~DLM_LOCK_RES_MIGRATING;
   2706				wake = 1;
   2707				spin_unlock(&res->spinlock);
   2708				goto leave;
   2709			}
   2710		} else
   2711			mlog(0, "%s:%.*s: caught signal during migration\n",
   2712			     dlm->name, res->lockname.len, res->lockname.name);
   2713	}
   2714
   2715	/* all done, set the owner, clear the flag */
   2716	spin_lock(&res->spinlock);
   2717	dlm_set_lockres_owner(dlm, res, target);
   2718	res->state &= ~DLM_LOCK_RES_MIGRATING;
   2719	dlm_remove_nonlocal_locks(dlm, res);
   2720	spin_unlock(&res->spinlock);
   2721	wake_up(&res->wq);
   2722
   2723	/* master is known, detach if not already detached */
   2724	dlm_mle_detach_hb_events(dlm, mle);
   2725	dlm_put_mle_inuse(mle);
   2726	ret = 0;
   2727
   2728	dlm_lockres_calc_usage(dlm, res);
   2729
   2730leave:
   2731	/* re-dirty the lockres if we failed */
   2732	if (ret < 0)
   2733		dlm_kick_thread(dlm, res);
   2734
   2735	/* wake up waiters if the MIGRATING flag got set
   2736	 * but migration failed */
   2737	if (wake)
   2738		wake_up(&res->wq);
   2739
   2740	if (mres)
   2741		free_page((unsigned long)mres);
   2742
   2743	dlm_put(dlm);
   2744
   2745	mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
   2746	     name, target, ret);
   2747	return ret;
   2748}
   2749
   2750/*
   2751 * Should be called only after beginning the domain leave process.
   2752 * There should not be any remaining locks on nonlocal lock resources,
   2753 * and there should be no local locks left on locally mastered resources.
   2754 *
   2755 * Called with the dlm spinlock held, may drop it to do migration, but
   2756 * will re-acquire before exit.
   2757 *
   2758 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
   2759 */
   2760int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
   2761	__must_hold(&dlm->spinlock)
   2762{
   2763	int ret;
   2764	int lock_dropped = 0;
   2765	u8 target = O2NM_MAX_NODES;
   2766
   2767	assert_spin_locked(&dlm->spinlock);
   2768
   2769	spin_lock(&res->spinlock);
   2770	if (dlm_is_lockres_migratable(dlm, res))
   2771		target = dlm_pick_migration_target(dlm, res);
   2772	spin_unlock(&res->spinlock);
   2773
   2774	if (target == O2NM_MAX_NODES)
   2775		goto leave;
   2776
   2777	/* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
   2778	spin_unlock(&dlm->spinlock);
   2779	lock_dropped = 1;
   2780	ret = dlm_migrate_lockres(dlm, res, target);
   2781	if (ret)
   2782		mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
   2783		     dlm->name, res->lockname.len, res->lockname.name,
   2784		     target, ret);
   2785	spin_lock(&dlm->spinlock);
   2786leave:
   2787	return lock_dropped;
   2788}
   2789
   2790int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
   2791{
   2792	int ret;
   2793	spin_lock(&dlm->ast_lock);
   2794	spin_lock(&lock->spinlock);
   2795	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
   2796	spin_unlock(&lock->spinlock);
   2797	spin_unlock(&dlm->ast_lock);
   2798	return ret;
   2799}
   2800
   2801static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
   2802				     struct dlm_lock_resource *res,
   2803				     u8 mig_target)
   2804{
   2805	int can_proceed;
   2806	spin_lock(&res->spinlock);
   2807	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
   2808	spin_unlock(&res->spinlock);
   2809
   2810	/* target has died, so make the caller break out of the
   2811	 * wait_event, but caller must recheck the domain_map */
   2812	spin_lock(&dlm->spinlock);
   2813	if (!test_bit(mig_target, dlm->domain_map))
   2814		can_proceed = 1;
   2815	spin_unlock(&dlm->spinlock);
   2816	return can_proceed;
   2817}
   2818
   2819static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
   2820				struct dlm_lock_resource *res)
   2821{
   2822	int ret;
   2823	spin_lock(&res->spinlock);
   2824	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
   2825	spin_unlock(&res->spinlock);
   2826	return ret;
   2827}
   2828
   2829
   2830static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
   2831				       struct dlm_lock_resource *res,
   2832				       u8 target)
   2833{
   2834	int ret = 0;
   2835
   2836	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
   2837	       res->lockname.len, res->lockname.name, dlm->node_num,
   2838	       target);
   2839	/* need to set MIGRATING flag on lockres.  this is done by
   2840	 * ensuring that all asts have been flushed for this lockres. */
   2841	spin_lock(&res->spinlock);
   2842	BUG_ON(res->migration_pending);
   2843	res->migration_pending = 1;
   2844	/* strategy is to reserve an extra ast then release
   2845	 * it below, letting the release do all of the work */
   2846	__dlm_lockres_reserve_ast(res);
   2847	spin_unlock(&res->spinlock);
   2848
   2849	/* now flush all the pending asts */
   2850	dlm_kick_thread(dlm, res);
   2851	/* before waiting on DIRTY, block processes which may
   2852	 * try to dirty the lockres before MIGRATING is set */
   2853	spin_lock(&res->spinlock);
   2854	BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
   2855	res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
   2856	spin_unlock(&res->spinlock);
   2857	/* now wait on any pending asts and the DIRTY state */
   2858	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
   2859	dlm_lockres_release_ast(dlm, res);
   2860
   2861	mlog(0, "about to wait on migration_wq, dirty=%s\n",
   2862	       res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
   2863	/* if the extra ref we just put was the final one, this
   2864	 * will pass thru immediately.  otherwise, we need to wait
   2865	 * for the last ast to finish. */
   2866again:
   2867	ret = wait_event_interruptible_timeout(dlm->migration_wq,
   2868		   dlm_migration_can_proceed(dlm, res, target),
   2869		   msecs_to_jiffies(1000));
   2870	if (ret < 0) {
   2871		mlog(0, "woken again: migrating? %s, dead? %s\n",
   2872		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
   2873		       test_bit(target, dlm->domain_map) ? "no":"yes");
   2874	} else {
   2875		mlog(0, "all is well: migrating? %s, dead? %s\n",
   2876		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
   2877		       test_bit(target, dlm->domain_map) ? "no":"yes");
   2878	}
   2879	if (!dlm_migration_can_proceed(dlm, res, target)) {
   2880		mlog(0, "trying again...\n");
   2881		goto again;
   2882	}
   2883
   2884	ret = 0;
   2885	/* did the target go down or die? */
   2886	spin_lock(&dlm->spinlock);
   2887	if (!test_bit(target, dlm->domain_map)) {
   2888		mlog(ML_ERROR, "aha. migration target %u just went down\n",
   2889		     target);
   2890		ret = -EHOSTDOWN;
   2891	}
   2892	spin_unlock(&dlm->spinlock);
   2893
   2894	/*
   2895	 * if target is down, we need to clear DLM_LOCK_RES_BLOCK_DIRTY for
   2896	 * another try; otherwise, we are sure the MIGRATING state is there,
   2897	 * drop the unneeded state which blocked threads trying to DIRTY
   2898	 */
   2899	spin_lock(&res->spinlock);
   2900	BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
   2901	res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
   2902	if (!ret)
   2903		BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
   2904	else
   2905		res->migration_pending = 0;
   2906	spin_unlock(&res->spinlock);
   2907
   2908	/*
   2909	 * at this point:
   2910	 *
   2911	 *   o the DLM_LOCK_RES_MIGRATING flag is set if target not down
   2912	 *   o there are no pending asts on this lockres
   2913	 *   o all processes trying to reserve an ast on this
   2914	 *     lockres must wait for the MIGRATING flag to clear
   2915	 */
   2916	return ret;
   2917}
   2918
   2919/* last step in the migration process.
   2920 * original master calls this to free all of the dlm_lock
   2921 * structures that used to be for other nodes. */
   2922static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
   2923				      struct dlm_lock_resource *res)
   2924{
   2925	struct list_head *queue = &res->granted;
   2926	int i, bit;
   2927	struct dlm_lock *lock, *next;
   2928
   2929	assert_spin_locked(&res->spinlock);
   2930
   2931	BUG_ON(res->owner == dlm->node_num);
   2932
   2933	for (i=0; i<3; i++) {
   2934		list_for_each_entry_safe(lock, next, queue, list) {
   2935			if (lock->ml.node != dlm->node_num) {
   2936				mlog(0, "putting lock for node %u\n",
   2937				     lock->ml.node);
   2938				/* be extra careful */
   2939				BUG_ON(!list_empty(&lock->ast_list));
   2940				BUG_ON(!list_empty(&lock->bast_list));
   2941				BUG_ON(lock->ast_pending);
   2942				BUG_ON(lock->bast_pending);
   2943				dlm_lockres_clear_refmap_bit(dlm, res,
   2944							     lock->ml.node);
   2945				list_del_init(&lock->list);
   2946				dlm_lock_put(lock);
   2947				/* In a normal unlock, we would have added a
   2948				 * DLM_UNLOCK_FREE_LOCK action. Force it. */
   2949				dlm_lock_put(lock);
   2950			}
   2951		}
   2952		queue++;
   2953	}
   2954	bit = 0;
   2955	while (1) {
   2956		bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
   2957		if (bit >= O2NM_MAX_NODES)
   2958			break;
   2959		/* do not clear the local node reference, if there is a
   2960		 * process holding this, let it drop the ref itself */
   2961		if (bit != dlm->node_num) {
   2962			mlog(0, "%s:%.*s: node %u had a ref to this "
   2963			     "migrating lockres, clearing\n", dlm->name,
   2964			     res->lockname.len, res->lockname.name, bit);
   2965			dlm_lockres_clear_refmap_bit(dlm, res, bit);
   2966		}
   2967		bit++;
   2968	}
   2969}
   2970
   2971/*
   2972 * Pick a node to migrate the lock resource to. This function selects a
   2973 * potential target based first on the locks and then on refmap. It skips
   2974 * nodes that are in the process of exiting the domain.
   2975 */
   2976static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
   2977				    struct dlm_lock_resource *res)
   2978{
   2979	enum dlm_lockres_list idx;
   2980	struct list_head *queue;
   2981	struct dlm_lock *lock;
   2982	int noderef;
   2983	u8 nodenum = O2NM_MAX_NODES;
   2984
   2985	assert_spin_locked(&dlm->spinlock);
   2986	assert_spin_locked(&res->spinlock);
   2987
   2988	/* Go through all the locks */
   2989	for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
   2990		queue = dlm_list_idx_to_ptr(res, idx);
   2991		list_for_each_entry(lock, queue, list) {
   2992			if (lock->ml.node == dlm->node_num)
   2993				continue;
   2994			if (test_bit(lock->ml.node, dlm->exit_domain_map))
   2995				continue;
   2996			nodenum = lock->ml.node;
   2997			goto bail;
   2998		}
   2999	}
   3000
   3001	/* Go thru the refmap */
   3002	noderef = -1;
   3003	while (1) {
   3004		noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
   3005					noderef + 1);
   3006		if (noderef >= O2NM_MAX_NODES)
   3007			break;
   3008		if (noderef == dlm->node_num)
   3009			continue;
   3010		if (test_bit(noderef, dlm->exit_domain_map))
   3011			continue;
   3012		nodenum = noderef;
   3013		goto bail;
   3014	}
   3015
   3016bail:
   3017	return nodenum;
   3018}
   3019
   3020/* this is called by the new master once all lockres
   3021 * data has been received */
   3022static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
   3023				  struct dlm_lock_resource *res,
   3024				  u8 master, u8 new_master,
   3025				  struct dlm_node_iter *iter)
   3026{
   3027	struct dlm_migrate_request migrate;
   3028	int ret, skip, status = 0;
   3029	int nodenum;
   3030
   3031	memset(&migrate, 0, sizeof(migrate));
   3032	migrate.namelen = res->lockname.len;
   3033	memcpy(migrate.name, res->lockname.name, migrate.namelen);
   3034	migrate.new_master = new_master;
   3035	migrate.master = master;
   3036
   3037	ret = 0;
   3038
   3039	/* send message to all nodes, except the master and myself */
   3040	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
   3041		if (nodenum == master ||
   3042		    nodenum == new_master)
   3043			continue;
   3044
   3045		/* We could race exit domain. If exited, skip. */
   3046		spin_lock(&dlm->spinlock);
   3047		skip = (!test_bit(nodenum, dlm->domain_map));
   3048		spin_unlock(&dlm->spinlock);
   3049		if (skip) {
   3050			clear_bit(nodenum, iter->node_map);
   3051			continue;
   3052		}
   3053
   3054		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
   3055					 &migrate, sizeof(migrate), nodenum,
   3056					 &status);
   3057		if (ret < 0) {
   3058			mlog(ML_ERROR, "%s: res %.*s, Error %d send "
   3059			     "MIGRATE_REQUEST to node %u\n", dlm->name,
   3060			     migrate.namelen, migrate.name, ret, nodenum);
   3061			if (!dlm_is_host_down(ret)) {
   3062				mlog(ML_ERROR, "unhandled error=%d!\n", ret);
   3063				BUG();
   3064			}
   3065			clear_bit(nodenum, iter->node_map);
   3066			ret = 0;
   3067		} else if (status < 0) {
   3068			mlog(0, "migrate request (node %u) returned %d!\n",
   3069			     nodenum, status);
   3070			ret = status;
   3071		} else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
   3072			/* during the migration request we short-circuited
   3073			 * the mastery of the lockres.  make sure we have
   3074			 * a mastery ref for nodenum */
   3075			mlog(0, "%s:%.*s: need ref for node %u\n",
   3076			     dlm->name, res->lockname.len, res->lockname.name,
   3077			     nodenum);
   3078			spin_lock(&res->spinlock);
   3079			dlm_lockres_set_refmap_bit(dlm, res, nodenum);
   3080			spin_unlock(&res->spinlock);
   3081		}
   3082	}
   3083
   3084	if (ret < 0)
   3085		mlog_errno(ret);
   3086
   3087	mlog(0, "returning ret=%d\n", ret);
   3088	return ret;
   3089}
   3090
   3091
   3092/* if there is an existing mle for this lockres, we now know who the master is.
   3093 * (the one who sent us *this* message) we can clear it up right away.
   3094 * since the process that put the mle on the list still has a reference to it,
   3095 * we can unhash it now, set the master and wake the process.  as a result,
   3096 * we will have no mle in the list to start with.  now we can add an mle for
   3097 * the migration and this should be the only one found for those scanning the
   3098 * list.  */
   3099int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
   3100				void **ret_data)
   3101{
   3102	struct dlm_ctxt *dlm = data;
   3103	struct dlm_lock_resource *res = NULL;
   3104	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
   3105	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
   3106	const char *name;
   3107	unsigned int namelen, hash;
   3108	int ret = 0;
   3109
   3110	if (!dlm_grab(dlm))
   3111		return 0;
   3112
   3113	name = migrate->name;
   3114	namelen = migrate->namelen;
   3115	hash = dlm_lockid_hash(name, namelen);
   3116
   3117	/* preallocate.. if this fails, abort */
   3118	mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
   3119
   3120	if (!mle) {
   3121		ret = -ENOMEM;
   3122		goto leave;
   3123	}
   3124
   3125	/* check for pre-existing lock */
   3126	spin_lock(&dlm->spinlock);
   3127	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
   3128	if (res) {
   3129		spin_lock(&res->spinlock);
   3130		if (res->state & DLM_LOCK_RES_RECOVERING) {
   3131			/* if all is working ok, this can only mean that we got
   3132		 	* a migrate request from a node that we now see as
   3133		 	* dead.  what can we do here?  drop it to the floor? */
   3134			spin_unlock(&res->spinlock);
   3135			mlog(ML_ERROR, "Got a migrate request, but the "
   3136			     "lockres is marked as recovering!");
   3137			kmem_cache_free(dlm_mle_cache, mle);
   3138			ret = -EINVAL; /* need a better solution */
   3139			goto unlock;
   3140		}
   3141		res->state |= DLM_LOCK_RES_MIGRATING;
   3142		spin_unlock(&res->spinlock);
   3143	}
   3144
   3145	spin_lock(&dlm->master_lock);
   3146	/* ignore status.  only nonzero status would BUG. */
   3147	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
   3148				    name, namelen,
   3149				    migrate->new_master,
   3150				    migrate->master);
   3151
   3152	if (ret < 0)
   3153		kmem_cache_free(dlm_mle_cache, mle);
   3154
   3155	spin_unlock(&dlm->master_lock);
   3156unlock:
   3157	spin_unlock(&dlm->spinlock);
   3158
   3159	if (oldmle) {
   3160		/* master is known, detach if not already detached */
   3161		dlm_mle_detach_hb_events(dlm, oldmle);
   3162		dlm_put_mle(oldmle);
   3163	}
   3164
   3165	if (res)
   3166		dlm_lockres_put(res);
   3167leave:
   3168	dlm_put(dlm);
   3169	return ret;
   3170}
   3171
   3172/* must be holding dlm->spinlock and dlm->master_lock
   3173 * when adding a migration mle, we can clear any other mles
   3174 * in the master list because we know with certainty that
   3175 * the master is "master".  so we remove any old mle from
   3176 * the list after setting it's master field, and then add
   3177 * the new migration mle.  this way we can hold with the rule
   3178 * of having only one mle for a given lock name at all times. */
   3179static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
   3180				 struct dlm_lock_resource *res,
   3181				 struct dlm_master_list_entry *mle,
   3182				 struct dlm_master_list_entry **oldmle,
   3183				 const char *name, unsigned int namelen,
   3184				 u8 new_master, u8 master)
   3185{
   3186	int found;
   3187	int ret = 0;
   3188
   3189	*oldmle = NULL;
   3190
   3191	assert_spin_locked(&dlm->spinlock);
   3192	assert_spin_locked(&dlm->master_lock);
   3193
   3194	/* caller is responsible for any ref taken here on oldmle */
   3195	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
   3196	if (found) {
   3197		struct dlm_master_list_entry *tmp = *oldmle;
   3198		spin_lock(&tmp->spinlock);
   3199		if (tmp->type == DLM_MLE_MIGRATION) {
   3200			if (master == dlm->node_num) {
   3201				/* ah another process raced me to it */
   3202				mlog(0, "tried to migrate %.*s, but some "
   3203				     "process beat me to it\n",
   3204				     namelen, name);
   3205				spin_unlock(&tmp->spinlock);
   3206				return -EEXIST;
   3207			} else {
   3208				/* bad.  2 NODES are trying to migrate! */
   3209				mlog(ML_ERROR, "migration error  mle: "
   3210				     "master=%u new_master=%u // request: "
   3211				     "master=%u new_master=%u // "
   3212				     "lockres=%.*s\n",
   3213				     tmp->master, tmp->new_master,
   3214				     master, new_master,
   3215				     namelen, name);
   3216				BUG();
   3217			}
   3218		} else {
   3219			/* this is essentially what assert_master does */
   3220			tmp->master = master;
   3221			atomic_set(&tmp->woken, 1);
   3222			wake_up(&tmp->wq);
   3223			/* remove it so that only one mle will be found */
   3224			__dlm_unlink_mle(dlm, tmp);
   3225			__dlm_mle_detach_hb_events(dlm, tmp);
   3226			if (tmp->type == DLM_MLE_MASTER) {
   3227				ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
   3228				mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
   3229						"telling master to get ref "
   3230						"for cleared out mle during "
   3231						"migration\n", dlm->name,
   3232						namelen, name, master,
   3233						new_master);
   3234			}
   3235		}
   3236		spin_unlock(&tmp->spinlock);
   3237	}
   3238
   3239	/* now add a migration mle to the tail of the list */
   3240	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
   3241	mle->new_master = new_master;
   3242	/* the new master will be sending an assert master for this.
   3243	 * at that point we will get the refmap reference */
   3244	mle->master = master;
   3245	/* do this for consistency with other mle types */
   3246	set_bit(new_master, mle->maybe_map);
   3247	__dlm_insert_mle(dlm, mle);
   3248
   3249	return ret;
   3250}
   3251
   3252/*
   3253 * Sets the owner of the lockres, associated to the mle, to UNKNOWN
   3254 */
   3255static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
   3256					struct dlm_master_list_entry *mle)
   3257{
   3258	struct dlm_lock_resource *res;
   3259
   3260	/* Find the lockres associated to the mle and set its owner to UNK */
   3261	res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
   3262				   mle->mnamehash);
   3263	if (res) {
   3264		spin_unlock(&dlm->master_lock);
   3265
   3266		/* move lockres onto recovery list */
   3267		spin_lock(&res->spinlock);
   3268		dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
   3269		dlm_move_lockres_to_recovery_list(dlm, res);
   3270		spin_unlock(&res->spinlock);
   3271		dlm_lockres_put(res);
   3272
   3273		/* about to get rid of mle, detach from heartbeat */
   3274		__dlm_mle_detach_hb_events(dlm, mle);
   3275
   3276		/* dump the mle */
   3277		spin_lock(&dlm->master_lock);
   3278		__dlm_put_mle(mle);
   3279		spin_unlock(&dlm->master_lock);
   3280	}
   3281
   3282	return res;
   3283}
   3284
   3285static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
   3286				    struct dlm_master_list_entry *mle)
   3287{
   3288	__dlm_mle_detach_hb_events(dlm, mle);
   3289
   3290	spin_lock(&mle->spinlock);
   3291	__dlm_unlink_mle(dlm, mle);
   3292	atomic_set(&mle->woken, 1);
   3293	spin_unlock(&mle->spinlock);
   3294
   3295	wake_up(&mle->wq);
   3296}
   3297
   3298static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
   3299				struct dlm_master_list_entry *mle, u8 dead_node)
   3300{
   3301	int bit;
   3302
   3303	BUG_ON(mle->type != DLM_MLE_BLOCK);
   3304
   3305	spin_lock(&mle->spinlock);
   3306	bit = find_first_bit(mle->maybe_map, O2NM_MAX_NODES);
   3307	if (bit != dead_node) {
   3308		mlog(0, "mle found, but dead node %u would not have been "
   3309		     "master\n", dead_node);
   3310		spin_unlock(&mle->spinlock);
   3311	} else {
   3312		/* Must drop the refcount by one since the assert_master will
   3313		 * never arrive. This may result in the mle being unlinked and
   3314		 * freed, but there may still be a process waiting in the
   3315		 * dlmlock path which is fine. */
   3316		mlog(0, "node %u was expected master\n", dead_node);
   3317		atomic_set(&mle->woken, 1);
   3318		spin_unlock(&mle->spinlock);
   3319		wake_up(&mle->wq);
   3320
   3321		/* Do not need events any longer, so detach from heartbeat */
   3322		__dlm_mle_detach_hb_events(dlm, mle);
   3323		__dlm_put_mle(mle);
   3324	}
   3325}
   3326
   3327void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
   3328{
   3329	struct dlm_master_list_entry *mle;
   3330	struct dlm_lock_resource *res;
   3331	struct hlist_head *bucket;
   3332	struct hlist_node *tmp;
   3333	unsigned int i;
   3334
   3335	mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
   3336top:
   3337	assert_spin_locked(&dlm->spinlock);
   3338
   3339	/* clean the master list */
   3340	spin_lock(&dlm->master_lock);
   3341	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
   3342		bucket = dlm_master_hash(dlm, i);
   3343		hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
   3344			BUG_ON(mle->type != DLM_MLE_BLOCK &&
   3345			       mle->type != DLM_MLE_MASTER &&
   3346			       mle->type != DLM_MLE_MIGRATION);
   3347
   3348			/* MASTER mles are initiated locally. The waiting
   3349			 * process will notice the node map change shortly.
   3350			 * Let that happen as normal. */
   3351			if (mle->type == DLM_MLE_MASTER)
   3352				continue;
   3353
   3354			/* BLOCK mles are initiated by other nodes. Need to
   3355			 * clean up if the dead node would have been the
   3356			 * master. */
   3357			if (mle->type == DLM_MLE_BLOCK) {
   3358				dlm_clean_block_mle(dlm, mle, dead_node);
   3359				continue;
   3360			}
   3361
   3362			/* Everything else is a MIGRATION mle */
   3363
   3364			/* The rule for MIGRATION mles is that the master
   3365			 * becomes UNKNOWN if *either* the original or the new
   3366			 * master dies. All UNKNOWN lockres' are sent to
   3367			 * whichever node becomes the recovery master. The new
   3368			 * master is responsible for determining if there is
   3369			 * still a master for this lockres, or if he needs to
   3370			 * take over mastery. Either way, this node should
   3371			 * expect another message to resolve this. */
   3372
   3373			if (mle->master != dead_node &&
   3374			    mle->new_master != dead_node)
   3375				continue;
   3376
   3377			if (mle->new_master == dead_node && mle->inuse) {
   3378				mlog(ML_NOTICE, "%s: target %u died during "
   3379						"migration from %u, the MLE is "
   3380						"still keep used, ignore it!\n",
   3381						dlm->name, dead_node,
   3382						mle->master);
   3383				continue;
   3384			}
   3385
   3386			/* If we have reached this point, this mle needs to be
   3387			 * removed from the list and freed. */
   3388			dlm_clean_migration_mle(dlm, mle);
   3389
   3390			mlog(0, "%s: node %u died during migration from "
   3391			     "%u to %u!\n", dlm->name, dead_node, mle->master,
   3392			     mle->new_master);
   3393
   3394			/* If we find a lockres associated with the mle, we've
   3395			 * hit this rare case that messes up our lock ordering.
   3396			 * If so, we need to drop the master lock so that we can
   3397			 * take the lockres lock, meaning that we will have to
   3398			 * restart from the head of list. */
   3399			res = dlm_reset_mleres_owner(dlm, mle);
   3400			if (res)
   3401				/* restart */
   3402				goto top;
   3403
   3404			/* This may be the last reference */
   3405			__dlm_put_mle(mle);
   3406		}
   3407	}
   3408	spin_unlock(&dlm->master_lock);
   3409}
   3410
   3411int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
   3412			 u8 old_master)
   3413{
   3414	struct dlm_node_iter iter;
   3415	int ret = 0;
   3416
   3417	spin_lock(&dlm->spinlock);
   3418	dlm_node_iter_init(dlm->domain_map, &iter);
   3419	clear_bit(old_master, iter.node_map);
   3420	clear_bit(dlm->node_num, iter.node_map);
   3421	spin_unlock(&dlm->spinlock);
   3422
   3423	/* ownership of the lockres is changing.  account for the
   3424	 * mastery reference here since old_master will briefly have
   3425	 * a reference after the migration completes */
   3426	spin_lock(&res->spinlock);
   3427	dlm_lockres_set_refmap_bit(dlm, res, old_master);
   3428	spin_unlock(&res->spinlock);
   3429
   3430	mlog(0, "now time to do a migrate request to other nodes\n");
   3431	ret = dlm_do_migrate_request(dlm, res, old_master,
   3432				     dlm->node_num, &iter);
   3433	if (ret < 0) {
   3434		mlog_errno(ret);
   3435		goto leave;
   3436	}
   3437
   3438	mlog(0, "doing assert master of %.*s to all except the original node\n",
   3439	     res->lockname.len, res->lockname.name);
   3440	/* this call now finishes out the nodemap
   3441	 * even if one or more nodes die */
   3442	ret = dlm_do_assert_master(dlm, res, iter.node_map,
   3443				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
   3444	if (ret < 0) {
   3445		/* no longer need to retry.  all living nodes contacted. */
   3446		mlog_errno(ret);
   3447		ret = 0;
   3448	}
   3449
   3450	memset(iter.node_map, 0, sizeof(iter.node_map));
   3451	set_bit(old_master, iter.node_map);
   3452	mlog(0, "doing assert master of %.*s back to %u\n",
   3453	     res->lockname.len, res->lockname.name, old_master);
   3454	ret = dlm_do_assert_master(dlm, res, iter.node_map,
   3455				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
   3456	if (ret < 0) {
   3457		mlog(0, "assert master to original master failed "
   3458		     "with %d.\n", ret);
   3459		/* the only nonzero status here would be because of
   3460		 * a dead original node.  we're done. */
   3461		ret = 0;
   3462	}
   3463
   3464	/* all done, set the owner, clear the flag */
   3465	spin_lock(&res->spinlock);
   3466	dlm_set_lockres_owner(dlm, res, dlm->node_num);
   3467	res->state &= ~DLM_LOCK_RES_MIGRATING;
   3468	spin_unlock(&res->spinlock);
   3469	/* re-dirty it on the new master */
   3470	dlm_kick_thread(dlm, res);
   3471	wake_up(&res->wq);
   3472leave:
   3473	return ret;
   3474}
   3475
   3476/*
   3477 * LOCKRES AST REFCOUNT
   3478 * this is integral to migration
   3479 */
   3480
   3481/* for future intent to call an ast, reserve one ahead of time.
   3482 * this should be called only after waiting on the lockres
   3483 * with dlm_wait_on_lockres, and while still holding the
   3484 * spinlock after the call. */
   3485void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
   3486{
   3487	assert_spin_locked(&res->spinlock);
   3488	if (res->state & DLM_LOCK_RES_MIGRATING) {
   3489		__dlm_print_one_lock_resource(res);
   3490	}
   3491	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
   3492
   3493	atomic_inc(&res->asts_reserved);
   3494}
   3495
   3496/*
   3497 * used to drop the reserved ast, either because it went unused,
   3498 * or because the ast/bast was actually called.
   3499 *
   3500 * also, if there is a pending migration on this lockres,
   3501 * and this was the last pending ast on the lockres,
   3502 * atomically set the MIGRATING flag before we drop the lock.
   3503 * this is how we ensure that migration can proceed with no
   3504 * asts in progress.  note that it is ok if the state of the
   3505 * queues is such that a lock should be granted in the future
   3506 * or that a bast should be fired, because the new master will
   3507 * shuffle the lists on this lockres as soon as it is migrated.
   3508 */
   3509void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
   3510			     struct dlm_lock_resource *res)
   3511{
   3512	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
   3513		return;
   3514
   3515	if (!res->migration_pending) {
   3516		spin_unlock(&res->spinlock);
   3517		return;
   3518	}
   3519
   3520	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
   3521	res->migration_pending = 0;
   3522	res->state |= DLM_LOCK_RES_MIGRATING;
   3523	spin_unlock(&res->spinlock);
   3524	wake_up(&res->wq);
   3525	wake_up(&dlm->migration_wq);
   3526}
   3527
   3528void dlm_force_free_mles(struct dlm_ctxt *dlm)
   3529{
   3530	int i;
   3531	struct hlist_head *bucket;
   3532	struct dlm_master_list_entry *mle;
   3533	struct hlist_node *tmp;
   3534
   3535	/*
   3536	 * We notified all other nodes that we are exiting the domain and
   3537	 * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still
   3538	 * around we force free them and wake any processes that are waiting
   3539	 * on the mles
   3540	 */
   3541	spin_lock(&dlm->spinlock);
   3542	spin_lock(&dlm->master_lock);
   3543
   3544	BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
   3545	BUG_ON((find_first_bit(dlm->domain_map, O2NM_MAX_NODES) < O2NM_MAX_NODES));
   3546
   3547	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
   3548		bucket = dlm_master_hash(dlm, i);
   3549		hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
   3550			if (mle->type != DLM_MLE_BLOCK) {
   3551				mlog(ML_ERROR, "bad mle: %p\n", mle);
   3552				dlm_print_one_mle(mle);
   3553			}
   3554			atomic_set(&mle->woken, 1);
   3555			wake_up(&mle->wq);
   3556
   3557			__dlm_unlink_mle(dlm, mle);
   3558			__dlm_mle_detach_hb_events(dlm, mle);
   3559			__dlm_put_mle(mle);
   3560		}
   3561	}
   3562	spin_unlock(&dlm->master_lock);
   3563	spin_unlock(&dlm->spinlock);
   3564}