cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

dlmglue.c (128654B)


      1// SPDX-License-Identifier: GPL-2.0-or-later
      2/*
      3 * dlmglue.c
      4 *
      5 * Code which implements an OCFS2 specific interface to our DLM.
      6 *
      7 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
      8 */
      9
     10#include <linux/types.h>
     11#include <linux/slab.h>
     12#include <linux/highmem.h>
     13#include <linux/mm.h>
     14#include <linux/kthread.h>
     15#include <linux/pagemap.h>
     16#include <linux/debugfs.h>
     17#include <linux/seq_file.h>
     18#include <linux/time.h>
     19#include <linux/delay.h>
     20#include <linux/quotaops.h>
     21#include <linux/sched/signal.h>
     22
     23#define MLOG_MASK_PREFIX ML_DLM_GLUE
     24#include <cluster/masklog.h>
     25
     26#include "ocfs2.h"
     27#include "ocfs2_lockingver.h"
     28
     29#include "alloc.h"
     30#include "dcache.h"
     31#include "dlmglue.h"
     32#include "extent_map.h"
     33#include "file.h"
     34#include "heartbeat.h"
     35#include "inode.h"
     36#include "journal.h"
     37#include "stackglue.h"
     38#include "slot_map.h"
     39#include "super.h"
     40#include "uptodate.h"
     41#include "quota.h"
     42#include "refcounttree.h"
     43#include "acl.h"
     44
     45#include "buffer_head_io.h"
     46
/*
 * A waiter for a particular state of a lockres's l_flags.
 *
 * lockres_set_flags() walks lockres->l_mask_waiters and completes a
 * waiter once (l_flags & mw_mask) == mw_goal.
 */
struct ocfs2_mask_waiter {
	/* Link in lockres->l_mask_waiters. */
	struct list_head	mw_item;
	/* Set to 0 by lockres_set_flags() when the goal is reached. */
	int			mw_status;
	/* Completed by lockres_set_flags() when the goal is reached. */
	struct completion	mw_complete;
	/* The l_flags bits this waiter cares about. */
	unsigned long		mw_mask;
	/* The value the masked bits must have for the waiter to be woken. */
	unsigned long		mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
	/* ktime_get() timestamp recorded by ocfs2_init_start_time(). */
	ktime_t			mw_lock_start;
#endif
};
     57
/*
 * ->get_osb callbacks. They are defined further down but are referenced
 * by the lock ops tables before their definitions.
 */
static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);
     62
/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock().
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock() callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};
     77
/*
 * Pairs a requeue flag with an ocfs2_unblock_action value.
 * NOTE(review): no user of this struct is visible in this part of the
 * file; presumably it carries the outcome of unblocking a lock -- confirm
 * against the callers.
 */
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};
     82
/*
 * Lockdep class keys: one per lock type, indexed by enum ocfs2_lock_type.
 * ocfs2_lock_res_init_common() hands &lockdep_keys[type] to
 * lockdep_init_map().
 */
#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];
#endif
     87
/*
 * Forward declarations of the per-lock-type callbacks that are plugged
 * into the ocfs2_lock_res_ops tables below.
 */

/* Inode (meta) lock callbacks. */
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

/* Dentry lock callbacks. */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);

/* Quota info lock callback. */
static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

/* Refcount block lock callbacks. */
static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
					    int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
					 int blocking);
    107
    108#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
    109
    110/* This aids in debugging situations where a bad LVB might be involved. */
    111static void ocfs2_dump_meta_lvb_info(u64 level,
    112				     const char *function,
    113				     unsigned int line,
    114				     struct ocfs2_lock_res *lockres)
    115{
    116	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
    117
    118	mlog(level, "LVB information for %s (called from %s:%u):\n",
    119	     lockres->l_name, function, line);
    120	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
    121	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
    122	     be32_to_cpu(lvb->lvb_igeneration));
    123	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
    124	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
    125	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
    126	     be16_to_cpu(lvb->lvb_imode));
    127	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
    128	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
    129	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
    130	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
    131	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
    132	     be32_to_cpu(lvb->lvb_iattr));
    133}
    134
    135
/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 *
 * Every callback is optional; a lock type leaves the ones it does not
 * need unset in its ops table.
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer.
	 * When it is not defined, ocfs2_get_lockres_osb() returns
	 * ->l_priv itself.
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert thread after a
	 * successful downconvert. The lockres will not be referenced
	 * after this callback is called, so it is safe to free
	 * memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow
	 * this structure.
	 */
	int flags;
};
    207
/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 *
 * NOTE(review): several ops tables in this file set this flag without
 * providing ->set_lvb (e.g. trim_fs, orphan_scan) -- confirm how the
 * callers treat a missing callback.
 */
#define LOCK_TYPE_USES_LVB		0x2
    223
/*
 * Per-lock-type operation tables.
 *
 * Callbacks that are not listed stay NULL.  In particular, a table
 * without ->get_osb relies on ocfs2_get_lockres_osb() returning
 * ->l_priv, so the matching init function must store the ocfs2_super
 * there.
 */

/* Inode read/write lock (OCFS2_LOCK_TYPE_RW); ->l_priv is the inode. */
static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

/* Inode metadata lock (OCFS2_LOCK_TYPE_META); ->l_priv is the inode. */
static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

/* Superblock lock; ->l_priv is the ocfs2_super. */
static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

/* Rename lock; ->l_priv is the ocfs2_super. */
static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

/* NFS sync lock; ->l_priv is the ocfs2_super. */
static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
	.flags		= 0,
};

/* Trim-fs lock; ->l_priv is the ocfs2_super. */
static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

/* Orphan scan lock; ->l_priv is the ocfs2_super. */
static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

/* Dentry lock; ->l_priv is the ocfs2_dentry_lock. */
static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

/* Inode open lock (OCFS2_LOCK_TYPE_OPEN); ->l_priv is the inode. */
static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

/* flock lock; ->l_priv is the ocfs2_file_private. */
static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
	.get_osb	= ocfs2_get_file_osb,
	.flags		= 0,
};

/* Quota info lock; ->l_priv is the ocfs2_mem_dqinfo. */
static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
	.set_lvb	= ocfs2_set_qinfo_lvb,
	.get_osb	= ocfs2_get_qinfo_osb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
};

/* Refcount block lock; ->l_priv is the ocfs2_super. */
static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
	.check_downconvert = ocfs2_check_refcount_downconvert,
	.downconvert_worker = ocfs2_refcount_convert_worker,
	.flags		= 0,
};
    285
    286static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
    287{
    288	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
    289		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
    290		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
    291}
    292
    293static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
    294{
    295	return container_of(lksb, struct ocfs2_lock_res, l_lksb);
    296}
    297
    298static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
    299{
    300	BUG_ON(!ocfs2_is_inode_lock(lockres));
    301
    302	return (struct inode *) lockres->l_priv;
    303}
    304
    305static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
    306{
    307	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
    308
    309	return (struct ocfs2_dentry_lock *)lockres->l_priv;
    310}
    311
    312static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
    313{
    314	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);
    315
    316	return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
    317}
    318
    319static inline struct ocfs2_refcount_tree *
    320ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
    321{
    322	return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
    323}
    324
    325static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
    326{
    327	if (lockres->l_ops->get_osb)
    328		return lockres->l_ops->get_osb(lockres);
    329
    330	return (struct ocfs2_super *)lockres->l_priv;
    331}
    332
/* Forward declarations for locking helpers defined later in this file. */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
/* Unlock worker; caller_ip is the address recorded for the caller. */
static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level, unsigned long caller_ip);
    342static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
    343					struct ocfs2_lock_res *lockres,
    344					int level)
    345{
    346	__ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
    347}
    348
/* Forward declarations for the generic AST/BAST handling helpers. */
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
    357#define ocfs2_log_dlm_error(_func, _err, _lockres) do {					\
    358	if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)				\
    359		mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",	\
    360		     _err, _func, _lockres->l_name);					\
    361	else										\
    362		mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",	\
    363		     _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,	\
    364		     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));		\
    365} while (0)
/* Forward declarations for the downconvert machinery and related helpers. */
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
				  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb,
				  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
				        struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres);
    384
    385
    386static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
    387				  u64 blkno,
    388				  u32 generation,
    389				  char *name)
    390{
    391	int len;
    392
    393	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
    394
    395	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
    396		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
    397		       (long long)blkno, generation);
    398
    399	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
    400
    401	mlog(0, "built lock resource with name: %s\n", name);
    402}
    403
/*
 * Taken around the l_debug_list updates in ocfs2_add_lockres_tracking()
 * and ocfs2_remove_lockres_tracking().
 */
static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
    405
    406static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
    407				       struct ocfs2_dlm_debug *dlm_debug)
    408{
    409	mlog(0, "Add tracking for lockres %s\n", res->l_name);
    410
    411	spin_lock(&ocfs2_dlm_tracking_lock);
    412	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
    413	spin_unlock(&ocfs2_dlm_tracking_lock);
    414}
    415
    416static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
    417{
    418	spin_lock(&ocfs2_dlm_tracking_lock);
    419	if (!list_empty(&res->l_debug_list))
    420		list_del_init(&res->l_debug_list);
    421	spin_unlock(&ocfs2_dlm_tracking_lock);
    422}
    423
    424#ifdef CONFIG_OCFS2_FS_STATS
    425static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
    426{
    427	res->l_lock_refresh = 0;
    428	res->l_lock_wait = 0;
    429	memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats));
    430	memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats));
    431}
    432
    433static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
    434				    struct ocfs2_mask_waiter *mw, int ret)
    435{
    436	u32 usec;
    437	ktime_t kt;
    438	struct ocfs2_lock_stats *stats;
    439
    440	if (level == LKM_PRMODE)
    441		stats = &res->l_lock_prmode;
    442	else if (level == LKM_EXMODE)
    443		stats = &res->l_lock_exmode;
    444	else
    445		return;
    446
    447	kt = ktime_sub(ktime_get(), mw->mw_lock_start);
    448	usec = ktime_to_us(kt);
    449
    450	stats->ls_gets++;
    451	stats->ls_total += ktime_to_ns(kt);
    452	/* overflow */
    453	if (unlikely(stats->ls_gets == 0)) {
    454		stats->ls_gets++;
    455		stats->ls_total = ktime_to_ns(kt);
    456	}
    457
    458	if (stats->ls_max < usec)
    459		stats->ls_max = usec;
    460
    461	if (ret)
    462		stats->ls_fail++;
    463
    464	stats->ls_last = ktime_to_us(ktime_get_real());
    465}
    466
    467static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
    468{
    469	lockres->l_lock_refresh++;
    470}
    471
    472static inline void ocfs2_track_lock_wait(struct ocfs2_lock_res *lockres)
    473{
    474	struct ocfs2_mask_waiter *mw;
    475
    476	if (list_empty(&lockres->l_mask_waiters)) {
    477		lockres->l_lock_wait = 0;
    478		return;
    479	}
    480
    481	mw = list_first_entry(&lockres->l_mask_waiters,
    482				struct ocfs2_mask_waiter, mw_item);
    483	lockres->l_lock_wait =
    484			ktime_to_us(ktime_mono_to_real(mw->mw_lock_start));
    485}
    486
    487static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
    488{
    489	mw->mw_lock_start = ktime_get();
    490}
    491#else
/*
 * No-op versions of the lock statistics helpers, used when
 * CONFIG_OCFS2_FS_STATS is not set so callers need no #ifdefs.
 */
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
			   int level, struct ocfs2_mask_waiter *mw, int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_track_lock_wait(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
    508#endif
    509
    510static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
    511				       struct ocfs2_lock_res *res,
    512				       enum ocfs2_lock_type type,
    513				       struct ocfs2_lock_res_ops *ops,
    514				       void *priv)
    515{
    516	res->l_type          = type;
    517	res->l_ops           = ops;
    518	res->l_priv          = priv;
    519
    520	res->l_level         = DLM_LOCK_IV;
    521	res->l_requested     = DLM_LOCK_IV;
    522	res->l_blocking      = DLM_LOCK_IV;
    523	res->l_action        = OCFS2_AST_INVALID;
    524	res->l_unlock_action = OCFS2_UNLOCK_INVALID;
    525
    526	res->l_flags         = OCFS2_LOCK_INITIALIZED;
    527
    528	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
    529
    530	ocfs2_init_lock_stats(res);
    531#ifdef CONFIG_DEBUG_LOCK_ALLOC
    532	if (type != OCFS2_LOCK_TYPE_OPEN)
    533		lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
    534				 &lockdep_keys[type], 0);
    535	else
    536		res->l_lockdep_map.key = NULL;
    537#endif
    538}
    539
    540void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
    541{
    542	/* This also clears out the lock status block */
    543	memset(res, 0, sizeof(struct ocfs2_lock_res));
    544	spin_lock_init(&res->l_lock);
    545	init_waitqueue_head(&res->l_event);
    546	INIT_LIST_HEAD(&res->l_blocked_list);
    547	INIT_LIST_HEAD(&res->l_mask_waiters);
    548	INIT_LIST_HEAD(&res->l_holders);
    549}
    550
    551void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
    552			       enum ocfs2_lock_type type,
    553			       unsigned int generation,
    554			       struct inode *inode)
    555{
    556	struct ocfs2_lock_res_ops *ops;
    557
    558	switch(type) {
    559		case OCFS2_LOCK_TYPE_RW:
    560			ops = &ocfs2_inode_rw_lops;
    561			break;
    562		case OCFS2_LOCK_TYPE_META:
    563			ops = &ocfs2_inode_inode_lops;
    564			break;
    565		case OCFS2_LOCK_TYPE_OPEN:
    566			ops = &ocfs2_inode_open_lops;
    567			break;
    568		default:
    569			mlog_bug_on_msg(1, "type: %d\n", type);
    570			ops = NULL; /* thanks, gcc */
    571			break;
    572	}
    573
    574	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
    575			      generation, res->l_name);
    576	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
    577}
    578
    579static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
    580{
    581	struct inode *inode = ocfs2_lock_res_inode(lockres);
    582
    583	return OCFS2_SB(inode->i_sb);
    584}
    585
    586static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
    587{
    588	struct ocfs2_mem_dqinfo *info = lockres->l_priv;
    589
    590	return OCFS2_SB(info->dqi_gi.dqi_sb);
    591}
    592
    593static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
    594{
    595	struct ocfs2_file_private *fp = lockres->l_priv;
    596
    597	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
    598}
    599
    600static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
    601{
    602	__be64 inode_blkno_be;
    603
    604	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
    605	       sizeof(__be64));
    606
    607	return be64_to_cpu(inode_blkno_be);
    608}
    609
    610static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
    611{
    612	struct ocfs2_dentry_lock *dl = lockres->l_priv;
    613
    614	return OCFS2_SB(dl->dl_inode->i_sb);
    615}
    616
    617void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
    618				u64 parent, struct inode *inode)
    619{
    620	int len;
    621	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
    622	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
    623	struct ocfs2_lock_res *lockres = &dl->dl_lockres;
    624
    625	ocfs2_lock_res_init_once(lockres);
    626
    627	/*
    628	 * Unfortunately, the standard lock naming scheme won't work
    629	 * here because we have two 16 byte values to use. Instead,
    630	 * we'll stuff the inode number as a binary value. We still
    631	 * want error prints to show something without garbling the
    632	 * display, so drop a null byte in there before the inode
    633	 * number. A future version of OCFS2 will likely use all
    634	 * binary lock names. The stringified names have been a
    635	 * tremendous aid in debugging, but now that the debugfs
    636	 * interface exists, we can mangle things there if need be.
    637	 *
    638	 * NOTE: We also drop the standard "pad" value (the total lock
    639	 * name size stays the same though - the last part is all
    640	 * zeros due to the memset in ocfs2_lock_res_init_once()
    641	 */
    642	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
    643		       "%c%016llx",
    644		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
    645		       (long long)parent);
    646
    647	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
    648
    649	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
    650	       sizeof(__be64));
    651
    652	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
    653				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
    654				   dl);
    655}
    656
    657static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
    658				      struct ocfs2_super *osb)
    659{
    660	/* Superblock lockres doesn't come from a slab so we call init
    661	 * once on it manually.  */
    662	ocfs2_lock_res_init_once(res);
    663	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
    664			      0, res->l_name);
    665	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
    666				   &ocfs2_super_lops, osb);
    667}
    668
    669static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
    670				       struct ocfs2_super *osb)
    671{
    672	/* Rename lockres doesn't come from a slab so we call init
    673	 * once on it manually.  */
    674	ocfs2_lock_res_init_once(res);
    675	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
    676	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
    677				   &ocfs2_rename_lops, osb);
    678}
    679
    680static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
    681					 struct ocfs2_super *osb)
    682{
    683	/* nfs_sync lockres doesn't come from a slab so we call init
    684	 * once on it manually.  */
    685	ocfs2_lock_res_init_once(res);
    686	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
    687	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
    688				   &ocfs2_nfs_sync_lops, osb);
    689}
    690
    691static void ocfs2_nfs_sync_lock_init(struct ocfs2_super *osb)
    692{
    693	ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
    694	init_rwsem(&osb->nfs_sync_rwlock);
    695}
    696
    697void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb)
    698{
    699	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
    700
    701	/* Only one trimfs thread are allowed to work at the same time. */
    702	mutex_lock(&osb->obs_trim_fs_mutex);
    703
    704	ocfs2_lock_res_init_once(lockres);
    705	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name);
    706	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS,
    707				   &ocfs2_trim_fs_lops, osb);
    708}
    709
    710void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb)
    711{
    712	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
    713
    714	ocfs2_simple_drop_lockres(osb, lockres);
    715	ocfs2_lock_res_free(lockres);
    716
    717	mutex_unlock(&osb->obs_trim_fs_mutex);
    718}
    719
    720static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
    721					    struct ocfs2_super *osb)
    722{
    723	ocfs2_lock_res_init_once(res);
    724	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
    725	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
    726				   &ocfs2_orphan_scan_lops, osb);
    727}
    728
    729void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
    730			      struct ocfs2_file_private *fp)
    731{
    732	struct inode *inode = fp->fp_file->f_mapping->host;
    733	struct ocfs2_inode_info *oi = OCFS2_I(inode);
    734
    735	ocfs2_lock_res_init_once(lockres);
    736	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
    737			      inode->i_generation, lockres->l_name);
    738	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
    739				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
    740				   fp);
    741	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
    742}
    743
    744void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
    745			       struct ocfs2_mem_dqinfo *info)
    746{
    747	ocfs2_lock_res_init_once(lockres);
    748	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
    749			      0, lockres->l_name);
    750	ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
    751				   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
    752				   info);
    753}
    754
    755void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
    756				  struct ocfs2_super *osb, u64 ref_blkno,
    757				  unsigned int generation)
    758{
    759	ocfs2_lock_res_init_once(lockres);
    760	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
    761			      generation, lockres->l_name);
    762	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
    763				   &ocfs2_refcount_block_lops, osb);
    764}
    765
    766void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
    767{
    768	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
    769		return;
    770
    771	ocfs2_remove_lockres_tracking(res);
    772
    773	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
    774			"Lockres %s is on the blocked list\n",
    775			res->l_name);
    776	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
    777			"Lockres %s has mask waiters pending\n",
    778			res->l_name);
    779	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
    780			"Lockres %s is locked\n",
    781			res->l_name);
    782	mlog_bug_on_msg(res->l_ro_holders,
    783			"Lockres %s has %u ro holders\n",
    784			res->l_name, res->l_ro_holders);
    785	mlog_bug_on_msg(res->l_ex_holders,
    786			"Lockres %s has %u ex holders\n",
    787			res->l_name, res->l_ex_holders);
    788
    789	/* Need to clear out the lock status block for the dlm */
    790	memset(&res->l_lksb, 0, sizeof(res->l_lksb));
    791
    792	res->l_flags = 0UL;
    793}
    794
