cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

rtrs-srv.c (58113B)


      1// SPDX-License-Identifier: GPL-2.0-or-later
      2/*
      3 * RDMA Transport Layer
      4 *
      5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
      6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
      7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
      8 */
      9
     10#undef pr_fmt
     11#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
     12
     13#include <linux/module.h>
     14#include <linux/mempool.h>
     15
     16#include "rtrs-srv.h"
     17#include "rtrs-log.h"
     18#include <rdma/ib_cm.h>
     19#include <rdma/ib_verbs.h>
     20
     21MODULE_DESCRIPTION("RDMA Transport Server");
     22MODULE_LICENSE("GPL");
     23
     24/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
     25#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
     26#define DEFAULT_SESS_QUEUE_DEPTH 512
     27#define MAX_HDR_SIZE PAGE_SIZE
     28
      29/* We guarantee to serve at least 10 paths */
     30#define CHUNK_POOL_SZ 10
     31
     32static struct rtrs_rdma_dev_pd dev_pd;
     33static mempool_t *chunk_pool;
     34struct class *rtrs_dev_class;
     35static struct rtrs_srv_ib_ctx ib_ctx;
     36
     37static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
     38static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
     39
     40static bool always_invalidate = true;
     41module_param(always_invalidate, bool, 0444);
     42MODULE_PARM_DESC(always_invalidate,
     43		 "Invalidate memory registration for contiguous memory regions before accessing.");
     44
     45module_param_named(max_chunk_size, max_chunk_size, int, 0444);
     46MODULE_PARM_DESC(max_chunk_size,
      47		 "Max size in bytes for each IO request (default: "
      48		 __stringify(DEFAULT_MAX_CHUNK_SIZE) " bytes)");
     49
     50module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
     51MODULE_PARM_DESC(sess_queue_depth,
     52		 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
     53		 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
     54		 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
     55
     56static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };
     57
     58static struct workqueue_struct *rtrs_wq;
     59
     60static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
     61{
     62	return container_of(c, struct rtrs_srv_con, c);
     63}
     64
     65static inline struct rtrs_srv_path *to_srv_path(struct rtrs_path *s)
     66{
     67	return container_of(s, struct rtrs_srv_path, s);
     68}
     69
     70static bool rtrs_srv_change_state(struct rtrs_srv_path *srv_path,
     71				  enum rtrs_srv_state new_state)
     72{
     73	enum rtrs_srv_state old_state;
     74	bool changed = false;
     75
     76	spin_lock_irq(&srv_path->state_lock);
     77	old_state = srv_path->state;
     78	switch (new_state) {
     79	case RTRS_SRV_CONNECTED:
     80		if (old_state == RTRS_SRV_CONNECTING)
     81			changed = true;
     82		break;
     83	case RTRS_SRV_CLOSING:
     84		if (old_state == RTRS_SRV_CONNECTING ||
     85		    old_state == RTRS_SRV_CONNECTED)
     86			changed = true;
     87		break;
     88	case RTRS_SRV_CLOSED:
     89		if (old_state == RTRS_SRV_CLOSING)
     90			changed = true;
     91		break;
     92	default:
     93		break;
     94	}
     95	if (changed)
     96		srv_path->state = new_state;
     97	spin_unlock_irq(&srv_path->state_lock);
     98
     99	return changed;
    100}
    101
    102static void free_id(struct rtrs_srv_op *id)
    103{
    104	if (!id)
    105		return;
    106	kfree(id);
    107}
    108
    109static void rtrs_srv_free_ops_ids(struct rtrs_srv_path *srv_path)
    110{
    111	struct rtrs_srv_sess *srv = srv_path->srv;
    112	int i;
    113
    114	if (srv_path->ops_ids) {
    115		for (i = 0; i < srv->queue_depth; i++)
    116			free_id(srv_path->ops_ids[i]);
    117		kfree(srv_path->ops_ids);
    118		srv_path->ops_ids = NULL;
    119	}
    120}
    121
    122static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
    123
    124static struct ib_cqe io_comp_cqe = {
    125	.done = rtrs_srv_rdma_done
    126};
    127
    128static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref)
    129{
    130	struct rtrs_srv_path *srv_path = container_of(ref,
    131						      struct rtrs_srv_path,
    132						      ids_inflight_ref);
    133
    134	percpu_ref_exit(&srv_path->ids_inflight_ref);
    135	complete(&srv_path->complete_done);
    136}
    137
    138static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_path *srv_path)
    139{
    140	struct rtrs_srv_sess *srv = srv_path->srv;
    141	struct rtrs_srv_op *id;
    142	int i, ret;
    143
    144	srv_path->ops_ids = kcalloc(srv->queue_depth,
    145				    sizeof(*srv_path->ops_ids),
    146				    GFP_KERNEL);
    147	if (!srv_path->ops_ids)
    148		goto err;
    149
    150	for (i = 0; i < srv->queue_depth; ++i) {
    151		id = kzalloc(sizeof(*id), GFP_KERNEL);
    152		if (!id)
    153			goto err;
    154
    155		srv_path->ops_ids[i] = id;
    156	}
    157
    158	ret = percpu_ref_init(&srv_path->ids_inflight_ref,
    159			      rtrs_srv_inflight_ref_release, 0, GFP_KERNEL);
    160	if (ret) {
    161		pr_err("Percpu reference init failed\n");
    162		goto err;
    163	}
    164	init_completion(&srv_path->complete_done);
    165
    166	return 0;
    167
    168err:
    169	rtrs_srv_free_ops_ids(srv_path);
    170	return -ENOMEM;
    171}
    172
    173static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_path *srv_path)
    174{
    175	percpu_ref_get(&srv_path->ids_inflight_ref);
    176}
    177
    178static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_path *srv_path)
    179{
    180	percpu_ref_put(&srv_path->ids_inflight_ref);
    181}
    182
    183static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
    184{
    185	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
    186	struct rtrs_path *s = con->c.path;
    187	struct rtrs_srv_path *srv_path = to_srv_path(s);
    188
    189	if (wc->status != IB_WC_SUCCESS) {
    190		rtrs_err(s, "REG MR failed: %s\n",
    191			  ib_wc_status_msg(wc->status));
    192		close_path(srv_path);
    193		return;
    194	}
    195}
    196
    197static struct ib_cqe local_reg_cqe = {
    198	.done = rtrs_srv_reg_mr_done
    199};
    200
    201static int rdma_write_sg(struct rtrs_srv_op *id)
    202{
    203	struct rtrs_path *s = id->con->c.path;
    204	struct rtrs_srv_path *srv_path = to_srv_path(s);
    205	dma_addr_t dma_addr = srv_path->dma_addr[id->msg_id];
    206	struct rtrs_srv_mr *srv_mr;
    207	struct ib_send_wr inv_wr;
    208	struct ib_rdma_wr imm_wr;
    209	struct ib_rdma_wr *wr = NULL;
    210	enum ib_send_flags flags;
    211	size_t sg_cnt;
    212	int err, offset;
    213	bool need_inval;
    214	u32 rkey = 0;
    215	struct ib_reg_wr rwr;
    216	struct ib_sge *plist;
    217	struct ib_sge list;
    218
    219	sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt);
    220	need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F;
    221	if (sg_cnt != 1)
    222		return -EINVAL;
    223
    224	offset = 0;
    225
    226	wr		= &id->tx_wr;
    227	plist		= &id->tx_sg;
    228	plist->addr	= dma_addr + offset;
    229	plist->length	= le32_to_cpu(id->rd_msg->desc[0].len);
    230
    231	/* WR will fail with length error
    232	 * if this is 0
    233	 */
    234	if (plist->length == 0) {
    235		rtrs_err(s, "Invalid RDMA-Write sg list length 0\n");
    236		return -EINVAL;
    237	}
    238
    239	plist->lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
    240	offset += plist->length;
    241
    242	wr->wr.sg_list	= plist;
    243	wr->wr.num_sge	= 1;
    244	wr->remote_addr	= le64_to_cpu(id->rd_msg->desc[0].addr);
    245	wr->rkey	= le32_to_cpu(id->rd_msg->desc[0].key);
    246	if (rkey == 0)
    247		rkey = wr->rkey;
    248	else
    249		/* Only one key is actually used */
    250		WARN_ON_ONCE(rkey != wr->rkey);
    251
    252	wr->wr.opcode = IB_WR_RDMA_WRITE;
    253	wr->wr.wr_cqe   = &io_comp_cqe;
    254	wr->wr.ex.imm_data = 0;
    255	wr->wr.send_flags  = 0;
    256
    257	if (need_inval && always_invalidate) {
    258		wr->wr.next = &rwr.wr;
    259		rwr.wr.next = &inv_wr;
    260		inv_wr.next = &imm_wr.wr;
    261	} else if (always_invalidate) {
    262		wr->wr.next = &rwr.wr;
    263		rwr.wr.next = &imm_wr.wr;
    264	} else if (need_inval) {
    265		wr->wr.next = &inv_wr;
    266		inv_wr.next = &imm_wr.wr;
    267	} else {
    268		wr->wr.next = &imm_wr.wr;
    269	}
    270	/*
    271	 * From time to time we have to post signaled sends,
    272	 * or send queue will fill up and only QP reset can help.
    273	 */
    274	flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ?
    275		0 : IB_SEND_SIGNALED;
    276
    277	if (need_inval) {
    278		inv_wr.sg_list = NULL;
    279		inv_wr.num_sge = 0;
    280		inv_wr.opcode = IB_WR_SEND_WITH_INV;
    281		inv_wr.wr_cqe   = &io_comp_cqe;
    282		inv_wr.send_flags = 0;
    283		inv_wr.ex.invalidate_rkey = rkey;
    284	}
    285
    286	imm_wr.wr.next = NULL;
    287	if (always_invalidate) {
    288		struct rtrs_msg_rkey_rsp *msg;
    289
    290		srv_mr = &srv_path->mrs[id->msg_id];
    291		rwr.wr.opcode = IB_WR_REG_MR;
    292		rwr.wr.wr_cqe = &local_reg_cqe;
    293		rwr.wr.num_sge = 0;
    294		rwr.mr = srv_mr->mr;
    295		rwr.wr.send_flags = 0;
    296		rwr.key = srv_mr->mr->rkey;
    297		rwr.access = (IB_ACCESS_LOCAL_WRITE |
    298			      IB_ACCESS_REMOTE_WRITE);
    299		msg = srv_mr->iu->buf;
    300		msg->buf_id = cpu_to_le16(id->msg_id);
    301		msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
    302		msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
    303
    304		list.addr   = srv_mr->iu->dma_addr;
    305		list.length = sizeof(*msg);
    306		list.lkey   = srv_path->s.dev->ib_pd->local_dma_lkey;
    307		imm_wr.wr.sg_list = &list;
    308		imm_wr.wr.num_sge = 1;
    309		imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
    310		ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
    311					      srv_mr->iu->dma_addr,
    312					      srv_mr->iu->size, DMA_TO_DEVICE);
    313	} else {
    314		imm_wr.wr.sg_list = NULL;
    315		imm_wr.wr.num_sge = 0;
    316		imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
    317	}
    318	imm_wr.wr.send_flags = flags;
    319	imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id,
    320							     0, need_inval));
    321
    322	imm_wr.wr.wr_cqe   = &io_comp_cqe;
    323	ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, dma_addr,
    324				      offset, DMA_BIDIRECTIONAL);
    325
    326	err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL);
    327	if (err)
    328		rtrs_err(s,
    329			  "Posting RDMA-Write-Request to QP failed, err: %d\n",
    330			  err);
    331
    332	return err;
    333}
    334
    335/**
    336 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE
    337 *                      requests or on successful WRITE request.
    338 * @con:	the connection to send back result
    339 * @id:		the id associated with the IO
    340 * @errno:	the error number of the IO.
    341 *
    342 * Return 0 on success, errno otherwise.
    343 */
    344static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id,
    345			    int errno)
    346{
    347	struct rtrs_path *s = con->c.path;
    348	struct rtrs_srv_path *srv_path = to_srv_path(s);
    349	struct ib_send_wr inv_wr, *wr = NULL;
    350	struct ib_rdma_wr imm_wr;
    351	struct ib_reg_wr rwr;
    352	struct rtrs_srv_mr *srv_mr;
    353	bool need_inval = false;
    354	enum ib_send_flags flags;
    355	u32 imm;
    356	int err;
    357
    358	if (id->dir == READ) {
    359		struct rtrs_msg_rdma_read *rd_msg = id->rd_msg;
    360		size_t sg_cnt;
    361
    362		need_inval = le16_to_cpu(rd_msg->flags) &
    363				RTRS_MSG_NEED_INVAL_F;
    364		sg_cnt = le16_to_cpu(rd_msg->sg_cnt);
    365
    366		if (need_inval) {
    367			if (sg_cnt) {
    368				inv_wr.wr_cqe   = &io_comp_cqe;
    369				inv_wr.sg_list = NULL;
    370				inv_wr.num_sge = 0;
    371				inv_wr.opcode = IB_WR_SEND_WITH_INV;
    372				inv_wr.send_flags = 0;
    373				/* Only one key is actually used */
    374				inv_wr.ex.invalidate_rkey =
    375					le32_to_cpu(rd_msg->desc[0].key);
    376			} else {
    377				WARN_ON_ONCE(1);
    378				need_inval = false;
    379			}
    380		}
    381	}
    382
    383	if (need_inval && always_invalidate) {
    384		wr = &inv_wr;
    385		inv_wr.next = &rwr.wr;
    386		rwr.wr.next = &imm_wr.wr;
    387	} else if (always_invalidate) {
    388		wr = &rwr.wr;
    389		rwr.wr.next = &imm_wr.wr;
    390	} else if (need_inval) {
    391		wr = &inv_wr;
    392		inv_wr.next = &imm_wr.wr;
    393	} else {
    394		wr = &imm_wr.wr;
    395	}
    396	/*
    397	 * From time to time we have to post signalled sends,
    398	 * or send queue will fill up and only QP reset can help.
    399	 */
    400	flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ?
    401		0 : IB_SEND_SIGNALED;
    402	imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
    403	imm_wr.wr.next = NULL;
    404	if (always_invalidate) {
    405		struct ib_sge list;
    406		struct rtrs_msg_rkey_rsp *msg;
    407
    408		srv_mr = &srv_path->mrs[id->msg_id];
    409		rwr.wr.next = &imm_wr.wr;
    410		rwr.wr.opcode = IB_WR_REG_MR;
    411		rwr.wr.wr_cqe = &local_reg_cqe;
    412		rwr.wr.num_sge = 0;
    413		rwr.wr.send_flags = 0;
    414		rwr.mr = srv_mr->mr;
    415		rwr.key = srv_mr->mr->rkey;
    416		rwr.access = (IB_ACCESS_LOCAL_WRITE |
    417			      IB_ACCESS_REMOTE_WRITE);
    418		msg = srv_mr->iu->buf;
    419		msg->buf_id = cpu_to_le16(id->msg_id);
    420		msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
    421		msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
    422
    423		list.addr   = srv_mr->iu->dma_addr;
    424		list.length = sizeof(*msg);
    425		list.lkey   = srv_path->s.dev->ib_pd->local_dma_lkey;
    426		imm_wr.wr.sg_list = &list;
    427		imm_wr.wr.num_sge = 1;
    428		imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
    429		ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
    430					      srv_mr->iu->dma_addr,
    431					      srv_mr->iu->size, DMA_TO_DEVICE);
    432	} else {
    433		imm_wr.wr.sg_list = NULL;
    434		imm_wr.wr.num_sge = 0;
    435		imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
    436	}
    437	imm_wr.wr.send_flags = flags;
    438	imm_wr.wr.wr_cqe   = &io_comp_cqe;
    439
    440	imm_wr.wr.ex.imm_data = cpu_to_be32(imm);
    441
    442	err = ib_post_send(id->con->c.qp, wr, NULL);
    443	if (err)
    444		rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
    445			     err);
    446
    447	return err;
    448}
    449
    450void close_path(struct rtrs_srv_path *srv_path)
    451{
    452	if (rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSING))
    453		queue_work(rtrs_wq, &srv_path->close_work);
    454	WARN_ON(srv_path->state != RTRS_SRV_CLOSING);
    455}
    456
    457static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
    458{
    459	switch (state) {
    460	case RTRS_SRV_CONNECTING:
    461		return "RTRS_SRV_CONNECTING";
    462	case RTRS_SRV_CONNECTED:
    463		return "RTRS_SRV_CONNECTED";
    464	case RTRS_SRV_CLOSING:
    465		return "RTRS_SRV_CLOSING";
    466	case RTRS_SRV_CLOSED:
    467		return "RTRS_SRV_CLOSED";
    468	default:
    469		return "UNKNOWN";
    470	}
    471}
    472
    473/**
    474 * rtrs_srv_resp_rdma() - Finish an RDMA request
    475 *
    476 * @id:		Internal RTRS operation identifier
    477 * @status:	Response Code sent to the other side for this operation.
     478 *		0 = success, <0 error
    479 * Context: any
    480 *
     481 * Finish an RDMA operation. A message is sent to the client and the
    482 * corresponding memory areas will be released.
    483 */
    484bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
    485{
    486	struct rtrs_srv_path *srv_path;
    487	struct rtrs_srv_con *con;
    488	struct rtrs_path *s;
    489	int err;
    490
    491	if (WARN_ON(!id))
    492		return true;
    493
    494	con = id->con;
    495	s = con->c.path;
    496	srv_path = to_srv_path(s);
    497
    498	id->status = status;
    499
    500	if (srv_path->state != RTRS_SRV_CONNECTED) {
    501		rtrs_err_rl(s,
    502			    "Sending I/O response failed,  server path %s is disconnected, path state %s\n",
    503			    kobject_name(&srv_path->kobj),
    504			    rtrs_srv_state_str(srv_path->state));
    505		goto out;
    506	}
    507	if (always_invalidate) {
    508		struct rtrs_srv_mr *mr = &srv_path->mrs[id->msg_id];
    509
    510		ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
    511	}
    512	if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) {
    513		rtrs_err(s, "IB send queue full: srv_path=%s cid=%d\n",
    514			 kobject_name(&srv_path->kobj),
    515			 con->c.cid);
    516		atomic_add(1, &con->c.sq_wr_avail);
    517		spin_lock(&con->rsp_wr_wait_lock);
    518		list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
    519		spin_unlock(&con->rsp_wr_wait_lock);
    520		return false;
    521	}
    522
    523	if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
    524		err = send_io_resp_imm(con, id, status);
    525	else
    526		err = rdma_write_sg(id);
    527
    528	if (err) {
    529		rtrs_err_rl(s, "IO response failed: %d: srv_path=%s\n", err,
    530			    kobject_name(&srv_path->kobj));
    531		close_path(srv_path);
    532	}
    533out:
    534	rtrs_srv_put_ops_ids(srv_path);
    535	return true;
    536}
    537EXPORT_SYMBOL(rtrs_srv_resp_rdma);
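/*
 * Usage sketch (not part of this file): an upper layer normally calls
 * rtrs_srv_resp_rdma() from its own completion path once the work handed
 * over in the rdma_ev() callback is done.  The my_io type and function
 * name below are hypothetical and only illustrate the calling convention.
 */
#if 0
struct my_io {
	struct rtrs_srv_op	*id;	/* saved from the rdma_ev() callback */
	int			result;	/* 0 on success, negative errno on error */
};

static void my_io_complete(struct my_io *io)
{
	/*
	 * Sends the response (an empty IMM, or the RDMA write-back for a
	 * READ) and releases the server-side chunk.  A "false" return only
	 * means the send queue was full and the response was queued on
	 * rsp_wr_wait_list; rtrs retries it itself, so no resubmission is
	 * needed here.
	 */
	rtrs_srv_resp_rdma(io->id, io->result);
}
#endif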
    538
    539/**
    540 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
    541 * @srv:	Session pointer
    542 * @priv:	The private pointer that is associated with the session.
    543 */
    544void rtrs_srv_set_sess_priv(struct rtrs_srv_sess *srv, void *priv)
    545{
    546	srv->priv = priv;
    547}
    548EXPORT_SYMBOL(rtrs_srv_set_sess_priv);
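/*
 * Usage sketch (hypothetical; the callback prototype is inferred from how
 * ctx->ops.link_ev() is invoked in this file): a server module typically
 * allocates its per-session state on RTRS_SRV_LINK_EV_CONNECTED and hands
 * it to rtrs via rtrs_srv_set_sess_priv(); the same pointer is passed back
 * as @priv to rdma_ev() and to the DISCONNECTED event.
 */
#if 0
static int my_link_ev(struct rtrs_srv_sess *srv, enum rtrs_srv_link_ev ev,
		      void *priv)
{
	struct my_sess *sess;

	switch (ev) {
	case RTRS_SRV_LINK_EV_CONNECTED:
		sess = kzalloc(sizeof(*sess), GFP_KERNEL);
		if (!sess)
			return -ENOMEM;
		sess->srv = srv;
		rtrs_srv_set_sess_priv(srv, sess);
		return 0;
	case RTRS_SRV_LINK_EV_DISCONNECTED:
		kfree(priv);
		return 0;
	default:
		return -EINVAL;
	}
}
#endif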
    549
    550static void unmap_cont_bufs(struct rtrs_srv_path *srv_path)
    551{
    552	int i;
    553
    554	for (i = 0; i < srv_path->mrs_num; i++) {
    555		struct rtrs_srv_mr *srv_mr;
    556
    557		srv_mr = &srv_path->mrs[i];
    558		rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1);
    559		ib_dereg_mr(srv_mr->mr);
    560		ib_dma_unmap_sg(srv_path->s.dev->ib_dev, srv_mr->sgt.sgl,
    561				srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
    562		sg_free_table(&srv_mr->sgt);
    563	}
    564	kfree(srv_path->mrs);
    565}
    566
    567static int map_cont_bufs(struct rtrs_srv_path *srv_path)
    568{
    569	struct rtrs_srv_sess *srv = srv_path->srv;
    570	struct rtrs_path *ss = &srv_path->s;
    571	int i, mri, err, mrs_num;
    572	unsigned int chunk_bits;
    573	int chunks_per_mr = 1;
    574
    575	/*
     576	 * Here we map queue_depth chunks to MRs.  First we have to
     577	 * figure out how many chunks we can map per MR.
    578	 */
    579	if (always_invalidate) {
    580		/*
     581		 * In order to invalidate each chunk of memory, we need
     582		 * more memory regions.
    583		 */
    584		mrs_num = srv->queue_depth;
    585	} else {
    586		chunks_per_mr =
    587			srv_path->s.dev->ib_dev->attrs.max_fast_reg_page_list_len;
    588		mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr);
    589		chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num);
    590	}
    591
    592	srv_path->mrs = kcalloc(mrs_num, sizeof(*srv_path->mrs), GFP_KERNEL);
    593	if (!srv_path->mrs)
    594		return -ENOMEM;
    595
    596	srv_path->mrs_num = mrs_num;
    597
    598	for (mri = 0; mri < mrs_num; mri++) {
    599		struct rtrs_srv_mr *srv_mr = &srv_path->mrs[mri];
    600		struct sg_table *sgt = &srv_mr->sgt;
    601		struct scatterlist *s;
    602		struct ib_mr *mr;
    603		int nr, chunks;
    604
    605		chunks = chunks_per_mr * mri;
    606		if (!always_invalidate)
    607			chunks_per_mr = min_t(int, chunks_per_mr,
    608					      srv->queue_depth - chunks);
    609
    610		err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL);
    611		if (err)
    612			goto err;
    613
    614		for_each_sg(sgt->sgl, s, chunks_per_mr, i)
    615			sg_set_page(s, srv->chunks[chunks + i],
    616				    max_chunk_size, 0);
    617
    618		nr = ib_dma_map_sg(srv_path->s.dev->ib_dev, sgt->sgl,
    619				   sgt->nents, DMA_BIDIRECTIONAL);
    620		if (nr < sgt->nents) {
    621			err = nr < 0 ? nr : -EINVAL;
    622			goto free_sg;
    623		}
    624		mr = ib_alloc_mr(srv_path->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
    625				 sgt->nents);
    626		if (IS_ERR(mr)) {
    627			err = PTR_ERR(mr);
    628			goto unmap_sg;
    629		}
    630		nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents,
    631				  NULL, max_chunk_size);
    632		if (nr < 0 || nr < sgt->nents) {
    633			err = nr < 0 ? nr : -EINVAL;
    634			goto dereg_mr;
    635		}
    636
    637		if (always_invalidate) {
    638			srv_mr->iu = rtrs_iu_alloc(1,
    639					sizeof(struct rtrs_msg_rkey_rsp),
    640					GFP_KERNEL, srv_path->s.dev->ib_dev,
    641					DMA_TO_DEVICE, rtrs_srv_rdma_done);
    642			if (!srv_mr->iu) {
    643				err = -ENOMEM;
    644				rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err);
    645				goto dereg_mr;
    646			}
    647		}
    648		/* Eventually dma addr for each chunk can be cached */
    649		for_each_sg(sgt->sgl, s, sgt->orig_nents, i)
    650			srv_path->dma_addr[chunks + i] = sg_dma_address(s);
    651
    652		ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey));
    653		srv_mr->mr = mr;
    654
    655		continue;
    656err:
    657		while (mri--) {
    658			srv_mr = &srv_path->mrs[mri];
    659			sgt = &srv_mr->sgt;
    660			mr = srv_mr->mr;
    661			rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1);
    662dereg_mr:
    663			ib_dereg_mr(mr);
    664unmap_sg:
    665			ib_dma_unmap_sg(srv_path->s.dev->ib_dev, sgt->sgl,
    666					sgt->nents, DMA_BIDIRECTIONAL);
    667free_sg:
    668			sg_free_table(sgt);
    669		}
    670		kfree(srv_path->mrs);
    671
    672		return err;
    673	}
    674
    675	chunk_bits = ilog2(srv->queue_depth - 1) + 1;
    676	srv_path->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits);
    677
    678	return 0;
    679}
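/*
 * Worked example for the split computed above, assuming the default
 * sess_queue_depth of 512: chunk_bits = ilog2(511) + 1 = 9, so
 * mem_bits = MAX_IMM_PAYL_BITS - 9.  rtrs_srv_rdma_done() then decodes
 * the immediate payload as
 *
 *	msg_id = imm_payload >> srv_path->mem_bits;
 *	off    = imm_payload & ((1 << srv_path->mem_bits) - 1);
 *
 * i.e. the high chunk_bits select one of the queue_depth chunks and the
 * low mem_bits carry the offset into that chunk, which is sanity-checked
 * against max_chunk_size there.
 */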
    680
    681static void rtrs_srv_hb_err_handler(struct rtrs_con *c)
    682{
    683	close_path(to_srv_path(c->path));
    684}
    685
    686static void rtrs_srv_init_hb(struct rtrs_srv_path *srv_path)
    687{
    688	rtrs_init_hb(&srv_path->s, &io_comp_cqe,
    689		      RTRS_HB_INTERVAL_MS,
    690		      RTRS_HB_MISSED_MAX,
    691		      rtrs_srv_hb_err_handler,
    692		      rtrs_wq);
    693}
    694
    695static void rtrs_srv_start_hb(struct rtrs_srv_path *srv_path)
    696{
    697	rtrs_start_hb(&srv_path->s);
    698}
    699
    700static void rtrs_srv_stop_hb(struct rtrs_srv_path *srv_path)
    701{
    702	rtrs_stop_hb(&srv_path->s);
    703}
    704
    705static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
    706{
    707	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
    708	struct rtrs_path *s = con->c.path;
    709	struct rtrs_srv_path *srv_path = to_srv_path(s);
    710	struct rtrs_iu *iu;
    711
    712	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
    713	rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1);
    714
    715	if (wc->status != IB_WC_SUCCESS) {
    716		rtrs_err(s, "Sess info response send failed: %s\n",
    717			  ib_wc_status_msg(wc->status));
    718		close_path(srv_path);
    719		return;
    720	}
    721	WARN_ON(wc->opcode != IB_WC_SEND);
    722}
    723
    724static void rtrs_srv_path_up(struct rtrs_srv_path *srv_path)
    725{
    726	struct rtrs_srv_sess *srv = srv_path->srv;
    727	struct rtrs_srv_ctx *ctx = srv->ctx;
    728	int up;
    729
    730	mutex_lock(&srv->paths_ev_mutex);
    731	up = ++srv->paths_up;
    732	if (up == 1)
    733		ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL);
    734	mutex_unlock(&srv->paths_ev_mutex);
    735
    736	/* Mark session as established */
    737	srv_path->established = true;
    738}
    739
    740static void rtrs_srv_path_down(struct rtrs_srv_path *srv_path)
    741{
    742	struct rtrs_srv_sess *srv = srv_path->srv;
    743	struct rtrs_srv_ctx *ctx = srv->ctx;
    744
    745	if (!srv_path->established)
    746		return;
    747
    748	srv_path->established = false;
    749	mutex_lock(&srv->paths_ev_mutex);
    750	WARN_ON(!srv->paths_up);
    751	if (--srv->paths_up == 0)
    752		ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv);
    753	mutex_unlock(&srv->paths_ev_mutex);
    754}
    755
    756static bool exist_pathname(struct rtrs_srv_ctx *ctx,
    757			   const char *pathname, const uuid_t *path_uuid)
    758{
    759	struct rtrs_srv_sess *srv;
    760	struct rtrs_srv_path *srv_path;
    761	bool found = false;
    762
    763	mutex_lock(&ctx->srv_mutex);
    764	list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
    765		mutex_lock(&srv->paths_mutex);
    766
     767		/* same uuid means the same client is adding another path, skip it */
    768		if (uuid_equal(&srv->paths_uuid, path_uuid)) {
    769			mutex_unlock(&srv->paths_mutex);
    770			continue;
    771		}
    772
    773		list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
    774			if (strlen(srv_path->s.sessname) == strlen(pathname) &&
    775			    !strcmp(srv_path->s.sessname, pathname)) {
    776				found = true;
    777				break;
    778			}
    779		}
    780		mutex_unlock(&srv->paths_mutex);
    781		if (found)
    782			break;
    783	}
    784	mutex_unlock(&ctx->srv_mutex);
    785	return found;
    786}
    787
    788static int post_recv_path(struct rtrs_srv_path *srv_path);
    789static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno);
    790
    791static int process_info_req(struct rtrs_srv_con *con,
    792			    struct rtrs_msg_info_req *msg)
    793{
    794	struct rtrs_path *s = con->c.path;
    795	struct rtrs_srv_path *srv_path = to_srv_path(s);
    796	struct ib_send_wr *reg_wr = NULL;
    797	struct rtrs_msg_info_rsp *rsp;
    798	struct rtrs_iu *tx_iu;
    799	struct ib_reg_wr *rwr;
    800	int mri, err;
    801	size_t tx_sz;
    802
    803	err = post_recv_path(srv_path);
    804	if (err) {
    805		rtrs_err(s, "post_recv_path(), err: %d\n", err);
    806		return err;
    807	}
    808
    809	if (strchr(msg->pathname, '/') || strchr(msg->pathname, '.')) {
     810		rtrs_err(s, "pathname must not contain '/' or '.'\n");
    811		return -EINVAL;
    812	}
    813
    814	if (exist_pathname(srv_path->srv->ctx,
    815			   msg->pathname, &srv_path->srv->paths_uuid)) {
    816		rtrs_err(s, "pathname is duplicated: %s\n", msg->pathname);
    817		return -EPERM;
    818	}
    819	strscpy(srv_path->s.sessname, msg->pathname,
    820		sizeof(srv_path->s.sessname));
    821
    822	rwr = kcalloc(srv_path->mrs_num, sizeof(*rwr), GFP_KERNEL);
    823	if (!rwr)
    824		return -ENOMEM;
    825
    826	tx_sz  = sizeof(*rsp);
    827	tx_sz += sizeof(rsp->desc[0]) * srv_path->mrs_num;
    828	tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, srv_path->s.dev->ib_dev,
    829			       DMA_TO_DEVICE, rtrs_srv_info_rsp_done);
    830	if (!tx_iu) {
    831		err = -ENOMEM;
    832		goto rwr_free;
    833	}
    834
    835	rsp = tx_iu->buf;
    836	rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP);
    837	rsp->sg_cnt = cpu_to_le16(srv_path->mrs_num);
    838
    839	for (mri = 0; mri < srv_path->mrs_num; mri++) {
    840		struct ib_mr *mr = srv_path->mrs[mri].mr;
    841
    842		rsp->desc[mri].addr = cpu_to_le64(mr->iova);
    843		rsp->desc[mri].key  = cpu_to_le32(mr->rkey);
    844		rsp->desc[mri].len  = cpu_to_le32(mr->length);
    845
    846		/*
    847		 * Fill in reg MR request and chain them *backwards*
    848		 */
    849		rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL;
    850		rwr[mri].wr.opcode = IB_WR_REG_MR;
    851		rwr[mri].wr.wr_cqe = &local_reg_cqe;
    852		rwr[mri].wr.num_sge = 0;
    853		rwr[mri].wr.send_flags = 0;
    854		rwr[mri].mr = mr;
    855		rwr[mri].key = mr->rkey;
    856		rwr[mri].access = (IB_ACCESS_LOCAL_WRITE |
    857				   IB_ACCESS_REMOTE_WRITE);
    858		reg_wr = &rwr[mri].wr;
    859	}
    860
    861	err = rtrs_srv_create_path_files(srv_path);
    862	if (err)
    863		goto iu_free;
    864	kobject_get(&srv_path->kobj);
    865	get_device(&srv_path->srv->dev);
    866	rtrs_srv_change_state(srv_path, RTRS_SRV_CONNECTED);
    867	rtrs_srv_start_hb(srv_path);
    868
    869	/*
     870	 * We do not account for the number of established connections at the
     871	 * moment; we rely on the client, which sends the info request only when
     872	 * all connections have been successfully established.  Thus, simply
     873	 * notify the listener with a proper event if we are the first path.
    874	 */
    875	rtrs_srv_path_up(srv_path);
    876
    877	ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
    878				      tx_iu->dma_addr,
    879				      tx_iu->size, DMA_TO_DEVICE);
    880
    881	/* Send info response */
    882	err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr);
    883	if (err) {
    884		rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err);
    885iu_free:
    886		rtrs_iu_free(tx_iu, srv_path->s.dev->ib_dev, 1);
    887	}
    888rwr_free:
    889	kfree(rwr);
    890
    891	return err;
    892}
    893
    894static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
    895{
    896	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
    897	struct rtrs_path *s = con->c.path;
    898	struct rtrs_srv_path *srv_path = to_srv_path(s);
    899	struct rtrs_msg_info_req *msg;
    900	struct rtrs_iu *iu;
    901	int err;
    902
    903	WARN_ON(con->c.cid);
    904
    905	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
    906	if (wc->status != IB_WC_SUCCESS) {
    907		rtrs_err(s, "Sess info request receive failed: %s\n",
    908			  ib_wc_status_msg(wc->status));
    909		goto close;
    910	}
    911	WARN_ON(wc->opcode != IB_WC_RECV);
    912
    913	if (wc->byte_len < sizeof(*msg)) {
    914		rtrs_err(s, "Sess info request is malformed: size %d\n",
    915			  wc->byte_len);
    916		goto close;
    917	}
    918	ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, iu->dma_addr,
    919				   iu->size, DMA_FROM_DEVICE);
    920	msg = iu->buf;
    921	if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) {
    922		rtrs_err(s, "Sess info request is malformed: type %d\n",
    923			  le16_to_cpu(msg->type));
    924		goto close;
    925	}
    926	err = process_info_req(con, msg);
    927	if (err)
    928		goto close;
    929
    930out:
    931	rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1);
    932	return;
    933close:
    934	close_path(srv_path);
    935	goto out;
    936}
    937
    938static int post_recv_info_req(struct rtrs_srv_con *con)
    939{
    940	struct rtrs_path *s = con->c.path;
    941	struct rtrs_srv_path *srv_path = to_srv_path(s);
    942	struct rtrs_iu *rx_iu;
    943	int err;
    944
    945	rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req),
    946			       GFP_KERNEL, srv_path->s.dev->ib_dev,
    947			       DMA_FROM_DEVICE, rtrs_srv_info_req_done);
    948	if (!rx_iu)
    949		return -ENOMEM;
    950	/* Prepare for getting info response */
    951	err = rtrs_iu_post_recv(&con->c, rx_iu);
    952	if (err) {
    953		rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err);
    954		rtrs_iu_free(rx_iu, srv_path->s.dev->ib_dev, 1);
    955		return err;
    956	}
    957
    958	return 0;
    959}
    960
    961static int post_recv_io(struct rtrs_srv_con *con, size_t q_size)
    962{
    963	int i, err;
    964
    965	for (i = 0; i < q_size; i++) {
    966		err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
    967		if (err)
    968			return err;
    969	}
    970
    971	return 0;
    972}
    973
    974static int post_recv_path(struct rtrs_srv_path *srv_path)
    975{
    976	struct rtrs_srv_sess *srv = srv_path->srv;
    977	struct rtrs_path *s = &srv_path->s;
    978	size_t q_size;
    979	int err, cid;
    980
    981	for (cid = 0; cid < srv_path->s.con_num; cid++) {
    982		if (cid == 0)
    983			q_size = SERVICE_CON_QUEUE_DEPTH;
    984		else
    985			q_size = srv->queue_depth;
    986
    987		err = post_recv_io(to_srv_con(srv_path->s.con[cid]), q_size);
    988		if (err) {
    989			rtrs_err(s, "post_recv_io(), err: %d\n", err);
    990			return err;
    991		}
    992	}
    993
    994	return 0;
    995}
    996
    997static void process_read(struct rtrs_srv_con *con,
    998			 struct rtrs_msg_rdma_read *msg,
    999			 u32 buf_id, u32 off)
   1000{
   1001	struct rtrs_path *s = con->c.path;
   1002	struct rtrs_srv_path *srv_path = to_srv_path(s);
   1003	struct rtrs_srv_sess *srv = srv_path->srv;
   1004	struct rtrs_srv_ctx *ctx = srv->ctx;
   1005	struct rtrs_srv_op *id;
   1006
   1007	size_t usr_len, data_len;
   1008	void *data;
   1009	int ret;
   1010
   1011	if (srv_path->state != RTRS_SRV_CONNECTED) {
   1012		rtrs_err_rl(s,
   1013			     "Processing read request failed,  session is disconnected, sess state %s\n",
   1014			     rtrs_srv_state_str(srv_path->state));
   1015		return;
   1016	}
   1017	if (msg->sg_cnt != 1 && msg->sg_cnt != 0) {
   1018		rtrs_err_rl(s,
   1019			    "Processing read request failed, invalid message\n");
   1020		return;
   1021	}
   1022	rtrs_srv_get_ops_ids(srv_path);
   1023	rtrs_srv_update_rdma_stats(srv_path->stats, off, READ);
   1024	id = srv_path->ops_ids[buf_id];
   1025	id->con		= con;
   1026	id->dir		= READ;
   1027	id->msg_id	= buf_id;
   1028	id->rd_msg	= msg;
   1029	usr_len = le16_to_cpu(msg->usr_len);
   1030	data_len = off - usr_len;
   1031	data = page_address(srv->chunks[buf_id]);
   1032	ret = ctx->ops.rdma_ev(srv->priv, id, READ, data, data_len,
   1033			   data + data_len, usr_len);
   1034
   1035	if (ret) {
   1036		rtrs_err_rl(s,
   1037			     "Processing read request failed, user module cb reported for msg_id %d, err: %d\n",
   1038			     buf_id, ret);
   1039		goto send_err_msg;
   1040	}
   1041
   1042	return;
   1043
   1044send_err_msg:
   1045	ret = send_io_resp_imm(con, id, ret);
   1046	if (ret < 0) {
   1047		rtrs_err_rl(s,
   1048			     "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n",
   1049			     buf_id, ret);
   1050		close_path(srv_path);
   1051	}
   1052	rtrs_srv_put_ops_ids(srv_path);
   1053}
   1054
   1055static void process_write(struct rtrs_srv_con *con,
   1056			  struct rtrs_msg_rdma_write *req,
   1057			  u32 buf_id, u32 off)
   1058{
   1059	struct rtrs_path *s = con->c.path;
   1060	struct rtrs_srv_path *srv_path = to_srv_path(s);
   1061	struct rtrs_srv_sess *srv = srv_path->srv;
   1062	struct rtrs_srv_ctx *ctx = srv->ctx;
   1063	struct rtrs_srv_op *id;
   1064
   1065	size_t data_len, usr_len;
   1066	void *data;
   1067	int ret;
   1068
   1069	if (srv_path->state != RTRS_SRV_CONNECTED) {
   1070		rtrs_err_rl(s,
   1071			     "Processing write request failed,  session is disconnected, sess state %s\n",
   1072			     rtrs_srv_state_str(srv_path->state));
   1073		return;
   1074	}
   1075	rtrs_srv_get_ops_ids(srv_path);
   1076	rtrs_srv_update_rdma_stats(srv_path->stats, off, WRITE);
   1077	id = srv_path->ops_ids[buf_id];
   1078	id->con    = con;
   1079	id->dir    = WRITE;
   1080	id->msg_id = buf_id;
   1081
   1082	usr_len = le16_to_cpu(req->usr_len);
   1083	data_len = off - usr_len;
   1084	data = page_address(srv->chunks[buf_id]);
   1085	ret = ctx->ops.rdma_ev(srv->priv, id, WRITE, data, data_len,
   1086			       data + data_len, usr_len);
   1087	if (ret) {
   1088		rtrs_err_rl(s,
   1089			     "Processing write request failed, user module callback reports err: %d\n",
   1090			     ret);
   1091		goto send_err_msg;
   1092	}
   1093
   1094	return;
   1095
   1096send_err_msg:
   1097	ret = send_io_resp_imm(con, id, ret);
   1098	if (ret < 0) {
   1099		rtrs_err_rl(s,
   1100			     "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n",
   1101			     buf_id, ret);
   1102		close_path(srv_path);
   1103	}
   1104	rtrs_srv_put_ops_ids(srv_path);
   1105}
   1106
   1107static void process_io_req(struct rtrs_srv_con *con, void *msg,
   1108			   u32 id, u32 off)
   1109{
   1110	struct rtrs_path *s = con->c.path;
   1111	struct rtrs_srv_path *srv_path = to_srv_path(s);
   1112	struct rtrs_msg_rdma_hdr *hdr;
   1113	unsigned int type;
   1114
   1115	ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev,
   1116				   srv_path->dma_addr[id],
   1117				   max_chunk_size, DMA_BIDIRECTIONAL);
   1118	hdr = msg;
   1119	type = le16_to_cpu(hdr->type);
   1120
   1121	switch (type) {
   1122	case RTRS_MSG_WRITE:
   1123		process_write(con, msg, id, off);
   1124		break;
   1125	case RTRS_MSG_READ:
   1126		process_read(con, msg, id, off);
   1127		break;
   1128	default:
   1129		rtrs_err(s,
   1130			  "Processing I/O request failed, unknown message type received: 0x%02x\n",
   1131			  type);
   1132		goto err;
   1133	}
   1134
   1135	return;
   1136
   1137err:
   1138	close_path(srv_path);
   1139}
   1140
   1141static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
   1142{
   1143	struct rtrs_srv_mr *mr =
   1144		container_of(wc->wr_cqe, typeof(*mr), inv_cqe);
   1145	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
   1146	struct rtrs_path *s = con->c.path;
   1147	struct rtrs_srv_path *srv_path = to_srv_path(s);
   1148	struct rtrs_srv_sess *srv = srv_path->srv;
   1149	u32 msg_id, off;
   1150	void *data;
   1151
   1152	if (wc->status != IB_WC_SUCCESS) {
   1153		rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n",
   1154			  ib_wc_status_msg(wc->status));
   1155		close_path(srv_path);
   1156	}
   1157	msg_id = mr->msg_id;
   1158	off = mr->msg_off;
   1159	data = page_address(srv->chunks[msg_id]) + off;
   1160	process_io_req(con, data, msg_id, off);
   1161}
   1162
   1163static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con,
   1164			      struct rtrs_srv_mr *mr)
   1165{
   1166	struct ib_send_wr wr = {
   1167		.opcode		    = IB_WR_LOCAL_INV,
   1168		.wr_cqe		    = &mr->inv_cqe,
   1169		.send_flags	    = IB_SEND_SIGNALED,
   1170		.ex.invalidate_rkey = mr->mr->rkey,
   1171	};
   1172	mr->inv_cqe.done = rtrs_srv_inv_rkey_done;
   1173
   1174	return ib_post_send(con->c.qp, &wr, NULL);
   1175}
   1176
   1177static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con)
   1178{
   1179	spin_lock(&con->rsp_wr_wait_lock);
   1180	while (!list_empty(&con->rsp_wr_wait_list)) {
   1181		struct rtrs_srv_op *id;
   1182		int ret;
   1183
   1184		id = list_entry(con->rsp_wr_wait_list.next,
   1185				struct rtrs_srv_op, wait_list);
   1186		list_del(&id->wait_list);
   1187
   1188		spin_unlock(&con->rsp_wr_wait_lock);
   1189		ret = rtrs_srv_resp_rdma(id, id->status);
   1190		spin_lock(&con->rsp_wr_wait_lock);
   1191
   1192		if (!ret) {
   1193			list_add(&id->wait_list, &con->rsp_wr_wait_list);
   1194			break;
   1195		}
   1196	}
   1197	spin_unlock(&con->rsp_wr_wait_lock);
   1198}
   1199
   1200static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
   1201{
   1202	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
   1203	struct rtrs_path *s = con->c.path;
   1204	struct rtrs_srv_path *srv_path = to_srv_path(s);
   1205	struct rtrs_srv_sess *srv = srv_path->srv;
   1206	u32 imm_type, imm_payload;
   1207	int err;
   1208
   1209	if (wc->status != IB_WC_SUCCESS) {
   1210		if (wc->status != IB_WC_WR_FLUSH_ERR) {
   1211			rtrs_err(s,
   1212				  "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n",
   1213				  ib_wc_status_msg(wc->status), wc->wr_cqe,
   1214				  wc->opcode, wc->vendor_err, wc->byte_len);
   1215			close_path(srv_path);
   1216		}
   1217		return;
   1218	}
   1219
   1220	switch (wc->opcode) {
   1221	case IB_WC_RECV_RDMA_WITH_IMM:
   1222		/*
   1223		 * post_recv() RDMA write completions of IO reqs (read/write)
   1224		 * and hb
   1225		 */
   1226		if (WARN_ON(wc->wr_cqe != &io_comp_cqe))
   1227			return;
   1228		err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
   1229		if (err) {
   1230			rtrs_err(s, "rtrs_post_recv(), err: %d\n", err);
   1231			close_path(srv_path);
   1232			break;
   1233		}
   1234		rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
   1235			       &imm_type, &imm_payload);
   1236		if (imm_type == RTRS_IO_REQ_IMM) {
   1237			u32 msg_id, off;
   1238			void *data;
   1239
   1240			msg_id = imm_payload >> srv_path->mem_bits;
   1241			off = imm_payload & ((1 << srv_path->mem_bits) - 1);
   1242			if (msg_id >= srv->queue_depth || off >= max_chunk_size) {
   1243				rtrs_err(s, "Wrong msg_id %u, off %u\n",
   1244					  msg_id, off);
   1245				close_path(srv_path);
   1246				return;
   1247			}
   1248			if (always_invalidate) {
   1249				struct rtrs_srv_mr *mr = &srv_path->mrs[msg_id];
   1250
   1251				mr->msg_off = off;
   1252				mr->msg_id = msg_id;
   1253				err = rtrs_srv_inv_rkey(con, mr);
   1254				if (err) {
    1255					rtrs_err(s, "rtrs_srv_inv_rkey(), err: %d\n",
   1256						  err);
   1257					close_path(srv_path);
   1258					break;
   1259				}
   1260			} else {
   1261				data = page_address(srv->chunks[msg_id]) + off;
   1262				process_io_req(con, data, msg_id, off);
   1263			}
   1264		} else if (imm_type == RTRS_HB_MSG_IMM) {
   1265			WARN_ON(con->c.cid);
   1266			rtrs_send_hb_ack(&srv_path->s);
   1267		} else if (imm_type == RTRS_HB_ACK_IMM) {
   1268			WARN_ON(con->c.cid);
   1269			srv_path->s.hb_missed_cnt = 0;
   1270		} else {
   1271			rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
   1272		}
   1273		break;
   1274	case IB_WC_RDMA_WRITE:
   1275	case IB_WC_SEND:
   1276		/*
   1277		 * post_send() RDMA write completions of IO reqs (read/write)
   1278		 * and hb.
   1279		 */
   1280		atomic_add(s->signal_interval, &con->c.sq_wr_avail);
   1281
   1282		if (!list_empty_careful(&con->rsp_wr_wait_list))
   1283			rtrs_rdma_process_wr_wait_list(con);
   1284
   1285		break;
   1286	default:
   1287		rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
   1288		return;
   1289	}
   1290}
   1291
   1292/**
    1293 * rtrs_srv_get_path_name() - Get the path (session) name of a connected path.
   1294 * @srv:	Session
   1295 * @pathname:	Pathname buffer
    1296 * @len:	Length of pathname buffer
   1297 */
   1298int rtrs_srv_get_path_name(struct rtrs_srv_sess *srv, char *pathname,
   1299			   size_t len)
   1300{
   1301	struct rtrs_srv_path *srv_path;
   1302	int err = -ENOTCONN;
   1303
   1304	mutex_lock(&srv->paths_mutex);
   1305	list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
   1306		if (srv_path->state != RTRS_SRV_CONNECTED)
   1307			continue;
   1308		strscpy(pathname, srv_path->s.sessname,
   1309			min_t(size_t, sizeof(srv_path->s.sessname), len));
   1310		err = 0;
   1311		break;
   1312	}
   1313	mutex_unlock(&srv->paths_mutex);
   1314
   1315	return err;
   1316}
   1317EXPORT_SYMBOL(rtrs_srv_get_path_name);
   1318
   1319/**
   1320 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth.
   1321 * @srv:	Session
   1322 */
   1323int rtrs_srv_get_queue_depth(struct rtrs_srv_sess *srv)
   1324{
   1325	return srv->queue_depth;
   1326}
   1327EXPORT_SYMBOL(rtrs_srv_get_queue_depth);
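/*
 * Usage sketch for the two getters above (hypothetical caller, e.g. a
 * sysfs show function of an upper-layer module):
 */
#if 0
static ssize_t my_sess_show(struct rtrs_srv_sess *srv, char *page)
{
	char path[NAME_MAX];

	if (rtrs_srv_get_path_name(srv, path, sizeof(path)))
		return -ENOTCONN;

	return sysfs_emit(page, "%s queue_depth=%d\n", path,
			  rtrs_srv_get_queue_depth(srv));
}
#endif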
   1328
   1329static int find_next_bit_ring(struct rtrs_srv_path *srv_path)
   1330{
   1331	struct ib_device *ib_dev = srv_path->s.dev->ib_dev;
   1332	int v;
   1333
   1334	v = cpumask_next(srv_path->cur_cq_vector, &cq_affinity_mask);
   1335	if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
   1336		v = cpumask_first(&cq_affinity_mask);
   1337	return v;
   1338}
   1339
   1340static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_path *srv_path)
   1341{
   1342	srv_path->cur_cq_vector = find_next_bit_ring(srv_path);
   1343
   1344	return srv_path->cur_cq_vector;
   1345}
   1346
   1347static void rtrs_srv_dev_release(struct device *dev)
   1348{
   1349	struct rtrs_srv_sess *srv = container_of(dev, struct rtrs_srv_sess,
   1350						 dev);
   1351
   1352	kfree(srv);
   1353}
   1354
   1355static void free_srv(struct rtrs_srv_sess *srv)
   1356{
   1357	int i;
   1358
   1359	WARN_ON(refcount_read(&srv->refcount));
   1360	for (i = 0; i < srv->queue_depth; i++)
   1361		mempool_free(srv->chunks[i], chunk_pool);
   1362	kfree(srv->chunks);
   1363	mutex_destroy(&srv->paths_mutex);
   1364	mutex_destroy(&srv->paths_ev_mutex);
   1365	/* last put to release the srv structure */
   1366	put_device(&srv->dev);
   1367}
   1368
   1369static struct rtrs_srv_sess *get_or_create_srv(struct rtrs_srv_ctx *ctx,
   1370					  const uuid_t *paths_uuid,
   1371					  bool first_conn)
   1372{
   1373	struct rtrs_srv_sess *srv;
   1374	int i;
   1375
   1376	mutex_lock(&ctx->srv_mutex);
   1377	list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
   1378		if (uuid_equal(&srv->paths_uuid, paths_uuid) &&
   1379		    refcount_inc_not_zero(&srv->refcount)) {
   1380			mutex_unlock(&ctx->srv_mutex);
   1381			return srv;
   1382		}
   1383	}
   1384	mutex_unlock(&ctx->srv_mutex);
   1385	/*
   1386	 * If this request is not the first connection request from the
   1387	 * client for this session then fail and return error.
   1388	 */
   1389	if (!first_conn) {
   1390		pr_err_ratelimited("Error: Not the first connection request for this session\n");
   1391		return ERR_PTR(-ENXIO);
   1392	}
   1393
   1394	/* need to allocate a new srv */
   1395	srv = kzalloc(sizeof(*srv), GFP_KERNEL);
   1396	if  (!srv)
   1397		return ERR_PTR(-ENOMEM);
   1398
   1399	INIT_LIST_HEAD(&srv->paths_list);
   1400	mutex_init(&srv->paths_mutex);
   1401	mutex_init(&srv->paths_ev_mutex);
   1402	uuid_copy(&srv->paths_uuid, paths_uuid);
   1403	srv->queue_depth = sess_queue_depth;
   1404	srv->ctx = ctx;
   1405	device_initialize(&srv->dev);
   1406	srv->dev.release = rtrs_srv_dev_release;
   1407
   1408	srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
   1409			      GFP_KERNEL);
   1410	if (!srv->chunks)
   1411		goto err_free_srv;
   1412
   1413	for (i = 0; i < srv->queue_depth; i++) {
   1414		srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL);
   1415		if (!srv->chunks[i])
   1416			goto err_free_chunks;
   1417	}
   1418	refcount_set(&srv->refcount, 1);
   1419	mutex_lock(&ctx->srv_mutex);
   1420	list_add(&srv->ctx_list, &ctx->srv_list);
   1421	mutex_unlock(&ctx->srv_mutex);
   1422
   1423	return srv;
   1424
   1425err_free_chunks:
   1426	while (i--)
   1427		mempool_free(srv->chunks[i], chunk_pool);
   1428	kfree(srv->chunks);
   1429
   1430err_free_srv:
   1431	kfree(srv);
   1432	return ERR_PTR(-ENOMEM);
   1433}
   1434
   1435static void put_srv(struct rtrs_srv_sess *srv)
   1436{
   1437	if (refcount_dec_and_test(&srv->refcount)) {
   1438		struct rtrs_srv_ctx *ctx = srv->ctx;
   1439
   1440		WARN_ON(srv->dev.kobj.state_in_sysfs);
   1441
   1442		mutex_lock(&ctx->srv_mutex);
   1443		list_del(&srv->ctx_list);
   1444		mutex_unlock(&ctx->srv_mutex);
   1445		free_srv(srv);
   1446	}
   1447}
   1448
   1449static void __add_path_to_srv(struct rtrs_srv_sess *srv,
   1450			      struct rtrs_srv_path *srv_path)
   1451{
   1452	list_add_tail(&srv_path->s.entry, &srv->paths_list);
   1453	srv->paths_num++;
   1454	WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
   1455}
   1456
   1457static void del_path_from_srv(struct rtrs_srv_path *srv_path)
   1458{
   1459	struct rtrs_srv_sess *srv = srv_path->srv;
   1460
   1461	if (WARN_ON(!srv))
   1462		return;
   1463
   1464	mutex_lock(&srv->paths_mutex);
   1465	list_del(&srv_path->s.entry);
   1466	WARN_ON(!srv->paths_num);
   1467	srv->paths_num--;
   1468	mutex_unlock(&srv->paths_mutex);
   1469}
   1470
    1471/* Return 0 if the addresses are the same, non-zero or an error otherwise */
   1472static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
   1473{
   1474	switch (a->sa_family) {
   1475	case AF_IB:
   1476		return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
   1477			      &((struct sockaddr_ib *)b)->sib_addr,
   1478			      sizeof(struct ib_addr)) &&
   1479			(b->sa_family == AF_IB);
   1480	case AF_INET:
   1481		return memcmp(&((struct sockaddr_in *)a)->sin_addr,
   1482			      &((struct sockaddr_in *)b)->sin_addr,
   1483			      sizeof(struct in_addr)) &&
   1484			(b->sa_family == AF_INET);
   1485	case AF_INET6:
   1486		return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
   1487			      &((struct sockaddr_in6 *)b)->sin6_addr,
   1488			      sizeof(struct in6_addr)) &&
   1489			(b->sa_family == AF_INET6);
   1490	default:
   1491		return -ENOENT;
   1492	}
   1493}
   1494
   1495static bool __is_path_w_addr_exists(struct rtrs_srv_sess *srv,
   1496				    struct rdma_addr *addr)
   1497{
   1498	struct rtrs_srv_path *srv_path;
   1499
   1500	list_for_each_entry(srv_path, &srv->paths_list, s.entry)
   1501		if (!sockaddr_cmp((struct sockaddr *)&srv_path->s.dst_addr,
   1502				  (struct sockaddr *)&addr->dst_addr) &&
   1503		    !sockaddr_cmp((struct sockaddr *)&srv_path->s.src_addr,
   1504				  (struct sockaddr *)&addr->src_addr))
   1505			return true;
   1506
   1507	return false;
   1508}
   1509
   1510static void free_path(struct rtrs_srv_path *srv_path)
   1511{
   1512	if (srv_path->kobj.state_in_sysfs) {
   1513		kobject_del(&srv_path->kobj);
   1514		kobject_put(&srv_path->kobj);
   1515	} else {
   1516		kfree(srv_path->stats);
   1517		kfree(srv_path);
   1518	}
   1519}
   1520
   1521static void rtrs_srv_close_work(struct work_struct *work)
   1522{
   1523	struct rtrs_srv_path *srv_path;
   1524	struct rtrs_srv_con *con;
   1525	int i;
   1526
   1527	srv_path = container_of(work, typeof(*srv_path), close_work);
   1528
   1529	rtrs_srv_destroy_path_files(srv_path);
   1530	rtrs_srv_stop_hb(srv_path);
   1531
   1532	for (i = 0; i < srv_path->s.con_num; i++) {
   1533		if (!srv_path->s.con[i])
   1534			continue;
   1535		con = to_srv_con(srv_path->s.con[i]);
   1536		rdma_disconnect(con->c.cm_id);
   1537		ib_drain_qp(con->c.qp);
   1538	}
   1539
   1540	/*
   1541	 * Degrade ref count to the usual model with a single shared
   1542	 * atomic_t counter
   1543	 */
   1544	percpu_ref_kill(&srv_path->ids_inflight_ref);
   1545
    1546	/* Wait for all inflight operations to complete */
   1547	wait_for_completion(&srv_path->complete_done);
   1548
   1549	/* Notify upper layer if we are the last path */
   1550	rtrs_srv_path_down(srv_path);
   1551
   1552	unmap_cont_bufs(srv_path);
   1553	rtrs_srv_free_ops_ids(srv_path);
   1554
   1555	for (i = 0; i < srv_path->s.con_num; i++) {
   1556		if (!srv_path->s.con[i])
   1557			continue;
   1558		con = to_srv_con(srv_path->s.con[i]);
   1559		rtrs_cq_qp_destroy(&con->c);
   1560		rdma_destroy_id(con->c.cm_id);
   1561		kfree(con);
   1562	}
   1563	rtrs_ib_dev_put(srv_path->s.dev);
   1564
   1565	del_path_from_srv(srv_path);
   1566	put_srv(srv_path->srv);
   1567	srv_path->srv = NULL;
   1568	rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSED);
   1569
   1570	kfree(srv_path->dma_addr);
   1571	kfree(srv_path->s.con);
   1572	free_path(srv_path);
   1573}
   1574
   1575static int rtrs_rdma_do_accept(struct rtrs_srv_path *srv_path,
   1576			       struct rdma_cm_id *cm_id)
   1577{
   1578	struct rtrs_srv_sess *srv = srv_path->srv;
   1579	struct rtrs_msg_conn_rsp msg;
   1580	struct rdma_conn_param param;
   1581	int err;
   1582
   1583	param = (struct rdma_conn_param) {
   1584		.rnr_retry_count = 7,
   1585		.private_data = &msg,
   1586		.private_data_len = sizeof(msg),
   1587	};
   1588
   1589	msg = (struct rtrs_msg_conn_rsp) {
   1590		.magic = cpu_to_le16(RTRS_MAGIC),
   1591		.version = cpu_to_le16(RTRS_PROTO_VER),
   1592		.queue_depth = cpu_to_le16(srv->queue_depth),
   1593		.max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
   1594		.max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
   1595	};
   1596
   1597	if (always_invalidate)
   1598		msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);
   1599
   1600	err = rdma_accept(cm_id, &param);
   1601	if (err)
   1602		pr_err("rdma_accept(), err: %d\n", err);
   1603
   1604	return err;
   1605}
   1606
   1607static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
   1608{
   1609	struct rtrs_msg_conn_rsp msg;
   1610	int err;
   1611
   1612	msg = (struct rtrs_msg_conn_rsp) {
   1613		.magic = cpu_to_le16(RTRS_MAGIC),
   1614		.version = cpu_to_le16(RTRS_PROTO_VER),
   1615		.errno = cpu_to_le16(errno),
   1616	};
   1617
   1618	err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
   1619	if (err)
   1620		pr_err("rdma_reject(), err: %d\n", err);
   1621
   1622	/* Bounce errno back */
   1623	return errno;
   1624}
   1625
   1626static struct rtrs_srv_path *
   1627__find_path(struct rtrs_srv_sess *srv, const uuid_t *sess_uuid)
   1628{
   1629	struct rtrs_srv_path *srv_path;
   1630
   1631	list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
   1632		if (uuid_equal(&srv_path->s.uuid, sess_uuid))
   1633			return srv_path;
   1634	}
   1635
   1636	return NULL;
   1637}
   1638
   1639static int create_con(struct rtrs_srv_path *srv_path,
   1640		      struct rdma_cm_id *cm_id,
   1641		      unsigned int cid)
   1642{
   1643	struct rtrs_srv_sess *srv = srv_path->srv;
   1644	struct rtrs_path *s = &srv_path->s;
   1645	struct rtrs_srv_con *con;
   1646
   1647	u32 cq_num, max_send_wr, max_recv_wr, wr_limit;
   1648	int err, cq_vector;
   1649
   1650	con = kzalloc(sizeof(*con), GFP_KERNEL);
   1651	if (!con) {
   1652		err = -ENOMEM;
   1653		goto err;
   1654	}
   1655
   1656	spin_lock_init(&con->rsp_wr_wait_lock);
   1657	INIT_LIST_HEAD(&con->rsp_wr_wait_list);
   1658	con->c.cm_id = cm_id;
   1659	con->c.path = &srv_path->s;
   1660	con->c.cid = cid;
   1661	atomic_set(&con->c.wr_cnt, 1);
   1662	wr_limit = srv_path->s.dev->ib_dev->attrs.max_qp_wr;
   1663
   1664	if (con->c.cid == 0) {
   1665		/*
   1666		 * All receive and all send (each requiring invalidate)
   1667		 * + 2 for drain and heartbeat
   1668		 */
   1669		max_send_wr = min_t(int, wr_limit,
   1670				    SERVICE_CON_QUEUE_DEPTH * 2 + 2);
   1671		max_recv_wr = max_send_wr;
   1672		s->signal_interval = min_not_zero(srv->queue_depth,
   1673						  (size_t)SERVICE_CON_QUEUE_DEPTH);
   1674	} else {
    1675		/* when always_invalidate is enabled, we need linv+rinv+mr+imm */
   1676		if (always_invalidate)
   1677			max_send_wr =
   1678				min_t(int, wr_limit,
   1679				      srv->queue_depth * (1 + 4) + 1);
   1680		else
   1681			max_send_wr =
   1682				min_t(int, wr_limit,
   1683				      srv->queue_depth * (1 + 2) + 1);
   1684
   1685		max_recv_wr = srv->queue_depth + 1;
   1686		/*
    1687		 * We may have all receive requests and all write requests
    1688		 * posted at once, each read request additionally requires an
    1689		 * invalidate request, and we need headroom for a drain in case
    1690		 * the qp gets into an error state.
   1691		 */
   1692	}
   1693	cq_num = max_send_wr + max_recv_wr;
   1694	atomic_set(&con->c.sq_wr_avail, max_send_wr);
   1695	cq_vector = rtrs_srv_get_next_cq_vector(srv_path);
   1696
   1697	/* TODO: SOFTIRQ can be faster, but be careful with softirq context */
   1698	err = rtrs_cq_qp_create(&srv_path->s, &con->c, 1, cq_vector, cq_num,
   1699				 max_send_wr, max_recv_wr,
   1700				 IB_POLL_WORKQUEUE);
   1701	if (err) {
   1702		rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
   1703		goto free_con;
   1704	}
   1705	if (con->c.cid == 0) {
   1706		err = post_recv_info_req(con);
   1707		if (err)
   1708			goto free_cqqp;
   1709	}
   1710	WARN_ON(srv_path->s.con[cid]);
   1711	srv_path->s.con[cid] = &con->c;
   1712
   1713	/*
   1714	 * Change context from server to current connection.  The other
   1715	 * way is to use cm_id->qp->qp_context, which does not work on OFED.
   1716	 */
   1717	cm_id->context = &con->c;
   1718
   1719	return 0;
   1720
   1721free_cqqp:
   1722	rtrs_cq_qp_destroy(&con->c);
   1723free_con:
   1724	kfree(con);
   1725
   1726err:
   1727	return err;
   1728}
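/*
 * Send-queue sizing example for an I/O connection (cid != 0), assuming the
 * default queue_depth of 512 and a sufficiently large max_qp_wr:
 *
 *	always_invalidate:	max_send_wr = 512 * (1 + 4) + 1 = 2561
 *	otherwise:		max_send_wr = 512 * (1 + 2) + 1 = 1537
 *
 * The "1 + 4" per-request budget matches the chain built in rdma_write_sg()
 * (RDMA write, reg MR, send-with-inv, send-with-imm) plus the local
 * invalidate posted from rtrs_srv_rdma_done(); the extra "+ 1" leaves
 * headroom for a drain WR.
 */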
   1729
   1730static struct rtrs_srv_path *__alloc_path(struct rtrs_srv_sess *srv,
   1731					   struct rdma_cm_id *cm_id,
   1732					   unsigned int con_num,
   1733					   unsigned int recon_cnt,
   1734					   const uuid_t *uuid)
   1735{
   1736	struct rtrs_srv_path *srv_path;
   1737	int err = -ENOMEM;
   1738	char str[NAME_MAX];
   1739	struct rtrs_addr path;
   1740
   1741	if (srv->paths_num >= MAX_PATHS_NUM) {
   1742		err = -ECONNRESET;
   1743		goto err;
   1744	}
   1745	if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) {
   1746		err = -EEXIST;
   1747		pr_err("Path with same addr exists\n");
   1748		goto err;
   1749	}
   1750	srv_path = kzalloc(sizeof(*srv_path), GFP_KERNEL);
   1751	if (!srv_path)
   1752		goto err;
   1753
   1754	srv_path->stats = kzalloc(sizeof(*srv_path->stats), GFP_KERNEL);
   1755	if (!srv_path->stats)
   1756		goto err_free_sess;
   1757
   1758	srv_path->stats->srv_path = srv_path;
   1759
   1760	srv_path->dma_addr = kcalloc(srv->queue_depth,
   1761				     sizeof(*srv_path->dma_addr),
   1762				     GFP_KERNEL);
   1763	if (!srv_path->dma_addr)
   1764		goto err_free_stats;
   1765
   1766	srv_path->s.con = kcalloc(con_num, sizeof(*srv_path->s.con),
   1767				  GFP_KERNEL);
   1768	if (!srv_path->s.con)
   1769		goto err_free_dma_addr;
   1770
   1771	srv_path->state = RTRS_SRV_CONNECTING;
   1772	srv_path->srv = srv;
   1773	srv_path->cur_cq_vector = -1;
   1774	srv_path->s.dst_addr = cm_id->route.addr.dst_addr;
   1775	srv_path->s.src_addr = cm_id->route.addr.src_addr;
   1776
   1777	/* temporary until receiving session-name from client */
   1778	path.src = &srv_path->s.src_addr;
   1779	path.dst = &srv_path->s.dst_addr;
   1780	rtrs_addr_to_str(&path, str, sizeof(str));
   1781	strscpy(srv_path->s.sessname, str, sizeof(srv_path->s.sessname));
   1782
   1783	srv_path->s.con_num = con_num;
   1784	srv_path->s.irq_con_num = con_num;
   1785	srv_path->s.recon_cnt = recon_cnt;
   1786	uuid_copy(&srv_path->s.uuid, uuid);
   1787	spin_lock_init(&srv_path->state_lock);
   1788	INIT_WORK(&srv_path->close_work, rtrs_srv_close_work);
   1789	rtrs_srv_init_hb(srv_path);
   1790
   1791	srv_path->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd);
   1792	if (!srv_path->s.dev) {
   1793		err = -ENOMEM;
   1794		goto err_free_con;
   1795	}
   1796	err = map_cont_bufs(srv_path);
   1797	if (err)
   1798		goto err_put_dev;
   1799
   1800	err = rtrs_srv_alloc_ops_ids(srv_path);
   1801	if (err)
   1802		goto err_unmap_bufs;
   1803
   1804	__add_path_to_srv(srv, srv_path);
   1805
   1806	return srv_path;
   1807
   1808err_unmap_bufs:
   1809	unmap_cont_bufs(srv_path);
   1810err_put_dev:
   1811	rtrs_ib_dev_put(srv_path->s.dev);
   1812err_free_con:
   1813	kfree(srv_path->s.con);
   1814err_free_dma_addr:
   1815	kfree(srv_path->dma_addr);
   1816err_free_stats:
   1817	kfree(srv_path->stats);
   1818err_free_sess:
   1819	kfree(srv_path);
   1820err:
   1821	return ERR_PTR(err);
   1822}
   1823
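        /*
         * Handle an RDMA_CM_EVENT_CONNECT_REQUEST: validate the connection
         * request message, find or create the session and path it belongs to,
         * create the requested connection and accept it, rejecting the CM
         * request on any error.
         */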
   1824static int rtrs_rdma_connect(struct rdma_cm_id *cm_id,
   1825			      const struct rtrs_msg_conn_req *msg,
   1826			      size_t len)
   1827{
   1828	struct rtrs_srv_ctx *ctx = cm_id->context;
   1829	struct rtrs_srv_path *srv_path;
   1830	struct rtrs_srv_sess *srv;
   1831
   1832	u16 version, con_num, cid;
   1833	u16 recon_cnt;
   1834	int err = -ECONNRESET;
   1835
   1836	if (len < sizeof(*msg)) {
   1837		pr_err("Invalid RTRS connection request\n");
   1838		goto reject_w_err;
   1839	}
   1840	if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
   1841		pr_err("Invalid RTRS magic\n");
   1842		goto reject_w_err;
   1843	}
   1844	version = le16_to_cpu(msg->version);
   1845	if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
   1846		pr_err("Unsupported major RTRS version: %d, expected %d\n",
   1847		       version >> 8, RTRS_PROTO_VER_MAJOR);
   1848		goto reject_w_err;
   1849	}
   1850	con_num = le16_to_cpu(msg->cid_num);
   1851	if (con_num > 4096) {
   1852		/* Sanity check */
   1853		pr_err("Too many connections requested: %d\n", con_num);
   1854		goto reject_w_err;
   1855	}
   1856	cid = le16_to_cpu(msg->cid);
   1857	if (cid >= con_num) {
   1858		/* Sanity check */
   1859		pr_err("Incorrect cid: %d >= %d\n", cid, con_num);
   1860		goto reject_w_err;
   1861	}
   1862	recon_cnt = le16_to_cpu(msg->recon_cnt);
   1863	srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn);
   1864	if (IS_ERR(srv)) {
   1865		err = PTR_ERR(srv);
   1866		pr_err("get_or_create_srv(), error %d\n", err);
   1867		goto reject_w_err;
   1868	}
   1869	mutex_lock(&srv->paths_mutex);
   1870	srv_path = __find_path(srv, &msg->sess_uuid);
   1871	if (srv_path) {
   1872		struct rtrs_path *s = &srv_path->s;
   1873
   1874		/* Session already holds a reference */
   1875		put_srv(srv);
   1876
   1877		if (srv_path->state != RTRS_SRV_CONNECTING) {
   1878			rtrs_err(s, "Session in wrong state: %s\n",
   1879				  rtrs_srv_state_str(srv_path->state));
   1880			mutex_unlock(&srv->paths_mutex);
   1881			goto reject_w_err;
   1882		}
   1883		/*
   1884		 * Sanity checks
   1885		 */
   1886		if (con_num != s->con_num || cid >= s->con_num) {
   1887			rtrs_err(s, "Incorrect request: %d, %d\n",
   1888				  cid, con_num);
   1889			mutex_unlock(&srv->paths_mutex);
   1890			goto reject_w_err;
   1891		}
   1892		if (s->con[cid]) {
   1893			rtrs_err(s, "Connection already exists: %d\n",
   1894				  cid);
   1895			mutex_unlock(&srv->paths_mutex);
   1896			goto reject_w_err;
   1897		}
   1898	} else {
   1899		srv_path = __alloc_path(srv, cm_id, con_num, recon_cnt,
   1900				    &msg->sess_uuid);
   1901		if (IS_ERR(srv_path)) {
   1902			mutex_unlock(&srv->paths_mutex);
   1903			put_srv(srv);
   1904			err = PTR_ERR(srv_path);
   1905			pr_err("RTRS server session allocation failed: %d\n", err);
   1906			goto reject_w_err;
   1907		}
   1908	}
   1909	err = create_con(srv_path, cm_id, cid);
   1910	if (err) {
   1911		rtrs_err((&srv_path->s), "create_con(), error %d\n", err);
   1912		rtrs_rdma_do_reject(cm_id, err);
    1913		/*
    1914		 * Since the session has other connections we follow the normal
    1915		 * path through the workqueue, but still return an error to tell
    1916		 * cma.c to call rdma_destroy_id() for the current connection.
    1917		 */
   1918		goto close_and_return_err;
   1919	}
   1920	err = rtrs_rdma_do_accept(srv_path, cm_id);
   1921	if (err) {
   1922		rtrs_err((&srv_path->s), "rtrs_rdma_do_accept(), error %d\n", err);
   1923		rtrs_rdma_do_reject(cm_id, err);
    1924		/*
    1925		 * Since the current connection was successfully added to the
    1926		 * session, we follow the normal path through the workqueue to
    1927		 * close the session, and return 0 to tell cma.c that we will
    1928		 * call rdma_destroy_id() ourselves.
    1929		 */
   1930		err = 0;
   1931		goto close_and_return_err;
   1932	}
   1933	mutex_unlock(&srv->paths_mutex);
   1934
   1935	return 0;
   1936
   1937reject_w_err:
   1938	return rtrs_rdma_do_reject(cm_id, err);
   1939
   1940close_and_return_err:
   1941	mutex_unlock(&srv->paths_mutex);
   1942	close_path(srv_path);
   1943
   1944	return err;
   1945}
   1946
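        /*
         * CM event handler shared by both listening cm_ids: connect requests
         * are dispatched to rtrs_rdma_connect(), while errors, disconnects and
         * device removal close the path owning the connection.
         */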
   1947static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
   1948				     struct rdma_cm_event *ev)
   1949{
   1950	struct rtrs_srv_path *srv_path = NULL;
   1951	struct rtrs_path *s = NULL;
   1952
   1953	if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
   1954		struct rtrs_con *c = cm_id->context;
   1955
   1956		s = c->path;
   1957		srv_path = to_srv_path(s);
   1958	}
   1959
   1960	switch (ev->event) {
   1961	case RDMA_CM_EVENT_CONNECT_REQUEST:
   1962		/*
   1963		 * In case of error cma.c will destroy cm_id,
   1964		 * see cma_process_remove()
   1965		 */
   1966		return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
   1967					  ev->param.conn.private_data_len);
   1968	case RDMA_CM_EVENT_ESTABLISHED:
   1969		/* Nothing here */
   1970		break;
   1971	case RDMA_CM_EVENT_REJECTED:
   1972	case RDMA_CM_EVENT_CONNECT_ERROR:
   1973	case RDMA_CM_EVENT_UNREACHABLE:
   1974		rtrs_err(s, "CM error (CM event: %s, err: %d)\n",
   1975			  rdma_event_msg(ev->event), ev->status);
   1976		fallthrough;
   1977	case RDMA_CM_EVENT_DISCONNECTED:
   1978	case RDMA_CM_EVENT_ADDR_CHANGE:
   1979	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
   1980	case RDMA_CM_EVENT_DEVICE_REMOVAL:
   1981		close_path(srv_path);
   1982		break;
   1983	default:
   1984		pr_err("Ignoring unexpected CM event %s, err %d\n",
   1985		       rdma_event_msg(ev->event), ev->status);
   1986		break;
   1987	}
   1988
   1989	return 0;
   1990}
   1991
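        /*
         * Create a cm_id listening on the given address and port space,
         * with a backlog of 64 pending connections.
         */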
   1992static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
   1993					    struct sockaddr *addr,
   1994					    enum rdma_ucm_port_space ps)
   1995{
   1996	struct rdma_cm_id *cm_id;
   1997	int ret;
   1998
   1999	cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
   2000			       ctx, ps, IB_QPT_RC);
   2001	if (IS_ERR(cm_id)) {
   2002		ret = PTR_ERR(cm_id);
   2003		pr_err("Creating id for RDMA connection failed, err: %d\n",
   2004		       ret);
   2005		goto err_out;
   2006	}
   2007	ret = rdma_bind_addr(cm_id, addr);
   2008	if (ret) {
   2009		pr_err("Binding RDMA address failed, err: %d\n", ret);
   2010		goto err_cm;
   2011	}
   2012	ret = rdma_listen(cm_id, 64);
   2013	if (ret) {
   2014		pr_err("Listening on RDMA connection failed, err: %d\n",
   2015		       ret);
   2016		goto err_cm;
   2017	}
   2018
   2019	return cm_id;
   2020
   2021err_cm:
   2022	rdma_destroy_id(cm_id);
   2023err_out:
   2024
   2025	return ERR_PTR(ret);
   2026}
   2027
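        /*
         * Start listening for both IP (RDMA_PS_TCP) and native IB (RDMA_PS_IB)
         * connections on the given port.
         */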
   2028static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port)
   2029{
   2030	struct sockaddr_in6 sin = {
   2031		.sin6_family	= AF_INET6,
   2032		.sin6_addr	= IN6ADDR_ANY_INIT,
   2033		.sin6_port	= htons(port),
   2034	};
   2035	struct sockaddr_ib sib = {
   2036		.sib_family			= AF_IB,
   2037		.sib_sid	= cpu_to_be64(RDMA_IB_IP_PS_IB | port),
   2038		.sib_sid_mask	= cpu_to_be64(0xffffffffffffffffULL),
   2039		.sib_pkey	= cpu_to_be16(0xffff),
   2040	};
   2041	struct rdma_cm_id *cm_ip, *cm_ib;
   2042	int ret;
   2043
   2044	/*
   2045	 * We accept both IPoIB and IB connections, so we need to keep
   2046	 * two cm id's, one for each socket type and port space.
   2047	 * If the cm initialization of one of the id's fails, we abort
   2048	 * everything.
   2049	 */
   2050	cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
   2051	if (IS_ERR(cm_ip))
   2052		return PTR_ERR(cm_ip);
   2053
   2054	cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
   2055	if (IS_ERR(cm_ib)) {
   2056		ret = PTR_ERR(cm_ib);
   2057		goto free_cm_ip;
   2058	}
   2059
   2060	ctx->cm_id_ip = cm_ip;
   2061	ctx->cm_id_ib = cm_ib;
   2062
   2063	return 0;
   2064
   2065free_cm_ip:
   2066	rdma_destroy_id(cm_ip);
   2067
   2068	return ret;
   2069}
   2070
   2071static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
   2072{
   2073	struct rtrs_srv_ctx *ctx;
   2074
   2075	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
   2076	if (!ctx)
   2077		return NULL;
   2078
   2079	ctx->ops = *ops;
   2080	mutex_init(&ctx->srv_mutex);
   2081	INIT_LIST_HEAD(&ctx->srv_list);
   2082
   2083	return ctx;
   2084}
   2085
   2086static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
   2087{
   2088	WARN_ON(!list_empty(&ctx->srv_list));
   2089	mutex_destroy(&ctx->srv_mutex);
   2090	kfree(ctx);
   2091}
   2092
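        /*
         * IB client callbacks: the CM listeners are created when the first ib
         * device is added and destroyed again when the last one is removed,
         * since they are not bound to any particular device.
         */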
   2093static int rtrs_srv_add_one(struct ib_device *device)
   2094{
   2095	struct rtrs_srv_ctx *ctx;
   2096	int ret = 0;
   2097
   2098	mutex_lock(&ib_ctx.ib_dev_mutex);
   2099	if (ib_ctx.ib_dev_count)
   2100		goto out;
   2101
   2102	/*
   2103	 * Since our CM IDs are NOT bound to any ib device we will create them
   2104	 * only once
   2105	 */
   2106	ctx = ib_ctx.srv_ctx;
   2107	ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
   2108	if (ret) {
    2109		/*
    2110		 * Initialization failed here.
    2111		 * According to the IB core code, the error code we return is
    2112		 * ignored, and no more calls to our ops are made.
    2113		 */
    2114		pr_err("Failed to initialize RDMA connection\n");
   2115		goto err_out;
   2116	}
   2117
   2118out:
    2119	/*
    2120	 * Keep track of the number of ib devices added
    2121	 */
   2122	ib_ctx.ib_dev_count++;
   2123
   2124err_out:
   2125	mutex_unlock(&ib_ctx.ib_dev_mutex);
   2126	return ret;
   2127}
   2128
   2129static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
   2130{
   2131	struct rtrs_srv_ctx *ctx;
   2132
   2133	mutex_lock(&ib_ctx.ib_dev_mutex);
   2134	ib_ctx.ib_dev_count--;
   2135
   2136	if (ib_ctx.ib_dev_count)
   2137		goto out;
   2138
   2139	/*
   2140	 * Since our CM IDs are NOT bound to any ib device we will remove them
   2141	 * only once, when the last device is removed
   2142	 */
   2143	ctx = ib_ctx.srv_ctx;
   2144	rdma_destroy_id(ctx->cm_id_ip);
   2145	rdma_destroy_id(ctx->cm_id_ib);
   2146
   2147out:
   2148	mutex_unlock(&ib_ctx.ib_dev_mutex);
   2149}
   2150
   2151static struct ib_client rtrs_srv_client = {
   2152	.name	= "rtrs_server",
   2153	.add	= rtrs_srv_add_one,
   2154	.remove	= rtrs_srv_remove_one
   2155};
   2156
   2157/**
   2158 * rtrs_srv_open() - open RTRS server context
   2159 * @ops:		callback functions
   2160 * @port:               port to listen on
   2161 *
   2162 * Creates server context with specified callbacks.
   2163 *
    2164 * Return: a valid pointer on success, otherwise an ERR_PTR.
   2165 */
   2166struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
   2167{
   2168	struct rtrs_srv_ctx *ctx;
   2169	int err;
   2170
   2171	ctx = alloc_srv_ctx(ops);
   2172	if (!ctx)
   2173		return ERR_PTR(-ENOMEM);
   2174
   2175	mutex_init(&ib_ctx.ib_dev_mutex);
   2176	ib_ctx.srv_ctx = ctx;
   2177	ib_ctx.port = port;
   2178
   2179	err = ib_register_client(&rtrs_srv_client);
   2180	if (err) {
   2181		free_srv_ctx(ctx);
   2182		return ERR_PTR(err);
   2183	}
   2184
   2185	return ctx;
   2186}
   2187EXPORT_SYMBOL(rtrs_srv_open);
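
        /*
         * Minimal usage sketch for rtrs_srv_open()/rtrs_srv_close()
         * (my_rdma_ev/my_link_ev are hypothetical caller callbacks, the
         * members are those of struct rtrs_srv_ops):
         *
         *	static struct rtrs_srv_ops srv_ops = {
         *		.rdma_ev = my_rdma_ev,
         *		.link_ev = my_link_ev,
         *	};
         *
         *	ctx = rtrs_srv_open(&srv_ops, port);
         *	if (IS_ERR(ctx))
         *		return PTR_ERR(ctx);
         *	...
         *	rtrs_srv_close(ctx);
         */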
   2188
   2189static void close_paths(struct rtrs_srv_sess *srv)
   2190{
   2191	struct rtrs_srv_path *srv_path;
   2192
   2193	mutex_lock(&srv->paths_mutex);
   2194	list_for_each_entry(srv_path, &srv->paths_list, s.entry)
   2195		close_path(srv_path);
   2196	mutex_unlock(&srv->paths_mutex);
   2197}
   2198
   2199static void close_ctx(struct rtrs_srv_ctx *ctx)
   2200{
   2201	struct rtrs_srv_sess *srv;
   2202
   2203	mutex_lock(&ctx->srv_mutex);
   2204	list_for_each_entry(srv, &ctx->srv_list, ctx_list)
   2205		close_paths(srv);
   2206	mutex_unlock(&ctx->srv_mutex);
   2207	flush_workqueue(rtrs_wq);
   2208}
   2209
   2210/**
   2211 * rtrs_srv_close() - close RTRS server context
   2212 * @ctx: pointer to server context
   2213 *
   2214 * Closes RTRS server context with all client sessions.
   2215 */
   2216void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
   2217{
   2218	ib_unregister_client(&rtrs_srv_client);
   2219	mutex_destroy(&ib_ctx.ib_dev_mutex);
   2220	close_ctx(ctx);
   2221	free_srv_ctx(ctx);
   2222}
   2223EXPORT_SYMBOL(rtrs_srv_close);
   2224
   2225static int check_module_params(void)
   2226{
   2227	if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
   2228		pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
   2229		       sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
   2230		return -EINVAL;
   2231	}
   2232	if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) {
    2233		pr_err("Invalid max_chunk_size value %d, has to be >= %d and a power of two.\n",
   2234		       max_chunk_size, MIN_CHUNK_SIZE);
   2235		return -EINVAL;
   2236	}
   2237
   2238	/*
   2239	 * Check if IB immediate data size is enough to hold the mem_id and the
   2240	 * offset inside the memory chunk
   2241	 */
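        	/*
        	 * For example, with the defaults (sess_queue_depth = 512,
        	 * max_chunk_size = 128K) this needs
        	 * ilog2(511) + 1 + ilog2(131071) + 1 = 9 + 17 = 26 bits of
        	 * immediate payload.
        	 */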
   2242	if ((ilog2(sess_queue_depth - 1) + 1) +
   2243	    (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
   2244		pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
   2245		       MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
   2246		return -EINVAL;
   2247	}
   2248
   2249	return 0;
   2250}
   2251
   2252static int __init rtrs_server_init(void)
   2253{
   2254	int err;
   2255
    2256	pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld), sess_queue_depth: %d, always_invalidate: %d)\n",
   2257		KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
   2258		max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
   2259		sess_queue_depth, always_invalidate);
   2260
   2261	rtrs_rdma_dev_pd_init(0, &dev_pd);
   2262
   2263	err = check_module_params();
   2264	if (err) {
   2265		pr_err("Failed to load module, invalid module parameters, err: %d\n",
   2266		       err);
   2267		return err;
   2268	}
   2269	chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ,
   2270					      get_order(max_chunk_size));
   2271	if (!chunk_pool)
   2272		return -ENOMEM;
   2273	rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server");
   2274	if (IS_ERR(rtrs_dev_class)) {
   2275		err = PTR_ERR(rtrs_dev_class);
   2276		goto out_chunk_pool;
   2277	}
   2278	rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
   2279	if (!rtrs_wq) {
   2280		err = -ENOMEM;
   2281		goto out_dev_class;
   2282	}
   2283
   2284	return 0;
   2285
   2286out_dev_class:
   2287	class_destroy(rtrs_dev_class);
   2288out_chunk_pool:
   2289	mempool_destroy(chunk_pool);
   2290
   2291	return err;
   2292}
   2293
   2294static void __exit rtrs_server_exit(void)
   2295{
   2296	destroy_workqueue(rtrs_wq);
   2297	class_destroy(rtrs_dev_class);
   2298	mempool_destroy(chunk_pool);
   2299	rtrs_rdma_dev_pd_deinit(&dev_pd);
   2300}
   2301
   2302module_init(rtrs_server_init);
   2303module_exit(rtrs_server_exit);