cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

clnt.c (75480B)


      1// SPDX-License-Identifier: GPL-2.0-only
      2/*
      3 *  linux/net/sunrpc/clnt.c
      4 *
      5 *  This file contains the high-level RPC interface.
      6 *  It is modeled as a finite state machine to support both synchronous
      7 *  and asynchronous requests.
      8 *
      9 *  -	RPC header generation and argument serialization.
     10 *  -	Credential refresh.
     11 *  -	TCP connect handling.
     12 *  -	Retry of operation when it is suspected the operation failed because
     13 *	of uid squashing on the server, or when the credentials were stale
     14 *	and need to be refreshed, or when a packet was damaged in transit.
     15 *	This may be have to be moved to the VFS layer.
     16 *
     17 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
     18 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
     19 */
     20
     21
     22#include <linux/module.h>
     23#include <linux/types.h>
     24#include <linux/kallsyms.h>
     25#include <linux/mm.h>
     26#include <linux/namei.h>
     27#include <linux/mount.h>
     28#include <linux/slab.h>
     29#include <linux/rcupdate.h>
     30#include <linux/utsname.h>
     31#include <linux/workqueue.h>
     32#include <linux/in.h>
     33#include <linux/in6.h>
     34#include <linux/un.h>
     35
     36#include <linux/sunrpc/clnt.h>
     37#include <linux/sunrpc/addr.h>
     38#include <linux/sunrpc/rpc_pipe_fs.h>
     39#include <linux/sunrpc/metrics.h>
     40#include <linux/sunrpc/bc_xprt.h>
     41#include <trace/events/sunrpc.h>
     42
     43#include "sunrpc.h"
     44#include "sysfs.h"
     45#include "netns.h"
     46
     47#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
     48# define RPCDBG_FACILITY	RPCDBG_CALL
     49#endif
     50
/*
 * All RPC clients are linked into a per-network-namespace list
 * (sn->all_clients).  destroy_wait is woken whenever a client's task
 * list drains, so rpc_shutdown_client() can make progress.
 */
     55static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
     56
     57
     58static void	call_start(struct rpc_task *task);
     59static void	call_reserve(struct rpc_task *task);
     60static void	call_reserveresult(struct rpc_task *task);
     61static void	call_allocate(struct rpc_task *task);
     62static void	call_encode(struct rpc_task *task);
     63static void	call_decode(struct rpc_task *task);
     64static void	call_bind(struct rpc_task *task);
     65static void	call_bind_status(struct rpc_task *task);
     66static void	call_transmit(struct rpc_task *task);
     67static void	call_status(struct rpc_task *task);
     68static void	call_transmit_status(struct rpc_task *task);
     69static void	call_refresh(struct rpc_task *task);
     70static void	call_refreshresult(struct rpc_task *task);
     71static void	call_connect(struct rpc_task *task);
     72static void	call_connect_status(struct rpc_task *task);
     73
     74static int	rpc_encode_header(struct rpc_task *task,
     75				  struct xdr_stream *xdr);
     76static int	rpc_decode_header(struct rpc_task *task,
     77				  struct xdr_stream *xdr);
     78static int	rpc_ping(struct rpc_clnt *clnt);
     79static int	rpc_ping_noreply(struct rpc_clnt *clnt);
     80static void	rpc_check_timeout(struct rpc_task *task);
     81
     82static void rpc_register_client(struct rpc_clnt *clnt)
     83{
     84	struct net *net = rpc_net_ns(clnt);
     85	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
     86
     87	spin_lock(&sn->rpc_client_lock);
     88	list_add(&clnt->cl_clients, &sn->all_clients);
     89	spin_unlock(&sn->rpc_client_lock);
     90}
     91
     92static void rpc_unregister_client(struct rpc_clnt *clnt)
     93{
     94	struct net *net = rpc_net_ns(clnt);
     95	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
     96
     97	spin_lock(&sn->rpc_client_lock);
     98	list_del(&clnt->cl_clients);
     99	spin_unlock(&sn->rpc_client_lock);
    100}
    101
/*
 * Remove the client's rpc_pipefs directory.  Does not pin the pipefs
 * superblock itself; callers that need that do it around this call
 * (see rpc_clnt_remove_pipedir()).
 */
static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
{
	rpc_remove_client_dir(clnt);
}
    106
    107static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
    108{
    109	struct net *net = rpc_net_ns(clnt);
    110	struct super_block *pipefs_sb;
    111
    112	pipefs_sb = rpc_get_sb_net(net);
    113	if (pipefs_sb) {
    114		__rpc_clnt_remove_pipedir(clnt);
    115		rpc_put_sb_net(net);
    116	}
    117}
    118
/*
 * Create a "clnt%x" directory for @clnt under its program's pipefs
 * directory on superblock @sb.
 *
 * Returns the new dentry, NULL if the parent directory is missing, or
 * an ERR_PTR on failure.  The global 'clntid' counter makes directory
 * names unique; on -EEXIST we simply retry with the next id.
 */
static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
				    struct rpc_clnt *clnt)
{
	static uint32_t clntid;
	const char *dir_name = clnt->cl_program->pipe_dir_name;
	char name[15];
	struct dentry *dir, *dentry;

	dir = rpc_d_lookup_sb(sb, dir_name);
	if (dir == NULL) {
		pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
		return dir;
	}
	for (;;) {
		snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
		name[sizeof(name) - 1] = '\0';
		dentry = rpc_create_client_dir(dir, name, clnt);
		if (!IS_ERR(dentry))
			break;
		/* name collision: try the next client id */
		if (dentry == ERR_PTR(-EEXIST))
			continue;
		printk(KERN_INFO "RPC: Couldn't create pipefs entry"
				" %s/%s, error %ld\n",
				dir_name, name, PTR_ERR(dentry));
		break;
	}
	dput(dir);
	return dentry;
}
    148
    149static int
    150rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
    151{
    152	struct dentry *dentry;
    153
    154	if (clnt->cl_program->pipe_dir_name != NULL) {
    155		dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
    156		if (IS_ERR(dentry))
    157			return PTR_ERR(dentry);
    158	}
    159	return 0;
    160}
    161
    162static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
    163{
    164	if (clnt->cl_program->pipe_dir_name == NULL)
    165		return 1;
    166
    167	switch (event) {
    168	case RPC_PIPEFS_MOUNT:
    169		if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
    170			return 1;
    171		if (refcount_read(&clnt->cl_count) == 0)
    172			return 1;
    173		break;
    174	case RPC_PIPEFS_UMOUNT:
    175		if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
    176			return 1;
    177		break;
    178	}
    179	return 0;
    180}
    181
    182static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
    183				   struct super_block *sb)
    184{
    185	struct dentry *dentry;
    186
    187	switch (event) {
    188	case RPC_PIPEFS_MOUNT:
    189		dentry = rpc_setup_pipedir_sb(sb, clnt);
    190		if (!dentry)
    191			return -ENOENT;
    192		if (IS_ERR(dentry))
    193			return PTR_ERR(dentry);
    194		break;
    195	case RPC_PIPEFS_UMOUNT:
    196		__rpc_clnt_remove_pipedir(clnt);
    197		break;
    198	default:
    199		printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
    200		return -ENOTSUPP;
    201	}
    202	return 0;
    203}
    204
/*
 * Deliver a pipefs event to @clnt and every ancestor in its cl_parent
 * chain.  The root of the chain is self-parented (cl_parent == clnt),
 * which is what terminates the loop; we also stop on the first error.
 */
static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
				struct super_block *sb)
{
	int error = 0;

	for (;; clnt = clnt->cl_parent) {
		if (!rpc_clnt_skip_event(clnt, event))
			error = __rpc_clnt_handle_event(clnt, event, sb);
		if (error || clnt == clnt->cl_parent)
			break;
	}
	return error;
}
    218
/*
 * Return the first client in @net's list that still wants @event, or
 * NULL when none remain.  The list lock is dropped before returning
 * and no reference is taken on the returned client.
 * NOTE(review): the returned pointer's lifetime appears to rely on
 * pipefs notifier serialisation — confirm before reusing this helper
 * in any other context.
 */
static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
{
	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
	struct rpc_clnt *clnt;

	spin_lock(&sn->rpc_client_lock);
	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
		if (rpc_clnt_skip_event(clnt, event))
			continue;
		spin_unlock(&sn->rpc_client_lock);
		return clnt;
	}
	spin_unlock(&sn->rpc_client_lock);
	return NULL;
}
    234
    235static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
    236			    void *ptr)
    237{
    238	struct super_block *sb = ptr;
    239	struct rpc_clnt *clnt;
    240	int error = 0;
    241
    242	while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
    243		error = __rpc_pipefs_event(clnt, event, sb);
    244		if (error)
    245			break;
    246	}
    247	return error;
    248}
    249
/* Notifier hooking RPC clients into pipefs mount/umount events. */
static struct notifier_block rpc_clients_block = {
	.notifier_call	= rpc_pipefs_event,
	.priority	= SUNRPC_PIPEFS_RPC_PRIO,
};
    254
/* Register the pipefs notifier above; returns 0 or a negative errno. */
int rpc_clients_notifier_register(void)
{
	return rpc_pipefs_notifier_register(&rpc_clients_block);
}
    259
/* Unregister the pipefs notifier registered above. */
void rpc_clients_notifier_unregister(void)
{
	return rpc_pipefs_notifier_unregister(&rpc_clients_block);
}
    264
/*
 * Install @xprt (and @timeout) as the client's transport, publishing
 * the new pointer via RCU under cl_lock.  An unbound transport turns
 * on autobinding.  Returns the previous transport; the caller is
 * responsible for dropping it (see rpc_switch_client_transport()).
 */
static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		const struct rpc_timeout *timeout)
{
	struct rpc_xprt *old;

	spin_lock(&clnt->cl_lock);
	old = rcu_dereference_protected(clnt->cl_xprt,
			lockdep_is_held(&clnt->cl_lock));

	if (!xprt_bound(xprt))
		clnt->cl_autobind = 1;

	clnt->cl_timeout = timeout;
	rcu_assign_pointer(clnt->cl_xprt, xprt);
	spin_unlock(&clnt->cl_lock);

	return old;
}
    284
    285static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
    286{
    287	clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
    288			nodename, sizeof(clnt->cl_nodename));
    289}
    290
/*
 * Make a freshly created client externally visible: debugfs entry,
 * pipefs directory (when pipefs is mounted), per-netns client list,
 * and its rpc_auth.  On failure everything done here is unwound,
 * including the sysfs/debugfs state set up by the caller.
 * Returns 0 on success or a negative errno.
 */
static int rpc_client_register(struct rpc_clnt *clnt,
			       rpc_authflavor_t pseudoflavor,
			       const char *client_name)
{
	struct rpc_auth_create_args auth_args = {
		.pseudoflavor = pseudoflavor,
		.target_name = client_name,
	};
	struct rpc_auth *auth;
	struct net *net = rpc_net_ns(clnt);
	struct super_block *pipefs_sb;
	int err;

	rpc_clnt_debugfs_register(clnt);

	pipefs_sb = rpc_get_sb_net(net);
	if (pipefs_sb) {
		err = rpc_setup_pipedir(pipefs_sb, clnt);
		if (err)
			goto out;
	}

	rpc_register_client(clnt);
	if (pipefs_sb)
		rpc_put_sb_net(net);

	auth = rpcauth_create(&auth_args, clnt);
	if (IS_ERR(auth)) {
		dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
				pseudoflavor);
		err = PTR_ERR(auth);
		goto err_auth;
	}
	return 0;
err_auth:
	/* re-pin pipefs: the earlier reference was dropped above */
	pipefs_sb = rpc_get_sb_net(net);
	rpc_unregister_client(clnt);
	__rpc_clnt_remove_pipedir(clnt);
out:
	if (pipefs_sb)
		rpc_put_sb_net(net);
	rpc_sysfs_client_destroy(clnt);
	rpc_clnt_debugfs_unregister(clnt);
	return err;
}
    336
    337static DEFINE_IDA(rpc_clids);
    338
/* Release the client-id IDA at module teardown. */
void rpc_cleanup_clids(void)
{
	ida_destroy(&rpc_clids);
}
    343
    344static int rpc_alloc_clid(struct rpc_clnt *clnt)
    345{
    346	int clid;
    347
    348	clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
    349	if (clid < 0)
    350		return clid;
    351	clnt->cl_clid = clid;
    352	return 0;
    353}
    354
/* Return the client's id to the IDA. */
static void rpc_free_clid(struct rpc_clnt *clnt)
{
	ida_simple_remove(&rpc_clids, clnt->cl_clid);
}
    359