/*
 * Keep a list of processes that have an interest in a lockres.
 * Note: this is currently only used to check for recursive cluster locking.
 */
    799static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres,
    800				   struct ocfs2_lock_holder *oh)
    801{
    802	INIT_LIST_HEAD(&oh->oh_list);
    803	oh->oh_owner_pid = get_pid(task_pid(current));
    804
    805	spin_lock(&lockres->l_lock);
    806	list_add_tail(&oh->oh_list, &lockres->l_holders);
    807	spin_unlock(&lockres->l_lock);
    808}
    809
    810static struct ocfs2_lock_holder *
    811ocfs2_pid_holder(struct ocfs2_lock_res *lockres,
    812		struct pid *pid)
    813{
    814	struct ocfs2_lock_holder *oh;
    815
    816	spin_lock(&lockres->l_lock);
    817	list_for_each_entry(oh, &lockres->l_holders, oh_list) {
    818		if (oh->oh_owner_pid == pid) {
    819			spin_unlock(&lockres->l_lock);
    820			return oh;
    821		}
    822	}
    823	spin_unlock(&lockres->l_lock);
    824	return NULL;
    825}
    826
    827static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres,
    828				       struct ocfs2_lock_holder *oh)
    829{
    830	spin_lock(&lockres->l_lock);
    831	list_del(&oh->oh_list);
    832	spin_unlock(&lockres->l_lock);
    833
    834	put_pid(oh->oh_owner_pid);
    835}
    836
    837
    838static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
    839				     int level)
    840{
    841	BUG_ON(!lockres);
    842
    843	switch(level) {
    844	case DLM_LOCK_EX:
    845		lockres->l_ex_holders++;
    846		break;
    847	case DLM_LOCK_PR:
    848		lockres->l_ro_holders++;
    849		break;
    850	default:
    851		BUG();
    852	}
    853}
    854
    855static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
    856				     int level)
    857{
    858	BUG_ON(!lockres);
    859
    860	switch(level) {
    861	case DLM_LOCK_EX:
    862		BUG_ON(!lockres->l_ex_holders);
    863		lockres->l_ex_holders--;
    864		break;
    865	case DLM_LOCK_PR:
    866		BUG_ON(!lockres->l_ro_holders);
    867		lockres->l_ro_holders--;
    868		break;
    869	default:
    870		BUG();
    871	}
    872}
    873
    874/* WARNING: This function lives in a world where the only three lock
    875 * levels are EX, PR, and NL. It *will* have to be adjusted when more
    876 * lock types are added. */
    877static inline int ocfs2_highest_compat_lock_level(int level)
    878{
    879	int new_level = DLM_LOCK_EX;
    880
    881	if (level == DLM_LOCK_EX)
    882		new_level = DLM_LOCK_NL;
    883	else if (level == DLM_LOCK_PR)
    884		new_level = DLM_LOCK_PR;
    885	return new_level;
    886}
    887
    888static void lockres_set_flags(struct ocfs2_lock_res *lockres,
    889			      unsigned long newflags)
    890{
    891	struct ocfs2_mask_waiter *mw, *tmp;
    892
    893 	assert_spin_locked(&lockres->l_lock);
    894
    895	lockres->l_flags = newflags;
    896
    897	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
    898		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
    899			continue;
    900
    901		list_del_init(&mw->mw_item);
    902		mw->mw_status = 0;
    903		complete(&mw->mw_complete);
    904		ocfs2_track_lock_wait(lockres);
    905	}
    906}
    907static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
    908{
    909	lockres_set_flags(lockres, lockres->l_flags | or);
    910}
    911static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
    912				unsigned long clear)
    913{
    914	lockres_set_flags(lockres, lockres->l_flags & ~clear);
    915}
    916
    917static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
    918{
    919	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
    920	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
    921	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
    922	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
    923
    924	lockres->l_level = lockres->l_requested;
    925	if (lockres->l_level <=
    926	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
    927		lockres->l_blocking = DLM_LOCK_NL;
    928		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
    929	}
    930	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
    931}
    932
    933static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
    934{
    935	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
    936	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
    937
    938	/* Convert from RO to EX doesn't really need anything as our
    939	 * information is already up to data. Convert from NL to
    940	 * *anything* however should mark ourselves as needing an
    941	 * update */
    942	if (lockres->l_level == DLM_LOCK_NL &&
    943	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
    944		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
    945
    946	lockres->l_level = lockres->l_requested;
    947
    948	/*
    949	 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
    950	 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
    951	 * downconverting the lock before the upconvert has fully completed.
    952	 * Do not prevent the dc thread from downconverting if NONBLOCK lock
    953	 * had already returned.
    954	 */
    955	if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED))
    956		lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
    957	else
    958		lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED);
    959
    960	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
    961}
    962
    963static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
    964{
    965	BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
    966	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
    967
    968	if (lockres->l_requested > DLM_LOCK_NL &&
    969	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
    970	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
    971		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
    972
    973	lockres->l_level = lockres->l_requested;
    974	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
    975	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
    976}
    977
    978static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
    979				     int level)
    980{
    981	int needs_downconvert = 0;
    982
    983	assert_spin_locked(&lockres->l_lock);
    984
    985	if (level > lockres->l_blocking) {
    986		/* only schedule a downconvert if we haven't already scheduled
    987		 * one that goes low enough to satisfy the level we're
    988		 * blocking.  this also catches the case where we get
    989		 * duplicate BASTs */
    990		if (ocfs2_highest_compat_lock_level(level) <
    991		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
    992			needs_downconvert = 1;
    993
    994		lockres->l_blocking = level;
    995	}
    996
    997	mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
    998	     lockres->l_name, level, lockres->l_level, lockres->l_blocking,
    999	     needs_downconvert);
   1000
   1001	if (needs_downconvert)
   1002		lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
   1003	mlog(0, "needs_downconvert = %d\n", needs_downconvert);
   1004	return needs_downconvert;
   1005}
   1006
   1007/*
   1008 * OCFS2_LOCK_PENDING and l_pending_gen.
   1009 *
   1010 * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
   1011 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
   1012 * for more details on the race.
   1013 *
 * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
 * a race of its own.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again.  If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
   1021 *
   1022 * But what if another path (eg downconvert thread) has just started a
   1023 * new locking action?  The other path has re-set PENDING.  Our path
   1024 * cannot clear PENDING, because that will re-open the original race
   1025 * window.
   1026 *
   1027 * [Example]
   1028 *
   1029 * ocfs2_meta_lock()
   1030 *  ocfs2_cluster_lock()
   1031 *   set BUSY
   1032 *   set PENDING
   1033 *   drop l_lock
   1034 *   ocfs2_dlm_lock()
   1035 *    ocfs2_locking_ast()		ocfs2_downconvert_thread()
   1036 *     clear PENDING			 ocfs2_unblock_lock()
   1037 *					  take_l_lock
   1038 *					  !BUSY
   1039 *					  ocfs2_prepare_downconvert()
   1040 *					   set BUSY
   1041 *					   set PENDING
   1042 *					  drop l_lock
   1043 *   take l_lock
   1044 *   clear PENDING
   1045 *   drop l_lock
   1046 *			<window>
   1047 *					  ocfs2_dlm_lock()
   1048 *
   1049 * So as you can see, we now have a window where l_lock is not held,
   1050 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
   1051 *
   1052 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
   1053 * set by ocfs2_prepare_downconvert().  That wasn't nice.
   1054 *
   1055 * To solve this we introduce l_pending_gen.  A call to
   1056 * lockres_clear_pending() will only do so when it is passed a generation
   1057 * number that matches the lockres.  lockres_set_pending() will return the
   1058 * current generation number.  When ocfs2_cluster_lock() goes to clear
   1059 * PENDING, it passes the generation it got from set_pending().  In our
   1060 * example above, the generation numbers will *not* match.  Thus,
   1061 * ocfs2_cluster_lock() will not clear the PENDING set by
   1062 * ocfs2_prepare_downconvert().
   1063 */
   1064
   1065/* Unlocked version for ocfs2_locking_ast() */
   1066static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
   1067				    unsigned int generation,
   1068				    struct ocfs2_super *osb)
   1069{
   1070	assert_spin_locked(&lockres->l_lock);
   1071
   1072	/*
   1073	 * The ast and locking functions can race us here.  The winner
   1074	 * will clear pending, the loser will not.
   1075	 */
   1076	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
   1077	    (lockres->l_pending_gen != generation))
   1078		return;
   1079
   1080	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
   1081	lockres->l_pending_gen++;
   1082
   1083	/*
   1084	 * The downconvert thread may have skipped us because we
   1085	 * were PENDING.  Wake it up.
   1086	 */
   1087	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
   1088		ocfs2_wake_downconvert_thread(osb);
   1089}
   1090
   1091/* Locked version for callers of ocfs2_dlm_lock() */
   1092static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
   1093				  unsigned int generation,
   1094				  struct ocfs2_super *osb)
   1095{
   1096	unsigned long flags;
   1097
   1098	spin_lock_irqsave(&lockres->l_lock, flags);
   1099	__lockres_clear_pending(lockres, generation, osb);
   1100	spin_unlock_irqrestore(&lockres->l_lock, flags);
   1101}
   1102
   1103static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
   1104{
   1105	assert_spin_locked(&lockres->l_lock);
   1106	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
   1107
   1108	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
   1109
   1110	return lockres->l_pending_gen;
   1111}
   1112
   1113static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
   1114{
   1115	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
   1116	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
   1117	int needs_downconvert;
   1118	unsigned long flags;
   1119
   1120	BUG_ON(level <= DLM_LOCK_NL);
   1121
   1122	mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
   1123	     "type %s\n", lockres->l_name, level, lockres->l_level,
   1124	     ocfs2_lock_type_string(lockres->l_type));
   1125
   1126	/*
   1127	 * We can skip the bast for locks which don't enable caching -
   1128	 * they'll be dropped at the earliest possible time anyway.
   1129	 */
   1130	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
   1131		return;
   1132
   1133	spin_lock_irqsave(&lockres->l_lock, flags);
   1134	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
   1135	if (needs_downconvert)
   1136		ocfs2_schedule_blocked_lock(osb, lockres);
   1137	spin_unlock_irqrestore(&lockres->l_lock, flags);
   1138
   1139	wake_up(&lockres->l_event);
   1140
   1141	ocfs2_wake_downconvert_thread(osb);
   1142}
   1143
   1144static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
   1145{
   1146	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
   1147	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
   1148	unsigned long flags;
   1149	int status;
   1150
   1151	spin_lock_irqsave(&lockres->l_lock, flags);
   1152
   1153	status = ocfs2_dlm_lock_status(&lockres->l_lksb);
   1154
   1155	if (status == -EAGAIN) {
   1156		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
   1157		goto out;
   1158	}
   1159
   1160	if (status) {
   1161		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
   1162		     lockres->l_name, status);
   1163		spin_unlock_irqrestore(&lockres->l_lock, flags);
   1164		return;
   1165	}
   1166
   1167	mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
   1168	     "level %d => %d\n", lockres->l_name, lockres->l_action,
   1169	     lockres->l_unlock_action, lockres->l_level, lockres->l_requested);
   1170
   1171	switch(lockres->l_action) {
   1172	case OCFS2_AST_ATTACH:
   1173		ocfs2_generic_handle_attach_action(lockres);
   1174		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
   1175		break;
   1176	case OCFS2_AST_CONVERT:
   1177		ocfs2_generic_handle_convert_action(lockres);
   1178		break;
   1179	case OCFS2_AST_DOWNCONVERT:
   1180		ocfs2_generic_handle_downconvert_action(lockres);
   1181		break;
   1182	default:
   1183		mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
   1184		     "flags 0x%lx, unlock: %u\n",
   1185		     lockres->l_name, lockres->l_action, lockres->l_flags,
   1186		     lockres->l_unlock_action);
   1187		BUG();
   1188	}
   1189out:
   1190	/* set it to something invalid so if we get called again we
   1191	 * can catch it. */
   1192	lockres->l_action = OCFS2_AST_INVALID;
   1193
   1194	/* Did we try to cancel this lock?  Clear that state */
   1195	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
   1196		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
   1197
   1198	/*
   1199	 * We may have beaten the locking functions here.  We certainly
   1200	 * know that dlm_lock() has been called :-)
   1201	 * Because we can't have two lock calls in flight at once, we
   1202	 * can use lockres->l_pending_gen.
   1203	 */
   1204	__lockres_clear_pending(lockres, lockres->l_pending_gen,  osb);
   1205
   1206	wake_up(&lockres->l_event);
   1207	spin_unlock_irqrestore(&lockres->l_lock, flags);
   1208}
   1209
   1210static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
   1211{
   1212	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
   1213	unsigned long flags;
   1214
   1215	mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
   1216	     lockres->l_name, lockres->l_unlock_action);
   1217
   1218	spin_lock_irqsave(&lockres->l_lock, flags);
   1219	if (error) {
   1220		mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
   1221		     "unlock_action %d\n", error, lockres->l_name,
   1222		     lockres->l_unlock_action);
   1223		spin_unlock_irqrestore(&lockres->l_lock, flags);
   1224		return;
   1225	}
   1226
   1227	switch(lockres->l_unlock_action) {
   1228	case OCFS2_UNLOCK_CANCEL_CONVERT:
   1229		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
   1230		lockres->l_action = OCFS2_AST_INVALID;
   1231		/* Downconvert thread may have requeued this lock, we
   1232		 * need to wake it. */
   1233		if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
   1234			ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
   1235		break;
   1236	case OCFS2_UNLOCK_DROP_LOCK:
   1237		lockres->l_level = DLM_LOCK_IV;
   1238		break;
   1239	default:
   1240		BUG();
   1241	}
   1242
   1243	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
   1244	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
   1245	wake_up(&lockres->l_event);
   1246	spin_unlock_irqrestore(&lockres->l_lock, flags);
   1247}
   1248
