cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

mon_client.c (39150B)


      1// SPDX-License-Identifier: GPL-2.0
      2#include <linux/ceph/ceph_debug.h>
      3
      4#include <linux/module.h>
      5#include <linux/types.h>
      6#include <linux/slab.h>
      7#include <linux/random.h>
      8#include <linux/sched.h>
      9
     10#include <linux/ceph/ceph_features.h>
     11#include <linux/ceph/mon_client.h>
     12#include <linux/ceph/libceph.h>
     13#include <linux/ceph/debugfs.h>
     14#include <linux/ceph/decode.h>
     15#include <linux/ceph/auth.h>
     16
     17/*
     18 * Interact with Ceph monitor cluster.  Handle requests for new map
     19 * versions, and periodically resend as needed.  Also implement
     20 * statfs() and umount().
     21 *
     22 * A small cluster of Ceph "monitors" is responsible for managing critical
     23 * cluster configuration and state information.  An odd number (e.g., 3, 5)
     24 * of cmon daemons use a modified version of the Paxos part-time parliament
     25 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
     26 * list of clients who have mounted the file system.
     27 *
     28 * We maintain an open, active session with a monitor at all times in order to
     29 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
     30 * TCP socket to ensure we detect a failure.  If the connection does break, we
     31 * randomly hunt for a new monitor.  Once the connection is reestablished, we
     32 * resend any outstanding requests.
     33 */
     34
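/*
 * Illustrative sketch (editorial addition, not part of the original file):
 * roughly how libceph drives this monitor client over a mount's lifetime.
 * The "client" variable and the error handling are assumptions; the real
 * call sites live in net/ceph/ceph_common.c.
 *
 *	err = ceph_monc_init(&client->monc, client);
 *	if (err)
 *		return err;
 *
 *	err = ceph_monc_open_session(&client->monc);
 *	if (err) {
 *		ceph_monc_stop(&client->monc);
 *		return err;
 *	}
 *
 *	... use the cluster; maps arrive via the subscriptions below ...
 *
 *	ceph_monc_stop(&client->monc);
 */
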
     35static const struct ceph_connection_operations mon_con_ops;
     36
     37static int __validate_auth(struct ceph_mon_client *monc);
     38
     39static int decode_mon_info(void **p, void *end, bool msgr2,
     40			   struct ceph_entity_addr *addr)
     41{
     42	void *mon_info_end;
     43	u32 struct_len;
     44	u8 struct_v;
     45	int ret;
     46
     47	ret = ceph_start_decoding(p, end, 1, "mon_info_t", &struct_v,
     48				  &struct_len);
     49	if (ret)
     50		return ret;
     51
     52	mon_info_end = *p + struct_len;
     53	ceph_decode_skip_string(p, end, e_inval);  /* skip mon name */
     54	ret = ceph_decode_entity_addrvec(p, end, msgr2, addr);
     55	if (ret)
     56		return ret;
     57
     58	*p = mon_info_end;
     59	return 0;
     60
     61e_inval:
     62	return -EINVAL;
     63}
     64
     65/*
     66 * Decode a monmap blob (e.g., during mount).
     67 *
     68 * Assume MonMap v3 (i.e. encoding with MONNAMES and MONENC).
     69 */
     70static struct ceph_monmap *ceph_monmap_decode(void **p, void *end, bool msgr2)
     71{
     72	struct ceph_monmap *monmap = NULL;
     73	struct ceph_fsid fsid;
     74	u32 struct_len;
     75	int blob_len;
     76	int num_mon;
     77	u8 struct_v;
     78	u32 epoch;
     79	int ret;
     80	int i;
     81
     82	ceph_decode_32_safe(p, end, blob_len, e_inval);
     83	ceph_decode_need(p, end, blob_len, e_inval);
     84
     85	ret = ceph_start_decoding(p, end, 6, "monmap", &struct_v, &struct_len);
     86	if (ret)
     87		goto fail;
     88
     89	dout("%s struct_v %d\n", __func__, struct_v);
     90	ceph_decode_copy_safe(p, end, &fsid, sizeof(fsid), e_inval);
     91	ceph_decode_32_safe(p, end, epoch, e_inval);
     92	if (struct_v >= 6) {
     93		u32 feat_struct_len;
     94		u8 feat_struct_v;
     95
     96		*p += sizeof(struct ceph_timespec);  /* skip last_changed */
     97		*p += sizeof(struct ceph_timespec);  /* skip created */
     98
     99		ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
    100					  &feat_struct_v, &feat_struct_len);
    101		if (ret)
    102			goto fail;
    103
    104		*p += feat_struct_len;  /* skip persistent_features */
    105
    106		ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
    107					  &feat_struct_v, &feat_struct_len);
    108		if (ret)
    109			goto fail;
    110
    111		*p += feat_struct_len;  /* skip optional_features */
    112	}
    113	ceph_decode_32_safe(p, end, num_mon, e_inval);
    114
    115	dout("%s fsid %pU epoch %u num_mon %d\n", __func__, &fsid, epoch,
    116	     num_mon);
    117	if (num_mon > CEPH_MAX_MON)
    118		goto e_inval;
    119
    120	monmap = kmalloc(struct_size(monmap, mon_inst, num_mon), GFP_NOIO);
    121	if (!monmap) {
    122		ret = -ENOMEM;
    123		goto fail;
    124	}
    125	monmap->fsid = fsid;
    126	monmap->epoch = epoch;
    127	monmap->num_mon = num_mon;
    128
    129	/* legacy_mon_addr map or mon_info map */
    130	for (i = 0; i < num_mon; i++) {
    131		struct ceph_entity_inst *inst = &monmap->mon_inst[i];
    132
    133		ceph_decode_skip_string(p, end, e_inval);  /* skip mon name */
    134		inst->name.type = CEPH_ENTITY_TYPE_MON;
    135		inst->name.num = cpu_to_le64(i);
    136
    137		if (struct_v >= 6)
    138			ret = decode_mon_info(p, end, msgr2, &inst->addr);
    139		else
    140			ret = ceph_decode_entity_addr(p, end, &inst->addr);
    141		if (ret)
    142			goto fail;
    143
    144		dout("%s mon%d addr %s\n", __func__, i,
    145		     ceph_pr_addr(&inst->addr));
    146	}
    147
    148	return monmap;
    149
    150e_inval:
    151	ret = -EINVAL;
    152fail:
    153	kfree(monmap);
    154	return ERR_PTR(ret);
    155}
    156
    157/*
    158 * return true if *addr is included in the monmap.
    159 */
    160int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
    161{
    162	int i;
    163
    164	for (i = 0; i < m->num_mon; i++) {
    165		if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr))
    166			return 1;
    167	}
    168
    169	return 0;
    170}
    171
    172/*
    173 * Send an auth request.
    174 */
    175static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
    176{
    177	monc->pending_auth = 1;
    178	monc->m_auth->front.iov_len = len;
    179	monc->m_auth->hdr.front_len = cpu_to_le32(len);
    180	ceph_msg_revoke(monc->m_auth);
    181	ceph_msg_get(monc->m_auth);  /* keep our ref */
    182	ceph_con_send(&monc->con, monc->m_auth);
    183}
    184
    185/*
    186 * Close monitor session, if any.
    187 */
    188static void __close_session(struct ceph_mon_client *monc)
    189{
    190	dout("__close_session closing mon%d\n", monc->cur_mon);
    191	ceph_msg_revoke(monc->m_auth);
    192	ceph_msg_revoke_incoming(monc->m_auth_reply);
    193	ceph_msg_revoke(monc->m_subscribe);
    194	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
    195	ceph_con_close(&monc->con);
    196
    197	monc->pending_auth = 0;
    198	ceph_auth_reset(monc->auth);
    199}
    200
    201/*
    202 * Pick a new monitor at random and set cur_mon.  If we are repicking
    203 * (i.e. cur_mon is already set), be sure to pick a different one.
    204 */
    205static void pick_new_mon(struct ceph_mon_client *monc)
    206{
    207	int old_mon = monc->cur_mon;
    208
    209	BUG_ON(monc->monmap->num_mon < 1);
    210
    211	if (monc->monmap->num_mon == 1) {
    212		monc->cur_mon = 0;
    213	} else {
    214		int max = monc->monmap->num_mon;
    215		int o = -1;
    216		int n;
    217
    218		if (monc->cur_mon >= 0) {
    219			if (monc->cur_mon < monc->monmap->num_mon)
    220				o = monc->cur_mon;
    221			if (o >= 0)
    222				max--;
    223		}
    224
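		/*
		 * When repicking (o >= 0), max is num_mon - 1; shifting
		 * results at or above the old index up by one guarantees
		 * the monitor we are abandoning is never chosen again.
		 */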
    225		n = prandom_u32() % max;
    226		if (o >= 0 && n >= o)
    227			n++;
    228
    229		monc->cur_mon = n;
    230	}
    231
    232	dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
    233	     monc->cur_mon, monc->monmap->num_mon);
    234}
    235
    236/*
    237 * Open a session with a new monitor.
    238 */
    239static void __open_session(struct ceph_mon_client *monc)
    240{
    241	int ret;
    242
    243	pick_new_mon(monc);
    244
    245	monc->hunting = true;
    246	if (monc->had_a_connection) {
    247		monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
    248		if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
    249			monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
    250	}
    251
    252	monc->sub_renew_after = jiffies; /* i.e., expired */
    253	monc->sub_renew_sent = 0;
    254
    255	dout("%s opening mon%d\n", __func__, monc->cur_mon);
    256	ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
    257		      &monc->monmap->mon_inst[monc->cur_mon].addr);
    258
    259	/*
    260	 * Queue a keepalive to ensure that in case of an early fault
    261	 * the messenger doesn't put us into STANDBY state and instead
    262	 * retries.  This also ensures that our timestamp is valid by
    263	 * the time we finish hunting and delayed_work() checks it.
    264	 */
    265	ceph_con_keepalive(&monc->con);
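	/*
	 * With msgr2 the authentication exchange is driven by the
	 * messenger through the get_auth_request/handle_auth_* hooks in
	 * mon_con_ops below; just note that auth is now in flight.
	 */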
    266	if (ceph_msgr2(monc->client)) {
    267		monc->pending_auth = 1;
    268		return;
    269	}
    270
    271	/* initiate authentication handshake */
    272	ret = ceph_auth_build_hello(monc->auth,
    273				    monc->m_auth->front.iov_base,
    274				    monc->m_auth->front_alloc_len);
    275	BUG_ON(ret <= 0);
    276	__send_prepared_auth_request(monc, ret);
    277}
    278
    279static void reopen_session(struct ceph_mon_client *monc)
    280{
    281	if (!monc->hunting)
    282		pr_info("mon%d %s session lost, hunting for new mon\n",
    283		    monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));
    284
    285	__close_session(monc);
    286	__open_session(monc);
    287}
    288
    289void ceph_monc_reopen_session(struct ceph_mon_client *monc)
    290{
    291	mutex_lock(&monc->mutex);
    292	reopen_session(monc);
    293	mutex_unlock(&monc->mutex);
    294}
    295
    296static void un_backoff(struct ceph_mon_client *monc)
    297{
    298	monc->hunt_mult /= 2; /* reduce by 50% */
    299	if (monc->hunt_mult < 1)
    300		monc->hunt_mult = 1;
    301	dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
    302}
    303
    304/*
    305 * Reschedule delayed work timer.
    306 */
    307static void __schedule_delayed(struct ceph_mon_client *monc)
    308{
    309	unsigned long delay;
    310
    311	if (monc->hunting)
    312		delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
    313	else
    314		delay = CEPH_MONC_PING_INTERVAL;
    315
    316	dout("__schedule_delayed after %lu\n", delay);
    317	mod_delayed_work(system_wq, &monc->delayed_work,
    318			 round_jiffies_relative(delay));
    319}
    320
    321const char *ceph_sub_str[] = {
    322	[CEPH_SUB_MONMAP] = "monmap",
    323	[CEPH_SUB_OSDMAP] = "osdmap",
    324	[CEPH_SUB_FSMAP]  = "fsmap.user",
    325	[CEPH_SUB_MDSMAP] = "mdsmap",
    326};
    327
    328/*
    329 * Send subscribe request for one or more maps, according to
    330 * monc->subs.
    331 */
    332static void __send_subscribe(struct ceph_mon_client *monc)
    333{
    334	struct ceph_msg *msg = monc->m_subscribe;
    335	void *p = msg->front.iov_base;
    336	void *const end = p + msg->front_alloc_len;
    337	int num = 0;
    338	int i;
    339
    340	dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
    341
    342	BUG_ON(monc->cur_mon < 0);
    343
    344	if (!monc->sub_renew_sent)
    345		monc->sub_renew_sent = jiffies | 1; /* never 0 */
    346
    347	msg->hdr.version = cpu_to_le16(2);
    348
    349	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
    350		if (monc->subs[i].want)
    351			num++;
    352	}
    353	BUG_ON(num < 1); /* monmap sub is always there */
    354	ceph_encode_32(&p, num);
    355	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
    356		char buf[32];
    357		int len;
    358
    359		if (!monc->subs[i].want)
    360			continue;
    361
    362		len = sprintf(buf, "%s", ceph_sub_str[i]);
    363		if (i == CEPH_SUB_MDSMAP &&
    364		    monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
    365			len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
    366
    367		dout("%s %s start %llu flags 0x%x\n", __func__, buf,
    368		     le64_to_cpu(monc->subs[i].item.start),
    369		     monc->subs[i].item.flags);
    370		ceph_encode_string(&p, end, buf, len);
    371		memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
    372		p += sizeof(monc->subs[i].item);
    373	}
    374
    375	BUG_ON(p > end);
    376	msg->front.iov_len = p - msg->front.iov_base;
    377	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
    378	ceph_msg_revoke(msg);
    379	ceph_con_send(&monc->con, ceph_msg_get(msg));
    380}
    381
    382static void handle_subscribe_ack(struct ceph_mon_client *monc,
    383				 struct ceph_msg *msg)
    384{
    385	unsigned int seconds;
    386	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
    387
    388	if (msg->front.iov_len < sizeof(*h))
    389		goto bad;
    390	seconds = le32_to_cpu(h->duration);
    391
    392	mutex_lock(&monc->mutex);
    393	if (monc->sub_renew_sent) {
    394		/*
    395		 * This is only needed for legacy (infernalis or older)
    396		 * MONs -- see delayed_work().
    397		 */
    398		monc->sub_renew_after = monc->sub_renew_sent +
    399					    (seconds >> 1) * HZ - 1;
    400		dout("%s sent %lu duration %d renew after %lu\n", __func__,
    401		     monc->sub_renew_sent, seconds, monc->sub_renew_after);
    402		monc->sub_renew_sent = 0;
    403	} else {
    404		dout("%s sent %lu renew after %lu, ignoring\n", __func__,
    405		     monc->sub_renew_sent, monc->sub_renew_after);
    406	}
    407	mutex_unlock(&monc->mutex);
    408	return;
    409bad:
    410	pr_err("got corrupt subscribe-ack msg\n");
    411	ceph_msg_dump(msg);
    412}
    413
    414/*
    415 * Register interest in a map
    416 *
    417 * @sub: one of CEPH_SUB_*
    418 * @epoch: X for "every map since X", or 0 for "just the latest"
    419 */
    420static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
    421				 u32 epoch, bool continuous)
    422{
    423	__le64 start = cpu_to_le64(epoch);
    424	u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
    425
    426	dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
    427	     epoch, continuous);
    428
    429	if (monc->subs[sub].want &&
    430	    monc->subs[sub].item.start == start &&
    431	    monc->subs[sub].item.flags == flags)
    432		return false;
    433
    434	monc->subs[sub].item.start = start;
    435	monc->subs[sub].item.flags = flags;
    436	monc->subs[sub].want = true;
    437
    438	return true;
    439}
    440
    441bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
    442			bool continuous)
    443{
    444	bool need_request;
    445
    446	mutex_lock(&monc->mutex);
    447	need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
    448	mutex_unlock(&monc->mutex);
    449
    450	return need_request;
    451}
    452EXPORT_SYMBOL(ceph_monc_want_map);
    453
    454/*
    455 * Keep track of which maps we have
    456 *
    457 * @sub: one of CEPH_SUB_*
    458 */
    459static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
    460				u32 epoch)
    461{
    462	dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
    463
    464	if (monc->subs[sub].want) {
    465		if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
    466			monc->subs[sub].want = false;
    467		else
    468			monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
    469	}
    470
    471	monc->subs[sub].have = epoch;
    472}
    473
    474void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
    475{
    476	mutex_lock(&monc->mutex);
    477	__ceph_monc_got_map(monc, sub, epoch);
    478	mutex_unlock(&monc->mutex);
    479}
    480EXPORT_SYMBOL(ceph_monc_got_map);
    481
    482void ceph_monc_renew_subs(struct ceph_mon_client *monc)
    483{
    484	mutex_lock(&monc->mutex);
    485	__send_subscribe(monc);
    486	mutex_unlock(&monc->mutex);
    487}
    488EXPORT_SYMBOL(ceph_monc_renew_subs);
    489
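/*
 * Illustrative sketch (editorial addition): how a map consumer such as the
 * osd_client typically uses the subscription helpers above.  The names
 * "client", "osdc" and "newest_epoch" are assumptions; see
 * net/ceph/osd_client.c for the real call sites.
 *
 *	if (ceph_monc_want_map(&client->monc, CEPH_SUB_OSDMAP,
 *			       osdc->osdmap->epoch + 1, false))
 *		ceph_monc_renew_subs(&client->monc);
 *
 *	... and once a newer map has been decoded and applied:
 *
 *	ceph_monc_got_map(&client->monc, CEPH_SUB_OSDMAP, newest_epoch);
 */
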
    490/*
    491 * Wait for an osdmap with a given epoch.
    492 *
    493 * @epoch: epoch to wait for
    494 * @timeout: in jiffies, 0 means "wait forever"
    495 */
    496int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
    497			  unsigned long timeout)
    498{
    499	unsigned long started = jiffies;
    500	long ret;
    501
    502	mutex_lock(&monc->mutex);
    503	while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
    504		mutex_unlock(&monc->mutex);
    505
    506		if (timeout && time_after_eq(jiffies, started + timeout))
    507			return -ETIMEDOUT;
    508
    509		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
    510				     monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
    511				     ceph_timeout_jiffies(timeout));
    512		if (ret < 0)
    513			return ret;
    514
    515		mutex_lock(&monc->mutex);
    516	}
    517
    518	mutex_unlock(&monc->mutex);
    519	return 0;
    520}
    521EXPORT_SYMBOL(ceph_monc_wait_osdmap);
    522
    523/*
    524 * Open a session with a random monitor.  Request monmap and osdmap,
    525 * which are waited upon in __ceph_open_session().
    526 */
    527int ceph_monc_open_session(struct ceph_mon_client *monc)
    528{
    529	mutex_lock(&monc->mutex);
    530	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
    531	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
    532	__open_session(monc);
    533	__schedule_delayed(monc);
    534	mutex_unlock(&monc->mutex);
    535	return 0;
    536}
    537EXPORT_SYMBOL(ceph_monc_open_session);
    538
    539static void ceph_monc_handle_map(struct ceph_mon_client *monc,
    540				 struct ceph_msg *msg)
    541{
    542	struct ceph_client *client = monc->client;
    543	struct ceph_monmap *monmap;
    544	void *p, *end;
    545
    546	mutex_lock(&monc->mutex);
    547
    548	dout("handle_monmap\n");
    549	p = msg->front.iov_base;
    550	end = p + msg->front.iov_len;
    551
    552	monmap = ceph_monmap_decode(&p, end, ceph_msgr2(client));
    553	if (IS_ERR(monmap)) {
    554		pr_err("problem decoding monmap, %d\n",
    555		       (int)PTR_ERR(monmap));
    556		ceph_msg_dump(msg);
    557		goto out;
    558	}
    559
    560	if (ceph_check_fsid(client, &monmap->fsid) < 0) {
    561		kfree(monmap);
    562		goto out;
    563	}
    564
    565	kfree(monc->monmap);
    566	monc->monmap = monmap;
    567
    568	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
    569	client->have_fsid = true;
    570
    571out:
    572	mutex_unlock(&monc->mutex);
    573	wake_up_all(&client->auth_wq);
    574}
    575
    576/*
    577 * generic requests (currently statfs, mon_get_version)
    578 */
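/*
 * Lifecycle (editorial summary): a request is created with
 * alloc_generic_request(), registered in the tid-indexed tree under
 * monc->mutex by register_generic_request(), transmitted with
 * send_generic_request(), and then either waited upon synchronously
 * (wait_generic_request) or completed from the reply handler
 * (complete_generic_request).  put_generic_request() drops references.
 */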
    579DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
    580
    581static void release_generic_request(struct kref *kref)
    582{
    583	struct ceph_mon_generic_request *req =
    584		container_of(kref, struct ceph_mon_generic_request, kref);
    585
    586	dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
    587	     req->reply);
    588	WARN_ON(!RB_EMPTY_NODE(&req->node));
    589
    590	if (req->reply)
    591		ceph_msg_put(req->reply);
    592	if (req->request)
    593		ceph_msg_put(req->request);
    594
    595	kfree(req);
    596}
    597
    598static void put_generic_request(struct ceph_mon_generic_request *req)
    599{
    600	if (req)
    601		kref_put(&req->kref, release_generic_request);
    602}
    603
    604static void get_generic_request(struct ceph_mon_generic_request *req)
    605{
    606	kref_get(&req->kref);
    607}
    608
    609static struct ceph_mon_generic_request *
    610alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
    611{
    612	struct ceph_mon_generic_request *req;
    613
    614	req = kzalloc(sizeof(*req), gfp);
    615	if (!req)
    616		return NULL;
    617
    618	req->monc = monc;
    619	kref_init(&req->kref);
    620	RB_CLEAR_NODE(&req->node);
    621	init_completion(&req->completion);
    622
    623	dout("%s greq %p\n", __func__, req);
    624	return req;
    625}
    626
    627static void register_generic_request(struct ceph_mon_generic_request *req)
    628{
    629	struct ceph_mon_client *monc = req->monc;
    630
    631	WARN_ON(req->tid);
    632
    633	get_generic_request(req);
    634	req->tid = ++monc->last_tid;
    635	insert_generic_request(&monc->generic_request_tree, req);
    636}
    637
    638static void send_generic_request(struct ceph_mon_client *monc,
    639				 struct ceph_mon_generic_request *req)
    640{
    641	WARN_ON(!req->tid);
    642
    643	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
    644	req->request->hdr.tid = cpu_to_le64(req->tid);
    645	ceph_con_send(&monc->con, ceph_msg_get(req->request));
    646}
    647
    648static void __finish_generic_request(struct ceph_mon_generic_request *req)
    649{
    650	struct ceph_mon_client *monc = req->monc;
    651
    652	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
    653	erase_generic_request(&monc->generic_request_tree, req);
    654
    655	ceph_msg_revoke(req->request);
    656	ceph_msg_revoke_incoming(req->reply);
    657}
    658
    659static void finish_generic_request(struct ceph_mon_generic_request *req)
    660{
    661	__finish_generic_request(req);
    662	put_generic_request(req);
    663}
    664
    665static void complete_generic_request(struct ceph_mon_generic_request *req)
    666{
    667	if (req->complete_cb)
    668		req->complete_cb(req);
    669	else
    670		complete_all(&req->completion);
    671	put_generic_request(req);
    672}
    673
    674static void cancel_generic_request(struct ceph_mon_generic_request *req)
    675{
    676	struct ceph_mon_client *monc = req->monc;
    677	struct ceph_mon_generic_request *lookup_req;
    678
    679	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
    680
    681	mutex_lock(&monc->mutex);
    682	lookup_req = lookup_generic_request(&monc->generic_request_tree,
    683					    req->tid);
    684	if (lookup_req) {
    685		WARN_ON(lookup_req != req);
    686		finish_generic_request(req);
    687	}
    688
    689	mutex_unlock(&monc->mutex);
    690}
    691
    692static int wait_generic_request(struct ceph_mon_generic_request *req)
    693{
    694	int ret;
    695
    696	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
    697	ret = wait_for_completion_interruptible(&req->completion);
    698	if (ret)
    699		cancel_generic_request(req);
    700	else
    701		ret = req->result; /* completed */
    702
    703	return ret;
    704}
    705
    706static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
    707					 struct ceph_msg_header *hdr,
    708					 int *skip)
    709{
    710	struct ceph_mon_client *monc = con->private;
    711	struct ceph_mon_generic_request *req;
    712	u64 tid = le64_to_cpu(hdr->tid);
    713	struct ceph_msg *m;
    714
    715	mutex_lock(&monc->mutex);
    716	req = lookup_generic_request(&monc->generic_request_tree, tid);
    717	if (!req) {
    718		dout("get_generic_reply %lld dne\n", tid);
    719		*skip = 1;
    720		m = NULL;
    721	} else {
    722		dout("get_generic_reply %lld got %p\n", tid, req->reply);
    723		*skip = 0;
    724		m = ceph_msg_get(req->reply);
    725		/*
    726		 * we don't need to track the connection reading into
    727		 * this reply because we only have one open connection
    728		 * at a time, ever.
    729		 */
    730	}
    731	mutex_unlock(&monc->mutex);
    732	return m;
    733}
    734
    735/*
    736 * statfs
    737 */
    738static void handle_statfs_reply(struct ceph_mon_client *monc,
    739				struct ceph_msg *msg)
    740{
    741	struct ceph_mon_generic_request *req;
    742	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
    743	u64 tid = le64_to_cpu(msg->hdr.tid);
    744
    745	dout("%s msg %p tid %llu\n", __func__, msg, tid);
    746
    747	if (msg->front.iov_len != sizeof(*reply))
    748		goto bad;
    749
    750	mutex_lock(&monc->mutex);
    751	req = lookup_generic_request(&monc->generic_request_tree, tid);
    752	if (!req) {
    753		mutex_unlock(&monc->mutex);
    754		return;
    755	}
    756
    757	req->result = 0;
    758	*req->u.st = reply->st; /* struct */
    759	__finish_generic_request(req);
    760	mutex_unlock(&monc->mutex);
    761
    762	complete_generic_request(req);
    763	return;
    764
    765bad:
    766	pr_err("corrupt statfs reply, tid %llu\n", tid);
    767	ceph_msg_dump(msg);
    768}
    769
    770/*
    771 * Do a synchronous statfs().
    772 */
    773int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
    774			struct ceph_statfs *buf)
    775{
    776	struct ceph_mon_generic_request *req;
    777	struct ceph_mon_statfs *h;
    778	int ret = -ENOMEM;
    779
    780	req = alloc_generic_request(monc, GFP_NOFS);
    781	if (!req)
    782		goto out;
    783
    784	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
    785				    true);
    786	if (!req->request)
    787		goto out;
    788
    789	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
    790	if (!req->reply)
    791		goto out;
    792
    793	req->u.st = buf;
    794	req->request->hdr.version = cpu_to_le16(2);
    795
    796	mutex_lock(&monc->mutex);
    797	register_generic_request(req);
    798	/* fill out request */
    799	h = req->request->front.iov_base;
    800	h->monhdr.have_version = 0;
    801	h->monhdr.session_mon = cpu_to_le16(-1);
    802	h->monhdr.session_mon_tid = 0;
    803	h->fsid = monc->monmap->fsid;
    804	h->contains_data_pool = (data_pool != CEPH_NOPOOL);
    805	h->data_pool = cpu_to_le64(data_pool);
    806	send_generic_request(monc, req);
    807	mutex_unlock(&monc->mutex);
    808
    809	ret = wait_generic_request(req);
    810out:
    811	put_generic_request(req);
    812	return ret;
    813}
    814EXPORT_SYMBOL(ceph_monc_do_statfs);
    815
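/*
 * Illustrative sketch (editorial addition): a synchronous cluster statfs as
 * a filesystem's statfs handler might issue it.  Passing CEPH_NOPOOL asks
 * for whole-cluster statistics; a pool id restricts the report to that
 * pool.  "client" is an assumed struct ceph_client pointer.
 *
 *	struct ceph_statfs st;
 *	u64 total_kb;
 *	int err;
 *
 *	err = ceph_monc_do_statfs(&client->monc, CEPH_NOPOOL, &st);
 *	if (!err)
 *		total_kb = le64_to_cpu(st.kb);
 */
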
    816static void handle_get_version_reply(struct ceph_mon_client *monc,
    817				     struct ceph_msg *msg)
    818{
    819	struct ceph_mon_generic_request *req;
    820	u64 tid = le64_to_cpu(msg->hdr.tid);
    821	void *p = msg->front.iov_base;
    822	void *end = p + msg->front_alloc_len;
    823	u64 handle;
    824
    825	dout("%s msg %p tid %llu\n", __func__, msg, tid);
    826
    827	ceph_decode_need(&p, end, 2*sizeof(u64), bad);
    828	handle = ceph_decode_64(&p);
    829	if (tid != 0 && tid != handle)
    830		goto bad;
    831
    832	mutex_lock(&monc->mutex);
    833	req = lookup_generic_request(&monc->generic_request_tree, handle);
    834	if (!req) {
    835		mutex_unlock(&monc->mutex);
    836		return;
    837	}
    838
    839	req->result = 0;
    840	req->u.newest = ceph_decode_64(&p);
    841	__finish_generic_request(req);
    842	mutex_unlock(&monc->mutex);
    843
    844	complete_generic_request(req);
    845	return;
    846
    847bad:
    848	pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
    849	ceph_msg_dump(msg);
    850}
    851
    852static struct ceph_mon_generic_request *
    853__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
    854			ceph_monc_callback_t cb, u64 private_data)
    855{
    856	struct ceph_mon_generic_request *req;
    857
    858	req = alloc_generic_request(monc, GFP_NOIO);
    859	if (!req)
    860		goto err_put_req;
    861
    862	req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
    863				    sizeof(u64) + sizeof(u32) + strlen(what),
    864				    GFP_NOIO, true);
    865	if (!req->request)
    866		goto err_put_req;
    867
    868	req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
    869				  true);
    870	if (!req->reply)
    871		goto err_put_req;
    872
    873	req->complete_cb = cb;
    874	req->private_data = private_data;
    875
    876	mutex_lock(&monc->mutex);
    877	register_generic_request(req);
    878	{
    879		void *p = req->request->front.iov_base;
    880		void *const end = p + req->request->front_alloc_len;
    881
    882		ceph_encode_64(&p, req->tid); /* handle */
    883		ceph_encode_string(&p, end, what, strlen(what));
    884		WARN_ON(p != end);
    885	}
    886	send_generic_request(monc, req);
    887	mutex_unlock(&monc->mutex);
    888
    889	return req;
    890
    891err_put_req:
    892	put_generic_request(req);
    893	return ERR_PTR(-ENOMEM);
    894}
    895
    896/*
    897 * Send MMonGetVersion and wait for the reply.
    898 *
    899 * @what: one of "mdsmap", "osdmap" or "monmap"
    900 */
    901int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
    902			  u64 *newest)
    903{
    904	struct ceph_mon_generic_request *req;
    905	int ret;
    906
    907	req = __ceph_monc_get_version(monc, what, NULL, 0);
    908	if (IS_ERR(req))
    909		return PTR_ERR(req);
    910
    911	ret = wait_generic_request(req);
    912	if (!ret)
    913		*newest = req->u.newest;
    914
    915	put_generic_request(req);
    916	return ret;
    917}
    918EXPORT_SYMBOL(ceph_monc_get_version);
    919
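/*
 * Illustrative sketch (editorial addition): catching up to the newest
 * osdmap, roughly what ceph_wait_for_latest_osdmap() in ceph_common.c
 * does.  "client" and "timeout" are assumptions.
 *
 *	u64 newest;
 *	int err;
 *
 *	err = ceph_monc_get_version(&client->monc, "osdmap", &newest);
 *	if (!err && client->osdc.osdmap->epoch < newest) {
 *		if (ceph_monc_want_map(&client->monc, CEPH_SUB_OSDMAP,
 *				       client->osdc.osdmap->epoch + 1, false))
 *			ceph_monc_renew_subs(&client->monc);
 *		err = ceph_monc_wait_osdmap(&client->monc, newest, timeout);
 *	}
 */
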
    920/*
    921 * Send MMonGetVersion without waiting for the reply.
    922 *
    923 * @what: one of "mdsmap", "osdmap" or "monmap"
    924 */
    925int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
    926				ceph_monc_callback_t cb, u64 private_data)
    927{
    928	struct ceph_mon_generic_request *req;
    929
    930	req = __ceph_monc_get_version(monc, what, cb, private_data);
    931	if (IS_ERR(req))
    932		return PTR_ERR(req);
    933
    934	put_generic_request(req);
    935	return 0;
    936}
    937EXPORT_SYMBOL(ceph_monc_get_version_async);
    938
    939static void handle_command_ack(struct ceph_mon_client *monc,
    940			       struct ceph_msg *msg)
    941{
    942	struct ceph_mon_generic_request *req;
    943	void *p = msg->front.iov_base;
    944	void *const end = p + msg->front_alloc_len;
    945	u64 tid = le64_to_cpu(msg->hdr.tid);
    946
    947	dout("%s msg %p tid %llu\n", __func__, msg, tid);
    948
    949	ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
    950							    sizeof(u32), bad);
    951	p += sizeof(struct ceph_mon_request_header);
    952
    953	mutex_lock(&monc->mutex);
    954	req = lookup_generic_request(&monc->generic_request_tree, tid);
    955	if (!req) {
    956		mutex_unlock(&monc->mutex);
    957		return;
    958	}
    959
    960	req->result = ceph_decode_32(&p);
    961	__finish_generic_request(req);
    962	mutex_unlock(&monc->mutex);
    963
    964	complete_generic_request(req);
    965	return;
    966
    967bad:
    968	pr_err("corrupt mon_command ack, tid %llu\n", tid);
    969	ceph_msg_dump(msg);
    970}
    971
    972static __printf(2, 0)
    973int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
    974			 va_list ap)
    975{
    976	struct ceph_mon_generic_request *req;
    977	struct ceph_mon_command *h;
    978	int ret = -ENOMEM;
    979	int len;
    980
    981	req = alloc_generic_request(monc, GFP_NOIO);
    982	if (!req)
    983		goto out;
    984
    985	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
    986	if (!req->request)
    987		goto out;
    988
    989	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
    990				  true);
    991	if (!req->reply)
    992		goto out;
    993
    994	mutex_lock(&monc->mutex);
    995	register_generic_request(req);
    996	h = req->request->front.iov_base;
    997	h->monhdr.have_version = 0;
    998	h->monhdr.session_mon = cpu_to_le16(-1);
    999	h->monhdr.session_mon_tid = 0;
   1000	h->fsid = monc->monmap->fsid;
   1001	h->num_strs = cpu_to_le32(1);
   1002	len = vsprintf(h->str, fmt, ap);
   1003	h->str_len = cpu_to_le32(len);
   1004	send_generic_request(monc, req);
   1005	mutex_unlock(&monc->mutex);
   1006
   1007	ret = wait_generic_request(req);
   1008out:
   1009	put_generic_request(req);
   1010	return ret;
   1011}
   1012
   1013static __printf(2, 3)
   1014int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
   1015{
   1016	va_list ap;
   1017	int ret;
   1018
   1019	va_start(ap, fmt);
   1020	ret = do_mon_command_vargs(monc, fmt, ap);
   1021	va_end(ap);
   1022	return ret;
   1023}
   1024
   1025int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
   1026			    struct ceph_entity_addr *client_addr)
   1027{
   1028	int ret;
   1029
   1030	ret = do_mon_command(monc,
   1031			     "{ \"prefix\": \"osd blocklist\", \
   1032				\"blocklistop\": \"add\", \
   1033				\"addr\": \"%pISpc/%u\" }",
   1034			     &client_addr->in_addr,
   1035			     le32_to_cpu(client_addr->nonce));
   1036	if (ret == -EINVAL) {
   1037		/*
   1038		 * The monitor returns EINVAL on an unrecognized command.
   1039		 * Try the legacy command -- it is exactly the same except
   1040		 * for the name.
   1041		 */
   1042		ret = do_mon_command(monc,
   1043				     "{ \"prefix\": \"osd blacklist\", \
   1044					\"blacklistop\": \"add\", \
   1045					\"addr\": \"%pISpc/%u\" }",
   1046				     &client_addr->in_addr,
   1047				     le32_to_cpu(client_addr->nonce));
   1048	}
   1049	if (ret)
   1050		return ret;
   1051
   1052	/*
   1053	 * Make sure we have the osdmap that includes the blocklist
   1054	 * entry.  This is needed to ensure that the OSDs pick up the
   1055	 * new blocklist before processing any future requests from
   1056	 * this client.
   1057	 */
   1058	return ceph_wait_for_latest_osdmap(monc->client, 0);
   1059}
   1060EXPORT_SYMBOL(ceph_monc_blocklist_add);
   1061
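/*
 * Illustrative sketch (editorial addition): fencing another client before
 * breaking its exclusive lock, as rbd does.  "locker_addr" is an assumed
 * struct ceph_entity_addr identifying the client to be blocklisted.
 *
 *	err = ceph_monc_blocklist_add(&client->monc, &locker_addr);
 *	if (err)
 *		pr_err("blocklisting %s failed: %d\n",
 *		       ceph_pr_addr(&locker_addr), err);
 */
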
   1062/*
   1063 * Resend pending generic requests.
   1064 */
   1065static void __resend_generic_request(struct ceph_mon_client *monc)
   1066{
   1067	struct ceph_mon_generic_request *req;
   1068	struct rb_node *p;
   1069
   1070	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
   1071		req = rb_entry(p, struct ceph_mon_generic_request, node);
   1072		ceph_msg_revoke(req->request);
   1073		ceph_msg_revoke_incoming(req->reply);
   1074		ceph_con_send(&monc->con, ceph_msg_get(req->request));
   1075	}
   1076}
   1077
   1078/*
   1079 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
   1080 * renew/retry subscription as needed (in case it is timing out, or we
   1081 * got an ENOMEM).  And keep the monitor connection alive.
   1082 */
   1083static void delayed_work(struct work_struct *work)
   1084{
   1085	struct ceph_mon_client *monc =
   1086		container_of(work, struct ceph_mon_client, delayed_work.work);
   1087
   1088	dout("monc delayed_work\n");
   1089	mutex_lock(&monc->mutex);
   1090	if (monc->hunting) {
   1091		dout("%s continuing hunt\n", __func__);
   1092		reopen_session(monc);
   1093	} else {
   1094		int is_auth = ceph_auth_is_authenticated(monc->auth);
   1095		if (ceph_con_keepalive_expired(&monc->con,
   1096					       CEPH_MONC_PING_TIMEOUT)) {
   1097			dout("monc keepalive timeout\n");
   1098			is_auth = 0;
   1099			reopen_session(monc);
   1100		}
   1101
   1102		if (!monc->hunting) {
   1103			ceph_con_keepalive(&monc->con);
   1104			__validate_auth(monc);
   1105			un_backoff(monc);
   1106		}
   1107
   1108		if (is_auth &&
   1109		    !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
   1110			unsigned long now = jiffies;
   1111
   1112			dout("%s renew subs? now %lu renew after %lu\n",
   1113			     __func__, now, monc->sub_renew_after);
   1114			if (time_after_eq(now, monc->sub_renew_after))
   1115				__send_subscribe(monc);
   1116		}
   1117	}
   1118	__schedule_delayed(monc);
   1119	mutex_unlock(&monc->mutex);
   1120}
   1121
   1122/*
   1123 * On startup, we build a temporary monmap populated with the IPs
   1124 * provided by mount(2).
   1125 */
   1126static int build_initial_monmap(struct ceph_mon_client *monc)
   1127{
   1128	__le32 my_type = ceph_msgr2(monc->client) ?
   1129		CEPH_ENTITY_ADDR_TYPE_MSGR2 : CEPH_ENTITY_ADDR_TYPE_LEGACY;
   1130	struct ceph_options *opt = monc->client->options;
   1131	int num_mon = opt->num_mon;
   1132	int i;
   1133
   1134	/* build initial monmap */
   1135	monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
   1136			       GFP_KERNEL);
   1137	if (!monc->monmap)
   1138		return -ENOMEM;
   1139
   1140	for (i = 0; i < num_mon; i++) {
   1141		struct ceph_entity_inst *inst = &monc->monmap->mon_inst[i];
   1142
   1143		memcpy(&inst->addr.in_addr, &opt->mon_addr[i].in_addr,
   1144		       sizeof(inst->addr.in_addr));
   1145		inst->addr.type = my_type;
   1146		inst->addr.nonce = 0;
   1147		inst->name.type = CEPH_ENTITY_TYPE_MON;
   1148		inst->name.num = cpu_to_le64(i);
   1149	}
   1150	monc->monmap->num_mon = num_mon;
   1151	return 0;
   1152}
   1153
   1154int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
   1155{
   1156	int err;
   1157
   1158	dout("init\n");
   1159	memset(monc, 0, sizeof(*monc));
   1160	monc->client = cl;
   1161	mutex_init(&monc->mutex);
   1162
   1163	err = build_initial_monmap(monc);
   1164	if (err)
   1165		goto out;
   1166
   1167	/* connection */
   1168	/* authentication */
   1169	monc->auth = ceph_auth_init(cl->options->name, cl->options->key,
   1170				    cl->options->con_modes);
   1171	if (IS_ERR(monc->auth)) {
   1172		err = PTR_ERR(monc->auth);
   1173		goto out_monmap;
   1174	}
   1175	monc->auth->want_keys =
   1176		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
   1177		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
   1178
   1179	/* msgs */
   1180	err = -ENOMEM;
   1181	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
   1182				     sizeof(struct ceph_mon_subscribe_ack),
   1183				     GFP_KERNEL, true);
   1184	if (!monc->m_subscribe_ack)
   1185		goto out_auth;
   1186
   1187	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
   1188					 GFP_KERNEL, true);
   1189	if (!monc->m_subscribe)
   1190		goto out_subscribe_ack;
   1191
   1192	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
   1193					  GFP_KERNEL, true);
   1194	if (!monc->m_auth_reply)
   1195		goto out_subscribe;
   1196
   1197	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
   1198	monc->pending_auth = 0;
   1199	if (!monc->m_auth)
   1200		goto out_auth_reply;
   1201
   1202	ceph_con_init(&monc->con, monc, &mon_con_ops,
   1203		      &monc->client->msgr);
   1204
   1205	monc->cur_mon = -1;
   1206	monc->had_a_connection = false;
   1207	monc->hunt_mult = 1;
   1208
   1209	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
   1210	monc->generic_request_tree = RB_ROOT;
   1211	monc->last_tid = 0;
   1212
   1213	monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
   1214
   1215	return 0;
   1216
   1217out_auth_reply:
   1218	ceph_msg_put(monc->m_auth_reply);
   1219out_subscribe:
   1220	ceph_msg_put(monc->m_subscribe);
   1221out_subscribe_ack:
   1222	ceph_msg_put(monc->m_subscribe_ack);
   1223out_auth:
   1224	ceph_auth_destroy(monc->auth);
   1225out_monmap:
   1226	kfree(monc->monmap);
   1227out:
   1228	return err;
   1229}
   1230EXPORT_SYMBOL(ceph_monc_init);
   1231
   1232void ceph_monc_stop(struct ceph_mon_client *monc)
   1233{
   1234	dout("stop\n");
   1235	cancel_delayed_work_sync(&monc->delayed_work);
   1236
   1237	mutex_lock(&monc->mutex);
   1238	__close_session(monc);
   1239	monc->cur_mon = -1;
   1240	mutex_unlock(&monc->mutex);
   1241
   1242	/*
   1243	 * flush msgr queue before we destroy ourselves to ensure that:
   1244	 *  - any work that references our embedded con is finished.
   1245	 *  - any osd_client or other work that may reference an authorizer
   1246	 *    finishes before we shut down the auth subsystem.
   1247	 */
   1248	ceph_msgr_flush();
   1249
   1250	ceph_auth_destroy(monc->auth);
   1251
   1252	WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
   1253
   1254	ceph_msg_put(monc->m_auth);
   1255	ceph_msg_put(monc->m_auth_reply);
   1256	ceph_msg_put(monc->m_subscribe);
   1257	ceph_msg_put(monc->m_subscribe_ack);
   1258
   1259	kfree(monc->monmap);
   1260}
   1261EXPORT_SYMBOL(ceph_monc_stop);
   1262
   1263static void finish_hunting(struct ceph_mon_client *monc)
   1264{
   1265	if (monc->hunting) {
   1266		dout("%s found mon%d\n", __func__, monc->cur_mon);
   1267		monc->hunting = false;
   1268		monc->had_a_connection = true;
   1269		un_backoff(monc);
   1270		__schedule_delayed(monc);
   1271	}
   1272}
   1273
   1274static void finish_auth(struct ceph_mon_client *monc, int auth_err,
   1275			bool was_authed)
   1276{
   1277	dout("%s auth_err %d was_authed %d\n", __func__, auth_err, was_authed);
   1278	WARN_ON(auth_err > 0);
   1279
   1280	monc->pending_auth = 0;
   1281	if (auth_err) {
   1282		monc->client->auth_err = auth_err;
   1283		wake_up_all(&monc->client->auth_wq);
   1284		return;
   1285	}
   1286
   1287	if (!was_authed && ceph_auth_is_authenticated(monc->auth)) {
   1288		dout("%s authenticated, starting session global_id %llu\n",
   1289		     __func__, monc->auth->global_id);
   1290
   1291		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
   1292		monc->client->msgr.inst.name.num =
   1293					cpu_to_le64(monc->auth->global_id);
   1294
   1295		__send_subscribe(monc);
   1296		__resend_generic_request(monc);
   1297
   1298		pr_info("mon%d %s session established\n", monc->cur_mon,
   1299			ceph_pr_addr(&monc->con.peer_addr));
   1300	}
   1301}
   1302
   1303static void handle_auth_reply(struct ceph_mon_client *monc,
   1304			      struct ceph_msg *msg)
   1305{
   1306	bool was_authed;
   1307	int ret;
   1308
   1309	mutex_lock(&monc->mutex);
   1310	was_authed = ceph_auth_is_authenticated(monc->auth);
   1311	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
   1312				     msg->front.iov_len,
   1313				     monc->m_auth->front.iov_base,
   1314				     monc->m_auth->front_alloc_len);
   1315	if (ret > 0) {
   1316		__send_prepared_auth_request(monc, ret);
   1317	} else {
   1318		finish_auth(monc, ret, was_authed);
   1319		finish_hunting(monc);
   1320	}
   1321	mutex_unlock(&monc->mutex);
   1322}
   1323
   1324static int __validate_auth(struct ceph_mon_client *monc)
   1325{
   1326	int ret;
   1327
   1328	if (monc->pending_auth)
   1329		return 0;
   1330
   1331	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
   1332			      monc->m_auth->front_alloc_len);
   1333	if (ret <= 0)
   1334		return ret; /* either an error, or no need to authenticate */
   1335	__send_prepared_auth_request(monc, ret);
   1336	return 0;
   1337}
   1338
   1339int ceph_monc_validate_auth(struct ceph_mon_client *monc)
   1340{
   1341	int ret;
   1342
   1343	mutex_lock(&monc->mutex);
   1344	ret = __validate_auth(monc);
   1345	mutex_unlock(&monc->mutex);
   1346	return ret;
   1347}
   1348EXPORT_SYMBOL(ceph_monc_validate_auth);
   1349
   1350static int mon_get_auth_request(struct ceph_connection *con,
   1351				void *buf, int *buf_len,
   1352				void **authorizer, int *authorizer_len)
   1353{
   1354	struct ceph_mon_client *monc = con->private;
   1355	int ret;
   1356
   1357	mutex_lock(&monc->mutex);
   1358	ret = ceph_auth_get_request(monc->auth, buf, *buf_len);
   1359	mutex_unlock(&monc->mutex);
   1360	if (ret < 0)
   1361		return ret;
   1362
   1363	*buf_len = ret;
   1364	*authorizer = NULL;
   1365	*authorizer_len = 0;
   1366	return 0;
   1367}
   1368
   1369static int mon_handle_auth_reply_more(struct ceph_connection *con,
   1370				      void *reply, int reply_len,
   1371				      void *buf, int *buf_len,
   1372				      void **authorizer, int *authorizer_len)
   1373{
   1374	struct ceph_mon_client *monc = con->private;
   1375	int ret;
   1376
   1377	mutex_lock(&monc->mutex);
   1378	ret = ceph_auth_handle_reply_more(monc->auth, reply, reply_len,
   1379					  buf, *buf_len);
   1380	mutex_unlock(&monc->mutex);
   1381	if (ret < 0)
   1382		return ret;
   1383
   1384	*buf_len = ret;
   1385	*authorizer = NULL;
   1386	*authorizer_len = 0;
   1387	return 0;
   1388}
   1389
   1390static int mon_handle_auth_done(struct ceph_connection *con,
   1391				u64 global_id, void *reply, int reply_len,
   1392				u8 *session_key, int *session_key_len,
   1393				u8 *con_secret, int *con_secret_len)
   1394{
   1395	struct ceph_mon_client *monc = con->private;
   1396	bool was_authed;
   1397	int ret;
   1398
   1399	mutex_lock(&monc->mutex);
   1400	WARN_ON(!monc->hunting);
   1401	was_authed = ceph_auth_is_authenticated(monc->auth);
   1402	ret = ceph_auth_handle_reply_done(monc->auth, global_id,
   1403					  reply, reply_len,
   1404					  session_key, session_key_len,
   1405					  con_secret, con_secret_len);
   1406	finish_auth(monc, ret, was_authed);
   1407	if (!ret)
   1408		finish_hunting(monc);
   1409	mutex_unlock(&monc->mutex);
   1410	return 0;
   1411}
   1412
   1413static int mon_handle_auth_bad_method(struct ceph_connection *con,
   1414				      int used_proto, int result,
   1415				      const int *allowed_protos, int proto_cnt,
   1416				      const int *allowed_modes, int mode_cnt)
   1417{
   1418	struct ceph_mon_client *monc = con->private;
   1419	bool was_authed;
   1420
   1421	mutex_lock(&monc->mutex);
   1422	WARN_ON(!monc->hunting);
   1423	was_authed = ceph_auth_is_authenticated(monc->auth);
   1424	ceph_auth_handle_bad_method(monc->auth, used_proto, result,
   1425				    allowed_protos, proto_cnt,
   1426				    allowed_modes, mode_cnt);
   1427	finish_auth(monc, -EACCES, was_authed);
   1428	mutex_unlock(&monc->mutex);
   1429	return 0;
   1430}
   1431
   1432/*
   1433 * handle incoming message
   1434 */
   1435static void mon_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
   1436{
   1437	struct ceph_mon_client *monc = con->private;
   1438	int type = le16_to_cpu(msg->hdr.type);
   1439
   1440	switch (type) {
   1441	case CEPH_MSG_AUTH_REPLY:
   1442		handle_auth_reply(monc, msg);
   1443		break;
   1444
   1445	case CEPH_MSG_MON_SUBSCRIBE_ACK:
   1446		handle_subscribe_ack(monc, msg);
   1447		break;
   1448
   1449	case CEPH_MSG_STATFS_REPLY:
   1450		handle_statfs_reply(monc, msg);
   1451		break;
   1452
   1453	case CEPH_MSG_MON_GET_VERSION_REPLY:
   1454		handle_get_version_reply(monc, msg);
   1455		break;
   1456
   1457	case CEPH_MSG_MON_COMMAND_ACK:
   1458		handle_command_ack(monc, msg);
   1459		break;
   1460
   1461	case CEPH_MSG_MON_MAP:
   1462		ceph_monc_handle_map(monc, msg);
   1463		break;
   1464
   1465	case CEPH_MSG_OSD_MAP:
   1466		ceph_osdc_handle_map(&monc->client->osdc, msg);
   1467		break;
   1468
   1469	default:
   1470		/* can the chained handler handle it? */
   1471		if (monc->client->extra_mon_dispatch &&
   1472		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
   1473			break;
   1474
   1475		pr_err("received unknown message type %d %s\n", type,
   1476		       ceph_msg_type_name(type));
   1477	}
   1478	ceph_msg_put(msg);
   1479}
   1480
   1481/*
   1482 * Allocate memory for incoming message
   1483 */
   1484static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
   1485				      struct ceph_msg_header *hdr,
   1486				      int *skip)
   1487{
   1488	struct ceph_mon_client *monc = con->private;
   1489	int type = le16_to_cpu(hdr->type);
   1490	int front_len = le32_to_cpu(hdr->front_len);
   1491	struct ceph_msg *m = NULL;
   1492
   1493	*skip = 0;
   1494
   1495	switch (type) {
   1496	case CEPH_MSG_MON_SUBSCRIBE_ACK:
   1497		m = ceph_msg_get(monc->m_subscribe_ack);
   1498		break;
   1499	case CEPH_MSG_STATFS_REPLY:
   1500	case CEPH_MSG_MON_COMMAND_ACK:
   1501		return get_generic_reply(con, hdr, skip);
   1502	case CEPH_MSG_AUTH_REPLY:
   1503		m = ceph_msg_get(monc->m_auth_reply);
   1504		break;
   1505	case CEPH_MSG_MON_GET_VERSION_REPLY:
   1506		if (le64_to_cpu(hdr->tid) != 0)
   1507			return get_generic_reply(con, hdr, skip);
   1508
   1509		/*
   1510		 * Older OSDs don't set reply tid even if the original
   1511		 * request had a non-zero tid.  Work around this weirdness
   1512		 * by allocating a new message.
   1513		 */
   1514		fallthrough;
   1515	case CEPH_MSG_MON_MAP:
   1516	case CEPH_MSG_MDS_MAP:
   1517	case CEPH_MSG_OSD_MAP:
   1518	case CEPH_MSG_FS_MAP_USER:
   1519		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
   1520		if (!m)
   1521			return NULL;	/* ENOMEM--return skip == 0 */
   1522		break;
   1523	}
   1524
   1525	if (!m) {
   1526		pr_info("alloc_msg unknown type %d\n", type);
   1527		*skip = 1;
   1528	} else if (front_len > m->front_alloc_len) {
   1529		pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
   1530			front_len, m->front_alloc_len,
   1531			(unsigned int)con->peer_name.type,
   1532			le64_to_cpu(con->peer_name.num));
   1533		ceph_msg_put(m);
   1534		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
   1535	}
   1536
   1537	return m;
   1538}
   1539
   1540/*
   1541 * If the monitor connection resets, pick a new monitor and resubmit
   1542 * any pending requests.
   1543 */
   1544static void mon_fault(struct ceph_connection *con)
   1545{
   1546	struct ceph_mon_client *monc = con->private;
   1547
   1548	mutex_lock(&monc->mutex);
   1549	dout("%s mon%d\n", __func__, monc->cur_mon);
   1550	if (monc->cur_mon >= 0) {
   1551		if (!monc->hunting) {
   1552			dout("%s hunting for new mon\n", __func__);
   1553			reopen_session(monc);
   1554			__schedule_delayed(monc);
   1555		} else {
   1556			dout("%s already hunting\n", __func__);
   1557		}
   1558	}
   1559	mutex_unlock(&monc->mutex);
   1560}
   1561
   1562/*
   1563 * We can ignore refcounting on the connection struct, as all references
   1564 * will come from the messenger workqueue, which is drained prior to
   1565 * mon_client destruction.
   1566 */
   1567static struct ceph_connection *mon_get_con(struct ceph_connection *con)
   1568{
   1569	return con;
   1570}
   1571
   1572static void mon_put_con(struct ceph_connection *con)
   1573{
   1574}
   1575
   1576static const struct ceph_connection_operations mon_con_ops = {
   1577	.get = mon_get_con,
   1578	.put = mon_put_con,
   1579	.alloc_msg = mon_alloc_msg,
   1580	.dispatch = mon_dispatch,
   1581	.fault = mon_fault,
   1582	.get_auth_request = mon_get_auth_request,
   1583	.handle_auth_reply_more = mon_handle_auth_reply_more,
   1584	.handle_auth_done = mon_handle_auth_done,
   1585	.handle_auth_bad_method = mon_handle_auth_bad_method,
   1586};