cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

xprt.c (55155B)


      1// SPDX-License-Identifier: GPL-2.0-only
      2/*
      3 *  linux/net/sunrpc/xprt.c
      4 *
      5 *  This is a generic RPC call interface supporting congestion avoidance,
      6 *  and asynchronous calls.
      7 *
      8 *  The interface works like this:
      9 *
     10 *  -	When a process places a call, it allocates a request slot if
     11 *	one is available. Otherwise, it sleeps on the backlog queue
     12 *	(xprt_reserve).
     13 *  -	Next, the caller puts together the RPC message, stuffs it into
     14 *	the request struct, and calls xprt_transmit().
     15 *  -	xprt_transmit sends the message and installs the caller on the
     16 *	transport's wait list. At the same time, if a reply is expected,
     17 *	it installs a timer that is run after the packet's timeout has
     18 *	expired.
     19 *  -	When a packet arrives, the data_ready handler walks the list of
     20 *	pending requests for that transport. If a matching XID is found, the
     21 *	caller is woken up, and the timer removed.
     22 *  -	When no reply arrives within the timeout interval, the timer is
     23 *	fired by the kernel and runs xprt_timer(). It either adjusts the
     24 *	timeout values (minor timeout) or wakes up the caller with a status
     25 *	of -ETIMEDOUT.
     26 *  -	When the caller receives a notification from RPC that a reply arrived,
     27 *	it should release the RPC slot, and process the reply.
     28 *	If the call timed out, it may choose to retry the operation by
     29 *	adjusting the initial timeout value, and simply calling rpc_call
     30 *	again.
     31 *
     32 *  Support for async RPC is done through a set of RPC-specific scheduling
     33 *  primitives that `transparently' work for processes as well as async
     34 *  tasks that rely on callbacks.
     35 *
     36 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
     37 *
     38 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
     39 */
     40
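The sequence described above maps onto the helpers defined in this file. As an illustrative sketch only (not part of the original source; slot reservation, error handling and the scheduler's state machine are omitted), the client layer drives a single call roughly in this order:

/* Illustrative sketch: rough per-call ordering of the helpers below. */
static void example_drive_one_call(struct rpc_task *task)
{
	/* 1. queue the request for its expected reply and for transmission */
	if (xprt_request_enqueue_receive(task))
		return;
	xprt_request_enqueue_transmit(task);

	/* 2. serialize write access to the transport (ops->reserve_xprt) */
	if (!xprt_prepare_transmit(task))
		return;

	/* 3. drain the transmit queue onto the wire */
	xprt_transmit(task);
	xprt_end_transmit(task);

	/* 4. sleep until the reply is matched by XID or the timer fires */
	xprt_request_wait_receive(task);
}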
     41#include <linux/module.h>
     42
     43#include <linux/types.h>
     44#include <linux/interrupt.h>
     45#include <linux/workqueue.h>
     46#include <linux/net.h>
     47#include <linux/ktime.h>
     48
     49#include <linux/sunrpc/clnt.h>
     50#include <linux/sunrpc/metrics.h>
     51#include <linux/sunrpc/bc_xprt.h>
     52#include <linux/rcupdate.h>
     53#include <linux/sched/mm.h>
     54
     55#include <trace/events/sunrpc.h>
     56
     57#include "sunrpc.h"
     58#include "sysfs.h"
     59#include "fail.h"
     60
     61/*
     62 * Local variables
     63 */
     64
     65#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
     66# define RPCDBG_FACILITY	RPCDBG_XPRT
     67#endif
     68
     69/*
     70 * Local functions
     71 */
     72static void	xprt_init(struct rpc_xprt *xprt, struct net *net);
     73static __be32	xprt_alloc_xid(struct rpc_xprt *xprt);
     74static void	xprt_destroy(struct rpc_xprt *xprt);
     75static void	xprt_request_init(struct rpc_task *task);
     76static int	xprt_request_prepare(struct rpc_rqst *req);
     77
     78static DEFINE_SPINLOCK(xprt_list_lock);
     79static LIST_HEAD(xprt_list);
     80
     81static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
     82{
     83	unsigned long timeout = jiffies + req->rq_timeout;
     84
     85	if (time_before(timeout, req->rq_majortimeo))
     86		return timeout;
     87	return req->rq_majortimeo;
     88}
     89
     90/**
     91 * xprt_register_transport - register a transport implementation
     92 * @transport: transport to register
     93 *
     94 * If a transport implementation is loaded as a kernel module, it can
     95 * call this interface to make itself known to the RPC client.
     96 *
     97 * Returns:
     98 * 0:		transport successfully registered
     99 * -EEXIST:	transport already registered
    100 * -EINVAL:	transport module being unloaded
    101 */
    102int xprt_register_transport(struct xprt_class *transport)
    103{
    104	struct xprt_class *t;
    105	int result;
    106
    107	result = -EEXIST;
    108	spin_lock(&xprt_list_lock);
    109	list_for_each_entry(t, &xprt_list, list) {
    110		/* don't register the same transport class twice */
    111		if (t->ident == transport->ident)
    112			goto out;
    113	}
    114
    115	list_add_tail(&transport->list, &xprt_list);
    116	printk(KERN_INFO "RPC: Registered %s transport module.\n",
    117	       transport->name);
    118	result = 0;
    119
    120out:
    121	spin_unlock(&xprt_list_lock);
    122	return result;
    123}
    124EXPORT_SYMBOL_GPL(xprt_register_transport);
    125
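As a usage sketch (hypothetical transport: the name, ident value and netid are made up, and the required .setup callback is elided), a loadable transport module registers its xprt_class on init and unregisters it on exit:

static struct xprt_class example_transport = {
	.list	= LIST_HEAD_INIT(example_transport.list),
	.name	= "example",
	.owner	= THIS_MODULE,
	.ident	= 255,				/* hypothetical ident */
	.netid	= { "example", "" },
	/* .setup = example_setup_xprt, */
};

static int __init example_transport_init(void)
{
	return xprt_register_transport(&example_transport);
}

static void __exit example_transport_exit(void)
{
	xprt_unregister_transport(&example_transport);
}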
    126/**
    127 * xprt_unregister_transport - unregister a transport implementation
    128 * @transport: transport to unregister
    129 *
    130 * Returns:
    131 * 0:		transport successfully unregistered
    132 * -ENOENT:	transport never registered
    133 */
    134int xprt_unregister_transport(struct xprt_class *transport)
    135{
    136	struct xprt_class *t;
    137	int result;
    138
    139	result = 0;
    140	spin_lock(&xprt_list_lock);
    141	list_for_each_entry(t, &xprt_list, list) {
    142		if (t == transport) {
    143			printk(KERN_INFO
    144				"RPC: Unregistered %s transport module.\n",
    145				transport->name);
    146			list_del_init(&transport->list);
    147			goto out;
    148		}
    149	}
    150	result = -ENOENT;
    151
    152out:
    153	spin_unlock(&xprt_list_lock);
    154	return result;
    155}
    156EXPORT_SYMBOL_GPL(xprt_unregister_transport);
    157
    158static void
    159xprt_class_release(const struct xprt_class *t)
    160{
    161	module_put(t->owner);
    162}
    163
    164static const struct xprt_class *
    165xprt_class_find_by_ident_locked(int ident)
    166{
    167	const struct xprt_class *t;
    168
    169	list_for_each_entry(t, &xprt_list, list) {
    170		if (t->ident != ident)
    171			continue;
    172		if (!try_module_get(t->owner))
    173			continue;
    174		return t;
    175	}
    176	return NULL;
    177}
    178
    179static const struct xprt_class *
    180xprt_class_find_by_ident(int ident)
    181{
    182	const struct xprt_class *t;
    183
    184	spin_lock(&xprt_list_lock);
    185	t = xprt_class_find_by_ident_locked(ident);
    186	spin_unlock(&xprt_list_lock);
    187	return t;
    188}
    189
    190static const struct xprt_class *
    191xprt_class_find_by_netid_locked(const char *netid)
    192{
    193	const struct xprt_class *t;
    194	unsigned int i;
    195
    196	list_for_each_entry(t, &xprt_list, list) {
    197		for (i = 0; t->netid[i][0] != '\0'; i++) {
    198			if (strcmp(t->netid[i], netid) != 0)
    199				continue;
    200			if (!try_module_get(t->owner))
    201				continue;
    202			return t;
    203		}
    204	}
    205	return NULL;
    206}
    207
    208static const struct xprt_class *
    209xprt_class_find_by_netid(const char *netid)
    210{
    211	const struct xprt_class *t;
    212
    213	spin_lock(&xprt_list_lock);
    214	t = xprt_class_find_by_netid_locked(netid);
    215	if (!t) {
    216		spin_unlock(&xprt_list_lock);
    217		request_module("rpc%s", netid);
    218		spin_lock(&xprt_list_lock);
    219		t = xprt_class_find_by_netid_locked(netid);
    220	}
    221	spin_unlock(&xprt_list_lock);
    222	return t;
    223}
    224
    225/**
    226 * xprt_find_transport_ident - convert a netid into a transport identifier
    227 * @netid: transport to load
    228 *
    229 * Returns:
    230 * > 0:		transport identifier
    231 * -ENOENT:	transport module not available
    232 */
    233int xprt_find_transport_ident(const char *netid)
    234{
    235	const struct xprt_class *t;
    236	int ret;
    237
    238	t = xprt_class_find_by_netid(netid);
    239	if (!t)
    240		return -ENOENT;
    241	ret = t->ident;
    242	xprt_class_release(t);
    243	return ret;
    244}
    245EXPORT_SYMBOL_GPL(xprt_find_transport_ident);
    246
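For example (a minimal usage sketch), a caller holding only a netid string such as "tcp" can resolve it to a registered transport ident, triggering a request_module("rpctcp") if nothing is registered yet:

	int ident = xprt_find_transport_ident("tcp");

	if (ident < 0)
		pr_warn("no transport for netid tcp: %d\n", ident);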
    247static void xprt_clear_locked(struct rpc_xprt *xprt)
    248{
    249	xprt->snd_task = NULL;
    250	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state))
    251		clear_bit_unlock(XPRT_LOCKED, &xprt->state);
    252	else
    253		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
    254}
    255
    256/**
    257 * xprt_reserve_xprt - serialize write access to transports
    258 * @task: task that is requesting access to the transport
    259 * @xprt: pointer to the target transport
    260 *
    261 * This prevents mixing the payload of separate requests, and prevents
    262 * transport connects from colliding with writes.  No congestion control
    263 * is provided.
    264 */
    265int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
    266{
    267	struct rpc_rqst *req = task->tk_rqstp;
    268
    269	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
    270		if (task == xprt->snd_task)
    271			goto out_locked;
    272		goto out_sleep;
    273	}
    274	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
    275		goto out_unlock;
    276	xprt->snd_task = task;
    277
    278out_locked:
    279	trace_xprt_reserve_xprt(xprt, task);
    280	return 1;
    281
    282out_unlock:
    283	xprt_clear_locked(xprt);
    284out_sleep:
    285	task->tk_status = -EAGAIN;
     286	if (RPC_IS_SOFT(task))
    287		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
    288				xprt_request_timeout(req));
    289	else
    290		rpc_sleep_on(&xprt->sending, task, NULL);
    291	return 0;
    292}
    293EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
    294
    295static bool
    296xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
    297{
    298	return test_bit(XPRT_CWND_WAIT, &xprt->state);
    299}
    300
    301static void
    302xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
    303{
    304	if (!list_empty(&xprt->xmit_queue)) {
    305		/* Peek at head of queue to see if it can make progress */
    306		if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
    307					rq_xmit)->rq_cong)
    308			return;
    309	}
    310	set_bit(XPRT_CWND_WAIT, &xprt->state);
    311}
    312
    313static void
    314xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
    315{
    316	if (!RPCXPRT_CONGESTED(xprt))
    317		clear_bit(XPRT_CWND_WAIT, &xprt->state);
    318}
    319
    320/*
    321 * xprt_reserve_xprt_cong - serialize write access to transports
    322 * @task: task that is requesting access to the transport
    323 *
    324 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
    325 * integrated into the decision of whether a request is allowed to be
    326 * woken up and given access to the transport.
    327 * Note that the lock is only granted if we know there are free slots.
    328 */
    329int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
    330{
    331	struct rpc_rqst *req = task->tk_rqstp;
    332
    333	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
    334		if (task == xprt->snd_task)
    335			goto out_locked;
    336		goto out_sleep;
    337	}
    338	if (req == NULL) {
    339		xprt->snd_task = task;
    340		goto out_locked;
    341	}
    342	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
    343		goto out_unlock;
    344	if (!xprt_need_congestion_window_wait(xprt)) {
    345		xprt->snd_task = task;
    346		goto out_locked;
    347	}
    348out_unlock:
    349	xprt_clear_locked(xprt);
    350out_sleep:
    351	task->tk_status = -EAGAIN;
    352	if (RPC_IS_SOFT(task))
    353		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
    354				xprt_request_timeout(req));
    355	else
    356		rpc_sleep_on(&xprt->sending, task, NULL);
    357	return 0;
    358out_locked:
    359	trace_xprt_reserve_cong(xprt, task);
    360	return 1;
    361}
    362EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
    363
    364static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
    365{
    366	int retval;
    367
    368	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
    369		return 1;
    370	spin_lock(&xprt->transport_lock);
    371	retval = xprt->ops->reserve_xprt(xprt, task);
    372	spin_unlock(&xprt->transport_lock);
    373	return retval;
    374}
    375
    376static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
    377{
    378	struct rpc_xprt *xprt = data;
    379
    380	xprt->snd_task = task;
    381	return true;
    382}
    383
    384static void __xprt_lock_write_next(struct rpc_xprt *xprt)
    385{
    386	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
    387		return;
    388	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
    389		goto out_unlock;
    390	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
    391				__xprt_lock_write_func, xprt))
    392		return;
    393out_unlock:
    394	xprt_clear_locked(xprt);
    395}
    396
    397static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
    398{
    399	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
    400		return;
    401	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
    402		goto out_unlock;
    403	if (xprt_need_congestion_window_wait(xprt))
    404		goto out_unlock;
    405	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
    406				__xprt_lock_write_func, xprt))
    407		return;
    408out_unlock:
    409	xprt_clear_locked(xprt);
    410}
    411
    412/**
    413 * xprt_release_xprt - allow other requests to use a transport
    414 * @xprt: transport with other tasks potentially waiting
    415 * @task: task that is releasing access to the transport
    416 *
    417 * Note that "task" can be NULL.  No congestion control is provided.
    418 */
    419void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
    420{
    421	if (xprt->snd_task == task) {
    422		xprt_clear_locked(xprt);
    423		__xprt_lock_write_next(xprt);
    424	}
    425	trace_xprt_release_xprt(xprt, task);
    426}
    427EXPORT_SYMBOL_GPL(xprt_release_xprt);
    428
    429/**
    430 * xprt_release_xprt_cong - allow other requests to use a transport
    431 * @xprt: transport with other tasks potentially waiting
    432 * @task: task that is releasing access to the transport
    433 *
    434 * Note that "task" can be NULL.  Another task is awoken to use the
    435 * transport if the transport's congestion window allows it.
    436 */
    437void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
    438{
    439	if (xprt->snd_task == task) {
    440		xprt_clear_locked(xprt);
    441		__xprt_lock_write_next_cong(xprt);
    442	}
    443	trace_xprt_release_cong(xprt, task);
    444}
    445EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
    446
    447void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
    448{
    449	if (xprt->snd_task != task)
    450		return;
    451	spin_lock(&xprt->transport_lock);
    452	xprt->ops->release_xprt(xprt, task);
    453	spin_unlock(&xprt->transport_lock);
    454}
    455
    456/*
    457 * Van Jacobson congestion avoidance. Check if the congestion window
    458 * overflowed. Put the task to sleep if this is the case.
    459 */
    460static int
    461__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
    462{
    463	if (req->rq_cong)
    464		return 1;
    465	trace_xprt_get_cong(xprt, req->rq_task);
    466	if (RPCXPRT_CONGESTED(xprt)) {
    467		xprt_set_congestion_window_wait(xprt);
    468		return 0;
    469	}
    470	req->rq_cong = 1;
    471	xprt->cong += RPC_CWNDSCALE;
    472	return 1;
    473}
    474
    475/*
    476 * Adjust the congestion window, and wake up the next task
    477 * that has been sleeping due to congestion
    478 */
    479static void
    480__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
    481{
    482	if (!req->rq_cong)
    483		return;
    484	req->rq_cong = 0;
    485	xprt->cong -= RPC_CWNDSCALE;
    486	xprt_test_and_clear_congestion_window_wait(xprt);
    487	trace_xprt_put_cong(xprt, req->rq_task);
    488	__xprt_lock_write_next_cong(xprt);
    489}
    490
    491/**
    492 * xprt_request_get_cong - Request congestion control credits
    493 * @xprt: pointer to transport
    494 * @req: pointer to RPC request
    495 *
    496 * Useful for transports that require congestion control.
    497 */
    498bool
    499xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
    500{
    501	bool ret = false;
    502
    503	if (req->rq_cong)
    504		return true;
    505	spin_lock(&xprt->transport_lock);
    506	ret = __xprt_get_cong(xprt, req) != 0;
    507	spin_unlock(&xprt->transport_lock);
    508	return ret;
    509}
    510EXPORT_SYMBOL_GPL(xprt_request_get_cong);
    511
    512/**
    513 * xprt_release_rqst_cong - housekeeping when request is complete
    514 * @task: RPC request that recently completed
    515 *
    516 * Useful for transports that require congestion control.
    517 */
    518void xprt_release_rqst_cong(struct rpc_task *task)
    519{
    520	struct rpc_rqst *req = task->tk_rqstp;
    521
    522	__xprt_put_cong(req->rq_xprt, req);
    523}
    524EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
    525
    526static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
    527{
    528	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
    529		__xprt_lock_write_next_cong(xprt);
    530}
    531
    532/*
    533 * Clear the congestion window wait flag and wake up the next
    534 * entry on xprt->sending
    535 */
    536static void
    537xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
    538{
    539	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
    540		spin_lock(&xprt->transport_lock);
    541		__xprt_lock_write_next_cong(xprt);
    542		spin_unlock(&xprt->transport_lock);
    543	}
    544}
    545
    546/**
    547 * xprt_adjust_cwnd - adjust transport congestion window
    548 * @xprt: pointer to xprt
    549 * @task: recently completed RPC request used to adjust window
    550 * @result: result code of completed RPC request
    551 *
    552 * The transport code maintains an estimate on the maximum number of out-
    553 * standing RPC requests, using a smoothed version of the congestion
    554 * avoidance implemented in 44BSD. This is basically the Van Jacobson
    555 * congestion algorithm: If a retransmit occurs, the congestion window is
    556 * halved; otherwise, it is incremented by 1/cwnd when
    557 *
    558 *	-	a reply is received and
    559 *	-	a full number of requests are outstanding and
    560 *	-	the congestion window hasn't been updated recently.
    561 */
    562void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
    563{
    564	struct rpc_rqst *req = task->tk_rqstp;
    565	unsigned long cwnd = xprt->cwnd;
    566
    567	if (result >= 0 && cwnd <= xprt->cong) {
    568		/* The (cwnd >> 1) term makes sure
    569		 * the result gets rounded properly. */
    570		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
    571		if (cwnd > RPC_MAXCWND(xprt))
    572			cwnd = RPC_MAXCWND(xprt);
    573		__xprt_lock_write_next_cong(xprt);
    574	} else if (result == -ETIMEDOUT) {
    575		cwnd >>= 1;
    576		if (cwnd < RPC_CWNDSCALE)
    577			cwnd = RPC_CWNDSCALE;
    578	}
    579	dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
    580			xprt->cong, xprt->cwnd, cwnd);
    581	xprt->cwnd = cwnd;
    582	__xprt_put_cong(xprt, req);
    583}
    584EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
    585
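A short worked example of the update above, assuming the usual RPC_CWNDSCALE of 256 (one request slot per 256 congestion units); this is a sketch, not part of the original file:

/* Sketch of the additive-increase/multiplicative-decrease step. */
static unsigned long example_next_cwnd(unsigned long cwnd, unsigned long cong,
				       unsigned long maxcwnd, int result)
{
	if (result >= 0 && cwnd <= cong) {
		/* e.g. cwnd = 1024: 1024 + (256*256 + 512) / 1024 = 1088 */
		cwnd += (256 * 256 + (cwnd >> 1)) / cwnd;
		if (cwnd > maxcwnd)
			cwnd = maxcwnd;
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;			/* e.g. 1088 -> 544 */
		if (cwnd < 256)
			cwnd = 256;		/* never below one slot */
	}
	return cwnd;
}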
    586/**
    587 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
    588 * @xprt: transport with waiting tasks
    589 * @status: result code to plant in each task before waking it
    590 *
    591 */
    592void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
    593{
    594	if (status < 0)
    595		rpc_wake_up_status(&xprt->pending, status);
    596	else
    597		rpc_wake_up(&xprt->pending);
    598}
    599EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
    600
    601/**
    602 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
    603 * @xprt: transport
    604 *
    605 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
    606 * we don't in general want to force a socket disconnection due to
    607 * an incomplete RPC call transmission.
    608 */
    609void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
    610{
    611	set_bit(XPRT_WRITE_SPACE, &xprt->state);
    612}
    613EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
    614
    615static bool
    616xprt_clear_write_space_locked(struct rpc_xprt *xprt)
    617{
    618	if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
    619		__xprt_lock_write_next(xprt);
    620		dprintk("RPC:       write space: waking waiting task on "
    621				"xprt %p\n", xprt);
    622		return true;
    623	}
    624	return false;
    625}
    626
    627/**
    628 * xprt_write_space - wake the task waiting for transport output buffer space
    629 * @xprt: transport with waiting tasks
    630 *
    631 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
    632 */
    633bool xprt_write_space(struct rpc_xprt *xprt)
    634{
    635	bool ret;
    636
    637	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
    638		return false;
    639	spin_lock(&xprt->transport_lock);
    640	ret = xprt_clear_write_space_locked(xprt);
    641	spin_unlock(&xprt->transport_lock);
    642	return ret;
    643}
    644EXPORT_SYMBOL_GPL(xprt_write_space);
    645
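The two helpers above form a pair: the transport's send path calls xprt_wait_for_buffer_space() when the output buffer fills up, and its write-space callback calls xprt_write_space() once room frees up, waking the blocked sender. A fragment-level sketch (placeholder context, no locking shown):

	/* in the transport's ->send_request() after a short send: */
	if (status == -ENOBUFS)
		xprt_wait_for_buffer_space(xprt);

	/* later, from the socket's write-space callback: */
	xprt_write_space(xprt);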
    646static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
    647{
    648	s64 delta = ktime_to_ns(ktime_get() - abstime);
    649	return likely(delta >= 0) ?
    650		jiffies - nsecs_to_jiffies(delta) :
    651		jiffies + nsecs_to_jiffies(-delta);
    652}
    653
    654static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req)
    655{
    656	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
    657	unsigned long majortimeo = req->rq_timeout;
    658
    659	if (to->to_exponential)
    660		majortimeo <<= to->to_retries;
    661	else
    662		majortimeo += to->to_increment * to->to_retries;
    663	if (majortimeo > to->to_maxval || majortimeo == 0)
    664		majortimeo = to->to_maxval;
    665	return majortimeo;
    666}
    667
    668static void xprt_reset_majortimeo(struct rpc_rqst *req)
    669{
    670	req->rq_majortimeo += xprt_calc_majortimeo(req);
    671}
    672
    673static void xprt_reset_minortimeo(struct rpc_rqst *req)
    674{
    675	req->rq_minortimeo += req->rq_timeout;
    676}
    677
    678static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req)
    679{
    680	unsigned long time_init;
    681	struct rpc_xprt *xprt = req->rq_xprt;
    682
    683	if (likely(xprt && xprt_connected(xprt)))
    684		time_init = jiffies;
    685	else
    686		time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
    687	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
    688	req->rq_majortimeo = time_init + xprt_calc_majortimeo(req);
    689	req->rq_minortimeo = time_init + req->rq_timeout;
    690}
    691
    692/**
    693 * xprt_adjust_timeout - adjust timeout values for next retransmit
    694 * @req: RPC request containing parameters to use for the adjustment
    695 *
    696 */
    697int xprt_adjust_timeout(struct rpc_rqst *req)
    698{
    699	struct rpc_xprt *xprt = req->rq_xprt;
    700	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
    701	int status = 0;
    702
    703	if (time_before(jiffies, req->rq_majortimeo)) {
    704		if (time_before(jiffies, req->rq_minortimeo))
    705			return status;
    706		if (to->to_exponential)
    707			req->rq_timeout <<= 1;
    708		else
    709			req->rq_timeout += to->to_increment;
    710		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
    711			req->rq_timeout = to->to_maxval;
    712		req->rq_retries++;
    713	} else {
    714		req->rq_timeout = to->to_initval;
    715		req->rq_retries = 0;
    716		xprt_reset_majortimeo(req);
    717		/* Reset the RTT counters == "slow start" */
    718		spin_lock(&xprt->transport_lock);
    719		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
    720		spin_unlock(&xprt->transport_lock);
    721		status = -ETIMEDOUT;
    722	}
    723	xprt_reset_minortimeo(req);
    724
    725	if (req->rq_timeout == 0) {
    726		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
    727		req->rq_timeout = 5 * HZ;
    728	}
    729	return status;
    730}
    731
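To make the retransmit backoff concrete, here is a sketch of the major-timeout calculation with round numbers (seconds instead of jiffies; to_initval = 6, to_increment = 6, to_retries = 3, to_maxval = 60): an exponential schedule retries at 6, 12 and 24 seconds and reaches the major timeout at 6 << 3 = 48 seconds, while a linear one retries at 6, 12 and 18 seconds with a major timeout of 6 + 6 * 3 = 24 seconds.

/* Sketch of xprt_calc_majortimeo() with round numbers, in seconds. */
static unsigned long example_major_timeout(bool exponential)
{
	unsigned long initval = 6, increment = 6, retries = 3, maxval = 60;
	unsigned long major;

	if (exponential)		/* 6 << 3 = 48 */
		major = initval << retries;
	else				/* 6 + 6 * 3 = 24 */
		major = initval + increment * retries;
	if (major > maxval || major == 0)
		major = maxval;
	return major;
}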
    732static void xprt_autoclose(struct work_struct *work)
    733{
    734	struct rpc_xprt *xprt =
    735		container_of(work, struct rpc_xprt, task_cleanup);
    736	unsigned int pflags = memalloc_nofs_save();
    737
    738	trace_xprt_disconnect_auto(xprt);
    739	xprt->connect_cookie++;
    740	smp_mb__before_atomic();
    741	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
    742	xprt->ops->close(xprt);
    743	xprt_release_write(xprt, NULL);
    744	wake_up_bit(&xprt->state, XPRT_LOCKED);
    745	memalloc_nofs_restore(pflags);
    746}
    747
    748/**
    749 * xprt_disconnect_done - mark a transport as disconnected
    750 * @xprt: transport to flag for disconnect
    751 *
    752 */
    753void xprt_disconnect_done(struct rpc_xprt *xprt)
    754{
    755	trace_xprt_disconnect_done(xprt);
    756	spin_lock(&xprt->transport_lock);
    757	xprt_clear_connected(xprt);
    758	xprt_clear_write_space_locked(xprt);
    759	xprt_clear_congestion_window_wait_locked(xprt);
    760	xprt_wake_pending_tasks(xprt, -ENOTCONN);
    761	spin_unlock(&xprt->transport_lock);
    762}
    763EXPORT_SYMBOL_GPL(xprt_disconnect_done);
    764
    765/**
    766 * xprt_schedule_autoclose_locked - Try to schedule an autoclose RPC call
    767 * @xprt: transport to disconnect
    768 */
    769static void xprt_schedule_autoclose_locked(struct rpc_xprt *xprt)
    770{
    771	if (test_and_set_bit(XPRT_CLOSE_WAIT, &xprt->state))
    772		return;
    773	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
    774		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
    775	else if (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state))
    776		rpc_wake_up_queued_task_set_status(&xprt->pending,
    777						   xprt->snd_task, -ENOTCONN);
    778}
    779
    780/**
    781 * xprt_force_disconnect - force a transport to disconnect
    782 * @xprt: transport to disconnect
    783 *
    784 */
    785void xprt_force_disconnect(struct rpc_xprt *xprt)
    786{
    787	trace_xprt_disconnect_force(xprt);
    788
    789	/* Don't race with the test_bit() in xprt_clear_locked() */
    790	spin_lock(&xprt->transport_lock);
    791	xprt_schedule_autoclose_locked(xprt);
    792	spin_unlock(&xprt->transport_lock);
    793}
    794EXPORT_SYMBOL_GPL(xprt_force_disconnect);
    795
    796static unsigned int
    797xprt_connect_cookie(struct rpc_xprt *xprt)
    798{
    799	return READ_ONCE(xprt->connect_cookie);
    800}
    801
    802static bool
    803xprt_request_retransmit_after_disconnect(struct rpc_task *task)
    804{
    805	struct rpc_rqst *req = task->tk_rqstp;
    806	struct rpc_xprt *xprt = req->rq_xprt;
    807
    808	return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
    809		!xprt_connected(xprt);
    810}
    811
    812/**
    813 * xprt_conditional_disconnect - force a transport to disconnect
    814 * @xprt: transport to disconnect
    815 * @cookie: 'connection cookie'
    816 *
    817 * This attempts to break the connection if and only if 'cookie' matches
    818 * the current transport 'connection cookie'. It ensures that we don't
    819 * try to break the connection more than once when we need to retransmit
    820 * a batch of RPC requests.
    821 *
    822 */
    823void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
    824{
    825	/* Don't race with the test_bit() in xprt_clear_locked() */
    826	spin_lock(&xprt->transport_lock);
    827	if (cookie != xprt->connect_cookie)
    828		goto out;
    829	if (test_bit(XPRT_CLOSING, &xprt->state))
    830		goto out;
    831	xprt_schedule_autoclose_locked(xprt);
    832out:
    833	spin_unlock(&xprt->transport_lock);
    834}
    835
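A caller that wants at most one forced reconnect per batch of retransmits captures the cookie when the request is sent and hands it back here; a minimal sketch (req is assumed to be a valid rpc_rqst):

	/* saved at transmit time in req->rq_connect_cookie ... */
	unsigned int cookie = req->rq_connect_cookie;

	/* ... and the connection is broken only if it still matches, so a
	 * whole batch of timed-out requests disconnects exactly once.
	 */
	xprt_conditional_disconnect(req->rq_xprt, cookie);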
    836static bool
    837xprt_has_timer(const struct rpc_xprt *xprt)
    838{
    839	return xprt->idle_timeout != 0;
    840}
    841
    842static void
    843xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
    844	__must_hold(&xprt->transport_lock)
    845{
    846	xprt->last_used = jiffies;
    847	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
    848		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
    849}
    850
    851static void
    852xprt_init_autodisconnect(struct timer_list *t)
    853{
    854	struct rpc_xprt *xprt = from_timer(xprt, t, timer);
    855
    856	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
    857		return;
    858	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
    859	xprt->last_used = jiffies;
    860	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
    861		return;
    862	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
    863}
    864
    865#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
    866static void xprt_inject_disconnect(struct rpc_xprt *xprt)
    867{
    868	if (!fail_sunrpc.ignore_client_disconnect &&
    869	    should_fail(&fail_sunrpc.attr, 1))
    870		xprt->ops->inject_disconnect(xprt);
    871}
    872#else
    873static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
    874{
    875}
    876#endif
    877
    878bool xprt_lock_connect(struct rpc_xprt *xprt,
    879		struct rpc_task *task,
    880		void *cookie)
    881{
    882	bool ret = false;
    883
    884	spin_lock(&xprt->transport_lock);
    885	if (!test_bit(XPRT_LOCKED, &xprt->state))
    886		goto out;
    887	if (xprt->snd_task != task)
    888		goto out;
    889	set_bit(XPRT_SND_IS_COOKIE, &xprt->state);
    890	xprt->snd_task = cookie;
    891	ret = true;
    892out:
    893	spin_unlock(&xprt->transport_lock);
    894	return ret;
    895}
    896EXPORT_SYMBOL_GPL(xprt_lock_connect);
    897
    898void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
    899{
    900	spin_lock(&xprt->transport_lock);
    901	if (xprt->snd_task != cookie)
    902		goto out;
    903	if (!test_bit(XPRT_LOCKED, &xprt->state))
    904		goto out;
     905	xprt->snd_task = NULL;
    906	clear_bit(XPRT_SND_IS_COOKIE, &xprt->state);
    907	xprt->ops->release_xprt(xprt, NULL);
    908	xprt_schedule_autodisconnect(xprt);
    909out:
    910	spin_unlock(&xprt->transport_lock);
    911	wake_up_bit(&xprt->state, XPRT_LOCKED);
    912}
    913EXPORT_SYMBOL_GPL(xprt_unlock_connect);
    914
    915/**
    916 * xprt_connect - schedule a transport connect operation
    917 * @task: RPC task that is requesting the connect
    918 *
    919 */
    920void xprt_connect(struct rpc_task *task)
    921{
    922	struct rpc_xprt	*xprt = task->tk_rqstp->rq_xprt;
    923
    924	trace_xprt_connect(xprt);
    925
    926	if (!xprt_bound(xprt)) {
    927		task->tk_status = -EAGAIN;
    928		return;
    929	}
    930	if (!xprt_lock_write(xprt, task))
    931		return;
    932
    933	if (!xprt_connected(xprt) && !test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
    934		task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
    935		rpc_sleep_on_timeout(&xprt->pending, task, NULL,
    936				xprt_request_timeout(task->tk_rqstp));
    937
    938		if (test_bit(XPRT_CLOSING, &xprt->state))
    939			return;
    940		if (xprt_test_and_set_connecting(xprt))
    941			return;
    942		/* Race breaker */
    943		if (!xprt_connected(xprt)) {
    944			xprt->stat.connect_start = jiffies;
    945			xprt->ops->connect(xprt, task);
    946		} else {
    947			xprt_clear_connecting(xprt);
    948			task->tk_status = 0;
    949			rpc_wake_up_queued_task(&xprt->pending, task);
    950		}
    951	}
    952	xprt_release_write(xprt, task);
    953}
    954
    955/**
    956 * xprt_reconnect_delay - compute the wait before scheduling a connect
    957 * @xprt: transport instance
    958 *
    959 */
    960unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
    961{
    962	unsigned long start, now = jiffies;
    963
    964	start = xprt->stat.connect_start + xprt->reestablish_timeout;
    965	if (time_after(start, now))
    966		return start - now;
    967	return 0;
    968}
    969EXPORT_SYMBOL_GPL(xprt_reconnect_delay);
    970
    971/**
    972 * xprt_reconnect_backoff - compute the new re-establish timeout
    973 * @xprt: transport instance
    974 * @init_to: initial reestablish timeout
    975 *
    976 */
    977void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
    978{
    979	xprt->reestablish_timeout <<= 1;
    980	if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
    981		xprt->reestablish_timeout = xprt->max_reconnect_timeout;
    982	if (xprt->reestablish_timeout < init_to)
    983		xprt->reestablish_timeout = init_to;
    984}
    985EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
    986
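A transport's connect path might combine the two helpers above along these lines (a sketch; the work item, initial timeout and reconnect flag are placeholders supplied by the caller):

static void example_schedule_connect(struct rpc_xprt *xprt,
				     struct delayed_work *connect_work,
				     unsigned long init_to, bool reconnect)
{
	unsigned long delay = 0;

	if (reconnect) {
		/* wait out what is left of reestablish_timeout, then grow it */
		delay = xprt_reconnect_delay(xprt);
		xprt_reconnect_backoff(xprt, init_to);
	}
	queue_delayed_work(xprtiod_workqueue, connect_work, delay);
}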
    987enum xprt_xid_rb_cmp {
    988	XID_RB_EQUAL,
    989	XID_RB_LEFT,
    990	XID_RB_RIGHT,
    991};
    992static enum xprt_xid_rb_cmp
    993xprt_xid_cmp(__be32 xid1, __be32 xid2)
    994{
    995	if (xid1 == xid2)
    996		return XID_RB_EQUAL;
    997	if ((__force u32)xid1 < (__force u32)xid2)
    998		return XID_RB_LEFT;
    999	return XID_RB_RIGHT;
   1000}
   1001
   1002static struct rpc_rqst *
   1003xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
   1004{
   1005	struct rb_node *n = xprt->recv_queue.rb_node;
   1006	struct rpc_rqst *req;
   1007
   1008	while (n != NULL) {
   1009		req = rb_entry(n, struct rpc_rqst, rq_recv);
   1010		switch (xprt_xid_cmp(xid, req->rq_xid)) {
   1011		case XID_RB_LEFT:
   1012			n = n->rb_left;
   1013			break;
   1014		case XID_RB_RIGHT:
   1015			n = n->rb_right;
   1016			break;
   1017		case XID_RB_EQUAL:
   1018			return req;
   1019		}
   1020	}
   1021	return NULL;
   1022}
   1023
   1024static void
   1025xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
   1026{
   1027	struct rb_node **p = &xprt->recv_queue.rb_node;
   1028	struct rb_node *n = NULL;
   1029	struct rpc_rqst *req;
   1030
   1031	while (*p != NULL) {
   1032		n = *p;
   1033		req = rb_entry(n, struct rpc_rqst, rq_recv);
   1034		switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
   1035		case XID_RB_LEFT:
   1036			p = &n->rb_left;
   1037			break;
   1038		case XID_RB_RIGHT:
   1039			p = &n->rb_right;
   1040			break;
   1041		case XID_RB_EQUAL:
   1042			WARN_ON_ONCE(new != req);
   1043			return;
   1044		}
   1045	}
   1046	rb_link_node(&new->rq_recv, n, p);
   1047	rb_insert_color(&new->rq_recv, &xprt->recv_queue);
   1048}
   1049
   1050static void
   1051xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
   1052{
   1053	rb_erase(&req->rq_recv, &xprt->recv_queue);
   1054}
   1055
   1056/**
   1057 * xprt_lookup_rqst - find an RPC request corresponding to an XID
   1058 * @xprt: transport on which the original request was transmitted
   1059 * @xid: RPC XID of incoming reply
   1060 *
   1061 * Caller holds xprt->queue_lock.
   1062 */
   1063struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
   1064{
   1065	struct rpc_rqst *entry;
   1066
   1067	entry = xprt_request_rb_find(xprt, xid);
   1068	if (entry != NULL) {
   1069		trace_xprt_lookup_rqst(xprt, xid, 0);
   1070		entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
   1071		return entry;
   1072	}
   1073
   1074	dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
   1075			ntohl(xid));
   1076	trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
   1077	xprt->stat.bad_xids++;
   1078	return NULL;
   1079}
   1080EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
   1081
   1082static bool
   1083xprt_is_pinned_rqst(struct rpc_rqst *req)
   1084{
   1085	return atomic_read(&req->rq_pin) != 0;
   1086}
   1087
   1088/**
   1089 * xprt_pin_rqst - Pin a request on the transport receive list
   1090 * @req: Request to pin
   1091 *
   1092 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
   1093 * so should be holding xprt->queue_lock.
   1094 */
   1095void xprt_pin_rqst(struct rpc_rqst *req)
   1096{
   1097	atomic_inc(&req->rq_pin);
   1098}
   1099EXPORT_SYMBOL_GPL(xprt_pin_rqst);
   1100
   1101/**
   1102 * xprt_unpin_rqst - Unpin a request on the transport receive list
    1103 * @req: Request to unpin
   1104 *
   1105 * Caller should be holding xprt->queue_lock.
   1106 */
   1107void xprt_unpin_rqst(struct rpc_rqst *req)
   1108{
   1109	if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
   1110		atomic_dec(&req->rq_pin);
   1111		return;
   1112	}
   1113	if (atomic_dec_and_test(&req->rq_pin))
   1114		wake_up_var(&req->rq_pin);
   1115}
   1116EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
   1117
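Taken together with xprt_lookup_rqst(), the pin helpers let a receive handler drop xprt->queue_lock while it copies reply data, without the request disappearing underneath it. A sketch of the expected pattern (the copy step is a placeholder):

static void example_handle_reply(struct rpc_xprt *xprt, __be32 xid, int copied)
{
	struct rpc_rqst *req;

	spin_lock(&xprt->queue_lock);
	req = xprt_lookup_rqst(xprt, xid);
	if (!req) {
		spin_unlock(&xprt->queue_lock);
		return;
	}
	xprt_pin_rqst(req);		/* keep req alive while unlocked */
	spin_unlock(&xprt->queue_lock);

	/* ... copy the reply into req->rq_rcv_buf here ... */

	spin_lock(&xprt->queue_lock);
	xprt_complete_rqst(req->rq_task, copied);
	xprt_unpin_rqst(req);
	spin_unlock(&xprt->queue_lock);
}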
   1118static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
   1119{
   1120	wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
   1121}
   1122
   1123static bool
   1124xprt_request_data_received(struct rpc_task *task)
   1125{
   1126	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
   1127		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
   1128}
   1129
   1130static bool
   1131xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
   1132{
   1133	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
   1134		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
   1135}
   1136
   1137/**
    1138 * xprt_request_enqueue_receive - Add a request to the receive queue
   1139 * @task: RPC task
   1140 *
   1141 */
   1142int
   1143xprt_request_enqueue_receive(struct rpc_task *task)
   1144{
   1145	struct rpc_rqst *req = task->tk_rqstp;
   1146	struct rpc_xprt *xprt = req->rq_xprt;
   1147	int ret;
   1148
   1149	if (!xprt_request_need_enqueue_receive(task, req))
   1150		return 0;
   1151
   1152	ret = xprt_request_prepare(task->tk_rqstp);
   1153	if (ret)
   1154		return ret;
   1155	spin_lock(&xprt->queue_lock);
   1156
   1157	/* Update the softirq receive buffer */
   1158	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
   1159			sizeof(req->rq_private_buf));
   1160
   1161	/* Add request to the receive list */
   1162	xprt_request_rb_insert(xprt, req);
   1163	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
   1164	spin_unlock(&xprt->queue_lock);
   1165
   1166	/* Turn off autodisconnect */
   1167	del_singleshot_timer_sync(&xprt->timer);
   1168	return 0;
   1169}
   1170
   1171/**
   1172 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
   1173 * @task: RPC task
   1174 *
   1175 * Caller must hold xprt->queue_lock.
   1176 */
   1177static void
   1178xprt_request_dequeue_receive_locked(struct rpc_task *task)
   1179{
   1180	struct rpc_rqst *req = task->tk_rqstp;
   1181
   1182	if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
   1183		xprt_request_rb_remove(req->rq_xprt, req);
   1184}
   1185
   1186/**
   1187 * xprt_update_rtt - Update RPC RTT statistics
   1188 * @task: RPC request that recently completed
   1189 *
   1190 * Caller holds xprt->queue_lock.
   1191 */
   1192void xprt_update_rtt(struct rpc_task *task)
   1193{
   1194	struct rpc_rqst *req = task->tk_rqstp;
   1195	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
   1196	unsigned int timer = task->tk_msg.rpc_proc->p_timer;
   1197	long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));
   1198
   1199	if (timer) {
   1200		if (req->rq_ntrans == 1)
   1201			rpc_update_rtt(rtt, timer, m);
   1202		rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
   1203	}
   1204}
   1205EXPORT_SYMBOL_GPL(xprt_update_rtt);
   1206
   1207/**
   1208 * xprt_complete_rqst - called when reply processing is complete
   1209 * @task: RPC request that recently completed
   1210 * @copied: actual number of bytes received from the transport
   1211 *
   1212 * Caller holds xprt->queue_lock.
   1213 */
   1214void xprt_complete_rqst(struct rpc_task *task, int copied)
   1215{
   1216	struct rpc_rqst *req = task->tk_rqstp;
   1217	struct rpc_xprt *xprt = req->rq_xprt;
   1218
   1219	xprt->stat.recvs++;
   1220
   1221	req->rq_private_buf.len = copied;
   1222	/* Ensure all writes are done before we update */
   1223	/* req->rq_reply_bytes_recvd */
   1224	smp_wmb();
   1225	req->rq_reply_bytes_recvd = copied;
   1226	xprt_request_dequeue_receive_locked(task);
   1227	rpc_wake_up_queued_task(&xprt->pending, task);
   1228}
   1229EXPORT_SYMBOL_GPL(xprt_complete_rqst);
   1230
   1231static void xprt_timer(struct rpc_task *task)
   1232{
   1233	struct rpc_rqst *req = task->tk_rqstp;
   1234	struct rpc_xprt *xprt = req->rq_xprt;
   1235
   1236	if (task->tk_status != -ETIMEDOUT)
   1237		return;
   1238
   1239	trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
   1240	if (!req->rq_reply_bytes_recvd) {
   1241		if (xprt->ops->timer)
   1242			xprt->ops->timer(xprt, task);
   1243	} else
   1244		task->tk_status = 0;
   1245}
   1246
   1247/**
   1248 * xprt_wait_for_reply_request_def - wait for reply
   1249 * @task: pointer to rpc_task
   1250 *
   1251 * Set a request's retransmit timeout based on the transport's
   1252 * default timeout parameters.  Used by transports that don't adjust
   1253 * the retransmit timeout based on round-trip time estimation,
   1254 * and put the task to sleep on the pending queue.
   1255 */
   1256void xprt_wait_for_reply_request_def(struct rpc_task *task)
   1257{
   1258	struct rpc_rqst *req = task->tk_rqstp;
   1259
   1260	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
   1261			xprt_request_timeout(req));
   1262}
   1263EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);
   1264
   1265/**
   1266 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
   1267 * @task: pointer to rpc_task
   1268 *
   1269 * Set a request's retransmit timeout using the RTT estimator,
   1270 * and put the task to sleep on the pending queue.
   1271 */
   1272void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
   1273{
   1274	int timer = task->tk_msg.rpc_proc->p_timer;
   1275	struct rpc_clnt *clnt = task->tk_client;
   1276	struct rpc_rtt *rtt = clnt->cl_rtt;
   1277	struct rpc_rqst *req = task->tk_rqstp;
   1278	unsigned long max_timeout = clnt->cl_timeout->to_maxval;
   1279	unsigned long timeout;
   1280
   1281	timeout = rpc_calc_rto(rtt, timer);
   1282	timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
   1283	if (timeout > max_timeout || timeout == 0)
   1284		timeout = max_timeout;
   1285	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
   1286			jiffies + timeout);
   1287}
   1288EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);
   1289
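A sketch of the arithmetic with round numbers: if rpc_calc_rto() returns 200ms for this procedure's timer class, rpc_ntimeo() is 1 and the request has already been retried once, the task sleeps for 200ms << (1 + 1) = 800ms, clamped to cl_timeout->to_maxval, before the retransmit timer fires.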
   1290/**
   1291 * xprt_request_wait_receive - wait for the reply to an RPC request
   1292 * @task: RPC task about to send a request
   1293 *
   1294 */
   1295void xprt_request_wait_receive(struct rpc_task *task)
   1296{
   1297	struct rpc_rqst *req = task->tk_rqstp;
   1298	struct rpc_xprt *xprt = req->rq_xprt;
   1299
   1300	if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
   1301		return;
   1302	/*
   1303	 * Sleep on the pending queue if we're expecting a reply.
   1304	 * The spinlock ensures atomicity between the test of
   1305	 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
   1306	 */
   1307	spin_lock(&xprt->queue_lock);
   1308	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
   1309		xprt->ops->wait_for_reply_request(task);
   1310		/*
   1311		 * Send an extra queue wakeup call if the
   1312		 * connection was dropped in case the call to
   1313		 * rpc_sleep_on() raced.
   1314		 */
   1315		if (xprt_request_retransmit_after_disconnect(task))
   1316			rpc_wake_up_queued_task_set_status(&xprt->pending,
   1317					task, -ENOTCONN);
   1318	}
   1319	spin_unlock(&xprt->queue_lock);
   1320}
   1321
   1322static bool
   1323xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
   1324{
   1325	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
   1326}
   1327
   1328/**
   1329 * xprt_request_enqueue_transmit - queue a task for transmission
   1330 * @task: pointer to rpc_task
   1331 *
   1332 * Add a task to the transmission queue.
   1333 */
   1334void
   1335xprt_request_enqueue_transmit(struct rpc_task *task)
   1336{
   1337	struct rpc_rqst *pos, *req = task->tk_rqstp;
   1338	struct rpc_xprt *xprt = req->rq_xprt;
   1339
   1340	if (xprt_request_need_enqueue_transmit(task, req)) {
   1341		req->rq_bytes_sent = 0;
   1342		spin_lock(&xprt->queue_lock);
   1343		/*
   1344		 * Requests that carry congestion control credits are added
   1345		 * to the head of the list to avoid starvation issues.
   1346		 */
   1347		if (req->rq_cong) {
   1348			xprt_clear_congestion_window_wait(xprt);
   1349			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
   1350				if (pos->rq_cong)
   1351					continue;
   1352				/* Note: req is added _before_ pos */
   1353				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
   1354				INIT_LIST_HEAD(&req->rq_xmit2);
   1355				goto out;
   1356			}
   1357		} else if (!req->rq_seqno) {
   1358			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
   1359				if (pos->rq_task->tk_owner != task->tk_owner)
   1360					continue;
   1361				list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
   1362				INIT_LIST_HEAD(&req->rq_xmit);
   1363				goto out;
   1364			}
   1365		}
   1366		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
   1367		INIT_LIST_HEAD(&req->rq_xmit2);
   1368out:
   1369		atomic_long_inc(&xprt->xmit_queuelen);
   1370		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
   1371		spin_unlock(&xprt->queue_lock);
   1372	}
   1373}
   1374
   1375/**
   1376 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
   1377 * @task: pointer to rpc_task
   1378 *
   1379 * Remove a task from the transmission queue
   1380 * Caller must hold xprt->queue_lock
   1381 */
   1382static void
   1383xprt_request_dequeue_transmit_locked(struct rpc_task *task)
   1384{
   1385	struct rpc_rqst *req = task->tk_rqstp;
   1386
   1387	if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
   1388		return;
   1389	if (!list_empty(&req->rq_xmit)) {
   1390		list_del(&req->rq_xmit);
   1391		if (!list_empty(&req->rq_xmit2)) {
   1392			struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
   1393					struct rpc_rqst, rq_xmit2);
   1394			list_del(&req->rq_xmit2);
   1395			list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
   1396		}
   1397	} else
   1398		list_del(&req->rq_xmit2);
   1399	atomic_long_dec(&req->rq_xprt->xmit_queuelen);
   1400}
   1401
   1402/**
   1403 * xprt_request_dequeue_transmit - remove a task from the transmission queue
   1404 * @task: pointer to rpc_task
   1405 *
   1406 * Remove a task from the transmission queue
   1407 */
   1408static void
   1409xprt_request_dequeue_transmit(struct rpc_task *task)
   1410{
   1411	struct rpc_rqst *req = task->tk_rqstp;
   1412	struct rpc_xprt *xprt = req->rq_xprt;
   1413
   1414	spin_lock(&xprt->queue_lock);
   1415	xprt_request_dequeue_transmit_locked(task);
   1416	spin_unlock(&xprt->queue_lock);
   1417}
   1418
   1419/**
   1420 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
   1421 * @task: pointer to rpc_task
   1422 *
   1423 * Remove a task from the transmit and receive queues, and ensure that
   1424 * it is not pinned by the receive work item.
   1425 */
   1426void
   1427xprt_request_dequeue_xprt(struct rpc_task *task)
   1428{
   1429	struct rpc_rqst	*req = task->tk_rqstp;
   1430	struct rpc_xprt *xprt = req->rq_xprt;
   1431
   1432	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
   1433	    test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
   1434	    xprt_is_pinned_rqst(req)) {
   1435		spin_lock(&xprt->queue_lock);
   1436		xprt_request_dequeue_transmit_locked(task);
   1437		xprt_request_dequeue_receive_locked(task);
   1438		while (xprt_is_pinned_rqst(req)) {
   1439			set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
   1440			spin_unlock(&xprt->queue_lock);
   1441			xprt_wait_on_pinned_rqst(req);
   1442			spin_lock(&xprt->queue_lock);
   1443			clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
   1444		}
   1445		spin_unlock(&xprt->queue_lock);
   1446	}
   1447}
   1448
   1449/**
   1450 * xprt_request_prepare - prepare an encoded request for transport
   1451 * @req: pointer to rpc_rqst
   1452 *
   1453 * Calls into the transport layer to do whatever is needed to prepare
   1454 * the request for transmission or receive.
   1455 * Returns error, or zero.
   1456 */
   1457static int
   1458xprt_request_prepare(struct rpc_rqst *req)
   1459{
   1460	struct rpc_xprt *xprt = req->rq_xprt;
   1461
   1462	if (xprt->ops->prepare_request)
   1463		return xprt->ops->prepare_request(req);
   1464	return 0;
   1465}
   1466
   1467/**
   1468 * xprt_request_need_retransmit - Test if a task needs retransmission
   1469 * @task: pointer to rpc_task
   1470 *
   1471 * Test for whether a connection breakage requires the task to retransmit
   1472 */
   1473bool
   1474xprt_request_need_retransmit(struct rpc_task *task)
   1475{
   1476	return xprt_request_retransmit_after_disconnect(task);
   1477}
   1478
   1479/**
   1480 * xprt_prepare_transmit - reserve the transport before sending a request
   1481 * @task: RPC task about to send a request
   1482 *
   1483 */
   1484bool xprt_prepare_transmit(struct rpc_task *task)
   1485{
   1486	struct rpc_rqst	*req = task->tk_rqstp;
   1487	struct rpc_xprt	*xprt = req->rq_xprt;
   1488
   1489	if (!xprt_lock_write(xprt, task)) {
   1490		/* Race breaker: someone may have transmitted us */
   1491		if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
   1492			rpc_wake_up_queued_task_set_status(&xprt->sending,
   1493					task, 0);
   1494		return false;
   1495
   1496	}
   1497	if (atomic_read(&xprt->swapper))
   1498		/* This will be clear in __rpc_execute */
   1499		current->flags |= PF_MEMALLOC;
   1500	return true;
   1501}
   1502
   1503void xprt_end_transmit(struct rpc_task *task)
   1504{
   1505	struct rpc_xprt	*xprt = task->tk_rqstp->rq_xprt;
   1506
   1507	xprt_inject_disconnect(xprt);
   1508	xprt_release_write(xprt, task);
   1509}
   1510
   1511/**
   1512 * xprt_request_transmit - send an RPC request on a transport
   1513 * @req: pointer to request to transmit
   1514 * @snd_task: RPC task that owns the transport lock
   1515 *
   1516 * This performs the transmission of a single request.
   1517 * Note that if the request is not the same as snd_task, then it
   1518 * does need to be pinned.
   1519 * Returns '0' on success.
   1520 */
   1521static int
   1522xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
   1523{
   1524	struct rpc_xprt *xprt = req->rq_xprt;
   1525	struct rpc_task *task = req->rq_task;
   1526	unsigned int connect_cookie;
   1527	int is_retrans = RPC_WAS_SENT(task);
   1528	int status;
   1529
   1530	if (!req->rq_bytes_sent) {
   1531		if (xprt_request_data_received(task)) {
   1532			status = 0;
   1533			goto out_dequeue;
   1534		}
   1535		/* Verify that our message lies in the RPCSEC_GSS window */
   1536		if (rpcauth_xmit_need_reencode(task)) {
   1537			status = -EBADMSG;
   1538			goto out_dequeue;
   1539		}
   1540		if (RPC_SIGNALLED(task)) {
   1541			status = -ERESTARTSYS;
   1542			goto out_dequeue;
   1543		}
   1544	}
   1545
   1546	/*
   1547	 * Update req->rq_ntrans before transmitting to avoid races with
   1548	 * xprt_update_rtt(), which needs to know that it is recording a
   1549	 * reply to the first transmission.
   1550	 */
   1551	req->rq_ntrans++;
   1552
   1553	trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
   1554	connect_cookie = xprt->connect_cookie;
   1555	status = xprt->ops->send_request(req);
   1556	if (status != 0) {
   1557		req->rq_ntrans--;
   1558		trace_xprt_transmit(req, status);
   1559		return status;
   1560	}
   1561
   1562	if (is_retrans) {
   1563		task->tk_client->cl_stats->rpcretrans++;
   1564		trace_xprt_retransmit(req);
   1565	}
   1566
   1567	xprt_inject_disconnect(xprt);
   1568
   1569	task->tk_flags |= RPC_TASK_SENT;
   1570	spin_lock(&xprt->transport_lock);
   1571
   1572	xprt->stat.sends++;
   1573	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
   1574	xprt->stat.bklog_u += xprt->backlog.qlen;
   1575	xprt->stat.sending_u += xprt->sending.qlen;
   1576	xprt->stat.pending_u += xprt->pending.qlen;
   1577	spin_unlock(&xprt->transport_lock);
   1578
   1579	req->rq_connect_cookie = connect_cookie;
   1580out_dequeue:
   1581	trace_xprt_transmit(req, status);
   1582	xprt_request_dequeue_transmit(task);
   1583	rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
   1584	return status;
   1585}
   1586
   1587/**
   1588 * xprt_transmit - send an RPC request on a transport
   1589 * @task: controlling RPC task
   1590 *
   1591 * Attempts to drain the transmit queue. On exit, either the transport
   1592 * signalled an error that needs to be handled before transmission can
   1593 * resume, or @task finished transmitting, and detected that it already
   1594 * received a reply.
   1595 */
   1596void
   1597xprt_transmit(struct rpc_task *task)
   1598{
   1599	struct rpc_rqst *next, *req = task->tk_rqstp;
   1600	struct rpc_xprt	*xprt = req->rq_xprt;
   1601	int status;
   1602
   1603	spin_lock(&xprt->queue_lock);
   1604	for (;;) {
   1605		next = list_first_entry_or_null(&xprt->xmit_queue,
   1606						struct rpc_rqst, rq_xmit);
   1607		if (!next)
   1608			break;
   1609		xprt_pin_rqst(next);
   1610		spin_unlock(&xprt->queue_lock);
   1611		status = xprt_request_transmit(next, task);
   1612		if (status == -EBADMSG && next != req)
   1613			status = 0;
   1614		spin_lock(&xprt->queue_lock);
   1615		xprt_unpin_rqst(next);
   1616		if (status < 0) {
   1617			if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
   1618				task->tk_status = status;
   1619			break;
   1620		}
   1621		/* Was @task transmitted, and has it received a reply? */
   1622		if (xprt_request_data_received(task) &&
   1623		    !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
   1624			break;
   1625		cond_resched_lock(&xprt->queue_lock);
   1626	}
   1627	spin_unlock(&xprt->queue_lock);
   1628}
   1629
   1630static void xprt_complete_request_init(struct rpc_task *task)
   1631{
   1632	if (task->tk_rqstp)
   1633		xprt_request_init(task);
   1634}
   1635
   1636void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
   1637{
   1638	set_bit(XPRT_CONGESTED, &xprt->state);
   1639	rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
   1640}
   1641EXPORT_SYMBOL_GPL(xprt_add_backlog);
   1642
   1643static bool __xprt_set_rq(struct rpc_task *task, void *data)
   1644{
   1645	struct rpc_rqst *req = data;
   1646
   1647	if (task->tk_rqstp == NULL) {
   1648		memset(req, 0, sizeof(*req));	/* mark unused */
   1649		task->tk_rqstp = req;
   1650		return true;
   1651	}
   1652	return false;
   1653}
   1654
   1655bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
   1656{
   1657	if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
   1658		clear_bit(XPRT_CONGESTED, &xprt->state);
   1659		return false;
   1660	}
   1661	return true;
   1662}
   1663EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);
   1664
   1665static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
   1666{
   1667	bool ret = false;
   1668
   1669	if (!test_bit(XPRT_CONGESTED, &xprt->state))
   1670		goto out;
   1671	spin_lock(&xprt->reserve_lock);
   1672	if (test_bit(XPRT_CONGESTED, &xprt->state)) {
   1673		xprt_add_backlog(xprt, task);
   1674		ret = true;
   1675	}
   1676	spin_unlock(&xprt->reserve_lock);
   1677out:
   1678	return ret;
   1679}
   1680
   1681static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
   1682{
   1683	struct rpc_rqst *req = ERR_PTR(-EAGAIN);
   1684
   1685	if (xprt->num_reqs >= xprt->max_reqs)
   1686		goto out;
   1687	++xprt->num_reqs;
   1688	spin_unlock(&xprt->reserve_lock);
   1689	req = kzalloc(sizeof(*req), rpc_task_gfp_mask());
   1690	spin_lock(&xprt->reserve_lock);
   1691	if (req != NULL)
   1692		goto out;
   1693	--xprt->num_reqs;
   1694	req = ERR_PTR(-ENOMEM);
   1695out:
   1696	return req;
   1697}
   1698
   1699static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
   1700{
   1701	if (xprt->num_reqs > xprt->min_reqs) {
   1702		--xprt->num_reqs;
   1703		kfree(req);
   1704		return true;
   1705	}
   1706	return false;
   1707}
   1708
   1709void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
   1710{
   1711	struct rpc_rqst *req;
   1712
   1713	spin_lock(&xprt->reserve_lock);
   1714	if (!list_empty(&xprt->free)) {
   1715		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
   1716		list_del(&req->rq_list);
   1717		goto out_init_req;
   1718	}
   1719	req = xprt_dynamic_alloc_slot(xprt);
   1720	if (!IS_ERR(req))
   1721		goto out_init_req;
   1722	switch (PTR_ERR(req)) {
   1723	case -ENOMEM:
   1724		dprintk("RPC:       dynamic allocation of request slot "
   1725				"failed! Retrying\n");
   1726		task->tk_status = -ENOMEM;
   1727		break;
   1728	case -EAGAIN:
   1729		xprt_add_backlog(xprt, task);
   1730		dprintk("RPC:       waiting for request slot\n");
   1731		fallthrough;
   1732	default:
   1733		task->tk_status = -EAGAIN;
   1734	}
   1735	spin_unlock(&xprt->reserve_lock);
   1736	return;
   1737out_init_req:
   1738	xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
   1739				     xprt->num_reqs);
   1740	spin_unlock(&xprt->reserve_lock);
   1741
   1742	task->tk_status = 0;
   1743	task->tk_rqstp = req;
   1744}
   1745EXPORT_SYMBOL_GPL(xprt_alloc_slot);
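       /*
        * Slot allocation order: take a preallocated rpc_rqst from
        * xprt->free if one is available, otherwise try to grow the slot
        * table with xprt_dynamic_alloc_slot().  -EAGAIN (table full)
        * parks the task on the backlog queue; in both failure cases
        * tk_status is left negative so the RPC state machine retries the
        * reservation later.
        */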
   1746
   1747void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
   1748{
   1749	spin_lock(&xprt->reserve_lock);
   1750	if (!xprt_wake_up_backlog(xprt, req) &&
   1751	    !xprt_dynamic_free_slot(xprt, req)) {
   1752		memset(req, 0, sizeof(*req));	/* mark unused */
   1753		list_add(&req->rq_list, &xprt->free);
   1754	}
   1755	spin_unlock(&xprt->reserve_lock);
   1756}
   1757EXPORT_SYMBOL_GPL(xprt_free_slot);
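       /*
        * A released slot is first offered to a backlogged task; failing
        * that, slots above min_reqs (allocated dynamically) are simply
        * freed, and the rest are zeroed and returned to xprt->free for
        * reuse.
        */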
   1758
   1759static void xprt_free_all_slots(struct rpc_xprt *xprt)
   1760{
   1761	struct rpc_rqst *req;
   1762	while (!list_empty(&xprt->free)) {
   1763		req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
   1764		list_del(&req->rq_list);
   1765		kfree(req);
   1766	}
   1767}
   1768
   1769static DEFINE_IDA(rpc_xprt_ids);
   1770
   1771void xprt_cleanup_ids(void)
   1772{
   1773	ida_destroy(&rpc_xprt_ids);
   1774}
   1775
   1776static int xprt_alloc_id(struct rpc_xprt *xprt)
   1777{
   1778	int id;
   1779
   1780	id = ida_simple_get(&rpc_xprt_ids, 0, 0, GFP_KERNEL);
   1781	if (id < 0)
   1782		return id;
   1783
   1784	xprt->id = id;
   1785	return 0;
   1786}
   1787
   1788static void xprt_free_id(struct rpc_xprt *xprt)
   1789{
   1790	ida_simple_remove(&rpc_xprt_ids, xprt->id);
   1791}
   1792
   1793struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
   1794		unsigned int num_prealloc,
   1795		unsigned int max_alloc)
   1796{
   1797	struct rpc_xprt *xprt;
   1798	struct rpc_rqst *req;
   1799	int i;
   1800
   1801	xprt = kzalloc(size, GFP_KERNEL);
   1802	if (xprt == NULL)
   1803		goto out;
   1804
   1805	xprt_alloc_id(xprt);
   1806	xprt_init(xprt, net);
   1807
   1808	for (i = 0; i < num_prealloc; i++) {
   1809		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
   1810		if (!req)
   1811			goto out_free;
   1812		list_add(&req->rq_list, &xprt->free);
   1813	}
   1814	if (max_alloc > num_prealloc)
   1815		xprt->max_reqs = max_alloc;
   1816	else
   1817		xprt->max_reqs = num_prealloc;
   1818	xprt->min_reqs = num_prealloc;
   1819	xprt->num_reqs = num_prealloc;
   1820
   1821	return xprt;
   1822
   1823out_free:
   1824	xprt_free(xprt);
   1825out:
   1826	return NULL;
   1827}
   1828EXPORT_SYMBOL_GPL(xprt_alloc);
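       /*
        * xprt_alloc() carves struct rpc_xprt out of the start of a larger,
        * transport-specific allocation of @size bytes, preallocates
        * @num_prealloc request slots on xprt->free, and records the slot
        * table limits: min_reqs = @num_prealloc, max_reqs =
        * max(@max_alloc, @num_prealloc).  On failure everything allocated
        * so far is released via xprt_free().
        */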
   1829
   1830void xprt_free(struct rpc_xprt *xprt)
   1831{
   1832	put_net_track(xprt->xprt_net, &xprt->ns_tracker);
   1833	xprt_free_all_slots(xprt);
   1834	xprt_free_id(xprt);
   1835	rpc_sysfs_xprt_destroy(xprt);
   1836	kfree_rcu(xprt, rcu);
   1837}
   1838EXPORT_SYMBOL_GPL(xprt_free);
   1839
   1840static void
   1841xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
   1842{
   1843	req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
   1844}
   1845
   1846static __be32
   1847xprt_alloc_xid(struct rpc_xprt *xprt)
   1848{
   1849	__be32 xid;
   1850
   1851	spin_lock(&xprt->reserve_lock);
   1852	xid = (__force __be32)xprt->xid++;
   1853	spin_unlock(&xprt->reserve_lock);
   1854	return xid;
   1855}
   1856
   1857static void
   1858xprt_init_xid(struct rpc_xprt *xprt)
   1859{
   1860	xprt->xid = prandom_u32();
   1861}
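       /*
        * XIDs are a per-transport 32-bit counter: xprt_init_xid() seeds
        * it with a random value at transport creation, and
        * xprt_alloc_xid() hands out successive values under reserve_lock,
        * giving each outstanding request on the transport a distinct XID
        * for reply matching.
        */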
   1862
   1863static void
   1864xprt_request_init(struct rpc_task *task)
   1865{
   1866	struct rpc_xprt *xprt = task->tk_xprt;
   1867	struct rpc_rqst	*req = task->tk_rqstp;
   1868
   1869	req->rq_task	= task;
   1870	req->rq_xprt    = xprt;
   1871	req->rq_buffer  = NULL;
   1872	req->rq_xid	= xprt_alloc_xid(xprt);
   1873	xprt_init_connect_cookie(req, xprt);
   1874	req->rq_snd_buf.len = 0;
   1875	req->rq_snd_buf.buflen = 0;
   1876	req->rq_rcv_buf.len = 0;
   1877	req->rq_rcv_buf.buflen = 0;
   1878	req->rq_snd_buf.bvec = NULL;
   1879	req->rq_rcv_buf.bvec = NULL;
   1880	req->rq_release_snd_buf = NULL;
   1881	xprt_init_majortimeo(task, req);
   1882
   1883	trace_xprt_reserve(req);
   1884}
   1885
   1886static void
   1887xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
   1888{
   1889	xprt->ops->alloc_slot(xprt, task);
   1890	if (task->tk_rqstp != NULL)
   1891		xprt_request_init(task);
   1892}
   1893
   1894/**
   1895 * xprt_reserve - allocate an RPC request slot
   1896 * @task: RPC task requesting a slot allocation
   1897 *
   1898 * If the transport is marked as being congested, or if no more
   1899 * slots are available, place the task on the transport's
   1900 * backlog queue.
   1901 */
   1902void xprt_reserve(struct rpc_task *task)
   1903{
   1904	struct rpc_xprt *xprt = task->tk_xprt;
   1905
   1906	task->tk_status = 0;
   1907	if (task->tk_rqstp != NULL)
   1908		return;
   1909
   1910	task->tk_status = -EAGAIN;
   1911	if (!xprt_throttle_congested(xprt, task))
   1912		xprt_do_reserve(xprt, task);
   1913}
   1914
   1915/**
   1916 * xprt_retry_reserve - allocate an RPC request slot
   1917 * @task: RPC task requesting a slot allocation
   1918 *
   1919 * If no more slots are available, place the task on the transport's
   1920 * backlog queue.
   1921 * Note that the only difference with xprt_reserve is that we now
   1922 * ignore the value of the XPRT_CONGESTED flag.
   1923 */
   1924void xprt_retry_reserve(struct rpc_task *task)
   1925{
   1926	struct rpc_xprt *xprt = task->tk_xprt;
   1927
   1928	task->tk_status = 0;
   1929	if (task->tk_rqstp != NULL)
   1930		return;
   1931
   1932	task->tk_status = -EAGAIN;
   1933	xprt_do_reserve(xprt, task);
   1934}
   1935
   1936/**
   1937 * xprt_release - release an RPC request slot
   1938 * @task: task which is finished with the slot
   1939 *
   1940 */
   1941void xprt_release(struct rpc_task *task)
   1942{
   1943	struct rpc_xprt	*xprt;
   1944	struct rpc_rqst	*req = task->tk_rqstp;
   1945
   1946	if (req == NULL) {
   1947		if (task->tk_client) {
   1948			xprt = task->tk_xprt;
   1949			xprt_release_write(xprt, task);
   1950		}
   1951		return;
   1952	}
   1953
   1954	xprt = req->rq_xprt;
   1955	xprt_request_dequeue_xprt(task);
   1956	spin_lock(&xprt->transport_lock);
   1957	xprt->ops->release_xprt(xprt, task);
   1958	if (xprt->ops->release_request)
   1959		xprt->ops->release_request(task);
   1960	xprt_schedule_autodisconnect(xprt);
   1961	spin_unlock(&xprt->transport_lock);
   1962	if (req->rq_buffer)
   1963		xprt->ops->buf_free(task);
   1964	xdr_free_bvec(&req->rq_rcv_buf);
   1965	xdr_free_bvec(&req->rq_snd_buf);
   1966	if (req->rq_cred != NULL)
   1967		put_rpccred(req->rq_cred);
   1968	if (req->rq_release_snd_buf)
   1969		req->rq_release_snd_buf(req);
   1970
   1971	task->tk_rqstp = NULL;
   1972	if (likely(!bc_prealloc(req)))
   1973		xprt->ops->free_slot(xprt, req);
   1974	else
   1975		xprt_free_bc_request(req);
   1976}
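       /*
        * Teardown order in xprt_release(): the request is first removed
        * from the transmit/receive queues (xprt_request_dequeue_xprt),
        * then the transport write lock and any per-request transport
        * state are dropped under transport_lock, buffers and credentials
        * are freed, and finally the slot itself is returned -- to the
        * slot table for ordinary requests, or to the backchannel pool for
        * preallocated backchannel requests.
        */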
   1977
   1978#ifdef CONFIG_SUNRPC_BACKCHANNEL
   1979void
   1980xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
   1981{
   1982	struct xdr_buf *xbufp = &req->rq_snd_buf;
   1983
   1984	task->tk_rqstp = req;
   1985	req->rq_task = task;
   1986	xprt_init_connect_cookie(req, req->rq_xprt);
   1987	/*
   1988	 * Set up the xdr_buf length.
   1989	 * This also indicates that the buffer is XDR encoded already.
   1990	 */
   1991	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
   1992		xbufp->tail[0].iov_len;
   1993}
   1994#endif
   1995
   1996static void xprt_init(struct rpc_xprt *xprt, struct net *net)
   1997{
   1998	kref_init(&xprt->kref);
   1999
   2000	spin_lock_init(&xprt->transport_lock);
   2001	spin_lock_init(&xprt->reserve_lock);
   2002	spin_lock_init(&xprt->queue_lock);
   2003
   2004	INIT_LIST_HEAD(&xprt->free);
   2005	xprt->recv_queue = RB_ROOT;
   2006	INIT_LIST_HEAD(&xprt->xmit_queue);
   2007#if defined(CONFIG_SUNRPC_BACKCHANNEL)
   2008	spin_lock_init(&xprt->bc_pa_lock);
   2009	INIT_LIST_HEAD(&xprt->bc_pa_list);
   2010#endif /* CONFIG_SUNRPC_BACKCHANNEL */
   2011	INIT_LIST_HEAD(&xprt->xprt_switch);
   2012
   2013	xprt->last_used = jiffies;
   2014	xprt->cwnd = RPC_INITCWND;
   2015	xprt->bind_index = 0;
   2016
   2017	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
   2018	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
   2019	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
   2020	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
   2021
   2022	xprt_init_xid(xprt);
   2023
   2024	xprt->xprt_net = get_net_track(net, &xprt->ns_tracker, GFP_KERNEL);
   2025}
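       /*
        * Common initialisation shared by all transports: the three locks,
        * the free and transmit lists, the receive rbtree, the four RPC
        * wait queues, the initial congestion window (RPC_INITCWND), the
        * XID seed, and a tracked reference on the owning network
        * namespace.
        */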
   2026
   2027/**
   2028 * xprt_create_transport - create an RPC transport
   2029 * @args: rpc transport creation arguments
   2030 *
   2031 */
   2032struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
   2033{
   2034	struct rpc_xprt	*xprt;
   2035	const struct xprt_class *t;
   2036
   2037	t = xprt_class_find_by_ident(args->ident);
   2038	if (!t) {
   2039		dprintk("RPC: transport (%d) not supported\n", args->ident);
   2040		return ERR_PTR(-EIO);
   2041	}
   2042
   2043	xprt = t->setup(args);
   2044	xprt_class_release(t);
   2045
   2046	if (IS_ERR(xprt))
   2047		goto out;
   2048	if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
   2049		xprt->idle_timeout = 0;
   2050	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
   2051	if (xprt_has_timer(xprt))
   2052		timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
   2053	else
   2054		timer_setup(&xprt->timer, NULL, 0);
   2055
   2056	if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
   2057		xprt_destroy(xprt);
   2058		return ERR_PTR(-EINVAL);
   2059	}
   2060	xprt->servername = kstrdup(args->servername, GFP_KERNEL);
   2061	if (xprt->servername == NULL) {
   2062		xprt_destroy(xprt);
   2063		return ERR_PTR(-ENOMEM);
   2064	}
   2065
   2066	rpc_xprt_debugfs_register(xprt);
   2067
   2068	trace_xprt_create(xprt);
   2069out:
   2070	return xprt;
   2071}
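       /*
        * A caller fills in a struct xprt_create and lets the matching
        * xprt_class build the transport.  Illustrative sketch only; the
        * exact set of fields a caller supplies depends on the transport:
        *
        *	struct xprt_create args = {
        *		.ident		= XPRT_TRANSPORT_TCP,
        *		.net		= net,
        *		.dstaddr	= (struct sockaddr *)&addr,
        *		.addrlen	= sizeof(addr),
        *		.servername	= "server.example.org",
        *	};
        *	struct rpc_xprt *xprt = xprt_create_transport(&args);
        *	if (IS_ERR(xprt))
        *		return PTR_ERR(xprt);
        */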
   2072
   2073static void xprt_destroy_cb(struct work_struct *work)
   2074{
   2075	struct rpc_xprt *xprt =
   2076		container_of(work, struct rpc_xprt, task_cleanup);
   2077
   2078	trace_xprt_destroy(xprt);
   2079
   2080	rpc_xprt_debugfs_unregister(xprt);
   2081	rpc_destroy_wait_queue(&xprt->binding);
   2082	rpc_destroy_wait_queue(&xprt->pending);
   2083	rpc_destroy_wait_queue(&xprt->sending);
   2084	rpc_destroy_wait_queue(&xprt->backlog);
   2085	kfree(xprt->servername);
   2086	/*
   2087	 * Destroy any existing back channel
   2088	 */
   2089	xprt_destroy_backchannel(xprt, UINT_MAX);
   2090
   2091	/*
   2092	 * Tear down transport state and free the rpc_xprt
   2093	 */
   2094	xprt->ops->destroy(xprt);
   2095}
   2096
   2097/**
   2098 * xprt_destroy - destroy an RPC transport, killing off all requests.
   2099 * @xprt: transport to destroy
   2100 *
   2101 */
   2102static void xprt_destroy(struct rpc_xprt *xprt)
   2103{
   2104	/*
   2105	 * Exclude transport connect/disconnect handlers and autoclose
   2106	 */
   2107	wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);
   2108
   2109	/*
   2110	 * xprt_schedule_autodisconnect() can run after XPRT_LOCKED
   2111	 * is cleared.  We use ->transport_lock to ensure the mod_timer()
   2112	 * can only run *before* del_timer_sync(), never after.
   2113	 */
   2114	spin_lock(&xprt->transport_lock);
   2115	del_timer_sync(&xprt->timer);
   2116	spin_unlock(&xprt->transport_lock);
   2117
   2118	/*
   2119	 * Destroy sockets etc from the system workqueue so they can
   2120	 * safely flush receive work running on rpciod.
   2121	 */
   2122	INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
   2123	schedule_work(&xprt->task_cleanup);
   2124}
   2125
   2126static void xprt_destroy_kref(struct kref *kref)
   2127{
   2128	xprt_destroy(container_of(kref, struct rpc_xprt, kref));
   2129}
   2130
   2131/**
   2132 * xprt_get - return a reference to an RPC transport.
   2133 * @xprt: pointer to the transport
   2134 *
   2135 */
   2136struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
   2137{
   2138	if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
   2139		return xprt;
   2140	return NULL;
   2141}
   2142EXPORT_SYMBOL_GPL(xprt_get);
   2143
   2144/**
   2145 * xprt_put - release a reference to an RPC transport.
   2146 * @xprt: pointer to the transport
   2147 *
   2148 */
   2149void xprt_put(struct rpc_xprt *xprt)
   2150{
   2151	if (xprt != NULL)
   2152		kref_put(&xprt->kref, xprt_destroy_kref);
   2153}
   2154EXPORT_SYMBOL_GPL(xprt_put);
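       /*
        * Lifetime: xprt_get()/xprt_put() manage the kref.  When the last
        * reference is dropped, xprt_destroy_kref() runs xprt_destroy(),
        * which takes XPRT_LOCKED for good, stops the autodisconnect
        * timer, and defers the remaining teardown (debugfs, wait queues,
        * backchannel, ->destroy) to xprt_destroy_cb() on the system
        * workqueue.
        */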