cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

dlmlock.c (19089B)


      1// SPDX-License-Identifier: GPL-2.0-or-later
      2/*
      3 * dlmlock.c
      4 *
      5 * underlying calls for lock creation
      6 *
      7 * Copyright (C) 2004 Oracle.  All rights reserved.
      8 */
      9
     10
     11#include <linux/module.h>
     12#include <linux/fs.h>
     13#include <linux/types.h>
     14#include <linux/slab.h>
     15#include <linux/highmem.h>
     16#include <linux/init.h>
     17#include <linux/sysctl.h>
     18#include <linux/random.h>
     19#include <linux/blkdev.h>
     20#include <linux/socket.h>
     21#include <linux/inet.h>
     22#include <linux/spinlock.h>
     23#include <linux/delay.h>
     24
     25
     26#include "../cluster/heartbeat.h"
     27#include "../cluster/nodemanager.h"
     28#include "../cluster/tcp.h"
     29
     30#include "dlmapi.h"
     31#include "dlmcommon.h"
     32
     33#include "dlmconvert.h"
     34
     35#define MLOG_MASK_PREFIX ML_DLM
     36#include "../cluster/masklog.h"
     37
/* slab cache from which every struct dlm_lock in this file is allocated */
static struct kmem_cache *dlm_lock_cache;

/* dlm_cookie_lock is held around every read and update of
 * dlm_next_cookie, the next node-local cookie value handed out by
 * dlm_get_next_cookie() */
static DEFINE_SPINLOCK(dlm_cookie_lock);
static u64 dlm_next_cookie = 1;

/* forward declarations for helpers defined further down in this file */
static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
					       struct dlm_lock_resource *res,
					       struct dlm_lock *lock, int flags);
static void dlm_init_lock(struct dlm_lock *newlock, int type,
			  u8 node, u64 cookie);