/*
 * This is the filesystem locking protocol.  It provides the lock handling
 * hooks for the underlying DLM.  It has a maximum version number.
 * The version number allows interoperability with systems running at
 * the same major number and an equal or smaller minor number.
 *
 * Whenever the filesystem does new things with locks (adds or removes a
 * lock, orders them differently, does different things underneath a lock),
 * the version must be changed.  The protocol is negotiated when joining
 * the dlm domain.  A node may join the domain if its major version is
 * identical to all other nodes and its minor version is greater than
 * or equal to all other nodes.  When its minor version is greater than
 * the other nodes, it will run at the minor version specified by the
 * other nodes.
 *
 * If a locking change is made that will not be compatible with older
 * versions, the major number must be increased and the minor version set
 * to zero.  If a change merely adds a behavior that can be disabled when
 * speaking to older versions, the minor version must be increased.  If a
 * change adds a fully backwards compatible change (eg, LVB changes that
 * are just ignored by older versions), the version does not need to be
 * updated.
 */
static struct ocfs2_locking_protocol lproto = {
	/* Highest protocol version this code can speak. */
	.lp_max_version = {
		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
	},
	/* Completion of attach / convert / downconvert requests. */
	.lp_lock_ast		= ocfs2_locking_ast,
	/* Notification that our lock is blocking another request. */
	.lp_blocking_ast	= ocfs2_blocking_ast,
	/* Completion of cancel-convert and drop-lock requests. */
	.lp_unlock_ast		= ocfs2_unlock_ast,
};
   1281
/*
 * Hand the maximum locking protocol version of this filesystem
 * (lproto.lp_max_version) to the stack glue layer.
 */
void ocfs2_set_locking_protocol(void)
{
	ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
}
   1286
   1287static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
   1288						int convert)
   1289{
   1290	unsigned long flags;
   1291
   1292	spin_lock_irqsave(&lockres->l_lock, flags);
   1293	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
   1294	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
   1295	if (convert)
   1296		lockres->l_action = OCFS2_AST_INVALID;
   1297	else
   1298		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
   1299	spin_unlock_irqrestore(&lockres->l_lock, flags);
   1300
   1301	wake_up(&lockres->l_event);
   1302}
   1303
   1304/* Note: If we detect another process working on the lock (i.e.,
   1305 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
   1306 * to do the right thing in that case.
   1307 */
   1308static int ocfs2_lock_create(struct ocfs2_super *osb,
   1309			     struct ocfs2_lock_res *lockres,
   1310			     int level,
   1311			     u32 dlm_flags)
   1312{
   1313	int ret = 0;
   1314	unsigned long flags;
   1315	unsigned int gen;
   1316
   1317	mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
   1318	     dlm_flags);
   1319
   1320	spin_lock_irqsave(&lockres->l_lock, flags);
   1321	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
   1322	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
   1323		spin_unlock_irqrestore(&lockres->l_lock, flags);
   1324		goto bail;
   1325	}
   1326
   1327	lockres->l_action = OCFS2_AST_ATTACH;
   1328	lockres->l_requested = level;
   1329	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
   1330	gen = lockres_set_pending(lockres);
   1331	spin_unlock_irqrestore(&lockres->l_lock, flags);
   1332
   1333	ret = ocfs2_dlm_lock(osb->cconn,
   1334			     level,
   1335			     &lockres->l_lksb,
   1336			     dlm_flags,
   1337			     lockres->l_name,
   1338			     OCFS2_LOCK_ID_MAX_LEN - 1);
   1339	lockres_clear_pending(lockres, gen, osb);
   1340	if (ret) {
   1341		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
   1342		ocfs2_recover_from_dlm_error(lockres, 1);
   1343	}
   1344
   1345	mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);
   1346
   1347bail:
   1348	return ret;
   1349}
   1350
   1351static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
   1352					int flag)
   1353{
   1354	unsigned long flags;
   1355	int ret;
   1356
   1357	spin_lock_irqsave(&lockres->l_lock, flags);
   1358	ret = lockres->l_flags & flag;
   1359	spin_unlock_irqrestore(&lockres->l_lock, flags);
   1360
   1361	return ret;
   1362}
   1363
   1364static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
   1365
   1366{
   1367	wait_event(lockres->l_event,
   1368		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
   1369}
   1370
   1371static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
   1372
   1373{
   1374	wait_event(lockres->l_event,
   1375		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
   1376}
   1377
   1378/* predict what lock level we'll be dropping down to on behalf
   1379 * of another node, and return true if the currently wanted
   1380 * level will be compatible with it. */
   1381static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
   1382						     int wanted)
   1383{
   1384	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
   1385
   1386	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
   1387}
   1388
   1389static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
   1390{
   1391	INIT_LIST_HEAD(&mw->mw_item);
   1392	init_completion(&mw->mw_complete);
   1393	ocfs2_init_start_time(mw);
   1394}
   1395
   1396static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
   1397{
   1398	wait_for_completion(&mw->mw_complete);
   1399	/* Re-arm the completion in case we want to wait on it again */
   1400	reinit_completion(&mw->mw_complete);
   1401	return mw->mw_status;
   1402}
   1403
   1404static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
   1405				    struct ocfs2_mask_waiter *mw,
   1406				    unsigned long mask,
   1407				    unsigned long goal)
   1408{
   1409	BUG_ON(!list_empty(&mw->mw_item));
   1410
   1411	assert_spin_locked(&lockres->l_lock);
   1412
   1413	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
   1414	mw->mw_mask = mask;
   1415	mw->mw_goal = goal;
   1416	ocfs2_track_lock_wait(lockres);
   1417}
   1418
   1419/* returns 0 if the mw that was removed was already satisfied, -EBUSY
   1420 * if the mask still hadn't reached its goal */
   1421static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
   1422				      struct ocfs2_mask_waiter *mw)
   1423{
   1424	int ret = 0;
   1425
   1426	assert_spin_locked(&lockres->l_lock);
   1427	if (!list_empty(&mw->mw_item)) {
   1428		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
   1429			ret = -EBUSY;
   1430
   1431		list_del_init(&mw->mw_item);
   1432		init_completion(&mw->mw_complete);
   1433		ocfs2_track_lock_wait(lockres);
   1434	}
   1435
   1436	return ret;
   1437}
   1438
   1439static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
   1440				      struct ocfs2_mask_waiter *mw)
   1441{
   1442	unsigned long flags;
   1443	int ret = 0;
   1444
   1445	spin_lock_irqsave(&lockres->l_lock, flags);
   1446	ret = __lockres_remove_mask_waiter(lockres, mw);
   1447	spin_unlock_irqrestore(&lockres->l_lock, flags);
   1448
   1449	return ret;
   1450
   1451}
   1452
   1453static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
   1454					     struct ocfs2_lock_res *lockres)
   1455{
   1456	int ret;
   1457
   1458	ret = wait_for_completion_interruptible(&mw->mw_complete);
   1459	if (ret)
   1460		lockres_remove_mask_waiter(lockres, mw);
   1461	else
   1462		ret = mw->mw_status;
   1463	/* Re-arm the completion in case we want to wait on it again */
   1464	reinit_completion(&mw->mw_complete);
   1465	return ret;
   1466}
   1467
/*
 * Take a cluster lock on @lockres at @level and account for the caller
 * as a holder.
 *
 * The function loops (label "again") until the granted level covers
 * @level: it waits for BUSY or BLOCKED to clear via a mask waiter,
 * issues an attach or convert request through ocfs2_dlm_lock() when the
 * granted level is too low, and finally bumps the holder count.
 *
 * @lkm_flags is passed on to ocfs2_dlm_lock(); DLM_LKF_VALBLK is added
 * for lock types using an LVB and DLM_LKF_CONVERT is set or cleared
 * depending on whether the lockres is already attached.
 * @arg_flags is checked for OCFS2_LOCK_NONBLOCK (do not sleep on a mask
 * waiter) and, for lockdep only, OCFS2_META_LOCK_NOQUEUE.
 * @l_subclass and @caller_ip are used for the lockdep annotation only.
 *
 * Returns 0 on success, -EINVAL if the lockres is not initialized,
 * -ERESTARTSYS if a signal is pending while signals are being caught,
 * -EAGAIN if a NOQUEUE request was already attempted or a NONBLOCK
 * caller would have had to wait, or the error from ocfs2_dlm_lock().
 */
static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres,
				int level,
				u32 lkm_flags,
				int arg_flags,
				int l_subclass,
				unsigned long caller_ip)
{
	struct ocfs2_mask_waiter mw;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;
	unsigned int gen;
	int noqueue_attempted = 0;
	int dlm_locked = 0;
	int kick_dc = 0;

	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) {
		mlog_errno(-EINVAL);
		return -EINVAL;
	}

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

again:
	wait = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto unlock;
	}

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
		/*
		 * We've upconverted. If the lock now has a level we can
		 * work with, we take it. If, however, the lock is not at the
		 * required level, we go thru the full cycle. One way this could
		 * happen is if a process requesting an upconvert to PR is
		 * closely followed by another requesting upconvert to an EX.
		 * If the process requesting EX lands here, we want it to
		 * continue attempting to upconvert and let the process
		 * requesting PR take the lock.
		 * If multiple processes request upconvert to PR, the first one
		 * here will take the lock. The others will have to go thru the
		 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
		 * downconvert request.
		 */
		if (level <= lockres->l_level)
			goto update_holders;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		/* A NOQUEUE request is only sent to the dlm once. */
		if (noqueue_attempted > 0) {
			ret = -EAGAIN;
			goto unlock;
		}
		if (lkm_flags & DLM_LKF_NOQUEUE)
			noqueue_attempted = 1;

		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		/* First request on this lockres attaches; later ones convert. */
		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
			lockres->l_action = OCFS2_AST_ATTACH;
			lkm_flags &= ~DLM_LKF_CONVERT;
		} else {
			lockres->l_action = OCFS2_AST_CONVERT;
			lkm_flags |= DLM_LKF_CONVERT;
		}

		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		gen = lockres_set_pending(lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		ret = ocfs2_dlm_lock(osb->cconn,
				     level,
				     &lockres->l_lksb,
				     lkm_flags,
				     lockres->l_name,
				     OCFS2_LOCK_ID_MAX_LEN - 1);
		/* Only clears PENDING if our generation still matches. */
		lockres_clear_pending(lockres, gen, osb);
		if (ret) {
			/* -EAGAIN on a NOQUEUE request is not logged. */
			if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
			    (ret != -EAGAIN)) {
				ocfs2_log_dlm_error("ocfs2_dlm_lock",
						    ret, lockres);
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}
		dlm_locked = 1;

		mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

update_holders:
	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

	/* ocfs2_unblock_lock requeues on seeing OCFS2_LOCK_UPCONVERT_FINISHING */
	kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED);

	spin_unlock_irqrestore(&lockres->l_lock, flags);
	if (kick_dc)
		ocfs2_wake_downconvert_thread(osb);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		spin_lock_irqsave(&lockres->l_lock, flags);
		if (__lockres_remove_mask_waiter(lockres, &mw)) {
			/*
			 * The waiter was not satisfied: give up with -EAGAIN.
			 * If a dlm request is in flight, leave a marker for
			 * the convert handler.
			 */
			if (dlm_locked)
				lockres_or_flags(lockres,
					OCFS2_LOCK_NONBLOCK_FINISHED);
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			ret = -EAGAIN;
		} else {
			/* The waiter was already satisfied: retry right away. */
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			goto again;
		}
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}
	ocfs2_update_lock_stats(lockres, level, &mw, ret);

#ifdef CONFIG_DEBUG_LOCK_ALLOC
	/* Tell lockdep about the acquisition: read side for PR, write otherwise. */
	if (!ret && lockres->l_lockdep_map.key != NULL) {
		if (level == DLM_LOCK_PR)
			rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
		else
			rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
	}
