cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

dir.c (53045B)


      1// SPDX-License-Identifier: GPL-2.0
      2#include <linux/ceph/ceph_debug.h>
      3
      4#include <linux/spinlock.h>
      5#include <linux/namei.h>
      6#include <linux/slab.h>
      7#include <linux/sched.h>
      8#include <linux/xattr.h>
      9
     10#include "super.h"
     11#include "mds_client.h"
     12
     13/*
     14 * Directory operations: readdir, lookup, create, link, unlink,
     15 * rename, etc.
     16 */
     17
     18/*
     19 * Ceph MDS operations are specified in terms of a base ino and
     20 * relative path.  Thus, the client can specify an operation on a
     21 * specific inode (e.g., a getattr due to fstat(2)), or as a path
     22 * relative to, say, the root directory.
     23 *
     24 * Normally, we limit ourselves to strict inode ops (no path component)
     25 * or dentry operations (a single path component relative to an ino).  The
     26 * exception to this is open_root_dentry(), which will open the mount
     27 * point by name.
     28 */
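        /*
         * Illustrative example (not a complete request setup): the two
         * request shapes described above, using fields that functions later
         * in this file fill in.  A pure inode op (e.g. getattr on a known
         * inode) carries only a base inode:
         *
         *	req->r_inode = inode;
         *	ihold(inode);
         *
         * while a dentry op (e.g. lookup of a single name) names one
         * component relative to its parent directory:
         *
         *	req->r_dentry = dget(dentry);
         *	req->r_parent = dir;
         *	ihold(dir);
         */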
     29
     30const struct dentry_operations ceph_dentry_ops;
     31
     32static bool __dentry_lease_is_valid(struct ceph_dentry_info *di);
     33static int __dir_lease_try_check(const struct dentry *dentry);
     34
     35/*
     36 * Initialize ceph dentry state.
     37 */
     38static int ceph_d_init(struct dentry *dentry)
     39{
     40	struct ceph_dentry_info *di;
     41	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dentry->d_sb);
     42
     43	di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
     44	if (!di)
     45		return -ENOMEM;          /* oh well */
     46
     47	di->dentry = dentry;
     48	di->lease_session = NULL;
     49	di->time = jiffies;
     50	dentry->d_fsdata = di;
     51	INIT_LIST_HEAD(&di->lease_list);
     52
     53	atomic64_inc(&mdsc->metric.total_dentries);
     54
     55	return 0;
     56}
     57
     58/*
     59 * for f_pos for readdir:
     60 * - hash order:
     61 *	(0xff << 52) | ((24 bits hash) << 28) |
      62 *	(the nth entry among entries with the same hash);
      63 * - frag+name order:
     64 *	((frag value) << 28) | (the nth entry in frag);
     65 */
     66#define OFFSET_BITS	28
     67#define OFFSET_MASK	((1 << OFFSET_BITS) - 1)
     68#define HASH_ORDER	(0xffull << (OFFSET_BITS + 24))
     69loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
     70{
     71	loff_t fpos = ((loff_t)high << 28) | (loff_t)off;
     72	if (hash_order)
     73		fpos |= HASH_ORDER;
     74	return fpos;
     75}
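
        /*
         * Worked example (illustrative): with OFFSET_BITS = 28,
         * ceph_make_fpos(0x2, 5, false) == (0x2ULL << 28) | 5 == 0x20000005,
         * i.e. "the 5th entry of frag 0x2".  When hash_order is true, the
         * HASH_ORDER marker (0xffull << 52) is OR'd in as well, which is
         * what is_hash_order() below tests for.
         */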
     76
     77static bool is_hash_order(loff_t p)
     78{
     79	return (p & HASH_ORDER) == HASH_ORDER;
     80}
     81
     82static unsigned fpos_frag(loff_t p)
     83{
     84	return p >> OFFSET_BITS;
     85}
     86
     87static unsigned fpos_hash(loff_t p)
     88{
     89	return ceph_frag_value(fpos_frag(p));
     90}
     91
     92static unsigned fpos_off(loff_t p)
     93{
     94	return p & OFFSET_MASK;
     95}
     96
     97static int fpos_cmp(loff_t l, loff_t r)
     98{
     99	int v = ceph_frag_compare(fpos_frag(l), fpos_frag(r));
    100	if (v)
    101		return v;
    102	return (int)(fpos_off(l) - fpos_off(r));
    103}
    104
    105/*
    106 * make note of the last dentry we read, so we can
    107 * continue at the same lexicographical point,
    108 * regardless of what dir changes take place on the
    109 * server.
    110 */
    111static int note_last_dentry(struct ceph_dir_file_info *dfi, const char *name,
    112		            int len, unsigned next_offset)
    113{
    114	char *buf = kmalloc(len+1, GFP_KERNEL);
    115	if (!buf)
    116		return -ENOMEM;
    117	kfree(dfi->last_name);
    118	dfi->last_name = buf;
    119	memcpy(dfi->last_name, name, len);
    120	dfi->last_name[len] = 0;
    121	dfi->next_offset = next_offset;
    122	dout("note_last_dentry '%s'\n", dfi->last_name);
    123	return 0;
    124}
    125
    126
    127static struct dentry *
    128__dcache_find_get_entry(struct dentry *parent, u64 idx,
    129			struct ceph_readdir_cache_control *cache_ctl)
    130{
    131	struct inode *dir = d_inode(parent);
    132	struct dentry *dentry;
    133	unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
    134	loff_t ptr_pos = idx * sizeof(struct dentry *);
    135	pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;
    136
    137	if (ptr_pos >= i_size_read(dir))
    138		return NULL;
    139
    140	if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) {
    141		ceph_readdir_cache_release(cache_ctl);
    142		cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff);
    143		if (!cache_ctl->page) {
    144			dout(" page %lu not found\n", ptr_pgoff);
    145			return ERR_PTR(-EAGAIN);
    146		}
     147		/* reading/filling the cache is serialized by
     148		   i_rwsem, so there is no need to hold the page lock */
    149		unlock_page(cache_ctl->page);
    150		cache_ctl->dentries = kmap(cache_ctl->page);
    151	}
    152
    153	cache_ctl->index = idx & idx_mask;
    154
    155	rcu_read_lock();
    156	spin_lock(&parent->d_lock);
     157	/* check i_size again here, because an empty directory can be
     158	 * marked as complete while not holding the i_rwsem. */
    159	if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
    160		dentry = cache_ctl->dentries[cache_ctl->index];
    161	else
    162		dentry = NULL;
    163	spin_unlock(&parent->d_lock);
    164	if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
    165		dentry = NULL;
    166	rcu_read_unlock();
    167	return dentry ? : ERR_PTR(-EAGAIN);
    168}
    169
    170/*
    171 * When possible, we try to satisfy a readdir by peeking at the
    172 * dcache.  We make this work by carefully ordering dentries on
    173 * d_child when we initially get results back from the MDS, and
    174 * falling back to a "normal" sync readdir if any dentries in the dir
    175 * are dropped.
    176 *
    177 * Complete dir indicates that we have all dentries in the dir.  It is
    178 * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
    179 * the MDS if/when the directory is modified).
    180 */
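        /*
         * Orientation note: __dcache_find_get_entry() reads struct dentry
         * pointers out of the directory inode's own page cache, and the
         * directory's i_size is used to track how many of those pointers
         * are valid (see the i_size_write() call in ceph_readdir()).  The
         * binary search over ctx->pos below relies on that layout.
         */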
    181static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
    182			    int shared_gen)
    183{
    184	struct ceph_dir_file_info *dfi = file->private_data;
    185	struct dentry *parent = file->f_path.dentry;
    186	struct inode *dir = d_inode(parent);
    187	struct dentry *dentry, *last = NULL;
    188	struct ceph_dentry_info *di;
    189	struct ceph_readdir_cache_control cache_ctl = {};
    190	u64 idx = 0;
    191	int err = 0;
    192
    193	dout("__dcache_readdir %p v%u at %llx\n", dir, (unsigned)shared_gen, ctx->pos);
    194
    195	/* search start position */
    196	if (ctx->pos > 2) {
    197		u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
    198		while (count > 0) {
    199			u64 step = count >> 1;
    200			dentry = __dcache_find_get_entry(parent, idx + step,
    201							 &cache_ctl);
    202			if (!dentry) {
     203				/* use linear search */
    204				idx = 0;
    205				break;
    206			}
    207			if (IS_ERR(dentry)) {
    208				err = PTR_ERR(dentry);
    209				goto out;
    210			}
    211			di = ceph_dentry(dentry);
    212			spin_lock(&dentry->d_lock);
    213			if (fpos_cmp(di->offset, ctx->pos) < 0) {
    214				idx += step + 1;
    215				count -= step + 1;
    216			} else {
    217				count = step;
    218			}
    219			spin_unlock(&dentry->d_lock);
    220			dput(dentry);
    221		}
    222
    223		dout("__dcache_readdir %p cache idx %llu\n", dir, idx);
    224	}
    225
    226
    227	for (;;) {
    228		bool emit_dentry = false;
    229		dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
    230		if (!dentry) {
    231			dfi->file_info.flags |= CEPH_F_ATEND;
    232			err = 0;
    233			break;
    234		}
    235		if (IS_ERR(dentry)) {
    236			err = PTR_ERR(dentry);
    237			goto out;
    238		}
    239
    240		spin_lock(&dentry->d_lock);
    241		di = ceph_dentry(dentry);
    242		if (d_unhashed(dentry) ||
    243		    d_really_is_negative(dentry) ||
    244		    di->lease_shared_gen != shared_gen) {
    245			spin_unlock(&dentry->d_lock);
    246			dput(dentry);
    247			err = -EAGAIN;
    248			goto out;
    249		}
    250		if (fpos_cmp(ctx->pos, di->offset) <= 0) {
    251			__ceph_dentry_dir_lease_touch(di);
    252			emit_dentry = true;
    253		}
    254		spin_unlock(&dentry->d_lock);
    255
    256		if (emit_dentry) {
    257			dout(" %llx dentry %p %pd %p\n", di->offset,
    258			     dentry, dentry, d_inode(dentry));
    259			ctx->pos = di->offset;
    260			if (!dir_emit(ctx, dentry->d_name.name,
    261				      dentry->d_name.len, ceph_present_inode(d_inode(dentry)),
    262				      d_inode(dentry)->i_mode >> 12)) {
    263				dput(dentry);
    264				err = 0;
    265				break;
    266			}
    267			ctx->pos++;
    268
    269			if (last)
    270				dput(last);
    271			last = dentry;
    272		} else {
    273			dput(dentry);
    274		}
    275	}
    276out:
    277	ceph_readdir_cache_release(&cache_ctl);
    278	if (last) {
    279		int ret;
    280		di = ceph_dentry(last);
    281		ret = note_last_dentry(dfi, last->d_name.name, last->d_name.len,
    282				       fpos_off(di->offset) + 1);
    283		if (ret < 0)
    284			err = ret;
    285		dput(last);
     286		/* last_name no longer matches the cache index */
    287		if (dfi->readdir_cache_idx >= 0) {
    288			dfi->readdir_cache_idx = -1;
    289			dfi->dir_release_count = 0;
    290		}
    291	}
    292	return err;
    293}
    294
    295static bool need_send_readdir(struct ceph_dir_file_info *dfi, loff_t pos)
    296{
    297	if (!dfi->last_readdir)
    298		return true;
    299	if (is_hash_order(pos))
    300		return !ceph_frag_contains_value(dfi->frag, fpos_hash(pos));
    301	else
    302		return dfi->frag != fpos_frag(pos);
    303}
    304
    305static int ceph_readdir(struct file *file, struct dir_context *ctx)
    306{
    307	struct ceph_dir_file_info *dfi = file->private_data;
    308	struct inode *inode = file_inode(file);
    309	struct ceph_inode_info *ci = ceph_inode(inode);
    310	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
    311	struct ceph_mds_client *mdsc = fsc->mdsc;
    312	int i;
    313	int err;
    314	unsigned frag = -1;
    315	struct ceph_mds_reply_info_parsed *rinfo;
    316
    317	dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
    318	if (dfi->file_info.flags & CEPH_F_ATEND)
    319		return 0;
    320
    321	/* always start with . and .. */
    322	if (ctx->pos == 0) {
    323		dout("readdir off 0 -> '.'\n");
    324		if (!dir_emit(ctx, ".", 1, ceph_present_inode(inode),
    325			    inode->i_mode >> 12))
    326			return 0;
    327		ctx->pos = 1;
    328	}
    329	if (ctx->pos == 1) {
    330		u64 ino;
    331		struct dentry *dentry = file->f_path.dentry;
    332
    333		spin_lock(&dentry->d_lock);
    334		ino = ceph_present_inode(dentry->d_parent->d_inode);
    335		spin_unlock(&dentry->d_lock);
    336
    337		dout("readdir off 1 -> '..'\n");
    338		if (!dir_emit(ctx, "..", 2, ino, inode->i_mode >> 12))
    339			return 0;
    340		ctx->pos = 2;
    341	}
    342
    343	spin_lock(&ci->i_ceph_lock);
     344	/* request the Fx cap. If we have Fx, we don't need to release the
     345	 * Fs cap for a later create/unlink. */
    346	__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_WR);
    347	/* can we use the dcache? */
    348	if (ceph_test_mount_opt(fsc, DCACHE) &&
    349	    !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
    350	    ceph_snap(inode) != CEPH_SNAPDIR &&
    351	    __ceph_dir_is_complete_ordered(ci) &&
    352	    __ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
    353		int shared_gen = atomic_read(&ci->i_shared_gen);
    354
    355		spin_unlock(&ci->i_ceph_lock);
    356		err = __dcache_readdir(file, ctx, shared_gen);
    357		if (err != -EAGAIN)
    358			return err;
    359	} else {
    360		spin_unlock(&ci->i_ceph_lock);
    361	}
    362
    363	/* proceed with a normal readdir */
    364more:
    365	/* do we have the correct frag content buffered? */
    366	if (need_send_readdir(dfi, ctx->pos)) {
    367		struct ceph_mds_request *req;
    368		int op = ceph_snap(inode) == CEPH_SNAPDIR ?
    369			CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
    370
    371		/* discard old result, if any */
    372		if (dfi->last_readdir) {
    373			ceph_mdsc_put_request(dfi->last_readdir);
    374			dfi->last_readdir = NULL;
    375		}
    376
    377		if (is_hash_order(ctx->pos)) {
    378			/* fragtree isn't always accurate. choose frag
    379			 * based on previous reply when possible. */
    380			if (frag == (unsigned)-1)
    381				frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
    382							NULL, NULL);
    383		} else {
    384			frag = fpos_frag(ctx->pos);
    385		}
    386
    387		dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
    388		     ceph_vinop(inode), frag, dfi->last_name);
    389		req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
    390		if (IS_ERR(req))
    391			return PTR_ERR(req);
    392		err = ceph_alloc_readdir_reply_buffer(req, inode);
    393		if (err) {
    394			ceph_mdsc_put_request(req);
    395			return err;
    396		}
    397		/* hints to request -> mds selection code */
    398		req->r_direct_mode = USE_AUTH_MDS;
    399		if (op == CEPH_MDS_OP_READDIR) {
    400			req->r_direct_hash = ceph_frag_value(frag);
    401			__set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
    402			req->r_inode_drop = CEPH_CAP_FILE_EXCL;
    403		}
    404		if (dfi->last_name) {
    405			req->r_path2 = kstrdup(dfi->last_name, GFP_KERNEL);
    406			if (!req->r_path2) {
    407				ceph_mdsc_put_request(req);
    408				return -ENOMEM;
    409			}
    410		} else if (is_hash_order(ctx->pos)) {
    411			req->r_args.readdir.offset_hash =
    412				cpu_to_le32(fpos_hash(ctx->pos));
    413		}
    414
    415		req->r_dir_release_cnt = dfi->dir_release_count;
    416		req->r_dir_ordered_cnt = dfi->dir_ordered_count;
    417		req->r_readdir_cache_idx = dfi->readdir_cache_idx;
    418		req->r_readdir_offset = dfi->next_offset;
    419		req->r_args.readdir.frag = cpu_to_le32(frag);
    420		req->r_args.readdir.flags =
    421				cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);
    422
    423		req->r_inode = inode;
    424		ihold(inode);
    425		req->r_dentry = dget(file->f_path.dentry);
    426		err = ceph_mdsc_do_request(mdsc, NULL, req);
    427		if (err < 0) {
    428			ceph_mdsc_put_request(req);
    429			return err;
    430		}
    431		dout("readdir got and parsed readdir result=%d on "
    432		     "frag %x, end=%d, complete=%d, hash_order=%d\n",
    433		     err, frag,
    434		     (int)req->r_reply_info.dir_end,
    435		     (int)req->r_reply_info.dir_complete,
    436		     (int)req->r_reply_info.hash_order);
    437
    438		rinfo = &req->r_reply_info;
    439		if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
    440			frag = le32_to_cpu(rinfo->dir_dir->frag);
    441			if (!rinfo->hash_order) {
    442				dfi->next_offset = req->r_readdir_offset;
    443				/* adjust ctx->pos to beginning of frag */
    444				ctx->pos = ceph_make_fpos(frag,
    445							  dfi->next_offset,
    446							  false);
    447			}
    448		}
    449
    450		dfi->frag = frag;
    451		dfi->last_readdir = req;
    452
    453		if (test_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags)) {
    454			dfi->readdir_cache_idx = req->r_readdir_cache_idx;
    455			if (dfi->readdir_cache_idx < 0) {
    456				/* preclude from marking dir ordered */
    457				dfi->dir_ordered_count = 0;
    458			} else if (ceph_frag_is_leftmost(frag) &&
    459				   dfi->next_offset == 2) {
    460				/* note dir version at start of readdir so
    461				 * we can tell if any dentries get dropped */
    462				dfi->dir_release_count = req->r_dir_release_cnt;
    463				dfi->dir_ordered_count = req->r_dir_ordered_cnt;
    464			}
    465		} else {
    466			dout("readdir !did_prepopulate\n");
    467			/* disable readdir cache */
    468			dfi->readdir_cache_idx = -1;
    469			/* preclude from marking dir complete */
    470			dfi->dir_release_count = 0;
    471		}
    472
    473		/* note next offset and last dentry name */
    474		if (rinfo->dir_nr > 0) {
    475			struct ceph_mds_reply_dir_entry *rde =
    476					rinfo->dir_entries + (rinfo->dir_nr-1);
    477			unsigned next_offset = req->r_reply_info.dir_end ?
    478					2 : (fpos_off(rde->offset) + 1);
    479			err = note_last_dentry(dfi, rde->name, rde->name_len,
    480					       next_offset);
    481			if (err) {
    482				ceph_mdsc_put_request(dfi->last_readdir);
    483				dfi->last_readdir = NULL;
    484				return err;
    485			}
    486		} else if (req->r_reply_info.dir_end) {
    487			dfi->next_offset = 2;
    488			/* keep last name */
    489		}
    490	}
    491
    492	rinfo = &dfi->last_readdir->r_reply_info;
    493	dout("readdir frag %x num %d pos %llx chunk first %llx\n",
    494	     dfi->frag, rinfo->dir_nr, ctx->pos,
    495	     rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
    496
    497	i = 0;
    498	/* search start position */
    499	if (rinfo->dir_nr > 0) {
    500		int step, nr = rinfo->dir_nr;
    501		while (nr > 0) {
    502			step = nr >> 1;
    503			if (rinfo->dir_entries[i + step].offset < ctx->pos) {
    504				i +=  step + 1;
    505				nr -= step + 1;
    506			} else {
    507				nr = step;
    508			}
    509		}
    510	}
    511	for (; i < rinfo->dir_nr; i++) {
    512		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
    513
    514		BUG_ON(rde->offset < ctx->pos);
    515
    516		ctx->pos = rde->offset;
    517		dout("readdir (%d/%d) -> %llx '%.*s' %p\n",
    518		     i, rinfo->dir_nr, ctx->pos,
    519		     rde->name_len, rde->name, &rde->inode.in);
    520
    521		BUG_ON(!rde->inode.in);
    522
    523		if (!dir_emit(ctx, rde->name, rde->name_len,
    524			      ceph_present_ino(inode->i_sb, le64_to_cpu(rde->inode.in->ino)),
    525			      le32_to_cpu(rde->inode.in->mode) >> 12)) {
     526			/*
     527			 * NOTE: There is no need to put 'dfi->last_readdir'
     528			 * here; when dir_emit stops us it has most likely run
     529			 * out of buffer space, so the next readdir call will
     530			 * continue from this point.
     531			 */
    532			dout("filldir stopping us...\n");
    533			return 0;
    534		}
    535		ctx->pos++;
    536	}
    537
    538	ceph_mdsc_put_request(dfi->last_readdir);
    539	dfi->last_readdir = NULL;
    540
    541	if (dfi->next_offset > 2) {
    542		frag = dfi->frag;
    543		goto more;
    544	}
    545
    546	/* more frags? */
    547	if (!ceph_frag_is_rightmost(dfi->frag)) {
    548		frag = ceph_frag_next(dfi->frag);
    549		if (is_hash_order(ctx->pos)) {
    550			loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
    551							dfi->next_offset, true);
    552			if (new_pos > ctx->pos)
    553				ctx->pos = new_pos;
    554			/* keep last_name */
    555		} else {
    556			ctx->pos = ceph_make_fpos(frag, dfi->next_offset,
    557							false);
    558			kfree(dfi->last_name);
    559			dfi->last_name = NULL;
    560		}
    561		dout("readdir next frag is %x\n", frag);
    562		goto more;
    563	}
    564	dfi->file_info.flags |= CEPH_F_ATEND;
    565
    566	/*
    567	 * if dir_release_count still matches the dir, no dentries
    568	 * were released during the whole readdir, and we should have
    569	 * the complete dir contents in our cache.
    570	 */
    571	if (atomic64_read(&ci->i_release_count) ==
    572					dfi->dir_release_count) {
    573		spin_lock(&ci->i_ceph_lock);
    574		if (dfi->dir_ordered_count ==
    575				atomic64_read(&ci->i_ordered_count)) {
    576			dout(" marking %p complete and ordered\n", inode);
    577			/* use i_size to track number of entries in
    578			 * readdir cache */
    579			BUG_ON(dfi->readdir_cache_idx < 0);
    580			i_size_write(inode, dfi->readdir_cache_idx *
    581				     sizeof(struct dentry*));
    582		} else {
    583			dout(" marking %p complete\n", inode);
    584		}
    585		__ceph_dir_set_complete(ci, dfi->dir_release_count,
    586					dfi->dir_ordered_count);
    587		spin_unlock(&ci->i_ceph_lock);
    588	}
    589
    590	dout("readdir %p file %p done.\n", inode, file);
    591	return 0;
    592}
    593
    594static void reset_readdir(struct ceph_dir_file_info *dfi)
    595{
    596	if (dfi->last_readdir) {
    597		ceph_mdsc_put_request(dfi->last_readdir);
    598		dfi->last_readdir = NULL;
    599	}
    600	kfree(dfi->last_name);
    601	dfi->last_name = NULL;
    602	dfi->dir_release_count = 0;
    603	dfi->readdir_cache_idx = -1;
    604	dfi->next_offset = 2;  /* compensate for . and .. */
    605	dfi->file_info.flags &= ~CEPH_F_ATEND;
    606}
    607
    608/*
    609 * discard buffered readdir content on seekdir(0), or seek to new frag,
    610 * or seek prior to current chunk
    611 */
    612static bool need_reset_readdir(struct ceph_dir_file_info *dfi, loff_t new_pos)
    613{
    614	struct ceph_mds_reply_info_parsed *rinfo;
    615	loff_t chunk_offset;
    616	if (new_pos == 0)
    617		return true;
    618	if (is_hash_order(new_pos)) {
    619		/* no need to reset last_name for a forward seek when
     620		 * dentries are sorted in hash order */
    621	} else if (dfi->frag != fpos_frag(new_pos)) {
    622		return true;
    623	}
    624	rinfo = dfi->last_readdir ? &dfi->last_readdir->r_reply_info : NULL;
    625	if (!rinfo || !rinfo->dir_nr)
    626		return true;
    627	chunk_offset = rinfo->dir_entries[0].offset;
    628	return new_pos < chunk_offset ||
    629	       is_hash_order(new_pos) != is_hash_order(chunk_offset);
    630}
    631
    632static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
    633{
    634	struct ceph_dir_file_info *dfi = file->private_data;
    635	struct inode *inode = file->f_mapping->host;
    636	loff_t retval;
    637
    638	inode_lock(inode);
    639	retval = -EINVAL;
    640	switch (whence) {
    641	case SEEK_CUR:
    642		offset += file->f_pos;
    643		break;
    644	case SEEK_SET:
    645		break;
    646	case SEEK_END:
    647		retval = -EOPNOTSUPP;
    648		goto out;
    649	default:
    650		goto out;
    651	}
    652
    653	if (offset >= 0) {
    654		if (need_reset_readdir(dfi, offset)) {
    655			dout("dir_llseek dropping %p content\n", file);
    656			reset_readdir(dfi);
    657		} else if (is_hash_order(offset) && offset > file->f_pos) {
    658			/* for hash offset, we don't know if a forward seek
    659			 * is within same frag */
    660			dfi->dir_release_count = 0;
    661			dfi->readdir_cache_idx = -1;
    662		}
    663
    664		if (offset != file->f_pos) {
    665			file->f_pos = offset;
    666			file->f_version = 0;
    667			dfi->file_info.flags &= ~CEPH_F_ATEND;
    668		}
    669		retval = offset;
    670	}
    671out:
    672	inode_unlock(inode);
    673	return retval;
    674}
    675
    676/*
    677 * Handle lookups for the hidden .snap directory.
    678 */
    679struct dentry *ceph_handle_snapdir(struct ceph_mds_request *req,
    680				   struct dentry *dentry)
    681{
    682	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
    683	struct inode *parent = d_inode(dentry->d_parent); /* we hold i_rwsem */
    684
    685	/* .snap dir? */
    686	if (ceph_snap(parent) == CEPH_NOSNAP &&
    687	    strcmp(dentry->d_name.name, fsc->mount_options->snapdir_name) == 0) {
    688		struct dentry *res;
    689		struct inode *inode = ceph_get_snapdir(parent);
    690
    691		res = d_splice_alias(inode, dentry);
    692		dout("ENOENT on snapdir %p '%pd', linking to snapdir %p. Spliced dentry %p\n",
    693		     dentry, dentry, inode, res);
    694		if (res)
    695			dentry = res;
    696	}
    697	return dentry;
    698}
    699
    700/*
    701 * Figure out final result of a lookup/open request.
    702 *
    703 * Mainly, make sure we return the final req->r_dentry (if it already
    704 * existed) in place of the original VFS-provided dentry when they
    705 * differ.
    706 *
    707 * Gracefully handle the case where the MDS replies with -ENOENT and
    708 * no trace (which it may do, at its discretion, e.g., if it doesn't
    709 * care to issue a lease on the negative dentry).
    710 */
    711struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
    712				  struct dentry *dentry, int err)
    713{
    714	if (err == -ENOENT) {
    715		/* no trace? */
    716		err = 0;
    717		if (!req->r_reply_info.head->is_dentry) {
    718			dout("ENOENT and no trace, dentry %p inode %p\n",
    719			     dentry, d_inode(dentry));
    720			if (d_really_is_positive(dentry)) {
    721				d_drop(dentry);
    722				err = -ENOENT;
    723			} else {
    724				d_add(dentry, NULL);
    725			}
    726		}
    727	}
    728	if (err)
    729		dentry = ERR_PTR(err);
    730	else if (dentry != req->r_dentry)
    731		dentry = dget(req->r_dentry);   /* we got spliced */
    732	else
    733		dentry = NULL;
    734	return dentry;
    735}
    736
    737static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
    738{
    739	return ceph_ino(inode) == CEPH_INO_ROOT &&
    740		strncmp(dentry->d_name.name, ".ceph", 5) == 0;
    741}
    742
    743/*
    744 * Look up a single dir entry.  If there is a lookup intent, inform
    745 * the MDS so that it gets our 'caps wanted' value in a single op.
    746 */
    747static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
    748				  unsigned int flags)
    749{
    750	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
    751	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
    752	struct ceph_mds_request *req;
    753	int op;
    754	int mask;
    755	int err;
    756
    757	dout("lookup %p dentry %p '%pd'\n",
    758	     dir, dentry, dentry);
    759
    760	if (dentry->d_name.len > NAME_MAX)
    761		return ERR_PTR(-ENAMETOOLONG);
    762
    763	/* can we conclude ENOENT locally? */
    764	if (d_really_is_negative(dentry)) {
    765		struct ceph_inode_info *ci = ceph_inode(dir);
    766		struct ceph_dentry_info *di = ceph_dentry(dentry);
    767
    768		spin_lock(&ci->i_ceph_lock);
    769		dout(" dir %p flags are 0x%lx\n", dir, ci->i_ceph_flags);
    770		if (strncmp(dentry->d_name.name,
    771			    fsc->mount_options->snapdir_name,
    772			    dentry->d_name.len) &&
    773		    !is_root_ceph_dentry(dir, dentry) &&
    774		    ceph_test_mount_opt(fsc, DCACHE) &&
    775		    __ceph_dir_is_complete(ci) &&
    776		    __ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
    777			__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
    778			spin_unlock(&ci->i_ceph_lock);
    779			dout(" dir %p complete, -ENOENT\n", dir);
    780			d_add(dentry, NULL);
    781			di->lease_shared_gen = atomic_read(&ci->i_shared_gen);
    782			return NULL;
    783		}
    784		spin_unlock(&ci->i_ceph_lock);
    785	}
    786
    787	op = ceph_snap(dir) == CEPH_SNAPDIR ?
    788		CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
    789	req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
    790	if (IS_ERR(req))
    791		return ERR_CAST(req);
    792	req->r_dentry = dget(dentry);
    793	req->r_num_caps = 2;
    794
    795	mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
    796	if (ceph_security_xattr_wanted(dir))
    797		mask |= CEPH_CAP_XATTR_SHARED;
    798	req->r_args.getattr.mask = cpu_to_le32(mask);
    799
    800	ihold(dir);
    801	req->r_parent = dir;
    802	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
    803	err = ceph_mdsc_do_request(mdsc, NULL, req);
    804	if (err == -ENOENT) {
    805		struct dentry *res;
    806
    807		res = ceph_handle_snapdir(req, dentry);
    808		if (IS_ERR(res)) {
    809			err = PTR_ERR(res);
    810		} else {
    811			dentry = res;
    812			err = 0;
    813		}
    814	}
    815	dentry = ceph_finish_lookup(req, dentry, err);
    816	ceph_mdsc_put_request(req);  /* will dput(dentry) */
    817	dout("lookup result=%p\n", dentry);
    818	return dentry;
    819}
    820
    821/*
    822 * If we do a create but get no trace back from the MDS, follow up with
    823 * a lookup (the VFS expects us to link up the provided dentry).
    824 */
    825int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
    826{
    827	struct dentry *result = ceph_lookup(dir, dentry, 0);
    828
    829	if (result && !IS_ERR(result)) {
    830		/*
    831		 * We created the item, then did a lookup, and found
    832		 * it was already linked to another inode we already
     833		 * had in our cache (and thus got spliced). To avoid
     834		 * confusing the VFS (especially when the inode is a
     835		 * directory), we don't link our dentry to that inode;
     836		 * we return an error instead.
     837		 *
     838		 * This event should be rare; it happens only when we
     839		 * talk to an old MDS. A recent MDS does not send a
     840		 * traceless reply for a request that creates a new inode.
    841		 */
    842		d_drop(result);
    843		return -ESTALE;
    844	}
    845	return PTR_ERR(result);
    846}
    847
    848static int ceph_mknod(struct user_namespace *mnt_userns, struct inode *dir,
    849		      struct dentry *dentry, umode_t mode, dev_t rdev)
    850{
    851	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
    852	struct ceph_mds_request *req;
    853	struct ceph_acl_sec_ctx as_ctx = {};
    854	int err;
    855
    856	if (ceph_snap(dir) != CEPH_NOSNAP)
    857		return -EROFS;
    858
    859	if (ceph_quota_is_max_files_exceeded(dir)) {
    860		err = -EDQUOT;
    861		goto out;
    862	}
    863
    864	err = ceph_pre_init_acls(dir, &mode, &as_ctx);
    865	if (err < 0)
    866		goto out;
    867	err = ceph_security_init_secctx(dentry, mode, &as_ctx);
    868	if (err < 0)
    869		goto out;
    870
    871	dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n",
    872	     dir, dentry, mode, rdev);
    873	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
    874	if (IS_ERR(req)) {
    875		err = PTR_ERR(req);
    876		goto out;
    877	}
    878	req->r_dentry = dget(dentry);
    879	req->r_num_caps = 2;
    880	req->r_parent = dir;
    881	ihold(dir);
    882	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
    883	req->r_args.mknod.mode = cpu_to_le32(mode);
    884	req->r_args.mknod.rdev = cpu_to_le32(rdev);
    885	req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
    886	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
    887	if (as_ctx.pagelist) {
    888		req->r_pagelist = as_ctx.pagelist;
    889		as_ctx.pagelist = NULL;
    890	}
    891	err = ceph_mdsc_do_request(mdsc, dir, req);
    892	if (!err && !req->r_reply_info.head->is_dentry)
    893		err = ceph_handle_notrace_create(dir, dentry);
    894	ceph_mdsc_put_request(req);
    895out:
    896	if (!err)
    897		ceph_init_inode_acls(d_inode(dentry), &as_ctx);
    898	else
    899		d_drop(dentry);
    900	ceph_release_acl_sec_ctx(&as_ctx);
    901	return err;
    902}
    903
    904static int ceph_create(struct user_namespace *mnt_userns, struct inode *dir,
    905		       struct dentry *dentry, umode_t mode, bool excl)
    906{
    907	return ceph_mknod(mnt_userns, dir, dentry, mode, 0);
    908}
    909
    910static int ceph_symlink(struct user_namespace *mnt_userns, struct inode *dir,
    911			struct dentry *dentry, const char *dest)
    912{
    913	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
    914	struct ceph_mds_request *req;
    915	struct ceph_acl_sec_ctx as_ctx = {};
    916	int err;
    917
    918	if (ceph_snap(dir) != CEPH_NOSNAP)
    919		return -EROFS;
    920
    921	if (ceph_quota_is_max_files_exceeded(dir)) {
    922		err = -EDQUOT;
    923		goto out;
    924	}
    925
    926	err = ceph_security_init_secctx(dentry, S_IFLNK | 0777, &as_ctx);
    927	if (err < 0)
    928		goto out;
    929
    930	dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
    931	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
    932	if (IS_ERR(req)) {
    933		err = PTR_ERR(req);
    934		goto out;
    935	}
    936	req->r_path2 = kstrdup(dest, GFP_KERNEL);
    937	if (!req->r_path2) {
    938		err = -ENOMEM;
    939		ceph_mdsc_put_request(req);
    940		goto out;
    941	}
    942	req->r_parent = dir;
    943	ihold(dir);
    944
    945	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
    946	req->r_dentry = dget(dentry);
    947	req->r_num_caps = 2;
    948	req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
    949	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
    950	if (as_ctx.pagelist) {
    951		req->r_pagelist = as_ctx.pagelist;
    952		as_ctx.pagelist = NULL;
    953	}
    954	err = ceph_mdsc_do_request(mdsc, dir, req);
    955	if (!err && !req->r_reply_info.head->is_dentry)
    956		err = ceph_handle_notrace_create(dir, dentry);
    957	ceph_mdsc_put_request(req);
    958out:
    959	if (err)
    960		d_drop(dentry);
    961	ceph_release_acl_sec_ctx(&as_ctx);
    962	return err;
    963}
    964
    965static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir,
    966		      struct dentry *dentry, umode_t mode)
    967{
    968	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
    969	struct ceph_mds_request *req;
    970	struct ceph_acl_sec_ctx as_ctx = {};
    971	int err = -EROFS;
    972	int op;
    973
    974	if (ceph_snap(dir) == CEPH_SNAPDIR) {
    975		/* mkdir .snap/foo is a MKSNAP */
    976		op = CEPH_MDS_OP_MKSNAP;
    977		dout("mksnap dir %p snap '%pd' dn %p\n", dir,
    978		     dentry, dentry);
    979	} else if (ceph_snap(dir) == CEPH_NOSNAP) {
    980		dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode);
    981		op = CEPH_MDS_OP_MKDIR;
    982	} else {
    983		goto out;
    984	}
    985
    986	if (op == CEPH_MDS_OP_MKDIR &&
    987	    ceph_quota_is_max_files_exceeded(dir)) {
    988		err = -EDQUOT;
    989		goto out;
    990	}
    991
    992	mode |= S_IFDIR;
    993	err = ceph_pre_init_acls(dir, &mode, &as_ctx);
    994	if (err < 0)
    995		goto out;
    996	err = ceph_security_init_secctx(dentry, mode, &as_ctx);
    997	if (err < 0)
    998		goto out;
    999
   1000	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
   1001	if (IS_ERR(req)) {
   1002		err = PTR_ERR(req);
   1003		goto out;
   1004	}
   1005
   1006	req->r_dentry = dget(dentry);
   1007	req->r_num_caps = 2;
   1008	req->r_parent = dir;
   1009	ihold(dir);
   1010	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
   1011	req->r_args.mkdir.mode = cpu_to_le32(mode);
   1012	req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
   1013	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
   1014	if (as_ctx.pagelist) {
   1015		req->r_pagelist = as_ctx.pagelist;
   1016		as_ctx.pagelist = NULL;
   1017	}
   1018	err = ceph_mdsc_do_request(mdsc, dir, req);
   1019	if (!err &&
   1020	    !req->r_reply_info.head->is_target &&
   1021	    !req->r_reply_info.head->is_dentry)
   1022		err = ceph_handle_notrace_create(dir, dentry);
   1023	ceph_mdsc_put_request(req);
   1024out:
   1025	if (!err)
   1026		ceph_init_inode_acls(d_inode(dentry), &as_ctx);
   1027	else
   1028		d_drop(dentry);
   1029	ceph_release_acl_sec_ctx(&as_ctx);
   1030	return err;
   1031}
   1032
   1033static int ceph_link(struct dentry *old_dentry, struct inode *dir,
   1034		     struct dentry *dentry)
   1035{
   1036	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
   1037	struct ceph_mds_request *req;
   1038	int err;
   1039
   1040	if (ceph_snap(dir) != CEPH_NOSNAP)
   1041		return -EROFS;
   1042
   1043	dout("link in dir %p old_dentry %p dentry %p\n", dir,
   1044	     old_dentry, dentry);
   1045	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
   1046	if (IS_ERR(req)) {
   1047		d_drop(dentry);
   1048		return PTR_ERR(req);
   1049	}
   1050	req->r_dentry = dget(dentry);
   1051	req->r_num_caps = 2;
   1052	req->r_old_dentry = dget(old_dentry);
   1053	req->r_parent = dir;
   1054	ihold(dir);
   1055	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
   1056	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
   1057	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
   1058	/* release LINK_SHARED on source inode (mds will lock it) */
   1059	req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
   1060	err = ceph_mdsc_do_request(mdsc, dir, req);
   1061	if (err) {
   1062		d_drop(dentry);
   1063	} else if (!req->r_reply_info.head->is_dentry) {
   1064		ihold(d_inode(old_dentry));
   1065		d_instantiate(dentry, d_inode(old_dentry));
   1066	}
   1067	ceph_mdsc_put_request(req);
   1068	return err;
   1069}
   1070
   1071static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
   1072				 struct ceph_mds_request *req)
   1073{
   1074	int result = req->r_err ? req->r_err :
   1075			le32_to_cpu(req->r_reply_info.head->result);
   1076
   1077	if (result == -EJUKEBOX)
   1078		goto out;
   1079
   1080	/* If op failed, mark everyone involved for errors */
   1081	if (result) {
   1082		int pathlen = 0;
   1083		u64 base = 0;
   1084		char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
   1085						  &base, 0);
   1086
   1087		/* mark error on parent + clear complete */
   1088		mapping_set_error(req->r_parent->i_mapping, result);
   1089		ceph_dir_clear_complete(req->r_parent);
   1090
   1091		/* drop the dentry -- we don't know its status */
   1092		if (!d_unhashed(req->r_dentry))
   1093			d_drop(req->r_dentry);
   1094
   1095		/* mark inode itself for an error (since metadata is bogus) */
   1096		mapping_set_error(req->r_old_inode->i_mapping, result);
   1097
   1098		pr_warn("ceph: async unlink failure path=(%llx)%s result=%d!\n",
   1099			base, IS_ERR(path) ? "<<bad>>" : path, result);
   1100		ceph_mdsc_free_path(path, pathlen);
   1101	}
   1102out:
   1103	iput(req->r_old_inode);
   1104	ceph_mdsc_release_dir_caps(req);
   1105}
   1106
   1107static int get_caps_for_async_unlink(struct inode *dir, struct dentry *dentry)
   1108{
   1109	struct ceph_inode_info *ci = ceph_inode(dir);
   1110	struct ceph_dentry_info *di;
   1111	int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_UNLINK;
   1112
   1113	spin_lock(&ci->i_ceph_lock);
   1114	if ((__ceph_caps_issued(ci, NULL) & want) == want) {
   1115		ceph_take_cap_refs(ci, want, false);
   1116		got = want;
   1117	}
   1118	spin_unlock(&ci->i_ceph_lock);
   1119
   1120	/* If we didn't get anything, return 0 */
   1121	if (!got)
   1122		return 0;
   1123
   1124        spin_lock(&dentry->d_lock);
   1125        di = ceph_dentry(dentry);
   1126	/*
   1127	 * - We are holding Fx, which implies Fs caps.
   1128	 * - Only support async unlink for primary linkage
   1129	 */
   1130	if (atomic_read(&ci->i_shared_gen) != di->lease_shared_gen ||
   1131	    !(di->flags & CEPH_DENTRY_PRIMARY_LINK))
   1132		want = 0;
   1133        spin_unlock(&dentry->d_lock);
   1134
   1135	/* Do we still want what we've got? */
   1136	if (want == got)
   1137		return got;
   1138
   1139	ceph_put_cap_refs(ci, got);
   1140	return 0;
   1141}
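
        /*
         * Usage note: the caps returned by get_caps_for_async_unlink() are
         * stored in req->r_dir_caps by ceph_unlink() when it attempts an
         * async unlink, and are released again via
         * ceph_mdsc_release_dir_caps() in ceph_async_unlink_cb().
         */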
   1142
   1143/*
    1144 * rmdir and unlink differ only in the metadata op code
   1145 */
   1146static int ceph_unlink(struct inode *dir, struct dentry *dentry)
   1147{
   1148	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
   1149	struct ceph_mds_client *mdsc = fsc->mdsc;
   1150	struct inode *inode = d_inode(dentry);
   1151	struct ceph_mds_request *req;
   1152	bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS);
   1153	int err = -EROFS;
   1154	int op;
   1155
   1156	if (ceph_snap(dir) == CEPH_SNAPDIR) {
   1157		/* rmdir .snap/foo is RMSNAP */
   1158		dout("rmsnap dir %p '%pd' dn %p\n", dir, dentry, dentry);
   1159		op = CEPH_MDS_OP_RMSNAP;
   1160	} else if (ceph_snap(dir) == CEPH_NOSNAP) {
   1161		dout("unlink/rmdir dir %p dn %p inode %p\n",
   1162		     dir, dentry, inode);
   1163		op = d_is_dir(dentry) ?
   1164			CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
   1165	} else
   1166		goto out;
   1167retry:
   1168	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
   1169	if (IS_ERR(req)) {
   1170		err = PTR_ERR(req);
   1171		goto out;
   1172	}
   1173	req->r_dentry = dget(dentry);
   1174	req->r_num_caps = 2;
   1175	req->r_parent = dir;
   1176	ihold(dir);
   1177	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
   1178	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
   1179	req->r_inode_drop = ceph_drop_caps_for_unlink(inode);
   1180
   1181	if (try_async && op == CEPH_MDS_OP_UNLINK &&
   1182	    (req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) {
    1183		dout("async unlink on %llu/%.*s caps=%s\n", ceph_ino(dir),
   1184		     dentry->d_name.len, dentry->d_name.name,
   1185		     ceph_cap_string(req->r_dir_caps));
   1186		set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
   1187		req->r_callback = ceph_async_unlink_cb;
   1188		req->r_old_inode = d_inode(dentry);
   1189		ihold(req->r_old_inode);
   1190		err = ceph_mdsc_submit_request(mdsc, dir, req);
   1191		if (!err) {
   1192			/*
   1193			 * We have enough caps, so we assume that the unlink
   1194			 * will succeed. Fix up the target inode and dcache.
   1195			 */
   1196			drop_nlink(inode);
   1197			d_delete(dentry);
   1198		} else if (err == -EJUKEBOX) {
   1199			try_async = false;
   1200			ceph_mdsc_put_request(req);
   1201			goto retry;
   1202		}
   1203	} else {
   1204		set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
   1205		err = ceph_mdsc_do_request(mdsc, dir, req);
   1206		if (!err && !req->r_reply_info.head->is_dentry)
   1207			d_delete(dentry);
   1208	}
   1209
   1210	ceph_mdsc_put_request(req);
   1211out:
   1212	return err;
   1213}
   1214
   1215static int ceph_rename(struct user_namespace *mnt_userns, struct inode *old_dir,
   1216		       struct dentry *old_dentry, struct inode *new_dir,
   1217		       struct dentry *new_dentry, unsigned int flags)
   1218{
   1219	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(old_dir->i_sb);
   1220	struct ceph_mds_request *req;
   1221	int op = CEPH_MDS_OP_RENAME;
   1222	int err;
   1223
   1224	if (flags)
   1225		return -EINVAL;
   1226
   1227	if (ceph_snap(old_dir) != ceph_snap(new_dir))
   1228		return -EXDEV;
   1229	if (ceph_snap(old_dir) != CEPH_NOSNAP) {
   1230		if (old_dir == new_dir && ceph_snap(old_dir) == CEPH_SNAPDIR)
   1231			op = CEPH_MDS_OP_RENAMESNAP;
   1232		else
   1233			return -EROFS;
   1234	}
   1235	/* don't allow cross-quota renames */
   1236	if ((old_dir != new_dir) &&
   1237	    (!ceph_quota_is_same_realm(old_dir, new_dir)))
   1238		return -EXDEV;
   1239
   1240	dout("rename dir %p dentry %p to dir %p dentry %p\n",
   1241	     old_dir, old_dentry, new_dir, new_dentry);
   1242	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
   1243	if (IS_ERR(req))
   1244		return PTR_ERR(req);
   1245	ihold(old_dir);
   1246	req->r_dentry = dget(new_dentry);
   1247	req->r_num_caps = 2;
   1248	req->r_old_dentry = dget(old_dentry);
   1249	req->r_old_dentry_dir = old_dir;
   1250	req->r_parent = new_dir;
   1251	ihold(new_dir);
   1252	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
   1253	req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
   1254	req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
   1255	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
   1256	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
   1257	/* release LINK_RDCACHE on source inode (mds will lock it) */
   1258	req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
   1259	if (d_really_is_positive(new_dentry)) {
   1260		req->r_inode_drop =
   1261			ceph_drop_caps_for_unlink(d_inode(new_dentry));
   1262	}
   1263	err = ceph_mdsc_do_request(mdsc, old_dir, req);
   1264	if (!err && !req->r_reply_info.head->is_dentry) {
   1265		/*
   1266		 * Normally d_move() is done by fill_trace (called by
   1267		 * do_request, above).  If there is no trace, we need
   1268		 * to do it here.
   1269		 */
   1270		d_move(old_dentry, new_dentry);
   1271	}
   1272	ceph_mdsc_put_request(req);
   1273	return err;
   1274}
   1275
   1276/*
   1277 * Move dentry to tail of mdsc->dentry_leases list when lease is updated.
   1278 * Leases at front of the list will expire first. (Assume all leases have
   1279 * similar duration)
   1280 *
   1281 * Called under dentry->d_lock.
   1282 */
   1283void __ceph_dentry_lease_touch(struct ceph_dentry_info *di)
   1284{
   1285	struct dentry *dn = di->dentry;
   1286	struct ceph_mds_client *mdsc;
   1287
   1288	dout("dentry_lease_touch %p %p '%pd'\n", di, dn, dn);
   1289
   1290	di->flags |= CEPH_DENTRY_LEASE_LIST;
   1291	if (di->flags & CEPH_DENTRY_SHRINK_LIST) {
   1292		di->flags |= CEPH_DENTRY_REFERENCED;
   1293		return;
   1294	}
   1295
   1296	mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
   1297	spin_lock(&mdsc->dentry_list_lock);
   1298	list_move_tail(&di->lease_list, &mdsc->dentry_leases);
   1299	spin_unlock(&mdsc->dentry_list_lock);
   1300}
   1301
   1302static void __dentry_dir_lease_touch(struct ceph_mds_client* mdsc,
   1303				     struct ceph_dentry_info *di)
   1304{
   1305	di->flags &= ~(CEPH_DENTRY_LEASE_LIST | CEPH_DENTRY_REFERENCED);
   1306	di->lease_gen = 0;
   1307	di->time = jiffies;
   1308	list_move_tail(&di->lease_list, &mdsc->dentry_dir_leases);
   1309}
   1310
   1311/*
   1312 * When dir lease is used, add dentry to tail of mdsc->dentry_dir_leases
   1313 * list if it's not in the list, otherwise set 'referenced' flag.
   1314 *
   1315 * Called under dentry->d_lock.
   1316 */
   1317void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di)
   1318{
   1319	struct dentry *dn = di->dentry;
   1320	struct ceph_mds_client *mdsc;
   1321
   1322	dout("dentry_dir_lease_touch %p %p '%pd' (offset 0x%llx)\n",
   1323	     di, dn, dn, di->offset);
   1324
   1325	if (!list_empty(&di->lease_list)) {
   1326		if (di->flags & CEPH_DENTRY_LEASE_LIST) {
   1327			/* don't remove dentry from dentry lease list
   1328			 * if its lease is valid */
   1329			if (__dentry_lease_is_valid(di))
   1330				return;
   1331		} else {
   1332			di->flags |= CEPH_DENTRY_REFERENCED;
   1333			return;
   1334		}
   1335	}
   1336
   1337	if (di->flags & CEPH_DENTRY_SHRINK_LIST) {
   1338		di->flags |= CEPH_DENTRY_REFERENCED;
   1339		di->flags &= ~CEPH_DENTRY_LEASE_LIST;
   1340		return;
   1341	}
   1342
   1343	mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
   1344	spin_lock(&mdsc->dentry_list_lock);
    1345	__dentry_dir_lease_touch(mdsc, di);
   1346	spin_unlock(&mdsc->dentry_list_lock);
   1347}
   1348
   1349static void __dentry_lease_unlist(struct ceph_dentry_info *di)
   1350{
   1351	struct ceph_mds_client *mdsc;
   1352	if (di->flags & CEPH_DENTRY_SHRINK_LIST)
   1353		return;
   1354	if (list_empty(&di->lease_list))
   1355		return;
   1356
   1357	mdsc = ceph_sb_to_client(di->dentry->d_sb)->mdsc;
   1358	spin_lock(&mdsc->dentry_list_lock);
   1359	list_del_init(&di->lease_list);
   1360	spin_unlock(&mdsc->dentry_list_lock);
   1361}
   1362
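        /*
         * Return codes for the lease-walk check callbacks below.
         * __dentry_leases_walk() treats them as bit flags (it tests
         * ret & TOUCH, ret & DELETE and ret & STOP), so a callback could in
         * principle combine them, although the callbacks in this file each
         * return a single value.
         */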
   1363enum {
   1364	KEEP	= 0,
   1365	DELETE	= 1,
   1366	TOUCH	= 2,
   1367	STOP	= 4,
   1368};
   1369
   1370struct ceph_lease_walk_control {
   1371	bool dir_lease;
   1372	bool expire_dir_lease;
   1373	unsigned long nr_to_scan;
   1374	unsigned long dir_lease_ttl;
   1375};
   1376
   1377static unsigned long
   1378__dentry_leases_walk(struct ceph_mds_client *mdsc,
   1379		     struct ceph_lease_walk_control *lwc,
   1380		     int (*check)(struct dentry*, void*))
   1381{
   1382	struct ceph_dentry_info *di, *tmp;
   1383	struct dentry *dentry, *last = NULL;
   1384	struct list_head* list;
   1385        LIST_HEAD(dispose);
   1386	unsigned long freed = 0;
   1387	int ret = 0;
   1388
   1389	list = lwc->dir_lease ? &mdsc->dentry_dir_leases : &mdsc->dentry_leases;
   1390	spin_lock(&mdsc->dentry_list_lock);
   1391	list_for_each_entry_safe(di, tmp, list, lease_list) {
   1392		if (!lwc->nr_to_scan)
   1393			break;
   1394		--lwc->nr_to_scan;
   1395
   1396		dentry = di->dentry;
   1397		if (last == dentry)
   1398			break;
   1399
   1400		if (!spin_trylock(&dentry->d_lock))
   1401			continue;
   1402
   1403		if (__lockref_is_dead(&dentry->d_lockref)) {
   1404			list_del_init(&di->lease_list);
   1405			goto next;
   1406		}
   1407
   1408		ret = check(dentry, lwc);
   1409		if (ret & TOUCH) {
   1410			/* move it into tail of dir lease list */
   1411			__dentry_dir_lease_touch(mdsc, di);
   1412			if (!last)
   1413				last = dentry;
   1414		}
   1415		if (ret & DELETE) {
   1416			/* stale lease */
   1417			di->flags &= ~CEPH_DENTRY_REFERENCED;
   1418			if (dentry->d_lockref.count > 0) {
   1419				/* update_dentry_lease() will re-add
   1420				 * it to lease list, or
   1421				 * ceph_d_delete() will return 1 when
   1422				 * last reference is dropped */
   1423				list_del_init(&di->lease_list);
   1424			} else {
   1425				di->flags |= CEPH_DENTRY_SHRINK_LIST;
   1426				list_move_tail(&di->lease_list, &dispose);
   1427				dget_dlock(dentry);
   1428			}
   1429		}
   1430next:
   1431		spin_unlock(&dentry->d_lock);
   1432		if (ret & STOP)
   1433			break;
   1434	}
   1435	spin_unlock(&mdsc->dentry_list_lock);
   1436
   1437	while (!list_empty(&dispose)) {
   1438		di = list_first_entry(&dispose, struct ceph_dentry_info,
   1439				      lease_list);
   1440		dentry = di->dentry;
   1441		spin_lock(&dentry->d_lock);
   1442
   1443		list_del_init(&di->lease_list);
   1444		di->flags &= ~CEPH_DENTRY_SHRINK_LIST;
   1445		if (di->flags & CEPH_DENTRY_REFERENCED) {
   1446			spin_lock(&mdsc->dentry_list_lock);
   1447			if (di->flags & CEPH_DENTRY_LEASE_LIST) {
   1448				list_add_tail(&di->lease_list,
   1449					      &mdsc->dentry_leases);
   1450			} else {
   1451				__dentry_dir_lease_touch(mdsc, di);
   1452			}
   1453			spin_unlock(&mdsc->dentry_list_lock);
   1454		} else {
   1455			freed++;
   1456		}
   1457
   1458		spin_unlock(&dentry->d_lock);
   1459		/* ceph_d_delete() does the trick */
   1460		dput(dentry);
   1461	}
   1462	return freed;
   1463}
   1464
   1465static int __dentry_lease_check(struct dentry *dentry, void *arg)
   1466{
   1467	struct ceph_dentry_info *di = ceph_dentry(dentry);
   1468	int ret;
   1469
   1470	if (__dentry_lease_is_valid(di))
   1471		return STOP;
   1472	ret = __dir_lease_try_check(dentry);
   1473	if (ret == -EBUSY)
   1474		return KEEP;
   1475	if (ret > 0)
   1476		return TOUCH;
   1477	return DELETE;
   1478}
   1479
   1480static int __dir_lease_check(struct dentry *dentry, void *arg)
   1481{
   1482	struct ceph_lease_walk_control *lwc = arg;
   1483	struct ceph_dentry_info *di = ceph_dentry(dentry);
   1484
   1485	int ret = __dir_lease_try_check(dentry);
   1486	if (ret == -EBUSY)
   1487		return KEEP;
   1488	if (ret > 0) {
   1489		if (time_before(jiffies, di->time + lwc->dir_lease_ttl))
   1490			return STOP;
    1491		/* Move the dentry to the tail of the dir lease list if we
    1492		 * don't want to delete it, so dentries in the list are
    1493		 * checked in a round-robin manner */
   1494		if (!lwc->expire_dir_lease)
   1495			return TOUCH;
   1496		if (dentry->d_lockref.count > 0 ||
   1497		    (di->flags & CEPH_DENTRY_REFERENCED))
   1498			return TOUCH;
   1499		/* invalidate dir lease */
   1500		di->lease_shared_gen = 0;
   1501	}
   1502	return DELETE;
   1503}
   1504
   1505int ceph_trim_dentries(struct ceph_mds_client *mdsc)
   1506{
   1507	struct ceph_lease_walk_control lwc;
   1508	unsigned long count;
   1509	unsigned long freed;
   1510
   1511	spin_lock(&mdsc->caps_list_lock);
   1512        if (mdsc->caps_use_max > 0 &&
   1513            mdsc->caps_use_count > mdsc->caps_use_max)
   1514		count = mdsc->caps_use_count - mdsc->caps_use_max;
   1515	else
   1516		count = 0;
   1517        spin_unlock(&mdsc->caps_list_lock);
   1518
   1519	lwc.dir_lease = false;
   1520	lwc.nr_to_scan  = CEPH_CAPS_PER_RELEASE * 2;
   1521	freed = __dentry_leases_walk(mdsc, &lwc, __dentry_lease_check);
   1522	if (!lwc.nr_to_scan) /* more invalid leases */
   1523		return -EAGAIN;
   1524
   1525	if (lwc.nr_to_scan < CEPH_CAPS_PER_RELEASE)
   1526		lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE;
   1527
   1528	lwc.dir_lease = true;
   1529	lwc.expire_dir_lease = freed < count;
   1530	lwc.dir_lease_ttl = mdsc->fsc->mount_options->caps_wanted_delay_max * HZ;
    1531	freed += __dentry_leases_walk(mdsc, &lwc, __dir_lease_check);
   1532	if (!lwc.nr_to_scan) /* more to check */
   1533		return -EAGAIN;
   1534
   1535	return freed > 0 ? 1 : 0;
   1536}
   1537
   1538/*
   1539 * Ensure a dentry lease will no longer revalidate.
   1540 */
   1541void ceph_invalidate_dentry_lease(struct dentry *dentry)
   1542{
   1543	struct ceph_dentry_info *di = ceph_dentry(dentry);
   1544	spin_lock(&dentry->d_lock);
   1545	di->time = jiffies;
   1546	di->lease_shared_gen = 0;
   1547	di->flags &= ~CEPH_DENTRY_PRIMARY_LINK;
   1548	__dentry_lease_unlist(di);
   1549	spin_unlock(&dentry->d_lock);
   1550}
   1551
   1552/*
    1553 * Check if the dentry lease is valid.  If not, delete the lease.  Try to
    1554 * renew it if the lease is more than half up.
   1555 */
   1556static bool __dentry_lease_is_valid(struct ceph_dentry_info *di)
   1557{
   1558	struct ceph_mds_session *session;
   1559
   1560	if (!di->lease_gen)
   1561		return false;
   1562
   1563	session = di->lease_session;
   1564	if (session) {
   1565		u32 gen;
   1566		unsigned long ttl;
   1567
   1568		gen = atomic_read(&session->s_cap_gen);
   1569		ttl = session->s_cap_ttl;
   1570
   1571		if (di->lease_gen == gen &&
   1572		    time_before(jiffies, ttl) &&
   1573		    time_before(jiffies, di->time))
   1574			return true;
   1575	}
   1576	di->lease_gen = 0;
   1577	return false;
   1578}
   1579
   1580static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags)
   1581{
   1582	struct ceph_dentry_info *di;
   1583	struct ceph_mds_session *session = NULL;
   1584	u32 seq = 0;
   1585	int valid = 0;
   1586
   1587	spin_lock(&dentry->d_lock);
   1588	di = ceph_dentry(dentry);
   1589	if (di && __dentry_lease_is_valid(di)) {
   1590		valid = 1;
   1591
   1592		if (di->lease_renew_after &&
   1593		    time_after(jiffies, di->lease_renew_after)) {
   1594			/*
   1595			 * We should renew. If we're in RCU walk mode
   1596			 * though, we can't do that so just return
   1597			 * -ECHILD.
   1598			 */
   1599			if (flags & LOOKUP_RCU) {
   1600				valid = -ECHILD;
   1601			} else {
   1602				session = ceph_get_mds_session(di->lease_session);
   1603				seq = di->lease_seq;
   1604				di->lease_renew_after = 0;
   1605				di->lease_renew_from = jiffies;
   1606			}
   1607		}
   1608	}
   1609	spin_unlock(&dentry->d_lock);
   1610
   1611	if (session) {
   1612		ceph_mdsc_lease_send_msg(session, dentry,
   1613					 CEPH_MDS_LEASE_RENEW, seq);
   1614		ceph_put_mds_session(session);
   1615	}
   1616	dout("dentry_lease_is_valid - dentry %p = %d\n", dentry, valid);
   1617	return valid;
   1618}
   1619
   1620/*
   1621 * Called under dentry->d_lock.
   1622 */
   1623static int __dir_lease_try_check(const struct dentry *dentry)
   1624{
   1625	struct ceph_dentry_info *di = ceph_dentry(dentry);
   1626	struct inode *dir;
   1627	struct ceph_inode_info *ci;
   1628	int valid = 0;
   1629
   1630	if (!di->lease_shared_gen)
   1631		return 0;
   1632	if (IS_ROOT(dentry))
   1633		return 0;
   1634
   1635	dir = d_inode(dentry->d_parent);
   1636	ci = ceph_inode(dir);
   1637
   1638	if (spin_trylock(&ci->i_ceph_lock)) {
   1639		if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen &&
   1640		    __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 0))
   1641			valid = 1;
   1642		spin_unlock(&ci->i_ceph_lock);
   1643	} else {
   1644		valid = -EBUSY;
   1645	}
   1646
   1647	if (!valid)
   1648		di->lease_shared_gen = 0;
   1649	return valid;
   1650}
   1651
   1652/*
   1653 * Check if directory-wide content lease/cap is valid.
   1654 */
   1655static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry,
   1656			      struct ceph_mds_client *mdsc)
   1657{
   1658	struct ceph_inode_info *ci = ceph_inode(dir);
   1659	int valid;
   1660	int shared_gen;
   1661
   1662	spin_lock(&ci->i_ceph_lock);
   1663	valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
   1664	if (valid) {
   1665		__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
   1666		shared_gen = atomic_read(&ci->i_shared_gen);
   1667	}
   1668	spin_unlock(&ci->i_ceph_lock);
   1669	if (valid) {
   1670		struct ceph_dentry_info *di;
   1671		spin_lock(&dentry->d_lock);
   1672		di = ceph_dentry(dentry);
   1673		if (dir == d_inode(dentry->d_parent) &&
   1674		    di && di->lease_shared_gen == shared_gen)
   1675			__ceph_dentry_dir_lease_touch(di);
   1676		else
   1677			valid = 0;
   1678		spin_unlock(&dentry->d_lock);
   1679	}
   1680	dout("dir_lease_is_valid dir %p v%u dentry %p = %d\n",
   1681	     dir, (unsigned)atomic_read(&ci->i_shared_gen), dentry, valid);
   1682	return valid;
   1683}
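
/*
 * Sketch of the directory-wide lease idea shared by the two helpers
 * above, with invented names: a dentry's cached state can be trusted as
 * long as the parent directory still holds CEPH_CAP_FILE_SHARED and has
 * not bumped its shared generation since the dentry recorded it.
 */
static int dir_lease_sketch_valid(int dir_has_shared_cap,
				  unsigned int dir_shared_gen,
				  unsigned int dentry_recorded_gen)
{
	return dir_has_shared_cap && dentry_recorded_gen == dir_shared_gen;
}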
   1684
   1685/*
   1686 * Check if cached dentry can be trusted.
   1687 */
   1688static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
   1689{
   1690	int valid = 0;
   1691	struct dentry *parent;
   1692	struct inode *dir, *inode;
   1693	struct ceph_mds_client *mdsc;
   1694
   1695	if (flags & LOOKUP_RCU) {
   1696		parent = READ_ONCE(dentry->d_parent);
   1697		dir = d_inode_rcu(parent);
   1698		if (!dir)
   1699			return -ECHILD;
   1700		inode = d_inode_rcu(dentry);
   1701	} else {
   1702		parent = dget_parent(dentry);
   1703		dir = d_inode(parent);
   1704		inode = d_inode(dentry);
   1705	}
   1706
   1707	dout("d_revalidate %p '%pd' inode %p offset 0x%llx\n", dentry,
   1708	     dentry, inode, ceph_dentry(dentry)->offset);
   1709
   1710	mdsc = ceph_sb_to_client(dir->i_sb)->mdsc;
   1711
   1712	/* always trust cached snapped dentries, snapdir dentry */
   1713	if (ceph_snap(dir) != CEPH_NOSNAP) {
   1714		dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
   1715		     dentry, inode);
   1716		valid = 1;
   1717	} else if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
   1718		valid = 1;
   1719	} else {
   1720		valid = dentry_lease_is_valid(dentry, flags);
   1721		if (valid == -ECHILD)
   1722			return valid;
   1723		if (valid || dir_lease_is_valid(dir, dentry, mdsc)) {
   1724			if (inode)
   1725				valid = ceph_is_any_caps(inode);
   1726			else
   1727				valid = 1;
   1728		}
   1729	}
   1730
   1731	if (!valid) {
   1732		struct ceph_mds_request *req;
   1733		int op, err;
   1734		u32 mask;
   1735
   1736		if (flags & LOOKUP_RCU)
   1737			return -ECHILD;
   1738
   1739		percpu_counter_inc(&mdsc->metric.d_lease_mis);
   1740
   1741		op = ceph_snap(dir) == CEPH_SNAPDIR ?
   1742			CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
   1743		req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
   1744		if (!IS_ERR(req)) {
   1745			req->r_dentry = dget(dentry);
   1746			req->r_num_caps = 2;
   1747			req->r_parent = dir;
   1748			ihold(dir);
   1749
   1750			mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
   1751			if (ceph_security_xattr_wanted(dir))
   1752				mask |= CEPH_CAP_XATTR_SHARED;
   1753			req->r_args.getattr.mask = cpu_to_le32(mask);
   1754
   1755			err = ceph_mdsc_do_request(mdsc, NULL, req);
   1756			switch (err) {
   1757			case 0:
   1758				if (d_really_is_positive(dentry) &&
   1759				    d_inode(dentry) == req->r_target_inode)
   1760					valid = 1;
   1761				break;
   1762			case -ENOENT:
   1763				if (d_really_is_negative(dentry))
   1764					valid = 1;
   1765				fallthrough;
   1766			default:
   1767				break;
   1768			}
   1769			ceph_mdsc_put_request(req);
   1770			dout("d_revalidate %p lookup result=%d\n",
   1771			     dentry, err);
   1772		}
   1773	} else {
   1774		percpu_counter_inc(&mdsc->metric.d_lease_hit);
   1775	}
   1776
   1777	dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
   1778	if (!valid)
   1779		ceph_dir_clear_complete(dir);
   1780
   1781	if (!(flags & LOOKUP_RCU))
   1782		dput(parent);
   1783	return valid;
   1784}
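
/*
 * Condensed sketch of the trust decision above, under invented names.
 * The real function also honours the RCU-walk restrictions, updates the
 * lease metrics and builds the MDS request; only the ordering of the
 * checks is modeled here.
 */
static int revalidate_sketch(int parent_is_snapped, int inode_is_snapdir,
			     int dentry_lease_ok, int dir_lease_ok,
			     int has_inode, int inode_has_caps,
			     int mds_confirms_entry)
{
	/* snapshot contents never change, so cached entries stay valid */
	if (parent_is_snapped || inode_is_snapdir)
		return 1;
	/* a live dentry or directory lease avoids a round trip to the MDS */
	if (dentry_lease_ok || dir_lease_ok)
		return has_inode ? inode_has_caps : 1;
	/* last resort: a LOOKUP to the MDS must match what we have cached */
	return mds_confirms_entry;
}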
   1785
   1786/*
    1787 * Delete unused dentry that doesn't have a valid lease
   1788 *
   1789 * Called under dentry->d_lock.
   1790 */
   1791static int ceph_d_delete(const struct dentry *dentry)
   1792{
   1793	struct ceph_dentry_info *di;
   1794
   1795	/* won't release caps */
   1796	if (d_really_is_negative(dentry))
   1797		return 0;
   1798	if (ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
   1799		return 0;
    1800	/* valid lease? */
   1801	di = ceph_dentry(dentry);
   1802	if (di) {
   1803		if (__dentry_lease_is_valid(di))
   1804			return 0;
   1805		if (__dir_lease_try_check(dentry))
   1806			return 0;
   1807	}
   1808	return 1;
   1809}
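
/*
 * The return convention above is the VFS one for ->d_delete(): return 1
 * to have the now-unused dentry discarded, 0 to keep it cached.  A
 * condensed sketch of the policy, with invented parameter names:
 */
static int d_delete_sketch(int is_negative, int is_snapped,
			   int dentry_lease_ok, int dir_lease_ok)
{
	/* negative and snapshot dentries are always kept */
	if (is_negative || is_snapped)
		return 0;
	/* otherwise keep the dentry only while some lease still backs it */
	if (dentry_lease_ok || dir_lease_ok)
		return 0;
	return 1;	/* drop it */
}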
   1810
   1811/*
   1812 * Release our ceph_dentry_info.
   1813 */
   1814static void ceph_d_release(struct dentry *dentry)
   1815{
   1816	struct ceph_dentry_info *di = ceph_dentry(dentry);
   1817	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
   1818
   1819	dout("d_release %p\n", dentry);
   1820
   1821	atomic64_dec(&fsc->mdsc->metric.total_dentries);
   1822
   1823	spin_lock(&dentry->d_lock);
   1824	__dentry_lease_unlist(di);
   1825	dentry->d_fsdata = NULL;
   1826	spin_unlock(&dentry->d_lock);
   1827
   1828	ceph_put_mds_session(di->lease_session);
   1829	kmem_cache_free(ceph_dentry_cachep, di);
   1830}
   1831
   1832/*
   1833 * When the VFS prunes a dentry from the cache, we need to clear the
   1834 * complete flag on the parent directory.
   1835 *
   1836 * Called under dentry->d_lock.
   1837 */
   1838static void ceph_d_prune(struct dentry *dentry)
   1839{
   1840	struct ceph_inode_info *dir_ci;
   1841	struct ceph_dentry_info *di;
   1842
   1843	dout("ceph_d_prune %pd %p\n", dentry, dentry);
   1844
   1845	/* do we have a valid parent? */
   1846	if (IS_ROOT(dentry))
   1847		return;
   1848
   1849	/* we hold d_lock, so d_parent is stable */
   1850	dir_ci = ceph_inode(d_inode(dentry->d_parent));
   1851	if (dir_ci->i_vino.snap == CEPH_SNAPDIR)
   1852		return;
   1853
    1854	/* whoever calls d_delete() should also disable dcache readdir */
   1855	if (d_really_is_negative(dentry))
   1856		return;
   1857
   1858	/* d_fsdata does not get cleared until d_release */
   1859	if (!d_unhashed(dentry)) {
   1860		__ceph_dir_clear_complete(dir_ci);
   1861		return;
   1862	}
   1863
    1864	/* Disable dcache readdir just in case someone called d_drop()
    1865	 * or d_invalidate() but the MDS didn't revoke CEPH_CAP_FILE_SHARED
    1866	 * properly (dcache readdir is still enabled) */
   1867	di = ceph_dentry(dentry);
   1868	if (di->offset > 0 &&
   1869	    di->lease_shared_gen == atomic_read(&dir_ci->i_shared_gen))
   1870		__ceph_dir_clear_ordered(dir_ci);
   1871}
   1872
   1873/*
   1874 * read() on a dir.  This weird interface hack only works if mounted
   1875 * with '-o dirstat'.
   1876 */
   1877static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
   1878			     loff_t *ppos)
   1879{
   1880	struct ceph_dir_file_info *dfi = file->private_data;
   1881	struct inode *inode = file_inode(file);
   1882	struct ceph_inode_info *ci = ceph_inode(inode);
   1883	int left;
   1884	const int bufsize = 1024;
   1885
   1886	if (!ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT))
   1887		return -EISDIR;
   1888
   1889	if (!dfi->dir_info) {
   1890		dfi->dir_info = kmalloc(bufsize, GFP_KERNEL);
   1891		if (!dfi->dir_info)
   1892			return -ENOMEM;
   1893		dfi->dir_info_len =
   1894			snprintf(dfi->dir_info, bufsize,
   1895				"entries:   %20lld\n"
   1896				" files:    %20lld\n"
   1897				" subdirs:  %20lld\n"
   1898				"rentries:  %20lld\n"
   1899				" rfiles:   %20lld\n"
   1900				" rsubdirs: %20lld\n"
   1901				"rbytes:    %20lld\n"
   1902				"rctime:    %10lld.%09ld\n",
   1903				ci->i_files + ci->i_subdirs,
   1904				ci->i_files,
   1905				ci->i_subdirs,
   1906				ci->i_rfiles + ci->i_rsubdirs,
   1907				ci->i_rfiles,
   1908				ci->i_rsubdirs,
   1909				ci->i_rbytes,
   1910				ci->i_rctime.tv_sec,
   1911				ci->i_rctime.tv_nsec);
   1912	}
   1913
   1914	if (*ppos >= dfi->dir_info_len)
   1915		return 0;
   1916	size = min_t(unsigned, size, dfi->dir_info_len-*ppos);
   1917	left = copy_to_user(buf, dfi->dir_info + *ppos, size);
   1918	if (left == size)
   1919		return -EFAULT;
   1920	*ppos += (size - left);
   1921	return size - left;
   1922}
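
/*
 * User-space sketch of the dirstat hack described above: on a CephFS
 * mount with '-o dirstat', read(2) on a directory returns the text
 * built by ceph_read_dir(); without the mount option (or on another
 * filesystem) the read fails with EISDIR.  The path is a placeholder.
 */
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	char buf[1024];
	ssize_t n;
	int fd = open("/mnt/cephfs/some-dir", O_RDONLY | O_DIRECTORY);

	if (fd < 0) {
		perror("open");
		return 1;
	}
	n = read(fd, buf, sizeof(buf) - 1);	/* ends up in ceph_read_dir() */
	if (n < 0) {
		perror("read");			/* EISDIR without -o dirstat */
		close(fd);
		return 1;
	}
	fwrite(buf, 1, (size_t)n, stdout);
	close(fd);
	return 0;
}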
   1923
   1924
   1925
   1926/*
   1927 * Return name hash for a given dentry.  This is dependent on
   1928 * the parent directory's hash function.
   1929 */
   1930unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn)
   1931{
   1932	struct ceph_inode_info *dci = ceph_inode(dir);
   1933	unsigned hash;
   1934
   1935	switch (dci->i_dir_layout.dl_dir_hash) {
   1936	case 0:	/* for backward compat */
   1937	case CEPH_STR_HASH_LINUX:
   1938		return dn->d_name.hash;
   1939
   1940	default:
   1941		spin_lock(&dn->d_lock);
   1942		hash = ceph_str_hash(dci->i_dir_layout.dl_dir_hash,
   1943				     dn->d_name.name, dn->d_name.len);
   1944		spin_unlock(&dn->d_lock);
   1945		return hash;
   1946	}
   1947}
   1948
   1949const struct file_operations ceph_dir_fops = {
   1950	.read = ceph_read_dir,
   1951	.iterate = ceph_readdir,
   1952	.llseek = ceph_dir_llseek,
   1953	.open = ceph_open,
   1954	.release = ceph_release,
   1955	.unlocked_ioctl = ceph_ioctl,
   1956	.compat_ioctl = compat_ptr_ioctl,
   1957	.fsync = ceph_fsync,
   1958	.lock = ceph_lock,
   1959	.flock = ceph_flock,
   1960};
   1961
   1962const struct file_operations ceph_snapdir_fops = {
   1963	.iterate = ceph_readdir,
   1964	.llseek = ceph_dir_llseek,
   1965	.open = ceph_open,
   1966	.release = ceph_release,
   1967};
   1968
   1969const struct inode_operations ceph_dir_iops = {
   1970	.lookup = ceph_lookup,
   1971	.permission = ceph_permission,
   1972	.getattr = ceph_getattr,
   1973	.setattr = ceph_setattr,
   1974	.listxattr = ceph_listxattr,
   1975	.get_acl = ceph_get_acl,
   1976	.set_acl = ceph_set_acl,
   1977	.mknod = ceph_mknod,
   1978	.symlink = ceph_symlink,
   1979	.mkdir = ceph_mkdir,
   1980	.link = ceph_link,
   1981	.unlink = ceph_unlink,
   1982	.rmdir = ceph_unlink,
   1983	.rename = ceph_rename,
   1984	.create = ceph_create,
   1985	.atomic_open = ceph_atomic_open,
   1986};
   1987
   1988const struct inode_operations ceph_snapdir_iops = {
   1989	.lookup = ceph_lookup,
   1990	.permission = ceph_permission,
   1991	.getattr = ceph_getattr,
   1992	.mkdir = ceph_mkdir,
   1993	.rmdir = ceph_unlink,
   1994	.rename = ceph_rename,
   1995};
   1996
   1997const struct dentry_operations ceph_dentry_ops = {
   1998	.d_revalidate = ceph_d_revalidate,
   1999	.d_delete = ceph_d_delete,
   2000	.d_release = ceph_d_release,
   2001	.d_prune = ceph_d_prune,
   2002	.d_init = ceph_d_init,
   2003};