cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

xprtsock.c (85870B)


      1// SPDX-License-Identifier: GPL-2.0
      2/*
      3 * linux/net/sunrpc/xprtsock.c
      4 *
      5 * Client-side transport implementation for sockets.
      6 *
      7 * TCP callback races fixes (C) 1998 Red Hat
      8 * TCP send fixes (C) 1998 Red Hat
      9 * TCP NFS related read + write fixes
     10 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
     11 *
     12 * Rewrite of large parts of the code in order to stabilize TCP stuff.
     13 * Fix behaviour when socket buffer is full.
     14 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
     15 *
     16 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
     17 *
     18 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
     19 *   <gilles.quillard@bull.net>
     20 */
     21
     22#include <linux/types.h>
     23#include <linux/string.h>
     24#include <linux/slab.h>
     25#include <linux/module.h>
     26#include <linux/capability.h>
     27#include <linux/pagemap.h>
     28#include <linux/errno.h>
     29#include <linux/socket.h>
     30#include <linux/in.h>
     31#include <linux/net.h>
     32#include <linux/mm.h>
     33#include <linux/un.h>
     34#include <linux/udp.h>
     35#include <linux/tcp.h>
     36#include <linux/sunrpc/clnt.h>
     37#include <linux/sunrpc/addr.h>
     38#include <linux/sunrpc/sched.h>
     39#include <linux/sunrpc/svcsock.h>
     40#include <linux/sunrpc/xprtsock.h>
     41#include <linux/file.h>
     42#ifdef CONFIG_SUNRPC_BACKCHANNEL
     43#include <linux/sunrpc/bc_xprt.h>
     44#endif
     45
     46#include <net/sock.h>
     47#include <net/checksum.h>
     48#include <net/udp.h>
     49#include <net/tcp.h>
     50#include <linux/bvec.h>
     51#include <linux/highmem.h>
     52#include <linux/uio.h>
     53#include <linux/sched/mm.h>
     54
     55#include <trace/events/sunrpc.h>
     56
     57#include "socklib.h"
     58#include "sunrpc.h"
     59
/*
 * Forward declarations of file-local helpers whose definitions live
 * elsewhere in this file.
 */
static void xs_close(struct rpc_xprt *xprt);
static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock);
static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
		struct socket *sock);
     64
/*
 * xprtsock tunables
 *
 * Each variable below backs one entry of xs_tunables_table[].
 */
static unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
static unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;

static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

/* Initial value of the "tcp_fin_timeout" tunable: 15 seconds, in jiffies. */
#define XS_TCP_LINGER_TO	(15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
     77
/*
 * We can register our own files under /proc/sys/sunrpc by
 * calling register_sysctl_table() again.  The files in that
 * directory become the union of all files registered there.
 *
 * We simply need to make sure that we don't collide with
 * someone else's file names!
 */

/* Bounds referenced through .extra1/.extra2 in xs_tunables_table[]. */
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

/*
 * NOTE(review): presumably holds the handle returned when sunrpc_table is
 * registered; the registration code is not in this part of the file.
 */
static struct ctl_table_header *sunrpc_table_header;

/*
 * Tentative definitions of the transport classes.
 * NOTE(review): the initialised definitions are presumably further down
 * the file - confirm.
 */
static struct xprt_class xs_local_transport;
static struct xprt_class xs_udp_transport;
static struct xprt_class xs_tcp_transport;
static struct xprt_class xs_bc_tcp_transport;
     99
/*
 * sysctl entries for the xprtsock tunables.  Every entry except
 * "tcp_fin_timeout" uses proc_dointvec_minmax(), with .extra1 and .extra2
 * pointing at the lower and upper bound respectively.  The table ends
 * with an empty sentinel entry.
 *
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static struct ctl_table xs_tunables_table[] = {
	{
		/* UDP slot table size */
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		/* TCP slot table size */
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		/* TCP maximum slot table size; has its own upper limit */
		.procname	= "tcp_max_slot_table_entries",
		.data		= &xprt_max_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_tcp_slot_table_limit
	},
	{
		/* Lowest reserved port */
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		/* Highest reserved port */
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		/* Stored in jiffies; no range check is configured */
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_jiffies,
	},
	{ },
};
    159
/*
 * Parent directory entry: hangs xs_tunables_table[] below a read-only
 * "sunrpc" directory.
 */
static struct ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{ },
};
    168
/*
 * Wait duration for a reply from the RPC portmapper (60 seconds,
 * expressed in jiffies).
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.  (2 seconds, in jiffies.)
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 *
 * This is the initial value of that timeout (3 seconds, in jiffies).
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 * (5 minutes, in jiffies.)
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)
    197
    198#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
    199# undef  RPC_DEBUG_DATA
    200# define RPCDBG_FACILITY	RPCDBG_TRANS
    201#endif
    202
    203#ifdef RPC_DEBUG_DATA
    204static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
    205{
    206	u8 *buf = (u8 *) packet;
    207	int j;
    208
    209	dprintk("RPC:       %s\n", msg);
    210	for (j = 0; j < count && j < 128; j += 4) {
    211		if (!(j & 31)) {
    212			if (j)
    213				dprintk("\n");
    214			dprintk("0x%04x ", j);
    215		}
    216		dprintk("%02x%02x%02x%02x ",
    217			buf[j], buf[j+1], buf[j+2], buf[j+3]);
    218	}
    219	dprintk("\n");
    220}
    221#else
/* Stub used when RPC_DEBUG_DATA is not defined: packet dumping is a no-op. */
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
    226#endif
    227
    228static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
    229{
    230	return (struct rpc_xprt *) sk->sk_user_data;
    231}
    232
    233static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
    234{
    235	return (struct sockaddr *) &xprt->addr;
    236}
    237
    238static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
    239{
    240	return (struct sockaddr_un *) &xprt->addr;
    241}
    242
    243static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
    244{
    245	return (struct sockaddr_in *) &xprt->addr;
    246}
    247
    248static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
    249{
    250	return (struct sockaddr_in6 *) &xprt->addr;
    251}
    252
    253static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
    254{
    255	struct sockaddr *sap = xs_addr(xprt);
    256	struct sockaddr_in6 *sin6;
    257	struct sockaddr_in *sin;
    258	struct sockaddr_un *sun;
    259	char buf[128];
    260
    261	switch (sap->sa_family) {
    262	case AF_LOCAL:
    263		sun = xs_addr_un(xprt);
    264		strlcpy(buf, sun->sun_path, sizeof(buf));
    265		xprt->address_strings[RPC_DISPLAY_ADDR] =
    266						kstrdup(buf, GFP_KERNEL);
    267		break;
    268	case AF_INET:
    269		(void)rpc_ntop(sap, buf, sizeof(buf));
    270		xprt->address_strings[RPC_DISPLAY_ADDR] =
    271						kstrdup(buf, GFP_KERNEL);
    272		sin = xs_addr_in(xprt);
    273		snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
    274		break;
    275	case AF_INET6:
    276		(void)rpc_ntop(sap, buf, sizeof(buf));
    277		xprt->address_strings[RPC_DISPLAY_ADDR] =
    278						kstrdup(buf, GFP_KERNEL);
    279		sin6 = xs_addr_in6(xprt);
    280		snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
    281		break;
    282	default:
    283		BUG();
    284	}
    285
    286	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
    287}
    288
    289static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
    290{
    291	struct sockaddr *sap = xs_addr(xprt);
    292	char buf[128];
    293
    294	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
    295	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
    296
    297	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
    298	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
    299}
    300
    301static void xs_format_peer_addresses(struct rpc_xprt *xprt,
    302				     const char *protocol,
    303				     const char *netid)
    304{
    305	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
    306	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
    307	xs_format_common_peer_addresses(xprt);
    308	xs_format_common_peer_ports(xprt);
    309}
    310
    311static void xs_update_peer_port(struct rpc_xprt *xprt)
    312{
    313	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
    314	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
    315
    316	xs_format_common_peer_ports(xprt);
    317}
    318
    319static void xs_free_peer_addresses(struct rpc_xprt *xprt)
    320{
    321	unsigned int i;
    322
    323	for (i = 0; i < RPC_DISPLAY_MAX; i++)
    324		switch (i) {
    325		case RPC_DISPLAY_PROTO:
    326		case RPC_DISPLAY_NETID:
    327			continue;
    328		default:
    329			kfree(xprt->address_strings[i]);
    330		}
    331}
    332
    333static size_t
    334xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp)
    335{
    336	size_t i,n;
    337
    338	if (!want || !(buf->flags & XDRBUF_SPARSE_PAGES))
    339		return want;
    340	n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT;
    341	for (i = 0; i < n; i++) {
    342		if (buf->pages[i])
    343			continue;
    344		buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp);
    345		if (!buf->pages[i]) {
    346			i *= PAGE_SIZE;
    347			return i > buf->page_base ? i - buf->page_base : 0;
    348		}
    349	}
    350	return want;
    351}
    352
    353static ssize_t
    354xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek)
    355{
    356	ssize_t ret;
    357	if (seek != 0)
    358		iov_iter_advance(&msg->msg_iter, seek);
    359	ret = sock_recvmsg(sock, msg, flags);
    360	return ret > 0 ? ret + seek : ret;
    361}
    362
    363static ssize_t
    364xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags,
    365		struct kvec *kvec, size_t count, size_t seek)
    366{
    367	iov_iter_kvec(&msg->msg_iter, READ, kvec, 1, count);
    368	return xs_sock_recvmsg(sock, msg, flags, seek);
    369}
    370
    371static ssize_t
    372xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags,
    373		struct bio_vec *bvec, unsigned long nr, size_t count,
    374		size_t seek)
    375{
    376	iov_iter_bvec(&msg->msg_iter, READ, bvec, nr, count);
    377	return xs_sock_recvmsg(sock, msg, flags, seek);
    378}
    379
    380static ssize_t
    381xs_read_discard(struct socket *sock, struct msghdr *msg, int flags,
    382		size_t count)
    383{
    384	iov_iter_discard(&msg->msg_iter, READ, count);
    385	return sock_recvmsg(sock, msg, flags);
    386}
    387
    388#if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE
    389static void
    390xs_flush_bvec(const struct bio_vec *bvec, size_t count, size_t seek)
    391{
    392	struct bvec_iter bi = {
    393		.bi_size = count,
    394	};
    395	struct bio_vec bv;
    396
    397	bvec_iter_advance(bvec, &bi, seek & PAGE_MASK);
    398	for_each_bvec(bv, bvec, bi, bi)
    399		flush_dcache_page(bv.bv_page);
    400}
    401#else
/*
 * Stub used when ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE evaluates to zero:
 * nothing needs flushing.
 */
static inline void
xs_flush_bvec(const struct bio_vec *bvec, size_t count, size_t seek)
{
}
    406#endif
    407
/*
 * Receive socket data into an xdr_buf, resuming @seek bytes into it.
 *
 * @sock:  socket to read from
 * @msg:   message header; its msg_iter is rewritten for every section
 * @flags: flags passed to the receive calls
 * @buf:   destination, walked as head[0], then the pages, then tail[0]
 * @count: buffer position (counted from the start of @buf) to read up to
 * @seek:  number of leading bytes of @buf to skip
 * @read:  out: offset reached minus the initial @seek
 *
 * Returns the result of the last receive call when the walk stops early
 * (short read, @count reached, MSG_EOR/MSG_TRUNC set, or a zero/negative
 * socket result), or -EMSGSIZE when all three sections were walked
 * without reaching @count.
 */
static ssize_t
xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
		struct xdr_buf *buf, size_t count, size_t seek, size_t *read)
{
	size_t want, seek_init = seek, offset = 0;
	ssize_t ret;

	/* Section 1: head[0] */
	want = min_t(size_t, count, buf->head[0].iov_len);
	if (seek < want) {
		ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek);
		if (ret <= 0)
			goto sock_err;
		/* ret already includes seek, see xs_sock_recvmsg() */
		offset += ret;
		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
			goto out;
		/* short read: no more data available right now */
		if (ret != want)
			goto out;
		seek = 0;
	} else {
		/* the whole section lies before the resume point */
		seek -= want;
		offset += want;
	}

	/* Section 2: page data; sparse pages are allocated on demand */
	want = xs_alloc_sparse_pages(
		buf, min_t(size_t, count - offset, buf->page_len),
		GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN);
	if (seek < want) {
		/* the bvec array starts at the page start, hence + page_base */
		ret = xs_read_bvec(sock, msg, flags, buf->bvec,
				xdr_buf_pagecount(buf),
				want + buf->page_base,
				seek + buf->page_base);
		if (ret <= 0)
			goto sock_err;
		xs_flush_bvec(buf->bvec, ret, seek + buf->page_base);
		/* convert back to a position within the page data */
		ret -= buf->page_base;
		offset += ret;
		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
			goto out;
		if (ret != want)
			goto out;
		seek = 0;
	} else {
		seek -= want;
		offset += want;
	}

	/* Section 3: tail[0] */
	want = min_t(size_t, count - offset, buf->tail[0].iov_len);
	if (seek < want) {
		ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek);
		if (ret <= 0)
			goto sock_err;
		offset += ret;
		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
			goto out;
		if (ret != want)
			goto out;
	} else if (offset < seek_init)
		/* never report a position before the resume point */
		offset = seek_init;
	/* every section is exhausted but count was not reached */
	ret = -EMSGSIZE;
out:
	*read = offset - seek_init;
	return ret;