/*
 * Allocate and initialise a struct rpc_clnt.
 *
 * Consumes the caller's references on both @xps and @xprt: on success
 * ownership passes to the new client (the xps reference is dropped once
 * the transport iterator holds its own); on failure both are released
 * at out_no_rpciod.  Returns the new client or an ERR_PTR.
 */
static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
		struct rpc_xprt_switch *xps,
		struct rpc_xprt *xprt,
		struct rpc_clnt *parent)
{
	const struct rpc_program *program = args->program;
	const struct rpc_version *version;
	struct rpc_clnt *clnt = NULL;
	const struct rpc_timeout *timeout;
	const char *nodename = args->nodename;
	int err;

	/* rpciod must be running before any RPC task can execute */
	err = rpciod_up();
	if (err)
		goto out_no_rpciod;

	err = -EINVAL;
	if (args->version >= program->nrvers)
		goto out_err;
	version = program->version[args->version];
	if (version == NULL)
		goto out_err;

	err = -ENOMEM;
	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
	if (!clnt)
		goto out_err;
	/* a client without a parent is the root of its own tree */
	clnt->cl_parent = parent ? : clnt;

	err = rpc_alloc_clid(clnt);
	if (err)
		goto out_no_clid;

	clnt->cl_cred	  = get_cred(args->cred);
	clnt->cl_procinfo = version->procs;
	clnt->cl_maxproc  = version->nrprocs;
	clnt->cl_prog     = args->prognumber ? : program->number;
	clnt->cl_vers     = version->number;
	clnt->cl_stats    = program->stats;
	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
	rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
	err = -ENOMEM;
	if (clnt->cl_metrics == NULL)
		goto out_no_stats;
	clnt->cl_program  = program;
	INIT_LIST_HEAD(&clnt->cl_tasks);
	spin_lock_init(&clnt->cl_lock);

	/* caller-supplied timeout overrides the transport default */
	timeout = xprt->timeout;
	if (args->timeout != NULL) {
		memcpy(&clnt->cl_timeout_default, args->timeout,
				sizeof(clnt->cl_timeout_default));
		timeout = &clnt->cl_timeout_default;
	}

	rpc_clnt_set_transport(clnt, xprt, timeout);
	xprt->main = true;
	xprt_iter_init(&clnt->cl_xpi, xps);
	xprt_switch_put(xps);

	clnt->cl_rtt = &clnt->cl_rtt_default;
	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);

	refcount_set(&clnt->cl_count, 1);

	if (nodename == NULL)
		nodename = utsname()->nodename;
	/* save the nodename */
	rpc_clnt_set_nodename(clnt, nodename);

	rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
	err = rpc_client_register(clnt, args->authflavor, args->client_name);
	if (err)
		goto out_no_path;
	/* child pins its parent for its whole lifetime */
	if (parent)
		refcount_inc(&parent->cl_count);

	trace_rpc_clnt_new(clnt, xprt, program->name, args->servername);
	return clnt;

out_no_path:
	rpc_free_iostats(clnt->cl_metrics);
out_no_stats:
	put_cred(clnt->cl_cred);
	rpc_free_clid(clnt);
out_no_clid:
	kfree(clnt);
out_err:
	rpciod_down();
out_no_rpciod:
	xprt_switch_put(xps);
	xprt_put(xprt);
	trace_rpc_clnt_new_err(program->name, args->servername, err);
	return ERR_PTR(err);
}
    455
/*
 * Build an rpc_clnt on top of an existing transport: pick or allocate
 * the transport switch (sharing the backchannel's switch when present),
 * create the client, optionally ping the server, and apply the
 * RPC_CLNT_CREATE_* behaviour flags.  Consumes @xprt.  Returns the new
 * client or an ERR_PTR.
 */
static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
					struct rpc_xprt *xprt)
{
	struct rpc_clnt *clnt = NULL;
	struct rpc_xprt_switch *xps;

	if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
		/* backchannel: share the forward channel's switch */
		WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
		xps = args->bc_xprt->xpt_bc_xps;
		xprt_switch_get(xps);
	} else {
		xps = xprt_switch_alloc(xprt, GFP_KERNEL);
		if (xps == NULL) {
			xprt_put(xprt);
			return ERR_PTR(-ENOMEM);
		}
		if (xprt->bc_xprt) {
			/* publish the switch for future backchannel use */
			xprt_switch_get(xps);
			xprt->bc_xprt->xpt_bc_xps = xps;
		}
	}
	clnt = rpc_new_client(args, xps, xprt, NULL);
	if (IS_ERR(clnt))
		return clnt;

	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
		int err = rpc_ping(clnt);
		if (err != 0) {
			rpc_shutdown_client(clnt);
			return ERR_PTR(err);
		}
	} else if (args->flags & RPC_CLNT_CREATE_CONNECTED) {
		/* no ping wanted, but caller still wants a live connection */
		int err = rpc_ping_noreply(clnt);
		if (err != 0) {
			rpc_shutdown_client(clnt);
			return ERR_PTR(err);
		}
	}

	clnt->cl_softrtry = 1;
	if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
		clnt->cl_softrtry = 0;
		if (args->flags & RPC_CLNT_CREATE_SOFTERR)
			clnt->cl_softerr = 1;
	}

	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
		clnt->cl_autobind = 1;
	if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
		clnt->cl_noretranstimeo = 1;
	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
		clnt->cl_discrtry = 1;
	if (!(args->flags & RPC_CLNT_CREATE_QUIET))
		clnt->cl_chatty = 1;

	return clnt;
}
    513
/**
 * rpc_create - create an RPC client and transport with one call
 * @args: rpc_clnt create argument structure
 *
 * Creates and initializes an RPC transport and an RPC client.
 *
 * It can ping the server in order to determine if it is up, and to see if
 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 * this behavior so asynchronous tasks can also use rpc_create.
 *
 * Returns the new client or an ERR_PTR.
 */
struct rpc_clnt *rpc_create(struct rpc_create_args *args)
{
	struct rpc_xprt *xprt;
	struct xprt_create xprtargs = {
		.net = args->net,
		.ident = args->protocol,
		.srcaddr = args->saddress,
		.dstaddr = args->address,
		.addrlen = args->addrsize,
		.servername = args->servername,
		.bc_xprt = args->bc_xprt,
	};
	char servername[48];
	struct rpc_clnt *clnt;
	int i;

	/* backchannel: reuse the already-established transport if any */
	if (args->bc_xprt) {
		WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
		xprt = args->bc_xprt->xpt_bc_xprt;
		if (xprt) {
			xprt_get(xprt);
			return rpc_create_xprt(args, xprt);
		}
	}

	if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
		xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
	if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
		xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
	/*
	 * If the caller chooses not to specify a hostname, whip
	 * up a string representation of the passed-in address.
	 */
	if (xprtargs.servername == NULL) {
		struct sockaddr_un *sun =
				(struct sockaddr_un *)args->address;
		struct sockaddr_in *sin =
				(struct sockaddr_in *)args->address;
		struct sockaddr_in6 *sin6 =
				(struct sockaddr_in6 *)args->address;

		servername[0] = '\0';
		switch (args->address->sa_family) {
		case AF_LOCAL:
			snprintf(servername, sizeof(servername), "%s",
				 sun->sun_path);
			break;
		case AF_INET:
			snprintf(servername, sizeof(servername), "%pI4",
				 &sin->sin_addr.s_addr);
			break;
		case AF_INET6:
			snprintf(servername, sizeof(servername), "%pI6",
				 &sin6->sin6_addr);
			break;
		default:
			/* caller wants default server name, but
			 * address family isn't recognized. */
			return ERR_PTR(-EINVAL);
		}
		xprtargs.servername = servername;
	}

	xprt = xprt_create_transport(&xprtargs);
	if (IS_ERR(xprt))
		return (struct rpc_clnt *)xprt;

	/*
	 * By default, kernel RPC client connects from a reserved port.
	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
	 * but it is always enabled for rpciod, which handles the connect
	 * operation.
	 */
	xprt->resvport = 1;
	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
		xprt->resvport = 0;
	xprt->reuseport = 0;
	if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
		xprt->reuseport = 1;

	clnt = rpc_create_xprt(args, xprt);
	if (IS_ERR(clnt) || args->nconnect <= 1)
		return clnt;

	/* nconnect > 1: attach additional transports; best effort */
	for (i = 0; i < args->nconnect - 1; i++) {
		if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
			break;
	}
	return clnt;
}
EXPORT_SYMBOL_GPL(rpc_create);
    615
    616/*
    617 * This function clones the RPC client structure. It allows us to share the
    618 * same transport while varying parameters such as the authentication
    619 * flavour.
    620 */
/*
 * Clone @clnt using the partially filled @args: the clone shares the
 * parent's transport and transport switch but may differ in e.g. auth
 * flavour or program.  Autobind is always disabled on clones.
 * Returns the new client or an ERR_PTR.
 */
static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
					   struct rpc_clnt *clnt)
{
	struct rpc_xprt_switch *xps;
	struct rpc_xprt *xprt;
	struct rpc_clnt *new;
	int err;

	err = -ENOMEM;
	rcu_read_lock();
	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
	rcu_read_unlock();
	if (xprt == NULL || xps == NULL) {
		xprt_put(xprt);
		xprt_switch_put(xps);
		goto out_err;
	}
	args->servername = xprt->servername;
	args->nodename = clnt->cl_nodename;

	/* rpc_new_client() consumes the xprt and xps references */
	new = rpc_new_client(args, xps, xprt, clnt);
	if (IS_ERR(new))
		return new;

	/* Turn off autobind on clones */
	new->cl_autobind = 0;
	new->cl_softrtry = clnt->cl_softrtry;
	new->cl_softerr = clnt->cl_softerr;
	new->cl_noretranstimeo = clnt->cl_noretranstimeo;
	new->cl_discrtry = clnt->cl_discrtry;
	new->cl_chatty = clnt->cl_chatty;
	new->cl_principal = clnt->cl_principal;
	new->cl_max_connect = clnt->cl_max_connect;
	return new;

out_err:
	trace_rpc_clnt_clone_err(clnt, err);
	return ERR_PTR(err);
}
    661
    662/**
    663 * rpc_clone_client - Clone an RPC client structure
    664 *
    665 * @clnt: RPC client whose parameters are copied
    666 *
    667 * Returns a fresh RPC client or an ERR_PTR.
    668 */
    669struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
    670{
    671	struct rpc_create_args args = {
    672		.program	= clnt->cl_program,
    673		.prognumber	= clnt->cl_prog,
    674		.version	= clnt->cl_vers,
    675		.authflavor	= clnt->cl_auth->au_flavor,
    676		.cred		= clnt->cl_cred,
    677	};
    678	return __rpc_clone_client(&args, clnt);
    679}
    680EXPORT_SYMBOL_GPL(rpc_clone_client);
    681
    682/**
    683 * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
    684 *
    685 * @clnt: RPC client whose parameters are copied
    686 * @flavor: security flavor for new client
    687 *
    688 * Returns a fresh RPC client or an ERR_PTR.
    689 */
    690struct rpc_clnt *
    691rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
    692{
    693	struct rpc_create_args args = {
    694		.program	= clnt->cl_program,
    695		.prognumber	= clnt->cl_prog,
    696		.version	= clnt->cl_vers,
    697		.authflavor	= flavor,
    698		.cred		= clnt->cl_cred,
    699	};
    700	return __rpc_clone_client(&args, clnt);
    701}
    702EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
    703
/**
 * rpc_switch_client_transport: switch the RPC transport on the fly
 * @clnt: pointer to a struct rpc_clnt
 * @args: pointer to the new transport arguments
 * @timeout: pointer to the new timeout parameters
 *
 * This function allows the caller to switch the RPC transport for the
 * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
 * server, for instance.  It assumes that the caller has ensured that
 * there are no active RPC tasks by using some form of locking.
 *
 * Returns zero if "clnt" is now using the new xprt.  Otherwise a
 * negative errno is returned, and "clnt" continues to use the old
 * xprt.
 */
