cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

rnbd-clt.c (46587B)


      1// SPDX-License-Identifier: GPL-2.0-or-later
      2/*
      3 * RDMA Network Block Driver
      4 *
      5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
      6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
      7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
      8 */
      9
     10#undef pr_fmt
     11#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
     12
     13#include <linux/module.h>
     14#include <linux/blkdev.h>
     15#include <linux/hdreg.h>
     16#include <linux/scatterlist.h>
     17#include <linux/idr.h>
     18
     19#include "rnbd-clt.h"
     20
     21MODULE_DESCRIPTION("RDMA Network Block Device Client");
     22MODULE_LICENSE("GPL");
     23
     24static int rnbd_client_major;
     25static DEFINE_IDA(index_ida);
     26static DEFINE_MUTEX(sess_lock);
     27static LIST_HEAD(sess_list);
     28static struct workqueue_struct *rnbd_clt_wq;
     29
     30/*
     31 * Maximum number of partitions an instance can have.
     32 * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
     33 */
     34#define RNBD_PART_BITS		6
     35
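/*
 * Illustrative sketch (added for clarity, not part of the original driver
 * comments): with RNBD_PART_BITS = 6, rnbd_clt_setup_gen_disk() below sets
 *
 *	gd->first_minor = idx << RNBD_PART_BITS;   e.g. idx 2 -> minor 128
 *	gd->minors      = 1 << RNBD_PART_BITS;     64 minors per device
 *
 * so device index 2 owns minors 128..191: one minor for the whole device
 * plus up to 63 partitions, matching the comment above.
 */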
     36static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
     37{
     38	return refcount_inc_not_zero(&sess->refcount);
     39}
     40
     41static void free_sess(struct rnbd_clt_session *sess);
     42
     43static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
     44{
     45	might_sleep();
     46
     47	if (refcount_dec_and_test(&sess->refcount))
     48		free_sess(sess);
     49}
     50
     51static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev)
     52{
     53	might_sleep();
     54
     55	if (!refcount_dec_and_test(&dev->refcount))
     56		return;
     57
     58	ida_free(&index_ida, dev->clt_device_id);
     59	kfree(dev->hw_queues);
     60	kfree(dev->pathname);
     61	rnbd_clt_put_sess(dev->sess);
     62	mutex_destroy(&dev->lock);
     63	kfree(dev);
     64}
     65
     66static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev)
     67{
     68	return refcount_inc_not_zero(&dev->refcount);
     69}
     70
     71static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev,
     72				 const struct rnbd_msg_open_rsp *rsp)
     73{
     74	struct rnbd_clt_session *sess = dev->sess;
     75
     76	if (!rsp->logical_block_size)
     77		return -EINVAL;
     78
     79	dev->device_id		    = le32_to_cpu(rsp->device_id);
     80	dev->nsectors		    = le64_to_cpu(rsp->nsectors);
     81	dev->logical_block_size	    = le16_to_cpu(rsp->logical_block_size);
     82	dev->physical_block_size    = le16_to_cpu(rsp->physical_block_size);
     83	dev->max_discard_sectors    = le32_to_cpu(rsp->max_discard_sectors);
     84	dev->discard_granularity    = le32_to_cpu(rsp->discard_granularity);
     85	dev->discard_alignment	    = le32_to_cpu(rsp->discard_alignment);
     86	dev->secure_discard	    = le16_to_cpu(rsp->secure_discard);
     87	dev->wc			    = !!(rsp->cache_policy & RNBD_WRITEBACK);
     88	dev->fua		    = !!(rsp->cache_policy & RNBD_FUA);
     89
     90	dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE;
     91	dev->max_segments = sess->max_segments;
     92
     93	return 0;
     94}
     95
     96static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev,
     97				    size_t new_nsectors)
     98{
     99	rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n",
    100		       dev->nsectors, new_nsectors);
    101	dev->nsectors = new_nsectors;
    102	set_capacity_and_notify(dev->gd, dev->nsectors);
    103	return 0;
    104}
    105
    106static int process_msg_open_rsp(struct rnbd_clt_dev *dev,
    107				struct rnbd_msg_open_rsp *rsp)
    108{
    109	struct kobject *gd_kobj;
    110	int err = 0;
    111
    112	mutex_lock(&dev->lock);
    113	if (dev->dev_state == DEV_STATE_UNMAPPED) {
    114		rnbd_clt_info(dev,
     115			       "Ignoring Open-Response message from server for unmapped device\n");
    116		err = -ENOENT;
    117		goto out;
    118	}
    119	if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) {
    120		u64 nsectors = le64_to_cpu(rsp->nsectors);
    121
    122		/*
    123		 * If the device was remapped and the size changed in the
    124		 * meantime we need to revalidate it
    125		 */
    126		if (dev->nsectors != nsectors)
    127			rnbd_clt_change_capacity(dev, nsectors);
    128		gd_kobj = &disk_to_dev(dev->gd)->kobj;
    129		kobject_uevent(gd_kobj, KOBJ_ONLINE);
    130		rnbd_clt_info(dev, "Device online, device remapped successfully\n");
    131	}
    132	err = rnbd_clt_set_dev_attr(dev, rsp);
    133	if (err)
    134		goto out;
    135	dev->dev_state = DEV_STATE_MAPPED;
    136
    137out:
    138	mutex_unlock(&dev->lock);
    139
    140	return err;
    141}
    142
    143int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize)
    144{
    145	int ret = 0;
    146
    147	mutex_lock(&dev->lock);
    148	if (dev->dev_state != DEV_STATE_MAPPED) {
    149		pr_err("Failed to set new size of the device, device is not opened\n");
    150		ret = -ENOENT;
    151		goto out;
    152	}
    153	ret = rnbd_clt_change_capacity(dev, newsize);
    154
    155out:
    156	mutex_unlock(&dev->lock);
    157
    158	return ret;
    159}
    160
    161static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
    162{
    163	if (WARN_ON(!q->hctx))
    164		return;
    165
    166	/* We can come here from interrupt, thus async=true */
    167	blk_mq_run_hw_queue(q->hctx, true);
    168}
    169
    170enum {
    171	RNBD_DELAY_IFBUSY = -1,
    172};
    173
    174/**
    175 * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun
    176 * @sess:	Session to find a queue for
    177 * @cpu:	Cpu to start the search from
    178 *
    179 * Description:
     180 *     Each CPU has a list of HW queues which need to be rerun.  If a list
     181 *     is not empty, it is marked with a bit.  This function finds the first
     182 *     set bit in the bitmap and returns the corresponding CPU list.
    183 */
    184static struct rnbd_cpu_qlist *
    185rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
    186{
    187	int bit;
    188
    189	/* Search from cpu to nr_cpu_ids */
    190	bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu);
    191	if (bit < nr_cpu_ids) {
    192		return per_cpu_ptr(sess->cpu_queues, bit);
    193	} else if (cpu != 0) {
    194		/* Search from 0 to cpu */
    195		bit = find_first_bit(sess->cpu_queues_bm, cpu);
    196		if (bit < cpu)
    197			return per_cpu_ptr(sess->cpu_queues, bit);
    198	}
    199
    200	return NULL;
    201}
    202
    203static inline int nxt_cpu(int cpu)
    204{
    205	return (cpu + 1) % nr_cpu_ids;
    206}
    207
    208/**
    209 * rnbd_rerun_if_needed() - rerun next queue marked as stopped
    210 * @sess:	Session to rerun a queue on
    211 *
    212 * Description:
     213 *     Each CPU has its own list of HW queues which should be rerun.
     214 *     The function finds such a list, takes the list lock, picks the first
     215 *     HW queue off the list and requeues it.
    216 *
    217 * Return:
    218 *     True if the queue was requeued, false otherwise.
    219 *
    220 * Context:
    221 *     Does not matter.
    222 */
    223static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)
    224{
    225	struct rnbd_queue *q = NULL;
    226	struct rnbd_cpu_qlist *cpu_q;
    227	unsigned long flags;
    228	int *cpup;
    229
    230	/*
     231	 * To keep fairness and not let other queues starve, we always
     232	 * try to wake up someone else in a round-robin manner.  That of course
     233	 * increases latency, but every queue still gets a chance to be executed.
    234	 */
    235	cpup = get_cpu_ptr(sess->cpu_rr);
    236	for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
    237	     cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) {
    238		if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
    239			continue;
    240		if (!test_bit(cpu_q->cpu, sess->cpu_queues_bm))
    241			goto unlock;
    242		q = list_first_entry_or_null(&cpu_q->requeue_list,
    243					     typeof(*q), requeue_list);
    244		if (WARN_ON(!q))
    245			goto clear_bit;
    246		list_del_init(&q->requeue_list);
    247		clear_bit_unlock(0, &q->in_list);
    248
    249		if (list_empty(&cpu_q->requeue_list)) {
    250			/* Clear bit if nothing is left */
    251clear_bit:
    252			clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
    253		}
    254unlock:
    255		spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
    256
    257		if (q)
    258			break;
    259	}
    260
     261	/*
     262	 * Save the CPU that is going to be requeued in the per-cpu var.  Just
     263	 * incrementing it doesn't work, because rnbd_get_cpu_qlist() would
     264	 * always return the first CPU with something on the queue list when the
     265	 * value stored in the var is greater than the last CPU with something
     266	 * on the list.
    267	 */
    268	if (cpu_q)
    269		*cpup = cpu_q->cpu;
    270	put_cpu_ptr(sess->cpu_rr);
    271
    272	if (q)
    273		rnbd_clt_dev_requeue(q);
    274
    275	return q;
    276}
    277
    278/**
    279 * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
    280 *				 session is idling (there are no requests
    281 *				 in-flight).
    282 * @sess:	Session to rerun the queues on
    283 *
    284 * Description:
    285 *     This function tries to rerun all stopped queues if there are no
     286 *     requests in flight anymore.  It tries to solve an obvious problem: when
     287 *     the number of tags is smaller than the number of stopped and sleeping
     288 *     queues (hctxs), and the last permit that has just been put does not wake
     289 *     up all remaining queues (hctxs), IO requests hang forever.
     290 *
     291 *     That can happen when all permits, say N, have been exhausted from one CPU,
     292 *     and we have many block devices per session, say M.  Each block device has
     293 *     its own queue (hctx) for each CPU, so eventually we can put M x nr_cpu_ids
     294 *     queues (hctxs) to sleep.  If the number of permits N < M x nr_cpu_ids, we
     295 *     eventually get an IO hang (a worked example follows the function below).
     296 *
     297 *     To avoid this hang, the last caller of rnbd_put_permit() (the one who
     298 *     observes sess->busy == 0) must wake up all remaining queues.
    299 *
    300 * Context:
    301 *     Does not matter.
    302 */
    303static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
    304{
    305	bool requeued;
    306
    307	do {
    308		requeued = rnbd_rerun_if_needed(sess);
    309	} while (atomic_read(&sess->busy) == 0 && requeued);
    310}
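/*
 * Worked example for the kernel-doc above (hypothetical numbers, added for
 * clarity): with N = 64 permits, M = 16 devices on the session and
 * nr_cpu_ids = 8, up to M * nr_cpu_ids = 128 hctxs can end up stopped while
 * only 64 permits exist.  Since 64 < 128, some stopped hctxs would never be
 * rerun unless the last rnbd_put_permit() caller - the one that observes
 * sess->busy == 0 - keeps requeuing, which is what the loop above does.
 */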
    311
    312static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess,
    313					     enum rtrs_clt_con_type con_type,
    314					     enum wait_type wait)
    315{
    316	struct rtrs_permit *permit;
    317
    318	permit = rtrs_clt_get_permit(sess->rtrs, con_type, wait);
    319	if (permit)
    320		/* We have a subtle rare case here, when all permits can be
     321		 * consumed before the busy counter is increased.  This is safe,
     322		 * because the loser will get NULL as a permit, observe a zero busy
     323		 * counter and immediately restart the queue itself.
    324		 */
    325		atomic_inc(&sess->busy);
    326
    327	return permit;
    328}
    329
    330static void rnbd_put_permit(struct rnbd_clt_session *sess,
    331			     struct rtrs_permit *permit)
    332{
    333	rtrs_clt_put_permit(sess->rtrs, permit);
    334	atomic_dec(&sess->busy);
    335	/* Paired with rnbd_clt_dev_add_to_requeue().  Decrement first
    336	 * and then check queue bits.
    337	 */
    338	smp_mb__after_atomic();
    339	rnbd_rerun_all_if_idle(sess);
    340}
    341
    342static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess,
    343				     enum rtrs_clt_con_type con_type,
    344				     enum wait_type wait)
    345{
    346	struct rnbd_iu *iu;
    347	struct rtrs_permit *permit;
    348
    349	iu = kzalloc(sizeof(*iu), GFP_KERNEL);
    350	if (!iu)
    351		return NULL;
    352
    353	permit = rnbd_get_permit(sess, con_type, wait);
    354	if (!permit) {
    355		kfree(iu);
    356		return NULL;
    357	}
    358
    359	iu->permit = permit;
    360	/*
    361	 * 1st reference is dropped after finishing sending a "user" message,
     362	 * the 2nd reference is dropped after the confirmation with the response
     363	 * has been returned.
    364	 * 1st and 2nd can happen in any order, so the rnbd_iu should be
    365	 * released (rtrs_permit returned to rtrs) only after both
    366	 * are finished.
    367	 */
    368	atomic_set(&iu->refcount, 2);
    369	init_waitqueue_head(&iu->comp.wait);
    370	iu->comp.errno = INT_MAX;
    371
    372	if (sg_alloc_table(&iu->sgt, 1, GFP_KERNEL)) {
    373		rnbd_put_permit(sess, permit);
    374		kfree(iu);
    375		return NULL;
    376	}
    377
    378	return iu;
    379}
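/*
 * Note (added for clarity): as an example of the two references taken in
 * rnbd_get_iu(), in send_msg_open() below one rnbd_put_iu() is called by
 * the confirmation work (msg_open_conf()) once the response has arrived,
 * and the other by send_msg_open() itself after send_usr_msg() returns.
 * Whichever of the two runs last frees the iu and returns the rtrs_permit.
 */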
    380
    381static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu)
    382{
    383	if (atomic_dec_and_test(&iu->refcount)) {
    384		sg_free_table(&iu->sgt);
    385		rnbd_put_permit(sess, iu->permit);
    386		kfree(iu);
    387	}
    388}
    389
    390static void rnbd_softirq_done_fn(struct request *rq)
    391{
    392	struct rnbd_clt_dev *dev	= rq->q->disk->private_data;
    393	struct rnbd_clt_session *sess	= dev->sess;
    394	struct rnbd_iu *iu;
    395
    396	iu = blk_mq_rq_to_pdu(rq);
    397	sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
    398	rnbd_put_permit(sess, iu->permit);
    399	blk_mq_end_request(rq, errno_to_blk_status(iu->errno));
    400}
    401
    402static void msg_io_conf(void *priv, int errno)
    403{
    404	struct rnbd_iu *iu = priv;
    405	struct rnbd_clt_dev *dev = iu->dev;
    406	struct request *rq = iu->rq;
    407	int rw = rq_data_dir(rq);
    408
    409	iu->errno = errno;
    410
    411	blk_mq_complete_request(rq);
    412
    413	if (errno)
    414		rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n",
    415				 rw == READ ? "read" : "write", errno);
    416}
    417
    418static void wake_up_iu_comp(struct rnbd_iu *iu, int errno)
    419{
    420	iu->comp.errno = errno;
    421	wake_up(&iu->comp.wait);
    422}
    423
    424static void msg_conf(void *priv, int errno)
    425{
    426	struct rnbd_iu *iu = priv;
    427
    428	iu->errno = errno;
    429	schedule_work(&iu->work);
    430}
    431
    432static int send_usr_msg(struct rtrs_clt_sess *rtrs, int dir,
    433			struct rnbd_iu *iu, struct kvec *vec,
    434			size_t len, struct scatterlist *sg, unsigned int sg_len,
    435			void (*conf)(struct work_struct *work),
    436			int *errno, int wait)
    437{
    438	int err;
    439	struct rtrs_clt_req_ops req_ops;
    440
    441	INIT_WORK(&iu->work, conf);
    442	req_ops = (struct rtrs_clt_req_ops) {
    443		.priv = iu,
    444		.conf_fn = msg_conf,
    445	};
    446	err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit,
    447				vec, 1, len, sg, sg_len);
    448	if (!err && wait) {
    449		wait_event(iu->comp.wait, iu->comp.errno != INT_MAX);
    450		*errno = iu->comp.errno;
    451	} else {
    452		*errno = 0;
    453	}
    454
    455	return err;
    456}
    457
    458static void msg_close_conf(struct work_struct *work)
    459{
    460	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
    461	struct rnbd_clt_dev *dev = iu->dev;
    462
    463	wake_up_iu_comp(iu, iu->errno);
    464	rnbd_put_iu(dev->sess, iu);
    465	rnbd_clt_put_dev(dev);
    466}
    467
    468static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id,
    469			  enum wait_type wait)
    470{
    471	struct rnbd_clt_session *sess = dev->sess;
    472	struct rnbd_msg_close msg;
    473	struct rnbd_iu *iu;
    474	struct kvec vec = {
    475		.iov_base = &msg,
    476		.iov_len  = sizeof(msg)
    477	};
    478	int err, errno;
    479
    480	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
    481	if (!iu)
    482		return -ENOMEM;
    483
    484	iu->buf = NULL;
    485	iu->dev = dev;
    486
    487	msg.hdr.type	= cpu_to_le16(RNBD_MSG_CLOSE);
    488	msg.device_id	= cpu_to_le32(device_id);
    489
    490	WARN_ON(!rnbd_clt_get_dev(dev));
    491	err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0,
    492			   msg_close_conf, &errno, wait);
    493	if (err) {
    494		rnbd_clt_put_dev(dev);
    495		rnbd_put_iu(sess, iu);
    496	} else {
    497		err = errno;
    498	}
    499
    500	rnbd_put_iu(sess, iu);
    501	return err;
    502}
    503
    504static void msg_open_conf(struct work_struct *work)
    505{
    506	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
    507	struct rnbd_msg_open_rsp *rsp = iu->buf;
    508	struct rnbd_clt_dev *dev = iu->dev;
    509	int errno = iu->errno;
    510
    511	if (errno) {
    512		rnbd_clt_err(dev,
    513			      "Opening failed, server responded: %d\n",
    514			      errno);
    515	} else {
    516		errno = process_msg_open_rsp(dev, rsp);
    517		if (errno) {
    518			u32 device_id = le32_to_cpu(rsp->device_id);
    519			/*
     520			 * If the server thinks it's fine, but we fail to process the
     521			 * response, then be nice and send a close to the server.
    522			 */
    523			send_msg_close(dev, device_id, RTRS_PERMIT_NOWAIT);
    524		}
    525	}
    526	kfree(rsp);
    527	wake_up_iu_comp(iu, errno);
    528	rnbd_put_iu(dev->sess, iu);
    529	rnbd_clt_put_dev(dev);
    530}
    531
    532static void msg_sess_info_conf(struct work_struct *work)
    533{
    534	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
    535	struct rnbd_msg_sess_info_rsp *rsp = iu->buf;
    536	struct rnbd_clt_session *sess = iu->sess;
    537
    538	if (!iu->errno)
    539		sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR);
    540
    541	kfree(rsp);
    542	wake_up_iu_comp(iu, iu->errno);
    543	rnbd_put_iu(sess, iu);
    544	rnbd_clt_put_sess(sess);
    545}
    546
    547static int send_msg_open(struct rnbd_clt_dev *dev, enum wait_type wait)
    548{
    549	struct rnbd_clt_session *sess = dev->sess;
    550	struct rnbd_msg_open_rsp *rsp;
    551	struct rnbd_msg_open msg;
    552	struct rnbd_iu *iu;
    553	struct kvec vec = {
    554		.iov_base = &msg,
    555		.iov_len  = sizeof(msg)
    556	};
    557	int err, errno;
    558
    559	rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
    560	if (!rsp)
    561		return -ENOMEM;
    562
    563	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
    564	if (!iu) {
    565		kfree(rsp);
    566		return -ENOMEM;
    567	}
    568
    569	iu->buf = rsp;
    570	iu->dev = dev;
    571
    572	sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
    573
    574	msg.hdr.type	= cpu_to_le16(RNBD_MSG_OPEN);
    575	msg.access_mode	= dev->access_mode;
    576	strscpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));
    577
    578	WARN_ON(!rnbd_clt_get_dev(dev));
    579	err = send_usr_msg(sess->rtrs, READ, iu,
    580			   &vec, sizeof(*rsp), iu->sgt.sgl, 1,
    581			   msg_open_conf, &errno, wait);
    582	if (err) {
    583		rnbd_clt_put_dev(dev);
    584		rnbd_put_iu(sess, iu);
    585		kfree(rsp);
    586	} else {
    587		err = errno;
    588	}
    589
    590	rnbd_put_iu(sess, iu);
    591	return err;
    592}
    593
    594static int send_msg_sess_info(struct rnbd_clt_session *sess, enum wait_type wait)
    595{
    596	struct rnbd_msg_sess_info_rsp *rsp;
    597	struct rnbd_msg_sess_info msg;
    598	struct rnbd_iu *iu;
    599	struct kvec vec = {
    600		.iov_base = &msg,
    601		.iov_len  = sizeof(msg)
    602	};
    603	int err, errno;
    604
    605	rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
    606	if (!rsp)
    607		return -ENOMEM;
    608
    609	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
    610	if (!iu) {
    611		kfree(rsp);
    612		return -ENOMEM;
    613	}
    614
    615	iu->buf = rsp;
    616	iu->sess = sess;
    617	sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
    618
    619	msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO);
    620	msg.ver      = RNBD_PROTO_VER_MAJOR;
    621
    622	if (!rnbd_clt_get_sess(sess)) {
    623		/*
     624		 * That can happen only in one case: RTRS has re-established
     625		 * the connection and link_ev() is called, but the session is
     626		 * almost dead, the last reference on the session has been put and
     627		 * the caller is waiting for RTRS to close everything.
    628		 */
    629		err = -ENODEV;
    630		goto put_iu;
    631	}
    632	err = send_usr_msg(sess->rtrs, READ, iu,
    633			   &vec, sizeof(*rsp), iu->sgt.sgl, 1,
    634			   msg_sess_info_conf, &errno, wait);
    635	if (err) {
    636		rnbd_clt_put_sess(sess);
    637put_iu:
    638		rnbd_put_iu(sess, iu);
    639		kfree(rsp);
    640	} else {
    641		err = errno;
    642	}
    643	rnbd_put_iu(sess, iu);
    644	return err;
    645}
    646
    647static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess)
    648{
    649	struct rnbd_clt_dev *dev;
    650	struct kobject *gd_kobj;
    651
    652	mutex_lock(&sess->lock);
    653	list_for_each_entry(dev, &sess->devs_list, list) {
    654		rnbd_clt_err(dev, "Device disconnected.\n");
    655
    656		mutex_lock(&dev->lock);
    657		if (dev->dev_state == DEV_STATE_MAPPED) {
    658			dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED;
    659			gd_kobj = &disk_to_dev(dev->gd)->kobj;
    660			kobject_uevent(gd_kobj, KOBJ_OFFLINE);
    661		}
    662		mutex_unlock(&dev->lock);
    663	}
    664	mutex_unlock(&sess->lock);
    665}
    666
    667static void remap_devs(struct rnbd_clt_session *sess)
    668{
    669	struct rnbd_clt_dev *dev;
    670	struct rtrs_attrs attrs;
    671	int err;
    672
    673	/*
    674	 * Careful here: we are called from RTRS link event directly,
     675	 * thus we can't send any RTRS request and wait for a response,
     676	 * or RTRS will not be able to complete the request with a failure
    677	 * if something goes wrong (failing of outstanding requests
    678	 * happens exactly from the context where we are blocking now).
    679	 *
    680	 * So to avoid deadlocks each usr message sent from here must
    681	 * be asynchronous.
    682	 */
    683
    684	err = send_msg_sess_info(sess, RTRS_PERMIT_NOWAIT);
    685	if (err) {
    686		pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err);
    687		return;
    688	}
    689
    690	err = rtrs_clt_query(sess->rtrs, &attrs);
    691	if (err) {
    692		pr_err("rtrs_clt_query(\"%s\"): %d\n", sess->sessname, err);
    693		return;
    694	}
    695	mutex_lock(&sess->lock);
    696	sess->max_io_size = attrs.max_io_size;
    697
    698	list_for_each_entry(dev, &sess->devs_list, list) {
    699		bool skip;
    700
    701		mutex_lock(&dev->lock);
    702		skip = (dev->dev_state == DEV_STATE_INIT);
    703		mutex_unlock(&dev->lock);
    704		if (skip)
    705			/*
     706			 * When the device is establishing a connection for the first
     707			 * time, do not remap; it will be closed soon.
    708			 */
    709			continue;
    710
    711		rnbd_clt_info(dev, "session reconnected, remapping device\n");
    712		err = send_msg_open(dev, RTRS_PERMIT_NOWAIT);
    713		if (err) {
    714			rnbd_clt_err(dev, "send_msg_open(): %d\n", err);
    715			break;
    716		}
    717	}
    718	mutex_unlock(&sess->lock);
    719}
    720
    721static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev)
    722{
    723	struct rnbd_clt_session *sess = priv;
    724
    725	switch (ev) {
    726	case RTRS_CLT_LINK_EV_DISCONNECTED:
    727		set_dev_states_to_disconnected(sess);
    728		break;
    729	case RTRS_CLT_LINK_EV_RECONNECTED:
    730		remap_devs(sess);
    731		break;
    732	default:
    733		pr_err("Unknown session event received (%d), session: %s\n",
    734		       ev, sess->sessname);
    735	}
    736}
    737
    738static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues)
    739{
    740	unsigned int cpu;
    741	struct rnbd_cpu_qlist *cpu_q;
    742
    743	for_each_possible_cpu(cpu) {
    744		cpu_q = per_cpu_ptr(cpu_queues, cpu);
    745
    746		cpu_q->cpu = cpu;
    747		INIT_LIST_HEAD(&cpu_q->requeue_list);
    748		spin_lock_init(&cpu_q->requeue_lock);
    749	}
    750}
    751
    752static void destroy_mq_tags(struct rnbd_clt_session *sess)
    753{
    754	if (sess->tag_set.tags)
    755		blk_mq_free_tag_set(&sess->tag_set);
    756}
    757
    758static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess)
    759{
    760	sess->rtrs_ready = true;
    761	wake_up_all(&sess->rtrs_waitq);
    762}
    763
    764static void close_rtrs(struct rnbd_clt_session *sess)
    765{
    766	might_sleep();
    767
    768	if (!IS_ERR_OR_NULL(sess->rtrs)) {
    769		rtrs_clt_close(sess->rtrs);
    770		sess->rtrs = NULL;
    771		wake_up_rtrs_waiters(sess);
    772	}
    773}
    774
    775static void free_sess(struct rnbd_clt_session *sess)
    776{
    777	WARN_ON(!list_empty(&sess->devs_list));
    778
    779	might_sleep();
    780
    781	close_rtrs(sess);
    782	destroy_mq_tags(sess);
    783	if (!list_empty(&sess->list)) {
    784		mutex_lock(&sess_lock);
    785		list_del(&sess->list);
    786		mutex_unlock(&sess_lock);
    787	}
    788	free_percpu(sess->cpu_queues);
    789	free_percpu(sess->cpu_rr);
    790	mutex_destroy(&sess->lock);
    791	kfree(sess);
    792}
    793
    794static struct rnbd_clt_session *alloc_sess(const char *sessname)
    795{
    796	struct rnbd_clt_session *sess;
    797	int err, cpu;
    798
    799	sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE);
    800	if (!sess)
    801		return ERR_PTR(-ENOMEM);
    802	strscpy(sess->sessname, sessname, sizeof(sess->sessname));
    803	atomic_set(&sess->busy, 0);
    804	mutex_init(&sess->lock);
    805	INIT_LIST_HEAD(&sess->devs_list);
    806	INIT_LIST_HEAD(&sess->list);
    807	bitmap_zero(sess->cpu_queues_bm, num_possible_cpus());
    808	init_waitqueue_head(&sess->rtrs_waitq);
    809	refcount_set(&sess->refcount, 1);
    810
    811	sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist);
    812	if (!sess->cpu_queues) {
    813		err = -ENOMEM;
    814		goto err;
    815	}
    816	rnbd_init_cpu_qlists(sess->cpu_queues);
    817
    818	/*
     819	 * This is a simple percpu variable which stores CPU indices, which are
     820	 * advanced on each access.  We need it for the sake of fairness,
     821	 * to wake up queues in a round-robin manner.
    822	 */
    823	sess->cpu_rr = alloc_percpu(int);
    824	if (!sess->cpu_rr) {
    825		err = -ENOMEM;
    826		goto err;
    827	}
    828	for_each_possible_cpu(cpu)
     829		*per_cpu_ptr(sess->cpu_rr, cpu) = cpu;
    830
    831	return sess;
    832
    833err:
    834	free_sess(sess);
    835
    836	return ERR_PTR(err);
    837}
    838
    839static int wait_for_rtrs_connection(struct rnbd_clt_session *sess)
    840{
    841	wait_event(sess->rtrs_waitq, sess->rtrs_ready);
    842	if (IS_ERR_OR_NULL(sess->rtrs))
    843		return -ECONNRESET;
    844
    845	return 0;
    846}
    847
    848static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess)
    849	__releases(&sess_lock)
    850	__acquires(&sess_lock)
    851{
    852	DEFINE_WAIT(wait);
    853
    854	prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE);
    855	if (IS_ERR_OR_NULL(sess->rtrs)) {
    856		finish_wait(&sess->rtrs_waitq, &wait);
    857		return;
    858	}
    859	mutex_unlock(&sess_lock);
    860	/* loop in caller, see __find_and_get_sess().
     861	 * You can't leave the mutex locked and call schedule(); you will catch a
    862	 * deadlock with a caller of free_sess(), which has just put the last
    863	 * reference and is about to take the sess_lock in order to delete
    864	 * the session from the list.
    865	 */
    866	schedule();
    867	mutex_lock(&sess_lock);
    868}
    869
    870static struct rnbd_clt_session *__find_and_get_sess(const char *sessname)
    871	__releases(&sess_lock)
    872	__acquires(&sess_lock)
    873{
    874	struct rnbd_clt_session *sess, *sn;
    875	int err;
    876
    877again:
    878	list_for_each_entry_safe(sess, sn, &sess_list, list) {
    879		if (strcmp(sessname, sess->sessname))
    880			continue;
    881
    882		if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs))
    883			/*
    884			 * No RTRS connection, session is dying.
    885			 */
    886			continue;
    887
    888		if (rnbd_clt_get_sess(sess)) {
    889			/*
    890			 * Alive session is found, wait for RTRS connection.
    891			 */
    892			mutex_unlock(&sess_lock);
    893			err = wait_for_rtrs_connection(sess);
    894			if (err)
    895				rnbd_clt_put_sess(sess);
    896			mutex_lock(&sess_lock);
    897
    898			if (err)
    899				/* Session is dying, repeat the loop */
    900				goto again;
    901
    902			return sess;
    903		}
    904		/*
    905		 * Ref is 0, session is dying, wait for RTRS disconnect
     906		 * in order to avoid session name clashes.
    907		 */
    908		wait_for_rtrs_disconnection(sess);
    909		/*
    910		 * RTRS is disconnected and soon session will be freed,
    911		 * so repeat a loop.
    912		 */
    913		goto again;
    914	}
    915
    916	return NULL;
    917}
    918
    919/* caller is responsible for initializing 'first' to false */
    920static struct
    921rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first)
    922{
    923	struct rnbd_clt_session *sess = NULL;
    924
    925	mutex_lock(&sess_lock);
    926	sess = __find_and_get_sess(sessname);
    927	if (!sess) {
    928		sess = alloc_sess(sessname);
    929		if (IS_ERR(sess)) {
    930			mutex_unlock(&sess_lock);
    931			return sess;
    932		}
    933		list_add(&sess->list, &sess_list);
    934		*first = true;
    935	}
    936	mutex_unlock(&sess_lock);
    937
    938	return sess;
    939}
    940
    941static int rnbd_client_open(struct block_device *block_device, fmode_t mode)
    942{
    943	struct rnbd_clt_dev *dev = block_device->bd_disk->private_data;
    944
    945	if (dev->read_only && (mode & FMODE_WRITE))
    946		return -EPERM;
    947
    948	if (dev->dev_state == DEV_STATE_UNMAPPED ||
    949	    !rnbd_clt_get_dev(dev))
    950		return -EIO;
    951
    952	return 0;
    953}
    954
    955static void rnbd_client_release(struct gendisk *gen, fmode_t mode)
    956{
    957	struct rnbd_clt_dev *dev = gen->private_data;
    958
    959	rnbd_clt_put_dev(dev);
    960}
    961
    962static int rnbd_client_getgeo(struct block_device *block_device,
    963			      struct hd_geometry *geo)
    964{
    965	u64 size;
    966	struct rnbd_clt_dev *dev;
    967
    968	dev = block_device->bd_disk->private_data;
    969	size = dev->size * (dev->logical_block_size / SECTOR_SIZE);
    970	geo->cylinders	= size >> 6;	/* size/64 */
    971	geo->heads	= 4;
    972	geo->sectors	= 16;
    973	geo->start	= 0;
    974
    975	return 0;
    976}
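/*
 * Note (added for clarity): with heads = 4 and sectors = 16 there are
 * 4 * 16 = 64 sectors per cylinder, which is why cylinders is computed
 * above as size >> 6 (i.e. size / 64).
 */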
    977
    978static const struct block_device_operations rnbd_client_ops = {
    979	.owner		= THIS_MODULE,
    980	.open		= rnbd_client_open,
    981	.release	= rnbd_client_release,
    982	.getgeo		= rnbd_client_getgeo
    983};
    984
    985/* The amount of data that belongs to an I/O and the amount of data that
    986 * should be read or written to the disk (bi_size) can differ.
    987 *
    988 * E.g. When WRITE_SAME is used, only a small amount of data is
    989 * transferred that is then written repeatedly over a lot of sectors.
    990 *
    991 * Get the size of data to be transferred via RTRS by summing up the size
     992 * of the scatter-gather list entries.
    993 */
    994static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len)
    995{
    996	struct scatterlist *sg;
    997	size_t tsize = 0;
    998	int i;
    999
   1000	for_each_sg(sglist, sg, len, i)
   1001		tsize += sg->length;
   1002	return tsize;
   1003}
   1004
   1005static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev,
   1006				     struct request *rq,
   1007				     struct rnbd_iu *iu)
   1008{
   1009	struct rtrs_clt_sess *rtrs = dev->sess->rtrs;
   1010	struct rtrs_permit *permit = iu->permit;
   1011	struct rnbd_msg_io msg;
   1012	struct rtrs_clt_req_ops req_ops;
   1013	unsigned int sg_cnt = 0;
   1014	struct kvec vec;
   1015	size_t size;
   1016	int err;
   1017
   1018	iu->rq		= rq;
   1019	iu->dev		= dev;
   1020	msg.sector	= cpu_to_le64(blk_rq_pos(rq));
   1021	msg.bi_size	= cpu_to_le32(blk_rq_bytes(rq));
   1022	msg.rw		= cpu_to_le32(rq_to_rnbd_flags(rq));
   1023	msg.prio	= cpu_to_le16(req_get_ioprio(rq));
   1024
   1025	/*
    1026	 * We only support discards with a single segment for now.
   1027	 * See queue limits.
   1028	 */
   1029	if (req_op(rq) != REQ_OP_DISCARD)
   1030		sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sgt.sgl);
   1031
   1032	if (sg_cnt == 0)
   1033		sg_mark_end(&iu->sgt.sgl[0]);
   1034
   1035	msg.hdr.type	= cpu_to_le16(RNBD_MSG_IO);
   1036	msg.device_id	= cpu_to_le32(dev->device_id);
   1037
   1038	vec = (struct kvec) {
   1039		.iov_base = &msg,
   1040		.iov_len  = sizeof(msg)
   1041	};
   1042	size = rnbd_clt_get_sg_size(iu->sgt.sgl, sg_cnt);
   1043	req_ops = (struct rtrs_clt_req_ops) {
   1044		.priv = iu,
   1045		.conf_fn = msg_io_conf,
   1046	};
   1047	err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit,
   1048			       &vec, 1, size, iu->sgt.sgl, sg_cnt);
   1049	if (err) {
   1050		rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n",
   1051				 err);
   1052		return err;
   1053	}
   1054
   1055	return 0;
   1056}
   1057
   1058/**
   1059 * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy
   1060 * @dev:	Device to be checked
   1061 * @q:		Queue to be added to the requeue list if required
   1062 *
   1063 * Description:
    1064 *     If the session is busy, someone will requeue us when resources are
    1065 *     freed.  If the session is not doing anything, the queue is not added
    1066 *     to the list and false is returned.
   1067 */
   1068static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev,
   1069						struct rnbd_queue *q)
   1070{
   1071	struct rnbd_clt_session *sess = dev->sess;
   1072	struct rnbd_cpu_qlist *cpu_q;
   1073	unsigned long flags;
   1074	bool added = true;
   1075	bool need_set;
   1076
   1077	cpu_q = get_cpu_ptr(sess->cpu_queues);
   1078	spin_lock_irqsave(&cpu_q->requeue_lock, flags);
   1079
   1080	if (!test_and_set_bit_lock(0, &q->in_list)) {
   1081		if (WARN_ON(!list_empty(&q->requeue_list)))
   1082			goto unlock;
   1083
   1084		need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm);
   1085		if (need_set) {
   1086			set_bit(cpu_q->cpu, sess->cpu_queues_bm);
   1087			/* Paired with rnbd_put_permit(). Set a bit first
   1088			 * and then observe the busy counter.
   1089			 */
   1090			smp_mb__before_atomic();
   1091		}
   1092		if (atomic_read(&sess->busy)) {
   1093			list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
   1094		} else {
   1095			/* Very unlikely, but possible: busy counter was
   1096			 * observed as zero.  Drop all bits and return
   1097			 * false to restart the queue by ourselves.
   1098			 */
   1099			if (need_set)
   1100				clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
   1101			clear_bit_unlock(0, &q->in_list);
   1102			added = false;
   1103		}
   1104	}
   1105unlock:
   1106	spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
   1107	put_cpu_ptr(sess->cpu_queues);
   1108
   1109	return added;
   1110}
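/*
 * Note (added for clarity) on the barrier pairing above: the queue is parked
 * only if sess->busy is read as non-zero after our bit in cpu_queues_bm has
 * been set, while rnbd_put_permit() decrements sess->busy before scanning the
 * bitmap.  So either we see the old non-zero busy count and a later
 * rnbd_put_permit() sees our bit and reruns the queue, or we see busy == 0
 * and restart the queue ourselves - a wakeup cannot be lost.
 */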
   1111
   1112static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev,
   1113					struct blk_mq_hw_ctx *hctx,
   1114					int delay)
   1115{
   1116	struct rnbd_queue *q = hctx->driver_data;
   1117
   1118	if (delay != RNBD_DELAY_IFBUSY)
   1119		blk_mq_delay_run_hw_queue(hctx, delay);
   1120	else if (!rnbd_clt_dev_add_to_requeue(dev, q))
   1121		/*
   1122		 * If session is not busy we have to restart
   1123		 * the queue ourselves.
   1124		 */
   1125		blk_mq_delay_run_hw_queue(hctx, 10/*ms*/);
   1126}
   1127
   1128static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx,
   1129				   const struct blk_mq_queue_data *bd)
   1130{
   1131	struct request *rq = bd->rq;
   1132	struct rnbd_clt_dev *dev = rq->q->disk->private_data;
   1133	struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
   1134	int err;
   1135	blk_status_t ret = BLK_STS_IOERR;
   1136
   1137	if (dev->dev_state != DEV_STATE_MAPPED)
   1138		return BLK_STS_IOERR;
   1139
   1140	iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON,
   1141				      RTRS_PERMIT_NOWAIT);
   1142	if (!iu->permit) {
   1143		rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY);
   1144		return BLK_STS_RESOURCE;
   1145	}
   1146
   1147	iu->sgt.sgl = iu->first_sgl;
   1148	err = sg_alloc_table_chained(&iu->sgt,
    1149				     /* Even if the request has no segment,
    1150				      * the sglist must have at least one entry.
   1151				      */
   1152				     blk_rq_nr_phys_segments(rq) ? : 1,
   1153				     iu->sgt.sgl,
   1154				     RNBD_INLINE_SG_CNT);
   1155	if (err) {
   1156		rnbd_clt_err_rl(dev, "sg_alloc_table_chained ret=%d\n", err);
   1157		rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
   1158		rnbd_put_permit(dev->sess, iu->permit);
   1159		return BLK_STS_RESOURCE;
   1160	}
   1161
   1162	blk_mq_start_request(rq);
   1163	err = rnbd_client_xfer_request(dev, rq, iu);
   1164	if (err == 0)
   1165		return BLK_STS_OK;
   1166	if (err == -EAGAIN || err == -ENOMEM) {
   1167		rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
   1168		ret = BLK_STS_RESOURCE;
   1169	}
   1170	sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
   1171	rnbd_put_permit(dev->sess, iu->permit);
   1172	return ret;
   1173}
   1174
   1175static int rnbd_rdma_poll(struct blk_mq_hw_ctx *hctx, struct io_comp_batch *iob)
   1176{
   1177	struct rnbd_queue *q = hctx->driver_data;
   1178	struct rnbd_clt_dev *dev = q->dev;
   1179	int cnt;
   1180
   1181	cnt = rtrs_clt_rdma_cq_direct(dev->sess->rtrs, hctx->queue_num);
   1182	return cnt;
   1183}
   1184
   1185static int rnbd_rdma_map_queues(struct blk_mq_tag_set *set)
   1186{
   1187	struct rnbd_clt_session *sess = set->driver_data;
   1188
   1189	/* shared read/write queues */
   1190	set->map[HCTX_TYPE_DEFAULT].nr_queues = num_online_cpus();
   1191	set->map[HCTX_TYPE_DEFAULT].queue_offset = 0;
   1192	set->map[HCTX_TYPE_READ].nr_queues = num_online_cpus();
   1193	set->map[HCTX_TYPE_READ].queue_offset = 0;
   1194	blk_mq_map_queues(&set->map[HCTX_TYPE_DEFAULT]);
   1195	blk_mq_map_queues(&set->map[HCTX_TYPE_READ]);
   1196
   1197	if (sess->nr_poll_queues) {
   1198		/* dedicated queue for poll */
   1199		set->map[HCTX_TYPE_POLL].nr_queues = sess->nr_poll_queues;
   1200		set->map[HCTX_TYPE_POLL].queue_offset = set->map[HCTX_TYPE_READ].queue_offset +
   1201			set->map[HCTX_TYPE_READ].nr_queues;
   1202		blk_mq_map_queues(&set->map[HCTX_TYPE_POLL]);
   1203		pr_info("[session=%s] mapped %d/%d/%d default/read/poll queues.\n",
   1204			sess->sessname,
   1205			set->map[HCTX_TYPE_DEFAULT].nr_queues,
   1206			set->map[HCTX_TYPE_READ].nr_queues,
   1207			set->map[HCTX_TYPE_POLL].nr_queues);
   1208	} else {
   1209		pr_info("[session=%s] mapped %d/%d default/read queues.\n",
   1210			sess->sessname,
   1211			set->map[HCTX_TYPE_DEFAULT].nr_queues,
   1212			set->map[HCTX_TYPE_READ].nr_queues);
   1213	}
   1214
   1215	return 0;
   1216}
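/*
 * Illustrative layout (hypothetical numbers, added for clarity): with
 * 4 online CPUs and nr_poll_queues = 2 the tag set has 4 + 2 = 6 hw queues;
 * HCTX_TYPE_DEFAULT and HCTX_TYPE_READ both map onto hw queues 0..3
 * (queue_offset 0), while HCTX_TYPE_POLL maps onto hw queues 4..5
 * (queue_offset 0 + 4).
 */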
   1217
   1218static struct blk_mq_ops rnbd_mq_ops = {
   1219	.queue_rq	= rnbd_queue_rq,
   1220	.complete	= rnbd_softirq_done_fn,
   1221	.map_queues     = rnbd_rdma_map_queues,
   1222	.poll           = rnbd_rdma_poll,
   1223};
   1224
   1225static int setup_mq_tags(struct rnbd_clt_session *sess)
   1226{
   1227	struct blk_mq_tag_set *tag_set = &sess->tag_set;
   1228
   1229	memset(tag_set, 0, sizeof(*tag_set));
   1230	tag_set->ops		= &rnbd_mq_ops;
   1231	tag_set->queue_depth	= sess->queue_depth;
   1232	tag_set->numa_node		= NUMA_NO_NODE;
   1233	tag_set->flags		= BLK_MQ_F_SHOULD_MERGE |
   1234				  BLK_MQ_F_TAG_QUEUE_SHARED;
   1235	tag_set->cmd_size	= sizeof(struct rnbd_iu) + RNBD_RDMA_SGL_SIZE;
   1236
   1237	/* for HCTX_TYPE_DEFAULT, HCTX_TYPE_READ, HCTX_TYPE_POLL */
   1238	tag_set->nr_maps        = sess->nr_poll_queues ? HCTX_MAX_TYPES : 2;
   1239	/*
    1240	 * HCTX_TYPE_DEFAULT and HCTX_TYPE_READ share one set of queues;
    1241	 * the others are for HCTX_TYPE_POLL.
   1242	 */
   1243	tag_set->nr_hw_queues	= num_online_cpus() + sess->nr_poll_queues;
   1244	tag_set->driver_data    = sess;
   1245
   1246	return blk_mq_alloc_tag_set(tag_set);
   1247}
   1248
   1249static struct rnbd_clt_session *
   1250find_and_get_or_create_sess(const char *sessname,
   1251			    const struct rtrs_addr *paths,
   1252			    size_t path_cnt, u16 port_nr, u32 nr_poll_queues)
   1253{
   1254	struct rnbd_clt_session *sess;
   1255	struct rtrs_attrs attrs;
   1256	int err;
   1257	bool first = false;
   1258	struct rtrs_clt_ops rtrs_ops;
   1259
   1260	sess = find_or_create_sess(sessname, &first);
   1261	if (sess == ERR_PTR(-ENOMEM)) {
   1262		return ERR_PTR(-ENOMEM);
   1263	} else if ((nr_poll_queues && !first) ||  (!nr_poll_queues && sess->nr_poll_queues)) {
   1264		/*
    1265		 * A device MUST have its own session to use polling mode.
    1266		 * Mapping a new device with the same session must fail.
   1267		 */
   1268		err = -EINVAL;
   1269		goto put_sess;
   1270	}
   1271
   1272	if (!first)
   1273		return sess;
   1274
   1275	if (!path_cnt) {
    1276		pr_err("Session %s not found, and path parameter not given\n", sessname);
   1277		err = -ENXIO;
   1278		goto put_sess;
   1279	}
   1280
   1281	rtrs_ops = (struct rtrs_clt_ops) {
   1282		.priv = sess,
   1283		.link_ev = rnbd_clt_link_ev,
   1284	};
   1285	/*
   1286	 * Nothing was found, establish rtrs connection and proceed further.
   1287	 */
   1288	sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
   1289				   paths, path_cnt, port_nr,
   1290				   0, /* Do not use pdu of rtrs */
   1291				   RECONNECT_DELAY,
   1292				   MAX_RECONNECTS, nr_poll_queues);
   1293	if (IS_ERR(sess->rtrs)) {
   1294		err = PTR_ERR(sess->rtrs);
   1295		goto wake_up_and_put;
   1296	}
   1297
   1298	err = rtrs_clt_query(sess->rtrs, &attrs);
   1299	if (err)
   1300		goto close_rtrs;
   1301
   1302	sess->max_io_size = attrs.max_io_size;
   1303	sess->queue_depth = attrs.queue_depth;
   1304	sess->nr_poll_queues = nr_poll_queues;
   1305	sess->max_segments = attrs.max_segments;
   1306
   1307	err = setup_mq_tags(sess);
   1308	if (err)
   1309		goto close_rtrs;
   1310
   1311	err = send_msg_sess_info(sess, RTRS_PERMIT_WAIT);
   1312	if (err)
   1313		goto close_rtrs;
   1314
   1315	wake_up_rtrs_waiters(sess);
   1316
   1317	return sess;
   1318
   1319close_rtrs:
   1320	close_rtrs(sess);
   1321put_sess:
   1322	rnbd_clt_put_sess(sess);
   1323
   1324	return ERR_PTR(err);
   1325
   1326wake_up_and_put:
   1327	wake_up_rtrs_waiters(sess);
   1328	goto put_sess;
   1329}
   1330
   1331static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev,
   1332				       struct rnbd_queue *q,
   1333				       struct blk_mq_hw_ctx *hctx)
   1334{
   1335	INIT_LIST_HEAD(&q->requeue_list);
   1336	q->dev  = dev;
   1337	q->hctx = hctx;
   1338}
   1339
   1340static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev)
   1341{
   1342	unsigned long i;
   1343	struct blk_mq_hw_ctx *hctx;
   1344	struct rnbd_queue *q;
   1345
   1346	queue_for_each_hw_ctx(dev->queue, hctx, i) {
   1347		q = &dev->hw_queues[i];
   1348		rnbd_init_hw_queue(dev, q, hctx);
   1349		hctx->driver_data = q;
   1350	}
   1351}
   1352
   1353static void setup_request_queue(struct rnbd_clt_dev *dev)
   1354{
   1355	blk_queue_logical_block_size(dev->queue, dev->logical_block_size);
   1356	blk_queue_physical_block_size(dev->queue, dev->physical_block_size);
   1357	blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors);
   1358
   1359	/*
   1360	 * we don't support discards to "discontiguous" segments
    1361	 * in one request
   1362	 */
   1363	blk_queue_max_discard_segments(dev->queue, 1);
   1364
   1365	blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors);
   1366	dev->queue->limits.discard_granularity	= dev->discard_granularity;
   1367	dev->queue->limits.discard_alignment	= dev->discard_alignment;
   1368	if (dev->secure_discard)
   1369		blk_queue_max_secure_erase_sectors(dev->queue,
   1370				dev->max_discard_sectors);
   1371	blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue);
   1372	blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue);
   1373	blk_queue_max_segments(dev->queue, dev->max_segments);
   1374	blk_queue_io_opt(dev->queue, dev->sess->max_io_size);
   1375	blk_queue_virt_boundary(dev->queue, SZ_4K - 1);
   1376	blk_queue_write_cache(dev->queue, dev->wc, dev->fua);
   1377}
   1378
   1379static int rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx)
   1380{
   1381	int err;
   1382
   1383	dev->gd->major		= rnbd_client_major;
   1384	dev->gd->first_minor	= idx << RNBD_PART_BITS;
   1385	dev->gd->minors		= 1 << RNBD_PART_BITS;
   1386	dev->gd->fops		= &rnbd_client_ops;
   1387	dev->gd->queue		= dev->queue;
   1388	dev->gd->private_data	= dev;
   1389	snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d",
   1390		 idx);
   1391	pr_debug("disk_name=%s, capacity=%zu\n",
   1392		 dev->gd->disk_name,
   1393		 dev->nsectors * (dev->logical_block_size / SECTOR_SIZE)
   1394		 );
   1395
   1396	set_capacity(dev->gd, dev->nsectors);
   1397
   1398	if (dev->access_mode == RNBD_ACCESS_RO) {
   1399		dev->read_only = true;
   1400		set_disk_ro(dev->gd, true);
   1401	} else {
   1402		dev->read_only = false;
   1403	}
   1404
   1405	/*
    1406	 * A network device does not need the rotational flag
   1407	 */
   1408	blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue);
   1409	err = add_disk(dev->gd);
   1410	if (err)
   1411		blk_cleanup_disk(dev->gd);
   1412
   1413	return err;
   1414}
   1415
   1416static int rnbd_client_setup_device(struct rnbd_clt_dev *dev)
   1417{
   1418	int idx = dev->clt_device_id;
   1419
   1420	dev->size = dev->nsectors * dev->logical_block_size;
   1421
   1422	dev->gd = blk_mq_alloc_disk(&dev->sess->tag_set, dev);
   1423	if (IS_ERR(dev->gd))
   1424		return PTR_ERR(dev->gd);
   1425	dev->queue = dev->gd->queue;
   1426	rnbd_init_mq_hw_queues(dev);
   1427
   1428	setup_request_queue(dev);
   1429	return rnbd_clt_setup_gen_disk(dev, idx);
   1430}
   1431
   1432static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess,
   1433				      enum rnbd_access_mode access_mode,
   1434				      const char *pathname,
   1435				      u32 nr_poll_queues)
   1436{
   1437	struct rnbd_clt_dev *dev;
   1438	int ret;
   1439
   1440	dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE);
   1441	if (!dev)
   1442		return ERR_PTR(-ENOMEM);
   1443
   1444	/*
   1445	 * nr_cpu_ids: the number of softirq queues
   1446	 * nr_poll_queues: the number of polling queues
   1447	 */
   1448	dev->hw_queues = kcalloc(nr_cpu_ids + nr_poll_queues,
   1449				 sizeof(*dev->hw_queues),
   1450				 GFP_KERNEL);
   1451	if (!dev->hw_queues) {
   1452		ret = -ENOMEM;
   1453		goto out_alloc;
   1454	}
   1455
    1456	ret = ida_alloc_max(&index_ida, (1 << (MINORBITS - RNBD_PART_BITS)) - 1,
   1457			    GFP_KERNEL);
   1458	if (ret < 0) {
   1459		pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n",
   1460		       pathname, sess->sessname, ret);
   1461		goto out_queues;
   1462	}
   1463
   1464	dev->pathname = kstrdup(pathname, GFP_KERNEL);
   1465	if (!dev->pathname) {
   1466		ret = -ENOMEM;
   1467		goto out_queues;
   1468	}
   1469
   1470	dev->clt_device_id	= ret;
   1471	dev->sess		= sess;
   1472	dev->access_mode	= access_mode;
   1473	dev->nr_poll_queues	= nr_poll_queues;
   1474	mutex_init(&dev->lock);
   1475	refcount_set(&dev->refcount, 1);
   1476	dev->dev_state = DEV_STATE_INIT;
   1477
   1478	/*
    1479	 * We are called here from a sysfs entry, thus clt-sysfs is
    1480	 * responsible for making sure the session does not disappear.
   1481	 */
   1482	WARN_ON(!rnbd_clt_get_sess(sess));
   1483
   1484	return dev;
   1485
   1486out_queues:
   1487	kfree(dev->hw_queues);
   1488out_alloc:
   1489	kfree(dev);
   1490	return ERR_PTR(ret);
   1491}
   1492
   1493static bool __exists_dev(const char *pathname, const char *sessname)
   1494{
   1495	struct rnbd_clt_session *sess;
   1496	struct rnbd_clt_dev *dev;
   1497	bool found = false;
   1498
   1499	list_for_each_entry(sess, &sess_list, list) {
   1500		if (sessname && strncmp(sess->sessname, sessname,
   1501					sizeof(sess->sessname)))
   1502			continue;
   1503		mutex_lock(&sess->lock);
   1504		list_for_each_entry(dev, &sess->devs_list, list) {
   1505			if (strlen(dev->pathname) == strlen(pathname) &&
   1506			    !strcmp(dev->pathname, pathname)) {
   1507				found = true;
   1508				break;
   1509			}
   1510		}
   1511		mutex_unlock(&sess->lock);
   1512		if (found)
   1513			break;
   1514	}
   1515
   1516	return found;
   1517}
   1518
   1519static bool exists_devpath(const char *pathname, const char *sessname)
   1520{
   1521	bool found;
   1522
   1523	mutex_lock(&sess_lock);
   1524	found = __exists_dev(pathname, sessname);
   1525	mutex_unlock(&sess_lock);
   1526
   1527	return found;
   1528}
   1529
   1530static bool insert_dev_if_not_exists_devpath(struct rnbd_clt_dev *dev)
   1531{
   1532	bool found;
   1533	struct rnbd_clt_session *sess = dev->sess;
   1534
   1535	mutex_lock(&sess_lock);
   1536	found = __exists_dev(dev->pathname, sess->sessname);
   1537	if (!found) {
   1538		mutex_lock(&sess->lock);
   1539		list_add_tail(&dev->list, &sess->devs_list);
   1540		mutex_unlock(&sess->lock);
   1541	}
   1542	mutex_unlock(&sess_lock);
   1543
   1544	return found;
   1545}
   1546
   1547static void delete_dev(struct rnbd_clt_dev *dev)
   1548{
   1549	struct rnbd_clt_session *sess = dev->sess;
   1550
   1551	mutex_lock(&sess->lock);
   1552	list_del(&dev->list);
   1553	mutex_unlock(&sess->lock);
   1554}
   1555
   1556struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname,
   1557					   struct rtrs_addr *paths,
   1558					   size_t path_cnt, u16 port_nr,
   1559					   const char *pathname,
   1560					   enum rnbd_access_mode access_mode,
   1561					   u32 nr_poll_queues)
   1562{
   1563	struct rnbd_clt_session *sess;
   1564	struct rnbd_clt_dev *dev;
   1565	int ret;
   1566
   1567	if (exists_devpath(pathname, sessname))
   1568		return ERR_PTR(-EEXIST);
   1569
   1570	sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr, nr_poll_queues);
   1571	if (IS_ERR(sess))
   1572		return ERR_CAST(sess);
   1573
   1574	dev = init_dev(sess, access_mode, pathname, nr_poll_queues);
   1575	if (IS_ERR(dev)) {
   1576		pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n",
   1577		       pathname, sess->sessname, PTR_ERR(dev));
   1578		ret = PTR_ERR(dev);
   1579		goto put_sess;
   1580	}
   1581	if (insert_dev_if_not_exists_devpath(dev)) {
   1582		ret = -EEXIST;
   1583		goto put_dev;
   1584	}
   1585	ret = send_msg_open(dev, RTRS_PERMIT_WAIT);
   1586	if (ret) {
   1587		rnbd_clt_err(dev,
   1588			      "map_device: failed, can't open remote device, err: %d\n",
   1589			      ret);
   1590		goto del_dev;
   1591	}
   1592	mutex_lock(&dev->lock);
   1593	pr_debug("Opened remote device: session=%s, path='%s'\n",
   1594		 sess->sessname, pathname);
   1595	ret = rnbd_client_setup_device(dev);
   1596	if (ret) {
   1597		rnbd_clt_err(dev,
   1598			      "map_device: Failed to configure device, err: %d\n",
   1599			      ret);
   1600		mutex_unlock(&dev->lock);
   1601		goto send_close;
   1602	}
   1603
   1604	rnbd_clt_info(dev,
   1605		       "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, wc: %d, fua: %d)\n",
   1606		       dev->gd->disk_name, dev->nsectors,
   1607		       dev->logical_block_size, dev->physical_block_size,
   1608		       dev->max_discard_sectors,
   1609		       dev->discard_granularity, dev->discard_alignment,
   1610		       dev->secure_discard, dev->max_segments,
   1611		       dev->max_hw_sectors, dev->wc, dev->fua);
   1612
   1613	mutex_unlock(&dev->lock);
   1614	rnbd_clt_put_sess(sess);
   1615
   1616	return dev;
   1617
   1618send_close:
   1619	send_msg_close(dev, dev->device_id, RTRS_PERMIT_WAIT);
   1620del_dev:
   1621	delete_dev(dev);
   1622put_dev:
   1623	rnbd_clt_put_dev(dev);
   1624put_sess:
   1625	rnbd_clt_put_sess(sess);
   1626
   1627	return ERR_PTR(ret);
   1628}
   1629
   1630static void destroy_gen_disk(struct rnbd_clt_dev *dev)
   1631{
   1632	del_gendisk(dev->gd);
   1633	blk_cleanup_disk(dev->gd);
   1634}
   1635
   1636static void destroy_sysfs(struct rnbd_clt_dev *dev,
   1637			  const struct attribute *sysfs_self)
   1638{
   1639	rnbd_clt_remove_dev_symlink(dev);
   1640	if (dev->kobj.state_initialized) {
   1641		if (sysfs_self)
    1642			/* To avoid a deadlock, first remove the sysfs file itself */
   1643			sysfs_remove_file_self(&dev->kobj, sysfs_self);
   1644		kobject_del(&dev->kobj);
   1645		kobject_put(&dev->kobj);
   1646	}
   1647}
   1648
   1649int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force,
   1650			   const struct attribute *sysfs_self)
   1651{
   1652	struct rnbd_clt_session *sess = dev->sess;
   1653	int refcount, ret = 0;
   1654	bool was_mapped;
   1655
   1656	mutex_lock(&dev->lock);
   1657	if (dev->dev_state == DEV_STATE_UNMAPPED) {
   1658		rnbd_clt_info(dev, "Device is already being unmapped\n");
   1659		ret = -EALREADY;
   1660		goto err;
   1661	}
   1662	refcount = refcount_read(&dev->refcount);
   1663	if (!force && refcount > 1) {
   1664		rnbd_clt_err(dev,
   1665			      "Closing device failed, device is in use, (%d device users)\n",
   1666			      refcount - 1);
   1667		ret = -EBUSY;
   1668		goto err;
   1669	}
   1670	was_mapped = (dev->dev_state == DEV_STATE_MAPPED);
   1671	dev->dev_state = DEV_STATE_UNMAPPED;
   1672	mutex_unlock(&dev->lock);
   1673
   1674	delete_dev(dev);
   1675	destroy_sysfs(dev, sysfs_self);
   1676	destroy_gen_disk(dev);
   1677	if (was_mapped && sess->rtrs)
   1678		send_msg_close(dev, dev->device_id, RTRS_PERMIT_WAIT);
   1679
   1680	rnbd_clt_info(dev, "Device is unmapped\n");
   1681
   1682	/* Likely last reference put */
   1683	rnbd_clt_put_dev(dev);
   1684
   1685	/*
    1686	 * At this point the device and the session may already be gone!
   1687	 */
   1688
   1689	return 0;
   1690err:
   1691	mutex_unlock(&dev->lock);
   1692
   1693	return ret;
   1694}
   1695
   1696int rnbd_clt_remap_device(struct rnbd_clt_dev *dev)
   1697{
   1698	int err;
   1699
   1700	mutex_lock(&dev->lock);
   1701	if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED)
   1702		err = 0;
   1703	else if (dev->dev_state == DEV_STATE_UNMAPPED)
   1704		err = -ENODEV;
   1705	else if (dev->dev_state == DEV_STATE_MAPPED)
   1706		err = -EALREADY;
   1707	else
   1708		err = -EBUSY;
   1709	mutex_unlock(&dev->lock);
   1710	if (!err) {
   1711		rnbd_clt_info(dev, "Remapping device.\n");
   1712		err = send_msg_open(dev, RTRS_PERMIT_WAIT);
   1713		if (err)
   1714			rnbd_clt_err(dev, "remap_device: %d\n", err);
   1715	}
   1716
   1717	return err;
   1718}
   1719
   1720static void unmap_device_work(struct work_struct *work)
   1721{
   1722	struct rnbd_clt_dev *dev;
   1723
   1724	dev = container_of(work, typeof(*dev), unmap_on_rmmod_work);
   1725	rnbd_clt_unmap_device(dev, true, NULL);
   1726}
   1727
   1728static void rnbd_destroy_sessions(void)
   1729{
   1730	struct rnbd_clt_session *sess, *sn;
   1731	struct rnbd_clt_dev *dev, *tn;
   1732
   1733	/* Firstly forbid access through sysfs interface */
   1734	rnbd_clt_destroy_sysfs_files();
   1735
   1736	/*
    1737	 * At this point there is no concurrent access to the session list
    1738	 * and the device lists:
   1739	 *   1. New session or device can't be created - session sysfs files
   1740	 *      are removed.
   1741	 *   2. Device or session can't be removed - module reference is taken
   1742	 *      into account in unmap device sysfs callback.
    1743	 *   3. No IO requests are in flight - each file open of a block_dev
    1744	 *      increases the module reference in get_disk().
   1745	 *
    1746	 * But there can still be user requests in flight, sent by the
    1747	 * asynchronous send_msg_*() functions, thus the RTRS session must be
    1748	 * explicitly closed before unmapping devices.
   1749	 */
   1750
   1751	list_for_each_entry_safe(sess, sn, &sess_list, list) {
   1752		if (!rnbd_clt_get_sess(sess))
   1753			continue;
   1754		close_rtrs(sess);
   1755		list_for_each_entry_safe(dev, tn, &sess->devs_list, list) {
   1756			/*
    1757			 * Here the unmap happens in parallel for only one reason:
    1758			 * blk_cleanup_queue() takes around half a second, so
    1759			 * with a huge number of devices the whole module unload
    1760			 * procedure would take minutes.
   1761			 */
   1762			INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work);
   1763			queue_work(rnbd_clt_wq, &dev->unmap_on_rmmod_work);
   1764		}
   1765		rnbd_clt_put_sess(sess);
   1766	}
   1767	/* Wait for all scheduled unmap works */
   1768	flush_workqueue(rnbd_clt_wq);
   1769	WARN_ON(!list_empty(&sess_list));
   1770}
   1771
   1772static int __init rnbd_client_init(void)
   1773{
   1774	int err = 0;
   1775
   1776	BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4);
   1777	BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36);
   1778	BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36);
   1779	BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264);
   1780	BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8);
   1781	BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56);
   1782	rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd");
   1783	if (rnbd_client_major <= 0) {
   1784		pr_err("Failed to load module, block device registration failed\n");
   1785		return -EBUSY;
   1786	}
   1787
   1788	err = rnbd_clt_create_sysfs_files();
   1789	if (err) {
   1790		pr_err("Failed to load module, creating sysfs device files failed, err: %d\n",
   1791		       err);
   1792		unregister_blkdev(rnbd_client_major, "rnbd");
   1793		return err;
   1794	}
   1795	rnbd_clt_wq = alloc_workqueue("rnbd_clt_wq", 0, 0);
   1796	if (!rnbd_clt_wq) {
   1797		pr_err("Failed to load module, alloc_workqueue failed.\n");
   1798		rnbd_clt_destroy_sysfs_files();
   1799		unregister_blkdev(rnbd_client_major, "rnbd");
   1800		err = -ENOMEM;
   1801	}
   1802
   1803	return err;
   1804}
   1805
   1806static void __exit rnbd_client_exit(void)
   1807{
   1808	rnbd_destroy_sessions();
   1809	unregister_blkdev(rnbd_client_major, "rnbd");
   1810	ida_destroy(&index_ida);
   1811	destroy_workqueue(rnbd_clt_wq);
   1812}
   1813
   1814module_init(rnbd_client_init);
   1815module_exit(rnbd_client_exit);