cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

mds_client.c (141494B)


      1// SPDX-License-Identifier: GPL-2.0
      2#include <linux/ceph/ceph_debug.h>
      3
      4#include <linux/fs.h>
      5#include <linux/wait.h>
      6#include <linux/slab.h>
      7#include <linux/gfp.h>
      8#include <linux/sched.h>
      9#include <linux/debugfs.h>
     10#include <linux/seq_file.h>
     11#include <linux/ratelimit.h>
     12#include <linux/bits.h>
     13#include <linux/ktime.h>
     14#include <linux/bitmap.h>
     15
     16#include "super.h"
     17#include "mds_client.h"
     18
     19#include <linux/ceph/ceph_features.h>
     20#include <linux/ceph/messenger.h>
     21#include <linux/ceph/decode.h>
     22#include <linux/ceph/pagelist.h>
     23#include <linux/ceph/auth.h>
     24#include <linux/ceph/debugfs.h>
     25
     26#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
     27
     28/*
     29 * A cluster of MDS (metadata server) daemons is responsible for
     30 * managing the file system namespace (the directory hierarchy and
     31 * inodes) and for coordinating shared access to storage.  Metadata is
     32 * partitioned hierarchically across a number of servers, and that
     33 * partition varies over time as the cluster adjusts the distribution
     34 * in order to balance load.
     35 *
     36 * The MDS client is primarily responsible for managing synchronous
     37 * metadata requests for operations like open, unlink, and so forth.
     38 * If there is a MDS failure, we find out about it when we (possibly
     39 * request and) receive a new MDS map, and can resubmit affected
     40 * requests.
     41 *
     42 * For the most part, though, we take advantage of a lossless
     43 * communications channel to the MDS, and do not need to worry about
     44 * timing out or resubmitting requests.
     45 *
     46 * We maintain a stateful "session" with each MDS we interact with.
     47 * Within each session, we send periodic heartbeat messages to ensure
     48 * any capabilities or leases we have been issued remain valid.  If
     49 * the session times out and goes stale, our leases and capabilities
     50 * are no longer valid.
     51 */
     52
     53struct ceph_reconnect_state {
     54	struct ceph_mds_session *session;
     55	int nr_caps, nr_realms;
     56	struct ceph_pagelist *pagelist;
     57	unsigned msg_version;
     58	bool allow_multi;
     59};
     60
     61static void __wake_requests(struct ceph_mds_client *mdsc,
     62			    struct list_head *head);
     63static void ceph_cap_release_work(struct work_struct *work);
     64static void ceph_cap_reclaim_work(struct work_struct *work);
     65
     66static const struct ceph_connection_operations mds_con_ops;
     67
     68
     69/*
     70 * mds reply parsing
     71 */
     72
     73static int parse_reply_info_quota(void **p, void *end,
     74				  struct ceph_mds_reply_info_in *info)
     75{
     76	u8 struct_v, struct_compat;
     77	u32 struct_len;
     78
     79	ceph_decode_8_safe(p, end, struct_v, bad);
     80	ceph_decode_8_safe(p, end, struct_compat, bad);
     81	/* struct_v is expected to be >= 1. we only
     82	 * understand encoding with struct_compat == 1. */
     83	if (!struct_v || struct_compat != 1)
     84		goto bad;
     85	ceph_decode_32_safe(p, end, struct_len, bad);
     86	ceph_decode_need(p, end, struct_len, bad);
     87	end = *p + struct_len;
     88	ceph_decode_64_safe(p, end, info->max_bytes, bad);
     89	ceph_decode_64_safe(p, end, info->max_files, bad);
     90	*p = end;
     91	return 0;
     92bad:
     93	return -EIO;
     94}
     95
     96/*
     97 * parse individual inode info
     98 */
     99static int parse_reply_info_in(void **p, void *end,
    100			       struct ceph_mds_reply_info_in *info,
    101			       u64 features)
    102{
    103	int err = 0;
    104	u8 struct_v = 0;
    105
    106	if (features == (u64)-1) {
    107		u32 struct_len;
    108		u8 struct_compat;
    109		ceph_decode_8_safe(p, end, struct_v, bad);
    110		ceph_decode_8_safe(p, end, struct_compat, bad);
    111		/* struct_v is expected to be >= 1. we only understand
    112		 * encoding with struct_compat == 1. */
    113		if (!struct_v || struct_compat != 1)
    114			goto bad;
    115		ceph_decode_32_safe(p, end, struct_len, bad);
    116		ceph_decode_need(p, end, struct_len, bad);
    117		end = *p + struct_len;
    118	}
    119
    120	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
    121	info->in = *p;
    122	*p += sizeof(struct ceph_mds_reply_inode) +
    123		sizeof(*info->in->fragtree.splits) *
    124		le32_to_cpu(info->in->fragtree.nsplits);
    125
    126	ceph_decode_32_safe(p, end, info->symlink_len, bad);
    127	ceph_decode_need(p, end, info->symlink_len, bad);
    128	info->symlink = *p;
    129	*p += info->symlink_len;
    130
    131	ceph_decode_copy_safe(p, end, &info->dir_layout,
    132			      sizeof(info->dir_layout), bad);
    133	ceph_decode_32_safe(p, end, info->xattr_len, bad);
    134	ceph_decode_need(p, end, info->xattr_len, bad);
    135	info->xattr_data = *p;
    136	*p += info->xattr_len;
    137
    138	if (features == (u64)-1) {
    139		/* inline data */
    140		ceph_decode_64_safe(p, end, info->inline_version, bad);
    141		ceph_decode_32_safe(p, end, info->inline_len, bad);
    142		ceph_decode_need(p, end, info->inline_len, bad);
    143		info->inline_data = *p;
    144		*p += info->inline_len;
    145		/* quota */
    146		err = parse_reply_info_quota(p, end, info);
    147		if (err < 0)
    148			goto out_bad;
    149		/* pool namespace */
    150		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
    151		if (info->pool_ns_len > 0) {
    152			ceph_decode_need(p, end, info->pool_ns_len, bad);
    153			info->pool_ns_data = *p;
    154			*p += info->pool_ns_len;
    155		}
    156
    157		/* btime */
    158		ceph_decode_need(p, end, sizeof(info->btime), bad);
    159		ceph_decode_copy(p, &info->btime, sizeof(info->btime));
    160
    161		/* change attribute */
    162		ceph_decode_64_safe(p, end, info->change_attr, bad);
    163
    164		/* dir pin */
    165		if (struct_v >= 2) {
    166			ceph_decode_32_safe(p, end, info->dir_pin, bad);
    167		} else {
    168			info->dir_pin = -ENODATA;
    169		}
    170
    171		/* snapshot birth time, remains zero for v<=2 */
    172		if (struct_v >= 3) {
    173			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
    174			ceph_decode_copy(p, &info->snap_btime,
    175					 sizeof(info->snap_btime));
    176		} else {
    177			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
    178		}
    179
    180		/* snapshot count, remains zero for v<=3 */
    181		if (struct_v >= 4) {
    182			ceph_decode_64_safe(p, end, info->rsnaps, bad);
    183		} else {
    184			info->rsnaps = 0;
    185		}
    186
    187		*p = end;
    188	} else {
    189		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
    190			ceph_decode_64_safe(p, end, info->inline_version, bad);
    191			ceph_decode_32_safe(p, end, info->inline_len, bad);
    192			ceph_decode_need(p, end, info->inline_len, bad);
    193			info->inline_data = *p;
    194			*p += info->inline_len;
    195		} else
    196			info->inline_version = CEPH_INLINE_NONE;
    197
    198		if (features & CEPH_FEATURE_MDS_QUOTA) {
    199			err = parse_reply_info_quota(p, end, info);
    200			if (err < 0)
    201				goto out_bad;
    202		} else {
    203			info->max_bytes = 0;
    204			info->max_files = 0;
    205		}
    206
    207		info->pool_ns_len = 0;
    208		info->pool_ns_data = NULL;
    209		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
    210			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
    211			if (info->pool_ns_len > 0) {
    212				ceph_decode_need(p, end, info->pool_ns_len, bad);
    213				info->pool_ns_data = *p;
    214				*p += info->pool_ns_len;
    215			}
    216		}
    217
    218		if (features & CEPH_FEATURE_FS_BTIME) {
    219			ceph_decode_need(p, end, sizeof(info->btime), bad);
    220			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
    221			ceph_decode_64_safe(p, end, info->change_attr, bad);
    222		}
    223
    224		info->dir_pin = -ENODATA;
    225		/* info->snap_btime and info->rsnaps remain zero */
    226	}
    227	return 0;
    228bad:
    229	err = -EIO;
    230out_bad:
    231	return err;
    232}
    233
    234static int parse_reply_info_dir(void **p, void *end,
    235				struct ceph_mds_reply_dirfrag **dirfrag,
    236				u64 features)
    237{
    238	if (features == (u64)-1) {
    239		u8 struct_v, struct_compat;
    240		u32 struct_len;
    241		ceph_decode_8_safe(p, end, struct_v, bad);
    242		ceph_decode_8_safe(p, end, struct_compat, bad);
    243		/* struct_v is expected to be >= 1. we only understand
    244		 * encoding whose struct_compat == 1. */
    245		if (!struct_v || struct_compat != 1)
    246			goto bad;
    247		ceph_decode_32_safe(p, end, struct_len, bad);
    248		ceph_decode_need(p, end, struct_len, bad);
    249		end = *p + struct_len;
    250	}
    251
    252	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
    253	*dirfrag = *p;
    254	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
    255	if (unlikely(*p > end))
    256		goto bad;
    257	if (features == (u64)-1)
    258		*p = end;
    259	return 0;
    260bad:
    261	return -EIO;
    262}
    263
    264static int parse_reply_info_lease(void **p, void *end,
    265				  struct ceph_mds_reply_lease **lease,
    266				  u64 features)
    267{
    268	if (features == (u64)-1) {
    269		u8 struct_v, struct_compat;
    270		u32 struct_len;
    271		ceph_decode_8_safe(p, end, struct_v, bad);
    272		ceph_decode_8_safe(p, end, struct_compat, bad);
    273		/* struct_v is expected to be >= 1. we only understand
    274		 * encoding whose struct_compat == 1. */
    275		if (!struct_v || struct_compat != 1)
    276			goto bad;
    277		ceph_decode_32_safe(p, end, struct_len, bad);
    278		ceph_decode_need(p, end, struct_len, bad);
    279		end = *p + struct_len;
    280	}
    281
    282	ceph_decode_need(p, end, sizeof(**lease), bad);
    283	*lease = *p;
    284	*p += sizeof(**lease);
    285	if (features == (u64)-1)
    286		*p = end;
    287	return 0;
    288bad:
    289	return -EIO;
    290}
    291
    292/*
    293 * parse a normal reply, which may contain a (dir+)dentry and/or a
    294 * target inode.
    295 */
    296static int parse_reply_info_trace(void **p, void *end,
    297				  struct ceph_mds_reply_info_parsed *info,
    298				  u64 features)
    299{
    300	int err;
    301
    302	if (info->head->is_dentry) {
    303		err = parse_reply_info_in(p, end, &info->diri, features);
    304		if (err < 0)
    305			goto out_bad;
    306
    307		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
    308		if (err < 0)
    309			goto out_bad;
    310
    311		ceph_decode_32_safe(p, end, info->dname_len, bad);
    312		ceph_decode_need(p, end, info->dname_len, bad);
    313		info->dname = *p;
    314		*p += info->dname_len;
    315
    316		err = parse_reply_info_lease(p, end, &info->dlease, features);
    317		if (err < 0)
    318			goto out_bad;
    319	}
    320
    321	if (info->head->is_target) {
    322		err = parse_reply_info_in(p, end, &info->targeti, features);
    323		if (err < 0)
    324			goto out_bad;
    325	}
    326
    327	if (unlikely(*p != end))
    328		goto bad;
    329	return 0;
    330
    331bad:
    332	err = -EIO;
    333out_bad:
    334	pr_err("problem parsing mds trace %d\n", err);
    335	return err;
    336}
    337
    338/*
    339 * parse readdir results
    340 */
    341static int parse_reply_info_readdir(void **p, void *end,
    342				struct ceph_mds_reply_info_parsed *info,
    343				u64 features)
    344{
    345	u32 num, i = 0;
    346	int err;
    347
    348	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
    349	if (err < 0)
    350		goto out_bad;
    351
    352	ceph_decode_need(p, end, sizeof(num) + 2, bad);
    353	num = ceph_decode_32(p);
    354	{
    355		u16 flags = ceph_decode_16(p);
    356		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
    357		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
    358		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
    359		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
    360	}
    361	if (num == 0)
    362		goto done;
    363
    364	BUG_ON(!info->dir_entries);
    365	if ((unsigned long)(info->dir_entries + num) >
    366	    (unsigned long)info->dir_entries + info->dir_buf_size) {
    367		pr_err("dir contents are larger than expected\n");
    368		WARN_ON(1);
    369		goto bad;
    370	}
    371
    372	info->dir_nr = num;
    373	while (num) {
    374		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
    375		/* dentry */
    376		ceph_decode_32_safe(p, end, rde->name_len, bad);
    377		ceph_decode_need(p, end, rde->name_len, bad);
    378		rde->name = *p;
    379		*p += rde->name_len;
    380		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
    381
    382		/* dentry lease */
    383		err = parse_reply_info_lease(p, end, &rde->lease, features);
    384		if (err)
    385			goto out_bad;
    386		/* inode */
    387		err = parse_reply_info_in(p, end, &rde->inode, features);
    388		if (err < 0)
    389			goto out_bad;
    390		/* ceph_readdir_prepopulate() will update it */
    391		rde->offset = 0;
    392		i++;
    393		num--;
    394	}
    395
    396done:
    397	/* Skip over any unrecognized fields */
    398	*p = end;
    399	return 0;
    400
    401bad:
    402	err = -EIO;
    403out_bad:
    404	pr_err("problem parsing dir contents %d\n", err);
    405	return err;
    406}
    407
    408/*
    409 * parse fcntl F_GETLK results
    410 */
    411static int parse_reply_info_filelock(void **p, void *end,
    412				     struct ceph_mds_reply_info_parsed *info,
    413				     u64 features)
    414{
    415	if (*p + sizeof(*info->filelock_reply) > end)
    416		goto bad;
    417
    418	info->filelock_reply = *p;
    419
    420	/* Skip over any unrecognized fields */
    421	*p = end;
    422	return 0;
    423bad:
    424	return -EIO;
    425}
    426
    427
    428#if BITS_PER_LONG == 64
    429
    430#define DELEGATED_INO_AVAILABLE		xa_mk_value(1)
    431
    432static int ceph_parse_deleg_inos(void **p, void *end,
    433				 struct ceph_mds_session *s)
    434{
    435	u32 sets;
    436
    437	ceph_decode_32_safe(p, end, sets, bad);
    438	dout("got %u sets of delegated inodes\n", sets);
    439	while (sets--) {
    440		u64 start, len;
    441
    442		ceph_decode_64_safe(p, end, start, bad);
    443		ceph_decode_64_safe(p, end, len, bad);
    444
    445		/* Don't accept a delegation of system inodes */
    446		if (start < CEPH_INO_SYSTEM_BASE) {
    447			pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
    448					start, len);
    449			continue;
    450		}
    451		while (len--) {
    452			int err = xa_insert(&s->s_delegated_inos, start++,
    453					    DELEGATED_INO_AVAILABLE,
    454					    GFP_KERNEL);
    455			if (!err) {
    456				dout("added delegated inode 0x%llx\n",
    457				     start - 1);
    458			} else if (err == -EBUSY) {
    459				pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
    460					start - 1);
    461			} else {
    462				return err;
    463			}
    464		}
    465	}
    466	return 0;
    467bad:
    468	return -EIO;
    469}
    470
    471u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
    472{
    473	unsigned long ino;
    474	void *val;
    475
    476	xa_for_each(&s->s_delegated_inos, ino, val) {
    477		val = xa_erase(&s->s_delegated_inos, ino);
    478		if (val == DELEGATED_INO_AVAILABLE)
    479			return ino;
    480	}
    481	return 0;
    482}
    483
    484int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
    485{
    486	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
    487			 GFP_KERNEL);
    488}
    489#else /* BITS_PER_LONG == 64 */
    490/*
    491 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
    492 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
    493 * and bottom words?
    494 */
    495static int ceph_parse_deleg_inos(void **p, void *end,
    496				 struct ceph_mds_session *s)
    497{
    498	u32 sets;
    499
    500	ceph_decode_32_safe(p, end, sets, bad);
    501	if (sets)
    502		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
    503	return 0;
    504bad:
    505	return -EIO;
    506}
    507
    508u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
    509{
    510	return 0;
    511}
    512
    513int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
    514{
    515	return 0;
    516}
    517#endif /* BITS_PER_LONG == 64 */
    518
    519/*
    520 * parse create results
    521 */
    522static int parse_reply_info_create(void **p, void *end,
    523				  struct ceph_mds_reply_info_parsed *info,
    524				  u64 features, struct ceph_mds_session *s)
    525{
    526	int ret;
    527
    528	if (features == (u64)-1 ||
    529	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
    530		if (*p == end) {
    531			/* Malformed reply? */
    532			info->has_create_ino = false;
    533		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
    534			info->has_create_ino = true;
    535			/* struct_v, struct_compat, and len */
    536			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
    537			ceph_decode_64_safe(p, end, info->ino, bad);
    538			ret = ceph_parse_deleg_inos(p, end, s);
    539			if (ret)
    540				return ret;
    541		} else {
    542			/* legacy */
    543			ceph_decode_64_safe(p, end, info->ino, bad);
    544			info->has_create_ino = true;
    545		}
    546	} else {
    547		if (*p != end)
    548			goto bad;
    549	}
    550
    551	/* Skip over any unrecognized fields */
    552	*p = end;
    553	return 0;
    554bad:
    555	return -EIO;
    556}
    557
    558static int parse_reply_info_getvxattr(void **p, void *end,
    559				      struct ceph_mds_reply_info_parsed *info,
    560				      u64 features)
    561{
    562	u32 value_len;
    563
    564	ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
    565	ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
    566	ceph_decode_skip_32(p, end, bad); /* skip payload length */
    567
    568	ceph_decode_32_safe(p, end, value_len, bad);
    569
    570	if (value_len == end - *p) {
    571	  info->xattr_info.xattr_value = *p;
    572	  info->xattr_info.xattr_value_len = value_len;
    573	  *p = end;
    574	  return value_len;
    575	}
    576bad:
    577	return -EIO;
    578}
    579
    580/*
    581 * parse extra results
    582 */
    583static int parse_reply_info_extra(void **p, void *end,
    584				  struct ceph_mds_reply_info_parsed *info,
    585				  u64 features, struct ceph_mds_session *s)
    586{
    587	u32 op = le32_to_cpu(info->head->op);
    588
    589	if (op == CEPH_MDS_OP_GETFILELOCK)
    590		return parse_reply_info_filelock(p, end, info, features);
    591	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
    592		return parse_reply_info_readdir(p, end, info, features);
    593	else if (op == CEPH_MDS_OP_CREATE)
    594		return parse_reply_info_create(p, end, info, features, s);
    595	else if (op == CEPH_MDS_OP_GETVXATTR)
    596		return parse_reply_info_getvxattr(p, end, info, features);
    597	else
    598		return -EIO;
    599}
    600
    601/*
    602 * parse entire mds reply
    603 */
    604static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
    605			    struct ceph_mds_reply_info_parsed *info,
    606			    u64 features)
    607{
    608	void *p, *end;
    609	u32 len;
    610	int err;
    611
    612	info->head = msg->front.iov_base;
    613	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
    614	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
    615
    616	/* trace */
    617	ceph_decode_32_safe(&p, end, len, bad);
    618	if (len > 0) {
    619		ceph_decode_need(&p, end, len, bad);
    620		err = parse_reply_info_trace(&p, p+len, info, features);
    621		if (err < 0)
    622			goto out_bad;
    623	}
    624
    625	/* extra */
    626	ceph_decode_32_safe(&p, end, len, bad);
    627	if (len > 0) {
    628		ceph_decode_need(&p, end, len, bad);
    629		err = parse_reply_info_extra(&p, p+len, info, features, s);
    630		if (err < 0)
    631			goto out_bad;
    632	}
    633
    634	/* snap blob */
    635	ceph_decode_32_safe(&p, end, len, bad);
    636	info->snapblob_len = len;
    637	info->snapblob = p;
    638	p += len;
    639
    640	if (p != end)
    641		goto bad;
    642	return 0;
    643
    644bad:
    645	err = -EIO;
    646out_bad:
    647	pr_err("mds parse_reply err %d\n", err);
    648	return err;
    649}
    650
    651static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
    652{
    653	if (!info->dir_entries)
    654		return;
    655	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
    656}
    657
    658
    659/*
    660 * sessions
    661 */
    662const char *ceph_session_state_name(int s)
    663{
    664	switch (s) {
    665	case CEPH_MDS_SESSION_NEW: return "new";
    666	case CEPH_MDS_SESSION_OPENING: return "opening";
    667	case CEPH_MDS_SESSION_OPEN: return "open";
    668	case CEPH_MDS_SESSION_HUNG: return "hung";
    669	case CEPH_MDS_SESSION_CLOSING: return "closing";
    670	case CEPH_MDS_SESSION_CLOSED: return "closed";
    671	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
    672	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
    673	case CEPH_MDS_SESSION_REJECTED: return "rejected";
    674	default: return "???";
    675	}
    676}
    677
    678struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
    679{
    680	if (refcount_inc_not_zero(&s->s_ref))
    681		return s;
    682	return NULL;
    683}
    684
    685void ceph_put_mds_session(struct ceph_mds_session *s)
    686{
    687	if (IS_ERR_OR_NULL(s))
    688		return;
    689
    690	if (refcount_dec_and_test(&s->s_ref)) {
    691		if (s->s_auth.authorizer)
    692			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
    693		WARN_ON(mutex_is_locked(&s->s_mutex));
    694		xa_destroy(&s->s_delegated_inos);
    695		kfree(s);
    696	}
    697}
    698
    699/*
    700 * called under mdsc->mutex
    701 */
    702struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
    703						   int mds)
    704{
    705	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
    706		return NULL;
    707	return ceph_get_mds_session(mdsc->sessions[mds]);
    708}
    709
    710static bool __have_session(struct ceph_mds_client *mdsc, int mds)
    711{
    712	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
    713		return false;
    714	else
    715		return true;
    716}
    717
    718static int __verify_registered_session(struct ceph_mds_client *mdsc,
    719				       struct ceph_mds_session *s)
    720{
    721	if (s->s_mds >= mdsc->max_sessions ||
    722	    mdsc->sessions[s->s_mds] != s)
    723		return -ENOENT;
    724	return 0;
    725}
    726
    727/*
    728 * create+register a new session for given mds.
    729 * called under mdsc->mutex.
    730 */
    731static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
    732						 int mds)
    733{
    734	struct ceph_mds_session *s;
    735
    736	if (mds >= mdsc->mdsmap->possible_max_rank)
    737		return ERR_PTR(-EINVAL);
    738
    739	s = kzalloc(sizeof(*s), GFP_NOFS);
    740	if (!s)
    741		return ERR_PTR(-ENOMEM);
    742
    743	if (mds >= mdsc->max_sessions) {
    744		int newmax = 1 << get_count_order(mds + 1);
    745		struct ceph_mds_session **sa;
    746
    747		dout("%s: realloc to %d\n", __func__, newmax);
    748		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
    749		if (!sa)
    750			goto fail_realloc;
    751		if (mdsc->sessions) {
    752			memcpy(sa, mdsc->sessions,
    753			       mdsc->max_sessions * sizeof(void *));
    754			kfree(mdsc->sessions);
    755		}
    756		mdsc->sessions = sa;
    757		mdsc->max_sessions = newmax;
    758	}
    759
    760	dout("%s: mds%d\n", __func__, mds);
    761	s->s_mdsc = mdsc;
    762	s->s_mds = mds;
    763	s->s_state = CEPH_MDS_SESSION_NEW;
    764	mutex_init(&s->s_mutex);
    765
    766	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
    767
    768	atomic_set(&s->s_cap_gen, 1);
    769	s->s_cap_ttl = jiffies - 1;
    770
    771	spin_lock_init(&s->s_cap_lock);
    772	INIT_LIST_HEAD(&s->s_caps);
    773	refcount_set(&s->s_ref, 1);
    774	INIT_LIST_HEAD(&s->s_waiting);
    775	INIT_LIST_HEAD(&s->s_unsafe);
    776	xa_init(&s->s_delegated_inos);
    777	INIT_LIST_HEAD(&s->s_cap_releases);
    778	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
    779
    780	INIT_LIST_HEAD(&s->s_cap_dirty);
    781	INIT_LIST_HEAD(&s->s_cap_flushing);
    782
    783	mdsc->sessions[mds] = s;
    784	atomic_inc(&mdsc->num_sessions);
    785	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
    786
    787	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
    788		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
    789
    790	return s;
    791
    792fail_realloc:
    793	kfree(s);
    794	return ERR_PTR(-ENOMEM);
    795}
    796
    797/*
    798 * called under mdsc->mutex
    799 */
    800static void __unregister_session(struct ceph_mds_client *mdsc,
    801			       struct ceph_mds_session *s)
    802{
    803	dout("__unregister_session mds%d %p\n", s->s_mds, s);
    804	BUG_ON(mdsc->sessions[s->s_mds] != s);
    805	mdsc->sessions[s->s_mds] = NULL;
    806	ceph_con_close(&s->s_con);
    807	ceph_put_mds_session(s);
    808	atomic_dec(&mdsc->num_sessions);
    809}
    810
    811/*
    812 * drop session refs in request.
    813 *
    814 * should be last request ref, or hold mdsc->mutex
    815 */
    816static void put_request_session(struct ceph_mds_request *req)
    817{
    818	if (req->r_session) {
    819		ceph_put_mds_session(req->r_session);
    820		req->r_session = NULL;
    821	}
    822}
    823
    824void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
    825				void (*cb)(struct ceph_mds_session *),
    826				bool check_state)
    827{
    828	int mds;
    829
    830	mutex_lock(&mdsc->mutex);
    831	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
    832		struct ceph_mds_session *s;
    833
    834		s = __ceph_lookup_mds_session(mdsc, mds);
    835		if (!s)
    836			continue;
    837
    838		if (check_state && !check_session_state(s)) {
    839			ceph_put_mds_session(s);
    840			continue;
    841		}
    842
    843		mutex_unlock(&mdsc->mutex);
    844		cb(s);
    845		ceph_put_mds_session(s);
    846		mutex_lock(&mdsc->mutex);
    847	}
    848	mutex_unlock(&mdsc->mutex);
    849}
    850
    851void ceph_mdsc_release_request(struct kref *kref)
    852{
    853	struct ceph_mds_request *req = container_of(kref,
    854						    struct ceph_mds_request,
    855						    r_kref);
    856	ceph_mdsc_release_dir_caps_no_check(req);
    857	destroy_reply_info(&req->r_reply_info);
    858	if (req->r_request)
    859		ceph_msg_put(req->r_request);
    860	if (req->r_reply)
    861		ceph_msg_put(req->r_reply);
    862	if (req->r_inode) {
    863		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
    864		iput(req->r_inode);
    865	}
    866	if (req->r_parent) {
    867		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
    868		iput(req->r_parent);
    869	}
    870	iput(req->r_target_inode);
    871	if (req->r_dentry)
    872		dput(req->r_dentry);
    873	if (req->r_old_dentry)
    874		dput(req->r_old_dentry);
    875	if (req->r_old_dentry_dir) {
    876		/*
    877		 * track (and drop pins for) r_old_dentry_dir
    878		 * separately, since r_old_dentry's d_parent may have
    879		 * changed between the dir mutex being dropped and
    880		 * this request being freed.
    881		 */
    882		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
    883				  CEPH_CAP_PIN);
    884		iput(req->r_old_dentry_dir);
    885	}
    886	kfree(req->r_path1);
    887	kfree(req->r_path2);
    888	put_cred(req->r_cred);
    889	if (req->r_pagelist)
    890		ceph_pagelist_release(req->r_pagelist);
    891	put_request_session(req);
    892	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
    893	WARN_ON_ONCE(!list_empty(&req->r_wait));
    894	kmem_cache_free(ceph_mds_request_cachep, req);
    895}
    896
    897DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
    898
    899/*
    900 * lookup session, bump ref if found.
    901 *
    902 * called under mdsc->mutex.
    903 */
    904static struct ceph_mds_request *
    905lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
    906{
    907	struct ceph_mds_request *req;
    908
    909	req = lookup_request(&mdsc->request_tree, tid);
    910	if (req)
    911		ceph_mdsc_get_request(req);
    912
    913	return req;
    914}
    915
    916/*
    917 * Register an in-flight request, and assign a tid.  Link to directory
    918 * are modifying (if any).
    919 *
    920 * Called under mdsc->mutex.
    921 */
    922static void __register_request(struct ceph_mds_client *mdsc,
    923			       struct ceph_mds_request *req,
    924			       struct inode *dir)
    925{
    926	int ret = 0;
    927
    928	req->r_tid = ++mdsc->last_tid;
    929	if (req->r_num_caps) {
    930		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
    931					req->r_num_caps);
    932		if (ret < 0) {
    933			pr_err("__register_request %p "
    934			       "failed to reserve caps: %d\n", req, ret);
    935			/* set req->r_err to fail early from __do_request */
    936			req->r_err = ret;
    937			return;
    938		}
    939	}
    940	dout("__register_request %p tid %lld\n", req, req->r_tid);
    941	ceph_mdsc_get_request(req);
    942	insert_request(&mdsc->request_tree, req);
    943
    944	req->r_cred = get_current_cred();
    945
    946	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
    947		mdsc->oldest_tid = req->r_tid;
    948
    949	if (dir) {
    950		struct ceph_inode_info *ci = ceph_inode(dir);
    951
    952		ihold(dir);
    953		req->r_unsafe_dir = dir;
    954		spin_lock(&ci->i_unsafe_lock);
    955		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
    956		spin_unlock(&ci->i_unsafe_lock);
    957	}
    958}
    959
    960static void __unregister_request(struct ceph_mds_client *mdsc,
    961				 struct ceph_mds_request *req)
    962{
    963	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
    964
    965	/* Never leave an unregistered request on an unsafe list! */
    966	list_del_init(&req->r_unsafe_item);
    967
    968	if (req->r_tid == mdsc->oldest_tid) {
    969		struct rb_node *p = rb_next(&req->r_node);
    970		mdsc->oldest_tid = 0;
    971		while (p) {
    972			struct ceph_mds_request *next_req =
    973				rb_entry(p, struct ceph_mds_request, r_node);
    974			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
    975				mdsc->oldest_tid = next_req->r_tid;
    976				break;
    977			}
    978			p = rb_next(p);
    979		}
    980	}
    981
    982	erase_request(&mdsc->request_tree, req);
    983
    984	if (req->r_unsafe_dir) {
    985		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
    986		spin_lock(&ci->i_unsafe_lock);
    987		list_del_init(&req->r_unsafe_dir_item);
    988		spin_unlock(&ci->i_unsafe_lock);
    989	}
    990	if (req->r_target_inode &&
    991	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
    992		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
    993		spin_lock(&ci->i_unsafe_lock);
    994		list_del_init(&req->r_unsafe_target_item);
    995		spin_unlock(&ci->i_unsafe_lock);
    996	}
    997
    998	if (req->r_unsafe_dir) {
    999		iput(req->r_unsafe_dir);
   1000		req->r_unsafe_dir = NULL;
   1001	}
   1002
   1003	complete_all(&req->r_safe_completion);
   1004
   1005	ceph_mdsc_put_request(req);
   1006}
   1007
   1008/*
   1009 * Walk back up the dentry tree until we hit a dentry representing a
   1010 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
   1011 * when calling this) to ensure that the objects won't disappear while we're
   1012 * working with them. Once we hit a candidate dentry, we attempt to take a
   1013 * reference to it, and return that as the result.
   1014 */
   1015static struct inode *get_nonsnap_parent(struct dentry *dentry)
   1016{
   1017	struct inode *inode = NULL;
   1018
   1019	while (dentry && !IS_ROOT(dentry)) {
   1020		inode = d_inode_rcu(dentry);
   1021		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
   1022			break;
   1023		dentry = dentry->d_parent;
   1024	}
   1025	if (inode)
   1026		inode = igrab(inode);
   1027	return inode;
   1028}
   1029
