cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

caps.c (130311B)


      1// SPDX-License-Identifier: GPL-2.0
      2#include <linux/ceph/ceph_debug.h>
      3
      4#include <linux/fs.h>
      5#include <linux/kernel.h>
      6#include <linux/sched/signal.h>
      7#include <linux/slab.h>
      8#include <linux/vmalloc.h>
      9#include <linux/wait.h>
     10#include <linux/writeback.h>
     11#include <linux/iversion.h>
     12
     13#include "super.h"
     14#include "mds_client.h"
     15#include "cache.h"
     16#include <linux/ceph/decode.h>
     17#include <linux/ceph/messenger.h>
     18
     19/*
     20 * Capability management
     21 *
     22 * The Ceph metadata servers control client access to inode metadata
     23 * and file data by issuing capabilities, granting clients permission
     24 * to read and/or write both inode field and file data to OSDs
     25 * (storage nodes).  Each capability consists of a set of bits
     26 * indicating which operations are allowed.
     27 *
     28 * If the client holds a *_SHARED cap, the client has a coherent value
     29 * that can be safely read from the cached inode.
     30 *
     31 * In the case of a *_EXCL (exclusive) or FILE_WR capabilities, the
     32 * client is allowed to change inode attributes (e.g., file size,
     33 * mtime), note its dirty state in the ceph_cap, and asynchronously
     34 * flush that metadata change to the MDS.
     35 *
     36 * In the event of a conflicting operation (perhaps by another
     37 * client), the MDS will revoke the conflicting client capabilities.
     38 *
     39 * In order for a client to cache an inode, it must hold a capability
     40 * with at least one MDS server.  When inodes are released, release
     41 * notifications are batched and periodically sent en masse to the MDS
     42 * cluster to release server state.
     43 */
     44
     45static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc);
     46static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
     47				 struct ceph_mds_session *session,
     48				 struct ceph_inode_info *ci,
     49				 u64 oldest_flush_tid);
     50
     51/*
     52 * Generate readable cap strings for debugging output.
     53 */
     54#define MAX_CAP_STR 20
     55static char cap_str[MAX_CAP_STR][40];
     56static DEFINE_SPINLOCK(cap_str_lock);
     57static int last_cap_str;
     58
     59static char *gcap_string(char *s, int c)
     60{
     61	if (c & CEPH_CAP_GSHARED)
     62		*s++ = 's';
     63	if (c & CEPH_CAP_GEXCL)
     64		*s++ = 'x';
     65	if (c & CEPH_CAP_GCACHE)
     66		*s++ = 'c';
     67	if (c & CEPH_CAP_GRD)
     68		*s++ = 'r';
     69	if (c & CEPH_CAP_GWR)
     70		*s++ = 'w';
     71	if (c & CEPH_CAP_GBUFFER)
     72		*s++ = 'b';
     73	if (c & CEPH_CAP_GWREXTEND)
     74		*s++ = 'a';
     75	if (c & CEPH_CAP_GLAZYIO)
     76		*s++ = 'l';
     77	return s;
     78}
     79
     80const char *ceph_cap_string(int caps)
     81{
     82	int i;
     83	char *s;
     84	int c;
     85
     86	spin_lock(&cap_str_lock);
     87	i = last_cap_str++;
     88	if (last_cap_str == MAX_CAP_STR)
     89		last_cap_str = 0;
     90	spin_unlock(&cap_str_lock);
     91
     92	s = cap_str[i];
     93
     94	if (caps & CEPH_CAP_PIN)
     95		*s++ = 'p';
     96
     97	c = (caps >> CEPH_CAP_SAUTH) & 3;
     98	if (c) {
     99		*s++ = 'A';
    100		s = gcap_string(s, c);
    101	}
    102
    103	c = (caps >> CEPH_CAP_SLINK) & 3;
    104	if (c) {
    105		*s++ = 'L';
    106		s = gcap_string(s, c);
    107	}
    108
    109	c = (caps >> CEPH_CAP_SXATTR) & 3;
    110	if (c) {
    111		*s++ = 'X';
    112		s = gcap_string(s, c);
    113	}
    114
    115	c = caps >> CEPH_CAP_SFILE;
    116	if (c) {
    117		*s++ = 'F';
    118		s = gcap_string(s, c);
    119	}
    120
    121	if (s == cap_str[i])
    122		*s++ = '-';
    123	*s = 0;
    124	return cap_str[i];
    125}
    126
    127void ceph_caps_init(struct ceph_mds_client *mdsc)
    128{
    129	INIT_LIST_HEAD(&mdsc->caps_list);
    130	spin_lock_init(&mdsc->caps_list_lock);
    131}
    132
    133void ceph_caps_finalize(struct ceph_mds_client *mdsc)
    134{
    135	struct ceph_cap *cap;
    136
    137	spin_lock(&mdsc->caps_list_lock);
    138	while (!list_empty(&mdsc->caps_list)) {
    139		cap = list_first_entry(&mdsc->caps_list,
    140				       struct ceph_cap, caps_item);
    141		list_del(&cap->caps_item);
    142		kmem_cache_free(ceph_cap_cachep, cap);
    143	}
    144	mdsc->caps_total_count = 0;
    145	mdsc->caps_avail_count = 0;
    146	mdsc->caps_use_count = 0;
    147	mdsc->caps_reserve_count = 0;
    148	mdsc->caps_min_count = 0;
    149	spin_unlock(&mdsc->caps_list_lock);
    150}
    151
    152void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
    153			      struct ceph_mount_options *fsopt)
    154{
    155	spin_lock(&mdsc->caps_list_lock);
    156	mdsc->caps_min_count = fsopt->max_readdir;
    157	if (mdsc->caps_min_count < 1024)
    158		mdsc->caps_min_count = 1024;
    159	mdsc->caps_use_max = fsopt->caps_max;
    160	if (mdsc->caps_use_max > 0 &&
    161	    mdsc->caps_use_max < mdsc->caps_min_count)
    162		mdsc->caps_use_max = mdsc->caps_min_count;
    163	spin_unlock(&mdsc->caps_list_lock);
    164}
    165
    166static void __ceph_unreserve_caps(struct ceph_mds_client *mdsc, int nr_caps)
    167{
    168	struct ceph_cap *cap;
    169	int i;
    170
    171	if (nr_caps) {
    172		BUG_ON(mdsc->caps_reserve_count < nr_caps);
    173		mdsc->caps_reserve_count -= nr_caps;
    174		if (mdsc->caps_avail_count >=
    175		    mdsc->caps_reserve_count + mdsc->caps_min_count) {
    176			mdsc->caps_total_count -= nr_caps;
    177			for (i = 0; i < nr_caps; i++) {
    178				cap = list_first_entry(&mdsc->caps_list,
    179					struct ceph_cap, caps_item);
    180				list_del(&cap->caps_item);
    181				kmem_cache_free(ceph_cap_cachep, cap);
    182			}
    183		} else {
    184			mdsc->caps_avail_count += nr_caps;
    185		}
    186
    187		dout("%s: caps %d = %d used + %d resv + %d avail\n",
    188		     __func__,
    189		     mdsc->caps_total_count, mdsc->caps_use_count,
    190		     mdsc->caps_reserve_count, mdsc->caps_avail_count);
    191		BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
    192						 mdsc->caps_reserve_count +
    193						 mdsc->caps_avail_count);
    194	}
    195}
    196
    197/*
    198 * Called under mdsc->mutex.
    199 */
    200int ceph_reserve_caps(struct ceph_mds_client *mdsc,
    201		      struct ceph_cap_reservation *ctx, int need)
    202{
    203	int i, j;
    204	struct ceph_cap *cap;
    205	int have;
    206	int alloc = 0;
    207	int max_caps;
    208	int err = 0;
    209	bool trimmed = false;
    210	struct ceph_mds_session *s;
    211	LIST_HEAD(newcaps);
    212
    213	dout("reserve caps ctx=%p need=%d\n", ctx, need);
    214
    215	/* first reserve any caps that are already allocated */
    216	spin_lock(&mdsc->caps_list_lock);
    217	if (mdsc->caps_avail_count >= need)
    218		have = need;
    219	else
    220		have = mdsc->caps_avail_count;
    221	mdsc->caps_avail_count -= have;
    222	mdsc->caps_reserve_count += have;
    223	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
    224					 mdsc->caps_reserve_count +
    225					 mdsc->caps_avail_count);
    226	spin_unlock(&mdsc->caps_list_lock);
    227
    228	for (i = have; i < need; ) {
    229		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
    230		if (cap) {
    231			list_add(&cap->caps_item, &newcaps);
    232			alloc++;
    233			i++;
    234			continue;
    235		}
    236
    237		if (!trimmed) {
    238			for (j = 0; j < mdsc->max_sessions; j++) {
    239				s = __ceph_lookup_mds_session(mdsc, j);
    240				if (!s)
    241					continue;
    242				mutex_unlock(&mdsc->mutex);
    243
    244				mutex_lock(&s->s_mutex);
    245				max_caps = s->s_nr_caps - (need - i);
    246				ceph_trim_caps(mdsc, s, max_caps);
    247				mutex_unlock(&s->s_mutex);
    248
    249				ceph_put_mds_session(s);
    250				mutex_lock(&mdsc->mutex);
    251			}
    252			trimmed = true;
    253
    254			spin_lock(&mdsc->caps_list_lock);
    255			if (mdsc->caps_avail_count) {
    256				int more_have;
    257				if (mdsc->caps_avail_count >= need - i)
    258					more_have = need - i;
    259				else
    260					more_have = mdsc->caps_avail_count;
    261
    262				i += more_have;
    263				have += more_have;
    264				mdsc->caps_avail_count -= more_have;
    265				mdsc->caps_reserve_count += more_have;
    266
    267			}
    268			spin_unlock(&mdsc->caps_list_lock);
    269
    270			continue;
    271		}
    272
    273		pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
    274			ctx, need, have + alloc);
    275		err = -ENOMEM;
    276		break;
    277	}
    278
    279	if (!err) {
    280		BUG_ON(have + alloc != need);
    281		ctx->count = need;
    282		ctx->used = 0;
    283	}
    284
    285	spin_lock(&mdsc->caps_list_lock);
    286	mdsc->caps_total_count += alloc;
    287	mdsc->caps_reserve_count += alloc;
    288	list_splice(&newcaps, &mdsc->caps_list);
    289
    290	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
    291					 mdsc->caps_reserve_count +
    292					 mdsc->caps_avail_count);
    293
    294	if (err)
    295		__ceph_unreserve_caps(mdsc, have + alloc);
    296
    297	spin_unlock(&mdsc->caps_list_lock);
    298
    299	dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
    300	     ctx, mdsc->caps_total_count, mdsc->caps_use_count,
    301	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
    302	return err;
    303}
    304
    305void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
    306			 struct ceph_cap_reservation *ctx)
    307{
    308	bool reclaim = false;
    309	if (!ctx->count)
    310		return;
    311
    312	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
    313	spin_lock(&mdsc->caps_list_lock);
    314	__ceph_unreserve_caps(mdsc, ctx->count);
    315	ctx->count = 0;
    316
    317	if (mdsc->caps_use_max > 0 &&
    318	    mdsc->caps_use_count > mdsc->caps_use_max)
    319		reclaim = true;
    320	spin_unlock(&mdsc->caps_list_lock);
    321
    322	if (reclaim)
    323		ceph_reclaim_caps_nr(mdsc, ctx->used);
    324}
    325
    326struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
    327			      struct ceph_cap_reservation *ctx)
    328{
    329	struct ceph_cap *cap = NULL;
    330
    331	/* temporary, until we do something about cap import/export */
    332	if (!ctx) {
    333		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
    334		if (cap) {
    335			spin_lock(&mdsc->caps_list_lock);
    336			mdsc->caps_use_count++;
    337			mdsc->caps_total_count++;
    338			spin_unlock(&mdsc->caps_list_lock);
    339		} else {
    340			spin_lock(&mdsc->caps_list_lock);
    341			if (mdsc->caps_avail_count) {
    342				BUG_ON(list_empty(&mdsc->caps_list));
    343
    344				mdsc->caps_avail_count--;
    345				mdsc->caps_use_count++;
    346				cap = list_first_entry(&mdsc->caps_list,
    347						struct ceph_cap, caps_item);
    348				list_del(&cap->caps_item);
    349
    350				BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
    351				       mdsc->caps_reserve_count + mdsc->caps_avail_count);
    352			}
    353			spin_unlock(&mdsc->caps_list_lock);
    354		}
    355
    356		return cap;
    357	}
    358
    359	spin_lock(&mdsc->caps_list_lock);
    360	dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
    361	     ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
    362	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
    363	BUG_ON(!ctx->count);
    364	BUG_ON(ctx->count > mdsc->caps_reserve_count);
    365	BUG_ON(list_empty(&mdsc->caps_list));
    366
    367	ctx->count--;
    368	ctx->used++;
    369	mdsc->caps_reserve_count--;
    370	mdsc->caps_use_count++;
    371
    372	cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
    373	list_del(&cap->caps_item);
    374
    375	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
    376	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
    377	spin_unlock(&mdsc->caps_list_lock);
    378	return cap;
    379}
    380
    381void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
    382{
    383	spin_lock(&mdsc->caps_list_lock);
    384	dout("put_cap %p %d = %d used + %d resv + %d avail\n",
    385	     cap, mdsc->caps_total_count, mdsc->caps_use_count,
    386	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
    387	mdsc->caps_use_count--;
    388	/*
    389	 * Keep some preallocated caps around (ceph_min_count), to
    390	 * avoid lots of free/alloc churn.
    391	 */
    392	if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
    393				      mdsc->caps_min_count) {
    394		mdsc->caps_total_count--;
    395		kmem_cache_free(ceph_cap_cachep, cap);
    396	} else {
    397		mdsc->caps_avail_count++;
    398		list_add(&cap->caps_item, &mdsc->caps_list);
    399	}
    400
    401	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
    402	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
    403	spin_unlock(&mdsc->caps_list_lock);
    404}
    405
    406void ceph_reservation_status(struct ceph_fs_client *fsc,
    407			     int *total, int *avail, int *used, int *reserved,
    408			     int *min)
    409{
    410	struct ceph_mds_client *mdsc = fsc->mdsc;
    411
    412	spin_lock(&mdsc->caps_list_lock);
    413
    414	if (total)
    415		*total = mdsc->caps_total_count;
    416	if (avail)
    417		*avail = mdsc->caps_avail_count;
    418	if (used)
    419		*used = mdsc->caps_use_count;
    420	if (reserved)
    421		*reserved = mdsc->caps_reserve_count;
    422	if (min)
    423		*min = mdsc->caps_min_count;
    424
    425	spin_unlock(&mdsc->caps_list_lock);
    426}
    427
    428/*
    429 * Find ceph_cap for given mds, if any.
    430 *
    431 * Called with i_ceph_lock held.
    432 */
    433static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
    434{
    435	struct ceph_cap *cap;
    436	struct rb_node *n = ci->i_caps.rb_node;
    437
    438	while (n) {
    439		cap = rb_entry(n, struct ceph_cap, ci_node);
    440		if (mds < cap->mds)
    441			n = n->rb_left;
    442		else if (mds > cap->mds)
    443			n = n->rb_right;
    444		else
    445			return cap;
    446	}
    447	return NULL;
    448}
    449
    450struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
    451{
    452	struct ceph_cap *cap;
    453
    454	spin_lock(&ci->i_ceph_lock);
    455	cap = __get_cap_for_mds(ci, mds);
    456	spin_unlock(&ci->i_ceph_lock);
    457	return cap;
    458}
    459
    460/*
    461 * Called under i_ceph_lock.
    462 */
    463static void __insert_cap_node(struct ceph_inode_info *ci,
    464			      struct ceph_cap *new)
    465{
    466	struct rb_node **p = &ci->i_caps.rb_node;
    467	struct rb_node *parent = NULL;
    468	struct ceph_cap *cap = NULL;
    469
    470	while (*p) {
    471		parent = *p;
    472		cap = rb_entry(parent, struct ceph_cap, ci_node);
    473		if (new->mds < cap->mds)
    474			p = &(*p)->rb_left;
    475		else if (new->mds > cap->mds)
    476			p = &(*p)->rb_right;
    477		else
    478			BUG();
    479	}
    480
    481	rb_link_node(&new->ci_node, parent, p);
    482	rb_insert_color(&new->ci_node, &ci->i_caps);
    483}
    484
    485/*
    486 * (re)set cap hold timeouts, which control the delayed release
    487 * of unused caps back to the MDS.  Should be called on cap use.
    488 */
    489static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
    490			       struct ceph_inode_info *ci)
    491{
    492	struct ceph_mount_options *opt = mdsc->fsc->mount_options;
    493	ci->i_hold_caps_max = round_jiffies(jiffies +
    494					    opt->caps_wanted_delay_max * HZ);
    495	dout("__cap_set_timeouts %p %lu\n", &ci->netfs.inode,
    496	     ci->i_hold_caps_max - jiffies);
    497}
    498
    499/*
    500 * (Re)queue cap at the end of the delayed cap release list.
    501 *
    502 * If I_FLUSH is set, leave the inode at the front of the list.
    503 *
    504 * Caller holds i_ceph_lock
    505 *    -> we take mdsc->cap_delay_lock
    506 */
    507static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
    508				struct ceph_inode_info *ci)
    509{
    510	dout("__cap_delay_requeue %p flags 0x%lx at %lu\n", &ci->netfs.inode,
    511	     ci->i_ceph_flags, ci->i_hold_caps_max);
    512	if (!mdsc->stopping) {
    513		spin_lock(&mdsc->cap_delay_lock);
    514		if (!list_empty(&ci->i_cap_delay_list)) {
    515			if (ci->i_ceph_flags & CEPH_I_FLUSH)
    516				goto no_change;
    517			list_del_init(&ci->i_cap_delay_list);
    518		}
    519		__cap_set_timeouts(mdsc, ci);
    520		list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
    521no_change:
    522		spin_unlock(&mdsc->cap_delay_lock);
    523	}
    524}
    525
    526/*
    527 * Queue an inode for immediate writeback.  Mark inode with I_FLUSH,
    528 * indicating we should send a cap message to flush dirty metadata
    529 * asap, and move to the front of the delayed cap list.
    530 */
    531static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc,
    532				      struct ceph_inode_info *ci)
    533{
    534	dout("__cap_delay_requeue_front %p\n", &ci->netfs.inode);
    535	spin_lock(&mdsc->cap_delay_lock);
    536	ci->i_ceph_flags |= CEPH_I_FLUSH;
    537	if (!list_empty(&ci->i_cap_delay_list))
    538		list_del_init(&ci->i_cap_delay_list);
    539	list_add(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
    540	spin_unlock(&mdsc->cap_delay_lock);
    541}
    542
    543/*
    544 * Cancel delayed work on cap.
    545 *
    546 * Caller must hold i_ceph_lock.
    547 */
    548static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
    549			       struct ceph_inode_info *ci)
    550{
    551	dout("__cap_delay_cancel %p\n", &ci->netfs.inode);
    552	if (list_empty(&ci->i_cap_delay_list))
    553		return;
    554	spin_lock(&mdsc->cap_delay_lock);
    555	list_del_init(&ci->i_cap_delay_list);
    556	spin_unlock(&mdsc->cap_delay_lock);
    557}
    558
    559/* Common issue checks for add_cap, handle_cap_grant. */
    560static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
    561			      unsigned issued)
    562{
    563	unsigned had = __ceph_caps_issued(ci, NULL);
    564
    565	lockdep_assert_held(&ci->i_ceph_lock);
    566
    567	/*
    568	 * Each time we receive FILE_CACHE anew, we increment
    569	 * i_rdcache_gen.
    570	 */
    571	if (S_ISREG(ci->netfs.inode.i_mode) &&
    572	    (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
    573	    (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
    574		ci->i_rdcache_gen++;
    575	}
    576
    577	/*
    578	 * If FILE_SHARED is newly issued, mark dir not complete. We don't
    579	 * know what happened to this directory while we didn't have the cap.
    580	 * If FILE_SHARED is being revoked, also mark dir not complete. It
    581	 * stops on-going cached readdir.
    582	 */
    583	if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
    584		if (issued & CEPH_CAP_FILE_SHARED)
    585			atomic_inc(&ci->i_shared_gen);
    586		if (S_ISDIR(ci->netfs.inode.i_mode)) {
    587			dout(" marking %p NOT complete\n", &ci->netfs.inode);
    588			__ceph_dir_clear_complete(ci);
    589		}
    590	}
    591
    592	/* Wipe saved layout if we're losing DIR_CREATE caps */
    593	if (S_ISDIR(ci->netfs.inode.i_mode) && (had & CEPH_CAP_DIR_CREATE) &&
    594		!(issued & CEPH_CAP_DIR_CREATE)) {
    595	     ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
    596	     memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
    597	}
    598}
    599
    600/**
    601 * change_auth_cap_ses - move inode to appropriate lists when auth caps change
    602 * @ci: inode to be moved
    603 * @session: new auth caps session
    604 */
    605static void change_auth_cap_ses(struct ceph_inode_info *ci,
    606				struct ceph_mds_session *session)
    607{
    608	lockdep_assert_held(&ci->i_ceph_lock);
    609
    610	if (list_empty(&ci->i_dirty_item) && list_empty(&ci->i_flushing_item))
    611		return;
    612
    613	spin_lock(&session->s_mdsc->cap_dirty_lock);
    614	if (!list_empty(&ci->i_dirty_item))
    615		list_move(&ci->i_dirty_item, &session->s_cap_dirty);
    616	if (!list_empty(&ci->i_flushing_item))
    617		list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
    618	spin_unlock(&session->s_mdsc->cap_dirty_lock);
    619}
    620
    621/*
    622 * Add a capability under the given MDS session.
    623 *
    624 * Caller should hold session snap_rwsem (read) and ci->i_ceph_lock
    625 *
    626 * @fmode is the open file mode, if we are opening a file, otherwise
    627 * it is < 0.  (This is so we can atomically add the cap and add an
    628 * open file reference to it.)
    629 */
    630void ceph_add_cap(struct inode *inode,
    631		  struct ceph_mds_session *session, u64 cap_id,
    632		  unsigned issued, unsigned wanted,
    633		  unsigned seq, unsigned mseq, u64 realmino, int flags,
    634		  struct ceph_cap **new_cap)
    635{
    636	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
    637	struct ceph_inode_info *ci = ceph_inode(inode);
    638	struct ceph_cap *cap;
    639	int mds = session->s_mds;
    640	int actual_wanted;
    641	u32 gen;
    642
    643	lockdep_assert_held(&ci->i_ceph_lock);
    644
    645	dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
    646	     session->s_mds, cap_id, ceph_cap_string(issued), seq);
    647
    648	gen = atomic_read(&session->s_cap_gen);
    649
    650	cap = __get_cap_for_mds(ci, mds);
    651	if (!cap) {
    652		cap = *new_cap;
    653		*new_cap = NULL;
    654
    655		cap->issued = 0;
    656		cap->implemented = 0;
    657		cap->mds = mds;
    658		cap->mds_wanted = 0;
    659		cap->mseq = 0;
    660
    661		cap->ci = ci;
    662		__insert_cap_node(ci, cap);
    663
    664		/* add to session cap list */
    665		cap->session = session;
    666		spin_lock(&session->s_cap_lock);
    667		list_add_tail(&cap->session_caps, &session->s_caps);
    668		session->s_nr_caps++;
    669		atomic64_inc(&mdsc->metric.total_caps);
    670		spin_unlock(&session->s_cap_lock);
    671	} else {
    672		spin_lock(&session->s_cap_lock);
    673		list_move_tail(&cap->session_caps, &session->s_caps);
    674		spin_unlock(&session->s_cap_lock);
    675
    676		if (cap->cap_gen < gen)
    677			cap->issued = cap->implemented = CEPH_CAP_PIN;
    678
    679		/*
    680		 * auth mds of the inode changed. we received the cap export
    681		 * message, but still haven't received the cap import message.
    682		 * handle_cap_export() updated the new auth MDS' cap.
    683		 *
    684		 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
    685		 * a message that was send before the cap import message. So
    686		 * don't remove caps.
    687		 */
    688		if (ceph_seq_cmp(seq, cap->seq) <= 0) {
    689			WARN_ON(cap != ci->i_auth_cap);
    690			WARN_ON(cap->cap_id != cap_id);
    691			seq = cap->seq;
    692			mseq = cap->mseq;
    693			issued |= cap->issued;
    694			flags |= CEPH_CAP_FLAG_AUTH;
    695		}
    696	}
    697
    698	if (!ci->i_snap_realm ||
    699	    ((flags & CEPH_CAP_FLAG_AUTH) &&
    700	     realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) {
    701		/*
    702		 * add this inode to the appropriate snap realm
    703		 */
    704		struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
    705							       realmino);
    706		if (realm)
    707			ceph_change_snap_realm(inode, realm);
    708		else
    709			WARN(1, "%s: couldn't find snap realm 0x%llx (ino 0x%llx oldrealm 0x%llx)\n",
    710			     __func__, realmino, ci->i_vino.ino,
    711			     ci->i_snap_realm ? ci->i_snap_realm->ino : 0);
    712	}
    713
    714	__check_cap_issue(ci, cap, issued);
    715
    716	/*
    717	 * If we are issued caps we don't want, or the mds' wanted
    718	 * value appears to be off, queue a check so we'll release
    719	 * later and/or update the mds wanted value.
    720	 */
    721	actual_wanted = __ceph_caps_wanted(ci);
    722	if ((wanted & ~actual_wanted) ||
    723	    (issued & ~actual_wanted & CEPH_CAP_ANY_WR)) {
    724		dout(" issued %s, mds wanted %s, actual %s, queueing\n",
    725		     ceph_cap_string(issued), ceph_cap_string(wanted),
    726		     ceph_cap_string(actual_wanted));
    727		__cap_delay_requeue(mdsc, ci);
    728	}
    729
    730	if (flags & CEPH_CAP_FLAG_AUTH) {
    731		if (!ci->i_auth_cap ||
    732		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
    733			if (ci->i_auth_cap &&
    734			    ci->i_auth_cap->session != cap->session)
    735				change_auth_cap_ses(ci, cap->session);
    736			ci->i_auth_cap = cap;
    737			cap->mds_wanted = wanted;
    738		}
    739	} else {
    740		WARN_ON(ci->i_auth_cap == cap);
    741	}
    742
    743	dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
    744	     inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
    745	     ceph_cap_string(issued|cap->issued), seq, mds);
    746	cap->cap_id = cap_id;
    747	cap->issued = issued;
    748	cap->implemented |= issued;
    749	if (ceph_seq_cmp(mseq, cap->mseq) > 0)
    750		cap->mds_wanted = wanted;
    751	else
    752		cap->mds_wanted |= wanted;
    753	cap->seq = seq;
    754	cap->issue_seq = seq;
    755	cap->mseq = mseq;
    756	cap->cap_gen = gen;
    757}
    758
    759/*
    760 * Return true if cap has not timed out and belongs to the current
    761 * generation of the MDS session (i.e. has not gone 'stale' due to
    762 * us losing touch with the mds).
    763 */
    764static int __cap_is_valid(struct ceph_cap *cap)
    765{
    766	unsigned long ttl;
    767	u32 gen;
    768
    769	gen = atomic_read(&cap->session->s_cap_gen);
    770	ttl = cap->session->s_cap_ttl;
    771
    772	if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
    773		dout("__cap_is_valid %p cap %p issued %s "
    774		     "but STALE (gen %u vs %u)\n", &cap->ci->netfs.inode,
    775		     cap, ceph_cap_string(cap->issued), cap->cap_gen, gen);
    776		return 0;
    777	}
    778
    779	return 1;
    780}
    781
    782/*
    783 * Return set of valid cap bits issued to us.  Note that caps time
    784 * out, and may be invalidated in bulk if the client session times out
    785 * and session->s_cap_gen is bumped.
    786 */
    787int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
    788{
    789	int have = ci->i_snap_caps;
    790	struct ceph_cap *cap;
    791	struct rb_node *p;
    792
    793	if (implemented)
    794		*implemented = 0;
    795	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
    796		cap = rb_entry(p, struct ceph_cap, ci_node);
    797		if (!__cap_is_valid(cap))
    798			continue;
    799		dout("__ceph_caps_issued %p cap %p issued %s\n",
    800		     &ci->netfs.inode, cap, ceph_cap_string(cap->issued));
    801		have |= cap->issued;
    802		if (implemented)
    803			*implemented |= cap->implemented;
    804	}
    805	/*
    806	 * exclude caps issued by non-auth MDS, but are been revoking
    807	 * by the auth MDS. The non-auth MDS should be revoking/exporting
    808	 * these caps, but the message is delayed.
    809	 */
    810	if (ci->i_auth_cap) {
    811		cap = ci->i_auth_cap;
    812		have &= ~cap->implemented | cap->issued;
    813	}
    814	return have;
    815}
    816
    817/*
    818 * Get cap bits issued by caps other than @ocap
    819 */
    820int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap)
    821{
    822	int have = ci->i_snap_caps;
    823	struct ceph_cap *cap;
    824	struct rb_node *p;
    825
    826	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
    827		cap = rb_entry(p, struct ceph_cap, ci_node);
    828		if (cap == ocap)
    829			continue;
    830		if (!__cap_is_valid(cap))
    831			continue;
    832		have |= cap->issued;
    833	}
    834	return have;
    835}
    836
    837/*
    838 * Move a cap to the end of the LRU (oldest caps at list head, newest
    839 * at list tail).
    840 */
    841static void __touch_cap(struct ceph_cap *cap)
    842{
    843	struct ceph_mds_session *s = cap->session;
    844
    845	spin_lock(&s->s_cap_lock);
    846	if (!s->s_cap_iterator) {
    847		dout("__touch_cap %p cap %p mds%d\n", &cap->ci->netfs.inode, cap,
    848		     s->s_mds);
    849		list_move_tail(&cap->session_caps, &s->s_caps);
    850	} else {
    851		dout("__touch_cap %p cap %p mds%d NOP, iterating over caps\n",
    852		     &cap->ci->netfs.inode, cap, s->s_mds);
    853	}
    854	spin_unlock(&s->s_cap_lock);
    855}
    856
    857/*
    858 * Check if we hold the given mask.  If so, move the cap(s) to the
    859 * front of their respective LRUs.  (This is the preferred way for
    860 * callers to check for caps they want.)
    861 */
    862int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
    863{
    864	struct ceph_cap *cap;
    865	struct rb_node *p;
    866	int have = ci->i_snap_caps;
    867
    868	if ((have & mask) == mask) {
    869		dout("__ceph_caps_issued_mask ino 0x%llx snap issued %s"
    870		     " (mask %s)\n", ceph_ino(&ci->netfs.inode),
    871		     ceph_cap_string(have),
    872		     ceph_cap_string(mask));
    873		return 1;
    874	}
    875
    876	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
    877		cap = rb_entry(p, struct ceph_cap, ci_node);
    878		if (!__cap_is_valid(cap))
    879			continue;
    880		if ((cap->issued & mask) == mask) {
    881			dout("__ceph_caps_issued_mask ino 0x%llx cap %p issued %s"
    882			     " (mask %s)\n", ceph_ino(&ci->netfs.inode), cap,
    883			     ceph_cap_string(cap->issued),
    884			     ceph_cap_string(mask));
    885			if (touch)
    886				__touch_cap(cap);
    887			return 1;
    888		}
    889
    890		/* does a combination of caps satisfy mask? */
    891		have |= cap->issued;
    892		if ((have & mask) == mask) {
    893			dout("__ceph_caps_issued_mask ino 0x%llx combo issued %s"
    894			     " (mask %s)\n", ceph_ino(&ci->netfs.inode),
    895			     ceph_cap_string(cap->issued),
    896			     ceph_cap_string(mask));
    897			if (touch) {
    898				struct rb_node *q;
    899
    900				/* touch this + preceding caps */
    901				__touch_cap(cap);
    902				for (q = rb_first(&ci->i_caps); q != p;
    903				     q = rb_next(q)) {
    904					cap = rb_entry(q, struct ceph_cap,
    905						       ci_node);
    906					if (!__cap_is_valid(cap))
    907						continue;
    908					if (cap->issued & mask)
    909						__touch_cap(cap);
    910				}
    911			}
    912			return 1;
    913		}
    914	}
    915
    916	return 0;
    917}
    918
    919int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
    920				   int touch)
    921{
    922	struct ceph_fs_client *fsc = ceph_sb_to_client(ci->netfs.inode.i_sb);
    923	int r;
    924
    925	r = __ceph_caps_issued_mask(ci, mask, touch);
    926	if (r)
    927		ceph_update_cap_hit(&fsc->mdsc->metric);
    928	else
    929		ceph_update_cap_mis(&fsc->mdsc->metric);
    930	return r;
    931}
    932
    933/*
    934 * Return true if mask caps are currently being revoked by an MDS.
    935 */
    936int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
    937			       struct ceph_cap *ocap, int mask)
    938{
    939	struct ceph_cap *cap;
    940	struct rb_node *p;
    941
    942	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
    943		cap = rb_entry(p, struct ceph_cap, ci_node);
    944		if (cap != ocap &&
    945		    (cap->implemented & ~cap->issued & mask))
    946			return 1;
    947	}
    948	return 0;
    949}
    950
    951int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
    952{
    953	struct inode *inode = &ci->netfs.inode;
    954	int ret;
    955
    956	spin_lock(&ci->i_ceph_lock);
    957	ret = __ceph_caps_revoking_other(ci, NULL, mask);
    958	spin_unlock(&ci->i_ceph_lock);
    959	dout("ceph_caps_revoking %p %s = %d\n", inode,
    960	     ceph_cap_string(mask), ret);
    961	return ret;
    962}
    963
    964int __ceph_caps_used(struct ceph_inode_info *ci)
    965{
    966	int used = 0;
    967	if (ci->i_pin_ref)
    968		used |= CEPH_CAP_PIN;
    969	if (ci->i_rd_ref)
    970		used |= CEPH_CAP_FILE_RD;
    971	if (ci->i_rdcache_ref ||
    972	    (S_ISREG(ci->netfs.inode.i_mode) &&
    973	     ci->netfs.inode.i_data.nrpages))
    974		used |= CEPH_CAP_FILE_CACHE;
    975	if (ci->i_wr_ref)
    976		used |= CEPH_CAP_FILE_WR;
    977	if (ci->i_wb_ref || ci->i_wrbuffer_ref)
    978		used |= CEPH_CAP_FILE_BUFFER;
    979	if (ci->i_fx_ref)
    980		used |= CEPH_CAP_FILE_EXCL;
    981	return used;
    982}
    983
    984#define FMODE_WAIT_BIAS 1000
    985
    986/*
    987 * wanted, by virtue of open file modes
    988 */
    989int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
    990{
    991	const int PIN_SHIFT = ffs(CEPH_FILE_MODE_PIN);
    992	const int RD_SHIFT = ffs(CEPH_FILE_MODE_RD);
    993	const int WR_SHIFT = ffs(CEPH_FILE_MODE_WR);
    994	const int LAZY_SHIFT = ffs(CEPH_FILE_MODE_LAZY);
    995	struct ceph_mount_options *opt =
    996		ceph_inode_to_client(&ci->netfs.inode)->mount_options;
    997	unsigned long used_cutoff = jiffies - opt->caps_wanted_delay_max * HZ;
    998	unsigned long idle_cutoff = jiffies - opt->caps_wanted_delay_min * HZ;
    999
   1000	if (S_ISDIR(ci->netfs.inode.i_mode)) {
   1001		int want = 0;
   1002
   1003		/* use used_cutoff here, to keep dir's wanted caps longer */
   1004		if (ci->i_nr_by_mode[RD_SHIFT] > 0 ||
   1005		    time_after(ci->i_last_rd, used_cutoff))
   1006			want |= CEPH_CAP_ANY_SHARED;
   1007
   1008		if (ci->i_nr_by_mode[WR_SHIFT] > 0 ||
   1009		    time_after(ci->i_last_wr, used_cutoff)) {
   1010			want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
   1011			if (opt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
   1012				want |= CEPH_CAP_ANY_DIR_OPS;
   1013		}
   1014
   1015		if (want || ci->i_nr_by_mode[PIN_SHIFT] > 0)
   1016			want |= CEPH_CAP_PIN;
   1017
   1018		return want;
   1019	} else {
   1020		int bits = 0;
   1021
   1022		if (ci->i_nr_by_mode[RD_SHIFT] > 0) {
   1023			if (ci->i_nr_by_mode[RD_SHIFT] >= FMODE_WAIT_BIAS ||
   1024			    time_after(ci->i_last_rd, used_cutoff))
   1025				bits |= 1 << RD_SHIFT;
   1026		} else if (time_after(ci->i_last_rd, idle_cutoff)) {
   1027			bits |= 1 << RD_SHIFT;
   1028		}
   1029
   1030		if (ci->i_nr_by_mode[WR_SHIFT] > 0) {
   1031			if (ci->i_nr_by_mode[WR_SHIFT] >= FMODE_WAIT_BIAS ||
   1032			    time_after(ci->i_last_wr, used_cutoff))
   1033				bits |= 1 << WR_SHIFT;
   1034		} else if (time_after(ci->i_last_wr, idle_cutoff)) {
   1035			bits |= 1 << WR_SHIFT;
   1036		}
   1037
   1038		/* check lazyio only when read/write is wanted */
   1039		if ((bits & (CEPH_FILE_MODE_RDWR << 1)) &&
   1040		    ci->i_nr_by_mode[LAZY_SHIFT] > 0)
   1041			bits |= 1 << LAZY_SHIFT;
   1042
   1043		return bits ? ceph_caps_for_mode(bits >> 1) : 0;
   1044	}
   1045}
   1046
   1047/*
   1048 * wanted, by virtue of open file modes AND cap refs (buffered/cached data)
   1049 */
   1050int __ceph_caps_wanted(struct ceph_inode_info *ci)
   1051{
   1052	int w = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);
   1053	if (S_ISDIR(ci->netfs.inode.i_mode)) {
   1054		/* we want EXCL if holding caps of dir ops */
   1055		if (w & CEPH_CAP_ANY_DIR_OPS)
   1056			w |= CEPH_CAP_FILE_EXCL;
   1057	} else {
   1058		/* we want EXCL if dirty data */
   1059		if (w & CEPH_CAP_FILE_BUFFER)
   1060			w |= CEPH_CAP_FILE_EXCL;
   1061	}
   1062	return w;
   1063}
   1064
   1065/*
   1066 * Return caps we have registered with the MDS(s) as 'wanted'.
   1067 */
   1068int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
   1069{
   1070	struct ceph_cap *cap;
   1071	struct rb_node *p;
   1072	int mds_wanted = 0;
   1073
   1074	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
   1075		cap = rb_entry(p, struct ceph_cap, ci_node);
   1076		if (check && !__cap_is_valid(cap))
   1077			continue;
   1078		if (cap == ci->i_auth_cap)
   1079			mds_wanted |= cap->mds_wanted;
   1080		else
   1081			mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR);
   1082	}
   1083	return mds_wanted;
   1084}
   1085
   1086int ceph_is_any_caps(struct inode *inode)
   1087{
   1088	struct ceph_inode_info *ci = ceph_inode(inode);
   1089	int ret;
   1090
   1091	spin_lock(&ci->i_ceph_lock);
   1092	ret = __ceph_is_any_real_caps(ci);
   1093	spin_unlock(&ci->i_ceph_lock);
   1094
   1095	return ret;
   1096}
   1097
   1098/*
   1099 * Remove a cap.  Take steps to deal with a racing iterate_session_caps.
   1100 *
   1101 * caller should hold i_ceph_lock.
   1102 * caller will not hold session s_mutex if called from destroy_inode.
   1103 */
   1104void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
   1105{
   1106	struct ceph_mds_session *session = cap->session;
   1107	struct ceph_inode_info *ci = cap->ci;
   1108	struct ceph_mds_client *mdsc;
   1109	int removed = 0;
   1110
   1111	/* 'ci' being NULL means the remove have already occurred */
   1112	if (!ci) {
   1113		dout("%s: cap inode is NULL\n", __func__);
   1114		return;
   1115	}
   1116
   1117	lockdep_assert_held(&ci->i_ceph_lock);
   1118
   1119	dout("__ceph_remove_cap %p from %p\n", cap, &ci->netfs.inode);
   1120
   1121	mdsc = ceph_inode_to_client(&ci->netfs.inode)->mdsc;
   1122
   1123	/* remove from inode's cap rbtree, and clear auth cap */
   1124	rb_erase(&cap->ci_node, &ci->i_caps);
   1125	if (ci->i_auth_cap == cap)
   1126		ci->i_auth_cap = NULL;
   1127
   1128	/* remove from session list */
   1129	spin_lock(&session->s_cap_lock);
   1130	if (session->s_cap_iterator == cap) {
   1131		/* not yet, we are iterating over this very cap */
   1132		dout("__ceph_remove_cap  delaying %p removal from session %p\n",
   1133		     cap, cap->session);
   1134	} else {
   1135		list_del_init(&cap->session_caps);
   1136		session->s_nr_caps--;
   1137		atomic64_dec(&mdsc->metric.total_caps);
   1138		cap->session = NULL;
   1139		removed = 1;
   1140	}
   1141	/* protect backpointer with s_cap_lock: see iterate_session_caps */
   1142	cap->ci = NULL;
   1143
   1144	/*
   1145	 * s_cap_reconnect is protected by s_cap_lock. no one changes
   1146	 * s_cap_gen while session is in the reconnect state.
   1147	 */
   1148	if (queue_release &&
   1149	    (!session->s_cap_reconnect ||
   1150	     cap->cap_gen == atomic_read(&session->s_cap_gen))) {
   1151		cap->queue_release = 1;
   1152		if (removed) {
   1153			__ceph_queue_cap_release(session, cap);
   1154			removed = 0;
   1155		}
   1156	} else {
   1157		cap->queue_release = 0;
   1158	}
   1159	cap->cap_ino = ci->i_vino.ino;
   1160
   1161	spin_unlock(&session->s_cap_lock);
   1162
   1163	if (removed)
   1164		ceph_put_cap(mdsc, cap);
   1165
   1166	if (!__ceph_is_any_real_caps(ci)) {
   1167		/* when reconnect denied, we remove session caps forcibly,
   1168		 * i_wr_ref can be non-zero. If there are ongoing write,
   1169		 * keep i_snap_realm.
   1170		 */
   1171		if (ci->i_wr_ref == 0 && ci->i_snap_realm)
   1172			ceph_change_snap_realm(&ci->netfs.inode, NULL);
   1173
   1174		__cap_delay_cancel(mdsc, ci);
   1175	}
   1176}
   1177
   1178void ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
   1179{
   1180	struct ceph_inode_info *ci = cap->ci;
   1181	struct ceph_fs_client *fsc;
   1182
   1183	/* 'ci' being NULL means the remove have already occurred */
   1184	if (!ci) {
   1185		dout("%s: cap inode is NULL\n", __func__);
   1186		return;
   1187	}
   1188
   1189	lockdep_assert_held(&ci->i_ceph_lock);
   1190
   1191	fsc = ceph_inode_to_client(&ci->netfs.inode);
   1192	WARN_ON_ONCE(ci->i_auth_cap == cap &&
   1193		     !list_empty(&ci->i_dirty_item) &&
   1194		     !fsc->blocklisted &&
   1195		     !ceph_inode_is_shutdown(&ci->netfs.inode));
   1196
   1197	__ceph_remove_cap(cap, queue_release);
   1198}
   1199
   1200struct cap_msg_args {
   1201	struct ceph_mds_session	*session;
   1202	u64			ino, cid, follows;
   1203	u64			flush_tid, oldest_flush_tid, size, max_size;
   1204	u64			xattr_version;
   1205	u64			change_attr;
   1206	struct ceph_buffer	*xattr_buf;
   1207	struct ceph_buffer	*old_xattr_buf;
   1208	struct timespec64	atime, mtime, ctime, btime;
   1209	int			op, caps, wanted, dirty;
   1210	u32			seq, issue_seq, mseq, time_warp_seq;
   1211	u32			flags;
   1212	kuid_t			uid;
   1213	kgid_t			gid;
   1214	umode_t			mode;
   1215	bool			inline_data;
   1216	bool			wake;
   1217};
   1218
   1219/*
   1220 * cap struct size + flock buffer size + inline version + inline data size +
   1221 * osd_epoch_barrier + oldest_flush_tid
   1222 */
   1223#define CAP_MSG_SIZE (sizeof(struct ceph_mds_caps) + \
   1224		      4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4)
   1225
   1226/* Marshal up the cap msg to the MDS */
   1227static void encode_cap_msg(struct ceph_msg *msg, struct cap_msg_args *arg)
   1228{
   1229	struct ceph_mds_caps *fc;
   1230	void *p;
   1231	struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc;
   1232
   1233	dout("%s %s %llx %llx caps %s wanted %s dirty %s seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu xattr_ver %llu xattr_len %d\n",
   1234	     __func__, ceph_cap_op_name(arg->op), arg->cid, arg->ino,
   1235	     ceph_cap_string(arg->caps), ceph_cap_string(arg->wanted),
   1236	     ceph_cap_string(arg->dirty), arg->seq, arg->issue_seq,
   1237	     arg->flush_tid, arg->oldest_flush_tid, arg->mseq, arg->follows,
   1238	     arg->size, arg->max_size, arg->xattr_version,
   1239	     arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0);
   1240
   1241	msg->hdr.version = cpu_to_le16(10);
   1242	msg->hdr.tid = cpu_to_le64(arg->flush_tid);
   1243
   1244	fc = msg->front.iov_base;
   1245	memset(fc, 0, sizeof(*fc));
   1246
   1247	fc->cap_id = cpu_to_le64(arg->cid);
   1248	fc->op = cpu_to_le32(arg->op);
   1249	fc->seq = cpu_to_le32(arg->seq);
   1250	fc->issue_seq = cpu_to_le32(arg->issue_seq);
   1251	fc->migrate_seq = cpu_to_le32(arg->mseq);
   1252	fc->caps = cpu_to_le32(arg->caps);
   1253	fc->wanted = cpu_to_le32(arg->wanted);
   1254	fc->dirty = cpu_to_le32(arg->dirty);
   1255	fc->ino = cpu_to_le64(arg->ino);
   1256	fc->snap_follows = cpu_to_le64(arg->follows);
   1257
   1258	fc->size = cpu_to_le64(arg->size);
   1259	fc->max_size = cpu_to_le64(arg->max_size);
   1260	ceph_encode_timespec64(&fc->mtime, &arg->mtime);
   1261	ceph_encode_timespec64(&fc->atime, &arg->atime);
   1262	ceph_encode_timespec64(&fc->ctime, &arg->ctime);
   1263	fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq);
   1264
   1265	fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid));
   1266	fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid));
   1267	fc->mode = cpu_to_le32(arg->mode);
   1268
   1269	fc->xattr_version = cpu_to_le64(arg->xattr_version);
   1270	if (arg->xattr_buf) {
   1271		msg->middle = ceph_buffer_get(arg->xattr_buf);
   1272		fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
   1273		msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
   1274	}
   1275
   1276	p = fc + 1;
   1277	/* flock buffer size (version 2) */
   1278	ceph_encode_32(&p, 0);
   1279	/* inline version (version 4) */
   1280	ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
   1281	/* inline data size */
   1282	ceph_encode_32(&p, 0);
   1283	/*
   1284	 * osd_epoch_barrier (version 5)
   1285	 * The epoch_barrier is protected osdc->lock, so READ_ONCE here in
   1286	 * case it was recently changed
   1287	 */
   1288	ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier));
   1289	/* oldest_flush_tid (version 6) */
   1290	ceph_encode_64(&p, arg->oldest_flush_tid);
   1291
   1292	/*
   1293	 * caller_uid/caller_gid (version 7)
   1294	 *
   1295	 * Currently, we don't properly track which caller dirtied the caps
   1296	 * last, and force a flush of them when there is a conflict. For now,
   1297	 * just set this to 0:0, to emulate how the MDS has worked up to now.
   1298	 */
   1299	ceph_encode_32(&p, 0);
   1300	ceph_encode_32(&p, 0);
   1301
   1302	/* pool namespace (version 8) (mds always ignores this) */
   1303	ceph_encode_32(&p, 0);
   1304
   1305	/* btime and change_attr (version 9) */
   1306	ceph_encode_timespec64(p, &arg->btime);
   1307	p += sizeof(struct ceph_timespec);
   1308	ceph_encode_64(&p, arg->change_attr);
   1309
   1310	/* Advisory flags (version 10) */
   1311	ceph_encode_32(&p, arg->flags);
   1312}
   1313
   1314/*
   1315 * Queue cap releases when an inode is dropped from our cache.
   1316 */
   1317void __ceph_remove_caps(struct ceph_inode_info *ci)
   1318{
   1319	struct rb_node *p;
   1320
   1321	/* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU)
   1322	 * may call __ceph_caps_issued_mask() on a freeing inode. */
   1323	spin_lock(&ci->i_ceph_lock);
   1324	p = rb_first(&ci->i_caps);
   1325	while (p) {
   1326		struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
   1327		p = rb_next(p);
   1328		ceph_remove_cap(cap, true);
   1329	}
   1330	spin_unlock(&ci->i_ceph_lock);
   1331}
   1332
   1333/*
   1334 * Prepare to send a cap message to an MDS. Update the cap state, and populate
   1335 * the arg struct with the parameters that will need to be sent. This should
   1336 * be done under the i_ceph_lock to guard against changes to cap state.
   1337 *
   1338 * Make note of max_size reported/requested from mds, revoked caps
   1339 * that have now been implemented.
   1340 */
   1341static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap,
   1342		       int op, int flags, int used, int want, int retain,
   1343		       int flushing, u64 flush_tid, u64 oldest_flush_tid)
   1344{
   1345	struct ceph_inode_info *ci = cap->ci;
   1346	struct inode *inode = &ci->netfs.inode;
   1347	int held, revoking;
   1348
   1349	lockdep_assert_held(&ci->i_ceph_lock);
   1350
   1351	held = cap->issued | cap->implemented;
   1352	revoking = cap->implemented & ~cap->issued;
   1353	retain &= ~revoking;
   1354
   1355	dout("%s %p cap %p session %p %s -> %s (revoking %s)\n",
   1356	     __func__, inode, cap, cap->session,
   1357	     ceph_cap_string(held), ceph_cap_string(held & retain),
   1358	     ceph_cap_string(revoking));
   1359	BUG_ON((retain & CEPH_CAP_PIN) == 0);
   1360
   1361	ci->i_ceph_flags &= ~CEPH_I_FLUSH;
   1362
   1363	cap->issued &= retain;  /* drop bits we don't want */
   1364	/*
   1365	 * Wake up any waiters on wanted -> needed transition. This is due to
   1366	 * the weird transition from buffered to sync IO... we need to flush
   1367	 * dirty pages _before_ allowing sync writes to avoid reordering.
   1368	 */
   1369	arg->wake = cap->implemented & ~cap->issued;
   1370	cap->implemented &= cap->issued | used;
   1371	cap->mds_wanted = want;
   1372
   1373	arg->session = cap->session;
   1374	arg->ino = ceph_vino(inode).ino;
   1375	arg->cid = cap->cap_id;
   1376	arg->follows = flushing ? ci->i_head_snapc->seq : 0;
   1377	arg->flush_tid = flush_tid;
   1378	arg->oldest_flush_tid = oldest_flush_tid;
   1379
   1380	arg->size = i_size_read(inode);
   1381	ci->i_reported_size = arg->size;
   1382	arg->max_size = ci->i_wanted_max_size;
   1383	if (cap == ci->i_auth_cap) {
   1384		if (want & CEPH_CAP_ANY_FILE_WR)
   1385			ci->i_requested_max_size = arg->max_size;
   1386		else
   1387			ci->i_requested_max_size = 0;
   1388	}
   1389
   1390	if (flushing & CEPH_CAP_XATTR_EXCL) {
   1391		arg->old_xattr_buf = __ceph_build_xattrs_blob(ci);
   1392		arg->xattr_version = ci->i_xattrs.version;
   1393		arg->xattr_buf = ci->i_xattrs.blob;
   1394	} else {
   1395		arg->xattr_buf = NULL;
   1396		arg->old_xattr_buf = NULL;
   1397	}
   1398
   1399	arg->mtime = inode->i_mtime;
   1400	arg->atime = inode->i_atime;
   1401	arg->ctime = inode->i_ctime;
   1402	arg->btime = ci->i_btime;
   1403	arg->change_attr = inode_peek_iversion_raw(inode);
   1404
   1405	arg->op = op;
   1406	arg->caps = cap->implemented;
   1407	arg->wanted = want;
   1408	arg->dirty = flushing;
   1409
   1410	arg->seq = cap->seq;
   1411	arg->issue_seq = cap->issue_seq;
   1412	arg->mseq = cap->mseq;
   1413	arg->time_warp_seq = ci->i_time_warp_seq;
   1414
   1415	arg->uid = inode->i_uid;
   1416	arg->gid = inode->i_gid;
   1417	arg->mode = inode->i_mode;
   1418
   1419	arg->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
   1420	if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) &&
   1421	    !list_empty(&ci->i_cap_snaps)) {
   1422		struct ceph_cap_snap *capsnap;
   1423		list_for_each_entry_reverse(capsnap, &ci->i_cap_snaps, ci_item) {
   1424			if (capsnap->cap_flush.tid)
   1425				break;
   1426			if (capsnap->need_flush) {
   1427				flags |= CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
   1428				break;
   1429			}
   1430		}
   1431	}
   1432	arg->flags = flags;
   1433}
   1434
   1435/*
   1436 * Send a cap msg on the given inode.
   1437 *
   1438 * Caller should hold snap_rwsem (read), s_mutex.
   1439 */
   1440static void __send_cap(struct cap_msg_args *arg, struct ceph_inode_info *ci)
   1441{
   1442	struct ceph_msg *msg;
   1443	struct inode *inode = &ci->netfs.inode;
   1444
   1445	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, CAP_MSG_SIZE, GFP_NOFS, false);
   1446	if (!msg) {
   1447		pr_err("error allocating cap msg: ino (%llx.%llx) flushing %s tid %llu, requeuing cap.\n",
   1448		       ceph_vinop(inode), ceph_cap_string(arg->dirty),
   1449		       arg->flush_tid);
   1450		spin_lock(&ci->i_ceph_lock);
   1451		__cap_delay_requeue(arg->session->s_mdsc, ci);
   1452		spin_unlock(&ci->i_ceph_lock);
   1453		return;
   1454	}
   1455
   1456	encode_cap_msg(msg, arg);
   1457	ceph_con_send(&arg->session->s_con, msg);
   1458	ceph_buffer_put(arg->old_xattr_buf);
   1459	if (arg->wake)
   1460		wake_up_all(&ci->i_cap_wq);
   1461}
   1462
   1463static inline int __send_flush_snap(struct inode *inode,
   1464				    struct ceph_mds_session *session,
   1465				    struct ceph_cap_snap *capsnap,
   1466				    u32 mseq, u64 oldest_flush_tid)
   1467{
   1468	struct cap_msg_args	arg;
   1469	struct ceph_msg		*msg;
   1470
   1471	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, CAP_MSG_SIZE, GFP_NOFS, false);
   1472	if (!msg)
   1473		return -ENOMEM;
   1474
   1475	arg.session = session;
   1476	arg.ino = ceph_vino(inode).ino;
   1477	arg.cid = 0;
   1478	arg.follows = capsnap->follows;
   1479	arg.flush_tid = capsnap->cap_flush.tid;
   1480	arg.oldest_flush_tid = oldest_flush_tid;
   1481
   1482	arg.size = capsnap->size;
   1483	arg.max_size = 0;
   1484	arg.xattr_version = capsnap->xattr_version;
   1485	arg.xattr_buf = capsnap->xattr_blob;
   1486	arg.old_xattr_buf = NULL;
   1487
   1488	arg.atime = capsnap->atime;
   1489	arg.mtime = capsnap->mtime;
   1490	arg.ctime = capsnap->ctime;
   1491	arg.btime = capsnap->btime;
   1492	arg.change_attr = capsnap->change_attr;
   1493
   1494	arg.op = CEPH_CAP_OP_FLUSHSNAP;
   1495	arg.caps = capsnap->issued;
   1496	arg.wanted = 0;
   1497	arg.dirty = capsnap->dirty;
   1498
   1499	arg.seq = 0;
   1500	arg.issue_seq = 0;
   1501	arg.mseq = mseq;
   1502	arg.time_warp_seq = capsnap->time_warp_seq;
   1503
   1504	arg.uid = capsnap->uid;
   1505	arg.gid = capsnap->gid;
   1506	arg.mode = capsnap->mode;
   1507
   1508	arg.inline_data = capsnap->inline_data;
   1509	arg.flags = 0;
   1510	arg.wake = false;
   1511
   1512	encode_cap_msg(msg, &arg);
   1513	ceph_con_send(&arg.session->s_con, msg);
   1514	return 0;
   1515}
   1516
   1517/*
   1518 * When a snapshot is taken, clients accumulate dirty metadata on
   1519 * inodes with capabilities in ceph_cap_snaps to describe the file
   1520 * state at the time the snapshot was taken.  This must be flushed
   1521 * asynchronously back to the MDS once sync writes complete and dirty
   1522 * data is written out.
   1523 *
   1524 * Called under i_ceph_lock.
   1525 */
   1526static void __ceph_flush_snaps(struct ceph_inode_info *ci,
   1527			       struct ceph_mds_session *session)
   1528		__releases(ci->i_ceph_lock)
   1529		__acquires(ci->i_ceph_lock)
   1530{
   1531	struct inode *inode = &ci->netfs.inode;
   1532	struct ceph_mds_client *mdsc = session->s_mdsc;
   1533	struct ceph_cap_snap *capsnap;
   1534	u64 oldest_flush_tid = 0;
   1535	u64 first_tid = 1, last_tid = 0;
   1536
   1537	dout("__flush_snaps %p session %p\n", inode, session);
   1538
   1539	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
   1540		/*
   1541		 * we need to wait for sync writes to complete and for dirty
   1542		 * pages to be written out.
   1543		 */
   1544		if (capsnap->dirty_pages || capsnap->writing)
   1545			break;
   1546
   1547		/* should be removed by ceph_try_drop_cap_snap() */
   1548		BUG_ON(!capsnap->need_flush);
   1549
   1550		/* only flush each capsnap once */
   1551		if (capsnap->cap_flush.tid > 0) {
   1552			dout(" already flushed %p, skipping\n", capsnap);
   1553			continue;
   1554		}
   1555
   1556		spin_lock(&mdsc->cap_dirty_lock);
   1557		capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
   1558		list_add_tail(&capsnap->cap_flush.g_list,
   1559			      &mdsc->cap_flush_list);
   1560		if (oldest_flush_tid == 0)
   1561			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
   1562		if (list_empty(&ci->i_flushing_item)) {
   1563			list_add_tail(&ci->i_flushing_item,
   1564				      &session->s_cap_flushing);
   1565		}
   1566		spin_unlock(&mdsc->cap_dirty_lock);
   1567
   1568		list_add_tail(&capsnap->cap_flush.i_list,
   1569			      &ci->i_cap_flush_list);
   1570
   1571		if (first_tid == 1)
   1572			first_tid = capsnap->cap_flush.tid;
   1573		last_tid = capsnap->cap_flush.tid;
   1574	}
   1575
   1576	ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;
   1577
   1578	while (first_tid <= last_tid) {
   1579		struct ceph_cap *cap = ci->i_auth_cap;
   1580		struct ceph_cap_flush *cf = NULL, *iter;
   1581		int ret;
   1582
   1583		if (!(cap && cap->session == session)) {
   1584			dout("__flush_snaps %p auth cap %p not mds%d, "
   1585			     "stop\n", inode, cap, session->s_mds);
   1586			break;
   1587		}
   1588
   1589		ret = -ENOENT;
   1590		list_for_each_entry(iter, &ci->i_cap_flush_list, i_list) {
   1591			if (iter->tid >= first_tid) {
   1592				cf = iter;
   1593				ret = 0;
   1594				break;
   1595			}
   1596		}
   1597		if (ret < 0)
   1598			break;
   1599
   1600		first_tid = cf->tid + 1;
   1601
   1602		capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
   1603		refcount_inc(&capsnap->nref);
   1604		spin_unlock(&ci->i_ceph_lock);
   1605
   1606		dout("__flush_snaps %p capsnap %p tid %llu %s\n",
   1607		     inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty));
   1608
   1609		ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
   1610					oldest_flush_tid);
   1611		if (ret < 0) {
   1612			pr_err("__flush_snaps: error sending cap flushsnap, "
   1613			       "ino (%llx.%llx) tid %llu follows %llu\n",
   1614				ceph_vinop(inode), cf->tid, capsnap->follows);
   1615		}
   1616
   1617		ceph_put_cap_snap(capsnap);
   1618		spin_lock(&ci->i_ceph_lock);
   1619	}
   1620}
   1621
   1622void ceph_flush_snaps(struct ceph_inode_info *ci,
   1623		      struct ceph_mds_session **psession)
   1624{
   1625	struct inode *inode = &ci->netfs.inode;
   1626	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
   1627	struct ceph_mds_session *session = NULL;
   1628	int mds;
   1629
   1630	dout("ceph_flush_snaps %p\n", inode);
   1631	if (psession)
   1632		session = *psession;
   1633retry:
   1634	spin_lock(&ci->i_ceph_lock);
   1635	if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
   1636		dout(" no capsnap needs flush, doing nothing\n");
   1637		goto out;
   1638	}
   1639	if (!ci->i_auth_cap) {
   1640		dout(" no auth cap (migrating?), doing nothing\n");
   1641		goto out;
   1642	}
   1643
   1644	mds = ci->i_auth_cap->session->s_mds;
   1645	if (session && session->s_mds != mds) {
   1646		dout(" oops, wrong session %p mutex\n", session);
   1647		ceph_put_mds_session(session);
   1648		session = NULL;
   1649	}
   1650	if (!session) {
   1651		spin_unlock(&ci->i_ceph_lock);
   1652		mutex_lock(&mdsc->mutex);
   1653		session = __ceph_lookup_mds_session(mdsc, mds);
   1654		mutex_unlock(&mdsc->mutex);
   1655		goto retry;
   1656	}
   1657
   1658	// make sure flushsnap messages are sent in proper order.
   1659	if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
   1660		__kick_flushing_caps(mdsc, session, ci, 0);
   1661
   1662	__ceph_flush_snaps(ci, session);
   1663out:
   1664	spin_unlock(&ci->i_ceph_lock);
   1665
   1666	if (psession)
   1667		*psession = session;
   1668	else
   1669		ceph_put_mds_session(session);
   1670	/* we flushed them all; remove this inode from the queue */
   1671	spin_lock(&mdsc->snap_flush_lock);
   1672	list_del_init(&ci->i_snap_flush_item);
   1673	spin_unlock(&mdsc->snap_flush_lock);
   1674}
   1675
   1676/*
   1677 * Mark caps dirty.  If inode is newly dirty, return the dirty flags.
   1678 * Caller is then responsible for calling __mark_inode_dirty with the
   1679 * returned flags value.
   1680 */
   1681int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
   1682			   struct ceph_cap_flush **pcf)
   1683{
   1684	struct ceph_mds_client *mdsc =
   1685		ceph_sb_to_client(ci->netfs.inode.i_sb)->mdsc;
   1686	struct inode *inode = &ci->netfs.inode;
   1687	int was = ci->i_dirty_caps;
   1688	int dirty = 0;
   1689
   1690	lockdep_assert_held(&ci->i_ceph_lock);
   1691
   1692	if (!ci->i_auth_cap) {
   1693		pr_warn("__mark_dirty_caps %p %llx mask %s, "
   1694			"but no auth cap (session was closed?)\n",
   1695			inode, ceph_ino(inode), ceph_cap_string(mask));
   1696		return 0;
   1697	}
   1698
   1699	dout("__mark_dirty_caps %p %s dirty %s -> %s\n", &ci->netfs.inode,
   1700	     ceph_cap_string(mask), ceph_cap_string(was),
   1701	     ceph_cap_string(was | mask));
   1702	ci->i_dirty_caps |= mask;
   1703	if (was == 0) {
   1704		struct ceph_mds_session *session = ci->i_auth_cap->session;
   1705
   1706		WARN_ON_ONCE(ci->i_prealloc_cap_flush);
   1707		swap(ci->i_prealloc_cap_flush, *pcf);
   1708
   1709		if (!ci->i_head_snapc) {
   1710			WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
   1711			ci->i_head_snapc = ceph_get_snap_context(
   1712				ci->i_snap_realm->cached_context);
   1713		}
   1714		dout(" inode %p now dirty snapc %p auth cap %p\n",
   1715		     &ci->netfs.inode, ci->i_head_snapc, ci->i_auth_cap);
   1716		BUG_ON(!list_empty(&ci->i_dirty_item));
   1717		spin_lock(&mdsc->cap_dirty_lock);
   1718		list_add(&ci->i_dirty_item, &session->s_cap_dirty);
   1719		spin_unlock(&mdsc->cap_dirty_lock);
   1720		if (ci->i_flushing_caps == 0) {
   1721			ihold(inode);
   1722			dirty |= I_DIRTY_SYNC;
   1723		}
   1724	} else {
   1725		WARN_ON_ONCE(!ci->i_prealloc_cap_flush);
   1726	}
   1727	BUG_ON(list_empty(&ci->i_dirty_item));
   1728	if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
   1729	    (mask & CEPH_CAP_FILE_BUFFER))
   1730		dirty |= I_DIRTY_DATASYNC;
   1731	__cap_delay_requeue(mdsc, ci);
   1732	return dirty;
   1733}
   1734
   1735struct ceph_cap_flush *ceph_alloc_cap_flush(void)
   1736{
   1737	struct ceph_cap_flush *cf;
   1738
   1739	cf = kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
   1740	if (!cf)
   1741		return NULL;
   1742
   1743	cf->is_capsnap = false;
   1744	return cf;
   1745}
   1746
   1747void ceph_free_cap_flush(struct ceph_cap_flush *cf)
   1748{
   1749	if (cf)
   1750		kmem_cache_free(ceph_cap_flush_cachep, cf);
   1751}
   1752
   1753static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
   1754{
   1755	if (!list_empty(&mdsc->cap_flush_list)) {
   1756		struct ceph_cap_flush *cf =
   1757			list_first_entry(&mdsc->cap_flush_list,
   1758					 struct ceph_cap_flush, g_list);
   1759		return cf->tid;
   1760	}
   1761	return 0;
   1762}
   1763
   1764/*
   1765 * Remove cap_flush from the mdsc's or inode's flushing cap list.
   1766 * Return true if caller needs to wake up flush waiters.
   1767 */
   1768static bool __detach_cap_flush_from_mdsc(struct ceph_mds_client *mdsc,
   1769					 struct ceph_cap_flush *cf)
   1770{
   1771	struct ceph_cap_flush *prev;
   1772	bool wake = cf->wake;
   1773
   1774	if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
   1775		prev = list_prev_entry(cf, g_list);
   1776		prev->wake = true;
   1777		wake = false;
   1778	}
   1779	list_del_init(&cf->g_list);
   1780	return wake;
   1781}
   1782
   1783static bool __detach_cap_flush_from_ci(struct ceph_inode_info *ci,
   1784				       struct ceph_cap_flush *cf)
   1785{
   1786	struct ceph_cap_flush *prev;
   1787	bool wake = cf->wake;
   1788
   1789	if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
   1790		prev = list_prev_entry(cf, i_list);
   1791		prev->wake = true;
   1792		wake = false;
   1793	}
   1794	list_del_init(&cf->i_list);
   1795	return wake;
   1796}
   1797
   1798/*
   1799 * Add dirty inode to the flushing list.  Assigned a seq number so we
   1800 * can wait for caps to flush without starving.
   1801 *
   1802 * Called under i_ceph_lock. Returns the flush tid.
   1803 */
   1804static u64 __mark_caps_flushing(struct inode *inode,
   1805				struct ceph_mds_session *session, bool wake,
   1806				u64 *oldest_flush_tid)
   1807{
   1808	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
   1809	struct ceph_inode_info *ci = ceph_inode(inode);
   1810	struct ceph_cap_flush *cf = NULL;
   1811	int flushing;
   1812
   1813	lockdep_assert_held(&ci->i_ceph_lock);
   1814	BUG_ON(ci->i_dirty_caps == 0);
   1815	BUG_ON(list_empty(&ci->i_dirty_item));
   1816	BUG_ON(!ci->i_prealloc_cap_flush);
   1817
   1818	flushing = ci->i_dirty_caps;
   1819	dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n",
   1820	     ceph_cap_string(flushing),
   1821	     ceph_cap_string(ci->i_flushing_caps),
   1822	     ceph_cap_string(ci->i_flushing_caps | flushing));
   1823	ci->i_flushing_caps |= flushing;
   1824	ci->i_dirty_caps = 0;
   1825	dout(" inode %p now !dirty\n", inode);
   1826
   1827	swap(cf, ci->i_prealloc_cap_flush);
   1828	cf->caps = flushing;
   1829	cf->wake = wake;
   1830
   1831	spin_lock(&mdsc->cap_dirty_lock);
   1832	list_del_init(&ci->i_dirty_item);
   1833
   1834	cf->tid = ++mdsc->last_cap_flush_tid;
   1835	list_add_tail(&cf->g_list, &mdsc->cap_flush_list);
   1836	*oldest_flush_tid = __get_oldest_flush_tid(mdsc);
   1837
   1838	if (list_empty(&ci->i_flushing_item)) {
   1839		list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
   1840		mdsc->num_cap_flushing++;
   1841	}
   1842	spin_unlock(&mdsc->cap_dirty_lock);
   1843
   1844	list_add_tail(&cf->i_list, &ci->i_cap_flush_list);
   1845
   1846	return cf->tid;
   1847}
   1848
   1849/*
   1850 * try to invalidate mapping pages without blocking.
   1851 */
   1852static int try_nonblocking_invalidate(struct inode *inode)
   1853	__releases(ci->i_ceph_lock)
   1854	__acquires(ci->i_ceph_lock)
   1855{
   1856	struct ceph_inode_info *ci = ceph_inode(inode);
   1857	u32 invalidating_gen = ci->i_rdcache_gen;
   1858
   1859	spin_unlock(&ci->i_ceph_lock);
   1860	ceph_fscache_invalidate(inode, false);
   1861	invalidate_mapping_pages(&inode->i_data, 0, -1);
   1862	spin_lock(&ci->i_ceph_lock);
   1863
   1864	if (inode->i_data.nrpages == 0 &&
   1865	    invalidating_gen == ci->i_rdcache_gen) {
   1866		/* success. */
   1867		dout("try_nonblocking_invalidate %p success\n", inode);
   1868		/* save any racing async invalidate some trouble */
   1869		ci->i_rdcache_revoking = ci->i_rdcache_gen - 1;
   1870		return 0;
   1871	}
   1872	dout("try_nonblocking_invalidate %p failed\n", inode);
   1873	return -1;
   1874}
   1875
   1876bool __ceph_should_report_size(struct ceph_inode_info *ci)
   1877{
   1878	loff_t size = i_size_read(&ci->netfs.inode);
   1879	/* mds will adjust max size according to the reported size */
   1880	if (ci->i_flushing_caps & CEPH_CAP_FILE_WR)
   1881		return false;
   1882	if (size >= ci->i_max_size)
   1883		return true;
   1884	/* half of previous max_size increment has been used */
   1885	if (ci->i_max_size > ci->i_reported_size &&
   1886	    (size << 1) >= ci->i_max_size + ci->i_reported_size)
   1887		return true;
   1888	return false;
   1889}
   1890
   1891/*
   1892 * Swiss army knife function to examine currently used and wanted
   1893 * versus held caps.  Release, flush, ack revoked caps to mds as
   1894 * appropriate.
   1895 *
   1896 *  CHECK_CAPS_AUTHONLY - we should only check the auth cap
   1897 *  CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without
   1898 *    further delay.
   1899 */
   1900void ceph_check_caps(struct ceph_inode_info *ci, int flags,
   1901		     struct ceph_mds_session *session)
   1902{
   1903	struct inode *inode = &ci->netfs.inode;
   1904	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
   1905	struct ceph_cap *cap;
   1906	u64 flush_tid, oldest_flush_tid;
   1907	int file_wanted, used, cap_used;
   1908	int issued, implemented, want, retain, revoking, flushing = 0;
   1909	int mds = -1;   /* keep track of how far we've gone through i_caps list
   1910			   to avoid an infinite loop on retry */
   1911	struct rb_node *p;
   1912	bool queue_invalidate = false;
   1913	bool tried_invalidate = false;
   1914	bool queue_writeback = false;
   1915
   1916	if (session)
   1917		ceph_get_mds_session(session);
   1918
   1919	spin_lock(&ci->i_ceph_lock);
   1920	if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
   1921		/* Don't send messages until we get async create reply */
   1922		spin_unlock(&ci->i_ceph_lock);
   1923		ceph_put_mds_session(session);
   1924		return;
   1925	}
   1926
   1927	if (ci->i_ceph_flags & CEPH_I_FLUSH)
   1928		flags |= CHECK_CAPS_FLUSH;
   1929retry:
   1930	/* Caps wanted by virtue of active open files. */
   1931	file_wanted = __ceph_caps_file_wanted(ci);
   1932
   1933	/* Caps which have active references against them */
   1934	used = __ceph_caps_used(ci);
   1935
   1936	/*
   1937	 * "issued" represents the current caps that the MDS wants us to have.
   1938	 * "implemented" is the set that we have been granted, and includes the
   1939	 * ones that have not yet been returned to the MDS (the "revoking" set,
   1940	 * usually because they have outstanding references).
   1941	 */
   1942	issued = __ceph_caps_issued(ci, &implemented);
   1943	revoking = implemented & ~issued;
   1944
   1945	want = file_wanted;
   1946
   1947	/* The ones we currently want to retain (may be adjusted below) */
   1948	retain = file_wanted | used | CEPH_CAP_PIN;
   1949	if (!mdsc->stopping && inode->i_nlink > 0) {
   1950		if (file_wanted) {
   1951			retain |= CEPH_CAP_ANY;       /* be greedy */
   1952		} else if (S_ISDIR(inode->i_mode) &&
   1953			   (issued & CEPH_CAP_FILE_SHARED) &&
   1954			   __ceph_dir_is_complete(ci)) {
   1955			/*
   1956			 * If a directory is complete, we want to keep
   1957			 * the exclusive cap. So that MDS does not end up
   1958			 * revoking the shared cap on every create/unlink
   1959			 * operation.
   1960			 */
   1961			if (IS_RDONLY(inode)) {
   1962				want = CEPH_CAP_ANY_SHARED;
   1963			} else {
   1964				want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
   1965			}
   1966			retain |= want;
   1967		} else {
   1968
   1969			retain |= CEPH_CAP_ANY_SHARED;
   1970			/*
   1971			 * keep RD only if we didn't have the file open RW,
   1972			 * because then the mds would revoke it anyway to
   1973			 * journal max_size=0.
   1974			 */
   1975			if (ci->i_max_size == 0)
   1976				retain |= CEPH_CAP_ANY_RD;
   1977		}
   1978	}
   1979
   1980	dout("check_caps %llx.%llx file_want %s used %s dirty %s flushing %s"
   1981	     " issued %s revoking %s retain %s %s%s\n", ceph_vinop(inode),
   1982	     ceph_cap_string(file_wanted),
   1983	     ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
   1984	     ceph_cap_string(ci->i_flushing_caps),
   1985	     ceph_cap_string(issued), ceph_cap_string(revoking),
   1986	     ceph_cap_string(retain),
   1987	     (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
   1988	     (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "");
   1989
   1990	/*
   1991	 * If we no longer need to hold onto old our caps, and we may
   1992	 * have cached pages, but don't want them, then try to invalidate.
   1993	 * If we fail, it's because pages are locked.... try again later.
   1994	 */
   1995	if ((!(flags & CHECK_CAPS_NOINVAL) || mdsc->stopping) &&
   1996	    S_ISREG(inode->i_mode) &&
   1997	    !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
   1998	    inode->i_data.nrpages &&		/* have cached pages */
   1999	    (revoking & (CEPH_CAP_FILE_CACHE|
   2000			 CEPH_CAP_FILE_LAZYIO)) && /*  or revoking cache */
   2001	    !tried_invalidate) {
   2002		dout("check_caps trying to invalidate on %llx.%llx\n",
   2003		     ceph_vinop(inode));
   2004		if (try_nonblocking_invalidate(inode) < 0) {
   2005			dout("check_caps queuing invalidate\n");
   2006			queue_invalidate = true;
   2007			ci->i_rdcache_revoking = ci->i_rdcache_gen;
   2008		}
   2009		tried_invalidate = true;
   2010		goto retry;
   2011	}
   2012
   2013	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
   2014		int mflags = 0;
   2015		struct cap_msg_args arg;
   2016
   2017		cap = rb_entry(p, struct ceph_cap, ci_node);
   2018
   2019		/* avoid looping forever */
   2020		if (mds >= cap->mds ||
   2021		    ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
   2022			continue;
   2023
   2024		/*
   2025		 * If we have an auth cap, we don't need to consider any
   2026		 * overlapping caps as used.
   2027		 */
   2028		cap_used = used;
   2029		if (ci->i_auth_cap && cap != ci->i_auth_cap)
   2030			cap_used &= ~ci->i_auth_cap->issued;
   2031
   2032		revoking = cap->implemented & ~cap->issued;
   2033		dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
   2034		     cap->mds, cap, ceph_cap_string(cap_used),
   2035		     ceph_cap_string(cap->issued),
   2036		     ceph_cap_string(cap->implemented),
   2037		     ceph_cap_string(revoking));
   2038
   2039		if (cap == ci->i_auth_cap &&
   2040		    (cap->issued & CEPH_CAP_FILE_WR)) {
   2041			/* request larger max_size from MDS? */
   2042			if (ci->i_wanted_max_size > ci->i_max_size &&
   2043			    ci->i_wanted_max_size > ci->i_requested_max_size) {
   2044				dout("requesting new max_size\n");
   2045				goto ack;
   2046			}
   2047
   2048			/* approaching file_max? */
   2049			if (__ceph_should_report_size(ci)) {
   2050				dout("i_size approaching max_size\n");
   2051				goto ack;
   2052			}
   2053		}
   2054		/* flush anything dirty? */
   2055		if (cap == ci->i_auth_cap) {
   2056			if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) {
   2057				dout("flushing dirty caps\n");
   2058				goto ack;
   2059			}
   2060			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) {
   2061				dout("flushing snap caps\n");
   2062				goto ack;
   2063			}
   2064		}
   2065
   2066		/* completed revocation? going down and there are no caps? */
   2067		if (revoking) {
   2068			if ((revoking & cap_used) == 0) {
   2069				dout("completed revocation of %s\n",
   2070				      ceph_cap_string(cap->implemented & ~cap->issued));
   2071				goto ack;
   2072			}
   2073
   2074			/*
   2075			 * If the "i_wrbuffer_ref" was increased by mmap or generic
   2076			 * cache write just before the ceph_check_caps() is called,
   2077			 * the Fb capability revoking will fail this time. Then we
   2078			 * must wait for the BDI's delayed work to flush the dirty
   2079			 * pages and to release the "i_wrbuffer_ref", which will cost
   2080			 * at most 5 seconds. That means the MDS needs to wait at
   2081			 * most 5 seconds to finished the Fb capability's revocation.
   2082			 *
   2083			 * Let's queue a writeback for it.
   2084			 */
   2085			if (S_ISREG(inode->i_mode) && ci->i_wrbuffer_ref &&
   2086			    (revoking & CEPH_CAP_FILE_BUFFER))
   2087				queue_writeback = true;
   2088		}
   2089
   2090		/* want more caps from mds? */
   2091		if (want & ~cap->mds_wanted) {
   2092			if (want & ~(cap->mds_wanted | cap->issued))
   2093				goto ack;
   2094			if (!__cap_is_valid(cap))
   2095				goto ack;
   2096		}
   2097
   2098		/* things we might delay */
   2099		if ((cap->issued & ~retain) == 0)
   2100			continue;     /* nope, all good */
   2101
   2102ack:
   2103		ceph_put_mds_session(session);
   2104		session = ceph_get_mds_session(cap->session);
   2105
   2106		/* kick flushing and flush snaps before sending normal
   2107		 * cap message */
   2108		if (cap == ci->i_auth_cap &&
   2109		    (ci->i_ceph_flags &
   2110		     (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
   2111			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
   2112				__kick_flushing_caps(mdsc, session, ci, 0);
   2113			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
   2114				__ceph_flush_snaps(ci, session);
   2115
   2116			goto retry;
   2117		}
   2118
   2119		if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
   2120			flushing = ci->i_dirty_caps;
   2121			flush_tid = __mark_caps_flushing(inode, session, false,
   2122							 &oldest_flush_tid);
   2123			if (flags & CHECK_CAPS_FLUSH &&
   2124			    list_empty(&session->s_cap_dirty))
   2125				mflags |= CEPH_CLIENT_CAPS_SYNC;
   2126		} else {
   2127			flushing = 0;
   2128			flush_tid = 0;
   2129			spin_lock(&mdsc->cap_dirty_lock);
   2130			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
   2131			spin_unlock(&mdsc->cap_dirty_lock);
   2132		}
   2133
   2134		mds = cap->mds;  /* remember mds, so we don't repeat */
   2135
   2136		__prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used,
   2137			   want, retain, flushing, flush_tid, oldest_flush_tid);
   2138
   2139		spin_unlock(&ci->i_ceph_lock);
   2140		__send_cap(&arg, ci);
   2141		spin_lock(&ci->i_ceph_lock);
   2142
   2143		goto retry; /* retake i_ceph_lock and restart our cap scan. */
   2144	}
   2145
   2146	/* periodically re-calculate caps wanted by open files */
   2147	if (__ceph_is_any_real_caps(ci) &&
   2148	    list_empty(&ci->i_cap_delay_list) &&
   2149	    (file_wanted & ~CEPH_CAP_PIN) &&
   2150	    !(used & (CEPH_CAP_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
   2151		__cap_delay_requeue(mdsc, ci);
   2152	}
   2153
   2154	spin_unlock(&ci->i_ceph_lock);
   2155
   2156	ceph_put_mds_session(session);
   2157	if (queue_writeback)
   2158		ceph_queue_writeback(inode);
   2159	if (queue_invalidate)
   2160		ceph_queue_invalidate(inode);
   2161}
   2162
   2163/*
   2164 * Try to flush dirty caps back to the auth mds.
   2165 */
   2166static int try_flush_caps(struct inode *inode, u64 *ptid)
   2167{
   2168	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
   2169	struct ceph_inode_info *ci = ceph_inode(inode);
   2170	int flushing = 0;
   2171	u64 flush_tid = 0, oldest_flush_tid = 0;
   2172
   2173	spin_lock(&ci->i_ceph_lock);
   2174retry_locked:
   2175	if (ci->i_dirty_caps && ci->i_auth_cap) {
   2176		struct ceph_cap *cap = ci->i_auth_cap;
   2177		struct cap_msg_args arg;
   2178		struct ceph_mds_session *session = cap->session;
   2179
   2180		if (session->s_state < CEPH_MDS_SESSION_OPEN) {
   2181			spin_unlock(&ci->i_ceph_lock);
   2182			goto out;
   2183		}
   2184
   2185		if (ci->i_ceph_flags &
   2186		    (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS)) {
   2187			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
   2188				__kick_flushing_caps(mdsc, session, ci, 0);
   2189			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
   2190				__ceph_flush_snaps(ci, session);
   2191			goto retry_locked;
   2192		}
   2193
   2194		flushing = ci->i_dirty_caps;
   2195		flush_tid = __mark_caps_flushing(inode, session, true,
   2196						 &oldest_flush_tid);
   2197
   2198		__prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
   2199			   __ceph_caps_used(ci), __ceph_caps_wanted(ci),
   2200			   (cap->issued | cap->implemented),
   2201			   flushing, flush_tid, oldest_flush_tid);
   2202		spin_unlock(&ci->i_ceph_lock);
   2203
   2204		__send_cap(&arg, ci);
   2205	} else {
   2206		if (!list_empty(&ci->i_cap_flush_list)) {
   2207			struct ceph_cap_flush *cf =
   2208				list_last_entry(&ci->i_cap_flush_list,
   2209						struct ceph_cap_flush, i_list);
   2210			cf->wake = true;
   2211			flush_tid = cf->tid;
   2212		}
   2213		flushing = ci->i_flushing_caps;
   2214		spin_unlock(&ci->i_ceph_lock);
   2215	}
   2216out:
   2217	*ptid = flush_tid;
   2218	return flushing;
   2219}
   2220
   2221/*
   2222 * Return true if we've flushed caps through the given flush_tid.
   2223 */
   2224static int caps_are_flushed(struct inode *inode, u64 flush_tid)
   2225{
   2226	struct ceph_inode_info *ci = ceph_inode(inode);
   2227	int ret = 1;
   2228
   2229	spin_lock(&ci->i_ceph_lock);
   2230	if (!list_empty(&ci->i_cap_flush_list)) {
   2231		struct ceph_cap_flush * cf =
   2232			list_first_entry(&ci->i_cap_flush_list,
   2233					 struct ceph_cap_flush, i_list);
   2234		if (cf->tid <= flush_tid)
   2235			ret = 0;
   2236	}
   2237	spin_unlock(&ci->i_ceph_lock);
   2238	return ret;
   2239}
   2240
   2241/*
   2242 * flush the mdlog and wait for any unsafe requests to complete.
   2243 */
   2244static int flush_mdlog_and_wait_inode_unsafe_requests(struct inode *inode)
   2245{
   2246	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
   2247	struct ceph_inode_info *ci = ceph_inode(inode);
   2248	struct ceph_mds_request *req1 = NULL, *req2 = NULL;
   2249	unsigned int max_sessions;
   2250	int ret, err = 0;
   2251
   2252	spin_lock(&ci->i_unsafe_lock);
   2253	if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) {
   2254		req1 = list_last_entry(&ci->i_unsafe_dirops,
   2255					struct ceph_mds_request,
   2256					r_unsafe_dir_item);
   2257		ceph_mdsc_get_request(req1);
   2258	}
   2259	if (!list_empty(&ci->i_unsafe_iops)) {
   2260		req2 = list_last_entry(&ci->i_unsafe_iops,
   2261					struct ceph_mds_request,
   2262					r_unsafe_target_item);
   2263		ceph_mdsc_get_request(req2);
   2264	}
   2265	spin_unlock(&ci->i_unsafe_lock);
   2266
   2267	/*
   2268	 * The mdsc->max_sessions is unlikely to be changed
   2269	 * mostly, here we will retry it by reallocating the
   2270	 * sessions array memory to get rid of the mdsc->mutex
   2271	 * lock.
   2272	 */
   2273retry:
   2274	max_sessions = mdsc->max_sessions;
   2275
   2276	/*
   2277	 * Trigger to flush the journal logs in all the relevant MDSes
   2278	 * manually, or in the worst case we must wait at most 5 seconds
   2279	 * to wait the journal logs to be flushed by the MDSes periodically.
   2280	 */
   2281	if ((req1 || req2) && likely(max_sessions)) {
   2282		struct ceph_mds_session **sessions = NULL;
   2283		struct ceph_mds_session *s;
   2284		struct ceph_mds_request *req;
   2285		int i;
   2286
   2287		sessions = kzalloc(max_sessions * sizeof(s), GFP_KERNEL);
   2288		if (!sessions) {
   2289			err = -ENOMEM;
   2290			goto out;
   2291		}
   2292
   2293		spin_lock(&ci->i_unsafe_lock);
   2294		if (req1) {
   2295			list_for_each_entry(req, &ci->i_unsafe_dirops,
   2296					    r_unsafe_dir_item) {
   2297				s = req->r_session;
   2298				if (!s)
   2299					continue;
   2300				if (unlikely(s->s_mds >= max_sessions)) {
   2301					spin_unlock(&ci->i_unsafe_lock);
   2302					for (i = 0; i < max_sessions; i++) {
   2303						s = sessions[i];
   2304						if (s)
   2305							ceph_put_mds_session(s);
   2306					}
   2307					kfree(sessions);
   2308					goto retry;
   2309				}
   2310				if (!sessions[s->s_mds]) {
   2311					s = ceph_get_mds_session(s);
   2312					sessions[s->s_mds] = s;
   2313				}
   2314			}
   2315		}
   2316		if (req2) {
   2317			list_for_each_entry(req, &ci->i_unsafe_iops,
   2318					    r_unsafe_target_item) {
   2319				s = req->r_session;
   2320				if (!s)
   2321					continue;
   2322				if (unlikely(s->s_mds >= max_sessions)) {
   2323					spin_unlock(&ci->i_unsafe_lock);
   2324					for (i = 0; i < max_sessions; i++) {
   2325						s = sessions[i];
   2326						if (s)
   2327							ceph_put_mds_session(s);
   2328					}
   2329					kfree(sessions);
   2330					goto retry;
   2331				}
   2332				if (!sessions[s->s_mds]) {
   2333					s = ceph_get_mds_session(s);
   2334					sessions[s->s_mds] = s;
   2335				}
   2336			}
   2337		}
   2338		spin_unlock(&ci->i_unsafe_lock);
   2339
   2340		/* the auth MDS */
   2341		spin_lock(&ci->i_ceph_lock);
   2342		if (ci->i_auth_cap) {
   2343		      s = ci->i_auth_cap->session;
   2344		      if (!sessions[s->s_mds])
   2345			      sessions[s->s_mds] = ceph_get_mds_session(s);
   2346		}
   2347		spin_unlock(&ci->i_ceph_lock);
   2348
   2349		/* send flush mdlog request to MDSes */
   2350		for (i = 0; i < max_sessions; i++) {
   2351			s = sessions[i];
   2352			if (s) {
   2353				send_flush_mdlog(s);
   2354				ceph_put_mds_session(s);
   2355			}
   2356		}
   2357		kfree(sessions);
   2358	}
   2359
   2360	dout("%s %p wait on tid %llu %llu\n", __func__,
   2361	     inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
   2362	if (req1) {
   2363		ret = !wait_for_completion_timeout(&req1->r_safe_completion,
   2364					ceph_timeout_jiffies(req1->r_timeout));
   2365		if (ret)
   2366			err = -EIO;
   2367	}
   2368	if (req2) {
   2369		ret = !wait_for_completion_timeout(&req2->r_safe_completion,
   2370					ceph_timeout_jiffies(req2->r_timeout));
   2371		if (ret)
   2372			err = -EIO;
   2373	}
   2374
   2375out:
   2376	if (req1)
   2377		ceph_mdsc_put_request(req1);
   2378	if (req2)
   2379		ceph_mdsc_put_request(req2);
   2380	return err;
   2381}
   2382
   2383int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
   2384{
   2385	struct inode *inode = file->f_mapping->host;
   2386	struct ceph_inode_info *ci = ceph_inode(inode);
   2387	u64 flush_tid;
   2388	int ret, err;
   2389	int dirty;
   2390
   2391	dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
   2392
   2393	ret = file_write_and_wait_range(file, start, end);
   2394	if (datasync)
   2395		goto out;
   2396
   2397	ret = ceph_wait_on_async_create(inode);
   2398	if (ret)
   2399		goto out;
   2400
   2401	dirty = try_flush_caps(inode, &flush_tid);
   2402	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
   2403
   2404	err = flush_mdlog_and_wait_inode_unsafe_requests(inode);
   2405
   2406	/*
   2407	 * only wait on non-file metadata writeback (the mds
   2408	 * can recover size and mtime, so we don't need to
   2409	 * wait for that)
   2410	 */
   2411	if (!err && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
   2412		err = wait_event_interruptible(ci->i_cap_wq,
   2413					caps_are_flushed(inode, flush_tid));
   2414	}
   2415
   2416	if (err < 0)
   2417		ret = err;
   2418
   2419	err = file_check_and_advance_wb_err(file);
   2420	if (err < 0)
   2421		ret = err;
   2422out:
   2423	dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret);
   2424	return ret;
   2425}
   2426
   2427/*
   2428 * Flush any dirty caps back to the mds.  If we aren't asked to wait,
   2429 * queue inode for flush but don't do so immediately, because we can
   2430 * get by with fewer MDS messages if we wait for data writeback to
   2431 * complete first.
   2432 */
   2433int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
   2434{
   2435	struct ceph_inode_info *ci = ceph_inode(inode);
   2436	u64 flush_tid;
   2437	int err = 0;
   2438	int dirty;
   2439	int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync);
   2440
   2441	dout("write_inode %p wait=%d\n", inode, wait);
   2442	ceph_fscache_unpin_writeback(inode, wbc);
   2443	if (wait) {
   2444		err = ceph_wait_on_async_create(inode);
   2445		if (err)
   2446			return err;
   2447		dirty = try_flush_caps(inode, &flush_tid);
   2448		if (dirty)
   2449			err = wait_event_interruptible(ci->i_cap_wq,
   2450				       caps_are_flushed(inode, flush_tid));
   2451	} else {
   2452		struct ceph_mds_client *mdsc =
   2453			ceph_sb_to_client(inode->i_sb)->mdsc;
   2454
   2455		spin_lock(&ci->i_ceph_lock);
   2456		if (__ceph_caps_dirty(ci))
   2457			__cap_delay_requeue_front(mdsc, ci);
   2458		spin_unlock(&ci->i_ceph_lock);
   2459	}
   2460	return err;
   2461}
   2462
   2463static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
   2464				 struct ceph_mds_session *session,
   2465				 struct ceph_inode_info *ci,
   2466				 u64 oldest_flush_tid)
   2467	__releases(ci->i_ceph_lock)
   2468	__acquires(ci->i_ceph_lock)
   2469{
   2470	struct inode *inode = &ci->netfs.inode;
   2471	struct ceph_cap *cap;
   2472	struct ceph_cap_flush *cf;
   2473	int ret;
   2474	u64 first_tid = 0;
   2475	u64 last_snap_flush = 0;
   2476
   2477	/* Don't do anything until create reply comes in */
   2478	if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE)
   2479		return;
   2480
   2481	ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
   2482
   2483	list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) {
   2484		if (cf->is_capsnap) {
   2485			last_snap_flush = cf->tid;
   2486			break;
   2487		}
   2488	}
   2489
   2490	list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
   2491		if (cf->tid < first_tid)
   2492			continue;
   2493
   2494		cap = ci->i_auth_cap;
   2495		if (!(cap && cap->session == session)) {
   2496			pr_err("%p auth cap %p not mds%d ???\n",
   2497			       inode, cap, session->s_mds);
   2498			break;
   2499		}
   2500
   2501		first_tid = cf->tid + 1;
   2502
   2503		if (!cf->is_capsnap) {
   2504			struct cap_msg_args arg;
   2505
   2506			dout("kick_flushing_caps %p cap %p tid %llu %s\n",
   2507			     inode, cap, cf->tid, ceph_cap_string(cf->caps));
   2508			__prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH,
   2509					 (cf->tid < last_snap_flush ?
   2510					  CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0),
   2511					  __ceph_caps_used(ci),
   2512					  __ceph_caps_wanted(ci),
   2513					  (cap->issued | cap->implemented),
   2514					  cf->caps, cf->tid, oldest_flush_tid);
   2515			spin_unlock(&ci->i_ceph_lock);
   2516			__send_cap(&arg, ci);
   2517		} else {
   2518			struct ceph_cap_snap *capsnap =
   2519					container_of(cf, struct ceph_cap_snap,
   2520						    cap_flush);
   2521			dout("kick_flushing_caps %p capsnap %p tid %llu %s\n",
   2522			     inode, capsnap, cf->tid,
   2523			     ceph_cap_string(capsnap->dirty));
   2524
   2525			refcount_inc(&capsnap->nref);
   2526			spin_unlock(&ci->i_ceph_lock);
   2527
   2528			ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
   2529						oldest_flush_tid);
   2530			if (ret < 0) {
   2531				pr_err("kick_flushing_caps: error sending "
   2532					"cap flushsnap, ino (%llx.%llx) "
   2533					"tid %llu follows %llu\n",
   2534					ceph_vinop(inode), cf->tid,
   2535					capsnap->follows);
   2536			}
   2537
   2538			ceph_put_cap_snap(capsnap);
   2539		}
   2540
   2541		spin_lock(&ci->i_ceph_lock);
   2542	}
   2543}
   2544
   2545void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
   2546				   struct ceph_mds_session *session)
   2547{
   2548	struct ceph_inode_info *ci;
   2549	struct ceph_cap *cap;
   2550	u64 oldest_flush_tid;
   2551
   2552	dout("early_kick_flushing_caps mds%d\n", session->s_mds);
   2553
   2554	spin_lock(&mdsc->cap_dirty_lock);
   2555	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
   2556	spin_unlock(&mdsc->cap_dirty_lock);
   2557
   2558	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
   2559		spin_lock(&ci->i_ceph_lock);
   2560		cap = ci->i_auth_cap;
   2561		if (!(cap && cap->session == session)) {
   2562			pr_err("%p auth cap %p not mds%d ???\n",
   2563				&ci->netfs.inode, cap, session->s_mds);
   2564			spin_unlock(&ci->i_ceph_lock);
   2565			continue;
   2566		}
   2567
   2568
   2569		/*
   2570		 * if flushing caps were revoked, we re-send the cap flush
   2571		 * in client reconnect stage. This guarantees MDS * processes
   2572		 * the cap flush message before issuing the flushing caps to
   2573		 * other client.
   2574		 */
   2575		if ((cap->issued & ci->i_flushing_caps) !=
   2576		    ci->i_flushing_caps) {
   2577			/* encode_caps_cb() also will reset these sequence
   2578			 * numbers. make sure sequence numbers in cap flush
   2579			 * message match later reconnect message */
   2580			cap->seq = 0;
   2581			cap->issue_seq = 0;
   2582			cap->mseq = 0;
   2583			__kick_flushing_caps(mdsc, session, ci,
   2584					     oldest_flush_tid);
   2585		} else {
   2586			ci->i_ceph_flags |= CEPH_I_KICK_FLUSH;
   2587		}
   2588
   2589		spin_unlock(&ci->i_ceph_lock);
   2590	}
   2591}
   2592
   2593void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
   2594			     struct ceph_mds_session *session)
   2595{
   2596	struct ceph_inode_info *ci;
   2597	struct ceph_cap *cap;
   2598	u64 oldest_flush_tid;
   2599
   2600	lockdep_assert_held(&session->s_mutex);
   2601
   2602	dout("kick_flushing_caps mds%d\n", session->s_mds);
   2603
   2604	spin_lock(&mdsc->cap_dirty_lock);
   2605	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
   2606	spin_unlock(&mdsc->cap_dirty_lock);
   2607
   2608	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
   2609		spin_lock(&ci->i_ceph_lock);
   2610		cap = ci->i_auth_cap;
   2611		if (!(cap && cap->session == session)) {
   2612			pr_err("%p auth cap %p not mds%d ???\n",
   2613				&ci->netfs.inode, cap, session->s_mds);
   2614			spin_unlock(&ci->i_ceph_lock);
   2615			continue;
   2616		}
   2617		if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
   2618			__kick_flushing_caps(mdsc, session, ci,
   2619					     oldest_flush_tid);
   2620		}
   2621		spin_unlock(&ci->i_ceph_lock);
   2622	}
   2623}
   2624
   2625void ceph_kick_flushing_inode_caps(struct ceph_mds_session *session,
   2626				   struct ceph_inode_info *ci)
   2627{
   2628	struct ceph_mds_client *mdsc = session->s_mdsc;
   2629	struct ceph_cap *cap = ci->i_auth_cap;
   2630
   2631	lockdep_assert_held(&ci->i_ceph_lock);
   2632
   2633	dout("%s %p flushing %s\n", __func__, &ci->netfs.inode,
   2634	     ceph_cap_string(ci->i_flushing_caps));
   2635
   2636	if (!list_empty(&ci->i_cap_flush_list)) {
   2637		u64 oldest_flush_tid;
   2638		spin_lock(&mdsc->cap_dirty_lock);
   2639		list_move_tail(&ci->i_flushing_item,
   2640			       &cap->session->s_cap_flushing);
   2641		oldest_flush_tid = __get_oldest_flush_tid(mdsc);
   2642		spin_unlock(&mdsc->cap_dirty_lock);
   2643
   2644		__kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
   2645	}
   2646}
   2647
   2648
   2649/*
   2650 * Take references to capabilities we hold, so that we don't release
   2651 * them to the MDS prematurely.
   2652 */
   2653void ceph_take_cap_refs(struct ceph_inode_info *ci, int got,
   2654			    bool snap_rwsem_locked)
   2655{
   2656	lockdep_assert_held(&ci->i_ceph_lock);
   2657
   2658	if (got & CEPH_CAP_PIN)
   2659		ci->i_pin_ref++;
   2660	if (got & CEPH_CAP_FILE_RD)
   2661		ci->i_rd_ref++;
   2662	if (got & CEPH_CAP_FILE_CACHE)
   2663		ci->i_rdcache_ref++;
   2664	if (got & CEPH_CAP_FILE_EXCL)
   2665		ci->i_fx_ref++;
   2666	if (got & CEPH_CAP_FILE_WR) {
   2667		if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
   2668			BUG_ON(!snap_rwsem_locked);
   2669			ci->i_head_snapc = ceph_get_snap_context(
   2670					ci->i_snap_realm->cached_context);
   2671		}
   2672		ci->i_wr_ref++;
   2673	}
   2674	if (got & CEPH_CAP_FILE_BUFFER) {
   2675		if (ci->i_wb_ref == 0)
   2676			ihold(&ci->netfs.inode);
   2677		ci->i_wb_ref++;
   2678		dout("%s %p wb %d -> %d (?)\n", __func__,
   2679		     &ci->netfs.inode, ci->i_wb_ref-1, ci->i_wb_ref);
   2680	}
   2681}
   2682
   2683/*
   2684 * Try to grab cap references.  Specify those refs we @want, and the
   2685 * minimal set we @need.  Also include the larger offset we are writing
   2686 * to (when applicable), and check against max_size here as well.
   2687 * Note that caller is responsible for ensuring max_size increases are
   2688 * requested from the MDS.
   2689 *
   2690 * Returns 0 if caps were not able to be acquired (yet), 1 if succeed,
   2691 * or a negative error code. There are 3 speical error codes:
   2692 *  -EAGAIN:  need to sleep but non-blocking is specified
   2693 *  -EFBIG:   ask caller to call check_max_size() and try again.
   2694 *  -EUCLEAN: ask caller to call ceph_renew_caps() and try again.
   2695 */
   2696enum {
   2697	/* first 8 bits are reserved for CEPH_FILE_MODE_FOO */
   2698	NON_BLOCKING	= (1 << 8),
   2699	CHECK_FILELOCK	= (1 << 9),
   2700};
   2701
   2702static int try_get_cap_refs(struct inode *inode, int need, int want,
   2703			    loff_t endoff, int flags, int *got)
   2704{
   2705	struct ceph_inode_info *ci = ceph_inode(inode);
   2706	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
   2707	int ret = 0;
   2708	int have, implemented;
   2709	bool snap_rwsem_locked = false;
   2710
   2711	dout("get_cap_refs %p need %s want %s\n", inode,
   2712	     ceph_cap_string(need), ceph_cap_string(want));
   2713
   2714again:
   2715	spin_lock(&ci->i_ceph_lock);
   2716
   2717	if ((flags & CHECK_FILELOCK) &&
   2718	    (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK)) {
   2719		dout("try_get_cap_refs %p error filelock\n", inode);
   2720		ret = -EIO;
   2721		goto out_unlock;
   2722	}
   2723
   2724	/* finish pending truncate */
   2725	while (ci->i_truncate_pending) {
   2726		spin_unlock(&ci->i_ceph_lock);
   2727		if (snap_rwsem_locked) {
   2728			up_read(&mdsc->snap_rwsem);
   2729			snap_rwsem_locked = false;
   2730		}
   2731		__ceph_do_pending_vmtruncate(inode);
   2732		spin_lock(&ci->i_ceph_lock);
   2733	}
   2734
   2735	have = __ceph_caps_issued(ci, &implemented);
   2736
   2737	if (have & need & CEPH_CAP_FILE_WR) {
   2738		if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
   2739			dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
   2740			     inode, endoff, ci->i_max_size);
   2741			if (endoff > ci->i_requested_max_size)
   2742				ret = ci->i_auth_cap ? -EFBIG : -EUCLEAN;
   2743			goto out_unlock;
   2744		}
   2745		/*
   2746		 * If a sync write is in progress, we must wait, so that we
   2747		 * can get a final snapshot value for size+mtime.
   2748		 */
   2749		if (__ceph_have_pending_cap_snap(ci)) {
   2750			dout("get_cap_refs %p cap_snap_pending\n", inode);
   2751			goto out_unlock;
   2752		}
   2753	}
   2754
   2755	if ((have & need) == need) {
   2756		/*
   2757		 * Look at (implemented & ~have & not) so that we keep waiting
   2758		 * on transition from wanted -> needed caps.  This is needed
   2759		 * for WRBUFFER|WR -> WR to avoid a new WR sync write from
   2760		 * going before a prior buffered writeback happens.
   2761		 */
   2762		int not = want & ~(have & need);
   2763		int revoking = implemented & ~have;
   2764		dout("get_cap_refs %p have %s but not %s (revoking %s)\n",
   2765		     inode, ceph_cap_string(have), ceph_cap_string(not),
   2766		     ceph_cap_string(revoking));
   2767		if ((revoking & not) == 0) {
   2768			if (!snap_rwsem_locked &&
   2769			    !ci->i_head_snapc &&
   2770			    (need & CEPH_CAP_FILE_WR)) {
   2771				if (!down_read_trylock(&mdsc->snap_rwsem)) {
   2772					/*
   2773					 * we can not call down_read() when
   2774					 * task isn't in TASK_RUNNING state
   2775					 */
   2776					if (flags & NON_BLOCKING) {
   2777						ret = -EAGAIN;
   2778						goto out_unlock;
   2779					}
   2780
   2781					spin_unlock(&ci->i_ceph_lock);
   2782					down_read(&mdsc->snap_rwsem);
   2783					snap_rwsem_locked = true;
   2784					goto again;
   2785				}
   2786				snap_rwsem_locked = true;
   2787			}
   2788			if ((have & want) == want)
   2789				*got = need | want;
   2790			else
   2791				*got = need;
   2792			ceph_take_cap_refs(ci, *got, true);
   2793			ret = 1;
   2794		}
   2795	} else {
   2796		int session_readonly = false;
   2797		int mds_wanted;
   2798		if (ci->i_auth_cap &&
   2799		    (need & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_EXCL))) {
   2800			struct ceph_mds_session *s = ci->i_auth_cap->session;
   2801			spin_lock(&s->s_cap_lock);
   2802			session_readonly = s->s_readonly;
   2803			spin_unlock(&s->s_cap_lock);
   2804		}
   2805		if (session_readonly) {
   2806			dout("get_cap_refs %p need %s but mds%d readonly\n",
   2807			     inode, ceph_cap_string(need), ci->i_auth_cap->mds);
   2808			ret = -EROFS;
   2809			goto out_unlock;
   2810		}
   2811
   2812		if (ceph_inode_is_shutdown(inode)) {
   2813			dout("get_cap_refs %p inode is shutdown\n", inode);
   2814			ret = -ESTALE;
   2815			goto out_unlock;
   2816		}
   2817		mds_wanted = __ceph_caps_mds_wanted(ci, false);
   2818		if (need & ~mds_wanted) {
   2819			dout("get_cap_refs %p need %s > mds_wanted %s\n",
   2820			     inode, ceph_cap_string(need),
   2821			     ceph_cap_string(mds_wanted));
   2822			ret = -EUCLEAN;
   2823			goto out_unlock;
   2824		}
   2825
   2826		dout("get_cap_refs %p have %s need %s\n", inode,
   2827		     ceph_cap_string(have), ceph_cap_string(need));
   2828	}
   2829out_unlock:
   2830
   2831	__ceph_touch_fmode(ci, mdsc, flags);
   2832
   2833	spin_unlock(&ci->i_ceph_lock);
   2834	if (snap_rwsem_locked)
   2835		up_read(&mdsc->snap_rwsem);
   2836
   2837	if (!ret)
   2838		ceph_update_cap_mis(&mdsc->metric);
   2839	else if (ret == 1)
   2840		ceph_update_cap_hit(&mdsc->metric);
   2841
   2842	dout("get_cap_refs %p ret %d got %s\n", inode,
   2843	     ret, ceph_cap_string(*got));
   2844	return ret;
   2845}
   2846
   2847/*
   2848 * Check the offset we are writing up to against our current
   2849 * max_size.  If necessary, tell the MDS we want to write to
   2850 * a larger offset.
   2851 */
   2852static void check_max_size(struct inode *inode, loff_t endoff)
   2853{
   2854	struct ceph_inode_info *ci = ceph_inode(inode);
   2855	int check = 0;
   2856
   2857	/* do we need to explicitly request a larger max_size? */
   2858	spin_lock(&ci->i_ceph_lock);
   2859	if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) {
   2860		dout("write %p at large endoff %llu, req max_size\n",
   2861		     inode, endoff);
   2862		ci->i_wanted_max_size = endoff;
   2863	}
   2864	/* duplicate ceph_check_caps()'s logic */
   2865	if (ci->i_auth_cap &&
   2866	    (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) &&
   2867	    ci->i_wanted_max_size > ci->i_max_size &&
   2868	    ci->i_wanted_max_size > ci->i_requested_max_size)
   2869		check = 1;
   2870	spin_unlock(&ci->i_ceph_lock);
   2871	if (check)
   2872		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
   2873}
   2874
   2875static inline int get_used_fmode(int caps)
   2876{
   2877	int fmode = 0;
   2878	if (caps & CEPH_CAP_FILE_RD)
   2879		fmode |= CEPH_FILE_MODE_RD;
   2880	if (caps & CEPH_CAP_FILE_WR)
   2881		fmode |= CEPH_FILE_MODE_WR;
   2882	return fmode;
   2883}
   2884
   2885int ceph_try_get_caps(struct inode *inode, int need, int want,
   2886		      bool nonblock, int *got)
   2887{
   2888	int ret, flags;
   2889
   2890	BUG_ON(need & ~CEPH_CAP_FILE_RD);
   2891	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO |
   2892			CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
   2893			CEPH_CAP_ANY_DIR_OPS));
   2894	if (need) {
   2895		ret = ceph_pool_perm_check(inode, need);
   2896		if (ret < 0)
   2897			return ret;
   2898	}
   2899
   2900	flags = get_used_fmode(need | want);
   2901	if (nonblock)
   2902		flags |= NON_BLOCKING;
   2903
   2904	ret = try_get_cap_refs(inode, need, want, 0, flags, got);
   2905	/* three special error codes */
   2906	if (ret == -EAGAIN || ret == -EFBIG || ret == -EUCLEAN)
   2907		ret = 0;
   2908	return ret;
   2909}
   2910
   2911/*
   2912 * Wait for caps, and take cap references.  If we can't get a WR cap
   2913 * due to a small max_size, make sure we check_max_size (and possibly
   2914 * ask the mds) so we don't get hung up indefinitely.
   2915 */
   2916int ceph_get_caps(struct file *filp, int need, int want, loff_t endoff, int *got)
   2917{
   2918	struct ceph_file_info *fi = filp->private_data;
   2919	struct inode *inode = file_inode(filp);
   2920	struct ceph_inode_info *ci = ceph_inode(inode);
   2921	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
   2922	int ret, _got, flags;
   2923
   2924	ret = ceph_pool_perm_check(inode, need);
   2925	if (ret < 0)
   2926		return ret;
   2927
   2928	if ((fi->fmode & CEPH_FILE_MODE_WR) &&
   2929	    fi->filp_gen != READ_ONCE(fsc->filp_gen))
   2930		return -EBADF;
   2931
   2932	flags = get_used_fmode(need | want);
   2933
   2934	while (true) {
   2935		flags &= CEPH_FILE_MODE_MASK;
   2936		if (atomic_read(&fi->num_locks))
   2937			flags |= CHECK_FILELOCK;
   2938		_got = 0;
   2939		ret = try_get_cap_refs(inode, need, want, endoff,
   2940				       flags, &_got);
   2941		WARN_ON_ONCE(ret == -EAGAIN);
   2942		if (!ret) {
   2943			struct ceph_mds_client *mdsc = fsc->mdsc;
   2944			struct cap_wait cw;
   2945			DEFINE_WAIT_FUNC(wait, woken_wake_function);
   2946
   2947			cw.ino = ceph_ino(inode);
   2948			cw.tgid = current->tgid;
   2949			cw.need = need;
   2950			cw.want = want;
   2951
   2952			spin_lock(&mdsc->caps_list_lock);
   2953			list_add(&cw.list, &mdsc->cap_wait_list);
   2954			spin_unlock(&mdsc->caps_list_lock);
   2955
   2956			/* make sure used fmode not timeout */
   2957			ceph_get_fmode(ci, flags, FMODE_WAIT_BIAS);
   2958			add_wait_queue(&ci->i_cap_wq, &wait);
   2959
   2960			flags |= NON_BLOCKING;
   2961			while (!(ret = try_get_cap_refs(inode, need, want,
   2962							endoff, flags, &_got))) {
   2963				if (signal_pending(current)) {
   2964					ret = -ERESTARTSYS;
   2965					break;
   2966				}
   2967				wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
   2968			}
   2969
   2970			remove_wait_queue(&ci->i_cap_wq, &wait);
   2971			ceph_put_fmode(ci, flags, FMODE_WAIT_BIAS);
   2972
   2973			spin_lock(&mdsc->caps_list_lock);
   2974			list_del(&cw.list);
   2975			spin_unlock(&mdsc->caps_list_lock);
   2976
   2977			if (ret == -EAGAIN)
   2978				continue;
   2979		}
   2980
   2981		if ((fi->fmode & CEPH_FILE_MODE_WR) &&
   2982		    fi->filp_gen != READ_ONCE(fsc->filp_gen)) {
   2983			if (ret >= 0 && _got)
   2984				ceph_put_cap_refs(ci, _got);
   2985			return -EBADF;
   2986		}
   2987
   2988		if (ret < 0) {
   2989			if (ret == -EFBIG || ret == -EUCLEAN) {
   2990				int ret2 = ceph_wait_on_async_create(inode);
   2991				if (ret2 < 0)
   2992					return ret2;
   2993			}
   2994			if (ret == -EFBIG) {
   2995				check_max_size(inode, endoff);
   2996				continue;
   2997			}
   2998			if (ret == -EUCLEAN) {
   2999				/* session was killed, try renew caps */
   3000				ret = ceph_renew_caps(inode, flags);
   3001				if (ret == 0)
   3002					continue;
   3003			}
   3004			return ret;
   3005		}
   3006
   3007		if (S_ISREG(ci->netfs.inode.i_mode) &&
   3008		    ci->i_inline_version != CEPH_INLINE_NONE &&
   3009		    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
   3010		    i_size_read(inode) > 0) {
   3011			struct page *page =
   3012				find_get_page(inode->i_mapping, 0);
   3013			if (page) {
   3014				bool uptodate = PageUptodate(page);
   3015
   3016				put_page(page);
   3017				if (uptodate)
   3018					break;
   3019			}
   3020			/*
   3021			 * drop cap refs first because getattr while
   3022			 * holding * caps refs can cause deadlock.
   3023			 */
   3024			ceph_put_cap_refs(ci, _got);
   3025			_got = 0;
   3026
   3027			/*
   3028			 * getattr request will bring inline data into
   3029			 * page cache
   3030			 */
   3031			ret = __ceph_do_getattr(inode, NULL,
   3032						CEPH_STAT_CAP_INLINE_DATA,
   3033						true);
   3034			if (ret < 0)
   3035				return ret;
   3036			continue;
   3037		}
   3038		break;
   3039	}
   3040	*got = _got;
   3041	return 0;
   3042}
   3043
   3044/*
   3045 * Take cap refs.  Caller must already know we hold at least one ref
   3046 * on the caps in question or we don't know this is safe.
   3047 */
   3048void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
   3049{
   3050	spin_lock(&ci->i_ceph_lock);
   3051	ceph_take_cap_refs(ci, caps, false);
   3052	spin_unlock(&ci->i_ceph_lock);
   3053}
   3054
   3055
   3056/*
   3057 * drop cap_snap that is not associated with any snapshot.
   3058 * we don't need to send FLUSHSNAP message for it.
   3059 */
   3060static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
   3061				  struct ceph_cap_snap *capsnap)
   3062{
   3063	if (!capsnap->need_flush &&
   3064	    !capsnap->writing && !capsnap->dirty_pages) {
   3065		dout("dropping cap_snap %p follows %llu\n",
   3066		     capsnap, capsnap->follows);
   3067		BUG_ON(capsnap->cap_flush.tid > 0);
   3068		ceph_put_snap_context(capsnap->context);
   3069		if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps))
   3070			ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
   3071
   3072		list_del(&capsnap->ci_item);
   3073		ceph_put_cap_snap(capsnap);
   3074		return 1;
   3075	}
   3076	return 0;
   3077}
   3078
   3079enum put_cap_refs_mode {
   3080	PUT_CAP_REFS_SYNC = 0,
   3081	PUT_CAP_REFS_NO_CHECK,
   3082	PUT_CAP_REFS_ASYNC,
   3083};
   3084
   3085/*
   3086 * Release cap refs.
   3087 *
   3088 * If we released the last ref on any given cap, call ceph_check_caps
   3089 * to release (or schedule a release).
   3090 *
   3091 * If we are releasing a WR cap (from a sync write), finalize any affected
   3092 * cap_snap, and wake up any waiters.
   3093 */
   3094static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
   3095				enum put_cap_refs_mode mode)
   3096{
   3097	struct inode *inode = &ci->netfs.inode;
   3098	int last = 0, put = 0, flushsnaps = 0, wake = 0;
   3099	bool check_flushsnaps = false;
   3100
   3101	spin_lock(&ci->i_ceph_lock);
   3102	if (had & CEPH_CAP_PIN)
   3103		--ci->i_pin_ref;
   3104	if (had & CEPH_CAP_FILE_RD)
   3105		if (--ci->i_rd_ref == 0)
   3106			last++;
   3107	if (had & CEPH_CAP_FILE_CACHE)
   3108		if (--ci->i_rdcache_ref == 0)
   3109			last++;
   3110	if (had & CEPH_CAP_FILE_EXCL)
   3111		if (--ci->i_fx_ref == 0)
   3112			last++;
   3113	if (had & CEPH_CAP_FILE_BUFFER) {
   3114		if (--ci->i_wb_ref == 0) {
   3115			last++;
   3116			/* put the ref held by ceph_take_cap_refs() */
   3117			put++;
   3118			check_flushsnaps = true;
   3119		}
   3120		dout("put_cap_refs %p wb %d -> %d (?)\n",
   3121		     inode, ci->i_wb_ref+1, ci->i_wb_ref);
   3122	}
   3123	if (had & CEPH_CAP_FILE_WR) {
   3124		if (--ci->i_wr_ref == 0) {
   3125			last++;
   3126			check_flushsnaps = true;
   3127			if (ci->i_wrbuffer_ref_head == 0 &&
   3128			    ci->i_dirty_caps == 0 &&
   3129			    ci->i_flushing_caps == 0) {
   3130				BUG_ON(!ci->i_head_snapc);
   3131				ceph_put_snap_context(ci->i_head_snapc);
   3132				ci->i_head_snapc = NULL;
   3133			}
   3134			/* see comment in __ceph_remove_cap() */
   3135			if (!__ceph_is_any_real_caps(ci) && ci->i_snap_realm)
   3136				ceph_change_snap_realm(inode, NULL);
   3137		}
   3138	}
   3139	if (check_flushsnaps && __ceph_have_pending_cap_snap(ci)) {
   3140		struct ceph_cap_snap *capsnap =
   3141			list_last_entry(&ci->i_cap_snaps,
   3142					struct ceph_cap_snap,
   3143					ci_item);
   3144
   3145		capsnap->writing = 0;
   3146		if (ceph_try_drop_cap_snap(ci, capsnap))
   3147			/* put the ref held by ceph_queue_cap_snap() */
   3148			put++;
   3149		else if (__ceph_finish_cap_snap(ci, capsnap))
   3150			flushsnaps = 1;
   3151		wake = 1;
   3152	}
   3153	spin_unlock(&ci->i_ceph_lock);
   3154
   3155	dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
   3156	     last ? " last" : "", put ? " put" : "");
   3157
   3158	switch (mode) {
   3159	case PUT_CAP_REFS_SYNC:
   3160		if (last)
   3161			ceph_check_caps(ci, 0, NULL);
   3162		else if (flushsnaps)
   3163			ceph_flush_snaps(ci, NULL);
   3164		break;
   3165	case PUT_CAP_REFS_ASYNC:
   3166		if (last)
   3167			ceph_queue_check_caps(inode);
   3168		else if (flushsnaps)
   3169			ceph_queue_flush_snaps(inode);
   3170		break;
   3171	default:
   3172		break;
   3173	}
   3174	if (wake)
   3175		wake_up_all(&ci->i_cap_wq);
   3176	while (put-- > 0)
   3177		iput(inode);
   3178}
   3179
   3180void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
   3181{
   3182	__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_SYNC);
   3183}
   3184
   3185void ceph_put_cap_refs_async(struct ceph_inode_info *ci, int had)
   3186{
   3187	__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_ASYNC);
   3188}
   3189
   3190void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci, int had)
   3191{
   3192	__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_NO_CHECK);
   3193}
   3194
   3195/*
   3196 * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap
   3197 * context.  Adjust per-snap dirty page accounting as appropriate.
   3198 * Once all dirty data for a cap_snap is flushed, flush snapped file
   3199 * metadata back to the MDS.  If we dropped the last ref, call
   3200 * ceph_check_caps.
   3201 */
   3202void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
   3203				struct ceph_snap_context *snapc)
   3204{
   3205	struct inode *inode = &ci->netfs.inode;
   3206	struct ceph_cap_snap *capsnap = NULL, *iter;
   3207	int put = 0;
   3208	bool last = false;
   3209	bool flush_snaps = false;
   3210	bool complete_capsnap = false;
   3211
   3212	spin_lock(&ci->i_ceph_lock);
   3213	ci->i_wrbuffer_ref -= nr;
   3214	if (ci->i_wrbuffer_ref == 0) {
   3215		last = true;
   3216		put++;
   3217	}
   3218
   3219	if (ci->i_head_snapc == snapc) {
   3220		ci->i_wrbuffer_ref_head -= nr;
   3221		if (ci->i_wrbuffer_ref_head == 0 &&
   3222		    ci->i_wr_ref == 0 &&
   3223		    ci->i_dirty_caps == 0 &&
   3224		    ci->i_flushing_caps == 0) {
   3225			BUG_ON(!ci->i_head_snapc);
   3226			ceph_put_snap_context(ci->i_head_snapc);
   3227			ci->i_head_snapc = NULL;
   3228		}
   3229		dout("put_wrbuffer_cap_refs on %p head %d/%d -> %d/%d %s\n",
   3230		     inode,
   3231		     ci->i_wrbuffer_ref+nr, ci->i_wrbuffer_ref_head+nr,
   3232		     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
   3233		     last ? " LAST" : "");
   3234	} else {
   3235		list_for_each_entry(iter, &ci->i_cap_snaps, ci_item) {
   3236			if (iter->context == snapc) {
   3237				capsnap = iter;
   3238				break;
   3239			}
   3240		}
   3241
   3242		if (!capsnap) {
   3243			/*
   3244			 * The capsnap should already be removed when removing
   3245			 * auth cap in the case of a forced unmount.
   3246			 */
   3247			WARN_ON_ONCE(ci->i_auth_cap);
   3248			goto unlock;
   3249		}
   3250
   3251		capsnap->dirty_pages -= nr;
   3252		if (capsnap->dirty_pages == 0) {
   3253			complete_capsnap = true;
   3254			if (!capsnap->writing) {
   3255				if (ceph_try_drop_cap_snap(ci, capsnap)) {
   3256					put++;
   3257				} else {
   3258					ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
   3259					flush_snaps = true;
   3260				}
   3261			}
   3262		}
   3263		dout("put_wrbuffer_cap_refs on %p cap_snap %p "
   3264		     " snap %lld %d/%d -> %d/%d %s%s\n",
   3265		     inode, capsnap, capsnap->context->seq,
   3266		     ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
   3267		     ci->i_wrbuffer_ref, capsnap->dirty_pages,
   3268		     last ? " (wrbuffer last)" : "",
   3269		     complete_capsnap ? " (complete capsnap)" : "");
   3270	}
   3271
   3272unlock:
   3273	spin_unlock(&ci->i_ceph_lock);
   3274
   3275	if (last) {
   3276		ceph_check_caps(ci, 0, NULL);
   3277	} else if (flush_snaps) {
   3278		ceph_flush_snaps(ci, NULL);
   3279	}
   3280	if (complete_capsnap)
   3281		wake_up_all(&ci->i_cap_wq);
   3282	while (put-- > 0) {
   3283		iput(inode);
   3284	}
   3285}
   3286
   3287/*
   3288 * Invalidate unlinked inode's aliases, so we can drop the inode ASAP.
   3289 */
   3290static void invalidate_aliases(struct inode *inode)
   3291{
   3292	struct dentry *dn, *prev = NULL;
   3293
   3294	dout("invalidate_aliases inode %p\n", inode);
   3295	d_prune_aliases(inode);
   3296	/*
   3297	 * For non-directory inode, d_find_alias() only returns
   3298	 * hashed dentry. After calling d_invalidate(), the
   3299	 * dentry becomes unhashed.
   3300	 *
   3301	 * For directory inode, d_find_alias() can return
   3302	 * unhashed dentry. But directory inode should have
   3303	 * one alias at most.
   3304	 */
   3305	while ((dn = d_find_alias(inode))) {
   3306		if (dn == prev) {
   3307			dput(dn);
   3308			break;
   3309		}
   3310		d_invalidate(dn);
   3311		if (prev)
   3312			dput(prev);
   3313		prev = dn;
   3314	}
   3315	if (prev)
   3316		dput(prev);
   3317}
   3318
   3319struct cap_extra_info {
   3320	struct ceph_string *pool_ns;
   3321	/* inline data */
   3322	u64 inline_version;
   3323	void *inline_data;
   3324	u32 inline_len;
   3325	/* dirstat */
   3326	bool dirstat_valid;
   3327	u64 nfiles;
   3328	u64 nsubdirs;
   3329	u64 change_attr;
   3330	/* currently issued */
   3331	int issued;
   3332	struct timespec64 btime;
   3333};
   3334
   3335/*
   3336 * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
   3337 * actually be a revocation if it specifies a smaller cap set.)
   3338 *
   3339 * caller holds s_mutex and i_ceph_lock, we drop both.
   3340 */
   3341static void handle_cap_grant(struct inode *inode,
   3342			     struct ceph_mds_session *session,
   3343			     struct ceph_cap *cap,
   3344			     struct ceph_mds_caps *grant,
   3345			     struct ceph_buffer *xattr_buf,
   3346			     struct cap_extra_info *extra_info)
   3347	__releases(ci->i_ceph_lock)
   3348	__releases(session->s_mdsc->snap_rwsem)
   3349{
   3350	struct ceph_inode_info *ci = ceph_inode(inode);
   3351	int seq = le32_to_cpu(grant->seq);
   3352	int newcaps = le32_to_cpu(grant->caps);
   3353	int used, wanted, dirty;
   3354	u64 size = le64_to_cpu(grant->size);
   3355	u64 max_size = le64_to_cpu(grant->max_size);
   3356	unsigned char check_caps = 0;
   3357	bool was_stale = cap->cap_gen < atomic_read(&session->s_cap_gen);
   3358	bool wake = false;
   3359	bool writeback = false;
   3360	bool queue_trunc = false;
   3361	bool queue_invalidate = false;
   3362	bool deleted_inode = false;
   3363	bool fill_inline = false;
   3364
   3365	dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
   3366	     inode, cap, session->s_mds, seq, ceph_cap_string(newcaps));
   3367	dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
   3368		i_size_read(inode));
   3369
   3370
   3371	/*
   3372	 * If CACHE is being revoked, and we have no dirty buffers,
   3373	 * try to invalidate (once).  (If there are dirty buffers, we
   3374	 * will invalidate _after_ writeback.)
   3375	 */
   3376	if (S_ISREG(inode->i_mode) && /* don't invalidate readdir cache */
   3377	    ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
   3378	    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
   3379	    !(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
   3380		if (try_nonblocking_invalidate(inode)) {
   3381			/* there were locked pages.. invalidate later
   3382			   in a separate thread. */
   3383			if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
   3384				queue_invalidate = true;
   3385				ci->i_rdcache_revoking = ci->i_rdcache_gen;
   3386			}
   3387		}
   3388	}
   3389
   3390	if (was_stale)
   3391		cap->issued = cap->implemented = CEPH_CAP_PIN;
   3392
   3393	/*
   3394	 * auth mds of the inode changed. we received the cap export message,
   3395	 * but still haven't received the cap import message. handle_cap_export
   3396	 * updated the new auth MDS' cap.
   3397	 *
   3398	 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
   3399	 * that was sent before the cap import message. So don't remove caps.
   3400	 */
   3401	if (ceph_seq_cmp(seq, cap->seq) <= 0) {
   3402		WARN_ON(cap != ci->i_auth_cap);
   3403		WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
   3404		seq = cap->seq;
   3405		newcaps |= cap->issued;
   3406	}
   3407
   3408	/* side effects now are allowed */
   3409	cap->cap_gen = atomic_read(&session->s_cap_gen);
   3410	cap->seq = seq;
   3411
   3412	__check_cap_issue(ci, cap, newcaps);
   3413
   3414	inode_set_max_iversion_raw(inode, extra_info->change_attr);
   3415
   3416	if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
   3417	    (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
   3418		umode_t mode = le32_to_cpu(grant->mode);
   3419
   3420		if (inode_wrong_type(inode, mode))
   3421			pr_warn_once("inode type changed! (ino %llx.%llx is 0%o, mds says 0%o)\n",
   3422				     ceph_vinop(inode), inode->i_mode, mode);
   3423		else
   3424			inode->i_mode = mode;
   3425		inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
   3426		inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
   3427		ci->i_btime = extra_info->btime;
   3428		dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
   3429		     from_kuid(&init_user_ns, inode->i_uid),
   3430		     from_kgid(&init_user_ns, inode->i_gid));
   3431	}
   3432
   3433	if ((newcaps & CEPH_CAP_LINK_SHARED) &&
   3434	    (extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) {
   3435		set_nlink(inode, le32_to_cpu(grant->nlink));
   3436		if (inode->i_nlink == 0)
   3437			deleted_inode = true;
   3438	}
   3439
   3440	if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 &&
   3441	    grant->xattr_len) {
   3442		int len = le32_to_cpu(grant->xattr_len);
   3443		u64 version = le64_to_cpu(grant->xattr_version);
   3444
   3445		if (version > ci->i_xattrs.version) {
   3446			dout(" got new xattrs v%llu on %p len %d\n",
   3447			     version, inode, len);
   3448			if (ci->i_xattrs.blob)
   3449				ceph_buffer_put(ci->i_xattrs.blob);
   3450			ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
   3451			ci->i_xattrs.version = version;
   3452			ceph_forget_all_cached_acls(inode);
   3453			ceph_security_invalidate_secctx(inode);
   3454		}
   3455	}
   3456
   3457	if (newcaps & CEPH_CAP_ANY_RD) {
   3458		struct timespec64 mtime, atime, ctime;
   3459		/* ctime/mtime/atime? */
   3460		ceph_decode_timespec64(&mtime, &grant->mtime);
   3461		ceph_decode_timespec64(&atime, &grant->atime);
   3462		ceph_decode_timespec64(&ctime, &grant->ctime);
   3463		ceph_fill_file_time(inode, extra_info->issued,
   3464				    le32_to_cpu(grant->time_warp_seq),
   3465				    &ctime, &mtime, &atime);
   3466	}
   3467
   3468	if ((newcaps & CEPH_CAP_FILE_SHARED) && extra_info->dirstat_valid) {
   3469		ci->i_files = extra_info->nfiles;
   3470		ci->i_subdirs = extra_info->nsubdirs;
   3471	}
   3472
   3473	if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
   3474		/* file layout may have changed */
   3475		s64 old_pool = ci->i_layout.pool_id;
   3476		struct ceph_string *old_ns;
   3477
   3478		ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
   3479		old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
   3480					lockdep_is_held(&ci->i_ceph_lock));
   3481		rcu_assign_pointer(ci->i_layout.pool_ns, extra_info->pool_ns);
   3482
   3483		if (ci->i_layout.pool_id != old_pool ||
   3484		    extra_info->pool_ns != old_ns)
   3485			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
   3486
   3487		extra_info->pool_ns = old_ns;
   3488
   3489		/* size/truncate_seq? */
   3490		queue_trunc = ceph_fill_file_size(inode, extra_info->issued,
   3491					le32_to_cpu(grant->truncate_seq),
   3492					le64_to_cpu(grant->truncate_size),
   3493					size);
   3494	}
   3495
   3496	if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) {
   3497		if (max_size != ci->i_max_size) {
   3498			dout("max_size %lld -> %llu\n",
   3499			     ci->i_max_size, max_size);
   3500			ci->i_max_size = max_size;
   3501			if (max_size >= ci->i_wanted_max_size) {
   3502				ci->i_wanted_max_size = 0;  /* reset */
   3503				ci->i_requested_max_size = 0;
   3504			}
   3505			wake = true;
   3506		}
   3507	}
   3508
   3509	/* check cap bits */
   3510	wanted = __ceph_caps_wanted(ci);
   3511	used = __ceph_caps_used(ci);
   3512	dirty = __ceph_caps_dirty(ci);
   3513	dout(" my wanted = %s, used = %s, dirty %s\n",
   3514	     ceph_cap_string(wanted),
   3515	     ceph_cap_string(used),
   3516	     ceph_cap_string(dirty));
   3517
   3518	if ((was_stale || le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) &&
   3519	    (wanted & ~(cap->mds_wanted | newcaps))) {
   3520		/*
   3521		 * If mds is importing cap, prior cap messages that update
   3522		 * 'wanted' may get dropped by mds (migrate seq mismatch).
   3523		 *
   3524		 * We don't send cap message to update 'wanted' if what we
   3525		 * want are already issued. If mds revokes caps, cap message
   3526		 * that releases caps also tells mds what we want. But if
   3527		 * caps got revoked by mds forcedly (session stale). We may
   3528		 * haven't told mds what we want.
   3529		 */
   3530		check_caps = 1;
   3531	}
   3532
   3533	/* revocation, grant, or no-op? */
   3534	if (cap->issued & ~newcaps) {
   3535		int revoking = cap->issued & ~newcaps;
   3536
   3537		dout("revocation: %s -> %s (revoking %s)\n",
   3538		     ceph_cap_string(cap->issued),
   3539		     ceph_cap_string(newcaps),
   3540		     ceph_cap_string(revoking));
   3541		if (S_ISREG(inode->i_mode) &&
   3542		    (revoking & used & CEPH_CAP_FILE_BUFFER))
   3543			writeback = true;  /* initiate writeback; will delay ack */
   3544		else if (queue_invalidate &&
   3545			 revoking == CEPH_CAP_FILE_CACHE &&
   3546			 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0)
   3547			; /* do nothing yet, invalidation will be queued */
   3548		else if (cap == ci->i_auth_cap)
   3549			check_caps = 1; /* check auth cap only */
   3550		else
   3551			check_caps = 2; /* check all caps */
   3552		cap->issued = newcaps;
   3553		cap->implemented |= newcaps;
   3554	} else if (cap->issued == newcaps) {
   3555		dout("caps unchanged: %s -> %s\n",
   3556		     ceph_cap_string(cap->issued), ceph_cap_string(newcaps));
   3557	} else {
   3558		dout("grant: %s -> %s\n", ceph_cap_string(cap->issued),
   3559		     ceph_cap_string(newcaps));
   3560		/* non-auth MDS is revoking the newly grant caps ? */
   3561		if (cap == ci->i_auth_cap &&
   3562		    __ceph_caps_revoking_other(ci, cap, newcaps))
   3563		    check_caps = 2;
   3564
   3565		cap->issued = newcaps;
   3566		cap->implemented |= newcaps; /* add bits only, to
   3567					      * avoid stepping on a
   3568					      * pending revocation */
   3569		wake = true;
   3570	}
   3571	BUG_ON(cap->issued & ~cap->implemented);
   3572
   3573	if (extra_info->inline_version > 0 &&
   3574	    extra_info->inline_version >= ci->i_inline_version) {
   3575		ci->i_inline_version = extra_info->inline_version;
   3576		if (ci->i_inline_version != CEPH_INLINE_NONE &&
   3577		    (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
   3578			fill_inline = true;
   3579	}
   3580
   3581	if (ci->i_auth_cap == cap &&
   3582	    le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
   3583		if (newcaps & ~extra_info->issued)
   3584			wake = true;
   3585
   3586		if (ci->i_requested_max_size > max_size ||
   3587		    !(le32_to_cpu(grant->wanted) & CEPH_CAP_ANY_FILE_WR)) {
   3588			/* re-request max_size if necessary */
   3589			ci->i_requested_max_size = 0;
   3590			wake = true;
   3591		}
   3592
   3593		ceph_kick_flushing_inode_caps(session, ci);
   3594		spin_unlock(&ci->i_ceph_lock);
   3595		up_read(&session->s_mdsc->snap_rwsem);
   3596	} else {
   3597		spin_unlock(&ci->i_ceph_lock);
   3598	}
   3599
   3600	if (fill_inline)
   3601		ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
   3602				      extra_info->inline_len);
   3603
   3604	if (queue_trunc)
   3605		ceph_queue_vmtruncate(inode);
   3606
   3607	if (writeback)
   3608		/*
   3609		 * queue inode for writeback: we can't actually call
   3610		 * filemap_write_and_wait, etc. from message handler
   3611		 * context.
   3612		 */
   3613		ceph_queue_writeback(inode);
   3614	if (queue_invalidate)
   3615		ceph_queue_invalidate(inode);
   3616	if (deleted_inode)
   3617		invalidate_aliases(inode);
   3618	if (wake)
   3619		wake_up_all(&ci->i_cap_wq);
   3620
   3621	mutex_unlock(&session->s_mutex);
   3622	if (check_caps == 1)
   3623		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY | CHECK_CAPS_NOINVAL,
   3624				session);
   3625	else if (check_caps == 2)
   3626		ceph_check_caps(ci, CHECK_CAPS_NOINVAL, session);
   3627}
   3628
   3629/*
   3630 * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
   3631 * MDS has been safely committed.
   3632 */
   3633static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
   3634				 struct ceph_mds_caps *m,
   3635				 struct ceph_mds_session *session,
   3636				 struct ceph_cap *cap)
   3637	__releases(ci->i_ceph_lock)
   3638{
   3639	struct ceph_inode_info *ci = ceph_inode(inode);
   3640	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
   3641	struct ceph_cap_flush *cf, *tmp_cf;
   3642	LIST_HEAD(to_remove);
   3643	unsigned seq = le32_to_cpu(m->seq);
   3644	int dirty = le32_to_cpu(m->dirty);
   3645	int cleaned = 0;
   3646	bool drop = false;
   3647	bool wake_ci = false;
   3648	bool wake_mdsc = false;
   3649
   3650	list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
   3651		/* Is this the one that was flushed? */
   3652		if (cf->tid == flush_tid)
   3653			cleaned = cf->caps;
   3654
   3655		/* Is this a capsnap? */
   3656		if (cf->is_capsnap)
   3657			continue;
   3658
   3659		if (cf->tid <= flush_tid) {
   3660			/*
   3661			 * An earlier or current tid. The FLUSH_ACK should
   3662			 * represent a superset of this flush's caps.
   3663			 */
   3664			wake_ci |= __detach_cap_flush_from_ci(ci, cf);
   3665			list_add_tail(&cf->i_list, &to_remove);
   3666		} else {
   3667			/*
   3668			 * This is a later one. Any caps in it are still dirty
   3669			 * so don't count them as cleaned.
   3670			 */
   3671			cleaned &= ~cf->caps;
   3672			if (!cleaned)
   3673				break;
   3674		}
   3675	}
   3676
   3677	dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s,"
   3678	     " flushing %s -> %s\n",
   3679	     inode, session->s_mds, seq, ceph_cap_string(dirty),
   3680	     ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps),
   3681	     ceph_cap_string(ci->i_flushing_caps & ~cleaned));
   3682
   3683	if (list_empty(&to_remove) && !cleaned)
   3684		goto out;
   3685
   3686	ci->i_flushing_caps &= ~cleaned;
   3687
   3688	spin_lock(&mdsc->cap_dirty_lock);
   3689
   3690	list_for_each_entry(cf, &to_remove, i_list)
   3691		wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc, cf);
   3692
   3693	if (ci->i_flushing_caps == 0) {
   3694		if (list_empty(&ci->i_cap_flush_list)) {
   3695			list_del_init(&ci->i_flushing_item);
   3696			if (!list_empty(&session->s_cap_flushing)) {
   3697				dout(" mds%d still flushing cap on %p\n",
   3698				     session->s_mds,
   3699				     &list_first_entry(&session->s_cap_flushing,
   3700						struct ceph_inode_info,
   3701						i_flushing_item)->netfs.inode);
   3702			}
   3703		}
   3704		mdsc->num_cap_flushing--;
   3705		dout(" inode %p now !flushing\n", inode);
   3706
   3707		if (ci->i_dirty_caps == 0) {
   3708			dout(" inode %p now clean\n", inode);
   3709			BUG_ON(!list_empty(&ci->i_dirty_item));
   3710			drop = true;
   3711			if (ci->i_wr_ref == 0 &&
   3712			    ci->i_wrbuffer_ref_head == 0) {
   3713				BUG_ON(!ci->i_head_snapc);
   3714				ceph_put_snap_context(ci->i_head_snapc);
   3715				ci->i_head_snapc = NULL;
   3716			}
   3717		} else {
   3718			BUG_ON(list_empty(&ci->i_dirty_item));
   3719		}
   3720	}
   3721	spin_unlock(&mdsc->cap_dirty_lock);
   3722
   3723out:
   3724	spin_unlock(&ci->i_ceph_lock);
   3725
   3726	while (!list_empty(&to_remove)) {
   3727		cf = list_first_entry(&to_remove,
   3728				      struct ceph_cap_flush, i_list);
   3729		list_del_init(&cf->i_list);
   3730		if (!cf->is_capsnap)
   3731			ceph_free_cap_flush(cf);
   3732	}
   3733
   3734	if (wake_ci)
   3735		wake_up_all(&ci->i_cap_wq);
   3736	if (wake_mdsc)
   3737		wake_up_all(&mdsc->cap_flushing_wq);
   3738	if (drop)
   3739		iput(inode);
   3740}
   3741
   3742void __ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
   3743			   bool *wake_ci, bool *wake_mdsc)
   3744{
   3745	struct ceph_inode_info *ci = ceph_inode(inode);
   3746	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
   3747	bool ret;
   3748
   3749	lockdep_assert_held(&ci->i_ceph_lock);
   3750
   3751	dout("removing capsnap %p, inode %p ci %p\n", capsnap, inode, ci);
   3752
   3753	list_del_init(&capsnap->ci_item);
   3754	ret = __detach_cap_flush_from_ci(ci, &capsnap->cap_flush);
   3755	if (wake_ci)
   3756		*wake_ci = ret;
   3757
   3758	spin_lock(&mdsc->cap_dirty_lock);
   3759	if (list_empty(&ci->i_cap_flush_list))
   3760		list_del_init(&ci->i_flushing_item);
   3761
   3762	ret = __detach_cap_flush_from_mdsc(mdsc, &capsnap->cap_flush);
   3763	if (wake_mdsc)
   3764		*wake_mdsc = ret;
   3765	spin_unlock(&mdsc->cap_dirty_lock);
   3766}
   3767
   3768void ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
   3769			 bool *wake_ci, bool *wake_mdsc)
   3770{
   3771	struct ceph_inode_info *ci = ceph_inode(inode);
   3772
   3773	lockdep_assert_held(&ci->i_ceph_lock);
   3774
   3775	WARN_ON_ONCE(capsnap->dirty_pages || capsnap->writing);
   3776	__ceph_remove_capsnap(inode, capsnap, wake_ci, wake_mdsc);
   3777}
   3778
   3779/*
   3780 * Handle FLUSHSNAP_ACK.  MDS has flushed snap data to disk and we can
   3781 * throw away our cap_snap.
   3782 *
   3783 * Caller hold s_mutex.
   3784 */
   3785static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
   3786				     struct ceph_mds_caps *m,
   3787				     struct ceph_mds_session *session)
   3788{
   3789	struct ceph_inode_info *ci = ceph_inode(inode);
   3790	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
   3791	u64 follows = le64_to_cpu(m->snap_follows);
   3792	struct ceph_cap_snap *capsnap = NULL, *iter;
   3793	bool wake_ci = false;
   3794	bool wake_mdsc = false;
   3795
   3796	dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n",
   3797	     inode, ci, session->s_mds, follows);
   3798
   3799	spin_lock(&ci->i_ceph_lock);
   3800	list_for_each_entry(iter, &ci->i_cap_snaps, ci_item) {
   3801		if (iter->follows == follows) {
   3802			if (iter->cap_flush.tid != flush_tid) {
   3803				dout(" cap_snap %p follows %lld tid %lld !="
   3804				     " %lld\n", iter, follows,
   3805				     flush_tid, iter->cap_flush.tid);
   3806				break;
   3807			}
   3808			capsnap = iter;
   3809			break;
   3810		} else {
   3811			dout(" skipping cap_snap %p follows %lld\n",
   3812			     iter, iter->follows);
   3813		}
   3814	}
   3815	if (capsnap)
   3816		ceph_remove_capsnap(inode, capsnap, &wake_ci, &wake_mdsc);
   3817	spin_unlock(&ci->i_ceph_lock);
   3818
   3819	if (capsnap) {
   3820		ceph_put_snap_context(capsnap->context);
   3821		ceph_put_cap_snap(capsnap);
   3822		if (wake_ci)
   3823			wake_up_all(&ci->i_cap_wq);
   3824		if (wake_mdsc)
   3825			wake_up_all(&mdsc->cap_flushing_wq);
   3826		iput(inode);
   3827	}
   3828}
   3829
   3830/*
   3831 * Handle TRUNC from MDS, indicating file truncation.
   3832 *
   3833 * caller hold s_mutex.
   3834 */
   3835static bool handle_cap_trunc(struct inode *inode,
   3836			     struct ceph_mds_caps *trunc,
   3837			     struct ceph_mds_session *session)
   3838{
   3839	struct ceph_inode_info *ci = ceph_inode(inode);
   3840	int mds = session->s_mds;
   3841	int seq = le32_to_cpu(trunc->seq);
   3842	u32 truncate_seq = le32_to_cpu(trunc->truncate_seq);
   3843	u64 truncate_size = le64_to_cpu(trunc->truncate_size);
   3844	u64 size = le64_to_cpu(trunc->size);
   3845	int implemented = 0;
   3846	int dirty = __ceph_caps_dirty(ci);
   3847	int issued = __ceph_caps_issued(ceph_inode(inode), &implemented);
   3848	bool queue_trunc = false;
   3849
   3850	lockdep_assert_held(&ci->i_ceph_lock);
   3851
   3852	issued |= implemented | dirty;
   3853
   3854	dout("handle_cap_trunc inode %p mds%d seq %d to %lld seq %d\n",
   3855	     inode, mds, seq, truncate_size, truncate_seq);
   3856	queue_trunc = ceph_fill_file_size(inode, issued,
   3857					  truncate_seq, truncate_size, size);
   3858	return queue_trunc;
   3859}
   3860
   3861/*
   3862 * Handle EXPORT from MDS.  Cap is being migrated _from_ this mds to a
   3863 * different one.  If we are the most recent migration we've seen (as
   3864 * indicated by mseq), make note of the migrating cap bits for the
   3865 * duration (until we see the corresponding IMPORT).
   3866 *
   3867 * caller holds s_mutex
   3868 */
   3869static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
   3870			      struct ceph_mds_cap_peer *ph,
   3871			      struct ceph_mds_session *session)
   3872{
   3873	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
   3874	struct ceph_mds_session *tsession = NULL;
   3875	struct ceph_cap *cap, *tcap, *new_cap = NULL;
   3876	struct ceph_inode_info *ci = ceph_inode(inode);
   3877	u64 t_cap_id;
   3878	unsigned mseq = le32_to_cpu(ex->migrate_seq);
   3879	unsigned t_seq, t_mseq;
   3880	int target, issued;
   3881	int mds = session->s_mds;
   3882
   3883	if (ph) {
   3884		t_cap_id = le64_to_cpu(ph->cap_id);
   3885		t_seq = le32_to_cpu(ph->seq);
   3886		t_mseq = le32_to_cpu(ph->mseq);
   3887		target = le32_to_cpu(ph->mds);
   3888	} else {
   3889		t_cap_id = t_seq = t_mseq = 0;
   3890		target = -1;
   3891	}
   3892
   3893	dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n",
   3894	     inode, ci, mds, mseq, target);
   3895retry:
   3896	down_read(&mdsc->snap_rwsem);
   3897	spin_lock(&ci->i_ceph_lock);
   3898	cap = __get_cap_for_mds(ci, mds);
   3899	if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id))
   3900		goto out_unlock;
   3901
   3902	if (target < 0) {
   3903		ceph_remove_cap(cap, false);
   3904		goto out_unlock;
   3905	}
   3906
   3907	/*
   3908	 * now we know we haven't received the cap import message yet
   3909	 * because the exported cap still exist.
   3910	 */
   3911
   3912	issued = cap->issued;
   3913	if (issued != cap->implemented)
   3914		pr_err_ratelimited("handle_cap_export: issued != implemented: "
   3915				"ino (%llx.%llx) mds%d seq %d mseq %d "
   3916				"issued %s implemented %s\n",
   3917				ceph_vinop(inode), mds, cap->seq, cap->mseq,
   3918				ceph_cap_string(issued),
   3919				ceph_cap_string(cap->implemented));
   3920
   3921
   3922	tcap = __get_cap_for_mds(ci, target);
   3923	if (tcap) {
   3924		/* already have caps from the target */
   3925		if (tcap->cap_id == t_cap_id &&
   3926		    ceph_seq_cmp(tcap->seq, t_seq) < 0) {
   3927			dout(" updating import cap %p mds%d\n", tcap, target);
   3928			tcap->cap_id = t_cap_id;
   3929			tcap->seq = t_seq - 1;
   3930			tcap->issue_seq = t_seq - 1;
   3931			tcap->issued |= issued;
   3932			tcap->implemented |= issued;
   3933			if (cap == ci->i_auth_cap) {
   3934				ci->i_auth_cap = tcap;
   3935				change_auth_cap_ses(ci, tcap->session);
   3936			}
   3937		}
   3938		ceph_remove_cap(cap, false);
   3939		goto out_unlock;
   3940	} else if (tsession) {
   3941		/* add placeholder for the export tagert */
   3942		int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
   3943		tcap = new_cap;
   3944		ceph_add_cap(inode, tsession, t_cap_id, issued, 0,
   3945			     t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
   3946
   3947		if (!list_empty(&ci->i_cap_flush_list) &&
   3948		    ci->i_auth_cap == tcap) {
   3949			spin_lock(&mdsc->cap_dirty_lock);
   3950			list_move_tail(&ci->i_flushing_item,
   3951				       &tcap->session->s_cap_flushing);
   3952			spin_unlock(&mdsc->cap_dirty_lock);
   3953		}
   3954
   3955		ceph_remove_cap(cap, false);
   3956		goto out_unlock;
   3957	}
   3958
   3959	spin_unlock(&ci->i_ceph_lock);
   3960	up_read(&mdsc->snap_rwsem);
   3961	mutex_unlock(&session->s_mutex);
   3962
   3963	/* open target session */
   3964	tsession = ceph_mdsc_open_export_target_session(mdsc, target);
   3965	if (!IS_ERR(tsession)) {
   3966		if (mds > target) {
   3967			mutex_lock(&session->s_mutex);
   3968			mutex_lock_nested(&tsession->s_mutex,
   3969					  SINGLE_DEPTH_NESTING);
   3970		} else {
   3971			mutex_lock(&tsession->s_mutex);
   3972			mutex_lock_nested(&session->s_mutex,
   3973					  SINGLE_DEPTH_NESTING);
   3974		}
   3975		new_cap = ceph_get_cap(mdsc, NULL);
   3976	} else {
   3977		WARN_ON(1);
   3978		tsession = NULL;
   3979		target = -1;
   3980		mutex_lock(&session->s_mutex);
   3981	}
   3982	goto retry;
   3983
   3984out_unlock:
   3985	spin_unlock(&ci->i_ceph_lock);
   3986	up_read(&mdsc->snap_rwsem);
   3987	mutex_unlock(&session->s_mutex);
   3988	if (tsession) {
   3989		mutex_unlock(&tsession->s_mutex);
   3990		ceph_put_mds_session(tsession);
   3991	}
   3992	if (new_cap)
   3993		ceph_put_cap(mdsc, new_cap);
   3994}
   3995
   3996/*
   3997 * Handle cap IMPORT.
   3998 *
   3999 * caller holds s_mutex. acquires i_ceph_lock
   4000 */
   4001static void handle_cap_import(struct ceph_mds_client *mdsc,
   4002			      struct inode *inode, struct ceph_mds_caps *im,
   4003			      struct ceph_mds_cap_peer *ph,
   4004			      struct ceph_mds_session *session,
   4005			      struct ceph_cap **target_cap, int *old_issued)
   4006{
   4007	struct ceph_inode_info *ci = ceph_inode(inode);
   4008	struct ceph_cap *cap, *ocap, *new_cap = NULL;
   4009	int mds = session->s_mds;
   4010	int issued;
   4011	unsigned caps = le32_to_cpu(im->caps);
   4012	unsigned wanted = le32_to_cpu(im->wanted);
   4013	unsigned seq = le32_to_cpu(im->seq);
   4014	unsigned mseq = le32_to_cpu(im->migrate_seq);
   4015	u64 realmino = le64_to_cpu(im->realm);
   4016	u64 cap_id = le64_to_cpu(im->cap_id);
   4017	u64 p_cap_id;
   4018	int peer;
   4019
   4020	if (ph) {
   4021		p_cap_id = le64_to_cpu(ph->cap_id);
   4022		peer = le32_to_cpu(ph->mds);
   4023	} else {
   4024		p_cap_id = 0;
   4025		peer = -1;
   4026	}
   4027
   4028	dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
   4029	     inode, ci, mds, mseq, peer);
   4030retry:
   4031	cap = __get_cap_for_mds(ci, mds);
   4032	if (!cap) {
   4033		if (!new_cap) {
   4034			spin_unlock(&ci->i_ceph_lock);
   4035			new_cap = ceph_get_cap(mdsc, NULL);
   4036			spin_lock(&ci->i_ceph_lock);
   4037			goto retry;
   4038		}
   4039		cap = new_cap;
   4040	} else {
   4041		if (new_cap) {
   4042			ceph_put_cap(mdsc, new_cap);
   4043			new_cap = NULL;
   4044		}
   4045	}
   4046
   4047	__ceph_caps_issued(ci, &issued);
   4048	issued |= __ceph_caps_dirty(ci);
   4049
   4050	ceph_add_cap(inode, session, cap_id, caps, wanted, seq, mseq,
   4051		     realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
   4052
   4053	ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
   4054	if (ocap && ocap->cap_id == p_cap_id) {
   4055		dout(" remove export cap %p mds%d flags %d\n",
   4056		     ocap, peer, ph->flags);
   4057		if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
   4058		    (ocap->seq != le32_to_cpu(ph->seq) ||
   4059		     ocap->mseq != le32_to_cpu(ph->mseq))) {
   4060			pr_err_ratelimited("handle_cap_import: "
   4061					"mismatched seq/mseq: ino (%llx.%llx) "
   4062					"mds%d seq %d mseq %d importer mds%d "
   4063					"has peer seq %d mseq %d\n",
   4064					ceph_vinop(inode), peer, ocap->seq,
   4065					ocap->mseq, mds, le32_to_cpu(ph->seq),
   4066					le32_to_cpu(ph->mseq));
   4067		}
   4068		ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
   4069	}
   4070
   4071	*old_issued = issued;
   4072	*target_cap = cap;
   4073}
   4074
   4075/*
   4076 * Handle a caps message from the MDS.
   4077 *
   4078 * Identify the appropriate session, inode, and call the right handler
   4079 * based on the cap op.
   4080 */
   4081void ceph_handle_caps(struct ceph_mds_session *session,
   4082		      struct ceph_msg *msg)
   4083{
   4084	struct ceph_mds_client *mdsc = session->s_mdsc;
   4085	struct inode *inode;
   4086	struct ceph_inode_info *ci;
   4087	struct ceph_cap *cap;
   4088	struct ceph_mds_caps *h;
   4089	struct ceph_mds_cap_peer *peer = NULL;
   4090	struct ceph_snap_realm *realm = NULL;
   4091	int op;
   4092	int msg_version = le16_to_cpu(msg->hdr.version);
   4093	u32 seq, mseq;
   4094	struct ceph_vino vino;
   4095	void *snaptrace;
   4096	size_t snaptrace_len;
   4097	void *p, *end;
   4098	struct cap_extra_info extra_info = {};
   4099	bool queue_trunc;
   4100
   4101	dout("handle_caps from mds%d\n", session->s_mds);
   4102
   4103	/* decode */
   4104	end = msg->front.iov_base + msg->front.iov_len;
   4105	if (msg->front.iov_len < sizeof(*h))
   4106		goto bad;
   4107	h = msg->front.iov_base;
   4108	op = le32_to_cpu(h->op);
   4109	vino.ino = le64_to_cpu(h->ino);
   4110	vino.snap = CEPH_NOSNAP;
   4111	seq = le32_to_cpu(h->seq);
   4112	mseq = le32_to_cpu(h->migrate_seq);
   4113
   4114	snaptrace = h + 1;
   4115	snaptrace_len = le32_to_cpu(h->snap_trace_len);
   4116	p = snaptrace + snaptrace_len;
   4117
   4118	if (msg_version >= 2) {
   4119		u32 flock_len;
   4120		ceph_decode_32_safe(&p, end, flock_len, bad);
   4121		if (p + flock_len > end)
   4122			goto bad;
   4123		p += flock_len;
   4124	}
   4125
   4126	if (msg_version >= 3) {
   4127		if (op == CEPH_CAP_OP_IMPORT) {
   4128			if (p + sizeof(*peer) > end)
   4129				goto bad;
   4130			peer = p;
   4131			p += sizeof(*peer);
   4132		} else if (op == CEPH_CAP_OP_EXPORT) {
   4133			/* recorded in unused fields */
   4134			peer = (void *)&h->size;
   4135		}
   4136	}
   4137
   4138	if (msg_version >= 4) {
   4139		ceph_decode_64_safe(&p, end, extra_info.inline_version, bad);
   4140		ceph_decode_32_safe(&p, end, extra_info.inline_len, bad);
   4141		if (p + extra_info.inline_len > end)
   4142			goto bad;
   4143		extra_info.inline_data = p;
   4144		p += extra_info.inline_len;
   4145	}
   4146
   4147	if (msg_version >= 5) {
   4148		struct ceph_osd_client	*osdc = &mdsc->fsc->client->osdc;
   4149		u32			epoch_barrier;
   4150
   4151		ceph_decode_32_safe(&p, end, epoch_barrier, bad);
   4152		ceph_osdc_update_epoch_barrier(osdc, epoch_barrier);
   4153	}
   4154
   4155	if (msg_version >= 8) {
   4156		u32 pool_ns_len;
   4157
   4158		/* version >= 6 */
   4159		ceph_decode_skip_64(&p, end, bad);	// flush_tid
   4160		/* version >= 7 */
   4161		ceph_decode_skip_32(&p, end, bad);	// caller_uid
   4162		ceph_decode_skip_32(&p, end, bad);	// caller_gid
   4163		/* version >= 8 */
   4164		ceph_decode_32_safe(&p, end, pool_ns_len, bad);
   4165		if (pool_ns_len > 0) {
   4166			ceph_decode_need(&p, end, pool_ns_len, bad);
   4167			extra_info.pool_ns =
   4168				ceph_find_or_create_string(p, pool_ns_len);
   4169			p += pool_ns_len;
   4170		}
   4171	}
   4172
   4173	if (msg_version >= 9) {
   4174		struct ceph_timespec *btime;
   4175
   4176		if (p + sizeof(*btime) > end)
   4177			goto bad;
   4178		btime = p;
   4179		ceph_decode_timespec64(&extra_info.btime, btime);
   4180		p += sizeof(*btime);
   4181		ceph_decode_64_safe(&p, end, extra_info.change_attr, bad);
   4182	}
   4183
   4184	if (msg_version >= 11) {
   4185		/* version >= 10 */
   4186		ceph_decode_skip_32(&p, end, bad); // flags
   4187		/* version >= 11 */
   4188		extra_info.dirstat_valid = true;
   4189		ceph_decode_64_safe(&p, end, extra_info.nfiles, bad);
   4190		ceph_decode_64_safe(&p, end, extra_info.nsubdirs, bad);
   4191	}
   4192
   4193	/* lookup ino */
   4194	inode = ceph_find_inode(mdsc->fsc->sb, vino);
   4195	dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
   4196	     vino.snap, inode);
   4197
   4198	mutex_lock(&session->s_mutex);
   4199	inc_session_sequence(session);
   4200	dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
   4201	     (unsigned)seq);
   4202
   4203	if (!inode) {
   4204		dout(" i don't have ino %llx\n", vino.ino);
   4205
   4206		if (op == CEPH_CAP_OP_IMPORT) {
   4207			cap = ceph_get_cap(mdsc, NULL);
   4208			cap->cap_ino = vino.ino;
   4209			cap->queue_release = 1;
   4210			cap->cap_id = le64_to_cpu(h->cap_id);
   4211			cap->mseq = mseq;
   4212			cap->seq = seq;
   4213			cap->issue_seq = seq;
   4214			spin_lock(&session->s_cap_lock);
   4215			__ceph_queue_cap_release(session, cap);
   4216			spin_unlock(&session->s_cap_lock);
   4217		}
   4218		goto flush_cap_releases;
   4219	}
   4220	ci = ceph_inode(inode);
   4221
   4222	/* these will work even if we don't have a cap yet */
   4223	switch (op) {
   4224	case CEPH_CAP_OP_FLUSHSNAP_ACK:
   4225		handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid),
   4226					 h, session);
   4227		goto done;
   4228
   4229	case CEPH_CAP_OP_EXPORT:
   4230		handle_cap_export(inode, h, peer, session);
   4231		goto done_unlocked;
   4232
   4233	case CEPH_CAP_OP_IMPORT:
   4234		realm = NULL;
   4235		if (snaptrace_len) {
   4236			down_write(&mdsc->snap_rwsem);
   4237			ceph_update_snap_trace(mdsc, snaptrace,
   4238					       snaptrace + snaptrace_len,
   4239					       false, &realm);
   4240			downgrade_write(&mdsc->snap_rwsem);
   4241		} else {
   4242			down_read(&mdsc->snap_rwsem);
   4243		}
   4244		spin_lock(&ci->i_ceph_lock);
   4245		handle_cap_import(mdsc, inode, h, peer, session,
   4246				  &cap, &extra_info.issued);
   4247		handle_cap_grant(inode, session, cap,
   4248				 h, msg->middle, &extra_info);
   4249		if (realm)
   4250			ceph_put_snap_realm(mdsc, realm);
   4251		goto done_unlocked;
   4252	}
   4253
   4254	/* the rest require a cap */
   4255	spin_lock(&ci->i_ceph_lock);
   4256	cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds);
   4257	if (!cap) {
   4258		dout(" no cap on %p ino %llx.%llx from mds%d\n",
   4259		     inode, ceph_ino(inode), ceph_snap(inode),
   4260		     session->s_mds);
   4261		spin_unlock(&ci->i_ceph_lock);
   4262		goto flush_cap_releases;
   4263	}
   4264
   4265	/* note that each of these drops i_ceph_lock for us */
   4266	switch (op) {
   4267	case CEPH_CAP_OP_REVOKE:
   4268	case CEPH_CAP_OP_GRANT:
   4269		__ceph_caps_issued(ci, &extra_info.issued);
   4270		extra_info.issued |= __ceph_caps_dirty(ci);
   4271		handle_cap_grant(inode, session, cap,
   4272				 h, msg->middle, &extra_info);
   4273		goto done_unlocked;
   4274
   4275	case CEPH_CAP_OP_FLUSH_ACK:
   4276		handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid),
   4277				     h, session, cap);
   4278		break;
   4279
   4280	case CEPH_CAP_OP_TRUNC:
   4281		queue_trunc = handle_cap_trunc(inode, h, session);
   4282		spin_unlock(&ci->i_ceph_lock);
   4283		if (queue_trunc)
   4284			ceph_queue_vmtruncate(inode);
   4285		break;
   4286
   4287	default:
   4288		spin_unlock(&ci->i_ceph_lock);
   4289		pr_err("ceph_handle_caps: unknown cap op %d %s\n", op,
   4290		       ceph_cap_op_name(op));
   4291	}
   4292
   4293done:
   4294	mutex_unlock(&session->s_mutex);
   4295done_unlocked:
   4296	iput(inode);
   4297out:
   4298	ceph_put_string(extra_info.pool_ns);
   4299	return;
   4300
   4301flush_cap_releases:
   4302	/*
   4303	 * send any cap release message to try to move things
   4304	 * along for the mds (who clearly thinks we still have this
   4305	 * cap).
   4306	 */
   4307	ceph_flush_cap_releases(mdsc, session);
   4308	goto done;
   4309
   4310bad:
   4311	pr_err("ceph_handle_caps: corrupt message\n");
   4312	ceph_msg_dump(msg);
   4313	goto out;
   4314}
   4315
   4316/*
   4317 * Delayed work handler to process end of delayed cap release LRU list.
   4318 *
   4319 * If new caps are added to the list while processing it, these won't get
   4320 * processed in this run.  In this case, the ci->i_hold_caps_max will be
   4321 * returned so that the work can be scheduled accordingly.
   4322 */
   4323unsigned long ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
   4324{
   4325	struct inode *inode;
   4326	struct ceph_inode_info *ci;
   4327	struct ceph_mount_options *opt = mdsc->fsc->mount_options;
   4328	unsigned long delay_max = opt->caps_wanted_delay_max * HZ;
   4329	unsigned long loop_start = jiffies;
   4330	unsigned long delay = 0;
   4331
   4332	dout("check_delayed_caps\n");
   4333	spin_lock(&mdsc->cap_delay_lock);
   4334	while (!list_empty(&mdsc->cap_delay_list)) {
   4335		ci = list_first_entry(&mdsc->cap_delay_list,
   4336				      struct ceph_inode_info,
   4337				      i_cap_delay_list);
   4338		if (time_before(loop_start, ci->i_hold_caps_max - delay_max)) {
   4339			dout("%s caps added recently.  Exiting loop", __func__);
   4340			delay = ci->i_hold_caps_max;
   4341			break;
   4342		}
   4343		if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
   4344		    time_before(jiffies, ci->i_hold_caps_max))
   4345			break;
   4346		list_del_init(&ci->i_cap_delay_list);
   4347
   4348		inode = igrab(&ci->netfs.inode);
   4349		if (inode) {
   4350			spin_unlock(&mdsc->cap_delay_lock);
   4351			dout("check_delayed_caps on %p\n", inode);
   4352			ceph_check_caps(ci, 0, NULL);
   4353			iput(inode);
   4354			spin_lock(&mdsc->cap_delay_lock);
   4355		}
   4356	}
   4357	spin_unlock(&mdsc->cap_delay_lock);
   4358
   4359	return delay;
   4360}
   4361
   4362/*
   4363 * Flush all dirty caps to the mds
   4364 */
   4365static void flush_dirty_session_caps(struct ceph_mds_session *s)
   4366{
   4367	struct ceph_mds_client *mdsc = s->s_mdsc;
   4368	struct ceph_inode_info *ci;
   4369	struct inode *inode;
   4370
   4371	dout("flush_dirty_caps\n");
   4372	spin_lock(&mdsc->cap_dirty_lock);
   4373	while (!list_empty(&s->s_cap_dirty)) {
   4374		ci = list_first_entry(&s->s_cap_dirty, struct ceph_inode_info,
   4375				      i_dirty_item);
   4376		inode = &ci->netfs.inode;
   4377		ihold(inode);
   4378		dout("flush_dirty_caps %llx.%llx\n", ceph_vinop(inode));
   4379		spin_unlock(&mdsc->cap_dirty_lock);
   4380		ceph_wait_on_async_create(inode);
   4381		ceph_check_caps(ci, CHECK_CAPS_FLUSH, NULL);
   4382		iput(inode);
   4383		spin_lock(&mdsc->cap_dirty_lock);
   4384	}
   4385	spin_unlock(&mdsc->cap_dirty_lock);
   4386	dout("flush_dirty_caps done\n");
   4387}
   4388
   4389void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
   4390{
   4391	ceph_mdsc_iterate_sessions(mdsc, flush_dirty_session_caps, true);
   4392}
   4393
   4394void __ceph_touch_fmode(struct ceph_inode_info *ci,
   4395			struct ceph_mds_client *mdsc, int fmode)
   4396{
   4397	unsigned long now = jiffies;
   4398	if (fmode & CEPH_FILE_MODE_RD)
   4399		ci->i_last_rd = now;
   4400	if (fmode & CEPH_FILE_MODE_WR)
   4401		ci->i_last_wr = now;
   4402	/* queue periodic check */
   4403	if (fmode &&
   4404	    __ceph_is_any_real_caps(ci) &&
   4405	    list_empty(&ci->i_cap_delay_list))
   4406		__cap_delay_requeue(mdsc, ci);
   4407}
   4408
   4409void ceph_get_fmode(struct ceph_inode_info *ci, int fmode, int count)
   4410{
   4411	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->netfs.inode.i_sb);
   4412	int bits = (fmode << 1) | 1;
   4413	bool already_opened = false;
   4414	int i;
   4415
   4416	if (count == 1)
   4417		atomic64_inc(&mdsc->metric.opened_files);
   4418
   4419	spin_lock(&ci->i_ceph_lock);
   4420	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
   4421		/*
   4422		 * If any of the mode ref is larger than 0,
   4423		 * that means it has been already opened by
   4424		 * others. Just skip checking the PIN ref.
   4425		 */
   4426		if (i && ci->i_nr_by_mode[i])
   4427			already_opened = true;
   4428
   4429		if (bits & (1 << i))
   4430			ci->i_nr_by_mode[i] += count;
   4431	}
   4432
   4433	if (!already_opened)
   4434		percpu_counter_inc(&mdsc->metric.opened_inodes);
   4435	spin_unlock(&ci->i_ceph_lock);
   4436}
   4437
   4438/*
   4439 * Drop open file reference.  If we were the last open file,
   4440 * we may need to release capabilities to the MDS (or schedule
   4441 * their delayed release).
   4442 */
   4443void ceph_put_fmode(struct ceph_inode_info *ci, int fmode, int count)
   4444{
   4445	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->netfs.inode.i_sb);
   4446	int bits = (fmode << 1) | 1;
   4447	bool is_closed = true;
   4448	int i;
   4449
   4450	if (count == 1)
   4451		atomic64_dec(&mdsc->metric.opened_files);
   4452
   4453	spin_lock(&ci->i_ceph_lock);
   4454	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
   4455		if (bits & (1 << i)) {
   4456			BUG_ON(ci->i_nr_by_mode[i] < count);
   4457			ci->i_nr_by_mode[i] -= count;
   4458		}
   4459
   4460		/*
   4461		 * If any of the mode ref is not 0 after
   4462		 * decreased, that means it is still opened
   4463		 * by others. Just skip checking the PIN ref.
   4464		 */
   4465		if (i && ci->i_nr_by_mode[i])
   4466			is_closed = false;
   4467	}
   4468
   4469	if (is_closed)
   4470		percpu_counter_dec(&mdsc->metric.opened_inodes);
   4471	spin_unlock(&ci->i_ceph_lock);
   4472}
   4473
   4474/*
   4475 * For a soon-to-be unlinked file, drop the LINK caps. If it
   4476 * looks like the link count will hit 0, drop any other caps (other
   4477 * than PIN) we don't specifically want (due to the file still being
   4478 * open).
   4479 */
   4480int ceph_drop_caps_for_unlink(struct inode *inode)
   4481{
   4482	struct ceph_inode_info *ci = ceph_inode(inode);
   4483	int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
   4484
   4485	spin_lock(&ci->i_ceph_lock);
   4486	if (inode->i_nlink == 1) {
   4487		drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);
   4488
   4489		if (__ceph_caps_dirty(ci)) {
   4490			struct ceph_mds_client *mdsc =
   4491				ceph_inode_to_client(inode)->mdsc;
   4492			__cap_delay_requeue_front(mdsc, ci);
   4493		}
   4494	}
   4495	spin_unlock(&ci->i_ceph_lock);
   4496	return drop;
   4497}
   4498
   4499/*
   4500 * Helpers for embedding cap and dentry lease releases into mds
   4501 * requests.
   4502 *
   4503 * @force is used by dentry_release (below) to force inclusion of a
   4504 * record for the directory inode, even when there aren't any caps to
   4505 * drop.
   4506 */
   4507int ceph_encode_inode_release(void **p, struct inode *inode,
   4508			      int mds, int drop, int unless, int force)
   4509{
   4510	struct ceph_inode_info *ci = ceph_inode(inode);
   4511	struct ceph_cap *cap;
   4512	struct ceph_mds_request_release *rel = *p;
   4513	int used, dirty;
   4514	int ret = 0;
   4515
   4516	spin_lock(&ci->i_ceph_lock);
   4517	used = __ceph_caps_used(ci);
   4518	dirty = __ceph_caps_dirty(ci);
   4519
   4520	dout("encode_inode_release %p mds%d used|dirty %s drop %s unless %s\n",
   4521	     inode, mds, ceph_cap_string(used|dirty), ceph_cap_string(drop),
   4522	     ceph_cap_string(unless));
   4523
   4524	/* only drop unused, clean caps */
   4525	drop &= ~(used | dirty);
   4526
   4527	cap = __get_cap_for_mds(ci, mds);
   4528	if (cap && __cap_is_valid(cap)) {
   4529		unless &= cap->issued;
   4530		if (unless) {
   4531			if (unless & CEPH_CAP_AUTH_EXCL)
   4532				drop &= ~CEPH_CAP_AUTH_SHARED;
   4533			if (unless & CEPH_CAP_LINK_EXCL)
   4534				drop &= ~CEPH_CAP_LINK_SHARED;
   4535			if (unless & CEPH_CAP_XATTR_EXCL)
   4536				drop &= ~CEPH_CAP_XATTR_SHARED;
   4537			if (unless & CEPH_CAP_FILE_EXCL)
   4538				drop &= ~CEPH_CAP_FILE_SHARED;
   4539		}
   4540
   4541		if (force || (cap->issued & drop)) {
   4542			if (cap->issued & drop) {
   4543				int wanted = __ceph_caps_wanted(ci);
   4544				dout("encode_inode_release %p cap %p "
   4545				     "%s -> %s, wanted %s -> %s\n", inode, cap,
   4546				     ceph_cap_string(cap->issued),
   4547				     ceph_cap_string(cap->issued & ~drop),
   4548				     ceph_cap_string(cap->mds_wanted),
   4549				     ceph_cap_string(wanted));
   4550
   4551				cap->issued &= ~drop;
   4552				cap->implemented &= ~drop;
   4553				cap->mds_wanted = wanted;
   4554				if (cap == ci->i_auth_cap &&
   4555				    !(wanted & CEPH_CAP_ANY_FILE_WR))
   4556					ci->i_requested_max_size = 0;
   4557			} else {
   4558				dout("encode_inode_release %p cap %p %s"
   4559				     " (force)\n", inode, cap,
   4560				     ceph_cap_string(cap->issued));
   4561			}
   4562
   4563			rel->ino = cpu_to_le64(ceph_ino(inode));
   4564			rel->cap_id = cpu_to_le64(cap->cap_id);
   4565			rel->seq = cpu_to_le32(cap->seq);
   4566			rel->issue_seq = cpu_to_le32(cap->issue_seq);
   4567			rel->mseq = cpu_to_le32(cap->mseq);
   4568			rel->caps = cpu_to_le32(cap->implemented);
   4569			rel->wanted = cpu_to_le32(cap->mds_wanted);
   4570			rel->dname_len = 0;
   4571			rel->dname_seq = 0;
   4572			*p += sizeof(*rel);
   4573			ret = 1;
   4574		} else {
   4575			dout("encode_inode_release %p cap %p %s (noop)\n",
   4576			     inode, cap, ceph_cap_string(cap->issued));
   4577		}
   4578	}
   4579	spin_unlock(&ci->i_ceph_lock);
   4580	return ret;
   4581}
   4582
   4583int ceph_encode_dentry_release(void **p, struct dentry *dentry,
   4584			       struct inode *dir,
   4585			       int mds, int drop, int unless)
   4586{
   4587	struct dentry *parent = NULL;
   4588	struct ceph_mds_request_release *rel = *p;
   4589	struct ceph_dentry_info *di = ceph_dentry(dentry);
   4590	int force = 0;
   4591	int ret;
   4592
   4593	/*
   4594	 * force an record for the directory caps if we have a dentry lease.
   4595	 * this is racy (can't take i_ceph_lock and d_lock together), but it
   4596	 * doesn't have to be perfect; the mds will revoke anything we don't
   4597	 * release.
   4598	 */
   4599	spin_lock(&dentry->d_lock);
   4600	if (di->lease_session && di->lease_session->s_mds == mds)
   4601		force = 1;
   4602	if (!dir) {
   4603		parent = dget(dentry->d_parent);
   4604		dir = d_inode(parent);
   4605	}
   4606	spin_unlock(&dentry->d_lock);
   4607
   4608	ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force);
   4609	dput(parent);
   4610
   4611	spin_lock(&dentry->d_lock);
   4612	if (ret && di->lease_session && di->lease_session->s_mds == mds) {
   4613		dout("encode_dentry_release %p mds%d seq %d\n",
   4614		     dentry, mds, (int)di->lease_seq);
   4615		rel->dname_len = cpu_to_le32(dentry->d_name.len);
   4616		memcpy(*p, dentry->d_name.name, dentry->d_name.len);
   4617		*p += dentry->d_name.len;
   4618		rel->dname_seq = cpu_to_le32(di->lease_seq);
   4619		__ceph_mdsc_drop_dentry_lease(dentry);
   4620	}
   4621	spin_unlock(&dentry->d_lock);
   4622	return ret;
   4623}
   4624
   4625static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
   4626{
   4627	struct ceph_inode_info *ci = ceph_inode(inode);
   4628	struct ceph_cap_snap *capsnap;
   4629	int capsnap_release = 0;
   4630
   4631	lockdep_assert_held(&ci->i_ceph_lock);
   4632
   4633	dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode);
   4634
   4635	while (!list_empty(&ci->i_cap_snaps)) {
   4636		capsnap = list_first_entry(&ci->i_cap_snaps,
   4637					   struct ceph_cap_snap, ci_item);
   4638		__ceph_remove_capsnap(inode, capsnap, NULL, NULL);
   4639		ceph_put_snap_context(capsnap->context);
   4640		ceph_put_cap_snap(capsnap);
   4641		capsnap_release++;
   4642	}
   4643	wake_up_all(&ci->i_cap_wq);
   4644	wake_up_all(&mdsc->cap_flushing_wq);
   4645	return capsnap_release;
   4646}
   4647
   4648int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invalidate)
   4649{
   4650	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
   4651	struct ceph_mds_client *mdsc = fsc->mdsc;
   4652	struct ceph_inode_info *ci = ceph_inode(inode);
   4653	bool is_auth;
   4654	bool dirty_dropped = false;
   4655	int iputs = 0;
   4656
   4657	lockdep_assert_held(&ci->i_ceph_lock);
   4658
   4659	dout("removing cap %p, ci is %p, inode is %p\n",
   4660	     cap, ci, &ci->netfs.inode);
   4661
   4662	is_auth = (cap == ci->i_auth_cap);
   4663	__ceph_remove_cap(cap, false);
   4664	if (is_auth) {
   4665		struct ceph_cap_flush *cf;
   4666
   4667		if (ceph_inode_is_shutdown(inode)) {
   4668			if (inode->i_data.nrpages > 0)
   4669				*invalidate = true;
   4670			if (ci->i_wrbuffer_ref > 0)
   4671				mapping_set_error(&inode->i_data, -EIO);
   4672		}
   4673
   4674		spin_lock(&mdsc->cap_dirty_lock);
   4675
   4676		/* trash all of the cap flushes for this inode */
   4677		while (!list_empty(&ci->i_cap_flush_list)) {
   4678			cf = list_first_entry(&ci->i_cap_flush_list,
   4679					      struct ceph_cap_flush, i_list);
   4680			list_del_init(&cf->g_list);
   4681			list_del_init(&cf->i_list);
   4682			if (!cf->is_capsnap)
   4683				ceph_free_cap_flush(cf);
   4684		}
   4685
   4686		if (!list_empty(&ci->i_dirty_item)) {
   4687			pr_warn_ratelimited(
   4688				" dropping dirty %s state for %p %lld\n",
   4689				ceph_cap_string(ci->i_dirty_caps),
   4690				inode, ceph_ino(inode));
   4691			ci->i_dirty_caps = 0;
   4692			list_del_init(&ci->i_dirty_item);
   4693			dirty_dropped = true;
   4694		}
   4695		if (!list_empty(&ci->i_flushing_item)) {
   4696			pr_warn_ratelimited(
   4697				" dropping dirty+flushing %s state for %p %lld\n",
   4698				ceph_cap_string(ci->i_flushing_caps),
   4699				inode, ceph_ino(inode));
   4700			ci->i_flushing_caps = 0;
   4701			list_del_init(&ci->i_flushing_item);
   4702			mdsc->num_cap_flushing--;
   4703			dirty_dropped = true;
   4704		}
   4705		spin_unlock(&mdsc->cap_dirty_lock);
   4706
   4707		if (dirty_dropped) {
   4708			mapping_set_error(inode->i_mapping, -EIO);
   4709
   4710			if (ci->i_wrbuffer_ref_head == 0 &&
   4711			    ci->i_wr_ref == 0 &&
   4712			    ci->i_dirty_caps == 0 &&
   4713			    ci->i_flushing_caps == 0) {
   4714				ceph_put_snap_context(ci->i_head_snapc);
   4715				ci->i_head_snapc = NULL;
   4716			}
   4717		}
   4718
   4719		if (atomic_read(&ci->i_filelock_ref) > 0) {
   4720			/* make further file lock syscall return -EIO */
   4721			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
   4722			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
   4723					    inode, ceph_ino(inode));
   4724		}
   4725
   4726		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
   4727			cf = ci->i_prealloc_cap_flush;
   4728			ci->i_prealloc_cap_flush = NULL;
   4729			if (!cf->is_capsnap)
   4730				ceph_free_cap_flush(cf);
   4731		}
   4732
   4733		if (!list_empty(&ci->i_cap_snaps))
   4734			iputs = remove_capsnaps(mdsc, inode);
   4735	}
   4736	if (dirty_dropped)
   4737		++iputs;
   4738	return iputs;
   4739}