sock_err:
	/*
	 * Nothing was received in the current section: add back the part
	 * of the resume offset that fell inside it before computing *read.
	 */
	offset += seek;
	goto out;
}
    474
    475static void
    476xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf)
    477{
    478	if (!transport->recv.copied) {
    479		if (buf->head[0].iov_len >= transport->recv.offset)
    480			memcpy(buf->head[0].iov_base,
    481					&transport->recv.xid,
    482					transport->recv.offset);
    483		transport->recv.copied = transport->recv.offset;
    484	}
    485}
    486
    487static bool
    488xs_read_stream_request_done(struct sock_xprt *transport)
    489{
    490	return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
    491}
    492
    493static void
    494xs_read_stream_check_eor(struct sock_xprt *transport,
    495		struct msghdr *msg)
    496{
    497	if (xs_read_stream_request_done(transport))
    498		msg->msg_flags |= MSG_EOR;
    499}
    500
    501static ssize_t
    502xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
    503		int flags, struct rpc_rqst *req)
    504{
    505	struct xdr_buf *buf = &req->rq_private_buf;
    506	size_t want, read;
    507	ssize_t ret;
    508
    509	xs_read_header(transport, buf);
    510
    511	want = transport->recv.len - transport->recv.offset;
    512	if (want != 0) {
    513		ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
    514				transport->recv.copied + want,
    515				transport->recv.copied,
    516				&read);
    517		transport->recv.offset += read;
    518		transport->recv.copied += read;
    519	}
    520
    521	if (transport->recv.offset == transport->recv.len)
    522		xs_read_stream_check_eor(transport, msg);
    523
    524	if (want == 0)
    525		return 0;
    526
    527	switch (ret) {
    528	default:
    529		break;
    530	case -EFAULT:
    531	case -EMSGSIZE:
    532		msg->msg_flags |= MSG_TRUNC;
    533		return read;
    534	case 0:
    535		return -ESHUTDOWN;
    536	}
    537	return ret < 0 ? ret : read;
    538}
    539
    540static size_t
    541xs_read_stream_headersize(bool isfrag)
    542{
    543	if (isfrag)
    544		return sizeof(__be32);
    545	return 3 * sizeof(__be32);
    546}
    547
    548static ssize_t
    549xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg,
    550		int flags, size_t want, size_t seek)
    551{
    552	struct kvec kvec = {
    553		.iov_base = &transport->recv.fraghdr,
    554		.iov_len = want,
    555	};
    556	return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek);
    557}
    558
    559#if defined(CONFIG_SUNRPC_BACKCHANNEL)
    560static ssize_t
    561xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
    562{
    563	struct rpc_xprt *xprt = &transport->xprt;
    564	struct rpc_rqst *req;
    565	ssize_t ret;
    566
    567	/* Is this transport associated with the backchannel? */
    568	if (!xprt->bc_serv)
    569		return -ESHUTDOWN;
    570
    571	/* Look up and lock the request corresponding to the given XID */
    572	req = xprt_lookup_bc_request(xprt, transport->recv.xid);
    573	if (!req) {
    574		printk(KERN_WARNING "Callback slot table overflowed\n");
    575		return -ESHUTDOWN;
    576	}
    577	if (transport->recv.copied && !req->rq_private_buf.len)
    578		return -ESHUTDOWN;
    579
    580	ret = xs_read_stream_request(transport, msg, flags, req);
    581	if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
    582		xprt_complete_bc_request(req, transport->recv.copied);
    583	else
    584		req->rq_private_buf.len = transport->recv.copied;
    585
    586	return ret;
    587}
    588#else /* CONFIG_SUNRPC_BACKCHANNEL */
/*
 * Stub used when CONFIG_SUNRPC_BACKCHANNEL is not defined: incoming calls
 * are always answered with -ESHUTDOWN.
 */
static ssize_t
xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
{
	return -ESHUTDOWN;
}
    594#endif /* CONFIG_SUNRPC_BACKCHANNEL */
    595
    596static ssize_t
    597xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
    598{
    599	struct rpc_xprt *xprt = &transport->xprt;
    600	struct rpc_rqst *req;
    601	ssize_t ret = 0;
    602
    603	/* Look up and lock the request corresponding to the given XID */
    604	spin_lock(&xprt->queue_lock);
    605	req = xprt_lookup_rqst(xprt, transport->recv.xid);
    606	if (!req || (transport->recv.copied && !req->rq_private_buf.len)) {
    607		msg->msg_flags |= MSG_TRUNC;
    608		goto out;
    609	}
    610	xprt_pin_rqst(req);
    611	spin_unlock(&xprt->queue_lock);
    612
    613	ret = xs_read_stream_request(transport, msg, flags, req);
    614
    615	spin_lock(&xprt->queue_lock);
    616	if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
    617		xprt_complete_rqst(req->rq_task, transport->recv.copied);
    618	else
    619		req->rq_private_buf.len = transport->recv.copied;
    620	xprt_unpin_rqst(req);
    621out:
    622	spin_unlock(&xprt->queue_lock);
    623	return ret;
    624}
    625
/*
 * Make one step of progress on the record-marked receive stream.
 *
 * The state is kept in transport->recv: len is the length of the fragment
 * in progress (0 when a header must be read next), offset is the position
 * within the header or fragment, and copied is the number of message
 * bytes accounted for so far.
 *
 * Returns the number of bytes consumed from the socket by this call, or a
 * negative errno.  A zero result from a receive call is reported as
 * -ESHUTDOWN.
 */
static ssize_t
xs_read_stream(struct sock_xprt *transport, int flags)
{
	struct msghdr msg = { 0 };
	size_t want, read = 0;
	ssize_t ret = 0;

	if (transport->recv.len == 0) {
		/*
		 * No fragment in progress: read its header.  A continuation
		 * fragment (copied != 0) only has the 4-byte marker; the
		 * first fragment is read as 12 bytes.
		 * NOTE(review): this presumably relies on recv.fraghdr,
		 * recv.xid and recv.calldir being laid out back to back -
		 * the struct definition is not visible here.
		 */
		want = xs_read_stream_headersize(transport->recv.copied != 0);
		ret = xs_read_stream_header(transport, &msg, flags, want,
				transport->recv.offset);
		if (ret <= 0)
			goto out_err;
		transport->recv.offset = ret;
		/* partial header: come back later for the rest */
		if (transport->recv.offset != want)
			return transport->recv.offset;
		transport->recv.len = be32_to_cpu(transport->recv.fraghdr) &
			RPC_FRAGMENT_SIZE_MASK;
		/* the marker itself is not counted in the fragment length */
		transport->recv.offset -= sizeof(transport->recv.fraghdr);
		read = ret;
	}

	switch (be32_to_cpu(transport->recv.calldir)) {
	default:
		/* unknown direction: discard the message */
		msg.msg_flags |= MSG_TRUNC;
		break;
	case RPC_CALL:
		ret = xs_read_stream_call(transport, &msg, flags);
		break;
	case RPC_REPLY:
		ret = xs_read_stream_reply(transport, &msg, flags);
	}
	if (msg.msg_flags & MSG_TRUNC) {
		/*
		 * Poison the state: calldir -1 sends later calls for this
		 * message through the default case above, and the non-zero
		 * copied makes following fragments use the short header.
		 */
		transport->recv.calldir = cpu_to_be32(-1);
		transport->recv.copied = -1;
	}
	if (ret < 0)
		goto out_err;
	read += ret;
	if (transport->recv.offset < transport->recv.len) {
		/* fragment incomplete and not truncated: wait for more data */
		if (!(msg.msg_flags & MSG_TRUNC))
			return read;
		/* truncated: drain the remainder of the fragment */
		msg.msg_flags = 0;
		ret = xs_read_discard(transport->sock, &msg, flags,
				transport->recv.len - transport->recv.offset);
		if (ret <= 0)
			goto out_err;
		transport->recv.offset += ret;
		read += ret;
		if (transport->recv.offset != transport->recv.len)
			return read;
	}
	/* fragment finished; after the last one, start a fresh message */
	if (xs_read_stream_request_done(transport)) {
		trace_xs_stream_read_request(transport);
		transport->recv.copied = 0;
	}
	transport->recv.offset = 0;
	transport->recv.len = 0;
	return read;
out_err:
	return ret != 0 ? ret : -ESHUTDOWN;
}
    688
    689static __poll_t xs_poll_socket(struct sock_xprt *transport)
    690{
    691	return transport->sock->ops->poll(transport->file, transport->sock,
    692			NULL);
    693}
    694
    695static bool xs_poll_socket_readable(struct sock_xprt *transport)
    696{
    697	__poll_t events = xs_poll_socket(transport);
    698
    699	return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP);
    700}
    701
    702static void xs_poll_check_readable(struct sock_xprt *transport)
    703{
    704
    705	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
    706	if (!xs_poll_socket_readable(transport))
    707		return;
    708	if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
    709		queue_work(xprtiod_workqueue, &transport->recv_worker);
    710}
    711
    712static void xs_stream_data_receive(struct sock_xprt *transport)
    713{
    714	size_t read = 0;
    715	ssize_t ret = 0;
    716
    717	mutex_lock(&transport->recv_mutex);
    718	if (transport->sock == NULL)
    719		goto out;
    720	for (;;) {
    721		ret = xs_read_stream(transport, MSG_DONTWAIT);
    722		if (ret < 0)
    723			break;
    724		read += ret;
    725		cond_resched();
    726	}
    727	if (ret == -ESHUTDOWN)
    728		kernel_sock_shutdown(transport->sock, SHUT_RDWR);
    729	else
    730		xs_poll_check_readable(transport);
    731out:
    732	mutex_unlock(&transport->recv_mutex);
    733	trace_xs_stream_read_data(&transport->xprt, ret, read);
    734}
    735
    736static void xs_stream_data_receive_workfn(struct work_struct *work)
    737{
    738	struct sock_xprt *transport =
    739		container_of(work, struct sock_xprt, recv_worker);
    740	unsigned int pflags = memalloc_nofs_save();
    741
    742	xs_stream_data_receive(transport);
    743	memalloc_nofs_restore(pflags);
    744}
    745
    746static void
    747xs_stream_reset_connect(struct sock_xprt *transport)
    748{
    749	transport->recv.offset = 0;
    750	transport->recv.len = 0;
    751	transport->recv.copied = 0;
    752	transport->xmit.offset = 0;
    753}
    754
    755static void
    756xs_stream_start_connect(struct sock_xprt *transport)
    757{
    758	transport->xprt.stat.connect_count++;
    759	transport->xprt.stat.connect_start = jiffies;
    760}
    761
/* msg_flags used by the send paths in this file. */
#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
    763
    764/**
    765 * xs_nospace - handle transmit was incomplete
    766 * @req: pointer to RPC request
    767 * @transport: pointer to struct sock_xprt
    768 *
    769 */
    770static int xs_nospace(struct rpc_rqst *req, struct sock_xprt *transport)
    771{
    772	struct rpc_xprt *xprt = &transport->xprt;
    773	struct sock *sk = transport->inet;
    774	int ret = -EAGAIN;
    775
    776	trace_rpc_socket_nospace(req, transport);
    777
    778	/* Protect against races with write_space */
    779	spin_lock(&xprt->transport_lock);
    780
    781	/* Don't race with disconnect */
    782	if (xprt_connected(xprt)) {
    783		/* wait for more buffer space */
    784		set_bit(XPRT_SOCK_NOSPACE, &transport->sock_state);
    785		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
    786		sk->sk_write_pending++;
    787		xprt_wait_for_buffer_space(xprt);
    788	} else
    789		ret = -ENOTCONN;
    790
    791	spin_unlock(&xprt->transport_lock);
    792	return ret;
    793}
    794
    795static int xs_sock_nospace(struct rpc_rqst *req)
    796{
    797	struct sock_xprt *transport =
    798		container_of(req->rq_xprt, struct sock_xprt, xprt);
    799	struct sock *sk = transport->inet;
    800	int ret = -EAGAIN;
    801
    802	lock_sock(sk);
    803	if (!sock_writeable(sk))
    804		ret = xs_nospace(req, transport);
    805	release_sock(sk);
    806	return ret;
    807}
    808
    809static int xs_stream_nospace(struct rpc_rqst *req, bool vm_wait)
    810{
    811	struct sock_xprt *transport =
    812		container_of(req->rq_xprt, struct sock_xprt, xprt);
    813	struct sock *sk = transport->inet;
    814	int ret = -EAGAIN;
    815
    816	if (vm_wait)
    817		return -ENOBUFS;
    818	lock_sock(sk);
    819	if (!sk_stream_memory_free(sk))
    820		ret = xs_nospace(req, transport);
    821	release_sock(sk);
    822	return ret;
    823}
    824
    825static int
    826xs_stream_prepare_request(struct rpc_rqst *req)
    827{
    828	gfp_t gfp = rpc_task_gfp_mask();
    829	int ret;
    830
    831	ret = xdr_alloc_bvec(&req->rq_snd_buf, gfp);
    832	if (ret < 0)
    833		return ret;
    834	xdr_free_bvec(&req->rq_rcv_buf);
    835	return xdr_alloc_bvec(&req->rq_rcv_buf, gfp);
    836}
    837
    838/*
    839 * Determine if the previous message in the stream was aborted before it
    840 * could complete transmission.
    841 */
    842static bool
    843xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
    844{
    845	return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
    846}
    847
    848/*
    849 * Return the stream record marker field for a record of length < 2^31-1
    850 */
    851static rpc_fraghdr
    852xs_stream_record_marker(struct xdr_buf *xdr)
    853{
    854	if (!xdr->len)
    855		return 0;
    856	return cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | (u32)xdr->len);
    857}
    858