int rpc_switch_client_transport(struct rpc_clnt *clnt,
		struct xprt_create *args,
		const struct rpc_timeout *timeout)
{
	const struct rpc_timeout *old_timeo;
	rpc_authflavor_t pseudoflavor;
	struct rpc_xprt_switch *xps, *oldxps;
	struct rpc_xprt *xprt, *old;
	struct rpc_clnt *parent;
	int err;

	xprt = xprt_create_transport(args);
	if (IS_ERR(xprt))
		return PTR_ERR(xprt);

	xps = xprt_switch_alloc(xprt, GFP_KERNEL);
	if (xps == NULL) {
		xprt_put(xprt);
		return -ENOMEM;
	}

	pseudoflavor = clnt->cl_auth->au_flavor;

	/* swap in the new transport and switch, keeping the old ones
	 * so we can revert on failure */
	old_timeo = clnt->cl_timeout;
	old = rpc_clnt_set_transport(clnt, xprt, timeout);
	oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);

	rpc_unregister_client(clnt);
	__rpc_clnt_remove_pipedir(clnt);
	rpc_sysfs_client_destroy(clnt);
	rpc_clnt_debugfs_unregister(clnt);

	/*
	 * A new transport was created.  "clnt" therefore
	 * becomes the root of a new cl_parent tree.  clnt's
	 * children, if it has any, still point to the old xprt.
	 */
	parent = clnt->cl_parent;
	clnt->cl_parent = clnt;

	/*
	 * The old rpc_auth cache cannot be re-used.  GSS
	 * contexts in particular are between a single
	 * client and server.
	 */
	err = rpc_client_register(clnt, pseudoflavor, NULL);
	if (err)
		goto out_revert;

	/* wait for RCU readers of the old xprt before dropping it */
	synchronize_rcu();
	if (parent != clnt)
		rpc_release_client(parent);
	xprt_switch_put(oldxps);
	xprt_put(old);
	trace_rpc_clnt_replace_xprt(clnt);
	return 0;

out_revert:
	/* restore the old transport, switch, timeout and parent */
	xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
	rpc_clnt_set_transport(clnt, old, old_timeo);
	clnt->cl_parent = parent;
	rpc_client_register(clnt, pseudoflavor, NULL);
	xprt_switch_put(xps);
	xprt_put(xprt);
	trace_rpc_clnt_replace_xprt_err(clnt);
	return err;
}
EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
    787
    788static
    789int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
    790{
    791	struct rpc_xprt_switch *xps;
    792
    793	rcu_read_lock();
    794	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
    795	rcu_read_unlock();
    796	if (xps == NULL)
    797		return -EAGAIN;
    798	xprt_iter_init_listall(xpi, xps);
    799	xprt_switch_put(xps);
    800	return 0;
    801}
    802
    803/**
    804 * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
    805 * @clnt: pointer to client
    806 * @fn: function to apply
    807 * @data: void pointer to function data
    808 *
    809 * Iterates through the list of RPC transports currently attached to the
    810 * client and applies the function fn(clnt, xprt, data).
    811 *
    812 * On error, the iteration stops, and the function returns the error value.
    813 */
    814int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
    815		int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
    816		void *data)
    817{
    818	struct rpc_xprt_iter xpi;
    819	int ret;
    820
    821	ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
    822	if (ret)
    823		return ret;
    824	for (;;) {
    825		struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
    826
    827		if (!xprt)
    828			break;
    829		ret = fn(clnt, xprt, data);
    830		xprt_put(xprt);
    831		if (ret < 0)
    832			break;
    833	}
    834	xprt_iter_destroy(&xpi);
    835	return ret;
    836}
    837EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
    838
/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task	*rovr;


	/* Lockless peek: cheap early-out when there is nothing to kill.
	 * Racy by design; rpc_shutdown_client() re-checks in a loop. */
	if (list_empty(&clnt->cl_tasks))
		return;

	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	trace_rpc_clnt_killall(clnt);
	spin_lock(&clnt->cl_lock);
	list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
		rpc_signal_task(rovr);
	spin_unlock(&clnt->cl_lock);
}
EXPORT_SYMBOL_GPL(rpc_killall_tasks);
    861
/*
 * Properly shut down an RPC client, terminating all outstanding
 * requests.  Sleeps until the client's task list is empty, re-signalling
 * tasks every second, then drops the caller's reference.
 */
void rpc_shutdown_client(struct rpc_clnt *clnt)
{
	might_sleep();

	trace_rpc_clnt_shutdown(clnt);

	while (!list_empty(&clnt->cl_tasks)) {
		rpc_killall_tasks(clnt);
		/* woken from rpc_release_client() when cl_tasks drains */
		wait_event_timeout(destroy_wait,
			list_empty(&clnt->cl_tasks), 1*HZ);
	}

	rpc_release_client(clnt);
}
EXPORT_SYMBOL_GPL(rpc_shutdown_client);
    881
/*
 * Free an RPC client — deferred part, run from the system workqueue
 * (scheduled by rpc_free_client()).
 */
static void rpc_free_client_work(struct work_struct *work)
{
	struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);

	trace_rpc_clnt_free(clnt);

	/* These might block on processes that might allocate memory,
	 * so they cannot be called in rpciod, so they are handled separately
	 * here.
	 */
	rpc_sysfs_client_destroy(clnt);
	rpc_clnt_debugfs_unregister(clnt);
	rpc_free_clid(clnt);
	rpc_clnt_remove_pipedir(clnt);
	/* refcount is 0 here, so raw dereference is safe */
	xprt_put(rcu_dereference_raw(clnt->cl_xprt));

	kfree(clnt);
	rpciod_down();
}
/*
 * Release a client whose refcount has hit zero: unlink it, free what is
 * safe to free in this context, and defer the rest to
 * rpc_free_client_work().  Returns the parent client (if any) so the
 * caller can drop the reference the child held on it.
 */
static struct rpc_clnt *
rpc_free_client(struct rpc_clnt *clnt)
{
	struct rpc_clnt *parent = NULL;

	trace_rpc_clnt_release(clnt);
	if (clnt->cl_parent != clnt)
		parent = clnt->cl_parent;
	rpc_unregister_client(clnt);
	rpc_free_iostats(clnt->cl_metrics);
	clnt->cl_metrics = NULL;
	xprt_iter_destroy(&clnt->cl_xpi);
	put_cred(clnt->cl_cred);

	INIT_WORK(&clnt->cl_work, rpc_free_client_work);
	schedule_work(&clnt->cl_work);
	return parent;
}
    922
/*
 * Drop the client's rpc_auth, then drop the final reference and free
 * the client if nobody else holds one.  Returns the parent to release
 * next, or NULL.
 */
static struct rpc_clnt *
rpc_free_auth(struct rpc_clnt *clnt)
{
	/*
	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
	 *       release remaining GSS contexts. This mechanism ensures
	 *       that it can do so safely.
	 */
	if (clnt->cl_auth != NULL) {
		rpcauth_release(clnt->cl_auth);
		clnt->cl_auth = NULL;
	}
	if (refcount_dec_and_test(&clnt->cl_count))
		return rpc_free_client(clnt);
	return NULL;
}
    942
/*
 * Release a reference to the RPC client.  Walks up the cl_parent chain:
 * freeing a child yields its parent (from rpc_free_client()), whose
 * reference is then dropped in the next iteration.
 */
void
rpc_release_client(struct rpc_clnt *clnt)
{
	do {
		/* let rpc_shutdown_client() notice the empty task list */
		if (list_empty(&clnt->cl_tasks))
			wake_up(&destroy_wait);
		/* fast path: not the last reference, nothing to tear down */
		if (refcount_dec_not_one(&clnt->cl_count))
			break;
		clnt = rpc_free_auth(clnt);
	} while (clnt != NULL);
}
EXPORT_SYMBOL_GPL(rpc_release_client);
    958
    959/**
    960 * rpc_bind_new_program - bind a new RPC program to an existing client
    961 * @old: old rpc_client
    962 * @program: rpc program to set
    963 * @vers: rpc program version
    964 *
    965 * Clones the rpc client and sets up a new RPC program. This is mainly
    966 * of use for enabling different RPC programs to share the same transport.
    967 * The Sun NFSv2/v3 ACL protocol can do this.
    968 */
    969struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
    970				      const struct rpc_program *program,
    971				      u32 vers)
    972{
    973	struct rpc_create_args args = {
    974		.program	= program,
    975		.prognumber	= program->number,
    976		.version	= vers,
    977		.authflavor	= old->cl_auth->au_flavor,
    978		.cred		= old->cl_cred,
    979	};
    980	struct rpc_clnt *clnt;
    981	int err;
    982
    983	clnt = __rpc_clone_client(&args, old);
    984	if (IS_ERR(clnt))
    985		goto out;
    986	err = rpc_ping(clnt);
    987	if (err != 0) {
    988		rpc_shutdown_client(clnt);
    989		clnt = ERR_PTR(err);
    990	}
    991out:
    992	return clnt;
    993}
    994EXPORT_SYMBOL_GPL(rpc_bind_new_program);
    995
struct rpc_xprt *
rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
{
	struct rpc_xprt_switch *xps;

	if (!xprt)
		return NULL;
	/* Account the queued task on the client's transport switch as
	 * well as on the individual transport. */
	rcu_read_lock();
	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
	atomic_long_inc(&xps->xps_queuelen);
	rcu_read_unlock();
	atomic_long_inc(&xprt->queuelen);

	return xprt;
}
   1011
static void
rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
{
	struct rpc_xprt_switch *xps;

	/* Undo the accounting done in rpc_task_get_xprt() */
	atomic_long_dec(&xprt->queuelen);
	rcu_read_lock();
	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
	atomic_long_dec(&xps->xps_queuelen);
	rcu_read_unlock();

	xprt_put(xprt);
}
   1025
   1026void rpc_task_release_transport(struct rpc_task *task)
   1027{
   1028	struct rpc_xprt *xprt = task->tk_xprt;
   1029
   1030	if (xprt) {
   1031		task->tk_xprt = NULL;
   1032		if (task->tk_client)
   1033			rpc_task_release_xprt(task->tk_client, xprt);
   1034		else
   1035			xprt_put(xprt);
   1036	}
   1037}
   1038EXPORT_SYMBOL_GPL(rpc_task_release_transport);
   1039
   1040void rpc_task_release_client(struct rpc_task *task)
   1041{
   1042	struct rpc_clnt *clnt = task->tk_client;
   1043
   1044	rpc_task_release_transport(task);
   1045	if (clnt != NULL) {
   1046		/* Remove from client task list */
   1047		spin_lock(&clnt->cl_lock);
   1048		list_del(&task->tk_task);
   1049		spin_unlock(&clnt->cl_lock);
   1050		task->tk_client = NULL;
   1051
   1052		rpc_release_client(clnt);
   1053	}
   1054}
   1055
static struct rpc_xprt *
rpc_task_get_first_xprt(struct rpc_clnt *clnt)
{
	struct rpc_xprt *xprt;

	/* Take a reference on the client's main transport under RCU */
	rcu_read_lock();
	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
	rcu_read_unlock();
	return rpc_task_get_xprt(clnt, xprt);
}
   1066
   1067static struct rpc_xprt *
   1068rpc_task_get_next_xprt(struct rpc_clnt *clnt)
   1069{
   1070	return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
   1071}
   1072
static
void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
{
	if (task->tk_xprt) {
		/* Keep the current transport unless it has gone offline
		 * AND the task is allowed to migrate. */
		if (!(test_bit(XPRT_OFFLINE, &task->tk_xprt->state) &&
		      (task->tk_flags & RPC_TASK_MOVEABLE)))
			return;
		xprt_release(task);
		xprt_put(task->tk_xprt);
	}
	/* NO_ROUND_ROBIN tasks are pinned to the main transport */
	if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
		task->tk_xprt = rpc_task_get_first_xprt(clnt);
	else
		task->tk_xprt = rpc_task_get_next_xprt(clnt);
}
   1088
static
void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
	rpc_task_set_transport(task, clnt);
	task->tk_client = clnt;
	/* The task holds a reference on the client until released */
	refcount_inc(&clnt->cl_count);
	/* Propagate per-client timeout policy into the task flags */
	if (clnt->cl_softrtry)
		task->tk_flags |= RPC_TASK_SOFT;
	if (clnt->cl_softerr)
		task->tk_flags |= RPC_TASK_TIMEOUT;
	if (clnt->cl_noretranstimeo)
		task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
	/* Add to the client's list of all tasks */
	spin_lock(&clnt->cl_lock);
	list_add_tail(&task->tk_task, &clnt->cl_tasks);
	spin_unlock(&clnt->cl_lock);
}
   1106
   1107static void
   1108rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
   1109{
   1110	if (msg != NULL) {
   1111		task->tk_msg.rpc_proc = msg->rpc_proc;
   1112		task->tk_msg.rpc_argp = msg->rpc_argp;
   1113		task->tk_msg.rpc_resp = msg->rpc_resp;
   1114		task->tk_msg.rpc_cred = msg->rpc_cred;
   1115		if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
   1116			get_cred(task->tk_msg.rpc_cred);
   1117	}
   1118}
   1119
   1120/*
   1121 * Default callback for async RPC calls
   1122 */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
	/* Intentionally empty: placeholder completion callback */
}
   1127
/* Minimal call ops used when the caller supplies none of its own */
static const struct rpc_call_ops rpc_default_ops = {
	.rpc_call_done = rpc_default_callback,
};
   1131
   1132/**
   1133 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
   1134 * @task_setup_data: pointer to task initialisation data
   1135 */
struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
{
	struct rpc_task *task;

	task = rpc_new_task(task_setup_data);
	if (IS_ERR(task))
		return task;

	/* Synchronous tasks can borrow the caller's credential reference.
	 * This flag must be set before rpc_task_set_rpc_message(), which
	 * checks it to decide whether to take its own get_cred(). */
	if (!RPC_IS_ASYNC(task))
		task->tk_flags |= RPC_TASK_CRED_NOREF;

	rpc_task_set_client(task, task_setup_data->rpc_client);
	rpc_task_set_rpc_message(task, task_setup_data->rpc_message);

	if (task->tk_action == NULL)
		rpc_call_start(task);

	/* Extra reference so the task is still valid when we return it */
	atomic_inc(&task->tk_count);
	rpc_execute(task);
	return task;
}
   1157EXPORT_SYMBOL_GPL(rpc_run_task);
   1158
   1159/**
   1160 * rpc_call_sync - Perform a synchronous RPC call
   1161 * @clnt: pointer to RPC client
   1162 * @msg: RPC call parameters
   1163 * @flags: RPC call flags
   1164 */
int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
{
	struct rpc_task	*task;
	struct rpc_task_setup task_setup_data = {
		.rpc_client = clnt,
		.rpc_message = msg,
		.callback_ops = &rpc_default_ops,
		.flags = flags,
	};
	int status;

	/* A synchronous call must not be marked async; warn and reject,
	 * releasing the callback data as rpc_run_task() would have. */
	WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
	if (flags & RPC_TASK_ASYNC) {
		rpc_release_calldata(task_setup_data.callback_ops,
			task_setup_data.callback_data);
		return -EINVAL;
	}

	task = rpc_run_task(&task_setup_data);
	if (IS_ERR(task))
		return PTR_ERR(task);
	/* rpc_run_task() waited for completion; harvest the result */
	status = task->tk_status;
	rpc_put_task(task);
	return status;
}
   1190EXPORT_SYMBOL_GPL(rpc_call_sync);
   1191
   1192/**
   1193 * rpc_call_async - Perform an asynchronous RPC call
   1194 * @clnt: pointer to RPC client
   1195 * @msg: RPC call parameters
   1196 * @flags: RPC call flags
   1197 * @tk_ops: RPC call ops
   1198 * @data: user call data
   1199 */
   1200int
   1201rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
   1202	       const struct rpc_call_ops *tk_ops, void *data)
   1203{
   1204	struct rpc_task	*task;
   1205	struct rpc_task_setup task_setup_data = {
   1206		.rpc_client = clnt,
   1207		.rpc_message = msg,
   1208		.callback_ops = tk_ops,
   1209		.callback_data = data,
   1210		.flags = flags|RPC_TASK_ASYNC,
   1211	};
   1212
   1213	task = rpc_run_task(&task_setup_data);
   1214	if (IS_ERR(task))
   1215		return PTR_ERR(task);
   1216	rpc_put_task(task);
   1217	return 0;
   1218}
   1219EXPORT_SYMBOL_GPL(rpc_call_async);
   1220
   1221#if defined(CONFIG_SUNRPC_BACKCHANNEL)
   1222static void call_bc_encode(struct rpc_task *task);
   1223
   1224/**
   1225 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
   1226 * rpc_execute against it
   1227 * @req: RPC request
   1228 */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
{
	struct rpc_task *task;
	struct rpc_task_setup task_setup_data = {
		.callback_ops = &rpc_default_ops,
		.flags = RPC_TASK_SOFTCONN |
			RPC_TASK_NO_RETRANS_TIMEOUT,
	};

	dprintk("RPC: rpc_run_bc_task req= %p\n", req);
	/*
	 * Create an rpc_task to send the data
	 */
	task = rpc_new_task(&task_setup_data);
	if (IS_ERR(task)) {
		/* We own @req on failure: hand it back to the transport */
		xprt_free_bc_request(req);
		return task;
	}

	xprt_init_bc_request(req, task);

	/* Backchannel tasks enter the FSM at call_bc_encode rather than
	 * the normal call_start path. */
	task->tk_action = call_bc_encode;
	atomic_inc(&task->tk_count);
	/* One ref from rpc_new_task(), one taken just above */
	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
	rpc_execute(task);

	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
	return task;
}
   1258#endif /* CONFIG_SUNRPC_BACKCHANNEL */
   1259
   1260/**
   1261 * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
   1262 * @req: RPC request to prepare
   1263 * @pages: vector of struct page pointers
   1264 * @base: offset in first page where receive should start, in bytes
   1265 * @len: expected size of the upper layer data payload, in bytes
   1266 * @hdrsize: expected size of upper layer reply header, in XDR words
   1267 *
   1268 */
void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
			     unsigned int base, unsigned int len,
			     unsigned int hdrsize)
{
	/* Account for the RPC reply header and the auth flavor's reply
	 * alignment, all in XDR words */
	hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;

	/* hdrsize << 2 converts XDR words to bytes */
	xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
	trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
}
   1278EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
   1279
void
rpc_call_start(struct rpc_task *task)
{
	/* Enter the RPC finite state machine at its initial state */
	task->tk_action = call_start;
}
   1285EXPORT_SYMBOL_GPL(rpc_call_start);
   1286
   1287/**
   1288 * rpc_peeraddr - extract remote peer address from clnt's xprt
   1289 * @clnt: RPC client structure
   1290 * @buf: target buffer
   1291 * @bufsize: length of target buffer
   1292 *
   1293 * Returns the number of bytes that are actually in the stored address.
   1294 */
   1295size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
   1296{
   1297	size_t bytes;
   1298	struct rpc_xprt *xprt;
   1299
   1300	rcu_read_lock();
   1301	xprt = rcu_dereference(clnt->cl_xprt);
   1302
   1303	bytes = xprt->addrlen;
   1304	if (bytes > bufsize)
   1305		bytes = bufsize;
   1306	memcpy(buf, &xprt->addr, bytes);
   1307	rcu_read_unlock();
   1308
   1309	return bytes;
   1310}
   1311EXPORT_SYMBOL_GPL(rpc_peeraddr);
   1312
   1313/**
   1314 * rpc_peeraddr2str - return remote peer address in printable format
   1315 * @clnt: RPC client structure
   1316 * @format: address format
   1317 *
   1318 * NB: the lifetime of the memory referenced by the returned pointer is
   1319 * the same as the rpc_xprt itself.  As long as the caller uses this
   1320 * pointer, it must hold the RCU read lock.
   1321 */
   1322const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
   1323			     enum rpc_display_format_t format)
   1324{
   1325	struct rpc_xprt *xprt;
   1326
   1327	xprt = rcu_dereference(clnt->cl_xprt);
   1328
   1329	if (xprt->address_strings[format] != NULL)
   1330		return xprt->address_strings[format];
   1331	else
   1332		return "unprintable";
   1333}
   1334EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
   1335
/* Wildcard bind addresses used by rpc_sockname() and as the fallback
 * result of rpc_anyaddr().
 * NOTE(review): despite the "_loopback" names, these hold the ANY
 * addresses (INADDR_ANY / IN6ADDR_ANY_INIT), not 127.0.0.1 / ::1.
 */
static const struct sockaddr_in rpc_inaddr_loopback = {
	.sin_family		= AF_INET,
	.sin_addr.s_addr	= htonl(INADDR_ANY),
};

static const struct sockaddr_in6 rpc_in6addr_loopback = {
	.sin6_family		= AF_INET6,
	.sin6_addr		= IN6ADDR_ANY_INIT,
};
   1345
   1346/*
   1347 * Try a getsockname() on a connected datagram socket.  Using a
   1348 * connected datagram socket prevents leaving a socket in TIME_WAIT.
   1349 * This conserves the ephemeral port number space.
   1350 *
   1351 * Returns zero and fills in "buf" if successful; otherwise, a
   1352 * negative errno is returned.
   1353 */
static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
			struct sockaddr *buf)
{
	struct socket *sock;
	int err;

	/* Kernel-internal socket (last arg == 1), not charged to a task */
	err = __sock_create(net, sap->sa_family,
				SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
	if (err < 0) {
		dprintk("RPC:       can't create UDP socket (%d)\n", err);
		goto out;
	}

	/* Bind to the wildcard address so the stack picks the route */
	switch (sap->sa_family) {
	case AF_INET:
		err = kernel_bind(sock,
				(struct sockaddr *)&rpc_inaddr_loopback,
				sizeof(rpc_inaddr_loopback));
		break;
	case AF_INET6:
		err = kernel_bind(sock,
				(struct sockaddr *)&rpc_in6addr_loopback,
				sizeof(rpc_in6addr_loopback));
		break;
	default:
		err = -EAFNOSUPPORT;
		goto out;
	}
	if (err < 0) {
		dprintk("RPC:       can't bind UDP socket (%d)\n", err);
		goto out_release;
	}

	/* "Connecting" a datagram socket selects the local address
	 * without sending anything on the wire */
	err = kernel_connect(sock, sap, salen, 0);
	if (err < 0) {
		dprintk("RPC:       can't connect UDP socket (%d)\n", err);
		goto out_release;
	}

	err = kernel_getsockname(sock, buf);
	if (err < 0) {
		dprintk("RPC:       getsockname failed (%d)\n", err);
		goto out_release;
	}

	err = 0;
	/* Clear the scope id: it is only meaningful on this host */
	if (buf->sa_family == AF_INET6) {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
		sin6->sin6_scope_id = 0;
	}
	dprintk("RPC:       %s succeeded\n", __func__);

out_release:
	sock_release(sock);
out:
	return err;
}
   1411
   1412/*
 * Scraping a connected socket failed, so we don't have a usable
   1414 * local address.  Fallback: generate an address that will prevent
   1415 * the server from calling us back.
   1416 *
   1417 * Returns zero and fills in "buf" if successful; otherwise, a
   1418 * negative errno is returned.
   1419 */
   1420static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
   1421{
   1422	switch (family) {
   1423	case AF_INET:
   1424		if (buflen < sizeof(rpc_inaddr_loopback))
   1425			return -EINVAL;
   1426		memcpy(buf, &rpc_inaddr_loopback,
   1427				sizeof(rpc_inaddr_loopback));
   1428		break;
   1429	case AF_INET6:
   1430		if (buflen < sizeof(rpc_in6addr_loopback))
   1431			return -EINVAL;
   1432		memcpy(buf, &rpc_in6addr_loopback,
   1433				sizeof(rpc_in6addr_loopback));
   1434		break;
   1435	default:
   1436		dprintk("RPC:       %s: address family not supported\n",
   1437			__func__);
   1438		return -EAFNOSUPPORT;
   1439	}
   1440	dprintk("RPC:       %s: succeeded\n", __func__);
   1441	return 0;
   1442}
   1443
   1444/**
   1445 * rpc_localaddr - discover local endpoint address for an RPC client
   1446 * @clnt: RPC client structure
   1447 * @buf: target buffer
   1448 * @buflen: size of target buffer, in bytes
   1449 *
   1450 * Returns zero and fills in "buf" and "buflen" if successful;
   1451 * otherwise, a negative errno is returned.
   1452 *
   1453 * This works even if the underlying transport is not currently connected,
   1454 * or if the upper layer never previously provided a source address.
   1455 *
   1456 * The result of this function call is transient: multiple calls in
   1457 * succession may give different results, depending on how local
   1458 * networking configuration changes over time.
   1459 */
int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
{
	struct sockaddr_storage address;
	struct sockaddr *sap = (struct sockaddr *)&address;
	struct rpc_xprt *xprt;
	struct net *net;
	size_t salen;
	int err;

	/* Snapshot the server address and pin the netns under RCU */
	rcu_read_lock();
	xprt = rcu_dereference(clnt->cl_xprt);
	salen = xprt->addrlen;
	memcpy(sap, &xprt->addr, salen);
	net = get_net(xprt->xprt_net);
	rcu_read_unlock();

	/* Port 0: let routing pick the local endpoint address */
	rpc_set_port(sap, 0);
	err = rpc_sockname(net, sap, salen, buf);
	put_net(net);
	if (err != 0)
		/* Couldn't discover local address, return ANYADDR */
		return rpc_anyaddr(sap->sa_family, buf, buflen);
	return 0;
}
   1484EXPORT_SYMBOL_GPL(rpc_localaddr);
   1485
void
rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
{
	struct rpc_xprt *xprt;

	/* Delegate to the transport, which may not support resizing */
	rcu_read_lock();
	xprt = rcu_dereference(clnt->cl_xprt);
	if (xprt->ops->set_buffer_size)
		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
	rcu_read_unlock();
}
   1497EXPORT_SYMBOL_GPL(rpc_setbufsize);
   1498
   1499/**
   1500 * rpc_net_ns - Get the network namespace for this RPC client
   1501 * @clnt: RPC client to query
   1502 *
   1503 */
struct net *rpc_net_ns(struct rpc_clnt *clnt)
{
	struct net *ret;