/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * @mdsc:   mds client whose mdsmap and sessions are consulted
 * @req:    request to route
 * @random: optional out parameter; set to true only when the result came
 *          from the random fallback, false otherwise
 *
 * Returns the chosen mds rank (whatever ceph_mdsmap_get_random_mds()
 * returns on the random path).
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("%s using resend_mds mds%d\n", __func__,
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	/*
	 * Pick the inode whose frag tree / caps will drive the decision.
	 * Every path below that leaves 'inode' non-NULL also holds a
	 * reference on it, which is dropped before returning.
	 */
	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			dout("%s using snapdir's parent %p\n", __func__, inode);
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		/* prefer the explicit parent recorded in the request */
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/*  not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			dout("%s using nonsnap parent %p\n", __func__, inode);
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
	     hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	/* for a hashed directory lookup, let the frag tree pick the mds */
	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds, (int)r, frag.ndist);
				/* only use the replica if it is active and not laggy */
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			/* frag tree did not yield a usable mds; fall back to the auth cap */
			mode = USE_AUTH_MDS;
		}
	}

	/*
	 * Fall back to the caps held on the inode: the auth cap when the
	 * auth mds is wanted, otherwise (or if there is no auth cap) the
	 * first cap in the inode's cap tree.
	 */
	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
	     inode, ceph_vinop(inode), mds,
	     cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	/* drop the inode reference taken above */
	iput(inode);
	return mds;

random:
	/* no inode reference is held on any path that reaches here */
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	dout("%s chose random mds%d\n", __func__, mds);
	return mds;
}
   1190
   1191
   1192/*
   1193 * session messages
   1194 */
   1195struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
   1196{
   1197	struct ceph_msg *msg;
   1198	struct ceph_mds_session_head *h;
   1199
   1200	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
   1201			   false);
   1202	if (!msg) {
   1203		pr_err("ENOMEM creating session %s msg\n",
   1204		       ceph_session_op_name(op));
   1205		return NULL;
   1206	}
   1207	h = msg->front.iov_base;
   1208	h->op = cpu_to_le32(op);
   1209	h->seq = cpu_to_le64(seq);
   1210
   1211	return msg;
   1212}
   1213
   1214static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
   1215#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
   1216static int encode_supported_features(void **p, void *end)
   1217{
   1218	static const size_t count = ARRAY_SIZE(feature_bits);
   1219
   1220	if (count > 0) {
   1221		size_t i;
   1222		size_t size = FEATURE_BYTES(count);
   1223
   1224		if (WARN_ON_ONCE(*p + 4 + size > end))
   1225			return -ERANGE;
   1226
   1227		ceph_encode_32(p, size);
   1228		memset(*p, 0, size);
   1229		for (i = 0; i < count; i++)
   1230			((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
   1231		*p += size;
   1232	} else {
   1233		if (WARN_ON_ONCE(*p + 4 > end))
   1234			return -ERANGE;
   1235
   1236		ceph_encode_32(p, 0);
   1237	}
   1238
   1239	return 0;
   1240}
   1241
   1242static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
   1243#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
   1244static int encode_metric_spec(void **p, void *end)
   1245{
   1246	static const size_t count = ARRAY_SIZE(metric_bits);
   1247
   1248	/* header */
   1249	if (WARN_ON_ONCE(*p + 2 > end))
   1250		return -ERANGE;
   1251
   1252	ceph_encode_8(p, 1); /* version */
   1253	ceph_encode_8(p, 1); /* compat */
   1254
   1255	if (count > 0) {
   1256		size_t i;
   1257		size_t size = METRIC_BYTES(count);
   1258
   1259		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
   1260			return -ERANGE;
   1261
   1262		/* metric spec info length */
   1263		ceph_encode_32(p, 4 + size);
   1264
   1265		/* metric spec */
   1266		ceph_encode_32(p, size);
   1267		memset(*p, 0, size);
   1268		for (i = 0; i < count; i++)
   1269			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
   1270		*p += size;
   1271	} else {
   1272		if (WARN_ON_ONCE(*p + 4 + 4 > end))
   1273			return -ERANGE;
   1274
   1275		/* metric spec info length */
   1276		ceph_encode_32(p, 4);
   1277		/* metric spec */
   1278		ceph_encode_32(p, 0);
   1279	}
   1280
   1281	return 0;
   1282}
   1283
   1284/*
   1285 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
   1286 * to include additional client metadata fields.
   1287 */
   1288static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
   1289{
   1290	struct ceph_msg *msg;
   1291	struct ceph_mds_session_head *h;
   1292	int i;
   1293	int extra_bytes = 0;
   1294	int metadata_key_count = 0;
   1295	struct ceph_options *opt = mdsc->fsc->client->options;
   1296	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
   1297	size_t size, count;
   1298	void *p, *end;
   1299	int ret;
   1300
   1301	const char* metadata[][2] = {
   1302		{"hostname", mdsc->nodename},
   1303		{"kernel_version", init_utsname()->release},
   1304		{"entity_id", opt->name ? : ""},
   1305		{"root", fsopt->server_path ? : "/"},
   1306		{NULL, NULL}
   1307	};
   1308
   1309	/* Calculate serialized length of metadata */
   1310	extra_bytes = 4;  /* map length */
   1311	for (i = 0; metadata[i][0]; ++i) {
   1312		extra_bytes += 8 + strlen(metadata[i][0]) +
   1313			strlen(metadata[i][1]);
   1314		metadata_key_count++;
   1315	}
   1316
   1317	/* supported feature */
   1318	size = 0;
   1319	count = ARRAY_SIZE(feature_bits);
   1320	if (count > 0)
   1321		size = FEATURE_BYTES(count);
   1322	extra_bytes += 4 + size;
   1323
   1324	/* metric spec */
   1325	size = 0;
   1326	count = ARRAY_SIZE(metric_bits);
   1327	if (count > 0)
   1328		size = METRIC_BYTES(count);
   1329	extra_bytes += 2 + 4 + 4 + size;
   1330
   1331	/* Allocate the message */
   1332	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
   1333			   GFP_NOFS, false);
   1334	if (!msg) {
   1335		pr_err("ENOMEM creating session open msg\n");
   1336		return ERR_PTR(-ENOMEM);
   1337	}
   1338	p = msg->front.iov_base;
   1339	end = p + msg->front.iov_len;
   1340
   1341	h = p;
   1342	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
   1343	h->seq = cpu_to_le64(seq);
   1344
   1345	/*
   1346	 * Serialize client metadata into waiting buffer space, using
   1347	 * the format that userspace expects for map<string, string>
   1348	 *
   1349	 * ClientSession messages with metadata are v4
   1350	 */
   1351	msg->hdr.version = cpu_to_le16(4);
   1352	msg->hdr.compat_version = cpu_to_le16(1);
   1353
   1354	/* The write pointer, following the session_head structure */
   1355	p += sizeof(*h);
   1356
   1357	/* Number of entries in the map */
   1358	ceph_encode_32(&p, metadata_key_count);
   1359
   1360	/* Two length-prefixed strings for each entry in the map */
   1361	for (i = 0; metadata[i][0]; ++i) {
   1362		size_t const key_len = strlen(metadata[i][0]);
   1363		size_t const val_len = strlen(metadata[i][1]);
   1364
   1365		ceph_encode_32(&p, key_len);
   1366		memcpy(p, metadata[i][0], key_len);
   1367		p += key_len;
   1368		ceph_encode_32(&p, val_len);
   1369		memcpy(p, metadata[i][1], val_len);
   1370		p += val_len;
   1371	}
   1372
   1373	ret = encode_supported_features(&p, end);
   1374	if (ret) {
   1375		pr_err("encode_supported_features failed!\n");
   1376		ceph_msg_put(msg);
   1377		return ERR_PTR(ret);
   1378	}
   1379
   1380	ret = encode_metric_spec(&p, end);
   1381	if (ret) {
   1382		pr_err("encode_metric_spec failed!\n");
   1383		ceph_msg_put(msg);
   1384		return ERR_PTR(ret);
   1385	}
   1386
   1387	msg->front.iov_len = p - msg->front.iov_base;
   1388	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
   1389
   1390	return msg;
   1391}
   1392
   1393/*
   1394 * send session open request.
   1395 *
   1396 * called under mdsc->mutex
   1397 */
   1398static int __open_session(struct ceph_mds_client *mdsc,
   1399			  struct ceph_mds_session *session)
   1400{
   1401	struct ceph_msg *msg;
   1402	int mstate;
   1403	int mds = session->s_mds;
   1404
   1405	/* wait for mds to go active? */
   1406	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
   1407	dout("open_session to mds%d (%s)\n", mds,
   1408	     ceph_mds_state_name(mstate));
   1409	session->s_state = CEPH_MDS_SESSION_OPENING;
   1410	session->s_renew_requested = jiffies;
   1411
   1412	/* send connect message */
   1413	msg = create_session_open_msg(mdsc, session->s_seq);
   1414	if (IS_ERR(msg))
   1415		return PTR_ERR(msg);
   1416	ceph_con_send(&session->s_con, msg);
   1417	return 0;
   1418}
   1419
   1420/*
   1421 * open sessions for any export targets for the given mds
   1422 *
   1423 * called under mdsc->mutex
   1424 */
   1425static struct ceph_mds_session *
   1426__open_export_target_session(struct ceph_mds_client *mdsc, int target)
   1427{
   1428	struct ceph_mds_session *session;
   1429	int ret;
   1430
   1431	session = __ceph_lookup_mds_session(mdsc, target);
   1432	if (!session) {
   1433		session = register_session(mdsc, target);
   1434		if (IS_ERR(session))
   1435			return session;
   1436	}
   1437	if (session->s_state == CEPH_MDS_SESSION_NEW ||
   1438	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
   1439		ret = __open_session(mdsc, session);
   1440		if (ret)
   1441			return ERR_PTR(ret);
   1442	}
   1443
   1444	return session;
   1445}
   1446
/*
 * Locked wrapper around __open_export_target_session(): takes
 * mdsc->mutex for the duration of the call and passes its result
 * (a session or an ERR_PTR) straight through.
 */
struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;

	dout("open_export_target_session to mds%d\n", target);

	mutex_lock(&mdsc->mutex);
	session = __open_export_target_session(mdsc, target);
	mutex_unlock(&mdsc->mutex);

	return session;
}
   1460
   1461static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
   1462					  struct ceph_mds_session *session)
   1463{
   1464	struct ceph_mds_info *mi;
   1465	struct ceph_mds_session *ts;
   1466	int i, mds = session->s_mds;
   1467
   1468	if (mds >= mdsc->mdsmap->possible_max_rank)
   1469		return;
   1470
   1471	mi = &mdsc->mdsmap->m_info[mds];
   1472	dout("open_export_target_sessions for mds%d (%d targets)\n",
   1473	     session->s_mds, mi->num_export_targets);
   1474
   1475	for (i = 0; i < mi->num_export_targets; i++) {
   1476		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
   1477		ceph_put_mds_session(ts);
   1478	}
   1479}
   1480