/**
 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
 * @req: pointer to RPC request
 *
 * Sends the record marker and request data from the current transmit
 * offset.  If the previous message on this stream was left incomplete,
 * the transport is disconnected instead.
 *
 * Return values (errors are returned as negative errno values):
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_local_send_request(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	rpc_fraghdr rm = xs_stream_record_marker(xdr);
	/* total bytes to put on the wire, marker included when there is one */
	unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen;
	struct msghdr msg = {
		.msg_flags	= XS_SENDMSG_FLAGS,
	};
	bool vm_wait;
	/*
	 * NOTE(review): sent is read below without being initialised here;
	 * this presumably relies on xprt_sock_sendmsg() always storing a
	 * value through its last argument - confirm.
	 */
	unsigned int sent;
	int status;

	/* Close the stream if the previous transmission was incomplete */
	if (xs_send_request_was_aborted(transport, req)) {
		xprt_force_disconnect(xprt);
		return -ENOTCONN;
	}

	xs_pktdump("packet data:",
			req->rq_svec->iov_base, req->rq_svec->iov_len);

	/*
	 * Sampled before sending: when true, xs_stream_nospace() returns
	 * -ENOBUFS instead of waiting for socket buffer space.
	 */
	vm_wait = sk_stream_is_writeable(transport->inet) ? true : false;

	req->rq_xtime = ktime_get();
	status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
				   transport->xmit.offset, rm, &sent);
	dprintk("RPC:       %s(%u) = %d\n",
			__func__, xdr->len - transport->xmit.offset, status);

	if (likely(sent > 0) || status == 0) {
		transport->xmit.offset += sent;
		req->rq_bytes_sent = transport->xmit.offset;
		if (likely(req->rq_bytes_sent >= msglen)) {
			/* whole message is out: reset for the next one */
			req->rq_xmit_bytes_sent += transport->xmit.offset;
			transport->xmit.offset = 0;
			return 0;
		}
		/* partial send: progress was made, so wait on the socket */
		status = -EAGAIN;
		vm_wait = false;
	}

	switch (status) {
	case -EAGAIN:
		status = xs_stream_nospace(req, vm_wait);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		fallthrough;
	case -EPIPE:
		/* every other error tears the connection down */
		xprt_force_disconnect(xprt);
		status = -ENOTCONN;
	}

	return status;
}
    929
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @req: pointer to RPC request
 *
 * Sends the whole request as one message addressed to the transport's
 * peer address.  No record marker is used.
 *
 * Return values (errors are returned as negative errno values):
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *  EBADSLT:	xprt_request_get_cong() refused the request
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	struct msghdr msg = {
		.msg_name	= xs_addr(xprt),
		.msg_namelen	= xprt->addrlen,
		.msg_flags	= XS_SENDMSG_FLAGS,
	};
	/*
	 * NOTE(review): sent is read below without being initialised here;
	 * this presumably relies on xprt_sock_sendmsg() always storing a
	 * value through its last argument - confirm.
	 */
	unsigned int sent;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;

	if (!xprt_request_get_cong(xprt, req))
		return -EBADSLT;

	status = xdr_alloc_bvec(xdr, rpc_task_gfp_mask());
	if (status < 0)
		return status;
	req->rq_xtime = ktime_get();
	status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, 0, &sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len, status);

	/* firewall is blocking us, don't return -EAGAIN or we end up looping */
	if (status == -EPERM)
		goto process_status;

	/*
	 * -EAGAIN although the socket is writeable: report -ENOBUFS so the
	 * switch below does not go through xs_sock_nospace().
	 */
	if (status == -EAGAIN && sock_writeable(transport->inet))
		status = -ENOBUFS;

	if (sent > 0 || status == 0) {
		req->rq_xmit_bytes_sent += sent;
		if (sent >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

process_status:
	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_sock_nospace(req);
		break;
	case -ENETUNREACH:
	case -ENOBUFS:
	case -EPIPE:
	case -ECONNREFUSED:
	case -EPERM:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	}

	return status;
}
   1012
   1013/**
   1014 * xs_tcp_send_request - write an RPC request to a TCP socket
   1015 * @req: pointer to RPC request
   1016 *
   1017 * Return values:
   1018 *        0:	The request has been sent
   1019 *   EAGAIN:	The socket was blocked, please call again later to
   1020 *		complete the request
   1021 * ENOTCONN:	Caller needs to invoke connect logic then call again
   1022 *    other:	Some other error occurred, the request was not sent
   1023 *
   1024 * XXX: In the case of soft timeouts, should we eventually give up
   1025 *	if sendmsg is not able to make progress?
   1026 */
   1027static int xs_tcp_send_request(struct rpc_rqst *req)
   1028{
   1029	struct rpc_xprt *xprt = req->rq_xprt;
   1030	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   1031	struct xdr_buf *xdr = &req->rq_snd_buf;
   1032	rpc_fraghdr rm = xs_stream_record_marker(xdr);
   1033	unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen;
   1034	struct msghdr msg = {
   1035		.msg_flags	= XS_SENDMSG_FLAGS,
   1036	};
   1037	bool vm_wait;
   1038	unsigned int sent;
   1039	int status;
   1040
   1041	/* Close the stream if the previous transmission was incomplete */
   1042	if (xs_send_request_was_aborted(transport, req)) {
   1043		if (transport->sock != NULL)
   1044			kernel_sock_shutdown(transport->sock, SHUT_RDWR);
   1045		return -ENOTCONN;
   1046	}
   1047	if (!transport->inet)
   1048		return -ENOTCONN;
   1049
   1050	xs_pktdump("packet data:",
   1051				req->rq_svec->iov_base,
   1052				req->rq_svec->iov_len);
   1053
   1054	if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
   1055		xs_tcp_set_socket_timeouts(xprt, transport->sock);
   1056
   1057	xs_set_srcport(transport, transport->sock);
   1058
   1059	/* Continue transmitting the packet/record. We must be careful
   1060	 * to cope with writespace callbacks arriving _after_ we have
   1061	 * called sendmsg(). */
   1062	req->rq_xtime = ktime_get();
   1063	tcp_sock_set_cork(transport->inet, true);
   1064
   1065	vm_wait = sk_stream_is_writeable(transport->inet) ? true : false;
   1066
   1067	do {
   1068		status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
   1069					   transport->xmit.offset, rm, &sent);
   1070
   1071		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
   1072				xdr->len - transport->xmit.offset, status);
   1073
   1074		/* If we've sent the entire packet, immediately
   1075		 * reset the count of bytes sent. */
   1076		transport->xmit.offset += sent;
   1077		req->rq_bytes_sent = transport->xmit.offset;
   1078		if (likely(req->rq_bytes_sent >= msglen)) {
   1079			req->rq_xmit_bytes_sent += transport->xmit.offset;
   1080			transport->xmit.offset = 0;
   1081			if (atomic_long_read(&xprt->xmit_queuelen) == 1)
   1082				tcp_sock_set_cork(transport->inet, false);
   1083			return 0;
   1084		}
   1085
   1086		WARN_ON_ONCE(sent == 0 && status == 0);
   1087
   1088		if (sent > 0)
   1089			vm_wait = false;
   1090
   1091	} while (status == 0);
   1092
   1093	switch (status) {
   1094	case -ENOTSOCK:
   1095		status = -ENOTCONN;
   1096		/* Should we call xs_close() here? */
   1097		break;
   1098	case -EAGAIN:
   1099		status = xs_stream_nospace(req, vm_wait);
   1100		break;
   1101	case -ECONNRESET:
   1102	case -ECONNREFUSED:
   1103	case -ENOTCONN:
   1104	case -EADDRINUSE:
   1105	case -ENOBUFS:
   1106	case -EPIPE:
   1107		break;
   1108	default:
   1109		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
   1110			-status);
   1111	}
   1112
   1113	return status;
   1114}
   1115
   1116static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
   1117{
   1118	transport->old_data_ready = sk->sk_data_ready;
   1119	transport->old_state_change = sk->sk_state_change;
   1120	transport->old_write_space = sk->sk_write_space;
   1121	transport->old_error_report = sk->sk_error_report;
   1122}
   1123
   1124static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
   1125{
   1126	sk->sk_data_ready = transport->old_data_ready;
   1127	sk->sk_state_change = transport->old_state_change;
   1128	sk->sk_write_space = transport->old_write_space;
   1129	sk->sk_error_report = transport->old_error_report;
   1130}
   1131
   1132static void xs_sock_reset_state_flags(struct rpc_xprt *xprt)
   1133{
   1134	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   1135
   1136	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
   1137	clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state);
   1138	clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state);
   1139	clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state);
   1140	clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state);
   1141}
   1142
   1143static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr)
   1144{
   1145	set_bit(nr, &transport->sock_state);
   1146	queue_work(xprtiod_workqueue, &transport->error_worker);
   1147}
   1148
/*
 * Bump the connect cookie and clear the connection-related state:
 * XPRT_CLOSE_WAIT and XPRT_CLOSING in xprt->state, plus the per-socket
 * flags handled by xs_sock_reset_state_flags().  The bit operations are
 * bracketed by memory barriers.
 */
