cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

messenger_v1.c (39853B)


      1// SPDX-License-Identifier: GPL-2.0
      2#include <linux/ceph/ceph_debug.h>
      3
      4#include <linux/bvec.h>
      5#include <linux/crc32c.h>
      6#include <linux/net.h>
      7#include <linux/socket.h>
      8#include <net/sock.h>
      9
     10#include <linux/ceph/ceph_features.h>
     11#include <linux/ceph/decode.h>
     12#include <linux/ceph/libceph.h>
     13#include <linux/ceph/messenger.h>
     14
     15/* static tag bytes (protocol control messages) */
     16static char tag_msg = CEPH_MSGR_TAG_MSG;
     17static char tag_ack = CEPH_MSGR_TAG_ACK;
     18static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
     19static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
     20
     21/*
     22 * If @buf is NULL, discard up to @len bytes.
     23 */
     24static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
     25{
     26	struct kvec iov = {buf, len};
     27	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
     28	int r;
     29
     30	if (!buf)
     31		msg.msg_flags |= MSG_TRUNC;
     32
     33	iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len);
     34	r = sock_recvmsg(sock, &msg, msg.msg_flags);
     35	if (r == -EAGAIN)
     36		r = 0;
     37	return r;
     38}
     39
     40static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
     41		     int page_offset, size_t length)
     42{
     43	struct bio_vec bvec = {
     44		.bv_page = page,
     45		.bv_offset = page_offset,
     46		.bv_len = length
     47	};
     48	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
     49	int r;
     50
     51	BUG_ON(page_offset + length > PAGE_SIZE);
     52	iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length);
     53	r = sock_recvmsg(sock, &msg, msg.msg_flags);
     54	if (r == -EAGAIN)
     55		r = 0;
     56	return r;
     57}
     58
     59/*
     60 * write something.  @more is true if caller will be sending more data
     61 * shortly.
     62 */
     63static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
     64			    size_t kvlen, size_t len, bool more)
     65{
     66	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
     67	int r;
     68
     69	if (more)
     70		msg.msg_flags |= MSG_MORE;
     71	else
     72		msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
     73
     74	r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
     75	if (r == -EAGAIN)
     76		r = 0;
     77	return r;
     78}
     79
     80/*
     81 * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
     82 */
     83static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
     84			     int offset, size_t size, int more)
     85{
     86	ssize_t (*sendpage)(struct socket *sock, struct page *page,
     87			    int offset, size_t size, int flags);
     88	int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
     89	int ret;
     90
     91	/*
     92	 * sendpage cannot properly handle pages with page_count == 0,
     93	 * we need to fall back to sendmsg if that's the case.
     94	 *
     95	 * Same goes for slab pages: skb_can_coalesce() allows
     96	 * coalescing neighboring slab objects into a single frag which
     97	 * triggers one of hardened usercopy checks.
     98	 */
     99	if (sendpage_ok(page))
    100		sendpage = sock->ops->sendpage;
    101	else
    102		sendpage = sock_no_sendpage;
    103
    104	ret = sendpage(sock, page, offset, size, flags);
    105	if (ret == -EAGAIN)
    106		ret = 0;
    107
    108	return ret;
    109}
    110
    111static void con_out_kvec_reset(struct ceph_connection *con)
    112{
    113	BUG_ON(con->v1.out_skip);
    114
    115	con->v1.out_kvec_left = 0;
    116	con->v1.out_kvec_bytes = 0;
    117	con->v1.out_kvec_cur = &con->v1.out_kvec[0];
    118}
    119
    120static void con_out_kvec_add(struct ceph_connection *con,
    121				size_t size, void *data)
    122{
    123	int index = con->v1.out_kvec_left;
    124
    125	BUG_ON(con->v1.out_skip);
    126	BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec));
    127
    128	con->v1.out_kvec[index].iov_len = size;
    129	con->v1.out_kvec[index].iov_base = data;
    130	con->v1.out_kvec_left++;
    131	con->v1.out_kvec_bytes += size;
    132}
    133
    134/*
    135 * Chop off a kvec from the end.  Return residual number of bytes for
    136 * that kvec, i.e. how many bytes would have been written if the kvec
    137 * hadn't been nuked.
    138 */
    139static int con_out_kvec_skip(struct ceph_connection *con)
    140{
    141	int skip = 0;
    142
    143	if (con->v1.out_kvec_bytes > 0) {
    144		skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len;
    145		BUG_ON(con->v1.out_kvec_bytes < skip);
    146		BUG_ON(!con->v1.out_kvec_left);
    147		con->v1.out_kvec_bytes -= skip;
    148		con->v1.out_kvec_left--;
    149	}
    150
    151	return skip;
    152}
    153
    154static size_t sizeof_footer(struct ceph_connection *con)
    155{
    156	return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
    157	    sizeof(struct ceph_msg_footer) :
    158	    sizeof(struct ceph_msg_footer_old);
    159}
    160
    161static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
    162{
    163	/* Initialize data cursor */
    164
    165	ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
    166}
    167
    168/*
    169 * Prepare footer for currently outgoing message, and finish things
    170 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
    171 */
    172static void prepare_write_message_footer(struct ceph_connection *con)
    173{
    174	struct ceph_msg *m = con->out_msg;
    175
    176	m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
    177
    178	dout("prepare_write_message_footer %p\n", con);
    179	con_out_kvec_add(con, sizeof_footer(con), &m->footer);
    180	if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
    181		if (con->ops->sign_message)
    182			con->ops->sign_message(m);
    183		else
    184			m->footer.sig = 0;
    185	} else {
    186		m->old_footer.flags = m->footer.flags;
    187	}
    188	con->v1.out_more = m->more_to_follow;
    189	con->v1.out_msg_done = true;
    190}
    191
    192/*
    193 * Prepare headers for the next outgoing message.
    194 */
    195static void prepare_write_message(struct ceph_connection *con)
    196{
    197	struct ceph_msg *m;
    198	u32 crc;
    199
    200	con_out_kvec_reset(con);
    201	con->v1.out_msg_done = false;
    202
    203	/* Sneak an ack in there first?  If we can get it into the same
    204	 * TCP packet that's a good thing. */
    205	if (con->in_seq > con->in_seq_acked) {
    206		con->in_seq_acked = con->in_seq;
    207		con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
    208		con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
    209		con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
    210			&con->v1.out_temp_ack);
    211	}
    212
    213	ceph_con_get_out_msg(con);
    214	m = con->out_msg;
    215
    216	dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
    217	     m, con->out_seq, le16_to_cpu(m->hdr.type),
    218	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
    219	     m->data_length);
    220	WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
    221	WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
    222
    223	/* tag + hdr + front + middle */
    224	con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
    225	con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr);
    226	con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
    227
    228	if (m->middle)
    229		con_out_kvec_add(con, m->middle->vec.iov_len,
    230			m->middle->vec.iov_base);
    231
    232	/* fill in hdr crc and finalize hdr */
    233	crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
    234	con->out_msg->hdr.crc = cpu_to_le32(crc);
    235	memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr));
    236
    237	/* fill in front and middle crc, footer */
    238	crc = crc32c(0, m->front.iov_base, m->front.iov_len);
    239	con->out_msg->footer.front_crc = cpu_to_le32(crc);
    240	if (m->middle) {
    241		crc = crc32c(0, m->middle->vec.iov_base,
    242				m->middle->vec.iov_len);
    243		con->out_msg->footer.middle_crc = cpu_to_le32(crc);
    244	} else
    245		con->out_msg->footer.middle_crc = 0;
    246	dout("%s front_crc %u middle_crc %u\n", __func__,
    247	     le32_to_cpu(con->out_msg->footer.front_crc),
    248	     le32_to_cpu(con->out_msg->footer.middle_crc));
    249	con->out_msg->footer.flags = 0;
    250
    251	/* is there a data payload? */
    252	con->out_msg->footer.data_crc = 0;
    253	if (m->data_length) {
    254		prepare_message_data(con->out_msg, m->data_length);
    255		con->v1.out_more = 1;  /* data + footer will follow */
    256	} else {
    257		/* no, queue up footer too and be done */
    258		prepare_write_message_footer(con);
    259	}
    260
    261	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
    262}
    263
    264/*
    265 * Prepare an ack.
    266 */
    267static void prepare_write_ack(struct ceph_connection *con)
    268{
    269	dout("prepare_write_ack %p %llu -> %llu\n", con,
    270	     con->in_seq_acked, con->in_seq);
    271	con->in_seq_acked = con->in_seq;
    272
    273	con_out_kvec_reset(con);
    274
    275	con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
    276
    277	con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
    278	con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
    279			 &con->v1.out_temp_ack);
    280
    281	con->v1.out_more = 1;  /* more will follow.. eventually.. */
    282	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
    283}
    284
    285/*
    286 * Prepare to share the seq during handshake
    287 */
    288static void prepare_write_seq(struct ceph_connection *con)
    289{
    290	dout("prepare_write_seq %p %llu -> %llu\n", con,
    291	     con->in_seq_acked, con->in_seq);
    292	con->in_seq_acked = con->in_seq;
    293
    294	con_out_kvec_reset(con);
    295
    296	con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
    297	con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
    298			 &con->v1.out_temp_ack);
    299
    300	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
    301}
    302
    303/*
    304 * Prepare to write keepalive byte.
    305 */
    306static void prepare_write_keepalive(struct ceph_connection *con)
    307{
    308	dout("prepare_write_keepalive %p\n", con);
    309	con_out_kvec_reset(con);
    310	if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
    311		struct timespec64 now;
    312
    313		ktime_get_real_ts64(&now);
    314		con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
    315		ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now);
    316		con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2),
    317				 &con->v1.out_temp_keepalive2);
    318	} else {
    319		con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
    320	}
    321	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
    322}
    323
    324/*
    325 * Connection negotiation.
    326 */
    327
    328static int get_connect_authorizer(struct ceph_connection *con)
    329{
    330	struct ceph_auth_handshake *auth;
    331	int auth_proto;
    332
    333	if (!con->ops->get_authorizer) {
    334		con->v1.auth = NULL;
    335		con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
    336		con->v1.out_connect.authorizer_len = 0;
    337		return 0;
    338	}
    339
    340	auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry);
    341	if (IS_ERR(auth))
    342		return PTR_ERR(auth);
    343
    344	con->v1.auth = auth;
    345	con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
    346	con->v1.out_connect.authorizer_len =
    347		cpu_to_le32(auth->authorizer_buf_len);
    348	return 0;
    349}
    350
    351/*
    352 * We connected to a peer and are saying hello.
    353 */
    354static void prepare_write_banner(struct ceph_connection *con)
    355{
    356	con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
    357	con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
    358					&con->msgr->my_enc_addr);
    359
    360	con->v1.out_more = 0;
    361	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
    362}
    363
    364static void __prepare_write_connect(struct ceph_connection *con)
    365{
    366	con_out_kvec_add(con, sizeof(con->v1.out_connect),
    367			 &con->v1.out_connect);
    368	if (con->v1.auth)
    369		con_out_kvec_add(con, con->v1.auth->authorizer_buf_len,
    370				 con->v1.auth->authorizer_buf);
    371
    372	con->v1.out_more = 0;
    373	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
    374}
    375
    376static int prepare_write_connect(struct ceph_connection *con)
    377{
    378	unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
    379	int proto;
    380	int ret;
    381
    382	switch (con->peer_name.type) {
    383	case CEPH_ENTITY_TYPE_MON:
    384		proto = CEPH_MONC_PROTOCOL;
    385		break;
    386	case CEPH_ENTITY_TYPE_OSD:
    387		proto = CEPH_OSDC_PROTOCOL;
    388		break;
    389	case CEPH_ENTITY_TYPE_MDS:
    390		proto = CEPH_MDSC_PROTOCOL;
    391		break;
    392	default:
    393		BUG();
    394	}
    395
    396	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
    397	     con->v1.connect_seq, global_seq, proto);
    398
    399	con->v1.out_connect.features =
    400		cpu_to_le64(from_msgr(con->msgr)->supported_features);
    401	con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
    402	con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq);
    403	con->v1.out_connect.global_seq = cpu_to_le32(global_seq);
    404	con->v1.out_connect.protocol_version = cpu_to_le32(proto);
    405	con->v1.out_connect.flags = 0;
    406
    407	ret = get_connect_authorizer(con);
    408	if (ret)
    409		return ret;
    410
    411	__prepare_write_connect(con);
    412	return 0;
    413}
    414
    415/*
    416 * write as much of pending kvecs to the socket as we can.
    417 *  1 -> done
    418 *  0 -> socket full, but more to do
    419 * <0 -> error
    420 */
    421static int write_partial_kvec(struct ceph_connection *con)
    422{
    423	int ret;
    424
    425	dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes);
    426	while (con->v1.out_kvec_bytes > 0) {
    427		ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur,
    428				       con->v1.out_kvec_left,
    429				       con->v1.out_kvec_bytes,
    430				       con->v1.out_more);
    431		if (ret <= 0)
    432			goto out;
    433		con->v1.out_kvec_bytes -= ret;
    434		if (!con->v1.out_kvec_bytes)
    435			break;            /* done */
    436
    437		/* account for full iov entries consumed */
    438		while (ret >= con->v1.out_kvec_cur->iov_len) {
    439			BUG_ON(!con->v1.out_kvec_left);
    440			ret -= con->v1.out_kvec_cur->iov_len;
    441			con->v1.out_kvec_cur++;
    442			con->v1.out_kvec_left--;
    443		}
    444		/* and for a partially-consumed entry */
    445		if (ret) {
    446			con->v1.out_kvec_cur->iov_len -= ret;
    447			con->v1.out_kvec_cur->iov_base += ret;
    448		}
    449	}
    450	con->v1.out_kvec_left = 0;
    451	ret = 1;
    452out:
    453	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
    454	     con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret);
    455	return ret;  /* done! */
    456}
    457
    458/*
    459 * Write as much message data payload as we can.  If we finish, queue
    460 * up the footer.
    461 *  1 -> done, footer is now queued in out_kvec[].
    462 *  0 -> socket full, but more to do
    463 * <0 -> error
    464 */
    465static int write_partial_message_data(struct ceph_connection *con)
    466{
    467	struct ceph_msg *msg = con->out_msg;
    468	struct ceph_msg_data_cursor *cursor = &msg->cursor;
    469	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
    470	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
    471	u32 crc;
    472
    473	dout("%s %p msg %p\n", __func__, con, msg);
    474
    475	if (!msg->num_data_items)
    476		return -EINVAL;
    477
    478	/*
    479	 * Iterate through each page that contains data to be
    480	 * written, and send as much as possible for each.
    481	 *
    482	 * If we are calculating the data crc (the default), we will
    483	 * need to map the page.  If we have no pages, they have
    484	 * been revoked, so use the zero page.
    485	 */
    486	crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
    487	while (cursor->total_resid) {
    488		struct page *page;
    489		size_t page_offset;
    490		size_t length;
    491		int ret;
    492
    493		if (!cursor->resid) {
    494			ceph_msg_data_advance(cursor, 0);
    495			continue;
    496		}
    497
    498		page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
    499		if (length == cursor->total_resid)
    500			more = MSG_MORE;
    501		ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
    502					more);
    503		if (ret <= 0) {
    504			if (do_datacrc)
    505				msg->footer.data_crc = cpu_to_le32(crc);
    506
    507			return ret;
    508		}
    509		if (do_datacrc && cursor->need_crc)
    510			crc = ceph_crc32c_page(crc, page, page_offset, length);
    511		ceph_msg_data_advance(cursor, (size_t)ret);
    512	}
    513
    514	dout("%s %p msg %p done\n", __func__, con, msg);
    515
    516	/* prepare and queue up footer, too */
    517	if (do_datacrc)
    518		msg->footer.data_crc = cpu_to_le32(crc);
    519	else
    520		msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
    521	con_out_kvec_reset(con);
    522	prepare_write_message_footer(con);
    523
    524	return 1;	/* must return > 0 to indicate success */
    525}
    526
    527/*
    528 * write some zeros
    529 */
    530static int write_partial_skip(struct ceph_connection *con)
    531{
    532	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
    533	int ret;
    534
    535	dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
    536	while (con->v1.out_skip > 0) {
    537		size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
    538
    539		if (size == con->v1.out_skip)
    540			more = MSG_MORE;
    541		ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
    542					more);
    543		if (ret <= 0)
    544			goto out;
    545		con->v1.out_skip -= ret;
    546	}
    547	ret = 1;
    548out:
    549	return ret;
    550}
    551
    552/*
    553 * Prepare to read connection handshake, or an ack.
    554 */
    555static void prepare_read_banner(struct ceph_connection *con)
    556{
    557	dout("prepare_read_banner %p\n", con);
    558	con->v1.in_base_pos = 0;
    559}
    560
    561static void prepare_read_connect(struct ceph_connection *con)
    562{
    563	dout("prepare_read_connect %p\n", con);
    564	con->v1.in_base_pos = 0;
    565}
    566
    567static void prepare_read_ack(struct ceph_connection *con)
    568{
    569	dout("prepare_read_ack %p\n", con);
    570	con->v1.in_base_pos = 0;
    571}
    572
    573static void prepare_read_seq(struct ceph_connection *con)
    574{
    575	dout("prepare_read_seq %p\n", con);
    576	con->v1.in_base_pos = 0;
    577	con->v1.in_tag = CEPH_MSGR_TAG_SEQ;
    578}
    579
    580static void prepare_read_tag(struct ceph_connection *con)
    581{
    582	dout("prepare_read_tag %p\n", con);
    583	con->v1.in_base_pos = 0;
    584	con->v1.in_tag = CEPH_MSGR_TAG_READY;
    585}
    586
    587static void prepare_read_keepalive_ack(struct ceph_connection *con)
    588{
    589	dout("prepare_read_keepalive_ack %p\n", con);
    590	con->v1.in_base_pos = 0;
    591}
    592
    593/*
    594 * Prepare to read a message.
    595 */
    596static int prepare_read_message(struct ceph_connection *con)
    597{
    598	dout("prepare_read_message %p\n", con);
    599	BUG_ON(con->in_msg != NULL);
    600	con->v1.in_base_pos = 0;
    601	con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
    602	return 0;
    603}
    604
    605static int read_partial(struct ceph_connection *con,
    606			int end, int size, void *object)
    607{
    608	while (con->v1.in_base_pos < end) {
    609		int left = end - con->v1.in_base_pos;
    610		int have = size - left;
    611		int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
    612		if (ret <= 0)
    613			return ret;
    614		con->v1.in_base_pos += ret;
    615	}
    616	return 1;
    617}
    618
    619/*
    620 * Read all or part of the connect-side handshake on a new connection
    621 */
    622static int read_partial_banner(struct ceph_connection *con)
    623{
    624	int size;
    625	int end;
    626	int ret;
    627
    628	dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos);
    629
    630	/* peer's banner */
    631	size = strlen(CEPH_BANNER);
    632	end = size;
    633	ret = read_partial(con, end, size, con->v1.in_banner);
    634	if (ret <= 0)
    635		goto out;
    636
    637	size = sizeof(con->v1.actual_peer_addr);
    638	end += size;
    639	ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
    640	if (ret <= 0)
    641		goto out;
    642	ceph_decode_banner_addr(&con->v1.actual_peer_addr);
    643
    644	size = sizeof(con->v1.peer_addr_for_me);
    645	end += size;
    646	ret = read_partial(con, end, size, &con->v1.peer_addr_for_me);
    647	if (ret <= 0)
    648		goto out;
    649	ceph_decode_banner_addr(&con->v1.peer_addr_for_me);
    650
    651out:
    652	return ret;
    653}
    654
    655static int read_partial_connect(struct ceph_connection *con)
    656{
    657	int size;
    658	int end;
    659	int ret;
    660
    661	dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos);
    662
    663	size = sizeof(con->v1.in_reply);
    664	end = size;
    665	ret = read_partial(con, end, size, &con->v1.in_reply);
    666	if (ret <= 0)
    667		goto out;
    668
    669	if (con->v1.auth) {
    670		size = le32_to_cpu(con->v1.in_reply.authorizer_len);
    671		if (size > con->v1.auth->authorizer_reply_buf_len) {
    672			pr_err("authorizer reply too big: %d > %zu\n", size,
    673			       con->v1.auth->authorizer_reply_buf_len);
    674			ret = -EINVAL;
    675			goto out;
    676		}
    677
    678		end += size;
    679		ret = read_partial(con, end, size,
    680				   con->v1.auth->authorizer_reply_buf);
    681		if (ret <= 0)
    682			goto out;
    683	}
    684
    685	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
    686	     con, con->v1.in_reply.tag,
    687	     le32_to_cpu(con->v1.in_reply.connect_seq),
    688	     le32_to_cpu(con->v1.in_reply.global_seq));
    689out:
    690	return ret;
    691}
    692
    693/*
    694 * Verify the hello banner looks okay.
    695 */
    696static int verify_hello(struct ceph_connection *con)
    697{
    698	if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
    699		pr_err("connect to %s got bad banner\n",
    700		       ceph_pr_addr(&con->peer_addr));
    701		con->error_msg = "protocol error, bad banner";
    702		return -1;
    703	}
    704	return 0;
    705}
    706
    707static int process_banner(struct ceph_connection *con)
    708{
    709	struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;
    710
    711	dout("process_banner on %p\n", con);
    712
    713	if (verify_hello(con) < 0)
    714		return -1;
    715
    716	/*
    717	 * Make sure the other end is who we wanted.  note that the other
    718	 * end may not yet know their ip address, so if it's 0.0.0.0, give
    719	 * them the benefit of the doubt.
    720	 */
    721	if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr,
    722		   sizeof(con->peer_addr)) != 0 &&
    723	    !(ceph_addr_is_blank(&con->v1.actual_peer_addr) &&
    724	      con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) {
    725		pr_warn("wrong peer, want %s/%u, got %s/%u\n",
    726			ceph_pr_addr(&con->peer_addr),
    727			le32_to_cpu(con->peer_addr.nonce),
    728			ceph_pr_addr(&con->v1.actual_peer_addr),
    729			le32_to_cpu(con->v1.actual_peer_addr.nonce));
    730		con->error_msg = "wrong peer at address";
    731		return -1;
    732	}
    733
    734	/*
    735	 * did we learn our address?
    736	 */
    737	if (ceph_addr_is_blank(my_addr)) {
    738		memcpy(&my_addr->in_addr,
    739		       &con->v1.peer_addr_for_me.in_addr,
    740		       sizeof(con->v1.peer_addr_for_me.in_addr));
    741		ceph_addr_set_port(my_addr, 0);
    742		ceph_encode_my_addr(con->msgr);
    743		dout("process_banner learned my addr is %s\n",
    744		     ceph_pr_addr(my_addr));
    745	}
    746
    747	return 0;
    748}
    749
    750static int process_connect(struct ceph_connection *con)
    751{
    752	u64 sup_feat = from_msgr(con->msgr)->supported_features;
    753	u64 req_feat = from_msgr(con->msgr)->required_features;
    754	u64 server_feat = le64_to_cpu(con->v1.in_reply.features);
    755	int ret;
    756
    757	dout("process_connect on %p tag %d\n", con, con->v1.in_tag);
    758
    759	if (con->v1.auth) {
    760		int len = le32_to_cpu(con->v1.in_reply.authorizer_len);
    761
    762		/*
    763		 * Any connection that defines ->get_authorizer()
    764		 * should also define ->add_authorizer_challenge() and
    765		 * ->verify_authorizer_reply().
    766		 *
    767		 * See get_connect_authorizer().
    768		 */
    769		if (con->v1.in_reply.tag ==
    770				CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
    771			ret = con->ops->add_authorizer_challenge(
    772				con, con->v1.auth->authorizer_reply_buf, len);
    773			if (ret < 0)
    774				return ret;
    775
    776			con_out_kvec_reset(con);
    777			__prepare_write_connect(con);
    778			prepare_read_connect(con);
    779			return 0;
    780		}
    781
    782		if (len) {
    783			ret = con->ops->verify_authorizer_reply(con);
    784			if (ret < 0) {
    785				con->error_msg = "bad authorize reply";
    786				return ret;
    787			}
    788		}
    789	}
    790
    791	switch (con->v1.in_reply.tag) {
    792	case CEPH_MSGR_TAG_FEATURES:
    793		pr_err("%s%lld %s feature set mismatch,"
    794		       " my %llx < server's %llx, missing %llx\n",
    795		       ENTITY_NAME(con->peer_name),
    796		       ceph_pr_addr(&con->peer_addr),
    797		       sup_feat, server_feat, server_feat & ~sup_feat);
    798		con->error_msg = "missing required protocol features";
    799		return -1;
    800
    801	case CEPH_MSGR_TAG_BADPROTOVER:
    802		pr_err("%s%lld %s protocol version mismatch,"
    803		       " my %d != server's %d\n",
    804		       ENTITY_NAME(con->peer_name),
    805		       ceph_pr_addr(&con->peer_addr),
    806		       le32_to_cpu(con->v1.out_connect.protocol_version),
    807		       le32_to_cpu(con->v1.in_reply.protocol_version));
    808		con->error_msg = "protocol version mismatch";
    809		return -1;
    810
    811	case CEPH_MSGR_TAG_BADAUTHORIZER:
    812		con->v1.auth_retry++;
    813		dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
    814		     con->v1.auth_retry);
    815		if (con->v1.auth_retry == 2) {
    816			con->error_msg = "connect authorization failure";
    817			return -1;
    818		}
    819		con_out_kvec_reset(con);
    820		ret = prepare_write_connect(con);
    821		if (ret < 0)
    822			return ret;
    823		prepare_read_connect(con);
    824		break;
    825
    826	case CEPH_MSGR_TAG_RESETSESSION:
    827		/*
    828		 * If we connected with a large connect_seq but the peer
    829		 * has no record of a session with us (no connection, or
    830		 * connect_seq == 0), they will send RESETSESION to indicate
    831		 * that they must have reset their session, and may have
    832		 * dropped messages.
    833		 */
    834		dout("process_connect got RESET peer seq %u\n",
    835		     le32_to_cpu(con->v1.in_reply.connect_seq));
    836		pr_info("%s%lld %s session reset\n",
    837			ENTITY_NAME(con->peer_name),
    838			ceph_pr_addr(&con->peer_addr));
    839		ceph_con_reset_session(con);
    840		con_out_kvec_reset(con);
    841		ret = prepare_write_connect(con);
    842		if (ret < 0)
    843			return ret;
    844		prepare_read_connect(con);
    845
    846		/* Tell ceph about it. */
    847		mutex_unlock(&con->mutex);
    848		if (con->ops->peer_reset)
    849			con->ops->peer_reset(con);
    850		mutex_lock(&con->mutex);
    851		if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
    852			return -EAGAIN;
    853		break;
    854
    855	case CEPH_MSGR_TAG_RETRY_SESSION:
    856		/*
    857		 * If we sent a smaller connect_seq than the peer has, try
    858		 * again with a larger value.
    859		 */
    860		dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
    861		     le32_to_cpu(con->v1.out_connect.connect_seq),
    862		     le32_to_cpu(con->v1.in_reply.connect_seq));
    863		con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq);
    864		con_out_kvec_reset(con);
    865		ret = prepare_write_connect(con);
    866		if (ret < 0)
    867			return ret;
    868		prepare_read_connect(con);
    869		break;
    870
    871	case CEPH_MSGR_TAG_RETRY_GLOBAL:
    872		/*
    873		 * If we sent a smaller global_seq than the peer has, try
    874		 * again with a larger value.
    875		 */
    876		dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
    877		     con->v1.peer_global_seq,
    878		     le32_to_cpu(con->v1.in_reply.global_seq));
    879		ceph_get_global_seq(con->msgr,
    880				    le32_to_cpu(con->v1.in_reply.global_seq));
    881		con_out_kvec_reset(con);
    882		ret = prepare_write_connect(con);
    883		if (ret < 0)
    884			return ret;
    885		prepare_read_connect(con);
    886		break;
    887
    888	case CEPH_MSGR_TAG_SEQ:
    889	case CEPH_MSGR_TAG_READY:
    890		if (req_feat & ~server_feat) {
    891			pr_err("%s%lld %s protocol feature mismatch,"
    892			       " my required %llx > server's %llx, need %llx\n",
    893			       ENTITY_NAME(con->peer_name),
    894			       ceph_pr_addr(&con->peer_addr),
    895			       req_feat, server_feat, req_feat & ~server_feat);
    896			con->error_msg = "missing required protocol features";
    897			return -1;
    898		}
    899
    900		WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
    901		con->state = CEPH_CON_S_OPEN;
    902		con->v1.auth_retry = 0;    /* we authenticated; clear flag */
    903		con->v1.peer_global_seq =
    904			le32_to_cpu(con->v1.in_reply.global_seq);
    905		con->v1.connect_seq++;
    906		con->peer_features = server_feat;
    907		dout("process_connect got READY gseq %d cseq %d (%d)\n",
    908		     con->v1.peer_global_seq,
    909		     le32_to_cpu(con->v1.in_reply.connect_seq),
    910		     con->v1.connect_seq);
    911		WARN_ON(con->v1.connect_seq !=
    912			le32_to_cpu(con->v1.in_reply.connect_seq));
    913
    914		if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
    915			ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
    916
    917		con->delay = 0;      /* reset backoff memory */
    918
    919		if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) {
    920			prepare_write_seq(con);
    921			prepare_read_seq(con);
    922		} else {
    923			prepare_read_tag(con);
    924		}
    925		break;
    926
    927	case CEPH_MSGR_TAG_WAIT:
    928		/*
    929		 * If there is a connection race (we are opening
    930		 * connections to each other), one of us may just have
    931		 * to WAIT.  This shouldn't happen if we are the
    932		 * client.
    933		 */
    934		con->error_msg = "protocol error, got WAIT as client";
    935		return -1;
    936
    937	default:
    938		con->error_msg = "protocol error, garbage tag during connect";
    939		return -1;
    940	}
    941	return 0;
    942}
    943
    944/*
    945 * read (part of) an ack
    946 */
    947static int read_partial_ack(struct ceph_connection *con)
    948{
    949	int size = sizeof(con->v1.in_temp_ack);
    950	int end = size;
    951
    952	return read_partial(con, end, size, &con->v1.in_temp_ack);
    953}
    954
    955/*
    956 * We can finally discard anything that's been acked.
    957 */
    958static void process_ack(struct ceph_connection *con)
    959{
    960	u64 ack = le64_to_cpu(con->v1.in_temp_ack);
    961
    962	if (con->v1.in_tag == CEPH_MSGR_TAG_ACK)
    963		ceph_con_discard_sent(con, ack);
    964	else
    965		ceph_con_discard_requeued(con, ack);
    966
    967	prepare_read_tag(con);
    968}
    969
    970static int read_partial_message_section(struct ceph_connection *con,
    971					struct kvec *section,
    972					unsigned int sec_len, u32 *crc)
    973{
    974	int ret, left;
    975
    976	BUG_ON(!section);
    977
    978	while (section->iov_len < sec_len) {
    979		BUG_ON(section->iov_base == NULL);
    980		left = sec_len - section->iov_len;
    981		ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
    982				       section->iov_len, left);
    983		if (ret <= 0)
    984			return ret;
    985		section->iov_len += ret;
    986	}
    987	if (section->iov_len == sec_len)
    988		*crc = crc32c(0, section->iov_base, section->iov_len);
    989
    990	return 1;
    991}
    992
    993static int read_partial_msg_data(struct ceph_connection *con)
    994{
    995	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
    996	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
    997	struct page *page;
    998	size_t page_offset;
    999	size_t length;
   1000	u32 crc = 0;
   1001	int ret;
   1002
   1003	if (do_datacrc)
   1004		crc = con->in_data_crc;
   1005	while (cursor->total_resid) {
   1006		if (!cursor->resid) {
   1007			ceph_msg_data_advance(cursor, 0);
   1008			continue;
   1009		}
   1010
   1011		page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
   1012		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
   1013		if (ret <= 0) {
   1014			if (do_datacrc)
   1015				con->in_data_crc = crc;
   1016
   1017			return ret;
   1018		}
   1019
   1020		if (do_datacrc)
   1021			crc = ceph_crc32c_page(crc, page, page_offset, ret);
   1022		ceph_msg_data_advance(cursor, (size_t)ret);
   1023	}
   1024	if (do_datacrc)
   1025		con->in_data_crc = crc;
   1026
   1027	return 1;	/* must return > 0 to indicate success */
   1028}
   1029
   1030static int read_partial_msg_data_bounce(struct ceph_connection *con)
   1031{
   1032	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
   1033	struct page *page;
   1034	size_t off, len;
   1035	u32 crc;
   1036	int ret;
   1037
   1038	if (unlikely(!con->bounce_page)) {
   1039		con->bounce_page = alloc_page(GFP_NOIO);
   1040		if (!con->bounce_page) {
   1041			pr_err("failed to allocate bounce page\n");
   1042			return -ENOMEM;
   1043		}
   1044	}
   1045
   1046	crc = con->in_data_crc;
   1047	while (cursor->total_resid) {
   1048		if (!cursor->resid) {
   1049			ceph_msg_data_advance(cursor, 0);
   1050			continue;
   1051		}
   1052
   1053		page = ceph_msg_data_next(cursor, &off, &len, NULL);
   1054		ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len);
   1055		if (ret <= 0) {
   1056			con->in_data_crc = crc;
   1057			return ret;
   1058		}
   1059
   1060		crc = crc32c(crc, page_address(con->bounce_page), ret);
   1061		memcpy_to_page(page, off, page_address(con->bounce_page), ret);
   1062
   1063		ceph_msg_data_advance(cursor, ret);
   1064	}
   1065	con->in_data_crc = crc;
   1066
   1067	return 1;	/* must return > 0 to indicate success */
   1068}
   1069
   1070/*
   1071 * read (part of) a message.
   1072 */
   1073static int read_partial_message(struct ceph_connection *con)
   1074{
   1075	struct ceph_msg *m = con->in_msg;
   1076	int size;
   1077	int end;
   1078	int ret;
   1079	unsigned int front_len, middle_len, data_len;
   1080	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
   1081	bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
   1082	u64 seq;
   1083	u32 crc;
   1084
   1085	dout("read_partial_message con %p msg %p\n", con, m);
   1086
   1087	/* header */
   1088	size = sizeof(con->v1.in_hdr);
   1089	end = size;
   1090	ret = read_partial(con, end, size, &con->v1.in_hdr);
   1091	if (ret <= 0)
   1092		return ret;
   1093
   1094	crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc));
   1095	if (cpu_to_le32(crc) != con->v1.in_hdr.crc) {
   1096		pr_err("read_partial_message bad hdr crc %u != expected %u\n",
   1097		       crc, con->v1.in_hdr.crc);
   1098		return -EBADMSG;
   1099	}
   1100
   1101	front_len = le32_to_cpu(con->v1.in_hdr.front_len);
   1102	if (front_len > CEPH_MSG_MAX_FRONT_LEN)
   1103		return -EIO;
   1104	middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
   1105	if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
   1106		return -EIO;
   1107	data_len = le32_to_cpu(con->v1.in_hdr.data_len);
   1108	if (data_len > CEPH_MSG_MAX_DATA_LEN)
   1109		return -EIO;
   1110
   1111	/* verify seq# */
   1112	seq = le64_to_cpu(con->v1.in_hdr.seq);
   1113	if ((s64)seq - (s64)con->in_seq < 1) {
   1114		pr_info("skipping %s%lld %s seq %lld expected %lld\n",
   1115			ENTITY_NAME(con->peer_name),
   1116			ceph_pr_addr(&con->peer_addr),
   1117			seq, con->in_seq + 1);
   1118		con->v1.in_base_pos = -front_len - middle_len - data_len -
   1119				      sizeof_footer(con);
   1120		con->v1.in_tag = CEPH_MSGR_TAG_READY;
   1121		return 1;
   1122	} else if ((s64)seq - (s64)con->in_seq > 1) {
   1123		pr_err("read_partial_message bad seq %lld expected %lld\n",
   1124		       seq, con->in_seq + 1);
   1125		con->error_msg = "bad message sequence # for incoming message";
   1126		return -EBADE;
   1127	}
   1128
   1129	/* allocate message? */
   1130	if (!con->in_msg) {
   1131		int skip = 0;
   1132
   1133		dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type,
   1134		     front_len, data_len);
   1135		ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip);
   1136		if (ret < 0)
   1137			return ret;
   1138
   1139		BUG_ON((!con->in_msg) ^ skip);
   1140		if (skip) {
   1141			/* skip this message */
   1142			dout("alloc_msg said skip message\n");
   1143			con->v1.in_base_pos = -front_len - middle_len -
   1144					      data_len - sizeof_footer(con);
   1145			con->v1.in_tag = CEPH_MSGR_TAG_READY;
   1146			con->in_seq++;
   1147			return 1;
   1148		}
   1149
   1150		BUG_ON(!con->in_msg);
   1151		BUG_ON(con->in_msg->con != con);
   1152		m = con->in_msg;
   1153		m->front.iov_len = 0;    /* haven't read it yet */
   1154		if (m->middle)
   1155			m->middle->vec.iov_len = 0;
   1156
   1157		/* prepare for data payload, if any */
   1158
   1159		if (data_len)
   1160			prepare_message_data(con->in_msg, data_len);
   1161	}
   1162
   1163	/* front */
   1164	ret = read_partial_message_section(con, &m->front, front_len,
   1165					   &con->in_front_crc);
   1166	if (ret <= 0)
   1167		return ret;
   1168
   1169	/* middle */
   1170	if (m->middle) {
   1171		ret = read_partial_message_section(con, &m->middle->vec,
   1172						   middle_len,
   1173						   &con->in_middle_crc);
   1174		if (ret <= 0)
   1175			return ret;
   1176	}
   1177
   1178	/* (page) data */
   1179	if (data_len) {
   1180		if (!m->num_data_items)
   1181			return -EIO;
   1182
   1183		if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
   1184			ret = read_partial_msg_data_bounce(con);
   1185		else
   1186			ret = read_partial_msg_data(con);
   1187		if (ret <= 0)
   1188			return ret;
   1189	}
   1190
   1191	/* footer */
   1192	size = sizeof_footer(con);
   1193	end += size;
   1194	ret = read_partial(con, end, size, &m->footer);
   1195	if (ret <= 0)
   1196		return ret;
   1197
   1198	if (!need_sign) {
   1199		m->footer.flags = m->old_footer.flags;
   1200		m->footer.sig = 0;
   1201	}
   1202
   1203	dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
   1204	     m, front_len, m->footer.front_crc, middle_len,
   1205	     m->footer.middle_crc, data_len, m->footer.data_crc);
   1206
   1207	/* crc ok? */
   1208	if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
   1209		pr_err("read_partial_message %p front crc %u != exp. %u\n",
   1210		       m, con->in_front_crc, m->footer.front_crc);
   1211		return -EBADMSG;
   1212	}
   1213	if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
   1214		pr_err("read_partial_message %p middle crc %u != exp %u\n",
   1215		       m, con->in_middle_crc, m->footer.middle_crc);
   1216		return -EBADMSG;
   1217	}
   1218	if (do_datacrc &&
   1219	    (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
   1220	    con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
   1221		pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
   1222		       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
   1223		return -EBADMSG;
   1224	}
   1225
   1226	if (need_sign && con->ops->check_message_signature &&
   1227	    con->ops->check_message_signature(m)) {
   1228		pr_err("read_partial_message %p signature check failed\n", m);
   1229		return -EBADMSG;
   1230	}
   1231
   1232	return 1; /* done! */
   1233}
   1234
   1235static int read_keepalive_ack(struct ceph_connection *con)
   1236{
   1237	struct ceph_timespec ceph_ts;
   1238	size_t size = sizeof(ceph_ts);
   1239	int ret = read_partial(con, size, size, &ceph_ts);
   1240	if (ret <= 0)
   1241		return ret;
   1242	ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
   1243	prepare_read_tag(con);
   1244	return 1;
   1245}
   1246
   1247/*
   1248 * Read what we can from the socket.
   1249 */
   1250int ceph_con_v1_try_read(struct ceph_connection *con)
   1251{
   1252	int ret = -1;
   1253
   1254more:
   1255	dout("try_read start %p state %d\n", con, con->state);
   1256	if (con->state != CEPH_CON_S_V1_BANNER &&
   1257	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
   1258	    con->state != CEPH_CON_S_OPEN)
   1259		return 0;
   1260
   1261	BUG_ON(!con->sock);
   1262
   1263	dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag,
   1264	     con->v1.in_base_pos);
   1265
   1266	if (con->state == CEPH_CON_S_V1_BANNER) {
   1267		ret = read_partial_banner(con);
   1268		if (ret <= 0)
   1269			goto out;
   1270		ret = process_banner(con);
   1271		if (ret < 0)
   1272			goto out;
   1273
   1274		con->state = CEPH_CON_S_V1_CONNECT_MSG;
   1275
   1276		/*
   1277		 * Received banner is good, exchange connection info.
   1278		 * Do not reset out_kvec, as sending our banner raced
   1279		 * with receiving peer banner after connect completed.
   1280		 */
   1281		ret = prepare_write_connect(con);
   1282		if (ret < 0)
   1283			goto out;
   1284		prepare_read_connect(con);
   1285
   1286		/* Send connection info before awaiting response */
   1287		goto out;
   1288	}
   1289
   1290	if (con->state == CEPH_CON_S_V1_CONNECT_MSG) {
   1291		ret = read_partial_connect(con);
   1292		if (ret <= 0)
   1293			goto out;
   1294		ret = process_connect(con);
   1295		if (ret < 0)
   1296			goto out;
   1297		goto more;
   1298	}
   1299
   1300	WARN_ON(con->state != CEPH_CON_S_OPEN);
   1301
   1302	if (con->v1.in_base_pos < 0) {
   1303		/*
   1304		 * skipping + discarding content.
   1305		 */
   1306		ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos);
   1307		if (ret <= 0)
   1308			goto out;
   1309		dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos);
   1310		con->v1.in_base_pos += ret;
   1311		if (con->v1.in_base_pos)
   1312			goto more;
   1313	}
   1314	if (con->v1.in_tag == CEPH_MSGR_TAG_READY) {
   1315		/*
   1316		 * what's next?
   1317		 */
   1318		ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1);
   1319		if (ret <= 0)
   1320			goto out;
   1321		dout("try_read got tag %d\n", con->v1.in_tag);
   1322		switch (con->v1.in_tag) {
   1323		case CEPH_MSGR_TAG_MSG:
   1324			prepare_read_message(con);
   1325			break;
   1326		case CEPH_MSGR_TAG_ACK:
   1327			prepare_read_ack(con);
   1328			break;
   1329		case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
   1330			prepare_read_keepalive_ack(con);
   1331			break;
   1332		case CEPH_MSGR_TAG_CLOSE:
   1333			ceph_con_close_socket(con);
   1334			con->state = CEPH_CON_S_CLOSED;
   1335			goto out;
   1336		default:
   1337			goto bad_tag;
   1338		}
   1339	}
   1340	if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) {
   1341		ret = read_partial_message(con);
   1342		if (ret <= 0) {
   1343			switch (ret) {
   1344			case -EBADMSG:
   1345				con->error_msg = "bad crc/signature";
   1346				fallthrough;
   1347			case -EBADE:
   1348				ret = -EIO;
   1349				break;
   1350			case -EIO:
   1351				con->error_msg = "io error";
   1352				break;
   1353			}
   1354			goto out;
   1355		}
   1356		if (con->v1.in_tag == CEPH_MSGR_TAG_READY)
   1357			goto more;
   1358		ceph_con_process_message(con);
   1359		if (con->state == CEPH_CON_S_OPEN)
   1360			prepare_read_tag(con);
   1361		goto more;
   1362	}
   1363	if (con->v1.in_tag == CEPH_MSGR_TAG_ACK ||
   1364	    con->v1.in_tag == CEPH_MSGR_TAG_SEQ) {
   1365		/*
   1366		 * the final handshake seq exchange is semantically
   1367		 * equivalent to an ACK
   1368		 */
   1369		ret = read_partial_ack(con);
   1370		if (ret <= 0)
   1371			goto out;
   1372		process_ack(con);
   1373		goto more;
   1374	}
   1375	if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
   1376		ret = read_keepalive_ack(con);
   1377		if (ret <= 0)
   1378			goto out;
   1379		goto more;
   1380	}
   1381
   1382out:
   1383	dout("try_read done on %p ret %d\n", con, ret);
   1384	return ret;
   1385
   1386bad_tag:
   1387	pr_err("try_read bad tag %d\n", con->v1.in_tag);
   1388	con->error_msg = "protocol error, garbage tag";
   1389	ret = -1;
   1390	goto out;
   1391}
   1392
   1393/*
   1394 * Write something to the socket.  Called in a worker thread when the
   1395 * socket appears to be writeable and we have something ready to send.
   1396 */
   1397int ceph_con_v1_try_write(struct ceph_connection *con)
   1398{
   1399	int ret = 1;
   1400
   1401	dout("try_write start %p state %d\n", con, con->state);
   1402	if (con->state != CEPH_CON_S_PREOPEN &&
   1403	    con->state != CEPH_CON_S_V1_BANNER &&
   1404	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
   1405	    con->state != CEPH_CON_S_OPEN)
   1406		return 0;
   1407
   1408	/* open the socket first? */
   1409	if (con->state == CEPH_CON_S_PREOPEN) {
   1410		BUG_ON(con->sock);
   1411		con->state = CEPH_CON_S_V1_BANNER;
   1412
   1413		con_out_kvec_reset(con);
   1414		prepare_write_banner(con);
   1415		prepare_read_banner(con);
   1416
   1417		BUG_ON(con->in_msg);
   1418		con->v1.in_tag = CEPH_MSGR_TAG_READY;
   1419		dout("try_write initiating connect on %p new state %d\n",
   1420		     con, con->state);
   1421		ret = ceph_tcp_connect(con);
   1422		if (ret < 0) {
   1423			con->error_msg = "connect error";
   1424			goto out;
   1425		}
   1426	}
   1427
   1428more:
   1429	dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes);
   1430	BUG_ON(!con->sock);
   1431
   1432	/* kvec data queued? */
   1433	if (con->v1.out_kvec_left) {
   1434		ret = write_partial_kvec(con);
   1435		if (ret <= 0)
   1436			goto out;
   1437	}
   1438	if (con->v1.out_skip) {
   1439		ret = write_partial_skip(con);
   1440		if (ret <= 0)
   1441			goto out;
   1442	}
   1443
   1444	/* msg pages? */
   1445	if (con->out_msg) {
   1446		if (con->v1.out_msg_done) {
   1447			ceph_msg_put(con->out_msg);
   1448			con->out_msg = NULL;   /* we're done with this one */
   1449			goto do_next;
   1450		}
   1451
   1452		ret = write_partial_message_data(con);
   1453		if (ret == 1)
   1454			goto more;  /* we need to send the footer, too! */
   1455		if (ret == 0)
   1456			goto out;
   1457		if (ret < 0) {
   1458			dout("try_write write_partial_message_data err %d\n",
   1459			     ret);
   1460			goto out;
   1461		}
   1462	}
   1463
   1464do_next:
   1465	if (con->state == CEPH_CON_S_OPEN) {
   1466		if (ceph_con_flag_test_and_clear(con,
   1467				CEPH_CON_F_KEEPALIVE_PENDING)) {
   1468			prepare_write_keepalive(con);
   1469			goto more;
   1470		}
   1471		/* is anything else pending? */
   1472		if (!list_empty(&con->out_queue)) {
   1473			prepare_write_message(con);
   1474			goto more;
   1475		}
   1476		if (con->in_seq > con->in_seq_acked) {
   1477			prepare_write_ack(con);
   1478			goto more;
   1479		}
   1480	}
   1481
   1482	/* Nothing to do! */
   1483	ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
   1484	dout("try_write nothing else to write.\n");
   1485	ret = 0;
   1486out:
   1487	dout("try_write done on %p ret %d\n", con, ret);
   1488	return ret;
   1489}
   1490
   1491void ceph_con_v1_revoke(struct ceph_connection *con)
   1492{
   1493	struct ceph_msg *msg = con->out_msg;
   1494
   1495	WARN_ON(con->v1.out_skip);
   1496	/* footer */
   1497	if (con->v1.out_msg_done) {
   1498		con->v1.out_skip += con_out_kvec_skip(con);
   1499	} else {
   1500		WARN_ON(!msg->data_length);
   1501		con->v1.out_skip += sizeof_footer(con);
   1502	}
   1503	/* data, middle, front */
   1504	if (msg->data_length)
   1505		con->v1.out_skip += msg->cursor.total_resid;
   1506	if (msg->middle)
   1507		con->v1.out_skip += con_out_kvec_skip(con);
   1508	con->v1.out_skip += con_out_kvec_skip(con);
   1509
   1510	dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
   1511	     con->v1.out_kvec_bytes, con->v1.out_skip);
   1512}
   1513
   1514void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
   1515{
   1516	unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len);
   1517	unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
   1518	unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len);
   1519
   1520	/* skip rest of message */
   1521	con->v1.in_base_pos = con->v1.in_base_pos -
   1522			sizeof(struct ceph_msg_header) -
   1523			front_len -
   1524			middle_len -
   1525			data_len -
   1526			sizeof(struct ceph_msg_footer);
   1527
   1528	con->v1.in_tag = CEPH_MSGR_TAG_READY;
   1529	con->in_seq++;
   1530
   1531	dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos);
   1532}
   1533
   1534bool ceph_con_v1_opened(struct ceph_connection *con)
   1535{
   1536	return con->v1.connect_seq;
   1537}
   1538
   1539void ceph_con_v1_reset_session(struct ceph_connection *con)
   1540{
   1541	con->v1.connect_seq = 0;
   1542	con->v1.peer_global_seq = 0;
   1543}
   1544
   1545void ceph_con_v1_reset_protocol(struct ceph_connection *con)
   1546{
   1547	con->v1.out_skip = 0;
   1548}