/*
 * Locked wrapper around __open_export_target_sessions(): holds
 * mdsc->mutex while sessions to the export targets are opened.
 */
void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
					   struct ceph_mds_session *session)
{
	mutex_lock(&mdsc->mutex);
	__open_export_target_sessions(mdsc, session);
	mutex_unlock(&mdsc->mutex);
}
   1488
   1489/*
   1490 * session caps
   1491 */
   1492
   1493static void detach_cap_releases(struct ceph_mds_session *session,
   1494				struct list_head *target)
   1495{
   1496	lockdep_assert_held(&session->s_cap_lock);
   1497
   1498	list_splice_init(&session->s_cap_releases, target);
   1499	session->s_num_cap_releases = 0;
   1500	dout("dispose_cap_releases mds%d\n", session->s_mds);
   1501}
   1502
   1503static void dispose_cap_releases(struct ceph_mds_client *mdsc,
   1504				 struct list_head *dispose)
   1505{
   1506	while (!list_empty(dispose)) {
   1507		struct ceph_cap *cap;
   1508		/* zero out the in-progress message */
   1509		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
   1510		list_del(&cap->session_caps);
   1511		ceph_put_cap(mdsc, cap);
   1512	}
   1513}
   1514
   1515static void cleanup_session_requests(struct ceph_mds_client *mdsc,
   1516				     struct ceph_mds_session *session)
   1517{
   1518	struct ceph_mds_request *req;
   1519	struct rb_node *p;
   1520
   1521	dout("cleanup_session_requests mds%d\n", session->s_mds);
   1522	mutex_lock(&mdsc->mutex);
   1523	while (!list_empty(&session->s_unsafe)) {
   1524		req = list_first_entry(&session->s_unsafe,
   1525				       struct ceph_mds_request, r_unsafe_item);
   1526		pr_warn_ratelimited(" dropping unsafe request %llu\n",
   1527				    req->r_tid);
   1528		if (req->r_target_inode)
   1529			mapping_set_error(req->r_target_inode->i_mapping, -EIO);
   1530		if (req->r_unsafe_dir)
   1531			mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
   1532		__unregister_request(mdsc, req);
   1533	}
   1534	/* zero r_attempts, so kick_requests() will re-send requests */
   1535	p = rb_first(&mdsc->request_tree);
   1536	while (p) {
   1537		req = rb_entry(p, struct ceph_mds_request, r_node);
   1538		p = rb_next(p);
   1539		if (req->r_session &&
   1540		    req->r_session->s_mds == session->s_mds)
   1541			req->r_attempts = 0;
   1542	}
   1543	mutex_unlock(&mdsc->mutex);
   1544}
   1545
/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * @session: session whose s_caps list is walked
 * @cb:      called for each cap with a referenced inode, the cap and @arg;
 *           it runs without s_cap_lock held
 * @arg:     opaque pointer passed through to @cb
 *
 * Caps whose inode cannot be grabbed with igrab() are skipped.
 *
 * Returns 0 after visiting every cap, or the first negative value
 * returned by @cb, which stops the walk.
 *
 * Caller must hold session s_mutex.
 */
int ceph_iterate_session_caps(struct ceph_mds_session *session,
			      int (*cb)(struct inode *, struct ceph_cap *,
					void *), void *arg)
{
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		cap = list_entry(p, struct ceph_cap, session_caps);
		inode = igrab(&cap->ci->netfs.inode);
		if (!inode) {
			/* could not get a reference on the inode: skip this cap */
			p = p->next;
			continue;
		}
		/* publish the cap being visited before dropping the lock */
		session->s_cap_iterator = cap;
		spin_unlock(&session->s_cap_lock);

		/*
		 * Release what the previous iteration left behind now that
		 * the spinlock is no longer held.
		 */
		if (last_inode) {
			iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		ret = cb(inode, cap, arg);
		/* keep the inode reference until the lock is dropped again */
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		/* advance before the current cap may be unlinked below */
		p = p->next;
		if (!cap->ci) {
			/*
			 * cap->ci was cleared while the callback ran: complete
			 * the removal from the session here.
			 */
			dout("iterate_session_caps  finishing cap %p removal\n",
			     cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			atomic64_dec(&session->s_mdsc->metric.total_caps);
			if (cap->queue_release)
				__ceph_queue_cap_release(session, cap);
			else
				old_cap = cap;  /* put_cap it w/o locks held */
		}
		/* a negative callback result aborts the walk */
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	/* last_inode may be NULL here if no cap was visited */
	iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}
   1616
   1617static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
   1618				  void *arg)
   1619{
   1620	struct ceph_inode_info *ci = ceph_inode(inode);
   1621	bool invalidate = false;
   1622	int iputs;
   1623
   1624	dout("removing cap %p, ci is %p, inode is %p\n",
   1625	     cap, ci, &ci->netfs.inode);
   1626	spin_lock(&ci->i_ceph_lock);
   1627	iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
   1628	spin_unlock(&ci->i_ceph_lock);
   1629
   1630	wake_up_all(&ci->i_cap_wq);
   1631	if (invalidate)
   1632		ceph_queue_invalidate(inode);
   1633	while (iputs--)
   1634		iput(inode);
   1635	return 0;
   1636}
   1637
/*
 * Remove every cap held through @session and dispose of its pending cap
 * releases.  BUGs if caps or flushing caps remain afterwards.
 *
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	dout("remove_session_caps on %p\n", session);
	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;
		/*
		 * iterate_session_caps() skips inodes that are being
		 * deleted, we need to wait until deletions are complete.
		 * __wait_on_freeing_inode() is designed for the job,
		 * but it is not exported, so use lookup inode function
		 * to access it.
		 */
		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			/* same cap still at the head: stop instead of spinning */
			if (cap == prev)
				break;
			prev = cap;
			/* copy the vino before dropping the lock */
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			/* the lookup is only for its waiting side effect */
			inode = ceph_find_inode(sb, vino);
			iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	/* move pending cap releases to 'dispose'; s_cap_lock stays held */
	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	/* put the detached caps without the spinlock held */
	dispose_cap_releases(session->s_mdsc, &dispose);
}
   1688
/* Event codes passed to wake_up_session_caps() / wake_up_session_cb(). */
enum {
	RECONNECT,	/* reset the inode's wanted/requested max_size */
	RENEWCAPS,	/* reduce caps the mds did not re-issue to CEPH_CAP_PIN */
	FORCE_RO,	/* no per-cap work; waiters are just woken */
};
   1694
   1695/*
   1696 * wake up any threads waiting on this session's caps.  if the cap is
   1697 * old (didn't get renewed on the client reconnect), remove it now.
   1698 *
   1699 * caller must hold s_mutex.
   1700 */
   1701static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
   1702			      void *arg)
   1703{
   1704	struct ceph_inode_info *ci = ceph_inode(inode);
   1705	unsigned long ev = (unsigned long)arg;
   1706
   1707	if (ev == RECONNECT) {
   1708		spin_lock(&ci->i_ceph_lock);
   1709		ci->i_wanted_max_size = 0;
   1710		ci->i_requested_max_size = 0;
   1711		spin_unlock(&ci->i_ceph_lock);
   1712	} else if (ev == RENEWCAPS) {
   1713		if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) {
   1714			/* mds did not re-issue stale cap */
   1715			spin_lock(&ci->i_ceph_lock);
   1716			cap->issued = cap->implemented = CEPH_CAP_PIN;
   1717			spin_unlock(&ci->i_ceph_lock);
   1718		}
   1719	} else if (ev == FORCE_RO) {
   1720	}
   1721	wake_up_all(&ci->i_cap_wq);
   1722	return 0;
   1723}
   1724
/*
 * Run wake_up_session_cb() over every cap of @session.  @ev is one of
 * RECONNECT, RENEWCAPS or FORCE_RO and is passed to the callback packed
 * into the opaque pointer argument.
 */
static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
	ceph_iterate_session_caps(session, wake_up_session_cb,
				  (void *)(unsigned long)ev);
}
   1731
   1732/*
   1733 * Send periodic message to MDS renewing all currently held caps.  The
   1734 * ack will reset the expiration for all caps from this session.
   1735 *
   1736 * caller holds s_mutex
   1737 */
   1738static int send_renew_caps(struct ceph_mds_client *mdsc,
   1739			   struct ceph_mds_session *session)
   1740{
   1741	struct ceph_msg *msg;
   1742	int state;
   1743
   1744	if (time_after_eq(jiffies, session->s_cap_ttl) &&
   1745	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
   1746		pr_info("mds%d caps stale\n", session->s_mds);
   1747	session->s_renew_requested = jiffies;
   1748
   1749	/* do not try to renew caps until a recovering mds has reconnected
   1750	 * with its clients. */
   1751	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
   1752	if (state < CEPH_MDS_STATE_RECONNECT) {
   1753		dout("send_renew_caps ignoring mds%d (%s)\n",
   1754		     session->s_mds, ceph_mds_state_name(state));
   1755		return 0;
   1756	}
   1757
   1758	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
   1759		ceph_mds_state_name(state));
   1760	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
   1761				      ++session->s_renew_seq);
   1762	if (!msg)
   1763		return -ENOMEM;
   1764	ceph_con_send(&session->s_con, msg);
   1765	return 0;
   1766}
   1767
   1768static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
   1769			     struct ceph_mds_session *session, u64 seq)
   1770{
   1771	struct ceph_msg *msg;
   1772
   1773	dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
   1774	     session->s_mds, ceph_session_state_name(session->s_state), seq);
   1775	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
   1776	if (!msg)
   1777		return -ENOMEM;
   1778	ceph_con_send(&session->s_con, msg);
   1779	return 0;
   1780}
   1781
   1782
   1783/*
   1784 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
   1785 *
   1786 * Called under session->s_mutex
   1787 */
   1788static void renewed_caps(struct ceph_mds_client *mdsc,
   1789			 struct ceph_mds_session *session, int is_renew)
   1790{
   1791	int was_stale;
   1792	int wake = 0;
   1793
   1794	spin_lock(&session->s_cap_lock);
   1795	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
   1796
   1797	session->s_cap_ttl = session->s_renew_requested +
   1798		mdsc->mdsmap->m_session_timeout*HZ;
   1799
   1800	if (was_stale) {
   1801		if (time_before(jiffies, session->s_cap_ttl)) {
   1802			pr_info("mds%d caps renewed\n", session->s_mds);
   1803			wake = 1;
   1804		} else {
   1805			pr_info("mds%d caps still stale\n", session->s_mds);
   1806		}
   1807	}
   1808	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
   1809	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
   1810	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
   1811	spin_unlock(&session->s_cap_lock);
   1812
   1813	if (wake)
   1814		wake_up_session_caps(session, RENEWCAPS);
   1815}
   1816
   1817/*
   1818 * send a session close request
   1819 */
   1820static int request_close_session(struct ceph_mds_session *session)
   1821{
   1822	struct ceph_msg *msg;
   1823
   1824	dout("request_close_session mds%d state %s seq %lld\n",
   1825	     session->s_mds, ceph_session_state_name(session->s_state),
   1826	     session->s_seq);
   1827	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
   1828				      session->s_seq);
   1829	if (!msg)
   1830		return -ENOMEM;
   1831	ceph_con_send(&session->s_con, msg);
   1832	return 1;
   1833}
   1834
   1835/*
   1836 * Called with s_mutex held.
   1837 */
   1838static int __close_session(struct ceph_mds_client *mdsc,
   1839			 struct ceph_mds_session *session)
   1840{
   1841	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
   1842		return 0;
   1843	session->s_state = CEPH_MDS_SESSION_CLOSING;
   1844	return request_close_session(session);
   1845}
   1846
   1847static bool drop_negative_children(struct dentry *dentry)
   1848{
   1849	struct dentry *child;
   1850	bool all_negative = true;
   1851
   1852	if (!d_is_dir(dentry))
   1853		goto out;
   1854
   1855	spin_lock(&dentry->d_lock);
   1856	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
   1857		if (d_really_is_positive(child)) {
   1858			all_negative = false;
   1859			break;
   1860		}
   1861	}
   1862	spin_unlock(&dentry->d_lock);
   1863
   1864	if (all_negative)
   1865		shrink_dcache_parent(dentry);
   1866out:
   1867	return all_negative;
   1868}
   1869
/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 *
 * ceph_iterate_session_caps() callback.  @arg points to an int holding
 * the number of caps still to trim; it is decremented for each cap
 * removed (or inode left with a single reference after pruning).
 *
 * Returns -1 to stop the iteration once that count has reached zero,
 * 0 otherwise.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used), ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		/* keep the auth cap while there is dirty/flushing state */
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * we can safely drop it */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
	  used = 0;
	  oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out;   /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		ceph_remove_cap(cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			count = atomic_read(&inode->i_count);
			/* only count it as trimmed if one reference is left */
			if (count == 1)
				(*remaining)--;
			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
			     inode, cap, count);
		} else {
			dput(dentry);
		}
		/* i_ceph_lock was already released on this path */
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}
   1949
   1950/*
   1951 * Trim session cap count down to some max number.
   1952 */
   1953int ceph_trim_caps(struct ceph_mds_client *mdsc,
   1954		   struct ceph_mds_session *session,
   1955		   int max_caps)
   1956{
   1957	int trim_caps = session->s_nr_caps - max_caps;
   1958
   1959	dout("trim_caps mds%d start: %d / %d, trim %d\n",
   1960	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
   1961	if (trim_caps > 0) {
   1962		int remaining = trim_caps;
   1963
   1964		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
   1965		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
   1966		     session->s_mds, session->s_nr_caps, max_caps,
   1967			trim_caps - remaining);
   1968	}
   1969
   1970	ceph_flush_cap_releases(mdsc, session);
   1971	return 0;
   1972}
   1973
   1974static int check_caps_flush(struct ceph_mds_client *mdsc,
   1975			    u64 want_flush_tid)
   1976{
   1977	int ret = 1;
   1978
   1979	spin_lock(&mdsc->cap_dirty_lock);
   1980	if (!list_empty(&mdsc->cap_flush_list)) {
   1981		struct ceph_cap_flush *cf =
   1982			list_first_entry(&mdsc->cap_flush_list,
   1983					 struct ceph_cap_flush, g_list);
   1984		if (cf->tid <= want_flush_tid) {
   1985			dout("check_caps_flush still flushing tid "
   1986			     "%llu <= %llu\n", cf->tid, want_flush_tid);
   1987			ret = 0;
   1988		}
   1989	}
   1990	spin_unlock(&mdsc->cap_dirty_lock);
   1991	return ret;
   1992}
   1993
   1994/*
   1995 * flush all dirty inode data to disk.
   1996 *
   1997 * returns true if we've flushed through want_flush_tid
   1998 */
   1999static void wait_caps_flush(struct ceph_mds_client *mdsc,
   2000			    u64 want_flush_tid)
   2001{
   2002	dout("check_caps_flush want %llu\n", want_flush_tid);
   2003
   2004	wait_event(mdsc->cap_flushing_wq,
   2005		   check_caps_flush(mdsc, want_flush_tid));
   2006
   2007	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
   2008}
   2009
/*
 * Send every cap release queued on @session to its MDS.
 *
 * The session's s_cap_releases list is moved to a private list and encoded
 * into CEPH_MSG_CLIENT_CAPRELEASE messages.  A message is sent as soon as it
 * holds CEPH_CAPS_PER_RELEASE items; a final, partially filled message is
 * sent once the session list is found empty.  Each message gets the OSD
 * epoch barrier (sampled once, up front) appended after its items.
 *
 * If a message cannot be allocated, the caps that have not been encoded yet
 * are put back on the session list and an error is logged.
 *
 * called under s_mutex
 */
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_msg *msg = NULL;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
	struct ceph_cap *cap;
	LIST_HEAD(tmp_list);
	int num_cap_releases;
	__le32	barrier, *cap_barrier;

	/* snapshot the epoch barrier once; the same value goes in every msg */
	down_read(&osdc->lock);
	barrier = cpu_to_le32(osdc->epoch_barrier);
	up_read(&osdc->lock);

	spin_lock(&session->s_cap_lock);
again:
	/* entered with s_cap_lock held: take over everything queued so far */
	list_splice_init(&session->s_cap_releases, &tmp_list);
	num_cap_releases = session->s_num_cap_releases;
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&tmp_list)) {
		if (!msg) {
			/* start a new message with an empty release header */
			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
					PAGE_SIZE, GFP_NOFS, false);
			if (!msg)
				goto out_err;
			head = msg->front.iov_base;
			head->num = cpu_to_le32(0);
			msg->front.iov_len = sizeof(*head);

			msg->hdr.version = cpu_to_le16(2);
			msg->hdr.compat_version = cpu_to_le16(1);
		}

		cap = list_first_entry(&tmp_list, struct ceph_cap,
					session_caps);
		list_del(&cap->session_caps);
		num_cap_releases--;

		/* bump the item count and encode this cap after the last item */
		head = msg->front.iov_base;
		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
				   &head->num);
		item = msg->front.iov_base + msg->front.iov_len;
		item->ino = cpu_to_le64(cap->cap_ino);
		item->cap_id = cpu_to_le64(cap->cap_id);
		item->migrate_seq = cpu_to_le32(cap->mseq);
		item->seq = cpu_to_le32(cap->issue_seq);
		msg->front.iov_len += sizeof(*item);

		ceph_put_cap(mdsc, cap);

		/* message is full: finish it and send it right away */
		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
			// Append cap_barrier field
			cap_barrier = msg->front.iov_base + msg->front.iov_len;
			*cap_barrier = barrier;
			msg->front.iov_len += sizeof(*cap_barrier);

			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
			ceph_con_send(&session->s_con, msg);
			msg = NULL;
		}
	}

	/* the counter taken with the list must match the entries consumed */
	BUG_ON(num_cap_releases != 0);

	/*
	 * More releases may have been queued while the lock was dropped;
	 * if so, loop back (still holding s_cap_lock) and keep filling the
	 * current partial message.
	 */
	spin_lock(&session->s_cap_lock);
	if (!list_empty(&session->s_cap_releases))
		goto again;
	spin_unlock(&session->s_cap_lock);

	/* flush the last, partially filled message */
	if (msg) {
		// Append cap_barrier field
		cap_barrier = msg->front.iov_base + msg->front.iov_len;
		*cap_barrier = barrier;
		msg->front.iov_len += sizeof(*cap_barrier);

		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
	}
	return;