static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
{
	xprt->connect_cookie++;
	/* The cookie update is ordered before the flag clearing below. */
	smp_mb__before_atomic();
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	xs_sock_reset_state_flags(xprt);
	smp_mb__after_atomic();
}
   1158
   1159/**
   1160 * xs_error_report - callback to handle TCP socket state errors
   1161 * @sk: socket
   1162 *
   1163 * Note: we don't call sock_error() since there may be a rpc_task
   1164 * using the socket, and so we don't want to clear sk->sk_err.
   1165 */
   1166static void xs_error_report(struct sock *sk)
   1167{
   1168	struct sock_xprt *transport;
   1169	struct rpc_xprt *xprt;
   1170
   1171	if (!(xprt = xprt_from_sock(sk)))
   1172		return;
   1173
   1174	transport = container_of(xprt, struct sock_xprt, xprt);
   1175	transport->xprt_err = -sk->sk_err;
   1176	if (transport->xprt_err == 0)
   1177		return;
   1178	dprintk("RPC:       xs_error_report client %p, error=%d...\n",
   1179			xprt, -transport->xprt_err);
   1180	trace_rpc_socket_error(xprt, sk->sk_socket, transport->xprt_err);
   1181
   1182	/* barrier ensures xprt_err is set before XPRT_SOCK_WAKE_ERROR */
   1183	smp_mb__before_atomic();
   1184	xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR);
   1185}
   1186
   1187static void xs_reset_transport(struct sock_xprt *transport)
   1188{
   1189	struct socket *sock = transport->sock;
   1190	struct sock *sk = transport->inet;
   1191	struct rpc_xprt *xprt = &transport->xprt;
   1192	struct file *filp = transport->file;
   1193
   1194	if (sk == NULL)
   1195		return;
   1196	/*
   1197	 * Make sure we're calling this in a context from which it is safe
   1198	 * to call __fput_sync(). In practice that means rpciod and the
   1199	 * system workqueue.
   1200	 */
   1201	if (!(current->flags & PF_WQ_WORKER)) {
   1202		WARN_ON_ONCE(1);
   1203		set_bit(XPRT_CLOSE_WAIT, &xprt->state);
   1204		return;
   1205	}
   1206
   1207	if (atomic_read(&transport->xprt.swapper))
   1208		sk_clear_memalloc(sk);
   1209
   1210	kernel_sock_shutdown(sock, SHUT_RDWR);
   1211
   1212	mutex_lock(&transport->recv_mutex);
   1213	lock_sock(sk);
   1214	transport->inet = NULL;
   1215	transport->sock = NULL;
   1216	transport->file = NULL;
   1217
   1218	sk->sk_user_data = NULL;
   1219
   1220	xs_restore_old_callbacks(transport, sk);
   1221	xprt_clear_connected(xprt);
   1222	xs_sock_reset_connection_flags(xprt);
   1223	/* Reset stream record info */
   1224	xs_stream_reset_connect(transport);
   1225	release_sock(sk);
   1226	mutex_unlock(&transport->recv_mutex);
   1227
   1228	trace_rpc_socket_close(xprt, sock);
   1229	__fput_sync(filp);
   1230
   1231	xprt_disconnect_done(xprt);
   1232}
   1233
   1234/**
   1235 * xs_close - close a socket
   1236 * @xprt: transport
   1237 *
   1238 * This is used when all requests are complete; ie, no DRC state remains
   1239 * on the server we want to save.
   1240 *
   1241 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
   1242 * xs_reset_transport() zeroing the socket from underneath a writer.
   1243 */
   1244static void xs_close(struct rpc_xprt *xprt)
   1245{
   1246	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   1247
   1248	dprintk("RPC:       xs_close xprt %p\n", xprt);
   1249
   1250	xs_reset_transport(transport);
   1251	xprt->reestablish_timeout = 0;
   1252}
   1253
   1254static void xs_inject_disconnect(struct rpc_xprt *xprt)
   1255{
   1256	dprintk("RPC:       injecting transport disconnect on xprt=%p\n",
   1257		xprt);
   1258	xprt_disconnect_done(xprt);
   1259}
   1260
   1261static void xs_xprt_free(struct rpc_xprt *xprt)
   1262{
   1263	xs_free_peer_addresses(xprt);
   1264	xprt_free(xprt);
   1265}
   1266
   1267/**
   1268 * xs_destroy - prepare to shutdown a transport
   1269 * @xprt: doomed transport
   1270 *
   1271 */
   1272static void xs_destroy(struct rpc_xprt *xprt)
   1273{
   1274	struct sock_xprt *transport = container_of(xprt,
   1275			struct sock_xprt, xprt);
   1276	dprintk("RPC:       xs_destroy xprt %p\n", xprt);
   1277
   1278	cancel_delayed_work_sync(&transport->connect_worker);
   1279	xs_close(xprt);
   1280	cancel_work_sync(&transport->recv_worker);
   1281	cancel_work_sync(&transport->error_worker);
   1282	xs_xprt_free(xprt);
   1283	module_put(THIS_MODULE);
   1284}
   1285
   1286/**
   1287 * xs_udp_data_read_skb - receive callback for UDP sockets
   1288 * @xprt: transport
   1289 * @sk: socket
   1290 * @skb: skbuff
   1291 *
   1292 */
   1293static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
   1294		struct sock *sk,
   1295		struct sk_buff *skb)
   1296{
   1297	struct rpc_task *task;
   1298	struct rpc_rqst *rovr;
   1299	int repsize, copied;
   1300	u32 _xid;
   1301	__be32 *xp;
   1302
   1303	repsize = skb->len;
   1304	if (repsize < 4) {
   1305		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
   1306		return;
   1307	}
   1308
   1309	/* Copy the XID from the skb... */
   1310	xp = skb_header_pointer(skb, 0, sizeof(_xid), &_xid);
   1311	if (xp == NULL)
   1312		return;
   1313
   1314	/* Look up and lock the request corresponding to the given XID */
   1315	spin_lock(&xprt->queue_lock);
   1316	rovr = xprt_lookup_rqst(xprt, *xp);
   1317	if (!rovr)
   1318		goto out_unlock;
   1319	xprt_pin_rqst(rovr);
   1320	xprt_update_rtt(rovr->rq_task);
   1321	spin_unlock(&xprt->queue_lock);
   1322	task = rovr->rq_task;
   1323
   1324	if ((copied = rovr->rq_private_buf.buflen) > repsize)
   1325		copied = repsize;
   1326
   1327	/* Suck it into the iovec, verify checksum if not done by hw. */
   1328	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
   1329		spin_lock(&xprt->queue_lock);
   1330		__UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
   1331		goto out_unpin;
   1332	}
   1333
   1334
   1335	spin_lock(&xprt->transport_lock);
   1336	xprt_adjust_cwnd(xprt, task, copied);
   1337	spin_unlock(&xprt->transport_lock);
   1338	spin_lock(&xprt->queue_lock);
   1339	xprt_complete_rqst(task, copied);
   1340	__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
   1341out_unpin:
   1342	xprt_unpin_rqst(rovr);
   1343 out_unlock:
   1344	spin_unlock(&xprt->queue_lock);
   1345}
   1346
   1347static void xs_udp_data_receive(struct sock_xprt *transport)
   1348{
   1349	struct sk_buff *skb;
   1350	struct sock *sk;
   1351	int err;
   1352
   1353	mutex_lock(&transport->recv_mutex);
   1354	sk = transport->inet;
   1355	if (sk == NULL)
   1356		goto out;
   1357	for (;;) {
   1358		skb = skb_recv_udp(sk, MSG_DONTWAIT, &err);
   1359		if (skb == NULL)
   1360			break;
   1361		xs_udp_data_read_skb(&transport->xprt, sk, skb);
   1362		consume_skb(skb);
   1363		cond_resched();
   1364	}
   1365	xs_poll_check_readable(transport);
   1366out:
   1367	mutex_unlock(&transport->recv_mutex);
   1368}
   1369
   1370static void xs_udp_data_receive_workfn(struct work_struct *work)
   1371{
   1372	struct sock_xprt *transport =
   1373		container_of(work, struct sock_xprt, recv_worker);
   1374	unsigned int pflags = memalloc_nofs_save();
   1375
   1376	xs_udp_data_receive(transport);
   1377	memalloc_nofs_restore(pflags);
   1378}
   1379
   1380/**
   1381 * xs_data_ready - "data ready" callback for UDP sockets
   1382 * @sk: socket with data to read
   1383 *
   1384 */
   1385static void xs_data_ready(struct sock *sk)
   1386{
   1387	struct rpc_xprt *xprt;
   1388
   1389	dprintk("RPC:       xs_data_ready...\n");
   1390	xprt = xprt_from_sock(sk);
   1391	if (xprt != NULL) {
   1392		struct sock_xprt *transport = container_of(xprt,
   1393				struct sock_xprt, xprt);
   1394		transport->old_data_ready(sk);
   1395		/* Any data means we had a useful conversation, so
   1396		 * then we don't need to delay the next reconnect
   1397		 */
   1398		if (xprt->reestablish_timeout)
   1399			xprt->reestablish_timeout = 0;
   1400		if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
   1401			queue_work(xprtiod_workqueue, &transport->recv_worker);
   1402	}
   1403}
   1404
   1405/*
   1406 * Helper function to force a TCP close if the server is sending
   1407 * junk and/or it has put us in CLOSE_WAIT
   1408 */
   1409static void xs_tcp_force_close(struct rpc_xprt *xprt)
   1410{
   1411	xprt_force_disconnect(xprt);
   1412}
   1413
   1414#if defined(CONFIG_SUNRPC_BACKCHANNEL)
   1415static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
   1416{
   1417	return PAGE_SIZE;
   1418}
   1419#endif /* CONFIG_SUNRPC_BACKCHANNEL */
   1420
   1421/**
   1422 * xs_local_state_change - callback to handle AF_LOCAL socket state changes
   1423 * @sk: socket whose state has changed
   1424 *
   1425 */
   1426static void xs_local_state_change(struct sock *sk)
   1427{
   1428	struct rpc_xprt *xprt;
   1429	struct sock_xprt *transport;
   1430
   1431	if (!(xprt = xprt_from_sock(sk)))
   1432		return;
   1433	transport = container_of(xprt, struct sock_xprt, xprt);
   1434	if (sk->sk_shutdown & SHUTDOWN_MASK) {
   1435		clear_bit(XPRT_CONNECTED, &xprt->state);
   1436		/* Trigger the socket release */
   1437		xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
   1438	}
   1439}
   1440
   1441/**
   1442 * xs_tcp_state_change - callback to handle TCP socket state changes
   1443 * @sk: socket whose state has changed
   1444 *
   1445 */
   1446static void xs_tcp_state_change(struct sock *sk)
   1447{
   1448	struct rpc_xprt *xprt;
   1449	struct sock_xprt *transport;
   1450
   1451	if (!(xprt = xprt_from_sock(sk)))
   1452		return;
   1453	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
   1454	dprintk("RPC:       state %x conn %d dead %d zapped %d sk_shutdown %d\n",
   1455			sk->sk_state, xprt_connected(xprt),
   1456			sock_flag(sk, SOCK_DEAD),
   1457			sock_flag(sk, SOCK_ZAPPED),
   1458			sk->sk_shutdown);
   1459
   1460	transport = container_of(xprt, struct sock_xprt, xprt);
   1461	trace_rpc_socket_state_change(xprt, sk->sk_socket);
   1462	switch (sk->sk_state) {
   1463	case TCP_ESTABLISHED:
   1464		if (!xprt_test_and_set_connected(xprt)) {
   1465			xprt->connect_cookie++;
   1466			clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
   1467			xprt_clear_connecting(xprt);
   1468
   1469			xprt->stat.connect_count++;
   1470			xprt->stat.connect_time += (long)jiffies -
   1471						   xprt->stat.connect_start;
   1472			xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
   1473		}
   1474		break;
   1475	case TCP_FIN_WAIT1:
   1476		/* The client initiated a shutdown of the socket */
   1477		xprt->connect_cookie++;
   1478		xprt->reestablish_timeout = 0;
   1479		set_bit(XPRT_CLOSING, &xprt->state);
   1480		smp_mb__before_atomic();
   1481		clear_bit(XPRT_CONNECTED, &xprt->state);
   1482		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
   1483		smp_mb__after_atomic();
   1484		break;
   1485	case TCP_CLOSE_WAIT:
   1486		/* The server initiated a shutdown of the socket */
   1487		xprt->connect_cookie++;
   1488		clear_bit(XPRT_CONNECTED, &xprt->state);
   1489		xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
   1490		fallthrough;
   1491	case TCP_CLOSING:
   1492		/*
   1493		 * If the server closed down the connection, make sure that
   1494		 * we back off before reconnecting
   1495		 */
   1496		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
   1497			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
   1498		break;
   1499	case TCP_LAST_ACK:
   1500		set_bit(XPRT_CLOSING, &xprt->state);
   1501		smp_mb__before_atomic();
   1502		clear_bit(XPRT_CONNECTED, &xprt->state);
   1503		smp_mb__after_atomic();
   1504		break;
   1505	case TCP_CLOSE:
   1506		if (test_and_clear_bit(XPRT_SOCK_CONNECTING,
   1507					&transport->sock_state))
   1508			xprt_clear_connecting(xprt);
   1509		clear_bit(XPRT_CLOSING, &xprt->state);
   1510		/* Trigger the socket release */
   1511		xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
   1512	}
   1513}
   1514
   1515static void xs_write_space(struct sock *sk)
   1516{
   1517	struct sock_xprt *transport;
   1518	struct rpc_xprt *xprt;
   1519
   1520	if (!sk->sk_socket)
   1521		return;
   1522	clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
   1523
   1524	if (unlikely(!(xprt = xprt_from_sock(sk))))
   1525		return;
   1526	transport = container_of(xprt, struct sock_xprt, xprt);
   1527	if (!test_and_clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state))
   1528		return;
   1529	xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE);
   1530	sk->sk_write_pending--;
   1531}
   1532
   1533/**
   1534 * xs_udp_write_space - callback invoked when socket buffer space
   1535 *                             becomes available
   1536 * @sk: socket whose state has changed
   1537 *
   1538 * Called when more output buffer space is available for this socket.
   1539 * We try not to wake our writers until they can make "significant"
   1540 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
   1541 * with a bunch of small requests.
   1542 */
   1543static void xs_udp_write_space(struct sock *sk)
   1544{
   1545	/* from net/core/sock.c:sock_def_write_space */
   1546	if (sock_writeable(sk))
   1547		xs_write_space(sk);
   1548}
   1549
   1550/**
   1551 * xs_tcp_write_space - callback invoked when socket buffer space
   1552 *                             becomes available
   1553 * @sk: socket whose state has changed
   1554 *
   1555 * Called when more output buffer space is available for this socket.
   1556 * We try not to wake our writers until they can make "significant"
   1557 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
   1558 * with a bunch of small requests.
   1559 */
   1560static void xs_tcp_write_space(struct sock *sk)
   1561{
   1562	/* from net/core/stream.c:sk_stream_write_space */
   1563	if (sk_stream_is_writeable(sk))
   1564		xs_write_space(sk);
   1565}
   1566
   1567static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
   1568{
   1569	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   1570	struct sock *sk = transport->inet;
   1571
   1572	if (transport->rcvsize) {
   1573		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
   1574		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
   1575	}
   1576	if (transport->sndsize) {
   1577		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
   1578		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
   1579		sk->sk_write_space(sk);
   1580	}
   1581}
   1582
   1583/**
   1584 * xs_udp_set_buffer_size - set send and receive limits
   1585 * @xprt: generic transport
   1586 * @sndsize: requested size of send buffer, in bytes
   1587 * @rcvsize: requested size of receive buffer, in bytes
   1588 *
   1589 * Set socket send and receive buffer size limits.
   1590 */
   1591static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
   1592{
   1593	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   1594
   1595	transport->sndsize = 0;
   1596	if (sndsize)
   1597		transport->sndsize = sndsize + 1024;
   1598	transport->rcvsize = 0;
   1599	if (rcvsize)
   1600		transport->rcvsize = rcvsize + 1024;
   1601
   1602	xs_udp_do_set_buffer_size(xprt);
   1603}
   1604
   1605/**
   1606 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
   1607 * @xprt: controlling transport
   1608 * @task: task that timed out
   1609 *
   1610 * Adjust the congestion window after a retransmit timeout has occurred.
   1611 */
   1612static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task)
   1613{
   1614	spin_lock(&xprt->transport_lock);
   1615	xprt_adjust_cwnd(xprt, task, -ETIMEDOUT);
   1616	spin_unlock(&xprt->transport_lock);
   1617}
   1618
   1619static int xs_get_random_port(void)
   1620{
   1621	unsigned short min = xprt_min_resvport, max = xprt_max_resvport;
   1622	unsigned short range;
   1623	unsigned short rand;
   1624
   1625	if (max < min)
   1626		return -EADDRINUSE;
   1627	range = max - min + 1;
   1628	rand = (unsigned short) prandom_u32() % range;
   1629	return rand + min;
   1630}
   1631
   1632static unsigned short xs_sock_getport(struct socket *sock)
   1633{
   1634	struct sockaddr_storage buf;
   1635	unsigned short port = 0;
   1636
   1637	if (kernel_getsockname(sock, (struct sockaddr *)&buf) < 0)
   1638		goto out;
   1639	switch (buf.ss_family) {
   1640	case AF_INET6:
   1641		port = ntohs(((struct sockaddr_in6 *)&buf)->sin6_port);
   1642		break;
   1643	case AF_INET:
   1644		port = ntohs(((struct sockaddr_in *)&buf)->sin_port);
   1645	}
   1646out:
   1647	return port;
   1648}
   1649
   1650/**
   1651 * xs_set_port - reset the port number in the remote endpoint address
   1652 * @xprt: generic transport
   1653 * @port: new port number
   1654 *
   1655 */
   1656static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
   1657{
   1658	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
   1659
   1660	rpc_set_port(xs_addr(xprt), port);
   1661	xs_update_peer_port(xprt);
   1662}
   1663
   1664static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock)
   1665{
   1666	if (transport->srcport == 0 && transport->xprt.reuseport)
   1667		transport->srcport = xs_sock_getport(sock);
   1668}
   1669
   1670static int xs_get_srcport(struct sock_xprt *transport)
   1671{
   1672	int port = transport->srcport;
   1673
   1674	if (port == 0 && transport->xprt.resvport)
   1675		port = xs_get_random_port();
   1676	return port;
   1677}
   1678
   1679static unsigned short xs_sock_srcport(struct rpc_xprt *xprt)
   1680{
   1681	struct sock_xprt *sock = container_of(xprt, struct sock_xprt, xprt);
   1682	unsigned short ret = 0;
   1683	mutex_lock(&sock->recv_mutex);
   1684	if (sock->sock)
   1685		ret = xs_sock_getport(sock->sock);
   1686	mutex_unlock(&sock->recv_mutex);
   1687	return ret;
   1688}
   1689
   1690static int xs_sock_srcaddr(struct rpc_xprt *xprt, char *buf, size_t buflen)
   1691{
   1692	struct sock_xprt *sock = container_of(xprt, struct sock_xprt, xprt);
   1693	union {
   1694		struct sockaddr sa;
   1695		struct sockaddr_storage st;
   1696	} saddr;
   1697	int ret = -ENOTCONN;
   1698
   1699	mutex_lock(&sock->recv_mutex);
   1700	if (sock->sock) {
   1701		ret = kernel_getsockname(sock->sock, &saddr.sa);
   1702		if (ret >= 0)
   1703			ret = snprintf(buf, buflen, "%pISc", &saddr.sa);
   1704	}
   1705	mutex_unlock(&sock->recv_mutex);
   1706	return ret;
   1707}
   1708
   1709static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
   1710{
   1711	if (transport->srcport != 0)
   1712		transport->srcport = 0;
   1713	if (!transport->xprt.resvport)
   1714		return 0;
   1715	if (port <= xprt_min_resvport || port > xprt_max_resvport)
   1716		return xprt_max_resvport;
   1717	return --port;
   1718}
   1719static int xs_bind(struct sock_xprt *transport, struct socket *sock)
   1720{
   1721	struct sockaddr_storage myaddr;
   1722	int err, nloop = 0;
   1723	int port = xs_get_srcport(transport);
   1724	unsigned short last;
   1725
   1726	/*
   1727	 * If we are asking for any ephemeral port (i.e. port == 0 &&
   1728	 * transport->xprt.resvport == 0), don't bind.  Let the local
   1729	 * port selection happen implicitly when the socket is used
   1730	 * (for example at connect time).
   1731	 *
   1732	 * This ensures that we can continue to establish TCP
   1733	 * connections even when all local ephemeral ports are already
   1734	 * a part of some TCP connection.  This makes no difference
   1735	 * for UDP sockets, but also doesn't harm them.
   1736	 *
   1737	 * If we're asking for any reserved port (i.e. port == 0 &&
   1738	 * transport->xprt.resvport == 1) xs_get_srcport above will
   1739	 * ensure that port is non-zero and we will bind as needed.
   1740	 */
   1741	if (port <= 0)
   1742		return port;
   1743
   1744	memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
   1745	do {
   1746		rpc_set_port((struct sockaddr *)&myaddr, port);
   1747		err = kernel_bind(sock, (struct sockaddr *)&myaddr,
   1748				transport->xprt.addrlen);
   1749		if (err == 0) {
   1750			if (transport->xprt.reuseport)
   1751				transport->srcport = port;
   1752			break;
   1753		}
   1754		last = port;
   1755		port = xs_next_srcport(transport, port);
   1756		if (port > last)
   1757			nloop++;
   1758	} while (err == -EADDRINUSE && nloop != 2);
   1759
   1760	if (myaddr.ss_family == AF_INET)
   1761		dprintk("RPC:       %s %pI4:%u: %s (%d)\n", __func__,
   1762				&((struct sockaddr_in *)&myaddr)->sin_addr,
   1763				port, err ? "failed" : "ok", err);
   1764	else
   1765		dprintk("RPC:       %s %pI6:%u: %s (%d)\n", __func__,
   1766				&((struct sockaddr_in6 *)&myaddr)->sin6_addr,
   1767				port, err ? "failed" : "ok", err);
   1768	return err;
   1769}
   1770
   1771/*
   1772 * We don't support autobind on AF_LOCAL sockets
   1773 */
   1774static void xs_local_rpcbind(struct rpc_task *task)
   1775{
   1776	xprt_set_bound(task->tk_xprt);
   1777}
   1778
   1779static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
   1780{
   1781}
   1782
   1783#ifdef CONFIG_DEBUG_LOCK_ALLOC
   1784static struct lock_class_key xs_key[3];
   1785static struct lock_class_key xs_slock_key[3];
   1786
   1787static inline void xs_reclassify_socketu(struct socket *sock)
   1788{
   1789	struct sock *sk = sock->sk;
   1790
   1791	sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC",
   1792		&xs_slock_key[0], "sk_lock-AF_LOCAL-RPC", &xs_key[0]);
   1793}
   1794
   1795static inline void xs_reclassify_socket4(struct socket *sock)
   1796{
   1797	struct sock *sk = sock->sk;
   1798
   1799	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
   1800		&xs_slock_key[1], "sk_lock-AF_INET-RPC", &xs_key[1]);
   1801}
   1802
   1803static inline void xs_reclassify_socket6(struct socket *sock)
   1804{
   1805	struct sock *sk = sock->sk;
   1806
   1807	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
   1808		&xs_slock_key[2], "sk_lock-AF_INET6-RPC", &xs_key[2]);
   1809}
   1810
   1811static inline void xs_reclassify_socket(int family, struct socket *sock)
   1812{
   1813	if (WARN_ON_ONCE(!sock_allow_reclassification(sock->sk)))
   1814		return;
   1815
   1816	switch (family) {
   1817	case AF_LOCAL:
   1818		xs_reclassify_socketu(sock);
   1819		break;
   1820	case AF_INET:
   1821		xs_reclassify_socket4(sock);
   1822		break;
   1823	case AF_INET6:
   1824		xs_reclassify_socket6(sock);
   1825		break;
   1826	}
   1827}
   1828#else
   1829static inline void xs_reclassify_socket(int family, struct socket *sock)
   1830{
   1831}
   1832#endif
   1833
   1834static void xs_dummy_setup_socket(struct work_struct *work)
   1835{
   1836}
   1837
   1838static struct socket *xs_create_sock(struct rpc_xprt *xprt,
   1839		struct sock_xprt *transport, int family, int type,
   1840		int protocol, bool reuseport)
   1841{
   1842	struct file *filp;
   1843	struct socket *sock;
   1844	int err;
   1845
   1846	err = __sock_create(xprt->xprt_net, family, type, protocol, &sock, 1);
   1847	if (err < 0) {
   1848		dprintk("RPC:       can't create %d transport socket (%d).\n",
   1849				protocol, -err);
   1850		goto out;
   1851	}
   1852	xs_reclassify_socket(family, sock);
   1853
   1854	if (reuseport)
   1855		sock_set_reuseport(sock->sk);
   1856
   1857	err = xs_bind(transport, sock);
   1858	if (err) {
   1859		sock_release(sock);
   1860		goto out;
   1861	}
   1862
   1863	filp = sock_alloc_file(sock, O_NONBLOCK, NULL);
   1864	if (IS_ERR(filp))
   1865		return ERR_CAST(filp);
   1866	transport->file = filp;
   1867
   1868	return sock;
   1869out:
   1870	return ERR_PTR(err);
   1871}
   1872
   1873static int xs_local_finish_connecting(struct rpc_xprt *xprt,
   1874				      struct socket *sock)
   1875{
   1876	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
   1877									xprt);
   1878
   1879	if (!transport->inet) {
   1880		struct sock *sk = sock->sk;
   1881
   1882		lock_sock(sk);
   1883
   1884		xs_save_old_callbacks(transport, sk);
   1885
   1886		sk->sk_user_data = xprt;
   1887		sk->sk_data_ready = xs_data_ready;
   1888		sk->sk_write_space = xs_udp_write_space;
   1889		sk->sk_state_change = xs_local_state_change;
   1890		sk->sk_error_report = xs_error_report;
   1891
   1892		xprt_clear_connected(xprt);
   1893
   1894		/* Reset to new socket */
   1895		transport->sock = sock;
   1896		transport->inet = sk;
   1897
   1898		release_sock(sk);
   1899	}
   1900
   1901	xs_stream_start_connect(transport);
   1902
   1903	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
   1904}
   1905
   1906/**
   1907 * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
   1908 * @transport: socket transport to connect
   1909 */
   1910static int xs_local_setup_socket(struct sock_xprt *transport)
   1911{
   1912	struct rpc_xprt *xprt = &transport->xprt;
   1913	struct file *filp;
   1914	struct socket *sock;
   1915	int status;
   1916
   1917	status = __sock_create(xprt->xprt_net, AF_LOCAL,
   1918					SOCK_STREAM, 0, &sock, 1);
   1919	if (status < 0) {
   1920		dprintk("RPC:       can't create AF_LOCAL "
   1921			"transport socket (%d).\n", -status);
   1922		goto out;
   1923	}
   1924	xs_reclassify_socket(AF_LOCAL, sock);
   1925
   1926	filp = sock_alloc_file(sock, O_NONBLOCK, NULL);
   1927	if (IS_ERR(filp)) {
   1928		status = PTR_ERR(filp);
   1929		goto out;
   1930	}
   1931	transport->file = filp;
   1932
   1933	dprintk("RPC:       worker connecting xprt %p via AF_LOCAL to %s\n",
   1934			xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
   1935
   1936	status = xs_local_finish_connecting(xprt, sock);
   1937	trace_rpc_socket_connect(xprt, sock, status);
   1938	switch (status) {
   1939	case 0:
   1940		dprintk("RPC:       xprt %p connected to %s\n",
   1941				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
   1942		xprt->stat.connect_count++;
   1943		xprt->stat.connect_time += (long)jiffies -
   1944					   xprt->stat.connect_start;
   1945		xprt_set_connected(xprt);
   1946		break;
   1947	case -ENOBUFS:
   1948		break;
   1949	case -ENOENT:
   1950		dprintk("RPC:       xprt %p: socket %s does not exist\n",
   1951				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
   1952		break;
   1953	case -ECONNREFUSED:
   1954		dprintk("RPC:       xprt %p: connection refused for %s\n",
   1955				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
   1956		break;
   1957	default:
   1958		printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n",
   1959				__func__, -status,
   1960				xprt->address_strings[RPC_DISPLAY_ADDR]);
   1961	}
   1962
   1963out:
   1964	xprt_clear_connecting(xprt);
   1965	xprt_wake_pending_tasks(xprt, status);
   1966	return status;
   1967}
   1968
   1969static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task)
   1970{
   1971	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   1972	int ret;
   1973
   1974	if (transport->file)
   1975		goto force_disconnect;
   1976
   1977	if (RPC_IS_ASYNC(task)) {
   1978		/*
   1979		 * We want the AF_LOCAL connect to be resolved in the
   1980		 * filesystem namespace of the process making the rpc
   1981		 * call.  Thus we connect synchronously.
   1982		 *
   1983		 * If we want to support asynchronous AF_LOCAL calls,
   1984		 * we'll need to figure out how to pass a namespace to
   1985		 * connect.
   1986		 */
   1987		task->tk_rpc_status = -ENOTCONN;
   1988		rpc_exit(task, -ENOTCONN);
   1989		goto out_wake;
   1990	}
   1991	ret = xs_local_setup_socket(transport);
   1992	if (ret && !RPC_IS_SOFTCONN(task))
   1993		msleep_interruptible(15000);
   1994	return;
   1995force_disconnect:
   1996	xprt_force_disconnect(xprt);
   1997out_wake:
   1998	xprt_clear_connecting(xprt);
   1999	xprt_wake_pending_tasks(xprt, -ENOTCONN);
   2000}
   2001
   2002#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
   2003/*
   2004 * Note that this should be called with XPRT_LOCKED held, or recv_mutex
   2005 * held, or when we otherwise know that we have exclusive access to the
   2006 * socket, to guard against races with xs_reset_transport.
   2007 */
   2008static void xs_set_memalloc(struct rpc_xprt *xprt)
   2009{
   2010	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
   2011			xprt);
   2012
   2013	/*
   2014	 * If there's no sock, then we have nothing to set. The
   2015	 * reconnecting process will get it for us.
   2016	 */
   2017	if (!transport->inet)
   2018		return;
   2019	if (atomic_read(&xprt->swapper))
   2020		sk_set_memalloc(transport->inet);
   2021}
   2022
   2023/**
   2024 * xs_enable_swap - Tag this transport as being used for swap.
   2025 * @xprt: transport to tag
   2026 *
   2027 * Take a reference to this transport on behalf of the rpc_clnt, and
   2028 * optionally mark it for swapping if it wasn't already.
   2029 */
   2030static int
   2031xs_enable_swap(struct rpc_xprt *xprt)
   2032{
   2033	struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt);
   2034
   2035	mutex_lock(&xs->recv_mutex);
   2036	if (atomic_inc_return(&xprt->swapper) == 1 &&
   2037	    xs->inet)
   2038		sk_set_memalloc(xs->inet);
   2039	mutex_unlock(&xs->recv_mutex);
   2040	return 0;
   2041}
   2042
   2043/**
   2044 * xs_disable_swap - Untag this transport as being used for swap.
   2045 * @xprt: transport to tag
   2046 *
   2047 * Drop a "swapper" reference to this xprt on behalf of the rpc_clnt. If the
   2048 * swapper refcount goes to 0, untag the socket as a memalloc socket.
   2049 */
   2050static void
   2051xs_disable_swap(struct rpc_xprt *xprt)
   2052{
   2053	struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt);
   2054
   2055	mutex_lock(&xs->recv_mutex);
   2056	if (atomic_dec_and_test(&xprt->swapper) &&
   2057	    xs->inet)
   2058		sk_clear_memalloc(xs->inet);
   2059	mutex_unlock(&xs->recv_mutex);
   2060}
   2061#else