#endif
	return ret;
}
   1670
   1671static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
   1672				     struct ocfs2_lock_res *lockres,
   1673				     int level,
   1674				     u32 lkm_flags,
   1675				     int arg_flags)
   1676{
   1677	return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
   1678				    0, _RET_IP_);
   1679}
   1680
   1681
   1682static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
   1683				   struct ocfs2_lock_res *lockres,
   1684				   int level,
   1685				   unsigned long caller_ip)
   1686{
   1687	unsigned long flags;
   1688
   1689	spin_lock_irqsave(&lockres->l_lock, flags);
   1690	ocfs2_dec_holders(lockres, level);
   1691	ocfs2_downconvert_on_unlock(osb, lockres);
   1692	spin_unlock_irqrestore(&lockres->l_lock, flags);
   1693#ifdef CONFIG_DEBUG_LOCK_ALLOC
   1694	if (lockres->l_lockdep_map.key != NULL)
   1695		rwsem_release(&lockres->l_lockdep_map, caller_ip);
   1696#endif
   1697}
   1698
   1699static int ocfs2_create_new_lock(struct ocfs2_super *osb,
   1700				 struct ocfs2_lock_res *lockres,
   1701				 int ex,
   1702				 int local)
   1703{
   1704	int level =  ex ? DLM_LOCK_EX : DLM_LOCK_PR;
   1705	unsigned long flags;
   1706	u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
   1707
   1708	spin_lock_irqsave(&lockres->l_lock, flags);
   1709	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
   1710	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
   1711	spin_unlock_irqrestore(&lockres->l_lock, flags);
   1712
   1713	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
   1714}
   1715
   1716/* Grants us an EX lock on the data and metadata resources, skipping
   1717 * the normal cluster directory lookup. Use this ONLY on newly created
   1718 * inodes which other nodes can't possibly see, and which haven't been
   1719 * hashed in the inode hash yet. This can give us a good performance
   1720 * increase as it'll skip the network broadcast normally associated
   1721 * with creating a new lock resource. */
   1722int ocfs2_create_new_inode_locks(struct inode *inode)
   1723{
   1724	int ret;
   1725	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
   1726
   1727	BUG_ON(!ocfs2_inode_is_new(inode));
   1728
   1729	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
   1730
   1731	/* NOTE: That we don't increment any of the holder counts, nor
   1732	 * do we add anything to a journal handle. Since this is
   1733	 * supposed to be a new inode which the cluster doesn't know
   1734	 * about yet, there is no need to.  As far as the LVB handling
   1735	 * is concerned, this is basically like acquiring an EX lock
   1736	 * on a resource which has an invalid one -- we'll set it
   1737	 * valid when we release the EX. */
   1738
   1739	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
   1740	if (ret) {
   1741		mlog_errno(ret);
   1742		goto bail;
   1743	}
   1744
   1745	/*
   1746	 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
   1747	 * don't use a generation in their lock names.
   1748	 */
   1749	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
   1750	if (ret) {
   1751		mlog_errno(ret);
   1752		goto bail;
   1753	}
   1754
   1755	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
   1756	if (ret)
   1757		mlog_errno(ret);
   1758
   1759bail:
   1760	return ret;
   1761}
   1762
   1763int ocfs2_rw_lock(struct inode *inode, int write)
   1764{
   1765	int status, level;
   1766	struct ocfs2_lock_res *lockres;
   1767	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
   1768
   1769	mlog(0, "inode %llu take %s RW lock\n",
   1770	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
   1771	     write ? "EXMODE" : "PRMODE");
   1772
   1773	if (ocfs2_mount_local(osb))
   1774		return 0;
   1775
   1776	lockres = &OCFS2_I(inode)->ip_rw_lockres;
   1777
   1778	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
   1779
   1780	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
   1781	if (status < 0)
   1782		mlog_errno(status);
   1783
   1784	return status;
   1785}
   1786
   1787int ocfs2_try_rw_lock(struct inode *inode, int write)
   1788{
   1789	int status, level;
   1790	struct ocfs2_lock_res *lockres;
   1791	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
   1792
   1793	mlog(0, "inode %llu try to take %s RW lock\n",
   1794	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
   1795	     write ? "EXMODE" : "PRMODE");
   1796
   1797	if (ocfs2_mount_local(osb))
   1798		return 0;
   1799
   1800	lockres = &OCFS2_I(inode)->ip_rw_lockres;
   1801
   1802	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
   1803
   1804	status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
   1805	return status;
   1806}
   1807
   1808void ocfs2_rw_unlock(struct inode *inode, int write)
   1809{
   1810	int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
   1811	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
   1812	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
   1813
   1814	mlog(0, "inode %llu drop %s RW lock\n",
   1815	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
   1816	     write ? "EXMODE" : "PRMODE");
   1817
   1818	if (!ocfs2_mount_local(osb))
   1819		ocfs2_cluster_unlock(osb, lockres, level);
   1820}
   1821
   1822/*
   1823 * ocfs2_open_lock always get PR mode lock.
   1824 */
   1825int ocfs2_open_lock(struct inode *inode)
   1826{
   1827	int status = 0;
   1828	struct ocfs2_lock_res *lockres;
   1829	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
   1830
   1831	mlog(0, "inode %llu take PRMODE open lock\n",
   1832	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
   1833
   1834	if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
   1835		goto out;
   1836
   1837	lockres = &OCFS2_I(inode)->ip_open_lockres;
   1838
   1839	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0, 0);
   1840	if (status < 0)
   1841		mlog_errno(status);
   1842
   1843out:
   1844	return status;
   1845}
   1846
   1847int ocfs2_try_open_lock(struct inode *inode, int write)
   1848{
   1849	int status = 0, level;
   1850	struct ocfs2_lock_res *lockres;
   1851	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
   1852
   1853	mlog(0, "inode %llu try to take %s open lock\n",
   1854	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
   1855	     write ? "EXMODE" : "PRMODE");
   1856
   1857	if (ocfs2_is_hard_readonly(osb)) {
   1858		if (write)
   1859			status = -EROFS;
   1860		goto out;
   1861	}
   1862
   1863	if (ocfs2_mount_local(osb))
   1864		goto out;
   1865
   1866	lockres = &OCFS2_I(inode)->ip_open_lockres;
   1867
   1868	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
   1869
   1870	/*
   1871	 * The file system may already holding a PRMODE/EXMODE open lock.
   1872	 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
   1873	 * other nodes and the -EAGAIN will indicate to the caller that
   1874	 * this inode is still in use.
   1875	 */
   1876	status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
   1877
   1878out:
   1879	return status;
   1880}
   1881
   1882/*
   1883 * ocfs2_open_unlock unlock PR and EX mode open locks.
   1884 */
   1885void ocfs2_open_unlock(struct inode *inode)
   1886{
   1887	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
   1888	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
   1889
   1890	mlog(0, "inode %llu drop open lock\n",
   1891	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
   1892
   1893	if (ocfs2_mount_local(osb))
   1894		goto out;
   1895
   1896	if(lockres->l_ro_holders)
   1897		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_PR);
   1898	if(lockres->l_ex_holders)
   1899		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
   1900
   1901out:
   1902	return;
   1903}
   1904
   1905static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
   1906				     int level)
   1907{
   1908	int ret;
   1909	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
   1910	unsigned long flags;
   1911	struct ocfs2_mask_waiter mw;
   1912
   1913	ocfs2_init_mask_waiter(&mw);
   1914
   1915retry_cancel:
   1916	spin_lock_irqsave(&lockres->l_lock, flags);
   1917	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
   1918		ret = ocfs2_prepare_cancel_convert(osb, lockres);
   1919		if (ret) {
   1920			spin_unlock_irqrestore(&lockres->l_lock, flags);
   1921			ret = ocfs2_cancel_convert(osb, lockres);
   1922			if (ret < 0) {
   1923				mlog_errno(ret);
   1924				goto out;
   1925			}
   1926			goto retry_cancel;
   1927		}
   1928		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
   1929		spin_unlock_irqrestore(&lockres->l_lock, flags);
   1930
   1931		ocfs2_wait_for_mask(&mw);
   1932		goto retry_cancel;
   1933	}
   1934
   1935	ret = -ERESTARTSYS;
   1936	/*
   1937	 * We may still have gotten the lock, in which case there's no
   1938	 * point to restarting the syscall.
   1939	 */
   1940	if (lockres->l_level == level)
   1941		ret = 0;
   1942
   1943	mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
   1944	     lockres->l_flags, lockres->l_level, lockres->l_action);
   1945
   1946	spin_unlock_irqrestore(&lockres->l_lock, flags);
   1947
   1948out:
   1949	return ret;
   1950}
   1951
   1952/*
   1953 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
   1954 * flock() calls. The locking approach this requires is sufficiently
   1955 * different from all other cluster lock types that we implement a
   1956 * separate path to the "low-level" dlm calls. In particular:
   1957 *
   1958 * - No optimization of lock levels is done - we take at exactly
   1959 *   what's been requested.
   1960 *
   1961 * - No lock caching is employed. We immediately downconvert to
   1962 *   no-lock at unlock time. This also means flock locks never go on
   1963 *   the blocking list).
   1964 *
   1965 * - Since userspace can trivially deadlock itself with flock, we make
   1966 *   sure to allow cancellation of a misbehaving applications flock()
   1967 *   request.
   1968 *
   1969 * - Access to any flock lockres doesn't require concurrency, so we
   1970 *   can simplify the code by requiring the caller to guarantee
   1971 *   serialization of dlmglue flock calls.
   1972 */
   1973int ocfs2_file_lock(struct file *file, int ex, int trylock)
   1974{
   1975	int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
   1976	unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
   1977	unsigned long flags;
   1978	struct ocfs2_file_private *fp = file->private_data;
   1979	struct ocfs2_lock_res *lockres = &fp->fp_flock;
   1980	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
   1981	struct ocfs2_mask_waiter mw;
   1982
   1983	ocfs2_init_mask_waiter(&mw);
   1984
   1985	if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
   1986	    (lockres->l_level > DLM_LOCK_NL)) {
   1987		mlog(ML_ERROR,
   1988		     "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
   1989		     "level: %u\n", lockres->l_name, lockres->l_flags,
   1990		     lockres->l_level);
   1991		return -EINVAL;
   1992	}
   1993
   1994	spin_lock_irqsave(&lockres->l_lock, flags);
   1995	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
   1996		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
   1997		spin_unlock_irqrestore(&lockres->l_lock, flags);
   1998
   1999		/*
   2000		 * Get the lock at NLMODE to start - that way we
   2001		 * can cancel the upconvert request if need be.
   2002		 */
   2003		ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
   2004		if (ret < 0) {
   2005			mlog_errno(ret);
   2006			goto out;
   2007		}
   2008
   2009		ret = ocfs2_wait_for_mask(&mw);
   2010		if (ret) {
   2011			mlog_errno(ret);
   2012			goto out;
   2013		}
   2014		spin_lock_irqsave(&lockres->l_lock, flags);
   2015	}
   2016
   2017	lockres->l_action = OCFS2_AST_CONVERT;
   2018	lkm_flags |= DLM_LKF_CONVERT;
   2019	lockres->l_requested = level;
   2020	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
   2021
   2022	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
   2023	spin_unlock_irqrestore(&lockres->l_lock, flags);
   2024
   2025	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
   2026			     lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
   2027	if (ret) {
   2028		if (!trylock || (ret != -EAGAIN)) {
   2029			ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
   2030			ret = -EINVAL;
   2031		}
   2032
   2033		ocfs2_recover_from_dlm_error(lockres, 1);
   2034		lockres_remove_mask_waiter(lockres, &mw);
   2035		goto out;
   2036	}
   2037
   2038	ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
   2039	if (ret == -ERESTARTSYS) {
   2040		/*
   2041		 * Userspace can cause deadlock itself with
   2042		 * flock(). Current behavior locally is to allow the
   2043		 * deadlock, but abort the system call if a signal is
   2044		 * received. We follow this example, otherwise a
   2045		 * poorly written program could sit in kernel until
   2046		 * reboot.
   2047		 *
   2048		 * Handling this is a bit more complicated for Ocfs2
   2049		 * though. We can't exit this function with an
   2050		 * outstanding lock request, so a cancel convert is
   2051		 * required. We intentionally overwrite 'ret' - if the
   2052		 * cancel fails and the lock was granted, it's easier
   2053		 * to just bubble success back up to the user.
   2054		 */
   2055		ret = ocfs2_flock_handle_signal(lockres, level);
   2056	} else if (!ret && (level > lockres->l_level)) {
   2057		/* Trylock failed asynchronously */
   2058		BUG_ON(!trylock);
   2059		ret = -EAGAIN;
   2060	}
   2061
   2062out:
   2063
   2064	mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
   2065	     lockres->l_name, ex, trylock, ret);
   2066	return ret;
   2067}
   2068
   2069void ocfs2_file_unlock(struct file *file)
   2070{
   2071	int ret;
   2072	unsigned int gen;
   2073	unsigned long flags;
   2074	struct ocfs2_file_private *fp = file->private_data;
   2075	struct ocfs2_lock_res *lockres = &fp->fp_flock;
   2076	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
   2077	struct ocfs2_mask_waiter mw;
   2078
   2079	ocfs2_init_mask_waiter(&mw);
   2080
   2081	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
   2082		return;
   2083
   2084	if (lockres->l_level == DLM_LOCK_NL)
   2085		return;
   2086
   2087	mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
   2088	     lockres->l_name, lockres->l_flags, lockres->l_level,
   2089	     lockres->l_action);
   2090
   2091	spin_lock_irqsave(&lockres->l_lock, flags);
   2092	/*
   2093	 * Fake a blocking ast for the downconvert code.
   2094	 */
   2095	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
   2096	lockres->l_blocking = DLM_LOCK_EX;
   2097
   2098	gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
   2099	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
   2100	spin_unlock_irqrestore(&lockres->l_lock, flags);
   2101
   2102	ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
   2103	if (ret) {
   2104		mlog_errno(ret);
   2105		return;
   2106	}
   2107
   2108	ret = ocfs2_wait_for_mask(&mw);
   2109	if (ret)
   2110		mlog_errno(ret);
   2111}
   2112
   2113static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
   2114					struct ocfs2_lock_res *lockres)
   2115{
   2116	int kick = 0;
   2117
   2118	/* If we know that another node is waiting on our lock, kick
   2119	 * the downconvert thread * pre-emptively when we reach a release
   2120	 * condition. */
   2121	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
   2122		switch(lockres->l_blocking) {
   2123		case DLM_LOCK_EX:
   2124			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
   2125				kick = 1;
   2126			break;
   2127		case DLM_LOCK_PR:
   2128			if (!lockres->l_ex_holders)
   2129				kick = 1;
   2130			break;
   2131		default:
   2132			BUG();
   2133		}
   2134	}
   2135
   2136	if (kick)
   2137		ocfs2_wake_downconvert_thread(osb);
   2138}
   2139
   2140#define OCFS2_SEC_BITS   34
   2141#define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
   2142#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
   2143
   2144/* LVB only has room for 64 bits of time here so we pack it for
   2145 * now. */
   2146static u64 ocfs2_pack_timespec(struct timespec64 *spec)
   2147{
   2148	u64 res;
   2149	u64 sec = clamp_t(time64_t, spec->tv_sec, 0, 0x3ffffffffull);
   2150	u32 nsec = spec->tv_nsec;
   2151
   2152	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
   2153
   2154	return res;
   2155}
   2156
   2157/* Call this with the lockres locked. I am reasonably sure we don't
   2158 * need ip_lock in this function as anyone who would be changing those
   2159 * values is supposed to be blocked in ocfs2_inode_lock right now. */
   2160static void __ocfs2_stuff_meta_lvb(struct inode *inode)
   2161{
   2162	struct ocfs2_inode_info *oi = OCFS2_I(inode);
   2163	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
   2164	struct ocfs2_meta_lvb *lvb;
   2165
   2166	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
   2167
   2168	/*
   2169	 * Invalidate the LVB of a deleted inode - this way other
   2170	 * nodes are forced to go to disk and discover the new inode
   2171	 * status.
   2172	 */
   2173	if (oi->ip_flags & OCFS2_INODE_DELETED) {
   2174		lvb->lvb_version = 0;
   2175		goto out;
   2176	}
   2177
   2178	lvb->lvb_version   = OCFS2_LVB_VERSION;
   2179	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
   2180	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
   2181	lvb->lvb_iuid      = cpu_to_be32(i_uid_read(inode));
   2182	lvb->lvb_igid      = cpu_to_be32(i_gid_read(inode));
   2183	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
   2184	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
   2185	lvb->lvb_iatime_packed  =
   2186		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
   2187	lvb->lvb_ictime_packed =
   2188		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
   2189	lvb->lvb_imtime_packed =
   2190		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
   2191	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
   2192	lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
   2193	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
   2194
   2195out:
   2196	mlog_meta_lvb(0, lockres);
   2197}
   2198
   2199static void ocfs2_unpack_timespec(struct timespec64 *spec,
   2200				  u64 packed_time)
   2201{
   2202	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
   2203	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
   2204}
   2205
   2206static int ocfs2_refresh_inode_from_lvb(struct inode *inode)
   2207{
   2208	struct ocfs2_inode_info *oi = OCFS2_I(inode);
   2209	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
   2210	struct ocfs2_meta_lvb *lvb;
   2211
   2212	mlog_meta_lvb(0, lockres);
   2213
   2214	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
   2215	if (inode_wrong_type(inode, be16_to_cpu(lvb->lvb_imode)))
   2216		return -ESTALE;
   2217
   2218	/* We're safe here without the lockres lock... */
   2219	spin_lock(&oi->ip_lock);
   2220	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
   2221	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
   2222
   2223	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
   2224	oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
   2225	ocfs2_set_inode_flags(inode);
   2226
   2227	/* fast-symlinks are a special case */
   2228	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
   2229		inode->i_blocks = 0;
   2230	else
   2231		inode->i_blocks = ocfs2_inode_sector_count(inode);
   2232
   2233	i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid));
   2234	i_gid_write(inode, be32_to_cpu(lvb->lvb_igid));
   2235	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
   2236	set_nlink(inode, be16_to_cpu(lvb->lvb_inlink));
   2237	ocfs2_unpack_timespec(&inode->i_atime,
   2238			      be64_to_cpu(lvb->lvb_iatime_packed));
   2239	ocfs2_unpack_timespec(&inode->i_mtime,
   2240			      be64_to_cpu(lvb->lvb_imtime_packed));
   2241	ocfs2_unpack_timespec(&inode->i_ctime,
   2242			      be64_to_cpu(lvb->lvb_ictime_packed));
   2243	spin_unlock(&oi->ip_lock);
   2244	return 0;
   2245}
   2246
   2247static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
   2248					      struct ocfs2_lock_res *lockres)
   2249{
   2250	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
   2251
   2252	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
   2253	    && lvb->lvb_version == OCFS2_LVB_VERSION
   2254	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
   2255		return 1;
   2256	return 0;
   2257}
   2258
   2259/* Determine whether a lock resource needs to be refreshed, and
   2260 * arbitrate who gets to refresh it.
   2261 *
   2262 *   0 means no refresh needed.
   2263 *
   2264 *   > 0 means you need to refresh this and you MUST call
   2265 *   ocfs2_complete_lock_res_refresh afterwards. */
   2266static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
   2267{
   2268	unsigned long flags;
   2269	int status = 0;
   2270
   2271refresh_check:
   2272	spin_lock_irqsave(&lockres->l_lock, flags);
   2273	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
   2274		spin_unlock_irqrestore(&lockres->l_lock, flags);
   2275		goto bail;
   2276	}
   2277
   2278	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
   2279		spin_unlock_irqrestore(&lockres->l_lock, flags);
   2280
   2281		ocfs2_wait_on_refreshing_lock(lockres);
   2282		goto refresh_check;
   2283	}
   2284
   2285	/* Ok, I'll be the one to refresh this lock. */
   2286	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
   2287	spin_unlock_irqrestore(&lockres->l_lock, flags);
   2288
   2289	status = 1;
   2290bail:
   2291	mlog(0, "status %d\n", status);
   2292	return status;
   2293}
   2294
   2295/* If status is non zero, I'll mark it as not being in refresh
   2296 * anymroe, but i won't clear the needs refresh flag. */
   2297static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
   2298						   int status)
   2299{
   2300	unsigned long flags;
   2301
   2302	spin_lock_irqsave(&lockres->l_lock, flags);
   2303	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
   2304	if (!status)
   2305		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
   2306	spin_unlock_irqrestore(&lockres->l_lock, flags);
   2307
   2308	wake_up(&lockres->l_event);
   2309}
   2310
   2311/* may or may not return a bh if it went to disk. */
   2312static int ocfs2_inode_lock_update(struct inode *inode,
   2313				  struct buffer_head **bh)
   2314{
   2315	int status = 0;
   2316	struct ocfs2_inode_info *oi = OCFS2_I(inode);
   2317	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
   2318	struct ocfs2_dinode *fe;
   2319	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
   2320
   2321	if (ocfs2_mount_local(osb))
   2322		goto bail;
   2323
   2324	spin_lock(&oi->ip_lock);
   2325	if (oi->ip_flags & OCFS2_INODE_DELETED) {
   2326		mlog(0, "Orphaned inode %llu was deleted while we "
   2327		     "were waiting on a lock. ip_flags = 0x%x\n",
   2328		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
   2329		spin_unlock(&oi->ip_lock);
   2330		status = -ENOENT;
   2331		goto bail;
   2332	}
   2333	spin_unlock(&oi->ip_lock);
   2334
   2335	if (!ocfs2_should_refresh_lock_res(lockres))
   2336		goto bail;
   2337
   2338	/* This will discard any caching information we might have had
   2339	 * for the inode metadata. */
   2340	ocfs2_metadata_cache_purge(INODE_CACHE(inode));
   2341
   2342	ocfs2_extent_map_trunc(inode, 0);
   2343
   2344	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
   2345		mlog(0, "Trusting LVB on inode %llu\n",
   2346		     (unsigned long long)oi->ip_blkno);
   2347		status = ocfs2_refresh_inode_from_lvb(inode);
   2348		goto bail_refresh;
   2349	} else {
   2350		/* Boo, we have to go to disk. */
   2351		/* read bh, cast, ocfs2_refresh_inode */
   2352		status = ocfs2_read_inode_block(inode, bh);
   2353		if (status < 0) {
   2354			mlog_errno(status);
   2355			goto bail_refresh;
   2356		}
   2357		fe = (struct ocfs2_dinode *) (*bh)->b_data;
   2358		if (inode_wrong_type(inode, le16_to_cpu(fe->i_mode))) {
   2359			status = -ESTALE;
   2360			goto bail_refresh;
   2361		}
   2362
   2363		/* This is a good chance to make sure we're not
   2364		 * locking an invalid object.  ocfs2_read_inode_block()
   2365		 * already checked that the inode block is sane.
   2366		 *
   2367		 * We bug on a stale inode here because we checked
   2368		 * above whether it was wiped from disk. The wiping
   2369		 * node provides a guarantee that we receive that
   2370		 * message and can mark the inode before dropping any
   2371		 * locks associated with it. */
   2372		mlog_bug_on_msg(inode->i_generation !=
   2373				le32_to_cpu(fe->i_generation),
   2374				"Invalid dinode %llu disk generation: %u "
   2375				"inode->i_generation: %u\n",
   2376				(unsigned long long)oi->ip_blkno,
   2377				le32_to_cpu(fe->i_generation),
   2378				inode->i_generation);
   2379		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
   2380				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
   2381				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
   2382				(unsigned long long)oi->ip_blkno,
   2383				(unsigned long long)le64_to_cpu(fe->i_dtime),
   2384				le32_to_cpu(fe->i_flags));
   2385
   2386		ocfs2_refresh_inode(inode, fe);
   2387		ocfs2_track_lock_refresh(lockres);
   2388	}
   2389
   2390	status = 0;
   2391bail_refresh:
   2392	ocfs2_complete_lock_res_refresh(lockres, status);
   2393bail:
   2394	return status;
   2395}
   2396
   2397static int ocfs2_assign_bh(struct inode *inode,
   2398			   struct buffer_head **ret_bh,
   2399			   struct buffer_head *passed_bh)
   2400{
   2401	int status;
   2402
   2403	if (passed_bh) {
   2404		/* Ok, the update went to disk for us, use the
   2405		 * returned bh. */
   2406		*ret_bh = passed_bh;
   2407		get_bh(*ret_bh);
   2408
   2409		return 0;
   2410	}
   2411
   2412	status = ocfs2_read_inode_block(inode, ret_bh);
   2413	if (status < 0)
   2414		mlog_errno(status);
   2415
   2416	return status;
   2417}
   2418
   2419/*
   2420 * returns < 0 error if the callback will never be called, otherwise
   2421 * the result of the lock will be communicated via the callback.
   2422 */
   2423int ocfs2_inode_lock_full_nested(struct inode *inode,
   2424				 struct buffer_head **ret_bh,
   2425				 int ex,
   2426				 int arg_flags,
   2427				 int subclass)
   2428{
   2429	int status, level, acquired;
   2430	u32 dlm_flags;
   2431	struct ocfs2_lock_res *lockres = NULL;
   2432	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
   2433	struct buffer_head *local_bh = NULL;
   2434
   2435	mlog(0, "inode %llu, take %s META lock\n",
   2436	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
   2437	     ex ? "EXMODE" : "PRMODE");
   2438
   2439	status = 0;
   2440	acquired = 0;
   2441	/* We'll allow faking a readonly metadata lock for
   2442	 * rodevices. */
   2443	if (ocfs2_is_hard_readonly(osb)) {
   2444		if (ex)
   2445			status = -EROFS;
   2446		goto getbh;
   2447	}
   2448
   2449	if ((arg_flags & OCFS2_META_LOCK_GETBH) ||
   2450	    ocfs2_mount_local(osb))
   2451		goto update;
   2452
   2453	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
   2454		ocfs2_wait_for_recovery(osb);
   2455
   2456	lockres = &OCFS2_I(inode)->ip_inode_lockres;
   2457	level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
   2458	dlm_flags = 0;
   2459	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
   2460		dlm_flags |= DLM_LKF_NOQUEUE;
   2461
   2462	status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
   2463				      arg_flags, subclass, _RET_IP_);
   2464	if (status < 0) {
   2465		if (status != -EAGAIN)
   2466			mlog_errno(status);
   2467		goto bail;
   2468	}
   2469
   2470	/* Notify the error cleanup path to drop the cluster lock. */
   2471	acquired = 1;
   2472
   2473	/* We wait twice because a node may have died while we were in
   2474	 * the lower dlm layers. The second time though, we've
   2475	 * committed to owning this lock so we don't allow signals to
   2476	 * abort the operation. */
   2477	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
   2478		ocfs2_wait_for_recovery(osb);
   2479
   2480update:
   2481	/*
   2482	 * We only see this flag if we're being called from
   2483	 * ocfs2_read_locked_inode(). It means we're locking an inode
   2484	 * which hasn't been populated yet, so clear the refresh flag
   2485	 * and let the caller handle it.
   2486	 */
   2487	if (inode->i_state & I_NEW) {
   2488		status = 0;
   2489		if (lockres)
   2490			ocfs2_complete_lock_res_refresh(lockres, 0);
   2491		goto bail;
   2492	}
   2493
   2494	/* This is fun. The caller may want a bh back, or it may
   2495	 * not. ocfs2_inode_lock_update definitely wants one in, but
   2496	 * may or may not read one, depending on what's in the
   2497	 * LVB. The result of all of this is that we've *only* gone to
   2498	 * disk if we have to, so the complexity is worthwhile. */
   2499	status = ocfs2_inode_lock_update(inode, &local_bh);
   2500	if (status < 0) {
   2501		if (status != -ENOENT)
   2502			mlog_errno(status);
   2503		goto bail;
   2504	}
   2505getbh:
   2506	if (ret_bh) {
   2507		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
   2508		if (status < 0) {
   2509			mlog_errno(status);
   2510			goto bail;
   2511		}
   2512	}
   2513
   2514bail:
   2515	if (status < 0) {
   2516		if (ret_bh && (*ret_bh)) {
   2517			brelse(*ret_bh);
   2518			*ret_bh = NULL;
   2519		}
   2520		if (acquired)
   2521			ocfs2_inode_unlock(inode, ex);
   2522	}
   2523
   2524	brelse(local_bh);
   2525	return status;
   2526}
   2527
   2528/*
   2529 * This is working around a lock inversion between tasks acquiring DLM
   2530 * locks while holding a page lock and the downconvert thread which
   2531 * blocks dlm lock acquiry while acquiring page locks.
   2532 *
   2533 * ** These _with_page variantes are only intended to be called from aop
   2534 * methods that hold page locks and return a very specific *positive* error
   2535 * code that aop methods pass up to the VFS -- test for errors with != 0. **
   2536 *
   2537 * The DLM is called such that it returns -EAGAIN if it would have
   2538 * blocked waiting for the downconvert thread.  In that case we unlock
   2539 * our page so the downconvert thread can make progress.  Once we've
   2540 * done this we have to return AOP_TRUNCATED_PAGE so the aop method
   2541 * that called us can bubble that back up into the VFS who will then
   2542 * immediately retry the aop call.
   2543 */
   2544int ocfs2_inode_lock_with_page(struct inode *inode,
   2545			      struct buffer_head **ret_bh,
   2546			      int ex,
   2547			      struct page *page)
   2548{
   2549	int ret;
   2550
   2551	ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
   2552	if (ret == -EAGAIN) {
   2553		unlock_page(page);
   2554		/*
   2555		 * If we can't get inode lock immediately, we should not return
   2556		 * directly here, since this will lead to a softlockup problem.
   2557		 * The method is to get a blocking lock and immediately unlock
   2558		 * before returning, this can avoid CPU resource waste due to
   2559		 * lots of retries, and benefits fairness in getting lock.
   2560		 */
   2561		if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
   2562			ocfs2_inode_unlock(inode, ex);
   2563		ret = AOP_TRUNCATED_PAGE;
   2564	}
   2565
   2566	return ret;
   2567}
   2568
   2569int ocfs2_inode_lock_atime(struct inode *inode,
   2570			  struct vfsmount *vfsmnt,
   2571			  int *level, int wait)
   2572{
   2573	int ret;
   2574
   2575	if (wait)
   2576		ret = ocfs2_inode_lock(inode, NULL, 0);
   2577	else
   2578		ret = ocfs2_try_inode_lock(inode, NULL, 0);
   2579
   2580	if (ret < 0) {
   2581		if (ret != -EAGAIN)
   2582			mlog_errno(ret);
   2583		return ret;
   2584	}
   2585
   2586	/*
   2587	 * If we should update atime, we will get EX lock,
   2588	 * otherwise we just get PR lock.
   2589	 */
   2590	if (ocfs2_should_update_atime(inode, vfsmnt)) {
   2591		struct buffer_head *bh = NULL;
   2592
   2593		ocfs2_inode_unlock(inode, 0);
   2594		if (wait)
   2595			ret = ocfs2_inode_lock(inode, &bh, 1);
   2596		else
   2597			ret = ocfs2_try_inode_lock(inode, &bh, 1);
   2598
   2599		if (ret < 0) {
   2600			if (ret != -EAGAIN)
   2601				mlog_errno(ret);
   2602			return ret;
   2603		}
   2604		*level = 1;
   2605		if (ocfs2_should_update_atime(inode, vfsmnt))
   2606			ocfs2_update_inode_atime(inode, bh);
   2607		brelse(bh);
   2608	} else
   2609		*level = 0;
   2610
   2611	return ret;
   2612}
   2613
   2614void ocfs2_inode_unlock(struct inode *inode,
   2615		       int ex)
   2616{
   2617	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
   2618	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
   2619	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
   2620
   2621	mlog(0, "inode %llu drop %s META lock\n",
   2622	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
   2623	     ex ? "EXMODE" : "PRMODE");
   2624
   2625	if (!ocfs2_is_hard_readonly(osb) &&
   2626	    !ocfs2_mount_local(osb))
   2627		ocfs2_cluster_unlock(osb, lockres, level);
   2628}
   2629