out_err:
	/* allocation failed: requeue what is left so a later call can retry */
	pr_err("send_cap_releases mds%d, failed to allocate message\n",
		session->s_mds);
	spin_lock(&session->s_cap_lock);
	list_splice(&tmp_list, &session->s_cap_releases);
	session->s_num_cap_releases += num_cap_releases;
	spin_unlock(&session->s_cap_lock);
}
   2106
   2107static void ceph_cap_release_work(struct work_struct *work)
   2108{
   2109	struct ceph_mds_session *session =
   2110		container_of(work, struct ceph_mds_session, s_cap_release_work);
   2111
   2112	mutex_lock(&session->s_mutex);
   2113	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
   2114	    session->s_state == CEPH_MDS_SESSION_HUNG)
   2115		ceph_send_cap_releases(session->s_mdsc, session);
   2116	mutex_unlock(&session->s_mutex);
   2117	ceph_put_mds_session(session);
   2118}
   2119
   2120void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
   2121		             struct ceph_mds_session *session)
   2122{
   2123	if (mdsc->stopping)
   2124		return;
   2125
   2126	ceph_get_mds_session(session);
   2127	if (queue_work(mdsc->fsc->cap_wq,
   2128		       &session->s_cap_release_work)) {
   2129		dout("cap release work queued\n");
   2130	} else {
   2131		ceph_put_mds_session(session);
   2132		dout("failed to queue cap release work\n");
   2133	}
   2134}
   2135
   2136/*
   2137 * caller holds session->s_cap_lock
   2138 */
   2139void __ceph_queue_cap_release(struct ceph_mds_session *session,
   2140			      struct ceph_cap *cap)
   2141{
   2142	list_add_tail(&cap->session_caps, &session->s_cap_releases);
   2143	session->s_num_cap_releases++;
   2144
   2145	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
   2146		ceph_flush_cap_releases(session->s_mdsc, session);
   2147}
   2148
   2149static void ceph_cap_reclaim_work(struct work_struct *work)
   2150{
   2151	struct ceph_mds_client *mdsc =
   2152		container_of(work, struct ceph_mds_client, cap_reclaim_work);
   2153	int ret = ceph_trim_dentries(mdsc);
   2154	if (ret == -EAGAIN)
   2155		ceph_queue_cap_reclaim_work(mdsc);
   2156}
   2157
   2158void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
   2159{
   2160	if (mdsc->stopping)
   2161		return;
   2162
   2163        if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
   2164                dout("caps reclaim work queued\n");
   2165        } else {
   2166                dout("failed to queue caps release work\n");
   2167        }
   2168}
   2169
   2170void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
   2171{
   2172	int val;
   2173	if (!nr)
   2174		return;
   2175	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
   2176	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
   2177		atomic_set(&mdsc->cap_reclaim_pending, 0);
   2178		ceph_queue_cap_reclaim_work(mdsc);
   2179	}
   2180}
   2181
   2182/*
   2183 * requests
   2184 */
   2185
   2186int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
   2187				    struct inode *dir)
   2188{
   2189	struct ceph_inode_info *ci = ceph_inode(dir);
   2190	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
   2191	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
   2192	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
   2193	unsigned int num_entries;
   2194	int order;
   2195
   2196	spin_lock(&ci->i_ceph_lock);
   2197	num_entries = ci->i_files + ci->i_subdirs;
   2198	spin_unlock(&ci->i_ceph_lock);
   2199	num_entries = max(num_entries, 1U);
   2200	num_entries = min(num_entries, opt->max_readdir);
   2201
   2202	order = get_order(size * num_entries);
   2203	while (order >= 0) {
   2204		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
   2205							     __GFP_NOWARN |
   2206							     __GFP_ZERO,
   2207							     order);
   2208		if (rinfo->dir_entries)
   2209			break;
   2210		order--;
   2211	}
   2212	if (!rinfo->dir_entries)
   2213		return -ENOMEM;
   2214
   2215	num_entries = (PAGE_SIZE << order) / size;
   2216	num_entries = min(num_entries, opt->max_readdir);
   2217
   2218	rinfo->dir_buf_size = PAGE_SIZE << order;
   2219	req->r_num_caps = num_entries + 1;
   2220	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
   2221	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
   2222	return 0;
   2223}
   2224
   2225/*
   2226 * Create an mds request.
   2227 */
   2228struct ceph_mds_request *
   2229ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
   2230{
   2231	struct ceph_mds_request *req;
   2232
   2233	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
   2234	if (!req)
   2235		return ERR_PTR(-ENOMEM);
   2236
   2237	mutex_init(&req->r_fill_mutex);
   2238	req->r_mdsc = mdsc;
   2239	req->r_started = jiffies;
   2240	req->r_start_latency = ktime_get();
   2241	req->r_resend_mds = -1;
   2242	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
   2243	INIT_LIST_HEAD(&req->r_unsafe_target_item);
   2244	req->r_fmode = -1;
   2245	kref_init(&req->r_kref);
   2246	RB_CLEAR_NODE(&req->r_node);
   2247	INIT_LIST_HEAD(&req->r_wait);
   2248	init_completion(&req->r_completion);
   2249	init_completion(&req->r_safe_completion);
   2250	INIT_LIST_HEAD(&req->r_unsafe_item);
   2251
   2252	ktime_get_coarse_real_ts64(&req->r_stamp);
   2253
   2254	req->r_op = op;
   2255	req->r_direct_mode = mode;
   2256	return req;
   2257}
   2258
   2259/*
   2260 * return oldest (lowest) request, tid in request tree, 0 if none.
   2261 *
   2262 * called under mdsc->mutex.
   2263 */
   2264static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
   2265{
   2266	if (RB_EMPTY_ROOT(&mdsc->request_tree))
   2267		return NULL;
   2268	return rb_entry(rb_first(&mdsc->request_tree),
   2269			struct ceph_mds_request, r_node);
   2270}
   2271
   2272static inline  u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
   2273{
   2274	return mdsc->oldest_tid;
   2275}
   2276
   2277/*
   2278 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
   2279 * on build_path_from_dentry in fs/cifs/dir.c.
   2280 *
   2281 * If @stop_on_nosnap, generate path relative to the first non-snapped
   2282 * inode.
   2283 *
   2284 * Encode hidden .snap dirs as a double /, i.e.
   2285 *   foo/.snap/bar -> foo//bar
   2286 */
   2287char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
   2288			   int stop_on_nosnap)
   2289{
   2290	struct dentry *temp;
   2291	char *path;
   2292	int pos;
   2293	unsigned seq;
   2294	u64 base;
   2295
   2296	if (!dentry)
   2297		return ERR_PTR(-EINVAL);
   2298
   2299	path = __getname();
   2300	if (!path)
   2301		return ERR_PTR(-ENOMEM);
   2302retry:
   2303	pos = PATH_MAX - 1;
   2304	path[pos] = '\0';
   2305
   2306	seq = read_seqbegin(&rename_lock);
   2307	rcu_read_lock();
   2308	temp = dentry;
   2309	for (;;) {
   2310		struct inode *inode;
   2311
   2312		spin_lock(&temp->d_lock);
   2313		inode = d_inode(temp);
   2314		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
   2315			dout("build_path path+%d: %p SNAPDIR\n",
   2316			     pos, temp);
   2317		} else if (stop_on_nosnap && inode && dentry != temp &&
   2318			   ceph_snap(inode) == CEPH_NOSNAP) {
   2319			spin_unlock(&temp->d_lock);
   2320			pos++; /* get rid of any prepended '/' */
   2321			break;
   2322		} else {
   2323			pos -= temp->d_name.len;
   2324			if (pos < 0) {
   2325				spin_unlock(&temp->d_lock);
   2326				break;
   2327			}
   2328			memcpy(path + pos, temp->d_name.name, temp->d_name.len);
   2329		}
   2330		spin_unlock(&temp->d_lock);
   2331		temp = READ_ONCE(temp->d_parent);
   2332
   2333		/* Are we at the root? */
   2334		if (IS_ROOT(temp))
   2335			break;
   2336
   2337		/* Are we out of buffer? */
   2338		if (--pos < 0)
   2339			break;
   2340
   2341		path[pos] = '/';
   2342	}
   2343	base = ceph_ino(d_inode(temp));
   2344	rcu_read_unlock();
   2345
   2346	if (read_seqretry(&rename_lock, seq))
   2347		goto retry;
   2348
   2349	if (pos < 0) {
   2350		/*
   2351		 * A rename didn't occur, but somehow we didn't end up where
   2352		 * we thought we would. Throw a warning and try again.
   2353		 */
   2354		pr_warn("build_path did not end path lookup where "
   2355			"expected, pos is %d\n", pos);
   2356		goto retry;
   2357	}
   2358
   2359	*pbase = base;
   2360	*plen = PATH_MAX - 1 - pos;
   2361	dout("build_path on %p %d built %llx '%.*s'\n",
   2362	     dentry, d_count(dentry), base, *plen, path + pos);
   2363	return path + pos;
   2364}
   2365
   2366static int build_dentry_path(struct dentry *dentry, struct inode *dir,
   2367			     const char **ppath, int *ppathlen, u64 *pino,
   2368			     bool *pfreepath, bool parent_locked)
   2369{
   2370	char *path;
   2371
   2372	rcu_read_lock();
   2373	if (!dir)
   2374		dir = d_inode_rcu(dentry->d_parent);
   2375	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
   2376		*pino = ceph_ino(dir);
   2377		rcu_read_unlock();
   2378		*ppath = dentry->d_name.name;
   2379		*ppathlen = dentry->d_name.len;
   2380		return 0;
   2381	}
   2382	rcu_read_unlock();
   2383	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
   2384	if (IS_ERR(path))
   2385		return PTR_ERR(path);
   2386	*ppath = path;
   2387	*pfreepath = true;
   2388	return 0;
   2389}
   2390
   2391static int build_inode_path(struct inode *inode,
   2392			    const char **ppath, int *ppathlen, u64 *pino,
   2393			    bool *pfreepath)
   2394{
   2395	struct dentry *dentry;
   2396	char *path;
   2397
   2398	if (ceph_snap(inode) == CEPH_NOSNAP) {
   2399		*pino = ceph_ino(inode);
   2400		*ppathlen = 0;
   2401		return 0;
   2402	}
   2403	dentry = d_find_alias(inode);
   2404	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
   2405	dput(dentry);
   2406	if (IS_ERR(path))
   2407		return PTR_ERR(path);
   2408	*ppath = path;
   2409	*pfreepath = true;
   2410	return 0;
   2411}
   2412
   2413/*
   2414 * request arguments may be specified via an inode *, a dentry *, or
   2415 * an explicit ino+path.
   2416 */
   2417static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
   2418				  struct inode *rdiri, const char *rpath,
   2419				  u64 rino, const char **ppath, int *pathlen,
   2420				  u64 *ino, bool *freepath, bool parent_locked)
   2421{
   2422	int r = 0;
   2423
   2424	if (rinode) {
   2425		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
   2426		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
   2427		     ceph_snap(rinode));
   2428	} else if (rdentry) {
   2429		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
   2430					freepath, parent_locked);
   2431		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
   2432		     *ppath);
   2433	} else if (rpath || rino) {
   2434		*ino = rino;
   2435		*ppath = rpath;
   2436		*pathlen = rpath ? strlen(rpath) : 0;
   2437		dout(" path %.*s\n", *pathlen, rpath);
   2438	}
   2439
   2440	return r;
   2441}
   2442
   2443static void encode_timestamp_and_gids(void **p,
   2444				      const struct ceph_mds_request *req)
   2445{
   2446	struct ceph_timespec ts;
   2447	int i;
   2448
   2449	ceph_encode_timespec64(&ts, &req->r_stamp);
   2450	ceph_encode_copy(p, &ts, sizeof(ts));
   2451
   2452	/* gid_list */
   2453	ceph_encode_32(p, req->r_cred->group_info->ngroups);
   2454	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
   2455		ceph_encode_64(p, from_kgid(&init_user_ns,
   2456					    req->r_cred->group_info->gid[i]));
   2457}
   2458
/*
 * Build the CEPH_MSG_CLIENT_REQUEST message for @req, addressed to the MDS
 * behind @session.
 *
 * The front of the message is laid out as: request head (old or new format,
 * depending on whether the peer advertises CEPH_FEATURE_FS_BTIME), the two
 * file paths, any cap/dentry releases, then the timestamp and gid list.
 * When @drop_cap_releases is true the releases are encoded and then
 * discarded by rewinding to the recorded release offset.
 *
 * Returns the new message, or an ERR_PTR: an error from resolving either
 * path, -ENOMEM if the message cannot be allocated, or -ERANGE if the
 * encoding ran past the allocated front.
 *
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
					       struct ceph_mds_request *req,
					       bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_msg *msg;
	struct ceph_mds_request_head_old *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	bool freepath1 = false, freepath2 = false;
	int len;
	u16 releases;
	void *p, *end;
	int ret;
	/* peers without FS_BTIME get the old head layout (message v3) */
	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);

	/* primary target: r_inode, else r_dentry, else explicit path1/ino1 */
	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			      req->r_parent, req->r_path1, req->r_ino1.ino,
			      &path1, &pathlen1, &ino1, &freepath1,
			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
					&req->r_req_flags));
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	ret = set_request_path_attr(NULL, req->r_old_dentry,
			      req->r_old_dentry_dir,
			      req->r_path2, req->r_ino2.ino,
			      &path2, &pathlen2, &ino2, &freepath2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	/*
	 * Upper bound for the front: head, both paths with their fixed
	 * per-path overhead, the timestamp, and the gid list.
	 */
	len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head);
	len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
		sizeof(struct ceph_timespec);
	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);

	/* reserve another pathlen per dentry release on top of the struct */
	if (req->r_dentry_drop)
		len += pathlen1;
	if (req->r_old_dentry_drop)
		len += pathlen2;

	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.tid = cpu_to_le64(req->r_tid);

	/*
	 * The old ceph_mds_request_head didn't contain a version field, and
	 * one was added when we moved the message version from 3->4.
	 */
	if (legacy) {
		msg->hdr.version = cpu_to_le16(3);
		head = msg->front.iov_base;
		p = msg->front.iov_base + sizeof(*head);
	} else {
		struct ceph_mds_request_head *new_head = msg->front.iov_base;

		msg->hdr.version = cpu_to_le16(4);
		new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
		/* the old-format fields start at oldest_client_tid */
		head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
		p = msg->front.iov_base + sizeof(*new_head);
	}

	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
						 req->r_cred->fsuid));
	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
						 req->r_cred->fsgid));
	head->ino = cpu_to_le64(req->r_deleg_ino);
	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
		      mds, req->r_inode_drop, req->r_inode_unless,
		      req->r_op == CEPH_MDS_OP_READDIR);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      d_inode(req->r_old_dentry),
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	/*
	 * The encode helpers above have already run; here their output is
	 * simply overwritten by rewinding to where the releases began.
	 */
	if (drop_cap_releases) {
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	head->num_releases = cpu_to_le16(releases);

	encode_timestamp_and_gids(&p, req);

	/* the length estimate above should always be sufficient */
	if (WARN_ON_ONCE(p > end)) {
		ceph_msg_put(msg);
		msg = ERR_PTR(-ERANGE);
		goto out_free2;
	}

	/* trim the front to what was actually encoded */
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

	/* success falls through: built paths are freed on every exit */
out_free2:
	if (freepath2)
		ceph_mdsc_free_path((char *)path2, pathlen2);
out_free1:
	if (freepath1)
		ceph_mdsc_free_path((char *)path1, pathlen1);
out:
	return msg;
}
   2616
   2617/*
   2618 * called under mdsc->mutex if error, under no mutex if
   2619 * success.
   2620 */
   2621static void complete_request(struct ceph_mds_client *mdsc,
   2622			     struct ceph_mds_request *req)
   2623{
   2624	req->r_end_latency = ktime_get();
   2625
   2626	if (req->r_callback)
   2627		req->r_callback(mdsc, req);
   2628	complete_all(&req->r_completion);
   2629}
   2630
   2631static struct ceph_mds_request_head_old *
   2632find_old_request_head(void *p, u64 features)
   2633{
   2634	bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
   2635	struct ceph_mds_request_head *new_head;
   2636
   2637	if (legacy)
   2638		return (struct ceph_mds_request_head_old *)p;
   2639	new_head = (struct ceph_mds_request_head *)p;
   2640	return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
   2641}
   2642
/*
 * Get req->r_request ready to be sent on @session.
 *
 * Counts the attempt, records the cap migrate seq for r_inode, and then
 * either patches the existing message for replay (when an unsafe reply
 * was already received) or drops the old message and encodes a new one.
 *
 * Returns 0 on success, -EMULTIHOP when the attempt counter would no
 * longer fit in the wire format's num_retry field, or the error from
 * create_request_message() (which is also stored in req->r_err).
 *
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_session *session,
				  struct ceph_mds_request *req,
				  bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_mds_request_head_old *rhead;
	struct ceph_msg *msg;
	int flags = 0, max_retry;

	/*
	 * The type of 'r_attempts' in kernel 'ceph_mds_request'
	 * is 'int', while in 'ceph_mds_request_head' the type of
	 * 'num_retry' is '__u8'. So if a request is retried more
	 * than 256 times, the MDS will receive an incorrect
	 * retry seq.
	 *
	 * When that happens it is usually a bug in the MDS, and
	 * continuing to retry the request makes no sense.
	 *
	 * In the future this could be fixed in the ceph code, so
	 * derive the limit from the field size instead of
	 * hardcoding it here.
	 */
	max_retry = sizeof_field(struct ceph_mds_request_head, num_retry);
	max_retry = 1 << (max_retry * BITS_PER_BYTE);
	if (req->r_attempts >= max_retry) {
		pr_warn_ratelimited("%s request tid %llu seq overflow\n",
				    __func__, req->r_tid);
		return -EMULTIHOP;
	}

	req->r_attempts++;
	if (req->r_inode) {
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		/* remember the cap's mseq at send time; -1 if no cap here */
		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	dout("%s %p tid %lld %s (attempt %d)\n", __func__, req,
	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);

	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		void *p;

		/*
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		rhead = find_old_request_head(msg->front.iov_base,
					      session->s_con.peer_features);

		flags = le32_to_cpu(rhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		rhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		rhead->num_retry = req->r_attempts - 1;

		/* remove cap/dentry releases from message */
		rhead->num_releases = 0;

		/*
		 * Re-encode the trailer right where the releases used to
		 * start, then shrink the front to match.
		 */
		p = msg->front.iov_base + req->r_request_release_offset;
		encode_timestamp_and_gids(&p, req);

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		return 0;
	}

	/* not a replay: throw away any earlier encoding and start over */
	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(session, req, drop_cap_releases);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	rhead = find_old_request_head(msg->front.iov_base,
				      session->s_con.peer_features);
	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	/*
	 * NOTE(review): the GOT_UNSAFE case already returned above, so this
	 * test looks redundant unless the flag can be set in between -
	 * confirm before removing.
	 */
	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_ASYNC;
	if (req->r_parent)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	rhead->flags = cpu_to_le32(flags);
	rhead->num_fwd = req->r_num_fwd;
	rhead->num_retry = req->r_attempts - 1;

	dout(" r_parent = %p\n", req->r_parent);
	return 0;
}
   2750
   2751/*
   2752 * called under mdsc->mutex
   2753 */
   2754static int __send_request(struct ceph_mds_session *session,
   2755			  struct ceph_mds_request *req,
   2756			  bool drop_cap_releases)
   2757{
   2758	int err;
   2759
   2760	err = __prepare_send_request(session, req, drop_cap_releases);
   2761	if (!err) {
   2762		ceph_msg_get(req->r_request);
   2763		ceph_con_send(&session->s_con, req->r_request);
   2764	}
   2765
   2766	return err;
   2767}
   2768
/*
 * send request, or put it on the appropriate wait list.
 *
 * Outcomes, in the order they are checked:
 *  - the request already has an error or a result: nothing to do (an
 *    aborted request is unregistered);
 *  - timeout, forced umount, or a mount-time mdsmap problem: fail it;
 *  - no usable MDS yet: park it on mdsc->waiting_for_map (async requests
 *    fail with -EJUKEBOX instead);
 *  - session not open: open it if needed and park the request on the
 *    session's s_waiting list (with REJECTED and async special cases);
 *  - otherwise: send it.
 *
 * Any error ends up in req->r_err, after which the request is completed
 * and unregistered.
 */
static void __do_request(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = 0;
	bool random;

	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
			__unregister_request(mdsc, req);
		return;
	}

	/* r_timeout is measured from creation time (r_started) */
	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		dout("do_request timed out\n");
		err = -ETIMEDOUT;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		dout("do_request forced umount\n");
		err = -EIO;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
		if (mdsc->mdsmap_err) {
			err = mdsc->mdsmap_err;
			dout("do_request mdsmap err %d\n", err);
			goto finish;
		}
		if (mdsc->mdsmap->m_epoch == 0) {
			dout("do_request no mdsmap, waiting for map\n");
			list_add(&req->r_wait, &mdsc->waiting_for_map);
			return;
		}
		/* without MOUNTWAIT, fail the mount if the cluster is down */
		if (!(mdsc->fsc->mount_options->flags &
		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
			err = -EHOSTUNREACH;
			goto finish;
		}
	}

	/* drop any session kept from an earlier attempt before choosing */
	put_request_session(req);

	mds = __choose_mds(mdsc, req, &random);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto finish;
		}
		dout("do_request no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		return;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	/* the request holds its own session ref; ours is put at out_session */
	req->r_session = ceph_get_mds_session(session);

	dout("do_request mds%d session %p state %s\n", mds, session,
	     ceph_session_state_name(session->s_state));
	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		/*
		 * We cannot queue async requests since the caps and delegated
		 * inodes are bound to the session. Just return -EJUKEBOX and
		 * let the caller retry a sync request in that case.
		 */
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto out_session;
		}

		/*
		 * If the session has been REJECTED, then return a hard error,
		 * unless it's a CLEANRECOVER mount, in which case we'll queue
		 * it to the mdsc queue.
		 */
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
				list_add(&req->r_wait, &mdsc->waiting_for_map);
			else
				err = -EACCES;
			goto out_session;
		}

		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
			err = __open_session(mdsc, session);
			if (err)
				goto out_session;
			/* retry the same mds later */
			if (random)
				req->r_resend_mds = mds;
		}
		/* wait on the session until it becomes usable */
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	err = __send_request(session, req, false);

out_session:
	ceph_put_mds_session(session);
