smc_rx.c (12222B)
// SPDX-License-Identifier: GPL-2.0
/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage RMBE
 * copy new RMBE data into user space
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/sched/signal.h>

#include <net/sock.h>

#include "smc.h"
#include "smc_core.h"
#include "smc_cdc.h"
#include "smc_tx.h" /* smc_tx_consumer_update() */
#include "smc_rx.h"
#include "smc_stats.h"
#include "smc_tracepoint.h"

/* callback implementation to wakeup consumers blocked with smc_rx_wait().
 * indirectly called by smc_cdc_msg_recv_action().
 * Installed as sk->sk_data_ready by smc_rx_init().
 */
static void smc_rx_wake_up(struct sock *sk)
{
	struct socket_wq *wq;

	/* derived from sock_def_readable() */
	/* called already in smc_listen_work() */
	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	if (skwq_has_sleeper(wq))
		wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
						EPOLLRDNORM | EPOLLRDBAND);
	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
	/* additionally signal POLL_HUP once sk_shutdown equals SHUTDOWN_MASK
	 * or the socket state is SMC_CLOSED
	 */
	if ((sk->sk_shutdown == SHUTDOWN_MASK) ||
	    (sk->sk_state == SMC_CLOSED))
		sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP);
	rcu_read_unlock();
}

/* Update consumer cursor
 *   @smc    smc socket whose connection is updated
 *   @cons   consumer cursor (passed by value, advanced locally by @len)
 *   @len    number of Bytes consumed
 * The advanced cursor is stored in conn->local_tx_ctrl.cons and
 * smc_tx_consumer_update() is called afterwards.
 * Returns:
 * 1 if we should end our receive, 0 otherwise
 * (1 is only returned with SOCK_URGINLINE set, when the advanced cursor
 * compares equal to conn->urg_curs)
 */
static int smc_rx_update_consumer(struct smc_sock *smc,
				  union smc_host_cursor cons, size_t len)
{
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	bool force = false;
	int diff, rc = 0;

	smc_curs_add(conn->rmb_desc->len, &cons, len);

	/* did we process urgent data? */
	if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) {
		/* position of the advanced cursor relative to urg_curs */
		diff = smc_curs_comp(conn->rmb_desc->len, &cons,
				     &conn->urg_curs);
		if (sock_flag(sk, SOCK_URGINLINE)) {
			if (diff == 0) {
				/* inline case: mark urgent data as read and
				 * tell the caller to end its receive
				 */
				force = true;
				rc = 1;
				conn->urg_state = SMC_URG_READ;
			}
		} else {
			if (diff == 1) {
				/* skip urgent byte */
				force = true;
				smc_curs_add(conn->rmb_desc->len, &cons, 1);
				conn->urg_rx_skip_pend = false;
			} else if (diff < -1)
				/* we read past urgent byte */
				conn->urg_state = SMC_URG_READ;
		}
	}

	/* publish the new consumer cursor in the local tx control block */
	smc_curs_copy(&conn->local_tx_ctrl.cons, &cons, conn);

	/* send consumer cursor update if required */
	/* similar to advertising new TCP rcv_wnd if required */
	smc_tx_consumer_update(conn, force);

	return rc;
}

/* Advance the connection's current consumer cursor by @len bytes.
 * Takes a copy of conn->local_tx_ctrl.cons and hands it to
 * smc_rx_update_consumer(); the return value of that call is ignored.
 */
static void smc_rx_update_cons(struct smc_sock *smc, size_t len)
{
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor cons;

	smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
	smc_rx_update_consumer(smc, cons, len);
}

/* Private data attached to a pipe buffer by smc_rx_splice() and
 * consumed (and freed) by smc_rx_pipe_buf_release().
 */
struct smc_spd_priv {
	struct smc_sock *smc;	/* socket the spliced data belongs to */
	size_t		 len;	/* length passed to smc_rx_splice() */
};

/* .release callback of smc_pipe_ops.
 * Unless the socket is in state SMC_CLOSED, SMC_PEERFINCLOSEWAIT or
 * SMC_APPFINCLOSEWAIT, the consumer cursor is advanced by priv->len under
 * the sock lock, priv->len is subtracted from conn->splice_pending and
 * waiters are woken up when that counter reaches zero.
 * In all cases priv is freed and the page and socket references are put
 * (smc_rx_splice() takes such references after a successful splice).
 */
static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
				    struct pipe_buffer *buf)
{
	struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
	struct smc_sock *smc = priv->smc;
	struct smc_connection *conn;
	struct sock *sk = &smc->sk;

