cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

messenger.c (55245B)


      1// SPDX-License-Identifier: GPL-2.0
      2#include <linux/ceph/ceph_debug.h>
      3
      4#include <linux/crc32c.h>
      5#include <linux/ctype.h>
      6#include <linux/highmem.h>
      7#include <linux/inet.h>
      8#include <linux/kthread.h>
      9#include <linux/net.h>
     10#include <linux/nsproxy.h>
     11#include <linux/sched/mm.h>
     12#include <linux/slab.h>
     13#include <linux/socket.h>
     14#include <linux/string.h>
     15#ifdef	CONFIG_BLOCK
     16#include <linux/bio.h>
     17#endif	/* CONFIG_BLOCK */
     18#include <linux/dns_resolver.h>
     19#include <net/tcp.h>
     20
     21#include <linux/ceph/ceph_features.h>
     22#include <linux/ceph/libceph.h>
     23#include <linux/ceph/messenger.h>
     24#include <linux/ceph/decode.h>
     25#include <linux/ceph/pagelist.h>
     26#include <linux/export.h>
     27
     28/*
     29 * Ceph uses the messenger to exchange ceph_msg messages with other
     30 * hosts in the system.  The messenger provides ordered and reliable
     31 * delivery.  We tolerate TCP disconnects by reconnecting (with
     32 * exponential backoff) in the case of a fault (disconnection, bad
     33 * crc, protocol error).  Acks allow sent messages to be discarded by
     34 * the sender.
     35 */
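
/*
 * Typical caller flow, as an illustrative sketch only (the ops table,
 * peer id, address and message are hypothetical and assumed to be set
 * up elsewhere by the caller): bind a connection to a messenger, open
 * it to a peer and queue a message.  Reconnects, backoff and fault
 * handling then happen internally via the workqueue.
 */
static void __maybe_unused example_con_lifecycle(struct ceph_messenger *msgr,
		const struct ceph_connection_operations *my_ops,
		u64 osd_id, struct ceph_entity_addr *peer_addr,
		struct ceph_msg *msg)
{
	struct ceph_connection con;

	ceph_con_init(&con, NULL, my_ops, msgr);
	ceph_con_open(&con, CEPH_ENTITY_TYPE_OSD, osd_id, peer_addr);
	ceph_con_send(&con, msg);	/* consumes a ref on @msg */
	/* ... */
	ceph_con_close(&con);
}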
     36
     37/*
     38 * We track the state of the socket on a given connection using
     39 * values defined below.  The transition to a new socket state is
     40 * handled by a function which verifies we aren't coming from an
     41 * unexpected state.
     42 *
     43 *      --------
     44 *      | NEW* |  transient initial state
     45 *      --------
     46 *          | con_sock_state_init()
     47 *          v
     48 *      ----------
     49 *      | CLOSED |  initialized, but no socket (and no
     50 *      ----------  TCP connection)
     51 *       ^      \
     52 *       |       \ con_sock_state_connecting()
     53 *       |        ----------------------
     54 *       |                              \
     55 *       + con_sock_state_closed()       \
     56 *       |+---------------------------    \
     57 *       | \                          \    \
     58 *       |  -----------                \    \
     59 *       |  | CLOSING |  socket event;  \    \
     60 *       |  -----------  await close     \    \
     61 *       |       ^                        \   |
     62 *       |       |                         \  |
     63 *       |       + con_sock_state_closing() \ |
     64 *       |      / \                         | |
     65 *       |     /   ---------------          | |
     66 *       |    /                   \         v v
     67 *       |   /                    --------------
     68 *       |  /    -----------------| CONNECTING |  socket created, TCP
     69 *       |  |   /                 --------------  connect initiated
     70 *       |  |   | con_sock_state_connected()
     71 *       |  |   v
     72 *      -------------
     73 *      | CONNECTED |  TCP connection established
     74 *      -------------
     75 *
     76 * State values for ceph_connection->sock_state; NEW is assumed to be 0.
     77 */
     78
     79#define CON_SOCK_STATE_NEW		0	/* -> CLOSED */
     80#define CON_SOCK_STATE_CLOSED		1	/* -> CONNECTING */
     81#define CON_SOCK_STATE_CONNECTING	2	/* -> CONNECTED or -> CLOSING */
     82#define CON_SOCK_STATE_CONNECTED	3	/* -> CLOSING or -> CLOSED */
     83#define CON_SOCK_STATE_CLOSING		4	/* -> CLOSED */
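
/*
 * Illustrative sketch (a hypothetical predicate, not used below):
 * the legal transitions from the diagram above, written out so the
 * per-state WARN_ON checks further down can be read against it.
 */
static bool __maybe_unused
con_sock_state_transition_ok(int old_state, int new_state)
{
	switch (new_state) {
	case CON_SOCK_STATE_CLOSED:
		return old_state == CON_SOCK_STATE_NEW ||
		       old_state == CON_SOCK_STATE_CONNECTING ||
		       old_state == CON_SOCK_STATE_CONNECTED ||
		       old_state == CON_SOCK_STATE_CLOSING ||
		       old_state == CON_SOCK_STATE_CLOSED;
	case CON_SOCK_STATE_CONNECTING:
		return old_state == CON_SOCK_STATE_CLOSED;
	case CON_SOCK_STATE_CONNECTED:
		return old_state == CON_SOCK_STATE_CONNECTING;
	case CON_SOCK_STATE_CLOSING:
		return old_state == CON_SOCK_STATE_CONNECTING ||
		       old_state == CON_SOCK_STATE_CONNECTED ||
		       old_state == CON_SOCK_STATE_CLOSING;
	default:
		return false;
	}
}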
     84
     85static bool con_flag_valid(unsigned long con_flag)
     86{
     87	switch (con_flag) {
     88	case CEPH_CON_F_LOSSYTX:
     89	case CEPH_CON_F_KEEPALIVE_PENDING:
     90	case CEPH_CON_F_WRITE_PENDING:
     91	case CEPH_CON_F_SOCK_CLOSED:
     92	case CEPH_CON_F_BACKOFF:
     93		return true;
     94	default:
     95		return false;
     96	}
     97}
     98
     99void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
    100{
    101	BUG_ON(!con_flag_valid(con_flag));
    102
    103	clear_bit(con_flag, &con->flags);
    104}
    105
    106void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag)
    107{
    108	BUG_ON(!con_flag_valid(con_flag));
    109
    110	set_bit(con_flag, &con->flags);
    111}
    112
    113bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag)
    114{
    115	BUG_ON(!con_flag_valid(con_flag));
    116
    117	return test_bit(con_flag, &con->flags);
    118}
    119
    120bool ceph_con_flag_test_and_clear(struct ceph_connection *con,
    121				  unsigned long con_flag)
    122{
    123	BUG_ON(!con_flag_valid(con_flag));
    124
    125	return test_and_clear_bit(con_flag, &con->flags);
    126}
    127
    128bool ceph_con_flag_test_and_set(struct ceph_connection *con,
    129				unsigned long con_flag)
    130{
    131	BUG_ON(!con_flag_valid(con_flag));
    132
    133	return test_and_set_bit(con_flag, &con->flags);
    134}
    135
    136/* Slab caches for frequently-allocated structures */
    137
    138static struct kmem_cache	*ceph_msg_cache;
    139
    140#ifdef CONFIG_LOCKDEP
    141static struct lock_class_key socket_class;
    142#endif
    143
    144static void queue_con(struct ceph_connection *con);
    145static void cancel_con(struct ceph_connection *con);
    146static void ceph_con_workfn(struct work_struct *);
    147static void con_fault(struct ceph_connection *con);
    148
    149/*
    150 * Nicely render a sockaddr as a string.  An array of formatted
    151 * strings is used, to approximate reentrancy.
    152 */
    153#define ADDR_STR_COUNT_LOG	5	/* log2(# address strings in array) */
    154#define ADDR_STR_COUNT		(1 << ADDR_STR_COUNT_LOG)
    155#define ADDR_STR_COUNT_MASK	(ADDR_STR_COUNT - 1)
    156#define MAX_ADDR_STR_LEN	64	/* 54 is enough */
    157
    158static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
    159static atomic_t addr_str_seq = ATOMIC_INIT(0);
    160
    161struct page *ceph_zero_page;		/* used in certain error cases */
    162
    163const char *ceph_pr_addr(const struct ceph_entity_addr *addr)
    164{
    165	int i;
    166	char *s;
    167	struct sockaddr_storage ss = addr->in_addr; /* align */
    168	struct sockaddr_in *in4 = (struct sockaddr_in *)&ss;
    169	struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&ss;
    170
    171	i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
    172	s = addr_str[i];
    173
    174	switch (ss.ss_family) {
    175	case AF_INET:
    176		snprintf(s, MAX_ADDR_STR_LEN, "(%d)%pI4:%hu",
    177			 le32_to_cpu(addr->type), &in4->sin_addr,
    178			 ntohs(in4->sin_port));
    179		break;
    180
    181	case AF_INET6:
    182		snprintf(s, MAX_ADDR_STR_LEN, "(%d)[%pI6c]:%hu",
    183			 le32_to_cpu(addr->type), &in6->sin6_addr,
    184			 ntohs(in6->sin6_port));
    185		break;
    186
    187	default:
    188		snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
    189			 ss.ss_family);
    190	}
    191
    192	return s;
    193}
    194EXPORT_SYMBOL(ceph_pr_addr);
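
/*
 * Usage sketch (hypothetical helper): each call takes the next slot
 * in the ring above, so up to ADDR_STR_COUNT results can be live at
 * once -- e.g. two addresses rendered in a single log line.
 */
static void __maybe_unused example_print_addr_pair(
		const struct ceph_entity_addr *a,
		const struct ceph_entity_addr *b)
{
	pr_info("%s -> %s\n", ceph_pr_addr(a), ceph_pr_addr(b));
}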
    195
    196void ceph_encode_my_addr(struct ceph_messenger *msgr)
    197{
    198	if (!ceph_msgr2(from_msgr(msgr))) {
    199		memcpy(&msgr->my_enc_addr, &msgr->inst.addr,
    200		       sizeof(msgr->my_enc_addr));
    201		ceph_encode_banner_addr(&msgr->my_enc_addr);
    202	}
    203}
    204
    205/*
    206 * work queue for all reading and writing to/from the socket.
    207 */
    208static struct workqueue_struct *ceph_msgr_wq;
    209
    210static int ceph_msgr_slab_init(void)
    211{
    212	BUG_ON(ceph_msg_cache);
    213	ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
    214	if (!ceph_msg_cache)
    215		return -ENOMEM;
    216
    217	return 0;
    218}
    219
    220static void ceph_msgr_slab_exit(void)
    221{
    222	BUG_ON(!ceph_msg_cache);
    223	kmem_cache_destroy(ceph_msg_cache);
    224	ceph_msg_cache = NULL;
    225}
    226
    227static void _ceph_msgr_exit(void)
    228{
    229	if (ceph_msgr_wq) {
    230		destroy_workqueue(ceph_msgr_wq);
    231		ceph_msgr_wq = NULL;
    232	}
    233
    234	BUG_ON(!ceph_zero_page);
    235	put_page(ceph_zero_page);
    236	ceph_zero_page = NULL;
    237
    238	ceph_msgr_slab_exit();
    239}
    240
    241int __init ceph_msgr_init(void)
    242{
    243	if (ceph_msgr_slab_init())
    244		return -ENOMEM;
    245
    246	BUG_ON(ceph_zero_page);
    247	ceph_zero_page = ZERO_PAGE(0);
    248	get_page(ceph_zero_page);
    249
    250	/*
    251	 * The number of active work items is limited by the number of
    252	 * connections, so leave @max_active at default.
    253	 */
    254	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
    255	if (ceph_msgr_wq)
    256		return 0;
    257
    258	pr_err("msgr_init failed to create workqueue\n");
    259	_ceph_msgr_exit();
    260
    261	return -ENOMEM;
    262}
    263
    264void ceph_msgr_exit(void)
    265{
    266	BUG_ON(ceph_msgr_wq == NULL);
    267
    268	_ceph_msgr_exit();
    269}
    270
    271void ceph_msgr_flush(void)
    272{
    273	flush_workqueue(ceph_msgr_wq);
    274}
    275EXPORT_SYMBOL(ceph_msgr_flush);
    276
    277/* Connection socket state transition functions */
    278
    279static void con_sock_state_init(struct ceph_connection *con)
    280{
    281	int old_state;
    282
    283	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
    284	if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
    285		printk("%s: unexpected old state %d\n", __func__, old_state);
    286	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
    287	     CON_SOCK_STATE_CLOSED);
    288}
    289
    290static void con_sock_state_connecting(struct ceph_connection *con)
    291{
    292	int old_state;
    293
    294	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
    295	if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
    296		printk("%s: unexpected old state %d\n", __func__, old_state);
    297	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
    298	     CON_SOCK_STATE_CONNECTING);
    299}
    300
    301static void con_sock_state_connected(struct ceph_connection *con)
    302{
    303	int old_state;
    304
    305	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
    306	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
    307		printk("%s: unexpected old state %d\n", __func__, old_state);
    308	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
    309	     CON_SOCK_STATE_CONNECTED);
    310}
    311
    312static void con_sock_state_closing(struct ceph_connection *con)
    313{
    314	int old_state;
    315
    316	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
    317	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
    318			old_state != CON_SOCK_STATE_CONNECTED &&
    319			old_state != CON_SOCK_STATE_CLOSING))
    320		printk("%s: unexpected old state %d\n", __func__, old_state);
    321	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
    322	     CON_SOCK_STATE_CLOSING);
    323}
    324
    325static void con_sock_state_closed(struct ceph_connection *con)
    326{
    327	int old_state;
    328
    329	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
    330	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
    331		    old_state != CON_SOCK_STATE_CLOSING &&
    332		    old_state != CON_SOCK_STATE_CONNECTING &&
    333		    old_state != CON_SOCK_STATE_CLOSED))
    334		printk("%s: unexpected old state %d\n", __func__, old_state);
    335	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
    336	     CON_SOCK_STATE_CLOSED);
    337}
    338
    339/*
    340 * socket callback functions
    341 */
    342
    343/* data available on socket, or listen socket received a connect */
    344static void ceph_sock_data_ready(struct sock *sk)
    345{
    346	struct ceph_connection *con = sk->sk_user_data;
    347	if (atomic_read(&con->msgr->stopping)) {
    348		return;
    349	}
    350
    351	if (sk->sk_state != TCP_CLOSE_WAIT) {
    352		dout("%s %p state = %d, queueing work\n", __func__,
    353		     con, con->state);
    354		queue_con(con);
    355	}
    356}
    357
    358/* socket has buffer space for writing */
    359static void ceph_sock_write_space(struct sock *sk)
    360{
    361	struct ceph_connection *con = sk->sk_user_data;
    362
    363	/* only queue to workqueue if there is data we want to write,
    364	 * and there is sufficient space in the socket buffer to accept
    365	 * more data.  clear SOCK_NOSPACE so that ceph_sock_write_space()
    366	 * doesn't get called again until try_write() fills the socket
    367	 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
    368	 * and net/core/stream.c:sk_stream_write_space().
    369	 */
    370	if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) {
    371		if (sk_stream_is_writeable(sk)) {
    372			dout("%s %p queueing write work\n", __func__, con);
    373			clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
    374			queue_con(con);
    375		}
    376	} else {
    377		dout("%s %p nothing to write\n", __func__, con);
    378	}
    379}
    380
    381/* socket's state has changed */
    382static void ceph_sock_state_change(struct sock *sk)
    383{
    384	struct ceph_connection *con = sk->sk_user_data;
    385
    386	dout("%s %p state = %d sk_state = %u\n", __func__,
    387	     con, con->state, sk->sk_state);
    388
    389	switch (sk->sk_state) {
    390	case TCP_CLOSE:
    391		dout("%s TCP_CLOSE\n", __func__);
    392		fallthrough;
    393	case TCP_CLOSE_WAIT:
    394		dout("%s TCP_CLOSE_WAIT\n", __func__);
    395		con_sock_state_closing(con);
    396		ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED);
    397		queue_con(con);
    398		break;
    399	case TCP_ESTABLISHED:
    400		dout("%s TCP_ESTABLISHED\n", __func__);
    401		con_sock_state_connected(con);
    402		queue_con(con);
    403		break;
    404	default:	/* Everything else is uninteresting */
    405		break;
    406	}
    407}
    408
    409/*
    410 * set up socket callbacks
    411 */
    412static void set_sock_callbacks(struct socket *sock,
    413			       struct ceph_connection *con)
    414{
    415	struct sock *sk = sock->sk;
    416	sk->sk_user_data = con;
    417	sk->sk_data_ready = ceph_sock_data_ready;
    418	sk->sk_write_space = ceph_sock_write_space;
    419	sk->sk_state_change = ceph_sock_state_change;
    420}
    421
    422
    423/*
    424 * socket helpers
    425 */
    426
    427/*
    428 * initiate connection to a remote socket.
    429 */
    430int ceph_tcp_connect(struct ceph_connection *con)
    431{
    432	struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */
    433	struct socket *sock;
    434	unsigned int noio_flag;
    435	int ret;
    436
    437	dout("%s con %p peer_addr %s\n", __func__, con,
    438	     ceph_pr_addr(&con->peer_addr));
    439	BUG_ON(con->sock);
    440
    441	/* sock_create_kern() allocates with GFP_KERNEL */
    442	noio_flag = memalloc_noio_save();
    443	ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family,
    444			       SOCK_STREAM, IPPROTO_TCP, &sock);
    445	memalloc_noio_restore(noio_flag);
    446	if (ret)
    447		return ret;
    448	sock->sk->sk_allocation = GFP_NOFS;
    449
    450#ifdef CONFIG_LOCKDEP
    451	lockdep_set_class(&sock->sk->sk_lock, &socket_class);
    452#endif
    453
    454	set_sock_callbacks(sock, con);
    455
    456	con_sock_state_connecting(con);
    457	ret = sock->ops->connect(sock, (struct sockaddr *)&ss, sizeof(ss),
    458				 O_NONBLOCK);
    459	if (ret == -EINPROGRESS) {
    460		dout("connect %s EINPROGRESS sk_state = %u\n",
    461		     ceph_pr_addr(&con->peer_addr),
    462		     sock->sk->sk_state);
    463	} else if (ret < 0) {
    464		pr_err("connect %s error %d\n",
    465		       ceph_pr_addr(&con->peer_addr), ret);
    466		sock_release(sock);
    467		return ret;
    468	}
    469
    470	if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
    471		tcp_sock_set_nodelay(sock->sk);
    472
    473	con->sock = sock;
    474	return 0;
    475}
    476
    477/*
    478 * Shutdown/close the socket for the given connection.
    479 */
    480int ceph_con_close_socket(struct ceph_connection *con)
    481{
    482	int rc = 0;
    483
    484	dout("%s con %p sock %p\n", __func__, con, con->sock);
    485	if (con->sock) {
    486		rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
    487		sock_release(con->sock);
    488		con->sock = NULL;
    489	}
    490
    491	/*
    492	 * Forcibly clear the SOCK_CLOSED flag.  It gets set
    493	 * independent of the connection mutex, and we could have
    494	 * received a socket close event before we had the chance to
    495	 * shut the socket down.
    496	 */
    497	ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED);
    498
    499	con_sock_state_closed(con);
    500	return rc;
    501}
    502
    503static void ceph_con_reset_protocol(struct ceph_connection *con)
    504{
    505	dout("%s con %p\n", __func__, con);
    506
    507	ceph_con_close_socket(con);
    508	if (con->in_msg) {
    509		WARN_ON(con->in_msg->con != con);
    510		ceph_msg_put(con->in_msg);
    511		con->in_msg = NULL;
    512	}
    513	if (con->out_msg) {
    514		WARN_ON(con->out_msg->con != con);
    515		ceph_msg_put(con->out_msg);
    516		con->out_msg = NULL;
    517	}
    518	if (con->bounce_page) {
    519		__free_page(con->bounce_page);
    520		con->bounce_page = NULL;
    521	}
    522
    523	if (ceph_msgr2(from_msgr(con->msgr)))
    524		ceph_con_v2_reset_protocol(con);
    525	else
    526		ceph_con_v1_reset_protocol(con);
    527}
    528
    529/*
    530 * Reset a connection.  Discard all incoming and outgoing messages
    531 * and clear *_seq state.
    532 */
    533static void ceph_msg_remove(struct ceph_msg *msg)
    534{
    535	list_del_init(&msg->list_head);
    536
    537	ceph_msg_put(msg);
    538}
    539
    540static void ceph_msg_remove_list(struct list_head *head)
    541{
    542	while (!list_empty(head)) {
    543		struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
    544							list_head);
    545		ceph_msg_remove(msg);
    546	}
    547}
    548
    549void ceph_con_reset_session(struct ceph_connection *con)
    550{
    551	dout("%s con %p\n", __func__, con);
    552
    553	WARN_ON(con->in_msg);
    554	WARN_ON(con->out_msg);
    555	ceph_msg_remove_list(&con->out_queue);
    556	ceph_msg_remove_list(&con->out_sent);
    557	con->out_seq = 0;
    558	con->in_seq = 0;
    559	con->in_seq_acked = 0;
    560
    561	if (ceph_msgr2(from_msgr(con->msgr)))
    562		ceph_con_v2_reset_session(con);
    563	else
    564		ceph_con_v1_reset_session(con);
    565}
    566
    567/*
    568 * mark a peer down.  drop any open connections.
    569 */
    570void ceph_con_close(struct ceph_connection *con)
    571{
    572	mutex_lock(&con->mutex);
    573	dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
    574	con->state = CEPH_CON_S_CLOSED;
    575
    576	ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX);  /* so we retry next
    577							  connect */
    578	ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING);
    579	ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
    580	ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF);
    581
    582	ceph_con_reset_protocol(con);
    583	ceph_con_reset_session(con);
    584	cancel_con(con);
    585	mutex_unlock(&con->mutex);
    586}
    587EXPORT_SYMBOL(ceph_con_close);
    588
    589/*
    590 * Reopen a closed connection, with a new peer address.
    591 */
    592void ceph_con_open(struct ceph_connection *con,
    593		   __u8 entity_type, __u64 entity_num,
    594		   struct ceph_entity_addr *addr)
    595{
    596	mutex_lock(&con->mutex);
    597	dout("con_open %p %s\n", con, ceph_pr_addr(addr));
    598
    599	WARN_ON(con->state != CEPH_CON_S_CLOSED);
    600	con->state = CEPH_CON_S_PREOPEN;
    601
    602	con->peer_name.type = (__u8) entity_type;
    603	con->peer_name.num = cpu_to_le64(entity_num);
    604
    605	memcpy(&con->peer_addr, addr, sizeof(*addr));
    606	con->delay = 0;      /* reset backoff memory */
    607	mutex_unlock(&con->mutex);
    608	queue_con(con);
    609}
    610EXPORT_SYMBOL(ceph_con_open);
    611
    612/*
    613 * return true if this connection ever successfully opened
    614 */
    615bool ceph_con_opened(struct ceph_connection *con)
    616{
    617	if (ceph_msgr2(from_msgr(con->msgr)))
    618		return ceph_con_v2_opened(con);
    619
    620	return ceph_con_v1_opened(con);
    621}
    622
    623/*
    624 * initialize a new connection.
    625 */
    626void ceph_con_init(struct ceph_connection *con, void *private,
    627	const struct ceph_connection_operations *ops,
    628	struct ceph_messenger *msgr)
    629{
    630	dout("con_init %p\n", con);
    631	memset(con, 0, sizeof(*con));
    632	con->private = private;
    633	con->ops = ops;
    634	con->msgr = msgr;
    635
    636	con_sock_state_init(con);
    637
    638	mutex_init(&con->mutex);
    639	INIT_LIST_HEAD(&con->out_queue);
    640	INIT_LIST_HEAD(&con->out_sent);
    641	INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
    642
    643	con->state = CEPH_CON_S_CLOSED;
    644}
    645EXPORT_SYMBOL(ceph_con_init);
    646
    647/*
    648 * We maintain a global counter to order connection attempts.  Get
    649 * a unique seq greater than @gt.
    650 */
    651u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt)
    652{
    653	u32 ret;
    654
    655	spin_lock(&msgr->global_seq_lock);
    656	if (msgr->global_seq < gt)
    657		msgr->global_seq = gt;
    658	ret = ++msgr->global_seq;
    659	spin_unlock(&msgr->global_seq_lock);
    660	return ret;
    661}
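
/*
 * Sketch (hypothetical helper): the returned seq is always strictly
 * greater than @gt, so a reconnecting side can outbid the global_seq
 * its peer advertised.
 */
static u32 __maybe_unused example_outbid_peer(struct ceph_messenger *msgr,
					      u32 peer_global_seq)
{
	return ceph_get_global_seq(msgr, peer_global_seq);
}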
    662
    663/*
    664 * Discard messages that have been acked by the server.
    665 */
    666void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
    667{
    668	struct ceph_msg *msg;
    669	u64 seq;
    670
    671	dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq);
    672	while (!list_empty(&con->out_sent)) {
    673		msg = list_first_entry(&con->out_sent, struct ceph_msg,
    674				       list_head);
    675		WARN_ON(msg->needs_out_seq);
    676		seq = le64_to_cpu(msg->hdr.seq);
    677		if (seq > ack_seq)
    678			break;
    679
    680		dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
    681		     msg, seq);
    682		ceph_msg_remove(msg);
    683	}
    684}
    685
    686/*
    687 * Discard messages that have been requeued in con_fault(), up to
    688 * reconnect_seq.  This avoids gratuitously resending messages that
    689 * the server had received and handled prior to reconnect.
    690 */
    691void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
    692{
    693	struct ceph_msg *msg;
    694	u64 seq;
    695
    696	dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
    697	while (!list_empty(&con->out_queue)) {
    698		msg = list_first_entry(&con->out_queue, struct ceph_msg,
    699				       list_head);
    700		if (msg->needs_out_seq)
    701			break;
    702		seq = le64_to_cpu(msg->hdr.seq);
    703		if (seq > reconnect_seq)
    704			break;
    705
    706		dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
    707		     msg, seq);
    708		ceph_msg_remove(msg);
    709	}
    710}
    711
    712#ifdef CONFIG_BLOCK
    713
    714/*
    715 * For a bio data item, a piece is whatever remains of the next
    716 * entry in the current bio iovec, or the first entry in the next
    717 * bio in the list.
    718 */
    719static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
    720					size_t length)
    721{
    722	struct ceph_msg_data *data = cursor->data;
    723	struct ceph_bio_iter *it = &cursor->bio_iter;
    724
    725	cursor->resid = min_t(size_t, length, data->bio_length);
    726	*it = data->bio_pos;
    727	if (cursor->resid < it->iter.bi_size)
    728		it->iter.bi_size = cursor->resid;
    729
    730	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
    731	cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
    732}
    733
    734static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
    735						size_t *page_offset,
    736						size_t *length)
    737{
    738	struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
    739					   cursor->bio_iter.iter);
    740
    741	*page_offset = bv.bv_offset;
    742	*length = bv.bv_len;
    743	return bv.bv_page;
    744}
    745
    746static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
    747					size_t bytes)
    748{
    749	struct ceph_bio_iter *it = &cursor->bio_iter;
    750	struct page *page = bio_iter_page(it->bio, it->iter);
    751
    752	BUG_ON(bytes > cursor->resid);
    753	BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
    754	cursor->resid -= bytes;
    755	bio_advance_iter(it->bio, &it->iter, bytes);
    756
    757	if (!cursor->resid) {
    758		BUG_ON(!cursor->last_piece);
    759		return false;   /* no more data */
    760	}
    761
    762	if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done &&
    763		       page == bio_iter_page(it->bio, it->iter)))
    764		return false;	/* more bytes to process in this segment */
    765
    766	if (!it->iter.bi_size) {
    767		it->bio = it->bio->bi_next;
    768		it->iter = it->bio->bi_iter;
    769		if (cursor->resid < it->iter.bi_size)
    770			it->iter.bi_size = cursor->resid;
    771	}
    772
    773	BUG_ON(cursor->last_piece);
    774	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
    775	cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
    776	return true;
    777}
    778#endif /* CONFIG_BLOCK */
    779
    780static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
    781					size_t length)
    782{
    783	struct ceph_msg_data *data = cursor->data;
    784	struct bio_vec *bvecs = data->bvec_pos.bvecs;
    785
    786	cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
    787	cursor->bvec_iter = data->bvec_pos.iter;
    788	cursor->bvec_iter.bi_size = cursor->resid;
    789
    790	BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
    791	cursor->last_piece =
    792	    cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
    793}
    794
    795static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
    796						size_t *page_offset,
    797						size_t *length)
    798{
    799	struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
    800					   cursor->bvec_iter);
    801
    802	*page_offset = bv.bv_offset;
    803	*length = bv.bv_len;
    804	return bv.bv_page;
    805}
    806
    807static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
    808					size_t bytes)
    809{
    810	struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
    811	struct page *page = bvec_iter_page(bvecs, cursor->bvec_iter);
    812
    813	BUG_ON(bytes > cursor->resid);
    814	BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
    815	cursor->resid -= bytes;
    816	bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);
    817
    818	if (!cursor->resid) {
    819		BUG_ON(!cursor->last_piece);
    820		return false;   /* no more data */
    821	}
    822
    823	if (!bytes || (cursor->bvec_iter.bi_bvec_done &&
    824		       page == bvec_iter_page(bvecs, cursor->bvec_iter)))
    825		return false;	/* more bytes to process in this segment */
    826
    827	BUG_ON(cursor->last_piece);
    828	BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
    829	cursor->last_piece =
    830	    cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
    831	return true;
    832}
    833
    834/*
    835 * For a page array, a piece comes from the first page in the array
    836 * that has not already been fully consumed.
    837 */
    838static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
    839					size_t length)
    840{
    841	struct ceph_msg_data *data = cursor->data;
    842	int page_count;
    843
    844	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
    845
    846	BUG_ON(!data->pages);
    847	BUG_ON(!data->length);
    848
    849	cursor->resid = min(length, data->length);
    850	page_count = calc_pages_for(data->alignment, (u64)data->length);
    851	cursor->page_offset = data->alignment & ~PAGE_MASK;
    852	cursor->page_index = 0;
    853	BUG_ON(page_count > (int)USHRT_MAX);
    854	cursor->page_count = (unsigned short)page_count;
    855	BUG_ON(length > SIZE_MAX - cursor->page_offset);
    856	cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE;
    857}
    858
    859static struct page *
    860ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
    861					size_t *page_offset, size_t *length)
    862{
    863	struct ceph_msg_data *data = cursor->data;
    864
    865	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
    866
    867	BUG_ON(cursor->page_index >= cursor->page_count);
    868	BUG_ON(cursor->page_offset >= PAGE_SIZE);
    869
    870	*page_offset = cursor->page_offset;
    871	if (cursor->last_piece)
    872		*length = cursor->resid;
    873	else
    874		*length = PAGE_SIZE - *page_offset;
    875
    876	return data->pages[cursor->page_index];
    877}
    878
    879static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
    880						size_t bytes)
    881{
    882	BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
    883
    884	BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
    885
    886	/* Advance the cursor page offset */
    887
    888	cursor->resid -= bytes;
    889	cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
    890	if (!bytes || cursor->page_offset)
    891		return false;	/* more bytes to process in the current page */
    892
    893	if (!cursor->resid)
    894		return false;   /* no more data */
    895
    896	/* Move on to the next page; offset is already at 0 */
    897
    898	BUG_ON(cursor->page_index >= cursor->page_count);
    899	cursor->page_index++;
    900	cursor->last_piece = cursor->resid <= PAGE_SIZE;
    901
    902	return true;
    903}
    904
    905/*
    906 * For a pagelist, a piece is whatever remains to be consumed in the
    907 * first page in the list, or the front of the next page.
    908 */
    909static void
    910ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
    911					size_t length)
    912{
    913	struct ceph_msg_data *data = cursor->data;
    914	struct ceph_pagelist *pagelist;
    915	struct page *page;
    916
    917	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
    918
    919	pagelist = data->pagelist;
    920	BUG_ON(!pagelist);
    921
    922	if (!length)
    923		return;		/* pagelist can be assigned but empty */
    924
    925	BUG_ON(list_empty(&pagelist->head));
    926	page = list_first_entry(&pagelist->head, struct page, lru);
    927
    928	cursor->resid = min(length, pagelist->length);
    929	cursor->page = page;
    930	cursor->offset = 0;
    931	cursor->last_piece = cursor->resid <= PAGE_SIZE;
    932}
    933
    934static struct page *
    935ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
    936				size_t *page_offset, size_t *length)
    937{
    938	struct ceph_msg_data *data = cursor->data;
    939	struct ceph_pagelist *pagelist;
    940
    941	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
    942
    943	pagelist = data->pagelist;
    944	BUG_ON(!pagelist);
    945
    946	BUG_ON(!cursor->page);
    947	BUG_ON(cursor->offset + cursor->resid != pagelist->length);
    948
    949	/* offset of first page in pagelist is always 0 */
    950	*page_offset = cursor->offset & ~PAGE_MASK;
    951	if (cursor->last_piece)
    952		*length = cursor->resid;
    953	else
    954		*length = PAGE_SIZE - *page_offset;
    955
    956	return cursor->page;
    957}
    958
    959static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
    960						size_t bytes)
    961{
    962	struct ceph_msg_data *data = cursor->data;
    963	struct ceph_pagelist *pagelist;
    964
    965	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
    966
    967	pagelist = data->pagelist;
    968	BUG_ON(!pagelist);
    969
    970	BUG_ON(cursor->offset + cursor->resid != pagelist->length);
    971	BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
    972
    973	/* Advance the cursor offset */
    974
    975	cursor->resid -= bytes;
    976	cursor->offset += bytes;
    977	/* offset of first page in pagelist is always 0 */
    978	if (!bytes || cursor->offset & ~PAGE_MASK)
    979		return false;	/* more bytes to process in the current page */
    980
    981	if (!cursor->resid)
    982		return false;   /* no more data */
    983
    984	/* Move on to the next page */
    985
    986	BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
    987	cursor->page = list_next_entry(cursor->page, lru);
    988	cursor->last_piece = cursor->resid <= PAGE_SIZE;
    989
    990	return true;
    991}
    992
    993/*
    994 * Message data is handled (sent or received) in pieces, where each
    995 * piece resides on a single page.  The network layer might not
    996 * consume an entire piece at once.  A data item's cursor keeps
    997 * track of which piece is next to process and how much remains to
    998 * be processed in that piece.  It also tracks whether the current
    999 * piece is the last one in the data item.
   1000 */
   1001static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
   1002{
   1003	size_t length = cursor->total_resid;
   1004
   1005	switch (cursor->data->type) {
   1006	case CEPH_MSG_DATA_PAGELIST:
   1007		ceph_msg_data_pagelist_cursor_init(cursor, length);
   1008		break;
   1009	case CEPH_MSG_DATA_PAGES:
   1010		ceph_msg_data_pages_cursor_init(cursor, length);
   1011		break;
   1012#ifdef CONFIG_BLOCK
   1013	case CEPH_MSG_DATA_BIO:
   1014		ceph_msg_data_bio_cursor_init(cursor, length);
   1015		break;
   1016#endif /* CONFIG_BLOCK */
   1017	case CEPH_MSG_DATA_BVECS:
   1018		ceph_msg_data_bvecs_cursor_init(cursor, length);
   1019		break;
   1020	case CEPH_MSG_DATA_NONE:
   1021	default:
   1022		/* BUG(); */
   1023		break;
   1024	}
   1025	cursor->need_crc = true;
   1026}
   1027
   1028void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
   1029			       struct ceph_msg *msg, size_t length)
   1030{
   1031	BUG_ON(!length);
   1032	BUG_ON(length > msg->data_length);
   1033	BUG_ON(!msg->num_data_items);
   1034
   1035	cursor->total_resid = length;
   1036	cursor->data = msg->data;
   1037
   1038	__ceph_msg_data_cursor_init(cursor);
   1039}
   1040
   1041/*
   1042 * Return the page containing the next piece to process for a given
   1043 * data item, and supply the page offset and length of that piece.
   1044 * Indicate whether this is the last piece in this data item.
   1045 */
   1046struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
   1047				size_t *page_offset, size_t *length,
   1048				bool *last_piece)
   1049{
   1050	struct page *page;
   1051
   1052	switch (cursor->data->type) {
   1053	case CEPH_MSG_DATA_PAGELIST:
   1054		page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
   1055		break;
   1056	case CEPH_MSG_DATA_PAGES:
   1057		page = ceph_msg_data_pages_next(cursor, page_offset, length);
   1058		break;
   1059#ifdef CONFIG_BLOCK
   1060	case CEPH_MSG_DATA_BIO:
   1061		page = ceph_msg_data_bio_next(cursor, page_offset, length);
   1062		break;
   1063#endif /* CONFIG_BLOCK */
   1064	case CEPH_MSG_DATA_BVECS:
   1065		page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
   1066		break;
   1067	case CEPH_MSG_DATA_NONE:
   1068	default:
   1069		page = NULL;
   1070		break;
   1071	}
   1072
   1073	BUG_ON(!page);
   1074	BUG_ON(*page_offset + *length > PAGE_SIZE);
   1075	BUG_ON(!*length);
   1076	BUG_ON(*length > cursor->resid);
   1077	if (last_piece)
   1078		*last_piece = cursor->last_piece;
   1079
   1080	return page;
   1081}
   1082
    1083/*
    1084 * Advance the cursor by @bytes, recomputing need_crc whenever this
    1085 * moves on to the next piece of the data item.
    1086 */
   1087void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
   1088{
   1089	bool new_piece;
   1090
   1091	BUG_ON(bytes > cursor->resid);
   1092	switch (cursor->data->type) {
   1093	case CEPH_MSG_DATA_PAGELIST:
   1094		new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
   1095		break;
   1096	case CEPH_MSG_DATA_PAGES:
   1097		new_piece = ceph_msg_data_pages_advance(cursor, bytes);
   1098		break;
   1099#ifdef CONFIG_BLOCK
   1100	case CEPH_MSG_DATA_BIO:
   1101		new_piece = ceph_msg_data_bio_advance(cursor, bytes);
   1102		break;
   1103#endif /* CONFIG_BLOCK */
   1104	case CEPH_MSG_DATA_BVECS:
   1105		new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
   1106		break;
   1107	case CEPH_MSG_DATA_NONE:
   1108	default:
   1109		BUG();
   1110		break;
   1111	}
   1112	cursor->total_resid -= bytes;
   1113
   1114	if (!cursor->resid && cursor->total_resid) {
   1115		WARN_ON(!cursor->last_piece);
   1116		cursor->data++;
   1117		__ceph_msg_data_cursor_init(cursor);
   1118		new_piece = true;
   1119	}
   1120	cursor->need_crc = new_piece;
   1121}
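
/*
 * Illustrative consumer loop (a sketch, not used in this file; it
 * assumes @msg carries at least one data item): walk every piece of a
 * message's data one page fragment at a time, the way the v1/v2
 * transmit paths do.
 */
static void __maybe_unused example_walk_msg_data(struct ceph_msg *msg)
{
	struct ceph_msg_data_cursor cursor;
	size_t off, len;
	struct page *page;

	ceph_msg_data_cursor_init(&cursor, msg, msg->data_length);
	while (cursor.total_resid) {
		page = ceph_msg_data_next(&cursor, &off, &len, NULL);
		/* ... send/receive @len bytes at @page + @off ... */
		ceph_msg_data_advance(&cursor, len);
	}
}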
   1122
   1123u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
   1124		     unsigned int length)
   1125{
   1126	char *kaddr;
   1127
   1128	kaddr = kmap(page);
   1129	BUG_ON(kaddr == NULL);
   1130	crc = crc32c(crc, kaddr + page_offset, length);
   1131	kunmap(page);
   1132
   1133	return crc;
   1134}
   1135
   1136bool ceph_addr_is_blank(const struct ceph_entity_addr *addr)
   1137{
   1138	struct sockaddr_storage ss = addr->in_addr; /* align */
   1139	struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr;
   1140	struct in6_addr *addr6 = &((struct sockaddr_in6 *)&ss)->sin6_addr;
   1141
   1142	switch (ss.ss_family) {
   1143	case AF_INET:
   1144		return addr4->s_addr == htonl(INADDR_ANY);
   1145	case AF_INET6:
   1146		return ipv6_addr_any(addr6);
   1147	default:
   1148		return true;
   1149	}
   1150}
   1151
   1152int ceph_addr_port(const struct ceph_entity_addr *addr)
   1153{
   1154	switch (get_unaligned(&addr->in_addr.ss_family)) {
   1155	case AF_INET:
   1156		return ntohs(get_unaligned(&((struct sockaddr_in *)&addr->in_addr)->sin_port));
   1157	case AF_INET6:
   1158		return ntohs(get_unaligned(&((struct sockaddr_in6 *)&addr->in_addr)->sin6_port));
   1159	}
   1160	return 0;
   1161}
   1162
   1163void ceph_addr_set_port(struct ceph_entity_addr *addr, int p)
   1164{
   1165	switch (get_unaligned(&addr->in_addr.ss_family)) {
   1166	case AF_INET:
   1167		put_unaligned(htons(p), &((struct sockaddr_in *)&addr->in_addr)->sin_port);
   1168		break;
   1169	case AF_INET6:
   1170		put_unaligned(htons(p), &((struct sockaddr_in6 *)&addr->in_addr)->sin6_port);
   1171		break;
   1172	}
   1173}
   1174
   1175/*
   1176 * Unlike other *_pton function semantics, zero indicates success.
   1177 */
   1178static int ceph_pton(const char *str, size_t len, struct ceph_entity_addr *addr,
   1179		char delim, const char **ipend)
   1180{
   1181	memset(&addr->in_addr, 0, sizeof(addr->in_addr));
   1182
   1183	if (in4_pton(str, len, (u8 *)&((struct sockaddr_in *)&addr->in_addr)->sin_addr.s_addr, delim, ipend)) {
   1184		put_unaligned(AF_INET, &addr->in_addr.ss_family);
   1185		return 0;
   1186	}
   1187
   1188	if (in6_pton(str, len, (u8 *)&((struct sockaddr_in6 *)&addr->in_addr)->sin6_addr.s6_addr, delim, ipend)) {
   1189		put_unaligned(AF_INET6, &addr->in_addr.ss_family);
   1190		return 0;
   1191	}
   1192
   1193	return -EINVAL;
   1194}
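
/*
 * Usage sketch (hypothetical helper) for the inverted convention:
 * 0 means success here, whereas in4_pton()/in6_pton() return 1 on
 * success.
 */
static int __maybe_unused example_parse_literal(struct ceph_entity_addr *addr)
{
	static const char ip[] = "10.0.0.1";

	if (ceph_pton(ip, sizeof(ip) - 1, addr, -1, NULL))
		return -EINVAL;	/* not an IPv4/IPv6 literal */
	ceph_addr_set_port(addr, CEPH_MON_PORT);
	return 0;
}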
   1195
   1196/*
   1197 * Extract hostname string and resolve using kernel DNS facility.
   1198 */
   1199#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
   1200static int ceph_dns_resolve_name(const char *name, size_t namelen,
   1201		struct ceph_entity_addr *addr, char delim, const char **ipend)
   1202{
   1203	const char *end, *delim_p;
   1204	char *colon_p, *ip_addr = NULL;
   1205	int ip_len, ret;
   1206
   1207	/*
   1208	 * The end of the hostname occurs immediately preceding the delimiter or
   1209	 * the port marker (':') where the delimiter takes precedence.
   1210	 */
   1211	delim_p = memchr(name, delim, namelen);
   1212	colon_p = memchr(name, ':', namelen);
   1213
   1214	if (delim_p && colon_p)
   1215		end = delim_p < colon_p ? delim_p : colon_p;
   1216	else if (!delim_p && colon_p)
   1217		end = colon_p;
   1218	else {
   1219		end = delim_p;
   1220		if (!end) /* case: hostname:/ */
   1221			end = name + namelen;
   1222	}
   1223
   1224	if (end <= name)
   1225		return -EINVAL;
   1226
   1227	/* do dns_resolve upcall */
   1228	ip_len = dns_query(current->nsproxy->net_ns,
   1229			   NULL, name, end - name, NULL, &ip_addr, NULL, false);
   1230	if (ip_len > 0)
   1231		ret = ceph_pton(ip_addr, ip_len, addr, -1, NULL);
   1232	else
   1233		ret = -ESRCH;
   1234
   1235	kfree(ip_addr);
   1236
   1237	*ipend = end;
   1238
   1239	pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
   1240			ret, ret ? "failed" : ceph_pr_addr(addr));
   1241
   1242	return ret;
   1243}
   1244#else
   1245static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
   1246		struct ceph_entity_addr *addr, char delim, const char **ipend)
   1247{
   1248	return -EINVAL;
   1249}
   1250#endif
   1251
   1252/*
   1253 * Parse a server name (IP or hostname). If a valid IP address is not found
   1254 * then try to extract a hostname to resolve using userspace DNS upcall.
   1255 */
   1256static int ceph_parse_server_name(const char *name, size_t namelen,
   1257		struct ceph_entity_addr *addr, char delim, const char **ipend)
   1258{
   1259	int ret;
   1260
   1261	ret = ceph_pton(name, namelen, addr, delim, ipend);
   1262	if (ret)
   1263		ret = ceph_dns_resolve_name(name, namelen, addr, delim, ipend);
   1264
   1265	return ret;
   1266}
   1267
   1268/*
   1269 * Parse an ip[:port] list into an addr array.  Use the default
   1270 * monitor port if a port isn't specified.
   1271 */
   1272int ceph_parse_ips(const char *c, const char *end,
   1273		   struct ceph_entity_addr *addr,
   1274		   int max_count, int *count, char delim)
   1275{
   1276	int i, ret = -EINVAL;
   1277	const char *p = c;
   1278
   1279	dout("parse_ips on '%.*s'\n", (int)(end-c), c);
   1280	for (i = 0; i < max_count; i++) {
   1281		char cur_delim = delim;
   1282		const char *ipend;
   1283		int port;
   1284
   1285		if (*p == '[') {
   1286			cur_delim = ']';
   1287			p++;
   1288		}
   1289
   1290		ret = ceph_parse_server_name(p, end - p, &addr[i], cur_delim,
   1291					     &ipend);
   1292		if (ret)
   1293			goto bad;
   1294		ret = -EINVAL;
   1295
   1296		p = ipend;
   1297
   1298		if (cur_delim == ']') {
   1299			if (*p != ']') {
   1300				dout("missing matching ']'\n");
   1301				goto bad;
   1302			}
   1303			p++;
   1304		}
   1305
   1306		/* port? */
   1307		if (p < end && *p == ':') {
   1308			port = 0;
   1309			p++;
   1310			while (p < end && *p >= '0' && *p <= '9') {
   1311				port = (port * 10) + (*p - '0');
   1312				p++;
   1313			}
   1314			if (port == 0)
   1315				port = CEPH_MON_PORT;
   1316			else if (port > 65535)
   1317				goto bad;
   1318		} else {
   1319			port = CEPH_MON_PORT;
   1320		}
   1321
   1322		ceph_addr_set_port(&addr[i], port);
   1323		/*
   1324		 * We want the type to be set according to ms_mode
   1325		 * option, but options are normally parsed after mon
   1326		 * addresses.  Rather than complicating parsing, set
   1327		 * to LEGACY and override in build_initial_monmap()
   1328		 * for mon addresses and ceph_messenger_init() for
   1329		 * ip option.
   1330		 */
   1331		addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
   1332		addr[i].nonce = 0;
   1333
   1334		dout("%s got %s\n", __func__, ceph_pr_addr(&addr[i]));
   1335
   1336		if (p == end)
   1337			break;
   1338		if (*p != delim)
   1339			goto bad;
   1340		p++;
   1341	}
   1342
   1343	if (p != end)
   1344		goto bad;
   1345
   1346	if (count)
   1347		*count = i + 1;
   1348	return 0;
   1349
   1350bad:
   1351	return ret;
   1352}
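
/*
 * Usage sketch (hypothetical helper, @addrs must have room for two
 * entries): parse a comma-separated monitor list.  The bracketed form
 * carries an IPv6 address; the first entry has an explicit port, the
 * second falls back to CEPH_MON_PORT.
 */
static int __maybe_unused example_parse_mon_list(struct ceph_entity_addr *addrs)
{
	static const char s[] = "10.0.0.1:6789,[::1]";
	int count;

	return ceph_parse_ips(s, s + sizeof(s) - 1, addrs, 2, &count, ',');
}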
   1353
   1354/*
   1355 * Process message.  This happens in the worker thread.  The callback should
   1356 * be careful not to do anything that waits on other incoming messages or it
   1357 * may deadlock.
   1358 */
   1359void ceph_con_process_message(struct ceph_connection *con)
   1360{
   1361	struct ceph_msg *msg = con->in_msg;
   1362
   1363	BUG_ON(con->in_msg->con != con);
   1364	con->in_msg = NULL;
   1365
   1366	/* if first message, set peer_name */
   1367	if (con->peer_name.type == 0)
   1368		con->peer_name = msg->hdr.src;
   1369
   1370	con->in_seq++;
   1371	mutex_unlock(&con->mutex);
   1372
   1373	dout("===== %p %llu from %s%lld %d=%s len %d+%d+%d (%u %u %u) =====\n",
   1374	     msg, le64_to_cpu(msg->hdr.seq),
   1375	     ENTITY_NAME(msg->hdr.src),
   1376	     le16_to_cpu(msg->hdr.type),
   1377	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
   1378	     le32_to_cpu(msg->hdr.front_len),
   1379	     le32_to_cpu(msg->hdr.middle_len),
   1380	     le32_to_cpu(msg->hdr.data_len),
   1381	     con->in_front_crc, con->in_middle_crc, con->in_data_crc);
   1382	con->ops->dispatch(con, msg);
   1383
   1384	mutex_lock(&con->mutex);
   1385}
   1386
   1387/*
   1388 * Atomically queue work on a connection after the specified delay.
   1389 * Bump @con reference to avoid races with connection teardown.
   1390 * Returns 0 if work was queued, or an error code otherwise.
   1391 */
   1392static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
   1393{
   1394	if (!con->ops->get(con)) {
   1395		dout("%s %p ref count 0\n", __func__, con);
   1396		return -ENOENT;
   1397	}
   1398
   1399	if (delay >= HZ)
   1400		delay = round_jiffies_relative(delay);
   1401
   1402	dout("%s %p %lu\n", __func__, con, delay);
   1403	if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
   1404		dout("%s %p - already queued\n", __func__, con);
   1405		con->ops->put(con);
   1406		return -EBUSY;
   1407	}
   1408
   1409	return 0;
   1410}
   1411
   1412static void queue_con(struct ceph_connection *con)
   1413{
   1414	(void) queue_con_delay(con, 0);
   1415}
   1416
   1417static void cancel_con(struct ceph_connection *con)
   1418{
   1419	if (cancel_delayed_work(&con->work)) {
   1420		dout("%s %p\n", __func__, con);
   1421		con->ops->put(con);
   1422	}
   1423}
   1424
   1425static bool con_sock_closed(struct ceph_connection *con)
   1426{
   1427	if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED))
   1428		return false;
   1429
   1430#define CASE(x)								\
   1431	case CEPH_CON_S_ ## x:						\
   1432		con->error_msg = "socket closed (con state " #x ")";	\
   1433		break;
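
	/*
	 * e.g. CASE(OPEN) expands to:
	 *
	 *	case CEPH_CON_S_OPEN:
	 *		con->error_msg = "socket closed (con state OPEN)";
	 *		break;
	 */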
   1434
   1435	switch (con->state) {
   1436	CASE(CLOSED);
   1437	CASE(PREOPEN);
   1438	CASE(V1_BANNER);
   1439	CASE(V1_CONNECT_MSG);
   1440	CASE(V2_BANNER_PREFIX);
   1441	CASE(V2_BANNER_PAYLOAD);
   1442	CASE(V2_HELLO);
   1443	CASE(V2_AUTH);
   1444	CASE(V2_AUTH_SIGNATURE);
   1445	CASE(V2_SESSION_CONNECT);
   1446	CASE(V2_SESSION_RECONNECT);
   1447	CASE(OPEN);
   1448	CASE(STANDBY);
   1449	default:
   1450		BUG();
   1451	}
   1452#undef CASE
   1453
   1454	return true;
   1455}
   1456
   1457static bool con_backoff(struct ceph_connection *con)
   1458{
   1459	int ret;
   1460
   1461	if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF))
   1462		return false;
   1463
   1464	ret = queue_con_delay(con, con->delay);
   1465	if (ret) {
   1466		dout("%s: con %p FAILED to back off %lu\n", __func__,
   1467			con, con->delay);
   1468		BUG_ON(ret == -ENOENT);
   1469		ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
   1470	}
   1471
   1472	return true;
   1473}
   1474
   1475/* Finish fault handling; con->mutex must *not* be held here */
   1476
   1477static void con_fault_finish(struct ceph_connection *con)
   1478{
   1479	dout("%s %p\n", __func__, con);
   1480
   1481	/*
   1482	 * in case we faulted due to authentication, invalidate our
   1483	 * current tickets so that we can get new ones.
   1484	 */
   1485	if (con->v1.auth_retry) {
   1486		dout("auth_retry %d, invalidating\n", con->v1.auth_retry);
   1487		if (con->ops->invalidate_authorizer)
   1488			con->ops->invalidate_authorizer(con);
   1489		con->v1.auth_retry = 0;
   1490	}
   1491
   1492	if (con->ops->fault)
   1493		con->ops->fault(con);
   1494}
   1495
   1496/*
   1497 * Do some work on a connection.  Drop a connection ref when we're done.
   1498 */
   1499static void ceph_con_workfn(struct work_struct *work)
   1500{
   1501	struct ceph_connection *con = container_of(work, struct ceph_connection,
   1502						   work.work);
   1503	bool fault;
   1504
   1505	mutex_lock(&con->mutex);
   1506	while (true) {
   1507		int ret;
   1508
   1509		if ((fault = con_sock_closed(con))) {
   1510			dout("%s: con %p SOCK_CLOSED\n", __func__, con);
   1511			break;
   1512		}
   1513		if (con_backoff(con)) {
   1514			dout("%s: con %p BACKOFF\n", __func__, con);
   1515			break;
   1516		}
   1517		if (con->state == CEPH_CON_S_STANDBY) {
   1518			dout("%s: con %p STANDBY\n", __func__, con);
   1519			break;
   1520		}
   1521		if (con->state == CEPH_CON_S_CLOSED) {
   1522			dout("%s: con %p CLOSED\n", __func__, con);
   1523			BUG_ON(con->sock);
   1524			break;
   1525		}
   1526		if (con->state == CEPH_CON_S_PREOPEN) {
   1527			dout("%s: con %p PREOPEN\n", __func__, con);
   1528			BUG_ON(con->sock);
   1529		}
   1530
   1531		if (ceph_msgr2(from_msgr(con->msgr)))
   1532			ret = ceph_con_v2_try_read(con);
   1533		else
   1534			ret = ceph_con_v1_try_read(con);
   1535		if (ret < 0) {
   1536			if (ret == -EAGAIN)
   1537				continue;
   1538			if (!con->error_msg)
   1539				con->error_msg = "socket error on read";
   1540			fault = true;
   1541			break;
   1542		}
   1543
   1544		if (ceph_msgr2(from_msgr(con->msgr)))
   1545			ret = ceph_con_v2_try_write(con);
   1546		else
   1547			ret = ceph_con_v1_try_write(con);
   1548		if (ret < 0) {
   1549			if (ret == -EAGAIN)
   1550				continue;
   1551			if (!con->error_msg)
   1552				con->error_msg = "socket error on write";
   1553			fault = true;
   1554		}
   1555
   1556		break;	/* If we make it to here, we're done */
   1557	}
   1558	if (fault)
   1559		con_fault(con);
   1560	mutex_unlock(&con->mutex);
   1561
   1562	if (fault)
   1563		con_fault_finish(con);
   1564
   1565	con->ops->put(con);
   1566}
   1567
   1568/*
   1569 * Generic error/fault handler.  A retry mechanism is used with
    1570 * exponential backoff.
   1571 */
   1572static void con_fault(struct ceph_connection *con)
   1573{
   1574	dout("fault %p state %d to peer %s\n",
   1575	     con, con->state, ceph_pr_addr(&con->peer_addr));
   1576
   1577	pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
   1578		ceph_pr_addr(&con->peer_addr), con->error_msg);
   1579	con->error_msg = NULL;
   1580
   1581	WARN_ON(con->state == CEPH_CON_S_STANDBY ||
   1582		con->state == CEPH_CON_S_CLOSED);
   1583
   1584	ceph_con_reset_protocol(con);
   1585
   1586	if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
   1587		dout("fault on LOSSYTX channel, marking CLOSED\n");
   1588		con->state = CEPH_CON_S_CLOSED;
   1589		return;
   1590	}
   1591
   1592	/* Requeue anything that hasn't been acked */
   1593	list_splice_init(&con->out_sent, &con->out_queue);
   1594
   1595	/* If there are no messages queued or keepalive pending, place
   1596	 * the connection in a STANDBY state */
   1597	if (list_empty(&con->out_queue) &&
   1598	    !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
   1599		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
   1600		ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
   1601		con->state = CEPH_CON_S_STANDBY;
   1602	} else {
   1603		/* retry after a delay. */
   1604		con->state = CEPH_CON_S_PREOPEN;
   1605		if (!con->delay) {
   1606			con->delay = BASE_DELAY_INTERVAL;
   1607		} else if (con->delay < MAX_DELAY_INTERVAL) {
   1608			con->delay *= 2;
   1609			if (con->delay > MAX_DELAY_INTERVAL)
   1610				con->delay = MAX_DELAY_INTERVAL;
   1611		}
   1612		ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
   1613		queue_con(con);
   1614	}
   1615}
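
/*
 * Sketch of the retry schedule this produces (a hypothetical helper,
 * assuming the BASE_DELAY_INTERVAL/MAX_DELAY_INTERVAL definitions
 * used above): base, 2*base, 4*base, ... capped at
 * MAX_DELAY_INTERVAL jiffies.
 */
static unsigned long __maybe_unused example_next_delay(unsigned long delay)
{
	if (!delay)
		return BASE_DELAY_INTERVAL;
	delay *= 2;
	if (delay > MAX_DELAY_INTERVAL)
		delay = MAX_DELAY_INTERVAL;
	return delay;
}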
   1616
   1617void ceph_messenger_reset_nonce(struct ceph_messenger *msgr)
   1618{
   1619	u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000;
   1620	msgr->inst.addr.nonce = cpu_to_le32(nonce);
   1621	ceph_encode_my_addr(msgr);
   1622}
   1623
   1624/*
   1625 * initialize a new messenger instance
   1626 */
   1627void ceph_messenger_init(struct ceph_messenger *msgr,
   1628			 struct ceph_entity_addr *myaddr)
   1629{
   1630	spin_lock_init(&msgr->global_seq_lock);
   1631
   1632	if (myaddr) {
   1633		memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr,
   1634		       sizeof(msgr->inst.addr.in_addr));
   1635		ceph_addr_set_port(&msgr->inst.addr, 0);
   1636	}
   1637
   1638	/*
   1639	 * Since nautilus, clients are identified using type ANY.
   1640	 * For msgr1, ceph_encode_banner_addr() munges it to NONE.
   1641	 */
   1642	msgr->inst.addr.type = CEPH_ENTITY_ADDR_TYPE_ANY;
   1643
   1644	/* generate a random non-zero nonce */
   1645	do {
   1646		get_random_bytes(&msgr->inst.addr.nonce,
   1647				 sizeof(msgr->inst.addr.nonce));
   1648	} while (!msgr->inst.addr.nonce);
   1649	ceph_encode_my_addr(msgr);
   1650
   1651	atomic_set(&msgr->stopping, 0);
   1652	write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
   1653
   1654	dout("%s %p\n", __func__, msgr);
   1655}
   1656
   1657void ceph_messenger_fini(struct ceph_messenger *msgr)
   1658{
   1659	put_net(read_pnet(&msgr->net));
   1660}
   1661
   1662static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
   1663{
   1664	if (msg->con)
   1665		msg->con->ops->put(msg->con);
   1666
   1667	msg->con = con ? con->ops->get(con) : NULL;
   1668	BUG_ON(msg->con != con);
   1669}
   1670
   1671static void clear_standby(struct ceph_connection *con)
   1672{
   1673	/* come back from STANDBY? */
   1674	if (con->state == CEPH_CON_S_STANDBY) {
   1675		dout("clear_standby %p and ++connect_seq\n", con);
   1676		con->state = CEPH_CON_S_PREOPEN;
   1677		con->v1.connect_seq++;
   1678		WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
   1679		WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
   1680	}
   1681}
   1682
   1683/*
   1684 * Queue up an outgoing message on the given connection.
   1685 *
   1686 * Consumes a ref on @msg.
   1687 */
   1688void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
   1689{
   1690	/* set src+dst */
   1691	msg->hdr.src = con->msgr->inst.name;
   1692	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
   1693	msg->needs_out_seq = true;
   1694
   1695	mutex_lock(&con->mutex);
   1696
   1697	if (con->state == CEPH_CON_S_CLOSED) {
   1698		dout("con_send %p closed, dropping %p\n", con, msg);
   1699		ceph_msg_put(msg);
   1700		mutex_unlock(&con->mutex);
   1701		return;
   1702	}
   1703
   1704	msg_con_set(msg, con);
   1705
   1706	BUG_ON(!list_empty(&msg->list_head));
   1707	list_add_tail(&msg->list_head, &con->out_queue);
   1708	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
   1709	     ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
   1710	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
   1711	     le32_to_cpu(msg->hdr.front_len),
   1712	     le32_to_cpu(msg->hdr.middle_len),
   1713	     le32_to_cpu(msg->hdr.data_len));
   1714
   1715	clear_standby(con);
   1716	mutex_unlock(&con->mutex);
   1717
   1718	/* if there wasn't anything waiting to send before, queue
   1719	 * new work */
   1720	if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
   1721		queue_con(con);
   1722}
   1723EXPORT_SYMBOL(ceph_con_send);
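
/*
 * Sketch (hypothetical helper): because ceph_con_send() consumes a
 * reference, a caller that may need to revoke the message later keeps
 * its own ref alive across the send.
 */
static void __maybe_unused example_send_revocable(struct ceph_connection *con,
						  struct ceph_msg *msg)
{
	ceph_msg_get(msg);	/* our ref, beyond the one consumed */
	ceph_con_send(con, msg);
	/* ... later, if the request is cancelled ... */
	ceph_msg_revoke(msg);
	ceph_msg_put(msg);
}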
   1724
   1725/*
   1726 * Revoke a message that was previously queued for send
   1727 */
   1728void ceph_msg_revoke(struct ceph_msg *msg)
   1729{
   1730	struct ceph_connection *con = msg->con;
   1731
   1732	if (!con) {
   1733		dout("%s msg %p null con\n", __func__, msg);
   1734		return;		/* Message not in our possession */
   1735	}
   1736
   1737	mutex_lock(&con->mutex);
   1738	if (list_empty(&msg->list_head)) {
   1739		WARN_ON(con->out_msg == msg);
   1740		dout("%s con %p msg %p not linked\n", __func__, con, msg);
   1741		mutex_unlock(&con->mutex);
   1742		return;
   1743	}
   1744
   1745	dout("%s con %p msg %p was linked\n", __func__, con, msg);
   1746	msg->hdr.seq = 0;
   1747	ceph_msg_remove(msg);
   1748
   1749	if (con->out_msg == msg) {
   1750		WARN_ON(con->state != CEPH_CON_S_OPEN);
   1751		dout("%s con %p msg %p was sending\n", __func__, con, msg);
   1752		if (ceph_msgr2(from_msgr(con->msgr)))
   1753			ceph_con_v2_revoke(con);
   1754		else
   1755			ceph_con_v1_revoke(con);
   1756		ceph_msg_put(con->out_msg);
   1757		con->out_msg = NULL;
   1758	} else {
   1759		dout("%s con %p msg %p not current, out_msg %p\n", __func__,
   1760		     con, msg, con->out_msg);
   1761	}
   1762	mutex_unlock(&con->mutex);
   1763}
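/*
 * Illustrative cancellation pattern (an assumption, not code from this
 * file): a caller that kept its own reference to a queued request
 * message can pull it back before (or while) it is being written, then
 * drop that reference.
 *
 *	ceph_msg_revoke(req_msg);	// unlink from out_queue/out_msg
 *	ceph_msg_put(req_msg);		// drop the caller's own ref
 */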
   1764
   1765/*
   1766 * Revoke a message that we may be reading data into
   1767 */
   1768void ceph_msg_revoke_incoming(struct ceph_msg *msg)
   1769{
   1770	struct ceph_connection *con = msg->con;
   1771
   1772	if (!con) {
   1773		dout("%s msg %p null con\n", __func__, msg);
   1774		return;		/* Message not in our possession */
   1775	}
   1776
   1777	mutex_lock(&con->mutex);
   1778	if (con->in_msg == msg) {
   1779		WARN_ON(con->state != CEPH_CON_S_OPEN);
   1780		dout("%s con %p msg %p was recving\n", __func__, con, msg);
   1781		if (ceph_msgr2(from_msgr(con->msgr)))
   1782			ceph_con_v2_revoke_incoming(con);
   1783		else
   1784			ceph_con_v1_revoke_incoming(con);
   1785		ceph_msg_put(con->in_msg);
   1786		con->in_msg = NULL;
   1787	} else {
   1788		dout("%s con %p msg %p not current, in_msg %p\n", __func__,
   1789		     con, msg, con->in_msg);
   1790	}
   1791	mutex_unlock(&con->mutex);
   1792}
   1793
   1794/*
    1795	 * Queue a keepalive byte to ensure the TCP connection is alive.
   1796 */
   1797void ceph_con_keepalive(struct ceph_connection *con)
   1798{
   1799	dout("con_keepalive %p\n", con);
   1800	mutex_lock(&con->mutex);
   1801	clear_standby(con);
   1802	ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
   1803	mutex_unlock(&con->mutex);
   1804
   1805	if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
   1806		queue_con(con);
   1807}
   1808EXPORT_SYMBOL(ceph_con_keepalive);
   1809
   1810bool ceph_con_keepalive_expired(struct ceph_connection *con,
   1811			       unsigned long interval)
   1812{
   1813	if (interval > 0 &&
   1814	    (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
   1815		struct timespec64 now;
   1816		struct timespec64 ts;
   1817		ktime_get_real_ts64(&now);
   1818		jiffies_to_timespec64(interval, &ts);
   1819		ts = timespec64_add(con->last_keepalive_ack, ts);
   1820		return timespec64_compare(&now, &ts) >= 0;
   1821	}
   1822	return false;
   1823}
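/*
 * Worked example (hypothetical caller): with interval = 10 * HZ the
 * check above reduces to "now >= last_keepalive_ack + 10s", and it
 * returns false outright if the peer lacks KEEPALIVE2.  A periodic
 * tick might use it like this; the fault handling is an assumption.
 *
 *	if (ceph_con_keepalive_expired(con, 10 * HZ))
 *		pr_warn("peer stopped acking keepalives\n");
 *	else
 *		ceph_con_keepalive(con);	// queue another probe
 */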
   1824
   1825static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
   1826{
   1827	BUG_ON(msg->num_data_items >= msg->max_data_items);
   1828	return &msg->data[msg->num_data_items++];
   1829}
   1830
   1831static void ceph_msg_data_destroy(struct ceph_msg_data *data)
   1832{
   1833	if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
   1834		int num_pages = calc_pages_for(data->alignment, data->length);
   1835		ceph_release_page_vector(data->pages, num_pages);
   1836	} else if (data->type == CEPH_MSG_DATA_PAGELIST) {
   1837		ceph_pagelist_release(data->pagelist);
   1838	}
   1839}
   1840
   1841void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
   1842			     size_t length, size_t alignment, bool own_pages)
   1843{
   1844	struct ceph_msg_data *data;
   1845
   1846	BUG_ON(!pages);
   1847	BUG_ON(!length);
   1848
   1849	data = ceph_msg_data_add(msg);
   1850	data->type = CEPH_MSG_DATA_PAGES;
   1851	data->pages = pages;
   1852	data->length = length;
   1853	data->alignment = alignment & ~PAGE_MASK;
   1854	data->own_pages = own_pages;
   1855
   1856	msg->data_length += length;
   1857}
   1858EXPORT_SYMBOL(ceph_msg_data_add_pages);
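/*
 * Sketch (assumptions noted): attach a freshly allocated page vector
 * with own_pages = true so that ceph_msg_data_destroy() releases the
 * pages when the message is freed.  ceph_alloc_page_vector() is the
 * libceph helper; len is a caller-chosen payload size.
 *
 *	int num_pages = calc_pages_for(0, len);
 *	struct page **pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
 *
 *	if (!IS_ERR(pages))
 *		ceph_msg_data_add_pages(msg, pages, len, 0, true);
 */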
   1859
   1860void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
   1861				struct ceph_pagelist *pagelist)
   1862{
   1863	struct ceph_msg_data *data;
   1864
   1865	BUG_ON(!pagelist);
   1866	BUG_ON(!pagelist->length);
   1867
   1868	data = ceph_msg_data_add(msg);
   1869	data->type = CEPH_MSG_DATA_PAGELIST;
   1870	refcount_inc(&pagelist->refcnt);
   1871	data->pagelist = pagelist;
   1872
   1873	msg->data_length += pagelist->length;
   1874}
   1875EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
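/*
 * Sketch (illustrative): the function above takes its own reference on
 * the pagelist, so a caller that built one just to attach it can drop
 * its reference right away.  buf/buf_len are assumed caller data.
 *
 *	struct ceph_pagelist *pl = ceph_pagelist_alloc(GFP_NOFS);
 *
 *	if (pl) {
 *		if (!ceph_pagelist_append(pl, buf, buf_len))
 *			ceph_msg_data_add_pagelist(msg, pl);
 *		ceph_pagelist_release(pl);	// msg keeps its own ref
 *	}
 */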
   1876
   1877#ifdef	CONFIG_BLOCK
   1878void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
   1879			   u32 length)
   1880{
   1881	struct ceph_msg_data *data;
   1882
   1883	data = ceph_msg_data_add(msg);
   1884	data->type = CEPH_MSG_DATA_BIO;
   1885	data->bio_pos = *bio_pos;
   1886	data->bio_length = length;
   1887
   1888	msg->data_length += length;
   1889}
   1890EXPORT_SYMBOL(ceph_msg_data_add_bio);
   1891#endif	/* CONFIG_BLOCK */
   1892
   1893void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
   1894			     struct ceph_bvec_iter *bvec_pos)
   1895{
   1896	struct ceph_msg_data *data;
   1897
   1898	data = ceph_msg_data_add(msg);
   1899	data->type = CEPH_MSG_DATA_BVECS;
   1900	data->bvec_pos = *bvec_pos;
   1901
   1902	msg->data_length += bvec_pos->iter.bi_size;
   1903}
   1904EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
   1905
   1906/*
    1907	 * Construct a new message with the given type and front size.
    1908	 * The new msg has a ref count of 1.
   1909 */
   1910struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
   1911			       gfp_t flags, bool can_fail)
   1912{
   1913	struct ceph_msg *m;
   1914
   1915	m = kmem_cache_zalloc(ceph_msg_cache, flags);
   1916	if (m == NULL)
   1917		goto out;
   1918
   1919	m->hdr.type = cpu_to_le16(type);
   1920	m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
   1921	m->hdr.front_len = cpu_to_le32(front_len);
   1922
   1923	INIT_LIST_HEAD(&m->list_head);
   1924	kref_init(&m->kref);
   1925
   1926	/* front */
   1927	if (front_len) {
   1928		m->front.iov_base = kvmalloc(front_len, flags);
   1929		if (m->front.iov_base == NULL) {
   1930			dout("ceph_msg_new can't allocate %d bytes\n",
   1931			     front_len);
   1932			goto out2;
   1933		}
   1934	} else {
   1935		m->front.iov_base = NULL;
   1936	}
   1937	m->front_alloc_len = m->front.iov_len = front_len;
   1938
   1939	if (max_data_items) {
   1940		m->data = kmalloc_array(max_data_items, sizeof(*m->data),
   1941					flags);
   1942		if (!m->data)
   1943			goto out2;
   1944
   1945		m->max_data_items = max_data_items;
   1946	}
   1947
   1948	dout("ceph_msg_new %p front %d\n", m, front_len);
   1949	return m;
   1950
   1951out2:
   1952	ceph_msg_put(m);
   1953out:
   1954	if (!can_fail) {
   1955		pr_err("msg_new can't create type %d front %d\n", type,
   1956		       front_len);
   1957		WARN_ON(1);
   1958	} else {
   1959		dout("msg_new can't create type %d front %d\n", type,
   1960		     front_len);
   1961	}
   1962	return NULL;
   1963}
   1964EXPORT_SYMBOL(ceph_msg_new2);
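/*
 * Usage sketch (type, front_len, pages and len are caller-supplied
 * assumptions): reserving a data item up front is what keeps a later
 * ceph_msg_data_add_*() call from hitting the BUG_ON in
 * ceph_msg_data_add().
 *
 *	struct ceph_msg *m;
 *
 *	m = ceph_msg_new2(type, front_len, 1, GFP_NOFS, false);
 *	if (!m)
 *		return -ENOMEM;	// can_fail=false already WARNed
 *	ceph_msg_data_add_pages(m, pages, len, 0, true);
 */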
   1965
   1966struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
   1967			      bool can_fail)
   1968{
   1969	return ceph_msg_new2(type, front_len, 0, flags, can_fail);
   1970}
   1971EXPORT_SYMBOL(ceph_msg_new);
   1972
   1973/*
   1974 * Allocate "middle" portion of a message, if it is needed and wasn't
   1975 * allocated by alloc_msg.  This allows us to read a small fixed-size
   1976 * per-type header in the front and then gracefully fail (i.e.,
   1977 * propagate the error to the caller based on info in the front) when
   1978 * the middle is too large.
   1979 */
   1980static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
   1981{
   1982	int type = le16_to_cpu(msg->hdr.type);
   1983	int middle_len = le32_to_cpu(msg->hdr.middle_len);
   1984
   1985	dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
   1986	     ceph_msg_type_name(type), middle_len);
   1987	BUG_ON(!middle_len);
   1988	BUG_ON(msg->middle);
   1989
   1990	msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
   1991	if (!msg->middle)
   1992		return -ENOMEM;
   1993	return 0;
   1994}
   1995
   1996/*
   1997 * Allocate a message for receiving an incoming message on a
   1998 * connection, and save the result in con->in_msg.  Uses the
   1999 * connection's private alloc_msg op if available.
   2000 *
   2001 * Returns 0 on success, or a negative error code.
   2002 *
   2003 * On success, if we set *skip = 1:
   2004 *  - the next message should be skipped and ignored.
   2005 *  - con->in_msg == NULL
   2006 * or if we set *skip = 0:
   2007 *  - con->in_msg is non-null.
   2008 * On error (ENOMEM, EAGAIN, ...),
   2009 *  - con->in_msg == NULL
   2010 */
   2011int ceph_con_in_msg_alloc(struct ceph_connection *con,
   2012			  struct ceph_msg_header *hdr, int *skip)
   2013{
   2014	int middle_len = le32_to_cpu(hdr->middle_len);
   2015	struct ceph_msg *msg;
   2016	int ret = 0;
   2017
   2018	BUG_ON(con->in_msg != NULL);
   2019	BUG_ON(!con->ops->alloc_msg);
   2020
   2021	mutex_unlock(&con->mutex);
   2022	msg = con->ops->alloc_msg(con, hdr, skip);
   2023	mutex_lock(&con->mutex);
   2024	if (con->state != CEPH_CON_S_OPEN) {
   2025		if (msg)
   2026			ceph_msg_put(msg);
   2027		return -EAGAIN;
   2028	}
   2029	if (msg) {
   2030		BUG_ON(*skip);
   2031		msg_con_set(msg, con);
   2032		con->in_msg = msg;
   2033	} else {
   2034		/*
   2035		 * Null message pointer means either we should skip
   2036		 * this message or we couldn't allocate memory.  The
   2037		 * former is not an error.
   2038		 */
   2039		if (*skip)
   2040			return 0;
   2041
   2042		con->error_msg = "error allocating memory for incoming message";
   2043		return -ENOMEM;
   2044	}
   2045	memcpy(&con->in_msg->hdr, hdr, sizeof(*hdr));
   2046
   2047	if (middle_len && !con->in_msg->middle) {
   2048		ret = ceph_alloc_middle(con, con->in_msg);
   2049		if (ret < 0) {
   2050			ceph_msg_put(con->in_msg);
   2051			con->in_msg = NULL;
   2052		}
   2053	}
   2054
   2055	return ret;
   2056}
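/*
 * Sketch of the alloc_msg contract this function relies on (the handler
 * below is hypothetical): return a message to receive into, or NULL
 * with *skip = 1 to have the incoming message read and discarded.
 * Returning NULL without setting *skip is treated as ENOMEM.
 *
 *	static struct ceph_msg *my_alloc_msg(struct ceph_connection *con,
 *					     struct ceph_msg_header *hdr,
 *					     int *skip)
 *	{
 *		int front_len = le32_to_cpu(hdr->front_len);
 *
 *		if (le16_to_cpu(hdr->type) != CEPH_MSG_PING) {
 *			*skip = 1;		// not an error
 *			return NULL;
 *		}
 *		return ceph_msg_new(CEPH_MSG_PING, front_len, GFP_NOFS,
 *				    false);
 *	}
 */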
   2057
   2058void ceph_con_get_out_msg(struct ceph_connection *con)
   2059{
   2060	struct ceph_msg *msg;
   2061
   2062	BUG_ON(list_empty(&con->out_queue));
   2063	msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
   2064	WARN_ON(msg->con != con);
   2065
   2066	/*
    2067	 * Put the message on the "sent" list using a ref from ceph_con_send().
   2068	 * It is put when the message is acked or revoked.
   2069	 */
   2070	list_move_tail(&msg->list_head, &con->out_sent);
   2071
   2072	/*
   2073	 * Only assign outgoing seq # if we haven't sent this message
    2074	 * yet.  If it is requeued, resend with its original seq.
   2075	 */
   2076	if (msg->needs_out_seq) {
   2077		msg->hdr.seq = cpu_to_le64(++con->out_seq);
   2078		msg->needs_out_seq = false;
   2079
   2080		if (con->ops->reencode_message)
   2081			con->ops->reencode_message(msg);
   2082	}
   2083
   2084	/*
   2085	 * Get a ref for out_msg.  It is put when we are done sending the
   2086	 * message or in case of a fault.
   2087	 */
   2088	WARN_ON(con->out_msg);
   2089	con->out_msg = ceph_msg_get(msg);
   2090}
   2091
   2092/*
   2093 * Free a generically kmalloc'd message.
   2094 */
   2095static void ceph_msg_free(struct ceph_msg *m)
   2096{
   2097	dout("%s %p\n", __func__, m);
   2098	kvfree(m->front.iov_base);
   2099	kfree(m->data);
   2100	kmem_cache_free(ceph_msg_cache, m);
   2101}
   2102
   2103static void ceph_msg_release(struct kref *kref)
   2104{
   2105	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
   2106	int i;
   2107
   2108	dout("%s %p\n", __func__, m);
   2109	WARN_ON(!list_empty(&m->list_head));
   2110
   2111	msg_con_set(m, NULL);
   2112
   2113	/* drop middle, data, if any */
   2114	if (m->middle) {
   2115		ceph_buffer_put(m->middle);
   2116		m->middle = NULL;
   2117	}
   2118
   2119	for (i = 0; i < m->num_data_items; i++)
   2120		ceph_msg_data_destroy(&m->data[i]);
   2121
   2122	if (m->pool)
   2123		ceph_msgpool_put(m->pool, m);
   2124	else
   2125		ceph_msg_free(m);
   2126}
   2127
   2128struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
   2129{
   2130	dout("%s %p (was %d)\n", __func__, msg,
   2131	     kref_read(&msg->kref));
   2132	kref_get(&msg->kref);
   2133	return msg;
   2134}
   2135EXPORT_SYMBOL(ceph_msg_get);
   2136
   2137void ceph_msg_put(struct ceph_msg *msg)
   2138{
   2139	dout("%s %p (was %d)\n", __func__, msg,
   2140	     kref_read(&msg->kref));
   2141	kref_put(&msg->kref, ceph_msg_release);
   2142}
   2143EXPORT_SYMBOL(ceph_msg_put);
   2144
   2145void ceph_msg_dump(struct ceph_msg *msg)
   2146{
   2147	pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
   2148		 msg->front_alloc_len, msg->data_length);
   2149	print_hex_dump(KERN_DEBUG, "header: ",
   2150		       DUMP_PREFIX_OFFSET, 16, 1,
   2151		       &msg->hdr, sizeof(msg->hdr), true);
   2152	print_hex_dump(KERN_DEBUG, " front: ",
   2153		       DUMP_PREFIX_OFFSET, 16, 1,
   2154		       msg->front.iov_base, msg->front.iov_len, true);
   2155	if (msg->middle)
   2156		print_hex_dump(KERN_DEBUG, "middle: ",
   2157			       DUMP_PREFIX_OFFSET, 16, 1,
   2158			       msg->middle->vec.iov_base,
   2159			       msg->middle->vec.iov_len, true);
   2160	print_hex_dump(KERN_DEBUG, "footer: ",
   2161		       DUMP_PREFIX_OFFSET, 16, 1,
   2162		       &msg->footer, sizeof(msg->footer), true);
   2163}
   2164EXPORT_SYMBOL(ceph_msg_dump);