finish:
	if (err) {
		dout("__do_request early error %d\n", err);
		req->r_err = err;
		complete_request(mdsc, req);
		__unregister_request(mdsc, req);
	}
	return;
}
   2900
   2901/*
   2902 * called under mdsc->mutex
   2903 */
   2904static void __wake_requests(struct ceph_mds_client *mdsc,
   2905			    struct list_head *head)
   2906{
   2907	struct ceph_mds_request *req;
   2908	LIST_HEAD(tmp_list);
   2909
   2910	list_splice_init(head, &tmp_list);
   2911
   2912	while (!list_empty(&tmp_list)) {
   2913		req = list_entry(tmp_list.next,
   2914				 struct ceph_mds_request, r_wait);
   2915		list_del_init(&req->r_wait);
   2916		dout(" wake request %p tid %llu\n", req, req->r_tid);
   2917		__do_request(mdsc, req);
   2918	}
   2919}
   2920
   2921/*
   2922 * Wake up threads with requests pending for @mds, so that they can
   2923 * resubmit their requests to a possibly different mds.
   2924 */
   2925static void kick_requests(struct ceph_mds_client *mdsc, int mds)
   2926{
   2927	struct ceph_mds_request *req;
   2928	struct rb_node *p = rb_first(&mdsc->request_tree);
   2929
   2930	dout("kick_requests mds%d\n", mds);
   2931	while (p) {
   2932		req = rb_entry(p, struct ceph_mds_request, r_node);
   2933		p = rb_next(p);
   2934		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
   2935			continue;
   2936		if (req->r_attempts > 0)
   2937			continue; /* only new requests */
   2938		if (req->r_session &&
   2939		    req->r_session->s_mds == mds) {
   2940			dout(" kicking tid %llu\n", req->r_tid);
   2941			list_del_init(&req->r_wait);
   2942			__do_request(mdsc, req);
   2943		}
   2944	}
   2945}
   2946
   2947int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
   2948			      struct ceph_mds_request *req)
   2949{
   2950	int err = 0;
   2951
   2952	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
   2953	if (req->r_inode)
   2954		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
   2955	if (req->r_parent) {
   2956		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
   2957		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
   2958			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
   2959		spin_lock(&ci->i_ceph_lock);
   2960		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
   2961		__ceph_touch_fmode(ci, mdsc, fmode);
   2962		spin_unlock(&ci->i_ceph_lock);
   2963	}
   2964	if (req->r_old_dentry_dir)
   2965		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
   2966				  CEPH_CAP_PIN);
   2967
   2968	if (req->r_inode) {
   2969		err = ceph_wait_on_async_create(req->r_inode);
   2970		if (err) {
   2971			dout("%s: wait for async create returned: %d\n",
   2972			     __func__, err);
   2973			return err;
   2974		}
   2975	}
   2976
   2977	if (!err && req->r_old_inode) {
   2978		err = ceph_wait_on_async_create(req->r_old_inode);
   2979		if (err) {
   2980			dout("%s: wait for async create returned: %d\n",
   2981			     __func__, err);
   2982			return err;
   2983		}
   2984	}
   2985
   2986	dout("submit_request on %p for inode %p\n", req, dir);
   2987	mutex_lock(&mdsc->mutex);
   2988	__register_request(mdsc, req, dir);
   2989	__do_request(mdsc, req);
   2990	err = req->r_err;
   2991	mutex_unlock(&mdsc->mutex);
   2992	return err;
   2993}
   2994
   2995int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
   2996			   struct ceph_mds_request *req,
   2997			   ceph_mds_request_wait_callback_t wait_func)
   2998{
   2999	int err;
   3000
   3001	/* wait */
   3002	dout("do_request waiting\n");
   3003	if (wait_func) {
   3004		err = wait_func(mdsc, req);
   3005	} else {
   3006		long timeleft = wait_for_completion_killable_timeout(
   3007					&req->r_completion,
   3008					ceph_timeout_jiffies(req->r_timeout));
   3009		if (timeleft > 0)
   3010			err = 0;
   3011		else if (!timeleft)
   3012			err = -ETIMEDOUT;  /* timed out */
   3013		else
   3014			err = timeleft;  /* killed */
   3015	}
   3016	dout("do_request waited, got %d\n", err);
   3017	mutex_lock(&mdsc->mutex);
   3018
   3019	/* only abort if we didn't race with a real reply */
   3020	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
   3021		err = le32_to_cpu(req->r_reply_info.head->result);
   3022	} else if (err < 0) {
   3023		dout("aborted request %lld with %d\n", req->r_tid, err);
   3024
   3025		/*
   3026		 * ensure we aren't running concurrently with
   3027		 * ceph_fill_trace or ceph_readdir_prepopulate, which
   3028		 * rely on locks (dir mutex) held by our caller.
   3029		 */
   3030		mutex_lock(&req->r_fill_mutex);
   3031		req->r_err = err;
   3032		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
   3033		mutex_unlock(&req->r_fill_mutex);
   3034
   3035		if (req->r_parent &&
   3036		    (req->r_op & CEPH_MDS_OP_WRITE))
   3037			ceph_invalidate_dir_request(req);
   3038	} else {
   3039		err = req->r_err;
   3040	}
   3041
   3042	mutex_unlock(&mdsc->mutex);
   3043	return err;
   3044}
   3045
   3046/*
   3047 * Synchrously perform an mds request.  Take care of all of the
   3048 * session setup, forwarding, retry details.
   3049 */
   3050int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
   3051			 struct inode *dir,
   3052			 struct ceph_mds_request *req)
   3053{
   3054	int err;
   3055
   3056	dout("do_request on %p\n", req);
   3057
   3058	/* issue */
   3059	err = ceph_mdsc_submit_request(mdsc, dir, req);
   3060	if (!err)
   3061		err = ceph_mdsc_wait_request(mdsc, req, NULL);
   3062	dout("do_request %p done, result %d\n", req, err);
   3063	return err;
   3064}
   3065
   3066/*
   3067 * Invalidate dir's completeness, dentry lease state on an aborted MDS
   3068 * namespace request.
   3069 */
   3070void ceph_invalidate_dir_request(struct ceph_mds_request *req)
   3071{
   3072	struct inode *dir = req->r_parent;
   3073	struct inode *old_dir = req->r_old_dentry_dir;
   3074
   3075	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
   3076
   3077	ceph_dir_clear_complete(dir);
   3078	if (old_dir)
   3079		ceph_dir_clear_complete(old_dir);
   3080	if (req->r_dentry)
   3081		ceph_invalidate_dentry_lease(req->r_dentry);
   3082	if (req->r_old_dentry)
   3083		ceph_invalidate_dentry_lease(req->r_old_dentry);
   3084}
   3085
/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
 *
 * Rough flow:
 *  1. Look the request up by the tid in the message header and sanity
 *     check it (right session, not a duplicate safe/unsafe reply).
 *  2. Record the safe/unsafe state; a safe reply unregisters the request,
 *     an unsafe one is linked on the session's s_unsafe list.
 *  3. Parse the reply, resolve the target inode (outside any mutex), then
 *     under s_mutex apply the snap trace and fill the trace into the cache.
 *  4. Publish either the error or the reply message on the request
 *     (unless it was aborted meanwhile) and complete it.
 *
 * The reference obtained from lookup_get_request() is dropped at "out".
 */
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_mds_request *req;
	struct ceph_mds_reply_head *head = msg->front.iov_base;
	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
	struct ceph_snap_realm *realm;
	u64 tid;
	int err, result;
	int mds = session->s_mds;

	/* head is only dereferenced after this length check */
	if (msg->front.iov_len < sizeof(*head)) {
		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
		ceph_msg_dump(msg);
		return;
	}

	/* get request, session */
	tid = le64_to_cpu(msg->hdr.tid);
	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		dout("handle_reply on unknown tid %llu\n", tid);
		mutex_unlock(&mdsc->mutex);
		return;
	}
	dout("handle_reply %p\n", req);

	/* correct session? */
	if (req->r_session != session) {
		pr_err("mdsc_handle_reply got %llu on session mds%d"
		       " not mds%d\n", tid, session->s_mds,
		       req->r_session ? req->r_session->s_mds : -1);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	/* dup? */
	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
		pr_warn("got a dup %s reply on %llu from mds%d\n",
			   head->safe ? "safe" : "unsafe", tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	/* a safe reply was already seen, so this one can only be unsafe */
	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
		pr_warn("got unsafe after safe on %llu from mds%d\n",
			   tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	result = le32_to_cpu(head->result);

	if (head->safe) {
		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
		__unregister_request(mdsc, req);

		/* last request during umount? */
		if (mdsc->stopping && !__get_oldest_req(mdsc))
			complete_all(&mdsc->safe_umount_waiters);

		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			/*
			 * We already handled the unsafe response, now do the
			 * cleanup.  No need to examine the response; the MDS
			 * doesn't include any result info in the safe
			 * response.  And even if it did, there is nothing
			 * useful we could do with a revised return value.
			 */
			dout("got safe reply %llu, mds%d\n", tid, mds);

			mutex_unlock(&mdsc->mutex);
			goto out;
		}
	} else {
		/* first (unsafe) reply: track it on the session until safe */
		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
	}

	dout("handle_reply tid %lld result %d\n", tid, result);
	rinfo = &req->r_reply_info;
	/*
	 * With REPLY_ENCODING the parser is told that every feature bit is
	 * set; otherwise it gets the connection's peer feature mask.
	 */
	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
		err = parse_reply_info(session, msg, rinfo, (u64)-1);
	else
		err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
	mutex_unlock(&mdsc->mutex);

	/* Must find target inode outside of mutexes to avoid deadlocks */
	if ((err >= 0) && rinfo->head->is_target) {
		struct inode *in;
		struct ceph_vino tvino = {
			.ino  = le64_to_cpu(rinfo->targeti.in->ino),
			.snap = le64_to_cpu(rinfo->targeti.in->snapid)
		};

		in = ceph_get_inode(mdsc->fsc->sb, tvino);
		if (IS_ERR(in)) {
			err = PTR_ERR(in);
			/* out_err unlocks s_mutex, so it must be held here */
			mutex_lock(&session->s_mutex);
			goto out_err;
		}
		req->r_target_inode = in;
	}

	mutex_lock(&session->s_mutex);
	if (err < 0) {
		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
		ceph_msg_dump(msg);
		goto out_err;
	}

	/* snap trace */
	realm = NULL;
	if (rinfo->snapblob_len) {
		/*
		 * Updating the snap trace needs the rwsem for write; it is
		 * downgraded afterwards so that both branches continue with
		 * the rwsem held for read (released by up_read() below).
		 */
		down_write(&mdsc->snap_rwsem);
		ceph_update_snap_trace(mdsc, rinfo->snapblob,
				rinfo->snapblob + rinfo->snapblob_len,
				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
				&realm);
		downgrade_write(&mdsc->snap_rwsem);
	} else {
		down_read(&mdsc->snap_rwsem);
	}

	/* insert trace into our cache */
	mutex_lock(&req->r_fill_mutex);
	/* journal_info points at the request only while the trace is filled */
	current->journal_info = req;
	err = ceph_fill_trace(mdsc->fsc->sb, req);
	if (err == 0) {
		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
				    req->r_op == CEPH_MDS_OP_LSSNAP))
			ceph_readdir_prepopulate(req, req->r_session);
	}
	current->journal_info = NULL;
	mutex_unlock(&req->r_fill_mutex);

	up_read(&mdsc->snap_rwsem);
	if (realm)
		ceph_put_snap_realm(mdsc, realm);

	if (err == 0) {
		/* unsafe reply with a target: link the request on that inode */
		if (req->r_target_inode &&
		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			struct ceph_inode_info *ci =
				ceph_inode(req->r_target_inode);
			spin_lock(&ci->i_unsafe_lock);
			list_add_tail(&req->r_unsafe_target_item,
				      &ci->i_unsafe_iops);
			spin_unlock(&ci->i_unsafe_lock);
		}

		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
out_err:
	/* reached with s_mutex held, on both the success and error paths */
	mutex_lock(&mdsc->mutex);
	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		if (err) {
			req->r_err = err;
		} else {
			/* keep a reference to the reply message on the request */
			req->r_reply =  ceph_msg_get(msg);
			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
		}
	} else {
		dout("reply arrived after request %lld was aborted\n", tid);
	}
	mutex_unlock(&mdsc->mutex);

	mutex_unlock(&session->s_mutex);

	/* kick calling process */
	complete_request(mdsc, req);

	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
				     req->r_end_latency, err);
out:
	/* drop the reference obtained from lookup_get_request() */
	ceph_mdsc_put_request(req);
	return;
}
   3272
   3273
   3274
/*
 * handle mds notification that our request has been forwarded.
 *
 * The message front carries two 32-bit values: the mds the request was
 * forwarded to and the forward sequence number.  Depending on the request
 * state we either
 *  - unregister it (it was already aborted),
 *  - ignore the notification as stale (fwd_seq <= r_num_fwd), aborting the
 *    request with -EMULTIHOP if the forward counter has overflowed, or
 *  - reset the attempt counter and resend it towards the new mds.
 */
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 next_mds;
	u32 fwd_seq;
	int err = -EINVAL;	/* never reassigned; only reported at "bad" */
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	bool aborted = false;

	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);

	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		mutex_unlock(&mdsc->mutex);
		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
		return;  /* dup reply? */
	}

	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		dout("forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd) {
		/*
		 * The type of 'num_fwd' in ceph 'MClientRequestForward'
		 * is 'int32_t', while in 'ceph_mds_request_head' the
		 * type is '__u8'. So in case the request bounces between
		 * MDSes exceeding 256 times, the client will get stuck.
		 *
		 * In this case it's usually a bug in the MDS and continuing
		 * to bounce the request makes no sense.
		 *
		 * In future this could be fixed in ceph code, so avoid
		 * hardcoding the limit here.
		 */
		/* max = number of values the on-wire num_fwd field can hold */
		int max = sizeof_field(struct ceph_mds_request_head, num_fwd);
		max = 1 << (max * BITS_PER_BYTE);
		if (req->r_num_fwd >= max) {
			/* r_fill_mutex is held while the request is marked aborted */
			mutex_lock(&req->r_fill_mutex);
			req->r_err = -EMULTIHOP;
			set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
			mutex_unlock(&req->r_fill_mutex);
			aborted = true;
			pr_warn_ratelimited("forward tid %llu seq overflow\n",
					    tid);
		} else {
			dout("forward tid %llu to mds%d - old seq %d <= %d\n",
			     tid, next_mds, req->r_num_fwd, fwd_seq);
		}
	} else {
		/* resend. forward race not possible; mds would drop */
		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
		BUG_ON(req->r_err);
		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
		req->r_attempts = 0;
		req->r_num_fwd = fwd_seq;
		req->r_resend_mds = next_mds;
		/* detach from the current session before resubmitting */
		put_request_session(req);
		__do_request(mdsc, req);
	}
	mutex_unlock(&mdsc->mutex);

	/* kick calling process */
	if (aborted)
		complete_request(mdsc, req);
	/* drop the reference obtained from lookup_get_request() */
	ceph_mdsc_put_request(req);
	return;

bad:
	/* front was too short to hold the two u32 fields; no request was looked up */
	pr_err("mdsc_handle_forward decode error err=%d\n", err);
}
   3355
   3356static int __decode_session_metadata(void **p, void *end,
   3357				     bool *blocklisted)
   3358{
   3359	/* map<string,string> */
   3360	u32 n;
   3361	bool err_str;
   3362	ceph_decode_32_safe(p, end, n, bad);
   3363	while (n-- > 0) {
   3364		u32 len;
   3365		ceph_decode_32_safe(p, end, len, bad);
   3366		ceph_decode_need(p, end, len, bad);
   3367		err_str = !strncmp(*p, "error_string", len);
   3368		*p += len;
   3369		ceph_decode_32_safe(p, end, len, bad);
   3370		ceph_decode_need(p, end, len, bad);
   3371		/*
   3372		 * Match "blocklisted (blacklisted)" from newer MDSes,
   3373		 * or "blacklisted" from older MDSes.
   3374		 */
   3375		if (err_str && strnstr(*p, "blacklisted", len))
   3376			*blocklisted = true;
   3377		*p += len;
   3378	}
   3379	return 0;
   3380bad:
   3381	return -1;
   3382}
   3383
   3384/*
   3385 * handle a mds session control message
   3386 */
   3387static void handle_session(struct ceph_mds_session *session,
   3388			   struct ceph_msg *msg)
   3389{
   3390	struct ceph_mds_client *mdsc = session->s_mdsc;
   3391	int mds = session->s_mds;
   3392	int msg_version = le16_to_cpu(msg->hdr.version);
   3393	void *p = msg->front.iov_base;
   3394	void *end = p + msg->front.iov_len;
   3395	struct ceph_mds_session_head *h;
   3396	u32 op;
   3397	u64 seq, features = 0;
   3398	int wake = 0;
   3399	bool blocklisted = false;
   3400
   3401	/* decode */
   3402	ceph_decode_need(&p, end, sizeof(*h), bad);
   3403	h = p;
   3404	p += sizeof(*h);
   3405
   3406	op = le32_to_cpu(h->op);
   3407	seq = le64_to_cpu(h->seq);
   3408
   3409	if (msg_version >= 3) {
   3410		u32 len;
   3411		/* version >= 2 and < 5, decode metadata, skip otherwise
   3412		 * as it's handled via flags.
   3413		 */
   3414		if (msg_version >= 5)
   3415			ceph_decode_skip_map(&p, end, string, string, bad);
   3416		else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
   3417			goto bad;
   3418
   3419		/* version >= 3, feature bits */
   3420		ceph_decode_32_safe(&p, end, len, bad);
   3421		if (len) {
   3422			ceph_decode_64_safe(&p, end, features, bad);
   3423			p += len - sizeof(features);
   3424		}
   3425	}
   3426
   3427	if (msg_version >= 5) {
   3428		u32 flags, len;
   3429
   3430		/* version >= 4 */
   3431		ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
   3432		ceph_decode_32_safe(&p, end, len, bad); /* len */
   3433		ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */
   3434
   3435		/* version >= 5, flags   */
   3436		ceph_decode_32_safe(&p, end, flags, bad);
   3437		if (flags & CEPH_SESSION_BLOCKLISTED) {
   3438			pr_warn("mds%d session blocklisted\n", session->s_mds);
   3439			blocklisted = true;
   3440		}
   3441	}
   3442
   3443	mutex_lock(&mdsc->mutex);
   3444	if (op == CEPH_SESSION_CLOSE) {
   3445		ceph_get_mds_session(session);
   3446		__unregister_session(mdsc, session);
   3447	}
   3448	/* FIXME: this ttl calculation is generous */
   3449	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
   3450	mutex_unlock(&mdsc->mutex);
   3451
   3452	mutex_lock(&session->s_mutex);
   3453
   3454	dout("handle_session mds%d %s %p state %s seq %llu\n",
   3455	     mds, ceph_session_op_name(op), session,
   3456	     ceph_session_state_name(session->s_state), seq);
   3457
   3458	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
   3459		session->s_state = CEPH_MDS_SESSION_OPEN;
   3460		pr_info("mds%d came back\n", session->s_mds);
   3461	}
   3462
   3463	switch (op) {
   3464	case CEPH_SESSION_OPEN:
   3465		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
   3466			pr_info("mds%d reconnect success\n", session->s_mds);
   3467		session->s_state = CEPH_MDS_SESSION_OPEN;
   3468		session->s_features = features;
   3469		renewed_caps(mdsc, session, 0);
   3470		if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
   3471			metric_schedule_delayed(&mdsc->metric);
   3472		wake = 1;
   3473		if (mdsc->stopping)
   3474			__close_session(mdsc, session);
   3475		break;
   3476
   3477	case CEPH_SESSION_RENEWCAPS:
   3478		if (session->s_renew_seq == seq)
   3479			renewed_caps(mdsc, session, 1);
   3480		break;
   3481
   3482	case CEPH_SESSION_CLOSE:
   3483		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
   3484			pr_info("mds%d reconnect denied\n", session->s_mds);
   3485		session->s_state = CEPH_MDS_SESSION_CLOSED;
   3486		cleanup_session_requests(mdsc, session);
   3487		remove_session_caps(session);
   3488		wake = 2; /* for good measure */
   3489		wake_up_all(&mdsc->session_close_wq);
   3490		break;
   3491
   3492	case CEPH_SESSION_STALE:
   3493		pr_info("mds%d caps went stale, renewing\n",
   3494			session->s_mds);
   3495		atomic_inc(&session->s_cap_gen);
   3496		session->s_cap_ttl = jiffies - 1;
   3497		send_renew_caps(mdsc, session);
   3498		break;
   3499
   3500	case CEPH_SESSION_RECALL_STATE:
   3501		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
   3502		break;
   3503
   3504	case CEPH_SESSION_FLUSHMSG:
   3505		send_flushmsg_ack(mdsc, session, seq);
   3506		break;
   3507
   3508	case CEPH_SESSION_FORCE_RO:
   3509		dout("force_session_readonly %p\n", session);
   3510		spin_lock(&session->s_cap_lock);
   3511		session->s_readonly = true;
   3512		spin_unlock(&session->s_cap_lock);
   3513		wake_up_session_caps(session, FORCE_RO);
   3514		break;
   3515
   3516	case CEPH_SESSION_REJECT:
   3517		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
   3518		pr_info("mds%d rejected session\n", session->s_mds);
   3519		session->s_state = CEPH_MDS_SESSION_REJECTED;
   3520		cleanup_session_requests(mdsc, session);
   3521		remove_session_caps(session);
   3522		if (blocklisted)
   3523			mdsc->fsc->blocklisted = true;
   3524		wake = 2; /* for good measure */
   3525		break;
   3526
   3527	default:
   3528		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
   3529		WARN_ON(1);
   3530	}
   3531
   3532	mutex_unlock(&session->s_mutex);
   3533	if (wake) {
   3534		mutex_lock(&mdsc->mutex);
   3535		__wake_requests(mdsc, &session->s_waiting);
   3536		if (wake == 2)
   3537			kick_requests(mdsc, mds);
   3538		mutex_unlock(&mdsc->mutex);
   3539	}
   3540	if (op == CEPH_SESSION_CLOSE)
   3541		ceph_put_mds_session(session);
   3542	return;
   3543
   3544bad:
   3545	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
   3546	       (int)msg->front.iov_len);
   3547	ceph_msg_dump(msg);
   3548	return;
   3549}
   3550
   3551void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
   3552{
   3553	int dcaps;
   3554
   3555	dcaps = xchg(&req->r_dir_caps, 0);
   3556	if (dcaps) {
   3557		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
   3558		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
   3559	}
   3560}
   3561
   3562void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
   3563{
   3564	int dcaps;
   3565
   3566	dcaps = xchg(&req->r_dir_caps, 0);
   3567	if (dcaps) {
   3568		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
   3569		ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
   3570						dcaps);
   3571	}
   3572}
   3573
   3574/*
   3575 * called under session->mutex.
   3576 */
   3577static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
   3578				   struct ceph_mds_session *session)
   3579{
   3580	struct ceph_mds_request *req, *nreq;
   3581	struct rb_node *p;
   3582
   3583	dout("replay_unsafe_requests mds%d\n", session->s_mds);
   3584
   3585	mutex_lock(&mdsc->mutex);
   3586	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
   3587		__send_request(session, req, true);
   3588
   3589	/*
   3590	 * also re-send old requests when MDS enters reconnect stage. So that MDS
   3591	 * can process completed request in clientreplay stage.
   3592	 */
   3593	p = rb_first(&mdsc->request_tree);
   3594	while (p) {
   3595		req = rb_entry(p, struct ceph_mds_request, r_node);
   3596		p = rb_next(p);
   3597		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
   3598			continue;
   3599		if (req->r_attempts == 0)
   3600			continue; /* only old requests */
   3601		if (!req->r_session)
   3602			continue;
   3603		if (req->r_session->s_mds != session->s_mds)
   3604			continue;
   3605
   3606		ceph_mdsc_release_dir_caps_no_check(req);
   3607
   3608		__send_request(session, req, true);
   3609	}
   3610	mutex_unlock(&mdsc->mutex);
   3611}
   3612