	if (sk->sk_state == SMC_CLOSED ||
	    sk->sk_state == SMC_PEERFINCLOSEWAIT ||
	    sk->sk_state == SMC_APPFINCLOSEWAIT)
		goto out;
	conn = &smc->conn;
	lock_sock(sk);
	smc_rx_update_cons(smc, priv->len);
	release_sock(sk);
	/* wake up once no spliced bytes are pending any more */
	if (atomic_sub_and_test(priv->len, &conn->splice_pending))
		smc_rx_wake_up(sk);
out:
	kfree(priv);
	put_page(buf->page);
	sock_put(sk);
}

/* pipe buffer operations used for buffers created by smc_rx_splice() */
static const struct pipe_buf_operations smc_pipe_ops = {
	.release = smc_rx_pipe_buf_release,
	.get = generic_pipe_buf_get
};

/* spd_release callback handed to splice_to_pipe(): puts page @i of @spd */
static void smc_rx_spd_release(struct splice_pipe_desc *spd,
			       unsigned int i)
{
	put_page(spd->pages[i]);
}

/* Splice @len bytes starting at @src into @pipe.
 *   @pipe   destination pipe
 *   @src    start address of the data to splice
 *   @len    number of bytes to splice
 *   @smc    smc socket the data belongs to
 * A single-page splice descriptor is built on conn.rmb_desc->pages; the page
 * offset is the distance of @src from conn.rmb_desc->cpu_addr.
 * Returns:
 * -ENOMEM if the private data cannot be allocated, otherwise the return
 * value of splice_to_pipe().
 */
static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
			 struct smc_sock *smc)
{
	struct splice_pipe_desc spd;
	struct partial_page partial;
	struct smc_spd_priv *priv;
	int bytes;

	priv = kzalloc(sizeof(*priv), GFP_KERNEL);
	if (!priv)
		return -ENOMEM;
	priv->len = len;
	priv->smc = smc;
	partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr;
	partial.len = len;
	/* priv is read back from buf->private in smc_rx_pipe_buf_release() */
	partial.private = (unsigned long)priv;

	spd.nr_pages_max = 1;
	spd.nr_pages = 1;
	spd.pages = &smc->conn.rmb_desc->pages;
	spd.partial = &partial;
	spd.ops = &smc_pipe_ops;
	spd.spd_release = smc_rx_spd_release;

	/* NOTE(review): priv is only handed over via partial.private; when
	 * splice_to_pipe() returns <= 0 it is not freed here - confirm that
	 * it cannot leak on that path.
	 */
	bytes = splice_to_pipe(pipe, &spd);
	if (bytes > 0) {
		/* references are dropped in smc_rx_pipe_buf_release() */
		sock_hold(&smc->sk);
		get_page(smc->conn.rmb_desc->pages);
		atomic_add(bytes, &smc->conn.splice_pending);
	}

	return bytes;
}

/* wait criterion for smc_rx_wait(): nonzero if conn->bytes_to_rcv is nonzero
 * and conn->splice_pending is zero
 */
static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn)
{
	return atomic_read(&conn->bytes_to_rcv) &&
	       !atomic_read(&conn->splice_pending);
}

/* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
 *   @smc    smc socket
 *   @timeo  pointer to max seconds to wait, pointer to value 0 for no timeout
 *   @fcrit  add'l criterion to evaluate as function pointer
 * Returns:
 * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.
 * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
 */
int smc_rx_wait(struct smc_sock *smc, long *timeo,
		int (*fcrit)(struct smc_connection *conn))
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct smc_cdc_conn_state_flags *cflags =
					&conn->local_tx_ctrl.conn_state_flags;
	struct sock *sk = &smc->sk;
	int rc;

	/* nothing to wait for if the criterion is already fulfilled */
	if (fcrit(conn))
		return 1;
	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	add_wait_queue(sk_sleep(sk), &wait);
	/* besides @fcrit, a socket error, a peer connection abort, a receive
	 * shutdown or a killed connection end the wait
	 */
	rc = sk_wait_event(sk, timeo,
			   sk->sk_err ||
			   cflags->peer_conn_abort ||
			   sk->sk_shutdown & RCV_SHUTDOWN ||
			   conn->killed ||
			   fcrit(conn),
			   &wait);
	remove_wait_queue(sk_sleep(sk), &wait);
	sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	return rc;
}

