cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

recv.c (23910B)


/*
 * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/export.h>
#include <linux/time.h>
#include <linux/rds.h>

#include "rds.h"

void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
		 struct in6_addr *saddr)
{
	refcount_set(&inc->i_refcount, 1);
	INIT_LIST_HEAD(&inc->i_item);
	inc->i_conn = conn;
	inc->i_saddr = *saddr;
	inc->i_usercopy.rdma_cookie = 0;
	inc->i_usercopy.rx_tstamp = ktime_set(0, 0);

	memset(inc->i_rx_lat_trace, 0, sizeof(inc->i_rx_lat_trace));
}
EXPORT_SYMBOL_GPL(rds_inc_init);

void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp,
		       struct in6_addr  *saddr)
{
	refcount_set(&inc->i_refcount, 1);
	INIT_LIST_HEAD(&inc->i_item);
	inc->i_conn = cp->cp_conn;
	inc->i_conn_path = cp;
	inc->i_saddr = *saddr;
	inc->i_usercopy.rdma_cookie = 0;
	inc->i_usercopy.rx_tstamp = ktime_set(0, 0);
}
EXPORT_SYMBOL_GPL(rds_inc_path_init);

static void rds_inc_addref(struct rds_incoming *inc)
{
	rdsdebug("addref inc %p ref %d\n", inc, refcount_read(&inc->i_refcount));
	refcount_inc(&inc->i_refcount);
}

void rds_inc_put(struct rds_incoming *inc)
{
	rdsdebug("put inc %p ref %d\n", inc, refcount_read(&inc->i_refcount));
	if (refcount_dec_and_test(&inc->i_refcount)) {
		BUG_ON(!list_empty(&inc->i_item));

		inc->i_conn->c_trans->inc_free(inc);
	}
}
EXPORT_SYMBOL_GPL(rds_inc_put);

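/*
 * Account for @delta bytes entering or leaving the socket's receive queue
 * and update the local congestion map: the port's congestion bit is set
 * once rs_rcv_bytes exceeds the socket receive buffer and cleared again
 * only after usage drops below half of it, so the congested state does
 * not bounce on every message.
 */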
static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
				  struct rds_cong_map *map,
				  int delta, __be16 port)
{
	int now_congested;

	if (delta == 0)
		return;

	rs->rs_rcv_bytes += delta;
	if (delta > 0)
		rds_stats_add(s_recv_bytes_added_to_socket, delta);
	else
		rds_stats_add(s_recv_bytes_removed_from_socket, -delta);

	/* loop transport doesn't send/recv congestion updates */
	if (rs->rs_transport->t_type == RDS_TRANS_LOOP)
		return;

	now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);

	rdsdebug("rs %p (%pI6c:%u) recv bytes %d buf %d "
	  "now_cong %d delta %d\n",
	  rs, &rs->rs_bound_addr,
	  ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
	  rds_sk_rcvbuf(rs), now_congested, delta);

	/* wasn't -> am congested */
	if (!rs->rs_congested && now_congested) {
		rs->rs_congested = 1;
		rds_cong_set_bit(map, port);
		rds_cong_queue_updates(map);
	}
	/* was -> aren't congested */
	/* Require more free space before reporting uncongested to prevent
	   bouncing cong/uncong state too often */
	else if (rs->rs_congested && (rs->rs_rcv_bytes < (rds_sk_rcvbuf(rs)/2))) {
		rs->rs_congested = 0;
		rds_cong_clear_bit(map, port);
		rds_cong_queue_updates(map);
	}

	/* do nothing if no change in cong state */
}

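/*
 * A changed, non-zero peer generation number indicates that the peer has
 * restarted (only the TCP transport uses this).  Reset each path's send
 * and receive sequence numbers and mark everything still on the retransmit
 * queues with RDS_MSG_FLUSH so stale messages are dropped rather than
 * replayed against the new incarnation, then remember the new number.
 */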
static void rds_conn_peer_gen_update(struct rds_connection *conn,
				     u32 peer_gen_num)
{
	int i;
	struct rds_message *rm, *tmp;
	unsigned long flags;

	WARN_ON(conn->c_trans->t_type != RDS_TRANS_TCP);
	if (peer_gen_num != 0) {
		if (conn->c_peer_gen_num != 0 &&
		    peer_gen_num != conn->c_peer_gen_num) {
			for (i = 0; i < RDS_MPATH_WORKERS; i++) {
				struct rds_conn_path *cp;

				cp = &conn->c_path[i];
				spin_lock_irqsave(&cp->cp_lock, flags);
				cp->cp_next_tx_seq = 1;
				cp->cp_next_rx_seq = 0;
				list_for_each_entry_safe(rm, tmp,
							 &cp->cp_retrans,
							 m_conn_item) {
					set_bit(RDS_MSG_FLUSH, &rm->m_flags);
				}
				spin_unlock_irqrestore(&cp->cp_lock, flags);
			}
		}
		conn->c_peer_gen_num = peer_gen_num;
	}
}

/*
 * Process all extension headers that come with this message.
 */
static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock *rs)
{
	struct rds_header *hdr = &inc->i_hdr;
	unsigned int pos = 0, type, len;
	union {
		struct rds_ext_header_version version;
		struct rds_ext_header_rdma rdma;
		struct rds_ext_header_rdma_dest rdma_dest;
	} buffer;

	while (1) {
		len = sizeof(buffer);
		type = rds_message_next_extension(hdr, &pos, &buffer, &len);
		if (type == RDS_EXTHDR_NONE)
			break;
		/* Process extension header here */
		switch (type) {
		case RDS_EXTHDR_RDMA:
			rds_rdma_unuse(rs, be32_to_cpu(buffer.rdma.h_rdma_rkey), 0);
			break;

		case RDS_EXTHDR_RDMA_DEST:
			/* We ignore the size for now. We could stash it
			 * somewhere and use it for error checking. */
			inc->i_usercopy.rdma_cookie = rds_rdma_make_cookie(
					be32_to_cpu(buffer.rdma_dest.h_rdma_rkey),
					be32_to_cpu(buffer.rdma_dest.h_rdma_offset));

			break;
		}
	}
}

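/*
 * Parse the extension headers carried by a handshake probe (ping or pong):
 * the peer's advertised path count and its generation number.  c_npaths is
 * clamped to RDS_MPATH_WORKERS and falls back to a single path when no
 * NPATHS header is present.
 */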
static void rds_recv_hs_exthdrs(struct rds_header *hdr,
				struct rds_connection *conn)
{
	unsigned int pos = 0, type, len;
	union {
		struct rds_ext_header_version version;
		u16 rds_npaths;
		u32 rds_gen_num;
	} buffer;
	u32 new_peer_gen_num = 0;

	while (1) {
		len = sizeof(buffer);
		type = rds_message_next_extension(hdr, &pos, &buffer, &len);
		if (type == RDS_EXTHDR_NONE)
			break;
		/* Process extension header here */
		switch (type) {
		case RDS_EXTHDR_NPATHS:
			conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
					       be16_to_cpu(buffer.rds_npaths));
			break;
		case RDS_EXTHDR_GEN_NUM:
			new_peer_gen_num = be32_to_cpu(buffer.rds_gen_num);
			break;
		default:
			pr_warn_ratelimited("ignoring unknown exthdr type "
					     "0x%x\n", type);
		}
	}
	/* if RDS_EXTHDR_NPATHS was not found, default to a single-path */
	conn->c_npaths = max_t(int, conn->c_npaths, 1);
	conn->c_ping_triggered = 0;
	rds_conn_peer_gen_update(conn, new_peer_gen_num);
}

/* rds_start_mprds() will synchronously start multiple paths when appropriate.
 * The scheme is based on the following rules:
 *
 * 1. rds_sendmsg on first connect attempt sends the probe ping, with the
 *    sender's npaths (s_npaths)
 * 2. rcvr of probe-ping knows the mprds_paths = min(s_npaths, r_npaths). It
 *    sends back a probe-pong with r_npaths. After that, if rcvr is the
 *    smaller ip addr, it starts rds_conn_path_connect_if_down on all
 *    mprds_paths.
 * 3. sender gets woken up, and can move to rds_conn_path_connect_if_down.
 *    If it is the smaller ipaddr, rds_conn_path_connect_if_down can be
 *    called after reception of the probe-pong on all mprds_paths.
 *    Otherwise (sender of probe-ping is not the smaller ip addr): just call
 *    rds_conn_path_connect_if_down on the hashed path. (see rule 4)
 * 4. rds_connect_worker must only trigger a connection if laddr < faddr.
 * 5. sender may end up queuing the packet on the cp. will get sent out later.
 *    when connection is completed.
 */
static void rds_start_mprds(struct rds_connection *conn)
{
	int i;
	struct rds_conn_path *cp;

	if (conn->c_npaths > 1 &&
	    rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) < 0) {
		for (i = 0; i < conn->c_npaths; i++) {
			cp = &conn->c_path[i];
			rds_conn_path_connect_if_down(cp);
		}
	}
}

/*
 * The transport must make sure that this is serialized against other
 * rx and conn reset on this specific conn.
 *
 * We currently assert that only one fragmented message will be sent
 * down a connection at a time.  This lets us reassemble in the conn
 * instead of per-flow which means that we don't have to go digging through
 * flows to tear down partial reassembly progress on conn failure and
 * we save flow lookup and locking for each frag arrival.  It does mean
 * that small messages will wait behind large ones.  Fragmenting at all
 * is only to reduce the memory consumption of pre-posted buffers.
 *
 * The caller passes in saddr and daddr instead of us getting it from the
 * conn.  This lets loopback, who only has one conn for both directions,
 * tell us which roles the addrs in the conn are playing for this message.
 */
void rds_recv_incoming(struct rds_connection *conn, struct in6_addr *saddr,
		       struct in6_addr *daddr,
		       struct rds_incoming *inc, gfp_t gfp)
{
	struct rds_sock *rs = NULL;
	struct sock *sk;
	unsigned long flags;
	struct rds_conn_path *cp;

	inc->i_conn = conn;
	inc->i_rx_jiffies = jiffies;
	if (conn->c_trans->t_mp_capable)
		cp = inc->i_conn_path;
	else
		cp = &conn->c_path[0];

	rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
		 "flags 0x%x rx_jiffies %lu\n", conn,
		 (unsigned long long)cp->cp_next_rx_seq,
		 inc,
		 (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
		 be32_to_cpu(inc->i_hdr.h_len),
		 be16_to_cpu(inc->i_hdr.h_sport),
		 be16_to_cpu(inc->i_hdr.h_dport),
		 inc->i_hdr.h_flags,
		 inc->i_rx_jiffies);

	/*
	 * Sequence numbers should only increase.  Messages get their
	 * sequence number as they're queued in a sending conn.  They
	 * can be dropped, though, if the sending socket is closed before
	 * they hit the wire.  So sequence numbers can skip forward
	 * under normal operation.  They can also drop back in the conn
	 * failover case as previously sent messages are resent down the
	 * new instance of a conn.  We drop those, otherwise we have
	 * to assume that the next valid seq does not come after a
	 * hole in the fragment stream.
	 *
	 * The headers don't give us a way to realize if fragments of
	 * a message have been dropped.  We assume that frags that arrive
	 * to a flow are part of the current message on the flow that is
	 * being reassembled.  This means that senders can't drop messages
	 * from the sending conn until all their frags are sent.
	 *
	 * XXX we could spend more on the wire to get more robust failure
	 * detection, arguably worth it to avoid data corruption.
	 */
	if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq &&
	    (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
		rds_stats_inc(s_recv_drop_old_seq);
		goto out;
	}
	cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;

	if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
		if (inc->i_hdr.h_sport == 0) {
			rdsdebug("ignore ping with 0 sport from %pI6c\n",
				 saddr);
			goto out;
		}
		rds_stats_inc(s_recv_ping);
		rds_send_pong(cp, inc->i_hdr.h_sport);
		/* if this is a handshake ping, start multipath if necessary */
		if (RDS_HS_PROBE(be16_to_cpu(inc->i_hdr.h_sport),
				 be16_to_cpu(inc->i_hdr.h_dport))) {
			rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
			rds_start_mprds(cp->cp_conn);
		}
		goto out;
	}

	if (be16_to_cpu(inc->i_hdr.h_dport) ==  RDS_FLAG_PROBE_PORT &&
	    inc->i_hdr.h_sport == 0) {
		rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
		/* if this is a handshake pong, start multipath if necessary */
		rds_start_mprds(cp->cp_conn);
		wake_up(&cp->cp_conn->c_hs_waitq);
		goto out;
	}

	rs = rds_find_bound(daddr, inc->i_hdr.h_dport, conn->c_bound_if);
	if (!rs) {
		rds_stats_inc(s_recv_drop_no_sock);
		goto out;
	}

	/* Process extension headers */
	rds_recv_incoming_exthdrs(inc, rs);

	/* We can be racing with rds_release() which marks the socket dead. */
	sk = rds_rs_to_sk(rs);

	/* serialize with rds_release -> sock_orphan */
	write_lock_irqsave(&rs->rs_recv_lock, flags);
	if (!sock_flag(sk, SOCK_DEAD)) {
		rdsdebug("adding inc %p to rs %p's recv queue\n", inc, rs);
		rds_stats_inc(s_recv_queued);
		rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
				      be32_to_cpu(inc->i_hdr.h_len),
				      inc->i_hdr.h_dport);
		if (sock_flag(sk, SOCK_RCVTSTAMP))
			inc->i_usercopy.rx_tstamp = ktime_get_real();
		rds_inc_addref(inc);
		inc->i_rx_lat_trace[RDS_MSG_RX_END] = local_clock();
		list_add_tail(&inc->i_item, &rs->rs_recv_queue);
		__rds_wake_sk_sleep(sk);
	} else {
		rds_stats_inc(s_recv_drop_dead_sock);
	}
	write_unlock_irqrestore(&rs->rs_recv_lock, flags);

out:
	if (rs)
		rds_sock_put(rs);
}
EXPORT_SYMBOL_GPL(rds_recv_incoming);

/*
 * be very careful here.  This is being called as the condition in
 * wait_event_*() needs to cope with being called many times.
 */
static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
{
	unsigned long flags;

	if (!*inc) {
		read_lock_irqsave(&rs->rs_recv_lock, flags);
		if (!list_empty(&rs->rs_recv_queue)) {
			*inc = list_entry(rs->rs_recv_queue.next,
					  struct rds_incoming,
					  i_item);
			rds_inc_addref(*inc);
		}
		read_unlock_irqrestore(&rs->rs_recv_lock, flags);
	}

	return *inc != NULL;
}

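/*
 * Returns nonzero if @inc is still on the socket's receive queue, i.e.
 * nobody else dequeued it while we were copying it to userspace.  When
 * @drop is set (the common, non-MSG_PEEK case) the message is unlinked
 * and the receive-buffer accounting is rolled back.
 */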
static int rds_still_queued(struct rds_sock *rs, struct rds_incoming *inc,
			    int drop)
{
	struct sock *sk = rds_rs_to_sk(rs);
	int ret = 0;
	unsigned long flags;

	write_lock_irqsave(&rs->rs_recv_lock, flags);
	if (!list_empty(&inc->i_item)) {
		ret = 1;
		if (drop) {
			/* XXX make sure this i_conn is reliable */
			rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
					      -be32_to_cpu(inc->i_hdr.h_len),
					      inc->i_hdr.h_dport);
			list_del_init(&inc->i_item);
			rds_inc_put(inc);
		}
	}
	write_unlock_irqrestore(&rs->rs_recv_lock, flags);

	rdsdebug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
	return ret;
}

/*
 * Pull errors off the error queue.
 * If msghdr is NULL, we will just purge the error queue.
 */
int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
{
	struct rds_notifier *notifier;
	struct rds_rdma_notify cmsg;
	unsigned int count = 0, max_messages = ~0U;
	unsigned long flags;
	LIST_HEAD(copy);
	int err = 0;

	memset(&cmsg, 0, sizeof(cmsg));	/* fill holes with zero */

	/* put_cmsg copies to user space and thus may sleep. We can't do this
	 * with rs_lock held, so first grab as many notifications as we can stuff
	 * in the user provided cmsg buffer. We don't try to copy more, to avoid
	 * losing notifications - except when the buffer is so small that it wouldn't
	 * even hold a single notification. Then we give him as much of this single
	 * msg as we can squeeze in, and set MSG_CTRUNC.
	 */
	if (msghdr) {
		max_messages = msghdr->msg_controllen / CMSG_SPACE(sizeof(cmsg));
		if (!max_messages)
			max_messages = 1;
	}

	spin_lock_irqsave(&rs->rs_lock, flags);
	while (!list_empty(&rs->rs_notify_queue) && count < max_messages) {
		notifier = list_entry(rs->rs_notify_queue.next,
				struct rds_notifier, n_list);
		list_move(&notifier->n_list, &copy);
		count++;
	}
	spin_unlock_irqrestore(&rs->rs_lock, flags);

	if (!count)
		return 0;

	while (!list_empty(&copy)) {
		notifier = list_entry(copy.next, struct rds_notifier, n_list);

		if (msghdr) {
			cmsg.user_token = notifier->n_user_token;
			cmsg.status = notifier->n_status;

			err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
				       sizeof(cmsg), &cmsg);
			if (err)
				break;
		}

		list_del_init(&notifier->n_list);
		kfree(notifier);
	}

	/* If we bailed out because of an error in put_cmsg,
	 * we may be left with one or more notifications that we
	 * didn't process. Return them to the head of the list. */
	if (!list_empty(&copy)) {
		spin_lock_irqsave(&rs->rs_lock, flags);
		list_splice(&copy, &rs->rs_notify_queue);
		spin_unlock_irqrestore(&rs->rs_lock, flags);
	}

	return err;
}

/*
 * Queue a congestion notification
 */
static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
{
	uint64_t notify = rs->rs_cong_notify;
	unsigned long flags;
	int err;

	err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
			sizeof(notify), &notify);
	if (err)
		return err;

	spin_lock_irqsave(&rs->rs_lock, flags);
	rs->rs_cong_notify &= ~notify;
	spin_unlock_irqrestore(&rs->rs_lock, flags);

	return 0;
}

/*
 * Receive any control messages.
 */
static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
			 struct rds_sock *rs)
{
	int ret = 0;

	if (inc->i_usercopy.rdma_cookie) {
		ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
				sizeof(inc->i_usercopy.rdma_cookie),
				&inc->i_usercopy.rdma_cookie);
		if (ret)
			goto out;
	}

	if ((inc->i_usercopy.rx_tstamp != 0) &&
	    sock_flag(rds_rs_to_sk(rs), SOCK_RCVTSTAMP)) {
		struct __kernel_old_timeval tv =
			ns_to_kernel_old_timeval(inc->i_usercopy.rx_tstamp);

		if (!sock_flag(rds_rs_to_sk(rs), SOCK_TSTAMP_NEW)) {
			ret = put_cmsg(msg, SOL_SOCKET, SO_TIMESTAMP_OLD,
				       sizeof(tv), &tv);
		} else {
			struct __kernel_sock_timeval sk_tv;

			sk_tv.tv_sec = tv.tv_sec;
			sk_tv.tv_usec = tv.tv_usec;

			ret = put_cmsg(msg, SOL_SOCKET, SO_TIMESTAMP_NEW,
				       sizeof(sk_tv), &sk_tv);
		}

		if (ret)
			goto out;
	}

	if (rs->rs_rx_traces) {
		struct rds_cmsg_rx_trace t;
		int i, j;

		memset(&t, 0, sizeof(t));
		inc->i_rx_lat_trace[RDS_MSG_RX_CMSG] = local_clock();
		t.rx_traces =  rs->rs_rx_traces;
		for (i = 0; i < rs->rs_rx_traces; i++) {
			j = rs->rs_rx_trace[i];
			t.rx_trace_pos[i] = j;
			t.rx_trace[i] = inc->i_rx_lat_trace[j + 1] -
					  inc->i_rx_lat_trace[j];
		}

		ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RXPATH_LATENCY,
			       sizeof(t), &t);
		if (ret)
			goto out;
	}

out:
	return ret;
}

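/*
 * If the socket has zerocopy enabled and the caller supplied a control
 * buffer with enough room, pop one batch of completed zerocopy cookies
 * off the socket's queue and deliver it as an RDS_CMSG_ZCOPY_COMPLETION
 * cmsg.  On put_cmsg() failure the batch is put back so it isn't lost.
 */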
static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
{
	struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue;
	struct rds_msg_zcopy_info *info = NULL;
	struct rds_zcopy_cookies *done;
	unsigned long flags;

	if (!msg->msg_control)
		return false;

	if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) ||
	    msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
		return false;

	spin_lock_irqsave(&q->lock, flags);
	if (!list_empty(&q->zcookie_head)) {
		info = list_entry(q->zcookie_head.next,
				  struct rds_msg_zcopy_info, rs_zcookie_next);
		list_del(&info->rs_zcookie_next);
	}
	spin_unlock_irqrestore(&q->lock, flags);
	if (!info)
		return false;
	done = &info->zcookies;
	if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
		     done)) {
		spin_lock_irqsave(&q->lock, flags);
		list_add(&info->rs_zcookie_next, &q->zcookie_head);
		spin_unlock_irqrestore(&q->lock, flags);
		return false;
	}
	kfree(info);
	return true;
}

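/*
 * recvmsg() entry point.  Pending RDMA-completion and congestion
 * notifications are delivered before any data.  Otherwise wait (unless
 * MSG_DONTWAIT) for an incoming message, copy it to the user's iovec,
 * and re-check that we didn't race another receiver before dequeueing
 * it and attaching the ancillary data.
 */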
int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
		int msg_flags)
{
	struct sock *sk = sock->sk;
	struct rds_sock *rs = rds_sk_to_rs(sk);
	long timeo;
	int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
	DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
	DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
	struct rds_incoming *inc = NULL;

	/* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
	timeo = sock_rcvtimeo(sk, nonblock);

	rdsdebug("size %zu flags 0x%x timeo %ld\n", size, msg_flags, timeo);

	if (msg_flags & MSG_OOB)
		goto out;
	if (msg_flags & MSG_ERRQUEUE)
		return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);

	while (1) {
		/* If there are pending notifications, do those - and nothing else */
		if (!list_empty(&rs->rs_notify_queue)) {
			ret = rds_notify_queue_get(rs, msg);
			break;
		}

		if (rs->rs_cong_notify) {
			ret = rds_notify_cong(rs, msg);
			break;
		}

		if (!rds_next_incoming(rs, &inc)) {
			if (nonblock) {
				bool reaped = rds_recvmsg_zcookie(rs, msg);

				ret = reaped ?  0 : -EAGAIN;
				break;
			}

			timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
					(!list_empty(&rs->rs_notify_queue) ||
					 rs->rs_cong_notify ||
					 rds_next_incoming(rs, &inc)), timeo);
			rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
				 timeo);
			if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
				continue;

			ret = timeo;
			if (ret == 0)
				ret = -ETIMEDOUT;
			break;
		}

		rdsdebug("copying inc %p from %pI6c:%u to user\n", inc,
			 &inc->i_conn->c_faddr,
			 ntohs(inc->i_hdr.h_sport));
		ret = inc->i_conn->c_trans->inc_copy_to_user(inc, &msg->msg_iter);
		if (ret < 0)
			break;

		/*
		 * if the message we just copied isn't at the head of the
		 * recv queue then someone else raced us to return it, try
		 * to get the next message.
		 */
		if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
			rds_inc_put(inc);
			inc = NULL;
			rds_stats_inc(s_recv_deliver_raced);
			iov_iter_revert(&msg->msg_iter, ret);
			continue;
		}

		if (ret < be32_to_cpu(inc->i_hdr.h_len)) {
			if (msg_flags & MSG_TRUNC)
				ret = be32_to_cpu(inc->i_hdr.h_len);
			msg->msg_flags |= MSG_TRUNC;
		}

		if (rds_cmsg_recv(inc, msg, rs)) {
			ret = -EFAULT;
			break;
		}
		rds_recvmsg_zcookie(rs, msg);

		rds_stats_inc(s_recv_delivered);

		if (msg->msg_name) {
			if (ipv6_addr_v4mapped(&inc->i_saddr)) {
				sin->sin_family = AF_INET;
				sin->sin_port = inc->i_hdr.h_sport;
				sin->sin_addr.s_addr =
				    inc->i_saddr.s6_addr32[3];
				memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
				msg->msg_namelen = sizeof(*sin);
			} else {
				sin6->sin6_family = AF_INET6;
				sin6->sin6_port = inc->i_hdr.h_sport;
				sin6->sin6_addr = inc->i_saddr;
				sin6->sin6_flowinfo = 0;
				sin6->sin6_scope_id = rs->rs_bound_scope_id;
				msg->msg_namelen = sizeof(*sin6);
			}
		}
		break;
	}

	if (inc)
		rds_inc_put(inc);

out:
	return ret;
}

/*
 * The socket is being shut down and we're asked to drop messages that were
 * queued for recvmsg.  The caller has unbound the socket so the receive path
 * won't queue any more incoming fragments or messages on the socket.
 */
void rds_clear_recv_queue(struct rds_sock *rs)
{
	struct sock *sk = rds_rs_to_sk(rs);
	struct rds_incoming *inc, *tmp;
	unsigned long flags;

	write_lock_irqsave(&rs->rs_recv_lock, flags);
	list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
		rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
				      -be32_to_cpu(inc->i_hdr.h_len),
				      inc->i_hdr.h_dport);
		list_del_init(&inc->i_item);
		rds_inc_put(inc);
	}
	write_unlock_irqrestore(&rs->rs_recv_lock, flags);
}

/*
 * inc->i_saddr isn't used here because it is only set in the receive
 * path.
 */
void rds_inc_info_copy(struct rds_incoming *inc,
		       struct rds_info_iterator *iter,
		       __be32 saddr, __be32 daddr, int flip)
{
	struct rds_info_message minfo;

	minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
	minfo.len = be32_to_cpu(inc->i_hdr.h_len);
	minfo.tos = inc->i_conn->c_tos;

	if (flip) {
		minfo.laddr = daddr;
		minfo.faddr = saddr;
		minfo.lport = inc->i_hdr.h_dport;
		minfo.fport = inc->i_hdr.h_sport;
	} else {
		minfo.laddr = saddr;
		minfo.faddr = daddr;
		minfo.lport = inc->i_hdr.h_sport;
		minfo.fport = inc->i_hdr.h_dport;
	}

	minfo.flags = 0;

	rds_info_copy(iter, &minfo, sizeof(minfo));
}

#if IS_ENABLED(CONFIG_IPV6)
void rds6_inc_info_copy(struct rds_incoming *inc,
			struct rds_info_iterator *iter,
			struct in6_addr *saddr, struct in6_addr *daddr,
			int flip)
{
	struct rds6_info_message minfo6;

	minfo6.seq = be64_to_cpu(inc->i_hdr.h_sequence);
	minfo6.len = be32_to_cpu(inc->i_hdr.h_len);
	minfo6.tos = inc->i_conn->c_tos;

	if (flip) {
		minfo6.laddr = *daddr;
		minfo6.faddr = *saddr;
		minfo6.lport = inc->i_hdr.h_dport;
		minfo6.fport = inc->i_hdr.h_sport;
	} else {
		minfo6.laddr = *saddr;
		minfo6.faddr = *daddr;
		minfo6.lport = inc->i_hdr.h_sport;
		minfo6.fport = inc->i_hdr.h_dport;
	}

	minfo6.flags = 0;

	rds_info_copy(iter, &minfo6, sizeof(minfo6));
}
#endif