/*
 * These _tracker variants are introduced to deal with the recursive cluster
 * locking issue. The idea is to keep track of a lock holder on the stack of
 * the current process. If there's a lock holder on the stack, we know the
 * task context is already protected by cluster locking. Currently, they're
 * used in some VFS entry routines.
 *
 * return < 0 on error, return == 0 if there's no lock holder on the stack
 * before this call, return == 1 if this call would be a recursive locking.
 * return == -1 if this lock attempt would cause an upgrade, which is forbidden.
   2640 *
 * When taking lock levels into account, we face several different situations.
 *
 * 1. No lock is held
 *    In this case, just lock the inode as requested and return 0.
 *
 * 2. We are holding a lock
 *    This situation divides into several cases:
   2648 *
   2649 *    wanted     holding	     what to do
   2650 *    ex		ex	    see 2.1 below
   2651 *    ex		pr	    see 2.2 below
   2652 *    pr		ex	    see 2.1 below
   2653 *    pr		pr	    see 2.1 below
   2654 *
   2655 *    2.1 lock level that is been held is compatible
   2656 *    with the wanted level, so no lock action will be tacken.
   2657 *
   2658 *    2.2 Otherwise, an upgrade is needed, but it is forbidden.
   2659 *
   2660 * Reason why upgrade within a process is forbidden is that
   2661 * lock upgrade may cause dead lock. The following illustrates
   2662 * how it happens.
   2663 *
   2664 *         thread on node1                             thread on node2
   2665 * ocfs2_inode_lock_tracker(ex=0)
   2666 *
   2667 *                                <======   ocfs2_inode_lock_tracker(ex=1)
   2668 *
   2669 * ocfs2_inode_lock_tracker(ex=1)
   2670 */
   2671int ocfs2_inode_lock_tracker(struct inode *inode,
   2672			     struct buffer_head **ret_bh,
   2673			     int ex,
   2674			     struct ocfs2_lock_holder *oh)
   2675{
   2676	int status = 0;
   2677	struct ocfs2_lock_res *lockres;
   2678	struct ocfs2_lock_holder *tmp_oh;
   2679	struct pid *pid = task_pid(current);
   2680
   2681
   2682	lockres = &OCFS2_I(inode)->ip_inode_lockres;
   2683	tmp_oh = ocfs2_pid_holder(lockres, pid);
   2684
   2685	if (!tmp_oh) {
   2686		/*
   2687		 * This corresponds to the case 1.
   2688		 * We haven't got any lock before.
   2689		 */
   2690		status = ocfs2_inode_lock_full(inode, ret_bh, ex, 0);
   2691		if (status < 0) {
   2692			if (status != -ENOENT)
   2693				mlog_errno(status);
   2694			return status;
   2695		}
   2696
   2697		oh->oh_ex = ex;
   2698		ocfs2_add_holder(lockres, oh);
   2699		return 0;
   2700	}
   2701
   2702	if (unlikely(ex && !tmp_oh->oh_ex)) {
   2703		/*
   2704		 * case 2.2 upgrade may cause dead lock, forbid it.
   2705		 */
   2706		mlog(ML_ERROR, "Recursive locking is not permitted to "
   2707		     "upgrade to EX level from PR level.\n");
   2708		dump_stack();
   2709		return -EINVAL;
   2710	}
   2711
   2712	/*
   2713	 *  case 2.1 OCFS2_META_LOCK_GETBH flag make ocfs2_inode_lock_full.
   2714	 *  ignore the lock level and just update it.
   2715	 */
   2716	if (ret_bh) {
   2717		status = ocfs2_inode_lock_full(inode, ret_bh, ex,
   2718					       OCFS2_META_LOCK_GETBH);
   2719		if (status < 0) {
   2720			if (status != -ENOENT)
   2721				mlog_errno(status);
   2722			return status;
   2723		}
   2724	}
   2725	return 1;
   2726}
   2727
   2728void ocfs2_inode_unlock_tracker(struct inode *inode,
   2729				int ex,
   2730				struct ocfs2_lock_holder *oh,
   2731				int had_lock)
   2732{
   2733	struct ocfs2_lock_res *lockres;
   2734
   2735	lockres = &OCFS2_I(inode)->ip_inode_lockres;
   2736	/* had_lock means that the currect process already takes the cluster
   2737	 * lock previously.
   2738	 * If had_lock is 1, we have nothing to do here.
   2739	 * If had_lock is 0, we will release the lock.
   2740	 */
   2741	if (!had_lock) {
   2742		ocfs2_inode_unlock(inode, oh->oh_ex);
   2743		ocfs2_remove_holder(lockres, oh);
   2744	}
   2745}
   2746
   2747int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
   2748{
   2749	struct ocfs2_lock_res *lockres;
   2750	struct ocfs2_orphan_scan_lvb *lvb;
   2751	int status = 0;
   2752
   2753	if (ocfs2_is_hard_readonly(osb))
   2754		return -EROFS;
   2755
   2756	if (ocfs2_mount_local(osb))
   2757		return 0;
   2758
   2759	lockres = &osb->osb_orphan_scan.os_lockres;
   2760	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
   2761	if (status < 0)
   2762		return status;
   2763
   2764	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
   2765	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
   2766	    lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
   2767		*seqno = be32_to_cpu(lvb->lvb_os_seqno);
   2768	else
   2769		*seqno = osb->osb_orphan_scan.os_seqno + 1;
   2770
   2771	return status;
   2772}
   2773
   2774void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
   2775{
   2776	struct ocfs2_lock_res *lockres;
   2777	struct ocfs2_orphan_scan_lvb *lvb;
   2778
   2779	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
   2780		lockres = &osb->osb_orphan_scan.os_lockres;
   2781		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
   2782		lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
   2783		lvb->lvb_os_seqno = cpu_to_be32(seqno);
   2784		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
   2785	}
   2786}
   2787
   2788int ocfs2_super_lock(struct ocfs2_super *osb,
   2789		     int ex)
   2790{
   2791	int status = 0;
   2792	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
   2793	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
   2794
   2795	if (ocfs2_is_hard_readonly(osb))
   2796		return -EROFS;
   2797
   2798	if (ocfs2_mount_local(osb))
   2799		goto bail;
   2800
   2801	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
   2802	if (status < 0) {
   2803		mlog_errno(status);
   2804		goto bail;
   2805	}
   2806
   2807	/* The super block lock path is really in the best position to
   2808	 * know when resources covered by the lock need to be
   2809	 * refreshed, so we do it here. Of course, making sense of
   2810	 * everything is up to the caller :) */
   2811	status = ocfs2_should_refresh_lock_res(lockres);
   2812	if (status) {
   2813		status = ocfs2_refresh_slot_info(osb);
   2814
   2815		ocfs2_complete_lock_res_refresh(lockres, status);
   2816
   2817		if (status < 0) {
   2818			ocfs2_cluster_unlock(osb, lockres, level);
   2819			mlog_errno(status);
   2820		}
   2821		ocfs2_track_lock_refresh(lockres);
   2822	}
   2823bail:
   2824	return status;
   2825}
   2826
   2827void ocfs2_super_unlock(struct ocfs2_super *osb,
   2828			int ex)
   2829{
   2830	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
   2831	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
   2832
   2833	if (!ocfs2_mount_local(osb))
   2834		ocfs2_cluster_unlock(osb, lockres, level);
   2835}
   2836
   2837int ocfs2_rename_lock(struct ocfs2_super *osb)
   2838{
   2839	int status;
   2840	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
   2841
   2842	if (ocfs2_is_hard_readonly(osb))
   2843		return -EROFS;
   2844
   2845	if (ocfs2_mount_local(osb))
   2846		return 0;
   2847
   2848	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
   2849	if (status < 0)
   2850		mlog_errno(status);
   2851
   2852	return status;
   2853}
   2854
   2855void ocfs2_rename_unlock(struct ocfs2_super *osb)
   2856{
   2857	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
   2858
   2859	if (!ocfs2_mount_local(osb))
   2860		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
   2861}
   2862
   2863int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
   2864{
   2865	int status;
   2866	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
   2867
   2868	if (ocfs2_is_hard_readonly(osb))
   2869		return -EROFS;
   2870
   2871	if (ex)
   2872		down_write(&osb->nfs_sync_rwlock);
   2873	else
   2874		down_read(&osb->nfs_sync_rwlock);
   2875
   2876	if (ocfs2_mount_local(osb))
   2877		return 0;
   2878
   2879	status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE,
   2880				    0, 0);
   2881	if (status < 0) {
   2882		mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
   2883
   2884		if (ex)
   2885			up_write(&osb->nfs_sync_rwlock);
   2886		else
   2887			up_read(&osb->nfs_sync_rwlock);
   2888	}
   2889
   2890	return status;
   2891}
   2892
   2893void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
   2894{
   2895	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
   2896
   2897	if (!ocfs2_mount_local(osb))
   2898		ocfs2_cluster_unlock(osb, lockres,
   2899				     ex ? LKM_EXMODE : LKM_PRMODE);
   2900	if (ex)
   2901		up_write(&osb->nfs_sync_rwlock);
   2902	else
   2903		up_read(&osb->nfs_sync_rwlock);
   2904}
   2905
   2906int ocfs2_trim_fs_lock(struct ocfs2_super *osb,
   2907		       struct ocfs2_trim_fs_info *info, int trylock)
   2908{
   2909	int status;
   2910	struct ocfs2_trim_fs_lvb *lvb;
   2911	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
   2912
   2913	if (info)
   2914		info->tf_valid = 0;
   2915
   2916	if (ocfs2_is_hard_readonly(osb))
   2917		return -EROFS;
   2918
   2919	if (ocfs2_mount_local(osb))
   2920		return 0;
   2921
   2922	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX,
   2923				    trylock ? DLM_LKF_NOQUEUE : 0, 0);
   2924	if (status < 0) {
   2925		if (status != -EAGAIN)
   2926			mlog_errno(status);
   2927		return status;
   2928	}
   2929
   2930	if (info) {
   2931		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
   2932		if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
   2933		    lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) {
   2934			info->tf_valid = 1;
   2935			info->tf_success = lvb->lvb_success;
   2936			info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum);
   2937			info->tf_start = be64_to_cpu(lvb->lvb_start);
   2938			info->tf_len = be64_to_cpu(lvb->lvb_len);
   2939			info->tf_minlen = be64_to_cpu(lvb->lvb_minlen);
   2940			info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen);
   2941		}
   2942	}
   2943
   2944	return status;
   2945}
   2946
   2947void ocfs2_trim_fs_unlock(struct ocfs2_super *osb,
   2948			  struct ocfs2_trim_fs_info *info)
   2949{
   2950	struct ocfs2_trim_fs_lvb *lvb;
   2951	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
   2952
   2953	if (ocfs2_mount_local(osb))
   2954		return;
   2955
   2956	if (info) {
   2957		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
   2958		lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION;
   2959		lvb->lvb_success = info->tf_success;
   2960		lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum);
   2961		lvb->lvb_start = cpu_to_be64(info->tf_start);
   2962		lvb->lvb_len = cpu_to_be64(info->tf_len);
   2963		lvb->lvb_minlen = cpu_to_be64(info->tf_minlen);
   2964		lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen);
   2965	}
   2966
   2967	ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
   2968}
   2969
   2970int ocfs2_dentry_lock(struct dentry *dentry, int ex)
   2971{
   2972	int ret;
   2973	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
   2974	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
   2975	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
   2976
   2977	BUG_ON(!dl);
   2978
   2979	if (ocfs2_is_hard_readonly(osb)) {
   2980		if (ex)
   2981			return -EROFS;
   2982		return 0;
   2983	}
   2984
   2985	if (ocfs2_mount_local(osb))
   2986		return 0;
   2987
   2988	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
   2989	if (ret < 0)
   2990		mlog_errno(ret);
   2991
   2992	return ret;
   2993}
   2994
   2995void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
   2996{
   2997	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
   2998	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
   2999	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
   3000
   3001	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
   3002		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
   3003}
   3004
   3005/* Reference counting of the dlm debug structure. We want this because
   3006 * open references on the debug inodes can live on after a mount, so
   3007 * we can't rely on the ocfs2_super to always exist. */
   3008static void ocfs2_dlm_debug_free(struct kref *kref)
   3009{
   3010	struct ocfs2_dlm_debug *dlm_debug;
   3011
   3012	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
   3013
   3014	kfree(dlm_debug);
   3015}
   3016
   3017void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
   3018{
   3019	if (dlm_debug)
   3020		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
   3021}
   3022
   3023static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
   3024{
   3025	kref_get(&debug->d_refcnt);
   3026}
   3027
   3028struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
   3029{
   3030	struct ocfs2_dlm_debug *dlm_debug;
   3031
   3032	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
   3033	if (!dlm_debug) {
   3034		mlog_errno(-ENOMEM);
   3035		goto out;
   3036	}
   3037
   3038	kref_init(&dlm_debug->d_refcnt);
   3039	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
   3040	dlm_debug->d_filter_secs = 0;
   3041out:
   3042	return dlm_debug;
   3043}
   3044