/* Stub used when CONFIG_SUNRPC_SWAP is not enabled: does nothing. */
static void xs_set_memalloc(struct rpc_xprt *xprt)
{
}
   2065
/*
 * Stub used when CONFIG_SUNRPC_SWAP is not enabled: the request is always
 * rejected with -EINVAL.
 */
static int
xs_enable_swap(struct rpc_xprt *xprt)
{
	return -EINVAL;
}
   2071
/* Stub used when CONFIG_SUNRPC_SWAP is not enabled: does nothing. */
static void
xs_disable_swap(struct rpc_xprt *xprt)
{
}
   2076#endif
   2077
   2078static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
   2079{
   2080	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   2081
   2082	if (!transport->inet) {
   2083		struct sock *sk = sock->sk;
   2084
   2085		lock_sock(sk);
   2086
   2087		xs_save_old_callbacks(transport, sk);
   2088
   2089		sk->sk_user_data = xprt;
   2090		sk->sk_data_ready = xs_data_ready;
   2091		sk->sk_write_space = xs_udp_write_space;
   2092
   2093		xprt_set_connected(xprt);
   2094
   2095		/* Reset to new socket */
   2096		transport->sock = sock;
   2097		transport->inet = sk;
   2098
   2099		xs_set_memalloc(xprt);
   2100
   2101		release_sock(sk);
   2102	}
   2103	xs_udp_do_set_buffer_size(xprt);
   2104
   2105	xprt->stat.connect_start = jiffies;
   2106}
   2107
   2108static void xs_udp_setup_socket(struct work_struct *work)
   2109{
   2110	struct sock_xprt *transport =
   2111		container_of(work, struct sock_xprt, connect_worker.work);
   2112	struct rpc_xprt *xprt = &transport->xprt;
   2113	struct socket *sock;
   2114	int status = -EIO;
   2115	unsigned int pflags = current->flags;
   2116
   2117	if (atomic_read(&xprt->swapper))
   2118		current->flags |= PF_MEMALLOC;
   2119	sock = xs_create_sock(xprt, transport,
   2120			xs_addr(xprt)->sa_family, SOCK_DGRAM,
   2121			IPPROTO_UDP, false);
   2122	if (IS_ERR(sock))
   2123		goto out;
   2124
   2125	dprintk("RPC:       worker connecting xprt %p via %s to "
   2126				"%s (port %s)\n", xprt,
   2127			xprt->address_strings[RPC_DISPLAY_PROTO],
   2128			xprt->address_strings[RPC_DISPLAY_ADDR],
   2129			xprt->address_strings[RPC_DISPLAY_PORT]);
   2130
   2131	xs_udp_finish_connecting(xprt, sock);
   2132	trace_rpc_socket_connect(xprt, sock, 0);
   2133	status = 0;
   2134out:
   2135	xprt_clear_connecting(xprt);
   2136	xprt_unlock_connect(xprt, transport);
   2137	xprt_wake_pending_tasks(xprt, status);
   2138	current_restore_flags(pflags, PF_MEMALLOC);
   2139}
   2140
   2141/**
   2142 * xs_tcp_shutdown - gracefully shut down a TCP socket
   2143 * @xprt: transport
   2144 *
   2145 * Initiates a graceful shutdown of the TCP socket by calling the
   2146 * equivalent of shutdown(SHUT_RDWR);
   2147 */
   2148static void xs_tcp_shutdown(struct rpc_xprt *xprt)
   2149{
   2150	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   2151	struct socket *sock = transport->sock;
   2152	int skst = transport->inet ? transport->inet->sk_state : TCP_CLOSE;
   2153
   2154	if (sock == NULL)
   2155		return;
   2156	if (!xprt->reuseport) {
   2157		xs_close(xprt);
   2158		return;
   2159	}
   2160	switch (skst) {
   2161	case TCP_FIN_WAIT1:
   2162	case TCP_FIN_WAIT2:
   2163		break;
   2164	case TCP_ESTABLISHED:
   2165	case TCP_CLOSE_WAIT:
   2166		kernel_sock_shutdown(sock, SHUT_RDWR);
   2167		trace_rpc_socket_shutdown(xprt, sock);
   2168		break;
   2169	default:
   2170		xs_reset_transport(transport);
   2171	}
   2172}
   2173
   2174static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
   2175		struct socket *sock)
   2176{
   2177	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   2178	unsigned int keepidle;
   2179	unsigned int keepcnt;
   2180	unsigned int timeo;
   2181
   2182	spin_lock(&xprt->transport_lock);
   2183	keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ);
   2184	keepcnt = xprt->timeout->to_retries + 1;
   2185	timeo = jiffies_to_msecs(xprt->timeout->to_initval) *
   2186		(xprt->timeout->to_retries + 1);
   2187	clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
   2188	spin_unlock(&xprt->transport_lock);
   2189
   2190	/* TCP Keepalive options */
   2191	sock_set_keepalive(sock->sk);
   2192	tcp_sock_set_keepidle(sock->sk, keepidle);
   2193	tcp_sock_set_keepintvl(sock->sk, keepidle);
   2194	tcp_sock_set_keepcnt(sock->sk, keepcnt);
   2195
   2196	/* TCP user timeout (see RFC5482) */
   2197	tcp_sock_set_user_timeout(sock->sk, timeo);
   2198}
   2199
   2200static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt,
   2201		unsigned long connect_timeout,
   2202		unsigned long reconnect_timeout)
   2203{
   2204	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   2205	struct rpc_timeout to;
   2206	unsigned long initval;
   2207
   2208	spin_lock(&xprt->transport_lock);
   2209	if (reconnect_timeout < xprt->max_reconnect_timeout)
   2210		xprt->max_reconnect_timeout = reconnect_timeout;
   2211	if (connect_timeout < xprt->connect_timeout) {
   2212		memcpy(&to, xprt->timeout, sizeof(to));
   2213		initval = DIV_ROUND_UP(connect_timeout, to.to_retries + 1);
   2214		/* Arbitrary lower limit */
   2215		if (initval <  XS_TCP_INIT_REEST_TO << 1)
   2216			initval = XS_TCP_INIT_REEST_TO << 1;
   2217		to.to_initval = initval;
   2218		to.to_maxval = initval;
   2219		memcpy(&transport->tcp_timeout, &to,
   2220				sizeof(transport->tcp_timeout));
   2221		xprt->timeout = &transport->tcp_timeout;
   2222		xprt->connect_timeout = connect_timeout;
   2223	}
   2224	set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
   2225	spin_unlock(&xprt->transport_lock);
   2226}
   2227
   2228static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
   2229{
   2230	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   2231
   2232	if (!transport->inet) {
   2233		struct sock *sk = sock->sk;
   2234
   2235		/* Avoid temporary address, they are bad for long-lived
   2236		 * connections such as NFS mounts.
   2237		 * RFC4941, section 3.6 suggests that:
   2238		 *    Individual applications, which have specific
   2239		 *    knowledge about the normal duration of connections,
   2240		 *    MAY override this as appropriate.
   2241		 */
   2242		if (xs_addr(xprt)->sa_family == PF_INET6) {
   2243			ip6_sock_set_addr_preferences(sk,
   2244				IPV6_PREFER_SRC_PUBLIC);
   2245		}
   2246
   2247		xs_tcp_set_socket_timeouts(xprt, sock);
   2248		tcp_sock_set_nodelay(sk);
   2249
   2250		lock_sock(sk);
   2251
   2252		xs_save_old_callbacks(transport, sk);
   2253
   2254		sk->sk_user_data = xprt;
   2255		sk->sk_data_ready = xs_data_ready;
   2256		sk->sk_state_change = xs_tcp_state_change;
   2257		sk->sk_write_space = xs_tcp_write_space;
   2258		sk->sk_error_report = xs_error_report;
   2259
   2260		/* socket options */
   2261		sock_reset_flag(sk, SOCK_LINGER);
   2262
   2263		xprt_clear_connected(xprt);
   2264
   2265		/* Reset to new socket */
   2266		transport->sock = sock;
   2267		transport->inet = sk;
   2268
   2269		release_sock(sk);
   2270	}
   2271
   2272	if (!xprt_bound(xprt))
   2273		return -ENOTCONN;
   2274
   2275	xs_set_memalloc(xprt);
   2276
   2277	xs_stream_start_connect(transport);
   2278
   2279	/* Tell the socket layer to start connecting... */
   2280	set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
   2281	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
   2282}
   2283
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @work: queued work item
 *
 * Runs as the transport's connect_worker work item; the transport is
 * recovered from @work with container_of().
 *
 * Exit paths:
 *  - already connected, or socket creation failed: clear the connecting
 *    state and release the connect lock;
 *  - connect started (0, -EINPROGRESS) or already in progress (-EALREADY):
 *    release the connect lock only, leaving the connecting state set;
 *  - any other error: wake pending tasks with the error, force-close the
 *    transport, then clear the connecting state and release the lock.
 */