	/* No reference is taken on the netns; caller beware */
	rcu_read_lock();
	ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
	rcu_read_unlock();
	return ret;
}
   1513EXPORT_SYMBOL_GPL(rpc_net_ns);
   1514
   1515/**
   1516 * rpc_max_payload - Get maximum payload size for a transport, in bytes
   1517 * @clnt: RPC client to query
   1518 *
   1519 * For stream transports, this is one RPC record fragment (see RFC
   1520 * 1831), as we don't support multi-record requests yet.  For datagram
   1521 * transports, this is the size of an IP packet minus the IP, UDP, and
   1522 * RPC header sizes.
   1523 */
size_t rpc_max_payload(struct rpc_clnt *clnt)
{
	size_t ret;

	/* Read the transport's limit under RCU protection */
	rcu_read_lock();
	ret = rcu_dereference(clnt->cl_xprt)->max_payload;
	rcu_read_unlock();
	return ret;
}
   1533EXPORT_SYMBOL_GPL(rpc_max_payload);
   1534
   1535/**
   1536 * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
   1537 * @clnt: RPC client to query
   1538 */
size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
{
	struct rpc_xprt *xprt;
	size_t ret;

	/* The limit is transport-specific; ask via the ops vector */
	rcu_read_lock();
	xprt = rcu_dereference(clnt->cl_xprt);
	ret = xprt->ops->bc_maxpayload(xprt);
	rcu_read_unlock();
	return ret;
}
   1550EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
   1551
unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
{
	struct rpc_xprt *xprt;
	unsigned int ret;

	/* Number of backchannel slots is a per-transport property */
	rcu_read_lock();
	xprt = rcu_dereference(clnt->cl_xprt);
	ret = xprt->ops->bc_num_slots(xprt);
	rcu_read_unlock();
	return ret;
}
   1563EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
   1564
   1565/**
   1566 * rpc_force_rebind - force transport to check that remote port is unchanged
   1567 * @clnt: client to rebind
   1568 *
   1569 */
   1570void rpc_force_rebind(struct rpc_clnt *clnt)
   1571{
   1572	if (clnt->cl_autobind) {
   1573		rcu_read_lock();
   1574		xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
   1575		rcu_read_unlock();
   1576	}
   1577}
   1578EXPORT_SYMBOL_GPL(rpc_force_rebind);
   1579
static int
__rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
{
	/* Clear stale status and re-enter the FSM at @action */
	task->tk_status = 0;
	task->tk_rpc_status = 0;
	task->tk_action = action;
	return 1;
}
   1588
   1589/*
   1590 * Restart an (async) RPC call. Usually called from within the
   1591 * exit handler.
   1592 */
int
rpc_restart_call(struct rpc_task *task)
{
	/* Restart from the very beginning of the FSM */
	return __rpc_restart_call(task, call_start);
}
   1598EXPORT_SYMBOL_GPL(rpc_restart_call);
   1599
   1600/*
   1601 * Restart an (async) RPC call from the call_prepare state.
   1602 * Usually called from within the exit handler.
   1603 */
   1604int
   1605rpc_restart_call_prepare(struct rpc_task *task)
   1606{
   1607	if (task->tk_ops->rpc_call_prepare != NULL)
   1608		return __rpc_restart_call(task, rpc_prepare_task);
   1609	return rpc_restart_call(task);
   1610}
   1611EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
   1612
   1613const char
   1614*rpc_proc_name(const struct rpc_task *task)
   1615{
   1616	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
   1617
   1618	if (proc) {
   1619		if (proc->p_name)
   1620			return proc->p_name;
   1621		else
   1622			return "NULL";
   1623	} else
   1624		return "no proc";
   1625}
   1626
static void
__rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
{
	/* Record the RPC-level error and terminate the task */
	trace_rpc_call_rpcerror(task, tk_status, rpc_status);
	task->tk_rpc_status = rpc_status;
	rpc_exit(task, tk_status);
}
   1634
static void
rpc_call_rpcerror(struct rpc_task *task, int status)
{
	/* Common case: task status and RPC status are the same */
	__rpc_call_rpcerror(task, status, status);
}
   1640
   1641/*
   1642 * 0.  Initial state
   1643 *
   1644 *     Other FSM states can be visited zero or more times, but
   1645 *     this state is visited exactly once for each RPC.
   1646 */
static void
call_start(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	int idx = task->tk_msg.rpc_proc->p_statidx;

	trace_rpc_request(task);

	/* Increment call count (version might not be valid for ping) */
	if (clnt->cl_program->version[clnt->cl_vers])
		clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
	clnt->cl_stats->rpccnt++;
	/* Next FSM state: reserve a transmission slot */
	task->tk_action = call_reserve;
	rpc_task_set_transport(task, clnt);
}
   1662
   1663/*
   1664 * 1.	Reserve an RPC call slot
   1665 */
static void
call_reserve(struct rpc_task *task)
{
	/* The outcome of xprt_reserve() is handled in call_reserveresult */
	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_reserve(task);
}
   1673
   1674static void call_retry_reserve(struct rpc_task *task);
   1675
   1676/*
   1677 * 1b.	Grok the result of xprt_reserve()
   1678 */
static void
call_reserveresult(struct rpc_task *task)
{
	int status = task->tk_status;

	/*
	 * After a call to xprt_reserve(), we must have either
	 * a request slot or else an error status.
	 */
	task->tk_status = 0;
	if (status >= 0) {
		if (task->tk_rqstp) {
			task->tk_action = call_refresh;
			return;
		}

		/* Success status but no slot: internal inconsistency */
		rpc_call_rpcerror(task, -EIO);
		return;
	}

	switch (status) {
	case -ENOMEM:
		rpc_delay(task, HZ >> 2);
		fallthrough;
	case -EAGAIN:	/* woken up; retry */
		task->tk_action = call_retry_reserve;
		return;
	default:
		rpc_call_rpcerror(task, status);
	}
}
   1710
   1711/*
   1712 * 1c.	Retry reserving an RPC call slot
   1713 */
static void
call_retry_reserve(struct rpc_task *task)
{
	/* Like call_reserve(), but uses the retry variant of reserve */
	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_retry_reserve(task);
}
   1721
   1722/*
   1723 * 2.	Bind and/or refresh the credentials
   1724 */
static void
call_refresh(struct rpc_task *task)
{
	/* Result of the credential refresh lands in call_refreshresult */
	task->tk_action = call_refreshresult;
	task->tk_status = 0;
	task->tk_client->cl_stats->rpcauthrefresh++;
	rpcauth_refreshcred(task);
}
   1733
   1734/*
   1735 * 2a.	Process the results of a credential refresh
   1736 */
static void
call_refreshresult(struct rpc_task *task)
{
	int status = task->tk_status;

	/* Default: try the refresh again unless overridden below */
	task->tk_status = 0;
	task->tk_action = call_refresh;
	switch (status) {
	case 0:
		if (rpcauth_uptodatecred(task)) {
			task->tk_action = call_allocate;
			return;
		}
		/* Use rate-limiting and a max number of retries if refresh
		 * had status 0 but failed to update the cred.
		 */
		fallthrough;
	case -ETIMEDOUT:
		rpc_delay(task, 3*HZ);
		fallthrough;
	case -EAGAIN:
		status = -EACCES;
		fallthrough;
	case -EKEYEXPIRED:
		/* Retries exhausted: fall out to the error path below */
		if (!task->tk_cred_retry)
			break;
		task->tk_cred_retry--;
		trace_rpc_retry_refresh_status(task);
		return;
	case -ENOMEM:
		rpc_delay(task, HZ >> 4);
		return;
	}
	trace_rpc_refresh_status(task);
	rpc_call_rpcerror(task, status);
}
   1773
   1774/*
   1775 * 2b.	Allocate the buffer. For details, see sched.c:rpc_malloc.
   1776 *	(Note: buffer memory is freed in xprt_release).
   1777 */
static void
call_allocate(struct rpc_task *task)
{
	const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
	int status;

	task->tk_status = 0;
	task->tk_action = call_encode;

	/* Buffer already allocated (e.g. on a retry): nothing to do */
	if (req->rq_buffer)
		return;

	/* Sanity-check the procedure table entry (proc 0 is NULL ping) */
	if (proc->p_proc != 0) {
		BUG_ON(proc->p_arglen == 0);
		if (proc->p_decode != NULL)
			BUG_ON(proc->p_replen == 0);
	}

	/*
	 * Calculate the size (in quads) of the RPC call
	 * and reply headers, and convert both values
	 * to byte sizes.
	 */
	req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
			   proc->p_arglen;
	req->rq_callsize <<= 2;
	/*
	 * Note: the reply buffer must at minimum allocate enough space
	 * for the 'struct accepted_reply' from RFC5531.
	 */
	req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
			max_t(size_t, proc->p_replen, 2);
	req->rq_rcvsize <<= 2;

	status = xprt->ops->buf_alloc(task);
	trace_rpc_buf_alloc(task, status);
	if (status == 0)
		return;
	if (status != -ENOMEM) {
		rpc_call_rpcerror(task, status);
		return;
	}

	/* -ENOMEM: back off and retry, unless a fatal signal is pending
	 * on a synchronous task */
	if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
		task->tk_action = call_allocate;
		rpc_delay(task, HZ>>4);
		return;
	}

	rpc_call_rpcerror(task, -ERESTARTSYS);
}
   1832
static int
rpc_task_need_encode(struct rpc_task *task)
{
	/* Encode when nothing is queued for transmit and either the
	 * request was never sent, retransmit timeouts apply, or the
	 * transport reports a retransmit is due. */
	return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
		(!(task->tk_flags & RPC_TASK_SENT) ||
		 !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
		 xprt_request_need_retransmit(task));
}
   1841
static void
rpc_xdr_encode(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct xdr_stream xdr;

	/* (Re)initialize both send and receive XDR buffers */
	xdr_buf_init(&req->rq_snd_buf,
		     req->rq_buffer,
		     req->rq_callsize);
	xdr_buf_init(&req->rq_rcv_buf,
		     req->rq_rbuffer,
		     req->rq_rcvsize);

	/* Reset reply progress; encoding starts from an empty head */
	req->rq_reply_bytes_recvd = 0;
	req->rq_snd_buf.head[0].iov_len = 0;
	xdr_init_encode(&xdr, &req->rq_snd_buf,
			req->rq_snd_buf.head[0].iov_base, req);
	xdr_free_bvec(&req->rq_snd_buf);
	if (rpc_encode_header(task, &xdr))
		return;

	/* Let the auth flavor wrap (and possibly encrypt) the arguments */
	task->tk_status = rpcauth_wrap_req(task, &xdr);
}
   1865
   1866/*
   1867 * 3.	Encode arguments of an RPC call
   1868 */
static void
call_encode(struct rpc_task *task)
{
	if (!rpc_task_need_encode(task))
		goto out;

	/* Dequeue task from the receive queue while we're encoding */
	xprt_request_dequeue_xprt(task);
	/* Encode here so that rpcsec_gss can use correct sequence number. */
	rpc_xdr_encode(task);
	/* Add task to reply queue before transmission to avoid races */
	if (task->tk_status == 0 && rpc_reply_expected(task))
		task->tk_status = xprt_request_enqueue_receive(task);
	/* Did the encode result in an error condition? */
	if (task->tk_status != 0) {
		/* Was the error nonfatal? */
		switch (task->tk_status) {
		case -EAGAIN:
		case -ENOMEM:
			rpc_delay(task, HZ >> 4);
			break;
		case -EKEYEXPIRED:
			/* Expired creds: re-refresh while retries remain */
			if (!task->tk_cred_retry) {
				rpc_exit(task, task->tk_status);
			} else {
				task->tk_action = call_refresh;
				task->tk_cred_retry--;
				trace_rpc_retry_refresh_status(task);
			}
			break;
		default:
			rpc_call_rpcerror(task, task->tk_status);
		}
		return;
	}

	xprt_request_enqueue_transmit(task);
out:
	task->tk_action = call_transmit;
	/* Check that the connection is OK */
	if (!xprt_bound(task->tk_xprt))
		task->tk_action = call_bind;
	else if (!xprt_connected(task->tk_xprt))
		task->tk_action = call_connect;
}
   1914
   1915/*
   1916 * Helpers to check if the task was already transmitted, and
   1917 * to take action when that is the case.
   1918 */
   1919static bool
   1920rpc_task_transmitted(struct rpc_task *task)
   1921{
   1922	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
   1923}
   1924