/*
 * Per-open state of the "locking_state" debugfs file.
 * Access to this is arbitrated for us via seq_file->sem.
 */
struct ocfs2_dlm_seq_priv {
	/* Debug structure being walked; a reference is taken at open. */
	struct ocfs2_dlm_debug *p_dlm_debug;
	/*
	 * Dummy lockres linked into the tracking list to remember the
	 * iteration position; recognised by its NULL l_ops.
	 */
	struct ocfs2_lock_res p_iter_res;
	/* Copy of the lockres currently being shown. */
	struct ocfs2_lock_res p_tmp_res;
};
   3051
   3052static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
   3053						 struct ocfs2_dlm_seq_priv *priv)
   3054{
   3055	struct ocfs2_lock_res *iter, *ret = NULL;
   3056	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
   3057
   3058	assert_spin_locked(&ocfs2_dlm_tracking_lock);
   3059
   3060	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
   3061		/* discover the head of the list */
   3062		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
   3063			mlog(0, "End of list found, %p\n", ret);
   3064			break;
   3065		}
   3066
   3067		/* We track our "dummy" iteration lockres' by a NULL
   3068		 * l_ops field. */
   3069		if (iter->l_ops != NULL) {
   3070			ret = iter;
   3071			break;
   3072		}
   3073	}
   3074
   3075	return ret;
   3076}
   3077
   3078static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
   3079{
   3080	struct ocfs2_dlm_seq_priv *priv = m->private;
   3081	struct ocfs2_lock_res *iter;
   3082
   3083	spin_lock(&ocfs2_dlm_tracking_lock);
   3084	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
   3085	if (iter) {
   3086		/* Since lockres' have the lifetime of their container
   3087		 * (which can be inodes, ocfs2_supers, etc) we want to
   3088		 * copy this out to a temporary lockres while still
   3089		 * under the spinlock. Obviously after this we can't
   3090		 * trust any pointers on the copy returned, but that's
   3091		 * ok as the information we want isn't typically held
   3092		 * in them. */
   3093		priv->p_tmp_res = *iter;
   3094		iter = &priv->p_tmp_res;
   3095	}
   3096	spin_unlock(&ocfs2_dlm_tracking_lock);
   3097
   3098	return iter;
   3099}
   3100
/*
 * seq_file ->stop(): intentionally empty.  ->start() and ->next()
 * release ocfs2_dlm_tracking_lock before returning, so there is nothing
 * to undo here.
 */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
}
   3104
   3105static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
   3106{
   3107	struct ocfs2_dlm_seq_priv *priv = m->private;
   3108	struct ocfs2_lock_res *iter = v;
   3109	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
   3110
   3111	spin_lock(&ocfs2_dlm_tracking_lock);
   3112	iter = ocfs2_dlm_next_res(iter, priv);
   3113	list_del_init(&dummy->l_debug_list);
   3114	if (iter) {
   3115		list_add(&dummy->l_debug_list, &iter->l_debug_list);
   3116		priv->p_tmp_res = *iter;
   3117		iter = &priv->p_tmp_res;
   3118	}
   3119	spin_unlock(&ocfs2_dlm_tracking_lock);
   3120
   3121	return iter;
   3122}
   3123
   3124/*
   3125 * Version is used by debugfs.ocfs2 to determine the format being used
   3126 *
   3127 * New in version 2
   3128 *	- Lock stats printed
   3129 * New in version 3
   3130 *	- Max time in lock stats is in usecs (instead of nsecs)
   3131 * New in version 4
   3132 *	- Add last pr/ex unlock times and first lock wait time in usecs
   3133 */
   3134#define OCFS2_DLM_DEBUG_STR_VERSION 4
   3135static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
   3136{
   3137	int i;
   3138	char *lvb;
   3139	struct ocfs2_lock_res *lockres = v;
   3140#ifdef CONFIG_OCFS2_FS_STATS
   3141	u64 now, last;
   3142	struct ocfs2_dlm_debug *dlm_debug =
   3143			((struct ocfs2_dlm_seq_priv *)m->private)->p_dlm_debug;
   3144#endif
   3145
   3146	if (!lockres)
   3147		return -EINVAL;
   3148
   3149#ifdef CONFIG_OCFS2_FS_STATS
   3150	if (!lockres->l_lock_wait && dlm_debug->d_filter_secs) {
   3151		now = ktime_to_us(ktime_get_real());
   3152		if (lockres->l_lock_prmode.ls_last >
   3153		    lockres->l_lock_exmode.ls_last)
   3154			last = lockres->l_lock_prmode.ls_last;
   3155		else
   3156			last = lockres->l_lock_exmode.ls_last;
   3157		/*
   3158		 * Use d_filter_secs field to filter lock resources dump,
   3159		 * the default d_filter_secs(0) value filters nothing,
   3160		 * otherwise, only dump the last N seconds active lock
   3161		 * resources.
   3162		 */
   3163		if (div_u64(now - last, 1000000) > dlm_debug->d_filter_secs)
   3164			return 0;
   3165	}
   3166#endif
   3167
   3168	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
   3169
   3170	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
   3171		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
   3172			   lockres->l_name,
   3173			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
   3174	else
   3175		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
   3176
   3177	seq_printf(m, "%d\t"
   3178		   "0x%lx\t"
   3179		   "0x%x\t"
   3180		   "0x%x\t"
   3181		   "%u\t"
   3182		   "%u\t"
   3183		   "%d\t"
   3184		   "%d\t",
   3185		   lockres->l_level,
   3186		   lockres->l_flags,
   3187		   lockres->l_action,
   3188		   lockres->l_unlock_action,
   3189		   lockres->l_ro_holders,
   3190		   lockres->l_ex_holders,
   3191		   lockres->l_requested,
   3192		   lockres->l_blocking);
   3193
   3194	/* Dump the raw LVB */
   3195	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
   3196	for(i = 0; i < DLM_LVB_LEN; i++)
   3197		seq_printf(m, "0x%x\t", lvb[i]);
   3198
   3199#ifdef CONFIG_OCFS2_FS_STATS
   3200# define lock_num_prmode(_l)		((_l)->l_lock_prmode.ls_gets)
   3201# define lock_num_exmode(_l)		((_l)->l_lock_exmode.ls_gets)
   3202# define lock_num_prmode_failed(_l)	((_l)->l_lock_prmode.ls_fail)
   3203# define lock_num_exmode_failed(_l)	((_l)->l_lock_exmode.ls_fail)
   3204# define lock_total_prmode(_l)		((_l)->l_lock_prmode.ls_total)
   3205# define lock_total_exmode(_l)		((_l)->l_lock_exmode.ls_total)
   3206# define lock_max_prmode(_l)		((_l)->l_lock_prmode.ls_max)
   3207# define lock_max_exmode(_l)		((_l)->l_lock_exmode.ls_max)
   3208# define lock_refresh(_l)		((_l)->l_lock_refresh)
   3209# define lock_last_prmode(_l)		((_l)->l_lock_prmode.ls_last)
   3210# define lock_last_exmode(_l)		((_l)->l_lock_exmode.ls_last)
   3211# define lock_wait(_l)			((_l)->l_lock_wait)
   3212#else
   3213# define lock_num_prmode(_l)		(0)
   3214# define lock_num_exmode(_l)		(0)
   3215# define lock_num_prmode_failed(_l)	(0)
   3216# define lock_num_exmode_failed(_l)	(0)
   3217# define lock_total_prmode(_l)		(0ULL)
   3218# define lock_total_exmode(_l)		(0ULL)
   3219# define lock_max_prmode(_l)		(0)
   3220# define lock_max_exmode(_l)		(0)
   3221# define lock_refresh(_l)		(0)
   3222# define lock_last_prmode(_l)		(0ULL)
   3223# define lock_last_exmode(_l)		(0ULL)
   3224# define lock_wait(_l)			(0ULL)
   3225#endif
   3226	/* The following seq_print was added in version 2 of this output */
   3227	seq_printf(m, "%u\t"
   3228		   "%u\t"
   3229		   "%u\t"
   3230		   "%u\t"
   3231		   "%llu\t"
   3232		   "%llu\t"
   3233		   "%u\t"
   3234		   "%u\t"
   3235		   "%u\t"
   3236		   "%llu\t"
   3237		   "%llu\t"
   3238		   "%llu\t",
   3239		   lock_num_prmode(lockres),
   3240		   lock_num_exmode(lockres),
   3241		   lock_num_prmode_failed(lockres),
   3242		   lock_num_exmode_failed(lockres),
   3243		   lock_total_prmode(lockres),
   3244		   lock_total_exmode(lockres),
   3245		   lock_max_prmode(lockres),
   3246		   lock_max_exmode(lockres),
   3247		   lock_refresh(lockres),
   3248		   lock_last_prmode(lockres),
   3249		   lock_last_exmode(lockres),
   3250		   lock_wait(lockres));
   3251
   3252	/* End the line */
   3253	seq_printf(m, "\n");
   3254	return 0;
   3255}
   3256
   3257static const struct seq_operations ocfs2_dlm_seq_ops = {
   3258	.start =	ocfs2_dlm_seq_start,
   3259	.stop =		ocfs2_dlm_seq_stop,
   3260	.next =		ocfs2_dlm_seq_next,
   3261	.show =		ocfs2_dlm_seq_show,
   3262};
   3263
   3264static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
   3265{
   3266	struct seq_file *seq = file->private_data;
   3267	struct ocfs2_dlm_seq_priv *priv = seq->private;
   3268	struct ocfs2_lock_res *res = &priv->p_iter_res;
   3269
   3270	ocfs2_remove_lockres_tracking(res);
   3271	ocfs2_put_dlm_debug(priv->p_dlm_debug);
   3272	return seq_release_private(inode, file);
   3273}
   3274
   3275static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
   3276{
   3277	struct ocfs2_dlm_seq_priv *priv;
   3278	struct ocfs2_super *osb;
   3279
   3280	priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv));
   3281	if (!priv) {
   3282		mlog_errno(-ENOMEM);
   3283		return -ENOMEM;
   3284	}
   3285
   3286	osb = inode->i_private;
   3287	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
   3288	priv->p_dlm_debug = osb->osb_dlm_debug;
   3289	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
   3290
   3291	ocfs2_add_lockres_tracking(&priv->p_iter_res,
   3292				   priv->p_dlm_debug);
   3293
   3294	return 0;
   3295}
   3296
   3297static const struct file_operations ocfs2_dlm_debug_fops = {
   3298	.open =		ocfs2_dlm_debug_open,
   3299	.release =	ocfs2_dlm_debug_release,
   3300	.read =		seq_read,
   3301	.llseek =	seq_lseek,
   3302};
   3303
   3304static void ocfs2_dlm_init_debug(struct ocfs2_super *osb)
   3305{
   3306	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
   3307
   3308	debugfs_create_file("locking_state", S_IFREG|S_IRUSR,
   3309			    osb->osb_debug_root, osb, &ocfs2_dlm_debug_fops);
   3310
   3311	debugfs_create_u32("locking_filter", 0600, osb->osb_debug_root,
   3312			   &dlm_debug->d_filter_secs);
   3313	ocfs2_get_dlm_debug(dlm_debug);
   3314}
   3315
   3316static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
   3317{
   3318	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
   3319
   3320	if (dlm_debug)
   3321		ocfs2_put_dlm_debug(dlm_debug);
   3322}
   3323
   3324int ocfs2_dlm_init(struct ocfs2_super *osb)
   3325{
   3326	int status = 0;
   3327	struct ocfs2_cluster_connection *conn = NULL;
   3328
   3329	if (ocfs2_mount_local(osb)) {
   3330		osb->node_num = 0;
   3331		goto local;
   3332	}
   3333
   3334	ocfs2_dlm_init_debug(osb);
   3335
   3336	/* launch downconvert thread */
   3337	osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s",
   3338			osb->uuid_str);
   3339	if (IS_ERR(osb->dc_task)) {
   3340		status = PTR_ERR(osb->dc_task);
   3341		osb->dc_task = NULL;
   3342		mlog_errno(status);
   3343		goto bail;
   3344	}
   3345
   3346	/* for now, uuid == domain */
   3347	status = ocfs2_cluster_connect(osb->osb_cluster_stack,
   3348				       osb->osb_cluster_name,
   3349				       strlen(osb->osb_cluster_name),
   3350				       osb->uuid_str,
   3351				       strlen(osb->uuid_str),
   3352				       &lproto, ocfs2_do_node_down, osb,
   3353				       &conn);
   3354	if (status) {
   3355		mlog_errno(status);
   3356		goto bail;
   3357	}
   3358
   3359	status = ocfs2_cluster_this_node(conn, &osb->node_num);
   3360	if (status < 0) {
   3361		mlog_errno(status);
   3362		mlog(ML_ERROR,
   3363		     "could not find this host's node number\n");
   3364		ocfs2_cluster_disconnect(conn, 0);
   3365		goto bail;
   3366	}
   3367
   3368local:
   3369	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
   3370	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
   3371	ocfs2_nfs_sync_lock_init(osb);
   3372	ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
   3373
   3374	osb->cconn = conn;
   3375bail:
   3376	if (status < 0) {
   3377		ocfs2_dlm_shutdown_debug(osb);
   3378		if (osb->dc_task)
   3379			kthread_stop(osb->dc_task);
   3380	}
   3381
   3382	return status;
   3383}
   3384
