cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

smbdirect.c (73348B)


      1// SPDX-License-Identifier: GPL-2.0-or-later
      2/*
      3 *   Copyright (C) 2017, Microsoft Corporation.
      4 *
      5 *   Author(s): Long Li <longli@microsoft.com>
      6 */
      7#include <linux/module.h>
      8#include <linux/highmem.h>
      9#include "smbdirect.h"
     10#include "cifs_debug.h"
     11#include "cifsproto.h"
     12#include "smb2proto.h"
     13
     14static struct smbd_response *get_empty_queue_buffer(
     15		struct smbd_connection *info);
     16static struct smbd_response *get_receive_buffer(
     17		struct smbd_connection *info);
     18static void put_receive_buffer(
     19		struct smbd_connection *info,
     20		struct smbd_response *response);
     21static int allocate_receive_buffers(struct smbd_connection *info, int num_buf);
     22static void destroy_receive_buffers(struct smbd_connection *info);
     23
     24static void put_empty_packet(
     25		struct smbd_connection *info, struct smbd_response *response);
     26static void enqueue_reassembly(
     27		struct smbd_connection *info,
     28		struct smbd_response *response, int data_length);
     29static struct smbd_response *_get_first_reassembly(
     30		struct smbd_connection *info);
     31
     32static int smbd_post_recv(
     33		struct smbd_connection *info,
     34		struct smbd_response *response);
     35
     36static int smbd_post_send_empty(struct smbd_connection *info);
     37static int smbd_post_send_data(
     38		struct smbd_connection *info,
     39		struct kvec *iov, int n_vec, int remaining_data_length);
     40static int smbd_post_send_page(struct smbd_connection *info,
     41		struct page *page, unsigned long offset,
     42		size_t size, int remaining_data_length);
     43
     44static void destroy_mr_list(struct smbd_connection *info);
     45static int allocate_mr_list(struct smbd_connection *info);
     46
     47/* SMBD version number */
     48#define SMBD_V1	0x0100
     49
     50/* Port numbers for SMBD transport */
     51#define SMB_PORT	445
     52#define SMBD_PORT	5445
     53
     54/* Address lookup and resolve timeout in ms */
     55#define RDMA_RESOLVE_TIMEOUT	5000
     56
     57/* SMBD negotiation timeout in seconds */
     58#define SMBD_NEGOTIATE_TIMEOUT	120
     59
     60/* SMBD minimum receive size and fragmented size defined in [MS-SMBD] */
     61#define SMBD_MIN_RECEIVE_SIZE		128
     62#define SMBD_MIN_FRAGMENTED_SIZE	131072
     63
     64/*
     65 * Default maximum number of RDMA read/write outstanding on this connection
     66 * This value may be decreased during QP creation, depending on hardware limits
     67 */
     68#define SMBD_CM_RESPONDER_RESOURCES	32
     69
     70/* Maximum number of retries on data transfer operations */
     71#define SMBD_CM_RETRY			6
     72/* No need to retry on Receiver Not Ready since SMBD manages credits */
     73#define SMBD_CM_RNR_RETRY		0
     74
     75/*
     76 * User configurable initial values per SMBD transport connection
     77 * as defined in [MS-SMBD] 3.1.1.1
     78 * Those may change after a SMBD negotiation
     79 */
     80/* The local peer's maximum number of credits to grant to the peer */
     81int smbd_receive_credit_max = 255;
     82
     84/* The credit target the remote peer requests of the local peer */
     84int smbd_send_credit_target = 255;
     85
     86/* The maximum size of a single message that can be sent to the remote peer */
     87int smbd_max_send_size = 1364;
     88
     89/*  The maximum fragmented upper-layer payload receive size supported */
     90int smbd_max_fragmented_recv_size = 1024 * 1024;
     91
     92/*  The maximum single-message size which can be received */
     93int smbd_max_receive_size = 8192;
     94
     95/* The idle timeout (seconds) after which a keepalive message is sent */
     96int smbd_keep_alive_interval = 120;
     97
     98/*
     99 * User configurable initial values for RDMA transport
    100 * The actual values used may be lower and are limited to hardware capabilities
    101 */
    102/* Default maximum number of SGEs in a RDMA write/read */
    103int smbd_max_frmr_depth = 2048;
    104
    106/* If the payload is smaller than this many bytes, use RDMA send/recv instead of read/write */
    106int rdma_readwrite_threshold = 4096;
    107
    108/* Transport logging functions
    109 * Logging is defined as classes. They can be OR'ed to define the actual
    110 * logging level via module parameter smbd_logging_class
    111 * e.g. cifs.smbd_logging_class=0xa0 will log all log_rdma_recv() and
    112 * log_rdma_event()
    113 */
    114#define LOG_OUTGOING			0x1
    115#define LOG_INCOMING			0x2
    116#define LOG_READ			0x4
    117#define LOG_WRITE			0x8
    118#define LOG_RDMA_SEND			0x10
    119#define LOG_RDMA_RECV			0x20
    120#define LOG_KEEP_ALIVE			0x40
    121#define LOG_RDMA_EVENT			0x80
    122#define LOG_RDMA_MR			0x100
    123static unsigned int smbd_logging_class;
    124module_param(smbd_logging_class, uint, 0644);
    125MODULE_PARM_DESC(smbd_logging_class,
    126	"Logging class for SMBD transport 0x0 to 0x100");
    127
    128#define ERR		0x0
    129#define INFO		0x1
    130static unsigned int smbd_logging_level = ERR;
    131module_param(smbd_logging_level, uint, 0644);
    132MODULE_PARM_DESC(smbd_logging_level,
    133	"Logging level for SMBD transport, 0 (default): error, 1: info");
    134
    135#define log_rdma(level, class, fmt, args...)				\
    136do {									\
    137	if (level <= smbd_logging_level || class & smbd_logging_class)	\
    138		cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
    139} while (0)
    140
    141#define log_outgoing(level, fmt, args...) \
    142		log_rdma(level, LOG_OUTGOING, fmt, ##args)
    143#define log_incoming(level, fmt, args...) \
    144		log_rdma(level, LOG_INCOMING, fmt, ##args)
    145#define log_read(level, fmt, args...)	log_rdma(level, LOG_READ, fmt, ##args)
    146#define log_write(level, fmt, args...)	log_rdma(level, LOG_WRITE, fmt, ##args)
    147#define log_rdma_send(level, fmt, args...) \
    148		log_rdma(level, LOG_RDMA_SEND, fmt, ##args)
    149#define log_rdma_recv(level, fmt, args...) \
    150		log_rdma(level, LOG_RDMA_RECV, fmt, ##args)
    151#define log_keep_alive(level, fmt, args...) \
    152		log_rdma(level, LOG_KEEP_ALIVE, fmt, ##args)
    153#define log_rdma_event(level, fmt, args...) \
    154		log_rdma(level, LOG_RDMA_EVENT, fmt, ##args)
    155#define log_rdma_mr(level, fmt, args...) \
    156		log_rdma(level, LOG_RDMA_MR, fmt, ##args)
    157
    158static void smbd_disconnect_rdma_work(struct work_struct *work)
    159{
    160	struct smbd_connection *info =
    161		container_of(work, struct smbd_connection, disconnect_work);
    162
    163	if (info->transport_status == SMBD_CONNECTED) {
    164		info->transport_status = SMBD_DISCONNECTING;
    165		rdma_disconnect(info->id);
    166	}
    167}
    168
    169static void smbd_disconnect_rdma_connection(struct smbd_connection *info)
    170{
    171	queue_work(info->workqueue, &info->disconnect_work);
    172}
    173
    174/* Upcall from RDMA CM */
    175static int smbd_conn_upcall(
    176		struct rdma_cm_id *id, struct rdma_cm_event *event)
    177{
    178	struct smbd_connection *info = id->context;
    179
    180	log_rdma_event(INFO, "event=%d status=%d\n",
    181		event->event, event->status);
    182
    183	switch (event->event) {
    184	case RDMA_CM_EVENT_ADDR_RESOLVED:
    185	case RDMA_CM_EVENT_ROUTE_RESOLVED:
    186		info->ri_rc = 0;
    187		complete(&info->ri_done);
    188		break;
    189
    190	case RDMA_CM_EVENT_ADDR_ERROR:
    191		info->ri_rc = -EHOSTUNREACH;
    192		complete(&info->ri_done);
    193		break;
    194
    195	case RDMA_CM_EVENT_ROUTE_ERROR:
    196		info->ri_rc = -ENETUNREACH;
    197		complete(&info->ri_done);
    198		break;
    199
    200	case RDMA_CM_EVENT_ESTABLISHED:
    201		log_rdma_event(INFO, "connected event=%d\n", event->event);
    202		info->transport_status = SMBD_CONNECTED;
    203		wake_up_interruptible(&info->conn_wait);
    204		break;
    205
    206	case RDMA_CM_EVENT_CONNECT_ERROR:
    207	case RDMA_CM_EVENT_UNREACHABLE:
    208	case RDMA_CM_EVENT_REJECTED:
    209		log_rdma_event(INFO, "connecting failed event=%d\n", event->event);
    210		info->transport_status = SMBD_DISCONNECTED;
    211		wake_up_interruptible(&info->conn_wait);
    212		break;
    213
    214	case RDMA_CM_EVENT_DEVICE_REMOVAL:
    215	case RDMA_CM_EVENT_DISCONNECTED:
    216		/* This happens when we fail the negotiation */
    217		if (info->transport_status == SMBD_NEGOTIATE_FAILED) {
    218			info->transport_status = SMBD_DISCONNECTED;
    219			wake_up(&info->conn_wait);
    220			break;
    221		}
    222
    223		info->transport_status = SMBD_DISCONNECTED;
    224		wake_up_interruptible(&info->disconn_wait);
    225		wake_up_interruptible(&info->wait_reassembly_queue);
    226		wake_up_interruptible_all(&info->wait_send_queue);
    227		break;
    228
    229	default:
    230		break;
    231	}
    232
    233	return 0;
    234}
    235
    236/* Upcall from RDMA QP */
    237static void
    238smbd_qp_async_error_upcall(struct ib_event *event, void *context)
    239{
    240	struct smbd_connection *info = context;
    241
    242	log_rdma_event(ERR, "%s on device %s info %p\n",
    243		ib_event_msg(event->event), event->device->name, info);
    244
    245	switch (event->event) {
    246	case IB_EVENT_CQ_ERR:
    247	case IB_EVENT_QP_FATAL:
    248		smbd_disconnect_rdma_connection(info);
    249		break;
    250
    251	default:
    252		break;
    253	}
    254}
    255
    256static inline void *smbd_request_payload(struct smbd_request *request)
    257{
    258	return (void *)request->packet;
    259}
    260
    261static inline void *smbd_response_payload(struct smbd_response *response)
    262{
    263	return (void *)response->packet;
    264}
    265
    266/* Called when a RDMA send is done */
    267static void send_done(struct ib_cq *cq, struct ib_wc *wc)
    268{
    269	int i;
    270	struct smbd_request *request =
    271		container_of(wc->wr_cqe, struct smbd_request, cqe);
    272
    273	log_rdma_send(INFO, "smbd_request %p completed wc->status=%d\n",
    274		request, wc->status);
    275
    276	if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
    277		log_rdma_send(ERR, "wc->status=%d wc->opcode=%d\n",
    278			wc->status, wc->opcode);
    279		smbd_disconnect_rdma_connection(request->info);
    280	}
    281
    282	for (i = 0; i < request->num_sge; i++)
    283		ib_dma_unmap_single(request->info->id->device,
    284			request->sge[i].addr,
    285			request->sge[i].length,
    286			DMA_TO_DEVICE);
    287
    288	if (atomic_dec_and_test(&request->info->send_pending))
    289		wake_up(&request->info->wait_send_pending);
    290
    291	wake_up(&request->info->wait_post_send);
    292
    293	mempool_free(request, request->info->request_mempool);
    294}
    295
    296static void dump_smbd_negotiate_resp(struct smbd_negotiate_resp *resp)
    297{
    298	log_rdma_event(INFO, "resp message min_version %u max_version %u negotiated_version %u credits_requested %u credits_granted %u status %u max_readwrite_size %u preferred_send_size %u max_receive_size %u max_fragmented_size %u\n",
    299		       resp->min_version, resp->max_version,
    300		       resp->negotiated_version, resp->credits_requested,
    301		       resp->credits_granted, resp->status,
    302		       resp->max_readwrite_size, resp->preferred_send_size,
    303		       resp->max_receive_size, resp->max_fragmented_size);
    304}
    305
    306/*
    307 * Process a negotiation response message, according to [MS-SMBD] 3.1.5.7
    308 * response, packet_length: the negotiation response message
    309 * return value: true if negotiation succeeded, false if it failed
    310 */
    311static bool process_negotiation_response(
    312		struct smbd_response *response, int packet_length)
    313{
    314	struct smbd_connection *info = response->info;
    315	struct smbd_negotiate_resp *packet = smbd_response_payload(response);
    316
    317	if (packet_length < sizeof(struct smbd_negotiate_resp)) {
    318		log_rdma_event(ERR,
    319			"error: packet_length=%d\n", packet_length);
    320		return false;
    321	}
    322
    323	if (le16_to_cpu(packet->negotiated_version) != SMBD_V1) {
    324		log_rdma_event(ERR, "error: negotiated_version=%x\n",
    325			le16_to_cpu(packet->negotiated_version));
    326		return false;
    327	}
    328	info->protocol = le16_to_cpu(packet->negotiated_version);
    329
    330	if (packet->credits_requested == 0) {
    331		log_rdma_event(ERR, "error: credits_requested==0\n");
    332		return false;
    333	}
    334	info->receive_credit_target = le16_to_cpu(packet->credits_requested);
    335
    336	if (packet->credits_granted == 0) {
    337		log_rdma_event(ERR, "error: credits_granted==0\n");
    338		return false;
    339	}
    340	atomic_set(&info->send_credits, le16_to_cpu(packet->credits_granted));
    341
    342	atomic_set(&info->receive_credits, 0);
    343
    344	if (le32_to_cpu(packet->preferred_send_size) > info->max_receive_size) {
    345		log_rdma_event(ERR, "error: preferred_send_size=%d\n",
    346			le32_to_cpu(packet->preferred_send_size));
    347		return false;
    348	}
    349	info->max_receive_size = le32_to_cpu(packet->preferred_send_size);
    350
    351	if (le32_to_cpu(packet->max_receive_size) < SMBD_MIN_RECEIVE_SIZE) {
    352		log_rdma_event(ERR, "error: max_receive_size=%d\n",
    353			le32_to_cpu(packet->max_receive_size));
    354		return false;
    355	}
    356	info->max_send_size = min_t(int, info->max_send_size,
    357					le32_to_cpu(packet->max_receive_size));
    358
    359	if (le32_to_cpu(packet->max_fragmented_size) <
    360			SMBD_MIN_FRAGMENTED_SIZE) {
    361		log_rdma_event(ERR, "error: max_fragmented_size=%d\n",
    362			le32_to_cpu(packet->max_fragmented_size));
    363		return false;
    364	}
    365	info->max_fragmented_send_size =
    366		le32_to_cpu(packet->max_fragmented_size);
    367	info->rdma_readwrite_threshold =
    368		rdma_readwrite_threshold > info->max_fragmented_send_size ?
    369		info->max_fragmented_send_size :
    370		rdma_readwrite_threshold;
    371
    372
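       	/*
       	 * Cap the RDMA read/write size by both the peer's advertised limit and
       	 * what a single FRMR registration of max_frmr_depth pages can cover.
       	 */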
    373	info->max_readwrite_size = min_t(u32,
    374			le32_to_cpu(packet->max_readwrite_size),
    375			info->max_frmr_depth * PAGE_SIZE);
    376	info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
    377
    378	return true;
    379}
    380
    381static void smbd_post_send_credits(struct work_struct *work)
    382{
    383	int ret = 0;
    384	int use_receive_queue = 1;
    385	int rc;
    386	struct smbd_response *response;
    387	struct smbd_connection *info =
    388		container_of(work, struct smbd_connection,
    389			post_send_credits_work);
    390
    391	if (info->transport_status != SMBD_CONNECTED) {
    392		wake_up(&info->wait_receive_queues);
    393		return;
    394	}
    395
    396	if (info->receive_credit_target >
    397		atomic_read(&info->receive_credits)) {
    398		while (true) {
    399			if (use_receive_queue)
    400				response = get_receive_buffer(info);
    401			else
    402				response = get_empty_queue_buffer(info);
    403			if (!response) {
    404				/* now switch to empty packet queue */
    405				if (use_receive_queue) {
    406					use_receive_queue = 0;
    407					continue;
    408				} else
    409					break;
    410			}
    411
    412			response->type = SMBD_TRANSFER_DATA;
    413			response->first_segment = false;
    414			rc = smbd_post_recv(info, response);
    415			if (rc) {
    416				log_rdma_recv(ERR,
    417					"post_recv failed rc=%d\n", rc);
    418				put_receive_buffer(info, response);
    419				break;
    420			}
    421
    422			ret++;
    423		}
    424	}
    425
    426	spin_lock(&info->lock_new_credits_offered);
    427	info->new_credits_offered += ret;
    428	spin_unlock(&info->lock_new_credits_offered);
    429
    430	/* Promptly send an immediate packet as defined in [MS-SMBD] 3.1.1.1 */
    431	info->send_immediate = true;
    432	if (atomic_read(&info->receive_credits) <
    433		info->receive_credit_target - 1) {
    434		if (info->keep_alive_requested == KEEP_ALIVE_PENDING ||
    435		    info->send_immediate) {
    436			log_keep_alive(INFO, "send an empty message\n");
    437			smbd_post_send_empty(info);
    438		}
    439	}
    440}
    441
    442/* Called from softirq, when recv is done */
    443static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
    444{
    445	struct smbd_data_transfer *data_transfer;
    446	struct smbd_response *response =
    447		container_of(wc->wr_cqe, struct smbd_response, cqe);
    448	struct smbd_connection *info = response->info;
    449	int data_length = 0;
    450
    451	log_rdma_recv(INFO, "response=%p type=%d wc status=%d wc opcode %d byte_len=%d pkey_index=%x\n",
    452		      response, response->type, wc->status, wc->opcode,
    453		      wc->byte_len, wc->pkey_index);
    454
    455	if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_RECV) {
    456		log_rdma_recv(INFO, "wc->status=%d opcode=%d\n",
    457			wc->status, wc->opcode);
    458		smbd_disconnect_rdma_connection(info);
    459		goto error;
    460	}
    461
    462	ib_dma_sync_single_for_cpu(
    463		wc->qp->device,
    464		response->sge.addr,
    465		response->sge.length,
    466		DMA_FROM_DEVICE);
    467
    468	switch (response->type) {
    469	/* SMBD negotiation response */
    470	case SMBD_NEGOTIATE_RESP:
    471		dump_smbd_negotiate_resp(smbd_response_payload(response));
    472		info->full_packet_received = true;
    473		info->negotiate_done =
    474			process_negotiation_response(response, wc->byte_len);
    475		complete(&info->negotiate_completion);
    476		break;
    477
    478	/* SMBD data transfer packet */
    479	case SMBD_TRANSFER_DATA:
    480		data_transfer = smbd_response_payload(response);
    481		data_length = le32_to_cpu(data_transfer->data_length);
    482
    483		/*
    484		 * If this is a packet with a data payload, place the data in the
    485		 * reassembly queue and wake up the reading thread
    486		 */
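       		/*
       		 * full_packet_received tracks whether the previous upper-layer
       		 * payload was fully received; if so, this receive starts a new
       		 * SMB message and is marked as its first segment.
       		 */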
    487		if (data_length) {
    488			if (info->full_packet_received)
    489				response->first_segment = true;
    490
    491			if (le32_to_cpu(data_transfer->remaining_data_length))
    492				info->full_packet_received = false;
    493			else
    494				info->full_packet_received = true;
    495
    496			enqueue_reassembly(
    497				info,
    498				response,
    499				data_length);
    500		} else
    501			put_empty_packet(info, response);
    502
    503		if (data_length)
    504			wake_up_interruptible(&info->wait_reassembly_queue);
    505
    506		atomic_dec(&info->receive_credits);
    507		info->receive_credit_target =
    508			le16_to_cpu(data_transfer->credits_requested);
    509		if (le16_to_cpu(data_transfer->credits_granted)) {
    510			atomic_add(le16_to_cpu(data_transfer->credits_granted),
    511				&info->send_credits);
    512			/*
    513			 * We have new send credits granted from remote peer
    514			 * If any sender is waiting for credits, unblock it
    515			 */
    516			wake_up_interruptible(&info->wait_send_queue);
    517		}
    518
    519		log_incoming(INFO, "data flags %d data_offset %d data_length %d remaining_data_length %d\n",
    520			     le16_to_cpu(data_transfer->flags),
    521			     le32_to_cpu(data_transfer->data_offset),
    522			     le32_to_cpu(data_transfer->data_length),
    523			     le32_to_cpu(data_transfer->remaining_data_length));
    524
    525		/* Send a KEEP_ALIVE response right away if requested */
    526		info->keep_alive_requested = KEEP_ALIVE_NONE;
    527		if (le16_to_cpu(data_transfer->flags) &
    528				SMB_DIRECT_RESPONSE_REQUESTED) {
    529			info->keep_alive_requested = KEEP_ALIVE_PENDING;
    530		}
    531
    532		return;
    533
    534	default:
    535		log_rdma_recv(ERR,
    536			"unexpected response type=%d\n", response->type);
    537	}
    538
    539error:
    540	put_receive_buffer(info, response);
    541}
    542
    543static struct rdma_cm_id *smbd_create_id(
    544		struct smbd_connection *info,
    545		struct sockaddr *dstaddr, int port)
    546{
    547	struct rdma_cm_id *id;
    548	int rc;
    549	__be16 *sport;
    550
    551	id = rdma_create_id(&init_net, smbd_conn_upcall, info,
    552		RDMA_PS_TCP, IB_QPT_RC);
    553	if (IS_ERR(id)) {
    554		rc = PTR_ERR(id);
    555		log_rdma_event(ERR, "rdma_create_id() failed %i\n", rc);
    556		return id;
    557	}
    558
    559	if (dstaddr->sa_family == AF_INET6)
    560		sport = &((struct sockaddr_in6 *)dstaddr)->sin6_port;
    561	else
    562		sport = &((struct sockaddr_in *)dstaddr)->sin_port;
    563
    564	*sport = htons(port);
    565
    566	init_completion(&info->ri_done);
    567	info->ri_rc = -ETIMEDOUT;
    568
    569	rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)dstaddr,
    570		RDMA_RESOLVE_TIMEOUT);
    571	if (rc) {
    572		log_rdma_event(ERR, "rdma_resolve_addr() failed %i\n", rc);
    573		goto out;
    574	}
    575	rc = wait_for_completion_interruptible_timeout(
    576		&info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
    577	/* e.g. if interrupted returns -ERESTARTSYS */
    578	if (rc < 0) {
    579		log_rdma_event(ERR, "rdma_resolve_addr timeout rc: %i\n", rc);
    580		goto out;
    581	}
    582	rc = info->ri_rc;
    583	if (rc) {
    584		log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
    585		goto out;
    586	}
    587
    588	info->ri_rc = -ETIMEDOUT;
    589	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
    590	if (rc) {
    591		log_rdma_event(ERR, "rdma_resolve_route() failed %i\n", rc);
    592		goto out;
    593	}
    594	rc = wait_for_completion_interruptible_timeout(
    595		&info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
    596	/* e.g. if interrupted returns -ERESTARTSYS */
    597	if (rc < 0)  {
    598		log_rdma_event(ERR, "rdma_resolve_addr timeout rc: %i\n", rc);
    599		goto out;
    600	}
    601	rc = info->ri_rc;
    602	if (rc) {
    603		log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
    604		goto out;
    605	}
    606
    607	return id;
    608
    609out:
    610	rdma_destroy_id(id);
    611	return ERR_PTR(rc);
    612}
    613
    614/*
    615 * Test if FRWR (Fast Registration Work Requests) is supported on the device
    616 * This implementation requires FRWR on RDMA read/write
    617 * return value: true if it is supported
    618 */
    619static bool frwr_is_supported(struct ib_device_attr *attrs)
    620{
    621	if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
    622		return false;
    623	if (attrs->max_fast_reg_page_list_len == 0)
    624		return false;
    625	return true;
    626}
    627
    628static int smbd_ia_open(
    629		struct smbd_connection *info,
    630		struct sockaddr *dstaddr, int port)
    631{
    632	int rc;
    633
    634	info->id = smbd_create_id(info, dstaddr, port);
    635	if (IS_ERR(info->id)) {
    636		rc = PTR_ERR(info->id);
    637		goto out1;
    638	}
    639
    640	if (!frwr_is_supported(&info->id->device->attrs)) {
    641		log_rdma_event(ERR, "Fast Registration Work Requests (FRWR) is not supported\n");
    642		log_rdma_event(ERR, "Device capability flags = %llx max_fast_reg_page_list_len = %u\n",
    643			       info->id->device->attrs.device_cap_flags,
    644			       info->id->device->attrs.max_fast_reg_page_list_len);
    645		rc = -EPROTONOSUPPORT;
    646		goto out2;
    647	}
    648	info->max_frmr_depth = min_t(int,
    649		smbd_max_frmr_depth,
    650		info->id->device->attrs.max_fast_reg_page_list_len);
    651	info->mr_type = IB_MR_TYPE_MEM_REG;
    652	if (info->id->device->attrs.kernel_cap_flags & IBK_SG_GAPS_REG)
    653		info->mr_type = IB_MR_TYPE_SG_GAPS;
    654
    655	info->pd = ib_alloc_pd(info->id->device, 0);
    656	if (IS_ERR(info->pd)) {
    657		rc = PTR_ERR(info->pd);
    658		log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
    659		goto out2;
    660	}
    661
    662	return 0;
    663
    664out2:
    665	rdma_destroy_id(info->id);
    666	info->id = NULL;
    667
    668out1:
    669	return rc;
    670}
    671
    672/*
    673 * Send a negotiation request message to the peer
    674 * The negotiation procedure is in [MS-SMBD] 3.1.5.2 and 3.1.5.3
    675 * After negotiation, the transport is connected and ready for
    676 * carrying upper layer SMB payload
    677 */
    678static int smbd_post_send_negotiate_req(struct smbd_connection *info)
    679{
    680	struct ib_send_wr send_wr;
    681	int rc = -ENOMEM;
    682	struct smbd_request *request;
    683	struct smbd_negotiate_req *packet;
    684
    685	request = mempool_alloc(info->request_mempool, GFP_KERNEL);
    686	if (!request)
    687		return rc;
    688
    689	request->info = info;
    690
    691	packet = smbd_request_payload(request);
    692	packet->min_version = cpu_to_le16(SMBD_V1);
    693	packet->max_version = cpu_to_le16(SMBD_V1);
    694	packet->reserved = 0;
    695	packet->credits_requested = cpu_to_le16(info->send_credit_target);
    696	packet->preferred_send_size = cpu_to_le32(info->max_send_size);
    697	packet->max_receive_size = cpu_to_le32(info->max_receive_size);
    698	packet->max_fragmented_size =
    699		cpu_to_le32(info->max_fragmented_recv_size);
    700
    701	request->num_sge = 1;
    702	request->sge[0].addr = ib_dma_map_single(
    703				info->id->device, (void *)packet,
    704				sizeof(*packet), DMA_TO_DEVICE);
    705	if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
    706		rc = -EIO;
    707		goto dma_mapping_failed;
    708	}
    709
    710	request->sge[0].length = sizeof(*packet);
    711	request->sge[0].lkey = info->pd->local_dma_lkey;
    712
    713	ib_dma_sync_single_for_device(
    714		info->id->device, request->sge[0].addr,
    715		request->sge[0].length, DMA_TO_DEVICE);
    716
    717	request->cqe.done = send_done;
    718
    719	send_wr.next = NULL;
    720	send_wr.wr_cqe = &request->cqe;
    721	send_wr.sg_list = request->sge;
    722	send_wr.num_sge = request->num_sge;
    723	send_wr.opcode = IB_WR_SEND;
    724	send_wr.send_flags = IB_SEND_SIGNALED;
    725
    726	log_rdma_send(INFO, "sge addr=%llx length=%x lkey=%x\n",
    727		request->sge[0].addr,
    728		request->sge[0].length, request->sge[0].lkey);
    729
    730	atomic_inc(&info->send_pending);
    731	rc = ib_post_send(info->id->qp, &send_wr, NULL);
    732	if (!rc)
    733		return 0;
    734
    735	/* if we reach here, post send failed */
    736	log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
    737	atomic_dec(&info->send_pending);
    738	ib_dma_unmap_single(info->id->device, request->sge[0].addr,
    739		request->sge[0].length, DMA_TO_DEVICE);
    740
    741	smbd_disconnect_rdma_connection(info);
    742
    743dma_mapping_failed:
    744	mempool_free(request, info->request_mempool);
    745	return rc;
    746}
    747
    748/*
    749 * Extend the credits to remote peer
    750 * This implements [MS-SMBD] 3.1.5.9
    751 * The idea is that we should extend credits to remote peer as quickly as
    752 * it's allowed, to maintain data flow. We allocate as many receive
    753 * buffers as possible, and extend the receive credits to the remote peer.
    754 * return value: the new credits being granted.
    755 */
    756static int manage_credits_prior_sending(struct smbd_connection *info)
    757{
    758	int new_credits;
    759
    760	spin_lock(&info->lock_new_credits_offered);
    761	new_credits = info->new_credits_offered;
    762	info->new_credits_offered = 0;
    763	spin_unlock(&info->lock_new_credits_offered);
    764
    765	return new_credits;
    766}
    767
    768/*
    769 * Check if we need to send a KEEP_ALIVE message
    770 * The idle connection timer triggers a KEEP_ALIVE message when it expires
    771 * SMB_DIRECT_RESPONSE_REQUESTED is set in the message flag to have peer send
    772 * back a response.
    773 * return value:
    774 * 1: if SMB_DIRECT_RESPONSE_REQUESTED needs to be set
    775 * 0: otherwise
    776 */
    777static int manage_keep_alive_before_sending(struct smbd_connection *info)
    778{
    779	if (info->keep_alive_requested == KEEP_ALIVE_PENDING) {
    780		info->keep_alive_requested = KEEP_ALIVE_SENT;
    781		return 1;
    782	}
    783	return 0;
    784}
    785
    786/* Post the send request */
    787static int smbd_post_send(struct smbd_connection *info,
    788		struct smbd_request *request)
    789{
    790	struct ib_send_wr send_wr;
    791	int rc, i;
    792
    793	for (i = 0; i < request->num_sge; i++) {
    794		log_rdma_send(INFO,
    795			"rdma_request sge[%d] addr=%llu length=%u\n",
    796			i, request->sge[i].addr, request->sge[i].length);
    797		ib_dma_sync_single_for_device(
    798			info->id->device,
    799			request->sge[i].addr,
    800			request->sge[i].length,
    801			DMA_TO_DEVICE);
    802	}
    803
    804	request->cqe.done = send_done;
    805
    806	send_wr.next = NULL;
    807	send_wr.wr_cqe = &request->cqe;
    808	send_wr.sg_list = request->sge;
    809	send_wr.num_sge = request->num_sge;
    810	send_wr.opcode = IB_WR_SEND;
    811	send_wr.send_flags = IB_SEND_SIGNALED;
    812
    813	rc = ib_post_send(info->id->qp, &send_wr, NULL);
    814	if (rc) {
    815		log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
    816		smbd_disconnect_rdma_connection(info);
    817		rc = -EAGAIN;
    818	} else
    819		/* Reset timer for idle connection after packet is sent */
    820		mod_delayed_work(info->workqueue, &info->idle_timer_work,
    821			info->keep_alive_interval*HZ);
    822
    823	return rc;
    824}
    825
    826static int smbd_post_send_sgl(struct smbd_connection *info,
    827	struct scatterlist *sgl, int data_length, int remaining_data_length)
    828{
    829	int num_sgs;
    830	int i, rc;
    831	int header_length;
    832	struct smbd_request *request;
    833	struct smbd_data_transfer *packet;
    834	int new_credits;
    835	struct scatterlist *sg;
    836
    837wait_credit:
    838	/* Wait for send credits. An SMBD packet needs one credit */
    839	rc = wait_event_interruptible(info->wait_send_queue,
    840		atomic_read(&info->send_credits) > 0 ||
    841		info->transport_status != SMBD_CONNECTED);
    842	if (rc)
    843		goto err_wait_credit;
    844
    845	if (info->transport_status != SMBD_CONNECTED) {
    846		log_outgoing(ERR, "disconnected not sending on wait_credit\n");
    847		rc = -EAGAIN;
    848		goto err_wait_credit;
    849	}
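       	/*
       	 * Take one send credit. If concurrent senders raced past the wait and
       	 * drove the count negative, give the credit back and wait again.
       	 */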
    850	if (unlikely(atomic_dec_return(&info->send_credits) < 0)) {
    851		atomic_inc(&info->send_credits);
    852		goto wait_credit;
    853	}
    854
    855wait_send_queue:
    856	wait_event(info->wait_post_send,
    857		atomic_read(&info->send_pending) < info->send_credit_target ||
    858		info->transport_status != SMBD_CONNECTED);
    859
    860	if (info->transport_status != SMBD_CONNECTED) {
    861		log_outgoing(ERR, "disconnected not sending on wait_send_queue\n");
    862		rc = -EAGAIN;
    863		goto err_wait_send_queue;
    864	}
    865
    866	if (unlikely(atomic_inc_return(&info->send_pending) >
    867				info->send_credit_target)) {
    868		atomic_dec(&info->send_pending);
    869		goto wait_send_queue;
    870	}
    871
    872	request = mempool_alloc(info->request_mempool, GFP_KERNEL);
    873	if (!request) {
    874		rc = -ENOMEM;
    875		goto err_alloc;
    876	}
    877
    878	request->info = info;
    879
    880	/* Fill in the packet header */
    881	packet = smbd_request_payload(request);
    882	packet->credits_requested = cpu_to_le16(info->send_credit_target);
    883
    884	new_credits = manage_credits_prior_sending(info);
    885	atomic_add(new_credits, &info->receive_credits);
    886	packet->credits_granted = cpu_to_le16(new_credits);
    887
    888	info->send_immediate = false;
    889
    890	packet->flags = 0;
    891	if (manage_keep_alive_before_sending(info))
    892		packet->flags |= cpu_to_le16(SMB_DIRECT_RESPONSE_REQUESTED);
    893
    894	packet->reserved = 0;
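       	/*
       	 * The payload, if any, starts right after the 24-byte
       	 * smbd_data_transfer header, hence data_offset of 24.
       	 */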
    895	if (!data_length)
    896		packet->data_offset = 0;
    897	else
    898		packet->data_offset = cpu_to_le32(24);
    899	packet->data_length = cpu_to_le32(data_length);
    900	packet->remaining_data_length = cpu_to_le32(remaining_data_length);
    901	packet->padding = 0;
    902
    903	log_outgoing(INFO, "credits_requested=%d credits_granted=%d data_offset=%d data_length=%d remaining_data_length=%d\n",
    904		     le16_to_cpu(packet->credits_requested),
    905		     le16_to_cpu(packet->credits_granted),
    906		     le32_to_cpu(packet->data_offset),
    907		     le32_to_cpu(packet->data_length),
    908		     le32_to_cpu(packet->remaining_data_length));
    909
    910	/* Map the packet to DMA */
    911	header_length = sizeof(struct smbd_data_transfer);
    912	/* If this is a packet without payload, don't send padding */
    913	if (!data_length)
    914		header_length = offsetof(struct smbd_data_transfer, padding);
    915
    916	request->num_sge = 1;
    917	request->sge[0].addr = ib_dma_map_single(info->id->device,
    918						 (void *)packet,
    919						 header_length,
    920						 DMA_TO_DEVICE);
    921	if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
    922		rc = -EIO;
    923		request->sge[0].addr = 0;
    924		goto err_dma;
    925	}
    926
    927	request->sge[0].length = header_length;
    928	request->sge[0].lkey = info->pd->local_dma_lkey;
    929
    930	/* Fill in the packet data payload */
    931	num_sgs = sgl ? sg_nents(sgl) : 0;
    932	for_each_sg(sgl, sg, num_sgs, i) {
    933		request->sge[i+1].addr =
    934			ib_dma_map_page(info->id->device, sg_page(sg),
    935			       sg->offset, sg->length, DMA_TO_DEVICE);
    936		if (ib_dma_mapping_error(
    937				info->id->device, request->sge[i+1].addr)) {
    938			rc = -EIO;
    939			request->sge[i+1].addr = 0;
    940			goto err_dma;
    941		}
    942		request->sge[i+1].length = sg->length;
    943		request->sge[i+1].lkey = info->pd->local_dma_lkey;
    944		request->num_sge++;
    945	}
    946
    947	rc = smbd_post_send(info, request);
    948	if (!rc)
    949		return 0;
    950
    951err_dma:
    952	for (i = 0; i < request->num_sge; i++)
    953		if (request->sge[i].addr)
    954			ib_dma_unmap_single(info->id->device,
    955					    request->sge[i].addr,
    956					    request->sge[i].length,
    957					    DMA_TO_DEVICE);
    958	mempool_free(request, info->request_mempool);
    959
    960	/* roll back receive credits and credits to be offered */
    961	spin_lock(&info->lock_new_credits_offered);
    962	info->new_credits_offered += new_credits;
    963	spin_unlock(&info->lock_new_credits_offered);
    964	atomic_sub(new_credits, &info->receive_credits);
    965
    966err_alloc:
    967	if (atomic_dec_and_test(&info->send_pending))
    968		wake_up(&info->wait_send_pending);
    969
    970err_wait_send_queue:
    971	/* roll back send credits and pending */
    972	atomic_inc(&info->send_credits);
    973
    974err_wait_credit:
    975	return rc;
    976}
    977
    978/*
    979 * Send a page
    980 * page: the page to send
    981 * offset: offset in the page to send
    982 * size: length in the page to send
    983 * remaining_data_length: remaining data to send in this payload
    984 */
    985static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
    986		unsigned long offset, size_t size, int remaining_data_length)
    987{
    988	struct scatterlist sgl;
    989
    990	sg_init_table(&sgl, 1);
    991	sg_set_page(&sgl, page, size, offset);
    992
    993	return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
    994}
    995
    996/*
    997 * Send an empty message
    998 * An empty message is used to extend credits to the peer and for keep-alive
    999 * while there is no upper layer payload to send at the time
   1000 */
   1001static int smbd_post_send_empty(struct smbd_connection *info)
   1002{
   1003	info->count_send_empty++;
   1004	return smbd_post_send_sgl(info, NULL, 0, 0);
   1005}
   1006
   1007/*
   1008 * Send a data buffer
   1009 * iov: the iov array describing the data buffers
   1010 * n_vec: number of entries in the iov array
   1011 * remaining_data_length: remaining data to send following this packet
   1012 * in a segmented SMBD payload
   1013 */
   1014static int smbd_post_send_data(
   1015	struct smbd_connection *info, struct kvec *iov, int n_vec,
   1016	int remaining_data_length)
   1017{
   1018	int i;
   1019	u32 data_length = 0;
   1020	struct scatterlist sgl[SMBDIRECT_MAX_SGE];
   1021
   1022	if (n_vec > SMBDIRECT_MAX_SGE) {
   1023		cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
   1024		return -EINVAL;
   1025	}
   1026
   1027	sg_init_table(sgl, n_vec);
   1028	for (i = 0; i < n_vec; i++) {
   1029		data_length += iov[i].iov_len;
   1030		sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
   1031	}
   1032
   1033	return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
   1034}
   1035
   1036/*
   1037 * Post a receive request to the transport
   1038 * The remote peer can only send data when a receive request is posted
   1039 * The interaction is controlled by the send/receive credit system
   1040 */
   1041static int smbd_post_recv(
   1042		struct smbd_connection *info, struct smbd_response *response)
   1043{
   1044	struct ib_recv_wr recv_wr;
   1045	int rc = -EIO;
   1046
   1047	response->sge.addr = ib_dma_map_single(
   1048				info->id->device, response->packet,
   1049				info->max_receive_size, DMA_FROM_DEVICE);
   1050	if (ib_dma_mapping_error(info->id->device, response->sge.addr))
   1051		return rc;
   1052
   1053	response->sge.length = info->max_receive_size;
   1054	response->sge.lkey = info->pd->local_dma_lkey;
   1055
   1056	response->cqe.done = recv_done;
   1057
   1058	recv_wr.wr_cqe = &response->cqe;
   1059	recv_wr.next = NULL;
   1060	recv_wr.sg_list = &response->sge;
   1061	recv_wr.num_sge = 1;
   1062
   1063	rc = ib_post_recv(info->id->qp, &recv_wr, NULL);
   1064	if (rc) {
   1065		ib_dma_unmap_single(info->id->device, response->sge.addr,
   1066				    response->sge.length, DMA_FROM_DEVICE);
   1067		smbd_disconnect_rdma_connection(info);
   1068		log_rdma_recv(ERR, "ib_post_recv failed rc=%d\n", rc);
   1069	}
   1070
   1071	return rc;
   1072}
   1073
   1074/* Perform SMBD negotiate according to [MS-SMBD] 3.1.5.2 */
   1075static int smbd_negotiate(struct smbd_connection *info)
   1076{
   1077	int rc;
   1078	struct smbd_response *response = get_receive_buffer(info);
   1079
   1080	response->type = SMBD_NEGOTIATE_RESP;
   1081	rc = smbd_post_recv(info, response);
   1082	log_rdma_event(INFO, "smbd_post_recv rc=%d iov.addr=%llx iov.length=%x iov.lkey=%x\n",
   1083		       rc, response->sge.addr,
   1084		       response->sge.length, response->sge.lkey);
   1085	if (rc)
   1086		return rc;
   1087
   1088	init_completion(&info->negotiate_completion);
   1089	info->negotiate_done = false;
   1090	rc = smbd_post_send_negotiate_req(info);
   1091	if (rc)
   1092		return rc;
   1093
   1094	rc = wait_for_completion_interruptible_timeout(
   1095		&info->negotiate_completion, SMBD_NEGOTIATE_TIMEOUT * HZ);
   1096	log_rdma_event(INFO, "wait_for_completion_timeout rc=%d\n", rc);
   1097
   1098	if (info->negotiate_done)
   1099		return 0;
   1100
   1101	if (rc == 0)
   1102		rc = -ETIMEDOUT;
   1103	else if (rc == -ERESTARTSYS)
   1104		rc = -EINTR;
   1105	else
   1106		rc = -ENOTCONN;
   1107
   1108	return rc;
   1109}
   1110
   1111static void put_empty_packet(
   1112		struct smbd_connection *info, struct smbd_response *response)
   1113{
   1114	spin_lock(&info->empty_packet_queue_lock);
   1115	list_add_tail(&response->list, &info->empty_packet_queue);
   1116	info->count_empty_packet_queue++;
   1117	spin_unlock(&info->empty_packet_queue_lock);
   1118
   1119	queue_work(info->workqueue, &info->post_send_credits_work);
   1120}
   1121
   1122/*
   1123 * Implement Connection.FragmentReassemblyBuffer defined in [MS-SMBD] 3.1.1.1
   1124 * This is a queue for reassembling upper layer payload and presenting it to
   1125 * the upper layer. All incoming payloads go to the reassembly queue, regardless
   1126 * of whether reassembly is required. The upper layer code reads from the queue for all
   1127 * incoming payloads.
   1128 * Put a received packet to the reassembly queue
   1129 * response: the packet received
   1130 * data_length: the size of payload in this packet
   1131 */
   1132static void enqueue_reassembly(
   1133	struct smbd_connection *info,
   1134	struct smbd_response *response,
   1135	int data_length)
   1136{
   1137	spin_lock(&info->reassembly_queue_lock);
   1138	list_add_tail(&response->list, &info->reassembly_queue);
   1139	info->reassembly_queue_length++;
   1140	/*
   1141	 * Make sure reassembly_data_length is updated after list and
   1142	 * reassembly_queue_length are updated. On the dequeue side
   1143	 * reassembly_data_length is checked without a lock to determine
   1144	 * if reassembly_queue_length and list is up to date
   1145	 */
   1146	virt_wmb();
   1147	info->reassembly_data_length += data_length;
   1148	spin_unlock(&info->reassembly_queue_lock);
   1149	info->count_reassembly_queue++;
   1150	info->count_enqueue_reassembly_queue++;
   1151}
   1152
   1153/*
   1154 * Get the first entry at the front of reassembly queue
   1155 * Caller is responsible for locking
   1156 * return value: the first entry if any, NULL if queue is empty
   1157 */
   1158static struct smbd_response *_get_first_reassembly(struct smbd_connection *info)
   1159{
   1160	struct smbd_response *ret = NULL;
   1161
   1162	if (!list_empty(&info->reassembly_queue)) {
   1163		ret = list_first_entry(
   1164			&info->reassembly_queue,
   1165			struct smbd_response, list);
   1166	}
   1167	return ret;
   1168}
   1169
   1170static struct smbd_response *get_empty_queue_buffer(
   1171		struct smbd_connection *info)
   1172{
   1173	struct smbd_response *ret = NULL;
   1174	unsigned long flags;
   1175
   1176	spin_lock_irqsave(&info->empty_packet_queue_lock, flags);
   1177	if (!list_empty(&info->empty_packet_queue)) {
   1178		ret = list_first_entry(
   1179			&info->empty_packet_queue,
   1180			struct smbd_response, list);
   1181		list_del(&ret->list);
   1182		info->count_empty_packet_queue--;
   1183	}
   1184	spin_unlock_irqrestore(&info->empty_packet_queue_lock, flags);
   1185
   1186	return ret;
   1187}
   1188
   1189/*
   1190 * Get a receive buffer
   1191 * For each remote send, we need to post a receive. The receive buffers are
   1192 * pre-allocated in advance.
   1193 * return value: the receive buffer, NULL if none is available
   1194 */
   1195static struct smbd_response *get_receive_buffer(struct smbd_connection *info)
   1196{
   1197	struct smbd_response *ret = NULL;
   1198	unsigned long flags;
   1199
   1200	spin_lock_irqsave(&info->receive_queue_lock, flags);
   1201	if (!list_empty(&info->receive_queue)) {
   1202		ret = list_first_entry(
   1203			&info->receive_queue,
   1204			struct smbd_response, list);
   1205		list_del(&ret->list);
   1206		info->count_receive_queue--;
   1207		info->count_get_receive_buffer++;
   1208	}
   1209	spin_unlock_irqrestore(&info->receive_queue_lock, flags);
   1210
   1211	return ret;
   1212}
   1213
   1214/*
   1215 * Return a receive buffer
   1216 * Upon return of a receive buffer, we can post a new receive and extend
   1217 * more receive credits to remote peer. This is done immediately after a
   1218 * receive buffer is returned.
   1219 */
   1220static void put_receive_buffer(
   1221	struct smbd_connection *info, struct smbd_response *response)
   1222{
   1223	unsigned long flags;
   1224
   1225	ib_dma_unmap_single(info->id->device, response->sge.addr,
   1226		response->sge.length, DMA_FROM_DEVICE);
   1227
   1228	spin_lock_irqsave(&info->receive_queue_lock, flags);
   1229	list_add_tail(&response->list, &info->receive_queue);
   1230	info->count_receive_queue++;
   1231	info->count_put_receive_buffer++;
   1232	spin_unlock_irqrestore(&info->receive_queue_lock, flags);
   1233
   1234	queue_work(info->workqueue, &info->post_send_credits_work);
   1235}
   1236
   1237/* Preallocate all receive buffers at transport establishment */
   1238static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
   1239{
   1240	int i;
   1241	struct smbd_response *response;
   1242
   1243	INIT_LIST_HEAD(&info->reassembly_queue);
   1244	spin_lock_init(&info->reassembly_queue_lock);
   1245	info->reassembly_data_length = 0;
   1246	info->reassembly_queue_length = 0;
   1247
   1248	INIT_LIST_HEAD(&info->receive_queue);
   1249	spin_lock_init(&info->receive_queue_lock);
   1250	info->count_receive_queue = 0;
   1251
   1252	INIT_LIST_HEAD(&info->empty_packet_queue);
   1253	spin_lock_init(&info->empty_packet_queue_lock);
   1254	info->count_empty_packet_queue = 0;
   1255
   1256	init_waitqueue_head(&info->wait_receive_queues);
   1257
   1258	for (i = 0; i < num_buf; i++) {
   1259		response = mempool_alloc(info->response_mempool, GFP_KERNEL);
   1260		if (!response)
   1261			goto allocate_failed;
   1262
   1263		response->info = info;
   1264		list_add_tail(&response->list, &info->receive_queue);
   1265		info->count_receive_queue++;
   1266	}
   1267
   1268	return 0;
   1269
   1270allocate_failed:
   1271	while (!list_empty(&info->receive_queue)) {
   1272		response = list_first_entry(
   1273				&info->receive_queue,
   1274				struct smbd_response, list);
   1275		list_del(&response->list);
   1276		info->count_receive_queue--;
   1277
   1278		mempool_free(response, info->response_mempool);
   1279	}
   1280	return -ENOMEM;
   1281}
   1282
   1283static void destroy_receive_buffers(struct smbd_connection *info)
   1284{
   1285	struct smbd_response *response;
   1286
   1287	while ((response = get_receive_buffer(info)))
   1288		mempool_free(response, info->response_mempool);
   1289
   1290	while ((response = get_empty_queue_buffer(info)))
   1291		mempool_free(response, info->response_mempool);
   1292}
   1293
   1294/* Implement idle connection timer [MS-SMBD] 3.1.6.2 */
   1295static void idle_connection_timer(struct work_struct *work)
   1296{
   1297	struct smbd_connection *info = container_of(
   1298					work, struct smbd_connection,
   1299					idle_timer_work.work);
   1300
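       	/*
       	 * A previous keep-alive exchange has not completed by the time the
       	 * idle timer fires; treat the connection as unresponsive.
       	 */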
   1301	if (info->keep_alive_requested != KEEP_ALIVE_NONE) {
   1302		log_keep_alive(ERR,
   1303			"error status info->keep_alive_requested=%d\n",
   1304			info->keep_alive_requested);
   1305		smbd_disconnect_rdma_connection(info);
   1306		return;
   1307	}
   1308
   1309	log_keep_alive(INFO, "about to send an empty idle message\n");
   1310	smbd_post_send_empty(info);
   1311
   1312	/* Setup the next idle timeout work */
   1313	queue_delayed_work(info->workqueue, &info->idle_timer_work,
   1314			info->keep_alive_interval*HZ);
   1315}
   1316
   1317/*
   1318 * Destroy the transport and related RDMA and memory resources
   1319 * Need to go through all the pending counters and make sure no one is using
   1320 * the transport while it is destroyed
   1321 */
   1322void smbd_destroy(struct TCP_Server_Info *server)
   1323{
   1324	struct smbd_connection *info = server->smbd_conn;
   1325	struct smbd_response *response;
   1326	unsigned long flags;
   1327
   1328	if (!info) {
   1329		log_rdma_event(INFO, "rdma session already destroyed\n");
   1330		return;
   1331	}
   1332
   1333	log_rdma_event(INFO, "destroying rdma session\n");
   1334	if (info->transport_status != SMBD_DISCONNECTED) {
   1335		rdma_disconnect(server->smbd_conn->id);
   1336		log_rdma_event(INFO, "wait for transport being disconnected\n");
   1337		wait_event_interruptible(
   1338			info->disconn_wait,
   1339			info->transport_status == SMBD_DISCONNECTED);
   1340	}
   1341
   1342	log_rdma_event(INFO, "destroying qp\n");
   1343	ib_drain_qp(info->id->qp);
   1344	rdma_destroy_qp(info->id);
   1345
   1346	log_rdma_event(INFO, "cancelling idle timer\n");
   1347	cancel_delayed_work_sync(&info->idle_timer_work);
   1348
   1349	log_rdma_event(INFO, "wait for all send posted to IB to finish\n");
   1350	wait_event(info->wait_send_pending,
   1351		atomic_read(&info->send_pending) == 0);
   1352
   1353	/* It's not possible for upper layer to get to reassembly */
   1354	log_rdma_event(INFO, "drain the reassembly queue\n");
   1355	do {
   1356		spin_lock_irqsave(&info->reassembly_queue_lock, flags);
   1357		response = _get_first_reassembly(info);
   1358		if (response) {
   1359			list_del(&response->list);
   1360			spin_unlock_irqrestore(
   1361				&info->reassembly_queue_lock, flags);
   1362			put_receive_buffer(info, response);
   1363		} else
   1364			spin_unlock_irqrestore(
   1365				&info->reassembly_queue_lock, flags);
   1366	} while (response);
   1367	info->reassembly_data_length = 0;
   1368
   1369	log_rdma_event(INFO, "free receive buffers\n");
   1370	wait_event(info->wait_receive_queues,
   1371		info->count_receive_queue + info->count_empty_packet_queue
   1372			== info->receive_credit_max);
   1373	destroy_receive_buffers(info);
   1374
   1375	/*
   1376	 * For performance reasons, memory registration and deregistration
   1377	 * are not locked by srv_mutex. It is possible some processes are
   1378	 * blocked on transport srv_mutex while holding memory registration.
   1379	 * Release the transport srv_mutex to allow them to hit the failure
   1380	 * path when sending data, and then release memory registrations.
   1381	 */
   1382	log_rdma_event(INFO, "freeing mr list\n");
   1383	wake_up_interruptible_all(&info->wait_mr);
   1384	while (atomic_read(&info->mr_used_count)) {
   1385		cifs_server_unlock(server);
   1386		msleep(1000);
   1387		cifs_server_lock(server);
   1388	}
   1389	destroy_mr_list(info);
   1390
   1391	ib_free_cq(info->send_cq);
   1392	ib_free_cq(info->recv_cq);
   1393	ib_dealloc_pd(info->pd);
   1394	rdma_destroy_id(info->id);
   1395
   1396	/* free mempools */
   1397	mempool_destroy(info->request_mempool);
   1398	kmem_cache_destroy(info->request_cache);
   1399
   1400	mempool_destroy(info->response_mempool);
   1401	kmem_cache_destroy(info->response_cache);
   1402
   1403	info->transport_status = SMBD_DESTROYED;
   1404
   1405	destroy_workqueue(info->workqueue);
   1406	log_rdma_event(INFO,  "rdma session destroyed\n");
   1407	kfree(info);
   1408}
   1409
   1410/*
   1411 * Reconnect this SMBD connection, called from upper layer
   1412 * return value: 0 on success, or actual error code
   1413 */
   1414int smbd_reconnect(struct TCP_Server_Info *server)
   1415{
   1416	log_rdma_event(INFO, "reconnecting rdma session\n");
   1417
   1418	if (!server->smbd_conn) {
   1419		log_rdma_event(INFO, "rdma session already destroyed\n");
   1420		goto create_conn;
   1421	}
   1422
   1423	/*
   1424	 * This is possible if the transport is disconnected and we haven't received
   1425	 * a notification from RDMA, but the upper layer has detected a timeout
   1426	 */
   1427	if (server->smbd_conn->transport_status == SMBD_CONNECTED) {
   1428		log_rdma_event(INFO, "disconnecting transport\n");
   1429		smbd_destroy(server);
   1430	}
   1431
   1432create_conn:
   1433	log_rdma_event(INFO, "creating rdma session\n");
   1434	server->smbd_conn = smbd_get_connection(
   1435		server, (struct sockaddr *) &server->dstaddr);
   1436
   1437	if (server->smbd_conn)
   1438		cifs_dbg(VFS, "RDMA transport re-established\n");
   1439
   1440	return server->smbd_conn ? 0 : -ENOENT;
   1441}
   1442
   1443static void destroy_caches_and_workqueue(struct smbd_connection *info)
   1444{
   1445	destroy_receive_buffers(info);
   1446	destroy_workqueue(info->workqueue);
   1447	mempool_destroy(info->response_mempool);
   1448	kmem_cache_destroy(info->response_cache);
   1449	mempool_destroy(info->request_mempool);
   1450	kmem_cache_destroy(info->request_cache);
   1451}
   1452
   1453#define MAX_NAME_LEN	80
   1454static int allocate_caches_and_workqueue(struct smbd_connection *info)
   1455{
   1456	char name[MAX_NAME_LEN];
   1457	int rc;
   1458
   1459	scnprintf(name, MAX_NAME_LEN, "smbd_request_%p", info);
   1460	info->request_cache =
   1461		kmem_cache_create(
   1462			name,
   1463			sizeof(struct smbd_request) +
   1464				sizeof(struct smbd_data_transfer),
   1465			0, SLAB_HWCACHE_ALIGN, NULL);
   1466	if (!info->request_cache)
   1467		return -ENOMEM;
   1468
   1469	info->request_mempool =
   1470		mempool_create(info->send_credit_target, mempool_alloc_slab,
   1471			mempool_free_slab, info->request_cache);
   1472	if (!info->request_mempool)
   1473		goto out1;
   1474
   1475	scnprintf(name, MAX_NAME_LEN, "smbd_response_%p", info);
   1476	info->response_cache =
   1477		kmem_cache_create(
   1478			name,
   1479			sizeof(struct smbd_response) +
   1480				info->max_receive_size,
   1481			0, SLAB_HWCACHE_ALIGN, NULL);
   1482	if (!info->response_cache)
   1483		goto out2;
   1484
   1485	info->response_mempool =
   1486		mempool_create(info->receive_credit_max, mempool_alloc_slab,
   1487		       mempool_free_slab, info->response_cache);
   1488	if (!info->response_mempool)
   1489		goto out3;
   1490
   1491	scnprintf(name, MAX_NAME_LEN, "smbd_%p", info);
   1492	info->workqueue = create_workqueue(name);
   1493	if (!info->workqueue)
   1494		goto out4;
   1495
   1496	rc = allocate_receive_buffers(info, info->receive_credit_max);
   1497	if (rc) {
   1498		log_rdma_event(ERR, "failed to allocate receive buffers\n");
   1499		goto out5;
   1500	}
   1501
   1502	return 0;
   1503
   1504out5:
   1505	destroy_workqueue(info->workqueue);
   1506out4:
   1507	mempool_destroy(info->response_mempool);
   1508out3:
   1509	kmem_cache_destroy(info->response_cache);
   1510out2:
   1511	mempool_destroy(info->request_mempool);
   1512out1:
   1513	kmem_cache_destroy(info->request_cache);
   1514	return -ENOMEM;
   1515}
   1516
   1517/* Create a SMBD connection, called by upper layer */
   1518static struct smbd_connection *_smbd_get_connection(
   1519	struct TCP_Server_Info *server, struct sockaddr *dstaddr, int port)
   1520{
   1521	int rc;
   1522	struct smbd_connection *info;
   1523	struct rdma_conn_param conn_param;
   1524	struct ib_qp_init_attr qp_attr;
   1525	struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
   1526	struct ib_port_immutable port_immutable;
   1527	u32 ird_ord_hdr[2];
   1528
   1529	info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
   1530	if (!info)
   1531		return NULL;
   1532
   1533	info->transport_status = SMBD_CONNECTING;
   1534	rc = smbd_ia_open(info, dstaddr, port);
   1535	if (rc) {
   1536		log_rdma_event(INFO, "smbd_ia_open rc=%d\n", rc);
   1537		goto create_id_failed;
   1538	}
   1539
   1540	if (smbd_send_credit_target > info->id->device->attrs.max_cqe ||
   1541	    smbd_send_credit_target > info->id->device->attrs.max_qp_wr) {
   1542		log_rdma_event(ERR, "consider lowering send_credit_target = %d. Possible CQE overrun, device reporting max_cqe %d max_qp_wr %d\n",
   1543			       smbd_send_credit_target,
   1544			       info->id->device->attrs.max_cqe,
   1545			       info->id->device->attrs.max_qp_wr);
   1546		goto config_failed;
   1547	}
   1548
   1549	if (smbd_receive_credit_max > info->id->device->attrs.max_cqe ||
   1550	    smbd_receive_credit_max > info->id->device->attrs.max_qp_wr) {
   1551		log_rdma_event(ERR, "consider lowering receive_credit_max = %d. Possible CQE overrun, device reporting max_cqe %d max_qp_wr %d\n",
   1552			       smbd_receive_credit_max,
   1553			       info->id->device->attrs.max_cqe,
   1554			       info->id->device->attrs.max_qp_wr);
   1555		goto config_failed;
   1556	}
   1557
   1558	info->receive_credit_max = smbd_receive_credit_max;
   1559	info->send_credit_target = smbd_send_credit_target;
   1560	info->max_send_size = smbd_max_send_size;
   1561	info->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
   1562	info->max_receive_size = smbd_max_receive_size;
   1563	info->keep_alive_interval = smbd_keep_alive_interval;
   1564
   1565	if (info->id->device->attrs.max_send_sge < SMBDIRECT_MAX_SGE) {
   1566		log_rdma_event(ERR,
   1567			"warning: device max_send_sge = %d too small\n",
   1568			info->id->device->attrs.max_send_sge);
   1569		log_rdma_event(ERR, "Queue Pair creation may fail\n");
   1570	}
   1571	if (info->id->device->attrs.max_recv_sge < SMBDIRECT_MAX_SGE) {
   1572		log_rdma_event(ERR,
   1573			"warning: device max_recv_sge = %d too small\n",
   1574			info->id->device->attrs.max_recv_sge);
   1575		log_rdma_event(ERR, "Queue Pair creation may fail\n");
   1576	}
   1577
   1578	info->send_cq = NULL;
   1579	info->recv_cq = NULL;
   1580	info->send_cq =
   1581		ib_alloc_cq_any(info->id->device, info,
   1582				info->send_credit_target, IB_POLL_SOFTIRQ);
   1583	if (IS_ERR(info->send_cq)) {
   1584		info->send_cq = NULL;
   1585		goto alloc_cq_failed;
   1586	}
   1587
   1588	info->recv_cq =
   1589		ib_alloc_cq_any(info->id->device, info,
   1590				info->receive_credit_max, IB_POLL_SOFTIRQ);
   1591	if (IS_ERR(info->recv_cq)) {
   1592		info->recv_cq = NULL;
   1593		goto alloc_cq_failed;
   1594	}
   1595
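        	/*
        	 * Size the RC queue pair from the credit limits: one send WR
        	 * per send credit and one receive WR per receive credit, each
        	 * with up to SMBDIRECT_MAX_SGE scatter/gather entries.
        	 */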
   1596	memset(&qp_attr, 0, sizeof(qp_attr));
   1597	qp_attr.event_handler = smbd_qp_async_error_upcall;
   1598	qp_attr.qp_context = info;
   1599	qp_attr.cap.max_send_wr = info->send_credit_target;
   1600	qp_attr.cap.max_recv_wr = info->receive_credit_max;
   1601	qp_attr.cap.max_send_sge = SMBDIRECT_MAX_SGE;
   1602	qp_attr.cap.max_recv_sge = SMBDIRECT_MAX_SGE;
   1603	qp_attr.cap.max_inline_data = 0;
   1604	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
   1605	qp_attr.qp_type = IB_QPT_RC;
   1606	qp_attr.send_cq = info->send_cq;
   1607	qp_attr.recv_cq = info->recv_cq;
   1608	qp_attr.port_num = ~0;
   1609
   1610	rc = rdma_create_qp(info->id, info->pd, &qp_attr);
   1611	if (rc) {
   1612		log_rdma_event(ERR, "rdma_create_qp failed %i\n", rc);
   1613		goto create_qp_failed;
   1614	}
   1615
   1616	memset(&conn_param, 0, sizeof(conn_param));
   1617	conn_param.initiator_depth = 0;
   1618
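        	/*
        	 * Clamp the advertised responder resources to what the device
        	 * supports; e.g. if max_qp_rd_atom is 16, advertise 16 rather
        	 * than the default SMBD_CM_RESPONDER_RESOURCES of 32.
        	 */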
   1619	conn_param.responder_resources =
   1620		info->id->device->attrs.max_qp_rd_atom
   1621			< SMBD_CM_RESPONDER_RESOURCES ?
   1622		info->id->device->attrs.max_qp_rd_atom :
   1623		SMBD_CM_RESPONDER_RESOURCES;
   1624	info->responder_resources = conn_param.responder_resources;
   1625	log_rdma_mr(INFO, "responder_resources=%d\n",
   1626		info->responder_resources);
   1627
   1628	/* Need to send IRD/ORD in private data for iWARP */
   1629	info->id->device->ops.get_port_immutable(
   1630		info->id->device, info->id->port_num, &port_immutable);
   1631	if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
   1632		ird_ord_hdr[0] = info->responder_resources;
   1633		ird_ord_hdr[1] = 1;
   1634		conn_param.private_data = ird_ord_hdr;
   1635		conn_param.private_data_len = sizeof(ird_ord_hdr);
   1636	} else {
   1637		conn_param.private_data = NULL;
   1638		conn_param.private_data_len = 0;
   1639	}
   1640
   1641	conn_param.retry_count = SMBD_CM_RETRY;
   1642	conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
   1643	conn_param.flow_control = 0;
   1644
   1645	log_rdma_event(INFO, "connecting to IP %pI4 port %d\n",
   1646		&addr_in->sin_addr, port);
   1647
   1648	init_waitqueue_head(&info->conn_wait);
   1649	init_waitqueue_head(&info->disconn_wait);
   1650	init_waitqueue_head(&info->wait_reassembly_queue);
   1651	rc = rdma_connect(info->id, &conn_param);
   1652	if (rc) {
   1653		log_rdma_event(ERR, "rdma_connect() failed with %i\n", rc);
   1654		goto rdma_connect_failed;
   1655	}
   1656
   1657	wait_event_interruptible(
   1658		info->conn_wait, info->transport_status != SMBD_CONNECTING);
   1659
   1660	if (info->transport_status != SMBD_CONNECTED) {
   1661		log_rdma_event(ERR, "rdma_connect failed port=%d\n", port);
   1662		goto rdma_connect_failed;
   1663	}
   1664
   1665	log_rdma_event(INFO, "rdma_connect connected\n");
   1666
   1667	rc = allocate_caches_and_workqueue(info);
   1668	if (rc) {
   1669		log_rdma_event(ERR, "cache allocation failed\n");
   1670		goto allocate_cache_failed;
   1671	}
   1672
   1673	init_waitqueue_head(&info->wait_send_queue);
   1674	INIT_DELAYED_WORK(&info->idle_timer_work, idle_connection_timer);
   1675	queue_delayed_work(info->workqueue, &info->idle_timer_work,
   1676		info->keep_alive_interval*HZ);
   1677
   1678	init_waitqueue_head(&info->wait_send_pending);
   1679	atomic_set(&info->send_pending, 0);
   1680
   1681	init_waitqueue_head(&info->wait_post_send);
   1682
   1683	INIT_WORK(&info->disconnect_work, smbd_disconnect_rdma_work);
   1684	INIT_WORK(&info->post_send_credits_work, smbd_post_send_credits);
   1685	info->new_credits_offered = 0;
   1686	spin_lock_init(&info->lock_new_credits_offered);
   1687
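        	/*
        	 * Exchange the SMBD negotiate request/response with the peer
        	 * to settle on credits and transfer sizes before any upper
        	 * layer data is sent.
        	 */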
   1688	rc = smbd_negotiate(info);
   1689	if (rc) {
   1690		log_rdma_event(ERR, "smbd_negotiate rc=%d\n", rc);
   1691		goto negotiation_failed;
   1692	}
   1693
   1694	rc = allocate_mr_list(info);
   1695	if (rc) {
   1696		log_rdma_mr(ERR, "memory registration allocation failed\n");
   1697		goto allocate_mr_failed;
   1698	}
   1699
   1700	return info;
   1701
   1702allocate_mr_failed:
    1703	/* At this point, we need a full transport shutdown */
   1704	smbd_destroy(server);
   1705	return NULL;
   1706
   1707negotiation_failed:
   1708	cancel_delayed_work_sync(&info->idle_timer_work);
   1709	destroy_caches_and_workqueue(info);
   1710	info->transport_status = SMBD_NEGOTIATE_FAILED;
   1711	init_waitqueue_head(&info->conn_wait);
   1712	rdma_disconnect(info->id);
   1713	wait_event(info->conn_wait,
   1714		info->transport_status == SMBD_DISCONNECTED);
   1715
   1716allocate_cache_failed:
   1717rdma_connect_failed:
   1718	rdma_destroy_qp(info->id);
   1719
   1720create_qp_failed:
   1721alloc_cq_failed:
   1722	if (info->send_cq)
   1723		ib_free_cq(info->send_cq);
   1724	if (info->recv_cq)
   1725		ib_free_cq(info->recv_cq);
   1726
   1727config_failed:
   1728	ib_dealloc_pd(info->pd);
   1729	rdma_destroy_id(info->id);
   1730
   1731create_id_failed:
   1732	kfree(info);
   1733	return NULL;
   1734}
   1735
   1736struct smbd_connection *smbd_get_connection(
   1737	struct TCP_Server_Info *server, struct sockaddr *dstaddr)
   1738{
   1739	struct smbd_connection *ret;
   1740	int port = SMBD_PORT;
   1741
   1742try_again:
   1743	ret = _smbd_get_connection(server, dstaddr, port);
   1744
   1745	/* Try SMB_PORT if SMBD_PORT doesn't work */
   1746	if (!ret && port == SMBD_PORT) {
   1747		port = SMB_PORT;
   1748		goto try_again;
   1749	}
   1750	return ret;
   1751}
   1752
   1753/*
   1754 * Receive data from receive reassembly queue
   1755 * All the incoming data packets are placed in reassembly queue
   1756 * buf: the buffer to read data into
   1757 * size: the length of data to read
   1758 * return value: actual data read
    1759 * Note: this implementation copies the data from the reassembly queue to receive
   1760 * buffers used by upper layer. This is not the optimal code path. A better way
   1761 * to do it is to not have upper layer allocate its receive buffers but rather
   1762 * borrow the buffer from reassembly queue, and return it after data is
   1763 * consumed. But this will require more changes to upper layer code, and also
    1764 * needs to consider packet boundaries while they are still being reassembled.
   1765 */
   1766static int smbd_recv_buf(struct smbd_connection *info, char *buf,
   1767		unsigned int size)
   1768{
   1769	struct smbd_response *response;
   1770	struct smbd_data_transfer *data_transfer;
   1771	int to_copy, to_read, data_read, offset;
   1772	u32 data_length, remaining_data_length, data_offset;
   1773	int rc;
   1774
   1775again:
   1776	/*
   1777	 * No need to hold the reassembly queue lock all the time as we are
   1778	 * the only one reading from the front of the queue. The transport
   1779	 * may add more entries to the back of the queue at the same time
   1780	 */
   1781	log_read(INFO, "size=%d info->reassembly_data_length=%d\n", size,
   1782		info->reassembly_data_length);
   1783	if (info->reassembly_data_length >= size) {
   1784		int queue_length;
   1785		int queue_removed = 0;
   1786
   1787		/*
   1788		 * Need to make sure reassembly_data_length is read before
   1789		 * reading reassembly_queue_length and calling
   1790		 * _get_first_reassembly. This call is lock free
    1791		 * as we never read from the end of the queue, which is being
   1792		 * updated in SOFTIRQ as more data is received
   1793		 */
   1794		virt_rmb();
   1795		queue_length = info->reassembly_queue_length;
   1796		data_read = 0;
   1797		to_read = size;
   1798		offset = info->first_entry_offset;
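        		/*
        		 * Copy from the packets at the front of the reassembly
        		 * queue until "size" bytes have been consumed; a partially
        		 * consumed packet is tracked via first_entry_offset.
        		 */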
   1799		while (data_read < size) {
   1800			response = _get_first_reassembly(info);
   1801			data_transfer = smbd_response_payload(response);
   1802			data_length = le32_to_cpu(data_transfer->data_length);
   1803			remaining_data_length =
   1804				le32_to_cpu(
   1805					data_transfer->remaining_data_length);
   1806			data_offset = le32_to_cpu(data_transfer->data_offset);
   1807
   1808			/*
   1809			 * The upper layer expects RFC1002 length at the
   1810			 * beginning of the payload. Return it to indicate
    1811			 * the total length of the packet. This minimizes the
    1812			 * change to upper layer packet processing logic. This
    1813			 * will eventually be removed when an intermediate
   1814			 * transport layer is added
   1815			 */
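        			/*
        			 * For example, a first data transfer carrying
        			 * data_length 1364 and remaining_data_length 64172
        			 * yields an RFC1002 length of 1364 + 64172 = 65536.
        			 */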
   1816			if (response->first_segment && size == 4) {
   1817				unsigned int rfc1002_len =
   1818					data_length + remaining_data_length;
   1819				*((__be32 *)buf) = cpu_to_be32(rfc1002_len);
   1820				data_read = 4;
   1821				response->first_segment = false;
   1822				log_read(INFO, "returning rfc1002 length %d\n",
   1823					rfc1002_len);
   1824				goto read_rfc1002_done;
   1825			}
   1826
   1827			to_copy = min_t(int, data_length - offset, to_read);
   1828			memcpy(
   1829				buf + data_read,
   1830				(char *)data_transfer + data_offset + offset,
   1831				to_copy);
   1832
   1833			/* move on to the next buffer? */
   1834			if (to_copy == data_length - offset) {
   1835				queue_length--;
   1836				/*
   1837				 * No need to lock if we are not at the
   1838				 * end of the queue
   1839				 */
   1840				if (queue_length)
   1841					list_del(&response->list);
   1842				else {
   1843					spin_lock_irq(
   1844						&info->reassembly_queue_lock);
   1845					list_del(&response->list);
   1846					spin_unlock_irq(
   1847						&info->reassembly_queue_lock);
   1848				}
   1849				queue_removed++;
   1850				info->count_reassembly_queue--;
   1851				info->count_dequeue_reassembly_queue++;
   1852				put_receive_buffer(info, response);
   1853				offset = 0;
   1854				log_read(INFO, "put_receive_buffer offset=0\n");
   1855			} else
   1856				offset += to_copy;
   1857
   1858			to_read -= to_copy;
   1859			data_read += to_copy;
   1860
   1861			log_read(INFO, "_get_first_reassembly memcpy %d bytes data_transfer_length-offset=%d after that to_read=%d data_read=%d offset=%d\n",
   1862				 to_copy, data_length - offset,
   1863				 to_read, data_read, offset);
   1864		}
   1865
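        		/*
        		 * Update the shared counters under the lock, since the
        		 * receive path also adjusts them from SOFTIRQ context as
        		 * new packets are queued.
        		 */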
   1866		spin_lock_irq(&info->reassembly_queue_lock);
   1867		info->reassembly_data_length -= data_read;
   1868		info->reassembly_queue_length -= queue_removed;
   1869		spin_unlock_irq(&info->reassembly_queue_lock);
   1870
   1871		info->first_entry_offset = offset;
   1872		log_read(INFO, "returning to thread data_read=%d reassembly_data_length=%d first_entry_offset=%d\n",
   1873			 data_read, info->reassembly_data_length,
   1874			 info->first_entry_offset);
   1875read_rfc1002_done:
   1876		return data_read;
   1877	}
   1878
   1879	log_read(INFO, "wait_event on more data\n");
   1880	rc = wait_event_interruptible(
   1881		info->wait_reassembly_queue,
   1882		info->reassembly_data_length >= size ||
   1883			info->transport_status != SMBD_CONNECTED);
   1884	/* Don't return any data if interrupted */
   1885	if (rc)
   1886		return rc;
   1887
   1888	if (info->transport_status != SMBD_CONNECTED) {
   1889		log_read(ERR, "disconnected\n");
   1890		return -ECONNABORTED;
   1891	}
   1892
   1893	goto again;
   1894}
   1895
   1896/*
   1897 * Receive a page from receive reassembly queue
   1898 * page: the page to read data into
   1899 * to_read: the length of data to read
   1900 * return value: actual data read
   1901 */
   1902static int smbd_recv_page(struct smbd_connection *info,
   1903		struct page *page, unsigned int page_offset,
   1904		unsigned int to_read)
   1905{
   1906	int ret;
   1907	char *to_address;
   1908	void *page_address;
   1909
   1910	/* make sure we have the page ready for read */
   1911	ret = wait_event_interruptible(
   1912		info->wait_reassembly_queue,
   1913		info->reassembly_data_length >= to_read ||
   1914			info->transport_status != SMBD_CONNECTED);
   1915	if (ret)
   1916		return ret;
   1917
   1918	/* now we can read from reassembly queue and not sleep */
   1919	page_address = kmap_atomic(page);
   1920	to_address = (char *) page_address + page_offset;
   1921
   1922	log_read(INFO, "reading from page=%p address=%p to_read=%d\n",
   1923		page, to_address, to_read);
   1924
   1925	ret = smbd_recv_buf(info, to_address, to_read);
   1926	kunmap_atomic(page_address);
   1927
   1928	return ret;
   1929}
   1930
   1931/*
   1932 * Receive data from transport
    1933 * msg: a msghdr pointing to the buffer, can be ITER_KVEC or ITER_BVEC
   1934 * return: total bytes read, or 0. SMB Direct will not do partial read.
   1935 */
   1936int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
   1937{
   1938	char *buf;
   1939	struct page *page;
   1940	unsigned int to_read, page_offset;
   1941	int rc;
   1942
   1943	if (iov_iter_rw(&msg->msg_iter) == WRITE) {
    1944		/* It's a bug in the upper layer to get here */
   1945		cifs_dbg(VFS, "Invalid msg iter dir %u\n",
   1946			 iov_iter_rw(&msg->msg_iter));
   1947		rc = -EINVAL;
   1948		goto out;
   1949	}
   1950
   1951	switch (iov_iter_type(&msg->msg_iter)) {
   1952	case ITER_KVEC:
   1953		buf = msg->msg_iter.kvec->iov_base;
   1954		to_read = msg->msg_iter.kvec->iov_len;
   1955		rc = smbd_recv_buf(info, buf, to_read);
   1956		break;
   1957
   1958	case ITER_BVEC:
   1959		page = msg->msg_iter.bvec->bv_page;
   1960		page_offset = msg->msg_iter.bvec->bv_offset;
   1961		to_read = msg->msg_iter.bvec->bv_len;
   1962		rc = smbd_recv_page(info, page, page_offset, to_read);
   1963		break;
   1964
   1965	default:
    1966		/* It's a bug in the upper layer to get here */
   1967		cifs_dbg(VFS, "Invalid msg type %d\n",
   1968			 iov_iter_type(&msg->msg_iter));
   1969		rc = -EINVAL;
   1970	}
   1971
   1972out:
   1973	/* SMBDirect will read it all or nothing */
   1974	if (rc > 0)
   1975		msg->msg_iter.count = 0;
   1976	return rc;
   1977}
   1978
   1979/*
   1980 * Send data to transport
   1981 * Each rqst is transported as a SMBDirect payload
   1982 * rqst: the data to write
    1983 * return value: 0 if the write succeeds, otherwise error code
   1984 */
   1985int smbd_send(struct TCP_Server_Info *server,
   1986	int num_rqst, struct smb_rqst *rqst_array)
   1987{
   1988	struct smbd_connection *info = server->smbd_conn;
   1989	struct kvec vec;
   1990	int nvecs;
   1991	int size;
   1992	unsigned int buflen, remaining_data_length;
   1993	int start, i, j;
   1994	int max_iov_size =
   1995		info->max_send_size - sizeof(struct smbd_data_transfer);
   1996	struct kvec *iov;
   1997	int rc;
   1998	struct smb_rqst *rqst;
   1999	int rqst_idx;
   2000
   2001	if (info->transport_status != SMBD_CONNECTED) {
   2002		rc = -EAGAIN;
   2003		goto done;
   2004	}
   2005
   2006	/*
   2007	 * Add in the page array if there is one. The caller needs to set
   2008	 * rq_tailsz to PAGE_SIZE when the buffer has multiple pages and
    2009	 * ends at a page boundary
   2010	 */
   2011	remaining_data_length = 0;
   2012	for (i = 0; i < num_rqst; i++)
   2013		remaining_data_length += smb_rqst_len(server, &rqst_array[i]);
   2014
   2015	if (remaining_data_length > info->max_fragmented_send_size) {
   2016		log_write(ERR, "payload size %d > max size %d\n",
   2017			remaining_data_length, info->max_fragmented_send_size);
   2018		rc = -EINVAL;
   2019		goto done;
   2020	}
   2021
   2022	log_write(INFO, "num_rqst=%d total length=%u\n",
   2023			num_rqst, remaining_data_length);
   2024
   2025	rqst_idx = 0;
   2026next_rqst:
   2027	rqst = &rqst_array[rqst_idx];
   2028	iov = rqst->rq_iov;
   2029
   2030	cifs_dbg(FYI, "Sending smb (RDMA): idx=%d smb_len=%lu\n",
   2031		rqst_idx, smb_rqst_len(server, rqst));
   2032	for (i = 0; i < rqst->rq_nvec; i++)
   2033		dump_smb(iov[i].iov_base, iov[i].iov_len);
   2034
   2035
   2036	log_write(INFO, "rqst_idx=%d nvec=%d rqst->rq_npages=%d rq_pagesz=%d rq_tailsz=%d buflen=%lu\n",
   2037		  rqst_idx, rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
   2038		  rqst->rq_tailsz, smb_rqst_len(server, rqst));
   2039
   2040	start = i = 0;
   2041	buflen = 0;
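        	/*
        	 * Pack consecutive iovs into SMBD sends of at most max_iov_size
        	 * payload bytes. For illustration, with a hypothetical
        	 * max_iov_size of 8192 and iovs of 4096 + 4096 + 4096 bytes, the
        	 * first two iovs go out in one send and the third in the next;
        	 * a single iov larger than max_iov_size is split across sends.
        	 */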
   2042	while (true) {
   2043		buflen += iov[i].iov_len;
   2044		if (buflen > max_iov_size) {
   2045			if (i > start) {
   2046				remaining_data_length -=
   2047					(buflen-iov[i].iov_len);
   2048				log_write(INFO, "sending iov[] from start=%d i=%d nvecs=%d remaining_data_length=%d\n",
   2049					  start, i, i - start,
   2050					  remaining_data_length);
   2051				rc = smbd_post_send_data(
   2052					info, &iov[start], i-start,
   2053					remaining_data_length);
   2054				if (rc)
   2055					goto done;
   2056			} else {
   2057				/* iov[start] is too big, break it */
   2058				nvecs = (buflen+max_iov_size-1)/max_iov_size;
   2059				log_write(INFO, "iov[%d] iov_base=%p buflen=%d break to %d vectors\n",
   2060					  start, iov[start].iov_base,
   2061					  buflen, nvecs);
   2062				for (j = 0; j < nvecs; j++) {
   2063					vec.iov_base =
   2064						(char *)iov[start].iov_base +
   2065						j*max_iov_size;
   2066					vec.iov_len = max_iov_size;
   2067					if (j == nvecs-1)
   2068						vec.iov_len =
   2069							buflen -
   2070							max_iov_size*(nvecs-1);
   2071					remaining_data_length -= vec.iov_len;
   2072					log_write(INFO,
   2073						"sending vec j=%d iov_base=%p iov_len=%zu remaining_data_length=%d\n",
   2074						  j, vec.iov_base, vec.iov_len,
   2075						  remaining_data_length);
   2076					rc = smbd_post_send_data(
   2077						info, &vec, 1,
   2078						remaining_data_length);
   2079					if (rc)
   2080						goto done;
   2081				}
   2082				i++;
   2083				if (i == rqst->rq_nvec)
   2084					break;
   2085			}
   2086			start = i;
   2087			buflen = 0;
   2088		} else {
   2089			i++;
   2090			if (i == rqst->rq_nvec) {
   2091				/* send out all remaining vecs */
   2092				remaining_data_length -= buflen;
   2093				log_write(INFO, "sending iov[] from start=%d i=%d nvecs=%d remaining_data_length=%d\n",
   2094					  start, i, i - start,
   2095					  remaining_data_length);
   2096				rc = smbd_post_send_data(info, &iov[start],
   2097					i-start, remaining_data_length);
   2098				if (rc)
   2099					goto done;
   2100				break;
   2101			}
   2102		}
   2103		log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
   2104	}
   2105
   2106	/* now sending pages if there are any */
   2107	for (i = 0; i < rqst->rq_npages; i++) {
   2108		unsigned int offset;
   2109
   2110		rqst_page_get_length(rqst, i, &buflen, &offset);
   2111		nvecs = (buflen + max_iov_size - 1) / max_iov_size;
   2112		log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
   2113			buflen, nvecs);
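        		/*
        		 * nvecs is a ceiling division; e.g. a 4096-byte page range
        		 * with a negotiated max_iov_size of 1340 goes out as chunks
        		 * of 1340, 1340, 1340 and 76 bytes.
        		 */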
   2114		for (j = 0; j < nvecs; j++) {
   2115			size = max_iov_size;
   2116			if (j == nvecs-1)
   2117				size = buflen - j*max_iov_size;
   2118			remaining_data_length -= size;
   2119			log_write(INFO, "sending pages i=%d offset=%d size=%d remaining_data_length=%d\n",
   2120				  i, j * max_iov_size + offset, size,
   2121				  remaining_data_length);
   2122			rc = smbd_post_send_page(
   2123				info, rqst->rq_pages[i],
   2124				j*max_iov_size + offset,
   2125				size, remaining_data_length);
   2126			if (rc)
   2127				goto done;
   2128		}
   2129	}
   2130
   2131	rqst_idx++;
   2132	if (rqst_idx < num_rqst)
   2133		goto next_rqst;
   2134
   2135done:
   2136	/*
   2137	 * As an optimization, we don't wait for individual I/O to finish
   2138	 * before sending the next one.
    2139	 * Send them all and wait for the pending send count to get to 0,
    2140	 * which means all the I/Os have been sent out and we are good to return
   2141	 */
   2142
   2143	wait_event(info->wait_send_pending,
   2144		atomic_read(&info->send_pending) == 0);
   2145
   2146	return rc;
   2147}
   2148
   2149static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
   2150{
   2151	struct smbd_mr *mr;
   2152	struct ib_cqe *cqe;
   2153
   2154	if (wc->status) {
   2155		log_rdma_mr(ERR, "status=%d\n", wc->status);
   2156		cqe = wc->wr_cqe;
   2157		mr = container_of(cqe, struct smbd_mr, cqe);
   2158		smbd_disconnect_rdma_connection(mr->conn);
   2159	}
   2160}
   2161
   2162/*
   2163 * The work queue function that recovers MRs
   2164 * We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
   2165 * again. Both calls are slow, so finish them in a workqueue. This will not
    2166 * block the I/O path.
    2167 * There is only one workqueue that recovers MRs, so there is no need to lock as the
   2168 * I/O requests calling smbd_register_mr will never update the links in the
   2169 * mr_list.
   2170 */
   2171static void smbd_mr_recovery_work(struct work_struct *work)
   2172{
   2173	struct smbd_connection *info =
   2174		container_of(work, struct smbd_connection, mr_recovery_work);
   2175	struct smbd_mr *smbdirect_mr;
   2176	int rc;
   2177
   2178	list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
   2179		if (smbdirect_mr->state == MR_ERROR) {
   2180
   2181			/* recover this MR entry */
   2182			rc = ib_dereg_mr(smbdirect_mr->mr);
   2183			if (rc) {
   2184				log_rdma_mr(ERR,
   2185					"ib_dereg_mr failed rc=%x\n",
   2186					rc);
   2187				smbd_disconnect_rdma_connection(info);
   2188				continue;
   2189			}
   2190
   2191			smbdirect_mr->mr = ib_alloc_mr(
   2192				info->pd, info->mr_type,
   2193				info->max_frmr_depth);
   2194			if (IS_ERR(smbdirect_mr->mr)) {
   2195				log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
   2196					    info->mr_type,
   2197					    info->max_frmr_depth);
   2198				smbd_disconnect_rdma_connection(info);
   2199				continue;
   2200			}
   2201		} else
   2202			/* This MR is being used, don't recover it */
   2203			continue;
   2204
   2205		smbdirect_mr->state = MR_READY;
   2206
   2207		/* smbdirect_mr->state is updated by this function
   2208		 * and is read and updated by I/O issuing CPUs trying
    2209		 * to get an MR; the call to atomic_inc_return
    2210		 * implies a memory barrier and guarantees this
   2211		 * value is updated before waking up any calls to
   2212		 * get_mr() from the I/O issuing CPUs
   2213		 */
   2214		if (atomic_inc_return(&info->mr_ready_count) == 1)
   2215			wake_up_interruptible(&info->wait_mr);
   2216	}
   2217}
   2218
   2219static void destroy_mr_list(struct smbd_connection *info)
   2220{
   2221	struct smbd_mr *mr, *tmp;
   2222
   2223	cancel_work_sync(&info->mr_recovery_work);
   2224	list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
   2225		if (mr->state == MR_INVALIDATED)
   2226			ib_dma_unmap_sg(info->id->device, mr->sgl,
   2227				mr->sgl_count, mr->dir);
   2228		ib_dereg_mr(mr->mr);
   2229		kfree(mr->sgl);
   2230		kfree(mr);
   2231	}
   2232}
   2233
   2234/*
   2235 * Allocate MRs used for RDMA read/write
    2236 * The number of MRs will not exceed the hardware capability in responder_resources
    2237 * All MRs are kept in mr_list. An MR can be recovered after it's used
    2238 * Recovery is done in smbd_mr_recovery_work. The content of a list entry changes
   2239 * as MRs are used and recovered for I/O, but the list links will not change
   2240 */
   2241static int allocate_mr_list(struct smbd_connection *info)
   2242{
   2243	int i;
   2244	struct smbd_mr *smbdirect_mr, *tmp;
   2245
   2246	INIT_LIST_HEAD(&info->mr_list);
   2247	init_waitqueue_head(&info->wait_mr);
   2248	spin_lock_init(&info->mr_list_lock);
   2249	atomic_set(&info->mr_ready_count, 0);
   2250	atomic_set(&info->mr_used_count, 0);
   2251	init_waitqueue_head(&info->wait_for_mr_cleanup);
   2252	/* Allocate more MRs (2x) than hardware responder_resources */
   2253	for (i = 0; i < info->responder_resources * 2; i++) {
   2254		smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
   2255		if (!smbdirect_mr)
   2256			goto out;
   2257		smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
   2258					info->max_frmr_depth);
   2259		if (IS_ERR(smbdirect_mr->mr)) {
   2260			log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
   2261				    info->mr_type, info->max_frmr_depth);
   2262			goto out;
   2263		}
   2264		smbdirect_mr->sgl = kcalloc(
   2265					info->max_frmr_depth,
   2266					sizeof(struct scatterlist),
   2267					GFP_KERNEL);
   2268		if (!smbdirect_mr->sgl) {
   2269			log_rdma_mr(ERR, "failed to allocate sgl\n");
   2270			ib_dereg_mr(smbdirect_mr->mr);
   2271			goto out;
   2272		}
   2273		smbdirect_mr->state = MR_READY;
   2274		smbdirect_mr->conn = info;
   2275
   2276		list_add_tail(&smbdirect_mr->list, &info->mr_list);
   2277		atomic_inc(&info->mr_ready_count);
   2278	}
   2279	INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
   2280	return 0;
   2281
   2282out:
   2283	kfree(smbdirect_mr);
   2284
   2285	list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
   2286		ib_dereg_mr(smbdirect_mr->mr);
   2287		kfree(smbdirect_mr->sgl);
   2288		kfree(smbdirect_mr);
   2289	}
   2290	return -ENOMEM;
   2291}
   2292
   2293/*
   2294 * Get a MR from mr_list. This function waits until there is at least one
   2295 * MR available in the list. It may access the list while the
   2296 * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
   2297 * as they never modify the same places. However, there may be several CPUs
    2298 * issuing I/O trying to get an MR at the same time, so mr_list_lock is used
    2299 * to protect against this race.
   2300 */
   2301static struct smbd_mr *get_mr(struct smbd_connection *info)
   2302{
   2303	struct smbd_mr *ret;
   2304	int rc;
   2305again:
   2306	rc = wait_event_interruptible(info->wait_mr,
   2307		atomic_read(&info->mr_ready_count) ||
   2308		info->transport_status != SMBD_CONNECTED);
   2309	if (rc) {
   2310		log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
   2311		return NULL;
   2312	}
   2313
   2314	if (info->transport_status != SMBD_CONNECTED) {
   2315		log_rdma_mr(ERR, "info->transport_status=%x\n",
   2316			info->transport_status);
   2317		return NULL;
   2318	}
   2319
   2320	spin_lock(&info->mr_list_lock);
   2321	list_for_each_entry(ret, &info->mr_list, list) {
   2322		if (ret->state == MR_READY) {
   2323			ret->state = MR_REGISTERED;
   2324			spin_unlock(&info->mr_list_lock);
   2325			atomic_dec(&info->mr_ready_count);
   2326			atomic_inc(&info->mr_used_count);
   2327			return ret;
   2328		}
   2329	}
   2330
   2331	spin_unlock(&info->mr_list_lock);
   2332	/*
    2333	 * It is possible that we could fail to get an MR because other processes may
    2334	 * be trying to acquire an MR at the same time. If this is the case, retry.
   2335	 */
   2336	goto again;
   2337}
   2338
   2339/*
   2340 * Register memory for RDMA read/write
   2341 * pages[]: the list of pages to register memory with
   2342 * num_pages: the number of pages to register
   2343 * tailsz: if non-zero, the bytes to register in the last page
   2344 * writing: true if this is a RDMA write (SMB read), false for RDMA read
   2345 * need_invalidate: true if this MR needs to be locally invalidated after I/O
   2346 * return value: the MR registered, NULL if failed.
   2347 */
   2348struct smbd_mr *smbd_register_mr(
   2349	struct smbd_connection *info, struct page *pages[], int num_pages,
   2350	int offset, int tailsz, bool writing, bool need_invalidate)
   2351{
   2352	struct smbd_mr *smbdirect_mr;
   2353	int rc, i;
   2354	enum dma_data_direction dir;
   2355	struct ib_reg_wr *reg_wr;
   2356
   2357	if (num_pages > info->max_frmr_depth) {
   2358		log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
   2359			num_pages, info->max_frmr_depth);
   2360		return NULL;
   2361	}
   2362
   2363	smbdirect_mr = get_mr(info);
   2364	if (!smbdirect_mr) {
   2365		log_rdma_mr(ERR, "get_mr returning NULL\n");
   2366		return NULL;
   2367	}
   2368	smbdirect_mr->need_invalidate = need_invalidate;
   2369	smbdirect_mr->sgl_count = num_pages;
   2370	sg_init_table(smbdirect_mr->sgl, num_pages);
   2371
   2372	log_rdma_mr(INFO, "num_pages=0x%x offset=0x%x tailsz=0x%x\n",
   2373			num_pages, offset, tailsz);
   2374
   2375	if (num_pages == 1) {
   2376		sg_set_page(&smbdirect_mr->sgl[0], pages[0], tailsz, offset);
   2377		goto skip_multiple_pages;
   2378	}
   2379
   2380	/* We have at least two pages to register */
   2381	sg_set_page(
   2382		&smbdirect_mr->sgl[0], pages[0], PAGE_SIZE - offset, offset);
   2383	i = 1;
   2384	while (i < num_pages - 1) {
   2385		sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
   2386		i++;
   2387	}
   2388	sg_set_page(&smbdirect_mr->sgl[i], pages[i],
   2389		tailsz ? tailsz : PAGE_SIZE, 0);
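        	/*
        	 * Example layout for three pages with offset 512, tailsz 1024
        	 * and 4K pages: sgl[0] covers 3584 bytes starting at offset 512,
        	 * sgl[1] a full 4096-byte page, and sgl[2] the first 1024 bytes
        	 * of the last page.
        	 */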
   2390
   2391skip_multiple_pages:
   2392	dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
   2393	smbdirect_mr->dir = dir;
   2394	rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
   2395	if (!rc) {
   2396		log_rdma_mr(ERR, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
   2397			num_pages, dir, rc);
   2398		goto dma_map_error;
   2399	}
   2400
   2401	rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
   2402		NULL, PAGE_SIZE);
   2403	if (rc != num_pages) {
   2404		log_rdma_mr(ERR,
   2405			"ib_map_mr_sg failed rc = %d num_pages = %x\n",
   2406			rc, num_pages);
   2407		goto map_mr_error;
   2408	}
   2409
   2410	ib_update_fast_reg_key(smbdirect_mr->mr,
   2411		ib_inc_rkey(smbdirect_mr->mr->rkey));
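        	/*
        	 * The key portion of the rkey was just incremented so that this
        	 * registration uses a fresh rkey and a stale rkey from a
        	 * previous use of this MR cannot be replayed by the peer.
        	 */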
   2412	reg_wr = &smbdirect_mr->wr;
   2413	reg_wr->wr.opcode = IB_WR_REG_MR;
   2414	smbdirect_mr->cqe.done = register_mr_done;
   2415	reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
   2416	reg_wr->wr.num_sge = 0;
   2417	reg_wr->wr.send_flags = IB_SEND_SIGNALED;
   2418	reg_wr->mr = smbdirect_mr->mr;
   2419	reg_wr->key = smbdirect_mr->mr->rkey;
   2420	reg_wr->access = writing ?
   2421			IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
   2422			IB_ACCESS_REMOTE_READ;
   2423
   2424	/*
    2425	 * There is no need to wait for completion of ib_post_send
    2426	 * for IB_WR_REG_MR. Hardware enforces a barrier and order of execution
    2427	 * on the next ib_post_send when we actually send I/O to the remote peer
   2428	 */
   2429	rc = ib_post_send(info->id->qp, &reg_wr->wr, NULL);
   2430	if (!rc)
   2431		return smbdirect_mr;
   2432
   2433	log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
   2434		rc, reg_wr->key);
   2435
    2436	/* If all failed, attempt to recover this MR by setting it to MR_ERROR */
   2437map_mr_error:
   2438	ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
   2439		smbdirect_mr->sgl_count, smbdirect_mr->dir);
   2440
   2441dma_map_error:
   2442	smbdirect_mr->state = MR_ERROR;
   2443	if (atomic_dec_and_test(&info->mr_used_count))
   2444		wake_up(&info->wait_for_mr_cleanup);
   2445
   2446	smbd_disconnect_rdma_connection(info);
   2447
   2448	return NULL;
   2449}
   2450
   2451static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
   2452{
   2453	struct smbd_mr *smbdirect_mr;
   2454	struct ib_cqe *cqe;
   2455
   2456	cqe = wc->wr_cqe;
   2457	smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
   2458	smbdirect_mr->state = MR_INVALIDATED;
   2459	if (wc->status != IB_WC_SUCCESS) {
   2460		log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
   2461		smbdirect_mr->state = MR_ERROR;
   2462	}
   2463	complete(&smbdirect_mr->invalidate_done);
   2464}
   2465
   2466/*
   2467 * Deregister a MR after I/O is done
   2468 * This function may wait if remote invalidation is not used
    2469 * and we have to locally invalidate the buffer to prevent data from being
    2470 * modified by the remote peer after the upper layer consumes it
   2471 */
   2472int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
   2473{
   2474	struct ib_send_wr *wr;
   2475	struct smbd_connection *info = smbdirect_mr->conn;
   2476	int rc = 0;
   2477
   2478	if (smbdirect_mr->need_invalidate) {
   2479		/* Need to finish local invalidation before returning */
   2480		wr = &smbdirect_mr->inv_wr;
   2481		wr->opcode = IB_WR_LOCAL_INV;
   2482		smbdirect_mr->cqe.done = local_inv_done;
   2483		wr->wr_cqe = &smbdirect_mr->cqe;
   2484		wr->num_sge = 0;
   2485		wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
   2486		wr->send_flags = IB_SEND_SIGNALED;
   2487
   2488		init_completion(&smbdirect_mr->invalidate_done);
   2489		rc = ib_post_send(info->id->qp, wr, NULL);
   2490		if (rc) {
   2491			log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
   2492			smbd_disconnect_rdma_connection(info);
   2493			goto done;
   2494		}
   2495		wait_for_completion(&smbdirect_mr->invalidate_done);
   2496		smbdirect_mr->need_invalidate = false;
   2497	} else
   2498		/*
    2499		 * For remote invalidation, just set it to MR_INVALIDATED;
    2500		 * it is unmapped and returned to the ready list just below
   2501		 */
   2502		smbdirect_mr->state = MR_INVALIDATED;
   2503
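        	/*
        	 * If the MR has been invalidated (locally above, or remotely by
        	 * the peer), unmap its pages and return it to the ready pool
        	 * right away; otherwise hand it to the recovery work.
        	 */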
   2504	if (smbdirect_mr->state == MR_INVALIDATED) {
   2505		ib_dma_unmap_sg(
   2506			info->id->device, smbdirect_mr->sgl,
   2507			smbdirect_mr->sgl_count,
   2508			smbdirect_mr->dir);
   2509		smbdirect_mr->state = MR_READY;
   2510		if (atomic_inc_return(&info->mr_ready_count) == 1)
   2511			wake_up_interruptible(&info->wait_mr);
   2512	} else
   2513		/*
    2514		 * Schedule the work to do MR recovery for future I/Os. MR
    2515		 * recovery is slow and we don't want it to block the current I/O
   2516		 */
   2517		queue_work(info->workqueue, &info->mr_recovery_work);
   2518
   2519done:
   2520	if (atomic_dec_and_test(&info->mr_used_count))
   2521		wake_up(&info->wait_for_mr_cleanup);
   2522
   2523	return rc;
   2524}