static void
rpc_task_handle_transmitted(struct rpc_task *task)
{
	/* Already sent: skip straight to the post-transmit state */
	xprt_end_transmit(task);
	task->tk_action = call_transmit_status;
}
   1931
   1932/*
   1933 * 4.	Get the server port number if not yet set
   1934 */
static void
call_bind(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	/* Port already known: go straight to connecting */
	if (xprt_bound(xprt)) {
		task->tk_action = call_connect;
		return;
	}

	task->tk_action = call_bind_status;
	if (!xprt_prepare_transmit(task))
		return;

	/* Ask the transport's rpcbind mechanism for the server port */
	xprt->ops->rpcbind(task);
}
   1956
   1957/*
   1958 * 4a.	Sort out bind result
   1959 */
static void
call_bind_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
	int status = -EIO;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	/* Success, or a concurrent bind completed: move to connect */
	if (task->tk_status >= 0)
		goto out_next;
	if (xprt_bound(xprt)) {
		task->tk_status = 0;
		goto out_next;
	}

	switch (task->tk_status) {
	case -ENOMEM:
		rpc_delay(task, HZ >> 2);
		goto retry_timeout;
	case -EACCES:
		trace_rpcb_prog_unavail_err(task);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			status = -EOPNOTSUPP;
			break;
		}
		if (task->tk_rebind_retry == 0)
			break;
		task->tk_rebind_retry--;
		rpc_delay(task, 3*HZ);
		goto retry_timeout;
	case -ENOBUFS:
		rpc_delay(task, HZ >> 2);
		goto retry_timeout;
	case -EAGAIN:
		goto retry_timeout;
	case -ETIMEDOUT:
		trace_rpcb_timeout_err(task);
		goto retry_timeout;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		trace_rpcb_bind_version_err(task);
		break;
	case -EPROTONOSUPPORT:
		trace_rpcb_bind_version_err(task);
		goto retry_timeout;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPIPE:
		trace_rpcb_unreachable_err(task);
		/* Hard tasks keep retrying; softconn tasks fail now */
		if (!RPC_IS_SOFTCONN(task)) {
			rpc_delay(task, 5*HZ);
			goto retry_timeout;
		}
		status = task->tk_status;
		break;
	default:
		trace_rpcb_unrecognized_err(task);
	}

	rpc_call_rpcerror(task, status);
	return;
out_next:
	task->tk_action = call_connect;
	return;
retry_timeout:
	task->tk_status = 0;
	task->tk_action = call_bind;
	rpc_check_timeout(task);
}
   2039
   2040/*
   2041 * 4b.	Connect to the RPC server
   2042 */
static void
call_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	/* Already connected: proceed to transmit */
	if (xprt_connected(xprt)) {
		task->tk_action = call_transmit;
		return;
	}

	task->tk_action = call_connect_status;
	if (task->tk_status < 0)
		return;
	/* NOCONNECT tasks must never trigger a connection attempt */
	if (task->tk_flags & RPC_TASK_NOCONNECT) {
		rpc_call_rpcerror(task, -ENOTCONN);
		return;
	}
	if (!xprt_prepare_transmit(task))
		return;
	xprt_connect(task);
}
   2069
   2070/*
   2071 * 4c.	Sort out connect result
   2072 */
   2073static void
   2074call_connect_status(struct rpc_task *task)
   2075{
   2076	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
   2077	struct rpc_clnt *clnt = task->tk_client;
   2078	int status = task->tk_status;
   2079
   2080	if (rpc_task_transmitted(task)) {
   2081		rpc_task_handle_transmitted(task);
   2082		return;
   2083	}
   2084
   2085	trace_rpc_connect_status(task);
   2086
   2087	if (task->tk_status == 0) {
   2088		clnt->cl_stats->netreconn++;
   2089		goto out_next;
   2090	}
   2091	if (xprt_connected(xprt)) {
   2092		task->tk_status = 0;
   2093		goto out_next;
   2094	}
   2095
   2096	task->tk_status = 0;
   2097	switch (status) {
   2098	case -ECONNREFUSED:
   2099		/* A positive refusal suggests a rebind is needed. */
   2100		if (RPC_IS_SOFTCONN(task))
   2101			break;
   2102		if (clnt->cl_autobind) {
   2103			rpc_force_rebind(clnt);
   2104			goto out_retry;
   2105		}
   2106		fallthrough;
   2107	case -ECONNRESET:
   2108	case -ECONNABORTED:
   2109	case -ENETDOWN:
   2110	case -ENETUNREACH:
   2111	case -EHOSTUNREACH:
   2112	case -EPIPE:
   2113	case -EPROTO:
   2114		xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
   2115					    task->tk_rqstp->rq_connect_cookie);
   2116		if (RPC_IS_SOFTCONN(task))
   2117			break;
   2118		/* retry with existing socket, after a delay */
   2119		rpc_delay(task, 3*HZ);
   2120		fallthrough;
   2121	case -EADDRINUSE:
   2122	case -ENOTCONN:
   2123	case -EAGAIN:
   2124	case -ETIMEDOUT:
   2125		if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
   2126		    (task->tk_flags & RPC_TASK_MOVEABLE) &&
   2127		    test_bit(XPRT_REMOVE, &xprt->state)) {
   2128			struct rpc_xprt *saved = task->tk_xprt;
   2129			struct rpc_xprt_switch *xps;
   2130
   2131			rcu_read_lock();
   2132			xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
   2133			rcu_read_unlock();
   2134			if (xps->xps_nxprts > 1) {
   2135				long value;
   2136
   2137				xprt_release(task);
   2138				value = atomic_long_dec_return(&xprt->queuelen);
   2139				if (value == 0)
   2140					rpc_xprt_switch_remove_xprt(xps, saved);
   2141				xprt_put(saved);
   2142				task->tk_xprt = NULL;
   2143				task->tk_action = call_start;
   2144			}
   2145			xprt_switch_put(xps);
   2146			if (!task->tk_xprt)
   2147				return;
   2148		}
   2149		goto out_retry;
   2150	case -ENOBUFS:
   2151		rpc_delay(task, HZ >> 2);
   2152		goto out_retry;
   2153	}
   2154	rpc_call_rpcerror(task, status);
   2155	return;
   2156out_next:
   2157	task->tk_action = call_transmit;
   2158	return;
   2159out_retry:
   2160	/* Check for timeouts before looping back to call_bind */
   2161	task->tk_action = call_bind;
   2162	rpc_check_timeout(task);
   2163}
   2164
   2165/*
   2166 * 5.	Transmit the RPC request, and wait for reply
   2167 */
   2168static void
   2169call_transmit(struct rpc_task *task)
   2170{
   2171	if (rpc_task_transmitted(task)) {
   2172		rpc_task_handle_transmitted(task);
   2173		return;
   2174	}
   2175
   2176	task->tk_action = call_transmit_status;
   2177	if (!xprt_prepare_transmit(task))
   2178		return;
   2179	task->tk_status = 0;
   2180	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
   2181		if (!xprt_connected(task->tk_xprt)) {
   2182			task->tk_status = -ENOTCONN;
   2183			return;
   2184		}
   2185		xprt_transmit(task);
   2186	}
   2187	xprt_end_transmit(task);
   2188}
   2189
   2190/*
   2191 * 5a.	Handle cleanup after a transmission
   2192 */
   2193static void
   2194call_transmit_status(struct rpc_task *task)
   2195{
   2196	task->tk_action = call_status;
   2197
   2198	/*
   2199	 * Common case: success.  Force the compiler to put this
   2200	 * test first.
   2201	 */
   2202	if (rpc_task_transmitted(task)) {
   2203		task->tk_status = 0;
   2204		xprt_request_wait_receive(task);
   2205		return;
   2206	}
   2207
   2208	switch (task->tk_status) {
   2209	default:
   2210		break;
   2211	case -EBADMSG:
   2212		task->tk_status = 0;
   2213		task->tk_action = call_encode;
   2214		break;
   2215		/*
   2216		 * Special cases: if we've been waiting on the
   2217		 * socket's write_space() callback, or if the
   2218		 * socket just returned a connection error,
   2219		 * then hold onto the transport lock.
   2220		 */
   2221	case -ENOMEM:
   2222	case -ENOBUFS:
   2223		rpc_delay(task, HZ>>2);
   2224		fallthrough;
   2225	case -EBADSLT:
   2226	case -EAGAIN:
   2227		task->tk_action = call_transmit;
   2228		task->tk_status = 0;
   2229		break;
   2230	case -ECONNREFUSED:
   2231	case -EHOSTDOWN:
   2232	case -ENETDOWN:
   2233	case -EHOSTUNREACH:
   2234	case -ENETUNREACH:
   2235	case -EPERM:
   2236		if (RPC_IS_SOFTCONN(task)) {
   2237			if (!task->tk_msg.rpc_proc->p_proc)
   2238				trace_xprt_ping(task->tk_xprt,
   2239						task->tk_status);
   2240			rpc_call_rpcerror(task, task->tk_status);
   2241			return;
   2242		}
   2243		fallthrough;
   2244	case -ECONNRESET:
   2245	case -ECONNABORTED:
   2246	case -EADDRINUSE:
   2247	case -ENOTCONN:
   2248	case -EPIPE:
   2249		task->tk_action = call_bind;
   2250		task->tk_status = 0;
   2251		break;
   2252	}
   2253	rpc_check_timeout(task);
   2254}
   2255
   2256#if defined(CONFIG_SUNRPC_BACKCHANNEL)
   2257static void call_bc_transmit(struct rpc_task *task);
   2258static void call_bc_transmit_status(struct rpc_task *task);
   2259
/* Queue the backchannel reply for transmission, then move to state 5b. */
static void
call_bc_encode(struct rpc_task *task)
{
	xprt_request_enqueue_transmit(task);
	task->tk_action = call_bc_transmit;
}
   2266
   2267/*
   2268 * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
   2269 * addition, disconnect on connectivity errors.
   2270 */
   2271static void
   2272call_bc_transmit(struct rpc_task *task)
   2273{
   2274	task->tk_action = call_bc_transmit_status;
   2275	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
   2276		if (!xprt_prepare_transmit(task))
   2277			return;
   2278		task->tk_status = 0;
   2279		xprt_transmit(task);
   2280	}
   2281	xprt_end_transmit(task);
   2282}
   2283