/*
 * Tear down cluster locking for a mount.
 *
 * @osb:            superblock info being shut down
 * @hangup_pending: passed through to ocfs2_cluster_disconnect()
 *
 * The order is: drop the superblock-wide locks, stop the downconvert
 * thread, free the lock resources, disconnect from the cluster and
 * finally drop the debug reference.
 */
void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
			int hangup_pending)
{
	ocfs2_drop_osb_locks(osb);

	/*
	 * Now that we have dropped all locks and ocfs2_dismount_volume()
	 * has disabled recovery, the DLM won't be talking to us.  It's
	 * safe to tear things down before disconnecting the cluster.
	 */

	if (osb->dc_task) {
		kthread_stop(osb->dc_task);
		/* Clear the pointer so the thread is never stopped twice. */
		osb->dc_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);
	ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
	ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);

	ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
	osb->cconn = NULL;

	ocfs2_dlm_shutdown_debug(osb);
}
   3411
/*
 * Drop the DLM lock behind @lockres for good.
 *
 * @osb:     superblock info providing the cluster connection
 * @lockres: lock resource to drop; it must already carry
 *           OCFS2_LOCK_FREEING once it has been initialised
 *
 * Waits for any in-flight operation (OCFS2_LOCK_BUSY) to finish, lets an
 * LVB-using lock type store its LVB when the lock is attached at EX and
 * does not need a refresh, then issues the DLM unlock and waits for it
 * to complete.  A lockres that was never initialised or is not attached
 * is left alone.
 *
 * Always returns 0; a failing ocfs2_dlm_unlock() is treated as fatal
 * (BUG).
 */
static int ocfs2_drop_lock(struct ocfs2_super *osb,
			   struct ocfs2_lock_res *lockres)
{
	int ret;
	unsigned long flags;
	u32 lkm_flags = 0;

	/* We didn't get anywhere near actually using this lockres. */
	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
		goto out;

	/* LVB-using lock types pass DLM_LKF_VALBLK to the unlock call. */
	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
			"lockres %s, flags 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	/*
	 * Wait until no operation is in flight.  The spinlock is dropped
	 * around the wait and the flag re-checked once it is held again.
	 */
	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
		     "%u, unlock_action = %u\n",
		     lockres->l_name, lockres->l_flags, lockres->l_action,
		     lockres->l_unlock_action);

		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/* XXX: Today we just wait on any busy
		 * locks... Perhaps we need to cancel converts in the
		 * future? */
		ocfs2_wait_on_busy_lock(lockres);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}

	/*
	 * Only an attached EX lock whose contents do not need a refresh
	 * gets to write its LVB before being dropped.
	 */
	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
		    lockres->l_level == DLM_LOCK_EX &&
		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	if (lockres->l_flags & OCFS2_LOCK_BUSY)
		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
		     lockres->l_name);
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);

	/* Not attached: there is no DLM lock to give back. */
	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto out;
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);

	/* make sure we never get here while waiting for an ast to
	 * fire. */
	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);

	/* is this necessary? */
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "lock %s\n", lockres->l_name);

	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
	if (ret) {
		/* A failed unlock is unrecoverable: dump state and BUG. */
		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
		ocfs2_dlm_dump_lksb(&lockres->l_lksb);
		BUG();
	}
	mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
	     lockres->l_name);

	/* BUSY was set above; wait for it to clear again. */
	ocfs2_wait_on_busy_lock(lockres);
out:
	return 0;
}
   3493
   3494static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
   3495				       struct ocfs2_lock_res *lockres);
   3496
/*
 * Mark the lockres as being dropped. It will no longer be
 * queued if blocking, but we still may have to wait on it
 * being dequeued from the downconvert thread before we can consider
 * it safe to drop.
 *
 * You can *not* attempt to call cluster_lock on this lockres anymore.
 *
 * @osb:     superblock info owning the downconvert thread and its queue
 * @lockres: lock resource to flag with OCFS2_LOCK_FREEING
 *
 * When called from the downconvert thread itself on a queued lockres,
 * the lockres is taken off the blocked list and processed directly
 * instead of being waited for.
 */
void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags, flags2;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) {
		/*
		 * We know the downconvert is queued but not in progress
		 * because we are the downconvert thread and processing
		 * different lock. So we can just remove the lock from the
		 * queue. This is not only an optimization but also a way
		 * to avoid the following deadlock:
		 *   ocfs2_dentry_post_unlock()
		 *     ocfs2_dentry_lock_put()
		 *       ocfs2_drop_dentry_lock()
		 *         iput()
		 *           ocfs2_evict_inode()
		 *             ocfs2_clear_inode()
		 *               ocfs2_mark_lockres_freeing()
		 *                 ... blocks waiting for OCFS2_LOCK_QUEUED
		 *                 since we are the downconvert thread which
		 *                 should clear the flag.
		 */
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		/* The blocked list and its count live under dc_task_lock. */
		spin_lock_irqsave(&osb->dc_task_lock, flags2);
		list_del_init(&lockres->l_blocked_list);
		osb->blocked_lock_count--;
		spin_unlock_irqrestore(&osb->dc_task_lock, flags2);
		/*
		 * Warn if we recurse into another post_unlock call.  Strictly
		 * speaking it isn't a problem but we need to be careful if
		 * that happens (stack overflow, deadlocks, ...) so warn if
		 * ocfs2 grows a path for which this can happen.
		 */
		WARN_ON_ONCE(lockres->l_ops->post_unlock);
		/* Since the lock is freeing we don't do much in the fn below */
		ocfs2_process_blocked_lock(osb, lockres);
		return;
	}
	/*
	 * Otherwise wait for OCFS2_LOCK_QUEUED to clear.  The spinlock is
	 * dropped around each wait and the flag re-checked afterwards.
	 */
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
   3562
   3563void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
   3564			       struct ocfs2_lock_res *lockres)
   3565{
   3566	int ret;
   3567
   3568	ocfs2_mark_lockres_freeing(osb, lockres);
   3569	ret = ocfs2_drop_lock(osb, lockres);
   3570	if (ret)
   3571		mlog_errno(ret);
   3572}
   3573
   3574static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
   3575{
   3576	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
   3577	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
   3578	ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
   3579	ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
   3580}
   3581
   3582int ocfs2_drop_inode_locks(struct inode *inode)
   3583{
   3584	int status, err;
   3585
   3586	/* No need to call ocfs2_mark_lockres_freeing here -
   3587	 * ocfs2_clear_inode has done it for us. */
   3588
   3589	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
   3590			      &OCFS2_I(inode)->ip_open_lockres);
   3591	if (err < 0)
   3592		mlog_errno(err);
   3593
   3594	status = err;
   3595
   3596	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
   3597			      &OCFS2_I(inode)->ip_inode_lockres);
   3598	if (err < 0)
   3599		mlog_errno(err);
   3600	if (err < 0 && !status)
   3601		status = err;
   3602
   3603	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
   3604			      &OCFS2_I(inode)->ip_rw_lockres);
   3605	if (err < 0)
   3606		mlog_errno(err);
   3607	if (err < 0 && !status)
   3608		status = err;
   3609
   3610	return status;
   3611}
   3612
   3613static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
   3614					      int new_level)
   3615{
   3616	assert_spin_locked(&lockres->l_lock);
   3617
   3618	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
   3619
   3620	if (lockres->l_level <= new_level) {
   3621		mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
   3622		     "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
   3623		     "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
   3624		     new_level, list_empty(&lockres->l_blocked_list),
   3625		     list_empty(&lockres->l_mask_waiters), lockres->l_type,
   3626		     lockres->l_flags, lockres->l_ro_holders,
   3627		     lockres->l_ex_holders, lockres->l_action,
   3628		     lockres->l_unlock_action, lockres->l_requested,
   3629		     lockres->l_blocking, lockres->l_pending_gen);
   3630		BUG();
   3631	}
   3632
   3633	mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
   3634	     lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
   3635
   3636	lockres->l_action = OCFS2_AST_DOWNCONVERT;
   3637	lockres->l_requested = new_level;
   3638	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
   3639	return lockres_set_pending(lockres);
   3640}
   3641
   3642static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
   3643				  struct ocfs2_lock_res *lockres,
   3644				  int new_level,
   3645				  int lvb,
   3646				  unsigned int generation)
   3647{
   3648	int ret;
   3649	u32 dlm_flags = DLM_LKF_CONVERT;
   3650
   3651	mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
   3652	     lockres->l_level, new_level);
   3653
   3654	/*
   3655	 * On DLM_LKF_VALBLK, fsdlm behaves differently with o2cb. It always
   3656	 * expects DLM_LKF_VALBLK being set if the LKB has LVB, so that
   3657	 * we can recover correctly from node failure. Otherwise, we may get
   3658	 * invalid LVB in LKB, but without DLM_SBF_VALNOTVALID being set.
   3659	 */
   3660	if (ocfs2_userspace_stack(osb) &&
   3661	    lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
   3662		lvb = 1;
   3663
   3664	if (lvb)
   3665		dlm_flags |= DLM_LKF_VALBLK;
   3666
   3667	ret = ocfs2_dlm_lock(osb->cconn,
   3668			     new_level,
   3669			     &lockres->l_lksb,
   3670			     dlm_flags,
   3671			     lockres->l_name,
   3672			     OCFS2_LOCK_ID_MAX_LEN - 1);
   3673	lockres_clear_pending(lockres, generation, osb);
   3674	if (ret) {
   3675		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
   3676		ocfs2_recover_from_dlm_error(lockres, 1);
   3677		goto bail;
   3678	}
   3679
   3680	ret = 0;
   3681bail:
   3682	return ret;
   3683}
   3684
   3685/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
   3686static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
   3687				        struct ocfs2_lock_res *lockres)
   3688{
   3689	assert_spin_locked(&lockres->l_lock);
   3690
   3691	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
   3692		/* If we're already trying to cancel a lock conversion
   3693		 * then just drop the spinlock and allow the caller to
   3694		 * requeue this lock. */
   3695		mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
   3696		return 0;
   3697	}
   3698
   3699	/* were we in a convert when we got the bast fire? */
   3700	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
   3701	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
   3702	/* set things up for the unlockast to know to just
   3703	 * clear out the ast_action and unset busy, etc. */
   3704	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
   3705
   3706	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
   3707			"lock %s, invalid flags: 0x%lx\n",
   3708			lockres->l_name, lockres->l_flags);
   3709
   3710	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
   3711
   3712	return 1;
   3713}
   3714
   3715static int ocfs2_cancel_convert(struct ocfs2_super *osb,
   3716				struct ocfs2_lock_res *lockres)
   3717{
   3718	int ret;
   3719
   3720	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
   3721			       DLM_LKF_CANCEL);
   3722	if (ret) {
   3723		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
   3724		ocfs2_recover_from_dlm_error(lockres, 0);
   3725	}
   3726
   3727	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
   3728
   3729	return ret;
   3730}
   3731
/*
 * Try to resolve a blocked lock resource, either by cancelling a convert
 * that is still in flight or by downconverting the lock.
 *
 * lockres->l_lock is taken and released internally; it is not held on
 * return.
 *
 * ctl->requeue is set to 1 whenever the lockres has to be processed again
 * later, and to 0 once the downconvert path is entered (it goes back to 1
 * if the downconvert reports -EBUSY).  The "no longer blocked" and
 * "already at NL" exits leave ctl->requeue untouched.
 * ctl->unblock_action receives the downconvert_worker's return value when
 * the lock type has such a worker and it was run.
 *
 * Returns 0, or the error from ocfs2_cancel_convert() /
 * ocfs2_downconvert_lock().  -EBUSY from the latter is swallowed and
 * turned into a requeue.
 */
static int ocfs2_unblock_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int level;
	int ret = 0;
	int set_lvb = 0;
	unsigned int gen;

	spin_lock_irqsave(&lockres->l_lock, flags);

recheck:
	/*
	 * Is it still blocking? If not, we have no more work to do.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
		BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		ret = 0;
		goto leave;
	}

	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		/* XXX
		 * This is a *big* race.  The OCFS2_LOCK_PENDING flag
		 * exists entirely for one reason - another thread has set
		 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
		 *
		 * If we do ocfs2_cancel_convert() before the other thread
		 * calls dlm_lock(), our cancel will do nothing.  We will
		 * get no ast, and we will have no way of knowing the
		 * cancel failed.  Meanwhile, the other thread will call
		 * into dlm_lock() and wait...forever.
		 *
		 * Why forever?  Because another node has asked for the
		 * lock first; that's why we're here in unblock_lock().
		 *
		 * The solution is OCFS2_LOCK_PENDING.  When PENDING is
		 * set, we just requeue the unblock.  Only when the other
		 * thread has called dlm_lock() and cleared PENDING will
		 * we then cancel their request.
		 *
		 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
		 * at the same time they set OCFS2_LOCK_BUSY.  They must
		 * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
		 */
		if (lockres->l_flags & OCFS2_LOCK_PENDING) {
			mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
			     lockres->l_name);
			goto leave_requeue;
		}

		/* Busy but not pending: requeue, and cancel the in-flight
		 * convert unless a cancel is already outstanding. */
		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/*
	 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
	 * set when the ast is received for an upconvert just before the
	 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
	 * on the heels of the ast, we want to delay the downconvert just
	 * enough to allow the up requestor to do its task. Because this
	 * lock is in the blocked queue, the lock will be downconverted
	 * as soon as the requestor is done with the lock.
	 */
	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
		goto leave_requeue;

	/*
	 * How can we block and yet be at NL?  We were trying to upconvert
	 * from NL and got canceled.  The code comes back here, and now
	 * we notice and clear BLOCKING.
	 */
	if (lockres->l_level == DLM_LOCK_NL) {
		BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
		mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == DLM_LOCK_EX)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
		     lockres->l_name, lockres->l_ex_holders,
		     lockres->l_ro_holders);
		goto leave_requeue;
	}

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == DLM_LOCK_PR &&
	    lockres->l_ex_holders) {
		mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
		     lockres->l_name, lockres->l_ex_holders);
		goto leave_requeue;
	}

	/*
	 * Can we get a lock in this state if the holder counts are
	 * zero? The meta data unblock code used to check this.
	 */
	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
		mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
		     lockres->l_name);
		goto leave_requeue;
	}

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	/* Give the lock type a chance to veto the downconvert for now. */
	if (lockres->l_ops->check_downconvert
	    && !lockres->l_ops->check_downconvert(lockres, new_level)) {
		mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
		     lockres->l_name);
		goto leave_requeue;
	}

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!lockres->l_ops->downconvert_worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	level = lockres->l_level;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);

	/* The worker asked us to stop: skip the downconvert entirely.
	 * The spinlock is not held at this point. */
	if (ctl->unblock_action == UNBLOCK_STOP_POST) {
		mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
		     lockres->l_name);
		goto leave;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
		     "Recheck\n", lockres->l_name, blocking,
		     lockres->l_blocking, level, lockres->l_level);
		goto recheck;
	}