/* Receive the urgent byte; called by smc_rx_recvmsg() for MSG_OOB.
 *   @smc    smc socket
 *   @msg    message the urgent byte (conn->urg_rx_byte) is copied to
 *   @len    requested length
 *   @flags  receive flags; MSG_PEEK and MSG_TRUNC are evaluated
 * Returns:
 * -EINVAL if SOCK_URGINLINE is set or urg_state is not SMC_URG_VALID,
 * -EFAULT if copying to @msg failed, otherwise @len, which is set to 1 when
 * it was positive (MSG_TRUNC is flagged in @msg when it was not).
 */
static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len,
			   int flags)
{
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor cons;
	struct sock *sk = &smc->sk;
	int rc = 0;

	/* NOTE(review): the second test already rejects every state other
	 * than SMC_URG_VALID, so the SMC_URG_READ test looks redundant and
	 * the code after the SMC_URG_VALID branch below is only reachable if
	 * urg_state changes between the two reads - confirm intent.
	 */
	if (sock_flag(sk, SOCK_URGINLINE) ||
	    !(conn->urg_state == SMC_URG_VALID) ||
	    conn->urg_state == SMC_URG_READ)
		return -EINVAL;

	SMC_STAT_INC(smc, urg_data_cnt);
	if (conn->urg_state == SMC_URG_VALID) {
		/* peeking leaves the urgent state untouched */
		if (!(flags & MSG_PEEK))
			smc->conn.urg_state = SMC_URG_READ;
		msg->msg_flags |= MSG_OOB;
		if (len > 0) {
			if (!(flags & MSG_TRUNC))
				rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1);
			len = 1;
			smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
			if (smc_curs_diff(conn->rmb_desc->len, &cons,
					  &conn->urg_curs) > 1)
				conn->urg_rx_skip_pend = true;
			/* Urgent Byte was already accounted for, but trigger
			 * skipping the urgent byte in non-inline case
			 */
			if (!(flags & MSG_PEEK))
				smc_rx_update_consumer(smc, cons, 0);
		} else {
			msg->msg_flags |= MSG_TRUNC;
		}

		return rc ? -EFAULT : len;
	}

	if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN)
		return 0;

	return -EAGAIN;
}

/* Check whether smc_rx_recvmsg() has data to copy.
 * Returns true if smc_rx_data_available() reports data. Otherwise returns
 * false; if urg_state is SMC_URG_VALID in that case, a zero-length consumer
 * update is triggered first.
 */
static bool smc_rx_recvmsg_data_available(struct smc_sock *smc)
{
	struct smc_connection *conn = &smc->conn;

	if (smc_rx_data_available(conn))
		return true;
	else if (conn->urg_state == SMC_URG_VALID)
		/* we received a single urgent Byte - skip */
		smc_rx_update_cons(smc, 0);
	return false;
}

/* smc_rx_recvmsg - receive data from RMBE
 * @smc:	smc socket
 * @msg:	copy data to receive buffer
 * @pipe:	copy data to pipe if set - indicates splice() call
 * @len:	maximum number of bytes to receive
 * @flags:	receive flags (MSG_ERRQUEUE, MSG_OOB, MSG_DONTWAIT,
 *		MSG_WAITALL, MSG_TRUNC and MSG_PEEK are evaluated)
 *
 * rcvbuf consumer: main API called by socket layer.
 * Called under sk lock.
 *
 * Returns the number of bytes received, or a negative error code.
 * read_done is a size_t; error codes stored in it rely on the conversion
 * to the int return type.
 */
int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
		   struct pipe_inode_info *pipe, size_t len, int flags)
{
	size_t copylen, read_done = 0, read_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	int (*func)(struct smc_connection *conn);
	union smc_host_cursor cons;
	int readable, chunk;
	char *rcvbuf_base;
	struct sock *sk;
	int splbytes;
	long timeo;
	int target;		/* Read at least these many bytes */
	int rc;

	if (unlikely(flags & MSG_ERRQUEUE))
		return -EINVAL; /* future work for sk.sk_family == AF_SMC */

	sk = &smc->sk;
	if (sk->sk_state == SMC_LISTEN)
		return -ENOTCONN;
	/* urgent data is handled separately */
	if (flags & MSG_OOB)
		return smc_rx_recv_urg(smc, msg, len, flags);
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

	/* statistics only: receive buffer full / request smaller than the
	 * data currently available
	 */
	readable = atomic_read(&conn->bytes_to_rcv);
	if (readable >= conn->rmb_desc->len)
		SMC_STAT_RMB_RX_FULL(smc, !conn->lnk);

	if (len < readable)
		SMC_STAT_RMB_RX_SIZE_SMALL(smc, !conn->lnk);
	/* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
	rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr;

	do { /* while (read_remaining) */
		/* a splice() call ends after the first successful copy */
		if (read_done >= target || (pipe && read_done))
			break;

