cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

svc.c (41171B)


// SPDX-License-Identifier: GPL-2.0-only
/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple thread pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched/signal.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/slab.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/bc_xprt.h>

#include <trace/events/sunrpc.h>

#include "fail.h"

#define RPCDBG_FACILITY	RPCDBG_SVCDSP

static void svc_unregister(const struct svc_serv *serv, struct net *net);

#define SVC_POOL_DEFAULT	SVC_POOL_GLOBAL

/*
 * Mode for mapping cpus to pools.
 */
enum {
	SVC_POOL_AUTO = -1,	/* choose one of the others */
	SVC_POOL_GLOBAL,	/* no mapping, just a single global pool
				 * (legacy & UP mode) */
	SVC_POOL_PERCPU,	/* one pool per cpu */
	SVC_POOL_PERNODE	/* one pool per numa node */
};

/*
 * Structure for mapping cpus to pools and vice versa.
 * Setup once during sunrpc initialisation.
 */

struct svc_pool_map {
	int count;			/* How many svc_servs use us */
	int mode;			/* Note: int not enum to avoid
					 * warnings about "enumeration value
					 * not handled in switch" */
	unsigned int npools;
	unsigned int *pool_to;		/* maps pool id to cpu or node */
	unsigned int *to_pool;		/* maps cpu or node to pool id */
};
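
/*
 * Example (illustrative, not part of the upstream file): on a two-node
 * NUMA machine running in SVC_POOL_PERNODE mode, npools == 2 and the
 * two arrays are inverse mappings of each other:
 *
 *	to_pool[0] == 0, to_pool[1] == 1	(node -> pool id)
 *	pool_to[0] == 0, pool_to[1] == 1	(pool id -> node)
 *
 * In SVC_POOL_PERCPU mode the same relationship holds between cpu
 * numbers and pool ids.
 */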

static struct svc_pool_map svc_pool_map = {
	.mode = SVC_POOL_DEFAULT
};

static DEFINE_MUTEX(svc_pool_map_mutex); /* protects svc_pool_map.count only */

static int
param_set_pool_mode(const char *val, const struct kernel_param *kp)
{
	int *ip = (int *)kp->arg;
	struct svc_pool_map *m = &svc_pool_map;
	int err;

	mutex_lock(&svc_pool_map_mutex);

	err = -EBUSY;
	if (m->count)
		goto out;

	err = 0;
	if (!strncmp(val, "auto", 4))
		*ip = SVC_POOL_AUTO;
	else if (!strncmp(val, "global", 6))
		*ip = SVC_POOL_GLOBAL;
	else if (!strncmp(val, "percpu", 6))
		*ip = SVC_POOL_PERCPU;
	else if (!strncmp(val, "pernode", 7))
		*ip = SVC_POOL_PERNODE;
	else
		err = -EINVAL;

out:
	mutex_unlock(&svc_pool_map_mutex);
	return err;
}

static int
param_get_pool_mode(char *buf, const struct kernel_param *kp)
{
	int *ip = (int *)kp->arg;

	switch (*ip) {
	case SVC_POOL_AUTO:
		return strlcpy(buf, "auto\n", 20);
	case SVC_POOL_GLOBAL:
		return strlcpy(buf, "global\n", 20);
	case SVC_POOL_PERCPU:
		return strlcpy(buf, "percpu\n", 20);
	case SVC_POOL_PERNODE:
		return strlcpy(buf, "pernode\n", 20);
	default:
		return sprintf(buf, "%d\n", *ip);
	}
}

module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
		 &svc_pool_map.mode, 0644);
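
/*
 * Usage sketch (assumes a standard modular sunrpc build): the 0644 mode
 * above exposes the parameter via sysfs, so an administrator can switch
 * modes before any pooled service starts, e.g.:
 *
 *	echo pernode > /sys/module/sunrpc/parameters/pool_mode
 *	cat /sys/module/sunrpc/parameters/pool_mode
 *
 * The -EBUSY branch in param_set_pool_mode() rejects the write while any
 * pooled service still holds a reference to svc_pool_map.
 */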

/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
	unsigned int node;

	if (nr_online_nodes > 1) {
		/*
		 * Actually have multiple NUMA nodes,
		 * so split pools on NUMA node boundaries
		 */
		return SVC_POOL_PERNODE;
	}

	node = first_online_node;
	if (nr_cpus_node(node) > 2) {
		/*
		 * Non-trivial SMP, or CONFIG_NUMA on
		 * non-NUMA hardware, e.g. with a generic
		 * x86_64 kernel on Xeons.  In this case we
		 * want to divide the pools on cpu boundaries.
		 */
		return SVC_POOL_PERCPU;
	}

	/* default: one global pool */
	return SVC_POOL_GLOBAL;
}

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
	m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->to_pool)
		goto fail;
	m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->pool_to)
		goto fail_free;

	return 0;

fail_free:
	kfree(m->to_pool);
	m->to_pool = NULL;
fail:
	return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_cpu_ids;
	unsigned int pidx = 0;
	unsigned int cpu;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_online_cpu(cpu) {
		BUG_ON(pidx >= maxpools);
		m->to_pool[cpu] = pidx;
		m->pool_to[pidx] = cpu;
		pidx++;
	}
	/* cpus brought online later all get mapped to pool0, sorry */

	return pidx;
}


/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_node_ids;
	unsigned int pidx = 0;
	unsigned int node;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_node_with_cpus(node) {
		/* some architectures (e.g. SN2) have cpuless nodes */
		BUG_ON(pidx > maxpools);
		m->to_pool[node] = pidx;
		m->pool_to[pidx] = node;
		pidx++;
	}
	/* nodes brought online later all get mapped to pool0, sorry */

	return pidx;
}


/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa) if pools are in use.
 * Initialise the map if we're the first user.
 * Returns the number of pools. If this is '1', no reference
 * was taken.
 */
static unsigned int
svc_pool_map_get(void)
{
	struct svc_pool_map *m = &svc_pool_map;
	int npools = -1;

	mutex_lock(&svc_pool_map_mutex);

	if (m->count++) {
		mutex_unlock(&svc_pool_map_mutex);
		WARN_ON_ONCE(m->npools <= 1);
		return m->npools;
	}

	if (m->mode == SVC_POOL_AUTO)
		m->mode = svc_pool_map_choose_mode();

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		npools = svc_pool_map_init_percpu(m);
		break;
	case SVC_POOL_PERNODE:
		npools = svc_pool_map_init_pernode(m);
		break;
	}

	if (npools <= 0) {
		/* default, or memory allocation failure */
		npools = 1;
		m->mode = SVC_POOL_GLOBAL;
	}
	m->npools = npools;

	if (npools == 1)
		/* service is unpooled, so doesn't hold a reference */
		m->count--;

	mutex_unlock(&svc_pool_map_mutex);
	return npools;
}

/*
 * Drop a reference to the global map of cpus to pools, if
 * pools were in use, i.e. if npools > 1.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool
 * mode using the pool_mode module option without
 * rebooting or re-loading sunrpc.ko.
 */
static void
svc_pool_map_put(int npools)
{
	struct svc_pool_map *m = &svc_pool_map;

	if (npools <= 1)
		return;
	mutex_lock(&svc_pool_map_mutex);

	if (!--m->count) {
		kfree(m->to_pool);
		m->to_pool = NULL;
		kfree(m->pool_to);
		m->pool_to = NULL;
		m->npools = 0;
	}

	mutex_unlock(&svc_pool_map_mutex);
}

static int svc_pool_map_get_node(unsigned int pidx)
{
	const struct svc_pool_map *m = &svc_pool_map;

	if (m->count) {
		if (m->mode == SVC_POOL_PERCPU)
			return cpu_to_node(m->pool_to[pidx]);
		if (m->mode == SVC_POOL_PERNODE)
			return m->pool_to[pidx];
	}
	return NUMA_NO_NODE;
}
/*
 * Set the given thread's cpus_allowed mask so that it
 * will only run on cpus in the given pool.
 */
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
{
	struct svc_pool_map *m = &svc_pool_map;
	unsigned int node = m->pool_to[pidx];

	/*
	 * The caller checks for sv_nrpools > 1, which
	 * implies that we've been initialized.
	 */
	WARN_ON_ONCE(m->count == 0);
	if (m->count == 0)
		return;

	switch (m->mode) {
	case SVC_POOL_PERCPU:
	{
		set_cpus_allowed_ptr(task, cpumask_of(node));
		break;
	}
	case SVC_POOL_PERNODE:
	{
		set_cpus_allowed_ptr(task, cpumask_of_node(node));
		break;
	}
	}
}

/**
 * svc_pool_for_cpu - Select pool to run a thread on this cpu
 * @serv: An RPC service
 *
 * Use the active CPU and the svc_pool_map's mode setting to
 * select the svc thread pool to use. Once initialized, the
 * svc_pool_map does not change.
 *
 * Return value:
 *   A pointer to an svc_pool
 */
struct svc_pool *svc_pool_for_cpu(struct svc_serv *serv)
{
	struct svc_pool_map *m = &svc_pool_map;
	int cpu = raw_smp_processor_id();
	unsigned int pidx = 0;

	if (serv->sv_nrpools <= 1)
		return serv->sv_pools;

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		pidx = m->to_pool[cpu];
		break;
	case SVC_POOL_PERNODE:
		pidx = m->to_pool[cpu_to_node(cpu)];
		break;
	}

	return &serv->sv_pools[pidx % serv->sv_nrpools];
}
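
/*
 * Example (illustrative): with sv_nrpools == 4 in SVC_POOL_PERCPU mode,
 * a request arriving on cpu 2 is queued to sv_pools[to_pool[2]]; the
 * final "% serv->sv_nrpools" only guards against a pool id that is out
 * of range for this particular service.
 */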

int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
{
	int err;

	err = rpcb_create_local(net);
	if (err)
		return err;

	/* Remove any stale portmap registrations */
	svc_unregister(serv, net);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_rpcb_setup);

void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
{
	svc_unregister(serv, net);
	rpcb_put_local(net);
}
EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);

static int svc_uses_rpcbind(struct svc_serv *serv)
{
	struct svc_program	*progp;
	unsigned int		i;

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (!progp->pg_vers[i]->vs_hidden)
				return 1;
		}
	}

	return 0;
}

int svc_bind(struct svc_serv *serv, struct net *net)
{
	if (!svc_uses_rpcbind(serv))
		return 0;
	return svc_rpcb_setup(serv, net);
}
EXPORT_SYMBOL_GPL(svc_bind);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void
__svc_init_bc(struct svc_serv *serv)
{
	INIT_LIST_HEAD(&serv->sv_cb_list);
	spin_lock_init(&serv->sv_cb_lock);
	init_waitqueue_head(&serv->sv_cb_waitq);
}
#else
static void
__svc_init_bc(struct svc_serv *serv)
{
}
#endif

/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
	     int (*threadfn)(void *data))
{
	struct svc_serv	*serv;
	unsigned int vers;
	unsigned int xdrsize;
	unsigned int i;

	if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
		return NULL;
	serv->sv_name      = prog->pg_name;
	serv->sv_program   = prog;
	kref_init(&serv->sv_refcnt);
	serv->sv_stats     = prog->pg_stats;
	if (bufsize > RPCSVC_MAXPAYLOAD)
		bufsize = RPCSVC_MAXPAYLOAD;
	serv->sv_max_payload = bufsize ? bufsize : 4096;
	serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
	serv->sv_threadfn = threadfn;
	xdrsize = 0;
	while (prog) {
		prog->pg_lovers = prog->pg_nvers-1;
		for (vers = 0; vers < prog->pg_nvers; vers++)
			if (prog->pg_vers[vers]) {
				prog->pg_hivers = vers;
				if (prog->pg_lovers > vers)
					prog->pg_lovers = vers;
				if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
					xdrsize = prog->pg_vers[vers]->vs_xdrsize;
			}
		prog = prog->pg_next;
	}
	serv->sv_xdrsize   = xdrsize;
	INIT_LIST_HEAD(&serv->sv_tempsocks);
	INIT_LIST_HEAD(&serv->sv_permsocks);
	timer_setup(&serv->sv_temptimer, NULL, 0);
	spin_lock_init(&serv->sv_lock);

	__svc_init_bc(serv);

	serv->sv_nrpools = npools;
	serv->sv_pools =
		kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
			GFP_KERNEL);
	if (!serv->sv_pools) {
		kfree(serv);
		return NULL;
	}

	for (i = 0; i < serv->sv_nrpools; i++) {
		struct svc_pool *pool = &serv->sv_pools[i];

		dprintk("svc: initialising pool %u for %s\n",
				i, serv->sv_name);

		pool->sp_id = i;
		INIT_LIST_HEAD(&pool->sp_sockets);
		INIT_LIST_HEAD(&pool->sp_all_threads);
		spin_lock_init(&pool->sp_lock);
	}

	return serv;
}

/**
 * svc_create - Create an RPC service
 * @prog: the RPC program the new service will handle
 * @bufsize: maximum message size for @prog
 * @threadfn: a function to service RPC requests for @prog
 *
 * Returns an instantiated struct svc_serv object or NULL.
 */
struct svc_serv *svc_create(struct svc_program *prog, unsigned int bufsize,
			    int (*threadfn)(void *data))
{
	return __svc_create(prog, bufsize, 1, threadfn);
}
EXPORT_SYMBOL_GPL(svc_create);

/**
 * svc_create_pooled - Create an RPC service with pooled threads
 * @prog: the RPC program the new service will handle
 * @bufsize: maximum message size for @prog
 * @threadfn: a function to service RPC requests for @prog
 *
 * Returns an instantiated struct svc_serv object or NULL.
 */
struct svc_serv *svc_create_pooled(struct svc_program *prog,
				   unsigned int bufsize,
				   int (*threadfn)(void *data))
{
	struct svc_serv *serv;
	unsigned int npools = svc_pool_map_get();

	serv = __svc_create(prog, bufsize, npools, threadfn);
	if (!serv)
		goto out_err;
	return serv;
out_err:
	svc_pool_map_put(npools);
	return NULL;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);

/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct kref *ref)
{
	struct svc_serv *serv = container_of(ref, struct svc_serv, sv_refcnt);

	dprintk("svc: svc_destroy(%s)\n", serv->sv_program->pg_name);
	del_timer_sync(&serv->sv_temptimer);

	/*
	 * The last user is gone and thus all sockets have to be destroyed
	 * by this point. Check this.
	 */
	BUG_ON(!list_empty(&serv->sv_permsocks));
	BUG_ON(!list_empty(&serv->sv_tempsocks));

	cache_clean_deferred(serv);

	svc_pool_map_put(serv->sv_nrpools);

	kfree(serv->sv_pools);
	kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

/*
 * Allocate an RPC server's buffer space.
 * We allocate pages and place them in rq_pages.
 */
static int
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
{
	unsigned int pages, arghi;

	/* bc_xprt uses fore channel allocated buffers */
	if (svc_is_backchannel(rqstp))
		return 1;

	pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
				       * We assume each is at most one page
				       */
	arghi = 0;
	WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
	if (pages > RPCSVC_MAXPAGES)
		pages = RPCSVC_MAXPAGES;
	while (pages) {
		struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);
		if (!p)
			break;
		rqstp->rq_pages[arghi++] = p;
		pages--;
	}
	return pages == 0;
}
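
/*
 * Worked example (illustrative): __svc_create() rounds a 4096-byte
 * maximum payload up to sv_max_mesg == 8192, so svc_init_buffer()
 * computes pages = 8192 / PAGE_SIZE + 1 == 3 on a 4K-page machine:
 * two pages for the message plus the extra page that covers holding
 * both the request and the reply.
 */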

/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
	unsigned int i;

	for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
		if (rqstp->rq_pages[i])
			put_page(rqstp->rq_pages[i]);
}

struct svc_rqst *
svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
{
	struct svc_rqst	*rqstp;

	rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
	if (!rqstp)
		return rqstp;

	__set_bit(RQ_BUSY, &rqstp->rq_flags);
	spin_lock_init(&rqstp->rq_lock);
	rqstp->rq_server = serv;
	rqstp->rq_pool = pool;

	rqstp->rq_scratch_page = alloc_pages_node(node, GFP_KERNEL, 0);
	if (!rqstp->rq_scratch_page)
		goto out_enomem;

	rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_argp)
		goto out_enomem;

	rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_resp)
		goto out_enomem;

	if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
		goto out_enomem;

	return rqstp;
out_enomem:
	svc_rqst_free(rqstp);
	return NULL;
}
EXPORT_SYMBOL_GPL(svc_rqst_alloc);

static struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
	struct svc_rqst	*rqstp;

	rqstp = svc_rqst_alloc(serv, pool, node);
	if (!rqstp)
		return ERR_PTR(-ENOMEM);

	svc_get(serv);
	spin_lock_bh(&serv->sv_lock);
	serv->sv_nrthreads += 1;
	spin_unlock_bh(&serv->sv_lock);

	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads++;
	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
	spin_unlock_bh(&pool->sp_lock);
	return rqstp;
}

/*
 * Choose a pool in which to create a new thread, for svc_set_num_threads
 */
static inline struct svc_pool *
choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	if (pool != NULL)
		return pool;

	return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

/*
 * Choose a thread to kill, for svc_set_num_threads
 */
static inline struct task_struct *
choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	unsigned int i;
	struct task_struct *task = NULL;

	if (pool != NULL) {
		spin_lock_bh(&pool->sp_lock);
	} else {
		/* choose a pool in round-robin fashion */
		for (i = 0; i < serv->sv_nrpools; i++) {
			pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
			spin_lock_bh(&pool->sp_lock);
			if (!list_empty(&pool->sp_all_threads))
				goto found_pool;
			spin_unlock_bh(&pool->sp_lock);
		}
		return NULL;
	}

found_pool:
	if (!list_empty(&pool->sp_all_threads)) {
		struct svc_rqst *rqstp;

		/*
		 * Remove from the pool->sp_all_threads list
		 * so we don't try to kill it again.
		 */
		rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
		set_bit(RQ_VICTIM, &rqstp->rq_flags);
		list_del_rcu(&rqstp->rq_all);
		task = rqstp->rq_task;
	}
	spin_unlock_bh(&pool->sp_lock);

	return task;
}

/* create new threads */
static int
svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct svc_rqst	*rqstp;
	struct task_struct *task;
	struct svc_pool *chosen_pool;
	unsigned int state = serv->sv_nrthreads-1;
	int node;

	do {
		nrservs--;
		chosen_pool = choose_pool(serv, pool, &state);

		node = svc_pool_map_get_node(chosen_pool->sp_id);
		rqstp = svc_prepare_thread(serv, chosen_pool, node);
		if (IS_ERR(rqstp))
			return PTR_ERR(rqstp);

		task = kthread_create_on_node(serv->sv_threadfn, rqstp,
					      node, "%s", serv->sv_name);
		if (IS_ERR(task)) {
			svc_exit_thread(rqstp);
			return PTR_ERR(task);
		}

		rqstp->rq_task = task;
		if (serv->sv_nrpools > 1)
			svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

		svc_sock_update_bufs(serv);
		wake_up_process(task);
	} while (nrservs > 0);

	return 0;
}

/*
 * Create or destroy enough new threads to make the number
 * of threads the given number.  If `pool' is non-NULL, applies
 * only to threads in that pool, otherwise round-robins between
 * all pools.  Caller must ensure mutual exclusion between this and
 * server startup or shutdown.
 */

/* destroy old threads */
static int
svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct task_struct *task;
	unsigned int state = serv->sv_nrthreads-1;

	/* destroy old threads */
	do {
		task = choose_victim(serv, pool, &state);
		if (task == NULL)
			break;
		kthread_stop(task);
		nrservs++;
	} while (nrservs < 0);
	return 0;
}

int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	if (pool == NULL) {
		nrservs -= serv->sv_nrthreads;
	} else {
		spin_lock_bh(&pool->sp_lock);
		nrservs -= pool->sp_nrthreads;
		spin_unlock_bh(&pool->sp_lock);
	}

	if (nrservs > 0)
		return svc_start_kthreads(serv, pool, nrservs);
	if (nrservs < 0)
		return svc_stop_kthreads(serv, pool, nrservs);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);
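
/*
 * Usage sketch (hypothetical caller, not part of this file): an
 * NFS-style service typically creates a pooled server and then sizes
 * it, e.g.:
 *
 *	serv = svc_create_pooled(&my_program, 64 * 1024, my_thread_fn);
 *	if (serv)
 *		error = svc_set_num_threads(serv, NULL, 8);
 *
 * A later call with a smaller count stops threads via choose_victim()
 * and kthread_stop(); passing a specific pool confines the change to
 * that pool.
 */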

/**
 * svc_rqst_replace_page - Replace one page in rq_pages[]
 * @rqstp: svc_rqst with pages to replace
 * @page: replacement page
 *
 * When replacing a page in rq_pages, batch the release of the
 * replaced pages to avoid hammering the page allocator.
 */
void svc_rqst_replace_page(struct svc_rqst *rqstp, struct page *page)
{
	if (*rqstp->rq_next_page) {
		if (!pagevec_space(&rqstp->rq_pvec))
			__pagevec_release(&rqstp->rq_pvec);
		pagevec_add(&rqstp->rq_pvec, *rqstp->rq_next_page);
	}

	get_page(page);
	*(rqstp->rq_next_page++) = page;
}
EXPORT_SYMBOL_GPL(svc_rqst_replace_page);

/*
 * Called from a server thread as it's exiting. Caller must hold the "service
 * mutex" for the service.
 */
void
svc_rqst_free(struct svc_rqst *rqstp)
{
	svc_release_buffer(rqstp);
	if (rqstp->rq_scratch_page)
		put_page(rqstp->rq_scratch_page);
	kfree(rqstp->rq_resp);
	kfree(rqstp->rq_argp);
	kfree(rqstp->rq_auth_data);
	kfree_rcu(rqstp, rq_rcu_head);
}
EXPORT_SYMBOL_GPL(svc_rqst_free);

void
svc_exit_thread(struct svc_rqst *rqstp)
{
	struct svc_serv	*serv = rqstp->rq_server;
	struct svc_pool	*pool = rqstp->rq_pool;

	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads--;
	if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
		list_del_rcu(&rqstp->rq_all);
	spin_unlock_bh(&pool->sp_lock);

	spin_lock_bh(&serv->sv_lock);
	serv->sv_nrthreads -= 1;
	spin_unlock_bh(&serv->sv_lock);
	svc_sock_update_bufs(serv);

	svc_rqst_free(rqstp);

	svc_put(serv);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(struct net *net, const u32 program,
				const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(net, program, version,
					(const struct sockaddr *)&sin, netid);

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * registration request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(net, program, version, protocol, port);

	return error;
}

#if IS_ENABLED(CONFIG_IPV6)
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(struct net *net, const u32 program,
				const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP6;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP6;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(net, program, version,
					(const struct sockaddr *)&sin6, netid);

	/*
	 * User space didn't support rpcbind version 4, so we won't
	 * use a PF_INET6 listener.
	 */
	if (error == -EPROTONOSUPPORT)
		error = -EAFNOSUPPORT;

	return error;
}
#endif	/* IS_ENABLED(CONFIG_IPV6) */

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(struct net *net, const char *progname,
			  const u32 program, const u32 version,
			  const int family,
			  const unsigned short protocol,
			  const unsigned short port)
{
	int error = -EAFNOSUPPORT;

	switch (family) {
	case PF_INET:
		error = __svc_rpcb_register4(net, program, version,
						protocol, port);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case PF_INET6:
		error = __svc_rpcb_register6(net, program, version,
						protocol, port);
#endif
	}

	trace_svc_register(progname, version, protocol, port, family, error);
	return error;
}

int svc_rpcbind_set_version(struct net *net,
			    const struct svc_program *progp,
			    u32 version, int family,
			    unsigned short proto,
			    unsigned short port)
{
	return __svc_register(net, progp->pg_name, progp->pg_prog,
				version, family, proto, port);

}
EXPORT_SYMBOL_GPL(svc_rpcbind_set_version);

int svc_generic_rpcbind_set(struct net *net,
			    const struct svc_program *progp,
			    u32 version, int family,
			    unsigned short proto,
			    unsigned short port)
{
	const struct svc_version *vers = progp->pg_vers[version];
	int error;

	if (vers == NULL)
		return 0;

	if (vers->vs_hidden) {
		trace_svc_noregister(progp->pg_name, version, proto,
				     port, family, 0);
		return 0;
	}

	/*
	 * Don't register a UDP port if we need congestion
	 * control.
	 */
	if (vers->vs_need_cong_ctrl && proto == IPPROTO_UDP)
		return 0;

	error = svc_rpcbind_set_version(net, progp, version,
					family, proto, port);

	return (vers->vs_rpcb_optnl) ? 0 : error;
}
EXPORT_SYMBOL_GPL(svc_generic_rpcbind_set);

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @net: net namespace for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 */
int svc_register(const struct svc_serv *serv, struct net *net,
		 const int family, const unsigned short proto,
		 const unsigned short port)
{
	struct svc_program	*progp;
	unsigned int		i;
	int			error = 0;

	WARN_ON_ONCE(proto == 0 && port == 0);
	if (proto == 0 && port == 0)
		return -EINVAL;

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {

			error = progp->pg_rpcbind_set(net, progp, i,
					family, proto, port);
			if (error < 0) {
				printk(KERN_WARNING "svc: failed to register "
					"%sv%u RPC service (errno %d).\n",
					progp->pg_name, i, -error);
				break;
			}
		}
	}

	return error;
}

/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(struct net *net, const u32 program, const u32 version,
			     const char *progname)
{
	int error;

	error = rpcb_v4_register(net, program, version, NULL, "");

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(net, program, version, 0, 0);

	trace_svc_unregister(progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net)
{
	struct svc_program *progp;
	unsigned long flags;
	unsigned int i;

	clear_thread_flag(TIF_SIGPENDING);

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (progp->pg_vers[i]->vs_hidden)
				continue;
			__svc_unregister(net, progp->pg_prog, i, progp->pg_name);
		}
	}

	spin_lock_irqsave(&current->sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * dprintk the given error with the address of the client that caused it.
 */
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;
	char	buf[RPC_MAX_ADDRBUFLEN];

	va_start(args, fmt);

	vaf.fmt = fmt;
	vaf.va = &args;

	dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);

	va_end(args);
}
#else
static __printf(2,3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
#endif

__be32
svc_generic_init_request(struct svc_rqst *rqstp,
		const struct svc_program *progp,
		struct svc_process_info *ret)
{
	const struct svc_version *versp = NULL;	/* compiler food */
	const struct svc_procedure *procp = NULL;

	if (rqstp->rq_vers >= progp->pg_nvers)
		goto err_bad_vers;
	versp = progp->pg_vers[rqstp->rq_vers];
	if (!versp)
		goto err_bad_vers;

	/*
	 * Some protocol versions (namely NFSv4) require some form of
	 * congestion control.  (See RFC 7530 section 3.1 paragraph 2)
	 * In other words, UDP is not allowed. We mark those when setting
	 * up the svc_xprt, and verify that here.
	 *
	 * The spec is not very clear about what error should be returned
	 * when someone tries to access a server that is listening on UDP
	 * for lower versions. RPC_PROG_MISMATCH seems to be the closest
	 * fit.
	 */
	if (versp->vs_need_cong_ctrl && rqstp->rq_xprt &&
	    !test_bit(XPT_CONG_CTRL, &rqstp->rq_xprt->xpt_flags))
		goto err_bad_vers;

	if (rqstp->rq_proc >= versp->vs_nproc)
		goto err_bad_proc;
	rqstp->rq_procinfo = procp = &versp->vs_proc[rqstp->rq_proc];
	if (!procp)
		goto err_bad_proc;

	/* Initialize storage for argp and resp */
	memset(rqstp->rq_argp, 0, procp->pc_argsize);
	memset(rqstp->rq_resp, 0, procp->pc_ressize);

	/* Bump per-procedure stats counter */
	versp->vs_count[rqstp->rq_proc]++;

	ret->dispatch = versp->vs_dispatch;
	return rpc_success;
err_bad_vers:
	ret->mismatch.lovers = progp->pg_lovers;
	ret->mismatch.hivers = progp->pg_hivers;
	return rpc_prog_mismatch;
err_bad_proc:
	return rpc_proc_unavail;
}
EXPORT_SYMBOL_GPL(svc_generic_init_request);

/*
 * Common routine for processing the RPC request.
 */
static int
svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
{
	struct svc_program	*progp;
	const struct svc_procedure *procp = NULL;
	struct svc_serv		*serv = rqstp->rq_server;
	struct svc_process_info process;
	__be32			*statp;
	u32			prog, vers;
	__be32			rpc_stat;
	int			auth_res, rc;
	__be32			*reply_statp;

	rpc_stat = rpc_success;

	if (argv->iov_len < 6*4)
		goto err_short_len;

	/* Will be turned off by GSS integrity and privacy services */
	__set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
	/* Will be turned off only when NFSv4 Sessions are used */
	__set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
	__clear_bit(RQ_DROPME, &rqstp->rq_flags);

	svc_putu32(resv, rqstp->rq_xid);

	vers = svc_getnl(argv);

	/* First words of reply: */
	svc_putnl(resv, 1);		/* REPLY */

	if (vers != 2)		/* RPC version number */
		goto err_bad_rpc;

	/* Save position in case we later decide to reject: */
	reply_statp = resv->iov_base + resv->iov_len;

	svc_putnl(resv, 0);		/* ACCEPT */

	rqstp->rq_prog = prog = svc_getnl(argv);	/* program number */
	rqstp->rq_vers = svc_getnl(argv);	/* version number */
	rqstp->rq_proc = svc_getnl(argv);	/* procedure number */

	for (progp = serv->sv_program; progp; progp = progp->pg_next)
		if (prog == progp->pg_prog)
			break;

	/*
	 * Decode auth data, and add verifier to reply buffer.
	 * We do this before anything else in order to get a decent
	 * auth verifier.
	 */
	auth_res = svc_authenticate(rqstp);
	/* Also give the program a chance to reject this call: */
	if (auth_res == SVC_OK && progp)
		auth_res = progp->pg_authenticate(rqstp);
	if (auth_res != SVC_OK)
		trace_svc_authenticate(rqstp, auth_res);
	switch (auth_res) {
	case SVC_OK:
		break;
	case SVC_GARBAGE:
		goto err_garbage;
	case SVC_SYSERR:
		rpc_stat = rpc_system_err;
		goto err_bad;
	case SVC_DENIED:
		goto err_bad_auth;
	case SVC_CLOSE:
		goto close;
	case SVC_DROP:
		goto dropit;
	case SVC_COMPLETE:
		goto sendit;
	}

	if (progp == NULL)
		goto err_bad_prog;

	rpc_stat = progp->pg_init_request(rqstp, progp, &process);
	switch (rpc_stat) {
	case rpc_success:
		break;
	case rpc_prog_unavail:
		goto err_bad_prog;
	case rpc_prog_mismatch:
		goto err_bad_vers;
	case rpc_proc_unavail:
		goto err_bad_proc;
	}

	procp = rqstp->rq_procinfo;
	/* Should this check go into the dispatcher? */
	if (!procp || !procp->pc_func)
		goto err_bad_proc;

	/* Syntactic check complete */
	serv->sv_stats->rpccnt++;
	trace_svc_process(rqstp, progp->pg_name);

	/* Build the reply header. */
	statp = resv->iov_base + resv->iov_len;
	svc_putnl(resv, RPC_SUCCESS);

	/* un-reserve some of the out-queue now that we have a
	 * better idea of reply size
	 */
	if (procp->pc_xdrressize)
		svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);

	/* Call the function that processes the request. */
	rc = process.dispatch(rqstp, statp);
	if (procp->pc_release)
		procp->pc_release(rqstp);
	if (!rc)
		goto dropit;
	if (rqstp->rq_auth_stat != rpc_auth_ok)
		goto err_bad_auth;

	/* Check RPC status result */
	if (*statp != rpc_success)
		resv->iov_len = ((void*)statp) - resv->iov_base + 4;

	if (procp->pc_encode == NULL)
		goto dropit;

 sendit:
	if (svc_authorise(rqstp))
		goto close_xprt;
	return 1;		/* Caller can now send it */

 dropit:
	svc_authorise(rqstp);	/* doesn't hurt to call this twice */
	dprintk("svc: svc_process dropit\n");
	return 0;

 close:
	svc_authorise(rqstp);
close_xprt:
	if (rqstp->rq_xprt && test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
		svc_xprt_close(rqstp->rq_xprt);
	dprintk("svc: svc_process close\n");
	return 0;

err_short_len:
	svc_printk(rqstp, "short len %zd, dropping request\n",
			argv->iov_len);
	goto close_xprt;

err_bad_rpc:
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, 1);	/* REJECT */
	svc_putnl(resv, 0);	/* RPC_MISMATCH */
	svc_putnl(resv, 2);	/* Only RPCv2 supported */
	svc_putnl(resv, 2);
	goto sendit;

err_bad_auth:
	dprintk("svc: authentication failed (%d)\n",
		be32_to_cpu(rqstp->rq_auth_stat));
	serv->sv_stats->rpcbadauth++;
	/* Restore write pointer to location of accept status: */
	xdr_ressize_check(rqstp, reply_statp);
	svc_putnl(resv, 1);	/* REJECT */
	svc_putnl(resv, 1);	/* AUTH_ERROR */
	svc_putu32(resv, rqstp->rq_auth_stat);	/* status */
	goto sendit;

err_bad_prog:
	dprintk("svc: unknown program %d\n", prog);
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROG_UNAVAIL);
	goto sendit;

err_bad_vers:
	svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
		       rqstp->rq_vers, rqstp->rq_prog, progp->pg_name);

	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROG_MISMATCH);
	svc_putnl(resv, process.mismatch.lovers);
	svc_putnl(resv, process.mismatch.hivers);
	goto sendit;

err_bad_proc:
	svc_printk(rqstp, "unknown procedure (%d)\n", rqstp->rq_proc);

	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROC_UNAVAIL);
	goto sendit;

err_garbage:
	svc_printk(rqstp, "failed to decode args\n");

	rpc_stat = rpc_garbage_args;
err_bad:
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, ntohl(rpc_stat));
	goto sendit;
}
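
/*
 * Reply layout note (derived from the code above, not upstream text):
 * the routine emits the xid, 1 (REPLY) and 0 (ACCEPT) words itself,
 * svc_authenticate() appends the auth verifier, and *statp then carries
 * the accept status (RPC_SUCCESS or an error); the reject paths rewind
 * to reply_statp and emit 1 (REJECT) plus the mismatch or auth-error
 * details.
 */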

/*
 * Process the RPC request.
 */
int
svc_process(struct svc_rqst *rqstp)
{
	struct kvec		*argv = &rqstp->rq_arg.head[0];
	struct kvec		*resv = &rqstp->rq_res.head[0];
	struct svc_serv		*serv = rqstp->rq_server;
	u32			dir;

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
	if (!fail_sunrpc.ignore_server_disconnect &&
	    should_fail(&fail_sunrpc.attr, 1))
		svc_xprt_deferred_close(rqstp->rq_xprt);
#endif

	/*
	 * Setup response xdr_buf.
	 * Initially it has just one page
	 */
	rqstp->rq_next_page = &rqstp->rq_respages[1];
	resv->iov_base = page_address(rqstp->rq_respages[0]);
	resv->iov_len = 0;
	rqstp->rq_res.pages = rqstp->rq_respages + 1;
	rqstp->rq_res.len = 0;
	rqstp->rq_res.page_base = 0;
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.buflen = PAGE_SIZE;
	rqstp->rq_res.tail[0].iov_base = NULL;
	rqstp->rq_res.tail[0].iov_len = 0;

	dir = svc_getnl(argv);
	if (dir != 0) {
		/* direction != CALL */
		svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
		serv->sv_stats->rpcbadfmt++;
		goto out_drop;
	}

	/* Returns 1 for send, 0 for drop */
	if (likely(svc_process_common(rqstp, argv, resv)))
		return svc_send(rqstp);

out_drop:
	svc_drop(rqstp);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_process);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * Process a backchannel RPC request that arrived over an existing
 * outbound connection
 */
int
bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
	       struct svc_rqst *rqstp)
{
	struct kvec	*argv = &rqstp->rq_arg.head[0];
	struct kvec	*resv = &rqstp->rq_res.head[0];
	struct rpc_task *task;
	int proc_error;
	int error;

	dprintk("svc: %s(%p)\n", __func__, req);

	/* Build the svc_rqst used by the common processing routine */
	rqstp->rq_xid = req->rq_xid;
	rqstp->rq_prot = req->rq_xprt->prot;
	rqstp->rq_server = serv;
	rqstp->rq_bc_net = req->rq_xprt->xprt_net;

	rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
	memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
	memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
	memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

	/* Adjust the argument buffer length */
	rqstp->rq_arg.len = req->rq_private_buf.len;
	if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
		rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
		rqstp->rq_arg.page_len = 0;
	} else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
			rqstp->rq_arg.page_len)
		rqstp->rq_arg.page_len = rqstp->rq_arg.len -
			rqstp->rq_arg.head[0].iov_len;
	else
		rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
			rqstp->rq_arg.page_len;

	/* reset result send buffer "put" position */
	resv->iov_len = 0;

	/*
	 * Skip the next two words because they've already been
	 * processed in the transport
	 */
	svc_getu32(argv);	/* XID */
	svc_getnl(argv);	/* CALLDIR */

	/* Parse and execute the bc call */
	proc_error = svc_process_common(rqstp, argv, resv);

	atomic_dec(&req->rq_xprt->bc_slot_count);
	if (!proc_error) {
		/* Processing error: drop the request */
		xprt_free_bc_request(req);
		error = -EINVAL;
		goto out;
	}
	/* Finally, send the reply synchronously */
	memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
	task = rpc_run_bc_task(req);
	if (IS_ERR(task)) {
		error = PTR_ERR(task);
		goto out;
	}

	WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
	error = task->tk_status;
	rpc_put_task(task);

out:
	dprintk("svc: %s(), error=%d\n", __func__, error);
	return error;
}
EXPORT_SYMBOL_GPL(bc_svc_process);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

/*
 * Return (transport-specific) limit on the rpc payload.
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
	u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;

	if (rqstp->rq_server->sv_max_payload < max)
		max = rqstp->rq_server->sv_max_payload;
	return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);
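
/*
 * Example (illustrative): a server created with bufsize 64 * 1024 has
 * sv_max_payload == 65536, so svc_max_payload() reports 65536 even on a
 * transport whose class advertises a larger xcl_max_payload; in the
 * opposite case the transport's smaller limit wins.
 */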

/**
 * svc_proc_name - Return RPC procedure name in string form
 * @rqstp: svc_rqst to operate on
 *
 * Return value:
 *   Pointer to a NUL-terminated string
 */
const char *svc_proc_name(const struct svc_rqst *rqstp)
{
	if (rqstp && rqstp->rq_procinfo)
		return rqstp->rq_procinfo->pc_name;
	return "unknown";
}


/**
 * svc_encode_result_payload - mark a range of bytes as a result payload
 * @rqstp: svc_rqst to operate on
 * @offset: payload's byte offset in rqstp->rq_res
 * @length: size of payload, in bytes
 *
 * Returns zero on success, or a negative errno if a permanent
 * error occurred.
 */
int svc_encode_result_payload(struct svc_rqst *rqstp, unsigned int offset,
			      unsigned int length)
{
	return rqstp->rq_xprt->xpt_ops->xpo_result_payload(rqstp, offset,
							   length);
}
EXPORT_SYMBOL_GPL(svc_encode_result_payload);

/**
 * svc_fill_write_vector - Construct data argument for VFS write call
 * @rqstp: svc_rqst to operate on
 * @payload: xdr_buf containing only the write data payload
 *
 * Fills in rqstp::rq_vec, and returns the number of elements.
 */
unsigned int svc_fill_write_vector(struct svc_rqst *rqstp,
				   struct xdr_buf *payload)
{
	struct page **pages = payload->pages;
	struct kvec *first = payload->head;
	struct kvec *vec = rqstp->rq_vec;
	size_t total = payload->len;
	unsigned int i;

	/* Some types of transport can present the write payload
	 * entirely in rq_arg.pages. In this case, @first is empty.
	 */
	i = 0;
	if (first->iov_len) {
		vec[i].iov_base = first->iov_base;
		vec[i].iov_len = min_t(size_t, total, first->iov_len);
		total -= vec[i].iov_len;
		++i;
	}

	while (total) {
		vec[i].iov_base = page_address(*pages);
		vec[i].iov_len = min_t(size_t, total, PAGE_SIZE);
		total -= vec[i].iov_len;
		++i;
		++pages;
	}

	WARN_ON_ONCE(i > ARRAY_SIZE(rqstp->rq_vec));
	return i;
}
EXPORT_SYMBOL_GPL(svc_fill_write_vector);
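
/*
 * Worked example (illustrative): a payload of 100 bytes in @first plus
 * 8192 bytes of page data (total == 8292, 4K pages) yields three kvecs:
 * vec[0] covers the 100-byte head, and vec[1] and vec[2] each cover one
 * full page; the loop above stops once total reaches zero.
 */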

/**
 * svc_fill_symlink_pathname - Construct pathname argument for VFS symlink call
 * @rqstp: svc_rqst to operate on
 * @first: buffer containing first section of pathname
 * @p: buffer containing remaining section of pathname
 * @total: total length of the pathname argument
 *
 * The VFS symlink API demands a NUL-terminated pathname in mapped memory.
 * Returns pointer to a NUL-terminated string, or an ERR_PTR. Caller must free
 * the returned string.
 */
char *svc_fill_symlink_pathname(struct svc_rqst *rqstp, struct kvec *first,
				void *p, size_t total)
{
	size_t len, remaining;
	char *result, *dst;

	result = kmalloc(total + 1, GFP_KERNEL);
	if (!result)
		return ERR_PTR(-ESERVERFAULT);

	dst = result;
	remaining = total;

	len = min_t(size_t, total, first->iov_len);
	if (len) {
		memcpy(dst, first->iov_base, len);
		dst += len;
		remaining -= len;
	}

	if (remaining) {
		len = min_t(size_t, remaining, PAGE_SIZE);
		memcpy(dst, p, len);
		dst += len;
	}

	*dst = '\0';

	/* Sanity check: Linux doesn't allow the pathname argument to
	 * contain a NUL byte.
	 */
	if (strlen(result) != total) {
		kfree(result);
		return ERR_PTR(-EINVAL);
	}
	return result;
}
EXPORT_SYMBOL_GPL(svc_fill_symlink_pathname);