static void xs_tcp_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct socket *sock = transport->sock;
	struct rpc_xprt *xprt = &transport->xprt;
	int status;
	/* Saved so PF_MEMALLOC can be restored on exit */
	unsigned int pflags = current->flags;

	if (atomic_read(&xprt->swapper))
		current->flags |= PF_MEMALLOC;

	if (xprt_connected(xprt))
		goto out;
	/*
	 * Start over with a fresh socket if a connect was already sent on
	 * the current one, or if there is no socket at all.
	 */
	if (test_and_clear_bit(XPRT_SOCK_CONNECT_SENT,
			       &transport->sock_state) ||
	    !sock) {
		xs_reset_transport(transport);
		sock = xs_create_sock(xprt, transport, xs_addr(xprt)->sa_family,
				      SOCK_STREAM, IPPROTO_TCP, true);
		if (IS_ERR(sock)) {
			xprt_wake_pending_tasks(xprt, PTR_ERR(sock));
			goto out;
		}
	}

	dprintk("RPC:       worker connecting xprt %p via %s to "
				"%s (port %s)\n", xprt,
			xprt->address_strings[RPC_DISPLAY_PROTO],
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT]);

	status = xs_tcp_finish_connecting(xprt, sock);
	trace_rpc_socket_connect(xprt, sock, status);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
	switch (status) {
	case 0:
	case -EINPROGRESS:
		/* SYN_SENT! */
		set_bit(XPRT_SOCK_CONNECT_SENT, &transport->sock_state);
		/* Never let the reestablish timeout drop below the initial value */
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
		fallthrough;
	case -EALREADY:
		/* Connect is in flight: keep the connecting state set */
		goto out_unlock;
	case -EADDRNOTAVAIL:
		/* Source port number is unavailable. Try a new one! */
		transport->srcport = 0;
		status = -EAGAIN;
		break;
	case -EINVAL:
		/* Happens, for instance, if the user specified a link
		 * local IPv6 address without a scope-id.
		 */
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETDOWN:
	case -ENETUNREACH:
	case -EHOSTUNREACH:
	case -EADDRINUSE:
	case -ENOBUFS:
		/* Reported to the waiting tasks unchanged */
		break;
	default:
		printk("%s: connect returned unhandled error %d\n",
			__func__, status);
		status = -EAGAIN;
	}

	/* xs_tcp_force_close() wakes tasks with a fixed error code.
	 * We need to wake them first to ensure the correct error code.
	 */
	xprt_wake_pending_tasks(xprt, status);
	xs_tcp_force_close(xprt);
out:
	xprt_clear_connecting(xprt);