static void dlm_lock_release(struct kref *kref);
static void dlm_lock_detach_lockres(struct dlm_lock *lock);
     50
     51int dlm_init_lock_cache(void)
     52{
     53	dlm_lock_cache = kmem_cache_create("o2dlm_lock",
     54					   sizeof(struct dlm_lock),
     55					   0, SLAB_HWCACHE_ALIGN, NULL);
     56	if (dlm_lock_cache == NULL)
     57		return -ENOMEM;
     58	return 0;
     59}
     60
     61void dlm_destroy_lock_cache(void)
     62{
     63	kmem_cache_destroy(dlm_lock_cache);
     64}
     65
     66/* Tell us whether we can grant a new lock request.
     67 * locking:
     68 *   caller needs:  res->spinlock
     69 *   taken:         none
     70 *   held on exit:  none
     71 * returns: 1 if the lock can be granted, 0 otherwise.
     72 */
     73static int dlm_can_grant_new_lock(struct dlm_lock_resource *res,
     74				  struct dlm_lock *lock)
     75{
     76	struct dlm_lock *tmplock;
     77
     78	list_for_each_entry(tmplock, &res->granted, list) {
     79		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
     80			return 0;
     81	}
     82
     83	list_for_each_entry(tmplock, &res->converting, list) {
     84		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
     85			return 0;
     86		if (!dlm_lock_compatible(tmplock->ml.convert_type,
     87					 lock->ml.type))
     88			return 0;
     89	}
     90
     91	return 1;
     92}
     93
     94/* performs lock creation at the lockres master site
     95 * locking:
     96 *   caller needs:  none
     97 *   taken:         takes and drops res->spinlock
     98 *   held on exit:  none
     99 * returns: DLM_NORMAL, DLM_NOTQUEUED
    100 */
    101static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
    102				      struct dlm_lock_resource *res,
    103				      struct dlm_lock *lock, int flags)
    104{
    105	int call_ast = 0, kick_thread = 0;
    106	enum dlm_status status = DLM_NORMAL;
    107
    108	mlog(0, "type=%d\n", lock->ml.type);
    109
    110	spin_lock(&res->spinlock);
    111	/* if called from dlm_create_lock_handler, need to
    112	 * ensure it will not sleep in dlm_wait_on_lockres */
    113	status = __dlm_lockres_state_to_status(res);
    114	if (status != DLM_NORMAL &&
    115	    lock->ml.node != dlm->node_num) {
    116		/* erf.  state changed after lock was dropped. */
    117		spin_unlock(&res->spinlock);
    118		dlm_error(status);
    119		return status;
    120	}
    121	__dlm_wait_on_lockres(res);
    122	__dlm_lockres_reserve_ast(res);
    123
    124	if (dlm_can_grant_new_lock(res, lock)) {
    125		mlog(0, "I can grant this lock right away\n");
    126		/* got it right away */
    127		lock->lksb->status = DLM_NORMAL;
    128		status = DLM_NORMAL;
    129		dlm_lock_get(lock);
    130		list_add_tail(&lock->list, &res->granted);
    131
    132		/* for the recovery lock, we can't allow the ast
    133		 * to be queued since the dlmthread is already
    134		 * frozen.  but the recovery lock is always locked
    135		 * with LKM_NOQUEUE so we do not need the ast in
    136		 * this special case */
    137		if (!dlm_is_recovery_lock(res->lockname.name,
    138					  res->lockname.len)) {
    139			kick_thread = 1;
    140			call_ast = 1;
    141		} else {
    142			mlog(0, "%s: returning DLM_NORMAL to "
    143			     "node %u for reco lock\n", dlm->name,
    144			     lock->ml.node);
    145		}
    146	} else {
    147		/* for NOQUEUE request, unless we get the
    148		 * lock right away, return DLM_NOTQUEUED */
    149		if (flags & LKM_NOQUEUE) {
    150			status = DLM_NOTQUEUED;
    151			if (dlm_is_recovery_lock(res->lockname.name,
    152						 res->lockname.len)) {
    153				mlog(0, "%s: returning NOTQUEUED to "
    154				     "node %u for reco lock\n", dlm->name,
    155				     lock->ml.node);
    156			}
    157		} else {
    158			status = DLM_NORMAL;
    159			dlm_lock_get(lock);
    160			list_add_tail(&lock->list, &res->blocked);
    161			kick_thread = 1;
    162		}
    163	}
    164
    165	spin_unlock(&res->spinlock);
    166	wake_up(&res->wq);
    167
    168	/* either queue the ast or release it */
    169	if (call_ast)
    170		dlm_queue_ast(dlm, lock);
    171	else
    172		dlm_lockres_release_ast(dlm, res);
    173
    174	dlm_lockres_calc_usage(dlm, res);
    175	if (kick_thread)
    176		dlm_kick_thread(dlm, res);
    177
    178	return status;
    179}
    180
    181void dlm_revert_pending_lock(struct dlm_lock_resource *res,
    182			     struct dlm_lock *lock)
    183{
    184	/* remove from local queue if it failed */
    185	list_del_init(&lock->list);
    186	lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
    187}
    188
    189
    190/*
    191 * locking:
    192 *   caller needs:  none
    193 *   taken:         takes and drops res->spinlock
    194 *   held on exit:  none
    195 * returns: DLM_DENIED, DLM_RECOVERING, or net status
    196 */
    197static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
    198				      struct dlm_lock_resource *res,
    199				      struct dlm_lock *lock, int flags)
    200{
    201	enum dlm_status status = DLM_DENIED;
    202	int lockres_changed = 1;
    203
    204	mlog(0, "type=%d, lockres %.*s, flags = 0x%x\n",
    205	     lock->ml.type, res->lockname.len,
    206	     res->lockname.name, flags);
    207
    208	/*
    209	 * Wait if resource is getting recovered, remastered, etc.
    210	 * If the resource was remastered and new owner is self, then exit.
    211	 */
    212	spin_lock(&res->spinlock);
    213	__dlm_wait_on_lockres(res);
    214	if (res->owner == dlm->node_num) {
    215		spin_unlock(&res->spinlock);
    216		return DLM_RECOVERING;
    217	}
    218	res->state |= DLM_LOCK_RES_IN_PROGRESS;
    219
    220	/* add lock to local (secondary) queue */
    221	dlm_lock_get(lock);
    222	list_add_tail(&lock->list, &res->blocked);
    223	lock->lock_pending = 1;
    224	spin_unlock(&res->spinlock);
    225
    226	/* spec seems to say that you will get DLM_NORMAL when the lock
    227	 * has been queued, meaning we need to wait for a reply here. */
    228	status = dlm_send_remote_lock_request(dlm, res, lock, flags);
    229
    230	spin_lock(&res->spinlock);
    231	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
    232	lock->lock_pending = 0;
    233	if (status != DLM_NORMAL) {
    234		if (status == DLM_RECOVERING &&
    235		    dlm_is_recovery_lock(res->lockname.name,
    236					 res->lockname.len)) {
    237			/* recovery lock was mastered by dead node.
    238			 * we need to have calc_usage shoot down this
    239			 * lockres and completely remaster it. */
    240			mlog(0, "%s: recovery lock was owned by "
    241			     "dead node %u, remaster it now.\n",
    242			     dlm->name, res->owner);
    243		} else if (status != DLM_NOTQUEUED) {
    244			/*
    245			 * DO NOT call calc_usage, as this would unhash
    246			 * the remote lockres before we ever get to use
    247			 * it.  treat as if we never made any change to
    248			 * the lockres.
    249			 */
    250			lockres_changed = 0;
    251			dlm_error(status);
    252		}
    253		dlm_revert_pending_lock(res, lock);
    254		dlm_lock_put(lock);
    255	} else if (dlm_is_recovery_lock(res->lockname.name,
    256					res->lockname.len)) {
    257		/* special case for the $RECOVERY lock.
    258		 * there will never be an AST delivered to put
    259		 * this lock on the proper secondary queue
    260		 * (granted), so do it manually. */
    261		mlog(0, "%s: $RECOVERY lock for this node (%u) is "
    262		     "mastered by %u; got lock, manually granting (no ast)\n",
    263		     dlm->name, dlm->node_num, res->owner);
    264		list_move_tail(&lock->list, &res->granted);
    265	}
    266	spin_unlock(&res->spinlock);
    267
    268	if (lockres_changed)
    269		dlm_lockres_calc_usage(dlm, res);
    270
    271	wake_up(&res->wq);
    272	return status;
    273}
    274
    275
    276/* for remote lock creation.
    277 * locking:
    278 *   caller needs:  none, but need res->state & DLM_LOCK_RES_IN_PROGRESS
    279 *   taken:         none
    280 *   held on exit:  none
    281 * returns: DLM_NOLOCKMGR, or net status
    282 */
    283static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
    284					       struct dlm_lock_resource *res,
    285					       struct dlm_lock *lock, int flags)
    286{
    287	struct dlm_create_lock create;
    288	int tmpret, status = 0;
    289	enum dlm_status ret;
    290
    291	memset(&create, 0, sizeof(create));
    292	create.node_idx = dlm->node_num;
    293	create.requested_type = lock->ml.type;
    294	create.cookie = lock->ml.cookie;
    295	create.namelen = res->lockname.len;
    296	create.flags = cpu_to_be32(flags);
    297	memcpy(create.name, res->lockname.name, create.namelen);
    298
    299	tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
    300				    sizeof(create), res->owner, &status);
    301	if (tmpret >= 0) {
    302		ret = status;
    303		if (ret == DLM_REJECTED) {
    304			mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer "
    305			     "owned by node %u. That node is coming back up "
    306			     "currently.\n", dlm->name, create.namelen,
    307			     create.name, res->owner);
    308			dlm_print_one_lock_resource(res);
    309			BUG();
    310		}
    311	} else {
    312		mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to "
    313		     "node %u\n", dlm->name, create.namelen, create.name,
    314		     tmpret, res->owner);
    315		if (dlm_is_host_down(tmpret))
    316			ret = DLM_RECOVERING;
    317		else
    318			ret = dlm_err_to_dlm_status(tmpret);
    319	}
    320
    321	return ret;
    322}
    323
    324void dlm_lock_get(struct dlm_lock *lock)
    325{
    326	kref_get(&lock->lock_refs);
    327}
    328
    329void dlm_lock_put(struct dlm_lock *lock)
    330{
    331	kref_put(&lock->lock_refs, dlm_lock_release);
    332}
    333
    334static void dlm_lock_release(struct kref *kref)
    335{
    336	struct dlm_lock *lock;
    337
    338	lock = container_of(kref, struct dlm_lock, lock_refs);
    339
    340	BUG_ON(!list_empty(&lock->list));
    341	BUG_ON(!list_empty(&lock->ast_list));
    342	BUG_ON(!list_empty(&lock->bast_list));
    343	BUG_ON(lock->ast_pending);
    344	BUG_ON(lock->bast_pending);
    345
    346	dlm_lock_detach_lockres(lock);
    347
    348	if (lock->lksb_kernel_allocated) {
    349		mlog(0, "freeing kernel-allocated lksb\n");
    350		kfree(lock->lksb);
    351	}
    352	kmem_cache_free(dlm_lock_cache, lock);
    353}
    354
    355/* associate a lock with it's lockres, getting a ref on the lockres */
    356void dlm_lock_attach_lockres(struct dlm_lock *lock,
    357			     struct dlm_lock_resource *res)
    358{
    359	dlm_lockres_get(res);
    360	lock->lockres = res;
    361}
    362
    363/* drop ref on lockres, if there is still one associated with lock */
    364static void dlm_lock_detach_lockres(struct dlm_lock *lock)
    365{
    366	struct dlm_lock_resource *res;
    367
    368	res = lock->lockres;
    369	if (res) {
    370		lock->lockres = NULL;
    371		mlog(0, "removing lock's lockres reference\n");
    372		dlm_lockres_put(res);
    373	}
    374}
    375
    376static void dlm_init_lock(struct dlm_lock *newlock, int type,
    377			  u8 node, u64 cookie)
    378{
    379	INIT_LIST_HEAD(&newlock->list);
    380	INIT_LIST_HEAD(&newlock->ast_list);
    381	INIT_LIST_HEAD(&newlock->bast_list);
    382	spin_lock_init(&newlock->spinlock);
    383	newlock->ml.type = type;
    384	newlock->ml.convert_type = LKM_IVMODE;
    385	newlock->ml.highest_blocked = LKM_IVMODE;
    386	newlock->ml.node = node;
    387	newlock->ml.pad1 = 0;
    388	newlock->ml.list = 0;
    389	newlock->ml.flags = 0;
    390	newlock->ast = NULL;
    391	newlock->bast = NULL;
    392	newlock->astdata = NULL;
    393	newlock->ml.cookie = cpu_to_be64(cookie);
    394	newlock->ast_pending = 0;
    395	newlock->bast_pending = 0;
    396	newlock->convert_pending = 0;
    397	newlock->lock_pending = 0;
    398	newlock->unlock_pending = 0;
    399	newlock->cancel_pending = 0;
    400	newlock->lksb_kernel_allocated = 0;
    401
    402	kref_init(&newlock->lock_refs);
    403}
    404
    405struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
    406			       struct dlm_lockstatus *lksb)
    407{
    408	struct dlm_lock *lock;
    409	int kernel_allocated = 0;
    410
    411	lock = kmem_cache_zalloc(dlm_lock_cache, GFP_NOFS);
    412	if (!lock)
    413		return NULL;
    414
    415	if (!lksb) {
    416		/* zero memory only if kernel-allocated */
    417		lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
    418		if (!lksb) {
    419			kmem_cache_free(dlm_lock_cache, lock);
    420			return NULL;
    421		}
    422		kernel_allocated = 1;
    423	}
    424
    425	dlm_init_lock(lock, type, node, cookie);
    426	if (kernel_allocated)
    427		lock->lksb_kernel_allocated = 1;
    428	lock->lksb = lksb;
    429	lksb->lockid = lock;
    430	return lock;
    431}
    432
    433/* handler for lock creation net message
    434 * locking:
    435 *   caller needs:  none
    436 *   taken:         takes and drops res->spinlock
    437 *   held on exit:  none
    438 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
    439 */
    440int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
    441			    void **ret_data)
    442{
    443	struct dlm_ctxt *dlm = data;
    444	struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
    445	struct dlm_lock_resource *res = NULL;
    446	struct dlm_lock *newlock = NULL;
    447	struct dlm_lockstatus *lksb = NULL;
    448	enum dlm_status status = DLM_NORMAL;
    449	char *name;
    450	unsigned int namelen;
    451
    452	BUG_ON(!dlm);
    453
    454	if (!dlm_grab(dlm))
    455		return DLM_REJECTED;
    456
    457	name = create->name;
    458	namelen = create->namelen;
    459	status = DLM_REJECTED;
    460	if (!dlm_domain_fully_joined(dlm)) {
    461		mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
    462		     "sending a create_lock message for lock %.*s!\n",
    463		     dlm->name, create->node_idx, namelen, name);
    464		dlm_error(status);
    465		goto leave;
    466	}
    467
    468	status = DLM_IVBUFLEN;
    469	if (namelen > DLM_LOCKID_NAME_MAX) {
    470		dlm_error(status);
    471		goto leave;
    472	}
    473
    474	status = DLM_SYSERR;
    475	newlock = dlm_new_lock(create->requested_type,
    476			       create->node_idx,
    477			       be64_to_cpu(create->cookie), NULL);
    478	if (!newlock) {
    479		dlm_error(status);
    480		goto leave;
    481	}
    482
    483	lksb = newlock->lksb;
    484
    485	if (be32_to_cpu(create->flags) & LKM_GET_LVB) {
    486		lksb->flags |= DLM_LKSB_GET_LVB;
    487		mlog(0, "set DLM_LKSB_GET_LVB flag\n");
    488	}
    489
    490	status = DLM_IVLOCKID;
    491	res = dlm_lookup_lockres(dlm, name, namelen);
    492	if (!res) {
    493		dlm_error(status);
    494		goto leave;
    495	}
    496
    497	spin_lock(&res->spinlock);
    498	status = __dlm_lockres_state_to_status(res);
    499	spin_unlock(&res->spinlock);
    500
    501	if (status != DLM_NORMAL) {
    502		mlog(0, "lockres recovering/migrating/in-progress\n");
    503		goto leave;
    504	}
    505
    506	dlm_lock_attach_lockres(newlock, res);
    507
    508	status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags));
    509leave:
    510	if (status != DLM_NORMAL)
    511		if (newlock)
    512			dlm_lock_put(newlock);
    513
    514	if (res)
    515		dlm_lockres_put(res);
    516
    517	dlm_put(dlm);
    518
    519	return status;
    520}
    521
    522
    523/* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
    524static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
    525{
    526	u64 tmpnode = node_num;
    527
    528	/* shift single byte of node num into top 8 bits */
    529	tmpnode <<= 56;
    530
    531	spin_lock(&dlm_cookie_lock);
    532	*cookie = (dlm_next_cookie | tmpnode);
    533	if (++dlm_next_cookie & 0xff00000000000000ull) {
    534		mlog(0, "This node's cookie will now wrap!\n");
    535		dlm_next_cookie = 1;
    536	}
    537	spin_unlock(&dlm_cookie_lock);
    538}
    539
    540enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
    541			struct dlm_lockstatus *lksb, int flags,
    542			const char *name, int namelen, dlm_astlockfunc_t *ast,
    543			void *data, dlm_bastlockfunc_t *bast)
    544{
    545	enum dlm_status status;
    546	struct dlm_lock_resource *res = NULL;
    547	struct dlm_lock *lock = NULL;
    548	int convert = 0, recovery = 0;
    549
    550	/* yes this function is a mess.
    551	 * TODO: clean this up.  lots of common code in the
    552	 *       lock and convert paths, especially in the retry blocks */
    553	if (!lksb) {
    554		dlm_error(DLM_BADARGS);
    555		return DLM_BADARGS;
    556	}
    557
    558	status = DLM_BADPARAM;
    559	if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) {
    560		dlm_error(status);
    561		goto error;
    562	}
    563
    564	if (flags & ~LKM_VALID_FLAGS) {
    565		dlm_error(status);
    566		goto error;
    567	}
    568
    569	convert = (flags & LKM_CONVERT);
    570	recovery = (flags & LKM_RECOVERY);
    571
    572	if (recovery &&
    573	    (!dlm_is_recovery_lock(name, namelen) || convert) ) {
    574		dlm_error(status);
    575		goto error;
    576	}
    577	if (convert && (flags & LKM_LOCAL)) {
    578		mlog(ML_ERROR, "strange LOCAL convert request!\n");
    579		goto error;
    580	}
    581
    582	if (convert) {
    583		/* CONVERT request */
    584
    585		/* if converting, must pass in a valid dlm_lock */
    586		lock = lksb->lockid;
    587		if (!lock) {
    588			mlog(ML_ERROR, "NULL lock pointer in convert "
    589			     "request\n");
    590			goto error;
    591		}
    592
    593		res = lock->lockres;
    594		if (!res) {
    595			mlog(ML_ERROR, "NULL lockres pointer in convert "
    596			     "request\n");
    597			goto error;
    598		}
    599		dlm_lockres_get(res);
    600
    601		/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are
    602	 	 * static after the original lock call.  convert requests will
    603		 * ensure that everything is the same, or return DLM_BADARGS.
    604	 	 * this means that DLM_DENIED_NOASTS will never be returned.
    605	 	 */
    606		if (lock->lksb != lksb || lock->ast != ast ||
    607		    lock->bast != bast || lock->astdata != data) {
    608			status = DLM_BADARGS;
    609			mlog(ML_ERROR, "new args:  lksb=%p, ast=%p, bast=%p, "
    610			     "astdata=%p\n", lksb, ast, bast, data);
    611			mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, "
    612			     "astdata=%p\n", lock->lksb, lock->ast,
    613			     lock->bast, lock->astdata);
    614			goto error;
    615		}
    616retry_convert:
    617		dlm_wait_for_recovery(dlm);
    618
    619		if (res->owner == dlm->node_num)
    620			status = dlmconvert_master(dlm, res, lock, flags, mode);
    621		else
    622			status = dlmconvert_remote(dlm, res, lock, flags, mode);
    623		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
    624		    status == DLM_FORWARD) {
    625			/* for now, see how this works without sleeping
    626			 * and just retry right away.  I suspect the reco
    627			 * or migration will complete fast enough that
    628			 * no waiting will be necessary */
    629			mlog(0, "retrying convert with migration/recovery/"
    630			     "in-progress\n");
    631			msleep(100);
    632			goto retry_convert;
    633		}
    634	} else {
    635		u64 tmpcookie;
    636
    637		/* LOCK request */
    638		status = DLM_BADARGS;
    639		if (!name) {
    640			dlm_error(status);
    641			goto error;
    642		}
    643
    644		status = DLM_IVBUFLEN;
    645		if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) {
    646			dlm_error(status);
    647			goto error;
    648		}
    649
    650		dlm_get_next_cookie(dlm->node_num, &tmpcookie);
    651		lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb);
    652		if (!lock) {
    653			dlm_error(status);
    654			goto error;
    655		}
    656
    657		if (!recovery)
    658			dlm_wait_for_recovery(dlm);
    659
    660		/* find or create the lock resource */
    661		res = dlm_get_lock_resource(dlm, name, namelen, flags);
    662		if (!res) {
    663			status = DLM_IVLOCKID;
    664			dlm_error(status);
    665			goto error;
    666		}
    667
    668		mlog(0, "type=%d, flags = 0x%x\n", mode, flags);
    669		mlog(0, "creating lock: lock=%p res=%p\n", lock, res);
    670
    671		dlm_lock_attach_lockres(lock, res);
    672		lock->ast = ast;
    673		lock->bast = bast;
    674		lock->astdata = data;
    675
    676retry_lock:
    677		if (flags & LKM_VALBLK) {
    678			mlog(0, "LKM_VALBLK passed by caller\n");
    679
    680			/* LVB requests for non PR, PW or EX locks are
    681			 * ignored. */
    682			if (mode < LKM_PRMODE)
    683				flags &= ~LKM_VALBLK;
    684			else {
    685				flags |= LKM_GET_LVB;
    686				lock->lksb->flags |= DLM_LKSB_GET_LVB;
    687			}
    688		}
    689
    690		if (res->owner == dlm->node_num)
    691			status = dlmlock_master(dlm, res, lock, flags);
    692		else
    693			status = dlmlock_remote(dlm, res, lock, flags);
    694
    695		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
    696		    status == DLM_FORWARD) {
    697			msleep(100);
    698			if (recovery) {
    699				if (status != DLM_RECOVERING)
    700					goto retry_lock;
    701				/* wait to see the node go down, then
    702				 * drop down and allow the lockres to
    703				 * get cleaned up.  need to remaster. */
    704				dlm_wait_for_node_death(dlm, res->owner,
    705						DLM_NODE_DEATH_WAIT_MAX);
    706			} else {
    707				dlm_wait_for_recovery(dlm);
    708				goto retry_lock;
    709			}
    710		}
    711
    712		/* Inflight taken in dlm_get_lock_resource() is dropped here */
    713		spin_lock(&res->spinlock);
    714		dlm_lockres_drop_inflight_ref(dlm, res);
    715		spin_unlock(&res->spinlock);
    716
    717		dlm_lockres_calc_usage(dlm, res);
    718		dlm_kick_thread(dlm, res);
    719
    720		if (status != DLM_NORMAL) {
    721			lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
    722			if (status != DLM_NOTQUEUED)
    723				dlm_error(status);
    724			goto error;
    725		}
    726	}
    727
    728error:
    729	if (status != DLM_NORMAL) {
    730		if (lock && !convert)
    731			dlm_lock_put(lock);
    732		// this is kind of unnecessary
    733		lksb->status = status;
    734	}
    735
    736	/* put lockres ref from the convert path
    737	 * or from dlm_get_lock_resource */
    738	if (res)
    739		dlm_lockres_put(res);
    740
    741	return status;
    742}
    743EXPORT_SYMBOL_GPL(dlmlock);