		if (conn->killed)
			break;

		if (smc_rx_recvmsg_data_available(smc))
			goto copy;

		if (sk->sk_shutdown & RCV_SHUTDOWN) {
			/* smc_cdc_msg_recv_action() could have run after
			 * above smc_rx_recvmsg_data_available()
			 */
			if (smc_rx_recvmsg_data_available(smc))
				goto copy;
			break;
		}

		if (read_done) {
			/* some data was read already: return it rather than
			 * an error
			 */
			if (sk->sk_err ||
			    sk->sk_state == SMC_CLOSED ||
			    !timeo ||
			    signal_pending(current))
				break;
		} else {
			if (sk->sk_err) {
				read_done = sock_error(sk);
				break;
			}
			if (sk->sk_state == SMC_CLOSED) {
				if (!sock_flag(sk, SOCK_DONE)) {
					/* This occurs when user tries to read
					 * from never connected socket.
					 */
					read_done = -ENOTCONN;
					break;
				}
				break;
			}
			if (!timeo)
				return -EAGAIN;
			if (signal_pending(current)) {
				read_done = sock_intr_errno(timeo);
				break;
			}
		}

		if (!smc_rx_data_available(conn)) {
			smc_rx_wait(smc, &timeo, smc_rx_data_available);
			continue;
		}

copy:
		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after waiting on data above */
		readable = atomic_read(&conn->bytes_to_rcv);
		splbytes = atomic_read(&conn->splice_pending);
		/* a recvmsg() call (msg set) waits until no spliced bytes are
		 * pending any more
		 */
		if (!readable || (msg && splbytes)) {
			if (splbytes)
				func = smc_rx_data_available_and_no_splice_pend;
			else
				func = smc_rx_data_available;
			smc_rx_wait(smc, &timeo, func);
			continue;
		}

		smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
		/* subsequent splice() calls pick up where previous left */
		if (splbytes)
			smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
		if (conn->urg_state == SMC_URG_VALID &&
		    sock_flag(&smc->sk, SOCK_URGINLINE) &&
		    readable > 1)
			readable--;	/* always stop at urgent Byte */
		/* not more than what user space asked for */
		copylen = min_t(size_t, read_remaining, readable);
		/* determine chunks where to read from rcvbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
				  cons.count);
		chunk_len_sum = chunk_len;
		chunk_off = cons.count;
		smc_rmb_sync_sg_for_cpu(conn);
		/* at most two chunks: up to the end of the buffer, then the
		 * remainder from offset 0
		 */
		for (chunk = 0; chunk < 2; chunk++) {
			/* with MSG_TRUNC nothing is copied, only accounted */
			if (!(flags & MSG_TRUNC)) {
				if (msg) {
					rc = memcpy_to_msg(msg, rcvbuf_base +
							   chunk_off,
							   chunk_len);
				} else {
					rc = smc_rx_splice(pipe, rcvbuf_base +
							chunk_off, chunk_len,
							smc);
				}
				if (rc < 0) {
					/* report -EFAULT only if nothing was
					 * read before
					 */
					if (!read_done)
						read_done = -EFAULT;
					smc_rmb_sync_sg_for_device(conn);
					goto out;
				}
			}
			read_remaining -= chunk_len;
			read_done += chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in recv ring buffer */
		}
		smc_rmb_sync_sg_for_device(conn);

		/* update cursors */
		if (!(flags & MSG_PEEK)) {
			/* increased in recv tasklet smc_cdc_msg_rcv() */
			smp_mb__before_atomic();
			atomic_sub(copylen, &conn->bytes_to_rcv);
			/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
			smp_mb__after_atomic();
			/* the consumer cursor is only updated here for
			 * recvmsg(); for splice() (msg not set) this is done
			 * in smc_rx_pipe_buf_release()
			 */
			if (msg && smc_rx_update_consumer(smc, cons, copylen))
				goto out;
		}

		trace_smc_rx_recvmsg(smc, copylen);
	} while (read_remaining);
out:
	return read_done;
}

/* Initialize receive properties on connection establishment. NB: not __init!
 * Installs smc_rx_wake_up() as data-ready callback, resets the count of
 * pending spliced bytes and sets the urgent state to SMC_URG_READ.
 */
void smc_rx_init(struct smc_sock *smc)
{
	smc->sk.sk_data_ready = smc_rx_wake_up;
	atomic_set(&smc->conn.splice_pending, 0);
	smc->conn.urg_state = SMC_URG_READ;
}