out_unlock:
	xprt_unlock_connect(xprt, transport);
	current_restore_flags(pflags, PF_MEMALLOC);
}
   2371
   2372/**
   2373 * xs_connect - connect a socket to a remote endpoint
   2374 * @xprt: pointer to transport structure
   2375 * @task: address of RPC task that manages state of connect request
   2376 *
   2377 * TCP: If the remote end dropped the connection, delay reconnecting.
   2378 *
   2379 * UDP socket connects are synchronous, but we use a work queue anyway
   2380 * to guarantee that even unprivileged user processes can set up a
   2381 * socket on a privileged port.
   2382 *
   2383 * If a UDP socket connect fails, the delay behavior here prevents
   2384 * retry floods (hard mounts).
   2385 */
   2386static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
   2387{
   2388	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   2389	unsigned long delay = 0;
   2390
   2391	WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport));
   2392
   2393	if (transport->sock != NULL) {
   2394		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
   2395			"seconds\n", xprt, xprt->reestablish_timeout / HZ);
   2396
   2397		delay = xprt_reconnect_delay(xprt);
   2398		xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO);
   2399
   2400	} else
   2401		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
   2402
   2403	queue_delayed_work(xprtiod_workqueue,
   2404			&transport->connect_worker,
   2405			delay);
   2406}
   2407
   2408static void xs_wake_disconnect(struct sock_xprt *transport)
   2409{
   2410	if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state))
   2411		xs_tcp_force_close(&transport->xprt);
   2412}
   2413
   2414static void xs_wake_write(struct sock_xprt *transport)
   2415{
   2416	if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state))
   2417		xprt_write_space(&transport->xprt);
   2418}
   2419
   2420static void xs_wake_error(struct sock_xprt *transport)
   2421{
   2422	int sockerr;
   2423
   2424	if (!test_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
   2425		return;
   2426	mutex_lock(&transport->recv_mutex);
   2427	if (transport->sock == NULL)
   2428		goto out;
   2429	if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
   2430		goto out;
   2431	sockerr = xchg(&transport->xprt_err, 0);
   2432	if (sockerr < 0)
   2433		xprt_wake_pending_tasks(&transport->xprt, sockerr);
   2434out:
   2435	mutex_unlock(&transport->recv_mutex);
   2436}
   2437
   2438static void xs_wake_pending(struct sock_xprt *transport)
   2439{
   2440	if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state))
   2441		xprt_wake_pending_tasks(&transport->xprt, -EAGAIN);
   2442}
   2443
   2444static void xs_error_handle(struct work_struct *work)
   2445{
   2446	struct sock_xprt *transport = container_of(work,
   2447			struct sock_xprt, error_worker);
   2448
   2449	xs_wake_disconnect(transport);
   2450	xs_wake_write(transport);
   2451	xs_wake_error(transport);
   2452	xs_wake_pending(transport);
   2453}
   2454
   2455/**
   2456 * xs_local_print_stats - display AF_LOCAL socket-specific stats
   2457 * @xprt: rpc_xprt struct containing statistics
   2458 * @seq: output file
   2459 *
   2460 */
   2461static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
   2462{
   2463	long idle_time = 0;
   2464
   2465	if (xprt_connected(xprt))
   2466		idle_time = (long)(jiffies - xprt->last_used) / HZ;
   2467
   2468	seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu "
   2469			"%llu %llu %lu %llu %llu\n",
   2470			xprt->stat.bind_count,
   2471			xprt->stat.connect_count,
   2472			xprt->stat.connect_time / HZ,
   2473			idle_time,
   2474			xprt->stat.sends,
   2475			xprt->stat.recvs,
   2476			xprt->stat.bad_xids,
   2477			xprt->stat.req_u,
   2478			xprt->stat.bklog_u,
   2479			xprt->stat.max_slots,
   2480			xprt->stat.sending_u,
   2481			xprt->stat.pending_u);
   2482}
   2483
   2484/**
   2485 * xs_udp_print_stats - display UDP socket-specific stats
   2486 * @xprt: rpc_xprt struct containing statistics
   2487 * @seq: output file
   2488 *
   2489 */
   2490static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
   2491{
   2492	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   2493
   2494	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %llu %llu "
   2495			"%lu %llu %llu\n",
   2496			transport->srcport,
   2497			xprt->stat.bind_count,
   2498			xprt->stat.sends,
   2499			xprt->stat.recvs,
   2500			xprt->stat.bad_xids,
   2501			xprt->stat.req_u,
   2502			xprt->stat.bklog_u,
   2503			xprt->stat.max_slots,
   2504			xprt->stat.sending_u,
   2505			xprt->stat.pending_u);
   2506}
   2507
   2508/**
   2509 * xs_tcp_print_stats - display TCP socket-specific stats
   2510 * @xprt: rpc_xprt struct containing statistics
   2511 * @seq: output file
   2512 *
   2513 */
   2514static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
   2515{
   2516	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
   2517	long idle_time = 0;
   2518
   2519	if (xprt_connected(xprt))
   2520		idle_time = (long)(jiffies - xprt->last_used) / HZ;
   2521
   2522	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu "
   2523			"%llu %llu %lu %llu %llu\n",
   2524			transport->srcport,
   2525			xprt->stat.bind_count,
   2526			xprt->stat.connect_count,
   2527			xprt->stat.connect_time / HZ,
   2528			idle_time,
   2529			xprt->stat.sends,
   2530			xprt->stat.recvs,
   2531			xprt->stat.bad_xids,
   2532			xprt->stat.req_u,
   2533			xprt->stat.bklog_u,
   2534			xprt->stat.max_slots,
   2535			xprt->stat.sending_u,
   2536			xprt->stat.pending_u);
   2537}
   2538
   2539/*
   2540 * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason
   2541 * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
   2542 * to use the server side send routines.
   2543 */
   2544static int bc_malloc(struct rpc_task *task)
   2545{
   2546	struct rpc_rqst *rqst = task->tk_rqstp;
   2547	size_t size = rqst->rq_callsize;
   2548	struct page *page;
   2549	struct rpc_buffer *buf;
   2550
   2551	if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) {
   2552		WARN_ONCE(1, "xprtsock: large bc buffer request (size %zu)\n",
   2553			  size);
   2554		return -EINVAL;
   2555	}
   2556
   2557	page = alloc_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN);
   2558	if (!page)
   2559		return -ENOMEM;
   2560
   2561	buf = page_address(page);
   2562	buf->len = PAGE_SIZE;
   2563
   2564	rqst->rq_buffer = buf->data;
   2565	rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize;
   2566	return 0;
   2567}
   2568
   2569/*
   2570 * Free the space allocated in the bc_alloc routine
   2571 */
   2572static void bc_free(struct rpc_task *task)
   2573{
   2574	void *buffer = task->tk_rqstp->rq_buffer;
   2575	struct rpc_buffer *buf;
   2576
   2577	buf = container_of(buffer, struct rpc_buffer, data);
   2578	free_page((unsigned long)buf);
   2579}
   2580
   2581static int bc_sendto(struct rpc_rqst *req)
   2582{
   2583	struct xdr_buf *xdr = &req->rq_snd_buf;
   2584	struct sock_xprt *transport =
   2585			container_of(req->rq_xprt, struct sock_xprt, xprt);
   2586	struct msghdr msg = {
   2587		.msg_flags	= 0,
   2588	};
   2589	rpc_fraghdr marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT |
   2590					 (u32)xdr->len);
   2591	unsigned int sent = 0;
   2592	int err;
   2593
   2594	req->rq_xtime = ktime_get();
   2595	err = xdr_alloc_bvec(xdr, rpc_task_gfp_mask());
   2596	if (err < 0)
   2597		return err;
   2598	err = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, marker, &sent);
   2599	xdr_free_bvec(xdr);
   2600	if (err < 0 || sent != (xdr->len + sizeof(marker)))
   2601		return -EAGAIN;
   2602	return sent;
   2603}
   2604
   2605/**
   2606 * bc_send_request - Send a backchannel Call on a TCP socket
   2607 * @req: rpc_rqst containing Call message to be sent
   2608 *
   2609 * xpt_mutex ensures @rqstp's whole message is written to the socket
   2610 * without interruption.
   2611 *
   2612 * Return values:
   2613 *   %0 if the message was sent successfully
   2614 *   %ENOTCONN if the message was not sent
   2615 */
   2616static int bc_send_request(struct rpc_rqst *req)
   2617{
   2618	struct svc_xprt	*xprt;
   2619	int len;
   2620
   2621	/*
   2622	 * Get the server socket associated with this callback xprt
   2623	 */
   2624	xprt = req->rq_xprt->bc_xprt;
   2625
   2626	/*
   2627	 * Grab the mutex to serialize data as the connection is shared
   2628	 * with the fore channel
   2629	 */
   2630	mutex_lock(&xprt->xpt_mutex);
   2631	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
   2632		len = -ENOTCONN;
   2633	else
   2634		len = bc_sendto(req);
   2635	mutex_unlock(&xprt->xpt_mutex);
   2636
   2637	if (len > 0)
   2638		len = 0;
   2639
   2640	return len;
   2641}
   2642
/*
 * The close routine. Since this connection is client initiated, no socket
 * is torn down here; the transport is only reported as disconnected via
 * xprt_disconnect_done().
 */

static void bc_close(struct rpc_xprt *xprt)
{
	xprt_disconnect_done(xprt);
}
   2651
/*
 * The xprt destroy routine. Again, because this connection is client
 * initiated, no socket is torn down here: the transport is freed with
 * xs_xprt_free() and a module reference is dropped.
 */

static void bc_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC:       bc_destroy xprt %p\n", xprt);

	xs_xprt_free(xprt);
	module_put(THIS_MODULE);
}
   2664
/*
 * Transport operations for AF_LOCAL sockets; installed by xs_setup_local().
 * Connects go through xs_local_connect() rather than xs_connect().
 */
static const struct rpc_xprt_ops xs_local_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xprt_release_xprt,
	.alloc_slot		= xprt_alloc_slot,
	.free_slot		= xprt_free_slot,
	.rpcbind		= xs_local_rpcbind,
	.set_port		= xs_local_set_port,
	.connect		= xs_local_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.prepare_request	= xs_stream_prepare_request,
	.send_request		= xs_local_send_request,
	.wait_for_reply_request	= xprt_wait_for_reply_request_def,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_local_print_stats,
	.enable_swap		= xs_enable_swap,
	.disable_swap		= xs_disable_swap,
};
   2684
/*
 * Transport operations for UDP sockets; installed by xs_setup_udp().
 * Uses the congestion-aware (_cong) reserve/release helpers and the
 * RTT-based reply wait.
 */
static const struct rpc_xprt_ops xs_udp_ops = {
	.set_buffer_size	= xs_udp_set_buffer_size,
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.alloc_slot		= xprt_alloc_slot,
	.free_slot		= xprt_free_slot,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.get_srcaddr		= xs_sock_srcaddr,
	.get_srcport		= xs_sock_srcport,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_udp_send_request,
	.wait_for_reply_request	= xprt_wait_for_reply_request_rtt,
	.timer			= xs_udp_timer,
	.release_request	= xprt_release_rqst_cong,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_udp_print_stats,
	.enable_swap		= xs_enable_swap,
	.disable_swap		= xs_disable_swap,
	.inject_disconnect	= xs_inject_disconnect,
};
   2709
/*
 * Transport operations built on the xs_tcp_* handlers. ->close maps to
 * xs_tcp_shutdown(); the backchannel hooks are only present when
 * CONFIG_SUNRPC_BACKCHANNEL is defined.
 */
static const struct rpc_xprt_ops xs_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xprt_release_xprt,
	.alloc_slot		= xprt_alloc_slot,
	.free_slot		= xprt_free_slot,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.get_srcaddr		= xs_sock_srcaddr,
	.get_srcport		= xs_sock_srcport,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.prepare_request	= xs_stream_prepare_request,
	.send_request		= xs_tcp_send_request,
	.wait_for_reply_request	= xprt_wait_for_reply_request_def,
	.close			= xs_tcp_shutdown,
	.destroy		= xs_destroy,
	.set_connect_timeout	= xs_tcp_set_connect_timeout,
	.print_stats		= xs_tcp_print_stats,
	.enable_swap		= xs_enable_swap,
	.disable_swap		= xs_disable_swap,
	.inject_disconnect	= xs_inject_disconnect,
#ifdef CONFIG_SUNRPC_BACKCHANNEL
	.bc_setup		= xprt_setup_bc,
	.bc_maxpayload		= xs_tcp_bc_maxpayload,
	.bc_num_slots		= xprt_bc_max_slots,
	.bc_free_rqst		= xprt_free_bc_rqst,
	.bc_destroy		= xprt_destroy_bc,
#endif
};
   2740
/*
 * The rpc_xprt_ops for the server backchannel.
 *
 * Buffers come from bc_malloc()/bc_free() and requests are sent with
 * bc_send_request(); no ->connect, ->rpcbind or ->set_port handlers are
 * provided.
 */

static const struct rpc_xprt_ops bc_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xprt_release_xprt,
	.alloc_slot		= xprt_alloc_slot,
	.free_slot		= xprt_free_slot,
	.buf_alloc		= bc_malloc,
	.buf_free		= bc_free,
	.send_request		= bc_send_request,
	.wait_for_reply_request	= xprt_wait_for_reply_request_def,
	.close			= bc_close,
	.destroy		= bc_destroy,
	.print_stats		= xs_tcp_print_stats,
	.enable_swap		= xs_enable_swap,
	.disable_swap		= xs_disable_swap,
	.inject_disconnect	= xs_inject_disconnect,
};
   2761
   2762static int xs_init_anyaddr(const int family, struct sockaddr *sap)
   2763{
   2764	static const struct sockaddr_in sin = {
   2765		.sin_family		= AF_INET,
   2766		.sin_addr.s_addr	= htonl(INADDR_ANY),
   2767	};
   2768	static const struct sockaddr_in6 sin6 = {
   2769		.sin6_family		= AF_INET6,
   2770		.sin6_addr		= IN6ADDR_ANY_INIT,
   2771	};
   2772
   2773	switch (family) {
   2774	case AF_LOCAL:
   2775		break;
   2776	case AF_INET:
   2777		memcpy(sap, &sin, sizeof(sin));
   2778		break;
   2779	case AF_INET6:
   2780		memcpy(sap, &sin6, sizeof(sin6));
   2781		break;
   2782	default:
   2783		dprintk("RPC:       %s: Bad address family\n", __func__);
   2784		return -EAFNOSUPPORT;
   2785	}
   2786	return 0;
   2787}
   2788
   2789static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
   2790				      unsigned int slot_table_size,
   2791				      unsigned int max_slot_table_size)
   2792{
   2793	struct rpc_xprt *xprt;
   2794	struct sock_xprt *new;
   2795
   2796	if (args->addrlen > sizeof(xprt->addr)) {
   2797		dprintk("RPC:       xs_setup_xprt: address too large\n");
   2798		return ERR_PTR(-EBADF);
   2799	}
   2800
   2801	xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
   2802			max_slot_table_size);
   2803	if (xprt == NULL) {
   2804		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
   2805				"rpc_xprt\n");
   2806		return ERR_PTR(-ENOMEM);
   2807	}
   2808
   2809	new = container_of(xprt, struct sock_xprt, xprt);
   2810	mutex_init(&new->recv_mutex);
   2811	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
   2812	xprt->addrlen = args->addrlen;
   2813	if (args->srcaddr)
   2814		memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
   2815	else {
   2816		int err;
   2817		err = xs_init_anyaddr(args->dstaddr->sa_family,
   2818					(struct sockaddr *)&new->srcaddr);
   2819		if (err != 0) {
   2820			xprt_free(xprt);
   2821			return ERR_PTR(err);
   2822		}
   2823	}
   2824
   2825	return xprt;
   2826}
   2827
/*
 * Default RPC timeout installed by xs_setup_local(): initial and maximum
 * values of 10 * HZ jiffies, with 2 retries.
 */