/* Sort out the result of a backchannel reply transmission (see 5b). */
static void
call_bc_transmit_status(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (rpc_task_transmitted(task))
		task->tk_status = 0;

	switch (task->tk_status) {
	case 0:
		/* Success */
	case -ENETDOWN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ECONNRESET:
	case -ECONNREFUSED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		/* Connectivity errors: silently drop the reply. */
		break;
	case -ENOMEM:
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		fallthrough;
	case -EBADSLT:
	case -EAGAIN:
		/* Transient: retry the transmission. */
		task->tk_status = 0;
		task->tk_action = call_bc_transmit;
		return;
	case -ETIMEDOUT:
		/*
		 * Problem reaching the server.  Disconnect and let the
		 * forechannel reestablish the connection.  The server will
		 * have to retransmit the backchannel request and we'll
		 * reprocess it.  Since these ops are idempotent, there's no
		 * need to cache our reply at this time.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		xprt_conditional_disconnect(req->rq_xprt,
			req->rq_connect_cookie);
		break;
	default:
		/*
		 * We were unable to reply and will have to drop the
		 * request.  The server should reconnect and retransmit.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		break;
	}
	task->tk_action = rpc_exit_task;
}
   2338#endif /* CONFIG_SUNRPC_BACKCHANNEL */
   2339
   2340/*
   2341 * 6.	Sort out the RPC call status
   2342 */
   2343static void
   2344call_status(struct rpc_task *task)
   2345{
   2346	struct rpc_clnt	*clnt = task->tk_client;
   2347	int		status;
   2348
   2349	if (!task->tk_msg.rpc_proc->p_proc)
   2350		trace_xprt_ping(task->tk_xprt, task->tk_status);
   2351
   2352	status = task->tk_status;
   2353	if (status >= 0) {
   2354		task->tk_action = call_decode;
   2355		return;
   2356	}
   2357
   2358	trace_rpc_call_status(task);
   2359	task->tk_status = 0;
   2360	switch(status) {
   2361	case -EHOSTDOWN:
   2362	case -ENETDOWN:
   2363	case -EHOSTUNREACH:
   2364	case -ENETUNREACH:
   2365	case -EPERM:
   2366		if (RPC_IS_SOFTCONN(task))
   2367			goto out_exit;
   2368		/*
   2369		 * Delay any retries for 3 seconds, then handle as if it
   2370		 * were a timeout.
   2371		 */
   2372		rpc_delay(task, 3*HZ);
   2373		fallthrough;
   2374	case -ETIMEDOUT:
   2375		break;
   2376	case -ECONNREFUSED:
   2377	case -ECONNRESET:
   2378	case -ECONNABORTED:
   2379	case -ENOTCONN:
   2380		rpc_force_rebind(clnt);
   2381		break;
   2382	case -EADDRINUSE:
   2383		rpc_delay(task, 3*HZ);
   2384		fallthrough;
   2385	case -EPIPE:
   2386	case -EAGAIN:
   2387		break;
   2388	case -ENFILE:
   2389	case -ENOBUFS:
   2390	case -ENOMEM:
   2391		rpc_delay(task, HZ>>2);
   2392		break;
   2393	case -EIO:
   2394		/* shutdown or soft timeout */
   2395		goto out_exit;
   2396	default:
   2397		if (clnt->cl_chatty)
   2398			printk("%s: RPC call returned error %d\n",
   2399			       clnt->cl_program->name, -status);
   2400		goto out_exit;
   2401	}
   2402	task->tk_action = call_encode;
   2403	if (status != -ECONNRESET && status != -ECONNABORTED)
   2404		rpc_check_timeout(task);
   2405	return;
   2406out_exit:
   2407	rpc_call_rpcerror(task, status);
   2408}
   2409
   2410static bool
   2411rpc_check_connected(const struct rpc_rqst *req)
   2412{
   2413	/* No allocated request or transport? return true */
   2414	if (!req || !req->rq_xprt)
   2415		return true;
   2416	return xprt_connected(req->rq_xprt);
   2417}
   2418
/*
 * Handle a request timeout: adjust the retransmit timeout, fail soft
 * tasks, and warn / force a rebind for hard ones.
 */
static void
rpc_check_timeout(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	if (RPC_SIGNALLED(task)) {
		rpc_call_rpcerror(task, -ERESTARTSYS);
		return;
	}

	/* Only fall through when xprt_adjust_timeout() reports a timeout. */
	if (xprt_adjust_timeout(task->tk_rqstp) == 0)
		return;

	trace_rpc_timeout_status(task);
	task->tk_timeouts++;

	/* Soft-connect tasks on a broken connection fail right away. */
	if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
		rpc_call_rpcerror(task, -ETIMEDOUT);
		return;
	}

	if (RPC_IS_SOFT(task)) {
		/*
		 * Once a "no retrans timeout" soft tasks (a.k.a NFSv4) has
		 * been sent, it should time out only if the transport
		 * connection gets terminally broken.
		 */
		if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
		    rpc_check_connected(task->tk_rqstp))
			return;

		if (clnt->cl_chatty) {
			pr_notice_ratelimited(
				"%s: server %s not responding, timed out\n",
				clnt->cl_program->name,
				task->tk_xprt->servername);
		}
		if (task->tk_flags & RPC_TASK_TIMEOUT)
			rpc_call_rpcerror(task, -ETIMEDOUT);
		else
			__rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
		return;
	}

	/* Hard task: warn once (MAJORSEEN), then keep retrying. */
	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
		task->tk_flags |= RPC_CALL_MAJORSEEN;
		if (clnt->cl_chatty) {
			pr_notice_ratelimited(
				"%s: server %s not responding, still trying\n",
				clnt->cl_program->name,
				task->tk_xprt->servername);
		}
	}
	rpc_force_rebind(clnt);
	/*
	 * Did our request time out due to an RPCSEC_GSS out-of-sequence
	 * event? RFC2203 requires the server to drop all such requests.
	 */
	rpcauth_invalcred(task);
}
   2479
   2480/*
   2481 * 7.	Decode the RPC reply
   2482 */
   2483static void
   2484call_decode(struct rpc_task *task)
   2485{
   2486	struct rpc_clnt	*clnt = task->tk_client;
   2487	struct rpc_rqst	*req = task->tk_rqstp;
   2488	struct xdr_stream xdr;
   2489	int err;
   2490
   2491	if (!task->tk_msg.rpc_proc->p_decode) {
   2492		task->tk_action = rpc_exit_task;
   2493		return;
   2494	}
   2495
   2496	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
   2497		if (clnt->cl_chatty) {
   2498			pr_notice_ratelimited("%s: server %s OK\n",
   2499				clnt->cl_program->name,
   2500				task->tk_xprt->servername);
   2501		}
   2502		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
   2503	}
   2504
   2505	/*
   2506	 * Did we ever call xprt_complete_rqst()? If not, we should assume
   2507	 * the message is incomplete.
   2508	 */
   2509	err = -EAGAIN;
   2510	if (!req->rq_reply_bytes_recvd)
   2511		goto out;
   2512
   2513	/* Ensure that we see all writes made by xprt_complete_rqst()
   2514	 * before it changed req->rq_reply_bytes_recvd.
   2515	 */
   2516	smp_rmb();
   2517
   2518	req->rq_rcv_buf.len = req->rq_private_buf.len;
   2519	trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
   2520
   2521	/* Check that the softirq receive buffer is valid */
   2522	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
   2523				sizeof(req->rq_rcv_buf)) != 0);
   2524
   2525	xdr_init_decode(&xdr, &req->rq_rcv_buf,
   2526			req->rq_rcv_buf.head[0].iov_base, req);
   2527	err = rpc_decode_header(task, &xdr);
   2528out:
   2529	switch (err) {
   2530	case 0:
   2531		task->tk_action = rpc_exit_task;
   2532		task->tk_status = rpcauth_unwrap_resp(task, &xdr);
   2533		return;
   2534	case -EAGAIN:
   2535		task->tk_status = 0;
   2536		if (task->tk_client->cl_discrtry)
   2537			xprt_conditional_disconnect(req->rq_xprt,
   2538						    req->rq_connect_cookie);
   2539		task->tk_action = call_encode;
   2540		rpc_check_timeout(task);
   2541		break;
   2542	case -EKEYREJECTED:
   2543		task->tk_action = call_reserve;
   2544		rpc_check_timeout(task);
   2545		rpcauth_invalcred(task);
   2546		/* Ensure we obtain a new XID if we retry! */
   2547		xprt_release(task);
   2548	}
   2549}
   2550
/*
 * Marshal the RPC call header into @xdr: XID, message direction, RPC
 * version, program, version and procedure, followed by the credential.
 * Returns 0 on success, or a negative errno after failing the task.
 */
static int
rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	__be32 *p;
	int error;

	error = -EMSGSIZE;
	p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
	if (!p)
		goto out_fail;
	*p++ = req->rq_xid;
	*p++ = rpc_call;
	*p++ = cpu_to_be32(RPC_VERSION);
	*p++ = cpu_to_be32(clnt->cl_prog);
	*p++ = cpu_to_be32(clnt->cl_vers);
	*p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);

	error = rpcauth_marshcred(task, xdr);
	if (error < 0)
		goto out_fail;
	return 0;
out_fail:
	trace_rpc_bad_callhdr(task);
	rpc_call_rpcerror(task, error);
	return error;
}
   2579
/*
 * Parse the RPC reply header in @xdr.  Returns 0 when the reply was
 * accepted with status rpc_success, -EAGAIN to trigger a re-encode and
 * retransmit, -EKEYREJECTED for stale credentials, or another negative
 * errno after failing the task.
 */
static noinline int
rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
{
	struct rpc_clnt *clnt = task->tk_client;
	int error;
	__be32 *p;

	/* RFC-1014 says that the representation of XDR data must be a
	 * multiple of four bytes
	 * - if it isn't pointer subtraction in the NFS client may give
	 *   undefined results
	 */
	if (task->tk_rqstp->rq_rcv_buf.len & 3)
		goto out_unparsable;

	/* XID, message direction, accept state. */
	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
	if (!p)
		goto out_unparsable;
	p++;	/* skip XID */
	if (*p++ != rpc_reply)
		goto out_unparsable;
	if (*p++ != rpc_msg_accepted)
		goto out_msg_denied;

	error = rpcauth_checkverf(task, xdr);
	if (error)
		goto out_verifier;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (!p)
		goto out_unparsable;
	switch (*p) {
	case rpc_success:
		return 0;
	case rpc_prog_unavail:
		trace_rpc__prog_unavail(task);
		error = -EPFNOSUPPORT;
		goto out_err;
	case rpc_prog_mismatch:
		trace_rpc__prog_mismatch(task);
		error = -EPROTONOSUPPORT;
		goto out_err;
	case rpc_proc_unavail:
		trace_rpc__proc_unavail(task);
		error = -EOPNOTSUPP;
		goto out_err;
	case rpc_garbage_args:
	case rpc_system_err:
		trace_rpc__garbage_args(task);
		error = -EIO;
		break;
	default:
		goto out_unparsable;
	}

out_garbage:
	/* Possibly garbled on the wire: retry the call a few times. */
	clnt->cl_stats->rpcgarbage++;
	if (task->tk_garb_retry) {
		task->tk_garb_retry--;
		task->tk_action = call_encode;
		return -EAGAIN;
	}
out_err:
	rpc_call_rpcerror(task, error);
	return error;

out_unparsable:
	trace_rpc__unparsable(task);
	error = -EIO;
	goto out_garbage;

out_verifier:
	trace_rpc_bad_verifier(task);
	goto out_garbage;

out_msg_denied:
	/* The server rejected the call: find out why. */
	error = -EACCES;
	p = xdr_inline_decode(xdr, sizeof(*p));
	if (!p)
		goto out_unparsable;
	switch (*p++) {
	case rpc_auth_error:
		break;
	case rpc_mismatch:
		trace_rpc__mismatch(task);
		error = -EPROTONOSUPPORT;
		goto out_err;
	default:
		goto out_unparsable;
	}

	/* Decode the auth_stat value. */
	p = xdr_inline_decode(xdr, sizeof(*p));
	if (!p)
		goto out_unparsable;
	switch (*p++) {
	case rpc_autherr_rejectedcred:
	case rpc_autherr_rejectedverf:
	case rpcsec_gsserr_credproblem:
	case rpcsec_gsserr_ctxproblem:
		/* Stale credentials: let the caller refresh and retry. */
		if (!task->tk_cred_retry)
			break;
		task->tk_cred_retry--;
		trace_rpc__stale_creds(task);
		return -EKEYREJECTED;
	case rpc_autherr_badcred:
	case rpc_autherr_badverf:
		/* possibly garbled cred/verf? */
		if (!task->tk_garb_retry)
			break;
		task->tk_garb_retry--;
		trace_rpc__bad_creds(task);
		task->tk_action = call_encode;
		return -EAGAIN;
	case rpc_autherr_tooweak:
		trace_rpc__auth_tooweak(task);
		pr_warn("RPC: server %s requires stronger authentication.\n",
			task->tk_xprt->servername);
		break;
	default:
		goto out_unparsable;
	}
	goto out_err;
}
   2703
/* Encoder for the NULL procedure: the call carries no arguments. */
static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
		const void *obj)
{
}
   2708
/* Decoder for the NULL procedure: the reply carries no result. */
static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
		void *obj)
{
	return 0;
}
   2714
/* Procedure 0 (NULL), used for pings and transport tests. */
static const struct rpc_procinfo rpcproc_null = {
	.p_encode = rpcproc_encode_null,
	.p_decode = rpcproc_decode_null,
};
   2719
/* NULL procedure without a decoder: the reply is not decoded. */
static const struct rpc_procinfo rpcproc_null_noreply = {
	.p_encode = rpcproc_encode_null,
};
   2723
/*
 * Clear RPC_TASK_NO_RETRANS_TIMEOUT so NULL calls are subject to the
 * retransmit timeout, then start the call.
 */
static void
rpc_null_call_prepare(struct rpc_task *task, void *data)
{
	task->tk_flags &= ~RPC_TASK_NO_RETRANS_TIMEOUT;
	rpc_call_start(task);
}
   2730
/* Default callbacks for NULL calls. */
static const struct rpc_call_ops rpc_null_ops = {
	.rpc_call_prepare = rpc_null_call_prepare,
	.rpc_call_done = rpc_default_callback,
};
   2735
   2736static
   2737struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
   2738		struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
   2739		const struct rpc_call_ops *ops, void *data)
   2740{
   2741	struct rpc_message msg = {
   2742		.rpc_proc = &rpcproc_null,
   2743	};
   2744	struct rpc_task_setup task_setup_data = {
   2745		.rpc_client = clnt,
   2746		.rpc_xprt = xprt,
   2747		.rpc_message = &msg,
   2748		.rpc_op_cred = cred,
   2749		.callback_ops = ops ?: &rpc_null_ops,
   2750		.callback_data = data,
   2751		.flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
   2752			 RPC_TASK_NULLCREDS,
   2753	};
   2754
   2755	return rpc_run_task(&task_setup_data);
   2756}
   2757
