xprt.c (55155B)
1// SPDX-License-Identifier: GPL-2.0-only 2/* 3 * linux/net/sunrpc/xprt.c 4 * 5 * This is a generic RPC call interface supporting congestion avoidance, 6 * and asynchronous calls. 7 * 8 * The interface works like this: 9 * 10 * - When a process places a call, it allocates a request slot if 11 * one is available. Otherwise, it sleeps on the backlog queue 12 * (xprt_reserve). 13 * - Next, the caller puts together the RPC message, stuffs it into 14 * the request struct, and calls xprt_transmit(). 15 * - xprt_transmit sends the message and installs the caller on the 16 * transport's wait list. At the same time, if a reply is expected, 17 * it installs a timer that is run after the packet's timeout has 18 * expired. 19 * - When a packet arrives, the data_ready handler walks the list of 20 * pending requests for that transport. If a matching XID is found, the 21 * caller is woken up, and the timer removed. 22 * - When no reply arrives within the timeout interval, the timer is 23 * fired by the kernel and runs xprt_timer(). It either adjusts the 24 * timeout values (minor timeout) or wakes up the caller with a status 25 * of -ETIMEDOUT. 26 * - When the caller receives a notification from RPC that a reply arrived, 27 * it should release the RPC slot, and process the reply. 28 * If the call timed out, it may choose to retry the operation by 29 * adjusting the initial timeout value, and simply calling rpc_call 30 * again. 31 * 32 * Support for async RPC is done through a set of RPC-specific scheduling 33 * primitives that `transparently' work for processes as well as async 34 * tasks that rely on callbacks. 35 * 36 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 37 * 38 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 39 */ 40 41#include <linux/module.h> 42 43#include <linux/types.h> 44#include <linux/interrupt.h> 45#include <linux/workqueue.h> 46#include <linux/net.h> 47#include <linux/ktime.h> 48 49#include <linux/sunrpc/clnt.h> 50#include <linux/sunrpc/metrics.h> 51#include <linux/sunrpc/bc_xprt.h> 52#include <linux/rcupdate.h> 53#include <linux/sched/mm.h> 54 55#include <trace/events/sunrpc.h> 56 57#include "sunrpc.h" 58#include "sysfs.h" 59#include "fail.h" 60 61/* 62 * Local variables 63 */ 64 65#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 66# define RPCDBG_FACILITY RPCDBG_XPRT 67#endif 68 69/* 70 * Local functions 71 */ 72static void xprt_init(struct rpc_xprt *xprt, struct net *net); 73static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); 74static void xprt_destroy(struct rpc_xprt *xprt); 75static void xprt_request_init(struct rpc_task *task); 76static int xprt_request_prepare(struct rpc_rqst *req); 77 78static DEFINE_SPINLOCK(xprt_list_lock); 79static LIST_HEAD(xprt_list); 80 81static unsigned long xprt_request_timeout(const struct rpc_rqst *req) 82{ 83 unsigned long timeout = jiffies + req->rq_timeout; 84 85 if (time_before(timeout, req->rq_majortimeo)) 86 return timeout; 87 return req->rq_majortimeo; 88} 89 90/** 91 * xprt_register_transport - register a transport implementation 92 * @transport: transport to register 93 * 94 * If a transport implementation is loaded as a kernel module, it can 95 * call this interface to make itself known to the RPC client. 96 * 97 * Returns: 98 * 0: transport successfully registered 99 * -EEXIST: transport already registered 100 * -EINVAL: transport module being unloaded 101 */ 102int xprt_register_transport(struct xprt_class *transport) 103{ 104 struct xprt_class *t; 105 int result; 106 107 result = -EEXIST; 108 spin_lock(&xprt_list_lock); 109 list_for_each_entry(t, &xprt_list, list) { 110 /* don't register the same transport class twice */ 111 if (t->ident == transport->ident) 112 goto out; 113 } 114 115 list_add_tail(&transport->list, &xprt_list); 116 printk(KERN_INFO "RPC: Registered %s transport module.\n", 117 transport->name); 118 result = 0; 119 120out: 121 spin_unlock(&xprt_list_lock); 122 return result; 123} 124EXPORT_SYMBOL_GPL(xprt_register_transport); 125 126/** 127 * xprt_unregister_transport - unregister a transport implementation 128 * @transport: transport to unregister 129 * 130 * Returns: 131 * 0: transport successfully unregistered 132 * -ENOENT: transport never registered 133 */ 134int xprt_unregister_transport(struct xprt_class *transport) 135{ 136 struct xprt_class *t; 137 int result; 138 139 result = 0; 140 spin_lock(&xprt_list_lock); 141 list_for_each_entry(t, &xprt_list, list) { 142 if (t == transport) { 143 printk(KERN_INFO 144 "RPC: Unregistered %s transport module.\n", 145 transport->name); 146 list_del_init(&transport->list); 147 goto out; 148 } 149 } 150 result = -ENOENT; 151 152out: 153 spin_unlock(&xprt_list_lock); 154 return result; 155} 156EXPORT_SYMBOL_GPL(xprt_unregister_transport); 157 158static void 159xprt_class_release(const struct xprt_class *t) 160{ 161 module_put(t->owner); 162} 163 164static const struct xprt_class * 165xprt_class_find_by_ident_locked(int ident) 166{ 167 const struct xprt_class *t; 168 169 list_for_each_entry(t, &xprt_list, list) { 170 if (t->ident != ident) 171 continue; 172 if (!try_module_get(t->owner)) 173 continue; 174 return t; 175 } 176 return NULL; 177} 178 179static const struct xprt_class * 180xprt_class_find_by_ident(int ident) 181{ 182 const struct xprt_class *t; 183 184 spin_lock(&xprt_list_lock); 185 t = xprt_class_find_by_ident_locked(ident); 186 spin_unlock(&xprt_list_lock); 187 return t; 188} 189 190static const struct xprt_class * 191xprt_class_find_by_netid_locked(const char *netid) 192{ 193 const struct xprt_class *t; 194 unsigned int i; 195 196 list_for_each_entry(t, &xprt_list, list) { 197 for (i = 0; t->netid[i][0] != '\0'; i++) { 198 if (strcmp(t->netid[i], netid) != 0) 199 continue; 200 if (!try_module_get(t->owner)) 201 continue; 202 return t; 203 } 204 } 205 return NULL; 206} 207 208static const struct xprt_class * 209xprt_class_find_by_netid(const char *netid) 210{ 211 const struct xprt_class *t; 212 213 spin_lock(&xprt_list_lock); 214 t = xprt_class_find_by_netid_locked(netid); 215 if (!t) { 216 spin_unlock(&xprt_list_lock); 217 request_module("rpc%s", netid); 218 spin_lock(&xprt_list_lock); 219 t = xprt_class_find_by_netid_locked(netid); 220 } 221 spin_unlock(&xprt_list_lock); 222 return t; 223} 224 225/** 226 * xprt_find_transport_ident - convert a netid into a transport identifier 227 * @netid: transport to load 228 * 229 * Returns: 230 * > 0: transport identifier 231 * -ENOENT: transport module not available 232 */ 233int xprt_find_transport_ident(const char *netid) 234{ 235 const struct xprt_class *t; 236 int ret; 237 238 t = xprt_class_find_by_netid(netid); 239 if (!t) 240 return -ENOENT; 241 ret = t->ident; 242 xprt_class_release(t); 243 return ret; 244} 245EXPORT_SYMBOL_GPL(xprt_find_transport_ident); 246 247static void xprt_clear_locked(struct rpc_xprt *xprt) 248{ 249 xprt->snd_task = NULL; 250 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) 251 clear_bit_unlock(XPRT_LOCKED, &xprt->state); 252 else 253 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 254} 255 256/** 257 * xprt_reserve_xprt - serialize write access to transports 258 * @task: task that is requesting access to the transport 259 * @xprt: pointer to the target transport 260 * 261 * This prevents mixing the payload of separate requests, and prevents 262 * transport connects from colliding with writes. No congestion control 263 * is provided. 264 */ 265int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 266{ 267 struct rpc_rqst *req = task->tk_rqstp; 268 269 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 270 if (task == xprt->snd_task) 271 goto out_locked; 272 goto out_sleep; 273 } 274 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 275 goto out_unlock; 276 xprt->snd_task = task; 277 278out_locked: 279 trace_xprt_reserve_xprt(xprt, task); 280 return 1; 281 282out_unlock: 283 xprt_clear_locked(xprt); 284out_sleep: 285 task->tk_status = -EAGAIN; 286 if (RPC_IS_SOFT(task)) 287 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 288 xprt_request_timeout(req)); 289 else 290 rpc_sleep_on(&xprt->sending, task, NULL); 291 return 0; 292} 293EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 294 295static bool 296xprt_need_congestion_window_wait(struct rpc_xprt *xprt) 297{ 298 return test_bit(XPRT_CWND_WAIT, &xprt->state); 299} 300 301static void 302xprt_set_congestion_window_wait(struct rpc_xprt *xprt) 303{ 304 if (!list_empty(&xprt->xmit_queue)) { 305 /* Peek at head of queue to see if it can make progress */ 306 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, 307 rq_xmit)->rq_cong) 308 return; 309 } 310 set_bit(XPRT_CWND_WAIT, &xprt->state); 311} 312 313static void 314xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) 315{ 316 if (!RPCXPRT_CONGESTED(xprt)) 317 clear_bit(XPRT_CWND_WAIT, &xprt->state); 318} 319 320/* 321 * xprt_reserve_xprt_cong - serialize write access to transports 322 * @task: task that is requesting access to the transport 323 * 324 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 325 * integrated into the decision of whether a request is allowed to be 326 * woken up and given access to the transport. 327 * Note that the lock is only granted if we know there are free slots. 328 */ 329int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 330{ 331 struct rpc_rqst *req = task->tk_rqstp; 332 333 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 334 if (task == xprt->snd_task) 335 goto out_locked; 336 goto out_sleep; 337 } 338 if (req == NULL) { 339 xprt->snd_task = task; 340 goto out_locked; 341 } 342 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 343 goto out_unlock; 344 if (!xprt_need_congestion_window_wait(xprt)) { 345 xprt->snd_task = task; 346 goto out_locked; 347 } 348out_unlock: 349 xprt_clear_locked(xprt); 350out_sleep: 351 task->tk_status = -EAGAIN; 352 if (RPC_IS_SOFT(task)) 353 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 354 xprt_request_timeout(req)); 355 else 356 rpc_sleep_on(&xprt->sending, task, NULL); 357 return 0; 358out_locked: 359 trace_xprt_reserve_cong(xprt, task); 360 return 1; 361} 362EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 363 364static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 365{ 366 int retval; 367 368 if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) 369 return 1; 370 spin_lock(&xprt->transport_lock); 371 retval = xprt->ops->reserve_xprt(xprt, task); 372 spin_unlock(&xprt->transport_lock); 373 return retval; 374} 375 376static bool __xprt_lock_write_func(struct rpc_task *task, void *data) 377{ 378 struct rpc_xprt *xprt = data; 379 380 xprt->snd_task = task; 381 return true; 382} 383 384static void __xprt_lock_write_next(struct rpc_xprt *xprt) 385{ 386 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 387 return; 388 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 389 goto out_unlock; 390 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 391 __xprt_lock_write_func, xprt)) 392 return; 393out_unlock: 394 xprt_clear_locked(xprt); 395} 396 397static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 398{ 399 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 400 return; 401 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 402 goto out_unlock; 403 if (xprt_need_congestion_window_wait(xprt)) 404 goto out_unlock; 405 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 406 __xprt_lock_write_func, xprt)) 407 return; 408out_unlock: 409 xprt_clear_locked(xprt); 410} 411 412/** 413 * xprt_release_xprt - allow other requests to use a transport 414 * @xprt: transport with other tasks potentially waiting 415 * @task: task that is releasing access to the transport 416 * 417 * Note that "task" can be NULL. No congestion control is provided. 418 */ 419void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 420{ 421 if (xprt->snd_task == task) { 422 xprt_clear_locked(xprt); 423 __xprt_lock_write_next(xprt); 424 } 425 trace_xprt_release_xprt(xprt, task); 426} 427EXPORT_SYMBOL_GPL(xprt_release_xprt); 428 429/** 430 * xprt_release_xprt_cong - allow other requests to use a transport 431 * @xprt: transport with other tasks potentially waiting 432 * @task: task that is releasing access to the transport 433 * 434 * Note that "task" can be NULL. Another task is awoken to use the 435 * transport if the transport's congestion window allows it. 436 */ 437void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 438{ 439 if (xprt->snd_task == task) { 440 xprt_clear_locked(xprt); 441 __xprt_lock_write_next_cong(xprt); 442 } 443 trace_xprt_release_cong(xprt, task); 444} 445EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 446 447void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 448{ 449 if (xprt->snd_task != task) 450 return; 451 spin_lock(&xprt->transport_lock); 452 xprt->ops->release_xprt(xprt, task); 453 spin_unlock(&xprt->transport_lock); 454} 455 456/* 457 * Van Jacobson congestion avoidance. Check if the congestion window 458 * overflowed. Put the task to sleep if this is the case. 459 */ 460static int 461__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 462{ 463 if (req->rq_cong) 464 return 1; 465 trace_xprt_get_cong(xprt, req->rq_task); 466 if (RPCXPRT_CONGESTED(xprt)) { 467 xprt_set_congestion_window_wait(xprt); 468 return 0; 469 } 470 req->rq_cong = 1; 471 xprt->cong += RPC_CWNDSCALE; 472 return 1; 473} 474 475/* 476 * Adjust the congestion window, and wake up the next task 477 * that has been sleeping due to congestion 478 */ 479static void 480__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 481{ 482 if (!req->rq_cong) 483 return; 484 req->rq_cong = 0; 485 xprt->cong -= RPC_CWNDSCALE; 486 xprt_test_and_clear_congestion_window_wait(xprt); 487 trace_xprt_put_cong(xprt, req->rq_task); 488 __xprt_lock_write_next_cong(xprt); 489} 490 491/** 492 * xprt_request_get_cong - Request congestion control credits 493 * @xprt: pointer to transport 494 * @req: pointer to RPC request 495 * 496 * Useful for transports that require congestion control. 497 */ 498bool 499xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 500{ 501 bool ret = false; 502 503 if (req->rq_cong) 504 return true; 505 spin_lock(&xprt->transport_lock); 506 ret = __xprt_get_cong(xprt, req) != 0; 507 spin_unlock(&xprt->transport_lock); 508 return ret; 509} 510EXPORT_SYMBOL_GPL(xprt_request_get_cong); 511 512/** 513 * xprt_release_rqst_cong - housekeeping when request is complete 514 * @task: RPC request that recently completed 515 * 516 * Useful for transports that require congestion control. 517 */ 518void xprt_release_rqst_cong(struct rpc_task *task) 519{ 520 struct rpc_rqst *req = task->tk_rqstp; 521 522 __xprt_put_cong(req->rq_xprt, req); 523} 524EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 525 526static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt) 527{ 528 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) 529 __xprt_lock_write_next_cong(xprt); 530} 531 532/* 533 * Clear the congestion window wait flag and wake up the next 534 * entry on xprt->sending 535 */ 536static void 537xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) 538{ 539 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { 540 spin_lock(&xprt->transport_lock); 541 __xprt_lock_write_next_cong(xprt); 542 spin_unlock(&xprt->transport_lock); 543 } 544} 545 546/** 547 * xprt_adjust_cwnd - adjust transport congestion window 548 * @xprt: pointer to xprt 549 * @task: recently completed RPC request used to adjust window 550 * @result: result code of completed RPC request 551 * 552 * The transport code maintains an estimate on the maximum number of out- 553 * standing RPC requests, using a smoothed version of the congestion 554 * avoidance implemented in 44BSD. This is basically the Van Jacobson 555 * congestion algorithm: If a retransmit occurs, the congestion window is 556 * halved; otherwise, it is incremented by 1/cwnd when 557 * 558 * - a reply is received and 559 * - a full number of requests are outstanding and 560 * - the congestion window hasn't been updated recently. 561 */ 562void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) 563{ 564 struct rpc_rqst *req = task->tk_rqstp; 565 unsigned long cwnd = xprt->cwnd; 566 567 if (result >= 0 && cwnd <= xprt->cong) { 568 /* The (cwnd >> 1) term makes sure 569 * the result gets rounded properly. */ 570 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 571 if (cwnd > RPC_MAXCWND(xprt)) 572 cwnd = RPC_MAXCWND(xprt); 573 __xprt_lock_write_next_cong(xprt); 574 } else if (result == -ETIMEDOUT) { 575 cwnd >>= 1; 576 if (cwnd < RPC_CWNDSCALE) 577 cwnd = RPC_CWNDSCALE; 578 } 579 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 580 xprt->cong, xprt->cwnd, cwnd); 581 xprt->cwnd = cwnd; 582 __xprt_put_cong(xprt, req); 583} 584EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 585 586/** 587 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 588 * @xprt: transport with waiting tasks 589 * @status: result code to plant in each task before waking it 590 * 591 */ 592void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 593{ 594 if (status < 0) 595 rpc_wake_up_status(&xprt->pending, status); 596 else 597 rpc_wake_up(&xprt->pending); 598} 599EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 600 601/** 602 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 603 * @xprt: transport 604 * 605 * Note that we only set the timer for the case of RPC_IS_SOFT(), since 606 * we don't in general want to force a socket disconnection due to 607 * an incomplete RPC call transmission. 608 */ 609void xprt_wait_for_buffer_space(struct rpc_xprt *xprt) 610{ 611 set_bit(XPRT_WRITE_SPACE, &xprt->state); 612} 613EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 614 615static bool 616xprt_clear_write_space_locked(struct rpc_xprt *xprt) 617{ 618 if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) { 619 __xprt_lock_write_next(xprt); 620 dprintk("RPC: write space: waking waiting task on " 621 "xprt %p\n", xprt); 622 return true; 623 } 624 return false; 625} 626 627/** 628 * xprt_write_space - wake the task waiting for transport output buffer space 629 * @xprt: transport with waiting tasks 630 * 631 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 632 */ 633bool xprt_write_space(struct rpc_xprt *xprt) 634{ 635 bool ret; 636 637 if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) 638 return false; 639 spin_lock(&xprt->transport_lock); 640 ret = xprt_clear_write_space_locked(xprt); 641 spin_unlock(&xprt->transport_lock); 642 return ret; 643} 644EXPORT_SYMBOL_GPL(xprt_write_space); 645 646static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime) 647{ 648 s64 delta = ktime_to_ns(ktime_get() - abstime); 649 return likely(delta >= 0) ? 650 jiffies - nsecs_to_jiffies(delta) : 651 jiffies + nsecs_to_jiffies(-delta); 652} 653 654static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req) 655{ 656 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 657 unsigned long majortimeo = req->rq_timeout; 658 659 if (to->to_exponential) 660 majortimeo <<= to->to_retries; 661 else 662 majortimeo += to->to_increment * to->to_retries; 663 if (majortimeo > to->to_maxval || majortimeo == 0) 664 majortimeo = to->to_maxval; 665 return majortimeo; 666} 667 668static void xprt_reset_majortimeo(struct rpc_rqst *req) 669{ 670 req->rq_majortimeo += xprt_calc_majortimeo(req); 671} 672 673static void xprt_reset_minortimeo(struct rpc_rqst *req) 674{ 675 req->rq_minortimeo += req->rq_timeout; 676} 677 678static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req) 679{ 680 unsigned long time_init; 681 struct rpc_xprt *xprt = req->rq_xprt; 682 683 if (likely(xprt && xprt_connected(xprt))) 684 time_init = jiffies; 685 else 686 time_init = xprt_abs_ktime_to_jiffies(task->tk_start); 687 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 688 req->rq_majortimeo = time_init + xprt_calc_majortimeo(req); 689 req->rq_minortimeo = time_init + req->rq_timeout; 690} 691 692/** 693 * xprt_adjust_timeout - adjust timeout values for next retransmit 694 * @req: RPC request containing parameters to use for the adjustment 695 * 696 */ 697int xprt_adjust_timeout(struct rpc_rqst *req) 698{ 699 struct rpc_xprt *xprt = req->rq_xprt; 700 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 701 int status = 0; 702 703 if (time_before(jiffies, req->rq_majortimeo)) { 704 if (time_before(jiffies, req->rq_minortimeo)) 705 return status; 706 if (to->to_exponential) 707 req->rq_timeout <<= 1; 708 else 709 req->rq_timeout += to->to_increment; 710 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 711 req->rq_timeout = to->to_maxval; 712 req->rq_retries++; 713 } else { 714 req->rq_timeout = to->to_initval; 715 req->rq_retries = 0; 716 xprt_reset_majortimeo(req); 717 /* Reset the RTT counters == "slow start" */ 718 spin_lock(&xprt->transport_lock); 719 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 720 spin_unlock(&xprt->transport_lock); 721 status = -ETIMEDOUT; 722 } 723 xprt_reset_minortimeo(req); 724 725 if (req->rq_timeout == 0) { 726 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 727 req->rq_timeout = 5 * HZ; 728 } 729 return status; 730} 731 732static void xprt_autoclose(struct work_struct *work) 733{ 734 struct rpc_xprt *xprt = 735 container_of(work, struct rpc_xprt, task_cleanup); 736 unsigned int pflags = memalloc_nofs_save(); 737 738 trace_xprt_disconnect_auto(xprt); 739 xprt->connect_cookie++; 740 smp_mb__before_atomic(); 741 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 742 xprt->ops->close(xprt); 743 xprt_release_write(xprt, NULL); 744 wake_up_bit(&xprt->state, XPRT_LOCKED); 745 memalloc_nofs_restore(pflags); 746} 747 748/** 749 * xprt_disconnect_done - mark a transport as disconnected 750 * @xprt: transport to flag for disconnect 751 * 752 */ 753void xprt_disconnect_done(struct rpc_xprt *xprt) 754{ 755 trace_xprt_disconnect_done(xprt); 756 spin_lock(&xprt->transport_lock); 757 xprt_clear_connected(xprt); 758 xprt_clear_write_space_locked(xprt); 759 xprt_clear_congestion_window_wait_locked(xprt); 760 xprt_wake_pending_tasks(xprt, -ENOTCONN); 761 spin_unlock(&xprt->transport_lock); 762} 763EXPORT_SYMBOL_GPL(xprt_disconnect_done); 764 765/** 766 * xprt_schedule_autoclose_locked - Try to schedule an autoclose RPC call 767 * @xprt: transport to disconnect 768 */ 769static void xprt_schedule_autoclose_locked(struct rpc_xprt *xprt) 770{ 771 if (test_and_set_bit(XPRT_CLOSE_WAIT, &xprt->state)) 772 return; 773 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 774 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 775 else if (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state)) 776 rpc_wake_up_queued_task_set_status(&xprt->pending, 777 xprt->snd_task, -ENOTCONN); 778} 779 780/** 781 * xprt_force_disconnect - force a transport to disconnect 782 * @xprt: transport to disconnect 783 * 784 */ 785void xprt_force_disconnect(struct rpc_xprt *xprt) 786{ 787 trace_xprt_disconnect_force(xprt); 788 789 /* Don't race with the test_bit() in xprt_clear_locked() */ 790 spin_lock(&xprt->transport_lock); 791 xprt_schedule_autoclose_locked(xprt); 792 spin_unlock(&xprt->transport_lock); 793} 794EXPORT_SYMBOL_GPL(xprt_force_disconnect); 795 796static unsigned int 797xprt_connect_cookie(struct rpc_xprt *xprt) 798{ 799 return READ_ONCE(xprt->connect_cookie); 800} 801 802static bool 803xprt_request_retransmit_after_disconnect(struct rpc_task *task) 804{ 805 struct rpc_rqst *req = task->tk_rqstp; 806 struct rpc_xprt *xprt = req->rq_xprt; 807 808 return req->rq_connect_cookie != xprt_connect_cookie(xprt) || 809 !xprt_connected(xprt); 810} 811 812/** 813 * xprt_conditional_disconnect - force a transport to disconnect 814 * @xprt: transport to disconnect 815 * @cookie: 'connection cookie' 816 * 817 * This attempts to break the connection if and only if 'cookie' matches 818 * the current transport 'connection cookie'. It ensures that we don't 819 * try to break the connection more than once when we need to retransmit 820 * a batch of RPC requests. 821 * 822 */ 823void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 824{ 825 /* Don't race with the test_bit() in xprt_clear_locked() */ 826 spin_lock(&xprt->transport_lock); 827 if (cookie != xprt->connect_cookie) 828 goto out; 829 if (test_bit(XPRT_CLOSING, &xprt->state)) 830 goto out; 831 xprt_schedule_autoclose_locked(xprt); 832out: 833 spin_unlock(&xprt->transport_lock); 834} 835 836static bool 837xprt_has_timer(const struct rpc_xprt *xprt) 838{ 839 return xprt->idle_timeout != 0; 840} 841 842static void 843xprt_schedule_autodisconnect(struct rpc_xprt *xprt) 844 __must_hold(&xprt->transport_lock) 845{ 846 xprt->last_used = jiffies; 847 if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) 848 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); 849} 850 851static void 852xprt_init_autodisconnect(struct timer_list *t) 853{ 854 struct rpc_xprt *xprt = from_timer(xprt, t, timer); 855 856 if (!RB_EMPTY_ROOT(&xprt->recv_queue)) 857 return; 858 /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ 859 xprt->last_used = jiffies; 860 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 861 return; 862 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 863} 864 865#if IS_ENABLED(CONFIG_FAIL_SUNRPC) 866static void xprt_inject_disconnect(struct rpc_xprt *xprt) 867{ 868 if (!fail_sunrpc.ignore_client_disconnect && 869 should_fail(&fail_sunrpc.attr, 1)) 870 xprt->ops->inject_disconnect(xprt); 871} 872#else 873static inline void xprt_inject_disconnect(struct rpc_xprt *xprt) 874{ 875} 876#endif 877 878bool xprt_lock_connect(struct rpc_xprt *xprt, 879 struct rpc_task *task, 880 void *cookie) 881{ 882 bool ret = false; 883 884 spin_lock(&xprt->transport_lock); 885 if (!test_bit(XPRT_LOCKED, &xprt->state)) 886 goto out; 887 if (xprt->snd_task != task) 888 goto out; 889 set_bit(XPRT_SND_IS_COOKIE, &xprt->state); 890 xprt->snd_task = cookie; 891 ret = true; 892out: 893 spin_unlock(&xprt->transport_lock); 894 return ret; 895} 896EXPORT_SYMBOL_GPL(xprt_lock_connect); 897 898void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) 899{ 900 spin_lock(&xprt->transport_lock); 901 if (xprt->snd_task != cookie) 902 goto out; 903 if (!test_bit(XPRT_LOCKED, &xprt->state)) 904 goto out; 905 xprt->snd_task =NULL; 906 clear_bit(XPRT_SND_IS_COOKIE, &xprt->state); 907 xprt->ops->release_xprt(xprt, NULL); 908 xprt_schedule_autodisconnect(xprt); 909out: 910 spin_unlock(&xprt->transport_lock); 911 wake_up_bit(&xprt->state, XPRT_LOCKED); 912} 913EXPORT_SYMBOL_GPL(xprt_unlock_connect); 914 915/** 916 * xprt_connect - schedule a transport connect operation 917 * @task: RPC task that is requesting the connect 918 * 919 */ 920void xprt_connect(struct rpc_task *task) 921{ 922 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 923 924 trace_xprt_connect(xprt); 925 926 if (!xprt_bound(xprt)) { 927 task->tk_status = -EAGAIN; 928 return; 929 } 930 if (!xprt_lock_write(xprt, task)) 931 return; 932 933 if (!xprt_connected(xprt) && !test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 934 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie; 935 rpc_sleep_on_timeout(&xprt->pending, task, NULL, 936 xprt_request_timeout(task->tk_rqstp)); 937 938 if (test_bit(XPRT_CLOSING, &xprt->state)) 939 return; 940 if (xprt_test_and_set_connecting(xprt)) 941 return; 942 /* Race breaker */ 943 if (!xprt_connected(xprt)) { 944 xprt->stat.connect_start = jiffies; 945 xprt->ops->connect(xprt, task); 946 } else { 947 xprt_clear_connecting(xprt); 948 task->tk_status = 0; 949 rpc_wake_up_queued_task(&xprt->pending, task); 950 } 951 } 952 xprt_release_write(xprt, task); 953} 954 955/** 956 * xprt_reconnect_delay - compute the wait before scheduling a connect 957 * @xprt: transport instance 958 * 959 */ 960unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt) 961{ 962 unsigned long start, now = jiffies; 963 964 start = xprt->stat.connect_start + xprt->reestablish_timeout; 965 if (time_after(start, now)) 966 return start - now; 967 return 0; 968} 969EXPORT_SYMBOL_GPL(xprt_reconnect_delay); 970 971/** 972 * xprt_reconnect_backoff - compute the new re-establish timeout 973 * @xprt: transport instance 974 * @init_to: initial reestablish timeout 975 * 976 */ 977void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to) 978{ 979 xprt->reestablish_timeout <<= 1; 980 if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) 981 xprt->reestablish_timeout = xprt->max_reconnect_timeout; 982 if (xprt->reestablish_timeout < init_to) 983 xprt->reestablish_timeout = init_to; 984} 985EXPORT_SYMBOL_GPL(xprt_reconnect_backoff); 986 987enum xprt_xid_rb_cmp { 988 XID_RB_EQUAL, 989 XID_RB_LEFT, 990 XID_RB_RIGHT, 991}; 992static enum xprt_xid_rb_cmp 993xprt_xid_cmp(__be32 xid1, __be32 xid2) 994{ 995 if (xid1 == xid2) 996 return XID_RB_EQUAL; 997 if ((__force u32)xid1 < (__force u32)xid2) 998 return XID_RB_LEFT; 999 return XID_RB_RIGHT; 1000} 1001 1002static struct rpc_rqst * 1003xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid) 1004{ 1005 struct rb_node *n = xprt->recv_queue.rb_node; 1006 struct rpc_rqst *req; 1007 1008 while (n != NULL) { 1009 req = rb_entry(n, struct rpc_rqst, rq_recv); 1010 switch (xprt_xid_cmp(xid, req->rq_xid)) { 1011 case XID_RB_LEFT: 1012 n = n->rb_left; 1013 break; 1014 case XID_RB_RIGHT: 1015 n = n->rb_right; 1016 break; 1017 case XID_RB_EQUAL: 1018 return req; 1019 } 1020 } 1021 return NULL; 1022} 1023 1024static void 1025xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new) 1026{ 1027 struct rb_node **p = &xprt->recv_queue.rb_node; 1028 struct rb_node *n = NULL; 1029 struct rpc_rqst *req; 1030 1031 while (*p != NULL) { 1032 n = *p; 1033 req = rb_entry(n, struct rpc_rqst, rq_recv); 1034 switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) { 1035 case XID_RB_LEFT: 1036 p = &n->rb_left; 1037 break; 1038 case XID_RB_RIGHT: 1039 p = &n->rb_right; 1040 break; 1041 case XID_RB_EQUAL: 1042 WARN_ON_ONCE(new != req); 1043 return; 1044 } 1045 } 1046 rb_link_node(&new->rq_recv, n, p); 1047 rb_insert_color(&new->rq_recv, &xprt->recv_queue); 1048} 1049 1050static void 1051xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) 1052{ 1053 rb_erase(&req->rq_recv, &xprt->recv_queue); 1054} 1055 1056/** 1057 * xprt_lookup_rqst - find an RPC request corresponding to an XID 1058 * @xprt: transport on which the original request was transmitted 1059 * @xid: RPC XID of incoming reply 1060 * 1061 * Caller holds xprt->queue_lock. 1062 */ 1063struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 1064{ 1065 struct rpc_rqst *entry; 1066 1067 entry = xprt_request_rb_find(xprt, xid); 1068 if (entry != NULL) { 1069 trace_xprt_lookup_rqst(xprt, xid, 0); 1070 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); 1071 return entry; 1072 } 1073 1074 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 1075 ntohl(xid)); 1076 trace_xprt_lookup_rqst(xprt, xid, -ENOENT); 1077 xprt->stat.bad_xids++; 1078 return NULL; 1079} 1080EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 1081 1082static bool 1083xprt_is_pinned_rqst(struct rpc_rqst *req) 1084{ 1085 return atomic_read(&req->rq_pin) != 0; 1086} 1087 1088/** 1089 * xprt_pin_rqst - Pin a request on the transport receive list 1090 * @req: Request to pin 1091 * 1092 * Caller must ensure this is atomic with the call to xprt_lookup_rqst() 1093 * so should be holding xprt->queue_lock. 1094 */ 1095void xprt_pin_rqst(struct rpc_rqst *req) 1096{ 1097 atomic_inc(&req->rq_pin); 1098} 1099EXPORT_SYMBOL_GPL(xprt_pin_rqst); 1100 1101/** 1102 * xprt_unpin_rqst - Unpin a request on the transport receive list 1103 * @req: Request to pin 1104 * 1105 * Caller should be holding xprt->queue_lock. 1106 */ 1107void xprt_unpin_rqst(struct rpc_rqst *req) 1108{ 1109 if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { 1110 atomic_dec(&req->rq_pin); 1111 return; 1112 } 1113 if (atomic_dec_and_test(&req->rq_pin)) 1114 wake_up_var(&req->rq_pin); 1115} 1116EXPORT_SYMBOL_GPL(xprt_unpin_rqst); 1117 1118static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) 1119{ 1120 wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); 1121} 1122 1123static bool 1124xprt_request_data_received(struct rpc_task *task) 1125{ 1126 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1127 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; 1128} 1129 1130static bool 1131xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) 1132{ 1133 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1134 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; 1135} 1136 1137/** 1138 * xprt_request_enqueue_receive - Add an request to the receive queue 1139 * @task: RPC task 1140 * 1141 */ 1142int 1143xprt_request_enqueue_receive(struct rpc_task *task) 1144{ 1145 struct rpc_rqst *req = task->tk_rqstp; 1146 struct rpc_xprt *xprt = req->rq_xprt; 1147 int ret; 1148 1149 if (!xprt_request_need_enqueue_receive(task, req)) 1150 return 0; 1151 1152 ret = xprt_request_prepare(task->tk_rqstp); 1153 if (ret) 1154 return ret; 1155 spin_lock(&xprt->queue_lock); 1156 1157 /* Update the softirq receive buffer */ 1158 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1159 sizeof(req->rq_private_buf)); 1160 1161 /* Add request to the receive list */ 1162 xprt_request_rb_insert(xprt, req); 1163 set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); 1164 spin_unlock(&xprt->queue_lock); 1165 1166 /* Turn off autodisconnect */ 1167 del_singleshot_timer_sync(&xprt->timer); 1168 return 0; 1169} 1170 1171/** 1172 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue 1173 * @task: RPC task 1174 * 1175 * Caller must hold xprt->queue_lock. 1176 */ 1177static void 1178xprt_request_dequeue_receive_locked(struct rpc_task *task) 1179{ 1180 struct rpc_rqst *req = task->tk_rqstp; 1181 1182 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1183 xprt_request_rb_remove(req->rq_xprt, req); 1184} 1185 1186/** 1187 * xprt_update_rtt - Update RPC RTT statistics 1188 * @task: RPC request that recently completed 1189 * 1190 * Caller holds xprt->queue_lock. 1191 */ 1192void xprt_update_rtt(struct rpc_task *task) 1193{ 1194 struct rpc_rqst *req = task->tk_rqstp; 1195 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 1196 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 1197 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 1198 1199 if (timer) { 1200 if (req->rq_ntrans == 1) 1201 rpc_update_rtt(rtt, timer, m); 1202 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 1203 } 1204} 1205EXPORT_SYMBOL_GPL(xprt_update_rtt); 1206 1207/** 1208 * xprt_complete_rqst - called when reply processing is complete 1209 * @task: RPC request that recently completed 1210 * @copied: actual number of bytes received from the transport 1211 * 1212 * Caller holds xprt->queue_lock. 1213 */ 1214void xprt_complete_rqst(struct rpc_task *task, int copied) 1215{ 1216 struct rpc_rqst *req = task->tk_rqstp; 1217 struct rpc_xprt *xprt = req->rq_xprt; 1218 1219 xprt->stat.recvs++; 1220 1221 req->rq_private_buf.len = copied; 1222 /* Ensure all writes are done before we update */ 1223 /* req->rq_reply_bytes_recvd */ 1224 smp_wmb(); 1225 req->rq_reply_bytes_recvd = copied; 1226 xprt_request_dequeue_receive_locked(task); 1227 rpc_wake_up_queued_task(&xprt->pending, task); 1228} 1229EXPORT_SYMBOL_GPL(xprt_complete_rqst); 1230 1231static void xprt_timer(struct rpc_task *task) 1232{ 1233 struct rpc_rqst *req = task->tk_rqstp; 1234 struct rpc_xprt *xprt = req->rq_xprt; 1235 1236 if (task->tk_status != -ETIMEDOUT) 1237 return; 1238 1239 trace_xprt_timer(xprt, req->rq_xid, task->tk_status); 1240 if (!req->rq_reply_bytes_recvd) { 1241 if (xprt->ops->timer) 1242 xprt->ops->timer(xprt, task); 1243 } else 1244 task->tk_status = 0; 1245} 1246 1247/** 1248 * xprt_wait_for_reply_request_def - wait for reply 1249 * @task: pointer to rpc_task 1250 * 1251 * Set a request's retransmit timeout based on the transport's 1252 * default timeout parameters. Used by transports that don't adjust 1253 * the retransmit timeout based on round-trip time estimation, 1254 * and put the task to sleep on the pending queue. 1255 */ 1256void xprt_wait_for_reply_request_def(struct rpc_task *task) 1257{ 1258 struct rpc_rqst *req = task->tk_rqstp; 1259 1260 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1261 xprt_request_timeout(req)); 1262} 1263EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def); 1264 1265/** 1266 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator 1267 * @task: pointer to rpc_task 1268 * 1269 * Set a request's retransmit timeout using the RTT estimator, 1270 * and put the task to sleep on the pending queue. 1271 */ 1272void xprt_wait_for_reply_request_rtt(struct rpc_task *task) 1273{ 1274 int timer = task->tk_msg.rpc_proc->p_timer; 1275 struct rpc_clnt *clnt = task->tk_client; 1276 struct rpc_rtt *rtt = clnt->cl_rtt; 1277 struct rpc_rqst *req = task->tk_rqstp; 1278 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 1279 unsigned long timeout; 1280 1281 timeout = rpc_calc_rto(rtt, timer); 1282 timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 1283 if (timeout > max_timeout || timeout == 0) 1284 timeout = max_timeout; 1285 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1286 jiffies + timeout); 1287} 1288EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt); 1289 1290/** 1291 * xprt_request_wait_receive - wait for the reply to an RPC request 1292 * @task: RPC task about to send a request 1293 * 1294 */ 1295void xprt_request_wait_receive(struct rpc_task *task) 1296{ 1297 struct rpc_rqst *req = task->tk_rqstp; 1298 struct rpc_xprt *xprt = req->rq_xprt; 1299 1300 if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1301 return; 1302 /* 1303 * Sleep on the pending queue if we're expecting a reply. 1304 * The spinlock ensures atomicity between the test of 1305 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). 1306 */ 1307 spin_lock(&xprt->queue_lock); 1308 if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { 1309 xprt->ops->wait_for_reply_request(task); 1310 /* 1311 * Send an extra queue wakeup call if the 1312 * connection was dropped in case the call to 1313 * rpc_sleep_on() raced. 1314 */ 1315 if (xprt_request_retransmit_after_disconnect(task)) 1316 rpc_wake_up_queued_task_set_status(&xprt->pending, 1317 task, -ENOTCONN); 1318 } 1319 spin_unlock(&xprt->queue_lock); 1320} 1321 1322static bool 1323xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) 1324{ 1325 return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1326} 1327 1328/** 1329 * xprt_request_enqueue_transmit - queue a task for transmission 1330 * @task: pointer to rpc_task 1331 * 1332 * Add a task to the transmission queue. 1333 */ 1334void 1335xprt_request_enqueue_transmit(struct rpc_task *task) 1336{ 1337 struct rpc_rqst *pos, *req = task->tk_rqstp; 1338 struct rpc_xprt *xprt = req->rq_xprt; 1339 1340 if (xprt_request_need_enqueue_transmit(task, req)) { 1341 req->rq_bytes_sent = 0; 1342 spin_lock(&xprt->queue_lock); 1343 /* 1344 * Requests that carry congestion control credits are added 1345 * to the head of the list to avoid starvation issues. 1346 */ 1347 if (req->rq_cong) { 1348 xprt_clear_congestion_window_wait(xprt); 1349 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1350 if (pos->rq_cong) 1351 continue; 1352 /* Note: req is added _before_ pos */ 1353 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1354 INIT_LIST_HEAD(&req->rq_xmit2); 1355 goto out; 1356 } 1357 } else if (!req->rq_seqno) { 1358 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1359 if (pos->rq_task->tk_owner != task->tk_owner) 1360 continue; 1361 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); 1362 INIT_LIST_HEAD(&req->rq_xmit); 1363 goto out; 1364 } 1365 } 1366 list_add_tail(&req->rq_xmit, &xprt->xmit_queue); 1367 INIT_LIST_HEAD(&req->rq_xmit2); 1368out: 1369 atomic_long_inc(&xprt->xmit_queuelen); 1370 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1371 spin_unlock(&xprt->queue_lock); 1372 } 1373} 1374 1375/** 1376 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue 1377 * @task: pointer to rpc_task 1378 * 1379 * Remove a task from the transmission queue 1380 * Caller must hold xprt->queue_lock 1381 */ 1382static void 1383xprt_request_dequeue_transmit_locked(struct rpc_task *task) 1384{ 1385 struct rpc_rqst *req = task->tk_rqstp; 1386 1387 if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1388 return; 1389 if (!list_empty(&req->rq_xmit)) { 1390 list_del(&req->rq_xmit); 1391 if (!list_empty(&req->rq_xmit2)) { 1392 struct rpc_rqst *next = list_first_entry(&req->rq_xmit2, 1393 struct rpc_rqst, rq_xmit2); 1394 list_del(&req->rq_xmit2); 1395 list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue); 1396 } 1397 } else 1398 list_del(&req->rq_xmit2); 1399 atomic_long_dec(&req->rq_xprt->xmit_queuelen); 1400} 1401 1402/** 1403 * xprt_request_dequeue_transmit - remove a task from the transmission queue 1404 * @task: pointer to rpc_task 1405 * 1406 * Remove a task from the transmission queue 1407 */ 1408static void 1409xprt_request_dequeue_transmit(struct rpc_task *task) 1410{ 1411 struct rpc_rqst *req = task->tk_rqstp; 1412 struct rpc_xprt *xprt = req->rq_xprt; 1413 1414 spin_lock(&xprt->queue_lock); 1415 xprt_request_dequeue_transmit_locked(task); 1416 spin_unlock(&xprt->queue_lock); 1417} 1418 1419/** 1420 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue 1421 * @task: pointer to rpc_task 1422 * 1423 * Remove a task from the transmit and receive queues, and ensure that 1424 * it is not pinned by the receive work item. 1425 */ 1426void 1427xprt_request_dequeue_xprt(struct rpc_task *task) 1428{ 1429 struct rpc_rqst *req = task->tk_rqstp; 1430 struct rpc_xprt *xprt = req->rq_xprt; 1431 1432 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || 1433 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || 1434 xprt_is_pinned_rqst(req)) { 1435 spin_lock(&xprt->queue_lock); 1436 xprt_request_dequeue_transmit_locked(task); 1437 xprt_request_dequeue_receive_locked(task); 1438 while (xprt_is_pinned_rqst(req)) { 1439 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1440 spin_unlock(&xprt->queue_lock); 1441 xprt_wait_on_pinned_rqst(req); 1442 spin_lock(&xprt->queue_lock); 1443 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1444 } 1445 spin_unlock(&xprt->queue_lock); 1446 } 1447} 1448 1449/** 1450 * xprt_request_prepare - prepare an encoded request for transport 1451 * @req: pointer to rpc_rqst 1452 * 1453 * Calls into the transport layer to do whatever is needed to prepare 1454 * the request for transmission or receive. 1455 * Returns error, or zero. 1456 */ 1457static int 1458xprt_request_prepare(struct rpc_rqst *req) 1459{ 1460 struct rpc_xprt *xprt = req->rq_xprt; 1461 1462 if (xprt->ops->prepare_request) 1463 return xprt->ops->prepare_request(req); 1464 return 0; 1465} 1466 1467/** 1468 * xprt_request_need_retransmit - Test if a task needs retransmission 1469 * @task: pointer to rpc_task 1470 * 1471 * Test for whether a connection breakage requires the task to retransmit 1472 */ 1473bool 1474xprt_request_need_retransmit(struct rpc_task *task) 1475{ 1476 return xprt_request_retransmit_after_disconnect(task); 1477} 1478 1479/** 1480 * xprt_prepare_transmit - reserve the transport before sending a request 1481 * @task: RPC task about to send a request 1482 * 1483 */ 1484bool xprt_prepare_transmit(struct rpc_task *task) 1485{ 1486 struct rpc_rqst *req = task->tk_rqstp; 1487 struct rpc_xprt *xprt = req->rq_xprt; 1488 1489 if (!xprt_lock_write(xprt, task)) { 1490 /* Race breaker: someone may have transmitted us */ 1491 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1492 rpc_wake_up_queued_task_set_status(&xprt->sending, 1493 task, 0); 1494 return false; 1495 1496 } 1497 if (atomic_read(&xprt->swapper)) 1498 /* This will be clear in __rpc_execute */ 1499 current->flags |= PF_MEMALLOC; 1500 return true; 1501} 1502 1503void xprt_end_transmit(struct rpc_task *task) 1504{ 1505 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 1506 1507 xprt_inject_disconnect(xprt); 1508 xprt_release_write(xprt, task); 1509} 1510 1511/** 1512 * xprt_request_transmit - send an RPC request on a transport 1513 * @req: pointer to request to transmit 1514 * @snd_task: RPC task that owns the transport lock 1515 * 1516 * This performs the transmission of a single request. 1517 * Note that if the request is not the same as snd_task, then it 1518 * does need to be pinned. 1519 * Returns '0' on success. 1520 */ 1521static int 1522xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) 1523{ 1524 struct rpc_xprt *xprt = req->rq_xprt; 1525 struct rpc_task *task = req->rq_task; 1526 unsigned int connect_cookie; 1527 int is_retrans = RPC_WAS_SENT(task); 1528 int status; 1529 1530 if (!req->rq_bytes_sent) { 1531 if (xprt_request_data_received(task)) { 1532 status = 0; 1533 goto out_dequeue; 1534 } 1535 /* Verify that our message lies in the RPCSEC_GSS window */ 1536 if (rpcauth_xmit_need_reencode(task)) { 1537 status = -EBADMSG; 1538 goto out_dequeue; 1539 } 1540 if (RPC_SIGNALLED(task)) { 1541 status = -ERESTARTSYS; 1542 goto out_dequeue; 1543 } 1544 } 1545 1546 /* 1547 * Update req->rq_ntrans before transmitting to avoid races with 1548 * xprt_update_rtt(), which needs to know that it is recording a 1549 * reply to the first transmission. 1550 */ 1551 req->rq_ntrans++; 1552 1553 trace_rpc_xdr_sendto(task, &req->rq_snd_buf); 1554 connect_cookie = xprt->connect_cookie; 1555 status = xprt->ops->send_request(req); 1556 if (status != 0) { 1557 req->rq_ntrans--; 1558 trace_xprt_transmit(req, status); 1559 return status; 1560 } 1561 1562 if (is_retrans) { 1563 task->tk_client->cl_stats->rpcretrans++; 1564 trace_xprt_retransmit(req); 1565 } 1566 1567 xprt_inject_disconnect(xprt); 1568 1569 task->tk_flags |= RPC_TASK_SENT; 1570 spin_lock(&xprt->transport_lock); 1571 1572 xprt->stat.sends++; 1573 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 1574 xprt->stat.bklog_u += xprt->backlog.qlen; 1575 xprt->stat.sending_u += xprt->sending.qlen; 1576 xprt->stat.pending_u += xprt->pending.qlen; 1577 spin_unlock(&xprt->transport_lock); 1578 1579 req->rq_connect_cookie = connect_cookie; 1580out_dequeue: 1581 trace_xprt_transmit(req, status); 1582 xprt_request_dequeue_transmit(task); 1583 rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); 1584 return status; 1585} 1586 1587/** 1588 * xprt_transmit - send an RPC request on a transport 1589 * @task: controlling RPC task 1590 * 1591 * Attempts to drain the transmit queue. On exit, either the transport 1592 * signalled an error that needs to be handled before transmission can 1593 * resume, or @task finished transmitting, and detected that it already 1594 * received a reply. 1595 */ 1596void 1597xprt_transmit(struct rpc_task *task) 1598{ 1599 struct rpc_rqst *next, *req = task->tk_rqstp; 1600 struct rpc_xprt *xprt = req->rq_xprt; 1601 int status; 1602 1603 spin_lock(&xprt->queue_lock); 1604 for (;;) { 1605 next = list_first_entry_or_null(&xprt->xmit_queue, 1606 struct rpc_rqst, rq_xmit); 1607 if (!next) 1608 break; 1609 xprt_pin_rqst(next); 1610 spin_unlock(&xprt->queue_lock); 1611 status = xprt_request_transmit(next, task); 1612 if (status == -EBADMSG && next != req) 1613 status = 0; 1614 spin_lock(&xprt->queue_lock); 1615 xprt_unpin_rqst(next); 1616 if (status < 0) { 1617 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1618 task->tk_status = status; 1619 break; 1620 } 1621 /* Was @task transmitted, and has it received a reply? */ 1622 if (xprt_request_data_received(task) && 1623 !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1624 break; 1625 cond_resched_lock(&xprt->queue_lock); 1626 } 1627 spin_unlock(&xprt->queue_lock); 1628} 1629 1630static void xprt_complete_request_init(struct rpc_task *task) 1631{ 1632 if (task->tk_rqstp) 1633 xprt_request_init(task); 1634} 1635 1636void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1637{ 1638 set_bit(XPRT_CONGESTED, &xprt->state); 1639 rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init); 1640} 1641EXPORT_SYMBOL_GPL(xprt_add_backlog); 1642 1643static bool __xprt_set_rq(struct rpc_task *task, void *data) 1644{ 1645 struct rpc_rqst *req = data; 1646 1647 if (task->tk_rqstp == NULL) { 1648 memset(req, 0, sizeof(*req)); /* mark unused */ 1649 task->tk_rqstp = req; 1650 return true; 1651 } 1652 return false; 1653} 1654 1655bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req) 1656{ 1657 if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) { 1658 clear_bit(XPRT_CONGESTED, &xprt->state); 1659 return false; 1660 } 1661 return true; 1662} 1663EXPORT_SYMBOL_GPL(xprt_wake_up_backlog); 1664 1665static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1666{ 1667 bool ret = false; 1668 1669 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1670 goto out; 1671 spin_lock(&xprt->reserve_lock); 1672 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1673 xprt_add_backlog(xprt, task); 1674 ret = true; 1675 } 1676 spin_unlock(&xprt->reserve_lock); 1677out: 1678 return ret; 1679} 1680 1681static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1682{ 1683 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1684 1685 if (xprt->num_reqs >= xprt->max_reqs) 1686 goto out; 1687 ++xprt->num_reqs; 1688 spin_unlock(&xprt->reserve_lock); 1689 req = kzalloc(sizeof(*req), rpc_task_gfp_mask()); 1690 spin_lock(&xprt->reserve_lock); 1691 if (req != NULL) 1692 goto out; 1693 --xprt->num_reqs; 1694 req = ERR_PTR(-ENOMEM); 1695out: 1696 return req; 1697} 1698 1699static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1700{ 1701 if (xprt->num_reqs > xprt->min_reqs) { 1702 --xprt->num_reqs; 1703 kfree(req); 1704 return true; 1705 } 1706 return false; 1707} 1708 1709void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1710{ 1711 struct rpc_rqst *req; 1712 1713 spin_lock(&xprt->reserve_lock); 1714 if (!list_empty(&xprt->free)) { 1715 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1716 list_del(&req->rq_list); 1717 goto out_init_req; 1718 } 1719 req = xprt_dynamic_alloc_slot(xprt); 1720 if (!IS_ERR(req)) 1721 goto out_init_req; 1722 switch (PTR_ERR(req)) { 1723 case -ENOMEM: 1724 dprintk("RPC: dynamic allocation of request slot " 1725 "failed! Retrying\n"); 1726 task->tk_status = -ENOMEM; 1727 break; 1728 case -EAGAIN: 1729 xprt_add_backlog(xprt, task); 1730 dprintk("RPC: waiting for request slot\n"); 1731 fallthrough; 1732 default: 1733 task->tk_status = -EAGAIN; 1734 } 1735 spin_unlock(&xprt->reserve_lock); 1736 return; 1737out_init_req: 1738 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1739 xprt->num_reqs); 1740 spin_unlock(&xprt->reserve_lock); 1741 1742 task->tk_status = 0; 1743 task->tk_rqstp = req; 1744} 1745EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1746 1747void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1748{ 1749 spin_lock(&xprt->reserve_lock); 1750 if (!xprt_wake_up_backlog(xprt, req) && 1751 !xprt_dynamic_free_slot(xprt, req)) { 1752 memset(req, 0, sizeof(*req)); /* mark unused */ 1753 list_add(&req->rq_list, &xprt->free); 1754 } 1755 spin_unlock(&xprt->reserve_lock); 1756} 1757EXPORT_SYMBOL_GPL(xprt_free_slot); 1758 1759static void xprt_free_all_slots(struct rpc_xprt *xprt) 1760{ 1761 struct rpc_rqst *req; 1762 while (!list_empty(&xprt->free)) { 1763 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1764 list_del(&req->rq_list); 1765 kfree(req); 1766 } 1767} 1768 1769static DEFINE_IDA(rpc_xprt_ids); 1770 1771void xprt_cleanup_ids(void) 1772{ 1773 ida_destroy(&rpc_xprt_ids); 1774} 1775 1776static int xprt_alloc_id(struct rpc_xprt *xprt) 1777{ 1778 int id; 1779 1780 id = ida_simple_get(&rpc_xprt_ids, 0, 0, GFP_KERNEL); 1781 if (id < 0) 1782 return id; 1783 1784 xprt->id = id; 1785 return 0; 1786} 1787 1788static void xprt_free_id(struct rpc_xprt *xprt) 1789{ 1790 ida_simple_remove(&rpc_xprt_ids, xprt->id); 1791} 1792 1793struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1794 unsigned int num_prealloc, 1795 unsigned int max_alloc) 1796{ 1797 struct rpc_xprt *xprt; 1798 struct rpc_rqst *req; 1799 int i; 1800 1801 xprt = kzalloc(size, GFP_KERNEL); 1802 if (xprt == NULL) 1803 goto out; 1804 1805 xprt_alloc_id(xprt); 1806 xprt_init(xprt, net); 1807 1808 for (i = 0; i < num_prealloc; i++) { 1809 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1810 if (!req) 1811 goto out_free; 1812 list_add(&req->rq_list, &xprt->free); 1813 } 1814 if (max_alloc > num_prealloc) 1815 xprt->max_reqs = max_alloc; 1816 else 1817 xprt->max_reqs = num_prealloc; 1818 xprt->min_reqs = num_prealloc; 1819 xprt->num_reqs = num_prealloc; 1820 1821 return xprt; 1822 1823out_free: 1824 xprt_free(xprt); 1825out: 1826 return NULL; 1827} 1828EXPORT_SYMBOL_GPL(xprt_alloc); 1829 1830void xprt_free(struct rpc_xprt *xprt) 1831{ 1832 put_net_track(xprt->xprt_net, &xprt->ns_tracker); 1833 xprt_free_all_slots(xprt); 1834 xprt_free_id(xprt); 1835 rpc_sysfs_xprt_destroy(xprt); 1836 kfree_rcu(xprt, rcu); 1837} 1838EXPORT_SYMBOL_GPL(xprt_free); 1839 1840static void 1841xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1842{ 1843 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1844} 1845 1846static __be32 1847xprt_alloc_xid(struct rpc_xprt *xprt) 1848{ 1849 __be32 xid; 1850 1851 spin_lock(&xprt->reserve_lock); 1852 xid = (__force __be32)xprt->xid++; 1853 spin_unlock(&xprt->reserve_lock); 1854 return xid; 1855} 1856 1857static void 1858xprt_init_xid(struct rpc_xprt *xprt) 1859{ 1860 xprt->xid = prandom_u32(); 1861} 1862 1863static void 1864xprt_request_init(struct rpc_task *task) 1865{ 1866 struct rpc_xprt *xprt = task->tk_xprt; 1867 struct rpc_rqst *req = task->tk_rqstp; 1868 1869 req->rq_task = task; 1870 req->rq_xprt = xprt; 1871 req->rq_buffer = NULL; 1872 req->rq_xid = xprt_alloc_xid(xprt); 1873 xprt_init_connect_cookie(req, xprt); 1874 req->rq_snd_buf.len = 0; 1875 req->rq_snd_buf.buflen = 0; 1876 req->rq_rcv_buf.len = 0; 1877 req->rq_rcv_buf.buflen = 0; 1878 req->rq_snd_buf.bvec = NULL; 1879 req->rq_rcv_buf.bvec = NULL; 1880 req->rq_release_snd_buf = NULL; 1881 xprt_init_majortimeo(task, req); 1882 1883 trace_xprt_reserve(req); 1884} 1885 1886static void 1887xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) 1888{ 1889 xprt->ops->alloc_slot(xprt, task); 1890 if (task->tk_rqstp != NULL) 1891 xprt_request_init(task); 1892} 1893 1894/** 1895 * xprt_reserve - allocate an RPC request slot 1896 * @task: RPC task requesting a slot allocation 1897 * 1898 * If the transport is marked as being congested, or if no more 1899 * slots are available, place the task on the transport's 1900 * backlog queue. 1901 */ 1902void xprt_reserve(struct rpc_task *task) 1903{ 1904 struct rpc_xprt *xprt = task->tk_xprt; 1905 1906 task->tk_status = 0; 1907 if (task->tk_rqstp != NULL) 1908 return; 1909 1910 task->tk_status = -EAGAIN; 1911 if (!xprt_throttle_congested(xprt, task)) 1912 xprt_do_reserve(xprt, task); 1913} 1914 1915/** 1916 * xprt_retry_reserve - allocate an RPC request slot 1917 * @task: RPC task requesting a slot allocation 1918 * 1919 * If no more slots are available, place the task on the transport's 1920 * backlog queue. 1921 * Note that the only difference with xprt_reserve is that we now 1922 * ignore the value of the XPRT_CONGESTED flag. 1923 */ 1924void xprt_retry_reserve(struct rpc_task *task) 1925{ 1926 struct rpc_xprt *xprt = task->tk_xprt; 1927 1928 task->tk_status = 0; 1929 if (task->tk_rqstp != NULL) 1930 return; 1931 1932 task->tk_status = -EAGAIN; 1933 xprt_do_reserve(xprt, task); 1934} 1935 1936/** 1937 * xprt_release - release an RPC request slot 1938 * @task: task which is finished with the slot 1939 * 1940 */ 1941void xprt_release(struct rpc_task *task) 1942{ 1943 struct rpc_xprt *xprt; 1944 struct rpc_rqst *req = task->tk_rqstp; 1945 1946 if (req == NULL) { 1947 if (task->tk_client) { 1948 xprt = task->tk_xprt; 1949 xprt_release_write(xprt, task); 1950 } 1951 return; 1952 } 1953 1954 xprt = req->rq_xprt; 1955 xprt_request_dequeue_xprt(task); 1956 spin_lock(&xprt->transport_lock); 1957 xprt->ops->release_xprt(xprt, task); 1958 if (xprt->ops->release_request) 1959 xprt->ops->release_request(task); 1960 xprt_schedule_autodisconnect(xprt); 1961 spin_unlock(&xprt->transport_lock); 1962 if (req->rq_buffer) 1963 xprt->ops->buf_free(task); 1964 xdr_free_bvec(&req->rq_rcv_buf); 1965 xdr_free_bvec(&req->rq_snd_buf); 1966 if (req->rq_cred != NULL) 1967 put_rpccred(req->rq_cred); 1968 if (req->rq_release_snd_buf) 1969 req->rq_release_snd_buf(req); 1970 1971 task->tk_rqstp = NULL; 1972 if (likely(!bc_prealloc(req))) 1973 xprt->ops->free_slot(xprt, req); 1974 else 1975 xprt_free_bc_request(req); 1976} 1977 1978#ifdef CONFIG_SUNRPC_BACKCHANNEL 1979void 1980xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) 1981{ 1982 struct xdr_buf *xbufp = &req->rq_snd_buf; 1983 1984 task->tk_rqstp = req; 1985 req->rq_task = task; 1986 xprt_init_connect_cookie(req, req->rq_xprt); 1987 /* 1988 * Set up the xdr_buf length. 1989 * This also indicates that the buffer is XDR encoded already. 1990 */ 1991 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + 1992 xbufp->tail[0].iov_len; 1993} 1994#endif 1995 1996static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1997{ 1998 kref_init(&xprt->kref); 1999 2000 spin_lock_init(&xprt->transport_lock); 2001 spin_lock_init(&xprt->reserve_lock); 2002 spin_lock_init(&xprt->queue_lock); 2003 2004 INIT_LIST_HEAD(&xprt->free); 2005 xprt->recv_queue = RB_ROOT; 2006 INIT_LIST_HEAD(&xprt->xmit_queue); 2007#if defined(CONFIG_SUNRPC_BACKCHANNEL) 2008 spin_lock_init(&xprt->bc_pa_lock); 2009 INIT_LIST_HEAD(&xprt->bc_pa_list); 2010#endif /* CONFIG_SUNRPC_BACKCHANNEL */ 2011 INIT_LIST_HEAD(&xprt->xprt_switch); 2012 2013 xprt->last_used = jiffies; 2014 xprt->cwnd = RPC_INITCWND; 2015 xprt->bind_index = 0; 2016 2017 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 2018 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 2019 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 2020 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 2021 2022 xprt_init_xid(xprt); 2023 2024 xprt->xprt_net = get_net_track(net, &xprt->ns_tracker, GFP_KERNEL); 2025} 2026 2027/** 2028 * xprt_create_transport - create an RPC transport 2029 * @args: rpc transport creation arguments 2030 * 2031 */ 2032struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 2033{ 2034 struct rpc_xprt *xprt; 2035 const struct xprt_class *t; 2036 2037 t = xprt_class_find_by_ident(args->ident); 2038 if (!t) { 2039 dprintk("RPC: transport (%d) not supported\n", args->ident); 2040 return ERR_PTR(-EIO); 2041 } 2042 2043 xprt = t->setup(args); 2044 xprt_class_release(t); 2045 2046 if (IS_ERR(xprt)) 2047 goto out; 2048 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 2049 xprt->idle_timeout = 0; 2050 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 2051 if (xprt_has_timer(xprt)) 2052 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0); 2053 else 2054 timer_setup(&xprt->timer, NULL, 0); 2055 2056 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 2057 xprt_destroy(xprt); 2058 return ERR_PTR(-EINVAL); 2059 } 2060 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 2061 if (xprt->servername == NULL) { 2062 xprt_destroy(xprt); 2063 return ERR_PTR(-ENOMEM); 2064 } 2065 2066 rpc_xprt_debugfs_register(xprt); 2067 2068 trace_xprt_create(xprt); 2069out: 2070 return xprt; 2071} 2072 2073static void xprt_destroy_cb(struct work_struct *work) 2074{ 2075 struct rpc_xprt *xprt = 2076 container_of(work, struct rpc_xprt, task_cleanup); 2077 2078 trace_xprt_destroy(xprt); 2079 2080 rpc_xprt_debugfs_unregister(xprt); 2081 rpc_destroy_wait_queue(&xprt->binding); 2082 rpc_destroy_wait_queue(&xprt->pending); 2083 rpc_destroy_wait_queue(&xprt->sending); 2084 rpc_destroy_wait_queue(&xprt->backlog); 2085 kfree(xprt->servername); 2086 /* 2087 * Destroy any existing back channel 2088 */ 2089 xprt_destroy_backchannel(xprt, UINT_MAX); 2090 2091 /* 2092 * Tear down transport state and free the rpc_xprt 2093 */ 2094 xprt->ops->destroy(xprt); 2095} 2096 2097/** 2098 * xprt_destroy - destroy an RPC transport, killing off all requests. 2099 * @xprt: transport to destroy 2100 * 2101 */ 2102static void xprt_destroy(struct rpc_xprt *xprt) 2103{ 2104 /* 2105 * Exclude transport connect/disconnect handlers and autoclose 2106 */ 2107 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE); 2108 2109 /* 2110 * xprt_schedule_autodisconnect() can run after XPRT_LOCKED 2111 * is cleared. We use ->transport_lock to ensure the mod_timer() 2112 * can only run *before* del_time_sync(), never after. 2113 */ 2114 spin_lock(&xprt->transport_lock); 2115 del_timer_sync(&xprt->timer); 2116 spin_unlock(&xprt->transport_lock); 2117 2118 /* 2119 * Destroy sockets etc from the system workqueue so they can 2120 * safely flush receive work running on rpciod. 2121 */ 2122 INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb); 2123 schedule_work(&xprt->task_cleanup); 2124} 2125 2126static void xprt_destroy_kref(struct kref *kref) 2127{ 2128 xprt_destroy(container_of(kref, struct rpc_xprt, kref)); 2129} 2130 2131/** 2132 * xprt_get - return a reference to an RPC transport. 2133 * @xprt: pointer to the transport 2134 * 2135 */ 2136struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 2137{ 2138 if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) 2139 return xprt; 2140 return NULL; 2141} 2142EXPORT_SYMBOL_GPL(xprt_get); 2143 2144/** 2145 * xprt_put - release a reference to an RPC transport. 2146 * @xprt: pointer to the transport 2147 * 2148 */ 2149void xprt_put(struct rpc_xprt *xprt) 2150{ 2151 if (xprt != NULL) 2152 kref_put(&xprt->kref, xprt_destroy_kref); 2153} 2154EXPORT_SYMBOL_GPL(xprt_put);