/*
 * Send what has been encoded into recon_state->pagelist so far as one
 * CEPH_MSG_CLIENT_RECONNECT message and continue with a fresh pagelist.
 *
 * On success the old pagelist has been attached to the message and sent,
 * recon_state->pagelist points to the newly allocated pagelist (already
 * holding the count placeholder(s)), nr_caps/nr_realms are reset to 0 and
 * msg_version is set to 5.
 *
 * Returns 0 on success, -ENOSPC if multiple reconnect messages are not
 * allowed, -ENOMEM if the pagelist or message cannot be allocated, or the
 * error from a failed pagelist encode.
 */
static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
{
	struct ceph_msg *reply;
	struct ceph_pagelist *_pagelist;
	struct page *page;
	__le32 *addr;
	int err = -ENOMEM;

	if (!recon_state->allow_multi)
		return -ENOSPC;

	/* can't handle message that contains both caps and realm */
	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);

	/* pre-allocate new pagelist */
	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!_pagelist)
		return -ENOMEM;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_msg;

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(_pagelist, 0);
	if (err < 0)
		goto fail;

	if (recon_state->nr_caps) {
		/* currently encoding caps */
		/* NOTE(review): presumably a zero realm count terminating the outgoing message - confirm */
		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
		if (err)
			goto fail;
	} else {
		/* placeholder for nr_realms (currently encoding realms) */
		err = ceph_pagelist_encode_32(_pagelist, 0);
		if (err < 0)
			goto fail;
	}

	/* NOTE(review): trailing byte 1 presumably flags "more messages follow" - confirm */
	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
	if (err)
		goto fail;

	/*
	 * Patch the real count into the placeholder at the start of the
	 * outgoing pagelist: nr_caps is the first 32-bit word, nr_realms
	 * the second.
	 */
	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
	addr = kmap_atomic(page);
	if (recon_state->nr_caps) {
		/* currently encoding caps */
		*addr = cpu_to_le32(recon_state->nr_caps);
	} else {
		/* currently encoding realms */
		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
	}
	kunmap_atomic(addr);

	reply->hdr.version = cpu_to_le16(5);
	reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);

	ceph_con_send(&recon_state->session->s_con, reply);
	/* drop our reference on the pagelist that was just handed to the message */
	ceph_pagelist_release(recon_state->pagelist);

	/* continue encoding into the pre-allocated pagelist */
	recon_state->pagelist = _pagelist;
	recon_state->nr_caps = 0;
	recon_state->nr_realms = 0;
	recon_state->msg_version = 5;
	return 0;
fail:
	ceph_msg_put(reply);
fail_msg:
	ceph_pagelist_release(_pagelist);
	return err;
}
   3688
   3689static struct dentry* d_find_primary(struct inode *inode)
   3690{
   3691	struct dentry *alias, *dn = NULL;
   3692
   3693	if (hlist_empty(&inode->i_dentry))
   3694		return NULL;
   3695
   3696	spin_lock(&inode->i_lock);
   3697	if (hlist_empty(&inode->i_dentry))
   3698		goto out_unlock;
   3699
   3700	if (S_ISDIR(inode->i_mode)) {
   3701		alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
   3702		if (!IS_ROOT(alias))
   3703			dn = dget(alias);
   3704		goto out_unlock;
   3705	}
   3706
   3707	hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
   3708		spin_lock(&alias->d_lock);
   3709		if (!d_unhashed(alias) &&
   3710		    (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
   3711			dn = dget_dlock(alias);
   3712		}
   3713		spin_unlock(&alias->d_lock);
   3714		if (dn)
   3715			break;
   3716	}
   3717out_unlock:
   3718	spin_unlock(&inode->i_lock);
   3719	return dn;
   3720}
   3721
/*
 * Encode information about a cap for a reconnect with the MDS.
 *
 * @inode: inode the cap belongs to
 * @cap:   the cap to encode; its seq, issue_seq and mseq are reset and its
 *         cap_gen is refreshed from the session as a side effect
 * @arg:   the struct ceph_reconnect_state being filled in
 *
 * The record is appended to recon_state->pagelist in the v1 or v2 layout
 * depending on recon_state->msg_version.  For v2 and later, file locks are
 * encoded as well and the message may be flushed with
 * send_reconnect_partial() when it would exceed RECONNECT_MAX_SIZE.
 *
 * Returns 0 on success (recon_state->nr_caps is then incremented) or a
 * negative error code.
 */
