cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

svc_rdma_transport.c (19456B)


      1// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
      2/*
      3 * Copyright (c) 2015-2018 Oracle. All rights reserved.
      4 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
      5 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
      6 *
      7 * This software is available to you under a choice of one of two
      8 * licenses.  You may choose to be licensed under the terms of the GNU
      9 * General Public License (GPL) Version 2, available from the file
     10 * COPYING in the main directory of this source tree, or the BSD-type
     11 * license below:
     12 *
     13 * Redistribution and use in source and binary forms, with or without
     14 * modification, are permitted provided that the following conditions
     15 * are met:
     16 *
     17 *      Redistributions of source code must retain the above copyright
     18 *      notice, this list of conditions and the following disclaimer.
     19 *
     20 *      Redistributions in binary form must reproduce the above
     21 *      copyright notice, this list of conditions and the following
     22 *      disclaimer in the documentation and/or other materials provided
     23 *      with the distribution.
     24 *
     25 *      Neither the name of the Network Appliance, Inc. nor the names of
     26 *      its contributors may be used to endorse or promote products
     27 *      derived from this software without specific prior written
     28 *      permission.
     29 *
     30 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     31 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     32 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     33 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     34 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     35 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     36 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     37 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     38 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     39 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     40 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     41 *
     42 * Author: Tom Tucker <tom@opengridcomputing.com>
     43 */
     44
     45#include <linux/interrupt.h>
     46#include <linux/sched.h>
     47#include <linux/slab.h>
     48#include <linux/spinlock.h>
     49#include <linux/workqueue.h>
     50#include <linux/export.h>
     51
     52#include <rdma/ib_verbs.h>
     53#include <rdma/rdma_cm.h>
     54#include <rdma/rw.h>
     55
     56#include <linux/sunrpc/addr.h>
     57#include <linux/sunrpc/debug.h>
     58#include <linux/sunrpc/svc_xprt.h>
     59#include <linux/sunrpc/svc_rdma.h>
     60
     61#include "xprt_rdma.h"
     62#include <trace/events/rpcrdma.h>
     63
     64#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
     65
     66static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
     67						 struct net *net);
     68static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
     69					struct net *net,
     70					struct sockaddr *sa, int salen,
     71					int flags);
     72static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
     73static void svc_rdma_detach(struct svc_xprt *xprt);
     74static void svc_rdma_free(struct svc_xprt *xprt);
     75static int svc_rdma_has_wspace(struct svc_xprt *xprt);
     76static void svc_rdma_secure_port(struct svc_rqst *);
     77static void svc_rdma_kill_temp_xprt(struct svc_xprt *);
     78
     79static const struct svc_xprt_ops svc_rdma_ops = {
     80	.xpo_create = svc_rdma_create,
     81	.xpo_recvfrom = svc_rdma_recvfrom,
     82	.xpo_sendto = svc_rdma_sendto,
     83	.xpo_result_payload = svc_rdma_result_payload,
     84	.xpo_release_rqst = svc_rdma_release_rqst,
     85	.xpo_detach = svc_rdma_detach,
     86	.xpo_free = svc_rdma_free,
     87	.xpo_has_wspace = svc_rdma_has_wspace,
     88	.xpo_accept = svc_rdma_accept,
     89	.xpo_secure_port = svc_rdma_secure_port,
     90	.xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt,
     91};
     92
     93struct svc_xprt_class svc_rdma_class = {
     94	.xcl_name = "rdma",
     95	.xcl_owner = THIS_MODULE,
     96	.xcl_ops = &svc_rdma_ops,
     97	.xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
     98	.xcl_ident = XPRT_TRANSPORT_RDMA,
     99};
    100
    101/* QP event handler */
    102static void qp_event_handler(struct ib_event *event, void *context)
    103{
    104	struct svc_xprt *xprt = context;
    105
    106	trace_svcrdma_qp_error(event, (struct sockaddr *)&xprt->xpt_remote);
    107	switch (event->event) {
    108	/* These are considered benign events */
    109	case IB_EVENT_PATH_MIG:
    110	case IB_EVENT_COMM_EST:
    111	case IB_EVENT_SQ_DRAINED:
    112	case IB_EVENT_QP_LAST_WQE_REACHED:
    113		break;
    114
    115	/* These are considered fatal events */
    116	case IB_EVENT_PATH_MIG_ERR:
    117	case IB_EVENT_QP_FATAL:
    118	case IB_EVENT_QP_REQ_ERR:
    119	case IB_EVENT_QP_ACCESS_ERR:
    120	case IB_EVENT_DEVICE_FATAL:
    121	default:
    122		svc_xprt_deferred_close(xprt);
    123		break;
    124	}
    125}
    126
    127static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
    128						 struct net *net)
    129{
    130	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
    131
    132	if (!cma_xprt) {
    133		dprintk("svcrdma: failed to create new transport\n");
    134		return NULL;
    135	}
    136	svc_xprt_init(net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
    137	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
    138	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
    139	init_llist_head(&cma_xprt->sc_send_ctxts);
    140	init_llist_head(&cma_xprt->sc_recv_ctxts);
    141	init_llist_head(&cma_xprt->sc_rw_ctxts);
    142	init_waitqueue_head(&cma_xprt->sc_send_wait);
    143
    144	spin_lock_init(&cma_xprt->sc_lock);
    145	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
    146	spin_lock_init(&cma_xprt->sc_send_lock);
    147	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
    148
    149	/*
    150	 * Note that this implies that the underlying transport support
    151	 * has some form of congestion control (see RFC 7530 section 3.1
    152	 * paragraph 2). For now, we assume that all supported RDMA
    153	 * transports are suitable here.
    154	 */
    155	set_bit(XPT_CONG_CTRL, &cma_xprt->sc_xprt.xpt_flags);
    156
    157	return cma_xprt;
    158}
    159
    160static void
    161svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
    162			       struct rdma_conn_param *param)
    163{
    164	const struct rpcrdma_connect_private *pmsg = param->private_data;
    165
    166	if (pmsg &&
    167	    pmsg->cp_magic == rpcrdma_cmp_magic &&
    168	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
    169		newxprt->sc_snd_w_inv = pmsg->cp_flags &
    170					RPCRDMA_CMP_F_SND_W_INV_OK;
    171
    172		dprintk("svcrdma: client send_size %u, recv_size %u "
    173			"remote inv %ssupported\n",
    174			rpcrdma_decode_buffer_size(pmsg->cp_send_size),
    175			rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
    176			newxprt->sc_snd_w_inv ? "" : "un");
    177	}
    178}
    179
    180/*
    181 * This function handles the CONNECT_REQUEST event on a listening
    182 * endpoint. It is passed the cma_id for the _new_ connection. The context in
    183 * this cma_id is inherited from the listening cma_id and is the svc_xprt
    184 * structure for the listening endpoint.
    185 *
    186 * This function creates a new xprt for the new connection and enqueues it on
    187 * the accept queue for the listent xprt. When the listen thread is kicked, it
    188 * will call the recvfrom method on the listen xprt which will accept the new
    189 * connection.
    190 */
    191static void handle_connect_req(struct rdma_cm_id *new_cma_id,
    192			       struct rdma_conn_param *param)
    193{
    194	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
    195	struct svcxprt_rdma *newxprt;
    196	struct sockaddr *sa;
    197
    198	/* Create a new transport */
    199	newxprt = svc_rdma_create_xprt(listen_xprt->sc_xprt.xpt_server,
    200				       listen_xprt->sc_xprt.xpt_net);
    201	if (!newxprt)
    202		return;
    203	newxprt->sc_cm_id = new_cma_id;
    204	new_cma_id->context = newxprt;
    205	svc_rdma_parse_connect_private(newxprt, param);
    206
    207	/* Save client advertised inbound read limit for use later in accept. */
    208	newxprt->sc_ord = param->initiator_depth;
    209
    210	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
    211	newxprt->sc_xprt.xpt_remotelen = svc_addr_len(sa);
    212	memcpy(&newxprt->sc_xprt.xpt_remote, sa,
    213	       newxprt->sc_xprt.xpt_remotelen);
    214	snprintf(newxprt->sc_xprt.xpt_remotebuf,
    215		 sizeof(newxprt->sc_xprt.xpt_remotebuf) - 1, "%pISc", sa);
    216
    217	/* The remote port is arbitrary and not under the control of the
    218	 * client ULP. Set it to a fixed value so that the DRC continues
    219	 * to be effective after a reconnect.
    220	 */
    221	rpc_set_port((struct sockaddr *)&newxprt->sc_xprt.xpt_remote, 0);
    222
    223	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
    224	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
    225
    226	/*
    227	 * Enqueue the new transport on the accept queue of the listening
    228	 * transport
    229	 */
    230	spin_lock(&listen_xprt->sc_lock);
    231	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
    232	spin_unlock(&listen_xprt->sc_lock);
    233
    234	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
    235	svc_xprt_enqueue(&listen_xprt->sc_xprt);
    236}
    237
    238/**
    239 * svc_rdma_listen_handler - Handle CM events generated on a listening endpoint
    240 * @cma_id: the server's listener rdma_cm_id
    241 * @event: details of the event
    242 *
    243 * Return values:
    244 *     %0: Do not destroy @cma_id
    245 *     %1: Destroy @cma_id (never returned here)
    246 *
    247 * NB: There is never a DEVICE_REMOVAL event for INADDR_ANY listeners.
    248 */
    249static int svc_rdma_listen_handler(struct rdma_cm_id *cma_id,
    250				   struct rdma_cm_event *event)
    251{
    252	switch (event->event) {
    253	case RDMA_CM_EVENT_CONNECT_REQUEST:
    254		handle_connect_req(cma_id, &event->param.conn);
    255		break;
    256	default:
    257		break;
    258	}
    259	return 0;
    260}
    261
    262/**
    263 * svc_rdma_cma_handler - Handle CM events on client connections
    264 * @cma_id: the server's listener rdma_cm_id
    265 * @event: details of the event
    266 *
    267 * Return values:
    268 *     %0: Do not destroy @cma_id
    269 *     %1: Destroy @cma_id (never returned here)
    270 */
    271static int svc_rdma_cma_handler(struct rdma_cm_id *cma_id,
    272				struct rdma_cm_event *event)
    273{
    274	struct svcxprt_rdma *rdma = cma_id->context;
    275	struct svc_xprt *xprt = &rdma->sc_xprt;
    276
    277	switch (event->event) {
    278	case RDMA_CM_EVENT_ESTABLISHED:
    279		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
    280
    281		/* Handle any requests that were received while
    282		 * CONN_PENDING was set. */
    283		svc_xprt_enqueue(xprt);
    284		break;
    285	case RDMA_CM_EVENT_DISCONNECTED:
    286	case RDMA_CM_EVENT_DEVICE_REMOVAL:
    287		svc_xprt_deferred_close(xprt);
    288		break;
    289	default:
    290		break;
    291	}
    292	return 0;
    293}
    294
    295/*
    296 * Create a listening RDMA service endpoint.
    297 */
    298static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
    299					struct net *net,
    300					struct sockaddr *sa, int salen,
    301					int flags)
    302{
    303	struct rdma_cm_id *listen_id;
    304	struct svcxprt_rdma *cma_xprt;
    305	int ret;
    306
    307	if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6)
    308		return ERR_PTR(-EAFNOSUPPORT);
    309	cma_xprt = svc_rdma_create_xprt(serv, net);
    310	if (!cma_xprt)
    311		return ERR_PTR(-ENOMEM);
    312	set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
    313	strcpy(cma_xprt->sc_xprt.xpt_remotebuf, "listener");
    314
    315	listen_id = rdma_create_id(net, svc_rdma_listen_handler, cma_xprt,
    316				   RDMA_PS_TCP, IB_QPT_RC);
    317	if (IS_ERR(listen_id)) {
    318		ret = PTR_ERR(listen_id);
    319		goto err0;
    320	}
    321
    322	/* Allow both IPv4 and IPv6 sockets to bind a single port
    323	 * at the same time.
    324	 */
    325#if IS_ENABLED(CONFIG_IPV6)
    326	ret = rdma_set_afonly(listen_id, 1);
    327	if (ret)
    328		goto err1;
    329#endif
    330	ret = rdma_bind_addr(listen_id, sa);
    331	if (ret)
    332		goto err1;
    333	cma_xprt->sc_cm_id = listen_id;
    334
    335	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
    336	if (ret)
    337		goto err1;
    338
    339	/*
    340	 * We need to use the address from the cm_id in case the
    341	 * caller specified 0 for the port number.
    342	 */
    343	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
    344	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
    345
    346	return &cma_xprt->sc_xprt;
    347
    348 err1:
    349	rdma_destroy_id(listen_id);
    350 err0:
    351	kfree(cma_xprt);
    352	return ERR_PTR(ret);
    353}
    354
    355/*
    356 * This is the xpo_recvfrom function for listening endpoints. Its
    357 * purpose is to accept incoming connections. The CMA callback handler
    358 * has already created a new transport and attached it to the new CMA
    359 * ID.
    360 *
    361 * There is a queue of pending connections hung on the listening
    362 * transport. This queue contains the new svc_xprt structure. This
    363 * function takes svc_xprt structures off the accept_q and completes
    364 * the connection.
    365 */
    366static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
    367{
    368	struct svcxprt_rdma *listen_rdma;
    369	struct svcxprt_rdma *newxprt = NULL;
    370	struct rdma_conn_param conn_param;
    371	struct rpcrdma_connect_private pmsg;
    372	struct ib_qp_init_attr qp_attr;
    373	unsigned int ctxts, rq_depth;
    374	struct ib_device *dev;
    375	int ret = 0;
    376	RPC_IFDEBUG(struct sockaddr *sap);
    377
    378	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
    379	clear_bit(XPT_CONN, &xprt->xpt_flags);
    380	/* Get the next entry off the accept list */
    381	spin_lock(&listen_rdma->sc_lock);
    382	if (!list_empty(&listen_rdma->sc_accept_q)) {
    383		newxprt = list_entry(listen_rdma->sc_accept_q.next,
    384				     struct svcxprt_rdma, sc_accept_q);
    385		list_del_init(&newxprt->sc_accept_q);
    386	}
    387	if (!list_empty(&listen_rdma->sc_accept_q))
    388		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
    389	spin_unlock(&listen_rdma->sc_lock);
    390	if (!newxprt)
    391		return NULL;
    392
    393	dev = newxprt->sc_cm_id->device;
    394	newxprt->sc_port_num = newxprt->sc_cm_id->port_num;
    395
    396	/* Qualify the transport resource defaults with the
    397	 * capabilities of this particular device */
    398	/* Transport header, head iovec, tail iovec */
    399	newxprt->sc_max_send_sges = 3;
    400	/* Add one SGE per page list entry */
    401	newxprt->sc_max_send_sges += (svcrdma_max_req_size / PAGE_SIZE) + 1;
    402	if (newxprt->sc_max_send_sges > dev->attrs.max_send_sge)
    403		newxprt->sc_max_send_sges = dev->attrs.max_send_sge;
    404	newxprt->sc_max_req_size = svcrdma_max_req_size;
    405	newxprt->sc_max_requests = svcrdma_max_requests;
    406	newxprt->sc_max_bc_requests = svcrdma_max_bc_requests;
    407	newxprt->sc_recv_batch = RPCRDMA_MAX_RECV_BATCH;
    408	rq_depth = newxprt->sc_max_requests + newxprt->sc_max_bc_requests +
    409		   newxprt->sc_recv_batch;
    410	if (rq_depth > dev->attrs.max_qp_wr) {
    411		pr_warn("svcrdma: reducing receive depth to %d\n",
    412			dev->attrs.max_qp_wr);
    413		rq_depth = dev->attrs.max_qp_wr;
    414		newxprt->sc_recv_batch = 1;
    415		newxprt->sc_max_requests = rq_depth - 2;
    416		newxprt->sc_max_bc_requests = 2;
    417	}
    418	newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests);
    419	ctxts = rdma_rw_mr_factor(dev, newxprt->sc_port_num, RPCSVC_MAXPAGES);
    420	ctxts *= newxprt->sc_max_requests;
    421	newxprt->sc_sq_depth = rq_depth + ctxts;
    422	if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr) {
    423		pr_warn("svcrdma: reducing send depth to %d\n",
    424			dev->attrs.max_qp_wr);
    425		newxprt->sc_sq_depth = dev->attrs.max_qp_wr;
    426	}
    427	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
    428
    429	newxprt->sc_pd = ib_alloc_pd(dev, 0);
    430	if (IS_ERR(newxprt->sc_pd)) {
    431		trace_svcrdma_pd_err(newxprt, PTR_ERR(newxprt->sc_pd));
    432		goto errout;
    433	}
    434	newxprt->sc_sq_cq = ib_alloc_cq_any(dev, newxprt, newxprt->sc_sq_depth,
    435					    IB_POLL_WORKQUEUE);
    436	if (IS_ERR(newxprt->sc_sq_cq))
    437		goto errout;
    438	newxprt->sc_rq_cq =
    439		ib_alloc_cq_any(dev, newxprt, rq_depth, IB_POLL_WORKQUEUE);
    440	if (IS_ERR(newxprt->sc_rq_cq))
    441		goto errout;
    442
    443	memset(&qp_attr, 0, sizeof qp_attr);
    444	qp_attr.event_handler = qp_event_handler;
    445	qp_attr.qp_context = &newxprt->sc_xprt;
    446	qp_attr.port_num = newxprt->sc_port_num;
    447	qp_attr.cap.max_rdma_ctxs = ctxts;
    448	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth - ctxts;
    449	qp_attr.cap.max_recv_wr = rq_depth;
    450	qp_attr.cap.max_send_sge = newxprt->sc_max_send_sges;
    451	qp_attr.cap.max_recv_sge = 1;
    452	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
    453	qp_attr.qp_type = IB_QPT_RC;
    454	qp_attr.send_cq = newxprt->sc_sq_cq;
    455	qp_attr.recv_cq = newxprt->sc_rq_cq;
    456	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n",
    457		newxprt->sc_cm_id, newxprt->sc_pd);
    458	dprintk("    cap.max_send_wr = %d, cap.max_recv_wr = %d\n",
    459		qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr);
    460	dprintk("    cap.max_send_sge = %d, cap.max_recv_sge = %d\n",
    461		qp_attr.cap.max_send_sge, qp_attr.cap.max_recv_sge);
    462
    463	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
    464	if (ret) {
    465		trace_svcrdma_qp_err(newxprt, ret);
    466		goto errout;
    467	}
    468	newxprt->sc_qp = newxprt->sc_cm_id->qp;
    469
    470	if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
    471		newxprt->sc_snd_w_inv = false;
    472	if (!rdma_protocol_iwarp(dev, newxprt->sc_port_num) &&
    473	    !rdma_ib_or_roce(dev, newxprt->sc_port_num)) {
    474		trace_svcrdma_fabric_err(newxprt, -EINVAL);
    475		goto errout;
    476	}
    477
    478	if (!svc_rdma_post_recvs(newxprt))
    479		goto errout;
    480
    481	/* Construct RDMA-CM private message */
    482	pmsg.cp_magic = rpcrdma_cmp_magic;
    483	pmsg.cp_version = RPCRDMA_CMP_VERSION;
    484	pmsg.cp_flags = 0;
    485	pmsg.cp_send_size = pmsg.cp_recv_size =
    486		rpcrdma_encode_buffer_size(newxprt->sc_max_req_size);
    487
    488	/* Accept Connection */
    489	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
    490	memset(&conn_param, 0, sizeof conn_param);
    491	conn_param.responder_resources = 0;
    492	conn_param.initiator_depth = min_t(int, newxprt->sc_ord,
    493					   dev->attrs.max_qp_init_rd_atom);
    494	if (!conn_param.initiator_depth) {
    495		ret = -EINVAL;
    496		trace_svcrdma_initdepth_err(newxprt, ret);
    497		goto errout;
    498	}
    499	conn_param.private_data = &pmsg;
    500	conn_param.private_data_len = sizeof(pmsg);
    501	rdma_lock_handler(newxprt->sc_cm_id);
    502	newxprt->sc_cm_id->event_handler = svc_rdma_cma_handler;
    503	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
    504	rdma_unlock_handler(newxprt->sc_cm_id);
    505	if (ret) {
    506		trace_svcrdma_accept_err(newxprt, ret);
    507		goto errout;
    508	}
    509
    510#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
    511	dprintk("svcrdma: new connection %p accepted:\n", newxprt);
    512	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
    513	dprintk("    local address   : %pIS:%u\n", sap, rpc_get_port(sap));
    514	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
    515	dprintk("    remote address  : %pIS:%u\n", sap, rpc_get_port(sap));
    516	dprintk("    max_sge         : %d\n", newxprt->sc_max_send_sges);
    517	dprintk("    sq_depth        : %d\n", newxprt->sc_sq_depth);
    518	dprintk("    rdma_rw_ctxs    : %d\n", ctxts);
    519	dprintk("    max_requests    : %d\n", newxprt->sc_max_requests);
    520	dprintk("    ord             : %d\n", conn_param.initiator_depth);
    521#endif
    522
    523	return &newxprt->sc_xprt;
    524
    525 errout:
    526	/* Take a reference in case the DTO handler runs */
    527	svc_xprt_get(&newxprt->sc_xprt);
    528	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
    529		ib_destroy_qp(newxprt->sc_qp);
    530	rdma_destroy_id(newxprt->sc_cm_id);
    531	/* This call to put will destroy the transport */
    532	svc_xprt_put(&newxprt->sc_xprt);
    533	return NULL;
    534}
    535
    536static void svc_rdma_detach(struct svc_xprt *xprt)
    537{
    538	struct svcxprt_rdma *rdma =
    539		container_of(xprt, struct svcxprt_rdma, sc_xprt);
    540
    541	rdma_disconnect(rdma->sc_cm_id);
    542}
    543
    544static void __svc_rdma_free(struct work_struct *work)
    545{
    546	struct svcxprt_rdma *rdma =
    547		container_of(work, struct svcxprt_rdma, sc_work);
    548
    549	/* This blocks until the Completion Queues are empty */
    550	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
    551		ib_drain_qp(rdma->sc_qp);
    552
    553	svc_rdma_flush_recv_queues(rdma);
    554
    555	svc_rdma_destroy_rw_ctxts(rdma);
    556	svc_rdma_send_ctxts_destroy(rdma);
    557	svc_rdma_recv_ctxts_destroy(rdma);
    558
    559	/* Destroy the QP if present (not a listener) */
    560	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
    561		ib_destroy_qp(rdma->sc_qp);
    562
    563	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
    564		ib_free_cq(rdma->sc_sq_cq);
    565
    566	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
    567		ib_free_cq(rdma->sc_rq_cq);
    568
    569	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
    570		ib_dealloc_pd(rdma->sc_pd);
    571
    572	/* Destroy the CM ID */
    573	rdma_destroy_id(rdma->sc_cm_id);
    574
    575	kfree(rdma);
    576}
    577
    578static void svc_rdma_free(struct svc_xprt *xprt)
    579{
    580	struct svcxprt_rdma *rdma =
    581		container_of(xprt, struct svcxprt_rdma, sc_xprt);
    582
    583	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
    584	schedule_work(&rdma->sc_work);
    585}
    586
    587static int svc_rdma_has_wspace(struct svc_xprt *xprt)
    588{
    589	struct svcxprt_rdma *rdma =
    590		container_of(xprt, struct svcxprt_rdma, sc_xprt);
    591
    592	/*
    593	 * If there are already waiters on the SQ,
    594	 * return false.
    595	 */
    596	if (waitqueue_active(&rdma->sc_send_wait))
    597		return 0;
    598
    599	/* Otherwise return true. */
    600	return 1;
    601}
    602
    603static void svc_rdma_secure_port(struct svc_rqst *rqstp)
    604{
    605	__set_bit(RQ_SECURE, &rqstp->rq_flags);
    606}
    607
    608static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
    609{
    610}