cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

send.c (40211B)


      1/*
      2 * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
      3 *
      4 * This software is available to you under a choice of one of two
      5 * licenses.  You may choose to be licensed under the terms of the GNU
      6 * General Public License (GPL) Version 2, available from the file
      7 * COPYING in the main directory of this source tree, or the
      8 * OpenIB.org BSD license below:
      9 *
     10 *     Redistribution and use in source and binary forms, with or
     11 *     without modification, are permitted provided that the following
     12 *     conditions are met:
     13 *
     14 *      - Redistributions of source code must retain the above
     15 *        copyright notice, this list of conditions and the following
     16 *        disclaimer.
     17 *
     18 *      - Redistributions in binary form must reproduce the above
     19 *        copyright notice, this list of conditions and the following
     20 *        disclaimer in the documentation and/or other materials
     21 *        provided with the distribution.
     22 *
     23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
     27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
     28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
     29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     30 * SOFTWARE.
     31 *
     32 */
     33#include <linux/kernel.h>
     34#include <linux/moduleparam.h>
     35#include <linux/gfp.h>
     36#include <net/sock.h>
     37#include <linux/in.h>
     38#include <linux/list.h>
     39#include <linux/ratelimit.h>
     40#include <linux/export.h>
     41#include <linux/sizes.h>
     42
     43#include "rds.h"
     44
     45/* When transmitting messages in rds_send_xmit, we need to emerge from
     46 * time to time and briefly release the CPU. Otherwise the softlockup watchdog
     47 * will kick our shin.
     48 * Also, it seems fairer to not let one busy connection stall all the
     49 * others.
     50 *
     51 * send_batch_count is the number of times we'll loop in send_xmit. Setting
     52 * it to 0 will restore the old behavior (where we looped until we had
     53 * drained the queue).
     54 */
     55static int send_batch_count = SZ_1K;
     56module_param(send_batch_count, int, 0444);
     57MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
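       /*
        * Note (illustrative, not from the original file): module_param() with
        * 0444 permissions makes this read-only at runtime, so it is set at
        * load time, e.g. "modprobe rds send_batch_count=2048" or
        * "rds.send_batch_count=2048" on the kernel command line for a
        * built-in RDS, and can be read back from
        * /sys/module/rds/parameters/send_batch_count (the module name "rds"
        * and the value 2048 are assumptions for the example).
        */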
     58
     59static void rds_send_remove_from_sock(struct list_head *messages, int status);
     60
     61/*
     62 * Reset the send state.  Callers must ensure that this doesn't race with
     63 * rds_send_xmit().
     64 */
     65void rds_send_path_reset(struct rds_conn_path *cp)
     66{
     67	struct rds_message *rm, *tmp;
     68	unsigned long flags;
     69
     70	if (cp->cp_xmit_rm) {
     71		rm = cp->cp_xmit_rm;
     72		cp->cp_xmit_rm = NULL;
     73		/* Tell the user the RDMA op is no longer mapped by the
     74		 * transport. This isn't entirely true (it's flushed out
     75		 * independently) but as the connection is down, there's
     76		 * no ongoing RDMA to/from that memory */
     77		rds_message_unmapped(rm);
     78		rds_message_put(rm);
     79	}
     80
     81	cp->cp_xmit_sg = 0;
     82	cp->cp_xmit_hdr_off = 0;
     83	cp->cp_xmit_data_off = 0;
     84	cp->cp_xmit_atomic_sent = 0;
     85	cp->cp_xmit_rdma_sent = 0;
     86	cp->cp_xmit_data_sent = 0;
     87
     88	cp->cp_conn->c_map_queued = 0;
     89
     90	cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
     91	cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;
     92
     93	/* Mark messages as retransmissions, and move them to the send q */
     94	spin_lock_irqsave(&cp->cp_lock, flags);
     95	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
     96		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
     97		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
     98	}
     99	list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
    100	spin_unlock_irqrestore(&cp->cp_lock, flags);
    101}
    102EXPORT_SYMBOL_GPL(rds_send_path_reset);
    103
    104static int acquire_in_xmit(struct rds_conn_path *cp)
    105{
    106	return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
    107}
    108
    109static void release_in_xmit(struct rds_conn_path *cp)
    110{
    111	clear_bit(RDS_IN_XMIT, &cp->cp_flags);
    112	smp_mb__after_atomic();
    113	/*
    114	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
    115	 * hot path and finding waiters is very rare.  We don't want to walk
    116	 * the system-wide hashed waitqueue buckets in the fast path only to
    117	 * almost never find waiters.
    118	 */
    119	if (waitqueue_active(&cp->cp_waitq))
    120		wake_up_all(&cp->cp_waitq);
    121}
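       /*
        * A minimal sketch of the sleeping side this wakeup pairs with: a
        * caller that wants to own RDS_IN_XMIT rather than back off the way
        * rds_send_xmit() does could sleep on cp_waitq until acquire_in_xmit()
        * succeeds.  The helper below is illustrative only and is not part of
        * the original file.
        */
       static inline void rds_example_acquire_in_xmit_sleep(struct rds_conn_path *cp)
       {
       	/* re-checked after every wake_up_all() from release_in_xmit() */
       	wait_event(cp->cp_waitq, acquire_in_xmit(cp));
       }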
    122
    123/*
    124 * We're making the conscious trade-off here to only send one message
    125 * down the connection at a time.
    126 *   Pro:
    127 *      - tx queueing is a simple fifo list
    128 *      - reassembly is optional and easily done by transports per conn
    129 *      - no per flow rx lookup at all, straight to the socket
    130 *      - less per-frag memory and wire overhead
    131 *   Con:
    132 *      - queued acks can be delayed behind large messages
    133 *   Depends:
    134 *      - small message latency is higher behind queued large messages
    135 *      - large message latency isn't starved by intervening small sends
    136 */
    137int rds_send_xmit(struct rds_conn_path *cp)
    138{
    139	struct rds_connection *conn = cp->cp_conn;
    140	struct rds_message *rm;
    141	unsigned long flags;
    142	unsigned int tmp;
    143	struct scatterlist *sg;
    144	int ret = 0;
    145	LIST_HEAD(to_be_dropped);
    146	int batch_count;
    147	unsigned long send_gen = 0;
    148	int same_rm = 0;
    149
    150restart:
    151	batch_count = 0;
    152
    153	/*
    154	 * sendmsg calls here after having queued its message on the send
    155	 * queue.  We only have one task feeding the connection at a time.  If
    156	 * another thread is already feeding the queue then we back off.  This
    157	 * avoids blocking the caller and trading per-connection data between
    158	 * caches per message.
    159	 */
    160	if (!acquire_in_xmit(cp)) {
    161		rds_stats_inc(s_send_lock_contention);
    162		ret = -ENOMEM;
    163		goto out;
    164	}
    165
    166	if (rds_destroy_pending(cp->cp_conn)) {
    167		release_in_xmit(cp);
    168		ret = -ENETUNREACH; /* don't requeue send work */
    169		goto out;
    170	}
    171
    172	/*
    173	 * we record the send generation after doing the xmit acquire.
    174	 * if someone else manages to jump in and do some work, we'll use
    175	 * this to avoid a goto restart farther down.
    176	 *
    177	 * The acquire_in_xmit() check above ensures that only one
    178	 * caller can increment c_send_gen at any time.
    179	 */
    180	send_gen = READ_ONCE(cp->cp_send_gen) + 1;
    181	WRITE_ONCE(cp->cp_send_gen, send_gen);
    182
    183	/*
    184	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
    185	 * we do the opposite to avoid races.
    186	 */
    187	if (!rds_conn_path_up(cp)) {
    188		release_in_xmit(cp);
    189		ret = 0;
    190		goto out;
    191	}
    192
    193	if (conn->c_trans->xmit_path_prepare)
    194		conn->c_trans->xmit_path_prepare(cp);
    195
    196	/*
    197	 * spin trying to push headers and data down the connection until
    198	 * the connection doesn't make forward progress.
    199	 */
    200	while (1) {
    201
    202		rm = cp->cp_xmit_rm;
    203
    204		if (!rm) {
    205			same_rm = 0;
    206		} else {
    207			same_rm++;
    208			if (same_rm >= 4096) {
    209				rds_stats_inc(s_send_stuck_rm);
    210				ret = -EAGAIN;
    211				break;
    212			}
    213		}
    214
    215		/*
    216		 * If between sending messages, we can send a pending congestion
    217		 * map update.
    218		 */
    219		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
    220			rm = rds_cong_update_alloc(conn);
    221			if (IS_ERR(rm)) {
    222				ret = PTR_ERR(rm);
    223				break;
    224			}
    225			rm->data.op_active = 1;
    226			rm->m_inc.i_conn_path = cp;
    227			rm->m_inc.i_conn = cp->cp_conn;
    228
    229			cp->cp_xmit_rm = rm;
    230		}
    231
    232		/*
    233		 * If not already working on one, grab the next message.
    234		 *
    235		 * cp_xmit_rm holds a ref while we're sending this message down
    236		 * the connection.  We can use this ref while holding the
    237		 * send_sem; rds_send_reset() is serialized with it.
    238		 */
    239		if (!rm) {
    240			unsigned int len;
    241
    242			batch_count++;
    243
    244			/* we want to process as big a batch as we can, but
    245			 * we also want to avoid softlockups.  If we've been
    246			 * through a lot of messages, let's back off and see
    247			 * if anyone else jumps in.
    248			 */
    249			if (batch_count >= send_batch_count)
    250				goto over_batch;
    251
    252			spin_lock_irqsave(&cp->cp_lock, flags);
    253
    254			if (!list_empty(&cp->cp_send_queue)) {
    255				rm = list_entry(cp->cp_send_queue.next,
    256						struct rds_message,
    257						m_conn_item);
    258				rds_message_addref(rm);
    259
    260				/*
    261				 * Move the message from the send queue to the retransmit
    262				 * list right away.
    263				 */
    264				list_move_tail(&rm->m_conn_item,
    265					       &cp->cp_retrans);
    266			}
    267
    268			spin_unlock_irqrestore(&cp->cp_lock, flags);
    269
    270			if (!rm)
    271				break;
    272
    273			/* Unfortunately, the way Infiniband deals with
    274			 * RDMA to a bad MR key is by moving the entire
    275			 * queue pair to error state. We could possibly
    276			 * recover from that, but right now we drop the
    277			 * connection.
    278			 * Therefore, we never retransmit messages with RDMA ops.
    279			 */
    280			if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
    281			    (rm->rdma.op_active &&
    282			    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) {
    283				spin_lock_irqsave(&cp->cp_lock, flags);
    284				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
    285					list_move(&rm->m_conn_item, &to_be_dropped);
    286				spin_unlock_irqrestore(&cp->cp_lock, flags);
    287				continue;
    288			}
    289
    290			/* Require an ACK every once in a while */
    291			len = ntohl(rm->m_inc.i_hdr.h_len);
    292			if (cp->cp_unacked_packets == 0 ||
    293			    cp->cp_unacked_bytes < len) {
    294				set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
    295
    296				cp->cp_unacked_packets =
    297					rds_sysctl_max_unacked_packets;
    298				cp->cp_unacked_bytes =
    299					rds_sysctl_max_unacked_bytes;
    300				rds_stats_inc(s_send_ack_required);
    301			} else {
    302				cp->cp_unacked_bytes -= len;
    303				cp->cp_unacked_packets--;
    304			}
    305
    306			cp->cp_xmit_rm = rm;
    307		}
    308
    309		/* The transport either sends the whole rdma or none of it */
    310		if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
    311			rm->m_final_op = &rm->rdma;
    312			/* The transport owns the mapped memory for now.
    313			 * You can't unmap it while it's on the send queue
    314			 */
    315			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
    316			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
    317			if (ret) {
    318				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
    319				wake_up_interruptible(&rm->m_flush_wait);
    320				break;
    321			}
    322			cp->cp_xmit_rdma_sent = 1;
    323
    324		}
    325
    326		if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
    327			rm->m_final_op = &rm->atomic;
    328			/* The transport owns the mapped memory for now.
    329			 * You can't unmap it while it's on the send queue
    330			 */
    331			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
    332			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
    333			if (ret) {
    334				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
    335				wake_up_interruptible(&rm->m_flush_wait);
    336				break;
    337			}
    338			cp->cp_xmit_atomic_sent = 1;
    339
    340		}
    341
    342		/*
    343		 * A number of cases require an RDS header to be sent
    344		 * even if there is no data.
    345		 * We permit 0-byte sends; rds-ping depends on this.
    346		 * However, if there are exclusively attached silent ops,
    347		 * we skip the hdr/data send, to enable silent operation.
    348		 */
    349		if (rm->data.op_nents == 0) {
    350			int ops_present;
    351			int all_ops_are_silent = 1;
    352
    353			ops_present = (rm->atomic.op_active || rm->rdma.op_active);
    354			if (rm->atomic.op_active && !rm->atomic.op_silent)
    355				all_ops_are_silent = 0;
    356			if (rm->rdma.op_active && !rm->rdma.op_silent)
    357				all_ops_are_silent = 0;
    358
    359			if (ops_present && all_ops_are_silent
    360			    && !rm->m_rdma_cookie)
    361				rm->data.op_active = 0;
    362		}
    363
    364		if (rm->data.op_active && !cp->cp_xmit_data_sent) {
    365			rm->m_final_op = &rm->data;
    366
    367			ret = conn->c_trans->xmit(conn, rm,
    368						  cp->cp_xmit_hdr_off,
    369						  cp->cp_xmit_sg,
    370						  cp->cp_xmit_data_off);
    371			if (ret <= 0)
    372				break;
    373
    374			if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
    375				tmp = min_t(int, ret,
    376					    sizeof(struct rds_header) -
    377					    cp->cp_xmit_hdr_off);
    378				cp->cp_xmit_hdr_off += tmp;
    379				ret -= tmp;
    380			}
    381
    382			sg = &rm->data.op_sg[cp->cp_xmit_sg];
    383			while (ret) {
    384				tmp = min_t(int, ret, sg->length -
    385						      cp->cp_xmit_data_off);
    386				cp->cp_xmit_data_off += tmp;
    387				ret -= tmp;
    388				if (cp->cp_xmit_data_off == sg->length) {
    389					cp->cp_xmit_data_off = 0;
    390					sg++;
    391					cp->cp_xmit_sg++;
    392					BUG_ON(ret != 0 && cp->cp_xmit_sg ==
    393					       rm->data.op_nents);
    394				}
    395			}
    396
    397			if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
    398			    (cp->cp_xmit_sg == rm->data.op_nents))
    399				cp->cp_xmit_data_sent = 1;
    400		}
    401
    402		/*
    403		 * An rm will only take multiple trips through this loop
    404		 * if there is a data op. Thus, if the data is sent (or there was
    405		 * none), then we're done with the rm.
    406		 */
    407		if (!rm->data.op_active || cp->cp_xmit_data_sent) {
    408			cp->cp_xmit_rm = NULL;
    409			cp->cp_xmit_sg = 0;
    410			cp->cp_xmit_hdr_off = 0;
    411			cp->cp_xmit_data_off = 0;
    412			cp->cp_xmit_rdma_sent = 0;
    413			cp->cp_xmit_atomic_sent = 0;
    414			cp->cp_xmit_data_sent = 0;
    415
    416			rds_message_put(rm);
    417		}
    418	}
    419
    420over_batch:
    421	if (conn->c_trans->xmit_path_complete)
    422		conn->c_trans->xmit_path_complete(cp);
    423	release_in_xmit(cp);
    424
    425	/* Nuke any messages we decided not to retransmit. */
    426	if (!list_empty(&to_be_dropped)) {
    427		/* irqs on here, so we can put(), unlike above */
    428		list_for_each_entry(rm, &to_be_dropped, m_conn_item)
    429			rds_message_put(rm);
    430		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
    431	}
    432
    433	/*
    434	 * Other senders can queue a message after we last test the send queue
    435	 * but before we clear RDS_IN_XMIT.  In that case they'd back off and
    436	 * not try and send their newly queued message.  We need to check the
    437	 * send queue after having cleared RDS_IN_XMIT so that their message
    438	 * doesn't get stuck on the send queue.
    439	 *
    440	 * If the transport cannot continue (i.e. ret != 0), then it must
    441	 * call us when more room is available, such as from the tx
    442	 * completion handler.
    443	 *
    444	 * We have an extra generation check here so that if someone manages
    445	 * to jump in after our release_in_xmit, we'll see that they have done
    446	 * some work and we will skip our goto
    447	 */
    448	if (ret == 0) {
    449		bool raced;
    450
    451		smp_mb();
    452		raced = send_gen != READ_ONCE(cp->cp_send_gen);
    453
    454		if ((test_bit(0, &conn->c_map_queued) ||
    455		    !list_empty(&cp->cp_send_queue)) && !raced) {
    456			if (batch_count < send_batch_count)
    457				goto restart;
    458			rcu_read_lock();
    459			if (rds_destroy_pending(cp->cp_conn))
    460				ret = -ENETUNREACH;
    461			else
    462				queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
    463			rcu_read_unlock();
    464		} else if (raced) {
    465			rds_stats_inc(s_send_lock_queue_raced);
    466		}
    467	}
    468out:
    469	return ret;
    470}
    471EXPORT_SYMBOL_GPL(rds_send_xmit);
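       /*
        * A minimal sketch, not the real rds_send_worker() from threads.c, of
        * how a caller is expected to treat rds_send_xmit()'s return value per
        * the comments above: -ENOMEM and -EAGAIN are transient, so the send
        * work is simply requeued; -ENETUNREACH means the connection is being
        * destroyed and nothing should be rescheduled.  The 2-jiffy delay is
        * an assumption for the example.
        */
       static inline void rds_example_send_retry(struct rds_conn_path *cp)
       {
       	int ret = rds_send_xmit(cp);

       	if (ret == -ENOMEM || ret == -EAGAIN)
       		queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
       }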
    472
    473static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
    474{
    475	u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
    476
    477	assert_spin_locked(&rs->rs_lock);
    478
    479	BUG_ON(rs->rs_snd_bytes < len);
    480	rs->rs_snd_bytes -= len;
    481
    482	if (rs->rs_snd_bytes == 0)
    483		rds_stats_inc(s_send_queue_empty);
    484}
    485
    486static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
    487				    is_acked_func is_acked)
    488{
    489	if (is_acked)
    490		return is_acked(rm, ack);
    491	return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
    492}
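       /*
        * A minimal sketch of a transport-supplied is_acked_func, built from
        * the RDS_MSG_HAS_ACK_SEQ flag and m_ack_seq field referenced in the
        * comments later in this file; it is illustrative, not copied from a
        * real transport, and ignores the sequence wraparound a real one would
        * handle.
        */
       static inline int rds_example_is_acked(struct rds_message *rm, u64 ack)
       {
       	if (!test_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags))
       		return 0;	/* transport hasn't assigned m_ack_seq yet */
       	return rm->m_ack_seq <= ack;
       }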
    493
    494/*
    495 * This is pretty similar to what happens below in the ACK
    496 * handling code - except that we call here as soon as we get
    497 * the IB send completion on the RDMA op and the accompanying
    498 * message.
    499 */
    500void rds_rdma_send_complete(struct rds_message *rm, int status)
    501{
    502	struct rds_sock *rs = NULL;
    503	struct rm_rdma_op *ro;
    504	struct rds_notifier *notifier;
    505	unsigned long flags;
    506
    507	spin_lock_irqsave(&rm->m_rs_lock, flags);
    508
    509	ro = &rm->rdma;
    510	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
    511	    ro->op_active && ro->op_notify && ro->op_notifier) {
    512		notifier = ro->op_notifier;
    513		rs = rm->m_rs;
    514		sock_hold(rds_rs_to_sk(rs));
    515
    516		notifier->n_status = status;
    517		spin_lock(&rs->rs_lock);
    518		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
    519		spin_unlock(&rs->rs_lock);
    520
    521		ro->op_notifier = NULL;
    522	}
    523
    524	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
    525
    526	if (rs) {
    527		rds_wake_sk_sleep(rs);
    528		sock_put(rds_rs_to_sk(rs));
    529	}
    530}
    531EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
    532
    533/*
    534 * Just like above, except it looks at the atomic op.
    535 */
    536void rds_atomic_send_complete(struct rds_message *rm, int status)
    537{
    538	struct rds_sock *rs = NULL;
    539	struct rm_atomic_op *ao;
    540	struct rds_notifier *notifier;
    541	unsigned long flags;
    542
    543	spin_lock_irqsave(&rm->m_rs_lock, flags);
    544
    545	ao = &rm->atomic;
    546	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
    547	    && ao->op_active && ao->op_notify && ao->op_notifier) {
    548		notifier = ao->op_notifier;
    549		rs = rm->m_rs;
    550		sock_hold(rds_rs_to_sk(rs));
    551
    552		notifier->n_status = status;
    553		spin_lock(&rs->rs_lock);
    554		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
    555		spin_unlock(&rs->rs_lock);
    556
    557		ao->op_notifier = NULL;
    558	}
    559
    560	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
    561
    562	if (rs) {
    563		rds_wake_sk_sleep(rs);
    564		sock_put(rds_rs_to_sk(rs));
    565	}
    566}
    567EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
    568
    569/*
    570 * This is the same as rds_rdma_send_complete except we
    571 * don't do any locking - we have all the ingredients (message,
    572 * socket, socket lock) and can just move the notifier.
    573 */
    574static inline void
    575__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
    576{
    577	struct rm_rdma_op *ro;
    578	struct rm_atomic_op *ao;
    579
    580	ro = &rm->rdma;
    581	if (ro->op_active && ro->op_notify && ro->op_notifier) {
    582		ro->op_notifier->n_status = status;
    583		list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
    584		ro->op_notifier = NULL;
    585	}
    586
    587	ao = &rm->atomic;
    588	if (ao->op_active && ao->op_notify && ao->op_notifier) {
    589		ao->op_notifier->n_status = status;
    590		list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
    591		ao->op_notifier = NULL;
    592	}
    593
    594	/* No need to wake the app - caller does this */
    595}
    596
    597/*
    598 * This removes messages from the socket's list if they're on it.  The list
    599 * argument must be private to the caller; we must be able to modify it
    600 * without locks.  The messages must have a reference held for their
    601 * position on the list.  This function will drop that reference after
    602 * removing the messages from the 'messages' list regardless of whether it found
    603 * the messages on the socket list or not.
    604 */
    605static void rds_send_remove_from_sock(struct list_head *messages, int status)
    606{
    607	unsigned long flags;
    608	struct rds_sock *rs = NULL;
    609	struct rds_message *rm;
    610
    611	while (!list_empty(messages)) {
    612		int was_on_sock = 0;
    613
    614		rm = list_entry(messages->next, struct rds_message,
    615				m_conn_item);
    616		list_del_init(&rm->m_conn_item);
    617
    618		/*
    619		 * If we see this flag cleared then we're *sure* that someone
    620		 * else beat us to removing it from the sock.  If we race
    621		 * with their flag update we'll get the lock and then really
    622		 * see that the flag has been cleared.
    623		 *
    624		 * The message spinlock makes sure nobody clears rm->m_rs
    625		 * while we're messing with it. It does not prevent the
    626		 * message from being removed from the socket, though.
    627		 */
    628		spin_lock_irqsave(&rm->m_rs_lock, flags);
    629		if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
    630			goto unlock_and_drop;
    631
    632		if (rs != rm->m_rs) {
    633			if (rs) {
    634				rds_wake_sk_sleep(rs);
    635				sock_put(rds_rs_to_sk(rs));
    636			}
    637			rs = rm->m_rs;
    638			if (rs)
    639				sock_hold(rds_rs_to_sk(rs));
    640		}
    641		if (!rs)
    642			goto unlock_and_drop;
    643		spin_lock(&rs->rs_lock);
    644
    645		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
    646			struct rm_rdma_op *ro = &rm->rdma;
    647			struct rds_notifier *notifier;
    648
    649			list_del_init(&rm->m_sock_item);
    650			rds_send_sndbuf_remove(rs, rm);
    651
    652			if (ro->op_active && ro->op_notifier &&
    653			       (ro->op_notify || (ro->op_recverr && status))) {
    654				notifier = ro->op_notifier;
    655				list_add_tail(&notifier->n_list,
    656						&rs->rs_notify_queue);
    657				if (!notifier->n_status)
    658					notifier->n_status = status;
    659				rm->rdma.op_notifier = NULL;
    660			}
    661			was_on_sock = 1;
    662		}
    663		spin_unlock(&rs->rs_lock);
    664
    665unlock_and_drop:
    666		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
    667		rds_message_put(rm);
    668		if (was_on_sock)
    669			rds_message_put(rm);
    670	}
    671
    672	if (rs) {
    673		rds_wake_sk_sleep(rs);
    674		sock_put(rds_rs_to_sk(rs));
    675	}
    676}
    677
    678/*
    679 * Transports call here when they've determined that the receiver queued
    680 * messages up to, and including, the given sequence number.  Messages are
    681 * moved to the retrans queue when rds_send_xmit picks them off the send
    682 * queue. This means that in the TCP case, the message may not have been
    683 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
    684 * checks the RDS_MSG_HAS_ACK_SEQ bit.
    685 */
    686void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
    687			      is_acked_func is_acked)
    688{
    689	struct rds_message *rm, *tmp;
    690	unsigned long flags;
    691	LIST_HEAD(list);
    692
    693	spin_lock_irqsave(&cp->cp_lock, flags);
    694
    695	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
    696		if (!rds_send_is_acked(rm, ack, is_acked))
    697			break;
    698
    699		list_move(&rm->m_conn_item, &list);
    700		clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
    701	}
    702
    703	/* order flag updates with spin locks */
    704	if (!list_empty(&list))
    705		smp_mb__after_atomic();
    706
    707	spin_unlock_irqrestore(&cp->cp_lock, flags);
    708
    709	/* now remove the messages from the sock list as needed */
    710	rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
    711}
    712EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);
    713
    714void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
    715			 is_acked_func is_acked)
    716{
    717	WARN_ON(conn->c_trans->t_mp_capable);
    718	rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
    719}
    720EXPORT_SYMBOL_GPL(rds_send_drop_acked);
    721
    722void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
    723{
    724	struct rds_message *rm, *tmp;
    725	struct rds_connection *conn;
    726	struct rds_conn_path *cp;
    727	unsigned long flags;
    728	LIST_HEAD(list);
    729
    730	/* get all the messages we're dropping under the rs lock */
    731	spin_lock_irqsave(&rs->rs_lock, flags);
    732
    733	list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
    734		if (dest &&
    735		    (!ipv6_addr_equal(&dest->sin6_addr, &rm->m_daddr) ||
    736		     dest->sin6_port != rm->m_inc.i_hdr.h_dport))
    737			continue;
    738
    739		list_move(&rm->m_sock_item, &list);
    740		rds_send_sndbuf_remove(rs, rm);
    741		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
    742	}
    743
    744	/* order flag updates with the rs lock */
    745	smp_mb__after_atomic();
    746
    747	spin_unlock_irqrestore(&rs->rs_lock, flags);
    748
    749	if (list_empty(&list))
    750		return;
    751
    752	/* Remove the messages from the conn */
    753	list_for_each_entry(rm, &list, m_sock_item) {
    754
    755		conn = rm->m_inc.i_conn;
    756		if (conn->c_trans->t_mp_capable)
    757			cp = rm->m_inc.i_conn_path;
    758		else
    759			cp = &conn->c_path[0];
    760
    761		spin_lock_irqsave(&cp->cp_lock, flags);
    762		/*
    763		 * Maybe someone else beat us to removing rm from the conn.
    764		 * If we race with their flag update we'll get the lock and
    765		 * then really see that the flag has been cleared.
    766		 */
    767		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
    768			spin_unlock_irqrestore(&cp->cp_lock, flags);
    769			continue;
    770		}
    771		list_del_init(&rm->m_conn_item);
    772		spin_unlock_irqrestore(&cp->cp_lock, flags);
    773
    774		/*
    775		 * Couldn't grab m_rs_lock in top loop (lock ordering),
    776		 * but we can now.
    777		 */
    778		spin_lock_irqsave(&rm->m_rs_lock, flags);
    779
    780		spin_lock(&rs->rs_lock);
    781		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
    782		spin_unlock(&rs->rs_lock);
    783
    784		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
    785
    786		rds_message_put(rm);
    787	}
    788
    789	rds_wake_sk_sleep(rs);
    790
    791	while (!list_empty(&list)) {
    792		rm = list_entry(list.next, struct rds_message, m_sock_item);
    793		list_del_init(&rm->m_sock_item);
    794		rds_message_wait(rm);
    795
    796		/* just in case the code above skipped this message
    797		 * because RDS_MSG_ON_CONN wasn't set, run it again here.
    798		 * Taking m_rs_lock is the only thing that keeps us
    799		 * from racing with ack processing.
    800		 */
    801		spin_lock_irqsave(&rm->m_rs_lock, flags);
    802
    803		spin_lock(&rs->rs_lock);
    804		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
    805		spin_unlock(&rs->rs_lock);
    806
    807		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
    808
    809		rds_message_put(rm);
    810	}
    811}
    812
    813/*
    814 * we only want this to fire once so we use the caller's 'queued'.  It's
    815 * possible that another thread can race with us and remove the
    816 * message from the flow with RDS_CANCEL_SENT_TO.
    817 */
    818static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
    819			     struct rds_conn_path *cp,
    820			     struct rds_message *rm, __be16 sport,
    821			     __be16 dport, int *queued)
    822{
    823	unsigned long flags;
    824	u32 len;
    825
    826	if (*queued)
    827		goto out;
    828
    829	len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
    830
    831	/* this is the only place which holds both the socket's rs_lock
    832	 * and the connection's c_lock */
    833	spin_lock_irqsave(&rs->rs_lock, flags);
    834
    835	/*
    836	 * If there is a little space in sndbuf, we don't queue anything,
    837	 * and userspace gets -EAGAIN. But poll() indicates there's send
    838	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
    839	 * freed up by incoming acks. So we check the *old* value of
    840	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
    841	 * and poll() now knows no more data can be sent.
    842	 */
    843	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
    844		rs->rs_snd_bytes += len;
    845
    846		/* let recv side know we are close to send space exhaustion.
    847		 * This is probably not the optimal way to do it, as this
    848		 * means we set the flag on *all* messages as soon as our
    849		 * throughput hits a certain threshold.
    850		 */
    851		if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
    852			set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
    853
    854		list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
    855		set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
    856		rds_message_addref(rm);
    857		sock_hold(rds_rs_to_sk(rs));
    858		rm->m_rs = rs;
    859
    860		/* The code ordering is a little weird, but we're
    861		   trying to minimize the time we hold c_lock */
    862		rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
    863		rm->m_inc.i_conn = conn;
    864		rm->m_inc.i_conn_path = cp;
    865		rds_message_addref(rm);
    866
    867		spin_lock(&cp->cp_lock);
    868		rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
    869		list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
    870		set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
    871		spin_unlock(&cp->cp_lock);
    872
    873		rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
    874			 rm, len, rs, rs->rs_snd_bytes,
    875			 (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));
    876
    877		*queued = 1;
    878	}
    879
    880	spin_unlock_irqrestore(&rs->rs_lock, flags);
    881out:
    882	return *queued;
    883}
    884
    885/*
    886 * rds_message is getting to be quite complicated, and we'd like to allocate
    887 * it all in one go. This figures out how big it needs to be up front.
    888 */
    889static int rds_rm_size(struct msghdr *msg, int num_sgs,
    890		       struct rds_iov_vector_arr *vct)
    891{
    892	struct cmsghdr *cmsg;
    893	int size = 0;
    894	int cmsg_groups = 0;
    895	int retval;
    896	bool zcopy_cookie = false;
    897	struct rds_iov_vector *iov, *tmp_iov;
    898
    899	if (num_sgs < 0)
    900		return -EINVAL;
    901
    902	for_each_cmsghdr(cmsg, msg) {
    903		if (!CMSG_OK(msg, cmsg))
    904			return -EINVAL;
    905
    906		if (cmsg->cmsg_level != SOL_RDS)
    907			continue;
    908
    909		switch (cmsg->cmsg_type) {
    910		case RDS_CMSG_RDMA_ARGS:
    911			if (vct->indx >= vct->len) {
    912				vct->len += vct->incr;
    913				tmp_iov =
    914					krealloc(vct->vec,
    915						 vct->len *
    916						 sizeof(struct rds_iov_vector),
    917						 GFP_KERNEL);
    918				if (!tmp_iov) {
    919					vct->len -= vct->incr;
    920					return -ENOMEM;
    921				}
    922				vct->vec = tmp_iov;
    923			}
    924			iov = &vct->vec[vct->indx];
    925			memset(iov, 0, sizeof(struct rds_iov_vector));
    926			vct->indx++;
    927			cmsg_groups |= 1;
    928			retval = rds_rdma_extra_size(CMSG_DATA(cmsg), iov);
    929			if (retval < 0)
    930				return retval;
    931			size += retval;
    932
    933			break;
    934
    935		case RDS_CMSG_ZCOPY_COOKIE:
    936			zcopy_cookie = true;
    937			fallthrough;
    938
    939		case RDS_CMSG_RDMA_DEST:
    940		case RDS_CMSG_RDMA_MAP:
    941			cmsg_groups |= 2;
    942			/* these are valid but do not add any size */
    943			break;
    944
    945		case RDS_CMSG_ATOMIC_CSWP:
    946		case RDS_CMSG_ATOMIC_FADD:
    947		case RDS_CMSG_MASKED_ATOMIC_CSWP:
    948		case RDS_CMSG_MASKED_ATOMIC_FADD:
    949			cmsg_groups |= 1;
    950			size += sizeof(struct scatterlist);
    951			break;
    952
    953		default:
    954			return -EINVAL;
    955		}
    956
    957	}
    958
    959	if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
    960		return -EINVAL;
    961
    962	size += num_sgs * sizeof(struct scatterlist);
    963
    964	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC): both group bits set is invalid */
    965	if (cmsg_groups == 3)
    966		return -EINVAL;
    967
    968	return size;
    969}
    970
    971static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
    972			  struct cmsghdr *cmsg)
    973{
    974	u32 *cookie;
    975
    976	if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) ||
    977	    !rm->data.op_mmp_znotifier)
    978		return -EINVAL;
    979	cookie = CMSG_DATA(cmsg);
    980	rm->data.op_mmp_znotifier->z_cookie = *cookie;
    981	return 0;
    982}
    983
    984static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
    985			 struct msghdr *msg, int *allocated_mr,
    986			 struct rds_iov_vector_arr *vct)
    987{
    988	struct cmsghdr *cmsg;
    989	int ret = 0, ind = 0;
    990
    991	for_each_cmsghdr(cmsg, msg) {
    992		if (!CMSG_OK(msg, cmsg))
    993			return -EINVAL;
    994
    995		if (cmsg->cmsg_level != SOL_RDS)
    996			continue;
    997
    998		/* As a side effect, RDMA_DEST and RDMA_MAP will set
    999		 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
   1000		 */
   1001		switch (cmsg->cmsg_type) {
   1002		case RDS_CMSG_RDMA_ARGS:
   1003			if (ind >= vct->indx)
   1004				return -ENOMEM;
   1005			ret = rds_cmsg_rdma_args(rs, rm, cmsg, &vct->vec[ind]);
   1006			ind++;
   1007			break;
   1008
   1009		case RDS_CMSG_RDMA_DEST:
   1010			ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
   1011			break;
   1012
   1013		case RDS_CMSG_RDMA_MAP:
   1014			ret = rds_cmsg_rdma_map(rs, rm, cmsg);
   1015			if (!ret)
   1016				*allocated_mr = 1;
   1017			else if (ret == -ENODEV)
   1018				/* Accommodate the get_mr() case which can fail
   1019			 * if the connection isn't established yet.
   1020				 */
   1021				ret = -EAGAIN;
   1022			break;
   1023		case RDS_CMSG_ATOMIC_CSWP:
   1024		case RDS_CMSG_ATOMIC_FADD:
   1025		case RDS_CMSG_MASKED_ATOMIC_CSWP:
   1026		case RDS_CMSG_MASKED_ATOMIC_FADD:
   1027			ret = rds_cmsg_atomic(rs, rm, cmsg);
   1028			break;
   1029
   1030		case RDS_CMSG_ZCOPY_COOKIE:
   1031			ret = rds_cmsg_zcopy(rs, rm, cmsg);
   1032			break;
   1033
   1034		default:
   1035			return -EINVAL;
   1036		}
   1037
   1038		if (ret)
   1039			break;
   1040	}
   1041
   1042	return ret;
   1043}
   1044
   1045static int rds_send_mprds_hash(struct rds_sock *rs,
   1046			       struct rds_connection *conn, int nonblock)
   1047{
   1048	int hash;
   1049
   1050	if (conn->c_npaths == 0)
   1051		hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
   1052	else
   1053		hash = RDS_MPATH_HASH(rs, conn->c_npaths);
   1054	if (conn->c_npaths == 0 && hash != 0) {
   1055		rds_send_ping(conn, 0);
   1056
   1057		/* The underlying connection is not up yet.  Need to wait
   1058		 * until it is up to be sure that the non-zero c_path can be
   1059		 * used.  But if we are interrupted, we have to use the zero
   1060		 * c_path in case the connection ends up being non-MP capable.
   1061		 */
   1062		if (conn->c_npaths == 0) {
   1063			/* Cannot wait for the connection to be made, so just use
   1064			 * the base c_path.
   1065			 */
   1066			if (nonblock)
   1067				return 0;
   1068			if (wait_event_interruptible(conn->c_hs_waitq,
   1069						     conn->c_npaths != 0))
   1070				hash = 0;
   1071		}
   1072		if (conn->c_npaths == 1)
   1073			hash = 0;
   1074	}
   1075	return hash;
   1076}
   1077
   1078static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
   1079{
   1080	struct rds_rdma_args *args;
   1081	struct cmsghdr *cmsg;
   1082
   1083	for_each_cmsghdr(cmsg, msg) {
   1084		if (!CMSG_OK(msg, cmsg))
   1085			return -EINVAL;
   1086
   1087		if (cmsg->cmsg_level != SOL_RDS)
   1088			continue;
   1089
   1090		if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
   1091			if (cmsg->cmsg_len <
   1092			    CMSG_LEN(sizeof(struct rds_rdma_args)))
   1093				return -EINVAL;
   1094			args = CMSG_DATA(cmsg);
   1095			*rdma_bytes += args->remote_vec.bytes;
   1096		}
   1097	}
   1098	return 0;
   1099}
   1100
   1101int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
   1102{
   1103	struct sock *sk = sock->sk;
   1104	struct rds_sock *rs = rds_sk_to_rs(sk);
   1105	DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
   1106	DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
   1107	__be16 dport;
   1108	struct rds_message *rm = NULL;
   1109	struct rds_connection *conn;
   1110	int ret = 0;
   1111	int queued = 0, allocated_mr = 0;
   1112	int nonblock = msg->msg_flags & MSG_DONTWAIT;
   1113	long timeo = sock_sndtimeo(sk, nonblock);
   1114	struct rds_conn_path *cpath;
   1115	struct in6_addr daddr;
   1116	__u32 scope_id = 0;
   1117	size_t total_payload_len = payload_len, rdma_payload_len = 0;
   1118	bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
   1119		      sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
   1120	int num_sgs = DIV_ROUND_UP(payload_len, PAGE_SIZE);
   1121	int namelen;
   1122	struct rds_iov_vector_arr vct;
   1123	int ind;
   1124
   1125	memset(&vct, 0, sizeof(vct));
   1126
   1127	/* expect 1 RDMA CMSG per rds_sendmsg. can still grow if more needed. */
   1128	vct.incr = 1;
   1129
   1130	/* Mirror Linux UDP's handling of BSD error message compatibility */
   1131	/* XXX: Perhaps MSG_MORE someday */
   1132	if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) {
   1133		ret = -EOPNOTSUPP;
   1134		goto out;
   1135	}
   1136
   1137	namelen = msg->msg_namelen;
   1138	if (namelen != 0) {
   1139		if (namelen < sizeof(*usin)) {
   1140			ret = -EINVAL;
   1141			goto out;
   1142		}
   1143		switch (usin->sin_family) {
   1144		case AF_INET:
   1145			if (usin->sin_addr.s_addr == htonl(INADDR_ANY) ||
   1146			    usin->sin_addr.s_addr == htonl(INADDR_BROADCAST) ||
   1147			    ipv4_is_multicast(usin->sin_addr.s_addr)) {
   1148				ret = -EINVAL;
   1149				goto out;
   1150			}
   1151			ipv6_addr_set_v4mapped(usin->sin_addr.s_addr, &daddr);
   1152			dport = usin->sin_port;
   1153			break;
   1154
   1155#if IS_ENABLED(CONFIG_IPV6)
   1156		case AF_INET6: {
   1157			int addr_type;
   1158
   1159			if (namelen < sizeof(*sin6)) {
   1160				ret = -EINVAL;
   1161				goto out;
   1162			}
   1163			addr_type = ipv6_addr_type(&sin6->sin6_addr);
   1164			if (!(addr_type & IPV6_ADDR_UNICAST)) {
   1165				__be32 addr4;
   1166
   1167				if (!(addr_type & IPV6_ADDR_MAPPED)) {
   1168					ret = -EINVAL;
   1169					goto out;
   1170				}
   1171
   1172				/* It is a mapped address.  Need to do some
   1173				 * sanity checks.
   1174				 */
   1175				addr4 = sin6->sin6_addr.s6_addr32[3];
   1176				if (addr4 == htonl(INADDR_ANY) ||
   1177				    addr4 == htonl(INADDR_BROADCAST) ||
   1178				    ipv4_is_multicast(addr4)) {
   1179					ret = -EINVAL;
   1180					goto out;
   1181				}
   1182			}
   1183			if (addr_type & IPV6_ADDR_LINKLOCAL) {
   1184				if (sin6->sin6_scope_id == 0) {
   1185					ret = -EINVAL;
   1186					goto out;
   1187				}
   1188				scope_id = sin6->sin6_scope_id;
   1189			}
   1190
   1191			daddr = sin6->sin6_addr;
   1192			dport = sin6->sin6_port;
   1193			break;
   1194		}
   1195#endif
   1196
   1197		default:
   1198			ret = -EINVAL;
   1199			goto out;
   1200		}
   1201	} else {
   1202		/* We only care about consistency with ->connect() */
   1203		lock_sock(sk);
   1204		daddr = rs->rs_conn_addr;
   1205		dport = rs->rs_conn_port;
   1206		scope_id = rs->rs_bound_scope_id;
   1207		release_sock(sk);
   1208	}
   1209
   1210	lock_sock(sk);
   1211	if (ipv6_addr_any(&rs->rs_bound_addr) || ipv6_addr_any(&daddr)) {
   1212		release_sock(sk);
   1213		ret = -ENOTCONN;
   1214		goto out;
   1215	} else if (namelen != 0) {
   1216		/* Cannot send to an IPv4 address using an IPv6 source
   1217		 * address and cannot send to an IPv6 address using an
   1218		 * IPv4 source address.
   1219		 */
   1220		if (ipv6_addr_v4mapped(&daddr) ^
   1221		    ipv6_addr_v4mapped(&rs->rs_bound_addr)) {
   1222			release_sock(sk);
   1223			ret = -EOPNOTSUPP;
   1224			goto out;
   1225		}
   1226		/* If the socket is already bound to a link local address,
   1227		 * it can only send to peers on the same link.  But allow
   1228		 * communicating between link local and non-link local address.
   1229		 */
   1230		if (scope_id != rs->rs_bound_scope_id) {
   1231			if (!scope_id) {
   1232				scope_id = rs->rs_bound_scope_id;
   1233			} else if (rs->rs_bound_scope_id) {
   1234				release_sock(sk);
   1235				ret = -EINVAL;
   1236				goto out;
   1237			}
   1238		}
   1239	}
   1240	release_sock(sk);
   1241
   1242	ret = rds_rdma_bytes(msg, &rdma_payload_len);
   1243	if (ret)
   1244		goto out;
   1245
   1246	total_payload_len += rdma_payload_len;
   1247	if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
   1248		ret = -EMSGSIZE;
   1249		goto out;
   1250	}
   1251
   1252	if (payload_len > rds_sk_sndbuf(rs)) {
   1253		ret = -EMSGSIZE;
   1254		goto out;
   1255	}
   1256
   1257	if (zcopy) {
   1258		if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
   1259			ret = -EOPNOTSUPP;
   1260			goto out;
   1261		}
   1262		num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
   1263	}
   1264	/* size of rm including all sgs */
   1265	ret = rds_rm_size(msg, num_sgs, &vct);
   1266	if (ret < 0)
   1267		goto out;
   1268
   1269	rm = rds_message_alloc(ret, GFP_KERNEL);
   1270	if (!rm) {
   1271		ret = -ENOMEM;
   1272		goto out;
   1273	}
   1274
   1275	/* Attach data to the rm */
   1276	if (payload_len) {
   1277		rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
   1278		if (IS_ERR(rm->data.op_sg)) {
   1279			ret = PTR_ERR(rm->data.op_sg);
   1280			goto out;
   1281		}
   1282		ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy);
   1283		if (ret)
   1284			goto out;
   1285	}
   1286	rm->data.op_active = 1;
   1287
   1288	rm->m_daddr = daddr;
   1289
   1290	/* rds_conn_create has a spinlock that runs with IRQ off.
   1291	 * Caching the conn in the socket helps a lot. */
   1292	if (rs->rs_conn && ipv6_addr_equal(&rs->rs_conn->c_faddr, &daddr) &&
   1293	    rs->rs_tos == rs->rs_conn->c_tos) {
   1294		conn = rs->rs_conn;
   1295	} else {
   1296		conn = rds_conn_create_outgoing(sock_net(sock->sk),
   1297						&rs->rs_bound_addr, &daddr,
   1298						rs->rs_transport, rs->rs_tos,
   1299						sock->sk->sk_allocation,
   1300						scope_id);
   1301		if (IS_ERR(conn)) {
   1302			ret = PTR_ERR(conn);
   1303			goto out;
   1304		}
   1305		rs->rs_conn = conn;
   1306	}
   1307
   1308	if (conn->c_trans->t_mp_capable)
   1309		cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
   1310	else
   1311		cpath = &conn->c_path[0];
   1312
   1313	rm->m_conn_path = cpath;
   1314
   1315	/* Parse any control messages the user may have included. */
   1316	ret = rds_cmsg_send(rs, rm, msg, &allocated_mr, &vct);
   1317	if (ret) {
   1318		/* Trigger the connection so that it's ready for the next retry */
   1319		if (ret ==  -EAGAIN)
   1320			rds_conn_connect_if_down(conn);
   1321		goto out;
   1322	}
   1323
   1324	if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
   1325		printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
   1326			       &rm->rdma, conn->c_trans->xmit_rdma);
   1327		ret = -EOPNOTSUPP;
   1328		goto out;
   1329	}
   1330
   1331	if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
   1332		printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
   1333			       &rm->atomic, conn->c_trans->xmit_atomic);
   1334		ret = -EOPNOTSUPP;
   1335		goto out;
   1336	}
   1337
   1338	if (rds_destroy_pending(conn)) {
   1339		ret = -EAGAIN;
   1340		goto out;
   1341	}
   1342
   1343	if (rds_conn_path_down(cpath))
   1344		rds_check_all_paths(conn);
   1345
   1346	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
   1347	if (ret) {
   1348		rs->rs_seen_congestion = 1;
   1349		goto out;
   1350	}
   1351	while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
   1352				  dport, &queued)) {
   1353		rds_stats_inc(s_send_queue_full);
   1354
   1355		if (nonblock) {
   1356			ret = -EAGAIN;
   1357			goto out;
   1358		}
   1359
   1360		timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
   1361					rds_send_queue_rm(rs, conn, cpath, rm,
   1362							  rs->rs_bound_port,
   1363							  dport,
   1364							  &queued),
   1365					timeo);
   1366		rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
   1367		if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
   1368			continue;
   1369
   1370		ret = timeo;
   1371		if (ret == 0)
   1372			ret = -ETIMEDOUT;
   1373		goto out;
   1374	}
   1375
   1376	/*
   1377	 * By now we've committed to the send.  We reuse rds_send_worker()
   1378	 * to retry sends in the rds thread if the transport asks us to.
   1379	 */
   1380	rds_stats_inc(s_send_queued);
   1381
   1382	ret = rds_send_xmit(cpath);
   1383	if (ret == -ENOMEM || ret == -EAGAIN) {
   1384		ret = 0;
   1385		rcu_read_lock();
   1386		if (rds_destroy_pending(cpath->cp_conn))
   1387			ret = -ENETUNREACH;
   1388		else
   1389			queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
   1390		rcu_read_unlock();
   1391	}
   1392	if (ret)
   1393		goto out;
   1394	rds_message_put(rm);
   1395
   1396	for (ind = 0; ind < vct.indx; ind++)
   1397		kfree(vct.vec[ind].iov);
   1398	kfree(vct.vec);
   1399
   1400	return payload_len;
   1401
   1402out:
   1403	for (ind = 0; ind < vct.indx; ind++)
   1404		kfree(vct.vec[ind].iov);
   1405	kfree(vct.vec);
   1406
   1407	/* If the user included an RDMA_MAP cmsg, we allocated an MR on the fly.
   1408	 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
   1409	 * or in any other way, we need to destroy the MR again */
   1410	if (allocated_mr)
   1411		rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
   1412
   1413	if (rm)
   1414		rds_message_put(rm);
   1415	return ret;
   1416}
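       /*
        * For reference, a minimal userspace sketch of the call path served by
        * rds_sendmsg() above (assumed from the PF_RDS socket API, not taken
        * from this file): a bound PF_RDS socket sends a datagram with the
        * destination in msg_name and optional SOL_RDS control messages
        * (e.g. RDS_CMSG_RDMA_ARGS) that rds_cmsg_send() parses.
        *
        *	int fd = socket(PF_RDS, SOCK_SEQPACKET, 0);
        *
        *	bind(fd, (struct sockaddr *)&laddr, sizeof(laddr));
        *
        *	struct msghdr msg = {
        *		.msg_name    = &daddr,		// peer sockaddr_in
        *		.msg_namelen = sizeof(daddr),
        *		.msg_iov     = &iov,		// payload
        *		.msg_iovlen  = 1,
        *	};
        *	sendmsg(fd, &msg, 0);
        */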
   1417
   1418/*
   1419 * send out a probe. Can be shared by rds_send_ping,
   1420 * rds_send_pong, rds_send_hb.
   1421 * rds_send_hb should use h_flags
   1422 *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
   1423 * or
   1424 *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
   1425 */
   1426static int
   1427rds_send_probe(struct rds_conn_path *cp, __be16 sport,
   1428	       __be16 dport, u8 h_flags)
   1429{
   1430	struct rds_message *rm;
   1431	unsigned long flags;
   1432	int ret = 0;
   1433
   1434	rm = rds_message_alloc(0, GFP_ATOMIC);
   1435	if (!rm) {
   1436		ret = -ENOMEM;
   1437		goto out;
   1438	}
   1439
   1440	rm->m_daddr = cp->cp_conn->c_faddr;
   1441	rm->data.op_active = 1;
   1442
   1443	rds_conn_path_connect_if_down(cp);
   1444
   1445	ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
   1446	if (ret)
   1447		goto out;
   1448
   1449	spin_lock_irqsave(&cp->cp_lock, flags);
   1450	list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
   1451	set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
   1452	rds_message_addref(rm);
   1453	rm->m_inc.i_conn = cp->cp_conn;
   1454	rm->m_inc.i_conn_path = cp;
   1455
   1456	rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
   1457				    cp->cp_next_tx_seq);
   1458	rm->m_inc.i_hdr.h_flags |= h_flags;
   1459	cp->cp_next_tx_seq++;
   1460
   1461	if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) &&
   1462	    cp->cp_conn->c_trans->t_mp_capable) {
   1463		u16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
   1464		u32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);
   1465
   1466		rds_message_add_extension(&rm->m_inc.i_hdr,
   1467					  RDS_EXTHDR_NPATHS, &npaths,
   1468					  sizeof(npaths));
   1469		rds_message_add_extension(&rm->m_inc.i_hdr,
   1470					  RDS_EXTHDR_GEN_NUM,
   1471					  &my_gen_num,
   1472					  sizeof(u32));
   1473	}
   1474	spin_unlock_irqrestore(&cp->cp_lock, flags);
   1475
   1476	rds_stats_inc(s_send_queued);
   1477	rds_stats_inc(s_send_pong);
   1478
   1479	/* schedule the send work on rds_wq */
   1480	rcu_read_lock();
   1481	if (!rds_destroy_pending(cp->cp_conn))
   1482		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
   1483	rcu_read_unlock();
   1484
   1485	rds_message_put(rm);
   1486	return 0;
   1487
   1488out:
   1489	if (rm)
   1490		rds_message_put(rm);
   1491	return ret;
   1492}
   1493
   1494int
   1495rds_send_pong(struct rds_conn_path *cp, __be16 dport)
   1496{
   1497	return rds_send_probe(cp, 0, dport, 0);
   1498}
   1499
   1500void
   1501rds_send_ping(struct rds_connection *conn, int cp_index)
   1502{
   1503	unsigned long flags;
   1504	struct rds_conn_path *cp = &conn->c_path[cp_index];
   1505
   1506	spin_lock_irqsave(&cp->cp_lock, flags);
   1507	if (conn->c_ping_triggered) {
   1508		spin_unlock_irqrestore(&cp->cp_lock, flags);
   1509		return;
   1510	}
   1511	conn->c_ping_triggered = 1;
   1512	spin_unlock_irqrestore(&cp->cp_lock, flags);
   1513	rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0);
   1514}
   1515EXPORT_SYMBOL_GPL(rds_send_ping);