static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
			  void *arg)
{
	union {
		struct ceph_mds_cap_reconnect v2;
		struct ceph_mds_cap_reconnect_v1 v1;
	} rec;
	struct ceph_inode_info *ci = cap->ci;
	struct ceph_reconnect_state *recon_state = arg;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	struct dentry *dentry;
	char *path;
	int pathlen = 0, err;
	u64 pathbase;
	u64 snap_follows;

	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
	     inode, ceph_vinop(inode), cap, cap->cap_id,
	     ceph_cap_string(cap->issued));

	dentry = d_find_primary(inode);
	if (dentry) {
		/* set pathbase to parent dir when msg_version >= 2 */
		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
					    recon_state->msg_version >= 2);
		dput(dentry);
		if (IS_ERR(path)) {
			err = PTR_ERR(path);
			goto out_err;
		}
	} else {
		/* no primary dentry: encode an empty path (pathlen stays 0) */
		path = NULL;
		pathbase = 0;
	}

	spin_lock(&ci->i_ceph_lock);
	cap->seq = 0;        /* reset cap seq */
	cap->issue_seq = 0;  /* and issue_seq */
	cap->mseq = 0;       /* and migrate_seq */
	cap->cap_gen = atomic_read(&cap->session->s_cap_gen);

	/* These are lost when the session goes away */
	if (S_ISDIR(inode->i_mode)) {
		if (cap->issued & CEPH_CAP_DIR_CREATE) {
			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
		}
		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
	}

	if (recon_state->msg_version >= 2) {
		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v2.issued = cpu_to_le32(cap->issued);
		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v2.pathbase = cpu_to_le64(pathbase);
		/*
		 * Temporarily used as a boolean: non-zero means "encode the
		 * file locks", zero means skip them because the inode is
		 * flagged CEPH_I_ERROR_FILELOCK.  The real length is filled
		 * in further down.
		 */
		rec.v2.flock_len = (__force __le32)
			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
	} else {
		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v1.issued = cpu_to_le32(cap->issued);
		rec.v1.size = cpu_to_le64(i_size_read(inode));
		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v1.pathbase = cpu_to_le64(pathbase);
	}

	/*
	 * snap_follows: taken from the first cap snap if there is one,
	 * otherwise from the head snap context's seq (0 if there is none).
	 */
	if (list_empty(&ci->i_cap_snaps)) {
		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
	} else {
		struct ceph_cap_snap *capsnap =
			list_first_entry(&ci->i_cap_snaps,
					 struct ceph_cap_snap, ci_item);
		snap_follows = capsnap->follows;
	}
	spin_unlock(&ci->i_ceph_lock);

	if (recon_state->msg_version >= 2) {
		int num_fcntl_locks, num_flock_locks;
		struct ceph_filelock *flocks = NULL;
		size_t struct_len, total_len = sizeof(u64);	/* starts with the ino */
		u8 struct_v = 0;

		/*
		 * Count the locks, then encode them into a buffer sized for
		 * that count.  Retried from here when the encode reports
		 * -ENOSPC (presumably because the set of locks changed in
		 * between - confirm against ceph_encode_locks_to_buffer()).
		 */
encode_again:
		if (rec.v2.flock_len) {
			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
		} else {
			num_fcntl_locks = 0;
			num_flock_locks = 0;
		}
		if (num_fcntl_locks + num_flock_locks > 0) {
			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
					       sizeof(struct ceph_filelock),
					       GFP_NOFS);
			if (!flocks) {
				err = -ENOMEM;
				goto out_err;
			}
			err = ceph_encode_locks_to_buffer(inode, flocks,
							  num_fcntl_locks,
							  num_flock_locks);
			if (err) {
				kfree(flocks);
				flocks = NULL;
				if (err == -ENOSPC)
					goto encode_again;
				goto out_err;
			}
		} else {
			kfree(flocks);
			flocks = NULL;
		}

		if (recon_state->msg_version >= 3) {
			/* version, compat_version and struct_len */
			total_len += 2 * sizeof(u8) + sizeof(u32);
			struct_v = 2;
		}
		/*
		 * number of encoded locks is stable, so copy to pagelist
		 */
		/* two u32 lock counts followed by the lock records */
		struct_len = 2 * sizeof(u32) +
			    (num_fcntl_locks + num_flock_locks) *
			    sizeof(struct ceph_filelock);
		rec.v2.flock_len = cpu_to_le32(struct_len);

		/* plus the length-prefixed path and the fixed record */
		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);

		if (struct_v >= 2)
			struct_len += sizeof(u64); /* snap_follows */

		total_len += struct_len;

		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
			err = send_reconnect_partial(recon_state);
			if (err)
				goto out_freeflocks;
			/* a successful partial send installs a new pagelist */
			pagelist = recon_state->pagelist;
		}

		/*
		 * Reserve everything up front; the return values of the
		 * encode calls below are not checked (presumably they cannot
		 * fail once the space is reserved - confirm).
		 */
		err = ceph_pagelist_reserve(pagelist, total_len);
		if (err)
			goto out_freeflocks;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		if (recon_state->msg_version >= 3) {
			ceph_pagelist_encode_8(pagelist, struct_v);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, struct_len);
		}
		ceph_pagelist_encode_string(pagelist, path, pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
		ceph_locks_to_pagelist(flocks, pagelist,
				       num_fcntl_locks, num_flock_locks);
		if (struct_v >= 2)
			ceph_pagelist_encode_64(pagelist, snap_follows);
out_freeflocks:
		kfree(flocks);
	} else {
		/* v1 layout: ino, length-prefixed path, fixed v1 record */
		err = ceph_pagelist_reserve(pagelist,
					    sizeof(u64) + sizeof(u32) +
					    pathlen + sizeof(rec.v1));
		if (err)
			goto out_err;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		ceph_pagelist_encode_string(pagelist, path, pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
	}

out_err:
	/* common exit, also reached on success (err == 0) */
	ceph_mdsc_free_path(path, pathlen);
	if (!err)
		recon_state->nr_caps++;
	return err;
}
   3903
   3904static int encode_snap_realms(struct ceph_mds_client *mdsc,
   3905			      struct ceph_reconnect_state *recon_state)
   3906{
   3907	struct rb_node *p;
   3908	struct ceph_pagelist *pagelist = recon_state->pagelist;
   3909	int err = 0;
   3910
   3911	if (recon_state->msg_version >= 4) {
   3912		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
   3913		if (err < 0)
   3914			goto fail;
   3915	}
   3916
   3917	/*
   3918	 * snaprealms.  we provide mds with the ino, seq (version), and
   3919	 * parent for all of our realms.  If the mds has any newer info,
   3920	 * it will tell us.
   3921	 */
   3922	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
   3923		struct ceph_snap_realm *realm =
   3924		       rb_entry(p, struct ceph_snap_realm, node);
   3925		struct ceph_mds_snaprealm_reconnect sr_rec;
   3926
   3927		if (recon_state->msg_version >= 4) {
   3928			size_t need = sizeof(u8) * 2 + sizeof(u32) +
   3929				      sizeof(sr_rec);
   3930
   3931			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
   3932				err = send_reconnect_partial(recon_state);
   3933				if (err)
   3934					goto fail;
   3935				pagelist = recon_state->pagelist;
   3936			}
   3937
   3938			err = ceph_pagelist_reserve(pagelist, need);
   3939			if (err)
   3940				goto fail;
   3941
   3942			ceph_pagelist_encode_8(pagelist, 1);
   3943			ceph_pagelist_encode_8(pagelist, 1);
   3944			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
   3945		}
   3946
   3947		dout(" adding snap realm %llx seq %lld parent %llx\n",
   3948		     realm->ino, realm->seq, realm->parent_ino);
   3949		sr_rec.ino = cpu_to_le64(realm->ino);
   3950		sr_rec.seq = cpu_to_le64(realm->seq);
   3951		sr_rec.parent = cpu_to_le64(realm->parent_ino);
   3952
   3953		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
   3954		if (err)
   3955			goto fail;
   3956
   3957		recon_state->nr_realms++;
   3958	}
   3959fail:
   3960	return err;
   3961}
   3962
   3963
   3964/*
   3965 * If an MDS fails and recovers, clients need to reconnect in order to
   3966 * reestablish shared state.  This includes all caps issued through
   3967 * this session _and_ the snap_realm hierarchy.  Because it's not
   3968 * clear which snap realms the mds cares about, we send everything we
   3969 * know about.. that ensures we'll then get any new info the
   3970 * recovering MDS might have.
   3971 *
   3972 * This is a relatively heavyweight operation, but it's rare.
   3973 */
   3974static void send_mds_reconnect(struct ceph_mds_client *mdsc,
   3975			       struct ceph_mds_session *session)
   3976{
   3977	struct ceph_msg *reply;
   3978	int mds = session->s_mds;
   3979	int err = -ENOMEM;
   3980	struct ceph_reconnect_state recon_state = {
   3981		.session = session,
   3982	};
   3983	LIST_HEAD(dispose);
   3984
   3985	pr_info("mds%d reconnect start\n", mds);
   3986
   3987	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
   3988	if (!recon_state.pagelist)
   3989		goto fail_nopagelist;
   3990
   3991	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
   3992	if (!reply)
   3993		goto fail_nomsg;
   3994
   3995	xa_destroy(&session->s_delegated_inos);
   3996
   3997	mutex_lock(&session->s_mutex);
   3998	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
   3999	session->s_seq = 0;
   4000
   4001	dout("session %p state %s\n", session,
   4002	     ceph_session_state_name(session->s_state));
   4003
   4004	atomic_inc(&session->s_cap_gen);
   4005
   4006	spin_lock(&session->s_cap_lock);
   4007	/* don't know if session is readonly */
   4008	session->s_readonly = 0;
   4009	/*
   4010	 * notify __ceph_remove_cap() that we are composing cap reconnect.
   4011	 * If a cap get released before being added to the cap reconnect,
   4012	 * __ceph_remove_cap() should skip queuing cap release.
   4013	 */
   4014	session->s_cap_reconnect = 1;
   4015	/* drop old cap expires; we're about to reestablish that state */
   4016	detach_cap_releases(session, &dispose);
   4017	spin_unlock(&session->s_cap_lock);
   4018	dispose_cap_releases(mdsc, &dispose);
   4019
   4020	/* trim unused caps to reduce MDS's cache rejoin time */
   4021	if (mdsc->fsc->sb->s_root)
   4022		shrink_dcache_parent(mdsc->fsc->sb->s_root);
   4023
   4024	ceph_con_close(&session->s_con);
   4025	ceph_con_open(&session->s_con,
   4026		      CEPH_ENTITY_TYPE_MDS, mds,
   4027		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
   4028
   4029	/* replay unsafe requests */
   4030	replay_unsafe_requests(mdsc, session);
   4031
   4032	ceph_early_kick_flushing_caps(mdsc, session);
   4033
   4034	down_read(&mdsc->snap_rwsem);
   4035
   4036	/* placeholder for nr_caps */
   4037	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
   4038	if (err)
   4039		goto fail;
   4040
   4041	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
   4042		recon_state.msg_version = 3;
   4043		recon_state.allow_multi = true;
   4044	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
   4045		recon_state.msg_version = 3;
   4046	} else {
   4047		recon_state.msg_version = 2;
   4048	}
   4049	/* trsaverse this session's caps */
   4050	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
   4051
   4052	spin_lock(&session->s_cap_lock);
   4053	session->s_cap_reconnect = 0;
   4054	spin_unlock(&session->s_cap_lock);
   4055
   4056	if (err < 0)
   4057		goto fail;
   4058
   4059	/* check if all realms can be encoded into current message */
   4060	if (mdsc->num_snap_realms) {
   4061		size_t total_len =
   4062			recon_state.pagelist->length +
   4063			mdsc->num_snap_realms *
   4064			sizeof(struct ceph_mds_snaprealm_reconnect);
   4065		if (recon_state.msg_version >= 4) {
   4066			/* number of realms */
   4067			total_len += sizeof(u32);
   4068			/* version, compat_version and struct_len */
   4069			total_len += mdsc->num_snap_realms *
   4070				     (2 * sizeof(u8) + sizeof(u32));
   4071		}
   4072		if (total_len > RECONNECT_MAX_SIZE) {
   4073			if (!recon_state.allow_multi) {
   4074				err = -ENOSPC;
   4075				goto fail;
   4076			}
   4077			if (recon_state.nr_caps) {
   4078				err = send_reconnect_partial(&recon_state);
   4079				if (err)
   4080					goto fail;
   4081			}
   4082			recon_state.msg_version = 5;
   4083		}
   4084	}
   4085
   4086	err = encode_snap_realms(mdsc, &recon_state);
   4087	if (err < 0)
   4088		goto fail;
   4089
   4090	if (recon_state.msg_version >= 5) {
   4091		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
   4092		if (err < 0)
   4093			goto fail;
   4094	}
   4095
   4096	if (recon_state.nr_caps || recon_state.nr_realms) {
   4097		struct page *page =
   4098			list_first_entry(&recon_state.pagelist->head,
   4099					struct page, lru);
   4100		__le32 *addr = kmap_atomic(page);
   4101		if (recon_state.nr_caps) {
   4102			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
   4103			*addr = cpu_to_le32(recon_state.nr_caps);
   4104		} else if (recon_state.msg_version >= 4) {
   4105			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
   4106		}
   4107		kunmap_atomic(addr);
   4108	}
   4109
   4110	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
   4111	if (recon_state.msg_version >= 4)
   4112		reply->hdr.compat_version = cpu_to_le16(4);
   4113
   4114	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
   4115	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
   4116
   4117	ceph_con_send(&session->s_con, reply);
   4118
   4119	mutex_unlock(&session->s_mutex);
   4120
   4121	mutex_lock(&mdsc->mutex);
   4122	__wake_requests(mdsc, &session->s_waiting);
   4123	mutex_unlock(&mdsc->mutex);
   4124
   4125	up_read(&mdsc->snap_rwsem);
   4126	ceph_pagelist_release(recon_state.pagelist);
   4127	return;
   4128
   4129fail:
   4130	ceph_msg_put(reply);
   4131	up_read(&mdsc->snap_rwsem);
   4132	mutex_unlock(&session->s_mutex);
   4133fail_nomsg:
   4134	ceph_pagelist_release(recon_state.pagelist);
   4135fail_nopagelist:
   4136	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
   4137	return;
   4138}
   4139
   4140
   4141/*
   4142 * compare old and new mdsmaps, kicking requests
   4143 * and closing out old connections as necessary
   4144 *
   4145 * called under mdsc->mutex.
   4146 */
   4147static void check_new_map(struct ceph_mds_client *mdsc,
   4148			  struct ceph_mdsmap *newmap,
   4149			  struct ceph_mdsmap *oldmap)
   4150{
   4151	int i, j, err;
   4152	int oldstate, newstate;
   4153	struct ceph_mds_session *s;
   4154	unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};
   4155
   4156	dout("check_new_map new %u old %u\n",
   4157	     newmap->m_epoch, oldmap->m_epoch);
   4158
   4159	if (newmap->m_info) {
   4160		for (i = 0; i < newmap->possible_max_rank; i++) {
   4161			for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
   4162				set_bit(newmap->m_info[i].export_targets[j], targets);
   4163		}
   4164	}
   4165
   4166	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
   4167		if (!mdsc->sessions[i])
   4168			continue;
   4169		s = mdsc->sessions[i];
   4170		oldstate = ceph_mdsmap_get_state(oldmap, i);
   4171		newstate = ceph_mdsmap_get_state(newmap, i);
   4172
   4173		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
   4174		     i, ceph_mds_state_name(oldstate),
   4175		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
   4176		     ceph_mds_state_name(newstate),
   4177		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
   4178		     ceph_session_state_name(s->s_state));
   4179
   4180		if (i >= newmap->possible_max_rank) {
   4181			/* force close session for stopped mds */
   4182			ceph_get_mds_session(s);
   4183			__unregister_session(mdsc, s);
   4184			__wake_requests(mdsc, &s->s_waiting);
   4185			mutex_unlock(&mdsc->mutex);
   4186
   4187			mutex_lock(&s->s_mutex);
   4188			cleanup_session_requests(mdsc, s);
   4189			remove_session_caps(s);
   4190			mutex_unlock(&s->s_mutex);
   4191
   4192			ceph_put_mds_session(s);
   4193
   4194			mutex_lock(&mdsc->mutex);
   4195			kick_requests(mdsc, i);
   4196			continue;
   4197		}
   4198
   4199		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
   4200			   ceph_mdsmap_get_addr(newmap, i),
   4201			   sizeof(struct ceph_entity_addr))) {
   4202			/* just close it */
   4203			mutex_unlock(&mdsc->mutex);
   4204			mutex_lock(&s->s_mutex);
   4205			mutex_lock(&mdsc->mutex);
   4206			ceph_con_close(&s->s_con);
   4207			mutex_unlock(&s->s_mutex);
   4208			s->s_state = CEPH_MDS_SESSION_RESTARTING;
   4209		} else if (oldstate == newstate) {
   4210			continue;  /* nothing new with this mds */
   4211		}
   4212
   4213		/*
   4214		 * send reconnect?
   4215		 */
   4216		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
   4217		    newstate >= CEPH_MDS_STATE_RECONNECT) {
   4218			mutex_unlock(&mdsc->mutex);
   4219			clear_bit(i, targets);
   4220			send_mds_reconnect(mdsc, s);
   4221			mutex_lock(&mdsc->mutex);
   4222		}
   4223
   4224		/*
   4225		 * kick request on any mds that has gone active.
   4226		 */
   4227		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
   4228		    newstate >= CEPH_MDS_STATE_ACTIVE) {
   4229			if (oldstate != CEPH_MDS_STATE_CREATING &&
   4230			    oldstate != CEPH_MDS_STATE_STARTING)
   4231				pr_info("mds%d recovery completed\n", s->s_mds);
   4232			kick_requests(mdsc, i);
   4233			mutex_unlock(&mdsc->mutex);
   4234			mutex_lock(&s->s_mutex);
   4235			mutex_lock(&mdsc->mutex);
   4236			ceph_kick_flushing_caps(mdsc, s);
   4237			mutex_unlock(&s->s_mutex);
   4238			wake_up_session_caps(s, RECONNECT);
   4239		}
   4240	}
   4241
   4242	/*
   4243	 * Only open and reconnect sessions that don't exist yet.
   4244	 */
   4245	for (i = 0; i < newmap->possible_max_rank; i++) {
   4246		/*
   4247		 * In case the import MDS is crashed just after
   4248		 * the EImportStart journal is flushed, so when
   4249		 * a standby MDS takes over it and is replaying
   4250		 * the EImportStart journal the new MDS daemon
   4251		 * will wait the client to reconnect it, but the
   4252		 * client may never register/open the session yet.
   4253		 *
   4254		 * Will try to reconnect that MDS daemon if the
   4255		 * rank number is in the export targets array and
   4256		 * is the up:reconnect state.
   4257		 */
   4258		newstate = ceph_mdsmap_get_state(newmap, i);
   4259		if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
   4260			continue;
   4261
   4262		/*
   4263		 * The session maybe registered and opened by some
   4264		 * requests which were choosing random MDSes during
   4265		 * the mdsc->mutex's unlock/lock gap below in rare
   4266		 * case. But the related MDS daemon will just queue
   4267		 * that requests and be still waiting for the client's
   4268		 * reconnection request in up:reconnect state.
   4269		 */
   4270		s = __ceph_lookup_mds_session(mdsc, i);
   4271		if (likely(!s)) {
   4272			s = __open_export_target_session(mdsc, i);
   4273			if (IS_ERR(s)) {
   4274				err = PTR_ERR(s);
   4275				pr_err("failed to open export target session, err %d\n",
   4276				       err);
   4277				continue;
   4278			}
   4279		}
   4280		dout("send reconnect to export target mds.%d\n", i);
   4281		mutex_unlock(&mdsc->mutex);
   4282		send_mds_reconnect(mdsc, s);
   4283		ceph_put_mds_session(s);
   4284		mutex_lock(&mdsc->mutex);
   4285	}
   4286
   4287	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
   4288		s = mdsc->sessions[i];
   4289		if (!s)
   4290			continue;
   4291		if (!ceph_mdsmap_is_laggy(newmap, i))
   4292			continue;
   4293		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
   4294		    s->s_state == CEPH_MDS_SESSION_HUNG ||
   4295		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
   4296			dout(" connecting to export targets of laggy mds%d\n",
   4297			     i);
   4298			__open_export_target_sessions(mdsc, s);
   4299		}
   4300	}
   4301}
   4302
   4303
   4304
   4305/*
   4306 * leases
   4307 */
   4308
   4309/*
   4310 * caller must hold session s_mutex, dentry->d_lock
   4311 */
   4312void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
   4313{
   4314	struct ceph_dentry_info *di = ceph_dentry(dentry);
   4315
   4316	ceph_put_mds_session(di->lease_session);
   4317	di->lease_session = NULL;
   4318}
   4319
   4320static void handle_lease(struct ceph_mds_client *mdsc,
   4321			 struct ceph_mds_session *session,
   4322			 struct ceph_msg *msg)
   4323{
   4324	struct super_block *sb = mdsc->fsc->sb;
   4325	struct inode *inode;
   4326	struct dentry *parent, *dentry;
   4327	struct ceph_dentry_info *di;
   4328	int mds = session->s_mds;
   4329	struct ceph_mds_lease *h = msg->front.iov_base;
   4330	u32 seq;
   4331	struct ceph_vino vino;
   4332	struct qstr dname;
   4333	int release = 0;
   4334
   4335	dout("handle_lease from mds%d\n", mds);
   4336
   4337	/* decode */
   4338	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
   4339		goto bad;
   4340	vino.ino = le64_to_cpu(h->ino);
   4341	vino.snap = CEPH_NOSNAP;
   4342	seq = le32_to_cpu(h->seq);
   4343	dname.len = get_unaligned_le32(h + 1);
   4344	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
   4345		goto bad;
   4346	dname.name = (void *)(h + 1) + sizeof(u32);
   4347
   4348	/* lookup inode */
   4349	inode = ceph_find_inode(sb, vino);
   4350	dout("handle_lease %s, ino %llx %p %.*s\n",
   4351	     ceph_lease_op_name(h->action), vino.ino, inode,
   4352	     dname.len, dname.name);
   4353
   4354	mutex_lock(&session->s_mutex);
   4355	inc_session_sequence(session);
   4356
   4357	if (!inode) {
   4358		dout("handle_lease no inode %llx\n", vino.ino);
   4359		goto release;
   4360	}
   4361
   4362	/* dentry */
   4363	parent = d_find_alias(inode);
   4364	if (!parent) {
   4365		dout("no parent dentry on inode %p\n", inode);
   4366		WARN_ON(1);
   4367		goto release;  /* hrm... */
   4368	}
   4369	dname.hash = full_name_hash(parent, dname.name, dname.len);
   4370	dentry = d_lookup(parent, &dname);
   4371	dput(parent);
   4372	if (!dentry)
   4373		goto release;
   4374
   4375	spin_lock(&dentry->d_lock);
   4376	di = ceph_dentry(dentry);
   4377	switch (h->action) {
   4378	case CEPH_MDS_LEASE_REVOKE:
   4379		if (di->lease_session == session) {
   4380			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
   4381				h->seq = cpu_to_le32(di->lease_seq);
   4382			__ceph_mdsc_drop_dentry_lease(dentry);
   4383		}
   4384		release = 1;
   4385		break;
   4386
   4387	case CEPH_MDS_LEASE_RENEW:
   4388		if (di->lease_session == session &&
   4389		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
   4390		    di->lease_renew_from &&
   4391		    di->lease_renew_after == 0) {
   4392			unsigned long duration =
   4393				msecs_to_jiffies(le32_to_cpu(h->duration_ms));
   4394
   4395			di->lease_seq = seq;
   4396			di->time = di->lease_renew_from + duration;
   4397			di->lease_renew_after = di->lease_renew_from +
   4398				(duration >> 1);
   4399			di->lease_renew_from = 0;
   4400		}
   4401		break;
   4402	}
   4403	spin_unlock(&dentry->d_lock);
   4404	dput(dentry);
   4405
   4406	if (!release)
   4407		goto out;
   4408
   4409release:
   4410	/* let's just reuse the same message */
   4411	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
   4412	ceph_msg_get(msg);
   4413	ceph_con_send(&session->s_con, msg);
   4414
   4415out:
   4416	mutex_unlock(&session->s_mutex);
   4417	iput(inode);
   4418	return;
   4419
   4420bad:
   4421	pr_err("corrupt lease message\n");
   4422	ceph_msg_dump(msg);
   4423}
   4424
   4425void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
   4426			      struct dentry *dentry, char action,
   4427			      u32 seq)
   4428{
   4429	struct ceph_msg *msg;
   4430	struct ceph_mds_lease *lease;
   4431	struct inode *dir;
   4432	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
   4433
   4434	dout("lease_send_msg identry %p %s to mds%d\n",
   4435	     dentry, ceph_lease_op_name(action), session->s_mds);
   4436
   4437	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
   4438	if (!msg)
   4439		return;
   4440	lease = msg->front.iov_base;
   4441	lease->action = action;
   4442	lease->seq = cpu_to_le32(seq);
   4443
   4444	spin_lock(&dentry->d_lock);
   4445	dir = d_inode(dentry->d_parent);
   4446	lease->ino = cpu_to_le64(ceph_ino(dir));
   4447	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
   4448
   4449	put_unaligned_le32(dentry->d_name.len, lease + 1);
   4450	memcpy((void *)(lease + 1) + 4,
   4451	       dentry->d_name.name, dentry->d_name.len);
   4452	spin_unlock(&dentry->d_lock);
   4453
   4454	ceph_con_send(&session->s_con, msg);
   4455}
   4456
   4457/*
   4458 * lock unlock the session, to wait ongoing session activities
   4459 */
   4460static void lock_unlock_session(struct ceph_mds_session *s)
   4461{
   4462	mutex_lock(&s->s_mutex);
   4463	mutex_unlock(&s->s_mutex);
   4464}
   4465
   4466static void maybe_recover_session(struct ceph_mds_client *mdsc)
   4467{
   4468	struct ceph_fs_client *fsc = mdsc->fsc;
   4469
   4470	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
   4471		return;
   4472
   4473	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
   4474		return;
   4475
   4476	if (!READ_ONCE(fsc->blocklisted))
   4477		return;
   4478
   4479	pr_info("auto reconnect after blocklisted\n");
   4480	ceph_force_reconnect(fsc->sb);
   4481}
   4482
   4483bool check_session_state(struct ceph_mds_session *s)
   4484{
   4485	switch (s->s_state) {
   4486	case CEPH_MDS_SESSION_OPEN:
   4487		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
   4488			s->s_state = CEPH_MDS_SESSION_HUNG;
   4489			pr_info("mds%d hung\n", s->s_mds);
   4490		}
   4491		break;
   4492	case CEPH_MDS_SESSION_CLOSING:
   4493	case CEPH_MDS_SESSION_NEW:
   4494	case CEPH_MDS_SESSION_RESTARTING:
   4495	case CEPH_MDS_SESSION_CLOSED:
   4496	case CEPH_MDS_SESSION_REJECTED:
   4497		return false;
   4498	}
   4499
   4500	return true;
   4501}
   4502
   4503/*
   4504 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
   4505 * then we need to retransmit that request.
   4506 */
   4507void inc_session_sequence(struct ceph_mds_session *s)
   4508{
   4509	lockdep_assert_held(&s->s_mutex);
   4510
   4511	s->s_seq++;
   4512
   4513	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
   4514		int ret;
   4515
   4516		dout("resending session close request for mds%d\n", s->s_mds);
   4517		ret = request_close_session(s);
   4518		if (ret < 0)
   4519			pr_err("unable to close session to mds%d: %d\n",
   4520			       s->s_mds, ret);
   4521	}
   4522}
   4523
   4524/*
   4525 * delayed work -- periodically trim expired leases, renew caps with mds.  If
   4526 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
   4527 * workqueue delay value of 5 secs will be used.
   4528 */
   4529static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
   4530{
   4531	unsigned long max_delay = HZ * 5;
   4532
   4533	/* 5 secs default delay */
   4534	if (!delay || (delay > max_delay))
   4535		delay = max_delay;
   4536	schedule_delayed_work(&mdsc->delayed_work,
   4537			      round_jiffies_relative(delay));
   4538}
   4539
   4540static void delayed_work(struct work_struct *work)
   4541{
   4542	struct ceph_mds_client *mdsc =
   4543		container_of(work, struct ceph_mds_client, delayed_work.work);
   4544	unsigned long delay;
   4545	int renew_interval;
   4546	int renew_caps;
   4547	int i;
   4548
   4549	dout("mdsc delayed_work\n");
   4550
   4551	if (mdsc->stopping)
   4552		return;
   4553
   4554	mutex_lock(&mdsc->mutex);
   4555	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
   4556	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
   4557				   mdsc->last_renew_caps);
   4558	if (renew_caps)
   4559		mdsc->last_renew_caps = jiffies;
   4560
   4561	for (i = 0; i < mdsc->max_sessions; i++) {
   4562		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
   4563		if (!s)
   4564			continue;
   4565
   4566		if (!check_session_state(s)) {
   4567			ceph_put_mds_session(s);
   4568			continue;
   4569		}
   4570		mutex_unlock(&mdsc->mutex);
   4571
   4572		mutex_lock(&s->s_mutex);
   4573		if (renew_caps)
   4574			send_renew_caps(mdsc, s);
   4575		else
   4576			ceph_con_keepalive(&s->s_con);
   4577		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
   4578		    s->s_state == CEPH_MDS_SESSION_HUNG)
   4579			ceph_send_cap_releases(mdsc, s);
   4580		mutex_unlock(&s->s_mutex);
   4581		ceph_put_mds_session(s);
   4582
   4583		mutex_lock(&mdsc->mutex);
   4584	}
   4585	mutex_unlock(&mdsc->mutex);
   4586
   4587	delay = ceph_check_delayed_caps(mdsc);
   4588
   4589	ceph_queue_cap_reclaim_work(mdsc);
   4590
   4591	ceph_trim_snapid_map(mdsc);
   4592
   4593	maybe_recover_session(mdsc);
   4594
   4595	schedule_delayed(mdsc, delay);
   4596}
   4597
   4598int ceph_mdsc_init(struct ceph_fs_client *fsc)
   4599
   4600{
   4601	struct ceph_mds_client *mdsc;
   4602	int err;
   4603
   4604	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
   4605	if (!mdsc)
   4606		return -ENOMEM;
   4607	mdsc->fsc = fsc;
   4608	mutex_init(&mdsc->mutex);
   4609	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
   4610	if (!mdsc->mdsmap) {
   4611		err = -ENOMEM;
   4612		goto err_mdsc;
   4613	}
   4614
   4615	init_completion(&mdsc->safe_umount_waiters);
   4616	init_waitqueue_head(&mdsc->session_close_wq);
   4617	INIT_LIST_HEAD(&mdsc->waiting_for_map);
   4618	mdsc->quotarealms_inodes = RB_ROOT;
   4619	mutex_init(&mdsc->quotarealms_inodes_mutex);
   4620	init_rwsem(&mdsc->snap_rwsem);
   4621	mdsc->snap_realms = RB_ROOT;
   4622	INIT_LIST_HEAD(&mdsc->snap_empty);
   4623	spin_lock_init(&mdsc->snap_empty_lock);
   4624	mdsc->request_tree = RB_ROOT;
   4625	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
   4626	mdsc->last_renew_caps = jiffies;
   4627	INIT_LIST_HEAD(&mdsc->cap_delay_list);
   4628	INIT_LIST_HEAD(&mdsc->cap_wait_list);
   4629	spin_lock_init(&mdsc->cap_delay_lock);
   4630	INIT_LIST_HEAD(&mdsc->snap_flush_list);
   4631	spin_lock_init(&mdsc->snap_flush_lock);
   4632	mdsc->last_cap_flush_tid = 1;
   4633	INIT_LIST_HEAD(&mdsc->cap_flush_list);
   4634	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
   4635	spin_lock_init(&mdsc->cap_dirty_lock);
   4636	init_waitqueue_head(&mdsc->cap_flushing_wq);
   4637	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
   4638	err = ceph_metric_init(&mdsc->metric);
   4639	if (err)
   4640		goto err_mdsmap;
   4641
   4642	spin_lock_init(&mdsc->dentry_list_lock);
   4643	INIT_LIST_HEAD(&mdsc->dentry_leases);
   4644	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
   4645
   4646	ceph_caps_init(mdsc);
   4647	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
   4648
   4649	spin_lock_init(&mdsc->snapid_map_lock);
   4650	mdsc->snapid_map_tree = RB_ROOT;
   4651	INIT_LIST_HEAD(&mdsc->snapid_map_lru);
   4652
   4653	init_rwsem(&mdsc->pool_perm_rwsem);
   4654	mdsc->pool_perm_tree = RB_ROOT;
   4655
   4656	strscpy(mdsc->nodename, utsname()->nodename,
   4657		sizeof(mdsc->nodename));
   4658
   4659	fsc->mdsc = mdsc;
   4660	return 0;
   4661
   4662err_mdsmap:
   4663	kfree(mdsc->mdsmap);
   4664err_mdsc:
   4665	kfree(mdsc);
   4666	return err;
   4667}
   4668
   4669/*
   4670 * Wait for safe replies on open mds requests.  If we time out, drop
   4671 * all requests from the tree to avoid dangling dentry refs.
   4672 */
   4673static void wait_requests(struct ceph_mds_client *mdsc)
   4674{
   4675	struct ceph_options *opts = mdsc->fsc->client->options;
   4676	struct ceph_mds_request *req;
   4677
   4678	mutex_lock(&mdsc->mutex);
   4679	if (__get_oldest_req(mdsc)) {
   4680		mutex_unlock(&mdsc->mutex);
   4681
   4682		dout("wait_requests waiting for requests\n");
   4683		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
   4684				    ceph_timeout_jiffies(opts->mount_timeout));
   4685
   4686		/* tear down remaining requests */
   4687		mutex_lock(&mdsc->mutex);
   4688		while ((req = __get_oldest_req(mdsc))) {
   4689			dout("wait_requests timed out on tid %llu\n",
   4690			     req->r_tid);
   4691			list_del_init(&req->r_wait);
   4692			__unregister_request(mdsc, req);
   4693		}
   4694	}
   4695	mutex_unlock(&mdsc->mutex);
   4696	dout("wait_requests done\n");
   4697}
   4698
   4699void send_flush_mdlog(struct ceph_mds_session *s)
   4700{
   4701	struct ceph_msg *msg;
   4702
   4703	/*
   4704	 * Pre-luminous MDS crashes when it sees an unknown session request
   4705	 */
   4706	if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
   4707		return;
   4708
   4709	mutex_lock(&s->s_mutex);
   4710	dout("request mdlog flush to mds%d (%s)s seq %lld\n", s->s_mds,
   4711	     ceph_session_state_name(s->s_state), s->s_seq);
   4712	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
   4713				      s->s_seq);
   4714	if (!msg) {
   4715		pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
   4716		       s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
   4717	} else {
   4718		ceph_con_send(&s->s_con, msg);
   4719	}
   4720	mutex_unlock(&s->s_mutex);
   4721}
   4722
   4723/*
   4724 * called before mount is ro, and before dentries are torn down.
   4725 * (hmm, does this still race with new lookups?)
   4726 */
   4727void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
   4728{
   4729	dout("pre_umount\n");
   4730	mdsc->stopping = 1;
   4731
   4732	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
   4733	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
   4734	ceph_flush_dirty_caps(mdsc);
   4735	wait_requests(mdsc);
   4736
   4737	/*
   4738	 * wait for reply handlers to drop their request refs and
   4739	 * their inode/dcache refs
   4740	 */
   4741	ceph_msgr_flush();
   4742
   4743	ceph_cleanup_quotarealms_inodes(mdsc);
   4744}
   4745
   4746/*
   4747 * flush the mdlog and wait for all write mds requests to flush.
   4748 */
   4749static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
   4750						 u64 want_tid)
   4751{
   4752	struct ceph_mds_request *req = NULL, *nextreq;
   4753	struct ceph_mds_session *last_session = NULL;
   4754	struct rb_node *n;
   4755
   4756	mutex_lock(&mdsc->mutex);
   4757	dout("%s want %lld\n", __func__, want_tid);
   4758restart:
   4759	req = __get_oldest_req(mdsc);
   4760	while (req && req->r_tid <= want_tid) {
   4761		/* find next request */
   4762		n = rb_next(&req->r_node);
   4763		if (n)
   4764			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
   4765		else
   4766			nextreq = NULL;
   4767		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
   4768		    (req->r_op & CEPH_MDS_OP_WRITE)) {
   4769			struct ceph_mds_session *s = req->r_session;
   4770
   4771			if (!s) {
   4772				req = nextreq;
   4773				continue;
   4774			}
   4775
   4776			/* write op */
   4777			ceph_mdsc_get_request(req);
   4778			if (nextreq)
   4779				ceph_mdsc_get_request(nextreq);
   4780			s = ceph_get_mds_session(s);
   4781			mutex_unlock(&mdsc->mutex);
   4782
   4783			/* send flush mdlog request to MDS */
   4784			if (last_session != s) {
   4785				send_flush_mdlog(s);
   4786				ceph_put_mds_session(last_session);
   4787				last_session = s;
   4788			} else {
   4789				ceph_put_mds_session(s);
   4790			}
   4791			dout("%s wait on %llu (want %llu)\n", __func__,
   4792			     req->r_tid, want_tid);
   4793			wait_for_completion(&req->r_safe_completion);
   4794
   4795			mutex_lock(&mdsc->mutex);
   4796			ceph_mdsc_put_request(req);
   4797			if (!nextreq)
   4798				break;  /* next dne before, so we're done! */
   4799			if (RB_EMPTY_NODE(&nextreq->r_node)) {
   4800				/* next request was removed from tree */
   4801				ceph_mdsc_put_request(nextreq);
   4802				goto restart;
   4803			}
   4804			ceph_mdsc_put_request(nextreq);  /* won't go away */
   4805		}
   4806		req = nextreq;
   4807	}
   4808	mutex_unlock(&mdsc->mutex);
   4809	ceph_put_mds_session(last_session);
   4810	dout("%s done\n", __func__);
   4811}
   4812
   4813void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
   4814{
   4815	u64 want_tid, want_flush;
   4816
   4817	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
   4818		return;
   4819
   4820	dout("sync\n");
   4821	mutex_lock(&mdsc->mutex);
   4822	want_tid = mdsc->last_tid;
   4823	mutex_unlock(&mdsc->mutex);
   4824
   4825	ceph_flush_dirty_caps(mdsc);
   4826	spin_lock(&mdsc->cap_dirty_lock);
   4827	want_flush = mdsc->last_cap_flush_tid;
   4828	if (!list_empty(&mdsc->cap_flush_list)) {
   4829		struct ceph_cap_flush *cf =
   4830			list_last_entry(&mdsc->cap_flush_list,
   4831					struct ceph_cap_flush, g_list);
   4832		cf->wake = true;
   4833	}
   4834	spin_unlock(&mdsc->cap_dirty_lock);
   4835
   4836	dout("sync want tid %lld flush_seq %lld\n",
   4837	     want_tid, want_flush);
   4838
   4839	flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
   4840	wait_caps_flush(mdsc, want_flush);
   4841}
   4842
   4843/*
   4844 * true if all sessions are closed, or we force unmount
   4845 */
   4846static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
   4847{
   4848	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
   4849		return true;
   4850	return atomic_read(&mdsc->num_sessions) <= skipped;
   4851}
   4852
   4853/*
   4854 * called after sb is ro.
   4855 */
   4856void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
   4857{
   4858	struct ceph_options *opts = mdsc->fsc->client->options;
   4859	struct ceph_mds_session *session;
   4860	int i;
   4861	int skipped = 0;
   4862
   4863	dout("close_sessions\n");
   4864
   4865	/* close sessions */
   4866	mutex_lock(&mdsc->mutex);
   4867	for (i = 0; i < mdsc->max_sessions; i++) {
   4868		session = __ceph_lookup_mds_session(mdsc, i);
   4869		if (!session)
   4870			continue;
   4871		mutex_unlock(&mdsc->mutex);
   4872		mutex_lock(&session->s_mutex);
   4873		if (__close_session(mdsc, session) <= 0)
   4874			skipped++;
   4875		mutex_unlock(&session->s_mutex);
   4876		ceph_put_mds_session(session);
   4877		mutex_lock(&mdsc->mutex);
   4878	}
   4879	mutex_unlock(&mdsc->mutex);
   4880
   4881	dout("waiting for sessions to close\n");
   4882	wait_event_timeout(mdsc->session_close_wq,
   4883			   done_closing_sessions(mdsc, skipped),
   4884			   ceph_timeout_jiffies(opts->mount_timeout));
   4885
   4886	/* tear down remaining sessions */
   4887	mutex_lock(&mdsc->mutex);
   4888	for (i = 0; i < mdsc->max_sessions; i++) {
   4889		if (mdsc->sessions[i]) {
   4890			session = ceph_get_mds_session(mdsc->sessions[i]);
   4891			__unregister_session(mdsc, session);
   4892			mutex_unlock(&mdsc->mutex);
   4893			mutex_lock(&session->s_mutex);
   4894			remove_session_caps(session);
   4895			mutex_unlock(&session->s_mutex);
   4896			ceph_put_mds_session(session);
   4897			mutex_lock(&mdsc->mutex);
   4898		}
   4899	}
   4900	WARN_ON(!list_empty(&mdsc->cap_delay_list));
   4901	mutex_unlock(&mdsc->mutex);
   4902
   4903	ceph_cleanup_snapid_map(mdsc);
   4904	ceph_cleanup_global_and_empty_realms(mdsc);
   4905
   4906	cancel_work_sync(&mdsc->cap_reclaim_work);
   4907	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
   4908
   4909	dout("stopped\n");
   4910}
   4911
   4912void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
   4913{
   4914	struct ceph_mds_session *session;
   4915	int mds;
   4916
   4917	dout("force umount\n");
   4918
   4919	mutex_lock(&mdsc->mutex);
   4920	for (mds = 0; mds < mdsc->max_sessions; mds++) {
   4921		session = __ceph_lookup_mds_session(mdsc, mds);
   4922		if (!session)
   4923			continue;
   4924
   4925		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
   4926			__unregister_session(mdsc, session);
   4927		__wake_requests(mdsc, &session->s_waiting);
   4928		mutex_unlock(&mdsc->mutex);
   4929
   4930		mutex_lock(&session->s_mutex);
   4931		__close_session(mdsc, session);
   4932		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
   4933			cleanup_session_requests(mdsc, session);
   4934			remove_session_caps(session);
   4935		}
   4936		mutex_unlock(&session->s_mutex);
   4937		ceph_put_mds_session(session);
   4938
   4939		mutex_lock(&mdsc->mutex);
   4940		kick_requests(mdsc, mds);
   4941	}
   4942	__wake_requests(mdsc, &mdsc->waiting_for_map);
   4943	mutex_unlock(&mdsc->mutex);
   4944}
   4945
   4946static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
   4947{
   4948	dout("stop\n");
   4949	/*
   4950	 * Make sure the delayed work stopped before releasing
   4951	 * the resources.
   4952	 *
   4953	 * Because the cancel_delayed_work_sync() will only
   4954	 * guarantee that the work finishes executing. But the
   4955	 * delayed work will re-arm itself again after that.
   4956	 */
   4957	flush_delayed_work(&mdsc->delayed_work);
   4958
   4959	if (mdsc->mdsmap)
   4960		ceph_mdsmap_destroy(mdsc->mdsmap);
   4961	kfree(mdsc->sessions);
   4962	ceph_caps_finalize(mdsc);
   4963	ceph_pool_perm_destroy(mdsc);
   4964}
   4965
   4966void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
   4967{
   4968	struct ceph_mds_client *mdsc = fsc->mdsc;
   4969	dout("mdsc_destroy %p\n", mdsc);
   4970
   4971	if (!mdsc)
   4972		return;
   4973
   4974	/* flush out any connection work with references to us */
   4975	ceph_msgr_flush();
   4976
   4977	ceph_mdsc_stop(mdsc);
   4978
   4979	ceph_metric_destroy(&mdsc->metric);
   4980
   4981	fsc->mdsc = NULL;
   4982	kfree(mdsc);
   4983	dout("mdsc_destroy %p done\n", mdsc);
   4984}
   4985
   4986void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
   4987{
   4988	struct ceph_fs_client *fsc = mdsc->fsc;
   4989	const char *mds_namespace = fsc->mount_options->mds_namespace;
   4990	void *p = msg->front.iov_base;
   4991	void *end = p + msg->front.iov_len;
   4992	u32 epoch;
   4993	u32 num_fs;
   4994	u32 mount_fscid = (u32)-1;
   4995	int err = -EINVAL;
   4996
   4997	ceph_decode_need(&p, end, sizeof(u32), bad);
   4998	epoch = ceph_decode_32(&p);
   4999
   5000	dout("handle_fsmap epoch %u\n", epoch);
   5001
   5002	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
   5003	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);
   5004
   5005	ceph_decode_32_safe(&p, end, num_fs, bad);
   5006	while (num_fs-- > 0) {
   5007		void *info_p, *info_end;
   5008		u32 info_len;
   5009		u32 fscid, namelen;
   5010
   5011		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
   5012		p += 2;		// info_v, info_cv
   5013		info_len = ceph_decode_32(&p);
   5014		ceph_decode_need(&p, end, info_len, bad);
   5015		info_p = p;
   5016		info_end = p + info_len;
   5017		p = info_end;
   5018
   5019		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
   5020		fscid = ceph_decode_32(&info_p);
   5021		namelen = ceph_decode_32(&info_p);
   5022		ceph_decode_need(&info_p, info_end, namelen, bad);
   5023
   5024		if (mds_namespace &&
   5025		    strlen(mds_namespace) == namelen &&
   5026		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
   5027			mount_fscid = fscid;
   5028			break;
   5029		}
   5030	}
   5031
   5032	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
   5033	if (mount_fscid != (u32)-1) {
   5034		fsc->client->monc.fs_cluster_id = mount_fscid;
   5035		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
   5036				   0, true);
   5037		ceph_monc_renew_subs(&fsc->client->monc);
   5038	} else {
   5039		err = -ENOENT;
   5040		goto err_out;
   5041	}
   5042	return;
   5043
   5044bad:
   5045	pr_err("error decoding fsmap %d. Shutting down mount.\n", err);
   5046	ceph_umount_begin(mdsc->fsc->sb);
   5047err_out:
   5048	mutex_lock(&mdsc->mutex);
   5049	mdsc->mdsmap_err = err;
   5050	__wake_requests(mdsc, &mdsc->waiting_for_map);
   5051	mutex_unlock(&mdsc->mutex);
   5052}
   5053
   5054/*
   5055 * handle mds map update.
   5056 */
   5057void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
   5058{
   5059	u32 epoch;
   5060	u32 maplen;
   5061	void *p = msg->front.iov_base;
   5062	void *end = p + msg->front.iov_len;
   5063	struct ceph_mdsmap *newmap, *oldmap;
   5064	struct ceph_fsid fsid;
   5065	int err = -EINVAL;
   5066
   5067	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
   5068	ceph_decode_copy(&p, &fsid, sizeof(fsid));
   5069	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
   5070		return;
   5071	epoch = ceph_decode_32(&p);
   5072	maplen = ceph_decode_32(&p);
   5073	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
   5074
   5075	/* do we need it? */
   5076	mutex_lock(&mdsc->mutex);
   5077	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
   5078		dout("handle_map epoch %u <= our %u\n",
   5079		     epoch, mdsc->mdsmap->m_epoch);
   5080		mutex_unlock(&mdsc->mutex);
   5081		return;
   5082	}
   5083
   5084	newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client));
   5085	if (IS_ERR(newmap)) {
   5086		err = PTR_ERR(newmap);
   5087		goto bad_unlock;
   5088	}
   5089
   5090	/* swap into place */
   5091	if (mdsc->mdsmap) {
   5092		oldmap = mdsc->mdsmap;
   5093		mdsc->mdsmap = newmap;
   5094		check_new_map(mdsc, newmap, oldmap);
   5095		ceph_mdsmap_destroy(oldmap);
   5096	} else {
   5097		mdsc->mdsmap = newmap;  /* first mds map */
   5098	}
   5099	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
   5100					MAX_LFS_FILESIZE);
   5101
   5102	__wake_requests(mdsc, &mdsc->waiting_for_map);
   5103	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
   5104			  mdsc->mdsmap->m_epoch);
   5105
   5106	mutex_unlock(&mdsc->mutex);
   5107	schedule_delayed(mdsc, 0);
   5108	return;
   5109
   5110bad_unlock:
   5111	mutex_unlock(&mdsc->mutex);
   5112bad:
   5113	pr_err("error decoding mdsmap %d. Shutting down mount.\n", err);
   5114	ceph_umount_begin(mdsc->fsc->sb);
   5115	return;
   5116}
   5117
   5118static struct ceph_connection *mds_get_con(struct ceph_connection *con)
   5119{
   5120	struct ceph_mds_session *s = con->private;
   5121
   5122	if (ceph_get_mds_session(s))
   5123		return con;
   5124	return NULL;
   5125}
   5126
   5127static void mds_put_con(struct ceph_connection *con)
   5128{
   5129	struct ceph_mds_session *s = con->private;
   5130
   5131	ceph_put_mds_session(s);
   5132}
   5133
   5134/*
   5135 * if the client is unresponsive for long enough, the mds will kill
   5136 * the session entirely.
   5137 */
   5138static void mds_peer_reset(struct ceph_connection *con)
   5139{
   5140	struct ceph_mds_session *s = con->private;
   5141	struct ceph_mds_client *mdsc = s->s_mdsc;
   5142
   5143	pr_warn("mds%d closed our session\n", s->s_mds);
   5144	send_mds_reconnect(mdsc, s);
   5145}
   5146
   5147static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
   5148{
   5149	struct ceph_mds_session *s = con->private;
   5150	struct ceph_mds_client *mdsc = s->s_mdsc;
   5151	int type = le16_to_cpu(msg->hdr.type);
   5152
   5153	mutex_lock(&mdsc->mutex);
   5154	if (__verify_registered_session(mdsc, s) < 0) {
   5155		mutex_unlock(&mdsc->mutex);
   5156		goto out;
   5157	}
   5158	mutex_unlock(&mdsc->mutex);
   5159
   5160	switch (type) {
   5161	case CEPH_MSG_MDS_MAP:
   5162		ceph_mdsc_handle_mdsmap(mdsc, msg);
   5163		break;
   5164	case CEPH_MSG_FS_MAP_USER:
   5165		ceph_mdsc_handle_fsmap(mdsc, msg);
   5166		break;
   5167	case CEPH_MSG_CLIENT_SESSION:
   5168		handle_session(s, msg);
   5169		break;
   5170	case CEPH_MSG_CLIENT_REPLY:
   5171		handle_reply(s, msg);
   5172		break;
   5173	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
   5174		handle_forward(mdsc, s, msg);
   5175		break;
   5176	case CEPH_MSG_CLIENT_CAPS:
   5177		ceph_handle_caps(s, msg);
   5178		break;
   5179	case CEPH_MSG_CLIENT_SNAP:
   5180		ceph_handle_snap(mdsc, s, msg);
   5181		break;
   5182	case CEPH_MSG_CLIENT_LEASE:
   5183		handle_lease(mdsc, s, msg);
   5184		break;
   5185	case CEPH_MSG_CLIENT_QUOTA:
   5186		ceph_handle_quota(mdsc, s, msg);
   5187		break;
   5188
   5189	default:
   5190		pr_err("received unknown message type %d %s\n", type,
   5191		       ceph_msg_type_name(type));
   5192	}
   5193out:
   5194	ceph_msg_put(msg);
   5195}
   5196
   5197/*
   5198 * authentication
   5199 */
   5200
   5201/*
   5202 * Note: returned pointer is the address of a structure that's
   5203 * managed separately.  Caller must *not* attempt to free it.
   5204 */
   5205static struct ceph_auth_handshake *
   5206mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
   5207{
   5208	struct ceph_mds_session *s = con->private;
   5209	struct ceph_mds_client *mdsc = s->s_mdsc;
   5210	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
   5211	struct ceph_auth_handshake *auth = &s->s_auth;
   5212	int ret;
   5213
   5214	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
   5215					 force_new, proto, NULL, NULL);
   5216	if (ret)
   5217		return ERR_PTR(ret);
   5218
   5219	return auth;
   5220}
   5221
   5222static int mds_add_authorizer_challenge(struct ceph_connection *con,
   5223				    void *challenge_buf, int challenge_buf_len)
   5224{
   5225	struct ceph_mds_session *s = con->private;
   5226	struct ceph_mds_client *mdsc = s->s_mdsc;
   5227	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
   5228
   5229	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
   5230					    challenge_buf, challenge_buf_len);
   5231}
   5232
   5233static int mds_verify_authorizer_reply(struct ceph_connection *con)
   5234{
   5235	struct ceph_mds_session *s = con->private;
   5236	struct ceph_mds_client *mdsc = s->s_mdsc;
   5237	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
   5238	struct ceph_auth_handshake *auth = &s->s_auth;
   5239
   5240	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
   5241		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
   5242		NULL, NULL, NULL, NULL);
   5243}
   5244
   5245static int mds_invalidate_authorizer(struct ceph_connection *con)
   5246{
   5247	struct ceph_mds_session *s = con->private;
   5248	struct ceph_mds_client *mdsc = s->s_mdsc;
   5249	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
   5250
   5251	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
   5252
   5253	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
   5254}
   5255
   5256static int mds_get_auth_request(struct ceph_connection *con,
   5257				void *buf, int *buf_len,
   5258				void **authorizer, int *authorizer_len)
   5259{
   5260	struct ceph_mds_session *s = con->private;
   5261	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
   5262	struct ceph_auth_handshake *auth = &s->s_auth;
   5263	int ret;
   5264
   5265	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
   5266				       buf, buf_len);
   5267	if (ret)
   5268		return ret;
   5269
   5270	*authorizer = auth->authorizer_buf;
   5271	*authorizer_len = auth->authorizer_buf_len;
   5272	return 0;
   5273}
   5274
   5275static int mds_handle_auth_reply_more(struct ceph_connection *con,
   5276				      void *reply, int reply_len,
   5277				      void *buf, int *buf_len,
   5278				      void **authorizer, int *authorizer_len)
   5279{
   5280	struct ceph_mds_session *s = con->private;
   5281	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
   5282	struct ceph_auth_handshake *auth = &s->s_auth;
   5283	int ret;
   5284
   5285	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
   5286					      buf, buf_len);
   5287	if (ret)
   5288		return ret;
   5289
   5290	*authorizer = auth->authorizer_buf;
   5291	*authorizer_len = auth->authorizer_buf_len;
   5292	return 0;
   5293}
   5294
   5295static int mds_handle_auth_done(struct ceph_connection *con,
   5296				u64 global_id, void *reply, int reply_len,
   5297				u8 *session_key, int *session_key_len,
   5298				u8 *con_secret, int *con_secret_len)
   5299{
   5300	struct ceph_mds_session *s = con->private;
   5301	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
   5302	struct ceph_auth_handshake *auth = &s->s_auth;
   5303
   5304	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
   5305					       session_key, session_key_len,
   5306					       con_secret, con_secret_len);
   5307}
   5308
   5309static int mds_handle_auth_bad_method(struct ceph_connection *con,
   5310				      int used_proto, int result,
   5311				      const int *allowed_protos, int proto_cnt,
   5312				      const int *allowed_modes, int mode_cnt)
   5313{
   5314	struct ceph_mds_session *s = con->private;
   5315	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
   5316	int ret;
   5317
   5318	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
   5319					    used_proto, result,
   5320					    allowed_protos, proto_cnt,
   5321					    allowed_modes, mode_cnt)) {
   5322		ret = ceph_monc_validate_auth(monc);
   5323		if (ret)
   5324			return ret;
   5325	}
   5326
   5327	return -EACCES;
   5328}
   5329
   5330static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
   5331				struct ceph_msg_header *hdr, int *skip)
   5332{
   5333	struct ceph_msg *msg;
   5334	int type = (int) le16_to_cpu(hdr->type);
   5335	int front_len = (int) le32_to_cpu(hdr->front_len);
   5336
   5337	if (con->in_msg)
   5338		return con->in_msg;
   5339
   5340	*skip = 0;
   5341	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
   5342	if (!msg) {
   5343		pr_err("unable to allocate msg type %d len %d\n",
   5344		       type, front_len);
   5345		return NULL;
   5346	}
   5347
   5348	return msg;
   5349}
   5350
   5351static int mds_sign_message(struct ceph_msg *msg)
   5352{
   5353       struct ceph_mds_session *s = msg->con->private;
   5354       struct ceph_auth_handshake *auth = &s->s_auth;
   5355
   5356       return ceph_auth_sign_message(auth, msg);
   5357}
   5358
   5359static int mds_check_message_signature(struct ceph_msg *msg)
   5360{
   5361       struct ceph_mds_session *s = msg->con->private;
   5362       struct ceph_auth_handshake *auth = &s->s_auth;
   5363
   5364       return ceph_auth_check_message_signature(auth, msg);
   5365}
   5366
   5367static const struct ceph_connection_operations mds_con_ops = {
   5368	.get = mds_get_con,
   5369	.put = mds_put_con,
   5370	.alloc_msg = mds_alloc_msg,
   5371	.dispatch = mds_dispatch,
   5372	.peer_reset = mds_peer_reset,
   5373	.get_authorizer = mds_get_authorizer,
   5374	.add_authorizer_challenge = mds_add_authorizer_challenge,
   5375	.verify_authorizer_reply = mds_verify_authorizer_reply,
   5376	.invalidate_authorizer = mds_invalidate_authorizer,
   5377	.sign_message = mds_sign_message,
   5378	.check_message_signature = mds_check_message_signature,
   5379	.get_auth_request = mds_get_auth_request,
   5380	.handle_auth_reply_more = mds_handle_auth_reply_more,
   5381	.handle_auth_done = mds_handle_auth_done,
   5382	.handle_auth_bad_method = mds_handle_auth_bad_method,
   5383};
   5384
   5385/* eof */