cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

svc_rdma_recvfrom.c (25026B)


      1// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
      2/*
      3 * Copyright (c) 2016-2018 Oracle. All rights reserved.
      4 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
      5 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
      6 *
      7 * This software is available to you under a choice of one of two
      8 * licenses.  You may choose to be licensed under the terms of the GNU
      9 * General Public License (GPL) Version 2, available from the file
     10 * COPYING in the main directory of this source tree, or the BSD-type
     11 * license below:
     12 *
     13 * Redistribution and use in source and binary forms, with or without
     14 * modification, are permitted provided that the following conditions
     15 * are met:
     16 *
     17 *      Redistributions of source code must retain the above copyright
     18 *      notice, this list of conditions and the following disclaimer.
     19 *
     20 *      Redistributions in binary form must reproduce the above
     21 *      copyright notice, this list of conditions and the following
     22 *      disclaimer in the documentation and/or other materials provided
     23 *      with the distribution.
     24 *
     25 *      Neither the name of the Network Appliance, Inc. nor the names of
     26 *      its contributors may be used to endorse or promote products
     27 *      derived from this software without specific prior written
     28 *      permission.
     29 *
     30 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     31 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     32 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     33 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     34 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     35 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     36 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     37 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     38 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     39 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     40 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     41 *
     42 * Author: Tom Tucker <tom@opengridcomputing.com>
     43 */
     44
     45/* Operation
     46 *
     47 * The main entry point is svc_rdma_recvfrom. This is called from
     48 * svc_recv when the transport indicates there is incoming data to
     49 * be read. "Data Ready" is signaled when an RDMA Receive completes,
     50 * or when a set of RDMA Reads complete.
     51 *
     52 * An svc_rqst is passed in. This structure contains an array of
     53 * free pages (rq_pages) that will contain the incoming RPC message.
     54 *
     55 * Short messages are moved directly into svc_rqst::rq_arg, and
     56 * the RPC Call is ready to be processed by the Upper Layer.
     57 * svc_rdma_recvfrom returns the length of the RPC Call message,
     58 * completing the reception of the RPC Call.
     59 *
     60 * However, when an incoming message has Read chunks,
     61 * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's
     62 * data payload from the client. svc_rdma_recvfrom sets up the
     63 * RDMA Reads using pages in svc_rqst::rq_pages, which are
     64 * transferred to an svc_rdma_recv_ctxt for the duration of the
     65 * I/O. svc_rdma_recvfrom then returns zero, since the RPC message
     66 * is not yet ready.
     67 *
     68 * When the Read chunk payloads have become available on the
     69 * server, "Data Ready" is raised again, and svc_recv calls
     70 * svc_rdma_recvfrom again. This second call may use a different
     71 * svc_rqst than the first one, thus any information that needs
     72 * to be preserved across these two calls is kept in an
     73 * svc_rdma_recv_ctxt.
     74 *
     75 * The second call to svc_rdma_recvfrom performs final assembly
     76 * of the RPC Call message, using the RDMA Read sink pages kept in
     77 * the svc_rdma_recv_ctxt. The xdr_buf is copied from the
     78 * svc_rdma_recv_ctxt to the second svc_rqst. The second call returns
     79 * the length of the completed RPC Call message.
     80 *
     81 * Page Management
     82 *
     83 * Pages under I/O must be transferred from the first svc_rqst to an
     84 * svc_rdma_recv_ctxt before the first svc_rdma_recvfrom call returns.
     85 *
     86 * The first svc_rqst supplies pages for RDMA Reads. These are moved
     87 * from rqstp::rq_pages into ctxt::pages. The consumed elements of
     88 * the rq_pages array are set to NULL and refilled before the first
     89 * svc_rdma_recvfrom call returns.
     90 *
     91 * During the second svc_rdma_recvfrom call, RDMA Read sink pages
     92 * are transferred from the svc_rdma_recv_ctxt to the second svc_rqst.
     93 */
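
/*
 * Illustrative sketch, not kernel code: a user-space model of the
 * two-call flow described above. struct sketch_ctxt and
 * sketch_recvfrom() are hypothetical stand-ins for svc_rdma_recv_ctxt
 * and svc_rdma_recvfrom(); the real code distinguishes the two calls
 * by which receive queue the ctxt was found on.
 */
#include <stdbool.h>
#include <stddef.h>

struct sketch_ctxt {
	bool reads_pending;	/* RDMA Reads posted, payload not yet here */
	size_t msg_len;		/* length of the assembled RPC Call */
};

static size_t sketch_recvfrom(struct sketch_ctxt *ctxt, bool has_read_chunks)
{
	if (ctxt->reads_pending) {
		/* Second call: Read payloads have arrived, finish assembly. */
		ctxt->reads_pending = false;
		return ctxt->msg_len;
	}
	if (has_read_chunks) {
		/* First call with Read chunks: post Reads, message not ready. */
		ctxt->reads_pending = true;
		return 0;
	}
	/* Short message: complete on the first call. */
	return ctxt->msg_len;
}
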
     94
     95#include <linux/slab.h>
     96#include <linux/spinlock.h>
     97#include <asm/unaligned.h>
     98#include <rdma/ib_verbs.h>
     99#include <rdma/rdma_cm.h>
    100
    101#include <linux/sunrpc/xdr.h>
    102#include <linux/sunrpc/debug.h>
    103#include <linux/sunrpc/rpc_rdma.h>
    104#include <linux/sunrpc/svc_rdma.h>
    105
    106#include "xprt_rdma.h"
    107#include <trace/events/rpcrdma.h>
    108
    109static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc);
    110
    111static inline struct svc_rdma_recv_ctxt *
    112svc_rdma_next_recv_ctxt(struct list_head *list)
    113{
    114	return list_first_entry_or_null(list, struct svc_rdma_recv_ctxt,
    115					rc_list);
    116}
    117
    118static void svc_rdma_recv_cid_init(struct svcxprt_rdma *rdma,
    119				   struct rpc_rdma_cid *cid)
    120{
    121	cid->ci_queue_id = rdma->sc_rq_cq->res.id;
    122	cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
    123}
    124
    125static struct svc_rdma_recv_ctxt *
    126svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma)
    127{
    128	struct svc_rdma_recv_ctxt *ctxt;
    129	dma_addr_t addr;
    130	void *buffer;
    131
    132	ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
    133	if (!ctxt)
    134		goto fail0;
    135	buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL);
    136	if (!buffer)
    137		goto fail1;
    138	addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
    139				 rdma->sc_max_req_size, DMA_FROM_DEVICE);
    140	if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
    141		goto fail2;
    142
    143	svc_rdma_recv_cid_init(rdma, &ctxt->rc_cid);
    144	pcl_init(&ctxt->rc_call_pcl);
    145	pcl_init(&ctxt->rc_read_pcl);
    146	pcl_init(&ctxt->rc_write_pcl);
    147	pcl_init(&ctxt->rc_reply_pcl);
    148
    149	ctxt->rc_recv_wr.next = NULL;
    150	ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe;
    151	ctxt->rc_recv_wr.sg_list = &ctxt->rc_recv_sge;
    152	ctxt->rc_recv_wr.num_sge = 1;
    153	ctxt->rc_cqe.done = svc_rdma_wc_receive;
    154	ctxt->rc_recv_sge.addr = addr;
    155	ctxt->rc_recv_sge.length = rdma->sc_max_req_size;
    156	ctxt->rc_recv_sge.lkey = rdma->sc_pd->local_dma_lkey;
    157	ctxt->rc_recv_buf = buffer;
    158	ctxt->rc_temp = false;
    159	return ctxt;
    160
    161fail2:
    162	kfree(buffer);
    163fail1:
    164	kfree(ctxt);
    165fail0:
    166	return NULL;
    167}
    168
    169static void svc_rdma_recv_ctxt_destroy(struct svcxprt_rdma *rdma,
    170				       struct svc_rdma_recv_ctxt *ctxt)
    171{
    172	ib_dma_unmap_single(rdma->sc_pd->device, ctxt->rc_recv_sge.addr,
    173			    ctxt->rc_recv_sge.length, DMA_FROM_DEVICE);
    174	kfree(ctxt->rc_recv_buf);
    175	kfree(ctxt);
    176}
    177
    178/**
    179 * svc_rdma_recv_ctxts_destroy - Release all recv_ctxt's for an xprt
    180 * @rdma: svcxprt_rdma being torn down
    181 *
    182 */
    183void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
    184{
    185	struct svc_rdma_recv_ctxt *ctxt;
    186	struct llist_node *node;
    187
    188	while ((node = llist_del_first(&rdma->sc_recv_ctxts))) {
    189		ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
    190		svc_rdma_recv_ctxt_destroy(rdma, ctxt);
    191	}
    192}
    193
    194/**
    195 * svc_rdma_recv_ctxt_get - Allocate a recv_ctxt
    196 * @rdma: controlling svcxprt_rdma
    197 *
    198 * Returns a recv_ctxt or (rarely) NULL if none are available.
    199 */
    200struct svc_rdma_recv_ctxt *svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
    201{
    202	struct svc_rdma_recv_ctxt *ctxt;
    203	struct llist_node *node;
    204
    205	node = llist_del_first(&rdma->sc_recv_ctxts);
    206	if (!node)
    207		goto out_empty;
    208	ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
    209
    210out:
    211	ctxt->rc_page_count = 0;
    212	return ctxt;
    213
    214out_empty:
    215	ctxt = svc_rdma_recv_ctxt_alloc(rdma);
    216	if (!ctxt)
    217		return NULL;
    218	goto out;
    219}
    220
    221/**
    222 * svc_rdma_recv_ctxt_put - Return recv_ctxt to free list
    223 * @rdma: controlling svcxprt_rdma
    224 * @ctxt: object to return to the free list
    225 *
    226 */
    227void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
    228			    struct svc_rdma_recv_ctxt *ctxt)
    229{
    230	pcl_free(&ctxt->rc_call_pcl);
    231	pcl_free(&ctxt->rc_read_pcl);
    232	pcl_free(&ctxt->rc_write_pcl);
    233	pcl_free(&ctxt->rc_reply_pcl);
    234
    235	if (!ctxt->rc_temp)
    236		llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
    237	else
    238		svc_rdma_recv_ctxt_destroy(rdma, ctxt);
    239}
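
/*
 * Illustrative sketch, not kernel code: the get/put pair above is an
 * object cache. This user-space analogue uses a plain singly linked
 * free list where the kernel uses a lock-free llist, and it omits the
 * rc_temp case in which a returned object is destroyed rather than
 * cached; cache_node, cache_get() and cache_put() are hypothetical.
 */
#include <stdlib.h>

struct cache_node {
	struct cache_node *next;
};

static struct cache_node *cache_free_list;

static struct cache_node *cache_get(void)
{
	struct cache_node *node = cache_free_list;

	if (node) {
		cache_free_list = node->next;	/* reuse a cached object */
		return node;
	}
	return calloc(1, sizeof(*node));	/* otherwise allocate fresh */
}

static void cache_put(struct cache_node *node)
{
	node->next = cache_free_list;		/* push back onto the cache */
	cache_free_list = node;
}
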
    240
    241/**
    242 * svc_rdma_release_rqst - Release transport-specific per-rqst resources
    243 * @rqstp: svc_rqst being released
    244 *
    245 * Ensure that the recv_ctxt is released whether or not a Reply
    246 * was sent. For example, the client could close the connection,
    247 * or svc_process could drop an RPC, before the Reply is sent.
    248 */
    249void svc_rdma_release_rqst(struct svc_rqst *rqstp)
    250{
    251	struct svc_rdma_recv_ctxt *ctxt = rqstp->rq_xprt_ctxt;
    252	struct svc_xprt *xprt = rqstp->rq_xprt;
    253	struct svcxprt_rdma *rdma =
    254		container_of(xprt, struct svcxprt_rdma, sc_xprt);
    255
    256	rqstp->rq_xprt_ctxt = NULL;
    257	if (ctxt)
    258		svc_rdma_recv_ctxt_put(rdma, ctxt);
    259}
    260
    261static bool svc_rdma_refresh_recvs(struct svcxprt_rdma *rdma,
    262				   unsigned int wanted, bool temp)
    263{
    264	const struct ib_recv_wr *bad_wr = NULL;
    265	struct svc_rdma_recv_ctxt *ctxt;
    266	struct ib_recv_wr *recv_chain;
    267	int ret;
    268
    269	if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
    270		return false;
    271
    272	recv_chain = NULL;
    273	while (wanted--) {
    274		ctxt = svc_rdma_recv_ctxt_get(rdma);
    275		if (!ctxt)
    276			break;
    277
    278		trace_svcrdma_post_recv(ctxt);
    279		ctxt->rc_temp = temp;
    280		ctxt->rc_recv_wr.next = recv_chain;
    281		recv_chain = &ctxt->rc_recv_wr;
    282		rdma->sc_pending_recvs++;
    283	}
    284	if (!recv_chain)
    285		return false;
    286
    287	ret = ib_post_recv(rdma->sc_qp, recv_chain, &bad_wr);
    288	if (ret)
    289		goto err_free;
    290	return true;
    291
    292err_free:
    293	trace_svcrdma_rq_post_err(rdma, ret);
    294	while (bad_wr) {
    295		ctxt = container_of(bad_wr, struct svc_rdma_recv_ctxt,
    296				    rc_recv_wr);
    297		bad_wr = bad_wr->next;
    298		svc_rdma_recv_ctxt_put(rdma, ctxt);
    299	}
    300	/* Since we're destroying the xprt, no need to reset
    301	 * sc_pending_recvs. */
    302	return false;
    303}
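
/*
 * Illustrative sketch, not kernel code: how svc_rdma_refresh_recvs()
 * strings Receive work requests together so they can be posted with a
 * single ib_post_recv() call. struct sketch_wr and chain_wrs() are
 * hypothetical stand-ins for ib_recv_wr and the loop above. On a
 * posting failure the provider points bad_wr at the first request it
 * could not post, so the error path releases that one and every
 * request chained after it.
 */
struct sketch_wr {
	struct sketch_wr *next;
};

static struct sketch_wr *chain_wrs(struct sketch_wr *wrs, unsigned int n)
{
	struct sketch_wr *chain = NULL;

	while (n--) {
		wrs[n].next = chain;	/* prepend: newest request heads the chain */
		chain = &wrs[n];
	}
	return chain;
}
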
    304
    305/**
    306 * svc_rdma_post_recvs - Post initial set of Recv WRs
    307 * @rdma: fresh svcxprt_rdma
    308 *
    309 * Returns true if successful, otherwise false.
    310 */
    311bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
    312{
    313	return svc_rdma_refresh_recvs(rdma, rdma->sc_max_requests, true);
    314}
    315
    316/**
    317 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
    318 * @cq: Completion Queue context
    319 * @wc: Work Completion object
    320 *
    321 */
    322static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
    323{
    324	struct svcxprt_rdma *rdma = cq->cq_context;
    325	struct ib_cqe *cqe = wc->wr_cqe;
    326	struct svc_rdma_recv_ctxt *ctxt;
    327
    328	rdma->sc_pending_recvs--;
    329
    330	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
    331	ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);
    332
    333	if (wc->status != IB_WC_SUCCESS)
    334		goto flushed;
    335	trace_svcrdma_wc_recv(wc, &ctxt->rc_cid);
    336
    337	/* If receive posting fails, the connection is about to be
    338	 * lost anyway. The server will not be able to send a reply
    339	 * for this RPC, and the client will retransmit this RPC
    340	 * anyway when it reconnects.
    341	 *
     342	 * Therefore we drop the Receive, even if status was SUCCESS,
    343	 * to reduce the likelihood of replayed requests once the
    344	 * client reconnects.
    345	 */
    346	if (rdma->sc_pending_recvs < rdma->sc_max_requests)
    347		if (!svc_rdma_refresh_recvs(rdma, rdma->sc_recv_batch, false))
    348			goto dropped;
    349
    350	/* All wc fields are now known to be valid */
    351	ctxt->rc_byte_len = wc->byte_len;
    352
    353	spin_lock(&rdma->sc_rq_dto_lock);
    354	list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q);
    355	/* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
    356	set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
    357	spin_unlock(&rdma->sc_rq_dto_lock);
    358	if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
    359		svc_xprt_enqueue(&rdma->sc_xprt);
    360	return;
    361
    362flushed:
    363	if (wc->status == IB_WC_WR_FLUSH_ERR)
    364		trace_svcrdma_wc_recv_flush(wc, &ctxt->rc_cid);
    365	else
    366		trace_svcrdma_wc_recv_err(wc, &ctxt->rc_cid);
    367dropped:
    368	svc_rdma_recv_ctxt_put(rdma, ctxt);
    369	svc_xprt_deferred_close(&rdma->sc_xprt);
    370}
    371
    372/**
    373 * svc_rdma_flush_recv_queues - Drain pending Receive work
    374 * @rdma: svcxprt_rdma being shut down
    375 *
    376 */
    377void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma)
    378{
    379	struct svc_rdma_recv_ctxt *ctxt;
    380
    381	while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) {
    382		list_del(&ctxt->rc_list);
    383		svc_rdma_recv_ctxt_put(rdma, ctxt);
    384	}
    385}
    386
    387static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
    388				   struct svc_rdma_recv_ctxt *ctxt)
    389{
    390	struct xdr_buf *arg = &rqstp->rq_arg;
    391
    392	arg->head[0].iov_base = ctxt->rc_recv_buf;
    393	arg->head[0].iov_len = ctxt->rc_byte_len;
    394	arg->tail[0].iov_base = NULL;
    395	arg->tail[0].iov_len = 0;
    396	arg->page_len = 0;
    397	arg->page_base = 0;
    398	arg->buflen = ctxt->rc_byte_len;
    399	arg->len = ctxt->rc_byte_len;
    400}
    401
    402/**
    403 * xdr_count_read_segments - Count number of Read segments in Read list
    404 * @rctxt: Ingress receive context
    405 * @p: Start of an un-decoded Read list
    406 *
    407 * Before allocating anything, ensure the ingress Read list is safe
    408 * to use.
    409 *
    410 * The segment count is limited to how many segments can fit in the
    411 * transport header without overflowing the buffer. That's about 40
    412 * Read segments for a 1KB inline threshold.
    413 *
    414 * Return values:
    415 *   %true: Read list is valid. @rctxt's xdr_stream is updated to point
    416 *	    to the first byte past the Read list. rc_read_pcl and
    417 *	    rc_call_pcl cl_count fields are set to the number of
    418 *	    Read segments in the list.
    419 *  %false: Read list is corrupt. @rctxt's xdr_stream is left in an
    420 *	    unknown state.
    421 */
    422static bool xdr_count_read_segments(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
    423{
    424	rctxt->rc_call_pcl.cl_count = 0;
    425	rctxt->rc_read_pcl.cl_count = 0;
    426	while (xdr_item_is_present(p)) {
    427		u32 position, handle, length;
    428		u64 offset;
    429
    430		p = xdr_inline_decode(&rctxt->rc_stream,
    431				      rpcrdma_readseg_maxsz * sizeof(*p));
    432		if (!p)
    433			return false;
    434
    435		xdr_decode_read_segment(p, &position, &handle,
    436					    &length, &offset);
    437		if (position) {
    438			if (position & 3)
    439				return false;
    440			++rctxt->rc_read_pcl.cl_count;
    441		} else {
    442			++rctxt->rc_call_pcl.cl_count;
    443		}
    444
    445		p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
    446		if (!p)
    447			return false;
    448	}
    449	return true;
    450}
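
/*
 * Illustrative sketch, not kernel code: the on-the-wire shape of a
 * one-entry Read list as walked by the loop above (see RFC 8166).
 * Each entry is a 32-bit "item present" discriminator followed by a
 * read segment (position, handle, length, 64-bit offset), then the
 * next discriminator; a zero word ends the list. A non-zero position
 * is the XDR offset where the chunk is spliced into the Call and must
 * be 4-byte aligned, which the (position & 3) check enforces. Values
 * below are arbitrary host integers, although XDR encodes each word
 * big-endian on the wire.
 */
#include <stdint.h>

static const uint32_t read_list_example[] = {
	1,		/* discriminator: an entry follows */
	0,		/* position 0: Position Zero Read chunk (whole Call) */
	0x1234,		/* handle (R_key) */
	4096,		/* length in bytes */
	0, 0,		/* 64-bit offset: high word, then low word */
	0,		/* discriminator: end of Read list */
};
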
    451
    452/* Sanity check the Read list.
    453 *
    454 * Sanity checks:
    455 * - Read list does not overflow Receive buffer.
    456 * - Chunk size limited by largest NFS data payload.
    457 *
    458 * Return values:
    459 *   %true: Read list is valid. @rctxt's xdr_stream is updated
    460 *	    to point to the first byte past the Read list.
    461 *  %false: Read list is corrupt. @rctxt's xdr_stream is left
    462 *	    in an unknown state.
    463 */
    464static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt)
    465{
    466	__be32 *p;
    467
    468	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
    469	if (!p)
    470		return false;
    471	if (!xdr_count_read_segments(rctxt, p))
    472		return false;
    473	if (!pcl_alloc_call(rctxt, p))
    474		return false;
    475	return pcl_alloc_read(rctxt, p);
    476}
    477
    478static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt)
    479{
    480	u32 segcount;
    481	__be32 *p;
    482
    483	if (xdr_stream_decode_u32(&rctxt->rc_stream, &segcount))
    484		return false;
    485
    486	/* A bogus segcount causes this buffer overflow check to fail. */
    487	p = xdr_inline_decode(&rctxt->rc_stream,
    488			      segcount * rpcrdma_segment_maxsz * sizeof(*p));
    489	return p != NULL;
    490}
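
/*
 * Illustrative sketch, not kernel code: the on-the-wire shape of one
 * Write chunk as skipped by xdr_check_write_chunk() (see RFC 8166).
 * A chunk is a segment count followed by that many segments, each a
 * handle, a length, and a 64-bit offset, which is why the overflow
 * check above decodes segcount * rpcrdma_segment_maxsz words at once.
 * Values below are arbitrary host integers.
 */
#include <stdint.h>

static const uint32_t write_chunk_example[] = {
	2,				/* segcount: two segments follow */
	0x1111, 8192, 0, 0x0000,	/* segment 1: handle, length, offset */
	0x2222, 8192, 0, 0x2000,	/* segment 2: handle, length, offset */
};
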
    491
    492/**
    493 * xdr_count_write_chunks - Count number of Write chunks in Write list
    494 * @rctxt: Received header and decoding state
    495 * @p: start of an un-decoded Write list
    496 *
    497 * Before allocating anything, ensure the ingress Write list is
    498 * safe to use.
    499 *
    500 * Return values:
    501 *       %true: Write list is valid. @rctxt's xdr_stream is updated
    502 *		to point to the first byte past the Write list, and
    503 *		the number of Write chunks is in rc_write_pcl.cl_count.
    504 *      %false: Write list is corrupt. @rctxt's xdr_stream is left
    505 *		in an indeterminate state.
    506 */
    507static bool xdr_count_write_chunks(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
    508{
    509	rctxt->rc_write_pcl.cl_count = 0;
    510	while (xdr_item_is_present(p)) {
    511		if (!xdr_check_write_chunk(rctxt))
    512			return false;
    513		++rctxt->rc_write_pcl.cl_count;
    514		p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
    515		if (!p)
    516			return false;
    517	}
    518	return true;
    519}
    520
    521/* Sanity check the Write list.
    522 *
    523 * Implementation limits:
    524 * - This implementation currently supports only one Write chunk.
    525 *
    526 * Sanity checks:
    527 * - Write list does not overflow Receive buffer.
    528 * - Chunk size limited by largest NFS data payload.
    529 *
    530 * Return values:
    531 *       %true: Write list is valid. @rctxt's xdr_stream is updated
    532 *		to point to the first byte past the Write list.
    533 *      %false: Write list is corrupt. @rctxt's xdr_stream is left
    534 *		in an unknown state.
    535 */
    536static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
    537{
    538	__be32 *p;
    539
    540	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
    541	if (!p)
    542		return false;
    543	if (!xdr_count_write_chunks(rctxt, p))
    544		return false;
    545	if (!pcl_alloc_write(rctxt, &rctxt->rc_write_pcl, p))
    546		return false;
    547
    548	rctxt->rc_cur_result_payload = pcl_first_chunk(&rctxt->rc_write_pcl);
    549	return true;
    550}
    551
    552/* Sanity check the Reply chunk.
    553 *
    554 * Sanity checks:
    555 * - Reply chunk does not overflow Receive buffer.
    556 * - Chunk size limited by largest NFS data payload.
    557 *
    558 * Return values:
    559 *       %true: Reply chunk is valid. @rctxt's xdr_stream is updated
    560 *		to point to the first byte past the Reply chunk.
    561 *      %false: Reply chunk is corrupt. @rctxt's xdr_stream is left
    562 *		in an unknown state.
    563 */
    564static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt)
    565{
    566	__be32 *p;
    567
    568	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
    569	if (!p)
    570		return false;
    571
    572	if (!xdr_item_is_present(p))
    573		return true;
    574	if (!xdr_check_write_chunk(rctxt))
    575		return false;
    576
    577	rctxt->rc_reply_pcl.cl_count = 1;
    578	return pcl_alloc_write(rctxt, &rctxt->rc_reply_pcl, p);
    579}
    580
    581/* RPC-over-RDMA Version One private extension: Remote Invalidation.
    582 * Responder's choice: requester signals it can handle Send With
    583 * Invalidate, and responder chooses one R_key to invalidate.
    584 *
    585 * If there is exactly one distinct R_key in the received transport
    586 * header, set rc_inv_rkey to that R_key. Otherwise, set it to zero.
    587 */
    588static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
    589				  struct svc_rdma_recv_ctxt *ctxt)
    590{
    591	struct svc_rdma_segment *segment;
    592	struct svc_rdma_chunk *chunk;
    593	u32 inv_rkey;
    594
    595	ctxt->rc_inv_rkey = 0;
    596
    597	if (!rdma->sc_snd_w_inv)
    598		return;
    599
    600	inv_rkey = 0;
    601	pcl_for_each_chunk(chunk, &ctxt->rc_call_pcl) {
    602		pcl_for_each_segment(segment, chunk) {
    603			if (inv_rkey == 0)
    604				inv_rkey = segment->rs_handle;
    605			else if (inv_rkey != segment->rs_handle)
    606				return;
    607		}
    608	}
    609	pcl_for_each_chunk(chunk, &ctxt->rc_read_pcl) {
    610		pcl_for_each_segment(segment, chunk) {
    611			if (inv_rkey == 0)
    612				inv_rkey = segment->rs_handle;
    613			else if (inv_rkey != segment->rs_handle)
    614				return;
    615		}
    616	}
    617	pcl_for_each_chunk(chunk, &ctxt->rc_write_pcl) {
    618		pcl_for_each_segment(segment, chunk) {
    619			if (inv_rkey == 0)
    620				inv_rkey = segment->rs_handle;
    621			else if (inv_rkey != segment->rs_handle)
    622				return;
    623		}
    624	}
    625	pcl_for_each_chunk(chunk, &ctxt->rc_reply_pcl) {
    626		pcl_for_each_segment(segment, chunk) {
    627			if (inv_rkey == 0)
    628				inv_rkey = segment->rs_handle;
    629			else if (inv_rkey != segment->rs_handle)
    630				return;
    631		}
    632	}
    633	ctxt->rc_inv_rkey = inv_rkey;
    634}
    635
    636/**
    637 * svc_rdma_xdr_decode_req - Decode the transport header
    638 * @rq_arg: xdr_buf containing ingress RPC/RDMA message
    639 * @rctxt: state of decoding
    640 *
    641 * On entry, xdr->head[0].iov_base points to first byte of the
    642 * RPC-over-RDMA transport header.
    643 *
    644 * On successful exit, head[0] points to first byte past the
    645 * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
    646 *
    647 * The length of the RPC-over-RDMA header is returned.
    648 *
    649 * Assumptions:
    650 * - The transport header is entirely contained in the head iovec.
    651 */
    652static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg,
    653				   struct svc_rdma_recv_ctxt *rctxt)
    654{
    655	__be32 *p, *rdma_argp;
    656	unsigned int hdr_len;
    657
    658	rdma_argp = rq_arg->head[0].iov_base;
    659	xdr_init_decode(&rctxt->rc_stream, rq_arg, rdma_argp, NULL);
    660
    661	p = xdr_inline_decode(&rctxt->rc_stream,
    662			      rpcrdma_fixed_maxsz * sizeof(*p));
    663	if (unlikely(!p))
    664		goto out_short;
    665	p++;
    666	if (*p != rpcrdma_version)
    667		goto out_version;
    668	p += 2;
    669	rctxt->rc_msgtype = *p;
    670	switch (rctxt->rc_msgtype) {
    671	case rdma_msg:
    672		break;
    673	case rdma_nomsg:
    674		break;
    675	case rdma_done:
    676		goto out_drop;
    677	case rdma_error:
    678		goto out_drop;
    679	default:
    680		goto out_proc;
    681	}
    682
    683	if (!xdr_check_read_list(rctxt))
    684		goto out_inval;
    685	if (!xdr_check_write_list(rctxt))
    686		goto out_inval;
    687	if (!xdr_check_reply_chunk(rctxt))
    688		goto out_inval;
    689
    690	rq_arg->head[0].iov_base = rctxt->rc_stream.p;
    691	hdr_len = xdr_stream_pos(&rctxt->rc_stream);
    692	rq_arg->head[0].iov_len -= hdr_len;
    693	rq_arg->len -= hdr_len;
    694	trace_svcrdma_decode_rqst(rctxt, rdma_argp, hdr_len);
    695	return hdr_len;
    696
    697out_short:
    698	trace_svcrdma_decode_short_err(rctxt, rq_arg->len);
    699	return -EINVAL;
    700
    701out_version:
    702	trace_svcrdma_decode_badvers_err(rctxt, rdma_argp);
    703	return -EPROTONOSUPPORT;
    704
    705out_drop:
    706	trace_svcrdma_decode_drop_err(rctxt, rdma_argp);
    707	return 0;
    708
    709out_proc:
    710	trace_svcrdma_decode_badproc_err(rctxt, rdma_argp);
    711	return -EINVAL;
    712
    713out_inval:
    714	trace_svcrdma_decode_parse_err(rctxt, rdma_argp);
    715	return -EINVAL;
    716}
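
/*
 * Illustrative sketch, not kernel code: the four fixed 32-bit fields
 * that svc_rdma_xdr_decode_req() steps across. After the inline
 * decode, p points at the XID; p++ lands on the version word, and
 * p += 2 skips the credit value to reach the message type. Values
 * below are arbitrary host integers.
 */
#include <stdint.h>

static const uint32_t rpcrdma_fixed_header_example[] = {
	0xdeadbeef,	/* rdma_xid: matches the XID of the embedded RPC */
	1,		/* rdma_vers: RPC-over-RDMA version one */
	32,		/* rdma_credit: credit value, not validated here */
	0,		/* rdma_proc: message type, e.g. rdma_msg */
};
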
    717
    718static void svc_rdma_send_error(struct svcxprt_rdma *rdma,
    719				struct svc_rdma_recv_ctxt *rctxt,
    720				int status)
    721{
    722	struct svc_rdma_send_ctxt *sctxt;
    723
    724	sctxt = svc_rdma_send_ctxt_get(rdma);
    725	if (!sctxt)
    726		return;
    727	svc_rdma_send_error_msg(rdma, sctxt, rctxt, status);
    728}
    729
    730/* By convention, backchannel calls arrive via rdma_msg type
    731 * messages, and never populate the chunk lists. This makes
    732 * the RPC/RDMA header small and fixed in size, so it is
    733 * straightforward to check the RPC header's direction field.
    734 */
    735static bool svc_rdma_is_reverse_direction_reply(struct svc_xprt *xprt,
    736						struct svc_rdma_recv_ctxt *rctxt)
    737{
    738	__be32 *p = rctxt->rc_recv_buf;
    739
    740	if (!xprt->xpt_bc_xprt)
    741		return false;
    742
    743	if (rctxt->rc_msgtype != rdma_msg)
    744		return false;
    745
    746	if (!pcl_is_empty(&rctxt->rc_call_pcl))
    747		return false;
    748	if (!pcl_is_empty(&rctxt->rc_read_pcl))
    749		return false;
    750	if (!pcl_is_empty(&rctxt->rc_write_pcl))
    751		return false;
    752	if (!pcl_is_empty(&rctxt->rc_reply_pcl))
    753		return false;
    754
    755	/* RPC call direction */
    756	if (*(p + 8) == cpu_to_be32(RPC_CALL))
    757		return false;
    758
    759	return true;
    760}
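
/*
 * Illustrative sketch, not kernel code: why the RPC direction field
 * sits at word 8 of the Receive buffer in the check above. With all
 * three chunk lists empty, the transport header occupies words 0-6,
 * so the embedded RPC header starts at word 7 and its direction
 * (RPC_CALL or RPC_REPLY) is word 8. The enumerator names below are
 * hypothetical.
 */
enum sketch_word_index {
	WORD_RDMA_XID		= 0,	/* words 0-3: fixed transport header */
	WORD_RDMA_VERS		= 1,
	WORD_RDMA_CREDIT	= 2,
	WORD_RDMA_PROC		= 3,
	WORD_READ_LIST		= 4,	/* words 4-6: empty chunk list markers */
	WORD_WRITE_LIST		= 5,
	WORD_REPLY_CHUNK	= 6,
	WORD_RPC_XID		= 7,	/* embedded RPC message begins here */
	WORD_RPC_DIRECTION	= 8,	/* RPC_CALL means not a backchannel reply */
};
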
    761
    762/**
    763 * svc_rdma_recvfrom - Receive an RPC call
    764 * @rqstp: request structure into which to receive an RPC Call
    765 *
    766 * Returns:
    767 *	The positive number of bytes in the RPC Call message,
    768 *	%0 if there were no Calls ready to return,
    769 *	%-EINVAL if the Read chunk data is too large,
    770 *	%-ENOMEM if rdma_rw context pool was exhausted,
    771 *	%-ENOTCONN if posting failed (connection is lost),
    772 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
    773 *
    774 * Called in a loop when XPT_DATA is set. XPT_DATA is cleared only
    775 * when there are no remaining ctxt's to process.
    776 *
    777 * The next ctxt is removed from the "receive" lists.
    778 *
    779 * - If the ctxt completes a Read, then finish assembling the Call
    780 *   message and return the number of bytes in the message.
    781 *
    782 * - If the ctxt completes a Receive, then construct the Call
    783 *   message from the contents of the Receive buffer.
    784 *
    785 *   - If there are no Read chunks in this message, then finish
    786 *     assembling the Call message and return the number of bytes
    787 *     in the message.
    788 *
    789 *   - If there are Read chunks in this message, post Read WRs to
    790 *     pull that payload and return 0.
    791 */
    792int svc_rdma_recvfrom(struct svc_rqst *rqstp)
    793{
    794	struct svc_xprt *xprt = rqstp->rq_xprt;
    795	struct svcxprt_rdma *rdma_xprt =
    796		container_of(xprt, struct svcxprt_rdma, sc_xprt);
    797	struct svc_rdma_recv_ctxt *ctxt;
    798	int ret;
    799
    800	rqstp->rq_xprt_ctxt = NULL;
    801
    802	ctxt = NULL;
    803	spin_lock(&rdma_xprt->sc_rq_dto_lock);
    804	ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q);
    805	if (ctxt)
    806		list_del(&ctxt->rc_list);
    807	else
    808		/* No new incoming requests, terminate the loop */
    809		clear_bit(XPT_DATA, &xprt->xpt_flags);
    810	spin_unlock(&rdma_xprt->sc_rq_dto_lock);
    811
    812	/* Unblock the transport for the next receive */
    813	svc_xprt_received(xprt);
    814	if (!ctxt)
    815		return 0;
    816
    817	percpu_counter_inc(&svcrdma_stat_recv);
    818	ib_dma_sync_single_for_cpu(rdma_xprt->sc_pd->device,
    819				   ctxt->rc_recv_sge.addr, ctxt->rc_byte_len,
    820				   DMA_FROM_DEVICE);
    821	svc_rdma_build_arg_xdr(rqstp, ctxt);
    822
    823	/* Prevent svc_xprt_release from releasing pages in rq_pages
    824	 * if we return 0 or an error.
    825	 */
    826	rqstp->rq_respages = rqstp->rq_pages;
    827	rqstp->rq_next_page = rqstp->rq_respages;
    828
    829	ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg, ctxt);
    830	if (ret < 0)
    831		goto out_err;
    832	if (ret == 0)
    833		goto out_drop;
    834
    835	if (svc_rdma_is_reverse_direction_reply(xprt, ctxt))
    836		goto out_backchannel;
    837
    838	svc_rdma_get_inv_rkey(rdma_xprt, ctxt);
    839
    840	if (!pcl_is_empty(&ctxt->rc_read_pcl) ||
    841	    !pcl_is_empty(&ctxt->rc_call_pcl)) {
    842		ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt);
    843		if (ret < 0)
    844			goto out_readfail;
    845	}
    846
    847	rqstp->rq_xprt_ctxt = ctxt;
    848	rqstp->rq_prot = IPPROTO_MAX;
    849	svc_xprt_copy_addrs(rqstp, xprt);
    850	return rqstp->rq_arg.len;
    851
    852out_err:
    853	svc_rdma_send_error(rdma_xprt, ctxt, ret);
    854	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
    855	return 0;
    856
    857out_readfail:
    858	if (ret == -EINVAL)
    859		svc_rdma_send_error(rdma_xprt, ctxt, ret);
    860	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
    861	return ret;
    862
    863out_backchannel:
    864	svc_rdma_handle_bc_reply(rqstp, ctxt);
    865out_drop:
    866	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
    867	return 0;
    868}