Linux RDMA and InfiniBand development
From: Md Haris Iqbal <haris.iqbal@ionos.com>
To: linux-block@vger.kernel.org, linux-rdma@vger.kernel.org
Cc: linux-kernel@vger.kernel.org, axboe@kernel.dk,
	bvanassche@acm.org, hch@lst.de, jgg@ziepe.ca, leon@kernel.org,
	jinpu.wang@ionos.com, Md Haris Iqbal <haris.iqbal@ionos.com>,
	Jia Li <jia.li@ionos.com>
Subject: [PATCH 09/13] block/brmr: client: main functionality
Date: Tue,  5 May 2026 09:46:21 +0200
Message-ID: <20260505074644.195453-10-haris.iqbal@ionos.com>
In-Reply-To: <20260505074644.195453-1-haris.iqbal@ionos.com>

Add the BRMR client implementation:

  brmr-clt.c		client core: gendisk and tag-set creation per
			pool, blk-mq queue_rq() submitting block IOs to
			the underlying RMR pool, queue limit setup
			(chunk size, write-zeroes, discard, write-cache
			and FUA features) and device tear-down.
  brmr-clt-reque.c	per-CPU requeue queues used to retry IOs
			temporarily blocked on RMR resource exhaustion.
  brmr-clt-stats.c	per-pool statistics counters (request size
			distribution, BLK_STS_RESOURCE returns).

These files are not compiled until the modules are wired into the
build in a later patch in this series.
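
For reference, below is a minimal userspace sketch of the round-robin
selection that brmr-clt-reque.c uses to pick the next per-CPU requeue list,
so that no CPU's stopped queues are starved.  It is not part of this patch;
the fixed CPU count, the plain bitmap and next_pending_cpu() are simplified
stand-ins for nr_cpu_ids, pool->cpu_queues_bm and find_next_bit():

  #include <stdio.h>

  #define NR_CPUS 8

  /* Find the first pending CPU at or after @start, wrapping around. */
  static int next_pending_cpu(unsigned int pending_bm, int start)
  {
  	int cpu;

  	for (cpu = start; cpu < NR_CPUS; cpu++)	/* first half */
  		if (pending_bm & (1u << cpu))
  			return cpu;
  	for (cpu = 0; cpu < start; cpu++)	/* second half */
  		if (pending_bm & (1u << cpu))
  			return cpu;
  	return -1;				/* nothing pending */
  }

  int main(void)
  {
  	unsigned int pending_bm = 0x26;	/* CPUs 1, 2 and 5 have work */
  	int last = 0, i;		/* plays the role of pool->cpu_rr */

  	for (i = 0; i < 4; i++) {
  		int cpu = next_pending_cpu(pending_bm, (last + 1) % NR_CPUS);

  		if (cpu < 0)
  			break;
  		printf("servicing requeue list of CPU %d\n", cpu);
  		last = cpu;
  	}
  	return 0;
  }

With CPUs 1, 2 and 5 pending, the scan above services 1, 2, 5 and then wraps
back to 1, mirroring the behaviour of brmr_get_cpu_qlist() and nxt_cpu() in
brmr-clt-reque.c.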

Signed-off-by: Md Haris Iqbal <haris.iqbal@ionos.com>
Signed-off-by: Jia Li <jia.li@ionos.com>
---
 drivers/block/brmr/brmr-clt-reque.c |  228 +++++
 drivers/block/brmr/brmr-clt-stats.c |  332 ++++++++
 drivers/block/brmr/brmr-clt.c       | 1222 +++++++++++++++++++++++++++
 3 files changed, 1782 insertions(+)
 create mode 100644 drivers/block/brmr/brmr-clt-reque.c
 create mode 100644 drivers/block/brmr/brmr-clt-stats.c
 create mode 100644 drivers/block/brmr/brmr-clt.c

diff --git a/drivers/block/brmr/brmr-clt-reque.c b/drivers/block/brmr/brmr-clt-reque.c
new file mode 100644
index 000000000000..252661486a0a
--- /dev/null
+++ b/drivers/block/brmr/brmr-clt-reque.c
@@ -0,0 +1,228 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Block device over RMR (BRMR)
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+
+#undef pr_fmt
+#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
+
+#include <linux/module.h>
+#include <linux/blkdev.h>
+#include <linux/blk-mq.h>
+#include <linux/slab.h>
+#include <linux/wait.h>
+#include <linux/sched.h>
+
+#include "brmr-clt.h"
+#include "rmr.h"
+#include "rmr-pool.h"
+
+MODULE_AUTHOR("The RMR and BRMR developers");
+MODULE_VERSION(BRMR_VER_STRING);
+MODULE_DESCRIPTION("BRMR Block Device using RMR cluster");
+MODULE_LICENSE("GPL");
+
+static inline void brmr_requeue(struct brmr_queue *q)
+{
+	if (WARN_ON(!q->hctx))
+		return;
+
+	/* We can come here from interrupt, thus async=true */
+	blk_mq_run_hw_queue(q->hctx, true);
+}
+
+/*
+ * Requeue implementation as used by ibnbd (rnbd).
+ */
+
+void brmr_init_cpu_qlists(struct brmr_cpu_qlist __percpu *cpu_queues)
+{
+	unsigned int cpu;
+	struct brmr_cpu_qlist *cpu_q;
+
+	for_each_possible_cpu(cpu) {
+		cpu_q = per_cpu_ptr(cpu_queues, cpu);
+
+		cpu_q->cpu = cpu;
+		INIT_LIST_HEAD(&cpu_q->requeue_list);
+		spin_lock_init(&cpu_q->requeue_lock);
+	}
+}
+
+/**
+ * brmr_get_cpu_qlist() - finds a CPU list with HW queues to be requeued
+ * @pool:	brmr client pool
+ * @cpu:	CPU to start the search from
+ *
+ * Description:
+ *     Each CPU has a list of HW queues which need to be requeued.  A non-empty
+ *     list is marked with a bit in the pool's CPU bitmap.  This function finds
+ *     the first set bit at or after @cpu, wrapping around if needed, and
+ *     returns the corresponding CPU list, or NULL if nothing is pending.
+ */
+static struct brmr_cpu_qlist *
+brmr_get_cpu_qlist(struct brmr_clt_pool *pool, int cpu)
+{
+	int bit;
+
+	/* First half */
+	bit = find_next_bit(pool->cpu_queues_bm, nr_cpu_ids, cpu);
+	if (bit < nr_cpu_ids) {
+		return per_cpu_ptr(pool->cpu_queues, bit);
+	} else if (cpu != 0) {
+		/* Second half */
+		bit = find_next_bit(pool->cpu_queues_bm, cpu, 0);
+		if (bit < cpu)
+			return per_cpu_ptr(pool->cpu_queues, bit);
+	}
+
+	return NULL;
+}
+
+static inline int nxt_cpu(int cpu)
+{
+	return (cpu + 1) % nr_cpu_ids;
+}
+
+/**
+ * brmr_requeue_if_needed() - requeue if a CPU queue is marked as non-empty
+ * @pool:	brmr client pool
+ *
+ * Description:
+ *     Each CPU has its own list of HW queues which should be requeued.
+ *     The function finds such a list, takes the list lock, picks the first
+ *     HW queue off the list and requeues it.
+ *
+ * Return:
+ *     True if a queue was requeued, false otherwise.
+ *
+ * Context:
+ *     Does not matter.
+ */
+static inline bool brmr_requeue_if_needed(struct brmr_clt_pool *pool)
+{
+	struct brmr_queue *q = NULL;
+	struct brmr_cpu_qlist *cpu_q;
+	unsigned long flags;
+	int *cpup;
+
+	/*
+	 * To keep fairness and not to let other queues starve we always
+	 * try to wake up someone else in round-robin manner.  That of course
+	 * increases latency but queues always have a chance to be executed.
+	 */
+	cpup = get_cpu_ptr(pool->cpu_rr);
+	for (cpu_q = brmr_get_cpu_qlist(pool, nxt_cpu(*cpup)); cpu_q;
+	     cpu_q = brmr_get_cpu_qlist(pool, nxt_cpu(cpu_q->cpu))) {
+		if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
+			continue;
+		if (likely(test_bit(cpu_q->cpu, pool->cpu_queues_bm))) {
+			q = list_first_entry_or_null(&cpu_q->requeue_list,
+						     typeof(*q), requeue_list);
+			if (WARN_ON(!q))
+				goto clear_bit;
+			list_del_init(&q->requeue_list);
+			clear_bit_unlock(0, &q->in_list);
+
+			if (list_empty(&cpu_q->requeue_list)) {
+				/* Clear bit if nothing is left */
+clear_bit:
+				clear_bit(cpu_q->cpu, pool->cpu_queues_bm);
+			}
+		}
+		spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
+
+		if (q)
+			break;
+	}
+
+	/*
+	 * Save the CPU whose list was just serviced in the per-CPU round-robin
+	 * variable.  Simply incrementing it does not work: brmr_get_cpu_qlist()
+	 * would then always return the first CPU with a pending list whenever
+	 * the stored value is greater than the last CPU with a pending list.
+	 */
+	if (cpu_q)
+		*cpup = cpu_q->cpu;
+	put_cpu_var(pool->cpu_rr);
+
+	if (q)
+		brmr_requeue(q);
+
+	return !!q;
+}
+
+/**
+ * brmr_requeue_requests() - requeue all queues left in the list if the
+ *     brmr_clt_pool is idling (there are no requests in-flight)
+ * @pool:	brmr client pool
+ *
+ * Description:
+ *     This function tries to rerun all stopped queues once there are no
+ *     requests in-flight anymore.  It addresses an obvious problem: the
+ *     number of tags can be smaller than the number of queues (hctxs) that
+ *     are stopped and put to sleep.  If the last tag that is put back does
+ *     not wake up all remaining queues (hctxs), IO requests hang forever.
+ *
+ *     That can happen when all N tags have been exhausted from one CPU and
+ *     there are many block devices per pool, say M.  Each block device has
+ *     its own queue (hctx) per CPU, so up to M x nr_cpu_ids queues can go to
+ *     sleep.  For example, with N = 64 tags, M = 8 devices and 16 CPUs, up
+ *     to 128 queues may sleep while only 64 tags can wake them.  Whenever
+ *     N < M x nr_cpu_ids we can end up with an IO hang.
+ *
+ *     To avoid this hang, the last caller of brmr_clt_put_iu() (the one who
+ *     observes pool->busy == 0) must wake up all remaining queues.
+ *
+ * Context:
+ *     Called from brmr_clt_put_iu(), typically on the request completion
+ *     path, which may run in interrupt context.
+ */
+void brmr_requeue_requests(struct brmr_clt_pool *pool)
+{
+	bool requeued;
+
+	do {
+		requeued = brmr_requeue_if_needed(pool);
+	} while (atomic_read(&pool->busy) == 0 && requeued);
+}
+
+bool brmr_add_to_requeue(struct brmr_clt_pool *pool, struct brmr_queue *q)
+{
+	struct brmr_cpu_qlist *cpu_q;
+	unsigned long flags;
+	bool added = true;
+	bool need_set;
+
+	cpu_q = get_cpu_ptr(pool->cpu_queues);
+	spin_lock_irqsave(&cpu_q->requeue_lock, flags);
+
+	if (likely(!test_and_set_bit_lock(0, &q->in_list))) {
+		if (WARN_ON(!list_empty(&q->requeue_list)))
+			goto unlock;
+
+		need_set = !test_bit(cpu_q->cpu, pool->cpu_queues_bm);
+		if (need_set) {
+			set_bit(cpu_q->cpu, pool->cpu_queues_bm);
+			/* Paired with brmr_clt_put_iu(). Set a bit first
+			 * and then observe the busy counter.
+			 */
+			smp_mb__before_atomic();
+		}
+		if (likely(atomic_read(&pool->busy))) {
+			list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
+		} else {
+			/* Very unlikely, but possible: busy counter was
+			 * observed as zero.  Drop all bits and return
+			 * false to restart the queue by ourselves.
+			 */
+			if (need_set)
+				clear_bit(cpu_q->cpu, pool->cpu_queues_bm);
+			clear_bit_unlock(0, &q->in_list);
+			added = false;
+		}
+	}
+unlock:
+	spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
+	put_cpu_ptr(pool->cpu_queues);
+
+	return added;
+}
+
diff --git a/drivers/block/brmr/brmr-clt-stats.c b/drivers/block/brmr/brmr-clt-stats.c
new file mode 100644
index 000000000000..de080fde779c
--- /dev/null
+++ b/drivers/block/brmr/brmr-clt-stats.c
@@ -0,0 +1,332 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Block device over RMR (BRMR)
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+
+#undef pr_fmt
+#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
+
+#include <linux/kernel.h>
+#include <linux/percpu.h>
+#include <linux/slab.h>
+#include <linux/string.h>
+
+#include "brmr-clt.h"
+#include "rmr.h"
+#include "rmr-pool.h"
+
+int brmr_clt_init_stats(struct brmr_clt_stats *stats)
+{
+	stats->pcpu_stats = alloc_percpu(typeof(*stats->pcpu_stats));
+	if (unlikely(!stats->pcpu_stats))
+		return -ENOMEM;
+
+	return 0;
+}
+
+void brmr_clt_free_stats(struct brmr_clt_stats *stats)
+{
+	free_percpu(stats->pcpu_stats);
+}
+
+int brmr_clt_reset_submitted_req(struct brmr_clt_stats *stats, bool enable)
+{
+	struct brmr_stats_pcpu *s;
+	int cpu;
+
+	if (unlikely(!enable))
+		return -EINVAL;
+
+	for_each_possible_cpu(cpu) {
+		s = per_cpu_ptr(stats->pcpu_stats, cpu);
+		memset(&s->submitted_requests, 0,
+		       sizeof(s->submitted_requests));
+	}
+
+	return 0;
+}
+
+int brmr_clt_reset_req_sizes(struct brmr_clt_stats *stats, bool enable)
+{
+	struct brmr_stats_pcpu *s;
+	int cpu;
+
+	if (unlikely(!enable))
+		return -EINVAL;
+
+	for_each_possible_cpu(cpu) {
+		s = per_cpu_ptr(stats->pcpu_stats, cpu);
+		memset(&s->request_sizes, 0,
+		       sizeof(s->request_sizes));
+	}
+
+	return 0;
+}
+
+static void brmr_update_submitted_requests(struct brmr_stats_pcpu *s,
+					   size_t size, int split, int d)
+{
+	s->submitted_requests.dir[d].total_sectors += (size >> SECTOR_SHIFT);
+	if (split)
+		s->submitted_requests.dir[d].cnt_split++;
+	else
+		s->submitted_requests.dir[d].cnt_whole++;
+}
+
+#define MAX_LEN (128*1024)
+#define NUM_CLASSES 16
+#define CLASSIFY_SHIFT (ilog2(MAX_LEN)-ilog2(NUM_CLASSES))
+
+/*
+ * classify() - classifies a request length linearly into 16 classes
+ *
+ * With MAX_LEN = 128 KiB and NUM_CLASSES = 16, CLASSIFY_SHIFT is 13, so the
+ * class is simply length / 8 KiB, capped at 15.  For example, a 20 KiB
+ * request falls into class 2 (>= 16K) and a 300 KiB request into class 15.
+ *
+ * Input length in bytes:
+ *
+ *   class  0: <    0x2000 (8K)
+ *   class  1: >=   0x2000 (8K)
+ *   class  2: >=   0x4000 (16K)
+ *   class  3: >=   0x6000 (24K)
+ *   class  4: >=   0x8000 (32K)
+ *   class  5: >=   0xa000 (40K)
+ *   class  6: >=   0xc000 (48K)
+ *   class  7: >=   0xe000 (56K)
+ *   class  8: >=  0x10000 (64K)
+ *   class  9: >=  0x12000 (72K)
+ *   class 10: >=  0x14000 (80K)
+ *   class 11: >=  0x16000 (88K)
+ *   class 12: >=  0x18000 (96K)
+ *   class 13: >=  0x1a000 (104K)
+ *   class 14: >=  0x1c000 (112K)
+ *   class 15: >=  0x1e000 (120K)
+ *
+ * Everything of 128K and above is also classified as class 15.
+ */
+static inline int classify(long length)
+{
+	return length < MAX_LEN ? (length >> CLASSIFY_SHIFT) : NUM_CLASSES-1;
+}
+
+static void brmr_update_request_sizes(struct brmr_stats_pcpu *s,
+				      size_t size, int split, int d)
+{
+	int size_class = classify(size);
+	switch (split) {
+	case 0:
+		s->request_sizes.dir[d].cnt_whole[size_class]++;
+		break;
+	case 1:
+		s->request_sizes.dir[d].cnt_left[size_class]++;
+		break;
+	case 2:
+		s->request_sizes.dir[d].cnt_right[size_class]++;
+		break;
+	default:
+		WARN_ONCE(true, "unexpected split value: %d\n", split);
+	}
+}
+
+void brmr_update_stats(struct brmr_clt_stats *stats, size_t size, int split, int d)
+{
+	struct brmr_stats_pcpu *s;
+
+	s = this_cpu_ptr(stats->pcpu_stats);
+
+	brmr_update_submitted_requests(s, size, split, d);
+	brmr_update_request_sizes(s, size, split, d);
+}
+
+ssize_t brmr_clt_stats_rq_to_str(struct brmr_clt_stats *stats, char *page, size_t len)
+{
+	struct brmr_stats_rq sum;
+	struct brmr_stats_rq *r;
+	int cpu; int d;
+
+	memset(&sum, 0, sizeof(sum));
+
+	for_each_possible_cpu(cpu) {
+		r = &per_cpu_ptr(stats->pcpu_stats, cpu)->submitted_requests;
+
+		for (d=READ; d<=WRITE; d++) {
+			sum.dir[d].cnt_whole      += r->dir[d].cnt_whole;
+			sum.dir[d].cnt_split      += r->dir[d].cnt_split;
+			sum.dir[d].total_sectors  += r->dir[d].total_sectors;
+		}
+	}
+
+	return scnprintf(page, len, "%llu %llu %llu %llu %llu %llu\n",
+			 sum.dir[READ].cnt_whole, sum.dir[READ].cnt_split,
+			 sum.dir[READ].total_sectors,
+			 sum.dir[WRITE].cnt_whole, sum.dir[WRITE].cnt_split,
+			 sum.dir[WRITE].total_sectors);
+}
+
+ssize_t brmr_clt_stats_sizes_to_str(struct brmr_clt_stats *stats, char *page, size_t len)
+{
+	struct brmr_stats_sizes *sum;
+	struct brmr_stats_sizes *pcpu;
+	int cpu, d, i;
+	int cnt = 0;
+
+	sum = kzalloc(sizeof(*sum), GFP_KERNEL);
+	if (unlikely(!sum))
+		return -ENOMEM;
+
+	for (i = 0; i < STATS_SIZES_NUM; i++) {
+		for_each_possible_cpu(cpu) {
+			pcpu = &per_cpu_ptr(stats->pcpu_stats, cpu)
+				->request_sizes;
+
+			for (d = READ; d <= WRITE; d++) {
+				sum->dir[d].cnt_whole[i]
+					+= pcpu->dir[d].cnt_whole[i];
+				sum->dir[d].cnt_left[i]
+					+= pcpu->dir[d].cnt_left[i];
+				sum->dir[d].cnt_right[i]
+					+= pcpu->dir[d].cnt_right[i];
+			}
+		}
+	}
+
+	cnt += scnprintf(page + cnt, len - cnt,
+		"         READ        "
+		"        whole                left               right               "
+		"\n");
+	if (len - cnt <= 0)
+		goto free_return;
+
+	cnt += scnprintf(page + cnt, len - cnt,
+			 "<=   8 Kbytes: %19llu %19llu %19llu\n",
+			 sum->dir[READ].cnt_whole[0],
+			 sum->dir[READ].cnt_left[0],
+			 sum->dir[READ].cnt_right[0]);
+
+	for (i = 1; i < STATS_SIZES_NUM; i++) {
+
+		cnt += scnprintf(page + cnt, len - cnt,
+				 ">  %3d Kbytes: %19llu %19llu %19llu\n",
+				 (i)<<3,
+				 sum->dir[READ].cnt_whole[i],
+				 sum->dir[READ].cnt_left[i],
+				 sum->dir[READ].cnt_right[i]);
+
+		if (len - cnt <= 0)
+			goto free_return;
+	}
+
+	cnt += scnprintf(page + cnt, len - cnt,
+		"\n        WRITE        "
+		"        whole                left               right               "
+		"\n");
+	if (len - cnt <= 0)
+		goto free_return;
+
+	cnt += scnprintf(page + cnt, len - cnt,
+			 "<=   8 Kbytes: %19llu %19llu %19llu\n",
+			 sum->dir[WRITE].cnt_whole[0],
+			 sum->dir[WRITE].cnt_left[0],
+			 sum->dir[WRITE].cnt_right[0]);
+
+	for (i = 1; i < STATS_SIZES_NUM; i++) {
+
+		cnt += scnprintf(page + cnt, len - cnt,
+				 ">  %3d Kbytes: %19llu %19llu %19llu\n",
+				 (i)<<3,
+				 sum->dir[WRITE].cnt_whole[i],
+				 sum->dir[WRITE].cnt_left[i],
+				 sum->dir[WRITE].cnt_right[i]);
+
+		if (len - cnt <= 0)
+			goto free_return;
+	}
+
+free_return:
+	kfree(sum);
+
+	return cnt;
+}
+
+int brmr_clt_reset_sts_resource(struct brmr_clt_stats *stats, bool enable)
+{
+	struct brmr_stats_pcpu *s;
+	int cpu;
+
+	if (unlikely(!enable))
+		return -EINVAL;
+
+	for_each_possible_cpu(cpu) {
+		s = per_cpu_ptr(stats->pcpu_stats, cpu);
+		memset(&s->sts_resource, 0,
+		       sizeof(s->sts_resource));
+	}
+
+	return 0;
+}
+
+void brmr_clt_update_sts_resource(struct brmr_clt_stats *stats, int which)
+{
+	struct brmr_stats_pcpu *s;
+
+	s = this_cpu_ptr(stats->pcpu_stats);
+	switch (which) {
+	case 0:
+		s->sts_resource.get_iu++;
+		break;
+	case 1:
+		s->sts_resource.get_iu2++;
+		break;
+	case 2:
+		s->sts_resource.clt_request1++;
+		break;
+	case 3:
+		s->sts_resource.clt_request++;
+		break;
+	default:
+		WARN_ONCE(true, "unexpected sts_resource counter: %d\n", which);
+	}
+}
+
+ssize_t brmr_stats_sts_resource_to_str(
+	struct brmr_clt_stats *stats, char *page, size_t len)
+{
+	struct brmr_stats_sts_resource sum;
+	struct brmr_stats_sts_resource *r;
+	int cpu;
+
+	memset(&sum, 0, sizeof(sum));
+
+	for_each_possible_cpu(cpu) {
+		r = &per_cpu_ptr(stats->pcpu_stats, cpu)->sts_resource;
+
+		sum.get_iu       += r->get_iu;
+		sum.get_iu2      += r->get_iu2;
+		sum.clt_request1 += r->clt_request1;
+		sum.clt_request  += r->clt_request;
+	}
+
+	return scnprintf(page, len, "%llu %llu %llu %llu\n",
+			 sum.get_iu, sum.get_iu2,
+			 sum.clt_request1, sum.clt_request);
+}
+
+ssize_t brmr_stats_sts_resource_per_cpu_to_str(
+	struct brmr_clt_stats *stats, char *page, size_t len)
+{
+	struct brmr_stats_sts_resource *r;
+	int cpu; int cnt = 0;
+
+	for_each_possible_cpu(cpu) {
+		r = &per_cpu_ptr(stats->pcpu_stats, cpu)->sts_resource;
+
+		cnt += scnprintf(page + cnt, len - cnt, "%d %llu %llu %llu %llu\n",
+				 cpu, r->get_iu, r->get_iu2,
+				 r->clt_request1, r->clt_request);
+		if (len - cnt <= 0)
+			goto return_cnt;
+	}
+
+return_cnt:
+	return cnt;
+}
+
diff --git a/drivers/block/brmr/brmr-clt.c b/drivers/block/brmr/brmr-clt.c
new file mode 100644
index 000000000000..6f3d2dd2a9d9
--- /dev/null
+++ b/drivers/block/brmr/brmr-clt.c
@@ -0,0 +1,1222 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Block device over RMR (BRMR)
+ *
+ * Copyright (c) 2026 IONOS SE
+ */
+
+#undef pr_fmt
+#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
+
+#include <linux/module.h>
+#include <linux/blkdev.h>
+#include <linux/hdreg.h>
+#include <linux/uio.h>
+
+#include "brmr-clt.h"
+
+MODULE_AUTHOR("The RMR and BRMR developers");
+MODULE_VERSION(BRMR_VER_STRING);
+MODULE_DESCRIPTION("BRMR Block Device using RMR cluster");
+MODULE_LICENSE("GPL");
+
+/*
+ * Maximum number of partitions an instance can have.
+ * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
+ */
+#define BRMR_PART_BITS		6
+
+static DEFINE_IDA(index_ida);
+static DEFINE_MUTEX(ida_lock);
+static DEFINE_MUTEX(brmr_device_lock);
+static LIST_HEAD(brmr_device_list);
+static int brmr_major;
+
+static int BRMR_DELAY_10ms   = 10;
+
+static int index_to_minor(int index)
+{
+	return index << BRMR_PART_BITS;
+}
+
+static int minor_to_index(int minor)
+{
+	return minor >> BRMR_PART_BITS;
+}
+
+static inline const char *rq_op_to_str(struct request *rq)
+{
+	switch (req_op(rq)) {
+	case REQ_OP_READ:
+		return "READ";
+	case REQ_OP_WRITE:
+		return "WRITE";
+	case REQ_OP_DISCARD:
+		return "DISCARD";
+	case REQ_OP_WRITE_ZEROES:
+		return "WRITE_ZEROES";
+	case REQ_OP_FLUSH:
+		return "FLUSH";
+	default:
+		return "UNKNOWN";
+	}
+	return "";
+}
+
+
+/* copy from blk.h */
+static inline bool biovec_phys_mergeable(struct request_queue *q,
+                struct bio_vec *vec1, struct bio_vec *vec2)
+{
+	unsigned long mask = queue_segment_boundary(q);
+	phys_addr_t addr1 = page_to_phys(vec1->bv_page) + vec1->bv_offset;
+	phys_addr_t addr2 = page_to_phys(vec2->bv_page) + vec2->bv_offset;
+
+	if (addr1 + vec1->bv_len != addr2)
+		return false;
+	/*
+	 * The Xen-specific merge check from blk.h is intentionally omitted:
+	 *
+	 * if (xen_domain() && !xen_biovec_phys_mergeable(vec1, vec2->bv_page))
+	 *	return false;
+	 */
+	if ((addr1 | mask) != ((addr2 + vec2->bv_len - 1) | mask))
+		return false;
+	return true;
+}
+
+/* copy from blk_merge.c */
+static inline unsigned get_max_segment_size(const struct request_queue *q,
+                                            struct page *start_page,
+                                            unsigned long offset)
+{
+	unsigned long mask = queue_segment_boundary(q);
+
+	offset = mask & (page_to_phys(start_page) + offset);
+
+	/*
+	 * overflow may be triggered in case of zero page physical address
+	 * on 32bit arch, use queue's max segment size when that happens.
+	 */
+	return min_not_zero(mask - offset + 1,
+				(unsigned long)queue_max_segment_size(q));
+}
+
+static inline struct scatterlist *blk_next_sg(struct scatterlist **sg,
+                struct scatterlist *sglist)
+{
+        if (!*sg)
+                return sglist;
+
+        /*
+         * If the driver previously mapped a shorter list, we could see a
+         * termination bit prematurely unless it fully inits the sg table
+         * on each mapping. We KNOW that there must be more entries here
+         * or the driver would be buggy, so force clear the termination bit
+         * to avoid doing a full sg_init_table() in drivers for each command.
+         */
+        sg_unmark_end(*sg);
+        return sg_next(*sg);
+}
+
+/* only try to merge bvecs into one sg if they are from two bios */
+static inline bool
+__blk_segment_map_sg_merge(struct request_queue *q, struct bio_vec *bvec,
+                           struct bio_vec *bvprv, struct scatterlist **sg)
+{
+
+	int nbytes = bvec->bv_len;
+
+	if (!*sg)
+		return false;
+
+	if ((*sg)->length + nbytes > queue_max_segment_size(q))
+		return false;
+
+	if (!biovec_phys_mergeable(q, bvprv, bvec))
+		return false;
+
+	(*sg)->length += nbytes;
+
+	return true;
+}
+
+/*
+ * brmr_clt_get_iu() - Get an RMR I/O unit (iu)
+ *
+ * Description:
+ *	It gets an RMR I/O unit using rmr_clt_get_iu() and increments
+ *	the pool busy counter. It invokes rmr_clt_get_iu() with NO_WAIT
+ *	as brmr can requeue an I/O request.
+ *
+ *	Ref. brmr_add_to_requeue()
+ */
+static inline struct rmr_iu *brmr_clt_get_iu(struct brmr_clt_pool *pool, enum rmr_io_flags flag)
+{
+	struct rmr_iu *iu = rmr_clt_get_iu(pool->rmr, flag, NO_WAIT);
+	if (IS_ERR_OR_NULL(iu))
+		return iu;
+
+	atomic_inc(&pool->busy);
+
+	return iu;
+}
+
+/*
+ * brmr_clt_put_iu() - Put the RMR I/O unit (iu)
+ *
+ * Description:
+ *	Puts the RMR I/O unit via rmr_clt_put_iu() and decrements the pool
+ *	busy counter.  A memory barrier then orders the decrement against
+ *	the check of the per-CPU requeue bits in brmr_requeue_requests().
+ *
+ *	Ref. brmr_add_to_requeue() and brmr_requeue_requests()
+ */
+static inline void brmr_clt_put_iu(struct brmr_clt_pool *pool, struct rmr_iu *iu)
+{
+	rmr_clt_put_iu(pool->rmr, iu);
+
+	atomic_dec(&pool->busy);
+	/*
+	 * Paired with brmr_add_to_requeue(). Decrement first
+	 * and then check queue bits.
+	 */
+	smp_mb__after_atomic();
+	brmr_requeue_requests(pool);
+}
+
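+/*
+ * Completion side of an I/O request: runs as the blk-mq ->complete handler,
+ * frees the chained scatterlist, returns the RMR I/O unit to the pool (which
+ * may kick the requeue machinery, see brmr_clt_put_iu()) and ends the request
+ * with the status recorded by brmr_request_conf().
+ */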
+static void brmr_softirq_done_fn(struct request *rq)
+{
+	struct brmr_clt_iu *iu = blk_mq_rq_to_pdu(rq);
+	struct brmr_clt_dev *dev = iu->dev;
+
+	if (blk_rq_nr_phys_segments(rq))
+		sg_free_table_chained(&iu->sgt, BRMR_INLINE_SG_CNT);
+
+	brmr_clt_put_iu(dev->pool, iu->rmr_iu);
+	blk_mq_end_request(rq, iu->status);
+}
+
+static void brmr_request_conf(void *priv, int errno)
+{
+	struct brmr_clt_iu *iu = (struct brmr_clt_iu *)priv;
+	struct brmr_clt_dev *dev = iu->dev;
+	struct request *rq = iu->rq;
+
+	iu->status = (errno && errno != -ENOENT) ? BLK_STS_IOERR : BLK_STS_OK;
+
+	if (errno == -ENOENT)
+		pr_debug("%s request for %s IGNORED err: %d\n",
+			 rq_op_to_str(rq), dev->gd->disk_name, errno);
+	else if (errno)
+		pr_err_ratelimited("%s request for %s failed with err: %d\n",
+				   rq_op_to_str(rq), dev->gd->disk_name, errno);
+
+	/*
+	 * Log before completing: once blk_mq_complete_request() is called the
+	 * request may be ended on another CPU and must not be touched anymore.
+	 */
+	blk_mq_complete_request(rq);
+}
+
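+/*
+ * blk-mq ->queue_rq handler: reserves an RMR I/O unit, maps the request's
+ * segments into a chained scatterlist and forwards the I/O to the RMR pool
+ * via rmr_clt_request().  On resource shortage the hardware queue is either
+ * parked on the per-CPU requeue list or re-run after a small delay, and
+ * BLK_STS_RESOURCE is returned so the block layer retries the request.
+ */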
+static blk_status_t brmr_queue_rq(struct blk_mq_hw_ctx *hctx,
+				  const struct blk_mq_queue_data *bd)
+{
+	struct brmr_clt_dev *dev = bd->rq->q->disk->private_data;
+	struct brmr_clt_pool *pool = dev->pool;
+	struct brmr_clt_iu *iu = blk_mq_rq_to_pdu(bd->rq);
+	struct request *rq = bd->rq;
+	struct rmr_iu *rmr_iu;
+	unsigned int sg_cnt = 0;
+	size_t offset; size_t length;
+	enum rmr_io_flags flag;
+	unsigned short prio, seg;
+	int data_dir, err;
+	blk_status_t ret = BLK_STS_IOERR;
+
+	if (unlikely(dev->dev_state != DEV_STATE_READY))
+		return ret;
+
+	iu->rq = rq;
+	iu->dev = dev;
+
+	offset = blk_rq_pos(rq) << SECTOR_SHIFT;
+	length = blk_rq_bytes(rq);
+	flag = rq_to_rmr_flags(rq);
+	prio = req_get_ioprio(rq);
+	data_dir = rq_data_dir(rq);
+
+	rmr_iu = brmr_clt_get_iu(pool, flag);
+	if (unlikely(rmr_iu == NULL)) {
+		pr_debug("Got no tag to send a request to rmr_clt\n");
+
+		/* Increment statistic counter for it */
+		brmr_clt_update_sts_resource(&dev->stats, 0);
+
+		if (!brmr_add_to_requeue(pool, hctx->driver_data))
+			/*
+			 * TODO unlikely
+			 * Restarting queue with some delay is a stupid way
+			 * of handling resource contentions
+			 */
+			blk_mq_delay_run_hw_queue(hctx, BRMR_DELAY_10ms);
+
+		return BLK_STS_RESOURCE;
+	}
+	if (IS_ERR(rmr_iu)) {
+		pr_err("Error %pe when reserving resources for io in pool %s\n",
+		       rmr_iu, pool->rmr->poolname);
+		return BLK_STS_IOERR;
+	}
+	iu->rmr_iu = rmr_iu;
+
+	iu->sgt.sgl = iu->sgl;
+	seg = blk_rq_nr_phys_segments(rq);
+	if (seg) {
+		err = sg_alloc_table_chained(&iu->sgt, seg, iu->sgt.sgl, BRMR_INLINE_SG_CNT);
+		if (err) {
+			pr_err("sg_alloc_table_chained failed, err=%d\n", err);
+			blk_mq_delay_run_hw_queue(hctx, BRMR_DELAY_10ms);
+			brmr_clt_put_iu(pool, rmr_iu);
+			return BLK_STS_RESOURCE;
+		}
+	}
+
+	/* We only support discards with single segment and write_zeroes request with no segment. */
+	/* See queue limits. */
+	if ((req_op(rq) != REQ_OP_DISCARD) && (req_op(rq) != REQ_OP_WRITE_ZEROES))
+		sg_cnt = blk_rq_map_sg(rq, iu->sgt.sgl);
+
+	blk_mq_start_request(rq);
+	brmr_update_stats(&dev->stats, length, 0, data_dir);
+
+	pr_debug("brmr %s request with flag %x offset %zu length %zu sg_cnt: %d\n",
+		 rq_op_to_str(rq), flag, offset, length, sg_cnt);
+
+	err = rmr_clt_request(pool->rmr, rmr_iu, offset, length, flag, prio,
+			      iu, brmr_request_conf, iu->sgt.sgl, sg_cnt);
+	if (likely(err == 0))
+		return BLK_STS_OK;
+
+	pr_err_ratelimited("sending %s request for %s failed with err: %d\n",
+			   rq_op_to_str(rq), dev->gd->disk_name, err);
+
+	if (unlikely(err == -EAGAIN || err == -ENOMEM)) {
+		pr_debug("Got resource error %d when sending a request to rmr_clt\n", err);
+
+		brmr_clt_update_sts_resource(&dev->stats, 3);
+		blk_mq_delay_run_hw_queue(hctx, BRMR_DELAY_10ms);
+
+		ret = BLK_STS_RESOURCE;
+	} else {
+		ret = BLK_STS_IOERR;
+	}
+
+	if (seg)
+		sg_free_table_chained(&iu->sgt, BRMR_INLINE_SG_CNT);
+
+	brmr_clt_put_iu(pool, rmr_iu);
+	return ret;
+}
+
+static const struct blk_mq_ops brmr_mq_ops = {
+	.queue_rq	= brmr_queue_rq,
+	.complete	= brmr_softirq_done_fn,
+};
+
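+/*
+ * Open the RMR pool, query its attributes (queue depth, max IO size, chunk
+ * size, max segments), allocate the blk-mq tag set that all devices mapped
+ * through this pool will share, and set up the per-CPU requeue bookkeeping.
+ */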
+static struct brmr_clt_pool *brmr_clt_create_pool(const char *poolname)
+{
+	struct brmr_clt_pool *pool;
+	int err;
+	struct rmr_attrs attrs;
+
+	pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+	if (!pool)
+		return ERR_PTR(-ENOMEM);
+
+	pool->rmr = rmr_clt_open(pool, NULL, poolname);
+	if (IS_ERR_OR_NULL(pool->rmr)) {
+		err = pool->rmr ? PTR_ERR(pool->rmr) : -ENOMEM;
+		goto free_pool;
+	}
+	err = rmr_clt_query(pool->rmr, &attrs);
+	if (unlikely(err))
+		goto close_rmr;
+
+	pool->queue_depth = attrs.queue_depth;
+	pool->max_io_size = attrs.max_io_size;
+	pool->chunk_size = attrs.chunk_size;
+	pool->max_segments = attrs.max_segments;
+
+	snprintf(pool->poolname, sizeof(pool->poolname), "%s", poolname);
+
+	/*
+	 * When opening a new pool, allocate mq tags for that pool - they are
+	 * going to be shared among all devices opened in that pool
+	 */
+	pool->tag_set.ops		= &brmr_mq_ops;
+	pool->tag_set.queue_depth	= pool->queue_depth;
+	pool->tag_set.numa_node		= NUMA_NO_NODE;
+	pool->tag_set.flags		= BLK_MQ_F_TAG_QUEUE_SHARED;
+	pool->tag_set.cmd_size		= sizeof(struct brmr_clt_iu) + BRMR_RDMA_SGL_SIZE;
+	pool->tag_set.nr_hw_queues	= num_online_cpus();
+
+	err = blk_mq_alloc_tag_set(&pool->tag_set);
+	if (unlikely(err))
+		goto close_rmr;
+
+	refcount_set(&pool->refcount, 1);
+
+	atomic_set(&pool->busy, 0);
+	bitmap_zero(pool->cpu_queues_bm, NR_CPUS);
+	pool->cpu_rr = alloc_percpu(int);
+	if (unlikely(!pool->cpu_rr)) {
+		pr_err("Failed to alloc percpu var (cpu_rr)\n");
+		err = -ENOMEM;
+		goto free_tag_set;
+	}
+	pool->cpu_queues = alloc_percpu(struct brmr_cpu_qlist);
+	if (unlikely(!pool->cpu_queues)) {
+		pr_err("Failed to alloc percpu var (cpu_queues)\n");
+		err = -ENOMEM;
+		goto free_cpu_rr;
+	}
+	brmr_init_cpu_qlists(pool->cpu_queues);
+	return pool;
+free_cpu_rr:
+	free_percpu(pool->cpu_rr);
+free_tag_set:
+	blk_mq_free_tag_set(&pool->tag_set);
+close_rmr:
+	rmr_clt_close(pool->rmr);
+free_pool:
+	kfree(pool);
+
+	return ERR_PTR(err);
+}
+
+static void brmr_clt_free_pool(struct brmr_clt_pool *pool)
+{
+	free_percpu(pool->cpu_queues);
+	pool->cpu_queues = NULL;
+	free_percpu(pool->cpu_rr);
+	pool->cpu_rr = NULL;
+	blk_mq_free_tag_set(&pool->tag_set);
+	rmr_clt_close(pool->rmr);
+	kfree(pool);
+}
+
+static void brmr_clt_put_pool(struct brmr_clt_pool *pool)
+{
+	if (refcount_dec_and_test(&pool->refcount))
+		brmr_clt_free_pool(pool);
+	else
+		rmr_clt_put_pool(pool->rmr);
+}
+
+static inline bool brmr_clt_get_dev(struct brmr_clt_dev *dev)
+{
+	return refcount_inc_not_zero(&dev->refcount);
+}
+
+void brmr_clt_put_dev(struct brmr_clt_dev *dev)
+{
+	might_sleep();
+
+	if (refcount_dec_and_test(&dev->refcount)) {
+
+		mutex_lock(&ida_lock);
+		ida_free(&index_ida, dev->idx);
+		mutex_unlock(&ida_lock);
+
+		kfree(dev->hw_queues);
+
+		brmr_clt_put_pool(dev->pool);
+
+		if (!list_empty(&dev->list)) {
+			mutex_lock(&brmr_device_lock);
+			list_del(&dev->list);
+			mutex_unlock(&brmr_device_lock);
+		}
+		kfree(dev);
+	}
+}
+
+static int brmr_open(struct gendisk *disk, blk_mode_t mode)
+{
+	struct brmr_clt_dev *dev = disk->private_data;
+
+	if (READ_ONCE(dev->dev_state) != DEV_STATE_READY)
+		return -EIO;
+
+	if (!brmr_clt_get_dev(dev))
+		return -EIO;
+
+	return 0;
+}
+
+static void brmr_release(struct gendisk *gen)
+{
+	struct brmr_clt_dev *dev = gen->private_data;
+
+	brmr_clt_put_dev(dev);
+}
+
+#if 0
+static int brmr_getgeo(struct block_device *block_device,
+		       struct hd_geometry *geo)
+{
+	struct brmr_clt_dev *dev = block_device->bd_disk->private_data;
+
+	geo->cylinders	= (dev->size_sect & ~0x3f) >> 6;	/* size/64 */
+	geo->heads	= 4;
+	geo->sectors	= 16;
+	geo->start	= 0;
+
+	return 0;
+}
+#endif
+
+static const struct block_device_operations brmr_ops = {
+	.owner		= THIS_MODULE,
+	.open		= brmr_open,
+	.release	= brmr_release,
+	/*.getgeo		= brmr_getgeo,*/
+};
+
+/**
+ * brmr_clt_init_cmd() - Initialize message command
+ *
+ * @msg:	command message where to init
+ */
+static void brmr_clt_init_cmd(struct brmr_msg_cmd *msg)
+{
+	memset(msg, 0, sizeof(*msg));
+
+	msg->hdr.type = cpu_to_le16(BRMR_MSG_CMD);
+	msg->hdr.__padding = 0;
+	msg->ver = BRMR_PROTO_VER_MAJOR;
+}
+
+/**
+ * brmr_clt_cmd_conf() - Confirmation function for brmr command messages
+ *
+ * @priv:	priv pointer to brmr command private data
+ * @errno:	error number passed from RMR.
+ *		See description of errno in RMR function.
+ *
+ * Description:
+ *	A command response (e.g. for a map-new command) can fail on multiple
+ *	levels.  If RMR fails to send the message to one of the nodes, that is
+ *	reflected in @errno.  If the command fails on the BRMR level, that is
+ *	reflected in the rsp struct.  The error number is handled differently
+ *	by the individual commands accordingly.
+ */
+static void brmr_clt_cmd_conf(void *priv, int errno)
+{
+	struct brmr_cmd_priv *cmd_priv = (struct brmr_cmd_priv *)priv;
+
+	switch (cmd_priv->cmd_type) {
+	case BRMR_CMD_MAP:
+		pr_info("%s: BRMR_CMD_MAP err=%d\n", __func__, errno);
+		cmd_priv->errno = errno;
+		break;
+	case BRMR_CMD_REMAP:
+		pr_info("%s: BRMR_CMD_REMAP err=%d\n", __func__, errno);
+		break;
+	case BRMR_CMD_UNMAP:
+		pr_info("%s: BRMR_CMD_UNMAP err=%d\n", __func__, errno);
+		/*
+		 * No processing needed here.
+		 */
+		break;
+	case BRMR_CMD_GET_PARAMS:
+		pr_info("%s: BRMR_CMD_GET_PARAMS err=%d\n", __func__, errno);
+		if (errno)
+			cmd_priv->errno = errno;
+		break;
+
+	default:
+		pr_err("%s: Unknown command type %d err=%d\n", __func__, cmd_priv->cmd_type, errno);
+	}
+
+	complete(&cmd_priv->complete_done);
+}
+
+/**
+ * brmr_clt_send_msg_cmd() - Sends command message to rmr pool
+ *
+ * @dev:		pointer to brmr device
+ * @msg:		msg struct to be sent
+ * @rsp_buf:		response buffer where the response of the storage side is stored
+ * @rsp_buf_len:	length of the response buffer
+ *
+ * Return:
+ *	Negative if the command could not be sent.
+ *	Otherwise, the value set by brmr_clt_cmd_conf() for the given command.
+ *
+ * Context:
+ *	Blocks until the response is received.
+ */
+static int brmr_clt_send_msg_cmd(struct brmr_clt_dev *dev, struct brmr_msg_cmd *msg, void *rsp_buf,
+			     size_t rsp_buf_len)
+{
+	struct brmr_cmd_priv cmd_priv;
+	struct kvec vec;
+	int ret;
+
+	vec = (struct kvec) {
+		.iov_base = msg,
+		.iov_len  = sizeof(*msg)
+	};
+
+	cmd_priv.dev = dev;
+	cmd_priv.cmd_type = msg->cmd_type;
+	cmd_priv.rsp_buf = rsp_buf;
+	cmd_priv.rsp_buf_len = rsp_buf_len;
+	cmd_priv.errno = 0;
+	init_completion(&cmd_priv.complete_done);
+
+	ret = rmr_clt_cmd_with_rsp(dev->pool->rmr, brmr_clt_cmd_conf, &cmd_priv, &vec, 1, rsp_buf,
+				   rsp_buf_len, sizeof(struct brmr_msg_cmd_rsp));
+
+	if (!ret) {
+		wait_for_completion(&cmd_priv.complete_done);
+		ret = cmd_priv.errno;
+	}
+
+	return ret;
+}
+
+static struct brmr_clt_dev *brmr_alloc_and_init_dev(struct brmr_clt_pool *pool,
+						u64 size)
+{
+	struct brmr_clt_dev *dev;
+	struct brmr_queue *q;
+	struct blk_mq_hw_ctx *hctx;
+	int ret;
+	unsigned long i;
+
+	/*
+	 * alloc device structure
+	 */
+	dev = kzalloc(sizeof(*dev), GFP_KERNEL);
+	if (!dev) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	INIT_LIST_HEAD(&dev->list);
+	dev->size_sect = size;
+	dev->pool = pool;
+	dev->dev_state = DEV_STATE_INIT;
+	dev->map_incomplete = false;
+	refcount_set(&dev->refcount, 1);
+
+	/*
+	 * Alloc a "queue" per cpu
+	 */
+	dev->hw_queues = kcalloc(nr_cpu_ids,
+				 sizeof(*dev->hw_queues), GFP_KERNEL);
+	if (unlikely(!dev->hw_queues)) {
+		ret = -ENOMEM;
+		goto free_dev;
+	}
+
+	/*
+	 * Get an id to be used in /dev/brmr<idx>
+	 */
+	mutex_lock(&ida_lock);
+	ret = ida_alloc_range(&index_ida, 0, minor_to_index(1 << MINORBITS) - 1,
+			      GFP_KERNEL);
+	mutex_unlock(&ida_lock);
+	if (ret < 0) {
+		pr_err("%s: ida_alloc_range() failed for pool %s, err: %d\n",
+		       __func__, pool->poolname, ret);
+		goto free_queues;
+	}
+	dev->idx = ret;
+
+	/*
+	 * Init mq queue
+	 */
+	dev->gd = blk_mq_alloc_disk(&pool->tag_set, NULL, dev);
+	if (IS_ERR(dev->gd)) {
+		ret = PTR_ERR(dev->gd);
+		pr_err("Failed to initialize mq: %pe\n", dev->gd);
+		goto remove_ida;
+	}
+	dev->queue = dev->gd->queue;
+
+	/*
+	 * Assign hardware contexts to our queues
+	 */
+	queue_for_each_hw_ctx(dev->queue, hctx, i) {
+		q = &dev->hw_queues[i];
+		INIT_LIST_HEAD(&q->requeue_list);
+		q->hctx = hctx;
+		hctx->driver_data = q;
+	}
+
+	return dev;
+
+remove_ida:
+	mutex_lock(&ida_lock);
+	ida_free(&index_ida, dev->idx);
+	mutex_unlock(&ida_lock);
+free_queues:
+	kfree(dev->hw_queues);
+free_dev:
+	kfree(dev);
+out:
+	return ERR_PTR(ret);
+}
+
+static int brmr_set_dev_params(struct brmr_clt_dev *dev)
+{
+	struct brmr_clt_pool *pool = dev->pool;
+	u32 chunk_size = brmr_pool_chunk_size(pool);
+	struct queue_limits lim;
+	int ret;
+
+	/* Aligns requests with the chunks in rmr client */
+	if (!is_power_of_2(chunk_size >> SECTOR_SHIFT)) {
+		pr_err("%u not a power of 2!\n", chunk_size);
+		return -EINVAL;
+	}
+
+	/*
+	 * Set request queue parameters via queue_limits API
+	 */
+	lim = queue_limits_start_update(dev->queue);
+	lim.logical_block_size = dev->logical_block_size;
+	lim.physical_block_size = dev->physical_block_size;
+	lim.max_segments = dev->max_segments;
+	lim.max_hw_sectors = dev->max_hw_sectors;
+	lim.max_write_zeroes_sectors = dev->max_write_zeroes_sectors;
+	lim.io_opt = brmr_pool_chunk_size(pool);
+	lim.chunk_sectors = chunk_size >> SECTOR_SHIFT;
+
+	/* We don't support discards to discontiguous segments in one request */
+	lim.max_discard_segments = 1;
+	lim.max_hw_discard_sectors = dev->max_discard_sectors;
+	if (dev->secure_discard)
+		lim.max_secure_erase_sectors = dev->max_discard_sectors;
+
+	lim.discard_granularity = dev->discard_granularity;
+	lim.discard_alignment = dev->discard_alignment;
+
+	/* needed for ibtrs_map_sg_fr to work */
+	lim.virt_boundary_mask = SZ_4K - 1;
+
+	/* non-rotational device */
+	lim.features &= ~BLK_FEAT_ROTATIONAL;
+
+	if (dev->wc)
+		lim.features |= BLK_FEAT_WRITE_CACHE;
+	if (dev->fua)
+		lim.features |= BLK_FEAT_FUA;
+
+	ret = queue_limits_commit_update(dev->queue, &lim);
+	if (ret)
+		goto err;
+
+	blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue);
+	blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue);
+
+	ret = brmr_clt_init_stats(&dev->stats);
+	if (unlikely(ret))
+		goto err;
+
+	dev->gd->major = brmr_major;
+	dev->gd->minors = 1 << BRMR_PART_BITS;
+	dev->gd->first_minor = index_to_minor(dev->idx);
+	dev->gd->fops = &brmr_ops;
+	dev->gd->queue = dev->queue;
+	dev->gd->private_data = dev;
+	snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name),
+		 "brmr%d", dev->idx);
+	set_capacity(dev->gd, dev->size_sect);
+
+	return 0;
+
+err:
+	return ret;
+}
+
+/**
+ * brmr_get_remote_dev_params() - Gets device params from storage nodes
+ *
+ * @dev:		pointer to brmr device
+ *
+ * Description:
+ *	Does the following (sanity) checks
+ *	1) For an unmapped device, param get should succeed on all legs
+ *	2) There should not be a mixture of mapped and unmapped devices
+ *
+ *	In addition to above, it also does the following work
+ *	1) For a mapped device, read from a single leg is enough for success
+ *	2) For an unmapped device, it does validation checks for params for every leg
+ *
+ * Return:
+ *	Negative in case of failure
+ *	0 for success, and a non-mapped device is found
+ *	1 for success, and a mapped device is found
+ *
+ * Context:
+ *	Would block until response is received
+ */
+static int brmr_get_remote_dev_params(struct brmr_clt_dev *dev)
+{
+	struct brmr_clt_pool *pool = dev->pool;
+	struct brmr_msg_cmd msg;
+	struct brmr_msg_cmd_rsp *brmr_cmd_rsp;
+	void *rsp_buf;
+	size_t rsp_buf_len;
+	int err = 0, i;
+	bool partial_fail = false, mapped = false;
+
+	brmr_clt_init_cmd(&msg);
+	msg.cmd_type = BRMR_CMD_GET_PARAMS;
+
+	rsp_buf_len = sizeof(struct brmr_msg_cmd_rsp) * RMR_POOL_MAX_SESS;
+	rsp_buf = kzalloc(rsp_buf_len, GFP_KERNEL);
+	if (!rsp_buf)
+		return -ENOMEM;
+
+	err = brmr_clt_send_msg_cmd(dev, &msg, rsp_buf, rsp_buf_len);
+	if (err < 0) {
+		pr_err("%s: brmr_clt_send_msg_cmd failed with errno %d\n", __func__, err);
+		goto free_data;
+	} else if (err) {
+		/*
+		 * We cannot directly fail here, since we do not know if this is a map for a
+		 * newly created device, or for one which has gone through mapping before.
+		 *
+		 * For the former, any failure should end in the whole map process failing.
+		 * For the latter, a single read from a device with mapped state set should
+		 * be enough for us to go ahead and map.
+		 */
+		partial_fail = true;
+	}
+
+	/*
+	 * Let's do the sanity check first, because combining it with the
+	 * param checks makes the entire loop harder to read.
+	 */
+	for (i = 0; i < RMR_POOL_MAX_SESS; i++) {
+		struct brmr_cmd_get_params_rsp *get_params_rsp;
+
+		brmr_cmd_rsp = ((struct brmr_msg_cmd_rsp *)rsp_buf) + i;
+
+		/*
+		 * We do not need to worry about not seeing MAGIC.  That happens
+		 * for non-working sessions, or for trailing session slots that
+		 * have no RMR legs (which we do not care about).
+		 *
+		 * For non-working sessions, RMR notifies us through the return
+		 * value.
+		 */
+		if (brmr_cmd_rsp->magic != BRMR_CMD_RSP_MAGIC)
+			continue;
+
+		/*
+		 * This is error returned by rmr-store.
+		 */
+		if (brmr_cmd_rsp->status)
+			partial_fail = true;
+
+		get_params_rsp = &brmr_cmd_rsp->get_params_rsp;
+
+		/*
+		 * If we find a mapped device, we save that info.
+		 */
+		if (get_params_rsp->mapped)
+			mapped = true;
+	}
+
+	/*
+	 * If no device is mapped, this is the first map after device creation.
+	 * In that case we need all sessions to be up and running.
+	 */
+	if (mapped == false && partial_fail) {
+		pr_err("%s: Mapping first time, but got failure for some sessions\n", __func__);
+		err = -EINVAL;
+		goto free_data;
+	}
+
+	for (i = 0; i < RMR_POOL_MAX_SESS; i++) {
+		struct brmr_cmd_get_params_rsp *get_params_rsp;
+		struct brmr_blk_dev_params *rsp_dev_params;
+
+		brmr_cmd_rsp = ((struct brmr_msg_cmd_rsp *)rsp_buf) + i;
+
+		/*
+		 * We are tracking partial failures in the loop above, so
+		 * ignore them here.
+		 */
+		if (brmr_cmd_rsp->magic != BRMR_CMD_RSP_MAGIC ||
+		    brmr_cmd_rsp->status)
+			continue;
+
+		get_params_rsp = &brmr_cmd_rsp->get_params_rsp;
+
+		/*
+		 * We cheat a little, and do this sanity check here.
+		 *
+		 * If even a single device was mapped, and we have sessions with non-mapped
+		 * devices, it will be wrong to go forward with brmr map.
+		 */
+		if (mapped && !get_params_rsp->mapped) {
+			/*
+			 * This can only happen if a node went down and came back
+			 * up, and instead of re-adding a MAPPED device, a create
+			 * was called.  We cannot allow the map in this case,
+			 * since it means a discard could have been skipped.
+			 */
+			pr_err("%s: Mixed combination of mapped+unmapped metadata found\n",
+			       __func__);
+			err = -EINVAL;
+			goto free_data;
+		}
+
+		/*
+		 * The device size_sect, which is the size provided by the user in the map
+		 * command, should be the same as the mapped_size of every storage node's backend
+		 * device, which was provided during create_store.
+		 */
+		if (dev->size_sect != le64_to_cpu(get_params_rsp->mapped_size)) {
+			pr_err("%s: Mismatched mapped_size: (Provided) %llu != %llu (Remote)\n",
+			       __func__, dev->size_sect, le64_to_cpu(get_params_rsp->mapped_size));
+			err = -EINVAL;
+			goto free_data;
+		}
+
+		rsp_dev_params = &get_params_rsp->dev_params;
+
+		dev->max_write_zeroes_sectors = min_not_zero(
+						dev->max_write_zeroes_sectors,
+						le32_to_cpu(
+						rsp_dev_params->max_write_zeroes_sectors));
+		dev->max_discard_sectors = min_not_zero(brmr_pool_chunk_size(pool) >> SECTOR_SHIFT,
+						le32_to_cpu(rsp_dev_params->max_discard_sectors));
+		dev->physical_block_size = max_t(u16, dev->physical_block_size,
+						 le16_to_cpu(rsp_dev_params->physical_block_size));
+		dev->logical_block_size = max_t(u16, dev->logical_block_size,
+						le16_to_cpu(rsp_dev_params->logical_block_size));
+
+		dev->discard_granularity = dev->logical_block_size;
+		dev->discard_alignment = dev->logical_block_size;
+
+		/* secure_discard is actually true or false, but since we used
+		 * __le16 to transfer this value in msg, min_t should work fine here
+		 */
+		dev->secure_discard = min_t(u16, dev->secure_discard,
+					    le16_to_cpu(rsp_dev_params->secure_discard));
+
+		dev->cache_policy = rsp_dev_params->cache_policy;
+		dev->wc = !!(rsp_dev_params->cache_policy & BRMR_WRITEBACK);
+		dev->fua = !!(rsp_dev_params->cache_policy & BRMR_FUA);
+	}
+
+	/*
+	 * max_segments and max_hw_sectors are taken from the RTRS session
+	 * values stored in the pool (as RNBD does), not from the store-side
+	 * block device.
+	 */
+	dev->max_segments = pool->max_segments;
+	dev->max_hw_sectors = pool->max_io_size / SECTOR_SIZE;
+
+	/*
+	 * Return whether its a new map or an old one
+	 */
+	err = mapped;
+
+free_data:
+	kfree(rsp_buf);
+
+	return err;
+}
+
+/**
+ * brmr_clt_send_map_cmd() - Sends map command for a brmr device
+ *
+ * @dev:	pointer to brmr device
+ *
+ * Return:
+ *	Negative error value in case of failure
+ *	0 on success
+ *
+ * Context:
+ *	Would block until response is received
+ */
+static int brmr_clt_send_map_cmd(struct brmr_clt_dev *dev)
+{
+	struct brmr_clt_pool *pool = dev->pool;
+	struct brmr_msg_cmd msg;
+	struct brmr_blk_dev_params *dev_params = &(msg.map_new_cmd.dev_params);
+	void *rsp_buf;
+	size_t rsp_buf_len;
+	int err = 0;
+
+	brmr_clt_init_cmd(&msg);
+	msg.cmd_type = BRMR_CMD_MAP;
+
+	rsp_buf_len = sizeof(struct brmr_msg_cmd_rsp) * RMR_POOL_MAX_SESS;
+	rsp_buf = kzalloc(rsp_buf_len, GFP_KERNEL);
+	if (!rsp_buf)
+		return -ENOMEM;
+
+	msg.map_new_cmd.version = BRMR_CURRENT_HEADER_VERSION;
+	msg.map_new_cmd.mapped_size = dev->size_sect;
+
+	dev_params->max_hw_sectors = cpu_to_le32(dev->max_hw_sectors);
+	dev_params->max_write_zeroes_sectors = cpu_to_le32(dev->max_write_zeroes_sectors);
+	dev_params->max_discard_sectors = cpu_to_le32(dev->max_discard_sectors);
+	dev_params->discard_granularity = cpu_to_le32(dev->discard_granularity);
+	dev_params->discard_alignment = cpu_to_le32(dev->discard_alignment);
+	dev_params->physical_block_size = cpu_to_le16(dev->physical_block_size);
+	dev_params->logical_block_size = cpu_to_le16(dev->logical_block_size);
+	dev_params->max_segments = cpu_to_le16(dev->max_segments);
+	dev_params->secure_discard = cpu_to_le16(dev->secure_discard);
+	dev_params->cache_policy = dev->cache_policy;
+
+	err = brmr_clt_send_msg_cmd(dev, &msg, rsp_buf, rsp_buf_len);
+	if (err)
+		pr_err("Failed to send cmd msg BRMR_CMD_MAP in pool %s, err=%d\n",
+		       pool->poolname, err);
+
+	kfree(rsp_buf);
+	return err;
+}
+
+/*
+ * brmr_clt_send_unmap_cmd() - Send an unmap command to the server pool
+ *
+ * Sending may fail (e.g. no sessions connected). The failure is logged but
+ * not propagated — callers always continue with local cleanup regardless.
+ */
+static void brmr_clt_send_unmap_cmd(struct brmr_clt_dev *dev)
+{
+	struct brmr_msg_cmd msg;
+	void *rsp_buf;
+	size_t rsp_buf_len;
+	int ret;
+
+	brmr_clt_init_cmd(&msg);
+	msg.cmd_type = BRMR_CMD_UNMAP;
+
+	rsp_buf_len = sizeof(struct brmr_msg_cmd_rsp) * RMR_POOL_MAX_SESS;
+	rsp_buf = kzalloc(rsp_buf_len, GFP_KERNEL);
+	if (!rsp_buf) {
+		pr_err("Failed to alloc rsp_buf for unmap in pool %s\n",
+		       dev->pool->poolname);
+		return;
+	}
+
+	/*
+	 * Sending messages could fail. For example, there are no client pool sessions
+	 * connected to this pool. Unmap_dev still progresses and cleans up the device
+	 * states on the client side.
+	 */
+	ret = brmr_clt_send_msg_cmd(dev, &msg, rsp_buf, rsp_buf_len);
+	if (ret)
+		pr_err("Error %d when unmap device in pool %s\n",
+		       ret, dev->pool->poolname);
+
+	kfree(rsp_buf);
+}
+
+/**
+ * brmr_clt_map_device() - Maps a brmr device through an rmr pool
+ *
+ * @poolname:	rmr pool name to be used for mapping
+ * @size:	Size of the disk in sectors
+ *
+ * Description:
+ *	Opens the rmr pool named @poolname,
+ *	allocates a brmr device and initializes it,
+ *	maps the brmr device using the rmr pool only if it is not already mapped.
+ *
+ * Return:
+ *	Pointer to allocated and mapped brmr device on success
+ *	Error pointer on failure
+ */
+struct brmr_clt_dev *brmr_clt_map_device(const char *poolname, u64 size)
+{
+	struct brmr_clt_pool *pool = NULL;
+	struct brmr_clt_dev *dev;
+	int ret, mapped;
+
+	/* Create brmr pool */
+	pool = brmr_clt_create_pool(poolname);
+	if (IS_ERR(pool)) {
+		ret = PTR_ERR(pool);
+		goto err_out;
+	}
+
+	/* Alloc device */
+	dev = brmr_alloc_and_init_dev(pool, size);
+	if (IS_ERR(dev)) {
+		pr_err("Error %pe allocating brmr device in pool %s\n",
+			dev, pool->poolname);
+		brmr_clt_put_pool(pool);
+		ret = PTR_ERR(dev);
+		goto err_out;
+	}
+
+	mapped = brmr_get_remote_dev_params(dev);
+	if (mapped < 0) {
+		pr_err("Failed to get remote devs block params in pool %s, err=%d\n",
+		       pool->poolname, mapped);
+		ret = mapped;
+		goto dest_dev;
+	}
+
+	/* Set device params */
+	ret = brmr_set_dev_params(dev);
+	if (unlikely(ret)) {
+		pr_err("Error %d brmr_set_dev_params in pool %s\n",
+		       ret, pool->poolname);
+		goto dest_dev;
+	}
+
+	/*
+	 * We send map command only if its a new map.
+	 * This must happen before add_disk() so the server is ready to serve
+	 * I/O by the time the kernel probes the partition table.
+	 */
+	if (!mapped) {
+		pr_info("%s: Sending map command through pool %s\n", __func__, pool->poolname);
+		ret = brmr_clt_send_map_cmd(dev);
+		if (ret) {
+			pr_err("Failed to send map cmd to pool %s, err=%d\n",
+			       pool->poolname, ret);
+			goto put_disk;
+		}
+	}
+
+	dev->dev_state = DEV_STATE_READY;
+
+	/*
+	 * Add gendisk
+	 */
+	ret = add_disk(dev->gd);
+	if (ret) {
+		pr_err("%s: add_disk failed with err %d\n", __func__, ret);
+		goto unmap_dev;
+	}
+
+	mutex_lock(&brmr_device_lock);
+	list_add(&dev->list, &brmr_device_list);
+	mutex_unlock(&brmr_device_lock);
+
+	return dev;
+
+unmap_dev:
+	dev->dev_state = DEV_STATE_INIT;
+	if (!mapped)
+		brmr_clt_send_unmap_cmd(dev);
+put_disk:
+	put_disk(dev->gd);
+	brmr_clt_free_stats(&dev->stats);
+dest_dev:
+	brmr_clt_put_dev(dev);
+err_out:
+	return ERR_PTR(ret);
+}
+
+static void destroy_gen_disk(struct brmr_clt_dev *dev)
+{
+	unsigned int memflags;
+
+	del_gendisk(dev->gd);
+	/*
+	 * Before the queue is torn down we have to be sure that everything
+	 * in-flight has gone.  Blink with freeze/unfreeze.
+	 */
+	memflags = blk_mq_freeze_queue(dev->queue);
+	blk_mq_unfreeze_queue(dev->queue, memflags);
+	put_disk(dev->gd);
+}
+
+/**
+ * brmr_clt_close_device() - Closes a brmr device
+ *
+ * @dev:		pointer to brmr device to close
+ * @sysfs_self:	pointer to sysfs attribute
+ *
+ * Return:
+ *	Always 0; a failure to send the unmap command is only logged and
+ *	local cleanup proceeds regardless
+ */
+int brmr_clt_close_device(struct brmr_clt_dev *dev,
+		      const struct attribute *sysfs_self)
+{
+	dev->dev_state = DEV_STATE_CLOSING;
+	destroy_gen_disk(dev);
+	brmr_clt_send_unmap_cmd(dev);
+	sysfs_remove_link(&dev->kobj, BRMR_LINK_NAME);
+
+	if (sysfs_self)
+		brmr_clt_destroy_dev_sysfs_files(dev, sysfs_self);
+
+	brmr_clt_free_stats(&dev->stats);
+	brmr_clt_put_dev(dev);
+
+	return 0;
+}
+
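+/*
+ * Look up a mapped device by pool name under brmr_device_lock and take a
+ * reference on it.  Returns NULL if no matching device is found or the
+ * device is already going away.
+ */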
+struct brmr_clt_dev *find_and_get_device(const char *name)
+{
+	struct brmr_clt_dev *dev;
+
+	mutex_lock(&brmr_device_lock);
+	list_for_each_entry(dev, &brmr_device_list, list) {
+		if (strncasecmp(dev->pool->poolname, name, NAME_MAX))
+			continue;
+
+		if (brmr_clt_get_dev(dev)) {
+			mutex_unlock(&brmr_device_lock);
+			return dev;
+		}
+	}
+	mutex_unlock(&brmr_device_lock);
+
+	return NULL;
+}
+
+static int __init brmr_client_init(void)
+{
+	int err;
+
+	pr_info("Loading module %s, version %s\n",
+		KBUILD_MODNAME, BRMR_VER_STRING);
+
+	brmr_major = register_blkdev(brmr_major, "brmr");
+	if (brmr_major <= 0) {
+		pr_err("Failed to load module, block device registration failed\n");
+		err = -EBUSY;
+		goto out;
+	}
+
+	err = brmr_clt_create_sysfs_files();
+	if (err)
+		unregister_blkdev(brmr_major, "brmr");
+out:
+	return err;
+}
+
+static void __exit brmr_client_exit(void)
+{
+	struct brmr_clt_dev *dev, *tmp;
+
+	pr_info("Unloading module\n");
+
+	brmr_clt_destroy_sysfs_files();
+	unregister_blkdev(brmr_major, "brmr");
+
+	list_for_each_entry_safe(dev, tmp, &brmr_device_list, list) {
+		brmr_clt_close_device(dev, NULL);
+	}
+
+	ida_destroy(&index_ida);
+
+	pr_info("Module %s unloaded\n", KBUILD_MODNAME);
+}
+
+module_init(brmr_client_init);
+module_exit(brmr_client_exit);
-- 
2.43.0



Thread overview: 14+ messages
2026-05-05  7:46 [LSF/MM/BPF RFC PATCH 00/13] Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 01/13] RDMA/rmr: add public and private headers Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 02/13] RDMA/rmr: add shared library code (pool, map, request) Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 03/13] RDMA/rmr: client: main functionality Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 04/13] RDMA/rmr: client: sysfs interface functions Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 05/13] RDMA/rmr: server: main functionality Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 06/13] RDMA/rmr: server: sysfs interface functions Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 07/13] RDMA/rmr: include client and server modules into kernel compilation Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 08/13] block/brmr: add private headers with brmr protocol structs and helpers Md Haris Iqbal
2026-05-05  7:46 ` Md Haris Iqbal [this message]
2026-05-05  7:46 ` [PATCH 10/13] block/brmr: client: sysfs interface functions Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 11/13] block/brmr: server: main functionality Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 12/13] block/brmr: server: sysfs interface functions Md Haris Iqbal
2026-05-05  7:46 ` [PATCH 13/13] block/brmr: include client and server modules into kernel compilation Md Haris Iqbal