downconvert:
	/* Reached with lockres->l_lock held. */
	ctl->requeue = 0;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_level == DLM_LOCK_EX)
			set_lvb = 1;

		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk setting stale
		 * data. Otherwise, there's no need to actually clear
		 * out the lvb here as its value is still valid.
		 */
		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	gen = ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
				     gen);
	/* The dlm lock convert is being cancelled in background,
	 * ocfs2_cancel_convert() is asynchronous in fs/dlm,
	 * requeue it, try again later.
	 */
	if (ret == -EBUSY) {
		ctl->requeue = 1;
		mlog(ML_BASTS, "lockres %s, ReQ: Downconvert busy\n",
		     lockres->l_name);
		ret = 0;
		msleep(20);
	}

leave:
	/* Every path that jumps here has already dropped the spinlock. */
	if (ret)
		mlog_errno(ret);
	return ret;

leave_requeue:
	/* Every path that jumps here still holds the spinlock. */
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ctl->requeue = 1;

	return 0;
}
   3939
   3940static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
   3941				     int blocking)
   3942{
   3943	struct inode *inode;
   3944	struct address_space *mapping;
   3945	struct ocfs2_inode_info *oi;
   3946
   3947       	inode = ocfs2_lock_res_inode(lockres);
   3948	mapping = inode->i_mapping;
   3949
   3950	if (S_ISDIR(inode->i_mode)) {
   3951		oi = OCFS2_I(inode);
   3952		oi->ip_dir_lock_gen++;
   3953		mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
   3954		goto out_forget;
   3955	}
   3956
   3957	if (!S_ISREG(inode->i_mode))
   3958		goto out;
   3959
   3960	/*
   3961	 * We need this before the filemap_fdatawrite() so that it can
   3962	 * transfer the dirty bit from the PTE to the
   3963	 * page. Unfortunately this means that even for EX->PR
   3964	 * downconverts, we'll lose our mappings and have to build
   3965	 * them up again.
   3966	 */
   3967	unmap_mapping_range(mapping, 0, 0, 0);
   3968
   3969	if (filemap_fdatawrite(mapping)) {
   3970		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
   3971		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
   3972	}
   3973	sync_mapping_buffers(mapping);
   3974	if (blocking == DLM_LOCK_EX) {
   3975		truncate_inode_pages(mapping, 0);
   3976	} else {
   3977		/* We only need to wait on the I/O if we're not also
   3978		 * truncating pages because truncate_inode_pages waits
   3979		 * for us above. We don't truncate pages if we're
   3980		 * blocking anything < EXMODE because we want to keep
   3981		 * them around in that case. */
   3982		filemap_fdatawait(mapping);
   3983	}
   3984
   3985out_forget:
   3986	forget_all_cached_acls(inode);
   3987
   3988out:
   3989	return UNBLOCK_CONTINUE;
   3990}
   3991
   3992static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
   3993				 struct ocfs2_lock_res *lockres,
   3994				 int new_level)
   3995{
   3996	int checkpointed = ocfs2_ci_fully_checkpointed(ci);
   3997
   3998	BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
   3999	BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
   4000
   4001	if (checkpointed)
   4002		return 1;
   4003
   4004	ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
   4005	return 0;
   4006}
   4007
   4008static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
   4009					int new_level)
   4010{
   4011	struct inode *inode = ocfs2_lock_res_inode(lockres);
   4012
   4013	return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
   4014}
   4015
   4016static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
   4017{
   4018	struct inode *inode = ocfs2_lock_res_inode(lockres);
   4019
   4020	__ocfs2_stuff_meta_lvb(inode);
   4021}
   4022
   4023/*
   4024 * Does the final reference drop on our dentry lock. Right now this
   4025 * happens in the downconvert thread, but we could choose to simplify the
   4026 * dlmglue API and push these off to the ocfs2_wq in the future.
   4027 */
   4028static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
   4029				     struct ocfs2_lock_res *lockres)
   4030{
   4031	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
   4032	ocfs2_dentry_lock_put(osb, dl);
   4033}
   4034
/*
 * d_delete() matching dentries before the lock downconvert.
 *
 * At this point, any process waiting to destroy the
 * dentry_lock due to last ref count is stopped by the
 * OCFS2_LOCK_QUEUED flag.
 *
 * We have two potential problems
 *
 * 1) If we do the last reference drop on our dentry_lock (via dput)
 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
 *    the downconvert to finish. Instead we take an elevated
 *    reference and push the drop until after we've completed our
 *    unblock processing.
 *
 * 2) There might be another process with a final reference,
 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there's no more dentries anyway.
 *
 * Return value:
 *   UNBLOCK_CONTINUE      - blocking a PR request, or no extra reference
 *                           could be taken; nothing was deleted.
 *   UNBLOCK_STOP_POST     - the extra reference taken here is the only
 *                           one left (dl_count == 1).
 *   UNBLOCK_CONTINUE_POST - aliases were deleted and other references
 *                           remain.
 */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == DLM_LOCK_PR)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure however that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	/*
	 * Delete every local alias in turn.  dentry_attach_lock is held
	 * only for the lookup and dropped around the dcache calls.
	 */
	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		if (S_ISDIR(dl->dl_inode->i_mode))
			shrink_dcache_parent(dentry);

		mlog(0, "d_delete(%pd);\n", dentry);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
	 *
	 * NOTE(review): dl_count is read here without dentry_attach_lock
	 * held - confirm that this is intentional.
	 */
	if (dl->dl_count == 1)
		return UNBLOCK_STOP_POST;

	return UNBLOCK_CONTINUE_POST;
}
   4146
   4147static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
   4148					    int new_level)
   4149{
   4150	struct ocfs2_refcount_tree *tree =
   4151				ocfs2_lock_res_refcount_tree(lockres);
   4152
   4153	return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
   4154}
   4155
   4156static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
   4157					 int blocking)
   4158{
   4159	struct ocfs2_refcount_tree *tree =
   4160				ocfs2_lock_res_refcount_tree(lockres);
   4161
   4162	ocfs2_metadata_cache_purge(&tree->rf_ci);
   4163
   4164	return UNBLOCK_CONTINUE;
   4165}
   4166
   4167static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
   4168{
   4169	struct ocfs2_qinfo_lvb *lvb;
   4170	struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
   4171	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
   4172					    oinfo->dqi_gi.dqi_type);
   4173
   4174	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
   4175	lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
   4176	lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
   4177	lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
   4178	lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
   4179	lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
   4180	lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
   4181	lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
   4182}
   4183
   4184void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
   4185{
   4186	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
   4187	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
   4188	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
   4189
   4190	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
   4191		ocfs2_cluster_unlock(osb, lockres, level);
   4192}
   4193
   4194static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
   4195{
   4196	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
   4197					    oinfo->dqi_gi.dqi_type);
   4198	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
   4199	struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
   4200	struct buffer_head *bh = NULL;
   4201	struct ocfs2_global_disk_dqinfo *gdinfo;
   4202	int status = 0;
   4203
   4204	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
   4205	    lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
   4206		info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
   4207		info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
   4208		oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
   4209		oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
   4210		oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
   4211		oinfo->dqi_gi.dqi_free_entry =
   4212					be32_to_cpu(lvb->lvb_free_entry);
   4213	} else {
   4214		status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode,
   4215						     oinfo->dqi_giblk, &bh);
   4216		if (status) {
   4217			mlog_errno(status);
   4218			goto bail;
   4219		}
   4220		gdinfo = (struct ocfs2_global_disk_dqinfo *)
   4221					(bh->b_data + OCFS2_GLOBAL_INFO_OFF);
   4222		info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
   4223		info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
   4224		oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
   4225		oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
   4226		oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
   4227		oinfo->dqi_gi.dqi_free_entry =
   4228					le32_to_cpu(gdinfo->dqi_free_entry);
   4229		brelse(bh);
   4230		ocfs2_track_lock_refresh(lockres);
   4231	}
   4232
   4233bail:
   4234	return status;
   4235}
   4236
   4237/* Lock quota info, this function expects at least shared lock on the quota file
   4238 * so that we can safely refresh quota info from disk. */
   4239int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
   4240{
   4241	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
   4242	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
   4243	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
   4244	int status = 0;
   4245
   4246	/* On RO devices, locking really isn't needed... */
   4247	if (ocfs2_is_hard_readonly(osb)) {
   4248		if (ex)
   4249			status = -EROFS;
   4250		goto bail;
   4251	}
   4252	if (ocfs2_mount_local(osb))
   4253		goto bail;
   4254
   4255	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
   4256	if (status < 0) {
   4257		mlog_errno(status);
   4258		goto bail;
   4259	}
   4260	if (!ocfs2_should_refresh_lock_res(lockres))
   4261		goto bail;
   4262	/* OK, we have the lock but we need to refresh the quota info */
   4263	status = ocfs2_refresh_qinfo(oinfo);
   4264	if (status)
   4265		ocfs2_qinfo_unlock(oinfo, ex);
   4266	ocfs2_complete_lock_res_refresh(lockres, status);
   4267bail:
   4268	return status;
   4269}
   4270
   4271int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
   4272{
   4273	int status;
   4274	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
   4275	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
   4276	struct ocfs2_super *osb = lockres->l_priv;
   4277
   4278
   4279	if (ocfs2_is_hard_readonly(osb))
   4280		return -EROFS;
   4281
   4282	if (ocfs2_mount_local(osb))
   4283		return 0;
   4284
   4285	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
   4286	if (status < 0)
   4287		mlog_errno(status);
   4288
   4289	return status;
   4290}
   4291
   4292void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
   4293{
   4294	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
   4295	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
   4296	struct ocfs2_super *osb = lockres->l_priv;
   4297
   4298	if (!ocfs2_mount_local(osb))
   4299		ocfs2_cluster_unlock(osb, lockres, level);
   4300}
   4301
   4302static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
   4303				       struct ocfs2_lock_res *lockres)
   4304{
   4305	int status;
   4306	struct ocfs2_unblock_ctl ctl = {0, 0,};
   4307	unsigned long flags;
   4308
   4309	/* Our reference to the lockres in this function can be
   4310	 * considered valid until we remove the OCFS2_LOCK_QUEUED
   4311	 * flag. */
   4312
   4313	BUG_ON(!lockres);
   4314	BUG_ON(!lockres->l_ops);
   4315
   4316	mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);
   4317
   4318	/* Detect whether a lock has been marked as going away while
   4319	 * the downconvert thread was processing other things. A lock can
   4320	 * still be marked with OCFS2_LOCK_FREEING after this check,
   4321	 * but short circuiting here will still save us some
   4322	 * performance. */
   4323	spin_lock_irqsave(&lockres->l_lock, flags);
   4324	if (lockres->l_flags & OCFS2_LOCK_FREEING)
   4325		goto unqueue;
   4326	spin_unlock_irqrestore(&lockres->l_lock, flags);
   4327
   4328	status = ocfs2_unblock_lock(osb, lockres, &ctl);
   4329	if (status < 0)
   4330		mlog_errno(status);
   4331
   4332	spin_lock_irqsave(&lockres->l_lock, flags);
   4333unqueue:
   4334	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
   4335		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
   4336	} else
   4337		ocfs2_schedule_blocked_lock(osb, lockres);
   4338
   4339	mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
   4340	     ctl.requeue ? "yes" : "no");
   4341	spin_unlock_irqrestore(&lockres->l_lock, flags);
   4342
   4343	if (ctl.unblock_action != UNBLOCK_CONTINUE
   4344	    && lockres->l_ops->post_unlock)
   4345		lockres->l_ops->post_unlock(osb, lockres);
   4346}
   4347
   4348static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
   4349					struct ocfs2_lock_res *lockres)
   4350{
   4351	unsigned long flags;
   4352
   4353	assert_spin_locked(&lockres->l_lock);
   4354
   4355	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
   4356		/* Do not schedule a lock for downconvert when it's on
   4357		 * the way to destruction - any nodes wanting access
   4358		 * to the resource will get it soon. */
   4359		mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
   4360		     lockres->l_name, lockres->l_flags);
   4361		return;
   4362	}
   4363
   4364	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
   4365
   4366	spin_lock_irqsave(&osb->dc_task_lock, flags);
   4367	if (list_empty(&lockres->l_blocked_list)) {
   4368		list_add_tail(&lockres->l_blocked_list,
   4369			      &osb->blocked_lock_list);
   4370		osb->blocked_lock_count++;
   4371	}
   4372	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
   4373}
   4374
   4375static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
   4376{
   4377	unsigned long processed;
   4378	unsigned long flags;
   4379	struct ocfs2_lock_res *lockres;
   4380
   4381	spin_lock_irqsave(&osb->dc_task_lock, flags);
   4382	/* grab this early so we know to try again if a state change and
   4383	 * wake happens part-way through our work  */
   4384	osb->dc_work_sequence = osb->dc_wake_sequence;
   4385
   4386	processed = osb->blocked_lock_count;
   4387	/*
   4388	 * blocked lock processing in this loop might call iput which can
   4389	 * remove items off osb->blocked_lock_list. Downconvert up to
   4390	 * 'processed' number of locks, but stop short if we had some
   4391	 * removed in ocfs2_mark_lockres_freeing when downconverting.
   4392	 */
   4393	while (processed && !list_empty(&osb->blocked_lock_list)) {
   4394		lockres = list_entry(osb->blocked_lock_list.next,
   4395				     struct ocfs2_lock_res, l_blocked_list);
   4396		list_del_init(&lockres->l_blocked_list);
   4397		osb->blocked_lock_count--;
   4398		spin_unlock_irqrestore(&osb->dc_task_lock, flags);
   4399
   4400		BUG_ON(!processed);
   4401		processed--;
   4402
   4403		ocfs2_process_blocked_lock(osb, lockres);
   4404
   4405		spin_lock_irqsave(&osb->dc_task_lock, flags);
   4406	}
   4407	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
   4408}
   4409
   4410static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
   4411{
   4412	int empty = 0;
   4413	unsigned long flags;
   4414
   4415	spin_lock_irqsave(&osb->dc_task_lock, flags);
   4416	if (list_empty(&osb->blocked_lock_list))
   4417		empty = 1;
   4418
   4419	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
   4420	return empty;
   4421}
   4422
   4423static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
   4424{
   4425	int should_wake = 0;
   4426	unsigned long flags;
   4427
   4428	spin_lock_irqsave(&osb->dc_task_lock, flags);
   4429	if (osb->dc_work_sequence != osb->dc_wake_sequence)
   4430		should_wake = 1;
   4431	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
   4432
   4433	return should_wake;
   4434}
   4435
   4436static int ocfs2_downconvert_thread(void *arg)
   4437{
   4438	struct ocfs2_super *osb = arg;
   4439
   4440	/* only quit once we've been asked to stop and there is no more
   4441	 * work available */
   4442	while (!(kthread_should_stop() &&
   4443		ocfs2_downconvert_thread_lists_empty(osb))) {
   4444
   4445		wait_event_interruptible(osb->dc_event,
   4446					 ocfs2_downconvert_thread_should_wake(osb) ||
   4447					 kthread_should_stop());
   4448
   4449		mlog(0, "downconvert_thread: awoken\n");
   4450
   4451		ocfs2_downconvert_thread_do_work(osb);
   4452	}
   4453
   4454	osb->dc_task = NULL;
   4455	return 0;
   4456}
   4457
   4458void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
   4459{
   4460	unsigned long flags;
   4461
   4462	spin_lock_irqsave(&osb->dc_task_lock, flags);
   4463	/* make sure the voting thread gets a swipe at whatever changes
   4464	 * the caller may have made to the voting state */
   4465	osb->dc_wake_sequence++;
   4466	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
   4467	wake_up(&osb->dc_event);
   4468}