static const struct rpc_timeout xs_local_default_timeout = {
	.to_initval = 10 * HZ,
	.to_maxval = 10 * HZ,
	.to_retries = 2,
};
   2833
   2834/**
   2835 * xs_setup_local - Set up transport to use an AF_LOCAL socket
   2836 * @args: rpc transport creation arguments
   2837 *
   2838 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
   2839 */
   2840static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
   2841{
   2842	struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr;
   2843	struct sock_xprt *transport;
   2844	struct rpc_xprt *xprt;
   2845	struct rpc_xprt *ret;
   2846
   2847	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
   2848			xprt_max_tcp_slot_table_entries);
   2849	if (IS_ERR(xprt))
   2850		return xprt;
   2851	transport = container_of(xprt, struct sock_xprt, xprt);
   2852
   2853	xprt->prot = 0;
   2854	xprt->xprt_class = &xs_local_transport;
   2855	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
   2856
   2857	xprt->bind_timeout = XS_BIND_TO;
   2858	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
   2859	xprt->idle_timeout = XS_IDLE_DISC_TO;
   2860
   2861	xprt->ops = &xs_local_ops;
   2862	xprt->timeout = &xs_local_default_timeout;
   2863
   2864	INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
   2865	INIT_WORK(&transport->error_worker, xs_error_handle);
   2866	INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
   2867
   2868	switch (sun->sun_family) {
   2869	case AF_LOCAL:
   2870		if (sun->sun_path[0] != '/') {
   2871			dprintk("RPC:       bad AF_LOCAL address: %s\n",
   2872					sun->sun_path);
   2873			ret = ERR_PTR(-EINVAL);
   2874			goto out_err;
   2875		}
   2876		xprt_set_bound(xprt);
   2877		xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL);
   2878		break;
   2879	default:
   2880		ret = ERR_PTR(-EAFNOSUPPORT);
   2881		goto out_err;
   2882	}
   2883
   2884	dprintk("RPC:       set up xprt to %s via AF_LOCAL\n",
   2885			xprt->address_strings[RPC_DISPLAY_ADDR]);
   2886
   2887	if (try_module_get(THIS_MODULE))
   2888		return xprt;
   2889	ret = ERR_PTR(-EINVAL);
   2890out_err:
   2891	xs_xprt_free(xprt);
   2892	return ret;
   2893}
   2894
   2895static const struct rpc_timeout xs_udp_default_timeout = {
   2896	.to_initval = 5 * HZ,
   2897	.to_maxval = 30 * HZ,
   2898	.to_increment = 5 * HZ,
   2899	.to_retries = 5,
   2900};
   2901
   2902/**
   2903 * xs_setup_udp - Set up transport to use a UDP socket
   2904 * @args: rpc transport creation arguments
   2905 *
   2906 */
   2907static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
   2908{
   2909	struct sockaddr *addr = args->dstaddr;
   2910	struct rpc_xprt *xprt;
   2911	struct sock_xprt *transport;
   2912	struct rpc_xprt *ret;
   2913
   2914	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
   2915			xprt_udp_slot_table_entries);
   2916	if (IS_ERR(xprt))
   2917		return xprt;
   2918	transport = container_of(xprt, struct sock_xprt, xprt);
   2919
   2920	xprt->prot = IPPROTO_UDP;
   2921	xprt->xprt_class = &xs_udp_transport;
   2922	/* XXX: header size can vary due to auth type, IPv6, etc. */
   2923	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
   2924
   2925	xprt->bind_timeout = XS_BIND_TO;
   2926	xprt->reestablish_timeout = XS_UDP_REEST_TO;
   2927	xprt->idle_timeout = XS_IDLE_DISC_TO;
   2928
   2929	xprt->ops = &xs_udp_ops;
   2930
   2931	xprt->timeout = &xs_udp_default_timeout;
   2932
   2933	INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
   2934	INIT_WORK(&transport->error_worker, xs_error_handle);
   2935	INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
   2936
   2937	switch (addr->sa_family) {
   2938	case AF_INET:
   2939		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
   2940			xprt_set_bound(xprt);
   2941
   2942		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
   2943		break;
   2944	case AF_INET6:
   2945		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
   2946			xprt_set_bound(xprt);
   2947
   2948		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
   2949		break;
   2950	default:
   2951		ret = ERR_PTR(-EAFNOSUPPORT);
   2952		goto out_err;
   2953	}
   2954
   2955	if (xprt_bound(xprt))
   2956		dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
   2957				xprt->address_strings[RPC_DISPLAY_ADDR],
   2958				xprt->address_strings[RPC_DISPLAY_PORT],
   2959				xprt->address_strings[RPC_DISPLAY_PROTO]);
   2960	else
   2961		dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
   2962				xprt->address_strings[RPC_DISPLAY_ADDR],
   2963				xprt->address_strings[RPC_DISPLAY_PROTO]);
   2964
   2965	if (try_module_get(THIS_MODULE))
   2966		return xprt;
   2967	ret = ERR_PTR(-EINVAL);
   2968out_err:
   2969	xs_xprt_free(xprt);
   2970	return ret;
   2971}
   2972
   2973static const struct rpc_timeout xs_tcp_default_timeout = {
   2974	.to_initval = 60 * HZ,
   2975	.to_maxval = 60 * HZ,
   2976	.to_retries = 2,
   2977};
   2978
   2979/**
   2980 * xs_setup_tcp - Set up transport to use a TCP socket
   2981 * @args: rpc transport creation arguments
   2982 *
   2983 */
   2984static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
   2985{
   2986	struct sockaddr *addr = args->dstaddr;
   2987	struct rpc_xprt *xprt;
   2988	struct sock_xprt *transport;
   2989	struct rpc_xprt *ret;
   2990	unsigned int max_slot_table_size = xprt_max_tcp_slot_table_entries;
   2991
   2992	if (args->flags & XPRT_CREATE_INFINITE_SLOTS)
   2993		max_slot_table_size = RPC_MAX_SLOT_TABLE_LIMIT;
   2994
   2995	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
   2996			max_slot_table_size);
   2997	if (IS_ERR(xprt))
   2998		return xprt;
   2999	transport = container_of(xprt, struct sock_xprt, xprt);
   3000
   3001	xprt->prot = IPPROTO_TCP;
   3002	xprt->xprt_class = &xs_tcp_transport;
   3003	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
   3004
   3005	xprt->bind_timeout = XS_BIND_TO;
   3006	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
   3007	xprt->idle_timeout = XS_IDLE_DISC_TO;
   3008
   3009	xprt->ops = &xs_tcp_ops;
   3010	xprt->timeout = &xs_tcp_default_timeout;
   3011
   3012	xprt->max_reconnect_timeout = xprt->timeout->to_maxval;
   3013	xprt->connect_timeout = xprt->timeout->to_initval *
   3014		(xprt->timeout->to_retries + 1);
   3015
   3016	INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
   3017	INIT_WORK(&transport->error_worker, xs_error_handle);
   3018	INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
   3019
   3020	switch (addr->sa_family) {
   3021	case AF_INET:
   3022		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
   3023			xprt_set_bound(xprt);
   3024
   3025		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
   3026		break;
   3027	case AF_INET6:
   3028		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
   3029			xprt_set_bound(xprt);
   3030
   3031		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
   3032		break;
   3033	default:
   3034		ret = ERR_PTR(-EAFNOSUPPORT);
   3035		goto out_err;
   3036	}
   3037
   3038	if (xprt_bound(xprt))
   3039		dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
   3040				xprt->address_strings[RPC_DISPLAY_ADDR],
   3041				xprt->address_strings[RPC_DISPLAY_PORT],
   3042				xprt->address_strings[RPC_DISPLAY_PROTO]);
   3043	else
   3044		dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
   3045				xprt->address_strings[RPC_DISPLAY_ADDR],
   3046				xprt->address_strings[RPC_DISPLAY_PROTO]);
   3047
   3048	if (try_module_get(THIS_MODULE))
   3049		return xprt;
   3050	ret = ERR_PTR(-EINVAL);
   3051out_err:
   3052	xs_xprt_free(xprt);
   3053	return ret;
   3054}
   3055
   3056/**
   3057 * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
   3058 * @args: rpc transport creation arguments
   3059 *
   3060 */
   3061static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
   3062{
   3063	struct sockaddr *addr = args->dstaddr;
   3064	struct rpc_xprt *xprt;
   3065	struct sock_xprt *transport;
   3066	struct svc_sock *bc_sock;
   3067	struct rpc_xprt *ret;
   3068
   3069	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
   3070			xprt_tcp_slot_table_entries);
   3071	if (IS_ERR(xprt))
   3072		return xprt;
   3073	transport = container_of(xprt, struct sock_xprt, xprt);
   3074
   3075	xprt->prot = IPPROTO_TCP;
   3076	xprt->xprt_class = &xs_bc_tcp_transport;
   3077	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
   3078	xprt->timeout = &xs_tcp_default_timeout;
   3079
   3080	/* backchannel */
   3081	xprt_set_bound(xprt);
   3082	xprt->bind_timeout = 0;
   3083	xprt->reestablish_timeout = 0;
   3084	xprt->idle_timeout = 0;
   3085
   3086	xprt->ops = &bc_tcp_ops;
   3087
   3088	switch (addr->sa_family) {
   3089	case AF_INET:
   3090		xs_format_peer_addresses(xprt, "tcp",
   3091					 RPCBIND_NETID_TCP);
   3092		break;
   3093	case AF_INET6:
   3094		xs_format_peer_addresses(xprt, "tcp",
   3095				   RPCBIND_NETID_TCP6);
   3096		break;
   3097	default:
   3098		ret = ERR_PTR(-EAFNOSUPPORT);
   3099		goto out_err;
   3100	}
   3101
   3102	dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
   3103			xprt->address_strings[RPC_DISPLAY_ADDR],
   3104			xprt->address_strings[RPC_DISPLAY_PORT],
   3105			xprt->address_strings[RPC_DISPLAY_PROTO]);
   3106
   3107	/*
   3108	 * Once we've associated a backchannel xprt with a connection,
   3109	 * we want to keep it around as long as the connection lasts,
   3110	 * in case we need to start using it for a backchannel again;
   3111	 * this reference won't be dropped until bc_xprt is destroyed.
   3112	 */
   3113	xprt_get(xprt);
   3114	args->bc_xprt->xpt_bc_xprt = xprt;
   3115	xprt->bc_xprt = args->bc_xprt;
   3116	bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
   3117	transport->sock = bc_sock->sk_sock;
   3118	transport->inet = bc_sock->sk_sk;
   3119
   3120	/*
   3121	 * Since we don't want connections for the backchannel, we set
   3122	 * the xprt status to connected
   3123	 */
   3124	xprt_set_connected(xprt);
   3125
   3126	if (try_module_get(THIS_MODULE))
   3127		return xprt;
   3128
   3129	args->bc_xprt->xpt_bc_xprt = NULL;
   3130	args->bc_xprt->xpt_bc_xps = NULL;
   3131	xprt_put(xprt);
   3132	ret = ERR_PTR(-EINVAL);
   3133out_err:
   3134	xs_xprt_free(xprt);
   3135	return ret;
   3136}
   3137
   3138static struct xprt_class	xs_local_transport = {
   3139	.list		= LIST_HEAD_INIT(xs_local_transport.list),
   3140	.name		= "named UNIX socket",
   3141	.owner		= THIS_MODULE,
   3142	.ident		= XPRT_TRANSPORT_LOCAL,
   3143	.setup		= xs_setup_local,
   3144	.netid		= { "" },
   3145};
   3146
   3147static struct xprt_class	xs_udp_transport = {
   3148	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
   3149	.name		= "udp",
   3150	.owner		= THIS_MODULE,
   3151	.ident		= XPRT_TRANSPORT_UDP,
   3152	.setup		= xs_setup_udp,
   3153	.netid		= { "udp", "udp6", "" },
   3154};
   3155
   3156static struct xprt_class	xs_tcp_transport = {
   3157	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
   3158	.name		= "tcp",
   3159	.owner		= THIS_MODULE,
   3160	.ident		= XPRT_TRANSPORT_TCP,
   3161	.setup		= xs_setup_tcp,
   3162	.netid		= { "tcp", "tcp6", "" },
   3163};
   3164
   3165static struct xprt_class	xs_bc_tcp_transport = {
   3166	.list		= LIST_HEAD_INIT(xs_bc_tcp_transport.list),
   3167	.name		= "tcp NFSv4.1 backchannel",
   3168	.owner		= THIS_MODULE,
   3169	.ident		= XPRT_TRANSPORT_BC_TCP,
   3170	.setup		= xs_setup_bc_tcp,
   3171	.netid		= { "" },
   3172};
   3173
   3174/**
   3175 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
   3176 *
   3177 */
   3178int init_socket_xprt(void)
   3179{
   3180	if (!sunrpc_table_header)
   3181		sunrpc_table_header = register_sysctl_table(sunrpc_table);
   3182
   3183	xprt_register_transport(&xs_local_transport);
   3184	xprt_register_transport(&xs_udp_transport);
   3185	xprt_register_transport(&xs_tcp_transport);
   3186	xprt_register_transport(&xs_bc_tcp_transport);
   3187
   3188	return 0;
   3189}
   3190
   3191/**
   3192 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
   3193 *
   3194 */
   3195void cleanup_socket_xprt(void)
   3196{
   3197	if (sunrpc_table_header) {
   3198		unregister_sysctl_table(sunrpc_table_header);
   3199		sunrpc_table_header = NULL;
   3200	}
   3201
   3202	xprt_unregister_transport(&xs_local_transport);
   3203	xprt_unregister_transport(&xs_udp_transport);
   3204	xprt_unregister_transport(&xs_tcp_transport);
   3205	xprt_unregister_transport(&xs_bc_tcp_transport);
   3206}
   3207
   3208static int param_set_portnr(const char *val, const struct kernel_param *kp)
   3209{
   3210	return param_set_uint_minmax(val, kp,
   3211			RPC_MIN_RESVPORT,
   3212			RPC_MAX_RESVPORT);
   3213}
   3214
   3215static const struct kernel_param_ops param_ops_portnr = {
   3216	.set = param_set_portnr,
   3217	.get = param_get_uint,
   3218};
   3219
   3220#define param_check_portnr(name, p) \
   3221	__param_check(name, p, unsigned int);
   3222
   3223module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
   3224module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
   3225
   3226static int param_set_slot_table_size(const char *val,
   3227				     const struct kernel_param *kp)
   3228{
   3229	return param_set_uint_minmax(val, kp,
   3230			RPC_MIN_SLOT_TABLE,
   3231			RPC_MAX_SLOT_TABLE);
   3232}
   3233
   3234static const struct kernel_param_ops param_ops_slot_table_size = {
   3235	.set = param_set_slot_table_size,
   3236	.get = param_get_uint,
   3237};
   3238
   3239#define param_check_slot_table_size(name, p) \
   3240	__param_check(name, p, unsigned int);
   3241
   3242static int param_set_max_slot_table_size(const char *val,
   3243				     const struct kernel_param *kp)
   3244{
   3245	return param_set_uint_minmax(val, kp,
   3246			RPC_MIN_SLOT_TABLE,
   3247			RPC_MAX_SLOT_TABLE_LIMIT);
   3248}
   3249
   3250static const struct kernel_param_ops param_ops_max_slot_table_size = {
   3251	.set = param_set_max_slot_table_size,
   3252	.get = param_get_uint,
   3253};
   3254
   3255#define param_check_max_slot_table_size(name, p) \
   3256	__param_check(name, p, unsigned int);
   3257
   3258module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
   3259		   slot_table_size, 0644);
   3260module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
   3261		   max_slot_table_size, 0644);
   3262module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
   3263		   slot_table_size, 0644);