/* Issue a NULL request on @clnt's default transport with @cred and @flags. */
struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
{
	return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
}
EXPORT_SYMBOL_GPL(rpc_call_null);
   2763
   2764static int rpc_ping(struct rpc_clnt *clnt)
   2765{
   2766	struct rpc_task	*task;
   2767	int status;
   2768
   2769	task = rpc_call_null_helper(clnt, NULL, NULL, 0, NULL, NULL);
   2770	if (IS_ERR(task))
   2771		return PTR_ERR(task);
   2772	status = task->tk_status;
   2773	rpc_put_task(task);
   2774	return status;
   2775}
   2776
   2777static int rpc_ping_noreply(struct rpc_clnt *clnt)
   2778{
   2779	struct rpc_message msg = {
   2780		.rpc_proc = &rpcproc_null_noreply,
   2781	};
   2782	struct rpc_task_setup task_setup_data = {
   2783		.rpc_client = clnt,
   2784		.rpc_message = &msg,
   2785		.callback_ops = &rpc_null_ops,
   2786		.flags = RPC_TASK_SOFT | RPC_TASK_SOFTCONN | RPC_TASK_NULLCREDS,
   2787	};
   2788	struct rpc_task	*task;
   2789	int status;
   2790
   2791	task = rpc_run_task(&task_setup_data);
   2792	if (IS_ERR(task))
   2793		return PTR_ERR(task);
   2794	status = task->tk_status;
   2795	rpc_put_task(task);
   2796	return status;
   2797}
   2798
/* Completion state for the "test, then add transport" NULL ping. */
struct rpc_cb_add_xprt_calldata {
	struct rpc_xprt_switch *xps;	/* switch to add the transport to */
	struct rpc_xprt *xprt;		/* transport under test */
};
   2803
   2804static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
   2805{
   2806	struct rpc_cb_add_xprt_calldata *data = calldata;
   2807
   2808	if (task->tk_status == 0)
   2809		rpc_xprt_switch_add_xprt(data->xps, data->xprt);
   2810}
   2811
/* Drop the references pinned by rpc_clnt_test_and_add_xprt() and free. */
static void rpc_cb_add_xprt_release(void *calldata)
{
	struct rpc_cb_add_xprt_calldata *data = calldata;

	xprt_put(data->xprt);
	xprt_switch_put(data->xps);
	kfree(data);
}
   2820
/* Callbacks for the async transport-test NULL ping. */
static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
	.rpc_call_prepare = rpc_null_call_prepare,
	.rpc_call_done = rpc_cb_add_xprt_done,
	.rpc_release = rpc_cb_add_xprt_release,
};
   2826
   2827/**
   2828 * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
   2829 * @clnt: pointer to struct rpc_clnt
   2830 * @xps: pointer to struct rpc_xprt_switch,
   2831 * @xprt: pointer struct rpc_xprt
   2832 * @dummy: unused
   2833 */
   2834int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
   2835		struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
   2836		void *dummy)
   2837{
   2838	struct rpc_cb_add_xprt_calldata *data;
   2839	struct rpc_task *task;
   2840
   2841	if (xps->xps_nunique_destaddr_xprts + 1 > clnt->cl_max_connect) {
   2842		rcu_read_lock();
   2843		pr_warn("SUNRPC: reached max allowed number (%d) did not add "
   2844			"transport to server: %s\n", clnt->cl_max_connect,
   2845			rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
   2846		rcu_read_unlock();
   2847		return -EINVAL;
   2848	}
   2849
   2850	data = kmalloc(sizeof(*data), GFP_KERNEL);
   2851	if (!data)
   2852		return -ENOMEM;
   2853	data->xps = xprt_switch_get(xps);
   2854	data->xprt = xprt_get(xprt);
   2855	if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
   2856		rpc_cb_add_xprt_release(data);
   2857		goto success;
   2858	}
   2859
   2860	task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
   2861			&rpc_cb_add_xprt_call_ops, data);
   2862	data->xps->xps_nunique_destaddr_xprts++;
   2863	rpc_put_task(task);
   2864success:
   2865	return 1;
   2866}
   2867EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
   2868
   2869/**
   2870 * rpc_clnt_setup_test_and_add_xprt()
   2871 *
   2872 * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
   2873 *   1) caller of the test function must dereference the rpc_xprt_switch
   2874 *   and the rpc_xprt.
   2875 *   2) test function must call rpc_xprt_switch_add_xprt, usually in
   2876 *   the rpc_call_done routine.
   2877 *
   2878 * Upon success (return of 1), the test function adds the new
   2879 * transport to the rpc_clnt xprt switch
   2880 *
   2881 * @clnt: struct rpc_clnt to get the new transport
   2882 * @xps:  the rpc_xprt_switch to hold the new transport
   2883 * @xprt: the rpc_xprt to test
   2884 * @data: a struct rpc_add_xprt_test pointer that holds the test function
   2885 *        and test function call data
   2886 */
   2887int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
   2888				     struct rpc_xprt_switch *xps,
   2889				     struct rpc_xprt *xprt,
   2890				     void *data)
   2891{
   2892	struct rpc_task *task;
   2893	struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
   2894	int status = -EADDRINUSE;
   2895
   2896	xprt = xprt_get(xprt);
   2897	xprt_switch_get(xps);
   2898
   2899	if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
   2900		goto out_err;
   2901
   2902	/* Test the connection */
   2903	task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
   2904	if (IS_ERR(task)) {
   2905		status = PTR_ERR(task);
   2906		goto out_err;
   2907	}
   2908	status = task->tk_status;
   2909	rpc_put_task(task);
   2910
   2911	if (status < 0)
   2912		goto out_err;
   2913
   2914	/* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
   2915	xtest->add_xprt_test(clnt, xprt, xtest->data);
   2916
   2917	xprt_put(xprt);
   2918	xprt_switch_put(xps);
   2919
   2920	/* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
   2921	return 1;
   2922out_err:
   2923	xprt_put(xprt);
   2924	xprt_switch_put(xps);
   2925	pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
   2926		status, xprt->address_strings[RPC_DISPLAY_ADDR]);
   2927	return status;
   2928}
   2929EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
   2930
   2931/**
   2932 * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
   2933 * @clnt: pointer to struct rpc_clnt
   2934 * @xprtargs: pointer to struct xprt_create
   2935 * @setup: callback to test and/or set up the connection
   2936 * @data: pointer to setup function data
   2937 *
   2938 * Creates a new transport using the parameters set in args and
   2939 * adds it to clnt.
   2940 * If ping is set, then test that connectivity succeeds before
   2941 * adding the new transport.
   2942 *
   2943 */
   2944int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
   2945		struct xprt_create *xprtargs,
   2946		int (*setup)(struct rpc_clnt *,
   2947			struct rpc_xprt_switch *,
   2948			struct rpc_xprt *,
   2949			void *),
   2950		void *data)
   2951{
   2952	struct rpc_xprt_switch *xps;
   2953	struct rpc_xprt *xprt;
   2954	unsigned long connect_timeout;
   2955	unsigned long reconnect_timeout;
   2956	unsigned char resvport, reuseport;
   2957	int ret = 0, ident;
   2958
   2959	rcu_read_lock();
   2960	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
   2961	xprt = xprt_iter_xprt(&clnt->cl_xpi);
   2962	if (xps == NULL || xprt == NULL) {
   2963		rcu_read_unlock();
   2964		xprt_switch_put(xps);
   2965		return -EAGAIN;
   2966	}
   2967	resvport = xprt->resvport;
   2968	reuseport = xprt->reuseport;
   2969	connect_timeout = xprt->connect_timeout;
   2970	reconnect_timeout = xprt->max_reconnect_timeout;
   2971	ident = xprt->xprt_class->ident;
   2972	rcu_read_unlock();
   2973
   2974	if (!xprtargs->ident)
   2975		xprtargs->ident = ident;
   2976	xprt = xprt_create_transport(xprtargs);
   2977	if (IS_ERR(xprt)) {
   2978		ret = PTR_ERR(xprt);
   2979		goto out_put_switch;
   2980	}
   2981	xprt->resvport = resvport;
   2982	xprt->reuseport = reuseport;
   2983	if (xprt->ops->set_connect_timeout != NULL)
   2984		xprt->ops->set_connect_timeout(xprt,
   2985				connect_timeout,
   2986				reconnect_timeout);
   2987
   2988	rpc_xprt_switch_set_roundrobin(xps);
   2989	if (setup) {
   2990		ret = setup(clnt, xps, xprt, data);
   2991		if (ret != 0)
   2992			goto out_put_xprt;
   2993	}
   2994	rpc_xprt_switch_add_xprt(xps, xprt);
   2995out_put_xprt:
   2996	xprt_put(xprt);
   2997out_put_switch:
   2998	xprt_switch_put(xps);
   2999	return ret;
   3000}
   3001EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
   3002
/* Argument bundle for rpc_xprt_set_connect_timeout(). */
struct connect_timeout_data {
	unsigned long connect_timeout;
	unsigned long reconnect_timeout;
};
   3007
   3008static int
   3009rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
   3010		struct rpc_xprt *xprt,
   3011		void *data)
   3012{
   3013	struct connect_timeout_data *timeo = data;
   3014
   3015	if (xprt->ops->set_connect_timeout)
   3016		xprt->ops->set_connect_timeout(xprt,
   3017				timeo->connect_timeout,
   3018				timeo->reconnect_timeout);
   3019	return 0;
   3020}
   3021
/* Apply @connect_timeout/@reconnect_timeout to every transport of @clnt. */
void
rpc_set_connect_timeout(struct rpc_clnt *clnt,
		unsigned long connect_timeout,
		unsigned long reconnect_timeout)
{
	struct connect_timeout_data timeout = {
		.connect_timeout = connect_timeout,
		.reconnect_timeout = reconnect_timeout,
	};
	rpc_clnt_iterate_for_each_xprt(clnt,
			rpc_xprt_set_connect_timeout,
			&timeout);
}
EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
   3036
/* Release a reference to @clnt's transport switch. */
void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
{
	rcu_read_lock();
	xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
	rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
   3044
/* Add @xprt to @clnt's transport switch. */
void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
{
	rcu_read_lock();
	rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
				 xprt);
	rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
   3053
   3054bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
   3055				   const struct sockaddr *sap)
   3056{
   3057	struct rpc_xprt_switch *xps;
   3058	bool ret;
   3059
   3060	rcu_read_lock();
   3061	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
   3062	ret = rpc_xprt_switch_has_addr(xps, sap);
   3063	rcu_read_unlock();
   3064	return ret;
   3065}
   3066EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
   3067
   3068#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
/* Print the column headings for the task dump produced below. */
static void rpc_show_header(void)
{
	printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
		"-timeout ---ops--\n");
}
   3074
/* Print one line describing @task; the wait queue name only if queued. */
static void rpc_show_task(const struct rpc_clnt *clnt,
			  const struct rpc_task *task)
{
	const char *rpc_waitq = "none";

	if (RPC_IS_QUEUED(task))
		rpc_waitq = rpc_qname(task->tk_waitqueue);

	printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
		task->tk_pid, task->tk_flags, task->tk_status,
		clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
		clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
		task->tk_action, rpc_waitq);
}
   3089
/*
 * Dump every task of every RPC client in @net to the kernel log,
 * printing the header once before the first task.
 */
void rpc_show_tasks(struct net *net)
{
	struct rpc_clnt *clnt;
	struct rpc_task *task;
	int header = 0;
	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);

	/* Hold the per-netns client list lock, then each client's lock. */
	spin_lock(&sn->rpc_client_lock);
	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
		spin_lock(&clnt->cl_lock);
		list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
			if (!header) {
				rpc_show_header();
				header++;
			}
			rpc_show_task(clnt, task);
		}
		spin_unlock(&clnt->cl_lock);
	}
	spin_unlock(&sn->rpc_client_lock);
}
   3111#endif
   3112
   3113#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
/* Per-transport callback: enable swap on @xprt. */
static int
rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	return xprt_enable_swap(xprt);
}
   3121
/*
 * Enable swap on all transports of the root (top-most parent) client.
 * Only the first activation (cl_swapper 0 -> 1) walks the transports.
 */
int
rpc_clnt_swap_activate(struct rpc_clnt *clnt)
{
	while (clnt != clnt->cl_parent)
		clnt = clnt->cl_parent;
	if (atomic_inc_return(&clnt->cl_swapper) == 1)
		return rpc_clnt_iterate_for_each_xprt(clnt,
				rpc_clnt_swap_activate_callback, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
   3133
/* Per-transport callback: disable swap on @xprt. */
static int
rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	xprt_disable_swap(xprt);
	return 0;
}
   3142
/* Disable swap on all transports once the last activation is dropped. */
void
rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
{
	if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
		rpc_clnt_iterate_for_each_xprt(clnt,
				rpc_clnt_swap_deactivate_callback, NULL);
}
EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
   3151#endif /* CONFIG_SUNRPC_SWAP */