svc_rdma_pcl.c (7900B)
1// SPDX-License-Identifier: GPL-2.0 2/* 3 * Copyright (c) 2020 Oracle. All rights reserved. 4 */ 5 6#include <linux/sunrpc/svc_rdma.h> 7#include <linux/sunrpc/rpc_rdma.h> 8 9#include "xprt_rdma.h" 10#include <trace/events/rpcrdma.h> 11 12/** 13 * pcl_free - Release all memory associated with a parsed chunk list 14 * @pcl: parsed chunk list 15 * 16 */ 17void pcl_free(struct svc_rdma_pcl *pcl) 18{ 19 while (!list_empty(&pcl->cl_chunks)) { 20 struct svc_rdma_chunk *chunk; 21 22 chunk = pcl_first_chunk(pcl); 23 list_del(&chunk->ch_list); 24 kfree(chunk); 25 } 26} 27 28static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position) 29{ 30 struct svc_rdma_chunk *chunk; 31 32 chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL); 33 if (!chunk) 34 return NULL; 35 36 chunk->ch_position = position; 37 chunk->ch_length = 0; 38 chunk->ch_payload_length = 0; 39 chunk->ch_segcount = 0; 40 return chunk; 41} 42 43static struct svc_rdma_chunk * 44pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position) 45{ 46 struct svc_rdma_chunk *pos; 47 48 pcl_for_each_chunk(pos, pcl) { 49 if (pos->ch_position == position) 50 return pos; 51 } 52 return NULL; 53} 54 55static void pcl_insert_position(struct svc_rdma_pcl *pcl, 56 struct svc_rdma_chunk *chunk) 57{ 58 struct svc_rdma_chunk *pos; 59 60 pcl_for_each_chunk(pos, pcl) { 61 if (pos->ch_position > chunk->ch_position) 62 break; 63 } 64 __list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list); 65 pcl->cl_count++; 66} 67 68static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt, 69 struct svc_rdma_chunk *chunk, 70 u32 handle, u32 length, u64 offset) 71{ 72 struct svc_rdma_segment *segment; 73 74 segment = &chunk->ch_segments[chunk->ch_segcount]; 75 segment->rs_handle = handle; 76 segment->rs_length = length; 77 segment->rs_offset = offset; 78 79 trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, segment); 80 81 chunk->ch_length += length; 82 chunk->ch_segcount++; 83} 84 85/** 86 * pcl_alloc_call - Construct a parsed chunk list for the Call body 87 * @rctxt: Ingress receive context 88 * @p: Start of an un-decoded Read list 89 * 90 * Assumptions: 91 * - The incoming Read list has already been sanity checked. 92 * - cl_count is already set to the number of segments in 93 * the un-decoded list. 94 * - The list might not be in order by position. 95 * 96 * Return values: 97 * %true: Parsed chunk list was successfully constructed, and 98 * cl_count is updated to be the number of chunks (ie. 99 * unique positions) in the Read list. 100 * %false: Memory allocation failed. 101 */ 102bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p) 103{ 104 struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl; 105 unsigned int i, segcount = pcl->cl_count; 106 107 pcl->cl_count = 0; 108 for (i = 0; i < segcount; i++) { 109 struct svc_rdma_chunk *chunk; 110 u32 position, handle, length; 111 u64 offset; 112 113 p++; /* skip the list discriminator */ 114 p = xdr_decode_read_segment(p, &position, &handle, 115 &length, &offset); 116 if (position != 0) 117 continue; 118 119 if (pcl_is_empty(pcl)) { 120 chunk = pcl_alloc_chunk(segcount, position); 121 if (!chunk) 122 return false; 123 pcl_insert_position(pcl, chunk); 124 } else { 125 chunk = list_first_entry(&pcl->cl_chunks, 126 struct svc_rdma_chunk, 127 ch_list); 128 } 129 130 pcl_set_read_segment(rctxt, chunk, handle, length, offset); 131 } 132 133 return true; 134} 135 136/** 137 * pcl_alloc_read - Construct a parsed chunk list for normal Read chunks 138 * @rctxt: Ingress receive context 139 * @p: Start of an un-decoded Read list 140 * 141 * Assumptions: 142 * - The incoming Read list has already been sanity checked. 143 * - cl_count is already set to the number of segments in 144 * the un-decoded list. 145 * - The list might not be in order by position. 146 * 147 * Return values: 148 * %true: Parsed chunk list was successfully constructed, and 149 * cl_count is updated to be the number of chunks (ie. 150 * unique position values) in the Read list. 151 * %false: Memory allocation failed. 152 * 153 * TODO: 154 * - Check for chunk range overlaps 155 */ 156bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p) 157{ 158 struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl; 159 unsigned int i, segcount = pcl->cl_count; 160 161 pcl->cl_count = 0; 162 for (i = 0; i < segcount; i++) { 163 struct svc_rdma_chunk *chunk; 164 u32 position, handle, length; 165 u64 offset; 166 167 p++; /* skip the list discriminator */ 168 p = xdr_decode_read_segment(p, &position, &handle, 169 &length, &offset); 170 if (position == 0) 171 continue; 172 173 chunk = pcl_lookup_position(pcl, position); 174 if (!chunk) { 175 chunk = pcl_alloc_chunk(segcount, position); 176 if (!chunk) 177 return false; 178 pcl_insert_position(pcl, chunk); 179 } 180 181 pcl_set_read_segment(rctxt, chunk, handle, length, offset); 182 } 183 184 return true; 185} 186 187/** 188 * pcl_alloc_write - Construct a parsed chunk list from a Write list 189 * @rctxt: Ingress receive context 190 * @pcl: Parsed chunk list to populate 191 * @p: Start of an un-decoded Write list 192 * 193 * Assumptions: 194 * - The incoming Write list has already been sanity checked, and 195 * - cl_count is set to the number of chunks in the un-decoded list. 196 * 197 * Return values: 198 * %true: Parsed chunk list was successfully constructed. 199 * %false: Memory allocation failed. 200 */ 201bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt, 202 struct svc_rdma_pcl *pcl, __be32 *p) 203{ 204 struct svc_rdma_segment *segment; 205 struct svc_rdma_chunk *chunk; 206 unsigned int i, j; 207 u32 segcount; 208 209 for (i = 0; i < pcl->cl_count; i++) { 210 p++; /* skip the list discriminator */ 211 segcount = be32_to_cpup(p++); 212 213 chunk = pcl_alloc_chunk(segcount, 0); 214 if (!chunk) 215 return false; 216 list_add_tail(&chunk->ch_list, &pcl->cl_chunks); 217 218 for (j = 0; j < segcount; j++) { 219 segment = &chunk->ch_segments[j]; 220 p = xdr_decode_rdma_segment(p, &segment->rs_handle, 221 &segment->rs_length, 222 &segment->rs_offset); 223 trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j); 224 225 chunk->ch_length += segment->rs_length; 226 chunk->ch_segcount++; 227 } 228 } 229 return true; 230} 231 232static int pcl_process_region(const struct xdr_buf *xdr, 233 unsigned int offset, unsigned int length, 234 int (*actor)(const struct xdr_buf *, void *), 235 void *data) 236{ 237 struct xdr_buf subbuf; 238 239 if (!length) 240 return 0; 241 if (xdr_buf_subsegment(xdr, &subbuf, offset, length)) 242 return -EMSGSIZE; 243 return actor(&subbuf, data); 244} 245 246/** 247 * pcl_process_nonpayloads - Process non-payload regions inside @xdr 248 * @pcl: Chunk list to process 249 * @xdr: xdr_buf to process 250 * @actor: Function to invoke on each non-payload region 251 * @data: Arguments for @actor 252 * 253 * This mechanism must ignore not only result payloads that were already 254 * sent via RDMA Write, but also XDR padding for those payloads that 255 * the upper layer has added. 256 * 257 * Assumptions: 258 * The xdr->len and ch_position fields are aligned to 4-byte multiples. 259 * 260 * Returns: 261 * On success, zero, 262 * %-EMSGSIZE on XDR buffer overflow, or 263 * The return value of @actor 264 */ 265int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl, 266 const struct xdr_buf *xdr, 267 int (*actor)(const struct xdr_buf *, void *), 268 void *data) 269{ 270 struct svc_rdma_chunk *chunk, *next; 271 unsigned int start; 272 int ret; 273 274 chunk = pcl_first_chunk(pcl); 275 276 /* No result payloads were generated */ 277 if (!chunk || !chunk->ch_payload_length) 278 return actor(xdr, data); 279 280 /* Process the region before the first result payload */ 281 ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data); 282 if (ret < 0) 283 return ret; 284 285 /* Process the regions between each middle result payload */ 286 while ((next = pcl_next_chunk(pcl, chunk))) { 287 if (!next->ch_payload_length) 288 break; 289 290 start = pcl_chunk_end_offset(chunk); 291 ret = pcl_process_region(xdr, start, next->ch_position - start, 292 actor, data); 293 if (ret < 0) 294 return ret; 295 296 chunk = next; 297 } 298 299 /* Process the region after the last result payload */ 300 start = pcl_chunk_end_offset(chunk); 301 ret = pcl_process_region(xdr, start, xdr->len - start, actor, data); 302 if (ret < 0) 303 return ret; 304 305 return 0; 306}